try:
    import gym
except ImportError:
    # The frame-analysis and agent classes below do not need the emulator;
    # only GameControl.play() does, and it reports the missing package itself.
    gym = None
import numpy as np


class GameControl:
    """
    Runs one game: feeds observations to a DynamicInfo tracker, asks the agent
    for moves on every other step, and prints a trace of the play.
    """

    # Action ids accepted from an agent. The original author's notes describe
    # them as: nop, nop, up, down, up, down.
    validActions = frozenset('012345')

    def __init__(self, agent):
        """
        :param agent: An object with a makeMove(stepCount, dynInfo, reward) method.
        """
        self.agent = agent
        self.dynInfo = DynamicInfo()
        self.printedLeftArrow = False
        self.score = (0, 0)  # (computer, agent)
        self.shouldPrintTraceLine = False
        self.stepCount = 0

    def getAction(self, reward):
        """
        Choose the action for the current step.

        :param reward: The most recent reward, passed through to the agent.
        :return: An int in 0 .. 5. Anything the agent returns that is not exactly
                 one of those ids (after stripping whitespace) becomes 0 (no-op).
        """
        # Use the first 24 steps to find the upper and lower field bounds:
        # drive the paddle up, then down.
        # (The puck and the other player don't appear until about step 45.)
        if self.stepCount <= 10:
            inp = 2
        elif self.stepCount <= 24:
            inp = 3
        else:
            inp = self.agent.makeMove(self.stepCount, self.dynInfo, reward)
        # Compare the whole token against the set of ids. A substring test
        # against '012345' would let inputs such as '12' through as action 12.
        text = str(inp).strip()
        return int(text) if text in self.validActions else 0

    def play(self, game, renderMode='human'):
        """
        Play one episode of the named gym environment to completion.

        :param game: The gym environment id, e.g. 'Pong-v0'.
        :param renderMode: The mode passed to env.render.
        :raises ImportError: if the gym package is not installed.
        """
        if gym is None:
            raise ImportError("The 'gym' package is required to play a game.")
        env = gym.make(game)
        try:
            (obs, reward) = (env.reset(), 0)
            self.stepCount = 0
            done = False
            while not done:
                self.stepCount += 1
                agentsTurn = self.stepCount % 2 == 0
                if agentsTurn:
                    self.dynInfo.extractInfo(obs)
                    action = self.getAction(reward)
                    self.printTraceLine1()
                else:
                    action = 0
                (obs, reward, done, debug) = env.step(action)
                if agentsTurn or reward != 0:
                    self.printTraceLine2(action, reward)
                env.render(renderMode)
            # Wait for the user before tearing the window down.
            print(input(''))
        finally:
            # Close even if the loop raised.
            # https://github.com/openai/gym/issues/893
            env.close()

    def printTraceLine1(self):
        """
        Print the first half of a trace line (the world state) while the puck is
        approaching the paddle, or a one-time left arrow when it heads west.
        """
        info = self.dynInfo
        self.shouldPrintTraceLine = (
            info.puckDirection == info.east and
            (info.puck is None or
             info.paddle is None or
             info.puck[1] <= info.paddle[1])
        )
        if self.stepCount <= 45:
            return
        if self.shouldPrintTraceLine:
            print(f'{info.str()} >> ', end='')
            self.printedLeftArrow = False
        elif info.puckDirection == info.west and not self.printedLeftArrow:
            print('\n\t\t\t\t <===\n')
            self.printedLeftArrow = True

    def printTraceLine2(self, action, reward):
        """
        Print the second half of a trace line (the move taken) and, when a point
        was scored, update self.score and print the score line.

        :param action: The action id that was sent to the environment.
        :param reward: The reward returned by that step (-1, 0 or 1).
        """
        if self.stepCount > 45 and self.shouldPrintTraceLine:
            # Actions 1, 4 and 5 are valid too; show their id rather than fail.
            move = {0: "-", 2: "Up", 3: "Down"}.get(action, str(action))
            print(move)
        if reward != 0:
            (computer, player) = self.score
            pointWinner = {-1: 'computer', 1: 'agent'}[reward]
            (computer, player) = self.score = (
                {'computer': (computer + 1, player),
                 'agent': (computer, player + 1)}[pointWinner]
            )
            print(self._scoreLine(pointWinner, computer, player))

    @staticmethod
    def _scoreLine(pointWinner, computer, player):
        """ Format the message printed after a point; adds a tag once either side reaches 21. """
        tag = ('' if computer < 21 and player < 21
               else f'\n\t\tThe {pointWinner} won the game. Take a picture?')
        return f'\n\t\tThe {pointWinner} won a point. Computer: {computer}, Agent: {player}. {tag}\n'


