1 /*
2  * Copyright (c) 2023-2024 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #define LOG_TAG "SyncManager"
16 #include "sync_manager.h"
17 
18 #include <chrono>
19 
20 #include "account/account_delegate.h"
21 #include "bootstrap.h"
22 #include "checker/checker_manager.h"
23 #include "cloud/cloud_lock_event.h"
24 #include "cloud/cloud_report.h"
25 #include "cloud/cloud_server.h"
26 #include "cloud/schema_meta.h"
27 #include "cloud/sync_event.h"
28 #include "cloud_value_util.h"
29 #include "device_manager_adapter.h"
30 #include "dfx/radar_reporter.h"
31 #include "eventcenter/event_center.h"
32 #include "log_print.h"
33 #include "metadata/meta_data_manager.h"
34 #include "sync_strategies/network_sync_strategy.h"
35 #include "user_delegate.h"
36 #include "utils/anonymous.h"
37 namespace OHOS::CloudData {
38 using namespace DistributedData;
39 using namespace DistributedDataDfx;
40 using namespace DistributedKv;
41 using namespace SharingUtil;
42 using namespace std::chrono;
43 using Account = OHOS::DistributedKv::AccountDelegate;
44 using DmAdapter = OHOS::DistributedData::DeviceManagerAdapter;
45 using Defer = EventCenter::Defer;
// Shared sequence counter; GenerateId() pre-increments it and uses the value as the low bits of every sync id.
std::atomic<uint32_t> SyncManager::genId_ = 0;
SyncInfo( int32_t user, const std::string &bundleName, const Store &store, const Tables &tables, int32_t triggerMode)47 SyncManager::SyncInfo::SyncInfo(
48     int32_t user, const std::string &bundleName, const Store &store, const Tables &tables, int32_t triggerMode)
49     : user_(user), bundleName_(bundleName), triggerMode_(triggerMode)
50 {
51     if (!store.empty()) {
52         tables_[store] = tables;
53     }
54     syncId_ = SyncManager::GenerateId(user);
55 }
56 
SyncInfo(int32_t user, const std::string &bundleName, const Stores &stores)57 SyncManager::SyncInfo::SyncInfo(int32_t user, const std::string &bundleName, const Stores &stores)
58     : user_(user), bundleName_(bundleName)
59 {
60     for (auto &store : stores) {
61         tables_[store] = {};
62     }
63     syncId_ = SyncManager::GenerateId(user);
64 }
65 
SyncInfo(int32_t user, const std::string &bundleName, const MutliStoreTables &tables)66 SyncManager::SyncInfo::SyncInfo(int32_t user, const std::string &bundleName, const MutliStoreTables &tables)
67     : user_(user), bundleName_(bundleName), tables_(tables)
68 {
69     tables_ = tables;
70     syncId_ = SyncManager::GenerateId(user);
71 }
72 
SyncInfo(const Param &param)73 SyncManager::SyncInfo::SyncInfo(const Param &param)
74     : user_(param.user), bundleName_(param.bundleName), triggerMode_(param.triggerMode)
75 {
76     if (!param.store.empty()) {
77         tables_[param.store] = param.tables;
78     }
79     syncId_ = SyncManager::GenerateId(param.user);
80     prepareTraceId_ = param.prepareTraceId;
81 }
82 
SetMode(int32_t mode)83 void SyncManager::SyncInfo::SetMode(int32_t mode)
84 {
85     mode_ = mode;
86 }
87 
SetWait(int32_t wait)88 void SyncManager::SyncInfo::SetWait(int32_t wait)
89 {
90     wait_ = wait;
91 }
92 
SetAsyncDetail(GenAsync asyncDetail)93 void SyncManager::SyncInfo::SetAsyncDetail(GenAsync asyncDetail)
94 {
95     async_ = std::move(asyncDetail);
96 }
97 
SetQuery(std::shared_ptr<GenQuery> query)98 void SyncManager::SyncInfo::SetQuery(std::shared_ptr<GenQuery> query)
99 {
100     query_ = query;
101 }
102 
SetCompensation(bool isCompensation)103 void SyncManager::SyncInfo::SetCompensation(bool isCompensation)
104 {
105     isCompensation_ = isCompensation;
106 }
107 
SetTriggerMode(int32_t triggerMode)108 void SyncManager::SyncInfo::SetTriggerMode(int32_t triggerMode)
109 {
110     triggerMode_ = triggerMode;
111 }
112 
SetError(int32_t code) const113 void SyncManager::SyncInfo::SetError(int32_t code) const
114 {
115     if (async_) {
116         GenDetails details;
117         auto &detail = details[id_];
118         detail.progress = GenProgress::SYNC_FINISH;
119         detail.code = code;
120         async_(std::move(details));
121     }
122 }
123 
GenerateQuery(const std::string &store, const Tables &tables)124 std::shared_ptr<GenQuery> SyncManager::SyncInfo::GenerateQuery(const std::string &store, const Tables &tables)
125 {
126     if (query_ != nullptr) {
127         return query_;
128     }
129     class SyncQuery final : public GenQuery {
130     public:
131         explicit SyncQuery(const std::vector<std::string> &tables) : tables_(tables) {}
132 
133         bool IsEqual(uint64_t tid) override
134         {
135             return false;
136         }
137 
138         std::vector<std::string> GetTables() override
139         {
140             return tables_;
141         }
142 
143     private:
144         std::vector<std::string> tables_;
145     };
146     auto it = tables_.find(store);
147     return std::make_shared<SyncQuery>(it == tables_.end() || it->second.empty() ? tables : it->second);
148 }
149 
Contains(const std::string &storeName)150 bool SyncManager::SyncInfo::Contains(const std::string &storeName)
151 {
152     return tables_.empty() || tables_.find(storeName) != tables_.end();
153 }
154 
GetLockChangeHandler()155 std::function<void(const Event &)> SyncManager::GetLockChangeHandler()
156 {
157     return [](const Event &event) {
158         auto &evt = static_cast<const CloudLockEvent &>(event);
159         auto storeInfo = evt.GetStoreInfo();
160         auto callback = evt.GetCallback();
161         if (callback == nullptr) {
162             ZLOGE("callback is nullptr. bundleName: %{public}s, storeName: %{public}s, user: %{public}d.",
163                 storeInfo.bundleName.c_str(), Anonymous::Change(storeInfo.storeName).c_str(), storeInfo.user);
164             return;
165         }
166         CloudInfo cloud;
167         cloud.user = storeInfo.user;
168         SyncInfo info(storeInfo.user, storeInfo.bundleName);
169         auto code = IsValid(info, cloud);
170         if (code != E_OK) {
171             return;
172         }
173 
174         StoreMetaData meta(storeInfo);
175         meta.deviceId = DmAdapter::GetInstance().GetLocalDevice().uuid;
176         if (!MetaDataManager::GetInstance().LoadMeta(meta.GetKey(), meta, true)) {
177             ZLOGE("not found meta. bundleName: %{public}s, storeName: %{public}s, user: %{public}d.",
178                 storeInfo.bundleName.c_str(), Anonymous::Change(storeInfo.storeName).c_str(), storeInfo.user);
179             return;
180         }
181         auto store = GetStore(meta, storeInfo.user);
182         if (store == nullptr) {
183             ZLOGE("failed to get store. bundleName: %{public}s, storeName: %{public}s, user: %{public}d.",
184                 storeInfo.bundleName.c_str(), Anonymous::Change(storeInfo.storeName).c_str(), storeInfo.user);
185             return;
186         }
187         if (evt.GetEventId() == CloudEvent::LOCK_CLOUD_CONTAINER) {
188             auto [result, expiredTime] = store->LockCloudDB();
189             callback(result, expiredTime);
190         } else {
191             auto result = store->UnLockCloudDB();
192             callback(result, 0);
193         }
194     };
195 }
196 
SyncManager()197 SyncManager::SyncManager()
198 {
199     EventCenter::GetInstance().Subscribe(CloudEvent::LOCK_CLOUD_CONTAINER, GetLockChangeHandler());
200     EventCenter::GetInstance().Subscribe(CloudEvent::UNLOCK_CLOUD_CONTAINER, GetLockChangeHandler());
201     EventCenter::GetInstance().Subscribe(CloudEvent::LOCAL_CHANGE, GetClientChangeHandler());
202     syncStrategy_ = std::make_shared<NetworkSyncStrategy>();
203     auto metaName = Bootstrap::GetInstance().GetProcessLabel();
204     kvApps_.insert(std::move(metaName));
205     auto stores = CheckerManager::GetInstance().GetStaticStores();
206     for (auto &store : stores) {
207         kvApps_.insert(std::move(store.bundleName));
208     }
209     stores = CheckerManager::GetInstance().GetDynamicStores();
210     for (auto &store : stores) {
211         kvApps_.insert(std::move(store.bundleName));
212     }
213 }
214 
~SyncManager()215 SyncManager::~SyncManager()
216 {
217     if (executor_ != nullptr) {
218         actives_.ForEachCopies([this](auto &syncId, auto &taskId) {
219             executor_->Remove(taskId);
220             return false;
221         });
222         executor_ = nullptr;
223     }
224 }
225 
Bind(std::shared_ptr<ExecutorPool> executor)226 int32_t SyncManager::Bind(std::shared_ptr<ExecutorPool> executor)
227 {
228     executor_ = executor;
229     return E_OK;
230 }
231 
DoCloudSync(SyncInfo syncInfo)232 int32_t SyncManager::DoCloudSync(SyncInfo syncInfo)
233 {
234     if (executor_ == nullptr) {
235         return E_NOT_INIT;
236     }
237     auto syncId = GenerateId(syncInfo.user_);
238     auto ref = GenSyncRef(syncId);
239     actives_.Compute(syncId, [this, &ref, &syncInfo](const uint64_t &key, TaskId &taskId) mutable {
240         taskId = executor_->Execute(GetSyncTask(0, true, ref, std::move(syncInfo)));
241         return true;
242     });
243     return E_OK;
244 }
245 
StopCloudSync(int32_t user)246 int32_t SyncManager::StopCloudSync(int32_t user)
247 {
248     if (executor_ == nullptr) {
249         return E_NOT_INIT;
250     }
251     actives_.ForEachCopies([this, user](auto &syncId, auto &taskId) {
252         if (Compare(syncId, user) == 0) {
253             executor_->Remove(taskId);
254         }
255         return false;
256     });
257     return E_OK;
258 }
259 
IsValid(SyncInfo &info, CloudInfo &cloud)260 GeneralError SyncManager::IsValid(SyncInfo &info, CloudInfo &cloud)
261 {
262     if (!MetaDataManager::GetInstance().LoadMeta(cloud.GetKey(), cloud, true) ||
263         (info.id_ != SyncInfo::DEFAULT_ID && cloud.id != info.id_)) {
264         info.SetError(E_CLOUD_DISABLED);
265         ZLOGE("cloudInfo invalid:%{public}d, <syncId:%{public}s, metaId:%{public}s>", cloud.IsValid(),
266             Anonymous::Change(info.id_).c_str(), Anonymous::Change(cloud.id).c_str());
267         return E_CLOUD_DISABLED;
268     }
269     if (!cloud.enableCloud || (!info.bundleName_.empty() && !cloud.IsOn(info.bundleName_))) {
270         info.SetError(E_CLOUD_DISABLED);
271         ZLOGD("enable:%{public}d, bundleName:%{public}s", cloud.enableCloud, info.bundleName_.c_str());
272         return E_CLOUD_DISABLED;
273     }
274     if (!DmAdapter::GetInstance().IsNetworkAvailable()) {
275         info.SetError(E_NETWORK_ERROR);
276         ZLOGD("network unavailable");
277         return E_NETWORK_ERROR;
278     }
279     if (!Account::GetInstance()->IsVerified(info.user_)) {
280         info.SetError(E_USER_UNLOCK);
281         ZLOGD("user unverified");
282         return E_ERROR;
283     }
284     return E_OK;
285 }
286 
GetPostEventTask(const std::vector<SchemaMeta> &schemas, CloudInfo &cloud, SyncInfo &info, bool retry, const TraceIds &traceIds)287 std::function<void()> SyncManager::GetPostEventTask(const std::vector<SchemaMeta> &schemas, CloudInfo &cloud,
288     SyncInfo &info, bool retry, const TraceIds &traceIds)
289 {
290     return [this, &cloud, &info, &schemas, retry, &traceIds]() {
291         bool isPostEvent = false;
292         for (auto &schema : schemas) {
293             auto it = traceIds.find(schema.bundleName);
294             if (!cloud.IsOn(schema.bundleName)) {
295                 UpdateFinishSyncInfo({ cloud.id, schema.bundleName, "" }, info.syncId_, E_ERROR);
296                 Report({ cloud.user, schema.bundleName, it == traceIds.end() ? "" : it->second, SyncStage::END,
297                          E_ERROR });
298                 continue;
299             }
300             for (const auto &database : schema.databases) {
301                 if (!info.Contains(database.name)) {
302                     UpdateFinishSyncInfo({ cloud.id, schema.bundleName, "" }, info.syncId_, E_ERROR);
303                     Report({ cloud.user, schema.bundleName, it == traceIds.end() ? "" : it->second, SyncStage::END,
304                              E_ERROR });
305                     continue;
306                 }
307                 StoreInfo storeInfo = { 0, schema.bundleName, database.name, cloud.apps[schema.bundleName].instanceId,
308                     info.user_, "", info.syncId_ };
309                 auto status = syncStrategy_->CheckSyncAction(storeInfo);
310                 if (status != SUCCESS) {
311                     ZLOGW("Verification strategy failed, status:%{public}d. %{public}d:%{public}s:%{public}s", status,
312                         storeInfo.user, storeInfo.bundleName.c_str(), Anonymous::Change(storeInfo.storeName).c_str());
313                     UpdateFinishSyncInfo({ cloud.id, schema.bundleName, "" }, info.syncId_, status);
314                     Report({ cloud.user, schema.bundleName, it == traceIds.end() ? "" : it->second, SyncStage::END,
315                         status });
316                     info.SetError(status);
317                     continue;
318                 }
319                 auto query = info.GenerateQuery(database.name, database.GetTableNames());
320                 SyncParam syncParam = { info.mode_, info.wait_, info.isCompensation_, info.triggerMode_,
321                     it == traceIds.end() ? "" : it->second, cloud.user };
322                 auto evt = std::make_unique<SyncEvent>(std::move(storeInfo),
323                     SyncEvent::EventInfo{ syncParam, retry, std::move(query), info.async_ });
324                 EventCenter::GetInstance().PostEvent(std::move(evt));
325                 isPostEvent = true;
326             }
327         }
328         if (!isPostEvent) {
329             ZLOGE("schema is invalid, user: %{public}d", cloud.user);
330             info.SetError(E_ERROR);
331         }
332     };
333 }
334 
/**
 * Builds the executor task that performs one cloud-sync attempt for syncInfo.
 *
 * @param times     number of attempts already made; incremented before being captured and later
 *                  handed to GetRetryer().
 * @param retry     forwarded to GetPostEventTask() and from there into each posted SyncEvent.
 * @param ref       kept alive inside the task for as long as the task object exists.
 * @param syncInfo  request description; moved into the task.
 * @return the task. It removes the request from activeInfos_, resolves cloud info, validates the
 *         request, makes sure schema meta exists (requesting it once if missing) and finally posts
 *         the sync events while a CLOUD_SYNC handler is installed through Defer.
 */
