Files
livekit/scripts/fec/plot_fec.py
2026-06-10 14:34:06 -07:00

374 lines
14 KiB
Python
Executable File

#!/usr/bin/env python3
"""Render time-series plots and a summary for FlexFEC test harness runs.
Inputs are run directories produced by run_fec_test.sh, each containing:
frames.csv - per received frame: recv_wall_us,frame_id,user_timestamp_us,width,height
prom.tsv - 1s prometheus samples: wall_us<TAB>metric{labels}<TAB>value
events.csv - shaper events: wall_us,event (burst_on / burst_off / ...)
meta.env - run parameters incl. T0_US/T1_US measurement window
Usage:
plot_fec.py --run <dir> [--run <dir2>] --out <dir>
With two runs the first is treated as the baseline and the second as the FEC
run, both are overlaid and a comparison table is printed.
"""
import argparse
import csv
import os
import re
import statistics
import sys
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt # noqa: E402
FLEXFEC_METRIC = "livekit_flexfec_packet_total"
METRIC_RE = re.compile(r"^(?P<name>[a-zA-Z0-9_]+)(?:\{(?P<labels>.*)\})?$")
class Run:
def __init__(self, path):
self.path = path
self.name = os.path.basename(os.path.normpath(path))
self.meta = self._read_meta()
self.t0 = int(self.meta.get("T0_US", 0))
self.t1 = int(self.meta.get("T1_US", 1 << 62))
self.frames = self._read_frames()
self.prom = self._read_prom()
self.bursts = self._read_bursts()
def _read_meta(self):
meta = {}
path = os.path.join(self.path, "meta.env")
if os.path.exists(path):
with open(path) as f:
for line in f:
if "=" in line:
k, v = line.strip().split("=", 1)
meta[k] = v
return meta
def _in_window(self, ts):
return self.t0 <= ts <= self.t1
def rel(self, ts):
return (ts - self.t0) / 1e6
def _read_frames(self):
frames = []
path = os.path.join(self.path, "frames.csv")
if not os.path.exists(path):
return frames
with open(path) as f:
for row in csv.DictReader(f):
try:
recv = int(row["recv_wall_us"])
except (KeyError, ValueError):
continue
if not self._in_window(recv):
continue
frames.append(
{
"recv": recv,
"frame_id": int(row["frame_id"]) if row.get("frame_id") else None,
"user_ts": int(row["user_timestamp_us"]) if row.get("user_timestamp_us") else None,
}
)
frames.sort(key=lambda fr: fr["recv"])
return frames
def _read_prom(self):
"""metric family -> label key -> [(wall_us, value)]"""
series = {}
path = os.path.join(self.path, "prom.tsv")
if not os.path.exists(path):
return series
with open(path) as f:
for line in f:
parts = line.rstrip("\n").split("\t")
if len(parts) != 3:
continue
ts, metric, value = parts
m = METRIC_RE.match(metric)
if not m:
continue
try:
ts, value = int(ts), float(value)
except ValueError:
continue
family = m.group("name")
labels = m.group("labels") or ""
series.setdefault(family, {}).setdefault(labels, []).append((ts, value))
return series
def _read_bursts(self):
"""[(start_rel_s, end_rel_s)] of shaper burst windows"""
bursts, start = [], None
path = os.path.join(self.path, "events.csv")
if not os.path.exists(path):
return bursts
with open(path) as f:
for line in f:
parts = line.strip().split(",", 1)
if len(parts) != 2:
continue
ts, event = int(parts[0]), parts[1]
if event == "burst_on":
start = self.rel(ts)
elif event == "burst_off" and start is not None:
bursts.append((start, self.rel(ts)))
start = None
if start is not None:
bursts.append((start, self.rel(self.t1 if self.t1 < (1 << 62) else start)))
return bursts
# ---------- derived series ----------
def latency_series(self):
xs, ys = [], []
for fr in self.frames:
if fr["user_ts"]:
xs.append(self.rel(fr["recv"]))
ys.append((fr["recv"] - fr["user_ts"]) / 1e3)
return xs, ys
def frame_gaps(self):
"""[(rel_s, missing_count)] where frame ids skipped between consecutive frames"""
gaps, prev = [], None
for fr in self.frames:
fid = fr["frame_id"]
if fid is None:
continue
if prev is not None and fid > prev + 1:
gaps.append((self.rel(fr["recv"]), fid - prev - 1))
prev = fid
return gaps
def fps_series(self):
buckets = {}
for fr in self.frames:
buckets[int(self.rel(fr["recv"]))] = buckets.get(int(self.rel(fr["recv"])), 0) + 1
xs = sorted(buckets)
return xs, [buckets[x] for x in xs]
def counter_rate(self, family, label_filter=None):
"""summed per-second rate across label sets of a counter family"""
fam = self.prom.get(family, {})
per_ts = {}
for labels, samples in fam.items():
if label_filter and label_filter not in labels:
continue
samples = [s for s in samples if self._in_window(s[0])]
for (t_a, v_a), (t_b, v_b) in zip(samples, samples[1:]):
dt = (t_b - t_a) / 1e6
if dt <= 0:
continue
key = int(self.rel(t_b))
per_ts[key] = per_ts.get(key, 0.0) + max(0.0, v_b - v_a) / dt
xs = sorted(per_ts)
return xs, [per_ts[x] for x in xs]
def counter_total(self, family, label_filter=None):
"""delta of a summed counter family over the measurement window"""
total = 0.0
for labels, samples in self.prom.get(family, {}).items():
if label_filter and label_filter not in labels:
continue
samples = [s for s in samples if self._in_window(s[0])]
if len(samples) >= 2:
total += samples[-1][1] - samples[0][1]
return total
def flexfec_state_rate(self, state):
return self.counter_rate(FLEXFEC_METRIC, f'state="{state}"')
def flexfec_state_total(self, state):
return self.counter_total(FLEXFEC_METRIC, f'state="{state}"')
# ---------- summary ----------
def summary(self):
_, lat = self.latency_series()
gaps = self.frame_gaps()
frame_ids = [fr["frame_id"] for fr in self.frames if fr["frame_id"] is not None]
expected = (max(frame_ids) - min(frame_ids) + 1) if frame_ids else 0
lost = sum(n for _, n in gaps)
def pct(p):
if not lat:
return float("nan")
data = sorted(lat)
return data[min(len(data) - 1, int(len(data) * p))]
return {
"frames_received": len(self.frames),
"frames_expected": expected,
"frames_lost": lost,
"gap_events": len(gaps),
"latency_mean_ms": statistics.fmean(lat) if lat else float("nan"),
"latency_p50_ms": pct(0.50),
"latency_p95_ms": pct(0.95),
"latency_p99_ms": pct(0.99),
"fec_received": self.flexfec_state_total("received"),
"fec_recovery_attempts": self.flexfec_state_total("recovery_attempt"),
"fec_recovered": self.flexfec_state_total("recovered"),
"fec_recovery_failed": self.flexfec_state_total("recovery_failed"),
"fec_unused": self.flexfec_state_total("unused"),
"fec_invalid": self.flexfec_state_total("invalid"),
"nack_total": self.counter_total("livekit_nack_total"),
"packet_loss_total": self.counter_total("livekit_packet_loss_total"),
}
def shade_bursts(ax, bursts):
for start, end in bursts:
ax.axvspan(start, end, color="red", alpha=0.08, lw=0)
def plot(runs, out_dir):
colors = {"baseline": "#888888", "fec": "#1f77b4"}
fig, axes = plt.subplots(4, 1, figsize=(14, 16), sharex=True)
ax_lat, ax_fec, ax_loss, ax_fps = axes
bursts = runs[-1].bursts
for run in runs:
color = colors.get(run.name, None)
xs, ys = run.latency_series()
ax_lat.plot(xs, ys, ".", markersize=2.5, label=f"{run.name} latency", color=color, alpha=0.7)
for x, n in run.frame_gaps():
ax_lat.axvline(x, color=color or "red", alpha=0.5, lw=min(0.5 + n * 0.3, 3))
shade_bursts(ax_lat, bursts)
ax_lat.set_ylabel("capture→receive latency (ms)")
ax_lat.set_title("Frame latency (vertical lines: frame-id gaps = lost frames, red shading: loss bursts)")
ax_lat.legend(loc="upper right", fontsize=8)
ax_lat.grid(alpha=0.3)
fec_runs = [r for r in runs if r.flexfec_state_total("received") > 0] or runs[-1:]
for run in fec_runs:
for state, style in [
("received", dict(color="#1f77b4", lw=1)),
("recovered", dict(color="#2ca02c", lw=1.8)),
("recovery_failed", dict(color="#d62728", lw=1.2)),
("unused", dict(color="#9467bd", lw=0.8, alpha=0.6)),
]:
xs, ys = run.flexfec_state_rate(state)
ax_fec.plot(xs, ys, label=f"{run.name} {state}/s", **style)
shade_bursts(ax_fec, bursts)
ax_fec.set_ylabel("FEC packets/s")
ax_fec.set_title("FlexFEC at the SFU")
ax_fec.legend(loc="upper right", fontsize=8)
ax_fec.grid(alpha=0.3)
for run in runs:
color = colors.get(run.name, None)
xs, ys = run.counter_rate("livekit_nack_total")
ax_loss.plot(xs, ys, label=f"{run.name} nack/s", color=color, lw=1.2)
xs, ys = run.counter_rate("livekit_packet_loss_total")
ax_loss.plot(xs, ys, label=f"{run.name} packet_loss/s", color=color, lw=1.2, ls="--", alpha=0.7)
shade_bursts(ax_loss, bursts)
ax_loss.set_ylabel("packets/s")
ax_loss.set_title("NACK and reported packet loss")
ax_loss.legend(loc="upper right", fontsize=8)
ax_loss.grid(alpha=0.3)
for run in runs:
color = colors.get(run.name, None)
xs, ys = run.fps_series()
ax_fps.plot(xs, ys, label=f"{run.name} fps", color=color, lw=1.2)
shade_bursts(ax_fps, bursts)
ax_fps.set_ylabel("frames/s")
ax_fps.set_xlabel("seconds since measurement start")
ax_fps.set_title("Delivered frame rate at subscriber")
ax_fps.legend(loc="lower right", fontsize=8)
ax_fps.grid(alpha=0.3)
meta = runs[-1].meta
if meta.get("DEBUG_DROP", "0") not in ("0", ""):
profile = "uniform {}% loss injected at SFU receive".format(meta["DEBUG_DROP"])
elif meta.get("SHAPING") == "0":
profile = "no loss"
else:
profile = "base loss {} / burst {} for {}s every {}s".format(
meta.get("BASE_LOSS", "?"),
meta.get("BURST_LOSS", "?"),
meta.get("BURST_LEN", "?"),
meta.get("BURST_EVERY", "?"),
)
fig.suptitle(
"FlexFEC publisher→SFU recovery — {}, codec {}".format(profile, meta.get("CODEC", "?")),
fontsize=12,
)
fig.tight_layout(rect=(0, 0, 1, 0.985))
out_path = os.path.join(out_dir, "fec_report.png")
fig.savefig(out_path, dpi=130)
return out_path
def print_summary(runs):
summaries = [(run.name, run.summary()) for run in runs]
keys = [
("frames_received", "{:.0f}"),
("frames_expected", "{:.0f}"),
("frames_lost", "{:.0f}"),
("gap_events", "{:.0f}"),
("latency_mean_ms", "{:.1f}"),
("latency_p50_ms", "{:.1f}"),
("latency_p95_ms", "{:.1f}"),
("latency_p99_ms", "{:.1f}"),
("fec_received", "{:.0f}"),
("fec_recovery_attempts", "{:.0f}"),
("fec_recovered", "{:.0f}"),
("fec_recovery_failed", "{:.0f}"),
("fec_unused", "{:.0f}"),
("fec_invalid", "{:.0f}"),
("nack_total", "{:.0f}"),
("packet_loss_total", "{:.0f}"),
]
name_w = 24
header = "metric".ljust(name_w) + "".join(name.rjust(16) for name, _ in summaries)
print(header)
print("-" * len(header))
for key, fmt in keys:
row = key.ljust(name_w)
for _, summary in summaries:
row += fmt.format(summary[key]).rjust(16)
print(row)
if len(summaries) == 2:
base, fec = summaries[0][1], summaries[1][1]
print()
if fec["frames_lost"] < base["frames_lost"]:
print(
"frames lost reduced {} -> {} with FlexFEC".format(
int(base["frames_lost"]), int(fec["frames_lost"])
)
)
if fec["fec_recovered"] > 0:
print("SFU recovered {} packets via FlexFEC".format(int(fec["fec_recovered"])))
def main():
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument("--run", action="append", required=True, help="run directory (repeatable)")
parser.add_argument("--out", required=True, help="output directory for the report")
args = parser.parse_args()
runs = [Run(path) for path in args.run]
for run in runs:
if not run.frames:
print(f"warning: no frames recorded for {run.path}", file=sys.stderr)
out_path = plot(runs, args.out)
print(f"report: {out_path}")
print()
print_summary(runs)
if __name__ == "__main__":
main()