Python

Orchestrateur de jobs séquentiels avec journal

Enchaîne les étapes d'un pipeline (extract, transform, load, checks) via subprocess, s'arrête à la première erreur et imprime le journal d'exécution : code retour, durée, état par étape.

Cas d'usage

Remplacer un .bat fragile : un orchestrateur lisible qui dit exactement quelle étape a cassé et en combien de temps.

Prérequis

Python 3.9+ (bibliothèque standard)

Python
import subprocess
import time

JOBS = [("extraction", ["python", "etl/extract.py"]),
        ("transformation", ["python", "etl/transform.py"]),
        ("chargement", ["python", "etl/load.py"]),
        ("contrôles", ["python", "etl/checks.py"])]

journal = []
for nom, cmd in JOBS:
    t0 = time.perf_counter()
    r = subprocess.run(cmd, capture_output=True, text=True)
    journal.append((nom, r.returncode, time.perf_counter() - t0))
    if r.returncode != 0:
        print(f"ARRÊT sur « {nom} » : {r.stderr.strip()[:64]}")
        break

print(f"{'étape':<16} {'code':>5} {'durée':>8}  état")
print("-" * 40)
for nom, code, duree in journal:
    print(f"{nom:<16} {code:>5} {duree:>7.1f}s  {'OK' if code == 0 else 'ÉCHEC'}")
print("-" * 40)
complet = len(journal) == len(JOBS) and all(c == 0 for _, c, _ in journal)
print("pipeline TERMINÉ" if complet else "pipeline INTERROMPU — voir l'étape en échec")

Résultat

ARRÊT sur « chargement » : sqlite3.OperationalError: database is locked
étape             code    durée  état
----------------------------------------
extraction           0    12.4s  OK
transformation       0     8.7s  OK
chargement           1     0.3s  ÉCHEC
----------------------------------------
pipeline INTERROMPU — voir l'étape en échec
subprocessPipelineOrchestrationBatch

Snippets liés

Retour au Data Lab