提交 3b72a987 编写于 作者: 丁劲犇's avatar 丁劲犇 😸

改进了水位反馈策略,使得水位反馈的灵敏度提高。

上级 6cf3f615
......@@ -97,6 +97,8 @@ int do_iio(const cmdlineParser & args)
const quint32 i_wav_rx[2] = {args.toUInt("wav_rx0",0),args.toUInt("wav_rx1",0)};
const quint32 i_wav_tx[2] = {args.toUInt("wav_tx0",0),args.toUInt("wav_tx1",0)};
const quint32 i_txWaterMark = args.toInt("tx_mark",0);
//设备句柄
uhd_usrp_handle usrp = nullptr;
uhd_rx_streamer_handle rx_streamer = nullptr;
......@@ -104,6 +106,114 @@ int do_iio(const cmdlineParser & args)
uhd_tx_streamer_handle tx_streamer = nullptr;
uhd_tx_metadata_handle tx_meta = nullptr;
//ring buffer in short inters
//初始化环状缓存器
const quint64 sz_buffer_all = 128 * 1024 * 1024;
std::vector<std::vector<SPTYPE> > buf_rx_array;
std::vector<std::vector<SPTYPE> > buf_tx_array;
//In IQ Samples, RF io
quint64 rx_pos = 0, tx_pos = 0,stdout_pos = 0;
//In IQ Samples, Std IO
std::vector<quint64> stdin_pos;
std::vector<void *> rx_buff_ptr, tx_buff_ptr;
size_t num_rx_samps = 0,num_sps_sent = 0 ;
for (int ch = 0;ch<rx_channel_count;++ch)
{
buf_rx_array.push_back(std::vector<SPTYPE>(sz_buffer_all + 1024*1024,0));
rx_buff_ptr.push_back(0);
}
for (int ch = 0;ch<tx_channel_count;++ch)
{
buf_tx_array.push_back(std::vector<SPTYPE>(sz_buffer_all + 1024*1024,0));
stdin_pos.push_back(0);
tx_buff_ptr.push_back(0);
}
std::mutex push_mtx;
std::mutex * pMtx = (i_wav_rx[0]+i_wav_rx[1]+i_txWaterMark)>0?&push_mtx:nullptr;
//要及时启动stdin,以便接收和反馈水位
//Define StdIn Thread, Producer
std::function<void()> thread_stdin = [&]()->void
{
try{
//Read STDIN
while (!stop_signal_called) {
subject_package_header header;
std::vector<unsigned char> packagedta = pull_subject(&header);
if (!is_valid_header(header))
{
fprintf(stderr,"Recived BAD Command.");
fflush(stderr);
QThread::msleep(100);
continue;
}
if (packagedta.size())
{
if ( is_control_subject(header))
{
//收到命令进程退出的广播消息,退出
if (strstr((const char *)packagedta.data(),"function=quit;")!=nullptr)
{
fprintf(stderr,"Recived Quit Command.");
fflush(stderr);
stop_signal_called = true;
}
else
cmd_dealing(usrp,packagedta);
}
else
{
for (int ch = 0; ch < tx_channel_count;++ch)
{
if (header.subject_id == i_wav_tx[ch])
{
SPTYPE * buf_tx_ptr = buf_tx_array[ch].data();
quint64 sps_rev = packagedta.size()/2/sizeof(SPTYPE);
short * iq = (short *)(packagedta.data());
memcpy(&buf_tx_ptr[(stdin_pos[ch] * 2)% sz_buffer_all],
iq, sps_rev * sizeof(SPTYPE)*2
);
const quint64 bgnext = ((stdin_pos[ch] + sps_rev) * 2) % sz_buffer_all;
if (bgnext < sps_rev * 2 && bgnext>0)
{
memcpy(buf_tx_ptr, buf_tx_ptr + sz_buffer_all, bgnext * sizeof(SPTYPE) );
}
stdin_pos[ch] += sps_rev;
}
}
if (i_txWaterMark>0)
{
qint64 watM = stdin_pos[0] - tx_pos;
if (tx_channel_count>1)
{
qint64 wat2 = stdin_pos[1] - tx_pos;
if (wat2 < watM)
watM = wat2;
}
TASKBUS::push_subject(
i_txWaterMark,
instance,
sizeof(qint64),
(unsigned char *) &watM,
pMtx);
}
}
}
}
}
catch (std::string er)
{
fputs(er.c_str(),stderr);
stop_signal_called = true;
}
};
uhd_io_thread th_wav_stdin(thread_stdin,0);
th_wav_stdin.start(QThread::HighestPriority);
try{
fprintf(stderr, "Creating USRP with args \"%s\"...\n", dev_args.c_str());
......@@ -307,31 +417,6 @@ int do_iio(const cmdlineParser & args)
fflush(stderr);
}
//ring buffer in short inters
const quint64 sz_buffer_all = 8 * 1024 * 1024;
std::vector<std::vector<SPTYPE> > buf_rx_array;
std::vector<std::vector<SPTYPE> > buf_tx_array;
//In IQ Samples, RF io
quint64 rx_pos = 0, tx_pos = 0,stdout_pos = 0;
//In IQ Samples, Std IO
std::vector<quint64> stdin_pos;
std::vector<void *> rx_buff_ptr, tx_buff_ptr;
size_t num_rx_samps = 0,num_sps_sent = 0 ;
for (int ch = 0;ch<rx_channel_count;++ch)
{
buf_rx_array.push_back(std::vector<SPTYPE>(sz_buffer_all + 1024*1024,0));
rx_buff_ptr.push_back(0);
}
for (int ch = 0;ch<tx_channel_count;++ch)
{
buf_tx_array.push_back(std::vector<SPTYPE>(sz_buffer_all + 1024*1024,0));
stdin_pos.push_back(0);
tx_buff_ptr.push_back(0);
}
std::mutex push_mtx;
std::mutex * pMtx = (i_wav_rx[0]+i_wav_rx[1]+i_txWaterMark)>0?&push_mtx:nullptr;
//reset time
uhd_usrp_set_time_now(usrp,0,0,0);
//Define RX Thread, Producer
......@@ -414,65 +499,6 @@ int do_iio(const cmdlineParser & args)
}
};
//Define StdIn Thread, Producer
std::function<void()> thread_stdin = [&]()->void
{
try{
//Read STDIN
while (!stop_signal_called) {
subject_package_header header;
std::vector<unsigned char> packagedta = pull_subject(&header);
if (!is_valid_header(header))
{
fprintf(stderr,"Recived BAD Command.");
fflush(stderr);
QThread::msleep(100);
continue;
}
if (packagedta.size())
{
if ( is_control_subject(header))
{
//收到命令进程退出的广播消息,退出
if (strstr((const char *)packagedta.data(),"function=quit;")!=nullptr)
{
fprintf(stderr,"Recived Quit Command.");
fflush(stderr);
stop_signal_called = true;
}
else
cmd_dealing(usrp,packagedta);
}
else
{
for (int ch = 0; ch < tx_channel_count;++ch)
{
if (header.subject_id == i_wav_tx[ch])
{
SPTYPE * buf_tx_ptr = buf_tx_array[ch].data();
quint64 sps_rev = packagedta.size()/2/sizeof(SPTYPE);
short * iq = (short *)(packagedta.data());
memcpy(&buf_tx_ptr[(stdin_pos[ch] * 2)% sz_buffer_all],
iq, sps_rev * sizeof(SPTYPE)*2
);
const quint64 bgnext = ((stdin_pos[ch] + sps_rev) * 2) % sz_buffer_all;
if (bgnext < sps_rev * 2 && bgnext>0)
{
memcpy(buf_tx_ptr, buf_tx_ptr + sz_buffer_all, bgnext * sizeof(SPTYPE) );
}
stdin_pos[ch] += sps_rev;
}
}
}
}
}
}
catch (std::string er)
{
fputs(er.c_str(),stderr);
stop_signal_called = true;
}
};
//Define thread_tx, Consumer
std::function<void()> thread_tx = [&]()->void
......@@ -542,7 +568,6 @@ int do_iio(const cmdlineParser & args)
uhd_io_thread th_wav_rx(thread_rx,0);
uhd_io_thread th_wav_stdout(thread_stdout,0);
uhd_io_thread th_wav_stdin(thread_stdin,0);
uhd_io_thread th_wav_tx(thread_tx,0);
if (rx_on)
......@@ -554,7 +579,6 @@ int do_iio(const cmdlineParser & args)
{
th_wav_tx.start(QThread::HighestPriority);
}
th_wav_stdin.start(QThread::HighestPriority);
while (!stop_signal_called)
{
......@@ -572,7 +596,6 @@ int do_iio(const cmdlineParser & args)
th_wav_rx.wait();
th_wav_stdout.wait();
th_wav_tx.wait();
th_wav_stdin.wait();
}
catch(std::string s)
{
......@@ -592,6 +615,8 @@ int do_iio(const cmdlineParser & args)
}
uhd_usrp_free(&usrp);
th_wav_stdin.wait();
fprintf(stderr, (return_code ? "Failure\n" : "Success\n"));
fflush(stderr);
return return_code;
......@@ -599,6 +624,8 @@ int do_iio(const cmdlineParser & args)
void cmd_dealing(uhd_usrp_handle usrp,std::vector<unsigned char> & cmd)
{
if (!usrp)
return;
std::map<std::string,std::string> mcmd = TASKBUS::ctrlpackage_to_map(cmd);
if (mcmd["function"]=="handle_set")
{
......
......@@ -138,6 +138,7 @@ int do_iio(const cmdlineParser & args)
const quint32 i_rx_tm = args.toInt("rx_time",0);
const quint32 i_tx_tm = args.toInt("tx_time",0);
const quint32 i_txWaterMark = args.toInt("tx_mark",0);
//设备句柄
uhd_usrp_handle usrp = nullptr;
uhd_rx_streamer_handle rx_streamer = nullptr;
......@@ -146,6 +147,129 @@ int do_iio(const cmdlineParser & args)
uhd_tx_metadata_handle tx_meta = nullptr;
//发射缓存要提前配置,以便及时反馈水位
//Time plan for tx
std::list<tag_tx_plain> tx_plan;
std::mutex mtx_plan;
//ring buffer In IQ Points, TX io
const quint64 sz_buffer_tx = 256 * 1024 * 1024;
std::vector<std::vector<SPTYPE> > buf_tx_array;
std::vector<quint64> tx_pos;
std::vector<quint64> stdin_pos;
std::vector<void *> tx_buff_ptr;
size_t num_sps_sent = 0;
for (int ch = 0;ch<tx_channel_count;++ch)
{
buf_tx_array.push_back(std::vector<SPTYPE>(sz_buffer_tx + 1024*1024*32,0));
stdin_pos.push_back(0);
tx_pos.push_back(0);
tx_buff_ptr.push_back(0);
}
std::mutex push_mtx;
std::mutex * pMtx = (i_wav_rx[0]+i_wav_rx[1]+i_txWaterMark)>0?&push_mtx:nullptr;
qint64 total_points_left = -1;
//Define StdIn Thread, Producer
std::function<void()> thread_stdin = [&]()->void
{
try{
//Read STDIN
while (!stop_signal_called)
{
subject_package_header header;
std::vector<unsigned char> packagedta = pull_subject(&header);
if (!is_valid_header(header))
{
fprintf(stderr,"Recived BAD Command.");
fflush(stderr);
QThread::msleep(100);
continue;
}
if (packagedta.size())
{
if ( is_control_subject(header))
{
//收到命令进程退出的广播消息,退出
if (strstr((const char *)packagedta.data(),"function=quit;")!=nullptr)
{
fprintf(stderr,"Recived Quit Command.");
fflush(stderr);
stop_signal_called = true;
}
else
cmd_dealing(usrp,packagedta);
}
else
{
if (header.subject_id==i_tx_tm)
{
if (packagedta.size()==sizeof(tag_tx_plain))
{
tag_tx_plain * plan = (tag_tx_plain *) (packagedta.data());
if (total_points_left<=0)
total_points_left = plan->length_left;
fprintf(stderr,"plan %d points\n",plan->length_left);
mtx_plan.lock();
tx_plan.push_back(*plan);
mtx_plan.unlock();
}
else
fprintf(stderr,"Error Timeplan subject size %d recieved. Expected should be %d.\n",
(int)packagedta.size(),(int)sizeof(tag_tx_plain));
}
else
{
fprintf(stderr,"recv %d points\n",packagedta.size()/2/sizeof(SPTYPE));
for (int ch = 0; ch < tx_channel_count;++ch)
{
if (header.subject_id == i_wav_tx[ch])
{
SPTYPE * buf_tx_ptr = buf_tx_array[ch].data();
quint64 sps_rev = packagedta.size()/2/sizeof(SPTYPE);
short * iq = (short *)(packagedta.data());
memcpy(&buf_tx_ptr[(stdin_pos[ch] * 2)% sz_buffer_tx],
iq, sps_rev * sizeof(SPTYPE)*2
);
const quint64 bgnext = ((stdin_pos[ch] + sps_rev) * 2) % sz_buffer_tx;
if (bgnext < sps_rev * 2 && bgnext>0)
{
memcpy(buf_tx_ptr, buf_tx_ptr + sz_buffer_tx, bgnext * sizeof(SPTYPE) );
}
stdin_pos[ch] += sps_rev;
}//end if (header.subject_id == i_wav_tx[ch])
}//end for (int ch = 0; ch < tx_channel_count;++ch)
if (total_points_left<0&&i_txWaterMark>0)
{
qint64 watM = stdin_pos[0] - tx_pos[0];
if (tx_channel_count>1)
{
qint64 wat2 = stdin_pos[1] - tx_pos[1];
if (wat2 < watM)
watM = wat2;
}
TASKBUS::push_subject(
i_txWaterMark,
instance,
sizeof(qint64),
(unsigned char *) &watM,
pMtx);
}//end if (total_points_left<0)
}//end else if (header.subject_id==i_tx_tm)
}//end else if if ( is_control_subject(header))
}//end if (packagedta.size())
}//end while (!stop_signal_called)
}//end try
catch (std::string er)
{
fputs(er.c_str(),stderr);
stop_signal_called = true;
}
};//end std::function<void()> thread_stdin = [&]()->void
uhd_io_thread th_wav_stdin(thread_stdin,0);
th_wav_stdin.start(QThread::HighestPriority);
try{
fprintf(stderr, "Creating USRP with args \"%s\"...\n", dev_args.c_str());
UHD_DO(uhd_usrp_make(&usrp, dev_args.c_str()));
......@@ -373,27 +497,6 @@ int do_iio(const cmdlineParser & args)
buf_rx_list[buf_ct].ch_ptr.push_back((void *)buf_rx_list[buf_ct].ch_data[ch].data());
}
//Time plan for tx
std::list<tag_tx_plain> tx_plan;
std::mutex mtx_plan;
//ring buffer In IQ Points, TX io
const quint64 sz_buffer_tx = 256 * 1024 * 1024;
std::vector<std::vector<SPTYPE> > buf_tx_array;
std::vector<quint64> tx_pos;
std::vector<quint64> stdin_pos;
std::vector<void *> tx_buff_ptr;
size_t num_sps_sent = 0;
for (int ch = 0;ch<tx_channel_count;++ch)
{
buf_tx_array.push_back(std::vector<SPTYPE>(sz_buffer_tx + 1024*1024*32,0));
stdin_pos.push_back(0);
tx_pos.push_back(0);
tx_buff_ptr.push_back(0);
}
std::mutex push_mtx;
std::mutex * pMtx = (i_wav_rx[0]+i_wav_rx[1]+i_txWaterMark)>0?&push_mtx:nullptr;
//reset time
uhd_usrp_set_time_now(usrp,0,0,0);
//Define RX Thread, Producer
......@@ -496,89 +599,6 @@ int do_iio(const cmdlineParser & args)
}
};//end std::function<void()> thread_stdout = [&]()->void
qint64 total_points_left = -1;
//Define StdIn Thread, Producer
std::function<void()> thread_stdin = [&]()->void
{
try{
//Read STDIN
while (!stop_signal_called)
{
subject_package_header header;
std::vector<unsigned char> packagedta = pull_subject(&header);
if (!is_valid_header(header))
{
fprintf(stderr,"Recived BAD Command.");
fflush(stderr);
QThread::msleep(100);
continue;
}
if (packagedta.size())
{
if ( is_control_subject(header))
{
//收到命令进程退出的广播消息,退出
if (strstr((const char *)packagedta.data(),"function=quit;")!=nullptr)
{
fprintf(stderr,"Recived Quit Command.");
fflush(stderr);
stop_signal_called = true;
}
else
cmd_dealing(usrp,packagedta);
}
else
{
if (header.subject_id==i_tx_tm)
{
if (packagedta.size()==sizeof(tag_tx_plain))
{
tag_tx_plain * plan = (tag_tx_plain *) (packagedta.data());
if (total_points_left<=0)
total_points_left = plan->length_left;
fprintf(stderr,"plan %d points\n",plan->length_left);
mtx_plan.lock();
tx_plan.push_back(*plan);
mtx_plan.unlock();
}
else
fprintf(stderr,"Error Timeplan subject size %d recieved. Expected should be %d.\n",
(int)packagedta.size(),(int)sizeof(tag_tx_plain));
}
else
{
fprintf(stderr,"recv %d points\n",packagedta.size()/2/sizeof(SPTYPE));
for (int ch = 0; ch < tx_channel_count;++ch)
{
if (header.subject_id == i_wav_tx[ch])
{
SPTYPE * buf_tx_ptr = buf_tx_array[ch].data();
quint64 sps_rev = packagedta.size()/2/sizeof(SPTYPE);
short * iq = (short *)(packagedta.data());
memcpy(&buf_tx_ptr[(stdin_pos[ch] * 2)% sz_buffer_tx],
iq, sps_rev * sizeof(SPTYPE)*2
);
const quint64 bgnext = ((stdin_pos[ch] + sps_rev) * 2) % sz_buffer_tx;
if (bgnext < sps_rev * 2 && bgnext>0)
{
memcpy(buf_tx_ptr, buf_tx_ptr + sz_buffer_tx, bgnext * sizeof(SPTYPE) );
}
stdin_pos[ch] += sps_rev;
}//end if (header.subject_id == i_wav_tx[ch])
}//end for (int ch = 0; ch < tx_channel_count;++ch)
}//end else if (header.subject_id==i_tx_tm)
}//end else if if ( is_control_subject(header))
}//end if (packagedta.size())
}//end while (!stop_signal_called)
}//end try
catch (std::string er)
{
fputs(er.c_str(),stderr);
stop_signal_called = true;
}
};//end std::function<void()> thread_stdin = [&]()->void
const int tx_frame = 10000;
//Define thread_tx, Consumer
std::function<void()> thread_tx = [&]()->void
......@@ -730,7 +750,6 @@ int do_iio(const cmdlineParser & args)
uhd_io_thread th_wav_rx(thread_rx,0);
uhd_io_thread th_wav_stdout(thread_stdout,0);
uhd_io_thread th_wav_stdin(thread_stdin,0);
uhd_io_thread th_wav_tx(thread_tx,0);
if (rx_on)
......@@ -742,7 +761,6 @@ int do_iio(const cmdlineParser & args)
{
th_wav_tx.start(QThread::HighestPriority);
}
th_wav_stdin.start(QThread::HighestPriority);
while (!stop_signal_called)
{
......@@ -760,7 +778,6 @@ int do_iio(const cmdlineParser & args)
th_wav_rx.wait();
th_wav_stdout.wait();
th_wav_tx.wait();
th_wav_stdin.wait();
}//end top try
catch(std::string s)
{
......@@ -780,6 +797,8 @@ int do_iio(const cmdlineParser & args)
}
uhd_usrp_free(&usrp);
th_wav_stdin.wait();
fprintf(stderr, (return_code ? "Failure\n" : "Success\n"));
fflush(stderr);
return return_code;
......@@ -787,6 +806,8 @@ int do_iio(const cmdlineParser & args)
void cmd_dealing(uhd_usrp_handle usrp,std::vector<unsigned char> & cmd)
{
if (!usrp)
return ;
std::map<std::string,std::string> mcmd = TASKBUS::ctrlpackage_to_map(cmd);
if (mcmd["function"]=="handle_set")
{
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册