Lines Matching refs:threading
2 Tests for the threading module.
14 import threading
41 testcase.addCleanup(setattr, threading, 'excepthook', threading.excepthook)
42 threading.excepthook = threading.__excepthook__
56 class TestThread(threading.Thread):
58 threading.Thread.__init__(self, name=name)
104 thread = threading.Thread(name="myname1")
108 thread = threading.Thread(name=123)
112 thread = threading.Thread(target=func, name="myname2")
115 with mock.patch.object(threading, '_counter', return_value=2):
116 thread = threading.Thread(name="")
119 with mock.patch.object(threading, '_counter', return_value=3):
120 thread = threading.Thread()
123 with mock.patch.object(threading, '_counter', return_value=5):
124 thread = threading.Thread(target=func)
150 t = threading.Thread(target=target, args=args)
157 lock = threading.Lock()
168 sema = threading.BoundedSemaphore(value=3)
169 mutex = threading.RLock()
181 if hasattr(threading, 'get_native_id'):
182 native_ids = set(t.native_id for t in threads) | {threading.get_native_id()}
200 self.assertIsNotNone(threading.current_thread().ident)
202 ident.append(threading.current_thread().ident)
204 done = threading.Event()
211 del threading._active[ident[0]]
218 threading.stack_size(262144)
223 threading.stack_size(0)
230 threading.stack_size(0x100000)
235 threading.stack_size(0)
238 # Check that a "foreign" thread can use the threading module.
241 # thread to get made in the threading._active map.
242 threading.current_thread()
245 mutex = threading.Lock()
251 self.assertIn(tid, threading._active)
252 self.assertIsInstance(threading._active[tid], threading._DummyThread)
254 self.assertTrue(threading._active[tid].is_alive())
255 self.assertRegex(repr(threading._active[tid]), '_DummyThread')
256 del threading._active[tid]
272 tid = threading.get_ident()
298 worker_started = threading.Event()
299 worker_saw_exception = threading.Event()
301 class Worker(threading.Thread):
303 self.id = threading.get_ident()
351 raise threading.ThreadError()
352 _start_new_thread = threading._start_new_thread
353 threading._start_new_thread = fail_new_thread
355 t = threading.Thread(target=lambda: None)
356 self.assertRaises(threading.ThreadError, t.start)
358 t in threading._limbo,
361 threading._start_new_thread = _start_new_thread
398 # Avoid a deadlock when sys.settrace steps into threading._shutdown
400 import sys, threading
409 t = threading.Thread(target=killer)
415 threading.current_thread()
423 # Raising SystemExit skipped threading._shutdown
425 import threading
434 threading.Thread(target=child).start()
443 # threading.enumerate() after it has been join()ed.
444 enum = threading.enumerate
449 t = threading.Thread(target=lambda: None)
464 self.thread = threading.Thread(target=self._run,
494 t = threading.Thread()
508 e = threading.Event()
512 cond = threading.Condition()
518 threading.activeCount()
520 threading.currentThread()
523 t = threading.Thread()
529 t = threading.Thread()
531 t = threading.Thread(daemon=False)
533 t = threading.Thread(daemon=True)
538 # bpo-42350: Calling os.fork() after threading._shutdown() must
546 # Import the threading module to register its "at fork" callback
547 import threading
557 # exit_handler() will be called after threading._shutdown()
569 import _thread, threading, os, time
573 threading.current_thread()
577 evt = threading.Event()
580 assert threading.active_count() == 2, threading.active_count()
582 assert threading.active_count() == 1, threading.active_count()
602 t = threading.Thread(target=lambda: None)
613 main = threading.main_thread()
615 self.assertEqual(main.ident, threading.current_thread().ident)
616 self.assertEqual(main.ident, threading.get_ident())
619 self.assertNotEqual(threading.main_thread().ident,
620 threading.current_thread().ident)
621 th = threading.Thread(target=f)
629 import os, threading
634 main = threading.main_thread()
636 print(main.ident == threading.current_thread().ident)
637 print(main.ident == threading.get_ident())
651 import os, threading, sys
657 main = threading.main_thread()
659 print(main.ident == threading.current_thread().ident)
660 print(main.ident == threading.get_ident())
667 th = threading.Thread(target=func)
680 import gc, threading
682 main_thread = threading.current_thread()
683 assert main_thread is threading.main_thread() # sanity check
691 threading.current_thread() is main_thread,
692 threading.main_thread() is main_thread,
693 threading.enumerate() == [main_thread])
706 # bpo-36402: Py_Finalize() calls threading._shutdown() which must wait
713 import threading
725 tls = threading.local()
734 threading.Thread(target=f).start()
751 t = threading.Thread(target=f)
784 t = threading.Thread(target=f)
805 bs = threading.BoundedSemaphore(limit)
806 threads = [threading.Thread(target=bs.acquire)
812 threads = [threading.Thread(target=bs.release)
846 threading.settrace(noop_trace)
864 old_trace = threading.gettrace()
866 threading.settrace(noop_trace)
867 trace_func = threading.gettrace()
870 threading.settrace(old_trace)
874 old_profile = threading.getprofile()
876 threading.setprofile(fn)
877 self.assertEqual(fn, threading.getprofile())
879 threading.setprofile(old_profile)
885 event = threading.Event()
886 thread = threading.Thread(target=event.wait, daemon=daemon)
893 self.assertIn(tstate_lock, threading._shutdown_locks)
895 self.assertNotIn(tstate_lock, threading._shutdown_locks)
903 self.assertNotIn(tstate_lock, threading._shutdown_locks)
909 import threading
915 thread_dict = threading.local()
936 thread = threading.Thread(target=target)
946 threading.Thread(target=noop).start()
954 msg = (b'DeprecationWarning: The threading debug '
960 # bpo-1596321: If the threading module is first import from a thread
961 # different than the main thread, threading._shutdown() must handle
971 import threading
974 if 'threading' in sys.modules:
975 raise Exception('threading is already imported')
979 # wait until the threading module is imported
983 if 'threading' not in sys.modules:
984 raise Exception('threading is not imported')
997 import sys, os, time, threading
1016 t = threading.Thread(target=joiningfunc,
1017 args=(threading.current_thread(),))
1038 t = threading.Thread(target=joiningfunc,
1039 args=(threading.current_thread(),))
1054 main_thread = threading.current_thread()
1063 t = threading.Thread(target=joiningfunc,
1069 w = threading.Thread(target=worker)
1084 import threading
1097 thread_has_run.add(threading.current_thread())
1102 new_thread = threading.Thread(target=random_io)
1133 t = threading.Thread(target=do_fork_and_wait)
1147 t = threading.Thread(target=lambda : time.sleep(0.3))
1181 import threading
1194 threading.Thread(target=f).start()
1211 import threading
1222 tls = threading.local()
1231 threading.Thread(target=f).start()
1243 import threading
1250 threading.Thread(target=f, daemon=True).start()
1267 thread = threading.Thread()
1273 current_thread = threading.current_thread()
1277 thread = threading.Thread()
1281 thread = threading.Thread()
1287 lock = threading.Lock()
1298 import threading
1309 w = threading.Thread(target=outer)
1324 import threading
1334 t = threading.Thread(target=run)
1352 import threading
1362 t = threading.Thread(target=run)
1381 import threading
1392 t = threading.Thread(target=run)
1407 class Issue27558(threading.Thread):
1433 threading.Thread(target=modify_file)
1441 class ThreadRunFail(threading.Thread):
1465 # threading.excepthook called with thread=None: log the thread
1471 args = threading.ExceptHookArgs([*sys.exc_info(), None])
1473 threading.excepthook(args)
1479 self.assertIn(f'Exception in thread {threading.get_ident()}:\n', stderr)
1485 class ThreadExit(threading.Thread):
1489 # threading.excepthook() silently ignores SystemExit
1505 with support.swap_attr(threading, 'excepthook', hook):
1528 with support.swap_attr(threading, 'excepthook', threading_hook), \
1536 'Exception in threading.excepthook:\n')
1551 with support.swap_attr(threading, 'excepthook', threading_hook):
1553 threading.excepthook = threading.__excepthook__
1566 self.callback_event = threading.Event()
1571 timer1 = threading.Timer(0.01, self._callback_spy)
1577 timer2 = threading.Timer(0.01, self._callback_spy)
1590 locktype = staticmethod(threading.Lock)
1593 locktype = staticmethod(threading._PyRLock)
1595 @unittest.skipIf(threading._CRLock is None, 'RLock not implemented in C')
1597 locktype = staticmethod(threading._CRLock)
1600 eventtype = staticmethod(threading.Event)
1604 locktype = staticmethod(threading.Condition)
1607 condtype = staticmethod(threading.Condition)
1610 semtype = staticmethod(threading.Semaphore)
1613 semtype = staticmethod(threading.BoundedSemaphore)
1616 barriertype = staticmethod(threading.Barrier)
1625 support.check__all__(self, threading, ('threading', '_thread'),
1658 t = threading.Thread(target=call_interrupt)
1700 t = threading.Thread(target=worker,args=(started, cont, interrupted))
1713 import threading
1718 threading._register_atexit(run_last)
1726 import threading
1730 threading._register_atexit(mock)
1733 threading._shutdown()
1743 import threading
1749 threading._register_atexit(func)
1751 threading._register_atexit(run_last)