Source code for synbioweaver.core

from abc import ABCMeta, abstractmethod
import inspect
import warnings
import re
import types


[docs]class ExecutionNode(object): """Abstract Superclass for all nodes in an execution flow, e.g. parts, molecules Parts are the atomistic instructions in the genetic circuit execution flow | *Attributes:* | before : [ExecutionNode] | A list of ExecutionNode objects that precede this node | after : [ExeuctionNode] | A list of ExecutionNode objects that follow this node | scope : class(WeaverOutput) | What woven system is this node part of | additionStack : [Circuit or Aspect] | stack of Aspects and Circuits which are executing to add | this node """ __metaclass__ = ABCMeta def __init__(self): self.before = [] self.after = [] self.scope = None self.additionStack = None;
[docs] def getBeforeNodes(self, filterType=object): """Returns all nodes immediately before this node in the execution flow, with the option to filter by node type | *Args:* | filterType: Filters the list to only include nodes of a given (super)type """ result = [] for executionNode in self.before: if isinstance(executionNode, filterType): result.append(executionNode) return result
[docs] def getAfterNodes(self, filterType): """Returns all nodes immediately after this node in the execution flow, with the option to filter by node type | *Args:* | filterType: Filters the list to only include nodes of a given (super)type """ result = [] for executionNode in self.after: if isinstance(executionNode, filterType): result.append(executionNode) return result
# Parts are always used as objects
[docs]class Part(ExecutionNode): """Abstract Superclass for all Parts Parts are the atomistic instructions in the genetic circuit execution flow, e.g. Promoters, RBSs, Terminators, Coding Regions They inherit their functionality as nodes in the execution flow from their superclass ExecutionNode """ __metaclass__ = ABCMeta def __init__(self): super(Part, self).__init__() self.precompileMoleculesBefore = [] self.precompileMoleculesAfter = []
[docs] def setBeforePart(self, part): """Inserts a part before this part in the execution flow. If there's already a part in front of this part, the part to be inserted will be inserted between the two parts presently connected | *Args:* | part: the Part to be inserted """ oldPart = self.getBeforePart() if oldPart != None: self.before.remove(oldPart) self.before.append(part)
[docs] def setAfterPart(self, part): """Inserts a part after this part in the execution flow. If there's already a part behind this part, the part to be inserted will be inserted between the two parts presently connected | *Args:* | part: the Part to be inserted """ oldPart = self.getAfterPart() if oldPart != None: self.after.remove(oldPart) self.after.append(part)
[docs] def getBeforePart(self): """Returns the part just before this part in the execution flow """ for executionNode in self.before: if isinstance(executionNode, Part): return executionNode return None
[docs] def getAfterPart(self): """Returns the part just after this part in the execution flow """ for executionNode in self.after: if isinstance(executionNode, Part): return executionNode return None
[docs] def weave(self, weaver): """Internal method used during the weaving process: Facilitates molecule scope checking """ for beforeMol in self.precompileMoleculesBefore: mol = weaver.getMoleculeObject(self.scope, beforeMol) self.before.append(mol) mol.after.append(self) for afterMol in self.precompileMoleculesAfter: mol = weaver.getMoleculeObject(self.scope, afterMol, True) self.after.append(mol) mol.before.append(self)
def __str__(self): return self.__class__.__name__
[docs]class Molecule(ExecutionNode): """Abstract Superclass for all Molecules Molecules are the nodes in the execution flow, for example: reactions: (MoleculeA+MoleculeB -> MoleculeC), translation: CodingRegion->MoleculeA, inducing: MoleculeA -> RegulatdPromoter, membrane transfer: MoleculeA (Compartment) -> MoleculeA (Outside Compartment) These nodes in the execution flow don't mean an existence of an element, but rather describe the information flow within the system. """ def __str__(self): return self.__class__.__name__
[docs]class Protein(Molecule): """Class for Molecules that are Proteins""" pass
[docs]class Promoter(Part): """Class for Parts that are Promoters""" pass
[docs]class ConstitutivePromoter(Promoter): """Class for Promoters that are Constitutive Promoters""" pass
[docs]def checkIfTypeReturnInstance(possibleType): """if the parameter is a type, try to return an instance | *Args:* | possibleType - a parameter which may be an instance or a type | *Raises:* | PartInitializationError - if the parameter is a type which can not be constructed | *Returns:* | An instance of the type of possibleType """ if isinstance(possibleType, type): try: return possibleType() except: raise PartInitializationError(str(possibleType) + " can not be initialized without Parameters.") return possibleType
[docs]def checkAndSetMolecule(molecule): """checks if parameter is a class of type Molecule and returns it | *Args:* | molecule - A potential Molecule | *Raises:* | MoleculeValue - If molecule is not a class of type Molecule | *Returns:* | A class of type Molecule """ if inspect.isclass(molecule) and issubclass(molecule, Molecule): return molecule else: raise MoleculeValueError("molecule must be a class of (sub)type Molecule")
[docs]class NegativePromoter(Promoter): """Class for Promoters that are Negative Promoters""" def __init__(self, regulatedBy): """NegativePromoter Constructor | *Args:* | regulatedBy: sets moleculeConnection """ super(NegativePromoter, self).__init__() self.precompileMoleculesBefore.append(checkAndSetMolecule(regulatedBy))
[docs] def getRegulatedBy(self): """Returns a list of Molecules regulating this Promoter""" result = self.getBeforeNodes(Molecule) if len(result) > 0: return result # else: not compiled yet return self.precompileMoleculesBefore
def __str__(self): tmpstr = super(NegativePromoter, self).__str__() + '(regulatedBy = ' tmpstr += 'm'.join(str(self.getBeforeNodes(Molecule)[i]) for i in range(len(self.getBeforeNodes(Molecule)))) tmpstr += ')' return tmpstr
[docs]class PositivePromoter(Promoter): """Class for Promoters that are Positive Promoters""" def __init__(self, regulatedBy): """PositivePromoter Constructor | *Args:* | regulatedBy: sets moleculeConnection """ super(PositivePromoter, self).__init__() self.precompileMoleculesBefore.append(checkAndSetMolecule(regulatedBy))
[docs] def getRegulatedBy(self): """Returns a list of Molecules regulating this Promoter""" result = self.getBeforeNodes(Molecule) if len(result) > 0: return result # else: not compiled yet return self.precompileMoleculesBefore
def __str__(self): tmpstr = super(PositivePromoter, self).__str__() + '(regulatedBy = ' tmpstr += ','.join(str(self.getBeforeNodes(Molecule)[i]) for i in range(len(self.getBeforeNodes(Molecule)))) tmpstr += ')' return tmpstr
[docs]class HybridPromoter(Promoter): """Class for Promoters that are Hybrid Promoters, i.e. with several repressing / inducing operator sites like other regulated Promoters (NegativePromoter, PostiivePromoter), its regulators are accessible by getRegulatedBy(). Additionally, getInducers(), getRepressors(), and isInducer(molecule), isRepressor(molecule) give information about the functionality of the Promoter.""" def __init__(self, regulatedBy, regulatorInfo): """PositivePromoter Constructor | *Args:* | regulatedBy: [] list of regulator Molecules | regulatorInfo: Dictionary of the form {Molecule: Boolean}, where True: Induce, False: Repress """ cleanRegulatorInfo = {} if not type(regulatorInfo) is dict: raise ValueError("regulatorInfo must be a dictionary in the form of {Molecule: Boolean}") super(HybridPromoter, self).__init__() for mol in regulatedBy: self.precompileMoleculesBefore.append(checkAndSetMolecule(mol)) # todo try catch / raise exception if mol not in regulatorInfo cleanRegulatorInfo[mol] = regulatorInfo[mol] self.regulatorInfo = cleanRegulatorInfo def __str__(self): def printRegulatorInfo(mol): if self.regulatorInfo[mol.__class__] is True: return 'inducer:' else: return 'repressor:' tmpstr = super(HybridPromoter, self).__str__() + '(regulatedBy = ' tmpstr += ','.join( printRegulatorInfo(self.getBeforeNodes(Molecule)[i]) + str(self.getBeforeNodes(Molecule)[i]) for i in range(len(self.getBeforeNodes(Molecule)))) tmpstr += ')' return tmpstr
[docs] def getRegulatedBy(self): """Returns a list of Molecules regulating this Promoter""" result = self.getBeforeNodes(Molecule) if len(result) > 0: return result # else: not compiled yet return self.precompileMoleculesBefore
[docs] def getInducers(self): """Returns a list of positively regulating Molecules """ regulators = self.getRegulatedBy() ret = [] for mol in regulators: if self.regulatorInfo[mol.__class__] is True: ret.append(mol) return ret
[docs] def getRepressors(self): """Returns a list of negatively regulating Molecules """ regulators = self.getRegulatedBy() ret = [] for mol in regulators: if self.regulatorInfo[mol.__class__] is False: ret.append(mol) return ret
[docs] def isInducer(self, molecule): if inspect.isclass(molecule): if self.regulatorInfo[molecule] is True: return True elif self.regulatorInfo[molecule.__class__] is True: return True
[docs] def isRepressor(self, molecule): if inspect.isclass(molecule): if self.regulatorInfo[molecule] is False: return True elif self.regulatorInfo[molecule.__class__] is False: return True
[docs]class CodingRegion(Part): """Class for parts that are Coding Regions""" def __init__(self, codesFor): """Constructor for a Coding Region Part | *Args:* | codesFor: sets moleculeConnection """ super(CodingRegion, self).__init__() self.precompileMoleculesAfter.append(checkAndSetMolecule(codesFor)) # todo : is this necessary? # def __del__(self): # self.codesFor.before.remove(self)
[docs] def getCodingFor(self): result = self.getAfterNodes(Molecule) if len(result) > 0: return result #else: not combiled yet return self.precompileMoleculesAfter
def __str__(self): tmpstr = super(CodingRegion, self).__str__() + '(codesFor = ' tmpstr += 'm'.join(str(self.getAfterNodes(Molecule)[i]) for i in range(len(self.getAfterNodes(Molecule)))) tmpstr += ')' return tmpstr
[docs]class RBS(Part): """Class for parts that are Ribosome BindingSites""" def __init__(self): super(RBS,self).__init__()
[docs]class Terminator(Part): """Class for parts that are Terminators""" def __init__(self): super(Terminator,self).__init__()
[docs]class Circuit(Part): """Abstract class for a genetic part circuit. | Will generally be used to represent a design's core concerns | Additionally, a circuit can be used to represent composite parts | *Attributes:* | weaver: Weaver | The weaver which compiles the circuit """ __metaclass__ = ABCMeta def __init__(self): super(Circuit,self).__init__() self.weaver = None
[docs] @abstractmethod def mainCircuit(self): """Entry point for a circuit, analogous to "main" in a program | Needs to be implemented by any sub class. | mainCircuit will be called by the AOSB Weaver.""" pass
[docs] def importMolecule(self, molecule): """Import a particular molecule from the outer compartment to this compartment | *Args:* | molecule """ self.weaver.importMolecule(self, molecule)
[docs] def exportMolecule(self, molecule): """Export a particular molecule from this compartment to the outer compartment | *Args:* | molecule """ self.weaver.exportMolecule(self, molecule)
[docs] def createMolecule(self, molecule): """Declare that a particular molecule exists in the current scope | *Args:* | molecule """ self.weaver.createMolecule(self, molecule)
[docs] def addCircuit(self, circuit): """Add a circuit as a sub-compartment in the current compartment | *Args:* | circuit: Circuit to be added as a sub-compartment """ self.weaver.addCircuit(self, circuit)
[docs] def addPart(self, part): """Used to add parts to the circuit by passing them on to the AOSB Weaver | *Args:* | part: The part to be added to the circuit """ self.weaver.addPart(self, part)
[docs] def reactionFrom(self, *molecules): """Used to add a reaction to the circuit by passing the reactions left side on to the AOSB Weaver | *Args:* | *molecules: a list of one or more molecules on the lhs of the reaction """ return self.weaver.reactionFrom(self, molecules)
[docs] def reactionTo(self, *molecules): """Used to add a reaction to the circuit by passing the reactions right side on to the AOSB Weaver | *Args:* | *molecules: a list of one or more molecules on the rhs of the reaction """ return self.weaver.reactionTo(self, molecules)
[docs] def setWeaver(self, weaver): """Internal - Should not be used outside of the framework. Sets the Weaver Object of this Circuit | *Args:* | weaver: A weaver object that will be used """ self.weaver = weaver
[docs]def declareNewPart(classname, parent=Part, moleculesBefore=[], moleculesAfter=[], regulatorInfoMap={}): ''' Returns a new Part type and exports it to the caller's namespace | *Args* | classname : string | The name for the new type | parent : Part | super class for the new type | moleculesBefore : [Molecule] | optional, if the new Part type should have Molecule node(s) before it, e.g. regulators of a Promoter | moleculesAfter : [Molecule] | optional, if the new Part type should have Molecule node(s) after it, e.g. Proteins created by CodingRegions | regulatorInfoMap : Dictionary | optional, if a parent part needs additional regulator information. | Necessary for HybridPromoters, who need a map in the form of {Molecule: Boolean} | *Returns* | The new Part type | *Raises* | PartValueError -If the parent is not a Part | InvalidSymbolNameError - If classname is not a valid name for a symbol | *Warnings* | SymbolExistsWarning - If classname already exists in the namespace''' if not issubclass(parent, Part): raise PartValueError("parent must be of type Part") validnameregex = re.compile('[a-zA-Z_][a-zA-Z0-9_]*') if not isinstance(classname, str) or not validnameregex.match(classname) or not validnameregex.match( classname).span() == (0, len(classname)): raise InvalidSymbolNameError('name is not a valid symbold name') if not isinstance(moleculesBefore, list): raise ValueError("moleculesBefore must be of type list") if not isinstance(moleculesAfter, list): raise ValueError("moleculesAfter must be of type list") for mol in moleculesBefore: checkAndSetMolecule(mol) for mol in moleculesAfter: checkAndSetMolecule(mol) basestuple = parent, result = None currentframe = inspect.currentframe() # Create warning if name already exists if currentframe.f_back.f_globals.has_key(classname): line = currentframe.f_back.f_lineno warnings.warn( "Line " + str(line) + ": Part " + classname + " already defined. Existing definition will be used", SymbolExistsWarning, 2) # 2 = one stack level above this result = currentframe.f_back.f_globals.get(classname) else: result = type(classname, basestuple, {}) def newTypeInit(self, recursion=0, moleculesBeforeAggregate=[], moleculesAfterAggregate=[], regulatorInfoMapAggregate={}): moleculesBeforeAggregate += moleculesBefore moleculesAfterAggregate += moleculesAfter regulatorInfoMapAggregate = dict(regulatorInfoMapAggregate.items() + regulatorInfoMap.items()) realself = self.__class__ for i in range(0, recursion): realself = realself.__bases__[0] try: super(realself, self).__init__(recursion + 1, moleculesBeforeAggregate, moleculesAfterAggregate) except: # we'll get an exeption if we've hit a core implemented Part type, # because of the parameter incompatibility if realself.__bases__[0] is PositivePromoter: super(realself, self).__init__(moleculesBeforeAggregate[0]) # at moment: only allow one regulator, others are lost # todo: allow multiple regulators for pos and neg promoters? elif realself.__bases__[0] is NegativePromoter: super(realself, self).__init__(moleculesBeforeAggregate[0]) # at moment: only allow one regulator, others are lost # todo: allow multiple regulators for pos and neg promoters? elif realself.__bases__[0] is ConstitutivePromoter: super(realself, self).__init__() # at moment: only allow one regulator, others are lost # todo: allow multiple regulators for pos and neg promoters? elif realself.__bases__[0] is CodingRegion: super(realself, self).__init__(moleculesAfterAggregate[0]) # at moment: only allow one output molecule, others are lost # todo: allow multiple? elif realself.__bases__[0] is HybridPromoter: super(realself, self).__init__(moleculesBeforeAggregate, regulatorInfoMapAggregate) # at moment: only allow one output molecule, others are lost # todo: allow multiple? elif realself.__bases__[0] is RBS: super(realself, self).__init__() # in this case, we'll just add both to follow what the users wants self.precompileMoleculesBefore += moleculesBeforeAggregate self.precompileMoleculesAfter += moleculesAfterAggregate elif realself.__bases__[0] is Terminator: super(realself, self).__init__() # in this case, we'll just add both to follow what the users wants self.precompileMoleculesBefore += moleculesBeforeAggregate self.precompileMoleculesAfter += moleculesAfterAggregate elif realself.__bases__[0] is Part: super(realself, self).__init__() # in this case, we'll just add both to follow what the users wants self.precompileMoleculesBefore += moleculesBeforeAggregate self.precompileMoleculesAfter += moleculesAfterAggregate # this means that parent does not take a moleculeConnection parameter, #but the new subtype should have one. result = type(classname, basestuple, {'__init__': newTypeInit}) currentframe.f_back.f_globals[classname] = result return result
[docs]def declareNewMolecule(classname, *parents): """Returns a new Molecule type and exports it to the caller's namespace | *Args:* | classname: The name for the new type | \*parents: 0 or more Molecule super classes | *Returns:* | The new Part type | *Raises:* | MoleculeValueError: If any parent is not a Molecule | InvalidSymbolNameError: If classname is not a valid name for a symbol | *Warnings:* | SymbolExistsWarning: If classname already exists in the namespace """ validnameregex = re.compile('[a-zA-Z_][a-zA-Z0-9_]*') if not isinstance(classname, str) or not validnameregex.match(classname) or not validnameregex.match( classname).span() == (0, len(classname)): raise InvalidSymbolNameError('name is not a valid symbold name') if len(parents) == 0: parents = Molecule, else: for parent in parents: if not issubclass(parent, Molecule): raise MoleculeValueError("All parents must be of type Molecule") currentframe = inspect.currentframe() result = None # Check if classname already exists if currentframe.f_back.f_globals.has_key(classname): line = currentframe.f_back.f_lineno warnings.warn( "Line " + str(line) + ": Molecule " + classname + " already defined. Existing definition will be used", SymbolExistsWarning, 2) # 2 = one stack level above this result = currentframe.f_back.f_globals.get(classname) else: result = type(classname, parents, {}) currentframe.f_back.f_globals[classname] = result return result
[docs]class PointCutExpressionNode(object): """Abstract superclass for all Nodes in a Point Cut Expression Tree | A complex expression for a Point Cut, using operators such as & (and), | \| (or) or % (concatenation) is represented as a tree of nodes """ __metaclass__ = ABCMeta def __mod__(self, other): """% - concatenates two PointCutExpressionNodes | *Args:* | other - the node on the right of self | *Returns:* | a new PointCutExpressionConcatenate node """ return PointCutExpressionConcatenate(self, other) def __and__(self, other): """& - boolean and evaluation of two PointCutExpressionNodes | *Args:* | other - the node on the right of self | *Returns:* | a new PointCutExpressionNodeAnd node """ return PointCutExpressionAnd(self, other) def __or__(self, other): """| - boolean or evaluation of two PointCutExpressionNodes | *Args:* | other - the node on the right of self | *Returns:* | a new PointCutExpressionOr node """ return PointCutExpressionOr(self, other)
[docs] def numberOfMatchingParts(self, part): """returns the number of parts the expression matches | If an expression uses concatenation, then it might match the current part and a number of preceding parts. | *Args:* | part - The part at which matching starts | *Returns:* | integer - how many parts were matched """ if self.match(part): return 1 else: return 0
[docs] @abstractmethod def match(self, part): """Does a part match this (sub)-expression? An abstract method, must be implemented by each child | *Args:* | part - The part to be matched | *Returns:* | boolean - Whether or not the part was matched """ pass
[docs]class PointCutExpressionOperator(PointCutExpressionNode): """Abstract superclass of an PointCutExpression Node which is an operator | *Attributes:* | left - The first child of the operator | right - The second child of the operator """ __metaclass__ = ABCMeta left = None right = None def __init__(self, left, right): """Constructs a new PointCutExpresionOperator | *Args:* | left - Sets the left child node | right - Sets the right child node | *Raises:* | InvalidPointCutExpressionError - | If either child is not an instance of PointCutExpressionNode """ if isinstance(left, PointCutExpressionNode) and isinstance(right, PointCutExpressionNode): self.left = left self.right = right else: raise InvalidPointCutExpressionError("Invalid type used in a PointCut formula.")
[docs] def expressionUses(self, nodeType): """Confirms if a certain type of node is used in the expression | *Args:* | nodeType: The type of the node whose existence is to be confirmed | *Returns:* | boolean - Whether or not the node exists in the formula """ if isinstance(self, nodeType) or isinstance(self.left, nodeType) or isinstance(self.right, nodeType): return True else: leftresult = False rightresult = False try: leftresult = self.left.expressionUses(nodeType) except: pass try: rightresult = self.right.expressionUses(nodeType) except: pass return leftresult or rightresult
[docs]class PointCutExpressionNot(PointCutExpressionOperator): """A PointCutExpressionOperator which is a Not A special case, only uses one child, acts as the inverse operator """ def __init__(self, pointcutexpression): """Constructs a new PointCutExprresionNot | *Args:* | pointcutexpression - The expression to be negated """ self.right = pointcutexpression
[docs] def match(self, part, within=None): """see PointCutExpressionNode definition""" return not self.right.match(part, within)
[docs]class PointCutExpressionOr(PointCutExpressionOperator): """A PointCutExpressionOperator which is an Or"""
[docs] def match(self, part, within=None): """see PointCutExpressionNode definition""" return self.left.match(part) or self.right.match(part)
[docs]class PointCutExpressionAnd(PointCutExpressionOperator): """A PointCutExpressionOperator which is an And"""
[docs] def match(self, part): """see PointCutExpressionNode definition""" return self.left.match(part) and self.right.match(part)
[docs]class PointCutExpressionConcatenate(PointCutExpressionOperator): """A PointCutExpressionOperator which is an Concatenation"""
[docs] def numberOfMatchingParts(self, part): """see PointCutExpressionNode definition This child overrides it, since a concatenation operator is a node in the expression at which more than one part can be matched. """ if self.match(part): return self.left.numberOfMatchingParts(part.getBeforePart()) + self.right.numberOfMatchingParts(part) else: return 0
[docs] def match(self, part): """see PointCutExpressionNode definition""" try: return self.left.match(part.getBeforePart()) and self.right.match(part) except: return False
[docs]def checkBaseClassesMatch(bases, typename): """Recursively check if the name of the types in bases (or parents) are equal to typename | *Args:* | bases - A tuple of types | typename : str - A name of a type | *Returns:* | True if any of the names of types in bases or any of their parent bases equals typename, | False otherwise """ for baseClass in bases: if (typename == baseClass.__name__): return True else: if checkBaseClassesMatch(baseClass.__bases__, typename): return True
[docs]class PartSignatureElement(object): """A building block of a Part Signature | *Attributes:* | qualifier : ANY / SUBCLASS / CLASSONLY | Whether the element should match precisely, all subclasses or uses a wildcard | element : str - The string of the element (without a qualifier) | inverse : boolean - Whether the element has been negated """ ANY = 1 SUBCLASS = 2 CLASSONLY = 3 def __init__(self, signature): """Constructs a new PartSignatureElement | Analyzes signature and sets internal attributes | *Args:* | signature - the part of the Part signature for this element """ self.inverse = False self.qualifier = None self.element = '' if signature[0] == '!': self.inverse = True signature = signature[1:] if signature[-1] == '*': self.qualifier = PartSignatureElement.ANY self.element = signature[:-1] elif signature[-1] == '+': self.qualifier = PartSignatureElement.SUBCLASS self.element = signature[:-1] else: self.qualifier = PartSignatureElement.CLASSONLY self.element = signature def __str__(self): qualifier = '' if self.qualifier == self.ANY: qualifier = '*' elif self.qualifier == self.SUBCLASS: qualifier = '+' exclamation = '' if self.inverse: exclamation = '!' return exclamation + self.element + qualifier
[docs] def match(self, obj): """see PointCutExpressionNode definition""" if self.inverse: return not self.__match(obj) else: return self.__match(obj)
def __match(self, obj): """internal match method, matching without inverse""" objectName = obj.__class__.__name__ if (isinstance(obj, str)): objectName = obj if (inspect.isclass(obj)): objectName = obj.__name__ if ((self.qualifier == self.CLASSONLY) or (self.qualifier == self.SUBCLASS)) and (objectName == self.element): return True if (self.qualifier == self.ANY) and (objectName.startswith(self.element)): return True if (self.qualifier == self.SUBCLASS): if (inspect.isclass(obj)): return checkBaseClassesMatch(obj.__bases__, self.element) else: return checkBaseClassesMatch(obj.__class__.__bases__, self.element) return False
[docs]class MoleculeSignatureElement(PartSignatureElement): """A PartSignatureElement that is a Molecule Signature | Overloads some methods, since MoleculeSignature is used by the user for | Molecule Type Advice - unlike PartSignatureElement, which is internal """ def __init__(self, signature): """Constructs a new MoleculeSignature | *Args:* | signature : str - The string signature to be cast to a MoleculeSignature | *Raises:* | InvalidSignatureError - If signature does not adhere to the format """ signatureRE = re.compile('!?([a-zA-Z_][a-zA-Z0-9_]*[\+\*]?|\*)') try: if not signatureRE.match(signature).span() == (0, len(signature)): raise InvalidSignatureError() except: raise InvalidSignatureError() return super(MoleculeSignatureElement, self).__init__(signature)
[docs] def match(self, obj): """see PointCutExpressionNode definition""" if self.qualifier == self.ANY and obj == None: if self.inverse: return False return True return super(MoleculeSignatureElement, self).match(obj)
[docs]class MoleculeSignature(): """A MoleculeSignature, used for Type Advice of Molecules | *Attributes:* | namespace : [PartSignatureElement] - List of signature parts of the signature | molecule : MoleculeSignatureElement - The "part" part of the signature """ def __init__(self, signature): """Constructs a new PartSignature | Analyzes signature and sets internal attributes | *Args:* | signature : str - The string signature to be cast to a PartSignature | *Raises:* | InvalidSignatureError - If signature does not adhere to the format """ self.namespace = [] self.molecule = None # should be of format: Circuit.Part(Molecule)" (Molecule) is optional# typeErrorMessage = "signature must be of type String and adhere to PointCut / PartSignature Format" if not isinstance(signature, str): raise InvalidSignatureError(typeErrorMessage) # using a regular expression to make sure the signature is of a valid format signatureRE = re.compile( '(!?(([a-zA-Z_][a-zA-Z0-9_]*[\+\*]?)|\*)\.)+!?(([a-zA-Z_][a-zA-Z0-9_]*[\+\*]?)|\*)') # regular expression has to match entire length of string try: if not signatureRE.match(signature).span() == (0, len(signature)): raise InvalidSignatureError(typeErrorMessage) except: raise InvalidSignatureError(typeErrorMessage) # split signature into components signaturePartRE = re.compile('!?[a-zA-Z_][a-zA-Z0-9_]*[\+\*]?|\*') splitSignature = signaturePartRE.findall(signature) numOfCircuitElements = len(splitSignature) - 1; self.molecule = MoleculeSignatureElement(splitSignature[numOfCircuitElements]) # #Circuit Part # self.namespace = PartSignatureElement(splitSignature[0]) # Part ... Part for i in range(0, numOfCircuitElements): self.namespace.append(PartSignatureElement(splitSignature[i])) def __str__(self): firsthalf = '.'.join(str(self.namespace[i]) for i in range(len(self.namespace))) firsthalf = firsthalf + '.' + str(self.molecule)
[docs] def matchNamespaces(self, namespace, scope): for name in reversed(namespace): if name.match(scope.circuitName): scope = scope.parentCircuit else: return False # todo edge case, i.e. if need to check * is last, or no match? return True
[docs] def match(self, molecule): """see PointCutExpressionNode definition""" if not isinstance(molecule, Molecule): raise MoleculeValueError("Molecule to match must be instance of type Molecule") result = self.molecule.match(molecule) and self.matchNamespaces(self.namespace, molecule.scope) return result
[docs]class NotOnStack(PointCutExpressionNode): def __init__(self, signature): self.name = signature # todo all the error checking...
[docs] def match(self, part): """see PointCutExpressionNode definition""" # todo matching here should also support + and * for withinObject in part.additionStack: if withinObject.__class__.__name__ == self.name: return False return True
[docs]class OnStack(PointCutExpressionNode): def __init__(self, signature): self.name = signature # todo all the error checking...
[docs] def match(self, part): """see PointCutExpressionNode definition""" for withinObject in part.additionStack: if withinObject.__class__.__name__ == self.name: return True return False
[docs]class PartSignature(PointCutExpressionNode): """A PartSignature, used in PointCut expressions or directly for Type Advice | *Attributes:* | namespace : [PartSignatureElement] - List of signature parts of the signature | part : PartSignatureElement - The "part" part of the signature | molcule : PartSignatureElement - The molecule part of the signature | nomolecule : Boolean - | If the PartSignature explicitly should not match parts with molecules """ def __init__(self, signature): """Constructs a new PartSignature | Analyzes signature and sets internal attributes | *Args:* | signature : str - The string signature to be cast to a PartSignature | *Raises:* | InvalidSignatureError - If signature does not adhere to the format """ self.namespace = [] self.part = None self.molecule = None self.nomolecule = False # should be of format: Circuit.Part(Molecule)" (Molecule) is optional# typeErrorMessage = "signature must be of type String and adhere to PointCut / PartSignature Format" if not isinstance(signature, str): raise InvalidSignatureError(typeErrorMessage) # using a regular expression to make sure the signature is of a valid format signatureRE = re.compile( '(!?(([a-zA-Z_][a-zA-Z0-9_]*[\+\*]?)|\*)\.)+!?(([a-zA-Z_][a-zA-Z0-9_]*[\+\*]?)|\*)(\(!?(([a-zA-Z_][a-zA-Z0-9_]*[\+\*]?)|\*)?\))?') # regular expression has to match entire length of string try: if not signatureRE.match(signature).span() == (0, len(signature)): raise InvalidSignatureError(typeErrorMessage) except: raise InvalidSignatureError(typeErrorMessage) # split signature into components signaturePartRE = re.compile('!?[a-zA-Z_][a-zA-Z0-9_]*[\+\*]?|\*') splitSignature = signaturePartRE.findall(signature) numOfCircuitElements = len(splitSignature) - 1; signatureMoleculePartRE = re.compile('\(!?(([a-zA-Z_][a-zA-Z0-9_]*[\+\*]?)|\*)?\)') moleculePartMatch = signatureMoleculePartRE.search(signature) if moleculePartMatch != None: moleculePart = moleculePartMatch.group(0) if len(moleculePart) == 2: self.nomolecule = True else: numOfCircuitElements -= 1 self.molecule = MoleculeSignatureElement(moleculePart[1:-1]) self.nomolecule = False else: self.nomolecule = False self.molecule = MoleculeSignatureElement('*') self.part = PartSignatureElement(splitSignature[numOfCircuitElements]) # #Circuit Part # self.namespace = PartSignatureElement(splitSignature[0]) # Part ... Part for i in range(0, numOfCircuitElements): self.namespace.append(PartSignatureElement(splitSignature[i])) def __str__(self): firsthalf = '.'.join(str(self.namespace[i]) for i in range(len(self.namespace))) firsthalf = firsthalf + '.' + str(self.part) if self.nomolecule: return firsthalf + '()' else: return firsthalf + '(' + str(self.molecule) + ')'
[docs] def matchNamespaces(self, namespace, scope): i = len(scope) for name in reversed(namespace): i -= 1 if(i < 0): return True; if not name.match(str(scope[i])): return False # todo edge case, i.e. if need to check * is last, or no match? return True
[docs] def match(self, part): """see PointCutExpressionNode definition""" if not isinstance(part, Part): raise PartValueError("Part to match must be instance of type Part") result = self.part.match(part) and self.matchNamespaces(self.namespace, part.additionStack) # or scope if not self.nomolecule: # I.E. THERE IS A MOLECULE # this needs to be refined / TODO if len(part.precompileMoleculesAfter) == 0 and len(part.precompileMoleculesBefore) == 0: if self.molecule.qualifier == self.molecule.ANY and self.molecule.element == '': result = result and True else: result = result and False for mol in part.precompileMoleculesAfter: result = result and self.molecule.match(mol) for mol in part.precompileMoleculesBefore: result = result and self.molecule.match(mol) if self.nomolecule: if len(part.getBeforeNodes(Molecule)) != 0 or len(part.getAfterNodes(Molecule)) != 0 or \ len(part.precompileMoleculesAfter) != 0 or len(part.precompileMoleculesBefore) != 0: result = result and False # todo this means we could also do more than one molecule, e.g. Promoter(AB || A) or bool and... return result
[docs]class PointCut(object): """A PointCut to select Join Points in the genetic parts execution flow | *Attributes:* | operator : BEFORE / AFTER / REPLACE | - The operator for this PointCut | signature : PartSignature """ BEFORE = 11 AFTER = 22 REPLACE = 33 def __init__(self, signature, operator): """Construct a new PointCut | *Args:* | signature - A PointCutSignature or a string that will be cast to PointCutSignature | operator - The operator """ self.operator = None self.signature = None if isinstance(signature, PointCutExpressionNode): self.signature = signature else: self.signature = PartSignature(signature) self.checkAndSetOperator(operator) def __str__(self): operator = '' if self.operator == self.BEFORE: operator = 'BEFORE' elif self.operator == self.AFTER: operator = 'AFTER' else: operator = 'REPLACE' return 'PointCut(\'' + str(self.signature) + '\',' + operator + ')'
[docs] def match(self, part): """see PointCutExpressionNode definition""" return self.signature.match(part)
[docs] def checkAndSetOperator(self, operator): """Set the internal operator attribute, if the parameter is a valid operator | *Args:* | operator - The operator to be confirmed | *Raises:* | InvalidPointCutOperatorError - if the operator is invalid | (can be dependent on the signature) """ if (operator == self.BEFORE) or (operator == self.AFTER) or (operator == self.REPLACE): self.operator = operator if (operator == self.BEFORE and isinstance(self.signature, PointCutExpressionOperator) and self.signature.expressionUses( PointCutExpressionConcatenate)): raise InvalidPointCutOperatorError( 'PointCut Operator can not be BEFORE if PartSignature uses concatenation.') else: raise InvalidPointCutOperatorError("operator must be of type PointCut.BEFORE / AFTER / REPLACE")
[docs]class PointCutContext(object): """Container for context at a PointCut | *Attributes:* | within - A stack of the circuits / aspects that within which the PointCut was matched | part - The part matched by the PointCut""" def __init__(self, within, part): self.within = within self.part = part
[docs] def isWithin(self, obj): """Checks if the parameter (a circuit or aspect) is on the within stack | *Args:* | obj : Circuit / Aspect - The object that is to be found on the stack | *Returns:* | Boolean - true if obj is on the within stack """ return self.__isWithinRecursive(obj, len(self.within))
def __isWithinRecursive(self, obj, index): if index > 0: return (obj == self.within[index - 1]) or self.__isWithinRecursive(obj, index - 1) return False
[docs]class Advice(object): """Container for Advice | *Attributes:* | precedence : int - High precedence advice have execution priority over low precedence | - MINPRECEDENCE <= precedence <= MAXPRECEDENCE is the valid range | pointcut : PointCut | adviceMethod : method - The method to be executed at the advice | - must have 2 parameters: self and PointCutContext """ MINPRECEDENCE = 0 MAXPRECEDENCE = 100 def __init__(self, pointcut, adviceMethod, precedence=MINPRECEDENCE): """Constructs a new Advice object, setting internal state | *Args:* | pointcut - The PointCut to be set | adviceMethod - The adviceMethod to be set | precedence - The precedence to be set | *Raises:* | InvalidPointCutError | InvalidAdviceMethodError - If argument is not a method or takes wrong | number of parameters (must take self and PointCutContext object) | PrecedenceOutOfRangeError - If precedence > MAXPRECEDENCE or | precedence < MINPRECEDENCE """ self.pointcut = None self.adviceMethod = None self.precedence = self.MINPRECEDENCE if not isinstance(pointcut, PointCut): raise InvalidPointCutError("pointcut must be of type PointCut") self.pointcut = pointcut if not inspect.ismethod(adviceMethod) or len(inspect.getargspec(adviceMethod)[0]) != 2: raise InvalidAdviceMethodError("adviceMethod must be a method with 2 parameters (self, PointCutContext)") self.adviceMethod = adviceMethod if precedence < (self.MINPRECEDENCE) or (precedence > self.MAXPRECEDENCE): raise PrecedenceOutOfRangeError( "Advice Precedence must be between " + str(self.MINPRECEDENCE) + " and " + str(self.MAXPRECEDENCE)) self.precedence = precedence
[docs]class TypeAdvice(object): """Container for TypeAdvice | *Attributes:* | signature : PartSignature or MoleculeSignature | typeaddition : method or attribute to be added to the type | name : The name the new typeaddition should have in the new type | aspect : Aspect which declares this TypeAdvice """ def __init__(self, signature, typeaddition, name, aspect): """Constructs a new TypeAdvice | *Args:* | signature - PartSignature / MoleculeSignature to be set | typeaddition - attribute to be set | name : str - attribute to be set | aspect : Aspect - attribute to be set | *Raises:* | InvalidSignatureError - | If signature not instance of PartSignature or MoleculeSignature | InvalidSymbolNameError - | If name is not a valid name for a symbol """ self.signature = None self.typeaddition = None self.aspect = None self.name = '' if isinstance(signature, PartSignature) or isinstance(signature, MoleculeSignature): self.signature = signature; else: raise InvalidSignatureError( 'Signature parameter for TypeAdvice must be of type PartSignature or MoleculeSignature') self.typeaddition = typeaddition; self.aspect = aspect validnameregex = re.compile('[a-zA-Z_][a-zA-Z0-9_]*') if not isinstance(name, str) or not validnameregex.match(name) or not validnameregex.match(name).span() == ( 0, len(name)): raise InvalidSymbolNameError('name is not a valid symbold name') self.name = name
[docs] def isPartAdvice(self): """Returns True if TypeAdvice is for Part, False otherwise""" return isinstance(self.signature, PartSignature)
[docs] def isMoleculeAdvice(self): """Returns True if TypeAdvice is for Molecule, False otherwise""" return isinstance(self.signature, MoleculeSignature)
[docs]class Aspect(object): """Abstract class for an Aspect | Will generally be used to represent a design's cross-cutting concerns | *Notes:* | Any child must implement mainAspect method | *Attributes* | weaver: Weaver | The weaver which compiles the aspect | adviceList - A list of all advice this aspect declares | typeAdviceList - A list of all type advice this aspect declares | weaverOutputList - A list of all type advice for the WeaverOutput """ __metaclass__ = ABCMeta def __init__(self): self.adviceList = [] self.typeAdviceList = [] self.weaverOutputList = [] self.weaver = None
[docs] def importMolecule(self, molecule): """Import a particular molecule from the outer compartment to this compartment | *Args:* | molecule """ self.weaver.importMolecule(self, molecule)
[docs] def exportMolecule(self, molecule): """Export a particular molecule from this compartment to the outer compartment | *Args:* | molecule """ self.weaver.exportMolecule(self, molecule)
[docs] def createMolecule(self, molecule): """Declare that a particular molecule exists in the current scope | *Args:* | molecule """ self.weaver.createMolecule(self, molecule)
[docs] def addCircuit(self, circuit): """Add a circuit as a sub-compartment in the current compartment | *Args:* | circuit: Circuit to be added as a sub-compartment """ self.weaver.addCircuit(self, circuit)
[docs] def addPart(self, part): """Used to add parts to the design by passing them on to the AOSB Weaver | *Args:* | part: The part to be added to the circuit """ self.weaver.addPart(self, part)
[docs] def reactionFrom(self, *molecules): """Used to add a reaction to the aspect by passing the reactions left side on to the AOSB Weaver | *Args:* | *molecules: a list of one or more molecules on the lhs of the reaction """ return self.weaver.reactionFrom(self, molecules)
[docs] def reactionTo(self, *molecules): """Used to add a reaction to the aspect by passing the reactions right side on to the AOSB Weaver | *Args:* | *molecules: a list of one or more molecules on the rhs of the reaction """ return self.weaver.reactionTo(self, molecules) return self.weaver.reactionTo(self, molecules)
[docs] def setWeaver(self, weaver): """Internal - Should not be used outside of the framework. Sets the Weaver Object of this Circuit | *Args:* | weaver: A weaver object that will be used """ self.weaver = weaver
[docs] def addAdvice(self, pointcut, adviceMethod, precedence=Advice.MINPRECEDENCE): """Declare a new advice in the aspect | *Args:* | pointcut - The pointcut for the advice | adviceMethod - The method to be executed at the pointcut | should be a method bound to this aspect, with second parameter | expecting a PointCutContext object | precedence : integer (optional) - set precedence of advice, | see notes on Advice.precedence | *Notes:* | addAdvice constructs an Advice object. Further information thus | can be found there. """ self.adviceList.append(Advice(pointcut, adviceMethod, precedence))
[docs] def addTypeAdvice(self, signature, typeaddition, name): """Declare a type advice in the aspect | *Args:* | signature - The signature for the type advice | typeaddition - The attribute / method to be added to the type | name: str - The name for the typeaddition in the new type | *Notes:* | addTypeAdvice constructs an TypeAdvice object. | Further information thus can be found there. """ self.typeAdviceList.append(TypeAdvice(signature, typeaddition, name, self))
[docs] def addWeaverOutput(self, outputmethod): """Declare a new weaver output target | *Args:* | outputMethod - The method to be added to the WeaverOutput | *Raises:* | InvalidWeaverOutputMethodError - | If outputmethod is not a method or has wrong number of parameters | (needs to accept self and a WeaverOutput reference) """ if not inspect.ismethod(outputmethod) or len(inspect.getargspec(outputmethod)[0]) != 2: raise InvalidWeaverOutputMethodError("outputmethod must be a method with 2 parameters (self, WeaverOutput)") self.weaverOutputList.append(outputmethod)
[docs] def getAdviceList(self): """Returns adviceList""" return self.adviceList
[docs] def getTypeAdviceList(self): """Returns typeAdviceList""" return self.typeAdviceList
[docs] def getWeaverOutputList(self): """Returns weaverOutputList""" return self.weaverOutputList
[docs] @abstractmethod def mainAspect(self): """Entry point for an aspect, analogous to "main" in a program | Needs to be implemented by any sub class. | mainAspect will be called by the AOSB Weaver.""" pass
def __str__(self): return self.__class__.__name__
[docs]class Weaver(object): """The "compiler" that weaves core concerns (circuits) and cross-cutting concerns (aspects) and creates a woven execution flow of parts | *Attributes:* | partList - The current list of parts | moleculeList - The current list of molecules | beforeAndReplaceAdviceList - List of all before and replace advice to be woven | afterAdviceList - List of all after advice to be woven | partTypeAdviceList - List of all part type advice | moleculeTypeAdviceList - List of all molecule type advice | circuit - The main circuit | aspects - The list of all aspects to be woven | weaverOutput : WeaverOutput - The "compiled" result """
[docs] class WeaverOutput(object): """Container for the woven result of the Weaver | *Attributes:* | circuitName - the name of the circuit that was woven | partList - finished ordered list of parts in the design | moleculeList - list of all molecules in the design | subcircuitList - list of all subcircuits (itself weaver outputs) (??) | """ def __init__(self, circuitName, parentCircuit=None): self.circuitName = circuitName self.partList = [] self.moleculeList = [] self.wovenCircuitList = [] self.parentCircuit = parentCircuit def __str__(self): result = "\n\n" result = result + self.circuitName + ' Circuit' result = result + '\n--------------------------\n' result = result + 'Parts List:\n' + '+'.join( str(self.partList[i]) for i in range(len(self.partList))) if (len(self.partList) == 0): result = result + "None" result = result + "\nCircuit Molecule List: \n" result = result + self.molecules() result = result + "\nCircuit Molecule Reactions: \n" result = result + self.moleculeGraph() if len(self.wovenCircuitList) > 0: result = result + '\nSubCircuits:\n' for i in range(len(self.wovenCircuitList)): result = result + "Sub Circuit " + str(i + 1) + ":\n" result = result + str(self.wovenCircuitList[i]) result = result + '\n####################\n' result = result + "\n\n" return result
[docs] def molecules(self): if len(self.moleculeList) == 0: return "None" else: return ', '.join(str(self.moleculeList[i]) for i in range(len(self.moleculeList)))
[docs] def moleculeGraph(self): result = '' for i in range(len(self.moleculeList)): if (isinstance(self.moleculeList[i], Part) and self.moleculeList[i].scope != self): continue for j in range(len(self.moleculeList[i].before)): if (isinstance(self.moleculeList[i].before[j], Molecule)) or isinstance( self.moleculeList[i].before[j], Part): if (isinstance(self.moleculeList[i].before[j], Part) and (self.moleculeList[i].before[j].scope == self)): result += str(self.moleculeList[i].before[j].scope.circuitName) + '.' + str( self.moleculeList[i].before[j]) \ + '->' + \ str(self.moleculeList[i].scope.circuitName) + '.' + str( self.moleculeList[i]) + '; ' else: result += ','.join(str(self.moleculeList[i].before[j][k].scope.circuitName) + '.' + str( self.moleculeList[i].before[j][k]) for k in range(len(self.moleculeList[i].before[j]))) \ + '->' \ + str(self.moleculeList[i].scope.circuitName) + '.' + str(self.moleculeList[i]) + '; ' for j in range(len(self.moleculeList[i].after)): if ( (isinstance(self.moleculeList[i].after[j], Part) and self.moleculeList[i].after[j].scope != self)): pass else: result += str(self.moleculeList[i].scope.circuitName) + '.' + str(self.moleculeList[i]) \ + '->' + \ str(self.moleculeList[i].after[j].scope.circuitName) + '.' + str( self.moleculeList[i].after[j]) + '; ' return result
def __init__(self, circuit, *aspects): """Sets of the weaver and compiles the design | *Attributes:* | circuit : Circuit - The main circuit to be set | *aspects : Aspect - list of aspects to be set | *Raises:* | CircuitValueError - If circuit is invalid | AspectValueError - If any aspect is invalid""" self.beforeAndReplaceAdviceList = [] self.afterAdviceList = [] self.partTypeAdviceList = [] self.moleculeTypeAdviceList = [] self.circuit = None self.aspects = [] self.withinStack = [] self.weaverOutput = None if not issubclass(circuit, Circuit) and not isinstance(circuit, Circuit): raise CircuitValueError("circuit must be a class or instance of type Circuit.") if inspect.isclass(circuit): try: self.circuit = circuit() except: raise CircuitValueError("Can not initialize Circuit " + circuit + " without parameters") else: self.circuit = circuit self.circuit.setWeaver(self) for aspect in aspects: if not issubclass(aspect, Aspect) and not isinstance(aspect, Aspect): raise AspectValueError("all aspects must be classes or instances of type Aspect.") if inspect.isclass(aspect): try: self.aspects.append(aspect()) except: raise AspectValueError("Can not initialize Aspect " + aspect + " without parameters") else: self.aspects.append(aspect) self.weaverOutput = self.WeaverOutput(self.circuit.__class__.__name__) self.readAspectsConstructAdviceLists() self.sortAdviceList() # weaving is kicked off here self.currentWeaverOutput = self.weaverOutput self.circuit.mainCircuit() self.constructMoleculeListAndAddTypeAdvice()
[docs] def sortAdviceList(self): """Internal - sorts the advice lists by precedence""" def sortKey(advice): return advice.precedence self.beforeAndReplaceAdviceList = sorted(self.beforeAndReplaceAdviceList, key=sortKey, reverse=True) self.afterAdviceList = sorted(self.afterAdviceList, key=sortKey)
[docs] def constructMoleculeListAndAddTypeAdvice(self): pass
''' """Internal - constructs list of Molecules in design and adds their type advice""" # construct moleculeList - this should probably be done during weaving? for part in self.partList: if part.moleculeConnection: if not part.moleculeConnection in self.moleculeList: self.moleculeList.append(part.moleculeConnection) # add type advice to molecules for molecule in self.moleculeList: for typeAdvice in self.moleculeTypeAdviceList: if typeAdvice.signature.match(molecule): setattr(molecule,typeAdvice.name,typeAdvice.typeaddition)'''
[docs] def readAspectsConstructAdviceLists(self): """Internal - initializes aspects and constructs all advice lists""" for aspect in self.aspects: aspect.setWeaver(self) aspect.mainAspect() for advice in aspect.getAdviceList(): if advice.pointcut.operator == PointCut.AFTER: self.afterAdviceList.append(advice) else: self.beforeAndReplaceAdviceList.append(advice) for typeAdvice in aspect.getTypeAdviceList(): if typeAdvice.isPartAdvice(): self.partTypeAdviceList.append(typeAdvice) else: self.moleculeTypeAdviceList.append(typeAdvice) weaverOutput = self.weaverOutput # to make weaverOutput visible to closure scope if len(aspect.getWeaverOutputList()) > 0: # this aspect wants to add a new weaver output target # TODO check if a similarly named output target already exists and raise # useful exception (containing name of clashed method, name of advice) for outputMethod in aspect.getWeaverOutputList(): def outputMethodWrapper(method): def outputMethodWrapperClosure(self): return method(weaverOutput) return outputMethodWrapperClosure setattr(self.weaverOutput, outputMethod.__name__, types.MethodType(outputMethodWrapper(outputMethod), aspect))
# todo clean up, possible split lookup and creation
[docs] def getMoleculeObject(self, scope, moleculeClass, createIt=False): for molecule in scope.moleculeList: if isinstance(molecule, moleculeClass): return molecule # TODO what about subclases? # molecule does not yet exist in the scope # (this means it's not been created or imported from another scope) if createIt == False: raise MoleculeValueError("Molecule " + moleculeClass.__name__ + " does not exist in this scope") else: result = moleculeClass() result.scope = scope result.additionStack = self.withinStack[:] # important: needs to be a deep copy! scope.moleculeList.append(result) self.addElemTypeAdvice(result) return result
[docs] def addElemTypeAdvice(self, elem): """Internal - adds type advice to a part""" list = self.partTypeAdviceList if isinstance(elem, Molecule): list = self.moleculeTypeAdviceList for typeAdvice in list: if typeAdvice.signature.match(elem): # TODO check for name clashes... if inspect.ismethod(typeAdvice.typeaddition): def adviceMethodWrapper(method): def adviceMethodWrapperClosure(self): return method(elem) return adviceMethodWrapperClosure setattr(elem, typeAdvice.name, types.MethodType(adviceMethodWrapper(typeAdvice.typeaddition), typeAdvice.aspect)) else: setattr(elem, typeAdvice.name, typeAdvice.typeaddition)
[docs] def runBeforeAndReplaceAdvice(self, callingObject, part): """Internal - runs Before and Replace Advice of a part | *Args:* | part - The part for which the advice matches | callingObject - the circuit or advice which added the part | *Returns:* | "continue" | True - If part should still be added | False - If part has been replaced and remaining after advice executed """ for advice in self.beforeAndReplaceAdviceList: if len(self.currentWeaverOutput.partList) > 0: part.setBeforePart(self.currentWeaverOutput.partList[-1]) if advice.pointcut.match(part): if (advice.pointcut.operator == PointCut.REPLACE): numberOfMatchingParts = advice.pointcut.signature.numberOfMatchingParts(part) while numberOfMatchingParts > 1: self.currentWeaverOutput.partList.pop() numberOfMatchingParts -= 1 # Advice should return False if not replaced, True if Replaced AdviceResult = advice.adviceMethod(PointCutContext(self.withinStack, part)) self.runAfterAdvice(callingObject, part, advice.precedence) return not AdviceResult advice.adviceMethod(PointCutContext(self.withinStack, part)) return True
[docs] def runAfterAdvice(self, callingObject, part, precedence=Advice.MINPRECEDENCE): """Internal - runs After Advice of a part | *Args:* | part - The part for which the advice matches | callingObject - the circuit or advice which added the part | precedence - Only run advice with a precedence greater or equal this | Used if replacement advice has been executed """ # after advice are ordered by ascending precedence # the one with the lowest priority is executed #if precedenceLevel is set by a replacement advice, we will only #execute those advice with >= precedence for advice in self.afterAdviceList: if (advice.precedence >= precedence) and (advice.pointcut.match(part)): advice.adviceMethod(PointCutContext(self.withinStack, part))
[docs] def addReaction(self, callingObject, fromMoleculeList, toMoleculeList): realFromMoleculeList = [] realToMoleculeList = [] for element in fromMoleculeList: realFromMoleculeList.append(self.getMoleculeObject(self.currentWeaverOutput, element)) # todo needs to be adapted to actually search scope ??? or report error for element in toMoleculeList: realToMoleculeList.append(self.getMoleculeObject(self.currentWeaverOutput, element, True)) # todo each element in fromMoleculeList does not need toMoleculeList in its before... # need to show that here is a gate... # if both are just one molecule, can hook them up if (len(realFromMoleculeList) == 1) and (len(realToMoleculeList) == 1): realFromMoleculeList[0].after.append(realToMoleculeList[0]) realToMoleculeList[0].before.append(realFromMoleculeList[0]) elif (len(realFromMoleculeList) > 1) and (len(realToMoleculeList) == 1): realToMoleculeList[0].before.append(realFromMoleculeList) elif (len(realFromMoleculeList) == 1) and (len(realToMoleculeList) > 1): realFromMoleculeList[0].after.append(realToMoleculeList)
[docs] class MoleculeReactionTo: """Class to represent the right hand side of a molecule reaction """ def __init__(self, molecules): self.molecules = molecules
[docs] class MoleculeReactionFrom: """Class to represent the left hand side of a molecule reaction and enable convenient syntax by overloading the >> (rshift) operator | *Attributes:* | before : [ExecutionNode] | A list of ExecutionNode objects that precede this node | after : [ExeuctionNode] | A list of ExecutionNode objects that follow this node | scope : class(WeaverOutput) | What woven system is this node part of | additionStack : [Circuit or Aspect] | stack of Aspects and Circuits which are executing to add | this node """ def __init__(self, weaver, callingObject, molecules): self.weaver = weaver self.callingObject = callingObject self.fromMolecules = molecules def __rshift__(self, other): # todo check type of other, should be MoleculeReactionTo self.weaver.addReaction(self.callingObject, self.fromMolecules, other.molecules)
[docs] def reactionFrom(self, callingObject, molecules): """Used to add the left hand side of a molecule reaction to the design, called by either a circuit or aspect | *Args:* | callingObject - the circuit or aspect calling this | molecules - the list of molecules on the lhs of the reaction """ return self.MoleculeReactionFrom(self, callingObject, molecules)
[docs] def reactionTo(self, callingObject, molecules): """Used to add the right hand side of a molecule reaction to the design, called by either a circuit or aspect | *Args:* | callingObject - the circuit or aspect calling this | molecules - the list of molecules on the rhs of the reaction """ return self.MoleculeReactionTo(molecules)
[docs] def importMolecule(self, callingObject, molecule): """Indicates that a molecule should be "imported" by the current compartment from the outer compartment | *Args:* | callingObject - the circuit or aspect calling this | molecule - the molecule to be imported """ # means that we are getting a molecule from the next larger scope # todo check that we are not in outest scope # todo rethink - why should getMOlecule... take an object?? it's creating them afterall... should take class name? # todo no? maybe? importedMoleculeParent = self.getMoleculeObject(self.currentWeaverOutput.parentCircuit, molecule) # case 1: molecule already exists in scope and needs to be merged for mol in self.currentWeaverOutput.moleculeList: if isinstance(mol, molecule): mol.before.append(importedMoleculeParent) importedMoleculeParent.after.append(mol) return # otherwise: #case 2: molecule doesn't exist yet self.createMolecule(callingObject, molecule) # now we can add it via case 1: self.importMolecule(callingObject, molecule)
[docs] def exportMolecule(self, callingObject, molecule): """Indicates that a molecule should be "exported" by the current compartment to the outer compartment | *Args:* | callingObject - the circuit or aspect calling this | molecule - the molecule to be exported """ # todo error checking # e.g. if molecule already exists (but possible differnet instances?? warn of clash? # if parent doesn't exist exportMolecule = self.getMoleculeObject(self.currentWeaverOutput, molecule) # case 1: the molecule already exists in the outer scope and needs to be added to its graph for mol in self.currentWeaverOutput.parentCircuit.moleculeList: if isinstance(mol, molecule): mol.before.append(exportMolecule) exportMolecule.after.append(mol) return # otherwise: #case 2: molecule doens't exist yet in outer scope self.getMoleculeObject(self.currentWeaverOutput.parentCircuit, molecule, True) self.exportMolecule(callingObject, molecule)
[docs] def createMolecule(self, callingObject, molecule): """Indicates that a molecule is potentially present in the current compartment, i.e. it does not necessarily need to be created by a Coding Region, by an import, etc. | *Args:* | callingObject - the circuit or aspect calling this | molecule - the molecule to be present """ self.getMoleculeObject(self.currentWeaverOutput, molecule, True)
# todo, if create is set to true, should actually report an error if molecule already exists.
[docs] def addCircuit(self, callingObject, circuit): """Called by circuit or aspect to add a circuit as a sub-compartment in the current compartment | *Args:* | callingObject - the circuit or aspect calling this | circuit - The circuit to be added as a sub-compartment """ # todo check circuit circuitObject = circuit() newCircuit = self.WeaverOutput(circuitObject.__class__.__name__, self.currentWeaverOutput) self.currentWeaverOutput.wovenCircuitList.append(newCircuit) self.currentWeaverOutput = newCircuit # new sub circuit is set up. no we'll weave it circuitObject.setWeaver(self) circuitObject.mainCircuit() self.currentWeaverOutput = self.currentWeaverOutput.parentCircuit
[docs] def addPart(self, callingObject, part): """Called by circuit or aspect to add a part in the execution flow | *Args:* | callingObject - the circuit or aspect calling this | part - The part supposed to be added """ part = checkIfTypeReturnInstance(part) # part.namespace = self.currentWeaverOutput part.scope = self.currentWeaverOutput self.withinStack.append(callingObject) part.additionStack = self.withinStack[:] continueAddingPart = self.runBeforeAndReplaceAdvice(callingObject, part) # before adding the part, add any type advice if continueAddingPart: part.weave(self) # TODO clean this up, the namespace issue... self.addElemTypeAdvice(part) # If the part is a composite, we unpack it here. otherwise we finally add the part if isinstance(part, Circuit) == False: # add before / after information if len(self.currentWeaverOutput.partList) > 0: self.currentWeaverOutput.partList[-1].setAfterPart(part) part.setBeforePart(self.currentWeaverOutput.partList[-1]) self.currentWeaverOutput.partList.append(part) else: # the circuit is not initialized with a weaver part.setWeaver(self) part.mainCircuit() self.runAfterAdvice(callingObject, part) self.withinStack.pop()
[docs] def output(self): """Returns the weaverOutput Element""" return self.weaverOutput
[docs]class MoleculeValueError(ValueError): """Molecule was expected, but something else was given""" pass
[docs]class PartInitializationError(Exception): """Part was expected, but something else was given"""
[docs]class PartValueError(ValueError): """Part was expected, but something else was given"""
[docs]class InvalidSymbolNameError(ValueError): """A symbol name (string) was not correctly formatted""" pass
[docs]class SymbolExistsWarning(UserWarning): """A Part or Molecule declaration uses an existing name""" pass
[docs]class InvalidSignatureError(ValueError): """A signature (PointCut / Part / Molecule) is incorrectly formatted or typed""" pass
[docs]class InvalidPointCutExpressionError(ValueError): """There is an error in a Point Cut expression""" pass
[docs]class InvalidPointCutOperatorError(ValueError): """An unknown or illegal operator was used for the point cut""" pass
[docs]class InvalidPointCutError(ValueError): """object of type PointCut expected, but something else given""" pass
[docs]class InvalidAdviceMethodError(ValueError): """method with 2 parameters expected, but something else given""" pass
[docs]class InvalidWeaverOutputMethodError(ValueError): """ with 2 parameters expected, but something else given""" pass
[docs]class PrecedenceOutOfRangeError(ValueError): """An unknown or illegal operator was used for the point cut""" pass
[docs]class AspectValueError(ValueError): """An Aspect class was expected, but something else given"""
[docs]class CircuitValueError(ValueError): """A Circuit class was expected, but something else given"""
[docs]class RNACodingRegion(Part): def __init__(self, codesFor): super(RNACodingRegion, self).__init__() self.precompileMoleculesAfter.append(checkAndSetMolecule(codesFor))
[docs] def getCodingFor(self): result = self.getAfterNodes(Molecule) if len(result) > 0: return result return self.precompileMoleculesAfter