1 /*
2 * Copyright (c) 2024 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "dsched_continue_manager.h"
17
18 #include <chrono>
19 #include <sys/prctl.h>
20
21 #include "cJSON.h"
22
23 #include "continue_scene_session_handler.h"
24 #include "dfx/distributed_radar.h"
25 #include "distributed_sched_utils.h"
26 #include "dsched_transport_softbus_adapter.h"
27 #include "dtbschedmgr_device_info_storage.h"
28 #include "dtbschedmgr_log.h"
29 #include "mission/distributed_bm_storage.h"
30 #include "mission/dms_continue_send_manager.h"
31 #include "mission/dms_continue_recv_manager.h"
32
33 namespace OHOS {
34 namespace DistributedSchedule {
namespace {
// Log tag for this translation unit (presumably picked up by the HILOG* macros — confirm in dtbschedmgr_log.h).
const std::string TAG = "DSchedContinueManager";
// Name assigned to the event thread through prctl(PR_SET_NAME) in StartEvent().
const std::string DSCHED_CONTINUE_MANAGER = "dsched_continue_manager";
// Not referenced anywhere in the visible part of this file.
const std::string CONTINUE_TIMEOUT_TASK = "continue_timeout_task";
}

// Singleton boilerplate; presumably provides the GetInstance() used by the SoftbusListener callbacks — confirm
// against the macro definition.
IMPLEMENT_SINGLE_INSTANCE(DSchedContinueManager);
42
DSchedContinueManager()43 DSchedContinueManager::DSchedContinueManager()
44 {
45 }
46
// Destructor: logs the teardown and delegates all cleanup to UnInit().
DSchedContinueManager::~DSchedContinueManager()
{
    HILOGI("DSchedContinueManager delete");
    UnInit();
}
52
// Brings the manager up: initializes the softbus channel, registers the listener for SERVICE_TYPE_CONTINUE and
// starts the event thread. Returns only after StartEvent() has published eventHandler_.
// Does nothing when eventHandler_ is already set.
void DSchedContinueManager::Init()
{
    HILOGI("Init DSchedContinueManager start");
    // A non-null handler means a previous Init() already completed.
    if (eventHandler_ != nullptr) {
        HILOGI("DSchedContinueManager already inited, end.");
        return;
    }
    DSchedTransportSoftbusAdapter::GetInstance().InitChannel();
    softbusListener_ = std::make_shared<DSchedContinueManager::SoftbusListener>();
    DSchedTransportSoftbusAdapter::GetInstance().RegisterListener(SERVICE_TYPE_CONTINUE, softbusListener_);
    eventThread_ = std::thread(&DSchedContinueManager::StartEvent, this);
    // Block until StartEvent(), running on eventThread_, has assigned eventHandler_ and signalled eventCon_.
    std::unique_lock<std::mutex> lock(eventMutex_);
    eventCon_.wait(lock, [this] {
        return eventHandler_ != nullptr;
    });
    HILOGI("Init DSchedContinueManager end");
}
70
StartEvent()71 void DSchedContinueManager::StartEvent()
72 {
73 HILOGI("StartEvent start");
74 prctl(PR_SET_NAME, DSCHED_CONTINUE_MANAGER.c_str());
75 auto runner = AppExecFwk::EventRunner::Create(false);
76 {
77 std::lock_guard<std::mutex> lock(eventMutex_);
78 eventHandler_ = std::make_shared<OHOS::AppExecFwk::EventHandler>(runner);
79 }
80 eventCon_.notify_one();
81 runner->Run();
82 HILOGI("StartEvent end");
83 }
84
UnInit()85 void DSchedContinueManager::UnInit()
86 {
87 HILOGI("UnInit start");
88 DSchedTransportSoftbusAdapter::GetInstance().UnregisterListener(SERVICE_TYPE_CONTINUE, softbusListener_);
89 DSchedTransportSoftbusAdapter::GetInstance().ReleaseChannel();
90 continues_.clear();
91 cntSink_ = 0;
92 cntSource_ = 0;
93
94 if (eventHandler_ != nullptr) {
95 eventHandler_->GetEventRunner()->Stop();
96 eventThread_.join();
97 eventHandler_ = nullptr;
98 } else {
99 HILOGE("eventHandler_ is nullptr");
100 }
101 HILOGI("UnInit end");
102 }
103
// Records the connect decision for a peer device and wakes every waiter on connectDecisionCond_.
// peerDeviceId: device the decision applies to (logged anonymized).
// isSupport: the decision value stored for that device.
// Only logs when DMSFWK_ALL_CONNECT_MGR is not defined.
void DSchedContinueManager::NotifyAllConnectDecision(std::string peerDeviceId, bool isSupport)
{
    HILOGI("Notify all connect decision, peerDeviceId %{public}s, isSupport %{public}d.",
        GetAnonymStr(peerDeviceId).c_str(), isSupport);
#ifdef DMSFWK_ALL_CONNECT_MGR
    // The decision map is shared with WaitAllConnectDecision(), which waits under the same mutex.
    std::lock_guard<std::mutex> decisionLock(connectDecisionMutex_);
    peerConnectDecision_[peerDeviceId] = isSupport;
    connectDecisionCond_.notify_all();
#endif
}
114
ContinueMission(const std::string& srcDeviceId, const std::string& dstDeviceId, int32_t missionId, const sptr<IRemoteObject>& callback, const OHOS::AAFwk::WantParams& wantParams)115 int32_t DSchedContinueManager::ContinueMission(const std::string& srcDeviceId, const std::string& dstDeviceId,
116 int32_t missionId, const sptr<IRemoteObject>& callback, const OHOS::AAFwk::WantParams& wantParams)
117 {
118 if (srcDeviceId.empty() || dstDeviceId.empty() || callback == nullptr) {
119 HILOGE("srcDeviceId or dstDeviceId or callback is null!");
120 return INVALID_PARAMETERS_ERR;
121 }
122
123 std::string localDevId;
124 if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(localDevId)) {
125 HILOGE("get local deviceId failed!");
126 return INVALID_PARAMETERS_ERR;
127 }
128 if (localDevId != srcDeviceId && localDevId != dstDeviceId) {
129 HILOGE("Input srcDevId: %{public}s or dstDevId: %{public}s must be locDevId: %{public}s.",
130 GetAnonymStr(srcDeviceId).c_str(), GetAnonymStr(dstDeviceId).c_str(), GetAnonymStr(localDevId).c_str());
131 return OPERATION_DEVICE_NOT_INITIATOR_OR_TARGET;
132 }
133 if (DtbschedmgrDeviceInfoStorage::GetInstance().GetDeviceInfoById(
134 localDevId == srcDeviceId ? dstDeviceId : srcDeviceId) == nullptr) {
135 HILOGE("GetDeviceInfoById fail, locDevId: %{public}s, srcDevId: %{public}s, dstDevId: %{public}s.",
136 GetAnonymStr(localDevId).c_str(), GetAnonymStr(srcDeviceId).c_str(), GetAnonymStr(dstDeviceId).c_str());
137 return INVALID_REMOTE_PARAMETERS_ERR;
138 }
139
140 auto func = [this, srcDeviceId, dstDeviceId, missionId, callback, wantParams]() {
141 HandleContinueMission(srcDeviceId, dstDeviceId, missionId, callback, wantParams);
142 };
143 if (eventHandler_ == nullptr) {
144 HILOGE("eventHandler_ is nullptr");
145 return INVALID_PARAMETERS_ERR;
146 }
147 eventHandler_->PostTask(func);
148 return ERR_OK;
149 }
150
HandleContinueMission(const std::string& srcDeviceId, const std::string& dstDeviceId, int32_t missionId, const sptr<IRemoteObject>& callback, const OHOS::AAFwk::WantParams& wantParams)151 void DSchedContinueManager::HandleContinueMission(const std::string& srcDeviceId, const std::string& dstDeviceId,
152 int32_t missionId, const sptr<IRemoteObject>& callback, const OHOS::AAFwk::WantParams& wantParams)
153 {
154 HILOGI("start, srcDeviceId: %{public}s. dstDeviceId: %{public}s. missionId: %{public}d.",
155 GetAnonymStr(srcDeviceId).c_str(), GetAnonymStr(dstDeviceId).c_str(), missionId);
156
157 if (srcDeviceId.empty() || dstDeviceId.empty() || callback == nullptr) {
158 HILOGE("srcDeviceId or dstDeviceId or callback is null!");
159 return;
160 }
161
162 std::string localDevId;
163 if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(localDevId)) {
164 HILOGE("get local deviceId failed!");
165 return;
166 }
167 DSchedContinueInfo info = DSchedContinueInfo(srcDeviceId, dstDeviceId, missionId);
168
169 AAFwk::MissionInfo missionInfo;
170 if (AAFwk::AbilityManagerClient::GetInstance()->GetMissionInfo("", missionId, missionInfo) == ERR_OK
171 && srcDeviceId == localDevId) {
172 info.sourceBundleName_ = missionInfo.want.GetBundle();
173 info.sinkBundleName_ = missionInfo.want.GetBundle();
174 }
175
176 HandleContinueMissionWithBundleName(info, callback, wantParams);
177 return;
178 }
179
ContinueMission(const DSchedContinueInfo& continueInfo, const sptr<IRemoteObject>& callback, const OHOS::AAFwk::WantParams& wantParams)180 int32_t DSchedContinueManager::ContinueMission(const DSchedContinueInfo& continueInfo,
181 const sptr<IRemoteObject>& callback, const OHOS::AAFwk::WantParams& wantParams)
182 {
183 std::string srcDeviceId = continueInfo.sourceDeviceId_;
184 std::string dstDeviceId = continueInfo.sinkDeviceId_;
185
186 if (srcDeviceId.empty() || dstDeviceId.empty() || callback == nullptr) {
187 HILOGE("srcDeviceId or dstDeviceId or callback is null!");
188 return INVALID_PARAMETERS_ERR;
189 }
190
191 std::string localDevId;
192 if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(localDevId)) {
193 HILOGE("get local deviceId failed!");
194 return INVALID_PARAMETERS_ERR;
195 }
196 if (localDevId != srcDeviceId && localDevId != dstDeviceId) {
197 HILOGE("Input srcDevId: %{public}s or dstDevId: %{public}s must be locDevId: %{public}s.",
198 GetAnonymStr(srcDeviceId).c_str(), GetAnonymStr(dstDeviceId).c_str(), GetAnonymStr(localDevId).c_str());
199 return OPERATION_DEVICE_NOT_INITIATOR_OR_TARGET;
200 }
201 if (DtbschedmgrDeviceInfoStorage::GetInstance().GetDeviceInfoById(
202 localDevId == srcDeviceId ? dstDeviceId : srcDeviceId) == nullptr) {
203 HILOGE("GetDeviceInfoById fail, locDevId: %{public}s, srcDevId: %{public}s, dstDevId: %{public}s.",
204 GetAnonymStr(localDevId).c_str(), GetAnonymStr(srcDeviceId).c_str(), GetAnonymStr(dstDeviceId).c_str());
205 return INVALID_REMOTE_PARAMETERS_ERR;
206 }
207
208 #ifdef SUPPORT_DISTRIBUTED_MISSION_MANAGER
209 if (localDevId == srcDeviceId) {
210 int32_t missionId = -1;
211 int32_t ret = DMSContinueSendMgr::GetInstance().GetMissionIdByBundleName(
212 continueInfo.sinkBundleName_, missionId);
213 if (ret != ERR_OK) {
214 HILOGE("get missionId fail, ret %{public}d.", ret);
215 return INVALID_PARAMETERS_ERR;
216 }
217 }
218 #endif
219
220 auto func = [this, continueInfo, callback, wantParams]() {
221 HandleContinueMission(continueInfo, callback, wantParams);
222 };
223 if (eventHandler_ == nullptr) {
224 HILOGE("eventHandler_ is nullptr");
225 return INVALID_PARAMETERS_ERR;
226 }
227 eventHandler_->PostTask(func);
228 return ERR_OK;
229 }
230
HandleContinueMission(const DSchedContinueInfo& continueInfo, const sptr<IRemoteObject>& callback, const OHOS::AAFwk::WantParams& wantParams)231 void DSchedContinueManager::HandleContinueMission(const DSchedContinueInfo& continueInfo,
232 const sptr<IRemoteObject>& callback, const OHOS::AAFwk::WantParams& wantParams)
233 {
234 std::string srcDeviceId = continueInfo.sourceDeviceId_;
235 std::string dstDeviceId = continueInfo.sinkDeviceId_;
236 std::string srcBundleName = continueInfo.sourceBundleName_;
237 std::string bundleName = continueInfo.sinkBundleName_;
238 std::string continueType = continueInfo.continueType_;
239 HILOGI("start, srcDeviceId: %{public}s. dstDeviceId: %{public}s. bundleName: %{public}s."
240 " continueType: %{public}s.", GetAnonymStr(srcDeviceId).c_str(), GetAnonymStr(dstDeviceId).c_str(),
241 bundleName.c_str(), continueType.c_str());
242
243 if (srcDeviceId.empty() || dstDeviceId.empty() || callback == nullptr) {
244 HILOGE("srcDeviceId or dstDeviceId or callback is null!");
245 return;
246 }
247
248 DSchedContinueInfo info = DSchedContinueInfo(srcDeviceId, srcBundleName, dstDeviceId, bundleName, continueType);
249 HandleContinueMissionWithBundleName(info, callback, wantParams);
250 return;
251 }
252
GetFirstBundleName(DSchedContinueInfo &info, std::string &firstBundleName, std::string bundleName, std::string deviceId)253 bool DSchedContinueManager::GetFirstBundleName(DSchedContinueInfo &info, std::string &firstBundleName,
254 std::string bundleName, std::string deviceId)
255 {
256 uint16_t bundleNameId;
257 DmsBundleInfo distributedBundleInfo;
258 DmsBmStorage::GetInstance()->GetBundleNameId(bundleName, bundleNameId);
259 bool result = DmsBmStorage::GetInstance()->GetDistributedBundleInfo(deviceId,
260 bundleNameId, distributedBundleInfo);
261 if (!result) {
262 HILOGE("GetDistributedBundleInfo faild");
263 return false;
264 }
265 std::vector<DmsAbilityInfo> dmsAbilityInfos = distributedBundleInfo.dmsAbilityInfos;
266 for (DmsAbilityInfo &ability: dmsAbilityInfos) {
267 std::vector<std::string> abilityContinueTypes = ability.continueType;
268 for (std::string &ability_continue_type: abilityContinueTypes) {
269 if (ability_continue_type == info.continueType_ && !ability.continueBundleName.empty()) {
270 firstBundleName = *ability.continueBundleName.begin();
271 return true;
272 }
273 }
274 }
275 HILOGE("can not get abilicy info or continue bundle names is empty for continue type:%{public}s",
276 info.continueType_.c_str());
277 return false;
278 }
279
HandleContinueMissionWithBundleName(DSchedContinueInfo &info, const sptr<IRemoteObject> &callback, const OHOS::AAFwk::WantParams &wantParams)280 void DSchedContinueManager::HandleContinueMissionWithBundleName(DSchedContinueInfo &info,
281 const sptr<IRemoteObject> &callback, const OHOS::AAFwk::WantParams &wantParams)
282 {
283 int32_t direction = CONTINUE_SINK;
284 int32_t ret = CheckContinuationLimit(info.sourceDeviceId_, info.sinkDeviceId_, direction);
285 if (ret != ERR_OK) {
286 HILOGE("CheckContinuationLimit failed, ret: %{public}d", ret);
287 return;
288 }
289 int32_t subType = CONTINUE_PUSH;
290 if (direction == CONTINUE_SOURCE) {
291 cntSource_++;
292 } else {
293 cntSink_++;
294 subType = CONTINUE_PULL;
295 if (info.sourceBundleName_.empty()) {
296 HILOGW("current sub type is continue pull; but can not get source bundle name from recv cache.");
297 std::string firstBundleNamme;
298 std::string bundleName = info.sinkBundleName_;
299 std::string deviceId = info.sinkDeviceId_;
300 if (GetFirstBundleName(info, firstBundleNamme, bundleName, deviceId)) {
301 info.sourceBundleName_ = firstBundleNamme;
302 }
303 }
304 }
305 {
306 std::lock_guard<std::mutex> continueLock(continueMutex_);
307 if (!continues_.empty() && continues_.find(info) != continues_.end()) {
308 HILOGE("a same continue task is already in progress.");
309 return;
310 }
311 auto newContinue = std::make_shared<DSchedContinue>(subType, direction, callback, info);
312 newContinue->Init();
313 continues_.insert(std::make_pair(info, newContinue));
314 #ifdef DMSFWK_ALL_CONNECT_MGR
315 {
316 std::unique_lock<std::mutex> decisionLock(connectDecisionMutex_);
317 std::string peerDeviceId = direction == CONTINUE_SOURCE ? info.sinkDeviceId_ : info.sourceDeviceId_;
318 if (peerConnectDecision_.find(peerDeviceId) != peerConnectDecision_.end()) {
319 peerConnectDecision_.erase(peerDeviceId);
320 }
321 }
322 #endif
323 newContinue->OnContinueMission(wantParams);
324 }
325 WaitAllConnectDecision(direction, info, CONTINUE_TIMEOUT);
326 HILOGI("end, subType: %{public}d dirction: %{public}d, continue info: %{public}s",
327 subType, direction, info.toString().c_str());
328 }
329
WaitAllConnectDecision(int32_t direction, const DSchedContinueInfo &info, int32_t timeout)330 void DSchedContinueManager::WaitAllConnectDecision(int32_t direction, const DSchedContinueInfo &info, int32_t timeout)
331 {
332 #ifdef DMSFWK_ALL_CONNECT_MGR
333 std::string peerDeviceId = direction == CONTINUE_SOURCE ? info.sinkDeviceId_ : info.sourceDeviceId_;
334 {
335 std::unique_lock<std::mutex> decisionLock(connectDecisionMutex_);
336 connectDecisionCond_.wait_for(decisionLock, std::chrono::seconds(CONNECT_DECISION_WAIT_S),
337 [this, peerDeviceId]() {
338 return peerConnectDecision_.find(peerDeviceId) != peerConnectDecision_.end() &&
339 peerConnectDecision_.at(peerDeviceId).load();
340 });
341
342 if (peerConnectDecision_.find(peerDeviceId) == peerConnectDecision_.end()) {
343 HILOGE("Not find peerDeviceId %{public}s in peerConnectDecision.", GetAnonymStr(peerDeviceId).c_str());
344 SetTimeOut(info, 0);
345 return;
346 }
347 if (!peerConnectDecision_.at(peerDeviceId).load()) {
348 HILOGE("All connect manager refuse bind to PeerDeviceId %{public}s.", GetAnonymStr(peerDeviceId).c_str());
349 peerConnectDecision_.erase(peerDeviceId);
350 SetTimeOut(info, 0);
351 return;
352 }
353 peerConnectDecision_.erase(peerDeviceId);
354 }
355 #endif
356 SetTimeOut(info, timeout);
357 }
358
SetTimeOut(const DSchedContinueInfo &info, int32_t timeout)359 void DSchedContinueManager::SetTimeOut(const DSchedContinueInfo &info, int32_t timeout)
360 {
361 auto func = [this, info]() {
362 if (continues_.empty() || continues_.count(info) == 0) {
363 HILOGE("continue not exist.");
364 return;
365 }
366 HILOGE("continue timeout! info: %{public}s", info.toString().c_str());
367 auto dsContinue = continues_[info];
368 if (dsContinue != nullptr) {
369 dsContinue->OnContinueEnd(CONTINUE_ABILITY_TIMEOUT_ERR);
370 }
371 };
372 if (eventHandler_ == nullptr) {
373 HILOGE("eventHandler_ is nullptr");
374 return;
375 }
376 timeout > 0 ? eventHandler_->PostTask(func, info.ToStringIgnoreMissionId(), timeout) :
377 eventHandler_->PostTask(func);
378 }
379
StartContinuation(const OHOS::AAFwk::Want& want, int32_t missionId, int32_t callerUid, int32_t status, uint32_t accessToken)380 int32_t DSchedContinueManager::StartContinuation(const OHOS::AAFwk::Want& want, int32_t missionId,
381 int32_t callerUid, int32_t status, uint32_t accessToken)
382 {
383 std::string dstDeviceId = want.GetElement().GetDeviceID();
384 if (DtbschedmgrDeviceInfoStorage::GetInstance().GetDeviceInfoById(dstDeviceId) == nullptr) {
385 HILOGE("GetDeviceInfoById fail, dstDevId: %{public}s.", GetAnonymStr(dstDeviceId).c_str());
386 return INVALID_REMOTE_PARAMETERS_ERR;
387 }
388 if (GetDSchedContinueByWant(want, missionId) == nullptr) {
389 HILOGE("GetDSchedContinueByWant fail, dstDevId: %{public}s, missionId: %{public}d.",
390 GetAnonymStr(dstDeviceId).c_str(), missionId);
391 return INVALID_REMOTE_PARAMETERS_ERR;
392 }
393
394 auto func = [this, want, missionId, callerUid, status, accessToken]() {
395 HandleStartContinuation(want, missionId, callerUid, status, accessToken);
396 };
397 if (eventHandler_ == nullptr) {
398 HILOGE("eventHandler_ is nullptr");
399 return INVALID_PARAMETERS_ERR;
400 }
401 eventHandler_->PostTask(func);
402 return ERR_OK;
403 }
404
HandleStartContinuation(const OHOS::AAFwk::Want& want, int32_t missionId, int32_t callerUid, int32_t status, uint32_t accessToken)405 void DSchedContinueManager::HandleStartContinuation(const OHOS::AAFwk::Want& want, int32_t missionId,
406 int32_t callerUid, int32_t status, uint32_t accessToken)
407 {
408 HILOGI("begin");
409 auto dContinue = GetDSchedContinueByWant(want, missionId);
410 if (dContinue != nullptr) {
411 dContinue->OnStartContinuation(want, callerUid, status, accessToken);
412 } else {
413 DmsRadar::GetInstance().SaveDataDmsRemoteWant("HandleStartContinuation", INVALID_PARAMETERS_ERR);
414 }
415 HILOGI("end");
416 return;
417 }
418
GetDSchedContinueByWant( const OHOS::AAFwk::Want& want, int32_t missionId)419 std::shared_ptr<DSchedContinue> DSchedContinueManager::GetDSchedContinueByWant(
420 const OHOS::AAFwk::Want& want, int32_t missionId)
421 {
422 std::string srcDeviceId;
423 if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(srcDeviceId)) {
424 DmsRadar::GetInstance().SaveDataDmsRemoteWant("GetDSchedContinueByWant", GET_LOCAL_DEVICE_ERR);
425 HILOGE("get local deviceId failed!");
426 return nullptr;
427 }
428 std::string dstDeviceId = want.GetElement().GetDeviceID();
429 std::string bundleName = want.GetElement().GetBundleName();
430 auto info = DSchedContinueInfo(srcDeviceId, bundleName, dstDeviceId, bundleName, "");
431
432 HILOGI("continue info: %{public}s.", info.toString().c_str());
433 {
434 std::lock_guard<std::mutex> continueLock(continueMutex_);
435 if (continues_.empty()) {
436 HILOGE("continue info doesn't match an existing continuation.");
437 return nullptr;
438 }
439 for (auto iter = continues_.begin(); iter != continues_.end(); iter++) {
440 if (iter->second == nullptr) {
441 continue;
442 }
443 DSchedContinueInfo continueInfo = iter->second->GetContinueInfo();
444 if (srcDeviceId == continueInfo.sourceDeviceId_
445 && bundleName == continueInfo.sourceBundleName_
446 && dstDeviceId == continueInfo.sinkDeviceId_) {
447 return iter->second;
448 }
449 }
450 }
451 HILOGE("missionId doesn't match the existing continuation, continueInfo: %{public}s.",
452 info.toString().c_str());
453 return nullptr;
454 }
455
NotifyCompleteContinuation(const std::u16string& devId, int32_t sessionId, bool isSuccess, const std::string &callerBundleName)456 int32_t DSchedContinueManager::NotifyCompleteContinuation(const std::u16string& devId, int32_t sessionId,
457 bool isSuccess, const std::string &callerBundleName)
458 {
459 auto func = [this, devId, sessionId, isSuccess, callerBundleName]() {
460 HandleNotifyCompleteContinuation(devId, sessionId, isSuccess, callerBundleName);
461 };
462 if (eventHandler_ == nullptr) {
463 HILOGE("eventHandler_ is nullptr");
464 return INVALID_PARAMETERS_ERR;
465 }
466 eventHandler_->PostTask(func);
467 return ERR_OK;
468 }
469
HandleNotifyCompleteContinuation(const std::u16string& devId, int32_t missionId, bool isSuccess, const std::string &callerBundleName)470 void DSchedContinueManager::HandleNotifyCompleteContinuation(const std::u16string& devId, int32_t missionId,
471 bool isSuccess, const std::string &callerBundleName)
472 {
473 HILOGI("begin, isSuccess %{public}d", isSuccess);
474 auto dContinue = GetDSchedContinueByDevId(devId, missionId);
475 if (dContinue != nullptr) {
476 if (dContinue->GetContinueInfo().sinkBundleName_ != callerBundleName) {
477 HILOGE("callerBundleName doesn't match the existing continuation");
478 return;
479 }
480 dContinue->OnNotifyComplete(missionId, isSuccess);
481 HILOGI("end, continue info: %{public}s.", dContinue->GetContinueInfo().toString().c_str());
482 }
483 return;
484 }
485
GetDSchedContinueByDevId( const std::u16string& devId, int32_t missionId)486 std::shared_ptr<DSchedContinue> DSchedContinueManager::GetDSchedContinueByDevId(
487 const std::u16string& devId, int32_t missionId)
488 {
489 std::string deviceId = Str16ToStr8(devId);
490 HILOGI("begin, deviceId %{public}s, missionId %{public}d", GetAnonymStr(deviceId).c_str(), missionId);
491 {
492 std::lock_guard<std::mutex> continueLock(continueMutex_);
493 if (continues_.empty()) {
494 HILOGE("No continuation in progress.");
495 return nullptr;
496 }
497 for (auto iter = continues_.begin(); iter != continues_.end(); iter++) {
498 if (iter->second != nullptr && deviceId == iter->second->GetContinueInfo().sourceDeviceId_) {
499 return iter->second;
500 }
501 }
502 }
503 HILOGE("source deviceId doesn't match an existing continuation.");
504 return nullptr;
505 }
506
NotifyTerminateContinuation(const int32_t missionId)507 void DSchedContinueManager::NotifyTerminateContinuation(const int32_t missionId)
508 {
509 HILOGI("begin, missionId %{public}d", missionId);
510 {
511 std::lock_guard<std::mutex> continueLock(continueMutex_);
512 if (continues_.empty()) {
513 HILOGW("No continuation in progress.");
514 return;
515 }
516
517 ContinueLaunchMissionInfo missionInfo;
518 int32_t ret = DMSContinueSendMgr::GetInstance().GetContinueLaunchMissionInfo(missionId, missionInfo);
519 if (ret != ERR_OK) {
520 HILOGE("get continueLaunchMissionInfo failed, missionId %{public}d", missionId);
521 return;
522 }
523 HILOGI("alive missionInfo bundleName is %{public}s, abilityName is %{public}s",
524 missionInfo.bundleName.c_str(), missionInfo.abilityName.c_str());
525 for (auto iter = continues_.begin(); iter != continues_.end(); iter++) {
526 if (iter->second == nullptr) {
527 break;
528 }
529
530 auto continueInfo = iter->second->GetContinueInfo();
531 HILOGI("continueInfo bundleName is %{public}s, abilityName is %{public}s",
532 continueInfo.sinkBundleName_.c_str(), continueInfo.sinkAbilityName_.c_str());
533 if (missionInfo.bundleName == continueInfo.sinkBundleName_
534 && missionInfo.abilityName == continueInfo.sinkAbilityName_) {
535 HILOGE("Excute onContinueEnd");
536 iter->second->OnContinueEnd(CONTINUE_SINK_ABILITY_TERMINATED);
537 return;
538 }
539 }
540 }
541 HILOGW("doesn't match an existing continuation.");
542 }
543
OnContinueEnd(const DSchedContinueInfo& info)544 int32_t DSchedContinueManager::OnContinueEnd(const DSchedContinueInfo& info)
545 {
546 auto func = [this, info]() {
547 HandleContinueEnd(info);
548 };
549 if (eventHandler_ == nullptr) {
550 HILOGE("eventHandler_ is nullptr");
551 return INVALID_PARAMETERS_ERR;
552 }
553 eventHandler_->PostTask(func);
554 return ERR_OK;
555 }
556
HandleContinueEnd(const DSchedContinueInfo& info)557 void DSchedContinueManager::HandleContinueEnd(const DSchedContinueInfo& info)
558 {
559 HILOGI("begin, continue info: %{public}s.", info.toString().c_str());
560 std::lock_guard<std::mutex> continueLock(continueMutex_);
561 if (continues_.empty() || continues_.find(info) == continues_.end()) {
562 HILOGE("continue info doesn't match any existing continuation.");
563 return;
564 }
565 RemoveTimeout(info);
566 continues_.erase(info);
567 ContinueSceneSessionHandler::GetInstance().ClearContinueSessionId();
568
569 std::string localDevId;
570 if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(localDevId)) {
571 HILOGE("get local deviceId failed!");
572 return;
573 }
574
575 if (info.sinkDeviceId_ == localDevId) {
576 cntSink_--;
577 } else if (info.sourceDeviceId_ == localDevId) {
578 cntSource_--;
579 }
580 HILOGI("end.");
581 }
582
RemoveTimeout(const DSchedContinueInfo& info)583 void DSchedContinueManager::RemoveTimeout(const DSchedContinueInfo& info)
584 {
585 if (eventHandler_ == nullptr) {
586 HILOGE("eventHandler_ is nullptr");
587 return;
588 }
589 eventHandler_->RemoveTask(info.ToStringIgnoreMissionId());
590 }
591
OnDataRecv(int32_t sessionId, std::shared_ptr<DSchedDataBuffer> dataBuffer)592 void DSchedContinueManager::OnDataRecv(int32_t sessionId, std::shared_ptr<DSchedDataBuffer> dataBuffer)
593 {
594 auto func = [this, sessionId, dataBuffer]() {
595 HandleDataRecv(sessionId, dataBuffer);
596 };
597 if (eventHandler_ == nullptr) {
598 HILOGE("eventHandler_ is nullptr");
599 return;
600 }
601 eventHandler_->PostTask(func);
602 }
603
HandleDataRecv(int32_t sessionId, std::shared_ptr<DSchedDataBuffer> dataBuffer)604 void DSchedContinueManager::HandleDataRecv(int32_t sessionId, std::shared_ptr<DSchedDataBuffer> dataBuffer)
605 {
606 HILOGI("start, sessionId: %{public}d.", sessionId);
607 if (dataBuffer == nullptr) {
608 HILOGE("dataBuffer is null.");
609 return;
610 }
611 uint8_t *data = dataBuffer->Data();
612 std::string jsonStr(reinterpret_cast<const char *>(data), dataBuffer->Capacity());
613 cJSON *rootValue = cJSON_Parse(jsonStr.c_str());
614 if (rootValue == nullptr) {
615 HILOGE("Parse jsonStr error.");
616 return;
617 }
618 cJSON *baseCmd = cJSON_GetObjectItemCaseSensitive(rootValue, "BaseCmd");
619 if (baseCmd == nullptr || !cJSON_IsString(baseCmd) || (baseCmd->valuestring == nullptr)) {
620 cJSON_Delete(rootValue);
621 HILOGE("Parse base cmd error.");
622 return;
623 }
624
625 cJSON *cmdValue = cJSON_Parse(baseCmd->valuestring);
626 cJSON_Delete(rootValue);
627 if (cmdValue == nullptr) {
628 HILOGE("Parse cmd value error.");
629 return;
630 }
631
632 cJSON *comvalue = cJSON_GetObjectItemCaseSensitive(cmdValue, "Command");
633 if (comvalue == nullptr || !cJSON_IsNumber(comvalue)) {
634 cJSON_Delete(cmdValue);
635 HILOGE("parse command failed");
636 return;
637 }
638 int32_t command = comvalue->valueint;
639 cJSON_Delete(cmdValue);
640 NotifyContinueDataRecv(sessionId, command, jsonStr, dataBuffer);
641 HILOGI("end, sessionId: %{public}d.", sessionId);
642 }
643
NotifyContinueDataRecv(int32_t sessionId, int32_t command, const std::string& jsonStr, std::shared_ptr<DSchedDataBuffer> dataBuffer)644 void DSchedContinueManager::NotifyContinueDataRecv(int32_t sessionId, int32_t command, const std::string& jsonStr,
645 std::shared_ptr<DSchedDataBuffer> dataBuffer)
646 {
647 HILOGI("start, parsed cmd %{public}d, sessionId: %{public}d.", command, sessionId);
648 std::lock_guard<std::mutex> continueLock(continueMutex_);
649 if (!continues_.empty()) {
650 for (auto iter = continues_.begin(); iter != continues_.end(); iter++) {
651 if (iter->second != nullptr && sessionId == iter->second->GetSessionId()) {
652 HILOGI("sessionId %{public}d exist.", sessionId);
653 iter->second->OnDataRecv(command, dataBuffer);
654 return;
655 }
656 }
657 }
658
659 if (command == DSCHED_CONTINUE_CMD_START) {
660 HILOGI("recv start cmd, sessionId: %{public}d.", sessionId);
661 auto startCmd = std::make_shared<DSchedContinueStartCmd>();
662 int32_t ret = startCmd->Unmarshal(jsonStr);
663 if (ret != ERR_OK) {
664 HILOGE("Unmarshal start cmd failed, ret: %{public}d", ret);
665 return;
666 }
667 int32_t direction = CONTINUE_SINK;
668 ret = CheckContinuationLimit(startCmd->srcDeviceId_, startCmd->dstDeviceId_, direction);
669 if (ret != ERR_OK) {
670 DmsRadar::GetInstance().SaveDataDmsRemoteWant("NotifyContinueDataRecv", ret);
671 HILOGE("CheckContinuationSubType failed, ret: %{public}d", ret);
672 return;
673 }
674
675 auto newContinue = std::make_shared<DSchedContinue>(startCmd, sessionId);
676 newContinue->Init();
677 continues_.insert(std::make_pair(newContinue->GetContinueInfo(), newContinue));
678
679 newContinue->OnStartCmd(startCmd->appVersion_);
680 HILOGI("end, continue info: %{public}s.", newContinue->GetContinueInfo().toString().c_str());
681 return;
682 }
683 HILOGE("No matching session to handle cmd! sessionId: %{public}d, recv cmd %{public}d.", sessionId, command);
684 return;
685 }
686
CheckContinuationLimit(const std::string& srcDeviceId, const std::string& dstDeviceId, int32_t &direction)687 int32_t DSchedContinueManager::CheckContinuationLimit(const std::string& srcDeviceId,
688 const std::string& dstDeviceId, int32_t &direction)
689 {
690 std::string localDevId;
691 if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(localDevId)) {
692 HILOGE("get local deviceId failed!");
693 return GET_LOCAL_DEVICE_ERR;
694 }
695
696 direction = CONTINUE_SINK;
697 if (dstDeviceId == localDevId) {
698 if (cntSink_.load() >= MAX_CONCURRENT_SINK) {
699 HILOGE("can't deal more than %{public}d pull requests at the same time.", cntSink_.load());
700 return CONTINUE_ALREADY_IN_PROGRESS;
701 }
702 if (DtbschedmgrDeviceInfoStorage::GetInstance().GetDeviceInfoById(srcDeviceId) == nullptr) {
703 HILOGE("Irrecognized source device!");
704 return INVALID_PARAMETERS_ERR;
705 }
706 } else if (srcDeviceId == localDevId) {
707 if (cntSource_.load() >= MAX_CONCURRENT_SOURCE) {
708 HILOGE("can't deal more than %{public}d push requests at the same time.", cntSource_.load());
709 return CONTINUE_ALREADY_IN_PROGRESS;
710 }
711 if (DtbschedmgrDeviceInfoStorage::GetInstance().GetDeviceInfoById(dstDeviceId) == nullptr) {
712 HILOGE("Irrecognized destination device!");
713 return INVALID_PARAMETERS_ERR;
714 }
715 direction = CONTINUE_SOURCE;
716 } else {
717 HILOGE("source or target device must be local!");
718 return OPERATION_DEVICE_NOT_INITIATOR_OR_TARGET;
719 }
720 return ERR_OK;
721 }
722
GetContinueInfo(std::string &srcDeviceId, std::string &dstDeviceId)723 int32_t DSchedContinueManager::GetContinueInfo(std::string &srcDeviceId, std::string &dstDeviceId)
724 {
725 HILOGI("called");
726 std::lock_guard<std::mutex> continueLock(continueMutex_);
727 if (continues_.empty()) {
728 HILOGW("No continuation in progress.");
729 return ERR_OK;
730 }
731 auto dsContinue = continues_.begin()->second;
732 if (dsContinue == nullptr) {
733 HILOGE("dContinue is null");
734 return INVALID_PARAMETERS_ERR;
735 }
736 dstDeviceId = dsContinue->GetContinueInfo().sinkDeviceId_;
737 srcDeviceId = dsContinue->GetContinueInfo().sourceDeviceId_;
738 return ERR_OK;
739 }
740
OnShutdown(int32_t socket, bool isSelfCalled)741 void DSchedContinueManager::OnShutdown(int32_t socket, bool isSelfCalled)
742 {
743 if (isSelfCalled) {
744 HILOGW("called, shutdown by local, sessionId: %{public}d", socket);
745 return;
746 }
747 HILOGW("called, sessionId: %{public}d, isSelfCalled %{public}d", socket, isSelfCalled);
748 auto func = [this, socket]() {
749 std::lock_guard<std::mutex> continueLock(continueMutex_);
750 if (continues_.empty()) {
751 return;
752 }
753 for (auto iter = continues_.begin(); iter != continues_.end(); iter++) {
754 if (iter->second != nullptr && socket == iter->second->GetSessionId()) {
755 iter->second->OnContinueEnd(CONTINUE_SESSION_SHUTDOWN);
756 }
757 }
758 };
759 if (eventHandler_ == nullptr) {
760 HILOGE("eventHandler_ is nullptr");
761 return;
762 }
763 eventHandler_->PostTask(func);
764 return;
765 }
766
// Bind notification callback; intentionally a no-op, both arguments are ignored.
void DSchedContinueManager::SoftbusListener::OnBind(int32_t socket, PeerSocketInfo info)
{
}
770
// Shutdown callback; forwards to the DSchedContinueManager singleton.
void DSchedContinueManager::SoftbusListener::OnShutdown(int32_t socket, bool isSelfCalled)
{
    DSchedContinueManager::GetInstance().OnShutdown(socket, isSelfCalled);
}
775
// Data-received callback; forwards the buffer to the DSchedContinueManager singleton, using the socket as the
// session id.
void DSchedContinueManager::SoftbusListener::OnDataRecv(int32_t socket, std::shared_ptr<DSchedDataBuffer> dataBuffer)
{
    DSchedContinueManager::GetInstance().OnDataRecv(socket, dataBuffer);
}
780 } // namespace DistributedSchedule
781 } // namespace OHOS
782