summaryrefslogtreecommitdiff
path: root/testsuite
diff options
context:
space:
mode:
authorThomas Vander Stichele <thomas@apestaart.org>2006-02-01 11:52:04 +0000
committerThomas Vander Stichele <thomas@apestaart.org>2006-02-01 11:52:04 +0000
commitd763780e7fc81812f17d096fcad70b730550ee34 (patch)
tree8df48a29f7d9ed9d0d17dda7a10e1b4516e58e25 /testsuite
parent2f3f34b7af0105d274a536a68d73b669f634ceff (diff)
testsuite/test_event.py: add a test case for autoplugging behaviour: create a source, connect probes, store new-segme...
Original commit message from CVS: * testsuite/test_event.py: add a test case for autoplugging behaviour: create a source, connect probes, store new-segment event, add element in buffer probe callback, and forward event Currently fails due to refcounting on the stored new-segment event
Diffstat (limited to 'testsuite')
-rw-r--r--testsuite/test_event.py80
1 files changed, 80 insertions, 0 deletions
diff --git a/testsuite/test_event.py b/testsuite/test_event.py
index 3341857..ca898f4 100644
--- a/testsuite/test_event.py
+++ b/testsuite/test_event.py
@@ -142,5 +142,85 @@ class TestEmit(TestCase):
assert isinstance(event, gst.Event)
+class TestDelayedEventProbe(TestCase):
+ # this test:
+ # starts a pipeline with only a source
+ # adds an event probe to catch the (first) new-segment
+ # adds a buffer probe to "autoplug" and send out this event
+ def setUp(self):
+ TestCase.setUp(self)
+ self.pipeline = gst.Pipeline()
+ self.src = gst.element_factory_make('fakesrc')
+ self.src.set_property('num-buffers', 10)
+ self.pipeline.add(self.src)
+ self.srcpad = self.src.get_pad('src')
+
+ def tearDown(self):
+ gst.debug('setting pipeline to NULL')
+ self.pipeline.set_state(gst.STATE_NULL)
+ gst.debug('set pipeline to NULL')
+ # FIXME: wait for state change thread to die
+ while self.pipeline.__gstrefcount__ > 1:
+ gst.debug('waiting for self.pipeline G rc to drop to 1')
+ time.sleep(0.1)
+ self.assertEquals(self.pipeline.__gstrefcount__, 1)
+
+ def FIXMEtestProbe(self):
+ self.srcpad.add_event_probe(self._event_probe_cb)
+ self._buffer_probe_id = self.srcpad.add_buffer_probe(
+ self._buffer_probe_cb)
+
+ self._newsegment = None
+ self._eos = None
+ self._had_buffer = False
+
+ self.pipeline.set_state(gst.STATE_PLAYING)
+
+ while not self._eos:
+ time.sleep(0.1)
+
+ # verify if our newsegment event is still around and valid
+ self.failUnless(self._newsegment)
+ self.assertEquals(self._newsegment.type, gst.EVENT_NEWSEGMENT)
+ self.assertEquals(self._newsegment.__grefcount__, 1)
+
+ # verify if our eos event is still around and valid
+ self.failUnless(self._eos)
+ self.assertEquals(self._eos.type, gst.EVENT_EOS)
+ self.assertEquals(self._eos.__grefcount__, 1)
+
+ def _event_probe_cb(self, pad, event):
+ if event.type == gst.EVENT_NEWSEGMENT:
+ self._newsegment = event
+ self.assertEquals(event.__grefcount__, 3)
+ # drop the event, we're storing it for later sending
+ return False
+
+ if event.type == gst.EVENT_EOS:
+ self._eos = event
+ # we also want fakesink to get it
+ return True
+
+ self.fail("Got an unknown event %r" % event)
+
+ def _buffer_probe_cb(self, pad, buffer):
+ self.failUnless(self._newsegment)
+
+ # fake autoplugging by now putting in a fakesink
+ sink = gst.element_factory_make('fakesink')
+ self.pipeline.add(sink)
+ self.src.link(sink)
+ sink.set_state(gst.STATE_PLAYING)
+
+ pad = sink.get_pad('sink')
+ pad.send_event(self._newsegment)
+
+ # we don't want to be called again
+ self.srcpad.remove_buffer_probe(self._buffer_probe_id)
+
+ self._had_buffer = True
+ # now let the buffer through
+ return True
+
if __name__ == "__main__":
unittest.main()