REE-RL-Lola/model0.py
2025-12-01 21:07:40 +01:00

116 lines
3.8 KiB
Python

# MLE_fit.py
# %%
import numpy as np
import pandas as pd
from scipy.optimize import minimize
from scipy.special import logsumexp
# %%
def simulate_model(theta, choices=None, rewards=None, n_arms=4, T=None, seed=None):
"""Simple simulator to generate choices+rewards if not provided.
theta = (alpha, lam)
"""
alpha, f = theta
rng = np.random.RandomState(seed)
if T is None:
if choices is None:
raise ValueError("Provide T or choices")
T = len(choices)
Q = np.zeros(n_arms)
sim_choices = []
sim_rewards = []
# example reward distributions: Bernoulli with different p
ps = np.linspace(0.2, 0.8, n_arms)
for t in range(T):
V = Q
logits = V - logsumexp(V) # numerically stable
probs = np.exp(logits)
a = rng.choice(n_arms, p=probs)
r = rng.binomial(1, ps[a]) # example reward
# updates
Q[a] = Q[a] + alpha * (r - Q[a])
non_chosen = [i for i in range(n_arms) if i != a]
Q[non_chosen] = (1-f) * Q[non_chosen]
sim_choices.append(a)
sim_rewards.append(r)
return np.array(sim_choices), np.array(sim_rewards)
def neg_log_likelihood_raw(params, choices, rewards, n_arms=4):
"""Params are in unconstrained real space; we'll transform inside the function."""
# params vector: [logit_alpha, log_lambda, logit_f]
la, lb = params
# transforms
alpha = 1/(1+np.exp(-la)) # sigmoid -> (0,1)
f = 1/(1+np.exp(-lb)) # sigmoid -> (0,1)
T = len(choices)
Q = np.zeros(n_arms)
loglik = 0.0
for t in range(T):
V = Q
# compute stable log-softmax
logp = V - logsumexp(V)
a = choices[t]
loglik += logp[a]
r = rewards[t]
# update chosen
Q[a] = Q[a] + alpha * (r - Q[a])
# update non-chosen via forgetting
non_chosen = [i for i in range(n_arms) if i != a]
Q[non_chosen] = (1-f) * Q[non_chosen] # no forgetting in this model
return -loglik # negative log-likelihood for minimization
def fit_mle(choices, rewards, n_arms=4, n_starts=10):
best = None
for s in range(n_starts):
x0 = np.random.normal(size=2) # la, lb
res = minimize(neg_log_likelihood_raw, x0,
args=(choices, rewards, n_arms),
method='L-BFGS-B',
options={'maxiter':5000})
if not res.success:
continue
if best is None or res.fun < best['fun']:
best = res
if best is None:
raise RuntimeError("MLE failed")
la, lb = best.x
# transform back
alpha = 1 / (1+np.exp(-la))
f = 1 / (1 + np.exp(-lb))
return {'alpha':alpha, 'f':f, 'nll':best.fun, 'opt_result':best}
# %%
# --- Example usage with synthetic data ---
if __name__ == "__main__":
# generate synthetic participant
true_theta = (0.2, 0.05) # alpha, f
choices, rewards = simulate_model(true_theta, T=200, seed=123)
res = fit_mle(choices, rewards, n_starts=20)
print("True theta:", true_theta)
print("MLE estimate:", res)
# %%
# Loading participants data
from load_data import all_participant_data, unique_participants
# %%
import os
save_dir = "results/model0"
if not os.path.exists(save_dir):
os.makedirs(save_dir)
save_path = os.path.join(save_dir, "mle.csv")
for pid in unique_participants:
pdata = all_participant_data[all_participant_data["participant"] == pid]
choices = pdata["choice"].values
rewards = pdata["rescaled_reward"].values
res = fit_mle(choices, rewards, n_starts=20)
print(f"Participant {pid} MLE estimate:", res)
pd_res = pd.DataFrame([res])
pd_res["participant"] = pid
pd_res = pd_res.reindex(columns=["participant", "alpha", "f", "nll"])
# Save results
pd_res.to_csv(save_path, index=False, mode="a", header=not os.path.exists(save_path))
# %%