import numpy as np

try:
    import gym
except ImportError:
    # The vision (DynamicInfo) and agent classes do not need gym, so let them be
    # imported without it. GameControl reports the missing package when constructed.
    gym = None


class GameControl:
    """
    Plays one game between the environment's built-in player and an agent.

    Constructing the object creates the environment and immediately plays the
    game to completion (see play).
    """

    # Number of scripted start-up steps. Per the original author's note, the first
    # 25 or so steps are used to find the upper and lower bounds of the field and
    # the rest wait for the first serve. The agent is consulted only after this.
    warmupSteps = 45
    # Trace labels for the actions the agents in this file produce.
    actionNames = {0: '-', 2: 'Up', 3: 'Down'}
    # Score at which the "Game over" tag is appended to the score line.
    winningScore = 21

    def __init__(self, game, agent, renderMode='human'):
        """
        :param game: The gym environment id, e.g. 'Pong-v0'
        :param agent: An object with a makeMove(stepCount, dynInfo, reward) method
        :param renderMode: The mode passed to env.render on every step
        :raises ImportError: if the gym package is not installed
        """
        if gym is None:
            raise ImportError("The 'gym' package is required to run GameControl.")
        self.game = game
        self.env = gym.make(self.game)
        self.agent = agent
        self.dynInfo = DynamicInfo()
        self.score = (0, 0)
        self.renderMode = renderMode
        self.play(self.env)

    @staticmethod
    def _warmupAction(stepCount):
        """ The scripted action for a start-up step: up, down, up again, then wait. """
        if stepCount <= 10:
            return 2
        if stepCount <= 24:
            return 3
        if stepCount <= 29:
            return 2
        return 0

    @staticmethod
    def _toAction(inp):
        """
        Convert an agent's answer (an int from a bot, a string from a human) into an action.

        The accepted actions are 0 .. 5. The agents in this file use 0 (no move),
        2 (up) and 3 (down). Anything else, e.g. '' or '12', becomes 0.
        # NOTE(review): the meaning of actions 1, 4 and 5 is not visible here --
        # confirm against the environment's action set.
        """
        text = str(inp).strip()
        return int(text) if text in ('0', '1', '2', '3', '4', '5') else 0

    def _recordPoint(self, reward):
        """ Update self.score for a non-zero reward and print the score line. """
        (computer, player) = self.score
        if reward > 0:
            pointWinner = 'agent'
            player += 1
        else:
            pointWinner = 'computer'
            computer += 1
        self.score = (computer, player)
        gameOn = computer < self.winningScore and player < self.winningScore
        tag = '' if gameOn else '\n\t\tGame over. Take a picture?'
        print(f'\n\t\tThe {pointWinner} won a point. Computer: {computer}, Agent: {player}. {tag}\n')

    def play(self, env):
        """
        Run the game loop on env until the environment reports done.

        Every step: extract the block information from the observation, choose an
        action (scripted during start-up, from the agent afterwards), step the
        environment, keep score and render. When the game is over, wait for the
        user to press Enter and then close the environment.
        # NOTE(review): relies on env.step returning a 4-tuple
        # (obs, reward, done, info) -- confirm against the installed gym version.

        :param env: The environment to play in
        """
        (obs, reward) = (env.reset(), 0)
        stepCount = 0
        done = False
        printedLeftArrow = False
        info = self.dynInfo
        while not done:
            stepCount += 1
            info.extractInfo(obs)
            inPlay = stepCount > self.warmupSteps

            # Trace only while the puck is heading east and has not yet passed the paddle.
            shouldPrintTraceLine = inPlay and \
                                   info.puckDirection == info.east and \
                                   (info.puck is None or info.paddle is None or
                                    info.puck[1] <= info.paddle[1])
            if shouldPrintTraceLine:
                print(f'{info.str()} >> ', end='')
                printedLeftArrow = False
            elif inPlay and info.puckDirection == info.west and not printedLeftArrow:
                # Print the arrow only once for each westward trip.
                print('\n\t\t\t\t <===\n')
                printedLeftArrow = True

            inp = self.agent.makeMove(stepCount, info, reward) if inPlay else \
                  self._warmupAction(stepCount)
            action = self._toAction(inp)
            (obs, reward, done, _) = env.step(action)

            if shouldPrintTraceLine:
                # Finish the trace line with the move. Unlabelled actions print as their number.
                print(f'{self.actionNames.get(action, action)}')

            if reward != 0:
                self._recordPoint(reward)

            env.render(self.renderMode)

        print(input(''))
        env.close()  # https://github.com/openai/gym/issues/893


