Comprendre le patron worker async de gitrust

Ce que vous allez comprendre

  • Analyser pourquoi les opérations longues (import, export, envoi de mails) ne peuvent pas s'exécuter dans le cycle requête/réponse HTTP.
  • Décrire comment le patron worker async de gitrust combine mpsc, Semaphore, et SSE pour traiter des tâches longues avec retour de progression.
  • Évaluer les garanties de durabilité offertes par le modèle « état en base + canal en mémoire ».

Le problème concret

L'import d'un dépôt GitHub peut prendre 30 secondes. Une requête HTTP a un timeout. Si le serveur redémarre pendant l'import, la tâche est perdue. Si 50 utilisateurs lancent un import simultanément sans limite, le serveur sature.

Le même problème se pose pour l'envoi de mails, la génération de SBOM, ou toute opération dont la durée dépasse quelques secondes.

L'analogie

Le patron ressemble à une cuisine de restaurant avec un tableau de commandes :

  • Le serveur (handler HTTP) prend la commande, l'inscrit sur le tableau (DB), et dit au client « votre numéro est le 42, suivez l'avancement ici ».
  • La cuisine (worker) lit le tableau en continu, choisit la prochaine commande, la prépare derrière un comptoir limité à N plans de travail (Semaphore).
  • Le tableau lumineux (SSE) indique l'état en temps réel au client sans qu'il ait besoin de redemander.

Si la cuisine ferme brutalement (crash), les commandes « en cours » sur le tableau sont remises en attente au redémarrage — elles ne disparaissent pas.

Le modèle

Vue d'ensemble du patron

flowchart LR
    Handler[Handler HTTP\nPOST /import]
    DB[(Table\nimport_jobs)]
    Channel[mpsc::channel\nJobId]
    Worker[NotifyWorker\ntokio::spawn]
    Semaphore[Semaphore\nmax N slots]
    Executor[execute_job()\nlogique métier]
    SSE[SSE Handler\nGET /import/:id/stream]
    Client[Navigateur\nhx-ext='sse']

    Handler -->|"INSERT status=Pending"| DB
    Handler -->|"tx.send(job_id)"| Channel
    Channel -->|"rx.recv()"| Worker
    Worker -->|"acquire()"| Semaphore
    Semaphore -->|"permit"| Executor
    Executor -->|"UPDATE status=..."| DB
    DB -->|"SELECT"| SSE
    SSE -->|"data: "| Client

La table de persistance

Chaque type de job a sa propre table. Le schéma canonique pour un import :

CREATE TABLE import_jobs (
    id              UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    user_id         UUID NOT NULL REFERENCES users(id),
    repository_id   UUID REFERENCES repositories(id),
    status          VARCHAR(20) NOT NULL DEFAULT 'pending',
    -- 'pending' | 'running' | 'completed' | 'failed'
    source_url      TEXT NOT NULL,
    error_message   TEXT,
    progress_pct    SMALLINT NOT NULL DEFAULT 0,
    created_at      TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    started_at      TIMESTAMPTZ,
    completed_at    TIMESTAMPTZ
);

Invariant fondamental : le statut en base est la source de vérité. Si le processus redémarre, un job running sans process vivant est remis à pending au démarrage.

L'état en mémoire : le canal mpsc

// crates/gitrust-web/src/workers/import_worker.rs

use tokio::sync::mpsc;

#[derive(Clone)]
pub struct ImportJobSender(pub mpsc::Sender<Uuid>);

pub struct ImportWorker {
    db: DatabaseConnection,
    rx: mpsc::Receiver<Uuid>,
    semaphore: Arc<Semaphore>,
}

impl ImportWorker {
    pub fn new(db: DatabaseConnection) -> (Self, ImportJobSender) {
        let (tx, rx) = mpsc::channel(100); // buffer de 100 IDs en attente
        let semaphore = Arc::new(Semaphore::new(4)); // max 4 imports simultanés
        let worker = Self { db, rx, semaphore };
        (worker, ImportJobSender(tx))
    }

