diff options
Diffstat (limited to 'src/modules/rtp')
-rw-r--r-- | src/modules/rtp/module-rtp-recv.c | 107 | ||||
-rw-r--r-- | src/modules/rtp/module-rtp-send.c | 20 | ||||
-rw-r--r-- | src/modules/rtp/rtp.c | 21 | ||||
-rw-r--r-- | src/modules/rtp/rtp.h | 2 | ||||
-rw-r--r-- | src/modules/rtp/rtsp_client.c | 7 |
5 files changed, 104 insertions, 53 deletions
diff --git a/src/modules/rtp/module-rtp-recv.c b/src/modules/rtp/module-rtp-recv.c index 33e23af2..5caf8272 100644 --- a/src/modules/rtp/module-rtp-recv.c +++ b/src/modules/rtp/module-rtp-recv.c @@ -33,6 +33,7 @@ #include <unistd.h> #include <poll.h> +#include <pulse/rtclock.h> #include <pulse/timeval.h> #include <pulse/xmalloc.h> @@ -43,15 +44,17 @@ #include <pulsecore/sink-input.h> #include <pulsecore/memblockq.h> #include <pulsecore/log.h> +#include <pulsecore/core-rtclock.h> #include <pulsecore/core-util.h> #include <pulsecore/modargs.h> #include <pulsecore/namereg.h> #include <pulsecore/sample-util.h> #include <pulsecore/macro.h> #include <pulsecore/atomic.h> -#include <pulsecore/rtclock.h> #include <pulsecore/atomic.h> #include <pulsecore/time-smoother.h> +#include <pulsecore/socket-util.h> +#include <pulsecore/once.h> #include "module-rtp-recv-symdef.h" @@ -60,7 +63,7 @@ #include "sap.h" PA_MODULE_AUTHOR("Lennart Poettering"); -PA_MODULE_DESCRIPTION("Recieve data from a network via RTP/SAP/SDP"); +PA_MODULE_DESCRIPTION("Receive data from a network via RTP/SAP/SDP"); PA_MODULE_VERSION(PACKAGE_VERSION); PA_MODULE_LOAD_ONCE(FALSE); PA_MODULE_USAGE( @@ -110,6 +113,7 @@ struct session { struct userdata { pa_module *module; + pa_core *core; pa_sap_context sap_context; pa_io_event* sap_event; @@ -165,7 +169,7 @@ static void sink_input_process_rewind_cb(pa_sink_input *i, size_t nbytes) { pa_memblockq_rewind(s->memblockq, nbytes); } -/* Called from thread context */ +/* Called from I/O thread context */ static void sink_input_update_max_rewind_cb(pa_sink_input *i, size_t nbytes) { struct session *s; @@ -184,11 +188,24 @@ static void sink_input_kill(pa_sink_input* i) { session_free(s); } +/* Called from IO context */ +static void sink_input_suspend_within_thread(pa_sink_input* i, pa_bool_t b) { + struct session *s; + pa_sink_input_assert_ref(i); + pa_assert_se(s = i->userdata); + + if (b) { + pa_smoother_pause(s->smoother, pa_rtclock_now()); + pa_memblockq_flush_read(s->memblockq); + } else + s->first_packet = FALSE; +} + /* Called from I/O thread context */ static int rtpoll_work_cb(pa_rtpoll_item *i) { pa_memchunk chunk; int64_t k, j, delta; - struct timeval now; + struct timeval now = { 0, 0 }; struct session *s; struct pollfd *p; @@ -206,10 +223,11 @@ static int rtpoll_work_cb(pa_rtpoll_item *i) { p->revents = 0; - if (pa_rtp_recv(&s->rtp_context, &chunk, s->userdata->module->core->mempool) < 0) + if (pa_rtp_recv(&s->rtp_context, &chunk, s->userdata->module->core->mempool, &now) < 0) return 0; - if (s->sdp_info.payload != s->rtp_context.payload) { + if (s->sdp_info.payload != s->rtp_context.payload || + !PA_SINK_IS_OPENED(s->sink_input->sink->thread_info.state)) { pa_memblock_unref(chunk.memblock); return 0; } @@ -229,7 +247,7 @@ static int rtpoll_work_cb(pa_rtpoll_item *i) { } } - /* Check wheter there was a timestamp overflow */ + /* Check whether there was a timestamp overflow */ k = (int64_t) s->rtp_context.timestamp - (int64_t) s->offset; j = (int64_t) 0x100000000LL - (int64_t) s->offset + (int64_t) s->rtp_context.timestamp; @@ -238,15 +256,24 @@ static int rtpoll_work_cb(pa_rtpoll_item *i) { else delta = j; - pa_memblockq_seek(s->memblockq, delta * (int64_t) s->rtp_context.frame_size, PA_SEEK_RELATIVE); + pa_memblockq_seek(s->memblockq, delta * (int64_t) s->rtp_context.frame_size, PA_SEEK_RELATIVE, TRUE); - pa_rtclock_get(&now); + if (now.tv_sec == 0) { + PA_ONCE_BEGIN { + pa_log_warn("Using artificial time instead of timestamp"); + } PA_ONCE_END; + pa_rtclock_get(&now); + } else + pa_rtclock_from_wallclock(&now); pa_smoother_put(s->smoother, pa_timeval_load(&now), pa_bytes_to_usec((uint64_t) pa_memblockq_get_write_index(s->memblockq), &s->sink_input->sample_spec)); + /* Tell the smoother that we are rolling now, in case it is still paused */ + pa_smoother_resume(s->smoother, pa_timeval_load(&now), TRUE); + if (pa_memblockq_push(s->memblockq, &chunk) < 0) { pa_log_warn("Queue overrun"); - pa_memblockq_seek(s->memblockq, (int64_t) chunk.length, PA_SEEK_RELATIVE); + pa_memblockq_seek(s->memblockq, (int64_t) chunk.length, PA_SEEK_RELATIVE, TRUE); } /* pa_log("blocks in q: %u", pa_memblockq_get_nblocks(s->memblockq)); */ @@ -262,14 +289,14 @@ static int rtpoll_work_cb(pa_rtpoll_item *i) { pa_usec_t wi, ri, render_delay, sink_delay = 0, latency, fix; unsigned fix_samples; - pa_log("Updating sample rate"); + pa_log_debug("Updating sample rate"); wi = pa_smoother_get(s->smoother, pa_timeval_load(&now)); ri = pa_bytes_to_usec((uint64_t) pa_memblockq_get_read_index(s->memblockq), &s->sink_input->sample_spec); - if (PA_MSGOBJECT(s->sink_input->sink)->process_msg(PA_MSGOBJECT(s->sink_input->sink), PA_SINK_MESSAGE_GET_LATENCY, &sink_delay, 0, NULL) < 0) - sink_delay = 0; + pa_log_debug("wi=%lu ri=%lu", (unsigned long) wi, (unsigned long) ri); + sink_delay = pa_sink_get_latency_within_thread(s->sink_input->sink); render_delay = pa_bytes_to_usec(pa_memblockq_get_length(s->sink_input->thread_info.render_memblockq), &s->sink_input->sink->sample_spec); if (ri > render_delay+sink_delay) @@ -294,14 +321,20 @@ static int rtpoll_work_cb(pa_rtpoll_item *i) { fix_samples = (unsigned) (fix * (pa_usec_t) s->sink_input->thread_info.sample_spec.rate / (pa_usec_t) RATE_UPDATE_INTERVAL); /* Check if deviation is in bounds */ - if (fix_samples > s->sink_input->sample_spec.rate*.20) + if (fix_samples > s->sink_input->sample_spec.rate*.50) pa_log_debug("Hmmm, rate fix is too large (%lu Hz), not applying.", (unsigned long) fix_samples); + else { + /* Fix up rate */ + if (latency < s->intended_latency) + s->sink_input->sample_spec.rate -= fix_samples; + else + s->sink_input->sample_spec.rate += fix_samples; + + if (s->sink_input->sample_spec.rate > PA_RATE_MAX) + s->sink_input->sample_spec.rate = PA_RATE_MAX; + } - /* Fix up rate */ - if (latency < s->intended_latency) - s->sink_input->sample_spec.rate -= fix_samples; - else - s->sink_input->sample_spec.rate += fix_samples; + pa_assert(pa_sample_spec_valid(&s->sink_input->sample_spec)); pa_resampler_set_input_rate(s->sink_input->thread_info.resampler, s->sink_input->sample_spec.rate); @@ -362,6 +395,14 @@ static int mcast_socket(const struct sockaddr* sa, socklen_t salen) { goto fail; } + pa_make_udp_socket_low_delay(fd); + + one = 1; + if (setsockopt(fd, SOL_SOCKET, SO_TIMESTAMP, &one, sizeof(one)) < 0) { + pa_log("SO_TIMESTAMP failed: %s", pa_cstrerror(errno)); + goto fail; + } + one = 1; if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)) < 0) { pa_log("SO_REUSEADDR failed: %s", pa_cstrerror(errno)); @@ -430,8 +471,14 @@ static struct session *session_new(struct userdata *u, const pa_sdp_info *sdp_in s->sdp_info = *sdp_info; s->rtpoll_item = NULL; s->intended_latency = LATENCY_USEC; - s->smoother = pa_smoother_new(PA_USEC_PER_SEC*5, PA_USEC_PER_SEC*2, TRUE, 10); - pa_smoother_set_time_offset(s->smoother, pa_timeval_load(&now)); + s->smoother = pa_smoother_new( + PA_USEC_PER_SEC*5, + PA_USEC_PER_SEC*2, + TRUE, + TRUE, + 10, + pa_timeval_load(&now), + TRUE); s->last_rate_update = pa_timeval_load(&now); pa_atomic_store(&s->timestamp, (int) now.tv_sec); @@ -472,6 +519,7 @@ static struct session *session_new(struct userdata *u, const pa_sdp_info *sdp_in s->sink_input->kill = sink_input_kill; s->sink_input->attach = sink_input_attach; s->sink_input->detach = sink_input_detach; + s->sink_input->suspend_within_thread = sink_input_suspend_within_thread; pa_sink_input_get_silence(s->sink_input, &silence); @@ -575,15 +623,13 @@ static void sap_event_cb(pa_mainloop_api *m, pa_io_event *e, int fd, pa_io_event } } -static void check_death_event_cb(pa_mainloop_api *m, pa_time_event *t, const struct timeval *ptv, void *userdata) { +static void check_death_event_cb(pa_mainloop_api *m, pa_time_event *t, const struct timeval *tv, void *userdata) { struct session *s, *n; struct userdata *u = userdata; struct timeval now; - struct timeval tv; pa_assert(m); pa_assert(t); - pa_assert(ptv); pa_assert(u); pa_rtclock_get(&now); @@ -601,9 +647,7 @@ static void check_death_event_cb(pa_mainloop_api *m, pa_time_event *t, const str } /* Restart timer */ - pa_gettimeofday(&tv); - pa_timeval_add(&tv, DEATH_TIMEOUT*PA_USEC_PER_SEC); - m->time_restart(t, &tv); + pa_core_rttime_restart(u->module->core, t, pa_rtclock_now() + DEATH_TIMEOUT * PA_USEC_PER_SEC); } int pa__init(pa_module*m) { @@ -617,7 +661,6 @@ int pa__init(pa_module*m) { socklen_t salen; const char *sap_address; int fd = -1; - struct timeval tv; pa_assert(m); @@ -648,9 +691,9 @@ int pa__init(pa_module*m) { if ((fd = mcast_socket(sa, salen)) < 0) goto fail; - u = pa_xnew(struct userdata, 1); - m->userdata = u; + m->userdata = u = pa_xnew(struct userdata, 1); u->module = m; + u->core = m->core; u->sink_name = pa_xstrdup(pa_modargs_get_value(ma, "sink", NULL)); u->sap_event = m->core->mainloop->io_new(m->core->mainloop, fd, PA_IO_EVENT_INPUT, sap_event_cb, u); @@ -660,9 +703,7 @@ int pa__init(pa_module*m) { u->n_sessions = 0; u->by_origin = pa_hashmap_new(pa_idxset_string_hash_func, pa_idxset_string_compare_func); - pa_gettimeofday(&tv); - pa_timeval_add(&tv, DEATH_TIMEOUT * PA_USEC_PER_SEC); - u->check_death_event = m->core->mainloop->time_new(m->core->mainloop, &tv, check_death_event_cb, u); + u->check_death_event = pa_core_rttime_new(m->core, pa_rtclock_now() + DEATH_TIMEOUT * PA_USEC_PER_SEC, check_death_event_cb, u); pa_modargs_free(ma); diff --git a/src/modules/rtp/module-rtp-send.c b/src/modules/rtp/module-rtp-send.c index 722d12bd..f147364d 100644 --- a/src/modules/rtp/module-rtp-send.c +++ b/src/modules/rtp/module-rtp-send.c @@ -31,6 +31,7 @@ #include <string.h> #include <unistd.h> +#include <pulse/rtclock.h> #include <pulse/timeval.h> #include <pulse/util.h> #include <pulse/xmalloc.h> @@ -77,7 +78,7 @@ PA_MODULE_USAGE( #define DEFAULT_DESTINATION "224.0.0.56" #define MEMBLOCKQ_MAXLENGTH (1024*170) #define DEFAULT_MTU 1280 -#define SAP_INTERVAL 5 +#define SAP_INTERVAL (5*PA_USEC_PER_SEC) static const char* const valid_modargs[] = { "source", @@ -151,18 +152,14 @@ static void source_output_kill(pa_source_output* o) { static void sap_event_cb(pa_mainloop_api *m, pa_time_event *t, const struct timeval *tv, void *userdata) { struct userdata *u = userdata; - struct timeval next; pa_assert(m); pa_assert(t); - pa_assert(tv); pa_assert(u); pa_sap_send(&u->sap_context, 0); - pa_gettimeofday(&next); - pa_timeval_add(&next, SAP_INTERVAL * PA_USEC_PER_SEC); - m->time_restart(t, &next); + pa_core_rttime_restart(u->module->core, t, pa_rtclock_now() + SAP_INTERVAL); } int pa__init(pa_module*m) { @@ -186,7 +183,6 @@ int pa__init(pa_module*m) { char *p; int r, j; socklen_t k; - struct timeval tv; char hn[128], *n; pa_bool_t loop = FALSE; pa_source_output_new_data data; @@ -347,10 +343,10 @@ int pa__init(pa_module*m) { o->push = source_output_push; o->kill = source_output_kill; - u = pa_xnew(struct userdata, 1); - m->userdata = u; - o->userdata = u; + pa_log_info("Configured source latency of %llu ms.", + (unsigned long long) pa_source_output_set_requested_latency(o, pa_bytes_to_usec(mtu, &o->sample_spec)) / PA_USEC_PER_MSEC); + m->userdata = o->userdata = u = pa_xnew(struct userdata, 1); u->module = m; u->source_output = o; @@ -395,9 +391,7 @@ int pa__init(pa_module*m) { pa_sap_send(&u->sap_context, 0); - pa_gettimeofday(&tv); - pa_timeval_add(&tv, SAP_INTERVAL * PA_USEC_PER_SEC); - u->sap_event = m->core->mainloop->time_new(m->core->mainloop, &tv, sap_event_cb, u); + u->sap_event = pa_core_rttime_new(m->core, pa_rtclock_now() + SAP_INTERVAL, sap_event_cb, u); pa_source_output_put(u->source_output); diff --git a/src/modules/rtp/rtp.c b/src/modules/rtp/rtp.c index 7537c1f5..6706a10f 100644 --- a/src/modules/rtp/rtp.c +++ b/src/modules/rtp/rtp.c @@ -162,13 +162,16 @@ pa_rtp_context* pa_rtp_context_init_recv(pa_rtp_context *c, int fd, size_t frame return c; } -int pa_rtp_recv(pa_rtp_context *c, pa_memchunk *chunk, pa_mempool *pool) { +int pa_rtp_recv(pa_rtp_context *c, pa_memchunk *chunk, pa_mempool *pool, struct timeval *tstamp) { int size; struct msghdr m; + struct cmsghdr *cm; struct iovec iov; uint32_t header; unsigned cc; ssize_t r; + uint8_t aux[1024]; + pa_bool_t found_tstamp = FALSE; pa_assert(c); pa_assert(chunk); @@ -208,8 +211,8 @@ int pa_rtp_recv(pa_rtp_context *c, pa_memchunk *chunk, pa_mempool *pool) { m.msg_namelen = 0; m.msg_iov = &iov; m.msg_iovlen = 1; - m.msg_control = NULL; - m.msg_controllen = 0; + m.msg_control = aux; + m.msg_controllen = sizeof(aux); m.msg_flags = 0; r = recvmsg(c->fd, &m, 0); @@ -275,6 +278,18 @@ int pa_rtp_recv(pa_rtp_context *c, pa_memchunk *chunk, pa_mempool *pool) { pa_memchunk_reset(&c->memchunk); } + for (cm = CMSG_FIRSTHDR(&m); cm; cm = CMSG_NXTHDR(&m, cm)) { + if (cm->cmsg_level == SOL_SOCKET && cm->cmsg_type == SO_TIMESTAMP) + memcpy(tstamp, CMSG_DATA(cm), sizeof(struct timeval)); + found_tstamp = TRUE; + break; + } + + if (!found_tstamp) { + pa_log_warn("Couldn't find SO_TIMESTAMP data in auxiliary recvmsg() data!"); + memset(tstamp, 0, sizeof(tstamp)); + } + return 0; fail: diff --git a/src/modules/rtp/rtp.h b/src/modules/rtp/rtp.h index eff5e75b..b197e82f 100644 --- a/src/modules/rtp/rtp.h +++ b/src/modules/rtp/rtp.h @@ -43,7 +43,7 @@ pa_rtp_context* pa_rtp_context_init_send(pa_rtp_context *c, int fd, uint32_t ssr int pa_rtp_send(pa_rtp_context *c, size_t size, pa_memblockq *q); pa_rtp_context* pa_rtp_context_init_recv(pa_rtp_context *c, int fd, size_t frame_size); -int pa_rtp_recv(pa_rtp_context *c, pa_memchunk *chunk, pa_mempool *pool); +int pa_rtp_recv(pa_rtp_context *c, pa_memchunk *chunk, pa_mempool *pool, struct timeval *tstamp); void pa_rtp_context_destroy(pa_rtp_context *c); diff --git a/src/modules/rtp/rtsp_client.c b/src/modules/rtp/rtsp_client.c index 98db05dd..72d304e8 100644 --- a/src/modules/rtp/rtsp_client.c +++ b/src/modules/rtp/rtsp_client.c @@ -30,6 +30,7 @@ #include <arpa/inet.h> #include <unistd.h> #include <sys/ioctl.h> +#include <netinet/in.h> #ifdef HAVE_SYS_FILIO_H #include <sys/filio.h> @@ -211,7 +212,7 @@ static void line_callback(pa_ioline *line, const char *s, void *userdata) { } if (!strlen(s2)) { /* End of headers */ - /* We will have a header left from our looping itteration, so add it in :) */ + /* We will have a header left from our looping iteration, so add it in :) */ if (c->last_header) { /* This is not a continuation header so let's dump it into our proplist */ pa_headerlist_puts(c->response_headers, c->last_header, pa_strbuf_tostring_free(c->header_buffer)); @@ -332,7 +333,7 @@ int pa_rtsp_connect(pa_rtsp_client *c) { pa_xfree(c->session); c->session = NULL; - if (!(c->sc = pa_socket_client_new_string(c->mainloop, c->hostname, c->port))) { + if (!(c->sc = pa_socket_client_new_string(c->mainloop, TRUE, c->hostname, c->port))) { pa_log("failed to connect to server '%s:%d'", c->hostname, c->port); return -1; } @@ -488,7 +489,7 @@ int pa_rtsp_record(pa_rtsp_client* c, uint16_t* seq, uint32_t* rtptime) { pa_assert(c); if (!c->session) { - /* No seesion in progres */ + /* No session in progress */ return -1; } |