"""Stuff to parse WAVE files.

Usage.

Reading WAVE files:
      f = wave.open(file, 'r')
where file is either the name of a file or an open file pointer.
The open file pointer must have methods read(), seek(), and close().
When the setpos() and rewind() methods are not used, the seek()
method is not necessary.

This returns an instance of a class with the following public methods:
      getnchannels()  -- returns number of audio channels (1 for
                         mono, 2 for stereo)
      getsampwidth()  -- returns sample width in bytes
      getframerate()  -- returns sampling frequency
      getnframes()    -- returns number of audio frames
      getcomptype()   -- returns compression type ('NONE' for linear samples)
      getcompname()   -- returns human-readable version of
                         compression type ('not compressed' for linear
                         samples)
      getparams()     -- returns a namedtuple consisting of all of the
                         above in the above order
      getmarkers()    -- returns None (for compatibility with the
                         aifc module)
      getmark(id)     -- raises an error since the mark does not
                         exist (for compatibility with the aifc module)
      readframes(n)   -- returns at most n frames of audio
      rewind()        -- rewind to the beginning of the audio stream
      setpos(pos)     -- seek to the specified position
      tell()          -- return the current position
      close()         -- close the instance (make it unusable)
The position returned by tell() and the position given to setpos()
are compatible and have nothing to do with the actual position in the
file.
The close() method is called automatically when the class instance
is destroyed.

Writing WAVE files:
      f = wave.open(file, 'w')
where file is either the name of a file or an open file pointer.
The open file pointer must have methods write(), tell(), seek(), and
close().

This returns an instance of a class with the following public methods:
      setnchannels(n) -- set the number of channels
      setsampwidth(n) -- set the sample width
      setframerate(n) -- set the frame rate
      setnframes(n)   -- set the number of frames
      setcomptype(type, name)
                      -- set the compression type and the
                         human-readable compression type
      setparams(tuple)
                      -- set all parameters at once
      tell()          -- return current position in output file
      writeframesraw(data)
                      -- write audio frames without patching up the
                         file header
      writeframes(data)
                      -- write audio frames and patch up the file header
      close()         -- patch up the file header and close the
                         output file
You should set the parameters before the first writeframesraw or
writeframes.  The total number of frames does not need to be set,
but when it is set to the correct value, the header does not have to
be patched up.
It is best to first set all parameters, possibly including the
compression type, and then write audio frames using writeframesraw.
When all frames have been written, either call writeframes(b'') or
close() to patch up the sizes in the header.
The close() method is called automatically when the class instance
is destroyed.
"""

from collections import namedtuple
import builtins
import os
import struct
import sys


__all__ = ["open", "Error", "Wave_read", "Wave_write"]


class Error(Exception):
    """Raised for malformed WAVE data and for invalid use of this module."""


# The only wFormatTag value this module reads or writes.
WAVE_FORMAT_PCM = 0x0001

_array_fmts = None, 'b', 'h', None, 'i'

_wave_params = namedtuple('_wave_params',
                          'nchannels sampwidth framerate nframes '
                          'comptype compname')

# Anything of these types is treated as a file name and opened by this
# module; every other object is assumed to be an already-open file object.
_PATH_TYPES = (str, bytes, os.PathLike)


def _byteswap(data, width):
    """Return a copy of *data* with each *width*-byte sample byte-reversed.

    *data* is a bytes-like object indexable by integer; the result is
    always ``bytes``.  An IndexError is raised when len(data) is not a
    multiple of *width*, because the trailing partial sample cannot be
    swapped.
    """
    swapped_data = bytearray(len(data))

    for i in range(0, len(data), width):
        for j in range(width):
            swapped_data[i + width - 1 - j] = data[i + j]

    return bytes(swapped_data)


