summaryrefslogtreecommitdiff
path: root/testsuite/test_event.py
diff options
context:
space:
mode:
Diffstat (limited to 'testsuite/test_event.py')
-rw-r--r--testsuite/test_event.py80
1 files changed, 80 insertions, 0 deletions
diff --git a/testsuite/test_event.py b/testsuite/test_event.py
index 3341857..ca898f4 100644
--- a/testsuite/test_event.py
+++ b/testsuite/test_event.py
@@ -142,5 +142,85 @@ class TestEmit(TestCase):
assert isinstance(event, gst.Event)
+class TestDelayedEventProbe(TestCase):
+ # this test:
+ # starts a pipeline with only a source
+ # adds an event probe to catch the (first) new-segment
+ # adds a buffer probe to "autoplug" and send out this event
+ def setUp(self):
+ TestCase.setUp(self)
+ self.pipeline = gst.Pipeline()
+ self.src = gst.element_factory_make('fakesrc')
+ self.src.set_property('num-buffers', 10)
+ self.pipeline.add(self.src)
+ self.srcpad = self.src.get_pad('src')
+
+ def tearDown(self):
+ gst.debug('setting pipeline to NULL')
+ self.pipeline.set_state(gst.STATE_NULL)
+ gst.debug('set pipeline to NULL')
+ # FIXME: wait for state change thread to die
+ while self.pipeline.__gstrefcount__ > 1:
+ gst.debug('waiting for self.pipeline G rc to drop to 1')
+ time.sleep(0.1)
+ self.assertEquals(self.pipeline.__gstrefcount__, 1)
+
+ def FIXMEtestProbe(self):
+ self.srcpad.add_event_probe(self._event_probe_cb)
+ self._buffer_probe_id = self.srcpad.add_buffer_probe(
+ self._buffer_probe_cb)
+
+ self._newsegment = None
+ self._eos = None
+ self._had_buffer = False
+
+ self.pipeline.set_state(gst.STATE_PLAYING)
+
+ while not self._eos:
+ time.sleep(0.1)
+
+ # verify if our newsegment event is still around and valid
+ self.failUnless(self._newsegment)
+ self.assertEquals(self._newsegment.type, gst.EVENT_NEWSEGMENT)
+ self.assertEquals(self._newsegment.__grefcount__, 1)
+
+ # verify if our eos event is still around and valid
+ self.failUnless(self._eos)
+ self.assertEquals(self._eos.type, gst.EVENT_EOS)
+ self.assertEquals(self._eos.__grefcount__, 1)
+
+ def _event_probe_cb(self, pad, event):
+ if event.type == gst.EVENT_NEWSEGMENT:
+ self._newsegment = event
+ self.assertEquals(event.__grefcount__, 3)
+ # drop the event, we're storing it for later sending
+ return False
+
+ if event.type == gst.EVENT_EOS:
+ self._eos = event
+ # we also want fakesink to get it
+ return True
+
+ self.fail("Got an unknown event %r" % event)
+
+ def _buffer_probe_cb(self, pad, buffer):
+ self.failUnless(self._newsegment)
+
+ # fake autoplugging by now putting in a fakesink
+ sink = gst.element_factory_make('fakesink')
+ self.pipeline.add(sink)
+ self.src.link(sink)
+ sink.set_state(gst.STATE_PLAYING)
+
+ pad = sink.get_pad('sink')
+ pad.send_event(self._newsegment)
+
+ # we don't want to be called again
+ self.srcpad.remove_buffer_probe(self._buffer_probe_id)
+
+ self._had_buffer = True
+ # now let the buffer through
+ return True
+
if __name__ == "__main__":
unittest.main()