This essay compares Dynamic Programming, Monte Carlo, Q-learning, and SARSA methods for solving the game described below.
PS: The SARSA$(\lambda)$ algorithm can be found here.
Consider a two-player game that proceeds in successive rounds; in each round one player wins and the other loses. The winner of the game is the player who first wins $d$ rounds more than the opponent, where $d \geq 1$ is a parameter.
We model the game from the perspective of one of the players. In each round, this player has two available actions: invest either (1) high or (2) low effort.
If the player invests high effort in a round, she wins that round with probability $p_H$; if she invests low effort, she wins it with probability $p_L$. Both are parameters with $0<p_L<p_H<1$.
By investing high effort in a round the player incurs a cost $c_H$ in that round; by investing low effort she incurs a cost $c_L$, with $0<c_L<c_H$.
The player receives a prize of value $R$ if winning the game.
The essay assumes the following parameter values:
$d=3$
$p_H=0.55$
$p_L=0.45$
$c_H=50$
$c_L=10$
$R=1000$
Termination criteria:
$\|V_{t+1}-V_t\|_{\infty} \leq 10^{-6}$ or $\text{Episodes}=500000$
The goal is to find the best strategy for the game (i.e., the optimal action in each state and the corresponding value).
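Before applying any learning algorithm, note that under a constant effort level the game is a gambler's-ruin random walk on the score difference, absorbed at $+d$ (win) or $-d$ (loss). As a rough sanity check, the closed-form win probability under always-high or always-low effort can be computed; the helper win_probability below is my own addition and not part of the assignment code.
def win_probability(p, d=3):
    # Probability of reaching +d before -d, starting from a score difference of 0,
    # when each round is won independently with probability p (gambler's ruin).
    q = 1.0 - p
    if p == 0.5:
        return 0.5
    r = q / p
    return (1 - r ** d) / (1 - r ** (2 * d))

print(win_probability(0.55))  # always high effort: about 0.65
print(win_probability(0.45))  # always low effort:  about 0.35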
I build a grid_world() function that defines the environment: the set of states and, for each action, the next state (with its probability), the reward, and a done flag. All environment attributes are set up below, together with the $\epsilon$-greedy policy function.
import numpy as np
from itertools import permutations, combinations_with_replacement # get permutations of policies
n_states = 7  # score differences -d..+d, i.e. 2*d + 1 states
n_actions = 2
low = 0   # low effort
high = 1  # high effort
d = 3
p_h = 0.55
p_l = 0.45
c_h = 50
c_l = 10
RETURN = 1000  # prize R for winning the game
def grid_world():
p = {}
grid = np.arange(n_states) # only one-dimension
it = np.nditer(grid)
    is_done = lambda x: x == 0 or x == (n_states - 1)  # terminal check, defined once to avoid repetition
with it:
while not it.finished:
s = it.iterindex
p[s] = {a: [] for a in range(n_actions)}
if s == n_states - 1: # terminal state
p[s][low] = [(1.0, s, 0, True)]
p[s][high] = [(1.0, s, 0, True)]
elif s == 0: # terminal state
p[s][low] = [(1.0, s, 0, True)]
p[s][high] = [(1.0, s, 0, True)]
            else:  # non-terminal: pay the effort cost; add the prize R when the move wins the game
p[s][low] = [(p_l, s+1, - c_l if s+1 != n_states-1 else RETURN - c_l, is_done(s+1)),
(1-p_l, s-1, - c_l, is_done(s-1))]
p[s][high] = [(p_h, s+1, - c_h if s+1 != n_states-1 else RETURN - c_h, is_done(s+1)),
(1-p_h, s-1, - c_h, is_done(s-1))]
it.iternext()
return p
def epsilon_greedy_policy(q_values, epsilon=0.1):
if np.random.binomial(1, epsilon) == 1:
return np.random.choice([low, high])
else:
return np.random.choice([action_ for action_, value_ in enumerate(q_values) if value_ == np.max(q_values)])
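As a quick check of the encoding (this snippet is my own addition, assuming the definitions above), the transitions available from the initial state, index 3 (score difference 0), can be printed directly from the dictionary returned by grid_world():
# sanity check of grid_world(); not part of the original solution
transitions = grid_world()
for a, name in [(low, "low"), (high, "high")]:
    for prob, next_s, reward, done in transitions[3][a]:
        print(name, prob, next_s, reward, done)
# expected: low  -> (0.45, 4, -10, False) and (0.55, 2, -10, False)
#           high -> (0.55, 4, -50, False) and (0.45, 2, -50, False)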
Compute the value function numerically, assuming perfect knowledge of the environment, for each deterministic policy, and rank these policies by their value at the initial state.
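Since the environment is known, each deterministic policy $\pi$ can be evaluated by iterative policy evaluation, sweeping the Bellman expectation backup until the termination criterion above is met (here $\gamma=1$, so returns are undiscounted):
$V_{t+1}(s) = \sum_{s'} P(s' \mid s, \pi(s)) \, \big[ r(s, \pi(s), s') + \gamma V_t(s') \big]$
With five non-terminal states and two actions there are $2^5 = 32$ deterministic policies to evaluate and rank.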
def main():
    lam = 1.0  # discount factor (gamma); undiscounted returns
theta = 1e-6
pi_a = 1.0 # assume deterministic
transition_probs = grid_world()
    # Enumerate all 2^(n_states - 2) deterministic policies; the two terminal
    # states are fixed to action 0 since their action is irrelevant.
    policies = [tuple([0]) + j + tuple([0])
                for i in combinations_with_replacement(range(n_actions), n_states - 2)
                for j in set(permutations(i))]
all_state_values = []
for policy in policies:
state_values = np.zeros(n_states) # Initial state values
iteration_counter = 1
while True:
v_old = np.copy(state_values)
delta = 0.0
for s in range(n_states):
v_s = 0.0
current_entries = transition_probs[s][policy[s]] # different results possible
for current_entry in current_entries:
p_sa = current_entry[0]
next_s = current_entry[1]
reward = current_entry[2]
v_s += pi_a * p_sa * (reward + lam * v_old[next_s])
state_values[s] = v_s
delta = np.maximum(delta, np.abs(state_values[s] - v_old[s]))
iteration_counter += 1
if delta < theta: # converge
break
all_state_values.append(state_values)
    policy_value = zip(policies, all_state_values)
    # Rank policies by the value at the initial state (index 3, score difference 0)
    policy_value = sorted(policy_value, key=lambda x: x[1][3], reverse=True)
    print("\n------- All %d policies were checked. -------" % len(policy_value),
          "\nThe best policy is", policy_value[0][0],
          "\nIts state values are", policy_value[0][1],
          "\nwith value at the initial state", policy_value[0][1][3],
          "\n\nAll policies, ranked:")
for i, j in policy_value:
print(i, ":\n", " ".join([str(num) for num in j]), sep="")
if __name__ == '__main__':
main()
First, estimate the action-value function using the off-policy Monte Carlo estimation method for a threshold policy $\pi$ such that $\pi(s)=high$ whenever $s \leq s^*$ and $\pi(s)=low$ otherwise, with threshold value $s^*=1$, where the behaviour policy selects either action equiprobably in each state.
Then, compute the optimal policy using the off-policy Monte Carlo control algorithm with the same equiprobable behaviour policy. Show the action values and the resulting policy.
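Both Monte Carlo variants below process each episode backwards with the standard weighted importance-sampling updates:
$G \leftarrow \gamma G + R_{t+1}$
$C(S_t, A_t) \leftarrow C(S_t, A_t) + W$
$Q(S_t, A_t) \leftarrow Q(S_t, A_t) + \frac{W}{C(S_t, A_t)}\big[G - Q(S_t, A_t)\big]$
$W \leftarrow W \, \frac{\pi(A_t \mid S_t)}{b(A_t \mid S_t)}$
With the equiprobable behaviour policy, $b(a \mid s) = 0.5$, so the per-step ratio is either $0$ (the remainder of the episode is discarded) or $2$.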
star = 1 + 3  # threshold s* = 1, shifted by d = 3 to match the state indexing
def main():
qsa = np.zeros([n_states, n_actions]) # value Q(s, a)
c = np.zeros([n_states, n_actions]) # The cumulative denominator
lam = 1
    policy = [1 if 1 <= i <= star else 0 for i in range(n_states)]  # threshold policy: high effort up to s*
transition_probs = grid_world()
episodes = 500000
for index in range(episodes):
done = False
        s = 0 + 3  # start at score difference 0 (index d = 3)
state_action_history = []
while not done:
a = np.random.choice([low, high])
current_entries = transition_probs[s][a]
probs = [i[0] for i in current_entries]
current_entry = current_entries[np.random.choice([0, 1], p = probs if len(probs) == 2 else [1, 0])] # generate the episode
r = current_entry[2]
state_action_history.append([s, a, r])
s = current_entry[1] # next state
done = current_entry[-1] # stop loop
g = 0
w = 1
for s, a, r in reversed(state_action_history):
g = lam * g + r
c[s, a] = c[s, a] + w
qsa[s, a] = qsa[s, a] + w / c[s, a] * (g - qsa[s, a])
            w = w * (a == policy[s]) / 0.5  # importance ratio pi(a|s) / b(a|s): either 0 or 2
            if w == 0:  # the deterministic target policy would never take this action
                break
if index % 100000 == 0:
print('%s Iteration(s):\n' % index, qsa)
print("\n At last, Q(s, a) for Policy", policy, ":\n", qsa)
if __name__ == '__main__':
main()
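The control variant below uses the same backward pass, but makes the target policy greedy with respect to $Q$ after each update and stops as soon as the behaviour action disagrees with it, since the importance weight would then become zero:
$\pi(S_t) \leftarrow \arg\max_a Q(S_t, a)$; if $A_t \neq \pi(S_t)$, exit the inner loop; otherwise $W \leftarrow W / b(A_t \mid S_t) = 2W$.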
def main():
qsa = np.zeros([n_states, n_actions]) # value Q(s, a)
c = np.zeros([n_states, n_actions]) # The cumulative denominator
    pi_s = np.argmax(qsa, axis=1)  # greedy target policy, updated as Q changes
lam = 1
transition_probs = grid_world()
episodes = 500000
for index in range(episodes):
done = False
s = 0 + 3
state_action_history = []
while not done:
a = np.random.choice([low, high])
current_entries = transition_probs[s][a]
probs = [i[0] for i in current_entries]
current_entry = current_entries[np.random.choice([0, 1], p = probs if len(probs) == 2 else [1, 0])] # generate the episode
r = current_entry[2]
state_action_history.append([s, a, r])
s = current_entry[1] # next state
done = current_entry[-1] # stop loop
g = 0
w = 1
for s, a, r in reversed(state_action_history):
g = lam * g + r
c[s, a] = c[s, a] + w
qsa[s, a] = qsa[s, a] + w / c[s, a] * (g - qsa[s, a])
pi_s[s] = np.argmax(qsa[s])
            if a != pi_s[s]:  # the greedy target policy would not take this action
                break
            w = w * 1 / 0.5  # b(a|s) = 0.5 for the equiprobable behaviour policy
if index % 100000 == 0 and index != 0:
print('%s Iteration(s):' % index, pi_s)
print(qsa)
print("\n At last, Optimal Policy is", pi_s,
"\n The related action value:\n", qsa)
if __name__ == '__main__':
main()
Solve the problem using the Q-learning algorithm with an $\epsilon$-greedy behaviour policy, $\epsilon=0.1$.
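Q-learning updates the action values with the off-policy one-step target, bootstrapping from the greedy value of the next state regardless of which action the behaviour policy actually takes next:
$Q(S_t, A_t) \leftarrow Q(S_t, A_t) + \alpha \big[ R_{t+1} + \gamma \max_a Q(S_{t+1}, a) - Q(S_t, A_t) \big]$
with step size $\alpha=0.1$ and $\gamma=1$, as implemented by the q_learning() helper below.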
def q_learning(qsa, next_qs, r, alpha=0.1, gamma=1.0):
return qsa + alpha * (r + gamma * np.max(next_qs) - qsa)
def main():
qsa = np.zeros([n_states, n_actions]) # value Q(s, a)
transition_probs = grid_world()
episodes = 500000
for index in range(episodes):
done = False
s = 0 + 3
while not done:
a = epsilon_greedy_policy(qsa[s])
current_entries = transition_probs[s][a] # get sequences
probs = [i[0] for i in current_entries]
_, s_p, r, done = current_entries[np.random.choice([0, 1], p = probs if len(probs) == 2 else [1, 0])]
qsa[s, a] = q_learning(qsa[s, a], qsa[s_p], r)
s = s_p
if index % 100000 == 0 and index != 0:
print('%s Iteration(s):' %index, np.argmax(qsa, axis=1))
print(qsa)
print("\n At last, Optimal Policy is", np.argmax(qsa, axis=1),
"\n The related action value:\n", qsa)
if __name__ == '__main__':
main()
Solve the problem using the SARSA algorithm, again with an $\epsilon$-greedy policy, $\epsilon=0.1$.
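SARSA is the on-policy counterpart: the bootstrap target uses the value of the next action $A_{t+1}$ that the $\epsilon$-greedy policy actually selects:
$Q(S_t, A_t) \leftarrow Q(S_t, A_t) + \alpha \big[ R_{t+1} + \gamma Q(S_{t+1}, A_{t+1}) - Q(S_t, A_t) \big]$
again with $\alpha=0.1$ and $\gamma=1$.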
def sarsa(qsa, next_qsa, r, alpha=0.1, gamma=1.0):
return qsa + alpha * (r + gamma * next_qsa - qsa)
def main():
qsa = np.zeros([n_states, n_actions]) # value Q(s, a)
transition_probs = grid_world()
episodes = 500000
for index in range(episodes):
done = False
s = 0 + 3
a = epsilon_greedy_policy(qsa[s])
while not done:
current_entries = transition_probs[s][a]
probs = [i[0] for i in current_entries] # get sequences
_, s_p, r, done = current_entries[np.random.choice([0, 1], p = probs if len(probs) == 2 else [1, 0])]
a_p = epsilon_greedy_policy(qsa[s_p])
qsa[s, a] = sarsa(qsa[s, a], qsa[s_p, a_p], r)
s = s_p
a = a_p
if index % 100000 == 0 and index != 0:
print('%s Iteration(s):' %index, np.argmax(qsa, axis=1))
print(qsa)
print("\n At last, Optimal Policy is", np.argmax(qsa, axis=1),
"\n The related action value:\n", qsa)
if __name__ == '__main__':
main()
Finally, estimate the state-value function of the equiprobable random policy using TD($\lambda$) with eligibility traces, with $\lambda=0.9$ and step size $\eta=0.0001$.
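Concretely, after each transition the TD error is computed, the trace of the visited state is incremented, every state's value is nudged in proportion to its trace, and all traces then decay by $\gamma\lambda$:
$\delta_t = R_{t+1} + \gamma V(S_{t+1}) - V(S_t)$
$E(S_t) \leftarrow E(S_t) + 1$
$V(s) \leftarrow V(s) + \eta \, \delta_t \, E(s), \quad E(s) \leftarrow \gamma \lambda E(s) \ \text{for all } s$
In the code below, theta holds $\delta_t$, es holds the traces $E$, and step is $\eta$.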
def take_random_action():  # behaviour policy: select either action equiprobably
return np.random.binomial(1, 0.5)
def main():
gamma = 1.0
lam = 0.9
step = 0.0001
vs = np.zeros(n_states)
es = np.zeros(n_states)
transition_probs = grid_world()
episodes = 500000
for index in range(episodes):
done = False
s = 0 + 3
while not done:
a = take_random_action()
current_entries = transition_probs[s][a]
probs = [i[0] for i in current_entries] # get next step
_, s_p, r, done = current_entries[np.random.choice([0, 1], p = probs if len(probs) == 2 else [1, 0])]
            theta = r + gamma * vs[s_p] - vs[s]  # TD error (delta_t)
es[s] += 1
for s_pp in range(n_states):
vs[s_pp] += step * es[s_pp] * theta
es[s_pp] *= gamma * lam
s = s_p
if index % 100000 == 0 and index != 0:
print('%s Iteration(s):\n' %index, vs)
print("\n At last, value of the Policy is:\n", vs)
if __name__ == '__main__':
main()