from abc import ABCMeta, abstractmethod
import inspect
import warnings
import re
import types
class ExecutionNode(object):
    """Abstract superclass for all nodes in an execution flow,
    e.g. parts, molecules.

    | *Attributes:*
    |     before : [ExecutionNode]
    |         A list of ExecutionNode objects that precede this node
    |     after : [ExecutionNode]
    |         A list of ExecutionNode objects that follow this node
    |     scope : class(WeaverOutput)
    |         What woven system is this node part of
    |     additionStack : [Circuit or Aspect]
    |         stack of Aspects and Circuits which are executing to add
    |         this node (None until it is assigned)
    """
    __metaclass__ = ABCMeta

    def __init__(self):
        self.before = []
        self.after = []
        self.scope = None
        self.additionStack = None

    def getBeforeNodes(self, filterType=object):
        """Returns all nodes immediately before this node in the execution flow.

        | *Args:*
        |     filterType: only nodes that are instances of this (super)type are
        |         returned; defaults to ``object``, i.e. no filtering
        | *Returns:*
        |     A new list; the internal ``before`` list is not exposed
        """
        return [node for node in self.before if isinstance(node, filterType)]

    def getAfterNodes(self, filterType=object):
        """Returns all nodes immediately after this node in the execution flow.

        | *Args:*
        |     filterType: only nodes that are instances of this (super)type are
        |         returned; defaults to ``object``, i.e. no filtering
        | *Returns:*
        |     A new list; the internal ``after`` list is not exposed
        """
        # The default mirrors getBeforeNodes so both accessors can be called
        # without a filter.
        return [node for node in self.after if isinstance(node, filterType)]
# Parts are always used as objects
class Part(ExecutionNode):
    """Abstract superclass for all Parts.

    Parts are the atomistic instructions in the genetic circuit execution flow,
    e.g. Promoters, RBSs, Terminators, Coding Regions.
    They inherit their functionality as nodes in the execution flow from their
    superclass ExecutionNode.

    | *Attributes:*
    |     precompileMoleculesBefore : list
    |         entries that weave() resolves through the weaver and links in
    |         front of this part
    |     precompileMoleculesAfter : list
    |         entries that weave() resolves through the weaver and links
    |         behind this part
    """
    __metaclass__ = ABCMeta

    def __init__(self):
        super(Part, self).__init__()
        self.precompileMoleculesBefore = []
        self.precompileMoleculesAfter = []

    def setBeforePart(self, part):
        """Registers ``part`` as the Part directly before this part.

        A Part that is already registered in front of this part is removed
        from ``self.before`` first. Only this part's own ``before`` list is
        changed; the links stored on ``part`` and on the removed Part are
        not modified here.

        | *Args:*
        |     part: the Part to be registered in front of this part
        """
        oldPart = self.getBeforePart()
        if oldPart is not None:
            self.before.remove(oldPart)
        self.before.append(part)

    def setAfterPart(self, part):
        """Registers ``part`` as the Part directly after this part.

        A Part that is already registered behind this part is removed from
        ``self.after`` first. Only this part's own ``after`` list is changed;
        the links stored on ``part`` and on the removed Part are not
        modified here.

        | *Args:*
        |     part: the Part to be registered behind this part
        """
        oldPart = self.getAfterPart()
        if oldPart is not None:
            self.after.remove(oldPart)
        self.after.append(part)

    def getBeforePart(self):
        """Returns the first Part found in ``self.before``, or None if there is none."""
        return next((node for node in self.before if isinstance(node, Part)), None)

    def getAfterPart(self):
        """Returns the first Part found in ``self.after``, or None if there is none."""
        return next((node for node in self.after if isinstance(node, Part)), None)

    def weave(self, weaver):
        """Internal method used during the weaving process:
        Facilitates molecule scope checking.

        Every entry of the precompile lists is resolved with
        ``weaver.getMoleculeObject`` in this part's scope, and the returned
        object is linked to this part in both directions.

        | *Args:*
        |     weaver: object providing ``getMoleculeObject``
        """
        for beforeMol in self.precompileMoleculesBefore:
            mol = weaver.getMoleculeObject(self.scope, beforeMol)
            self.before.append(mol)
            mol.after.append(self)
        for afterMol in self.precompileMoleculesAfter:
            # NOTE(review): the extra ``True`` is only passed for molecules
            # behind the part; its meaning is defined by the weaver.
            mol = weaver.getMoleculeObject(self.scope, afterMol, True)
            self.after.append(mol)
            mol.before.append(self)

    def __str__(self):
        return self.__class__.__name__
class Molecule(ExecutionNode):
    """Abstract superclass for all Molecules.

    Molecules are the nodes in the execution flow, for example:
        reactions: (MoleculeA+MoleculeB -> MoleculeC),
        translation: CodingRegion->MoleculeA,
        inducing: MoleculeA -> RegulatedPromoter,
        membrane transfer: MoleculeA (Compartment) -> MoleculeA (Outside Compartment)

    These nodes in the execution flow don't mean an existence of an element,
    but rather describe the information flow within the system.
    """

    def __str__(self):
        # A molecule is displayed by the name of its concrete class.
        concreteClass = self.__class__
        return concreteClass.__name__
class Protein(Molecule):
    """Molecule subtype for Proteins; adds no behaviour of its own."""
def checkIfTypeReturnInstance(possibleType):
    """If the parameter is a type, try to return an instance of it.

    | *Args:*
    |     possibleType - a parameter which may be an instance or a type
    | *Raises:*
    |     PartInitializationError - if the parameter is a type which can not be
    |         constructed without arguments
    | *Returns:*
    |     ``possibleType()`` if possibleType is a type, otherwise possibleType itself
    """
    if not isinstance(possibleType, type):
        return possibleType
    try:
        return possibleType()
    except Exception:
        # Only ordinary construction failures are translated; KeyboardInterrupt
        # and SystemExit are allowed to propagate.
        raise PartInitializationError(str(possibleType) + " can not be initialized without Parameters.")
def checkAndSetMolecule(molecule):
    """Validates that the argument is a Molecule class and hands it back.

    | *Args:*
    |     molecule - A potential Molecule
    | *Raises:*
    |     MoleculeValueError - If molecule is not a class of (sub)type Molecule
    | *Returns:*
    |     The unchanged argument, a class of type Molecule
    """
    isMoleculeClass = inspect.isclass(molecule) and issubclass(molecule, Molecule)
    if not isMoleculeClass:
        raise MoleculeValueError("molecule must be a class of (sub)type Molecule")
    return molecule
class CodingRegion(Part):
    """Class for parts that are Coding Regions"""

    def __init__(self, codesFor):
        """Constructor for a Coding Region Part

        | *Args:*
        |     codesFor: Molecule class this region codes for; it is validated
        |         with checkAndSetMolecule and queued in precompileMoleculesAfter
        | *Raises:*
        |     MoleculeValueError - if codesFor is not a Molecule class
        """
        super(CodingRegion, self).__init__()
        self.precompileMoleculesAfter.append(checkAndSetMolecule(codesFor))

    # todo : is this necessary?
    # def __del__(self):
    #   self.codesFor.before.remove(self)

    def getCodingFor(self):
        """Returns the molecules this coding region codes for.

        | *Returns:*
        |     The Molecule nodes behind this part if there are any; otherwise
        |     (not compiled yet) the precompileMoleculesAfter list
        """
        return self.getAfterNodes(Molecule) or self.precompileMoleculesAfter

    def __str__(self):
        # NOTE(review): the 'm' separator looks like a typo for ', ' but is
        # kept because it is part of the emitted text - confirm with the owner.
        molecules = self.getAfterNodes(Molecule)
        joined = 'm'.join(str(mol) for mol in molecules)
        return super(CodingRegion, self).__str__() + '(codesFor = ' + joined + ')'
class RBS(Part):
    """Class for parts that are Ribosome Binding Sites"""

    def __init__(self):
        """Takes no arguments; only runs the Part initialisation."""
        super(RBS, self).__init__()
class Terminator(Part):
    """Class for parts that are Terminators"""

    def __init__(self):
        """Takes no arguments; only runs the Part initialisation."""
        super(Terminator, self).__init__()
