提交 8de916ba 编写于 作者: 云逸之's avatar 云逸之 💬

将通道绑定切回TCP

上级 96b8c05f
......@@ -9,14 +9,17 @@
; https://docs.platformio.org/page/projectconf.html
[platformio]
;此处可以同时选多个,但clion需要加载默认的env生成的Cmake文件,所以选一个方便一些
default_envs = esp32cam
;default_envs = heZhouS3
;default_envs = esp32cam
default_envs = heZhouS3
[env:esp32cam]
platform = espressif32
board = esp32cam
framework = arduino
upload_speed = 115200
monitor_speed = 115200
upload_port = COM3
lib_deps =
espressif/esp32-camera @ ^2.0.0
[env:heZhouS3]
platform = espressif32
......@@ -24,11 +27,13 @@ board = esp32-s3-devkitc-1
framework = arduino
upload_speed = 921600
monitor_speed = 115200
upload_port = COM7
; 重载摄像头配置 指定合宙S3引脚
build_flags = -DOVER_LOAD_CAM_CONF
-DHE_ZHOU_S3
-DCORE_DEBUG_LEVEL=ARDUHAL_LOG_LEVEL_VERBOSE
; 指定帧大小
;-DFRAMESIZE=FRAMESIZE_HVGA
-DFRAMESIZE=FRAMESIZE_VGA
; -DFRAMESIZE=FRAMESIZE_SVGA
; -DFRAMESIZE=FRAMESIZE_VGA
-DFRAMESIZE=FRAMESIZE_SVGA
;-DFRAMESIZE=FRAMESIZE_HD
\ No newline at end of file
......@@ -5,7 +5,7 @@
#include <lwip/sockets.h>
LightUDP::LightUDP()
: udp_server(-1), server_port(0), remote_port(0), remote_ip_int(0) {}
: udp_server(-1), server_port(0), remote_port(0), remote_ip_int(0) {}
void LightUDP::begin(IPAddress address, uint16_t port) {
server_port = port;
......@@ -24,30 +24,30 @@ void LightUDP::begin(IPAddress address, uint16_t port) {
}
struct sockaddr_in addr;
memset((char*)&addr, 0, sizeof(addr));
memset((char *) &addr, 0, sizeof(addr));
addr.sin_family = AF_INET;
addr.sin_port = htons(server_port);
addr.sin_addr.s_addr = (in_addr_t)address;
if (bind(udp_server, (struct sockaddr*)&addr, sizeof(addr)) == -1) {
addr.sin_addr.s_addr = (in_addr_t) address;
if (bind(udp_server, (struct sockaddr *) &addr, sizeof(addr)) == -1) {
Serial.printf("could not bind socket: %d\n", errno);
return;
}
fcntl(udp_server, F_SETFL, O_NONBLOCK);
}
void LightUDP::setServer(const char* host, uint16_t port) {
struct hostent* server;
void LightUDP::setServer(const char *host, uint16_t port) {
struct hostent *server;
server = gethostbyname(host);
if (server == NULL) {
Serial.printf("could not get host from dns: %d\n", errno);
return;
}
this->remote_ip = IPAddress((const uint8_t*)(server->h_addr_list[0]));
this->remote_ip_int = (uint32_t)remote_ip;
this->remote_ip = IPAddress((const uint8_t *) (server->h_addr_list[0]));
this->remote_ip_int = (uint32_t) remote_ip;
this->remote_port = port;
}
void LightUDP::send(uint8_t* buf, size_t len) {
void LightUDP::send(uint8_t *buf, size_t len) {
if (udp_server == -1) {
if ((udp_server = socket(AF_INET, SOCK_DGRAM, 0)) == -1) {
Serial.printf("could not create socket: %d\n", errno);
......@@ -60,32 +60,34 @@ void LightUDP::send(uint8_t* buf, size_t len) {
recipient.sin_addr.s_addr = remote_ip_int;
recipient.sin_family = AF_INET;
recipient.sin_port = htons(remote_port);
int sent = sendto(udp_server, buf, len, 0, (struct sockaddr*)&recipient,
int sent = sendto(udp_server, buf, len, 0, (struct sockaddr *) &recipient,
sizeof(recipient));
if (sent < 0) {
Serial.printf("could not send data: %d\n", errno);
}
}
void LightUDP::recv() {
int LightUDP::recv(uint8_t *buf, size_t size) {
struct sockaddr_in si_other{};
int slen = sizeof(si_other) , len;
char * buf = new char[1460];
if(!buf){
}
if ((len = recvfrom(udp_server, buf, 1460, MSG_DONTWAIT, (struct sockaddr *) &si_other, (socklen_t *)&slen)) == -1){
delete[] buf;
if(errno == EWOULDBLOCK){
int slen = sizeof(si_other), len;
if ((len = recvfrom(udp_server, buf, size, MSG_DONTWAIT, (struct sockaddr *) &si_other, (socklen_t *) &slen)) ==
-1) {
if (errno == EWOULDBLOCK) {
return 0;
}
log_e("could not receive data: %d", errno);
}
remote_ip = IPAddress(si_other.sin_addr.s_addr);
remote_port = ntohs(si_other.sin_port);
if (len > 0) {
}
delete[] buf;
this->rec_remote_ip = IPAddress(si_other.sin_addr.s_addr);
this->rec_remote_port = ntohs(si_other.sin_port);
return len;
}
const IPAddress &LightUDP::getRecRemoteIp() const {
return rec_remote_ip;
}
uint16_t LightUDP::getRecRemotePort() const {
return rec_remote_port;
}
......@@ -9,12 +9,20 @@ class LightUDP {
uint32_t remote_ip_int;
uint16_t server_port;
uint16_t remote_port;
/**
* 接收 的最后一条消息的地址
* */
IPAddress rec_remote_ip;
uint16_t rec_remote_port;
public:
LightUDP();
void begin(IPAddress address, uint16_t port);
void setServer(const char* host, uint16_t port);
void send(uint8_t* buf, size_t len);
void recv();
int recv(uint8_t* buf, size_t size);
const IPAddress &getRecRemoteIp() const;
uint16_t getRecRemotePort() const;
};
\ No newline at end of file
......@@ -15,9 +15,19 @@ const char *ssid = "test0";
const char *passwd = "12345687";
const char *host = "192.168.137.1";
const uint16_t serverUdpPort = 8004;
/*
获取通道索引的端口(tcp)
当然也可以使用发送图片的UDP客户端获取,但必须保证获取到之后才能发送图片帧,需要反复判断重传和双向ack
故在启动时通过TCP获取通道索引
*/
const uint16_t channelBindPort = 8004;
const uint16_t localUdpPort = 2333;
LightUDP streamSender;
// 绑定sn后续为mac 作为sn 代表后续内容是SN 后一位是 6表示SN长度 空位的0 将填充mac
int channel_index = -1;
void connectWifi(const char *ssid, const char *passphrase) {
WiFi.mode(WIFI_STA);
......@@ -80,31 +90,60 @@ void setup() {
// camera init
esp_err_t err = esp_camera_init(&config);
if (err != ESP_OK) {
Serial.printf("Camera init failed with error 0x%x", err);
log_i("Camera init failed with error 0x%x", err);
return;
}
Serial.println("get sensor ");
log_i("get sensor ");
sensor_t *s = esp_camera_sensor_get();
// drop down frame size for higher initial frame rate
s->set_framesize(s, FRAMESIZE);
connectWifi(ssid, passwd);
WiFiClient client;
while (!client.connect(host, channelBindPort)) {
log_i("connection failed,wait 3 sec...");
delay(3000);
}
uint8_t bind_sn[] = {'S', 'N', ':', 6, 0, 0, 0, 0, 0, 0};
// 将mac设置到 bind_sn 第3位
WiFi.macAddress(&bind_sn[3]);
int index;
do {
client.write(bind_sn, 10);
client.flush();
index = client.read();
} while (index == -1);
if (index <= 255) {
channel_index = index;
log_i("channel_index is : %d ", channel_index);
}
streamSender.begin(WiFi.localIP(), localUdpPort);
streamSender.setServer(host, serverUdpPort);
}
void loop() {
// [[noreturn]] 不会执行return这里使用 goto回到行首
return_:
Serial.println("do loop");
if (channel_index == -1) {
log_i("delay 3 sec");
delay(3000);
goto return_;
}
camera_fb_t *fb = NULL;
size_t len = 0;
Serial.println("do loop");
log_i("send image");
while (true) {
fb = esp_camera_fb_get();
if (!fb) {
Serial.println("Camera capture failed");
return;
goto return_;
}
len = fb->len;
fb->buf[len] = channel_index;
streamSender.send(fb->buf, len);
esp_camera_fb_return(fb);
}
......
......@@ -3,6 +3,7 @@ package org.btik.server.video;
import org.btik.server.video.device.task.AsyncTaskExecutor;
import org.btik.server.video.device.udp2.BufferPool;
import org.btik.server.video.device.udp2.NewUDPDeviceChannel;
import org.btik.server.video.device.udp2.bind.UDPDeviceSnChannelBinder;
import org.btik.server.video.device.web.BioHttpVideoServer;
import java.io.FileInputStream;
......@@ -44,7 +45,9 @@ public class UDP2Main {
BioHttpVideoServer bioHttpVideoServer = new BioHttpVideoServer();
bioHttpVideoServer.setHttpPort(getIntProp("http.port", 8003));
bioHttpVideoServer.setAsyncTaskExecutor(asyncTaskExecutor);
int videoChannelCount = getIntProp("udp.video.channel.size", 128);
UDPDeviceSnChannelBinder udpDeviceSnChannelBinder = new UDPDeviceSnChannelBinder(videoChannelCount,
getIntProp("stream.bind.port", 8005) );
BufferPool bufferPool = new BufferPool();
bufferPool.setBufferPoolSize(getIntProp("udp.video.buffer.pool.size", 500));
NewUDPDeviceChannel deviceChannel = new NewUDPDeviceChannel();
......@@ -53,8 +56,11 @@ public class UDP2Main {
deviceChannel.setBufferPool(bufferPool);
deviceChannel.setStreamPort(getIntProp("stream.port", 8004));
deviceChannel.setDispatcherPoolSize(getIntProp("udp.video.dispatcher.thread.size", 8));
deviceChannel.setVideoChannelCount(videoChannelCount);
udpDeviceSnChannelBinder.start();
deviceChannel.start();
udpDeviceSnChannelBinder.setNewUDPDeviceChannel(deviceChannel);
bioHttpVideoServer.setDevChannel(deviceChannel);
bioHttpVideoServer.start();
}
......
package org.btik.server.video.device.udp2;
import org.btik.server.VideoServer;
import org.btik.server.util.ByteUtil;
import org.btik.server.util.NamePrefixThreadFactory;
import org.btik.server.video.device.iface.DevChannel;
import org.btik.server.video.device.iface.VideoChannel;
......@@ -98,7 +97,7 @@ public class NewUDPDeviceChannel extends Thread implements DevChannel {
serverSocket.receive(datagramPacket);
// 最后一位是通道索引,故长度 -1 才是照片数据
frameBuffer.size = datagramPacket.getLength() - 1;
frameBuffer.channelIndex = frameBuffer.data[frameBuffer.size];
frameBuffer.channelIndex = frameBuffer.data[frameBuffer.size] & 0xff;
frameDispatchers[frameBuffer.channelIndex & dispatcherPoolSize - 1 ].messages.add(frameBuffer);
// 切换缓冲区
frameBuffer = bufferPool.getFrameBuffer();
......
......@@ -9,8 +9,8 @@ import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
/**
* 绑定sn和通道索引
......@@ -33,7 +33,7 @@ public class UDPDeviceSnChannelBinder extends Thread {
/**
* 可用的通道索引
*/
private final LinkedList<Integer> channelIndexQueue = new LinkedList<>();
private final ConcurrentLinkedQueue<Integer> channelIndexQueue = new ConcurrentLinkedQueue<>();
private final HashMap<String, Integer> snChannelMap = new HashMap<>(64);
......@@ -44,11 +44,15 @@ public class UDPDeviceSnChannelBinder extends Thread {
public UDPDeviceSnChannelBinder(int size, int port) {
super("UDPDeviceSnChannelBinder");
if (size < 0 || size > 0xff) {
throw new IllegalArgumentException("The count of channels can only be between 0~255 ");
}
for (int i = 0; i < size; i++) {
channelIndexQueue.add(i);
}
this.port = port;
indexSender = new IndexSender();
System.out.println("sn channel binder started");
}
......@@ -94,7 +98,7 @@ public class UDPDeviceSnChannelBinder extends Thread {
System.err.println("These channels have been exhausted. ");
break bindLoop;
}
channelIndex = channelIndexQueue.removeFirst();
channelIndex = channelIndexQueue.remove();
snChannelMap.put(snStr, channelIndex);
}
......@@ -106,7 +110,7 @@ public class UDPDeviceSnChannelBinder extends Thread {
byte[] sn = new byte[snLen];
System.arraycopy(buffer, headLen + 2, sn, 0, snLen);
String snStr = ByteUtil.toHexString(sn);
indexSender.onAck(snStr,sn);
indexSender.onAck(snStr, sn);
default:
}
......@@ -132,11 +136,7 @@ public class UDPDeviceSnChannelBinder extends Thread {
public NotAckDev(int channelIndex, InetAddress socketAddress, int port) {
this.channelIndex = channelIndex;
byte[] msg = new byte[]{'V', 'C', 'I',
(byte) (channelIndex >>> 24),
(byte) (channelIndex >>> 16),
(byte) (channelIndex >>> 8),
(byte) (channelIndex)};
byte[] msg = new byte[]{'V', 'C', 'I', (byte) (channelIndex)};
this.datagramPacket = new DatagramPacket(msg, msg.length);
datagramPacket.setAddress(socketAddress);
datagramPacket.setPort(port);
......@@ -156,6 +156,7 @@ public class UDPDeviceSnChannelBinder extends Thread {
try {
if (value.sendCount > 10) {
notAckMap.remove(key);
channelIndexQueue.add(value.channelIndex);
System.out.println("retry max count " + value.datagramPacket.getAddress());
return;
}
......@@ -166,15 +167,15 @@ public class UDPDeviceSnChannelBinder extends Thread {
notAckMap.remove(key);
}
});
synchronized (lock) {
try {
lock.wait(3000);
} catch (InterruptedException ignored) {
}
synchronized (lock) {
try {
lock.wait(3000);
} catch (InterruptedException ignored) {
}
}
}
}
private void sendIndex(int channelIndex, String snStr, InetAddress socketAddress, int port) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册