ExecutorPool::Task SyncManager::GetSyncTask(int32_t times, bool retry, RefCount ref, SyncInfo &&syncInfo)
{
    times++;
    return [this, times, retry, keep = std::move(ref), info = std::move(syncInfo)]() mutable {
        activeInfos_.Erase(info.syncId_);
        // NOTE(review): InitDefaultUser is defined elsewhere; its flag decides below whether the
        // request is re-targeted at user 0 — confirm the exact meaning against its definition.
        bool createdByDefaultUser = InitDefaultUser(info.user_);
        CloudInfo cloud;
        cloud.user = info.user_;

        auto cloudSyncInfos = GetCloudSyncInfo(info, cloud);
        if (cloudSyncInfos.empty()) {
            ZLOGD("get cloud info failed, user: %{public}d.", cloud.user);
            info.SetError(E_CLOUD_DISABLED);
            return;
        }
        auto traceIds = GetPrepareTraceId(info, cloud);
        BatchReport(info.user_, traceIds, SyncStage::PREPARE, E_OK);
        // The start record is written before validation so that a validation failure can be
        // closed out by BatchUpdateFinishState() below.
        UpdateStartSyncInfo(cloudSyncInfos);
        auto code = IsValid(info, cloud);
        if (code != E_OK) {
            BatchUpdateFinishState(cloudSyncInfos, code);
            BatchReport(info.user_, traceIds, SyncStage::END, code);
            return;
        }

        auto retryer = GetRetryer(times, info, cloud.user);
        auto schemas = GetSchemaMeta(cloud, info.bundleName_);
        if (schemas.empty()) {
            // Ask for the schema and look again; if it is still missing, hand over to the retryer
            // and finish this attempt as E_CLOUD_DISABLED.
            UpdateSchema(info);
            schemas = GetSchemaMeta(cloud, info.bundleName_);
            if (schemas.empty()) {
                auto it = traceIds.find(info.bundleName_);
                retryer(RETRY_INTERVAL, E_RETRY_TIMEOUT, E_CLOUD_DISABLED, it == traceIds.end() ? "" : it->second);
                BatchUpdateFinishState(cloudSyncInfos, E_CLOUD_DISABLED);
                BatchReport(info.user_, traceIds, SyncStage::END, E_CLOUD_DISABLED);
                return;
            }
        }
        // Installs the CLOUD_SYNC handler for the lifetime of `defer`, i.e. while the events are posted.
        Defer defer(GetSyncHandler(std::move(retryer)), CloudEvent::CLOUD_SYNC);
        if (createdByDefaultUser) {
            info.user_ = 0;
        }
        // The post task captures cloud/info/schemas/traceIds by reference, so it is run right here.
        auto task = GetPostEventTask(schemas, cloud, info, retry, traceIds);
        task();
    };
}
381 
/**
 * Builds the handler that executes a posted SyncEvent against its store.
 *
 * @param retryer  retry policy handed to RetryCallback() when the event asks for auto retry.
 * @return handler that loads the store meta, opens the store, reports the START stage and calls
 *         store->Sync(). A non-OK status from Sync() is delivered to the async callback, recorded
 *         as a finished sync with E_ERROR and, except for E_NOT_SUPPORT, reported as the END stage.
 */
