Files
SpacetimeDB/smoketests/docker.py
Noa 3cc59de66c Add typescript quickstart smoketest (#3463)
# Description of Changes

Based on #3461, which fixes a bug encountered in the quickstart.

# API and ABI breaking changes

<!-- If this is an API or ABI breaking change, please apply the
corresponding GitHub label. -->
n/a

# Expected complexity level and risk

<!--
How complicated do you think these changes are? Grade on a scale from 1
to 5,
where 1 is a trivial change, and 5 is a deep-reaching and complex
change.

This complexity rating applies not only to the complexity apparent in
the diff,
but also to its interactions with existing and future code.

If you answered more than a 2, explain what is complex about the PR,
and what other components it interacts with in potentially concerning
ways. -->
1

# Testing

<!-- Describe any testing you've done, and any testing you'd like your
reviewers to do,
so that you're confident that all the changes work as expected! -->

- [x] Yes
- [ ] <!-- maybe a test you want a reviewer to do, so they can check it
off when they're satisfied. -->
2025-10-31 16:30:11 +00:00

209 lines
7.6 KiB
Python

import json
import os
import subprocess
import time
from dataclasses import dataclass
from typing import List, Optional, Callable
from urllib.request import urlopen
from . import COMPOSE_FILE
def restart_docker():
"""
Restart all containers defined in the current `COMPOSE_FILE`.
Checks that all spacetimedb containers are up and running after the restart.
If they're not up after a couple of retries, throws an `Exception`.
"""
print("Restarting containers")
docker = DockerManager(COMPOSE_FILE)
docker.compose("restart")
containers = docker.list_spacetimedb_containers()
if not containers:
raise Exception("No spacetimedb containers found")
# Ensure all nodes are running.
attempts = 0
while attempts < 10:
attempts += 1
containers_alive = {
container.name: container.is_running(docker, spacetimedb_ping_url)
for container in containers
}
if all(containers_alive.values()):
# sleep a bit more to allow for leader election etc
# TODO: make ping endpoint consider all server state
time.sleep(2)
return
else:
time.sleep(1)
raise Exception(f"Not all containers are up and running: {containers_alive!r}")
def spacetimedb_ping_url(port: int) -> str:
return f"http://127.0.0.1:{port}/v1/ping"
@dataclass
class DockerContainer:
"""Represents a Docker container with its basic properties."""
id: str
name: str
def host_ports(self, docker) -> set[int]:
"""
Collect all host ports of this container.
Host ports are ports on the host that are bound to ports of the
container.
If the container is not currently running, an empty set is returned.
"""
host_ports = set()
info = docker.inspect_container(self)
for ports in info.get('NetworkSettings', {}).get('Ports', {}).values():
if ports:
for ip_and_port in ports:
host_port = ip_and_port.get("HostPort")
if host_port:
host_ports.add(host_port)
return host_ports
def is_running(self, docker, ping_url: Callable[[int], str]) -> bool:
"""
Check if the container is running.
`ping_url` takes a port number and returns a URL string that can be used
to determine if the host is running by returning a 200 status.
If `self.host_ports()` returns a non-empty set, and one `ping_url`
request is successful, the container is considered running.
"""
host_ports = self.host_ports(docker)
for port in host_ports:
url = ping_url(port)
print(f"Trying {url} ... ", end='', flush=True)
try:
with urlopen(url, timeout=0.2) as response:
if response.status == 200:
print("ok")
return True
except Exception as e:
print(f"error: {e}")
continue
print(f"container {self.name} not running")
return False
class DockerManager:
"""Manages all Docker and Docker Compose operations."""
def __init__(self, compose_file: str, **config):
self.compose_file = compose_file
self.network_name = config.get('network_name') or \
os.getenv('DOCKER_NETWORK_NAME', 'private_spacetime_cloud')
self.control_db_container = config.get('control_db_container') or \
os.getenv('CONTROL_DB_CONTAINER', 'node')
self.spacetime_cli_bin = config.get('spacetime_cli_bin') or \
os.getenv('SPACETIME_CLI_BIN', 'spacetimedb-cloud')
def _execute_command(self, *args: str) -> str:
"""Execute a Docker command and return its output."""
try:
result = subprocess.run(
args,
capture_output=True,
text=True,
check=True
)
return result.stdout.strip()
except subprocess.CalledProcessError as e:
print(f"Command failed: {e.stderr}")
raise
except Exception as e:
print(f"Unexpected error: {str(e)}")
raise
def compose(self, *args: str) -> str:
"""Execute a `docker compose` command."""
return self._execute_command("docker", "compose", "-f", self.compose_file, *args)
def docker(self, *args: str) -> str:
"""Execute a `docker` command."""
return self._execute_command("docker", *args)
def list_containers(self, *filters) -> List[DockerContainer]:
"""
List the containers of the current compose file and return as DockerContainer objects.
All containers are considered, even if not running ('-a' flag).
The containers may be filtered by 'filters' ('--filter' option).
"""
# Use -a so we don't miss a crashed or killed container
# when checking for readiness.
cmd = ["ps", "-a"]
# Restrict to the current compose file.
compose_file = os.path.abspath(COMPOSE_FILE)
cmd.extend(["--filter", f"label=com.docker.compose.project.config_files={compose_file}"])
# Apply additional filters.
for f in filters:
cmd.extend(["--filter", f])
# Output only the fields we need for `DockerContainer`.
cmd.extend(["--format", "{{.ID}} {{.Names}}"])
output = self.docker(*cmd)
containers = []
for line in output.splitlines():
if line.strip():
container_id, name = line.split(maxsplit=1)
containers.append(DockerContainer(id=container_id, name=name))
return containers
def list_spacetimedb_containers(self) -> List[DockerContainer]:
"""List all containers running spacetimedb."""
return self.list_containers("label=app=spacetimedb")
def inspect_container(self, container: DockerContainer):
"""Run the `inspect` command for `container`, returning the parsed JSON dict."""
info = self.docker("inspect", container.name)
return json.loads(info)[0]
def get_container_by_name(self, name: str) -> Optional[DockerContainer]:
"""Find a container by name pattern."""
return next(
(c for c in self.list_containers() if name in c.name),
None
)
def kill_container(self, container_id: str):
"""Kill a container by ID."""
print(f"Killing container {container_id}")
self.docker("kill", container_id)
def start_container(self, container_id: str):
"""Start a container by ID."""
print(f"Starting container {container_id}")
self.docker("start", container_id)
def disconnect_container(self, container_id: str):
"""Disconnect a container from the network."""
print(f"Disconnecting container {container_id}")
self.docker("network", "disconnect", self.network_name, container_id)
print(f"Disconnected container {container_id}")
def connect_container(self, container_id: str):
"""Connect a container to the network."""
print(f"Connecting container {container_id}")
self.docker("network", "connect", self.network_name, container_id)
print(f"Connected container {container_id}")
def generate_root_token(self) -> str:
"""Generate a root token using spacetimedb-cloud."""
return self.compose(
"exec", self.control_db_container, self.spacetime_cli_bin, "token", "gen",
"--subject=placeholder-node-id",
"--jwt-priv-key", "/etc/spacetimedb/keys/id_ecdsa").split('|')[1]