提交 e9f2fba0 编写于 作者: weixin_43103506's avatar weixin_43103506

add perf test revise

上级 90550120
......@@ -99,6 +99,7 @@ typedef struct _daq_msg
void *meta[DAQ_MSG_META_SLOTS]; /* Dynamic message metadata slots */
DAQ_ModuleInstance_h owner; /* Handle for the module instance this message belongs to */
void *priv; /* Pointer to module instance's private data for this message (Optional) */
void *priv_mbuf;
size_t hdr_len; /* Length of the header structure pointed to by 'hdr' */
DAQ_MsgType type; /* Message type (one of DAQ_MsgType or from the user-defined range) */
uint32_t data_len; /* Length of the data pointed to by 'data'. Should be 0 if 'data' is NULL */
......@@ -340,7 +341,7 @@ typedef struct _daq_stats
uint64_t packets_filtered; /* Packets filtered by this instance's BPF */
uint64_t packets_injected; /* Packets injected by this instance */
uint64_t verdicts[MAX_DAQ_VERDICT]; /* Counters of packets handled per-verdict. */
} DAQ_Stats_t;
} __attribute__((aligned(64)))DAQ_Stats_t;
typedef struct _daq_msg_pool_info
{
......
......@@ -22,6 +22,8 @@
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#define _GNU_SOURCE /* See feature_test_macros(7) */
#include <sched.h>
#include <arpa/inet.h>
#include <errno.h>
......@@ -45,6 +47,7 @@
#include <daq_module_api.h>
#include "decode.h"
#define DPDK_TEST (1)
typedef enum
{
......@@ -112,7 +115,8 @@ typedef struct _DAQTestThreadContext
void *oldconfig;
volatile bool done;
volatile bool exited;
} DAQTestThreadContext;
int thread_id;
} __attribute__((aligned(64)))DAQTestThreadContext;
typedef struct _DAQTestPacket
{
......@@ -755,8 +759,9 @@ static DAQ_Verdict handle_packet_message(DAQTestThreadContext *ctxt, DAQ_Msg_h m
ctxt->packet_count++;
if (cfg->delay)
{
usleep(cfg->delay * 1000);
}
if (cfg->performance_mode)
return cfg->default_verdict;
......@@ -1400,6 +1405,22 @@ static void print_config(DAQTestConfig *cfg)
printf(" In performance mode, no decoding will be done!\n");
}
static inline int set_cpu(int i)
{
cpu_set_t mask;
CPU_ZERO(&mask);
CPU_SET(i,&mask);
printf("thread %u, i = %d\n", pthread_self(), i);
if(-1 == pthread_setaffinity_np(pthread_self() ,sizeof(mask),&mask))
{
printf("error set cpu!\n");
return -1;
}
return 0;
}
static void *processing_thread(void *arg)
{
DAQTestThreadContext *ctxt = (DAQTestThreadContext *) arg;
......@@ -1436,6 +1457,15 @@ static void *processing_thread(void *arg)
memset(recv_counters, 0, sizeof(recv_counters));
max_recv = recv_cnt = timeout_count = 0;
unsigned batch_size = cfg->batch_size;
DAQ_RecvStatus rstat;
if ( set_cpu(ctxt->thread_id) )
goto exit;
while (!ctxt->done && (!cfg->packet_limit || ctxt->packet_count < cfg->packet_limit))
{
/* Check to see if a config swap is pending. */
......@@ -1449,24 +1479,20 @@ static void *processing_thread(void *arg)
ctxt->oldconfig = oldconfig;
}
unsigned batch_size = cfg->batch_size;
if (cfg->packet_limit)
{
{
unsigned long remainder = cfg->packet_limit - ctxt->packet_count;
if (cfg->batch_size > remainder)
batch_size = remainder;
}
DAQ_RecvStatus rstat;
unsigned num_recv = daq_instance_msg_receive(ctxt->instance, batch_size, ctxt->msgs, &rstat);
recv_counters[rstat]++;
if (num_recv > max_recv)
max_recv = num_recv;
if (num_recv > 0)
recv_cnt++;
for (unsigned idx = 0; idx < num_recv; idx++)
{
DAQ_Msg_h msg = ctxt->msgs[idx];
......@@ -1485,7 +1511,6 @@ static void *processing_thread(void *arg)
}
daq_instance_msg_finalize(ctxt->instance, msg, verdict);
}
if (rstat != DAQ_RSTAT_OK && rstat != DAQ_RSTAT_WOULD_BLOCK)
{
if (rstat == DAQ_RSTAT_TIMEOUT)
......@@ -1501,7 +1526,7 @@ static void *processing_thread(void *arg)
else if (rstat == DAQ_RSTAT_ERROR)
fprintf(stderr, "Error receiving messages: %s\n", daq_instance_get_error(ctxt->instance));
break;
}
}
}
printf("\nDAQ receive timed out %u times.\n", timeout_count);
......@@ -1710,7 +1735,9 @@ int main(int argc, char *argv[])
pthread_sigmask(SIG_BLOCK, &set, NULL);
for (unsigned i = 0; i < cfg.thread_count; i++)
{
DAQTestThreadContext *dttc = &threads[i];
dttc->thread_id = i;
if ((rval = pthread_create(&dttc->tid, NULL, processing_thread, dttc)) != 0)
{
fprintf(stderr, "Error creating thread: %s (%d)\n", strerror(errno), errno);
......
[EAL]
-l=10-15
-l=0-9
-w=0000:08:00.0
#-w=0000:08:00.1
[PORT-0]
queue-num=63
queue-num=10
rss-tuple=3
jumbo=no
mtu=1500
[PORT-1]
queue-num=63
rss-tuple=3
jumbo=no
mtu=1500
#[PORT-1]
#queue-num=4
#rss-tuple=3
#jumbo=no
#mtu=1500
......@@ -24,13 +24,13 @@
#include "dpdk_port_conf.h"
#include "dpdk_param.h"
#define INJECT_BUF_NUM (128)
#define INJECT_BUF_NUM (1024*4)
#define POOL_NAME_LEN (64)
#define BURST_SIZE (32)
#define DESC_POOL_NUM (2048)
#define SET_ERROR(modinst, ...) daq_base_api.set_errbuf(modinst, __VA_ARGS__)
//#define HIGH_PERF_ENABLE (1)
#define HIGH_PERF_ENABLE (1)
#define DAQ_DPDK_VERSION 1915
#define MEMPOOL_CACHE_SIZE (64)
typedef struct _dpdk_packet_pkt_desc
......@@ -68,7 +68,17 @@ typedef struct _dpdk_packet_context
struct rte_mempool *inject_mbuf_pool;
volatile uint8_t interrupted;
DAQ_Stats_t stats;
} DPDK_Packet_Context_t;
}__attribute__((aligned(64))) DPDK_Packet_Context_t;
static const DAQ_Verdict verdict_translation_table[MAX_DAQ_VERDICT] = {
DAQ_VERDICT_PASS, /* DAQ_VERDICT_PASS */
DAQ_VERDICT_BLOCK, /* DAQ_VERDICT_BLOCK */
DAQ_VERDICT_PASS, /* DAQ_VERDICT_REPLACE */
DAQ_VERDICT_PASS, /* DAQ_VERDICT_WHITELIST */
DAQ_VERDICT_BLOCK, /* DAQ_VERDICT_BLACKLIST */
DAQ_VERDICT_PASS, /* DAQ_VERDICT_IGNORE */
DAQ_VERDICT_BLOCK /* DAQ_VERDICT_RETRY */
};
static DAQ_BaseAPI_t daq_base_api;
static pthread_mutex_t bpf_mutex = PTHREAD_MUTEX_INITIALIZER;
......@@ -229,7 +239,7 @@ static int dpdk_daq_instantiate(const DAQ_ModuleConfig_h modcfg, DAQ_ModuleInsta
dpdk_pctx->debug = 1;
daq_base_api.config_next_variable(modcfg, &varKey, &varValue);
}
dpdk_pctx->stats.packets_received = 0;
dpdk_pctx->snaplen = daq_base_api.config_get_snaplen(modcfg);
dpdk_pctx->timeout = (int) daq_base_api.config_get_timeout(modcfg);
if (dpdk_pctx->timeout == 0)
......@@ -246,6 +256,7 @@ static int dpdk_daq_instantiate(const DAQ_ModuleConfig_h modcfg, DAQ_ModuleInsta
dpdk_get_port_and_queue(&dpdk_pctx->port_id,&dpdk_pctx->queue_id);
dpdk_pctx->modinst = modinst;
*ctxt_ptr = dpdk_pctx;
return rval;
......@@ -321,8 +332,15 @@ static int dpdk_daq_start(void *handle)
return DAQ_SUCCESS;
}
static int dpdk_inject_packet(DPDK_Packet_Context_t *dpdk_pctx, uint16_t out_port_id, uint16_t out_queue_id,const uint8_t *data, uint32_t data_len)
static int dpdk_inject_packet(DPDK_Packet_Context_t *dpdk_pctx, DAQ_Msg_t *msg,uint16_t out_port_id, uint16_t out_queue_id,const uint8_t *data, uint32_t data_len)
{
#ifdef HIGH_PERF_ENABLE
struct rte_mbuf *mbuf = (struct rte_mbuf *)msg->priv_mbuf;
rte_pktmbuf_data_len(mbuf) = data_len;
//rte_pktmbuf_dump(stdout,mbuf,mbuf->pkt_len);
uint16_t nb_tx = rte_eth_tx_burst(out_port_id, out_queue_id, &mbuf, 1);
#else
struct rte_mbuf *m;
m = rte_pktmbuf_alloc(dpdk_pctx->inject_mbuf_pool);
......@@ -344,6 +362,7 @@ static int dpdk_inject_packet(DPDK_Packet_Context_t *dpdk_pctx, uint16_t out_por
}
rte_pktmbuf_free(m);
#endif
dpdk_pctx->stats.packets_injected++;
return DAQ_SUCCESS;
......@@ -356,7 +375,7 @@ static int dpdk_daq_inject(void *handle, DAQ_MsgType type, const void *hdr, cons
if (type != DAQ_MSG_TYPE_PACKET)
return DAQ_ERROR_NOTSUP;
return dpdk_inject_packet(dpdk_pctx,dpdk_pctx->port_id,dpdk_pctx->queue_id,data,data_len);
return dpdk_inject_packet(dpdk_pctx,NULL,dpdk_pctx->port_id,dpdk_pctx->queue_id,data,data_len);
}
static int dpdk_daq_inject_relative(void *handle, const DAQ_Msg_t *msg, const uint8_t *data, uint32_t data_len, int reverse)
......@@ -366,7 +385,7 @@ static int dpdk_daq_inject_relative(void *handle, const DAQ_Msg_t *msg, const ui
if (reverse)
reverse_port = dpdk_pctx->port_id % 2 ? (dpdk_pctx->port_id - 1):(dpdk_pctx->port_id + 1);
return dpdk_inject_packet(dpdk_pctx,reverse_port,dpdk_pctx->queue_id,data,data_len);
return dpdk_inject_packet(dpdk_pctx,msg,reverse_port,dpdk_pctx->queue_id,data,data_len);
}
static int dpdk_daq_interrupt(void *handle)
......@@ -470,6 +489,7 @@ static unsigned dpdk_daq_msg_receive(void *handle, const unsigned max_recv, cons
max_recv_ok = BURST_SIZE;
uint16_t nb_rx = rte_eth_rx_burst(dpdk_pctx->port_id, dpdk_pctx->queue_id, bufs, max_recv_ok);
dpdk_pctx->stats.packets_received += nb_rx;
for (loop = 0; loop < nb_rx; loop++)
{
......@@ -478,7 +498,7 @@ static unsigned dpdk_daq_msg_receive(void *handle, const unsigned max_recv, cons
len = rte_pktmbuf_data_len(bufs[loop]);
//rte_pktmbuf_dump(stdout,bufs[loop],bufs[loop]->pkt_len);
#if 0
#ifdef LIBPCAP_AVAILABLE
if (dpdk_pctx->fcode.bf_insns && bpf_filter(dpdk_pctx->fcode.bf_insns, data, len, len) == 0)
{
......@@ -487,11 +507,12 @@ static unsigned dpdk_daq_msg_receive(void *handle, const unsigned max_recv, cons
continue;
}
#endif
dpdk_pctx->stats.packets_received++;
#endif
DPDKPacketPktDesc *desc = dpdk_pctx->pool.freelist;
if (!desc)
{
printf("1111 addr:%p availabel:%d\n",desc,dpdk_pctx->pool.info.available);
rte_pktmbuf_free(bufs[loop]);
status = DAQ_RSTAT_NOBUF;
break;
......@@ -507,12 +528,11 @@ static unsigned dpdk_daq_msg_receive(void *handle, const unsigned max_recv, cons
#ifdef HIGH_PERF_ENABLE
msg->data = data;
msg->priv = bufs[loop];
msg->priv_mbuf = bufs[loop];
#else
rte_memcpy(desc->data, data, len);
rte_pktmbuf_free(bufs[loop]);
#endif
/* Then, set up the DAQ packet header. */
DAQ_PktHdr_t *pkthdr = &desc->pkthdr;
pkthdr->ts.tv_sec = 0;
......@@ -527,7 +547,9 @@ static unsigned dpdk_daq_msg_receive(void *handle, const unsigned max_recv, cons
desc->next = NULL;
dpdk_pctx->pool.info.available--;
msgs[idx] = &desc->msg;
idx++;
idx++;
//rte_pktmbuf_dump(stdout,bufs[loop],bufs[loop]->pkt_len);
//uint16_t nb_tx = rte_eth_tx_burst(dpdk_pctx->port_id, dpdk_pctx->queue_id, &bufs[loop], 1);
}
#if 0
if (!nb_rx && (dpdk_pctx->timeout != -1 ))
......@@ -549,16 +571,6 @@ err:
return idx;
}
static const DAQ_Verdict verdict_translation_table[MAX_DAQ_VERDICT] = {
DAQ_VERDICT_PASS, /* DAQ_VERDICT_PASS */
DAQ_VERDICT_BLOCK, /* DAQ_VERDICT_BLOCK */
DAQ_VERDICT_PASS, /* DAQ_VERDICT_REPLACE */
DAQ_VERDICT_PASS, /* DAQ_VERDICT_WHITELIST */
DAQ_VERDICT_BLOCK, /* DAQ_VERDICT_BLACKLIST */
DAQ_VERDICT_PASS, /* DAQ_VERDICT_IGNORE */
DAQ_VERDICT_BLOCK /* DAQ_VERDICT_RETRY */
};
static int dpdk_daq_msg_finalize(void *handle, const DAQ_Msg_t *msg, DAQ_Verdict verdict)
{
DPDK_Packet_Context_t *dpdk_pctx = (DPDK_Packet_Context_t *) handle;
......@@ -572,15 +584,22 @@ static int dpdk_daq_msg_finalize(void *handle, const DAQ_Msg_t *msg, DAQ_Verdict
dpdk_pctx->stats.verdicts[verdict]++;
verdict = verdict_translation_table[verdict];
if (verdict == DAQ_VERDICT_PASS)
dpdk_inject_packet(dpdk_pctx,dpdk_pctx->port_id,dpdk_pctx->queue_id,msg->data,msg->data_len);
{
dpdk_daq_inject_relative(dpdk_pctx,msg,msg->data,msg->data_len,0);
}
else
{
#ifdef HIGH_PERF_ENABLE
struct rte_mbuf *mbuf = (struct rte_mbuf *)msg->priv_mbuf;
//rte_pktmbuf_dump(stdout,mbuf,mbuf->pkt_len);
//uint16_t nb_tx = rte_eth_tx_burst(dpdk_pctx->port_id, dpdk_pctx->queue_id, &mbuf, 1);
rte_pktmbuf_free(mbuf);
#endif
}
desc->next = dpdk_pctx->pool.freelist;
dpdk_pctx->pool.freelist = desc;
dpdk_pctx->pool.info.available++;
#ifdef HIGH_PERF_ENABLE
rte_pktmbuf_free(msg->priv);
#endif
return DAQ_SUCCESS;
}
......
......@@ -21,8 +21,8 @@
#define MIN(v1, v2) ((v1) < (v2) ? (v1) : (v2))
#endif
#define RX_RING_SIZE (4096)
#define TX_RING_SIZE (512)
#define RX_RING_SIZE (1024)
#define TX_RING_SIZE (1024)
#define RSS_HASH_KEY_LENGTH 40
#define PKT_PRIV_SIZE (256)
static uint32_t port_queue_num[RTE_MAX_ETHPORTS] = {0};
......@@ -117,7 +117,7 @@ static inline void dpdk_port_init(dpdk_port_conf_t *dpdk_port_conf,uint16_t port
.rss_conf = {
.rss_key = g_arrayHashKey,
.rss_key_len = RSS_HASH_KEY_LENGTH,
.rss_hf = ETH_RSS_UDP|ETH_RSS_TCP,
.rss_hf = ETH_RSS_IP,
},
},
.txmode = {
......
......@@ -4,7 +4,7 @@
#include "stdint.h"
//can get from dpdk param
#define NB_MBUF (1024*1024)
#define NB_MBUF (32*8192)
#define NB_SOCKETS (8)
/**
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册