提交 4c235390 编写于 作者: 云逸之's avatar 云逸之 💬

支持多个频道

上级 643036c3
......@@ -81,6 +81,15 @@ void setup() {
Serial.println("connect stream channel failed");
}
streamSender.setNoDelay(true);
// 发送mac地址作为设备序列号,用于摄像头频道号
uint8_t mac[6];
WiFi.macAddress(mac);
char macStr[12] = {0};
sprintf(macStr, "%02X%02X%02X%02X%02X%02X", mac[0], mac[1], mac[2], mac[3],
mac[4], mac[5]);
Serial.println("sprint mac ");
streamSender.print(String(macStr));
streamSender.flush();
}
void loop() {
......
package org.btik.server;
import org.btik.server.video.device.FrameBuffer;
import org.btik.server.video.device.FrameReceiver;
import org.btik.server.video.VideoChannel;
/**
*视频服务
* 视频服务
*/
public interface VideoServer {
/**
* 字节数组发送给不同客户端
*
* @param frame 一帧jpeg
* @param channelId 通道号
*/
void sendFrame(FrameBuffer buffer);
VideoChannel createChannel(byte[] channelId);
}
......@@ -3,7 +3,7 @@ package org.btik.server.util;
/**
* @author lustre
* @version 1.0
* @since 2021/5/15 12:37
* @since 2021/5/15 12:37
* 字节工具
*/
public class ByteUtil {
......@@ -206,5 +206,23 @@ public class ByteUtil {
return result;
}
/**
* ipv4 to String
*
* @param ip 不能为空,不能为0长
* @throws IndexOutOfBoundsException 当为0长时
* @throws NullPointerException 当为null时
*/
public static String ipv42Str(byte[] ip) {
StringBuilder ipStr = new StringBuilder();
int top = ip.length - 1;
for (int i = 0; i < top; i++) {
ipStr.append(0xff & ip[i])
.append('.');
}
ipStr.append(ip[top] & 0xff);
return ipStr.toString();
}
}
package org.btik.server.video;
import org.btik.server.VideoServer;
import org.btik.server.util.ByteUtil;
import org.btik.server.video.device.FrameBuffer;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.*;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Enumeration;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
......@@ -18,18 +17,16 @@ import java.util.concurrent.Executors;
* 以http mjpeg 合成视频流对VideoServer的实现
*
* */
public class BioHttpVideoServer extends Thread implements VideoServer, HttpConstant {
public class BioHttpVideoServer extends Thread implements HttpConstant, VideoServer {
private boolean runFlag = true;
private static final HashSet<Socket> clients = new HashSet<>();
private final ExecutorService executorService = Executors.newFixedThreadPool(3, r -> new Thread(r, "client" + System.currentTimeMillis()));
private AsyncTaskExecutor asyncTaskExecutor;
private int httpPort;
private final byte[] clientLock = new byte[0];
private final ConcurrentHashMap<String, MJPEGVideoChannel> videoChannelMap = new ConcurrentHashMap<>();
public void setAsyncTaskExecutor(AsyncTaskExecutor asyncTaskExecutor) {
this.asyncTaskExecutor = asyncTaskExecutor;
......@@ -48,23 +45,30 @@ public class BioHttpVideoServer extends Thread implements VideoServer, HttpConst
@Override
public void run() {
try (ServerSocket serverSocket = new ServerSocket(httpPort)) {
byte[] uri = new byte[URI_LEN];
//channel 是 /{sn} 的形式 目前为 12位字符
byte[] channel = new byte[13];
while (runFlag) {
Socket client = serverSocket.accept();
InputStream inputStream = client.getInputStream();
client.setSoTimeout(300);
byte[] bytes = new byte[URI_LEN];
try {
// 判断EOF
if (inputStream.read(bytes) < 0) {
if (inputStream.read(uri) < URI_LEN) {
asyncTaskExecutor.execute(() -> do404(client));
return;
continue;
}
// 判断uri
if (!Arrays.equals(bytes, uri)) {
if (!Arrays.equals(uri, HttpConstant.uri)) {
asyncTaskExecutor.execute(() -> do404(client));
continue;
}
if (inputStream.read(channel) < channel.length) {
asyncTaskExecutor.execute(() -> do404(client));
return;
continue;
}
executorService.submit(() -> doStreamOpen(client));
String channelStr = new String(channel);
executorService.submit(() -> doStreamOpen(client, channelStr));
} catch (IOException e) {
disConnect(client, e);
}
......@@ -76,14 +80,16 @@ public class BioHttpVideoServer extends Thread implements VideoServer, HttpConst
}
private void doStreamOpen(Socket client) {
private void doStreamOpen(Socket client, String channel) {
try {
System.out.println("open:" + client.getRemoteSocketAddress());
OutputStream outputStream = client.getOutputStream();
outputStream.write(STREAM_RESP_HEAD_BYTES);
outputStream.flush();
client.setTcpNoDelay(true);
clients.add(client);
MJPEGVideoChannel videoChannel = videoChannelMap.get(channel);
if (null == videoChannel) {
// 频道不存在,主播还未开启直播间
System.err.println("channel not exists");
do404(client);
return;
}
videoChannel.joinChannel(client);
} catch (IOException e) {
e.printStackTrace();
}
......@@ -104,9 +110,6 @@ public class BioHttpVideoServer extends Thread implements VideoServer, HttpConst
void disConnect(Socket socket, Exception e) {
asyncTaskExecutor.execute(() -> {
System.err.println(e.getMessage());
synchronized (clientLock) {
clients.remove(socket);
}
try {
System.err.println("close:" + socket.getRemoteSocketAddress());
socket.close();
......@@ -117,67 +120,42 @@ public class BioHttpVideoServer extends Thread implements VideoServer, HttpConst
}
private void checkState(Socket socket, Exception e) {
if (socket.isClosed()) {
disConnect(socket, e);
}
}
void sendChunk(byte[] chunk, OutputStream out) throws IOException {
int length = chunk.length;
out.write(NEW_LINE);
out.write(ByteUtil.toHexString(length));
out.write(NEW_LINE);
out.write(chunk);
}
void sendChunk(FrameBuffer buffer, OutputStream out) throws IOException {
int length = buffer.frameLen();
out.write(NEW_LINE);
out.write(ByteUtil.toHexString(length));
out.write(NEW_LINE);
buffer.takeFrame(out);
}
void sendChunk(OutputStream out, byte[]... chunk) throws IOException {
int length = 0;
for (byte[] bytes : chunk) {
length += bytes.length;
}
out.write(NEW_LINE);
out.write(ByteUtil.toHexString(length));
out.write(NEW_LINE);
for (byte[] bytes : chunk) {
out.write(bytes);
}
}
public void shutDown(String msg) {
System.err.println("exit: " + msg);
runFlag = false;
}
@Override
public void sendFrame(FrameBuffer buffer) {
int length = buffer.frameLen();
synchronized (clientLock) {
for (Socket client : clients) {
try {
OutputStream outputStream = client.getOutputStream();
sendChunk(_STREAM_BOUNDARY, outputStream);
sendChunk(outputStream, _STREAM_PART, ByteUtil.toString(length), DOUBLE_LINE);
sendChunk(buffer, outputStream);
outputStream.flush();
} catch (IOException e) {
checkState(client, e);
}
public VideoChannel createChannel(byte[] channelId) {
channelId[0] = HTTP_PATH_SEPARATOR;
String channelIdPath = new String(channelId);
System.out.println("new channel:");
printHttpAddress(channelIdPath);
return videoChannelMap.computeIfAbsent(channelIdPath,
channelIdStr -> new MJPEGVideoChannel(channelIdStr, asyncTaskExecutor));
}
private void printHttpAddress(String channelIdPath) {
try {
String channelHttpAddress = "http://%s:" + httpPort + "/video" + channelIdPath + "\r\n";
Enumeration<NetworkInterface> networkInterfaces = NetworkInterface.getNetworkInterfaces();
while (networkInterfaces.hasMoreElements()) {
NetworkInterface networkInterface = networkInterfaces.nextElement();
Enumeration<InetAddress> inetAddresses = networkInterface.getInetAddresses();
while (inetAddresses.hasMoreElements()) {
InetAddress obj = inetAddresses.nextElement();
if (!(obj instanceof Inet4Address)) {
continue;
}
System.out.printf(channelHttpAddress, ByteUtil.ipv42Str(obj.getAddress()));
}
}
} catch (SocketException e) {
e.printStackTrace();
}
}
}
......@@ -4,7 +4,9 @@ import java.nio.charset.StandardCharsets;
public interface HttpConstant {
byte[] uri = "GET /video HTTP/1.1".getBytes(StandardCharsets.UTF_8);
byte HTTP_PATH_SEPARATOR = '/';
byte[] uri = "GET /video".getBytes(StandardCharsets.UTF_8);
int URI_LEN = uri.length;
byte[] NOT_FOUND = "HTTP/1.1 404 \r\nContent-Length: 3\r\n\r\n404".getBytes(StandardCharsets.UTF_8);
String PART_BOUNDARY = "123456789000000000000987654321";
......
package org.btik.server.video;
import org.btik.server.util.ByteUtil;
import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
/**
* 视频频道
* 在线的摄像头均有一个频道
*/
public class MJPEGVideoChannel implements VideoChannel, HttpConstant {
private final AsyncTaskExecutor asyncTaskExecutor;
/**
* 暂时没有用,debug时可以分辨属于哪个设备
*/
private String channelId;
private final Set<Socket> clients = Collections.newSetFromMap(new ConcurrentHashMap<>());
private final byte[] clientLock = new byte[0];
public MJPEGVideoChannel(String channelId, AsyncTaskExecutor asyncTaskExecutor) {
this.asyncTaskExecutor = asyncTaskExecutor;
this.channelId = channelId;
}
@Override
public void sendFrame(byte[] frame, int len) {
byte[] lenStrBytes = ByteUtil.toString(len);
byte[] lenHexStrBytes = ByteUtil.toHexString(len);
synchronized (clientLock) {
for (Socket client : clients) {
try {
OutputStream outputStream = client.getOutputStream();
sendChunk(_STREAM_BOUNDARY, outputStream);
sendChunk(outputStream, _STREAM_PART, lenStrBytes, DOUBLE_LINE);
sendChunk(frame, len, lenHexStrBytes, outputStream);
outputStream.flush();
} catch (IOException e) {
checkState(client, e);
}
}
}
}
/**
* 加入频道
*/
public void joinChannel(Socket client) throws IOException {
System.out.println("open:" + client.getRemoteSocketAddress());
OutputStream outputStream = client.getOutputStream();
outputStream.write(STREAM_RESP_HEAD_BYTES);
outputStream.flush();
client.setTcpNoDelay(true);
clients.add(client);
}
private void checkState(Socket socket, Exception e) {
if (socket.isClosed()) {
disConnect(socket, e);
}
}
void sendChunk(byte[] chunk, OutputStream out) throws IOException {
int length = chunk.length;
out.write(NEW_LINE);
out.write(ByteUtil.toHexString(length));
out.write(NEW_LINE);
out.write(chunk);
}
void sendChunk(byte[] chunkBuffer, final int len, byte[] lenHexStrBytes, OutputStream out) throws IOException {
out.write(NEW_LINE);
out.write(lenHexStrBytes);
out.write(NEW_LINE);
out.write(chunkBuffer, 0, len);
}
void sendChunk(OutputStream out, byte[]... chunk) throws IOException {
int length = 0;
for (byte[] bytes : chunk) {
length += bytes.length;
}
out.write(NEW_LINE);
out.write(ByteUtil.toHexString(length));
out.write(NEW_LINE);
for (byte[] bytes : chunk) {
out.write(bytes);
}
}
void disConnect(Socket socket, Exception e) {
asyncTaskExecutor.execute(() -> {
System.err.println(e.getMessage());
try {
System.err.println("close:" + socket.getRemoteSocketAddress());
socket.close();
} catch (IOException e0) {
System.err.println(e.getMessage());
}
});
}
}
package org.btik.server.video;
public interface VideoChannel {
/**
* 字节数组发送给不同客户端
*
* @param frame 一帧jpeg
*/
void sendFrame(byte[] frame, int len);
}
......@@ -3,9 +3,11 @@ package org.btik.server.video.device;
import org.btik.server.VideoServer;
import org.btik.server.video.AsyncTaskExecutor;
import org.btik.server.video.VideoChannel;
import java.io.IOException;
import java.io.InputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Collections;
......@@ -16,9 +18,9 @@ import java.util.concurrent.ConcurrentHashMap;
/**
* 发送帧设备接入通道
*/
public class BioDeviceChannel extends Thread {
public class BioDeviceChannel extends Thread {
private static final int SN_LEN = 12;
private boolean runFlag = true;
private final byte[] countLock = new byte[0];
......@@ -97,7 +99,14 @@ public class BioDeviceChannel extends Thread {
private void onNewStreamOpen(Socket socket) {
try {
FrameReceiver frameReceiver = new FrameReceiver(videoServer, socket);
byte[] sn = new byte[SN_LEN + 1];
InputStream inputStream = socket.getInputStream();
int len = inputStream.read(sn, 1, SN_LEN);
if (len < SN_LEN) {
close(socket);
}
VideoChannel channel = videoServer.createChannel(sn);
FrameReceiver frameReceiver = new FrameReceiver(channel, socket);
receiverMap.put(socket, frameReceiver);
frameReceiver.start();
clients.add(socket);
......
package org.btik.server.video.device;
import org.btik.server.VideoServer;
import org.btik.server.video.VideoChannel;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
/**
* 视频帧缓冲区
* 视频帧缓冲区,拆出帧时后发送帧时不复制到新的buffer,因而只能同步发送帧,适合少数客户端观看视频
*/
public class FrameBuffer extends ByteArrayOutputStream {
......@@ -21,8 +22,6 @@ public class FrameBuffer extends ByteArrayOutputStream {
private static final byte endLast = end[END_TOP_INDEX];
private int checkIndex = END_TOP_INDEX;
private int frameLength;
@Override
public void write(byte[] b, int off, int len) {
synchronized (this) {
......@@ -46,7 +45,6 @@ public class FrameBuffer extends ByteArrayOutputStream {
continue searchEndChar;
}
}
frameLength = checkIndex - END_TOP_INDEX;
return true;
}
}
......@@ -54,18 +52,20 @@ public class FrameBuffer extends ByteArrayOutputStream {
}
/**
* 此处不加锁,但必须在锁对象为this情况下调用
* 滑动下一条消息到头部
*/
public void takeFrame(OutputStream outputStream) throws IOException {
outputStream.write(buf, 0, frameLength);
void slide() {
int nextIndex = checkIndex + 1;
count -= nextIndex;
System.arraycopy(buf, nextIndex, buf, 0, count);
checkIndex = END_TOP_INDEX;
}
public int frameLen(){
return frameLength;
/**
* 此处不加锁,但必须在锁对象为this情况下调用
*/
public void takeFrame(VideoChannel videoChannel) {
videoChannel.sendFrame(buf, checkIndex - END_TOP_INDEX);
}
}
......@@ -2,6 +2,7 @@ package org.btik.server.video.device;
import org.btik.server.VideoServer;
import org.btik.server.video.VideoChannel;
import java.io.IOException;
......@@ -15,7 +16,7 @@ import java.net.SocketAddress;
public class FrameReceiver extends Thread {
private volatile boolean runFlag = true;
private final VideoServer videoServer;
private final VideoChannel videoChannel;
private final Socket socket;
......@@ -32,8 +33,8 @@ public class FrameReceiver extends Thread {
private byte[] preFrameBuffer = new byte[RECEIVE_BUFFER_SIZE];
public FrameReceiver(VideoServer videoServer, Socket socket) throws IOException {
this.videoServer = videoServer;
public FrameReceiver(VideoChannel videoChannel, Socket socket) throws IOException {
this.videoChannel = videoChannel;
this.socket = socket;
this.in = socket.getInputStream();
SocketAddress remoteSocketAddress = socket.getRemoteSocketAddress();
......@@ -104,7 +105,8 @@ public class FrameReceiver extends Thread {
while (runFlag) {
synchronized (frameBuffer) {
if (frameBuffer.hasFrame()) {
videoServer.sendFrame(frameBuffer);
frameBuffer.takeFrame(videoChannel);
frameBuffer.slide();
} else {
try {
frameBuffer.wait();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册