PM: Bottleneck Detection

Great. Here is a ready-to-use script for detecting bottlenecks in the attached XES file (`/mnt/data/BPIC_2012_A.xes`) using PM4Py. The script computes:

  • Service time per activity, from `lifecycle:start`–`complete` pairs
  • Waiting time between activities within each case
  • Summary metrics (mean/median/p95/count) plus a bottleneck score (combined z-scores of service and waiting time)
  • Export to CSV and a printout of the TOP-N bottlenecks
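
For reference, the bottleneck score computed below is a weighted sum of z-scores, z(x) = (x - mean) / std, over the per-activity statistics:

score = 0.4·z(service_mean) + 0.4·z(wait_mean) + 0.1·z(service_p95) + 0.1·z(wait_p95)

Activities that are both slow to execute (service) and slow to be reached (waiting) therefore rank highest.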

Install (one time only)

pip install pm4py pandas numpy
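
To verify the environment before running anything, here is a quick import check (it assumes nothing beyond the three packages above):

# check_env.py: confirm the required packages import cleanly
import pm4py
import pandas
import numpy

print("pm4py", pm4py.__version__)
print("pandas", pandas.__version__)
print("numpy", numpy.__version__)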


1) Main script: `bottleneck_pm4py.py`

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import argparse
import sys
from pathlib import Path
import numpy as np
import pandas as pd

from pm4py.objects.log.importer.xes import importer as xes_importer
from pm4py import convert_to_dataframe
 

def pair_start_complete(df: pd.DataFrame) -> pd.DataFrame:
    """Pasangkan start/complete per (case, activity) -> satu baris per eksekusi aktivitas."""
    if "lifecycle:transition" not in df.columns:
        return pd.DataFrame(columns=["case", "activity", "start_time", "complete_time", "service_sec"])  

    dfl = df.dropna(subset=["lifecycle:transition"]).copy()
    dfl["transition"] = dfl["lifecycle:transition"].str.lower()
    dfl = dfl[dfl["transition"].isin(["start", "complete"])]
    if dfl.empty:
        return pd.DataFrame(columns=["case", "activity", "start_time", "complete_time", "service_sec"]) 

    dfl = dfl.sort_values(["case:concept:name", "concept:name", "time:timestamp", "transition"]).copy()
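    # Rank the i-th "start" and the i-th "complete" within each (case, activity)
    # group; merging starts and completes on equal ranks then pairs each start
    # with its corresponding complete.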
    dfl["start_rank"] = dfl["transition"].eq("start").groupby(
        [dfl["case:concept:name"], dfl["concept:name"]]
    ).cumsum()
    dfl["complete_rank"] = dfl["transition"].eq("complete").groupby(
        [dfl["case:concept:name"], dfl["concept:name"]]
    ).cumsum()

    starts = dfl[dfl["transition"] == "start"].rename(columns={"time:timestamp": "start_time"})
    comps  = dfl[dfl["transition"] == "complete"].rename(columns={"time:timestamp": "complete_time"})

    merged = pd.merge(
        starts"case:concept:name", "concept:name", "start_rank", "start_time",
        comps"case:concept:name", "concept:name", "complete_rank", "complete_time",
        left_on=["case:concept:name", "concept:name", "start_rank"],
        right_on=["case:concept:name", "concept:name", "complete_rank"],
        how="inner",
    ).rename(columns={"case:concept:name": "case", "concept:name": "activity"})

    merged["service_sec"] = (merged["complete_time"] - merged["start_time"]).dt.total_seconds()
    merged = merged[(merged["service_sec"] >= 0) & np.isfinite(merged["service_sec"])]
    return merged"case", "activity", "start_time", "complete_time", "service_sec"


def compute_waiting_times(df: pd.DataFrame, exec_df: pd.DataFrame) -> pd.DataFrame:
    """
    Compute the waiting time between activities within each case.
    - With start/complete pairs: wait = start(curr) - complete(prev)
    - Without lifecycle info: wait = time(curr) - time(prev)
    """
    has_lifecycle = "lifecycle:transition" in df.columns and \
        df["lifecycle:transition"].str.lower().isin(["start", "complete"]).any() 

    rows = []
    if has_lifecycle and not exec_df.empty:
        per_case = exec_df.sort_values(["case", "start_time"])
        for case, g in per_case.groupby("case"):
            g = g.sort_values("start_time")
            prev_complete, prev_act = None, None
            for _, r in g.iterrows():
                if prev_complete is not None:
                    wt = (r["start_time"] - prev_complete).total_seconds()
                    if wt >= 0:
                        rows.append({"case": case, "from_activity": prev_act,
                                     "to_activity": r["activity"], "waiting_sec": wt})
                prev_complete, prev_act = r["complete_time"], r["activity"]
    else:
        # fallback when no lifecycle information is present
        df2 = df.sort_values(["case:concept:name", "time:timestamp"])
        for case, g in df2.groupby("case:concept:name"):
            g = g.sort_values("time:timestamp")
            prev_time, prev_act = None, None
            for _, r in g.iterrows():
                if prev_time is not None:
                    wt = (r["time:timestamp"] - prev_time).total_seconds()
                    if wt >= 0:
                        rows.append({"case": case, "from_activity": prev_act,
                                     "to_activity": r["concept:name"], "waiting_sec": wt})
                prev_time, prev_act = r["time:timestamp"], r["concept:name"] 

    if not rows:
        return pd.DataFrame(columns=["case", "from_activity", "to_activity", "waiting_sec"])
    w = pd.DataFrame(rows)
    return w"case", "from_activity", "to_activity", "waiting_sec"
 

def zscore(series: pd.Series) -> pd.Series:
    mu = np.nanmean(series)
    sd = np.nanstd(series, ddof=0)
    if sd == 0 or np.isnan(sd):
        return pd.Series(np.zeros(len(series)), index=series.index)
    return (series - mu) / sd
 

def main():
    ap = argparse.ArgumentParser(description="Bottleneck Detection from XES using PM4Py")
    ap.add_argument("xes_path", type=str, help="Path ke file .xes")
    ap.add_argument("--top", type=int, default=10, help="Top-N bottleneck yang ditampilkan (default 10)")
    ap.add_argument("--out", type=str, default="bottlenecks_summary.csv", help="Output CSV ringkasan")
    args = ap.parse_args()

    xes_path = Path(args.xes_path)
    if not xes_path.exists():
        print(f"[ERROR] File tidak ditemukan: {xes_path}", file=sys.stderr)
        sys.exit(1) 

    # 1) Load XES -> DataFrame
    log = xes_importer.apply(str(xes_path))
    df = convert_to_dataframe(log) 

    for c in ["case:concept:name", "concept:name", "time:timestamp"]:
        if c not in df.columns:
            print(f"[ERROR] Kolom wajib hilang di event log: {c}", file=sys.stderr)
            sys.exit(1)

    df["time:timestamp"] = pd.to_datetime(df["time:timestamp"], errors="coerce")
    df = df.dropna(subset=["time:timestamp"]).copy()

    # 2) Service time per activity
    exec_df = pair_start_complete(df)

    # 3) Waiting time between activities
    wait_df = compute_waiting_times(df, exec_df)

    # 4) Aggregate per activity
    if not exec_df.empty:
        service_stats = exec_df.groupby("activity")["service_sec"].agg(
            service_mean_sec="mean",
            service_median_sec="median",
            service_p95_sec=lambda x: np.nanpercentile(x, 95),
            service_count="count",
        ).reset_index()
    else:
        service_stats = pd.DataFrame(columns=["activity","service_mean_sec","service_median_sec","service_p95_sec","service_count"])

    if not wait_df.empty:
        wait_stats = wait_df.groupby("to_activity")["waiting_sec"].agg(
            wait_mean_sec="mean",
            wait_median_sec="median",
            wait_p95_sec=lambda x: np.nanpercentile(x, 95),
            wait_count="count",
        ).reset_index().rename(columns={"to_activity": "activity"})
    else:
        wait_stats = pd.DataFrame(columns=["activity","wait_mean_sec","wait_median_sec","wait_p95_sec","wait_count"])

    summary = pd.merge(service_stats, wait_stats, on="activity", how="outer").fillna(0)

    # 5) Bottleneck score (combined z-scores)
    summary["z_service"]      = zscore(summary["service_mean_sec"])
    summary["z_wait"]         = zscore(summary["wait_mean_sec"])
    summary["z_service_p95"]  = zscore(summary["service_p95_sec"])
    summary["z_wait_p95"]     = zscore(summary["wait_p95_sec"])

    summary["bottleneck_score"] = (
        0.4 * summary["z_service"] +
        0.4 * summary["z_wait"] +
        0.1 * summary["z_service_p95"] +
        0.1 * summary["z_wait_p95"]
    )

    summary_sorted = summary.sort_values("bottleneck_score", ascending=False)
    summary_sorted.to_csv(args.out, index=False)

    # 6) Print a concise Top-N
    n = min(args.top, len(summary_sorted))
    print(f"\n=== TOP {n} BOTTLENECK ACTIVITIES ===")
    if n == 0:
        print("Tidak ada aktivitas terdeteksi. Cek struktur log Anda.")
        sys.exit(0)

    for _, r in summary_sorted.head(n).iterrows():
        print(
            f"- {r['activity']}: score={r['bottleneck_score']:.3f} | "
            f"svc_mean={r['service_mean_sec']:.1f}s (p95={r['service_p95_sec']:.1f}s, n={int(r['service_count'])}) | "
            f"wait_mean={r['wait_mean_sec']:.1f}s (p95={r['wait_p95_sec']:.1f}s, n={int(r['wait_count'])})"
        )

    print(f"\nCSV ringkasan disimpan ke: {args.out}")
    print("Prioritaskan aktivitas dengan score & p95 tinggi, terutama jika wait_mean besar (antrian).")


if __name__ == "__main__":
    main()

Run it on the attached file (adjust the path to match your upload):

python bottleneck_pm4py.py /mnt/data/BPIC_2012_A.xes --top 15 --out bottlenecks_BPICA.csv
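
For a quick visual of the result, here is a minimal plotting sketch (it assumes matplotlib is installed in addition to the packages above, and reuses the --out file name from the command):

# plot_top_bottlenecks.py (sketch; assumes `pip install matplotlib`)
import pandas as pd
import matplotlib.pyplot as plt

summary = pd.read_csv("bottlenecks_BPICA.csv")  # already sorted by bottleneck_score
top = summary.head(10)

plt.barh(top["activity"], top["bottleneck_score"])
plt.gca().invert_yaxis()  # highest score at the top
plt.xlabel("bottleneck_score")
plt.tight_layout()
plt.savefig("top_bottlenecks.png", dpi=150)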


2) (Optional) Quick cross-check with PM4Py's Performance DFG

This is a compact alternative for looking at the mean waiting time between activities (edges) and the sojourn time (at nodes). It works well as a sanity check on transition bottlenecks.

#!/usr/bin/env python3
# perf_dfg_quickcheck.py
from pm4py.objects.log.importer.xes import importer as xes_importer
from pm4py.algo.discovery.dfg import algorithm as dfg_discovery
from pm4py.statistics.sojourn_time.log import get as soj_get
import pandas as pd
import sys

xes = sys.argv[1]
log = xes_importer.apply(xes)

# Mean performance DFG (edge durations)
perf_dfg = dfg_discovery.apply(log, variant=dfg_discovery.Variants.PERFORMANCE)
df_edges = pd.DataFrame(
    [{"from": a, "to": b, "mean_sec": v} for (a, b), v in perf_dfg.items()]
).sort_values("mean_sec", ascending=False)

# Sojourn time per activity (time spent at each node)
soj = soj_get.apply(log)  # returns dict {activity: mean_seconds}
df_nodes = pd.DataFrame(
    [{"activity": k, "sojourn_mean_sec": v} for k, v in soj.items()]
).sort_values("sojourn_mean_sec", ascending=False)

df_edges.to_csv("perf_dfg_edges.csv", index=False)
df_nodes.to_csv("sojourn_nodes.csv", index=False)

print("Top 10 edges by mean_sec:")
print(df_edges.head(10))
print("\nTop 10 activities by sojourn_mean_sec:")
print(df_nodes.head(10))

Run:

python perf_dfg_quickcheck.py /mnt/data/BPIC_2012_A.xes
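
If Graphviz is installed, the performance DFG can also be rendered as an image. Here is a minimal sketch using PM4Py's visualizer module (the same old-style API family as the scripts above; check the variant names against your PM4Py version):

# perf_dfg_render.py (sketch; requires system Graphviz)
import sys
from pm4py.objects.log.importer.xes import importer as xes_importer
from pm4py.algo.discovery.dfg import algorithm as dfg_discovery
from pm4py.visualization.dfg import visualizer as dfg_visualization

log = xes_importer.apply(sys.argv[1])
perf_dfg = dfg_discovery.apply(log, variant=dfg_discovery.Variants.PERFORMANCE)

# annotate edges with mean durations and save the rendered graph
gviz = dfg_visualization.apply(perf_dfg, log=log,
                               variant=dfg_visualization.Variants.PERFORMANCE)
dfg_visualization.save(gviz, "perf_dfg.png")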


Important notes

  • Service-time accuracy depends on the presence of `lifecycle:start/complete` pairs. If the dataset only records `complete`, focus the interpretation on the waiting time between events (edges) and the sojourn time (nodes).
  • p95 helps expose the long tail (rare but heavy spikes), which is often the real bottleneck even when the mean is not particularly high.
  • For a deeper investigation, combine `bottlenecks_summary.csv` with the variation per resource, per channel, or per case attribute (e.g. `org:resource`, `org:role`, `application type`, etc.) and run additional groupby operations, as sketched below.
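
A minimal sketch of such a breakdown, attributing the gap since the previous event in the same case to the resource of the current event (this assumes the log carries `org:resource`; adjust the attribute name if yours differs):

# resource_breakdown.py (sketch)
import sys
import pandas as pd
from pm4py.objects.log.importer.xes import importer as xes_importer
from pm4py import convert_to_dataframe

log = xes_importer.apply(sys.argv[1])
df = convert_to_dataframe(log)
df["time:timestamp"] = pd.to_datetime(df["time:timestamp"], errors="coerce")
df = df.dropna(subset=["time:timestamp"]).sort_values(["case:concept:name", "time:timestamp"])

# gap since the previous event within the same case
df["gap_sec"] = df.groupby("case:concept:name")["time:timestamp"].diff().dt.total_seconds()

per_res = (df.dropna(subset=["gap_sec"])
             .groupby(["concept:name", "org:resource"])["gap_sec"]
             .agg(gap_mean="mean", gap_p95=lambda x: x.quantile(0.95), count="count")
             .reset_index()
             .sort_values("gap_mean", ascending=False))
print(per_res.head(15))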