From 49d351b537d991c226c1aff5fce84fa18043e1f8 Mon Sep 17 00:00:00 2001 From: Maya Venkatraman Date: Tue, 13 Oct 2020 13:50:47 -0700 Subject: [PATCH] obs-ffmpeg: Add HLS output Add and register an obs_output_info struct called ffmpeg_hls_muxer. It uses ffmpeg's HLS muxer to stream output. Add threading and buffer to reduce skipped frames. Also add frame drop logic when there are too many packets in the buffer for congestion control. --- plugins/obs-ffmpeg/CMakeLists.txt | 1 + plugins/obs-ffmpeg/ffmpeg-mux/ffmpeg-mux.c | 19 +- plugins/obs-ffmpeg/obs-ffmpeg-hls-mux.c | 332 +++++++++++++++++++++ plugins/obs-ffmpeg/obs-ffmpeg-mux.c | 23 ++ plugins/obs-ffmpeg/obs-ffmpeg-mux.h | 13 +- plugins/obs-ffmpeg/obs-ffmpeg.c | 2 + 6 files changed, 384 insertions(+), 6 deletions(-) create mode 100644 plugins/obs-ffmpeg/obs-ffmpeg-hls-mux.c diff --git a/plugins/obs-ffmpeg/CMakeLists.txt b/plugins/obs-ffmpeg/CMakeLists.txt index e0d5e8b76..9df656b2d 100644 --- a/plugins/obs-ffmpeg/CMakeLists.txt +++ b/plugins/obs-ffmpeg/CMakeLists.txt @@ -30,6 +30,7 @@ set(obs-ffmpeg_SOURCES obs-ffmpeg-nvenc.c obs-ffmpeg-output.c obs-ffmpeg-mux.c + obs-ffmpeg-hls-mux.c obs-ffmpeg-source.c) if(UNIX AND NOT APPLE) diff --git a/plugins/obs-ffmpeg/ffmpeg-mux/ffmpeg-mux.c b/plugins/obs-ffmpeg/ffmpeg-mux/ffmpeg-mux.c index 2b664543f..b0fcb16aa 100644 --- a/plugins/obs-ffmpeg/ffmpeg-mux/ffmpeg-mux.c +++ b/plugins/obs-ffmpeg/ffmpeg-mux/ffmpeg-mux.c @@ -623,29 +623,38 @@ static inline int open_output_file(struct ffmpeg_mux *ffm) #define SRT_PROTO "srt" #define UDP_PROTO "udp" #define TCP_PROTO "tcp" +#define HTTP_PROTO "http" static int ffmpeg_mux_init_context(struct ffmpeg_mux *ffm) { AVOutputFormat *output_format; int ret; bool is_network = false; + bool is_http = false; + is_http = (strncmp(ffm->params.file, HTTP_PROTO, + sizeof(HTTP_PROTO) - 1) == 0); + if (strncmp(ffm->params.file, SRT_PROTO, sizeof(SRT_PROTO) - 1) == 0 || strncmp(ffm->params.file, UDP_PROTO, sizeof(UDP_PROTO) - 1) == 0 || - strncmp(ffm->params.file, TCP_PROTO, sizeof(TCP_PROTO) - 1) == 0) + strncmp(ffm->params.file, TCP_PROTO, sizeof(TCP_PROTO) - 1) == 0 || + is_http) { is_network = true; - - if (is_network) { avformat_network_init(); + } + + if (is_network && !is_http) output_format = av_guess_format("mpegts", NULL, "video/M2PT"); - } else { + else output_format = av_guess_format(NULL, ffm->params.file, NULL); - } if (output_format == NULL) { fprintf(stderr, "Couldn't find an appropriate muxer for '%s'\n", ffm->params.printable_file.array); return FFM_ERROR; } + printf("info: Output format name and long_name: %s, %s\n", + output_format->name ? output_format->name : "unknown", + output_format->long_name ? output_format->long_name : "unknown"); ret = avformat_alloc_output_context2(&ffm->output, output_format, NULL, ffm->params.file); diff --git a/plugins/obs-ffmpeg/obs-ffmpeg-hls-mux.c b/plugins/obs-ffmpeg/obs-ffmpeg-hls-mux.c new file mode 100644 index 000000000..365699c72 --- /dev/null +++ b/plugins/obs-ffmpeg/obs-ffmpeg-hls-mux.c @@ -0,0 +1,332 @@ +#include "obs-ffmpeg-mux.h" + +#define do_log(level, format, ...) \ + blog(level, "[ffmpeg hls muxer: '%s'] " format, \ + obs_output_get_name(stream->output), ##__VA_ARGS__) + +#define warn(format, ...) do_log(LOG_WARNING, format, ##__VA_ARGS__) +#define info(format, ...) do_log(LOG_INFO, format, ##__VA_ARGS__) + +const char *ffmpeg_hls_mux_getname(void *type) +{ + UNUSED_PARAMETER(type); + return obs_module_text("FFmpegHlsMuxer"); +} + +int hls_stream_dropped_frames(void *data) +{ + struct ffmpeg_muxer *stream = data; + return stream->dropped_frames; +} + +void ffmpeg_hls_mux_destroy(void *data) +{ + struct ffmpeg_muxer *stream = data; + + if (stream) { + deactivate(stream, 0); + + pthread_mutex_destroy(&stream->write_mutex); + os_sem_destroy(stream->write_sem); + os_event_destroy(stream->stop_event); + + da_free(stream->mux_packets); + circlebuf_free(&stream->packets); + + os_process_pipe_destroy(stream->pipe); + dstr_free(&stream->path); + dstr_free(&stream->printable_path); + dstr_free(&stream->stream_key); + dstr_free(&stream->muxer_settings); + bfree(data); + } +} + +void *ffmpeg_hls_mux_create(obs_data_t *settings, obs_output_t *output) +{ + struct ffmpeg_muxer *stream = bzalloc(sizeof(*stream)); + pthread_mutex_init_value(&stream->write_mutex); + stream->output = output; + + /* init mutex, semaphore and event */ + if (pthread_mutex_init(&stream->write_mutex, NULL) != 0) + goto fail; + if (os_event_init(&stream->stop_event, OS_EVENT_TYPE_AUTO) != 0) + goto fail; + if (os_sem_init(&stream->write_sem, 0) != 0) + goto fail; + + UNUSED_PARAMETER(settings); + return stream; + +fail: + ffmpeg_hls_mux_destroy(stream); + return NULL; +} + +static bool process_packet(struct ffmpeg_muxer *stream) +{ + struct encoder_packet packet; + bool has_packet = false; + bool ret = true; + + pthread_mutex_lock(&stream->write_mutex); + + if (stream->packets.size) { + circlebuf_pop_front(&stream->packets, &packet, sizeof(packet)); + has_packet = true; + } + + pthread_mutex_unlock(&stream->write_mutex); + + if (has_packet) { + ret = write_packet(stream, &packet); + obs_encoder_packet_release(&packet); + } + return ret; +} + +static void *write_thread(void *data) +{ + struct ffmpeg_muxer *stream = data; + + while (os_sem_wait(stream->write_sem) == 0) { + if (os_event_try(stream->stop_event) == 0) + return NULL; + + if (!process_packet(stream)) + break; + } + + obs_output_signal_stop(stream->output, OBS_OUTPUT_ERROR); + deactivate(stream, 0); + return NULL; +} + +bool ffmpeg_hls_mux_start(void *data) +{ + struct ffmpeg_muxer *stream = data; + obs_service_t *service; + const char *path_str; + const char *stream_key; + struct dstr path = {0}; + obs_encoder_t *vencoder; + obs_data_t *settings; + int keyint_sec; + + if (!obs_output_can_begin_data_capture(stream->output, 0)) + return false; + if (!obs_output_initialize_encoders(stream->output, 0)) + return false; + + service = obs_output_get_service(stream->output); + if (!service) + return false; + path_str = obs_service_get_url(service); + stream_key = obs_service_get_key(service); + dstr_copy(&stream->stream_key, stream_key); + dstr_copy(&path, path_str); + dstr_replace(&path, "{stream_key}", stream_key); + dstr_copy(&stream->muxer_settings, + "method=PUT http_persistent=1 ignore_io_errors=1 "); + dstr_catf(&stream->muxer_settings, "http_user_agent=libobs/%s", + OBS_VERSION); + + vencoder = obs_output_get_video_encoder(stream->output); + settings = obs_encoder_get_settings(vencoder); + keyint_sec = obs_data_get_int(settings, "keyint_sec"); + if (keyint_sec) { + dstr_catf(&stream->muxer_settings, " hls_time=%d", keyint_sec); + stream->keyint_sec = keyint_sec; + } + + obs_data_release(settings); + + start_pipe(stream, path.array); + dstr_free(&path); + + if (!stream->pipe) { + obs_output_set_last_error( + stream->output, obs_module_text("HelperProcessFailed")); + warn("Failed to create process pipe"); + return false; + } + stream->mux_thread_joinable = pthread_create(&stream->mux_thread, NULL, + write_thread, stream) == 0; + if (!stream->mux_thread_joinable) + return false; + + /* write headers and start capture */ + os_atomic_set_bool(&stream->active, true); + os_atomic_set_bool(&stream->capturing, true); + stream->is_hls = true; + stream->total_bytes = 0; + stream->dropped_frames = 0; + stream->min_priority = 0; + + obs_output_begin_data_capture(stream->output, 0); + + dstr_copy(&stream->printable_path, path_str); + info("Writing to path '%s'...", stream->printable_path.array); + return true; +} + +static bool write_packet_to_buf(struct ffmpeg_muxer *stream, + struct encoder_packet *packet) +{ + circlebuf_push_back(&stream->packets, packet, + sizeof(struct encoder_packet)); + return true; +} + +static void drop_frames(struct ffmpeg_muxer *stream, int highest_priority) +{ + struct circlebuf new_buf = {0}; + int num_frames_dropped = 0; + + circlebuf_reserve(&new_buf, sizeof(struct encoder_packet) * 8); + + while (stream->packets.size) { + struct encoder_packet packet; + circlebuf_pop_front(&stream->packets, &packet, sizeof(packet)); + + /* do not drop audio data or video keyframes */ + if (packet.type == OBS_ENCODER_AUDIO || + packet.drop_priority >= highest_priority) { + circlebuf_push_back(&new_buf, &packet, sizeof(packet)); + } else { + num_frames_dropped++; + obs_encoder_packet_release(&packet); + } + } + + circlebuf_free(&stream->packets); + stream->packets = new_buf; + + if (stream->min_priority < highest_priority) + stream->min_priority = highest_priority; + + stream->dropped_frames += num_frames_dropped; +} + +static bool find_first_video_packet(struct ffmpeg_muxer *stream, + struct encoder_packet *first) +{ + size_t count = stream->packets.size / sizeof(*first); + + for (size_t i = 0; i < count; i++) { + struct encoder_packet *cur = + circlebuf_data(&stream->packets, i * sizeof(*first)); + if (cur->type == OBS_ENCODER_VIDEO && !cur->keyframe) { + *first = *cur; + return true; + } + } + return false; +} + +void check_to_drop_frames(struct ffmpeg_muxer *stream, bool pframes) +{ + struct encoder_packet first; + int64_t buffer_duration_usec; + int priority = pframes ? OBS_NAL_PRIORITY_HIGHEST + : OBS_NAL_PRIORITY_HIGH; + int keyint_sec = stream->keyint_sec; + int64_t drop_threshold_sec = keyint_sec ? 2 * keyint_sec : 10; + + if (!find_first_video_packet(stream, &first)) + return; + + buffer_duration_usec = stream->last_dts_usec - first.dts_usec; + + if (buffer_duration_usec > drop_threshold_sec * 1000000) + drop_frames(stream, priority); +} + +static bool add_video_packet(struct ffmpeg_muxer *stream, + struct encoder_packet *packet) +{ + check_to_drop_frames(stream, false); + check_to_drop_frames(stream, true); + + /* if currently dropping frames, drop packets until it reaches the + * desired priority */ + if (packet->drop_priority < stream->min_priority) { + stream->dropped_frames++; + return false; + } else { + stream->min_priority = 0; + } + + stream->last_dts_usec = packet->dts_usec; + return write_packet_to_buf(stream, packet); +} + +void ffmpeg_hls_mux_data(void *data, struct encoder_packet *packet) +{ + struct ffmpeg_muxer *stream = data; + struct encoder_packet new_packet; + struct encoder_packet tmp_packet; + bool added_packet = false; + + if (!active(stream)) + return; + + /* encoder failure */ + if (!packet) { + deactivate(stream, OBS_OUTPUT_ENCODE_ERROR); + return; + } + + if (!stream->sent_headers) { + if (!send_headers(stream)) + return; + stream->sent_headers = true; + } + + if (stopping(stream)) { + if (packet->sys_dts_usec >= stream->stop_ts) { + deactivate(stream, 0); + return; + } + } + + if (packet->type == OBS_ENCODER_VIDEO) { + obs_parse_avc_packet(&tmp_packet, packet); + packet->drop_priority = tmp_packet.priority; + obs_encoder_packet_release(&tmp_packet); + } + obs_encoder_packet_ref(&new_packet, packet); + + pthread_mutex_lock(&stream->write_mutex); + + if (active(stream)) { + added_packet = + (packet->type == OBS_ENCODER_VIDEO) + ? add_video_packet(stream, &new_packet) + : write_packet_to_buf(stream, &new_packet); + } + + pthread_mutex_unlock(&stream->write_mutex); + + if (added_packet) + os_sem_post(stream->write_sem); + else + obs_encoder_packet_release(&new_packet); +} + +struct obs_output_info ffmpeg_hls_muxer = { + .id = "ffmpeg_hls_muxer", + .flags = OBS_OUTPUT_AV | OBS_OUTPUT_ENCODED | OBS_OUTPUT_MULTI_TRACK | + OBS_OUTPUT_SERVICE, + .encoded_video_codecs = "h264", + .encoded_audio_codecs = "aac", + .get_name = ffmpeg_hls_mux_getname, + .create = ffmpeg_hls_mux_create, + .destroy = ffmpeg_hls_mux_destroy, + .start = ffmpeg_hls_mux_start, + .stop = ffmpeg_mux_stop, + .encoded_packet = ffmpeg_hls_mux_data, + .get_total_bytes = ffmpeg_mux_total_bytes, + .get_dropped_frames = hls_stream_dropped_frames, +}; diff --git a/plugins/obs-ffmpeg/obs-ffmpeg-mux.c b/plugins/obs-ffmpeg/obs-ffmpeg-mux.c index ee277a42b..83f04b64a 100644 --- a/plugins/obs-ffmpeg/obs-ffmpeg-mux.c +++ b/plugins/obs-ffmpeg/obs-ffmpeg-mux.c @@ -67,6 +67,7 @@ static void ffmpeg_mux_destroy(void *data) if (stream->mux_thread_joinable) pthread_join(stream->mux_thread, NULL); da_free(stream->mux_packets); + circlebuf_free(&stream->packets); os_process_pipe_destroy(stream->pipe); dstr_free(&stream->path); @@ -364,6 +365,15 @@ int deactivate(struct ffmpeg_muxer *stream, int code) { int ret = -1; + if (stream->is_hls) { + if (stream->mux_thread_joinable) { + os_event_signal(stream->stop_event); + os_sem_post(stream->write_sem); + pthread_join(stream->mux_thread, NULL); + stream->mux_thread_joinable = false; + } + } + if (active(stream)) { ret = os_process_pipe_destroy(stream->pipe); stream->pipe = NULL; @@ -383,6 +393,19 @@ int deactivate(struct ffmpeg_muxer *stream, int code) obs_output_end_data_capture(stream->output); } + if (stream->is_hls) { + pthread_mutex_lock(&stream->write_mutex); + + while (stream->packets.size) { + struct encoder_packet packet; + circlebuf_pop_front(&stream->packets, &packet, + sizeof(packet)); + obs_encoder_packet_release(&packet); + } + + pthread_mutex_unlock(&stream->write_mutex); + } + os_atomic_set_bool(&stream->stopping, false); return ret; } diff --git a/plugins/obs-ffmpeg/obs-ffmpeg-mux.h b/plugins/obs-ffmpeg/obs-ffmpeg-mux.h index ebe06ca29..eb9dbe1df 100644 --- a/plugins/obs-ffmpeg/obs-ffmpeg-mux.h +++ b/plugins/obs-ffmpeg/obs-ffmpeg-mux.h @@ -33,12 +33,23 @@ struct ffmpeg_muxer { int keyframes; obs_hotkey_id hotkey; volatile bool muxing; - DARRAY(struct encoder_packet) mux_packets; + + /* these are accessed both by replay buffer and by HLS */ pthread_t mux_thread; bool mux_thread_joinable; struct circlebuf packets; + /* HLS only */ + int keyint_sec; + pthread_mutex_t write_mutex; + os_sem_t *write_sem; + os_event_t *stop_event; + bool is_hls; + int dropped_frames; + int min_priority; + int64_t last_dts_usec; + bool is_network; }; diff --git a/plugins/obs-ffmpeg/obs-ffmpeg.c b/plugins/obs-ffmpeg/obs-ffmpeg.c index 38a7c8573..5a9a3e06b 100644 --- a/plugins/obs-ffmpeg/obs-ffmpeg.c +++ b/plugins/obs-ffmpeg/obs-ffmpeg.c @@ -24,6 +24,7 @@ extern struct obs_output_info ffmpeg_output; extern struct obs_output_info ffmpeg_muxer; extern struct obs_output_info ffmpeg_mpegts_muxer; extern struct obs_output_info replay_buffer; +extern struct obs_output_info ffmpeg_hls_muxer; extern struct obs_encoder_info aac_encoder_info; extern struct obs_encoder_info opus_encoder_info; extern struct obs_encoder_info nvenc_encoder_info; @@ -231,6 +232,7 @@ bool obs_module_load(void) obs_register_output(&ffmpeg_output); obs_register_output(&ffmpeg_muxer); obs_register_output(&ffmpeg_mpegts_muxer); + obs_register_output(&ffmpeg_hls_muxer); obs_register_output(&replay_buffer); obs_register_encoder(&aac_encoder_info); obs_register_encoder(&opus_encoder_info); -- GitLab