1686862fbSopenharmony_ci/*
2686862fbSopenharmony_ci * Copyright (c) 2024 Huawei Device Co., Ltd.
3686862fbSopenharmony_ci * Licensed under the Apache License, Version 2.0 (the "License");
4686862fbSopenharmony_ci * you may not use this file except in compliance with the License.
5686862fbSopenharmony_ci * You may obtain a copy of the License at
6686862fbSopenharmony_ci *
7686862fbSopenharmony_ci *     http://www.apache.org/licenses/LICENSE-2.0
8686862fbSopenharmony_ci *
9686862fbSopenharmony_ci * Unless required by applicable law or agreed to in writing, software
10686862fbSopenharmony_ci * distributed under the License is distributed on an "AS IS" BASIS,
11686862fbSopenharmony_ci * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12686862fbSopenharmony_ci * See the License for the specific language governing permissions and
13686862fbSopenharmony_ci * limitations under the License.
14686862fbSopenharmony_ci */
15686862fbSopenharmony_ci
16686862fbSopenharmony_ci#include "dsched_continue_manager.h"
17686862fbSopenharmony_ci
18686862fbSopenharmony_ci#include <chrono>
19686862fbSopenharmony_ci#include <sys/prctl.h>
20686862fbSopenharmony_ci
21686862fbSopenharmony_ci#include "cJSON.h"
22686862fbSopenharmony_ci
23686862fbSopenharmony_ci#include "continue_scene_session_handler.h"
24686862fbSopenharmony_ci#include "dfx/distributed_radar.h"
25686862fbSopenharmony_ci#include "distributed_sched_utils.h"
26686862fbSopenharmony_ci#include "dsched_transport_softbus_adapter.h"
27686862fbSopenharmony_ci#include "dtbschedmgr_device_info_storage.h"
28686862fbSopenharmony_ci#include "dtbschedmgr_log.h"
29686862fbSopenharmony_ci#include "mission/distributed_bm_storage.h"
30686862fbSopenharmony_ci#include "mission/dms_continue_send_manager.h"
31686862fbSopenharmony_ci#include "mission/dms_continue_recv_manager.h"
32686862fbSopenharmony_ci
33686862fbSopenharmony_cinamespace OHOS {
34686862fbSopenharmony_cinamespace DistributedSchedule {
35686862fbSopenharmony_cinamespace {
36686862fbSopenharmony_ciconst std::string TAG = "DSchedContinueManager";
37686862fbSopenharmony_ciconst std::string DSCHED_CONTINUE_MANAGER = "dsched_continue_manager";
38686862fbSopenharmony_ciconst std::string CONTINUE_TIMEOUT_TASK = "continue_timeout_task";
39686862fbSopenharmony_ci}
40686862fbSopenharmony_ci
41686862fbSopenharmony_ciIMPLEMENT_SINGLE_INSTANCE(DSchedContinueManager);
42686862fbSopenharmony_ci
43686862fbSopenharmony_ciDSchedContinueManager::DSchedContinueManager()
44686862fbSopenharmony_ci{
45686862fbSopenharmony_ci}
46686862fbSopenharmony_ci
47686862fbSopenharmony_ciDSchedContinueManager::~DSchedContinueManager()
48686862fbSopenharmony_ci{
49686862fbSopenharmony_ci    HILOGI("DSchedContinueManager delete");
50686862fbSopenharmony_ci    UnInit();
51686862fbSopenharmony_ci}
52686862fbSopenharmony_ci
53686862fbSopenharmony_civoid DSchedContinueManager::Init()
54686862fbSopenharmony_ci{
55686862fbSopenharmony_ci    HILOGI("Init DSchedContinueManager start");
56686862fbSopenharmony_ci    if (eventHandler_ != nullptr) {
57686862fbSopenharmony_ci        HILOGI("DSchedContinueManager already inited, end.");
58686862fbSopenharmony_ci        return;
59686862fbSopenharmony_ci    }
60686862fbSopenharmony_ci    DSchedTransportSoftbusAdapter::GetInstance().InitChannel();
61686862fbSopenharmony_ci    softbusListener_ = std::make_shared<DSchedContinueManager::SoftbusListener>();
62686862fbSopenharmony_ci    DSchedTransportSoftbusAdapter::GetInstance().RegisterListener(SERVICE_TYPE_CONTINUE, softbusListener_);
63686862fbSopenharmony_ci    eventThread_ = std::thread(&DSchedContinueManager::StartEvent, this);
64686862fbSopenharmony_ci    std::unique_lock<std::mutex> lock(eventMutex_);
65686862fbSopenharmony_ci    eventCon_.wait(lock, [this] {
66686862fbSopenharmony_ci        return eventHandler_ != nullptr;
67686862fbSopenharmony_ci    });
68686862fbSopenharmony_ci    HILOGI("Init DSchedContinueManager end");
69686862fbSopenharmony_ci}
70686862fbSopenharmony_ci
71686862fbSopenharmony_civoid DSchedContinueManager::StartEvent()
72686862fbSopenharmony_ci{
73686862fbSopenharmony_ci    HILOGI("StartEvent start");
74686862fbSopenharmony_ci    prctl(PR_SET_NAME, DSCHED_CONTINUE_MANAGER.c_str());
75686862fbSopenharmony_ci    auto runner = AppExecFwk::EventRunner::Create(false);
76686862fbSopenharmony_ci    {
77686862fbSopenharmony_ci        std::lock_guard<std::mutex> lock(eventMutex_);
78686862fbSopenharmony_ci        eventHandler_ = std::make_shared<OHOS::AppExecFwk::EventHandler>(runner);
79686862fbSopenharmony_ci    }
80686862fbSopenharmony_ci    eventCon_.notify_one();
81686862fbSopenharmony_ci    runner->Run();
82686862fbSopenharmony_ci    HILOGI("StartEvent end");
83686862fbSopenharmony_ci}
84686862fbSopenharmony_ci
85686862fbSopenharmony_civoid DSchedContinueManager::UnInit()
86686862fbSopenharmony_ci{
87686862fbSopenharmony_ci    HILOGI("UnInit start");
88686862fbSopenharmony_ci    DSchedTransportSoftbusAdapter::GetInstance().UnregisterListener(SERVICE_TYPE_CONTINUE, softbusListener_);
89686862fbSopenharmony_ci    DSchedTransportSoftbusAdapter::GetInstance().ReleaseChannel();
90686862fbSopenharmony_ci    continues_.clear();
91686862fbSopenharmony_ci    cntSink_ = 0;
92686862fbSopenharmony_ci    cntSource_ = 0;
93686862fbSopenharmony_ci
94686862fbSopenharmony_ci    if (eventHandler_ != nullptr) {
95686862fbSopenharmony_ci        eventHandler_->GetEventRunner()->Stop();
96686862fbSopenharmony_ci        eventThread_.join();
97686862fbSopenharmony_ci        eventHandler_ = nullptr;
98686862fbSopenharmony_ci    } else {
99686862fbSopenharmony_ci        HILOGE("eventHandler_ is nullptr");
100686862fbSopenharmony_ci    }
101686862fbSopenharmony_ci    HILOGI("UnInit end");
102686862fbSopenharmony_ci}
103686862fbSopenharmony_ci
104686862fbSopenharmony_civoid DSchedContinueManager::NotifyAllConnectDecision(std::string peerDeviceId, bool isSupport)
105686862fbSopenharmony_ci{
106686862fbSopenharmony_ci    HILOGI("Notify all connect decision, peerDeviceId %{public}s, isSupport %{public}d.",
107686862fbSopenharmony_ci        GetAnonymStr(peerDeviceId).c_str(), isSupport);
108686862fbSopenharmony_ci#ifdef DMSFWK_ALL_CONNECT_MGR
109686862fbSopenharmony_ci    std::lock_guard<std::mutex> decisionLock(connectDecisionMutex_);
110686862fbSopenharmony_ci    peerConnectDecision_[peerDeviceId] = isSupport;
111686862fbSopenharmony_ci    connectDecisionCond_.notify_all();
112686862fbSopenharmony_ci#endif
113686862fbSopenharmony_ci}
114686862fbSopenharmony_ci
115686862fbSopenharmony_ciint32_t DSchedContinueManager::ContinueMission(const std::string& srcDeviceId, const std::string& dstDeviceId,
116686862fbSopenharmony_ci    int32_t missionId, const sptr<IRemoteObject>& callback, const OHOS::AAFwk::WantParams& wantParams)
117686862fbSopenharmony_ci{
118686862fbSopenharmony_ci    if (srcDeviceId.empty() || dstDeviceId.empty() || callback == nullptr) {
119686862fbSopenharmony_ci        HILOGE("srcDeviceId or dstDeviceId or callback is null!");
120686862fbSopenharmony_ci        return INVALID_PARAMETERS_ERR;
121686862fbSopenharmony_ci    }
122686862fbSopenharmony_ci
123686862fbSopenharmony_ci    std::string localDevId;
124686862fbSopenharmony_ci    if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(localDevId)) {
125686862fbSopenharmony_ci        HILOGE("get local deviceId failed!");
126686862fbSopenharmony_ci        return INVALID_PARAMETERS_ERR;
127686862fbSopenharmony_ci    }
128686862fbSopenharmony_ci    if (localDevId != srcDeviceId && localDevId != dstDeviceId) {
129686862fbSopenharmony_ci        HILOGE("Input srcDevId: %{public}s or dstDevId: %{public}s must be locDevId: %{public}s.",
130686862fbSopenharmony_ci            GetAnonymStr(srcDeviceId).c_str(), GetAnonymStr(dstDeviceId).c_str(), GetAnonymStr(localDevId).c_str());
131686862fbSopenharmony_ci        return OPERATION_DEVICE_NOT_INITIATOR_OR_TARGET;
132686862fbSopenharmony_ci    }
133686862fbSopenharmony_ci    if (DtbschedmgrDeviceInfoStorage::GetInstance().GetDeviceInfoById(
134686862fbSopenharmony_ci        localDevId == srcDeviceId ? dstDeviceId : srcDeviceId) == nullptr) {
135686862fbSopenharmony_ci        HILOGE("GetDeviceInfoById fail, locDevId: %{public}s, srcDevId: %{public}s, dstDevId: %{public}s.",
136686862fbSopenharmony_ci            GetAnonymStr(localDevId).c_str(), GetAnonymStr(srcDeviceId).c_str(), GetAnonymStr(dstDeviceId).c_str());
137686862fbSopenharmony_ci        return INVALID_REMOTE_PARAMETERS_ERR;
138686862fbSopenharmony_ci    }
139686862fbSopenharmony_ci
140686862fbSopenharmony_ci    auto func = [this, srcDeviceId, dstDeviceId, missionId, callback, wantParams]() {
141686862fbSopenharmony_ci        HandleContinueMission(srcDeviceId, dstDeviceId, missionId, callback, wantParams);
142686862fbSopenharmony_ci    };
143686862fbSopenharmony_ci    if (eventHandler_ == nullptr) {
144686862fbSopenharmony_ci        HILOGE("eventHandler_ is nullptr");
145686862fbSopenharmony_ci        return INVALID_PARAMETERS_ERR;
146686862fbSopenharmony_ci    }
147686862fbSopenharmony_ci    eventHandler_->PostTask(func);
148686862fbSopenharmony_ci    return ERR_OK;
149686862fbSopenharmony_ci}
150686862fbSopenharmony_ci
151686862fbSopenharmony_civoid DSchedContinueManager::HandleContinueMission(const std::string& srcDeviceId, const std::string& dstDeviceId,
152686862fbSopenharmony_ci    int32_t missionId, const sptr<IRemoteObject>& callback, const OHOS::AAFwk::WantParams& wantParams)
153686862fbSopenharmony_ci{
154686862fbSopenharmony_ci    HILOGI("start, srcDeviceId: %{public}s. dstDeviceId: %{public}s. missionId: %{public}d.",
155686862fbSopenharmony_ci        GetAnonymStr(srcDeviceId).c_str(), GetAnonymStr(dstDeviceId).c_str(), missionId);
156686862fbSopenharmony_ci
157686862fbSopenharmony_ci    if (srcDeviceId.empty() || dstDeviceId.empty() || callback == nullptr) {
158686862fbSopenharmony_ci        HILOGE("srcDeviceId or dstDeviceId or callback is null!");
159686862fbSopenharmony_ci        return;
160686862fbSopenharmony_ci    }
161686862fbSopenharmony_ci
162686862fbSopenharmony_ci    std::string localDevId;
163686862fbSopenharmony_ci    if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(localDevId)) {
164686862fbSopenharmony_ci        HILOGE("get local deviceId failed!");
165686862fbSopenharmony_ci        return;
166686862fbSopenharmony_ci    }
167686862fbSopenharmony_ci    DSchedContinueInfo info = DSchedContinueInfo(srcDeviceId, dstDeviceId, missionId);
168686862fbSopenharmony_ci
169686862fbSopenharmony_ci    AAFwk::MissionInfo missionInfo;
170686862fbSopenharmony_ci    if (AAFwk::AbilityManagerClient::GetInstance()->GetMissionInfo("", missionId, missionInfo) == ERR_OK
171686862fbSopenharmony_ci        && srcDeviceId == localDevId) {
172686862fbSopenharmony_ci        info.sourceBundleName_ = missionInfo.want.GetBundle();
173686862fbSopenharmony_ci        info.sinkBundleName_ = missionInfo.want.GetBundle();
174686862fbSopenharmony_ci    }
175686862fbSopenharmony_ci
176686862fbSopenharmony_ci    HandleContinueMissionWithBundleName(info, callback, wantParams);
177686862fbSopenharmony_ci    return;
178686862fbSopenharmony_ci}
179686862fbSopenharmony_ci
180686862fbSopenharmony_ciint32_t DSchedContinueManager::ContinueMission(const DSchedContinueInfo& continueInfo,
181686862fbSopenharmony_ci    const sptr<IRemoteObject>& callback, const OHOS::AAFwk::WantParams& wantParams)
182686862fbSopenharmony_ci{
183686862fbSopenharmony_ci    std::string srcDeviceId = continueInfo.sourceDeviceId_;
184686862fbSopenharmony_ci    std::string dstDeviceId = continueInfo.sinkDeviceId_;
185686862fbSopenharmony_ci
186686862fbSopenharmony_ci    if (srcDeviceId.empty() || dstDeviceId.empty() || callback == nullptr) {
187686862fbSopenharmony_ci        HILOGE("srcDeviceId or dstDeviceId or callback is null!");
188686862fbSopenharmony_ci        return INVALID_PARAMETERS_ERR;
189686862fbSopenharmony_ci    }
190686862fbSopenharmony_ci
191686862fbSopenharmony_ci    std::string localDevId;
192686862fbSopenharmony_ci    if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(localDevId)) {
193686862fbSopenharmony_ci        HILOGE("get local deviceId failed!");
194686862fbSopenharmony_ci        return INVALID_PARAMETERS_ERR;
195686862fbSopenharmony_ci    }
196686862fbSopenharmony_ci    if (localDevId != srcDeviceId && localDevId != dstDeviceId) {
197686862fbSopenharmony_ci        HILOGE("Input srcDevId: %{public}s or dstDevId: %{public}s must be locDevId: %{public}s.",
198686862fbSopenharmony_ci            GetAnonymStr(srcDeviceId).c_str(), GetAnonymStr(dstDeviceId).c_str(), GetAnonymStr(localDevId).c_str());
199686862fbSopenharmony_ci        return OPERATION_DEVICE_NOT_INITIATOR_OR_TARGET;
200686862fbSopenharmony_ci    }
201686862fbSopenharmony_ci    if (DtbschedmgrDeviceInfoStorage::GetInstance().GetDeviceInfoById(
202686862fbSopenharmony_ci        localDevId == srcDeviceId ? dstDeviceId : srcDeviceId) == nullptr) {
203686862fbSopenharmony_ci        HILOGE("GetDeviceInfoById fail, locDevId: %{public}s, srcDevId: %{public}s, dstDevId: %{public}s.",
204686862fbSopenharmony_ci            GetAnonymStr(localDevId).c_str(), GetAnonymStr(srcDeviceId).c_str(), GetAnonymStr(dstDeviceId).c_str());
205686862fbSopenharmony_ci        return INVALID_REMOTE_PARAMETERS_ERR;
206686862fbSopenharmony_ci    }
207686862fbSopenharmony_ci
208686862fbSopenharmony_ci#ifdef SUPPORT_DISTRIBUTED_MISSION_MANAGER
209686862fbSopenharmony_ci    if (localDevId == srcDeviceId) {
210686862fbSopenharmony_ci        int32_t missionId = -1;
211686862fbSopenharmony_ci        int32_t ret = DMSContinueSendMgr::GetInstance().GetMissionIdByBundleName(
212686862fbSopenharmony_ci            continueInfo.sinkBundleName_, missionId);
213686862fbSopenharmony_ci        if (ret != ERR_OK) {
214686862fbSopenharmony_ci            HILOGE("get missionId fail, ret %{public}d.", ret);
215686862fbSopenharmony_ci            return INVALID_PARAMETERS_ERR;
216686862fbSopenharmony_ci        }
217686862fbSopenharmony_ci    }
218686862fbSopenharmony_ci#endif
219686862fbSopenharmony_ci
220686862fbSopenharmony_ci    auto func = [this, continueInfo, callback, wantParams]() {
221686862fbSopenharmony_ci        HandleContinueMission(continueInfo, callback, wantParams);
222686862fbSopenharmony_ci    };
223686862fbSopenharmony_ci    if (eventHandler_ == nullptr) {
224686862fbSopenharmony_ci        HILOGE("eventHandler_ is nullptr");
225686862fbSopenharmony_ci        return INVALID_PARAMETERS_ERR;
226686862fbSopenharmony_ci    }
227686862fbSopenharmony_ci    eventHandler_->PostTask(func);
228686862fbSopenharmony_ci    return ERR_OK;
229686862fbSopenharmony_ci}
230686862fbSopenharmony_ci
231686862fbSopenharmony_civoid DSchedContinueManager::HandleContinueMission(const DSchedContinueInfo& continueInfo,
232686862fbSopenharmony_ci    const sptr<IRemoteObject>& callback, const OHOS::AAFwk::WantParams& wantParams)
233686862fbSopenharmony_ci{
234686862fbSopenharmony_ci    std::string srcDeviceId = continueInfo.sourceDeviceId_;
235686862fbSopenharmony_ci    std::string dstDeviceId = continueInfo.sinkDeviceId_;
236686862fbSopenharmony_ci    std::string srcBundleName = continueInfo.sourceBundleName_;
237686862fbSopenharmony_ci    std::string bundleName = continueInfo.sinkBundleName_;
238686862fbSopenharmony_ci    std::string continueType = continueInfo.continueType_;
239686862fbSopenharmony_ci    HILOGI("start, srcDeviceId: %{public}s. dstDeviceId: %{public}s. bundleName: %{public}s."
240686862fbSopenharmony_ci        " continueType: %{public}s.", GetAnonymStr(srcDeviceId).c_str(), GetAnonymStr(dstDeviceId).c_str(),
241686862fbSopenharmony_ci        bundleName.c_str(), continueType.c_str());
242686862fbSopenharmony_ci
243686862fbSopenharmony_ci    if (srcDeviceId.empty() || dstDeviceId.empty() || callback == nullptr) {
244686862fbSopenharmony_ci        HILOGE("srcDeviceId or dstDeviceId or callback is null!");
245686862fbSopenharmony_ci        return;
246686862fbSopenharmony_ci    }
247686862fbSopenharmony_ci
248686862fbSopenharmony_ci    DSchedContinueInfo info = DSchedContinueInfo(srcDeviceId, srcBundleName, dstDeviceId, bundleName, continueType);
249686862fbSopenharmony_ci    HandleContinueMissionWithBundleName(info, callback, wantParams);
250686862fbSopenharmony_ci    return;
251686862fbSopenharmony_ci}
252686862fbSopenharmony_ci
253686862fbSopenharmony_cibool DSchedContinueManager::GetFirstBundleName(DSchedContinueInfo &info, std::string &firstBundleName,
254686862fbSopenharmony_ci    std::string bundleName, std::string deviceId)
255686862fbSopenharmony_ci{
256686862fbSopenharmony_ci    uint16_t bundleNameId;
257686862fbSopenharmony_ci    DmsBundleInfo distributedBundleInfo;
258686862fbSopenharmony_ci    DmsBmStorage::GetInstance()->GetBundleNameId(bundleName, bundleNameId);
259686862fbSopenharmony_ci    bool result = DmsBmStorage::GetInstance()->GetDistributedBundleInfo(deviceId,
260686862fbSopenharmony_ci        bundleNameId, distributedBundleInfo);
261686862fbSopenharmony_ci    if (!result) {
262686862fbSopenharmony_ci        HILOGE("GetDistributedBundleInfo faild");
263686862fbSopenharmony_ci        return false;
264686862fbSopenharmony_ci    }
265686862fbSopenharmony_ci    std::vector<DmsAbilityInfo> dmsAbilityInfos = distributedBundleInfo.dmsAbilityInfos;
266686862fbSopenharmony_ci    for (DmsAbilityInfo &ability: dmsAbilityInfos) {
267686862fbSopenharmony_ci        std::vector<std::string> abilityContinueTypes = ability.continueType;
268686862fbSopenharmony_ci        for (std::string &ability_continue_type: abilityContinueTypes) {
269686862fbSopenharmony_ci            if (ability_continue_type == info.continueType_ && !ability.continueBundleName.empty()) {
270686862fbSopenharmony_ci                firstBundleName = *ability.continueBundleName.begin();
271686862fbSopenharmony_ci                return true;
272686862fbSopenharmony_ci            }
273686862fbSopenharmony_ci        }
274686862fbSopenharmony_ci    }
275686862fbSopenharmony_ci    HILOGE("can not get abilicy info or continue bundle names is empty for continue type:%{public}s",
276686862fbSopenharmony_ci           info.continueType_.c_str());
277686862fbSopenharmony_ci    return false;
278686862fbSopenharmony_ci}
279686862fbSopenharmony_ci
280686862fbSopenharmony_civoid DSchedContinueManager::HandleContinueMissionWithBundleName(DSchedContinueInfo &info,
281686862fbSopenharmony_ci    const sptr<IRemoteObject> &callback, const OHOS::AAFwk::WantParams &wantParams)
282686862fbSopenharmony_ci{
283686862fbSopenharmony_ci    int32_t direction = CONTINUE_SINK;
284686862fbSopenharmony_ci    int32_t ret = CheckContinuationLimit(info.sourceDeviceId_, info.sinkDeviceId_, direction);
285686862fbSopenharmony_ci    if (ret != ERR_OK) {
286686862fbSopenharmony_ci        HILOGE("CheckContinuationLimit failed, ret: %{public}d", ret);
287686862fbSopenharmony_ci        return;
288686862fbSopenharmony_ci    }
289686862fbSopenharmony_ci    int32_t subType = CONTINUE_PUSH;
290686862fbSopenharmony_ci    if (direction == CONTINUE_SOURCE) {
291686862fbSopenharmony_ci        cntSource_++;
292686862fbSopenharmony_ci    } else {
293686862fbSopenharmony_ci        cntSink_++;
294686862fbSopenharmony_ci        subType = CONTINUE_PULL;
295686862fbSopenharmony_ci        if (info.sourceBundleName_.empty()) {
296686862fbSopenharmony_ci            HILOGW("current sub type is continue pull; but can not get source bundle name from recv cache.");
297686862fbSopenharmony_ci            std::string firstBundleNamme;
298686862fbSopenharmony_ci            std::string bundleName = info.sinkBundleName_;
299686862fbSopenharmony_ci            std::string deviceId = info.sinkDeviceId_;
300686862fbSopenharmony_ci            if (GetFirstBundleName(info, firstBundleNamme, bundleName, deviceId)) {
301686862fbSopenharmony_ci                info.sourceBundleName_ = firstBundleNamme;
302686862fbSopenharmony_ci            }
303686862fbSopenharmony_ci        }
304686862fbSopenharmony_ci    }
305686862fbSopenharmony_ci    {
306686862fbSopenharmony_ci        std::lock_guard<std::mutex> continueLock(continueMutex_);
307686862fbSopenharmony_ci        if (!continues_.empty() && continues_.find(info) != continues_.end()) {
308686862fbSopenharmony_ci            HILOGE("a same continue task is already in progress.");
309686862fbSopenharmony_ci            return;
310686862fbSopenharmony_ci        }
311686862fbSopenharmony_ci        auto newContinue = std::make_shared<DSchedContinue>(subType, direction, callback, info);
312686862fbSopenharmony_ci        newContinue->Init();
313686862fbSopenharmony_ci        continues_.insert(std::make_pair(info, newContinue));
314686862fbSopenharmony_ci#ifdef DMSFWK_ALL_CONNECT_MGR
315686862fbSopenharmony_ci        {
316686862fbSopenharmony_ci            std::unique_lock<std::mutex> decisionLock(connectDecisionMutex_);
317686862fbSopenharmony_ci            std::string peerDeviceId = direction == CONTINUE_SOURCE ? info.sinkDeviceId_ : info.sourceDeviceId_;
318686862fbSopenharmony_ci            if (peerConnectDecision_.find(peerDeviceId) != peerConnectDecision_.end()) {
319686862fbSopenharmony_ci                peerConnectDecision_.erase(peerDeviceId);
320686862fbSopenharmony_ci            }
321686862fbSopenharmony_ci        }
322686862fbSopenharmony_ci#endif
323686862fbSopenharmony_ci        newContinue->OnContinueMission(wantParams);
324686862fbSopenharmony_ci    }
325686862fbSopenharmony_ci    WaitAllConnectDecision(direction, info, CONTINUE_TIMEOUT);
326686862fbSopenharmony_ci    HILOGI("end, subType: %{public}d dirction: %{public}d, continue info: %{public}s",
327686862fbSopenharmony_ci        subType, direction, info.toString().c_str());
328686862fbSopenharmony_ci}
329686862fbSopenharmony_ci
330686862fbSopenharmony_civoid DSchedContinueManager::WaitAllConnectDecision(int32_t direction, const DSchedContinueInfo &info, int32_t timeout)
331686862fbSopenharmony_ci{
332686862fbSopenharmony_ci#ifdef DMSFWK_ALL_CONNECT_MGR
333686862fbSopenharmony_ci    std::string peerDeviceId = direction == CONTINUE_SOURCE ? info.sinkDeviceId_ : info.sourceDeviceId_;
334686862fbSopenharmony_ci    {
335686862fbSopenharmony_ci        std::unique_lock<std::mutex> decisionLock(connectDecisionMutex_);
336686862fbSopenharmony_ci        connectDecisionCond_.wait_for(decisionLock, std::chrono::seconds(CONNECT_DECISION_WAIT_S),
337686862fbSopenharmony_ci            [this, peerDeviceId]() {
338686862fbSopenharmony_ci                return peerConnectDecision_.find(peerDeviceId) != peerConnectDecision_.end() &&
339686862fbSopenharmony_ci                    peerConnectDecision_.at(peerDeviceId).load();
340686862fbSopenharmony_ci            });
341686862fbSopenharmony_ci
342686862fbSopenharmony_ci        if (peerConnectDecision_.find(peerDeviceId) == peerConnectDecision_.end()) {
343686862fbSopenharmony_ci            HILOGE("Not find peerDeviceId %{public}s in peerConnectDecision.", GetAnonymStr(peerDeviceId).c_str());
344686862fbSopenharmony_ci            SetTimeOut(info, 0);
345686862fbSopenharmony_ci            return;
346686862fbSopenharmony_ci        }
347686862fbSopenharmony_ci        if (!peerConnectDecision_.at(peerDeviceId).load()) {
348686862fbSopenharmony_ci            HILOGE("All connect manager refuse bind to PeerDeviceId %{public}s.", GetAnonymStr(peerDeviceId).c_str());
349686862fbSopenharmony_ci            peerConnectDecision_.erase(peerDeviceId);
350686862fbSopenharmony_ci            SetTimeOut(info, 0);
351686862fbSopenharmony_ci            return;
352686862fbSopenharmony_ci        }
353686862fbSopenharmony_ci        peerConnectDecision_.erase(peerDeviceId);
354686862fbSopenharmony_ci    }
355686862fbSopenharmony_ci#endif
356686862fbSopenharmony_ci    SetTimeOut(info, timeout);
357686862fbSopenharmony_ci}
358686862fbSopenharmony_ci
359686862fbSopenharmony_civoid DSchedContinueManager::SetTimeOut(const DSchedContinueInfo &info, int32_t timeout)
360686862fbSopenharmony_ci{
361686862fbSopenharmony_ci    auto func = [this, info]() {
362686862fbSopenharmony_ci        if (continues_.empty() || continues_.count(info) == 0) {
363686862fbSopenharmony_ci            HILOGE("continue not exist.");
364686862fbSopenharmony_ci            return;
365686862fbSopenharmony_ci        }
366686862fbSopenharmony_ci        HILOGE("continue timeout! info: %{public}s", info.toString().c_str());
367686862fbSopenharmony_ci        auto dsContinue = continues_[info];
368686862fbSopenharmony_ci        if (dsContinue != nullptr) {
369686862fbSopenharmony_ci            dsContinue->OnContinueEnd(CONTINUE_ABILITY_TIMEOUT_ERR);
370686862fbSopenharmony_ci        }
371686862fbSopenharmony_ci    };
372686862fbSopenharmony_ci    if (eventHandler_ == nullptr) {
373686862fbSopenharmony_ci        HILOGE("eventHandler_ is nullptr");
374686862fbSopenharmony_ci        return;
375686862fbSopenharmony_ci    }
376686862fbSopenharmony_ci    timeout > 0 ? eventHandler_->PostTask(func, info.ToStringIgnoreMissionId(), timeout) :
377686862fbSopenharmony_ci        eventHandler_->PostTask(func);
378686862fbSopenharmony_ci}
379686862fbSopenharmony_ci
380686862fbSopenharmony_ciint32_t DSchedContinueManager::StartContinuation(const OHOS::AAFwk::Want& want, int32_t missionId,
381686862fbSopenharmony_ci    int32_t callerUid, int32_t status, uint32_t accessToken)
382686862fbSopenharmony_ci{
383686862fbSopenharmony_ci    std::string dstDeviceId = want.GetElement().GetDeviceID();
384686862fbSopenharmony_ci    if (DtbschedmgrDeviceInfoStorage::GetInstance().GetDeviceInfoById(dstDeviceId) == nullptr) {
385686862fbSopenharmony_ci        HILOGE("GetDeviceInfoById fail, dstDevId: %{public}s.", GetAnonymStr(dstDeviceId).c_str());
386686862fbSopenharmony_ci        return INVALID_REMOTE_PARAMETERS_ERR;
387686862fbSopenharmony_ci    }
388686862fbSopenharmony_ci    if (GetDSchedContinueByWant(want, missionId) == nullptr) {
389686862fbSopenharmony_ci        HILOGE("GetDSchedContinueByWant fail, dstDevId: %{public}s, missionId: %{public}d.",
390686862fbSopenharmony_ci            GetAnonymStr(dstDeviceId).c_str(), missionId);
391686862fbSopenharmony_ci        return INVALID_REMOTE_PARAMETERS_ERR;
392686862fbSopenharmony_ci    }
393686862fbSopenharmony_ci
394686862fbSopenharmony_ci    auto func = [this, want, missionId, callerUid, status, accessToken]() {
395686862fbSopenharmony_ci        HandleStartContinuation(want, missionId, callerUid, status, accessToken);
396686862fbSopenharmony_ci    };
397686862fbSopenharmony_ci    if (eventHandler_ == nullptr) {
398686862fbSopenharmony_ci        HILOGE("eventHandler_ is nullptr");
399686862fbSopenharmony_ci        return INVALID_PARAMETERS_ERR;
400686862fbSopenharmony_ci    }
401686862fbSopenharmony_ci    eventHandler_->PostTask(func);
402686862fbSopenharmony_ci    return ERR_OK;
403686862fbSopenharmony_ci}
404686862fbSopenharmony_ci
405686862fbSopenharmony_civoid DSchedContinueManager::HandleStartContinuation(const OHOS::AAFwk::Want& want, int32_t missionId,
406686862fbSopenharmony_ci    int32_t callerUid, int32_t status, uint32_t accessToken)
407686862fbSopenharmony_ci{
408686862fbSopenharmony_ci    HILOGI("begin");
409686862fbSopenharmony_ci    auto dContinue = GetDSchedContinueByWant(want, missionId);
410686862fbSopenharmony_ci    if (dContinue != nullptr) {
411686862fbSopenharmony_ci        dContinue->OnStartContinuation(want, callerUid, status, accessToken);
412686862fbSopenharmony_ci    } else {
413686862fbSopenharmony_ci        DmsRadar::GetInstance().SaveDataDmsRemoteWant("HandleStartContinuation", INVALID_PARAMETERS_ERR);
414686862fbSopenharmony_ci    }
415686862fbSopenharmony_ci    HILOGI("end");
416686862fbSopenharmony_ci    return;
417686862fbSopenharmony_ci}
418686862fbSopenharmony_ci
419686862fbSopenharmony_cistd::shared_ptr<DSchedContinue> DSchedContinueManager::GetDSchedContinueByWant(
420686862fbSopenharmony_ci    const OHOS::AAFwk::Want& want, int32_t missionId)
421686862fbSopenharmony_ci{
422686862fbSopenharmony_ci    std::string srcDeviceId;
423686862fbSopenharmony_ci    if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(srcDeviceId)) {
424686862fbSopenharmony_ci        DmsRadar::GetInstance().SaveDataDmsRemoteWant("GetDSchedContinueByWant", GET_LOCAL_DEVICE_ERR);
425686862fbSopenharmony_ci        HILOGE("get local deviceId failed!");
426686862fbSopenharmony_ci        return nullptr;
427686862fbSopenharmony_ci    }
428686862fbSopenharmony_ci    std::string dstDeviceId = want.GetElement().GetDeviceID();
429686862fbSopenharmony_ci    std::string bundleName = want.GetElement().GetBundleName();
430686862fbSopenharmony_ci    auto info = DSchedContinueInfo(srcDeviceId, bundleName, dstDeviceId, bundleName, "");
431686862fbSopenharmony_ci
432686862fbSopenharmony_ci    HILOGI("continue info: %{public}s.", info.toString().c_str());
433686862fbSopenharmony_ci    {
434686862fbSopenharmony_ci        std::lock_guard<std::mutex> continueLock(continueMutex_);
435686862fbSopenharmony_ci        if (continues_.empty()) {
436686862fbSopenharmony_ci            HILOGE("continue info doesn't match an existing continuation.");
437686862fbSopenharmony_ci            return nullptr;
438686862fbSopenharmony_ci        }
439686862fbSopenharmony_ci        for (auto iter = continues_.begin(); iter != continues_.end(); iter++) {
440686862fbSopenharmony_ci            if (iter->second == nullptr) {
441686862fbSopenharmony_ci                continue;
442686862fbSopenharmony_ci            }
443686862fbSopenharmony_ci            DSchedContinueInfo continueInfo = iter->second->GetContinueInfo();
444686862fbSopenharmony_ci            if (srcDeviceId == continueInfo.sourceDeviceId_
445686862fbSopenharmony_ci                && bundleName == continueInfo.sourceBundleName_
446686862fbSopenharmony_ci                && dstDeviceId == continueInfo.sinkDeviceId_) {
447686862fbSopenharmony_ci                return iter->second;
448686862fbSopenharmony_ci            }
449686862fbSopenharmony_ci        }
450686862fbSopenharmony_ci    }
451686862fbSopenharmony_ci    HILOGE("missionId doesn't match the existing continuation, continueInfo: %{public}s.",
452686862fbSopenharmony_ci        info.toString().c_str());
453686862fbSopenharmony_ci    return nullptr;
454686862fbSopenharmony_ci}
455686862fbSopenharmony_ci
456686862fbSopenharmony_ciint32_t DSchedContinueManager::NotifyCompleteContinuation(const std::u16string& devId, int32_t sessionId,
457686862fbSopenharmony_ci    bool isSuccess, const std::string &callerBundleName)
458686862fbSopenharmony_ci{
459686862fbSopenharmony_ci    auto func = [this, devId, sessionId, isSuccess, callerBundleName]() {
460686862fbSopenharmony_ci        HandleNotifyCompleteContinuation(devId, sessionId, isSuccess, callerBundleName);
461686862fbSopenharmony_ci    };
462686862fbSopenharmony_ci    if (eventHandler_ == nullptr) {
463686862fbSopenharmony_ci        HILOGE("eventHandler_ is nullptr");
464686862fbSopenharmony_ci        return INVALID_PARAMETERS_ERR;
465686862fbSopenharmony_ci    }
466686862fbSopenharmony_ci    eventHandler_->PostTask(func);
467686862fbSopenharmony_ci    return ERR_OK;
468686862fbSopenharmony_ci}
469686862fbSopenharmony_ci
470686862fbSopenharmony_civoid DSchedContinueManager::HandleNotifyCompleteContinuation(const std::u16string& devId, int32_t missionId,
471686862fbSopenharmony_ci    bool isSuccess, const std::string &callerBundleName)
472686862fbSopenharmony_ci{
473686862fbSopenharmony_ci    HILOGI("begin, isSuccess %{public}d", isSuccess);
474686862fbSopenharmony_ci    auto dContinue = GetDSchedContinueByDevId(devId, missionId);
475686862fbSopenharmony_ci    if (dContinue != nullptr) {
476686862fbSopenharmony_ci        if (dContinue->GetContinueInfo().sinkBundleName_ != callerBundleName) {
477686862fbSopenharmony_ci            HILOGE("callerBundleName doesn't match the existing continuation");
478686862fbSopenharmony_ci            return;
479686862fbSopenharmony_ci        }
480686862fbSopenharmony_ci        dContinue->OnNotifyComplete(missionId, isSuccess);
481686862fbSopenharmony_ci        HILOGI("end, continue info: %{public}s.", dContinue->GetContinueInfo().toString().c_str());
482686862fbSopenharmony_ci    }
483686862fbSopenharmony_ci    return;
484686862fbSopenharmony_ci}
485686862fbSopenharmony_ci
486686862fbSopenharmony_cistd::shared_ptr<DSchedContinue> DSchedContinueManager::GetDSchedContinueByDevId(
487686862fbSopenharmony_ci    const std::u16string& devId, int32_t missionId)
488686862fbSopenharmony_ci{
489686862fbSopenharmony_ci    std::string deviceId = Str16ToStr8(devId);
490686862fbSopenharmony_ci    HILOGI("begin, deviceId %{public}s, missionId %{public}d", GetAnonymStr(deviceId).c_str(), missionId);
491686862fbSopenharmony_ci    {
492686862fbSopenharmony_ci        std::lock_guard<std::mutex> continueLock(continueMutex_);
493686862fbSopenharmony_ci        if (continues_.empty()) {
494686862fbSopenharmony_ci            HILOGE("No continuation in progress.");
495686862fbSopenharmony_ci            return nullptr;
496686862fbSopenharmony_ci        }
497686862fbSopenharmony_ci        for (auto iter = continues_.begin(); iter != continues_.end(); iter++) {
498686862fbSopenharmony_ci            if (iter->second != nullptr && deviceId == iter->second->GetContinueInfo().sourceDeviceId_) {
499686862fbSopenharmony_ci                return iter->second;
500686862fbSopenharmony_ci            }
501686862fbSopenharmony_ci        }
502686862fbSopenharmony_ci    }
503686862fbSopenharmony_ci    HILOGE("source deviceId doesn't match an existing continuation.");
504686862fbSopenharmony_ci    return nullptr;
505686862fbSopenharmony_ci}
506686862fbSopenharmony_ci
507686862fbSopenharmony_civoid DSchedContinueManager::NotifyTerminateContinuation(const int32_t missionId)
508686862fbSopenharmony_ci{
509686862fbSopenharmony_ci    HILOGI("begin, missionId %{public}d", missionId);
510686862fbSopenharmony_ci    {
511686862fbSopenharmony_ci        std::lock_guard<std::mutex> continueLock(continueMutex_);
512686862fbSopenharmony_ci        if (continues_.empty()) {
513686862fbSopenharmony_ci            HILOGW("No continuation in progress.");
514686862fbSopenharmony_ci            return;
515686862fbSopenharmony_ci        }
516686862fbSopenharmony_ci
517686862fbSopenharmony_ci        ContinueLaunchMissionInfo missionInfo;
518686862fbSopenharmony_ci        int32_t ret = DMSContinueSendMgr::GetInstance().GetContinueLaunchMissionInfo(missionId, missionInfo);
519686862fbSopenharmony_ci        if (ret != ERR_OK) {
520686862fbSopenharmony_ci            HILOGE("get continueLaunchMissionInfo failed, missionId %{public}d", missionId);
521686862fbSopenharmony_ci            return;
522686862fbSopenharmony_ci        }
523686862fbSopenharmony_ci        HILOGI("alive missionInfo bundleName is %{public}s, abilityName is %{public}s",
524686862fbSopenharmony_ci            missionInfo.bundleName.c_str(), missionInfo.abilityName.c_str());
525686862fbSopenharmony_ci        for (auto iter = continues_.begin(); iter != continues_.end(); iter++) {
526686862fbSopenharmony_ci            if (iter->second == nullptr) {
527686862fbSopenharmony_ci                break;
528686862fbSopenharmony_ci            }
529686862fbSopenharmony_ci
530686862fbSopenharmony_ci            auto continueInfo = iter->second->GetContinueInfo();
531686862fbSopenharmony_ci            HILOGI("continueInfo bundleName is %{public}s, abilityName is %{public}s",
532686862fbSopenharmony_ci                continueInfo.sinkBundleName_.c_str(), continueInfo.sinkAbilityName_.c_str());
533686862fbSopenharmony_ci            if (missionInfo.bundleName == continueInfo.sinkBundleName_
534686862fbSopenharmony_ci                && missionInfo.abilityName == continueInfo.sinkAbilityName_) {
535686862fbSopenharmony_ci                HILOGE("Excute onContinueEnd");
536686862fbSopenharmony_ci                iter->second->OnContinueEnd(CONTINUE_SINK_ABILITY_TERMINATED);
537686862fbSopenharmony_ci                return;
538686862fbSopenharmony_ci            }
539686862fbSopenharmony_ci        }
540686862fbSopenharmony_ci    }
541686862fbSopenharmony_ci    HILOGW("doesn't match an existing continuation.");
542686862fbSopenharmony_ci}
543686862fbSopenharmony_ci
544686862fbSopenharmony_ciint32_t DSchedContinueManager::OnContinueEnd(const DSchedContinueInfo& info)
545686862fbSopenharmony_ci{
546686862fbSopenharmony_ci    auto func = [this, info]() {
547686862fbSopenharmony_ci        HandleContinueEnd(info);
548686862fbSopenharmony_ci    };
549686862fbSopenharmony_ci    if (eventHandler_ == nullptr) {
550686862fbSopenharmony_ci        HILOGE("eventHandler_ is nullptr");
551686862fbSopenharmony_ci        return INVALID_PARAMETERS_ERR;
552686862fbSopenharmony_ci    }
553686862fbSopenharmony_ci    eventHandler_->PostTask(func);
554686862fbSopenharmony_ci    return ERR_OK;
555686862fbSopenharmony_ci}
556686862fbSopenharmony_ci
557686862fbSopenharmony_civoid DSchedContinueManager::HandleContinueEnd(const DSchedContinueInfo& info)
558686862fbSopenharmony_ci{
559686862fbSopenharmony_ci    HILOGI("begin, continue info: %{public}s.", info.toString().c_str());
560686862fbSopenharmony_ci    std::lock_guard<std::mutex> continueLock(continueMutex_);
561686862fbSopenharmony_ci    if (continues_.empty() || continues_.find(info) == continues_.end()) {
562686862fbSopenharmony_ci        HILOGE("continue info doesn't match any existing continuation.");
563686862fbSopenharmony_ci        return;
564686862fbSopenharmony_ci    }
565686862fbSopenharmony_ci    RemoveTimeout(info);
566686862fbSopenharmony_ci    continues_.erase(info);
567686862fbSopenharmony_ci    ContinueSceneSessionHandler::GetInstance().ClearContinueSessionId();
568686862fbSopenharmony_ci
569686862fbSopenharmony_ci    std::string localDevId;
570686862fbSopenharmony_ci    if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(localDevId)) {
571686862fbSopenharmony_ci        HILOGE("get local deviceId failed!");
572686862fbSopenharmony_ci        return;
573686862fbSopenharmony_ci    }
574686862fbSopenharmony_ci
575686862fbSopenharmony_ci    if (info.sinkDeviceId_ == localDevId) {
576686862fbSopenharmony_ci        cntSink_--;
577686862fbSopenharmony_ci    } else if (info.sourceDeviceId_ == localDevId) {
578686862fbSopenharmony_ci        cntSource_--;
579686862fbSopenharmony_ci    }
580686862fbSopenharmony_ci    HILOGI("end.");
581686862fbSopenharmony_ci}
582686862fbSopenharmony_ci
583686862fbSopenharmony_civoid DSchedContinueManager::RemoveTimeout(const DSchedContinueInfo& info)
584686862fbSopenharmony_ci{
585686862fbSopenharmony_ci    if (eventHandler_ == nullptr) {
586686862fbSopenharmony_ci        HILOGE("eventHandler_ is nullptr");
587686862fbSopenharmony_ci        return;
588686862fbSopenharmony_ci    }
589686862fbSopenharmony_ci    eventHandler_->RemoveTask(info.ToStringIgnoreMissionId());
590686862fbSopenharmony_ci}
591686862fbSopenharmony_ci
592686862fbSopenharmony_civoid DSchedContinueManager::OnDataRecv(int32_t sessionId, std::shared_ptr<DSchedDataBuffer> dataBuffer)
593686862fbSopenharmony_ci{
594686862fbSopenharmony_ci    auto func = [this, sessionId, dataBuffer]() {
595686862fbSopenharmony_ci        HandleDataRecv(sessionId, dataBuffer);
596686862fbSopenharmony_ci    };
597686862fbSopenharmony_ci    if (eventHandler_ == nullptr) {
598686862fbSopenharmony_ci        HILOGE("eventHandler_ is nullptr");
599686862fbSopenharmony_ci        return;
600686862fbSopenharmony_ci    }
601686862fbSopenharmony_ci    eventHandler_->PostTask(func);
602686862fbSopenharmony_ci}
603686862fbSopenharmony_ci
604686862fbSopenharmony_civoid DSchedContinueManager::HandleDataRecv(int32_t sessionId, std::shared_ptr<DSchedDataBuffer> dataBuffer)
605686862fbSopenharmony_ci{
606686862fbSopenharmony_ci    HILOGI("start, sessionId: %{public}d.", sessionId);
607686862fbSopenharmony_ci    if (dataBuffer == nullptr) {
608686862fbSopenharmony_ci        HILOGE("dataBuffer is null.");
609686862fbSopenharmony_ci        return;
610686862fbSopenharmony_ci    }
611686862fbSopenharmony_ci    uint8_t *data = dataBuffer->Data();
612686862fbSopenharmony_ci    std::string jsonStr(reinterpret_cast<const char *>(data), dataBuffer->Capacity());
613686862fbSopenharmony_ci    cJSON *rootValue = cJSON_Parse(jsonStr.c_str());
614686862fbSopenharmony_ci    if (rootValue == nullptr) {
615686862fbSopenharmony_ci        HILOGE("Parse jsonStr error.");
616686862fbSopenharmony_ci        return;
617686862fbSopenharmony_ci    }
618686862fbSopenharmony_ci    cJSON *baseCmd = cJSON_GetObjectItemCaseSensitive(rootValue, "BaseCmd");
619686862fbSopenharmony_ci    if (baseCmd == nullptr || !cJSON_IsString(baseCmd) || (baseCmd->valuestring == nullptr)) {
620686862fbSopenharmony_ci        cJSON_Delete(rootValue);
621686862fbSopenharmony_ci        HILOGE("Parse base cmd error.");
622686862fbSopenharmony_ci        return;
623686862fbSopenharmony_ci    }
624686862fbSopenharmony_ci
625686862fbSopenharmony_ci    cJSON *cmdValue = cJSON_Parse(baseCmd->valuestring);
626686862fbSopenharmony_ci    cJSON_Delete(rootValue);
627686862fbSopenharmony_ci    if (cmdValue == nullptr) {
628686862fbSopenharmony_ci        HILOGE("Parse cmd value error.");
629686862fbSopenharmony_ci        return;
630686862fbSopenharmony_ci    }
631686862fbSopenharmony_ci
632686862fbSopenharmony_ci    cJSON *comvalue = cJSON_GetObjectItemCaseSensitive(cmdValue, "Command");
633686862fbSopenharmony_ci    if (comvalue == nullptr || !cJSON_IsNumber(comvalue)) {
634686862fbSopenharmony_ci        cJSON_Delete(cmdValue);
635686862fbSopenharmony_ci        HILOGE("parse command failed");
636686862fbSopenharmony_ci        return;
637686862fbSopenharmony_ci    }
638686862fbSopenharmony_ci    int32_t command = comvalue->valueint;
639686862fbSopenharmony_ci    cJSON_Delete(cmdValue);
640686862fbSopenharmony_ci    NotifyContinueDataRecv(sessionId, command, jsonStr, dataBuffer);
641686862fbSopenharmony_ci    HILOGI("end, sessionId: %{public}d.", sessionId);
642686862fbSopenharmony_ci}
643686862fbSopenharmony_ci
644686862fbSopenharmony_civoid DSchedContinueManager::NotifyContinueDataRecv(int32_t sessionId, int32_t command, const std::string& jsonStr,
645686862fbSopenharmony_ci    std::shared_ptr<DSchedDataBuffer> dataBuffer)
646686862fbSopenharmony_ci{
647686862fbSopenharmony_ci    HILOGI("start, parsed cmd %{public}d, sessionId: %{public}d.", command, sessionId);
648686862fbSopenharmony_ci    std::lock_guard<std::mutex> continueLock(continueMutex_);
649686862fbSopenharmony_ci    if (!continues_.empty()) {
650686862fbSopenharmony_ci        for (auto iter = continues_.begin(); iter != continues_.end(); iter++) {
651686862fbSopenharmony_ci            if (iter->second != nullptr && sessionId == iter->second->GetSessionId()) {
652686862fbSopenharmony_ci                HILOGI("sessionId %{public}d exist.", sessionId);
653686862fbSopenharmony_ci                iter->second->OnDataRecv(command, dataBuffer);
654686862fbSopenharmony_ci                return;
655686862fbSopenharmony_ci            }
656686862fbSopenharmony_ci        }
657686862fbSopenharmony_ci    }
658686862fbSopenharmony_ci
659686862fbSopenharmony_ci    if (command == DSCHED_CONTINUE_CMD_START) {
660686862fbSopenharmony_ci        HILOGI("recv start cmd, sessionId: %{public}d.", sessionId);
661686862fbSopenharmony_ci        auto startCmd = std::make_shared<DSchedContinueStartCmd>();
662686862fbSopenharmony_ci        int32_t ret = startCmd->Unmarshal(jsonStr);
663686862fbSopenharmony_ci        if (ret != ERR_OK) {
664686862fbSopenharmony_ci            HILOGE("Unmarshal start cmd failed, ret: %{public}d", ret);
665686862fbSopenharmony_ci            return;
666686862fbSopenharmony_ci        }
667686862fbSopenharmony_ci        int32_t direction = CONTINUE_SINK;
668686862fbSopenharmony_ci        ret = CheckContinuationLimit(startCmd->srcDeviceId_, startCmd->dstDeviceId_, direction);
669686862fbSopenharmony_ci        if (ret != ERR_OK) {
670686862fbSopenharmony_ci            DmsRadar::GetInstance().SaveDataDmsRemoteWant("NotifyContinueDataRecv", ret);
671686862fbSopenharmony_ci            HILOGE("CheckContinuationSubType failed, ret: %{public}d", ret);
672686862fbSopenharmony_ci            return;
673686862fbSopenharmony_ci        }
674686862fbSopenharmony_ci
675686862fbSopenharmony_ci        auto newContinue = std::make_shared<DSchedContinue>(startCmd, sessionId);
676686862fbSopenharmony_ci        newContinue->Init();
677686862fbSopenharmony_ci        continues_.insert(std::make_pair(newContinue->GetContinueInfo(), newContinue));
678686862fbSopenharmony_ci
679686862fbSopenharmony_ci        newContinue->OnStartCmd(startCmd->appVersion_);
680686862fbSopenharmony_ci        HILOGI("end, continue info: %{public}s.", newContinue->GetContinueInfo().toString().c_str());
681686862fbSopenharmony_ci        return;
682686862fbSopenharmony_ci    }
683686862fbSopenharmony_ci    HILOGE("No matching session to handle cmd! sessionId: %{public}d, recv cmd %{public}d.", sessionId, command);
684686862fbSopenharmony_ci    return;
685686862fbSopenharmony_ci}
686686862fbSopenharmony_ci
687686862fbSopenharmony_ciint32_t DSchedContinueManager::CheckContinuationLimit(const std::string& srcDeviceId,
688686862fbSopenharmony_ci    const std::string& dstDeviceId, int32_t &direction)
689686862fbSopenharmony_ci{
690686862fbSopenharmony_ci    std::string localDevId;
691686862fbSopenharmony_ci    if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(localDevId)) {
692686862fbSopenharmony_ci        HILOGE("get local deviceId failed!");
693686862fbSopenharmony_ci        return GET_LOCAL_DEVICE_ERR;
694686862fbSopenharmony_ci    }
695686862fbSopenharmony_ci
696686862fbSopenharmony_ci    direction = CONTINUE_SINK;
697686862fbSopenharmony_ci    if (dstDeviceId == localDevId) {
698686862fbSopenharmony_ci        if (cntSink_.load() >= MAX_CONCURRENT_SINK) {
699686862fbSopenharmony_ci            HILOGE("can't deal more than %{public}d pull requests at the same time.", cntSink_.load());
700686862fbSopenharmony_ci            return CONTINUE_ALREADY_IN_PROGRESS;
701686862fbSopenharmony_ci        }
702686862fbSopenharmony_ci        if (DtbschedmgrDeviceInfoStorage::GetInstance().GetDeviceInfoById(srcDeviceId) == nullptr) {
703686862fbSopenharmony_ci            HILOGE("Irrecognized source device!");
704686862fbSopenharmony_ci            return INVALID_PARAMETERS_ERR;
705686862fbSopenharmony_ci        }
706686862fbSopenharmony_ci    } else if (srcDeviceId == localDevId) {
707686862fbSopenharmony_ci        if (cntSource_.load() >= MAX_CONCURRENT_SOURCE) {
708686862fbSopenharmony_ci            HILOGE("can't deal more than %{public}d push requests at the same time.", cntSource_.load());
709686862fbSopenharmony_ci            return CONTINUE_ALREADY_IN_PROGRESS;
710686862fbSopenharmony_ci        }
711686862fbSopenharmony_ci        if (DtbschedmgrDeviceInfoStorage::GetInstance().GetDeviceInfoById(dstDeviceId) == nullptr) {
712686862fbSopenharmony_ci            HILOGE("Irrecognized destination device!");
713686862fbSopenharmony_ci            return INVALID_PARAMETERS_ERR;
714686862fbSopenharmony_ci        }
715686862fbSopenharmony_ci        direction = CONTINUE_SOURCE;
716686862fbSopenharmony_ci    } else {
717686862fbSopenharmony_ci        HILOGE("source or target device must be local!");
718686862fbSopenharmony_ci        return OPERATION_DEVICE_NOT_INITIATOR_OR_TARGET;
719686862fbSopenharmony_ci    }
720686862fbSopenharmony_ci    return ERR_OK;
721686862fbSopenharmony_ci}
722686862fbSopenharmony_ci
723686862fbSopenharmony_ciint32_t DSchedContinueManager::GetContinueInfo(std::string &srcDeviceId, std::string &dstDeviceId)
724686862fbSopenharmony_ci{
725686862fbSopenharmony_ci    HILOGI("called");
726686862fbSopenharmony_ci    std::lock_guard<std::mutex> continueLock(continueMutex_);
727686862fbSopenharmony_ci    if (continues_.empty()) {
728686862fbSopenharmony_ci        HILOGW("No continuation in progress.");
729686862fbSopenharmony_ci        return ERR_OK;
730686862fbSopenharmony_ci    }
731686862fbSopenharmony_ci    auto dsContinue = continues_.begin()->second;
732686862fbSopenharmony_ci    if (dsContinue == nullptr) {
733686862fbSopenharmony_ci        HILOGE("dContinue is null");
734686862fbSopenharmony_ci        return INVALID_PARAMETERS_ERR;
735686862fbSopenharmony_ci    }
736686862fbSopenharmony_ci    dstDeviceId = dsContinue->GetContinueInfo().sinkDeviceId_;
737686862fbSopenharmony_ci    srcDeviceId = dsContinue->GetContinueInfo().sourceDeviceId_;
738686862fbSopenharmony_ci    return ERR_OK;
739686862fbSopenharmony_ci}
740686862fbSopenharmony_ci
741686862fbSopenharmony_civoid DSchedContinueManager::OnShutdown(int32_t socket, bool isSelfCalled)
742686862fbSopenharmony_ci{
743686862fbSopenharmony_ci    if (isSelfCalled) {
744686862fbSopenharmony_ci        HILOGW("called, shutdown by local, sessionId: %{public}d", socket);
745686862fbSopenharmony_ci        return;
746686862fbSopenharmony_ci    }
747686862fbSopenharmony_ci    HILOGW("called, sessionId: %{public}d, isSelfCalled %{public}d", socket, isSelfCalled);
748686862fbSopenharmony_ci    auto func = [this, socket]() {
749686862fbSopenharmony_ci        std::lock_guard<std::mutex> continueLock(continueMutex_);
750686862fbSopenharmony_ci        if (continues_.empty()) {
751686862fbSopenharmony_ci            return;
752686862fbSopenharmony_ci        }
753686862fbSopenharmony_ci        for (auto iter = continues_.begin(); iter != continues_.end(); iter++) {
754686862fbSopenharmony_ci            if (iter->second != nullptr && socket == iter->second->GetSessionId()) {
755686862fbSopenharmony_ci                iter->second->OnContinueEnd(CONTINUE_SESSION_SHUTDOWN);
756686862fbSopenharmony_ci            }
757686862fbSopenharmony_ci        }
758686862fbSopenharmony_ci    };
759686862fbSopenharmony_ci    if (eventHandler_ == nullptr) {
760686862fbSopenharmony_ci        HILOGE("eventHandler_ is nullptr");
761686862fbSopenharmony_ci        return;
762686862fbSopenharmony_ci    }
763686862fbSopenharmony_ci    eventHandler_->PostTask(func);
764686862fbSopenharmony_ci    return;
765686862fbSopenharmony_ci}
766686862fbSopenharmony_ci
767686862fbSopenharmony_civoid DSchedContinueManager::SoftbusListener::OnBind(int32_t socket, PeerSocketInfo info)
768686862fbSopenharmony_ci{
769686862fbSopenharmony_ci}
770686862fbSopenharmony_ci
771686862fbSopenharmony_civoid DSchedContinueManager::SoftbusListener::OnShutdown(int32_t socket, bool isSelfCalled)
772686862fbSopenharmony_ci{
773686862fbSopenharmony_ci    DSchedContinueManager::GetInstance().OnShutdown(socket, isSelfCalled);
774686862fbSopenharmony_ci}
775686862fbSopenharmony_ci
776686862fbSopenharmony_civoid DSchedContinueManager::SoftbusListener::OnDataRecv(int32_t socket, std::shared_ptr<DSchedDataBuffer> dataBuffer)
777686862fbSopenharmony_ci{
778686862fbSopenharmony_ci    DSchedContinueManager::GetInstance().OnDataRecv(socket, dataBuffer);
779686862fbSopenharmony_ci}
780686862fbSopenharmony_ci}  // namespace DistributedSchedule
781686862fbSopenharmony_ci}  // namespace OHOS
782