summaryrefslogtreecommitdiff
path: root/pyxb/binding/content.py
diff options
context:
space:
mode:
Diffstat (limited to 'pyxb/binding/content.py')
-rw-r--r--pyxb/binding/content.py1253
1 files changed, 617 insertions, 636 deletions
diff --git a/pyxb/binding/content.py b/pyxb/binding/content.py
index 4943e1d..6150739 100644
--- a/pyxb/binding/content.py
+++ b/pyxb/binding/content.py
@@ -21,16 +21,7 @@ the Python field in which the values are stored. They also provide the
low-level interface to set and get the corresponding values in a binding
instance.
-L{ContentModelTransition}, L{ContentModelState}, and L{ContentModel} are used
-to store a deterministic finite automaton which is used to translate between
-binding instances and other representations (e.g., DOM nodes)
-
-L{ModelGroupAllAlternative} and L{ModelGroupAll} represent special nodes in
-the DFA that support a model group with compositor "all" in a way that does
-not result in an exponential state explosion in the DFA.
-
-L{DFAStack} and its related internal classes are used in stream-based
-processing of content.
+@todo: Document new content model
L{Wildcard} holds content-related information used in the content model.
"""
@@ -41,6 +32,137 @@ import basis
import xml.dom
+class ContentState_mixin (pyxb.cscRoot):
+ """Declares methods used by classes that hold state while validating a
+ content model component."""
+
+ def accepts (self, particle_state, instance, value, element_use):
+ """Determine whether the provided value can be added to the instance
+ without violating state validation.
+
+ This method must not throw any non-catastrophic exceptions; general
+ failures should be transformed to a C{False} return value.
+
+ @param particle_state: The L{ParticleState} instance serving as the
+ parent to this state. The implementation must inform that state when
+ the proposed value completes the content model.
+
+ @param instance: An instance of a subclass of
+ {basis.complexTypeDefinition}, into which the provided value will be
+ stored if it is consistent with the current model state.
+
+ @param value: The value that is being validated against the state.
+
+ @param element_use: An optional L{ElementUse} instance that specifies
+ the element to which the value corresponds. This will be available
+ when the value is extracted by parsing a document, but will be absent
+ if the value was passed as a constructor positional parameter.
+
+ @return: C{True} if the value was successfully matched against the
+ state. C{False} if the value did not match against the state."""
+ raise Exception('ContentState_mixin.accepts not implemented in %s' % (type(self),))
+
+ def notifyFailure (self, sub_state, particle_ok):
+ """Invoked by a sub-state to indicate that validation cannot proceed
+ in the current state.
+
+ Normally this is used when an intermediate content model must reset
+ itself to permit alternative models to be evaluated.
+
+ @param sub_state: the state that was unable to accept a value
+
+ @param particle_ok: C{True} if the particle that rejected the value is
+ in an accepting terminal state
+
+ """
+ raise Exception('ContentState_mixin.notifyFailure not implemented in %s' % (type(self),))
+
+ def _verifyComplete (self, parent_particle_state):
+ """Determine whether the deep state is complete without further elements.
+
+ No-op for non-aggregate state. For aggregate state, all contained
+ particles should be checked to see whether the overall model can be
+ satisfied if no additional elements are provided.
+
+ This method does not have a meaningful return value; violations of the
+ content model should produce the corresponding exception (generally,
+ L{MissingContentError}).
+
+ @param parent_particle_state: the L{ParticleState} for which this state
+ is the term.
+ """
+ pass
+
+class ContentModel_mixin (pyxb.cscRoot):
+ """Declares methods used by classes representing content model components."""
+
+ def newState (self, parent_particle_state):
+ """Return a L{ContentState_mixin} instance that will validate the
+ state of this model component.
+
+ @param parent_particle_state: The L{ParticleState} instance for which
+ this instance is a term. C{None} for the top content model of a
+ complex data type.
+ """
+ raise Exception('ContentModel_mixin.newState not implemented in %s' % (type(self),))
+
+ def _validateCloneSymbolSet (self, symbol_set_im):
+ """Create a mutable copy of the symbol set.
+
+ The top-level map is copied, as are the lists of values. The values
+ within the lists are unchanged, as validation does not affect them."""
+ rv = symbol_set_im.copy()
+ for (k, v) in rv.items():
+ rv[k] = v[:]
+ return rv
+
+ def _validateCloneOutputSequence (self, output_sequence_im):
+ return output_sequence_im[:]
+
+ def _validateReplaceResults (self, symbol_set_out, symbol_set_new, output_sequence_out, output_sequence_new):
+ """In-place update of symbol set and output sequence structures.
+
+ Use this to copy from temporary mutable structures updated by local
+ validation into the structures that were passed in once the validation
+ has succeeded."""
+ symbol_set_out.clear()
+ symbol_set_out.update(symbol_set_new)
+ output_sequence_out[:] = output_sequence_new
+
+ def _validate (self, symbol_set, output_sequence):
+ """Determine whether an output sequence created from the symbols can
+ be made consistent with the model.
+
+ The symbol set represents letters in an alphabet; the output sequence
+ orders those letters in a way that satisfies the regular expression
+ expressed in the model. Both are changed as a result of a successful
+ validation; both remain unchanged if the validation failed. In
+ recursing, implementers may assume that C{output_sequence} is
+ monotonic: its length remains unchanged after an invocation iff the
+ symbol set also remains unchanged. The L{_validateCloneSymbolSet},
+ L{_validateCloneOutputSequence}, and L{_validateReplaceResults}
+ methods are available to help preserve this behavior.
+
+ @param symbol_set: A map from L{ElementUse} instances to a list of
+ values. The order of the values corresponds to the order in which
+ they should appear. A key of C{None} identifies values that are
+ stored as wildcard elements. Values are removed from the lists as
+ they are used; when the last value of a list is removed, its key is
+ removed from the map. Thus an empty dictionary is the indicator that
+ no more symbols are available.
+
+ @param output_sequence: A mutable list to which should be appended
+ tuples C{( eu, val )} where C{eu} is an L{ElementUse} from the set of
+ symbol keys, and C{val} is a value from the corresponding list. A
+ document generated by producing the elements in the given order is
+ expected to validate.
+
+ @return: C{True} iff the model validates. C{symbol_set} and
+ C{output_path} must be unmodified if returns C{False}.
+ """
+ raise Exception('ContentState_mixin._validate not implemented in %s' % (type(self),))
+
+
class AttributeUse (pyxb.cscRoot):
"""A helper class that encapsulates everything we need to know
about the way an attribute is used within a binding class.
@@ -281,7 +403,7 @@ class AttributeUse (pyxb.cscRoot):
desc.extend(['=', self.__unicodeDefault ])
return ''.join(desc)
-class ElementUse (pyxb.cscRoot):
+class ElementUse (ContentState_mixin, ContentModel_mixin):
"""Aggregate the information relevant to an element of a complex type.
This includes the L{original tag name<name>}, the spelling of L{the
@@ -469,641 +591,54 @@ class ElementUse (pyxb.cscRoot):
desc.append(self.elementBinding()._description(user_documentation=user_documentation))
return ''.join(desc)
-class _DFAState (object):
- """Base class for a suspended DFA interpretation."""
- __contentModel = None
- __state = None
-
- def __init__ (self, content_model, state=1):
- self.__contentModel = content_model
- self.__state = state
-
- def state (self):
- """The current state of the automaton, represented as an integer."""
- return self.__state
- def contentModel (self):
- """The L{ContentModel} to which the state belongs."""
- return self.__contentModel
- def updateState (self, state):
- """Change the automaton state recorded in this DFA state."""
- self.__state = state
+ def newState (self, parent_particle_state):
+ """Implement parent class method."""
return self
- def step (self, dfa_stack, ctd_instance, value, element_use):
- """Execute a step within the content model.
-
- This determines whether the current state in the content model allows
- a transition on the given value. If a transition can be performed,
- the instance element use corresponding to the value is used to record
- the value.
-
- The input value should be an instance of L{basis._TypeBinding_mixin},
- or a value that can be uniquely converted into such a instance using
- the transitions from the current state as clues.
-
- @param dfa_stack: The current state of the parse. Upon return, this may have been augmented with suspended content models.
- @type dfa_stack: L{DFAStack}
- @param value: A value upon which transition should occur.
- @type value: C{xml.dom.Node} or L{basis._TypeBinding_mixin} or other value
- @return: C{True} iff a transition successfully consumed the value
- """
-
- assert isinstance(ctd_instance, basis.complexTypeDefinition)
- self.__state = self.contentModel().step(ctd_instance, self.state(), value, element_use, dfa_stack)
- return self.__state is not None
-
- def isFinal (self):
- """Return C{True} iff the current state of the content model is a final state."""
- return self.contentModel().isFinal(self.state())
-
-class _MGAllState (object):
- """The state of a suspended interpretation of a L{ModelGroupAll}
- transition. This state comprises a set of alternatives, and optionally a
- L{DFAStack} corresponding to the current position within one of the
- alternatives.
- """
-
- __modelGroup = None
- __alternatives = None
- __currentStack = None
- __isFinal = None
-
- def __init__ (self, model_group):
- self.__modelGroup = model_group
- self.__alternatives = self.__modelGroup.alternatives()
-
- def step (self, dfa_stack, ctd_instance, value, element_use):
- """Execute a step within the model group.
-
- If an automaton stack is currently being executed, the step defers to
- that automaton. If a step is succesfully taken, the invocation
- returns; otherwise, the automaton stack is discarded.
-
- If no automaton stack is active, a step is attempted on each automaton
- remaining in the alternatives. If the step is successful, that
- automaton is recorded as being the current one for execution, and the
- invocation returns.
-
- If no automaton can be found within which progress can be made, the
- step fails.
-
- @param dfa_stack: The current state of the parse. Upon return, this
- may have been augmented with suspended content models.
- @type dfa_stack: L{DFAStack}
- @param value: A value upon which transition should occur.
- @type value: C{xml.dom.Node} or L{basis._TypeBinding_mixin} or other value
- @return: C{True} iff a transition was found that consumed the value.
- """
-
- assert isinstance(ctd_instance, basis.complexTypeDefinition)
- if self.__currentStack is not None:
- if self.__currentStack.step(ctd_instance, value, element_use):
- return True
- if not self.__currentStack.isTerminal():
- # I think this is probably a problem, but don't have an
- # example yet to use to analyze it. The issue is that we've
- # already committed to executing the automaton; if we end up
- # in a non-final state, then that execution failed, and
- # probably the whole validation should just abort.
- print '******** Non-terminal state reached in all group parsing; please contact support'
- self.__currentStack = None
- found_match = True
- for alt in self.__alternatives:
- try:
- new_stack = alt.contentModel().initialDFAStack()
- if new_stack.step(ctd_instance, value, element_use):
- self.__currentStack = new_stack
- self.__alternatives.remove(alt)
- return True
- except pyxb.BadDocumentError, e:
- #print 'Failed with alternative %s: %s' % (alt, type(e))
- pass
- return False
-
- def isFinal (self):
- """Return C{True} iff no required alternatives remain."""
- for alt in self.__alternatives:
- # Any required alternative that must consume a symbol prevents
- # this from being an acceptable final state for the model group.
- if alt.required() and not alt.contentModel().allowsEpsilonTransitionToFinal():
- #print "\n\n***Required alternative %s still present\n\n" % (alt,)
- return False
- return True
-
-class DFAStack (object):
- """A stack of states and content models representing the current status of
- an interpretation of a content model, including invocations of nested
- content models reached through L{ModelGroupAll} instances."""
-
- __stack = None
- def __init__ (self, content_model):
- self.__stack = []
- self.pushModelState(_DFAState(content_model))
-
- def pushModelState (self, model_state):
- """Add the given model state as the new top (actively executing) model ."""
- self.__stack.append(model_state)
- return model_state
-
- def isTerminal (self):
- """Return C{True} iff the stack is in a state where the top-level
- model execution has reached a final state."""
- return (0 == len(self.__stack)) or self.topModelState().isFinal()
-
- def popModelState (self):
- """Remove and return the model state currently being executed."""
- if 0 == len(self.__stack):
- raise pyxb.LogicError('Attempt to underflow content model stack')
- return self.__stack.pop()
-
- def topModelState (self):
- """Return a reference to the model state currently being executed.
-
- The state is not removed from the stack."""
- if 0 == len(self.__stack):
- raise pyxb.LogicError('Attempt to underflow content model stack')
- return self.__stack[-1]
-
- def step (self, ctd_instance, value, element_use):
- """Take a step using the value and the current model state.
-
- Execution of the step may add a new model state to the stack.
-
- @return: C{True} iff the value was consumed by a transition."""
- assert isinstance(ctd_instance, basis.complexTypeDefinition)
- if 0 == len(self.__stack):
- return False
- ok = self.topModelState().step(self, ctd_instance, value, element_use)
- if not ok:
- self.popModelState()
- return ok
-
-class ContentModelTransition (pyxb.cscRoot):
- """Represents a transition in the content model DFA.
-
- If the next object in the DOM model conforms to the specified term, it is
- consumed and the specified next state is entered."""
-
- def term (self):
- """The matching term for this transition to succeed."""
- if self.__term is None:
- self.__term = self.__elementUse.elementBinding()
- assert self.__term is not None
- return self.__term
- __term = None
-
- def currentStateRef (self):
- return self.__currentStateRef
- __currentStateRef = None
- def _currentStateRef (self, current_state_ref):
- self.__currentStateRef = current_state_ref
-
- def nextState (self):
- """The next state in the DFA"""
- return self.__nextState
- __nextState = None
-
- # The ElementUse instance used to store a successful match in the
- # complex type definition instance.
- def elementUse (self):
- return self.__elementUse
- __elementUse = None
-
- # Types of transition that can be taken, in order of preferred match
- TT_element = 0x01 #<<< The transition is on an element
- TT_modelGroupAll = 0x02 #<<< The transition is on an ALL model group
- TT_wildcard = 0x03 #<<< The transition is on a wildcard
-
- # What type of term this transition covers
- __termType = None
- def termType (self):
- return self.__termType
-
- def __init__ (self, next_state, element_use=None, term=None):
- """Create a transition to a new state upon receipt of a term,
- storing the successful match using the provided ElementUse."""
- self.__nextState = next_state
- assert self.__nextState is not None
- self.__elementUse = element_use
- if self.__elementUse is not None:
- self.__term = None
- self.__termType = self.TT_element
- else:
- self.__term = term
- assert self.__term is not None
- if isinstance(self.__term, ModelGroupAll):
- self.__termType = self.TT_modelGroupAll
- elif isinstance(self.__term, Wildcard):
- self.__termType = self.TT_wildcard
- else:
- raise pyxb.LogicError('Unexpected transition term %s' % (self.__term,))
-
- def __cmp__ (self, other):
- """Sort transitions so elements precede model groups precede
- wildcards. Also sort within each subsequence."""
- rv = cmp(self.__termType, other.__termType)
- if 0 == rv:
- # In a vain attempt at determinism, sort the element transitions
- # by name
- if (self.TT_element == self.__termType):
- rv = cmp(self.__elementUse.name(), other.__elementUse.name())
- else:
- rv = cmp(self.__term, other.__term)
+ def accepts (self, particle_state, instance, value, element_use):
+ rv = self._accepts(instance, value, element_use)
+ if rv:
+ particle_state.incrementCount()
return rv
- def __processElementTransition (self, value, element_use):
- # First, identify the element
+ def _accepts (self, instance, value, element_use):
+ if element_use == self:
+ self.setOrAppend(instance, value)
+ return True
+ if element_use is not None:
+ # If there's a known element, and it's not this one, the content
+ # does not match. This assumes we handled xsi:type and
+ # substitution groups earlier, which may be true.
+ return False
if isinstance(value, xml.dom.Node):
- # If we got here, it's because we couldn't figure out what element
- # the node conformed to and had to try the element transitions in
- # hopes of matching a wildcard. If we couldn't find an element
- # before, we can't do it now, so just fail.
- return None
+ # If we haven't been able to identify an element for this before,
+ # then we don't recognize it, and will have to treat it as a
+ # wildcard.
+ return False
try:
- # The _convert_string_values=False setting prevents string
- # arguments to element/type constructors from being automatically
- # converted to another type (like int) if they just happen to be
- # convertible. Without this, it's too easy to accept a
- # sub-optimal transition (e.g., match a float when an alternative
- # string is available).
- return self.term().compatibleValue(value, _convert_string_values=False)
+ self.setOrAppend(instance, self.__elementBinding.compatibleValue(value, _convert_string_values=False))
+ return True
except pyxb.BadTypeValueError, e:
- # Silently fail the transition
pass
- return None
-
- def __validateConsume (self, key, available_symbols_im, output_sequence_im, candidates):
- # Update candidates to incorporate the path abstraction associated
- # with the element that is this term.
-
- # Create a mutable copy of the available symbols
- next_symbols = available_symbols_im.copy()
-
- # If the transition is a loop back to the current state, or if the
- # transition is a simple type definition with variety list, we can
- # consume multiple instances. Might as well consume all of them.
- # When we do consume, we can do either one transition, or one
- # transition for each element in a list/vector.
- key_type = type(None)
- elt_plural = False
- if key is not None:
- key_type = key.elementBinding().typeDefinition()
- elt_plural = key.isPlural()
- multiple_values = False
- try:
- iter(next_symbols[key][0])
- multiple_values = True
- except TypeError:
- pass
-
- if (self.__nextState == self.__currentStateRef.state()):
- consume_all = True
- consume_singles = True
- else:
- consume_all = False
- consume_singles = True
- if consume_all:
- consumed = next_symbols[key]
- del next_symbols[key]
- else:
- # Make sure we pop from a copy of the available_symbols_im entry value.
- next_left = next_symbols[key][:]
- consumed = [ next_left.pop(0) ]
- if 0 == len(next_left):
- del next_symbols[key]
- else:
- next_symbols[key] = next_left
- if consume_singles:
- output_sequence = output_sequence_im + [ (key, _c) for _c in consumed ]
- else:
- output_sequence = output_sequence_im + [ (key, key_type(consumed)) ]
- assert (not (key in next_symbols)) or (0 < len(next_symbols[key]))
- candidate = (self.__nextState, next_symbols, output_sequence)
- candidates.append(candidate)
- return True
-
- def validate (self, available_symbols_im, output_sequence_im, candidates):
- """Determine whether it is possible to take this transition using the
- available symbols.
-
- @param available_symbols_im: As with L{ContentModel.validate}. The
- map will not be modified by this method.
-
- @param output_sequence_im: As with the return value of
- L{ContentModel.validate}. The sequence will not be modified by this
- event (it is used as a prefix for new candidates).
-
- @param candidates: A list of candidate validation paths.
-
- @return: C{True} iff the transition could be made."""
- if self.TT_element == self.__termType:
- if not (self.__elementUse in available_symbols_im):
- # No symbol available for this transition
- return False
- assert 0 < len(available_symbols_im[self.__elementUse])
- return self.__validateConsume(self.__elementUse, available_symbols_im, output_sequence_im, candidates)
- elif self.TT_modelGroupAll == self.__termType:
- return self.term().validate(available_symbols_im, output_sequence_im, self.__nextState, candidates)
- elif self.TT_wildcard == self.__termType:
- if not (None in available_symbols_im):
- return False
- assert 0 < len(available_symbols_im[None])
- return self.__validateConsume(None, available_symbols_im, output_sequence_im, candidates)
+ #print '%s %s %s in %s' % (instance, value, element_use, self)
return False
- def allowsEpsilonTransition (self):
- """Determine whether it is possible to take this transition without
- consuming any symbols.
-
- This is only possible if this is a transition to a final state using
- an "all" model group for which every alternative is effectively
- optional.
- """
- if self.TT_modelGroupAll != self.__termType:
+ def _validate (self, symbol_set, output_sequence):
+ values = symbol_set.get(self)
+ #print 'values %s' % (values,)
+ if values is None:
return False
- dfa_state = _MGAllState(self.__term)
- return dfa_state.isFinal()
-
- def attemptTransition (self, ctd_instance, value, element_use, dfa_stack):
- """Attempt to make the appropriate transition.
-
- @param ctd_instance: The binding instance for which we are attempting
- to set values by walking the content model.
- @type ctd_instance: L{basis.complexTypeDefinition}
-
- @param value: The potential value that would be consumed if this
- transition can be made.
- @type value: C{xml.dom.Node} or L{basis._TypeBinding_mixin}
-
- @param dfa_stack: The current state of processing this and enclosing
- content models.
- @type dfa_stack: L{DFAStack}
-
- @return: C{True} iff C{value} is acceptable for this transition
-
- """
-
- if self.TT_element == self.__termType:
- element = None
- # If the element use matched one of the terms in its state, we
- # would never have gotten here, so don't even try. We're only
- # walking the terms to see if an ALL or Wildcard is allowed.
- if (element_use is not None):
- return None
- element = self.__processElementTransition(value, element_use)
- if element is None:
- return False
- self.__elementUse.setOrAppend(ctd_instance, element)
- return True
- if self.TT_modelGroupAll == self.__termType:
- return dfa_stack.pushModelState(_MGAllState(self.__term)).step(dfa_stack, ctd_instance, value, element_use)
- if self.TT_wildcard == self.__termType:
- value_desc = 'value of type %s' % (type(value),)
- if isinstance(value, xml.dom.Node):
- value_desc = 'DOM node %s' % (pyxb.namespace.ExpandedName(value),)
- elif not isinstance(value, basis._TypeBinding_mixin):
- return False
- if not self.__term.matches(ctd_instance, value):
- raise pyxb.UnexpectedContentError(value)
- if not isinstance(value, basis._TypeBinding_mixin):
- print 'NOTE: Created unbound wildcard element from %s' % (value_desc,)
- assert isinstance(ctd_instance.wildcardElements(), list), 'Uninitialized wildcard list in %s' % (ctd_instance._ExpandedName,)
- ctd_instance._appendWildcardElement(value)
- return True
- raise pyxb.LogicError('Unexpected transition term %s' % (self.__term,))
-
-class ContentModelState (pyxb.cscRoot):
- """Represents a state in a ContentModel DFA.
-
- The state identifier is an integer. State 1 is the starting state of the
- DFA. A flag indicates whether the state is a legitimate final state for
- the DFA. The transitions are an ordered sequence of
- ContentModelTransition instances."""
-
- # Integer
- __state = None
- # Sequence of ContentModelTransition instances
- __transitions = None
-
- # Map from ElementUse instances to the term that transitions on that use.
- __elementTermMap = None
-
- def isFinal (self):
- """If True, this state can successfully complete the element
- reduction."""
- return self.__isFinal
- __isFinal = None
-
- def state (self):
- return self.__state
-
- def __init__ (self, state, is_final, transitions):
- self.__state = state
- self.__isFinal = is_final
- self.__transitions = transitions
- [ _t._currentStateRef(self) for _t in self.__transitions ]
- self.__transitions.sort()
- self.__elementTermMap = { }
- for t in self.__transitions:
- if t.TT_element == t.termType():
- assert t.elementUse() is not None
- self.__elementTermMap[t.elementUse()] = t
-
- def transitions (self):
- return self.__transitions
-
- def allowsEpsilonTransitionToFinal (self, content_model):
- """Determine can reach a final state in the content model without
- consuming anything."""
- if self.isFinal():
- return True
- for transition in self.__transitions:
- if transition.allowsEpsilonTransition() and content_model.isFinal(transition.nextState()):
- return True
- return False
-
- def evaluateContent (self, ctd_instance, value, element_use, dfa_stack):
- """Try to make a single transition with the given value.
-
- @param ctd_instance: The binding instance for which we are attempting
- to set values by walking the content model.
- @type ctd_instance: L{basis.complexTypeDefinition}
-
- @param value: The value that would be consumed if a transition can be
- made.
- @type value: C{xml.dom.Node} or L{basis._TypeBinding_mixin}
-
- @param element_use: The L{ElementUse<pyxb.binding.content.ElementUse>}
- corresponding to the provided value, if known (for example, because
- the value was parsed from an XML document).
-
- @param dfa_stack: The current state of processing this and enclosing
- content models.
- @type dfa_stack: L{DFAStack}
-
- @return: If a transition could be taken, the next state in the content
- model. C{None} if no transition could be taken and this state is
- final.
-
- @raise pyxb.UnrecognizedContentError: No transition on C{value} is
- possible, and this is not a final state.
- """
-
- if element_use is not None:
- transition = self.__elementTermMap.get(element_use)
- if transition is not None:
- element_use.setOrAppend(ctd_instance, value)
- return transition.nextState()
- # Might get here legitimately if we need to descend into ALL, or
- # if this value is a wildcard for which we happen to have a
- # binding class available.
- for transition in self.__transitions:
- if transition.attemptTransition(ctd_instance, value, element_use, dfa_stack):
- return transition.nextState()
- if self.isFinal():
- return None
- raise pyxb.UnrecognizedContentError(value, element_use=element_use)
-
-class ContentModel (pyxb.cscRoot):
- """The ContentModel is a deterministic finite state automaton which can be
- traversed using a sequence of DOM nodes which are matched on transitions
- against the legal content model of a complex type."""
-
- # Map from integers to ContentModelState instances
- __stateMap = None
-
- # All DFAs begin at this state
- __InitialState = 1
-
- def __init__ (self, state_map=None):
- self.__stateMap = state_map
-
- def initialDFAStack (self):
- return DFAStack(self)
-
- def step (self, ctd_instance, state, value, element_use, dfa_stack):
- """Perform a single step in the content model. This is a pass-through
- to L{ContentModelState.evaluateContent} for the appropriate state.
-
- @param state: The starting state in this content model.
- @type state: C{int}
- """
-
- return self.__stateMap[state].evaluateContent(ctd_instance, value, element_use, dfa_stack)
-
- def isFinal (self, state):
- return self.__stateMap[state].allowsEpsilonTransitionToFinal(self)
-
- def allowsEpsilonTransitionToFinal (self):
- return self.__stateMap[self.__InitialState].allowsEpsilonTransitionToFinal(self)
-
- def validate (self, available_symbols, succeed_at_dead_end=False):
- """Determine whether this content model can be satisfied using the
- provided elements.
-
- The general idea is to treat the transitions of the DFA as symbols in
- an alphabet. For each such transition, a sequence of values is
- provided to be associated with the transition. One transition is
- permitted for each value associated with the symbol. The symbol (key)
- C{None} represents wildcard values.
-
- If a path is found that uses every symbol in valid transitions and
- ends in a final state, the return value is a pair consisting of the
- unconsumed symbols and a sequence of term, value pairs that define the
- acceptable path. If no valid path through the DFA can be taken,
- C{None} is returned.
-
- @param available_symbols: A map from leaf DFA terms to a sequence of
- values associated with the term in a binding instance. The key
- C{None} is used to represent wildcard elements. If a key appears in
- this map, it must have at least one value in its sequence.
-
- @param succeed_at_dead_end: If C{True}, states from which no
- transition can be made are accepted as final states. This is used
- when processing "all" model groups, where the content model for the
- current alternative must succeed while retaining the symbols that are
- needed for other alternatives.
- """
-
- candidates = []
- candidates.append( (1, available_symbols, []) )
- while 0 < len(candidates):
- (state_id, symbols, sequence) = candidates.pop(0)
- state = self.__stateMap[state_id]
- if 0 == len(symbols):
- # No symbols available for transitions in this state. If this
- # places us in a final state, we've got a successful path and
- # should return it. Otherwise, this path failed, and we go on
- # to the next candidate.
- if state.allowsEpsilonTransitionToFinal(self):
- return (symbols, sequence)
- continue
- # Collect all the alternatives that are possible by taking
- # transitions from this state.
- num_transitions = 0
- for transition in state.transitions():
- num_transitions += transition.validate(symbols, sequence, candidates)
- if (0 == num_transitions) and succeed_at_dead_end:
- return (symbols, sequence)
- return None
-
-class ModelGroupAllAlternative (pyxb.cscRoot):
- """Represents a single alternative in an "all" model group."""
-
- def contentModel (self):
- """The content model definition for the alternative."""
- return self.__contentModel
- __contentModel = None
-
- def required (self):
- """True iff this alternative must be present (min_occurs=1)"""
- return self.__required
- __required = None
+ used = values.pop(0)
+ output_sequence.append( (self, used) )
+ if 0 == len(values):
+ del symbol_set[self]
+ return True
- def __init__ (self, content_model, required):
- #print '%s created MGA alternative model %s required %s' % (self, content_model, required)
- self.__contentModel = content_model
- self.__required = required
+ def __str__ (self):
+ return 'EU.%s@%x' % (self.__name, id(self))
-class ModelGroupAll (pyxb.cscRoot):
- """Content model class that represents a ModelGroup with an "all"
- compositor."""
-
- __alternatives = None
- def alternatives (self):
- return set(self.__alternatives)
-
- def __init__ (self, alternatives):
- self.__alternatives = alternatives
-
- def validate (self, available_symbols_im, output_sequence_im, next_state, candidates):
- num_matches = 0
- alternatives = set(self.__alternatives)
- symbols = available_symbols_im
- output_sequence = output_sequence_im[:]
- found_match = True
- while (0 < len(alternatives)) and found_match:
- found_match = False
- for alt in alternatives:
- path = alt.contentModel().validate(symbols, succeed_at_dead_end=True)
- if path is None:
- break
- (new_symbols, new_sequence) = path
- found_match = (0 < len(new_sequence))
- if found_match:
- output_sequence.extend(new_sequence)
- symbols = new_symbols
- alternatives.remove(alt)
- found_match = True
- break
- for alt in alternatives:
- if alt.required():
- return False
- candidates.append( (next_state, symbols, output_sequence) )
- return True
-
-class Wildcard (pyxb.cscRoot):
+class Wildcard (ContentState_mixin, ContentModel_mixin):
"""Placeholder for wildcard objects."""
NC_any = '##any' #<<< The namespace constraint "##any"
@@ -1134,21 +669,467 @@ class Wildcard (pyxb.cscRoot):
__processContents = None
def processContents (self): return self.__processContents
+ def __normalizeNamespace (self, nsv):
+ if nsv is None:
+ return None
+ if isinstance(nsv, basestring):
+ nsv = pyxb.namespace.NamespaceForURI(nsv, create_if_missing=True)
+ assert isinstance(nsv, pyxb.namespace.Namespace), 'unexpected non-namespace %s' % (nsv,)
+ return nsv
+
def __init__ (self, *args, **kw):
# Namespace constraint and process contents are required parameters.
- self.__namespaceConstraint = kw['namespace_constraint']
+ nsc = kw['namespace_constraint']
+ if isinstance(nsc, tuple):
+ nsc = (nsc[0], self.__normalizeNamespace(nsc[1]))
+ elif isinstance(nsc, set):
+ nsc = set([ self.__normalizeNamespace(_uri) for _uri in nsc ])
+ self.__namespaceConstraint = nsc
self.__processContents = kw['process_contents']
- def matches (self, ctd_instance, value):
+ def matches (self, instance, value):
"""Return True iff the value is a valid match against this wildcard.
- Not implemented yet: all wildcards are assumed to match all values.
-
+ Validation per U{Wildcard allows Namespace Name<http://www.w3.org/TR/xmlschema-1/#cvc-wildcard-namespace>}.
"""
+
+ ns = None
+ if isinstance(value, xml.dom.Node):
+ if value.namespaceURI is not None:
+ ns = pyxb.namespace.NamespaceForURI(value.namespaceURI)
+ elif isinstance(value, basis._TypeBinding_mixin):
+ elt = value._element()
+ if elt is not None:
+ ns = elt.name().namespace()
+ else:
+ ns = value._ExpandedName.namespace()
+ else:
+ raise pyxb.LogicError('Need namespace from value')
+ if isinstance(ns, pyxb.namespace.Namespace) and ns.isAbsentNamespace():
+ ns = None
+ if self.NC_any == self.__namespaceConstraint:
+ return True
+ if isinstance(self.__namespaceConstraint, tuple):
+ (_, constrained_ns) = self.__namespaceConstraint
+ assert self.NC_not == _
+ if ns is None:
+ return False
+ if constrained_ns == ns:
+ return False
+ return True
+ return ns in self.__namespaceConstraint
+
+ def newState (self, parent_particle_state):
+ return self
+
+ def accepts (self, particle_state, instance, value, element_use):
+ if isinstance(value, xml.dom.Node):
+ value_desc = 'value in %s' % (value.nodeName,)
+ else:
+ value_desc = 'value of type %s' % (type(value),)
+ if not self.matches(instance, value):
+ return False
+ if not isinstance(value, basis._TypeBinding_mixin):
+ print 'NOTE: Created unbound wildcard element from %s' % (value_desc,)
+ assert isinstance(instance.wildcardElements(), list), 'Uninitialized wildcard list in %s' % (instance._ExpandedName,)
+ instance._appendWildcardElement(value)
+ particle_state.incrementCount()
+ return True
+
+ def _validate (self, symbol_set, output_sequence):
# @todo check node against namespace constraint and process contents
#print 'WARNING: Accepting node as wildcard match without validating.'
+ wc_values = symbol_set.get(None)
+ if wc_values is None:
+ return False
+ used = wc_values.pop(0)
+ output_sequence.append( (None, used) )
+ if 0 == len(wc_values):
+ del symbol_set[None]
return True
+class SequenceState (ContentState_mixin):
+ __failed = False
+ __satisfied = False
+
+ def __init__ (self, group, parent_particle_state):
+ super(SequenceState, self).__init__(group)
+ self.__sequence = group
+ self.__parentParticleState = parent_particle_state
+ self.__particles = group.particles()
+ self.__index = -1
+ self.__satisfied = False
+ self.__failed = False
+ self.notifyFailure(None, False)
+ #print 'SS.CTOR %s: %d elts' % (self, len(self.__particles))
+
+ def accepts (self, particle_state, instance, value, element_use):
+ assert self.__parentParticleState == particle_state
+ assert not self.__failed
+ #print 'SS.ACC %s: %s %s %s' % (self, instance, value, element_use)
+ while self.__particleState is not None:
+ (consume, underflow_exc) = self.__particleState.step(instance, value, element_use)
+ if consume:
+ return True
+ if underflow_exc is not None:
+ self.__failed = True
+ raise underflow_exc
+ return False
+
+ def _verifyComplete (self, parent_particle_state):
+ while self.__particleState is not None:
+ self.__particleState.verifyComplete()
+
+ def notifyFailure (self, sub_state, particle_ok):
+ self.__index += 1
+ self.__particleState = None
+ if self.__index < len(self.__particles):
+ self.__particleState = ParticleState(self.__particles[self.__index], self)
+ else:
+ self.__satisfied = particle_ok
+ if particle_ok:
+ self.__parentParticleState.incrementCount()
+ #print 'SS.NF %s: %d %s %s' % (self, self.__index, particle_ok, self.__particleState)
+
+class ChoiceState (ContentState_mixin):
+ def __init__ (self, group, parent_particle_state):
+ self.__parentParticleState = parent_particle_state
+ super(ChoiceState, self).__init__(group)
+ self.__choices = [ ParticleState(_p, self) for _p in group.particles() ]
+ self.__activeChoice = None
+ #print 'CS.CTOR %s: %d choices' % (self, len(self.__choices))
+
+ def accepts (self, particle_state, instance, value, element_use):
+ #print 'CS.ACC %s %s: %s %s %s' % (self, self.__activeChoice, instance, value, element_use)
+ if self.__activeChoice is None:
+ for choice in self.__choices:
+ #print 'CS.ACC %s candidate %s' % (self, choice)
+ try:
+ (consume, underflow_exc) = choice.step(instance, value, element_use)
+ except Exception, e:
+ consume = False
+ underflow_exc = e
+ #print 'CS.ACC %s: candidate %s : %s' % (self, choice, consume)
+ if consume:
+ self.__activeChoice = choice
+ self.__choices = None
+ return True
+ return False
+ (consume, underflow_exc) = self.__activeChoice.step(instance, value, element_use)
+ #print 'CS.ACC %s : active choice %s %s %s' % (self, self.__activeChoice, consume, underflow_exc)
+ if consume:
+ return True
+ if underflow_exc is not None:
+ self.__failed = True
+ raise underflow_exc
+ return False
+
+ def _verifyComplete (self, parent_particle_state):
+ rv = True
+ #print 'CS.VC %s: %s' % (self, self.__activeChoice)
+ if self.__activeChoice is None:
+ # Use self.__activeChoice as the iteration value so that it's
+ # non-None when notifyFailure is invoked.
+ for self.__activeChoice in self.__choices:
+ try:
+ #print 'CS.VC: try %s' % (self.__activeChoice,)
+ self.__activeChoice.verifyComplete()
+ return
+ except Exception, e:
+ pass
+ #print 'Missing components %s' % ("\n".join([ "\n ".join([str(_p2.term()) for _p2 in _p.particle().term().particles()]) for _p in self.__choices ]),)
+ raise pyxb.MissingContentError('choice')
+ self.__activeChoice.verifyComplete()
+
+ def notifyFailure (self, sub_state, particle_ok):
+ #print 'CS.NF %s %s' % (self, particle_ok)
+ if particle_ok and (self.__activeChoice is not None):
+ self.__parentParticleState.incrementCount()
+ pass
+
+class AllState (ContentState_mixin):
+ __activeChoice = None
+ __needRetry = False
+ def __init__ (self, group, parent_particle_state):
+ self.__parentParticleState = parent_particle_state
+ super(AllState, self).__init__(group)
+ self.__choices = set([ ParticleState(_p, self) for _p in group.particles() ])
+ #print 'AS.CTOR %s: %d choices' % (self, len(self.__choices))
+
+ def accepts (self, particle_state, instance, value, element_use):
+ #print 'AS.ACC %s %s: %s %s %s' % (self, self.__activeChoice, instance, value, element_use)
+ self.__needRetry = True
+ while self.__needRetry:
+ self.__needRetry = False
+ if self.__activeChoice is None:
+ for choice in self.__choices:
+ #print 'AS.ACC %s candidate %s' % (self, choice)
+ try:
+ (consume, underflow_exc) = choice.step(instance, value, element_use)
+ except Exception, e:
+ consume = False
+ underflow_exc = e
+ #print 'AS.ACC %s: candidate %s : %s' % (self, choice, consume)
+ if consume:
+ self.__activeChoice = choice
+ self.__choices.discard(self.__activeChoice)
+ return True
+ return False
+ (consume, underflow_exc) = self.__activeChoice.step(instance, value, element_use)
+ #print 'AS.ACC %s : active choice %s %s %s' % (self, self.__activeChoice, consume, underflow_exc)
+ if consume:
+ return True
+ if underflow_exc is not None:
+ self.__failed = True
+ raise underflow_exc
+ return False
+
+ def _verifyComplete (self, parent_particle_state):
+ #print 'AS.VC %s: %s, %d left' % (self, self.__activeChoice, len(self.__choices))
+ if self.__activeChoice is not None:
+ self.__activeChoice.verifyComplete()
+ while self.__choices:
+ self.__activeChoice = self.__choices.pop()
+ self.__activeChoice.verifyComplete()
+
+ def notifyFailure (self, sub_state, particle_ok):
+ #print 'AS.NF %s %s' % (self, particle_ok)
+ self.__needRetry = True
+ self.__activeChoice = None
+ if particle_ok and (0 == len(self.__choices)):
+ self.__parentParticleState.incrementCount()
+
+class ParticleState (pyxb.cscRoot):
+ def __init__ (self, particle, parent_state=None):
+ self.__particle = particle
+ self.__parentState = parent_state
+ self.__count = -1
+ #print 'PS.CTOR %s: particle %s' % (self, particle)
+ self.incrementCount()
+
+ def particle (self): return self.__particle
+
+ def incrementCount (self):
+ #print 'PS.IC %s' % (self,)
+ self.__count += 1
+ self.__termState = self.__particle.term().newState(self)
+ self.__tryAccept = True
+
+ def verifyComplete (self):
+ # @TODO@ Set a flag so we can make verifyComplete safe to call
+ # multiple times?
+ #print 'PS.VC %s entry' % (self,)
+ if not self.__particle.satisfiesOccurrences(self.__count):
+ self.__termState._verifyComplete(self)
+ if not self.__particle.satisfiesOccurrences(self.__count):
+ print 'PS.VC %s incomplete' % (self,)
+ raise pyxb.MissingContentError('incomplete')
+ if self.__parentState is not None:
+ self.__parentState.notifyFailure(self, True)
+
+ def step (self, instance, value, element_use):
+ """Attempt to apply the value as a new instance of the particle's term.
+
+ The L{ContentState_mixin} created for the particle's term is consulted
+ to determine whether the instance can accept the given value. If so,
+ the particle's maximum occurrence limit is checked; if not, and the
+ particle has a parent state, it is informed of the failure.
+
+ @param instance: An instance of a subclass of
+ {basis.complexTypeDefinition}, into which the provided value will be
+ stored if it is consistent with the current model state.
+
+ @param value: The value that is being validated against the state.
+
+ @param element_use: An optional L{ElementUse} instance that specifies
+ the element to which the value corresponds. This will be available
+ when the value is extracted by parsing a document, but will be absent
+ if the value was passed as a constructor positional parameter.
+
+ @return: C{( consumed, underflow_exc )} A tuple where the first element
+ is C{True} iff the provided value was accepted in the current state.
+ When this first element is C{False}, the second element will be
+ C{None} if the particle's occurrence requirements have been met, and
+ is an instance of C{MissingElementError} if the observed number of
+ terms is less than the minimum occurrence count. Depending on
+ context, the caller may raise this exception, or may try an
+ alternative content model.
+
+ @raise pyxb.UnexpectedElementError: if the value satisfies the particle,
+ but having done so exceeded the allowable number of instances of the
+ term.
+ """
+
+ #print 'PS.STEP %s: %s %s %s' % (self, instance, value, element_use)
+
+ # Only try if we're not already at the upper limit on occurrences
+ consumed = False
+ underflow_exc = None
+
+ # We can try the value against the term if we aren't at the maximum
+ # count for the term. Also, if we fail to consume, but as a side
+ # effect of the test the term may have reset itself, we can try again.
+ self.__tryAccept = True
+ while self.__tryAccept and (self.__count != self.__particle.maxOccurs()):
+ self.__tryAccept = False
+ consumed = self.__termState.accepts(self, instance, value, element_use)
+ #print 'PS.STEP %s: ta %s %s' % (self, self.__tryAccept, consumed)
+ self.__tryAccept = self.__tryAccept and (not consumed)
+ #print 'PS.STEP %s: %s' % (self, consumed)
+ if consumed:
+ if not self.__particle.meetsMaximum(self.__count):
+ raise pyxb.UnexpectedElementError('too many')
+ else:
+ if self.__parentState is not None:
+ self.__parentState.notifyFailure(self, self.__particle.satisfiesOccurrences(self.__count))
+ if not self.__particle.meetsMinimum(self.__count):
+ # @TODO@ Use better exception; changing this will require
+ # changing some unit tests.
+ #underflow_exc = pyxb.MissingElementError('too few')
+ underflow_exc = pyxb.UnrecognizedContentError('too few')
+ return (consumed, underflow_exc)
+
+ def __str__ (self):
+ particle = self.__particle
+ return 'ParticleState(%d:%d,%s:%s)@%x' % (self.__count, particle.minOccurs(), particle.maxOccurs(), particle.term(), id(self))
+
+class ParticleModel (ContentModel_mixin):
+ """Content model dealing with particles: terms with occurrence restrictions"""
+
+ def minOccurs (self): return self.__minOccurs
+ def maxOccurs (self): return self.__maxOccurs
+ def term (self): return self.__term
+
+ def meetsMaximum (self, count):
+ """@return: C{True} iff there is no maximum on term occurrence, or the
+ provided count does not exceed that maximum"""
+ return (self.__maxOccurs is None) or (count <= self.__maxOccurs)
+
+ def meetsMinimum (self, count):
+ """@return: C{True} iff the provided count meets the minimum number of
+ occurrences"""
+ return count >= self.__minOccurs
+
+ def satisfiesOccurrences (self, count):
+ """@return: C{True} iff the provided count satisfies the occurrence
+ requirements"""
+ return self.meetsMinimum(count) and self.meetsMaximum(count)
+
+ def __init__ (self, term, min_occurs=1, max_occurs=1):
+ self.__term = term
+ self.__minOccurs = min_occurs
+ self.__maxOccurs = max_occurs
+
+ def newState (self):
+ return ParticleState(self)
+
+ def validate (self, symbol_set):
+ """Determine whether the particle requirements are satisfiable by the
+ given symbol set.
+
+ The symbol set represents letters in an alphabet. If those letters
+ can be combined in a way that satisfies the regular expression
+ expressed in the model, a satisfying sequence is returned and the
+ symbol set is reduced by the letters used to form the sequence. If
+ the content model cannot be satisfied, C{None} is returned and the
+ symbol set remains unchanged.
+
+ @param symbol_set: A map from L{ElementUse} instances to a list of
+ values. The order of the values corresponds to the order in which
+ they should appear. A key of C{None} identifies values that are
+ stored as wildcard elements. Values are removed from the lists as
+ they are used; when the last value of a list is removed, its key is
+ removed from the map. Thus an empty dictionary is the indicator that
+ no more symbols are available.
+
+ @return: returns C{None}, or a list of tuples C{( eu, val )} where
+ C{eu} is an L{ElementUse} from the set of symbol keys, and C{val} is a
+ value from the corresponding list.
+ """
+
+ output_sequence = []
+ #print 'Start: %d %s %s : %s' % (self.__minOccurs, self.__maxOccurs, self.__term, symbol_set)
+ result = self._validate(symbol_set, output_sequence)
+ #print 'End: %s %s %s' % (result, symbol_set, output_sequence)
+ if result:
+ return (symbol_set, output_sequence)
+ return None
+
+ def _validate (self, symbol_set, output_sequence):
+ symbol_set_mut = self._validateCloneSymbolSet(symbol_set)
+ output_sequence_mut = self._validateCloneOutputSequence(output_sequence)
+ count = 0
+ #print 'VAL start %s: %d %s' % (self.__term, self.__minOccurs, self.__maxOccurs)
+ last_size = len(output_sequence_mut)
+ while (count != self.__maxOccurs) and self.__term._validate(symbol_set_mut, output_sequence_mut):
+ #print 'VAL %s old cnt %d, left %s' % (self.__term, count, symbol_set_mut)
+ this_size = len(output_sequence_mut)
+ if this_size == last_size:
+ # Validated without consuming anything. Assume we can
+ # continue to do so, jump to the minimum, and exit.
+ if count < self.__minOccurs:
+ count = self.__minOccurs
+ break
+ count += 1
+ last_size = this_size
+ result = self.satisfiesOccurrences(count)
+ if (result):
+ self._validateReplaceResults(symbol_set, symbol_set_mut, output_sequence, output_sequence_mut)
+ #print 'VAL end PRT %s res %s: %s %s %s' % (self.__term, result, self.__minOccurs, count, self.__maxOccurs)
+ return result
+
+class _Group (ContentModel_mixin):
+ """Base class for content information pertaining to a U{model
+ group<http://www.w3.org/TR/xmlschema-1/#Model_Groups>}.
+
+ There is a specific subclass for each group compositor.
+ """
+
+ _StateClass = None
+ """A reference to a L{ContentState_mixin} class that maintains state when
+ validating an instance of this group."""
+
+ def particles (self): return self.__particles
+
+ def __init__ (self, *particles):
+ self.__particles = particles
+
+ def newState (self, parent_particle_state):
+ return self._StateClass(self, parent_particle_state)
+
+ # All and Sequence share the same validation code, so it's up here.
+ def _validate (self, symbol_set, output_sequence):
+ symbol_set_mut = self._validateCloneSymbolSet(symbol_set)
+ output_sequence_mut = self._validateCloneOutputSequence(output_sequence)
+ for p in self.particles():
+ if not p._validate(symbol_set_mut, output_sequence_mut):
+ return False
+ self._validateReplaceResults(symbol_set, symbol_set_mut, output_sequence, output_sequence_mut)
+ return True
+
+
+class GroupChoice (_Group):
+ _StateClass = ChoiceState
+
+ # Choice requires a different validation algorithm
+ def _validate (self, symbol_set, output_sequence):
+ reset_mutables = True
+ for p in self.particles():
+ if reset_mutables:
+ symbol_set_mut = self._validateCloneSymbolSet(symbol_set)
+ output_sequence_mut = self._validateCloneOutputSequence(output_sequence)
+ if p._validate(symbol_set_mut, output_sequence_mut):
+ self._validateReplaceResults(symbol_set, symbol_set_mut, output_sequence, output_sequence_mut)
+ return True
+ reset_mutables = len(output_sequence) != len(output_sequence_mut)
+ return False
+
+class GroupAll (_Group):
+ _StateClass = AllState
+
+class GroupSequence (_Group):
+ _StateClass = SequenceState
+
## Local Variables:
## fill-column:78
## End: