1 /*
2 * Copyright (c) 2022 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "utils_work_queue.h"
17
18 #include <pthread.h>
19 #include <stddef.h>
20 #include <sys/prctl.h>
21
22 #include "securec.h"
23
24 #include "utils_dslm_list.h"
25 #include "utils_log.h"
26 #include "utils_mem.h"
27
/* Values stored in WorkQueue.state: RUN is set when the queue is created, DIE when it is being destroyed. */
#define RUN 0
#define DIE 1
30
// A queue of pending work items served by one thread running WorkQueueThread.
typedef struct WorkQueue {
    ListHead head;          // list of pending Worker items, linked through Worker.linkNode
    pthread_mutex_t mutex;  // locked around list insertion/removal and around the change of state to DIE
    pthread_cond_t cond;    // broadcast after work is queued and when the queue is being destroyed
    volatile int32_t state; // RUN or DIE
    uint32_t capacity;      // QueueWork rejects new work once size has reached this value
    uint32_t size;          // number of Worker items currently linked into head
    pthread_t pthreadId;    // the serving thread; joined by DestroyWorkQueue
    const char *name;       // pointer stored as given to CreateWorkQueue (not copied); passed to
                            // prctl(PR_SET_NAME) by the thread unless L0_MINI is defined
} WorkQueue;
41
// One queued work item: a callback together with the two arguments it will be called with.
typedef struct {
    ListNode linkNode;   // links this item into WorkQueue.head
    WorkProcess process; // callback func, invoked as process(dataBuff, dataLen)
    uint32_t dataLen;    // length value handed to the callback unchanged
    uint8_t *dataBuff;   // user data ptr; stored as given, neither copied nor freed by the queue code
} Worker;
48
WorkQueueThread(void *data)49 static void *WorkQueueThread(void *data)
50 {
51 WorkQueue *queue = (WorkQueue *)data;
52 Worker *worker = NULL;
53
54 #ifndef L0_MINI
55 prctl(PR_SET_NAME, queue->name, 0, 0, 0);
56 #endif
57
58 int ret = pthread_mutex_lock(&queue->mutex);
59 if (ret != 0) {
60 SECURITY_LOG_ERROR("pthread_mutex_lock error");
61 return NULL;
62 }
63 while (queue->state == RUN) {
64 while ((IsEmptyList(&queue->head)) && (queue->state == RUN)) {
65 pthread_cond_wait(&queue->cond, &queue->mutex);
66 }
67 // need to check again
68 if (queue->state != RUN) {
69 break;
70 }
71
72 worker = LIST_ENTRY(queue->head.next, Worker, linkNode);
73 RemoveListNode(&worker->linkNode);
74 queue->size--;
75
76 ret = pthread_mutex_unlock(&queue->mutex);
77 if (ret != 0) {
78 SECURITY_LOG_ERROR("pthread_mutex_unlock error");
79 }
80 worker->process(worker->dataBuff, worker->dataLen);
81 FREE(worker);
82 ret = pthread_mutex_lock(&queue->mutex);
83 if (ret != 0) {
84 SECURITY_LOG_ERROR("pthread_mutex_lock error");
85 return NULL;
86 }
87 }
88
89 // now the queue is stopped, just remove the nodes.
90 while (!IsEmptyList(&queue->head)) {
91 worker = LIST_ENTRY(queue->head.next, Worker, linkNode);
92 RemoveListNode(&worker->linkNode);
93 queue->size--;
94 FREE(worker);
95 }
96
97 ret = pthread_mutex_unlock(&queue->mutex);
98 if (ret != 0) {
99 SECURITY_LOG_ERROR("pthread_mutex_unlock error");
100 }
101 return NULL;
102 }
103
104 #ifndef L0_MINI
CreateWorkQueue(uint32_t capacity, const char *name)105 WorkQueue *CreateWorkQueue(uint32_t capacity, const char *name)
106 {
107 WorkQueue *queue = MALLOC(sizeof(WorkQueue));
108 if (queue == NULL) {
109 return NULL;
110 }
111 (void)memset_s(queue, sizeof(WorkQueue), 0, sizeof(WorkQueue));
112
113 InitListHead(&(queue->head));
114 queue->state = RUN;
115 queue->capacity = capacity;
116 queue->size = 0;
117 queue->name = name;
118
119 int32_t iRet = pthread_mutex_init(&(queue->mutex), NULL);
120 if (iRet != 0) {
121 FREE(queue);
122 return NULL;
123 }
124
125 iRet = pthread_cond_init(&queue->cond, NULL);
126 if (iRet != 0) {
127 (void)pthread_mutex_destroy(&(queue->mutex));
128 FREE(queue);
129 return NULL;
130 }
131
132 iRet = pthread_create(&queue->pthreadId, NULL, WorkQueueThread, queue);
133 if (iRet != 0) {
134 (void)pthread_cond_destroy(&(queue->cond));
135 (void)pthread_mutex_destroy(&(queue->mutex));
136 FREE(queue);
137 return NULL;
138 }
139
140 return queue;
141 }
142 #else
CreateWorkQueue(uint32_t capacity, const char *name)143 WorkQueue *CreateWorkQueue(uint32_t capacity, const char *name)
144 {
145 pthread_attr_t attr;
146 WorkQueue *queue = MALLOC(sizeof(WorkQueue));
147 if (queue == NULL) {
148 return NULL;
149 }
150 (void)memset_s(queue, sizeof(WorkQueue), 0, sizeof(WorkQueue));
151
152 InitListHead(&(queue->head));
153 queue->state = RUN;
154 queue->capacity = capacity;
155 queue->size = 0;
156 queue->name = name;
157
158 int32_t iRet = pthread_mutex_init(&(queue->mutex), NULL);
159 if (iRet != 0) {
160 FREE(queue);
161 return NULL;
162 }
163
164 iRet = pthread_cond_init(&queue->cond, NULL);
165 if (iRet != 0) {
166 (void)pthread_mutex_destroy(&(queue->mutex));
167 FREE(queue);
168 return NULL;
169 }
170
171 iRet = pthread_attr_init(&attr);
172 if (iRet != 0) {
173 (void)pthread_cond_destroy(&(queue->cond));
174 (void)pthread_mutex_destroy(&(queue->mutex));
175 (void)pthread_attr_destroy(&attr);
176 FREE(queue);
177 return NULL;
178 }
179
180 iRet = pthread_attr_setstacksize(&attr, 0x10000);
181 if (iRet != 0) {
182 (void)pthread_cond_destroy(&(queue->cond));
183 (void)pthread_mutex_destroy(&(queue->mutex));
184 (void)pthread_attr_destroy(&attr);
185 FREE(queue);
186 return NULL;
187 }
188
189 iRet = pthread_create(&queue->pthreadId, &attr, WorkQueueThread, queue);
190 if (iRet != 0) {
191 (void)pthread_cond_destroy(&(queue->cond));
192 (void)pthread_mutex_destroy(&(queue->mutex));
193 (void)pthread_attr_destroy(&attr);
194 FREE(queue);
195 return NULL;
196 }
197 (void)pthread_attr_destroy(&attr);
198
199 return queue;
200 }
201 #endif
202
/*
 * Stops the queue's thread and releases the queue.
 *
 * The state is switched to DIE under the mutex and the condition variable is
 * broadcast so that a waiting thread wakes up; the thread is then joined.
 * After a successful join the condition variable and the mutex are destroyed
 * and the queue memory is freed, so the pointer must not be used afterwards.
 *
 * Returns:
 *   WORK_QUEUE_OK              - thread joined and queue released.
 *   WORK_QUEUE_NULL_PTR        - queue is NULL.
 *   WORK_QUEUE_THREAD_COND_ERR - the broadcast failed; the queue is NOT freed.
 *   WORK_QUEUE_THREAD_JOIN_ERR - the join failed; the queue is NOT freed.
 */