class Circuit(Part):
    """Abstract class for a genetic part circuit.

    | Will generally be used to represent a design's core concerns
    | Additionally, a circuit can be used to represent composite parts
    | Every public helper below forwards to the weaver, passing this circuit
    | as the first argument.
    | *Attributes:*
    |     weaver: Weaver
    |         The weaver which compiles the circuit (None until setWeaver is called)
    """
    __metaclass__ = ABCMeta

    def __init__(self):
        super(Circuit, self).__init__()
        self.weaver = None

    @abstractmethod
    def mainCircuit(self):
        """Entry point for a circuit, analogous to "main" in a program

        | Needs to be implemented by any sub class.
        | mainCircuit will be called by the AOSB Weaver."""
        pass

    def importMolecule(self, molecule):
        """Import a particular molecule from the outer compartment to this compartment

        | *Args:*
        |     molecule: forwarded to ``weaver.importMolecule``
        """
        weaver = self.weaver
        weaver.importMolecule(self, molecule)

    def exportMolecule(self, molecule):
        """Export a particular molecule from this compartment to the outer compartment

        | *Args:*
        |     molecule: forwarded to ``weaver.exportMolecule``
        """
        weaver = self.weaver
        weaver.exportMolecule(self, molecule)

    def createMolecule(self, molecule):
        """Declare that a particular molecule exists in the current scope

        | *Args:*
        |     molecule: forwarded to ``weaver.createMolecule``
        """
        weaver = self.weaver
        weaver.createMolecule(self, molecule)

    def addCircuit(self, circuit):
        """Add a circuit as a sub-compartment in the current compartment

        | *Args:*
        |     circuit: Circuit to be added as a sub-compartment
        """
        weaver = self.weaver
        weaver.addCircuit(self, circuit)

    def addPart(self, part):
        """Used to add parts to the circuit by passing them on to the AOSB Weaver

        | *Args:*
        |     part: The part to be added to the circuit
        """
        weaver = self.weaver
        weaver.addPart(self, part)

    def reactionFrom(self, *molecules):
        """Used to add a reaction to the circuit by passing the reaction's left side on to the AOSB Weaver

        | *Args:*
        |     *molecules: one or more molecules on the lhs of the reaction
        | *Returns:*
        |     whatever ``weaver.reactionFrom`` returns
        """
        lhs = molecules  # handed over as one tuple, not unpacked
        return self.weaver.reactionFrom(self, lhs)

    def reactionTo(self, *molecules):
        """Used to add a reaction to the circuit by passing the reaction's right side on to the AOSB Weaver

        | *Args:*
        |     *molecules: one or more molecules on the rhs of the reaction
        | *Returns:*
        |     whatever ``weaver.reactionTo`` returns
        """
        rhs = molecules  # handed over as one tuple, not unpacked
        return self.weaver.reactionTo(self, rhs)

    def setWeaver(self, weaver):
        """Internal - Should not be used outside of the framework.
        Sets the Weaver Object of this Circuit

        | *Args:*
        |     weaver: A weaver object that will be used
        """
        self.weaver = weaver
def declareNewPart(classname, parent=Part, moleculesBefore=[], moleculesAfter=[], regulatorInfoMap={}):
    ''' Returns a new Part type and exports it to the caller's namespace

    | *Args*
    |     classname : string
    |         The name for the new type
    |     parent : Part
    |         super class for the new type
    |     moleculesBefore : [Molecule]
    |         optional, if the new Part type should have Molecule node(s) before it, e.g. regulators of a Promoter
    |     moleculesAfter : [Molecule]
    |         optional, if the new Part type should have Molecule node(s) after it, e.g. Proteins created by CodingRegions
    |     regulatorInfoMap : Dictionary
    |         optional, if a parent part needs additional regulator information.
    |         Necessary for HybridPromoters, who need a map in the form of {Molecule: Boolean}
    | *Returns*
    |     The new Part type, or the already existing global of that name
    | *Raises*
    |     PartValueError - If the parent is not a Part
    |     InvalidSymbolNameError - If classname is not a valid name for a symbol
    |     ValueError - If moleculesBefore or moleculesAfter is not a list
    |     MoleculeValueError - If a list entry is not a Molecule class
    | *Warnings*
    |     SymbolExistsWarning - If classname already exists in the namespace'''
    # The mutable defaults in the signature are kept for interface
    # compatibility; they are only ever read, never modified.
    if not issubclass(parent, Part):
        raise PartValueError("parent must be of type Part")
    validnameregex = re.compile(r'[a-zA-Z_][a-zA-Z0-9_]*')
    nameMatch = validnameregex.match(classname) if isinstance(classname, str) else None
    # The identifier pattern has to cover the whole string, not just a prefix.
    if nameMatch is None or nameMatch.end() != len(classname):
        raise InvalidSymbolNameError('name is not a valid symbold name')
    if not isinstance(moleculesBefore, list):
        raise ValueError("moleculesBefore must be of type list")
    if not isinstance(moleculesAfter, list):
        raise ValueError("moleculesAfter must be of type list")
    for mol in moleculesBefore:
        checkAndSetMolecule(mol)
    for mol in moleculesAfter:
        checkAndSetMolecule(mol)

    callerFrame = inspect.currentframe().f_back
    callerGlobals = callerFrame.f_globals
    # Create warning if name already exists and reuse the existing definition
    if classname in callerGlobals:
        warnings.warn(
            "Line " + str(callerFrame.f_lineno) + ": Part " + classname +
            " already defined. Existing definition will be used",
            SymbolExistsWarning, 2)  # 2 = one stack level above this
        return callerGlobals.get(classname)

    def newTypeInit(self, recursion=0, moleculesBeforeAggregate=None, moleculesAfterAggregate=None,
                    regulatorInfoMapAggregate=None):
        # Fresh containers are built on every call. Extending a shared default
        # list in place would make each new instance inherit the molecules of
        # all instances created before it.
        moleculesBeforeAggregate = list(moleculesBeforeAggregate or []) + list(moleculesBefore)
        moleculesAfterAggregate = list(moleculesAfterAggregate or []) + list(moleculesAfter)
        mergedRegulatorInfo = dict(regulatorInfoMapAggregate or {})
        mergedRegulatorInfo.update(regulatorInfoMap)  # this level's entries win
        regulatorInfoMapAggregate = mergedRegulatorInfo

        # self.__class__ is always the most derived type; walk up one base per
        # recursion level to find the type this __init__ belongs to.
        realself = self.__class__
        for _ in range(recursion):
            realself = realself.__bases__[0]
        try:
            # The regulator map is forwarded as well, so that information
            # declared on a derived type reaches the core Part type.
            super(realself, self).__init__(recursion + 1, moleculesBeforeAggregate, moleculesAfterAggregate,
                                           regulatorInfoMapAggregate)
        except TypeError:
            # we'll get an exception if we've hit a core implemented Part type,
            # because of the parameter incompatibility
            coreParent = realself.__bases__[0]
            if coreParent is PositivePromoter or coreParent is NegativePromoter:
                # at moment: only allow one regulator, others are lost
                # todo: allow multiple regulators for pos and neg promoters?
                super(realself, self).__init__(moleculesBeforeAggregate[0])
            elif coreParent is ConstitutivePromoter:
                super(realself, self).__init__()
            elif coreParent is CodingRegion:
                # at moment: only allow one output molecule, others are lost
                # todo: allow multiple?
                super(realself, self).__init__(moleculesAfterAggregate[0])
            elif coreParent is HybridPromoter:
                super(realself, self).__init__(moleculesBeforeAggregate, regulatorInfoMapAggregate)
            elif coreParent is RBS or coreParent is Terminator or coreParent is Part:
                # The parent does not take a molecule parameter, but the new
                # subtype should have one: add both lists to follow what the
                # user wants.
                super(realself, self).__init__()
                self.precompileMoleculesBefore += moleculesBeforeAggregate
                self.precompileMoleculesAfter += moleculesAfterAggregate

    result = type(classname, (parent,), {'__init__': newTypeInit})
    callerGlobals[classname] = result
    return result
def declareNewMolecule(classname, *parents):
    """Returns a new Molecule type and exports it to the caller's namespace

    | *Args:*
    |     classname: The name for the new type
    |     parents: 0 or more Molecule super classes (Molecule is used if none are given)
    | *Returns:*
    |     The new Molecule type, or the already existing global of that name
    | *Raises:*
    |     MoleculeValueError: If any parent is not a Molecule
    |     InvalidSymbolNameError: If classname is not a valid name for a symbol
    | *Warnings:*
    |     SymbolExistsWarning: If classname already exists in the namespace
    """
    validnameregex = re.compile(r'[a-zA-Z_][a-zA-Z0-9_]*')
    nameMatch = validnameregex.match(classname) if isinstance(classname, str) else None
    # The identifier pattern has to cover the whole string, not just a prefix.
    if nameMatch is None or nameMatch.end() != len(classname):
        raise InvalidSymbolNameError('name is not a valid symbold name')
    if len(parents) == 0:
        parents = (Molecule,)
    else:
        for parent in parents:
            # isclass guards issubclass, which raises TypeError for non-classes;
            # such input is reported with the documented error instead.
            if not (inspect.isclass(parent) and issubclass(parent, Molecule)):
                raise MoleculeValueError("All parents must be of type Molecule")

    callerFrame = inspect.currentframe().f_back
    callerGlobals = callerFrame.f_globals
    # Check if classname already exists; if so the existing definition is reused
    if classname in callerGlobals:
        warnings.warn(
            "Line " + str(callerFrame.f_lineno) + ": Molecule " + classname +
            " already defined. Existing definition will be used",
            SymbolExistsWarning, 2)  # 2 = one stack level above this
        return callerGlobals.get(classname)

    result = type(classname, parents, {})
    callerGlobals[classname] = result
    return result
