/*
 * Copyright (c) 2024 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
1553c3577eSopenharmony_ci
1653c3577eSopenharmony_ci#define LOG_TAG "KVDBWatcher"
1753c3577eSopenharmony_ci
1853c3577eSopenharmony_ci#include "kvdb_watcher.h"
1953c3577eSopenharmony_ci
2053c3577eSopenharmony_ci#include "error/general_error.h"
2153c3577eSopenharmony_ci#include "ikvstore_observer.h"
2253c3577eSopenharmony_ci#include "log_print.h"
2353c3577eSopenharmony_ci#include "types.h"
2453c3577eSopenharmony_ci#include "utils/anonymous.h"
2553c3577eSopenharmony_ci
2653c3577eSopenharmony_cinamespace OHOS::DistributedKv {
2753c3577eSopenharmony_ciusing namespace DistributedData;
2853c3577eSopenharmony_ciusing Error = DistributedData::GeneralError;
2953c3577eSopenharmony_ciKVDBWatcher::KVDBWatcher() {}
3053c3577eSopenharmony_ci
3153c3577eSopenharmony_ciint32_t KVDBWatcher::OnChange(const Origin &origin, const PRIFields &primaryFields, ChangeInfo &&values)
3253c3577eSopenharmony_ci{
3353c3577eSopenharmony_ci    auto store = origin.store;
3453c3577eSopenharmony_ci    auto changeData = values.find(store);
3553c3577eSopenharmony_ci    if (changeData != values.end()) {
3653c3577eSopenharmony_ci        auto observer = GetObserver();
3753c3577eSopenharmony_ci        if (observer == nullptr) {
3853c3577eSopenharmony_ci            return E_NOT_INIT;
3953c3577eSopenharmony_ci        }
4053c3577eSopenharmony_ci        std::vector<std::string> keys[OP_BUTT]{};
4153c3577eSopenharmony_ci        keys[OP_INSERT] = ConvertToKeys(changeData->second[OP_INSERT]);
4253c3577eSopenharmony_ci        keys[OP_UPDATE] = ConvertToKeys(changeData->second[OP_UPDATE]);
4353c3577eSopenharmony_ci        keys[OP_DELETE] = ConvertToKeys(changeData->second[OP_DELETE]);
4453c3577eSopenharmony_ci        DataOrigin dataOrigin;
4553c3577eSopenharmony_ci        dataOrigin.id = origin.id;
4653c3577eSopenharmony_ci        dataOrigin.store = origin.store;
4753c3577eSopenharmony_ci        observer->OnChange(dataOrigin, std::move(keys));
4853c3577eSopenharmony_ci    }
4953c3577eSopenharmony_ci    return E_OK;
5053c3577eSopenharmony_ci}
5153c3577eSopenharmony_ci
5253c3577eSopenharmony_ciint32_t KVDBWatcher::OnChange(const Origin &origin, const Fields &fields, ChangeData &&datas)
5353c3577eSopenharmony_ci{
5453c3577eSopenharmony_ci    auto store = origin.store;
5553c3577eSopenharmony_ci    auto changeData = datas.find(store);
5653c3577eSopenharmony_ci    if (changeData != datas.end()) {
5753c3577eSopenharmony_ci        auto observer = GetObserver();
5853c3577eSopenharmony_ci        if (observer == nullptr) {
5953c3577eSopenharmony_ci            return E_NOT_INIT;
6053c3577eSopenharmony_ci        }
6153c3577eSopenharmony_ci        auto inserts = ConvertToEntries(changeData->second[OP_INSERT]);
6253c3577eSopenharmony_ci        auto updates = ConvertToEntries(changeData->second[OP_UPDATE]);
6353c3577eSopenharmony_ci        auto deletes = ConvertToEntries(changeData->second[OP_DELETE]);
6453c3577eSopenharmony_ci        ChangeNotification change(std::move(inserts), std::move(updates), std::move(deletes), {}, false);
6553c3577eSopenharmony_ci        observer->OnChange(change);
6653c3577eSopenharmony_ci    }
6753c3577eSopenharmony_ci    return E_OK;
6853c3577eSopenharmony_ci}
6953c3577eSopenharmony_ci
7053c3577eSopenharmony_cisptr<IKvStoreObserver> KVDBWatcher::GetObserver() const
7153c3577eSopenharmony_ci{
7253c3577eSopenharmony_ci    std::shared_lock<decltype(mutex_)> lock(mutex_);
7353c3577eSopenharmony_ci    return observer_;
7453c3577eSopenharmony_ci}
7553c3577eSopenharmony_ci
7653c3577eSopenharmony_civoid KVDBWatcher::SetObserver(sptr<IKvStoreObserver> observer)
7753c3577eSopenharmony_ci{
7853c3577eSopenharmony_ci    std::unique_lock<decltype(mutex_)> lock(mutex_);
7953c3577eSopenharmony_ci    observer_ = std::move(observer);
8053c3577eSopenharmony_ci}
8153c3577eSopenharmony_ci
8253c3577eSopenharmony_cistd::vector<Entry> KVDBWatcher::ConvertToEntries(const std::vector<Values> &values)
8353c3577eSopenharmony_ci{
8453c3577eSopenharmony_ci    std::vector<Entry> changeData{};
8553c3577eSopenharmony_ci    for (auto &info : values) {
8653c3577eSopenharmony_ci        auto key = std::get_if<Bytes>(&info[0]);
8753c3577eSopenharmony_ci        auto value = std::get_if<Bytes>(&info[1]);
8853c3577eSopenharmony_ci        if (key == nullptr || value == nullptr) {
8953c3577eSopenharmony_ci            continue;
9053c3577eSopenharmony_ci        }
9153c3577eSopenharmony_ci        Entry tmpEntry{ *key, *value };
9253c3577eSopenharmony_ci        changeData.push_back(std::move(tmpEntry));
9353c3577eSopenharmony_ci    }
9453c3577eSopenharmony_ci    return changeData;
9553c3577eSopenharmony_ci}
9653c3577eSopenharmony_ci
9753c3577eSopenharmony_cistd::vector<std::string> KVDBWatcher::ConvertToKeys(const std::vector<PRIValue> &values)
9853c3577eSopenharmony_ci{
9953c3577eSopenharmony_ci    std::vector<std::string> keys{};
10053c3577eSopenharmony_ci    for (auto &info : values) {
10153c3577eSopenharmony_ci        auto key = std::get_if<std::string>(&info);
10253c3577eSopenharmony_ci        if (key == nullptr) {
10353c3577eSopenharmony_ci            continue;
10453c3577eSopenharmony_ci        }
10553c3577eSopenharmony_ci        keys.push_back(std::move(*key));
10653c3577eSopenharmony_ci    }
10753c3577eSopenharmony_ci    return keys;
10853c3577eSopenharmony_ci}
10953c3577eSopenharmony_ci} // namespace OHOS::DistributedKv
110