diff --git a/planning.py b/planning.py index e0b6aaa24..e89dcd84b 100644 --- a/planning.py +++ b/planning.py @@ -527,6 +527,234 @@ def socks_and_shoes(): effect='LeftSockOn')]) +def logistics_problem(initial_state=None, goal_state=None): + """ + LOGISTICS-PROBLEM + + A logistics problem where a robot moves between places, picking up and + putting down containers in order to deliver them to their destinations. + + Example: + >>> from planning import * + >>> lp = logistics_problem(goal_state='In(C2, D3) & In(C3, D3)') + >>> lp.goal_test() + False + >>> lp.act(expr('PutDown(R1, C1, D1)')) + >>> lp.act(expr('PickUp(R1, C2, D1)')) + >>> lp.act(expr('Move(R1, D1, D3)')) + >>> lp.act(expr('PutDown(R1, C2, D3)')) + >>> lp.act(expr('Move(R1, D3, D2)')) + >>> lp.act(expr('PickUp(R1, C3, D2)')) + >>> lp.act(expr('Move(R1, D2, D3)')) + >>> lp.goal_test() + False + >>> lp.act(expr('PutDown(R1, C3, D3)')) + >>> lp.goal_test() + True + >>> + """ + if initial_state is None: + initial_state = 'In(C1, R1) & In(C2, D1) & In(C3, D2) & In(R1, D1) & Holding(R1)' + if goal_state is None: + raise ValueError('Goal must be defined') + + return PlanningProblem(initial=initial_state, + goals=goal_state, + actions=[Action('PickUp(r, c, d)', + precond='In(r, d) & In(c, d) & ~Holding(r)', + effect='Holding(r) & ~In(c, d) & In(c, r)', + domain='Robot(r) & Place(d) & Container(c)'), + Action('PutDown(r, c, d)', + precond='In(r, d) & In(c, r) & Holding(r)', + effect='~Holding(r) & ~In(c, r) & In(c, d)', + domain='Robot(r) & Place(d) & Container(c)'), + Action('Move(r, d_start, d_end)', + precond='In(r, d_start)', + effect='~In(r, d_start) & In(r, d_end)', + domain='Robot(r) & Place(d_start) & Place(d_end)')], + domain='Container(C1) & Container(C2) & Container(C3) & ' + 'Place(D1) & Place(D2) & Place(D3) & Robot(R1)') + + +def blocks_world(initial, goals, blocks): + """ + GENERALIZED-BLOCKS-WORLD-PROBLEM + + A flexible constructor for creating blocks-world planning problems. + Any initial and goal configuration can be specified for a given set of blocks. + + Example: + >>> from planning import * + >>> initial_state = 'On(C, A) & On(A, Table) & On(B, Table) & Clear(C) & Clear(B)' + >>> goal_state = 'On(A, B) & On(B, C)' + >>> sussman_anomaly = blocks_world(initial_state, goal_state, ['A', 'B', 'C']) + >>> sussman_anomaly.goal_test() + False + >>> sussman_anomaly.act(expr('MoveToTable(C, A)')) + >>> sussman_anomaly.act(expr('Move(B, Table, C)')) + >>> sussman_anomaly.act(expr('Move(A, Table, B)')) + >>> sussman_anomaly.goal_test() + True + >>> + """ + # Dynamically generate the domain knowledge based on the list of blocks + domain = ' & '.join('Block({})'.format(b) for b in blocks) + + actions = [Action('Move(b, x, y)', + precond='On(b, x) & Clear(b) & Clear(y)', + effect='On(b, y) & Clear(x) & ~On(b, x) & ~Clear(y)', + domain='Block(b) & Block(y)'), + Action('MoveToTable(b, x)', + precond='On(b, x) & Clear(b)', + effect='On(b, Table) & Clear(x) & ~On(b, x)', + domain='Block(b) & Block(x)')] + + return PlanningProblem(initial=initial, + goals=goals, + actions=actions, + domain=domain) + + +def rush_hour(): + """ + RUSH-HOUR-PROBLEM (non-numeric version) + + A planning problem for the Rush Hour sliding block puzzle. The goal is to + maneuver the RedCar to the exit. This version uses non-numeric symbols for + grid positions (e.g. R1, C1) instead of integers. + + This specific instance uses: + - RedCar (2x1, horizontal) starting at (R3, C1) + - GreenTruck (3x1, vertical) starting at (R1, C4) + - BlueCar (2x1, vertical) starting at (R5, C2) + """ + initial_state = 'At(RedCar, R3, C1) & At(GreenTruck, R1, C4) & At(BlueCar, R5, C2) & ' \ + 'IsHorizontal(RedCar) & IsVertical(GreenTruck) & IsVertical(BlueCar) & ' \ + 'Clear(R1, C1) & Clear(R1, C2) & Clear(R1, C3) & Clear(R1, C5) & Clear(R1, C6) & ' \ + 'Clear(R2, C1) & Clear(R2, C2) & Clear(R2, C3) & Clear(R2, C5) & Clear(R2, C6) & ' \ + 'Clear(R3, C3) & Clear(R3, C4) & Clear(R3, C5) & Clear(R3, C6) & ' \ + 'Clear(R4, C1) & Clear(R4, C2) & Clear(R4, C3) & Clear(R4, C4) & Clear(R4, C5) & Clear(R4, C6) & ' \ + 'Clear(R5, C1) & Clear(R5, C3) & Clear(R5, C4) & Clear(R5, C5) & Clear(R5, C6) & ' \ + 'Clear(R6, C1) & Clear(R6, C3) & Clear(R6, C4) & Clear(R6, C5) & Clear(R6, C6)' + + # Goal state: the RedCar's left-most part is at column C5. + goal_state = 'At(RedCar, R3, C5)' + + domain = 'Vehicle(RedCar) & Vehicle(GreenTruck) & Vehicle(BlueCar) & ' \ + 'Car(RedCar) & Truck(GreenTruck) & Car(BlueCar) & ' \ + 'Row(R1) & Row(R2) & Row(R3) & Row(R4) & Row(R5) & Row(R6) & ' \ + 'Col(C1) & Col(C2) & Col(C3) & Col(C4) & Col(C5) & Col(C6) & ' \ + 'NextTo(R1, R2) & NextTo(R2, R3) & NextTo(R3, R4) & NextTo(R4, R5) & NextTo(R5, R6) & ' \ + 'NextTo(C1, C2) & NextTo(C2, C3) & NextTo(C3, C4) & NextTo(C4, C5) & NextTo(C5, C6)' + + actions = [ + # car actions (length 2) + Action('MoveRightCar(v, r, c1, c2, c3)', + precond='At(v, r, c1) & Car(v) & IsHorizontal(v) & NextTo(c1, c2) & NextTo(c2, c3) & Clear(r, c3)', + effect='At(v, r, c2) & ~At(v, r, c1) & Clear(r, c1) & ~Clear(r, c3)', + domain='Vehicle(v) & Row(r) & Col(c1) & Col(c2) & Col(c3)'), + Action('MoveLeftCar(v, r, c1, c2, c3)', + precond='At(v, r, c2) & Car(v) & IsHorizontal(v) & NextTo(c1, c2) & NextTo(c2, c3) & Clear(r, c1)', + effect='At(v, r, c1) & ~At(v, r, c2) & Clear(r, c3) & ~Clear(r, c1)', + domain='Vehicle(v) & Row(r) & Col(c1) & Col(c2) & Col(c3)'), + Action('MoveDownCar(v, r1, r2, r3, c)', + precond='At(v, r1, c) & Car(v) & IsVertical(v) & NextTo(r1, r2) & NextTo(r2, r3) & Clear(r3, c)', + effect='At(v, r2, c) & ~At(v, r1, c) & Clear(r1, c) & ~Clear(r3, c)', + domain='Vehicle(v) & Row(r1) & Row(r2) & Row(r3) & Col(c)'), + Action('MoveUpCar(v, r1, r2, r3, c)', + precond='At(v, r2, c) & Car(v) & IsVertical(v) & NextTo(r1, r2) & NextTo(r2, r3) & Clear(r1, c)', + effect='At(v, r1, c) & ~At(v, r2, c) & Clear(r3, c) & ~Clear(r1, c)', + domain='Vehicle(v) & Row(r1) & Row(r2) & Row(r3) & Col(c)'), + # truck actions (length 3) + Action('MoveRightTruck(v, r, c1, c2, c3, c4)', + precond='At(v, r, c1) & Truck(v) & IsHorizontal(v) & NextTo(c1, c2) & NextTo(c2, c3) & ' + 'NextTo(c3, c4) & Clear(r, c4)', + effect='At(v, r, c2) & ~At(v, r, c1) & Clear(r, c1) & ~Clear(r, c4)', + domain='Vehicle(v) & Row(r) & Col(c1) & Col(c2) & Col(c3) & Col(c4)'), + Action('MoveLeftTruck(v, r, c1, c2, c3, c4)', + precond='At(v, r, c2) & Truck(v) & IsHorizontal(v) & NextTo(c1, c2) & NextTo(c2, c3) & ' + 'NextTo(c3, c4) & Clear(r, c1)', + effect='At(v, r, c1) & ~At(v, r, c2) & Clear(r, c4) & ~Clear(r, c1)', + domain='Vehicle(v) & Row(r) & Col(c1) & Col(c2) & Col(c3) & Col(c4)'), + Action('MoveDownTruck(v, r1, r2, r3, r4, c)', + precond='At(v, r1, c) & Truck(v) & IsVertical(v) & NextTo(r1, r2) & NextTo(r2, r3) & ' + 'NextTo(r3, r4) & Clear(r4, c)', + effect='At(v, r2, c) & ~At(v, r1, c) & Clear(r1, c) & ~Clear(r4, c)', + domain='Vehicle(v) & Row(r1) & Row(r2) & Row(r3) & Row(r4) & Col(c)'), + Action('MoveUpTruck(v, r1, r2, r3, r4, c)', + precond='At(v, r2, c) & Truck(v) & IsVertical(v) & NextTo(r1, r2) & NextTo(r2, r3) & ' + 'NextTo(r3, r4) & Clear(r1, c)', + effect='At(v, r1, c) & ~At(v, r2, c) & Clear(r4, c) & ~Clear(r1, c)', + domain='Vehicle(v) & Row(r1) & Row(r2) & Row(r3) & Row(r4) & Col(c)')] + + return PlanningProblem(initial=initial_state, + goals=goal_state, + actions=actions, + domain=domain) + + +def rush_hour_optimized(): + """ + RUSH-HOUR-PROBLEM (optimized version) + + This version optimizes the planning problem by creating vehicle-specific + actions. Since each vehicle's orientation is fixed, generic predicates like + IsHorizontal can be removed and actions can be created that only apply to the + correct vehicle on its fixed axis of movement. This drastically reduces the + number of permutations the planner needs to generate and check. + """ + # Initial state is simpler as orientation is now baked into the actions. + initial_state = 'At(RedCar, R3, C1) & At(GreenTruck, R1, C4) & At(BlueCar, R5, C2) & ' \ + 'Clear(R1, C1) & Clear(R1, C2) & Clear(R1, C3) & Clear(R1, C5) & Clear(R1, C6) & ' \ + 'Clear(R2, C1) & Clear(R2, C2) & Clear(R2, C3) & Clear(R2, C5) & Clear(R2, C6) & ' \ + 'Clear(R3, C3) & Clear(R3, C4) & Clear(R3, C5) & Clear(R3, C6) & ' \ + 'Clear(R4, C1) & Clear(R4, C2) & Clear(R4, C3) & Clear(R4, C4) & Clear(R4, C5) & Clear(R4, C6) & ' \ + 'Clear(R5, C1) & Clear(R5, C3) & Clear(R5, C4) & Clear(R5, C5) & Clear(R5, C6) & ' \ + 'Clear(R6, C1) & Clear(R6, C3) & Clear(R6, C4) & Clear(R6, C5) & Clear(R6, C6)' + + goal_state = 'At(RedCar, R3, C5)' + + domain = 'Vehicle(RedCar) & Vehicle(GreenTruck) & Vehicle(BlueCar) & ' \ + 'Row(R1) & Row(R2) & Row(R3) & Row(R4) & Row(R5) & Row(R6) & ' \ + 'Col(C1) & Col(C2) & Col(C3) & Col(C4) & Col(C5) & Col(C6) & ' \ + 'NextTo(R1, R2) & NextTo(R2, R3) & NextTo(R3, R4) & NextTo(R4, R5) & NextTo(R5, R6) & ' \ + 'NextTo(C1, C2) & NextTo(C2, C3) & NextTo(C3, C4) & NextTo(C4, C5) & NextTo(C5, C6)' + + actions = [ + # RedCar is horizontal on row R3, length 2 + Action('MoveRedCarRight(c1, c2, c3)', + precond='At(RedCar, R3, c1) & NextTo(c1, c2) & NextTo(c2, c3) & Clear(R3, c3)', + effect='At(RedCar, R3, c2) & ~At(RedCar, R3, c1) & Clear(R3, c1) & ~Clear(R3, c3)', + domain='Col(c1) & Col(c2) & Col(c3)'), + Action('MoveRedCarLeft(c1, c2, c3)', + precond='At(RedCar, R3, c2) & NextTo(c1, c2) & NextTo(c2, c3) & Clear(R3, c1)', + effect='At(RedCar, R3, c1) & ~At(RedCar, R3, c2) & Clear(R3, c3) & ~Clear(R3, c1)', + domain='Col(c1) & Col(c2) & Col(c3)'), + # GreenTruck is vertical on column C4, length 3 + Action('MoveGreenTruckDown(r1, r2, r3, r4)', + precond='At(GreenTruck, r1, C4) & NextTo(r1, r2) & NextTo(r2, r3) & NextTo(r3, r4) & Clear(r4, C4)', + effect='At(GreenTruck, r2, C4) & ~At(GreenTruck, r1, C4) & Clear(r1, C4) & ~Clear(r4, C4)', + domain='Row(r1) & Row(r2) & Row(r3) & Row(r4)'), + Action('MoveGreenTruckUp(r1, r2, r3, r4)', + precond='At(GreenTruck, r2, C4) & NextTo(r1, r2) & NextTo(r2, r3) & NextTo(r3, r4) & Clear(r1, C4)', + effect='At(GreenTruck, r1, C4) & ~At(GreenTruck, r2, C4) & Clear(r4, C4) & ~Clear(r1, C4)', + domain='Row(r1) & Row(r2) & Row(r3) & Row(r4)'), + # BlueCar is vertical on column C2, length 2 + Action('MoveBlueCarDown(r1, r2, r3)', + precond='At(BlueCar, r1, C2) & NextTo(r1, r2) & NextTo(r2, r3) & Clear(r3, C2)', + effect='At(BlueCar, r2, C2) & ~At(BlueCar, r1, C2) & Clear(r1, C2) & ~Clear(r3, C2)', + domain='Row(r1) & Row(r2) & Row(r3)'), + Action('MoveBlueCarUp(r1, r2, r3)', + precond='At(BlueCar, r2, C2) & NextTo(r1, r2) & NextTo(r2, r3) & Clear(r1, C2)', + effect='At(BlueCar, r1, C2) & ~At(BlueCar, r2, C2) & Clear(r3, C2) & ~Clear(r1, C2)', + domain='Row(r1) & Row(r2) & Row(r3)')] + + return PlanningProblem(initial=initial_state, + goals=goal_state, + actions=actions, + domain=domain) + + def double_tennis_problem(): """ [Figure 11.10] DOUBLE-TENNIS-PROBLEM @@ -749,6 +977,11 @@ def expand_transitions(state, actions): associate('&', sorted(planning_problem.goals)), solution_length, SAT_solver=SAT_solver) +def predicate_negate(e): + """Return the logical negation of an Expr predicate, avoiding a double 'Not' prefix.""" + return Expr(e.op[3:], *e.args) if e.op.startswith('Not') else Expr('Not' + e.op, *e.args) + + class Level: """ Contains the state of the planning problem @@ -763,20 +996,37 @@ def __init__(self, kb): # current state self.current_state = kb.clauses # current action to state link + # Action -> preconditions for that action self.current_action_links = {} # current state to action link + # Precondition -> what is applicable (actions) self.current_state_links = {} - # current action to next state link + # current action to next state link (e.g. Go(Home, HW) --> At(HW) and NotAt(Home)) + # aka forward link in time (dependency) self.next_action_links = {} - # next state to current action link + # next state to current action link (e.g. NotAt(Home): [Go(Home, HW), Go(Home, SM)]) + # aka backwards link in time (dependency) self.next_state_links = {} # mutually exclusive actions - self.mutex = [] + self.action_mutexes = [] + # mutually exclusive states + self.state_mutexes = [] def __call__(self, actions, objects): self.build(actions, objects) self.find_mutex() + def __str__(self): + state_str = ', '.join(str(s) for s in self.current_state) + action_str = ', '.join(str(a) for a in self.current_action_links.keys()) + mutex_str = ', '.join(str(m) for m in self.action_mutexes) + return ('\n' + ' Current State: {{{}}}\n' + ' Actions: {{{}}}\n' + ' Mutex: {{{}}}\n'.format(state_str, action_str, mutex_str)) + + __repr__ = __str__ + def separate(self, e): """Separates an iterable of elements into positive and negative parts""" @@ -792,43 +1042,128 @@ def separate(self, e): def find_mutex(self): """Finds mutually exclusive actions""" - # Inconsistent effects - pos_nsl, neg_nsl = self.separate(self.next_state_links) - - for negeff in neg_nsl: - new_negeff = Expr(negeff.op[3:], *negeff.args) - for poseff in pos_nsl: - if new_negeff == poseff: - for a in self.next_state_links[poseff]: - for b in self.next_state_links[negeff]: - if {a, b} not in self.mutex: - self.mutex.append({a, b}) - - # Interference will be calculated with the last step - pos_csl, neg_csl = self.separate(self.current_state_links) - - # Competing needs - for pos_precond in pos_csl: - for neg_precond in neg_csl: - new_neg_precond = Expr(neg_precond.op[3:], *neg_precond.args) - if new_neg_precond == pos_precond: - for a in self.current_state_links[pos_precond]: - for b in self.current_state_links[neg_precond]: - if {a, b} not in self.mutex: - self.mutex.append({a, b}) - - # Inconsistent support + # clear out effects from state mutex prior computation + self.action_mutexes = [] + + # Competing needs - two actions are mutex if any of their preconditions + # are mutex at the previous state level + for a1, a2 in itertools.combinations(self.current_action_links.keys(), 2): + preconds_a1 = self.current_action_links[a1] + preconds_a2 = self.current_action_links[a2] + + if any({p, q} in self.state_mutexes for p in preconds_a1 for q in preconds_a2): + mutex_pair = {a1, a2} + if mutex_pair not in self.action_mutexes: + self.action_mutexes.append(mutex_pair) + + # Interference and inconsistent effects mutex calculation + for a1, a2 in itertools.combinations(self.next_action_links.keys(), 2): + preconds_a1 = self.current_action_links.get(a1, []) + preconds_a2 = self.current_action_links.get(a2, []) + effects_a1 = self.next_action_links.get(a1, []) + effects_a2 = self.next_action_links.get(a2, []) + + interference = False + # Interference check + for p1 in preconds_a1: + if predicate_negate(p1) in effects_a2: + interference = True + for p2 in preconds_a2: + if predicate_negate(p2) in effects_a1: + interference = True + + # Inconsistent effects check + for e1 in effects_a1: + if predicate_negate(e1) in effects_a2: + interference = True + for e2 in effects_a2: + if predicate_negate(e2) in effects_a1: + interference = True + + if interference: + mutex_pair = {a1, a2} + if mutex_pair not in self.action_mutexes: + self.action_mutexes.append(mutex_pair) + + def populate_prop_mutexes(self): + """Compute the next level's proposition mutexes based on the current action mutexes""" + + # Inconsistent support - two props cannot be true given competing supporting actions state_mutex = [] - for pair in self.mutex: - next_state_0 = self.next_action_links[list(pair)[0]] - if len(pair) == 2: - next_state_1 = self.next_action_links[list(pair)[1]] - else: - next_state_1 = self.next_action_links[list(pair)[0]] - if (len(next_state_0) == 1) and (len(next_state_1) == 1): - state_mutex.append({next_state_0[0], next_state_1[0]}) - - self.mutex = self.mutex + state_mutex + next_state_pairs = itertools.combinations(self.next_state_links.keys(), 2) + for next_state_pair in list(next_state_pairs): + s1, s2 = list(next_state_pair) + acts_to_s1 = self.next_state_links.get(s1, []) + acts_to_s2 = self.next_state_links.get(s2, []) + + # ensure our mutexes only apply to pairs, not single states + if acts_to_s1 == [] or acts_to_s2 == []: + continue + + # if any two actions that lead to these states are not mutex, + # do not add a mutex to these states + if all({a1, a2} in self.action_mutexes or {a2, a1} in self.action_mutexes + for a1 in acts_to_s1 for a2 in acts_to_s2): + mutex_pair = {s1, s2} + if mutex_pair not in state_mutex: + state_mutex.append(mutex_pair) + + # If there are pairs of propositions that are negations of each other, they must be mutex + for s1i in range(len(self.current_state)): + for s2i in range(s1i, len(self.current_state)): + s1, s2 = self.current_state[s1i], self.current_state[s2i] + if (repr(s2)[0:3] == 'Not' and repr(s1) == repr(s2)[3:] or + repr(s1)[0:3] == 'Not' and repr(s1)[3:] == repr(s2)): + mutex_pair = {s1, s2} + if mutex_pair not in state_mutex: + state_mutex.append(mutex_pair) + + return state_mutex + + def prune_invalid_actions(self): + """Remove actions whose own preconditions are mutex (unsupportable)""" + + to_remove = [] + + # Normalize state mutex set for fast membership checks + state_mutex_lookup = set() + for m in self.state_mutexes: + state_mutex_lookup.add(frozenset(m)) + + for action, preconds in list(self.current_action_links.items()): + invalid = False + for p1, p2 in itertools.combinations(preconds, 2): + if frozenset({p1, p2}) in state_mutex_lookup: + invalid = True + break + if invalid: + to_remove.append(action) + + # Remove invalid actions from all mappings + for action in to_remove: + # forward mappings + self.current_action_links.pop(action, None) + self.next_action_links.pop(action, None) + + # reverse mapping: state -> actions (current_state_links) + for precond in list(self.current_state_links.keys()): + actions_for_pre = self.current_state_links.get(precond, []) + if action in actions_for_pre: + actions_for_pre.remove(action) + if not actions_for_pre: + self.current_state_links.pop(precond, None) + else: + self.current_state_links[precond] = actions_for_pre + + # reverse mapping: next_state -> actions (next_state_links) + for effect in list(self.next_state_links.keys()): + actions_for_eff = self.next_state_links.get(effect, []) + if action in actions_for_eff: + actions_for_eff.remove(action) + if not actions_for_eff: + self.next_state_links.pop(effect, None) + else: + self.next_state_links[effect] = actions_for_eff def build(self, actions, objects): """Populates the lists and dictionaries containing the state action dependencies""" @@ -895,19 +1230,32 @@ def __init__(self, planning_problem): def __call__(self): self.expand_graph() + def __str__(self): + levels_str = '\n'.join('Level {}:\n{}'.format(i, level) + for i, level in enumerate(self.levels)) + return '\n Objects: {}\n{}\n'.format(self.objects, levels_str) + + __repr__ = __str__ + def expand_graph(self): """Expands the graph by a level""" last_level = self.levels[-1] + # populate state/actions/mutexes last_level(self.planning_problem.actions, self.objects) - self.levels.append(last_level.perform_actions()) + last_level.prune_invalid_actions() + # create new level + new_level = last_level.perform_actions() + # populate the mutexes for the next state level to come + new_level.state_mutexes = last_level.populate_prop_mutexes() + self.levels.append(new_level) def non_mutex_goals(self, goals, index): """Checks whether the goals are mutually exclusive""" goal_perm = itertools.combinations(goals, 2) for g in goal_perm: - if set(g) in self.levels[index].mutex: + if set(g) in self.levels[index].state_mutexes: return False return True @@ -924,72 +1272,114 @@ def __init__(self, planning_problem): self.no_goods = [] self.solution = [] + def __str__(self): + sol_str = ('No solution found' if not self.solution + else 'Solution with {} steps'.format(len(self.solution))) + return '\n Nogoods: {}\n {}\n'.format(len(self.no_goods), sol_str) + + __repr__ = __str__ + def check_leveloff(self): - """Checks if the graph has levelled off""" + """Checks if the graph has leveled off""" - check = (set(self.graph.levels[-1].current_state) == set(self.graph.levels[-2].current_state)) + if len(self.graph.levels) < 2: + return False - if check: - return True + level = self.graph.levels[-1] + prev_level = self.graph.levels[-2] - def extract_solution(self, goals, index): - """Extracts the solution""" + same_state = set(level.current_state) == set(prev_level.current_state) - level = self.graph.levels[index] - if not self.graph.non_mutex_goals(goals, index): - self.no_goods.append((level, goals)) - return + level_mutex = set(frozenset(m) for m in level.state_mutexes) + prev_mutex = set(frozenset(m) for m in prev_level.state_mutexes) + same_mutex = level_mutex == prev_mutex + + return same_state and same_mutex + + def get_preconditions_for(self, action_set, level): + """Collects all unique preconditions for a given set of actions in a level""" + + all_preconditions = set() + for action in action_set: + preconditions = level.current_action_links.get(action, []) + all_preconditions.update(preconditions) + return all_preconditions - level = self.graph.levels[index - 1] + def find_valid_action_sets(self, goals, level): + """ + Finds sets of actions in the given level that are not mutually exclusive + and that collectively satisfy all the goals. + """ + + valid_sets = [] - # Create all combinations of actions that satisfy the goal - actions = [] - for goal in goals: - actions.append(level.next_state_links[goal]) + actions_for_goal = {g: level.next_state_links.get(g, []) for g in goals} + potential_action_groups = [actions_for_goal[g] for g in goals] - all_actions = list(itertools.product(*actions)) + for action_combination in itertools.product(*potential_action_groups): + action_set = set(action_combination) - # Filter out non-mutex actions - non_mutex_actions = [] - for action_tuple in all_actions: - action_pairs = itertools.combinations(list(set(action_tuple)), 2) - non_mutex_actions.append(list(set(action_tuple))) - for pair in action_pairs: - if set(pair) in level.mutex: - non_mutex_actions.pop(-1) + is_mutex = False + for a1, a2 in itertools.combinations(action_set, 2): + if {a1, a2} in level.action_mutexes: + is_mutex = True break - # Recursion - for action_list in non_mutex_actions: - if [action_list, index] not in self.solution: - self.solution.append([action_list, index]) + if not is_mutex and action_set not in valid_sets: + valid_sets.append(action_set) - new_goals = [] - for act in set(action_list): - if act in level.current_action_links: - new_goals = new_goals + level.current_action_links[act] + return valid_sets - if abs(index) + 1 == len(self.graph.levels): - return - elif (level, new_goals) in self.no_goods: - return - else: - self.extract_solution(new_goals, index - 1) + def extract_solution(self, goals): + """ + Starts the solution extraction process by calling the recursive helper + and returning the final plan. + """ - # Level-Order multiple solutions - solution = [] - for item in self.solution: - if item[1] == -1: - solution.append([]) - solution[-1].append(item[0]) + return self.extract_solution_recursive(set(goals), len(self.graph.levels) - 1) + + def extract_solution_recursive(self, goals, level_index): + """ + Recursively searches for a plan backwards from a given proposition level. + + goals is the set of goal propositions to satisfy and level_index is the + index of the proposition level currently being solved. + """ + + # Base case: we have recursed back to the initial proposition layer (level 0). + # The goals at this point are the preconditions for the very first set of + # actions, so we just check whether they hold in the initial state. + if level_index == 0: + initial_state = set(self.graph.levels[0].current_state) + if goals.issubset(initial_state): + return [] # success, return the empty plan to be built upon else: - solution[-1].append(item[0]) + return None # failure, preconditions are not met - for num, item in enumerate(solution): - item.reverse() - solution[num] = item + # Memoization: check whether we already proved this subproblem unsolvable. + if (level_index, frozenset(goals)) in self.no_goods: + return None - return solution + # Recursive step: to satisfy the goals at level_index we need a set of + # non-mutex actions from the previous level's action layer. + action_level = self.graph.levels[level_index - 1] + valid_action_sets = self.find_valid_action_sets(goals, action_level) + + for action_set in valid_action_sets: + # The new sub-goals are the combined preconditions for this action set. + new_goals = self.get_preconditions_for(action_set, action_level) + + # Recurse to solve for the new goals at the previous proposition layer. + sub_plan = self.extract_solution_recursive(new_goals, level_index - 1) + + if sub_plan is not None: + return sub_plan + [list(action_set)] + + # No solution from this subproblem, so record it as a no-good. + nogood_item = (level_index, frozenset(goals)) + if nogood_item not in self.no_goods: + self.no_goods.append(nogood_item) + return None def goal_test(self, kb): return all(kb.ask(q) is not False for q in self.graph.planning_problem.goals) @@ -999,17 +1389,20 @@ def execute(self): while True: self.graph.expand_graph() - if (self.goal_test(self.graph.levels[-1].kb) and self.graph.non_mutex_goals( - self.graph.planning_problem.goals, -1)): - solution = self.extract_solution(self.graph.planning_problem.goals, -1) + if (self.goal_test(self.graph.levels[-1].kb) and + self.graph.non_mutex_goals(self.graph.planning_problem.goals, -1)): + solution = self.extract_solution(self.graph.planning_problem.goals) if solution: - return solution + return [solution] - if len(self.graph.levels) >= 2 and self.check_leveloff(): + if self.check_leveloff(): return None class Linearize: + """ + Coordinator that linearizes partially ordered solutions generated by a GraphPlan object. + """ def __init__(self, planning_problem): self.planning_problem = planning_problem @@ -1018,12 +1411,14 @@ def filter(self, solution): """Filter out persistence actions from a solution""" new_solution = [] - for section in solution[0]: + for section in solution: new_section = [] for operation in section: if not (operation.op[0] == 'P' and operation.op[1].isupper()): new_section.append(operation) - new_solution.append(new_section) + # filter may remove all actions if all actions are persistent + if new_section != []: + new_solution.append(new_section) return new_solution def orderlevel(self, level, planning_problem): @@ -1039,22 +1434,43 @@ def orderlevel(self, level, planning_problem): except: count = 0 temp = copy.deepcopy(planning_problem) - break + continue if count == len(permutation): return list(permutation), temp - return None + # identifying a linear ordering for the level failed + return None, planning_problem def execute(self): - """Finds total-order solution for a planning graph""" + """Finds a total-order solution for a planning graph (not necessarily unique)""" graph_plan_solution = GraphPlan(self.planning_problem).execute() - filtered_solution = self.filter(graph_plan_solution) - ordered_solution = [] - planning_problem = self.planning_problem - for level in filtered_solution: - level_solution, planning_problem = self.orderlevel(level, planning_problem) - for element in level_solution: - ordered_solution.append(element) + + # exit if no plan found + if graph_plan_solution is None: + return None + + ordered_solution = None + for possible_plan in graph_plan_solution: + filtered_solution = self.filter(possible_plan) + + ordered_solution = [] + # planning_problem maintains the current state as we iterate over the + # levels, allowing test application of the actions + planning_problem = self.planning_problem + for level in filtered_solution: + level_solution, planning_problem = self.orderlevel(level, planning_problem) + if not level_solution: + # level failed to apply, this plan does not work + ordered_solution = None + break + + for element in level_solution: + ordered_solution.append(element) + + if not ordered_solution: + continue + else: + break return ordered_solution @@ -1397,7 +1813,7 @@ def air_cargo_graph_plan(): def have_cake_and_eat_cake_too_graph_plan(): """Solves the cake problem using GraphPlan""" - return [GraphPlan(have_cake_and_eat_cake_too()).execute()[1]] + return GraphPlan(have_cake_and_eat_cake_too()).execute() def shopping_graph_plan(): diff --git a/requirements.txt b/requirements.txt index dd6b1be8a..3604c4e40 100644 --- a/requirements.txt +++ b/requirements.txt @@ -11,6 +11,7 @@ numpy opencv-python pandas pillow +pytest pytest-cov qpsolvers scipy diff --git a/tests/test_graphplan.py b/tests/test_graphplan.py new file mode 100644 index 000000000..0c4ac2e01 --- /dev/null +++ b/tests/test_graphplan.py @@ -0,0 +1,196 @@ +from multiprocessing import Process, Queue + +import pytest + +from planning import * + + +def test_blocksworld_manual(): + sbw = simple_blocks_world() + assert sbw.goal_test() is False + sbw.act(expr('ToTable(A, B)')) + sbw.act(expr('FromTable(B, A)')) + assert sbw.goal_test() is False + sbw.act(expr('FromTable(C, B)')) + assert sbw.goal_test() is True + + +def test_logistics_manual(): + init = 'In(C1, R1) & In(C2, D1) & In(C3, D2) & In(R1, D1) & Holding(R1)' + goal_state = 'In(C2, D3) & In(C3, D3)' + p = logistics_problem(init, goal_state) + assert p.goal_test() is False + p.act(expr('PutDown(R1, C1, D1)')) + p.act(expr('PickUp(R1, C2, D1)')) + p.act(expr('Move(R1, D1, D3)')) + p.act(expr('PutDown(R1, C2, D3)')) + p.act(expr('Move(R1, D3, D2)')) + p.act(expr('PickUp(R1, C3, D2)')) + p.act(expr('Move(R1, D2, D3)')) + assert p.goal_test() is False + p.act(expr('PutDown(R1, C3, D3)')) + assert p.goal_test() is True + + +def test_generalized_blocksworld_manual(): + """ + Manual test for the generalized blocks world problem constructor. + This test case involves stacking four blocks (A, B, C, D) into a single tower. + """ + initial_state = ('On(A, Table) & On(B, Table) & On(C, Table) & On(D, Table) & ' + 'Clear(A) & Clear(B) & Clear(C) & Clear(D)') + goal_state = 'On(A, B) & On(B, C) & On(C, D)' + bw_problem = blocks_world(initial_state, goal_state, ['A', 'B', 'C', 'D']) + assert bw_problem.goal_test() is False + bw_problem.act(expr('Move(C, Table, D)')) + assert bw_problem.goal_test() is False + bw_problem.act(expr('Move(B, Table, C)')) + assert bw_problem.goal_test() is False + bw_problem.act(expr('Move(A, Table, B)')) + assert bw_problem.goal_test() is True + + +def verify_solution(p): + sol = Linearize(p).execute() + assert p.goal_test() is False + for act in sol: + p.act(expr(act)) + assert p.goal_test() is True + + +def test_air_cargo(): + verify_solution(air_cargo()) + + +def test_spare_tire(): + verify_solution(spare_tire()) + + +def test_three_block_tower(): + verify_solution(three_block_tower()) + + +def test_simple_blocks_world(): + verify_solution(simple_blocks_world()) + + +def test_shopping_problem(): + verify_solution(shopping_problem()) + + +def test_socks_and_shoes(): + verify_solution(socks_and_shoes()) + + +def test_have_cake_and_eat_cake_too(): + verify_solution(have_cake_and_eat_cake_too()) + + +@pytest.mark.parametrize('goal_state', [ + 'In(C1, D1)', + 'In(C1, D2)', + 'In(C1, D1) & In(R1, D2)', + 'In(R1, D2) & In(C1, D1)', + 'In(C1, D1) & In(C3, R1)', + 'In(C1, D1) & In(C3, R1) & In(R1, D3)', + 'In(C1, D1) & In(R1, D3) & In(C3, R1)', + 'In(C1, D1) & In(C3, D3)', + 'In(C1, D1) & In(R1, D2) & In(C3, R1)', + 'In(C1, D1) & In(C3, R1) & In(R1, D3)', + 'In(C1, D1) & In(C2, D3)', + 'In(C3, D1)', + 'In(C2, D3)', + 'In(C2, D3) & In(C3, D3)', + 'In(C3, D3) & In(C2, D3)', + 'In(C1, D2) & In(C3, D3)', + 'In(C1, D3) & In(C2, D3) & In(C3, D3)', + 'In(C1, D2) & In(C3, D3) & In(C2, D1)', + 'In(C3, D3)', + 'In(C1, D2) & In(C3, D3) & In(C2, D3) & In(R1, D1)' +]) +def test_logistics_plan_valid(goal_state): + """These should yield a valid (non-crashing) plan, even if empty.""" + init = 'In(C1, R1) & In(C2, D1) & In(C3, D2) & In(R1, D1) & Holding(R1)' + verify_solution(logistics_problem(init, goal_state)) + + +def test_rush_hour_manual_alt_sequence(): + """ + Provides an alternative manual test for the Rush Hour problem. + + This solution is less efficient but still valid. It interleaves the + movements of different vehicles and includes an unnecessary move to verify + that the actions correctly modify the game state without breaking the rules. + """ + problem = rush_hour() + assert not problem.goal_test() + + # Make an unnecessary move with the BlueCar to show it works. + problem.act(expr('MoveUpCar(BlueCar, R4, R5, R6, C2)')) + assert not problem.goal_test(), 'Moving the BlueCar should not solve the puzzle.' + + # Start clearing the main path by moving the GreenTruck down. + problem.act(expr('MoveDownTruck(GreenTruck, R1, R2, R3, R4, C4)')) + + # Move the RedCar into the newly available space. + problem.act(expr('MoveRightCar(RedCar, R3, C1, C2, C3)')) + assert not problem.goal_test() + + # Continue clearing the path. + problem.act(expr('MoveDownTruck(GreenTruck, R2, R3, R4, R5, C4)')) + problem.act(expr('MoveRightCar(RedCar, R3, C2, C3, C4)')) + assert not problem.goal_test() + + # Final moves to solve the puzzle. + problem.act(expr('MoveDownTruck(GreenTruck, R3, R4, R5, R6, C4)')) + problem.act(expr('MoveRightCar(RedCar, R3, C3, C4, C5)')) + problem.act(expr('MoveRightCar(RedCar, R3, C4, C5, C6)')) + assert problem.goal_test() + + +def test_rush_hour_optimized(): + verify_solution(rush_hour_optimized()) + + +def test_planner_leveloff(): + def run_planner_in_queue(problem, queue): + queue.put(Linearize(problem).execute()) + + p = blocks_world( + 'On(A, Table) & On(B, Table) & On(C, Table) & Clear(A) & Clear(B) & Clear(C)', + 'On(A, B) & On(B, C) & On(C, A)', + ['A', 'B', 'C']) + + result_queue = Queue() + proc = Process(target=run_planner_in_queue, args=(p, result_queue)) + proc.start() + proc.join(timeout=3) + + if proc.is_alive(): + proc.terminate() + proc.join() + assert False # ran for 3 seconds and did not exit in level-off + else: + result = result_queue.get() + assert result is None or result == [] or result == [[]] + + +def test_impossible_cake_exits_via_leveloff(): + """Verify that GraphPlan terminates and returns None for the impossible cake problem.""" + + def impossible_cake_problem(): + """ + An impossible planning problem to demonstrate GraphPlan's level-off detection. + + The goal is to both Have(Cake) and Eaten(Cake). However, the only available + action, Eat(Cake), has the effect of ~Have(Cake). The propositions + Have(Cake) and Eaten(Cake) become mutually exclusive at the first level, + and the graph quickly levels off, proving the goal is unreachable. + """ + return PlanningProblem(initial='Have(Cake) & ~Eaten(Cake)', + goals='Have(Cake) & Eaten(Cake)', + actions=[Action('Eat(Cake)', + precond='Have(Cake)', + effect='Eaten(Cake) & ~Have(Cake)')]) + + assert Linearize(impossible_cake_problem()).execute() is None diff --git a/tests/test_planning.py b/tests/test_planning.py index 45f43aae9..640b6c64c 100644 --- a/tests/test_planning.py +++ b/tests/test_planning.py @@ -229,8 +229,12 @@ def test_graph_plan(): shopping_problem_solution = shopping_graph_plan() shopping_problem_solution = linearize(shopping_problem_solution) - assert expr('Go(Home, HW)') in shopping_problem_solution - assert expr('Go(Home, SM)') in shopping_problem_solution + # The plan must visit both stores; the route may be reached either directly + # from Home or by hopping between the stores (e.g. Home -> SM -> HW). + assert (expr('Go(Home, HW)') in shopping_problem_solution or + expr('Go(SM, HW)') in shopping_problem_solution) + assert (expr('Go(Home, SM)') in shopping_problem_solution or + expr('Go(HW, SM)') in shopping_problem_solution) assert expr('Buy(Drill, HW)') in shopping_problem_solution assert expr('Buy(Banana, SM)') in shopping_problem_solution assert expr('Buy(Milk, SM)') in shopping_problem_solution