Skip to content

CoMPASS Level 1 - HMM Models

Hidden Markov Model implementations for inferring fine-grained motor states.

Overview

This module implements Gamma-distributed HMM models to identify behavioral states based on:

  • Step length distributions (Gamma)
  • Turn angle distributions (von Mises)

The model identifies two primary states:

  • Surveillance: Low step length + High turn angle
  • Ambulation: High step length + Low turn angle

Model Architecture

The GammaHMM uses a custom implementation with forward-backward and Viterbi algorithms optimized for behavioral state inference.


Core Model Class

GammaHMM

compass_labyrinth.compass.level_1.momentuPY.GammaHMM

GammaHMM(
    n_states=2,
    angle_model="vm",
    max_iter=200,
    tol=0.0001,
    method="L-BFGS-B",
    random_state=0,
    stationary=True,
    angle_bias: Optional[Tuple[float, float]] = None,
)

angle_model: - "vm" : use angle (radians) ~ von Mises - "absgamma" : use |angle| ~ Gamma

S instance-attribute

S = n_states

angle_model instance-attribute

angle_model = angle_model

max_iter instance-attribute

max_iter = max_iter

tol instance-attribute

tol = tol

method instance-attribute

method = method

rng instance-attribute

rng = default_rng(random_state)

stationary instance-attribute

stationary = stationary

angle_bias instance-attribute

angle_bias = angle_bias

_init_params

_init_params(step, angle)

_loglik

_loglik(step, angle)

_mstep_emissions

_mstep_emissions(step, angle, gamma)

fit

fit(
    df, id_col="Session", step_col="step", angle_col="angle"
)

score

score(
    df, id_col="Session", step_col="step", angle_col="angle"
)

reorder_states

reorder_states(order: List[int])

Model Fitting

fit_best_hmm

compass_labyrinth.compass.level_1.momentuPY.fit_best_hmm

