提交 08ac1021 编写于 作者: H Haojun Liao

refactor: do some internal refactor.

上级 989ae86a
......@@ -287,22 +287,25 @@ static FORCE_INLINE SReqResultInfo* tmqGetCurResInfo(TAOS_RES* res) {
}
static FORCE_INLINE SReqResultInfo* tmqGetNextResInfo(TAOS_RES* res, bool convertUcs4) {
SMqRspObj* msg = (SMqRspObj*)res;
msg->resIter++;
if (msg->resIter < msg->rsp.blockNum) {
SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)taosArrayGetP(msg->rsp.blockData, msg->resIter);
if (msg->rsp.withSchema) {
SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(msg->rsp.blockSchema, msg->resIter);
setResSchemaInfo(&msg->resInfo, pSW->pSchema, pSW->nCols);
taosMemoryFreeClear(msg->resInfo.row);
taosMemoryFreeClear(msg->resInfo.pCol);
taosMemoryFreeClear(msg->resInfo.length);
taosMemoryFreeClear(msg->resInfo.convertBuf);
taosMemoryFreeClear(msg->resInfo.convertJson);
SMqRspObj* pRspObj = (SMqRspObj*)res;
pRspObj->resIter++;
if (pRspObj->resIter < pRspObj->rsp.blockNum) {
SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)taosArrayGetP(pRspObj->rsp.blockData, pRspObj->resIter);
if (pRspObj->rsp.withSchema) {
SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(pRspObj->rsp.blockSchema, pRspObj->resIter);
setResSchemaInfo(&pRspObj->resInfo, pSW->pSchema, pSW->nCols);
taosMemoryFreeClear(pRspObj->resInfo.row);
taosMemoryFreeClear(pRspObj->resInfo.pCol);
taosMemoryFreeClear(pRspObj->resInfo.length);
taosMemoryFreeClear(pRspObj->resInfo.convertBuf);
taosMemoryFreeClear(pRspObj->resInfo.convertJson);
}
setQueryResultFromRsp(&msg->resInfo, pRetrieve, convertUcs4, false);
return &msg->resInfo;
setQueryResultFromRsp(&pRspObj->resInfo, pRetrieve, convertUcs4, false);
return &pRspObj->resInfo;
}
return NULL;
}
......
......@@ -25,6 +25,7 @@
#include "ttimer.h"
#define EMPTY_BLOCK_POLL_IDLE_DURATION 100
#define DEFAULT_AUTO_COMMIT_INTERVAL 5000
struct SMqMgmt {
int8_t inited;
......@@ -220,7 +221,7 @@ tmq_conf_t* tmq_conf_new() {
conf->withTbName = false;
conf->autoCommit = true;
conf->autoCommitInterval = 5000;
conf->autoCommitInterval = DEFAULT_AUTO_COMMIT_INTERVAL;
conf->resetOffset = TMQ_OFFSET__RESET_EARLIEAST;
conf->hbBgEnable = true;
......@@ -266,7 +267,7 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value
}
if (strcasecmp(key, "auto.commit.interval.ms") == 0) {
conf->autoCommitInterval = atoi(value);
conf->autoCommitInterval = taosStr2int64(value);
return TMQ_CONF_OK;
}
......@@ -310,7 +311,7 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value
}
if (strcasecmp(key, "experimental.snapshot.batch.size") == 0) {
conf->snapBatchSize = atoi(value);
conf->snapBatchSize = taosStr2int64(value);
return TMQ_CONF_OK;
}
......@@ -330,18 +331,22 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value
conf->ip = taosStrdup(value);
return TMQ_CONF_OK;
}
if (strcasecmp(key, "td.connect.user") == 0) {
conf->user = taosStrdup(value);
return TMQ_CONF_OK;
}
if (strcasecmp(key, "td.connect.pass") == 0) {
conf->pass = taosStrdup(value);
return TMQ_CONF_OK;
}
if (strcasecmp(key, "td.connect.port") == 0) {
conf->port = atoi(value);
conf->port = taosStr2int64(value);
return TMQ_CONF_OK;
}
if (strcasecmp(key, "td.connect.db") == 0) {
return TMQ_CONF_OK;
}
......@@ -463,8 +468,8 @@ static int32_t doSendCommitMsg(tmq_t* tmq, SMqClientVg* pVg, const char* pTopicN
pOffset->subKey[groupLen] = TMQ_SEPARATOR;
strcpy(pOffset->subKey + groupLen + 1, pTopicName);
int32_t len;
int32_t code;
int32_t len = 0;
int32_t code = 0;
tEncodeSize(tEncodeSTqOffset, pOffset, len, code);
if (code < 0) {
return -1;
......@@ -624,7 +629,7 @@ FAIL:
return 0;
}
static int32_t tmqCommitConsumerImpl(tmq_t* tmq, int8_t automatic, int8_t async, tmq_commit_cb* userCb,
static int32_t doAutoCommit(tmq_t* tmq, int8_t automatic, int8_t async, tmq_commit_cb* userCb,
void* userParam) {
int32_t code = -1;
......@@ -717,31 +722,29 @@ static int32_t tmqCommitInner(tmq_t* tmq, const TAOS_RES* msg, int8_t automatic,
if (msg) { // user invoked commit
return tmqCommitMsgImpl(tmq, msg, async, userCb, userParam);
} else { // this for auto commit
return tmqCommitConsumerImpl(tmq, automatic, async, userCb, userParam);
return doAutoCommit(tmq, automatic, async, userCb, userParam);
}
}
void tmqAssignAskEpTask(void* param, void* tmrId) {
int64_t refId = *(int64_t*)param;
tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
static void generateTimedTask(int64_t refId, int32_t type) {
tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
if (tmq != NULL) {
int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM, 0);
*pTaskType = TMQ_DELAYED_TASK__ASK_EP;
*pTaskType = type;
taosWriteQitem(tmq->delayedTask, pTaskType);
tsem_post(&tmq->rspSem);
}
}
void tmqAssignAskEpTask(void* param, void* tmrId) {
int64_t refId = *(int64_t*)param;
generateTimedTask(refId, TMQ_DELAYED_TASK__ASK_EP);
taosMemoryFree(param);
}
void tmqAssignDelayedCommitTask(void* param, void* tmrId) {
int64_t refId = *(int64_t*)param;
tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
if (tmq != NULL) {
int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM, 0);
*pTaskType = TMQ_DELAYED_TASK__COMMIT;
taosWriteQitem(tmq->delayedTask, pTaskType);
tsem_post(&tmq->rspSem);
}
generateTimedTask(refId, TMQ_DELAYED_TASK__COMMIT);
taosMemoryFree(param);
}
......@@ -1579,14 +1582,17 @@ SMqMetaRspObj* tmqBuildMetaRspFromWrapper(SMqPollRspWrapper* pWrapper) {
SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper) {
SMqRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqRspObj));
pRspObj->resType = RES_TYPE__TMQ;
tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN);
pRspObj->vgId = pWrapper->vgHandle->vgId;
pRspObj->resIter = -1;
memcpy(&pRspObj->rsp, &pWrapper->dataRsp, sizeof(SMqDataRsp));
pRspObj->resInfo.totalRows = 0;
pRspObj->resInfo.precision = TSDB_TIME_PRECISION_MILLI;
if (!pWrapper->dataRsp.withSchema) {
setResSchemaInfo(&pRspObj->resInfo, pWrapper->topicHandle->schema.pSchema, pWrapper->topicHandle->schema.nCols);
}
......@@ -1943,7 +1949,7 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
return NULL;
}
if (timeout > 0) {
if (timeout >= 0) {
int64_t currentTime = taosGetTimestampMs();
int64_t elapsedTime = currentTime - startTime;
if (elapsedTime > timeout) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册