std::function<void(const Event &)> SyncManager::GetSyncHandler(Retryer retryer)
{
    return [this, retryer](const Event &event) {
        auto &evt = static_cast<const SyncEvent &>(event);
        auto &storeInfo = evt.GetStoreInfo();
        GenAsync async = evt.GetAsyncDetail();
        auto prepareTraceId = evt.GetPrepareTraceId();
        auto user = evt.GetUser();
        // Pre-built "finished" detail used by the failure paths below.
        GenDetails details;
        auto &detail = details[SyncInfo::DEFAULT_ID];
        detail.progress = GenProgress::SYNC_FINISH;
        auto [result, meta] = GetMetaData(storeInfo);
        if (!result) {
            return DoExceptionalCallback(async, details, storeInfo, prepareTraceId);
        }
        auto store = GetStore(meta, storeInfo.user);
        if (store == nullptr) {
            ZLOGE("store null, storeId:%{public}s, prepareTraceId:%{public}s", meta.GetStoreAlias().c_str(),
                prepareTraceId.c_str());
            return DoExceptionalCallback(async, details, storeInfo, prepareTraceId);
        }
        ZLOGI("database:<%{public}d:%{public}s:%{public}s:%{public}s> sync start", storeInfo.user,
            storeInfo.bundleName.c_str(), meta.GetStoreAlias().c_str(), prepareTraceId.c_str());
        RadarReporter::Report(
            { storeInfo.bundleName.c_str(), CLOUD_SYNC, TRIGGER_SYNC, storeInfo.syncId, evt.GetTriggerMode() },
            "GetSyncHandler", BizState::BEGIN);
        Report({ user, storeInfo.bundleName, prepareTraceId, SyncStage::START, E_OK });
        SyncParam syncParam = { evt.GetMode(), evt.GetWait(), evt.IsCompensation(), MODE_DEFAULT, prepareTraceId };
        // NOTE(review): evt.GetQuery() is dereferenced without a null check — confirm that every
        // poster of CLOUD_SYNC events supplies a query.
        // Auto-retry events complete through RetryCallback, all others through GetCallback.
        auto status = store->Sync({ SyncInfo::DEFAULT_ID }, *(evt.GetQuery()),
            evt.AutoRetry() ? RetryCallback(storeInfo, retryer, evt.GetTriggerMode(), prepareTraceId, user)
                            : GetCallback(evt.GetAsyncDetail(), storeInfo, evt.GetTriggerMode(), prepareTraceId, user),
            syncParam);
        if (status != E_OK) {
            if (async) {
                detail.code = status;
                async(std::move(details));
            }
            UpdateFinishSyncInfo({ GetAccountId(storeInfo.user), storeInfo.bundleName, "" }, storeInfo.syncId, E_ERROR);
            // E_NOT_SUPPORT is not reported as a sync failure.
            if (status == GeneralError::E_NOT_SUPPORT) {
                return;
            }
            int32_t errCode = status + GenStore::DB_ERR_OFFSET;
            RadarReporter::Report({ storeInfo.bundleName.c_str(), CLOUD_SYNC, FINISH_SYNC, storeInfo.syncId,
                                  evt.GetTriggerMode(), false, errCode }, "GetSyncHandler", BizState::END);
            Report({ user, storeInfo.bundleName, prepareTraceId, SyncStage::END, errCode });
        }
    };
}
430 
431 std::function<void(const Event &)> SyncManager::GetClientChangeHandler()
432 {
433     return [this](const Event &event) {
434         auto &evt = static_cast<const SyncEvent &>(event);
435         auto store = evt.GetStoreInfo();
436         SyncInfo syncInfo(store.user, store.bundleName, store.storeName);
437         syncInfo.SetMode(evt.GetMode());
438         syncInfo.SetWait(evt.GetWait());
439         syncInfo.SetAsyncDetail(evt.GetAsyncDetail());
440         syncInfo.SetQuery(evt.GetQuery());
441         syncInfo.SetCompensation(evt.IsCompensation());
442         syncInfo.SetTriggerMode(evt.GetTriggerMode());
443         auto times = evt.AutoRetry() ? RETRY_TIMES - CLIENT_RETRY_TIMES : RETRY_TIMES;
444         executor_->Execute(GetSyncTask(times, evt.AutoRetry(), RefCount(), std::move(syncInfo)));
445     };
446 }
447 
/**
 * Builds the retry policy for a sync attempt.
 *
 * @param times     attempts made so far, including the current one.
 * @param syncInfo  request description; each returned closure keeps its own copy.
 * @param user      user id used when reporting.
 * @return a callable (interval, code, dbCode, prepareTraceId) -> bool. Both variants return true
 *         on every path. Once `times` has reached RETRY_TIMES the callable only reports failures;
 *         before that it schedules another GetSyncTask() after `interval` for retryable codes.
 */
