From d763780e7fc81812f17d096fcad70b730550ee34 Mon Sep 17 00:00:00 2001 From: Thomas Vander Stichele Date: Wed, 1 Feb 2006 11:52:04 +0000 Subject: testsuite/test_event.py: add a test case for autoplugging behaviour: create a source, connect probes, store new-segme... Original commit message from CVS: * testsuite/test_event.py: add a test case for autoplugging behaviour: create a source, connect probes, store new-segment event, add element in buffer probe callback, and forward event Currently fails due to refcounting on the stored new-segment event --- testsuite/test_event.py | 80 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 80 insertions(+) (limited to 'testsuite') diff --git a/testsuite/test_event.py b/testsuite/test_event.py index 3341857..ca898f4 100644 --- a/testsuite/test_event.py +++ b/testsuite/test_event.py @@ -142,5 +142,85 @@ class TestEmit(TestCase): assert isinstance(event, gst.Event) +class TestDelayedEventProbe(TestCase): + # this test: + # starts a pipeline with only a source + # adds an event probe to catch the (first) new-segment + # adds a buffer probe to "autoplug" and send out this event + def setUp(self): + TestCase.setUp(self) + self.pipeline = gst.Pipeline() + self.src = gst.element_factory_make('fakesrc') + self.src.set_property('num-buffers', 10) + self.pipeline.add(self.src) + self.srcpad = self.src.get_pad('src') + + def tearDown(self): + gst.debug('setting pipeline to NULL') + self.pipeline.set_state(gst.STATE_NULL) + gst.debug('set pipeline to NULL') + # FIXME: wait for state change thread to die + while self.pipeline.__gstrefcount__ > 1: + gst.debug('waiting for self.pipeline G rc to drop to 1') + time.sleep(0.1) + self.assertEquals(self.pipeline.__gstrefcount__, 1) + + def FIXMEtestProbe(self): + self.srcpad.add_event_probe(self._event_probe_cb) + self._buffer_probe_id = self.srcpad.add_buffer_probe( + self._buffer_probe_cb) + + self._newsegment = None + self._eos = None + self._had_buffer = False + + self.pipeline.set_state(gst.STATE_PLAYING) + + while not self._eos: + time.sleep(0.1) + + # verify if our newsegment event is still around and valid + self.failUnless(self._newsegment) + self.assertEquals(self._newsegment.type, gst.EVENT_NEWSEGMENT) + self.assertEquals(self._newsegment.__grefcount__, 1) + + # verify if our eos event is still around and valid + self.failUnless(self._eos) + self.assertEquals(self._eos.type, gst.EVENT_EOS) + self.assertEquals(self._eos.__grefcount__, 1) + + def _event_probe_cb(self, pad, event): + if event.type == gst.EVENT_NEWSEGMENT: + self._newsegment = event + self.assertEquals(event.__grefcount__, 3) + # drop the event, we're storing it for later sending + return False + + if event.type == gst.EVENT_EOS: + self._eos = event + # we also want fakesink to get it + return True + + self.fail("Got an unknown event %r" % event) + + def _buffer_probe_cb(self, pad, buffer): + self.failUnless(self._newsegment) + + # fake autoplugging by now putting in a fakesink + sink = gst.element_factory_make('fakesink') + self.pipeline.add(sink) + self.src.link(sink) + sink.set_state(gst.STATE_PLAYING) + + pad = sink.get_pad('sink') + pad.send_event(self._newsegment) + + # we don't want to be called again + self.srcpad.remove_buffer_probe(self._buffer_probe_id) + + self._had_buffer = True + # now let the buffer through + return True + if __name__ == "__main__": unittest.main() -- cgit v1.2.3