PM: Bottleneck Detection
Below is a '''ready-to-use script''' for detecting bottlenecks in the attached '''XES file''' (`/mnt/data/BPIC_2012_A.xes`) using '''PM4Py'''. The script computes:

* '''Service time''' per activity, from paired `lifecycle:start`–`complete` events
* '''Waiting time''' between consecutive activities within each case
* Summary metrics (mean/median/p95/count) plus a '''bottleneck score''' (combined z-scores of service and waiting time)
* Export to '''CSV''' and a printed list of the '''top-N bottlenecks'''
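For reference, the composite score computed by the script weights the mean service and waiting times most heavily, with the 95th percentiles added as tail-sensitivity terms:

 bottleneck_score = 0.4*z(service_mean) + 0.4*z(wait_mean) + 0.1*z(service_p95) + 0.1*z(wait_p95)

where z(·) denotes the z-score of each metric taken across activities.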
==Install (once)==
 pip install pm4py pandas numpy
==1) Main script — `bottleneck_pm4py.py`==
 #!/usr/bin/env python3
 # -*- coding: utf-8 -*-
 
 import argparse
 import sys
 from pathlib import Path
 
 import numpy as np
 import pandas as pd
 
 from pm4py.objects.log.importer.xes import importer as xes_importer
 from pm4py import convert_to_dataframe
 
 
 def pair_start_complete(df: pd.DataFrame) -> pd.DataFrame:
     """Pair start/complete per (case, activity) -> one row per activity execution."""
     if "lifecycle:transition" not in df.columns:
         return pd.DataFrame(columns=["case", "activity", "start_time", "complete_time", "service_sec"])
 
     dfl = df.dropna(subset=["lifecycle:transition"]).copy()
     dfl["transition"] = dfl["lifecycle:transition"].str.lower()
     dfl = dfl[dfl["transition"].isin(["start", "complete"])]
     if dfl.empty:
         return pd.DataFrame(columns=["case", "activity", "start_time", "complete_time", "service_sec"])
 
     dfl = dfl.sort_values(["case:concept:name", "concept:name", "time:timestamp", "transition"]).copy()
     dfl["start_rank"] = dfl["transition"].eq("start").groupby(
         [dfl["case:concept:name"], dfl["concept:name"]]
     ).cumsum()
     dfl["complete_rank"] = dfl["transition"].eq("complete").groupby(
         [dfl["case:concept:name"], dfl["concept:name"]]
     ).cumsum()
 
     starts = dfl[dfl["transition"] == "start"].rename(columns={"time:timestamp": "start_time"})
     comps = dfl[dfl["transition"] == "complete"].rename(columns={"time:timestamp": "complete_time"})
 
     # Pair the k-th start with the k-th complete of the same (case, activity).
     merged = pd.merge(
         starts[["case:concept:name", "concept:name", "start_rank", "start_time"]],
         comps[["case:concept:name", "concept:name", "complete_rank", "complete_time"]],
         left_on=["case:concept:name", "concept:name", "start_rank"],
         right_on=["case:concept:name", "concept:name", "complete_rank"],
         how="inner",
     ).rename(columns={"case:concept:name": "case", "concept:name": "activity"})
 
     merged["service_sec"] = (merged["complete_time"] - merged["start_time"]).dt.total_seconds()
     merged = merged[(merged["service_sec"] >= 0) & np.isfinite(merged["service_sec"])]
     return merged[["case", "activity", "start_time", "complete_time", "service_sec"]]
 
 
 def compute_waiting_times(df: pd.DataFrame, exec_df: pd.DataFrame) -> pd.DataFrame:
     """
     Compute waiting time between activities within each case.
     - With start/complete pairs: wait = start(curr) - complete(prev)
     - Without lifecycle info:    wait = time(curr) - time(prev)
     """
     has_lifecycle = "lifecycle:transition" in df.columns and \
         df["lifecycle:transition"].str.lower().isin(["start", "complete"]).any()
 
     rows = []
     if has_lifecycle and not exec_df.empty:
         per_case = exec_df.sort_values(["case", "start_time"])
         for case, g in per_case.groupby("case"):
             g = g.sort_values("start_time")
             prev_complete, prev_act = None, None
             for _, r in g.iterrows():
                 if prev_complete is not None:
                     wt = (r["start_time"] - prev_complete).total_seconds()
                     if wt >= 0:
                         rows.append({"case": case, "from_activity": prev_act,
                                      "to_activity": r["activity"], "waiting_sec": wt})
                 prev_complete, prev_act = r["complete_time"], r["activity"]
     else:
         # Fallback without lifecycle info: gap between consecutive event timestamps.
         df2 = df.sort_values(["case:concept:name", "time:timestamp"])
         for case, g in df2.groupby("case:concept:name"):
             g = g.sort_values("time:timestamp")
             prev_time, prev_act = None, None
             for _, r in g.iterrows():
                 if prev_time is not None:
                     wt = (r["time:timestamp"] - prev_time).total_seconds()
                     if wt >= 0:
                         rows.append({"case": case, "from_activity": prev_act,
                                      "to_activity": r["concept:name"], "waiting_sec": wt})
                 prev_time, prev_act = r["time:timestamp"], r["concept:name"]
 
     if not rows:
         return pd.DataFrame(columns=["case", "from_activity", "to_activity", "waiting_sec"])
     w = pd.DataFrame(rows)
     return w[["case", "from_activity", "to_activity", "waiting_sec"]]
 
 
 def zscore(series: pd.Series) -> pd.Series:
     mu = np.nanmean(series)
     sd = np.nanstd(series, ddof=0)
     if sd == 0 or np.isnan(sd):
         return pd.Series(np.zeros(len(series)), index=series.index)
     return (series - mu) / sd
 
 
 def main():
     ap = argparse.ArgumentParser(description="Bottleneck Detection from XES using PM4Py")
     ap.add_argument("xes_path", type=str, help="Path to the .xes file")
     ap.add_argument("--top", type=int, default=10, help="Top-N bottlenecks to display (default 10)")
     ap.add_argument("--out", type=str, default="bottlenecks_summary.csv", help="Output CSV summary")
     args = ap.parse_args()
 
     xes_path = Path(args.xes_path)
     if not xes_path.exists():
         print(f"[ERROR] File not found: {xes_path}", file=sys.stderr)
         sys.exit(1)
 
     # 1) Load XES -> DataFrame
     log = xes_importer.apply(str(xes_path))
     df = convert_to_dataframe(log)
 
     for c in ["case:concept:name", "concept:name", "time:timestamp"]:
         if c not in df.columns:
             print(f"[ERROR] Required column missing from event log: {c}", file=sys.stderr)
             sys.exit(1)
 
     df["time:timestamp"] = pd.to_datetime(df["time:timestamp"], errors="coerce")
     df = df.dropna(subset=["time:timestamp"]).copy()
 
     # 2) Service time per activity
     exec_df = pair_start_complete(df)
 
     # 3) Waiting time between activities
     wait_df = compute_waiting_times(df, exec_df)
 
     # 4) Aggregate per activity
     if not exec_df.empty:
         service_stats = exec_df.groupby("activity")["service_sec"].agg(
             service_mean_sec="mean",
             service_median_sec="median",
             service_p95_sec=lambda x: np.nanpercentile(x, 95),
             service_count="count",
         ).reset_index()
     else:
         service_stats = pd.DataFrame(columns=["activity", "service_mean_sec", "service_median_sec", "service_p95_sec", "service_count"])
 
     if not wait_df.empty:
         wait_stats = wait_df.groupby("to_activity")["waiting_sec"].agg(
             wait_mean_sec="mean",
             wait_median_sec="median",
             wait_p95_sec=lambda x: np.nanpercentile(x, 95),
             wait_count="count",
         ).reset_index().rename(columns={"to_activity": "activity"})
     else:
         wait_stats = pd.DataFrame(columns=["activity", "wait_mean_sec", "wait_median_sec", "wait_p95_sec", "wait_count"])
 
     summary = pd.merge(service_stats, wait_stats, on="activity", how="outer").fillna(0)
 
     # 5) Bottleneck score (combined z-scores)
     summary["z_service"] = zscore(summary["service_mean_sec"])
     summary["z_wait"] = zscore(summary["wait_mean_sec"])
     summary["z_service_p95"] = zscore(summary["service_p95_sec"])
     summary["z_wait_p95"] = zscore(summary["wait_p95_sec"])
 
     summary["bottleneck_score"] = (
         0.4 * summary["z_service"] +
         0.4 * summary["z_wait"] +
         0.1 * summary["z_service_p95"] +
         0.1 * summary["z_wait_p95"]
     )
 
     summary_sorted = summary.sort_values("bottleneck_score", ascending=False)
     summary_sorted.to_csv(args.out, index=False)
 
     # 6) Display a compact Top-N
     n = min(args.top, len(summary_sorted))
     print(f"\n=== TOP {n} BOTTLENECK ACTIVITIES ===")
     if n == 0:
         print("No activities detected. Check the structure of your log.")
         sys.exit(0)
 
     for _, r in summary_sorted.head(n).iterrows():
         print(
             f"- {r['activity']}: score={r['bottleneck_score']:.3f} | "
             f"svc_mean={r['service_mean_sec']:.1f}s (p95={r['service_p95_sec']:.1f}s, n={int(r['service_count'])}) | "
             f"wait_mean={r['wait_mean_sec']:.1f}s (p95={r['wait_p95_sec']:.1f}s, n={int(r['wait_count'])})"
         )
 
     print(f"\nSummary CSV saved to: {args.out}")
     print("Prioritize activities with a high score & p95, especially when wait_mean is large (queueing).")
 
 
 if __name__ == "__main__":
     main()
'''Run it against the attached file''' (adjust the path to your upload):
 python bottleneck_pm4py.py /mnt/data/BPIC_2012_A.xes --top 15 --out bottlenecks_BPICA.csv
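The resulting CSV can be inspected directly with pandas. A minimal sketch, assuming the command above wrote `bottlenecks_BPICA.csv` to the working directory (the column names match the summary written by the script):

 # inspect_summary.py: minimal sketch; assumes bottlenecks_BPICA.csv exists.
 import pandas as pd
 
 summary = pd.read_csv("bottlenecks_BPICA.csv")
 
 # Activities where waiting dominates service time are queueing suspects.
 queueing = summary[summary["wait_mean_sec"] > summary["service_mean_sec"]]
 print(queueing.sort_values("bottleneck_score", ascending=False)
               .head(10)[["activity", "bottleneck_score",
                          "service_mean_sec", "wait_mean_sec"]])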
==2) (Optional) Quick cross-check with PM4Py's '''Performance DFG'''==
This is a compact alternative that shows the '''mean waiting time''' between activities (on edges) and the '''sojourn time''' per activity (on nodes). It works well as a sanity check on transition bottlenecks.
 #!/usr/bin/env python3
 # perf_dfg_quickcheck.py
 from pm4py.objects.log.importer.xes import importer as xes_importer
 from pm4py.algo.discovery.dfg import algorithm as dfg_discovery
 from pm4py.statistics.sojourn_time.log import get as soj_get
 import pandas as pd
 import sys
 
 xes = sys.argv[1]
 log = xes_importer.apply(xes)
 
 # Mean performance DFG (edge durations)
 perf_dfg = dfg_discovery.apply(log, variant=dfg_discovery.Variants.PERFORMANCE)
 df_edges = pd.DataFrame(
     [{"from": a, "to": b, "mean_sec": v} for (a, b), v in perf_dfg.items()]
 ).sort_values("mean_sec", ascending=False)
 
 # Sojourn time per activity (time spent at the node)
 soj = soj_get.apply(log)  # returns dict {activity: mean_seconds}
 df_nodes = pd.DataFrame(
     [{"activity": k, "sojourn_mean_sec": v} for k, v in soj.items()]
 ).sort_values("sojourn_mean_sec", ascending=False)
 
 df_edges.to_csv("perf_dfg_edges.csv", index=False)
 df_nodes.to_csv("sojourn_nodes.csv", index=False)
 
 print("Top 10 edges by mean_sec:")
 print(df_edges.head(10))
 print("\nTop 10 activities by sojourn_mean_sec:")
 print(df_nodes.head(10))
'''Run:'''
 python perf_dfg_quickcheck.py /mnt/data/BPIC_2012_A.xes
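Depending on your PM4Py version, the simplified top-level API offers an even shorter route. This is a hedged sketch that assumes a recent release exposing `pm4py.read_xes` and `pm4py.discover_performance_dfg`; the shape of the edge values varies across versions, so the snippet normalizes them before ranking:

 # perf_dfg_simplified.py: sketch against PM4Py's simplified API (version-dependent).
 import pm4py
 
 log = pm4py.read_xes("/mnt/data/BPIC_2012_A.xes")
 perf_dfg, start_acts, end_acts = pm4py.discover_performance_dfg(log)
 
 def mean_of(v):
     # Edge values are plain means in some versions, dicts of aggregations in others.
     return v["mean"] if isinstance(v, dict) else v
 
 # Ten slowest transitions by mean duration (seconds).
 for (a, b), v in sorted(perf_dfg.items(), key=lambda kv: mean_of(kv[1]), reverse=True)[:10]:
     print(f"{a} -> {b}: {mean_of(v):.1f}s")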
==Important notes==
* '''Service-time accuracy''' depends on the presence of paired `lifecycle:start`/`complete` events. If the dataset only records `complete`, focus the interpretation on '''waiting between events''' (edges) and '''sojourn time''' (nodes); the sketch after this list shows a quick way to check what the log contains.
* '''p95''' helps expose '''long tails''' (rare but heavy spikes) that are often the real bottleneck even when the mean looks unremarkable.
* For deeper investigation, join the results in `bottlenecks_summary.csv` with breakdowns '''per resource''', '''per channel''', or '''per case attribute''' (e.g. `org:resource`, `org:role`, `application type`, etc.) and run additional '''groupby''' aggregations, as in the sketch below.
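Both the lifecycle caveat and the drill-down idea can be checked quickly from the event DataFrame. A minimal sketch: it assumes the log carries `lifecycle:transition` and `org:resource` columns (both guarded below, since the first note anticipates logs that only record `complete` events), and uses per-resource event counts merely as a hypothetical first proxy before computing per-resource timings:

 # drilldown_check.py: minimal sketch; column availability is an assumption.
 from pm4py.objects.log.importer.xes import importer as xes_importer
 from pm4py import convert_to_dataframe
 
 log = xes_importer.apply("/mnt/data/BPIC_2012_A.xes")
 df = convert_to_dataframe(log)
 
 # 1) Does the log actually contain start/complete pairs?
 if "lifecycle:transition" in df.columns:
     print(df["lifecycle:transition"].str.lower().value_counts())
 else:
     print("No lifecycle:transition column -> interpret edges/sojourn only.")
 
 # 2) Per-activity, per-resource event counts as a first drill-down proxy.
 if "org:resource" in df.columns:
     counts = (df.groupby(["concept:name", "org:resource"])
                 .size().rename("events").reset_index()
                 .sort_values("events", ascending=False))
     print(counts.head(10))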