1/*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 *     http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16#include <gtest/gtest.h>
17#include <thread>
18#include <sys/eventfd.h>
19#include <cstdarg>
20
21#include "begetctl.h"
22#include "cJSON.h"
23#include "init.h"
24#include "init_hashmap.h"
25#include "init_param.h"
26#include "init_utils.h"
27#include "le_epoll.h"
28#include "le_loop.h"
29#include "le_socket.h"
30#include "le_task.h"
31#include "loop_event.h"
32#include "param_manager.h"
33#include "param_message.h"
34#include "param_utils.h"
35#include "trigger_manager.h"
36
37using namespace testing::ext;
38using namespace std;
39
40namespace init_ut {
41const std::string TCP_SERVER = "127.0.0.1:7777";
42const std::string PIPE_SERVER = STARTUP_INIT_UT_PATH "/dev/unix/socket/testsocket";
43const std::string WATCHER_FILE = STARTUP_INIT_UT_PATH "/test_watcher_file";
44const std::string FORMAT_STR = "{ \"cmd\":%d, \"message\":\"%s\" }";
45
46static LoopHandle g_loopClient_ = nullptr;
47static LoopHandle g_loopServer_ = nullptr;
48static int g_maxCount = 0;
49static int g_timeCount = 0;
50static int g_cmd = 2;
51static void DecodeMessage(const char *buffer, size_t nread, uint32_t &cmd)
52{
53    cJSON *root = cJSON_ParseWithLength(buffer, nread);
54    if (root == nullptr) {
55        EXPECT_NE(root, nullptr);
56        printf("Invalid message %s  \n", buffer);
57        return;
58    }
59    printf("Message: %s  \n", cJSON_GetStringValue(cJSON_GetObjectItem(root, "message")));
60    cmd = cJSON_GetNumberValue(cJSON_GetObjectItem(root, "cmd"));
61    printf("cmd: %d \n", cmd);
62    cJSON_Delete(root);
63    return;
64}
65
66static void SendMessage(const LoopHandle loopHandle, const TaskHandle taskHandle, const char *message, ...)
67{
68    uint32_t bufferSize = 1024; // 1024 buffer size
69    BufferHandle handle = LE_CreateBuffer(loopHandle, bufferSize);
70    char *buffer = (char *)LE_GetBufferInfo(handle, nullptr, &bufferSize);
71
72    va_list vargs;
73    va_start(vargs, message);
74    if (vsnprintf_s(buffer, bufferSize, bufferSize - 1, message, vargs) == -1) {
75        LE_FreeBuffer(loopHandle, taskHandle, handle);
76        va_end(vargs);
77        EXPECT_EQ(1, 0);
78        return;
79    }
80    va_end(vargs);
81    int ret = LE_Send(loopHandle, taskHandle, handle, bufferSize);
82    EXPECT_EQ(ret, 0);
83}
84
85static void TestOnClose(const TaskHandle taskHandle)
86{
87}
88
89static LE_STATUS TestHandleTaskEvent(const LoopHandle loop, const TaskHandle task, uint32_t oper)
90{
91    return LE_SUCCESS;
92}
93
94static void TestOnReceiveRequest(const TaskHandle task, const uint8_t *buffer, uint32_t nread)
95{
96    EXPECT_NE(buffer, nullptr);
97    if (buffer == nullptr) {
98        return;
99    }
100    printf("Server receive message %s \n", reinterpret_cast<const char *>(buffer));
101    uint32_t cmd = 0;
102    DecodeMessage(reinterpret_cast<const char *>(buffer), nread, cmd);
103    SendMessage(g_loopServer_, task, reinterpret_cast<const char *>(buffer));
104}
105
106static void TestClientOnReceiveRequest(const TaskHandle task, const uint8_t *buffer, uint32_t nread)
107{
108    printf("Client receive message %s \n", reinterpret_cast<const char *>(buffer));
109    EXPECT_NE(buffer, nullptr);
110    if (buffer == nullptr) {
111        return;
112    }
113    uint32_t cmd = 0;
114    DecodeMessage(reinterpret_cast<const char *>(buffer), nread, cmd);
115    if (cmd == 5 || cmd == 2) { // 2 5 close server
116        LE_StopLoop(g_loopClient_);
117    }
118}
119
120static void ProcessAsyncEvent(const TaskHandle taskHandle, uint64_t eventId, const uint8_t *buffer, uint32_t buffLen)
121{
122    UNUSED(taskHandle);
123    UNUSED(eventId);
124    UNUSED(buffer);
125    UNUSED(buffLen);
126}
127
128static void TestSendMessageComplete(const TaskHandle taskHandle, BufferHandle handle)
129{
130    printf("SendMessage result %d \n", LE_GetSendResult(handle));
131    uint32_t bufferSize = 1024; // 1024 buffer size
132    char *buffer = (char *)LE_GetBufferInfo(handle, nullptr, &bufferSize);
133    uint32_t cmd = 0;
134    DecodeMessage(reinterpret_cast<const char *>(buffer), bufferSize, cmd);
135    if (cmd == 5) { // 5 close server
136        LE_StopLoop(g_loopServer_);
137    }
138}
139
140static int TestTcpIncomingConnect(LoopHandle loop, TaskHandle server)
141{
142    PARAM_CHECK(server != nullptr, return -1, "Error server");
143    printf("Tcp connect incoming \n");
144    TaskHandle stream;
145    LE_StreamInfo info = {};
146    info.baseInfo.flags = TASK_STREAM | TASK_TCP | TASK_CONNECT;
147    info.baseInfo.close = TestOnClose;
148    info.baseInfo.userDataSize = 0;
149    info.disConnectComplete = nullptr;
150    info.sendMessageComplete = TestSendMessageComplete;
151    info.recvMessage = TestOnReceiveRequest;
152    LE_STATUS ret = LE_AcceptStreamClient(loop, server, &stream, &info);
153    EXPECT_EQ(ret, 0);
154    return 0;
155}
156
157static int TestPipIncomingConnect(LoopHandle loop, TaskHandle server)
158{
159    PARAM_CHECK(server != nullptr, return -1, "Error server");
160    printf("Pipe connect incoming \n");
161    TaskHandle stream;
162    LE_StreamInfo info = {};
163    info.baseInfo.flags = TASK_STREAM | TASK_PIPE | TASK_CONNECT;
164    info.baseInfo.close = TestOnClose;
165    info.baseInfo.userDataSize = 0;
166    info.disConnectComplete = nullptr;
167    info.sendMessageComplete = TestSendMessageComplete;
168    info.recvMessage = TestOnReceiveRequest;
169    LE_STATUS ret = LE_AcceptStreamClient(loop, server, &stream, &info);
170    EXPECT_EQ(ret, 0);
171    return 0;
172}
173
174static void TestConnectComplete(const TaskHandle client)
175{
176    printf("Connect complete \n");
177}
178
179static void TestDisConnectComplete(const TaskHandle client)
180{
181    printf("DisConnect complete \n");
182    LE_StopLoop(g_loopClient_);
183}
184
185static void TestProcessTimer(const TimerHandle taskHandle, void *context)
186{
187    g_timeCount++;
188    printf("ProcessTimer %d\n", g_timeCount);
189    if (g_maxCount == 2) { // 2 stop
190        if (g_timeCount >= g_maxCount) {
191            LE_StopLoop(g_loopClient_);
192        }
193    }
194    if (g_maxCount == 3) { // 3 stop timer
195        if (g_timeCount >= g_maxCount) {
196            LE_StopTimer(g_loopClient_, taskHandle);
197            LE_StopLoop(g_loopClient_);
198        }
199    }
200    if (g_maxCount == 10) { // 10 write watcher file
201        FILE *tmpFile = fopen(WATCHER_FILE.c_str(), "wr");
202        if (tmpFile != nullptr) {
203            fprintf(tmpFile, "%s", "test watcher file 22222222222");
204            (void)fflush(tmpFile);
205            fclose(tmpFile);
206        }
207        LE_StopTimer(g_loopClient_, taskHandle);
208        LE_StopLoop(g_loopClient_);
209    }
210}
211
212static void ProcessWatchEventTest(WatcherHandle taskHandle, int fd, uint32_t *events, const void *context)
213{
214    UNUSED(taskHandle);
215    UNUSED(fd);
216    UNUSED(events);
217    UNUSED(context);
218    printf("Process watcher event \n");
219    LE_StopLoop(g_loopClient_);
220}
221
222class LoopServerUnitTest : public testing::Test {
223public:
224    LoopServerUnitTest() {};
225    virtual ~LoopServerUnitTest() {};
226    static void SetUpTestCase(void) {};
227    static void TearDownTestCase(void) {};
228    void SetUp() {};
229    void TearDown() {};
230    void TestBody(void) {};
231
232    // for thread to create tcp\pipe server
233    void RunServer(void)
234    {
235        TaskHandle tcpServer = nullptr;
236        TaskHandle pipeServer = nullptr;
237        LE_STATUS ret = LE_CreateLoop(&g_loopServer_);
238        EXPECT_EQ(ret, 0);
239        // create server for tcp
240        LE_StreamServerInfo info = {};
241        info.baseInfo.flags = TASK_STREAM | TASK_TCP | TASK_SERVER;
242        info.socketId = -1;
243        info.server = const_cast<char *>(TCP_SERVER.c_str());
244        info.baseInfo.close = TestOnClose;
245        info.incommingConnect = TestTcpIncomingConnect;
246        ret = LE_CreateStreamServer(g_loopServer_, &tcpServer, &info);
247        EXPECT_EQ(ret, 0);
248
249        info.baseInfo.flags = TASK_STREAM | TASK_PIPE | TASK_SERVER;
250        info.socketId = -1;
251        info.server = const_cast<char *>(PIPE_SERVER.c_str());
252        info.baseInfo.close = TestOnClose;
253        info.incommingConnect = TestPipIncomingConnect;
254        ret = LE_CreateStreamServer(g_loopServer_, &pipeServer, &info);
255        EXPECT_EQ(ret, 0);
256
257        printf("Run server pipeServer_ \n");
258        // run loop for server
259        LE_RunLoop(g_loopServer_);
260
261        printf("Run server pipeServer_ \n");
262        LE_CloseStreamTask(g_loopServer_, pipeServer);
263        pipeServer = nullptr;
264        LE_CloseStreamTask(g_loopServer_, tcpServer);
265        tcpServer = nullptr;
266        LE_CloseLoop(g_loopServer_);
267        g_loopServer_ = nullptr;
268    }
269
270    void StartServer()
271    {
272        std::thread(&LoopServerUnitTest::RunServer, this).detach();
273        sleep(1);
274    }
275
276    TaskHandle CreateConnect(const char *tcpServer, uint32_t flags)
277    {
278        if (g_loopClient_ == nullptr) {
279            LE_STATUS ret = LE_CreateLoop(&g_loopClient_);
280            EXPECT_EQ(ret, 0);
281        }
282
283        TaskHandle task = nullptr;
284        LE_StreamInfo info = {};
285        info.baseInfo.flags = TASK_STREAM | flags | TASK_CONNECT;
286        info.server = const_cast<char *>(tcpServer);
287        info.baseInfo.userDataSize = 0;
288        info.baseInfo.close = TestOnClose;
289        info.disConnectComplete = TestDisConnectComplete;
290        info.connectComplete = TestConnectComplete;
291        info.sendMessageComplete = nullptr;
292        info.recvMessage = TestClientOnReceiveRequest;
293        LE_STATUS status = LE_CreateStreamClient(g_loopClient_, &task, &info);
294        EXPECT_EQ(status, 0);
295        return task;
296    }
297
298    WatcherHandle CreateWatcherTask(int fd, const char *fileName)
299    {
300        if (g_loopClient_ == nullptr) {
301            LE_STATUS ret = LE_CreateLoop(&g_loopClient_);
302            EXPECT_EQ(ret, 0);
303        }
304        WatcherHandle handle = nullptr;
305        LE_WatchInfo info = {};
306        info.fd = fd;
307        info.flags = WATCHER_ONCE;
308        info.events = EVENT_READ | EVENT_WRITE;
309        info.processEvent = ProcessWatchEventTest;
310        LE_STATUS status = LE_StartWatcher(g_loopClient_, &handle, &info, nullptr);
311        EXPECT_EQ(status, 0);
312        return handle;
313    }
314
315    TimerHandle CreateTimerTask(int repeat)
316    {
317        if (g_loopClient_ == nullptr) {
318            LE_STATUS ret = LE_CreateLoop(&g_loopClient_);
319            EXPECT_EQ(ret, 0);
320        }
321        TimerHandle timer = nullptr;
322        int ret = LE_CreateTimer(g_loopClient_, &timer, TestProcessTimer, nullptr);
323        EXPECT_EQ(ret, 0);
324        ret = LE_StartTimer(g_loopClient_, timer, 500, repeat); // 500 ms
325        EXPECT_EQ(ret, 0);
326        return timer;
327    }
328private:
329    std::thread *serverThread_ = nullptr;
330};
331
332HWTEST_F(LoopServerUnitTest, Init_TestRunServer_001, TestSize.Level1)
333{
334    LoopServerUnitTest test;
335    test.StartServer();
336}
337
338HWTEST_F(LoopServerUnitTest, Init_TestPipConnect_001, TestSize.Level1)
339{
340    g_cmd = 2; // 2 only close client
341    LoopServerUnitTest test;
342    TaskHandle pipe = test.CreateConnect(PIPE_SERVER.c_str(), TASK_PIPE);
343    EXPECT_NE(pipe, nullptr);
344    SendMessage(g_loopClient_, pipe, FORMAT_STR.c_str(), g_cmd, "connect success");
345    LE_RunLoop(g_loopClient_);
346    LE_CloseStreamTask(g_loopClient_, pipe);
347    LE_CloseLoop(g_loopClient_);
348    g_loopClient_ = nullptr;
349}
350
351HWTEST_F(LoopServerUnitTest, Init_TestTcpConnect_001, TestSize.Level1)
352{
353    g_cmd = 2; // 2 only close client
354    LoopServerUnitTest test;
355    TaskHandle tcp = test.CreateConnect(TCP_SERVER.c_str(), TASK_TCP);
356    EXPECT_NE(tcp, nullptr);
357    SendMessage(g_loopClient_, tcp, FORMAT_STR.c_str(), g_cmd, "connect success");
358    LE_RunLoop(g_loopClient_);
359    LE_CloseStreamTask(g_loopClient_, tcp);
360    LE_CloseLoop(g_loopClient_);
361    g_loopClient_ = nullptr;
362}
363
364HWTEST_F(LoopServerUnitTest, Init_TestTimer_001, TestSize.Level1)
365{
366    LoopServerUnitTest test;
367    g_maxCount = 2; // 2 stop
368    TimerHandle timer = test.CreateTimerTask(2);
369    EXPECT_NE(timer, nullptr);
370    LE_RunLoop(g_loopClient_);
371    LE_CloseLoop(g_loopClient_);
372    g_loopClient_ = nullptr;
373}
374
375HWTEST_F(LoopServerUnitTest, Init_TestTimer_002, TestSize.Level1)
376{
377    LoopServerUnitTest test;
378    g_maxCount = 3; // 3 stop timer
379    TimerHandle timer = test.CreateTimerTask(3);
380    EXPECT_NE(timer, nullptr);
381    LE_RunLoop(g_loopClient_);
382    LE_CloseLoop(g_loopClient_);
383    g_loopClient_ = nullptr;
384}
385
386HWTEST_F(LoopServerUnitTest, Init_TestWatcher_001, TestSize.Level1)
387{
388    int fd = open(WATCHER_FILE.c_str(), O_RDWR | O_CREAT | O_CLOEXEC, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
389    if (fd >= 0) {
390        write(fd, WATCHER_FILE.c_str(), WATCHER_FILE.size());
391    }
392    EXPECT_GE(fd, 0);
393    printf("Watcher fd %d \n", fd);
394    LoopServerUnitTest test;
395    WatcherHandle watcher = test.CreateWatcherTask(3, WATCHER_FILE.c_str());
396    EXPECT_NE(watcher, nullptr);
397    g_maxCount = 10; // 10 write watcher file
398    TimerHandle timer = test.CreateTimerTask(1);
399    EXPECT_NE(timer, nullptr);
400
401    LE_RunLoop(g_loopClient_);
402    LE_RemoveWatcher(g_loopClient_, watcher);
403    close(fd);
404    LE_CloseLoop(g_loopClient_);
405    g_loopClient_ = nullptr;
406}
407
408HWTEST_F(LoopServerUnitTest, Init_TestStopServer_001, TestSize.Level1)
409{
410    g_cmd = 5; // 5 close server
411    LoopServerUnitTest test;
412    TaskHandle pip = test.CreateConnect(PIPE_SERVER.c_str(), TASK_PIPE);
413    EXPECT_NE(pip, nullptr);
414    SendMessage(g_loopClient_, pip, FORMAT_STR.c_str(), g_cmd, "connect success");
415    LE_RunLoop(g_loopClient_);
416    LE_CloseStreamTask(g_loopClient_, pip);
417    LE_CloseLoop(g_loopClient_);
418    g_loopClient_ = nullptr;
419}
420
421HWTEST_F(LoopServerUnitTest, Init_TestServerTimeout_001, TestSize.Level1)
422{
423    int flag = TASK_STREAM | TASK_PIPE | TASK_SERVER | TASK_TEST;
424    int serverSock = CreateSocket(flag, "/data/test1pipe");
425    EXPECT_NE(serverSock, -1);
426    int ret = AcceptSocket(serverSock, flag);
427    EXPECT_EQ(ret, -1);
428}
429}  // namespace init_ut
430