1686862fbSopenharmony_ci/* 2686862fbSopenharmony_ci * Copyright (c) 2024 Huawei Device Co., Ltd. 3686862fbSopenharmony_ci * Licensed under the Apache License, Version 2.0 (the "License"); 4686862fbSopenharmony_ci * you may not use this file except in compliance with the License. 5686862fbSopenharmony_ci * You may obtain a copy of the License at 6686862fbSopenharmony_ci * 7686862fbSopenharmony_ci * http://www.apache.org/licenses/LICENSE-2.0 8686862fbSopenharmony_ci * 9686862fbSopenharmony_ci * Unless required by applicable law or agreed to in writing, software 10686862fbSopenharmony_ci * distributed under the License is distributed on an "AS IS" BASIS, 11686862fbSopenharmony_ci * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12686862fbSopenharmony_ci * See the License for the specific language governing permissions and 13686862fbSopenharmony_ci * limitations under the License. 14686862fbSopenharmony_ci */ 15686862fbSopenharmony_ci 16686862fbSopenharmony_ci#include "dsched_continue_manager.h" 17686862fbSopenharmony_ci 18686862fbSopenharmony_ci#include <chrono> 19686862fbSopenharmony_ci#include <sys/prctl.h> 20686862fbSopenharmony_ci 21686862fbSopenharmony_ci#include "cJSON.h" 22686862fbSopenharmony_ci 23686862fbSopenharmony_ci#include "continue_scene_session_handler.h" 24686862fbSopenharmony_ci#include "dfx/distributed_radar.h" 25686862fbSopenharmony_ci#include "distributed_sched_utils.h" 26686862fbSopenharmony_ci#include "dsched_transport_softbus_adapter.h" 27686862fbSopenharmony_ci#include "dtbschedmgr_device_info_storage.h" 28686862fbSopenharmony_ci#include "dtbschedmgr_log.h" 29686862fbSopenharmony_ci#include "mission/distributed_bm_storage.h" 30686862fbSopenharmony_ci#include "mission/dms_continue_send_manager.h" 31686862fbSopenharmony_ci#include "mission/dms_continue_recv_manager.h" 32686862fbSopenharmony_ci 33686862fbSopenharmony_cinamespace OHOS { 34686862fbSopenharmony_cinamespace DistributedSchedule { 35686862fbSopenharmony_cinamespace { 36686862fbSopenharmony_ciconst std::string TAG = "DSchedContinueManager"; 37686862fbSopenharmony_ciconst std::string DSCHED_CONTINUE_MANAGER = "dsched_continue_manager"; 38686862fbSopenharmony_ciconst std::string CONTINUE_TIMEOUT_TASK = "continue_timeout_task"; 39686862fbSopenharmony_ci} 40686862fbSopenharmony_ci 41686862fbSopenharmony_ciIMPLEMENT_SINGLE_INSTANCE(DSchedContinueManager); 42686862fbSopenharmony_ci 43686862fbSopenharmony_ciDSchedContinueManager::DSchedContinueManager() 44686862fbSopenharmony_ci{ 45686862fbSopenharmony_ci} 46686862fbSopenharmony_ci 47686862fbSopenharmony_ciDSchedContinueManager::~DSchedContinueManager() 48686862fbSopenharmony_ci{ 49686862fbSopenharmony_ci HILOGI("DSchedContinueManager delete"); 50686862fbSopenharmony_ci UnInit(); 51686862fbSopenharmony_ci} 52686862fbSopenharmony_ci 53686862fbSopenharmony_civoid DSchedContinueManager::Init() 54686862fbSopenharmony_ci{ 55686862fbSopenharmony_ci HILOGI("Init DSchedContinueManager start"); 56686862fbSopenharmony_ci if (eventHandler_ != nullptr) { 57686862fbSopenharmony_ci HILOGI("DSchedContinueManager already inited, end."); 58686862fbSopenharmony_ci return; 59686862fbSopenharmony_ci } 60686862fbSopenharmony_ci DSchedTransportSoftbusAdapter::GetInstance().InitChannel(); 61686862fbSopenharmony_ci softbusListener_ = std::make_shared<DSchedContinueManager::SoftbusListener>(); 62686862fbSopenharmony_ci DSchedTransportSoftbusAdapter::GetInstance().RegisterListener(SERVICE_TYPE_CONTINUE, softbusListener_); 63686862fbSopenharmony_ci eventThread_ = std::thread(&DSchedContinueManager::StartEvent, this); 64686862fbSopenharmony_ci std::unique_lock<std::mutex> lock(eventMutex_); 65686862fbSopenharmony_ci eventCon_.wait(lock, [this] { 66686862fbSopenharmony_ci return eventHandler_ != nullptr; 67686862fbSopenharmony_ci }); 68686862fbSopenharmony_ci HILOGI("Init DSchedContinueManager end"); 69686862fbSopenharmony_ci} 70686862fbSopenharmony_ci 71686862fbSopenharmony_civoid DSchedContinueManager::StartEvent() 72686862fbSopenharmony_ci{ 73686862fbSopenharmony_ci HILOGI("StartEvent start"); 74686862fbSopenharmony_ci prctl(PR_SET_NAME, DSCHED_CONTINUE_MANAGER.c_str()); 75686862fbSopenharmony_ci auto runner = AppExecFwk::EventRunner::Create(false); 76686862fbSopenharmony_ci { 77686862fbSopenharmony_ci std::lock_guard<std::mutex> lock(eventMutex_); 78686862fbSopenharmony_ci eventHandler_ = std::make_shared<OHOS::AppExecFwk::EventHandler>(runner); 79686862fbSopenharmony_ci } 80686862fbSopenharmony_ci eventCon_.notify_one(); 81686862fbSopenharmony_ci runner->Run(); 82686862fbSopenharmony_ci HILOGI("StartEvent end"); 83686862fbSopenharmony_ci} 84686862fbSopenharmony_ci 85686862fbSopenharmony_civoid DSchedContinueManager::UnInit() 86686862fbSopenharmony_ci{ 87686862fbSopenharmony_ci HILOGI("UnInit start"); 88686862fbSopenharmony_ci DSchedTransportSoftbusAdapter::GetInstance().UnregisterListener(SERVICE_TYPE_CONTINUE, softbusListener_); 89686862fbSopenharmony_ci DSchedTransportSoftbusAdapter::GetInstance().ReleaseChannel(); 90686862fbSopenharmony_ci continues_.clear(); 91686862fbSopenharmony_ci cntSink_ = 0; 92686862fbSopenharmony_ci cntSource_ = 0; 93686862fbSopenharmony_ci 94686862fbSopenharmony_ci if (eventHandler_ != nullptr) { 95686862fbSopenharmony_ci eventHandler_->GetEventRunner()->Stop(); 96686862fbSopenharmony_ci eventThread_.join(); 97686862fbSopenharmony_ci eventHandler_ = nullptr; 98686862fbSopenharmony_ci } else { 99686862fbSopenharmony_ci HILOGE("eventHandler_ is nullptr"); 100686862fbSopenharmony_ci } 101686862fbSopenharmony_ci HILOGI("UnInit end"); 102686862fbSopenharmony_ci} 103686862fbSopenharmony_ci 104686862fbSopenharmony_civoid DSchedContinueManager::NotifyAllConnectDecision(std::string peerDeviceId, bool isSupport) 105686862fbSopenharmony_ci{ 106686862fbSopenharmony_ci HILOGI("Notify all connect decision, peerDeviceId %{public}s, isSupport %{public}d.", 107686862fbSopenharmony_ci GetAnonymStr(peerDeviceId).c_str(), isSupport); 108686862fbSopenharmony_ci#ifdef DMSFWK_ALL_CONNECT_MGR 109686862fbSopenharmony_ci std::lock_guard<std::mutex> decisionLock(connectDecisionMutex_); 110686862fbSopenharmony_ci peerConnectDecision_[peerDeviceId] = isSupport; 111686862fbSopenharmony_ci connectDecisionCond_.notify_all(); 112686862fbSopenharmony_ci#endif 113686862fbSopenharmony_ci} 114686862fbSopenharmony_ci 115686862fbSopenharmony_ciint32_t DSchedContinueManager::ContinueMission(const std::string& srcDeviceId, const std::string& dstDeviceId, 116686862fbSopenharmony_ci int32_t missionId, const sptr<IRemoteObject>& callback, const OHOS::AAFwk::WantParams& wantParams) 117686862fbSopenharmony_ci{ 118686862fbSopenharmony_ci if (srcDeviceId.empty() || dstDeviceId.empty() || callback == nullptr) { 119686862fbSopenharmony_ci HILOGE("srcDeviceId or dstDeviceId or callback is null!"); 120686862fbSopenharmony_ci return INVALID_PARAMETERS_ERR; 121686862fbSopenharmony_ci } 122686862fbSopenharmony_ci 123686862fbSopenharmony_ci std::string localDevId; 124686862fbSopenharmony_ci if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(localDevId)) { 125686862fbSopenharmony_ci HILOGE("get local deviceId failed!"); 126686862fbSopenharmony_ci return INVALID_PARAMETERS_ERR; 127686862fbSopenharmony_ci } 128686862fbSopenharmony_ci if (localDevId != srcDeviceId && localDevId != dstDeviceId) { 129686862fbSopenharmony_ci HILOGE("Input srcDevId: %{public}s or dstDevId: %{public}s must be locDevId: %{public}s.", 130686862fbSopenharmony_ci GetAnonymStr(srcDeviceId).c_str(), GetAnonymStr(dstDeviceId).c_str(), GetAnonymStr(localDevId).c_str()); 131686862fbSopenharmony_ci return OPERATION_DEVICE_NOT_INITIATOR_OR_TARGET; 132686862fbSopenharmony_ci } 133686862fbSopenharmony_ci if (DtbschedmgrDeviceInfoStorage::GetInstance().GetDeviceInfoById( 134686862fbSopenharmony_ci localDevId == srcDeviceId ? dstDeviceId : srcDeviceId) == nullptr) { 135686862fbSopenharmony_ci HILOGE("GetDeviceInfoById fail, locDevId: %{public}s, srcDevId: %{public}s, dstDevId: %{public}s.", 136686862fbSopenharmony_ci GetAnonymStr(localDevId).c_str(), GetAnonymStr(srcDeviceId).c_str(), GetAnonymStr(dstDeviceId).c_str()); 137686862fbSopenharmony_ci return INVALID_REMOTE_PARAMETERS_ERR; 138686862fbSopenharmony_ci } 139686862fbSopenharmony_ci 140686862fbSopenharmony_ci auto func = [this, srcDeviceId, dstDeviceId, missionId, callback, wantParams]() { 141686862fbSopenharmony_ci HandleContinueMission(srcDeviceId, dstDeviceId, missionId, callback, wantParams); 142686862fbSopenharmony_ci }; 143686862fbSopenharmony_ci if (eventHandler_ == nullptr) { 144686862fbSopenharmony_ci HILOGE("eventHandler_ is nullptr"); 145686862fbSopenharmony_ci return INVALID_PARAMETERS_ERR; 146686862fbSopenharmony_ci } 147686862fbSopenharmony_ci eventHandler_->PostTask(func); 148686862fbSopenharmony_ci return ERR_OK; 149686862fbSopenharmony_ci} 150686862fbSopenharmony_ci 151686862fbSopenharmony_civoid DSchedContinueManager::HandleContinueMission(const std::string& srcDeviceId, const std::string& dstDeviceId, 152686862fbSopenharmony_ci int32_t missionId, const sptr<IRemoteObject>& callback, const OHOS::AAFwk::WantParams& wantParams) 153686862fbSopenharmony_ci{ 154686862fbSopenharmony_ci HILOGI("start, srcDeviceId: %{public}s. dstDeviceId: %{public}s. missionId: %{public}d.", 155686862fbSopenharmony_ci GetAnonymStr(srcDeviceId).c_str(), GetAnonymStr(dstDeviceId).c_str(), missionId); 156686862fbSopenharmony_ci 157686862fbSopenharmony_ci if (srcDeviceId.empty() || dstDeviceId.empty() || callback == nullptr) { 158686862fbSopenharmony_ci HILOGE("srcDeviceId or dstDeviceId or callback is null!"); 159686862fbSopenharmony_ci return; 160686862fbSopenharmony_ci } 161686862fbSopenharmony_ci 162686862fbSopenharmony_ci std::string localDevId; 163686862fbSopenharmony_ci if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(localDevId)) { 164686862fbSopenharmony_ci HILOGE("get local deviceId failed!"); 165686862fbSopenharmony_ci return; 166686862fbSopenharmony_ci } 167686862fbSopenharmony_ci DSchedContinueInfo info = DSchedContinueInfo(srcDeviceId, dstDeviceId, missionId); 168686862fbSopenharmony_ci 169686862fbSopenharmony_ci AAFwk::MissionInfo missionInfo; 170686862fbSopenharmony_ci if (AAFwk::AbilityManagerClient::GetInstance()->GetMissionInfo("", missionId, missionInfo) == ERR_OK 171686862fbSopenharmony_ci && srcDeviceId == localDevId) { 172686862fbSopenharmony_ci info.sourceBundleName_ = missionInfo.want.GetBundle(); 173686862fbSopenharmony_ci info.sinkBundleName_ = missionInfo.want.GetBundle(); 174686862fbSopenharmony_ci } 175686862fbSopenharmony_ci 176686862fbSopenharmony_ci HandleContinueMissionWithBundleName(info, callback, wantParams); 177686862fbSopenharmony_ci return; 178686862fbSopenharmony_ci} 179686862fbSopenharmony_ci 180686862fbSopenharmony_ciint32_t DSchedContinueManager::ContinueMission(const DSchedContinueInfo& continueInfo, 181686862fbSopenharmony_ci const sptr<IRemoteObject>& callback, const OHOS::AAFwk::WantParams& wantParams) 182686862fbSopenharmony_ci{ 183686862fbSopenharmony_ci std::string srcDeviceId = continueInfo.sourceDeviceId_; 184686862fbSopenharmony_ci std::string dstDeviceId = continueInfo.sinkDeviceId_; 185686862fbSopenharmony_ci 186686862fbSopenharmony_ci if (srcDeviceId.empty() || dstDeviceId.empty() || callback == nullptr) { 187686862fbSopenharmony_ci HILOGE("srcDeviceId or dstDeviceId or callback is null!"); 188686862fbSopenharmony_ci return INVALID_PARAMETERS_ERR; 189686862fbSopenharmony_ci } 190686862fbSopenharmony_ci 191686862fbSopenharmony_ci std::string localDevId; 192686862fbSopenharmony_ci if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(localDevId)) { 193686862fbSopenharmony_ci HILOGE("get local deviceId failed!"); 194686862fbSopenharmony_ci return INVALID_PARAMETERS_ERR; 195686862fbSopenharmony_ci } 196686862fbSopenharmony_ci if (localDevId != srcDeviceId && localDevId != dstDeviceId) { 197686862fbSopenharmony_ci HILOGE("Input srcDevId: %{public}s or dstDevId: %{public}s must be locDevId: %{public}s.", 198686862fbSopenharmony_ci GetAnonymStr(srcDeviceId).c_str(), GetAnonymStr(dstDeviceId).c_str(), GetAnonymStr(localDevId).c_str()); 199686862fbSopenharmony_ci return OPERATION_DEVICE_NOT_INITIATOR_OR_TARGET; 200686862fbSopenharmony_ci } 201686862fbSopenharmony_ci if (DtbschedmgrDeviceInfoStorage::GetInstance().GetDeviceInfoById( 202686862fbSopenharmony_ci localDevId == srcDeviceId ? dstDeviceId : srcDeviceId) == nullptr) { 203686862fbSopenharmony_ci HILOGE("GetDeviceInfoById fail, locDevId: %{public}s, srcDevId: %{public}s, dstDevId: %{public}s.", 204686862fbSopenharmony_ci GetAnonymStr(localDevId).c_str(), GetAnonymStr(srcDeviceId).c_str(), GetAnonymStr(dstDeviceId).c_str()); 205686862fbSopenharmony_ci return INVALID_REMOTE_PARAMETERS_ERR; 206686862fbSopenharmony_ci } 207686862fbSopenharmony_ci 208686862fbSopenharmony_ci#ifdef SUPPORT_DISTRIBUTED_MISSION_MANAGER 209686862fbSopenharmony_ci if (localDevId == srcDeviceId) { 210686862fbSopenharmony_ci int32_t missionId = -1; 211686862fbSopenharmony_ci int32_t ret = DMSContinueSendMgr::GetInstance().GetMissionIdByBundleName( 212686862fbSopenharmony_ci continueInfo.sinkBundleName_, missionId); 213686862fbSopenharmony_ci if (ret != ERR_OK) { 214686862fbSopenharmony_ci HILOGE("get missionId fail, ret %{public}d.", ret); 215686862fbSopenharmony_ci return INVALID_PARAMETERS_ERR; 216686862fbSopenharmony_ci } 217686862fbSopenharmony_ci } 218686862fbSopenharmony_ci#endif 219686862fbSopenharmony_ci 220686862fbSopenharmony_ci auto func = [this, continueInfo, callback, wantParams]() { 221686862fbSopenharmony_ci HandleContinueMission(continueInfo, callback, wantParams); 222686862fbSopenharmony_ci }; 223686862fbSopenharmony_ci if (eventHandler_ == nullptr) { 224686862fbSopenharmony_ci HILOGE("eventHandler_ is nullptr"); 225686862fbSopenharmony_ci return INVALID_PARAMETERS_ERR; 226686862fbSopenharmony_ci } 227686862fbSopenharmony_ci eventHandler_->PostTask(func); 228686862fbSopenharmony_ci return ERR_OK; 229686862fbSopenharmony_ci} 230686862fbSopenharmony_ci 231686862fbSopenharmony_civoid DSchedContinueManager::HandleContinueMission(const DSchedContinueInfo& continueInfo, 232686862fbSopenharmony_ci const sptr<IRemoteObject>& callback, const OHOS::AAFwk::WantParams& wantParams) 233686862fbSopenharmony_ci{ 234686862fbSopenharmony_ci std::string srcDeviceId = continueInfo.sourceDeviceId_; 235686862fbSopenharmony_ci std::string dstDeviceId = continueInfo.sinkDeviceId_; 236686862fbSopenharmony_ci std::string srcBundleName = continueInfo.sourceBundleName_; 237686862fbSopenharmony_ci std::string bundleName = continueInfo.sinkBundleName_; 238686862fbSopenharmony_ci std::string continueType = continueInfo.continueType_; 239686862fbSopenharmony_ci HILOGI("start, srcDeviceId: %{public}s. dstDeviceId: %{public}s. bundleName: %{public}s." 240686862fbSopenharmony_ci " continueType: %{public}s.", GetAnonymStr(srcDeviceId).c_str(), GetAnonymStr(dstDeviceId).c_str(), 241686862fbSopenharmony_ci bundleName.c_str(), continueType.c_str()); 242686862fbSopenharmony_ci 243686862fbSopenharmony_ci if (srcDeviceId.empty() || dstDeviceId.empty() || callback == nullptr) { 244686862fbSopenharmony_ci HILOGE("srcDeviceId or dstDeviceId or callback is null!"); 245686862fbSopenharmony_ci return; 246686862fbSopenharmony_ci } 247686862fbSopenharmony_ci 248686862fbSopenharmony_ci DSchedContinueInfo info = DSchedContinueInfo(srcDeviceId, srcBundleName, dstDeviceId, bundleName, continueType); 249686862fbSopenharmony_ci HandleContinueMissionWithBundleName(info, callback, wantParams); 250686862fbSopenharmony_ci return; 251686862fbSopenharmony_ci} 252686862fbSopenharmony_ci 253686862fbSopenharmony_cibool DSchedContinueManager::GetFirstBundleName(DSchedContinueInfo &info, std::string &firstBundleName, 254686862fbSopenharmony_ci std::string bundleName, std::string deviceId) 255686862fbSopenharmony_ci{ 256686862fbSopenharmony_ci uint16_t bundleNameId; 257686862fbSopenharmony_ci DmsBundleInfo distributedBundleInfo; 258686862fbSopenharmony_ci DmsBmStorage::GetInstance()->GetBundleNameId(bundleName, bundleNameId); 259686862fbSopenharmony_ci bool result = DmsBmStorage::GetInstance()->GetDistributedBundleInfo(deviceId, 260686862fbSopenharmony_ci bundleNameId, distributedBundleInfo); 261686862fbSopenharmony_ci if (!result) { 262686862fbSopenharmony_ci HILOGE("GetDistributedBundleInfo faild"); 263686862fbSopenharmony_ci return false; 264686862fbSopenharmony_ci } 265686862fbSopenharmony_ci std::vector<DmsAbilityInfo> dmsAbilityInfos = distributedBundleInfo.dmsAbilityInfos; 266686862fbSopenharmony_ci for (DmsAbilityInfo &ability: dmsAbilityInfos) { 267686862fbSopenharmony_ci std::vector<std::string> abilityContinueTypes = ability.continueType; 268686862fbSopenharmony_ci for (std::string &ability_continue_type: abilityContinueTypes) { 269686862fbSopenharmony_ci if (ability_continue_type == info.continueType_ && !ability.continueBundleName.empty()) { 270686862fbSopenharmony_ci firstBundleName = *ability.continueBundleName.begin(); 271686862fbSopenharmony_ci return true; 272686862fbSopenharmony_ci } 273686862fbSopenharmony_ci } 274686862fbSopenharmony_ci } 275686862fbSopenharmony_ci HILOGE("can not get abilicy info or continue bundle names is empty for continue type:%{public}s", 276686862fbSopenharmony_ci info.continueType_.c_str()); 277686862fbSopenharmony_ci return false; 278686862fbSopenharmony_ci} 279686862fbSopenharmony_ci 280686862fbSopenharmony_civoid DSchedContinueManager::HandleContinueMissionWithBundleName(DSchedContinueInfo &info, 281686862fbSopenharmony_ci const sptr<IRemoteObject> &callback, const OHOS::AAFwk::WantParams &wantParams) 282686862fbSopenharmony_ci{ 283686862fbSopenharmony_ci int32_t direction = CONTINUE_SINK; 284686862fbSopenharmony_ci int32_t ret = CheckContinuationLimit(info.sourceDeviceId_, info.sinkDeviceId_, direction); 285686862fbSopenharmony_ci if (ret != ERR_OK) { 286686862fbSopenharmony_ci HILOGE("CheckContinuationLimit failed, ret: %{public}d", ret); 287686862fbSopenharmony_ci return; 288686862fbSopenharmony_ci } 289686862fbSopenharmony_ci int32_t subType = CONTINUE_PUSH; 290686862fbSopenharmony_ci if (direction == CONTINUE_SOURCE) { 291686862fbSopenharmony_ci cntSource_++; 292686862fbSopenharmony_ci } else { 293686862fbSopenharmony_ci cntSink_++; 294686862fbSopenharmony_ci subType = CONTINUE_PULL; 295686862fbSopenharmony_ci if (info.sourceBundleName_.empty()) { 296686862fbSopenharmony_ci HILOGW("current sub type is continue pull; but can not get source bundle name from recv cache."); 297686862fbSopenharmony_ci std::string firstBundleNamme; 298686862fbSopenharmony_ci std::string bundleName = info.sinkBundleName_; 299686862fbSopenharmony_ci std::string deviceId = info.sinkDeviceId_; 300686862fbSopenharmony_ci if (GetFirstBundleName(info, firstBundleNamme, bundleName, deviceId)) { 301686862fbSopenharmony_ci info.sourceBundleName_ = firstBundleNamme; 302686862fbSopenharmony_ci } 303686862fbSopenharmony_ci } 304686862fbSopenharmony_ci } 305686862fbSopenharmony_ci { 306686862fbSopenharmony_ci std::lock_guard<std::mutex> continueLock(continueMutex_); 307686862fbSopenharmony_ci if (!continues_.empty() && continues_.find(info) != continues_.end()) { 308686862fbSopenharmony_ci HILOGE("a same continue task is already in progress."); 309686862fbSopenharmony_ci return; 310686862fbSopenharmony_ci } 311686862fbSopenharmony_ci auto newContinue = std::make_shared<DSchedContinue>(subType, direction, callback, info); 312686862fbSopenharmony_ci newContinue->Init(); 313686862fbSopenharmony_ci continues_.insert(std::make_pair(info, newContinue)); 314686862fbSopenharmony_ci#ifdef DMSFWK_ALL_CONNECT_MGR 315686862fbSopenharmony_ci { 316686862fbSopenharmony_ci std::unique_lock<std::mutex> decisionLock(connectDecisionMutex_); 317686862fbSopenharmony_ci std::string peerDeviceId = direction == CONTINUE_SOURCE ? info.sinkDeviceId_ : info.sourceDeviceId_; 318686862fbSopenharmony_ci if (peerConnectDecision_.find(peerDeviceId) != peerConnectDecision_.end()) { 319686862fbSopenharmony_ci peerConnectDecision_.erase(peerDeviceId); 320686862fbSopenharmony_ci } 321686862fbSopenharmony_ci } 322686862fbSopenharmony_ci#endif 323686862fbSopenharmony_ci newContinue->OnContinueMission(wantParams); 324686862fbSopenharmony_ci } 325686862fbSopenharmony_ci WaitAllConnectDecision(direction, info, CONTINUE_TIMEOUT); 326686862fbSopenharmony_ci HILOGI("end, subType: %{public}d dirction: %{public}d, continue info: %{public}s", 327686862fbSopenharmony_ci subType, direction, info.toString().c_str()); 328686862fbSopenharmony_ci} 329686862fbSopenharmony_ci 330686862fbSopenharmony_civoid DSchedContinueManager::WaitAllConnectDecision(int32_t direction, const DSchedContinueInfo &info, int32_t timeout) 331686862fbSopenharmony_ci{ 332686862fbSopenharmony_ci#ifdef DMSFWK_ALL_CONNECT_MGR 333686862fbSopenharmony_ci std::string peerDeviceId = direction == CONTINUE_SOURCE ? info.sinkDeviceId_ : info.sourceDeviceId_; 334686862fbSopenharmony_ci { 335686862fbSopenharmony_ci std::unique_lock<std::mutex> decisionLock(connectDecisionMutex_); 336686862fbSopenharmony_ci connectDecisionCond_.wait_for(decisionLock, std::chrono::seconds(CONNECT_DECISION_WAIT_S), 337686862fbSopenharmony_ci [this, peerDeviceId]() { 338686862fbSopenharmony_ci return peerConnectDecision_.find(peerDeviceId) != peerConnectDecision_.end() && 339686862fbSopenharmony_ci peerConnectDecision_.at(peerDeviceId).load(); 340686862fbSopenharmony_ci }); 341686862fbSopenharmony_ci 342686862fbSopenharmony_ci if (peerConnectDecision_.find(peerDeviceId) == peerConnectDecision_.end()) { 343686862fbSopenharmony_ci HILOGE("Not find peerDeviceId %{public}s in peerConnectDecision.", GetAnonymStr(peerDeviceId).c_str()); 344686862fbSopenharmony_ci SetTimeOut(info, 0); 345686862fbSopenharmony_ci return; 346686862fbSopenharmony_ci } 347686862fbSopenharmony_ci if (!peerConnectDecision_.at(peerDeviceId).load()) { 348686862fbSopenharmony_ci HILOGE("All connect manager refuse bind to PeerDeviceId %{public}s.", GetAnonymStr(peerDeviceId).c_str()); 349686862fbSopenharmony_ci peerConnectDecision_.erase(peerDeviceId); 350686862fbSopenharmony_ci SetTimeOut(info, 0); 351686862fbSopenharmony_ci return; 352686862fbSopenharmony_ci } 353686862fbSopenharmony_ci peerConnectDecision_.erase(peerDeviceId); 354686862fbSopenharmony_ci } 355686862fbSopenharmony_ci#endif 356686862fbSopenharmony_ci SetTimeOut(info, timeout); 357686862fbSopenharmony_ci} 358686862fbSopenharmony_ci 359686862fbSopenharmony_civoid DSchedContinueManager::SetTimeOut(const DSchedContinueInfo &info, int32_t timeout) 360686862fbSopenharmony_ci{ 361686862fbSopenharmony_ci auto func = [this, info]() { 362686862fbSopenharmony_ci if (continues_.empty() || continues_.count(info) == 0) { 363686862fbSopenharmony_ci HILOGE("continue not exist."); 364686862fbSopenharmony_ci return; 365686862fbSopenharmony_ci } 366686862fbSopenharmony_ci HILOGE("continue timeout! info: %{public}s", info.toString().c_str()); 367686862fbSopenharmony_ci auto dsContinue = continues_[info]; 368686862fbSopenharmony_ci if (dsContinue != nullptr) { 369686862fbSopenharmony_ci dsContinue->OnContinueEnd(CONTINUE_ABILITY_TIMEOUT_ERR); 370686862fbSopenharmony_ci } 371686862fbSopenharmony_ci }; 372686862fbSopenharmony_ci if (eventHandler_ == nullptr) { 373686862fbSopenharmony_ci HILOGE("eventHandler_ is nullptr"); 374686862fbSopenharmony_ci return; 375686862fbSopenharmony_ci } 376686862fbSopenharmony_ci timeout > 0 ? eventHandler_->PostTask(func, info.ToStringIgnoreMissionId(), timeout) : 377686862fbSopenharmony_ci eventHandler_->PostTask(func); 378686862fbSopenharmony_ci} 379686862fbSopenharmony_ci 380686862fbSopenharmony_ciint32_t DSchedContinueManager::StartContinuation(const OHOS::AAFwk::Want& want, int32_t missionId, 381686862fbSopenharmony_ci int32_t callerUid, int32_t status, uint32_t accessToken) 382686862fbSopenharmony_ci{ 383686862fbSopenharmony_ci std::string dstDeviceId = want.GetElement().GetDeviceID(); 384686862fbSopenharmony_ci if (DtbschedmgrDeviceInfoStorage::GetInstance().GetDeviceInfoById(dstDeviceId) == nullptr) { 385686862fbSopenharmony_ci HILOGE("GetDeviceInfoById fail, dstDevId: %{public}s.", GetAnonymStr(dstDeviceId).c_str()); 386686862fbSopenharmony_ci return INVALID_REMOTE_PARAMETERS_ERR; 387686862fbSopenharmony_ci } 388686862fbSopenharmony_ci if (GetDSchedContinueByWant(want, missionId) == nullptr) { 389686862fbSopenharmony_ci HILOGE("GetDSchedContinueByWant fail, dstDevId: %{public}s, missionId: %{public}d.", 390686862fbSopenharmony_ci GetAnonymStr(dstDeviceId).c_str(), missionId); 391686862fbSopenharmony_ci return INVALID_REMOTE_PARAMETERS_ERR; 392686862fbSopenharmony_ci } 393686862fbSopenharmony_ci 394686862fbSopenharmony_ci auto func = [this, want, missionId, callerUid, status, accessToken]() { 395686862fbSopenharmony_ci HandleStartContinuation(want, missionId, callerUid, status, accessToken); 396686862fbSopenharmony_ci }; 397686862fbSopenharmony_ci if (eventHandler_ == nullptr) { 398686862fbSopenharmony_ci HILOGE("eventHandler_ is nullptr"); 399686862fbSopenharmony_ci return INVALID_PARAMETERS_ERR; 400686862fbSopenharmony_ci } 401686862fbSopenharmony_ci eventHandler_->PostTask(func); 402686862fbSopenharmony_ci return ERR_OK; 403686862fbSopenharmony_ci} 404686862fbSopenharmony_ci 405686862fbSopenharmony_civoid DSchedContinueManager::HandleStartContinuation(const OHOS::AAFwk::Want& want, int32_t missionId, 406686862fbSopenharmony_ci int32_t callerUid, int32_t status, uint32_t accessToken) 407686862fbSopenharmony_ci{ 408686862fbSopenharmony_ci HILOGI("begin"); 409686862fbSopenharmony_ci auto dContinue = GetDSchedContinueByWant(want, missionId); 410686862fbSopenharmony_ci if (dContinue != nullptr) { 411686862fbSopenharmony_ci dContinue->OnStartContinuation(want, callerUid, status, accessToken); 412686862fbSopenharmony_ci } else { 413686862fbSopenharmony_ci DmsRadar::GetInstance().SaveDataDmsRemoteWant("HandleStartContinuation", INVALID_PARAMETERS_ERR); 414686862fbSopenharmony_ci } 415686862fbSopenharmony_ci HILOGI("end"); 416686862fbSopenharmony_ci return; 417686862fbSopenharmony_ci} 418686862fbSopenharmony_ci 419686862fbSopenharmony_cistd::shared_ptr<DSchedContinue> DSchedContinueManager::GetDSchedContinueByWant( 420686862fbSopenharmony_ci const OHOS::AAFwk::Want& want, int32_t missionId) 421686862fbSopenharmony_ci{ 422686862fbSopenharmony_ci std::string srcDeviceId; 423686862fbSopenharmony_ci if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(srcDeviceId)) { 424686862fbSopenharmony_ci DmsRadar::GetInstance().SaveDataDmsRemoteWant("GetDSchedContinueByWant", GET_LOCAL_DEVICE_ERR); 425686862fbSopenharmony_ci HILOGE("get local deviceId failed!"); 426686862fbSopenharmony_ci return nullptr; 427686862fbSopenharmony_ci } 428686862fbSopenharmony_ci std::string dstDeviceId = want.GetElement().GetDeviceID(); 429686862fbSopenharmony_ci std::string bundleName = want.GetElement().GetBundleName(); 430686862fbSopenharmony_ci auto info = DSchedContinueInfo(srcDeviceId, bundleName, dstDeviceId, bundleName, ""); 431686862fbSopenharmony_ci 432686862fbSopenharmony_ci HILOGI("continue info: %{public}s.", info.toString().c_str()); 433686862fbSopenharmony_ci { 434686862fbSopenharmony_ci std::lock_guard<std::mutex> continueLock(continueMutex_); 435686862fbSopenharmony_ci if (continues_.empty()) { 436686862fbSopenharmony_ci HILOGE("continue info doesn't match an existing continuation."); 437686862fbSopenharmony_ci return nullptr; 438686862fbSopenharmony_ci } 439686862fbSopenharmony_ci for (auto iter = continues_.begin(); iter != continues_.end(); iter++) { 440686862fbSopenharmony_ci if (iter->second == nullptr) { 441686862fbSopenharmony_ci continue; 442686862fbSopenharmony_ci } 443686862fbSopenharmony_ci DSchedContinueInfo continueInfo = iter->second->GetContinueInfo(); 444686862fbSopenharmony_ci if (srcDeviceId == continueInfo.sourceDeviceId_ 445686862fbSopenharmony_ci && bundleName == continueInfo.sourceBundleName_ 446686862fbSopenharmony_ci && dstDeviceId == continueInfo.sinkDeviceId_) { 447686862fbSopenharmony_ci return iter->second; 448686862fbSopenharmony_ci } 449686862fbSopenharmony_ci } 450686862fbSopenharmony_ci } 451686862fbSopenharmony_ci HILOGE("missionId doesn't match the existing continuation, continueInfo: %{public}s.", 452686862fbSopenharmony_ci info.toString().c_str()); 453686862fbSopenharmony_ci return nullptr; 454686862fbSopenharmony_ci} 455686862fbSopenharmony_ci 456686862fbSopenharmony_ciint32_t DSchedContinueManager::NotifyCompleteContinuation(const std::u16string& devId, int32_t sessionId, 457686862fbSopenharmony_ci bool isSuccess, const std::string &callerBundleName) 458686862fbSopenharmony_ci{ 459686862fbSopenharmony_ci auto func = [this, devId, sessionId, isSuccess, callerBundleName]() { 460686862fbSopenharmony_ci HandleNotifyCompleteContinuation(devId, sessionId, isSuccess, callerBundleName); 461686862fbSopenharmony_ci }; 462686862fbSopenharmony_ci if (eventHandler_ == nullptr) { 463686862fbSopenharmony_ci HILOGE("eventHandler_ is nullptr"); 464686862fbSopenharmony_ci return INVALID_PARAMETERS_ERR; 465686862fbSopenharmony_ci } 466686862fbSopenharmony_ci eventHandler_->PostTask(func); 467686862fbSopenharmony_ci return ERR_OK; 468686862fbSopenharmony_ci} 469686862fbSopenharmony_ci 470686862fbSopenharmony_civoid DSchedContinueManager::HandleNotifyCompleteContinuation(const std::u16string& devId, int32_t missionId, 471686862fbSopenharmony_ci bool isSuccess, const std::string &callerBundleName) 472686862fbSopenharmony_ci{ 473686862fbSopenharmony_ci HILOGI("begin, isSuccess %{public}d", isSuccess); 474686862fbSopenharmony_ci auto dContinue = GetDSchedContinueByDevId(devId, missionId); 475686862fbSopenharmony_ci if (dContinue != nullptr) { 476686862fbSopenharmony_ci if (dContinue->GetContinueInfo().sinkBundleName_ != callerBundleName) { 477686862fbSopenharmony_ci HILOGE("callerBundleName doesn't match the existing continuation"); 478686862fbSopenharmony_ci return; 479686862fbSopenharmony_ci } 480686862fbSopenharmony_ci dContinue->OnNotifyComplete(missionId, isSuccess); 481686862fbSopenharmony_ci HILOGI("end, continue info: %{public}s.", dContinue->GetContinueInfo().toString().c_str()); 482686862fbSopenharmony_ci } 483686862fbSopenharmony_ci return; 484686862fbSopenharmony_ci} 485686862fbSopenharmony_ci 486686862fbSopenharmony_cistd::shared_ptr<DSchedContinue> DSchedContinueManager::GetDSchedContinueByDevId( 487686862fbSopenharmony_ci const std::u16string& devId, int32_t missionId) 488686862fbSopenharmony_ci{ 489686862fbSopenharmony_ci std::string deviceId = Str16ToStr8(devId); 490686862fbSopenharmony_ci HILOGI("begin, deviceId %{public}s, missionId %{public}d", GetAnonymStr(deviceId).c_str(), missionId); 491686862fbSopenharmony_ci { 492686862fbSopenharmony_ci std::lock_guard<std::mutex> continueLock(continueMutex_); 493686862fbSopenharmony_ci if (continues_.empty()) { 494686862fbSopenharmony_ci HILOGE("No continuation in progress."); 495686862fbSopenharmony_ci return nullptr; 496686862fbSopenharmony_ci } 497686862fbSopenharmony_ci for (auto iter = continues_.begin(); iter != continues_.end(); iter++) { 498686862fbSopenharmony_ci if (iter->second != nullptr && deviceId == iter->second->GetContinueInfo().sourceDeviceId_) { 499686862fbSopenharmony_ci return iter->second; 500686862fbSopenharmony_ci } 501686862fbSopenharmony_ci } 502686862fbSopenharmony_ci } 503686862fbSopenharmony_ci HILOGE("source deviceId doesn't match an existing continuation."); 504686862fbSopenharmony_ci return nullptr; 505686862fbSopenharmony_ci} 506686862fbSopenharmony_ci 507686862fbSopenharmony_civoid DSchedContinueManager::NotifyTerminateContinuation(const int32_t missionId) 508686862fbSopenharmony_ci{ 509686862fbSopenharmony_ci HILOGI("begin, missionId %{public}d", missionId); 510686862fbSopenharmony_ci { 511686862fbSopenharmony_ci std::lock_guard<std::mutex> continueLock(continueMutex_); 512686862fbSopenharmony_ci if (continues_.empty()) { 513686862fbSopenharmony_ci HILOGW("No continuation in progress."); 514686862fbSopenharmony_ci return; 515686862fbSopenharmony_ci } 516686862fbSopenharmony_ci 517686862fbSopenharmony_ci ContinueLaunchMissionInfo missionInfo; 518686862fbSopenharmony_ci int32_t ret = DMSContinueSendMgr::GetInstance().GetContinueLaunchMissionInfo(missionId, missionInfo); 519686862fbSopenharmony_ci if (ret != ERR_OK) { 520686862fbSopenharmony_ci HILOGE("get continueLaunchMissionInfo failed, missionId %{public}d", missionId); 521686862fbSopenharmony_ci return; 522686862fbSopenharmony_ci } 523686862fbSopenharmony_ci HILOGI("alive missionInfo bundleName is %{public}s, abilityName is %{public}s", 524686862fbSopenharmony_ci missionInfo.bundleName.c_str(), missionInfo.abilityName.c_str()); 525686862fbSopenharmony_ci for (auto iter = continues_.begin(); iter != continues_.end(); iter++) { 526686862fbSopenharmony_ci if (iter->second == nullptr) { 527686862fbSopenharmony_ci break; 528686862fbSopenharmony_ci } 529686862fbSopenharmony_ci 530686862fbSopenharmony_ci auto continueInfo = iter->second->GetContinueInfo(); 531686862fbSopenharmony_ci HILOGI("continueInfo bundleName is %{public}s, abilityName is %{public}s", 532686862fbSopenharmony_ci continueInfo.sinkBundleName_.c_str(), continueInfo.sinkAbilityName_.c_str()); 533686862fbSopenharmony_ci if (missionInfo.bundleName == continueInfo.sinkBundleName_ 534686862fbSopenharmony_ci && missionInfo.abilityName == continueInfo.sinkAbilityName_) { 535686862fbSopenharmony_ci HILOGE("Excute onContinueEnd"); 536686862fbSopenharmony_ci iter->second->OnContinueEnd(CONTINUE_SINK_ABILITY_TERMINATED); 537686862fbSopenharmony_ci return; 538686862fbSopenharmony_ci } 539686862fbSopenharmony_ci } 540686862fbSopenharmony_ci } 541686862fbSopenharmony_ci HILOGW("doesn't match an existing continuation."); 542686862fbSopenharmony_ci} 543686862fbSopenharmony_ci 544686862fbSopenharmony_ciint32_t DSchedContinueManager::OnContinueEnd(const DSchedContinueInfo& info) 545686862fbSopenharmony_ci{ 546686862fbSopenharmony_ci auto func = [this, info]() { 547686862fbSopenharmony_ci HandleContinueEnd(info); 548686862fbSopenharmony_ci }; 549686862fbSopenharmony_ci if (eventHandler_ == nullptr) { 550686862fbSopenharmony_ci HILOGE("eventHandler_ is nullptr"); 551686862fbSopenharmony_ci return INVALID_PARAMETERS_ERR; 552686862fbSopenharmony_ci } 553686862fbSopenharmony_ci eventHandler_->PostTask(func); 554686862fbSopenharmony_ci return ERR_OK; 555686862fbSopenharmony_ci} 556686862fbSopenharmony_ci 557686862fbSopenharmony_civoid DSchedContinueManager::HandleContinueEnd(const DSchedContinueInfo& info) 558686862fbSopenharmony_ci{ 559686862fbSopenharmony_ci HILOGI("begin, continue info: %{public}s.", info.toString().c_str()); 560686862fbSopenharmony_ci std::lock_guard<std::mutex> continueLock(continueMutex_); 561686862fbSopenharmony_ci if (continues_.empty() || continues_.find(info) == continues_.end()) { 562686862fbSopenharmony_ci HILOGE("continue info doesn't match any existing continuation."); 563686862fbSopenharmony_ci return; 564686862fbSopenharmony_ci } 565686862fbSopenharmony_ci RemoveTimeout(info); 566686862fbSopenharmony_ci continues_.erase(info); 567686862fbSopenharmony_ci ContinueSceneSessionHandler::GetInstance().ClearContinueSessionId(); 568686862fbSopenharmony_ci 569686862fbSopenharmony_ci std::string localDevId; 570686862fbSopenharmony_ci if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(localDevId)) { 571686862fbSopenharmony_ci HILOGE("get local deviceId failed!"); 572686862fbSopenharmony_ci return; 573686862fbSopenharmony_ci } 574686862fbSopenharmony_ci 575686862fbSopenharmony_ci if (info.sinkDeviceId_ == localDevId) { 576686862fbSopenharmony_ci cntSink_--; 577686862fbSopenharmony_ci } else if (info.sourceDeviceId_ == localDevId) { 578686862fbSopenharmony_ci cntSource_--; 579686862fbSopenharmony_ci } 580686862fbSopenharmony_ci HILOGI("end."); 581686862fbSopenharmony_ci} 582686862fbSopenharmony_ci 583686862fbSopenharmony_civoid DSchedContinueManager::RemoveTimeout(const DSchedContinueInfo& info) 584686862fbSopenharmony_ci{ 585686862fbSopenharmony_ci if (eventHandler_ == nullptr) { 586686862fbSopenharmony_ci HILOGE("eventHandler_ is nullptr"); 587686862fbSopenharmony_ci return; 588686862fbSopenharmony_ci } 589686862fbSopenharmony_ci eventHandler_->RemoveTask(info.ToStringIgnoreMissionId()); 590686862fbSopenharmony_ci} 591686862fbSopenharmony_ci 592686862fbSopenharmony_civoid DSchedContinueManager::OnDataRecv(int32_t sessionId, std::shared_ptr<DSchedDataBuffer> dataBuffer) 593686862fbSopenharmony_ci{ 594686862fbSopenharmony_ci auto func = [this, sessionId, dataBuffer]() { 595686862fbSopenharmony_ci HandleDataRecv(sessionId, dataBuffer); 596686862fbSopenharmony_ci }; 597686862fbSopenharmony_ci if (eventHandler_ == nullptr) { 598686862fbSopenharmony_ci HILOGE("eventHandler_ is nullptr"); 599686862fbSopenharmony_ci return; 600686862fbSopenharmony_ci } 601686862fbSopenharmony_ci eventHandler_->PostTask(func); 602686862fbSopenharmony_ci} 603686862fbSopenharmony_ci 604686862fbSopenharmony_civoid DSchedContinueManager::HandleDataRecv(int32_t sessionId, std::shared_ptr<DSchedDataBuffer> dataBuffer) 605686862fbSopenharmony_ci{ 606686862fbSopenharmony_ci HILOGI("start, sessionId: %{public}d.", sessionId); 607686862fbSopenharmony_ci if (dataBuffer == nullptr) { 608686862fbSopenharmony_ci HILOGE("dataBuffer is null."); 609686862fbSopenharmony_ci return; 610686862fbSopenharmony_ci } 611686862fbSopenharmony_ci uint8_t *data = dataBuffer->Data(); 612686862fbSopenharmony_ci std::string jsonStr(reinterpret_cast<const char *>(data), dataBuffer->Capacity()); 613686862fbSopenharmony_ci cJSON *rootValue = cJSON_Parse(jsonStr.c_str()); 614686862fbSopenharmony_ci if (rootValue == nullptr) { 615686862fbSopenharmony_ci HILOGE("Parse jsonStr error."); 616686862fbSopenharmony_ci return; 617686862fbSopenharmony_ci } 618686862fbSopenharmony_ci cJSON *baseCmd = cJSON_GetObjectItemCaseSensitive(rootValue, "BaseCmd"); 619686862fbSopenharmony_ci if (baseCmd == nullptr || !cJSON_IsString(baseCmd) || (baseCmd->valuestring == nullptr)) { 620686862fbSopenharmony_ci cJSON_Delete(rootValue); 621686862fbSopenharmony_ci HILOGE("Parse base cmd error."); 622686862fbSopenharmony_ci return; 623686862fbSopenharmony_ci } 624686862fbSopenharmony_ci 625686862fbSopenharmony_ci cJSON *cmdValue = cJSON_Parse(baseCmd->valuestring); 626686862fbSopenharmony_ci cJSON_Delete(rootValue); 627686862fbSopenharmony_ci if (cmdValue == nullptr) { 628686862fbSopenharmony_ci HILOGE("Parse cmd value error."); 629686862fbSopenharmony_ci return; 630686862fbSopenharmony_ci } 631686862fbSopenharmony_ci 632686862fbSopenharmony_ci cJSON *comvalue = cJSON_GetObjectItemCaseSensitive(cmdValue, "Command"); 633686862fbSopenharmony_ci if (comvalue == nullptr || !cJSON_IsNumber(comvalue)) { 634686862fbSopenharmony_ci cJSON_Delete(cmdValue); 635686862fbSopenharmony_ci HILOGE("parse command failed"); 636686862fbSopenharmony_ci return; 637686862fbSopenharmony_ci } 638686862fbSopenharmony_ci int32_t command = comvalue->valueint; 639686862fbSopenharmony_ci cJSON_Delete(cmdValue); 640686862fbSopenharmony_ci NotifyContinueDataRecv(sessionId, command, jsonStr, dataBuffer); 641686862fbSopenharmony_ci HILOGI("end, sessionId: %{public}d.", sessionId); 642686862fbSopenharmony_ci} 643686862fbSopenharmony_ci 644686862fbSopenharmony_civoid DSchedContinueManager::NotifyContinueDataRecv(int32_t sessionId, int32_t command, const std::string& jsonStr, 645686862fbSopenharmony_ci std::shared_ptr<DSchedDataBuffer> dataBuffer) 646686862fbSopenharmony_ci{ 647686862fbSopenharmony_ci HILOGI("start, parsed cmd %{public}d, sessionId: %{public}d.", command, sessionId); 648686862fbSopenharmony_ci std::lock_guard<std::mutex> continueLock(continueMutex_); 649686862fbSopenharmony_ci if (!continues_.empty()) { 650686862fbSopenharmony_ci for (auto iter = continues_.begin(); iter != continues_.end(); iter++) { 651686862fbSopenharmony_ci if (iter->second != nullptr && sessionId == iter->second->GetSessionId()) { 652686862fbSopenharmony_ci HILOGI("sessionId %{public}d exist.", sessionId); 653686862fbSopenharmony_ci iter->second->OnDataRecv(command, dataBuffer); 654686862fbSopenharmony_ci return; 655686862fbSopenharmony_ci } 656686862fbSopenharmony_ci } 657686862fbSopenharmony_ci } 658686862fbSopenharmony_ci 659686862fbSopenharmony_ci if (command == DSCHED_CONTINUE_CMD_START) { 660686862fbSopenharmony_ci HILOGI("recv start cmd, sessionId: %{public}d.", sessionId); 661686862fbSopenharmony_ci auto startCmd = std::make_shared<DSchedContinueStartCmd>(); 662686862fbSopenharmony_ci int32_t ret = startCmd->Unmarshal(jsonStr); 663686862fbSopenharmony_ci if (ret != ERR_OK) { 664686862fbSopenharmony_ci HILOGE("Unmarshal start cmd failed, ret: %{public}d", ret); 665686862fbSopenharmony_ci return; 666686862fbSopenharmony_ci } 667686862fbSopenharmony_ci int32_t direction = CONTINUE_SINK; 668686862fbSopenharmony_ci ret = CheckContinuationLimit(startCmd->srcDeviceId_, startCmd->dstDeviceId_, direction); 669686862fbSopenharmony_ci if (ret != ERR_OK) { 670686862fbSopenharmony_ci DmsRadar::GetInstance().SaveDataDmsRemoteWant("NotifyContinueDataRecv", ret); 671686862fbSopenharmony_ci HILOGE("CheckContinuationSubType failed, ret: %{public}d", ret); 672686862fbSopenharmony_ci return; 673686862fbSopenharmony_ci } 674686862fbSopenharmony_ci 675686862fbSopenharmony_ci auto newContinue = std::make_shared<DSchedContinue>(startCmd, sessionId); 676686862fbSopenharmony_ci newContinue->Init(); 677686862fbSopenharmony_ci continues_.insert(std::make_pair(newContinue->GetContinueInfo(), newContinue)); 678686862fbSopenharmony_ci 679686862fbSopenharmony_ci newContinue->OnStartCmd(startCmd->appVersion_); 680686862fbSopenharmony_ci HILOGI("end, continue info: %{public}s.", newContinue->GetContinueInfo().toString().c_str()); 681686862fbSopenharmony_ci return; 682686862fbSopenharmony_ci } 683686862fbSopenharmony_ci HILOGE("No matching session to handle cmd! sessionId: %{public}d, recv cmd %{public}d.", sessionId, command); 684686862fbSopenharmony_ci return; 685686862fbSopenharmony_ci} 686686862fbSopenharmony_ci 687686862fbSopenharmony_ciint32_t DSchedContinueManager::CheckContinuationLimit(const std::string& srcDeviceId, 688686862fbSopenharmony_ci const std::string& dstDeviceId, int32_t &direction) 689686862fbSopenharmony_ci{ 690686862fbSopenharmony_ci std::string localDevId; 691686862fbSopenharmony_ci if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(localDevId)) { 692686862fbSopenharmony_ci HILOGE("get local deviceId failed!"); 693686862fbSopenharmony_ci return GET_LOCAL_DEVICE_ERR; 694686862fbSopenharmony_ci } 695686862fbSopenharmony_ci 696686862fbSopenharmony_ci direction = CONTINUE_SINK; 697686862fbSopenharmony_ci if (dstDeviceId == localDevId) { 698686862fbSopenharmony_ci if (cntSink_.load() >= MAX_CONCURRENT_SINK) { 699686862fbSopenharmony_ci HILOGE("can't deal more than %{public}d pull requests at the same time.", cntSink_.load()); 700686862fbSopenharmony_ci return CONTINUE_ALREADY_IN_PROGRESS; 701686862fbSopenharmony_ci } 702686862fbSopenharmony_ci if (DtbschedmgrDeviceInfoStorage::GetInstance().GetDeviceInfoById(srcDeviceId) == nullptr) { 703686862fbSopenharmony_ci HILOGE("Irrecognized source device!"); 704686862fbSopenharmony_ci return INVALID_PARAMETERS_ERR; 705686862fbSopenharmony_ci } 706686862fbSopenharmony_ci } else if (srcDeviceId == localDevId) { 707686862fbSopenharmony_ci if (cntSource_.load() >= MAX_CONCURRENT_SOURCE) { 708686862fbSopenharmony_ci HILOGE("can't deal more than %{public}d push requests at the same time.", cntSource_.load()); 709686862fbSopenharmony_ci return CONTINUE_ALREADY_IN_PROGRESS; 710686862fbSopenharmony_ci } 711686862fbSopenharmony_ci if (DtbschedmgrDeviceInfoStorage::GetInstance().GetDeviceInfoById(dstDeviceId) == nullptr) { 712686862fbSopenharmony_ci HILOGE("Irrecognized destination device!"); 713686862fbSopenharmony_ci return INVALID_PARAMETERS_ERR; 714686862fbSopenharmony_ci } 715686862fbSopenharmony_ci direction = CONTINUE_SOURCE; 716686862fbSopenharmony_ci } else { 717686862fbSopenharmony_ci HILOGE("source or target device must be local!"); 718686862fbSopenharmony_ci return OPERATION_DEVICE_NOT_INITIATOR_OR_TARGET; 719686862fbSopenharmony_ci } 720686862fbSopenharmony_ci return ERR_OK; 721686862fbSopenharmony_ci} 722686862fbSopenharmony_ci 723686862fbSopenharmony_ciint32_t DSchedContinueManager::GetContinueInfo(std::string &srcDeviceId, std::string &dstDeviceId) 724686862fbSopenharmony_ci{ 725686862fbSopenharmony_ci HILOGI("called"); 726686862fbSopenharmony_ci std::lock_guard<std::mutex> continueLock(continueMutex_); 727686862fbSopenharmony_ci if (continues_.empty()) { 728686862fbSopenharmony_ci HILOGW("No continuation in progress."); 729686862fbSopenharmony_ci return ERR_OK; 730686862fbSopenharmony_ci } 731686862fbSopenharmony_ci auto dsContinue = continues_.begin()->second; 732686862fbSopenharmony_ci if (dsContinue == nullptr) { 733686862fbSopenharmony_ci HILOGE("dContinue is null"); 734686862fbSopenharmony_ci return INVALID_PARAMETERS_ERR; 735686862fbSopenharmony_ci } 736686862fbSopenharmony_ci dstDeviceId = dsContinue->GetContinueInfo().sinkDeviceId_; 737686862fbSopenharmony_ci srcDeviceId = dsContinue->GetContinueInfo().sourceDeviceId_; 738686862fbSopenharmony_ci return ERR_OK; 739686862fbSopenharmony_ci} 740686862fbSopenharmony_ci 741686862fbSopenharmony_civoid DSchedContinueManager::OnShutdown(int32_t socket, bool isSelfCalled) 742686862fbSopenharmony_ci{ 743686862fbSopenharmony_ci if (isSelfCalled) { 744686862fbSopenharmony_ci HILOGW("called, shutdown by local, sessionId: %{public}d", socket); 745686862fbSopenharmony_ci return; 746686862fbSopenharmony_ci } 747686862fbSopenharmony_ci HILOGW("called, sessionId: %{public}d, isSelfCalled %{public}d", socket, isSelfCalled); 748686862fbSopenharmony_ci auto func = [this, socket]() { 749686862fbSopenharmony_ci std::lock_guard<std::mutex> continueLock(continueMutex_); 750686862fbSopenharmony_ci if (continues_.empty()) { 751686862fbSopenharmony_ci return; 752686862fbSopenharmony_ci } 753686862fbSopenharmony_ci for (auto iter = continues_.begin(); iter != continues_.end(); iter++) { 754686862fbSopenharmony_ci if (iter->second != nullptr && socket == iter->second->GetSessionId()) { 755686862fbSopenharmony_ci iter->second->OnContinueEnd(CONTINUE_SESSION_SHUTDOWN); 756686862fbSopenharmony_ci } 757686862fbSopenharmony_ci } 758686862fbSopenharmony_ci }; 759686862fbSopenharmony_ci if (eventHandler_ == nullptr) { 760686862fbSopenharmony_ci HILOGE("eventHandler_ is nullptr"); 761686862fbSopenharmony_ci return; 762686862fbSopenharmony_ci } 763686862fbSopenharmony_ci eventHandler_->PostTask(func); 764686862fbSopenharmony_ci return; 765686862fbSopenharmony_ci} 766686862fbSopenharmony_ci 767686862fbSopenharmony_civoid DSchedContinueManager::SoftbusListener::OnBind(int32_t socket, PeerSocketInfo info) 768686862fbSopenharmony_ci{ 769686862fbSopenharmony_ci} 770686862fbSopenharmony_ci 771686862fbSopenharmony_civoid DSchedContinueManager::SoftbusListener::OnShutdown(int32_t socket, bool isSelfCalled) 772686862fbSopenharmony_ci{ 773686862fbSopenharmony_ci DSchedContinueManager::GetInstance().OnShutdown(socket, isSelfCalled); 774686862fbSopenharmony_ci} 775686862fbSopenharmony_ci 776686862fbSopenharmony_civoid DSchedContinueManager::SoftbusListener::OnDataRecv(int32_t socket, std::shared_ptr<DSchedDataBuffer> dataBuffer) 777686862fbSopenharmony_ci{ 778686862fbSopenharmony_ci DSchedContinueManager::GetInstance().OnDataRecv(socket, dataBuffer); 779686862fbSopenharmony_ci} 780686862fbSopenharmony_ci} // namespace DistributedSchedule 781686862fbSopenharmony_ci} // namespace OHOS 782