Files
SpacetimeDB/smoketests/docker.py
Kim Altintop 8a2e9c373b smoketests: Make gathering container ports None-safe (#2991)
Python has a funny way of dealing with absent values.

Fixes (again) restart tests to handle slowly restarting containers.

# Expected complexity level and risk

1

# Testing

n/a
2025-07-28 08:31:28 +00:00

205 lines
7.5 KiB
Python

import json
import os
import subprocess
import time
from dataclasses import dataclass
from typing import List, Optional, Callable
from urllib.request import urlopen
from . import COMPOSE_FILE
def restart_docker():
"""
Restart all containers defined in the current `COMPOSE_FILE`.
Checks that all spacetimedb containers are up and running after the restart.
If they're not up after a couple of retries, throws an `Exception`.
"""
print("Restarting containers")
docker = DockerManager(COMPOSE_FILE)
docker.compose("restart")
containers = docker.list_spacetimedb_containers()
if not containers:
raise Exception("No spacetimedb containers found")
# Ensure all nodes are running.
attempts = 0
while attempts < 5:
attempts += 1
if all(container.is_running(docker, spacetimedb_ping_url) for container in containers):
# sleep a bit more to allow for leader election etc
# TODO: make ping endpoint consider all server state
time.sleep(2)
return
else:
time.sleep(1)
raise Exception("Not all containers are up and running")
def spacetimedb_ping_url(port: int) -> str:
return f"http://127.0.0.1:{port}/v1/ping"
@dataclass
class DockerContainer:
"""Represents a Docker container with its basic properties."""
id: str
name: str
def host_ports(self, docker) -> set[int]:
"""
Collect all host ports of this container.
Host ports are ports on the host that are bound to ports of the
container.
If the container is not currently running, an empty set is returned.
"""
host_ports = set()
info = docker.inspect_container(self)
for ports in info.get('NetworkSettings', {}).get('Ports', {}).values():
if ports:
for ip_and_port in ports:
host_port = ip_and_port.get("HostPort")
if host_port:
host_ports.add(host_port)
return host_ports
def is_running(self, docker, ping_url: Callable[[int], str]) -> bool:
"""
Check if the container is running.
`ping_url` takes a port number and returns a URL string that can be used
to determine if the host is running by returning a 200 status.
If `self.host_ports()` returns a non-empty set, and one `ping_url`
request is successful, the container is considered running.
"""
host_ports = self.host_ports(docker)
for port in host_ports:
url = ping_url(port)
print(f"Trying {url} ... ", end='', flush=True)
try:
with urlopen(url, timeout=0.2) as response:
if response.status == 200:
print("ok")
return True
except Exception as e:
print(f"error: {e}")
continue
print(f"container {self.name} not running")
return False
class DockerManager:
"""Manages all Docker and Docker Compose operations."""
def __init__(self, compose_file: str, **config):
self.compose_file = compose_file
self.network_name = config.get('network_name') or \
os.getenv('DOCKER_NETWORK_NAME', 'private_spacetime_cloud')
self.control_db_container = config.get('control_db_container') or \
os.getenv('CONTROL_DB_CONTAINER', 'node')
self.spacetime_cli_bin = config.get('spacetime_cli_bin') or \
os.getenv('SPACETIME_CLI_BIN', 'spacetimedb-cloud')
def _execute_command(self, *args: str) -> str:
"""Execute a Docker command and return its output."""
try:
result = subprocess.run(
args,
capture_output=True,
text=True,
check=True
)
return result.stdout.strip()
except subprocess.CalledProcessError as e:
print(f"Command failed: {e.stderr}")
raise
except Exception as e:
print(f"Unexpected error: {str(e)}")
raise
def compose(self, *args: str) -> str:
"""Execute a `docker compose` command."""
return self._execute_command("docker", "compose", "-f", self.compose_file, *args)
def docker(self, *args: str) -> str:
"""Execute a `docker` command."""
return self._execute_command("docker", *args)
def list_containers(self, *filters) -> List[DockerContainer]:
"""
List the containers of the current compose file and return as DockerContainer objects.
All containers are considered, even if not running ('-a' flag).
The containers may be filtered by 'filters' ('--filter' option).
"""
# Use -a so we don't miss a crashed or killed container
# when checking for readiness.
cmd = ["ps", "-a"]
# Restrict to the current compose file.
compose_file = os.path.abspath(COMPOSE_FILE)
cmd.extend(["--filter", f"label=com.docker.compose.project.config_files={compose_file}"])
# Apply additional filters.
for f in filters:
cmd.extend(["--filter", f])
# Output only the fields we need for `DockerContainer`.
cmd.extend(["--format", "{{.ID}} {{.Names}}"])
output = self.docker(*cmd)
containers = []
for line in output.splitlines():
if line.strip():
container_id, name = line.split(maxsplit=1)
containers.append(DockerContainer(id=container_id, name=name))
return containers
def list_spacetimedb_containers(self) -> List[DockerContainer]:
"""List all containers running spacetimedb."""
return self.list_containers("label=app=spacetimedb")
def inspect_container(self, container: DockerContainer):
"""Run the `inspect` command for `container`, returning the parsed JSON dict."""
info = self.docker("inspect", container.name)
return json.loads(info)[0]
def get_container_by_name(self, name: str) -> Optional[DockerContainer]:
"""Find a container by name pattern."""
return next(
(c for c in self.list_containers() if name in c.name),
None
)
def kill_container(self, container_id: str):
"""Kill a container by ID."""
print(f"Killing container {container_id}")
self.docker("kill", container_id)
def start_container(self, container_id: str):
"""Start a container by ID."""
print(f"Starting container {container_id}")
self.docker("start", container_id)
def disconnect_container(self, container_id: str):
"""Disconnect a container from the network."""
print(f"Disconnecting container {container_id}")
self.docker("network", "disconnect", self.network_name, container_id)
print(f"Disconnected container {container_id}")
def connect_container(self, container_id: str):
"""Connect a container to the network."""
print(f"Connecting container {container_id}")
self.docker("network", "connect", self.network_name, container_id)
print(f"Connected container {container_id}")
def generate_root_token(self) -> str:
"""Generate a root token using spacetimedb-cloud."""
return self.compose(
"exec", self.control_db_container, self.spacetime_cli_bin, "token", "gen",
"--subject=placeholder-node-id",
"--jwt-priv-key", "/etc/spacetimedb/keys/id_ecdsa").split('|')[1]