SyncManager::Retryer SyncManager::GetRetryer(int32_t times, const SyncInfo &syncInfo, int32_t user)
{
    if (times >= RETRY_TIMES) {
        // Retry budget used up: report the failure, never reschedule.
        return [this, user, info = SyncInfo(syncInfo)](Duration, int32_t code, int32_t dbCode,
                   const std::string &prepareTraceId) mutable {
            if (code == E_OK || code == E_SYNC_TASK_MERGED) {
                return true;
            }
            info.SetError(code);
            RadarReporter::Report({ info.bundleName_.c_str(), CLOUD_SYNC, FINISH_SYNC, info.syncId_, info.triggerMode_,
                                      false, dbCode },
                "GetRetryer", BizState::END);
            // A dbCode equal to DB_ERR_OFFSET is reported as 0.
            Report({ user, info.bundleName_, prepareTraceId, SyncStage::END,
                dbCode == GenStore::DB_ERR_OFFSET ? 0 : dbCode });
            return true;
        };
    }
    return [this, times, user, info = SyncInfo(syncInfo)](Duration interval, int32_t code, int32_t dbCode,
               const std::string &prepareTraceId) mutable {
        if (code == E_OK || code == E_SYNC_TASK_MERGED) {
            return true;
        }
        // These two codes are reported immediately and are not retried.
        if (code == E_NO_SPACE_FOR_ASSET || code == E_RECODE_LIMIT_EXCEEDED) {
            info.SetError(code);
            RadarReporter::Report({ info.bundleName_.c_str(), CLOUD_SYNC, FINISH_SYNC, info.syncId_, info.triggerMode_,
                                      false, dbCode },
                "GetRetryer", BizState::END);
            Report({ user, info.bundleName_, prepareTraceId, SyncStage::END,
                dbCode == GenStore::DB_ERR_OFFSET ? 0 : dbCode });
            return true;
        }

        // Schedule the retry only if activeInfos_ has no entry for this sync id yet. The scheduled
        // task runs under a newly generated id, which is what gets stored in activeInfos_.
        // NOTE(review): `info` is moved into the scheduled task, so a later call of this same
        // closure would operate on a moved-from SyncInfo — confirm it is invoked at most once per retry.
        activeInfos_.ComputeIfAbsent(info.syncId_, [this, times, interval, &info](uint64_t key) mutable {
            auto syncId = GenerateId(info.user_);
            auto ref = GenSyncRef(syncId);
            actives_.Compute(syncId, [this, times, interval, &ref, &info](const uint64_t &key, TaskId &value) mutable {
                value = executor_->Schedule(interval, GetSyncTask(times, true, ref, std::move(info)));
                return true;
            });
            return syncId;
        });
        return true;
    };
}
492 
493 uint64_t SyncManager::GenerateId(int32_t user)
494 {
495     uint64_t syncId = static_cast<uint64_t>(user) & 0xFFFFFFFF;
496     return (syncId << MV_BIT) | (++genId_);
497 }
498 
499 RefCount SyncManager::GenSyncRef(uint64_t syncId)
500 {
501     return RefCount([syncId, this]() {
502         actives_.Erase(syncId);
503     });
504 }
505 
506 int32_t SyncManager::Compare(uint64_t syncId, int32_t user)
507 {
508     uint64_t inner = static_cast<uint64_t>(user) & 0xFFFFFFFF;
509     return (syncId & USER_MARK) == (inner << MV_BIT);
510 }
511 
512 void SyncManager::UpdateSchema(const SyncManager::SyncInfo &syncInfo)
513 {
514     StoreInfo storeInfo;
515     storeInfo.user = syncInfo.user_;
516     storeInfo.bundleName = syncInfo.bundleName_;
517     EventCenter::GetInstance().PostEvent(std::make_unique<CloudEvent>(CloudEvent::GET_SCHEMA, storeInfo));
518 }
519 
520 std::map<uint32_t, GeneralStore::BindInfo> SyncManager::GetBindInfos(const StoreMetaData &meta,
521     const std::vector<int32_t> &users, const Database &schemaDatabase)
522 {
523     auto instance = CloudServer::GetInstance();
524     if (instance == nullptr) {
525         ZLOGD("not support cloud sync");
526         return {};
527     }
528     std::map<uint32_t, GeneralStore::BindInfo> bindInfos;
529     for (auto &activeUser : users) {
530         if (activeUser == 0) {
531             continue;
532         }
533         auto cloudDB = instance->ConnectCloudDB(meta.bundleName, activeUser, schemaDatabase);
534         if (cloudDB == nullptr) {
535             ZLOGE("failed, no cloud DB <%{public}d:0x%{public}x %{public}s<->%{public}s>", meta.tokenId, activeUser,
536                 Anonymous::Change(schemaDatabase.name).c_str(), Anonymous::Change(schemaDatabase.alias).c_str());
537             return {};
538         }
539         if (meta.storeType >= StoreMetaData::StoreType::STORE_KV_BEGIN &&
540             meta.storeType <= StoreMetaData::StoreType::STORE_KV_END) {
541             bindInfos.insert_or_assign(activeUser, GeneralStore::BindInfo{ std::move(cloudDB), nullptr });
542             continue;
543         }
544         auto assetLoader = instance->ConnectAssetLoader(meta.bundleName, activeUser, schemaDatabase);
545         if (assetLoader == nullptr) {
546             ZLOGE("failed, no cloud DB <%{public}d:0x%{public}x %{public}s<->%{public}s>", meta.tokenId, activeUser,
547                 Anonymous::Change(schemaDatabase.name).c_str(), Anonymous::Change(schemaDatabase.alias).c_str());
548             return {};
549         }
550         bindInfos.insert_or_assign(activeUser, GeneralStore::BindInfo{ std::move(cloudDB), std::move(assetLoader) });
551     }
552     return bindInfos;
553 }
554 
/**
 * Returns the cached store for `meta`, binding it to the cloud on first use.
 *
 * @param meta      store meta used to look up the store and its schema.
 * @param user      owning user; 0 means "bind for all foreground users".
 * @param mustBind  when true, fail unless a BindInfo was obtained for every user in the list.
 * @return the store, or nullptr when the user is not verified, cloud sync is unsupported, the
 *         store cannot be opened, the schema meta is missing, or a required binding is incomplete.
 */
