Files
unshackle/unshackle/core/api/download_manager.py
2026-02-02 20:51:09 -07:00

662 lines
25 KiB
Python

import asyncio
import json
import logging
import os
import sys
import tempfile
import threading
import uuid
from contextlib import suppress
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from enum import Enum
from typing import Any, Callable, Dict, List, Optional
# Module-level logger shared by the queue manager, the sync download helper,
# and the worker tasks below.
log = logging.getLogger("download_manager")
class JobStatus(Enum):
    """Lifecycle states a :class:`DownloadJob` moves through."""

    # Accepted and waiting in the queue; not yet picked up by a worker.
    QUEUED = "queued"
    # Currently being processed by a worker.
    DOWNLOADING = "downloading"
    # Finished successfully.
    COMPLETED = "completed"
    # Terminated with an error.
    FAILED = "failed"
    # Stopped on request before completion.
    CANCELLED = "cancelled"
@dataclass
class DownloadJob:
    """Represents a download job with all its parameters and status."""

    job_id: str
    status: JobStatus
    created_time: datetime
    service: str
    title_id: str
    parameters: Dict[str, Any]

    # Progress tracking
    started_time: Optional[datetime] = None
    completed_time: Optional[datetime] = None
    progress: float = 0.0

    # Results and error info
    output_files: List[str] = field(default_factory=list)
    error_message: Optional[str] = None
    error_details: Optional[str] = None
    error_code: Optional[str] = None
    error_traceback: Optional[str] = None
    worker_stderr: Optional[str] = None

    # Cancellation support
    cancel_event: threading.Event = field(default_factory=threading.Event)

    def to_dict(self, include_full_details: bool = False) -> Dict[str, Any]:
        """Convert job to dictionary for JSON response."""

        def _iso(moment: Optional[datetime]) -> Optional[str]:
            # Serialize timestamps as ISO-8601; missing timestamps stay None.
            return moment.isoformat() if moment else None

        summary: Dict[str, Any] = {
            "job_id": self.job_id,
            "status": self.status.value,
            "created_time": self.created_time.isoformat(),
            "service": self.service,
            "title_id": self.title_id,
            "progress": self.progress,
        }
        if not include_full_details:
            return summary

        # Full view adds the raw parameters, timing, outputs and error fields.
        detail: Dict[str, Any] = {
            "parameters": self.parameters,
            "started_time": _iso(self.started_time),
            "completed_time": _iso(self.completed_time),
            "output_files": self.output_files,
            "error_message": self.error_message,
            "error_details": self.error_details,
            "error_code": self.error_code,
            "error_traceback": self.error_traceback,
            "worker_stderr": self.worker_stderr,
        }
        return {**summary, **detail}
