1/*
2 * Copyright (c) 2021-2022 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 *     http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16#ifndef CONTACTSDATAABILITY_ASYNC_TASK_H
17#define CONTACTSDATAABILITY_ASYNC_TASK_H
18
#include <atomic>
#include <chrono>
#include <ctime>
#include <exception>
#include <iostream>
#include <memory>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <utility>
#include <vector>
30
31#include "common.h"
32#include "contacts_database.h"
33#include "contacts_update_helper.h"
34#include "hilog_wrapper.h"
35#include "match_candidate.h"
36
37namespace OHOS {
38namespace Contacts {
/**
 * Abstract unit of work.
 *
 * AsyncTaskQueue stores items through this interface, calls Run() on them and
 * then destroys them through a base-class pointer, hence the virtual destructor.
 */
class AsyncItem {
public:
    virtual ~AsyncItem() = default;

    /** Executes the work carried by this item. */
    virtual void Run() = 0;
};
47
/**
 * Minimal spin lock built on std::atomic_flag.
 *
 * Provides lock()/try_lock()/unlock(), so it can be used with std::lock_guard,
 * std::unique_lock and std::scoped_lock. The lock is not recursive: locking it
 * twice from the same thread never returns.
 */
class AsyncTaskMutex {
public:
    /** Blocks until the flag is acquired. */
    void lock()
    {
        while (flag.test_and_set(std::memory_order_acquire)) {
            // Give the current holder a chance to run instead of burning the
            // whole time slice while it is descheduled.
            std::this_thread::yield();
        }
    }

    /**
     * Attempts to acquire the flag without waiting.
     *
     * @return true if the lock was acquired by this call, false if it was already held.
     */
    bool try_lock()
    {
        return !flag.test_and_set(std::memory_order_acquire);
    }

    /** Releases the flag; must only be called by the current holder. */
    void unlock()
    {
        flag.clear(std::memory_order_release);
    }

private:
    std::atomic_flag flag = ATOMIC_FLAG_INIT;
};
64
65class AsyncTaskQueue {
66public:
67    // single instance
68    static AsyncTaskQueue *Instance()
69    {
70        static AsyncTaskQueue obj;
71        return &obj;
72    }
73
74public:
75    // clear task
76    void Clear()
77    {
78        std::lock_guard<AsyncTaskMutex> lk(mtx);
79        while (que.size() > 0)
80            que.pop();
81    }
82
83    // que empty
84    bool Empty() const
85    {
86        std::lock_guard<AsyncTaskMutex> lk(mtx);
87        return que.empty();
88    }
89
90    size_t Size() const
91    {
92        std::lock_guard<AsyncTaskMutex> lk(mtx);
93        return que.size();
94    }
95
96    size_t GetThreads() const
97    {
98        return threads;
99    }
100
101    bool Push(std::unique_ptr<AsyncItem> &task)
102    {
103        std::lock_guard<AsyncTaskMutex> lk(mtx);
104        if (maxSize > 0 && que.size() >= maxSize) {
105            HILOG_ERROR("AsyncTask maxSize error");
106            return false;
107        }
108        que.push(task.release());
109        return true;
110    }
111
112    // startTask
113    void Start(size_t threads = 1, size_t maxSize = 1000000)
114    {
115        if (this->threads > 0) {
116            return;
117        }
118        this->threads = threads;
119        this->maxSize = maxSize;
120        for (size_t i = 0; i < this->threads; i++) {
121            std::thread(std::bind(&AsyncTaskQueue::Run, this)).detach();
122        }
123    }
124
125public:
126    void Run()
127    {
128        AsyncItem *item = nullptr;
129        while (this->threads > 0) {
130            if (Pop(&item)) {
131                if (item != nullptr) {
132                    item->Run();
133                    delete item;
134                    item = nullptr;
135                }
136            } else {
137                std::chrono::milliseconds dura(1);
138                std::this_thread::sleep_for(dura);
139            }
140        }
141    }
142
143private:
144    size_t maxSize;
145    size_t threads;
146    mutable AsyncTaskMutex mtx;
147    std::queue<AsyncItem *> que;
148    AsyncTaskQueue()
149    {
150        this->maxSize = 0;
151        this->threads = 0;
152    }
153
154    bool Pop(AsyncItem **item)
155    {
156        std::lock_guard<AsyncTaskMutex> lk(mtx);
157        if (que.empty()) {
158            return false;
159        }
160        *item = que.front();
161        que.pop();
162        return true;
163    }
164};
165
// AsyncItem implementations executed by the AsyncTaskQueue worker threads
167class AsyncTask : public AsyncItem {
168    std::shared_ptr<OHOS::NativeRdb::RdbStore> store;
169    std::vector<int> rawContactIdVector;
170    bool isDeleted;
171
172public:
173    void Run()
174    {
175        ContactsUpdateHelper contactsUpdateHelper;
176        contactsUpdateHelper.UpdateCallLogByPhoneNum(rawContactIdVector, store, isDeleted);
177        std::shared_ptr<ContactsDataBase> contactsDataBase = ContactsDataBase::GetInstance();
178        contactsDataBase->InsertMergeData(store, rawContactIdVector);
179        contactsDataBase->MarkMerge(store);
180    }
181
182public:
183    AsyncTask(std::shared_ptr<OHOS::NativeRdb::RdbStore> &store, std::vector<int> &rawContactIdVector, bool isDeleted)
184    {
185        this->store = store;
186        this->rawContactIdVector = rawContactIdVector;
187        this->isDeleted = isDeleted;
188    }
189
190public:
191    AsyncTask()
192    {
193    }
194};
195
196class AsyncDeleteContactsTask : public AsyncItem {
197    std::vector<OHOS::NativeRdb::ValuesBucket> queryValuesBucket;
198    std::shared_ptr<OHOS::NativeRdb::RdbStore> store;
199
200public:
201    void Run()
202    {
203        std::shared_ptr<ContactsDataBase> contactsDataBase = ContactsDataBase::GetInstance();
204        contactsDataBase->DeleteRecordInsert(store, queryValuesBucket);
205    }
206
207public:
208    AsyncDeleteContactsTask(std::shared_ptr<OHOS::NativeRdb::RdbStore> &store,
209        std::vector<OHOS::NativeRdb::ValuesBucket> &queryValuesBucket)
210    {
211        this->queryValuesBucket = queryValuesBucket;
212        this->store = store;
213    }
214
215public:
216    AsyncDeleteContactsTask()
217    {
218    }
219};
220} // namespace Contacts
221} // namespace OHOS
222
223#endif // CONTACTSDATAABILITY_ASYNC_TASK_H
224