Files
cpython/Lib/test/test_gc_stats.py
T
2026-05-04 22:14:45 +01:00

351 lines
12 KiB
Python

import gc
import os
import textwrap
import time
import unittest
from test.support import (
Py_GIL_DISABLED,
import_helper,
requires_gil_enabled,
requires_remote_subprocess_debugging,
)
from test.test_profiling.test_sampling_profiler.helpers import test_subprocess
try:
import _remote_debugging # noqa: F401
except ImportError:
raise unittest.SkipTest(
"Test only runs when _remote_debugging is available"
)
GC_STATS_FIELDS = (
"gen", "iid", "ts_start", "ts_stop", "collections", "collected",
"uncollectable", "candidates", "heap_size", "duration")
def get_interpreter_identifiers(gc_stats) -> tuple[int,...]:
return tuple(sorted({s.iid for s in gc_stats}))
def get_generations(gc_stats) -> tuple[int,int,int]:
generations = set()
for s in gc_stats:
generations.add(s.gen)
return tuple(sorted(generations))
def get_last_item(gc_stats, generation: int, iid: int):
item = None
for s in gc_stats:
if s.gen == generation and s.iid == iid:
if item is None or item.ts_start < s.ts_start:
item = s
return item
def has_local_process_debugging():
try:
return _remote_debugging.is_python_process(os.getpid())
except Exception:
return False
def check_gc_stats_fields(testcase, stats):
testcase.assertIsInstance(stats, list)
testcase.assertGreater(len(stats), 0)
for item in stats:
testcase.assertIsInstance(item, _remote_debugging.GCStatsInfo)
testcase.assertEqual(type(item).__match_args__, GC_STATS_FIELDS)
testcase.assertEqual(len(item), len(GC_STATS_FIELDS))
def gc_stats_counters_advanced(before_stats, after_stats, generations, iid):
for generation in generations:
before = get_last_item(before_stats, generation, iid)
after = get_last_item(after_stats, generation, iid)
if after is None or before is None:
return False
if after.duration <= before.duration:
return False
if after.candidates <= before.candidates:
return False
return True
@unittest.skipUnless(
has_local_process_debugging(), "requires local process debugging")
class TestLocalGCStats(unittest.TestCase):
_main_iid = 0 # main interpreter ID
def test_gc_stats_fields(self):
monitor = _remote_debugging.GCMonitor(os.getpid(), debug=True)
stats = monitor.get_gc_stats(all_interpreters=False)
check_gc_stats_fields(self, stats)
def test_module_get_gc_stats_fields(self):
stats = _remote_debugging.get_gc_stats(
os.getpid(), all_interpreters=False)
check_gc_stats_fields(self, stats)
def test_all_interpreters_filter_for_local_process(self):
interpreters = import_helper.import_module("concurrent.interpreters")
source = """
import gc
objects = []
obj = []
obj.append(obj)
objects.append(obj)
gc.collect(0)
gc.collect(1)
gc.collect(2)
"""
interp = interpreters.create()
try:
interp.exec(textwrap.dedent(source))
for generation in range(3):
gc.collect(generation)
main_stats = _remote_debugging.get_gc_stats(
os.getpid(), all_interpreters=False)
all_stats = _remote_debugging.get_gc_stats(
os.getpid(), all_interpreters=True)
finally:
interp.close()
self.assertEqual(get_interpreter_identifiers(main_stats), (0,))
self.assertIn(0, get_interpreter_identifiers(all_stats))
self.assertGreater(len(get_interpreter_identifiers(all_stats)), 1)
self.assertEqual(get_generations(main_stats), (0, 1, 2))
self.assertEqual(get_generations(all_stats), (0, 1, 2))
for iid in get_interpreter_identifiers(all_stats):
for generation in range(3):
self.assertIsNotNone(get_last_item(all_stats, generation, iid))
@unittest.skipUnless(Py_GIL_DISABLED, "requires free-threaded GC")
def test_gc_stats_counters_for_main_interpreter_free_threaded(self):
generations = (0, 1, 2)
before_stats = _remote_debugging.get_gc_stats(
os.getpid(), all_interpreters=False)
for generation in generations:
self.assertIsNotNone(
get_last_item(before_stats, generation, self._main_iid))
objects = []
for _ in range(1000):
obj = []
obj.append(obj)
objects.append(obj)
for generation in generations:
gc.collect(generation)
after_stats = _remote_debugging.get_gc_stats(
os.getpid(), all_interpreters=False)
self.assertTrue(
gc_stats_counters_advanced(
before_stats, after_stats, generations, self._main_iid),
(before_stats, after_stats)
)
@requires_remote_subprocess_debugging()
class TestGCStats(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls._main_iid = 0 # main interpreter ID
cls._main_interpreter_script = '''
import gc
import time
gc.collect(0)
gc.collect(1)
gc.collect(2)
_test_sock.sendall(b"working")
objects = []
while True:
if len(objects) > 100:
objects = []
obj = []
obj.append(obj)
objects.append(obj)
time.sleep(0.1)
gc.collect(0)
gc.collect(1)
gc.collect(2)
'''
cls._script = '''
import concurrent.interpreters as interpreters
import gc
import time
source = """if True:
import gc
if "objects" not in globals():
objects = []
if len(objects) > 100:
objects = []
obj = []
obj.append(obj)
objects.append(obj)
gc.collect(0)
gc.collect(1)
gc.collect(2)
"""
if {0}:
interp = interpreters.create()
interp.exec(source)
gc.collect(0)
gc.collect(1)
gc.collect(2)
_test_sock.sendall(b"working")
objects = []
while True:
if len(objects) > 100:
objects = []
obj = []
obj.append(obj)
objects.append(obj)
time.sleep(0.1)
if {0}:
interp.exec(source)
gc.collect(0)
gc.collect(1)
gc.collect(2)
'''
def _gc_stats_advanced(self, before_stats, after_stats, generations):
for generation in generations:
before = get_last_item(before_stats, generation, self._main_iid)
after = get_last_item(after_stats, generation, self._main_iid)
if after is None or before is None:
return False
if after.ts_stop <= before.ts_stop:
return False
return True
def _collect_gc_stats(self, script: str, all_interpreters: bool,
generations=(2,)):
with (test_subprocess(script, wait_for_working=True) as subproc):
monitor = _remote_debugging.GCMonitor(subproc.process.pid, debug=True)
before_stats = monitor.get_gc_stats(all_interpreters=all_interpreters)
for generation in generations:
before = get_last_item(before_stats, generation, self._main_iid)
self.assertIsNotNone(before)
after_stats = before_stats
for _ in range(10):
time.sleep(0.5)
after_stats = monitor.get_gc_stats(all_interpreters=all_interpreters)
if self._gc_stats_advanced(before_stats, after_stats, generations):
break
else:
self.fail(
f"GC stats for generations {generations!r} did not "
f"advance: {before_stats!r} -> {after_stats!r}"
)
return before_stats, after_stats
def _check_gc_stats(self, before, after):
self.assertIsNotNone(before)
self.assertIsNotNone(after)
self.assertGreater(after.collections, before.collections, (before, after))
self.assertGreater(after.ts_start, before.ts_start, (before, after))
self.assertGreater(after.ts_stop, before.ts_stop, (before, after))
self.assertGreater(after.duration, before.duration, (before, after))
self.assertGreater(after.candidates, before.candidates, (before, after))
# may not grow
self.assertGreaterEqual(after.collected, before.collected, (before, after))
self.assertGreaterEqual(after.uncollectable, before.uncollectable, (before, after))
def _check_interpreter_gc_stats(self, before_stats, after_stats):
before_iids = get_interpreter_identifiers(before_stats)
after_iids = get_interpreter_identifiers(after_stats)
self.assertEqual(before_iids, after_iids)
self.assertEqual(get_generations(before_stats), (0, 1, 2))
self.assertEqual(get_generations(after_stats), (0, 1, 2))
for iid in after_iids:
with self.subTest(f"interpreter id={iid}"):
before_last_items = (get_last_item(before_stats, 0, iid),
get_last_item(before_stats, 1, iid),
get_last_item(before_stats, 2, iid))
after_last_items = (get_last_item(after_stats, 0, iid),
get_last_item(after_stats, 1, iid),
get_last_item(after_stats, 2, iid))
for before, after in zip(before_last_items, after_last_items):
self._check_gc_stats(before, after)
def test_gc_stats_timestamps_for_main_interpreter(self):
script = textwrap.dedent(self._main_interpreter_script)
before_stats, after_stats = self._collect_gc_stats(
script, False, generations=(0, 1, 2))
for generation in range(3):
with self.subTest(generation=generation):
before = get_last_item(before_stats, generation, self._main_iid)
after = get_last_item(after_stats, generation, self._main_iid)
self.assertIsNotNone(before)
self.assertIsNotNone(after)
self.assertGreater(
after.collections, before.collections,
(before, after))
self.assertGreater(
after.ts_start, before.ts_start,
(before, after))
self.assertGreater(
after.ts_stop, before.ts_stop,
(before, after))
@requires_gil_enabled()
def test_gc_stats_for_main_interpreter(self):
script = textwrap.dedent(self._script.format(False))
before_stats, after_stats = self._collect_gc_stats(script, False)
self._check_interpreter_gc_stats(before_stats, after_stats)
@requires_gil_enabled()
def test_gc_stats_for_main_interpreter_if_subinterpreter_exists(self):
script = textwrap.dedent(self._script.format(True))
before_stats, after_stats = self._collect_gc_stats(script, False)
self._check_interpreter_gc_stats(before_stats, after_stats)
@requires_gil_enabled()
def test_gc_stats_for_all_interpreters(self):
script = textwrap.dedent(self._script.format(True))
before_stats, after_stats = self._collect_gc_stats(script, True)
before_iids = get_interpreter_identifiers(before_stats)
after_iids = get_interpreter_identifiers(after_stats)
self.assertGreater(len(before_iids), 1)
self.assertGreater(len(after_iids), 1)
self.assertEqual(before_iids, after_iids)
self._check_interpreter_gc_stats(before_stats, after_stats)