"""RPC Implementation, originally written for the Python Idle IDE

For security reasons, GvR requested that Idle's Python execution server process
connect to the Idle process, which listens for the connection. Since Idle has
only one client per server, this was not a limitation.

   +---------------------------------+ +-------------+
   | socketserver.BaseRequestHandler | | SocketIO    |
   +---------------------------------+ +-------------+
                   ^                   | register()  |
                   |                   | unregister()|
                   |                   +-------------+
                   |                      ^  ^
                   |                      |  |
                   | + -------------------+  |
                   | |                       |
   +-------------------------+        +-----------------+
   | RPCHandler              |        | RPCClient       |
   | [attribute of RPCServer]|        |                 |
   +-------------------------+        +-----------------+

The RPCServer handler class is expected to provide register/unregister methods.
RPCHandler inherits the mix-in class SocketIO, which provides these methods.

See the Idle run.main() docstring for further information on how this was
accomplished in Idle.

"""
import builtins
import copyreg
import io
import marshal
import os
import pickle
import queue
import select
import socket
import socketserver
import struct
import sys
import threading
import traceback
import types


def unpickle_code(ms):
    "Return code object from marshal string ms."
    co = marshal.loads(ms)
    assert isinstance(co, types.CodeType)
    return co


def pickle_code(co):
    "Return unpickle function and tuple with marshalled co code object."
    assert isinstance(co, types.CodeType)
    ms = marshal.dumps(co)
    return unpickle_code, (ms,)


def dumps(obj, protocol=None):
    "Return pickled (or marshalled) string for obj."
    # IDLE passes 'None' to select pickle.DEFAULT_PROTOCOL.
    f = io.BytesIO()
    p = CodePickler(f, protocol)
    p.dump(obj)
    return f.getvalue()


class CodePickler(pickle.Pickler):
    "Pickler that additionally reduces code objects through marshal."
    dispatch_table = {types.CodeType: pickle_code, **copyreg.dispatch_table}


BUFSIZE = 8*1024
LOCALHOST = '127.0.0.1'


class RPCServer(socketserver.TCPServer):
    "TCPServer variant that connects out to its peer instead of listening."

    def __init__(self, addr, handlerclass=None):
        "Create the server for addr; handlerclass defaults to RPCHandler."
        if handlerclass is None:
            handlerclass = RPCHandler
        socketserver.TCPServer.__init__(self, addr, handlerclass)

    def server_bind(self):
        "Override TCPServer method, no bind() phase for connecting entity"
        pass

    def server_activate(self):
        """Override TCPServer method, connect() instead of listen()

        Due to the reversed connection, self.server_address is actually the
        address of the Idle Client to which we are connecting.

        """
        self.socket.connect(self.server_address)

    def get_request(self):
        "Override TCPServer method, return already connected socket"
        return self.socket, self.server_address

    def handle_error(self, request, client_address):
        """Override TCPServer method

        Error message goes to __stderr__.  No error message if exiting
        normally or socket raised EOF.  Other exceptions not handled in
        server code will cause os._exit.

        """
        try:
            # Re-raise the exception currently being handled so that it can
            # be dispatched on below.
            raise
        except SystemExit:
            raise
        except:
            # Deliberate catch-all: this is the last-resort boundary of the
            # server; report the failure and terminate the process.
            erf = sys.__stderr__
            print('\n' + '-'*40, file=erf)
            print('Unhandled server exception!', file=erf)
            print('Thread: %s' % threading.current_thread().name, file=erf)
            print('Client Address: ', client_address, file=erf)
            print('Request: ', repr(request), file=erf)
            traceback.print_exc(file=erf)
            print('\n*** Unrecoverable, server exiting!', file=erf)
            print('-'*40, file=erf)
            os._exit(0)

#----------------- end class RPCServer --------------------

objecttable = {}
request_queue = queue.Queue(0)
response_queue = queue.Queue(0)


