Créer un worker asynchrone de bout en bout (capstone)¶
Objectifs¶
À la fin de ce tutoriel, vous saurez :
- O1. Concevoir le modèle de données SeaORM pour une table de jobs avec une state machine à 5 états.
- O2. Implémenter la boucle d'un worker Tokio avec sémaphore de concurrence et timeout.
- O3. Tester l'ensemble du cycle (enqueue → exécution → SSE) avec des tests d'intégration hermétiques.
Pré-requis¶
- Technique : tutoriels 01 et 02 complétés ; notions de Tokio (async/await,
mpsc,spawn) ; PostgreSQL local ou testcontainer disponible. - Pédagogique : tutoriel
02-premiere-contributioncomplété. - Temps estimé : ~120 minutes.
Vue d'ensemble¶
Ce tutoriel est un capstone : vous combinez des recettes connues (migration, service, handler) sans guide pas-à-pas détaillé. L'objectif est de reproduire le patron du worker d'import (import_service + import_worker) pour un cas générique appelé notify_worker — un worker qui envoie des notifications externes de manière asynchrone.
flowchart LR
UI[Handler POST\n/notify] -->|create_job| DB[(notify_jobs)]
UI -->|try_send| Chan[mpsc NotifyTask]
UI -->|302| Page[GET /notify/:id]
Chan --> Worker[NotifyWorker\nSémaphore N]
Worker -->|update status| DB
Worker -->|HTTP POST externe| External[Endpoint distant]
Page -. SSE .-> DB
Le patron est identique à celui utilisé pour l'import de dépôts externe (voir crates/gitrust-core/src/services/import_service.rs) et pour le worker CI. Vous n'inventez rien : vous combinez des pièces existantes.
Modèle mental : pensez à une file de caissiers de supermarché. La file d'attente (
mpsc channel) accumule les clients (tâches). Le nombre de caisses ouvertes est le sémaphore. Chaque caissier (worker goroutine) traite un client et met à jour le tableau d'affichage (DB + SSE).
Étape 1 : Concevoir la state machine¶
Avant d'écrire une ligne de code, formalisez la state machine de votre job.
stateDiagram-v2
[*] --> pending : create_job()
pending --> running : worker démarre
pending --> cancelled : POST /cancel
running --> success : HTTP 200 reçu
running --> failed : erreur ou timeout
running --> cancelled : flag DB checked
Les 5 états sont : pending, running, success, failed, cancelled.
Règles de transition :
- Seul pending peut être annulé avant démarrage.
- running peut être annulé via un flag en DB vérifié par le worker.
- success et failed sont terminaux.
Checkpoint : dessinez la state machine sur papier avant de continuer. Répondez à : « que se passe-t-il si le serveur redémarre pendant un job running ? » (réponse : au redémarrage, les jobs running doivent être marqués failed avec le message « server restarted »).
Étape 2 : Créer la migration SeaORM¶
Créez le fichier de migration :
use sea_orm_migration::prelude::*;
#[derive(DeriveMigrationName)]
pub struct Migration;
#[async_trait::async_trait]
impl MigrationTrait for Migration {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.create_table(
Table::create()
.table(NotifyJobs::Table)
.if_not_exists()
.col(ColumnDef::new(NotifyJobs::Id).uuid().not_null().primary_key())
.col(ColumnDef::new(NotifyJobs::OwnerId).uuid().not_null())
.col(ColumnDef::new(NotifyJobs::TargetUrl).text().not_null())
.col(ColumnDef::new(NotifyJobs::Payload).text().not_null())
.col(
ColumnDef::new(NotifyJobs::Status)
.string()
.not_null()
.default("pending"),
)
.col(ColumnDef::new(NotifyJobs::ErrorMessage).text().null())
.col(ColumnDef::new(NotifyJobs::StartedAt).timestamp_with_time_zone().null())
.col(ColumnDef::new(NotifyJobs::FinishedAt).timestamp_with_time_zone().null())
.col(
ColumnDef::new(NotifyJobs::CreatedAt)
.timestamp_with_time_zone()
.not_null(),
)
.col(
ColumnDef::new(NotifyJobs::UpdatedAt)
.timestamp_with_time_zone()
.not_null(),
)
.to_owned(),
)
.await?;
manager
.create_index(
Index::create()
.table(NotifyJobs::Table)
.name("notify_jobs_owner_status_idx")
.col(NotifyJobs::OwnerId)
.col(NotifyJobs::Status)
.to_owned(),
)
.await
}
async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.drop_table(Table::drop().table(NotifyJobs::Table).to_owned())
.await
}
}
#[derive(DeriveIden)]
enum NotifyJobs {
Table,
Id,
OwnerId,
TargetUrl,
Payload,
Status,
ErrorMessage,
StartedAt,
FinishedAt,
CreatedAt,
UpdatedAt,
}
Enregistrez la migration dans crates/gitrust-core/src/migrations/mod.rs en ajoutant m20260501_000023_create_notify_jobs::Migration à la liste du Migrator.
Test rouge à faire passer :
#[tokio::test]
async fn migration_creates_notify_jobs_table() {
let db = setup_test_db().await;
// Si la migration s'est appliquée, la requête ne renvoie pas d'erreur.
let count: i64 = db
.query_one(Statement::from_string(
DatabaseBackend::Postgres,
"SELECT COUNT(*) FROM notify_jobs".to_owned(),
))
.await
.expect("La table notify_jobs doit exister")
.unwrap()
.try_get("", "count")
.unwrap();
assert_eq!(count, 0);
}
Checkpoint : cargo test --package gitrust-core migration_creates_notify_jobs_table passe au vert.
Étape 3 : Définir les DTOs et l'enum de statut¶
Créez crates/gitrust-core/src/dto/notify_dto.rs :
use serde::{Deserialize, Serialize};
/// Statut d'un job de notification — state machine à 5 états.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum NotifyStatus {
Pending,
Running,
Success,
Failed,
Cancelled,
}
impl NotifyStatus {
pub fn as_str(&self) -> &'static str {
match self {
Self::Pending => "pending",
Self::Running => "running",
Self::Success => "success",
Self::Failed => "failed",
Self::Cancelled => "cancelled",
}
}
pub fn from_str(s: &str) -> Option<Self> {
match s {
"pending" => Some(Self::Pending),
"running" => Some(Self::Running),
"success" => Some(Self::Success),
"failed" => Some(Self::Failed),
"cancelled" => Some(Self::Cancelled),
_ => None,
}
}
}
/// Payload envoyé via SSE au client.
#[derive(Debug, Serialize)]
pub struct NotifyJobSse {
pub status: String,
pub error_message: Option<String>,
}
/// Input pour créer un job.
#[derive(Debug, Deserialize)]
pub struct CreateNotifyJobInput {
pub target_url: String,
pub payload: String,
}
Tests round-trip obligatoires :
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn status_round_trip() {
for (s, expected) in [
("pending", NotifyStatus::Pending),
("running", NotifyStatus::Running),
("success", NotifyStatus::Success),
("failed", NotifyStatus::Failed),
("cancelled", NotifyStatus::Cancelled),
] {
let status = NotifyStatus::from_str(s).expect("doit parser");
assert_eq!(status, expected);
assert_eq!(status.as_str(), s);
}
}
#[test]
fn unknown_status_returns_none() {
assert!(NotifyStatus::from_str("unknown").is_none());
}
}
Checkpoint : cargo test --package gitrust-core status_round_trip passe.
Étape 4 : Implémenter le service¶
Créez crates/gitrust-core/src/services/notify_service.rs. Le service ne fait que orchestrer la DB — aucune logique métier IO ici :
use chrono::Utc;
use sea_orm::{ActiveModelTrait, ActiveValue::Set, DatabaseConnection, EntityTrait};
use uuid::Uuid;
use crate::{
dto::notify_dto::{CreateNotifyJobInput, NotifyStatus},
error::GitrustError,
models::notify_job,
};
pub struct NotifyService;
impl NotifyService {
/// Crée un job en statut `pending` et retourne son ID.
pub async fn create_job(
db: &DatabaseConnection,
owner_id: Uuid,
input: CreateNotifyJobInput,
) -> Result<Uuid, GitrustError> {
let id = Uuid::new_v4();
let now = Utc::now();
let model = notify_job::ActiveModel {
id: Set(id),
owner_id: Set(owner_id),
target_url: Set(input.target_url),
payload: Set(input.payload),
status: Set(NotifyStatus::Pending.as_str().to_owned()),
error_message: Set(None),
started_at: Set(None),
finished_at: Set(None),
created_at: Set(now),
updated_at: Set(now),
};
model.insert(db).await.map_err(GitrustError::Database)?;
Ok(id)
}
/// Passe le job en `running`.
pub async fn mark_running(db: &DatabaseConnection, id: Uuid) -> Result<(), GitrustError> {
Self::update_status(db, id, NotifyStatus::Running, None).await
}
/// Passe le job en `success`.
pub async fn mark_success(db: &DatabaseConnection, id: Uuid) -> Result<(), GitrustError> {
Self::update_status(db, id, NotifyStatus::Success, None).await
}
/// Passe le job en `failed` avec un message d'erreur.
pub async fn mark_failed(
db: &DatabaseConnection,
id: Uuid,
error: &str,
) -> Result<(), GitrustError> {
Self::update_status(db, id, NotifyStatus::Failed, Some(error)).await
}
/// Passe le job en `cancelled`.
pub async fn mark_cancelled(db: &DatabaseConnection, id: Uuid) -> Result<(), GitrustError> {
Self::update_status(db, id, NotifyStatus::Cancelled, None).await
}
/// Vérifie si le job a été annulé (pour le poll du worker).
pub async fn is_cancelled(
db: &DatabaseConnection,
id: Uuid,
) -> Result<bool, GitrustError> {
let job = notify_job::Entity::find_by_id(id)
.one(db)
.await
.map_err(GitrustError::Database)?
.ok_or_else(|| GitrustError::NotFound("job introuvable".into()))?;
Ok(job.status == NotifyStatus::Cancelled.as_str())
}
async fn update_status(
db: &DatabaseConnection,
id: Uuid,
status: NotifyStatus,
error: Option<&str>,
) -> Result<(), GitrustError> {
use sea_orm::ActiveModelTrait;
let now = Utc::now();
let mut model: notify_job::ActiveModel = notify_job::Entity::find_by_id(id)
.one(db)
.await
.map_err(GitrustError::Database)?
.ok_or_else(|| GitrustError::NotFound("job introuvable".into()))?
.into();
model.status = Set(status.as_str().to_owned());
model.updated_at = Set(now);
model.error_message = Set(error.map(str::to_owned));
if status == NotifyStatus::Running {
model.started_at = Set(Some(now));
}
if matches!(status, NotifyStatus::Success | NotifyStatus::Failed | NotifyStatus::Cancelled) {
model.finished_at = Set(Some(now));
}
model.update(db).await.map_err(GitrustError::Database)?;
Ok(())
}
}
Étape 5 : Implémenter le worker¶
Le worker tourne dans un tokio::spawn séparé. Il lit les tâches depuis un canal mpsc et les exécute avec un sémaphore de concurrence :
// crates/gitrust-core/src/services/notify_worker.rs
use std::sync::Arc;
use sea_orm::DatabaseConnection;
use tokio::sync::{mpsc, Semaphore};
use uuid::Uuid;
use super::notify_service::NotifyService;
/// Tâche envoyée via le canal mpsc au worker.
pub struct NotifyTask {
pub job_id: Uuid,
pub target_url: String,
pub payload: String,
}
pub struct NotifyWorkerConfig {
/// Nombre maximal de jobs concurrents.
pub max_concurrent: usize,
/// Timeout en secondes par job.
pub timeout_secs: u64,
}
/// Boucle principale du worker. À appeler avec `tokio::spawn`.
pub async fn run(
db: DatabaseConnection,
mut rx: mpsc::Receiver<NotifyTask>,
config: NotifyWorkerConfig,
) {
let semaphore = Arc::new(Semaphore::new(config.max_concurrent));
while let Some(task) = rx.recv().await {
let permit = semaphore.clone().acquire_owned().await.expect("semaphore fermé");
let db = db.clone();
let timeout = config.timeout_secs;
tokio::spawn(async move {
let _permit = permit; // libéré à la fin du bloc
execute_task(&db, task, timeout).await;
});
}
}
async fn execute_task(db: &DatabaseConnection, task: NotifyTask, timeout_secs: u64) {
// Vérifier si annulé avant de démarrer
match NotifyService::is_cancelled(db, task.job_id).await {
Ok(true) => return,
Err(e) => {
tracing::error!(job_id = %task.job_id, error = %e, "Erreur lecture job");
return;
}
Ok(false) => {}
}
if let Err(e) = NotifyService::mark_running(db, task.job_id).await {
tracing::error!(job_id = %task.job_id, error = %e, "Impossible de marquer running");
return;
}
let result = tokio::time::timeout(
std::time::Duration::from_secs(timeout_secs),
send_notification(&task.target_url, &task.payload),
)
.await;
match result {
Ok(Ok(())) => {
if let Err(e) = NotifyService::mark_success(db, task.job_id).await {
tracing::error!(job_id = %task.job_id, error = %e, "Impossible de marquer success");
}
}
Ok(Err(e)) => {
let msg = format!("Erreur HTTP: {e}");
if let Err(db_err) = NotifyService::mark_failed(db, task.job_id, &msg).await {
tracing::error!(job_id = %task.job_id, error = %db_err, "Impossible de marquer failed");
}
}
Err(_elapsed) => {
let msg = format!("Timeout après {timeout_secs}s");
if let Err(e) = NotifyService::mark_failed(db, task.job_id, &msg).await {
tracing::error!(job_id = %task.job_id, error = %e, "Impossible de marquer failed (timeout)");
}
}
}
}
async fn send_notification(url: &str, payload: &str) -> Result<(), reqwest::Error> {
let client = reqwest::Client::new();
client
.post(url)
.header("Content-Type", "application/json")
.body(payload.to_owned())
.send()
.await?
.error_for_status()?;
Ok(())
}
Checkpoint : cargo build --package gitrust-core compile sans erreur.
Étape 6 : Ajouter le handler et la route SSE¶
Dans crates/gitrust-web/src/handlers/notify.rs, créez le handler POST qui enqueues le job et redirige, ainsi que le handler SSE :
use axum::{
extract::{Path, State},
response::{sse::{Event, KeepAlive, Sse}, Redirect},
Extension, Form,
};
use sea_orm::DatabaseConnection;
use tokio::time::{interval, Duration};
use tokio_stream::wrappers::IntervalStream;
use tokio_stream::StreamExt;
use uuid::Uuid;
use gitrust_core::{
dto::notify_dto::CreateNotifyJobInput,
services::{notify_service::NotifyService, notify_worker::NotifyTask},
};
use crate::auth::AuthUser;
pub type NotifySender = tokio::sync::mpsc::Sender<NotifyTask>;
/// POST /notify — crée un job et redirige vers la page de suivi.
pub async fn create_notify_job(
State(db): State<DatabaseConnection>,
Extension(tx): Extension<NotifySender>,
user: AuthUser,
Form(input): Form<CreateNotifyJobInput>,
) -> Result<Redirect, crate::error::AppError> {
let job_id = NotifyService::create_job(&db, user.user_id, input.clone()).await?;
// Essai d'envoi dans le canal — si plein, le job reste `pending` jusqu'au
// prochain redémarrage (les jobs `pending` peuvent être re-envoyés au démarrage).
let _ = tx.try_send(NotifyTask {
job_id,
target_url: input.target_url,
payload: input.payload,
});
Ok(Redirect::to(&format!("/notify/{job_id}")))
}
/// GET /notify/:id/stream — SSE : pousse les mises à jour de statut au client.
pub async fn notify_job_stream(
State(db): State<DatabaseConnection>,
Path(id): Path<Uuid>,
) -> Sse<impl tokio_stream::Stream<Item = Result<Event, std::convert::Infallible>>> {
let stream = IntervalStream::new(interval(Duration::from_secs(1)))
.map(move |_| {
let db = db.clone();
async move {
// Poll la DB pour l'état courant
use gitrust_core::models::notify_job;
use sea_orm::EntityTrait;
let job = notify_job::Entity::find_by_id(id)
.one(&db)
.await
.ok()
.flatten();
let data = match job {
Some(j) => format!(
r#"{{"status":"{}","error":{}}}"#,
j.status,
j.error_message
.as_deref()
.map(|e| format!(r#""{}""#, e.replace('"', "\\\"")))
.unwrap_or("null".to_owned())
),
None => r#"{"status":"not_found"}"#.to_owned(),
};
Ok(Event::default().data(data))
}
})
.then(|fut| fut);
Sse::new(stream).keep_alive(KeepAlive::default())
}
Enregistrez les routes dans routes.rs :
.route("/notify", post(handlers::notify::create_notify_job))
.route("/notify/:id", get(handlers::notify::notify_job_page))
.route("/notify/:id/stream", get(handlers::notify::notify_job_stream))
.route("/notify/:id/cancel", post(handlers::notify::cancel_notify_job))
Étape 7 : Intégrer le worker dans main.rs¶
Dans crates/gitrust-web/src/main.rs, initialisez le canal et démarrez le worker :
// Dans la fonction main(), après la construction de l'app :
let (notify_tx, notify_rx) = tokio::sync::mpsc::channel::<NotifyTask>(100);
let notify_db = db.clone();
tokio::spawn(gitrust_core::services::notify_worker::run(
notify_db,
notify_rx,
NotifyWorkerConfig {
max_concurrent: std::env::var("NOTIFY_MAX_CONCURRENT")
.unwrap_or("4".into())
.parse()
.unwrap_or(4),
timeout_secs: std::env::var("NOTIFY_TIMEOUT_SECS")
.unwrap_or("30".into())
.parse()
.unwrap_or(30),
},
));
// Injecter l'émetteur dans le router via Extension
let app = routes::routes(&static_path)
.layer(Extension(notify_tx))
.with_state(db);
Étape 8 : Tests d'intégration¶
Écrivez au minimum ces trois tests dans crates/gitrust-core/src/services/notify_service_test.rs :
#[tokio::test]
async fn create_job_inserts_with_pending_status() {
let db = setup_test_db().await;
let id = NotifyService::create_job(
&db,
Uuid::new_v4(),
CreateNotifyJobInput {
target_url: "https://example.com/hook".into(),
payload: r#"{"event":"test"}"#.into(),
},
)
.await
.expect("create_job doit réussir");
let job = notify_job::Entity::find_by_id(id)
.one(&db)
.await
.unwrap()
.unwrap();
assert_eq!(job.status, "pending");
assert!(job.started_at.is_none());
}
#[tokio::test]
async fn mark_running_sets_started_at() {
let db = setup_test_db().await;
let id = create_test_job(&db).await;
NotifyService::mark_running(&db, id).await.unwrap();
let job = notify_job::Entity::find_by_id(id).one(&db).await.unwrap().unwrap();
assert_eq!(job.status, "running");
assert!(job.started_at.is_some());
}
#[tokio::test]
async fn cancel_before_running_is_terminal() {
let db = setup_test_db().await;
let id = create_test_job(&db).await;
NotifyService::mark_cancelled(&db, id).await.unwrap();
let is_cancelled = NotifyService::is_cancelled(&db, id).await.unwrap();
assert!(is_cancelled);
}
Checkpoint : cargo test --package gitrust-core notify — tous les tests passent.
Récapitulatif¶
- O1 accompli en concevant la table
notify_jobsavec 5 états (pending/running/success/failed/cancelled) et la migration SeaORM correspondante. - O2 accompli en implémentant
notify_worker::runavecSemaphore,tokio::time::timeout, et lecture du flag d'annulation via la DB. - O3 accompli en écrivant les tests
create_job_inserts_with_pending_status,mark_running_sets_started_atetcancel_before_running_is_terminal, tous hermétiques (pas de réseau réel).
Et si ça ne marche pas¶
| Symptôme | Cause probable | Correction |
|---|---|---|
channel closed au démarrage |
Le Receiver est droppé avant le spawn du worker |
Vérifiez que tokio::spawn(run(..., notify_rx, ...)) est appelé avant que notify_rx ne soit droppé |
Le job reste pending indéfiniment |
Le canal est plein ou le worker n'est pas démarré | Ajoutez un log dans execute_task ; vérifiez que tokio::spawn est bien appelé dans main |
| SSE ne reçoit aucun événement | Le Content-Type: text/event-stream n'est pas reconnu |
Vérifiez que le handler retourne bien Sse<...> et non String |
Prochaine étape¶
Vous êtes désormais core contributor. Consultez les how-to pour les recettes quotidiennes :