Créer un worker asynchrone de bout en bout (capstone)

Objectifs

À la fin de ce tutoriel, vous saurez :

  • O1. Concevoir le modèle de données SeaORM pour une table de jobs avec une state machine à 5 états.
  • O2. Implémenter la boucle d'un worker Tokio avec sémaphore de concurrence et timeout.
  • O3. Tester l'ensemble du cycle (enqueue → exécution → SSE) avec des tests d'intégration hermétiques.

Pré-requis

  • Technique : tutoriels 01 et 02 complétés ; notions de Tokio (async/await, mpsc, spawn) ; PostgreSQL local ou testcontainer disponible.
  • Pédagogique : tutoriel 02-premiere-contribution complété.
  • Temps estimé : ~120 minutes.

Vue d'ensemble

Ce tutoriel est un capstone : vous combinez des recettes connues (migration, service, handler) sans guide pas-à-pas détaillé. L'objectif est de reproduire le patron du worker d'import (import_service + import_worker) pour un cas générique appelé notify_worker — un worker qui envoie des notifications externes de manière asynchrone.

flowchart LR
    UI[Handler POST\n/notify] -->|create_job| DB[(notify_jobs)]
    UI -->|try_send| Chan[mpsc NotifyTask]
    UI -->|302| Page[GET /notify/:id]
    Chan --> Worker[NotifyWorker\nSémaphore N]
    Worker -->|update status| DB
    Worker -->|HTTP POST externe| External[Endpoint distant]
    Page -. SSE .-> DB

Le patron est identique à celui utilisé pour l'import de dépôts externe (voir crates/gitrust-core/src/services/import_service.rs) et pour le worker CI. Vous n'inventez rien : vous combinez des pièces existantes.

Modèle mental : pensez à une file de caissiers de supermarché. La file d'attente (mpsc channel) accumule les clients (tâches). Le nombre de caisses ouvertes est le sémaphore. Chaque caissier (worker goroutine) traite un client et met à jour le tableau d'affichage (DB + SSE).

Étape 1 : Concevoir la state machine

Avant d'écrire une ligne de code, formalisez la state machine de votre job.

stateDiagram-v2
    [*] --> pending : create_job()
    pending --> running : worker démarre
    pending --> cancelled : POST /cancel
    running --> success : HTTP 200 reçu
    running --> failed : erreur ou timeout
    running --> cancelled : flag DB checked

Les 5 états sont : pending, running, success, failed, cancelled.

Règles de transition : - Seul pending peut être annulé avant démarrage. - running peut être annulé via un flag en DB vérifié par le worker. - success et failed sont terminaux.

Checkpoint : dessinez la state machine sur papier avant de continuer. Répondez à : « que se passe-t-il si le serveur redémarre pendant un job running ? » (réponse : au redémarrage, les jobs running doivent être marqués failed avec le message « server restarted »).

Étape 2 : Créer la migration SeaORM

Créez le fichier de migration :

crates/gitrust-core/src/migrations/m20260501_000023_create_notify_jobs.rs
use sea_orm_migration::prelude::*;

#[derive(DeriveMigrationName)]
pub struct Migration;

#[async_trait::async_trait]
impl MigrationTrait for Migration {
    async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
        manager
            .create_table(
                Table::create()
                    .table(NotifyJobs::Table)
                    .if_not_exists()
                    .col(ColumnDef::new(NotifyJobs::Id).uuid().not_null().primary_key())
                    .col(ColumnDef::new(NotifyJobs::OwnerId).uuid().not_null())
                    .col(ColumnDef::new(NotifyJobs::TargetUrl).text().not_null())
                    .col(ColumnDef::new(NotifyJobs::Payload).text().not_null())
                    .col(
                        ColumnDef::new(NotifyJobs::Status)
                            .string()
                            .not_null()
                            .default("pending"),
                    )
                    .col(ColumnDef::new(NotifyJobs::ErrorMessage).text().null())
                    .col(ColumnDef::new(NotifyJobs::StartedAt).timestamp_with_time_zone().null())
                    .col(ColumnDef::new(NotifyJobs::FinishedAt).timestamp_with_time_zone().null())
                    .col(
                        ColumnDef::new(NotifyJobs::CreatedAt)
                            .timestamp_with_time_zone()
                            .not_null(),
                    )
                    .col(
                        ColumnDef::new(NotifyJobs::UpdatedAt)
                            .timestamp_with_time_zone()
                            .not_null(),
                    )
                    .to_owned(),
            )
            .await?;

