for Robot Artificial Intelligence

20. RBF Neural Networks


RBF Networks

  • RBF = Radial Basis Function
  • Useful in RL
  • 2 perspectives
    • Linear model with feature extraction, where the feature extraction is the RBF kernel
    • 1-hidden-layer neural network, with the RBF kernel as the activation function
  • when we first learned about neural networks, we learned these in reverse order
  • we first learned that a neural network is a nonlinear function approximator
  • later, we saw that hidden units happen to learn features

RBF Basis Function

  • is a non-normalized Gaussian
  • x = input vector
  • c = center / exemplar vector
  • Only depends on the distance between x and c, not the direction, hence the term radial
  • Max is 1 when x == c; approaches 0 as x moves further away from c (sketched below)
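  • Concretely: $\phi(x) = \exp(-\gamma \lVert x - c \rVert^2)$; a minimal sketch in Python (the gamma value is an arbitrary placeholder for the scale parameter discussed later):
import numpy as np

def rbf(x, c, gamma=1.0):
    # non-normalized Gaussian: equals 1 when x == c and decays toward 0
    # as the radial distance between x and c grows
    return np.exp(-gamma * np.sum((np.asarray(x) - np.asarray(c))**2))

print(rbf([0.0, 0.0], [0.0, 0.0]))  # 1.0
print(rbf([2.0, 2.0], [0.0, 0.0]))  # ~0.000335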

How do we choose c?

  • how many c’s should we choose?
  • number of centers / exemplars == number of hidden units in the RBF Network
  • Each unit will have a different center
  • A few different ways to choose them

Support Vector Machines

  • SVMs also use RBF Kernels
  • # of exemplars == # of training points
  • in fact, with SVMs the exemplars are the training points
  • this is why SVMs have fallen out of favour
  • Training becomes O(N^2), prediction is O(N), N = # Training samples
  • important piece of deep learning history
    • SVMs were once thought to be superior

Another Method

  • Just sample a few points from the state space
  • we can then choose the # of exemplars ourselves (see the sketch after this list)
  • env.observation_space.sample()
  • how many exemplars we choose is like how many hidden units in a neural network - it's a hyperparameter that must be tuned
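  • A minimal sketch of this sampling approach (the environment name and the count of 100 exemplars are illustrative choices):
import gym
import numpy as np

env = gym.make('MountainCar-v0')
# sample a fixed set of exemplar states (centers) from the observation space;
# how many to take is a hyperparameter, like the number of hidden units
exemplars = np.array([env.observation_space.sample() for _ in range(100)])
print(exemplars.shape)  # (100, 2) for Mountain Car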

Implementation

  • we'll make use of scikit-learn
  • our own direct from-definition implementation would be unnecessarily slow
  • RBFSampler uses a Monte Carlo (MC) algorithm
from sklearn.kernel_approximation import RBFSampler
  • Standard Interface (a fuller example follows below)
sampler = RBFSampler()
sampler.fit(raw_data)
features = sampler.transform(raw_data)
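  • A self-contained sketch of that interface, using random data as a stand-in for real observations (shapes and n_components are arbitrary):
import numpy as np
from sklearn.kernel_approximation import RBFSampler

raw_data = np.random.randn(1000, 2)           # stand-in for sampled states
sampler = RBFSampler(gamma=1.0, n_components=100)
sampler.fit(raw_data)                         # sets up the random Monte Carlo projection
features = sampler.transform(raw_data)
print(features.shape)                         # (1000, 100)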

The rest we have done before

  • now that we know how to transform the raw state, the rest should be familiar: Q-learning with linear function approximation, trained by gradient descent (see the sketch after this list)
  • Unlike a feedforward neural network, the features won't change as we learn
  • the exemplars we choose at the beginning remain fixed forever
  • May seem restrictive, but it works better than a feedforward NN here
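  • As a reminder, a minimal sketch of the semi-gradient Q-learning update with fixed features and one linear weight vector per action (all names are illustrative; terminal-state handling omitted):
import numpy as np

def q_learning_update(W, phi_s, a, r, phi_s_next, alpha=0.1, gamma=0.99):
    # W: shape (n_actions, n_features), one weight vector per action
    # phi_s, phi_s_next: RBF feature vectors for the current and next state
    target = r + gamma * np.max(W @ phi_s_next)   # max over Q(s', a')
    td_error = target - W[a] @ phi_s              # TD error for the action taken
    W[a] += alpha * td_error * phi_s              # gradient step on the squared TD error
    return W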

Old Perspective vs New Perspective

  • we used linear functions with polynomial features before
  • now we use the RBF kernel for features
  • the other perspective: a 1-hidden-layer neural network
  • remember that in general, this is a nonlinear transformation -> linear model at the final layer
  • Recall: the dot product measures the cosine of the angle between two vectors: $a^T b = \lVert a \rVert \, \lVert b \rVert \cos(\angle(a, b))$ (see the small comparison below)
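  • To make the contrast concrete, a tiny comparison of the two kinds of hidden unit (the weights, center, and gamma are arbitrary): a standard unit responds to the dot product $w^T x$, an RBF unit only to the distance $\lVert x - c \rVert$
import numpy as np

x = np.array([1.0, 2.0])
w = np.array([0.5, -0.3])   # weight vector of a standard (dot-product) hidden unit
c = np.array([0.5, -0.3])   # center of an RBF hidden unit

dot_response = np.tanh(w @ x)                      # depends on the direction of x relative to w
rbf_response = np.exp(-1.0 * np.sum((x - c)**2))   # depends only on the distance from c
print(dot_response, rbf_response)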

Implementation Details

  • Scale parameter (a.k.a. variance)
  • We don’t know what’s good
  • Perhaps multiple are good
  • scikit-learn has facilities that allow us to use multiple RBF samplers simultaneously (a pipeline sketch appears after this list)
from sklearn.pipeline import FeatureUnion
  • Can concatenate any features, not just those from RBFSampler
  • Standardize our data too:
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import SGDRegressor
# Functions:
partial_fit(X, y) # one step of gradient descent
predict(X)
  • SGDRegressor behaves a little strangely
  • partial_fit() must be called at least once before we do any prediction
  • Prediction must come before any real fitting, b/c we are using Q-Learning (where we need the max over Q(s, a))
  • so we'll start by calling partial_fit with dummy values
x = feature_transformer.transform([env.reset()])
target = 0
model.partial_fit(x, [target])
  • after calling partial_fit with a target of 0, it will predict 0 for everything for a while
  • This is weird - a linear model shouldn't behave this way (it may not be a purely linear model)
  • this quirk is useful, though
  • for our next task, mountain car, all rewards are -1
  • therefore, a Q prediction of 0 is higher than anything we can actually get
  • this is the optimistic initial values method
  • Technically, we don't need epsilon-greedy
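  • Before demonstrating that quirk, a minimal sketch of the feature pipeline described above, with two RBFSampler instances at different scales (gammas and component counts are placeholders; the FeatureTransformer below uses four samplers):
import numpy as np
from sklearn.pipeline import FeatureUnion
from sklearn.preprocessing import StandardScaler
from sklearn.kernel_approximation import RBFSampler

raw_states = np.random.randn(1000, 2)             # stand-in for sampled states
scaler = StandardScaler()
scaler.fit(raw_states)

featurizer = FeatureUnion([
    ("rbf_wide",   RBFSampler(gamma=0.5, n_components=100)),
    ("rbf_narrow", RBFSampler(gamma=5.0, n_components=100)),
])
featurizer.fit(scaler.transform(raw_states))

features = featurizer.transform(scaler.transform(raw_states))
print(features.shape)                             # (1000, 200) - concatenated features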

Prove it to ourselves

from sklearn.linear_model import SGDRegressor
model = SGDRegressor()
model.partial_fit([[0, 0]], [0])
model.predict([[0, 0]])
# array([0.]) - makes sense
model.predict([[0, 1]])
# array([0.]) - huh?
model.predict([[1, 0]])
# array([0.]) - huh?
...
model.predict([[1, 1]])
# array([0.]) - huh?

model.predict([[99, 99]])
# array([0.]) - huh?

One model per action

  • Another implementation detail used by Deep Q Learning too
  • instead of x <- transform(s,a)
  • we'll use x <- transform(s)
  • since actions are discrete, we can have a different Q(s, a) output for every action a
  • For Mountain Car, 3 actions: left, right, nothing
  • Like a neural network with 3 output nodes

Mountain Car

  • https://github.com/openai/gym/wiki/MountainCar-v0
  • https://gym.openai.com/envs/MountainCar-v0/

Cost-to-go Function

  • Is the negative of the optimal value function V*(s)
  • this is what it's called in Sutton & Barto
  • 2 state variables -> 3-D plot

Import Library

# https://deeplearningcourses.com/c/deep-reinforcement-learning-in-python
# https://www.udemy.com/deep-reinforcement-learning-in-python
from __future__ import print_function, division
from builtins import range
# Note: you may need to update your version of future
# sudo pip install -U future
#
# This takes 4min 30s to run in Python 2.7
# But only 1min 30s to run in Python 3.5!
#
# Note: gym changed from version 0.7.3 to 0.8.0
# MountainCar episode length is capped at 200 in later versions.
# This means your agent can't learn as much in the earlier episodes
# since they are no longer as long.

import gym
import os
import sys
import numpy as np
import matplotlib
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
from gym import wrappers
from datetime import datetime
from sklearn.pipeline import FeatureUnion
from sklearn.preprocessing import StandardScaler
from sklearn.kernel_approximation import RBFSampler
from sklearn.linear_model import SGDRegressor

# SGDRegressor defaults:
# loss='squared_loss', penalty='l2', alpha=0.0001,
# l1_ratio=0.15, fit_intercept=True, n_iter=5, shuffle=True,
# verbose=0, epsilon=0.1, random_state=None, learning_rate='invscaling',
# eta0=0.01, power_t=0.25, warm_start=False, average=False

Feature Transformer

# Inspired by https://github.com/dennybritz/reinforcement-learning
class FeatureTransformer:
  def __init__(self, env, n_components=500):
    observation_examples = np.array([env.observation_space.sample() for x in range(10000)])
    # observation_examples now holds 10,000 sampled state vectors
    scaler = StandardScaler()
    scaler.fit(observation_examples)

    # Used to convert a state to a featurized representation.
    # We use RBF kernels with different variances to cover different parts of the space
    featurizer = FeatureUnion([
            ("rbf1", RBFSampler(gamma=5.0, n_components=n_components)),
            ("rbf2", RBFSampler(gamma=2.0, n_components=n_components)),
            ("rbf3", RBFSampler(gamma=1.0, n_components=n_components)),
            ("rbf4", RBFSampler(gamma=0.5, n_components=n_components))
            ])
    example_features = featurizer.fit_transform(scaler.transform(observation_examples))

    self.dimensions = example_features.shape[1]
    self.scaler = scaler
    self.featurizer = featurizer

  def transform(self, observations):
    # print("observations:", observations)
    scaled = self.scaler.transform(observations)
    # assert(len(scaled.shape) == 2)
    return self.featurizer.transform(scaled)

Model

# Holds one SGDRegressor for each action
class Model:
  def __init__(self, env, feature_transformer, learning_rate):
    self.env = env
    self.models = []
    self.feature_transformer = feature_transformer
    for i in range(env.action_space.n):
      model = SGDRegressor(learning_rate=learning_rate)
      model.partial_fit(feature_transformer.transform( [env.reset()] ), [0])
      self.models.append(model)

  def predict(self, s):
    X = self.feature_transformer.transform([s])
    result = np.stack([m.predict(X) for m in self.models]).T
    assert(len(result.shape) == 2)
    return result

  def update(self, s, a, G):
    X = self.feature_transformer.transform([s])
    assert(len(X.shape) == 2)
    self.models[a].partial_fit(X, [G])

  def sample_action(self, s, eps):
    # eps = 0
    # Technically, we don't need to do epsilon-greedy
    # because SGDRegressor predicts 0 for all states
    # until they are updated. This works as the
    # "Optimistic Initial Values" method, since all
    # the rewards for Mountain Car are -1.
    if np.random.random() < eps:
      return self.env.action_space.sample()
    else:
      return np.argmax(self.predict(s))

Play


def play_one(model, env, eps, gamma):
  observation = env.reset()
  done = False
  totalreward = 0
  iters = 0
  while not done and iters < 10000:
    action = model.sample_action(observation, eps)
    prev_observation = observation
    observation, reward, done, info = env.step(action)

    # update the model
    next_q = model.predict(observation)   # Q(s', a) for all actions a
    # assert(next_q.shape == (1, env.action_space.n))
    G = reward + gamma*np.max(next_q[0])  # one-step Q-learning target
    model.update(prev_observation, action, G)

    totalreward += reward
    iters += 1

  return totalreward

Plot_cost_to_go

def plot_cost_to_go(env, estimator, num_tiles=20):
  x = np.linspace(env.observation_space.low[0], env.observation_space.high[0], num=num_tiles)
  y = np.linspace(env.observation_space.low[1], env.observation_space.high[1], num=num_tiles)
  X, Y = np.meshgrid(x, y)
  # both X and Y will be of shape (num_tiles, num_tiles)
  Z = np.apply_along_axis(lambda _: -np.max(estimator.predict(_)), 2, np.dstack([X, Y]))
  # Z will also be of shape (num_tiles, num_tiles)

  fig = plt.figure(figsize=(10, 5))
  ax = fig.add_subplot(111, projection='3d')
  surf = ax.plot_surface(X, Y, Z,
    rstride=1, cstride=1, cmap=matplotlib.cm.coolwarm, vmin=-1.0, vmax=1.0)
  ax.set_xlabel('Position')
  ax.set_ylabel('Velocity')
  ax.set_zlabel('Cost-To-Go == -V(s)')
  ax.set_title("Cost-To-Go Function")
  fig.colorbar(surf)
  plt.show()

Plot_running_avg

def plot_running_avg(totalrewards):
  N = len(totalrewards)
  running_avg = np.empty(N)
  for t in range(N):
    running_avg[t] = totalrewards[max(0, t-100):(t+1)].mean()
  plt.plot(running_avg)
  plt.title("Running Average")
  plt.show()

Main


def main(show_plots=True):
  env = gym.make('MountainCar-v0')
  ft = FeatureTransformer(env)
  model = Model(env, ft, "constant")
  gamma = 0.99

  if 'monitor' in sys.argv:
    filename = os.path.basename(__file__).split('.')[0]
    monitor_dir = './' + filename + '_' + str(datetime.now())
    env = wrappers.Monitor(env, monitor_dir)


  N = 300
  totalrewards = np.empty(N)
  for n in range(N):
    # eps = 1.0/(0.1*n+1)
    eps = 0.1*(0.97**n)
    if n == 199:
      print("eps:", eps)
    # eps = 1.0/np.sqrt(n+1)
    totalreward = play_one(model, env, eps, gamma)
    totalrewards[n] = totalreward
    if (n + 1) % 100 == 0:
      print("episode:", n, "total reward:", totalreward)
  print("avg reward for last 100 episodes:", totalrewards[-100:].mean())
  print("total steps:", -totalrewards.sum())

  if show_plots:
    plt.plot(totalrewards)
    plt.title("Rewards")
    plt.show()

    plot_running_avg(totalrewards)

    # plot the optimal state-value function
    plot_cost_to_go(env, model)


if __name__ == '__main__':
  # for i in range(10):
  #   main(show_plots=False)
  main()
episode: 99 total reward: -113.0
eps: 0.00023311762989647067
episode: 199 total reward: -183.0
episode: 299 total reward: -96.0
avg reward for last 100 episodes: -110.08
total steps: 40586.0

Reference:

Artificial Intelligence: Reinforcement Learning

Advanced AI: Deep Reinforcement Learning

Cutting-Edge Deep Reinforcement Learning
