mirror of
https://github.com/python/cpython.git
synced 2026-05-06 12:49:07 -04:00
gh-149230: _remote_debugging: Fix async-aware for tasks in non-main threads (#149235)
This commit is contained in:
committed by
GitHub
parent
6f8c964dc0
commit
f025dba62e
@@ -1437,6 +1437,160 @@ class TestGetStackTrace(RemoteInspectionTestBase):
|
||||
finally:
|
||||
_cleanup_sockets(client_socket, server_socket)
|
||||
|
||||
@skip_if_not_supported
|
||||
@unittest.skipIf(
|
||||
sys.platform == "linux" and not PROCESS_VM_READV_SUPPORTED,
|
||||
"Test only runs on Linux with process_vm_readv support",
|
||||
)
|
||||
def test_async_global_awaited_by_from_non_main_thread(self):
|
||||
port = find_unused_port()
|
||||
script = textwrap.dedent(
|
||||
f"""\
|
||||
import asyncio
|
||||
import socket
|
||||
import threading
|
||||
import time
|
||||
|
||||
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
sock.connect(('localhost', {port}))
|
||||
|
||||
async def worker_main():
|
||||
task = asyncio.create_task(
|
||||
asyncio.sleep(10_000),
|
||||
name="worker task",
|
||||
)
|
||||
await asyncio.sleep(0)
|
||||
sock.sendall(f"ready:{{threading.get_native_id()}}\\n".encode())
|
||||
await task
|
||||
|
||||
def run_worker_loop():
|
||||
asyncio.run(worker_main())
|
||||
|
||||
threading.Thread(
|
||||
target=run_worker_loop,
|
||||
name="async-worker",
|
||||
daemon=True,
|
||||
).start()
|
||||
time.sleep(10_000)
|
||||
"""
|
||||
)
|
||||
|
||||
with os_helper.temp_dir() as work_dir:
|
||||
script_dir = os.path.join(work_dir, "script_pkg")
|
||||
os.mkdir(script_dir)
|
||||
|
||||
server_socket = _create_server_socket(port)
|
||||
script_name = _make_test_script(script_dir, "script", script)
|
||||
client_socket = None
|
||||
|
||||
try:
|
||||
with _managed_subprocess([sys.executable, script_name]) as p:
|
||||
client_socket, _ = server_socket.accept()
|
||||
server_socket.close()
|
||||
server_socket = None
|
||||
|
||||
response = _wait_for_signal(client_socket, b"ready:")
|
||||
worker_thread_id = int(
|
||||
response.split(b"ready:", 1)[1].splitlines()[0]
|
||||
)
|
||||
|
||||
for _ in busy_retry(SHORT_TIMEOUT):
|
||||
all_awaited_by = get_all_awaited_by(p.pid)
|
||||
if any(
|
||||
task.task_name == "worker task"
|
||||
for info in all_awaited_by
|
||||
if info.thread_id == worker_thread_id
|
||||
for task in info.awaited_by
|
||||
):
|
||||
break
|
||||
else:
|
||||
self.fail(
|
||||
"get_all_awaited_by() did not report "
|
||||
"the asyncio task from the non-main thread"
|
||||
)
|
||||
finally:
|
||||
_cleanup_sockets(client_socket, server_socket)
|
||||
|
||||
@skip_if_not_supported
|
||||
@unittest.skipIf(
|
||||
sys.platform == "linux" and not PROCESS_VM_READV_SUPPORTED,
|
||||
"Test only runs on Linux with process_vm_readv support",
|
||||
)
|
||||
def test_async_remote_stack_trace_from_non_main_thread(self):
|
||||
port = find_unused_port()
|
||||
script = textwrap.dedent(
|
||||
f"""\
|
||||
import asyncio
|
||||
import socket
|
||||
import threading
|
||||
import time
|
||||
|
||||
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
sock.connect(('localhost', {port}))
|
||||
|
||||
def blocking_call():
|
||||
sock.sendall(f"ready:{{threading.get_native_id()}}\\n".encode())
|
||||
time.sleep(10_000)
|
||||
|
||||
async def worker_task():
|
||||
await asyncio.sleep(0)
|
||||
blocking_call()
|
||||
|
||||
async def worker_main():
|
||||
task = asyncio.create_task(
|
||||
worker_task(),
|
||||
name="worker task",
|
||||
)
|
||||
await task
|
||||
|
||||
def run_worker_loop():
|
||||
asyncio.run(worker_main())
|
||||
|
||||
threading.Thread(
|
||||
target=run_worker_loop,
|
||||
name="async-worker",
|
||||
daemon=True,
|
||||
).start()
|
||||
time.sleep(10_000)
|
||||
"""
|
||||
)
|
||||
|
||||
with os_helper.temp_dir() as work_dir:
|
||||
script_dir = os.path.join(work_dir, "script_pkg")
|
||||
os.mkdir(script_dir)
|
||||
|
||||
server_socket = _create_server_socket(port)
|
||||
script_name = _make_test_script(script_dir, "script", script)
|
||||
client_socket = None
|
||||
|
||||
try:
|
||||
with _managed_subprocess([sys.executable, script_name]) as p:
|
||||
client_socket, _ = server_socket.accept()
|
||||
server_socket.close()
|
||||
server_socket = None
|
||||
|
||||
response = _wait_for_signal(client_socket, b"ready:")
|
||||
worker_thread_id = int(
|
||||
response.split(b"ready:", 1)[1].splitlines()[0]
|
||||
)
|
||||
|
||||
for _ in busy_retry(SHORT_TIMEOUT):
|
||||
stack_trace = get_async_stack_trace(p.pid)
|
||||
if any(
|
||||
task.task_name == "worker task"
|
||||
for info in stack_trace
|
||||
if info.thread_id == worker_thread_id
|
||||
for task in info.awaited_by
|
||||
):
|
||||
break
|
||||
else:
|
||||
self.fail(
|
||||
"get_async_stack_trace() did not report "
|
||||
"the running asyncio task from the non-main thread"
|
||||
)
|
||||
finally:
|
||||
_cleanup_sockets(client_socket, server_socket)
|
||||
|
||||
@skip_if_not_supported
|
||||
@unittest.skipIf(
|
||||
sys.platform == "linux" and not PROCESS_VM_READV_SUPPORTED,
|
||||
|
||||
@@ -34,11 +34,11 @@ iterate_threads(
|
||||
|
||||
if (0 > _Py_RemoteDebug_PagedReadRemoteMemory(
|
||||
&unwinder->handle,
|
||||
unwinder->interpreter_addr + (uintptr_t)unwinder->debug_offsets.interpreter_state.threads_main,
|
||||
unwinder->interpreter_addr + (uintptr_t)unwinder->debug_offsets.interpreter_state.threads_head,
|
||||
sizeof(void*),
|
||||
&thread_state_addr))
|
||||
{
|
||||
set_exception_cause(unwinder, PyExc_RuntimeError, "Failed to read main thread state");
|
||||
set_exception_cause(unwinder, PyExc_RuntimeError, "Failed to read threads head");
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user