        manager
            .create_index(
                Index::create()
                    .table(NotifyJobs::Table)
                    .name("notify_jobs_owner_status_idx")
                    .col(NotifyJobs::OwnerId)
                    .col(NotifyJobs::Status)
                    .to_owned(),
            )
            .await
    }

    async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
        manager
            .drop_table(Table::drop().table(NotifyJobs::Table).to_owned())
            .await
    }
}

#[derive(DeriveIden)]
enum NotifyJobs {
    Table,
    Id,
    OwnerId,
    TargetUrl,
    Payload,
    Status,
    ErrorMessage,
    StartedAt,
    FinishedAt,
    CreatedAt,
    UpdatedAt,
}

Enregistrez la migration dans crates/gitrust-core/src/migrations/mod.rs en ajoutant m20260501_000023_create_notify_jobs::Migration à la liste du Migrator.

Test rouge à faire passer :

#[tokio::test]
async fn migration_creates_notify_jobs_table() {
    let db = setup_test_db().await;
    // Si la migration s'est appliquée, la requête ne renvoie pas d'erreur.
    let count: i64 = db
        .query_one(Statement::from_string(
            DatabaseBackend::Postgres,
            "SELECT COUNT(*) FROM notify_jobs".to_owned(),
        ))
        .await
        .expect("La table notify_jobs doit exister")
        .unwrap()
        .try_get("", "count")
        .unwrap();
    assert_eq!(count, 0);
}

Checkpoint : cargo test --package gitrust-core migration_creates_notify_jobs_table passe au vert.

Étape 3 : Définir les DTOs et l'enum de statut

Créez crates/gitrust-core/src/dto/notify_dto.rs :

use serde::{Deserialize, Serialize};

/// Statut d'un job de notification — state machine à 5 états.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum NotifyStatus {
    Pending,
    Running,
    Success,
    Failed,
    Cancelled,
}

impl NotifyStatus {
    pub fn as_str(&self) -> &'static str {
        match self {
            Self::Pending => "pending",
            Self::Running => "running",
            Self::Success => "success",
            Self::Failed => "failed",
            Self::Cancelled => "cancelled",
        }
    }

    pub fn from_str(s: &str) -> Option<Self> {
        match s {
            "pending" => Some(Self::Pending),
            "running" => Some(Self::Running),
            "success" => Some(Self::Success),
            "failed" => Some(Self::Failed),
            "cancelled" => Some(Self::Cancelled),
            _ => None,
        }
    }
}

/// Payload envoyé via SSE au client.
#[derive(Debug, Serialize)]
pub struct NotifyJobSse {
    pub status: String,
    pub error_message: Option<String>,
}

/// Input pour créer un job.
#[derive(Debug, Deserialize)]
pub struct CreateNotifyJobInput {
    pub target_url: String,
    pub payload: String,
}

Tests round-trip obligatoires :

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn status_round_trip() {
        for (s, expected) in [
            ("pending", NotifyStatus::Pending),
            ("running", NotifyStatus::Running),
            ("success", NotifyStatus::Success),
            ("failed", NotifyStatus::Failed),
            ("cancelled", NotifyStatus::Cancelled),
        ] {
            let status = NotifyStatus::from_str(s).expect("doit parser");
            assert_eq!(status, expected);
            assert_eq!(status.as_str(), s);
        }
    }

    #[test]
    fn unknown_status_returns_none() {
        assert!(NotifyStatus::from_str("unknown").is_none());
    }
}

Checkpoint : cargo test --package gitrust-core status_round_trip passe.

Étape 4 : Implémenter le service

Créez crates/gitrust-core/src/services/notify_service.rs. Le service ne fait que orchestrer la DB — aucune logique métier IO ici :

use chrono::Utc;
use sea_orm::{ActiveModelTrait, ActiveValue::Set, DatabaseConnection, EntityTrait};
use uuid::Uuid;

use crate::{
    dto::notify_dto::{CreateNotifyJobInput, NotifyStatus},
    error::GitrustError,
    models::notify_job,
};

pub struct NotifyService;