class PointCutExpressionNode(object):
    r"""Abstract superclass for all Nodes in a Point Cut Expression Tree

    | A complex expression for a Point Cut, using operators such as & (and),
    | \| (or) or % (concatenation) is represented as a tree of nodes
    """
    __metaclass__ = ABCMeta

    def __mod__(self, other):
        """% - concatenates two PointCutExpressionNodes

        | *Args:*
        |     other - the node on the right of self
        | *Returns:*
        |     a new PointCutExpressionConcatenate node
        """
        concatenation = PointCutExpressionConcatenate(self, other)
        return concatenation

    def __and__(self, other):
        """& - boolean and evaluation of two PointCutExpressionNodes

        | *Args:*
        |     other - the node on the right of self
        | *Returns:*
        |     a new PointCutExpressionAnd node
        """
        conjunction = PointCutExpressionAnd(self, other)
        return conjunction

    def __or__(self, other):
        """| - boolean or evaluation of two PointCutExpressionNodes

        | *Args:*
        |     other - the node on the right of self
        | *Returns:*
        |     a new PointCutExpressionOr node
        """
        disjunction = PointCutExpressionOr(self, other)
        return disjunction

    def numberOfMatchingParts(self, part):
        """returns the number of parts the expression matches

        | If an expression uses concatenation, then it might match the current part and a number of preceding parts.
        | This base implementation counts 1 for a match and 0 otherwise.
        | *Args:*
        |     part - The part at which matching starts
        | *Returns:*
        |     integer - how many parts were matched
        """
        return 1 if self.match(part) else 0

    @abstractmethod
    def match(self, part):
        """Does a part match this (sub)-expression? An abstract method, must be implemented by each child

        | *Args:*
        |     part - The part to be matched
        | *Returns:*
        |     boolean - Whether or not the part was matched
        """
        pass
class PointCutExpressionOperator(PointCutExpressionNode):
    """Abstract superclass of a PointCutExpression Node which is an operator

    | *Attributes:*
    |     left - The first child of the operator
    |     right - The second child of the operator
    """
    __metaclass__ = ABCMeta
    # Class-level defaults, so that subclasses which only set one child still
    # expose both attributes.
    left = None
    right = None

    def __init__(self, left, right):
        """Constructs a new PointCutExpressionOperator

        | *Args:*
        |     left - Sets the left child node
        |     right - Sets the right child node
        | *Raises:*
        |     InvalidPointCutExpressionError -
        |         If either child is not an instance of PointCutExpressionNode
        """
        if not (isinstance(left, PointCutExpressionNode) and isinstance(right, PointCutExpressionNode)):
            raise InvalidPointCutExpressionError("Invalid type used in a PointCut formula.")
        self.left = left
        self.right = right

    def expressionUses(self, nodeType):
        """Confirms if a certain type of node is used in the expression

        | *Args:*
        |     nodeType: The type of the node whose existence is to be confirmed
        | *Returns:*
        |     boolean - Whether or not the node exists in the formula
        """
        if isinstance(self, nodeType) or isinstance(self.left, nodeType) or isinstance(self.right, nodeType):
            return True
        for child in (self.left, self.right):
            # Leaf nodes and an unset child (None) have no expressionUses;
            # they simply do not contribute to the search.
            childUses = getattr(child, 'expressionUses', None)
            if childUses is not None and childUses(nodeType):
                return True
        return False
class PointCutExpressionNot(PointCutExpressionOperator):
    """A PointCutExpressionOperator which is a Not

    A special case, only uses one child (stored as ``right``), acts as the
    inverse operator
    """

    def __init__(self, pointcutexpression):
        """Constructs a new PointCutExpressionNot

        | *Args:*
        |     pointcutexpression - The expression to be negated
        """
        self.right = pointcutexpression

    def match(self, part, within=None):
        """see PointCutExpressionNode definition

        | *Args:*
        |     part - The part to be matched
        |     within - accepted for backward compatibility, not used
        | *Returns:*
        |     boolean - the negated result of the child expression
        """
        # The node interface is match(part). Passing ``within`` on would raise
        # a TypeError for every child whose match only takes the part.
        return not self.right.match(part)
class PointCutExpressionOr(PointCutExpressionOperator):
    """A PointCutExpressionOperator which is an Or"""

    def match(self, part, within=None):
        """see PointCutExpressionNode definition

        The right child is only consulted when the left child does not match.
        ``within`` is accepted but not used.
        """
        leftResult = self.left.match(part)
        if leftResult:
            return leftResult
        return self.right.match(part)
class PointCutExpressionAnd(PointCutExpressionOperator):
    """A PointCutExpressionOperator which is an And"""

    def match(self, part):
        """see PointCutExpressionNode definition

        The right child is only consulted when the left child matches.
        """
        leftResult = self.left.match(part)
        if not leftResult:
            return leftResult
        return self.right.match(part)
class PointCutExpressionConcatenate(PointCutExpressionOperator):
    """A PointCutExpressionOperator which is a Concatenation"""

    def numberOfMatchingParts(self, part):
        """see PointCutExpressionNode definition

        This child overrides it, since a concatenation operator is
        a node in the expression at which more than one part can be
        matched.
        """
        if not self.match(part):
            return 0
        matchedBefore = self.left.numberOfMatchingParts(part.getBeforePart())
        return matchedBefore + self.right.numberOfMatchingParts(part)

    def match(self, part):
        """see PointCutExpressionNode definition

        The left child is matched against the part in front of ``part``, the
        right child against ``part`` itself.
        """
        try:
            return self.left.match(part.getBeforePart()) and self.right.match(part)
        except Exception:
            # Best effort: a child that cannot evaluate its input (e.g. there
            # is no preceding part) counts as "no match". KeyboardInterrupt
            # and SystemExit are no longer swallowed.
            return False
def checkBaseClassesMatch(bases, typename):
    """Recursively check if the name of the types in bases (or parents) are equal to typename

    | *Args:*
    |     bases - A tuple of types
    |     typename : str - A name of a type
    | *Returns:*
    |     True if any of the names of types in bases or any of their parent bases equals typename,
    |     False otherwise (including for an empty tuple)
    """
    # any() yields an explicit False when nothing matches, instead of falling
    # off the end of the function with None.
    return any(
        typename == baseClass.__name__ or checkBaseClassesMatch(baseClass.__bases__, typename)
        for baseClass in bases
    )
class PartSignatureElement(object):
    """A building block of a Part Signature

    | *Attributes:*
    |     qualifier : ANY / SUBCLASS / CLASSONLY
    |         Whether the element should match precisely, all subclasses or uses a wildcard
    |     element : str - The string of the element (without a qualifier)
    |     inverse : boolean - Whether the element has been negated
    """
    ANY = 1
    SUBCLASS = 2
    CLASSONLY = 3

    def __init__(self, signature):
        """Constructs a new PartSignatureElement

        | Analyzes signature and sets internal attributes:
        | a leading '!' negates the element, a trailing '*' selects ANY,
        | a trailing '+' selects SUBCLASS, anything else is CLASSONLY
        | *Args:*
        |     signature - the part of the Part signature for this element
        """
        self.inverse = False
        self.qualifier = None
        self.element = ''
        if signature[0] == '!':
            self.inverse = True
            signature = signature[1:]
        lastChar = signature[-1]
        if lastChar == '*':
            self.qualifier, self.element = PartSignatureElement.ANY, signature[:-1]
        elif lastChar == '+':
            self.qualifier, self.element = PartSignatureElement.SUBCLASS, signature[:-1]
        else:
            self.qualifier, self.element = PartSignatureElement.CLASSONLY, signature

    def __str__(self):
        if self.qualifier == self.ANY:
            suffix = '*'
        elif self.qualifier == self.SUBCLASS:
            suffix = '+'
        else:
            suffix = ''
        prefix = '!' if self.inverse else ''
        return prefix + self.element + suffix

    def match(self, obj):
        """see PointCutExpressionNode definition

        ``obj`` may be a name (str), a class or an instance; the result is
        negated when the element was written with a leading '!'.
        """
        if not self.inverse:
            return self.__match(obj)
        return not self.__match(obj)

    def __match(self, obj):
        """internal match method, matching without inverse"""
        # Name to compare: the class's own name, the string itself, or the
        # name of the instance's class.
        if inspect.isclass(obj):
            objectName = obj.__name__
        elif isinstance(obj, str):
            objectName = obj
        else:
            objectName = obj.__class__.__name__

        exactNameAllowed = self.qualifier in (self.CLASSONLY, self.SUBCLASS)
        if exactNameAllowed and objectName == self.element:
            return True
        if self.qualifier == self.ANY and objectName.startswith(self.element):
            return True
        if self.qualifier != self.SUBCLASS:
            return False
        # SUBCLASS without a direct name hit: search the inheritance chain.
        klass = obj if inspect.isclass(obj) else obj.__class__
        return checkBaseClassesMatch(klass.__bases__, self.element)
