提交 20265605 编写于 作者: 云逸之

可运行版本

上级 78f2a451
......@@ -7,8 +7,10 @@
char* ssid = "test0";
const char* passwd = "12345687";
const char* host = "192.168.137.1";
const uint16_t serverUdpPort = 8004;
const uint16_t localUdpPort = 2333;
WiFiClient streamSender;
WiFiUDP streamSender;
void connectWifi(const char* ssid, const char* passphrase) {
WiFi.mode(WIFI_STA);
......@@ -77,10 +79,8 @@ void setup() {
connectWifi(ssid, passwd);
Serial.println("connect stream channel");
if (!streamSender.connect(host, 8004)) {
Serial.println("connect stream channel failed");
}
streamSender.setNoDelay(true);
streamSender.begin(WiFi.localIP(), localUdpPort);
// 发送mac地址作为设备序列号,用于摄像头频道号
uint8_t mac[6];
WiFi.macAddress(mac);
......@@ -88,15 +88,15 @@ void setup() {
sprintf(macStr, "%02X%02X%02X%02X%02X%02X", mac[0], mac[1], mac[2], mac[3],
mac[4], mac[5]);
Serial.println("sprint mac ");
streamSender.beginPacket(host, serverUdpPort);
streamSender.print(String(macStr));
streamSender.flush();
streamSender.endPacket();
}
void loop() {
camera_fb_t* fb = NULL;
size_t len = 0;
Serial.println("do loop");
uint8_t end[5] = {'j', 'p', 'e', 'g', '\n'};
while (true) {
fb = esp_camera_fb_get();
if (!fb) {
......@@ -104,9 +104,13 @@ void loop() {
return;
}
len = fb->len;
streamSender.beginPacket(host, serverUdpPort);
streamSender.write(fb->buf, len);
streamSender.write(end, 5);
streamSender.flush();
streamSender.endPacket();
// 这个库会自动将大于MTU的帧拆成多帧,而不是利用自动ip分片,
// 导致服务器端需要合并帧,此处发送空包标识结束
streamSender.beginPacket(host, serverUdpPort);
streamSender.endPacket();
esp_camera_fb_return(fb);
}
}
\ No newline at end of file
package org.btik.server;
/**
 * Device access channel: the server-side endpoint devices register with
 * (TCP or UDP implementation).
 */
public interface DevChannel {
/**
 * Length of a device channel id (serial number) in characters; the HTTP
 * server uses this to size the buffer it reads the /{sn} path into.
 *
 * @return fixed channel-id length
 */
int channelIdLen();
}
package org.btik.server.video;
import org.btik.server.DevChannel;
import org.btik.server.VideoServer;
import org.btik.server.util.ByteUtil;
......@@ -8,6 +9,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.net.*;
import java.util.Arrays;
import java.util.Date;
import java.util.Enumeration;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
......@@ -24,6 +26,8 @@ public class BioHttpVideoServer extends Thread implements HttpConstant, VideoSer
private AsyncTaskExecutor asyncTaskExecutor;
private DevChannel devChannel;
private int httpPort;
private final ConcurrentHashMap<String, MJPEGVideoChannel> videoChannelMap = new ConcurrentHashMap<>();
......@@ -36,6 +40,10 @@ public class BioHttpVideoServer extends Thread implements HttpConstant, VideoSer
this.httpPort = httpPort;
}
/**
 * Injects the device channel; its channelIdLen() sizes the buffer used to
 * read the /{sn} request path. Must be set before the server thread runs.
 *
 * @param devChannel device access channel (TCP or UDP implementation)
 */
public void setDevChannel(DevChannel devChannel) {
this.devChannel = devChannel;
}
@Override
public synchronized void start() {
super.start();
......@@ -47,7 +55,7 @@ public class BioHttpVideoServer extends Thread implements HttpConstant, VideoSer
try (ServerSocket serverSocket = new ServerSocket(httpPort)) {
byte[] uri = new byte[URI_LEN];
//channel 是 /{sn} 的形式 目前为 12位字符
byte[] channel = new byte[13];
byte[] channel = new byte[devChannel.channelIdLen() + 1];
while (runFlag) {
Socket client = serverSocket.accept();
InputStream inputStream = client.getInputStream();
......@@ -68,6 +76,7 @@ public class BioHttpVideoServer extends Thread implements HttpConstant, VideoSer
continue;
}
String channelStr = new String(channel);
System.out.println("pre open" + new Date());
executorService.submit(() -> doStreamOpen(client, channelStr));
} catch (IOException e) {
disConnect(client, e);
......
......@@ -6,6 +6,7 @@ import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;
import java.util.Collections;
import java.util.Date;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
......@@ -52,11 +53,36 @@ public class MJPEGVideoChannel implements VideoChannel, HttpConstant {
}
}
// NOTE(review): appears to be a leftover timing probe; not read anywhere in
// this block — confirm it is unused before removing.
static long l = System.currentTimeMillis();
/**
 * Sends one JPEG frame, supplied as segmented buffers, to every client
 * currently attached to this MJPEG channel.
 *
 * @param frame        segment buffers that together form one JPEG image
 * @param len          valid byte count for each segment (parallel to frame)
 * @param segmentCount number of valid entries in frame/len
 */
@Override
public void sendFrame(byte[][] frame, int[] len, int segmentCount) {
// Total payload size across all valid segments; sent as the part's length.
int allLen = 0;
for (int i = 0; i < segmentCount; i++) {
allLen += len[i];
}
byte[] lenStrBytes = ByteUtil.toString(allLen);
// clientLock guards the client set while we fan the frame out.
synchronized (clientLock) {
for (Socket client : clients) {
try {
OutputStream outputStream = client.getOutputStream();
// Order matters: boundary, then part headers with the length, then body.
sendChunk(_STREAM_BOUNDARY, outputStream);
sendChunk(outputStream, _STREAM_PART, lenStrBytes, DOUBLE_LINE);
sendChunk(outputStream, allLen, len, frame);
outputStream.flush();
} catch (IOException e) {
// Delegate write-failure handling (disconnect bookkeeping) to checkState.
checkState(client, e);
}
}
}
}
/**
* 加入频道
*/
public void joinChannel(Socket client) throws IOException {
System.out.println("open:" + client.getRemoteSocketAddress());
System.out.println("open:" + client.getRemoteSocketAddress() + " " + new Date());
OutputStream outputStream = client.getOutputStream();
outputStream.write(STREAM_RESP_HEAD_BYTES);
outputStream.flush();
......@@ -102,6 +128,20 @@ public class MJPEGVideoChannel implements VideoChannel, HttpConstant {
}
/**
 * Writes one HTTP chunk assembled from segment buffers: CRLF, the total size
 * in hex, CRLF, then each segment's valid bytes.
 *
 * @param out    client output stream
 * @param allLen total payload size (sum of the valid entries in length)
 * @param length per-segment valid byte counts; a zero entry terminates the list
 * @param chunk  segment buffers, parallel to length
 * @throws IOException on write failure
 */
void sendChunk(OutputStream out, int allLen, final int[] length, byte[][] chunk) throws IOException {
out.write(NEW_LINE);
out.write(ByteUtil.toHexString(allLen));
out.write(NEW_LINE);
// NOTE(review): iterates to length.length and relies on a 0 entry to stop;
// callers reusing a length table must zero stale entries or this over-sends.
for (int i = 0; i < length.length; i++) {
int len = length[i];
if (len == 0) {
break;
}
out.write(chunk[i], 0, len);
}
}
void disConnect(Socket socket, Exception e) {
asyncTaskExecutor.execute(() -> {
System.err.println(e.getMessage());
......
package org.btik.server.video;
import org.btik.server.video.device.BioDeviceChannel;
import org.btik.server.video.device.tcp.BioDeviceChannel;
import java.io.FileInputStream;
import java.io.IOException;
......@@ -43,7 +43,6 @@ public class Main {
bioHttpVideoServer.setHttpPort(Integer.parseInt(
getProp("http.port", "8003")));
bioHttpVideoServer.setAsyncTaskExecutor(asyncTaskExecutor);
bioHttpVideoServer.start();
BioDeviceChannel bioDeviceChannel = new BioDeviceChannel();
bioDeviceChannel.setAsyncTaskExecutor(asyncTaskExecutor);
......@@ -53,6 +52,9 @@ public class Main {
bioDeviceChannel.setStreamPort(Integer.parseInt(
getProp("stream.port", "8004")));
bioDeviceChannel.start();
bioHttpVideoServer.setDevChannel(bioDeviceChannel);
bioHttpVideoServer.start();
}
}
package org.btik.server.video;
import org.btik.server.video.device.UDPDeviceChannel;
import org.btik.server.video.device.udp.BufferPool;
import org.btik.server.video.device.udp.UDPDeviceChannel;
import java.io.FileInputStream;
import java.io.IOException;
......@@ -24,12 +25,12 @@ public class UDPMain {
* @param key 配置key
* @param def 获取为空时的默认值
*/
private static String getProp(String key, String def) {
private static int getIntProp(String key, int def) {
Object o = properties.get(key);
if (o == null) {
return def;
}
return String.valueOf(o);
return Integer.parseInt(o.toString());
}
public static void main(String[] args) {
......@@ -37,18 +38,21 @@ public class UDPMain {
asyncTaskExecutor.start();
BioHttpVideoServer bioHttpVideoServer = new BioHttpVideoServer();
bioHttpVideoServer.setHttpPort(Integer.parseInt(
getProp("http.port", "8003")));
bioHttpVideoServer.setHttpPort(getIntProp("http.port", 8003));
bioHttpVideoServer.setAsyncTaskExecutor(asyncTaskExecutor);
bioHttpVideoServer.start();
BufferPool bufferPool = new BufferPool();
bufferPool.setBufferPoolSize(getIntProp("udp.video.buffer.pool.size", 500));
UDPDeviceChannel deviceChannel = new UDPDeviceChannel();
deviceChannel.setAsyncTaskExecutor(asyncTaskExecutor);
deviceChannel.setVideoServer(bioHttpVideoServer);
deviceChannel.setStreamPort(Integer.parseInt(
getProp("stream.port", "8004")));
deviceChannel.setBufferPoolSize(Integer.parseInt(getProp("udp.video.buffer.pool.size", "500")));
deviceChannel.setDispatcherPoolSize(Integer.parseInt(getProp("udp.video.dispatcher.thread.size", "8")));
deviceChannel.setBufferPool(bufferPool);
deviceChannel.setStreamPort(getIntProp("stream.port", 8004));
deviceChannel.setDispatcherPoolSize(getIntProp("udp.video.dispatcher.thread.size", 8));
deviceChannel.start();
bioHttpVideoServer.setDevChannel(deviceChannel);
bioHttpVideoServer.start();
}
}
......@@ -7,4 +7,11 @@ public interface VideoChannel {
* @param frame 一帧jpeg
*/
void sendFrame(byte[] frame, int len);
/**
 * Sends one JPEG frame, supplied as a list of segment buffers, to the
 * clients of this channel.
 *
 * @param frame        segment buffers making up one JPEG frame
 * @param len          valid byte count of each segment (parallel to frame)
 * @param segmentCount number of valid entries in frame/len
 */
void sendFrame(byte[][] frame, int[] len, int segmentCount);
}
package org.btik.server.video.device;
package org.btik.server.video.device.tcp;
import org.btik.server.DevChannel;
import org.btik.server.VideoServer;
import org.btik.server.video.AsyncTaskExecutor;
import org.btik.server.video.VideoChannel;
......@@ -18,7 +19,7 @@ import java.util.concurrent.ConcurrentHashMap;
/**
* 发送帧设备接入通道
*/
public class BioDeviceChannel extends Thread {
public class BioDeviceChannel extends Thread implements DevChannel {
private static final int SN_LEN = 12;
private boolean runFlag = true;
......@@ -136,4 +137,9 @@ public class BioDeviceChannel extends Thread {
public void setVideoServer(VideoServer videoServer) {
this.videoServer = videoServer;
}
@Override
public int channelIdLen() {
// TCP devices identify themselves with a fixed 12-character serial number.
return SN_LEN;
}
}
package org.btik.server.video.device;
package org.btik.server.video.device.tcp;
import org.btik.server.VideoServer;
import org.btik.server.video.VideoChannel;
import java.io.ByteArrayOutputStream;
......
package org.btik.server.video.device;
package org.btik.server.video.device.tcp;
import org.btik.server.VideoServer;
import org.btik.server.video.VideoChannel;
import org.btik.server.video.device.tcp.FrameBuffer;
import java.io.IOException;
......
package org.btik.server.video.device.udp;
import java.util.concurrent.ConcurrentLinkedQueue;
public class BufferPool {
    /**
     * Size of each receive buffer in bytes (40 KB). Unlike TCP, a UDP
     * datagram larger than this buffer is truncated, so the complete image
     * cannot be recovered from an oversized segment.
     */
    private static final int RECEIVE_BUFFER_SIZE = 40960;

    /**
     * Pool of frame-segment buffers, recycled to avoid allocating a fresh
     * receive buffer for every datagram.
     */
    private final ConcurrentLinkedQueue<FrameSegmentBuffer> frameSegmentBufferPool = new ConcurrentLinkedQueue<>();

    /**
     * Pool of raw byte arrays handed out as replacement backing storage for
     * segment buffers whose data array was captured by a frame.
     */
    private final ConcurrentLinkedQueue<byte[]> frameSegmentBufferDataPool = new ConcurrentLinkedQueue<>();

    /**
     * Current pool size. The pools also grow automatically under load, so
     * this is a starting point; raise it as the number of devices increases.
     */
    private int bufferPoolSize = 500;

    public BufferPool() {
        init();
    }

    /**
     * Resizes the pools. BUGFIX: the constructor pre-fills the pools before
     * callers can configure the size, so previously this setter only stored
     * the field and had no effect on the already-filled pools. It now grows
     * the pools by the difference when a larger size is requested (shrinking
     * is left to natural consumption).
     *
     * @param bufferPoolSize desired pool size
     */
    public void setBufferPoolSize(int bufferPoolSize) {
        int delta = bufferPoolSize - this.bufferPoolSize;
        this.bufferPoolSize = bufferPoolSize;
        for (int i = 0; i < delta; i++) {
            frameSegmentBufferPool.add(new FrameSegmentBuffer(new byte[RECEIVE_BUFFER_SIZE]));
        }
        // Keep the 3:1 data-array-to-segment-buffer ratio used by init().
        for (int i = 0; i < delta * 3; i++) {
            frameSegmentBufferDataPool.add(new byte[RECEIVE_BUFFER_SIZE]);
        }
    }

    /** Pre-fills both pools; the data pool holds three arrays per segment buffer. */
    private void init() {
        for (int i = 0; i < bufferPoolSize; i++) {
            frameSegmentBufferPool.add(new FrameSegmentBuffer(new byte[RECEIVE_BUFFER_SIZE]));
        }
        for (int i = 0; i < bufferPoolSize * 3; i++) {
            frameSegmentBufferDataPool.add(new byte[RECEIVE_BUFFER_SIZE]);
        }
    }

    /**
     * Takes a segment buffer from the pool, allocating a new one (automatic
     * growth) when the pool is exhausted.
     */
    public FrameSegmentBuffer getFrameBuffer() {
        FrameSegmentBuffer buffer = frameSegmentBufferPool.poll();
        if (buffer == null) {
            System.out.println("mem up");
            buffer = new FrameSegmentBuffer(new byte[RECEIVE_BUFFER_SIZE]);
        }
        return buffer;
    }

    /** Returns a segment buffer to the pool; null is ignored (matches returnBufferData). */
    public void returnBuffer(FrameSegmentBuffer buffer) {
        if (buffer != null) {
            frameSegmentBufferPool.add(buffer);
        }
    }

    /**
     * Takes a raw backing array from the data pool, allocating a new one
     * (automatic growth) when the pool is exhausted.
     */
    public byte[] getBuffer() {
        byte[] buffer = frameSegmentBufferDataPool.poll();
        if (buffer == null) {
            System.out.println("data mem up");
            buffer = new byte[RECEIVE_BUFFER_SIZE];
        }
        return buffer;
    }

    /** Returns a raw backing array to the data pool; null is ignored. */
    public void returnBufferData(byte[] buffer) {
        if (buffer != null) {
            frameSegmentBufferDataPool.add(buffer);
        }
    }
}
package org.btik.server.video.device.udp;
/**
 * One frame segment: a single received UDP datagram plus the identity of the
 * device that sent it. Instances and their backing arrays are recycled via
 * BufferPool.
 */
public class FrameSegmentBuffer {
// Packed sender identity (original note: 2+4+2 bytes — two zero bytes,
// 4-byte IP, 2-byte port). NOTE(review): the producer actually packs
// address.hashCode() << 16 | port — confirm the byte-layout comment.
long address;
// Raw datagram payload; the backing array may be swapped out by the pool.
byte[] data;
// Number of valid bytes in data for this datagram.
int size;
public FrameSegmentBuffer(byte[] data) {
this.data = data;
}
}
package org.btik.server.video.device;
package org.btik.server.video.device.udp;
import org.btik.server.DevChannel;
import org.btik.server.VideoServer;
import org.btik.server.util.ByteUtil;
import org.btik.server.util.NamePrefixThreadFactory;
import org.btik.server.video.AsyncTaskExecutor;
import org.btik.server.video.VideoChannel;
......@@ -15,19 +17,8 @@ import java.util.concurrent.*;
/**
* 发送帧设备接入通道
*/
public class UDPDeviceChannel extends Thread {
/**
* 接收图片缓冲区大小,<br>
* 与TCP不同的是,若图片大于当前帧大小,会截断则无法得到完整图片,默认40KB
*/
private static final int RECEIVE_BUFFER_SIZE = 40960;
/**
* 初始缓存区池大小,本身会自动扩容,随着设备增多可以设置合理值
*/
private int bufferPoolSize = 500;
private static final int SN_LEN = 12;
public class UDPDeviceChannel extends Thread implements DevChannel {
private static final int SN_LEN = 16;
private volatile boolean runFlag = true;
/**
......@@ -41,6 +32,8 @@ public class UDPDeviceChannel extends Thread {
private ExecutorService executorService;
private BufferPool bufferPool;
private FrameDispatcher[] frameDispatchers;
......@@ -61,6 +54,10 @@ public class UDPDeviceChannel extends Thread {
this.asyncTaskExecutor = asyncTaskExecutor;
}
/**
 * Injects the shared buffer pool used to recycle datagram receive buffers.
 * Must be set before the channel thread starts receiving.
 *
 * @param bufferPool pool instance
 */
public void setBufferPool(BufferPool bufferPool) {
this.bufferPool = bufferPool;
}
/**
* 可选的输入值 1 2 4 8 16 32 64 128 256几个数字,根据cpu核数和设备的数量选择合适的值
* ,输入其它值也会被映射到以上值,如果只有一个摄像头设备那就一个足够,线程数太多而cpu核数过少,
......@@ -72,26 +69,15 @@ public class UDPDeviceChannel extends Thread {
this.dispatcherPoolSize = (n < 0) ? 1 : (n >= maximumCapacity) ? maximumCapacity : n + 1;
}
public void setBufferPoolSize(int bufferPoolSize) {
this.bufferPoolSize = bufferPoolSize;
}
/**
* 帧缓冲池,避免反复new帧缓冲区
*/
private final ConcurrentLinkedQueue<FrameBuffer> frameBufferPool = new ConcurrentLinkedQueue<>();
private final ConcurrentHashMap<Long, VideoChannel> videoChannelMap = new ConcurrentHashMap<>();
private final ConcurrentHashMap<Long, UDPDev> devMap = new ConcurrentHashMap<>();
@Override
public synchronized void start() {
System.out.println("init buffer pool");
for (int i = 0; i < bufferPoolSize; i++) {
frameBufferPool.add(new FrameBuffer(new byte[RECEIVE_BUFFER_SIZE]));
}
super.start();
System.out.println("start dispatchers");
frameDispatchers = new FrameDispatcher[dispatcherPoolSize];
executorService = new ThreadPoolExecutor(dispatcherPoolSize, dispatcherPoolSize,
......@@ -103,22 +89,24 @@ public class UDPDeviceChannel extends Thread {
}
System.out.println("udp channel loaded");
super.start();
}
@Override
public void run() {
try (DatagramSocket serverSocket = new DatagramSocket(streamPort)) {
FrameBuffer frameBuffer = getFrameBuffer();
DatagramPacket datagramPacket = new DatagramPacket(frameBuffer.data, 0, frameBuffer.data.length);
FrameSegmentBuffer frameSegmentBuffer = bufferPool.getFrameBuffer();
DatagramPacket datagramPacket = new DatagramPacket(frameSegmentBuffer.data, 0, frameSegmentBuffer.data.length);
while (runFlag) {
serverSocket.receive(datagramPacket);
InetAddress address = datagramPacket.getAddress();
frameBuffer.address = (long) address.hashCode() << 16 | datagramPacket.getPort();
frameBuffer.size = datagramPacket.getLength();
frameDispatchers[(int) (frameBuffer.address & dispatcherPoolSize - 1)].messages.add(frameBuffer);
frameSegmentBuffer.address = (long) address.hashCode() << 16 | datagramPacket.getPort();
frameSegmentBuffer.size = datagramPacket.getLength();
frameDispatchers[(int) (frameSegmentBuffer.address & dispatcherPoolSize - 1)].messages.add(frameSegmentBuffer);
// 切换缓冲区
frameBuffer = getFrameBuffer();
frameSegmentBuffer = bufferPool.getFrameBuffer();
datagramPacket.setData(frameSegmentBuffer.data);
}
} catch (IOException e) {
System.out.println(" start server failed:" + e.getMessage());
......@@ -129,77 +117,118 @@ public class UDPDeviceChannel extends Thread {
runFlag = false;
// 无消息导致阻塞时,没有读到flag,帮助退出阻塞
for (FrameDispatcher frameDispatcher : frameDispatchers) {
frameDispatcher.messages.add(new FrameBuffer(new byte[0]));
frameDispatcher.messages.add(new FrameSegmentBuffer(new byte[0]));
}
// 线程池核心线程也需要停止
executorService.shutdown();
}
private FrameBuffer getFrameBuffer() {
FrameBuffer buffer = frameBufferPool.poll();
if (buffer == null) {
// 自动扩容
buffer = new FrameBuffer(new byte[RECEIVE_BUFFER_SIZE]);
}
return buffer;
@Override
public int channelIdLen() {
// UDP channel ids are the 16-character hex form of the packed device
// address (see onNewStreamOpen), hence SN_LEN = 16 here.
return SN_LEN;
}
/**
* 单帧图片
*/
static class FrameBuffer {
// 2 + 4 + 2字节 2 字节的0 4字节ip 2字节端口
static class UDPDev {
byte[][] frame;
int[] sizeTable;
VideoChannel videoChannel;
BufferPool bufferPool;
long address;
byte[] data;
int segmentIndex = 0;
int size;
public UDPDev(byte[][] frame, VideoChannel videoChannel, BufferPool bufferPool, long address) {
this.frame = frame;
this.sizeTable = new int[frame.length];
this.videoChannel = videoChannel;
this.address = address;
this.bufferPool = bufferPool;
}
public FrameBuffer(byte[] data) {
this.data = data;
public void appendSegment(FrameSegmentBuffer frameSegmentBuffer) {
int size = frameSegmentBuffer.size;
// 判断结束标识
if (size == 0) {
videoChannel.sendFrame(frame, sizeTable, segmentIndex + 1);
free();
} else {
try {
sizeTable[segmentIndex] = size;
} catch (IndexOutOfBoundsException e) {
e.printStackTrace();
// 将指针复位,防止累积后持续越界
free();
return;
}
byte[] data = frameSegmentBuffer.data;
frame[segmentIndex++] = data;
// 被帧空间指向后,为帧片段重新分配内存
frameSegmentBuffer.data = bufferPool.getBuffer();
}
}
void free() {
for (int i = 0; i < segmentIndex; i++) {
bufferPool.returnBufferData(frame[i]);
frame[i] = null;
sizeTable[i] = 0;
}
segmentIndex = 0;
}
}
class FrameDispatcher implements Runnable {
LinkedBlockingQueue<FrameBuffer> messages = new LinkedBlockingQueue<>();
LinkedBlockingQueue<FrameSegmentBuffer> messages = new LinkedBlockingQueue<>();
@Override
public void run() {
try {
while (runFlag) {
FrameBuffer frame = messages.take();
long l = System.currentTimeMillis();
FrameSegmentBuffer segment = messages.take();
try {
byte[] data = frame.data;
int length = data.length;
if (length == SN_LEN) {
asyncTaskExecutor.execute(() -> onNewStreamOpen(frame));
continue;
long address = segment.address;
UDPDev dev = devMap.get(address);
if (dev != null) {
dev.appendSegment(segment);
} else {
onNewStreamOpen(segment);
}
long address = frame.address;
VideoChannel channel = videoChannelMap.get(address);
if (channel != null) {
channel.sendFrame(data, length);
} catch (Exception e) {
if (runFlag) {
e.printStackTrace();
} else {
break;
}
} finally {
// 归还到池里
frameBufferPool.add(frame);
bufferPool.returnBuffer(segment);
}
}
} catch (InterruptedException e) {
e.printStackTrace();
System.out.println("exit by:" + e);
}
System.out.println("exit : " + getName());
}
}
private void onNewStreamOpen(FrameBuffer frame) {
private void onNewStreamOpen(FrameSegmentBuffer frame) {
byte[] sn = new byte[SN_LEN + 1];
System.arraycopy(frame.data, 0, sn, 1, SN_LEN);
System.arraycopy(ByteUtil.toHexString(frame.address), 0, sn, 1, SN_LEN);
VideoChannel channel = videoServer.createChannel(sn);
videoChannelMap.put(frame.address, channel);
// 归还单帧缓冲区
frameBufferPool.add(frame);
devMap.put(frame.address, new UDPDev(new byte[200][], channel, bufferPool, frame.address));
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册