class DynamicInfo():
    """
    Tracks what can be seen on the screen: the field bounds, the puck and the paddle.

    Rows are counted downward, so yTop is smaller than yBottom. A position is a
    (row, col) pair.
    """

    def __init__(self):
        self.blocks = None
        self.yTop = None
        self.yBottom = None
        self.midpoint = None
        self.puckHalfHeight = 1
        self.paddleHalfHeight = 7
        self.puck = None
        self.puckSlopeList = []
        self.puckSlope = None
        self.puckDirection = None
        self.puckIntercept = None
        self.paddle = None
        self.paddleVelocity = None
        self.paddleTarget = None
        self.east = '-->'
        self.west = '<--'
        self.NoneStr = '**'

    def extractInfo(self, obs):
        """
        Update all tracked information from one observation.

        :param obs: An observation
        :return: self
        """
        self.blocks = self.getBlocks(obs)

        # Find the topmost and bottommost rows, i.e., the rows that frame the field.
        # Since this system counts rows downward, the topmost/bottommost row is the
        # row with lowest/highest number. The bounds only ever widen.
        # A frame with no blocks at all leaves the bounds as they were.
        if self.blocks:
            blockYTop = min(rowTop for ((rowTop, _), _) in self.blocks) - 1
            blockYBottom = max(rowBottom for ((_, rowBottom), _) in self.blocks) + 1
            self.yTop = blockYTop if self.yTop is None else min(self.yTop, blockYTop)
            self.yBottom = blockYBottom if self.yBottom is None else max(self.yBottom, blockYBottom)
            self.midpoint = self.yTop*0.5 + self.yBottom*0.5

        # With exactly three blocks, the middle one is taken as the puck and the
        # rightmost one as our paddle. Otherwise we have lost track of the puck.
        if len(self.blocks) == 3:
            self.computePuckIntercept()
            self.computePaddleTarget()
        else:
            self.puckIntercept = None
            self.puck = None
            self.puckSlopeList = []
        return self

    def computePaddleTarget(self):
        """ Update paddle, paddleVelocity and paddleTarget from the rightmost block. """
        paddle = self.blocks[-1]
        # Care only about the right-most paddle col. Increases chance of hitting puck with top
        # or bottom of paddle. Those create high-angle shots, which often win points.
        # Also, creates margin of error for intercept on defense.
        paddleRightCol = paddle[1][1]
        paddleMeanRow = self.mean(paddle[0])
        # self.paddle will be None initially.
        self.paddleVelocity = None if self.paddle is None else round(paddleMeanRow - self.paddle[0], 1)
        self.paddle = (paddleMeanRow, paddleRightCol)
        # The target is where the paddle will be if it moves as far again as it just did.
        self.paddleTarget = None if self.paddleVelocity is None else paddleMeanRow + self.paddleVelocity

    def computePuckIntercept(self):
        """
        Update puck, puckDirection and puckSlope from the middle block and, when the
        puck is moving east with a known slope, project where it meets the paddle column.
        """
        newPuckBlock = self.blocks[1]
        # We only care about the left puck column. (Same reasoning as rightmost paddle col.)
        newPuckLeftCol = newPuckBlock[1][0]
        newPuckMeanRow = self.mean(newPuckBlock[0])
        newPuck = (newPuckMeanRow, newPuckLeftCol)
        self.puckSlope = None
        if self.puck is not None:
            self.puckDirection = self.east if newPuck[1] > self.puck[1] else self.west
            divisor = newPuck[1] - self.puck[1]
            slope = 0 if divisor == 0 else (newPuck[0] - self.puck[0]) / divisor
            self.puckSlopeList.append(int(round(slope, 0)))
            # Keep the two most recent slopes.
            self.puckSlopeList = self.puckSlopeList[-2:]
            # If the two most recent slopes are the same, take that as the slope.
            if len(self.puckSlopeList) > 1 and self.puckSlopeList[-1] == self.puckSlopeList[-2]:
                self.puckSlope = self.puckSlopeList[-1]
        self.puck = newPuck

        # Compute the projected intercept of the puck with the paddle column.
        # (Not possible until the paddle has been seen at least once.)
        if self.puckSlope is not None and self.puckDirection == self.east and self.paddle is not None:
            distance = self.paddle[1] - self.puck[1]
            self.puckIntercept = self.puck[0] + distance * self.puckSlope
            # If the intercept is above or below the frame of the board, fold it over.
            # Need three comparisons to allow for hitting both top and bottom.
            # This system counts rows from top down. yTop is smaller than yBottom.
            if self.puckIntercept < self.yTop:
                self.puckIntercept += 2 * (self.yTop - self.puckIntercept)
            if self.puckIntercept > self.yBottom:
                self.puckIntercept -= 2 * (self.puckIntercept - self.yBottom)
            if self.puckIntercept < self.yTop:
                self.puckIntercept += 2 * (self.yTop - self.puckIntercept)

    def getBlocks(self, obs):
        """
        Returns a list of blocks, sorted from left to right on the screen.
        Each block is ((row_top, row_bottom), (col_left, col_right)), with rows and
        cols counted after the top 30 rows and leftmost 15 columns are dropped.

        :param obs: An observation
        :return: [((int, int), (int, int))]
        """
        # Drop the top 30 rows and the leftmost 15 columns
        obs = np.asarray(obs)[30:, 15:]
        (rows, cols, _) = np.shape(obs)

        # Build a dictionary. The keys are cell colors. The values are the cells with those colors.
        partitions = self.partition(obs, rows, cols)

        # One block per color: the bounding box of all the cells with that color.
        blocks = [(self.minAndMax([row for (row, _) in cells]),
                   self.minAndMax([col for (_, col) in cells]))
                  for cells in partitions.values()]

        # Return the blocks sorted by column, i.e., from left to right on the screen.
        # Recall that rc is ((rowMin, rowMax), (colMin, colMax)). So rc[1] is (colMin, colMax)
        return sorted(blocks, key=(lambda rc: rc[1]))

    @staticmethod
    def mean(elts: tuple) -> float:
        """ The mean of elts, rounded to one decimal place. """
        return round(sum(elts) / len(elts), 1)

    @staticmethod
    def minAndMax(lst):
        """ (min, max) of lst, or None if lst is empty. """
        return (min(lst), max(lst)) if lst else None

    def paddleTargetBottom(self):
        return None if self.paddleTarget is None else self.paddleTarget + self.paddleHalfHeight

    def paddleTargetTop(self):
        return None if self.paddleTarget is None else self.paddleTarget - self.paddleHalfHeight

    @staticmethod
    def partition(obs, rows, cols):
        """
        Partition an np.array into lists, one for each element value.

        :param obs: An observation
        :param rows: The number of rows in the observation
        :param cols: The number of cols in the observation
        :return: A dictionary. The keys are the individual values in obs (as tuples).
                 The values are the (row, col) cells with that value, in row-major order.
        """
        field = np.asarray(obs)[:rows, :cols]
        # Use the first element of each row as the background to compare against.
        # A cell is foreground if any of its channels differs from that background.
        foreground = np.any(field != field[:, :1], axis=-1)
        components = {}
        for (r, c) in zip(*np.nonzero(foreground)):
            # Must transform the contents of the cells from arrays to tuples. Arrays may not
            # be used as dictionary keys. Tuples may.
            cell = tuple(field[r, c].tolist())
            components.setdefault(cell, []).append((int(r), int(c)))
        return components

    def puckInterceptBottom(self):
        return None if self.puckIntercept is None else self.puckIntercept + self.puckHalfHeight

    def puckInterceptTop(self):
        return None if self.puckIntercept is None else self.puckIntercept - self.puckHalfHeight

    def separation(self):
        """ The number of columns between the puck and the paddle, or None if either is unknown. """
        return None if self.paddle is None or self.puck is None else self.paddle[1]-self.puck[1]

    def separationStr(self):
        """ The separation drawn as e.g. '|--45--|', or '' if it is unknown. """
        sep = self.separation()
        if sep is None:
            return ''
        halfSep = '-'*int(round((sep/20)))
        return f'|{halfSep}{sep}{halfSep}|'

    def str(self):
        """
        A string representation of the important features of the world.

        :return: One trace line: puck row, slope and direction; the projected intercept;
                 paddle row, velocity and target; and the separation.
        """
        st0a = self.toStr(None if self.puck is None else self.puck[0])
        st0b = self.toStr(self.puckSlope)
        st0c = self.toStr(self.puckDirection)
        st1 = f'puck: [{(st0a)}] {st0b} {st0c} '
        st2 = f'{self.toStr(self.puckInterceptTop())} - {self.toStr(self.puckInterceptBottom())}; '
        # The paddle is unknown until the first frame that shows all three blocks.
        paddleRow = self.toStr(None if self.paddle is None else self.paddle[0])
        st3 = f'paddle: [{paddleRow}] {self.paddleVelocity} --> '
        st4 = f'{self.paddleTargetTop()} - {self.paddleTargetBottom()}; '
        st5 = self.separationStr()
        return st1 + st2 + st3 + st4 + st5

    def toStr(self, x):
        """
        :param x: Any value
        :return: the NoneStr ('**') if x is None; otherwise str(x)
        """
        return self.NoneStr if x is None else str(x)


