15f9996aaSopenharmony_ci#!/usr/bin/env python3
25f9996aaSopenharmony_ci# -*- coding: utf-8 -*-
35f9996aaSopenharmony_ci# Copyright (c) 2022-2023 Huawei Device Co., Ltd.
45f9996aaSopenharmony_ci# Licensed under the Apache License, Version 2.0 (the "License");
55f9996aaSopenharmony_ci# you may not use this file except in compliance with the License.
65f9996aaSopenharmony_ci# You may obtain a copy of the License at
75f9996aaSopenharmony_ci#
85f9996aaSopenharmony_ci#     http://www.apache.org/licenses/LICENSE-2.0
95f9996aaSopenharmony_ci#
105f9996aaSopenharmony_ci# Unless required by applicable law or agreed to in writing, software
115f9996aaSopenharmony_ci# distributed under the License is distributed on an "AS IS" BASIS,
125f9996aaSopenharmony_ci# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
135f9996aaSopenharmony_ci# See the License for the specific language governing permissions and
145f9996aaSopenharmony_ci# limitations under the License.
155f9996aaSopenharmony_ciimport json
165f9996aaSopenharmony_ciimport os
175f9996aaSopenharmony_ciimport getopt
185f9996aaSopenharmony_ciimport sys
195f9996aaSopenharmony_ciimport contextlib
205f9996aaSopenharmony_ci
# Module-level tables of the services that still need to be validated,
# keyed by service name with a CfgItem as value (filled by handle_services).
# PRIVILEGE_HASH holds services flagged for the uid/gid check,
# CRITICAL_HASH holds services that enable "critical".
PRIVILEGE_HASH = {}
CRITICAL_HASH = {}
245f9996aaSopenharmony_ci
255f9996aaSopenharmony_ci
class CfgValidateError(Exception):
    """
    When the process list verification fails, throw this exception

    Attributes:
        name: short category of the failure
        reason: human readable explanation of the failure
    """
    def __init__(self, name, reason):
        # Hand both values to Exception so that args is populated; with an
        # empty args tuple an uncaught error is reported without any message.
        super().__init__(name, reason)
        self.name = name
        self.reason = reason

    def __str__(self):
        return "{}: {}".format(self.name, self.reason)
345f9996aaSopenharmony_ci
355f9996aaSopenharmony_ci
class CfgItem:
    """
    CfgItem is the value of HASH, representing the interested fields of a service read from a cfg file

    Attributes:
        uid: the uid of the service, recorded only when it needs validation
        gid: the gids of the service that need validation
        need_verified: True once a uid/gid that needs validation was seen
        enabled_critical: True once an enabled "critical" field was seen
        loc: location of the cfg file the service was read from
        critical: the "critical" field of the service, recorded only when enabled
        related_item: the ProcessItem recorded when validation does not pass
    """

    def __init__(self, loc=""):
        # Python keeps only the last definition of a method, so a single
        # initializer with a default serves both CfgItem() and CfgItem(loc).
        self.uid = ""
        self.gid = []
        self.need_verified = False
        self.enabled_critical = False
        self.loc = loc
        self.critical = []
        self.related_item = ProcessItem()

    @classmethod
    def _is_need_verified_uid(cls, uid: str):
        return uid in ("root", "system")

    @classmethod
    def _is_need_verified_gid(cls, gid):
        # To enable gid-root validate, change it to "return gid == "root""
        return False

    @classmethod
    def _is_need_verified_critical(cls, critical: list):
        # An empty or non-sequence value cannot enable critical; guard it
        # instead of failing with IndexError/TypeError on critical[0].
        return isinstance(critical, (list, tuple)) and len(critical) > 0 and critical[0] == 1

    def set_uid(self, uid):
        """
        Set uid and check it at the same time.
        The uid needs to be validated only if _is_need_verified_uid return True
        """
        if CfgItem._is_need_verified_uid(uid):
            self.uid = uid
            self.need_verified = True

    def append_gid(self, gid):
        """
        Append gid and check it at the same time.
        The gid needs to be validated only if _is_need_verified_gid return True
        """
        if CfgItem._is_need_verified_gid(gid) and gid not in self.gid:
            self.gid.append(gid)
            self.need_verified = True

    def set_critical(self, critical: list):
        """
        Record the "critical" field and check it at the same time.
        It is recorded only if _is_need_verified_critical return True
        """
        if CfgItem._is_need_verified_critical(critical):
            self.critical = critical
            self.enabled_critical = True

    def handle_socket(self, socket):
        """
        Validate possible field "socket" in the field "services"

        socket is iterated as a sequence of socket descriptions (dicts);
        a single dict is accepted as well and treated as a one-element list.
        Raises CfgValidateError when a socket uid that needs validation
        differs from the uid already recorded for the service.
        """
        sockets = [socket] if isinstance(socket, dict) else socket
        for entry in sockets:
            if ("uid" in entry) and CfgItem._is_need_verified_uid(entry["uid"]):
                self.need_verified = True
                if self.uid != "" and self.uid != entry["uid"]:
                    print("Error: uid and uid in socket is not same!")
                    print("Cfg location: {}".format(self.loc))
                    raise CfgValidateError("Customization Error", "cfgs check not pass")
                self.uid = entry["uid"]
            if "gid" in entry:
                gids = entry["gid"]
                if isinstance(gids, str):
                    # A string is always one gid; never iterate its characters.
                    gids = [gids]
                for gid in gids:
                    # append_gid already skips gids that are recorded.
                    self.append_gid(gid)

    def record_related_item(self, related_item):
        """
        When its permissions does not match those in process list,
        records the permissions given in process list
        """
        self.related_item = related_item
