# -*- coding: utf-8; mode: python; -*-
#
# GStreamer Inspector - Multimedia system plugin introspection
#
# Copyright (C) 2007 René Stadler
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the Free
# Software Foundation; either version 3 of the License, or (at your option)
# any later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License for
# more details.
#
# You should have received a copy of the GNU General Public License along
# with this program.  If not, see <http://www.gnu.org/licenses/>.

"""GStreamer Inspector introspection data access and processing model."""

# Depending on pygobject 2.12.3 because of bug #353943.
_REQUIRED_PYGOBJECT = (2, 12, 3,)

# Before gst-python 0.10.4.2, access to static caps of static pad templates
# can cause crashes.
_REQUIRED_PYGST = (0, 10, 4, 2,)

# If the cache file format changes (*Data classes), adjust these values.
# Bump the magic number for backward incompatible changes, the revision for
# backward compatible ones.
_DATA_MAGIC = "gst-inspector-data-cache-1"
_DATA_REVISION = 2

if __name__ != "GstInspector.Data":
    # We have to insist on consistent import semantics.  If we would allow
    # "import Data" from within the package, we would create a cache file
    # where the pickled classes' __module__ attribute refers to a module
    # named "Data".  That could not be read if a reader uses the correct
    # "from GstInspector import Data".
    raise ImportError ("module must be imported from the GstInspector package")

import sys
import os
from itertools import chain
import logging

import pygtk
pygtk.require ("2.0")
del pygtk
import gobject

from GstInspector import utils

gst = None
_named_ranks = None

def gst_value_serialize (value):
    # As they should, the bindings do not expose GstValue stuff, by mapping
    # G(st)Values back and forth to python objects (as much as possible).
    # Unfortunately, gst_value_serialize is not exposed separately.
    # Therefore, this little hack is needed which uses the serialization of
    # gst.Structure.  The argument is a regular python type instance for
    # integers, lists and the like and is mapped back to the correct GstValue
    # by gst.Structure.set_value.
    structure = gst.Structure ("dummy")
    structure.set_value ("dummy", value)
    s = structure.to_string ()

    # Since nested structure serialization support was added to gst, a
    # semicolon gets appended at the end:
    if s.endswith (";"):
        s = s[:-1]

    # The string looks like "dummy, dummy=(int){ 8, 16 }".  Everything after
    # the closing parenthesis is the serialized value.
    try:
        return s[s.index (")") + 1:]
    except ValueError:
        # Substring not found.
        raise ValueError ("cannot serialize %r" % (value,))
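
# A rough usage sketch for gst_value_serialize (illustrative only; the gst
# module must have been imported first, see DirectAcquisitionBase.setup_gst
# below, and the exact serialized form is up to GStreamer):
#
## DirectAcquisitionBase.setup_gst ()
## print gst_value_serialize (42)        # -> "42"
## print gst_value_serialize ([8, 16])   # -> "{ 8, 16 }" (a GstValue list)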

class FixedParamSpec (object):

    def __init__ (self, pspec, fixed_default):
        for attr in ("name", "nick", "blurb", "flags", "owner_type",
                     "value_type", "minimum", "maximum",):
            try:
                setattr (self, attr, getattr (pspec, attr))
            except (AttributeError, SystemError,):
                pass
        self.default_value = fixed_default

class FactoryHelper (object):

    """Helper class to obtain introspection data from gst.ElementFactory
    objects."""

    def __init__ (self, factory):
        self.logger = logging.getLogger ("data.factory-helper")
        self.factory = factory
        self.gtype = factory.get_element_type ()
        self.skip_types = [gobject.GObject, gst.Object, gst.Element, gst.Bin]
        self.__element_instance = None

    def get_element (self):
        if self.__element_instance is None:
            factory_name = self.factory.get_name ()
            self.logger.debug ("creating element from %s factory",
                               factory_name)
            self.__element_instance = self.factory.create ("dummy")
            if self.__element_instance is None:
                msg = "Failed to create element from %s factory" % (factory_name,)
                raise gst.ElementNotFoundError (msg)
        return self.__element_instance

    def iter_interface_names (self):
        return (interface.name for interface in self.gtype.interfaces)

    def get_uri_protocols (self):
        factory = self.factory
        try:
            return factory.get_uri_protocols ()
        except AttributeError:
            # pygst bug #385841, using workaround:
            if not gst.URIHandler.__gtype__ in self.gtype.interfaces:
                return ()
            try:
                return self.get_element ().get_protocols ()
            except gst.ElementNotFoundError:
                return ()

    def get_uri_type (self):
        factory = self.factory
        value = factory.get_uri_type ()
        try:
            return value.value_nick
        except AttributeError:
            # pygst bug #436620, using workaround:
            value = {int (gst.URI_SRC) : gst.URI_SRC,
                     int (gst.URI_SINK) : gst.URI_SINK}.get (value,
                                                             gst.URI_UNKNOWN)
            return value.value_nick

    def iter_gtype_hierarchy (self):
        gtype = self.gtype
        # Going further than GstElement is useless; all elements derive from
        # GObject -> GstObject -> GstElement.
        final_gtype = gst.Element.__gtype__
        while gtype != gobject.TYPE_INVALID:
            if gtype == final_gtype:
                break
            yield gtype
            gtype = gtype.parent

    def iter_class_name_hierarchy (self):
        hierarchy = reversed (list (self.iter_gtype_hierarchy ()))
        return (gtype.name for gtype in hierarchy)

    def iter_properties (self):
        skip = [class_.__gtype__ for class_ in self.skip_types]
        for pspec in gobject.list_properties (self.gtype):
            if pspec.owner_type in skip and not pspec.owner_type == self.gtype:
                continue
            yield pspec

    def iter_properties_check_override (self):
        for pspec in self.iter_properties ():
            if pspec.owner_type != self.gtype and pspec.flags & gobject.PARAM_READABLE:
                try:
                    current_value = self.get_element ().get_property (pspec.name)
                except (gst.ElementNotFoundError, SystemError,):
                    pass
                else:
                    if current_value != pspec.default_value:
                        yield FixedParamSpec (pspec, current_value)
                        continue
            yield pspec

    def iter_signal_ids (self):
        this_gtype = self.gtype
        skip = [class_.__gtype__ for class_ in self.skip_types]
        for gtype in reversed (list (self.iter_gtype_hierarchy ())):
            if gtype in skip and not gtype == this_gtype:
                continue
            for signal_id in gobject.signal_list_ids (gtype):
                yield signal_id
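
# Sketch of how FactoryHelper is driven by ElementData below (illustrative
# only; "fakesink" is just a factory that commonly exists).  Note that
# iter_gtype_hierarchy stops before GstElement, so the name hierarchy only
# covers the element-specific part of the type chain:
#
## factory = gst.element_factory_find ("fakesink")
## helper = FactoryHelper (factory)
## print list (helper.iter_class_name_hierarchy ())
## # -> e.g. ["GstBaseSink", "GstFakeSink"]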

class Pool (object):

    def __init__ (self):
        self._pool = {}

    def __call__ (self, obj):
        if obj is None:
            return obj
        o = self._pool.get (obj)
        if o is None:
            self._pool[obj] = obj
            return obj
        else:
            return o

_pool = Pool ()
pool = _pool.__call__

class DataObjectMeta (type):

    """Meta class for all data object classes."""

    def __init__ (cls, name, bases, dict_):
        super (DataObjectMeta, cls).__init__ (name, bases, dict_)
        cls._superslots = tuple (chain (*(getattr (c, "__slots__", ())
                                          for c in cls.mro ())))

class DataObject (object):

    """Base class for all data object classes."""

    __metaclass__ = DataObjectMeta
    __slots__ = ()

    def __getstate__ (self):
        return dict (((n, getattr (self, n),)
                      for n in self._superslots
                      if hasattr (self, n)))

    def __setstate__ (self, state):
        for name, value in state.items ():
            setattr (self, name, value)

    def __eq__ (self, other):
        """x.__eq__ (y) <==> x == y"""
        try:
            return self.__getstate__ () == other.__getstate__ ()
        except AttributeError:
            return NotImplemented

    def __ne__ (self, other):
        """x.__ne__ (y) <==> x != y"""
        try:
            return self.__getstate__ () != other.__getstate__ ()
        except AttributeError:
            return NotImplemented
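
# Illustration of the DataObjectMeta/_superslots mechanics (plain Python,
# nothing GStreamer specific): since __getstate__ walks _superslots,
# pickling a subclass automatically covers slots inherited from every base
# class in the MRO.
#
## class _Demo (DataObject):
##     __slots__ = ("a",)
## class _Sub (_Demo):
##     __slots__ = ("b",)
## obj = _Sub (); obj.a = 1; obj.b = 2
## print obj.__getstate__ ()   # -> {"a": 1, "b": 2} (key order may vary)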

class IdentificationData (DataObject):

    """Data object that identifies a cache stream of pickled data objects as
    such."""

    __slots__ = ("message", "magic", "revision", "python_version",
                 "gst_version", "pygst_version", "named_ranks",)

    def __init__ (self):
        # In case someone faces the cache file without knowing what it is,
        # this little message provides some insight.
        magic_message = ("This file is a stream of pickled Python objects. It "
                         "is created by a program called 'GStreamer Inspector' "
                         "(gst-inspector). It is safe to delete this file; it "
                         "is an automatically generated data cache that will "
                         "be regenerated if lost.")
        self.message = "*** %s ***" % (magic_message,)

        self.magic = _DATA_MAGIC
        self.revision = _DATA_REVISION
        self.python_version = tuple (sys.version_info)
        self.gst_version = tuple (gst.get_gst_version ())
        self.pygst_version = tuple (gst.get_pygst_version ())
        self.named_ranks = _named_ranks

    def get_rank_name (self, rank):
        # named_ranks maps the plain rank integer to its nickname.
        try:
            return self.named_ranks[rank]
        except KeyError:
            raise KeyError ("No rank name for %r" % (rank,))

class DataErrorData (object):

    """Data object that carries information about failure of the data
    acquisition process."""

    def __init__ (self, exc_info, obj):
        from traceback import format_exception

        exc_type, exc_value, exc_tb = exc_info
        self.exc_type_name = exc_type.__name__
        self.error = str (exc_value)
        self.traceback = "".join (format_exception (*exc_info))
        if obj is not None:
            self.object_type = str (type (obj))
            self.object_name = obj.name
        else:
            self.object_type = None
            self.object_name = None

class AccessibleDataObject (DataObject):

    """Base class for all data object classes that support our accessor
    protocol."""

    __slots__ = ()

    @classmethod
    def _accessor (cls, first, *rest):
        from operator import attrgetter

        if not first:
            raise ValueError ("illegal spec %r, rest %r" % (first, rest,))

        getter = attrgetter (first)
        if not rest:
            return getter
        else:
            type_ = cls._accessor_type (first, *rest)
            if type_ == "traverse":
                type_ = cls._accessor_list_type (first, *rest)
                sub_getter = type_._accessor (*rest)
                def get (obj):
                    return [sub_getter (x) for x in getter (obj)]
                return get
            else:
                sub_getter = type_._accessor (*rest)
                def get (obj):
                    return sub_getter (getter (obj))
                return get

    @classmethod
    def _accessor_transform (cls, from_type, *spec_fields):
        if from_type == cls:
            return ".".join ((cls._accessor_name,) + spec_fields)

        if spec_fields:
            try:
                sub_type = from_type._accessor_type (*spec_fields)
            except ValueError:
                pass
            else:
                if sub_type == "traverse":
                    sub_type = from_type._accessor_list_type (*spec_fields)
                if sub_type == cls:
                    return "%s.%s" % (cls._accessor_name,
                                      ".".join (spec_fields[1:]),)

        for base in (cls,) + cls.__mro__:
            try:
                transforms = base._accessor_transforms
            except AttributeError:
                continue
            try:
                new_spec = transforms[from_type]
            except KeyError:
                pass
            else:
                return "%s.%s.%s" % (cls._accessor_name, new_spec,
                                     ".".join (spec_fields),)
        else:
            raise ValueError ("%r does not support accessor transformation "
                              "from %r" % (cls, from_type,))

    @classmethod
    def _accessor_type (cls, *fields):
        first = fields[0]

        try:
            accessor_traverse = cls._accessor_traverse
        except AttributeError:
            pass
        else:
            if first in cls._accessor_traverse:
                return "traverse"

        try:
            accessor_types = cls._accessor_types
        except AttributeError:
            # Raising ValueError below.
            pass
        else:
            try:
                return accessor_types[first]
            except KeyError:
                pass

        spec = ".".join (fields)
        raise ValueError ("%s has no accessor type for %r (spec %r)"
                          % (cls.__name__, first, spec,))

    @classmethod
    def _accessor_list_type (cls, *fields):
        first = fields[0]

        try:
            return cls._accessor_traverse[first]
        except KeyError:
            # TODO: Error should make clear that the class is implementing
            # the protocol incorrectly.
            spec = ".".join (fields)
            raise ValueError ("%s accessor for %r is not a list type (spec %r)"
                              % (cls.__name__, first, spec,))

    @classmethod
    def _accessor_chain_level (cls, first, *rest):
        try:
            traverse = cls._accessor_traverse
        except AttributeError:
            return 0

        try:
            type_ = traverse[first]
        except KeyError:
            if rest:
                spec = ".".join ((first,) + rest)
                raise ValueError ("%s class cannot resolve accessor spec %r"
                                  % (cls.__name__, spec,))
            return 0
        else:
            if rest:
                if type_ is None:
                    raise ValueError ("%s cannot resolve sub accessor spec %r"
                                      % (cls.__name__, ".".join (rest),))
                sub_level = type_._accessor_chain_level (*rest)
            else:
                sub_level = 0
            return sub_level + 1
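
# The accessor protocol in a nutshell: a dotted spec is resolved against
# the _accessor_types/_accessor_traverse hints of each class in the chain,
# producing a plain callable.  A minimal sketch ("element_data" is a
# hypothetical ElementData instance; ElementData is defined further below):
#
## getter = ElementData._accessor ("longname")
## print getter (element_data)            # element_data.longname
## getter = ElementData._accessor ("pads", "name")
## print getter (element_data)            # [pad.name for pad in pads]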
spec = ".".join (fields) raise ValueError ("%s accessor for %r is not a list type (spec %r)" % (cls.__name__, first, spec,)) @classmethod def _accessor_chain_level (cls, first, *rest): try: traverse = cls._accessor_traverse except AttributeError: return 0 try: type_ = traverse[first] except KeyError: if rest: spec = ".".join ((first,) + rest) raise ValueError ("%s class cannot resolve accessor spec %r" % (cls.__name__, spec,)) return 0 else: if rest: if type_ is None: raise ValueError ("%s cannot resolve sub accessor spec %r" % (cls.__name__,".".join (rest),)) sub_level = type_._accessor_chain_level (*rest) else: sub_level = 0 return sub_level + 1 class AuthorData (AccessibleDataObject): """Data object describing a single element author.""" __slots__ = ("name", "email",) _accessor_name = "author" def __init__ (self, name, email = None): self.name = name self.email = email @classmethod def from_string (cls, author_string): """Parse given string and return a new AuthorData instance. The given string may only denote the name (and optionally email address) of a single author.""" s = author_string.strip () if s.endswith (">") and "<" in s: name = s[:s.rindex ("<")].strip () email = s[s.rindex ("<") + 1:-1].strip () else: name = s email = None return cls (name, email) @classmethod def iter_from_string (cls, author_string): """Parse given string and return an iterator that yields new AuthorData instances, one for each author name (and optionally email address) denoted in the string.""" if "\n" in author_string: split_at = "\n" else: split_at = "," for single_author_string in author_string.split (split_at): yield cls.from_string (single_author_string) def __hash__ (self): # TODO: Should enforce immutability to ensure that the hash is stable. return hash ((self.name, self.email,)) def __cmp__ (self, other): return cmp (str (self), str (other)) def __repr__ (self): return "<%s object name=%r, email=%r at 0x%x>" % (type (self).__name__, self.name, self.email, id (self),) def __str__ (self): if self.email is not None: return "%s <%s>" % (self.name, self.email,) else: return str (self.name) class PluginData (AccessibleDataObject): """Data object describing a single plugin.""" __slots__ = ("typefinders", "elements", "description", "filename", "license", "name", "origin", "package", "source", "version",) def __get_features (plugin): return chain (plugin.typefinders, plugin.elements) features = property (__get_features) _accessor_name = "plugin" # These are left empty since the PluginData and ElementData classes # reference each other. They are filled in after ElementData is defined. _accessor_traverse = {} _accessor_transforms = {} def __init__ (self, gst_plugin): if not gst_plugin.is_loaded (): raise ValueError ("plugin needs to be loaded") self.typefinders = [] self.elements = [] attrs = list (PluginData.__slots__) attrs.remove ("typefinders") attrs.remove ("elements") for attr in attrs: setattr (self, attr, pool (getattr (gst_plugin, "get_%s" % (attr,)) ())) def __setstate__ (self, state): # Restore the broken reference cycle, see FeatureData.__getstate__ for # explanation. 

class PluginData (AccessibleDataObject):

    """Data object describing a single plugin."""

    __slots__ = ("typefinders", "elements",
                 "description", "filename", "license", "name", "origin",
                 "package", "source", "version",)

    def __get_features (plugin):
        return chain (plugin.typefinders, plugin.elements)
    features = property (__get_features)

    _accessor_name = "plugin"
    # These are left empty since the PluginData and ElementData classes
    # reference each other.  They are filled in after ElementData is defined.
    _accessor_traverse = {}
    _accessor_transforms = {}

    def __init__ (self, gst_plugin):
        if not gst_plugin.is_loaded ():
            raise ValueError ("plugin needs to be loaded")

        self.typefinders = []
        self.elements = []

        attrs = list (PluginData.__slots__)
        attrs.remove ("typefinders")
        attrs.remove ("elements")
        for attr in attrs:
            setattr (self, attr,
                     pool (getattr (gst_plugin, "get_%s" % (attr,)) ()))

    def __setstate__ (self, state):
        # Restore the broken reference cycle, see FeatureData.__getstate__
        # for explanation.
        for feature in (state.get ("typefinders", ())
                        + state.get ("elements", ())):
            feature.plugin = self
        return DataObject.__setstate__ (self, state)

    def __repr__ (self):
        if self.filename:
            location = "from %s" % (self.filename,)
        else:
            location = "static plugin"
        return "<%s object %s (%s) at 0x%x>" % (type (self).__name__,
                                                self.name, location,
                                                id (self),)

    def add_typefinder (self, feature):
        feature.plugin = self
        self.typefinders.append (feature)

    def add_element (self, feature):
        feature.plugin = self
        self.elements.append (feature)

class FeatureData (AccessibleDataObject):

    """Base class for data objects that descend from gst.PluginFeature."""

    __slots__ = ("name", "rank", "plugin",)

    _accessor_name = "feature"
    _accessor_types = {"plugin" : PluginData}
    _accessor_transforms = {PluginData : "plugin"}

    def __init__ (self, gst_feature):
        self.name = pool (gst_feature.get_name ())
        self.rank = pool (RankData (gst_feature.get_rank ()))
        # Not setting the plugin attribute, it has to be filled by the
        # caller.

    def __getstate__ (self):
        # Break the reference cycle between feature and plugin to prevent
        # sending rich comparison methods into an endless recursion.  The
        # reference will be restored in PluginData.__setstate__.
        state = DataObject.__getstate__ (self)
        del state["plugin"]
        return state

    def __repr__ (self):
        return "<%s object %s at 0x%x>" % (type (self).__name__, self.name,
                                           id (self),)

class StructureData (AccessibleDataObject):

    """Data object describing a gst.Structure, i.e. a named mapping."""

    __slots__ = ("name", "fields",)

    def __init__ (self, structure):
        self.name = pool (structure.get_name ())
        self.fields = pool (tuple (((pool (name),
                                     pool (gst_value_serialize (structure[name])),)
                                    for name in structure.keys ())))

    def __repr__ (self):
        return "<%s object %s at 0x%x>" % (type (self).__name__, str (self),
                                           id (self),)

    def __str__ (self):
        if not self.fields:
            return self.name
        else:
            fields = ",".join (("%s=%s" % field for field in self.fields))
            return "%s,%s" % (self.name, fields,)

class CapsData (AccessibleDataObject):

    __slots__ = ("any", "_structures",)

    def __init__ (self, caps):
        if caps.is_any ():
            self.any = True
            self._structures = ()
        elif caps.is_empty ():
            self.any = False
            self._structures = ()
        else:
            self.any = False
            self._structures = tuple ((pool (StructureData (s))
                                       for s in caps))

    def __getstate__ (self):
        if self.any:
            return True
        else:
            return tuple ((s.name, s.fields,) for s in self._structures)

    def __setstate__ (self, state):
        if state == True:
            self.any = True
            self._structures = ()
        else:
            self.any = False
            def restore_structure (s):
                struct = StructureData.__new__ (StructureData)
                name, fields = s
                struct.__setstate__ ({"name" : name, "fields" : fields})
                return struct
            self._structures = tuple ((restore_structure (s) for s in state))

    def __repr__ (self):
        return "<%s object (%s) at 0x%x>" % (type (self).__name__,
                                             str (self), id (self),)

    def __str__ (self):
        if self.any:
            return "ANY"
        elif not self._structures:
            return "EMPTY"
        else:
            return "; ".join ((str (s) for s in self._structures))

    def __iter__ (self):
        return iter (self._structures)

    def __len__ (self):
        return len (self._structures)

    def __getitem__ (self, key):
        return self._structures.__getitem__ (key)

    def __contains__ (self, item):
        return item in self._structures
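
# How CapsData compacts for pickling: __getstate__ boils the object down to
# True (for ANY caps) or a tuple of (name, fields,) pairs, one pair per
# structure.  A sketch with a hand-built gst.Caps (illustrative only; the
# exact field serialization is up to gst_value_serialize above):
#
## caps = gst.Caps ("audio/x-raw-int, width=(int){ 8, 16 }")
## data = CapsData (caps)
## print str (data)             # roughly "audio/x-raw-int,width={ 8, 16 }"
## print data.__getstate__ ()   # (("audio/x-raw-int", (("width", ...),)),)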

class TypeFinderData (FeatureData):

    """Data object describing a type finder."""

    __slots__ = ("caps", "extensions",)

    def __init__ (self, factory):
        FeatureData.__init__ (self, factory)

        self.caps = CapsData (factory.get_caps ())
        # TODO: Keep an eye on gst-python bug #385841.
        ## self.extensions = factory.get_extensions ()

class PadData (AccessibleDataObject):

    """Data object describing a single pad (or pad template)."""

    __slots__ = ("name", "presence", "direction", "caps",)

    _accessor_name = "pad"
    _accessor_traverse = {"caps" : StructureData}

    def __init__ (self, element_factory, pad_template):
        self.name = pool (pad_template.name_template)
        self.presence = pool (pad_template.presence.value_nick)
        self.direction = pool (pad_template.direction.value_nick)
        self.caps = CapsData (pad_template.static_caps.get ())

    def __getstate__ (self):
        return (self.name, self.presence, self.direction, self.caps,)

    def __setstate__ (self, state):
        self.name, self.presence, self.direction, self.caps = state

    def __repr__ (self):
        return "<%s object %s, %s, %s at 0x%x>" % (type (self).__name__,
                                                   self.name, self.direction,
                                                   self.presence, id (self),)

class ValueData (DataObject):

    __slots__ = ("description", "nickname", "value",)

    value_type = None

    def __getstate__ (self):
        return (self.description, self.nickname, self.value,)

    def __setstate__ (self, state):
        self.description, self.nickname, self.value = state

    def __repr__ (self):
        return "<%s object %s (%r) at 0x%x>" % (type (self).__name__,
                                                self.nickname, self.value,
                                                id (self),)

class EnumValueData (ValueData):

    """Data object describing a single GEnumValue."""

    __slots__ = ()

    value_type = "enum"

    def __init__ (self, item):
        value, enum = item
        self.description = pool (enum.value_name)
        self.nickname = pool (enum.value_nick)
        self.value = value

    @classmethod
    def iter_from_param_spec (cls, pspec):
        items = pspec.enum_class.__enum_values__.items ()
        return (cls (i) for i in sorted (items))

class FlagsValueData (ValueData):

    """Data object describing a single GFlagsValue."""

    __slots__ = ()

    value_type = "flags"

    def __init__ (self, item):
        value, flags = item
        self.description = pool (flags.first_value_name)
        self.nickname = pool (flags.first_value_nick)
        self.value = value

    @classmethod
    def iter_from_param_spec (cls, pspec):
        items = pspec.flags_class.__flags_values__.items ()
        return (cls (i) for i in sorted (items))
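
# EnumValueData/FlagsValueData are built from paramspec introspection; a
# sketch of what they capture, reusing only names defined in this module
# ("factory" is a hypothetical gst.ElementFactory):
#
## helper = FactoryHelper (factory)
## for pspec in helper.iter_properties ():
##     if hasattr (pspec, "enum_class"):
##         for v in EnumValueData.iter_from_param_spec (pspec):
##             print v.value, v.nickname, v.description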

class RankData (DataObject):

    """Data object describing a single feature rank.  The rank is just a
    plain integer; this wrapper adds user-friendly stringification."""

    __slots__ = ("_rank",)

    def __init__ (self, rank):
        self._rank = int (rank)

    def __getstate__ (self):
        return self._rank

    def __setstate__ (self, state):
        self._rank = int (state)

    def __int__ (self):
        return self._rank

    def __cmp__ (self, other):
        return cmp (self._rank, int (other))

    def __repr__ (self):
        return "<%s object %s at 0x%x>" % (type (self).__name__,
                                           self.__str__ (), id (self),)

    def __str__ (self):
        try:
            # Example: "256 (primary)"
            return "%i (%s)" % (self._rank, _named_ranks[self._rank],)
        except KeyError:
            # Not a named rank.
            ranks = _named_ranks.keys ()
            delta = ((abs (self._rank - rank), rank,) for rank in ranks)
            nearest_rank = list (sorted (delta))[0][1]
            nearest_name = _named_ranks[nearest_rank]
            if nearest_rank > self._rank:
                sign = "-"
            else:
                sign = "+"
            diff = abs (nearest_rank - self._rank)
            # Example: "255 (primary - 1)"
            return "%i (%s %s %i)" % (self._rank, nearest_name, sign, diff,)

class PropertyData (DataObject):

    """Data object describing a GObject property."""

    __slots__ = ("name", "nickname", "description", "owner_name", "flags",
                 "data_type", "values", "default", "minimum", "maximum",)

    def __init__ (self, pspec):
        flags = ()
        if pspec.flags & gobject.PARAM_READABLE:
            flags = ("readable",)
        if pspec.flags & gobject.PARAM_WRITABLE:
            flags = flags + ("writable",)
        if pspec.flags & gst.PARAM_CONTROLLABLE:
            flags = flags + ("controllable",)

        self.name = pool (pspec.name)
        self.nickname = pool (pspec.nick)
        self.description = pool (pspec.blurb)
        self.owner_name = pool (pspec.owner_type.name)
        self.flags = pool (flags)
        self.data_type = pool (pspec.value_type.name)

        def get_value (v):
            try:
                # For enums:
                return v.value_nick
            except AttributeError:
                pass
            try:
                # For flags:
                return tuple (v.value_nicks)
            except AttributeError:
                pass
            if isinstance (v, gst.Caps):
                return CapsData (v)
            if isinstance (v, float):
                # TODO Py2.5: Consider direct pickling of floats.  We use
                # stringification for now since the workaround that Py2.4
                # needs on Linux does not work on Windows.
                return str (v)
            # Anything else, hopefully picklable!
            return v

        self.default = get_value (pspec.default_value)
        try:
            self.minimum = get_value (pspec.minimum)
            self.maximum = get_value (pspec.maximum)
        except AttributeError:
            self.minimum = None
            self.maximum = None

        if hasattr (pspec, "enum_class"):
            self.values = tuple (EnumValueData.iter_from_param_spec (pspec))
        elif hasattr (pspec, "flags_class"):
            self.values = tuple (FlagsValueData.iter_from_param_spec (pspec))
        else:
            self.values = ()

    def __getstate__ (self):
        return tuple ((getattr (self, attr)
                       for attr in type (self).__slots__))

    def __setstate__ (self, state):
        for attr, value in zip (type (self).__slots__, state):
            setattr (self, attr, value)

    def __repr__ (self):
        return "<%s object %s of %s at 0x%x>" % (type (self).__name__,
                                                 self.name, self.owner_name,
                                                 id (self),)

class SignalData (DataObject):

    """Data object describing a GObject signal."""

    __slots__ = ("name", "owner_name", "flags", "parameters", "return_type",)

    def __init__ (self, signal_id):
        query = gobject.signal_query (signal_id)
        signal_id, name, gtype, iflags, return_gtype, paramspecs = query

        flags = ()
        for flag_name in ("ACTION", "DETAILED", "NO_HOOKS", "NO_RECURSE",
                          "RUN_CLEANUP", "RUN_FIRST", "RUN_LAST",):
            flag = getattr (gobject, "SIGNAL_%s" % (flag_name,))
            if iflags & flag:
                flags += (flag_name.lower ().replace ("_", "-"),)

        self.name = pool (name)
        self.owner_name = pool (gtype.name)
        self.flags = pool (flags)
        self.parameters = tuple ([pool (t.name) for t in paramspecs])
        self.return_type = pool (return_gtype.name)

    def __getstate__ (self):
        return tuple ((getattr (self, attr)
                       for attr in type (self).__slots__))

    def __setstate__ (self, state):
        for attr, value in zip (type (self).__slots__, state):
            setattr (self, attr, value)

    def __repr__ (self):
        return "<%s object %s of %s at 0x%x>" % (type (self).__name__,
                                                 self.name, self.owner_name,
                                                 id (self),)
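
# RankData stringification at a glance (assumes _named_ranks has been
# populated by setup_gst with the standard GstRank nicknames
# none/marginal/secondary/primary at 0/64/128/256):
#
## print str (RankData (256))   # -> "256 (primary)"
## print str (RankData (255))   # -> "255 (primary - 1)"
## print str (RankData (130))   # -> "130 (secondary + 2)"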
"uri_type",) def __get_parent_classes (element): return element.hierarchy[:-1] parent_classes = property (__get_parent_classes) _accessor_name = "element" _accessor_traverse = {"authors" : AuthorData, "hierarchy" : str, "interfaces" : str, "klasses" : str, "parent_classes" : str, "properties" : PropertyData, "signals" : SignalData, "pads" : PadData, "uri_protocols" : str} def __init__ (self, factory): FeatureData.__init__ (self, factory) helper = FactoryHelper (factory) authors = AuthorData.iter_from_string (factory.get_author ()) self.authors = tuple ((pool (author) for author in authors)) # No pooling for the description -- these should be rather unique. self.description = factory.get_description () self.hierarchy = tuple ((pool (n) for n in helper.iter_class_name_hierarchy ())) self.type_name = pool (helper.gtype.name) self.interfaces = tuple ((pool (n) for n in sorted (helper.iter_interface_names ()))) self.klasses = tuple ((pool (k.strip ()) for k in factory.get_klass ().split ("/"))) self.longname = pool (factory.get_longname ()) self.properties = tuple ((pool (PropertyData (p)) for p in helper.iter_properties_check_override ())) self.signals = tuple ((pool (SignalData (s)) for s in helper.iter_signal_ids ())) self.pads = tuple ((pool (PadData (factory, t)) for t in factory.get_static_pad_templates ())) self.uri_protocols = tuple ((pool (p) for p in helper.get_uri_protocols ())) self.uri_type = pool (helper.get_uri_type ()) # Now that ElementData is defined, fix the PluginData accessor hints: PluginData._accessor_traverse["elements"] = ElementData PluginData._accessor_transforms[ElementData] = "elements" class Acquisition (object): def __init__ (self): self.logger = logging.getLogger ("data.acq") def __iter__ (self): return self class AcquisitionReader (object): """Mix-in class for Acquisition objects that read a pickled stream of data.""" def __init__ (self, fileobj): self.fileobj = fileobj global _named_ranks try: from cPickle import Unpickler, UnpicklingError except ImportError: from pickle import Unpickler, UnpicklingError unpickler = Unpickler (fileobj) try: ident = unpickler.load () except (UnpicklingError, NameError, AttributeError, IndexError, ImportError,): self.unpickling_error () except EOFError: self.eof_error () if isinstance (ident, DataErrorData): error = ident def data_iter (): # TODO Py2.5: Use try...finally. 

class Acquisition (object):

    def __init__ (self):
        self.logger = logging.getLogger ("data.acq")

    def __iter__ (self):
        return self

class AcquisitionReader (object):

    """Mix-in class for Acquisition objects that read a pickled stream of
    data."""

    def __init__ (self, fileobj):
        self.fileobj = fileobj

        global _named_ranks

        try:
            from cPickle import Unpickler, UnpicklingError
        except ImportError:
            from pickle import Unpickler, UnpicklingError

        unpickler = Unpickler (fileobj)
        try:
            ident = unpickler.load ()
        except (UnpicklingError, NameError, AttributeError, IndexError,
                ImportError,):
            self.unpickling_error ()
        except EOFError:
            self.eof_error ()

        if isinstance (ident, DataErrorData):
            error = ident
            def data_iter ():
                # TODO Py2.5: Use try...finally.
                yield error
                self.cleanup ()
            return data_iter ()

        try:
            magic = ident.magic
            revision = ident.revision
        except AttributeError:
            self.unpickling_error ()

        if magic != _DATA_MAGIC or revision < _DATA_REVISION:
            self.bad_magic_error ()

        _named_ranks = ident.named_ranks

        self.unpickler = unpickler

    def __del__ (self):
        try:
            self.cleanup ()
        except AttributeError:
            pass

    def __iter__ (self):
        return self

    def next (self):
        try:
            return self.unpickler.load ()
        except EOFError:
            self.cleanup ()
            raise StopIteration

    def cleanup (self):
        self.fileobj.close ()

    def unpickling_error (self):
        raise

    def eof_error (self):
        raise EnvironmentError ("child process exited early")

    def bad_magic_error (self):
        raise EnvironmentError ("incompatible data revision/magic field")
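
# The stream read above is what the Writer class (below) produces: one
# pickled IdentificationData header, followed by any number of PluginData
# (or DataErrorData) objects, until EOF:
#
#   IdentificationData | PluginData | PluginData | ... | EOF
#
# Iterating a concrete reader such as CacheAcquisition (defined below)
# yields one unpickled object per next () call:
#
## for plugin_data in CacheAcquisition ():
##     print plugin_data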

class DirectAcquisitionBase (Acquisition):

    """Acquire data by querying the GStreamer registry."""

    def __init__ (self, registry = None, **kw):
        Acquisition.__init__ (self)

        self.setup_gst ()

        self.logger.info ("using GStreamer %i.%i.%i.%i",
                          *gst.get_gst_version ())
        self.logger.info ("using gst-python %i.%i.%i.%i",
                          *gst.get_pygst_version ())

        if registry is None:
            registry = gst.registry_get_default ()
        self.registry = registry

        self.gen = self.__data_gen ()

    def next (self):
        return self.gen.next ()

    @staticmethod
    def setup_gst ():
        global gst, _named_ranks

        if gst is not None:
            # Already imported.
            return

        # Wipe sys.argv before importing the gst module.  See gst-python bug
        # #425847 for more information.
        del sys.argv[1:]

        import pygst
        try:
            pygst.require ("0.10")
        except pygst.RequiredVersionError, exc:
            # ImportError is much more appropriate for this kind of error.
            raise ImportError ("need GStreamer 0.10 series (error: %s)"
                               % (exc.args[0],))
        import gst

        pygobject_version = gobject.pygobject_version
        if pygobject_version < _REQUIRED_PYGOBJECT:
            msg = "pygobject is too old (need at least %i.%i.%i, found %i.%i.%i)"
            raise ImportError (msg % (_REQUIRED_PYGOBJECT + pygobject_version))

        try:
            pygst_version = gst.get_pygst_version ()
        except AttributeError:
            # get_pygst_version was added in 0.10.4, so this is way too old.
            msg = "gst-python is too old (need at least %i.%i.%i.%i)"
            raise ImportError (msg % _REQUIRED_PYGST)
        if pygst_version < _REQUIRED_PYGST:
            msg = ("gst-python is too old (need at least %i.%i.%i.%i, "
                   "found %i.%i.%i.%i)")
            raise ImportError (msg % (_REQUIRED_PYGST + pygst_version))

        ranks = {}
        for int_rank, enum_obj in gst.Rank.__enum_values__.items ():
            ranks[int_rank] = enum_obj.value_nick
        # Store in global variable.
        _named_ranks = ranks

        try:
            gst.registry_fork_set_enabled (False)
        except AttributeError:
            # Bindings too old, nothing to worry about.
            pass

        # Before gst-python 0.10.6, GST_PARAM_CONTROLLABLE is not available
        # in the bindings:
        try:
            gst.PARAM_CONTROLLABLE
        except AttributeError:
            G_PARAM_USER_SHIFT = 8
            gst.PARAM_CONTROLLABLE = 1 << (G_PARAM_USER_SHIFT + 1)

    def __data_gen (self):
        registry = self.registry

        for plugin in registry.get_plugin_list ():
            plugin = plugin.load ()
            if plugin is None:
                # Error during load, probably a stale entry in the registry.
                continue
            plugin_data = PluginData (plugin)
            features = registry.get_feature_list_by_plugin (plugin.get_name ())
            for feature in features:
                if isinstance (feature, gst.ElementFactory):
                    plugin_data.add_element (ElementData (feature))
                # TODO: Ignoring type finders here, see
                # TypeFinderData.__init__ for details.
                ## elif isinstance (feature, gst.TypeFindFactory):
                ##     plugin_data.add_typefinder (TypeFinderData (feature))
            yield plugin_data

        self.logger.debug ("acquisition finished")

class DirectAcquisition (DirectAcquisitionBase):

    """Acquire data by querying the GStreamer registry."""

    def __init__ (self, *a, **kw):
        DirectAcquisitionBase.__init__ (self, *a, **kw)

        self.cache_file = utils.SaveWriteFile (CacheAcquisition.get_file_name (),
                                               mode = "wb")
        if "extra_file" in kw:
            extra_file = kw["extra_file"]
            self.writer = Writer (utils.TeeWriteFile (self.cache_file,
                                                      extra_file))
        else:
            self.writer = Writer (self.cache_file)

    def next (self):
        try:
            plugin_data = DirectAcquisitionBase.next (self)
        except StopIteration:
            self.cache_file.close ()
            raise

        self.writer.write (plugin_data)

        return plugin_data

class BadCacheFileError (Exception):

    pass

class CacheAcquisition (Acquisition, AcquisitionReader):

    """Acquire data by reading from cache file."""

    @classmethod
    def get_file_name (cls):
        return os.path.join (utils.XDG.CACHE_HOME, "gst-inspector",
                             "cache-0.10.data")

    def __init__ (self):
        fp = file (self.get_file_name (), "rb")

        Acquisition.__init__ (self)
        AcquisitionReader.__init__ (self, fp)

    def unpickling_error (self):
        raise BadCacheFileError ("cache file has an unknown format")

    def eof_error (self):
        raise BadCacheFileError ("cache file is truncated")

    def bad_magic_error (self):
        raise BadCacheFileError ("cache file has an incompatible format")

class ForkAcquisitionChild (object):

    def __init__ (self, parent_file):
        self.logger = logging.getLogger ("data.child")

        try:
            for plugin_data in DirectAcquisition (extra_file = parent_file):
                pass
        except IOError, exc:
            import errno
            if exc.errno == errno.EPIPE:
                # Broken pipe, most probably because the parent process
                # exited.  Do nothing, die silently.
                self.logger.info ("broken pipe, exiting")
            else:
                raise

class ForkAcquisition (Acquisition, AcquisitionReader):

    """Acquire data by fork()ing a child process."""

    def __init__ (self):
        Acquisition.__init__ (self)

        fp, self.pid = self._fork_child ()

        AcquisitionReader.__init__ (self, fp)

    def _fork_child (self):
        self.logger.debug ("forking child process")

        pipe_r, pipe_w = os.pipe ()
        pid = os.fork ()

        if pid != 0:
            # Parent process.
            os.close (pipe_w)
            return (os.fdopen (pipe_r, "rb"), pid,)
        else:
            # Child process.
            exit_status = 0
            try:
                try:
                    os.close (pipe_r)
                    parent_file = os.fdopen (pipe_w, "wb")
                    try:
                        ForkAcquisitionChild (parent_file)
                    finally:
                        try:
                            parent_file.close ()
                        except IOError, exc:
                            import errno
                            if exc.errno == errno.EPIPE:
                                # Silently ignore broken pipe error.
                                pass
                            else:
                                raise
                except StandardError, exc:
                    print >> sys.stderr, "Exception %s in child process:" % (str (exc),)
                    sys.excepthook (*sys.exc_info ())
            finally:
                os._exit (exit_status)

    def cleanup (self):
        if self.pid is not None:
            self.logger.debug ("waiting for child process %i", self.pid)
            # Some systems might need this to clean up properly:
            try:
                os.waitpid (self.pid, 0)
            except OSError:
                # We get here if the child process was already terminated.
                pass
            self.pid = None
            self.logger.debug ("child process terminated")

        AcquisitionReader.cleanup (self)
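
# Process layout for ForkAcquisition: the child writes the pickled stream
# into a pipe while the parent unpickles from the read end.
#
#   parent: ForkAcquisition ---- os.pipe () ---- child: ForkAcquisitionChild
#           (AcquisitionReader)                  (DirectAcquisition + Writer)
#
# Usage is the same as for any other Acquisition (sketch):
#
## for plugin_data in ForkAcquisition ():
##     print plugin_data.name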

class SpawnAcquisition (Acquisition, AcquisitionReader):

    """Acquire data by spawning a child process."""

    def __init__ (self):
        from subprocess import PIPE

        Acquisition.__init__ (self)

        exe = sys.executable
        script = sys.argv[0]
        # TODO: This should invoke a separate libexec script instead.
        self.proc = utils.FixedPopen ((exe, script,
                                       "--data-dump-acquisition",),
                                      stdout = PIPE)

        AcquisitionReader.__init__ (self, self.proc.stdout)

class Writer (object):

    def __init__ (self, fileobj):
        self.logger = logging.getLogger ("data.writer")

        try:
            from cPickle import Pickler
        except ImportError:
            from pickle import Pickler

        self.fileobj = fileobj
        self.pickler = Pickler (fileobj, protocol = 2)

        self.write (IdentificationData ())

    def write (self, plugin_data):
        try:
            from cPickle import UnpicklingError
        except ImportError:
            from pickle import UnpicklingError

        try:
            self.pickler.dump (plugin_data)
        except (UnpicklingError, TypeError, SystemError,), exc:
            # TypeError is raised by copy_reg for unpicklable objects.
            # SystemError seems to occur when triggering bugs in Python
            # related to the platform, e.g. trying to pickle float ("inf")
            # with a binary pickle protocol on Linux with Python 2.4.
            self.logger.error ("could not pickle %r: %s", plugin_data,
                               str (exc))
            data_error_data = DataErrorData (sys.exc_info (), plugin_data)
            self.pickler.dump (data_error_data)

    def close (self):
        self.pickler = None
        self.fileobj.close ()
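
# Writer usage sketch (illustrative only; Writer is normally driven by
# DirectAcquisition above, and constructing the IdentificationData header
# requires that setup_gst () has already run; the path is made up):
#
## fp = open ("/tmp/dump.data", "wb")
## writer = Writer (fp)          # writes the IdentificationData header
## writer.write (plugin_data)    # one pickled object per call
## writer.close ()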

class Policy (object):

    def __init__ (self, long_running = None, update = None):
        self.long_running = long_running
        self.update = update

    def modify (self, other):
        if other.long_running is not None:
            self.long_running = other.long_running
        if other.update is not None:
            self.update = other.update

    def iter_acquisitions (self):
        """Return an iterator that yields the recommended data acquisition
        constructors according to the policy attributes.  The yielded
        constructors are to be tried in order; the first one represents the
        best method, and each following one serves as a fallback."""

        # TODO: Once the new update_registry GStreamer function is made
        # available in the bindings, this order can be adjusted to be more
        # efficient.

        if not self.update:
            yield CacheAcquisition
        elif "gst" in sys.modules:
            # GStreamer is already initialized in this process, so a forked
            # child process would just re-read the old data.  Spawning is
            # the only solution until we get gst.update_registry.
            yield SpawnAcquisition
            return

        if self.long_running:
            if hasattr (os, "fork"):
                yield ForkAcquisition
            yield SpawnAcquisition

        yield DirectAcquisition

class Dispatcher (object):

    def __call__ (self, iterator):
        raise NotImplementedError ("derived classes must override this method")

class DefaultDispatcher (Dispatcher):

    def __call__ (self, iterator):
        for x in iterator:
            pass

class GSourceDispatcher (Dispatcher):

    def __init__ (self):
        Dispatcher.__init__ (self)

        self.source_id = None

    def __call__ (self, iterator):
        if self.source_id is not None:
            gobject.source_remove (self.source_id)

        self.source_id = gobject.idle_add (iterator.next)

class Consumer (object):

    def handle_load_started (self):
        pass

    def handle_load_started_after (self):
        pass

    def handle_data_error (self, error):
        pass

    def handle_data_added (self, data):
        pass

    def handle_load_finished (self):
        pass

class ConsumerProxy (Consumer):

    def __init__ (self):
        self.consumers = []

    def handle_load_started (self):
        for consumer in self.consumers:
            consumer.handle_load_started ()

    def handle_load_started_after (self):
        for consumer in self.consumers:
            consumer.handle_load_started_after ()

    def handle_data_error (self, error):
        for consumer in self.consumers:
            consumer.handle_data_error (error)

    def handle_data_added (self, data):
        for consumer in self.consumers:
            consumer.handle_data_added (data)

    def handle_load_finished (self):
        for consumer in self.consumers:
            consumer.handle_load_finished ()
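
# A minimal Consumer wired to a Producer (see Producer below); with the
# DefaultDispatcher this runs the whole acquisition synchronously, while a
# GSourceDispatcher would process it incrementally from the GLib main loop.
# Sketch (the _PrintNames class is made up for illustration):
#
## class _PrintNames (Consumer):
##     def handle_data_added (self, plugin_data):
##         print plugin_data.name
## producer = Producer ()
## producer.consumers.append (_PrintNames ())
## producer.start ()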

class ProducerBase (object):

    def __init__ (self, dispatcher = None, policy = None):
        self.logger = logging.getLogger ("data.producer")

        self.consumers = []

        if dispatcher is None:
            dispatcher = DefaultDispatcher ()
        if policy is None:
            policy = Policy (long_running = False, update = False)

        self.dispatch = dispatcher
        self.policy = policy

    def start (self):
        for consumer in self.consumers:
            consumer.handle_load_started ()
        for consumer in self.consumers:
            consumer.handle_load_started_after ()

    def have_data_error (self, error):
        for consumer in self.consumers:
            consumer.handle_data_error (error)

    def have_data_added (self, data):
        for consumer in self.consumers:
            consumer.handle_data_added (data)

    def have_load_finished (self):
        for consumer in self.consumers:
            consumer.handle_load_finished ()

class Producer (ProducerBase):

    def __init__ (self, dispatcher = None, policy = None):
        ProducerBase.__init__ (self, dispatcher, policy)

        self.worker = None

    def start (self):
        if self.worker is not None:
            raise ValueError ("already started")

        self.worker = self._worker_gen ()

        ProducerBase.start (self)

        self.dispatch (self.worker)

    def __process_data (self, data):
        if isinstance (data, DataErrorData):
            self.have_data_error (data)
        else:
            self.have_data_added (data)

    def _worker_gen (self):
        if self.policy.update:
            self.logger.info ("update forced by policy")

        for acq_class in self.policy.iter_acquisitions ():
            self.logger.info ("trying %s", acq_class.__name__)
            if acq_class != CacheAcquisition:
                gen = acq_class ()
                break
            else:
                try:
                    gen = acq_class ()
                except (EnvironmentError, BadCacheFileError,):
                    # EnvironmentError is most probably just "file not
                    # found".  BadCacheFileError is our custom exception to
                    # indicate that the file is either garbage or was
                    # written by another inspector version with an
                    # incompatible format.
                    self.logger.info ("cache file acquisition unavailable")
                else:
                    break
        else:
            # TODO: What a dubious error!
            raise RuntimeError ("cannot acquire data")

        self.logger.info ("using %s", acq_class.__name__)

        yield True

        i = 0
        batch = []
        def process_batch ():
            for data in batch:
                self.__process_data (data)
            del batch[:]

        n_batch = 20
        def key (data):
            if isinstance (data, DataErrorData):
                # Handle errors immediately:
                return n_batch
            elif hasattr (data, "elements"):
                return len (data.elements)
            else:
                return 1

        for data in gen:
            batch.append (data)
            i += key (data)
            if i > n_batch:
                process_batch ()
                i = 0
                yield True
        process_batch ()

        self.have_load_finished ()

        self.worker = None
        self.worker_source_id = None

        yield False

def accessor_lookup (spec):
    if not "." in spec:
        first = spec
        rest = ""
    else:
        i = spec.index (".")
        first = spec[:i]
        rest = spec[i + 1:]

    types = dict (((cls._accessor_name, cls,)
                   for cls in (PluginData, ElementData, AuthorData,
                               PadData,)))
    try:
        type_ = types[first]
    except KeyError:
        raise ValueError ("unknown accessor type name %r" % (first,))

    return (type_, rest,)

def identity (obj):
    return obj

def accessor (spec):
    type_, rest = accessor_lookup (spec)
    if not rest:
        return identity
    return type_._accessor (*rest.split ("."))

def accessor_transform (spec, new_type_spec):
    from_type, rest = accessor_lookup (spec)
    new_type = accessor_lookup (new_type_spec)[0]

    return new_type._accessor_transform (from_type, *rest.split ("."))

def accessor_chain_level (spec):
    type_, rest = accessor_lookup (spec)
    if not rest:
        # Identity.
        return 0
    return type_._accessor_chain_level (*rest.split ("."))

class Collector (Consumer):

    def __init__ (self, accessor_spec, chain_level = None):
        plugin_accessor_spec = accessor_transform (accessor_spec, "plugin")

        if chain_level is None:
            chain_level = accessor_chain_level (plugin_accessor_spec)

        self.accessor = accessor (plugin_accessor_spec)
        self.chain_level = chain_level
        self.items = set ()

    def clear (self):
        self.items.clear ()

    def handle_load_started (self):
        self.clear ()

    def handle_data_added (self, plugin):
        item = self.accessor (plugin)
        if self.chain_level == 0:
            self.items.add (item)
        elif self.chain_level == 1:
            self.items.update (item)
        else:
            item = tuple (item)
            for n in range (self.chain_level - 1):
                item = tuple (chain (*item))
            self.items.update (item)

class Filter (object):

    def __init__ (self, accessor_spec, param):
        self.accessor = accessor (accessor_spec)
        self.param = param

    def __call__ (self, obj):
        raise NotImplementedError ()

class FilterEq (Filter):

    def __call__ (self, obj):
        return self.accessor (obj) == self.param

class FilterIn (Filter):

    def __call__ (self, obj):
        return self.param in self.accessor (obj)

class FilterInIn (Filter):

    def __call__ (self, obj):
        for sub_obj in self.accessor (obj):
            if self.param in sub_obj:
                return True
        else:
            return False

class FilterInInIn (Filter):

    def __call__ (self, obj):
        for sub_obj in self.accessor (obj):
            for sub_sub_obj in sub_obj:
                if self.param in sub_sub_obj:
                    return True
        else:
            return False

def get_data (spec, chain_level = None):
    if chain_level is None:
        chain_level = accessor_chain_level (spec)

    producer = Producer ()
    consumer = Collector (spec, chain_level)
    producer.consumers.append (consumer)
    producer.start ()

    return consumer.items

def get_plugins ():
    return get_data ("plugin")

def get_elements ():
    return get_data ("plugin.elements")

def _dump_cache (*a, **kw):
    if sys.platform == "win32":
        import msvcrt
        msvcrt.setmode (sys.stdout.fileno (), os.O_BINARY)

    # This also updates the cache file.
    for data in DirectAcquisition (extra_file = sys.stdout):
        pass
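
# Convenience query sketches (illustrative only; get_data runs a full
# synchronous acquisition via the default Policy, so these can take a
# moment on the first run):
#
## for longname in sorted (e.longname for e in get_elements ()):
##     print longname
## is_sink = FilterIn ("element.klasses", "Sink")
## sinks = [e for e in get_elements () if is_sink (e)]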