1 /*
2  * Copyright (c) 2022 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "utils_work_queue.h"
17 
18 #include <pthread.h>
19 #include <stddef.h>
20 #include <sys/prctl.h>
21 
22 #include "securec.h"
23 
24 #include "utils_dslm_list.h"
25 #include "utils_log.h"
26 #include "utils_mem.h"
27 
/* Life-cycle values stored in WorkQueue.state. */
enum {
    RUN = 0, /* worker thread keeps dequeuing and executing work */
    DIE = 1, /* shutdown requested: worker thread leaves its loop */
};
30 
31 typedef struct WorkQueue {
32     ListHead head;
33     pthread_mutex_t mutex;
34     pthread_cond_t cond;
35     volatile int32_t state;
36     uint32_t capacity;
37     uint32_t size;
38     pthread_t pthreadId;
39     const char *name;
40 } WorkQueue;
41 
42 typedef struct {
43     ListNode linkNode;
44     WorkProcess process; // callback func
45     uint32_t dataLen;
46     uint8_t *dataBuff; // user data ptr
47 } Worker;
48 
WorkQueueThread(void *data)49 static void *WorkQueueThread(void *data)
50 {
51     WorkQueue *queue = (WorkQueue *)data;
52     Worker *worker = NULL;
53 
54 #ifndef L0_MINI
55     prctl(PR_SET_NAME, queue->name, 0, 0, 0);
56 #endif
57 
58     int ret = pthread_mutex_lock(&queue->mutex);
59     if (ret != 0) {
60         SECURITY_LOG_ERROR("pthread_mutex_lock error");
61         return NULL;
62     }
63     while (queue->state == RUN) {
64         while ((IsEmptyList(&queue->head)) && (queue->state == RUN)) {
65             pthread_cond_wait(&queue->cond, &queue->mutex);
66         }
67         // need to check again
68         if (queue->state != RUN) {
69             break;
70         }
71 
72         worker = LIST_ENTRY(queue->head.next, Worker, linkNode);
73         RemoveListNode(&worker->linkNode);
74         queue->size--;
75 
76         ret = pthread_mutex_unlock(&queue->mutex);
77         if (ret != 0) {
78             SECURITY_LOG_ERROR("pthread_mutex_unlock error");
79         }
80         worker->process(worker->dataBuff, worker->dataLen);
81         FREE(worker);
82         ret = pthread_mutex_lock(&queue->mutex);
83         if (ret != 0) {
84             SECURITY_LOG_ERROR("pthread_mutex_lock error");
85             return NULL;
86         }
87     }
88 
89     // now the queue is stopped, just remove the nodes.
90     while (!IsEmptyList(&queue->head)) {
91         worker = LIST_ENTRY(queue->head.next, Worker, linkNode);
92         RemoveListNode(&worker->linkNode);
93         queue->size--;
94         FREE(worker);
95     }
96 
97     ret = pthread_mutex_unlock(&queue->mutex);
98     if (ret != 0) {
99         SECURITY_LOG_ERROR("pthread_mutex_unlock error");
100     }
101     return NULL;
102 }
103 
104 #ifndef L0_MINI
CreateWorkQueue(uint32_t capacity, const char *name)105 WorkQueue *CreateWorkQueue(uint32_t capacity, const char *name)
106 {
107     WorkQueue *queue = MALLOC(sizeof(WorkQueue));
108     if (queue == NULL) {
109         return NULL;
110     }
111     (void)memset_s(queue, sizeof(WorkQueue), 0, sizeof(WorkQueue));
112 
113     InitListHead(&(queue->head));
114     queue->state = RUN;
115     queue->capacity = capacity;
116     queue->size = 0;
117     queue->name = name;
118 
119     int32_t iRet = pthread_mutex_init(&(queue->mutex), NULL);
120     if (iRet != 0) {
121         FREE(queue);
122         return NULL;
123     }
124 
125     iRet = pthread_cond_init(&queue->cond, NULL);
126     if (iRet != 0) {
127         (void)pthread_mutex_destroy(&(queue->mutex));
128         FREE(queue);
129         return NULL;
130     }
131 
132     iRet = pthread_create(&queue->pthreadId, NULL, WorkQueueThread, queue);
133     if (iRet != 0) {
134         (void)pthread_cond_destroy(&(queue->cond));
135         (void)pthread_mutex_destroy(&(queue->mutex));
136         FREE(queue);
137         return NULL;
138     }
139 
140     return queue;
141 }
142 #else
CreateWorkQueue(uint32_t capacity, const char *name)143 WorkQueue *CreateWorkQueue(uint32_t capacity, const char *name)
144 {
145     pthread_attr_t attr;
146     WorkQueue *queue = MALLOC(sizeof(WorkQueue));
147     if (queue == NULL) {
148         return NULL;
149     }
150     (void)memset_s(queue, sizeof(WorkQueue), 0, sizeof(WorkQueue));
151 
152     InitListHead(&(queue->head));
153     queue->state = RUN;
154     queue->capacity = capacity;
155     queue->size = 0;
156     queue->name = name;
157 
158     int32_t iRet = pthread_mutex_init(&(queue->mutex), NULL);
159     if (iRet != 0) {
160         FREE(queue);
161         return NULL;
162     }
163 
164     iRet = pthread_cond_init(&queue->cond, NULL);
165     if (iRet != 0) {
166         (void)pthread_mutex_destroy(&(queue->mutex));
167         FREE(queue);
168         return NULL;
169     }
170 
171     iRet = pthread_attr_init(&attr);
172     if (iRet != 0) {
173         (void)pthread_cond_destroy(&(queue->cond));
174         (void)pthread_mutex_destroy(&(queue->mutex));
175         (void)pthread_attr_destroy(&attr);
176         FREE(queue);
177         return NULL;
178     }
179 
180     iRet = pthread_attr_setstacksize(&attr, 0x10000);
181     if (iRet != 0) {
182         (void)pthread_cond_destroy(&(queue->cond));
183         (void)pthread_mutex_destroy(&(queue->mutex));
184         (void)pthread_attr_destroy(&attr);
185         FREE(queue);
186         return NULL;
187     }
188 
189     iRet = pthread_create(&queue->pthreadId, &attr, WorkQueueThread, queue);
190     if (iRet != 0) {
191         (void)pthread_cond_destroy(&(queue->cond));
192         (void)pthread_mutex_destroy(&(queue->mutex));
193         (void)pthread_attr_destroy(&attr);
194         FREE(queue);
195         return NULL;
196     }
197     (void)pthread_attr_destroy(&attr);
198 
199     return queue;
200 }
201 #endif
202 
/*
 * Stop the worker thread of a queue and release the queue.
 *
 * The queue state is switched to DIE under the mutex and the worker thread is
 * woken; the call then waits for the thread to exit. After a successful join
 * the condition variable and mutex are destroyed and the queue memory is freed,
 * so the queue pointer must not be used afterwards.
 *
 * Returns:
 *   WORK_QUEUE_OK              - thread joined and queue released
 *   WORK_QUEUE_NULL_PTR        - queue is NULL
 *   WORK_QUEUE_THREAD_COND_ERR - waking the worker failed; queue is NOT released
 *   WORK_QUEUE_THREAD_JOIN_ERR - joining the worker failed; queue is NOT released
 */