class _Chunk:
    """Read-only view of one chunk (4-byte id + 4-byte size + payload).

    The chunk header is consumed from *file* on construction; read(),
    seek(), tell() and skip() then operate relative to the start of the
    chunk payload.

    Arguments:
      file       -- object with a read() method positioned at the chunk
                    header; tell()/seek() are used when available
      align      -- when true, odd-sized chunks are followed by one pad
                    byte which is consumed transparently
      bigendian  -- byte order of the size field
      inclheader -- when true, the stored size includes the 8 header
                    bytes and they are subtracted

    Raises EOFError when the header cannot be read completely.
    """

    def __init__(self, file, align=True, bigendian=True, inclheader=False):
        self.closed = False
        self.align = align      # whether to align to word (2-byte) boundaries
        strflag = '>' if bigendian else '<'
        self.file = file
        self.chunkname = file.read(4)
        if len(self.chunkname) < 4:
            raise EOFError
        try:
            self.chunksize = struct.unpack_from(strflag + 'L', file.read(4))[0]
        except struct.error:
            # Fewer than four size bytes were available.
            raise EOFError from None
        if inclheader:
            self.chunksize = self.chunksize - 8  # subtract header
        self.size_read = 0
        try:
            # Remember where the payload starts so seek() can be absolute.
            self.offset = self.file.tell()
        except (AttributeError, OSError):
            self.seekable = False
        else:
            self.seekable = True

    def getname(self):
        """Return the name (ID) of the current chunk."""
        return self.chunkname

    def close(self):
        """Skip to the end of the chunk and mark it closed (idempotent)."""
        if not self.closed:
            try:
                self.skip()
            finally:
                self.closed = True

    def seek(self, pos, whence=0):
        """Seek to specified position into the chunk.
        Default position is 0 (start of chunk).
        If the file is not seekable, this will result in an error.
        """

        if self.closed:
            raise ValueError("I/O operation on closed file")
        if not self.seekable:
            raise OSError("cannot seek")
        if whence == 1:
            pos = pos + self.size_read
        elif whence == 2:
            pos = pos + self.chunksize
        if pos < 0 or pos > self.chunksize:
            raise RuntimeError
        self.file.seek(self.offset + pos, 0)
        self.size_read = pos

    def tell(self):
        """Return the current offset within the chunk payload."""
        if self.closed:
            raise ValueError("I/O operation on closed file")
        return self.size_read

    def read(self, size=-1):
        """Read at most size bytes from the chunk.
        If size is omitted or negative, read until the end
        of the chunk.
        """

        if self.closed:
            raise ValueError("I/O operation on closed file")
        if self.size_read >= self.chunksize:
            return b''
        remaining = self.chunksize - self.size_read
        if size < 0 or size > remaining:
            size = remaining
        data = self.file.read(size)
        self.size_read = self.size_read + len(data)
        if (self.size_read == self.chunksize
                and self.align
                and (self.chunksize & 1)):
            # Consume the pad byte that follows an odd-sized chunk.
            dummy = self.file.read(1)
            self.size_read = self.size_read + len(dummy)
        return data

    def skip(self):
        """Skip the rest of the chunk.
        If you are not interested in the contents of the chunk,
        this method should be called so that the file points to
        the start of the next chunk.
        """

        if self.closed:
            raise ValueError("I/O operation on closed file")
        if self.seekable:
            try:
                n = self.chunksize - self.size_read
                # maybe fix alignment
                if self.align and (self.chunksize & 1):
                    n = n + 1
                self.file.seek(n, 1)
                self.size_read = self.size_read + n
                return
            except OSError:
                # Fall back to reading the remaining bytes.
                pass
        while self.size_read < self.chunksize:
            n = min(8192, self.chunksize - self.size_read)
            dummy = self.read(n)
            if not dummy:
                raise EOFError