1215f9996aaSopenharmony_ci
1225f9996aaSopenharmony_ci
class ProcessItem:
    """
    ProcessItem is the data structure of an item read from the process list

    Attributes:
        name: name of the process
        uid: uid given in the process list, "" when absent
        gid: list of gids given in the process list, [] when absent
        critical: "critical" value given in the process list, [] when absent
    """

    def __init__(self, process_item=None):
        """
        Use the JSON item in the process list to initialize the class.
        Without an argument an empty item is created.
        """
        # Only one __init__ can exist per class (a second definition replaces
        # the first), so the empty case is handled here.
        if process_item is None:
            self.name = ""
            self.uid = ""
            self.gid = []
            self.critical = []
            return

        self.name = process_item["name"]
        self.uid = process_item.get("uid", "")

        gid = process_item.get("gid", [])
        # A single gid may be written as a plain string; normalize to a list.
        self.gid = [gid] if isinstance(gid, str) else gid

        self.critical = process_item.get("critical", [])

    def verify(self, cfg_item):
        """
        Returns whether the corresponding CFG (cfg_item) has passed the verification

        An item carrying uid or gid is checked against the uid/gid of cfg_item,
        otherwise an item carrying critical must equal cfg_item.critical.
        An item with none of these fields never passes.
        """
        if self.uid or self.gid:
            return self._verify_uid(cfg_item.uid) and self._verify_gid(cfg_item.gid)
        if self.critical:
            return self.critical == cfg_item.critical
        return False

    def _verify_uid(self, uid):
        # Only "root"/"system" are restricted; they must match the listed uid.
        return uid not in ("root", "system") or uid == self.uid

    def _verify_gid(self, gid):
        # gid "root" is accepted only when the process list grants it.
        return "root" not in gid or "root" in self.gid
1785f9996aaSopenharmony_ci
1795f9996aaSopenharmony_ci
def print_privilege_hash():
    """
    Print every service left in PRIVILEGE_HASH: its name, its uid and gids,
    the uid and gids recorded in its related_item, and its cfg location.
    """
    for service_name, cfg in PRIVILEGE_HASH.items():
        given = cfg.related_item
        header = "Name: {}\nuid: {}\ngiven uid: {}\ngid: ".format(service_name, cfg.uid, given.uid)
        print(header, end="")

        # Each gid is followed by one space, then the line is closed.
        print("".join("{} ".format(gid) for gid in cfg.gid))

        print("given gid: ", end=" ")
        print("".join("{} ".format(gid) for gid in given.gid))

        print("Cfg location: {}".format(cfg.loc))
        print("")