impl NotifyService {
    /// Crée un job en statut `pending` et retourne son ID.
    pub async fn create_job(
        db: &DatabaseConnection,
        owner_id: Uuid,
        input: CreateNotifyJobInput,
    ) -> Result<Uuid, GitrustError> {
        let id = Uuid::new_v4();
        let now = Utc::now();
        let model = notify_job::ActiveModel {
            id: Set(id),
            owner_id: Set(owner_id),
            target_url: Set(input.target_url),
            payload: Set(input.payload),
            status: Set(NotifyStatus::Pending.as_str().to_owned()),
            error_message: Set(None),
            started_at: Set(None),
            finished_at: Set(None),
            created_at: Set(now),
            updated_at: Set(now),
        };
        model.insert(db).await.map_err(GitrustError::Database)?;
        Ok(id)
    }

    /// Passe le job en `running`.
    pub async fn mark_running(db: &DatabaseConnection, id: Uuid) -> Result<(), GitrustError> {
        Self::update_status(db, id, NotifyStatus::Running, None).await
    }

    /// Passe le job en `success`.
    pub async fn mark_success(db: &DatabaseConnection, id: Uuid) -> Result<(), GitrustError> {
        Self::update_status(db, id, NotifyStatus::Success, None).await
    }

    /// Passe le job en `failed` avec un message d'erreur.
    pub async fn mark_failed(
        db: &DatabaseConnection,
        id: Uuid,
        error: &str,
    ) -> Result<(), GitrustError> {
        Self::update_status(db, id, NotifyStatus::Failed, Some(error)).await
    }

    /// Passe le job en `cancelled`.
    pub async fn mark_cancelled(db: &DatabaseConnection, id: Uuid) -> Result<(), GitrustError> {
        Self::update_status(db, id, NotifyStatus::Cancelled, None).await
    }

    /// Vérifie si le job a été annulé (pour le poll du worker).
    pub async fn is_cancelled(
        db: &DatabaseConnection,
        id: Uuid,
    ) -> Result<bool, GitrustError> {
        let job = notify_job::Entity::find_by_id(id)
            .one(db)
            .await
            .map_err(GitrustError::Database)?
            .ok_or_else(|| GitrustError::NotFound("job introuvable".into()))?;
        Ok(job.status == NotifyStatus::Cancelled.as_str())
    }

    async fn update_status(
        db: &DatabaseConnection,
        id: Uuid,
        status: NotifyStatus,
        error: Option<&str>,
    ) -> Result<(), GitrustError> {
        use sea_orm::ActiveModelTrait;
        let now = Utc::now();
        let mut model: notify_job::ActiveModel = notify_job::Entity::find_by_id(id)
            .one(db)
            .await
            .map_err(GitrustError::Database)?
            .ok_or_else(|| GitrustError::NotFound("job introuvable".into()))?
            .into();

        model.status = Set(status.as_str().to_owned());
        model.updated_at = Set(now);
        model.error_message = Set(error.map(str::to_owned));

        if status == NotifyStatus::Running {
            model.started_at = Set(Some(now));
        }
        if matches!(status, NotifyStatus::Success | NotifyStatus::Failed | NotifyStatus::Cancelled) {
            model.finished_at = Set(Some(now));
        }

        model.update(db).await.map_err(GitrustError::Database)?;
        Ok(())
    }
}

Étape 5 : Implémenter le worker

Le worker tourne dans un tokio::spawn séparé. Il lit les tâches depuis un canal mpsc et les exécute avec un sémaphore de concurrence :

// crates/gitrust-core/src/services/notify_worker.rs

use std::sync::Arc;

use sea_orm::DatabaseConnection;
use tokio::sync::{mpsc, Semaphore};
use uuid::Uuid;

use super::notify_service::NotifyService;

/// Tâche envoyée via le canal mpsc au worker.
pub struct NotifyTask {
    pub job_id: Uuid,
    pub target_url: String,
    pub payload: String,
}

pub struct NotifyWorkerConfig {
    /// Nombre maximal de jobs concurrents.
    pub max_concurrent: usize,
    /// Timeout en secondes par job.
    pub timeout_secs: u64,
}

/// Boucle principale du worker. À appeler avec `tokio::spawn`.
pub async fn run(
    db: DatabaseConnection,
    mut rx: mpsc::Receiver<NotifyTask>,
    config: NotifyWorkerConfig,
) {
    let semaphore = Arc::new(Semaphore::new(config.max_concurrent));

    while let Some(task) = rx.recv().await {
        let permit = semaphore.clone().acquire_owned().await.expect("semaphore fermé");
        let db = db.clone();
        let timeout = config.timeout_secs;

        tokio::spawn(async move {
            let _permit = permit; // libéré à la fin du bloc
            execute_task(&db, task, timeout).await;
        });
    }
}

