#!/usr/bin/env python
# -*- coding: utf-8 -*-

# Copyright (c) 2022 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
1640681896Sopenharmony_ci
import hashlib
import io
import os
import struct

from asn1crypto import cms
from asn1crypto import pem
from asn1crypto import util
from asn1crypto import x509
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import padding

from log_exception import UPDATE_LOGGER
3140681896Sopenharmony_ci
# Directory containing this script; the signing certificate is resolved
# relative to it rather than to the current working directory.
operation_path = os.path.dirname(os.path.realpath(__file__))
CERT_PATH = os.path.join(operation_path, 'sign_cert/signing_cert.crt')

# Maximum number of bytes read per chunk when streaming a package.
BLOCK_SIZE = 8192
# Number of trailing package bytes left out of the package hash
# (presumably the fixed-size zip end-of-central-directory record -- confirm).
ZIP_EOCD_LENGTH = 22
# Size in bytes of the footer written after the signature; it is added to
# the signature length to form the total size stored in the package.
FOOTER_LENGTH = 6

# Digest identifier and digest length packed into the content-info header
# (672 presumably mirrors OpenSSL's NID_sha256 -- confirm with the verifier).
DIGEST_SHA256 = 672
SHA256_HASH_LEN = 32

# struct layouts, all little-endian.
# content info: two uint16 fields followed by a 32-byte digest
CONTENT_INFO_FORMAT = "<2H32s"
# the length of the zip eocd comment (one uint16)
ZIP_EOCD_COMMENT_LEN_FORMAT = "<H"
# signed package footer (three uint16 fields)
SIGNATURE_FOOTER_FORMAT = "<3H"
4540681896Sopenharmony_ci
4640681896Sopenharmony_ci
def load_public_cert():
    """
    Load the signing certificate stored at CERT_PATH.

    The file is read as bytes. If asn1crypto detects PEM armor, the armor
    is removed first; otherwise the bytes are handed to the parser as-is.

    :return: the parsed asn1crypto ``x509.Certificate`` object
    """
    with open(CERT_PATH, 'rb') as cert_stream:
        cert_bytes = cert_stream.read()

    if pem.detect(cert_bytes):
        # unarmor() yields (type name, headers, DER bytes); only the DER
        # payload is needed here.
        _, _, cert_bytes = pem.unarmor(cert_bytes)

    return x509.Certificate.load(cert_bytes)
5440681896Sopenharmony_ci
5540681896Sopenharmony_ci
def calculate_package_hash(package_path):
    """
    Compute the SHA-256 digest of a package, leaving out its tail.

    The last ZIP_EOCD_LENGTH bytes of the file are not hashed; everything
    before them is streamed in chunks of at most BLOCK_SIZE bytes. A file
    no longer than ZIP_EOCD_LENGTH bytes yields the digest of empty input.

    :param package_path: path of the package file to hash
    :return: raw digest bytes produced by hashlib.sha256()
    """
    hasher = hashlib.sha256()
    bytes_left = os.path.getsize(package_path) - ZIP_EOCD_LENGTH

    with open(package_path, 'rb') as package_file:
        while bytes_left > 0:
            chunk_len = min(BLOCK_SIZE, bytes_left)
            hasher.update(package_file.read(chunk_len))
            bytes_left -= chunk_len

    return hasher.digest()
7240681896Sopenharmony_ci
7340681896Sopenharmony_ci
def sign_digest_with_pss(digest, private_key_file):
    """
    Sign ``digest`` with the private key in ``private_key_file`` using PSS.

    The key is loaded from an unencrypted PEM file (``password=None``) and
    is expected to be an RSA key, since PSS padding with MGF1/SHA-256 and
    the maximum salt length is applied.

    :param digest: bytes to sign
    :param private_key_file: path of the PEM-encoded private key
    :return: the signature bytes, or False if the key file cannot be read
        or an OSError/ValueError is raised while loading the key or signing
    """
    try:
        # read private key from pem file
        with open(private_key_file, 'rb') as key_stream:
            key_data = key_stream.read()

        private_key = serialization.load_pem_private_key(
            key_data,
            password=None,
            backend=default_backend())
        pad = padding.PSS(
            mgf=padding.MGF1(hashes.SHA256()),
            salt_length=padding.PSS.MAX_LENGTH)

        # NOTE(review): ``digest`` is passed as the message together with
        # hashes.SHA256() (no Prehashed wrapper), so it is hashed once more
        # before signing -- presumably what the verifier expects; confirm
        # before changing.
        signature = private_key.sign(
            digest,
            pad,
            hashes.SHA256()
        )
    except (OSError, ValueError) as err:
        # Report the cause instead of failing silently; callers still get
        # False as before.
        UPDATE_LOGGER.print_log(
            "sign digest with pss failed! key file: %s, error: %s"
            % (private_key_file, err),
            log_type=UPDATE_LOGGER.ERROR_LOG)
        return False
    return signature