class MoleculeSignatureElement(PartSignatureElement):
    """A PartSignatureElement that is a Molecule Signature

    | Overloads some methods, since MoleculeSignature is used by the user for
    | Molecule Type Advice - unlike PartSignatureElement, which is internal
    """

    def __init__(self, signature):
        """Constructs a new MoleculeSignatureElement

        | *Args:*
        |     signature : str - The string signature to be cast to a MoleculeSignatureElement
        | *Raises:*
        |     InvalidSignatureError - If signature does not adhere to the format
        """
        # Raw string: '\+' and '\*' are regex escapes, not string escapes.
        signatureRE = re.compile(r'!?([a-zA-Z_][a-zA-Z0-9_]*[\+\*]?|\*)')
        try:
            found = signatureRE.match(signature)
        except TypeError:
            # signature is not something a regex can be applied to
            raise InvalidSignatureError()
        # The pattern has to cover the whole string, not just a prefix.
        if found is None or found.end() != len(signature):
            raise InvalidSignatureError()
        super(MoleculeSignatureElement, self).__init__(signature)

    def match(self, obj):
        """see PointCutExpressionNode definition

        In addition, a wildcard element accepts ``None`` (unless it is negated).
        """
        if self.qualifier == self.ANY and obj is None:
            return not self.inverse
        return super(MoleculeSignatureElement, self).match(obj)
class MoleculeSignature(object):
    """A MoleculeSignature, used for Type Advice of Molecules

    | *Attributes:*
    |     namespace : [PartSignatureElement] - List of signature parts of the signature
    |     molecule : MoleculeSignatureElement - The molecule part of the signature
    """

    def __init__(self, signature):
        """Constructs a new MoleculeSignature

        | Analyzes signature and sets internal attributes
        | *Args:*
        |     signature : str - dot separated elements; the last one names the
        |         molecule, the preceding ones form the namespace
        | *Raises:*
        |     InvalidSignatureError - If signature does not adhere to the format
        """
        self.namespace = []
        self.molecule = None
        typeErrorMessage = "signature must be of type String and adhere to PointCut / PartSignature Format"
        if not isinstance(signature, str):
            raise InvalidSignatureError(typeErrorMessage)
        # using a regular expression to make sure the signature is of a valid format
        signatureRE = re.compile(
            r'(!?(([a-zA-Z_][a-zA-Z0-9_]*[\+\*]?)|\*)\.)+!?(([a-zA-Z_][a-zA-Z0-9_]*[\+\*]?)|\*)')
        # regular expression has to match entire length of string
        found = signatureRE.match(signature)
        if found is None or found.end() != len(signature):
            raise InvalidSignatureError(typeErrorMessage)
        # split signature into components; the group keeps a leading '!'
        # attached to a bare '*' as well, which the validation above accepts
        signaturePartRE = re.compile(r'!?(?:[a-zA-Z_][a-zA-Z0-9_]*[\+\*]?|\*)')
        splitSignature = signaturePartRE.findall(signature)
        self.molecule = MoleculeSignatureElement(splitSignature[-1])
        self.namespace = [PartSignatureElement(name) for name in splitSignature[:-1]]

    def __str__(self):
        names = [str(element) for element in self.namespace]
        names.append(str(self.molecule))
        return '.'.join(names)

    def matchNamespaces(self, namespace, scope):
        """Matches the namespace elements against a chain of scopes.

        The elements are compared innermost first against ``scope.circuitName``,
        moving to ``scope.parentCircuit`` after every successful comparison.

        | *Args:*
        |     namespace - list of elements providing ``match``
        |     scope - innermost scope object
        | *Returns:*
        |     boolean - False as soon as one element does not match
        """
        # todo edge case, i.e. if need to check * is last, or no match?
        # NOTE(review): a namespace longer than the scope chain is not handled
        # here - behaviour then depends on what parentCircuit is at the root.
        for name in reversed(namespace):
            if not name.match(scope.circuitName):
                return False
            scope = scope.parentCircuit
        return True

    def match(self, molecule):
        """see PointCutExpressionNode definition

        | *Raises:*
        |     MoleculeValueError - if molecule is not a Molecule instance
        """
        if not isinstance(molecule, Molecule):
            raise MoleculeValueError("Molecule to match must be instance of type Molecule")
        return self.molecule.match(molecule) and self.matchNamespaces(self.namespace, molecule.scope)
class NotOnStack(PointCutExpressionNode):
    """Expression node that matches parts whose addition stack contains no
    object whose class is named ``signature``.

    | *Attributes:*
    |     name : str - the class name looked for on the stack
    """

    def __init__(self, signature):
        self.name = signature
        # todo all the error checking...

    def match(self, part):
        """see PointCutExpressionNode definition"""
        # todo matching here should also support + and *
        # additionStack starts out as None on a node; treat that as empty.
        stack = part.additionStack or []
        return not any(withinObject.__class__.__name__ == self.name for withinObject in stack)
class OnStack(PointCutExpressionNode):
    """Expression node that matches parts whose addition stack contains an
    object whose class is named ``signature``.

    | *Attributes:*
    |     name : str - the class name looked for on the stack
    """

    def __init__(self, signature):
        self.name = signature
        # todo all the error checking...

    def match(self, part):
        """see PointCutExpressionNode definition"""
        # additionStack starts out as None on a node; treat that as empty.
        stack = part.additionStack or []
        return any(withinObject.__class__.__name__ == self.name for withinObject in stack)
class PartSignature(PointCutExpressionNode):
    """A PartSignature, used in PointCut expressions or directly for Type Advice

    | *Attributes:*
    |     namespace : [PartSignatureElement] - List of signature parts of the signature
    |     part : PartSignatureElement - The "part" part of the signature
    |     molecule : MoleculeSignatureElement - The molecule part of the signature
    |     nomolecule : Boolean -
    |         If the PartSignature explicitly should not match parts with molecules
    """

    def __init__(self, signature):
        """Constructs a new PartSignature

        | Analyzes signature and sets internal attributes
        | *Args:*
        |     signature : str - format "Circuit.Part(Molecule)"; "(Molecule)" is
        |         optional, an empty "()" explicitly requests parts without molecules
        | *Raises:*
        |     InvalidSignatureError - If signature does not adhere to the format
        """
        self.namespace = []
        self.part = None
        self.molecule = None
        self.nomolecule = False
        typeErrorMessage = "signature must be of type String and adhere to PointCut / PartSignature Format"
        if not isinstance(signature, str):
            raise InvalidSignatureError(typeErrorMessage)
        # using a regular expression to make sure the signature is of a valid format
        signatureRE = re.compile(
            r'(!?(([a-zA-Z_][a-zA-Z0-9_]*[\+\*]?)|\*)\.)+!?(([a-zA-Z_][a-zA-Z0-9_]*[\+\*]?)|\*)(\(!?(([a-zA-Z_][a-zA-Z0-9_]*[\+\*]?)|\*)?\))?')
        # regular expression has to match entire length of string
        found = signatureRE.match(signature)
        if found is None or found.end() != len(signature):
            raise InvalidSignatureError(typeErrorMessage)
        # split signature into components; the group keeps a leading '!'
        # attached to a bare '*' as well, which the validation above accepts
        signaturePartRE = re.compile(r'!?(?:[a-zA-Z_][a-zA-Z0-9_]*[\+\*]?|\*)')
        splitSignature = signaturePartRE.findall(signature)
        numOfCircuitElements = len(splitSignature) - 1
        signatureMoleculePartRE = re.compile(r'\(!?(([a-zA-Z_][a-zA-Z0-9_]*[\+\*]?)|\*)?\)')
        moleculePartMatch = signatureMoleculePartRE.search(signature)
        if moleculePartMatch is None:
            # no parentheses at all: any molecule is accepted
            self.molecule = MoleculeSignatureElement('*')
        else:
            moleculePart = moleculePartMatch.group(0)
            if len(moleculePart) == 2:
                # "()" - explicitly no molecule
                self.nomolecule = True
            else:
                # the molecule is the last split component, so the part
                # element sits one position earlier
                numOfCircuitElements -= 1
                self.molecule = MoleculeSignatureElement(moleculePart[1:-1])
        self.part = PartSignatureElement(splitSignature[numOfCircuitElements])
        self.namespace = [PartSignatureElement(name) for name in splitSignature[:numOfCircuitElements]]

    def __str__(self):
        names = [str(element) for element in self.namespace]
        names.append(str(self.part))
        firsthalf = '.'.join(names)
        if self.nomolecule:
            return firsthalf + '()'
        return firsthalf + '(' + str(self.molecule) + ')'

    def matchNamespaces(self, namespace, scope):
        """Matches the namespace elements against the entries of a stack.

        Elements and stack entries are paired from the end; each entry is
        compared by its ``str()``. When the stack is shorter than the
        namespace the remaining elements are not checked.

        | *Args:*
        |     namespace - list of elements providing ``match``
        |     scope - sequence of stack entries
        | *Returns:*
        |     boolean - False as soon as one pair does not match
        """
        # todo edge case, i.e. if need to check * is last, or no match?
        depth = len(scope)
        for offset, name in enumerate(reversed(namespace), 1):
            if offset > depth:
                return True
            if not name.match(str(scope[depth - offset])):
                return False
        return True

    def match(self, part):
        """see PointCutExpressionNode definition

        | *Raises:*
        |     PartValueError - if part is not a Part instance
        """
        if not isinstance(part, Part):
            raise PartValueError("Part to match must be instance of type Part")
        result = self.part.match(part) and self.matchNamespaces(self.namespace, part.additionStack)  # or scope
        moleculesAfter = part.precompileMoleculesAfter
        moleculesBefore = part.precompileMoleculesBefore
        if self.nomolecule:
            # "()" only matches parts without any molecule, woven or pending
            if part.getBeforeNodes(Molecule) or part.getAfterNodes(Molecule) or \
                    moleculesAfter or moleculesBefore:
                result = result and False
            return result
        # I.E. THERE IS A MOLECULE
        # this needs to be refined / TODO
        if len(moleculesAfter) == 0 and len(moleculesBefore) == 0:
            # a part without molecules is only accepted by a bare wildcard
            bareWildcard = self.molecule.qualifier == self.molecule.ANY and self.molecule.element == ''
            if not bareWildcard:
                result = result and False
        for mol in moleculesAfter:
            result = result and self.molecule.match(mol)
        for mol in moleculesBefore:
            result = result and self.molecule.match(mol)
        # todo this means we could also do more than one molecule, e.g. Promoter(AB || A) or bool and...
        return result
