|
21 | 21 | from unittest import TestCase, main |
22 | 22 | import asyncio |
23 | 23 | import logging |
| 24 | +import threading |
24 | 25 | from pulsar import Client |
25 | 26 |
|
26 | 27 | class CustomLoggingTest(TestCase): |
@@ -49,6 +50,35 @@ async def async_get(value): |
49 | 50 |
|
50 | 51 | client.close() |
51 | 52 |
|
| 53 | + def test_logger_thread_leaks(self): |
| 54 | + def _do_connect(close): |
| 55 | + logger = logging.getLogger(str(threading.current_thread().ident)) |
| 56 | + logger.setLevel(logging.INFO) |
| 57 | + client = Client( |
| 58 | + service_url="pulsar://localhost:6650", |
| 59 | + io_threads=4, |
| 60 | + message_listener_threads=4, |
| 61 | + operation_timeout_seconds=1, |
| 62 | + log_conf_file_path=None, |
| 63 | + authentication=None, |
| 64 | + logger=logger, |
| 65 | + ) |
| 66 | + client.get_topic_partitions("persistent://public/default/partitioned_topic_name_test") |
| 67 | + if close: |
| 68 | + client.close() |
| 69 | + |
| 70 | + for should_close in (True, False): |
| 71 | + self.assertEqual(threading.active_count(), 1, "Explicit close: {}; baseline is 1 thread".format(should_close)) |
| 72 | + _do_connect(should_close) |
| 73 | + self.assertEqual(threading.active_count(), 1, "Explicit close: {}; synchronous connect doesn't leak threads".format(should_close)) |
| 74 | + threads = [] |
| 75 | + for _ in range(10): |
| 76 | + threads.append(threading.Thread(target=_do_connect, args=(should_close))) |
| 77 | + threads[-1].start() |
| 78 | + for thread in threads: |
| 79 | + thread.join() |
| 80 | + assert threading.active_count() == 1, "Explicit close: {}; threaded connect in parallel doesn't leak threads".format(should_close) |
| 81 | + |
52 | 82 | if __name__ == '__main__': |
53 | 83 | logging.basicConfig(level=logging.DEBUG) |
54 | 84 | main() |
0 commit comments