1/* 2 * Copyright (c) 2021-2022 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16#ifndef CONTACTSDATAABILITY_ASYNC_TASK_H 17#define CONTACTSDATAABILITY_ASYNC_TASK_H 18 19#include <atomic> 20#include <chrono> 21#include <ctime> 22#include <exception> 23#include <iostream> 24#include <memory> 25#include <mutex> 26#include <queue> 27#include <string> 28#include <thread> 29#include <vector> 30 31#include "common.h" 32#include "contacts_database.h" 33#include "contacts_update_helper.h" 34#include "hilog_wrapper.h" 35#include "match_candidate.h" 36 37namespace OHOS { 38namespace Contacts { 39class AsyncItem { 40public: 41 virtual ~AsyncItem() 42 { 43 } 44 45 virtual void Run() = 0; 46}; 47 48class AsyncTaskMutex { 49public: 50 void lock() 51 { 52 while (flag.test_and_set(std::memory_order_acquire)) { 53 } 54 } 55 56 void unlock() 57 { 58 flag.clear(std::memory_order_release); 59 } 60 61private: 62 std::atomic_flag flag = ATOMIC_FLAG_INIT; 63}; 64 65class AsyncTaskQueue { 66public: 67 // single instance 68 static AsyncTaskQueue *Instance() 69 { 70 static AsyncTaskQueue obj; 71 return &obj; 72 } 73 74public: 75 // clear task 76 void Clear() 77 { 78 std::lock_guard<AsyncTaskMutex> lk(mtx); 79 while (que.size() > 0) 80 que.pop(); 81 } 82 83 // que empty 84 bool Empty() const 85 { 86 std::lock_guard<AsyncTaskMutex> lk(mtx); 87 return que.empty(); 88 } 89 90 size_t Size() const 91 { 92 std::lock_guard<AsyncTaskMutex> lk(mtx); 93 return que.size(); 94 } 95 96 size_t GetThreads() const 97 { 98 return threads; 99 } 100 101 bool Push(std::unique_ptr<AsyncItem> &task) 102 { 103 std::lock_guard<AsyncTaskMutex> lk(mtx); 104 if (maxSize > 0 && que.size() >= maxSize) { 105 HILOG_ERROR("AsyncTask maxSize error"); 106 return false; 107 } 108 que.push(task.release()); 109 return true; 110 } 111 112 // startTask 113 void Start(size_t threads = 1, size_t maxSize = 1000000) 114 { 115 if (this->threads > 0) { 116 return; 117 } 118 this->threads = threads; 119 this->maxSize = maxSize; 120 for (size_t i = 0; i < this->threads; i++) { 121 std::thread(std::bind(&AsyncTaskQueue::Run, this)).detach(); 122 } 123 } 124 125public: 126 void Run() 127 { 128 AsyncItem *item = nullptr; 129 while (this->threads > 0) { 130 if (Pop(&item)) { 131 if (item != nullptr) { 132 item->Run(); 133 delete item; 134 item = nullptr; 135 } 136 } else { 137 std::chrono::milliseconds dura(1); 138 std::this_thread::sleep_for(dura); 139 } 140 } 141 } 142 143private: 144 size_t maxSize; 145 size_t threads; 146 mutable AsyncTaskMutex mtx; 147 std::queue<AsyncItem *> que; 148 AsyncTaskQueue() 149 { 150 this->maxSize = 0; 151 this->threads = 0; 152 } 153 154 bool Pop(AsyncItem **item) 155 { 156 std::lock_guard<AsyncTaskMutex> lk(mtx); 157 if (que.empty()) { 158 return false; 159 } 160 *item = que.front(); 161 que.pop(); 162 return true; 163 } 164}; 165 166// impl run 167class AsyncTask : public AsyncItem { 168 std::shared_ptr<OHOS::NativeRdb::RdbStore> store; 169 std::vector<int> rawContactIdVector; 170 bool isDeleted; 171 172public: 173 void Run() 174 { 175 ContactsUpdateHelper contactsUpdateHelper; 176 contactsUpdateHelper.UpdateCallLogByPhoneNum(rawContactIdVector, store, isDeleted); 177 std::shared_ptr<ContactsDataBase> contactsDataBase = ContactsDataBase::GetInstance(); 178 contactsDataBase->InsertMergeData(store, rawContactIdVector); 179 contactsDataBase->MarkMerge(store); 180 } 181 182public: 183 AsyncTask(std::shared_ptr<OHOS::NativeRdb::RdbStore> &store, std::vector<int> &rawContactIdVector, bool isDeleted) 184 { 185 this->store = store; 186 this->rawContactIdVector = rawContactIdVector; 187 this->isDeleted = isDeleted; 188 } 189 190public: 191 AsyncTask() 192 { 193 } 194}; 195 196class AsyncDeleteContactsTask : public AsyncItem { 197 std::vector<OHOS::NativeRdb::ValuesBucket> queryValuesBucket; 198 std::shared_ptr<OHOS::NativeRdb::RdbStore> store; 199 200public: 201 void Run() 202 { 203 std::shared_ptr<ContactsDataBase> contactsDataBase = ContactsDataBase::GetInstance(); 204 contactsDataBase->DeleteRecordInsert(store, queryValuesBucket); 205 } 206 207public: 208 AsyncDeleteContactsTask(std::shared_ptr<OHOS::NativeRdb::RdbStore> &store, 209 std::vector<OHOS::NativeRdb::ValuesBucket> &queryValuesBucket) 210 { 211 this->queryValuesBucket = queryValuesBucket; 212 this->store = store; 213 } 214 215public: 216 AsyncDeleteContactsTask() 217 { 218 } 219}; 220} // namespace Contacts 221} // namespace OHOS 222 223#endif // CONTACTSDATAABILITY_ASYNC_TASK_H 224