diff --git a/modules/uhd/uhd_usrp_continous/uhd_io_continous.cpp b/modules/uhd/uhd_usrp_continous/uhd_io_continous.cpp index 219b104b4922c51bfdfb447dc6fd1f7b9c790639..a46af04fa31ebf63fbb61029bedbcb815239ba2a 100644 --- a/modules/uhd/uhd_usrp_continous/uhd_io_continous.cpp +++ b/modules/uhd/uhd_usrp_continous/uhd_io_continous.cpp @@ -97,6 +97,8 @@ int do_iio(const cmdlineParser & args) const quint32 i_wav_rx[2] = {args.toUInt("wav_rx0",0),args.toUInt("wav_rx1",0)}; const quint32 i_wav_tx[2] = {args.toUInt("wav_tx0",0),args.toUInt("wav_tx1",0)}; const quint32 i_txWaterMark = args.toInt("tx_mark",0); + + //设备句柄 uhd_usrp_handle usrp = nullptr; uhd_rx_streamer_handle rx_streamer = nullptr; @@ -104,6 +106,114 @@ int do_iio(const cmdlineParser & args) uhd_tx_streamer_handle tx_streamer = nullptr; uhd_tx_metadata_handle tx_meta = nullptr; + //ring buffer in short inters + //初始化环状缓存器 + const quint64 sz_buffer_all = 128 * 1024 * 1024; + std::vector > buf_rx_array; + std::vector > buf_tx_array; + //In IQ Samples, RF io + quint64 rx_pos = 0, tx_pos = 0,stdout_pos = 0; + //In IQ Samples, Std IO + std::vector stdin_pos; + + std::vector rx_buff_ptr, tx_buff_ptr; + size_t num_rx_samps = 0,num_sps_sent = 0 ; + + for (int ch = 0;ch(sz_buffer_all + 1024*1024,0)); + rx_buff_ptr.push_back(0); + } + for (int ch = 0;ch(sz_buffer_all + 1024*1024,0)); + stdin_pos.push_back(0); + tx_buff_ptr.push_back(0); + } + std::mutex push_mtx; + std::mutex * pMtx = (i_wav_rx[0]+i_wav_rx[1]+i_txWaterMark)>0?&push_mtx:nullptr; + + //要及时启动stdin,以便接收和反馈水位 + //Define StdIn Thread, Producer + std::function thread_stdin = [&]()->void + { + try{ + //Read STDIN + while (!stop_signal_called) { + subject_package_header header; + std::vector packagedta = pull_subject(&header); + if (!is_valid_header(header)) + { + fprintf(stderr,"Recived BAD Command."); + fflush(stderr); + QThread::msleep(100); + continue; + } + if (packagedta.size()) + { + if ( is_control_subject(header)) + { + //收到命令进程退出的广播消息,退出 + if (strstr((const char *)packagedta.data(),"function=quit;")!=nullptr) + { + fprintf(stderr,"Recived Quit Command."); + fflush(stderr); + stop_signal_called = true; + } + else + cmd_dealing(usrp,packagedta); + } + else + { + for (int ch = 0; ch < tx_channel_count;++ch) + { + if (header.subject_id == i_wav_tx[ch]) + { + SPTYPE * buf_tx_ptr = buf_tx_array[ch].data(); + quint64 sps_rev = packagedta.size()/2/sizeof(SPTYPE); + short * iq = (short *)(packagedta.data()); + memcpy(&buf_tx_ptr[(stdin_pos[ch] * 2)% sz_buffer_all], + iq, sps_rev * sizeof(SPTYPE)*2 + ); + const quint64 bgnext = ((stdin_pos[ch] + sps_rev) * 2) % sz_buffer_all; + if (bgnext < sps_rev * 2 && bgnext>0) + { + memcpy(buf_tx_ptr, buf_tx_ptr + sz_buffer_all, bgnext * sizeof(SPTYPE) ); + } + stdin_pos[ch] += sps_rev; + } + } + if (i_txWaterMark>0) + { + qint64 watM = stdin_pos[0] - tx_pos; + if (tx_channel_count>1) + { + qint64 wat2 = stdin_pos[1] - tx_pos; + if (wat2 < watM) + watM = wat2; + } + TASKBUS::push_subject( + i_txWaterMark, + instance, + sizeof(qint64), + (unsigned char *) &watM, + pMtx); + + } + } + } + } + } + catch (std::string er) + { + fputs(er.c_str(),stderr); + stop_signal_called = true; + } + }; + + uhd_io_thread th_wav_stdin(thread_stdin,0); + th_wav_stdin.start(QThread::HighestPriority); + try{ fprintf(stderr, "Creating USRP with args \"%s\"...\n", dev_args.c_str()); @@ -307,31 +417,6 @@ int do_iio(const cmdlineParser & args) fflush(stderr); } - //ring buffer in short inters - const quint64 sz_buffer_all = 8 * 1024 * 1024; - std::vector > buf_rx_array; - std::vector > buf_tx_array; - //In IQ Samples, RF io - quint64 rx_pos = 0, tx_pos = 0,stdout_pos = 0; - //In IQ Samples, Std IO - std::vector stdin_pos; - - std::vector rx_buff_ptr, tx_buff_ptr; - size_t num_rx_samps = 0,num_sps_sent = 0 ; - - for (int ch = 0;ch(sz_buffer_all + 1024*1024,0)); - rx_buff_ptr.push_back(0); - } - for (int ch = 0;ch(sz_buffer_all + 1024*1024,0)); - stdin_pos.push_back(0); - tx_buff_ptr.push_back(0); - } - std::mutex push_mtx; - std::mutex * pMtx = (i_wav_rx[0]+i_wav_rx[1]+i_txWaterMark)>0?&push_mtx:nullptr; //reset time uhd_usrp_set_time_now(usrp,0,0,0); //Define RX Thread, Producer @@ -414,65 +499,6 @@ int do_iio(const cmdlineParser & args) } }; - //Define StdIn Thread, Producer - std::function thread_stdin = [&]()->void - { - try{ - //Read STDIN - while (!stop_signal_called) { - subject_package_header header; - std::vector packagedta = pull_subject(&header); - if (!is_valid_header(header)) - { - fprintf(stderr,"Recived BAD Command."); - fflush(stderr); - QThread::msleep(100); - continue; - } - if (packagedta.size()) - { - if ( is_control_subject(header)) - { - //收到命令进程退出的广播消息,退出 - if (strstr((const char *)packagedta.data(),"function=quit;")!=nullptr) - { - fprintf(stderr,"Recived Quit Command."); - fflush(stderr); - stop_signal_called = true; - } - else - cmd_dealing(usrp,packagedta); - } - else - { - for (int ch = 0; ch < tx_channel_count;++ch) - { - if (header.subject_id == i_wav_tx[ch]) - { - SPTYPE * buf_tx_ptr = buf_tx_array[ch].data(); - quint64 sps_rev = packagedta.size()/2/sizeof(SPTYPE); - short * iq = (short *)(packagedta.data()); - memcpy(&buf_tx_ptr[(stdin_pos[ch] * 2)% sz_buffer_all], - iq, sps_rev * sizeof(SPTYPE)*2 - ); - const quint64 bgnext = ((stdin_pos[ch] + sps_rev) * 2) % sz_buffer_all; - if (bgnext < sps_rev * 2 && bgnext>0) - { - memcpy(buf_tx_ptr, buf_tx_ptr + sz_buffer_all, bgnext * sizeof(SPTYPE) ); - } - stdin_pos[ch] += sps_rev; - } - } - } - } - } - } - catch (std::string er) - { - fputs(er.c_str(),stderr); - stop_signal_called = true; - } - }; //Define thread_tx, Consumer std::function thread_tx = [&]()->void @@ -542,7 +568,6 @@ int do_iio(const cmdlineParser & args) uhd_io_thread th_wav_rx(thread_rx,0); uhd_io_thread th_wav_stdout(thread_stdout,0); - uhd_io_thread th_wav_stdin(thread_stdin,0); uhd_io_thread th_wav_tx(thread_tx,0); if (rx_on) @@ -554,7 +579,6 @@ int do_iio(const cmdlineParser & args) { th_wav_tx.start(QThread::HighestPriority); } - th_wav_stdin.start(QThread::HighestPriority); while (!stop_signal_called) { @@ -572,7 +596,6 @@ int do_iio(const cmdlineParser & args) th_wav_rx.wait(); th_wav_stdout.wait(); th_wav_tx.wait(); - th_wav_stdin.wait(); } catch(std::string s) { @@ -592,6 +615,8 @@ int do_iio(const cmdlineParser & args) } uhd_usrp_free(&usrp); + th_wav_stdin.wait(); + fprintf(stderr, (return_code ? "Failure\n" : "Success\n")); fflush(stderr); return return_code; @@ -599,6 +624,8 @@ int do_iio(const cmdlineParser & args) void cmd_dealing(uhd_usrp_handle usrp,std::vector & cmd) { + if (!usrp) + return; std::map mcmd = TASKBUS::ctrlpackage_to_map(cmd); if (mcmd["function"]=="handle_set") { diff --git a/modules/uhd/uhd_usrp_io/uhd_io.cpp b/modules/uhd/uhd_usrp_io/uhd_io.cpp index 9215fc50a73bbfa850257a9389fcaa9e5c6f8df7..0dd104c37a73e43285b7f4443dff78b185b4e610 100644 --- a/modules/uhd/uhd_usrp_io/uhd_io.cpp +++ b/modules/uhd/uhd_usrp_io/uhd_io.cpp @@ -138,6 +138,7 @@ int do_iio(const cmdlineParser & args) const quint32 i_rx_tm = args.toInt("rx_time",0); const quint32 i_tx_tm = args.toInt("tx_time",0); const quint32 i_txWaterMark = args.toInt("tx_mark",0); + //设备句柄 uhd_usrp_handle usrp = nullptr; uhd_rx_streamer_handle rx_streamer = nullptr; @@ -146,6 +147,129 @@ int do_iio(const cmdlineParser & args) uhd_tx_metadata_handle tx_meta = nullptr; + //发射缓存要提前配置,以便及时反馈水位 + //Time plan for tx + std::list tx_plan; + std::mutex mtx_plan; + + //ring buffer In IQ Points, TX io + const quint64 sz_buffer_tx = 256 * 1024 * 1024; + std::vector > buf_tx_array; + std::vector tx_pos; + std::vector stdin_pos; + std::vector tx_buff_ptr; + size_t num_sps_sent = 0; + for (int ch = 0;ch(sz_buffer_tx + 1024*1024*32,0)); + stdin_pos.push_back(0); + tx_pos.push_back(0); + tx_buff_ptr.push_back(0); + } + std::mutex push_mtx; + std::mutex * pMtx = (i_wav_rx[0]+i_wav_rx[1]+i_txWaterMark)>0?&push_mtx:nullptr; + + qint64 total_points_left = -1; + //Define StdIn Thread, Producer + std::function thread_stdin = [&]()->void + { + try{ + //Read STDIN + while (!stop_signal_called) + { + subject_package_header header; + std::vector packagedta = pull_subject(&header); + if (!is_valid_header(header)) + { + fprintf(stderr,"Recived BAD Command."); + fflush(stderr); + QThread::msleep(100); + continue; + } + if (packagedta.size()) + { + if ( is_control_subject(header)) + { + //收到命令进程退出的广播消息,退出 + if (strstr((const char *)packagedta.data(),"function=quit;")!=nullptr) + { + fprintf(stderr,"Recived Quit Command."); + fflush(stderr); + stop_signal_called = true; + } + else + cmd_dealing(usrp,packagedta); + } + else + { + if (header.subject_id==i_tx_tm) + { + if (packagedta.size()==sizeof(tag_tx_plain)) + { + tag_tx_plain * plan = (tag_tx_plain *) (packagedta.data()); + if (total_points_left<=0) + total_points_left = plan->length_left; + fprintf(stderr,"plan %d points\n",plan->length_left); + mtx_plan.lock(); + tx_plan.push_back(*plan); + mtx_plan.unlock(); + } + else + fprintf(stderr,"Error Timeplan subject size %d recieved. Expected should be %d.\n", + (int)packagedta.size(),(int)sizeof(tag_tx_plain)); + + } + else + { + fprintf(stderr,"recv %d points\n",packagedta.size()/2/sizeof(SPTYPE)); + for (int ch = 0; ch < tx_channel_count;++ch) + { + if (header.subject_id == i_wav_tx[ch]) + { + SPTYPE * buf_tx_ptr = buf_tx_array[ch].data(); + quint64 sps_rev = packagedta.size()/2/sizeof(SPTYPE); + short * iq = (short *)(packagedta.data()); + memcpy(&buf_tx_ptr[(stdin_pos[ch] * 2)% sz_buffer_tx], + iq, sps_rev * sizeof(SPTYPE)*2 + ); + const quint64 bgnext = ((stdin_pos[ch] + sps_rev) * 2) % sz_buffer_tx; + if (bgnext < sps_rev * 2 && bgnext>0) + { + memcpy(buf_tx_ptr, buf_tx_ptr + sz_buffer_tx, bgnext * sizeof(SPTYPE) ); + } + stdin_pos[ch] += sps_rev; + }//end if (header.subject_id == i_wav_tx[ch]) + }//end for (int ch = 0; ch < tx_channel_count;++ch) + if (total_points_left<0&&i_txWaterMark>0) + { + qint64 watM = stdin_pos[0] - tx_pos[0]; + if (tx_channel_count>1) + { + qint64 wat2 = stdin_pos[1] - tx_pos[1]; + if (wat2 < watM) + watM = wat2; + } + TASKBUS::push_subject( + i_txWaterMark, + instance, + sizeof(qint64), + (unsigned char *) &watM, + pMtx); + }//end if (total_points_left<0) + }//end else if (header.subject_id==i_tx_tm) + }//end else if if ( is_control_subject(header)) + }//end if (packagedta.size()) + }//end while (!stop_signal_called) + }//end try + catch (std::string er) + { + fputs(er.c_str(),stderr); + stop_signal_called = true; + } + };//end std::function thread_stdin = [&]()->void + uhd_io_thread th_wav_stdin(thread_stdin,0); + th_wav_stdin.start(QThread::HighestPriority); + try{ fprintf(stderr, "Creating USRP with args \"%s\"...\n", dev_args.c_str()); UHD_DO(uhd_usrp_make(&usrp, dev_args.c_str())); @@ -373,27 +497,6 @@ int do_iio(const cmdlineParser & args) buf_rx_list[buf_ct].ch_ptr.push_back((void *)buf_rx_list[buf_ct].ch_data[ch].data()); } - //Time plan for tx - std::list tx_plan; - std::mutex mtx_plan; - - //ring buffer In IQ Points, TX io - const quint64 sz_buffer_tx = 256 * 1024 * 1024; - std::vector > buf_tx_array; - std::vector tx_pos; - std::vector stdin_pos; - std::vector tx_buff_ptr; - size_t num_sps_sent = 0; - for (int ch = 0;ch(sz_buffer_tx + 1024*1024*32,0)); - stdin_pos.push_back(0); - tx_pos.push_back(0); - tx_buff_ptr.push_back(0); - } - std::mutex push_mtx; - std::mutex * pMtx = (i_wav_rx[0]+i_wav_rx[1]+i_txWaterMark)>0?&push_mtx:nullptr; - //reset time uhd_usrp_set_time_now(usrp,0,0,0); //Define RX Thread, Producer @@ -496,89 +599,6 @@ int do_iio(const cmdlineParser & args) } };//end std::function thread_stdout = [&]()->void - qint64 total_points_left = -1; - //Define StdIn Thread, Producer - std::function thread_stdin = [&]()->void - { - try{ - //Read STDIN - while (!stop_signal_called) - { - subject_package_header header; - std::vector packagedta = pull_subject(&header); - if (!is_valid_header(header)) - { - fprintf(stderr,"Recived BAD Command."); - fflush(stderr); - QThread::msleep(100); - continue; - } - if (packagedta.size()) - { - if ( is_control_subject(header)) - { - //收到命令进程退出的广播消息,退出 - if (strstr((const char *)packagedta.data(),"function=quit;")!=nullptr) - { - fprintf(stderr,"Recived Quit Command."); - fflush(stderr); - stop_signal_called = true; - } - else - cmd_dealing(usrp,packagedta); - } - else - { - if (header.subject_id==i_tx_tm) - { - if (packagedta.size()==sizeof(tag_tx_plain)) - { - tag_tx_plain * plan = (tag_tx_plain *) (packagedta.data()); - if (total_points_left<=0) - total_points_left = plan->length_left; - fprintf(stderr,"plan %d points\n",plan->length_left); - mtx_plan.lock(); - tx_plan.push_back(*plan); - mtx_plan.unlock(); - } - else - fprintf(stderr,"Error Timeplan subject size %d recieved. Expected should be %d.\n", - (int)packagedta.size(),(int)sizeof(tag_tx_plain)); - - } - else - { - fprintf(stderr,"recv %d points\n",packagedta.size()/2/sizeof(SPTYPE)); - for (int ch = 0; ch < tx_channel_count;++ch) - { - if (header.subject_id == i_wav_tx[ch]) - { - SPTYPE * buf_tx_ptr = buf_tx_array[ch].data(); - quint64 sps_rev = packagedta.size()/2/sizeof(SPTYPE); - short * iq = (short *)(packagedta.data()); - memcpy(&buf_tx_ptr[(stdin_pos[ch] * 2)% sz_buffer_tx], - iq, sps_rev * sizeof(SPTYPE)*2 - ); - const quint64 bgnext = ((stdin_pos[ch] + sps_rev) * 2) % sz_buffer_tx; - if (bgnext < sps_rev * 2 && bgnext>0) - { - memcpy(buf_tx_ptr, buf_tx_ptr + sz_buffer_tx, bgnext * sizeof(SPTYPE) ); - } - stdin_pos[ch] += sps_rev; - }//end if (header.subject_id == i_wav_tx[ch]) - }//end for (int ch = 0; ch < tx_channel_count;++ch) - }//end else if (header.subject_id==i_tx_tm) - }//end else if if ( is_control_subject(header)) - }//end if (packagedta.size()) - }//end while (!stop_signal_called) - }//end try - catch (std::string er) - { - fputs(er.c_str(),stderr); - stop_signal_called = true; - } - };//end std::function thread_stdin = [&]()->void - const int tx_frame = 10000; //Define thread_tx, Consumer std::function thread_tx = [&]()->void @@ -730,7 +750,6 @@ int do_iio(const cmdlineParser & args) uhd_io_thread th_wav_rx(thread_rx,0); uhd_io_thread th_wav_stdout(thread_stdout,0); - uhd_io_thread th_wav_stdin(thread_stdin,0); uhd_io_thread th_wav_tx(thread_tx,0); if (rx_on) @@ -742,7 +761,6 @@ int do_iio(const cmdlineParser & args) { th_wav_tx.start(QThread::HighestPriority); } - th_wav_stdin.start(QThread::HighestPriority); while (!stop_signal_called) { @@ -760,7 +778,6 @@ int do_iio(const cmdlineParser & args) th_wav_rx.wait(); th_wav_stdout.wait(); th_wav_tx.wait(); - th_wav_stdin.wait(); }//end top try catch(std::string s) { @@ -780,6 +797,8 @@ int do_iio(const cmdlineParser & args) } uhd_usrp_free(&usrp); + th_wav_stdin.wait(); + fprintf(stderr, (return_code ? "Failure\n" : "Success\n")); fflush(stderr); return return_code; @@ -787,6 +806,8 @@ int do_iio(const cmdlineParser & args) void cmd_dealing(uhd_usrp_handle usrp,std::vector & cmd) { + if (!usrp) + return ; std::map mcmd = TASKBUS::ctrlpackage_to_map(cmd); if (mcmd["function"]=="handle_set") {