هماهنگکننده jobهای ترتیبی با ژورنال
مراحل یه پایپلاین (extract، transform، load، checks) رو با subprocess پشتسرهم اجرا میکنه، با اولین خطا وایمیسه و ژورنال اجرا رو چاپ میکنه: کد بازگشت، مدت، وضعیت هر مرحله.
کاربرد
جایگزین یه .bat شکننده: یه هماهنگکنندهی خونا که دقیقاً میگه کدوم مرحله شکست و توی چند ثانیه.
پیشنیازها
Python 3.9+ (bibliothèque standard)
Python
import subprocess
import time
JOBS = [("extraction", ["python", "etl/extract.py"]),
("transformation", ["python", "etl/transform.py"]),
("chargement", ["python", "etl/load.py"]),
("contrôles", ["python", "etl/checks.py"])]
journal = []
for nom, cmd in JOBS:
t0 = time.perf_counter()
r = subprocess.run(cmd, capture_output=True, text=True)
journal.append((nom, r.returncode, time.perf_counter() - t0))
if r.returncode != 0:
print(f"ARRÊT sur « {nom} » : {r.stderr.strip()[:64]}")
break
print(f"{'étape':<16} {'code':>5} {'durée':>8} état")
print("-" * 40)
for nom, code, duree in journal:
print(f"{nom:<16} {code:>5} {duree:>7.1f}s {'OK' if code == 0 else 'ÉCHEC'}")
print("-" * 40)
complet = len(journal) == len(JOBS) and all(c == 0 for _, c, _ in journal)
print("pipeline TERMINÉ" if complet else "pipeline INTERROMPU — voir l'étape en échec")نتیجه
ARRÊT sur « chargement » : sqlite3.OperationalError: database is locked étape code durée état ---------------------------------------- extraction 0 12.4s OK transformation 0 8.7s OK chargement 1 0.3s ÉCHEC ---------------------------------------- pipeline INTERROMPU — voir l'étape en échec
subprocessPipelineOrchestrationBatch