class Agent:
    """
    The Agent generic player
    """

    # Apart from the always-skipped even steps, a still-moving paddle is moved
    # only on steps that are a multiple of this. Shared by Bot and QBot.
    moveFrequency = 3

    def __init__(self):
        self.puckInterceptTop = None
        self.puckInterceptBottom = None
        self.paddleTargetTop = None
        self.paddleTargetBottom = None
        self.paddle = None
        self.NoMove = 0
        self.Up = 2
        self.Down = 3

    def cacheInfo(self, dynInfo):
        """ Remember the latest known values from dynInfo. Unknown (None) values are skipped. """
        # Cache these values in case we lose track of the puck or the paddle.
        if dynInfo.puckInterceptTop() is not None:
            self.puckInterceptTop = dynInfo.puckInterceptTop()
        if dynInfo.puckInterceptBottom() is not None:
            self.puckInterceptBottom = dynInfo.puckInterceptBottom()
        if dynInfo.paddleTargetTop() is not None:
            self.paddleTargetTop = dynInfo.paddleTargetTop()
        if dynInfo.paddleTargetBottom() is not None:
            self.paddleTargetBottom = dynInfo.paddleTargetBottom()
        if dynInfo.paddle is not None and dynInfo.paddle[0] is not None:
            self.paddle = dynInfo.paddle[0]

    def _mustWait(self, stepCount, dynInfo):
        """ True if no move should be made on this step. """
        # Always leave at least one space between moves.
        if stepCount % 2 == 0:
            return True
        # If it's not our turn yet, the paddle is still moving and the puck is
        # still far away, do nothing. This gives the paddle a chance to settle down.
        separation = dynInfo.separation()
        return stepCount % self.moveFrequency != 0 and \
               dynInfo.paddleVelocity != 0 and \
               separation is not None and \
               separation > 50

    def _hasPaddleTarget(self):
        return self.paddleTargetTop is not None and self.paddleTargetBottom is not None

    def _hasPuckIntercept(self):
        return self.puckInterceptTop is not None and self.puckInterceptBottom is not None

    def _moveTowardCenter(self, dynInfo):
        """ The move that brings the paddle to the center of the field (more or less). """
        if not self._hasPaddleTarget() or dynInfo.yTop is None or dynInfo.yBottom is None:
            return self.NoMove
        if self.paddleTargetTop > dynInfo.yTop*0.45 + dynInfo.yBottom*0.55:
            return self.Up
        if self.paddleTargetBottom < dynInfo.yTop*0.55 + dynInfo.yBottom*0.45:
            return self.Down
        return self.NoMove


