153c3577eSopenharmony_ci/* 253c3577eSopenharmony_ci * Copyright (c) 2024 Huawei Device Co., Ltd. 353c3577eSopenharmony_ci * Licensed under the Apache License, Version 2.0 (the "License"); 453c3577eSopenharmony_ci * you may not use this file except in compliance with the License. 553c3577eSopenharmony_ci * You may obtain a copy of the License at 653c3577eSopenharmony_ci * 753c3577eSopenharmony_ci * http://www.apache.org/licenses/LICENSE-2.0 853c3577eSopenharmony_ci * 953c3577eSopenharmony_ci * Unless required by applicable law or agreed to in writing, software 1053c3577eSopenharmony_ci * distributed under the License is distributed on an "AS IS" BASIS, 1153c3577eSopenharmony_ci * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 1253c3577eSopenharmony_ci * See the License for the specific language governing permissions and 1353c3577eSopenharmony_ci * limitations under the License. 1453c3577eSopenharmony_ci */ 1553c3577eSopenharmony_ci 1653c3577eSopenharmony_ci#define LOG_TAG "KVDBWatcher" 1753c3577eSopenharmony_ci 1853c3577eSopenharmony_ci#include "kvdb_watcher.h" 1953c3577eSopenharmony_ci 2053c3577eSopenharmony_ci#include "error/general_error.h" 2153c3577eSopenharmony_ci#include "ikvstore_observer.h" 2253c3577eSopenharmony_ci#include "log_print.h" 2353c3577eSopenharmony_ci#include "types.h" 2453c3577eSopenharmony_ci#include "utils/anonymous.h" 2553c3577eSopenharmony_ci 2653c3577eSopenharmony_cinamespace OHOS::DistributedKv { 2753c3577eSopenharmony_ciusing namespace DistributedData; 2853c3577eSopenharmony_ciusing Error = DistributedData::GeneralError; 2953c3577eSopenharmony_ciKVDBWatcher::KVDBWatcher() {} 3053c3577eSopenharmony_ci 3153c3577eSopenharmony_ciint32_t KVDBWatcher::OnChange(const Origin &origin, const PRIFields &primaryFields, ChangeInfo &&values) 3253c3577eSopenharmony_ci{ 3353c3577eSopenharmony_ci auto store = origin.store; 3453c3577eSopenharmony_ci auto changeData = values.find(store); 3553c3577eSopenharmony_ci if (changeData != values.end()) { 3653c3577eSopenharmony_ci auto observer = GetObserver(); 3753c3577eSopenharmony_ci if (observer == nullptr) { 3853c3577eSopenharmony_ci return E_NOT_INIT; 3953c3577eSopenharmony_ci } 4053c3577eSopenharmony_ci std::vector<std::string> keys[OP_BUTT]{}; 4153c3577eSopenharmony_ci keys[OP_INSERT] = ConvertToKeys(changeData->second[OP_INSERT]); 4253c3577eSopenharmony_ci keys[OP_UPDATE] = ConvertToKeys(changeData->second[OP_UPDATE]); 4353c3577eSopenharmony_ci keys[OP_DELETE] = ConvertToKeys(changeData->second[OP_DELETE]); 4453c3577eSopenharmony_ci DataOrigin dataOrigin; 4553c3577eSopenharmony_ci dataOrigin.id = origin.id; 4653c3577eSopenharmony_ci dataOrigin.store = origin.store; 4753c3577eSopenharmony_ci observer->OnChange(dataOrigin, std::move(keys)); 4853c3577eSopenharmony_ci } 4953c3577eSopenharmony_ci return E_OK; 5053c3577eSopenharmony_ci} 5153c3577eSopenharmony_ci 5253c3577eSopenharmony_ciint32_t KVDBWatcher::OnChange(const Origin &origin, const Fields &fields, ChangeData &&datas) 5353c3577eSopenharmony_ci{ 5453c3577eSopenharmony_ci auto store = origin.store; 5553c3577eSopenharmony_ci auto changeData = datas.find(store); 5653c3577eSopenharmony_ci if (changeData != datas.end()) { 5753c3577eSopenharmony_ci auto observer = GetObserver(); 5853c3577eSopenharmony_ci if (observer == nullptr) { 5953c3577eSopenharmony_ci return E_NOT_INIT; 6053c3577eSopenharmony_ci } 6153c3577eSopenharmony_ci auto inserts = ConvertToEntries(changeData->second[OP_INSERT]); 6253c3577eSopenharmony_ci auto updates = ConvertToEntries(changeData->second[OP_UPDATE]); 6353c3577eSopenharmony_ci auto deletes = ConvertToEntries(changeData->second[OP_DELETE]); 6453c3577eSopenharmony_ci ChangeNotification change(std::move(inserts), std::move(updates), std::move(deletes), {}, false); 6553c3577eSopenharmony_ci observer->OnChange(change); 6653c3577eSopenharmony_ci } 6753c3577eSopenharmony_ci return E_OK; 6853c3577eSopenharmony_ci} 6953c3577eSopenharmony_ci 7053c3577eSopenharmony_cisptr<IKvStoreObserver> KVDBWatcher::GetObserver() const 7153c3577eSopenharmony_ci{ 7253c3577eSopenharmony_ci std::shared_lock<decltype(mutex_)> lock(mutex_); 7353c3577eSopenharmony_ci return observer_; 7453c3577eSopenharmony_ci} 7553c3577eSopenharmony_ci 7653c3577eSopenharmony_civoid KVDBWatcher::SetObserver(sptr<IKvStoreObserver> observer) 7753c3577eSopenharmony_ci{ 7853c3577eSopenharmony_ci std::unique_lock<decltype(mutex_)> lock(mutex_); 7953c3577eSopenharmony_ci observer_ = std::move(observer); 8053c3577eSopenharmony_ci} 8153c3577eSopenharmony_ci 8253c3577eSopenharmony_cistd::vector<Entry> KVDBWatcher::ConvertToEntries(const std::vector<Values> &values) 8353c3577eSopenharmony_ci{ 8453c3577eSopenharmony_ci std::vector<Entry> changeData{}; 8553c3577eSopenharmony_ci for (auto &info : values) { 8653c3577eSopenharmony_ci auto key = std::get_if<Bytes>(&info[0]); 8753c3577eSopenharmony_ci auto value = std::get_if<Bytes>(&info[1]); 8853c3577eSopenharmony_ci if (key == nullptr || value == nullptr) { 8953c3577eSopenharmony_ci continue; 9053c3577eSopenharmony_ci } 9153c3577eSopenharmony_ci Entry tmpEntry{ *key, *value }; 9253c3577eSopenharmony_ci changeData.push_back(std::move(tmpEntry)); 9353c3577eSopenharmony_ci } 9453c3577eSopenharmony_ci return changeData; 9553c3577eSopenharmony_ci} 9653c3577eSopenharmony_ci 9753c3577eSopenharmony_cistd::vector<std::string> KVDBWatcher::ConvertToKeys(const std::vector<PRIValue> &values) 9853c3577eSopenharmony_ci{ 9953c3577eSopenharmony_ci std::vector<std::string> keys{}; 10053c3577eSopenharmony_ci for (auto &info : values) { 10153c3577eSopenharmony_ci auto key = std::get_if<std::string>(&info); 10253c3577eSopenharmony_ci if (key == nullptr) { 10353c3577eSopenharmony_ci continue; 10453c3577eSopenharmony_ci } 10553c3577eSopenharmony_ci keys.push_back(std::move(*key)); 10653c3577eSopenharmony_ci } 10753c3577eSopenharmony_ci return keys; 10853c3577eSopenharmony_ci} 10953c3577eSopenharmony_ci} // namespace OHOS::DistributedKv 110