1/*
2 * Copyright (c) Huawei Technologies Co., Ltd. 2021. All rights reserved.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 *     http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 *
 * Description: ResultTransporter class implementation
16 */
17#include "result_transporter.h"
18
19#include <chrono>
20#include <cinttypes>
21#include <pthread.h>
22#include <unistd.h>
23#include "logging.h"
24
namespace {
// Flush interval a transporter starts with: 1000 milliseconds.
constexpr std::chrono::milliseconds DEFAULT_FLUSH_INTERVAL { 1000 };
// Flush threshold a transporter starts with: 1 MiB worth of pending bytes.
constexpr uint32_t DEFAULT_FLUSH_THRESHOLD = 1U << 20;
} // namespace
29
30FTRACE_NS_BEGIN
31ResultTransporter::ResultTransporter(const std::string& name, WriterStructPtr writer)
32    : name_(name), flushThreshold_(DEFAULT_FLUSH_THRESHOLD), flushInterval_(DEFAULT_FLUSH_INTERVAL), writer_(writer)
33{
34}
35
36ResultTransporter::~ResultTransporter(void)
37{
38    PROFILER_LOG_INFO(LOG_CORE, "ResultTransporter destroy!");
39}
40
41void ResultTransporter::SetFlushInterval(int ms)
42{
43    PROFILER_LOG_INFO(LOG_CORE, "ResultTransporter set flush interval to %d", ms);
44    flushInterval_ = std::chrono::milliseconds(ms);
45}
46
47void ResultTransporter::SetFlushThreshold(uint32_t nbytes)
48{
49    PROFILER_LOG_INFO(LOG_CORE, "ResultTransporter set flush threshold to %u", nbytes);
50    flushThreshold_ = nbytes;
51}
52
53bool ResultTransporter::IsFlushTime() const
54{
55    static auto lastTime = std::chrono::high_resolution_clock::now();
56    auto currentTime = std::chrono::high_resolution_clock::now();
57    auto elapsedTime = std::chrono::duration_cast<decltype(flushInterval_)>(currentTime - lastTime);
58    if (elapsedTime < flushInterval_) {
59        return false;
60    }
61    lastTime = currentTime;
62    return true;
63}
64
65long ResultTransporter::Write(ResultPtr&& packet)
66{
67    if (writer_ == nullptr || writer_->write == nullptr) {
68        return 0;
69    }
70
71    size_t size = packet->ByteSizeLong();
72    buffer_.resize(size);
73    CHECK_TRUE(buffer_.size() == size, -1,
74               "%s: buffer resize failed, size: %zu, buffer size: %zu, errno: %d(%s)",
75               __func__, size, buffer_.size(), errno, strerror(errno));
76
77    int ret = packet->SerializeToArray(buffer_.data(), buffer_.size());
78    CHECK_TRUE(ret > 0, ret, "%s: SerializeToArray failed with %d, size: %zu", __func__, ret, size);
79
80    writer_->write(writer_, buffer_.data(), buffer_.size());
81    return buffer_.size();
82}
83
84void ResultTransporter::Flush()
85{
86    if (writer_ == nullptr || writer_->flush == nullptr) {
87        return;
88    }
89    writer_->flush(writer_);
90
91    auto count = bytesCount_.load();
92    auto pending = bytesPending_.load();
93    bytesPending_ = 0;
94    PROFILER_LOG_DEBUG(LOG_CORE, "ResultTransporter TX stats B: %" PRIu64 ", P: %u", count, pending);
95}
96
97bool ResultTransporter::Submit(ResultPtr&& packet)
98{
99    std::unique_lock<std::mutex> lock(mutex_);
100    long nbytes = Write(std::move(packet));
101    if (nbytes < 0) {
102        PROFILER_LOG_ERROR(LOG_CORE, "send result FAILED!");
103        lock.unlock();
104        return false;
105    }
106    bytesCount_ += nbytes;
107    bytesPending_ += nbytes;
108
109    if (IsFlushTime() || bytesPending_ >= flushThreshold_) {
110        Flush();
111    }
112    lock.unlock();
113    return true;
114}
115
116void ResultTransporter::Report(size_t msgSize)
117{
118    bytesCount_ += msgSize;
119    bytesPending_ += msgSize;
120
121    if (IsFlushTime() || bytesPending_ >= flushThreshold_) {
122        Flush();
123    }
124}
125FTRACE_NS_END
126