gh-142927: Fix heatmap caller navigation for interior lines (#143180)

This commit is contained in:
Pablo Galindo Salgado
2026-01-01 19:05:59 +00:00
committed by GitHub
parent 5d133351c6
commit 513ae175bb
2 changed files with 124 additions and 3 deletions
+34 -3
View File
@@ -472,6 +472,10 @@ class HeatmapCollector(StackTraceCollector):
self.callers_graph = collections.defaultdict(set)
self.function_definitions = {}
# Map each sampled line to its function for proper caller lookup
# (filename, lineno) -> funcname
self.line_to_function = {}
# Edge counting for call path analysis
self.edge_samples = collections.Counter()
@@ -596,6 +600,10 @@ class HeatmapCollector(StackTraceCollector):
if funcname and (filename, funcname) not in self.function_definitions:
self.function_definitions[(filename, funcname)] = lineno
# Map this line to its function for caller/callee navigation
if funcname:
self.line_to_function[(filename, lineno)] = funcname
def _record_bytecode_sample(self, filename, lineno, opcode,
end_lineno=None, col_offset=None, end_col_offset=None,
weight=1):
@@ -1150,13 +1158,36 @@ class HeatmapCollector(StackTraceCollector):
return f"rgba({r}, {g}, {b}, {alpha})"
def _build_navigation_buttons(self, filename: str, line_num: int) -> str:
"""Build navigation buttons for callers/callees."""
"""Build navigation buttons for callers/callees.
- Callers: All lines in a function show who calls this function
- Callees: Only actual call site lines show what they call
"""
line_key = (filename, line_num)
caller_list = self._deduplicate_by_function(self.callers_graph.get(line_key, set()))
funcname = self.line_to_function.get(line_key)
# Get callers: look up by function definition line, not current line
# This ensures all lines in a function show who calls this function
if funcname:
func_def_line = self.function_definitions.get((filename, funcname), line_num)
func_def_key = (filename, func_def_line)
caller_list = self._deduplicate_by_function(self.callers_graph.get(func_def_key, set()))
else:
caller_list = self._deduplicate_by_function(self.callers_graph.get(line_key, set()))
# Get callees: only show for actual call site lines (not every line in function)
callee_list = self._deduplicate_by_function(self.call_graph.get(line_key, set()))
# Get edge counts for each caller/callee
callers_with_counts = self._get_edge_counts(line_key, caller_list, is_caller=True)
# For callers, use the function definition key for edge lookup
if funcname:
func_def_line = self.function_definitions.get((filename, funcname), line_num)
caller_edge_key = (filename, func_def_line)
else:
caller_edge_key = line_key
callers_with_counts = self._get_edge_counts(caller_edge_key, caller_list, is_caller=True)
# For callees, use the actual line key since that's where the call happens
callees_with_counts = self._get_edge_counts(line_key, callee_list, is_caller=False)
# Build navigation buttons with counts
+90
View File
@@ -367,6 +367,96 @@ class TestHeatmapCollectorProcessFrames(unittest.TestCase):
self.assertEqual(collector.file_samples['test.py'][10], 1)
def frame(filename, line, func):
"""Create a frame tuple: (filename, location, funcname, opcode)."""
return (filename, (line, line, -1, -1), func, None)
class TestHeatmapCollectorNavigationButtons(unittest.TestCase):
"""Test navigation button behavior for caller/callee relationships.
For every call stack:
- Root frames (entry points): only DOWN arrow (callees)
- Middle frames: both UP and DOWN arrows
- Leaf frames: only UP arrow (callers)
"""
def collect(self, *stacks):
"""Create collector and process frame stacks."""
collector = HeatmapCollector(sample_interval_usec=100)
for stack in stacks:
collector.process_frames(stack, thread_id=1)
return collector
def test_deep_call_stack_relationships(self):
"""Test root/middle/leaf navigation in a 5-level call stack."""
# Stack: root -> A -> B -> C -> leaf
stack = [
frame('leaf.py', 5, 'leaf'),
frame('c.py', 10, 'func_c'),
frame('b.py', 15, 'func_b'),
frame('a.py', 20, 'func_a'),
frame('root.py', 25, 'root'),
]
c = self.collect(stack)
# Root: only callees (no one calls it)
self.assertIn(('root.py', 25), c.call_graph)
self.assertNotIn(('root.py', 25), c.callers_graph)
# Middle frames: both callers and callees
for key in [('a.py', 20), ('b.py', 15), ('c.py', 10)]:
self.assertIn(key, c.call_graph)
self.assertIn(key, c.callers_graph)
# Leaf: only callers (doesn't call anyone)
self.assertNotIn(('leaf.py', 5), c.call_graph)
self.assertIn(('leaf.py', 5), c.callers_graph)
def test_all_lines_in_function_see_callers(self):
"""Test that interior lines map to their function for caller lookup."""
# Same function sampled at different lines (12, 15, 10)
c = self.collect(
[frame('mod.py', 12, 'my_func'), frame('caller.py', 100, 'caller')],
[frame('mod.py', 15, 'my_func'), frame('caller.py', 100, 'caller')],
[frame('mod.py', 10, 'my_func'), frame('caller.py', 100, 'caller')],
)
# All lines should map to same function
for line in [10, 12, 15]:
self.assertEqual(c.line_to_function[('mod.py', line)], 'my_func')
# Function definition line should have callers
func_def = c.function_definitions[('mod.py', 'my_func')]
self.assertIn(('mod.py', func_def), c.callers_graph)
def test_multiple_callers_and_callees(self):
"""Test multiple callers/callees are recorded correctly."""
# Two callers -> target, and caller -> two callees
c = self.collect(
[frame('target.py', 10, 'target'), frame('caller1.py', 20, 'c1')],
[frame('target.py', 10, 'target'), frame('caller2.py', 30, 'c2')],
[frame('callee1.py', 5, 'f1'), frame('dispatcher.py', 40, 'dispatch')],
[frame('callee2.py', 6, 'f2'), frame('dispatcher.py', 40, 'dispatch')],
)
# Target has 2 callers
callers = c.callers_graph[('target.py', 10)]
self.assertEqual({x[0] for x in callers}, {'caller1.py', 'caller2.py'})
# Dispatcher has 2 callees
callees = c.call_graph[('dispatcher.py', 40)]
self.assertEqual({x[0] for x in callees}, {'callee1.py', 'callee2.py'})
def test_edge_samples_counted(self):
"""Test that repeated calls accumulate edge counts."""
stack = [frame('callee.py', 10, 'callee'), frame('caller.py', 20, 'caller')]
c = self.collect(stack, stack, stack)
edge_key = (('caller.py', 20), ('callee.py', 10))
self.assertEqual(c.edge_samples[edge_key], 3)
class TestHeatmapCollectorExport(unittest.TestCase):
"""Test HeatmapCollector.export() method."""