REE-RL-Lola/model2_loss_gain.py
2025-12-01 21:07:40 +01:00

134 lines
4.3 KiB
Python

# MLE_fit.py
# %%
import numpy as np
import pandas as pd
from scipy.optimize import minimize
from scipy.special import logsumexp
# %%
N_PARAM = 2 # alpha_gain, alpha_loss
def simulate_model(theta, choices=None, rewards=None, n_arms=4, T=None, seed=None):
"""Simple simulator to generate choices+rewards if not provided.
theta = (alpha, lam)
"""
alpha_gain, alpha_loss = theta
rng = np.random.RandomState(seed)
if T is None:
if choices is None:
raise ValueError("Provide T or choices")
T = len(choices)
Q = np.zeros(n_arms)
sim_choices = []
sim_rewards = []
# example reward distributions: Bernoulli with different p
ps = np.linspace(0.2, 0.8, n_arms)
for t in range(T):
V = Q
logits = V - logsumexp(V) # numerically stable
probs = np.exp(logits)
a = rng.choice(n_arms, p=probs)
r = rng.binomial(1, ps[a]) * (1 if rng.rand() < 0.5 else -1) # example reward with gain/loss
# updates
if r >= 0:
Q[a] = Q[a] + alpha_gain * (r - Q[a])
else:
Q[a] = Q[a] + alpha_loss * (r - Q[a])
non_chosen = [i for i in range(n_arms) if i != a]
Q[non_chosen] = Q[non_chosen]
sim_choices.append(a)
sim_rewards.append(r)
return np.array(sim_choices), np.array(sim_rewards)
def neg_log_likelihood_raw(params, choices, rewards, n_arms=4):
"""Params are in unconstrained real space; we'll transform inside the function."""
# params vector: [logit_alpha_gain, logit_alpha_loss]
lag, lal = params
# transforms
alpha_gain = 1 / (1 + np.exp(-lag)) # sigmoid -> (0,1)
alpha_loss = 1 / (1 + np.exp(-lal)) # sigmoid -> (0,1)
T = len(choices)
Q = np.zeros(n_arms)
loglik = 0.0
for t in range(T):
V = Q
# compute stable log-softmax
logp = V - logsumexp(V)
a = choices[t]
loglik += logp[a]
r = rewards[t]
# update chosen
if r >= 0:
Q[a] = Q[a] + alpha_gain * (r - Q[a])
else:
Q[a] = Q[a] + alpha_loss * (r - Q[a])
# update non-chosen via forgetting
non_chosen = [i for i in range(n_arms) if i != a]
Q[non_chosen] = Q[non_chosen] # no forgetting in this model
return -loglik # negative log-likelihood for minimization
def fit_mle(choices, rewards, n_arms=4, x0=None, n_starts=10):
best = None
for s in range(n_starts):
if x0 is None:
x0 = np.random.normal(size=N_PARAM) # lag, lal
res = minimize(
neg_log_likelihood_raw,
x0,
args=(choices, rewards, n_arms),
method="L-BFGS-B",
options={"maxiter": 5000},
)
if not res.success:
continue
if best is None or res.fun < best["fun"]:
best = res
if best is None:
raise RuntimeError("MLE failed")
lag, lal = best.x
# transform back
alpha_gain = 1 / (1 + np.exp(-lag))
alpha_loss = 1 / (1 + np.exp(-lal))
return {"alpha_gain": alpha_gain, "alpha_loss": alpha_loss, "nll": best.fun, "opt_result": best}
# %%
# --- Example usage with synthetic data ---
if __name__ == "__main__":
np.random.seed(42)
# generate synthetic participant
true_theta = (0.2, 0.9) # alpha_gain, alpha_loss
choices, rewards = simulate_model(true_theta, T=200, seed=123)
res = fit_mle(choices, rewards, n_starts=20)
print("True theta:", true_theta)
print("MLE estimate:", res)
# %%
# Loading participants data
from load_data import all_participant_data, unique_participants
# %%
import os
save_dir = "results/model2"
if not os.path.exists(save_dir):
os.makedirs(save_dir)
save_path = os.path.join(save_dir, "mle.csv")
for pid in unique_participants:
pdata = all_participant_data[all_participant_data["participant"] == pid]
choices = pdata["choice"].values
rewards = pdata["rescaled_reward"].values
res = fit_mle(choices, rewards, x0=np.array([1.0, 1.0]), n_starts=20)
print(f"Participant {pid} MLE estimate:", res)
pd_res = pd.DataFrame([res])
pd_res["participant"] = pid
pd_res = pd_res.reindex(columns=["participant", "alpha_gain", "alpha_loss", "nll"])
pd_res["model"] = "model2_loss_gain"
# Save results
pd_res.to_csv(
save_path, index=False, mode="a", header=not os.path.exists(save_path)
)
# %%