class PointCut(object):
    """A PointCut to select Join Points in the genetic parts execution flow

    | *Attributes:*
    |     operator : BEFORE / AFTER / REPLACE
    |         - The operator for this PointCut
    |     signature : PartSignature or another PointCutExpressionNode
    """
    BEFORE = 11
    AFTER = 22
    REPLACE = 33

    def __init__(self, signature, operator):
        """Construct a new PointCut

        | *Args:*
        |     signature - A PointCutExpressionNode, or a string that will be cast to a PartSignature
        |     operator - The operator
        | *Raises:*
        |     InvalidPointCutOperatorError - see checkAndSetOperator
        """
        self.operator = None
        if isinstance(signature, PointCutExpressionNode):
            self.signature = signature
        else:
            self.signature = PartSignature(signature)
        self.checkAndSetOperator(operator)

    def __str__(self):
        if self.operator == self.BEFORE:
            operator = 'BEFORE'
        elif self.operator == self.AFTER:
            operator = 'AFTER'
        else:
            operator = 'REPLACE'
        return 'PointCut(\'' + str(self.signature) + '\',' + operator + ')'

    def match(self, part):
        """see PointCutExpressionNode definition"""
        return self.signature.match(part)

    def checkAndSetOperator(self, operator):
        """Set the internal operator attribute, if the parameter is a valid operator

        | *Args:*
        |     operator - The operator to be confirmed
        | *Raises:*
        |     InvalidPointCutOperatorError - if the operator is invalid
        |         (can be dependent on the signature)
        """
        if operator not in (self.BEFORE, self.AFTER, self.REPLACE):
            raise InvalidPointCutOperatorError("operator must be of type PointCut.BEFORE / AFTER / REPLACE")
        if (operator == self.BEFORE
                and isinstance(self.signature, PointCutExpressionOperator)
                and self.signature.expressionUses(PointCutExpressionConcatenate)):
            raise InvalidPointCutOperatorError(
                'PointCut Operator can not be BEFORE if PartSignature uses concatenation.')
        # Assigned only after every check passed, so a rejected operator never
        # ends up on the instance.
        self.operator = operator
class PointCutContext(object):
    """Container for context at a PointCut

    | *Attributes:*
    |     within - A stack of the circuits / aspects within which the PointCut was matched
    |     part - The part matched by the PointCut"""

    def __init__(self, within, part):
        self.within = within
        self.part = part

    def isWithin(self, obj):
        """Checks if the parameter (a circuit or aspect) is on the within stack

        | *Args:*
        |     obj : Circuit / Aspect - The object that is to be found on the stack
        | *Returns:*
        |     Boolean - true if obj is on the within stack
        """
        return self.__isWithinRecursive(obj, len(self.within))

    def __isWithinRecursive(self, obj, index):
        # Scans within[index-1] down to within[0]. Implemented as a loop so
        # that a deep stack cannot exhaust the interpreter's recursion limit.
        while index > 0:
            if obj == self.within[index - 1]:
                return True
            index -= 1
        return False
class Advice(object):
    """Container for Advice

    | *Attributes:*
    |     precedence : int - High precedence advice have execution priority over low precedence
    |         - MINPRECEDENCE <= precedence <= MAXPRECEDENCE is the valid range
    |     pointcut : PointCut
    |     adviceMethod : method - The method to be executed at the advice
    |         - must have 2 parameters: self and PointCutContext
    """
    MINPRECEDENCE = 0
    MAXPRECEDENCE = 100

    def __init__(self, pointcut, adviceMethod, precedence=MINPRECEDENCE):
        """Constructs a new Advice object, setting internal state

        | *Args:*
        |     pointcut - The PointCut to be set
        |     adviceMethod - The adviceMethod to be set
        |     precedence - The precedence to be set
        | *Raises:*
        |     InvalidPointCutError - If pointcut is not a PointCut instance
        |     InvalidAdviceMethodError - If argument is not a method or takes wrong
        |         number of parameters (must take self and PointCutContext object)
        |     PrecedenceOutOfRangeError - If precedence > MAXPRECEDENCE or
        |         precedence < MINPRECEDENCE
        """
        self.pointcut = None
        self.adviceMethod = None
        self.precedence = self.MINPRECEDENCE
        if not isinstance(pointcut, PointCut):
            raise InvalidPointCutError("pointcut must be of type PointCut")
        self.pointcut = pointcut
        if not inspect.ismethod(adviceMethod) or self._countPositionalArgs(adviceMethod) != 2:
            raise InvalidAdviceMethodError("adviceMethod must be a method with 2 parameters (self, PointCutContext)")
        self.adviceMethod = adviceMethod
        if precedence < self.MINPRECEDENCE or precedence > self.MAXPRECEDENCE:
            raise PrecedenceOutOfRangeError(
                "Advice Precedence must be between " + str(self.MINPRECEDENCE) + " and " + str(self.MAXPRECEDENCE))
        self.precedence = precedence

    @staticmethod
    def _countPositionalArgs(method):
        """Returns the number of named positional parameters of method
        (the bound first parameter, e.g. self, is counted)"""
        # inspect.getargspec was removed in Python 3.11; prefer getfullargspec
        # and only fall back to getargspec on interpreters that lack it.
        # The fallback must be looked up lazily: naming inspect.getargspec as a
        # getattr default would itself fail on Python 3.11+.
        describe = getattr(inspect, 'getfullargspec', None) or inspect.getargspec
        return len(describe(method)[0])
class TypeAdvice(object):
    """Container for TypeAdvice

    | *Attributes:*
    |     signature : PartSignature or MoleculeSignature
    |     typeaddition : method or attribute to be added to the type
    |     name : The name the new typeaddition should have in the new type
    |     aspect : Aspect which declares this TypeAdvice
    """
    # Compiled once for the class; \Z anchors the pattern at the very end of
    # the string so the whole name must consist of identifier characters.
    _VALID_NAME = re.compile(r'[a-zA-Z_][a-zA-Z0-9_]*\Z')

    def __init__(self, signature, typeaddition, name, aspect):
        """Constructs a new TypeAdvice

        | *Args:*
        |     signature - PartSignature / MoleculeSignature to be set
        |     typeaddition - attribute to be set
        |     name : str - attribute to be set
        |     aspect : Aspect - attribute to be set
        | *Raises:*
        |     InvalidSignatureError -
        |         If signature not instance of PartSignature or MoleculeSignature
        |     InvalidSymbolNameError -
        |         If name is not a valid name for a symbol
        """
        self.signature = None
        self.typeaddition = None
        self.aspect = None
        self.name = ''
        # The signature is validated first, so it decides which error is
        # raised when both the signature and the name are invalid.
        if not isinstance(signature, (PartSignature, MoleculeSignature)):
            raise InvalidSignatureError(
                'Signature parameter for TypeAdvice must be of type PartSignature or MoleculeSignature')
        self.signature = signature
        self.typeaddition = typeaddition
        self.aspect = aspect
        if not self._isValidSymbolName(name):
            raise InvalidSymbolNameError('name is not a valid symbol name')
        self.name = name

    @classmethod
    def _isValidSymbolName(cls, name):
        """Returns True if name is a str made up of a letter or underscore
        followed only by letters, digits and underscores"""
        return isinstance(name, str) and cls._VALID_NAME.match(name) is not None

    def isPartAdvice(self):
        """Returns True if TypeAdvice is for Part, False otherwise"""
        return isinstance(self.signature, PartSignature)

    def isMoleculeAdvice(self):
        """Returns True if TypeAdvice is for Molecule, False otherwise"""
        return isinstance(self.signature, MoleculeSignature)
