diff --git a/example/src/tmq.c b/example/src/tmq.c index efb4d1830eb81e36f5066dacdff5d954ea2793b7..ca80c8fe5a12fcf11d8c90088ec399d47b708756 100644 --- a/example/src/tmq.c +++ b/example/src/tmq.c @@ -163,12 +163,13 @@ void basic_consume_loop(tmq_t* tmq, tmq_list_t* topics) { printf("subscribe err\n"); return; } - /*int32_t cnt = 0;*/ + int32_t cnt = 0; /*clock_t startTime = clock();*/ while (running) { tmq_message_t* tmqmessage = tmq_consumer_poll(tmq, 500); if (tmqmessage) { - /*cnt++;*/ + cnt++; + printf("get data\n"); msg_process(tmqmessage); tmq_message_destroy(tmqmessage); /*} else {*/ diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index 604e6a49aea94d5674ad79850a9add76708cc00a..185b5824d956908aed375bd5efd629da21892a29 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -194,14 +194,13 @@ enum { #define TD_RES_QUERY(res) (*(int8_t*)res == RES_TYPE__QUERY) #define TD_RES_TMQ(res) (*(int8_t*)res == RES_TYPE__TMQ) -struct tmq_message_t { - int8_t resType; - SMqPollRsp msg; - char* topic; - void* vg; - SArray* res; // SArray - int32_t resIter; -}; +typedef struct SMqRspObj { + int8_t resType; + char* topic; + void* vg; + SArray* res; // SArray + int32_t resIter; +} SMqRspObj; typedef struct SRequestObj { int8_t resType; // query or tmq @@ -222,13 +221,13 @@ typedef struct SRequestObj { } SRequestObj; static FORCE_INLINE SReqResultInfo* tmqGetCurResInfo(TAOS_RES* res) { - tmq_message_t* msg = (tmq_message_t*)res; - int32_t resIter = msg->resIter == -1 ? 0 : msg->resIter; + SMqRspObj* msg = (SMqRspObj*)res; + int32_t resIter = msg->resIter == -1 ? 0 : msg->resIter; return (SReqResultInfo*)taosArrayGet(msg->res, resIter); } static FORCE_INLINE SReqResultInfo* tmqGetNextResInfo(TAOS_RES* res) { - tmq_message_t* msg = (tmq_message_t*)res; + SMqRspObj* msg = (SMqRspObj*)res; if (++msg->resIter < taosArrayGetSize(msg->res)) { return (SReqResultInfo*)taosArrayGet(msg->res, msg->resIter); } diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index 080f666cdd4b81793c5c02d1096102a640e7a9ec..040ddde63099be572f3d3facf2b947cb52124235 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -171,7 +171,7 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) { return doFetchRow(pRequest, true, true); } else if (TD_RES_TMQ(res)) { - tmq_message_t *msg = ((tmq_message_t *)res); + SMqRspObj *msg = ((SMqRspObj *)res); SReqResultInfo *pResultInfo = taosArrayGet(msg->res, msg->resIter); doSetOneRowPtr(pResultInfo); diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index b3e0cc6fd9b6080d4ee5cea773e0a72c40ea21c0..dbe78782f5dd66284167a89fb731d944ab519fa0 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -24,6 +24,14 @@ #include "tqueue.h" #include "tref.h" +struct tmq_message_t { + SMqPollRsp msg; + char* topic; + void* vg; + SArray* res; // SArray + int32_t resIter; +}; + struct tmq_list_t { SArray container; };