1/* 2 * Copyright (c) Huawei Technologies Co., Ltd. 2021. All rights reserved. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16#include "plugin_service.h" 17 18#include <cinttypes> 19#include <fcntl.h> 20#include <sys/wait.h> 21#include <unistd.h> 22 23#include "plugin_command_builder.h" 24#include "plugin_service_impl.h" 25#include "plugin_session_manager.h" 26#include "profiler_capability_manager.h" 27#include "profiler_data_repeater.h" 28#include "securec.h" 29#include "share_memory_allocator.h" 30#include "socket_context.h" 31 32namespace { 33const int PAGE_BYTES = 4096; 34const int DEFAULT_EVENT_POLLING_INTERVAL = 5000; 35constexpr uint32_t FLUSH_BASELINE = (1U << 21); // need to flush data size with offline mode 36constexpr uint32_t STOP_BASELINE = (1U << 22); // need to stop take data size with offline mode 37} // namespace 38 39PluginService::PluginService() 40{ 41 pluginIdCounter_ = 0; 42 if (getuid() == 0) { 43 StartService(DEFAULT_UNIX_SOCKET_FULL_PATH); 44 } else { 45 StartService(DEFAULT_UNIX_SOCKET_PATH); 46 } 47 48 pluginCommandBuilder_ = std::make_shared<PluginCommandBuilder>(); 49 50 eventPoller_ = std::make_unique<EpollEventPoller>(DEFAULT_EVENT_POLLING_INTERVAL); 51 CHECK_NOTNULL(eventPoller_, NO_RETVAL, "create event poller FAILED!"); 52 53 eventPoller_->Init(); 54} 55 56PluginService::~PluginService() 57{ 58 if (eventPoller_) { 59 eventPoller_->Stop(); 60 eventPoller_->Finalize(); 61 } 62} 63 64bool PluginService::StartEpollThread() 65{ 66 if (eventPoller_) { 67 return eventPoller_->Start(); 68 } 69 return false; 70} 71 72bool PluginService::StopEpollThread() 73{ 74 if (eventPoller_) { 75 return eventPoller_->Stop(); 76 } 77 return false; 78} 79 80void PluginService::SetPluginSessionManager(const PluginSessionManagerPtr& pluginSessionManager) 81{ 82 pluginSessionManager_ = pluginSessionManager; 83} 84 85void PluginService::SetProfilerSessionConfig(const std::shared_ptr<ProfilerSessionConfig> profilerSessionConfig, 86 const std::vector<std::string>& pluginNames) 87{ 88 for (const std::string& name : pluginNames) { 89 profilerSessionConfigs_[name] = profilerSessionConfig; 90 } 91} 92 93SemaphorePtr PluginService::GetSemaphore(uint32_t id) const 94{ 95 std::unique_lock<std::mutex> lock(mutex_); 96 auto it = waitSemphores_.find(id); 97 if (it != waitSemphores_.end()) { 98 return it->second; 99 } 100 return nullptr; 101} 102 103bool PluginService::StartService(const std::string& unixSocketName) 104{ 105 pluginServiceImpl_ = std::make_shared<PluginServiceImpl>(*this); 106 serviceEntry_ = std::make_shared<ServiceEntry>(); 107 if (!serviceEntry_->StartServer(unixSocketName)) { 108 pluginServiceImpl_ = nullptr; 109 serviceEntry_ = nullptr; 110 PROFILER_LOG_DEBUG(LOG_CORE, "Start IPC Service FAIL"); 111 return false; 112 } 113 serviceEntry_->RegisterService(*pluginServiceImpl_.get()); 114 return true; 115} 116 117static ShareMemoryBlock::ReusePolicy GetReusePolicy(const ProfilerSessionConfig::BufferConfig& bufferConfig) 118{ 119 if (bufferConfig.policy() == ProfilerSessionConfig::BufferConfig::RECYCLE) { 120 return ShareMemoryBlock::DROP_OLD; 121 } 122 return ShareMemoryBlock::DROP_NONE; 123} 124 125// create plugin session with buffer config 126bool PluginService::CreatePluginSession(const ProfilerPluginConfig& pluginConfig, 127 const ProfilerSessionConfig::BufferConfig& bufferConfig, 128 const ProfilerDataRepeaterPtr& dataRepeater) 129{ 130 uint32_t pluginId = 0; 131 PluginContextPtr pluginCtx = nullptr; 132 std::string pluginName = pluginConfig.name(); 133 isProtobufSerialize_ = pluginConfig.is_protobuf_serialize(); 134 std::tie(pluginId, pluginCtx) = GetPluginContext(pluginName); 135 CHECK_NOTNULL(pluginCtx, false, "get PluginContext failed!"); 136 137 pluginCtx->profilerDataRepeater = dataRepeater; 138 139 uint32_t bufferSize = bufferConfig.pages() * PAGE_BYTES; 140 auto cmd = pluginCommandBuilder_->BuildCreateSessionCmd(pluginConfig, bufferSize); 141 CHECK_TRUE(cmd != nullptr, false, "CreatePluginSession BuildCreateSessionCmd FAIL %s", pluginName.c_str()); 142 143 auto smb = ShareMemoryAllocator::GetInstance().CreateMemoryBlockLocal(pluginName, bufferSize); 144 CHECK_TRUE(smb != nullptr, false, "CreateMemoryBlockLocal FAIL %s", pluginName.c_str()); 145 146 auto policy = GetReusePolicy(bufferConfig); 147 PROFILER_LOG_DEBUG(LOG_CORE, "CreatePluginSession policy = %d", (int)policy); 148 smb->SetReusePolicy(policy); 149 150 auto notifier = EventNotifier::Create(0, EventNotifier::NONBLOCK); 151 CHECK_NOTNULL(notifier, false, "create EventNotifier for %s failed!", pluginName.c_str()); 152 CHECK_TRUE(pluginServiceImpl_->SendHeartBeat(*pluginCtx->context), false, 153 "%s hiprofiler_plugin process is off line!", __func__); 154 pluginCtx->shareMemoryBlock = smb; 155 pluginCtx->eventNotifier = notifier; 156 pluginCtx->profilerPluginState.set_state(ProfilerPluginState::LOADED); 157 pluginServiceImpl_->PushCommand(*pluginCtx->context, cmd); 158 pluginCtx->context->SendFileDescriptor(smb->GetfileDescriptor()); 159 pluginCtx->context->SendFileDescriptor(notifier->GetFd()); 160 161 auto configIter = profilerSessionConfigs_.find(pluginName); 162 if (configIter == profilerSessionConfigs_.end()) { 163 PROFILER_LOG_ERROR(LOG_CORE, "profiler session config not set fot plugin name: %s", pluginName.c_str()); 164 return false; 165 } 166 if (configIter->second->session_mode() == ProfilerSessionConfig::OFFLINE) { 167 eventPoller_->AddFileDescriptor(notifier->GetFd(), 168 [this, pluginCtx] { this->ReadShareMemoryOffline(*pluginCtx); }); 169 } else if (configIter->second->session_mode() == ProfilerSessionConfig::ONLINE) { 170 eventPoller_->AddFileDescriptor(notifier->GetFd(), 171 [this, pluginCtx] { this->ReadShareMemoryOnline(*pluginCtx); }); 172 } 173 PROFILER_LOG_DEBUG(LOG_CORE, "CreatePluginSession %s done, shmem fd = %d", 174 pluginName.c_str(), smb->GetfileDescriptor()); 175 return true; 176} 177 178// create plugin session without buffer config 179bool PluginService::CreatePluginSession(const ProfilerPluginConfig& pluginConfig, 180 const ProfilerDataRepeaterPtr& dataRepeater) 181{ 182 uint32_t pluginId = 0; 183 PluginContextPtr pluginCtx = nullptr; 184 std::string pluginName = pluginConfig.name(); 185 std::tie(pluginId, pluginCtx) = GetPluginContext(pluginName); 186 CHECK_NOTNULL(pluginCtx, false, "get PluginContext failed!"); 187 188 pluginCtx->profilerDataRepeater = dataRepeater; 189 pluginCtx->shareMemoryBlock = nullptr; 190 191 auto cmd = pluginCommandBuilder_->BuildCreateSessionCmd(pluginConfig, 0); 192 CHECK_TRUE(cmd != nullptr, false, "CreatePluginSession BuildCreateSessionCmd FAIL %s", pluginName.c_str()); 193 CHECK_TRUE(pluginServiceImpl_->SendHeartBeat(*pluginCtx->context), false, 194 "%s hiprofiler_plugin process is off line!", __func__); 195 pluginCtx->profilerPluginState.set_state(ProfilerPluginState::LOADED); 196 pluginServiceImpl_->PushCommand(*pluginCtx->context, cmd); 197 PROFILER_LOG_DEBUG(LOG_CORE, "CreatePluginSession %s done!", pluginName.c_str()); 198 return true; 199} 200 201bool PluginService::StartPluginSession(const ProfilerPluginConfig& config) 202{ 203 uint32_t pluginId = 0; 204 PluginContextPtr pluginCtx = nullptr; 205 std::string pluginName = config.name(); 206 std::tie(pluginId, pluginCtx) = GetPluginContext(pluginName); 207 CHECK_NOTNULL(pluginCtx, false, "get PluginContext failed!"); 208 209 auto cmd = pluginCommandBuilder_->BuildStartSessionCmd(config, pluginId); 210 CHECK_TRUE(cmd != nullptr, false, "StartPluginSession BuildStartSessionCmd FAIL %s", pluginName.c_str()); 211 CHECK_TRUE(pluginServiceImpl_->SendHeartBeat(*pluginCtx->context), false, 212 "%s hiprofiler_plugin process is off line!", __func__); 213 pluginCtx->profilerPluginState.set_state(ProfilerPluginState::IN_SESSION); 214 pluginServiceImpl_->PushCommand(*pluginCtx->context, cmd); 215 PROFILER_LOG_INFO(LOG_CORE, "StartPluginSession %s done!", pluginName.c_str()); 216 return true; 217} 218 219bool PluginService::StopPluginSession(const std::string& pluginName) 220{ 221 uint32_t pluginId = 0; 222 PluginContextPtr pluginCtx = nullptr; 223 std::tie(pluginId, pluginCtx) = GetPluginContext(pluginName); 224 CHECK_NOTNULL(pluginCtx, false, "get PluginContext failed!"); 225 226 auto cmd = pluginCommandBuilder_->BuildStopSessionCmd(pluginId); 227 CHECK_TRUE(cmd != nullptr, false, "StopPluginSession BuildStopSessionCmd FAIL %s", pluginName.c_str()); 228 CHECK_TRUE(pluginServiceImpl_->SendHeartBeat(*pluginCtx->context), false, 229 "%s hiprofiler_plugin process is off line!", __func__); 230 pluginCtx->profilerPluginState.set_state(ProfilerPluginState::LOADED); 231 pluginServiceImpl_->PushCommand(*pluginCtx->context, cmd); 232 auto sem = GetSemaphoreFactory().Create(0); 233 CHECK_NOTNULL(sem, false, "create Semaphore for stop %s FAILED!", pluginName.c_str()); 234 235 waitSemphores_[cmd->command_id()] = sem; 236 PROFILER_LOG_DEBUG(LOG_CORE, "=== StopPluginSession %s Waiting ... ===", pluginName.c_str()); 237 // wait on semaphore at most 30 seconds. 238 if (!sem->TimedWait(30)) { 239 // semaphore timeout 240 PROFILER_LOG_DEBUG(LOG_CORE, "=== StopPluginSession Waiting FAIL ==="); 241 return false; 242 } 243 PROFILER_LOG_DEBUG(LOG_CORE, "StopPluginSession %s done!", pluginName.c_str()); 244 return true; 245} 246 247bool PluginService::DestroyPluginSession(const std::string& pluginName) 248{ 249 uint32_t pluginId = 0; 250 PluginContextPtr pluginCtx = nullptr; 251 std::tie(pluginId, pluginCtx) = GetPluginContext(pluginName); 252 CHECK_NOTNULL(pluginCtx, false, "get PluginContext failed!"); 253 254 auto cmd = pluginCommandBuilder_->BuildDestroySessionCmd(pluginId); 255 CHECK_TRUE(cmd != nullptr, false, "DestroyPluginSession BuildDestroySessionCmd FAIL %s", pluginName.c_str()); 256 257 if (profilerSessionConfigs_.find(pluginName) != profilerSessionConfigs_.end()) { 258 profilerSessionConfigs_.erase(profilerSessionConfigs_.find(pluginName)); 259 } 260 if (pluginCtx->shareMemoryBlock) { 261 ShareMemoryAllocator::GetInstance().ReleaseMemoryBlockLocal(pluginName); 262 } 263 264 if (pluginCtx->eventNotifier) { 265 eventPoller_->RemoveFileDescriptor(pluginCtx->eventNotifier->GetFd()); 266 pluginCtx->eventNotifier = nullptr; 267 } 268 CHECK_TRUE(pluginServiceImpl_->SendHeartBeat(*pluginCtx->context), false, 269 "%s hiprofiler_plugin process is off line!", __func__); 270 pluginCtx->profilerPluginState.set_state(ProfilerPluginState::REGISTERED); 271 pluginServiceImpl_->PushCommand(*pluginCtx->context, cmd); 272 PROFILER_LOG_INFO(LOG_CORE, "DestroyPluginSession %s done!", pluginName.c_str()); 273 return true; 274} 275 276bool PluginService::RefreshPluginSession(const std::string& pluginName) 277{ 278 uint32_t pluginId = 0; 279 PluginContextPtr pluginCtx = nullptr; 280 std::tie(pluginId, pluginCtx) = GetPluginContext(pluginName); 281 CHECK_NOTNULL(pluginCtx, false, "get PluginContext failed!"); 282 283 auto cmd = pluginCommandBuilder_->BuildRefreshSessionCmd(pluginId); 284 CHECK_TRUE(cmd != nullptr, false, "RefreshPluginSession BuildRefreshSessionCmd FAIL %s", pluginName.c_str()); 285 CHECK_TRUE(pluginServiceImpl_->SendHeartBeat(*pluginCtx->context), false, 286 "%s hiprofiler_plugin process is off line!", __func__); 287 pluginServiceImpl_->PushCommand(*pluginCtx->context, cmd); 288 PROFILER_LOG_INFO(LOG_CORE, "RefreshPluginSession %s done!", pluginName.c_str()); 289 return true; 290} 291 292bool PluginService::RemovePluginSessionCtx(const std::string& pluginName) 293{ 294 PluginContextPtr pluginCtx = GetPluginContext(pluginName).second; 295 CHECK_NOTNULL(pluginCtx, false, "get PluginContext failed!"); 296 297 if (pluginCtx->shareMemoryBlock) { 298 ShareMemoryAllocator::GetInstance().ReleaseMemoryBlockLocal(pluginName); 299 pluginCtx->shareMemoryBlock = nullptr; 300 } 301 302 if (pluginCtx->eventNotifier) { 303 eventPoller_->RemoveFileDescriptor(pluginCtx->eventNotifier->GetFd()); 304 pluginCtx->eventNotifier = nullptr; 305 } 306 307 pluginCtx->profilerPluginState.set_state(ProfilerPluginState::INITED); 308 PROFILER_LOG_INFO(LOG_CORE, "RemovePluginSessionCtx %s done!", pluginName.c_str()); 309 return true; 310} 311 312std::pair<uint32_t, PluginContextPtr> PluginService::GetPluginContext(const std::string& pluginName) 313{ 314 std::unique_lock<std::mutex> lock(mutex_); 315 CHECK_TRUE(nameIndex_.count(pluginName) > 0, std::make_pair(0, nullptr), 316 "GetPluginContext failed, plugin name `%s` not found!", pluginName.c_str()); 317 uint32_t id = nameIndex_[pluginName]; 318 319 CHECK_TRUE(pluginContext_.count(id) > 0, std::make_pair(id, nullptr), "plugin id %u not found!", id); 320 return std::make_pair(id, pluginContext_[id]); 321} 322 323PluginContextPtr PluginService::GetPluginContextById(uint32_t id) 324{ 325 std::unique_lock<std::mutex> lock(mutex_); 326 CHECK_TRUE(pluginContext_.count(id) > 0, nullptr, "plugin id %u not found!", id); 327 return pluginContext_[id]; 328} 329 330bool PluginService::AddPluginInfo(const PluginInfo& pluginInfo) 331{ 332 if (nameIndex_.find(pluginInfo.name) == nameIndex_.end()) { // add new plugin 333 auto pluginCtx = std::make_shared<PluginContext>(); 334 CHECK_NOTNULL(pluginCtx, false, "create PluginContext failed!"); 335 336 ProfilerPluginCapability capability; 337 capability.set_path(pluginInfo.path); 338 capability.set_name(pluginInfo.name); 339 CHECK_TRUE(ProfilerCapabilityManager::GetInstance().AddCapability(capability), false, 340 "AddPluginInfo AddCapability FAIL"); 341 342 pluginCtx->name = pluginInfo.name; 343 pluginCtx->path = pluginInfo.path; 344 pluginCtx->context = pluginInfo.context; 345 pluginCtx->config.set_name(pluginInfo.name); 346 pluginCtx->config.set_plugin_sha256(pluginInfo.sha256); 347 pluginCtx->profilerPluginState.set_name(pluginInfo.name); 348 pluginCtx->profilerPluginState.set_state(ProfilerPluginState::REGISTERED); 349 pluginCtx->sha256 = pluginInfo.sha256; 350 pluginCtx->bufferSizeHint = pluginInfo.bufferSizeHint; 351 pluginCtx->isStandaloneFileData = pluginInfo.isStandaloneFileData; 352 pluginCtx->outFileName = pluginInfo.outFileName; 353 pluginCtx->pluginVersion = pluginInfo.pluginVersion; 354 355 uint32_t pluginId = ++pluginIdCounter_; 356 std::unique_lock<std::mutex> lock(mutex_); 357 pluginContext_[pluginId] = pluginCtx; 358 nameIndex_[pluginInfo.name] = pluginId; 359 } else { // update sha256 or bufferSizeHint 360 std::unique_lock<std::mutex> lock(mutex_); 361 CHECK_TRUE(nameIndex_.count(pluginInfo.name) > 0, false, "plugin name %s not found!", pluginInfo.name.c_str()); 362 363 uint32_t pluginId = nameIndex_[pluginInfo.name]; 364 CHECK_TRUE(pluginContext_.count(pluginId) > 0, false, "plugin id %u not found!", pluginId); 365 auto pluginCtx = pluginContext_[pluginId]; 366 367 if (pluginInfo.sha256 != "") { 368 pluginCtx->sha256 = pluginInfo.sha256; 369 } 370 if (pluginInfo.bufferSizeHint != 0) { 371 pluginCtx->bufferSizeHint = pluginInfo.bufferSizeHint; 372 } 373 if (pluginInfo.isStandaloneFileData != false) { 374 pluginCtx->isStandaloneFileData = pluginInfo.isStandaloneFileData; 375 } 376 if (pluginInfo.outFileName != "") { 377 pluginCtx->outFileName = pluginInfo.outFileName; 378 } 379 if (pluginInfo.pluginVersion != "") { 380 pluginCtx->pluginVersion = pluginInfo.pluginVersion; 381 } 382 } 383 PROFILER_LOG_DEBUG(LOG_CORE, "AddPluginInfo for %s done!", pluginInfo.name.c_str()); 384 385 return true; 386} 387 388bool PluginService::GetPluginInfo(const std::string& pluginName, PluginInfo& pluginInfo) 389{ 390 uint32_t pluginId = 0; 391 PluginContextPtr pluginCtx = nullptr; 392 std::tie(pluginId, pluginCtx) = GetPluginContext(pluginName); 393 CHECK_TRUE(pluginId, false, "plugin name %s not found!", pluginName.c_str()); 394 CHECK_TRUE(pluginCtx, false, "plugin id %u not found!", pluginId); 395 396 pluginInfo.id = pluginId; 397 pluginInfo.name = pluginCtx->name; 398 pluginInfo.path = pluginCtx->path; 399 pluginInfo.sha256 = pluginCtx->sha256; 400 pluginInfo.bufferSizeHint = pluginCtx->bufferSizeHint; 401 return true; 402} 403 404bool PluginService::RemovePluginInfo(const PluginInfo& pluginInfo) 405{ 406 uint32_t pluginId = pluginInfo.id; 407 PluginContextPtr pluginCtx = GetPluginContextById(pluginId); 408 CHECK_NOTNULL(pluginCtx, false, "RemovePluginInfo failed, id %d not found!", pluginId); 409 410 std::string pluginName = pluginCtx->config.name(); 411 CHECK_TRUE(ProfilerCapabilityManager::GetInstance().RemoveCapability(pluginName), false, 412 "RemovePluginInfo RemoveCapability FAIL %d", pluginId); 413 414 auto pluginState = pluginCtx->profilerPluginState.state(); 415 if (pluginState == ProfilerPluginState::LOADED || pluginState == ProfilerPluginState::IN_SESSION) { 416 std::vector<std::string> pluginNames = {pluginName}; 417 pluginSessionManager_->InvalidatePluginSessions(pluginNames); 418 pluginSessionManager_->RemovePluginSessions(pluginNames); 419 this->RemovePluginSessionCtx(pluginName); 420 } 421 422 std::unique_lock<std::mutex> lock(mutex_); 423 nameIndex_.erase(pluginName); 424 pluginContext_.erase(pluginId); 425 PROFILER_LOG_DEBUG(LOG_CORE, "RemovePluginInfo for %s done!", pluginName.c_str()); 426 return true; 427} 428 429void PluginService::ReadShareMemoryOffline(PluginContext& context) 430{ 431 CHECK_NOTNULL(context.shareMemoryBlock, NO_RETVAL, "smb of %s is null!", context.path.c_str()); 432 CHECK_NOTNULL(traceWriter_, NO_RETVAL, "traceWriter_ is null!"); 433 if (context.eventNotifier) { 434 context.eventNotifier->Take(); 435 } 436 437 uint32_t stopTakeDataSize = 0; 438 while (true) { 439 int retval = 0; 440 bool ret = context.shareMemoryBlock->TakeData([&](const int8_t data[], uint32_t size) -> bool { 441 CHECK_NOTNULL(data, false, "memory block data is null!"); 442 retval = traceWriter_->Write(data, size); 443 CHECK_TRUE(retval != -1, false, "need to splite file"); 444 CHECK_TRUE(retval > 0, false, "write %d bytes failed!", size); 445 return true; 446 }, isProtobufSerialize_); 447 448 if (retval == -1) { 449 PROFILER_LOG_DEBUG(LOG_CORE, "need to clear share memory block and report the basic data"); 450 pluginSessionManager_->RefreshPluginSession(); 451 break; 452 } 453 454 dataFlushSize_ += static_cast<uint32_t>(retval); 455 stopTakeDataSize += static_cast<uint32_t>(retval); 456 if (stopTakeDataSize > STOP_BASELINE) { 457 break; 458 } else if (dataFlushSize_ > FLUSH_BASELINE) { 459 traceWriter_->Flush(); 460 traceWriter_->Finish(); 461 dataFlushSize_ = 0; 462 } 463 464 if (!ret) { // no data to read 465 break; 466 } 467 } 468 traceWriter_->Flush(); 469 traceWriter_->Finish(); 470} 471 472void PluginService::ReadShareMemoryOnline(PluginContext& context) 473{ 474 CHECK_NOTNULL(context.shareMemoryBlock, NO_RETVAL, "smb of %s is null!", context.path.c_str()); 475 if (context.eventNotifier) { 476 context.eventNotifier->Take(); 477 } 478 479 while (true) { 480 auto pluginData = std::make_shared<ProfilerPluginData>(); 481 bool ret = context.shareMemoryBlock->TakeData([&](const int8_t data[], uint32_t size) -> bool { 482 int retval = pluginData->ParseFromArray(reinterpret_cast<const char*>(data), size); 483 CHECK_TRUE(retval, false, "parse %d bytes failed!", size); 484 return true; 485 }, isProtobufSerialize_); 486 if (!ret) { 487 break; 488 } 489 if (!context.profilerDataRepeater->PutPluginData(pluginData)) { 490 break; 491 } 492 } 493} 494 495void PluginService::FlushShareMemory(PluginContext& context) 496{ 497 CHECK_NOTNULL(context.shareMemoryBlock, NO_RETVAL, "smb of %s is null!", context.path.c_str()); 498 CHECK_NOTNULL(traceWriter_, NO_RETVAL, "traceWriter_ is null!"); 499 500 while (true) { 501 bool ret = context.shareMemoryBlock->TakeData([&](const int8_t data[], uint32_t size) -> bool { 502 int retval = traceWriter_->Write(data, size); 503 CHECK_TRUE(retval > 0, false, "write %d bytes failed!", size); 504 return true; 505 }, isProtobufSerialize_); 506 if (!ret) { // no data to read 507 break; 508 } 509 } 510 traceWriter_->Finish(); 511} 512 513void PluginService::FlushAllData(const std::string& pluginName) 514{ 515 PROFILER_LOG_INFO(LOG_CORE, "FlushAllData for %s start!", pluginName.c_str()); 516 auto configIter = profilerSessionConfigs_.find(pluginName); 517 if (configIter == profilerSessionConfigs_.end()) { 518 PROFILER_LOG_ERROR(LOG_CORE, "profiler session config not set fot plugin name: %s", pluginName.c_str()); 519 return; 520 } 521 if (!configIter->second->discard_cache_data()) { 522 uint32_t pluginId = 0; 523 PluginContextPtr pluginCtx = nullptr; 524 std::tie(pluginId, pluginCtx) = GetPluginContext(pluginName); 525 CHECK_NOTNULL(pluginCtx, NO_RETVAL, "%s: get PluginContext(%s) failed!", __func__, pluginName.c_str()); 526 if (configIter->second->session_mode() == ProfilerSessionConfig::OFFLINE) { 527 FlushShareMemory(*pluginCtx); 528 } else if (configIter->second->session_mode() == ProfilerSessionConfig::ONLINE) { 529 ReadShareMemoryOnline(*pluginCtx); 530 } 531 } 532 PROFILER_LOG_INFO(LOG_CORE, "FlushAllData for %s done!", pluginName.c_str()); 533} 534 535bool PluginService::AppendResult(NotifyResultRequest& request) 536{ 537 pluginCommandBuilder_->GetedCommandResponse(request.command_id()); 538 auto sem = GetSemaphore(request.command_id()); 539 if (sem) { 540 sem->Post(); 541 } 542 543 int size = request.result_size(); 544 PROFILER_LOG_DEBUG(LOG_CORE, "AppendResult size:%d, cmd id:%d", size, request.command_id()); 545 for (int i = 0; i < size; i++) { 546 PluginResult pr = request.result(i); 547 if (pr.data().size() > 0) { 548 PROFILER_LOG_DEBUG(LOG_CORE, "AppendResult Size : %zu", pr.data().size()); 549 uint32_t pluginId = pr.plugin_id(); 550 PluginContextPtr pluginCtx = GetPluginContextById(pluginId); 551 CHECK_NOTNULL(pluginCtx, false, "plugin id %u not found!", pluginId); 552 if (pluginCtx->profilerDataRepeater == nullptr) { 553 PROFILER_LOG_DEBUG(LOG_CORE, "AppendResult profilerDataRepeater==nullptr %s %d", 554 pr.status().name().c_str(), pluginId); 555 return false; 556 } 557 auto pluginData = std::make_shared<ProfilerPluginData>(); 558 pluginData->set_name(pr.status().name()); 559 pluginData->set_status(0); 560 pluginData->set_data(pr.data()); 561 if (!pluginCtx->profilerDataRepeater->PutPluginData(pluginData)) { 562 return false; 563 } 564 } else if (pr.out_file_name() != "") { // updata plugin outFileName 565 std::unique_lock<std::mutex> lock(mutex_); 566 auto pluginId = pr.plugin_id(); 567 CHECK_TRUE(pluginContext_.count(pluginId) > 0, false, "plugin id %u not found!", pluginId); 568 pluginContext_[pluginId]->outFileName = pr.out_file_name(); 569 } else { 570 PROFILER_LOG_DEBUG(LOG_CORE, "Flush?Data From ShareMemory?"); 571 } 572 } 573 return true; 574} 575 576std::vector<ProfilerPluginState> PluginService::GetPluginStatus() 577{ 578 std::vector<ProfilerPluginState> status; 579 std::unique_lock<std::mutex> lock(mutex_); 580 for (auto& entry : pluginContext_) { 581 status.push_back(entry.second->profilerPluginState); 582 } 583 return status; 584} 585 586uint32_t PluginService::GetPluginIdByName(std::string name) 587{ 588 std::unique_lock<std::mutex> lock(mutex_); 589 if (nameIndex_.find(name) == nameIndex_.end()) { 590 return 0; 591 } 592 return nameIndex_[name]; 593} 594 595void PluginService::SetTraceWriter(const TraceFileWriterPtr& traceWriter) 596{ 597 traceWriter_ = traceWriter; 598}