1/*
2 * Copyright (c) Huawei Technologies Co., Ltd. 2021. All rights reserved.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15#include "command_poller.h"
16#include "buffer_writer.h"
17#include "plugin_manager.h"
18#include "socket_context.h"
19
20#include <fcntl.h>
21#include <unistd.h>
22
namespace {
// Pause, in milliseconds, taken at the start of OnGetCommandResponse before the command is dispatched.
constexpr int SLEEP_TIME{10};
} // namespace
26
27CommandPoller::CommandPoller(const ManagerInterfacePtr& p) : requestIdAutoIncrease_(1), pluginManager_(p) {}
28
29CommandPoller::~CommandPoller() {}
30
31uint32_t CommandPoller::GetRequestId()
32{
33    return requestIdAutoIncrease_++;
34}
35
36bool CommandPoller::OnConnect()
37{
38    return Connect(DEFAULT_UNIX_SOCKET_FULL_PATH);
39}
40
41bool CommandPoller::OnCreateSessionCmd(const CreateSessionCmd& cmd, SocketContext& context) const
42{
43    PROFILER_LOG_DEBUG(LOG_CORE, "%s:proc", __func__);
44    CHECK_TRUE(cmd.buffer_sizes().size() != 0 && cmd.plugin_configs().size() != 0, false, "%s:cmd invalid!", __func__);
45
46    uint32_t bufferSize = cmd.buffer_sizes(0);
47    ProfilerPluginConfig config = cmd.plugin_configs(0);
48    std::vector<ProfilerPluginConfig> configVec;
49    configVec.push_back(config);
50
51    auto pluginManager = pluginManager_.lock(); // promote to shared_ptr
52    CHECK_NOTNULL(pluginManager, false, "promote FAILED!");
53
54    CHECK_TRUE(pluginManager->LoadPlugin(config.name()), false, "%s:fail 1", __func__);
55
56    int smbFd = -1;
57    int eventFd = -1;
58    if (bufferSize != 0) {
59        PROFILER_LOG_DEBUG(LOG_CORE, "%s:bufferSize = %d", __func__, bufferSize);
60        smbFd = context.ReceiveFileDiscriptor();
61        eventFd = context.ReceiveFileDiscriptor();
62        int flags = fcntl(eventFd, F_GETFL);
63        PROFILER_LOG_DEBUG(LOG_CORE, "%s:smbFd = %d, eventFd = %d", __func__, smbFd, eventFd);
64        PROFILER_LOG_DEBUG(LOG_CORE, "%s:eventFd flags = %X", __func__, flags);
65    }
66    CHECK_TRUE(pluginManager->CreateWriter(config.name(), bufferSize, smbFd, eventFd, config.is_protobuf_serialize()),
67               false, "%s:createWriter failed!", __func__);
68    CHECK_TRUE(pluginManager->CreatePluginSession(configVec), false,
69               "%s:createPluginSession failed!", __func__);
70    PROFILER_LOG_DEBUG(LOG_CORE, "%s:ok", __func__);
71    return true;
72}
73
74bool CommandPoller::OnDestroySessionCmd(const DestroySessionCmd& cmd) const
75{
76    PROFILER_LOG_DEBUG(LOG_CORE, "%s:proc", __func__);
77    CHECK_TRUE(cmd.plugin_ids().size() != 0, false, "%s:cmd invalid!", __func__);
78    uint32_t pluginId = cmd.plugin_ids(0);
79    std::vector<uint32_t> pluginIdVec;
80    pluginIdVec.push_back(pluginId);
81
82    auto pluginManager = pluginManager_.lock(); // promote to shared_ptr
83    CHECK_NOTNULL(pluginManager, false, "promote FAILED!");
84    CHECK_TRUE(pluginManager->DestroyPluginSession(pluginIdVec), false,
85               "%s:destroyPluginSession failed!", __func__);
86    CHECK_TRUE(pluginManager->ResetWriter(pluginId), false, "%s:resetWriter failed!", __func__);
87    CHECK_TRUE(pluginManager->UnloadPlugin(pluginId), false, "%s:unloadPlugin failed!", __func__);
88    PROFILER_LOG_DEBUG(LOG_CORE, "%s:ok", __func__);
89    return true;
90}
91
92bool CommandPoller::OnStartSessionCmd(const StartSessionCmd& cmd, PluginResult& result) const
93{
94    PROFILER_LOG_DEBUG(LOG_CORE, "%s:proc", __func__);
95    CHECK_TRUE(cmd.plugin_ids().size() != 0 && cmd.plugin_configs().size() != 0, false,
96               "%s:cmd invalid!", __func__);
97    std::vector<uint32_t> pluginIds;
98    pluginIds.push_back(cmd.plugin_ids(0));
99    std::vector<ProfilerPluginConfig> configVec;
100    configVec.push_back(cmd.plugin_configs(0));
101
102    auto pluginManager = pluginManager_.lock(); // promote to shared_ptr
103    CHECK_NOTNULL(pluginManager, false, "promote FAILED!");
104    CHECK_TRUE(pluginManager->StartPluginSession(pluginIds, configVec, result), false,
105               "%s:start Session failed!", __func__);
106    PROFILER_LOG_DEBUG(LOG_CORE, "%s:OK", __func__);
107    return true;
108}
109
110bool CommandPoller::OnStopSessionCmd(const StopSessionCmd& cmd) const
111{
112    PROFILER_LOG_DEBUG(LOG_CORE, "%s:proc", __func__);
113    CHECK_TRUE(cmd.plugin_ids().size() != 0, false, "%s:cmd invalid!", __func__);
114    std::vector<uint32_t> pluginIds;
115    pluginIds.push_back(cmd.plugin_ids(0));
116
117    auto pluginManager = pluginManager_.lock(); // promote to shared_ptr
118    CHECK_NOTNULL(pluginManager, false, "promote FAILED!");
119    CHECK_TRUE(pluginManager->StopPluginSession(pluginIds), false, "%s:stop Session failed!", __func__);
120    PROFILER_LOG_DEBUG(LOG_CORE, "%s:ok", __func__);
121    return true;
122}
123
124bool CommandPoller::OnReportBasicDataCmd(const RefreshSessionCmd& cmd) const
125{
126    PROFILER_LOG_DEBUG(LOG_CORE, "%s:proc", __func__);
127    CHECK_TRUE(cmd.plugin_ids().size() != 0, false, "%s:cmd invalid!", __func__);
128    std::vector<uint32_t> pluginIds;
129    pluginIds.push_back(cmd.plugin_ids(0));
130
131    auto pluginManager = pluginManager_.lock(); // promote to shared_ptr
132    CHECK_NOTNULL(pluginManager, false, "promote FAILED!");
133    CHECK_TRUE(pluginManager->ReportPluginBasicData(pluginIds), false, "%s:report basic data failed!", __func__);
134    PROFILER_LOG_INFO(LOG_CORE, "%s:ok", __func__);
135    return true;
136}
137
138bool CommandPoller::OnGetCommandResponse(SocketContext& context, ::GetCommandResponse& response)
139{
140    PROFILER_LOG_DEBUG(LOG_CORE, "%s:proc", __func__);
141    std::this_thread::sleep_for(std::chrono::milliseconds(SLEEP_TIME));
142    NotifyResultRequest nrr;
143    nrr.set_request_id(1);
144    nrr.set_command_id(response.command_id());
145    PluginResult* pr = nrr.add_result();
146    ProfilerPluginState* status = pr->mutable_status();
147
148    if (response.has_create_session_cmd()) {
149        if (OnCreateSessionCmd(response.create_session_cmd(), context)) {
150            status->set_state(ProfilerPluginState::LOADED);
151        } else {
152            status->set_state(ProfilerPluginState::REGISTERED);
153        }
154    } else if (response.has_destroy_session_cmd()) {
155        if (OnDestroySessionCmd(response.destroy_session_cmd())) {
156            status->set_state(ProfilerPluginState::REGISTERED);
157        } else {
158            status->set_state(ProfilerPluginState::LOADED);
159        }
160    } else if (response.has_start_session_cmd()) {
161        if (OnStartSessionCmd(response.start_session_cmd(), *pr)) {
162            status->set_state(ProfilerPluginState::IN_SESSION);
163        } else {
164            status->set_state(ProfilerPluginState::LOADED);
165        }
166    } else if (response.has_stop_session_cmd()) {
167        if (OnStopSessionCmd(response.stop_session_cmd())) {
168            status->set_state(ProfilerPluginState::LOADED);
169        } else {
170            status->set_state(ProfilerPluginState::IN_SESSION);
171        }
172    } else if (response.has_refresh_session_cmd()) {
173        OnReportBasicDataCmd(response.refresh_session_cmd());
174        status->set_state(ProfilerPluginState::IN_SESSION);
175    } else {
176        PROFILER_LOG_DEBUG(LOG_CORE, "%s:command Response failed!", __func__);
177        return false;
178    }
179    PROFILER_LOG_DEBUG(LOG_CORE, "%s:ok id = %d", __func__, nrr.command_id());
180    NotifyResult(nrr);
181    return true;
182}
183