Files
2026-05-05 00:44:37 +00:00

287 lines
10 KiB
Python

from abc import ABC, abstractmethod
from .constants import (
DEFAULT_LOCATION,
THREAD_STATUS_HAS_GIL,
THREAD_STATUS_ON_CPU,
THREAD_STATUS_GIL_REQUESTED,
THREAD_STATUS_UNKNOWN,
THREAD_STATUS_HAS_EXCEPTION,
_INTERNAL_FRAME_SUFFIXES,
)
try:
from _remote_debugging import FrameInfo
except ImportError:
# Fallback definition if _remote_debugging is not available
FrameInfo = None
def normalize_location(location):
    """Return *location* in 4-tuple form.

    Args:
        location: tuple (lineno, end_lineno, col_offset, end_col_offset),
            an integer line number, or None

    Returns:
        tuple: (lineno, end_lineno, col_offset, end_col_offset).
        None maps to DEFAULT_LOCATION; a bare integer is used as both the
        start and end line with -1 for both column offsets; any other value
        is returned unchanged.
    """
    if isinstance(location, int):
        # Only a line number is known: reuse it for end_lineno and mark
        # both column offsets as -1.
        return (location, location, -1, -1)
    return DEFAULT_LOCATION if location is None else location
def extract_lineno(location):
    """Return the line number carried by *location*.

    Args:
        location: tuple (lineno, end_lineno, col_offset, end_col_offset),
            an integer line number, or None

    Returns:
        int: The line number (0 for synthetic frames, i.e. when location
        is None)
    """
    if location is None:
        return 0
    # A bare integer already is the line number; otherwise it is the first
    # element of the location sequence.
    return location if isinstance(location, int) else location[0]
def _is_internal_frame(frame):
if isinstance(frame, tuple):
filename = frame[0] if frame else ""
else:
filename = getattr(frame, "filename", "")
if not filename:
return False
return filename.endswith(_INTERNAL_FRAME_SUFFIXES)
def filter_internal_frames(frames):
    """Return a list of *frames* with internal frames removed.

    A falsy *frames* value (None, empty sequence) is returned unchanged;
    otherwise a new list is built and the input is not modified.
    """
    if frames:
        return [frame for frame in frames if not _is_internal_frame(frame)]
    return frames
def _build_task_graph(awaited_info_list):
task_map = {}
child_to_parent = {} # child_id -> (selected_parent_id, parent_count)
all_task_ids = set()
all_parent_ids = set()
for awaited_info in awaited_info_list:
thread_id = awaited_info.thread_id
for task_info in awaited_info.awaited_by:
task_id = task_info.task_id
task_map[task_id] = (task_info, thread_id)
all_task_ids.add(task_id)
if task_info.awaited_by:
parent_ids = [p.task_name for p in task_info.awaited_by]
parent_count = len(parent_ids)
all_parent_ids.update(parent_ids)
selected_parent = min(parent_ids) if parent_count > 1 else parent_ids[0]
child_to_parent[task_id] = (selected_parent, parent_count)
return task_map, child_to_parent, all_task_ids, all_parent_ids
def _find_leaf_tasks(all_task_ids, all_parent_ids):
return all_task_ids - all_parent_ids
def _build_linear_stacks(leaf_task_ids, task_map, child_to_parent):
for leaf_id in leaf_task_ids:
frames = []
visited = set()
current_id = leaf_id
thread_id = None
while current_id is not None:
if current_id in visited:
break
visited.add(current_id)
if current_id not in task_map:
break
task_info, tid = task_map[current_id]
if thread_id is None:
thread_id = tid
if task_info.coroutine_stack:
for coro_info in task_info.coroutine_stack:
for frame in coro_info.call_stack:
frames.append(frame)
parent_info = child_to_parent.get(current_id)
task_name = task_info.task_name or "Task-" + str(task_info.task_id)
if parent_info:
selected_parent, parent_count = parent_info
if parent_count > 1:
task_name = f"{task_name} ({parent_count} parents)"
frames.append(FrameInfo(("<task>", None, task_name, None)))
current_id = selected_parent
else:
frames.append(FrameInfo(("<task>", None, task_name, None)))
current_id = None
if frames and thread_id is not None:
yield frames, thread_id, leaf_id
def iter_async_frames(awaited_info_list):
    """Yield (frames, thread_id, leaf_id) for each leaf task's linear stack.

    Builds the task graph from *awaited_info_list*, finds the tasks that are
    nobody's parent, and walks each of them up its selected parent chain.
    """
    graph = _build_task_graph(awaited_info_list)
    task_map, child_to_parent, all_task_ids, all_parent_ids = graph
    leaves = _find_leaf_tasks(all_task_ids, all_parent_ids)
    yield from _build_linear_stacks(leaves, task_map, child_to_parent)
