1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "dsoftbus_adapter_impl.h"
17
18 #ifdef ENABLE_PERFORMANCE_CHECK
19 #include <chrono>
20 #endif // ENABLE_PERFORMANCE_CHECK
21
22 #include <netinet/in.h>
23 #include <netinet/tcp.h>
24 #ifdef MSDP_HIVIEWDFX_HISYSEVENT_ENABLE
25 #include "cooperate_hisysevent.h"
26 #endif // MSDP_HIVIEWDFX_HISYSEVENT_ENABLE
27 #include "device_manager.h"
28 #include "dfs_session.h"
29 #include "securec.h"
30 #include "softbus_bus_center.h"
31 #include "softbus_error_code.h"
32
33 #include "devicestatus_define.h"
34 #include "i_ddm_adapter.h"
35 #include "utility.h"
36
37 #undef LOG_TAG
38 #define LOG_TAG "DSoftbusAdapterImpl"
39
40 namespace OHOS {
41 namespace Msdp {
42 namespace DeviceStatus {
namespace {
// Session name of the local server socket; client sockets use it as their peer name.
#define SERVER_SESSION_NAME "ohos.msdp.device_status.intention.serversession"
// Shorthand for the DeviceManager singleton.
#define D_DEV_MGR DistributedHardware::DeviceManager::GetInstance()
// Prefix of client session names; a slice of the peer network id is appended to it.
const std::string CLIENT_SESSION_NAME = "ohos.msdp.device_status.intention.clientsession.";
// Number of leading network-id characters appended to CLIENT_SESSION_NAME.
constexpr size_t BIND_STRING_LENGTH = 15;
// Capacity of the buffers that hold session names.
constexpr size_t DEVICE_NAME_SIZE_MAX = 256;
// Capacity of the buffers that hold the package name and the peer network id.
constexpr size_t PKG_NAME_SIZE_MAX = 65;
// Value requested for QOS_TYPE_MIN_BW.
constexpr int32_t MIN_BW = 80 * 1024 * 1024;
// Value requested for both QOS_TYPE_MAX_LATENCY and QOS_TYPE_MIN_LATENCY.
constexpr int32_t LATENCY = 3000;
// Socket roles understood by InitSocket(): server sockets listen, client sockets bind.
constexpr int32_t SOCKET_SERVER = 0;
constexpr int32_t SOCKET_CLIENT = 1;
// Marker for a session that has no usable socket.
constexpr int32_t INVALID_SOCKET = -1;
}
56
// Held by GetInstance() while creating the singleton and by DestroyInstance() while releasing it.
std::mutex DSoftbusAdapterImpl::mutex_;
// The shared adapter instance; empty until GetInstance() creates it and after DestroyInstance().
std::shared_ptr<DSoftbusAdapterImpl> DSoftbusAdapterImpl::instance_;
59
GetInstance()60 std::shared_ptr<DSoftbusAdapterImpl> DSoftbusAdapterImpl::GetInstance()
61 {
62 if (instance_ == nullptr) {
63 std::lock_guard<std::mutex> lock(mutex_);
64 if (instance_ == nullptr) {
65 instance_ = std::make_shared<DSoftbusAdapterImpl>();
66 }
67 }
68 return instance_;
69 }
70
DestroyInstance()71 void DSoftbusAdapterImpl::DestroyInstance()
72 {
73 std::lock_guard<std::mutex> lock(mutex_);
74 instance_.reset();
75 }
76
~DSoftbusAdapterImpl()77 DSoftbusAdapterImpl::~DSoftbusAdapterImpl()
78 {
79 Disable();
80 }
81
Enable()82 int32_t DSoftbusAdapterImpl::Enable()
83 {
84 CALL_DEBUG_ENTER;
85 std::unique_lock<std::shared_mutex> lock(lock_);
86 return SetupServer();
87 }
88
Disable()89 void DSoftbusAdapterImpl::Disable()
90 {
91 CALL_DEBUG_ENTER;
92 std::unique_lock<std::shared_mutex> lock(lock_);
93 ShutdownServer();
94 }
95
AddObserver(std::shared_ptr<IDSoftbusObserver> observer)96 void DSoftbusAdapterImpl::AddObserver(std::shared_ptr<IDSoftbusObserver> observer)
97 {
98 CALL_DEBUG_ENTER;
99 std::unique_lock<std::shared_mutex> lock(lock_);
100 CHKPV(observer);
101 observers_.erase(Observer());
102 observers_.emplace(observer);
103 }
104
RemoveObserver(std::shared_ptr<IDSoftbusObserver> observer)105 void DSoftbusAdapterImpl::RemoveObserver(std::shared_ptr<IDSoftbusObserver> observer)
106 {
107 CALL_DEBUG_ENTER;
108 std::unique_lock<std::shared_mutex> lock(lock_);
109 if (auto iter = observers_.find(Observer(observer)); iter != observers_.end()) {
110 observers_.erase(iter);
111 }
112 observers_.erase(Observer());
113 }
114
CheckDeviceOnline(const std::string &networkId)115 bool DSoftbusAdapterImpl::CheckDeviceOnline(const std::string &networkId)
116 {
117 CALL_DEBUG_ENTER;
118 std::vector<DistributedHardware::DmDeviceInfo> deviceList;
119 if (D_DEV_MGR.GetTrustedDeviceList(FI_PKG_NAME, "", deviceList) != RET_OK) {
120 FI_HILOGE("GetTrustedDeviceList failed");
121 return false;
122 }
123 if (deviceList.empty()) {
124 FI_HILOGE("Trust device list size is invalid");
125 return false;
126 }
127 for (const auto &deviceInfo : deviceList) {
128 if (std::string(deviceInfo.networkId) == networkId) {
129 return true;
130 }
131 }
132 return false;
133 }
134
OpenSession(const std::string &networkId)135 int32_t DSoftbusAdapterImpl::OpenSession(const std::string &networkId)
136 {
137 CALL_DEBUG_ENTER;
138 std::unique_lock<std::shared_mutex> lock(lock_);
139 #ifdef ENABLE_PERFORMANCE_CHECK
140 auto startStamp = std::chrono::steady_clock::now();
141 #endif // ENABLE_PERFORMANCE_CHECK
142 if (!DSoftbusAdapterImpl::CheckDeviceOnline(networkId)) {
143 FI_HILOGE("CheckDeviceOnline failed, networkId:%{public}s", Utility::Anonymize(networkId).c_str());
144 return RET_ERR;
145 }
146 int32_t ret = OpenSessionLocked(networkId);
147 #ifdef ENABLE_PERFORMANCE_CHECK
148 auto openSessionDuration = std::chrono::duration_cast<std::chrono::milliseconds>(
149 std::chrono::steady_clock::now() - startStamp).count();
150 FI_HILOGI("[PERF] OpenSessionLocked ret:%{public}d, elapsed: %{public}lld ms", ret, openSessionDuration);
151 #endif // ENABLE_PERFORMANCE_CHECK
152 if (ret != RET_OK) {
153 #ifdef MSDP_HIVIEWDFX_HISYSEVENT_ENABLE
154 CooperateDFX::WriteOpenSession(OHOS::HiviewDFX::HiSysEvent::EventType::FAULT);
155 #endif // MSDP_HIVIEWDFX_HISYSEVENT_ENABLE
156 } else {
157 #ifdef MSDP_HIVIEWDFX_HISYSEVENT_ENABLE
158 CooperateDFX::WriteOpenSession(OHOS::HiviewDFX::HiSysEvent::EventType::BEHAVIOR);
159 #endif // MSDP_HIVIEWDFX_HISYSEVENT_ENABLE
160 }
161 return ret;
162 }
163
CloseSession(const std::string &networkId)164 void DSoftbusAdapterImpl::CloseSession(const std::string &networkId)
165 {
166 CALL_INFO_TRACE;
167 std::unique_lock<std::shared_mutex> lock(lock_);
168 if (auto iter = sessions_.find(networkId); iter != sessions_.end()) {
169 ::Shutdown(iter->second.socket_);
170 sessions_.erase(iter);
171 FI_HILOGI("Shutdown session(%{public}d, %{public}s)", iter->second.socket_,
172 Utility::Anonymize(networkId).c_str());
173 }
174 }
175
CloseAllSessions()176 void DSoftbusAdapterImpl::CloseAllSessions()
177 {
178 CALL_INFO_TRACE;
179 std::unique_lock<std::shared_mutex> lock(lock_);
180 CloseAllSessionsLocked();
181 }
182
FindConnection(const std::string &networkId)183 int32_t DSoftbusAdapterImpl::FindConnection(const std::string &networkId)
184 {
185 CALL_DEBUG_ENTER;
186 auto iter = sessions_.find(networkId);
187 return (iter != sessions_.end() ? iter->second.socket_ : -1);
188 }
189
SendPacket(const std::string &networkId, NetPacket &packet)190 int32_t DSoftbusAdapterImpl::SendPacket(const std::string &networkId, NetPacket &packet)
191 {
192 CALL_DEBUG_ENTER;
193 std::shared_lock<std::shared_mutex> lock(lock_);
194 int32_t socket = FindConnection(networkId);
195 if (socket < 0) {
196 FI_HILOGE("Node \'%{public}s\' is not connected", Utility::Anonymize(networkId).c_str());
197 return RET_ERR;
198 }
199 StreamBuffer buffer;
200 if (!packet.MakeData(buffer)) {
201 FI_HILOGE("Failed to buffer packet");
202 return RET_ERR;
203 }
204 if (buffer.Size() > MAX_PACKET_BUF_SIZE) {
205 FI_HILOGE("Packet is too large");
206 return RET_ERR;
207 }
208 int32_t ret = ::SendBytes(socket, buffer.Data(), buffer.Size());
209 if (ret != SOFTBUS_OK) {
210 FI_HILOGE("DSOFTBUS::SendBytes fail (%{public}d)", ret);
211 return RET_ERR;
212 }
213 return RET_OK;
214 }
215
SendParcel(const std::string &networkId, Parcel &parcel)216 int32_t DSoftbusAdapterImpl::SendParcel(const std::string &networkId, Parcel &parcel)
217 {
218 CALL_DEBUG_ENTER;
219 std::shared_lock<std::shared_mutex> lock(lock_);
220 int32_t socket = FindConnection(networkId);
221 if (socket < 0) {
222 FI_HILOGE("Node \'%{public}s\' is not connected", Utility::Anonymize(networkId).c_str());
223 return RET_ERR;
224 }
225 int32_t ret = ::SendBytes(socket, reinterpret_cast<const void*>(parcel.GetData()), parcel.GetDataSize());
226 if (ret != SOFTBUS_OK) {
227 FI_HILOGE("DSOFTBUS::SendBytes fail, error:%{public}d", ret);
228 return RET_ERR;
229 }
230 return RET_OK;
231 }
232
BroadcastPacket(NetPacket &packet)233 int32_t DSoftbusAdapterImpl::BroadcastPacket(NetPacket &packet)
234 {
235 CALL_INFO_TRACE;
236 std::shared_lock<std::shared_mutex> lock(lock_);
237 if (sessions_.empty()) {
238 FI_HILOGE("No session connected");
239 return RET_ERR;
240 }
241 StreamBuffer buffer;
242 if (!packet.MakeData(buffer)) {
243 FI_HILOGE("Failed to buffer packet");
244 return RET_ERR;
245 }
246 if (buffer.Size() > MAX_PACKET_BUF_SIZE) {
247 FI_HILOGE("Packet is too large");
248 return RET_ERR;
249 }
250 for (const auto &elem : sessions_) {
251 int32_t socket = elem.second.socket_;
252 if (socket < 0) {
253 FI_HILOGE("Node \'%{public}s\' is not connected", Utility::Anonymize(elem.first).c_str());
254 continue;
255 }
256 if (int32_t ret = ::SendBytes(socket, buffer.Data(), buffer.Size()); ret != SOFTBUS_OK) {
257 FI_HILOGE("DSOFTBUS::SendBytes fail (%{public}d)", ret);
258 continue;
259 }
260 FI_HILOGI("BroadcastPacket to networkId:%{public}s success", Utility::Anonymize(elem.first).c_str());
261 }
262 return RET_OK;
263 }
264
HasSessionExisted(const std::string &networkId)265 bool DSoftbusAdapterImpl::HasSessionExisted(const std::string &networkId)
266 {
267 CALL_DEBUG_ENTER;
268 auto iter = sessions_.find(networkId);
269 return (iter != sessions_.end() && iter->second.socket_ != INVALID_SOCKET);
270 }
271
OnBindLink(int32_t socket, PeerSocketInfo info)272 static void OnBindLink(int32_t socket, PeerSocketInfo info)
273 {
274 DSoftbusAdapterImpl::GetInstance()->OnBind(socket, info);
275 }
276
OnShutdownLink(int32_t socket, ShutdownReason reason)277 static void OnShutdownLink(int32_t socket, ShutdownReason reason)
278 {
279 DSoftbusAdapterImpl::GetInstance()->OnShutdown(socket, reason);
280 }
281
OnBytesAvailable(int32_t socket, const void *data, uint32_t dataLen)282 static void OnBytesAvailable(int32_t socket, const void *data, uint32_t dataLen)
283 {
284 DSoftbusAdapterImpl::GetInstance()->OnBytes(socket, data, dataLen);
285 }
286
OnBind(int32_t socket, PeerSocketInfo info)287 void DSoftbusAdapterImpl::OnBind(int32_t socket, PeerSocketInfo info)
288 {
289 CALL_INFO_TRACE;
290 std::unique_lock<std::shared_mutex> lock(lock_);
291 std::string networkId = info.networkId;
292 FI_HILOGI("Bind session(%{public}d, %{public}s)", socket, Utility::Anonymize(networkId).c_str());
293 if (auto iter = sessions_.find(networkId); iter != sessions_.cend()) {
294 if (iter->second.socket_ == socket) {
295 FI_HILOGI("(%{public}d, %{public}s) has bound", iter->second.socket_,
296 Utility::Anonymize(networkId).c_str());
297 return;
298 }
299 FI_HILOGI("(%{public}d, %{public}s) need erase", iter->second.socket_, Utility::Anonymize(networkId).c_str());
300 sessions_.erase(iter);
301 }
302 ConfigTcpAlive(socket);
303 sessions_.emplace(networkId, Session(socket));
304
305 for (const auto &item : observers_) {
306 std::shared_ptr<IDSoftbusObserver> observer = item.Lock();
307 if (observer != nullptr) {
308 FI_HILOGD("Notify binding (%{public}d, %{public}s)", socket, Utility::Anonymize(networkId).c_str());
309 observer->OnBind(networkId);
310 }
311 }
312 }
313
OnShutdown(int32_t socket, ShutdownReason reason)314 void DSoftbusAdapterImpl::OnShutdown(int32_t socket, ShutdownReason reason)
315 {
316 CALL_INFO_TRACE;
317 std::unique_lock<std::shared_mutex> lock(lock_);
318 auto iter = std::find_if(sessions_.cbegin(), sessions_.cend(),
319 [socket](const auto &item) {
320 return (item.second.socket_ == socket);
321 });
322 if (iter == sessions_.cend()) {
323 FI_HILOGD("Session(%{public}d) is not bound", socket);
324 return;
325 }
326 std::string networkId = iter->first;
327 sessions_.erase(iter);
328 FI_HILOGI("Shutdown session(%{public}d, %{public}s)", socket, Utility::Anonymize(networkId).c_str());
329
330 for (const auto &item : observers_) {
331 std::shared_ptr<IDSoftbusObserver> observer = item.Lock();
332 if (observer != nullptr) {
333 FI_HILOGD("Notify shutdown of session(%{public}d, %{public}s)",
334 socket, Utility::Anonymize(networkId).c_str());
335 observer->OnShutdown(networkId);
336 }
337 }
338 }
339
OnBytes(int32_t socket, const void *data, uint32_t dataLen)340 void DSoftbusAdapterImpl::OnBytes(int32_t socket, const void *data, uint32_t dataLen)
341 {
342 CALL_DEBUG_ENTER;
343 std::shared_lock<std::shared_mutex> lock(lock_);
344 auto iter = std::find_if(sessions_.begin(), sessions_.end(),
345 [socket](const auto &item) {
346 return (item.second.socket_ == socket);
347 });
348 if (iter == sessions_.end()) {
349 FI_HILOGE("Invalid socket: %{public}d", socket);
350 return;
351 }
352 const std::string networkId = iter->first;
353
354 if (*reinterpret_cast<const uint32_t*>(data) < static_cast<uint32_t>(MessageId::MAX_MESSAGE_ID)) {
355 CircleStreamBuffer &circleBuffer = iter->second.buffer_;
356
357 if (!circleBuffer.Write(reinterpret_cast<const char*>(data), dataLen)) {
358 FI_HILOGE("Failed to write buffer");
359 }
360 HandleSessionData(networkId, circleBuffer);
361 } else {
362 HandleRawData(networkId, data, dataLen);
363 }
364 }
365
InitSocket(SocketInfo info, int32_t socketType, int32_t &socket)366 int32_t DSoftbusAdapterImpl::InitSocket(SocketInfo info, int32_t socketType, int32_t &socket)
367 {
368 CALL_INFO_TRACE;
369 socket = ::Socket(info);
370 if (socket < 0) {
371 FI_HILOGE("DSOFTBUS::Socket failed");
372 return RET_ERR;
373 }
374 QosTV socketQos[] {
375 { .qos = QOS_TYPE_MIN_BW, .value = MIN_BW },
376 { .qos = QOS_TYPE_MAX_LATENCY, .value = LATENCY },
377 { .qos = QOS_TYPE_MIN_LATENCY, .value = LATENCY },
378 };
379 ISocketListener listener {
380 .OnBind = OnBindLink,
381 .OnShutdown = OnShutdownLink,
382 .OnBytes = OnBytesAvailable,
383 };
384 int32_t ret { -1 };
385
386 if (socketType == SOCKET_SERVER) {
387 ret = ::Listen(socket, socketQos, sizeof(socketQos) / sizeof(socketQos[0]), &listener);
388 if (ret != 0) {
389 FI_HILOGE("DSOFTBUS::Listen failed");
390 }
391 } else if (socketType == SOCKET_CLIENT) {
392 ret = ::Bind(socket, socketQos, sizeof(socketQos) / sizeof(socketQos[0]), &listener);
393 if (ret != 0) {
394 FI_HILOGE("DSOFTBUS::Bind failed");
395 }
396 }
397 if (ret != 0) {
398 ::Shutdown(socket);
399 socket = -1;
400 return ret;
401 }
402 return RET_OK;
403 }
404
SetupServer()405 int32_t DSoftbusAdapterImpl::SetupServer()
406 {
407 CALL_INFO_TRACE;
408 if (socketFd_ > 0) {
409 return RET_OK;
410 }
411 char name[DEVICE_NAME_SIZE_MAX] { SERVER_SESSION_NAME };
412 char pkgName[PKG_NAME_SIZE_MAX] { FI_PKG_NAME };
413 FI_HILOGI("Server session name: \'%{public}s\'", name);
414 FI_HILOGI("Package name: \'%{public}s\'", pkgName);
415 SocketInfo info {
416 .name = name,
417 .pkgName = pkgName,
418 .dataType = DATA_TYPE_BYTES
419 };
420 int32_t ret = InitSocket(info, SOCKET_SERVER, socketFd_);
421 if (ret != RET_OK) {
422 FI_HILOGE("Failed to setup server");
423 return ret;
424 }
425 return RET_OK;
426 }
427
ShutdownServer()428 void DSoftbusAdapterImpl::ShutdownServer()
429 {
430 CALL_INFO_TRACE;
431 CloseAllSessionsLocked();
432 if (socketFd_ > 0) {
433 ::Shutdown(socketFd_);
434 socketFd_ = -1;
435 }
436 }
437
OpenSessionLocked(const std::string &networkId)438 int32_t DSoftbusAdapterImpl::OpenSessionLocked(const std::string &networkId)
439 {
440 CALL_INFO_TRACE;
441 if (sessions_.find(networkId) != sessions_.end()) {
442 FI_HILOGD("InputSoftbus session has already opened");
443 return RET_OK;
444 }
445 std::string sessionName = CLIENT_SESSION_NAME + networkId.substr(0, BIND_STRING_LENGTH);
446 char name[DEVICE_NAME_SIZE_MAX] {};
447 if (strcpy_s(name, sizeof(name), sessionName.c_str()) != EOK) {
448 FI_HILOGE("Invalid name:%{public}s", sessionName.c_str());
449 return RET_ERR;
450 }
451 char peerName[DEVICE_NAME_SIZE_MAX] { SERVER_SESSION_NAME };
452 char peerNetworkId[PKG_NAME_SIZE_MAX] {};
453 if (strcpy_s(peerNetworkId, sizeof(peerNetworkId), networkId.c_str()) != EOK) {
454 FI_HILOGE("Invalid peerNetworkId:%{public}s", Utility::Anonymize(networkId).c_str());
455 return RET_ERR;
456 }
457 char pkgName[PKG_NAME_SIZE_MAX] { FI_PKG_NAME };
458 FI_HILOGI("Client session name: \'%{public}s\'", name);
459 FI_HILOGI("Peer name: \'%{public}s\'", peerName);
460 FI_HILOGI("Peer network id: \'%{public}s\'", Utility::Anonymize(peerNetworkId).c_str());
461 FI_HILOGI("Package name: \'%{public}s\'", pkgName);
462 SocketInfo info {
463 .name = name,
464 .peerName = peerName,
465 .peerNetworkId = peerNetworkId,
466 .pkgName = pkgName,
467 .dataType = DATA_TYPE_BYTES
468 };
469 int32_t socket { -1 };
470
471 int32_t ret = InitSocket(info, SOCKET_CLIENT, socket);
472 if (ret != RET_OK) {
473 FI_HILOGE("Failed to bind %{public}s", Utility::Anonymize(networkId).c_str());
474 return ret;
475 }
476 ConfigTcpAlive(socket);
477 FI_HILOGI("Connected to (%{public}s,%{public}d)", Utility::Anonymize(networkId).c_str(), socket);
478 sessions_.emplace(networkId, Session(socket));
479 OnConnectedLocked(networkId);
480 return RET_OK;
481 }
482
OnConnectedLocked(const std::string &networkId)483 void DSoftbusAdapterImpl::OnConnectedLocked(const std::string &networkId)
484 {
485 CALL_INFO_TRACE;
486 for (const auto &item : observers_) {
487 std::shared_ptr<IDSoftbusObserver> observer = item.Lock();
488 CHKPC(observer);
489 FI_HILOGI("Notify connected to networkId:%{public}s", Utility::Anonymize(networkId).c_str());
490 observer->OnConnected(networkId);
491 }
492 }
493
CloseAllSessionsLocked()494 void DSoftbusAdapterImpl::CloseAllSessionsLocked()
495 {
496 std::for_each(sessions_.begin(), sessions_.end(), [](const auto &item) {
497 ::Shutdown(item.second.socket_);
498 FI_HILOGI("Shutdown connection with (%{public}s,%{public}d)",
499 Utility::Anonymize(item.first).c_str(), item.second.socket_);
500 });
501 sessions_.clear();
502 }
503
ConfigTcpAlive(int32_t socket)504 void DSoftbusAdapterImpl::ConfigTcpAlive(int32_t socket)
505 {
506 CALL_DEBUG_ENTER;
507 if (socket < 0) {
508 FI_HILOGW("Config tcp alive, invalid sessionId");
509 return;
510 }
511 int32_t handle { -1 };
512 int32_t result = GetSessionHandle(socket, &handle);
513 if (result != RET_OK) {
514 FI_HILOGE("Failed to get the session handle, socketId:%{public}d, handle:%{public}d", socket, handle);
515 return;
516 }
517 int32_t keepAliveTimeout { 10 };
518 result = setsockopt(handle, IPPROTO_TCP, TCP_KEEPIDLE, &keepAliveTimeout, sizeof(keepAliveTimeout));
519 if (result != RET_OK) {
520 FI_HILOGE("Config tcp alive, setsockopt set idle failed, result:%{public}d", result);
521 return;
522 }
523 int32_t keepAliveCount { 5 };
524 result = setsockopt(handle, IPPROTO_TCP, TCP_KEEPCNT, &keepAliveCount, sizeof(keepAliveCount));
525 if (result != RET_OK) {
526 FI_HILOGE("Config tcp alive, setsockopt set cnt failed");
527 return;
528 }
529 int32_t interval { 1 };
530 result = setsockopt(handle, IPPROTO_TCP, TCP_KEEPINTVL, &interval, sizeof(interval));
531 if (result != RET_OK) {
532 FI_HILOGE("Config tcp alive, setsockopt set intvl failed");
533 return;
534 }
535 int32_t enable { 1 };
536 result = setsockopt(handle, SOL_SOCKET, SO_KEEPALIVE, &enable, sizeof(enable));
537 if (result != RET_OK) {
538 FI_HILOGE("Config tcp alive, setsockopt enable alive failed");
539 return;
540 }
541 int32_t TimeoutMs { 15000 };
542 result = setsockopt(handle, IPPROTO_TCP, TCP_USER_TIMEOUT, &TimeoutMs, sizeof(TimeoutMs));
543 if (result != RET_OK) {
544 FI_HILOGE("Failed to enable setsockopt for timeout, %{public}d", result);
545 return;
546 }
547 }
548
HandleSessionData(const std::string &networkId, CircleStreamBuffer &circleBuffer)549 void DSoftbusAdapterImpl::HandleSessionData(const std::string &networkId, CircleStreamBuffer &circleBuffer)
550 {
551 CALL_DEBUG_ENTER;
552 while (circleBuffer.ResidualSize() >= static_cast<int32_t>(sizeof(PackHead))) {
553 const char *buf = circleBuffer.ReadBuf();
554 const PackHead *head = reinterpret_cast<const PackHead *>(buf);
555
556 if ((head->size < 0) || (static_cast<size_t>(head->size) > MAX_PACKET_BUF_SIZE)) {
557 FI_HILOGE("Corrupted net packet");
558 break;
559 }
560 if ((head->size + static_cast<int32_t>(sizeof(PackHead))) > circleBuffer.ResidualSize()) {
561 FI_HILOGI("Incomplete package, package size:%{public}d, residual size:%{public}d",
562 (head->size + static_cast<int32_t>(sizeof(PackHead))), circleBuffer.ResidualSize());
563 break;
564 }
565 NetPacket packet(head->idMsg);
566
567 if ((head->size > 0) && !packet.Write(&buf[sizeof(PackHead)], head->size)) {
568 FI_HILOGE("Failed to fill packet, PacketSize:%{public}d", head->size);
569 break;
570 }
571 circleBuffer.SeekReadPos(packet.GetPacketLength());
572 HandlePacket(networkId, packet);
573 }
574 }
575
HandlePacket(const std::string &networkId, NetPacket &packet)576 void DSoftbusAdapterImpl::HandlePacket(const std::string &networkId, NetPacket &packet)
577 {
578 CALL_DEBUG_ENTER;
579 for (const auto &item : observers_) {
580 std::shared_ptr<IDSoftbusObserver> observer = item.Lock();
581 if ((observer != nullptr) &&
582 observer->OnPacket(networkId, packet)) {
583 return;
584 }
585 }
586 }
587
HandleRawData(const std::string &networkId, const void *data, uint32_t dataLen)588 void DSoftbusAdapterImpl::HandleRawData(const std::string &networkId, const void *data, uint32_t dataLen)
589 {
590 CALL_DEBUG_ENTER;
591 for (const auto &item : observers_) {
592 std::shared_ptr<IDSoftbusObserver> observer = item.Lock();
593 if ((observer != nullptr) &&
594 observer->OnRawData(networkId, data, dataLen)) {
595 return;
596 }
597 }
598 }
599 } // namespace DeviceStatus
600 } // namespace Msdp
601 } // namespace OHOS
602