1/*
2 * Copyright (c) 2024 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 *     http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16#ifndef OHOS_DSCHED_CONTINUE_MANAGER_H
17#define OHOS_DSCHED_CONTINUE_MANAGER_H
18
19#include <map>
20#include <string>
21#include <atomic>
22
23#include "dsched_data_buffer.h"
24#include "dsched_continue.h"
25#include "idata_listener.h"
26#include "iremote_object.h"
27#include "single_instance.h"
28#include "want.h"
29
30namespace OHOS {
31namespace DistributedSchedule {
// Limits and timeout used by the continuation manager.
//
// These are deliberately NOT wrapped in an unnamed namespace: an unnamed
// namespace in a header is re-opened in every translation unit that includes
// it, and namespace-scope constexpr variables already have internal linkage,
// so the wrapper added nothing but a guideline violation.

// Upper bound compared against the sink-side continuation counter.
// NOTE(review): presumably checked in CheckContinuationLimit — confirm in the .cpp.
constexpr int32_t MAX_CONCURRENT_SINK = 1;
// Upper bound compared against the source-side continuation counter.
// NOTE(review): presumably checked in CheckContinuationLimit — confirm in the .cpp.
constexpr int32_t MAX_CONCURRENT_SOURCE = 1;
// Default continuation timeout.
// NOTE(review): unit is not stated in this header; looks like milliseconds — confirm.
constexpr int32_t CONTINUE_TIMEOUT = 10000;
37class DSchedContinueManager {
38DECLARE_SINGLE_INSTANCE_BASE(DSchedContinueManager);
39public:
40    explicit DSchedContinueManager();
41    ~DSchedContinueManager();
42    int32_t ContinueMission(const std::string &srcDeviceId, const std::string &dstDeviceId,
43        int32_t missionId, const sptr<IRemoteObject>& callback, const OHOS::AAFwk::WantParams &wantParams);
44    int32_t ContinueMission(const DSchedContinueInfo& continueInfo, const sptr<IRemoteObject> &callback,
45        const OHOS::AAFwk::WantParams &wantParams);
46    int32_t StartContinuation(const OHOS::AAFwk::Want& want, int32_t missionId, int32_t callerUid, int32_t status,
47        uint32_t accessToken);
48    int32_t NotifyCompleteContinuation(const std::u16string& devId, int32_t sessionId, bool isSuccess,
49        const std::string &callerBundleName);
50    int32_t OnContinueEnd(const DSchedContinueInfo& info);
51
52    void Init();
53    void UnInit();
54    void NotifyAllConnectDecision(std::string peerDeviceId, bool isSupport);
55    void OnDataRecv(int32_t sessionId, std::shared_ptr<DSchedDataBuffer> dataBuffer);
56    void OnShutdown(int32_t socket, bool isSelfCalled);
57
58    int32_t GetContinueInfo(std::string &srcDeviceId, std::string &dstDeviceId);
59    std::shared_ptr<DSchedContinue> GetDSchedContinueByWant(const OHOS::AAFwk::Want& want, int32_t missionId);
60    std::shared_ptr<DSchedContinue> GetDSchedContinueByDevId(const std::u16string& devId, int32_t missionId);
61    void NotifyTerminateContinuation(const int32_t missionId);
62
63private:
64    void StartEvent();
65    void HandleContinueMission(const std::string& srcDeviceId, const std::string& dstDeviceId, int32_t missionId,
66        const sptr<IRemoteObject>& callback, const OHOS::AAFwk::WantParams& wantParams);
67    void HandleContinueMission(const DSchedContinueInfo& continueInfo,
68        const sptr<IRemoteObject>& callback, const OHOS::AAFwk::WantParams& wantParams);
69    bool GetFirstBundleName(DSchedContinueInfo &info, std::string &firstBundleNamme, std::string bundleName,
70        std::string deviceId);
71    void HandleContinueMissionWithBundleName(DSchedContinueInfo &info, const sptr<IRemoteObject> &callback,
72        const OHOS::AAFwk::WantParams &wantParams);
73    void HandleStartContinuation(const OHOS::AAFwk::Want& want, int32_t missionId, int32_t callerUid,
74        int32_t status, uint32_t accessToken);
75    void HandleNotifyCompleteContinuation(const std::u16string& devId, int32_t missionId, bool isSuccess,
76        const std::string &callerBundleName);
77    void HandleContinueEnd(const DSchedContinueInfo& info);
78    void HandleDataRecv(int32_t sessionId, std::shared_ptr<DSchedDataBuffer> dataBuffer);
79    void NotifyContinueDataRecv(int32_t sessionId, int32_t command, const std::string& jsonStr,
80        std::shared_ptr<DSchedDataBuffer> dataBuffer);
81    int32_t CheckContinuationLimit(const std::string& srcDeviceId, const std::string& dstDeviceId, int32_t &direction);
82    void WaitAllConnectDecision(int32_t direction, const DSchedContinueInfo &info, int32_t timeout);
83    void SetTimeOut(const DSchedContinueInfo& info, int32_t timeout);
84    void RemoveTimeout(const DSchedContinueInfo& info);
85
86    class SoftbusListener : public IDataListener {
87        void OnBind(int32_t socket, PeerSocketInfo info);
88        void OnShutdown(int32_t socket, bool isSelfCalled);
89        void OnDataRecv(int32_t socket, std::shared_ptr<DSchedDataBuffer> dataBuffer);
90    };
91
92private:
93#ifdef DMSFWK_ALL_CONNECT_MGR
94    static constexpr int32_t CONNECT_DECISION_WAIT_S = 60;
95#endif
96
97    std::thread eventThread_;
98    std::condition_variable eventCon_;
99    std::mutex eventMutex_;
100    std::shared_ptr<OHOS::AppExecFwk::EventHandler> eventHandler_;
101    std::shared_ptr<DSchedContinueManager::SoftbusListener> softbusListener_;
102
103    std::map<DSchedContinueInfo, std::shared_ptr<DSchedContinue>> continues_;
104    std::mutex continueMutex_;
105
106#ifdef DMSFWK_ALL_CONNECT_MGR
107    std::mutex connectDecisionMutex_;
108    std::condition_variable connectDecisionCond_;
109    std::map<std::string, std::atomic<bool>> peerConnectDecision_;
110#endif
111
112    std::atomic<int32_t> cntSink_ {0};
113    std::atomic<int32_t> cntSource_ {0};
114};
115}  // namespace DistributedSchedule
116}  // namespace OHOS
117#endif  // OHOS_DSCHED_CONTINUE_MANAGER_H
118