1/* 2 * Copyright (c) Huawei Technologies Co., Ltd. 2021. All rights reserved. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 * 15 * Description: ResultTransporter class implements 16 */ 17#include "result_transporter.h" 18 19#include <chrono> 20#include <cinttypes> 21#include <pthread.h> 22#include <unistd.h> 23#include "logging.h" 24 25namespace { 26constexpr auto DEFAULT_FLUSH_INTERVAL = std::chrono::milliseconds(1000); 27constexpr uint32_t DEFAULT_FLUSH_THRESHOLD = 1024 * 1024; 28} // namespace 29 30FTRACE_NS_BEGIN 31ResultTransporter::ResultTransporter(const std::string& name, WriterStructPtr writer) 32 : name_(name), flushThreshold_(DEFAULT_FLUSH_THRESHOLD), flushInterval_(DEFAULT_FLUSH_INTERVAL), writer_(writer) 33{ 34} 35 36ResultTransporter::~ResultTransporter(void) 37{ 38 PROFILER_LOG_INFO(LOG_CORE, "ResultTransporter destroy!"); 39} 40 41void ResultTransporter::SetFlushInterval(int ms) 42{ 43 PROFILER_LOG_INFO(LOG_CORE, "ResultTransporter set flush interval to %d", ms); 44 flushInterval_ = std::chrono::milliseconds(ms); 45} 46 47void ResultTransporter::SetFlushThreshold(uint32_t nbytes) 48{ 49 PROFILER_LOG_INFO(LOG_CORE, "ResultTransporter set flush threshold to %u", nbytes); 50 flushThreshold_ = nbytes; 51} 52 53bool ResultTransporter::IsFlushTime() const 54{ 55 static auto lastTime = std::chrono::high_resolution_clock::now(); 56 auto currentTime = std::chrono::high_resolution_clock::now(); 57 auto elapsedTime = std::chrono::duration_cast<decltype(flushInterval_)>(currentTime - lastTime); 58 if (elapsedTime < flushInterval_) { 59 return false; 60 } 61 lastTime = currentTime; 62 return true; 63} 64 65long ResultTransporter::Write(ResultPtr&& packet) 66{ 67 if (writer_ == nullptr || writer_->write == nullptr) { 68 return 0; 69 } 70 71 size_t size = packet->ByteSizeLong(); 72 buffer_.resize(size); 73 CHECK_TRUE(buffer_.size() == size, -1, 74 "%s: buffer resize failed, size: %zu, buffer size: %zu, errno: %d(%s)", 75 __func__, size, buffer_.size(), errno, strerror(errno)); 76 77 int ret = packet->SerializeToArray(buffer_.data(), buffer_.size()); 78 CHECK_TRUE(ret > 0, ret, "%s: SerializeToArray failed with %d, size: %zu", __func__, ret, size); 79 80 writer_->write(writer_, buffer_.data(), buffer_.size()); 81 return buffer_.size(); 82} 83 84void ResultTransporter::Flush() 85{ 86 if (writer_ == nullptr || writer_->flush == nullptr) { 87 return; 88 } 89 writer_->flush(writer_); 90 91 auto count = bytesCount_.load(); 92 auto pending = bytesPending_.load(); 93 bytesPending_ = 0; 94 PROFILER_LOG_DEBUG(LOG_CORE, "ResultTransporter TX stats B: %" PRIu64 ", P: %u", count, pending); 95} 96 97bool ResultTransporter::Submit(ResultPtr&& packet) 98{ 99 std::unique_lock<std::mutex> lock(mutex_); 100 long nbytes = Write(std::move(packet)); 101 if (nbytes < 0) { 102 PROFILER_LOG_ERROR(LOG_CORE, "send result FAILED!"); 103 lock.unlock(); 104 return false; 105 } 106 bytesCount_ += nbytes; 107 bytesPending_ += nbytes; 108 109 if (IsFlushTime() || bytesPending_ >= flushThreshold_) { 110 Flush(); 111 } 112 lock.unlock(); 113 return true; 114} 115 116void ResultTransporter::Report(size_t msgSize) 117{ 118 bytesCount_ += msgSize; 119 bytesPending_ += msgSize; 120 121 if (IsFlushTime() || bytesPending_ >= flushThreshold_) { 122 Flush(); 123 } 124} 125FTRACE_NS_END 126