1/* 2 * Copyright (c) 2021-2024 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15#include "dataobs_mgr_inner.h" 16 17#include "data_ability_observer_stub.h" 18#include "dataobs_mgr_errors.h" 19#include "hilog_tag_wrapper.h" 20#include "common_utils.h" 21 22namespace OHOS { 23namespace AAFwk { 24 25DataObsMgrInner::DataObsMgrInner() {} 26 27DataObsMgrInner::~DataObsMgrInner() {} 28 29int DataObsMgrInner::HandleRegisterObserver(const Uri &uri, sptr<IDataAbilityObserver> dataObserver) 30{ 31 std::lock_guard<ffrt::mutex> lock(innerMutex_); 32 33 auto [obsPair, flag] = observers_.try_emplace(uri.ToString(), std::list<sptr<IDataAbilityObserver>>()); 34 if (!flag && obsPair->second.size() > OBS_NUM_MAX) { 35 TAG_LOGE(AAFwkTag::DBOBSMGR, 36 "subscribers num:%{public}s maxed", 37 CommonUtils::Anonymous(uri.ToString()).c_str()); 38 return DATAOBS_SERVICE_OBS_LIMMIT; 39 } 40 41 for (auto obs = obsPair->second.begin(); obs != obsPair->second.end(); obs++) { 42 if ((*obs)->AsObject() == dataObserver->AsObject()) { 43 TAG_LOGE(AAFwkTag::DBOBSMGR, "obs registered:%{public}s", 44 CommonUtils::Anonymous(uri.ToString()).c_str()); 45 return OBS_EXIST; 46 } 47 } 48 49 obsPair->second.push_back(dataObserver); 50 51 AddObsDeathRecipient(dataObserver); 52 53 return NO_ERROR; 54} 55 56int DataObsMgrInner::HandleUnregisterObserver(const Uri &uri, sptr<IDataAbilityObserver> dataObserver) 57{ 58 std::lock_guard<ffrt::mutex> lock(innerMutex_); 59 60 auto obsPair = observers_.find(uri.ToString()); 61 if (obsPair == observers_.end()) { 62 TAG_LOGW( 63 AAFwkTag::DBOBSMGR, "uri no obs:%{public}s", CommonUtils::Anonymous(uri.ToString()).c_str()); 64 return NO_OBS_FOR_URI; 65 } 66 67 TAG_LOGD(AAFwkTag::DBOBSMGR, "obs num:%{public}zu:%{public}s", obsPair->second.size(), 68 CommonUtils::Anonymous(uri.ToString()).c_str()); 69 auto obs = obsPair->second.begin(); 70 for (; obs != obsPair->second.end(); obs++) { 71 if ((*obs)->AsObject() == dataObserver->AsObject()) { 72 break; 73 } 74 } 75 if (obs == obsPair->second.end()) { 76 TAG_LOGW( 77 AAFwkTag::DBOBSMGR, "uri no obs:%{public}s", CommonUtils::Anonymous(uri.ToString()).c_str()); 78 return NO_OBS_FOR_URI; 79 } 80 obsPair->second.remove(*obs); 81 if (obsPair->second.empty()) { 82 observers_.erase(obsPair); 83 } 84 85 if (!HaveRegistered(dataObserver)) { 86 RemoveObsDeathRecipient(dataObserver->AsObject()); 87 } 88 89 return NO_ERROR; 90} 91 92int DataObsMgrInner::HandleNotifyChange(const Uri &uri) 93{ 94 std::list<sptr<IDataAbilityObserver>> obsList; 95 std::lock_guard<ffrt::mutex> lock(innerMutex_); 96 { 97 auto obsPair = observers_.find(uri.ToString()); 98 if (obsPair == observers_.end()) { 99 TAG_LOGD(AAFwkTag::DBOBSMGR, "uri no obs:%{public}s", 100 CommonUtils::Anonymous(uri.ToString()).c_str()); 101 return NO_OBS_FOR_URI; 102 } 103 obsList = obsPair->second; 104 } 105 106 for (auto &obs : obsList) { 107 if (obs != nullptr) { 108 obs->OnChange(); 109 } 110 } 111 112 TAG_LOGD(AAFwkTag::DBOBSMGR, "uri end:%{public}s,obs num:%{public}zu", 113 CommonUtils::Anonymous(uri.ToString()).c_str(), obsList.size()); 114 return NO_ERROR; 115} 116 117void DataObsMgrInner::AddObsDeathRecipient(sptr<IDataAbilityObserver> dataObserver) 118{ 119 if ((dataObserver == nullptr) || dataObserver->AsObject() == nullptr) { 120 return; 121 } 122 123 auto it = obsRecipient_.find(dataObserver->AsObject()); 124 if (it != obsRecipient_.end()) { 125 TAG_LOGW(AAFwkTag::DBOBSMGR, "called"); 126 return; 127 } else { 128 std::weak_ptr<DataObsMgrInner> thisWeakPtr(shared_from_this()); 129 sptr<IRemoteObject::DeathRecipient> deathRecipient = 130 new DataObsCallbackRecipient([thisWeakPtr](const wptr<IRemoteObject> &remote) { 131 auto dataObsMgrInner = thisWeakPtr.lock(); 132 if (dataObsMgrInner) { 133 dataObsMgrInner->OnCallBackDied(remote); 134 } 135 }); 136 if (!dataObserver->AsObject()->AddDeathRecipient(deathRecipient)) { 137 TAG_LOGE(AAFwkTag::DBOBSMGR, "failed"); 138 } 139 obsRecipient_.emplace(dataObserver->AsObject(), deathRecipient); 140 } 141} 142 143void DataObsMgrInner::RemoveObsDeathRecipient(sptr<IRemoteObject> dataObserver) 144{ 145 if (dataObserver == nullptr) { 146 return; 147 } 148 149 auto it = obsRecipient_.find(dataObserver); 150 if (it != obsRecipient_.end()) { 151 it->first->RemoveDeathRecipient(it->second); 152 obsRecipient_.erase(it); 153 return; 154 } 155} 156 157void DataObsMgrInner::OnCallBackDied(const wptr<IRemoteObject> &remote) 158{ 159 auto dataObserver = remote.promote(); 160 if (dataObserver == nullptr) { 161 return; 162 } 163 std::lock_guard<ffrt::mutex> lock(innerMutex_); 164 165 if (dataObserver == nullptr) { 166 TAG_LOGE(AAFwkTag::DBOBSMGR, "null dataObserver"); 167 return; 168 } 169 170 RemoveObs(dataObserver); 171} 172 173void DataObsMgrInner::RemoveObs(sptr<IRemoteObject> dataObserver) 174{ 175 for (auto iter = observers_.begin(); iter != observers_.end();) { 176 auto &obsList = iter->second; 177 for (auto it = obsList.begin(); it != obsList.end(); it++) { 178 if ((*it)->AsObject() == dataObserver) { 179 TAG_LOGD(AAFwkTag::DBOBSMGR, "erase"); 180 obsList.erase(it); 181 break; 182 } 183 } 184 if (obsList.size() == 0) { 185 iter = observers_.erase(iter); 186 } else { 187 iter++; 188 } 189 } 190 RemoveObsDeathRecipient(dataObserver); 191} 192 193bool DataObsMgrInner::HaveRegistered(sptr<IDataAbilityObserver> dataObserver) 194{ 195 for (auto &[key, value] : observers_) { 196 auto obs = std::find(value.begin(), value.end(), dataObserver); 197 if (obs != value.end()) { 198 return true; 199 } 200 } 201 return false; 202} 203} // namespace AAFwk 204} // namespace OHOS 205