19c6d7c21Sopenharmony_ci/* 29c6d7c21Sopenharmony_ci * Copyright (c) 2020 Huawei Device Co., Ltd. 39c6d7c21Sopenharmony_ci * Licensed under the Apache License, Version 2.0 (the "License"); 49c6d7c21Sopenharmony_ci * you may not use this file except in compliance with the License. 59c6d7c21Sopenharmony_ci * You may obtain a copy of the License at 69c6d7c21Sopenharmony_ci * 79c6d7c21Sopenharmony_ci * http://www.apache.org/licenses/LICENSE-2.0 89c6d7c21Sopenharmony_ci * 99c6d7c21Sopenharmony_ci * Unless required by applicable law or agreed to in writing, software 109c6d7c21Sopenharmony_ci * distributed under the License is distributed on an "AS IS" BASIS, 119c6d7c21Sopenharmony_ci * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 129c6d7c21Sopenharmony_ci * See the License for the specific language governing permissions and 139c6d7c21Sopenharmony_ci * limitations under the License. 149c6d7c21Sopenharmony_ci */ 159c6d7c21Sopenharmony_ci#include "queue_adapter.h" 169c6d7c21Sopenharmony_ci#include <ohos_errno.h> 179c6d7c21Sopenharmony_ci#include <pthread.h> 189c6d7c21Sopenharmony_ci#include "memory_adapter.h" 199c6d7c21Sopenharmony_ci#include "lock_free_queue.h" 209c6d7c21Sopenharmony_ci 219c6d7c21Sopenharmony_citypedef struct LockFreeBlockQueue LockFreeBlockQueue; 229c6d7c21Sopenharmony_cistruct LockFreeBlockQueue { 239c6d7c21Sopenharmony_ci pthread_mutex_t wMutex; 249c6d7c21Sopenharmony_ci pthread_mutex_t rMutex; 259c6d7c21Sopenharmony_ci pthread_cond_t cond; 269c6d7c21Sopenharmony_ci LockFreeQueue *queue; 279c6d7c21Sopenharmony_ci}; 289c6d7c21Sopenharmony_ci 299c6d7c21Sopenharmony_ciMQueueId QUEUE_Create(const char *name, int size, int count) 309c6d7c21Sopenharmony_ci{ 319c6d7c21Sopenharmony_ci LockFreeBlockQueue *queue = (LockFreeBlockQueue *)SAMGR_Malloc(sizeof(LockFreeBlockQueue)); 329c6d7c21Sopenharmony_ci if (queue == NULL) { 339c6d7c21Sopenharmony_ci return NULL; 349c6d7c21Sopenharmony_ci } 359c6d7c21Sopenharmony_ci queue->queue = LFQUE_Create(size, count); 369c6d7c21Sopenharmony_ci if (queue->queue == NULL) { 379c6d7c21Sopenharmony_ci SAMGR_Free(queue); 389c6d7c21Sopenharmony_ci return NULL; 399c6d7c21Sopenharmony_ci } 409c6d7c21Sopenharmony_ci pthread_mutex_init(&queue->wMutex, NULL); 419c6d7c21Sopenharmony_ci pthread_mutex_init(&queue->rMutex, NULL); 429c6d7c21Sopenharmony_ci pthread_cond_init(&queue->cond, NULL); 439c6d7c21Sopenharmony_ci return (MQueueId)queue; 449c6d7c21Sopenharmony_ci} 459c6d7c21Sopenharmony_ci 469c6d7c21Sopenharmony_ciint QUEUE_Put(MQueueId queueId, const void *element, uint8 pri, int timeout) 479c6d7c21Sopenharmony_ci{ 489c6d7c21Sopenharmony_ci if (queueId == NULL || element == NULL || timeout > 0) { 499c6d7c21Sopenharmony_ci return EC_INVALID; 509c6d7c21Sopenharmony_ci } 519c6d7c21Sopenharmony_ci LockFreeBlockQueue *queue = (LockFreeBlockQueue *)queueId; 529c6d7c21Sopenharmony_ci pthread_mutex_lock(&queue->wMutex); 539c6d7c21Sopenharmony_ci int ret = LFQUE_Push(queue->queue, element, pri); 549c6d7c21Sopenharmony_ci pthread_mutex_unlock(&queue->wMutex); 559c6d7c21Sopenharmony_ci pthread_mutex_lock(&queue->rMutex); 569c6d7c21Sopenharmony_ci pthread_cond_broadcast(&queue->cond); 579c6d7c21Sopenharmony_ci pthread_mutex_unlock(&queue->rMutex); 589c6d7c21Sopenharmony_ci return ret; 599c6d7c21Sopenharmony_ci} 609c6d7c21Sopenharmony_ci 619c6d7c21Sopenharmony_ciint QUEUE_Pop(MQueueId queueId, void *element, uint8 *pri, int timeout) 629c6d7c21Sopenharmony_ci{ 639c6d7c21Sopenharmony_ci if (queueId == NULL || element == NULL || timeout > 0) { 649c6d7c21Sopenharmony_ci return EC_INVALID; 659c6d7c21Sopenharmony_ci } 669c6d7c21Sopenharmony_ci 679c6d7c21Sopenharmony_ci LockFreeBlockQueue *queue = (LockFreeBlockQueue *)queueId; 689c6d7c21Sopenharmony_ci pthread_mutex_lock(&queue->rMutex); 699c6d7c21Sopenharmony_ci while (LFQUE_Pop(queue->queue, element, pri) != EC_SUCCESS) { 709c6d7c21Sopenharmony_ci pthread_cond_wait(&queue->cond, &queue->rMutex); 719c6d7c21Sopenharmony_ci } 729c6d7c21Sopenharmony_ci pthread_mutex_unlock(&queue->rMutex); 739c6d7c21Sopenharmony_ci return EC_SUCCESS; 749c6d7c21Sopenharmony_ci} 759c6d7c21Sopenharmony_ci 769c6d7c21Sopenharmony_ciint QUEUE_Destroy(MQueueId queueId) 779c6d7c21Sopenharmony_ci{ 789c6d7c21Sopenharmony_ci if (queueId == NULL) { 799c6d7c21Sopenharmony_ci return EC_INVALID; 809c6d7c21Sopenharmony_ci } 819c6d7c21Sopenharmony_ci 829c6d7c21Sopenharmony_ci LockFreeBlockQueue *queue = (LockFreeBlockQueue *)queueId; 839c6d7c21Sopenharmony_ci pthread_mutex_destroy(&queue->wMutex); 849c6d7c21Sopenharmony_ci pthread_mutex_destroy(&queue->rMutex); 859c6d7c21Sopenharmony_ci pthread_cond_destroy(&queue->cond); 869c6d7c21Sopenharmony_ci SAMGR_Free(queue->queue); 879c6d7c21Sopenharmony_ci SAMGR_Free(queue); 889c6d7c21Sopenharmony_ci return EC_SUCCESS; 899c6d7c21Sopenharmony_ci} 90