提交 96b8c05f 编写于 作者: 云逸之's avatar 云逸之 💬

绑定sn部分提交

上级 31b1abd1
......@@ -65,4 +65,27 @@ void LightUDP::send(uint8_t* buf, size_t len) {
if (sent < 0) {
Serial.printf("could not send data: %d\n", errno);
}
}
\ No newline at end of file
}
void LightUDP::recv() {
struct sockaddr_in si_other{};
int slen = sizeof(si_other) , len;
char * buf = new char[1460];
if(!buf){
}
if ((len = recvfrom(udp_server, buf, 1460, MSG_DONTWAIT, (struct sockaddr *) &si_other, (socklen_t *)&slen)) == -1){
delete[] buf;
if(errno == EWOULDBLOCK){
}
log_e("could not receive data: %d", errno);
}
remote_ip = IPAddress(si_other.sin_addr.s_addr);
remote_port = ntohs(si_other.sin_port);
if (len > 0) {
}
delete[] buf;
}
......@@ -15,4 +15,6 @@ class LightUDP {
void begin(IPAddress address, uint16_t port);
void setServer(const char* host, uint16_t port);
void send(uint8_t* buf, size_t len);
void recv();
};
\ No newline at end of file
......@@ -2,5 +2,7 @@ http.port=8003
#Not available when using UDP channel
http.clients.limit=10
stream.port=8004
stream.bind.port=8005
udp.video.buffer.pool.size=500
udp.video.dispatcher.thread.size=8
\ No newline at end of file
udp.video.dispatcher.thread.size=8
udp.video.channel.size=128
\ No newline at end of file
......@@ -236,5 +236,28 @@ public class ByteUtil {
return ipStr.toString();
}
/**
* 字节数组比较
*
* @param bytes1 字节数组1
* @param offset1 字节数组1的偏移量
* @param bytes2 字节数组2
* @param offset2 字节数组2的偏移量
* @param len 比较的长度,当然不含偏移量
* @return 片段是否相等
*/
public static boolean equals(byte[] bytes1, int offset1, byte[] bytes2, int offset2, int len) {
if (bytes1.length - offset1 < len || bytes2.length - offset2 < len) {
return false;
}
for (int i = 0; i < len; i++) {
if (bytes1[offset1 + i] != bytes2[offset2 + i]) {
return false;
}
}
return true;
}
}
......@@ -10,6 +10,7 @@ import java.io.IOException;
import java.util.Properties;
public class UDP2Main {
private static final String version = "0.0.3";
static Properties properties;
static {
......@@ -36,6 +37,7 @@ public class UDP2Main {
}
public static void main(String[] args) {
System.out.println("version:" + version);
AsyncTaskExecutor asyncTaskExecutor = new AsyncTaskExecutor();
asyncTaskExecutor.start();
......
......@@ -5,4 +5,6 @@ package org.btik.server.video.device.iface;
*/
public interface DevChannel {
int channelIdLen();
}
......@@ -2,7 +2,7 @@ package org.btik.server.video.device.udp2;
public class FrameBuffer {
// 2 + 4 + 2字节 2 字节的0 4字节ip 2字节端口
long address;
int channelIndex = -1;
byte[] data;
......
......@@ -10,13 +10,11 @@ import org.btik.server.video.device.iface.VideoChannel;
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.util.HashMap;
import java.util.concurrent.*;
public class NewUDPDeviceChannel extends Thread implements DevChannel {
private static final int SN_LEN = 16;
private static final int SN_LEN = 12;
private volatile boolean runFlag = true;
/**
......@@ -38,6 +36,8 @@ public class NewUDPDeviceChannel extends Thread implements DevChannel {
*/
private int dispatcherPoolSize = 8;
private int videoChannelCount = 128;
public void setStreamPort(int streamPort) {
this.streamPort = streamPort;
}
......@@ -50,6 +50,10 @@ public class NewUDPDeviceChannel extends Thread implements DevChannel {
this.bufferPool = bufferPool;
}
public void setVideoChannelCount(int videoChannelCount) {
this.videoChannelCount = videoChannelCount;
}
/**
* 可选的输入值 1 2 4 8 16 32 64 128 256几个数字,根据cpu核数和设备的数量选择合适的值
* ,输入其它值也会被映射到以上值,如果只有一个摄像头设备那就一个足够,线程数太多而cpu核数过少,
......@@ -62,7 +66,7 @@ public class NewUDPDeviceChannel extends Thread implements DevChannel {
}
private final HashMap<Long, VideoChannel> videoChannelMap = new HashMap<>();
private VideoChannel[] videoChannelTable;
@Override
......@@ -70,6 +74,7 @@ public class NewUDPDeviceChannel extends Thread implements DevChannel {
System.out.println("init buffer pool");
System.out.println("start dispatchers");
videoChannelTable = new VideoChannel[videoChannelCount];
frameDispatchers = new FrameDispatcher[dispatcherPoolSize];
executorService = new ThreadPoolExecutor(dispatcherPoolSize, dispatcherPoolSize,
0, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), new NamePrefixThreadFactory("frameDispatcher"));
......@@ -91,10 +96,10 @@ public class NewUDPDeviceChannel extends Thread implements DevChannel {
DatagramPacket datagramPacket = new DatagramPacket(frameBuffer.data, 0, frameBuffer.data.length);
while (runFlag) {
serverSocket.receive(datagramPacket);
InetAddress address = datagramPacket.getAddress();
frameBuffer.address = (long) address.hashCode() << 16 | datagramPacket.getPort();
frameBuffer.size = datagramPacket.getLength();
frameDispatchers[(int) (frameBuffer.address & dispatcherPoolSize - 1)].messages.add(frameBuffer);
// 最后一位是通道索引,故长度 -1 才是照片数据
frameBuffer.size = datagramPacket.getLength() - 1;
frameBuffer.channelIndex = frameBuffer.data[frameBuffer.size];
frameDispatchers[frameBuffer.channelIndex & dispatcherPoolSize - 1 ].messages.add(frameBuffer);
// 切换缓冲区
frameBuffer = bufferPool.getFrameBuffer();
datagramPacket.setData(frameBuffer.data);
......@@ -120,6 +125,7 @@ public class NewUDPDeviceChannel extends Thread implements DevChannel {
return SN_LEN;
}
class FrameDispatcher implements Runnable {
LinkedBlockingQueue<FrameBuffer> messages = new LinkedBlockingQueue<>();
......@@ -130,13 +136,15 @@ public class NewUDPDeviceChannel extends Thread implements DevChannel {
while (runFlag) {
FrameBuffer segment = messages.take();
try {
long address = segment.address;
VideoChannel videoChannel = videoChannelMap.get(address);
if (videoChannel != null) {
videoChannel.sendFrame(segment.data, segment.size);
} else {
onNewStreamOpen(segment);
int channelIndex = segment.channelIndex;
if (channelIndex < 0 || channelIndex > videoChannelCount) {
continue;
}
VideoChannel videoChannel = videoChannelTable[channelIndex];
if (videoChannel == null) {
continue;
}
videoChannel.sendFrame(segment.data, segment.size - SN_LEN / 2);
} catch (Exception e) {
if (runFlag) {
e.printStackTrace();
......@@ -156,14 +164,14 @@ public class NewUDPDeviceChannel extends Thread implements DevChannel {
System.out.println("exit : " + getName());
}
}
private void onNewStreamOpen(FrameBuffer frame) {
byte[] sn = new byte[SN_LEN + 1];
System.arraycopy(ByteUtil.toFullHexString(frame.address), 0, sn, 1, SN_LEN);
VideoChannel channel = videoServer.createChannel(sn);
videoChannelMap.put(frame.address, channel);
}
public void onNewChannelOpen(int channelIndex, byte[] sn) {
byte[] snAddr = new byte[sn.length + 1];
System.arraycopy(sn, 0, snAddr, 1, SN_LEN);
VideoChannel channel = videoServer.createChannel(snAddr);
videoChannelTable[channelIndex] = channel;
}
}
package org.btik.server.video.device.udp2.bind;
import org.btik.server.util.ByteUtil;
import org.btik.server.video.device.udp2.NewUDPDeviceChannel;
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.concurrent.ConcurrentHashMap;
/**
* 绑定sn和通道索引
*
* @author lustre
* @since 2023/2/7 21:51
*/
public class UDPDeviceSnChannelBinder extends Thread {
private final int port;
private volatile boolean runFlag = true;
/**
* videoChannelSnBinder
*/
private static final byte[] HEAD = {'V', 'C', 'S', 'N', 'B'};
private final IndexSender indexSender;
/**
* 可用的通道索引
*/
private final LinkedList<Integer> channelIndexQueue = new LinkedList<>();
private final HashMap<String, Integer> snChannelMap = new HashMap<>(64);
private DatagramSocket serverSocket;
private NewUDPDeviceChannel newUDPDeviceChannel;
public UDPDeviceSnChannelBinder(int size, int port) {
super("UDPDeviceSnChannelBinder");
for (int i = 0; i < size; i++) {
channelIndexQueue.add(i);
}
this.port = port;
indexSender = new IndexSender();
}
public void setNewUDPDeviceChannel(NewUDPDeviceChannel newUDPDeviceChannel) {
this.newUDPDeviceChannel = newUDPDeviceChannel;
}
@Override
public synchronized void start() {
super.start();
indexSender.start();
}
@Override
public void run() {
try (DatagramSocket serverSocket = new DatagramSocket(port)) {
this.serverSocket = serverSocket;
byte[] buffer = new byte[1400];
DatagramPacket datagramPacket = new DatagramPacket(buffer, 0, buffer.length);
bindLoop:
while (runFlag) {
serverSocket.receive(datagramPacket);
int length = datagramPacket.getLength();
// SN
int headLen = HEAD.length;
if (length < headLen + 1) {
continue;
}
if (!ByteUtil.equals(buffer, 0, HEAD, 0, headLen)) {
continue;
}
switch (buffer[headLen]) {
// sn
case 'S': {
int snLen = buffer[headLen + 1];
byte[] sn = new byte[snLen];
System.arraycopy(buffer, headLen + 2, sn, 0, snLen);
String snStr = ByteUtil.toHexString(sn);
Integer channelIndex = snChannelMap.get(snStr);
if (channelIndex == null) {
if (channelIndexQueue.isEmpty()) {
// 通道用尽,无法继续接入摄像头
System.err.println("These channels have been exhausted. ");
break bindLoop;
}
channelIndex = channelIndexQueue.removeFirst();
snChannelMap.put(snStr, channelIndex);
}
indexSender.sendIndex(channelIndex, snStr, datagramPacket.getAddress(), datagramPacket.getPort());
break;
}
case 'A':
int snLen = buffer[headLen + 1];
byte[] sn = new byte[snLen];
System.arraycopy(buffer, headLen + 2, sn, 0, snLen);
String snStr = ByteUtil.toHexString(sn);
indexSender.onAck(snStr,sn);
default:
}
}
} catch (IOException e) {
System.out.println(" start server failed:" + e.getMessage());
}
runFlag = false;
System.out.println("UDPDeviceSnChannelBinder exited!");
}
public void shutDown() {
runFlag = false;
}
static class NotAckDev {
int sendCount = 0;
DatagramPacket datagramPacket;
int channelIndex;
public NotAckDev(int channelIndex, InetAddress socketAddress, int port) {
this.channelIndex = channelIndex;
byte[] msg = new byte[]{'V', 'C', 'I',
(byte) (channelIndex >>> 24),
(byte) (channelIndex >>> 16),
(byte) (channelIndex >>> 8),
(byte) (channelIndex)};
this.datagramPacket = new DatagramPacket(msg, msg.length);
datagramPacket.setAddress(socketAddress);
datagramPacket.setPort(port);
}
}
class IndexSender extends Thread {
ConcurrentHashMap<String, NotAckDev> notAckMap = new ConcurrentHashMap<>();
final byte[] lock = new byte[0];
@Override
public void run() {
while (runFlag) {
notAckMap.forEach((key, value) -> {
try {
if (value.sendCount > 10) {
notAckMap.remove(key);
System.out.println("retry max count " + value.datagramPacket.getAddress());
return;
}
serverSocket.send(value.datagramPacket);
value.sendCount++;
} catch (IOException e) {
System.out.println("may not reachable :" + e.getMessage());
notAckMap.remove(key);
}
});
}
synchronized (lock) {
try {
lock.wait(3000);
} catch (InterruptedException ignored) {
}
}
}
private void sendIndex(int channelIndex, String snStr, InetAddress socketAddress, int port) {
notAckMap.put(snStr, new NotAckDev(channelIndex, socketAddress, port));
}
public void onAck(String snStr, byte[] sn) {
NotAckDev remove = notAckMap.remove(snStr);
newUDPDeviceChannel.onNewChannelOpen(remove.channelIndex, sn);
}
}
}
echo ""
./jre/bin/java -classpath SimpleVideoServer org.btik.server.video.UDP2Main
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册