提交 9157116e 编写于 作者: E eguid 提交者: wangliang

1、原API保持不变,修改部分内部逻辑

2、增加两个参数,codec:解码(默认解码器是H264),twoPart:0-2,0=推送元码流,1=推送自定义流,2=推送两个流(一个自定义流,一个元码流)
3、不支持音频解码,默认去除音频
4、输出线程增加动态显示消息功能
5、增加一个实体类并依赖json包
上级 572a0d7b
eclipse.preferences.version=1
org.eclipse.jdt.core.compiler.codegen.inlineJsrBytecode=enabled
org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.8
org.eclipse.jdt.core.compiler.codegen.unusedLocal=preserve
org.eclipse.jdt.core.compiler.compliance=1.8
org.eclipse.jdt.core.compiler.debug.lineNumber=generate
org.eclipse.jdt.core.compiler.debug.localVariable=generate
org.eclipse.jdt.core.compiler.debug.sourceFile=generate
org.eclipse.jdt.core.compiler.problem.assertIdentifier=error
org.eclipse.jdt.core.compiler.problem.enumIdentifier=error
org.eclipse.jdt.core.compiler.source=1.8
使用说明:
livePush0.2.1使用说明:
(重要:使用前必须保证ffmpeg环境包在该项目的WEB-INF\classes\cc\eguid\livepush\ffmpeg\目录中)
1、对象创建
PushManager pusher = new PushManagerImpl();
......@@ -8,19 +8,21 @@ PushManager pusher = new PushManagerImpl();
name:应用名;
input:接收地址;
output:推送地址;
codec:视频编码;
fmt:视频格式;
fps:视频帧率;
rs:视频分辨率;
disableAudio:是否开启音频;
twoPart:0-推一个元码流;1-推一个自定义推流;2-推两个流(一个是自定义,一个是元码)
2.2、参数使用
Map map = new HashMap();
map.put("appName", "testwanglaing123");
map.put("input","rtsp://admin:admin@192.168.2.236:37779/cam/realmonitor?channel=1&subtype=0");
map.put("output", "rtmp://192.168.30.21/live/");
map.put("codec","h264");
map.put("fmt", "flv");
map.put("fps", "25");
map.put("rs", "640x360");
map.put("disableAudio", "true");
map.put("twoPart","2");
3、调用方法
3.1、发布方法
......
......@@ -25,7 +25,7 @@ public interface PushManager
* @param map
* @return pushId(当前发布流的标识,方便操作该push)
*/
public String push(Map<String,String>map);
public String push(Map<String,Object>map);
/**
* 通过应用名删除某个push
* @param pushId
......
......@@ -14,7 +14,6 @@ import cc.eguid.livepush.handler.OutHandler;
import cc.eguid.livepush.handler.PushHandler;
import cc.eguid.livepush.handler.PushHandlerImpl;
/**
* 实现push管理器的push,delete,view服务
*
......@@ -24,106 +23,91 @@ import cc.eguid.livepush.handler.PushHandlerImpl;
* @since jdk1.7
*/
public class PushManagerImpl implements PushManager
{
/**
*配置文件
*/
private ConfUtil confUtil = new ConfUtil();
public PushManagerImpl()
{
confUtil.isHave();
}
/**
* 引用push处理器
*/
private PushHandler pusher = new PushHandlerImpl();
/**
* 管理处理器的主进程Process及两个输出线程的关系
*/
private HandlerDao hd = new HandlerDaoImpl();
public void setPusher(PushHandler pusher)
{
this.pusher = pusher;
}
public void setHd(HandlerDao hd)
{
this.hd = hd;
}
@Override
public String push(Map<String, String> parammap)
{
// ffmpeg环境是否配置正确
if (!confUtil.isHave())
{
return null;
}
// 参数是否符合要求
if (parammap == null || parammap.isEmpty() || !parammap.containsKey("appName"))
{
return null;
}
String appName = null;
ConcurrentMap<String, Object> resultMap = null;
try
{
appName = parammap.get("appName");
if (appName != null && "".equals(appName.trim()))
{
return null;
}
parammap.put("ffmpegPath", confUtil.getPath());
resultMap = pusher.push(parammap);
// 处理器和输出线程对应关系
hd.set(appName, resultMap);
}
catch (Exception e)
{
// 暂时先写这样,后期加日志
System.err.println("重大错误:参数不符合要求" + e.getMessage());
}
return appName;
}
@Override
public boolean closePush(String appName)
{
if (hd.isHave(appName))
{
ConcurrentMap<String, Object> map = hd.get(appName);
// 关闭两个线程
((OutHandler)map.get("error")).destroy();
// ((OutHandler)map.get("info")).destroy();
// 暂时先这样写,后期加日志
System.out.println("停止命令-----end commond");
// 关闭命令主进程
((Process)map.get("process")).destroy();
// 删除处理器与线程对应关系表
hd.delete(appName);
return true;
}
return false;
}
@Override
public Set<String> viewAppName()
{
return hd.getAllAppName();
}
@Override
public boolean isHave(String appName)
{
hd.isHave(appName);
return false;
}
public class PushManagerImpl implements PushManager {
/**
* 配置文件
*/
private ConfUtil confUtil = new ConfUtil();
public PushManagerImpl() {
confUtil.isHave();
}
/**
* 引用push处理器
*/
private PushHandler pusher = new PushHandlerImpl();
/**
* 管理处理器的主进程Process及两个输出线程的关系
*/
private HandlerDao hd = new HandlerDaoImpl();
public synchronized void setPusher(PushHandler pusher) {
this.pusher = pusher;
}
public synchronized void setHd(HandlerDao hd) {
this.hd = hd;
}
@Override
public synchronized String push(Map<String, Object> parammap) {
String appName = null;
ConcurrentMap<String, Object> resultMap = null;
try {
// ffmpeg环境是否配置正确
if (!confUtil.isHave()) {
return null;
}
// 参数是否符合要求
if (parammap == null || parammap.isEmpty() || !parammap.containsKey("appName")) {
return null;
}
appName = (String) parammap.get("appName");
if (appName != null && "".equals(appName.trim())) {
return null;
}
parammap.put("ffmpegPath", confUtil.getPath());
resultMap = pusher.push(parammap);
// 处理器和输出线程对应关系
hd.set(appName, resultMap);
} catch (Exception e) {
// 暂时先写这样,后期加日志
System.err.println("重大错误:参数不符合要求或运行失败" + e.getMessage());
return null;
}
return appName;
}
@Override
public synchronized boolean closePush(String appName) {
if (hd.isHave(appName)) {
ConcurrentMap<String, Object> map = hd.get(appName);
// 关闭两个线程
((OutHandler) map.get("error")).destroy();
// ((OutHandler)map.get("info")).destroy();
// 暂时先这样写,后期加日志
System.out.println("停止命令-----end commond");
// 关闭命令主进程
((Process) map.get("process")).destroy();
// 删除处理器与线程对应关系表
hd.delete(appName);
return true;
}
return false;
}
@Override
public synchronized Set<String> viewAppName() {
return hd.getAllAppName();
}
@Override
public synchronized boolean isHave(String appName) {
hd.isHave(appName);
return false;
}
}
使用说明:
livePush0.2.1使用说明:
(重要:使用前必须保证ffmpeg环境包在该项目的WEB-INF\classes\cc\eguid\livepush\ffmpeg\目录中)
1、对象创建
PushManager pusher = new PushManagerImpl();
......@@ -8,19 +8,21 @@ PushManager pusher = new PushManagerImpl();
name:应用名;
input:接收地址;
output:推送地址;
codec:视频编码;
fmt:视频格式;
fps:视频帧率;
rs:视频分辨率;
disableAudio:是否开启音频;
twoPart:0-推一个元码流;1-推一个自定义推流;2-推两个流(一个是自定义,一个是元码)
2.2、参数使用
Map map = new HashMap();
map.put("appName", "testwanglaing123");
map.put("input","rtsp://admin:admin@192.168.2.236:37779/cam/realmonitor?channel=1&subtype=0");
map.put("output", "rtmp://192.168.30.21/live/");
map.put("codec","h264");
map.put("fmt", "flv");
map.put("fps", "25");
map.put("rs", "640x360");
map.put("disableAudio", "true");
map.put("twoPart","2");
3、调用方法
3.1、发布方法
......
......@@ -29,8 +29,9 @@ public class ConfUtil
*/
private void initConfInfo()
{
System.out.print("预加载配置:");
String path = getClass().getResource("../").getPath() + "ffmpeg/ffmpeg.exe";
System.out.print("预加载FFMPEG配置:"+path);
File ffmpeg =new File(path);
ffmpegPath=ffmpeg.getPath();
if (isHave=ffmpeg.isFile())
......
package cc.eguid.livepush.entity;
import java.io.Serializable;
public class LivePushEntity implements Serializable {
private static final long serialVersionUID = -1580871796857185739L;
public LivePushEntity() {
super();
}
/**
* 示例: appName="test123"
* input="rtsp://admin:admin@192.168.2.236:37779/cam/realmonitor?channel=1&subtype=0"
* output="rtmp://192.168.30.21/live/" codec="h264" fmt="flv" fps="25"
* rs="640x360" twoPart="2" twoPart:0-推一个元码流;1-推一个自定义推流;2-推两个流(一个是自定义,一个是元码)
*/
public LivePushEntity(String appName, String input, String output, String codec, String fmt, String fps, String rs,
String twoPart) {
super();
this.appName = appName;
this.input = input;
this.output = output;
this.fmt = fmt;
this.fps = fps;
this.rs = rs;
this.codec = codec;
this.twoPart = twoPart;
}
/**
* 应用名
*/
private String appName;
/**
* 视频源地址,可以是实时流地址也可以是文件路径
*/
private String input;
/**
* 实时流输出地址,这个默认是固定的rtmp服务器发布地址
*/
private String output;
/**
* 视频格式,默认flv
*/
private String fmt;
/**
* 帧率,最好是25-60
*/
private String fps;
/**
* 分辨率,例如:640x360
*/
private String rs;
/**
* 视频编码
*/
private String codec;
/**
* twoPart:0-推一个元码流;1-推一个自定义推流;2-推两个流(一个是自定义,一个是元码)
*/
private String twoPart;
/**
* @return the appName
*/
public String getAppName() {
return appName;
}
/**
* @param appName
* the appName to set
*/
public void setAppName(String appName) {
this.appName = appName;
}
/**
* @return the input
*/
public String getInput() {
return input;
}
/**
* @param input
* the input to set
*/
public void setInput(String input) {
this.input = input;
}
/**
* @return the output
*/
public String getOutput() {
return output;
}
/**
* @param output
* the output to set
*/
public void setOutput(String output) {
this.output = output;
}
/**
* @return the fmt
*/
public String getFmt() {
return fmt;
}
/**
* @param fmt
* the fmt to set
*/
public void setFmt(String fmt) {
this.fmt = fmt;
}
/**
* @return the fps
*/
public String getFps() {
return fps;
}
/**
* @param fps
* the fps to set
*/
public void setFps(String fps) {
this.fps = fps;
}
/**
* @return the rs
*/
public String getRs() {
return rs;
}
/**
* @param rs
* the rs to set
*/
public void setRs(String rs) {
this.rs = rs;
}
/**
* @return the codec
*/
public String getCodec() {
return codec;
}
/**
* @param codec
*/
public void setCodec(String codec) {
this.codec = codec;
}
public String getTwoPart() {
return twoPart;
}
public void setTwoPart(String twoPart) {
this.twoPart = twoPart;
}
}
......@@ -3,13 +3,11 @@
*/
package cc.eguid.livepush.handler;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
/**
* 用于输出命令行主进程的消息线程(必须开启,否则命令行主进程无法正常执行) 重要:该类重写了destroy方法,用于安全的关闭该线程
*
......@@ -19,92 +17,71 @@ import java.io.InputStreamReader;
* @since jdk1.7
*/
public class OutHandler extends Thread
{
/**
* 控制状态
*/
private volatile boolean desstatus = true;
public class OutHandler extends Thread {
/**
* 控制状态
*/
private volatile boolean desstatus = true;
/**
* 读取输出流
*/
private BufferedReader br = null;
/**
* 输出类型
*/
private String type = null;
/**
* 读取输出流
*/
private BufferedReader br=null;
public OutHandler(InputStream is, String type) {
br = new BufferedReader(new InputStreamReader(is));
this.type = type;
}
/**
* 输出类型
*/
private String type=null;
/**
* 重写线程销毁方法,安全的关闭线程
*/
@Override
public void destroy() {
setDesStatus(false);
}
public OutHandler(InputStream is, String type)
{
br = new BufferedReader(new InputStreamReader(is));
this.type = type;
}
public void setDesStatus(boolean desStatus) {
this.desstatus = desStatus;
}
/**
* 重写线程销毁方法,安全的关闭线程
*/
@Override
public void destroy()
{
setDesStatus(false);
}
public void setDesStatus(boolean desStatus)
{
this.desstatus = desStatus;
}
/**
* 执行输出线程
*/
@Override
public void run() {
String msg = null;
int index = 0;
int errorIndex = 0;
int status = 10;
try {
System.out.println(type + "开始推流!");
while (desstatus && (msg = br.readLine()) != null) {
if (msg.indexOf("[rtsp") != -1) {
System.out.println("接收" + status + "个数据包" + msg);
System.out.println(type + ",网络异常丢包,丢包次数:" + errorIndex++ + ",消息体:" + msg);
status = 10;
index = 0;
}
/**
* 执行输出线程
*/
@Override
public void run()
{
String msg = null;
int status = 0;
int index = 0;
try
{
while (desstatus&&(msg = br.readLine()) != null)
{
if (msg.indexOf("[rtsp") != -1)
{
if (status > 5)
{
System.err.println(type + "持续发生严重网络丢包错误,建议立即关闭该应用后检查网络状况!");
}
else
{
System.out.println(type + ",网络异常丢包:" + msg);
}
status++ ;
}
else if (msg.indexOf("[h264") != -1)
{
System.out.println(type + ",解码错误:" + msg);
}
else
{
if (index >= 10)
{
System.out.println(type + ",网络消息:接收到" + index + "个数据包");
index = 0;
}
index++ ;
}
}
}
catch (IOException e)
{
System.out.println("发生内部异常错误,自动关闭["+this.getId()+"]线程");
destroy();
}finally {
if(this.isAlive())
{
destroy();
}
}
}
if (index % status == 0) {
System.out.println("接收" + status + "个数据包" + msg);
status *= 2;
}
index++;
}
} catch (IOException e) {
System.out.println("发生内部异常错误,自动关闭[" + this.getId() + "]线程");
destroy();
} finally {
if (this.isAlive()) {
destroy();
}
}
}
}
......@@ -23,12 +23,12 @@ public interface PushHandler
{
/**
* 处理push操作(包含一个主进程和两个输出线程)
* @param map
* @param parammap
* 格式:
* name:应用名;input:接收地址;output:推送地址;fmt:视频格式;fps:视频帧率;rs:视频分辨率;disableAudio:是否开启音频
* @return map(进程,消息(info,error))
* @throws IOException
*
*/
public ConcurrentMap<String,Object> push(Map<String,String>map)throws Exception;
public ConcurrentMap<String,Object> push(Map<String, Object> parammap)throws Exception;
}
......@@ -16,78 +16,111 @@ import java.util.concurrent.ConcurrentMap;
* @since jdk1.7
*/
public class PushHandlerImpl implements PushHandler
{
/*
* "ffmpeg -i "+ "rtsp://admin:admin@192.168.2.236:37779/cam/realmonitor?channel=1&subtype=0 "+
* " -f flv -r 25 -s 640x360 -an" + " rtmp://192.168.30.21/live/test" 推送流格式:
* name:应用名;input:接收地址;output:推送地址;fmt:视频格式;fps:视频帧率;rs:视频分辨率;disableAudio:是否开启音频
*/
@Override
public ConcurrentMap<String, Object> push(Map<String, String> paramMap)
throws Exception
{
// 从map里面取数据,组装成命令
String comm = getComm4Map(paramMap);
ConcurrentMap<String, Object> resultMap = null;
// 执行命令行
System.out.println("执行命令----start commond:" + comm);
final Process proc = Runtime.getRuntime().exec(comm);
OutHandler errorGobbler = new OutHandler(proc.getErrorStream(), paramMap.get("appName"));
// OutHandler outputGobbler = new OutHandler(proc.getInputStream(), "Info");
public class PushHandlerImpl implements PushHandler {
@Override
public ConcurrentMap<String, Object> push(Map<String, Object> paramMap) throws Exception {
ConcurrentMap<String, Object> resultMap = null;
Process proc =null;
OutHandler errorGobbler=null;
try{
// 从map里面取数据,组装成命令
String comm = getComm4Map(paramMap);
if(comm==null)
{
throw new Exception();
}
// 执行命令行
System.out.println("执行命令:" + comm);
proc = Runtime.getRuntime().exec(comm);
errorGobbler = new OutHandler(proc.getErrorStream(), (String) paramMap.get("appName"));
errorGobbler.start();
// 返回参数
resultMap = new ConcurrentHashMap<String, Object>();
// resultMap.put("info", outputGobbler);
resultMap.put("error", errorGobbler);
resultMap.put("process", proc);
}catch(Exception e){
if(proc!=null){
proc.destroy();
}
if(errorGobbler!=null){
errorGobbler.destroy();
}
throw e;
}
return resultMap;
}
errorGobbler.start();
// outputGobbler.start();
// 返回参数
resultMap = new ConcurrentHashMap<String, Object>();
// resultMap.put("info", outputGobbler);
resultMap.put("error", errorGobbler);
resultMap.put("process", proc);
return resultMap;
}
/**
* 通过解析参数生成可执行的命令行字符串;
* name:应用名;input:接收地址;output:推送地址;fmt:视频格式;fps:视频帧率;rs:视频分辨率;disableAudio:是否开启音频
*
* @param paramMap
* @throws Exception
* @return 命令行字符串
*/
protected String getComm4Map(Map<String, String> paramMap) throws Exception
{
if (paramMap.containsKey("ffmpegPath"))
{
StringBuilder comm = new StringBuilder(paramMap.get("ffmpegPath") + " -i ");
// -i:输入流地址或者文件绝对地址
// 是否有必输项:输入地址,输出地址,应用名
if (paramMap.containsKey("input") && paramMap.containsKey("output")
&& paramMap.containsKey("appName"))
{
comm.append(paramMap.get("input")).append(" ");
// -f :转换格式,默认flv
comm.append(" -f ").append(paramMap.containsKey("fmt") ? paramMap.get("fmt") : "flv").append(" ");
// -r :帧率,默认25
comm.append("-r ").append(paramMap.containsKey("fps") ? paramMap.get("fps") : "30").append(" ");
// -s 分辨率 默认是原分辨率
comm.append("-s ").append(paramMap.containsKey("rs") ? paramMap.get("rs") : "").append(" ");
// -an 禁用音频
comm.append("-an ").append(paramMap.containsKey("disableAudio") && ("true".equals(paramMap.get("disableAudio"))) ? "-an" : "").append(" ");
// 输出地址
comm.append(paramMap.get("output"));
// 发布的应用名
comm.append(paramMap.get("appName"));
// 一个视频源,可以有多个输出,第二个输出为拷贝源视频输出,不改变视频的各项参数并且命名为应用名+HD
comm.append(" ").append(" -vcodec copy -f flv -an ").append(paramMap.get("output")).append(paramMap.get("appName")).append("HD");
return comm.toString();
}
}
else
{
throw new Exception("重大错误:必输项不能为空!");
}
return null;
}
/*
* "ffmpeg -i "+
* "rtsp://admin:admin@192.168.2.236:37779/cam/realmonitor?channel=1&subtype=0 "
* + " -f flv -r 25 -s 640x360 -an" + " rtmp://192.168.30.21/live/test"
* 推送流格式:
* name:应用名;input:接收地址;output:推送地址;fmt:视频格式;fps:视频帧率;rs:视频分辨率
* 是否开启音频
*/
/**
* 通过解析参数生成可执行的命令行字符串;
* name:应用名;input:接收地址;output:推送地址;fmt:视频格式;fps:视频帧率;rs:视频分辨率
* "ffmpeg -i rtsp://admin:admin@192.168.2.236:37779/cam/realmonitor?channel=1&subtype=0 -f flv -r 25 -s 640x360 -an rtmp://192.168.30.21/live/test"
* @param paramMap
* @throws Exception
* @return 命令行字符串
*/
protected String getComm4Map(Map<String, Object> paramMap) throws Exception{
try{
if (paramMap.containsKey("ffmpegPath")) {
String ffmpegPath = (String) paramMap.get("ffmpegPath");
// -i:输入流地址或者文件绝对地址
StringBuilder comm = new StringBuilder(ffmpegPath + " -i ");
// 是否有必输项:输入地址,输出地址,应用名,twoPart:0-推一个元码流;1-推一个自定义推流;2-推两个流(一个是自定义,一个是元码)
if (paramMap.containsKey("input") && paramMap.containsKey("output") && paramMap.containsKey("appName")&& paramMap.containsKey("twoPart")) {
String input = (String) paramMap.get("input");
String output = (String) paramMap.get("output");
String appName = (String) paramMap.get("appName");
String twoPart = (String) paramMap.get("twoPart");
String codec=(String) paramMap.get("codec");
//默认h264解码
codec=(codec==null?"h264":(String) paramMap.get("codec"));
// 输入地址
comm.append(input);
// 当twoPart为0时,只推一个元码流
if ("0".equals(twoPart)) {
comm.append(" -vcodec "+codec+" -f flv -an "+output + appName);
}else{
// -f :转换格式,默认flv
if (paramMap.containsKey("fmt")) {
String fmt = (String) paramMap.get("fmt");
comm.append(" -f " + fmt);
}
// -r :帧率,默认25;-g :帧间隔
if (paramMap.containsKey("fps")) {
String fps = (String) paramMap.get("fps");
comm.append(" -r " + fps);
comm.append(" -g " + fps);
}
// -s 分辨率 默认是原分辨率
if (paramMap.containsKey("rs")) {
String rs = (String) paramMap.get("rs");
comm.append(" -s " + rs);
}
// 输出地址+发布的应用名
comm.append(" -an "+output + appName);
// 当twoPart为2时推两个流,一个自定义流,一个元码流
if ("2".equals(twoPart)) {
// 一个视频源,可以有多个输出,第二个输出为拷贝源视频输出,不改变视频的各项参数并且命名为应用名+HD
comm.append(" -vcodec copy -f flv -an ").append(output +appName+ "HD");
}
}
return comm.toString();
}
}
}catch(Exception e){
throw e;
}
return null;
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册