class SocketIO:
    """Mix-in implementing the message protocol over a connected socket.

    Subclasses are expected to provide the 'debugging' and 'location'
    attributes that debug() reads.
    """

    nextseq = 0

    def __init__(self, sock, objtable=None, debugging=None):
        """Attach to sock; the creating thread becomes the socket thread.

        objtable defaults to the module-level objecttable.  debugging, when
        not None, overrides the class-level debugging flag.
        """
        self.sockthread = threading.current_thread()
        if debugging is not None:
            self.debugging = debugging
        self.sock = sock
        if objtable is None:
            objtable = objecttable
        self.objtable = objtable
        self.responses = {}
        self.cvars = {}

    def close(self):
        "Close the socket, if any, and forget it."
        sock = self.sock
        self.sock = None
        if sock is not None:
            sock.close()

    def exithook(self):
        "override for specific exit action"
        os._exit(0)

    def debug(self, *args):
        "Print a trace line to __stderr__ when self.debugging is true."
        if not self.debugging:
            return
        s = self.location + " " + str(threading.current_thread().name)
        for a in args:
            s = s + " " + str(a)
        print(s, file=sys.__stderr__)

    def register(self, oid, object):
        "Make object callable by the peer under the id oid."
        self.objtable[oid] = object

    def unregister(self, oid):
        "Remove oid from the object table; unknown ids are ignored."
        try:
            del self.objtable[oid]
        except KeyError:
            pass

    def localcall(self, seq, request):
        """Execute or queue a request from the peer; return (how, what).

        request is expected to be (how, (oid, methodname, args, kwargs))
        with how being 'CALL' or 'QUEUE'.  The result tag is one of "OK",
        "QUEUED", "ERROR", "CALLEXC" or "EXCEPTION".  SystemExit,
        KeyboardInterrupt and OSError raised by the call propagate.
        """
        self.debug("localcall:", request)
        try:
            how, (oid, methodname, args, kwargs) = request
        except (TypeError, ValueError):
            # TypeError: request (or its second item) is not iterable;
            # ValueError: wrong number of items to unpack.
            return ("ERROR", "Bad request format")
        if oid not in self.objtable:
            return ("ERROR", f"Unknown object id: {oid!r}")
        obj = self.objtable[oid]
        if methodname == "__methods__":
            methods = {}
            _getmethods(obj, methods)
            return ("OK", methods)
        if methodname == "__attributes__":
            attributes = {}
            _getattributes(obj, attributes)
            return ("OK", attributes)
        if not hasattr(obj, methodname):
            return ("ERROR", f"Unsupported method name: {methodname!r}")
        method = getattr(obj, methodname)
        try:
            if how == 'CALL':
                ret = method(*args, **kwargs)
                if isinstance(ret, RemoteObject):
                    ret = remoteref(ret)
                return ("OK", ret)
            elif how == 'QUEUE':
                request_queue.put((seq, (method, args, kwargs)))
                return ("QUEUED", None)
            else:
                return ("ERROR", "Unsupported message type: %s" % how)
        except SystemExit:
            raise
        except KeyboardInterrupt:
            raise
        except OSError:
            raise
        except Exception as ex:
            return ("CALLEXC", ex)
        except:
            # Only reached for BaseException subclasses not handled above.
            msg = "*** Internal Error: rpc.py:SocketIO.localcall()\n\n"\
                  " Object: %s \n Method: %s \n Args: %s\n"
            print(msg % (oid, method, args), file=sys.__stderr__)
            traceback.print_exc(file=sys.__stderr__)
            return ("EXCEPTION", None)

    def remotecall(self, oid, methodname, args, kwargs):
        "Send a 'CALL' request and return its decoded response."
        self.debug("remotecall:asynccall: ", oid, methodname)
        seq = self.asynccall(oid, methodname, args, kwargs)
        return self.asyncreturn(seq)

    def remotequeue(self, oid, methodname, args, kwargs):
        "Send a 'QUEUE' request and return its decoded response."
        self.debug("remotequeue:asyncqueue: ", oid, methodname)
        seq = self.asyncqueue(oid, methodname, args, kwargs)
        return self.asyncreturn(seq)

    def asynccall(self, oid, methodname, args, kwargs):
        "Send a 'CALL' request without waiting; return its sequence number."
        request = ("CALL", (oid, methodname, args, kwargs))
        seq = self.newseq()
        if threading.current_thread() != self.sockthread:
            # Other threads wait on a condition variable for their response.
            cvar = threading.Condition()
            self.cvars[seq] = cvar
        self.debug(("asynccall:%d:" % seq), oid, methodname, args, kwargs)
        self.putmessage((seq, request))
        return seq

    def asyncqueue(self, oid, methodname, args, kwargs):
        "Send a 'QUEUE' request without waiting; return its sequence number."
        request = ("QUEUE", (oid, methodname, args, kwargs))
        seq = self.newseq()
        if threading.current_thread() != self.sockthread:
            # Other threads wait on a condition variable for their response.
            cvar = threading.Condition()
            self.cvars[seq] = cvar
        self.debug(("asyncqueue:%d:" % seq), oid, methodname, args, kwargs)
        self.putmessage((seq, request))
        return seq

    def asyncreturn(self, seq):
        "Wait for the response to request seq and return it decoded."
        self.debug("asyncreturn:%d:call getresponse(): " % seq)
        response = self.getresponse(seq, wait=0.05)
        self.debug(("asyncreturn:%d:response: " % seq), response)
        return self.decoderesponse(response)

    def decoderesponse(self, response):
        """Turn a (how, what) response into a return value or an exception.

        "OK" returns what; "QUEUED" and "EXCEPTION" return None; "EOF" calls
        decode_interrupthook(); "ERROR" raises RuntimeError(what); "CALLEXC"
        re-raises what; any other tag raises SystemError.
        """
        how, what = response
        if how == "OK":
            return what
        if how == "QUEUED":
            return None
        if how == "EXCEPTION":
            self.debug("decoderesponse: EXCEPTION")
            return None
        if how == "EOF":
            self.debug("decoderesponse: EOF")
            self.decode_interrupthook()
            return None
        if how == "ERROR":
            self.debug("decoderesponse: Internal ERROR:", what)
            raise RuntimeError(what)
        if how == "CALLEXC":
            self.debug("decoderesponse: Call Exception:", what)
            raise what
        raise SystemError(how, what)

    def decode_interrupthook(self):
        ""
        raise EOFError

    def mainloop(self):
        """Listen on socket until I/O not ready or EOF

        pollresponse() will loop looking for seq number None, which
        never comes, and exit on EOFError.

        """
        try:
            self.getresponse(myseq=None, wait=0.05)
        except EOFError:
            self.debug("mainloop:return")
            return

    def getresponse(self, myseq, wait):
        "Return the response for myseq, proxifying the value of 'OK' ones."
        response = self._getresponse(myseq, wait)
        if response is not None:
            how, what = response
            if how == "OK":
                response = how, self._proxify(what)
        return response

    def _proxify(self, obj):
        "Replace RemoteProxy markers (also inside lists) with RPCProxy."
        if isinstance(obj, RemoteProxy):
            return RPCProxy(self, obj.oid)
        if isinstance(obj, list):
            return list(map(self._proxify, obj))
        # XXX Check for other types -- not currently needed
        return obj

    def _getresponse(self, myseq, wait):
        "Poll the socket (socket thread) or wait to be notified (others)."
        self.debug("_getresponse:myseq:", myseq)
        if threading.current_thread() is self.sockthread:
            # this thread does all reading of requests or responses
            while True:
                response = self.pollresponse(myseq, wait)
                if response is not None:
                    return response
        else:
            # wait for notification from socket handling thread
            cvar = self.cvars[myseq]
            with cvar:
                while myseq not in self.responses:
                    cvar.wait()
                response = self.responses[myseq]
                self.debug("_getresponse:%s: thread woke up: response: %s" %
                           (myseq, response))
                del self.responses[myseq]
                del self.cvars[myseq]
            return response

    def newseq(self):
        "Return the next sequence number; numbers advance in steps of 2."
        self.nextseq = seq = self.nextseq + 2
        return seq

    def putmessage(self, message):
        """Pickle message and send it with a 4-byte length prefix.

        Raises pickle.PicklingError if message cannot be pickled and
        OSError if the socket has already been discarded.
        """
        self.debug("putmessage:%d:" % message[0])
        try:
            s = dumps(message)
        except pickle.PicklingError:
            print("Cannot pickle:", repr(message), file=sys.__stderr__)
            raise
        s = struct.pack("<i", len(s)) + s
        while s:
            try:
                # Block until the socket is writable, then send one chunk.
                select.select([], [self.sock], [])
                n = self.sock.send(s[:BUFSIZE])
            except (AttributeError, TypeError):
                # self.sock is None (or unusable) after close().
                raise OSError("socket no longer exists")
            s = s[n:]

    buff = b''
    bufneed = 4
    bufstate = 0 # meaning: 0 => reading count; 1 => reading data

    def pollpacket(self, wait):
        """Return the next complete packet, or None if none is available.

        Waits up to 'wait' seconds for the socket to become readable.
        Raises EOFError if the socket fails or the peer closed it.
        """
        self._stage0()
        if len(self.buff) < self.bufneed:
            r, w, x = select.select([self.sock.fileno()], [], [], wait)
            if len(r) == 0:
                return None
            try:
                s = self.sock.recv(BUFSIZE)
            except OSError:
                raise EOFError
            if len(s) == 0:
                raise EOFError
            self.buff += s
            self._stage0()
        return self._stage1()

    def _stage0(self):
        "Consume the 4-byte length prefix once it is fully buffered."
        if self.bufstate == 0 and len(self.buff) >= 4:
            s = self.buff[:4]
            self.buff = self.buff[4:]
            self.bufneed = struct.unpack("<i", s)[0]
            self.bufstate = 1

    def _stage1(self):
        "Return the packet body once fully buffered; otherwise None."
        if self.bufstate == 1 and len(self.buff) >= self.bufneed:
            packet = self.buff[:self.bufneed]
            self.buff = self.buff[self.bufneed:]
            self.bufneed = 4
            self.bufstate = 0
            return packet

    def pollmessage(self, wait):
        "Return the next unpickled message, or None if none is available."
        packet = self.pollpacket(wait)
        if packet is None:
            return None
        try:
            # NOTE(security): pickle.loads executes whatever the peer sent;
            # this is only acceptable because the peer is trusted.
            message = pickle.loads(packet)
        except pickle.UnpicklingError:
            print("-----------------------", file=sys.__stderr__)
            print("cannot unpickle packet:", repr(packet), file=sys.__stderr__)
            traceback.print_stack(file=sys.__stderr__)
            print("-----------------------", file=sys.__stderr__)
            raise
        return message

    def pollresponse(self, myseq, wait):
        """Handle messages received on the socket.

        Some messages received may be asynchronous 'call' or 'queue' requests,
        and some may be responses for other threads.

        'call' requests are passed to self.localcall() with the expectation of
        immediate execution, during which time the socket is not serviced.

        'queue' requests are used for tasks (which may block or hang) to be
        processed in a different thread.  These requests are fed into
        request_queue by self.localcall().  Responses to queued requests are
        taken from response_queue and sent across the link with the associated
        sequence numbers.  Messages in the queues are (sequence_number,
        request/response) tuples and code using this module removing messages
        from the request_queue is responsible for returning the correct
        sequence number in the response_queue.

        pollresponse() will loop until a response message with the myseq
        sequence number is received, and will save other responses in
        self.responses and notify the owning thread.

        """
        while True:
            # send queued response if there is one available
            try:
                qmsg = response_queue.get_nowait()
            except queue.Empty:
                pass
            else:
                seq, response = qmsg
                message = (seq, ('OK', response))
                self.putmessage(message)
            # poll for message on link
            try:
                message = self.pollmessage(wait)
                if message is None:  # socket not ready
                    return None
            except EOFError:
                self.handle_EOF()
                return None
            except AttributeError:
                return None
            seq, resq = message
            how = resq[0]
            self.debug("pollresponse:%d:myseq:%s" % (seq, myseq))
            # process or queue a request
            if how in ("CALL", "QUEUE"):
                self.debug("pollresponse:%d:localcall:call:" % seq)
                response = self.localcall(seq, resq)
                self.debug("pollresponse:%d:localcall:response:%s"
                           % (seq, response))
                if how == "CALL":
                    self.putmessage((seq, response))
                elif how == "QUEUE":
                    # don't acknowledge the 'queue' request!
                    pass
                continue
            # return if completed message transaction
            elif seq == myseq:
                return resq
            # must be a response for a different thread:
            else:
                cv = self.cvars.get(seq, None)
                # response involving unknown sequence number is discarded,
                # probably intended for prior incarnation of server
                if cv is not None:
                    with cv:
                        self.responses[seq] = resq
                        cv.notify()
                continue

    def handle_EOF(self):
        "action taken upon link being closed by peer"
        self.EOFhook()
        self.debug("handle_EOF")
        # Iterate over a snapshot: each woken thread deletes its own entry
        # from self.cvars in _getresponse().
        for key, cv in list(self.cvars.items()):
            with cv:
                self.responses[key] = ('EOF', None)
                cv.notify()
        # call our (possibly overridden) exit function
        self.exithook()

    def EOFhook(self):
        "Classes using rpc client/server can override to augment EOF action"
        pass

