diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index b10daa9c210ed1a39198a3cf1d0f59ab77be6bee..86db35b4123c87c3d3f6d546caa15e70d09884e4 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -287,22 +287,25 @@ static FORCE_INLINE SReqResultInfo* tmqGetCurResInfo(TAOS_RES* res) { } static FORCE_INLINE SReqResultInfo* tmqGetNextResInfo(TAOS_RES* res, bool convertUcs4) { - SMqRspObj* msg = (SMqRspObj*)res; - msg->resIter++; - if (msg->resIter < msg->rsp.blockNum) { - SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)taosArrayGetP(msg->rsp.blockData, msg->resIter); - if (msg->rsp.withSchema) { - SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(msg->rsp.blockSchema, msg->resIter); - setResSchemaInfo(&msg->resInfo, pSW->pSchema, pSW->nCols); - taosMemoryFreeClear(msg->resInfo.row); - taosMemoryFreeClear(msg->resInfo.pCol); - taosMemoryFreeClear(msg->resInfo.length); - taosMemoryFreeClear(msg->resInfo.convertBuf); - taosMemoryFreeClear(msg->resInfo.convertJson); + SMqRspObj* pRspObj = (SMqRspObj*)res; + pRspObj->resIter++; + + if (pRspObj->resIter < pRspObj->rsp.blockNum) { + SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)taosArrayGetP(pRspObj->rsp.blockData, pRspObj->resIter); + if (pRspObj->rsp.withSchema) { + SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(pRspObj->rsp.blockSchema, pRspObj->resIter); + setResSchemaInfo(&pRspObj->resInfo, pSW->pSchema, pSW->nCols); + taosMemoryFreeClear(pRspObj->resInfo.row); + taosMemoryFreeClear(pRspObj->resInfo.pCol); + taosMemoryFreeClear(pRspObj->resInfo.length); + taosMemoryFreeClear(pRspObj->resInfo.convertBuf); + taosMemoryFreeClear(pRspObj->resInfo.convertJson); } - setQueryResultFromRsp(&msg->resInfo, pRetrieve, convertUcs4, false); - return &msg->resInfo; + + setQueryResultFromRsp(&pRspObj->resInfo, pRetrieve, convertUcs4, false); + return &pRspObj->resInfo; } + return NULL; } diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 50d1e78c895a88b16878f7c56f6237f28c16de27..7883f198a8594f04ef9bab47ed6a44aafadcd17e 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -25,6 +25,7 @@ #include "ttimer.h" #define EMPTY_BLOCK_POLL_IDLE_DURATION 100 +#define DEFAULT_AUTO_COMMIT_INTERVAL 5000 struct SMqMgmt { int8_t inited; @@ -220,7 +221,7 @@ tmq_conf_t* tmq_conf_new() { conf->withTbName = false; conf->autoCommit = true; - conf->autoCommitInterval = 5000; + conf->autoCommitInterval = DEFAULT_AUTO_COMMIT_INTERVAL; conf->resetOffset = TMQ_OFFSET__RESET_EARLIEAST; conf->hbBgEnable = true; @@ -266,7 +267,7 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value } if (strcasecmp(key, "auto.commit.interval.ms") == 0) { - conf->autoCommitInterval = atoi(value); + conf->autoCommitInterval = taosStr2int64(value); return TMQ_CONF_OK; } @@ -310,7 +311,7 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value } if (strcasecmp(key, "experimental.snapshot.batch.size") == 0) { - conf->snapBatchSize = atoi(value); + conf->snapBatchSize = taosStr2int64(value); return TMQ_CONF_OK; } @@ -330,18 +331,22 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value conf->ip = taosStrdup(value); return TMQ_CONF_OK; } + if (strcasecmp(key, "td.connect.user") == 0) { conf->user = taosStrdup(value); return TMQ_CONF_OK; } + if (strcasecmp(key, "td.connect.pass") == 0) { conf->pass = taosStrdup(value); return TMQ_CONF_OK; } + if (strcasecmp(key, "td.connect.port") == 0) { - conf->port = atoi(value); + conf->port = taosStr2int64(value); return TMQ_CONF_OK; } + if (strcasecmp(key, "td.connect.db") == 0) { return TMQ_CONF_OK; } @@ -463,8 +468,8 @@ static int32_t doSendCommitMsg(tmq_t* tmq, SMqClientVg* pVg, const char* pTopicN pOffset->subKey[groupLen] = TMQ_SEPARATOR; strcpy(pOffset->subKey + groupLen + 1, pTopicName); - int32_t len; - int32_t code; + int32_t len = 0; + int32_t code = 0; tEncodeSize(tEncodeSTqOffset, pOffset, len, code); if (code < 0) { return -1; @@ -624,7 +629,7 @@ FAIL: return 0; } -static int32_t tmqCommitConsumerImpl(tmq_t* tmq, int8_t automatic, int8_t async, tmq_commit_cb* userCb, +static int32_t doAutoCommit(tmq_t* tmq, int8_t automatic, int8_t async, tmq_commit_cb* userCb, void* userParam) { int32_t code = -1; @@ -717,31 +722,29 @@ static int32_t tmqCommitInner(tmq_t* tmq, const TAOS_RES* msg, int8_t automatic, if (msg) { // user invoked commit return tmqCommitMsgImpl(tmq, msg, async, userCb, userParam); } else { // this for auto commit - return tmqCommitConsumerImpl(tmq, automatic, async, userCb, userParam); + return doAutoCommit(tmq, automatic, async, userCb, userParam); } } -void tmqAssignAskEpTask(void* param, void* tmrId) { - int64_t refId = *(int64_t*)param; - tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId); +static void generateTimedTask(int64_t refId, int32_t type) { + tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId); if (tmq != NULL) { int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM, 0); - *pTaskType = TMQ_DELAYED_TASK__ASK_EP; + *pTaskType = type; taosWriteQitem(tmq->delayedTask, pTaskType); tsem_post(&tmq->rspSem); } +} + +void tmqAssignAskEpTask(void* param, void* tmrId) { + int64_t refId = *(int64_t*)param; + generateTimedTask(refId, TMQ_DELAYED_TASK__ASK_EP); taosMemoryFree(param); } void tmqAssignDelayedCommitTask(void* param, void* tmrId) { int64_t refId = *(int64_t*)param; - tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId); - if (tmq != NULL) { - int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM, 0); - *pTaskType = TMQ_DELAYED_TASK__COMMIT; - taosWriteQitem(tmq->delayedTask, pTaskType); - tsem_post(&tmq->rspSem); - } + generateTimedTask(refId, TMQ_DELAYED_TASK__COMMIT); taosMemoryFree(param); } @@ -1579,14 +1582,17 @@ SMqMetaRspObj* tmqBuildMetaRspFromWrapper(SMqPollRspWrapper* pWrapper) { SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper) { SMqRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqRspObj)); pRspObj->resType = RES_TYPE__TMQ; + tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN); tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN); + pRspObj->vgId = pWrapper->vgHandle->vgId; pRspObj->resIter = -1; memcpy(&pRspObj->rsp, &pWrapper->dataRsp, sizeof(SMqDataRsp)); pRspObj->resInfo.totalRows = 0; pRspObj->resInfo.precision = TSDB_TIME_PRECISION_MILLI; + if (!pWrapper->dataRsp.withSchema) { setResSchemaInfo(&pRspObj->resInfo, pWrapper->topicHandle->schema.pSchema, pWrapper->topicHandle->schema.nCols); } @@ -1943,7 +1949,7 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) { return NULL; } - if (timeout != -1) { + if (timeout >= 0) { int64_t currentTime = taosGetTimestampMs(); int64_t elapsedTime = currentTime - startTime; if (elapsedTime > timeout) {