From 20265605007b0cf3f35e53abaf953c9ad13a6ec3 Mon Sep 17 00:00:00 2001 From: lustre Date: Sat, 6 Aug 2022 14:08:02 +0800 Subject: [PATCH] =?UTF-8?q?=E5=8F=AF=E8=BF=90=E8=A1=8C=E7=89=88=E6=9C=AC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- simpleVideoRecorder/src/main.cpp | 22 ++- src/org/btik/server/DevChannel.java | 8 + .../btik/server/video/BioHttpVideoServer.java | 11 +- .../btik/server/video/MJPEGVideoChannel.java | 42 ++++- src/org/btik/server/video/Main.java | 6 +- src/org/btik/server/video/UDPMain.java | 24 +-- src/org/btik/server/video/VideoChannel.java | 7 + .../device/{ => tcp}/BioDeviceChannel.java | 10 +- .../video/device/{ => tcp}/FrameBuffer.java | 3 +- .../video/device/{ => tcp}/FrameReceiver.java | 4 +- .../server/video/device/udp/BufferPool.java | 71 ++++++++ .../video/device/udp/FrameSegmentBuffer.java | 17 ++ .../device/{ => udp}/UDPDeviceChannel.java | 163 +++++++++++------- 13 files changed, 292 insertions(+), 96 deletions(-) create mode 100644 src/org/btik/server/DevChannel.java rename src/org/btik/server/video/device/{ => tcp}/BioDeviceChannel.java (94%) rename src/org/btik/server/video/device/{ => tcp}/FrameBuffer.java (96%) rename src/org/btik/server/video/device/{ => tcp}/FrameReceiver.java (97%) create mode 100644 src/org/btik/server/video/device/udp/BufferPool.java create mode 100644 src/org/btik/server/video/device/udp/FrameSegmentBuffer.java rename src/org/btik/server/video/device/{ => udp}/UDPDeviceChannel.java (50%) diff --git a/simpleVideoRecorder/src/main.cpp b/simpleVideoRecorder/src/main.cpp index 315c24b..c179a18 100644 --- a/simpleVideoRecorder/src/main.cpp +++ b/simpleVideoRecorder/src/main.cpp @@ -7,8 +7,10 @@ char* ssid = "test0"; const char* passwd = "12345687"; const char* host = "192.168.137.1"; +const uint16_t serverUdpPort = 8004; +const uint16_t localUdpPort = 2333; -WiFiClient streamSender; +WiFiUDP streamSender; void connectWifi(const char* ssid, const char* passphrase) { WiFi.mode(WIFI_STA); @@ -77,10 +79,8 @@ void setup() { connectWifi(ssid, passwd); Serial.println("connect stream channel"); - if (!streamSender.connect(host, 8004)) { - Serial.println("connect stream channel failed"); - } - streamSender.setNoDelay(true); + streamSender.begin(WiFi.localIP(), localUdpPort); + // 发送mac地址作为设备序列号,用于摄像头频道号 uint8_t mac[6]; WiFi.macAddress(mac); @@ -88,15 +88,15 @@ void setup() { sprintf(macStr, "%02X%02X%02X%02X%02X%02X", mac[0], mac[1], mac[2], mac[3], mac[4], mac[5]); Serial.println("sprint mac "); + streamSender.beginPacket(host, serverUdpPort); streamSender.print(String(macStr)); - streamSender.flush(); + streamSender.endPacket(); } void loop() { camera_fb_t* fb = NULL; size_t len = 0; Serial.println("do loop"); - uint8_t end[5] = {'j', 'p', 'e', 'g', '\n'}; while (true) { fb = esp_camera_fb_get(); if (!fb) { @@ -104,9 +104,13 @@ void loop() { return; } len = fb->len; + streamSender.beginPacket(host, serverUdpPort); streamSender.write(fb->buf, len); - streamSender.write(end, 5); - streamSender.flush(); + streamSender.endPacket(); + // 这个库会自动将大于MTU的帧拆成多帧,而不是利用自动ip分片, + // 导致服务器端需要合并帧,此处发送空包标识结束 + streamSender.beginPacket(host, serverUdpPort); + streamSender.endPacket(); esp_camera_fb_return(fb); } } \ No newline at end of file diff --git a/src/org/btik/server/DevChannel.java b/src/org/btik/server/DevChannel.java new file mode 100644 index 0000000..14ca6d6 --- /dev/null +++ b/src/org/btik/server/DevChannel.java @@ -0,0 +1,8 @@ +package org.btik.server; + +/** + * 设备通道 + */ +public interface DevChannel { + int channelIdLen(); +} diff --git a/src/org/btik/server/video/BioHttpVideoServer.java b/src/org/btik/server/video/BioHttpVideoServer.java index 668df0f..a9fe53e 100644 --- a/src/org/btik/server/video/BioHttpVideoServer.java +++ b/src/org/btik/server/video/BioHttpVideoServer.java @@ -1,6 +1,7 @@ package org.btik.server.video; +import org.btik.server.DevChannel; import org.btik.server.VideoServer; import org.btik.server.util.ByteUtil; @@ -8,6 +9,7 @@ import java.io.IOException; import java.io.InputStream; import java.net.*; import java.util.Arrays; +import java.util.Date; import java.util.Enumeration; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; @@ -24,6 +26,8 @@ public class BioHttpVideoServer extends Thread implements HttpConstant, VideoSer private AsyncTaskExecutor asyncTaskExecutor; + private DevChannel devChannel; + private int httpPort; private final ConcurrentHashMap videoChannelMap = new ConcurrentHashMap<>(); @@ -36,6 +40,10 @@ public class BioHttpVideoServer extends Thread implements HttpConstant, VideoSer this.httpPort = httpPort; } + public void setDevChannel(DevChannel devChannel) { + this.devChannel = devChannel; + } + @Override public synchronized void start() { super.start(); @@ -47,7 +55,7 @@ public class BioHttpVideoServer extends Thread implements HttpConstant, VideoSer try (ServerSocket serverSocket = new ServerSocket(httpPort)) { byte[] uri = new byte[URI_LEN]; //channel 是 /{sn} 的形式 目前为 12位字符 - byte[] channel = new byte[13]; + byte[] channel = new byte[devChannel.channelIdLen() + 1]; while (runFlag) { Socket client = serverSocket.accept(); InputStream inputStream = client.getInputStream(); @@ -68,6 +76,7 @@ public class BioHttpVideoServer extends Thread implements HttpConstant, VideoSer continue; } String channelStr = new String(channel); + System.out.println("pre open" + new Date()); executorService.submit(() -> doStreamOpen(client, channelStr)); } catch (IOException e) { disConnect(client, e); diff --git a/src/org/btik/server/video/MJPEGVideoChannel.java b/src/org/btik/server/video/MJPEGVideoChannel.java index 2dcee28..f9c94df 100644 --- a/src/org/btik/server/video/MJPEGVideoChannel.java +++ b/src/org/btik/server/video/MJPEGVideoChannel.java @@ -6,6 +6,7 @@ import java.io.IOException; import java.io.OutputStream; import java.net.Socket; import java.util.Collections; +import java.util.Date; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -52,11 +53,36 @@ public class MJPEGVideoChannel implements VideoChannel, HttpConstant { } } + static long l = System.currentTimeMillis(); + + @Override + public void sendFrame(byte[][] frame, int[] len, int segmentCount) { + int allLen = 0; + for (int i = 0; i < segmentCount; i++) { + allLen += len[i]; + } + byte[] lenStrBytes = ByteUtil.toString(allLen); + synchronized (clientLock) { + for (Socket client : clients) { + try { + OutputStream outputStream = client.getOutputStream(); + sendChunk(_STREAM_BOUNDARY, outputStream); + sendChunk(outputStream, _STREAM_PART, lenStrBytes, DOUBLE_LINE); + sendChunk(outputStream, allLen, len, frame); + outputStream.flush(); + } catch (IOException e) { + checkState(client, e); + } + + } + } + } + /** * 加入频道 */ public void joinChannel(Socket client) throws IOException { - System.out.println("open:" + client.getRemoteSocketAddress()); + System.out.println("open:" + client.getRemoteSocketAddress() + " " + new Date()); OutputStream outputStream = client.getOutputStream(); outputStream.write(STREAM_RESP_HEAD_BYTES); outputStream.flush(); @@ -102,6 +128,20 @@ public class MJPEGVideoChannel implements VideoChannel, HttpConstant { } + void sendChunk(OutputStream out, int allLen, final int[] length, byte[][] chunk) throws IOException { + out.write(NEW_LINE); + out.write(ByteUtil.toHexString(allLen)); + out.write(NEW_LINE); + for (int i = 0; i < length.length; i++) { + int len = length[i]; + if (len == 0) { + break; + } + out.write(chunk[i], 0, len); + } + + } + void disConnect(Socket socket, Exception e) { asyncTaskExecutor.execute(() -> { System.err.println(e.getMessage()); diff --git a/src/org/btik/server/video/Main.java b/src/org/btik/server/video/Main.java index f894c0f..57847a8 100644 --- a/src/org/btik/server/video/Main.java +++ b/src/org/btik/server/video/Main.java @@ -1,6 +1,6 @@ package org.btik.server.video; -import org.btik.server.video.device.BioDeviceChannel; +import org.btik.server.video.device.tcp.BioDeviceChannel; import java.io.FileInputStream; import java.io.IOException; @@ -43,7 +43,6 @@ public class Main { bioHttpVideoServer.setHttpPort(Integer.parseInt( getProp("http.port", "8003"))); bioHttpVideoServer.setAsyncTaskExecutor(asyncTaskExecutor); - bioHttpVideoServer.start(); BioDeviceChannel bioDeviceChannel = new BioDeviceChannel(); bioDeviceChannel.setAsyncTaskExecutor(asyncTaskExecutor); @@ -53,6 +52,9 @@ public class Main { bioDeviceChannel.setStreamPort(Integer.parseInt( getProp("stream.port", "8004"))); bioDeviceChannel.start(); + + bioHttpVideoServer.setDevChannel(bioDeviceChannel); + bioHttpVideoServer.start(); } } diff --git a/src/org/btik/server/video/UDPMain.java b/src/org/btik/server/video/UDPMain.java index bf86044..d4bb9c6 100644 --- a/src/org/btik/server/video/UDPMain.java +++ b/src/org/btik/server/video/UDPMain.java @@ -1,6 +1,7 @@ package org.btik.server.video; -import org.btik.server.video.device.UDPDeviceChannel; +import org.btik.server.video.device.udp.BufferPool; +import org.btik.server.video.device.udp.UDPDeviceChannel; import java.io.FileInputStream; import java.io.IOException; @@ -24,12 +25,12 @@ public class UDPMain { * @param key 配置key * @param def 获取为空时的默认值 */ - private static String getProp(String key, String def) { + private static int getIntProp(String key, int def) { Object o = properties.get(key); if (o == null) { return def; } - return String.valueOf(o); + return Integer.parseInt(o.toString()); } public static void main(String[] args) { @@ -37,18 +38,21 @@ public class UDPMain { asyncTaskExecutor.start(); BioHttpVideoServer bioHttpVideoServer = new BioHttpVideoServer(); - bioHttpVideoServer.setHttpPort(Integer.parseInt( - getProp("http.port", "8003"))); + bioHttpVideoServer.setHttpPort(getIntProp("http.port", 8003)); bioHttpVideoServer.setAsyncTaskExecutor(asyncTaskExecutor); - bioHttpVideoServer.start(); + BufferPool bufferPool = new BufferPool(); + bufferPool.setBufferPoolSize(getIntProp("udp.video.buffer.pool.size", 500)); UDPDeviceChannel deviceChannel = new UDPDeviceChannel(); deviceChannel.setAsyncTaskExecutor(asyncTaskExecutor); deviceChannel.setVideoServer(bioHttpVideoServer); - deviceChannel.setStreamPort(Integer.parseInt( - getProp("stream.port", "8004"))); - deviceChannel.setBufferPoolSize(Integer.parseInt(getProp("udp.video.buffer.pool.size", "500"))); - deviceChannel.setDispatcherPoolSize(Integer.parseInt(getProp("udp.video.dispatcher.thread.size", "8"))); + + deviceChannel.setBufferPool(bufferPool); + deviceChannel.setStreamPort(getIntProp("stream.port", 8004)); + deviceChannel.setDispatcherPoolSize(getIntProp("udp.video.dispatcher.thread.size", 8)); deviceChannel.start(); + + bioHttpVideoServer.setDevChannel(deviceChannel); + bioHttpVideoServer.start(); } } diff --git a/src/org/btik/server/video/VideoChannel.java b/src/org/btik/server/video/VideoChannel.java index 5752610..30a1d8a 100644 --- a/src/org/btik/server/video/VideoChannel.java +++ b/src/org/btik/server/video/VideoChannel.java @@ -7,4 +7,11 @@ public interface VideoChannel { * @param frame 一帧jpeg */ void sendFrame(byte[] frame, int len); + + /** + * 字节数组发送给不同客户端 + * + * @param frame 一帧jpeg + */ + void sendFrame(byte[][] frame, int[] len, int segmentCount); } diff --git a/src/org/btik/server/video/device/BioDeviceChannel.java b/src/org/btik/server/video/device/tcp/BioDeviceChannel.java similarity index 94% rename from src/org/btik/server/video/device/BioDeviceChannel.java rename to src/org/btik/server/video/device/tcp/BioDeviceChannel.java index e5b5fd2..eee1a8c 100644 --- a/src/org/btik/server/video/device/BioDeviceChannel.java +++ b/src/org/btik/server/video/device/tcp/BioDeviceChannel.java @@ -1,6 +1,7 @@ -package org.btik.server.video.device; +package org.btik.server.video.device.tcp; +import org.btik.server.DevChannel; import org.btik.server.VideoServer; import org.btik.server.video.AsyncTaskExecutor; import org.btik.server.video.VideoChannel; @@ -18,7 +19,7 @@ import java.util.concurrent.ConcurrentHashMap; /** * 发送帧设备接入通道 */ -public class BioDeviceChannel extends Thread { +public class BioDeviceChannel extends Thread implements DevChannel { private static final int SN_LEN = 12; private boolean runFlag = true; @@ -136,4 +137,9 @@ public class BioDeviceChannel extends Thread { public void setVideoServer(VideoServer videoServer) { this.videoServer = videoServer; } + + @Override + public int channelIdLen() { + return SN_LEN; + } } diff --git a/src/org/btik/server/video/device/FrameBuffer.java b/src/org/btik/server/video/device/tcp/FrameBuffer.java similarity index 96% rename from src/org/btik/server/video/device/FrameBuffer.java rename to src/org/btik/server/video/device/tcp/FrameBuffer.java index e1ed871..627da00 100644 --- a/src/org/btik/server/video/device/FrameBuffer.java +++ b/src/org/btik/server/video/device/tcp/FrameBuffer.java @@ -1,6 +1,5 @@ -package org.btik.server.video.device; +package org.btik.server.video.device.tcp; -import org.btik.server.VideoServer; import org.btik.server.video.VideoChannel; import java.io.ByteArrayOutputStream; diff --git a/src/org/btik/server/video/device/FrameReceiver.java b/src/org/btik/server/video/device/tcp/FrameReceiver.java similarity index 97% rename from src/org/btik/server/video/device/FrameReceiver.java rename to src/org/btik/server/video/device/tcp/FrameReceiver.java index 9892283..65f7b1c 100644 --- a/src/org/btik/server/video/device/FrameReceiver.java +++ b/src/org/btik/server/video/device/tcp/FrameReceiver.java @@ -1,8 +1,8 @@ -package org.btik.server.video.device; +package org.btik.server.video.device.tcp; -import org.btik.server.VideoServer; import org.btik.server.video.VideoChannel; +import org.btik.server.video.device.tcp.FrameBuffer; import java.io.IOException; diff --git a/src/org/btik/server/video/device/udp/BufferPool.java b/src/org/btik/server/video/device/udp/BufferPool.java new file mode 100644 index 0000000..60948be --- /dev/null +++ b/src/org/btik/server/video/device/udp/BufferPool.java @@ -0,0 +1,71 @@ +package org.btik.server.video.device.udp; + +import java.util.concurrent.ConcurrentLinkedQueue; + +public class BufferPool { + /** + * 接收图片缓冲区大小,
+ * 与TCP不同的是,若图片大于当前帧大小,会截断则无法得到完整图片,默认40KB + */ + private static final int RECEIVE_BUFFER_SIZE = 40960; + /** + * 帧平片段缓冲池,避免反复new帧缓冲区 + */ + private final ConcurrentLinkedQueue frameSegmentBufferPool = new ConcurrentLinkedQueue<>(); + + private final ConcurrentLinkedQueue frameSegmentBufferDataPool = new ConcurrentLinkedQueue<>(); + + /** + * 初始缓存区池大小,本身会自动扩容,随着设备增多可以设置合理值 + */ + private int bufferPoolSize = 500; + + public void setBufferPoolSize(int bufferPoolSize) { + this.bufferPoolSize = bufferPoolSize; + } + + private void init() { + for (int i = 0; i < bufferPoolSize; i++) { + frameSegmentBufferPool.add(new FrameSegmentBuffer(new byte[RECEIVE_BUFFER_SIZE])); + } + for (int i = 0; i < bufferPoolSize * 3; i++) { + frameSegmentBufferDataPool.add(new byte[RECEIVE_BUFFER_SIZE]); + } + } + + public FrameSegmentBuffer getFrameBuffer() { + FrameSegmentBuffer buffer = frameSegmentBufferPool.poll(); + if (buffer == null) { + System.out.println("mem up"); + // 自动扩容 + buffer = new FrameSegmentBuffer(new byte[RECEIVE_BUFFER_SIZE]); + } + return buffer; + } + + public void returnBuffer(FrameSegmentBuffer buffer) { + frameSegmentBufferPool.add(buffer); + + } + + public byte[] getBuffer() { + byte[] buffer = frameSegmentBufferDataPool.poll(); + if (buffer == null) { + System.out.println("data mem up"); + // 自动扩容 + buffer = new byte[RECEIVE_BUFFER_SIZE]; + } + return buffer; + } + + public void returnBufferData(byte[] buffer) { + if (buffer != null) { + frameSegmentBufferDataPool.add(buffer); + } + + } + + public BufferPool() { + init(); + } +} diff --git a/src/org/btik/server/video/device/udp/FrameSegmentBuffer.java b/src/org/btik/server/video/device/udp/FrameSegmentBuffer.java new file mode 100644 index 0000000..03ad153 --- /dev/null +++ b/src/org/btik/server/video/device/udp/FrameSegmentBuffer.java @@ -0,0 +1,17 @@ +package org.btik.server.video.device.udp; + +/** + * 帧片段 + */ +public class FrameSegmentBuffer { + // 2 + 4 + 2字节 2 字节的0 4字节ip 2字节端口 + long address; + + byte[] data; + + int size; + + public FrameSegmentBuffer(byte[] data) { + this.data = data; + } +} diff --git a/src/org/btik/server/video/device/UDPDeviceChannel.java b/src/org/btik/server/video/device/udp/UDPDeviceChannel.java similarity index 50% rename from src/org/btik/server/video/device/UDPDeviceChannel.java rename to src/org/btik/server/video/device/udp/UDPDeviceChannel.java index e757a20..b283a10 100644 --- a/src/org/btik/server/video/device/UDPDeviceChannel.java +++ b/src/org/btik/server/video/device/udp/UDPDeviceChannel.java @@ -1,6 +1,8 @@ -package org.btik.server.video.device; +package org.btik.server.video.device.udp; +import org.btik.server.DevChannel; import org.btik.server.VideoServer; +import org.btik.server.util.ByteUtil; import org.btik.server.util.NamePrefixThreadFactory; import org.btik.server.video.AsyncTaskExecutor; import org.btik.server.video.VideoChannel; @@ -15,19 +17,8 @@ import java.util.concurrent.*; /** * 发送帧设备接入通道 */ -public class UDPDeviceChannel extends Thread { - /** - * 接收图片缓冲区大小,
- * 与TCP不同的是,若图片大于当前帧大小,会截断则无法得到完整图片,默认40KB - */ - private static final int RECEIVE_BUFFER_SIZE = 40960; - - /** - * 初始缓存区池大小,本身会自动扩容,随着设备增多可以设置合理值 - */ - private int bufferPoolSize = 500; - - private static final int SN_LEN = 12; +public class UDPDeviceChannel extends Thread implements DevChannel { + private static final int SN_LEN = 16; private volatile boolean runFlag = true; /** @@ -41,6 +32,8 @@ public class UDPDeviceChannel extends Thread { private ExecutorService executorService; + private BufferPool bufferPool; + private FrameDispatcher[] frameDispatchers; @@ -61,6 +54,10 @@ public class UDPDeviceChannel extends Thread { this.asyncTaskExecutor = asyncTaskExecutor; } + public void setBufferPool(BufferPool bufferPool) { + this.bufferPool = bufferPool; + } + /** * 可选的输入值 1 2 4 8 16 32 64 128 256几个数字,根据cpu核数和设备的数量选择合适的值 * ,输入其它值也会被映射到以上值,如果只有一个摄像头设备那就一个足够,线程数太多而cpu核数过少, @@ -72,26 +69,15 @@ public class UDPDeviceChannel extends Thread { this.dispatcherPoolSize = (n < 0) ? 1 : (n >= maximumCapacity) ? maximumCapacity : n + 1; } - public void setBufferPoolSize(int bufferPoolSize) { - this.bufferPoolSize = bufferPoolSize; - } - /** - * 帧缓冲池,避免反复new帧缓冲区 - */ - private final ConcurrentLinkedQueue frameBufferPool = new ConcurrentLinkedQueue<>(); - - private final ConcurrentHashMap videoChannelMap = new ConcurrentHashMap<>(); + private final ConcurrentHashMap devMap = new ConcurrentHashMap<>(); @Override public synchronized void start() { System.out.println("init buffer pool"); - for (int i = 0; i < bufferPoolSize; i++) { - frameBufferPool.add(new FrameBuffer(new byte[RECEIVE_BUFFER_SIZE])); - } - super.start(); + System.out.println("start dispatchers"); frameDispatchers = new FrameDispatcher[dispatcherPoolSize]; executorService = new ThreadPoolExecutor(dispatcherPoolSize, dispatcherPoolSize, @@ -103,22 +89,24 @@ public class UDPDeviceChannel extends Thread { } System.out.println("udp channel loaded"); + super.start(); } @Override public void run() { try (DatagramSocket serverSocket = new DatagramSocket(streamPort)) { - FrameBuffer frameBuffer = getFrameBuffer(); - DatagramPacket datagramPacket = new DatagramPacket(frameBuffer.data, 0, frameBuffer.data.length); + FrameSegmentBuffer frameSegmentBuffer = bufferPool.getFrameBuffer(); + DatagramPacket datagramPacket = new DatagramPacket(frameSegmentBuffer.data, 0, frameSegmentBuffer.data.length); while (runFlag) { serverSocket.receive(datagramPacket); InetAddress address = datagramPacket.getAddress(); - frameBuffer.address = (long) address.hashCode() << 16 | datagramPacket.getPort(); - frameBuffer.size = datagramPacket.getLength(); - frameDispatchers[(int) (frameBuffer.address & dispatcherPoolSize - 1)].messages.add(frameBuffer); + frameSegmentBuffer.address = (long) address.hashCode() << 16 | datagramPacket.getPort(); + frameSegmentBuffer.size = datagramPacket.getLength(); + frameDispatchers[(int) (frameSegmentBuffer.address & dispatcherPoolSize - 1)].messages.add(frameSegmentBuffer); // 切换缓冲区 - frameBuffer = getFrameBuffer(); + frameSegmentBuffer = bufferPool.getFrameBuffer(); + datagramPacket.setData(frameSegmentBuffer.data); } } catch (IOException e) { System.out.println(" start server failed:" + e.getMessage()); @@ -129,77 +117,118 @@ public class UDPDeviceChannel extends Thread { runFlag = false; // 无消息导致阻塞时,没有读到flag,帮助退出阻塞 for (FrameDispatcher frameDispatcher : frameDispatchers) { - frameDispatcher.messages.add(new FrameBuffer(new byte[0])); + frameDispatcher.messages.add(new FrameSegmentBuffer(new byte[0])); } // 线程池核心线程也需要停止 executorService.shutdown(); } - private FrameBuffer getFrameBuffer() { - FrameBuffer buffer = frameBufferPool.poll(); - if (buffer == null) { - // 自动扩容 - buffer = new FrameBuffer(new byte[RECEIVE_BUFFER_SIZE]); - } - return buffer; + + @Override + public int channelIdLen() { + return SN_LEN; } - /** - * 单帧图片 - */ - static class FrameBuffer { - // 2 + 4 + 2字节 2 字节的0 4字节ip 2字节端口 + + static class UDPDev { + byte[][] frame; + + int[] sizeTable; + VideoChannel videoChannel; + + BufferPool bufferPool; long address; - byte[] data; + int segmentIndex = 0; - int size; + public UDPDev(byte[][] frame, VideoChannel videoChannel, BufferPool bufferPool, long address) { + this.frame = frame; + this.sizeTable = new int[frame.length]; + this.videoChannel = videoChannel; + this.address = address; + this.bufferPool = bufferPool; + } - public FrameBuffer(byte[] data) { - this.data = data; + public void appendSegment(FrameSegmentBuffer frameSegmentBuffer) { + + int size = frameSegmentBuffer.size; + // 判断结束标识 + if (size == 0) { + videoChannel.sendFrame(frame, sizeTable, segmentIndex + 1); + free(); + } else { + try { + sizeTable[segmentIndex] = size; + } catch (IndexOutOfBoundsException e) { + e.printStackTrace(); + // 将指针复位,防止累积后持续越界 + free(); + return; + } + + byte[] data = frameSegmentBuffer.data; + frame[segmentIndex++] = data; + // 被帧空间指向后,为帧片段重新分配内存 + frameSegmentBuffer.data = bufferPool.getBuffer(); + } + + } + + void free() { + for (int i = 0; i < segmentIndex; i++) { + bufferPool.returnBufferData(frame[i]); + frame[i] = null; + sizeTable[i] = 0; + } + segmentIndex = 0; } } class FrameDispatcher implements Runnable { - LinkedBlockingQueue messages = new LinkedBlockingQueue<>(); + LinkedBlockingQueue messages = new LinkedBlockingQueue<>(); @Override public void run() { try { while (runFlag) { - FrameBuffer frame = messages.take(); + long l = System.currentTimeMillis(); + FrameSegmentBuffer segment = messages.take(); try { - byte[] data = frame.data; - int length = data.length; - if (length == SN_LEN) { - asyncTaskExecutor.execute(() -> onNewStreamOpen(frame)); - continue; + long address = segment.address; + UDPDev dev = devMap.get(address); + if (dev != null) { + dev.appendSegment(segment); + } else { + onNewStreamOpen(segment); } - long address = frame.address; - VideoChannel channel = videoChannelMap.get(address); - if (channel != null) { - channel.sendFrame(data, length); + } catch (Exception e) { + if (runFlag) { + e.printStackTrace(); + } else { + break; } } finally { // 归还到池里 - frameBufferPool.add(frame); + bufferPool.returnBuffer(segment); + } } } catch (InterruptedException e) { + e.printStackTrace(); System.out.println("exit by:" + e); } + System.out.println("exit : " + getName()); } + } - private void onNewStreamOpen(FrameBuffer frame) { + private void onNewStreamOpen(FrameSegmentBuffer frame) { byte[] sn = new byte[SN_LEN + 1]; - System.arraycopy(frame.data, 0, sn, 1, SN_LEN); + System.arraycopy(ByteUtil.toHexString(frame.address), 0, sn, 1, SN_LEN); VideoChannel channel = videoServer.createChannel(sn); - videoChannelMap.put(frame.address, channel); - // 归还单帧缓冲区 - frameBufferPool.add(frame); + devMap.put(frame.address, new UDPDev(new byte[200][], channel, bufferPool, frame.address)); } } -- GitLab