uint32_t DestroyWorkQueue(WorkQueue *queue)
{
    if (queue == NULL) {
        return WORK_QUEUE_NULL_PTR;
    }

    (void)pthread_mutex_lock(&queue->mutex);
    queue->state = DIE;
    int32_t iRet = pthread_cond_broadcast(&queue->cond);
    (void)pthread_mutex_unlock(&queue->mutex);
    if (iRet != 0) {
        return WORK_QUEUE_THREAD_COND_ERR;
    }

    iRet = pthread_join(queue->pthreadId, NULL);
    if (iRet != 0) {
        return WORK_QUEUE_THREAD_JOIN_ERR;
    }

    // the worker thread has exited, so nothing in this file uses the
    // synchronization objects any more; release them before freeing the queue
    (void)pthread_cond_destroy(&queue->cond);
    (void)pthread_mutex_destroy(&queue->mutex);

    FREE(queue);
    return WORK_QUEUE_OK;
}
226 
/*
 * Append one unit of work to the queue and wake the worker thread.
 *
 * queue:   target queue.
 * process: callback the worker thread invokes as process(data, length).
 * data:    passed through to the callback unchanged; it is neither copied nor
 *          freed here.
 * length:  passed through to the callback unchanged.
 *
 * The state check, the capacity check and the insertion are performed inside
 * one critical section, so concurrent callers cannot push the queue past its
 * capacity and the state cannot change between check and insertion.
 *
 * Returns:
 *   WORK_QUEUE_OK         - work queued
 *   WORK_QUEUE_NULL_PTR   - queue or process is NULL
 *   WORK_QUEUE_STATE_ERR  - queue is not in the RUN state
 *   WORK_QUEUE_FULL       - queue already holds capacity items
 *   WORK_QUEUE_MALLOC_ERR - allocating the work item failed
 */
uint32_t QueueWork(WorkQueue *queue, WorkProcess process, uint8_t *data, uint32_t length)
{
    if ((queue == NULL) || (process == NULL)) {
        return WORK_QUEUE_NULL_PTR;
    }

    uint32_t result = WORK_QUEUE_OK;

    (void)pthread_mutex_lock(&queue->mutex);
    do {
        if (queue->state != RUN) {
            result = WORK_QUEUE_STATE_ERR;
            break;
        }
        if (queue->size >= queue->capacity) {
            result = WORK_QUEUE_FULL;
            break;
        }

        Worker *worker = MALLOC(sizeof(Worker));
        if (worker == NULL) {
            result = WORK_QUEUE_MALLOC_ERR;
            break;
        }
        (void)memset_s(worker, sizeof(Worker), 0, sizeof(Worker));

        InitListHead(&worker->linkNode);
        worker->dataLen = length;
        worker->dataBuff = data;
        worker->process = process;

        AddListNodeBefore(&worker->linkNode, &queue->head);
        queue->size++;
    } while (0);
    (void)pthread_mutex_unlock(&queue->mutex);

    // only wake the worker thread when something was actually queued
    if (result == WORK_QUEUE_OK) {
        (void)pthread_cond_broadcast(&queue->cond);
    }
    return result;
}
258