提交 83ce0e3a 编写于 作者: 云逸之's avatar 云逸之 💬

初始化提交

上级
http.port=8003
http.clients.limit=10
stream.port=8004
\ No newline at end of file
package org.btik.server;
/**
*视频服务
*/
public interface VideoServer {
/**
* 字节数组发送给不同客户端
*
* @param frame 一帧jpeg
*/
void sendFrame(byte[] frame);
}
package org.btik.server.util;
/**
* @author lustre
* @version 1.0
* @since 2021/5/15 12:37
* 字节工具
*/
public class ByteUtil {
private static final char[] DIGEST = {
'0', '1', '2', '3',
'4', '5', '6', '7',
'8', '9', 'A', 'B',
'C', 'D', 'E', 'F'
};
private static final byte[] LOW_DIGEST = {
'0', '1', '2', '3',
'4', '5', '6', '7',
'8', '9', 'a', 'b',
'c', 'd', 'e', 'f'
};
private static final byte[] DigitTens = {
'0', '0', '0', '0', '0', '0', '0', '0', '0', '0',
'1', '1', '1', '1', '1', '1', '1', '1', '1', '1',
'2', '2', '2', '2', '2', '2', '2', '2', '2', '2',
'3', '3', '3', '3', '3', '3', '3', '3', '3', '3',
'4', '4', '4', '4', '4', '4', '4', '4', '4', '4',
'5', '5', '5', '5', '5', '5', '5', '5', '5', '5',
'6', '6', '6', '6', '6', '6', '6', '6', '6', '6',
'7', '7', '7', '7', '7', '7', '7', '7', '7', '7',
'8', '8', '8', '8', '8', '8', '8', '8', '8', '8',
'9', '9', '9', '9', '9', '9', '9', '9', '9', '9',
};
private static final byte[] DigitOnes = {
'0', '1', '2', '3', '4', '5', '6', '7', '8', '9',
'0', '1', '2', '3', '4', '5', '6', '7', '8', '9',
'0', '1', '2', '3', '4', '5', '6', '7', '8', '9',
'0', '1', '2', '3', '4', '5', '6', '7', '8', '9',
'0', '1', '2', '3', '4', '5', '6', '7', '8', '9',
'0', '1', '2', '3', '4', '5', '6', '7', '8', '9',
'0', '1', '2', '3', '4', '5', '6', '7', '8', '9',
'0', '1', '2', '3', '4', '5', '6', '7', '8', '9',
'0', '1', '2', '3', '4', '5', '6', '7', '8', '9',
'0', '1', '2', '3', '4', '5', '6', '7', '8', '9',
};
public static String toHexString(byte[] toByteArray) {
if (null == toByteArray || toByteArray.length == 0) {
return null;
}
char[] result = new char[toByteArray.length << 1];
for (int i = 0; i < toByteArray.length; i++) {
byte b = toByteArray[i];
result[2 * i] = DIGEST[(b & 0xf0) >> 4];
result[2 * i + 1] = DIGEST[b & 0xf];
}
return new String(result);
}
public static byte[] toHexString(int num) {
int mag = Integer.SIZE - Integer.numberOfLeadingZeros(num);
int len = Math.max(((mag + (4 - 1)) / 4), 1);
byte[] result = new byte[len];
int charPos = len;
int mask = 0xf;
do {
result[--charPos] = LOW_DIGEST[num & mask];
num >>>= 4;
} while (charPos > 0);
return result;
}
public static byte[] toHexString(long val) {
int mag = Long.SIZE - Long.numberOfLeadingZeros(val);
int len = Math.max(((mag + (4 - 1)) / 4), 1);
byte[] buf = new byte[len];
int charPos = len;
int mask = 0xf;
do {
buf[--charPos] = LOW_DIGEST[((int) val) & mask];
val >>>= 4;
} while (charPos > 0);
return buf;
}
public static int getChars(long i, int index, byte[] buf) {
long q;
int r;
int charPos = index;
boolean negative = (i < 0);
if (!negative) {
i = -i;
}
// Get 2 digits/iteration using longs until quotient fits into an int
while (i <= Integer.MIN_VALUE) {
q = i / 100;
r = (int) ((q * 100) - i);
i = q;
buf[--charPos] = DigitOnes[r];
buf[--charPos] = DigitTens[r];
}
return fillCharBuf(buf, charPos, negative, (int) i);
}
private static int fillCharBuf(byte[] buf, int charPos, boolean negative, int i) {
int q;
int r;
while (i <= -100) {
q = i / 100;
r = (q * 100) - i;
i = q;
buf[--charPos] = DigitOnes[r];
buf[--charPos] = DigitTens[r];
}
// We know there are at most two digits left at this point.
q = i / 10;
r = (q * 10) - i;
buf[--charPos] = (byte) ('0' + r);
// Whatever left is the remaining digit.
if (q < 0) {
buf[--charPos] = (byte) ('0' - q);
}
if (negative) {
buf[--charPos] = (byte) '-';
}
return charPos;
}
public static int stringSize(long x) {
int d = 1;
if (x >= 0) {
d = 0;
x = -x;
}
long p = -10;
for (int i = 1; i < 19; i++) {
if (x > p)
return i + d;
p = 10 * p;
}
return 19 + d;
}
public static byte[] toString(long i) {
int size = stringSize(i);
byte[] buf = new byte[size];
getChars(i, size, buf);
return buf;
}
static int getChars(int i, int index, byte[] buf) {
boolean negative = i < 0;
if (!negative) {
i = -i;
}
return fillCharBuf(buf, index, negative, i);
}
private static int fillCharBuf(int i, byte[] buf, int charPos, boolean negative) {
return fillCharBuf(buf, charPos, negative, i);
}
static int stringSize(int x) {
int d = 1;
if (x >= 0) {
d = 0;
x = -x;
}
int p = -10;
for (int i = 1; i < 10; i++) {
if (x > p)
return i + d;
p = 10 * p;
}
return 10 + d;
}
public static byte[] toString(int i) {
int size = stringSize(i);
byte[] buf = new byte[size];
getChars(i, size, buf);
return buf;
}
public static int ipToInt(byte[] ip) {
if (ip.length != 4) {
return 0;
}
int result = 0;
for (int i = 0; i < 4; i++) {
result |= (ip[i] & 0xff) << ((3 - i) * 8);
}
return result;
}
}
package org.btik.server.video;
import java.util.concurrent.LinkedBlockingQueue;
/**
* 异步执行器
*/
public class AsyncTaskExecutor extends Thread {
LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
/**
* 阻塞消除器
*/
Runnable blockCanceler = () -> {
};
private volatile boolean runFlag = true;
@Override
public void run() {
while (runFlag) {
try {
Runnable take = queue.take();
try {
take.run();
} catch (Exception e) {
// 任务失败不能影响核心线程
e.printStackTrace();
}
} catch (InterruptedException e) {
shutdown(e.getMessage());
}
}
}
public void execute(Runnable runnable) {
queue.add(runnable);
}
public void shutdown(String msg) {
System.out.println("shutdown with:" + msg);
runFlag = false;
queue.add(blockCanceler);
}
}
package org.btik.server.video;
import org.btik.server.VideoServer;
import org.btik.server.util.ByteUtil;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Arrays;
import java.util.HashSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/***
* 以http mjpeg 合成视频流对VideoServer的实现
*
* */
public class BioHttpVideoServer extends Thread implements VideoServer, HttpConstant {
private boolean runFlag = true;
private static final HashSet<Socket> clients = new HashSet<>();
private final ExecutorService executorService = Executors.newFixedThreadPool(3, r -> new Thread(r, "client" + System.currentTimeMillis()));
private AsyncTaskExecutor asyncTaskExecutor;
private int httpPort;
private final byte[] clientLock = new byte[0];
public void setAsyncTaskExecutor(AsyncTaskExecutor asyncTaskExecutor) {
this.asyncTaskExecutor = asyncTaskExecutor;
}
public void setHttpPort(int httpPort) {
this.httpPort = httpPort;
}
@Override
public synchronized void start() {
super.start();
System.out.println("bio video server started");
}
@Override
public void run() {
try (ServerSocket serverSocket = new ServerSocket(httpPort)) {
while (runFlag) {
Socket client = serverSocket.accept();
InputStream inputStream = client.getInputStream();
client.setSoTimeout(300);
byte[] bytes = new byte[URI_LEN];
try {
// 判断EOF
if (inputStream.read(bytes) < 0) {
asyncTaskExecutor.execute(() -> do404(client));
return;
}
// 判断uri
if (!Arrays.equals(bytes, uri)) {
asyncTaskExecutor.execute(() -> do404(client));
return;
}
executorService.submit(() -> doStreamOpen(client));
} catch (IOException e) {
disConnect(client, e);
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
private void doStreamOpen(Socket client) {
try {
System.out.println("open:" + client.getRemoteSocketAddress());
OutputStream outputStream = client.getOutputStream();
outputStream.write(STREAM_RESP_HEAD_BYTES);
outputStream.flush();
client.setTcpNoDelay(true);
clients.add(client);
} catch (IOException e) {
e.printStackTrace();
}
}
private void do404(Socket client) {
try {
client.getOutputStream().write(NOT_FOUND);
} catch (IOException e) {
System.err.println(e.getMessage());
} finally {
disConnect(client, new Exception("404"));
}
}
void disConnect(Socket socket, Exception e) {
asyncTaskExecutor.execute(() -> {
System.err.println(e.getMessage());
synchronized (clientLock) {
clients.remove(socket);
}
try {
System.err.println("close:" + socket.getRemoteSocketAddress());
socket.close();
} catch (IOException e0) {
System.err.println(e.getMessage());
}
});
}
private void checkState(Socket socket, Exception e) {
if (socket.isClosed()) {
disConnect(socket, e);
}
}
void sendChunk(byte[] chunk, OutputStream out) throws IOException {
int length = chunk.length;
out.write(NEW_LINE);
out.write(ByteUtil.toHexString(length));
out.write(NEW_LINE);
out.write(chunk);
}
void sendChunk(OutputStream out, byte[]... chunk) throws IOException {
int length = 0;
for (byte[] bytes : chunk) {
length += bytes.length;
}
out.write(NEW_LINE);
out.write(ByteUtil.toHexString(length));
out.write(NEW_LINE);
for (byte[] bytes : chunk) {
out.write(bytes);
}
}
public void shutDown(String msg) {
System.err.println("exit: " + msg);
runFlag = false;
}
@Override
public void sendFrame(byte[] frame) {
int length = frame.length;
if (frame[length - 1] == 0 && frame[length - 2] == 0) {
System.out.print("\rdrop frame:");
return;
}
synchronized (clientLock) {
for (Socket client : clients) {
try {
OutputStream outputStream = client.getOutputStream();
sendChunk(_STREAM_BOUNDARY, outputStream);
sendChunk(outputStream, _STREAM_PART, ByteUtil.toString(length), DOUBLE_LINE);
sendChunk(frame, outputStream);
outputStream.flush();
} catch (IOException e) {
checkState(client, e);
}
}
}
}
}
package org.btik.server.video;
import java.nio.charset.StandardCharsets;
public interface HttpConstant {
byte[] uri = "GET /video HTTP/1.1".getBytes(StandardCharsets.UTF_8);
int URI_LEN = uri.length;
byte[] NOT_FOUND = "HTTP/1.1 404 \r\nContent-Length: 3\r\n\r\n404".getBytes(StandardCharsets.UTF_8);
String PART_BOUNDARY = "123456789000000000000987654321";
String STREAM_RESP_HEAD = "HTTP/1.1 200 OK\r\n" +
"Content-Type: multipart/x-mixed-replace;boundary=" + PART_BOUNDARY + "\r\n" +
"Transfer-Encoding: chunked\r\n" +
"Access-Control-Allow-Origin: *\r\n";
byte[] STREAM_RESP_HEAD_BYTES = STREAM_RESP_HEAD.getBytes(StandardCharsets.UTF_8);
byte[] _STREAM_BOUNDARY = ("\r\n--" + PART_BOUNDARY + "\r\n").getBytes(StandardCharsets.UTF_8);
byte[] _STREAM_PART =
"Content-Type: image/jpeg\r\nContent-Length: ".getBytes(StandardCharsets.UTF_8);
byte[] NEW_LINE = new byte[]{'\r', '\n'};
byte[] DOUBLE_LINE = new byte[]{'\r', '\n', '\r', '\n'};
}
package org.btik.server.video;
import org.btik.server.video.device.BioDeviceChannel;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Properties;
/**
* 启动类
*/
public class Main {
static Properties properties;
static {
properties = new Properties();
try {
properties.load(new FileInputStream("light-video.properties"));
} catch (IOException e) {
throw new RuntimeException(e);
}
}
/**
* 读取配置信息
*
* @param key 配置key
* @param def 获取为空时的默认值
*/
private static String getProp(String key, String def) {
Object o = properties.get(key);
if (o == null) {
return def;
}
return String.valueOf(o);
}
public static void main(String[] args) {
AsyncTaskExecutor asyncTaskExecutor = new AsyncTaskExecutor();
asyncTaskExecutor.start();
BioHttpVideoServer bioHttpVideoServer = new BioHttpVideoServer();
bioHttpVideoServer.setHttpPort(Integer.parseInt(
getProp("http.port", "8003")));
bioHttpVideoServer.setAsyncTaskExecutor(asyncTaskExecutor);
bioHttpVideoServer.start();
BioDeviceChannel bioDeviceChannel = new BioDeviceChannel();
bioDeviceChannel.setAsyncTaskExecutor(asyncTaskExecutor);
bioDeviceChannel.setClientsLimit(Integer.parseInt(
getProp("http.clients.limit", "10")));
bioDeviceChannel.setVideoServer(bioHttpVideoServer);
bioDeviceChannel.setStreamPort(Integer.parseInt(
getProp("stream.port", "8004")));
bioDeviceChannel.start();
}
}
package org.btik.server.video.device;
import org.btik.server.VideoServer;
import org.btik.server.video.AsyncTaskExecutor;
import org.btik.server.video.HttpConstant;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
/**
* 发送帧设备接入通道
*/
public class BioDeviceChannel extends Thread implements HttpConstant {
private boolean runFlag = true;
private final byte[] countLock = new byte[0];
private int clientsLimit;
private int streamPort;
private AsyncTaskExecutor asyncTaskExecutor;
private Set<Socket> clients;
private Map<Socket, FrameReceiver> receiverMap;
private VideoServer videoServer;
public BioDeviceChannel() {
super("bioDevChannel");
}
@Override
public synchronized void start() {
clients = Collections.newSetFromMap(new ConcurrentHashMap<>());
receiverMap = new ConcurrentHashMap<>();
super.start();
System.out.println("bio Device Channel started");
}
public void shutDown(String msg) {
runFlag = false;
}
@Override
public void run() {
try (ServerSocket serverSocket = new ServerSocket(streamPort)) {
while (runFlag) {
Socket cam = serverSocket.accept();
synchronized (countLock) {
onNewStreamOpen(cam);
if (clients.size() >= clientsLimit) {
try {
countLock.wait();
} catch (InterruptedException e) {
System.out.println("break on wait:" + e.getMessage());
break;
}
}
}
}
} catch (IOException e) {
System.out.println(" start server failed:" + e.getMessage());
}
}
private void close(Socket socket) {
asyncTaskExecutor.execute(() -> {
System.out.println("close:" + socket);
try {
socket.close();
synchronized (countLock) {
clients.remove(socket);
FrameReceiver remove = receiverMap.remove(socket);
if (null != remove) {
remove.shutDown("connect close");
}
}
} catch (IOException e) {
System.err.println(e.getMessage());
}
});
}
private void onNewStreamOpen(Socket socket) {
try {
FrameReceiver frameReceiver = new FrameReceiver(videoServer, socket);
receiverMap.put(socket, frameReceiver);
frameReceiver.start();
clients.add(socket);
} catch (IOException e) {
System.err.println(e.getMessage());
}
}
public void setStreamPort(int streamPort) {
this.streamPort = streamPort;
}
public int getStreamPort() {
return streamPort;
}
public void setAsyncTaskExecutor(AsyncTaskExecutor asyncTaskExecutor) {
this.asyncTaskExecutor = asyncTaskExecutor;
}
public void setClientsLimit(int clientsLimit) {
this.clientsLimit = clientsLimit;
}
public void setVideoServer(VideoServer videoServer) {
this.videoServer = videoServer;
}
}
package org.btik.server.video.device;
import java.io.ByteArrayOutputStream;
import java.util.Arrays;
/**
* 视频帧缓冲区
*/
public class FrameBuffer extends ByteArrayOutputStream {
/**
* 帧分割标识
*/
private static final byte[] end = new byte[]{'j', 'p', 'e', 'g', '\n'};
private static final int END_LEN = end.length;
private static final int END_TOP_INDEX = END_LEN - 1;
private static final byte endLast = end[END_TOP_INDEX];
private int checkIndex = END_TOP_INDEX;
@Override
public void write(byte[] b, int off, int len) {
synchronized (this) {
super.write(b, off, len);
this.notify();
}
}
/**
* 此处不加锁,但必须在锁对象为this情况下调用
*/
boolean hasFrame() {
if (count < END_LEN) {
return false;
}
searchEndChar:
for (; checkIndex < count; checkIndex++) {
if (buf[checkIndex] == endLast) {
for (int i = checkIndex - 1, j = END_TOP_INDEX - 1; j > 0; i--, j--) {
if (buf[i] != end[j]) {
continue searchEndChar;
}
}
return true;
}
}
return false;
}
/**
* 此处不加锁,但必须在锁对象为this情况下调用
*/
byte[] takeFrame() {
int newLength = checkIndex - END_TOP_INDEX;
byte[] bytes = Arrays.copyOf(buf, newLength);
int nextIndex = checkIndex + 1;
count -= nextIndex;
System.arraycopy(buf, nextIndex, buf, 0, count);
checkIndex = END_TOP_INDEX;
return bytes;
}
}
package org.btik.server.video.device;
import org.btik.server.VideoServer;
import java.io.IOException;
import java.io.InputStream;
import java.net.Socket;
import java.net.SocketAddress;
/**
* 帧接收器
*/
public class FrameReceiver extends Thread {
private volatile boolean runFlag = true;
private final VideoServer videoServer;
private final Socket socket;
private final InputStream in;
private final FrameSplit frameSplit = new FrameSplit();
public FrameReceiver(VideoServer videoServer, Socket socket) throws IOException {
this.videoServer = videoServer;
this.socket = socket;
this.in = socket.getInputStream();
SocketAddress remoteSocketAddress = socket.getRemoteSocketAddress();
setName("frameReceiver" + remoteSocketAddress);
frameSplit.start();
}
private final FrameBuffer frameBuffer = new FrameBuffer();
@Override
public void run() {
while (runFlag) {
try {
int available = in.available();
if (available > 0) {
byte[] buffer = new byte[available];
int read = in.read(buffer);
if (read == -1) {
shutDown("eof");
}
frameBuffer.write(buffer, 0, read);
} else {
synchronized (in) {
in.wait(10);
}
}
} catch (IOException e) {
String message = e.getMessage();
System.err.println(message);
if ("Connection reset".equals(message)) {
shutDown(message);
}
} catch (InterruptedException e) {
System.err.println("wait by break");
shutDown(e.getMessage());
}
}
}
public void shutDown(String msg) {
System.err.println("wait by break");
runFlag = false;
synchronized (frameBuffer) {
frameBuffer.notify();
}
}
@Override
public synchronized void start() {
super.start();
System.out.println("start " + socket.getRemoteSocketAddress());
}
class FrameSplit extends Thread {
public FrameSplit() {
setName("FrameSplit");
}
@Override
public void run() {
while (runFlag) {
synchronized (frameBuffer) {
if (frameBuffer.hasFrame()) {
videoServer.sendFrame(frameBuffer.takeFrame());
} else {
try {
frameBuffer.wait();
} catch (InterruptedException e) {
System.err.println("wait by break");
shutDown(e.getMessage());
}
}
}
}
}
}
}
.\jre\bin\java -classpath SimpleVideoServer org.btik.server.video.Main
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册