#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Copyright (c) 2022-2023 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Validate the "uid", "gid", "socket" and "critical" fields of the services
declared in cfg files against a high-privilege whitelist and a
critical-reboot whitelist.
"""
import json
import os
import getopt
import sys
import contextlib

# Store the hash table of the services that need to validate.
# Both tables map a service name to its CfgItem. They are only ever mutated
# in place (never rebound), so every function shares the same two objects.
PRIVILEGE_HASH = {}
CRITICAL_HASH = {}

# Command line options that main() cannot work without.
_REQUIRED_OPTIONS = (
    '--sys-cfg-folder',
    '--vendor-cfg-folder',
    '--high-privilege-process-list-path',
    '--critical-reboot-process-list-path',
)


class CfgValidateError(Exception):
    """
    When the process list verification fails, throw this exception

    Attributes:
        name: short category of the failure.
        reason: human readable reason of the failure.
    """
    def __init__(self, name, reason):
        # Pass a message to Exception so that tracebacks are not empty.
        super().__init__("{}: {}".format(name, reason))
        self.name = name
        self.reason = reason


class CfgItem:
    """
    CfgItem is the value of HASH, representing the interested field of a service read from a cfg file
    """

    def __init__(self, loc=""):
        """
        Create an empty item.

        loc is the path of the cfg file the service was read from; it is only
        used in error reports.
        """
        self.uid = ""
        self.gid = []
        self.need_verified = False
        self.enabled_critical = False
        self.loc = loc
        self.critical = []
        self.related_item = ProcessItem()

    @staticmethod
    def _is_need_verified_uid(uid):
        """Return True when the uid is a privileged one ("root" or "system")."""
        return uid in ("root", "system")

    @staticmethod
    def _is_need_verified_gid(gid):
        """Return True when the gid has to be validated (currently never)."""
        # To enable gid-root validate, change it to "return gid == "root""
        return False

    @staticmethod
    def _is_need_verified_critical(critical):
        """
        Return True when the "critical" array enables critical
        (its first element equals 1). An empty array enables nothing.
        """
        # len() keeps rejecting non-sequence values with a TypeError, as the
        # plain subscript did, while an empty array no longer raises IndexError.
        return len(critical) > 0 and critical[0] == 1

    def set_uid(self, uid):
        """
        Set uid and check it at the same time.
        The uid needs to be validated only if _is_need_verified_uid return True
        """
        if CfgItem._is_need_verified_uid(uid):
            self.uid = uid
            self.need_verified = True

    def append_gid(self, gid):
        """
        Append gid and check it at the same time.
        The gid needs to be validated only if _is_need_verified_gid return True
        """
        if CfgItem._is_need_verified_gid(gid) and gid not in self.gid:
            self.gid.append(gid)
            self.need_verified = True

    def set_critical(self, critical: list):
        """
        Record the "critical" array and mark the service as critical-enabled.
        Nothing is recorded unless _is_need_verified_critical return True
        """
        if CfgItem._is_need_verified_critical(critical):
            self.critical = critical
            self.enabled_critical = True

    def handle_socket(self, socket: list):
        """
        Validate possible field "socket" in the field "services"

        socket is iterated as a sequence of dict entries, each of which may
        carry its own "uid" and "gid".
        Raises CfgValidateError when a privileged socket uid differs from the
        privileged uid already recorded for the service.
        """
        for entry in socket:
            if ("uid" in entry) and CfgItem._is_need_verified_uid(entry["uid"]):
                self.need_verified = True
                if self.uid != "" and self.uid != entry["uid"]:
                    print("Error: uid and uid in socket is not same!")
                    print("Cfg location: {}".format(self.loc))
                    raise CfgValidateError("Customization Error", "cfgs check not pass")
                self.uid = entry["uid"]
            if "gid" not in entry:
                continue
            gids = entry["gid"]
            if isinstance(gids, str):
                # A single gid may be written as a bare string. Wrap it so it
                # is never iterated character by character.
                gids = [gids]
            for gid in gids:
                self.append_gid(gid)

    def record_related_item(self, related_item):
        """
        When its permissions does not match those in process list,
        records the permissions given in process list
        """
        self.related_item = related_item


class ProcessItem:
    """
    Processitem is the data structure of an item read from the process list
    """

    def __init__(self, process_item=None):
        """
        Use the JSON item in the process list to initialize the class

        With process_item None an empty item is built. Otherwise "name" is
        mandatory (KeyError when absent) and "uid", "gid" and "critical" are
        optional.
        """
        if process_item is None:
            self.name = ""
            self.uid = ""
            self.gid = []
            self.critical = []
            return

        self.name = process_item["name"]
        self.uid = process_item.get("uid", "")

        gid = process_item.get("gid", [])
        # A single gid may be written as a bare string.
        self.gid = [gid] if isinstance(gid, str) else gid

        self.critical = process_item.get("critical", [])

    def verify(self, cfg_item):
        """
        Returns whether the corresponding CFG (cfg_item) has passed the verification

        An entry with uid or gid is checked on permissions; otherwise an entry
        with critical is checked on the critical array. An entry with none of
        them never passes.
        """
        if self.uid or self.gid:
            return self._verify_uid(cfg_item.uid) and self._verify_gid(cfg_item.gid)
        if self.critical:
            return self.critical == cfg_item.critical
        return False

    def _verify_uid(self, uid):
        """A privileged uid passes only when the process list grants the same uid."""
        return not (uid in ("root", "system") and uid != self.uid)

    def _verify_gid(self, gid):
        """A "root" gid passes only when the process list grants "root" too."""
        return not ("root" in gid and "root" not in self.gid)


def print_privilege_hash():
    """Print every service left in PRIVILEGE_HASH with the permissions the process list gave it."""
    for name, cfg_item in PRIVILEGE_HASH.items():
        print("Name: {}\nuid: {}\ngiven uid: {}\ngid: ".format(
            name, cfg_item.uid, cfg_item.related_item.uid), end="")

        for gid in cfg_item.gid:
            print(gid, end=" ")
        print("")

        print("given gid: ", end=" ")
        for gid in cfg_item.related_item.gid:
            print(gid, end=" ")
        print("")
        print("Cfg location: {}".format(cfg_item.loc))
        print("")


def print_critical_hash():
    """Print every service left in CRITICAL_HASH with the critical value the whitelist allows."""
    for name, cfg_item in CRITICAL_HASH.items():
        print("Cfg location: {}\n".format(cfg_item.loc), end="")
        print("Name: {}\ncritical: {}\n".format(name, cfg_item.critical), end="")
        print("Whitelist-allowed critical: {}\n".format(cfg_item.related_item.critical))
        print("")


def container_validate(process_path: str, list_name: str, item_container: dict):
    """
    Check the services in item_container against one process list file.

    process_path is a JSON file whose top-level field list_name holds the
    list entries. Services that pass are removed from item_container; services
    that fail stay in it and remember the entry they were compared with.
    Raises CfgValidateError when the file has no field named list_name.
    """
    with open(process_path, encoding="utf-8") as fp:
        data = json.load(fp)

    if list_name not in data:
        print("Error: {} is not a valid whitelist, it has not a wanted field name".format(process_path))
        raise CfgValidateError("Customization Error", "cfgs check not pass")

    for entry in data[list_name]:
        if entry.get("name") not in item_container:
            # no CfgItem in HASH meet the item in process list
            continue

        temp_item = ProcessItem(entry)
        cfg_item = item_container[temp_item.name]
        if temp_item.verify(cfg_item):
            # Process field check passed, remove the corresponding service from HASH
            item_container.pop(temp_item.name)
        else:
            cfg_item.record_related_item(temp_item)


def check_container(critical_process_path: str):
    """
    Report the services that are still in the hash tables after validation.

    critical_process_path is only used in the error message.
    Raises CfgValidateError when PRIVILEGE_HASH or CRITICAL_HASH is not empty.
    """
    if PRIVILEGE_HASH:
        # The remaining services in HASH do not pass the high-privilege validation
        print("Error: some services are not authenticated. Listed as follow:")
        print_privilege_hash()

        raise CfgValidateError("Customization Error", "cfgs check not pass")

    if CRITICAL_HASH:
        # The remaining services in HASH do not pass the critical validation
        print("Error: some services do not match with critical whitelist({}).".format(critical_process_path), end="")
        print(" Directly enable critical or modify enabled critical services are prohibited!", end="")
        print(" Misconfigured services listed as follow:")
        print_critical_hash()

        raise CfgValidateError("Customization Error", "cfgs check not pass")


def validate_cfg_file(privilege_process_path: str, critical_process_path: str, result_path: str = None):
    """
    Load the process list file
    For each item in the list, find out whether there is a CfgItem needs validation in HASH

    A check whose list file does not exist is skipped and its hash table is
    emptied. result_path is accepted but not used.
    Raises CfgValidateError when a service fails one of the checks.
    """
    if not os.path.exists(privilege_process_path):
        print("High-privilege process check skipped: file [{}] not exist".format(privilege_process_path))
        PRIVILEGE_HASH.clear()
    else:
        container_validate(privilege_process_path, "high_privilege_process_list", PRIVILEGE_HASH)

    if not os.path.exists(critical_process_path):
        print("Critical-reboot process check skipped: file [{}] not exist".format(critical_process_path))
        CRITICAL_HASH.clear()
    else:
        container_validate(critical_process_path, "critical_reboot_process_list", CRITICAL_HASH)

    check_container(critical_process_path)


def handle_services(filename: str, field: dict):
    """
    Read one entry of the "services" array of a cfg file and register it in
    PRIVILEGE_HASH and/or CRITICAL_HASH when it needs validation.

    filename is the cfg file the entry comes from, field is the entry itself;
    its "name" is mandatory (KeyError when absent).
    """
    cfg_item = CfgItem(filename)
    key = field['name']
    if "uid" in field:
        cfg_item.set_uid(field["uid"])
    if "gid" in field:
        gids = field["gid"]
        if isinstance(gids, str):
            gids = [gids]
        for gid in gids:
            cfg_item.append_gid(gid)
    if "socket" in field:
        cfg_item.handle_socket(field["socket"])
    if "critical" in field:
        cfg_item.set_critical(field["critical"])
    if cfg_item.need_verified:
        # Services that need to check permissions are added to HASH
        PRIVILEGE_HASH[key] = cfg_item
    if cfg_item.enabled_critical:
        # Services that need to check critical are added to HASH
        CRITICAL_HASH[key] = cfg_item


def parse_cfg_file(filename: str):
    """
    Load the cfg file in JSON format

    Raises CfgValidateError when the file is not valid JSON: a cfg file that
    cannot be read must fail the check rather than be skipped.
    """
    with open(filename, encoding="utf-8") as fp:
        try:
            data = json.load(fp)
        except json.decoder.JSONDecodeError as err:
            print("\nError: loading cfg file {} failed. Cfg file not in JSON format!\n".format(filename))
            raise CfgValidateError("Customization Error", "cfgs check not pass") from err

    if "services" not in data:
        return
    for field in data['services']:
        handle_services(filename, field)


def iterate_cfg_folder(cfg_dir: str):
    """Parse every "*.cfg" file found directly in cfg_dir."""
    # Sorted so that the result does not depend on the directory order when
    # two files declare a service with the same name.
    for file in sorted(os.listdir(cfg_dir)):
        if file.endswith(".cfg"):
            parse_cfg_file(os.path.join(cfg_dir, file))


def main():
    """
    Entry of the script.

    Options (all but --result-path are required, in any order):
        --sys-cfg-folder, --vendor-cfg-folder,
        --high-privilege-process-list-path,
        --critical-reboot-process-list-path, --result-path
    The whole check is skipped when one of the two cfg folders does not exist.
    Raises getopt.GetoptError on a bad command line and CfgValidateError when
    the check fails.
    """
    opts, _ = getopt.getopt(sys.argv[1:], '', [
        'sys-cfg-folder=', 'vendor-cfg-folder=',
        'high-privilege-process-list-path=', 'critical-reboot-process-list-path=',
        'result-path='])

    # Look the options up by name: their order on the command line is free.
    options = dict(opts)
    missing = [name for name in _REQUIRED_OPTIONS if name not in options]
    if missing:
        raise getopt.GetoptError("missing required option(s): {}".format(", ".join(missing)))

    sys_cfg_folder = options['--sys-cfg-folder']
    if not os.path.exists(sys_cfg_folder):
        print("Process field check skipped: file [{}] not exist".format(sys_cfg_folder))
        return

    vendor_cfg_folder = options['--vendor-cfg-folder']
    if not os.path.exists(vendor_cfg_folder):
        print("Process field check skipped: file [{}] not exist".format(vendor_cfg_folder))
        return

    privilege_process_path = options['--high-privilege-process-list-path']
    critical_process_path = options['--critical-reboot-process-list-path']

    iterate_cfg_folder(sys_cfg_folder)
    iterate_cfg_folder(vendor_cfg_folder)
    validate_cfg_file(privilege_process_path, critical_process_path, None)


if __name__ == "__main__":
    main()