1/* 2 * Copyright (c) 2021 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15#include <algorithm> 16#include <cstring> 17#include <iostream> 18#include <memory> 19#include <mutex> 20#include <thread> 21#include <sys/prctl.h> 22#include <sys/stat.h> 23#include <unistd.h> 24 25#include <log_utils.h> 26#include <seq_packet_socket_server.h> 27 28#include "service_controller.h" 29#include "cmd_executor.h" 30 31namespace OHOS { 32namespace HiviewDFX { 33static const int MAX_CLIENT_CONNECTIONS = 100; 34 35CmdExecutor::~CmdExecutor() 36{ 37 std::lock_guard<std::mutex> lg(m_clientAccess); 38 for (auto& client : m_clients) { 39 client->m_stopThread.store(true); 40 } 41 for (auto& client : m_clients) { 42 if (client->m_clientThread.joinable()) { 43 client->m_clientThread.join(); 44 } 45 } 46} 47 48void CmdExecutor::MainLoop(const std::string& socketName) 49{ 50 SeqPacketSocketServer cmdServer(socketName, MAX_CLIENT_CONNECTIONS); 51 if (cmdServer.Init() < 0) { 52 std::cerr << "Failed to init control socket ! \n"; 53 return; 54 } 55 std::cout << "Server started to listen !\n"; 56 using namespace std::chrono_literals; 57 cmdServer.StartAcceptingConnection( 58 [this] (std::unique_ptr<Socket> handler) { 59 OnAcceptedConnection(std::move(handler)); 60 }, 61 3000ms, 62 [this] () { 63 CleanFinishedClients(); 64 }); 65} 66 67void CmdExecutor::OnAcceptedConnection(std::unique_ptr<Socket> handler) 68{ 69 std::lock_guard<std::mutex> lg(m_clientAccess); 70 auto newVal = std::make_unique<ClientThread>(); 71 if (newVal != nullptr) { 72 newVal->m_stopThread.store(false); 73 newVal->m_clientThread = std::thread([this](std::unique_ptr<Socket> handler) { 74 ClientEventLoop(std::move(handler)); 75 }, std::move(handler)); 76 m_clients.push_back(std::move(newVal)); 77 } 78} 79 80void CmdExecutor::ClientEventLoop(std::unique_ptr<Socket> handler) 81{ 82 decltype(m_clients)::iterator clientInfoIt; 83 { 84 std::lock_guard<std::mutex> lg(m_clientAccess); 85 clientInfoIt = std::find_if(m_clients.begin(), m_clients.end(), 86 [](const std::unique_ptr<ClientThread>& ct) { 87 return ct->m_clientThread.get_id() == std::this_thread::get_id(); 88 }); 89 } 90 if (clientInfoIt == m_clients.end()) { 91 std::cerr << "Failed to find client\n"; 92 return; 93 } 94 95 prctl(PR_SET_NAME, m_name.c_str()); 96 ServiceController serviceCtrl(std::move(handler), m_logCollector, m_hilogBuffer, m_kmsgBuffer); 97 serviceCtrl.CommunicationLoop((*clientInfoIt)->m_stopThread, m_cmdList); 98 99 std::lock_guard<std::mutex> ul(m_finishedClientAccess); 100 m_finishedClients.push_back(std::this_thread::get_id()); 101} 102 103void CmdExecutor::CleanFinishedClients() 104{ 105 std::list<std::thread> threadsToJoin; 106 { 107 // select clients to clean up - pick threads that we have to be sure are ended 108 std::scoped_lock sl(m_finishedClientAccess, m_clientAccess); 109 for (auto threadId : m_finishedClients) { 110 auto clientInfoIt = std::find_if(m_clients.begin(), m_clients.end(), 111 [&threadId](const std::unique_ptr<ClientThread>& ct) { 112 return ct->m_clientThread.get_id() == threadId; 113 }); 114 if (clientInfoIt != m_clients.end()) { 115 threadsToJoin.push_back(std::move((*clientInfoIt)->m_clientThread)); 116 m_clients.erase(clientInfoIt); 117 } 118 } 119 m_finishedClients.clear(); 120 } 121 for (auto& thread : threadsToJoin) { 122 if (thread.joinable()) { 123 thread.join(); 124 } 125 } 126} 127} // namespace HiviewDFX 128} // namespace OHOS 129