强化学习笔记

Preliminary

  • Robbins-Monro Algorithm

    The Robbins-Monro algorithm is designed to solve the following equation for \(\theta\):

    \[\int c(s, \theta)\,\tau_{\theta}(s)\,ds=0\]

    where \(\tau_\theta\) is a distribution of \(s\) parameterized by \(\theta\).

    We can use the following update rule to obtain the solution \(\theta^*\):

    \[\theta_{k+1} = \theta_k-\eta_k c(s_k, \theta_k)\]

    The Q-learning algorithm uses the Robbins-Monro scheme to update the Q function, i.e.,

    \[Q'(s,a) = Q(s,a) + \eta_k\bigl(r+\lambda Q(s_{t+1}, a_{t+1}) - Q(s, a)\bigr)\]

    The original equation relating \(Q(s, a)\) and \(Q(s_{t+1}, a_{t+1})\) is

    \[Q(s,a)=r + \lambda \sum_{s_{t+1}} P(s_{t+1}\vert s_t, a)\max_{a'}Q(s_{t+1}, a')\]

    The sum over \(s_{t+1}\) can be regarded as an expectation. If we move \(Q(s, a)\) and \(r\) inside the expectation, the above equation takes the form solved by the Robbins-Monro algorithm.

Policy Gradient

Now, we parameterize the policy distribution with parameter \(\theta\), denoted by \(\tau_\theta\), and the objective function is \(J(\theta)\). We want to minimize \(J(\theta)\) (or maximize it, depending on the definition) and improve our policy by optimizing \(\theta\).

Let the expected-return function \(\mu^{\tau_{\theta}}(s_0)=\sum\limits_a \tau(a\vert s_0)Q^{\tau_\theta}(s_0,a)\) be the objective function (note that we neglect the time index \(t\)) and take the gradient with respect to \(\theta\):

\[\begin{align*} \nabla_{\theta} \mu^{\tau_\theta}(s_0)&=\sum_a\bigl(\nabla\tau(a\vert s_0)Q(s_0,a)+\tau(a\vert s_0)\nabla Q(s_0,a)\bigr)\\ &=\sum_a\Bigl(\nabla\tau(a\vert s_0)Q(s_0,a)+\tau(a\vert s_0)\nabla \sum_{s',r'}P(s',r'\vert a,s_0)\bigl(r'+\mu^{\tau_\theta}(s')\bigr)\Bigr) \end{align*}\]

Lemma:

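If \(I-P\) is invertible and the series \(\sum_{k} P^{k}\) converges (e.g. when the spectral radius of \(P\) is less than 1), then for the equation \((I-P)x=y\) we have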

\[\begin{align*} x&=(I-P)^{-1}y\\ &=\sum_{k=0}^{\infty} P^{k}y \end{align*}\]

Using this lemma, we have

\[\nabla\mu^{\tau_\theta}(s_0) = \sum_{x\in \mathcal S}\sum_{k=0}^{\infty} P(s=x, k, \tau_\theta)\sum_a\nabla\tau_{\theta}(a\vert x)Q^{\tau}(x,a)\]

Let \(\eta(x)\) denote \(\sum\limits_{k=0}^{\infty}P(s=x,k,\tau_\theta)\); then we can rewrite the equation as

\[\begin{align*} \nabla\mu^{\tau_\theta}(s_0) &= \sum_{x\in \mathcal S}\eta(x)\sum_a\nabla\tau_{\theta}(a\vert x)Q^{\tau}(x,a)\\ &\propto\sum_{x\in \mathcal S}\frac{\eta(x)}{\sum_{x'}\eta(x')}\sum_a\nabla\tau_{\theta}(a\vert x)Q^{\tau}(x,a)\\ &=E^{\tau_\theta}\Bigl[\sum_a\nabla\tau_{\theta}(a\vert x)Q^{\tau}(x,a)\Bigr]\\ &=E^{\tau_\theta}[Q^{\tau}(S_t,A_t)\nabla\log(\tau(A_t\vert S_t))] \end{align*}\]

AC

We can use a critic network to estimate the value function and an actor network to learn \(\theta\).

from collections import deque
import random

import numpy as np
import tensorflow as tf
from tensorflow.keras.layers import Input, Dense
from tensorflow.keras.optimizers import Adam
# Enable memory growth on every GPU TensorFlow can see. When no GPU is
# present the list is empty and the script simply runs on the CPU instead
# of aborting (the previous hard ``assert`` was also stripped under ``-O``,
# which turned a missing GPU into an IndexError on ``physical_devices[0]``).
physical_devices = tf.config.experimental.list_physical_devices('GPU')

for _gpu in physical_devices:
    tf.config.experimental.set_memory_growth(_gpu, True)

import gym
import argparse

def _build_parser():
    """Create the command-line parser for the training hyperparameters."""
    cli = argparse.ArgumentParser()
    # Each entry is (flag, value type, default):
    #   --gamma            multiplier applied to the next-state value in the TD target
    #   --update_interval  number of collected steps that triggers a network update
    #   --actor_lr         learning rate handed to the actor's Adam optimizer
    #   --critic_lr        learning rate handed to the critic's Adam optimizer
    option_specs = (
        ('--gamma', float, 0.99),
        ('--update_interval', int, 5),
        ('--actor_lr', float, 0.0005),
        ('--critic_lr', float, 0.001),
    )
    for flag, value_type, default_value in option_specs:
        cli.add_argument(flag, type=value_type, default=default_value)
    return cli


parser = _build_parser()
args = parser.parse_args()
class Actor:
    """Policy network mapping a state to a distribution over discrete actions.

    The model ends in a softmax layer, so its outputs are action
    probabilities rather than unnormalized logits.
    """

    def __init__(self, state_dim, action_dim):
        """Create the optimizer and the policy model.

        Args:
            state_dim: Length of the flat state vector fed to the model.
            action_dim: Number of discrete actions (width of the softmax).
        """
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.opt = Adam(args.actor_lr)
        self.model = self.create_model()

    def create_model(self):
        """Return a small MLP whose softmax output has ``action_dim`` units."""
        return tf.keras.Sequential([
            Input((self.state_dim, )),
            Dense(32, activation='relu'),
            Dense(16, activation='relu'),
            Dense(self.action_dim, activation='softmax')
        ])

    def compute_loss(self, actions, logits, advantages):
        """Return the advantage-weighted cross-entropy policy loss.

        Args:
            actions: Indices of the actions that were taken; cast to int32.
            logits: Output of ``self.model`` for the corresponding states.
                Despite the parameter name (kept for backward
                compatibility) these are softmax probabilities.
            advantages: Per-sample weights for the loss; gradients are not
                propagated through them.

        Returns:
            The scalar loss tensor.
        """
        # The model's last layer already applies softmax, so the loss must
        # treat its input as probabilities. ``from_logits=True`` would apply
        # softmax a second time and distort the policy gradient.
        ce_loss = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=False)
        actions = tf.cast(actions, tf.int32)
        policy_loss = ce_loss(actions, logits, sample_weight=tf.stop_gradient(advantages))
        return policy_loss

    def train(self, states, actions, advantages):
        """Apply one gradient step to the policy model and return the loss.

        Args:
            states: Batch of states passed to ``self.model``.
            actions: Actions taken in those states.
            advantages: Advantage estimates used as sample weights.
        """
        with tf.GradientTape() as tape:
            # Softmax action probabilities for the batch.
            probs = self.model(states, training=True)
            loss = self.compute_loss(actions, probs, advantages)
        grads = tape.gradient(loss, self.model.trainable_variables)
        self.opt.apply_gradients(zip(grads, self.model.trainable_variables))
        return loss

class Critic:
    """Value network trained by regressing its output onto TD targets."""

    def __init__(self, state_dim, action_dim):
        """Store the dimensions, then create the optimizer and the model.

        Args:
            state_dim: Length of the flat state vector fed to the model.
            action_dim: Number of discrete actions; stored but not used by
                the value model itself.
        """
        self.state_dim, self.action_dim = state_dim, action_dim
        self.opt = Adam(args.critic_lr)
        self.model = self.create_model()

    def create_model(self):
        """Return an MLP with three ReLU hidden layers and one linear output."""
        stack = [Input((self.state_dim,))]
        stack += [Dense(width, activation='relu') for width in (32, 16, 16)]
        stack.append(Dense(1, activation='linear'))
        return tf.keras.Sequential(stack)

    def compute_loss(self, v_pred, td_targets):
        """Return the mean squared error between targets and predictions."""
        return tf.keras.losses.MeanSquaredError()(td_targets, v_pred)

    def train(self, states, td_targets):
        """Apply one gradient step to the value model and return the loss.

        Args:
            states: Batch of states passed to ``self.model``.
            td_targets: Regression targets; must have the same shape as the
                model output and are treated as constants.
        """
        # Targets are constants for the regression, so block gradients.
        fixed_targets = tf.stop_gradient(td_targets)
        with tf.GradientTape() as tape:
            values = self.model(states, training=True)
            assert values.shape == td_targets.shape
            loss = self.compute_loss(values, fixed_targets)
        weights = self.model.trainable_variables
        self.opt.apply_gradients(zip(tape.gradient(loss, weights), weights))
        return loss

class Agent:
    """Actor-critic trainer for an environment with discrete actions.

    The agent samples actions from the actor's output, forms one-step TD
    targets with the critic, and updates both networks every
    ``args.update_interval`` collected steps or at the end of an episode.
    """

    def __init__(self, env):
        """Read the state/action sizes from ``env`` and build both networks.

        Args:
            env: Environment exposing ``observation_space.shape``,
                ``action_space.n``, ``reset()`` and ``step(action)``.
        """
        self.env = env
        self.state_dim = env.observation_space.shape[0]
        self.action_dim = env.action_space.n
        self.actor = Actor(self.state_dim, self.action_dim)
        self.critic = Critic(self.state_dim, self.action_dim)

    @staticmethod
    def _unpack_reset(result):
        """Return the observation from a ``reset()`` result of either gym API."""
        # gym >= 0.26 returns (observation, info); older releases return the
        # bare observation.
        if isinstance(result, tuple) and len(result) == 2:
            return result[0]
        return result

    @staticmethod
    def _unpack_step(result):
        """Normalize a ``step()`` result to ``(next_state, reward, done)``."""
        # gym >= 0.26 returns (obs, reward, terminated, truncated, info);
        # older releases return (obs, reward, done, info).
        if len(result) == 5:
            next_state, reward, terminated, truncated, _ = result
            return next_state, reward, bool(terminated or truncated)
        next_state, reward, done, _ = result
        return next_state, reward, done

    def td_target(self, reward, next_state, done):
        """Return the one-step TD target as an array of shape ``[1, 1]``.

        Args:
            reward: Reward for the transition (scalar or ``[1, 1]`` array).
            next_state: State reached after the transition; ignored when
                ``done`` is true.
            done: Whether the episode ended on this transition.

        Returns:
            ``reward`` on a terminal transition, otherwise
            ``reward + args.gamma * V(next_state)``; always shaped ``[1, 1]``.
        """
        if done:
            # Same shape as the bootstrapped branch so batches stay uniform.
            return np.reshape(reward, [1, 1])
        v_value = self.critic.model.predict(
            np.reshape(next_state, [1, self.state_dim]), verbose=0)
        return np.reshape(reward + args.gamma * v_value[0], [1, 1])

    def advantage(self, td_targets, baselines):
        """Return ``td_targets - baselines``."""
        return td_targets - baselines

    def list_to_batch(self, list):
        """Stack a non-empty sequence of arrays along axis 0.

        Args:
            list: Sequence of arrays with matching trailing dimensions. The
                name shadows the builtin but is kept so existing keyword
                callers continue to work.

        Returns:
            A single array holding all elements concatenated on axis 0.
        """
        # One concatenate call instead of re-allocating on every np.append.
        return np.concatenate(list, axis=0)

    def train(self, max_episodes=1000):
        """Run the training loop and print the reward of every episode.

        Args:
            max_episodes: Number of episodes to run.
        """
        for ep in range(max_episodes):
            state_batch = []
            action_batch = []
            td_target_batch = []
            advantage_batch = []
            episode_reward, done = 0, False

            state = self._unpack_reset(self.env.reset())

            while not done:
                # self.env.render()
                probs = self.actor.model.predict(
                    np.reshape(state, [1, self.state_dim]), verbose=0)
                # Sample the action according to the current policy.
                action = np.random.choice(self.action_dim, p=probs[0])

                next_state, reward, done = self._unpack_step(
                    self.env.step(action))

                # Give every item a leading batch axis of size 1 so the
                # per-step arrays can be concatenated into batches.
                state = np.reshape(state, [1, self.state_dim])
                action = np.reshape(action, [1, 1])
                next_state = np.reshape(next_state, [1, self.state_dim])
                reward = np.reshape(reward, [1, 1])

                # The reward is scaled by 0.01 before forming the TD target;
                # the unscaled reward is what gets reported per episode.
                td_target = self.td_target(reward * 0.01, next_state, done)
                advantage = self.advantage(
                    td_target, self.critic.model.predict(state, verbose=0))

                state_batch.append(state)
                action_batch.append(action)
                td_target_batch.append(td_target)
                advantage_batch.append(advantage)

                if len(state_batch) >= args.update_interval or done:
                    states = self.list_to_batch(state_batch)
                    actions = self.list_to_batch(action_batch)
                    td_targets = self.list_to_batch(td_target_batch)
                    advantages = self.list_to_batch(advantage_batch)

                    self.actor.train(states, actions, advantages)
                    self.critic.train(states, td_targets)

                    state_batch = []
                    action_batch = []
                    td_target_batch = []
                    advantage_batch = []

                episode_reward += reward[0][0]
                state = next_state[0]

            print('EP{} EpisodeReward={}'.format(ep, episode_reward))

def main():
    """Create the CartPole-v1 environment and train an agent on it."""
    agent = Agent(gym.make('CartPole-v1'))
    agent.train()


if __name__ == "__main__":
    main()
原文地址:https://www.cnblogs.com/DemonHunter/p/13463727.html