defuse_technique(self, tech): """ Use an exploration technique with this SimulationManager. Techniques can be found in :mod:`angr.exploration_techniques`. :param tech: An ExplorationTechnique object that contains code to modify this SimulationManager's behavior. :type tech: ExplorationTechnique :return: The technique that was added, for convenience """ ifnotisinstance(tech, ExplorationTechnique): raise SimulationManagerError
# XXX: as promised tech.project = self._project tech.setup(self)
defexplore(self, stash='active', n=None, find=None, avoid=None, find_stash='found', avoid_stash='avoid', cfg=None, num_find=1, **kwargs): """ Tick stash "stash" forward (up to "n" times or until "num_find" states are found), looking for condition "find", avoiding condition "avoid". Stores found states into "find_stash' and avoided states into "avoid_stash". The "find" and "avoid" parameters may be any of: - An address to find - A set or list of addresses to find - A function that takes a state and returns whether or not it matches. If an angr CFG is passed in as the "cfg" parameter and "find" is either a number or a list or a set, then any states which cannot possibly reach a success state without going through a failure state will be preemptively avoided. """ num_find += len(self._stashes[find_stash]) if find_stash in self._stashes else0 tech = self.use_technique(Explorer(find, avoid, find_stash, avoid_stash, cfg, num_find))
# Modify first Veritesting so that they can work together. deviation_filter_saved = None for t in self._techniques: ifisinstance(t,Veritesting): deviation_filter_saved = t.options.get("deviation_filter",None) if deviation_filter_saved isnotNone: t.options["deviation_filter"] = lambda s: tech.find(s) or tech.avoid(s) or deviation_filter_saved(s) else: t.options["deviation_filter"] = lambda s: tech.find(s) or tech.avoid(s) break
for t in self._techniques: ifisinstance(t,Veritesting): if deviation_filter_saved isNone: del t.options["deviation_filter"] else: t.options["deviation_filter"] = deviation_filter_saved break
defrun(self, stash='active', n=None, until=None, **kwargs): """ Run until the SimulationManager has reached a completed state, according to the current exploration techniques. If no exploration techniques that define a completion state are being used, run until there is nothing left to run. :param stash: Operate on this stash :param n: Step at most this many times :param until: If provided, should be a function that takes a SimulationManager and returns True or False. Stepping will terminate when it is True. :return: The simulation manager, for chaining. :rtype: SimulationManager """ for _ in (itertools.count() if n isNoneelserange(0, n)): ifnot self.complete() and self._stashes[stash]: self.step(stash=stash, **kwargs) ifnot (until and until(self)): continue break return self
defstep(self, stash='active', n=None, selector_func=None, step_func=None, successor_func=None, until=None, filter_func=None, **run_args): """ Step a stash of states forward and categorize the successors appropriately. The parameters to this function allow you to control everything about the stepping and categorization process. :param stash: The name of the stash to step (default: 'active') :param selector_func: If provided, should be a function that takes a state and returns a boolean. If True, the state will be stepped. Otherwise, it will be kept as-is. :param step_func: If provided, should be a function that takes a SimulationManager and returns a SimulationManager. Will be called with the SimulationManager at every step. Note that this function should not actually perform any stepping - it is meant to be a maintenance function called after each step. :param successor_func: If provided, should be a function that takes a state and return its successors. Otherwise, project.factory.successors will be used. :param filter_func: If provided, should be a function that takes a state and return the name of the stash, to which the state should be moved. :param until: (DEPRECATED) If provided, should be a function that takes a SimulationManager and returns True or False. Stepping will terminate when it is True. :param n: (DEPRECATED) The number of times to step (default: 1 if "until" is not provided) Additionally, you can pass in any of the following keyword args for project.factory.successors: :param jumpkind: The jumpkind of the previous exit :param addr: An address to execute at instead of the state's ip. :param stmt_whitelist: A list of stmt indexes to which to confine execution. :param last_stmt: A statement index at which to stop execution. :param thumb: Whether the block should be lifted in ARM's THUMB mode. :param backup_state: A state to read bytes from instead of using project memory. :param opt_level: The VEX optimization level to use. :param insn_bytes: A string of bytes to use for the block instead of the project. :param size: The maximum size of the block, in bytes. :param num_inst: The maximum number of instructions. :param traceflags: traceflags to be passed to VEX. Default: 0 :returns: The simulation manager, for chaining. :rtype: SimulationManager """ l.info("Stepping %s of %s", stash, self) # 8<----------------- Compatibility layer ----------------- if n isnotNoneor until isnotNone: if once('simgr_step_n_until'): print("\x1b[31;1mDeprecation warning: the use of `n` and `until` arguments is deprecated. " "Consider using simgr.run() with the same arguments if you want to specify " "a number of steps or an additional condition on when to stop the execution.\x1b[0m") return self.run(stash, n, until, selector_func=selector_func, step_func=step_func, successor_func=successor_func, filter_func=filter_func, **run_args) # ------------------ Compatibility layer ---------------->8 bucket = defaultdict(list)
for state in self._fetch_states(stash=stash):
goto = self.filter(state, filter_func=filter_func) ifisinstance(goto, tuple): goto, state = goto
if goto notin (None, stash): bucket[goto].append(state) continue
successors = self.step_state(state, successor_func=successor_func, **run_args) # handle degenerate stepping cases here. desired behavior: # if a step produced only unsat states, always add them to the unsat stash since this usually indicates a bug # if a step produced sat states and save_unsat is False, drop the unsats # if a step produced no successors, period, add the original state to deadended
# first check if anything happened besides unsat. that gates all this behavior ifnotany(v for k, v in successors.items() if k != 'unsat') andlen(self._errored) == pre_errored: # then check if there were some unsats if successors.get('unsat', []): # only unsats. current setup is acceptable. pass else: # no unsats. we've deadended. bucket['deadended'].append(state) continue else: # there were sat states. it's okay to drop the unsat ones if the user said so. ifnot self._save_unsat: successors.pop('unsat', None)
for to_stash, successor_states in successors.items(): bucket[to_stash or stash].extend(successor_states)
self._clear_states(stash=stash) for to_stash, states in bucket.items(): self._store_states(to_stash or stash, states)
if step_func isnotNone: return step_func(self) return self
defstep_state(self, state, successor_func=None, **run_args): """ Don't use this function manually - it is meant to interface with exploration techniques. """ try: successors = self.successors(state, successor_func=successor_func, **run_args) stashes = {None: successors.flat_successors, 'unsat': successors.unsat_successors, 'unconstrained': successors.unconstrained_successors} except: ... return stashes
由于step_state函数可能会发生很多错误,因此后续的代码是去做后继节点错误状态的处理。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
for state in self._fetch_states(stash=stash): ... #如果有后继节点有任何一个unsat状态或者发生了新的错误 ifnotany(v for k, v in successors.items() if k != 'unsat') andlen(self._errored) == pre_errored: #对于unsat状态,就先不管他 if successors.get('unsat', []): # only unsats. current setup is acceptable. pass else: #如果不是unsat,那说明遇到某些原因终止了,把该状态加到deadended的stash里去。 bucket['deadended'].append(state) continue else: # 如果没有设置保留unsat状态,就把后继节点的unsat状态丢出去。 ifnot self._save_unsat: successors.pop('unsat', None)
for state in self._fetch_states(stash=stash): ... for to_stash, successor_states in successors.items(): bucket[to_stash or stash].extend(successor_states)