1 /*
2 * Copyright (c) 2023-2024 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #define LOG_TAG "SyncManager"
16 #include "sync_manager.h"
17
18 #include <chrono>
19
20 #include "account/account_delegate.h"
21 #include "bootstrap.h"
22 #include "checker/checker_manager.h"
23 #include "cloud/cloud_lock_event.h"
24 #include "cloud/cloud_report.h"
25 #include "cloud/cloud_server.h"
26 #include "cloud/schema_meta.h"
27 #include "cloud/sync_event.h"
28 #include "cloud_value_util.h"
29 #include "device_manager_adapter.h"
30 #include "dfx/radar_reporter.h"
31 #include "eventcenter/event_center.h"
32 #include "log_print.h"
33 #include "metadata/meta_data_manager.h"
34 #include "sync_strategies/network_sync_strategy.h"
35 #include "user_delegate.h"
36 #include "utils/anonymous.h"
37 namespace OHOS::CloudData {
38 using namespace DistributedData;
39 using namespace DistributedDataDfx;
40 using namespace DistributedKv;
41 using namespace SharingUtil;
42 using namespace std::chrono;
43 using Account = OHOS::DistributedKv::AccountDelegate;
44 using DmAdapter = OHOS::DistributedData::DeviceManagerAdapter;
45 using Defer = EventCenter::Defer;
// Static counter; GenerateId() pre-increments it and ORs the result into the low bits of each generated id.
std::atomic<uint32_t> SyncManager::genId_ = 0;
SyncInfo( int32_t user, const std::string &bundleName, const Store &store, const Tables &tables, int32_t triggerMode)47 SyncManager::SyncInfo::SyncInfo(
48 int32_t user, const std::string &bundleName, const Store &store, const Tables &tables, int32_t triggerMode)
49 : user_(user), bundleName_(bundleName), triggerMode_(triggerMode)
50 {
51 if (!store.empty()) {
52 tables_[store] = tables;
53 }
54 syncId_ = SyncManager::GenerateId(user);
55 }
56
SyncInfo(int32_t user, const std::string &bundleName, const Stores &stores)57 SyncManager::SyncInfo::SyncInfo(int32_t user, const std::string &bundleName, const Stores &stores)
58 : user_(user), bundleName_(bundleName)
59 {
60 for (auto &store : stores) {
61 tables_[store] = {};
62 }
63 syncId_ = SyncManager::GenerateId(user);
64 }
65
SyncInfo(int32_t user, const std::string &bundleName, const MutliStoreTables &tables)66 SyncManager::SyncInfo::SyncInfo(int32_t user, const std::string &bundleName, const MutliStoreTables &tables)
67 : user_(user), bundleName_(bundleName), tables_(tables)
68 {
69 tables_ = tables;
70 syncId_ = SyncManager::GenerateId(user);
71 }
72
SyncInfo(const Param ¶m)73 SyncManager::SyncInfo::SyncInfo(const Param ¶m)
74 : user_(param.user), bundleName_(param.bundleName), triggerMode_(param.triggerMode)
75 {
76 if (!param.store.empty()) {
77 tables_[param.store] = param.tables;
78 }
79 syncId_ = SyncManager::GenerateId(param.user);
80 prepareTraceId_ = param.prepareTraceId;
81 }
82
SetMode(int32_t mode)83 void SyncManager::SyncInfo::SetMode(int32_t mode)
84 {
85 mode_ = mode;
86 }
87
SetWait(int32_t wait)88 void SyncManager::SyncInfo::SetWait(int32_t wait)
89 {
90 wait_ = wait;
91 }
92
SetAsyncDetail(GenAsync asyncDetail)93 void SyncManager::SyncInfo::SetAsyncDetail(GenAsync asyncDetail)
94 {
95 async_ = std::move(asyncDetail);
96 }
97
SetQuery(std::shared_ptr<GenQuery> query)98 void SyncManager::SyncInfo::SetQuery(std::shared_ptr<GenQuery> query)
99 {
100 query_ = query;
101 }
102
SetCompensation(bool isCompensation)103 void SyncManager::SyncInfo::SetCompensation(bool isCompensation)
104 {
105 isCompensation_ = isCompensation;
106 }
107
SetTriggerMode(int32_t triggerMode)108 void SyncManager::SyncInfo::SetTriggerMode(int32_t triggerMode)
109 {
110 triggerMode_ = triggerMode;
111 }
112
SetError(int32_t code) const113 void SyncManager::SyncInfo::SetError(int32_t code) const
114 {
115 if (async_) {
116 GenDetails details;
117 auto &detail = details[id_];
118 detail.progress = GenProgress::SYNC_FINISH;
119 detail.code = code;
120 async_(std::move(details));
121 }
122 }
123
GenerateQuery(const std::string &store, const Tables &tables)124 std::shared_ptr<GenQuery> SyncManager::SyncInfo::GenerateQuery(const std::string &store, const Tables &tables)
125 {
126 if (query_ != nullptr) {
127 return query_;
128 }
129 class SyncQuery final : public GenQuery {
130 public:
131 explicit SyncQuery(const std::vector<std::string> &tables) : tables_(tables) {}
132
133 bool IsEqual(uint64_t tid) override
134 {
135 return false;
136 }
137
138 std::vector<std::string> GetTables() override
139 {
140 return tables_;
141 }
142
143 private:
144 std::vector<std::string> tables_;
145 };
146 auto it = tables_.find(store);
147 return std::make_shared<SyncQuery>(it == tables_.end() || it->second.empty() ? tables : it->second);
148 }
149
Contains(const std::string &storeName)150 bool SyncManager::SyncInfo::Contains(const std::string &storeName)
151 {
152 return tables_.empty() || tables_.find(storeName) != tables_.end();
153 }
154
GetLockChangeHandler()155 std::function<void(const Event &)> SyncManager::GetLockChangeHandler()
156 {
157 return [](const Event &event) {
158 auto &evt = static_cast<const CloudLockEvent &>(event);
159 auto storeInfo = evt.GetStoreInfo();
160 auto callback = evt.GetCallback();
161 if (callback == nullptr) {
162 ZLOGE("callback is nullptr. bundleName: %{public}s, storeName: %{public}s, user: %{public}d.",
163 storeInfo.bundleName.c_str(), Anonymous::Change(storeInfo.storeName).c_str(), storeInfo.user);
164 return;
165 }
166 CloudInfo cloud;
167 cloud.user = storeInfo.user;
168 SyncInfo info(storeInfo.user, storeInfo.bundleName);
169 auto code = IsValid(info, cloud);
170 if (code != E_OK) {
171 return;
172 }
173
174 StoreMetaData meta(storeInfo);
175 meta.deviceId = DmAdapter::GetInstance().GetLocalDevice().uuid;
176 if (!MetaDataManager::GetInstance().LoadMeta(meta.GetKey(), meta, true)) {
177 ZLOGE("not found meta. bundleName: %{public}s, storeName: %{public}s, user: %{public}d.",
178 storeInfo.bundleName.c_str(), Anonymous::Change(storeInfo.storeName).c_str(), storeInfo.user);
179 return;
180 }
181 auto store = GetStore(meta, storeInfo.user);
182 if (store == nullptr) {
183 ZLOGE("failed to get store. bundleName: %{public}s, storeName: %{public}s, user: %{public}d.",
184 storeInfo.bundleName.c_str(), Anonymous::Change(storeInfo.storeName).c_str(), storeInfo.user);
185 return;
186 }
187 if (evt.GetEventId() == CloudEvent::LOCK_CLOUD_CONTAINER) {
188 auto [result, expiredTime] = store->LockCloudDB();
189 callback(result, expiredTime);
190 } else {
191 auto result = store->UnLockCloudDB();
192 callback(result, 0);
193 }
194 };
195 }
196
SyncManager()197 SyncManager::SyncManager()
198 {
199 EventCenter::GetInstance().Subscribe(CloudEvent::LOCK_CLOUD_CONTAINER, GetLockChangeHandler());
200 EventCenter::GetInstance().Subscribe(CloudEvent::UNLOCK_CLOUD_CONTAINER, GetLockChangeHandler());
201 EventCenter::GetInstance().Subscribe(CloudEvent::LOCAL_CHANGE, GetClientChangeHandler());
202 syncStrategy_ = std::make_shared<NetworkSyncStrategy>();
203 auto metaName = Bootstrap::GetInstance().GetProcessLabel();
204 kvApps_.insert(std::move(metaName));
205 auto stores = CheckerManager::GetInstance().GetStaticStores();
206 for (auto &store : stores) {
207 kvApps_.insert(std::move(store.bundleName));
208 }
209 stores = CheckerManager::GetInstance().GetDynamicStores();
210 for (auto &store : stores) {
211 kvApps_.insert(std::move(store.bundleName));
212 }
213 }
214
~SyncManager()215 SyncManager::~SyncManager()
216 {
217 if (executor_ != nullptr) {
218 actives_.ForEachCopies([this](auto &syncId, auto &taskId) {
219 executor_->Remove(taskId);
220 return false;
221 });
222 executor_ = nullptr;
223 }
224 }
225
Bind(std::shared_ptr<ExecutorPool> executor)226 int32_t SyncManager::Bind(std::shared_ptr<ExecutorPool> executor)
227 {
228 executor_ = executor;
229 return E_OK;
230 }
231
DoCloudSync(SyncInfo syncInfo)232 int32_t SyncManager::DoCloudSync(SyncInfo syncInfo)
233 {
234 if (executor_ == nullptr) {
235 return E_NOT_INIT;
236 }
237 auto syncId = GenerateId(syncInfo.user_);
238 auto ref = GenSyncRef(syncId);
239 actives_.Compute(syncId, [this, &ref, &syncInfo](const uint64_t &key, TaskId &taskId) mutable {
240 taskId = executor_->Execute(GetSyncTask(0, true, ref, std::move(syncInfo)));
241 return true;
242 });
243 return E_OK;
244 }
245
StopCloudSync(int32_t user)246 int32_t SyncManager::StopCloudSync(int32_t user)
247 {
248 if (executor_ == nullptr) {
249 return E_NOT_INIT;
250 }
251 actives_.ForEachCopies([this, user](auto &syncId, auto &taskId) {
252 if (Compare(syncId, user) == 0) {
253 executor_->Remove(taskId);
254 }
255 return false;
256 });
257 return E_OK;
258 }
259
IsValid(SyncInfo &info, CloudInfo &cloud)260 GeneralError SyncManager::IsValid(SyncInfo &info, CloudInfo &cloud)
261 {
262 if (!MetaDataManager::GetInstance().LoadMeta(cloud.GetKey(), cloud, true) ||
263 (info.id_ != SyncInfo::DEFAULT_ID && cloud.id != info.id_)) {
264 info.SetError(E_CLOUD_DISABLED);
265 ZLOGE("cloudInfo invalid:%{public}d, <syncId:%{public}s, metaId:%{public}s>", cloud.IsValid(),
266 Anonymous::Change(info.id_).c_str(), Anonymous::Change(cloud.id).c_str());
267 return E_CLOUD_DISABLED;
268 }
269 if (!cloud.enableCloud || (!info.bundleName_.empty() && !cloud.IsOn(info.bundleName_))) {
270 info.SetError(E_CLOUD_DISABLED);
271 ZLOGD("enable:%{public}d, bundleName:%{public}s", cloud.enableCloud, info.bundleName_.c_str());
272 return E_CLOUD_DISABLED;
273 }
274 if (!DmAdapter::GetInstance().IsNetworkAvailable()) {
275 info.SetError(E_NETWORK_ERROR);
276 ZLOGD("network unavailable");
277 return E_NETWORK_ERROR;
278 }
279 if (!Account::GetInstance()->IsVerified(info.user_)) {
280 info.SetError(E_USER_UNLOCK);
281 ZLOGD("user unverified");
282 return E_ERROR;
283 }
284 return E_OK;
285 }
286
GetPostEventTask(const std::vector<SchemaMeta> &schemas, CloudInfo &cloud, SyncInfo &info, bool retry, const TraceIds &traceIds)287 std::function<void()> SyncManager::GetPostEventTask(const std::vector<SchemaMeta> &schemas, CloudInfo &cloud,
288 SyncInfo &info, bool retry, const TraceIds &traceIds)
289 {
290 return [this, &cloud, &info, &schemas, retry, &traceIds]() {
291 bool isPostEvent = false;
292 for (auto &schema : schemas) {
293 auto it = traceIds.find(schema.bundleName);
294 if (!cloud.IsOn(schema.bundleName)) {
295 UpdateFinishSyncInfo({ cloud.id, schema.bundleName, "" }, info.syncId_, E_ERROR);
296 Report({ cloud.user, schema.bundleName, it == traceIds.end() ? "" : it->second, SyncStage::END,
297 E_ERROR });
298 continue;
299 }
300 for (const auto &database : schema.databases) {
301 if (!info.Contains(database.name)) {
302 UpdateFinishSyncInfo({ cloud.id, schema.bundleName, "" }, info.syncId_, E_ERROR);
303 Report({ cloud.user, schema.bundleName, it == traceIds.end() ? "" : it->second, SyncStage::END,
304 E_ERROR });
305 continue;
306 }
307 StoreInfo storeInfo = { 0, schema.bundleName, database.name, cloud.apps[schema.bundleName].instanceId,
308 info.user_, "", info.syncId_ };
309 auto status = syncStrategy_->CheckSyncAction(storeInfo);
310 if (status != SUCCESS) {
311 ZLOGW("Verification strategy failed, status:%{public}d. %{public}d:%{public}s:%{public}s", status,
312 storeInfo.user, storeInfo.bundleName.c_str(), Anonymous::Change(storeInfo.storeName).c_str());
313 UpdateFinishSyncInfo({ cloud.id, schema.bundleName, "" }, info.syncId_, status);
314 Report({ cloud.user, schema.bundleName, it == traceIds.end() ? "" : it->second, SyncStage::END,
315 status });
316 info.SetError(status);
317 continue;
318 }
319 auto query = info.GenerateQuery(database.name, database.GetTableNames());
320 SyncParam syncParam = { info.mode_, info.wait_, info.isCompensation_, info.triggerMode_,
321 it == traceIds.end() ? "" : it->second, cloud.user };
322 auto evt = std::make_unique<SyncEvent>(std::move(storeInfo),
323 SyncEvent::EventInfo{ syncParam, retry, std::move(query), info.async_ });
324 EventCenter::GetInstance().PostEvent(std::move(evt));
325 isPostEvent = true;
326 }
327 }
328 if (!isPostEvent) {
329 ZLOGE("schema is invalid, user: %{public}d", cloud.user);
330 info.SetError(E_ERROR);
331 }
332 };
333 }
334
335 ExecutorPool::Task SyncManager::GetSyncTask(int32_t times, bool retry, RefCount ref, SyncInfo &&syncInfo)
336 {
337 times++;
338 return [this, times, retry, keep = std::move(ref), info = std::move(syncInfo)]() mutable {
339 activeInfos_.Erase(info.syncId_);
340 bool createdByDefaultUser = InitDefaultUser(info.user_);
341 CloudInfo cloud;
342 cloud.user = info.user_;
343
344 auto cloudSyncInfos = GetCloudSyncInfo(info, cloud);
345 if (cloudSyncInfos.empty()) {
346 ZLOGD("get cloud info failed, user: %{public}d.", cloud.user);
347 info.SetError(E_CLOUD_DISABLED);
348 return;
349 }
350 auto traceIds = GetPrepareTraceId(info, cloud);
351 BatchReport(info.user_, traceIds, SyncStage::PREPARE, E_OK);
352 UpdateStartSyncInfo(cloudSyncInfos);
353 auto code = IsValid(info, cloud);
354 if (code != E_OK) {
355 BatchUpdateFinishState(cloudSyncInfos, code);
356 BatchReport(info.user_, traceIds, SyncStage::END, code);
357 return;
358 }
359
360 auto retryer = GetRetryer(times, info, cloud.user);
361 auto schemas = GetSchemaMeta(cloud, info.bundleName_);
362 if (schemas.empty()) {
363 UpdateSchema(info);
364 schemas = GetSchemaMeta(cloud, info.bundleName_);
365 if (schemas.empty()) {
366 auto it = traceIds.find(info.bundleName_);
367 retryer(RETRY_INTERVAL, E_RETRY_TIMEOUT, E_CLOUD_DISABLED, it == traceIds.end() ? "" : it->second);
368 BatchUpdateFinishState(cloudSyncInfos, E_CLOUD_DISABLED);
369 BatchReport(info.user_, traceIds, SyncStage::END, E_CLOUD_DISABLED);
370 return;
371 }
372 }
373 Defer defer(GetSyncHandler(std::move(retryer)), CloudEvent::CLOUD_SYNC);
374 if (createdByDefaultUser) {
375 info.user_ = 0;
376 }
377 auto task = GetPostEventTask(schemas, cloud, info, retry, traceIds);
378 task();
379 };
380 }
381
382 std::function<void(const Event &)> SyncManager::GetSyncHandler(Retryer retryer)
383 {
384 return [this, retryer](const Event &event) {
385 auto &evt = static_cast<const SyncEvent &>(event);
386 auto &storeInfo = evt.GetStoreInfo();
387 GenAsync async = evt.GetAsyncDetail();
388 auto prepareTraceId = evt.GetPrepareTraceId();
389 auto user = evt.GetUser();
390 GenDetails details;
391 auto &detail = details[SyncInfo::DEFAULT_ID];
392 detail.progress = GenProgress::SYNC_FINISH;
393 auto [result, meta] = GetMetaData(storeInfo);
394 if (!result) {
395 return DoExceptionalCallback(async, details, storeInfo, prepareTraceId);
396 }
397 auto store = GetStore(meta, storeInfo.user);
398 if (store == nullptr) {
399 ZLOGE("store null, storeId:%{public}s, prepareTraceId:%{public}s", meta.GetStoreAlias().c_str(),
400 prepareTraceId.c_str());
401 return DoExceptionalCallback(async, details, storeInfo, prepareTraceId);
402 }
403 ZLOGI("database:<%{public}d:%{public}s:%{public}s:%{public}s> sync start", storeInfo.user,
404 storeInfo.bundleName.c_str(), meta.GetStoreAlias().c_str(), prepareTraceId.c_str());
405 RadarReporter::Report(
406 { storeInfo.bundleName.c_str(), CLOUD_SYNC, TRIGGER_SYNC, storeInfo.syncId, evt.GetTriggerMode() },
407 "GetSyncHandler", BizState::BEGIN);
408 Report({ user, storeInfo.bundleName, prepareTraceId, SyncStage::START, E_OK });
409 SyncParam syncParam = { evt.GetMode(), evt.GetWait(), evt.IsCompensation(), MODE_DEFAULT, prepareTraceId };
410 auto status = store->Sync({ SyncInfo::DEFAULT_ID }, *(evt.GetQuery()),
411 evt.AutoRetry() ? RetryCallback(storeInfo, retryer, evt.GetTriggerMode(), prepareTraceId, user)
412 : GetCallback(evt.GetAsyncDetail(), storeInfo, evt.GetTriggerMode(), prepareTraceId, user),
413 syncParam);
414 if (status != E_OK) {
415 if (async) {
416 detail.code = status;
417 async(std::move(details));
418 }
419 UpdateFinishSyncInfo({ GetAccountId(storeInfo.user), storeInfo.bundleName, "" }, storeInfo.syncId, E_ERROR);
420 if (status == GeneralError::E_NOT_SUPPORT) {
421 return;
422 }
423 int32_t errCode = status + GenStore::DB_ERR_OFFSET;
424 RadarReporter::Report({ storeInfo.bundleName.c_str(), CLOUD_SYNC, FINISH_SYNC, storeInfo.syncId,
425 evt.GetTriggerMode(), false, errCode }, "GetSyncHandler", BizState::END);
426 Report({ user, storeInfo.bundleName, prepareTraceId, SyncStage::END, errCode });
427 }
428 };
429 }
430
431 std::function<void(const Event &)> SyncManager::GetClientChangeHandler()
432 {
433 return [this](const Event &event) {
434 auto &evt = static_cast<const SyncEvent &>(event);
435 auto store = evt.GetStoreInfo();
436 SyncInfo syncInfo(store.user, store.bundleName, store.storeName);
437 syncInfo.SetMode(evt.GetMode());
438 syncInfo.SetWait(evt.GetWait());
439 syncInfo.SetAsyncDetail(evt.GetAsyncDetail());
440 syncInfo.SetQuery(evt.GetQuery());
441 syncInfo.SetCompensation(evt.IsCompensation());
442 syncInfo.SetTriggerMode(evt.GetTriggerMode());
443 auto times = evt.AutoRetry() ? RETRY_TIMES - CLIENT_RETRY_TIMES : RETRY_TIMES;
444 executor_->Execute(GetSyncTask(times, evt.AutoRetry(), RefCount(), std::move(syncInfo)));
445 };
446 }
447
448 SyncManager::Retryer SyncManager::GetRetryer(int32_t times, const SyncInfo &syncInfo, int32_t user)
449 {
450 if (times >= RETRY_TIMES) {
451 return [this, user, info = SyncInfo(syncInfo)](Duration, int32_t code, int32_t dbCode,
452 const std::string &prepareTraceId) mutable {
453 if (code == E_OK || code == E_SYNC_TASK_MERGED) {
454 return true;
455 }
456 info.SetError(code);
457 RadarReporter::Report({ info.bundleName_.c_str(), CLOUD_SYNC, FINISH_SYNC, info.syncId_, info.triggerMode_,
458 false, dbCode },
459 "GetRetryer", BizState::END);
460 Report({ user, info.bundleName_, prepareTraceId, SyncStage::END,
461 dbCode == GenStore::DB_ERR_OFFSET ? 0 : dbCode });
462 return true;
463 };
464 }
465 return [this, times, user, info = SyncInfo(syncInfo)](Duration interval, int32_t code, int32_t dbCode,
466 const std::string &prepareTraceId) mutable {
467 if (code == E_OK || code == E_SYNC_TASK_MERGED) {
468 return true;
469 }
470 if (code == E_NO_SPACE_FOR_ASSET || code == E_RECODE_LIMIT_EXCEEDED) {
471 info.SetError(code);
472 RadarReporter::Report({ info.bundleName_.c_str(), CLOUD_SYNC, FINISH_SYNC, info.syncId_, info.triggerMode_,
473 false, dbCode },
474 "GetRetryer", BizState::END);
475 Report({ user, info.bundleName_, prepareTraceId, SyncStage::END,
476 dbCode == GenStore::DB_ERR_OFFSET ? 0 : dbCode });
477 return true;
478 }
479
480 activeInfos_.ComputeIfAbsent(info.syncId_, [this, times, interval, &info](uint64_t key) mutable {
481 auto syncId = GenerateId(info.user_);
482 auto ref = GenSyncRef(syncId);
483 actives_.Compute(syncId, [this, times, interval, &ref, &info](const uint64_t &key, TaskId &value) mutable {
484 value = executor_->Schedule(interval, GetSyncTask(times, true, ref, std::move(info)));
485 return true;
486 });
487 return syncId;
488 });
489 return true;
490 };
491 }
492
493 uint64_t SyncManager::GenerateId(int32_t user)
494 {
495 uint64_t syncId = static_cast<uint64_t>(user) & 0xFFFFFFFF;
496 return (syncId << MV_BIT) | (++genId_);
497 }
498
499 RefCount SyncManager::GenSyncRef(uint64_t syncId)
500 {
501 return RefCount([syncId, this]() {
502 actives_.Erase(syncId);
503 });
504 }
505
506 int32_t SyncManager::Compare(uint64_t syncId, int32_t user)
507 {
508 uint64_t inner = static_cast<uint64_t>(user) & 0xFFFFFFFF;
509 return (syncId & USER_MARK) == (inner << MV_BIT);
510 }
511
512 void SyncManager::UpdateSchema(const SyncManager::SyncInfo &syncInfo)
513 {
514 StoreInfo storeInfo;
515 storeInfo.user = syncInfo.user_;
516 storeInfo.bundleName = syncInfo.bundleName_;
517 EventCenter::GetInstance().PostEvent(std::make_unique<CloudEvent>(CloudEvent::GET_SCHEMA, storeInfo));
518 }
519
520 std::map<uint32_t, GeneralStore::BindInfo> SyncManager::GetBindInfos(const StoreMetaData &meta,
521 const std::vector<int32_t> &users, const Database &schemaDatabase)
522 {
523 auto instance = CloudServer::GetInstance();
524 if (instance == nullptr) {
525 ZLOGD("not support cloud sync");
526 return {};
527 }
528 std::map<uint32_t, GeneralStore::BindInfo> bindInfos;
529 for (auto &activeUser : users) {
530 if (activeUser == 0) {
531 continue;
532 }
533 auto cloudDB = instance->ConnectCloudDB(meta.bundleName, activeUser, schemaDatabase);
534 if (cloudDB == nullptr) {
535 ZLOGE("failed, no cloud DB <%{public}d:0x%{public}x %{public}s<->%{public}s>", meta.tokenId, activeUser,
536 Anonymous::Change(schemaDatabase.name).c_str(), Anonymous::Change(schemaDatabase.alias).c_str());
537 return {};
538 }
539 if (meta.storeType >= StoreMetaData::StoreType::STORE_KV_BEGIN &&
540 meta.storeType <= StoreMetaData::StoreType::STORE_KV_END) {
541 bindInfos.insert_or_assign(activeUser, GeneralStore::BindInfo{ std::move(cloudDB), nullptr });
542 continue;
543 }
544 auto assetLoader = instance->ConnectAssetLoader(meta.bundleName, activeUser, schemaDatabase);
545 if (assetLoader == nullptr) {
546 ZLOGE("failed, no cloud DB <%{public}d:0x%{public}x %{public}s<->%{public}s>", meta.tokenId, activeUser,
547 Anonymous::Change(schemaDatabase.name).c_str(), Anonymous::Change(schemaDatabase.alias).c_str());
548 return {};
549 }
550 bindInfos.insert_or_assign(activeUser, GeneralStore::BindInfo{ std::move(cloudDB), std::move(assetLoader) });
551 }
552 return bindInfos;
553 }
554
555 AutoCache::Store SyncManager::GetStore(const StoreMetaData &meta, int32_t user, bool mustBind)
556 {
557 if (user != 0 && !Account::GetInstance()->IsVerified(user)) {
558 ZLOGW("user:%{public}d is locked!", user);
559 return nullptr;
560 }
561 auto instance = CloudServer::GetInstance();
562 if (instance == nullptr) {
563 ZLOGD("not support cloud sync");
564 return nullptr;
565 }
566 auto store = AutoCache::GetInstance().GetStore(meta, {});
567 if (store == nullptr) {
568 ZLOGE("store null, storeId:%{public}s", meta.GetStoreAlias().c_str());
569 return nullptr;
570 }
571 if (!store->IsBound()) {
572 std::vector<int32_t> users{};
573 CloudInfo info;
574 if (user == 0) {
575 AccountDelegate::GetInstance()->QueryForegroundUsers(users);
576 } else {
577 users.push_back(user);
578 }
579 if (!users.empty()) {
580 info.user = users[0];
581 }
582 SchemaMeta schemaMeta;
583 std::string schemaKey = info.GetSchemaKey(meta.bundleName, meta.instanceId);
584 if (!MetaDataManager::GetInstance().LoadMeta(schemaKey, schemaMeta, true)) {
585 ZLOGE("failed, no schema bundleName:%{public}s, storeId:%{public}s", meta.bundleName.c_str(),
586 meta.GetStoreAlias().c_str());
587 return nullptr;
588 }
589 auto dbMeta = schemaMeta.GetDataBase(meta.storeId);
590 std::map<uint32_t, GeneralStore::BindInfo> bindInfos = GetBindInfos(meta, users, dbMeta);
591 if (mustBind && bindInfos.size() != users.size()) {
592 return nullptr;
593 }
594 GeneralStore::CloudConfig config;
595 if (MetaDataManager::GetInstance().LoadMeta(info.GetKey(), info, true)) {
596 config.maxNumber = info.maxNumber;
597 config.maxSize = info.maxSize;
598 }
599 store->Bind(dbMeta, bindInfos, config);
600 }
601 return store;
602 }
603
604 void SyncManager::Report(const ReportParam &reportParam)
605 {
606 auto cloudReport = CloudReport::GetInstance();
607 if (cloudReport == nullptr) {
608 return;
609 }
610 cloudReport->Report(reportParam);
611 }
612
613 SyncManager::TraceIds SyncManager::GetPrepareTraceId(const SyncInfo &info, const CloudInfo &cloud)
614 {
615 TraceIds traceIds;
616 if (!info.prepareTraceId_.empty()) {
617 traceIds.emplace(info.bundleName_, info.prepareTraceId_);
618 return traceIds;
619 }
620 auto cloudReport = CloudReport::GetInstance();
621 if (cloudReport == nullptr) {
622 return traceIds;
623 }
624 if (info.bundleName_.empty()) {
625 for (const auto &it : cloud.apps) {
626 traceIds.emplace(it.first, cloudReport->GetPrepareTraceId(info.user_));
627 }
628 } else {
629 traceIds.emplace(info.bundleName_, cloudReport->GetPrepareTraceId(info.user_));
630 }
631 return traceIds;
632 }
633
634 bool SyncManager::NeedGetCloudInfo(CloudInfo &cloud)
635 {
636 return (!MetaDataManager::GetInstance().LoadMeta(cloud.GetKey(), cloud, true) || !cloud.enableCloud) &&
637 DmAdapter::GetInstance().IsNetworkAvailable() && Account::GetInstance()->IsLoginAccount();
638 }
639
640 std::vector<std::tuple<QueryKey, uint64_t>> SyncManager::GetCloudSyncInfo(const SyncInfo &info, CloudInfo &cloud)
641 {
642 std::vector<std::tuple<QueryKey, uint64_t>> cloudSyncInfos;
643 if (NeedGetCloudInfo(cloud)) {
644 ZLOGI("get cloud info from server, user: %{public}d.", cloud.user);
645 auto instance = CloudServer::GetInstance();
646 if (instance == nullptr) {
647 return cloudSyncInfos;
648 }
649 cloud = instance->GetServerInfo(cloud.user, false);
650 if (!cloud.IsValid()) {
651 ZLOGE("cloud is empty, user: %{public}d", cloud.user);
652 return cloudSyncInfos;
653 }
654 if (!MetaDataManager::GetInstance().SaveMeta(cloud.GetKey(), cloud, true)) {
655 ZLOGW("save cloud info fail, user: %{public}d", cloud.user);
656 }
657 }
658 if (info.bundleName_.empty()) {
659 for (const auto &it : cloud.apps) {
660 QueryKey queryKey{ .accountId = cloud.id, .bundleName = it.first, .storeId = "" };
661 cloudSyncInfos.emplace_back(std::make_tuple(queryKey, info.syncId_));
662 }
663 } else {
664 QueryKey queryKey{ .accountId = cloud.id, .bundleName = info.bundleName_, .storeId = "" };
665 cloudSyncInfos.emplace_back(std::make_tuple(queryKey, info.syncId_));
666 }
667 return cloudSyncInfos;
668 }
669
670 void SyncManager::GetLastResults(
671 const std::string &storeId, std::map<SyncId, CloudSyncInfo> &infos, QueryLastResults &results)
672 {
673 for (auto &[key, info] : infos) {
674 if (info.code != -1) {
675 results.insert(std::pair<std::string, CloudSyncInfo>(storeId, info));
676 }
677 }
678 }
679
680 bool SyncManager::NeedSaveSyncInfo(const QueryKey &queryKey)
681 {
682 if (queryKey.accountId.empty()) {
683 return false;
684 }
685 if (std::find(kvApps_.begin(), kvApps_.end(), queryKey.bundleName) != kvApps_.end()) {
686 return false;
687 }
688 return true;
689 }
690
691 int32_t SyncManager::QueryLastSyncInfo(const std::vector<QueryKey> &queryKeys, QueryLastResults &results)
692 {
693 for (const auto &queryKey : queryKeys) {
694 std::string storeId = queryKey.storeId;
695 QueryKey key{ .accountId = queryKey.accountId, .bundleName = queryKey.bundleName, .storeId = "" };
696 lastSyncInfos_.ComputeIfPresent(
697 key, [&storeId, &results](auto &key, std::map<SyncId, CloudSyncInfo> &vals) {
698 GetLastResults(storeId, vals, results);
699 return !vals.empty();
700 });
701 }
702 return SUCCESS;
703 }
704
705 void SyncManager::UpdateStartSyncInfo(const std::vector<std::tuple<QueryKey, uint64_t>> &cloudSyncInfos)
706 {
707 int64_t startTime = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
708 for (const auto &[queryKey, syncId] : cloudSyncInfos) {
709 if (!NeedSaveSyncInfo(queryKey)) {
710 continue;
711 }
712 lastSyncInfos_.Compute(queryKey, [id = syncId, startTime](auto &, std::map<SyncId, CloudSyncInfo> &val) {
713 val[id] = { .startTime = startTime };
714 return !val.empty();
715 });
716 }
717 }
718
719 void SyncManager::UpdateFinishSyncInfo(const QueryKey &queryKey, uint64_t syncId, int32_t code)
720 {
721 if (!NeedSaveSyncInfo(queryKey)) {
722 return;
723 }
724 lastSyncInfos_.ComputeIfPresent(queryKey, [syncId, code](auto &key, std::map<SyncId, CloudSyncInfo> &val) {
725 auto now = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
726 for (auto iter = val.begin(); iter != val.end();) {
727 bool isExpired = ((now - iter->second.startTime) >= EXPIRATION_TIME) && iter->second.code == -1;
728 if ((iter->first != syncId && ((iter->second.code != -1) || isExpired))) {
729 iter = val.erase(iter);
730 } else if (iter->first == syncId) {
731 iter->second.finishTime = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
732 iter->second.code = code;
733 iter++;
734 } else {
735 iter++;
736 }
737 }
738 return true;
739 });
740 }
741
742 std::function<void(const GenDetails &result)> SyncManager::GetCallback(const GenAsync &async,
743 const StoreInfo &storeInfo, int32_t triggerMode, const std::string &prepareTraceId, int32_t user)
744 {
745 return [this, async, storeInfo, triggerMode, prepareTraceId, user](const GenDetails &result) {
746 if (async != nullptr) {
747 async(result);
748 }
749
750 if (result.empty()) {
751 ZLOGE("result is empty");
752 return;
753 }
754
755 if (result.begin()->second.progress != GenProgress::SYNC_FINISH) {
756 return;
757 }
758
759 int32_t dbCode = (result.begin()->second.dbCode == GenStore::DB_ERR_OFFSET) ? 0 : result.begin()->second.dbCode;
760 RadarReporter::Report({ storeInfo.bundleName.c_str(), CLOUD_SYNC, FINISH_SYNC, storeInfo.syncId, triggerMode,
761 result.begin()->second.changeCount, dbCode },
762 "GetCallback", BizState::END);
763 Report({ user, storeInfo.bundleName, prepareTraceId, SyncStage::END, dbCode });
764
765 auto id = GetAccountId(storeInfo.user);
766 if (id.empty()) {
767 ZLOGD("account id is empty");
768 return;
769 }
770 QueryKey queryKey{
771 .accountId = id,
772 .bundleName = storeInfo.bundleName,
773 .storeId = ""
774 };
775
776 int32_t code = result.begin()->second.code;
777 UpdateFinishSyncInfo(queryKey, storeInfo.syncId, code);
778 };
779 }
780
781 std::string SyncManager::GetAccountId(int32_t user)
782 {
783 CloudInfo cloudInfo;
784 cloudInfo.user = user;
785 if (!MetaDataManager::GetInstance().LoadMeta(cloudInfo.GetKey(), cloudInfo, true)) {
786 ZLOGE("not exist meta, user:%{public}d.", cloudInfo.user);
787 return "";
788 }
789 return cloudInfo.id;
790 }
791
792 ExecutorPool::Duration SyncManager::GetInterval(int32_t code)
793 {
794 switch (code) {
795 case E_LOCKED_BY_OTHERS:
796 return LOCKED_INTERVAL;
797 case E_BUSY:
798 return BUSY_INTERVAL;
799 default:
800 return RETRY_INTERVAL;
801 }
802 }
803
804 std::vector<SchemaMeta> SyncManager::GetSchemaMeta(const CloudInfo &cloud, const std::string &bundleName)
805 {
806 std::vector<SchemaMeta> schemas;
807 auto key = cloud.GetSchemaPrefix(bundleName);
808 MetaDataManager::GetInstance().LoadMeta(key, schemas, true);
809 return schemas;
810 }
811
812 void SyncManager::DoExceptionalCallback(const GenAsync &async, GenDetails &details, const StoreInfo &storeInfo,
813 const std::string &prepareTraceId)
814 {
815 if (async) {
816 details[SyncInfo::DEFAULT_ID].code = E_ERROR;
817 async(details);
818 }
819 QueryKey queryKey{ GetAccountId(storeInfo.user), storeInfo.bundleName, "" };
820 UpdateFinishSyncInfo(queryKey, storeInfo.syncId, E_ERROR);
821 Report({ storeInfo.user, storeInfo.bundleName, prepareTraceId, SyncStage::END, E_ERROR });
822 }
823
824 bool SyncManager::InitDefaultUser(int32_t &user)
825 {
826 if (user != 0) {
827 return false;
828 }
829 std::vector<int32_t> users;
830 AccountDelegate::GetInstance()->QueryUsers(users);
831 if (!users.empty()) {
832 user = users[0];
833 }
834 return true;
835 }
836
837 std::function<void(const DistributedData::GenDetails &result)> SyncManager::RetryCallback(const StoreInfo &storeInfo,
838 Retryer retryer, int32_t triggerMode, const std::string &prepareTraceId, int32_t user)
839 {
840 return [this, retryer, storeInfo, triggerMode, prepareTraceId, user](const GenDetails &details) {
841 if (details.empty()) {
842 ZLOGE("retry, details empty");
843 return;
844 }
845 int32_t code = details.begin()->second.code;
846 int32_t dbCode = details.begin()->second.dbCode;
847 if (details.begin()->second.progress == GenProgress::SYNC_FINISH) {
848 QueryKey queryKey{ GetAccountId(storeInfo.user), storeInfo.bundleName, "" };
849 UpdateFinishSyncInfo(queryKey, storeInfo.syncId, code);
850 if (code == E_OK) {
851 RadarReporter::Report({ storeInfo.bundleName.c_str(), CLOUD_SYNC, FINISH_SYNC, storeInfo.syncId,
852 triggerMode, details.begin()->second.changeCount },
853 "RetryCallback", BizState::END);
854 Report({ user, storeInfo.bundleName, prepareTraceId, SyncStage::END,
855 dbCode == GenStore::DB_ERR_OFFSET ? 0 : dbCode });
856 }
857 }
858 retryer(GetInterval(code), code, dbCode, prepareTraceId);
859 };
860 }
861
862 void SyncManager::BatchUpdateFinishState(const std::vector<std::tuple<QueryKey, uint64_t>> &cloudSyncInfos,
863 int32_t code)
864 {
865 for (const auto &[queryKey, syncId] : cloudSyncInfos) {
866 UpdateFinishSyncInfo(queryKey, syncId, code);
867 }
868 }
869
870 void SyncManager::BatchReport(int32_t userId, const TraceIds &traceIds, SyncStage syncStage, int32_t errCode)
871 {
872 for (const auto &[bundle, id] : traceIds) {
873 Report({ userId, bundle, id, syncStage, errCode });
874 }
875 }
876
877 std::pair<bool, StoreMetaData> SyncManager::GetMetaData(const StoreInfo &storeInfo)
878 {
879 StoreMetaData meta(storeInfo);
880 meta.deviceId = DmAdapter::GetInstance().GetLocalDevice().uuid;
881 if (!MetaDataManager::GetInstance().LoadMeta(meta.GetKey(), meta, true)) {
882 meta.user = "0"; // check if it is a public store.
883 StoreMetaDataLocal localMetaData;
884 if (!MetaDataManager::GetInstance().LoadMeta(meta.GetKeyLocal(), localMetaData, true) ||
885 !localMetaData.isPublic || !MetaDataManager::GetInstance().LoadMeta(meta.GetKey(), meta, true)) {
886 ZLOGE("failed, no store meta. bundleName:%{public}s, storeId:%{public}s", meta.bundleName.c_str(),
887 meta.GetStoreAlias().c_str());
888 return { false, meta };
889 }
890 }
891 return { true, meta };
892 }
893 } // namespace OHOS::CloudData