diff --git a/app/meson.build b/app/meson.build index 02d24a34888def95ca1419d1cbba81ef61ad1ebe..dbde9d195b65bcced7545c24b06e503f2ae3d68d 100644 --- a/app/meson.build +++ b/app/meson.build @@ -1,5 +1,6 @@ src = [ 'src/main.c', + 'src/buffered_reader.c', 'src/command.c', 'src/control_msg.c', 'src/controller.c', diff --git a/app/src/buffered_reader.c b/app/src/buffered_reader.c new file mode 100644 index 0000000000000000000000000000000000000000..8aad8e60f5a6768481f6f5cfcf2737fc44bd830a --- /dev/null +++ b/app/src/buffered_reader.c @@ -0,0 +1,72 @@ +#include "buffered_reader.h" + +#include +#include "log.h" + +bool +buffered_reader_init(struct buffered_reader *reader, socket_t socket, + size_t bufsize) { + reader->buf = SDL_malloc(bufsize); + if (!reader->buf) { + LOGC("Could not allocate buffer"); + return false; + } + + reader->socket = socket; + reader->bufsize = bufsize; + reader->offset = 0; + reader->len = 0; + + return true; +} + +void +buffered_reader_destroy(struct buffered_reader *reader) { + SDL_free(reader->buf); +} + +static ssize_t +buffered_reader_fill(struct buffered_reader *reader) { + SDL_assert(!reader->len); + ssize_t r = net_recv(reader->socket, reader->buf, reader->bufsize); + if (r > 0) { + reader->offset = 0; + reader->len = r; + } + return r; +} + +ssize_t +buffered_reader_recv(struct buffered_reader *reader, void *buf, size_t count) { + if (!reader->len) { + // read from the socket + ssize_t r = buffered_reader_fill(reader); + if (r <= 0) { + return r; + } + } + + size_t r = count < reader->len ? count : reader->len; + memcpy(buf, reader->buf + reader->offset, r); + reader->offset += r; + reader->len -= r; + return r; +} + +ssize_t +buffered_reader_recv_all(struct buffered_reader *reader, void *buf, + size_t count) { + size_t done = 0; + while (done < count) { + ssize_t r = buffered_reader_recv(reader, buf, count - done); + if (r <= 0) { + // if there was some data, return them immediately + return done ? done : r; + } + + done += r; + buf += r; + } + + return done; +} diff --git a/app/src/buffered_reader.h b/app/src/buffered_reader.h new file mode 100644 index 0000000000000000000000000000000000000000..e4fb6bc60fb049be5c49843245098d85024a05b0 --- /dev/null +++ b/app/src/buffered_reader.h @@ -0,0 +1,29 @@ +#ifndef BUFFERED_READER_H +#define BUFFERED_READER_H + +#include "common.h" +#include "net.h" + +struct buffered_reader { + socket_t socket; + void *buf; + size_t bufsize; + size_t offset; + size_t len; +}; + +bool +buffered_reader_init(struct buffered_reader *reader, socket_t socket, + size_t bufsize); + +void +buffered_reader_destroy(struct buffered_reader *reader); + +ssize_t +buffered_reader_recv(struct buffered_reader *reader, void *buf, size_t count); + +ssize_t +buffered_reader_recv_all(struct buffered_reader *reader, void *buf, + size_t count); + +#endif diff --git a/app/src/scrcpy.c b/app/src/scrcpy.c index ed9887789bf8f59744e44fa1ebcdfb3db9b22167..664fa9c0a1229fe9cc1c818b7bfe4148c0f75a83 100644 --- a/app/src/scrcpy.c +++ b/app/src/scrcpy.c @@ -297,6 +297,7 @@ scrcpy(const struct scrcpy_options *options) { bool video_buffer_initialized = false; bool file_handler_initialized = false; bool recorder_initialized = false; + bool stream_initialized = false; bool stream_started = false; bool controller_initialized = false; bool controller_started = false; @@ -358,7 +359,10 @@ scrcpy(const struct scrcpy_options *options) { av_log_set_callback(av_log_callback); - stream_init(&stream, server.video_socket, dec, rec); + if (!stream_init(&stream, server.video_socket, dec, rec)) { + goto end; + } + stream_initialized = true; // now we consumed the header values, the socket receives the video stream // start the stream @@ -437,6 +441,9 @@ end: if (stream_started) { stream_join(&stream); } + if (stream_initialized) { + stream_destroy(&stream); + } if (controller_started) { controller_join(&controller); } diff --git a/app/src/stream.c b/app/src/stream.c index bca89f7129fd3da47e69289384ce4f613b7395ec..112d1f3df4593bf225f1607d79115d8ca9f0892a 100644 --- a/app/src/stream.c +++ b/app/src/stream.c @@ -17,7 +17,7 @@ #include "log.h" #include "recorder.h" -#define BUFSIZE 0x10000 +#define STREAM_BUFSIZE 0x10000 #define HEADER_SIZE 12 #define NO_PTS UINT64_C(-1) @@ -37,7 +37,8 @@ stream_recv_packet(struct stream *stream, AVPacket *packet) { // It is followed by bytes containing the packet/frame. uint8_t header[HEADER_SIZE]; - ssize_t r = net_recv_all(stream->socket, header, HEADER_SIZE); + ssize_t r = + buffered_reader_recv_all(&stream->buffered_reader, header, HEADER_SIZE); if (r < HEADER_SIZE) { return false; } @@ -51,7 +52,7 @@ stream_recv_packet(struct stream *stream, AVPacket *packet) { return false; } - r = net_recv_all(stream->socket, packet->data, len); + r = buffered_reader_recv_all(&stream->buffered_reader, packet->data, len); if (r < len) { av_packet_unref(packet); return false; @@ -267,13 +268,23 @@ end: return 0; } -void +bool stream_init(struct stream *stream, socket_t socket, struct decoder *decoder, struct recorder *recorder) { + if (!buffered_reader_init(&stream->buffered_reader, socket, + STREAM_BUFSIZE)) { + return false; + } stream->socket = socket; stream->decoder = decoder, stream->recorder = recorder; stream->has_pending = false; + return true; +} + +void +stream_destroy(struct stream *stream) { + buffered_reader_destroy(&stream->buffered_reader); } bool diff --git a/app/src/stream.h b/app/src/stream.h index 160ed7f5d490432fa86133e2e11ce5f9ee489c59..8a1d4599e287758f4be4961a996cfc4266291495 100644 --- a/app/src/stream.h +++ b/app/src/stream.h @@ -7,12 +7,14 @@ #include #include +#include "buffered_reader.h" #include "net.h" struct video_buffer; struct stream { socket_t socket; + struct buffered_reader buffered_reader; struct video_buffer *video_buffer; SDL_Thread *thread; struct decoder *decoder; @@ -25,10 +27,13 @@ struct stream { AVPacket pending; }; -void +bool stream_init(struct stream *stream, socket_t socket, struct decoder *decoder, struct recorder *recorder); +void +stream_destroy(struct stream *stream); + bool stream_start(struct stream *stream);