uint32_t DestroyWorkQueue(WorkQueue *queue)
{
    if (queue == NULL) {
        return WORK_QUEUE_NULL_PTR;
    }

    (void)pthread_mutex_lock(&queue->mutex);
    queue->state = DIE;
    int32_t iRet = pthread_cond_broadcast(&queue->cond);
    (void)pthread_mutex_unlock(&queue->mutex);
    if (iRet != 0) {
        return WORK_QUEUE_THREAD_COND_ERR;
    }

    iRet = pthread_join(queue->pthreadId, NULL);
    if (iRet != 0) {
        // The thread may still be using the mutex/cond, so nothing is torn down here.
        return WORK_QUEUE_THREAD_JOIN_ERR;
    }

    // The serving thread has exited; release the primitives created by CreateWorkQueue
    // before the memory that holds them goes away.
    // NOTE(review): assumes no other thread is still calling QueueWork on this queue.
    (void)pthread_cond_destroy(&queue->cond);
    (void)pthread_mutex_destroy(&queue->mutex);

    FREE(queue);
    return WORK_QUEUE_OK;
}
226
/*
 * Creates a work item and appends it to the queue.
 * The caller must hold queue->mutex, which makes the state check, the capacity
 * check and the insertion one atomic step with respect to other producers and
 * to the serving thread.
 */
static uint32_t QueueWorkLocked(WorkQueue *queue, WorkProcess process, uint8_t *data, uint32_t length)
{
    if (queue->state != RUN) {
        return WORK_QUEUE_STATE_ERR;
    }
    if (queue->size >= queue->capacity) {
        return WORK_QUEUE_FULL;
    }

    // Allocated after the checks so that the error precedence stays
    // state -> full -> malloc, as callers have seen so far.
    Worker *worker = MALLOC(sizeof(Worker));
    if (worker == NULL) {
        return WORK_QUEUE_MALLOC_ERR;
    }
    (void)memset_s(worker, sizeof(Worker), 0, sizeof(Worker));

    InitListHead(&worker->linkNode);
    worker->dataLen = length;
    worker->dataBuff = data;
    worker->process = process;

    AddListNodeBefore(&worker->linkNode, &queue->head);
    queue->size++;
    return WORK_QUEUE_OK;
}

/*
 * Queues process(data, length) for execution on the queue's thread.
 *
 * queue:   target queue.
 * process: callback to run; must not be NULL.
 * data:    pointer handed to the callback; it is stored as given, not copied.
 * length:  length value handed to the callback.
 *
 * Returns:
 *   WORK_QUEUE_OK         - the item was queued and the condition variable broadcast.
 *   WORK_QUEUE_NULL_PTR   - queue or process is NULL.
 *   WORK_QUEUE_STATE_ERR  - the queue state is not RUN.
 *   WORK_QUEUE_FULL       - the queue already holds `capacity` items.
 *   WORK_QUEUE_MALLOC_ERR - the work item could not be allocated.
 */
uint32_t QueueWork(WorkQueue *queue, WorkProcess process, uint8_t *data, uint32_t length)
{
    if ((queue == NULL) || (process == NULL)) {
        return WORK_QUEUE_NULL_PTR;
    }

    (void)pthread_mutex_lock(&queue->mutex);
    uint32_t result = QueueWorkLocked(queue, process, data, length);
    (void)pthread_mutex_unlock(&queue->mutex);

    if (result == WORK_QUEUE_OK) {
        // Wake the serving thread in case it is waiting on an empty list.
        (void)pthread_cond_broadcast(&queue->cond);
    }
    return result;
}
258