fit_best_hmm(
    preproc_df: DataFrame,
    n_states: int = 2,
    n_repetitions: int = 20,
    opt_methods: list[str] = [
        "BFGS",
        "L-BFGS-B",
        "Nelder-Mead",
        "Powell",
    ],
    max_iter: int = 200,
    use_abs_angle: tuple[bool, ...] = (True, False),
    stationary_flag: str | bool = "auto",
    use_data_driven_ranges: bool = True,
    angle_mean_biased: tuple[float, float] = (
        np.pi / 2,
        0.0,
    ),
    session_col: str = "Session",
    seed: int = 123,
    enforce_behavioral_constraints: bool = True,
    show_progress: bool = True,
) -> BestResult
Source code in src/compass_labyrinth/compass/level_1/momentuPY.py
def fit_best_hmm(
    preproc_df: pd.DataFrame,
    n_states: int = 2,
    n_repetitions: int = 20,
    opt_methods: list[str] = ["BFGS", "L-BFGS-B", "Nelder-Mead", "Powell"],
    max_iter: int = 200,
    use_abs_angle: tuple[bool, ...] = (True, False),  # True => |angle|~Gamma ; False => angle~VM
    stationary_flag: str | bool = "auto",
    use_data_driven_ranges: bool = True,
    angle_mean_biased: tuple[float, float] = (np.pi / 2, 0.0),  # only for VM branch
    session_col: str = "Session",
    seed: int = 123,
    enforce_behavioral_constraints: bool = True,
    show_progress: bool = True,
) -> BestResult:

    rng = np.random.default_rng(seed)
    base = preproc_df.copy()

    if stationary_flag == "auto":
        lens = _contig_lengths(base[session_col].to_numpy())
        stationary = bool(np.median(lens) >= 100)
    else:
        stationary = bool(stationary_flag)

    records = []
    candidates: List[Tuple[GammaHMM, pd.DataFrame, Dict[str, Any]]] = []

    total = len(use_abs_angle) * len(opt_methods) * n_repetitions
    pbar = tqdm(total=total, desc="Gamma/VM HMM search", leave=False) if show_progress else None

    for abs_flag in use_abs_angle:
        angle_var = "angle_abs" if abs_flag else "angle"
        df_use = base.copy()
        if abs_flag:
            df_use["angle_abs"] = np.abs(df_use["angle"].to_numpy(float))
        if use_data_driven_ranges:
            _ = compute_parameter_ranges(df_use, angle_var, use_vm=(not abs_flag))

        for opt in opt_methods:
            for it in range(n_repetitions):
                this_seed = int(rng.integers(0, 10_000_000))
                try:
                    model = GammaHMM(
                        n_states=n_states,
                        angle_model=("absgamma" if abs_flag else "vm"),
                        max_iter=max_iter,
                        tol=1e-4,
                        method=opt,
                        random_state=this_seed,
                        stationary=stationary,
                        angle_bias=(angle_mean_biased if not abs_flag else None),
                    )
                    model.fit(df_use, id_col=session_col, step_col="step", angle_col="angle")

                    # Strict behavioral gate + enforce ordering
                    if enforce_behavioral_constraints and not _enforce_or_reject(model):
                        continue

                    summ = _summarize(model)
                    records.append(
                        {
                            "abs_angle": abs_flag,
                            "optimizer": opt,
                            "seed": this_seed,
                            "AIC": summ["AIC"],
                            "loglik": summ["loglik"],
                        }
                    )

                    # attach states/posteriors back to the original df rows
                    out = preproc_df.copy()
                    out["HMM_State"] = np.nan
                    for s in range(model.S):
                        out[f"Post_Prob_{s+1}"] = np.nan
                    # valid mask (same as fit)
                    m = np.isfinite(preproc_df["step"].to_numpy(float)) & np.isfinite(
                        (
                            np.abs(preproc_df["angle"].to_numpy(float))
                            if abs_flag
                            else preproc_df["angle"].to_numpy(float)
                        )
                    )
                    out.loc[m, "HMM_State"] = (model.viterbi_ + 1).astype(int)  # 1/2 labeling
                    # for s in range(model.S):      # Optional add the State Probs
                    #     out.loc[m, f"Post_Prob_{s+1}"] = model.posterior_[:, s]

                    candidates.append((model, out, summ))

                except Exception:
                    pass
                finally:
                    if pbar:
                        pbar.update(1)

    if pbar:
        pbar.close()
    if len(candidates) == 0:
        raise RuntimeError("No valid models met the behavioral objective across configurations.")

    # Select best by AIC
    best_idx = int(np.argmin([c[2]["AIC"] for c in candidates]))
    best_model, df_with_states, best_summary = candidates[best_idx]
    rec_df = pd.DataFrame.from_records(records).sort_values(
        ["AIC", "optimizer"],
        ascending=[True, True],
        kind="mergesort",
    )
    return BestResult(
        model=best_model,
        summary=best_summary,
        records=rec_df,
        data=df_with_states,
    )

save_compass_level_1_results

compass_labyrinth.compass.level_1.momentuPY.save_compass_level_1_results

save_compass_level_1_results(
    config: Dict[str, Any], results: BestResult
)

Save CoMPASS Level 1 HMM results to disk.

Exports: - model_summary.json: Model parameters and statistics - data_with_states.csv: Full dataset with HMM state assignments - model_selection_records.csv: All candidate models tried during fitting - fitted_model.joblib: Serialized GammaHMM object (if joblib available)

Parameters:

  • config (dict) –

    Project configuration containing 'project_path_full'

  • results (BestResult) –

    Result object from fit_best_hmm()

Source code in src/compass_labyrinth/compass/level_1/momentuPY.py
def save_compass_level_1_results(
    config: Dict[str, Any],
    results: BestResult,
):
    """
    Save CoMPASS Level 1 HMM results to disk.

    Exports:
        - model_summary.json: Model parameters and statistics
        - data_with_states.csv: Full dataset with HMM state assignments
        - model_selection_records.csv: All candidate models tried during fitting
        - fitted_model.joblib: Serialized GammaHMM object (if joblib available)

    Parameters
    ----------
    config : dict
        Project configuration containing 'project_path_full'
    results : BestResult
        Result object from fit_best_hmm()
    """
    dest_dir = Path(config["project_path_full"]) / "results" / "compass_level_1"
    dest_dir.mkdir(parents=True, exist_ok=True)

    # 1. Save model summary as JSON
    summary_path = dest_dir / "model_summary.json"
    with open(summary_path, "w") as f:
        json.dump(results.summary, f, indent=2)
    print(f"Saved model summary: {summary_path.name}")

    # 2. Save data with state assignments
    data_path = dest_dir / "data_with_states.csv"
    results.data.to_csv(data_path, index=False)
    print(f"Saved data with states: {data_path.name}")

    # 3. Save model selection records
    records_path = dest_dir / "model_selection_records.csv"
    results.records.to_csv(records_path, index=False)
    print(f"Saved model selection records: {records_path.name}")

    # 4. Save fitted model object (if joblib available)
    if joblib is not None:
        model_path = dest_dir / "fitted_model.joblib"
        joblib.dump(results.model, model_path)
        print(f"Saved fitted model: {model_path.name}")
    else:
        print(f"Skipped model serialization (joblib not installed)")

    print(f"\n All CoMPASS Level 1 results saved to: {dest_dir}")

