mirror of
https://github.com/clockworklabs/SpacetimeDB.git
synced 2026-05-15 04:08:02 -04:00
8a2e9c373b
Python has a funny way of dealing with absent values. Fixes (again) restart tests to handle slowly restarting containers. # Expected complexity level and risk 1 # Testing n/a
205 lines
7.5 KiB
Python
205 lines
7.5 KiB
Python
import json
|
|
import os
|
|
import subprocess
|
|
import time
|
|
from dataclasses import dataclass
|
|
from typing import List, Optional, Callable
|
|
from urllib.request import urlopen
|
|
|
|
from . import COMPOSE_FILE
|
|
|
|
|
|
def restart_docker(max_attempts: int = 5, retry_delay: float = 1.0):
    """
    Restart all containers defined in the current `COMPOSE_FILE`.

    Checks that all spacetimedb containers are up and running after the
    restart. Readiness is polled up to `max_attempts` times, waiting
    `retry_delay` seconds between consecutive checks. Once every container
    answers, sleeps a further 2 seconds before returning.

    Args:
        max_attempts: number of readiness checks before giving up.
        retry_delay: seconds to wait between two failed checks.

    Raises:
        Exception: if no spacetimedb containers are found, or if they are not
            all running after `max_attempts` checks.
    """
    print("Restarting containers")

    docker = DockerManager(COMPOSE_FILE)
    docker.compose("restart")
    containers = docker.list_spacetimedb_containers()
    if not containers:
        raise Exception("No spacetimedb containers found")

    # Ensure all nodes are running.
    for attempt in range(1, max_attempts + 1):
        if all(
            container.is_running(docker, spacetimedb_ping_url)
            for container in containers
        ):
            # sleep a bit more to allow for leader election etc
            # TODO: make ping endpoint consider all server state
            time.sleep(2)
            return
        # Only wait when another check follows; sleeping after the final
        # failed check would merely delay the exception below.
        if attempt < max_attempts:
            time.sleep(retry_delay)

    raise Exception("Not all containers are up and running")
|
|
|
|
def spacetimedb_ping_url(port: int) -> str:
    """Build the URL of the spacetimedb `/v1/ping` endpoint on localhost at `port`."""
    template = "http://127.0.0.1:{}/v1/ping"
    return template.format(port)
|
|
|
|
@dataclass
class DockerContainer:
    """Represents a Docker container with its basic properties."""

    # Container ID (the `{{.ID}}` column of `docker ps`).
    id: str
    # Container name (the `{{.Names}}` column of `docker ps`).
    name: str

    def host_ports(self, docker) -> set[int]:
        """
        Collect all host ports of this container.

        Host ports are ports on the host that are bound to ports of the
        container, as found under `NetworkSettings.Ports` in the result of
        `docker.inspect_container(self)`.
        If the container is not currently running, an empty set is returned.

        Docker may report absent values as JSON `null`. `dict.get(key, default)`
        only falls back to `default` when the key is missing, not when it maps
        to `None`, so every level is guarded with `or` instead.

        Args:
            docker: an object providing `inspect_container(container)` that
                returns the parsed inspect dict (e.g. `DockerManager`).

        Returns:
            The set of bound host ports, as integers.
        """
        info = docker.inspect_container(self) or {}
        network_settings = info.get("NetworkSettings") or {}
        port_map = network_settings.get("Ports") or {}

        host_ports = set()
        for bindings in port_map.values():
            # A port without any host binding maps to `null`.
            for binding in bindings or ():
                host_port = binding.get("HostPort")
                if host_port:
                    # Docker reports ports as strings; convert to honour the
                    # `set[int]` contract expected by `ping_url` callbacks.
                    host_ports.add(int(host_port))
        return host_ports

    def is_running(
        self,
        docker,
        ping_url: Callable[[int], str],
        timeout: float = 0.2,
    ) -> bool:
        """
        Check if the container is running.

        `ping_url` takes a port number and returns a URL string that can be used
        to determine if the host is running by returning a 200 status.

        If `self.host_ports(docker)` returns a non-empty set, and one `ping_url`
        request is successful, the container is considered running.

        Args:
            docker: passed through to `host_ports`.
            ping_url: maps a host port to the URL to probe.
            timeout: per-request timeout in seconds passed to `urlopen`.

        Returns:
            True as soon as one probe answers with status 200, else False.
        """
        for port in self.host_ports(docker):
            url = ping_url(port)
            print(f"Trying {url} ... ", end='', flush=True)
            try:
                with urlopen(url, timeout=timeout) as response:
                    status = response.status
            except Exception as e:
                # Connection refused, timeouts, HTTP errors, etc.: the server
                # may still be starting up, so move on to the next port.
                print(f"error: {e}")
                continue

            if status == 200:
                print("ok")
                return True
            # Terminate the "Trying ..." line for any other (non-error) status.
            print(f"unexpected status {status}")

        print(f"container {self.name} not running")
        return False