async fn execute_task(db: &DatabaseConnection, task: NotifyTask, timeout_secs: u64) {
    // Vérifier si annulé avant de démarrer
    match NotifyService::is_cancelled(db, task.job_id).await {
        Ok(true) => return,
        Err(e) => {
            tracing::error!(job_id = %task.job_id, error = %e, "Erreur lecture job");
            return;
        }
        Ok(false) => {}
    }

    if let Err(e) = NotifyService::mark_running(db, task.job_id).await {
        tracing::error!(job_id = %task.job_id, error = %e, "Impossible de marquer running");
        return;
    }

    let result = tokio::time::timeout(
        std::time::Duration::from_secs(timeout_secs),
        send_notification(&task.target_url, &task.payload),
    )
    .await;

    match result {
        Ok(Ok(())) => {
            if let Err(e) = NotifyService::mark_success(db, task.job_id).await {
                tracing::error!(job_id = %task.job_id, error = %e, "Impossible de marquer success");
            }
        }
        Ok(Err(e)) => {
            let msg = format!("Erreur HTTP: {e}");
            if let Err(db_err) = NotifyService::mark_failed(db, task.job_id, &msg).await {
                tracing::error!(job_id = %task.job_id, error = %db_err, "Impossible de marquer failed");
            }
        }
        Err(_elapsed) => {
            let msg = format!("Timeout après {timeout_secs}s");
            if let Err(e) = NotifyService::mark_failed(db, task.job_id, &msg).await {
                tracing::error!(job_id = %task.job_id, error = %e, "Impossible de marquer failed (timeout)");
            }
        }
    }
}

async fn send_notification(url: &str, payload: &str) -> Result<(), reqwest::Error> {
    let client = reqwest::Client::new();
    client
        .post(url)
        .header("Content-Type", "application/json")
        .body(payload.to_owned())
        .send()
        .await?
        .error_for_status()?;
    Ok(())
}

Checkpoint : cargo build --package gitrust-core compile sans erreur.

Étape 6 : Ajouter le handler et la route SSE

Dans crates/gitrust-web/src/handlers/notify.rs, créez le handler POST qui enqueues le job et redirige, ainsi que le handler SSE :

use axum::{
    extract::{Path, State},
    response::{sse::{Event, KeepAlive, Sse}, Redirect},
    Extension, Form,
};
use sea_orm::DatabaseConnection;
use tokio::time::{interval, Duration};
use tokio_stream::wrappers::IntervalStream;
use tokio_stream::StreamExt;
use uuid::Uuid;

use gitrust_core::{
    dto::notify_dto::CreateNotifyJobInput,
    services::{notify_service::NotifyService, notify_worker::NotifyTask},
};

use crate::auth::AuthUser;

pub type NotifySender = tokio::sync::mpsc::Sender<NotifyTask>;

/// POST /notify — crée un job et redirige vers la page de suivi.
pub async fn create_notify_job(
    State(db): State<DatabaseConnection>,
    Extension(tx): Extension<NotifySender>,
    user: AuthUser,
    Form(input): Form<CreateNotifyJobInput>,
) -> Result<Redirect, crate::error::AppError> {
    let job_id = NotifyService::create_job(&db, user.user_id, input.clone()).await?;

    // Essai d'envoi dans le canal — si plein, le job reste `pending` jusqu'au
    // prochain redémarrage (les jobs `pending` peuvent être re-envoyés au démarrage).
    let _ = tx.try_send(NotifyTask {
        job_id,
        target_url: input.target_url,
        payload: input.payload,
    });

    Ok(Redirect::to(&format!("/notify/{job_id}")))
}

