1 2/* interpreters module */ 3/* low-level access to interpreter primitives */ 4#ifndef Py_BUILD_CORE_BUILTIN 5# define Py_BUILD_CORE_MODULE 1 6#endif 7 8#include "Python.h" 9#include "pycore_frame.h" 10#include "pycore_pystate.h" // _PyThreadState_GET() 11#include "pycore_interpreteridobject.h" 12 13 14static char * 15_copy_raw_string(PyObject *strobj) 16{ 17 const char *str = PyUnicode_AsUTF8(strobj); 18 if (str == NULL) { 19 return NULL; 20 } 21 char *copied = PyMem_Malloc(strlen(str)+1); 22 if (copied == NULL) { 23 PyErr_NoMemory(); 24 return NULL; 25 } 26 strcpy(copied, str); 27 return copied; 28} 29 30static PyInterpreterState * 31_get_current(void) 32{ 33 // PyInterpreterState_Get() aborts if lookup fails, so don't need 34 // to check the result for NULL. 35 return PyInterpreterState_Get(); 36} 37 38 39/* data-sharing-specific code ***********************************************/ 40 41struct _sharednsitem { 42 char *name; 43 _PyCrossInterpreterData data; 44}; 45 46static void _sharednsitem_clear(struct _sharednsitem *); // forward 47 48static int 49_sharednsitem_init(struct _sharednsitem *item, PyObject *key, PyObject *value) 50{ 51 item->name = _copy_raw_string(key); 52 if (item->name == NULL) { 53 return -1; 54 } 55 if (_PyObject_GetCrossInterpreterData(value, &item->data) != 0) { 56 _sharednsitem_clear(item); 57 return -1; 58 } 59 return 0; 60} 61 62static void 63_sharednsitem_clear(struct _sharednsitem *item) 64{ 65 if (item->name != NULL) { 66 PyMem_Free(item->name); 67 item->name = NULL; 68 } 69 _PyCrossInterpreterData_Release(&item->data); 70} 71 72static int 73_sharednsitem_apply(struct _sharednsitem *item, PyObject *ns) 74{ 75 PyObject *name = PyUnicode_FromString(item->name); 76 if (name == NULL) { 77 return -1; 78 } 79 PyObject *value = _PyCrossInterpreterData_NewObject(&item->data); 80 if (value == NULL) { 81 Py_DECREF(name); 82 return -1; 83 } 84 int res = PyDict_SetItem(ns, name, value); 85 Py_DECREF(name); 86 Py_DECREF(value); 87 return res; 88} 89 90typedef struct _sharedns { 91 Py_ssize_t len; 92 struct _sharednsitem* items; 93} _sharedns; 94 95static _sharedns * 96_sharedns_new(Py_ssize_t len) 97{ 98 _sharedns *shared = PyMem_NEW(_sharedns, 1); 99 if (shared == NULL) { 100 PyErr_NoMemory(); 101 return NULL; 102 } 103 shared->len = len; 104 shared->items = PyMem_NEW(struct _sharednsitem, len); 105 if (shared->items == NULL) { 106 PyErr_NoMemory(); 107 PyMem_Free(shared); 108 return NULL; 109 } 110 return shared; 111} 112 113static void 114_sharedns_free(_sharedns *shared) 115{ 116 for (Py_ssize_t i=0; i < shared->len; i++) { 117 _sharednsitem_clear(&shared->items[i]); 118 } 119 PyMem_Free(shared->items); 120 PyMem_Free(shared); 121} 122 123static _sharedns * 124_get_shared_ns(PyObject *shareable) 125{ 126 if (shareable == NULL || shareable == Py_None) { 127 return NULL; 128 } 129 Py_ssize_t len = PyDict_Size(shareable); 130 if (len == 0) { 131 return NULL; 132 } 133 134 _sharedns *shared = _sharedns_new(len); 135 if (shared == NULL) { 136 return NULL; 137 } 138 Py_ssize_t pos = 0; 139 for (Py_ssize_t i=0; i < len; i++) { 140 PyObject *key, *value; 141 if (PyDict_Next(shareable, &pos, &key, &value) == 0) { 142 break; 143 } 144 if (_sharednsitem_init(&shared->items[i], key, value) != 0) { 145 break; 146 } 147 } 148 if (PyErr_Occurred()) { 149 _sharedns_free(shared); 150 return NULL; 151 } 152 return shared; 153} 154 155static int 156_sharedns_apply(_sharedns *shared, PyObject *ns) 157{ 158 for (Py_ssize_t i=0; i < shared->len; i++) { 159 if (_sharednsitem_apply(&shared->items[i], ns) != 0) { 160 return -1; 161 } 162 } 163 return 0; 164} 165 166// Ultimately we'd like to preserve enough information about the 167// exception and traceback that we could re-constitute (or at least 168// simulate, a la traceback.TracebackException), and even chain, a copy 169// of the exception in the calling interpreter. 170 171typedef struct _sharedexception { 172 char *name; 173 char *msg; 174} _sharedexception; 175 176static _sharedexception * 177_sharedexception_new(void) 178{ 179 _sharedexception *err = PyMem_NEW(_sharedexception, 1); 180 if (err == NULL) { 181 PyErr_NoMemory(); 182 return NULL; 183 } 184 err->name = NULL; 185 err->msg = NULL; 186 return err; 187} 188 189static void 190_sharedexception_clear(_sharedexception *exc) 191{ 192 if (exc->name != NULL) { 193 PyMem_Free(exc->name); 194 } 195 if (exc->msg != NULL) { 196 PyMem_Free(exc->msg); 197 } 198} 199 200static void 201_sharedexception_free(_sharedexception *exc) 202{ 203 _sharedexception_clear(exc); 204 PyMem_Free(exc); 205} 206 207static _sharedexception * 208_sharedexception_bind(PyObject *exctype, PyObject *exc, PyObject *tb) 209{ 210 assert(exctype != NULL); 211 char *failure = NULL; 212 213 _sharedexception *err = _sharedexception_new(); 214 if (err == NULL) { 215 goto finally; 216 } 217 218 PyObject *name = PyUnicode_FromFormat("%S", exctype); 219 if (name == NULL) { 220 failure = "unable to format exception type name"; 221 goto finally; 222 } 223 err->name = _copy_raw_string(name); 224 Py_DECREF(name); 225 if (err->name == NULL) { 226 if (PyErr_ExceptionMatches(PyExc_MemoryError)) { 227 failure = "out of memory copying exception type name"; 228 } else { 229 failure = "unable to encode and copy exception type name"; 230 } 231 goto finally; 232 } 233 234 if (exc != NULL) { 235 PyObject *msg = PyUnicode_FromFormat("%S", exc); 236 if (msg == NULL) { 237 failure = "unable to format exception message"; 238 goto finally; 239 } 240 err->msg = _copy_raw_string(msg); 241 Py_DECREF(msg); 242 if (err->msg == NULL) { 243 if (PyErr_ExceptionMatches(PyExc_MemoryError)) { 244 failure = "out of memory copying exception message"; 245 } else { 246 failure = "unable to encode and copy exception message"; 247 } 248 goto finally; 249 } 250 } 251 252finally: 253 if (failure != NULL) { 254 PyErr_Clear(); 255 if (err->name != NULL) { 256 PyMem_Free(err->name); 257 err->name = NULL; 258 } 259 err->msg = failure; 260 } 261 return err; 262} 263 264static void 265_sharedexception_apply(_sharedexception *exc, PyObject *wrapperclass) 266{ 267 if (exc->name != NULL) { 268 if (exc->msg != NULL) { 269 PyErr_Format(wrapperclass, "%s: %s", exc->name, exc->msg); 270 } 271 else { 272 PyErr_SetString(wrapperclass, exc->name); 273 } 274 } 275 else if (exc->msg != NULL) { 276 PyErr_SetString(wrapperclass, exc->msg); 277 } 278 else { 279 PyErr_SetNone(wrapperclass); 280 } 281} 282 283 284/* channel-specific code ****************************************************/ 285 286#define CHANNEL_SEND 1 287#define CHANNEL_BOTH 0 288#define CHANNEL_RECV -1 289 290static PyObject *ChannelError; 291static PyObject *ChannelNotFoundError; 292static PyObject *ChannelClosedError; 293static PyObject *ChannelEmptyError; 294static PyObject *ChannelNotEmptyError; 295 296static int 297channel_exceptions_init(PyObject *ns) 298{ 299 // XXX Move the exceptions into per-module memory? 300 301 // A channel-related operation failed. 302 ChannelError = PyErr_NewException("_xxsubinterpreters.ChannelError", 303 PyExc_RuntimeError, NULL); 304 if (ChannelError == NULL) { 305 return -1; 306 } 307 if (PyDict_SetItemString(ns, "ChannelError", ChannelError) != 0) { 308 return -1; 309 } 310 311 // An operation tried to use a channel that doesn't exist. 312 ChannelNotFoundError = PyErr_NewException( 313 "_xxsubinterpreters.ChannelNotFoundError", ChannelError, NULL); 314 if (ChannelNotFoundError == NULL) { 315 return -1; 316 } 317 if (PyDict_SetItemString(ns, "ChannelNotFoundError", ChannelNotFoundError) != 0) { 318 return -1; 319 } 320 321 // An operation tried to use a closed channel. 322 ChannelClosedError = PyErr_NewException( 323 "_xxsubinterpreters.ChannelClosedError", ChannelError, NULL); 324 if (ChannelClosedError == NULL) { 325 return -1; 326 } 327 if (PyDict_SetItemString(ns, "ChannelClosedError", ChannelClosedError) != 0) { 328 return -1; 329 } 330 331 // An operation tried to pop from an empty channel. 332 ChannelEmptyError = PyErr_NewException( 333 "_xxsubinterpreters.ChannelEmptyError", ChannelError, NULL); 334 if (ChannelEmptyError == NULL) { 335 return -1; 336 } 337 if (PyDict_SetItemString(ns, "ChannelEmptyError", ChannelEmptyError) != 0) { 338 return -1; 339 } 340 341 // An operation tried to close a non-empty channel. 342 ChannelNotEmptyError = PyErr_NewException( 343 "_xxsubinterpreters.ChannelNotEmptyError", ChannelError, NULL); 344 if (ChannelNotEmptyError == NULL) { 345 return -1; 346 } 347 if (PyDict_SetItemString(ns, "ChannelNotEmptyError", ChannelNotEmptyError) != 0) { 348 return -1; 349 } 350 351 return 0; 352} 353 354/* the channel queue */ 355 356struct _channelitem; 357 358typedef struct _channelitem { 359 _PyCrossInterpreterData *data; 360 struct _channelitem *next; 361} _channelitem; 362 363static _channelitem * 364_channelitem_new(void) 365{ 366 _channelitem *item = PyMem_NEW(_channelitem, 1); 367 if (item == NULL) { 368 PyErr_NoMemory(); 369 return NULL; 370 } 371 item->data = NULL; 372 item->next = NULL; 373 return item; 374} 375 376static void 377_channelitem_clear(_channelitem *item) 378{ 379 if (item->data != NULL) { 380 _PyCrossInterpreterData_Release(item->data); 381 PyMem_Free(item->data); 382 item->data = NULL; 383 } 384 item->next = NULL; 385} 386 387static void 388_channelitem_free(_channelitem *item) 389{ 390 _channelitem_clear(item); 391 PyMem_Free(item); 392} 393 394static void 395_channelitem_free_all(_channelitem *item) 396{ 397 while (item != NULL) { 398 _channelitem *last = item; 399 item = item->next; 400 _channelitem_free(last); 401 } 402} 403 404static _PyCrossInterpreterData * 405_channelitem_popped(_channelitem *item) 406{ 407 _PyCrossInterpreterData *data = item->data; 408 item->data = NULL; 409 _channelitem_free(item); 410 return data; 411} 412 413typedef struct _channelqueue { 414 int64_t count; 415 _channelitem *first; 416 _channelitem *last; 417} _channelqueue; 418 419static _channelqueue * 420_channelqueue_new(void) 421{ 422 _channelqueue *queue = PyMem_NEW(_channelqueue, 1); 423 if (queue == NULL) { 424 PyErr_NoMemory(); 425 return NULL; 426 } 427 queue->count = 0; 428 queue->first = NULL; 429 queue->last = NULL; 430 return queue; 431} 432 433static void 434_channelqueue_clear(_channelqueue *queue) 435{ 436 _channelitem_free_all(queue->first); 437 queue->count = 0; 438 queue->first = NULL; 439 queue->last = NULL; 440} 441 442static void 443_channelqueue_free(_channelqueue *queue) 444{ 445 _channelqueue_clear(queue); 446 PyMem_Free(queue); 447} 448 449static int 450_channelqueue_put(_channelqueue *queue, _PyCrossInterpreterData *data) 451{ 452 _channelitem *item = _channelitem_new(); 453 if (item == NULL) { 454 return -1; 455 } 456 item->data = data; 457 458 queue->count += 1; 459 if (queue->first == NULL) { 460 queue->first = item; 461 } 462 else { 463 queue->last->next = item; 464 } 465 queue->last = item; 466 return 0; 467} 468 469static _PyCrossInterpreterData * 470_channelqueue_get(_channelqueue *queue) 471{ 472 _channelitem *item = queue->first; 473 if (item == NULL) { 474 return NULL; 475 } 476 queue->first = item->next; 477 if (queue->last == item) { 478 queue->last = NULL; 479 } 480 queue->count -= 1; 481 482 return _channelitem_popped(item); 483} 484 485/* channel-interpreter associations */ 486 487struct _channelend; 488 489typedef struct _channelend { 490 struct _channelend *next; 491 int64_t interp; 492 int open; 493} _channelend; 494 495static _channelend * 496_channelend_new(int64_t interp) 497{ 498 _channelend *end = PyMem_NEW(_channelend, 1); 499 if (end == NULL) { 500 PyErr_NoMemory(); 501 return NULL; 502 } 503 end->next = NULL; 504 end->interp = interp; 505 end->open = 1; 506 return end; 507} 508 509static void 510_channelend_free(_channelend *end) 511{ 512 PyMem_Free(end); 513} 514 515static void 516_channelend_free_all(_channelend *end) 517{ 518 while (end != NULL) { 519 _channelend *last = end; 520 end = end->next; 521 _channelend_free(last); 522 } 523} 524 525static _channelend * 526_channelend_find(_channelend *first, int64_t interp, _channelend **pprev) 527{ 528 _channelend *prev = NULL; 529 _channelend *end = first; 530 while (end != NULL) { 531 if (end->interp == interp) { 532 break; 533 } 534 prev = end; 535 end = end->next; 536 } 537 if (pprev != NULL) { 538 *pprev = prev; 539 } 540 return end; 541} 542 543typedef struct _channelassociations { 544 // Note that the list entries are never removed for interpreter 545 // for which the channel is closed. This should not be a problem in 546 // practice. Also, a channel isn't automatically closed when an 547 // interpreter is destroyed. 548 int64_t numsendopen; 549 int64_t numrecvopen; 550 _channelend *send; 551 _channelend *recv; 552} _channelends; 553 554static _channelends * 555_channelends_new(void) 556{ 557 _channelends *ends = PyMem_NEW(_channelends, 1); 558 if (ends== NULL) { 559 return NULL; 560 } 561 ends->numsendopen = 0; 562 ends->numrecvopen = 0; 563 ends->send = NULL; 564 ends->recv = NULL; 565 return ends; 566} 567 568static void 569_channelends_clear(_channelends *ends) 570{ 571 _channelend_free_all(ends->send); 572 ends->send = NULL; 573 ends->numsendopen = 0; 574 575 _channelend_free_all(ends->recv); 576 ends->recv = NULL; 577 ends->numrecvopen = 0; 578} 579 580static void 581_channelends_free(_channelends *ends) 582{ 583 _channelends_clear(ends); 584 PyMem_Free(ends); 585} 586 587static _channelend * 588_channelends_add(_channelends *ends, _channelend *prev, int64_t interp, 589 int send) 590{ 591 _channelend *end = _channelend_new(interp); 592 if (end == NULL) { 593 return NULL; 594 } 595 596 if (prev == NULL) { 597 if (send) { 598 ends->send = end; 599 } 600 else { 601 ends->recv = end; 602 } 603 } 604 else { 605 prev->next = end; 606 } 607 if (send) { 608 ends->numsendopen += 1; 609 } 610 else { 611 ends->numrecvopen += 1; 612 } 613 return end; 614} 615 616static int 617_channelends_associate(_channelends *ends, int64_t interp, int send) 618{ 619 _channelend *prev; 620 _channelend *end = _channelend_find(send ? ends->send : ends->recv, 621 interp, &prev); 622 if (end != NULL) { 623 if (!end->open) { 624 PyErr_SetString(ChannelClosedError, "channel already closed"); 625 return -1; 626 } 627 // already associated 628 return 0; 629 } 630 if (_channelends_add(ends, prev, interp, send) == NULL) { 631 return -1; 632 } 633 return 0; 634} 635 636static int 637_channelends_is_open(_channelends *ends) 638{ 639 if (ends->numsendopen != 0 || ends->numrecvopen != 0) { 640 return 1; 641 } 642 if (ends->send == NULL && ends->recv == NULL) { 643 return 1; 644 } 645 return 0; 646} 647 648static void 649_channelends_close_end(_channelends *ends, _channelend *end, int send) 650{ 651 end->open = 0; 652 if (send) { 653 ends->numsendopen -= 1; 654 } 655 else { 656 ends->numrecvopen -= 1; 657 } 658} 659 660static int 661_channelends_close_interpreter(_channelends *ends, int64_t interp, int which) 662{ 663 _channelend *prev; 664 _channelend *end; 665 if (which >= 0) { // send/both 666 end = _channelend_find(ends->send, interp, &prev); 667 if (end == NULL) { 668 // never associated so add it 669 end = _channelends_add(ends, prev, interp, 1); 670 if (end == NULL) { 671 return -1; 672 } 673 } 674 _channelends_close_end(ends, end, 1); 675 } 676 if (which <= 0) { // recv/both 677 end = _channelend_find(ends->recv, interp, &prev); 678 if (end == NULL) { 679 // never associated so add it 680 end = _channelends_add(ends, prev, interp, 0); 681 if (end == NULL) { 682 return -1; 683 } 684 } 685 _channelends_close_end(ends, end, 0); 686 } 687 return 0; 688} 689 690static void 691_channelends_close_all(_channelends *ends, int which, int force) 692{ 693 // XXX Handle the ends. 694 // XXX Handle force is True. 695 696 // Ensure all the "send"-associated interpreters are closed. 697 _channelend *end; 698 for (end = ends->send; end != NULL; end = end->next) { 699 _channelends_close_end(ends, end, 1); 700 } 701 702 // Ensure all the "recv"-associated interpreters are closed. 703 for (end = ends->recv; end != NULL; end = end->next) { 704 _channelends_close_end(ends, end, 0); 705 } 706} 707 708/* channels */ 709 710struct _channel; 711struct _channel_closing; 712static void _channel_clear_closing(struct _channel *); 713static void _channel_finish_closing(struct _channel *); 714 715typedef struct _channel { 716 PyThread_type_lock mutex; 717 _channelqueue *queue; 718 _channelends *ends; 719 int open; 720 struct _channel_closing *closing; 721} _PyChannelState; 722 723static _PyChannelState * 724_channel_new(void) 725{ 726 _PyChannelState *chan = PyMem_NEW(_PyChannelState, 1); 727 if (chan == NULL) { 728 return NULL; 729 } 730 chan->mutex = PyThread_allocate_lock(); 731 if (chan->mutex == NULL) { 732 PyMem_Free(chan); 733 PyErr_SetString(ChannelError, 734 "can't initialize mutex for new channel"); 735 return NULL; 736 } 737 chan->queue = _channelqueue_new(); 738 if (chan->queue == NULL) { 739 PyMem_Free(chan); 740 return NULL; 741 } 742 chan->ends = _channelends_new(); 743 if (chan->ends == NULL) { 744 _channelqueue_free(chan->queue); 745 PyMem_Free(chan); 746 return NULL; 747 } 748 chan->open = 1; 749 chan->closing = NULL; 750 return chan; 751} 752 753static void 754_channel_free(_PyChannelState *chan) 755{ 756 _channel_clear_closing(chan); 757 PyThread_acquire_lock(chan->mutex, WAIT_LOCK); 758 _channelqueue_free(chan->queue); 759 _channelends_free(chan->ends); 760 PyThread_release_lock(chan->mutex); 761 762 PyThread_free_lock(chan->mutex); 763 PyMem_Free(chan); 764} 765 766static int 767_channel_add(_PyChannelState *chan, int64_t interp, 768 _PyCrossInterpreterData *data) 769{ 770 int res = -1; 771 PyThread_acquire_lock(chan->mutex, WAIT_LOCK); 772 773 if (!chan->open) { 774 PyErr_SetString(ChannelClosedError, "channel closed"); 775 goto done; 776 } 777 if (_channelends_associate(chan->ends, interp, 1) != 0) { 778 goto done; 779 } 780 781 if (_channelqueue_put(chan->queue, data) != 0) { 782 goto done; 783 } 784 785 res = 0; 786done: 787 PyThread_release_lock(chan->mutex); 788 return res; 789} 790 791static _PyCrossInterpreterData * 792_channel_next(_PyChannelState *chan, int64_t interp) 793{ 794 _PyCrossInterpreterData *data = NULL; 795 PyThread_acquire_lock(chan->mutex, WAIT_LOCK); 796 797 if (!chan->open) { 798 PyErr_SetString(ChannelClosedError, "channel closed"); 799 goto done; 800 } 801 if (_channelends_associate(chan->ends, interp, 0) != 0) { 802 goto done; 803 } 804 805 data = _channelqueue_get(chan->queue); 806 if (data == NULL && !PyErr_Occurred() && chan->closing != NULL) { 807 chan->open = 0; 808 } 809 810done: 811 PyThread_release_lock(chan->mutex); 812 if (chan->queue->count == 0) { 813 _channel_finish_closing(chan); 814 } 815 return data; 816} 817 818static int 819_channel_close_interpreter(_PyChannelState *chan, int64_t interp, int end) 820{ 821 PyThread_acquire_lock(chan->mutex, WAIT_LOCK); 822 823 int res = -1; 824 if (!chan->open) { 825 PyErr_SetString(ChannelClosedError, "channel already closed"); 826 goto done; 827 } 828 829 if (_channelends_close_interpreter(chan->ends, interp, end) != 0) { 830 goto done; 831 } 832 chan->open = _channelends_is_open(chan->ends); 833 834 res = 0; 835done: 836 PyThread_release_lock(chan->mutex); 837 return res; 838} 839 840static int 841_channel_close_all(_PyChannelState *chan, int end, int force) 842{ 843 int res = -1; 844 PyThread_acquire_lock(chan->mutex, WAIT_LOCK); 845 846 if (!chan->open) { 847 PyErr_SetString(ChannelClosedError, "channel already closed"); 848 goto done; 849 } 850 851 if (!force && chan->queue->count > 0) { 852 PyErr_SetString(ChannelNotEmptyError, 853 "may not be closed if not empty (try force=True)"); 854 goto done; 855 } 856 857 chan->open = 0; 858 859 // We *could* also just leave these in place, since we've marked 860 // the channel as closed already. 861 _channelends_close_all(chan->ends, end, force); 862 863 res = 0; 864done: 865 PyThread_release_lock(chan->mutex); 866 return res; 867} 868 869/* the set of channels */ 870 871struct _channelref; 872 873typedef struct _channelref { 874 int64_t id; 875 _PyChannelState *chan; 876 struct _channelref *next; 877 Py_ssize_t objcount; 878} _channelref; 879 880static _channelref * 881_channelref_new(int64_t id, _PyChannelState *chan) 882{ 883 _channelref *ref = PyMem_NEW(_channelref, 1); 884 if (ref == NULL) { 885 return NULL; 886 } 887 ref->id = id; 888 ref->chan = chan; 889 ref->next = NULL; 890 ref->objcount = 0; 891 return ref; 892} 893 894//static void 895//_channelref_clear(_channelref *ref) 896//{ 897// ref->id = -1; 898// ref->chan = NULL; 899// ref->next = NULL; 900// ref->objcount = 0; 901//} 902 903static void 904_channelref_free(_channelref *ref) 905{ 906 if (ref->chan != NULL) { 907 _channel_clear_closing(ref->chan); 908 } 909 //_channelref_clear(ref); 910 PyMem_Free(ref); 911} 912 913static _channelref * 914_channelref_find(_channelref *first, int64_t id, _channelref **pprev) 915{ 916 _channelref *prev = NULL; 917 _channelref *ref = first; 918 while (ref != NULL) { 919 if (ref->id == id) { 920 break; 921 } 922 prev = ref; 923 ref = ref->next; 924 } 925 if (pprev != NULL) { 926 *pprev = prev; 927 } 928 return ref; 929} 930 931typedef struct _channels { 932 PyThread_type_lock mutex; 933 _channelref *head; 934 int64_t numopen; 935 int64_t next_id; 936} _channels; 937 938static int 939_channels_init(_channels *channels) 940{ 941 if (channels->mutex == NULL) { 942 channels->mutex = PyThread_allocate_lock(); 943 if (channels->mutex == NULL) { 944 PyErr_SetString(ChannelError, 945 "can't initialize mutex for channel management"); 946 return -1; 947 } 948 } 949 channels->head = NULL; 950 channels->numopen = 0; 951 channels->next_id = 0; 952 return 0; 953} 954 955static int64_t 956_channels_next_id(_channels *channels) // needs lock 957{ 958 int64_t id = channels->next_id; 959 if (id < 0) { 960 /* overflow */ 961 PyErr_SetString(ChannelError, 962 "failed to get a channel ID"); 963 return -1; 964 } 965 channels->next_id += 1; 966 return id; 967} 968 969static _PyChannelState * 970_channels_lookup(_channels *channels, int64_t id, PyThread_type_lock *pmutex) 971{ 972 _PyChannelState *chan = NULL; 973 PyThread_acquire_lock(channels->mutex, WAIT_LOCK); 974 if (pmutex != NULL) { 975 *pmutex = NULL; 976 } 977 978 _channelref *ref = _channelref_find(channels->head, id, NULL); 979 if (ref == NULL) { 980 PyErr_Format(ChannelNotFoundError, "channel %" PRId64 " not found", id); 981 goto done; 982 } 983 if (ref->chan == NULL || !ref->chan->open) { 984 PyErr_Format(ChannelClosedError, "channel %" PRId64 " closed", id); 985 goto done; 986 } 987 988 if (pmutex != NULL) { 989 // The mutex will be closed by the caller. 990 *pmutex = channels->mutex; 991 } 992 993 chan = ref->chan; 994done: 995 if (pmutex == NULL || *pmutex == NULL) { 996 PyThread_release_lock(channels->mutex); 997 } 998 return chan; 999} 1000 1001static int64_t 1002_channels_add(_channels *channels, _PyChannelState *chan) 1003{ 1004 int64_t cid = -1; 1005 PyThread_acquire_lock(channels->mutex, WAIT_LOCK); 1006 1007 // Create a new ref. 1008 int64_t id = _channels_next_id(channels); 1009 if (id < 0) { 1010 goto done; 1011 } 1012 _channelref *ref = _channelref_new(id, chan); 1013 if (ref == NULL) { 1014 goto done; 1015 } 1016 1017 // Add it to the list. 1018 // We assume that the channel is a new one (not already in the list). 1019 ref->next = channels->head; 1020 channels->head = ref; 1021 channels->numopen += 1; 1022 1023 cid = id; 1024done: 1025 PyThread_release_lock(channels->mutex); 1026 return cid; 1027} 1028 1029/* forward */ 1030static int _channel_set_closing(struct _channelref *, PyThread_type_lock); 1031 1032static int 1033_channels_close(_channels *channels, int64_t cid, _PyChannelState **pchan, 1034 int end, int force) 1035{ 1036 int res = -1; 1037 PyThread_acquire_lock(channels->mutex, WAIT_LOCK); 1038 if (pchan != NULL) { 1039 *pchan = NULL; 1040 } 1041 1042 _channelref *ref = _channelref_find(channels->head, cid, NULL); 1043 if (ref == NULL) { 1044 PyErr_Format(ChannelNotFoundError, "channel %" PRId64 " not found", cid); 1045 goto done; 1046 } 1047 1048 if (ref->chan == NULL) { 1049 PyErr_Format(ChannelClosedError, "channel %" PRId64 " closed", cid); 1050 goto done; 1051 } 1052 else if (!force && end == CHANNEL_SEND && ref->chan->closing != NULL) { 1053 PyErr_Format(ChannelClosedError, "channel %" PRId64 " closed", cid); 1054 goto done; 1055 } 1056 else { 1057 if (_channel_close_all(ref->chan, end, force) != 0) { 1058 if (end == CHANNEL_SEND && 1059 PyErr_ExceptionMatches(ChannelNotEmptyError)) { 1060 if (ref->chan->closing != NULL) { 1061 PyErr_Format(ChannelClosedError, 1062 "channel %" PRId64 " closed", cid); 1063 goto done; 1064 } 1065 // Mark the channel as closing and return. The channel 1066 // will be cleaned up in _channel_next(). 1067 PyErr_Clear(); 1068 if (_channel_set_closing(ref, channels->mutex) != 0) { 1069 goto done; 1070 } 1071 if (pchan != NULL) { 1072 *pchan = ref->chan; 1073 } 1074 res = 0; 1075 } 1076 goto done; 1077 } 1078 if (pchan != NULL) { 1079 *pchan = ref->chan; 1080 } 1081 else { 1082 _channel_free(ref->chan); 1083 } 1084 ref->chan = NULL; 1085 } 1086 1087 res = 0; 1088done: 1089 PyThread_release_lock(channels->mutex); 1090 return res; 1091} 1092 1093static void 1094_channels_remove_ref(_channels *channels, _channelref *ref, _channelref *prev, 1095 _PyChannelState **pchan) 1096{ 1097 if (ref == channels->head) { 1098 channels->head = ref->next; 1099 } 1100 else { 1101 prev->next = ref->next; 1102 } 1103 channels->numopen -= 1; 1104 1105 if (pchan != NULL) { 1106 *pchan = ref->chan; 1107 } 1108 _channelref_free(ref); 1109} 1110 1111static int 1112_channels_remove(_channels *channels, int64_t id, _PyChannelState **pchan) 1113{ 1114 int res = -1; 1115 PyThread_acquire_lock(channels->mutex, WAIT_LOCK); 1116 1117 if (pchan != NULL) { 1118 *pchan = NULL; 1119 } 1120 1121 _channelref *prev = NULL; 1122 _channelref *ref = _channelref_find(channels->head, id, &prev); 1123 if (ref == NULL) { 1124 PyErr_Format(ChannelNotFoundError, "channel %" PRId64 " not found", id); 1125 goto done; 1126 } 1127 1128 _channels_remove_ref(channels, ref, prev, pchan); 1129 1130 res = 0; 1131done: 1132 PyThread_release_lock(channels->mutex); 1133 return res; 1134} 1135 1136static int 1137_channels_add_id_object(_channels *channels, int64_t id) 1138{ 1139 int res = -1; 1140 PyThread_acquire_lock(channels->mutex, WAIT_LOCK); 1141 1142 _channelref *ref = _channelref_find(channels->head, id, NULL); 1143 if (ref == NULL) { 1144 PyErr_Format(ChannelNotFoundError, "channel %" PRId64 " not found", id); 1145 goto done; 1146 } 1147 ref->objcount += 1; 1148 1149 res = 0; 1150done: 1151 PyThread_release_lock(channels->mutex); 1152 return res; 1153} 1154 1155static void 1156_channels_drop_id_object(_channels *channels, int64_t id) 1157{ 1158 PyThread_acquire_lock(channels->mutex, WAIT_LOCK); 1159 1160 _channelref *prev = NULL; 1161 _channelref *ref = _channelref_find(channels->head, id, &prev); 1162 if (ref == NULL) { 1163 // Already destroyed. 1164 goto done; 1165 } 1166 ref->objcount -= 1; 1167 1168 // Destroy if no longer used. 1169 if (ref->objcount == 0) { 1170 _PyChannelState *chan = NULL; 1171 _channels_remove_ref(channels, ref, prev, &chan); 1172 if (chan != NULL) { 1173 _channel_free(chan); 1174 } 1175 } 1176 1177done: 1178 PyThread_release_lock(channels->mutex); 1179} 1180 1181static int64_t * 1182_channels_list_all(_channels *channels, int64_t *count) 1183{ 1184 int64_t *cids = NULL; 1185 PyThread_acquire_lock(channels->mutex, WAIT_LOCK); 1186 int64_t *ids = PyMem_NEW(int64_t, (Py_ssize_t)(channels->numopen)); 1187 if (ids == NULL) { 1188 goto done; 1189 } 1190 _channelref *ref = channels->head; 1191 for (int64_t i=0; ref != NULL; ref = ref->next, i++) { 1192 ids[i] = ref->id; 1193 } 1194 *count = channels->numopen; 1195 1196 cids = ids; 1197done: 1198 PyThread_release_lock(channels->mutex); 1199 return cids; 1200} 1201 1202/* support for closing non-empty channels */ 1203 1204struct _channel_closing { 1205 struct _channelref *ref; 1206}; 1207 1208static int 1209_channel_set_closing(struct _channelref *ref, PyThread_type_lock mutex) { 1210 struct _channel *chan = ref->chan; 1211 if (chan == NULL) { 1212 // already closed 1213 return 0; 1214 } 1215 int res = -1; 1216 PyThread_acquire_lock(chan->mutex, WAIT_LOCK); 1217 if (chan->closing != NULL) { 1218 PyErr_SetString(ChannelClosedError, "channel closed"); 1219 goto done; 1220 } 1221 chan->closing = PyMem_NEW(struct _channel_closing, 1); 1222 if (chan->closing == NULL) { 1223 goto done; 1224 } 1225 chan->closing->ref = ref; 1226 1227 res = 0; 1228done: 1229 PyThread_release_lock(chan->mutex); 1230 return res; 1231} 1232 1233static void 1234_channel_clear_closing(struct _channel *chan) { 1235 PyThread_acquire_lock(chan->mutex, WAIT_LOCK); 1236 if (chan->closing != NULL) { 1237 PyMem_Free(chan->closing); 1238 chan->closing = NULL; 1239 } 1240 PyThread_release_lock(chan->mutex); 1241} 1242 1243static void 1244_channel_finish_closing(struct _channel *chan) { 1245 struct _channel_closing *closing = chan->closing; 1246 if (closing == NULL) { 1247 return; 1248 } 1249 _channelref *ref = closing->ref; 1250 _channel_clear_closing(chan); 1251 // Do the things that would have been done in _channels_close(). 1252 ref->chan = NULL; 1253 _channel_free(chan); 1254} 1255 1256/* "high"-level channel-related functions */ 1257 1258static int64_t 1259_channel_create(_channels *channels) 1260{ 1261 _PyChannelState *chan = _channel_new(); 1262 if (chan == NULL) { 1263 return -1; 1264 } 1265 int64_t id = _channels_add(channels, chan); 1266 if (id < 0) { 1267 _channel_free(chan); 1268 return -1; 1269 } 1270 return id; 1271} 1272 1273static int 1274_channel_destroy(_channels *channels, int64_t id) 1275{ 1276 _PyChannelState *chan = NULL; 1277 if (_channels_remove(channels, id, &chan) != 0) { 1278 return -1; 1279 } 1280 if (chan != NULL) { 1281 _channel_free(chan); 1282 } 1283 return 0; 1284} 1285 1286static int 1287_channel_send(_channels *channels, int64_t id, PyObject *obj) 1288{ 1289 PyInterpreterState *interp = _get_current(); 1290 if (interp == NULL) { 1291 return -1; 1292 } 1293 1294 // Look up the channel. 1295 PyThread_type_lock mutex = NULL; 1296 _PyChannelState *chan = _channels_lookup(channels, id, &mutex); 1297 if (chan == NULL) { 1298 return -1; 1299 } 1300 // Past this point we are responsible for releasing the mutex. 1301 1302 if (chan->closing != NULL) { 1303 PyErr_Format(ChannelClosedError, "channel %" PRId64 " closed", id); 1304 PyThread_release_lock(mutex); 1305 return -1; 1306 } 1307 1308 // Convert the object to cross-interpreter data. 1309 _PyCrossInterpreterData *data = PyMem_NEW(_PyCrossInterpreterData, 1); 1310 if (data == NULL) { 1311 PyThread_release_lock(mutex); 1312 return -1; 1313 } 1314 if (_PyObject_GetCrossInterpreterData(obj, data) != 0) { 1315 PyThread_release_lock(mutex); 1316 PyMem_Free(data); 1317 return -1; 1318 } 1319 1320 // Add the data to the channel. 1321 int res = _channel_add(chan, PyInterpreterState_GetID(interp), data); 1322 PyThread_release_lock(mutex); 1323 if (res != 0) { 1324 _PyCrossInterpreterData_Release(data); 1325 PyMem_Free(data); 1326 return -1; 1327 } 1328 1329 return 0; 1330} 1331 1332static PyObject * 1333_channel_recv(_channels *channels, int64_t id) 1334{ 1335 PyInterpreterState *interp = _get_current(); 1336 if (interp == NULL) { 1337 return NULL; 1338 } 1339 1340 // Look up the channel. 1341 PyThread_type_lock mutex = NULL; 1342 _PyChannelState *chan = _channels_lookup(channels, id, &mutex); 1343 if (chan == NULL) { 1344 return NULL; 1345 } 1346 // Past this point we are responsible for releasing the mutex. 1347 1348 // Pop off the next item from the channel. 1349 _PyCrossInterpreterData *data = _channel_next(chan, PyInterpreterState_GetID(interp)); 1350 PyThread_release_lock(mutex); 1351 if (data == NULL) { 1352 return NULL; 1353 } 1354 1355 // Convert the data back to an object. 1356 PyObject *obj = _PyCrossInterpreterData_NewObject(data); 1357 _PyCrossInterpreterData_Release(data); 1358 PyMem_Free(data); 1359 if (obj == NULL) { 1360 return NULL; 1361 } 1362 1363 return obj; 1364} 1365 1366static int 1367_channel_drop(_channels *channels, int64_t id, int send, int recv) 1368{ 1369 PyInterpreterState *interp = _get_current(); 1370 if (interp == NULL) { 1371 return -1; 1372 } 1373 1374 // Look up the channel. 1375 PyThread_type_lock mutex = NULL; 1376 _PyChannelState *chan = _channels_lookup(channels, id, &mutex); 1377 if (chan == NULL) { 1378 return -1; 1379 } 1380 // Past this point we are responsible for releasing the mutex. 1381 1382 // Close one or both of the two ends. 1383 int res = _channel_close_interpreter(chan, PyInterpreterState_GetID(interp), send-recv); 1384 PyThread_release_lock(mutex); 1385 return res; 1386} 1387 1388static int 1389_channel_close(_channels *channels, int64_t id, int end, int force) 1390{ 1391 return _channels_close(channels, id, NULL, end, force); 1392} 1393 1394static int 1395_channel_is_associated(_channels *channels, int64_t cid, int64_t interp, 1396 int send) 1397{ 1398 _PyChannelState *chan = _channels_lookup(channels, cid, NULL); 1399 if (chan == NULL) { 1400 return -1; 1401 } else if (send && chan->closing != NULL) { 1402 PyErr_Format(ChannelClosedError, "channel %" PRId64 " closed", cid); 1403 return -1; 1404 } 1405 1406 _channelend *end = _channelend_find(send ? chan->ends->send : chan->ends->recv, 1407 interp, NULL); 1408 1409 return (end != NULL && end->open); 1410} 1411 1412/* ChannelID class */ 1413 1414static PyTypeObject ChannelIDtype; 1415 1416typedef struct channelid { 1417 PyObject_HEAD 1418 int64_t id; 1419 int end; 1420 int resolve; 1421 _channels *channels; 1422} channelid; 1423 1424static int 1425channel_id_converter(PyObject *arg, void *ptr) 1426{ 1427 int64_t cid; 1428 if (PyObject_TypeCheck(arg, &ChannelIDtype)) { 1429 cid = ((channelid *)arg)->id; 1430 } 1431 else if (PyIndex_Check(arg)) { 1432 cid = PyLong_AsLongLong(arg); 1433 if (cid == -1 && PyErr_Occurred()) { 1434 return 0; 1435 } 1436 if (cid < 0) { 1437 PyErr_Format(PyExc_ValueError, 1438 "channel ID must be a non-negative int, got %R", arg); 1439 return 0; 1440 } 1441 } 1442 else { 1443 PyErr_Format(PyExc_TypeError, 1444 "channel ID must be an int, got %.100s", 1445 Py_TYPE(arg)->tp_name); 1446 return 0; 1447 } 1448 *(int64_t *)ptr = cid; 1449 return 1; 1450} 1451 1452static channelid * 1453newchannelid(PyTypeObject *cls, int64_t cid, int end, _channels *channels, 1454 int force, int resolve) 1455{ 1456 channelid *self = PyObject_New(channelid, cls); 1457 if (self == NULL) { 1458 return NULL; 1459 } 1460 self->id = cid; 1461 self->end = end; 1462 self->resolve = resolve; 1463 self->channels = channels; 1464 1465 if (_channels_add_id_object(channels, cid) != 0) { 1466 if (force && PyErr_ExceptionMatches(ChannelNotFoundError)) { 1467 PyErr_Clear(); 1468 } 1469 else { 1470 Py_DECREF((PyObject *)self); 1471 return NULL; 1472 } 1473 } 1474 1475 return self; 1476} 1477 1478static _channels * _global_channels(void); 1479 1480static PyObject * 1481channelid_new(PyTypeObject *cls, PyObject *args, PyObject *kwds) 1482{ 1483 static char *kwlist[] = {"id", "send", "recv", "force", "_resolve", NULL}; 1484 int64_t cid; 1485 int send = -1; 1486 int recv = -1; 1487 int force = 0; 1488 int resolve = 0; 1489 if (!PyArg_ParseTupleAndKeywords(args, kwds, 1490 "O&|$pppp:ChannelID.__new__", kwlist, 1491 channel_id_converter, &cid, &send, &recv, &force, &resolve)) 1492 return NULL; 1493 1494 // Handle "send" and "recv". 1495 if (send == 0 && recv == 0) { 1496 PyErr_SetString(PyExc_ValueError, 1497 "'send' and 'recv' cannot both be False"); 1498 return NULL; 1499 } 1500 1501 int end = 0; 1502 if (send == 1) { 1503 if (recv == 0 || recv == -1) { 1504 end = CHANNEL_SEND; 1505 } 1506 } 1507 else if (recv == 1) { 1508 end = CHANNEL_RECV; 1509 } 1510 1511 return (PyObject *)newchannelid(cls, cid, end, _global_channels(), 1512 force, resolve); 1513} 1514 1515static void 1516channelid_dealloc(PyObject *v) 1517{ 1518 int64_t cid = ((channelid *)v)->id; 1519 _channels *channels = ((channelid *)v)->channels; 1520 Py_TYPE(v)->tp_free(v); 1521 1522 _channels_drop_id_object(channels, cid); 1523} 1524 1525static PyObject * 1526channelid_repr(PyObject *self) 1527{ 1528 PyTypeObject *type = Py_TYPE(self); 1529 const char *name = _PyType_Name(type); 1530 1531 channelid *cid = (channelid *)self; 1532 const char *fmt; 1533 if (cid->end == CHANNEL_SEND) { 1534 fmt = "%s(%" PRId64 ", send=True)"; 1535 } 1536 else if (cid->end == CHANNEL_RECV) { 1537 fmt = "%s(%" PRId64 ", recv=True)"; 1538 } 1539 else { 1540 fmt = "%s(%" PRId64 ")"; 1541 } 1542 return PyUnicode_FromFormat(fmt, name, cid->id); 1543} 1544 1545static PyObject * 1546channelid_str(PyObject *self) 1547{ 1548 channelid *cid = (channelid *)self; 1549 return PyUnicode_FromFormat("%" PRId64 "", cid->id); 1550} 1551 1552static PyObject * 1553channelid_int(PyObject *self) 1554{ 1555 channelid *cid = (channelid *)self; 1556 return PyLong_FromLongLong(cid->id); 1557} 1558 1559static PyNumberMethods channelid_as_number = { 1560 0, /* nb_add */ 1561 0, /* nb_subtract */ 1562 0, /* nb_multiply */ 1563 0, /* nb_remainder */ 1564 0, /* nb_divmod */ 1565 0, /* nb_power */ 1566 0, /* nb_negative */ 1567 0, /* nb_positive */ 1568 0, /* nb_absolute */ 1569 0, /* nb_bool */ 1570 0, /* nb_invert */ 1571 0, /* nb_lshift */ 1572 0, /* nb_rshift */ 1573 0, /* nb_and */ 1574 0, /* nb_xor */ 1575 0, /* nb_or */ 1576 (unaryfunc)channelid_int, /* nb_int */ 1577 0, /* nb_reserved */ 1578 0, /* nb_float */ 1579 1580 0, /* nb_inplace_add */ 1581 0, /* nb_inplace_subtract */ 1582 0, /* nb_inplace_multiply */ 1583 0, /* nb_inplace_remainder */ 1584 0, /* nb_inplace_power */ 1585 0, /* nb_inplace_lshift */ 1586 0, /* nb_inplace_rshift */ 1587 0, /* nb_inplace_and */ 1588 0, /* nb_inplace_xor */ 1589 0, /* nb_inplace_or */ 1590 1591 0, /* nb_floor_divide */ 1592 0, /* nb_true_divide */ 1593 0, /* nb_inplace_floor_divide */ 1594 0, /* nb_inplace_true_divide */ 1595 1596 (unaryfunc)channelid_int, /* nb_index */ 1597}; 1598 1599static Py_hash_t 1600channelid_hash(PyObject *self) 1601{ 1602 channelid *cid = (channelid *)self; 1603 PyObject *id = PyLong_FromLongLong(cid->id); 1604 if (id == NULL) { 1605 return -1; 1606 } 1607 Py_hash_t hash = PyObject_Hash(id); 1608 Py_DECREF(id); 1609 return hash; 1610} 1611 1612static PyObject * 1613channelid_richcompare(PyObject *self, PyObject *other, int op) 1614{ 1615 if (op != Py_EQ && op != Py_NE) { 1616 Py_RETURN_NOTIMPLEMENTED; 1617 } 1618 1619 if (!PyObject_TypeCheck(self, &ChannelIDtype)) { 1620 Py_RETURN_NOTIMPLEMENTED; 1621 } 1622 1623 channelid *cid = (channelid *)self; 1624 int equal; 1625 if (PyObject_TypeCheck(other, &ChannelIDtype)) { 1626 channelid *othercid = (channelid *)other; 1627 equal = (cid->end == othercid->end) && (cid->id == othercid->id); 1628 } 1629 else if (PyLong_Check(other)) { 1630 /* Fast path */ 1631 int overflow; 1632 long long othercid = PyLong_AsLongLongAndOverflow(other, &overflow); 1633 if (othercid == -1 && PyErr_Occurred()) { 1634 return NULL; 1635 } 1636 equal = !overflow && (othercid >= 0) && (cid->id == othercid); 1637 } 1638 else if (PyNumber_Check(other)) { 1639 PyObject *pyid = PyLong_FromLongLong(cid->id); 1640 if (pyid == NULL) { 1641 return NULL; 1642 } 1643 PyObject *res = PyObject_RichCompare(pyid, other, op); 1644 Py_DECREF(pyid); 1645 return res; 1646 } 1647 else { 1648 Py_RETURN_NOTIMPLEMENTED; 1649 } 1650 1651 if ((op == Py_EQ && equal) || (op == Py_NE && !equal)) { 1652 Py_RETURN_TRUE; 1653 } 1654 Py_RETURN_FALSE; 1655} 1656 1657static PyObject * 1658_channel_from_cid(PyObject *cid, int end) 1659{ 1660 PyObject *highlevel = PyImport_ImportModule("interpreters"); 1661 if (highlevel == NULL) { 1662 PyErr_Clear(); 1663 highlevel = PyImport_ImportModule("test.support.interpreters"); 1664 if (highlevel == NULL) { 1665 return NULL; 1666 } 1667 } 1668 const char *clsname = (end == CHANNEL_RECV) ? "RecvChannel" : 1669 "SendChannel"; 1670 PyObject *cls = PyObject_GetAttrString(highlevel, clsname); 1671 Py_DECREF(highlevel); 1672 if (cls == NULL) { 1673 return NULL; 1674 } 1675 PyObject *chan = PyObject_CallFunctionObjArgs(cls, cid, NULL); 1676 Py_DECREF(cls); 1677 if (chan == NULL) { 1678 return NULL; 1679 } 1680 return chan; 1681} 1682 1683struct _channelid_xid { 1684 int64_t id; 1685 int end; 1686 int resolve; 1687}; 1688 1689static PyObject * 1690_channelid_from_xid(_PyCrossInterpreterData *data) 1691{ 1692 struct _channelid_xid *xid = (struct _channelid_xid *)data->data; 1693 // Note that we do not preserve the "resolve" flag. 1694 PyObject *cid = (PyObject *)newchannelid(&ChannelIDtype, xid->id, xid->end, 1695 _global_channels(), 0, 0); 1696 if (xid->end == 0) { 1697 return cid; 1698 } 1699 if (!xid->resolve) { 1700 return cid; 1701 } 1702 1703 /* Try returning a high-level channel end but fall back to the ID. */ 1704 PyObject *chan = _channel_from_cid(cid, xid->end); 1705 if (chan == NULL) { 1706 PyErr_Clear(); 1707 return cid; 1708 } 1709 Py_DECREF(cid); 1710 return chan; 1711} 1712 1713static int 1714_channelid_shared(PyObject *obj, _PyCrossInterpreterData *data) 1715{ 1716 struct _channelid_xid *xid = PyMem_NEW(struct _channelid_xid, 1); 1717 if (xid == NULL) { 1718 return -1; 1719 } 1720 xid->id = ((channelid *)obj)->id; 1721 xid->end = ((channelid *)obj)->end; 1722 xid->resolve = ((channelid *)obj)->resolve; 1723 1724 data->data = xid; 1725 Py_INCREF(obj); 1726 data->obj = obj; 1727 data->new_object = _channelid_from_xid; 1728 data->free = PyMem_Free; 1729 return 0; 1730} 1731 1732static PyObject * 1733channelid_end(PyObject *self, void *end) 1734{ 1735 int force = 1; 1736 channelid *cid = (channelid *)self; 1737 if (end != NULL) { 1738 return (PyObject *)newchannelid(Py_TYPE(self), cid->id, *(int *)end, 1739 cid->channels, force, cid->resolve); 1740 } 1741 1742 if (cid->end == CHANNEL_SEND) { 1743 return PyUnicode_InternFromString("send"); 1744 } 1745 if (cid->end == CHANNEL_RECV) { 1746 return PyUnicode_InternFromString("recv"); 1747 } 1748 return PyUnicode_InternFromString("both"); 1749} 1750 1751static int _channelid_end_send = CHANNEL_SEND; 1752static int _channelid_end_recv = CHANNEL_RECV; 1753 1754static PyGetSetDef channelid_getsets[] = { 1755 {"end", (getter)channelid_end, NULL, 1756 PyDoc_STR("'send', 'recv', or 'both'")}, 1757 {"send", (getter)channelid_end, NULL, 1758 PyDoc_STR("the 'send' end of the channel"), &_channelid_end_send}, 1759 {"recv", (getter)channelid_end, NULL, 1760 PyDoc_STR("the 'recv' end of the channel"), &_channelid_end_recv}, 1761 {NULL} 1762}; 1763 1764PyDoc_STRVAR(channelid_doc, 1765"A channel ID identifies a channel and may be used as an int."); 1766 1767static PyTypeObject ChannelIDtype = { 1768 PyVarObject_HEAD_INIT(&PyType_Type, 0) 1769 "_xxsubinterpreters.ChannelID", /* tp_name */ 1770 sizeof(channelid), /* tp_basicsize */ 1771 0, /* tp_itemsize */ 1772 (destructor)channelid_dealloc, /* tp_dealloc */ 1773 0, /* tp_vectorcall_offset */ 1774 0, /* tp_getattr */ 1775 0, /* tp_setattr */ 1776 0, /* tp_as_async */ 1777 (reprfunc)channelid_repr, /* tp_repr */ 1778 &channelid_as_number, /* tp_as_number */ 1779 0, /* tp_as_sequence */ 1780 0, /* tp_as_mapping */ 1781 channelid_hash, /* tp_hash */ 1782 0, /* tp_call */ 1783 (reprfunc)channelid_str, /* tp_str */ 1784 0, /* tp_getattro */ 1785 0, /* tp_setattro */ 1786 0, /* tp_as_buffer */ 1787 // Use Py_TPFLAGS_DISALLOW_INSTANTIATION so the type cannot be instantiated 1788 // from Python code. We do this because there is a strong relationship 1789 // between channel IDs and the channel lifecycle, so this limitation avoids 1790 // related complications. Use the _channel_id() function instead. 1791 Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE 1792 | Py_TPFLAGS_DISALLOW_INSTANTIATION, /* tp_flags */ 1793 channelid_doc, /* tp_doc */ 1794 0, /* tp_traverse */ 1795 0, /* tp_clear */ 1796 channelid_richcompare, /* tp_richcompare */ 1797 0, /* tp_weaklistoffset */ 1798 0, /* tp_iter */ 1799 0, /* tp_iternext */ 1800 0, /* tp_methods */ 1801 0, /* tp_members */ 1802 channelid_getsets, /* tp_getset */ 1803}; 1804 1805 1806/* interpreter-specific code ************************************************/ 1807 1808static PyObject * RunFailedError = NULL; 1809 1810static int 1811interp_exceptions_init(PyObject *ns) 1812{ 1813 // XXX Move the exceptions into per-module memory? 1814 1815 if (RunFailedError == NULL) { 1816 // An uncaught exception came out of interp_run_string(). 1817 RunFailedError = PyErr_NewException("_xxsubinterpreters.RunFailedError", 1818 PyExc_RuntimeError, NULL); 1819 if (RunFailedError == NULL) { 1820 return -1; 1821 } 1822 if (PyDict_SetItemString(ns, "RunFailedError", RunFailedError) != 0) { 1823 return -1; 1824 } 1825 } 1826 1827 return 0; 1828} 1829 1830static int 1831_is_running(PyInterpreterState *interp) 1832{ 1833 PyThreadState *tstate = PyInterpreterState_ThreadHead(interp); 1834 if (PyThreadState_Next(tstate) != NULL) { 1835 PyErr_SetString(PyExc_RuntimeError, 1836 "interpreter has more than one thread"); 1837 return -1; 1838 } 1839 1840 assert(!PyErr_Occurred()); 1841 _PyInterpreterFrame *frame = tstate->cframe->current_frame; 1842 if (frame == NULL) { 1843 return 0; 1844 } 1845 return 1; 1846} 1847 1848static int 1849_ensure_not_running(PyInterpreterState *interp) 1850{ 1851 int is_running = _is_running(interp); 1852 if (is_running < 0) { 1853 return -1; 1854 } 1855 if (is_running) { 1856 PyErr_Format(PyExc_RuntimeError, "interpreter already running"); 1857 return -1; 1858 } 1859 return 0; 1860} 1861 1862static int 1863_run_script(PyInterpreterState *interp, const char *codestr, 1864 _sharedns *shared, _sharedexception **exc) 1865{ 1866 PyObject *exctype = NULL; 1867 PyObject *excval = NULL; 1868 PyObject *tb = NULL; 1869 1870 PyObject *main_mod = _PyInterpreterState_GetMainModule(interp); 1871 if (main_mod == NULL) { 1872 goto error; 1873 } 1874 PyObject *ns = PyModule_GetDict(main_mod); // borrowed 1875 Py_DECREF(main_mod); 1876 if (ns == NULL) { 1877 goto error; 1878 } 1879 Py_INCREF(ns); 1880 1881 // Apply the cross-interpreter data. 1882 if (shared != NULL) { 1883 if (_sharedns_apply(shared, ns) != 0) { 1884 Py_DECREF(ns); 1885 goto error; 1886 } 1887 } 1888 1889 // Run the string (see PyRun_SimpleStringFlags). 1890 PyObject *result = PyRun_StringFlags(codestr, Py_file_input, ns, ns, NULL); 1891 Py_DECREF(ns); 1892 if (result == NULL) { 1893 goto error; 1894 } 1895 else { 1896 Py_DECREF(result); // We throw away the result. 1897 } 1898 1899 *exc = NULL; 1900 return 0; 1901 1902error: 1903 PyErr_Fetch(&exctype, &excval, &tb); 1904 1905 _sharedexception *sharedexc = _sharedexception_bind(exctype, excval, tb); 1906 Py_XDECREF(exctype); 1907 Py_XDECREF(excval); 1908 Py_XDECREF(tb); 1909 if (sharedexc == NULL) { 1910 fprintf(stderr, "RunFailedError: script raised an uncaught exception"); 1911 PyErr_Clear(); 1912 sharedexc = NULL; 1913 } 1914 else { 1915 assert(!PyErr_Occurred()); 1916 } 1917 *exc = sharedexc; 1918 return -1; 1919} 1920 1921static int 1922_run_script_in_interpreter(PyInterpreterState *interp, const char *codestr, 1923 PyObject *shareables) 1924{ 1925 if (_ensure_not_running(interp) < 0) { 1926 return -1; 1927 } 1928 1929 _sharedns *shared = _get_shared_ns(shareables); 1930 if (shared == NULL && PyErr_Occurred()) { 1931 return -1; 1932 } 1933 1934 // Switch to interpreter. 1935 PyThreadState *save_tstate = NULL; 1936 if (interp != PyInterpreterState_Get()) { 1937 // XXX Using the "head" thread isn't strictly correct. 1938 PyThreadState *tstate = PyInterpreterState_ThreadHead(interp); 1939 // XXX Possible GILState issues? 1940 save_tstate = PyThreadState_Swap(tstate); 1941 } 1942 1943 // Run the script. 1944 _sharedexception *exc = NULL; 1945 int result = _run_script(interp, codestr, shared, &exc); 1946 1947 // Switch back. 1948 if (save_tstate != NULL) { 1949 PyThreadState_Swap(save_tstate); 1950 } 1951 1952 // Propagate any exception out to the caller. 1953 if (exc != NULL) { 1954 _sharedexception_apply(exc, RunFailedError); 1955 _sharedexception_free(exc); 1956 } 1957 else if (result != 0) { 1958 // We were unable to allocate a shared exception. 1959 PyErr_NoMemory(); 1960 } 1961 1962 if (shared != NULL) { 1963 _sharedns_free(shared); 1964 } 1965 1966 return result; 1967} 1968 1969 1970/* module level code ********************************************************/ 1971 1972/* globals is the process-global state for the module. It holds all 1973 the data that we need to share between interpreters, so it cannot 1974 hold PyObject values. */ 1975static struct globals { 1976 _channels channels; 1977} _globals = {{0}}; 1978 1979static int 1980_init_globals(void) 1981{ 1982 if (_channels_init(&_globals.channels) != 0) { 1983 return -1; 1984 } 1985 return 0; 1986} 1987 1988static _channels * 1989_global_channels(void) { 1990 return &_globals.channels; 1991} 1992 1993static PyObject * 1994interp_create(PyObject *self, PyObject *args, PyObject *kwds) 1995{ 1996 1997 static char *kwlist[] = {"isolated", NULL}; 1998 int isolated = 1; 1999 if (!PyArg_ParseTupleAndKeywords(args, kwds, "|$i:create", kwlist, 2000 &isolated)) { 2001 return NULL; 2002 } 2003 2004 // Create and initialize the new interpreter. 2005 PyThreadState *save_tstate = _PyThreadState_GET(); 2006 // XXX Possible GILState issues? 2007 PyThreadState *tstate = _Py_NewInterpreter(isolated); 2008 PyThreadState_Swap(save_tstate); 2009 if (tstate == NULL) { 2010 /* Since no new thread state was created, there is no exception to 2011 propagate; raise a fresh one after swapping in the old thread 2012 state. */ 2013 PyErr_SetString(PyExc_RuntimeError, "interpreter creation failed"); 2014 return NULL; 2015 } 2016 PyInterpreterState *interp = PyThreadState_GetInterpreter(tstate); 2017 PyObject *idobj = _PyInterpreterState_GetIDObject(interp); 2018 if (idobj == NULL) { 2019 // XXX Possible GILState issues? 2020 save_tstate = PyThreadState_Swap(tstate); 2021 Py_EndInterpreter(tstate); 2022 PyThreadState_Swap(save_tstate); 2023 return NULL; 2024 } 2025 _PyInterpreterState_RequireIDRef(interp, 1); 2026 return idobj; 2027} 2028 2029PyDoc_STRVAR(create_doc, 2030"create() -> ID\n\ 2031\n\ 2032Create a new interpreter and return a unique generated ID."); 2033 2034 2035static PyObject * 2036interp_destroy(PyObject *self, PyObject *args, PyObject *kwds) 2037{ 2038 static char *kwlist[] = {"id", NULL}; 2039 PyObject *id; 2040 // XXX Use "L" for id? 2041 if (!PyArg_ParseTupleAndKeywords(args, kwds, 2042 "O:destroy", kwlist, &id)) { 2043 return NULL; 2044 } 2045 2046 // Look up the interpreter. 2047 PyInterpreterState *interp = _PyInterpreterID_LookUp(id); 2048 if (interp == NULL) { 2049 return NULL; 2050 } 2051 2052 // Ensure we don't try to destroy the current interpreter. 2053 PyInterpreterState *current = _get_current(); 2054 if (current == NULL) { 2055 return NULL; 2056 } 2057 if (interp == current) { 2058 PyErr_SetString(PyExc_RuntimeError, 2059 "cannot destroy the current interpreter"); 2060 return NULL; 2061 } 2062 2063 // Ensure the interpreter isn't running. 2064 /* XXX We *could* support destroying a running interpreter but 2065 aren't going to worry about it for now. */ 2066 if (_ensure_not_running(interp) < 0) { 2067 return NULL; 2068 } 2069 2070 // Destroy the interpreter. 2071 PyThreadState *tstate = PyInterpreterState_ThreadHead(interp); 2072 // XXX Possible GILState issues? 2073 PyThreadState *save_tstate = PyThreadState_Swap(tstate); 2074 Py_EndInterpreter(tstate); 2075 PyThreadState_Swap(save_tstate); 2076 2077 Py_RETURN_NONE; 2078} 2079 2080PyDoc_STRVAR(destroy_doc, 2081"destroy(id)\n\ 2082\n\ 2083Destroy the identified interpreter.\n\ 2084\n\ 2085Attempting to destroy the current interpreter results in a RuntimeError.\n\ 2086So does an unrecognized ID."); 2087 2088 2089static PyObject * 2090interp_list_all(PyObject *self, PyObject *Py_UNUSED(ignored)) 2091{ 2092 PyObject *ids, *id; 2093 PyInterpreterState *interp; 2094 2095 ids = PyList_New(0); 2096 if (ids == NULL) { 2097 return NULL; 2098 } 2099 2100 interp = PyInterpreterState_Head(); 2101 while (interp != NULL) { 2102 id = _PyInterpreterState_GetIDObject(interp); 2103 if (id == NULL) { 2104 Py_DECREF(ids); 2105 return NULL; 2106 } 2107 // insert at front of list 2108 int res = PyList_Insert(ids, 0, id); 2109 Py_DECREF(id); 2110 if (res < 0) { 2111 Py_DECREF(ids); 2112 return NULL; 2113 } 2114 2115 interp = PyInterpreterState_Next(interp); 2116 } 2117 2118 return ids; 2119} 2120 2121PyDoc_STRVAR(list_all_doc, 2122"list_all() -> [ID]\n\ 2123\n\ 2124Return a list containing the ID of every existing interpreter."); 2125 2126 2127static PyObject * 2128interp_get_current(PyObject *self, PyObject *Py_UNUSED(ignored)) 2129{ 2130 PyInterpreterState *interp =_get_current(); 2131 if (interp == NULL) { 2132 return NULL; 2133 } 2134 return _PyInterpreterState_GetIDObject(interp); 2135} 2136 2137PyDoc_STRVAR(get_current_doc, 2138"get_current() -> ID\n\ 2139\n\ 2140Return the ID of current interpreter."); 2141 2142 2143static PyObject * 2144interp_get_main(PyObject *self, PyObject *Py_UNUSED(ignored)) 2145{ 2146 // Currently, 0 is always the main interpreter. 2147 int64_t id = 0; 2148 return _PyInterpreterID_New(id); 2149} 2150 2151PyDoc_STRVAR(get_main_doc, 2152"get_main() -> ID\n\ 2153\n\ 2154Return the ID of main interpreter."); 2155 2156 2157static PyObject * 2158interp_run_string(PyObject *self, PyObject *args, PyObject *kwds) 2159{ 2160 static char *kwlist[] = {"id", "script", "shared", NULL}; 2161 PyObject *id, *code; 2162 PyObject *shared = NULL; 2163 if (!PyArg_ParseTupleAndKeywords(args, kwds, 2164 "OU|O:run_string", kwlist, 2165 &id, &code, &shared)) { 2166 return NULL; 2167 } 2168 2169 // Look up the interpreter. 2170 PyInterpreterState *interp = _PyInterpreterID_LookUp(id); 2171 if (interp == NULL) { 2172 return NULL; 2173 } 2174 2175 // Extract code. 2176 Py_ssize_t size; 2177 const char *codestr = PyUnicode_AsUTF8AndSize(code, &size); 2178 if (codestr == NULL) { 2179 return NULL; 2180 } 2181 if (strlen(codestr) != (size_t)size) { 2182 PyErr_SetString(PyExc_ValueError, 2183 "source code string cannot contain null bytes"); 2184 return NULL; 2185 } 2186 2187 // Run the code in the interpreter. 2188 if (_run_script_in_interpreter(interp, codestr, shared) != 0) { 2189 return NULL; 2190 } 2191 Py_RETURN_NONE; 2192} 2193 2194PyDoc_STRVAR(run_string_doc, 2195"run_string(id, script, shared)\n\ 2196\n\ 2197Execute the provided string in the identified interpreter.\n\ 2198\n\ 2199See PyRun_SimpleStrings."); 2200 2201 2202static PyObject * 2203object_is_shareable(PyObject *self, PyObject *args, PyObject *kwds) 2204{ 2205 static char *kwlist[] = {"obj", NULL}; 2206 PyObject *obj; 2207 if (!PyArg_ParseTupleAndKeywords(args, kwds, 2208 "O:is_shareable", kwlist, &obj)) { 2209 return NULL; 2210 } 2211 2212 if (_PyObject_CheckCrossInterpreterData(obj) == 0) { 2213 Py_RETURN_TRUE; 2214 } 2215 PyErr_Clear(); 2216 Py_RETURN_FALSE; 2217} 2218 2219PyDoc_STRVAR(is_shareable_doc, 2220"is_shareable(obj) -> bool\n\ 2221\n\ 2222Return True if the object's data may be shared between interpreters and\n\ 2223False otherwise."); 2224 2225 2226static PyObject * 2227interp_is_running(PyObject *self, PyObject *args, PyObject *kwds) 2228{ 2229 static char *kwlist[] = {"id", NULL}; 2230 PyObject *id; 2231 if (!PyArg_ParseTupleAndKeywords(args, kwds, 2232 "O:is_running", kwlist, &id)) { 2233 return NULL; 2234 } 2235 2236 PyInterpreterState *interp = _PyInterpreterID_LookUp(id); 2237 if (interp == NULL) { 2238 return NULL; 2239 } 2240 int is_running = _is_running(interp); 2241 if (is_running < 0) { 2242 return NULL; 2243 } 2244 if (is_running) { 2245 Py_RETURN_TRUE; 2246 } 2247 Py_RETURN_FALSE; 2248} 2249 2250PyDoc_STRVAR(is_running_doc, 2251"is_running(id) -> bool\n\ 2252\n\ 2253Return whether or not the identified interpreter is running."); 2254 2255static PyObject * 2256channel_create(PyObject *self, PyObject *Py_UNUSED(ignored)) 2257{ 2258 int64_t cid = _channel_create(&_globals.channels); 2259 if (cid < 0) { 2260 return NULL; 2261 } 2262 PyObject *id = (PyObject *)newchannelid(&ChannelIDtype, cid, 0, 2263 &_globals.channels, 0, 0); 2264 if (id == NULL) { 2265 if (_channel_destroy(&_globals.channels, cid) != 0) { 2266 // XXX issue a warning? 2267 } 2268 return NULL; 2269 } 2270 assert(((channelid *)id)->channels != NULL); 2271 return id; 2272} 2273 2274PyDoc_STRVAR(channel_create_doc, 2275"channel_create() -> cid\n\ 2276\n\ 2277Create a new cross-interpreter channel and return a unique generated ID."); 2278 2279static PyObject * 2280channel_destroy(PyObject *self, PyObject *args, PyObject *kwds) 2281{ 2282 static char *kwlist[] = {"cid", NULL}; 2283 int64_t cid; 2284 if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&:channel_destroy", kwlist, 2285 channel_id_converter, &cid)) { 2286 return NULL; 2287 } 2288 2289 if (_channel_destroy(&_globals.channels, cid) != 0) { 2290 return NULL; 2291 } 2292 Py_RETURN_NONE; 2293} 2294 2295PyDoc_STRVAR(channel_destroy_doc, 2296"channel_destroy(cid)\n\ 2297\n\ 2298Close and finalize the channel. Afterward attempts to use the channel\n\ 2299will behave as though it never existed."); 2300 2301static PyObject * 2302channel_list_all(PyObject *self, PyObject *Py_UNUSED(ignored)) 2303{ 2304 int64_t count = 0; 2305 int64_t *cids = _channels_list_all(&_globals.channels, &count); 2306 if (cids == NULL) { 2307 if (count == 0) { 2308 return PyList_New(0); 2309 } 2310 return NULL; 2311 } 2312 PyObject *ids = PyList_New((Py_ssize_t)count); 2313 if (ids == NULL) { 2314 goto finally; 2315 } 2316 int64_t *cur = cids; 2317 for (int64_t i=0; i < count; cur++, i++) { 2318 PyObject *id = (PyObject *)newchannelid(&ChannelIDtype, *cur, 0, 2319 &_globals.channels, 0, 0); 2320 if (id == NULL) { 2321 Py_DECREF(ids); 2322 ids = NULL; 2323 break; 2324 } 2325 PyList_SET_ITEM(ids, i, id); 2326 } 2327 2328finally: 2329 PyMem_Free(cids); 2330 return ids; 2331} 2332 2333PyDoc_STRVAR(channel_list_all_doc, 2334"channel_list_all() -> [cid]\n\ 2335\n\ 2336Return the list of all IDs for active channels."); 2337 2338static PyObject * 2339channel_list_interpreters(PyObject *self, PyObject *args, PyObject *kwds) 2340{ 2341 static char *kwlist[] = {"cid", "send", NULL}; 2342 int64_t cid; /* Channel ID */ 2343 int send = 0; /* Send or receive end? */ 2344 int64_t id; 2345 PyObject *ids, *id_obj; 2346 PyInterpreterState *interp; 2347 2348 if (!PyArg_ParseTupleAndKeywords( 2349 args, kwds, "O&$p:channel_list_interpreters", 2350 kwlist, channel_id_converter, &cid, &send)) { 2351 return NULL; 2352 } 2353 2354 ids = PyList_New(0); 2355 if (ids == NULL) { 2356 goto except; 2357 } 2358 2359 interp = PyInterpreterState_Head(); 2360 while (interp != NULL) { 2361 id = PyInterpreterState_GetID(interp); 2362 assert(id >= 0); 2363 int res = _channel_is_associated(&_globals.channels, cid, id, send); 2364 if (res < 0) { 2365 goto except; 2366 } 2367 if (res) { 2368 id_obj = _PyInterpreterState_GetIDObject(interp); 2369 if (id_obj == NULL) { 2370 goto except; 2371 } 2372 res = PyList_Insert(ids, 0, id_obj); 2373 Py_DECREF(id_obj); 2374 if (res < 0) { 2375 goto except; 2376 } 2377 } 2378 interp = PyInterpreterState_Next(interp); 2379 } 2380 2381 goto finally; 2382 2383except: 2384 Py_XDECREF(ids); 2385 ids = NULL; 2386 2387finally: 2388 return ids; 2389} 2390 2391PyDoc_STRVAR(channel_list_interpreters_doc, 2392"channel_list_interpreters(cid, *, send) -> [id]\n\ 2393\n\ 2394Return the list of all interpreter IDs associated with an end of the channel.\n\ 2395\n\ 2396The 'send' argument should be a boolean indicating whether to use the send or\n\ 2397receive end."); 2398 2399 2400static PyObject * 2401channel_send(PyObject *self, PyObject *args, PyObject *kwds) 2402{ 2403 static char *kwlist[] = {"cid", "obj", NULL}; 2404 int64_t cid; 2405 PyObject *obj; 2406 if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&O:channel_send", kwlist, 2407 channel_id_converter, &cid, &obj)) { 2408 return NULL; 2409 } 2410 2411 if (_channel_send(&_globals.channels, cid, obj) != 0) { 2412 return NULL; 2413 } 2414 Py_RETURN_NONE; 2415} 2416 2417PyDoc_STRVAR(channel_send_doc, 2418"channel_send(cid, obj)\n\ 2419\n\ 2420Add the object's data to the channel's queue."); 2421 2422static PyObject * 2423channel_recv(PyObject *self, PyObject *args, PyObject *kwds) 2424{ 2425 static char *kwlist[] = {"cid", "default", NULL}; 2426 int64_t cid; 2427 PyObject *dflt = NULL; 2428 if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&|O:channel_recv", kwlist, 2429 channel_id_converter, &cid, &dflt)) { 2430 return NULL; 2431 } 2432 Py_XINCREF(dflt); 2433 2434 PyObject *obj = _channel_recv(&_globals.channels, cid); 2435 if (obj != NULL) { 2436 Py_XDECREF(dflt); 2437 return obj; 2438 } else if (PyErr_Occurred()) { 2439 Py_XDECREF(dflt); 2440 return NULL; 2441 } else if (dflt != NULL) { 2442 return dflt; 2443 } else { 2444 PyErr_Format(ChannelEmptyError, "channel %" PRId64 " is empty", cid); 2445 return NULL; 2446 } 2447} 2448 2449PyDoc_STRVAR(channel_recv_doc, 2450"channel_recv(cid, [default]) -> obj\n\ 2451\n\ 2452Return a new object from the data at the front of the channel's queue.\n\ 2453\n\ 2454If there is nothing to receive then raise ChannelEmptyError, unless\n\ 2455a default value is provided. In that case return it."); 2456 2457static PyObject * 2458channel_close(PyObject *self, PyObject *args, PyObject *kwds) 2459{ 2460 static char *kwlist[] = {"cid", "send", "recv", "force", NULL}; 2461 int64_t cid; 2462 int send = 0; 2463 int recv = 0; 2464 int force = 0; 2465 if (!PyArg_ParseTupleAndKeywords(args, kwds, 2466 "O&|$ppp:channel_close", kwlist, 2467 channel_id_converter, &cid, &send, &recv, &force)) { 2468 return NULL; 2469 } 2470 2471 if (_channel_close(&_globals.channels, cid, send-recv, force) != 0) { 2472 return NULL; 2473 } 2474 Py_RETURN_NONE; 2475} 2476 2477PyDoc_STRVAR(channel_close_doc, 2478"channel_close(cid, *, send=None, recv=None, force=False)\n\ 2479\n\ 2480Close the channel for all interpreters.\n\ 2481\n\ 2482If the channel is empty then the keyword args are ignored and both\n\ 2483ends are immediately closed. Otherwise, if 'force' is True then\n\ 2484all queued items are released and both ends are immediately\n\ 2485closed.\n\ 2486\n\ 2487If the channel is not empty *and* 'force' is False then following\n\ 2488happens:\n\ 2489\n\ 2490 * recv is True (regardless of send):\n\ 2491 - raise ChannelNotEmptyError\n\ 2492 * recv is None and send is None:\n\ 2493 - raise ChannelNotEmptyError\n\ 2494 * send is True and recv is not True:\n\ 2495 - fully close the 'send' end\n\ 2496 - close the 'recv' end to interpreters not already receiving\n\ 2497 - fully close it once empty\n\ 2498\n\ 2499Closing an already closed channel results in a ChannelClosedError.\n\ 2500\n\ 2501Once the channel's ID has no more ref counts in any interpreter\n\ 2502the channel will be destroyed."); 2503 2504static PyObject * 2505channel_release(PyObject *self, PyObject *args, PyObject *kwds) 2506{ 2507 // Note that only the current interpreter is affected. 2508 static char *kwlist[] = {"cid", "send", "recv", "force", NULL}; 2509 int64_t cid; 2510 int send = 0; 2511 int recv = 0; 2512 int force = 0; 2513 if (!PyArg_ParseTupleAndKeywords(args, kwds, 2514 "O&|$ppp:channel_release", kwlist, 2515 channel_id_converter, &cid, &send, &recv, &force)) { 2516 return NULL; 2517 } 2518 if (send == 0 && recv == 0) { 2519 send = 1; 2520 recv = 1; 2521 } 2522 2523 // XXX Handle force is True. 2524 // XXX Fix implicit release. 2525 2526 if (_channel_drop(&_globals.channels, cid, send, recv) != 0) { 2527 return NULL; 2528 } 2529 Py_RETURN_NONE; 2530} 2531 2532PyDoc_STRVAR(channel_release_doc, 2533"channel_release(cid, *, send=None, recv=None, force=True)\n\ 2534\n\ 2535Close the channel for the current interpreter. 'send' and 'recv'\n\ 2536(bool) may be used to indicate the ends to close. By default both\n\ 2537ends are closed. Closing an already closed end is a noop."); 2538 2539static PyObject * 2540channel__channel_id(PyObject *self, PyObject *args, PyObject *kwds) 2541{ 2542 return channelid_new(&ChannelIDtype, args, kwds); 2543} 2544 2545static PyMethodDef module_functions[] = { 2546 {"create", _PyCFunction_CAST(interp_create), 2547 METH_VARARGS | METH_KEYWORDS, create_doc}, 2548 {"destroy", _PyCFunction_CAST(interp_destroy), 2549 METH_VARARGS | METH_KEYWORDS, destroy_doc}, 2550 {"list_all", interp_list_all, 2551 METH_NOARGS, list_all_doc}, 2552 {"get_current", interp_get_current, 2553 METH_NOARGS, get_current_doc}, 2554 {"get_main", interp_get_main, 2555 METH_NOARGS, get_main_doc}, 2556 {"is_running", _PyCFunction_CAST(interp_is_running), 2557 METH_VARARGS | METH_KEYWORDS, is_running_doc}, 2558 {"run_string", _PyCFunction_CAST(interp_run_string), 2559 METH_VARARGS | METH_KEYWORDS, run_string_doc}, 2560 2561 {"is_shareable", _PyCFunction_CAST(object_is_shareable), 2562 METH_VARARGS | METH_KEYWORDS, is_shareable_doc}, 2563 2564 {"channel_create", channel_create, 2565 METH_NOARGS, channel_create_doc}, 2566 {"channel_destroy", _PyCFunction_CAST(channel_destroy), 2567 METH_VARARGS | METH_KEYWORDS, channel_destroy_doc}, 2568 {"channel_list_all", channel_list_all, 2569 METH_NOARGS, channel_list_all_doc}, 2570 {"channel_list_interpreters", _PyCFunction_CAST(channel_list_interpreters), 2571 METH_VARARGS | METH_KEYWORDS, channel_list_interpreters_doc}, 2572 {"channel_send", _PyCFunction_CAST(channel_send), 2573 METH_VARARGS | METH_KEYWORDS, channel_send_doc}, 2574 {"channel_recv", _PyCFunction_CAST(channel_recv), 2575 METH_VARARGS | METH_KEYWORDS, channel_recv_doc}, 2576 {"channel_close", _PyCFunction_CAST(channel_close), 2577 METH_VARARGS | METH_KEYWORDS, channel_close_doc}, 2578 {"channel_release", _PyCFunction_CAST(channel_release), 2579 METH_VARARGS | METH_KEYWORDS, channel_release_doc}, 2580 {"_channel_id", _PyCFunction_CAST(channel__channel_id), 2581 METH_VARARGS | METH_KEYWORDS, NULL}, 2582 2583 {NULL, NULL} /* sentinel */ 2584}; 2585 2586 2587/* initialization function */ 2588 2589PyDoc_STRVAR(module_doc, 2590"This module provides primitive operations to manage Python interpreters.\n\ 2591The 'interpreters' module provides a more convenient interface."); 2592 2593static struct PyModuleDef interpretersmodule = { 2594 PyModuleDef_HEAD_INIT, 2595 "_xxsubinterpreters", /* m_name */ 2596 module_doc, /* m_doc */ 2597 -1, /* m_size */ 2598 module_functions, /* m_methods */ 2599 NULL, /* m_slots */ 2600 NULL, /* m_traverse */ 2601 NULL, /* m_clear */ 2602 NULL /* m_free */ 2603}; 2604 2605 2606PyMODINIT_FUNC 2607PyInit__xxsubinterpreters(void) 2608{ 2609 if (_init_globals() != 0) { 2610 return NULL; 2611 } 2612 2613 /* Initialize types */ 2614 if (PyType_Ready(&ChannelIDtype) != 0) { 2615 return NULL; 2616 } 2617 2618 /* Create the module */ 2619 PyObject *module = PyModule_Create(&interpretersmodule); 2620 if (module == NULL) { 2621 return NULL; 2622 } 2623 2624 /* Add exception types */ 2625 PyObject *ns = PyModule_GetDict(module); // borrowed 2626 if (interp_exceptions_init(ns) != 0) { 2627 return NULL; 2628 } 2629 if (channel_exceptions_init(ns) != 0) { 2630 return NULL; 2631 } 2632 2633 /* Add other types */ 2634 Py_INCREF(&ChannelIDtype); 2635 if (PyDict_SetItemString(ns, "ChannelID", (PyObject *)&ChannelIDtype) != 0) { 2636 return NULL; 2637 } 2638 Py_INCREF(&_PyInterpreterID_Type); 2639 if (PyDict_SetItemString(ns, "InterpreterID", (PyObject *)&_PyInterpreterID_Type) != 0) { 2640 return NULL; 2641 } 2642 2643 if (_PyCrossInterpreterData_RegisterClass(&ChannelIDtype, _channelid_shared)) { 2644 return NULL; 2645 } 2646 2647 return module; 2648} 2649