117 lines
3.9 KiB
Python
117 lines
3.9 KiB
Python
# MLE_fit.py
|
|
# %%
|
|
import numpy as np
|
|
import pandas as pd
|
|
from scipy.optimize import minimize
|
|
from scipy.special import logsumexp
|
|
|
|
# %%
|
|
def simulate_model(theta, choices=None, rewards=None, n_arms=4, T=None, seed=None):
|
|
"""Simple simulator to generate choices+rewards if not provided.
|
|
theta = (alpha, lam, f)
|
|
"""
|
|
alpha, lam, f = theta
|
|
rng = np.random.RandomState(seed)
|
|
if T is None:
|
|
if choices is None:
|
|
raise ValueError("Provide T or choices")
|
|
T = len(choices)
|
|
Q = np.zeros(n_arms)
|
|
sim_choices = []
|
|
sim_rewards = []
|
|
# example reward distributions: Bernoulli with different p
|
|
ps = np.linspace(0.2, 0.8, n_arms)
|
|
for t in range(T):
|
|
V = lam * Q
|
|
logits = V - logsumexp(V) # numerically stable
|
|
probs = np.exp(logits)
|
|
a = rng.choice(n_arms, p=probs)
|
|
r = rng.binomial(1, ps[a]) # example reward
|
|
# updates
|
|
Q[a] = Q[a] + alpha * (r - Q[a])
|
|
non_chosen = [i for i in range(n_arms) if i != a]
|
|
Q[non_chosen] = Q[non_chosen] * (1 - f)
|
|
sim_choices.append(a)
|
|
sim_rewards.append(r)
|
|
return np.array(sim_choices), np.array(sim_rewards)
|
|
|
|
def neg_log_likelihood_raw(params, choices, rewards, n_arms=4):
|
|
"""Params are in unconstrained real space; we'll transform inside the function."""
|
|
# params vector: [logit_alpha, log_lambda, logit_f]
|
|
la, lb, lc = params
|
|
# transforms
|
|
alpha = 1/(1+np.exp(-la)) # sigmoid -> (0,1)
|
|
lam = np.exp(lb) # positive
|
|
f = 1/(1+np.exp(-lc)) # sigmoid -> (0,1)
|
|
T = len(choices)
|
|
Q = np.zeros(n_arms)
|
|
loglik = 0.0
|
|
for t in range(T):
|
|
V = lam * Q
|
|
# compute stable log-softmax
|
|
logp = V - logsumexp(V)
|
|
a = choices[t]
|
|
loglik += logp[a]
|
|
r = rewards[t]
|
|
# update chosen
|
|
Q[a] = Q[a] + alpha * (r - Q[a])
|
|
# update non-chosen via forgetting
|
|
non_chosen = [i for i in range(n_arms) if i != a]
|
|
Q[non_chosen] = Q[non_chosen] * (1 - f)
|
|
return -loglik # negative log-likelihood for minimization
|
|
|
|
def fit_mle(choices, rewards, n_arms=4, n_starts=10):
|
|
best = None
|
|
for s in range(n_starts):
|
|
x0 = np.random.normal(size=3)
|
|
res = minimize(neg_log_likelihood_raw, x0,
|
|
args=(choices, rewards, n_arms),
|
|
method='L-BFGS-B',
|
|
options={'maxiter':5000})
|
|
if not res.success:
|
|
continue
|
|
if best is None or res.fun < best['fun']:
|
|
best = res
|
|
if best is None:
|
|
raise RuntimeError("MLE failed")
|
|
la, lb, lc = best.x
|
|
# transform back
|
|
alpha = 1/(1+np.exp(-la))
|
|
lam = np.exp(lb)
|
|
f = 1/(1+np.exp(-lc))
|
|
return {'alpha':alpha, 'lambda':lam, 'f':f, 'nll':best.fun, 'opt_result':best}
|
|
|
|
# %%
|
|
# --- Example usage with synthetic data ---
|
|
if __name__ == "__main__":
|
|
# generate synthetic participant
|
|
true_theta = (0.2, 2.0, 0.05) # alpha, lambda, f
|
|
choices, rewards = simulate_model(true_theta, T=200, seed=123)
|
|
res = fit_mle(choices, rewards, n_starts=20)
|
|
print("True theta:", true_theta)
|
|
print("MLE estimate:", res)
|
|
|
|
# %%
|
|
# Loading participants data
|
|
from load_data import all_participant_data, unique_participants
|
|
|
|
# %%
|
|
import os
|
|
save_dir = "results/model1"
|
|
if not os.path.exists(save_dir):
|
|
os.makedirs(save_dir)
|
|
save_path = os.path.join(save_dir, "mle.csv")
|
|
|
|
for pid in unique_participants:
|
|
pdata = all_participant_data[all_participant_data["participant"] == pid]
|
|
choices = pdata["choice"].values
|
|
rewards = pdata["rescaled_reward"].values
|
|
res = fit_mle(choices, rewards, n_starts=20)
|
|
print(f"Participant {pid} MLE estimate:", res)
|
|
pd_res = pd.DataFrame([res])
|
|
pd_res["participant"] = pid
|
|
pd_res = pd_res.reindex(columns=["participant", "alpha", "lambda", "f", "nll"])
|
|
# Save results
|
|
pd_res.to_csv(save_path, index=False, mode="a", header=not os.path.exists(save_path))
|
|
|
|
# %%
|