Python

Orquestador de jobs secuenciales con registro

Encadena las etapas de un pipeline (extract, transform, load, checks) mediante subprocess, se detiene en el primer error e imprime el registro de ejecucion: codigo de retorno, duracion y estado por etapa.

Requisitos

Python 3.9+ (bibliothèque standard)

Python
import subprocess
import time

JOBS = [("extraction", ["python", "etl/extract.py"]),
        ("transformation", ["python", "etl/transform.py"]),
        ("chargement", ["python", "etl/load.py"]),
        ("contrôles", ["python", "etl/checks.py"])]

journal = []
for nom, cmd in JOBS:
    t0 = time.perf_counter()
    r = subprocess.run(cmd, capture_output=True, text=True)
    journal.append((nom, r.returncode, time.perf_counter() - t0))
    if r.returncode != 0:
        print(f"ARRÊT sur « {nom} » : {r.stderr.strip()[:64]}")
        break

print(f"{'étape':<16} {'code':>5} {'durée':>8}  état")
print("-" * 40)
for nom, code, duree in journal:
    print(f"{nom:<16} {code:>5} {duree:>7.1f}s  {'OK' if code == 0 else 'ÉCHEC'}")
print("-" * 40)
complet = len(journal) == len(JOBS) and all(c == 0 for _, c, _ in journal)
print("pipeline TERMINÉ" if complet else "pipeline INTERROMPU — voir l'étape en échec")

Resultado

ARRÊT sur « chargement » : sqlite3.OperationalError: database is locked
étape             code    durée  état
----------------------------------------
extraction           0    12.4s  OK
transformation       0     8.7s  OK
chargement           1     0.3s  ÉCHEC
----------------------------------------
pipeline INTERROMPU — voir l'étape en échec
subprocessPipelineOrchestrationBatch

Snippets relacionados

Volver al Data Lab