class Collector(ABC):
    """Abstract base class for collectors of sampled stack data.

    Subclasses must implement collect() and export(). The private helpers
    iterate over sampled stacks (per-thread or per-async-task) and aggregate
    thread status flags.
    """

    @abstractmethod
    def collect(self, stack_frames, timestamps_us=None):
        """Collect profiling data from stack frames.

        Args:
            stack_frames: List of InterpreterInfo objects
            timestamps_us: Optional list of timestamps in microseconds. If provided
                (from binary replay with RLE batching), use these instead of current
                time. If None, collectors should use time.monotonic() or similar.
                The list may contain multiple timestamps when samples are batched
                together (same stack, different times).
        """

    def collect_failed_sample(self):
        """Collect data about a failed sample attempt.

        The base implementation does nothing.
        """

    @abstractmethod
    def export(self, filename):
        """Export collected data to a file."""

    @staticmethod
    def _filter_internal_frames(frames):
        """Return *frames* with internal frames removed."""
        return filter_internal_frames(frames)

    def _iter_all_frames(self, stack_frames, skip_idle=False):
        """Yield (frames, thread_id) for every thread that has frames.

        Args:
            stack_frames: Iterable of interpreter entries exposing ``threads``.
            skip_idle: When true, skip threads that neither hold the GIL nor
                are on CPU according to their status flags.
        """
        for interpreter_info in stack_frames:
            for thread_info in interpreter_info.threads:
                # skip_idle means: skip if the thread is not actively running.
                # A thread is "active" if it has the GIL OR is on CPU.
                if skip_idle:
                    status_flags = thread_info.status
                    if not (
                        status_flags & THREAD_STATUS_HAS_GIL
                        or status_flags & THREAD_STATUS_ON_CPU
                    ):
                        continue

                frames = thread_info.frame_info
                if not frames:
                    continue

                # Drop internal profiler frames wherever they appear in the
                # stack; a stack left empty by the filter is not reported.
                frames = self._filter_internal_frames(frames)
                if frames:
                    yield frames, thread_info.thread_id

    def _iter_async_frames(self, awaited_info_list):
        """Yield (frames, thread_id, leaf_id) for each async task stack."""
        yield from iter_async_frames(awaited_info_list)

    def _iter_stacks(self, stack_frames, skip_idle=False):
        """Yield (frames, thread_id) for all stacks, handling both sync and async modes.

        Async mode is detected by the first entry having an ``awaited_by``
        attribute; *skip_idle* only applies to the sync (per-thread) path.
        """
        if stack_frames and hasattr(stack_frames[0], "awaited_by"):
            # Go through the overridable method (not the module-level
            # function) so subclass customisations are honoured, just like
            # the sync branch below.
            for frames, thread_id, _ in self._iter_async_frames(stack_frames):
                if frames:
                    yield frames, thread_id
        else:
            for frames, thread_id in self._iter_all_frames(
                stack_frames, skip_idle=skip_idle
            ):
                if frames:
                    yield frames, thread_id

    def _is_gc_frame(self, frame):
        """Return True if *frame*'s function name marks garbage collection.

        The name is read from index 2 of a tuple frame (when present) or from
        the ``funcname`` attribute otherwise. A missing, empty or None name
        is treated as "not a GC frame".
        """
        if isinstance(frame, tuple):
            funcname = frame[2] if len(frame) >= 3 else ""
        else:
            funcname = getattr(frame, "funcname", "")
        if not funcname:
            # Also covers None, which would otherwise make ``in`` raise.
            return False
        return "<GC>" in funcname or "gc_collect" in funcname

    def _collect_thread_status_stats(self, stack_frames):
        """Collect aggregate and per-thread status statistics from a sample.

        Returns:
            tuple: (aggregate_status_counts, has_gc_frame, per_thread_stats)
                - aggregate_status_counts: dict with has_gil, on_cpu, has_exception, etc.
                - has_gc_frame: bool indicating if any thread has GC frames
                - per_thread_stats: dict mapping thread_id to per-thread counts
        """
        # Counter key -> status bit; shared by the aggregate and the
        # per-thread counters so both are always updated the same way.
        status_bits = (
            ("has_gil", THREAD_STATUS_HAS_GIL),
            ("on_cpu", THREAD_STATUS_ON_CPU),
            ("gil_requested", THREAD_STATUS_GIL_REQUESTED),
            ("unknown", THREAD_STATUS_UNKNOWN),
            ("has_exception", THREAD_STATUS_HAS_EXCEPTION),
        )

        status_counts = {
            "has_gil": 0,
            "on_cpu": 0,
            "gil_requested": 0,
            "unknown": 0,
            "has_exception": 0,
            "total": 0,
        }
        has_gc_frame = False
        per_thread_stats = {}

        for interpreter_info in stack_frames:
            for thread_info in getattr(interpreter_info, "threads", []):
                # Track thread status using bit flags
                status_flags = getattr(thread_info, "status", 0)
                active_keys = [
                    key for key, bit in status_bits if status_flags & bit
                ]

                status_counts["total"] += 1
                for key in active_keys:
                    status_counts[key] += 1

                # Per-thread statistics need a thread id to key on.
                thread_id = getattr(thread_info, "thread_id", None)
                if thread_id is None:
                    continue

                thread_stats = per_thread_stats.get(thread_id)
                if thread_stats is None:
                    thread_stats = per_thread_stats[thread_id] = {
                        "has_gil": 0,
                        "on_cpu": 0,
                        "gil_requested": 0,
                        "unknown": 0,
                        "has_exception": 0,
                        "total": 0,
                        "gc_samples": 0,
                    }

                thread_stats["total"] += 1
                for key in active_keys:
                    thread_stats[key] += 1

                # Count at most one GC sample per thread entry, no matter how
                # many of its frames are GC frames.
                frames = getattr(thread_info, "frame_info", None)
                if frames and any(self._is_gc_frame(frame) for frame in frames):
                    thread_stats["gc_samples"] += 1
                    has_gc_frame = True

        return status_counts, has_gc_frame, per_thread_stats