1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef OHOS_DISTRIBUTED_DATA_DATAMGR_SERVICE_RDB_GENERAL_STORE_H
17 #define OHOS_DISTRIBUTED_DATA_DATAMGR_SERVICE_RDB_GENERAL_STORE_H
18 #include <atomic>
19 #include <functional>
20 #include <shared_mutex>
21 
22 #include "concurrent_map.h"
23 #include "metadata/store_meta_data.h"
24 #include "rdb_asset_loader.h"
25 #include "rdb_cloud.h"
26 #include "rdb_store.h"
27 #include "relational_store_delegate.h"
28 #include "relational_store_manager.h"
29 #include "snapshot/snapshot.h"
30 #include "store/general_store.h"
31 #include "store/general_value.h"
32 namespace OHOS::DistributedRdb {
33 class RdbGeneralStore : public DistributedData::GeneralStore {
34 public:
35     using Cursor = DistributedData::Cursor;
36     using GenQuery = DistributedData::GenQuery;
37     using VBucket = DistributedData::VBucket;
38     using VBuckets = DistributedData::VBuckets;
39     using Value = DistributedData::Value;
40     using Values = DistributedData::Values;
41     using StoreMetaData = DistributedData::StoreMetaData;
42     using Database = DistributedData::Database;
43     using GenErr = DistributedData::GeneralError;
44     using RdbStore = OHOS::NativeRdb::RdbStore;
45     using Reference = DistributedData::Reference;
46     using Snapshot = DistributedData::Snapshot;
47     using BindAssets = DistributedData::BindAssets;
48 
49     explicit RdbGeneralStore(const StoreMetaData &meta);
50     ~RdbGeneralStore();
51 
52     static void OnSyncStart(const DistributedData::StoreInfo &storeInfo, uint32_t flag, uint32_t syncMode,
53         uint32_t traceId, uint32_t syncCount);
54     static void OnSyncFinish(const DistributedData::StoreInfo &storeInfo, uint32_t flag, uint32_t syncMode,
55         uint32_t traceId);
56     void SetExecutor(std::shared_ptr<Executor> executor) override;
57     int32_t Bind(Database &database, const std::map<uint32_t, BindInfo> &bindInfos,
58         const CloudConfig &config) override;
59     bool IsBound() override;
60     bool IsValid();
61     int32_t Execute(const std::string &table, const std::string &sql) override;
62     int32_t SetDistributedTables(const std::vector<std::string> &tables, int32_t type,
63 	    const std::vector<Reference> &references) override;
64     int32_t SetTrackerTable(const std::string& tableName, const std::set<std::string>& trackerColNames,
65         const std::string& extendColName, bool isForceUpgrade = false) override;
66     int32_t Insert(const std::string &table, VBuckets &&values) override;
67     int32_t Update(const std::string &table, const std::string &setSql, Values &&values, const std::string &whereSql,
68         Values &&conditions) override;
69     int32_t Replace(const std::string &table, VBucket &&value) override;
70     int32_t Delete(const std::string &table, const std::string &sql, Values &&args) override;
71     std::pair<int32_t, std::shared_ptr<Cursor>> Query(const std::string &table, const std::string &sql,
72         Values &&args) override;
73     std::pair<int32_t, std::shared_ptr<Cursor>> Query(const std::string &table, GenQuery &query) override;
74     int32_t Sync(
75         const Devices &devices, GenQuery &query, DetailAsync async, DistributedData::SyncParam &syncParam) override;
76     std::pair<int32_t, std::shared_ptr<Cursor>> PreSharing(GenQuery &query) override;
77     int32_t Clean(const std::vector<std::string> &devices, int32_t mode, const std::string &tableName) override;
78     int32_t Watch(int32_t origin, Watcher &watcher) override;
79     int32_t Unwatch(int32_t origin, Watcher &watcher) override;
80     int32_t RegisterDetailProgressObserver(DetailAsync async) override;
81     int32_t UnregisterDetailProgressObserver() override;
82     int32_t Close(bool isForce = false) override;
83     int32_t AddRef() override;
84     int32_t Release() override;
85     int32_t BindSnapshots(std::shared_ptr<std::map<std::string, std::shared_ptr<Snapshot>>> bindAssets) override;
86     int32_t MergeMigratedData(const std::string &tableName, VBuckets&& values) override;
87     int32_t CleanTrackerData(const std::string &tableName, int64_t cursor) override;
88     std::vector<std::string> GetWaterVersion(const std::string &deviceId) override;
89     std::pair<int32_t, uint32_t> LockCloudDB() override;
90     int32_t UnLockCloudDB() override;
91 
92 private:
93     RdbGeneralStore(const RdbGeneralStore& rdbGeneralStore);
94     RdbGeneralStore& operator=(const RdbGeneralStore& rdbGeneralStore);
95     using RdbDelegate = DistributedDB::RelationalStoreDelegate;
96     using RdbManager = DistributedDB::RelationalStoreManager;
97     using SyncProcess = DistributedDB::SyncProcess;
98     using DBBriefCB = DistributedDB::SyncStatusCallback;
99     using DBProcessCB = std::function<void(const std::map<std::string, SyncProcess> &processes)>;
100     using TaskId = ExecutorPool::TaskId;
101     using Time = std::chrono::steady_clock::time_point;
102     using SyncId = uint64_t;
103     static GenErr ConvertStatus(DistributedDB::DBStatus status);
104     // GetIntersection and return results in the order of collecter1
105     static std::vector<std::string> GetIntersection(std::vector<std::string> &&syncTables,
106         const std::set<std::string> &localTables);
107     static constexpr inline uint64_t REMOTE_QUERY_TIME_OUT = 30 * 1000;
108     static constexpr int64_t INTERVAL = 1;
109     static constexpr const char* CLOUD_GID = "cloud_gid";
110     static constexpr const char* DATE_KEY = "data_key";
111     static constexpr const char* QUERY_TABLES_SQL = "select name from sqlite_master where type = 'table';";
112     static constexpr uint32_t ITER_V0 = 10000;
113     static constexpr uint32_t ITER_V1 = 5000;
114     static constexpr uint32_t ITERS[] = {ITER_V0, ITER_V1};
115     static constexpr uint32_t ITERS_COUNT = sizeof(ITERS) / sizeof(ITERS[0]);
116     class ObserverProxy : public DistributedDB::StoreObserver {
117     public:
118         using DBChangedIF = DistributedDB::StoreChangedData;
119         using DBChangedData = DistributedDB::ChangedData;
120         using DBOrigin = DistributedDB::Origin;
121         using GenOrigin = Watcher::Origin;
122         void OnChange(const DistributedDB::StoreChangedData &data) override;
123         void OnChange(DBOrigin origin, const std::string &originalId, DBChangedData &&data) override;
HasWatcher() const124         bool HasWatcher() const
125         {
126             return watcher_ != nullptr;
127         }
128     private:
129         enum ChangeType {
130             CLOUD_DATA_CHANGE = 0,
131             CLOUD_DATA_CLEAN
132         };
133         void PostDataChange(const StoreMetaData &meta, const std::vector<std::string> &tables, ChangeType type);
134         friend RdbGeneralStore;
135         Watcher *watcher_ = nullptr;
136         std::string storeId_;
137         StoreMetaData meta_;
138     };
139     DBBriefCB GetDBBriefCB(DetailAsync async);
140     DBProcessCB GetCB(SyncId syncId);
141     DBProcessCB GetDBProcessCB(DetailAsync async, uint32_t syncMode, SyncId syncId,
142         uint32_t highMode = AUTO_SYNC_MODE);
143     Executor::Task GetFinishTask(SyncId syncId);
144     std::shared_ptr<Cursor> RemoteQuery(const std::string &device,
145         const DistributedDB::RemoteCondition &remoteCondition);
146     std::string BuildSql(const std::string& table, const std::string& statement,
147         const std::vector<std::string>& columns) const;
148     std::pair<int32_t, VBuckets> QuerySql(const std::string& sql, Values &&args);
149     std::set<std::string> GetTables();
150     VBuckets ExtractExtend(VBuckets& values) const;
151     size_t SqlConcatenate(VBucket &value, std::string &strColumnSql, std::string &strRowValueSql);
152     bool IsPrintLog(DistributedDB::DBStatus status);
153     std::shared_ptr<RdbCloud> GetRdbCloud() const;
154     bool IsFinished(uint64_t syncId) const;
155     void RemoveTasks();
156 
157     ObserverProxy observer_;
158     RdbManager manager_;
159     RdbDelegate *delegate_ = nullptr;
160     DetailAsync async_ = nullptr;
161     std::shared_ptr<RdbCloud> rdbCloud_ {};
162     std::shared_ptr<RdbAssetLoader> rdbLoader_ {};
163     BindInfo bindInfo_;
164     std::atomic<bool> isBound_ = false;
165     std::mutex mutex_;
166     int32_t ref_ = 1;
167     mutable std::shared_timed_mutex rwMutex_;
168 
169     BindAssets snapshots_;
170     DistributedData::StoreInfo storeInfo_;
171 
172     DistributedDB::DBStatus lastError_ = DistributedDB::DBStatus::OK;
173     static constexpr uint32_t PRINT_ERROR_CNT = 150;
174     uint32_t lastErrCnt_ = 0;
175     uint32_t syncNotifyFlag_ = 0;
176     std::atomic<SyncId> syncTaskId_ = 0;
177     std::shared_mutex asyncMutex_ {};
178     mutable std::shared_mutex rdbCloudMutex_;
179     struct FinishTask {
180         TaskId taskId = Executor::INVALID_TASK_ID;
181         DBProcessCB cb = nullptr;
182     };
183     std::shared_ptr<Executor> executor_ = nullptr;
184     std::shared_ptr<ConcurrentMap<SyncId, FinishTask>> tasks_;
185 };
186 } // namespace OHOS::DistributedRdb
187 #endif // OHOS_DISTRIBUTED_DATA_DATAMGR_SERVICE_RDB_GENERAL_STORE_H
188