    pub async fn run(mut self) {
        while let Some(job_id) = self.rx.recv().await {
            let db = self.db.clone();
            let permit = Arc::clone(&self.semaphore)
                .acquire_owned()
                .await
                .expect("semaphore closed");

            tokio::spawn(async move {
                let _permit = permit; // libéré quand le spawn se termine
                if let Err(e) = execute_import(&db, job_id).await {
                    tracing::error!(?job_id, error = %e, "import job failed");
                    let _ = ImportJobRepository::mark_failed(&db, job_id, &e.to_string()).await;
                }
            });
        }
    }
}

Le Semaphore borne la concurrence. Même si 100 IDs arrivent dans le canal, seuls 4 execute_import s'exécutent en parallèle. Les autres attendent leur tour dans le canal.

Le handler HTTP — découpler la réponse de l'exécution

// crates/gitrust-web/src/routes/import.rs

pub async fn post_import(
    State(db): State<DatabaseConnection>,
    State(sender): State<ImportJobSender>,
    user: AuthUser,
    Form(form): Form<ImportForm>,
) -> Result<impl IntoResponse, AppError> {
    // 1. Valider et persister
    let job = ImportJobRepository::create(&db, user.user_id, &form.source_url).await?;

    // 2. Envoyer l'ID dans le canal (non-bloquant grâce au buffer)
    sender.0.send(job.id).await.map_err(|_| AppError::Internal("worker unavailable".into()))?;

    // 3. Répondre immédiatement avec la page de suivi
    Ok(Redirect::to(&format!("/import/{}/progress", job.id)))
}

Le handler retourne avant que l'import ait commencé. Le navigateur est redirigé vers une page de progression.

Le worker — exécution avec transitions d'état

async fn execute_import(db: &DatabaseConnection, job_id: Uuid) -> anyhow::Result<()> {
    // Transition : pending → running
    ImportJobRepository::mark_running(db, job_id).await?;

    // Récupérer le job
    let job = ImportJobRepository::find(db, job_id).await?
        .ok_or_else(|| anyhow!("job not found: {}", job_id))?;

    // Étape 1/3 — cloner
    ImportJobRepository::update_progress(db, job_id, 10).await?;
    let temp_dir = git_clone(&job.source_url).await?;

    // Étape 2/3 — créer le dépôt gitrust
    ImportJobRepository::update_progress(db, job_id, 50).await?;
    let repo = RepositoryService::create_from_import(db, &job, &temp_dir).await?;

    // Étape 3/3 — finaliser
    ImportJobRepository::update_progress(db, job_id, 90).await?;
    finalize_import(db, job.id, repo.id).await?;

    // Transition : running → completed
    ImportJobRepository::mark_completed(db, job_id, repo.id).await?;
    Ok(())
}

Chaque update_progress écrit en base. Si le process redémarre entre deux étapes, la progression est connue — mais le job doit être relancé depuis zéro (pas de reprise partielle dans ce modèle).

SSE — retour de progression vers le navigateur

// crates/gitrust-web/src/routes/import.rs

pub async fn stream_import_progress(
    Path(job_id): Path<Uuid>,
    State(db): State<DatabaseConnection>,
    user: AuthUser,
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
    let stream = async_stream::stream! {
        loop {
            let job = ImportJobRepository::find(&db, job_id).await
                .unwrap_or(None);

            match job {
                None => {
                    yield Ok(Event::default().data("<p>Job introuvable.</p>"));
                    break;
                }
                Some(ref j) if j.status == "completed" => {
                    yield Ok(Event::default()
                        .data(format!("<p>Import terminé — <a href='/{}/{}'>Voir le dépôt</a></p>",
                            j.owner_login, j.repo_slug)));
                    break;
                }
                Some(ref j) if j.status == "failed" => {
                    yield Ok(Event::default()
                        .data(format!("<p class='text-error'>Erreur : {}</p>",
                            j.error_message.as_deref().unwrap_or("inconnue"))));
                    break;
                }
                Some(ref j) => {
                    yield Ok(Event::default()
                        .data(format!("<progress class='progress' value='{}' max='100'></progress>",
                            j.progress_pct)));
                    tokio::time::sleep(Duration::from_millis(500)).await;
                }
            }
        }
    };

    Sse::new(stream).keep_alive(KeepAlive::default())
}

