未验证 提交 86e3d838 编写于 作者: H Haojun Liao 提交者: GitHub

Merge pull request #20495 from taosdata/fix/liaohj

fix(tmq): adjust the time out value check.
...@@ -287,22 +287,25 @@ static FORCE_INLINE SReqResultInfo* tmqGetCurResInfo(TAOS_RES* res) { ...@@ -287,22 +287,25 @@ static FORCE_INLINE SReqResultInfo* tmqGetCurResInfo(TAOS_RES* res) {
} }
static FORCE_INLINE SReqResultInfo* tmqGetNextResInfo(TAOS_RES* res, bool convertUcs4) { static FORCE_INLINE SReqResultInfo* tmqGetNextResInfo(TAOS_RES* res, bool convertUcs4) {
SMqRspObj* msg = (SMqRspObj*)res; SMqRspObj* pRspObj = (SMqRspObj*)res;
msg->resIter++; pRspObj->resIter++;
if (msg->resIter < msg->rsp.blockNum) {
SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)taosArrayGetP(msg->rsp.blockData, msg->resIter); if (pRspObj->resIter < pRspObj->rsp.blockNum) {
if (msg->rsp.withSchema) { SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)taosArrayGetP(pRspObj->rsp.blockData, pRspObj->resIter);
SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(msg->rsp.blockSchema, msg->resIter); if (pRspObj->rsp.withSchema) {
setResSchemaInfo(&msg->resInfo, pSW->pSchema, pSW->nCols); SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(pRspObj->rsp.blockSchema, pRspObj->resIter);
taosMemoryFreeClear(msg->resInfo.row); setResSchemaInfo(&pRspObj->resInfo, pSW->pSchema, pSW->nCols);
taosMemoryFreeClear(msg->resInfo.pCol); taosMemoryFreeClear(pRspObj->resInfo.row);
taosMemoryFreeClear(msg->resInfo.length); taosMemoryFreeClear(pRspObj->resInfo.pCol);
taosMemoryFreeClear(msg->resInfo.convertBuf); taosMemoryFreeClear(pRspObj->resInfo.length);
taosMemoryFreeClear(msg->resInfo.convertJson); taosMemoryFreeClear(pRspObj->resInfo.convertBuf);
taosMemoryFreeClear(pRspObj->resInfo.convertJson);
} }
setQueryResultFromRsp(&msg->resInfo, pRetrieve, convertUcs4, false);
return &msg->resInfo; setQueryResultFromRsp(&pRspObj->resInfo, pRetrieve, convertUcs4, false);
return &pRspObj->resInfo;
} }
return NULL; return NULL;
} }
......
...@@ -25,6 +25,7 @@ ...@@ -25,6 +25,7 @@
#include "ttimer.h" #include "ttimer.h"
#define EMPTY_BLOCK_POLL_IDLE_DURATION 100 #define EMPTY_BLOCK_POLL_IDLE_DURATION 100
#define DEFAULT_AUTO_COMMIT_INTERVAL 5000
struct SMqMgmt { struct SMqMgmt {
int8_t inited; int8_t inited;
...@@ -220,7 +221,7 @@ tmq_conf_t* tmq_conf_new() { ...@@ -220,7 +221,7 @@ tmq_conf_t* tmq_conf_new() {
conf->withTbName = false; conf->withTbName = false;
conf->autoCommit = true; conf->autoCommit = true;
conf->autoCommitInterval = 5000; conf->autoCommitInterval = DEFAULT_AUTO_COMMIT_INTERVAL;
conf->resetOffset = TMQ_OFFSET__RESET_EARLIEAST; conf->resetOffset = TMQ_OFFSET__RESET_EARLIEAST;
conf->hbBgEnable = true; conf->hbBgEnable = true;
...@@ -266,7 +267,7 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value ...@@ -266,7 +267,7 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value
} }
if (strcasecmp(key, "auto.commit.interval.ms") == 0) { if (strcasecmp(key, "auto.commit.interval.ms") == 0) {
conf->autoCommitInterval = atoi(value); conf->autoCommitInterval = taosStr2int64(value);
return TMQ_CONF_OK; return TMQ_CONF_OK;
} }
...@@ -310,7 +311,7 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value ...@@ -310,7 +311,7 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value
} }
if (strcasecmp(key, "experimental.snapshot.batch.size") == 0) { if (strcasecmp(key, "experimental.snapshot.batch.size") == 0) {
conf->snapBatchSize = atoi(value); conf->snapBatchSize = taosStr2int64(value);
return TMQ_CONF_OK; return TMQ_CONF_OK;
} }
...@@ -330,18 +331,22 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value ...@@ -330,18 +331,22 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value
conf->ip = taosStrdup(value); conf->ip = taosStrdup(value);
return TMQ_CONF_OK; return TMQ_CONF_OK;
} }
if (strcasecmp(key, "td.connect.user") == 0) { if (strcasecmp(key, "td.connect.user") == 0) {
conf->user = taosStrdup(value); conf->user = taosStrdup(value);
return TMQ_CONF_OK; return TMQ_CONF_OK;
} }
if (strcasecmp(key, "td.connect.pass") == 0) { if (strcasecmp(key, "td.connect.pass") == 0) {
conf->pass = taosStrdup(value); conf->pass = taosStrdup(value);
return TMQ_CONF_OK; return TMQ_CONF_OK;
} }
if (strcasecmp(key, "td.connect.port") == 0) { if (strcasecmp(key, "td.connect.port") == 0) {
conf->port = atoi(value); conf->port = taosStr2int64(value);
return TMQ_CONF_OK; return TMQ_CONF_OK;
} }
if (strcasecmp(key, "td.connect.db") == 0) { if (strcasecmp(key, "td.connect.db") == 0) {
return TMQ_CONF_OK; return TMQ_CONF_OK;
} }
...@@ -463,8 +468,8 @@ static int32_t doSendCommitMsg(tmq_t* tmq, SMqClientVg* pVg, const char* pTopicN ...@@ -463,8 +468,8 @@ static int32_t doSendCommitMsg(tmq_t* tmq, SMqClientVg* pVg, const char* pTopicN
pOffset->subKey[groupLen] = TMQ_SEPARATOR; pOffset->subKey[groupLen] = TMQ_SEPARATOR;
strcpy(pOffset->subKey + groupLen + 1, pTopicName); strcpy(pOffset->subKey + groupLen + 1, pTopicName);
int32_t len; int32_t len = 0;
int32_t code; int32_t code = 0;
tEncodeSize(tEncodeSTqOffset, pOffset, len, code); tEncodeSize(tEncodeSTqOffset, pOffset, len, code);
if (code < 0) { if (code < 0) {
return -1; return -1;
...@@ -624,7 +629,7 @@ FAIL: ...@@ -624,7 +629,7 @@ FAIL:
return 0; return 0;
} }
static int32_t tmqCommitConsumerImpl(tmq_t* tmq, int8_t automatic, int8_t async, tmq_commit_cb* userCb, static int32_t doAutoCommit(tmq_t* tmq, int8_t automatic, int8_t async, tmq_commit_cb* userCb,
void* userParam) { void* userParam) {
int32_t code = -1; int32_t code = -1;
...@@ -717,31 +722,29 @@ static int32_t tmqCommitInner(tmq_t* tmq, const TAOS_RES* msg, int8_t automatic, ...@@ -717,31 +722,29 @@ static int32_t tmqCommitInner(tmq_t* tmq, const TAOS_RES* msg, int8_t automatic,
if (msg) { // user invoked commit if (msg) { // user invoked commit
return tmqCommitMsgImpl(tmq, msg, async, userCb, userParam); return tmqCommitMsgImpl(tmq, msg, async, userCb, userParam);
} else { // this for auto commit } else { // this for auto commit
return tmqCommitConsumerImpl(tmq, automatic, async, userCb, userParam); return doAutoCommit(tmq, automatic, async, userCb, userParam);
} }
} }
void tmqAssignAskEpTask(void* param, void* tmrId) { static void generateTimedTask(int64_t refId, int32_t type) {
int64_t refId = *(int64_t*)param; tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
if (tmq != NULL) { if (tmq != NULL) {
int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM, 0); int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM, 0);
*pTaskType = TMQ_DELAYED_TASK__ASK_EP; *pTaskType = type;
taosWriteQitem(tmq->delayedTask, pTaskType); taosWriteQitem(tmq->delayedTask, pTaskType);
tsem_post(&tmq->rspSem); tsem_post(&tmq->rspSem);
} }
}
void tmqAssignAskEpTask(void* param, void* tmrId) {
int64_t refId = *(int64_t*)param;
generateTimedTask(refId, TMQ_DELAYED_TASK__ASK_EP);
taosMemoryFree(param); taosMemoryFree(param);
} }
void tmqAssignDelayedCommitTask(void* param, void* tmrId) { void tmqAssignDelayedCommitTask(void* param, void* tmrId) {
int64_t refId = *(int64_t*)param; int64_t refId = *(int64_t*)param;
tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId); generateTimedTask(refId, TMQ_DELAYED_TASK__COMMIT);
if (tmq != NULL) {
int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM, 0);
*pTaskType = TMQ_DELAYED_TASK__COMMIT;
taosWriteQitem(tmq->delayedTask, pTaskType);
tsem_post(&tmq->rspSem);
}
taosMemoryFree(param); taosMemoryFree(param);
} }
...@@ -1579,14 +1582,17 @@ SMqMetaRspObj* tmqBuildMetaRspFromWrapper(SMqPollRspWrapper* pWrapper) { ...@@ -1579,14 +1582,17 @@ SMqMetaRspObj* tmqBuildMetaRspFromWrapper(SMqPollRspWrapper* pWrapper) {
SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper) { SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper) {
SMqRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqRspObj)); SMqRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqRspObj));
pRspObj->resType = RES_TYPE__TMQ; pRspObj->resType = RES_TYPE__TMQ;
tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN); tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN); tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN);
pRspObj->vgId = pWrapper->vgHandle->vgId; pRspObj->vgId = pWrapper->vgHandle->vgId;
pRspObj->resIter = -1; pRspObj->resIter = -1;
memcpy(&pRspObj->rsp, &pWrapper->dataRsp, sizeof(SMqDataRsp)); memcpy(&pRspObj->rsp, &pWrapper->dataRsp, sizeof(SMqDataRsp));
pRspObj->resInfo.totalRows = 0; pRspObj->resInfo.totalRows = 0;
pRspObj->resInfo.precision = TSDB_TIME_PRECISION_MILLI; pRspObj->resInfo.precision = TSDB_TIME_PRECISION_MILLI;
if (!pWrapper->dataRsp.withSchema) { if (!pWrapper->dataRsp.withSchema) {
setResSchemaInfo(&pRspObj->resInfo, pWrapper->topicHandle->schema.pSchema, pWrapper->topicHandle->schema.nCols); setResSchemaInfo(&pRspObj->resInfo, pWrapper->topicHandle->schema.pSchema, pWrapper->topicHandle->schema.nCols);
} }
...@@ -1943,7 +1949,7 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) { ...@@ -1943,7 +1949,7 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
return NULL; return NULL;
} }
if (timeout != -1) { if (timeout >= 0) {
int64_t currentTime = taosGetTimestampMs(); int64_t currentTime = taosGetTimestampMs();
int64_t elapsedTime = currentTime - startTime; int64_t elapsedTime = currentTime - startTime;
if (elapsedTime > timeout) { if (elapsedTime > timeout) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册