1/*
2 * Copyright (c) 2024 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 *     http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15#include "kv_adapter.h"
16
17#include <cinttypes>
18#include <mutex>
19#include <unistd.h>
20
21#include "cJSON.h"
22#include "datetime_ex.h"
23#include "string_ex.h"
24
25#include "dm_anonymous.h"
26#include "dm_constants.h"
27#include "dm_log.h"
28#include "ffrt.h"
29
30namespace OHOS {
31namespace DistributedHardware {
32using namespace OHOS::DistributedKv;
namespace {
    // Identity and on-disk location of the local KV store opened by this adapter.
    const std::string APP_ID{"distributed_device_manager_service"};
    const std::string STORE_ID{"dm_kv_store"};
    const std::string DATABASE_DIR{"/data/service/el1/public/database/distributed_device_manager_service"};
    // NOTE(review): not referenced by the code visible in this file - confirm whether it is still needed.
    const std::string KV_REINIT_THREAD{"reinit_kv_store"};

    // Maximum number of keys handed to a single DeleteBatch() call on the store.
    constexpr uint32_t MAX_BATCH_SIZE{128};
    // Upper bound on the key and value length accepted by Put().
    constexpr int32_t MAX_STRING_LEN{4096};

    // Init() retry policy: attempt count and the pause after each failed attempt.
    constexpr int32_t MAX_INIT_RETRY_TIMES{20};
    constexpr int32_t INIT_RETRY_SLEEP_INTERVAL{200 * 1000}; // microseconds for usleep(), i.e. 200ms
}
43
44int32_t KVAdapter::Init()
45{
46    LOGI("Init local DB, dataType: %{public}d", static_cast<int32_t>(dataType_));
47    if (isInited_.load()) {
48        LOGI("Local DB already inited.");
49        return DM_OK;
50    }
51    this->appId_.appId = APP_ID;
52    this->storeId_.storeId = STORE_ID;
53    std::lock_guard<std::mutex> lock(kvAdapterMutex_);
54    int32_t tryTimes = MAX_INIT_RETRY_TIMES;
55    while (tryTimes > 0) {
56        DistributedKv::Status status = GetLocalKvStorePtr();
57        if (status == DistributedKv::Status::SUCCESS && kvStorePtr_) {
58            LOGI("Init KvStorePtr Success");
59            RegisterKvStoreDeathListener();
60            isInited_.store(true);
61            return DM_OK;
62        }
63        LOGE("CheckKvStore, left times: %{public}d, status: %{public}d", tryTimes, status);
64        if (status == DistributedKv::Status::STORE_META_CHANGED ||
65            status == DistributedKv::Status::SECURITY_LEVEL_ERROR ||
66            status == DistributedKv::Status::DATA_CORRUPTED) {
67            LOGE("init db error, remove and rebuild it");
68            DeleteKvStore();
69        }
70        usleep(INIT_RETRY_SLEEP_INTERVAL);
71        tryTimes--;
72    }
73    CHECK_NULL_RETURN(kvStorePtr_, ERR_DM_INIT_FAILED);
74    isInited_.store(true);
75    return DM_OK;
76}
77
78void KVAdapter::UnInit()
79{
80    LOGI("KVAdapter Uninted");
81    if (isInited_.load()) {
82        std::lock_guard<std::mutex> lock(kvAdapterMutex_);
83        CHECK_NULL_VOID(kvStorePtr_);
84        UnregisterKvStoreDeathListener();
85        kvStorePtr_.reset();
86        isInited_.store(false);
87    }
88}
89
90int32_t KVAdapter::ReInit()
91{
92    LOGI("KVAdapter ReInit");
93    UnInit();
94    return Init();
95}
96
97int32_t KVAdapter::Put(const std::string &key, const std::string &value)
98{
99    if (key.empty() || key.size() > MAX_STRING_LEN || value.empty() || value.size() > MAX_STRING_LEN) {
100        LOGE("Param is invalid!");
101        return ERR_DM_FAILED;
102    }
103    DistributedKv::Status status;
104    {
105        std::lock_guard<std::mutex> lock(kvAdapterMutex_);
106        CHECK_NULL_RETURN(kvStorePtr_, ERR_DM_POINT_NULL);
107
108        DistributedKv::Key kvKey(key);
109        DistributedKv::Value kvValue(value);
110        status = kvStorePtr_->Put(kvKey, kvValue);
111    }
112    if (status != DistributedKv::Status::SUCCESS) {
113        LOGE("Put kv to db failed, ret: %{public}d", status);
114        return ERR_DM_FAILED;
115    }
116    return DM_OK;
117}
118
119int32_t KVAdapter::Get(const std::string &key, std::string &value)
120{
121    LOGI("Get data by key: %{public}s", GetAnonyString(key).c_str());
122    DistributedKv::Key kvKey(key);
123    DistributedKv::Value kvValue;
124    DistributedKv::Status status;
125    {
126        std::lock_guard<std::mutex> lock(kvAdapterMutex_);
127        CHECK_NULL_RETURN(kvStorePtr_, ERR_DM_POINT_NULL);
128        status = kvStorePtr_->Get(kvKey, kvValue);
129    }
130    if (status != DistributedKv::Status::SUCCESS) {
131        LOGE("Get data from kv failed, key: %{public}s", GetAnonyString(key).c_str());
132        return ERR_DM_FAILED;
133    }
134    value = kvValue.ToString();
135    return DM_OK;
136}
137
138void KVAdapter::OnRemoteDied()
139{
140    LOGI("OnRemoteDied, recover db begin");
141    auto reInitTask = [this]() {
142        LOGI("ReInit, storeId:%{public}s", storeId_.storeId.c_str());
143        ReInit();
144    };
145    ffrt::submit(reInitTask);
146}
147
148DistributedKv::Status KVAdapter::GetLocalKvStorePtr()
149{
150    DistributedKv::Options options = {
151        .createIfMissing = true,
152        .encrypt = false,
153        .autoSync = false,
154        .securityLevel = DistributedKv::SecurityLevel::S1,
155        .area = DistributedKv::EL1,
156        .kvStoreType = DistributedKv::KvStoreType::SINGLE_VERSION,
157        .baseDir = DATABASE_DIR
158    };
159    DistributedKv::Status status = kvDataMgr_.GetSingleKvStore(options, appId_, storeId_, kvStorePtr_);
160    return status;
161}
162
163void KVAdapter::RegisterKvStoreDeathListener()
164{
165    LOGI("Register syncCompleted listener");
166    kvDataMgr_.RegisterKvStoreServiceDeathRecipient(shared_from_this());
167}
168
169void KVAdapter::UnregisterKvStoreDeathListener()
170{
171    LOGI("UnRegister death listener");
172    kvDataMgr_.UnRegisterKvStoreServiceDeathRecipient(shared_from_this());
173}
174
175int32_t KVAdapter::DeleteKvStore()
176{
177    LOGI("Delete KvStore!");
178    kvDataMgr_.CloseKvStore(appId_, storeId_);
179    kvDataMgr_.DeleteKvStore(appId_, storeId_, DATABASE_DIR);
180    return DM_OK;
181}
182
183int32_t KVAdapter::DeleteByAppId(const std::string &appId, const std::string &prefix)
184{
185    if (appId.empty()) {
186        LOGE("appId is empty");
187        return ERR_DM_FAILED;
188    }
189    std::vector<DistributedKv::Entry> localEntries;
190    {
191        std::lock_guard<std::mutex> lock(kvAdapterMutex_);
192        if (kvStorePtr_ == nullptr) {
193            LOGE("kvStoragePtr_ is null");
194            return ERR_DM_POINT_NULL;
195        }
196        if (kvStorePtr_->GetEntries(prefix + appId, localEntries) != DistributedKv::Status::SUCCESS) {
197            LOGE("Get entrys from DB failed.");
198            return ERR_DM_FAILED;
199        }
200    }
201    std::vector<std::string> delKeys;
202    for (const auto &entry : localEntries) {
203        delKeys.emplace_back(entry.key.ToString());
204        DmKVValue kvValue;
205        ConvertJsonToDmKVValue(entry.value.ToString(), kvValue);
206        delKeys.emplace_back(prefix + kvValue.anoyDeviceId);
207    }
208    return DeleteBatch(delKeys);
209}
210
211int32_t KVAdapter::DeleteBatch(const std::vector<std::string> &keys)
212{
213    if (keys.empty()) {
214        LOGE("keys size(%{public}zu) is invalid!", keys.size());
215        return ERR_DM_FAILED;
216    }
217    uint32_t keysSize = static_cast<uint32_t>(keys.size());
218    std::vector<std::vector<DistributedKv::Key>> delKeyBatches;
219    for (uint32_t i = 0; i < keysSize; i += MAX_BATCH_SIZE) {
220        uint32_t end = (i + MAX_BATCH_SIZE) > keysSize ? keysSize : (i + MAX_BATCH_SIZE);
221        auto batch = std::vector<std::string>(keys.begin() + i, keys.begin() + end);
222        std::vector<DistributedKv::Key> delKeys;
223        for (auto item : batch) {
224            DistributedKv::Key key(item);
225            delKeys.emplace_back(key);
226        }
227        delKeyBatches.emplace_back(delKeys);
228    }
229
230    {
231        std::lock_guard<std::mutex> lock(kvAdapterMutex_);
232        if (kvStorePtr_ == nullptr) {
233            LOGE("kvStorePtr is nullptr!");
234            return ERR_DM_POINT_NULL;
235        }
236        for (auto delKeys : delKeyBatches) {
237            DistributedKv::Status status = kvStorePtr_->DeleteBatch(delKeys);
238            if (status != DistributedKv::Status::SUCCESS) {
239                LOGE("DeleteBatch failed!");
240                return ERR_DM_FAILED;
241            }
242        }
243    }
244    return DM_OK;
245}
246} // namespace DistributedHardware
247} // namespace OHOS
248