class Bot(Agent):
    """
    The Bot player
    """
    def __init__(self):
        self.moveFrequency = 3
        super().__init__()

    def makeMove(self, stepCount, dynInfo, reward):
        """
        :param stepCount: The number of the current step
        :param dynInfo: The DynamicInfo for the current observation
        :param reward: The reward from the previous step (not used)
        :return: self.Up, self.Down or self.NoMove
        """
        self.cacheInfo(dynInfo)

        if self._mustWait(stepCount, dynInfo):
            return self.NoMove

        # If puck is moving west, go to center (more or less).
        if dynInfo.puckDirection != dynInfo.east:
            return self._moveTowardCenter(dynInfo)

        # Nothing sensible can be done until the paddle target is known.
        if not self._hasPaddleTarget():
            return self.NoMove

        # These are the strategic moves: get the paddle to overlap the intercept.
        if self._hasPuckIntercept():
            if self.paddleTargetTop > self.puckInterceptBottom:
                return self.Up
            if self.paddleTargetBottom < self.puckInterceptTop:
                return self.Down

        # These are intended to prevent the paddle from going too far
        # out of the field. It doesn't seem to work.
        if dynInfo.yTop is not None and dynInfo.yBottom is not None:
            if self.paddleTargetBottom > dynInfo.yBottom + dynInfo.paddleHalfHeight/2:
                return self.Up
            if self.paddleTargetTop < dynInfo.yTop - dynInfo.paddleHalfHeight/2:
                return self.Down
        return self.NoMove


