1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 *     http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #define LOG_TAG "DelayNotify"
16 #include "delay_notify.h"
17 #include "logger.h"
18 namespace OHOS::NativeRdb {
19 using namespace OHOS::Rdb;
DelayNotify()20 DelayNotify::DelayNotify() : pauseCount_(0), task_(nullptr), pool_(nullptr)
21 {
22 }
23 
~DelayNotify()24 DelayNotify::~DelayNotify()
25 {
26     if (pool_ == nullptr) {
27         return;
28     }
29     if (delaySyncTaskId_ != Executor::INVALID_TASK_ID) {
30         pool_->Remove(delaySyncTaskId_);
31     }
32     if (task_ != nullptr && changedData_.tableData.size() > 0) {
33         DistributedRdb::RdbNotifyConfig rdbNotifyConfig;
34         rdbNotifyConfig.delay_ = 0;
35         rdbNotifyConfig.isFull_ = isFull_;
36         auto errCode = task_(changedData_, rdbNotifyConfig);
37         if (errCode != 0) {
38             LOG_ERROR("NotifyDataChange is failed, err is %{public}d.", errCode);
39         }
40     }
41 }
42 
UpdateNotify(const DistributedRdb::RdbChangedData &changedData, bool isFull)43 void DelayNotify::UpdateNotify(const DistributedRdb::RdbChangedData &changedData, bool isFull)
44 {
45     LOG_DEBUG("Update changed data.");
46     {
47         std::lock_guard<std::mutex> lock(mutex_);
48         for (auto& [k, v] : changedData.tableData) {
49             if (!v.isTrackedDataChange) {
50                 continue;
51             }
52             auto it = changedData_.tableData.find(k);
53             if (it == changedData_.tableData.end()) {
54                 changedData_.tableData.insert_or_assign(k, v);
55             }
56         }
57         isFull_ |= isFull;
58     }
59     StartTimer();
60 }
61 
SetExecutorPool(std::shared_ptr<ExecutorPool> pool)62 void DelayNotify::SetExecutorPool(std::shared_ptr<ExecutorPool> pool)
63 {
64     if (pool_ != nullptr) {
65         return;
66     }
67     pool_ = pool;
68 }
69 
SetTask(Task task)70 void DelayNotify::SetTask(Task task)
71 {
72     task_ = std::move(task);
73 }
74 
StartTimer()75 void DelayNotify::StartTimer()
76 {
77     DistributedRdb::RdbChangedData changedData;
78     bool needExecTask = false;
79     bool isFull = false;
80     {
81         std::lock_guard<std::mutex> lock(mutex_);
82         changedData.tableData = changedData_.tableData;
83         isFull = isFull_;
84         if (pool_ == nullptr) {
85             return;
86         }
87 
88         if (delaySyncTaskId_ == Executor::INVALID_TASK_ID) {
89             delaySyncTaskId_ = pool_->Schedule(std::chrono::milliseconds(autoSyncInterval_),
90                 [this]() { ExecuteTask(); });
91         } else {
92             delaySyncTaskId_ =
93                 pool_->Reset(delaySyncTaskId_, std::chrono::milliseconds(autoSyncInterval_));
94         }
95 
96         if (autoSyncInterval_ == AUTO_SYNC_INTERVAL || changedData.tableData.empty()) {
97             return;
98         }
99 
100         if (!isInitialized_) {
101             needExecTask = true;
102             lastTimePoint_ = std::chrono::steady_clock::now();
103             isInitialized_ = true;
104         } else {
105             Time curTime = std::chrono::steady_clock::now();
106             auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(curTime - lastTimePoint_);
107             if (duration >= std::chrono::milliseconds(MAX_NOTIFY_INTERVAL)) {
108                 needExecTask = true;
109                 lastTimePoint_ = std::chrono::steady_clock::now();
110             }
111         }
112     }
113 
114     if (needExecTask) {
115         DistributedRdb::RdbNotifyConfig rdbNotifyConfig;
116         rdbNotifyConfig.delay_ = SERVICE_INTERVAL;
117         rdbNotifyConfig.isFull_ = isFull;
118         task_(changedData, rdbNotifyConfig);
119     }
120 }
121 
StopTimer()122 void DelayNotify::StopTimer()
123 {
124     if (pool_ != nullptr) {
125         pool_->Remove(delaySyncTaskId_);
126     }
127     delaySyncTaskId_ = Executor::INVALID_TASK_ID;
128 }
129 
ExecuteTask()130 void DelayNotify::ExecuteTask()
131 {
132     LOG_DEBUG("Notify data change.");
133     DistributedRdb::RdbChangedData changedData;
134     bool isFull = false;
135     {
136         std::lock_guard<std::mutex> lock(mutex_);
137         changedData.tableData = std::move(changedData_.tableData);
138         isFull = isFull_;
139         RestoreDefaultSyncInterval();
140         StopTimer();
141         isFull_ = false;
142     }
143     if (task_ != nullptr && (changedData.tableData.size() > 0 || isFull)) {
144         DistributedRdb::RdbNotifyConfig rdbNotifyConfig;
145         rdbNotifyConfig.delay_ = 0;
146         rdbNotifyConfig.isFull_ = isFull;
147         int errCode = task_(changedData, rdbNotifyConfig);
148         if (errCode != 0) {
149             LOG_ERROR("NotifyDataChange is failed, err is %{public}d.", errCode);
150             std::lock_guard<std::mutex> lock(mutex_);
151             for (auto& [k, v] : changedData.tableData) {
152                 changedData_.tableData.insert_or_assign(k, v);
153             }
154             return;
155         }
156     }
157 }
158 
SetAutoSyncInterval(uint32_t interval)159 void DelayNotify::SetAutoSyncInterval(uint32_t interval)
160 {
161     autoSyncInterval_ = interval;
162 }
163 
RestoreDefaultSyncInterval()164 void DelayNotify::RestoreDefaultSyncInterval()
165 {
166     autoSyncInterval_ = AUTO_SYNC_INTERVAL;
167 }
168 
Pause()169 void DelayNotify::Pause()
170 {
171     StopTimer();
172     pauseCount_.fetch_add(1, std::memory_order_relaxed);
173 }
174 
Resume()175 void DelayNotify::Resume()
176 {
177     pauseCount_.fetch_sub(1, std::memory_order_relaxed);
178     if (pauseCount_.load() == 0) {
179         StartTimer();
180     }
181 }
182 
PauseDelayNotify(std::shared_ptr<DelayNotify> delayNotifier)183 PauseDelayNotify::PauseDelayNotify(std::shared_ptr<DelayNotify> delayNotifier) : delayNotifier_(delayNotifier)
184 {
185     if (delayNotifier_ != nullptr) {
186         delayNotifier_->Pause();
187         delayNotifier_->SetAutoSyncInterval(AUTO_SYNC_MAX_INTERVAL);
188     }
189 }
190 
~PauseDelayNotify()191 PauseDelayNotify::~PauseDelayNotify()
192 {
193     if (delayNotifier_ != nullptr) {
194         delayNotifier_->Resume();
195     }
196 }
197 }