Parameter Estimation

compute_parameter_ranges

compass_labyrinth.compass.level_1.momentuPY.compute_parameter_ranges

compute_parameter_ranges(
    df: DataFrame, angle_var: str, use_vm: bool = False
) -> Dict[str, Tuple[float, float]]
Source code in src/compass_labyrinth/compass/level_1/momentuPY.py
def compute_parameter_ranges(
    df: pd.DataFrame,
    angle_var: str,
    use_vm: bool = False,
) -> Dict[str, Tuple[float, float]]:
    step = df["step"].to_numpy(float)
    step_iqr = _iqr(step)
    step_median = np.nanmedian(step)
    step_mean_range = (max(1e-6, step_median - step_iqr), step_median + step_iqr)
    step_sd_range = (0.1, max(0.2, 1.5 * _mad(step)))

    angle = df[angle_var].to_numpy(float)
    angle_median = np.nanmedian(angle)
    angle_iqr = _iqr(angle)
    angle_mean_range = (max(0.0, angle_median - angle_iqr), angle_median + angle_iqr)
    angle_sd_range = (0.1, max(0.2, 1.5 * _mad(angle)))

    angle_conc_range = None
    if use_vm:
        c = np.nanmean(np.cos(angle))
        s = np.nanmean(np.sin(angle))
        R = np.sqrt(c**2 + s**2) if np.isfinite(c) and np.isfinite(s) else 0.0
        if R > 0:
            if R < 0.53:
                kappa = 2 * R + R**3 + (5 * R**5) / 6
            elif R < 0.85:
                kappa = -0.4 + 1.39 * R + 0.43 / (1 - R)
            else:
                kappa = 1 / (R**3 - 4 * R**2 + 3 * R)
            angle_conc_range = (1.0, float(min(max(5.0, 2 * kappa + 1.0), 50.0)))
        else:
            angle_conc_range = (1.0, 15.0)

    return dict(
        step_mean_range=step_mean_range,
        step_sd_range=step_sd_range,
        angle_mean_range=angle_mean_range,
        angle_sd_range=angle_sd_range,
        angle_conc_range=angle_conc_range,
    )

Inference Algorithms

forward_backward

compass_labyrinth.compass.level_1.momentuPY.forward_backward

forward_backward(log_lik, startprob, transmat, lengths)
Source code in src/compass_labyrinth/compass/level_1/momentuPY.py
def forward_backward(log_lik, startprob, transmat, lengths):
    S = startprob.shape[0]
    N = log_lik.shape[0]
    alpha = np.zeros((N, S))
    scales = np.zeros(N)
    idx = 0
    for L in lengths:
        a0 = np.log(startprob + 1e-15) + log_lik[idx]
        c0 = np.logaddexp.reduce(a0)
        alpha[idx] = a0 - c0
        scales[idx] = c0
        for t in range(idx + 1, idx + L):
            at = alpha[t - 1][:, None] + np.log(transmat + 1e-15)
            a = np.logaddexp.reduce(at, axis=0) + log_lik[t]
            ct = np.logaddexp.reduce(a)
            alpha[t] = a - ct
            scales[t] = ct
        idx += L
    beta = np.zeros((N, S))
    idx = 0
    for L in lengths:
        for t in range(idx + L - 2, idx - 1, -1):
            btn = (beta[t + 1] + log_lik[t + 1])[None, :] + np.log(transmat + 1e-15)
            b = np.logaddexp.reduce(btn, axis=1)
            beta[t] = b - np.logaddexp.reduce(b)
        idx += L
    g = alpha + beta
    g -= np.max(g, axis=1, keepdims=True)
    g = np.exp(g)
    g /= g.sum(axis=1, keepdims=True)
    xisum = np.zeros_like(transmat)
    idx = 0
    for L in lengths:
        for t in range(idx, idx + L - 1):
            M = alpha[t][:, None] + np.log(transmat + 1e-15) + log_lik[t + 1][None, :] + beta[t + 1][None, :]
            M -= np.max(M)
            P = np.exp(M)
            P /= P.sum()
            xisum += P
        idx += L
    total_ll = float(scales.sum())
    return g, xisum, total_ll

