提交 201a3227 编写于 作者: 云逸之

修改udp发送帧大小,使用单帧ip报文发送单帧图片

上级 20265605
......@@ -2,15 +2,16 @@
#include <WiFi.h>
#include "esp_camera.h"
#define CAMERA_MODEL_AI_THINKER
#include "UdpClient.hpp"
#include "ai_thinker_esp32_cam_meta.h"
char* ssid = "test0";
const char* ssid = "test0";
const char* passwd = "12345687";
const char* host = "192.168.137.1";
const uint16_t serverUdpPort = 8004;
const uint16_t localUdpPort = 2333;
WiFiUDP streamSender;
LightUDP streamSender;
void connectWifi(const char* ssid, const char* passphrase) {
WiFi.mode(WIFI_STA);
......@@ -75,28 +76,18 @@ void setup() {
Serial.println("get sensor ");
sensor_t* s = esp_camera_sensor_get();
// drop down frame size for higher initial frame rate
s->set_framesize(s, FRAMESIZE_VGA);
s->set_framesize(s, FRAMESIZE_SVGA);
connectWifi(ssid, passwd);
Serial.println("connect stream channel");
streamSender.begin(WiFi.localIP(), localUdpPort);
// 发送mac地址作为设备序列号,用于摄像头频道号
uint8_t mac[6];
WiFi.macAddress(mac);
char macStr[12] = {0};
sprintf(macStr, "%02X%02X%02X%02X%02X%02X", mac[0], mac[1], mac[2], mac[3],
mac[4], mac[5]);
Serial.println("sprint mac ");
streamSender.beginPacket(host, serverUdpPort);
streamSender.print(String(macStr));
streamSender.endPacket();
streamSender.setServer(host, serverUdpPort);
}
void loop() {
camera_fb_t* fb = NULL;
size_t len = 0;
Serial.println("do loop");
while (true) {
fb = esp_camera_fb_get();
if (!fb) {
......@@ -104,13 +95,7 @@ void loop() {
return;
}
len = fb->len;
streamSender.beginPacket(host, serverUdpPort);
streamSender.write(fb->buf, len);
streamSender.endPacket();
// 这个库会自动将大于MTU的帧拆成多帧,而不是利用自动ip分片,
// 导致服务器端需要合并帧,此处发送空包标识结束
streamSender.beginPacket(host, serverUdpPort);
streamSender.endPacket();
streamSender.send(fb->buf, len);
esp_camera_fb_return(fb);
}
}
\ No newline at end of file
package org.btik.server;
import org.btik.server.video.VideoChannel;
import org.btik.server.video.device.iface.VideoChannel;
/**
* 视频服务
......
package org.btik.server.video;
import org.btik.server.video.device.task.AsyncTaskExecutor;
import org.btik.server.video.device.tcp.BioDeviceChannel;
import org.btik.server.video.device.web.BioHttpVideoServer;
import java.io.FileInputStream;
import java.io.IOException;
......
package org.btik.server.video;
import org.btik.server.video.device.task.AsyncTaskExecutor;
import org.btik.server.video.device.udp2.BufferPool;
import org.btik.server.video.device.udp2.NewUDPDeviceChannel;
import org.btik.server.video.device.web.BioHttpVideoServer;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Properties;
public class UDP2Main {

    /** Server configuration, loaded once when the class is initialised. */
    static Properties properties;

    static {
        properties = new Properties();
        // FIX: the original opened the FileInputStream without ever closing it,
        // leaking the file handle on both the success and the failure path.
        // try-with-resources guarantees the stream is closed.
        try (FileInputStream in = new FileInputStream("light-video.properties")) {
            properties.load(in);
        } catch (IOException e) {
            // Configuration is mandatory; fail fast at class-load time.
            throw new RuntimeException(e);
        }
    }

    /**
     * Reads an integer configuration value.
     *
     * @param key property key
     * @param def default returned when the key is absent
     * @return the configured value, or {@code def} when the key is missing
     * @throws NumberFormatException if the configured value is not a valid integer
     */
    private static int getIntProp(String key, int def) {
        Object o = properties.get(key);
        if (o == null) {
            return def;
        }
        return Integer.parseInt(o.toString());
    }

    /**
     * Wires together the async task executor, the HTTP video server and the
     * UDP device channel (udp2 variant), then starts both servers.
     */
    public static void main(String[] args) {
        AsyncTaskExecutor asyncTaskExecutor = new AsyncTaskExecutor();
        asyncTaskExecutor.start();

        BioHttpVideoServer bioHttpVideoServer = new BioHttpVideoServer();
        bioHttpVideoServer.setHttpPort(getIntProp("http.port", 8003));
        bioHttpVideoServer.setAsyncTaskExecutor(asyncTaskExecutor);

        BufferPool bufferPool = new BufferPool();
        bufferPool.setBufferPoolSize(getIntProp("udp.video.buffer.pool.size", 500));

        NewUDPDeviceChannel deviceChannel = new NewUDPDeviceChannel();
        deviceChannel.setVideoServer(bioHttpVideoServer);
        deviceChannel.setBufferPool(bufferPool);
        deviceChannel.setStreamPort(getIntProp("stream.port", 8004));
        deviceChannel.setDispatcherPoolSize(getIntProp("udp.video.dispatcher.thread.size", 8));
        deviceChannel.start();

        bioHttpVideoServer.setDevChannel(deviceChannel);
        bioHttpVideoServer.start();
    }
}
package org.btik.server.video;
import org.btik.server.video.device.task.AsyncTaskExecutor;
import org.btik.server.video.device.udp.BufferPool;
import org.btik.server.video.device.udp.UDPDeviceChannel;
import org.btik.server.video.device.web.BioHttpVideoServer;
import java.io.FileInputStream;
import java.io.IOException;
......
package org.btik.server;
package org.btik.server.video.device.iface;
/**
* 设备通道
......
package org.btik.server.video;
package org.btik.server.video.device.iface;
import java.nio.charset.StandardCharsets;
......
package org.btik.server.video;
package org.btik.server.video.device.iface;
public interface VideoChannel {
/**
......
package org.btik.server.video;
package org.btik.server.video.device.task;
import java.util.concurrent.LinkedBlockingQueue;
......
package org.btik.server.video.device.tcp;
import org.btik.server.DevChannel;
import org.btik.server.video.device.iface.DevChannel;
import org.btik.server.VideoServer;
import org.btik.server.video.AsyncTaskExecutor;
import org.btik.server.video.VideoChannel;
import org.btik.server.video.device.task.AsyncTaskExecutor;
import org.btik.server.video.device.iface.VideoChannel;
import java.io.IOException;
......
package org.btik.server.video.device.tcp;
import org.btik.server.video.VideoChannel;
import org.btik.server.video.device.iface.VideoChannel;
import java.io.ByteArrayOutputStream;
......
package org.btik.server.video.device.tcp;
import org.btik.server.video.VideoChannel;
import org.btik.server.video.device.tcp.FrameBuffer;
import org.btik.server.video.device.iface.VideoChannel;
import java.io.IOException;
......
package org.btik.server.video.device.udp;
import org.btik.server.DevChannel;
import org.btik.server.video.device.iface.DevChannel;
import org.btik.server.VideoServer;
import org.btik.server.util.ByteUtil;
import org.btik.server.util.NamePrefixThreadFactory;
import org.btik.server.video.AsyncTaskExecutor;
import org.btik.server.video.VideoChannel;
import org.btik.server.video.device.task.AsyncTaskExecutor;
import org.btik.server.video.device.iface.VideoChannel;
import java.io.IOException;
import java.net.DatagramPacket;
......
package org.btik.server.video.device.udp2;
import java.util.concurrent.ConcurrentLinkedQueue;
public class BufferPool {

    /**
     * Size of one receive buffer (64512 bytes ≈ 63 KB — the original comment's
     * "40KB" did not match this constant). Unlike the TCP channel, a picture
     * larger than this buffer is truncated by the datagram receive and the
     * complete image cannot be recovered.
     */
    private static final int RECEIVE_BUFFER_SIZE = 64512;

    /** Reusable frame buffers, so a buffer is not re-allocated per datagram. */
    private final ConcurrentLinkedQueue<FrameBuffer> frameBufferPool = new ConcurrentLinkedQueue<>();

    /**
     * Current pre-allocated pool size. The pool also grows automatically when
     * it runs dry, so this is a warm-start optimisation, not a hard limit.
     */
    private int bufferPoolSize = 500;

    public BufferPool() {
        init();
    }

    /**
     * Adjusts the pre-allocated pool size.
     *
     * <p>FIX: the constructor fills the pool before callers ever get a chance
     * to invoke this setter (see UDP2Main: {@code new BufferPool()} is followed
     * by {@code setBufferPoolSize(...)}), so the original implementation
     * silently ignored the configured size. The pool is now topped up to the
     * requested size; shrinking is not attempted because the pool grows on
     * demand anyway.
     *
     * @param bufferPoolSize desired number of pre-allocated buffers
     */
    public void setBufferPoolSize(int bufferPoolSize) {
        int missing = bufferPoolSize - this.bufferPoolSize;
        this.bufferPoolSize = bufferPoolSize;
        for (int i = 0; i < missing; i++) {
            frameBufferPool.add(new FrameBuffer(new byte[RECEIVE_BUFFER_SIZE]));
        }
    }

    /** Pre-fills the pool with {@code bufferPoolSize} buffers. */
    private void init() {
        for (int i = 0; i < bufferPoolSize; i++) {
            frameBufferPool.add(new FrameBuffer(new byte[RECEIVE_BUFFER_SIZE]));
        }
    }

    /**
     * Takes a buffer from the pool. When the pool is exhausted a fresh buffer
     * is allocated (auto-expansion) and "mem up" is logged to stdout.
     *
     * @return a buffer ready to receive one datagram; never {@code null}
     */
    public FrameBuffer getFrameBuffer() {
        FrameBuffer buffer = frameBufferPool.poll();
        if (buffer == null) {
            System.out.println("mem up");
            // auto-expand: allocate beyond the configured pool size
            buffer = new FrameBuffer(new byte[RECEIVE_BUFFER_SIZE]);
        }
        return buffer;
    }

    /** Returns a buffer to the pool for reuse. */
    public void returnBuffer(FrameBuffer buffer) {
        frameBufferPool.add(buffer);
    }
}
package org.btik.server.video.device.udp2;
public class FrameBuffer {
    // Sender identity packed into one long. Original note: "2 + 4 + 2 bytes:
    // 2 bytes of 0, 4-byte ip, 2-byte port" — i.e. the source InetAddress
    // hashCode shifted left 16 bits OR'd with the UDP source port
    // (packed in NewUDPDeviceChannel.run).
    long address;
    // Raw datagram payload; backing array comes from BufferPool and is reused.
    byte[] data;
    // Number of valid bytes in {@code data} for the current datagram.
    int size;
    public FrameBuffer(byte[] data) {
        this.data = data;
    }
}
package org.btik.server.video.device.udp2;
import org.btik.server.VideoServer;
import org.btik.server.util.ByteUtil;
import org.btik.server.util.NamePrefixThreadFactory;
import org.btik.server.video.device.iface.DevChannel;
import org.btik.server.video.device.iface.VideoChannel;
import org.btik.server.video.device.task.AsyncTaskExecutor;
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.util.HashMap;
import java.util.concurrent.*;
public class NewUDPDeviceChannel extends Thread implements DevChannel {

