
API Reference

This reference documents the core modules of eida-consistency.

Core Checker

eida_consistency.core.checker

Pick random epochs and check availability coverage (returns location and matched span).

This module selects up to `epochs` unique (network, station, channel) items from the candidate pool. For each selected channel it picks random test epochs of `duration` seconds inside the channel's epoch span (taken from StationXML) and checks availability coverage for each test epoch with a /availability/1/query?format=json request.
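Each item in the candidate pool is a plain dict describing one channel epoch. A minimal sketch of one entry, with illustrative field values (network, station, channel and starttime are required by the checker; endtime and location are optional):

candidate = {
    "network": "NL",                      # values here are hypothetical
    "station": "HGN",
    "channel": "BHZ",
    "location": "00",
    "starttime": "2010-01-01T00:00:00",   # ISO timestamps, as in StationXML
    "endtime": "2020-01-01T00:00:00",     # may be absent for open-ended epochs
}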

Return value

results, stats

results = [
    (
        availability_url,   # availability request URL for the test epoch
        availability_ok,    # True/False depending on slice coverage
        epoch_start_iso,    # slice start
        epoch_end_iso,      # slice end
        location_exact,     # location code (from matched span or StationXML)
        matched_span,       # the span dict that covered the slice (or None)
    ),
    ...
]

stats = {
    "candidates_requested": epochs,
    "candidates_generated": len(results),
    "candidates_pool": len(pool),
    "queries_performed": attempts,
}

check_candidate(base_url, candidate, candidates=None, epochs=10, duration=600)

Build up to epochs test cases and check availability coverage.

Returns

results : list of tuples
    (availability_url, availability_ok, epoch_start_iso, epoch_end_iso,
     location_exact, matched_span)
stats : dict
    {"candidates_requested", "candidates_generated", "candidates_pool", "queries_performed"}
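A minimal usage sketch, assuming a pre-fetched candidate list and a hypothetical FDSN base URL (in the runner these come from fetch_candidates() and load_node_url()):

from eida_consistency.core.checker import check_candidate

base_url = "https://example.org/fdsnws"   # placeholder base URL
results, stats = check_candidate(
    base_url,
    candidates[0],
    candidates=candidates,
    epochs=10,
    duration=600,
)
for url, ok, start, end, loc, span in results:
    print(f"{'OK ' if ok else 'GAP'} {loc or '--'} {start} -> {end}")
print(f"{stats['candidates_generated']}/{stats['candidates_requested']} test epochs generated")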

Source code in src/eida_consistency/core/checker.py
def check_candidate(
    base_url: str,
    candidate: Dict[str, str],
    candidates: Optional[List[Dict[str, str]]] = None,
    epochs: int = 10,
    duration: int = 600,
) -> Tuple[List[Tuple[str, bool, str, str, str, Dict[str, str] | None]], Dict[str, int]]:
    """
    Build up to `epochs` test cases and check availability coverage.

    Returns
    -------
    results : list of tuples
        (availability_url, availability_ok, epoch_start_iso, epoch_end_iso,
         location_exact, matched_span)
    stats : dict
        {"candidates_requested", "candidates_generated", "candidates_pool", "queries_performed"}
    """
    if duration < 600:
        raise ValueError("Duration must be at least 600 seconds (10 minutes).")

    results: List[Tuple[str, bool, str, str, str, Dict[str, str] | None]] = []

    pool = [
        c for c in (candidates or [candidate])
        if all(k in c for k in ("network", "station", "channel", "starttime"))
    ]
    if not pool:
        return results, {
            "candidates_requested": epochs,
            "candidates_generated": 0,
            "candidates_pool": 0,
            "queries_performed": 0,
        }

    used: set[tuple[str, str, str]] = set()
    attempts, max_attempts = 0, max(epochs * 20, len(pool) * 2)

    while len(results) < epochs and attempts < max_attempts:
        attempts += 1
        sample = random.choice(pool)
        key = (sample["network"], sample["station"], sample["channel"])
        # Prefer unique (network, station, channel) keys so each test epoch
        # comes from a different channel; if the pool is smaller than the
        # requested number of epochs, allow reusing a channel.
        if len(pool) >= epochs and key in used:
            continue

        ch_start = _parse_iso(sample.get("starttime"))
        ch_end = _parse_iso(sample.get("endtime")) or datetime.utcnow()
        if not ch_start or not ch_end or ch_start >= ch_end:
            continue

        # Random slice of `duration` seconds inside [ch_start, ch_end]
        duration_td = timedelta(seconds=duration)
        latest_start = ch_end - duration_td
        if ch_start >= latest_start:
            continue

        offset = random.randint(0, int((latest_start - ch_start).total_seconds()))
        epoch_start_dt = ch_start + timedelta(seconds=offset)
        epoch_end_dt = epoch_start_dt + duration_td

        s = epoch_start_dt.strftime("%Y-%m-%dT%H:%M:%S")
        e = epoch_end_dt.strftime("%Y-%m-%dT%H:%M:%S")

        # Query availability for THIS specific epoch
        # Return format: {"ok": bool, "matched_span": dict|None, ...}
        av_res = check_availability_query(
            base_url,
            sample["network"],
            sample["station"],
            sample["channel"],
            s,
            e,
            location=sample.get("location", "*"),
        )

        available = bool(av_res["ok"])
        matched_span = av_res.get("matched_span")
        loc = matched_span["location"] if (matched_span and matched_span.get("location")) else sample.get("location", "")

        results.append((av_res["url"], available, s, e, loc, matched_span))
        used.add(key)

    # Final summary line
    logging.info(
        f"Final usable epochs: {len(results)} / {epochs} "
        f"(from {len(pool)} candidates, {attempts} attempts)"
    )

    stats = {
        "candidates_requested": epochs,
        "candidates_generated": len(results),
        "candidates_pool": len(pool),
        "queries_performed": attempts,
    }

    return results, stats

Runner

eida_consistency.runner

CLI entry point and orchestration for running consistency checks.

run_consistency_check(node, epochs=10, percentage=None, duration=600, seed=None, delete_old=False, max_workers=10, print_stdout=False, report_dir=REPORT_DIR, station_multiplier=3)

Run the availability + dataselect consistency check and write reports.

Returns

Optional[Path]
    The path to the saved JSON report, or None if nothing was generated
    (e.g., no candidates).
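A minimal usage sketch, assuming "ODC" is a node name resolvable by load_node_url (the node name and report directory here are illustrative):

from pathlib import Path
from eida_consistency.runner import run_consistency_check

json_path = run_consistency_check(
    "ODC",                        # hypothetical node name
    epochs=20,
    duration=600,
    seed=42,                      # fixing the seed makes the run reproducible
    report_dir=Path("reports"),   # hypothetical output directory
)
if json_path is None:
    print("No candidates fetched; nothing was written.")
else:
    print(f"JSON report: {json_path}")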

Source code in src/eida_consistency/runner.py
def run_consistency_check(
    node: str,
    epochs: int | None = 10,
    percentage: float | None = None,
    duration: int = 600,
    seed: int | None = None,
    delete_old: bool = False,
    max_workers: int = 10,
    print_stdout: bool = False,
    report_dir: Path = REPORT_DIR,
    station_multiplier: int = 3,
) -> Optional[Path]:
    """
    Run the availability + dataselect consistency check and write reports.

    Returns
    -------
    Optional[Path]
        The path to the saved JSON report, or None if nothing was generated
        (e.g., no candidates).
    """
    start_time = time.time()  # ⬅️ measure runtime start

    if seed is None:
        seed = random.randint(0, 999_999)
        logging.info(f" Using generated seed: {seed}")
    else:
        logging.info(f" Using provided seed: {seed}")

    random.seed(seed)
    base_url = load_node_url(node)

    if percentage is not None:
        logging.info(f" Fetching candidates (Percentage {percentage:.1%} of available stations)...")
        candidates = fetch_candidates(base_url, percentage=percentage)
        # If percentage logic is used, we test ONE epoch per selected candidate.
        epochs = len(candidates)
    else:
        logging.info(f" Fetching random candidates for node: {node}...")
        # Always fetch station_multiplier × epochs candidates
        epochs = epochs or 10  # fallback if None
        target_candidates = epochs * station_multiplier
        candidates = fetch_candidates(base_url, max_candidates=target_candidates)

    if not candidates:
        logging.warning("No candidates fetched. No report will be generated.")
        return None

    # Each item: (url, available, start, end, loc_exact, matched_span)
    results, stats = check_candidate(
        base_url,
        candidates[0],
        candidates=candidates,
        epochs=epochs,
        duration=duration,
    )

    logging.info("▶ Checking availability + dataselect consistency in parallel:\n")

    all_logs, all_records = [], []

    def worker(args):
        idx, (url, available, start, end, loc_exact, matched_span), match = args
        loc_final = loc_exact or match.get("location", "")
        ds_result = dataselect(
            base_url,
            match["network"], match["station"], match["channel"],
            start, end, loc_final
        )
        log = format_result(
            idx,
            url,
            available,
            ds_result,
            {**match, "location": loc_final, "matched_span": matched_span},
        )
        record = {
            "index": idx,
            "url": url,
            "network": match["network"],
            "station": match["station"],
            "channel": match["channel"],
            "location": loc_final,
            "available": available,
            "dataselect_success": ds_result["success"],
            "dataselect_status": ds_result["status"],
            "dataselect_type": ds_result.get("type", "?"),
            "consistent": available == ds_result["success"],
            "starttime": str(start),
            "endtime": str(end),
            "debug": ds_result.get("debug", ""),
            "matched_span": {
                "start": matched_span.get("start") if matched_span else None,
                "end": matched_span.get("end") if matched_span else None,
                "location": matched_span.get("location") if matched_span else None,
            },
        }
        return log, record

    args_list = []
    for idx, (url, available, start, end, loc_exact, matched_span) in enumerate(results, 1):
        try:
            parts = url.split("?")[1].split("&")
            net = next(p.split("=")[1] for p in parts if p.startswith("network="))
            sta = next(p.split("=")[1] for p in parts if p.startswith("station="))
            cha = next(p.split("=")[1] for p in parts if p.startswith("channel="))
        except Exception:
            net, sta, cha = "?", "?", "?"

        match = next(
            (
                c
                for c in candidates
                if c["network"] == net and c["station"] == sta and c["channel"] == cha
            ),
            None,
        )
        if match:
            args_list.append((idx, (url, available, start, end, loc_exact, matched_span), match))

    pool_size = max(1, min(max_workers, len(args_list)))
    with concurrent.futures.ThreadPoolExecutor(max_workers=pool_size) as executor:
        futures = [executor.submit(worker, a) for a in args_list]
        for fut in concurrent.futures.as_completed(futures):
            log, record = fut.result()
            logging.info(log + "\n")
            all_logs.append(log)
            all_records.append(record)

    logging.info(f"✅ Collected {len(all_records)} results.")

    # --- Measure test duration ---
    end_time = time.time()
    test_duration = round(end_time - start_time, 2)


    # --- Save reports into chosen report_dir ---
    report = create_report_object(
        node=node,
        seed=seed,
        epochs=epochs,
        duration=duration,
        records=all_records,
    )
    report["summary"].update(stats)  # merge candidate stats into summary
    report["summary"]["test_duration_sec"] = test_duration  # ⬅️ add runtime here

    json_path = save_report_json(report, report_dir=report_dir)
    md_path = save_report_markdown(report, report_dir=report_dir)
    logging.info(f"📁 Report saved to: {json_path}")
    logging.info(f"📜 Markdown saved to: {md_path}")
    logging.info(f"🕒 Test duration: {test_duration} seconds")
    if print_stdout:
        sys.stdout.write(json.dumps(report, indent=2, ensure_ascii=False) + "\n")
        sys.stdout.flush()

    return json_path
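Because the function returns the path of the JSON report, a caller can load it back for post-processing. A short sketch, assuming only the summary keys shown above:

import json

report = json.loads(json_path.read_text(encoding="utf-8"))
summary = report["summary"]
print(summary["candidates_generated"], "epochs tested in",
      summary["test_duration_sec"], "seconds")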