1#!/usr/bin/env python3
2# coding=utf-8
3
4#
5# Copyright (c) 2022 Huawei Device Co., Ltd.
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18
19import re
20import telnetlib
21import time
22import os
23import threading
24
25from xdevice import DeviceOsType
26from xdevice import DeviceProperties
27from xdevice import ConfigConst
28from xdevice import DeviceLabelType
29from xdevice import ModeType
30from xdevice import IDevice
31from xdevice import platform_logger
32from xdevice import DeviceAllocationState
33from xdevice import Plugin
34from xdevice import exec_cmd
35from xdevice import convert_serial
36from xdevice import convert_mac
37from xdevice import convert_ip
38from xdevice import check_mode
39from xdevice import Platform
40
41from ohos.constants import ComType
42from ohos.environment.dmlib_lite import LiteHelper
43from ohos.error import ErrorMessage
44from ohos.exception import LiteDeviceConnectError
45from ohos.exception import LiteDeviceTimeout
46from ohos.exception import LiteParamError
47
48LOG = platform_logger("DeviceLite")
49TIMEOUT = 90
50RETRY_ATTEMPTS = 0
51HDC = "litehdc.exe"
52DEFAULT_BAUD_RATE = 115200
53
54
55def get_hdc_path():
56    from xdevice import Variables
57    user_path = os.path.join(Variables.exec_dir, "resource/tools")
58    top_user_path = os.path.join(Variables.top_dir, "config")
59    config_path = os.path.join(Variables.res_dir, "config")
60    paths = [user_path, top_user_path, config_path]
61
62    file_path = ""
63    for path in paths:
64        if os.path.exists(os.path.abspath(os.path.join(
65                path, HDC))):
66            file_path = os.path.abspath(os.path.join(
67                path, HDC))
68            break
69
70    if os.path.exists(file_path):
71        return file_path
72    else:
73        raise LiteParamError(ErrorMessage.Common.Code_0301006)
74
75
76def parse_available_com(com_str):
77    com_str = com_str.replace("\r", " ")
78    com_list = com_str.split("\n")
79    for index, item in enumerate(com_list):
80        com_list[index] = item.strip().strip(b'\x00'.decode())
81    return com_list
82
83
84def perform_device_action(func):
85    def device_action(self, *args, **kwargs):
86        if not self.get_recover_state():
87            LOG.debug("Device %s %s is false" % (self.device_sn,
88                                                 ConfigConst.recover_state))
89            return "", "", ""
90
91        tmp = int(kwargs.get("retry", RETRY_ATTEMPTS))
92        retry = tmp + 1 if tmp > 0 else 1
93        exception = None
94        for num in range(retry):
95            try:
96                result = func(self, *args, **kwargs)
97                return result
98            except LiteDeviceTimeout as error:
99                LOG.error(error)
100                exception = error
101                if num:
102                    self.recover_device()
103            except Exception as error:
104                LOG.error(error)
105                exception = error
106        raise exception
107
108    return device_action
109
110
111@Plugin(type=Plugin.DEVICE, id=DeviceOsType.lite)
112class DeviceLite(IDevice):
113    """
114    Class representing a device lite device.
115
116    Each object of this class represents one device lite device in xDevice.
117
118    Attributes:
119        device_connect_type: A string that's the type of lite device
120    """
121    device_os_type = DeviceOsType.lite
122    device_allocation_state = DeviceAllocationState.available
123
124    def __init__(self):
125        self.device_sn = ""
126        self.label = ""
127        self.device_connect_type = ""
128        self.device_kernel = ""
129        self.device = None
130        self.ifconfig = None
131        self.device_id = None
132        self.extend_value = {}
133        self.device_lock = threading.RLock()
134        self.test_platform = Platform.ohos
135        self.device_props = {}
136        self.device_description = {}
137
138    def init_description(self):
139        if self.device_description:
140            return
141        desc = {
142            DeviceProperties.sn: convert_serial(self.device_sn),
143            DeviceProperties.model: self.label,
144            DeviceProperties.type_: self.label,
145            DeviceProperties.platform: self.test_platform,
146            DeviceProperties.version: "",
147            DeviceProperties.others: self.device_props
148        }
149        self.device_description.update(desc)
150
151    def __set_serial__(self, device=None):
152        for item in device:
153            if "ip" in item.keys() and "port" in item.keys():
154                self.device_sn = "remote_%s_%s" % \
155                                 (item.get("ip"), item.get("port"))
156                break
157            elif "type" in item.keys() and "com" in item.keys():
158                self.device_sn = "local_%s" % item.get("com")
159                break
160
161    def __get_serial__(self):
162        return self.device_sn
163
164    def get(self, key=None, default=None):
165        if not key:
166            return default
167        value = getattr(self, key, None)
168        if value:
169            return value
170        else:
171            return self.extend_value.get(key, default)
172
173    def update_device_props(self, props):
174        if self.device_props or not isinstance(props, dict):
175            return
176        self.device_props.update(props)
177
178    def __set_device_kernel__(self, kernel_type=""):
179        self.device_kernel = kernel_type
180
181    def __get_device_kernel__(self):
182        return self.device_kernel
183
184    @staticmethod
185    def _check_watchgt(device):
186        for item in device:
187            if "label" not in item.keys():
188                raise LiteParamError(ErrorMessage.Config.Code_0302027)
189            if "com" not in item.keys() or ("com" in item.keys() and
190                                            not item.get("com")):
191                raise LiteParamError(ErrorMessage.Config.Code_0302028)
192            else:
193                hdc = get_hdc_path()
194                result = exec_cmd([hdc])
195                com_list = parse_available_com(result)
196                if item.get("com").upper() in com_list:
197                    return True
198                else:
199                    raise LiteParamError(ErrorMessage.Config.Code_0302029)
200
201    @staticmethod
202    def _check_wifiiot_config(device):
203        com_type_set = set()
204        for item in device:
205            if "label" not in item.keys():
206                if "com" not in item.keys() or ("com" in item.keys() and
207                                                not item.get("com")):
208                    raise LiteParamError(ErrorMessage.Config.Code_0302030)
209
210                if "type" not in item.keys() or ("type" not in item.keys() and
211                                                 not item.get("type")):
212                    raise LiteParamError(ErrorMessage.Config.Code_0302031)
213                else:
214                    com_type_set.add(item.get("type"))
215        if len(com_type_set) < 2:
216            raise LiteParamError(ErrorMessage.Config.Code_0302032)
217
218    @staticmethod
219    def _check_ipcamera_local(device):
220        for item in device:
221            if "label" not in item.keys():
222                if "com" not in item.keys() or ("com" in item.keys() and
223                                                not item.get("com")):
224                    raise LiteParamError(ErrorMessage.Config.Code_0302033)
225
226    @staticmethod
227    def _check_ipcamera_remote(device=None):
228        for item in device:
229            if "label" not in item.keys():
230                if "port" in item.keys() and item.get("port") and not item.get(
231                        "port").isnumeric():
232                    raise LiteParamError(ErrorMessage.Config.Code_0302034)
233                elif "port" not in item.keys():
234                    raise LiteParamError(ErrorMessage.Config.Code_0302035)
235
236    def __check_config__(self, device=None):
237        self.set_connect_type(device)
238        if self.label == DeviceLabelType.wifiiot:
239            self._check_wifiiot_config(device)
240        elif self.label == DeviceLabelType.ipcamera and \
241                self.device_connect_type == "local":
242            self._check_ipcamera_local(device)
243        elif self.label == DeviceLabelType.ipcamera and \
244                self.device_connect_type == "remote":
245            self._check_ipcamera_remote(device)
246        elif self.label == DeviceLabelType.watch_gt:
247            self._check_watchgt(device)
248
249    def set_connect_type(self, device):
250        pattern = r'^((25[0-5]|2[0-4]\d|[01]?\d\d?)\.){3}(25[0-5]|2[0-4]\d|[' \
251                  r'01]?\d\d?)$'
252        for item in device:
253            if "label" in item.keys():
254                self.label = item.get("label")
255            if "com" in item.keys():
256                self.device_connect_type = "local"
257            if "ip" in item.keys():
258                if re.match(pattern, item.get("ip")):
259                    self.device_connect_type = "remote"
260                else:
261                    raise LiteParamError(ErrorMessage.Config.Code_0302025)
262        if not self.label:
263            raise LiteParamError(ErrorMessage.Config.Code_0302026)
264        else:
265            if self.label != DeviceLabelType.wifiiot and \
266                    self.label != DeviceLabelType.ipcamera and \
267                    self.label != DeviceLabelType.watch_gt:
268                raise LiteParamError(ErrorMessage.Config.Code_0302020)
269        if not self.device_connect_type:
270            raise LiteParamError(ErrorMessage.Config.Code_0302021)
271
272    def __init_device__(self, device):
273        self.__check_config__(device)
274        self.__set_serial__(device)
275        if self.device_connect_type == "remote":
276            self.device = CONTROLLER_DICT.get("remote")(device)
277        else:
278            self.device = CONTROLLER_DICT.get("local")(device)
279
280        self.ifconfig = device[1].get("ifconfig")
281        if self.ifconfig:
282            self.connect()
283            self.execute_command_with_timeout(command='\r', timeout=5)
284            self.execute_command_with_timeout(command=self.ifconfig, timeout=5)
285
286    def connect(self):
287        """
288        Connect the device
289
290        """
291        try:
292            self.device.connect()
293        except LiteDeviceConnectError as _:
294            if check_mode(ModeType.decc):
295                LOG.debug("Set device %s recover state to false" %
296                          self.device_sn)
297                self.device_allocation_state = DeviceAllocationState.unusable
298                self.set_recover_state(False)
299            raise
300
301    @perform_device_action
302    def execute_command_with_timeout(self, command="", case_type="",
303                                     timeout=TIMEOUT, **kwargs):
304        """Executes command on the device.
305
306        Args:
307            command: the command to execute
308            case_type: CTest or CppTest
309            timeout: timeout for read result
310            **kwargs: receiver - parser handler input
311
312        Returns:
313            (filter_result, status, error_message)
314
315            filter_result: command execution result
316            status: true or false
317            error_message: command execution error message
318        """
319        receiver = kwargs.get("receiver", None)
320        if self.device_connect_type == "remote":
321            LOG.info("%s execute command shell %s with timeout %ss" %
322                     (convert_serial(self.__get_serial__()), command,
323                      str(timeout)))
324            filter_result, status, error_message = \
325                self.device.execute_command_with_timeout(
326                    command=command,
327                    timeout=timeout,
328                    receiver=receiver)
329        elif self.device_connect_type == "agent":
330            filter_result, status, error_message = \
331                self.device.execute_command_with_timeout(
332                    command=command,
333                    case_type=case_type,
334                    timeout=timeout,
335                    receiver=receiver, type="cmd")
336        else:
337            filter_result, status, error_message = \
338                self.device.execute_command_with_timeout(
339                    command=command,
340                    case_type=case_type,
341                    timeout=timeout,
342                    receiver=receiver)
343        if not receiver:
344            LOG.debug("{} execute result:{}".format(convert_serial(self.__get_serial__()),
345                                                    convert_mac(filter_result.replace("BS", ""))))
346        if not status:
347            LOG.debug("{} error_message:{}".format(convert_serial(self.__get_serial__()),
348                                                   convert_mac(error_message.replace("BS", ""))))
349
350        return filter_result.replace("BS", ""), status, error_message
351
352    def recover_device(self):
353        self.reboot()
354
355    def reboot(self):
356        self.connect()
357        filter_result, status, error_message = self. \
358            execute_command_with_timeout(command="reset", timeout=30)
359        if not filter_result:
360            if check_mode(ModeType.decc):
361                LOG.debug("Set device %s recover state to false" %
362                          self.device_sn)
363                self.device_allocation_state = DeviceAllocationState.unusable
364                self.set_recover_state(False)
365        if self.ifconfig:
366            enter_result, _, _ = self.execute_command_with_timeout(
367                command='\r',
368                timeout=15)
369            if " #" in enter_result or "OHOS #" in enter_result:
370                LOG.info("Reset device %s success" % self.device_sn)
371                self.execute_command_with_timeout(command=self.ifconfig,
372                                                  timeout=5)
373            elif "hisilicon #" in enter_result:
374                LOG.info("Reset device %s fail" % self.device_sn)
375
376            ifconfig_result, _, _ = self.execute_command_with_timeout(
377                command="ifconfig",
378                timeout=5)
379
380    def close(self):
381        """
382        Close the telnet connection with device server or close the local
383        serial
384        """
385        self.device.close()
386
387    def set_recover_state(self, state):
388        with self.device_lock:
389            setattr(self, ConfigConst.recover_state, state)
390
391    def get_recover_state(self, default_state=True):
392        with self.device_lock:
393            state = getattr(self, ConfigConst.recover_state, default_state)
394            return state
395
396
397class RemoteController:
398    """
399    Class representing a device lite remote device.
400    Each object of this class represents one device lite remote device
401    in xDevice.
402    """
403
404    def __init__(self, device):
405        self.host = device[1].get("ip")
406        self.port = int(device[1].get("port"))
407        self.telnet = None
408
409    def connect(self):
410        """
411        Connect the device server
412        """
413        try:
414            if self.telnet:
415                return self.telnet
416            self.telnet = telnetlib.Telnet(self.host, self.port,
417                                           timeout=TIMEOUT)
418        except Exception as error:
419            raise LiteDeviceConnectError(ErrorMessage.Device.Code_0303008.format(
420                convert_ip(self.host), self.port, str(error))) from error
421        time.sleep(2)
422        self.telnet.set_debuglevel(0)
423        return self.telnet
424
425    def execute_command_with_timeout(self, command="", timeout=TIMEOUT,
426                                     receiver=None):
427        """
428        Executes command on the device.
429
430        Parameters:
431            command: the command to execute
432            timeout: timeout for read result
433            receiver: parser handler
434        """
435        return LiteHelper.execute_remote_cmd_with_timeout(
436            self.telnet, command, timeout, receiver)
437
438    def close(self):
439        """
440        Close the telnet connection with device server
441        """
442        try:
443            if not self.telnet:
444                return
445            self.telnet.close()
446            self.telnet = None
447        except ConnectionError as _:
448            error_message = "Remote device is disconnected abnormally"
449            LOG.error(error_message, error_no="00401")
450
451
452class LocalController:
453    def __init__(self, device):
454        """
455        Init Local device.
456        Parameters:
457            device: local device
458        """
459        self.com_dict = {}
460        for item in device:
461            if "com" in item.keys():
462                if "type" in item.keys() and ComType.cmd_com == item.get(
463                        "type"):
464                    self.com_dict[ComType.cmd_com] = ComController(item)
465                elif "type" in item.keys() and ComType.deploy_com == item.get(
466                        "type"):
467                    self.com_dict[ComType.deploy_com] = ComController(item)
468
469    def connect(self, key=ComType.cmd_com):
470        """
471        Open serial.
472        """
473        self.com_dict.get(key).connect()
474
475    def close(self, key=ComType.cmd_com):
476        """
477        Close serial.
478        """
479        if self.com_dict and self.com_dict.get(key):
480            self.com_dict.get(key).close()
481
482    def execute_command_with_timeout(self, **kwargs):
483        """
484        Execute command on the serial and read all the output from the serial.
485        """
486        args = kwargs
487        key = args.get("key", ComType.cmd_com)
488        command = args.get("command", None)
489        case_type = args.get("case_type", "")
490        receiver = args.get("receiver", None)
491        timeout = args.get("timeout", TIMEOUT)
492        return self.com_dict.get(key).execute_command_with_timeout(
493            command=command, case_type=case_type,
494            timeout=timeout, receiver=receiver)
495
496
497class ComController:
498    def __init__(self, device):
499        """
500        Init serial.
501        Parameters:
502            device: local com
503        """
504        self.is_open = False
505        self.com = None
506        self.serial_port = device.get("com", None)
507        self.baud_rate = int(device.get("baud_rate", DEFAULT_BAUD_RATE))
508        self.timeout = int(device.get("timeout", TIMEOUT))
509        self.usb_port = device.get("usb_port", None)
510
511    def connect(self):
512        """
513        Open serial.
514        """
515        try:
516            if not self.is_open:
517                import serial
518                self.com = serial.Serial(self.serial_port,
519                                         baudrate=self.baud_rate,
520                                         timeout=self.timeout)
521                self.is_open = True
522                return self.com
523        except Exception as error:
524            raise LiteDeviceConnectError(ErrorMessage.Device.Code_0303009.format(
525                self.serial_port, str(error))) from error
526        return None
527
528    def close(self):
529        """
530        Close serial.
531        """
532        try:
533            if not self.com:
534                return
535            if self.is_open:
536                self.com.close()
537            self.is_open = False
538        except ConnectionError as _:
539            error_message = "Local device is disconnected abnormally"
540            LOG.error(error_message, error_no="00401")
541
542    def execute_command_with_timeout(self, **kwargs):
543        """
544        Execute command on the serial and read all the output from the serial.
545        """
546        return LiteHelper.execute_local_cmd_with_timeout(self.com, **kwargs)
547
548    def execute_command(self, command):
549        """
550        Execute command on the serial and read all the output from the serial.
551        """
552        LiteHelper.execute_local_command(self.com, command)
553
554
555CONTROLLER_DICT = {
556    "local": LocalController,
557    "remote": RemoteController,
558}
559