    /** Length in bytes of the channel id (hex serial number) derived from the sender address. */
    private static final int SN_LEN = 16;

    private volatile boolean runFlag = true;

    /**
     * Frame-channel UDP port.
     */
    private int streamPort;

    private VideoServer videoServer;

    private ExecutorService executorService;

    private BufferPool bufferPool;

    private FrameDispatcher[] frameDispatchers;

    /**
     * Number of frame-dispatcher threads; raise it as the device count grows.
     */
    private int dispatcherPoolSize = 8;

    /**
     * FIX: was a plain HashMap shared by every dispatcher thread. Frames from
     * one address always hash to the same dispatcher, but different dispatchers
     * still get/put concurrently for different addresses, so the map must be
     * thread-safe.
     */
    private final ConcurrentHashMap<Long, VideoChannel> videoChannelMap = new ConcurrentHashMap<>();

    public void setStreamPort(int streamPort) {
        this.streamPort = streamPort;
    }

    public void setVideoServer(VideoServer videoServer) {
        this.videoServer = videoServer;
    }

    public void setBufferPool(BufferPool bufferPool) {
        this.bufferPool = bufferPool;
    }

    /**
     * Valid inputs are 1 2 4 8 16 32 64 128 256; any other value is rounded up
     * to the next power of two (capped at 256) so that
     * {@code address & (size - 1)} works as a cheap modulo in {@link #run()}.
     * Choose based on CPU cores and device count: a single camera needs only
     * one thread, and more threads than cores just adds context-switch
     * overhead.
     */
    public void setDispatcherPoolSize(int dispatcherPoolSize) {
        int maximumCapacity = 256;
        // Same round-up-to-power-of-two trick HashMap uses for table sizing.
        int n = -1 >>> Integer.numberOfLeadingZeros(dispatcherPoolSize - 1);
        this.dispatcherPoolSize = (n < 0) ? 1 : (n >= maximumCapacity) ? maximumCapacity : n + 1;
    }