class Aspect(object):
    """Abstract class for an Aspect

    | Will generally be used to represent a design's cross-cutting concerns
    | *Notes:*
    |     Any child must implement mainAspect method
    | *Attributes*
    |     weaver : Weaver
    |         The weaver which compiles the aspect
    |     adviceList - A list of all advice this aspect declares
    |     typeAdviceList - A list of all type advice this aspect declares
    |     weaverOutputList - A list of all output methods this aspect declares
    |         for the WeaverOutput
    """
    __metaclass__ = ABCMeta

    def __init__(self):
        self.adviceList = []
        self.typeAdviceList = []
        self.weaverOutputList = []
        self.weaver = None

    def importMolecule(self, molecule):
        """Import a particular molecule from the outer compartment to this compartment

        | *Args:*
        |     molecule
        """
        self.weaver.importMolecule(self, molecule)

    def exportMolecule(self, molecule):
        """Export a particular molecule from this compartment to the outer compartment

        | *Args:*
        |     molecule
        """
        self.weaver.exportMolecule(self, molecule)

    def createMolecule(self, molecule):
        """Declare that a particular molecule exists in the current scope

        | *Args:*
        |     molecule
        """
        self.weaver.createMolecule(self, molecule)

    def addCircuit(self, circuit):
        """Add a circuit as a sub-compartment in the current compartment

        | *Args:*
        |     circuit: Circuit to be added as a sub-compartment
        """
        self.weaver.addCircuit(self, circuit)

    def addPart(self, part):
        """Used to add parts to the design by passing them on to the AOSB Weaver

        | *Args:*
        |     part: The part to be added to the circuit
        """
        self.weaver.addPart(self, part)

    def reactionFrom(self, *molecules):
        """Used to add a reaction to the aspect by passing the reaction's left side on to the AOSB Weaver

        | *Args:*
        |     *molecules: one or more molecules on the lhs of the reaction
        | *Returns:*
        |     Whatever the weaver's reactionFrom returns
        """
        return self.weaver.reactionFrom(self, molecules)

    def reactionTo(self, *molecules):
        """Used to add a reaction to the aspect by passing the reaction's right side on to the AOSB Weaver

        | *Args:*
        |     *molecules: one or more molecules on the rhs of the reaction
        | *Returns:*
        |     Whatever the weaver's reactionTo returns
        """
        return self.weaver.reactionTo(self, molecules)

    def setWeaver(self, weaver):
        """Internal - Should not be used outside of the framework.
        Sets the Weaver Object of this Aspect

        | *Args:*
        |     weaver: A weaver object that will be used
        """
        self.weaver = weaver

    def addAdvice(self, pointcut, adviceMethod, precedence=Advice.MINPRECEDENCE):
        """Declare a new advice in the aspect

        | *Args:*
        |     pointcut - The pointcut for the advice
        |     adviceMethod - The method to be executed at the pointcut
        |         should be a method bound to this aspect, with second parameter
        |         expecting a PointCutContext object
        |     precedence : integer (optional) - set precedence of advice,
        |         see notes on Advice.precedence
        | *Notes:*
        |     addAdvice constructs an Advice object. Further information thus
        |     can be found there.
        """
        self.adviceList.append(Advice(pointcut, adviceMethod, precedence))

    def addTypeAdvice(self, signature, typeaddition, name):
        """Declare a type advice in the aspect

        | *Args:*
        |     signature - The signature for the type advice
        |     typeaddition - The attribute / method to be added to the type
        |     name: str - The name for the typeaddition in the new type
        | *Notes:*
        |     addTypeAdvice constructs a TypeAdvice object.
        |     Further information thus can be found there.
        """
        self.typeAdviceList.append(TypeAdvice(signature, typeaddition, name, self))

    def addWeaverOutput(self, outputmethod):
        """Declare a new weaver output target

        | *Args:*
        |     outputmethod - The method to be added to the WeaverOutput
        | *Raises:*
        |     InvalidWeaverOutputMethodError -
        |         If outputmethod is not a method or has wrong number of parameters
        |         (needs to accept self and a WeaverOutput reference)
        """
        # inspect.getargspec was removed in Python 3.11; prefer getfullargspec
        # and look the legacy function up lazily so this works on old and new
        # interpreters alike.
        describe = getattr(inspect, 'getfullargspec', None) or inspect.getargspec
        if not inspect.ismethod(outputmethod) or len(describe(outputmethod)[0]) != 2:
            raise InvalidWeaverOutputMethodError("outputmethod must be a method with 2 parameters (self, WeaverOutput)")
        self.weaverOutputList.append(outputmethod)

    def getAdviceList(self):
        """Returns adviceList"""
        return self.adviceList

    def getTypeAdviceList(self):
        """Returns typeAdviceList"""
        return self.typeAdviceList

    def getWeaverOutputList(self):
        """Returns weaverOutputList"""
        return self.weaverOutputList

    @abstractmethod
    def mainAspect(self):
        """Entry point for an aspect, analogous to "main" in a program

        | Needs to be implemented by any sub class.
        | mainAspect will be called by the AOSB Weaver."""
        pass

    def __str__(self):
        return self.__class__.__name__
[docs]class Weaver(object):
"""The "compiler" that weaves core concerns (circuits) and cross-cutting
concerns (aspects) and creates a woven execution flow of parts
| *Attributes:*
| partList - The current list of parts
| moleculeList - The current list of molecules
| beforeAndReplaceAdviceList - List of all before and replace advice to be woven
| afterAdviceList - List of all after advice to be woven
| partTypeAdviceList - List of all part type advice
| moleculeTypeAdviceList - List of all molecule type advice
| circuit - The main circuit
| aspects - The list of all aspects to be woven
| weaverOutput : WeaverOutput - The "compiled" result
"""
class WeaverOutput(object):
    """Container for the woven result of the Weaver

    | *Attributes:*
    |     circuitName - the name of the circuit that was woven
    |     partList - ordered list of parts in the design
    |     moleculeList - list of all molecules in the design
    |     wovenCircuitList - list of sub-circuits (themselves WeaverOutput objects)
    |     parentCircuit - the enclosing WeaverOutput, None if not given
    """

    def __init__(self, circuitName, parentCircuit=None):
        self.circuitName = circuitName
        self.partList = []
        self.moleculeList = []
        self.wovenCircuitList = []
        self.parentCircuit = parentCircuit

    def __str__(self):
        # Collect the report fragments in order and join them once at the end.
        chunks = ["\n\n", self.circuitName, ' Circuit',
                  '\n--------------------------\n',
                  'Parts List:\n']
        chunks.append('+'.join(str(part) for part in self.partList))
        if len(self.partList) == 0:
            chunks.append("None")
        chunks.append("\nCircuit Molecule List: \n")
        chunks.append(self.molecules())
        chunks.append("\nCircuit Molecule Reactions: \n")
        chunks.append(self.moleculeGraph())
        if len(self.wovenCircuitList) > 0:
            chunks.append('\nSubCircuits:\n')
            for number, subCircuit in enumerate(self.wovenCircuitList, 1):
                chunks.append("Sub Circuit " + str(number) + ":\n")
                chunks.append(str(subCircuit))
                chunks.append('\n####################\n')
        chunks.append("\n\n")
        return ''.join(chunks)

    def molecules(self):
        """Returns the molecules as a comma separated string, "None" if there are none"""
        if len(self.moleculeList) == 0:
            return "None"
        return ', '.join(str(molecule) for molecule in self.moleculeList)

    def moleculeGraph(self):
        """Returns the reaction edges of this circuit as text

        | Each edge is rendered as "<circuit>.<node>-><circuit>.<node>; ".
        | Parts that belong to another scope are left out.
        """
        def qualified(node):
            # Prefix a node with the name of the circuit it is scoped to.
            return str(node.scope.circuitName) + '.' + str(node)

        pieces = []
        for current in self.moleculeList:
            if isinstance(current, Part) and current.scope != self:
                continue
            for predecessor in current.before:
                if isinstance(predecessor, Molecule) or isinstance(predecessor, Part):
                    # Only parts of this scope are drawn from the "before"
                    # side; single molecule predecessors emit nothing here.
                    if isinstance(predecessor, Part) and predecessor.scope == self:
                        pieces.append(qualified(predecessor) + '->' + qualified(current) + '; ')
                else:
                    # Neither Molecule nor Part: treated as a sequence of
                    # nodes that jointly lead to the current one.
                    pieces.append(','.join(qualified(member) for member in predecessor)
                                  + '->' + qualified(current) + '; ')
            for successor in current.after:
                if isinstance(successor, Part) and successor.scope != self:
                    continue
                pieces.append(qualified(current) + '->' + qualified(successor) + '; ')
        return ''.join(pieces)