class DynamicInfo():
    """
    Tracks the puck and the right-hand paddle from frame to frame.
    Positions are (meanRow, col) pairs in the cropped frame produced by getBlocks.
    Rows count downward, so yTop is smaller than yBottom.
    """

    def __init__(self):
        self.blocks = None
        self.yTop = None
        self.yBottom = None
        self.midpoint = None
        self.puckHalfHeight = 1
        self.paddleHalfHeight = 7
        self.puck = None
        self.puckSlopeList = []
        self.puckSlope = None
        self.puckDirection = None
        self.puckIntercept = None
        self.paddle = None
        self.paddleVelocity = None
        self.paddleTarget = None
        self.east = '-->'
        self.west = '<--'
        self.NoneStr = '**'

    def computePaddleTarget(self):
        """
        Update self.paddle, self.paddleVelocity and self.paddleTarget from the
        last block in self.blocks. The target is the current mean row plus the
        row change since the previous call; both stay None on the first call.
        """
        paddle = self.blocks[-1]
        # Care only about the right-most paddle col. Increases chance of hitting puck with top
        # or bottom of paddle. Those create high-angle shots, which often win points.
        # Also, creates margin of error for intercept on defense.
        paddleRightCol = paddle[1][1]
        paddleMeanRow = self.mean(paddle[0])
        # self.paddle will be None initially.
        if self.paddle is None:
            self.paddleVelocity = None
            self.paddleTarget = None
        else:
            self.paddleVelocity = round(paddleMeanRow - self.paddle[0], 1)
            self.paddleTarget = paddleMeanRow + self.paddleVelocity
        self.paddle = (paddleMeanRow, paddleRightCol)

    def computePuckIntercept(self):
        """
        Update the puck position, direction and slope from self.blocks[1] and,
        when the slope is settled and the puck is moving east, project the row
        at which it will reach the paddle column (self.puckIntercept).
        """
        newPuckBlock = self.blocks[1]
        # We only care about the left puck column. (Same reasoning as rightmost paddle col.)
        newPuck = (self.mean(newPuckBlock[0]), newPuckBlock[1][0])
        self.puckSlope = None
        if self.puck is not None:
            colChange = newPuck[1] - self.puck[1]
            self.puckDirection = self.east if colChange > 0 else self.west
            slope = 0 if colChange == 0 else (newPuck[0] - self.puck[0]) / colChange
            self.puckSlopeList.append(round(slope, 1))
            # Keep the three most recent slopes.
            self.puckSlopeList = self.puckSlopeList[-3:]
            # If the three most recent slopes are the same, take that as the slope.
            if len(self.puckSlopeList) == 3 and len(set(self.puckSlopeList)) == 1:
                self.puckSlope = self.puckSlopeList[0]
        self.puck = newPuck

        # Compute the projected intercept of the puck with the paddle column.
        if self.puckSlope is not None and self.puckDirection == self.east:
            distance = self.paddle[1] - self.puck[1]
            self.puckIntercept = self.puck[0] + distance * self.puckSlope
            # If the intercept is above or below the frame of the board, fold it over.
            # Need three comparisons to allow for hitting both top and bottom.
            # This system counts rows from top down. yTop is smaller than yBottom.
            if self.puckIntercept < self.yTop:
                self.puckIntercept += 2 * (self.yTop - self.puckIntercept)
            if self.puckIntercept > self.yBottom:
                self.puckIntercept -= 2 * (self.puckIntercept - self.yBottom)
            if self.puckIntercept < self.yTop:
                self.puckIntercept += 2 * (self.yTop - self.puckIntercept)

    def extractInfo(self, obs):
        """
        Digest one observation: refresh self.blocks, widen the field bounds and,
        when exactly three blocks are visible, update the puck and paddle tracking.
        Otherwise the puck tracking is reset.

        :param obs: An observation (rows x cols x channels array).
        """
        self.blocks = self.getBlocks(obs)
        if not self.blocks:
            # Nothing but background: there are no rows to take bounds from.
            self._resetPuckTracking()
            return
        # Find the topmost and bottommost rows, i.e., the rows that frame the field.
        # Since this system counts rows downward, the topmost/bottommost row is the
        # row with lowest/highest number.
        blockYTop = min(rowTop for ((rowTop, _), _) in self.blocks) - 1
        blockYBottom = max(rowBottom for ((_, rowBottom), _) in self.blocks) + 1
        self.yTop = blockYTop if self.yTop is None else min(self.yTop, blockYTop)
        self.yBottom = blockYBottom if self.yBottom is None else max(self.yBottom, blockYBottom)
        self.midpoint = self.yTop*0.5 + self.yBottom*0.5
        if len(self.blocks) == 3:
            self.computePuckIntercept()
            self.computePaddleTarget()
        else:
            self._resetPuckTracking()

    def _resetPuckTracking(self):
        """ Forget the puck position, slope history and projected intercept. """
        self.puckIntercept = None
        self.puck = None
        self.puckSlopeList = []

    def getBlocks(self, obs):
        """
        Returns a list of blocks. Each block is ((row_top, row_bottom), (col_left, col_right)),
        one per distinct non-background colour, in coordinates of the cropped frame.
        :param obs: An observation
        :return: [((int, int), (int, int))]
        """
        # Drop the top 30 rows and the leftmost 15 columns
        obs = obs[30:, 15:]
        (rows, cols, _) = np.shape(obs)
        # Build a dictionary. The keys are cell colors. The values are the cells with those colors.
        partitions = self.partition(obs, rows, cols)
        # One bounding box ((rowMin, rowMax), (colMin, colMax)) per color.
        blocks = [(self.minAndMax([row for (row, _) in cells]),
                   self.minAndMax([col for (_, col) in cells]))
                  for cells in partitions.values()]
        # Sort the blocks by column, i.e., from left to right on the screen.
        # Recall that rc is ((rowMin, rowMax), (colMin, colMax)). So rc[1] is (colMin, colMax)
        sortedBlocks = sorted(blocks, key=lambda rc: rc[1])
        if len(sortedBlocks) == 3:
            # Sort the two rightmost blocks by height. This covers the case
            # in which the puck has passed the left edge of the paddle
            sortedBlocks = sortedBlocks[0:1] + sorted(sortedBlocks[1:],
                                                      key=lambda rc: rc[0][1] - rc[0][0])
        return sortedBlocks

    @staticmethod
    def mean(elts: tuple) -> float:
        """ The arithmetic mean of elts, rounded to one decimal place. """
        return round(sum(elts) / len(elts), 1)

    @staticmethod
    def minAndMax(lst):
        """ (min, max) of lst, or None if lst is empty. """
        return (min(lst), max(lst)) if lst else None

    def paddleTargetBottom(self):
        return None if self.paddleTarget is None else self.paddleTarget + self.paddleHalfHeight

    def paddleTargetTop(self):
        return None if self.paddleTarget is None else self.paddleTarget - self.paddleHalfHeight

    @staticmethod
    def partition(obs, rows, cols):
        """
        Partition an np.array into lists, one for each element value.
        The first cell of each row is taken as that row's background; cells equal
        to it are ignored.
        :param obs: An observation
        :param rows: The number of rows in the observation
        :param cols: The number of cols in the observation
        :return: A dictionary. The keys are the individual values in obs (as tuples).
                 The values are the (row, col) cells with that value, in row-major order.
        """
        frame = np.asarray(obs)[:rows, :cols]
        # True wherever a cell differs from the first cell of its own row.
        differs = (frame != frame[:, :1]).any(axis=-1)
        components = {}
        # Visit only the non-background cells (argwhere yields them in row-major order).
        for (r, c) in np.argwhere(differs):
            # Must transform the contents of the cells from arrays to tuples. Arrays may not
            # be used as dictionary keys. Tuples may.
            components.setdefault(tuple(frame[r, c]), []).append((int(r), int(c)))
        return components

    def puckInterceptBottom(self):
        return None if self.puckIntercept is None else self.puckIntercept + self.puckHalfHeight

    def puckInterceptTop(self):
        return None if self.puckIntercept is None else self.puckIntercept - self.puckHalfHeight

    def separation(self):
        """ Columns between puck and paddle, or None if either is not tracked. """
        return None if self.paddle is None or self.puck is None else self.paddle[1]-self.puck[1]

    def separationStr(self):
        """ A small bar whose width grows with the separation; '' if it is unknown. """
        sep = self.separation()
        if sep is None:
            return ''
        halfSep = '-'*int(round((sep/20)))
        return f'|{halfSep}{sep}{halfSep}|'

    def str(self):
        """
        A string representation of the important features of the world.
        :return: One line describing puck, projected intercept, paddle and separation.
        """
        puckRow = self.toStr(None if self.puck is None else self.puck[0])
        # The paddle may not have been seen yet; show the NoneStr instead of failing.
        paddleRow = self.toStr(None if self.paddle is None else self.paddle[0])
        st1 = f'puck: [{puckRow}] {self.toStr(self.puckSlope)} {self.toStr(self.puckDirection)} '
        st2 = f'{self.toStr(self.puckInterceptTop())} - {self.toStr(self.puckInterceptBottom())}; '
        st3 = f'paddle: [{paddleRow}] {self.paddleVelocity} --> '
        st4 = f'{self.paddleTargetTop()} - {self.paddleTargetBottom()}; '
        return st1 + st2 + st3 + st4 + self.separationStr()

    def toStr(self, x):
        """
        :param x: Any value
        :return: the NoneStr ('**') if x is None; otherwise str(x)
        """
        return self.NoneStr if x is None else str(x)


