19c6d7c21Sopenharmony_ci/*
29c6d7c21Sopenharmony_ci * Copyright (c) 2020 Huawei Device Co., Ltd.
39c6d7c21Sopenharmony_ci * Licensed under the Apache License, Version 2.0 (the "License");
49c6d7c21Sopenharmony_ci * you may not use this file except in compliance with the License.
59c6d7c21Sopenharmony_ci * You may obtain a copy of the License at
69c6d7c21Sopenharmony_ci *
79c6d7c21Sopenharmony_ci *     http://www.apache.org/licenses/LICENSE-2.0
89c6d7c21Sopenharmony_ci *
99c6d7c21Sopenharmony_ci * Unless required by applicable law or agreed to in writing, software
109c6d7c21Sopenharmony_ci * distributed under the License is distributed on an "AS IS" BASIS,
119c6d7c21Sopenharmony_ci * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
129c6d7c21Sopenharmony_ci * See the License for the specific language governing permissions and
139c6d7c21Sopenharmony_ci * limitations under the License.
149c6d7c21Sopenharmony_ci */
159c6d7c21Sopenharmony_ci#include "queue_adapter.h"
169c6d7c21Sopenharmony_ci#include <ohos_errno.h>
179c6d7c21Sopenharmony_ci#include <pthread.h>
189c6d7c21Sopenharmony_ci#include "memory_adapter.h"
199c6d7c21Sopenharmony_ci#include "lock_free_queue.h"
209c6d7c21Sopenharmony_ci
219c6d7c21Sopenharmony_citypedef struct LockFreeBlockQueue LockFreeBlockQueue;
229c6d7c21Sopenharmony_cistruct LockFreeBlockQueue {
239c6d7c21Sopenharmony_ci    pthread_mutex_t wMutex;
249c6d7c21Sopenharmony_ci    pthread_mutex_t rMutex;
259c6d7c21Sopenharmony_ci    pthread_cond_t cond;
269c6d7c21Sopenharmony_ci    LockFreeQueue *queue;
279c6d7c21Sopenharmony_ci};
289c6d7c21Sopenharmony_ci
299c6d7c21Sopenharmony_ciMQueueId QUEUE_Create(const char *name, int size, int count)
309c6d7c21Sopenharmony_ci{
319c6d7c21Sopenharmony_ci    LockFreeBlockQueue *queue = (LockFreeBlockQueue *)SAMGR_Malloc(sizeof(LockFreeBlockQueue));
329c6d7c21Sopenharmony_ci    if (queue == NULL) {
339c6d7c21Sopenharmony_ci        return NULL;
349c6d7c21Sopenharmony_ci    }
359c6d7c21Sopenharmony_ci    queue->queue = LFQUE_Create(size, count);
369c6d7c21Sopenharmony_ci    if (queue->queue == NULL) {
379c6d7c21Sopenharmony_ci        SAMGR_Free(queue);
389c6d7c21Sopenharmony_ci        return NULL;
399c6d7c21Sopenharmony_ci    }
409c6d7c21Sopenharmony_ci    pthread_mutex_init(&queue->wMutex, NULL);
419c6d7c21Sopenharmony_ci    pthread_mutex_init(&queue->rMutex, NULL);
429c6d7c21Sopenharmony_ci    pthread_cond_init(&queue->cond, NULL);
439c6d7c21Sopenharmony_ci    return (MQueueId)queue;
449c6d7c21Sopenharmony_ci}
459c6d7c21Sopenharmony_ci
469c6d7c21Sopenharmony_ciint QUEUE_Put(MQueueId queueId, const void *element, uint8 pri, int timeout)
479c6d7c21Sopenharmony_ci{
489c6d7c21Sopenharmony_ci    if (queueId == NULL || element == NULL || timeout > 0) {
499c6d7c21Sopenharmony_ci        return EC_INVALID;
509c6d7c21Sopenharmony_ci    }
519c6d7c21Sopenharmony_ci    LockFreeBlockQueue *queue = (LockFreeBlockQueue *)queueId;
529c6d7c21Sopenharmony_ci    pthread_mutex_lock(&queue->wMutex);
539c6d7c21Sopenharmony_ci    int ret = LFQUE_Push(queue->queue, element, pri);
549c6d7c21Sopenharmony_ci    pthread_mutex_unlock(&queue->wMutex);
559c6d7c21Sopenharmony_ci    pthread_mutex_lock(&queue->rMutex);
569c6d7c21Sopenharmony_ci    pthread_cond_broadcast(&queue->cond);
579c6d7c21Sopenharmony_ci    pthread_mutex_unlock(&queue->rMutex);
589c6d7c21Sopenharmony_ci    return ret;
599c6d7c21Sopenharmony_ci}
609c6d7c21Sopenharmony_ci
619c6d7c21Sopenharmony_ciint QUEUE_Pop(MQueueId queueId, void *element, uint8 *pri, int timeout)
629c6d7c21Sopenharmony_ci{
639c6d7c21Sopenharmony_ci    if (queueId == NULL || element == NULL || timeout > 0) {
649c6d7c21Sopenharmony_ci        return EC_INVALID;
659c6d7c21Sopenharmony_ci    }
669c6d7c21Sopenharmony_ci
679c6d7c21Sopenharmony_ci    LockFreeBlockQueue *queue = (LockFreeBlockQueue *)queueId;
689c6d7c21Sopenharmony_ci    pthread_mutex_lock(&queue->rMutex);
699c6d7c21Sopenharmony_ci    while (LFQUE_Pop(queue->queue, element, pri) != EC_SUCCESS) {
709c6d7c21Sopenharmony_ci        pthread_cond_wait(&queue->cond, &queue->rMutex);
719c6d7c21Sopenharmony_ci    }
729c6d7c21Sopenharmony_ci    pthread_mutex_unlock(&queue->rMutex);
739c6d7c21Sopenharmony_ci    return EC_SUCCESS;
749c6d7c21Sopenharmony_ci}
759c6d7c21Sopenharmony_ci
769c6d7c21Sopenharmony_ciint QUEUE_Destroy(MQueueId queueId)
779c6d7c21Sopenharmony_ci{
789c6d7c21Sopenharmony_ci    if (queueId == NULL) {
799c6d7c21Sopenharmony_ci        return EC_INVALID;
809c6d7c21Sopenharmony_ci    }
819c6d7c21Sopenharmony_ci
829c6d7c21Sopenharmony_ci    LockFreeBlockQueue *queue = (LockFreeBlockQueue *)queueId;
839c6d7c21Sopenharmony_ci    pthread_mutex_destroy(&queue->wMutex);
849c6d7c21Sopenharmony_ci    pthread_mutex_destroy(&queue->rMutex);
859c6d7c21Sopenharmony_ci    pthread_cond_destroy(&queue->cond);
869c6d7c21Sopenharmony_ci    SAMGR_Free(queue->queue);
879c6d7c21Sopenharmony_ci    SAMGR_Free(queue);
889c6d7c21Sopenharmony_ci    return EC_SUCCESS;
899c6d7c21Sopenharmony_ci}
90