9640681896Sopenharmony_ci
9740681896Sopenharmony_ci
def sign_digest(digest, private_key_file):
    """
    Sign ``digest`` with the private key in ``private_key_file``.

    The key is loaded from an unencrypted PEM file (``password=None``) and
    is expected to be an RSA key, since PKCS#1 v1.5 padding with SHA-256
    is applied.

    :param digest: bytes to sign
    :param private_key_file: path of the PEM-encoded private key
    :return: the signature bytes, or False if the key file cannot be read
        or an OSError/ValueError is raised while loading the key or signing
    """
    try:
        # read private key from pem file
        with open(private_key_file, 'rb') as key_stream:
            key_data = key_stream.read()

        private_key = serialization.load_pem_private_key(
            key_data,
            password=None,
            backend=default_backend())

        # NOTE(review): ``digest`` is passed as the message together with
        # hashes.SHA256() (no Prehashed wrapper), so it is hashed once more
        # before signing -- presumably what the verifier expects; confirm
        # before changing.
        signature = private_key.sign(
            digest,
            padding.PKCS1v15(),
            hashes.SHA256()
        )
    except (OSError, ValueError) as err:
        # Report the cause instead of failing silently; callers still get
        # False as before.
        UPDATE_LOGGER.print_log(
            "sign digest failed! key file: %s, error: %s"
            % (private_key_file, err),
            log_type=UPDATE_LOGGER.ERROR_LOG)
        return False
    return signature
11740681896Sopenharmony_ci
11840681896Sopenharmony_ci
def create_encap_content_info(digest):
    """
    Pack a package digest into the binary content-info header.

    The header follows CONTENT_INFO_FORMAT: DIGEST_SHA256, SHA256_HASH_LEN
    and then the digest in a fixed 32-byte field (struct's ``32s`` pads a
    shorter value with NUL bytes and truncates a longer one).

    :param digest: digest bytes of the package
    :return: the packed header bytes, or False when ``digest`` is empty
        or None
    """
    if not digest:
        # No file name is available in this function, so the message
        # carries no format placeholder.
        UPDATE_LOGGER.print_log("calc package hash failed! digest is empty",
            log_type=UPDATE_LOGGER.ERROR_LOG)
        return False
    return struct.pack(CONTENT_INFO_FORMAT, DIGEST_SHA256,
        SHA256_HASH_LEN, digest)
12740681896Sopenharmony_ci
12840681896Sopenharmony_ci
def write_signed_package(unsigned_package, signature, signed_package):
    """
    Write the signed package: package body, signature and footer.

    Output layout, in order:
      1. ``unsigned_package`` without its trailing comment-length field
         (the last 2 bytes of the file),
      2. a new comment-length field holding the total signature size
         (``len(signature) + FOOTER_LENGTH``),
      3. the signature bytes,
      4. the footer packed with SIGNATURE_FOOTER_FORMAT: total signature
         size, 0xffff, total signature size.

    :param unsigned_package: path of the package to copy from
    :param signature: signature bytes to append
    :param signed_package: path of the output file (created or truncated,
        mode 0o755)
    :raises struct.error: if the total signature size does not fit in an
        unsigned 16-bit field
    """
    signature_total_size = len(signature) + FOOTER_LENGTH

    # Pack the fixed-size fields up front so that an oversized signature
    # fails before the output file is created or truncated.
    zip_comment_len = struct.pack(ZIP_EOCD_COMMENT_LEN_FORMAT,
            signature_total_size)
    footer = struct.pack(SIGNATURE_FOOTER_FORMAT, signature_total_size,
            0xffff, signature_total_size)

    # The old comment-length field at the end of the package is dropped and
    # replaced by the new one written below.
    comment_len_size = struct.calcsize(ZIP_EOCD_COMMENT_LEN_FORMAT)

    package_fd = os.open(signed_package, os.O_RDWR | os.O_CREAT | os.O_TRUNC, 0o755)
    # The context manager closes the output even if copying fails.
    with os.fdopen(package_fd, 'wb') as f_signed:
        remain_len = os.path.getsize(unsigned_package) - comment_len_size
        with open(unsigned_package, 'rb') as f_unsign:
            while remain_len > 0:
                chunk_len = min(BLOCK_SIZE, remain_len)
                f_signed.write(f_unsign.read(chunk_len))
                remain_len -= chunk_len

        f_signed.write(zip_comment_len)
        f_signed.write(signature)
        f_signed.write(footer)