class Wave_read:
    """Variables used in this class:

    These variables are available to the user through appropriate
    methods of this class:
    _file -- the open file with methods read(), close(), and seek()
              set through the __init__() method
    _nchannels -- the number of audio channels
              available through the getnchannels() method
    _nframes -- the number of audio frames
              available through the getnframes() method
    _sampwidth -- the number of bytes per audio sample
              available through the getsampwidth() method
    _framerate -- the sampling frequency
              available through the getframerate() method
    _comptype -- the compression type (always 'NONE')
              available through the getcomptype() method
    _compname -- the human-readable compression type
              available through the getcompname() method
    _soundpos -- the position in the audio stream
              available through the tell() method, set through the
              setpos() method

    These variables are used internally only:
    _fmt_chunk_read -- 1 iff the FMT chunk has been read
    _data_seek_needed -- 1 iff the data chunk must be repositioned
              before the next readframes()
    _data_chunk -- instantiation of a chunk class for the DATA chunk
    _framesize -- size of one frame in the file
    """

    def initfp(self, file):
        """Parse the RIFF/WAVE headers of *file* up to the data chunk.

        Raises Error when the file is not RIFF/WAVE, when the data chunk
        precedes the fmt chunk, or when either chunk is missing.
        """
        self._convert = None
        self._soundpos = 0
        self._file = _Chunk(file, bigendian=False)
        if self._file.getname() != b'RIFF':
            raise Error('file does not start with RIFF id')
        if self._file.read(4) != b'WAVE':
            raise Error('not a WAVE file')
        self._fmt_chunk_read = 0
        self._data_chunk = None
        while True:
            self._data_seek_needed = 1
            try:
                chunk = _Chunk(self._file, bigendian=False)
            except EOFError:
                break
            chunkname = chunk.getname()
            if chunkname == b'fmt ':
                self._read_fmt_chunk(chunk)
                self._fmt_chunk_read = 1
            elif chunkname == b'data':
                if not self._fmt_chunk_read:
                    raise Error('data chunk before fmt chunk')
                self._data_chunk = chunk
                self._nframes = chunk.chunksize // self._framesize
                # We are positioned at the first frame already.
                self._data_seek_needed = 0
                break
            chunk.skip()
        if not self._fmt_chunk_read or not self._data_chunk:
            raise Error('fmt chunk and/or data chunk missing')

    def __init__(self, f):
        """Open *f* for reading.

        *f* is a file name (str, bytes or os.PathLike), which is opened
        and later closed by this object, or an already-open binary file
        object, which is left open by close().
        """
        self._i_opened_the_file = None
        if isinstance(f, _PATH_TYPES):
            f = builtins.open(f, 'rb')
            self._i_opened_the_file = f
        # else, assume it is an open file object already
        try:
            self.initfp(f)
        except BaseException:
            # Do not leak the file we opened ourselves; always re-raise.
            if self._i_opened_the_file:
                f.close()
            raise

    def __del__(self):
        self.close()

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.close()

    #
    # User visible methods.
    #
    def getfp(self):
        """Return the underlying chunk reader (None after close())."""
        return self._file

    def rewind(self):
        """Reposition to the first audio frame."""
        self._data_seek_needed = 1
        self._soundpos = 0

    def close(self):
        """Release the stream; close the file only if this object opened it."""
        self._file = None
        file = self._i_opened_the_file
        if file:
            self._i_opened_the_file = None
            file.close()

    def tell(self):
        """Return the current position, in frames."""
        return self._soundpos

    def getnchannels(self):
        return self._nchannels

    def getnframes(self):
        return self._nframes

    def getsampwidth(self):
        return self._sampwidth

    def getframerate(self):
        return self._framerate

    def getcomptype(self):
        return self._comptype

    def getcompname(self):
        return self._compname

    def getparams(self):
        """Return all parameters as a _wave_params namedtuple."""
        return _wave_params(self.getnchannels(), self.getsampwidth(),
                            self.getframerate(), self.getnframes(),
                            self.getcomptype(), self.getcompname())

    def getmarkers(self):
        return None

    def getmark(self, id):
        raise Error('no marks')

    def setpos(self, pos):
        """Set the position, in frames, for the next readframes()."""
        if pos < 0 or pos > self._nframes:
            raise Error('position not in range')
        self._soundpos = pos
        self._data_seek_needed = 1

    def readframes(self, nframes):
        """Read and return at most *nframes* frames of audio as bytes."""
        if self._data_seek_needed:
            self._data_chunk.seek(0, 0)
            pos = self._soundpos * self._framesize
            if pos:
                self._data_chunk.seek(pos, 0)
            self._data_seek_needed = 0
        if nframes == 0:
            return b''
        data = self._data_chunk.read(nframes * self._framesize)
        # Samples are byte-swapped on big-endian hosts.
        if self._sampwidth != 1 and sys.byteorder == 'big':
            data = _byteswap(data, self._sampwidth)
        if self._convert and data:
            data = self._convert(data)
        self._soundpos = (self._soundpos
                          + len(data) // (self._nchannels * self._sampwidth))
        return data

    #
    # Internal methods.
    #

    def _read_fmt_chunk(self, chunk):
        """Decode the fmt chunk and set the stream parameters.

        Raises EOFError when the chunk is truncated and Error for a
        non-PCM format tag, a zero sample width or zero channels.
        """
        try:
            (wFormatTag, self._nchannels, self._framerate,
             dwAvgBytesPerSec, wBlockAlign) = struct.unpack_from(
                '<HHLLH', chunk.read(14))
        except struct.error:
            raise EOFError from None
        if wFormatTag == WAVE_FORMAT_PCM:
            try:
                sampwidth = struct.unpack_from('<H', chunk.read(2))[0]
            except struct.error:
                raise EOFError from None
            # Bits per sample, rounded up to whole bytes.
            self._sampwidth = (sampwidth + 7) // 8
            if not self._sampwidth:
                raise Error('bad sample width')
        else:
            raise Error('unknown format: %r' % (wFormatTag,))
        if not self._nchannels:
            raise Error('bad # of channels')
        self._framesize = self._nchannels * self._sampwidth
        self._comptype = 'NONE'
        self._compname = 'not compressed'


class Wave_write:
    """Variables used in this class:

    These variables are user settable through appropriate methods
    of this class:
    _file -- the open file with methods write(), close(), tell(), seek()
              set through the __init__() method
    _comptype -- the compression type (only 'NONE' is accepted)
              set through the setcomptype() or setparams() method
    _compname -- the human-readable compression type
              set through the setcomptype() or setparams() method
    _nchannels -- the number of audio channels
              set through the setnchannels() or setparams() method
    _sampwidth -- the number of bytes per audio sample
              set through the setsampwidth() or setparams() method
    _framerate -- the sampling frequency
              set through the setframerate() or setparams() method
    _nframes -- the number of audio frames written to the header
              set through the setnframes() or setparams() method

    These variables are used internally only:
    _datalength -- the size of the audio samples written to the header
    _nframeswritten -- the number of frames actually written
    _datawritten -- the size of the audio samples actually written
    """

    # Class-level defaults so that close() (and therefore __del__) is safe
    # on an instance whose __init__ failed before initfp() ran.
    _file = None
    _i_opened_the_file = None

    def __init__(self, f):
        """Open *f* for writing.

        *f* is a file name (str, bytes or os.PathLike), which is opened
        and later closed by this object, or an already-open binary file
        object, which is flushed but left open by close().
        """
        self._i_opened_the_file = None
        if isinstance(f, _PATH_TYPES):
            f = builtins.open(f, 'wb')
            self._i_opened_the_file = f
        try:
            self.initfp(f)
        except BaseException:
            # Do not leak the file we opened ourselves; always re-raise.
            if self._i_opened_the_file:
                f.close()
            raise

    def initfp(self, file):
        """Attach *file* and reset all parameters and counters."""
        self._file = file
        self._convert = None
        self._nchannels = 0
        self._sampwidth = 0
        self._framerate = 0
        self._nframes = 0
        self._nframeswritten = 0
        self._datawritten = 0
        self._datalength = 0
        self._headerwritten = False
        # 'NONE' is the only type setcomptype() accepts, so use it as the
        # default; the getters then work without an explicit setcomptype().
        self._comptype = 'NONE'
        self._compname = 'not compressed'

    def __del__(self):
        self.close()

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.close()

    #
    # User visible methods.
    #
    def setnchannels(self, nchannels):
        if self._datawritten:
            raise Error('cannot change parameters after starting to write')
        if nchannels < 1:
            raise Error('bad # of channels')
        self._nchannels = nchannels

    def getnchannels(self):
        if not self._nchannels:
            raise Error('number of channels not set')
        return self._nchannels

    def setsampwidth(self, sampwidth):
        if self._datawritten:
            raise Error('cannot change parameters after starting to write')
        if sampwidth < 1 or sampwidth > 4:
            raise Error('bad sample width')
        self._sampwidth = sampwidth

    def getsampwidth(self):
        if not self._sampwidth:
            raise Error('sample width not set')
        return self._sampwidth

    def setframerate(self, framerate):
        if self._datawritten:
            raise Error('cannot change parameters after starting to write')
        if framerate <= 0:
            raise Error('bad frame rate')
        # Non-integer rates are rounded to the nearest integer.
        self._framerate = int(round(framerate))

    def getframerate(self):
        if not self._framerate:
            raise Error('frame rate not set')
        return self._framerate

    def setnframes(self, nframes):
        if self._datawritten:
            raise Error('cannot change parameters after starting to write')
        self._nframes = nframes

    def getnframes(self):
        """Return the number of frames written so far."""
        return self._nframeswritten

    def setcomptype(self, comptype, compname):
        if self._datawritten:
            raise Error('cannot change parameters after starting to write')
        if comptype not in ('NONE',):
            raise Error('unsupported compression type')
        self._comptype = comptype
        self._compname = compname

    def getcomptype(self):
        return self._comptype

    def getcompname(self):
        return self._compname

    def setparams(self, params):
        """Set all parameters at once from a 6-item sequence.

        The order is (nchannels, sampwidth, framerate, nframes,
        comptype, compname).
        """
        nchannels, sampwidth, framerate, nframes, comptype, compname = params
        if self._datawritten:
            raise Error('cannot change parameters after starting to write')
        self.setnchannels(nchannels)
        self.setsampwidth(sampwidth)
        self.setframerate(framerate)
        self.setnframes(nframes)
        self.setcomptype(comptype, compname)

    def getparams(self):
        """Return all parameters as a _wave_params namedtuple.

        Raises Error until channels, sample width and frame rate are set.
        """
        if not self._nchannels or not self._sampwidth or not self._framerate:
            raise Error('not all parameters set')
        return _wave_params(self._nchannels, self._sampwidth, self._framerate,
                            self._nframes, self._comptype, self._compname)

    def setmark(self, id, pos, name):
        raise Error('setmark() not supported')

    def getmark(self, id):
        raise Error('no marks')

    def getmarkers(self):
        return None

    def tell(self):
        """Return the number of frames written so far."""
        return self._nframeswritten

    def writeframesraw(self, data):
        """Write audio frames without patching up the file header."""
        if not isinstance(data, (bytes, bytearray)):
            # Flatten any other buffer to a byte view.
            data = memoryview(data).cast('B')
        self._ensure_header_written(len(data))
        nframes = len(data) // (self._sampwidth * self._nchannels)
        if self._convert:
            data = self._convert(data)
        # Samples are byte-swapped on big-endian hosts.
        if self._sampwidth != 1 and sys.byteorder == 'big':
            data = _byteswap(data, self._sampwidth)
        self._file.write(data)
        self._datawritten += len(data)
        self._nframeswritten = self._nframeswritten + nframes

    def writeframes(self, data):
        """Write audio frames and patch up the file header if needed."""
        self.writeframesraw(data)
        if self._datalength != self._datawritten:
            self._patchheader()

    def close(self):
        """Patch up the header, flush, and release the file.

        The file itself is closed only if this object opened it.
        Calling close() more than once is harmless.
        """
        try:
            if self._file:
                self._ensure_header_written(0)
                if self._datalength != self._datawritten:
                    self._patchheader()
                self._file.flush()
        finally:
            self._file = None
            file = self._i_opened_the_file
            if file:
                self._i_opened_the_file = None
                file.close()

    #
    # Internal methods.
    #

    def _ensure_header_written(self, datasize):
        """Write the header once, before the first data; validate params."""
        if not self._headerwritten:
            if not self._nchannels:
                raise Error('# channels not specified')
            if not self._sampwidth:
                raise Error('sample width not specified')
            if not self._framerate:
                raise Error('sampling rate not specified')
            self._write_header(datasize)

    def _write_header(self, initlength):
        """Write the RIFF, fmt and data chunk headers.

        When no frame count was set, it is derived from *initlength*
        (the byte size of the first write).
        """
        assert not self._headerwritten
        self._file.write(b'RIFF')
        if not self._nframes:
            self._nframes = initlength // (self._nchannels * self._sampwidth)
        self._datalength = self._nframes * self._nchannels * self._sampwidth
        try:
            # Remember where the RIFF size lives so it can be patched later.
            self._form_length_pos = self._file.tell()
        except (AttributeError, OSError):
            self._form_length_pos = None
        self._file.write(struct.pack('<L4s4sLHHLLHH4s',
            36 + self._datalength, b'WAVE', b'fmt ', 16,
            WAVE_FORMAT_PCM, self._nchannels, self._framerate,
            self._nchannels * self._framerate * self._sampwidth,
            self._nchannels * self._sampwidth,
            self._sampwidth * 8, b'data'))
        if self._form_length_pos is not None:
            self._data_length_pos = self._file.tell()
        self._file.write(struct.pack('<L', self._datalength))
        self._headerwritten = True

    def _patchheader(self):
        """Rewrite both size fields to match the bytes actually written."""
        assert self._headerwritten
        if self._datawritten == self._datalength:
            return
        curpos = self._file.tell()
        self._file.seek(self._form_length_pos, 0)
        self._file.write(struct.pack('<L', 36 + self._datawritten))
        self._file.seek(self._data_length_pos, 0)
        self._file.write(struct.pack('<L', self._datawritten))
        self._file.seek(curpos, 0)
        self._datalength = self._datawritten


def open(f, mode=None):
    """Return a Wave_read ('r'/'rb') or Wave_write ('w'/'wb') for *f*.

    When *mode* is None it is taken from f.mode if that attribute
    exists, otherwise 'rb' is used.  Any other mode raises Error.
    """
    if mode is None:
        if hasattr(f, 'mode'):
            mode = f.mode
        else:
            mode = 'rb'
    if mode in ('r', 'rb'):
        return Wave_read(f)
    elif mode in ('w', 'wb'):
        return Wave_write(f)
    else:
        raise Error("mode must be 'r', 'rb', 'w', or 'wb'")