1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "dsoftbus_adapter_impl.h"
17 
18 #ifdef ENABLE_PERFORMANCE_CHECK
19 #include <chrono>
20 #endif // ENABLE_PERFORMANCE_CHECK
21 
22 #include <netinet/in.h>
23 #include <netinet/tcp.h>
24 #ifdef MSDP_HIVIEWDFX_HISYSEVENT_ENABLE
25 #include "cooperate_hisysevent.h"
26 #endif // MSDP_HIVIEWDFX_HISYSEVENT_ENABLE
27 #include "device_manager.h"
28 #include "dfs_session.h"
29 #include "securec.h"
30 #include "softbus_bus_center.h"
31 #include "softbus_error_code.h"
32 
33 #include "devicestatus_define.h"
34 #include "i_ddm_adapter.h"
35 #include "utility.h"
36 
37 #undef LOG_TAG
38 #define LOG_TAG "DSoftbusAdapterImpl"
39 
40 namespace OHOS {
41 namespace Msdp {
42 namespace DeviceStatus {
namespace {
// Session name of the listening (server) socket; client sockets also use it as their peer name.
#define SERVER_SESSION_NAME "ohos.msdp.device_status.intention.serversession"
// Shorthand for the DeviceManager singleton.
#define D_DEV_MGR DistributedHardware::DeviceManager::GetInstance()
// Prefix of client session names; a slice of the peer network id is appended to it.
const std::string CLIENT_SESSION_NAME = "ohos.msdp.device_status.intention.clientsession.";
// Number of leading network-id characters appended to CLIENT_SESSION_NAME.
constexpr size_t BIND_STRING_LENGTH = 15;
// Capacity of the stack buffers holding session names.
constexpr size_t DEVICE_NAME_SIZE_MAX = 256;
// Capacity of the stack buffers holding the package name and the peer network id.
constexpr size_t PKG_NAME_SIZE_MAX = 65;
// Value passed for QOS_TYPE_MIN_BW.
constexpr int32_t MIN_BW = 80 * 1024 * 1024;
// Value passed for both QOS_TYPE_MAX_LATENCY and QOS_TYPE_MIN_LATENCY.
constexpr int32_t LATENCY = 3000;
// Socket roles understood by InitSocket().
constexpr int32_t SOCKET_SERVER = 0;
constexpr int32_t SOCKET_CLIENT = 1;
// Marker for "no socket".
constexpr int32_t INVALID_SOCKET = -1;
} // namespace
56 
57 std::mutex DSoftbusAdapterImpl::mutex_;
58 std::shared_ptr<DSoftbusAdapterImpl> DSoftbusAdapterImpl::instance_;
59 
GetInstance()60 std::shared_ptr<DSoftbusAdapterImpl> DSoftbusAdapterImpl::GetInstance()
61 {
62     if (instance_ == nullptr) {
63         std::lock_guard<std::mutex> lock(mutex_);
64         if (instance_ == nullptr) {
65             instance_ = std::make_shared<DSoftbusAdapterImpl>();
66         }
67     }
68     return instance_;
69 }
70 
DestroyInstance()71 void DSoftbusAdapterImpl::DestroyInstance()
72 {
73     std::lock_guard<std::mutex> lock(mutex_);
74     instance_.reset();
75 }
76 
~DSoftbusAdapterImpl()77 DSoftbusAdapterImpl::~DSoftbusAdapterImpl()
78 {
79     Disable();
80 }
81 
Enable()82 int32_t DSoftbusAdapterImpl::Enable()
83 {
84     CALL_DEBUG_ENTER;
85     std::unique_lock<std::shared_mutex> lock(lock_);
86     return SetupServer();
87 }
88 
Disable()89 void DSoftbusAdapterImpl::Disable()
90 {
91     CALL_DEBUG_ENTER;
92     std::unique_lock<std::shared_mutex> lock(lock_);
93     ShutdownServer();
94 }
95 
AddObserver(std::shared_ptr<IDSoftbusObserver> observer)96 void DSoftbusAdapterImpl::AddObserver(std::shared_ptr<IDSoftbusObserver> observer)
97 {
98     CALL_DEBUG_ENTER;
99     std::unique_lock<std::shared_mutex> lock(lock_);
100     CHKPV(observer);
101     observers_.erase(Observer());
102     observers_.emplace(observer);
103 }
104 
RemoveObserver(std::shared_ptr<IDSoftbusObserver> observer)105 void DSoftbusAdapterImpl::RemoveObserver(std::shared_ptr<IDSoftbusObserver> observer)
106 {
107     CALL_DEBUG_ENTER;
108     std::unique_lock<std::shared_mutex> lock(lock_);
109     if (auto iter = observers_.find(Observer(observer)); iter != observers_.end()) {
110         observers_.erase(iter);
111     }
112     observers_.erase(Observer());
113 }
114 
CheckDeviceOnline(const std::string &networkId)115 bool DSoftbusAdapterImpl::CheckDeviceOnline(const std::string &networkId)
116 {
117     CALL_DEBUG_ENTER;
118     std::vector<DistributedHardware::DmDeviceInfo> deviceList;
119     if (D_DEV_MGR.GetTrustedDeviceList(FI_PKG_NAME, "", deviceList) != RET_OK) {
120         FI_HILOGE("GetTrustedDeviceList failed");
121         return false;
122     }
123     if (deviceList.empty()) {
124         FI_HILOGE("Trust device list size is invalid");
125         return false;
126     }
127     for (const auto &deviceInfo : deviceList) {
128         if (std::string(deviceInfo.networkId) == networkId) {
129             return true;
130         }
131     }
132     return false;
133 }
134 
OpenSession(const std::string &networkId)135 int32_t DSoftbusAdapterImpl::OpenSession(const std::string &networkId)
136 {
137     CALL_DEBUG_ENTER;
138     std::unique_lock<std::shared_mutex> lock(lock_);
139 #ifdef ENABLE_PERFORMANCE_CHECK
140     auto startStamp = std::chrono::steady_clock::now();
141 #endif // ENABLE_PERFORMANCE_CHECK
142     if (!DSoftbusAdapterImpl::CheckDeviceOnline(networkId)) {
143         FI_HILOGE("CheckDeviceOnline failed, networkId:%{public}s", Utility::Anonymize(networkId).c_str());
144         return RET_ERR;
145     }
146     int32_t ret = OpenSessionLocked(networkId);
147 #ifdef ENABLE_PERFORMANCE_CHECK
148     auto openSessionDuration = std::chrono::duration_cast<std::chrono::milliseconds>(
149         std::chrono::steady_clock::now() - startStamp).count();
150     FI_HILOGI("[PERF] OpenSessionLocked ret:%{public}d, elapsed: %{public}lld ms", ret, openSessionDuration);
151 #endif // ENABLE_PERFORMANCE_CHECK
152     if (ret != RET_OK) {
153 #ifdef MSDP_HIVIEWDFX_HISYSEVENT_ENABLE
154         CooperateDFX::WriteOpenSession(OHOS::HiviewDFX::HiSysEvent::EventType::FAULT);
155 #endif // MSDP_HIVIEWDFX_HISYSEVENT_ENABLE
156     } else {
157 #ifdef MSDP_HIVIEWDFX_HISYSEVENT_ENABLE
158         CooperateDFX::WriteOpenSession(OHOS::HiviewDFX::HiSysEvent::EventType::BEHAVIOR);
159 #endif // MSDP_HIVIEWDFX_HISYSEVENT_ENABLE
160     }
161     return ret;
162 }
163 
CloseSession(const std::string &networkId)164 void DSoftbusAdapterImpl::CloseSession(const std::string &networkId)
165 {
166     CALL_INFO_TRACE;
167     std::unique_lock<std::shared_mutex> lock(lock_);
168     if (auto iter = sessions_.find(networkId); iter != sessions_.end()) {
169         ::Shutdown(iter->second.socket_);
170         sessions_.erase(iter);
171         FI_HILOGI("Shutdown session(%{public}d, %{public}s)", iter->second.socket_,
172             Utility::Anonymize(networkId).c_str());
173     }
174 }
175 
CloseAllSessions()176 void DSoftbusAdapterImpl::CloseAllSessions()
177 {
178     CALL_INFO_TRACE;
179     std::unique_lock<std::shared_mutex> lock(lock_);
180     CloseAllSessionsLocked();
181 }
182 
FindConnection(const std::string &networkId)183 int32_t DSoftbusAdapterImpl::FindConnection(const std::string &networkId)
184 {
185     CALL_DEBUG_ENTER;
186     auto iter = sessions_.find(networkId);
187     return (iter != sessions_.end() ? iter->second.socket_ : -1);
188 }
189 
SendPacket(const std::string &networkId, NetPacket &packet)190 int32_t DSoftbusAdapterImpl::SendPacket(const std::string &networkId, NetPacket &packet)
191 {
192     CALL_DEBUG_ENTER;
193     std::shared_lock<std::shared_mutex> lock(lock_);
194     int32_t socket = FindConnection(networkId);
195     if (socket < 0) {
196         FI_HILOGE("Node \'%{public}s\' is not connected", Utility::Anonymize(networkId).c_str());
197         return RET_ERR;
198     }
199     StreamBuffer buffer;
200     if (!packet.MakeData(buffer)) {
201         FI_HILOGE("Failed to buffer packet");
202         return RET_ERR;
203     }
204     if (buffer.Size() > MAX_PACKET_BUF_SIZE) {
205         FI_HILOGE("Packet is too large");
206         return RET_ERR;
207     }
208     int32_t ret = ::SendBytes(socket, buffer.Data(), buffer.Size());
209     if (ret != SOFTBUS_OK) {
210         FI_HILOGE("DSOFTBUS::SendBytes fail (%{public}d)", ret);
211         return RET_ERR;
212     }
213     return RET_OK;
214 }
215 
SendParcel(const std::string &networkId, Parcel &parcel)216 int32_t DSoftbusAdapterImpl::SendParcel(const std::string &networkId, Parcel &parcel)
217 {
218     CALL_DEBUG_ENTER;
219     std::shared_lock<std::shared_mutex> lock(lock_);
220     int32_t socket = FindConnection(networkId);
221     if (socket < 0) {
222         FI_HILOGE("Node \'%{public}s\' is not connected", Utility::Anonymize(networkId).c_str());
223         return RET_ERR;
224     }
225     int32_t ret = ::SendBytes(socket, reinterpret_cast<const void*>(parcel.GetData()), parcel.GetDataSize());
226     if (ret != SOFTBUS_OK) {
227         FI_HILOGE("DSOFTBUS::SendBytes fail, error:%{public}d", ret);
228         return RET_ERR;
229     }
230     return RET_OK;
231 }
232 
BroadcastPacket(NetPacket &packet)233 int32_t DSoftbusAdapterImpl::BroadcastPacket(NetPacket &packet)
234 {
235     CALL_INFO_TRACE;
236     std::shared_lock<std::shared_mutex> lock(lock_);
237     if (sessions_.empty()) {
238         FI_HILOGE("No session connected");
239         return RET_ERR;
240     }
241     StreamBuffer buffer;
242     if (!packet.MakeData(buffer)) {
243         FI_HILOGE("Failed to buffer packet");
244         return RET_ERR;
245     }
246     if (buffer.Size() > MAX_PACKET_BUF_SIZE) {
247         FI_HILOGE("Packet is too large");
248         return RET_ERR;
249     }
250     for (const auto &elem : sessions_) {
251         int32_t socket = elem.second.socket_;
252         if (socket < 0) {
253             FI_HILOGE("Node \'%{public}s\' is not connected", Utility::Anonymize(elem.first).c_str());
254             continue;
255         }
256         if (int32_t ret = ::SendBytes(socket, buffer.Data(), buffer.Size()); ret != SOFTBUS_OK) {
257             FI_HILOGE("DSOFTBUS::SendBytes fail (%{public}d)", ret);
258             continue;
259         }
260         FI_HILOGI("BroadcastPacket to networkId:%{public}s success", Utility::Anonymize(elem.first).c_str());
261     }
262     return RET_OK;
263 }
264 
HasSessionExisted(const std::string &networkId)265 bool DSoftbusAdapterImpl::HasSessionExisted(const std::string &networkId)
266 {
267     CALL_DEBUG_ENTER;
268     auto iter = sessions_.find(networkId);
269     return (iter != sessions_.end() && iter->second.socket_ != INVALID_SOCKET);
270 }
271 
OnBindLink(int32_t socket, PeerSocketInfo info)272 static void OnBindLink(int32_t socket, PeerSocketInfo info)
273 {
274     DSoftbusAdapterImpl::GetInstance()->OnBind(socket, info);
275 }
276 
OnShutdownLink(int32_t socket, ShutdownReason reason)277 static void OnShutdownLink(int32_t socket, ShutdownReason reason)
278 {
279     DSoftbusAdapterImpl::GetInstance()->OnShutdown(socket, reason);
280 }
281 
OnBytesAvailable(int32_t socket, const void *data, uint32_t dataLen)282 static void OnBytesAvailable(int32_t socket, const void *data, uint32_t dataLen)
283 {
284     DSoftbusAdapterImpl::GetInstance()->OnBytes(socket, data, dataLen);
285 }
286 
OnBind(int32_t socket, PeerSocketInfo info)287 void DSoftbusAdapterImpl::OnBind(int32_t socket, PeerSocketInfo info)
288 {
289     CALL_INFO_TRACE;
290     std::unique_lock<std::shared_mutex> lock(lock_);
291     std::string networkId = info.networkId;
292     FI_HILOGI("Bind session(%{public}d, %{public}s)", socket, Utility::Anonymize(networkId).c_str());
293     if (auto iter = sessions_.find(networkId); iter != sessions_.cend()) {
294         if (iter->second.socket_ == socket) {
295             FI_HILOGI("(%{public}d, %{public}s) has bound", iter->second.socket_,
296                 Utility::Anonymize(networkId).c_str());
297             return;
298         }
299         FI_HILOGI("(%{public}d, %{public}s) need erase", iter->second.socket_, Utility::Anonymize(networkId).c_str());
300         sessions_.erase(iter);
301     }
302     ConfigTcpAlive(socket);
303     sessions_.emplace(networkId, Session(socket));
304 
305     for (const auto &item : observers_) {
306         std::shared_ptr<IDSoftbusObserver> observer = item.Lock();
307         if (observer != nullptr) {
308             FI_HILOGD("Notify binding (%{public}d, %{public}s)", socket, Utility::Anonymize(networkId).c_str());
309             observer->OnBind(networkId);
310         }
311     }
312 }
313 
OnShutdown(int32_t socket, ShutdownReason reason)314 void DSoftbusAdapterImpl::OnShutdown(int32_t socket, ShutdownReason reason)
315 {
316     CALL_INFO_TRACE;
317     std::unique_lock<std::shared_mutex> lock(lock_);
318     auto iter = std::find_if(sessions_.cbegin(), sessions_.cend(),
319         [socket](const auto &item) {
320             return (item.second.socket_ == socket);
321         });
322     if (iter == sessions_.cend()) {
323         FI_HILOGD("Session(%{public}d) is not bound", socket);
324         return;
325     }
326     std::string networkId = iter->first;
327     sessions_.erase(iter);
328     FI_HILOGI("Shutdown session(%{public}d, %{public}s)", socket, Utility::Anonymize(networkId).c_str());
329 
330     for (const auto &item : observers_) {
331         std::shared_ptr<IDSoftbusObserver> observer = item.Lock();
332         if (observer != nullptr) {
333             FI_HILOGD("Notify shutdown of session(%{public}d, %{public}s)",
334                 socket, Utility::Anonymize(networkId).c_str());
335             observer->OnShutdown(networkId);
336         }
337     }
338 }
339 
OnBytes(int32_t socket, const void *data, uint32_t dataLen)340 void DSoftbusAdapterImpl::OnBytes(int32_t socket, const void *data, uint32_t dataLen)
341 {
342     CALL_DEBUG_ENTER;
343     std::shared_lock<std::shared_mutex> lock(lock_);
344     auto iter = std::find_if(sessions_.begin(), sessions_.end(),
345         [socket](const auto &item) {
346             return (item.second.socket_ == socket);
347         });
348     if (iter == sessions_.end()) {
349         FI_HILOGE("Invalid socket: %{public}d", socket);
350         return;
351     }
352     const std::string networkId = iter->first;
353 
354     if (*reinterpret_cast<const uint32_t*>(data) < static_cast<uint32_t>(MessageId::MAX_MESSAGE_ID)) {
355         CircleStreamBuffer &circleBuffer = iter->second.buffer_;
356 
357         if (!circleBuffer.Write(reinterpret_cast<const char*>(data), dataLen)) {
358             FI_HILOGE("Failed to write buffer");
359         }
360         HandleSessionData(networkId, circleBuffer);
361     } else {
362         HandleRawData(networkId, data, dataLen);
363     }
364 }
365 
InitSocket(SocketInfo info, int32_t socketType, int32_t &socket)366 int32_t DSoftbusAdapterImpl::InitSocket(SocketInfo info, int32_t socketType, int32_t &socket)
367 {
368     CALL_INFO_TRACE;
369     socket = ::Socket(info);
370     if (socket < 0) {
371         FI_HILOGE("DSOFTBUS::Socket failed");
372         return RET_ERR;
373     }
374     QosTV socketQos[] {
375         { .qos = QOS_TYPE_MIN_BW, .value = MIN_BW },
376         { .qos = QOS_TYPE_MAX_LATENCY, .value = LATENCY },
377         { .qos = QOS_TYPE_MIN_LATENCY, .value = LATENCY },
378     };
379     ISocketListener listener {
380         .OnBind = OnBindLink,
381         .OnShutdown = OnShutdownLink,
382         .OnBytes = OnBytesAvailable,
383     };
384     int32_t ret { -1 };
385 
386     if (socketType == SOCKET_SERVER) {
387         ret = ::Listen(socket, socketQos, sizeof(socketQos) / sizeof(socketQos[0]), &listener);
388         if (ret != 0) {
389             FI_HILOGE("DSOFTBUS::Listen failed");
390         }
391     } else if (socketType == SOCKET_CLIENT) {
392         ret = ::Bind(socket, socketQos, sizeof(socketQos) / sizeof(socketQos[0]), &listener);
393         if (ret != 0) {
394             FI_HILOGE("DSOFTBUS::Bind failed");
395         }
396     }
397     if (ret != 0) {
398         ::Shutdown(socket);
399         socket = -1;
400         return ret;
401     }
402     return RET_OK;
403 }
404 
SetupServer()405 int32_t DSoftbusAdapterImpl::SetupServer()
406 {
407     CALL_INFO_TRACE;
408     if (socketFd_ > 0) {
409         return RET_OK;
410     }
411     char name[DEVICE_NAME_SIZE_MAX] { SERVER_SESSION_NAME };
412     char pkgName[PKG_NAME_SIZE_MAX] { FI_PKG_NAME };
413     FI_HILOGI("Server session name: \'%{public}s\'", name);
414     FI_HILOGI("Package name: \'%{public}s\'", pkgName);
415     SocketInfo info {
416         .name = name,
417         .pkgName = pkgName,
418         .dataType = DATA_TYPE_BYTES
419     };
420     int32_t ret = InitSocket(info, SOCKET_SERVER, socketFd_);
421     if (ret != RET_OK) {
422         FI_HILOGE("Failed to setup server");
423         return ret;
424     }
425     return RET_OK;
426 }
427 
ShutdownServer()428 void DSoftbusAdapterImpl::ShutdownServer()
429 {
430     CALL_INFO_TRACE;
431     CloseAllSessionsLocked();
432     if (socketFd_ > 0) {
433         ::Shutdown(socketFd_);
434         socketFd_ = -1;
435     }
436 }
437 
OpenSessionLocked(const std::string &networkId)438 int32_t DSoftbusAdapterImpl::OpenSessionLocked(const std::string &networkId)
439 {
440     CALL_INFO_TRACE;
441     if (sessions_.find(networkId) != sessions_.end()) {
442         FI_HILOGD("InputSoftbus session has already opened");
443         return RET_OK;
444     }
445     std::string sessionName = CLIENT_SESSION_NAME + networkId.substr(0, BIND_STRING_LENGTH);
446     char name[DEVICE_NAME_SIZE_MAX] {};
447     if (strcpy_s(name, sizeof(name), sessionName.c_str()) != EOK) {
448         FI_HILOGE("Invalid name:%{public}s", sessionName.c_str());
449         return RET_ERR;
450     }
451     char peerName[DEVICE_NAME_SIZE_MAX] { SERVER_SESSION_NAME };
452     char peerNetworkId[PKG_NAME_SIZE_MAX] {};
453     if (strcpy_s(peerNetworkId, sizeof(peerNetworkId), networkId.c_str()) != EOK) {
454         FI_HILOGE("Invalid peerNetworkId:%{public}s", Utility::Anonymize(networkId).c_str());
455         return RET_ERR;
456     }
457     char pkgName[PKG_NAME_SIZE_MAX] { FI_PKG_NAME };
458     FI_HILOGI("Client session name: \'%{public}s\'", name);
459     FI_HILOGI("Peer name: \'%{public}s\'", peerName);
460     FI_HILOGI("Peer network id: \'%{public}s\'", Utility::Anonymize(peerNetworkId).c_str());
461     FI_HILOGI("Package name: \'%{public}s\'", pkgName);
462     SocketInfo info {
463         .name = name,
464         .peerName = peerName,
465         .peerNetworkId = peerNetworkId,
466         .pkgName = pkgName,
467         .dataType = DATA_TYPE_BYTES
468     };
469     int32_t socket { -1 };
470 
471     int32_t ret = InitSocket(info, SOCKET_CLIENT, socket);
472     if (ret != RET_OK) {
473         FI_HILOGE("Failed to bind %{public}s", Utility::Anonymize(networkId).c_str());
474         return ret;
475     }
476     ConfigTcpAlive(socket);
477     FI_HILOGI("Connected to (%{public}s,%{public}d)", Utility::Anonymize(networkId).c_str(), socket);
478     sessions_.emplace(networkId, Session(socket));
479     OnConnectedLocked(networkId);
480     return RET_OK;
481 }
482 
OnConnectedLocked(const std::string &networkId)483 void DSoftbusAdapterImpl::OnConnectedLocked(const std::string &networkId)
484 {
485     CALL_INFO_TRACE;
486     for (const auto &item : observers_) {
487         std::shared_ptr<IDSoftbusObserver> observer = item.Lock();
488         CHKPC(observer);
489         FI_HILOGI("Notify connected to networkId:%{public}s", Utility::Anonymize(networkId).c_str());
490         observer->OnConnected(networkId);
491     }
492 }
493 
CloseAllSessionsLocked()494 void DSoftbusAdapterImpl::CloseAllSessionsLocked()
495 {
496     std::for_each(sessions_.begin(), sessions_.end(), [](const auto &item) {
497         ::Shutdown(item.second.socket_);
498         FI_HILOGI("Shutdown connection with (%{public}s,%{public}d)",
499             Utility::Anonymize(item.first).c_str(), item.second.socket_);
500     });
501     sessions_.clear();
502 }
503 
ConfigTcpAlive(int32_t socket)504 void DSoftbusAdapterImpl::ConfigTcpAlive(int32_t socket)
505 {
506     CALL_DEBUG_ENTER;
507     if (socket < 0) {
508         FI_HILOGW("Config tcp alive, invalid sessionId");
509         return;
510     }
511     int32_t handle { -1 };
512     int32_t result = GetSessionHandle(socket, &handle);
513     if (result != RET_OK) {
514         FI_HILOGE("Failed to get the session handle, socketId:%{public}d, handle:%{public}d", socket, handle);
515         return;
516     }
517     int32_t keepAliveTimeout { 10 };
518     result = setsockopt(handle, IPPROTO_TCP, TCP_KEEPIDLE, &keepAliveTimeout, sizeof(keepAliveTimeout));
519     if (result != RET_OK) {
520         FI_HILOGE("Config tcp alive, setsockopt set idle failed, result:%{public}d", result);
521         return;
522     }
523     int32_t keepAliveCount { 5 };
524     result = setsockopt(handle, IPPROTO_TCP, TCP_KEEPCNT, &keepAliveCount, sizeof(keepAliveCount));
525     if (result != RET_OK) {
526         FI_HILOGE("Config tcp alive, setsockopt set cnt failed");
527         return;
528     }
529     int32_t interval { 1 };
530     result = setsockopt(handle, IPPROTO_TCP, TCP_KEEPINTVL, &interval, sizeof(interval));
531     if (result != RET_OK) {
532         FI_HILOGE("Config tcp alive, setsockopt set intvl failed");
533         return;
534     }
535     int32_t enable { 1 };
536     result = setsockopt(handle, SOL_SOCKET, SO_KEEPALIVE, &enable, sizeof(enable));
537     if (result != RET_OK) {
538         FI_HILOGE("Config tcp alive, setsockopt enable alive failed");
539         return;
540     }
541     int32_t TimeoutMs { 15000 };
542     result = setsockopt(handle, IPPROTO_TCP, TCP_USER_TIMEOUT, &TimeoutMs, sizeof(TimeoutMs));
543     if (result != RET_OK) {
544         FI_HILOGE("Failed to enable setsockopt for timeout, %{public}d", result);
545         return;
546     }
547 }
548 
HandleSessionData(const std::string &networkId, CircleStreamBuffer &circleBuffer)549 void DSoftbusAdapterImpl::HandleSessionData(const std::string &networkId, CircleStreamBuffer &circleBuffer)
550 {
551     CALL_DEBUG_ENTER;
552     while (circleBuffer.ResidualSize() >= static_cast<int32_t>(sizeof(PackHead))) {
553         const char *buf = circleBuffer.ReadBuf();
554         const PackHead *head = reinterpret_cast<const PackHead *>(buf);
555 
556         if ((head->size < 0) || (static_cast<size_t>(head->size) > MAX_PACKET_BUF_SIZE)) {
557             FI_HILOGE("Corrupted net packet");
558             break;
559         }
560         if ((head->size + static_cast<int32_t>(sizeof(PackHead))) > circleBuffer.ResidualSize()) {
561             FI_HILOGI("Incomplete package, package size:%{public}d, residual size:%{public}d",
562                 (head->size + static_cast<int32_t>(sizeof(PackHead))), circleBuffer.ResidualSize());
563             break;
564         }
565         NetPacket packet(head->idMsg);
566 
567         if ((head->size > 0) && !packet.Write(&buf[sizeof(PackHead)], head->size)) {
568             FI_HILOGE("Failed to fill packet, PacketSize:%{public}d", head->size);
569             break;
570         }
571         circleBuffer.SeekReadPos(packet.GetPacketLength());
572         HandlePacket(networkId, packet);
573     }
574 }
575 
HandlePacket(const std::string &networkId, NetPacket &packet)576 void DSoftbusAdapterImpl::HandlePacket(const std::string &networkId, NetPacket &packet)
577 {
578     CALL_DEBUG_ENTER;
579     for (const auto &item : observers_) {
580         std::shared_ptr<IDSoftbusObserver> observer = item.Lock();
581         if ((observer != nullptr) &&
582             observer->OnPacket(networkId, packet)) {
583             return;
584         }
585     }
586 }
587 
HandleRawData(const std::string &networkId, const void *data, uint32_t dataLen)588 void DSoftbusAdapterImpl::HandleRawData(const std::string &networkId, const void *data, uint32_t dataLen)
589 {
590     CALL_DEBUG_ENTER;
591     for (const auto &item : observers_) {
592         std::shared_ptr<IDSoftbusObserver> observer = item.Lock();
593         if ((observer != nullptr) &&
594             observer->OnRawData(networkId, data, dataLen)) {
595             return;
596         }
597     }
598 }
599 } // namespace DeviceStatus
600 } // namespace Msdp
601 } // namespace OHOS
602