提交 cf33a822 编写于 作者: L Liu Jicong

fix type convert

上级 285dc217
...@@ -163,12 +163,13 @@ void basic_consume_loop(tmq_t* tmq, tmq_list_t* topics) { ...@@ -163,12 +163,13 @@ void basic_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
printf("subscribe err\n"); printf("subscribe err\n");
return; return;
} }
/*int32_t cnt = 0;*/ int32_t cnt = 0;
/*clock_t startTime = clock();*/ /*clock_t startTime = clock();*/
while (running) { while (running) {
tmq_message_t* tmqmessage = tmq_consumer_poll(tmq, 500); tmq_message_t* tmqmessage = tmq_consumer_poll(tmq, 500);
if (tmqmessage) { if (tmqmessage) {
/*cnt++;*/ cnt++;
printf("get data\n");
msg_process(tmqmessage); msg_process(tmqmessage);
tmq_message_destroy(tmqmessage); tmq_message_destroy(tmqmessage);
/*} else {*/ /*} else {*/
......
...@@ -194,14 +194,13 @@ enum { ...@@ -194,14 +194,13 @@ enum {
#define TD_RES_QUERY(res) (*(int8_t*)res == RES_TYPE__QUERY) #define TD_RES_QUERY(res) (*(int8_t*)res == RES_TYPE__QUERY)
#define TD_RES_TMQ(res) (*(int8_t*)res == RES_TYPE__TMQ) #define TD_RES_TMQ(res) (*(int8_t*)res == RES_TYPE__TMQ)
struct tmq_message_t { typedef struct SMqRspObj {
int8_t resType; int8_t resType;
SMqPollRsp msg; char* topic;
char* topic; void* vg;
void* vg; SArray* res; // SArray<SReqResultInfo>
SArray* res; // SArray<SReqResultInfo> int32_t resIter;
int32_t resIter; } SMqRspObj;
};
typedef struct SRequestObj { typedef struct SRequestObj {
int8_t resType; // query or tmq int8_t resType; // query or tmq
...@@ -222,13 +221,13 @@ typedef struct SRequestObj { ...@@ -222,13 +221,13 @@ typedef struct SRequestObj {
} SRequestObj; } SRequestObj;
static FORCE_INLINE SReqResultInfo* tmqGetCurResInfo(TAOS_RES* res) { static FORCE_INLINE SReqResultInfo* tmqGetCurResInfo(TAOS_RES* res) {
tmq_message_t* msg = (tmq_message_t*)res; SMqRspObj* msg = (SMqRspObj*)res;
int32_t resIter = msg->resIter == -1 ? 0 : msg->resIter; int32_t resIter = msg->resIter == -1 ? 0 : msg->resIter;
return (SReqResultInfo*)taosArrayGet(msg->res, resIter); return (SReqResultInfo*)taosArrayGet(msg->res, resIter);
} }
static FORCE_INLINE SReqResultInfo* tmqGetNextResInfo(TAOS_RES* res) { static FORCE_INLINE SReqResultInfo* tmqGetNextResInfo(TAOS_RES* res) {
tmq_message_t* msg = (tmq_message_t*)res; SMqRspObj* msg = (SMqRspObj*)res;
if (++msg->resIter < taosArrayGetSize(msg->res)) { if (++msg->resIter < taosArrayGetSize(msg->res)) {
return (SReqResultInfo*)taosArrayGet(msg->res, msg->resIter); return (SReqResultInfo*)taosArrayGet(msg->res, msg->resIter);
} }
......
...@@ -171,7 +171,7 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) { ...@@ -171,7 +171,7 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) {
return doFetchRow(pRequest, true, true); return doFetchRow(pRequest, true, true);
} else if (TD_RES_TMQ(res)) { } else if (TD_RES_TMQ(res)) {
tmq_message_t *msg = ((tmq_message_t *)res); SMqRspObj *msg = ((SMqRspObj *)res);
SReqResultInfo *pResultInfo = taosArrayGet(msg->res, msg->resIter); SReqResultInfo *pResultInfo = taosArrayGet(msg->res, msg->resIter);
doSetOneRowPtr(pResultInfo); doSetOneRowPtr(pResultInfo);
......
...@@ -24,6 +24,14 @@ ...@@ -24,6 +24,14 @@
#include "tqueue.h" #include "tqueue.h"
#include "tref.h" #include "tref.h"
struct tmq_message_t {
SMqPollRsp msg;
char* topic;
void* vg;
SArray* res; // SArray<SReqResultInfo>
int32_t resIter;
};
struct tmq_list_t { struct tmq_list_t {
SArray container; SArray container;
}; };
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册