106f6ba60Sopenharmony_ci/* 206f6ba60Sopenharmony_ci * Copyright (c) Huawei Technologies Co., Ltd. 2021. All rights reserved. 306f6ba60Sopenharmony_ci * Licensed under the Apache License, Version 2.0 (the "License"); 406f6ba60Sopenharmony_ci * you may not use this file except in compliance with the License. 506f6ba60Sopenharmony_ci * You may obtain a copy of the License at 606f6ba60Sopenharmony_ci * 706f6ba60Sopenharmony_ci * http://www.apache.org/licenses/LICENSE-2.0 806f6ba60Sopenharmony_ci * 906f6ba60Sopenharmony_ci * Unless required by applicable law or agreed to in writing, software 1006f6ba60Sopenharmony_ci * distributed under the License is distributed on an "AS IS" BASIS, 1106f6ba60Sopenharmony_ci * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 1206f6ba60Sopenharmony_ci * See the License for the specific language governing permissions and 1306f6ba60Sopenharmony_ci * limitations under the License. 1406f6ba60Sopenharmony_ci * 1506f6ba60Sopenharmony_ci * Description: ResultTransporter class implements 1606f6ba60Sopenharmony_ci */ 1706f6ba60Sopenharmony_ci#include "result_transporter.h" 1806f6ba60Sopenharmony_ci 1906f6ba60Sopenharmony_ci#include <chrono> 2006f6ba60Sopenharmony_ci#include <cinttypes> 2106f6ba60Sopenharmony_ci#include <pthread.h> 2206f6ba60Sopenharmony_ci#include <unistd.h> 2306f6ba60Sopenharmony_ci#include "logging.h" 2406f6ba60Sopenharmony_ci 2506f6ba60Sopenharmony_cinamespace { 2606f6ba60Sopenharmony_ciconstexpr auto DEFAULT_FLUSH_INTERVAL = std::chrono::milliseconds(1000); 2706f6ba60Sopenharmony_ciconstexpr uint32_t DEFAULT_FLUSH_THRESHOLD = 1024 * 1024; 2806f6ba60Sopenharmony_ci} // namespace 2906f6ba60Sopenharmony_ci 3006f6ba60Sopenharmony_ciFTRACE_NS_BEGIN 3106f6ba60Sopenharmony_ciResultTransporter::ResultTransporter(const std::string& name, WriterStructPtr writer) 3206f6ba60Sopenharmony_ci : name_(name), flushThreshold_(DEFAULT_FLUSH_THRESHOLD), flushInterval_(DEFAULT_FLUSH_INTERVAL), writer_(writer) 3306f6ba60Sopenharmony_ci{ 3406f6ba60Sopenharmony_ci} 3506f6ba60Sopenharmony_ci 3606f6ba60Sopenharmony_ciResultTransporter::~ResultTransporter(void) 3706f6ba60Sopenharmony_ci{ 3806f6ba60Sopenharmony_ci PROFILER_LOG_INFO(LOG_CORE, "ResultTransporter destroy!"); 3906f6ba60Sopenharmony_ci} 4006f6ba60Sopenharmony_ci 4106f6ba60Sopenharmony_civoid ResultTransporter::SetFlushInterval(int ms) 4206f6ba60Sopenharmony_ci{ 4306f6ba60Sopenharmony_ci PROFILER_LOG_INFO(LOG_CORE, "ResultTransporter set flush interval to %d", ms); 4406f6ba60Sopenharmony_ci flushInterval_ = std::chrono::milliseconds(ms); 4506f6ba60Sopenharmony_ci} 4606f6ba60Sopenharmony_ci 4706f6ba60Sopenharmony_civoid ResultTransporter::SetFlushThreshold(uint32_t nbytes) 4806f6ba60Sopenharmony_ci{ 4906f6ba60Sopenharmony_ci PROFILER_LOG_INFO(LOG_CORE, "ResultTransporter set flush threshold to %u", nbytes); 5006f6ba60Sopenharmony_ci flushThreshold_ = nbytes; 5106f6ba60Sopenharmony_ci} 5206f6ba60Sopenharmony_ci 5306f6ba60Sopenharmony_cibool ResultTransporter::IsFlushTime() const 5406f6ba60Sopenharmony_ci{ 5506f6ba60Sopenharmony_ci static auto lastTime = std::chrono::high_resolution_clock::now(); 5606f6ba60Sopenharmony_ci auto currentTime = std::chrono::high_resolution_clock::now(); 5706f6ba60Sopenharmony_ci auto elapsedTime = std::chrono::duration_cast<decltype(flushInterval_)>(currentTime - lastTime); 5806f6ba60Sopenharmony_ci if (elapsedTime < flushInterval_) { 5906f6ba60Sopenharmony_ci return false; 6006f6ba60Sopenharmony_ci } 6106f6ba60Sopenharmony_ci lastTime = currentTime; 6206f6ba60Sopenharmony_ci return true; 6306f6ba60Sopenharmony_ci} 6406f6ba60Sopenharmony_ci 6506f6ba60Sopenharmony_cilong ResultTransporter::Write(ResultPtr&& packet) 6606f6ba60Sopenharmony_ci{ 6706f6ba60Sopenharmony_ci if (writer_ == nullptr || writer_->write == nullptr) { 6806f6ba60Sopenharmony_ci return 0; 6906f6ba60Sopenharmony_ci } 7006f6ba60Sopenharmony_ci 7106f6ba60Sopenharmony_ci size_t size = packet->ByteSizeLong(); 7206f6ba60Sopenharmony_ci buffer_.resize(size); 7306f6ba60Sopenharmony_ci CHECK_TRUE(buffer_.size() == size, -1, 7406f6ba60Sopenharmony_ci "%s: buffer resize failed, size: %zu, buffer size: %zu, errno: %d(%s)", 7506f6ba60Sopenharmony_ci __func__, size, buffer_.size(), errno, strerror(errno)); 7606f6ba60Sopenharmony_ci 7706f6ba60Sopenharmony_ci int ret = packet->SerializeToArray(buffer_.data(), buffer_.size()); 7806f6ba60Sopenharmony_ci CHECK_TRUE(ret > 0, ret, "%s: SerializeToArray failed with %d, size: %zu", __func__, ret, size); 7906f6ba60Sopenharmony_ci 8006f6ba60Sopenharmony_ci writer_->write(writer_, buffer_.data(), buffer_.size()); 8106f6ba60Sopenharmony_ci return buffer_.size(); 8206f6ba60Sopenharmony_ci} 8306f6ba60Sopenharmony_ci 8406f6ba60Sopenharmony_civoid ResultTransporter::Flush() 8506f6ba60Sopenharmony_ci{ 8606f6ba60Sopenharmony_ci if (writer_ == nullptr || writer_->flush == nullptr) { 8706f6ba60Sopenharmony_ci return; 8806f6ba60Sopenharmony_ci } 8906f6ba60Sopenharmony_ci writer_->flush(writer_); 9006f6ba60Sopenharmony_ci 9106f6ba60Sopenharmony_ci auto count = bytesCount_.load(); 9206f6ba60Sopenharmony_ci auto pending = bytesPending_.load(); 9306f6ba60Sopenharmony_ci bytesPending_ = 0; 9406f6ba60Sopenharmony_ci PROFILER_LOG_DEBUG(LOG_CORE, "ResultTransporter TX stats B: %" PRIu64 ", P: %u", count, pending); 9506f6ba60Sopenharmony_ci} 9606f6ba60Sopenharmony_ci 9706f6ba60Sopenharmony_cibool ResultTransporter::Submit(ResultPtr&& packet) 9806f6ba60Sopenharmony_ci{ 9906f6ba60Sopenharmony_ci std::unique_lock<std::mutex> lock(mutex_); 10006f6ba60Sopenharmony_ci long nbytes = Write(std::move(packet)); 10106f6ba60Sopenharmony_ci if (nbytes < 0) { 10206f6ba60Sopenharmony_ci PROFILER_LOG_ERROR(LOG_CORE, "send result FAILED!"); 10306f6ba60Sopenharmony_ci lock.unlock(); 10406f6ba60Sopenharmony_ci return false; 10506f6ba60Sopenharmony_ci } 10606f6ba60Sopenharmony_ci bytesCount_ += nbytes; 10706f6ba60Sopenharmony_ci bytesPending_ += nbytes; 10806f6ba60Sopenharmony_ci 10906f6ba60Sopenharmony_ci if (IsFlushTime() || bytesPending_ >= flushThreshold_) { 11006f6ba60Sopenharmony_ci Flush(); 11106f6ba60Sopenharmony_ci } 11206f6ba60Sopenharmony_ci lock.unlock(); 11306f6ba60Sopenharmony_ci return true; 11406f6ba60Sopenharmony_ci} 11506f6ba60Sopenharmony_ci 11606f6ba60Sopenharmony_civoid ResultTransporter::Report(size_t msgSize) 11706f6ba60Sopenharmony_ci{ 11806f6ba60Sopenharmony_ci bytesCount_ += msgSize; 11906f6ba60Sopenharmony_ci bytesPending_ += msgSize; 12006f6ba60Sopenharmony_ci 12106f6ba60Sopenharmony_ci if (IsFlushTime() || bytesPending_ >= flushThreshold_) { 12206f6ba60Sopenharmony_ci Flush(); 12306f6ba60Sopenharmony_ci } 12406f6ba60Sopenharmony_ci} 12506f6ba60Sopenharmony_ciFTRACE_NS_END 126