class Agent:
    """ The generic Agent player """

    def __init__(self):
        self.Down = 3
        self.moveFrequency = 3
        self.NoMove = 0
        self.paddle = None
        self.paddleTargetTop = None
        self.paddleTargetBottom = None
        self.puckInterceptTop = None
        self.puckInterceptBottom = None
        self.Up = 2

    def cacheInfo(self, dynInfo):
        """
        Remember the latest known intercept, paddle target and paddle row.
        Values that dynInfo currently reports as None leave the cache untouched.
        """
        # Cache these values in case we lose track of the puck or the paddle.
        if dynInfo.puckInterceptTop() is not None:
            self.puckInterceptTop = dynInfo.puckInterceptTop()
        if dynInfo.puckInterceptBottom() is not None:
            self.puckInterceptBottom = dynInfo.puckInterceptBottom()
        if dynInfo.paddleTargetTop() is not None:
            self.paddleTargetTop = dynInfo.paddleTargetTop()
        if dynInfo.paddleTargetBottom() is not None:
            self.paddleTargetBottom = dynInfo.paddleTargetBottom()
        # dynInfo.paddle itself is None until the paddle has been tracked.
        if dynInfo.paddle is not None:
            self.paddle = dynInfo.paddle[0]

    def makeMove(self, stepCount, dynInfo, reward):
        """ Return the action for this step. Concrete agents must override this. """
        raise NotImplementedError