15640681896Sopenharmony_ci
15740681896Sopenharmony_ci
def _write_binary_file(file_name, payload):
    """Write ``payload`` to ``file_name`` (mode 0o755), replacing old content."""
    file_fd = os.open(file_name, os.O_RDWR | os.O_CREAT | os.O_TRUNC, 0o755)
    with os.fdopen(file_fd, 'wb') as out_file:
        out_file.write(payload)


def _build_cms_content_info(data, signature, cert):
    """Wrap content, certificate and signature in a CMS SignedData ContentInfo."""
    # Creating a SignedData object from cms
    signed_data = cms.SignedData()
    signed_data['version'] = 'v1'
    signed_data['encap_content_info'] = util.OrderedDict([
        ('content_type', 'data'),
        ('content', data)])

    signed_data['digest_algorithms'] = [util.OrderedDict([
        ('algorithm', 'sha256'),
        ('parameters', None)])]

    # Adding this certificate to SignedData object
    signed_data['certificates'] = [cert]

    # Setting signer info section
    signer_info = cms.SignerInfo()
    signer_info['version'] = 'v1'
    signer_info['digest_algorithm'] = util.OrderedDict([
        ('algorithm', 'sha256'),
        ('parameters', None)])
    signer_info['signature_algorithm'] = util.OrderedDict([
        ('algorithm', 'sha256_rsa'),
        ('parameters', None)])

    # The signer is identified by the certificate's issuer and serial number.
    issuer_and_serial = cms.IssuerAndSerialNumber()
    issuer_and_serial['issuer'] = cert.issuer
    issuer_and_serial['serial_number'] = cert.serial_number
    signer_info['sid'] = cms.SignerIdentifier({
        'issuer_and_serial_number': issuer_and_serial})

    signer_info['signature'] = signature
    # Adding SignerInfo object to SignedData object
    signed_data['signer_infos'] = [signer_info]

    # Writing everything into ASN.1 object
    asn1obj = cms.ContentInfo()
    asn1obj['content_type'] = 'signed_data'
    asn1obj['content'] = signed_data
    return asn1obj


def sign_ota_package(package_path, signed_package, private_key):
    """
    Sign an OTA package and write the signed copy.

    Steps: hash the package, pack the digest into the content-info header,
    sign the digest with ``private_key``, wrap header, certificate and
    signature in a CMS SignedData structure, and append its DER encoding
    to a copy of the package.

    Side effects: besides ``signed_package``, the raw digest and signature
    are written to files named "digest" and "signature" (relative paths,
    so they land in the current working directory).

    :param package_path: path of the unsigned package
    :param signed_package: path of the signed package to create
    :param private_key: path of the PEM private key file used for signing
    :return: True on success; False if the content-info header cannot be
        built or signing fails
    """
    digest = calculate_package_hash(package_path)
    data = create_encap_content_info(digest)
    if not data:
        # create_encap_content_info has already logged the reason.
        return False

    signature = sign_digest(digest, private_key)
    if not signature:
        # sign_digest returns False on failure; stop here instead of trying
        # to write a non-bytes value to disk.
        UPDATE_LOGGER.print_log(
            "sign digest failed! key file: %s" % private_key,
            log_type=UPDATE_LOGGER.ERROR_LOG)
        return False

    _write_binary_file("digest", digest)
    _write_binary_file("signature", signature)

    cert = load_public_cert()
    asn1obj = _build_cms_content_info(data, signature, cert)

    # This asn1obj can be dumped to a disk using dump() method (DER format)
    write_signed_package(package_path, asn1obj.dump(), signed_package)
    return True