diff --git a/AGENTS.md b/AGENTS.md
index 39fe877..a932426 100644
--- a/AGENTS.md
+++ b/AGENTS.md
@@ -210,16 +210,4 @@ Pre-commit hooks are configured but may stash unstaged changes.
 | File | Contents |
 |------|----------|
 | `README.md` | Human-facing project overview |
-| `DOCS.md` | Feature documentation |
-| `E2E.md` | End-to-end testing notes |
-| `ENDPOINT_REFACTOR.md` | Batch plan for endpoint renaming (completed) |
-| `ISSUES.md` | Known issues and backlog |
-| `MEDIA_MANAGEMENT.md` | Media lifecycle documentation |
-| `NOTES.md` | Development notes |
-| `OPTIMIZATIONS.md` | Performance optimization notes |
-| `PLAN.md` | Project roadmap |
-| `UX.md` | UX conventions |
-| `GEMINI.md` | Gemini-specific context |
-| `REVIEW_2.md` | Code review notes |
-| `SOURCEMAP.md` | Frontend source map |
 | `AGENTS.md` | This file — agent development guide |
diff --git a/backend/app/api/system/jobs.py b/backend/app/api/system/jobs.py
index 21e14d9..1c68b3f 100644
--- a/backend/app/api/system/jobs.py
+++ b/backend/app/api/system/jobs.py
@@ -118,6 +118,69 @@ def get_job_stats(db_session: Session = Depends(get_db)):
     }
 
 
+# NOTE: /jobs/stream MUST be registered BEFORE /jobs/{job_id} routes
+# because FastAPI matches routes in definition order.
+@router.get("/jobs/stream", operation_id="stream_jobs")
+async def stream_jobs():
+    """Server-Sent Events (SSE) endpoint for real-time job status updates."""
+
+    async def event_generator():
+        while True:
+            with SessionLocal() as db_session:
+                active_jobs = (
+                    db_session.query(models.Job)
+                    .filter(models.Job.status.in_(["RUNNING", "PENDING"]))
+                    .all()
+                )
+                job_ids = [job.id for job in active_jobs]
+                if job_ids:
+                    placeholders = ", ".join([f":id{i}" for i in range(len(job_ids))])
+                    params = {f"id{i}": jid for i, jid in enumerate(job_ids)}
+                    subquery = text(f"""
+                        SELECT jl.job_id, jl.message
+                        FROM job_logs jl
+                        INNER JOIN (
+                            SELECT job_id, MAX(id) as max_id
+                            FROM job_logs
+                            WHERE job_id IN ({placeholders})
+                            GROUP BY job_id
+                        ) latest ON jl.id = latest.max_id
+                    """)
+                    latest_logs = {
+                        row[0]: row[1]
+                        for row in db_session.execute(subquery, params).fetchall()
+                    }
+                else:
+                    latest_logs = {}
+
+                serialized_data = []
+                for job in active_jobs:
+                    job_dict = {
+                        "id": job.id,
+                        "job_type": job.job_type,
+                        "status": job.status,
+                        "progress": job.progress,
+                        "current_task": job.current_task,
+                        "error_message": job.error_message,
+                        "started_at": job.started_at,
+                        "created_at": job.created_at,
+                        "latest_log": latest_logs.get(job.id),
+                    }
+                    for date_field in ["started_at", "created_at"]:
+                        from datetime import datetime
+
+                        val = job_dict[date_field]
+                        if isinstance(val, datetime):
+                            job_dict[date_field] = val.isoformat()
+                    serialized_data.append(job_dict)
+
+            yield f"data: {json.dumps(serialized_data)}\n\n"
+
+            await asyncio.sleep(2)
+
+    return StreamingResponse(event_generator(), media_type="text/event-stream")
+
+
 @router.get("/jobs/{job_id}", response_model=JobSchema, operation_id="get_job")
 def get_job(job_id: int, db_session: Session = Depends(get_db)):
     """Retrieves detailed metadata for a specific job."""
@@ -211,64 +274,3 @@ def retry_job(
         "message": f"Retry initiated for {job_record.job_type} job",
         "new_job_id": new_job.id,
     }
-
-
-@router.get("/jobs/stream", operation_id="stream_jobs")
-async def stream_jobs():
-    """Server-Sent Events (SSE) endpoint for real-time job status updates."""
-
-    async def event_generator():
-        while True:
-            with SessionLocal() as db_session:
-                active_jobs = (
-                    db_session.query(models.Job)
-                    .filter(models.Job.status.in_(["RUNNING", "PENDING"]))
-                    .all()
-                )
-                job_ids = [job.id for job in active_jobs]
-                if job_ids:
-                    placeholders = ", ".join([f":id{i}" for i in range(len(job_ids))])
-                    params = {f"id{i}": jid for i, jid in enumerate(job_ids)}
-                    subquery = text(f"""
-                        SELECT jl.job_id, jl.message
-                        FROM job_logs jl
-                        INNER JOIN (
-                            SELECT job_id, MAX(id) as max_id
-                            FROM job_logs
-                            WHERE job_id IN ({placeholders})
-                            GROUP BY job_id
-                        ) latest ON jl.id = latest.max_id
-                    """)
-                    latest_logs = {
-                        row[0]: row[1]
-                        for row in db_session.execute(subquery, params).fetchall()
-                    }
-                else:
-                    latest_logs = {}
-
-                serialized_data = []
-                for job in active_jobs:
-                    job_dict = {
-                        "id": job.id,
-                        "job_type": job.job_type,
-                        "status": job.status,
-                        "progress": job.progress,
-                        "current_task": job.current_task,
-                        "error_message": job.error_message,
-                        "started_at": job.started_at,
-                        "created_at": job.created_at,
-                        "latest_log": latest_logs.get(job.id),
-                    }
-                    for date_field in ["started_at", "created_at"]:
-                        from datetime import datetime
-
-                        val = job_dict[date_field]
-                        if isinstance(val, datetime):
-                            job_dict[date_field] = val.isoformat()
-                    serialized_data.append(job_dict)
-
-            yield f"data: {json.dumps(serialized_data)}\n\n"
-
-            await asyncio.sleep(2)
-
-    return StreamingResponse(event_generator(), media_type="text/event-stream")
diff --git a/backend/app/services/archiver.py b/backend/app/services/archiver.py
index 121120b..27be8b3 100644
--- a/backend/app/services/archiver.py
+++ b/backend/app/services/archiver.py
@@ -548,7 +548,7 @@ class ArchiverService:
                 JobManager.update_job(
                     job_id,
                     15.0 + (70.0 * (processed_bytes / safe_divisor)),
-                    f"Archived chunk {chunk_index+1} via binary tar",
+                    f"Archived chunk {chunk_index + 1} via binary tar",
                 )
             except Exception as e:
                 logger.error(
@@ -618,7 +618,7 @@ class ArchiverService:
                 JobManager.update_job(
                     job_id,
                     15.0 + (70.0 * (processed_bytes / safe_divisor)),
-                    f"Streaming chunk {chunk_index+1}/{len(chunks)} to {media_record.media_type}...",
+                    f"Streaming chunk {chunk_index + 1}/{len(chunks)} to {media_record.media_type}...",
                 )
                 with open(staging_full_path, "rb") as final_stream:
                     archive_location_id = storage_provider.write_archive(
@@ -654,7 +654,7 @@ class ArchiverService:
         try:
             utilization_ratio = float(hardware_utilization)
             logger.info(
-                f"Hardware reported utilization: {utilization_ratio*100:.1f}%"
+                f"Hardware reported utilization: {utilization_ratio * 100:.1f}%"
             )
         except (TypeError, ValueError):
             utilization_ratio = (
@@ -671,7 +671,7 @@ class ArchiverService:
 
         if utilization_ratio >= 0.98 and media_record.status == "active":
             logger.info(
-                f"MEDIA SATURATED: {media_record.identifier} ({utilization_ratio*100:.1f}%)"
+                f"MEDIA SATURATED: {media_record.identifier} ({utilization_ratio * 100:.1f}%)"
             )
             media_record.status = "full"
 
@@ -699,12 +699,12 @@ class ArchiverService:
         if JobManager.is_cancelled(job_id):
             JobManager.add_job_log(
                 job_id,
-                f"Backup cancelled. Utilization: {utilization_ratio*100:.1f}%",
+                f"Backup cancelled. Utilization: {utilization_ratio * 100:.1f}%",
             )
         else:
             JobManager.add_job_log(
                 job_id,
-                f"Backup complete. Utilization: {utilization_ratio*100:.1f}%",
+                f"Backup complete. Utilization: {utilization_ratio * 100:.1f}%",
             )
         JobManager.complete_job(job_id)
         from app.services.notifications import notification_manager
diff --git a/backend/app/services/scanner.py b/backend/app/services/scanner.py
index c02f316..92d6b99 100644
--- a/backend/app/services/scanner.py
+++ b/backend/app/services/scanner.py
@@ -331,11 +331,11 @@ class JobManager:
 
     @staticmethod
     def complete_job(job_id: int):
-        """Marks a job as successfully completed."""
+        """Marks a job as successfully completed if it is still active."""
         with SessionLocal() as db_session:
             try:
                 job_record = db_session.get(models.Job, job_id)
-                if job_record:
+                if job_record and job_record.status in ("PENDING", "RUNNING"):
                     job_record.status = "COMPLETED"
                     job_record.progress = 100.0
                     job_record.completed_at = datetime.now(timezone.utc)
@@ -346,11 +346,11 @@ class JobManager:
 
     @staticmethod
     def fail_job(job_id: int, error_message: str):
-        """Marks a job as failed and records the error message."""
+        """Marks a job as failed if it is still active."""
        with SessionLocal() as db_session:
             try:
                 job_record = db_session.get(models.Job, job_id)
-                if job_record:
+                if job_record and job_record.status in ("PENDING", "RUNNING"):
                     job_record.status = "FAILED"
                     job_record.error_message = error_message
                     job_record.completed_at = datetime.now(timezone.utc)
diff --git a/backend/tests/test_api_jobs.py b/backend/tests/test_api_jobs.py
new file mode 100644
index 0000000..695b563
--- /dev/null
+++ b/backend/tests/test_api_jobs.py
@@ -0,0 +1,363 @@
+from datetime import datetime, timezone
+
+import pytest
+
+from app.db import models
+
+
+# ── list_jobs ──
+
+
+def test_list_jobs_empty(client):
+    """Tests listing jobs when none exist."""
+    response = client.get("/system/jobs")
+    assert response.status_code == 200
+    assert response.json() == []
+
+
+def test_list_jobs_populated(client, db_session):
+    """Tests listing jobs with pagination and latest_log inclusion."""
+    job1 = models.Job(
+        job_type="SCAN",
+        status="COMPLETED",
+        progress=100.0,
+        current_task="Done",
+        started_at=datetime.now(timezone.utc),
+        completed_at=datetime.now(timezone.utc),
+    )
+    job2 = models.Job(
+        job_type="BACKUP",
+        status="RUNNING",
+        progress=50.0,
+        current_task="Writing archive",
+    )
+    db_session.add_all([job1, job2])
+    db_session.flush()
+
+    db_session.add(models.JobLog(job_id=job1.id, message="Scan complete"))
+    db_session.add(models.JobLog(job_id=job2.id, message="Processing chunk 3"))
+    db_session.commit()
+
+    response = client.get("/system/jobs")
+    assert response.status_code == 200
+    data = response.json()
+    assert len(data) == 2
+    # Most recent first (job2)
+    assert data[0]["job_type"] == "BACKUP"
+    assert data[0]["status"] == "RUNNING"
+    assert data[0]["latest_log"] == "Processing chunk 3"
+    assert data[1]["job_type"] == "SCAN"
+    assert data[1]["latest_log"] == "Scan complete"
+
+
+def test_list_jobs_pagination(client, db_session):
+    """Tests limit/offset pagination on jobs list."""
+    for i in range(5):
+        db_session.add(models.Job(job_type="SCAN", status="COMPLETED"))
+    db_session.commit()
+
+    response = client.get("/system/jobs?limit=2&offset=1")
+    assert response.status_code == 200
+    data = response.json()
+    assert len(data) == 2
+
+
+# ── get_job_count ──
+
+
+def test_get_job_count_empty(client):
+    """Tests job count when none exist."""
+    response = client.get("/system/jobs/count")
+    assert response.status_code == 200
+    assert response.json()["count"] == 0
+
+
+def test_get_job_count_populated(client, db_session):
+    """Tests job count with existing jobs."""
+    db_session.add_all(
+        [
+            models.Job(job_type="SCAN", status="COMPLETED"),
+            models.Job(job_type="BACKUP", status="FAILED"),
+        ]
+    )
+    db_session.commit()
+
+    response = client.get("/system/jobs/count")
+    assert response.status_code == 200
+    assert response.json()["count"] == 2
+
+
+# ── get_job_stats ──
+
+
+def test_get_job_stats_empty(client):
+    """Tests job stats when database is empty."""
+    response = client.get("/system/jobs/stats")
+    assert response.status_code == 200
+    data = response.json()
+    assert data["total"] == 0
+    assert data["success_rate"] == 100.0
+    assert data["avg_duration_seconds"] == 0
+
+
+def test_get_job_stats_populated(client, db_session):
+    """Tests job stats with a mix of statuses and types."""
+    now = datetime.now(timezone.utc)
+    db_session.add_all(
+        [
+            models.Job(
+                job_type="SCAN",
+                status="COMPLETED",
+                started_at=now,
+                completed_at=now,
+            ),
+            models.Job(
+                job_type="SCAN",
+                status="COMPLETED",
+                started_at=now,
+                completed_at=now,
+            ),
+            models.Job(job_type="BACKUP", status="FAILED"),
+            models.Job(job_type="RESTORE", status="RUNNING"),
+            models.Job(job_type="SCAN", status="PENDING"),
+        ]
+    )
+    db_session.commit()
+
+    response = client.get("/system/jobs/stats")
+    assert response.status_code == 200
+    data = response.json()
+    assert data["total"] == 5
+    assert data["completed"] == 2
+    assert data["failed"] == 1
+    assert data["running"] == 1
+    assert data["pending"] == 1
+    # 2 completed / (2 completed + 1 failed) = 66.7%
+    assert data["success_rate"] == 66.7
+    assert data["job_type_counts"]["SCAN"] == 3
+    assert data["job_type_counts"]["BACKUP"] == 1
+    assert data["job_type_counts"]["RESTORE"] == 1
+
+
+# ── get_job ──
+
+
+def test_get_job_found(client, db_session):
+    """Tests retrieving a specific job by ID."""
+    job = models.Job(
+        job_type="BACKUP",
+        status="RUNNING",
+        progress=42.5,
+        current_task="Archiving files",
+    )
+    db_session.add(job)
+    db_session.flush()
+    db_session.add(models.JobLog(job_id=job.id, message="Started backup"))
+    db_session.commit()
+
+    response = client.get(f"/system/jobs/{job.id}")
+    assert response.status_code == 200
+    data = response.json()
+    assert data["id"] == job.id
+    assert data["job_type"] == "BACKUP"
+    assert data["status"] == "RUNNING"
+    assert data["progress"] == 42.5
+    assert data["current_task"] == "Archiving files"
+    assert data["latest_log"] == "Started backup"
+
+
+def test_get_job_not_found(client):
+    """Tests retrieving a non-existent job returns 404."""
+    response = client.get("/system/jobs/99999")
+    assert response.status_code == 404
+
+
+# ── get_job_logs ──
+
+
+def test_get_job_logs_found(client, db_session):
+    """Tests retrieving logs for a specific job."""
+    job = models.Job(job_type="SCAN", status="COMPLETED")
+    db_session.add(job)
+    db_session.flush()
+    db_session.add(models.JobLog(job_id=job.id, message="Step 1"))
+    db_session.add(models.JobLog(job_id=job.id, message="Step 2"))
+    db_session.commit()
+
+    response = client.get(f"/system/jobs/{job.id}/logs")
+    assert response.status_code == 200
+    data = response.json()
+    assert len(data) == 2
+    assert data[0]["message"] == "Step 1"
+    assert data[1]["message"] == "Step 2"
+
+
+def test_get_job_logs_not_found(client):
+    """Tests retrieving logs for a non-existent job returns 404."""
+    response = client.get("/system/jobs/99999/logs")
+    assert response.status_code == 404
+
+
+# ── cancel_job ──
+
+
+def test_cancel_running_job(client, db_session):
+    """Tests cancelling a running job sets FAILED + is_cancelled."""
+    job = models.Job(job_type="BACKUP", status="RUNNING")
+    db_session.add(job)
+    db_session.commit()
+
+    response = client.post(f"/system/jobs/{job.id}/cancel")
+    assert response.status_code == 200
+    assert "Cancellation request submitted" in response.json()["message"]
+
+    db_session.expire_all()
+    refreshed = db_session.get(models.Job, job.id)
+    assert refreshed.status == "FAILED"
+    assert refreshed.is_cancelled is True
+    assert "Cancelled" in refreshed.error_message
+
+
+def test_cancel_pending_job(client, db_session):
+    """Tests cancelling a pending job."""
+    job = models.Job(job_type="SCAN", status="PENDING")
+    db_session.add(job)
+    db_session.commit()
+
+    response = client.post(f"/system/jobs/{job.id}/cancel")
+    assert response.status_code == 200
+
+    db_session.expire_all()
+    refreshed = db_session.get(models.Job, job.id)
+    assert refreshed.status == "FAILED"
+    assert refreshed.is_cancelled is True
+
+
+def test_cancel_completed_job_is_noop(client, db_session):
+    """Tests cancelling a completed job is a no-op (cancel only acts on PENDING/RUNNING)."""
+    job = models.Job(job_type="SCAN", status="COMPLETED")
+    db_session.add(job)
+    db_session.commit()
+
+    response = client.post(f"/system/jobs/{job.id}/cancel")
+    assert response.status_code == 200
+
+    db_session.expire_all()
+    refreshed = db_session.get(models.Job, job.id)
+    # cancel_job only acts on PENDING/RUNNING jobs
+    assert refreshed.status == "COMPLETED"
+    assert refreshed.is_cancelled is False
+
+
+def test_complete_job_skips_already_failed_job(db_session):
+    """Tests that complete_job is a no-op when the job was cancelled/failed."""
+    from app.services.scanner import JobManager
+
+    job = models.Job(job_type="BACKUP", status="FAILED", is_cancelled=True)
+    db_session.add(job)
+    db_session.commit()
+
+    JobManager.complete_job(job.id)
+
+    db_session.expire_all()
+    refreshed = db_session.get(models.Job, job.id)
+    assert refreshed.status == "FAILED"
+    assert refreshed.is_cancelled is True
+    assert refreshed.progress == 0.0
+
+
+def test_fail_job_skips_already_failed_job(db_session):
+    """Tests that fail_job preserves the original error when job is already failed."""
+    from app.services.scanner import JobManager
+
+    job = models.Job(job_type="BACKUP", status="FAILED", error_message="Original error")
+    db_session.add(job)
+    db_session.commit()
+
+    JobManager.fail_job(job.id, "New error message")
+
+    db_session.expire_all()
+    refreshed = db_session.get(models.Job, job.id)
+    assert refreshed.status == "FAILED"
+    assert refreshed.error_message == "Original error"
+
+
+def test_complete_job_skips_deleted_job(db_session):
+    """Tests that complete_job handles a job deleted by test reset gracefully."""
+    from app.services.scanner import JobManager
+
+    job = models.Job(job_type="SCAN", status="RUNNING")
+    db_session.add(job)
+    db_session.commit()
+    job_id = job.id
+
+    # Simulate /system/test/reset deleting the job
+    db_session.query(models.Job).filter(models.Job.id == job_id).delete()
+    db_session.commit()
+
+    # Should not raise
+    JobManager.complete_job(job_id)
+
+    assert db_session.get(models.Job, job_id) is None
+
+
+# ── retry_job ──
+
+
+def test_retry_failed_scan_job(client, db_session):
+    """Tests retrying a failed SCAN job creates a new job."""
+    job = models.Job(job_type="SCAN", status="FAILED", error_message="Timeout")
+    db_session.add(job)
+    db_session.commit()
+
+    response = client.post(f"/system/jobs/{job.id}/retry")
+    assert response.status_code == 200
+    data = response.json()
+    assert "Retry initiated" in data["message"]
+    assert "new_job_id" in data
+
+    # Verify new job exists with correct type
+    new_job = db_session.get(models.Job, data["new_job_id"])
+    assert new_job is not None
+    assert new_job.job_type == "SCAN"
+
+
+def test_retry_job_not_found(client):
+    """Tests retrying a non-existent job returns 404."""
+    response = client.post("/system/jobs/99999/retry")
+    assert response.status_code == 404
+
+
+def test_retry_job_not_failed(client, db_session):
+    """Tests retrying a non-failed job returns 400."""
+    job = models.Job(job_type="SCAN", status="COMPLETED")
+    db_session.add(job)
+    db_session.commit()
+
+    response = client.post(f"/system/jobs/{job.id}/retry")
+    assert response.status_code == 400
+    assert "Only failed jobs can be retried" in response.json()["detail"]
+
+
+def test_retry_non_scan_job(client, db_session):
+    """Tests retrying a failed BACKUP job returns 400."""
+    job = models.Job(job_type="BACKUP", status="FAILED")
+    db_session.add(job)
+    db_session.commit()
+
+    response = client.post(f"/system/jobs/{job.id}/retry")
+    assert response.status_code == 400
+    assert "Retry for BACKUP jobs is not supported" in response.json()["detail"]
+
+
+# ── stream_jobs ──
+
+
+@pytest.mark.skip(
+    reason="Async infinite SSE stream cannot be tested with synchronous TestClient"
+)
+def test_stream_jobs_returns_sse(client):
+    """Tests that the stream endpoint is registered and accessible."""
+    response = client.get("/system/jobs/stream")
+    assert response.status_code == 200
+    assert "text/event-stream" in response.headers.get("content-type", "")
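
Editor's note on the route-ordering constraint: Starlette matches routes in the
order they were registered, so a literal path declared after a parameterized
one is shadowed by it. A minimal standalone sketch of the failure mode this
patch fixes (names below are illustrative, not from this codebase): with the
dynamic route registered first, a request to /jobs/stream is captured by
/jobs/{job_id} and rejected with a 422 when "stream" fails int validation.

    from fastapi import FastAPI
    from fastapi.testclient import TestClient

    app = FastAPI()

    @app.get("/jobs/{job_id}")
    def get_job(job_id: int):
        return {"id": job_id}

    # Declared AFTER the dynamic route, so it can never match:
    # "stream" is handed to get_job and rejected as a non-integer job_id.
    @app.get("/jobs/stream")
    def stream_jobs():
        return {"streaming": True}

    client = TestClient(app)
    assert client.get("/jobs/stream").status_code == 422

Swapping the declaration order, as this patch does, lets the literal path win.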
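Editor's note on the skipped SSE test: the synchronous TestClient cannot drain
an endless stream, but a single event can still be exercised by driving the
endpoint coroutine directly instead of going over HTTP. The sketch below is an
untested illustration, not part of the patch; it assumes the same conftest
db_session fixture used by the tests above has initialized the test database.

    import asyncio

    from app.api.system.jobs import stream_jobs


    def test_stream_jobs_first_event(db_session):
        async def read_first_event():
            # Call the endpoint directly; it returns a StreamingResponse
            # wrapping the infinite event_generator.
            response = await stream_jobs()
            assert response.media_type == "text/event-stream"
            iterator = response.body_iterator  # the async generator itself
            try:
                # The first yield happens before the 2-second sleep.
                return await iterator.__anext__()
            finally:
                # Close the generator so the infinite loop is torn down.
                await iterator.aclose()

        first = asyncio.run(read_first_event())
        assert first.startswith("data: ")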