class QBot(Agent):
    """
    The QBot player. For now it follows fixed rules like Bot's; reward is not used.
    """
    def __init__(self):
        super().__init__()

    def makeMove(self, stepCount, dynInfo, reward):
        """
        :param stepCount: The number of the current step
        :param dynInfo: The DynamicInfo for the current observation
        :param reward: The reward from the previous step (not used)
        :return: self.Up (2), self.Down (3) or self.NoMove (0)
        """
        self.cacheInfo(dynInfo)

        if self._mustWait(stepCount, dynInfo):
            return self.NoMove

        # If puck is moving west, go to center (more or less).
        if dynInfo.puckDirection != dynInfo.east:
            return self._moveTowardCenter(dynInfo)

        # These are the strategic moves. They need both the intercept and the target.
        if not (self._hasPaddleTarget() and self._hasPuckIntercept()):
            return self.NoMove
        if self.puckInterceptTop > self.paddleTargetBottom:
            return self.Down
        if self.puckInterceptBottom < self.paddleTargetTop:
            return self.Up
        return self.NoMove


class Human(Agent):
    """
    The Human player
    """

    def makeMove(self, stepCount, dynInfo, reward):
        """ Ask the user for the move. Returns the raw text; GameControl validates it. """
        self.cacheInfo(dynInfo)
        return input(f'{stepCount}. >> ')


if __name__ == '__main__':
    # To have the bot play, use Bot() as second argument.
    # To have a human play, use Human() as second argument.
    # For a q-learning bot, use QBot() as second argument.
    GameControl('Pong-v0', Bot())