From e9f2fba0cd4cac2fc94dc0ec574395ac15d20e68 Mon Sep 17 00:00:00 2001 From: kuangxiaohong <1002361031@qq.com> Date: Fri, 12 Mar 2021 09:51:56 +0800 Subject: [PATCH] add perf test revise --- api/daq_common.h | 3 +- example/daqtest.c | 47 +++++++++++++++++----- example/dpdk.cfg | 16 ++++---- modules/dpdk/daq_dpdk.c | 73 ++++++++++++++++++++++------------- modules/dpdk/dpdk_port_conf.c | 6 +-- modules/dpdk/dpdk_port_conf.h | 2 +- 6 files changed, 97 insertions(+), 50 deletions(-) mode change 100644 => 100755 api/daq_common.h mode change 100644 => 100755 example/daqtest.c diff --git a/api/daq_common.h b/api/daq_common.h old mode 100644 new mode 100755 index deac2b3..2b74305 --- a/api/daq_common.h +++ b/api/daq_common.h @@ -99,6 +99,7 @@ typedef struct _daq_msg void *meta[DAQ_MSG_META_SLOTS]; /* Dynamic message metadata slots */ DAQ_ModuleInstance_h owner; /* Handle for the module instance this message belongs to */ void *priv; /* Pointer to module instance's private data for this message (Optional) */ + void *priv_mbuf; size_t hdr_len; /* Length of the header structure pointed to by 'hdr' */ DAQ_MsgType type; /* Message type (one of DAQ_MsgType or from the user-defined range) */ uint32_t data_len; /* Length of the data pointed to by 'data'. Should be 0 if 'data' is NULL */ @@ -340,7 +341,7 @@ typedef struct _daq_stats uint64_t packets_filtered; /* Packets filtered by this instance's BPF */ uint64_t packets_injected; /* Packets injected by this instance */ uint64_t verdicts[MAX_DAQ_VERDICT]; /* Counters of packets handled per-verdict. */ -} DAQ_Stats_t; +} __attribute__((aligned(64)))DAQ_Stats_t; typedef struct _daq_msg_pool_info { diff --git a/example/daqtest.c b/example/daqtest.c old mode 100644 new mode 100755 index 81bf5e2..50ea1e1 --- a/example/daqtest.c +++ b/example/daqtest.c @@ -22,6 +22,8 @@ #ifdef HAVE_CONFIG_H #include "config.h" #endif +#define _GNU_SOURCE /* See feature_test_macros(7) */ +#include #include #include @@ -45,6 +47,7 @@ #include #include "decode.h" +#define DPDK_TEST (1) typedef enum { @@ -112,7 +115,8 @@ typedef struct _DAQTestThreadContext void *oldconfig; volatile bool done; volatile bool exited; -} DAQTestThreadContext; + int thread_id; +} __attribute__((aligned(64)))DAQTestThreadContext; typedef struct _DAQTestPacket { @@ -755,8 +759,9 @@ static DAQ_Verdict handle_packet_message(DAQTestThreadContext *ctxt, DAQ_Msg_h m ctxt->packet_count++; if (cfg->delay) + { usleep(cfg->delay * 1000); - + } if (cfg->performance_mode) return cfg->default_verdict; @@ -1400,6 +1405,22 @@ static void print_config(DAQTestConfig *cfg) printf(" In performance mode, no decoding will be done!\n"); } +static inline int set_cpu(int i) +{ + cpu_set_t mask; + CPU_ZERO(&mask); + + CPU_SET(i,&mask); + + printf("thread %u, i = %d\n", pthread_self(), i); + if(-1 == pthread_setaffinity_np(pthread_self() ,sizeof(mask),&mask)) + { + printf("error set cpu!\n"); + return -1; + } + return 0; +} + static void *processing_thread(void *arg) { DAQTestThreadContext *ctxt = (DAQTestThreadContext *) arg; @@ -1436,6 +1457,15 @@ static void *processing_thread(void *arg) memset(recv_counters, 0, sizeof(recv_counters)); max_recv = recv_cnt = timeout_count = 0; + + unsigned batch_size = cfg->batch_size; + + DAQ_RecvStatus rstat; + + if ( set_cpu(ctxt->thread_id) ) + goto exit; + + while (!ctxt->done && (!cfg->packet_limit || ctxt->packet_count < cfg->packet_limit)) { /* Check to see if a config swap is pending. */ @@ -1449,24 +1479,20 @@ static void *processing_thread(void *arg) ctxt->oldconfig = oldconfig; } - - unsigned batch_size = cfg->batch_size; if (cfg->packet_limit) - { + { unsigned long remainder = cfg->packet_limit - ctxt->packet_count; if (cfg->batch_size > remainder) batch_size = remainder; } - DAQ_RecvStatus rstat; unsigned num_recv = daq_instance_msg_receive(ctxt->instance, batch_size, ctxt->msgs, &rstat); recv_counters[rstat]++; + if (num_recv > max_recv) max_recv = num_recv; - if (num_recv > 0) recv_cnt++; - for (unsigned idx = 0; idx < num_recv; idx++) { DAQ_Msg_h msg = ctxt->msgs[idx]; @@ -1485,7 +1511,6 @@ static void *processing_thread(void *arg) } daq_instance_msg_finalize(ctxt->instance, msg, verdict); } - if (rstat != DAQ_RSTAT_OK && rstat != DAQ_RSTAT_WOULD_BLOCK) { if (rstat == DAQ_RSTAT_TIMEOUT) @@ -1501,7 +1526,7 @@ static void *processing_thread(void *arg) else if (rstat == DAQ_RSTAT_ERROR) fprintf(stderr, "Error receiving messages: %s\n", daq_instance_get_error(ctxt->instance)); break; - } + } } printf("\nDAQ receive timed out %u times.\n", timeout_count); @@ -1710,7 +1735,9 @@ int main(int argc, char *argv[]) pthread_sigmask(SIG_BLOCK, &set, NULL); for (unsigned i = 0; i < cfg.thread_count; i++) { + DAQTestThreadContext *dttc = &threads[i]; + dttc->thread_id = i; if ((rval = pthread_create(&dttc->tid, NULL, processing_thread, dttc)) != 0) { fprintf(stderr, "Error creating thread: %s (%d)\n", strerror(errno), errno); diff --git a/example/dpdk.cfg b/example/dpdk.cfg index 51415e8..fd5e8f5 100755 --- a/example/dpdk.cfg +++ b/example/dpdk.cfg @@ -1,18 +1,18 @@ [EAL] --l=10-15 +-l=0-9 -w=0000:08:00.0 - +#-w=0000:08:00.1 [PORT-0] -queue-num=63 +queue-num=10 rss-tuple=3 jumbo=no mtu=1500 -[PORT-1] -queue-num=63 -rss-tuple=3 -jumbo=no -mtu=1500 +#[PORT-1] +#queue-num=4 +#rss-tuple=3 +#jumbo=no +#mtu=1500 diff --git a/modules/dpdk/daq_dpdk.c b/modules/dpdk/daq_dpdk.c index 2059f40..b680d8c 100755 --- a/modules/dpdk/daq_dpdk.c +++ b/modules/dpdk/daq_dpdk.c @@ -24,13 +24,13 @@ #include "dpdk_port_conf.h" #include "dpdk_param.h" -#define INJECT_BUF_NUM (128) +#define INJECT_BUF_NUM (1024*4) #define POOL_NAME_LEN (64) #define BURST_SIZE (32) #define DESC_POOL_NUM (2048) #define SET_ERROR(modinst, ...) daq_base_api.set_errbuf(modinst, __VA_ARGS__) -//#define HIGH_PERF_ENABLE (1) +#define HIGH_PERF_ENABLE (1) #define DAQ_DPDK_VERSION 1915 #define MEMPOOL_CACHE_SIZE (64) typedef struct _dpdk_packet_pkt_desc @@ -68,7 +68,17 @@ typedef struct _dpdk_packet_context struct rte_mempool *inject_mbuf_pool; volatile uint8_t interrupted; DAQ_Stats_t stats; -} DPDK_Packet_Context_t; +}__attribute__((aligned(64))) DPDK_Packet_Context_t; + +static const DAQ_Verdict verdict_translation_table[MAX_DAQ_VERDICT] = { + DAQ_VERDICT_PASS, /* DAQ_VERDICT_PASS */ + DAQ_VERDICT_BLOCK, /* DAQ_VERDICT_BLOCK */ + DAQ_VERDICT_PASS, /* DAQ_VERDICT_REPLACE */ + DAQ_VERDICT_PASS, /* DAQ_VERDICT_WHITELIST */ + DAQ_VERDICT_BLOCK, /* DAQ_VERDICT_BLACKLIST */ + DAQ_VERDICT_PASS, /* DAQ_VERDICT_IGNORE */ + DAQ_VERDICT_BLOCK /* DAQ_VERDICT_RETRY */ +}; static DAQ_BaseAPI_t daq_base_api; static pthread_mutex_t bpf_mutex = PTHREAD_MUTEX_INITIALIZER; @@ -229,7 +239,7 @@ static int dpdk_daq_instantiate(const DAQ_ModuleConfig_h modcfg, DAQ_ModuleInsta dpdk_pctx->debug = 1; daq_base_api.config_next_variable(modcfg, &varKey, &varValue); } - + dpdk_pctx->stats.packets_received = 0; dpdk_pctx->snaplen = daq_base_api.config_get_snaplen(modcfg); dpdk_pctx->timeout = (int) daq_base_api.config_get_timeout(modcfg); if (dpdk_pctx->timeout == 0) @@ -246,6 +256,7 @@ static int dpdk_daq_instantiate(const DAQ_ModuleConfig_h modcfg, DAQ_ModuleInsta dpdk_get_port_and_queue(&dpdk_pctx->port_id,&dpdk_pctx->queue_id); + dpdk_pctx->modinst = modinst; *ctxt_ptr = dpdk_pctx; return rval; @@ -321,8 +332,15 @@ static int dpdk_daq_start(void *handle) return DAQ_SUCCESS; } -static int dpdk_inject_packet(DPDK_Packet_Context_t *dpdk_pctx, uint16_t out_port_id, uint16_t out_queue_id,const uint8_t *data, uint32_t data_len) +static int dpdk_inject_packet(DPDK_Packet_Context_t *dpdk_pctx, DAQ_Msg_t *msg,uint16_t out_port_id, uint16_t out_queue_id,const uint8_t *data, uint32_t data_len) { +#ifdef HIGH_PERF_ENABLE + struct rte_mbuf *mbuf = (struct rte_mbuf *)msg->priv_mbuf; + rte_pktmbuf_data_len(mbuf) = data_len; + //rte_pktmbuf_dump(stdout,mbuf,mbuf->pkt_len); + uint16_t nb_tx = rte_eth_tx_burst(out_port_id, out_queue_id, &mbuf, 1); + +#else struct rte_mbuf *m; m = rte_pktmbuf_alloc(dpdk_pctx->inject_mbuf_pool); @@ -344,6 +362,7 @@ static int dpdk_inject_packet(DPDK_Packet_Context_t *dpdk_pctx, uint16_t out_por } rte_pktmbuf_free(m); +#endif dpdk_pctx->stats.packets_injected++; return DAQ_SUCCESS; @@ -356,7 +375,7 @@ static int dpdk_daq_inject(void *handle, DAQ_MsgType type, const void *hdr, cons if (type != DAQ_MSG_TYPE_PACKET) return DAQ_ERROR_NOTSUP; - return dpdk_inject_packet(dpdk_pctx,dpdk_pctx->port_id,dpdk_pctx->queue_id,data,data_len); + return dpdk_inject_packet(dpdk_pctx,NULL,dpdk_pctx->port_id,dpdk_pctx->queue_id,data,data_len); } static int dpdk_daq_inject_relative(void *handle, const DAQ_Msg_t *msg, const uint8_t *data, uint32_t data_len, int reverse) @@ -366,7 +385,7 @@ static int dpdk_daq_inject_relative(void *handle, const DAQ_Msg_t *msg, const ui if (reverse) reverse_port = dpdk_pctx->port_id % 2 ? (dpdk_pctx->port_id - 1):(dpdk_pctx->port_id + 1); - return dpdk_inject_packet(dpdk_pctx,reverse_port,dpdk_pctx->queue_id,data,data_len); + return dpdk_inject_packet(dpdk_pctx,msg,reverse_port,dpdk_pctx->queue_id,data,data_len); } static int dpdk_daq_interrupt(void *handle) @@ -470,6 +489,7 @@ static unsigned dpdk_daq_msg_receive(void *handle, const unsigned max_recv, cons max_recv_ok = BURST_SIZE; uint16_t nb_rx = rte_eth_rx_burst(dpdk_pctx->port_id, dpdk_pctx->queue_id, bufs, max_recv_ok); + dpdk_pctx->stats.packets_received += nb_rx; for (loop = 0; loop < nb_rx; loop++) { @@ -478,7 +498,7 @@ static unsigned dpdk_daq_msg_receive(void *handle, const unsigned max_recv, cons len = rte_pktmbuf_data_len(bufs[loop]); //rte_pktmbuf_dump(stdout,bufs[loop],bufs[loop]->pkt_len); - +#if 0 #ifdef LIBPCAP_AVAILABLE if (dpdk_pctx->fcode.bf_insns && bpf_filter(dpdk_pctx->fcode.bf_insns, data, len, len) == 0) { @@ -487,11 +507,12 @@ static unsigned dpdk_daq_msg_receive(void *handle, const unsigned max_recv, cons continue; } #endif - dpdk_pctx->stats.packets_received++; +#endif DPDKPacketPktDesc *desc = dpdk_pctx->pool.freelist; if (!desc) { + printf("1111 addr:%p availabel:%d\n",desc,dpdk_pctx->pool.info.available); rte_pktmbuf_free(bufs[loop]); status = DAQ_RSTAT_NOBUF; break; @@ -507,12 +528,11 @@ static unsigned dpdk_daq_msg_receive(void *handle, const unsigned max_recv, cons #ifdef HIGH_PERF_ENABLE msg->data = data; - msg->priv = bufs[loop]; + msg->priv_mbuf = bufs[loop]; #else rte_memcpy(desc->data, data, len); rte_pktmbuf_free(bufs[loop]); #endif - /* Then, set up the DAQ packet header. */ DAQ_PktHdr_t *pkthdr = &desc->pkthdr; pkthdr->ts.tv_sec = 0; @@ -527,7 +547,9 @@ static unsigned dpdk_daq_msg_receive(void *handle, const unsigned max_recv, cons desc->next = NULL; dpdk_pctx->pool.info.available--; msgs[idx] = &desc->msg; - idx++; + idx++; + //rte_pktmbuf_dump(stdout,bufs[loop],bufs[loop]->pkt_len); + //uint16_t nb_tx = rte_eth_tx_burst(dpdk_pctx->port_id, dpdk_pctx->queue_id, &bufs[loop], 1); } #if 0 if (!nb_rx && (dpdk_pctx->timeout != -1 )) @@ -549,16 +571,6 @@ err: return idx; } -static const DAQ_Verdict verdict_translation_table[MAX_DAQ_VERDICT] = { - DAQ_VERDICT_PASS, /* DAQ_VERDICT_PASS */ - DAQ_VERDICT_BLOCK, /* DAQ_VERDICT_BLOCK */ - DAQ_VERDICT_PASS, /* DAQ_VERDICT_REPLACE */ - DAQ_VERDICT_PASS, /* DAQ_VERDICT_WHITELIST */ - DAQ_VERDICT_BLOCK, /* DAQ_VERDICT_BLACKLIST */ - DAQ_VERDICT_PASS, /* DAQ_VERDICT_IGNORE */ - DAQ_VERDICT_BLOCK /* DAQ_VERDICT_RETRY */ -}; - static int dpdk_daq_msg_finalize(void *handle, const DAQ_Msg_t *msg, DAQ_Verdict verdict) { DPDK_Packet_Context_t *dpdk_pctx = (DPDK_Packet_Context_t *) handle; @@ -572,15 +584,22 @@ static int dpdk_daq_msg_finalize(void *handle, const DAQ_Msg_t *msg, DAQ_Verdict dpdk_pctx->stats.verdicts[verdict]++; verdict = verdict_translation_table[verdict]; if (verdict == DAQ_VERDICT_PASS) - dpdk_inject_packet(dpdk_pctx,dpdk_pctx->port_id,dpdk_pctx->queue_id,msg->data,msg->data_len); + { + dpdk_daq_inject_relative(dpdk_pctx,msg,msg->data,msg->data_len,0); + } + else + { +#ifdef HIGH_PERF_ENABLE + struct rte_mbuf *mbuf = (struct rte_mbuf *)msg->priv_mbuf; + //rte_pktmbuf_dump(stdout,mbuf,mbuf->pkt_len); + //uint16_t nb_tx = rte_eth_tx_burst(dpdk_pctx->port_id, dpdk_pctx->queue_id, &mbuf, 1); + rte_pktmbuf_free(mbuf); +#endif + } desc->next = dpdk_pctx->pool.freelist; dpdk_pctx->pool.freelist = desc; dpdk_pctx->pool.info.available++; - -#ifdef HIGH_PERF_ENABLE - rte_pktmbuf_free(msg->priv); -#endif return DAQ_SUCCESS; } diff --git a/modules/dpdk/dpdk_port_conf.c b/modules/dpdk/dpdk_port_conf.c index 1fc8f4e..49c7efc 100755 --- a/modules/dpdk/dpdk_port_conf.c +++ b/modules/dpdk/dpdk_port_conf.c @@ -21,8 +21,8 @@ #define MIN(v1, v2) ((v1) < (v2) ? (v1) : (v2)) #endif -#define RX_RING_SIZE (4096) -#define TX_RING_SIZE (512) +#define RX_RING_SIZE (1024) +#define TX_RING_SIZE (1024) #define RSS_HASH_KEY_LENGTH 40 #define PKT_PRIV_SIZE (256) static uint32_t port_queue_num[RTE_MAX_ETHPORTS] = {0}; @@ -117,7 +117,7 @@ static inline void dpdk_port_init(dpdk_port_conf_t *dpdk_port_conf,uint16_t port .rss_conf = { .rss_key = g_arrayHashKey, .rss_key_len = RSS_HASH_KEY_LENGTH, - .rss_hf = ETH_RSS_UDP|ETH_RSS_TCP, + .rss_hf = ETH_RSS_IP, }, }, .txmode = { diff --git a/modules/dpdk/dpdk_port_conf.h b/modules/dpdk/dpdk_port_conf.h index 0e8df95..8e58b42 100755 --- a/modules/dpdk/dpdk_port_conf.h +++ b/modules/dpdk/dpdk_port_conf.h @@ -4,7 +4,7 @@ #include "stdint.h" //can get from dpdk param -#define NB_MBUF (1024*1024) +#define NB_MBUF (32*8192) #define NB_SOCKETS (8) /** -- GitLab