    /**
     * Starts the dispatcher thread pool, then starts this receiver thread.
     */
    @Override
    public synchronized void start() {
        System.out.println("init buffer pool");
        System.out.println("start dispatchers");
        frameDispatchers = new FrameDispatcher[dispatcherPoolSize];
        executorService = new ThreadPoolExecutor(dispatcherPoolSize, dispatcherPoolSize,
                0, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), new NamePrefixThreadFactory("frameDispatcher"));
        for (int i = 0; i < dispatcherPoolSize; i++) {
            FrameDispatcher msgDispatcher = new FrameDispatcher();
            frameDispatchers[i] = msgDispatcher;
            executorService.submit(msgDispatcher);
        }
        System.out.println("udp channel loaded");
        super.start();
    }

    /**
     * Receive loop: reads one datagram per FrameBuffer, stamps the sender
     * identity, and hands the buffer to the dispatcher selected by
     * {@code address & (dispatcherPoolSize - 1)} — so a given device is always
     * served by the same dispatcher (per-device ordering is preserved).
     */
    @Override
    public void run() {
        try (DatagramSocket serverSocket = new DatagramSocket(streamPort)) {
            FrameBuffer frameBuffer = bufferPool.getFrameBuffer();
            DatagramPacket datagramPacket = new DatagramPacket(frameBuffer.data, 0, frameBuffer.data.length);
            while (runFlag) {
                serverSocket.receive(datagramPacket);
                InetAddress address = datagramPacket.getAddress();
                // Pack sender identity: address hash in the high bits, UDP
                // source port in the low 16 bits.
                frameBuffer.address = (long) address.hashCode() << 16 | datagramPacket.getPort();
                frameBuffer.size = datagramPacket.getLength();
                frameDispatchers[(int) (frameBuffer.address & dispatcherPoolSize - 1)].messages.add(frameBuffer);
                // Swap in a fresh buffer before the next receive; the previous
                // one is now owned by the dispatcher.
                frameBuffer = bufferPool.getFrameBuffer();
                datagramPacket.setData(frameBuffer.data);
            }
        } catch (IOException e) {
            System.out.println(" start server failed:" + e.getMessage());
        }
    }

    /**
     * Stops the receiver and the dispatcher pool. Safe to call even if
     * {@link #start()} was never invoked (FIX: the original NPE'd on the null
     * dispatcher array in that case).
     *
     * <p>NOTE(review): the blocking {@code receive()} in {@link #run()} is not
     * interrupted here; the receiver only notices {@code runFlag} after the
     * next datagram arrives — confirm whether closing the socket is wanted.
     */
    public void shutDown() {
        runFlag = false;
        if (frameDispatchers != null) {
            // A dispatcher blocked on an empty queue never re-reads runFlag;
            // feed each one an empty buffer to wake it up.
            for (FrameDispatcher frameDispatcher : frameDispatchers) {
                frameDispatcher.messages.add(new FrameBuffer(new byte[0]));
            }
        }
        if (executorService != null) {
            // Core pool threads must be stopped as well.
            executorService.shutdown();
        }
    }

    @Override
    public int channelIdLen() {
        return SN_LEN;
    }

    /**
     * Per-device reassembly state.
     * NOTE(review): appears unused in this class as shown; {@code sizeTable}
     * is allocated with one slot per frame BYTE, which looks oversized for a
     * segment-size table — confirm intent before relying on it.
     */
    static class UDPDev {
        byte[] frame;
        int[] sizeTable;
        VideoChannel videoChannel;
        BufferPool bufferPool;
        long address;
        int segmentIndex = 0;

        public UDPDev(byte[] frame, VideoChannel videoChannel, BufferPool bufferPool, long address) {
            this.frame = frame;
            this.sizeTable = new int[frame.length];
            this.videoChannel = videoChannel;
            this.address = address;
            this.bufferPool = bufferPool;
        }
    }

    /**
     * Consumes FrameBuffers from its queue and forwards each datagram to the
     * video channel registered for the sender, creating the channel on first
     * contact. Buffers are always returned to the pool.
     */
    class FrameDispatcher implements Runnable {
        LinkedBlockingQueue<FrameBuffer> messages = new LinkedBlockingQueue<>();

        @Override
        public void run() {
            try {
                while (runFlag) {
                    FrameBuffer segment = messages.take();
                    try {
                        long address = segment.address;
                        VideoChannel videoChannel = videoChannelMap.get(address);
                        if (videoChannel != null) {
                            videoChannel.sendFrame(segment.data, segment.size);
                        } else {
                            onNewStreamOpen(segment);
                        }
                    } catch (Exception e) {
                        if (runFlag) {
                            e.printStackTrace();
                        } else {
                            break;
                        }
                    } finally {
                        // Return the buffer to the pool in every case.
                        bufferPool.returnBuffer(segment);
                    }
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
                System.out.println("exit by:" + e);
            }
            System.out.println("exit : " + getName());
        }
    }

    /**
     * Registers a video channel for a previously unseen sender. The channel id
     * is the sender address rendered as hex into bytes 1..16 (byte 0 stays 0).
     * NOTE(review): the payload of this first datagram is NOT forwarded —
     * presumably acceptable because the device streams frames continuously;
     * confirm against the firmware.
     */
    private void onNewStreamOpen(FrameBuffer frame) {
        byte[] sn = new byte[SN_LEN + 1];
        System.arraycopy(ByteUtil.toHexString(frame.address), 0, sn, 1, SN_LEN);
        VideoChannel channel = videoServer.createChannel(sn);
        videoChannelMap.put(frame.address, channel);
    }
}
package org.btik.server.video;
package org.btik.server.video.device.web;
import org.btik.server.DevChannel;
import org.btik.server.video.device.iface.DevChannel;
import org.btik.server.VideoServer;
import org.btik.server.util.ByteUtil;
import org.btik.server.video.device.task.AsyncTaskExecutor;
import org.btik.server.video.device.iface.HttpConstant;
import org.btik.server.video.device.iface.VideoChannel;
import java.io.IOException;
import java.io.InputStream;
......
package org.btik.server.video;
package org.btik.server.video.device.web;
import org.btik.server.util.ByteUtil;
import org.btik.server.video.device.task.AsyncTaskExecutor;
import org.btik.server.video.device.iface.HttpConstant;
import org.btik.server.video.device.iface.VideoChannel;
import java.io.IOException;
import java.io.OutputStream;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册