1 /*
2  * Copyright (c) 2024 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "dsched_continue_manager.h"
17 
18 #include <chrono>
19 #include <sys/prctl.h>
20 
21 #include "cJSON.h"
22 
23 #include "continue_scene_session_handler.h"
24 #include "dfx/distributed_radar.h"
25 #include "distributed_sched_utils.h"
26 #include "dsched_transport_softbus_adapter.h"
27 #include "dtbschedmgr_device_info_storage.h"
28 #include "dtbschedmgr_log.h"
29 #include "mission/distributed_bm_storage.h"
30 #include "mission/dms_continue_send_manager.h"
31 #include "mission/dms_continue_recv_manager.h"
32 
33 namespace OHOS {
34 namespace DistributedSchedule {
35 namespace {
36 const std::string TAG = "DSchedContinueManager";
37 const std::string DSCHED_CONTINUE_MANAGER = "dsched_continue_manager";
38 const std::string CONTINUE_TIMEOUT_TASK = "continue_timeout_task";
39 }
40 
41 IMPLEMENT_SINGLE_INSTANCE(DSchedContinueManager);
42 
DSchedContinueManager()43 DSchedContinueManager::DSchedContinueManager()
44 {
45 }
46 
~DSchedContinueManager()47 DSchedContinueManager::~DSchedContinueManager()
48 {
49     HILOGI("DSchedContinueManager delete");
50     UnInit();
51 }
52 
Init()53 void DSchedContinueManager::Init()
54 {
55     HILOGI("Init DSchedContinueManager start");
56     if (eventHandler_ != nullptr) {
57         HILOGI("DSchedContinueManager already inited, end.");
58         return;
59     }
60     DSchedTransportSoftbusAdapter::GetInstance().InitChannel();
61     softbusListener_ = std::make_shared<DSchedContinueManager::SoftbusListener>();
62     DSchedTransportSoftbusAdapter::GetInstance().RegisterListener(SERVICE_TYPE_CONTINUE, softbusListener_);
63     eventThread_ = std::thread(&DSchedContinueManager::StartEvent, this);
64     std::unique_lock<std::mutex> lock(eventMutex_);
65     eventCon_.wait(lock, [this] {
66         return eventHandler_ != nullptr;
67     });
68     HILOGI("Init DSchedContinueManager end");
69 }
70 
StartEvent()71 void DSchedContinueManager::StartEvent()
72 {
73     HILOGI("StartEvent start");
74     prctl(PR_SET_NAME, DSCHED_CONTINUE_MANAGER.c_str());
75     auto runner = AppExecFwk::EventRunner::Create(false);
76     {
77         std::lock_guard<std::mutex> lock(eventMutex_);
78         eventHandler_ = std::make_shared<OHOS::AppExecFwk::EventHandler>(runner);
79     }
80     eventCon_.notify_one();
81     runner->Run();
82     HILOGI("StartEvent end");
83 }
84 
UnInit()85 void DSchedContinueManager::UnInit()
86 {
87     HILOGI("UnInit start");
88     DSchedTransportSoftbusAdapter::GetInstance().UnregisterListener(SERVICE_TYPE_CONTINUE, softbusListener_);
89     DSchedTransportSoftbusAdapter::GetInstance().ReleaseChannel();
90     continues_.clear();
91     cntSink_ = 0;
92     cntSource_ = 0;
93 
94     if (eventHandler_ != nullptr) {
95         eventHandler_->GetEventRunner()->Stop();
96         eventThread_.join();
97         eventHandler_ = nullptr;
98     } else {
99         HILOGE("eventHandler_ is nullptr");
100     }
101     HILOGI("UnInit end");
102 }
103 
NotifyAllConnectDecision(std::string peerDeviceId, bool isSupport)104 void DSchedContinueManager::NotifyAllConnectDecision(std::string peerDeviceId, bool isSupport)
105 {
106     HILOGI("Notify all connect decision, peerDeviceId %{public}s, isSupport %{public}d.",
107         GetAnonymStr(peerDeviceId).c_str(), isSupport);
108 #ifdef DMSFWK_ALL_CONNECT_MGR
109     std::lock_guard<std::mutex> decisionLock(connectDecisionMutex_);
110     peerConnectDecision_[peerDeviceId] = isSupport;
111     connectDecisionCond_.notify_all();
112 #endif
113 }
114 
ContinueMission(const std::string& srcDeviceId, const std::string& dstDeviceId, int32_t missionId, const sptr<IRemoteObject>& callback, const OHOS::AAFwk::WantParams& wantParams)115 int32_t DSchedContinueManager::ContinueMission(const std::string& srcDeviceId, const std::string& dstDeviceId,
116     int32_t missionId, const sptr<IRemoteObject>& callback, const OHOS::AAFwk::WantParams& wantParams)
117 {
118     if (srcDeviceId.empty() || dstDeviceId.empty() || callback == nullptr) {
119         HILOGE("srcDeviceId or dstDeviceId or callback is null!");
120         return INVALID_PARAMETERS_ERR;
121     }
122 
123     std::string localDevId;
124     if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(localDevId)) {
125         HILOGE("get local deviceId failed!");
126         return INVALID_PARAMETERS_ERR;
127     }
128     if (localDevId != srcDeviceId && localDevId != dstDeviceId) {
129         HILOGE("Input srcDevId: %{public}s or dstDevId: %{public}s must be locDevId: %{public}s.",
130             GetAnonymStr(srcDeviceId).c_str(), GetAnonymStr(dstDeviceId).c_str(), GetAnonymStr(localDevId).c_str());
131         return OPERATION_DEVICE_NOT_INITIATOR_OR_TARGET;
132     }
133     if (DtbschedmgrDeviceInfoStorage::GetInstance().GetDeviceInfoById(
134         localDevId == srcDeviceId ? dstDeviceId : srcDeviceId) == nullptr) {
135         HILOGE("GetDeviceInfoById fail, locDevId: %{public}s, srcDevId: %{public}s, dstDevId: %{public}s.",
136             GetAnonymStr(localDevId).c_str(), GetAnonymStr(srcDeviceId).c_str(), GetAnonymStr(dstDeviceId).c_str());
137         return INVALID_REMOTE_PARAMETERS_ERR;
138     }
139 
140     auto func = [this, srcDeviceId, dstDeviceId, missionId, callback, wantParams]() {
141         HandleContinueMission(srcDeviceId, dstDeviceId, missionId, callback, wantParams);
142     };
143     if (eventHandler_ == nullptr) {
144         HILOGE("eventHandler_ is nullptr");
145         return INVALID_PARAMETERS_ERR;
146     }
147     eventHandler_->PostTask(func);
148     return ERR_OK;
149 }
150 
HandleContinueMission(const std::string& srcDeviceId, const std::string& dstDeviceId, int32_t missionId, const sptr<IRemoteObject>& callback, const OHOS::AAFwk::WantParams& wantParams)151 void DSchedContinueManager::HandleContinueMission(const std::string& srcDeviceId, const std::string& dstDeviceId,
152     int32_t missionId, const sptr<IRemoteObject>& callback, const OHOS::AAFwk::WantParams& wantParams)
153 {
154     HILOGI("start, srcDeviceId: %{public}s. dstDeviceId: %{public}s. missionId: %{public}d.",
155         GetAnonymStr(srcDeviceId).c_str(), GetAnonymStr(dstDeviceId).c_str(), missionId);
156 
157     if (srcDeviceId.empty() || dstDeviceId.empty() || callback == nullptr) {
158         HILOGE("srcDeviceId or dstDeviceId or callback is null!");
159         return;
160     }
161 
162     std::string localDevId;
163     if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(localDevId)) {
164         HILOGE("get local deviceId failed!");
165         return;
166     }
167     DSchedContinueInfo info = DSchedContinueInfo(srcDeviceId, dstDeviceId, missionId);
168 
169     AAFwk::MissionInfo missionInfo;
170     if (AAFwk::AbilityManagerClient::GetInstance()->GetMissionInfo("", missionId, missionInfo) == ERR_OK
171         && srcDeviceId == localDevId) {
172         info.sourceBundleName_ = missionInfo.want.GetBundle();
173         info.sinkBundleName_ = missionInfo.want.GetBundle();
174     }
175 
176     HandleContinueMissionWithBundleName(info, callback, wantParams);
177     return;
178 }
179 
ContinueMission(const DSchedContinueInfo& continueInfo, const sptr<IRemoteObject>& callback, const OHOS::AAFwk::WantParams& wantParams)180 int32_t DSchedContinueManager::ContinueMission(const DSchedContinueInfo& continueInfo,
181     const sptr<IRemoteObject>& callback, const OHOS::AAFwk::WantParams& wantParams)
182 {
183     std::string srcDeviceId = continueInfo.sourceDeviceId_;
184     std::string dstDeviceId = continueInfo.sinkDeviceId_;
185 
186     if (srcDeviceId.empty() || dstDeviceId.empty() || callback == nullptr) {
187         HILOGE("srcDeviceId or dstDeviceId or callback is null!");
188         return INVALID_PARAMETERS_ERR;
189     }
190 
191     std::string localDevId;
192     if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(localDevId)) {
193         HILOGE("get local deviceId failed!");
194         return INVALID_PARAMETERS_ERR;
195     }
196     if (localDevId != srcDeviceId && localDevId != dstDeviceId) {
197         HILOGE("Input srcDevId: %{public}s or dstDevId: %{public}s must be locDevId: %{public}s.",
198             GetAnonymStr(srcDeviceId).c_str(), GetAnonymStr(dstDeviceId).c_str(), GetAnonymStr(localDevId).c_str());
199         return OPERATION_DEVICE_NOT_INITIATOR_OR_TARGET;
200     }
201     if (DtbschedmgrDeviceInfoStorage::GetInstance().GetDeviceInfoById(
202         localDevId == srcDeviceId ? dstDeviceId : srcDeviceId) == nullptr) {
203         HILOGE("GetDeviceInfoById fail, locDevId: %{public}s, srcDevId: %{public}s, dstDevId: %{public}s.",
204             GetAnonymStr(localDevId).c_str(), GetAnonymStr(srcDeviceId).c_str(), GetAnonymStr(dstDeviceId).c_str());
205         return INVALID_REMOTE_PARAMETERS_ERR;
206     }
207 
208 #ifdef SUPPORT_DISTRIBUTED_MISSION_MANAGER
209     if (localDevId == srcDeviceId) {
210         int32_t missionId = -1;
211         int32_t ret = DMSContinueSendMgr::GetInstance().GetMissionIdByBundleName(
212             continueInfo.sinkBundleName_, missionId);
213         if (ret != ERR_OK) {
214             HILOGE("get missionId fail, ret %{public}d.", ret);
215             return INVALID_PARAMETERS_ERR;
216         }
217     }
218 #endif
219 
220     auto func = [this, continueInfo, callback, wantParams]() {
221         HandleContinueMission(continueInfo, callback, wantParams);
222     };
223     if (eventHandler_ == nullptr) {
224         HILOGE("eventHandler_ is nullptr");
225         return INVALID_PARAMETERS_ERR;
226     }
227     eventHandler_->PostTask(func);
228     return ERR_OK;
229 }
230 
HandleContinueMission(const DSchedContinueInfo& continueInfo, const sptr<IRemoteObject>& callback, const OHOS::AAFwk::WantParams& wantParams)231 void DSchedContinueManager::HandleContinueMission(const DSchedContinueInfo& continueInfo,
232     const sptr<IRemoteObject>& callback, const OHOS::AAFwk::WantParams& wantParams)
233 {
234     std::string srcDeviceId = continueInfo.sourceDeviceId_;
235     std::string dstDeviceId = continueInfo.sinkDeviceId_;
236     std::string srcBundleName = continueInfo.sourceBundleName_;
237     std::string bundleName = continueInfo.sinkBundleName_;
238     std::string continueType = continueInfo.continueType_;
239     HILOGI("start, srcDeviceId: %{public}s. dstDeviceId: %{public}s. bundleName: %{public}s."
240         " continueType: %{public}s.", GetAnonymStr(srcDeviceId).c_str(), GetAnonymStr(dstDeviceId).c_str(),
241         bundleName.c_str(), continueType.c_str());
242 
243     if (srcDeviceId.empty() || dstDeviceId.empty() || callback == nullptr) {
244         HILOGE("srcDeviceId or dstDeviceId or callback is null!");
245         return;
246     }
247 
248     DSchedContinueInfo info = DSchedContinueInfo(srcDeviceId, srcBundleName, dstDeviceId, bundleName, continueType);
249     HandleContinueMissionWithBundleName(info, callback, wantParams);
250     return;
251 }
252 
GetFirstBundleName(DSchedContinueInfo &info, std::string &firstBundleName, std::string bundleName, std::string deviceId)253 bool DSchedContinueManager::GetFirstBundleName(DSchedContinueInfo &info, std::string &firstBundleName,
254     std::string bundleName, std::string deviceId)
255 {
256     uint16_t bundleNameId;
257     DmsBundleInfo distributedBundleInfo;
258     DmsBmStorage::GetInstance()->GetBundleNameId(bundleName, bundleNameId);
259     bool result = DmsBmStorage::GetInstance()->GetDistributedBundleInfo(deviceId,
260         bundleNameId, distributedBundleInfo);
261     if (!result) {
262         HILOGE("GetDistributedBundleInfo faild");
263         return false;
264     }
265     std::vector<DmsAbilityInfo> dmsAbilityInfos = distributedBundleInfo.dmsAbilityInfos;
266     for (DmsAbilityInfo &ability: dmsAbilityInfos) {
267         std::vector<std::string> abilityContinueTypes = ability.continueType;
268         for (std::string &ability_continue_type: abilityContinueTypes) {
269             if (ability_continue_type == info.continueType_ && !ability.continueBundleName.empty()) {
270                 firstBundleName = *ability.continueBundleName.begin();
271                 return true;
272             }
273         }
274     }
275     HILOGE("can not get abilicy info or continue bundle names is empty for continue type:%{public}s",
276            info.continueType_.c_str());
277     return false;
278 }
279 
HandleContinueMissionWithBundleName(DSchedContinueInfo &info, const sptr<IRemoteObject> &callback, const OHOS::AAFwk::WantParams &wantParams)280 void DSchedContinueManager::HandleContinueMissionWithBundleName(DSchedContinueInfo &info,
281     const sptr<IRemoteObject> &callback, const OHOS::AAFwk::WantParams &wantParams)
282 {
283     int32_t direction = CONTINUE_SINK;
284     int32_t ret = CheckContinuationLimit(info.sourceDeviceId_, info.sinkDeviceId_, direction);
285     if (ret != ERR_OK) {
286         HILOGE("CheckContinuationLimit failed, ret: %{public}d", ret);
287         return;
288     }
289     int32_t subType = CONTINUE_PUSH;
290     if (direction == CONTINUE_SOURCE) {
291         cntSource_++;
292     } else {
293         cntSink_++;
294         subType = CONTINUE_PULL;
295         if (info.sourceBundleName_.empty()) {
296             HILOGW("current sub type is continue pull; but can not get source bundle name from recv cache.");
297             std::string firstBundleNamme;
298             std::string bundleName = info.sinkBundleName_;
299             std::string deviceId = info.sinkDeviceId_;
300             if (GetFirstBundleName(info, firstBundleNamme, bundleName, deviceId)) {
301                 info.sourceBundleName_ = firstBundleNamme;
302             }
303         }
304     }
305     {
306         std::lock_guard<std::mutex> continueLock(continueMutex_);
307         if (!continues_.empty() && continues_.find(info) != continues_.end()) {
308             HILOGE("a same continue task is already in progress.");
309             return;
310         }
311         auto newContinue = std::make_shared<DSchedContinue>(subType, direction, callback, info);
312         newContinue->Init();
313         continues_.insert(std::make_pair(info, newContinue));
314 #ifdef DMSFWK_ALL_CONNECT_MGR
315         {
316             std::unique_lock<std::mutex> decisionLock(connectDecisionMutex_);
317             std::string peerDeviceId = direction == CONTINUE_SOURCE ? info.sinkDeviceId_ : info.sourceDeviceId_;
318             if (peerConnectDecision_.find(peerDeviceId) != peerConnectDecision_.end()) {
319                 peerConnectDecision_.erase(peerDeviceId);
320             }
321         }
322 #endif
323         newContinue->OnContinueMission(wantParams);
324     }
325     WaitAllConnectDecision(direction, info, CONTINUE_TIMEOUT);
326     HILOGI("end, subType: %{public}d dirction: %{public}d, continue info: %{public}s",
327         subType, direction, info.toString().c_str());
328 }
329 
WaitAllConnectDecision(int32_t direction, const DSchedContinueInfo &info, int32_t timeout)330 void DSchedContinueManager::WaitAllConnectDecision(int32_t direction, const DSchedContinueInfo &info, int32_t timeout)
331 {
332 #ifdef DMSFWK_ALL_CONNECT_MGR
333     std::string peerDeviceId = direction == CONTINUE_SOURCE ? info.sinkDeviceId_ : info.sourceDeviceId_;
334     {
335         std::unique_lock<std::mutex> decisionLock(connectDecisionMutex_);
336         connectDecisionCond_.wait_for(decisionLock, std::chrono::seconds(CONNECT_DECISION_WAIT_S),
337             [this, peerDeviceId]() {
338                 return peerConnectDecision_.find(peerDeviceId) != peerConnectDecision_.end() &&
339                     peerConnectDecision_.at(peerDeviceId).load();
340             });
341 
342         if (peerConnectDecision_.find(peerDeviceId) == peerConnectDecision_.end()) {
343             HILOGE("Not find peerDeviceId %{public}s in peerConnectDecision.", GetAnonymStr(peerDeviceId).c_str());
344             SetTimeOut(info, 0);
345             return;
346         }
347         if (!peerConnectDecision_.at(peerDeviceId).load()) {
348             HILOGE("All connect manager refuse bind to PeerDeviceId %{public}s.", GetAnonymStr(peerDeviceId).c_str());
349             peerConnectDecision_.erase(peerDeviceId);
350             SetTimeOut(info, 0);
351             return;
352         }
353         peerConnectDecision_.erase(peerDeviceId);
354     }
355 #endif
356     SetTimeOut(info, timeout);
357 }
358 
SetTimeOut(const DSchedContinueInfo &info, int32_t timeout)359 void DSchedContinueManager::SetTimeOut(const DSchedContinueInfo &info, int32_t timeout)
360 {
361     auto func = [this, info]() {
362         if (continues_.empty() || continues_.count(info) == 0) {
363             HILOGE("continue not exist.");
364             return;
365         }
366         HILOGE("continue timeout! info: %{public}s", info.toString().c_str());
367         auto dsContinue = continues_[info];
368         if (dsContinue != nullptr) {
369             dsContinue->OnContinueEnd(CONTINUE_ABILITY_TIMEOUT_ERR);
370         }
371     };
372     if (eventHandler_ == nullptr) {
373         HILOGE("eventHandler_ is nullptr");
374         return;
375     }
376     timeout > 0 ? eventHandler_->PostTask(func, info.ToStringIgnoreMissionId(), timeout) :
377         eventHandler_->PostTask(func);
378 }
379 
StartContinuation(const OHOS::AAFwk::Want& want, int32_t missionId, int32_t callerUid, int32_t status, uint32_t accessToken)380 int32_t DSchedContinueManager::StartContinuation(const OHOS::AAFwk::Want& want, int32_t missionId,
381     int32_t callerUid, int32_t status, uint32_t accessToken)
382 {
383     std::string dstDeviceId = want.GetElement().GetDeviceID();
384     if (DtbschedmgrDeviceInfoStorage::GetInstance().GetDeviceInfoById(dstDeviceId) == nullptr) {
385         HILOGE("GetDeviceInfoById fail, dstDevId: %{public}s.", GetAnonymStr(dstDeviceId).c_str());
386         return INVALID_REMOTE_PARAMETERS_ERR;
387     }
388     if (GetDSchedContinueByWant(want, missionId) == nullptr) {
389         HILOGE("GetDSchedContinueByWant fail, dstDevId: %{public}s, missionId: %{public}d.",
390             GetAnonymStr(dstDeviceId).c_str(), missionId);
391         return INVALID_REMOTE_PARAMETERS_ERR;
392     }
393 
394     auto func = [this, want, missionId, callerUid, status, accessToken]() {
395         HandleStartContinuation(want, missionId, callerUid, status, accessToken);
396     };
397     if (eventHandler_ == nullptr) {
398         HILOGE("eventHandler_ is nullptr");
399         return INVALID_PARAMETERS_ERR;
400     }
401     eventHandler_->PostTask(func);
402     return ERR_OK;
403 }
404 
HandleStartContinuation(const OHOS::AAFwk::Want& want, int32_t missionId, int32_t callerUid, int32_t status, uint32_t accessToken)405 void DSchedContinueManager::HandleStartContinuation(const OHOS::AAFwk::Want& want, int32_t missionId,
406     int32_t callerUid, int32_t status, uint32_t accessToken)
407 {
408     HILOGI("begin");
409     auto dContinue = GetDSchedContinueByWant(want, missionId);
410     if (dContinue != nullptr) {
411         dContinue->OnStartContinuation(want, callerUid, status, accessToken);
412     } else {
413         DmsRadar::GetInstance().SaveDataDmsRemoteWant("HandleStartContinuation", INVALID_PARAMETERS_ERR);
414     }
415     HILOGI("end");
416     return;
417 }
418 
GetDSchedContinueByWant( const OHOS::AAFwk::Want& want, int32_t missionId)419 std::shared_ptr<DSchedContinue> DSchedContinueManager::GetDSchedContinueByWant(
420     const OHOS::AAFwk::Want& want, int32_t missionId)
421 {
422     std::string srcDeviceId;
423     if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(srcDeviceId)) {
424         DmsRadar::GetInstance().SaveDataDmsRemoteWant("GetDSchedContinueByWant", GET_LOCAL_DEVICE_ERR);
425         HILOGE("get local deviceId failed!");
426         return nullptr;
427     }
428     std::string dstDeviceId = want.GetElement().GetDeviceID();
429     std::string bundleName = want.GetElement().GetBundleName();
430     auto info = DSchedContinueInfo(srcDeviceId, bundleName, dstDeviceId, bundleName, "");
431 
432     HILOGI("continue info: %{public}s.", info.toString().c_str());
433     {
434         std::lock_guard<std::mutex> continueLock(continueMutex_);
435         if (continues_.empty()) {
436             HILOGE("continue info doesn't match an existing continuation.");
437             return nullptr;
438         }
439         for (auto iter = continues_.begin(); iter != continues_.end(); iter++) {
440             if (iter->second == nullptr) {
441                 continue;
442             }
443             DSchedContinueInfo continueInfo = iter->second->GetContinueInfo();
444             if (srcDeviceId == continueInfo.sourceDeviceId_
445                 && bundleName == continueInfo.sourceBundleName_
446                 && dstDeviceId == continueInfo.sinkDeviceId_) {
447                 return iter->second;
448             }
449         }
450     }
451     HILOGE("missionId doesn't match the existing continuation, continueInfo: %{public}s.",
452         info.toString().c_str());
453     return nullptr;
454 }
455 
NotifyCompleteContinuation(const std::u16string& devId, int32_t sessionId, bool isSuccess, const std::string &callerBundleName)456 int32_t DSchedContinueManager::NotifyCompleteContinuation(const std::u16string& devId, int32_t sessionId,
457     bool isSuccess, const std::string &callerBundleName)
458 {
459     auto func = [this, devId, sessionId, isSuccess, callerBundleName]() {
460         HandleNotifyCompleteContinuation(devId, sessionId, isSuccess, callerBundleName);
461     };
462     if (eventHandler_ == nullptr) {
463         HILOGE("eventHandler_ is nullptr");
464         return INVALID_PARAMETERS_ERR;
465     }
466     eventHandler_->PostTask(func);
467     return ERR_OK;
468 }
469 
HandleNotifyCompleteContinuation(const std::u16string& devId, int32_t missionId, bool isSuccess, const std::string &callerBundleName)470 void DSchedContinueManager::HandleNotifyCompleteContinuation(const std::u16string& devId, int32_t missionId,
471     bool isSuccess, const std::string &callerBundleName)
472 {
473     HILOGI("begin, isSuccess %{public}d", isSuccess);
474     auto dContinue = GetDSchedContinueByDevId(devId, missionId);
475     if (dContinue != nullptr) {
476         if (dContinue->GetContinueInfo().sinkBundleName_ != callerBundleName) {
477             HILOGE("callerBundleName doesn't match the existing continuation");
478             return;
479         }
480         dContinue->OnNotifyComplete(missionId, isSuccess);
481         HILOGI("end, continue info: %{public}s.", dContinue->GetContinueInfo().toString().c_str());
482     }
483     return;
484 }
485 
GetDSchedContinueByDevId( const std::u16string& devId, int32_t missionId)486 std::shared_ptr<DSchedContinue> DSchedContinueManager::GetDSchedContinueByDevId(
487     const std::u16string& devId, int32_t missionId)
488 {
489     std::string deviceId = Str16ToStr8(devId);
490     HILOGI("begin, deviceId %{public}s, missionId %{public}d", GetAnonymStr(deviceId).c_str(), missionId);
491     {
492         std::lock_guard<std::mutex> continueLock(continueMutex_);
493         if (continues_.empty()) {
494             HILOGE("No continuation in progress.");
495             return nullptr;
496         }
497         for (auto iter = continues_.begin(); iter != continues_.end(); iter++) {
498             if (iter->second != nullptr && deviceId == iter->second->GetContinueInfo().sourceDeviceId_) {
499                 return iter->second;
500             }
501         }
502     }
503     HILOGE("source deviceId doesn't match an existing continuation.");
504     return nullptr;
505 }
506 
NotifyTerminateContinuation(const int32_t missionId)507 void DSchedContinueManager::NotifyTerminateContinuation(const int32_t missionId)
508 {
509     HILOGI("begin, missionId %{public}d", missionId);
510     {
511         std::lock_guard<std::mutex> continueLock(continueMutex_);
512         if (continues_.empty()) {
513             HILOGW("No continuation in progress.");
514             return;
515         }
516 
517         ContinueLaunchMissionInfo missionInfo;
518         int32_t ret = DMSContinueSendMgr::GetInstance().GetContinueLaunchMissionInfo(missionId, missionInfo);
519         if (ret != ERR_OK) {
520             HILOGE("get continueLaunchMissionInfo failed, missionId %{public}d", missionId);
521             return;
522         }
523         HILOGI("alive missionInfo bundleName is %{public}s, abilityName is %{public}s",
524             missionInfo.bundleName.c_str(), missionInfo.abilityName.c_str());
525         for (auto iter = continues_.begin(); iter != continues_.end(); iter++) {
526             if (iter->second == nullptr) {
527                 break;
528             }
529 
530             auto continueInfo = iter->second->GetContinueInfo();
531             HILOGI("continueInfo bundleName is %{public}s, abilityName is %{public}s",
532                 continueInfo.sinkBundleName_.c_str(), continueInfo.sinkAbilityName_.c_str());
533             if (missionInfo.bundleName == continueInfo.sinkBundleName_
534                 && missionInfo.abilityName == continueInfo.sinkAbilityName_) {
535                 HILOGE("Excute onContinueEnd");
536                 iter->second->OnContinueEnd(CONTINUE_SINK_ABILITY_TERMINATED);
537                 return;
538             }
539         }
540     }
541     HILOGW("doesn't match an existing continuation.");
542 }
543 
OnContinueEnd(const DSchedContinueInfo& info)544 int32_t DSchedContinueManager::OnContinueEnd(const DSchedContinueInfo& info)
545 {
546     auto func = [this, info]() {
547         HandleContinueEnd(info);
548     };
549     if (eventHandler_ == nullptr) {
550         HILOGE("eventHandler_ is nullptr");
551         return INVALID_PARAMETERS_ERR;
552     }
553     eventHandler_->PostTask(func);
554     return ERR_OK;
555 }
556 
HandleContinueEnd(const DSchedContinueInfo& info)557 void DSchedContinueManager::HandleContinueEnd(const DSchedContinueInfo& info)
558 {
559     HILOGI("begin, continue info: %{public}s.", info.toString().c_str());
560     std::lock_guard<std::mutex> continueLock(continueMutex_);
561     if (continues_.empty() || continues_.find(info) == continues_.end()) {
562         HILOGE("continue info doesn't match any existing continuation.");
563         return;
564     }
565     RemoveTimeout(info);
566     continues_.erase(info);
567     ContinueSceneSessionHandler::GetInstance().ClearContinueSessionId();
568 
569     std::string localDevId;
570     if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(localDevId)) {
571         HILOGE("get local deviceId failed!");
572         return;
573     }
574 
575     if (info.sinkDeviceId_ == localDevId) {
576         cntSink_--;
577     } else if (info.sourceDeviceId_ == localDevId) {
578         cntSource_--;
579     }
580     HILOGI("end.");
581 }
582 
RemoveTimeout(const DSchedContinueInfo& info)583 void DSchedContinueManager::RemoveTimeout(const DSchedContinueInfo& info)
584 {
585     if (eventHandler_ == nullptr) {
586         HILOGE("eventHandler_ is nullptr");
587         return;
588     }
589     eventHandler_->RemoveTask(info.ToStringIgnoreMissionId());
590 }
591 
OnDataRecv(int32_t sessionId, std::shared_ptr<DSchedDataBuffer> dataBuffer)592 void DSchedContinueManager::OnDataRecv(int32_t sessionId, std::shared_ptr<DSchedDataBuffer> dataBuffer)
593 {
594     auto func = [this, sessionId, dataBuffer]() {
595         HandleDataRecv(sessionId, dataBuffer);
596     };
597     if (eventHandler_ == nullptr) {
598         HILOGE("eventHandler_ is nullptr");
599         return;
600     }
601     eventHandler_->PostTask(func);
602 }
603 
HandleDataRecv(int32_t sessionId, std::shared_ptr<DSchedDataBuffer> dataBuffer)604 void DSchedContinueManager::HandleDataRecv(int32_t sessionId, std::shared_ptr<DSchedDataBuffer> dataBuffer)
605 {
606     HILOGI("start, sessionId: %{public}d.", sessionId);
607     if (dataBuffer == nullptr) {
608         HILOGE("dataBuffer is null.");
609         return;
610     }
611     uint8_t *data = dataBuffer->Data();
612     std::string jsonStr(reinterpret_cast<const char *>(data), dataBuffer->Capacity());
613     cJSON *rootValue = cJSON_Parse(jsonStr.c_str());
614     if (rootValue == nullptr) {
615         HILOGE("Parse jsonStr error.");
616         return;
617     }
618     cJSON *baseCmd = cJSON_GetObjectItemCaseSensitive(rootValue, "BaseCmd");
619     if (baseCmd == nullptr || !cJSON_IsString(baseCmd) || (baseCmd->valuestring == nullptr)) {
620         cJSON_Delete(rootValue);
621         HILOGE("Parse base cmd error.");
622         return;
623     }
624 
625     cJSON *cmdValue = cJSON_Parse(baseCmd->valuestring);
626     cJSON_Delete(rootValue);
627     if (cmdValue == nullptr) {
628         HILOGE("Parse cmd value error.");
629         return;
630     }
631 
632     cJSON *comvalue = cJSON_GetObjectItemCaseSensitive(cmdValue, "Command");
633     if (comvalue == nullptr || !cJSON_IsNumber(comvalue)) {
634         cJSON_Delete(cmdValue);
635         HILOGE("parse command failed");
636         return;
637     }
638     int32_t command = comvalue->valueint;
639     cJSON_Delete(cmdValue);
640     NotifyContinueDataRecv(sessionId, command, jsonStr, dataBuffer);
641     HILOGI("end, sessionId: %{public}d.", sessionId);
642 }
643 
NotifyContinueDataRecv(int32_t sessionId, int32_t command, const std::string& jsonStr, std::shared_ptr<DSchedDataBuffer> dataBuffer)644 void DSchedContinueManager::NotifyContinueDataRecv(int32_t sessionId, int32_t command, const std::string& jsonStr,
645     std::shared_ptr<DSchedDataBuffer> dataBuffer)
646 {
647     HILOGI("start, parsed cmd %{public}d, sessionId: %{public}d.", command, sessionId);
648     std::lock_guard<std::mutex> continueLock(continueMutex_);
649     if (!continues_.empty()) {
650         for (auto iter = continues_.begin(); iter != continues_.end(); iter++) {
651             if (iter->second != nullptr && sessionId == iter->second->GetSessionId()) {
652                 HILOGI("sessionId %{public}d exist.", sessionId);
653                 iter->second->OnDataRecv(command, dataBuffer);
654                 return;
655             }
656         }
657     }
658 
659     if (command == DSCHED_CONTINUE_CMD_START) {
660         HILOGI("recv start cmd, sessionId: %{public}d.", sessionId);
661         auto startCmd = std::make_shared<DSchedContinueStartCmd>();
662         int32_t ret = startCmd->Unmarshal(jsonStr);
663         if (ret != ERR_OK) {
664             HILOGE("Unmarshal start cmd failed, ret: %{public}d", ret);
665             return;
666         }
667         int32_t direction = CONTINUE_SINK;
668         ret = CheckContinuationLimit(startCmd->srcDeviceId_, startCmd->dstDeviceId_, direction);
669         if (ret != ERR_OK) {
670             DmsRadar::GetInstance().SaveDataDmsRemoteWant("NotifyContinueDataRecv", ret);
671             HILOGE("CheckContinuationSubType failed, ret: %{public}d", ret);
672             return;
673         }
674 
675         auto newContinue = std::make_shared<DSchedContinue>(startCmd, sessionId);
676         newContinue->Init();
677         continues_.insert(std::make_pair(newContinue->GetContinueInfo(), newContinue));
678 
679         newContinue->OnStartCmd(startCmd->appVersion_);
680         HILOGI("end, continue info: %{public}s.", newContinue->GetContinueInfo().toString().c_str());
681         return;
682     }
683     HILOGE("No matching session to handle cmd! sessionId: %{public}d, recv cmd %{public}d.", sessionId, command);
684     return;
685 }
686 
CheckContinuationLimit(const std::string& srcDeviceId, const std::string& dstDeviceId, int32_t &direction)687 int32_t DSchedContinueManager::CheckContinuationLimit(const std::string& srcDeviceId,
688     const std::string& dstDeviceId, int32_t &direction)
689 {
690     std::string localDevId;
691     if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(localDevId)) {
692         HILOGE("get local deviceId failed!");
693         return GET_LOCAL_DEVICE_ERR;
694     }
695 
696     direction = CONTINUE_SINK;
697     if (dstDeviceId == localDevId) {
698         if (cntSink_.load() >= MAX_CONCURRENT_SINK) {
699             HILOGE("can't deal more than %{public}d pull requests at the same time.", cntSink_.load());
700             return CONTINUE_ALREADY_IN_PROGRESS;
701         }
702         if (DtbschedmgrDeviceInfoStorage::GetInstance().GetDeviceInfoById(srcDeviceId) == nullptr) {
703             HILOGE("Irrecognized source device!");
704             return INVALID_PARAMETERS_ERR;
705         }
706     } else if (srcDeviceId == localDevId) {
707         if (cntSource_.load() >= MAX_CONCURRENT_SOURCE) {
708             HILOGE("can't deal more than %{public}d push requests at the same time.", cntSource_.load());
709             return CONTINUE_ALREADY_IN_PROGRESS;
710         }
711         if (DtbschedmgrDeviceInfoStorage::GetInstance().GetDeviceInfoById(dstDeviceId) == nullptr) {
712             HILOGE("Irrecognized destination device!");
713             return INVALID_PARAMETERS_ERR;
714         }
715         direction = CONTINUE_SOURCE;
716     } else {
717         HILOGE("source or target device must be local!");
718         return OPERATION_DEVICE_NOT_INITIATOR_OR_TARGET;
719     }
720     return ERR_OK;
721 }
722 
GetContinueInfo(std::string &srcDeviceId, std::string &dstDeviceId)723 int32_t DSchedContinueManager::GetContinueInfo(std::string &srcDeviceId, std::string &dstDeviceId)
724 {
725     HILOGI("called");
726     std::lock_guard<std::mutex> continueLock(continueMutex_);
727     if (continues_.empty()) {
728         HILOGW("No continuation in progress.");
729         return ERR_OK;
730     }
731     auto dsContinue = continues_.begin()->second;
732     if (dsContinue == nullptr) {
733         HILOGE("dContinue is null");
734         return INVALID_PARAMETERS_ERR;
735     }
736     dstDeviceId = dsContinue->GetContinueInfo().sinkDeviceId_;
737     srcDeviceId = dsContinue->GetContinueInfo().sourceDeviceId_;
738     return ERR_OK;
739 }
740 
OnShutdown(int32_t socket, bool isSelfCalled)741 void DSchedContinueManager::OnShutdown(int32_t socket, bool isSelfCalled)
742 {
743     if (isSelfCalled) {
744         HILOGW("called, shutdown by local, sessionId: %{public}d", socket);
745         return;
746     }
747     HILOGW("called, sessionId: %{public}d, isSelfCalled %{public}d", socket, isSelfCalled);
748     auto func = [this, socket]() {
749         std::lock_guard<std::mutex> continueLock(continueMutex_);
750         if (continues_.empty()) {
751             return;
752         }
753         for (auto iter = continues_.begin(); iter != continues_.end(); iter++) {
754             if (iter->second != nullptr && socket == iter->second->GetSessionId()) {
755                 iter->second->OnContinueEnd(CONTINUE_SESSION_SHUTDOWN);
756             }
757         }
758     };
759     if (eventHandler_ == nullptr) {
760         HILOGE("eventHandler_ is nullptr");
761         return;
762     }
763     eventHandler_->PostTask(func);
764     return;
765 }
766 
OnBind(int32_t socket, PeerSocketInfo info)767 void DSchedContinueManager::SoftbusListener::OnBind(int32_t socket, PeerSocketInfo info)
768 {
769 }
770 
OnShutdown(int32_t socket, bool isSelfCalled)771 void DSchedContinueManager::SoftbusListener::OnShutdown(int32_t socket, bool isSelfCalled)
772 {
773     DSchedContinueManager::GetInstance().OnShutdown(socket, isSelfCalled);
774 }
775 
OnDataRecv(int32_t socket, std::shared_ptr<DSchedDataBuffer> dataBuffer)776 void DSchedContinueManager::SoftbusListener::OnDataRecv(int32_t socket, std::shared_ptr<DSchedDataBuffer> dataBuffer)
777 {
778     DSchedContinueManager::GetInstance().OnDataRecv(socket, dataBuffer);
779 }
780 }  // namespace DistributedSchedule
781 }  // namespace OHOS
782