def _perform_download(
    job_id: str,
    service: str,
    title_id: str,
    params: Dict[str, Any],
    cancel_event: Optional[threading.Event] = None,
    progress_callback: Optional[Callable[[Dict[str, Any]], None]] = None,
) -> List[str]:
    """Execute the synchronous download logic for a job.

    Builds a click context for the ``dl`` command, instantiates the requested
    service via introspection of its ``__init__`` signature, and invokes
    ``dl.result`` with the job parameters. Runs entirely in the calling
    thread/process.

    Args:
        job_id: Job identifier, used only for logging.
        service: Service tag loaded through ``Services.load``.
        title_id: Title identifier forwarded to the service constructor.
        params: Raw job parameters; known keys are mapped onto ``dl`` options,
            missing keys fall back to the defaults used below.
        cancel_event: When set, the next cancellation checkpoint raises.
        progress_callback: Optional sink receiving coarse progress dicts
            (start, downloading, completed/failed).

    Returns:
        A list of output file paths. Currently always an empty list — the
        download directory is only logged, not scanned for produced files.

    Raises:
        Exception: On cancellation, on a non-zero ``SystemExit`` from the
            download, or on any failure during service instantiation or
            download execution.
    """

    def _check_cancel(stage: str):
        # Cancellation checkpoint: aborts the job with a message naming the
        # stage at which cancellation was detected.
        if cancel_event and cancel_event.is_set():
            raise Exception(f"Job was cancelled {stage}")

    from contextlib import redirect_stderr, redirect_stdout
    from io import StringIO

    _check_cancel("before execution started")

    # Import dl.py components lazily to avoid circular deps during module import
    import click
    import yaml

    from unshackle.commands.dl import dl
    from unshackle.core.config import config
    from unshackle.core.services import Services
    from unshackle.core.utils.click_types import ContextData
    from unshackle.core.utils.collections import merge_dict

    log.info(f"Starting sync download for job {job_id}")

    # Load service configuration from the service's own config file, if any.
    service_config_path = Services.get_path(service) / config.filenames.config
    if service_config_path.exists():
        service_config = yaml.safe_load(service_config_path.read_text(encoding="utf8"))
    else:
        service_config = {}
    # Overlay the global per-service config onto the file-based one.
    # NOTE(review): config.services.get(service) may be None when no global
    # entry exists — confirm merge_dict tolerates a None source.
    merge_dict(config.services.get(service), service_config)

    # Second import of the same object under another alias; dl == dl_command.
    from unshackle.commands.dl import dl as dl_command

    # Fake click context so dl behaves as if invoked from the CLI.
    ctx = click.Context(dl_command.cli)
    ctx.invoked_subcommand = service
    ctx.obj = ContextData(config=service_config, cdm=None, proxy_providers=[], profile=params.get("profile"))
    ctx.params = {
        "proxy": params.get("proxy"),
        "no_proxy": params.get("no_proxy", False),
        "profile": params.get("profile"),
        "tag": params.get("tag"),
        "tmdb_id": params.get("tmdb_id"),
        "tmdb_name": params.get("tmdb_name", False),
        "tmdb_year": params.get("tmdb_year", False),
    }
    dl_instance = dl(
        ctx=ctx,
        no_proxy=params.get("no_proxy", False),
        profile=params.get("profile"),
        proxy=params.get("proxy"),
        tag=params.get("tag"),
        tmdb_id=params.get("tmdb_id"),
        tmdb_name=params.get("tmdb_name", False),
        tmdb_year=params.get("tmdb_year", False),
    )

    service_module = Services.load(service)
    _check_cancel("before service instantiation")

    try:
        import inspect

        # Inspect the service constructor so only parameters it actually
        # declares are passed; required params it doesn't know get guesses.
        service_init_params = inspect.signature(service_module.__init__).parameters
        service_ctx = click.Context(click.Command(service))
        service_ctx.parent = ctx
        service_ctx.obj = ctx.obj
        service_kwargs = {}
        if "title" in service_init_params:
            service_kwargs["title"] = title_id
        # Forward any job parameter whose name matches a constructor param.
        for key, value in params.items():
            if key in service_init_params and key not in ["service", "title_id"]:
                service_kwargs[key] = value
        # Fill remaining *required* (no-default) constructor params.
        for param_name, param_info in service_init_params.items():
            if param_name not in service_kwargs and param_name not in ["self", "ctx"]:
                if param_info.default is inspect.Parameter.empty:
                    if param_name == "movie":
                        # Heuristic: infer movie vs. episode from the title URL path.
                        service_kwargs[param_name] = "/movies/" in title_id
                    elif param_name == "meta_lang":
                        service_kwargs[param_name] = None
                    else:
                        log.warning(f"Unknown required parameter '{param_name}' for service {service}, using None")
                        service_kwargs[param_name] = None
        service_instance = service_module(service_ctx, **service_kwargs)
    except Exception as exc:  # noqa: BLE001 - propagate meaningful failure
        log.error(f"Failed to create service instance: {exc}")
        raise

    # Captured only for the completion log message below; never modified.
    original_download_dir = config.directories.downloads
    _check_cancel("before download execution")
    stdout_capture = StringIO()
    stderr_capture = StringIO()

    # Simple progress tracking if callback provided
    if progress_callback:
        # Report initial progress
        progress_callback({"progress": 0.0, "status": "starting"})

        # Simple approach: report progress at key points
        original_result = dl_instance.result

        def result_with_progress(*args, **kwargs):
            # Wraps the bound result method: 5% at start, 100% on success,
            # failure reported (and re-raised) on exception.
            try:
                # Report that download started
                progress_callback({"progress": 5.0, "status": "downloading"})
                # Call original method
                result = original_result(*args, **kwargs)
                # Report completion
                progress_callback({"progress": 100.0, "status": "completed"})
                return result
            except Exception as e:
                progress_callback({"progress": 0.0, "status": "failed", "error": str(e)})
                raise

        # Instance attribute shadows the bound method for this call only.
        dl_instance.result = result_with_progress

    try:
        # Suppress dl's console output; captured buffers are logged on failure.
        with redirect_stdout(stdout_capture), redirect_stderr(stderr_capture):
            dl_instance.result(
                service=service_instance,
                quality=params.get("quality", []),
                vcodec=params.get("vcodec"),
                acodec=params.get("acodec"),
                vbitrate=params.get("vbitrate"),
                abitrate=params.get("abitrate"),
                range_=params.get("range", ["SDR"]),
                channels=params.get("channels"),
                no_atmos=params.get("no_atmos", False),
                split_audio=params.get("split_audio"),
                wanted=params.get("wanted", []),
                latest_episode=params.get("latest_episode", False),
                lang=params.get("lang", ["orig"]),
                v_lang=params.get("v_lang", []),
                a_lang=params.get("a_lang", []),
                s_lang=params.get("s_lang", ["all"]),
                require_subs=params.get("require_subs", []),
                forced_subs=params.get("forced_subs", False),
                exact_lang=params.get("exact_lang", False),
                sub_format=params.get("sub_format"),
                video_only=params.get("video_only", False),
                audio_only=params.get("audio_only", False),
                subs_only=params.get("subs_only", False),
                chapters_only=params.get("chapters_only", False),
                no_subs=params.get("no_subs", False),
                no_audio=params.get("no_audio", False),
                no_chapters=params.get("no_chapters", False),
                audio_description=params.get("audio_description", False),
                slow=params.get("slow", False),
                list_=False,
                list_titles=False,
                skip_dl=params.get("skip_dl", False),
                export=params.get("export"),
                cdm_only=params.get("cdm_only"),
                no_proxy=params.get("no_proxy", False),
                no_folder=params.get("no_folder", False),
                no_source=params.get("no_source", False),
                no_mux=params.get("no_mux", False),
                workers=params.get("workers"),
                downloads=params.get("downloads", 1),
                best_available=params.get("best_available", False),
            )
    except SystemExit as exc:
        # click commands exit via SystemExit; code 0 is success and swallowed.
        if exc.code != 0:
            stdout_str = stdout_capture.getvalue()
            stderr_str = stderr_capture.getvalue()
            log.error(f"Download exited with code {exc.code}")
            log.error(f"Stdout: {stdout_str}")
            log.error(f"Stderr: {stderr_str}")
            raise Exception(f"Download failed with exit code {exc.code}")
    except Exception as exc:  # noqa: BLE001 - propagate to caller
        stdout_str = stdout_capture.getvalue()
        stderr_str = stderr_capture.getvalue()
        log.error(f"Download execution failed: {exc}")
        log.error(f"Stdout: {stdout_str}")
        log.error(f"Stderr: {stderr_str}")
        raise

    log.info(f"Download completed for job {job_id}, files in {original_download_dir}")
    # Output files are not collected; callers receive an empty list.
    return []
