Comprendre le patron worker async de gitrust¶
Ce que vous allez comprendre¶
- Analyser pourquoi les opérations longues (import, export, envoi de mails) ne peuvent pas s'exécuter dans le cycle requête/réponse HTTP.
- Décrire comment le patron worker async de gitrust combine
mpsc,Semaphore, et SSE pour traiter des tâches longues avec retour de progression. - Évaluer les garanties de durabilité offertes par le modèle « état en base + canal en mémoire ».
Le problème concret¶
L'import d'un dépôt GitHub peut prendre 30 secondes. Une requête HTTP a un timeout. Si le serveur redémarre pendant l'import, la tâche est perdue. Si 50 utilisateurs lancent un import simultanément sans limite, le serveur sature.
Le même problème se pose pour l'envoi de mails, la génération de SBOM, ou toute opération dont la durée dépasse quelques secondes.
L'analogie¶
Le patron ressemble à une cuisine de restaurant avec un tableau de commandes :
- Le serveur (handler HTTP) prend la commande, l'inscrit sur le tableau (DB), et dit au client « votre numéro est le 42, suivez l'avancement ici ».
- La cuisine (worker) lit le tableau en continu, choisit la prochaine commande, la prépare derrière un comptoir limité à N plans de travail (Semaphore).
- Le tableau lumineux (SSE) indique l'état en temps réel au client sans qu'il ait besoin de redemander.
Si la cuisine ferme brutalement (crash), les commandes « en cours » sur le tableau sont remises en attente au redémarrage — elles ne disparaissent pas.
Le modèle¶
Vue d'ensemble du patron¶
flowchart LR
Handler[Handler HTTP\nPOST /import]
DB[(Table\nimport_jobs)]
Channel[mpsc::channel\nJobId]
Worker[NotifyWorker\ntokio::spawn]
Semaphore[Semaphore\nmax N slots]
Executor[execute_job()\nlogique métier]
SSE[SSE Handler\nGET /import/:id/stream]
Client[Navigateur\nhx-ext='sse']
Handler -->|"INSERT status=Pending"| DB
Handler -->|"tx.send(job_id)"| Channel
Channel -->|"rx.recv()"| Worker
Worker -->|"acquire()"| Semaphore
Semaphore -->|"permit"| Executor
Executor -->|"UPDATE status=..."| DB
DB -->|"SELECT"| SSE
SSE -->|"data: "| Client
La table de persistance¶
Chaque type de job a sa propre table. Le schéma canonique pour un import :
CREATE TABLE import_jobs (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
user_id UUID NOT NULL REFERENCES users(id),
repository_id UUID REFERENCES repositories(id),
status VARCHAR(20) NOT NULL DEFAULT 'pending',
-- 'pending' | 'running' | 'completed' | 'failed'
source_url TEXT NOT NULL,
error_message TEXT,
progress_pct SMALLINT NOT NULL DEFAULT 0,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
started_at TIMESTAMPTZ,
completed_at TIMESTAMPTZ
);
Invariant fondamental : le statut en base est la source de vérité. Si le processus redémarre, un job running sans process vivant est remis à pending au démarrage.
L'état en mémoire : le canal mpsc¶
// crates/gitrust-web/src/workers/import_worker.rs
use tokio::sync::mpsc;
#[derive(Clone)]
pub struct ImportJobSender(pub mpsc::Sender<Uuid>);
pub struct ImportWorker {
db: DatabaseConnection,
rx: mpsc::Receiver<Uuid>,
semaphore: Arc<Semaphore>,
}
impl ImportWorker {
pub fn new(db: DatabaseConnection) -> (Self, ImportJobSender) {
let (tx, rx) = mpsc::channel(100); // buffer de 100 IDs en attente
let semaphore = Arc::new(Semaphore::new(4)); // max 4 imports simultanés
let worker = Self { db, rx, semaphore };
(worker, ImportJobSender(tx))
}
pub async fn run(mut self) {
while let Some(job_id) = self.rx.recv().await {
let db = self.db.clone();
let permit = Arc::clone(&self.semaphore)
.acquire_owned()
.await
.expect("semaphore closed");
tokio::spawn(async move {
let _permit = permit; // libéré quand le spawn se termine
if let Err(e) = execute_import(&db, job_id).await {
tracing::error!(?job_id, error = %e, "import job failed");
let _ = ImportJobRepository::mark_failed(&db, job_id, &e.to_string()).await;
}
});
}
}
}
Le Semaphore borne la concurrence. Même si 100 IDs arrivent dans le canal, seuls 4 execute_import s'exécutent en parallèle. Les autres attendent leur tour dans le canal.
Le handler HTTP — découpler la réponse de l'exécution¶
// crates/gitrust-web/src/routes/import.rs
pub async fn post_import(
State(db): State<DatabaseConnection>,
State(sender): State<ImportJobSender>,
user: AuthUser,
Form(form): Form<ImportForm>,
) -> Result<impl IntoResponse, AppError> {
// 1. Valider et persister
let job = ImportJobRepository::create(&db, user.user_id, &form.source_url).await?;
// 2. Envoyer l'ID dans le canal (non-bloquant grâce au buffer)
sender.0.send(job.id).await.map_err(|_| AppError::Internal("worker unavailable".into()))?;
// 3. Répondre immédiatement avec la page de suivi
Ok(Redirect::to(&format!("/import/{}/progress", job.id)))
}
Le handler retourne avant que l'import ait commencé. Le navigateur est redirigé vers une page de progression.
Le worker — exécution avec transitions d'état¶
async fn execute_import(db: &DatabaseConnection, job_id: Uuid) -> anyhow::Result<()> {
// Transition : pending → running
ImportJobRepository::mark_running(db, job_id).await?;
// Récupérer le job
let job = ImportJobRepository::find(db, job_id).await?
.ok_or_else(|| anyhow!("job not found: {}", job_id))?;
// Étape 1/3 — cloner
ImportJobRepository::update_progress(db, job_id, 10).await?;
let temp_dir = git_clone(&job.source_url).await?;
// Étape 2/3 — créer le dépôt gitrust
ImportJobRepository::update_progress(db, job_id, 50).await?;
let repo = RepositoryService::create_from_import(db, &job, &temp_dir).await?;
// Étape 3/3 — finaliser
ImportJobRepository::update_progress(db, job_id, 90).await?;
finalize_import(db, job.id, repo.id).await?;
// Transition : running → completed
ImportJobRepository::mark_completed(db, job_id, repo.id).await?;
Ok(())
}
Chaque update_progress écrit en base. Si le process redémarre entre deux étapes, la progression est connue — mais le job doit être relancé depuis zéro (pas de reprise partielle dans ce modèle).
SSE — retour de progression vers le navigateur¶
// crates/gitrust-web/src/routes/import.rs
pub async fn stream_import_progress(
Path(job_id): Path<Uuid>,
State(db): State<DatabaseConnection>,
user: AuthUser,
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
let stream = async_stream::stream! {
loop {
let job = ImportJobRepository::find(&db, job_id).await
.unwrap_or(None);
match job {
None => {
yield Ok(Event::default().data("<p>Job introuvable.</p>"));
break;
}
Some(ref j) if j.status == "completed" => {
yield Ok(Event::default()
.data(format!("<p>Import terminé — <a href='/{}/{}'>Voir le dépôt</a></p>",
j.owner_login, j.repo_slug)));
break;
}
Some(ref j) if j.status == "failed" => {
yield Ok(Event::default()
.data(format!("<p class='text-error'>Erreur : {}</p>",
j.error_message.as_deref().unwrap_or("inconnue"))));
break;
}
Some(ref j) => {
yield Ok(Event::default()
.data(format!("<progress class='progress' value='{}' max='100'></progress>",
j.progress_pct)));
tokio::time::sleep(Duration::from_millis(500)).await;
}
}
}
};
Sse::new(stream).keep_alive(KeepAlive::default())
}
Le template HTMX consomme ce flux :
<!-- templates/import/progress.html -->
<div
hx-ext="sse"
sse-connect="/import/{{ job_id }}/stream"
sse-swap="message"
hx-swap="innerHTML"
id="progress-container"
>
<progress class="progress" value="0" max="100"></progress>
</div>
Récupération au redémarrage¶
Au démarrage de l'application, gitrust remet en pending les jobs bloqués en running :
// crates/gitrust-web/src/main.rs (dans la fonction de démarrage)
async fn recover_stuck_jobs(db: &DatabaseConnection, sender: &ImportJobSender) {
let stuck = ImportJobRepository::find_by_status(db, "running").await
.unwrap_or_default();
for job in stuck {
tracing::warn!(?job.id, "recovering stuck import job");
let _ = ImportJobRepository::reset_to_pending(db, job.id).await;
let _ = sender.0.send(job.id).await;
}
}
Cette fonction s'exécute une fois au démarrage, avant que le serveur HTTP accepte des connexions.
Enregistrement dans main.rs¶
// crates/gitrust-web/src/main.rs
let (import_worker, import_sender) = ImportWorker::new(db.clone());
// Démarrer le worker dans un task dédié
tokio::spawn(async move { import_worker.run().await });
// Récupérer les jobs bloqués
recover_stuck_jobs(&db, &import_sender).await;
// Injecter le sender dans l'état Axum
let app = Router::new()
// ... routes ...
.with_state(AppState { db, import_sender, /* ... */ });
Décisions et compromis¶
Durabilité : base de données vs queue dédiée¶
| Approche | Durabilité | Complexité opérationnelle |
|---|---|---|
| Table SQL (gitrust) | Au niveau PostgreSQL | Nulle (déjà présent) |
| Redis + Bull/Sidekiq | Haute (AOF/RDB) | Redis à opérer |
| RabbitMQ / NATS | Très haute | Broker supplémentaire |
| In-memory seulement | Nulle (perdu au restart) | Nulle |
Gitrust choisit la table SQL : la durabilité est suffisante (perte maximale = 1 transaction), et PostgreSQL est déjà requis. Un broker externe serait disproportionné pour une forge auto-hébergée à petite échelle.
Concurrence : Semaphore vs pool de workers¶
Le Semaphore(N) est plus simple qu'un pool de threads ou de tâches fixes. Il permet au runtime Tokio de gérer l'ordonnancement. L'inconvénient : si un job prend 30 secondes et que N=4, les 5e et 6e jobs attendent dans le canal, pas dans un pool observable.
Pour des jobs très longs (> 5 minutes), un pool de workers avec heartbeat serait plus robuste — mais ce cas ne se présente pas dans gitrust aujourd'hui.
SSE vs WebSocket vs polling¶
| Méthode | Cas d'usage |
|---|---|
Polling HTMX (hx-trigger="every 2s") |
Statut simple, peu de mises à jour |
| SSE (gitrust) | Flux unidirectionnel continu (logs, progression) |
| WebSocket | Bidirectionnel (chat, collaboration temps réel) |
SSE est suffisant pour tous les workers actuels de gitrust (import, CI logs). WebSocket ajouterait une complexité d'état bidirectionnel non nécessaire.
Vérifier votre compréhension¶
-
Le serveur gitrust redémarre pendant l'import d'un dépôt. Le job était en statut
runningavecprogress_pct = 60. Que se passe-t-il exactement au redémarrage ? L'utilisateur verra-t-il la progression reprendre à 60 % ou à 0 % ? -
Le
Semaphoreest initialisé à 4 slots. 10 utilisateurs lancent un import simultanément. Combien detokio::spawns'exécutent en parallèle ? Où attendent les 6 jobs restants ? Que se passe-t-il si le serveur redémarre à cet instant ?