#----------------- end class SocketIO --------------------

class RemoteObject:
    # Token mix-in class
    pass


def remoteref(obj):
    "Register obj in objecttable under id(obj); return a RemoteProxy for it."
    oid = id(obj)
    objecttable[oid] = obj
    return RemoteProxy(oid)


class RemoteProxy:
    "Picklable stand-in carrying only the object id of a remote object."

    def __init__(self, oid):
        self.oid = oid


class RPCHandler(socketserver.BaseRequestHandler, SocketIO):

    debugging = False
    location = "#S"  # Server

    def __init__(self, sock, addr, svr):
        svr.current_handler = self ## cgt xxx
        SocketIO.__init__(self, sock)
        socketserver.BaseRequestHandler.__init__(self, sock, addr, svr)

    def handle(self):
        "handle() method required by socketserver"
        self.mainloop()

    def get_remote_proxy(self, oid):
        "Return an RPCProxy for the peer object registered as oid."
        return RPCProxy(self, oid)


class RPCClient(SocketIO):

    debugging = False
    location = "#C"  # Client

    nextseq = 1  # Requests coming from the client are odd numbered

    def __init__(self, address, family=socket.AF_INET, type=socket.SOCK_STREAM):
        "Bind a listening socket to address; accept() completes the setup."
        self.listening_sock = socket.socket(family, type)
        self.listening_sock.bind(address)
        self.listening_sock.listen(1)

    def accept(self):
        """Accept one connection and initialise the SocketIO state with it.

        Raises OSError if the connection does not come from LOCALHOST.
        """
        working_sock, address = self.listening_sock.accept()
        if self.debugging:
            print("****** Connection request from ", address, file=sys.__stderr__)
        if address[0] == LOCALHOST:
            SocketIO.__init__(self, working_sock)
        else:
            print("** Invalid host: ", address, file=sys.__stderr__)
            raise OSError

    def get_remote_proxy(self, oid):
        "Return an RPCProxy for the peer object registered as oid."
        return RPCProxy(self, oid)


