106f6ba60Sopenharmony_ci/* 206f6ba60Sopenharmony_ci * Copyright (c) Huawei Technologies Co., Ltd. 2021. All rights reserved. 306f6ba60Sopenharmony_ci * Licensed under the Apache License, Version 2.0 (the "License"); 406f6ba60Sopenharmony_ci * you may not use this file except in compliance with the License. 506f6ba60Sopenharmony_ci * You may obtain a copy of the License at 606f6ba60Sopenharmony_ci * 706f6ba60Sopenharmony_ci * http://www.apache.org/licenses/LICENSE-2.0 806f6ba60Sopenharmony_ci * 906f6ba60Sopenharmony_ci * Unless required by applicable law or agreed to in writing, software 1006f6ba60Sopenharmony_ci * distributed under the License is distributed on an "AS IS" BASIS, 1106f6ba60Sopenharmony_ci * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 1206f6ba60Sopenharmony_ci * See the License for the specific language governing permissions and 1306f6ba60Sopenharmony_ci * limitations under the License. 1406f6ba60Sopenharmony_ci */ 1506f6ba60Sopenharmony_ci#include "stream_plugin.h" 1606f6ba60Sopenharmony_ci 1706f6ba60Sopenharmony_ci#include <sys/syscall.h> 1806f6ba60Sopenharmony_ci#include <sys/types.h> 1906f6ba60Sopenharmony_ci#include <unistd.h> 2006f6ba60Sopenharmony_ci 2106f6ba60Sopenharmony_ci#include "securec.h" 2206f6ba60Sopenharmony_ci#include "stream_plugin_result.pbencoder.h" 2306f6ba60Sopenharmony_ci 2406f6ba60Sopenharmony_ciusing namespace OHOS::Developtools::Profiler; 2506f6ba60Sopenharmony_ci 2606f6ba60Sopenharmony_ciStreamPlugin::StreamPlugin() {} 2706f6ba60Sopenharmony_ci 2806f6ba60Sopenharmony_ciStreamPlugin::~StreamPlugin() {} 2906f6ba60Sopenharmony_ci 3006f6ba60Sopenharmony_ciint StreamPlugin::Start(const uint8_t* configData, uint32_t configSize) 3106f6ba60Sopenharmony_ci{ 3206f6ba60Sopenharmony_ci // 反序列化 3306f6ba60Sopenharmony_ci CHECK_TRUE(protoConfig_.ParseFromArray(configData, configSize) > 0, -1, "%s:parseFromArray failed!", __func__); 3406f6ba60Sopenharmony_ci // 启动线程写数据 3506f6ba60Sopenharmony_ci std::unique_lock<std::mutex> locker(mutex_); 3606f6ba60Sopenharmony_ci running_ = true; 3706f6ba60Sopenharmony_ci writeThread_ = std::thread([this] { this->Loop(); }); 3806f6ba60Sopenharmony_ci 3906f6ba60Sopenharmony_ci return 0; 4006f6ba60Sopenharmony_ci} 4106f6ba60Sopenharmony_ci 4206f6ba60Sopenharmony_ciint StreamPlugin::Stop() 4306f6ba60Sopenharmony_ci{ 4406f6ba60Sopenharmony_ci std::unique_lock<std::mutex> locker(mutex_); 4506f6ba60Sopenharmony_ci running_ = false; 4606f6ba60Sopenharmony_ci locker.unlock(); 4706f6ba60Sopenharmony_ci if (writeThread_.joinable()) { 4806f6ba60Sopenharmony_ci writeThread_.join(); 4906f6ba60Sopenharmony_ci } 5006f6ba60Sopenharmony_ci PROFILER_LOG_INFO(LOG_CORE, "%s:stop success!", __func__); 5106f6ba60Sopenharmony_ci return 0; 5206f6ba60Sopenharmony_ci} 5306f6ba60Sopenharmony_ci 5406f6ba60Sopenharmony_ciint StreamPlugin::SetWriter(WriterStruct* writer) 5506f6ba60Sopenharmony_ci{ 5606f6ba60Sopenharmony_ci resultWriter_ = writer; 5706f6ba60Sopenharmony_ci return 0; 5806f6ba60Sopenharmony_ci} 5906f6ba60Sopenharmony_ci 6006f6ba60Sopenharmony_civoid StreamPlugin::Loop(void) 6106f6ba60Sopenharmony_ci{ 6206f6ba60Sopenharmony_ci PROFILER_LOG_INFO(LOG_CORE, "%s:transporter thread %d start !!!!!", __func__, gettid()); 6306f6ba60Sopenharmony_ci CHECK_NOTNULL(resultWriter_, NO_RETVAL, "%s: resultWriter_ nullptr", __func__); 6406f6ba60Sopenharmony_ci 6506f6ba60Sopenharmony_ci uint32_t index = 0; 6606f6ba60Sopenharmony_ci while (running_) { 6706f6ba60Sopenharmony_ci if (resultWriter_->isProtobufSerialize) { 6806f6ba60Sopenharmony_ci StreamData dataProto; 6906f6ba60Sopenharmony_ci dataProto.set_intdata(index); 7006f6ba60Sopenharmony_ci dataProto.set_stringdata(std::to_string(index)); 7106f6ba60Sopenharmony_ci buffer_.resize(dataProto.ByteSizeLong()); 7206f6ba60Sopenharmony_ci dataProto.SerializeToArray(buffer_.data(), buffer_.size()); 7306f6ba60Sopenharmony_ci if (index < 50) { // 50: count of loop 7406f6ba60Sopenharmony_ci resultWriter_->write(resultWriter_, buffer_.data(), buffer_.size()); 7506f6ba60Sopenharmony_ci resultWriter_->flush(resultWriter_); 7606f6ba60Sopenharmony_ci } 7706f6ba60Sopenharmony_ci } else { 7806f6ba60Sopenharmony_ci ProtoEncoder::StreamData dataProto(resultWriter_->startReport(resultWriter_)); 7906f6ba60Sopenharmony_ci dataProto.set_intdata(index); 8006f6ba60Sopenharmony_ci dataProto.set_stringdata(std::to_string(index)); 8106f6ba60Sopenharmony_ci int messageLen = dataProto.Finish(); 8206f6ba60Sopenharmony_ci resultWriter_->finishReport(resultWriter_, messageLen); 8306f6ba60Sopenharmony_ci if (index < 50) { // 50: count of loop 8406f6ba60Sopenharmony_ci resultWriter_->flush(resultWriter_); 8506f6ba60Sopenharmony_ci } 8606f6ba60Sopenharmony_ci } 8706f6ba60Sopenharmony_ci 8806f6ba60Sopenharmony_ci index++; 8906f6ba60Sopenharmony_ci } 9006f6ba60Sopenharmony_ci PROFILER_LOG_INFO(LOG_CORE, "%s:transporter thread %d exit !!!!!", __func__, gettid()); 9106f6ba60Sopenharmony_ci}