PM: Bottleneck Detection


Below is a **ready-to-use script** to detect bottlenecks from the **attached XES file** (`/mnt/data/BPIC_2012_A.xes`) using **PM4Py**. The script computes:

  • **Service time** per activity, from `lifecycle:start`–`complete` pairs
  • **Waiting time** between activities within each case
  • Summary metrics (mean/median/p95/count) plus a **bottleneck score** (a weighted combination of service and waiting z-scores)
  • Export to **CSV** and display of the **TOP-N bottlenecks**

> **Install (once)**
>
> ```bash
> pip install pm4py pandas numpy
> ```

---

**1) Main script: `bottleneck_pm4py.py`**

```python
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import argparse
import sys
from pathlib import Path

import numpy as np
import pandas as pd

from pm4py.objects.log.importer.xes import importer as xes_importer
from pm4py import convert_to_dataframe


def pair_start_complete(df: pd.DataFrame) -> pd.DataFrame:
    """Pair start/complete events per (case, activity) -> one row per activity execution."""
    if "lifecycle:transition" not in df.columns:
        return pd.DataFrame(columns=["case", "activity", "start_time", "complete_time", "service_sec"])
    dfl = df.dropna(subset=["lifecycle:transition"]).copy()
    dfl["transition"] = dfl["lifecycle:transition"].str.lower()
    dfl = dfl[dfl["transition"].isin(["start", "complete"])]
    if dfl.empty:
        return pd.DataFrame(columns=["case", "activity", "start_time", "complete_time", "service_sec"])
    dfl = dfl.sort_values(["case:concept:name", "concept:name", "time:timestamp", "transition"]).copy()
    # Rank the n-th start / n-th complete within each (case, activity) so they can be matched.
    dfl["start_rank"] = dfl["transition"].eq("start").groupby(
        [dfl["case:concept:name"], dfl["concept:name"]]
    ).cumsum()
    dfl["complete_rank"] = dfl["transition"].eq("complete").groupby(
        [dfl["case:concept:name"], dfl["concept:name"]]
    ).cumsum()
    starts = dfl[dfl["transition"] == "start"].rename(columns={"time:timestamp": "start_time"})
    comps = dfl[dfl["transition"] == "complete"].rename(columns={"time:timestamp": "complete_time"})
    merged = pd.merge(
        starts[["case:concept:name", "concept:name", "start_rank", "start_time"]],
        comps[["case:concept:name", "concept:name", "complete_rank", "complete_time"]],
        left_on=["case:concept:name", "concept:name", "start_rank"],
        right_on=["case:concept:name", "concept:name", "complete_rank"],
        how="inner",
    ).rename(columns={"case:concept:name": "case", "concept:name": "activity"})
    merged["service_sec"] = (merged["complete_time"] - merged["start_time"]).dt.total_seconds()
    merged = merged[(merged["service_sec"] >= 0) & np.isfinite(merged["service_sec"])]
    return merged[["case", "activity", "start_time", "complete_time", "service_sec"]]


def compute_waiting_times(df: pd.DataFrame, exec_df: pd.DataFrame) -> pd.DataFrame:
    """
    Compute waiting time between activities within each case.
    - With start/complete events: waiting = start(curr) - complete(prev)
    - Without lifecycle info:     waiting = time(curr) - time(prev)
    """
    has_lifecycle = "lifecycle:transition" in df.columns and \
        df["lifecycle:transition"].str.lower().isin(["start", "complete"]).any()
    rows = []
    if has_lifecycle and not exec_df.empty:
        per_case = exec_df.sort_values(["case", "start_time"])
        for case, g in per_case.groupby("case"):
            g = g.sort_values("start_time")
            prev_complete, prev_act = None, None
            for _, r in g.iterrows():
                if prev_complete is not None:
                    wt = (r["start_time"] - prev_complete).total_seconds()
                    if wt >= 0:
                        rows.append({"case": case, "from_activity": prev_act,
                                     "to_activity": r["activity"], "waiting_sec": wt})
                prev_complete, prev_act = r["complete_time"], r["activity"]
    else:
        # Fallback when the log carries no lifecycle transitions.
        df2 = df.sort_values(["case:concept:name", "time:timestamp"])
        for case, g in df2.groupby("case:concept:name"):
            g = g.sort_values("time:timestamp")
            prev_time, prev_act = None, None
            for _, r in g.iterrows():
                if prev_time is not None:
                    wt = (r["time:timestamp"] - prev_time).total_seconds()
                    if wt >= 0:
                        rows.append({"case": case, "from_activity": prev_act,
                                     "to_activity": r["concept:name"], "waiting_sec": wt})
                prev_time, prev_act = r["time:timestamp"], r["concept:name"]
    if not rows:
        return pd.DataFrame(columns=["case", "from_activity", "to_activity", "waiting_sec"])
    w = pd.DataFrame(rows)
    return w[["case", "from_activity", "to_activity", "waiting_sec"]]


def zscore(series: pd.Series) -> pd.Series:
    mu = np.nanmean(series)
    sd = np.nanstd(series, ddof=0)
    if sd == 0 or np.isnan(sd):
        return pd.Series(np.zeros(len(series)), index=series.index)
    return (series - mu) / sd


def main():
    ap = argparse.ArgumentParser(description="Bottleneck Detection from XES using PM4Py")
    ap.add_argument("xes_path", type=str, help="Path to the .xes file")
    ap.add_argument("--top", type=int, default=10, help="Top-N bottlenecks to display (default 10)")
    ap.add_argument("--out", type=str, default="bottlenecks_summary.csv", help="Output CSV for the summary")
    args = ap.parse_args()

    xes_path = Path(args.xes_path)
    if not xes_path.exists():
        print(f"[ERROR] File not found: {xes_path}", file=sys.stderr)
        sys.exit(1)

    # 1) Load XES -> DataFrame
    log = xes_importer.apply(str(xes_path))
    df = convert_to_dataframe(log)
    for c in ["case:concept:name", "concept:name", "time:timestamp"]:
        if c not in df.columns:
            print(f"[ERROR] Required column missing from event log: {c}", file=sys.stderr)
            sys.exit(1)
    df["time:timestamp"] = pd.to_datetime(df["time:timestamp"], errors="coerce")
    df = df.dropna(subset=["time:timestamp"]).copy()

    # 2) Service time per activity
    exec_df = pair_start_complete(df)

    # 3) Waiting time between activities
    wait_df = compute_waiting_times(df, exec_df)

    # 4) Aggregate per activity
    if not exec_df.empty:
        service_stats = exec_df.groupby("activity")["service_sec"].agg(
            service_mean_sec="mean",
            service_median_sec="median",
            service_p95_sec=lambda x: np.nanpercentile(x, 95),
            service_count="count",
        ).reset_index()
    else:
        service_stats = pd.DataFrame(columns=["activity", "service_mean_sec", "service_median_sec", "service_p95_sec", "service_count"])
    if not wait_df.empty:
        wait_stats = wait_df.groupby("to_activity")["waiting_sec"].agg(
            wait_mean_sec="mean",
            wait_median_sec="median",
            wait_p95_sec=lambda x: np.nanpercentile(x, 95),
            wait_count="count",
        ).reset_index().rename(columns={"to_activity": "activity"})
    else:
        wait_stats = pd.DataFrame(columns=["activity", "wait_mean_sec", "wait_median_sec", "wait_p95_sec", "wait_count"])
    summary = pd.merge(service_stats, wait_stats, on="activity", how="outer").fillna(0)

    # 5) Bottleneck score (weighted combination of z-scores)
    summary["z_service"]     = zscore(summary["service_mean_sec"])
    summary["z_wait"]        = zscore(summary["wait_mean_sec"])
    summary["z_service_p95"] = zscore(summary["service_p95_sec"])
    summary["z_wait_p95"]    = zscore(summary["wait_p95_sec"])
    summary["bottleneck_score"] = (
        0.4 * summary["z_service"] +
        0.4 * summary["z_wait"] +
        0.1 * summary["z_service_p95"] +
        0.1 * summary["z_wait_p95"]
    )
    summary_sorted = summary.sort_values("bottleneck_score", ascending=False)
    summary_sorted.to_csv(args.out, index=False)

    # 6) Print a compact Top-N
    n = min(args.top, len(summary_sorted))
    print(f"\n=== TOP {n} BOTTLENECK ACTIVITIES ===")
    if n == 0:
        print("No activities detected. Check the structure of your log.")
        sys.exit(0)
    for _, r in summary_sorted.head(n).iterrows():
        print(
            f"- {r['activity']}: score={r['bottleneck_score']:.3f} | "
            f"svc_mean={r['service_mean_sec']:.1f}s (p95={r['service_p95_sec']:.1f}s, n={int(r['service_count'])}) | "
            f"wait_mean={r['wait_mean_sec']:.1f}s (p95={r['wait_p95_sec']:.1f}s, n={int(r['wait_count'])})"
        )
    print(f"\nSummary CSV written to: {args.out}")
    print("Prioritize activities with a high score and high p95, especially when wait_mean is large (queueing).")


if __name__ == "__main__":
    main()
```

    • **Run it on the attached file** (path as uploaded):

```bash
python bottleneck_pm4py.py /mnt/data/BPIC_2012_A.xes --top 15 --out bottlenecks_BPICA.csv
```
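
To sanity-check what `pair_start_complete` returns, here is a minimal sketch on a hand-made two-event log. It assumes the script above is saved as `bottleneck_pm4py.py` in the working directory so the function can be imported:

```python
import pandas as pd
from bottleneck_pm4py import pair_start_complete  # assumes the script above is importable

# Toy log: one case, one activity with an explicit start/complete pair.
toy = pd.DataFrame({
    "case:concept:name": ["c1", "c1"],
    "concept:name": ["A_SUBMITTED", "A_SUBMITTED"],
    "lifecycle:transition": ["start", "complete"],
    "time:timestamp": pd.to_datetime(["2025-01-01 09:00:00", "2025-01-01 09:05:00"]),
})

print(pair_start_complete(toy))
# Expected: one row with service_sec = 300.0 (5 minutes between start and complete)
```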

---

**2) (Optional) Quick cross-check with PM4Py's Performance DFG**

This is a compact alternative for viewing the **mean waiting time** between activities (on the edges) and the **sojourn time** (at the nodes). It works well as a sanity check on transition bottlenecks.

```python
#!/usr/bin/env python3
# perf_dfg_quickcheck.py

import sys

import pandas as pd
from pm4py.objects.log.importer.xes import importer as xes_importer
from pm4py.algo.discovery.dfg import algorithm as dfg_discovery
from pm4py.statistics.sojourn_time.log import get as soj_get

xes = sys.argv[1]
log = xes_importer.apply(xes)

# Mean performance DFG (edge durations)
perf_dfg = dfg_discovery.apply(log, variant=dfg_discovery.Variants.PERFORMANCE)
df_edges = pd.DataFrame(
    [{"from": a, "to": b, "mean_sec": v} for (a, b), v in perf_dfg.items()]
).sort_values("mean_sec", ascending=False)

# Sojourn time per activity (time spent at the node)
soj = soj_get.apply(log)  # returns dict {activity: mean_seconds}
df_nodes = pd.DataFrame(
    [{"activity": k, "sojourn_mean_sec": v} for k, v in soj.items()]
).sort_values("sojourn_mean_sec", ascending=False)

df_edges.to_csv("perf_dfg_edges.csv", index=False)
df_nodes.to_csv("sojourn_nodes.csv", index=False)

print("Top 10 edges by mean_sec:")
print(df_edges.head(10))
print("\nTop 10 activities by sojourn_mean_sec:")
print(df_nodes.head(10))
```

    • **Run it:**

```bash
python perf_dfg_quickcheck.py /mnt/data/BPIC_2012_A.xes
```

---

**Important notes**
  • **Service-time accuracy** depends on the presence of `lifecycle:start/complete` pairs. If the dataset only has `complete` events, focus the interpretation on **waiting between events** (edges) and **sojourn** (nodes).
  • **p95** helps expose **long tails** (rare but heavy spikes) that often become bottlenecks even when the mean is not particularly high.
  • For a deeper investigation, combine the `bottlenecks_summary.csv` results with **variation per resource**, **per channel**, or **per case attribute** (e.g. `org:resource`, `org:role`, `application type`, etc.) and run additional **groupby** aggregations, as sketched below.
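
As a minimal sketch of that kind of follow-up, the snippet below aggregates inter-event gaps per (activity, resource). It assumes the log carries an `org:resource` attribute and uses the same XES path as above; the gap between consecutive events is only a rough waiting proxy, not the paired service time from the main script:

```python
import pandas as pd
from pm4py.objects.log.importer.xes import importer as xes_importer
from pm4py import convert_to_dataframe

df = convert_to_dataframe(xes_importer.apply("/mnt/data/BPIC_2012_A.xes"))

# Gap between consecutive events inside each case (rough waiting proxy).
df = df.sort_values(["case:concept:name", "time:timestamp"])
df["gap_sec"] = (
    df.groupby("case:concept:name")["time:timestamp"].diff().dt.total_seconds()
)

# Which (activity, resource) combinations accumulate the longest gaps before them?
per_resource = (
    df.groupby(["concept:name", "org:resource"])["gap_sec"]
      .agg(["mean", "median", "count"])
      .sort_values("mean", ascending=False)
)
print(per_resource.head(10))
```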

If you like, I can follow up with a **visualization** (a Top-N bar chart) or a **per-resource filter**; just name the attribute you want analyzed.
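
For the bar-chart option, a minimal matplotlib sketch over the CSV produced by the main script might look like this (the file name matches the `--out` default; adjust it if you changed the flag):

```python
import pandas as pd
import matplotlib.pyplot as plt

summary = pd.read_csv("bottlenecks_summary.csv")
top = summary.nlargest(10, "bottleneck_score")

plt.figure(figsize=(10, 5))
plt.barh(top["activity"], top["bottleneck_score"])
plt.gca().invert_yaxis()  # highest score at the top
plt.xlabel("bottleneck_score (weighted z-scores)")
plt.title("Top-10 bottleneck activities")
plt.tight_layout()
plt.savefig("bottlenecks_top10.png", dpi=150)  # or plt.show() for interactive use
```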