# MLE_fit.py # %% import numpy as np import pandas as pd from scipy.optimize import minimize from scipy.special import logsumexp # %% def simulate_model(theta, choices=None, rewards=None, n_arms=4, T=None, seed=None): """Simple simulator to generate choices+rewards if not provided. theta = (alpha, lam) """ alpha, f = theta rng = np.random.RandomState(seed) if T is None: if choices is None: raise ValueError("Provide T or choices") T = len(choices) Q = np.zeros(n_arms) sim_choices = [] sim_rewards = [] # example reward distributions: Bernoulli with different p ps = np.linspace(0.2, 0.8, n_arms) for t in range(T): V = Q logits = V - logsumexp(V) # numerically stable probs = np.exp(logits) a = rng.choice(n_arms, p=probs) r = rng.binomial(1, ps[a]) # example reward # updates Q[a] = Q[a] + alpha * (r - Q[a]) non_chosen = [i for i in range(n_arms) if i != a] Q[non_chosen] = (1-f) * Q[non_chosen] sim_choices.append(a) sim_rewards.append(r) return np.array(sim_choices), np.array(sim_rewards) def neg_log_likelihood_raw(params, choices, rewards, n_arms=4): """Params are in unconstrained real space; we'll transform inside the function.""" # params vector: [logit_alpha, log_lambda, logit_f] la, lb = params # transforms alpha = 1/(1+np.exp(-la)) # sigmoid -> (0,1) f = 1/(1+np.exp(-lb)) # sigmoid -> (0,1) T = len(choices) Q = np.zeros(n_arms) loglik = 0.0 for t in range(T): V = Q # compute stable log-softmax logp = V - logsumexp(V) a = choices[t] loglik += logp[a] r = rewards[t] # update chosen Q[a] = Q[a] + alpha * (r - Q[a]) # update non-chosen via forgetting non_chosen = [i for i in range(n_arms) if i != a] Q[non_chosen] = (1-f) * Q[non_chosen] # no forgetting in this model return -loglik # negative log-likelihood for minimization def fit_mle(choices, rewards, n_arms=4, n_starts=10): best = None for s in range(n_starts): x0 = np.random.normal(size=2) # la, lb res = minimize(neg_log_likelihood_raw, x0, args=(choices, rewards, n_arms), method='L-BFGS-B', options={'maxiter':5000}) if not res.success: continue if best is None or res.fun < best['fun']: best = res if best is None: raise RuntimeError("MLE failed") la, lb = best.x # transform back alpha = 1 / (1+np.exp(-la)) f = 1 / (1 + np.exp(-lb)) return {'alpha':alpha, 'f':f, 'nll':best.fun, 'opt_result':best} # %% # --- Example usage with synthetic data --- if __name__ == "__main__": # generate synthetic participant true_theta = (0.2, 0.05) # alpha, f choices, rewards = simulate_model(true_theta, T=200, seed=123) res = fit_mle(choices, rewards, n_starts=20) print("True theta:", true_theta) print("MLE estimate:", res) # %% # Loading participants data from load_data import all_participant_data, unique_participants # %% import os save_dir = "results/model0" if not os.path.exists(save_dir): os.makedirs(save_dir) save_path = os.path.join(save_dir, "mle.csv") for pid in unique_participants: pdata = all_participant_data[all_participant_data["participant"] == pid] choices = pdata["choice"].values rewards = pdata["rescaled_reward"].values res = fit_mle(choices, rewards, n_starts=20) print(f"Participant {pid} MLE estimate:", res) pd_res = pd.DataFrame([res]) pd_res["participant"] = pid pd_res = pd_res.reindex(columns=["participant", "alpha", "f", "nll"]) # Save results pd_res.to_csv(save_path, index=False, mode="a", header=not os.path.exists(save_path)) # %%