class Bot(Agent):
    """ A Bot player """

    def makeMove(self, stepCount, dynInfo, reward):
        """
        Rule-based move: drift toward the centre while the puck is not moving
        east, otherwise line the paddle up with the projected intercept.

        :return: self.Up, self.Down or self.NoMove
        """
        self.cacheInfo(dynInfo)
        # Always leave at least one step between moves.
        if stepCount % 2 != 0:
            return self.NoMove
        # If it's not our turn and paddle is both moving and far away, do nothing.
        separation = dynInfo.separation()
        if (stepCount % self.moveFrequency != 0 and
                dynInfo.paddleVelocity != 0 and
                separation is not None and separation > 50):
            return self.NoMove
        if None in [self.paddleTargetTop, self.paddleTargetBottom]:
            return self.NoMove
        # If puck is not moving east, go to center (more or less).
        if dynInfo.puckDirection != dynInfo.east:
            if self.paddleTargetTop > dynInfo.yTop*0.45 + dynInfo.yBottom*0.55:
                return self.Up
            if self.paddleTargetBottom < dynInfo.yTop*0.55 + dynInfo.yBottom*0.45:
                return self.Down
            return self.NoMove
        # These are the strategic moves.
        if None in [self.puckInterceptTop, self.puckInterceptBottom]:
            return self.NoMove
        if self.paddleTargetTop > self.puckInterceptBottom:
            return self.Up
        if self.paddleTargetBottom < self.puckInterceptTop:
            return self.Down
        if self.paddleTargetBottom > dynInfo.yBottom + dynInfo.paddleHalfHeight/2:
            return self.Up
        if self.paddleTargetTop < dynInfo.yTop - dynInfo.paddleHalfHeight/2:
            return self.Down
        return self.NoMove


class QBot(Agent):
    """ A Q-learner Bot """

    def makeMove(self, stepCount, dynInfo, reward):
        """ Placeholder: caches the world state and never moves. """
        self.cacheInfo(dynInfo)
        return self.NoMove


class Human(Agent):
    """ The Human player """

    def makeMove(self, stepCount, dynInfo, reward):
        """ Prompt on the console and return whatever the user typed (a string). """
        self.cacheInfo(dynInfo)
        return input(f'{stepCount}. >> ')


if __name__ == '__main__':
    # To have the bot play, use Bot() as argument.
    # To have a human play, use Human() as argument.
    # For a q-learning bot, use QBot() as argument.
    GameControl(Bot()).play('Pong-v0')