def __init__(self, circuit, *aspects):
    """Sets up the weaver and compiles the design

    | *Args:*
    |     circuit : Circuit - The main circuit, given as a Circuit class or instance
    |     *aspects : Aspect - aspects to be woven, given as Aspect classes or instances
    | *Raises:*
    |     CircuitValueError - If circuit is invalid
    |     AspectValueError - If any aspect is invalid"""
    self.beforeAndReplaceAdviceList = []
    self.afterAdviceList = []
    self.partTypeAdviceList = []
    self.moleculeTypeAdviceList = []
    self.circuit = None
    self.aspects = []
    self.withinStack = []
    self.weaverOutput = None

    # issubclass() raises TypeError when handed an instance, so the class and
    # instance cases have to be told apart before either check is made.
    if inspect.isclass(circuit):
        if not issubclass(circuit, Circuit):
            raise CircuitValueError("circuit must be a class or instance of type Circuit.")
        try:
            self.circuit = circuit()
        except Exception:
            raise CircuitValueError("Can not initialize Circuit " + circuit.__name__ + " without parameters")
    elif isinstance(circuit, Circuit):
        self.circuit = circuit
    else:
        raise CircuitValueError("circuit must be a class or instance of type Circuit.")
    self.circuit.setWeaver(self)

    for aspect in aspects:
        if inspect.isclass(aspect):
            if not issubclass(aspect, Aspect):
                raise AspectValueError("all aspects must be classes or instances of type Aspect.")
            try:
                self.aspects.append(aspect())
            except Exception:
                raise AspectValueError("Can not initialize Aspect " + aspect.__name__ + " without parameters")
        elif isinstance(aspect, Aspect):
            self.aspects.append(aspect)
        else:
            raise AspectValueError("all aspects must be classes or instances of type Aspect.")

    self.weaverOutput = self.WeaverOutput(self.circuit.__class__.__name__)
    self.readAspectsConstructAdviceLists()
    self.sortAdviceList()
    # weaving is kicked off here
    self.currentWeaverOutput = self.weaverOutput
    self.circuit.mainCircuit()
    self.constructMoleculeListAndAddTypeAdvice()
def sortAdviceList(self):
    """Internal - orders both advice lists by precedence

    | Before / replace advice end up in descending precedence order,
    | after advice in ascending precedence order.
    """
    byPrecedence = lambda advice: advice.precedence
    self.afterAdviceList = sorted(self.afterAdviceList, key=byPrecedence)
    self.beforeAndReplaceAdviceList = sorted(
        self.beforeAndReplaceAdviceList, key=byPrecedence, reverse=True)
def constructMoleculeListAndAddTypeAdvice(self):
    """Internal - currently a no-op placeholder"""
    # NOTE: an earlier draft built the molecule list from each part's
    # moleculeConnection and then applied the matching molecule type advice
    # via setattr. That draft is disabled; nothing is done here.
    return None
def readAspectsConstructAdviceLists(self):
    """Internal - runs every aspect and collects what it declares

    | For each aspect: sets this weaver on it, calls mainAspect, sorts its
    | advice into the after / before-and-replace lists, sorts its type advice
    | into the part / molecule lists, and attaches its output methods to the
    | weaverOutput.
    """
    for aspect in self.aspects:
        aspect.setWeaver(self)
        aspect.mainAspect()

        for advice in aspect.getAdviceList():
            isAfter = advice.pointcut.operator == PointCut.AFTER
            destination = self.afterAdviceList if isAfter else self.beforeAndReplaceAdviceList
            destination.append(advice)

        for typeAdvice in aspect.getTypeAdviceList():
            destination = (self.partTypeAdviceList if typeAdvice.isPartAdvice()
                           else self.moleculeTypeAdviceList)
            destination.append(typeAdvice)

        weaverOutput = self.weaverOutput  # captured by the closure below

        def outputMethodWrapper(method):
            # The attached method ignores the object it is bound to and always
            # calls the aspect's output method with the weaver output.
            def outputMethodWrapperClosure(self):
                return method(weaverOutput)
            return outputMethodWrapperClosure

        # TODO check if a similarly named output target already exists and raise
        # useful exception (containing name of clashed method, name of advice)
        for outputMethod in aspect.getWeaverOutputList():
            boundOutput = types.MethodType(outputMethodWrapper(outputMethod), aspect)
            setattr(self.weaverOutput, outputMethod.__name__, boundOutput)
# todo clean up, possible split lookup and creation
def getMoleculeObject(self, scope, moleculeClass, createIt=False):
    """Looks up the molecule of a given class in a scope, optionally creating it

    | *Args:*
    |     scope - object whose moleculeList is searched (and appended to)
    |     moleculeClass - class of the molecule wanted
    |     createIt - if not equal to False, a missing molecule is created
    | *Returns:*
    |     The first molecule in scope.moleculeList that is an instance of
    |     moleculeClass, or the newly created one
    | *Raises:*
    |     MoleculeValueError - if no molecule is found and createIt == False
    """
    existing = next(
        (candidate for candidate in scope.moleculeList if isinstance(candidate, moleculeClass)),
        None)  # TODO what about subclasses?
    if existing is not None:
        return existing
    # molecule does not yet exist in the scope
    # (this means it's not been created or imported from another scope)
    if createIt == False:
        raise MoleculeValueError("Molecule " + moleculeClass.__name__ + " does not exist in this scope")
    created = moleculeClass()
    created.scope = scope
    # Snapshot of the stack: a new list holding the same entries, so later
    # pushes / pops on withinStack do not alter it.
    created.additionStack = self.withinStack[:]
    scope.moleculeList.append(created)
    self.addElemTypeAdvice(created)
    return created
def addElemTypeAdvice(self, elem):
    """Internal - applies all matching type advice to a part or molecule

    | *Args:*
    |     elem - the element to extend; molecule type advice is used if it is
    |            a Molecule, part type advice otherwise
    """
    if isinstance(elem, Molecule):
        applicableAdvice = self.moleculeTypeAdviceList
    else:
        applicableAdvice = self.partTypeAdviceList

    def adviceMethodWrapper(method):
        # The attached method ignores the object it is bound to and calls the
        # advised method with the element itself.
        def adviceMethodWrapperClosure(self):
            return method(elem)
        return adviceMethodWrapperClosure

    for typeAdvice in applicableAdvice:
        if not typeAdvice.signature.match(elem):
            continue
        # TODO check for name clashes...
        addition = typeAdvice.typeaddition
        if inspect.ismethod(addition):
            addition = types.MethodType(adviceMethodWrapper(addition), typeAdvice.aspect)
        setattr(elem, typeAdvice.name, addition)
def runBeforeAndReplaceAdvice(self, callingObject, part):
    """Internal - runs Before and Replace Advice of a part

    | *Args:*
    |     callingObject - the circuit or advice which added the part
    |     part - The part for which the advice matches
    | *Returns:*
    |     "continue"
    |     True - If no replace advice matched, or the matching replace
    |            advice returned a false value (part should still be added)
    |     False - If the matching replace advice returned a true value
    """
    wovenParts = self.currentWeaverOutput.partList
    for advice in self.beforeAndReplaceAdviceList:
        if len(wovenParts) > 0:
            # Let the candidate know its predecessor before matching.
            part.setBeforePart(wovenParts[-1])
        pointcut = advice.pointcut
        if not pointcut.match(part):
            continue
        if pointcut.operator != PointCut.REPLACE:
            # plain before advice: run it and keep looking at further advice
            advice.adviceMethod(PointCutContext(self.withinStack, part))
            continue
        # Replace advice: drop the already woven parts covered by the match
        # (all but the one currently being added).
        remaining = pointcut.signature.numberOfMatchingParts(part)
        while remaining > 1:
            wovenParts.pop()
            remaining -= 1
        # Advice should return False if not replaced, True if Replaced
        replaced = advice.adviceMethod(PointCutContext(self.withinStack, part))
        self.runAfterAdvice(callingObject, part, advice.precedence)
        return not replaced
    return True
def runAfterAdvice(self, callingObject, part, precedence=Advice.MINPRECEDENCE):
    """Internal - runs After Advice of a part

    | *Args:*
    |     callingObject - the circuit or advice which added the part
    |     part - The part for which the advice matches
    |     precedence - Only run advice with a precedence greater or equal this
    |         Used if replacement advice has been executed
    """
    # afterAdviceList is iterated in its stored order; every advice that meets
    # the precedence threshold and whose pointcut matches the part is executed.
    for advice in self.afterAdviceList:
        if not (advice.precedence >= precedence):
            continue
        if not advice.pointcut.match(part):
            continue
        advice.adviceMethod(PointCutContext(self.withinStack, part))
def addReaction(self, callingObject, fromMoleculeList, toMoleculeList):
    """Connects the molecules of a reaction in the current scope

    | *Args:*
    |     callingObject - the circuit or aspect declaring the reaction
    |     fromMoleculeList - molecule classes on the lhs; looked up without creation
    |     toMoleculeList - molecule classes on the rhs; created if missing
    | *Notes:*
    |     one -> one : both molecules are linked to each other
    |     many -> one : the list of sources is appended to the target's before
    |     one -> many : the list of targets is appended to the source's after
    |     any other combination leaves the graph untouched
    """
    scope = self.currentWeaverOutput
    sources = [self.getMoleculeObject(scope, element) for element in fromMoleculeList]
    # todo needs to be adapted to actually search scope ??? or report error
    targets = [self.getMoleculeObject(scope, element, True) for element in toMoleculeList]
    # todo each element in fromMoleculeList does not need toMoleculeList in its before...
    # need to show that here is a gate...
    sourceCount, targetCount = len(sources), len(targets)
    if sourceCount == 1 and targetCount == 1:
        # if both are just one molecule, can hook them up
        sources[0].after.append(targets[0])
        targets[0].before.append(sources[0])
    elif sourceCount > 1 and targetCount == 1:
        targets[0].before.append(sources)
    elif sourceCount == 1 and targetCount > 1:
        sources[0].after.append(targets)
