1/*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 *     http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15#include "msg_handle_thread.h"
16#include <sys/types.h>
17#include <unistd.h>
18#include "intell_voice_log.h"
19
20#define LOG_TAG "MsgHandleThread"
21
22using namespace std;
23
24namespace OHOS {
25namespace IntellVoiceUtils {
// Capacity argument handed to every MsgHandleThread's own message queue.
static constexpr uint32_t MSQ_QUEUE_MAX_LEN = 100;
// Longest time, in seconds, that SendSynMsg blocks waiting for its message to be handled.
static constexpr int32_t MSG_MAX_SYNC_TIMEOUT = 5;
28
29MsgHandleThread::MsgHandleThread() : msgQue_(MSQ_QUEUE_MAX_LEN), callbackThread_(nullptr) {}
30
31MsgHandleThread::MsgHandleThread(std::shared_ptr<MessageQueue> callbackMsgQue)
32    : callbackMsgQue_(callbackMsgQue), msgQue_(MSQ_QUEUE_MAX_LEN), callbackThread_(nullptr)
33{}
34
35MsgHandleThread::MsgHandleThread(MsgHandleThread *callbackThread)
36    : msgQue_(MSQ_QUEUE_MAX_LEN), callbackThread_(callbackThread)
37{}
38
39MsgHandleThread::~MsgHandleThread() {}
40
41void MsgHandleThread::SetCallbackThread(MsgHandleThread *tmpCallbackThread)
42{
43    callbackThread_ = tmpCallbackThread;
44}
45
46// the default realization is for debug, subclass should override this func
47bool MsgHandleThread::HandleMsg(Message &msg)
48{
49    INTELL_VOICE_LOG_INFO("run thread %{public}u process msg %{public}u", Gettid(), msg.what_);
50
51    SendbackMsg(msg);
52
53    return true;
54}
55
56bool MsgHandleThread::SendMsg(Message msg)
57{
58    try {
59        msgQue_.SendMsg(std::make_shared<Message>(msg));
60    } catch (const std::length_error& err) {
61        INTELL_VOICE_LOG_ERROR("length error");
62        return false;
63    }
64
65    return true;
66}
67
68bool MsgHandleThread::SendMsg(std::shared_ptr<Message> msg)
69{
70    if (msg == nullptr) {
71        return false;
72    }
73
74    msgQue_.SendMsg(msg);
75    return true;
76}
77
78bool MsgHandleThread::SendSynMsg(shared_ptr<Message> msg)
79{
80    if (msg == nullptr) {
81        return false;
82    }
83
84    msg->result_ = std::make_shared<SynInfo>();
85    if (msg->result_ == nullptr) {
86        INTELL_VOICE_LOG_ERROR("create sync info failed");
87        return false;
88    }
89
90    unique_lock<mutex> lock(msg->result_->mutex_);
91    msgQue_.SendMsg(msg);
92    if (msg->result_->cv_.wait_for(lock, chrono::seconds(MSG_MAX_SYNC_TIMEOUT)) == std::cv_status::no_timeout) {
93        return true;
94    } else {
95        INTELL_VOICE_LOG_WARN("send syn msg timeout");
96        return false;
97    }
98}
99
100void MsgHandleThread::SendbackMsg(Message msg)
101{
102    if (callbackThread_ != nullptr) {
103        callbackThread_->SendMsg(msg);
104    }
105
106    if (callbackMsgQue_ != nullptr) {
107        callbackMsgQue_->SendMsg(make_shared<Message>(msg));
108    }
109}
110
111void MsgHandleThread::Run()
112{
113    bool isQuit = false;
114
115    while (!isQuit) {
116        shared_ptr<Message> msg = msgQue_.ReceiveMsg();
117
118        isQuit = HandleMsg(*msg);
119
120        if (msg->result_ != nullptr) {
121            unique_lock<mutex> lock(msg->result_->mutex_);
122            msg->result_->cv_.notify_all();
123        }
124    }
125}
126}
127}
128