AutoCache::Store SyncManager::GetStore(const StoreMetaData &meta, int32_t user, bool mustBind)
{
    if (user != 0 && !Account::GetInstance()->IsVerified(user)) {
        ZLOGW("user:%{public}d is locked!", user);
        return nullptr;
    }
    // The instance is only checked for availability here; connections are made in GetBindInfos().
    auto instance = CloudServer::GetInstance();
    if (instance == nullptr) {
        ZLOGD("not support cloud sync");
        return nullptr;
    }
    auto store = AutoCache::GetInstance().GetStore(meta, {});
    if (store == nullptr) {
        ZLOGE("store null, storeId:%{public}s", meta.GetStoreAlias().c_str());
        return nullptr;
    }
    if (!store->IsBound()) {
        std::vector<int32_t> users{};
        CloudInfo info;
        if (user == 0) {
            AccountDelegate::GetInstance()->QueryForegroundUsers(users);
        } else {
            users.push_back(user);
        }
        // The first user in the list selects which schema and cloud-info records are loaded.
        if (!users.empty()) {
            info.user = users[0];
        }
        SchemaMeta schemaMeta;
        std::string schemaKey = info.GetSchemaKey(meta.bundleName, meta.instanceId);
        if (!MetaDataManager::GetInstance().LoadMeta(schemaKey, schemaMeta, true)) {
            ZLOGE("failed, no schema bundleName:%{public}s, storeId:%{public}s", meta.bundleName.c_str(),
                meta.GetStoreAlias().c_str());
            return nullptr;
        }
        auto dbMeta = schemaMeta.GetDataBase(meta.storeId);
        std::map<uint32_t, GeneralStore::BindInfo> bindInfos = GetBindInfos(meta, users, dbMeta);
        if (mustBind && bindInfos.size() != users.size()) {
            return nullptr;
        }
        // Limits are only overridden when the cloud info can be loaded; otherwise the
        // default-constructed config is used.
        GeneralStore::CloudConfig config;
        if (MetaDataManager::GetInstance().LoadMeta(info.GetKey(), info, true)) {
            config.maxNumber = info.maxNumber;
            config.maxSize = info.maxSize;
        }
        store->Bind(dbMeta, bindInfos, config);
    }
    return store;
}
603 
604 void SyncManager::Report(const ReportParam &reportParam)
605 {
606     auto cloudReport = CloudReport::GetInstance();
607     if (cloudReport == nullptr) {
608         return;
609     }
610     cloudReport->Report(reportParam);
611 }
612 
613 SyncManager::TraceIds SyncManager::GetPrepareTraceId(const SyncInfo &info, const CloudInfo &cloud)
614 {
615     TraceIds traceIds;
616     if (!info.prepareTraceId_.empty()) {
617         traceIds.emplace(info.bundleName_, info.prepareTraceId_);
618         return traceIds;
619     }
620     auto cloudReport = CloudReport::GetInstance();
621     if (cloudReport == nullptr) {
622         return traceIds;
623     }
624     if (info.bundleName_.empty()) {
625         for (const auto &it : cloud.apps) {
626             traceIds.emplace(it.first, cloudReport->GetPrepareTraceId(info.user_));
627         }
628     } else {
629         traceIds.emplace(info.bundleName_, cloudReport->GetPrepareTraceId(info.user_));
630     }
631     return traceIds;
632 }
633 
634 bool SyncManager::NeedGetCloudInfo(CloudInfo &cloud)
635 {
636     return (!MetaDataManager::GetInstance().LoadMeta(cloud.GetKey(), cloud, true) || !cloud.enableCloud) &&
637            DmAdapter::GetInstance().IsNetworkAvailable() && Account::GetInstance()->IsLoginAccount();
638 }
639 
640 std::vector<std::tuple<QueryKey, uint64_t>> SyncManager::GetCloudSyncInfo(const SyncInfo &info, CloudInfo &cloud)
641 {
642     std::vector<std::tuple<QueryKey, uint64_t>> cloudSyncInfos;
643     if (NeedGetCloudInfo(cloud)) {
644         ZLOGI("get cloud info from server, user: %{public}d.", cloud.user);
645         auto instance = CloudServer::GetInstance();
646         if (instance == nullptr) {
647             return cloudSyncInfos;
648         }
649         cloud = instance->GetServerInfo(cloud.user, false);
650         if (!cloud.IsValid()) {
651             ZLOGE("cloud is empty, user: %{public}d", cloud.user);
652             return cloudSyncInfos;
653         }
654         if (!MetaDataManager::GetInstance().SaveMeta(cloud.GetKey(), cloud, true)) {
655             ZLOGW("save cloud info fail, user: %{public}d", cloud.user);
656         }
657     }
658     if (info.bundleName_.empty()) {
659         for (const auto &it : cloud.apps) {
660             QueryKey queryKey{ .accountId = cloud.id, .bundleName = it.first, .storeId = "" };
661             cloudSyncInfos.emplace_back(std::make_tuple(queryKey, info.syncId_));
662         }
663     } else {
664         QueryKey queryKey{ .accountId = cloud.id, .bundleName = info.bundleName_, .storeId = "" };
665         cloudSyncInfos.emplace_back(std::make_tuple(queryKey, info.syncId_));
666     }
667     return cloudSyncInfos;
668 }
669 
670 void SyncManager::GetLastResults(
671     const std::string &storeId, std::map<SyncId, CloudSyncInfo> &infos, QueryLastResults &results)
672 {
673     for (auto &[key, info] : infos) {
674         if (info.code != -1) {
675             results.insert(std::pair<std::string, CloudSyncInfo>(storeId, info));
676         }
677     }
678 }
679 
680 bool SyncManager::NeedSaveSyncInfo(const QueryKey &queryKey)
681 {
682     if (queryKey.accountId.empty()) {
683         return false;
684     }
685     if (std::find(kvApps_.begin(), kvApps_.end(), queryKey.bundleName) != kvApps_.end()) {
686         return false;
687     }
688     return true;
689 }
690 
691 int32_t SyncManager::QueryLastSyncInfo(const std::vector<QueryKey> &queryKeys, QueryLastResults &results)
692 {
693     for (const auto &queryKey : queryKeys) {
694         std::string storeId = queryKey.storeId;
695         QueryKey key{ .accountId = queryKey.accountId, .bundleName = queryKey.bundleName, .storeId = "" };
696         lastSyncInfos_.ComputeIfPresent(
697             key, [&storeId, &results](auto &key, std::map<SyncId, CloudSyncInfo> &vals) {
698                 GetLastResults(storeId, vals, results);
699                 return !vals.empty();
700             });
701     }
702     return SUCCESS;
703 }
704 
705 void SyncManager::UpdateStartSyncInfo(const std::vector<std::tuple<QueryKey, uint64_t>> &cloudSyncInfos)
706 {
707     int64_t startTime = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
708     for (const auto &[queryKey, syncId] : cloudSyncInfos) {
709         if (!NeedSaveSyncInfo(queryKey)) {
710             continue;
711         }
712         lastSyncInfos_.Compute(queryKey, [id = syncId, startTime](auto &, std::map<SyncId, CloudSyncInfo> &val) {
713             val[id] = { .startTime = startTime };
714             return !val.empty();
715         });
716     }
717 }
718 
719 void SyncManager::UpdateFinishSyncInfo(const QueryKey &queryKey, uint64_t syncId, int32_t code)
720 {
721     if (!NeedSaveSyncInfo(queryKey)) {
722         return;
723     }
724     lastSyncInfos_.ComputeIfPresent(queryKey, [syncId, code](auto &key, std::map<SyncId, CloudSyncInfo> &val) {
725         auto now = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
726         for (auto iter = val.begin(); iter != val.end();) {
727             bool isExpired = ((now - iter->second.startTime) >= EXPIRATION_TIME) && iter->second.code == -1;
728             if ((iter->first != syncId && ((iter->second.code != -1) || isExpired))) {
729                 iter = val.erase(iter);
730             } else if (iter->first == syncId) {
731                 iter->second.finishTime = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
732                 iter->second.code = code;
733                 iter++;
734             } else {
735                 iter++;
736             }
737         }
738         return true;
739     });
740 }
741 
742 std::function<void(const GenDetails &result)> SyncManager::GetCallback(const GenAsync &async,
743     const StoreInfo &storeInfo, int32_t triggerMode, const std::string &prepareTraceId, int32_t user)
744 {
745     return [this, async, storeInfo, triggerMode, prepareTraceId, user](const GenDetails &result) {
746         if (async != nullptr) {
747             async(result);
748         }
749 
750         if (result.empty()) {
751             ZLOGE("result is empty");
752             return;
753         }
754 
755         if (result.begin()->second.progress != GenProgress::SYNC_FINISH) {
756             return;
757         }
758 
759         int32_t dbCode = (result.begin()->second.dbCode == GenStore::DB_ERR_OFFSET) ? 0 : result.begin()->second.dbCode;
760         RadarReporter::Report({ storeInfo.bundleName.c_str(), CLOUD_SYNC, FINISH_SYNC, storeInfo.syncId, triggerMode,
761                                   result.begin()->second.changeCount, dbCode },
762             "GetCallback", BizState::END);
763         Report({ user, storeInfo.bundleName, prepareTraceId, SyncStage::END, dbCode });
764 
765         auto id = GetAccountId(storeInfo.user);
766         if (id.empty()) {
767             ZLOGD("account id is empty");
768             return;
769         }
770         QueryKey queryKey{
771             .accountId = id,
772             .bundleName = storeInfo.bundleName,
773             .storeId = ""
774         };
775 
776         int32_t code = result.begin()->second.code;
777         UpdateFinishSyncInfo(queryKey, storeInfo.syncId, code);
778     };
779 }
780 
781 std::string SyncManager::GetAccountId(int32_t user)
782 {
783     CloudInfo cloudInfo;
784     cloudInfo.user = user;
785     if (!MetaDataManager::GetInstance().LoadMeta(cloudInfo.GetKey(), cloudInfo, true)) {
786         ZLOGE("not exist meta, user:%{public}d.", cloudInfo.user);
787         return "";
788     }
789     return cloudInfo.id;
790 }
791 
792 ExecutorPool::Duration SyncManager::GetInterval(int32_t code)
793 {
794     switch (code) {
795         case E_LOCKED_BY_OTHERS:
796             return LOCKED_INTERVAL;
797         case E_BUSY:
798             return BUSY_INTERVAL;
799         default:
800             return RETRY_INTERVAL;
801     }
802 }
803 
804 std::vector<SchemaMeta> SyncManager::GetSchemaMeta(const CloudInfo &cloud, const std::string &bundleName)
805 {
806     std::vector<SchemaMeta> schemas;
807     auto key = cloud.GetSchemaPrefix(bundleName);
808     MetaDataManager::GetInstance().LoadMeta(key, schemas, true);
809     return schemas;
810 }
811 
812 void SyncManager::DoExceptionalCallback(const GenAsync &async, GenDetails &details, const StoreInfo &storeInfo,
813     const std::string &prepareTraceId)
814 {
815     if (async) {
816         details[SyncInfo::DEFAULT_ID].code = E_ERROR;
817         async(details);
818     }
819     QueryKey queryKey{ GetAccountId(storeInfo.user), storeInfo.bundleName, "" };
820     UpdateFinishSyncInfo(queryKey, storeInfo.syncId, E_ERROR);
821     Report({ storeInfo.user, storeInfo.bundleName, prepareTraceId, SyncStage::END, E_ERROR });
822 }
823 
824 bool SyncManager::InitDefaultUser(int32_t &user)
825 {
826     if (user != 0) {
827         return false;
828     }
829     std::vector<int32_t> users;
830     AccountDelegate::GetInstance()->QueryUsers(users);
831     if (!users.empty()) {
832         user = users[0];
833     }
834     return true;
835 }
836 
837 std::function<void(const DistributedData::GenDetails &result)> SyncManager::RetryCallback(const StoreInfo &storeInfo,
838     Retryer retryer, int32_t triggerMode, const std::string &prepareTraceId, int32_t user)
839 {
840     return [this, retryer, storeInfo, triggerMode, prepareTraceId, user](const GenDetails &details) {
841         if (details.empty()) {
842             ZLOGE("retry, details empty");
843             return;
844         }
845         int32_t code = details.begin()->second.code;
846         int32_t dbCode = details.begin()->second.dbCode;
847         if (details.begin()->second.progress == GenProgress::SYNC_FINISH) {
848             QueryKey queryKey{ GetAccountId(storeInfo.user), storeInfo.bundleName, "" };
849             UpdateFinishSyncInfo(queryKey, storeInfo.syncId, code);
850             if (code == E_OK) {
851                 RadarReporter::Report({ storeInfo.bundleName.c_str(), CLOUD_SYNC, FINISH_SYNC, storeInfo.syncId,
852                                           triggerMode, details.begin()->second.changeCount },
853                     "RetryCallback", BizState::END);
854                 Report({ user, storeInfo.bundleName, prepareTraceId, SyncStage::END,
855                     dbCode == GenStore::DB_ERR_OFFSET ? 0 : dbCode });
856             }
857         }
858         retryer(GetInterval(code), code, dbCode, prepareTraceId);
859     };
860 }
861 
862 void SyncManager::BatchUpdateFinishState(const std::vector<std::tuple<QueryKey, uint64_t>> &cloudSyncInfos,
863     int32_t code)
864 {
865     for (const auto &[queryKey, syncId] : cloudSyncInfos) {
866         UpdateFinishSyncInfo(queryKey, syncId, code);
867     }
868 }
869 
870 void SyncManager::BatchReport(int32_t userId, const TraceIds &traceIds, SyncStage syncStage, int32_t errCode)
871 {
872     for (const auto &[bundle, id] : traceIds) {
873         Report({ userId, bundle, id, syncStage, errCode });
874     }
875 }
876 
877 std::pair<bool, StoreMetaData> SyncManager::GetMetaData(const StoreInfo &storeInfo)
878 {
879     StoreMetaData meta(storeInfo);
880     meta.deviceId = DmAdapter::GetInstance().GetLocalDevice().uuid;
881     if (!MetaDataManager::GetInstance().LoadMeta(meta.GetKey(), meta, true)) {
882         meta.user = "0"; // check if it is a public store.
883         StoreMetaDataLocal localMetaData;
884         if (!MetaDataManager::GetInstance().LoadMeta(meta.GetKeyLocal(), localMetaData, true) ||
885             !localMetaData.isPublic || !MetaDataManager::GetInstance().LoadMeta(meta.GetKey(), meta, true)) {
886             ZLOGE("failed, no store meta. bundleName:%{public}s, storeId:%{public}s", meta.bundleName.c_str(),
887                   meta.GetStoreAlias().c_str());
888             return { false, meta };
889         }
890     }
891     return { true, meta };
892 }
893 } // namespace OHOS::CloudData