1955f9996aaSopenharmony_ci
1965f9996aaSopenharmony_ci
def print_critical_hash():
    """
    Print every service left in CRITICAL_HASH: its cfg location, its name,
    its critical value and the critical value recorded in its related_item.
    """
    for service_name, cfg in CRITICAL_HASH.items():
        report = (
            "Cfg location: {}\n".format(cfg.loc),
            "Name: {}\ncritical: {}\n".format(service_name, cfg.critical),
            "Whitelist-allowed critical: {}\n".format(cfg.related_item.critical),
        )
        # The last piece ends with a newline and print adds another one.
        print("".join(report))
        print("")
2045f9996aaSopenharmony_ci
2055f9996aaSopenharmony_ci
def container_validate(process_path: str, list_name: str, item_container: dict):
    """
    Check the services in item_container against one process list file.

    process_path: path of the JSON process list (whitelist) file
    list_name: name of the top-level field that holds the list
    item_container: dict of service name -> CfgItem; services that pass the
        verification are removed from it, the others get the whitelist entry
        recorded through record_related_item

    Raises CfgValidateError when the file has no field named list_name.
    """
    # JSON text is UTF-8; do not depend on the locale's default encoding.
    # The file is closed as soon as it is parsed.
    with open(process_path, encoding="utf-8") as fp:
        data = json.load(fp)

    if list_name not in data:
        print("Error: {} is not a valid whitelist, it does not have the wanted field name [{}]".format(
            process_path, list_name))
        raise CfgValidateError("Customization Error", "cfgs check not pass")

    for entry in data[list_name]:
        # An entry without a name cannot match any service, skip it.
        name = entry.get("name")
        if name not in item_container:
            # no CfgItem in HASH meet the item in process list
            continue

        allowed = ProcessItem(entry)
        cfg_item = item_container[name]
        if allowed.verify(cfg_item):
            # Process field check passed, remove the corresponding service from HASH
            item_container.pop(name)
        else:
            cfg_item.record_related_item(allowed)
2275f9996aaSopenharmony_ci
2285f9996aaSopenharmony_ci
def check_container(critical_process_path: str):
    """
    Report the services that are still left in the HASH tables.

    Services left in PRIVILEGE_HASH are reported first; only when that table
    is empty are the services left in CRITICAL_HASH reported.
    Raises CfgValidateError when either table is not empty.
    """
    if not PRIVILEGE_HASH and not CRITICAL_HASH:
        return

    if PRIVILEGE_HASH:
        # The remaining services in HASH do not pass the high-privilege validation
        print("Error: some services are not authenticated. Listed as follow:")
        print_privilege_hash()
    else:
        # The remaining services in HASH do not pass the critical validation
        headline = (
            "Error: some services do not match with critical whitelist({}).".format(critical_process_path)
            + " Directly enable critical or modify enabled critical services are prohibited!"
            + " Misconfigured services listed as follow:"
        )
        print(headline)
        print_critical_hash()

    raise CfgValidateError("Customization Error", "cfgs check not pass")
2485f9996aaSopenharmony_ci
2495f9996aaSopenharmony_ci
2505f9996aaSopenharmony_ci
def validate_cfg_file(privilege_process_path: str, critical_process_path: str, result_path: str):
    """
    Validate the collected services against both process list files.

    For each list: when the file exists the matching HASH is validated with
    container_validate, otherwise the check is skipped and the HASH is cleared.
    Finally check_container reports whatever is left.
    result_path is not used by this function.
    """
    checks = (
        (privilege_process_path, "high_privilege_process_list", PRIVILEGE_HASH,
         "High-privilege process check skipped: file [{}] not exist"),
        (critical_process_path, "critical_reboot_process_list", CRITICAL_HASH,
         "Critical-reboot process check skipped: file [{}] not exist"),
    )
    for list_path, list_name, container, skip_message in checks:
        if os.path.exists(list_path):
            container_validate(list_path, list_name, container)
        else:
            print(skip_message.format(list_path))
            container.clear()

    check_container(critical_process_path)