|
|
|
|
class DockerManager:
    """Manages all Docker and Docker Compose operations."""

    def __init__(self, compose_file: str, **config):
        """
        Args:
            compose_file: compose file passed to `docker compose -f`; also used
                to restrict `list_containers` to this compose project.
            **config: optional overrides `network_name`, `control_db_container`
                and `spacetime_cli_bin`. A missing or falsy override falls back
                to the `DOCKER_NETWORK_NAME`, `CONTROL_DB_CONTAINER` and
                `SPACETIME_CLI_BIN` environment variables respectively, and then
                to a built-in default.
        """
        self.compose_file = compose_file
        self.network_name = (
            config.get('network_name')
            or os.getenv('DOCKER_NETWORK_NAME', 'private_spacetime_cloud')
        )
        self.control_db_container = (
            config.get('control_db_container')
            or os.getenv('CONTROL_DB_CONTAINER', 'node')
        )
        self.spacetime_cli_bin = (
            config.get('spacetime_cli_bin')
            or os.getenv('SPACETIME_CLI_BIN', 'spacetimedb-cloud')
        )

    def _execute_command(self, *args: str) -> str:
        """
        Execute a Docker command and return its output.

        Returns the command's stdout with surrounding whitespace stripped.
        Failures are printed and then re-raised; a non-zero exit status raises
        `subprocess.CalledProcessError`.
        """
        try:
            result = subprocess.run(
                args,
                capture_output=True,
                text=True,
                check=True,
            )
            return result.stdout.strip()
        except subprocess.CalledProcessError as e:
            print(f"Command failed: {e.stderr}")
            raise
        except Exception as e:
            print(f"Unexpected error: {e}")
            raise

    def compose(self, *args: str) -> str:
        """Execute a `docker compose` command against `self.compose_file`."""
        return self._execute_command("docker", "compose", "-f", self.compose_file, *args)

    def docker(self, *args: str) -> str:
        """Execute a `docker` command."""
        return self._execute_command("docker", *args)

    def list_containers(self, *filters) -> List[DockerContainer]:
        """
        List the containers of this manager's compose file and return them as
        DockerContainer objects.

        All containers are considered, even if not running ('-a' flag).
        The containers may be filtered by 'filters' ('--filter' option).
        """
        # Use -a so we don't miss a crashed or killed container
        # when checking for readiness.
        cmd = ["ps", "-a"]

        # Restrict to this manager's compose file (not the module-global one),
        # consistent with `compose()`.
        compose_file = os.path.abspath(self.compose_file)
        cmd.extend(["--filter", f"label=com.docker.compose.project.config_files={compose_file}"])

        # Apply additional filters.
        for f in filters:
            cmd.extend(["--filter", f])

        # Output only the fields we need for `DockerContainer`.
        cmd.extend(["--format", "{{.ID}} {{.Names}}"])

        output = self.docker(*cmd)
        containers = []
        for line in output.splitlines():
            if line.strip():
                container_id, name = line.split(maxsplit=1)
                containers.append(DockerContainer(id=container_id, name=name))
        return containers

    def list_spacetimedb_containers(self) -> List[DockerContainer]:
        """List all containers of the compose project labelled `app=spacetimedb`."""
        return self.list_containers("label=app=spacetimedb")

    def inspect_container(self, container: DockerContainer):
        """Run the `inspect` command for `container`, returning the parsed JSON dict."""
        info = self.docker("inspect", container.name)
        # `docker inspect` prints a JSON array with one entry per inspected object.
        return json.loads(info)[0]

    def get_container_by_name(self, name: str) -> Optional[DockerContainer]:
        """
        Find a container by name pattern.

        Returns the first container whose name contains `name` as a substring,
        or None if there is no match.
        """
        return next(
            (c for c in self.list_containers() if name in c.name),
            None,
        )

    def kill_container(self, container_id: str):
        """Kill a container by ID."""
        print(f"Killing container {container_id}")
        self.docker("kill", container_id)

    def start_container(self, container_id: str):
        """Start a container by ID."""
        print(f"Starting container {container_id}")
        self.docker("start", container_id)

    def disconnect_container(self, container_id: str):
        """Disconnect a container from `self.network_name`."""
        print(f"Disconnecting container {container_id}")
        self.docker("network", "disconnect", self.network_name, container_id)
        print(f"Disconnected container {container_id}")

    def connect_container(self, container_id: str):
        """Connect a container to `self.network_name`."""
        print(f"Connecting container {container_id}")
        self.docker("network", "connect", self.network_name, container_id)
        print(f"Connected container {container_id}")

    def generate_root_token(self) -> str:
        """
        Generate a root token using spacetimedb-cloud.

        Runs `<spacetime_cli_bin> token gen` inside the control DB container
        and returns the second `|`-separated field of its output.
        NOTE(review): this assumes the CLI output contains a `|` separator;
        otherwise an IndexError is raised.
        """
        return self.compose(
            "exec", self.control_db_container, self.spacetime_cli_bin, "token", "gen",
            "--subject=placeholder-node-id",
            "--jwt-priv-key", "/etc/spacetimedb/keys/id_ecdsa").split('|')[1]
|