Le template HTMX consomme ce flux :

<!-- templates/import/progress.html -->
<div
  hx-ext="sse"
  sse-connect="/import/{{ job_id }}/stream"
  sse-swap="message"
  hx-swap="innerHTML"
  id="progress-container"
>
  <progress class="progress" value="0" max="100"></progress>
</div>

Récupération au redémarrage

Au démarrage de l'application, gitrust remet en pending les jobs bloqués en running :

// crates/gitrust-web/src/main.rs (dans la fonction de démarrage)

async fn recover_stuck_jobs(db: &DatabaseConnection, sender: &ImportJobSender) {
    let stuck = ImportJobRepository::find_by_status(db, "running").await
        .unwrap_or_default();

    for job in stuck {
        tracing::warn!(?job.id, "recovering stuck import job");
        let _ = ImportJobRepository::reset_to_pending(db, job.id).await;
        let _ = sender.0.send(job.id).await;
    }
}

Cette fonction s'exécute une fois au démarrage, avant que le serveur HTTP accepte des connexions.

Enregistrement dans main.rs

// crates/gitrust-web/src/main.rs

let (import_worker, import_sender) = ImportWorker::new(db.clone());

// Démarrer le worker dans un task dédié
tokio::spawn(async move { import_worker.run().await });

// Récupérer les jobs bloqués
recover_stuck_jobs(&db, &import_sender).await;

// Injecter le sender dans l'état Axum
let app = Router::new()
    // ... routes ...
    .with_state(AppState { db, import_sender, /* ... */ });

Décisions et compromis

Durabilité : base de données vs queue dédiée

Approche Durabilité Complexité opérationnelle
Table SQL (gitrust) Au niveau PostgreSQL Nulle (déjà présent)
Redis + Bull/Sidekiq Haute (AOF/RDB) Redis à opérer
RabbitMQ / NATS Très haute Broker supplémentaire
In-memory seulement Nulle (perdu au restart) Nulle

Gitrust choisit la table SQL : la durabilité est suffisante (perte maximale = 1 transaction), et PostgreSQL est déjà requis. Un broker externe serait disproportionné pour une forge auto-hébergée à petite échelle.

Concurrence : Semaphore vs pool de workers

Le Semaphore(N) est plus simple qu'un pool de threads ou de tâches fixes. Il permet au runtime Tokio de gérer l'ordonnancement. L'inconvénient : si un job prend 30 secondes et que N=4, les 5e et 6e jobs attendent dans le canal, pas dans un pool observable.

Pour des jobs très longs (> 5 minutes), un pool de workers avec heartbeat serait plus robuste — mais ce cas ne se présente pas dans gitrust aujourd'hui.

SSE vs WebSocket vs polling

Méthode Cas d'usage
Polling HTMX (hx-trigger="every 2s") Statut simple, peu de mises à jour
SSE (gitrust) Flux unidirectionnel continu (logs, progression)
WebSocket Bidirectionnel (chat, collaboration temps réel)

SSE est suffisant pour tous les workers actuels de gitrust (import, CI logs). WebSocket ajouterait une complexité d'état bidirectionnel non nécessaire.

Vérifier votre compréhension

  1. Le serveur gitrust redémarre pendant l'import d'un dépôt. Le job était en statut running avec progress_pct = 60. Que se passe-t-il exactement au redémarrage ? L'utilisateur verra-t-il la progression reprendre à 60 % ou à 0 % ?

  2. Le Semaphore est initialisé à 4 slots. 10 utilisateurs lancent un import simultanément. Combien de tokio::spawn s'exécutent en parallèle ? Où attendent les 6 jobs restants ? Que se passe-t-il si le serveur redémarre à cet instant ?

Pour aller plus loin