提交 572a0d7b 编写于 作者: E eguid 提交者: wangliang

修改推流控制策略,更加方便控制推流器开启和关闭

上级 c0a23886
<?xml version="1.0" encoding="UTF-8"?>
<classpath>
<classpathentry kind="src" path="src"/>
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.8"/>
<classpathentry kind="output" path="bin"/>
</classpath>
<?xml version="1.0" encoding="UTF-8"?>
<projectDescription>
<name>livePush</name>
<comment></comment>
<projects>
</projects>
<buildSpec>
<buildCommand>
<name>org.eclipse.jdt.core.javabuilder</name>
<arguments>
</arguments>
</buildCommand>
</buildSpec>
<natures>
<nature>org.eclipse.jdt.core.javanature</nature>
</natures>
</projectDescription>
eclipse.preferences.version=1
org.eclipse.jdt.core.compiler.codegen.inlineJsrBytecode=enabled
org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.8
org.eclipse.jdt.core.compiler.codegen.unusedLocal=preserve
org.eclipse.jdt.core.compiler.compliance=1.8
org.eclipse.jdt.core.compiler.debug.lineNumber=generate
org.eclipse.jdt.core.compiler.debug.localVariable=generate
org.eclipse.jdt.core.compiler.debug.sourceFile=generate
org.eclipse.jdt.core.compiler.problem.assertIdentifier=error
org.eclipse.jdt.core.compiler.problem.enumIdentifier=error
org.eclipse.jdt.core.compiler.source=1.8
使用说明:
(重要:使用前必须保证ffmpeg环境包在该项目的WEB-INF\classes\cc\eguid\livepush\ffmpeg\目录中)
1、对象创建
PushManager pusher = new PushManagerImpl();
2、参数说明及设置
2.1、参数说明
name:应用名;
input:接收地址;
output:推送地址;
fmt:视频格式;
fps:视频帧率;
rs:视频分辨率;
disableAudio:是否开启音频;
2.2、参数使用
Map map = new HashMap();
map.put("appName", "testwanglaing123");
map.put("input","rtsp://admin:admin@192.168.2.236:37779/cam/realmonitor?channel=1&subtype=0");
map.put("output", "rtmp://192.168.30.21/live/");
map.put("fmt", "flv");
map.put("fps", "25");
map.put("rs", "640x360");
map.put("disableAudio", "true");
3、调用方法
3.1、发布方法
id push(map)
pusher.push(map);
3.2、关闭发布方法
void closePush(appName)
pusher.closePush(appName);
3.3、查看所有正在push的列表
Set<String> viewAppName()
pusher.viewAppName();
3.4、检测是否存在某个push
boolean isHave(String appName)
pusher.isHave(appName);
\ No newline at end of file
/**
* 文件名:PushManager.java
* 描述:用于处理push相关操作的接口
* 修改人:eguid
* 修改时间:2016年6月24日
* 修改内容:
*/
package cc.eguid.livepush;
import java.util.Map;
import java.util.Set;
/**
* 用于提供push操作的增删查服务
* @author eguid
* @version 2016年6月24日
* @see PushManager
* @since jdk1.7
*/
public interface PushManager
{
/**
* 发布一个流到服务器
* @param map
* @return pushId(当前发布流的标识,方便操作该push)
*/
public String push(Map<String,String>map);
/**
* 通过应用名删除某个push
* @param pushId
*/
public boolean closePush(String appName);
/**
* 查看全部当前正在运行的应用名称
* @param pushId
*/
public Set<String> viewAppName();
/**
* 应用是否已经存在
* @param appName
* @return true:存在;false:不存在
*/
public boolean isHave(String appName);
}
/**
* 文件名:PushMangerImpl.java 描述:实现push管理器的接口功能 修改人:eguid 修改时间:2016年6月29日 修改内容:增加管理处理器和应用名关系
*/
package cc.eguid.livepush;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import cc.eguid.livepush.conf.ConfUtil;
import cc.eguid.livepush.dao.HandlerDao;
import cc.eguid.livepush.dao.HandlerDaoImpl;
import cc.eguid.livepush.handler.OutHandler;
import cc.eguid.livepush.handler.PushHandler;
import cc.eguid.livepush.handler.PushHandlerImpl;
/**
* 实现push管理器的push,delete,view服务
*
* @author eguid
* @version 2016年6月24日
* @see PushMangerImpl
* @since jdk1.7
*/
public class PushManagerImpl implements PushManager
{
/**
*配置文件
*/
private ConfUtil confUtil = new ConfUtil();
public PushManagerImpl()
{
confUtil.isHave();
}
/**
* 引用push处理器
*/
private PushHandler pusher = new PushHandlerImpl();
/**
* 管理处理器的主进程Process及两个输出线程的关系
*/
private HandlerDao hd = new HandlerDaoImpl();
public void setPusher(PushHandler pusher)
{
this.pusher = pusher;
}
public void setHd(HandlerDao hd)
{
this.hd = hd;
}
@Override
public String push(Map<String, String> parammap)
{
// ffmpeg环境是否配置正确
if (!confUtil.isHave())
{
return null;
}
// 参数是否符合要求
if (parammap == null || parammap.isEmpty() || !parammap.containsKey("appName"))
{
return null;
}
String appName = null;
ConcurrentMap<String, Object> resultMap = null;
try
{
appName = parammap.get("appName");
if (appName != null && "".equals(appName.trim()))
{
return null;
}
parammap.put("ffmpegPath", confUtil.getPath());
resultMap = pusher.push(parammap);
// 处理器和输出线程对应关系
hd.set(appName, resultMap);
}
catch (Exception e)
{
// 暂时先写这样,后期加日志
System.err.println("重大错误:参数不符合要求" + e.getMessage());
}
return appName;
}
@Override
public boolean closePush(String appName)
{
if (hd.isHave(appName))
{
ConcurrentMap<String, Object> map = hd.get(appName);
// 关闭两个线程
((OutHandler)map.get("error")).destroy();
// ((OutHandler)map.get("info")).destroy();
// 暂时先这样写,后期加日志
System.out.println("停止命令-----end commond");
// 关闭命令主进程
((Process)map.get("process")).destroy();
// 删除处理器与线程对应关系表
hd.delete(appName);
return true;
}
return false;
}
@Override
public Set<String> viewAppName()
{
return hd.getAllAppName();
}
@Override
public boolean isHave(String appName)
{
hd.isHave(appName);
return false;
}
}
使用说明:
(重要:使用前必须保证ffmpeg环境包在该项目的WEB-INF\classes\cc\eguid\livepush\ffmpeg\目录中)
1、对象创建
PushManager pusher = new PushManagerImpl();
2、参数说明及设置
2.1、参数说明
name:应用名;
input:接收地址;
output:推送地址;
fmt:视频格式;
fps:视频帧率;
rs:视频分辨率;
disableAudio:是否开启音频;
2.2、参数使用
Map map = new HashMap();
map.put("appName", "testwanglaing123");
map.put("input","rtsp://admin:admin@192.168.2.236:37779/cam/realmonitor?channel=1&subtype=0");
map.put("output", "rtmp://192.168.30.21/live/");
map.put("fmt", "flv");
map.put("fps", "25");
map.put("rs", "640x360");
map.put("disableAudio", "true");
3、调用方法
3.1、发布方法
id push(map)
pusher.push(map);
3.2、关闭发布方法
void closePush(appName)
pusher.closePush(appName);
3.3、查看所有正在push的列表
Set<String> viewAppName()
pusher.viewAppName();
3.4、检测是否存在某个push
boolean isHave(String appName)
pusher.isHave(appName);
\ No newline at end of file
/**
* 文件名:ConfUtil.java 描述:读取配置文件属性 修改人:eguid 修改时间:2016年7月8日 修改内容:
*/
package cc.eguid.livepush.conf;
import java.io.File;
/**
* 读取配置文件
*
* @author eguid
* @version 2016年7月8日
* @see ConfUtil
* @since jdk1.7
*/
public class ConfUtil
{
private volatile static boolean isHave=false;
private volatile static String ffmpegPath=null;
public ConfUtil()
{
super();
initConfInfo();
}
/**
* 从配置文件中初始化参数
*/
private void initConfInfo()
{
System.out.print("预加载配置:");
String path = getClass().getResource("../").getPath() + "ffmpeg/ffmpeg.exe";
File ffmpeg =new File(path);
ffmpegPath=ffmpeg.getPath();
if (isHave=ffmpeg.isFile())
{
System.out.println("加载ffmpeg成功!");
}
else
{
System.out.println("加载ffmpeg失败!");
}
}
/**
*判断ffmpeg环境配置
* @return true:配置成功;false:配置失败
*/
public boolean isHave()
{
return isHave;
}
/**
* 获取ffmpeg环境调用地址
* 添加方法功能描述
* @return
*/
public String getPath()
{
return ffmpegPath;
}
}
/**
* 文件名:HandlerDao.java
* 描述:管理所有命令行处理器的缓存
* 修改人:eguid
* 修改时间:2016年6月27日
* 修改内容:
*/
package cc.eguid.livepush.dao;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
/**
* 命令行执行处理器缓存,方便管理处理器的开启和关闭
* @author eguid
* @version 2016年6月27日
* @see HandlerDao
* @since jdk1.7
*/
public interface HandlerDao
{
/**
* 获取某个处理器
* @param pushId
* @return
*/
public ConcurrentMap<String, Object> get(String pushId);
/**
* 存放一个处理器
* @param handlerMap
*/
public void set(String key, ConcurrentMap<String, Object> resultMap);
/**
* 获取全部处理器
* @return
*/
public ConcurrentMap<String, ConcurrentMap<String, Object>> getAll();
/**
* 获取全部处理器名称
* @return
*/
public Set<String> getAllAppName();
/**
* 删除某个处理器
* @param pushId
*/
public void delete(String appName);
/**
* 是否存在key
*/
public boolean isHave(String appName);
}
/**
* 文件名:HandlerDaoImpl.java 描述:命令行执行处理器缓存的简单实现 修改人:eguid 修改时间:2016年6月27日 修改内容:
*/
package cc.eguid.livepush.dao;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
/**
* 处理器的缓存简单实现
* @author eguid
* @version 2016年6月27日
* @see HandlerDaoImpl
* @since jdk1.7
*/
public class HandlerDaoImpl implements HandlerDao
{
/**
* 存放process
*/
private static ConcurrentMap<String, ConcurrentMap<String, Object>> handlerMap = new ConcurrentHashMap<String, ConcurrentMap<String, Object>>(20);
@Override
public ConcurrentMap<String,Object> get(String pushId)
{
if(handlerMap.containsKey(pushId))
{
return handlerMap.get(pushId);
}
return null;
}
@Override
public ConcurrentMap<String, ConcurrentMap<String, Object>> getAll()
{
return handlerMap;
}
@Override
public void delete(String pushId)
{
if (pushId != null)
{
handlerMap.remove(pushId);
}
}
@Override
public void set(String key, ConcurrentMap<String, Object> map)
{
if (key != null)
{
handlerMap.put(key, map);
}
}
@Override
public boolean isHave(String pushId)
{
return handlerMap.containsKey(pushId);
}
@Override
public Set<String> getAllAppName()
{
return handlerMap.keySet();
}
}
/**
* 文件名:OutHandler.java 描述:输出命令行主进程消息 修改人:eguid 修改时间:2016年6月27日 修改内容:
*/
package cc.eguid.livepush.handler;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
/**
* 用于输出命令行主进程的消息线程(必须开启,否则命令行主进程无法正常执行) 重要:该类重写了destroy方法,用于安全的关闭该线程
*
* @author eguid
* @version 2016年6月27日
* @see OutHandler
* @since jdk1.7
*/
public class OutHandler extends Thread
{
/**
* 控制状态
*/
private volatile boolean desstatus = true;
/**
* 读取输出流
*/
private BufferedReader br=null;
/**
* 输出类型
*/
private String type=null;
public OutHandler(InputStream is, String type)
{
br = new BufferedReader(new InputStreamReader(is));
this.type = type;
}
/**
* 重写线程销毁方法,安全的关闭线程
*/
@Override
public void destroy()
{
setDesStatus(false);
}
public void setDesStatus(boolean desStatus)
{
this.desstatus = desStatus;
}
/**
* 执行输出线程
*/
@Override
public void run()
{
String msg = null;
int status = 0;
int index = 0;
try
{
while (desstatus&&(msg = br.readLine()) != null)
{
if (msg.indexOf("[rtsp") != -1)
{
if (status > 5)
{
System.err.println(type + "持续发生严重网络丢包错误,建议立即关闭该应用后检查网络状况!");
}
else
{
System.out.println(type + ",网络异常丢包:" + msg);
}
status++ ;
}
else if (msg.indexOf("[h264") != -1)
{
System.out.println(type + ",解码错误:" + msg);
}
else
{
if (index >= 10)
{
System.out.println(type + ",网络消息:接收到" + index + "个数据包");
index = 0;
}
index++ ;
}
}
}
catch (IOException e)
{
System.out.println("发生内部异常错误,自动关闭["+this.getId()+"]线程");
destroy();
}finally {
if(this.isAlive())
{
destroy();
}
}
}
}
/**
* 文件名:PushHandler.java
* 描述:push操作处理器接口
* 修改人:eguid
* 修改时间:2016年6月24日
* 修改内容:
*/
package cc.eguid.livepush.handler;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
/**
* 用于提供处理push操作的服务接口
* @author eguid
* @version 2016年6月24日
* @see PushHandler
* @since jdk1.7
*/
public interface PushHandler
{
/**
* 处理push操作(包含一个主进程和两个输出线程)
* @param map
* 格式:
* name:应用名;input:接收地址;output:推送地址;fmt:视频格式;fps:视频帧率;rs:视频分辨率;disableAudio:是否开启音频
* @return map(进程,消息(info,error))
* @throws IOException
*
*/
public ConcurrentMap<String,Object> push(Map<String,String>map)throws Exception;
}
/**
* 文件名:PushHandlerImpl.java 描述: 修改人:eguid 修改时间:2016年6月24日 修改内容:
*/
package cc.eguid.livepush.handler;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
/**
* 提供解析参数生成ffmpeg命令并处理push操作
*
* @author eguid
* @version 2016年6月24日
* @see PushHandlerImpl
* @since jdk1.7
*/
public class PushHandlerImpl implements PushHandler
{
/*
* "ffmpeg -i "+ "rtsp://admin:admin@192.168.2.236:37779/cam/realmonitor?channel=1&subtype=0 "+
* " -f flv -r 25 -s 640x360 -an" + " rtmp://192.168.30.21/live/test" 推送流格式:
* name:应用名;input:接收地址;output:推送地址;fmt:视频格式;fps:视频帧率;rs:视频分辨率;disableAudio:是否开启音频
*/
@Override
public ConcurrentMap<String, Object> push(Map<String, String> paramMap)
throws Exception
{
// 从map里面取数据,组装成命令
String comm = getComm4Map(paramMap);
ConcurrentMap<String, Object> resultMap = null;
// 执行命令行
System.out.println("执行命令----start commond:" + comm);
final Process proc = Runtime.getRuntime().exec(comm);
OutHandler errorGobbler = new OutHandler(proc.getErrorStream(), paramMap.get("appName"));
// OutHandler outputGobbler = new OutHandler(proc.getInputStream(), "Info");
errorGobbler.start();
// outputGobbler.start();
// 返回参数
resultMap = new ConcurrentHashMap<String, Object>();
// resultMap.put("info", outputGobbler);
resultMap.put("error", errorGobbler);
resultMap.put("process", proc);
return resultMap;
}
/**
* 通过解析参数生成可执行的命令行字符串;
* name:应用名;input:接收地址;output:推送地址;fmt:视频格式;fps:视频帧率;rs:视频分辨率;disableAudio:是否开启音频
*
* @param paramMap
* @throws Exception
* @return 命令行字符串
*/
protected String getComm4Map(Map<String, String> paramMap) throws Exception
{
if (paramMap.containsKey("ffmpegPath"))
{
StringBuilder comm = new StringBuilder(paramMap.get("ffmpegPath") + " -i ");
// -i:输入流地址或者文件绝对地址
// 是否有必输项:输入地址,输出地址,应用名
if (paramMap.containsKey("input") && paramMap.containsKey("output")
&& paramMap.containsKey("appName"))
{
comm.append(paramMap.get("input")).append(" ");
// -f :转换格式,默认flv
comm.append(" -f ").append(paramMap.containsKey("fmt") ? paramMap.get("fmt") : "flv").append(" ");
// -r :帧率,默认25
comm.append("-r ").append(paramMap.containsKey("fps") ? paramMap.get("fps") : "30").append(" ");
// -s 分辨率 默认是原分辨率
comm.append("-s ").append(paramMap.containsKey("rs") ? paramMap.get("rs") : "").append(" ");
// -an 禁用音频
comm.append("-an ").append(paramMap.containsKey("disableAudio") && ("true".equals(paramMap.get("disableAudio"))) ? "-an" : "").append(" ");
// 输出地址
comm.append(paramMap.get("output"));
// 发布的应用名
comm.append(paramMap.get("appName"));
// 一个视频源,可以有多个输出,第二个输出为拷贝源视频输出,不改变视频的各项参数并且命名为应用名+HD
comm.append(" ").append(" -vcodec copy -f flv -an ").append(paramMap.get("output")).append(paramMap.get("appName")).append("HD");
return comm.toString();
}
}
else
{
throw new Exception("重大错误:必输项不能为空!");
}
return null;
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册