1 /*
2 * Copyright (c) 2023 Shenzhen Kaihong Digital Industry Development Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "tcp_client.h"
17 #include "common/common_macro.h"
18 #include "common/media_log.h"
19 #include "common/sharing_log.h"
20 #include "network/socket/socket_utils.h"
21 #include "network/socket/tcp_socket.h"
22 #include "utils/utils.h"
23 namespace OHOS {
24 namespace Sharing {
~TcpClient()25 TcpClient::~TcpClient()
26 {
27 SHARING_LOGD("trace.");
28 Disconnect();
29 }
30
TcpClient()31 TcpClient::TcpClient()
32 {
33 SHARING_LOGD("trace.");
34 }
35
Connect(const std::string &peerIp, uint16_t peerPort, const std::string &localIp, uint16_t localPort)36 bool TcpClient::Connect(const std::string &peerIp, uint16_t peerPort, const std::string &localIp, uint16_t localPort)
37 {
38 SHARING_LOGI("peerIp:%{public}s, peerPort:%{public}d, thread_id: %{public}llu.", GetAnonyString(peerIp).c_str(),
39 peerPort, GetThreadId());
40
41 int32_t retCode = 0;
42 socket_ = std::make_unique<TcpSocket>();
43 if (socket_) {
44 if (socket_->Connect(peerIp, peerPort, retCode, true, true, localIp, localPort)) {
45 SHARING_LOGI("connect success.");
46 auto eventRunner = OHOS::AppExecFwk::EventRunner::Create(true);
47 eventHandler_ = std::make_shared<TcpClientEventHandler>();
48 eventHandler_->SetClient(shared_from_this());
49 eventHandler_->SetEventRunner(eventRunner);
50 eventRunner->Run();
51
52 bool ret = false;
53 eventListener_ = std::make_shared<TcpClientEventListener>();
54 if (eventListener_) {
55 eventListener_->SetClient(shared_from_this());
56 ret = eventListener_->AddFdListener(socket_->GetLocalFd(), eventListener_, eventHandler_);
57 }
58
59 auto callback = callback_.lock();
60 if (callback) {
61 callback->OnClientConnect(true);
62 callback->OnClientWriteable(socket_->GetLocalFd());
63 }
64
65 return ret;
66 } else {
67 std::unique_lock<std::shared_mutex> lk(mutex_);
68 if (eventListener_) {
69 eventListener_->RemoveFdListener(socket_->GetLocalFd());
70 } else {
71 SHARING_LOGE("eventListener is nullptr.");
72 }
73 SocketUtils::ShutDownSocket(socket_->GetLocalFd());
74 SocketUtils::CloseSocket(socket_->GetLocalFd());
75 socket_.reset();
76 }
77 }
78 SHARING_LOGE("[TcpClient] Connect failed!");
79 auto callback = callback_.lock();
80 if (callback) {
81 callback->OnClientConnect(false);
82 }
83
84 return false;
85 }
86
Disconnect()87 void TcpClient::Disconnect()
88 {
89 SHARING_LOGD("trace.");
90 std::unique_lock<std::shared_mutex> lk(mutex_);
91 if (socket_ != nullptr) {
92 eventListener_->RemoveFdListener(socket_->GetLocalFd());
93 SocketUtils::ShutDownSocket(socket_->GetLocalFd());
94 SocketUtils::CloseSocket(socket_->GetLocalFd());
95 eventListener_->OnShutdown(socket_->GetLocalFd());
96 socket_.reset();
97 }
98 }
99
Send(const DataBuffer::Ptr &buf, int32_t nSize)100 bool TcpClient::Send(const DataBuffer::Ptr &buf, int32_t nSize)
101 {
102 SHARING_LOGD("trace.");
103 RETURN_FALSE_IF_NULL(buf);
104 return Send(buf->Peek(), nSize);
105 }
106
Send(const char *buf, int32_t nSize)107 bool TcpClient::Send(const char *buf, int32_t nSize)
108 {
109 SHARING_LOGD("trace.");
110 std::unique_lock<std::shared_mutex> lk(mutex_);
111 if (socket_ != nullptr) {
112 if (SocketUtils::SendSocket(socket_->GetLocalFd(), buf, nSize) != 0) {
113 return true;
114 } else {
115 lk.unlock();
116 SHARING_LOGE("send Failed, Disconnect!");
117 Disconnect();
118 return false;
119 }
120 } else {
121 return false;
122 }
123 }
124
Send(const std::string &msg)125 bool TcpClient::Send(const std::string &msg)
126 {
127 SHARING_LOGD("trace.");
128 return Send(msg.c_str(), msg.size());
129 }
130
GetSocketInfo()131 SocketInfo::Ptr TcpClient::GetSocketInfo()
132 {
133 SHARING_LOGD("trace.");
134 return socket_;
135 }
136
OnClientReadable(int32_t fd)137 void TcpClient::OnClientReadable(int32_t fd)
138 {
139 SHARING_LOGI("trace fd: %{public}d, thread_id: %{public}llu.", fd, GetThreadId());
140 int32_t error = 0;
141 int32_t retCode = 0;
142 do {
143 DataBuffer::Ptr buf = std::make_shared<DataBuffer>(DEAFULT_READ_BUFFER_SIZE);
144 retCode = SocketUtils::RecvSocket(fd, (char *)buf->Data(), DEAFULT_READ_BUFFER_SIZE, flags_, error);
145 SHARING_LOGD("recvSocket len: %{public}d.", retCode);
146 if (retCode > 0) {
147 buf->UpdateSize(retCode);
148 auto callback = callback_.lock();
149 if (callback) {
150 callback->OnClientReadData(fd, std::move(buf));
151 }
152 } else if (retCode < 0) {
153 if (error == ECONNREFUSED) {
154 auto callback = callback_.lock();
155 if (callback) {
156 callback->OnClientConnect(false);
157 }
158 }
159 } else {
160 SHARING_LOGE("recvSocket failed!");
161 Disconnect();
162 }
163 } while (retCode == (int32_t)DEAFULT_READ_BUFFER_SIZE);
164 }
165
166 } // namespace Sharing
167 } // namespace OHOS