# MLE_fit.py # %% import numpy as np import pandas as pd from scipy.optimize import minimize from scipy.special import logsumexp # %% N_PARAM = 2 # alpha_gain, alpha_loss def simulate_model(theta, choices=None, rewards=None, n_arms=4, T=None, seed=None): """Simple simulator to generate choices+rewards if not provided. theta = (alpha, lam) """ alpha_gain, alpha_loss = theta rng = np.random.RandomState(seed) if T is None: if choices is None: raise ValueError("Provide T or choices") T = len(choices) Q = np.zeros(n_arms) sim_choices = [] sim_rewards = [] # example reward distributions: Bernoulli with different p ps = np.linspace(0.2, 0.8, n_arms) for t in range(T): V = Q logits = V - logsumexp(V) # numerically stable probs = np.exp(logits) a = rng.choice(n_arms, p=probs) r = rng.binomial(1, ps[a]) * (1 if rng.rand() < 0.5 else -1) # example reward with gain/loss # updates if r >= 0: Q[a] = Q[a] + alpha_gain * (r - Q[a]) else: Q[a] = Q[a] + alpha_loss * (r - Q[a]) non_chosen = [i for i in range(n_arms) if i != a] Q[non_chosen] = Q[non_chosen] sim_choices.append(a) sim_rewards.append(r) return np.array(sim_choices), np.array(sim_rewards) def neg_log_likelihood_raw(params, choices, rewards, n_arms=4): """Params are in unconstrained real space; we'll transform inside the function.""" # params vector: [logit_alpha_gain, logit_alpha_loss] lag, lal = params # transforms alpha_gain = 1 / (1 + np.exp(-lag)) # sigmoid -> (0,1) alpha_loss = 1 / (1 + np.exp(-lal)) # sigmoid -> (0,1) T = len(choices) Q = np.zeros(n_arms) loglik = 0.0 for t in range(T): V = Q # compute stable log-softmax logp = V - logsumexp(V) a = choices[t] loglik += logp[a] r = rewards[t] # update chosen if r >= 0: Q[a] = Q[a] + alpha_gain * (r - Q[a]) else: Q[a] = Q[a] + alpha_loss * (r - Q[a]) # update non-chosen via forgetting non_chosen = [i for i in range(n_arms) if i != a] Q[non_chosen] = Q[non_chosen] # no forgetting in this model return -loglik # negative log-likelihood for minimization def fit_mle(choices, rewards, n_arms=4, x0=None, n_starts=10): best = None for s in range(n_starts): if x0 is None: x0 = np.random.normal(size=N_PARAM) # lag, lal res = minimize( neg_log_likelihood_raw, x0, args=(choices, rewards, n_arms), method="L-BFGS-B", options={"maxiter": 5000}, ) if not res.success: continue if best is None or res.fun < best["fun"]: best = res if best is None: raise RuntimeError("MLE failed") lag, lal = best.x # transform back alpha_gain = 1 / (1 + np.exp(-lag)) alpha_loss = 1 / (1 + np.exp(-lal)) return {"alpha_gain": alpha_gain, "alpha_loss": alpha_loss, "nll": best.fun, "opt_result": best} # %% # --- Example usage with synthetic data --- if __name__ == "__main__": np.random.seed(42) # generate synthetic participant true_theta = (0.2, 0.9) # alpha_gain, alpha_loss choices, rewards = simulate_model(true_theta, T=200, seed=123) res = fit_mle(choices, rewards, n_starts=20) print("True theta:", true_theta) print("MLE estimate:", res) # %% # Loading participants data from load_data import all_participant_data, unique_participants # %% import os save_dir = "results/model2" if not os.path.exists(save_dir): os.makedirs(save_dir) save_path = os.path.join(save_dir, "mle.csv") for pid in unique_participants: pdata = all_participant_data[all_participant_data["participant"] == pid] choices = pdata["choice"].values rewards = pdata["rescaled_reward"].values res = fit_mle(choices, rewards, x0=np.array([1.0, 1.0]), n_starts=20) print(f"Participant {pid} MLE estimate:", res) pd_res = pd.DataFrame([res]) pd_res["participant"] = pid pd_res = pd_res.reindex(columns=["participant", "alpha_gain", "alpha_loss", "nll"]) pd_res["model"] = "model2_loss_gain" # Save results pd_res.to_csv( save_path, index=False, mode="a", header=not os.path.exists(save_path) ) # %%