2705f9996aaSopenharmony_ci
2715f9996aaSopenharmony_ci
def handle_services(filename: str, field: str):
    """
    Turn one entry of the "services" list into a CfgItem and register it.

    filename is stored as the location of the item. field is indexed by key
    ("name", "uid", "gid", "socket", "critical"), i.e. it is used as a mapping.
    The item is stored under the service name in PRIVILEGE_HASH when it needs
    permission verification and in CRITICAL_HASH when it enables critical.
    """
    item = CfgItem(filename)
    service_name = field['name']

    if "uid" in field:
        item.set_uid(field["uid"])

    if "gid" in field:
        gid_field = field["gid"]
        # A single gid may be given as a plain string instead of a list.
        gid_list = [gid_field] if isinstance(gid_field, str) else gid_field
        for gid in gid_list:
            item.append_gid(gid)

    if "socket" in field:
        item.handle_socket(field["socket"])

    if "critical" in field:
        item.set_critical(field["critical"])

    # Services that need a check are added to the corresponding HASH
    registrations = (
        (item.need_verified, PRIVILEGE_HASH),
        (item.enabled_critical, CRITICAL_HASH),
    )
    for flagged, table in registrations:
        if flagged:
            table[service_name] = item
2955f9996aaSopenharmony_ci
2965f9996aaSopenharmony_ci
def parse_cfg_file(filename: str):
    """
    Load the cfg file in JSON format and hand every entry of its
    "services" list to handle_services.

    Files whose top-level value is not an object, or that have no "services"
    field, are ignored.
    Raises json.JSONDecodeError when the file is not valid JSON.
    """
    # JSON text is UTF-8; do not depend on the locale's default encoding.
    with open(filename, encoding="utf-8") as fp:
        try:
            data = json.load(fp)
        except json.JSONDecodeError:
            print("\nError: loading cfg file {} failed. Cfg file not in JSON format!\n".format(filename))
            # Re-raise the real error: continuing would reference the unbound
            # "data", and silently skipping the file would let it bypass the check.
            raise

    if not isinstance(data, dict) or "services" not in data:
        return
    for field in data['services']:
        handle_services(filename, field)
    return
3115f9996aaSopenharmony_ci
3125f9996aaSopenharmony_ci
def iterate_cfg_folder(cfg_dir: str):
    """
    Parse every file in cfg_dir whose name ends with ".cfg".

    The files are visited in sorted name order: os.listdir returns entries in
    arbitrary order, and the order decides which file wins when two cfg files
    define a service with the same name.
    """
    for file_name in sorted(os.listdir(cfg_dir)):
        if file_name.endswith(".cfg"):
            parse_cfg_file("{}/{}".format(cfg_dir, file_name))
    return
3185f9996aaSopenharmony_ci
3195f9996aaSopenharmony_ci
def main():
    """
    Entry point: read the command line options, collect the services from the
    system and vendor cfg folders and validate them against the process lists.

    The check is skipped (with a message) when either cfg folder does not exist.
    Raises getopt.GetoptError for an unknown option or when a needed option
    is missing.
    """
    long_options = [
        'sys-cfg-folder=',
        'vendor-cfg-folder=',
        'high-privilege-process-list-path=',
        'critical-reboot-process-list-path=',
        'result-path=',
    ]
    opts, _ = getopt.getopt(sys.argv[1:], '', long_options)
    # Look options up by name: their position on the command line is not
    # guaranteed, so indexing opts would mix the paths up.
    options = dict(opts)

    def required(option_name):
        if option_name not in options:
            raise getopt.GetoptError("option {} is required".format(option_name), option_name)
        return options[option_name]

    # Options are fetched lazily so that an early "skipped" return does not
    # demand the options it never uses.
    sys_cfg_folder = required('--sys-cfg-folder')
    if not os.path.exists(sys_cfg_folder):
        print("Process field check skipped: file [{}] not exist".format(sys_cfg_folder))
        return

    vendor_cfg_folder = required('--vendor-cfg-folder')
    if not os.path.exists(vendor_cfg_folder):
        print("Process field check skipped: file [{}] not exist".format(vendor_cfg_folder))
        return

    privilege_process_path = required('--high-privilege-process-list-path')
    critical_process_path = required('--critical-reboot-process-list-path')

    iterate_cfg_folder(sys_cfg_folder)
    iterate_cfg_folder(vendor_cfg_folder)
    # The value of --result-path is accepted on the command line but not forwarded.
    validate_cfg_file(privilege_process_path, critical_process_path, None)

    return
3435f9996aaSopenharmony_ci
# Run the validation only when this file is executed as a script.
if __name__ == "__main__":

    main()
3475f9996aaSopenharmony_ci
348