viterbi

compass_labyrinth.compass.level_1.momentuPY.viterbi

viterbi(log_lik, startprob, transmat, lengths)
Source code in src/compass_labyrinth/compass/level_1/momentuPY.py
def viterbi(log_lik, startprob, transmat, lengths):
    S = startprob.shape[0]
    N = log_lik.shape[0]
    path = np.empty(N, dtype=int)
    idx = 0
    for L in lengths:
        delta = np.log(startprob + 1e-15) + log_lik[idx]
        psi = np.zeros((L, S), dtype=int)
        deltas = np.zeros((L, S))
        deltas[0] = delta
        for t in range(1, L):
            prev = deltas[t - 1][:, None] + np.log(transmat + 1e-15)
            psi[t] = np.argmax(prev, axis=0)
            deltas[t] = np.max(prev, axis=0) + log_lik[idx + t]
        sT = int(np.argmax(deltas[L - 1]))
        seq = np.empty(L, dtype=int)
        seq[L - 1] = sT
        for t in range(L - 2, -1, -1):
            seq[t] = psi[t + 1, seq[t + 1]]
        path[idx : idx + L] = seq
        idx += L
    return path

Probability Distributions

logpdf_gamma

compass_labyrinth.compass.level_1.momentuPY.logpdf_gamma

logpdf_gamma(x, k, theta)
Source code in src/compass_labyrinth/compass/level_1/momentuPY.py
def logpdf_gamma(x, k, theta):
    x = np.asarray(x, dtype=float)
    x = np.clip(x, 1e-12, None)
    k = np.clip(k, 1e-8, None)
    theta = np.clip(theta, 1e-8, None)
    return (k - 1) * np.log(x) - (x / theta) - k * np.log(theta) - gammaln(k)

logpdf_vonmises

compass_labyrinth.compass.level_1.momentuPY.logpdf_vonmises

logpdf_vonmises(phi, mu, kappa)
Source code in src/compass_labyrinth/compass/level_1/momentuPY.py
def logpdf_vonmises(phi, mu, kappa):
    kappa = np.clip(kappa, 0.0, None)
    # stable log C = -log(2πI0(k)), with i0e handling exp(k) internally
    logC = -(np.log(2 * np.pi) + (np.log(i0e(kappa)) + np.abs(kappa)))
    return kappa * np.cos(phi - mu) + logC

Utilities

compass_labyrinth.compass.level_1.momentuPY.print_hmm_summary

print_hmm_summary(
    model_summary: Dict[str, Any], model: GammaHMM
)
Source code in src/compass_labyrinth/compass/level_1/momentuPY.py
def print_hmm_summary(model_summary: Dict[str, Any], model: GammaHMM):
    s = model_summary
    print("\n Best Model Characteristics:")
    print(f"• Angle type: {s['angle_type']}")
    print(f"• Optimizer: {s['optimizer']}")
    print(f"• AIC: {s['AIC']:.2f} | logLik: {s['loglik']:.2f}")
    print(f"• Start probs: {s['startprob']}")
    print("• Transmat:\n", np.array(s["transmat"]))
    print(f"• Step Means: {s['step_means']}")
    if model.angle_model == "vm":
        print(f"• VM mu: {s['vm_mu']}")
        print(f"• VM kappa: {s['vm_kappa']}")
    else:
        print(f"• |angle| Gamma k: {s['ang_k']}")
        print(f"• |angle| Gamma theta: {s['ang_theta']}")
    print(f"• Met behavioral constraints: {s['behavioral_constraint_met']}")
    print("• Final state ordering: State 1 = low step + high turn; State 2 = high step + low turn\n")