134 lines
4.3 KiB
Python
134 lines
4.3 KiB
Python
# MLE_fit.py
|
|
# %%
|
|
import numpy as np
|
|
import pandas as pd
|
|
from scipy.optimize import minimize
|
|
from scipy.special import logsumexp
|
|
|
|
|
|
# %%
|
|
N_PARAM = 2 # alpha_gain, alpha_loss
|
|
def simulate_model(theta, choices=None, rewards=None, n_arms=4, T=None, seed=None):
|
|
"""Simple simulator to generate choices+rewards if not provided.
|
|
theta = (alpha, lam)
|
|
"""
|
|
alpha_gain, alpha_loss = theta
|
|
rng = np.random.RandomState(seed)
|
|
if T is None:
|
|
if choices is None:
|
|
raise ValueError("Provide T or choices")
|
|
T = len(choices)
|
|
Q = np.zeros(n_arms)
|
|
sim_choices = []
|
|
sim_rewards = []
|
|
# example reward distributions: Bernoulli with different p
|
|
ps = np.linspace(0.2, 0.8, n_arms)
|
|
for t in range(T):
|
|
V = Q
|
|
logits = V - logsumexp(V) # numerically stable
|
|
probs = np.exp(logits)
|
|
a = rng.choice(n_arms, p=probs)
|
|
r = rng.binomial(1, ps[a]) * (1 if rng.rand() < 0.5 else -1) # example reward with gain/loss
|
|
# updates
|
|
if r >= 0:
|
|
Q[a] = Q[a] + alpha_gain * (r - Q[a])
|
|
else:
|
|
Q[a] = Q[a] + alpha_loss * (r - Q[a])
|
|
non_chosen = [i for i in range(n_arms) if i != a]
|
|
Q[non_chosen] = Q[non_chosen]
|
|
sim_choices.append(a)
|
|
sim_rewards.append(r)
|
|
return np.array(sim_choices), np.array(sim_rewards)
|
|
|
|
|
|
def neg_log_likelihood_raw(params, choices, rewards, n_arms=4):
|
|
"""Params are in unconstrained real space; we'll transform inside the function."""
|
|
# params vector: [logit_alpha_gain, logit_alpha_loss]
|
|
lag, lal = params
|
|
# transforms
|
|
alpha_gain = 1 / (1 + np.exp(-lag)) # sigmoid -> (0,1)
|
|
alpha_loss = 1 / (1 + np.exp(-lal)) # sigmoid -> (0,1)
|
|
T = len(choices)
|
|
Q = np.zeros(n_arms)
|
|
loglik = 0.0
|
|
for t in range(T):
|
|
V = Q
|
|
# compute stable log-softmax
|
|
logp = V - logsumexp(V)
|
|
a = choices[t]
|
|
loglik += logp[a]
|
|
r = rewards[t]
|
|
# update chosen
|
|
if r >= 0:
|
|
Q[a] = Q[a] + alpha_gain * (r - Q[a])
|
|
else:
|
|
Q[a] = Q[a] + alpha_loss * (r - Q[a])
|
|
# update non-chosen via forgetting
|
|
non_chosen = [i for i in range(n_arms) if i != a]
|
|
Q[non_chosen] = Q[non_chosen] # no forgetting in this model
|
|
return -loglik # negative log-likelihood for minimization
|
|
|
|
|
|
def fit_mle(choices, rewards, n_arms=4, x0=None, n_starts=10):
|
|
best = None
|
|
for s in range(n_starts):
|
|
if x0 is None:
|
|
x0 = np.random.normal(size=N_PARAM) # lag, lal
|
|
res = minimize(
|
|
neg_log_likelihood_raw,
|
|
x0,
|
|
args=(choices, rewards, n_arms),
|
|
method="L-BFGS-B",
|
|
options={"maxiter": 5000},
|
|
)
|
|
if not res.success:
|
|
continue
|
|
if best is None or res.fun < best["fun"]:
|
|
best = res
|
|
if best is None:
|
|
raise RuntimeError("MLE failed")
|
|
lag, lal = best.x
|
|
# transform back
|
|
alpha_gain = 1 / (1 + np.exp(-lag))
|
|
alpha_loss = 1 / (1 + np.exp(-lal))
|
|
return {"alpha_gain": alpha_gain, "alpha_loss": alpha_loss, "nll": best.fun, "opt_result": best}
|
|
|
|
# %%
|
|
# --- Example usage with synthetic data ---
|
|
if __name__ == "__main__":
|
|
np.random.seed(42)
|
|
# generate synthetic participant
|
|
true_theta = (0.2, 0.9) # alpha_gain, alpha_loss
|
|
choices, rewards = simulate_model(true_theta, T=200, seed=123)
|
|
res = fit_mle(choices, rewards, n_starts=20)
|
|
print("True theta:", true_theta)
|
|
print("MLE estimate:", res)
|
|
|
|
# %%
|
|
# Loading participants data
|
|
from load_data import all_participant_data, unique_participants
|
|
|
|
# %%
|
|
import os
|
|
|
|
save_dir = "results/model2"
|
|
if not os.path.exists(save_dir):
|
|
os.makedirs(save_dir)
|
|
save_path = os.path.join(save_dir, "mle.csv")
|
|
|
|
for pid in unique_participants:
|
|
pdata = all_participant_data[all_participant_data["participant"] == pid]
|
|
choices = pdata["choice"].values
|
|
rewards = pdata["rescaled_reward"].values
|
|
res = fit_mle(choices, rewards, x0=np.array([1.0, 1.0]), n_starts=20)
|
|
print(f"Participant {pid} MLE estimate:", res)
|
|
pd_res = pd.DataFrame([res])
|
|
pd_res["participant"] = pid
|
|
pd_res = pd_res.reindex(columns=["participant", "alpha_gain", "alpha_loss", "nll"])
|
|
pd_res["model"] = "model2_loss_gain"
|
|
# Save results
|
|
pd_res.to_csv(
|
|
save_path, index=False, mode="a", header=not os.path.exists(save_path)
|
|
)
|
|
|
|
# %%
|