1/*
2 * Copyright (c) Huawei Technologies Co., Ltd. 2021-2023. All rights reserved.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16#include "buffer_writer.h"
17#include "command_poller.h"
18#include "common_types.pb.h"
19#include "logging.h"
20#include "plugin_service_types.pb.h"
21#include "share_memory_allocator.h"
22
23#include <algorithm>
24#include <cinttypes>
25#include <thread>
26#include <unistd.h>
27
28using namespace OHOS::Developtools::Profiler;
29
30BufferWriter::BufferWriter(std::string name,
31                           std::string version,
32                           uint32_t size,
33                           int smbFd,
34                           int eventFd,
35                           uint32_t pluginId)
36    : pluginName_(name), pluginVersion_(version)
37{
38    PROFILER_LOG_INFO(LOG_CORE, "%s:%s %d [%d] [%d]", __func__, name.c_str(), size, smbFd, eventFd);
39    shareMemoryBlock_ = ShareMemoryAllocator::GetInstance().CreateMemoryBlockRemote(name, size, smbFd);
40    if (shareMemoryBlock_ == nullptr) {
41        PROFILER_LOG_DEBUG(LOG_CORE, "%s:create shareMemoryBlock_ failed!", __func__);
42    }
43    eventNotifier_ = EventNotifier::CreateWithFd(eventFd);
44    pluginId_ = pluginId;
45    lastFlushTime_ = std::chrono::steady_clock::now();
46    if (shareMemoryBlock_ != nullptr) {
47        writeCtx_ = reinterpret_cast<RandomWriteCtx*>(shareMemoryBlock_->GetCtx());
48        profilerPluginData_ = ProtoEncoder::ProfilerPluginData(writeCtx_);
49    }
50}
51
52BufferWriter::~BufferWriter()
53{
54    PROFILER_LOG_DEBUG(LOG_CORE, "%s:destroy eventfd = %d!", __func__, eventNotifier_ ? eventNotifier_->GetFd() : -1);
55    eventNotifier_ = nullptr;
56    ShareMemoryAllocator::GetInstance().ReleaseMemoryBlockRemote(pluginName_);
57}
58
59void BufferWriter::Report() const
60{
61    PROFILER_LOG_DEBUG(LOG_CORE, "%s:stats B: %" PRIu64 ", P: %d, W:%" PRIu64 ", F: %d", __func__,
62        bytesCount_.load(), bytesPending_.load(), writeCount_.load(), flushCount_.load());
63}
64
65void BufferWriter::DoStats(long bytes)
66{
67    ++writeCount_;
68    bytesCount_ += static_cast<uint64_t>(bytes);
69    bytesPending_ += static_cast<uint32_t>(bytes);
70}
71
72long BufferWriter::Write(const void* data, size_t size)
73{
74    if (shareMemoryBlock_ == nullptr || data == nullptr || size == 0) {
75        return false;
76    }
77    ProfilerPluginData pluginData;
78    pluginData.set_name(pluginName_);
79    pluginData.set_version(pluginVersion_);
80    pluginData.set_status(0);
81    pluginData.set_data(data, size);
82
83    struct timespec ts = { 0, 0 };
84    clock_gettime(clockId_, &ts);
85
86    pluginData.set_clock_id(static_cast<ProfilerPluginData_ClockId>(clockId_));
87    pluginData.set_tv_sec(ts.tv_sec);
88    pluginData.set_tv_nsec(ts.tv_nsec);
89
90    DoStats(pluginData.ByteSizeLong());
91    return shareMemoryBlock_->PutMessage(pluginData, pluginName_);
92}
93
94RandomWriteCtx* BufferWriter::StartReport()
95{
96    if (shareMemoryBlock_ == nullptr || writeCtx_ == nullptr) {
97        return nullptr;
98    }
99
100    shareMemoryBlock_->ResetPos();
101    profilerPluginData_.Reset(writeCtx_);
102    profilerPluginData_.set_name(pluginName_);
103    profilerPluginData_.set_version(pluginVersion_);
104    profilerPluginData_.set_status(0);
105    return profilerPluginData_.startAdd_data();
106}
107
108void BufferWriter::FinishReport(int32_t size)
109{
110    if (shareMemoryBlock_ == nullptr) {
111        return;
112    }
113
114    profilerPluginData_.finishAdd_data(size);
115    struct timespec ts;
116    clock_gettime(CLOCK_REALTIME, &ts);
117    profilerPluginData_.set_clock_id(ProfilerPluginData::CLOCKID_REALTIME);
118    profilerPluginData_.set_tv_sec(ts.tv_sec);
119    profilerPluginData_.set_tv_nsec(ts.tv_nsec);
120    int32_t len = profilerPluginData_.Finish();
121    shareMemoryBlock_->UseMemory(len);
122    DoStats(len);
123}
124
125bool BufferWriter::WriteMessage(const google::protobuf::Message& pmsg, const std::string& pluginName)
126{
127    if (shareMemoryBlock_ == nullptr) {
128        return false;
129    }
130    DoStats(pmsg.ByteSizeLong());
131    return shareMemoryBlock_->PutMessage(pmsg, pluginName);
132}
133
134bool BufferWriter::Flush()
135{
136    ++flushCount_;
137    if (eventNotifier_ == nullptr) {
138        return false;
139    }
140    eventNotifier_->Post(flushCount_.load());
141    lastFlushTime_ = std::chrono::steady_clock::now();
142    bytesPending_ = 0;
143    return true;
144}
145
146void BufferWriter::UseMemory(int32_t size)
147{
148    if (shareMemoryBlock_ == nullptr) {
149        return;
150    }
151
152    shareMemoryBlock_->UseMemory(size);
153    DoStats(size);
154}
155
156void BufferWriter::ResetPos()
157{
158    if (shareMemoryBlock_ == nullptr) {
159        return;
160    }
161
162    shareMemoryBlock_->ResetPos();
163}