class RPCProxy:
    "Local stand-in forwarding attribute access to a remote object."

    __methods = None
    __attributes = None

    def __init__(self, sockio, oid):
        self.sockio = sockio
        self.oid = oid

    def __getattr__(self, name):
        """Return a MethodProxy or a remote attribute value for name.

        The remote method and attribute name tables are fetched on first
        use.  Raises AttributeError if name is in neither table.
        """
        if self.__methods is None:
            self.__getmethods()
        if self.__methods.get(name):
            return MethodProxy(self.sockio, self.oid, name)
        if self.__attributes is None:
            self.__getattributes()
        if name in self.__attributes:
            value = self.sockio.remotecall(self.oid, '__getattribute__',
                                           (name,), {})
            return value
        else:
            raise AttributeError(name)

    def __getattributes(self):
        self.__attributes = self.sockio.remotecall(self.oid,
                                                "__attributes__", (), {})

    def __getmethods(self):
        self.__methods = self.sockio.remotecall(self.oid,
                                                "__methods__", (), {})


def _getmethods(obj, methods):
    # Helper to get a list of methods from an object
    # Adds names to dictionary argument 'methods'
    for name in dir(obj):
        attr = getattr(obj, name)
        if callable(attr):
            methods[name] = 1
    if isinstance(obj, type):
        for base in obj.__bases__:
            _getmethods(base, methods)


