Orquestador de jobs secuenciales con registro
Encadena las etapas de un pipeline (extract, transform, load, checks) mediante subprocess, se detiene en el primer error e imprime el registro de ejecucion: codigo de retorno, duracion y estado por etapa.
Requisitos
Python 3.9+ (bibliothèque standard)
Python
import subprocess
import time
JOBS = [("extraction", ["python", "etl/extract.py"]),
("transformation", ["python", "etl/transform.py"]),
("chargement", ["python", "etl/load.py"]),
("contrôles", ["python", "etl/checks.py"])]
journal = []
for nom, cmd in JOBS:
t0 = time.perf_counter()
r = subprocess.run(cmd, capture_output=True, text=True)
journal.append((nom, r.returncode, time.perf_counter() - t0))
if r.returncode != 0:
print(f"ARRÊT sur « {nom} » : {r.stderr.strip()[:64]}")
break
print(f"{'étape':<16} {'code':>5} {'durée':>8} état")
print("-" * 40)
for nom, code, duree in journal:
print(f"{nom:<16} {code:>5} {duree:>7.1f}s {'OK' if code == 0 else 'ÉCHEC'}")
print("-" * 40)
complet = len(journal) == len(JOBS) and all(c == 0 for _, c, _ in journal)
print("pipeline TERMINÉ" if complet else "pipeline INTERROMPU — voir l'étape en échec")Resultado
ARRÊT sur « chargement » : sqlite3.OperationalError: database is locked étape code durée état ---------------------------------------- extraction 0 12.4s OK transformation 0 8.7s OK chargement 1 0.3s ÉCHEC ---------------------------------------- pipeline INTERROMPU — voir l'étape en échec
subprocessPipelineOrchestrationBatch