/// GET /notify/:id/stream — SSE : pousse les mises à jour de statut au client.
pub async fn notify_job_stream(
    State(db): State<DatabaseConnection>,
    Path(id): Path<Uuid>,
) -> Sse<impl tokio_stream::Stream<Item = Result<Event, std::convert::Infallible>>> {
    let stream = IntervalStream::new(interval(Duration::from_secs(1)))
        .map(move |_| {
            let db = db.clone();
            async move {
                // Poll la DB pour l'état courant
                use gitrust_core::models::notify_job;
                use sea_orm::EntityTrait;
                let job = notify_job::Entity::find_by_id(id)
                    .one(&db)
                    .await
                    .ok()
                    .flatten();

                let data = match job {
                    Some(j) => format!(
                        r#"{{"status":"{}","error":{}}}"#,
                        j.status,
                        j.error_message
                            .as_deref()
                            .map(|e| format!(r#""{}""#, e.replace('"', "\\\"")))
                            .unwrap_or("null".to_owned())
                    ),
                    None => r#"{"status":"not_found"}"#.to_owned(),
                };

                Ok(Event::default().data(data))
            }
        })
        .then(|fut| fut);

    Sse::new(stream).keep_alive(KeepAlive::default())
}

Enregistrez les routes dans routes.rs :

.route("/notify", post(handlers::notify::create_notify_job))
.route("/notify/:id", get(handlers::notify::notify_job_page))
.route("/notify/:id/stream", get(handlers::notify::notify_job_stream))
.route("/notify/:id/cancel", post(handlers::notify::cancel_notify_job))

Étape 7 : Intégrer le worker dans main.rs

Dans crates/gitrust-web/src/main.rs, initialisez le canal et démarrez le worker :

// Dans la fonction main(), après la construction de l'app :
let (notify_tx, notify_rx) = tokio::sync::mpsc::channel::<NotifyTask>(100);

let notify_db = db.clone();
tokio::spawn(gitrust_core::services::notify_worker::run(
    notify_db,
    notify_rx,
    NotifyWorkerConfig {
        max_concurrent: std::env::var("NOTIFY_MAX_CONCURRENT")
            .unwrap_or("4".into())
            .parse()
            .unwrap_or(4),
        timeout_secs: std::env::var("NOTIFY_TIMEOUT_SECS")
            .unwrap_or("30".into())
            .parse()
            .unwrap_or(30),
    },
));

// Injecter l'émetteur dans le router via Extension
let app = routes::routes(&static_path)
    .layer(Extension(notify_tx))
    .with_state(db);

Étape 8 : Tests d'intégration

Écrivez au minimum ces trois tests dans crates/gitrust-core/src/services/notify_service_test.rs :

#[tokio::test]
async fn create_job_inserts_with_pending_status() {
    let db = setup_test_db().await;
    let id = NotifyService::create_job(
        &db,
        Uuid::new_v4(),
        CreateNotifyJobInput {
            target_url: "https://example.com/hook".into(),
            payload: r#"{"event":"test"}"#.into(),
        },
    )
    .await
    .expect("create_job doit réussir");

    let job = notify_job::Entity::find_by_id(id)
        .one(&db)
        .await
        .unwrap()
        .unwrap();
    assert_eq!(job.status, "pending");
    assert!(job.started_at.is_none());
}

#[tokio::test]
async fn mark_running_sets_started_at() {
    let db = setup_test_db().await;
    let id = create_test_job(&db).await;

    NotifyService::mark_running(&db, id).await.unwrap();

    let job = notify_job::Entity::find_by_id(id).one(&db).await.unwrap().unwrap();
    assert_eq!(job.status, "running");
    assert!(job.started_at.is_some());
}

#[tokio::test]
async fn cancel_before_running_is_terminal() {
    let db = setup_test_db().await;
    let id = create_test_job(&db).await;

    NotifyService::mark_cancelled(&db, id).await.unwrap();

    let is_cancelled = NotifyService::is_cancelled(&db, id).await.unwrap();
    assert!(is_cancelled);
}

Checkpoint : cargo test --package gitrust-core notify — tous les tests passent.

Récapitulatif

  • O1 accompli en concevant la table notify_jobs avec 5 états (pending/running/success/failed/cancelled) et la migration SeaORM correspondante.
  • O2 accompli en implémentant notify_worker::run avec Semaphore, tokio::time::timeout, et lecture du flag d'annulation via la DB.
  • O3 accompli en écrivant les tests create_job_inserts_with_pending_status, mark_running_sets_started_at et cancel_before_running_is_terminal, tous hermétiques (pas de réseau réel).

Et si ça ne marche pas

Symptôme Cause probable Correction
channel closed au démarrage Le Receiver est droppé avant le spawn du worker Vérifiez que tokio::spawn(run(..., notify_rx, ...)) est appelé avant que notify_rx ne soit droppé
Le job reste pending indéfiniment Le canal est plein ou le worker n'est pas démarré Ajoutez un log dans execute_task ; vérifiez que tokio::spawn est bien appelé dans main
SSE ne reçoit aucun événement Le Content-Type: text/event-stream n'est pas reconnu Vérifiez que le handler retourne bien Sse<...> et non String

Prochaine étape

Vous êtes désormais core contributor. Consultez les how-to pour les recettes quotidiennes :