class DownloadQueueManager:
    """Manages download job queue with configurable concurrency limits.

    Jobs are created synchronously, queued on an asyncio.Queue, and processed
    by a fixed pool of async worker tasks. Each job is executed in a separate
    worker *subprocess* (``unshackle.core.api.download_worker``) that
    communicates via three temp files: payload, result, and progress.
    """

    def __init__(self, max_concurrent_downloads: int = 2, job_retention_hours: int = 24):
        # Number of concurrent worker tasks (and thus worker subprocesses).
        self.max_concurrent_downloads = max_concurrent_downloads
        # How long finished/failed/cancelled jobs are kept before cleanup.
        self.job_retention_hours = job_retention_hours
        # All known jobs by id (including finished ones until cleanup).
        self._jobs: Dict[str, DownloadJob] = {}
        # FIFO of jobs awaiting a worker.
        self._job_queue: asyncio.Queue = asyncio.Queue()
        # In-flight download tasks by job id.
        self._active_downloads: Dict[str, asyncio.Task] = {}
        # Worker subprocesses by job id, for cancellation/shutdown.
        self._download_processes: Dict[str, asyncio.subprocess.Process] = {}
        # Temp file paths (payload/result/progress) by job id for cleanup.
        self._job_temp_files: Dict[str, Dict[str, str]] = {}
        self._workers_started = False
        self._shutdown_event = asyncio.Event()
        log.info(
            f"Initialized download queue manager: max_concurrent={max_concurrent_downloads}, retention_hours={job_retention_hours}"
        )

    def create_job(self, service: str, title_id: str, **parameters) -> DownloadJob:
        """Create a new download job and add it to the queue.

        Returns the created job (status QUEUED) so callers can report its id.
        """
        job_id = str(uuid.uuid4())
        job = DownloadJob(
            job_id=job_id,
            status=JobStatus.QUEUED,
            created_time=datetime.now(),
            service=service,
            title_id=title_id,
            parameters=parameters,
        )
        self._jobs[job_id] = job
        # put_nowait is safe: the queue is unbounded.
        self._job_queue.put_nowait(job)
        log.info(f"Created download job {job_id} for {service}:{title_id}")
        return job

    def get_job(self, job_id: str) -> Optional[DownloadJob]:
        """Get job by ID."""
        return self._jobs.get(job_id)

    def list_jobs(self) -> List[DownloadJob]:
        """List all jobs."""
        return list(self._jobs.values())

    def cancel_job(self, job_id: str) -> bool:
        """Cancel a job if it's queued or downloading.

        Returns True when a cancellation was performed, False when the job
        is unknown or already in a terminal state.
        """
        job = self._jobs.get(job_id)
        if not job:
            return False
        if job.status == JobStatus.QUEUED:
            # Queued jobs are skipped by the worker when dequeued (it checks
            # for CANCELLED status before starting).
            job.status = JobStatus.CANCELLED
            job.cancel_event.set()  # Signal cancellation
            log.info(f"Cancelled queued job {job_id}")
            return True
        elif job.status == JobStatus.DOWNLOADING:
            # Set the cancellation event first - this will be checked by the download thread
            job.cancel_event.set()
            job.status = JobStatus.CANCELLED
            log.info(f"Signaled cancellation for downloading job {job_id}")
            # Cancel the active download task
            task = self._active_downloads.get(job_id)
            if task:
                task.cancel()
                log.info(f"Cancelled download task for job {job_id}")
            # Also terminate the worker subprocess directly.
            process = self._download_processes.get(job_id)
            if process:
                try:
                    process.terminate()
                    log.info(f"Terminated worker process for job {job_id}")
                except ProcessLookupError:
                    log.debug(f"Worker process for job {job_id} already exited")
            return True
        return False

    def cleanup_old_jobs(self) -> int:
        """Remove jobs older than retention period.

        Only terminal jobs (completed/failed/cancelled) are eligible; jobs
        lacking a completed_time fall back to created_time for the age check.
        Returns the number of jobs removed.
        """
        cutoff_time = datetime.now() - timedelta(hours=self.job_retention_hours)
        jobs_to_remove = []
        for job_id, job in self._jobs.items():
            if job.status in [JobStatus.COMPLETED, JobStatus.FAILED, JobStatus.CANCELLED]:
                if job.completed_time and job.completed_time < cutoff_time:
                    jobs_to_remove.append(job_id)
                elif not job.completed_time and job.created_time < cutoff_time:
                    jobs_to_remove.append(job_id)
        for job_id in jobs_to_remove:
            del self._jobs[job_id]
        if jobs_to_remove:
            log.info(f"Cleaned up {len(jobs_to_remove)} old jobs")
        return len(jobs_to_remove)

    async def start_workers(self):
        """Start worker tasks to process the download queue.

        Idempotent: subsequent calls are no-ops once workers are running.
        Must be called from within a running event loop.
        """
        if self._workers_started:
            return
        self._workers_started = True
        # Start worker tasks
        for i in range(self.max_concurrent_downloads):
            asyncio.create_task(self._download_worker(f"worker-{i}"))
        # Start cleanup task
        asyncio.create_task(self._cleanup_worker())
        log.info(f"Started {self.max_concurrent_downloads} download workers")

    async def shutdown(self):
        """Shutdown the queue manager and cancel all active downloads.

        Cancels tasks, terminates (then kills, after a 5s grace period) any
        worker subprocesses, and removes leftover temp files.
        """
        log.info("Shutting down download queue manager")
        self._shutdown_event.set()
        # Cancel all active downloads
        for task in self._active_downloads.values():
            task.cancel()
        # Terminate worker processes
        for job_id, process in list(self._download_processes.items()):
            try:
                process.terminate()
            except ProcessLookupError:
                log.debug(f"Worker process for job {job_id} already exited during shutdown")
        # Second pass: wait for each process to exit, escalating to kill.
        for job_id, process in list(self._download_processes.items()):
            try:
                await asyncio.wait_for(process.wait(), timeout=5)
            except asyncio.TimeoutError:
                log.warning(f"Worker process for job {job_id} did not exit, killing")
                process.kill()
                await process.wait()
            finally:
                self._download_processes.pop(job_id, None)
        # Clean up any remaining temp files
        for paths in self._job_temp_files.values():
            for path in paths.values():
                try:
                    os.remove(path)
                except OSError:
                    pass
        self._job_temp_files.clear()
        # Wait for workers to finish
        if self._active_downloads:
            await asyncio.gather(*self._active_downloads.values(), return_exceptions=True)

    async def _download_worker(self, worker_name: str):
        """Worker task that processes jobs from the queue.

        Loops until shutdown is signalled, waking up at least once per second
        (via the 1s queue timeout) to re-check the shutdown event.
        """
        log.debug(f"Download worker {worker_name} started")
        while not self._shutdown_event.is_set():
            try:
                # Wait for a job or shutdown signal
                job = await asyncio.wait_for(self._job_queue.get(), timeout=1.0)
                # Skip jobs cancelled while still queued.
                if job.status == JobStatus.CANCELLED:
                    continue
                # Start processing the job
                job.status = JobStatus.DOWNLOADING
                job.started_time = datetime.now()
                log.info(f"Worker {worker_name} starting job {job.job_id}")
                # Create download task
                download_task = asyncio.create_task(self._execute_download(job))
                # Registered so cancel_job/shutdown can cancel it by job id.
                self._active_downloads[job.job_id] = download_task
                try:
                    await download_task
                except asyncio.CancelledError:
                    job.status = JobStatus.CANCELLED
                    log.info(f"Job {job.job_id} was cancelled")
                except Exception as e:
                    job.status = JobStatus.FAILED
                    job.error_message = str(e)
                    log.error(f"Job {job.job_id} failed: {e}")
                finally:
                    job.completed_time = datetime.now()
                    if job.job_id in self._active_downloads:
                        del self._active_downloads[job.job_id]
            except asyncio.TimeoutError:
                # Queue was empty for 1s; loop back and re-check shutdown.
                continue
            except Exception as e:
                log.error(f"Worker {worker_name} error: {e}")

    async def _execute_download(self, job: DownloadJob):
        """Execute the actual download for a job.

        On success marks the job COMPLETED with its output files; on failure
        records categorized error info on the job and re-raises for the
        worker to log.
        """
        log.info(f"Executing download for job {job.job_id}")
        try:
            output_files = await self._run_download_async(job)
            job.status = JobStatus.COMPLETED
            job.output_files = output_files
            job.progress = 100.0
            log.info(f"Download completed for job {job.job_id}: {len(output_files)} files")
        except Exception as e:
            # CancelledError (BaseException) deliberately passes through
            # uncaught so the worker records the job as CANCELLED instead.
            import traceback

            from unshackle.core.api.errors import categorize_exception

            job.status = JobStatus.FAILED
            job.error_message = str(e)
            job.error_details = str(e)
            api_error = categorize_exception(
                e, context={"service": job.service, "title_id": job.title_id, "job_id": job.job_id}
            )
            job.error_code = api_error.error_code.value
            job.error_traceback = traceback.format_exc()
            log.error(f"Download failed for job {job.job_id}: {e}")
            raise

    async def _run_download_async(self, job: DownloadJob) -> List[str]:
        """Invoke a worker subprocess to execute the download.

        Writes the job payload to a temp file, spawns the download_worker
        module, polls a progress file while the process runs, and parses the
        result file on exit. Temp files and process bookkeeping are always
        cleaned up in the ``finally`` block.

        Raises:
            asyncio.CancelledError: If the job's cancel flag is observed.
            Exception: If the worker exits non-zero or does not report success.
        """
        payload = {
            "job_id": job.job_id,
            "service": job.service,
            "title_id": job.title_id,
            "parameters": job.parameters,
        }
        # mkstemp returns open fds; close them immediately — only the paths
        # are needed, the worker subprocess (re)opens the files itself.
        payload_fd, payload_path = tempfile.mkstemp(prefix=f"unshackle_job_{job.job_id}_", suffix="_payload.json")
        os.close(payload_fd)
        result_fd, result_path = tempfile.mkstemp(prefix=f"unshackle_job_{job.job_id}_", suffix="_result.json")
        os.close(result_fd)
        progress_fd, progress_path = tempfile.mkstemp(prefix=f"unshackle_job_{job.job_id}_", suffix="_progress.json")
        os.close(progress_fd)
        with open(payload_path, "w", encoding="utf-8") as handle:
            json.dump(payload, handle)
        process = await asyncio.create_subprocess_exec(
            sys.executable,
            "-m",
            "unshackle.core.api.download_worker",
            payload_path,
            result_path,
            progress_path,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        self._download_processes[job.job_id] = process
        self._job_temp_files[job.job_id] = {"payload": payload_path, "result": result_path, "progress": progress_path}
        communicate_task = asyncio.create_task(process.communicate())
        stdout_bytes = b""
        stderr_bytes = b""
        try:
            # Poll loop: every 0.5s check process completion, progress file,
            # and the job's cancellation flag.
            while True:
                done, _ = await asyncio.wait({communicate_task}, timeout=0.5)
                if communicate_task in done:
                    stdout_bytes, stderr_bytes = communicate_task.result()
                    break
                # Check for progress updates
                try:
                    if os.path.exists(progress_path):
                        with open(progress_path, "r", encoding="utf-8") as handle:
                            progress_data = json.load(handle)
                        if "progress" in progress_data:
                            new_progress = float(progress_data["progress"])
                            if new_progress != job.progress:
                                job.progress = new_progress
                                log.info(f"Job {job.job_id} progress updated: {job.progress}%")
                except (FileNotFoundError, json.JSONDecodeError, ValueError) as e:
                    # Progress file may be mid-write or empty; non-fatal.
                    log.debug(f"Could not read progress for job {job.job_id}: {e}")
                if job.cancel_event.is_set() or job.status == JobStatus.CANCELLED:
                    log.info(f"Cancellation detected for job {job.job_id}, terminating worker process")
                    process.terminate()
                    try:
                        await asyncio.wait_for(communicate_task, timeout=5)
                    except asyncio.TimeoutError:
                        log.warning(f"Worker process for job {job.job_id} did not terminate, killing")
                        process.kill()
                        await asyncio.wait_for(communicate_task, timeout=5)
                    raise asyncio.CancelledError("Job was cancelled")
            returncode = process.returncode
            stdout = stdout_bytes.decode("utf-8", errors="ignore")
            stderr = stderr_bytes.decode("utf-8", errors="ignore")
            if stdout.strip():
                log.debug(f"Worker stdout for job {job.job_id}: {stdout.strip()}")
            if stderr.strip():
                log.warning(f"Worker stderr for job {job.job_id}: {stderr.strip()}")
                job.worker_stderr = stderr.strip()
            # Parse the structured result written by the worker, if present.
            result_data: Optional[Dict[str, Any]] = None
            try:
                with open(result_path, "r", encoding="utf-8") as handle:
                    result_data = json.load(handle)
            except FileNotFoundError:
                log.error(f"Result file missing for job {job.job_id}")
            except json.JSONDecodeError as exc:
                log.error(f"Failed to parse worker result for job {job.job_id}: {exc}")
            if returncode != 0:
                message = result_data.get("message") if result_data else "unknown error"
                if result_data:
                    job.error_details = result_data.get("error_details", message)
                    job.error_code = result_data.get("error_code")
                raise Exception(f"Worker exited with code {returncode}: {message}")
            if not result_data or result_data.get("status") != "success":
                message = result_data.get("message") if result_data else "worker did not report success"
                if result_data:
                    job.error_details = result_data.get("error_details", message)
                    job.error_code = result_data.get("error_code")
                raise Exception(f"Worker failure: {message}")
            return result_data.get("output_files", [])
        finally:
            # Unconditional cleanup: reap the communicate task, drop process
            # bookkeeping, and delete the three temp files.
            if not communicate_task.done():
                communicate_task.cancel()
                with suppress(asyncio.CancelledError):
                    await communicate_task
            self._download_processes.pop(job.job_id, None)
            temp_paths = self._job_temp_files.pop(job.job_id, {})
            for path in temp_paths.values():
                try:
                    os.remove(path)
                except OSError:
                    pass

    def _execute_download_sync(self, job: DownloadJob) -> List[str]:
        """Execute download synchronously using existing dl.py logic."""
        # In-process fallback path; the async path above uses a subprocess.
        return _perform_download(job.job_id, job.service, job.title_id, job.parameters.copy(), job.cancel_event)

    async def _cleanup_worker(self):
        """Worker that periodically cleans up old jobs."""
        while not self._shutdown_event.is_set():
            try:
                await asyncio.sleep(3600)  # Run every hour
                self.cleanup_old_jobs()
            except Exception as e:
                log.error(f"Cleanup worker error: {e}")
# Global instance
download_manager: Optional[DownloadQueueManager] = None


def get_download_manager() -> DownloadQueueManager:
    """Get the global download manager instance."""
    global download_manager
    if download_manager is not None:
        return download_manager

    # First use: read limits from the unshackle config (imported lazily to
    # avoid a circular import at module load time) and build the singleton.
    from unshackle.core.config import config

    concurrency = getattr(config, "max_concurrent_downloads", 2)
    retention = getattr(config, "download_job_retention_hours", 24)
    download_manager = DownloadQueueManager(concurrency, retention)
    return download_manager