class MoleculeReactionTo:
    """Holds the right hand side of a molecule reaction

    | *Attributes:*
    |     molecules - the molecules on the rhs, stored as given
    """

    def __init__(self, molecules):
        self.molecules = molecules
class MoleculeReactionFrom:
    """Holds the left hand side of a molecule reaction
    and enables a convenient syntax by overloading the >> (rshift) operator

    | *Attributes:*
    |     weaver - the weaver the finished reaction is registered with
    |     callingObject - the circuit or aspect which declared the reaction
    |     fromMolecules - the molecules on the lhs of the reaction
    """

    def __init__(self, weaver, callingObject, molecules):
        self.weaver = weaver
        self.callingObject = callingObject
        self.fromMolecules = molecules

    def __rshift__(self, other):
        # `lhs >> rhs` registers the reaction with the weaver; the expression
        # itself evaluates to None.
        # todo check type of other, should be MoleculeReactionTo
        rightHandSide = other.molecules
        self.weaver.addReaction(self.callingObject, self.fromMolecules, rightHandSide)
def reactionFrom(self, callingObject, molecules):
    """Wraps the left hand side of a molecule reaction,
    called by either a circuit or aspect

    | *Args:*
    |     callingObject - the circuit or aspect calling this
    |     molecules - the list of molecules on the lhs of the reaction
    | *Returns:*
    |     A MoleculeReactionFrom tied to this weaver
    """
    leftHandSide = self.MoleculeReactionFrom(self, callingObject, molecules)
    return leftHandSide
def reactionTo(self, callingObject, molecules):
    """Wraps the right hand side of a molecule reaction,
    called by either a circuit or aspect

    | *Args:*
    |     callingObject - the circuit or aspect calling this (not used here)
    |     molecules - the list of molecules on the rhs of the reaction
    | *Returns:*
    |     A MoleculeReactionTo holding the molecules
    """
    rightHandSide = self.MoleculeReactionTo(molecules)
    return rightHandSide
def importMolecule(self, callingObject, molecule):
    """Indicates that a molecule should be "imported" by the current compartment from
    the outer compartment

    | *Args:*
    |     callingObject - the circuit or aspect calling this
    |     molecule - the class of the molecule to be imported
    """
    # the molecule comes from the next larger scope
    # todo check that we are not in outest scope
    currentScope = self.currentWeaverOutput
    outerMolecule = self.getMoleculeObject(currentScope.parentCircuit, molecule)
    localMolecule = next(
        (candidate for candidate in currentScope.moleculeList if isinstance(candidate, molecule)),
        None)
    if localMolecule is None:
        # case 2: molecule doesn't exist yet in this scope - declare it, then
        # run again so that it is linked through case 1
        self.createMolecule(callingObject, molecule)
        self.importMolecule(callingObject, molecule)
        return
    # case 1: molecule already exists in scope and needs to be merged
    localMolecule.before.append(outerMolecule)
    outerMolecule.after.append(localMolecule)
def exportMolecule(self, callingObject, molecule):
    """Indicates that a molecule should be "exported" by the current compartment to
    the outer compartment

    | *Args:*
    |     callingObject - the circuit or aspect calling this
    |     molecule - the class of the molecule to be exported
    """
    # todo error checking
    # e.g. if molecule already exists (but possibly different instances?) warn of clash?
    # if parent doesn't exist
    innerMolecule = self.getMoleculeObject(self.currentWeaverOutput, molecule)
    outerScope = self.currentWeaverOutput.parentCircuit
    outerMolecule = next(
        (candidate for candidate in outerScope.moleculeList if isinstance(candidate, molecule)),
        None)
    if outerMolecule is None:
        # case 2: molecule doesn't exist yet in outer scope - create it there,
        # then run again so that it is linked through case 1
        self.getMoleculeObject(outerScope, molecule, True)
        self.exportMolecule(callingObject, molecule)
        return
    # case 1: the molecule already exists in the outer scope and needs to be added to its graph
    outerMolecule.before.append(innerMolecule)
    innerMolecule.after.append(outerMolecule)
def createMolecule(self, callingObject, molecule):
    """Indicates that a molecule is potentially present in the current compartment,
    i.e. it does not necessarily need to be created by a Coding Region, by an import, etc.

    | *Args:*
    |     callingObject - the circuit or aspect calling this
    |     molecule - the class of the molecule to be present
    """
    # todo, if create is set to true, should actually report an error if molecule already exists.
    currentScope = self.currentWeaverOutput
    self.getMoleculeObject(currentScope, molecule, True)
def addCircuit(self, callingObject, circuit):
    """Called by circuit or aspect to add a circuit as a sub-compartment in the current compartment

    | *Args:*
    |     callingObject - the circuit or aspect calling this
    |     circuit - The circuit class to be added as a sub-compartment;
    |               it is instantiated without arguments
    """
    # todo check circuit
    circuitObject = circuit()
    enclosingOutput = self.currentWeaverOutput
    newCircuit = self.WeaverOutput(circuitObject.__class__.__name__, enclosingOutput)
    enclosingOutput.wovenCircuitList.append(newCircuit)
    # the new sub circuit is set up, now weave it inside its own scope
    self.currentWeaverOutput = newCircuit
    try:
        circuitObject.setWeaver(self)
        circuitObject.mainCircuit()
    finally:
        # Always step back out to the enclosing scope, even if weaving the
        # sub circuit raised, so the weaver is not left pointing at it.
        self.currentWeaverOutput = enclosingOutput
def addPart(self, callingObject, part):
    """Called by circuit or aspect to add a part in the execution flow

    | *Args:*
    |     callingObject - the circuit or aspect calling this
    |     part - The part supposed to be added
    """
    part = checkIfTypeReturnInstance(part)
    part.scope = self.currentWeaverOutput
    # Record who is adding the part; the part keeps a snapshot of the stack.
    self.withinStack.append(callingObject)
    part.additionStack = self.withinStack[:]

    if self.runBeforeAndReplaceAdvice(callingObject, part):
        part.weave(self)
        # TODO clean this up, the namespace issue...
        # type advice is applied before the part is placed
        self.addElemTypeAdvice(part)
        if isinstance(part, Circuit) == False:
            # ordinary part: link it to its predecessor, then append it
            wovenParts = self.currentWeaverOutput.partList
            if len(wovenParts) > 0:
                wovenParts[-1].setAfterPart(part)
                part.setBeforePart(wovenParts[-1])
            wovenParts.append(part)
        else:
            # composite: the circuit is not initialized with a weaver, so it
            # gets one here and is unpacked by running it
            part.setWeaver(self)
            part.mainCircuit()
        self.runAfterAdvice(callingObject, part)

    self.withinStack.pop()
def output(self):
    """Returns the weaverOutput attribute of this weaver"""
    wovenResult = self.weaverOutput
    return wovenResult
class MoleculeValueError(ValueError):
    """Raised when a Molecule was expected, but something else was given"""
class PartInitializationError(Exception):
    """Raised when a Part could not be initialized"""
class PartValueError(ValueError):
    """Raised when a Part was expected, but something else was given"""
class InvalidSymbolNameError(ValueError):
    """Raised when a symbol name (string) is not correctly formatted"""
class SymbolExistsWarning(UserWarning):
    """Warns that a Part or Molecule declaration uses an existing name"""
class InvalidSignatureError(ValueError):
    """Raised when a signature (PointCut / Part / Molecule) is incorrectly formatted or typed"""
class InvalidPointCutExpressionError(ValueError):
    """Raised when there is an error in a Point Cut expression"""
class InvalidPointCutOperatorError(ValueError):
    """Raised when an unknown or illegal operator is used for a point cut"""
class InvalidPointCutError(ValueError):
    """Raised when an object of type PointCut was expected, but something else was given"""
class InvalidAdviceMethodError(ValueError):
    """Raised when a method with 2 parameters was expected as advice method,
    but something else was given"""
class InvalidWeaverOutputMethodError(ValueError):
    """Raised when a method with 2 parameters was expected as weaver output
    method, but something else was given"""
class PrecedenceOutOfRangeError(ValueError):
    """Raised when an advice precedence lies outside the valid precedence range"""
class AspectValueError(ValueError):
    """Raised when an Aspect class or instance was expected, but something else was given"""
class CircuitValueError(ValueError):
    """Raised when a Circuit class or instance was expected, but something else was given"""
[docs]class RNACodingRegion(Part):
def __init__(self, codesFor):
    """Creates the coding region and records what it codes for

    | *Args:*
    |     codesFor - passed through checkAndSetMolecule; the result is
    |                appended to precompileMoleculesAfter
    """
    super(RNACodingRegion, self).__init__()
    codedMolecule = checkAndSetMolecule(codesFor)
    self.precompileMoleculesAfter.append(codedMolecule)
def getCodingFor(self):
    """Returns what this region codes for

    | *Returns:*
    |     The Molecule nodes directly after this node if there are any,
    |     otherwise the precompileMoleculesAfter list
    """
    moleculesAfter = self.getAfterNodes(Molecule)
    return moleculesAfter if len(moleculesAfter) > 0 else self.precompileMoleculesAfter