1 /*
2  * Copyright (c) Huawei Technologies Co., Ltd. 2021. All rights reserved.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #include "profiler_data_repeater.h"
16 
ProfilerDataRepeater(size_t maxSize)17 ProfilerDataRepeater::ProfilerDataRepeater(size_t maxSize)
18 {
19     maxSize_ = maxSize;
20     closed_ = false;
21 }
22 
~ProfilerDataRepeater()23 ProfilerDataRepeater::~ProfilerDataRepeater()
24 {
25     Close();
26 }
27 
Size()28 size_t ProfilerDataRepeater::Size()
29 {
30     std::unique_lock<std::mutex> lock(mutex_);
31     return dataQueue_.size();
32 }
33 
Reset()34 void ProfilerDataRepeater::Reset()
35 {
36     std::unique_lock<std::mutex> lock(mutex_);
37     closed_ = false;
38 }
39 
Close()40 void ProfilerDataRepeater::Close()
41 {
42     {
43         std::unique_lock<std::mutex> lock(mutex_);
44         dataQueue_.clear();
45         closed_ = true;
46     }
47     slotCondVar_.notify_all();
48     itemCondVar_.notify_all();
49 }
50 
PutPluginData(const ProfilerPluginDataPtr& pluginData)51 bool ProfilerDataRepeater::PutPluginData(const ProfilerPluginDataPtr& pluginData)
52 {
53     std::unique_lock<std::mutex> lock(mutex_);
54 
55     if ((pluginData == nullptr) && (dataQueue_.size() > 0)) {
56         PROFILER_LOG_INFO(LOG_CORE, "no need put nullptr if queue has data, dataQueue_.size() = %zu",
57                           dataQueue_.size());
58         return true;
59     }
60 
61     while (dataQueue_.size() >= maxSize_ && !closed_) {
62         slotCondVar_.wait(lock);
63     }
64     if (closed_) {
65         return false;
66     }
67 
68     dataQueue_.push_back(pluginData);
69     lock.unlock();
70 
71     itemCondVar_.notify_one();
72     return true;
73 }
74 
TakePluginData()75 ProfilerPluginDataPtr ProfilerDataRepeater::TakePluginData()
76 {
77     std::unique_lock<std::mutex> lock(mutex_);
78     while (dataQueue_.empty() && !closed_) {
79         itemCondVar_.wait(lock);
80     }
81     if (closed_) {
82         return nullptr;
83     }
84 
85     auto result = dataQueue_.front();
86     dataQueue_.pop_front();
87     lock.unlock();
88 
89     slotCondVar_.notify_one();
90     return result;
91 }
92 
TakePluginData(std::vector<ProfilerPluginDataPtr>& pluginDataVec)93 int ProfilerDataRepeater::TakePluginData(std::vector<ProfilerPluginDataPtr>& pluginDataVec)
94 {
95     std::unique_lock<std::mutex> lock(mutex_);
96     while (dataQueue_.empty() && !closed_) {
97         itemCondVar_.wait(lock);
98     }
99     if (closed_) {
100         return -1;
101     }
102 
103     int count = 0;
104     while (dataQueue_.size() > 0) {
105         auto result = dataQueue_.front();
106         pluginDataVec.push_back(result);
107         dataQueue_.pop_front();
108         count++;
109     }
110     lock.unlock();
111 
112     slotCondVar_.notify_one();
113     return count;
114 }
115 
ClearQueue()116 void ProfilerDataRepeater::ClearQueue()
117 {
118     std::unique_lock<std::mutex> lock(mutex_);
119     dataQueue_.clear();
120 }
121