提交 2f17ee59 编写于 作者: 丁劲犇's avatar 丁劲犇 😸

多线程与水位优化。

上级 cd8e83c9
#include "listen_thread.h"
#include "tb_interface.h"
int i_watermark =0;
std::atomic<long long> g_watermark (0);
listen_thread::listen_thread(QObject * parent)
:QThread(parent)
{
......@@ -32,6 +36,14 @@ void listen_thread::run()
bfinished = true;
}
}
else if (header.subject_id==i_watermark)
{
if (header.data_length>=8)
{
long long * ptr = (long long *)packagedta.data();
g_watermark = *ptr;
}
}
}
}
emit quit_app();
......
......@@ -2,7 +2,7 @@
#define LISTEN_THREAD_H
#include <QThread>
#include <atomic>
class listen_thread: public QThread
{
Q_OBJECT
......@@ -14,4 +14,6 @@ signals:
void quit_app();
};
extern int i_watermark /*=0*/;
extern std::atomic<long long> g_watermark /*=0*/;
#endif // LISTEN_THREAD_H
......@@ -149,6 +149,7 @@ int do_source(const cmdlineParser & args)
int instance = args.toInt("instance",0);
int isource = args.toInt("source",0);
int timestamp = args.toInt("timestamp",0);
i_watermark = args.toInt("watermark",0);
int encode = args.toInt("encode",0);
//监视的文件夹
if (args["folder"].size()!=1)
......@@ -172,10 +173,11 @@ int do_source(const cmdlineParser & args)
const int read_jump = args.toInt("read_jump",1);
//递归
const int recursive = args.toInt("recursive",0);
//递归
//Sample rate
const double sample_rate = args.toDouble("sample_rate",8000.000);
//Watermark feedback
const long long minmark = args.toInt64("min_watermark",0);
//Initial start of file
long long initial_offset = args.toInt64("initial_offset",0);
if (read_jump<=0)
......@@ -313,7 +315,15 @@ int do_source(const cmdlineParser & args)
);
}
++total_frames;
//时间控制
//水位控制
if (minmark > 0 && i_watermark > 0)
{
while(g_watermark > minmark)
{
QThread::msleep(20);
}
}
//时间控制
finish = clock();
duration = (double)(finish - start)*1000 / CLOCKS_PER_SEC;//毫秒
while (duration/total_frames < 1000.0/frame_rate)
......
{
"source_files":{
"name":"sourcefiles",
"parameters":{
"folder":{
"type":"string",
"tooltip":"watching folder",
"default":"."
},
"type":{
"type":"string",
"tooltip":"watching file filter",
"default":"*.dat"
},
"encode":{
"type":"enum",
"tooltip":"filename encoding",
"default":0,
"range":{
"0":"Local 8 bits",
"1":"UTF-8"
}
},
"mode":{
"type":"enum",
"tooltip":"working mod",
"default":0,
"range":{
"0":"normal",
"1":"repeat"
}
},
"auto_del":{
"type":"bool",
"tooltip":"auto delete files",
"default":0
},
"frame_contines":{
"type":"bool",
"tooltip":"treate files as continues frames",
"default":1
},
"initial_offset":{
"type":"int",
"tooltip":"initial offset",
"default":0
},
"frame_len":{
"type":"int",
"tooltip":"frame length in bytes",
"default":1
},
"read_jump":{
"type":"int",
"tooltip":"reading step in bytes",
"default":1
},
"recursive":{
"type":"int",
"tooltip":"recursive sub folders",
"default":0
},
"frame_rate":{
"type":"float",
"tooltip":"frame rate(frames/seconds)",
"default":100
},
"sample_rate":{
"type":"float",
"tooltip":"sample rate(Hz)",
"default":8000.000
},
"keep_last":{
"type":"bool",
"tooltip":"leave last file undealed",
"default":1
}
},
"input_subjects":
{
},
"output_subjects":{
"source":{
"type":"byte",
"tooltip":"data output"
},
"timestamp":{
"type":"unsigned long long",
"tooltip":"timestamp in bytes"
}
}
"name":"sourcefiles",
"parameters":{
"folder":{
"type":"string",
"tooltip":"watching folder",
"default":"."
},
"type":{
"type":"string",
"tooltip":"watching file filter",
"default":"*.dat"
},
"encode":{
"type":"enum",
"tooltip":"filename encoding",
"default":0,
"range":{
"0":"Local 8 bits",
"1":"UTF-8"
}
},
"mode":{
"type":"enum",
"tooltip":"working mod",
"default":0,
"range":{
"0":"normal",
"1":"repeat"
}
},
"auto_del":{
"type":"bool",
"tooltip":"auto delete files",
"default":0
},
"frame_contines":{
"type":"bool",
"tooltip":"treate files as continues frames",
"default":1
},
"initial_offset":{
"type":"int",
"tooltip":"initial offset",
"default":0
},
"frame_len":{
"type":"int",
"tooltip":"frame length in bytes",
"default":1
},
"read_jump":{
"type":"int",
"tooltip":"reading step in bytes",
"default":1
},
"recursive":{
"type":"int",
"tooltip":"recursive sub folders",
"default":0
},
"frame_rate":{
"type":"float",
"tooltip":"frame rate(frames/seconds)",
"default":100
},
"sample_rate":{
"type":"float",
"tooltip":"sample rate(Hz)",
"default":8000.000
},
"keep_last":{
"type":"bool",
"tooltip":"leave last file undealed",
"default":1
},
"min_watermark":{
"type":"long long",
"tooltip":"Min Watermark",
"default":1
}
},
"input_subjects":
{
"watermark":{
"type":"long long",
"tooltip":"watermark"
}
},
"output_subjects":{
"source":{
"type":"byte",
"tooltip":"data output"
},
"timestamp":{
"type":"unsigned long long",
"tooltip":"timestamp in bytes"
}
}
}
}
......@@ -74,10 +74,19 @@
"type":"bool",
"tooltip":"保留最新的一个文件不处理(避免冲突)",
"default":1
},
"min_watermark":{
"type":"long long",
"tooltip":"水位门限",
"default":1
}
},
"input_subjects":
{
"watermark":{
"type":"long long",
"tooltip":"水位输入"
}
},
"output_subjects":{
"source":{
......
......@@ -330,6 +330,8 @@ int do_iio(const cmdlineParser & args)
stdin_pos.push_back(0);
tx_buff_ptr.push_back(0);
}
std::mutex push_mtx;
std::mutex * pMtx = (i_wav_rx[0]+i_wav_rx[1]+i_txWaterMark)>0?&push_mtx:nullptr;
//reset time
uhd_usrp_set_time_now(usrp,0,0,0);
//Define RX Thread, Producer
......@@ -399,7 +401,8 @@ int do_iio(const cmdlineParser & args)
i_wav_rx[ch],
instance,
rx_frame * 2 * sizeof(SPTYPE),
(unsigned char *) &buf_rx_ptr[(stdout_pos * 2)% sz_buffer_all] );
(unsigned char *) &buf_rx_ptr[(stdout_pos * 2)% sz_buffer_all],
pMtx);
}
stdout_pos += rx_frame;
}
......@@ -490,7 +493,8 @@ int do_iio(const cmdlineParser & args)
i_txWaterMark,
instance,
sizeof(qint64),
(unsigned char *) &watM);
(unsigned char *) &watM,
pMtx);
}
bool can_send = true;
......
......@@ -391,6 +391,9 @@ int do_iio(const cmdlineParser & args)
tx_pos.push_back(0);
tx_buff_ptr.push_back(0);
}
std::mutex push_mtx;
std::mutex * pMtx = (i_wav_rx[0]+i_wav_rx[1]+i_txWaterMark)>0?&push_mtx:nullptr;
//reset time
uhd_usrp_set_time_now(usrp,0,0,0);
//Define RX Thread, Producer
......@@ -467,7 +470,8 @@ int do_iio(const cmdlineParser & args)
i_rx_tm,
instance,
sizeof(tag_timestmp),
(unsigned char *) &buf_rx_list[stdout_pos % sz_buffer_rx].tmstmp );
(unsigned char *) &buf_rx_list[stdout_pos % sz_buffer_rx].tmstmp,
pMtx);
}
for (int ch = 0;ch<rx_channel_count;++ch)
{
......@@ -478,7 +482,8 @@ int do_iio(const cmdlineParser & args)
i_wav_rx[ch],
instance,
buf_rx_list[stdout_pos % sz_buffer_rx].ch_len * 2 * sizeof(SPTYPE),
(unsigned char *) dta.ch_data[ch].data() );
(unsigned char *) dta.ch_data[ch].data(),
pMtx);
}
}
++stdout_pos;
......@@ -602,7 +607,8 @@ int do_iio(const cmdlineParser & args)
i_txWaterMark,
instance,
sizeof(qint64),
(unsigned char *) &watM);
(unsigned char *) &watM,
pMtx);
}
}
......
......@@ -16,7 +16,7 @@
#include <fcntl.h>
#endif
//Comments:
//Next feature: Multithread-push protect.
namespace TASKBUS{
#define TB_SUBJECT_CMD 0xffffffffu
//数据专题包头结构体,1字节序对齐
......@@ -31,49 +31,7 @@ namespace TASKBUS{
};
#pragma pack(pop)
/*!
* \brief init_client A application should call initclient first.
*/
inline void init_client();
//是否为控制指令 Whether to be a control instruction or a normal subject
inline bool is_control_subject(const subject_package_header & header);
inline bool is_valid_header(const subject_package_header & header);
//返回控制信令专题 TB_SUBJECT_CMD
inline unsigned int control_subect_id();
//用于方便连线模式的函数, line connect mod(subject) functions
inline void push_subject(
const unsigned int subject_id,
const unsigned int path_id,
const unsigned int data_length,
const unsigned char *dataptr
);
inline void push_subject(
const unsigned int subject_id,
const unsigned int path_id,
const char *dataptr
);
inline void push_subject(
const unsigned char *allptr,
const unsigned int totalLength
);
inline void push_subject(
const subject_package_header header,
const unsigned char *dataptr
);
//接收专题数据,push subject to stdout
inline std::vector<unsigned char> pull_subject(
subject_package_header * header
);
//调试,利用记录的打桩进程的stdin和命令行调试。会返回命令行
inline std::vector<std::string> debug(const char * logpath, FILE ** old_stdin, FILE ** old_stdout);
//用于方便操作指令的函数, text command mod functions
//--------------------------------------------------------------------
inline std::string trim_elems(const std::string & text);
inline std::map<std::string, std::string> string_to_map(const std::string & s);
inline std::map<std::string, std::string> ctrlpackage_to_map(const std::vector<unsigned char> & s);
inline std::string map_to_string(const std::map<std::string, std::string> & s);
inline std::vector<unsigned char> map_to_ctrlpackage(const std::map<std::string, std::string> & s);
/**
*以下部分为具体实现。The following sections are the implementation
* -----------------------------------------------------------------------
......@@ -94,6 +52,10 @@ namespace TASKBUS{
#endif
}
/*!
* \brief lendian 返回大小端
* \return
*/
inline bool lendian()
{
static const short testv = 0x0102;
......@@ -116,32 +78,42 @@ namespace TASKBUS{
return res;
}
//推送专题数据
/*!
* \brief push_subject 向标准输出stdout写入一个包
* \param subject_id 专题ID
* \param path_id 通道ID
* \param data_length 数据长度
* \param dataptr 数据地址
* \param pMtx 可选的互斥量,用于多线程
*/
inline void push_subject(
const unsigned int subject_id,
const unsigned int path_id,
const unsigned int data_length,
const unsigned char *dataptr
const unsigned char *dataptr,
std::mutex * pMtx = nullptr
)
{
const unsigned char prefix[4] = { 0x3C,0x5A,0x7E,0x69};
if (pMtx) pMtx->lock();
fwrite(prefix,sizeof(char),4,stdout);
fwrite(&subject_id,sizeof(subject_id),1,stdout);
fwrite(&path_id,sizeof(path_id),1,stdout);
fwrite(&data_length,sizeof(data_length),1,stdout);
fwrite(dataptr,sizeof(unsigned char),data_length,stdout);
fflush (stdout);
if (pMtx) pMtx->unlock();
}
//推送专题数据
inline void push_subject(
const unsigned int subject_id,
//const unsigned int source_id,
//const unsigned int destin_id,
const unsigned int path_id,
const char *dataptr
const char *dataptr,
std::mutex * pMtx = nullptr
)
{
const unsigned char prefix[4] = { 0x3C,0x5A,0x7E,0x69};
if (pMtx) pMtx->lock();
fwrite(prefix,sizeof(char),4,stdout);
fwrite(&subject_id,sizeof(subject_id),1,stdout);
fwrite(&path_id,sizeof(path_id),1,stdout);
......@@ -149,29 +121,35 @@ namespace TASKBUS{
fwrite(&lenstr,sizeof(lenstr),1,stdout);
fwrite(dataptr,sizeof(unsigned char),lenstr,stdout);
fflush (stdout);
if (pMtx) pMtx->unlock();
}
//推送专题数据
inline void push_subject(
const subject_package_header header,
const unsigned char *dataptr
const unsigned char *dataptr,
std::mutex * pMtx = nullptr
)
{
assert(header.prefix[0]==0x3C&&header.prefix[1]==0x5A&&header.prefix[2]==0x7E&&header.prefix[3]==0x69);
if (pMtx) pMtx->lock();
fwrite(&header,sizeof(header),1,stdout);
fwrite(dataptr,sizeof(unsigned char),header.data_length,stdout);
fflush (stdout);
if (pMtx) pMtx->unlock();
}
inline void push_subject(
const unsigned char *allptr,
const unsigned int totalLength
const unsigned int totalLength,
std::mutex * pMtx = nullptr
)
{
assert(allptr[0]==0x3C&&allptr[1]==0x5A&&allptr[2]==0x7E&&allptr[3]==0x69);
if (pMtx) pMtx->lock();
fwrite(allptr,sizeof(unsigned char),totalLength,stdout);
fflush (stdout);
if (pMtx) pMtx->unlock();
}
//接收专题数据
inline std::vector<unsigned char> pull_subject(
......@@ -199,7 +177,7 @@ namespace TASKBUS{
{
fread(buf,1,header->data_length % batchdeal,stdin);
std::copy(buf,buf+header->data_length % batchdeal, std::back_inserter( buf_data ));
}
}
}
return buf_data;
}
......@@ -412,7 +390,8 @@ namespace TASKBUS{
const std::string & type = "",
const std::string & range = "",
const std::string & step = "",
const std::string & decimal = ""
const std::string & decimal = "",
std::mutex * pMtx = nullptr
)
{
std::map<std::string, std::string> mp;
......@@ -427,14 +406,15 @@ namespace TASKBUS{
if (range.size()) mp["step"] = step;
if (range.size()) mp["decimal"] = decimal;
std::vector<unsigned char> v = map_to_ctrlpackage(mp);
push_subject(TB_SUBJECT_CMD,0,v.size(),v.data());
push_subject(TB_SUBJECT_CMD,0,v.size(),v.data(),pMtx);
}
//播告功能
inline void set_handle(
const std::string & source,
const std::string & destin,
const std::string & handle,
const std::string & value
const std::string & value,
std::mutex * pMtx = nullptr
)
{
std::map<std::string, std::string> mp;
......@@ -444,7 +424,7 @@ namespace TASKBUS{
mp["handle"] = handle;
mp["value"] = value;
std::vector<unsigned char> v = map_to_ctrlpackage(mp);
push_subject(TB_SUBJECT_CMD,0,v.size(),v.data());
push_subject(TB_SUBJECT_CMD,0,v.size(),v.data(),pMtx);
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册