1/*
2 * Copyright (c) 2021-2024 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 *     http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15#include "dataobs_mgr_inner.h"
16
17#include "data_ability_observer_stub.h"
18#include "dataobs_mgr_errors.h"
19#include "hilog_tag_wrapper.h"
20#include "common_utils.h"
21
22namespace OHOS {
23namespace AAFwk {
24
25DataObsMgrInner::DataObsMgrInner() {}
26
27DataObsMgrInner::~DataObsMgrInner() {}
28
29int DataObsMgrInner::HandleRegisterObserver(const Uri &uri, sptr<IDataAbilityObserver> dataObserver)
30{
31    std::lock_guard<ffrt::mutex> lock(innerMutex_);
32
33    auto [obsPair, flag] = observers_.try_emplace(uri.ToString(), std::list<sptr<IDataAbilityObserver>>());
34    if (!flag && obsPair->second.size() > OBS_NUM_MAX) {
35        TAG_LOGE(AAFwkTag::DBOBSMGR,
36            "subscribers num:%{public}s maxed",
37            CommonUtils::Anonymous(uri.ToString()).c_str());
38        return DATAOBS_SERVICE_OBS_LIMMIT;
39    }
40
41    for (auto obs = obsPair->second.begin(); obs != obsPair->second.end(); obs++) {
42        if ((*obs)->AsObject() == dataObserver->AsObject()) {
43            TAG_LOGE(AAFwkTag::DBOBSMGR, "obs registered:%{public}s",
44                CommonUtils::Anonymous(uri.ToString()).c_str());
45            return OBS_EXIST;
46        }
47    }
48
49    obsPair->second.push_back(dataObserver);
50
51    AddObsDeathRecipient(dataObserver);
52
53    return NO_ERROR;
54}
55
56int DataObsMgrInner::HandleUnregisterObserver(const Uri &uri, sptr<IDataAbilityObserver> dataObserver)
57{
58    std::lock_guard<ffrt::mutex> lock(innerMutex_);
59
60    auto obsPair = observers_.find(uri.ToString());
61    if (obsPair == observers_.end()) {
62        TAG_LOGW(
63            AAFwkTag::DBOBSMGR, "uri no obs:%{public}s", CommonUtils::Anonymous(uri.ToString()).c_str());
64        return NO_OBS_FOR_URI;
65    }
66
67    TAG_LOGD(AAFwkTag::DBOBSMGR, "obs num:%{public}zu:%{public}s", obsPair->second.size(),
68        CommonUtils::Anonymous(uri.ToString()).c_str());
69    auto obs = obsPair->second.begin();
70    for (; obs != obsPair->second.end(); obs++) {
71        if ((*obs)->AsObject() == dataObserver->AsObject()) {
72            break;
73        }
74    }
75    if (obs == obsPair->second.end()) {
76        TAG_LOGW(
77            AAFwkTag::DBOBSMGR, "uri no obs:%{public}s", CommonUtils::Anonymous(uri.ToString()).c_str());
78        return NO_OBS_FOR_URI;
79    }
80    obsPair->second.remove(*obs);
81    if (obsPair->second.empty()) {
82        observers_.erase(obsPair);
83    }
84
85    if (!HaveRegistered(dataObserver)) {
86        RemoveObsDeathRecipient(dataObserver->AsObject());
87    }
88
89    return NO_ERROR;
90}
91
92int DataObsMgrInner::HandleNotifyChange(const Uri &uri)
93{
94    std::list<sptr<IDataAbilityObserver>> obsList;
95    std::lock_guard<ffrt::mutex> lock(innerMutex_);
96    {
97        auto obsPair = observers_.find(uri.ToString());
98        if (obsPair == observers_.end()) {
99            TAG_LOGD(AAFwkTag::DBOBSMGR, "uri no obs:%{public}s",
100                CommonUtils::Anonymous(uri.ToString()).c_str());
101            return NO_OBS_FOR_URI;
102        }
103        obsList = obsPair->second;
104    }
105
106    for (auto &obs : obsList) {
107        if (obs != nullptr) {
108            obs->OnChange();
109        }
110    }
111
112    TAG_LOGD(AAFwkTag::DBOBSMGR, "uri end:%{public}s,obs num:%{public}zu",
113        CommonUtils::Anonymous(uri.ToString()).c_str(), obsList.size());
114    return NO_ERROR;
115}
116
117void DataObsMgrInner::AddObsDeathRecipient(sptr<IDataAbilityObserver> dataObserver)
118{
119    if ((dataObserver == nullptr) || dataObserver->AsObject() == nullptr) {
120        return;
121    }
122
123    auto it = obsRecipient_.find(dataObserver->AsObject());
124    if (it != obsRecipient_.end()) {
125        TAG_LOGW(AAFwkTag::DBOBSMGR, "called");
126        return;
127    } else {
128        std::weak_ptr<DataObsMgrInner> thisWeakPtr(shared_from_this());
129        sptr<IRemoteObject::DeathRecipient> deathRecipient =
130            new DataObsCallbackRecipient([thisWeakPtr](const wptr<IRemoteObject> &remote) {
131                auto dataObsMgrInner = thisWeakPtr.lock();
132                if (dataObsMgrInner) {
133                    dataObsMgrInner->OnCallBackDied(remote);
134                }
135            });
136        if (!dataObserver->AsObject()->AddDeathRecipient(deathRecipient)) {
137            TAG_LOGE(AAFwkTag::DBOBSMGR, "failed");
138        }
139        obsRecipient_.emplace(dataObserver->AsObject(), deathRecipient);
140    }
141}
142
143void DataObsMgrInner::RemoveObsDeathRecipient(sptr<IRemoteObject> dataObserver)
144{
145    if (dataObserver == nullptr) {
146        return;
147    }
148
149    auto it = obsRecipient_.find(dataObserver);
150    if (it != obsRecipient_.end()) {
151        it->first->RemoveDeathRecipient(it->second);
152        obsRecipient_.erase(it);
153        return;
154    }
155}
156
157void DataObsMgrInner::OnCallBackDied(const wptr<IRemoteObject> &remote)
158{
159    auto dataObserver = remote.promote();
160    if (dataObserver == nullptr) {
161        return;
162    }
163    std::lock_guard<ffrt::mutex> lock(innerMutex_);
164
165    if (dataObserver == nullptr) {
166        TAG_LOGE(AAFwkTag::DBOBSMGR, "null dataObserver");
167        return;
168    }
169
170    RemoveObs(dataObserver);
171}
172
173void DataObsMgrInner::RemoveObs(sptr<IRemoteObject> dataObserver)
174{
175    for (auto iter = observers_.begin(); iter != observers_.end();) {
176        auto &obsList = iter->second;
177        for (auto it = obsList.begin(); it != obsList.end(); it++) {
178            if ((*it)->AsObject() == dataObserver) {
179                TAG_LOGD(AAFwkTag::DBOBSMGR, "erase");
180                obsList.erase(it);
181                break;
182            }
183        }
184        if (obsList.size() == 0) {
185            iter = observers_.erase(iter);
186        } else {
187            iter++;
188        }
189    }
190    RemoveObsDeathRecipient(dataObserver);
191}
192
193bool DataObsMgrInner::HaveRegistered(sptr<IDataAbilityObserver> dataObserver)
194{
195    for (auto &[key, value] : observers_) {
196        auto obs = std::find(value.begin(), value.end(), dataObserver);
197        if (obs != value.end()) {
198            return true;
199        }
200    }
201    return false;
202}
203}  // namespace AAFwk
204}  // namespace OHOS
205