diff --git a/simpleVideoRecorder/src/main.cpp b/simpleVideoRecorder/src/main.cpp index c179a18c7f0222526013171a76d7fcf17d2ea7d0..508881ee59fdd4e9b3dcc1efe612c6509adcee6c 100644 --- a/simpleVideoRecorder/src/main.cpp +++ b/simpleVideoRecorder/src/main.cpp @@ -2,15 +2,16 @@ #include #include "esp_camera.h" #define CAMERA_MODEL_AI_THINKER +#include "UdpClient.hpp" #include "ai_thinker_esp32_cam_meta.h" -char* ssid = "test0"; +const char* ssid = "test0"; const char* passwd = "12345687"; const char* host = "192.168.137.1"; const uint16_t serverUdpPort = 8004; const uint16_t localUdpPort = 2333; -WiFiUDP streamSender; +LightUDP streamSender; void connectWifi(const char* ssid, const char* passphrase) { WiFi.mode(WIFI_STA); @@ -75,28 +76,18 @@ void setup() { Serial.println("get sensor "); sensor_t* s = esp_camera_sensor_get(); // drop down frame size for higher initial frame rate - s->set_framesize(s, FRAMESIZE_VGA); + s->set_framesize(s, FRAMESIZE_SVGA); connectWifi(ssid, passwd); - Serial.println("connect stream channel"); streamSender.begin(WiFi.localIP(), localUdpPort); - - // 发送mac地址作为设备序列号,用于摄像头频道号 - uint8_t mac[6]; - WiFi.macAddress(mac); - char macStr[12] = {0}; - sprintf(macStr, "%02X%02X%02X%02X%02X%02X", mac[0], mac[1], mac[2], mac[3], - mac[4], mac[5]); - Serial.println("sprint mac "); - streamSender.beginPacket(host, serverUdpPort); - streamSender.print(String(macStr)); - streamSender.endPacket(); + streamSender.setServer(host, serverUdpPort); } void loop() { camera_fb_t* fb = NULL; size_t len = 0; Serial.println("do loop"); + while (true) { fb = esp_camera_fb_get(); if (!fb) { @@ -104,13 +95,7 @@ void loop() { return; } len = fb->len; - streamSender.beginPacket(host, serverUdpPort); - streamSender.write(fb->buf, len); - streamSender.endPacket(); - // 这个库会自动将大于MTU的帧拆成多帧,而不是利用自动ip分片, - // 导致服务器端需要合并帧,此处发送空包标识结束 - streamSender.beginPacket(host, serverUdpPort); - streamSender.endPacket(); + streamSender.send(fb->buf, len); esp_camera_fb_return(fb); } } \ No newline at end of file diff --git a/src/org/btik/server/VideoServer.java b/src/org/btik/server/VideoServer.java index 2ffc35c40742cb428bd9cc649f2a4ae21f1ab793..22c40984479d5284e2d089402ba04db4f3a457f9 100644 --- a/src/org/btik/server/VideoServer.java +++ b/src/org/btik/server/VideoServer.java @@ -1,7 +1,7 @@ package org.btik.server; -import org.btik.server.video.VideoChannel; +import org.btik.server.video.device.iface.VideoChannel; /** * 视频服务 diff --git a/src/org/btik/server/video/Main.java b/src/org/btik/server/video/Main.java index 57847a8863bb495b6885e8c5b287e8390160229d..10405bc0ea184d7dba8a134b27c78c082fd32326 100644 --- a/src/org/btik/server/video/Main.java +++ b/src/org/btik/server/video/Main.java @@ -1,6 +1,8 @@ package org.btik.server.video; +import org.btik.server.video.device.task.AsyncTaskExecutor; import org.btik.server.video.device.tcp.BioDeviceChannel; +import org.btik.server.video.device.web.BioHttpVideoServer; import java.io.FileInputStream; import java.io.IOException; diff --git a/src/org/btik/server/video/UDP2Main.java b/src/org/btik/server/video/UDP2Main.java new file mode 100644 index 0000000000000000000000000000000000000000..e769a070927b244e32e8efcef9b3f4798d6e366e --- /dev/null +++ b/src/org/btik/server/video/UDP2Main.java @@ -0,0 +1,59 @@ +package org.btik.server.video; + +import org.btik.server.video.device.task.AsyncTaskExecutor; +import org.btik.server.video.device.udp2.BufferPool; +import org.btik.server.video.device.udp2.NewUDPDeviceChannel; +import org.btik.server.video.device.web.BioHttpVideoServer; + +import java.io.FileInputStream; +import java.io.IOException; +import java.util.Properties; + +public class UDP2Main { + static Properties properties; + + static { + properties = new Properties(); + try { + properties.load(new FileInputStream("light-video.properties")); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + /** + * 读取配置信息 + * + * @param key 配置key + * @param def 获取为空时的默认值 + */ + private static int getIntProp(String key, int def) { + Object o = properties.get(key); + if (o == null) { + return def; + } + return Integer.parseInt(o.toString()); + } + + public static void main(String[] args) { + AsyncTaskExecutor asyncTaskExecutor = new AsyncTaskExecutor(); + asyncTaskExecutor.start(); + + BioHttpVideoServer bioHttpVideoServer = new BioHttpVideoServer(); + bioHttpVideoServer.setHttpPort(getIntProp("http.port", 8003)); + bioHttpVideoServer.setAsyncTaskExecutor(asyncTaskExecutor); + + BufferPool bufferPool = new BufferPool(); + bufferPool.setBufferPoolSize(getIntProp("udp.video.buffer.pool.size", 500)); + NewUDPDeviceChannel deviceChannel = new NewUDPDeviceChannel(); + deviceChannel.setVideoServer(bioHttpVideoServer); + + deviceChannel.setBufferPool(bufferPool); + deviceChannel.setStreamPort(getIntProp("stream.port", 8004)); + deviceChannel.setDispatcherPoolSize(getIntProp("udp.video.dispatcher.thread.size", 8)); + deviceChannel.start(); + + bioHttpVideoServer.setDevChannel(deviceChannel); + bioHttpVideoServer.start(); + } +} diff --git a/src/org/btik/server/video/UDPMain.java b/src/org/btik/server/video/UDPMain.java index d4bb9c64da6bf80fba2983f6450bf15a2a6b05dd..ea17043864cf7d044da4131097e4cabe66e708eb 100644 --- a/src/org/btik/server/video/UDPMain.java +++ b/src/org/btik/server/video/UDPMain.java @@ -1,7 +1,9 @@ package org.btik.server.video; +import org.btik.server.video.device.task.AsyncTaskExecutor; import org.btik.server.video.device.udp.BufferPool; import org.btik.server.video.device.udp.UDPDeviceChannel; +import org.btik.server.video.device.web.BioHttpVideoServer; import java.io.FileInputStream; import java.io.IOException; diff --git a/src/org/btik/server/DevChannel.java b/src/org/btik/server/video/device/iface/DevChannel.java similarity index 64% rename from src/org/btik/server/DevChannel.java rename to src/org/btik/server/video/device/iface/DevChannel.java index 14ca6d6e0c891f000c83439aedf51cbbf7aa3bd6..951dab444b8174c01e70032a17324a5d74ee325a 100644 --- a/src/org/btik/server/DevChannel.java +++ b/src/org/btik/server/video/device/iface/DevChannel.java @@ -1,4 +1,4 @@ -package org.btik.server; +package org.btik.server.video.device.iface; /** * 设备通道 diff --git a/src/org/btik/server/video/HttpConstant.java b/src/org/btik/server/video/device/iface/HttpConstant.java similarity index 95% rename from src/org/btik/server/video/HttpConstant.java rename to src/org/btik/server/video/device/iface/HttpConstant.java index afbcf92b300db9083ed56f2e86430ac6190dc7ac..49b309ecb7052fb42031572273887db32b400398 100644 --- a/src/org/btik/server/video/HttpConstant.java +++ b/src/org/btik/server/video/device/iface/HttpConstant.java @@ -1,4 +1,4 @@ -package org.btik.server.video; +package org.btik.server.video.device.iface; import java.nio.charset.StandardCharsets; diff --git a/src/org/btik/server/video/VideoChannel.java b/src/org/btik/server/video/device/iface/VideoChannel.java similarity index 88% rename from src/org/btik/server/video/VideoChannel.java rename to src/org/btik/server/video/device/iface/VideoChannel.java index 30a1d8a3fb84bd62bf5703c9f0d225f58c7cf35a..bf1f4d868c41e7eb3b2ae91128aa7ca2d46aaba5 100644 --- a/src/org/btik/server/video/VideoChannel.java +++ b/src/org/btik/server/video/device/iface/VideoChannel.java @@ -1,4 +1,4 @@ -package org.btik.server.video; +package org.btik.server.video.device.iface; public interface VideoChannel { /** diff --git a/src/org/btik/server/video/AsyncTaskExecutor.java b/src/org/btik/server/video/device/task/AsyncTaskExecutor.java similarity index 95% rename from src/org/btik/server/video/AsyncTaskExecutor.java rename to src/org/btik/server/video/device/task/AsyncTaskExecutor.java index 325ddc885a7922b4277cbe28f47c7d6b1b9dfe5e..fd121d2d8c1f6ae845a372255ba0e66b2a76a683 100644 --- a/src/org/btik/server/video/AsyncTaskExecutor.java +++ b/src/org/btik/server/video/device/task/AsyncTaskExecutor.java @@ -1,4 +1,4 @@ -package org.btik.server.video; +package org.btik.server.video.device.task; import java.util.concurrent.LinkedBlockingQueue; diff --git a/src/org/btik/server/video/device/tcp/BioDeviceChannel.java b/src/org/btik/server/video/device/tcp/BioDeviceChannel.java index eee1a8c269d1fd0e0537ad382f9709b3db872e8a..49931d6788a901085543f36f7c3ce7eb774a6f6e 100644 --- a/src/org/btik/server/video/device/tcp/BioDeviceChannel.java +++ b/src/org/btik/server/video/device/tcp/BioDeviceChannel.java @@ -1,10 +1,10 @@ package org.btik.server.video.device.tcp; -import org.btik.server.DevChannel; +import org.btik.server.video.device.iface.DevChannel; import org.btik.server.VideoServer; -import org.btik.server.video.AsyncTaskExecutor; -import org.btik.server.video.VideoChannel; +import org.btik.server.video.device.task.AsyncTaskExecutor; +import org.btik.server.video.device.iface.VideoChannel; import java.io.IOException; diff --git a/src/org/btik/server/video/device/tcp/FrameBuffer.java b/src/org/btik/server/video/device/tcp/FrameBuffer.java index 627da0089a39bfd05a8b952f3d7d40c66f84c554..a945e9d366e16ed7d0a021ea72f5ab9b0f1c9bdd 100644 --- a/src/org/btik/server/video/device/tcp/FrameBuffer.java +++ b/src/org/btik/server/video/device/tcp/FrameBuffer.java @@ -1,6 +1,6 @@ package org.btik.server.video.device.tcp; -import org.btik.server.video.VideoChannel; +import org.btik.server.video.device.iface.VideoChannel; import java.io.ByteArrayOutputStream; diff --git a/src/org/btik/server/video/device/tcp/FrameReceiver.java b/src/org/btik/server/video/device/tcp/FrameReceiver.java index 65f7b1ca757a8c730d5d159cb475742677c56836..5e39c359937bb7c26cdb82d0f7477b2693fe1d75 100644 --- a/src/org/btik/server/video/device/tcp/FrameReceiver.java +++ b/src/org/btik/server/video/device/tcp/FrameReceiver.java @@ -1,8 +1,7 @@ package org.btik.server.video.device.tcp; -import org.btik.server.video.VideoChannel; -import org.btik.server.video.device.tcp.FrameBuffer; +import org.btik.server.video.device.iface.VideoChannel; import java.io.IOException; diff --git a/src/org/btik/server/video/device/udp/UDPDeviceChannel.java b/src/org/btik/server/video/device/udp/UDPDeviceChannel.java index b283a106ad5c951c5c1cbfea70f618c1e8cc73e3..63bcaf13530c2c57eb51374937e3633ff9019aa7 100644 --- a/src/org/btik/server/video/device/udp/UDPDeviceChannel.java +++ b/src/org/btik/server/video/device/udp/UDPDeviceChannel.java @@ -1,11 +1,11 @@ package org.btik.server.video.device.udp; -import org.btik.server.DevChannel; +import org.btik.server.video.device.iface.DevChannel; import org.btik.server.VideoServer; import org.btik.server.util.ByteUtil; import org.btik.server.util.NamePrefixThreadFactory; -import org.btik.server.video.AsyncTaskExecutor; -import org.btik.server.video.VideoChannel; +import org.btik.server.video.device.task.AsyncTaskExecutor; +import org.btik.server.video.device.iface.VideoChannel; import java.io.IOException; import java.net.DatagramPacket; diff --git a/src/org/btik/server/video/device/udp2/BufferPool.java b/src/org/btik/server/video/device/udp2/BufferPool.java new file mode 100644 index 0000000000000000000000000000000000000000..6583b460126dc2361b14d107b36611b235c466ca --- /dev/null +++ b/src/org/btik/server/video/device/udp2/BufferPool.java @@ -0,0 +1,51 @@ +package org.btik.server.video.device.udp2; + + +import java.util.concurrent.ConcurrentLinkedQueue; + +public class BufferPool { + /** + * 接收图片缓冲区大小,
+ * 与TCP不同的是,若图片大于当前帧大小,会截断则无法得到完整图片,默认40KB + */ + private static final int RECEIVE_BUFFER_SIZE = 64512; + + /** + * 帧缓冲池,避免反复new帧缓冲区 + */ + private final ConcurrentLinkedQueue frameBufferPool = new ConcurrentLinkedQueue<>(); + + /** + * 初始缓存区池大小,本身会自动扩容,随着设备增多可以设置合理值 + */ + private int bufferPoolSize = 500; + + public void setBufferPoolSize(int bufferPoolSize) { + this.bufferPoolSize = bufferPoolSize; + } + + private void init() { + for (int i = 0; i < bufferPoolSize; i++) { + frameBufferPool.add(new FrameBuffer(new byte[RECEIVE_BUFFER_SIZE])); + } + } + + public FrameBuffer getFrameBuffer() { + FrameBuffer buffer = frameBufferPool.poll(); + if (buffer == null) { + System.out.println("mem up"); + // 自动扩容 + buffer = new FrameBuffer(new byte[RECEIVE_BUFFER_SIZE]); + } + return buffer; + } + + public void returnBuffer(FrameBuffer buffer) { + frameBufferPool.add(buffer); + + } + + public BufferPool() { + init(); + } +} diff --git a/src/org/btik/server/video/device/udp2/FrameBuffer.java b/src/org/btik/server/video/device/udp2/FrameBuffer.java new file mode 100644 index 0000000000000000000000000000000000000000..0e6e98a12d1a29b05e537ce953fd07fceb67f398 --- /dev/null +++ b/src/org/btik/server/video/device/udp2/FrameBuffer.java @@ -0,0 +1,14 @@ +package org.btik.server.video.device.udp2; + +public class FrameBuffer { + // 2 + 4 + 2字节 2 字节的0 4字节ip 2字节端口 + long address; + + byte[] data; + + int size; + + public FrameBuffer(byte[] data) { + this.data = data; + } +} diff --git a/src/org/btik/server/video/device/udp2/NewUDPDeviceChannel.java b/src/org/btik/server/video/device/udp2/NewUDPDeviceChannel.java new file mode 100644 index 0000000000000000000000000000000000000000..6d4a3a1396ca05c2719f79ea15e085c1af94a6b3 --- /dev/null +++ b/src/org/btik/server/video/device/udp2/NewUDPDeviceChannel.java @@ -0,0 +1,193 @@ +package org.btik.server.video.device.udp2; + +import org.btik.server.VideoServer; +import org.btik.server.util.ByteUtil; +import org.btik.server.util.NamePrefixThreadFactory; +import org.btik.server.video.device.iface.DevChannel; +import org.btik.server.video.device.iface.VideoChannel; +import org.btik.server.video.device.task.AsyncTaskExecutor; + + +import java.io.IOException; +import java.net.DatagramPacket; +import java.net.DatagramSocket; +import java.net.InetAddress; +import java.util.HashMap; +import java.util.concurrent.*; + +public class NewUDPDeviceChannel extends Thread implements DevChannel { + + private static final int SN_LEN = 16; + private volatile boolean runFlag = true; + + /** + * 帧通道端口号 + */ + private int streamPort; + + private VideoServer videoServer; + + private ExecutorService executorService; + + private BufferPool bufferPool; + + + private FrameDispatcher[] frameDispatchers; + + /** + * 帧分发线程数量,随着设备增多可以适当增加 + */ + private int dispatcherPoolSize = 8; + + public void setStreamPort(int streamPort) { + this.streamPort = streamPort; + } + + public void setVideoServer(VideoServer videoServer) { + this.videoServer = videoServer; + } + + public void setBufferPool(BufferPool bufferPool) { + this.bufferPool = bufferPool; + } + + /** + * 可选的输入值 1 2 4 8 16 32 64 128 256几个数字,根据cpu核数和设备的数量选择合适的值 + * ,输入其它值也会被映射到以上值,如果只有一个摄像头设备那就一个足够,线程数太多而cpu核数过少, + * 反而因为线程不断切换使得效率更低 + */ + public void setDispatcherPoolSize(int dispatcherPoolSize) { + int maximumCapacity = 256; + int n = -1 >>> Integer.numberOfLeadingZeros(dispatcherPoolSize - 1); + this.dispatcherPoolSize = (n < 0) ? 1 : (n >= maximumCapacity) ? maximumCapacity : n + 1; + } + + + private final HashMap videoChannelMap = new HashMap<>(); + + + @Override + public synchronized void start() { + System.out.println("init buffer pool"); + + System.out.println("start dispatchers"); + frameDispatchers = new FrameDispatcher[dispatcherPoolSize]; + executorService = new ThreadPoolExecutor(dispatcherPoolSize, dispatcherPoolSize, + 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), new NamePrefixThreadFactory("frameDispatcher")); + for (int i = 0; i < dispatcherPoolSize; i++) { + FrameDispatcher msgDispatcher = new FrameDispatcher(); + frameDispatchers[i] = msgDispatcher; + executorService.submit(msgDispatcher); + } + + System.out.println("udp channel loaded"); + super.start(); + } + + @Override + public void run() { + + try (DatagramSocket serverSocket = new DatagramSocket(streamPort)) { + FrameBuffer frameBuffer = bufferPool.getFrameBuffer(); + DatagramPacket datagramPacket = new DatagramPacket(frameBuffer.data, 0, frameBuffer.data.length); + while (runFlag) { + serverSocket.receive(datagramPacket); + InetAddress address = datagramPacket.getAddress(); + frameBuffer.address = (long) address.hashCode() << 16 | datagramPacket.getPort(); + frameBuffer.size = datagramPacket.getLength(); + frameDispatchers[(int) (frameBuffer.address & dispatcherPoolSize - 1)].messages.add(frameBuffer); + // 切换缓冲区 + frameBuffer = bufferPool.getFrameBuffer(); + datagramPacket.setData(frameBuffer.data); + } + } catch (IOException e) { + System.out.println(" start server failed:" + e.getMessage()); + } + } + + public void shutDown() { + runFlag = false; + // 无消息导致阻塞时,没有读到flag,帮助退出阻塞 + for (FrameDispatcher frameDispatcher : frameDispatchers) { + frameDispatcher.messages.add(new FrameBuffer(new byte[0])); + } + // 线程池核心线程也需要停止 + executorService.shutdown(); + } + + + @Override + public int channelIdLen() { + return SN_LEN; + } + + + static class UDPDev { + byte[] frame; + + int[] sizeTable; + VideoChannel videoChannel; + + BufferPool bufferPool; + long address; + + int segmentIndex = 0; + + public UDPDev(byte[] frame, VideoChannel videoChannel, BufferPool bufferPool, long address) { + this.frame = frame; + this.sizeTable = new int[frame.length]; + this.videoChannel = videoChannel; + this.address = address; + this.bufferPool = bufferPool; + } + + + } + + class FrameDispatcher implements Runnable { + LinkedBlockingQueue messages = new LinkedBlockingQueue<>(); + + @Override + public void run() { + + try { + while (runFlag) { + FrameBuffer segment = messages.take(); + try { + long address = segment.address; + VideoChannel videoChannel = videoChannelMap.get(address); + if (videoChannel != null) { + videoChannel.sendFrame(segment.data, segment.size); + } else { + onNewStreamOpen(segment); + } + } catch (Exception e) { + if (runFlag) { + e.printStackTrace(); + } else { + break; + } + } finally { + // 归还到池里 + bufferPool.returnBuffer(segment); + } + + } + } catch (InterruptedException e) { + e.printStackTrace(); + System.out.println("exit by:" + e); + } + System.out.println("exit : " + getName()); + } + + } + + private void onNewStreamOpen(FrameBuffer frame) { + byte[] sn = new byte[SN_LEN + 1]; + System.arraycopy(ByteUtil.toHexString(frame.address), 0, sn, 1, SN_LEN); + VideoChannel channel = videoServer.createChannel(sn); + videoChannelMap.put(frame.address, channel); + } + + +} diff --git a/src/org/btik/server/video/BioHttpVideoServer.java b/src/org/btik/server/video/device/web/BioHttpVideoServer.java similarity index 95% rename from src/org/btik/server/video/BioHttpVideoServer.java rename to src/org/btik/server/video/device/web/BioHttpVideoServer.java index a9fe53e42f07eaf031779926c1d51da5a73c2c16..83b62e1f225ba94d99645d026343ee04d050d0d4 100644 --- a/src/org/btik/server/video/BioHttpVideoServer.java +++ b/src/org/btik/server/video/device/web/BioHttpVideoServer.java @@ -1,9 +1,12 @@ -package org.btik.server.video; +package org.btik.server.video.device.web; -import org.btik.server.DevChannel; +import org.btik.server.video.device.iface.DevChannel; import org.btik.server.VideoServer; import org.btik.server.util.ByteUtil; +import org.btik.server.video.device.task.AsyncTaskExecutor; +import org.btik.server.video.device.iface.HttpConstant; +import org.btik.server.video.device.iface.VideoChannel; import java.io.IOException; import java.io.InputStream; diff --git a/src/org/btik/server/video/MJPEGVideoChannel.java b/src/org/btik/server/video/device/web/MJPEGVideoChannel.java similarity index 95% rename from src/org/btik/server/video/MJPEGVideoChannel.java rename to src/org/btik/server/video/device/web/MJPEGVideoChannel.java index f9c94dff4bab9054423f4b2b310b8065a578dd76..83657f967355efd46728ce09e76eb8ee33573522 100644 --- a/src/org/btik/server/video/MJPEGVideoChannel.java +++ b/src/org/btik/server/video/device/web/MJPEGVideoChannel.java @@ -1,6 +1,9 @@ -package org.btik.server.video; +package org.btik.server.video.device.web; import org.btik.server.util.ByteUtil; +import org.btik.server.video.device.task.AsyncTaskExecutor; +import org.btik.server.video.device.iface.HttpConstant; +import org.btik.server.video.device.iface.VideoChannel; import java.io.IOException; import java.io.OutputStream;