def _getattributes(obj, attributes):
    # Helper to collect the names of non-callable attributes of an object
    # Adds names to dictionary argument 'attributes'
    for name in dir(obj):
        attr = getattr(obj, name)
        if not callable(attr):
            attributes[name] = 1


class MethodProxy:
    "Callable that performs a remote call of one method of one object."

    def __init__(self, sockio, oid, name):
        self.sockio = sockio
        self.oid = oid
        self.name = name

    def __call__(self, /, *args, **kwargs):
        value = self.sockio.remotecall(self.oid, self.name, args, kwargs)
        return value


# XXX KBK 09Sep03  We need a proper unit test for this module.  Previously
#                  existing test code was removed at Rev 1.27 (r34098).

def displayhook(value):
    """Override standard display hook to use non-locale encoding"""
    if value is None:
        return
    # Set '_' to None to avoid recursion
    builtins._ = None
    text = repr(value)
    try:
        sys.stdout.write(text)
    except UnicodeEncodeError:
        # Fall back to ASCII with backslash escapes (no utf8-bmp codec is
        # available).
        encoding = 'ascii'
        encoded = text.encode(encoding, 'backslashreplace')
        text = encoded.decode(encoding, 'strict')
        sys.stdout.write(text)
    sys.stdout.write("\n")
    builtins._ = value


if __name__ == '__main__':
    from unittest import main
    main('idlelib.idle_test.test_rpc', verbosity=2,)