1/* 2 * Copyright (c) 2024 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15#include "kv_adapter.h" 16 17#include <cinttypes> 18#include <mutex> 19#include <unistd.h> 20 21#include "cJSON.h" 22#include "datetime_ex.h" 23#include "string_ex.h" 24 25#include "dm_anonymous.h" 26#include "dm_constants.h" 27#include "dm_log.h" 28#include "ffrt.h" 29 30namespace OHOS { 31namespace DistributedHardware { 32using namespace OHOS::DistributedKv; 33namespace { 34 const std::string APP_ID = "distributed_device_manager_service"; 35 const std::string STORE_ID = "dm_kv_store"; 36 const std::string DATABASE_DIR = "/data/service/el1/public/database/distributed_device_manager_service"; 37 const std::string KV_REINIT_THREAD = "reinit_kv_store"; 38 constexpr uint32_t MAX_BATCH_SIZE = 128; 39 constexpr int32_t MAX_STRING_LEN = 4096; 40 constexpr int32_t MAX_INIT_RETRY_TIMES = 20; 41 constexpr int32_t INIT_RETRY_SLEEP_INTERVAL = 200 * 1000; // 200ms 42} 43 44int32_t KVAdapter::Init() 45{ 46 LOGI("Init local DB, dataType: %{public}d", static_cast<int32_t>(dataType_)); 47 if (isInited_.load()) { 48 LOGI("Local DB already inited."); 49 return DM_OK; 50 } 51 this->appId_.appId = APP_ID; 52 this->storeId_.storeId = STORE_ID; 53 std::lock_guard<std::mutex> lock(kvAdapterMutex_); 54 int32_t tryTimes = MAX_INIT_RETRY_TIMES; 55 while (tryTimes > 0) { 56 DistributedKv::Status status = GetLocalKvStorePtr(); 57 if (status == DistributedKv::Status::SUCCESS && kvStorePtr_) { 58 LOGI("Init KvStorePtr Success"); 59 RegisterKvStoreDeathListener(); 60 isInited_.store(true); 61 return DM_OK; 62 } 63 LOGE("CheckKvStore, left times: %{public}d, status: %{public}d", tryTimes, status); 64 if (status == DistributedKv::Status::STORE_META_CHANGED || 65 status == DistributedKv::Status::SECURITY_LEVEL_ERROR || 66 status == DistributedKv::Status::DATA_CORRUPTED) { 67 LOGE("init db error, remove and rebuild it"); 68 DeleteKvStore(); 69 } 70 usleep(INIT_RETRY_SLEEP_INTERVAL); 71 tryTimes--; 72 } 73 CHECK_NULL_RETURN(kvStorePtr_, ERR_DM_INIT_FAILED); 74 isInited_.store(true); 75 return DM_OK; 76} 77 78void KVAdapter::UnInit() 79{ 80 LOGI("KVAdapter Uninted"); 81 if (isInited_.load()) { 82 std::lock_guard<std::mutex> lock(kvAdapterMutex_); 83 CHECK_NULL_VOID(kvStorePtr_); 84 UnregisterKvStoreDeathListener(); 85 kvStorePtr_.reset(); 86 isInited_.store(false); 87 } 88} 89 90int32_t KVAdapter::ReInit() 91{ 92 LOGI("KVAdapter ReInit"); 93 UnInit(); 94 return Init(); 95} 96 97int32_t KVAdapter::Put(const std::string &key, const std::string &value) 98{ 99 if (key.empty() || key.size() > MAX_STRING_LEN || value.empty() || value.size() > MAX_STRING_LEN) { 100 LOGE("Param is invalid!"); 101 return ERR_DM_FAILED; 102 } 103 DistributedKv::Status status; 104 { 105 std::lock_guard<std::mutex> lock(kvAdapterMutex_); 106 CHECK_NULL_RETURN(kvStorePtr_, ERR_DM_POINT_NULL); 107 108 DistributedKv::Key kvKey(key); 109 DistributedKv::Value kvValue(value); 110 status = kvStorePtr_->Put(kvKey, kvValue); 111 } 112 if (status != DistributedKv::Status::SUCCESS) { 113 LOGE("Put kv to db failed, ret: %{public}d", status); 114 return ERR_DM_FAILED; 115 } 116 return DM_OK; 117} 118 119int32_t KVAdapter::Get(const std::string &key, std::string &value) 120{ 121 LOGI("Get data by key: %{public}s", GetAnonyString(key).c_str()); 122 DistributedKv::Key kvKey(key); 123 DistributedKv::Value kvValue; 124 DistributedKv::Status status; 125 { 126 std::lock_guard<std::mutex> lock(kvAdapterMutex_); 127 CHECK_NULL_RETURN(kvStorePtr_, ERR_DM_POINT_NULL); 128 status = kvStorePtr_->Get(kvKey, kvValue); 129 } 130 if (status != DistributedKv::Status::SUCCESS) { 131 LOGE("Get data from kv failed, key: %{public}s", GetAnonyString(key).c_str()); 132 return ERR_DM_FAILED; 133 } 134 value = kvValue.ToString(); 135 return DM_OK; 136} 137 138void KVAdapter::OnRemoteDied() 139{ 140 LOGI("OnRemoteDied, recover db begin"); 141 auto reInitTask = [this]() { 142 LOGI("ReInit, storeId:%{public}s", storeId_.storeId.c_str()); 143 ReInit(); 144 }; 145 ffrt::submit(reInitTask); 146} 147 148DistributedKv::Status KVAdapter::GetLocalKvStorePtr() 149{ 150 DistributedKv::Options options = { 151 .createIfMissing = true, 152 .encrypt = false, 153 .autoSync = false, 154 .securityLevel = DistributedKv::SecurityLevel::S1, 155 .area = DistributedKv::EL1, 156 .kvStoreType = DistributedKv::KvStoreType::SINGLE_VERSION, 157 .baseDir = DATABASE_DIR 158 }; 159 DistributedKv::Status status = kvDataMgr_.GetSingleKvStore(options, appId_, storeId_, kvStorePtr_); 160 return status; 161} 162 163void KVAdapter::RegisterKvStoreDeathListener() 164{ 165 LOGI("Register syncCompleted listener"); 166 kvDataMgr_.RegisterKvStoreServiceDeathRecipient(shared_from_this()); 167} 168 169void KVAdapter::UnregisterKvStoreDeathListener() 170{ 171 LOGI("UnRegister death listener"); 172 kvDataMgr_.UnRegisterKvStoreServiceDeathRecipient(shared_from_this()); 173} 174 175int32_t KVAdapter::DeleteKvStore() 176{ 177 LOGI("Delete KvStore!"); 178 kvDataMgr_.CloseKvStore(appId_, storeId_); 179 kvDataMgr_.DeleteKvStore(appId_, storeId_, DATABASE_DIR); 180 return DM_OK; 181} 182 183int32_t KVAdapter::DeleteByAppId(const std::string &appId, const std::string &prefix) 184{ 185 if (appId.empty()) { 186 LOGE("appId is empty"); 187 return ERR_DM_FAILED; 188 } 189 std::vector<DistributedKv::Entry> localEntries; 190 { 191 std::lock_guard<std::mutex> lock(kvAdapterMutex_); 192 if (kvStorePtr_ == nullptr) { 193 LOGE("kvStoragePtr_ is null"); 194 return ERR_DM_POINT_NULL; 195 } 196 if (kvStorePtr_->GetEntries(prefix + appId, localEntries) != DistributedKv::Status::SUCCESS) { 197 LOGE("Get entrys from DB failed."); 198 return ERR_DM_FAILED; 199 } 200 } 201 std::vector<std::string> delKeys; 202 for (const auto &entry : localEntries) { 203 delKeys.emplace_back(entry.key.ToString()); 204 DmKVValue kvValue; 205 ConvertJsonToDmKVValue(entry.value.ToString(), kvValue); 206 delKeys.emplace_back(prefix + kvValue.anoyDeviceId); 207 } 208 return DeleteBatch(delKeys); 209} 210 211int32_t KVAdapter::DeleteBatch(const std::vector<std::string> &keys) 212{ 213 if (keys.empty()) { 214 LOGE("keys size(%{public}zu) is invalid!", keys.size()); 215 return ERR_DM_FAILED; 216 } 217 uint32_t keysSize = static_cast<uint32_t>(keys.size()); 218 std::vector<std::vector<DistributedKv::Key>> delKeyBatches; 219 for (uint32_t i = 0; i < keysSize; i += MAX_BATCH_SIZE) { 220 uint32_t end = (i + MAX_BATCH_SIZE) > keysSize ? keysSize : (i + MAX_BATCH_SIZE); 221 auto batch = std::vector<std::string>(keys.begin() + i, keys.begin() + end); 222 std::vector<DistributedKv::Key> delKeys; 223 for (auto item : batch) { 224 DistributedKv::Key key(item); 225 delKeys.emplace_back(key); 226 } 227 delKeyBatches.emplace_back(delKeys); 228 } 229 230 { 231 std::lock_guard<std::mutex> lock(kvAdapterMutex_); 232 if (kvStorePtr_ == nullptr) { 233 LOGE("kvStorePtr is nullptr!"); 234 return ERR_DM_POINT_NULL; 235 } 236 for (auto delKeys : delKeyBatches) { 237 DistributedKv::Status status = kvStorePtr_->DeleteBatch(delKeys); 238 if (status != DistributedKv::Status::SUCCESS) { 239 LOGE("DeleteBatch failed!"); 240 return ERR_DM_FAILED; 241 } 242 } 243 } 244 return DM_OK; 245} 246} // namespace DistributedHardware 247} // namespace OHOS 248