import gym
import numpy as np
import matplotlib.pyplot as plt
import random


def getStateAction(pos, vel, ang, rot, strategyID):
    """
    The inputs are the observation components. The output is a (state, action) pair,
    which depends on the strategy strategyID.

    state is relevant when we are running a QBot. It reflects a "feature analysis."
    action is relevant when we are running a hard-wired strategy.
    We use one or the other, never both. Strategies without meaningful states
    return None as the state so that every branch yields a pair the caller can unpack.

    :param pos: an observation component (cart position)
    :param vel: an observation component (cart velocity)
    :param ang: an observation component (pole angle)
    :param rot: an observation component (pole rotation rate)
    :param strategyID: strategy id.
    :return: (state, action)
    """
    # The first four focus on rot, the angular rotation.
    if strategyID == 1:
        return (None, 1) if rot > 0 else \
               (None, 0)
    if strategyID == 2:
        return (None, 1) if rot > 1.5 else \
               (None, 0) if rot < -1.5 else \
               (None, 1) if ang > 0 else \
               (None, 0)
    if strategyID == 3:
        return ('s1-1', 1) if rot > 1.5 else \
               ('s2-0', 0) if rot < -1.5 else \
               ('s3-0', 0) if vel > 1.5 else \
               ('s4-1', 1) if vel < -1.5 else \
               ('s5-1', 1) if ang > 0 else \
               ('s6-0', 0)
    if strategyID == 4:
        return (None, 1) if rot > 1.5 else \
               (None, 0) if rot < -1.5 else \
               (None, 1) if ang > 0.01 else \
               (None, 0) if ang < -0.01 else \
               (None, 0) if pos > 0 else \
               (None, 1)
    # This one gives equal weight to rotation (rot) and velocity (vel)
    if strategyID == 5:
        return ('s1-1', 1) if rot > 1.5 or vel < -1.5 else \
               ('s2-0', 0) if rot < -1.5 or vel > 1.5 else \
               ('s3-1', 1) if ang > 0 else \
               ('s4-0', 0)
    # The next four focus on the velocity, vel
    if strategyID == 6:
        return (None, 1) if vel < -0.9 else \
               (None, 0) if vel > 0.9 else \
               (None, 1) if ang > 0 else \
               (None, 0)
    if strategyID == 7:
        return (None, 1) if vel < -0.9 else \
               (None, 0) if vel > 0.9 else \
               (None, 1) if rot > 0.06 else \
               (None, 0) if rot < -0.06 else \
               (None, 1) if ang > 0.0 else \
               (None, 0)
    if strategyID == 8:
        return (None, 1) if vel < -0.9 else \
               (None, 0) if vel > 0.9 else \
               (None, 1) if rot > 0.06 else \
               (None, 0) if rot < -0.06 else \
               (None, 1) if ang > 0.005 else \
               (None, 0) if ang < -0.005 else \
               (None, 1) if pos < 0 else \
               (None, 0)
    if strategyID == 9:
        return (None, 1) if vel < -0.9 else \
               (None, 0) if vel > 0.9 else \
               (None, 1) if ang > 0.01 else \
               (None, 0) if ang < -0.01 else \
               (None, 1) if rot > 0.01 else \
               (None, 0) if rot < -0.01 else \
               (None, 1) if pos < 0 else \
               (None, 0)
    if strategyID == 10:
        # A linear policy: push right (action 1) when the weighted sum of the
        # observation components is non-negative. parameters = [0.05, -0.5, 0.5, 0.1]
        prod = sum(a*b for (a, b) in zip([0.05, -0.5, 0.5, 0.1], [pos, vel, ang, rot]))
        return (None, 0) if prod < 0 else (None, 1)


def maxWithDefault(lst, default=0):
    return max(lst) if lst else default


def plot_results(results_list):
    # Make the y-axis start at 0. (There is probably a better way.)
    plt.plot([0], 'r')
    plt.plot(results_list)
    plt.ylabel('Episodes until success')
    plt.xlabel('Trial')
    plt.show()


def print_plot_results(results, max_episodes, n_trials):
    print(f'\nmax episodes: {max_episodes}; avg episodes: {np.round(np.sum(results) / n_trials, 1)}')
    plot_results(results)
    # plt.hist(results, bins=50, color='g', density=1, alpha=0.75)
    # plt.xlabel('Episodes required to reach 200')
    # plt.ylabel('Frequency')
    # plt.title('Histogram of Custom-coded solution')
    # plt.show()


def roundDictValues(d, p=1):
    return {k: round(v, p) for (k, v) in d.items()}


def weightedAvg(low, weight, high):
    """
    :param low: The value to return if weight == 0
    :param weight: a number in 0 .. 1
    :param high: The value to return if weight == 1
    :return: low*(1-weight) + high*weight
    """
    return low*(1-weight) + high*weight

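# A minimal worked example of the helpers above (the observation values are
# illustrative, not taken from a real rollout). With strategy 5, a strongly
# positive rotation rate maps to state 's1-1' and action 1 (push right), and
# weightedAvg blends an old Q-value toward a new estimate:
#
#     getStateAction(0.0, 0.0, 0.0, 2.0, 5)   # -> ('s1-1', 1)
#     weightedAvg(10, 0.25, 2)                # -> 10*0.75 + 2*0.25 = 8.0
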
####################################################################################
####################################################################################
####################################################################################
class CartPole_Q_Learning:

    def __init__(self, strategyID=3, isQBot=False, testTrials=5, envName='CartPole-v0', render=True):
        self.strategyID = strategyID
        self.converged = not isQBot
        self.env = gym.make(envName)
        self.gamma = 0.99
        self.isQBot = isQBot
        self.possibleActions = [0, 1]
        self.prevAction = None
        self.prevState = None
        self.prevQValues = None
        # qValues maps each state to a dictionary of the possible actions and their
        # current estimated values: {state: {action: qValue, ...}, ...}.
        # States and actions are added lazily as they are first encountered.
        self.qValues = {}
        self.render = render
        self.stepsCycle = 1000
        self.testTrials = testTrials
        self.trainingStepLimit = 5000
        self.trainingSteps = 0
        self.trialType = None

    def alphaFn(self):
        # The learning rate decays as training proceeds. With n = 5000:
        # Note that 5000 / (1000 + 5000) => 0.83
        # Note that 5000 / (3000 + 5000) => 0.63
        # Note that 5000 / (5000 + 5000) => 0.5
        n = 5000
        return n / (self.trainingSteps + n)

    def bestAction(self, actionValueDict):
        return random.choice(self.possibleActions) if not actionValueDict else \
               max(actionValueDict, key=lambda action: actionValueDict[action])

    def epsilonFn(self):
        # During training epsilon is held at 1.0 (pure exploration); during test
        # trials it is 0 (pure exploitation). A decaying schedule is one alternative:
        # Note that 0.75 * 0.99 ** 100 => 0.27
        # Note that 0.75 * 0.99 ** 200 => 0.1
        # 0.75 * 0.99**self.trainingSteps
        return 0 if self.trialType == 'test' else 1.0

    def getActionFromState(self, state):
        # Epsilon-greedy: explore with probability epsilonFn(); otherwise exploit
        # the best known action for this state.
        return random.choice(self.possibleActions) \
            if self.epsilonFn() > random.random() else \
            self.bestAction(self.qValues.setdefault(state, {}))

    def getActionValues(self, state):
        return self.qValues.setdefault(state, {})

    def main(self):
        # If we are running a QBot, first do training.
        if self.isQBot:
            trial_nbr = 0
            self.trialType = "training"
            trialsNeeded = 100
            while (not self.converged and self.trainingSteps < self.trainingStepLimit) or \
                    trialsNeeded == 1:
                trial_nbr += 1
                trialsNeeded = self.runTrial(trial_nbr)

        self.trialType = "test"
        if self.isQBot:
            print('\n\nFinished training. Setting epsilon to 0.')
            # Replace the learned table with a hand-set table for the four
            # strategy-5 feature states before running the test trials.
            self.qValues = {'s1-1': {0: 0, 1: 1},
                            's2-0': {0: 1, 1: 0},
                            's3-1': {0: 0, 1: 1},
                            's4-0': {0: 1, 1: 0}
                            }
            self.printQvalues()
        for trial_nbr in range(self.testTrials):
            self.runTrial(trial_nbr)

    def maxQValue(self, state):
        actionValues = self.getActionValues(state)
        return 0 if not actionValues else max(actionValues.values())

    def perhapsPrintQValues(self):
        if (self.prevQValues is None or
                len(self.qValues) > len(self.prevQValues) or
                self.trainingSteps % self.stepsCycle == 0) and \
                self.epsilonFn() > 0:
            roundedQValues = {state: roundDictValues(actionValues, 1)
                              for (state, actionValues) in self.qValues.items()}
            if self.prevQValues is None or self.prevQValues != roundedQValues:
                self.prevQValues = roundedQValues
                self.printQvalues()
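    # For strategy 5 the Q-table contains at most four states, so after some
    # training self.qValues might look like this (the numbers are illustrative
    # only, not taken from an actual run); printQvalues below prints such a
    # table one state per line:
    #
    #     {'s1-1': {0: 12.3, 1: 47.8},
    #      's2-0': {0: 51.0, 1:  9.6},
    #      's3-1': {0: 20.4, 1: 33.1},
    #      's4-0': {0: 38.7, 1: 18.2}}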
    def printQvalues(self):
        print(f'\n\nAfter {self.trainingSteps} training steps. ', end='')
        print(f'epsilon: {round(self.epsilonFn(), 2)}, alpha: {round(self.alphaFn(), 2)}')
        for state in sorted(self.qValues):
            print(state, end=' -> ')
            actionQValues = self.qValues[state]
            for action in reversed(sorted(actionQValues, key=lambda a: actionQValues[a])):
                print(f'{action}: {round(actionQValues[action], 3)}', end='; ')
            print()
        print()

    def runEpisode(self):
        """
        An episode starts with env.reset() and ends either after 200 steps
        or when the pole goes out of bounds.

        :return: whether the episode succeeded
        """
        observation = self.env.reset()
        (currentState, action) = getStateAction(*observation, self.strategyID)
        episodeStepCount = 0
        while True:
            # observation is a list of four numbers (not a tuple):
            # [cart position, cart velocity, pole angle, pole rotation rate]
            if self.isQBot:
                action = self.getActionFromState(currentState)
            # print(f'episodeStepCount: {episodeStepCount}. {action}')
            (observation, reward, done, debug) = self.env.step(action)
            (nextState, nextAction) = getStateAction(*observation, self.strategyID)
            if self.isQBot and not self.converged:
                self.updateQValue(currentState, action, reward, nextState)
            if self.render:
                self.env.render()
            episodeStepCount += 1
            self.trainingSteps += 1
            # env returns done when either the pole goes out of bounds or 200 steps have been taken
            if done:
                return episodeStepCount == 200
            (currentState, action) = (nextState, nextAction)

    def runTrial(self, trial_nbr):
        """
        A trial is a series of episodes that continues until an episode succeeds,
        i.e., runs 200 steps without exceeding limits.

        :param trial_nbr: the number of this trial, used only for printing
        :return: The number of episodes required to succeed.
        """
        episodeNbr = 0
        while self.trialType == 'test' or self.trainingSteps < self.trainingStepLimit:
            episodeNbr += 1
            if self.runEpisode():
                print(f'{self.trialType}: {trial_nbr + 1}; episodes: {episodeNbr}')
                return episodeNbr
        return episodeNbr

    def updateQValue(self, state, action, reward, nextState):
        """
        We have just taken action in state. We got reward and arrived at nextState.
        Now that we know the reward and nextState resulting from taking action in state,
        update qValues[state][action] (in place) using the Bellman equation.

        :param state: The state from which we did a transition.
        :param action: The action we took in state.
        :param reward: The reward resulting from taking action in state.
        :param nextState: The state we arrived at after taking action in state.
        :return: None. qValues[state][action] is updated in place.
        """
        # This is a dictionary of actions and their values for this state.
        actionValues = self.getActionValues(state)
        # Running-average form of the Q-learning update:
        #   Q(s, a) <- (1 - alpha) * Q(s, a) + alpha * (reward + gamma * max_a' Q(s', a'))
        newValue = reward + self.gamma * self.maxQValue(nextState)
        actionValues[action] = weightedAvg(actionValues.get(action, 0), self.alphaFn(), newValue)
        self.perhapsPrintQValues()


if __name__ == '__main__':
    CartPole_Q_Learning(strategyID=5, isQBot=True).main()
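# Illustrative alternative invocations (these particular strategy numbers and
# flags are example choices, not values taken from the original source):
#
#     CartPole_Q_Learning(strategyID=10, isQBot=False).main()               # hard-wired linear policy, no learning
#     CartPole_Q_Learning(strategyID=5, isQBot=False, render=False).main()  # hard-wired strategy 5, no render window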