未验证 提交 6ce945db 编写于 作者: H Haojun Liao 提交者: GitHub

Merge pull request #20555 from taosdata/fix/liaohj

fix(tmq): diable commit when close conumer if auto commit is disabled.
......@@ -24,7 +24,7 @@
#include "tref.h"
#include "ttimer.h"
#define EMPTY_BLOCK_POLL_IDLE_DURATION 100
#define EMPTY_BLOCK_POLL_IDLE_DURATION 10
#define DEFAULT_AUTO_COMMIT_INTERVAL 5000
struct SMqMgmt {
......@@ -148,7 +148,8 @@ typedef struct {
typedef struct {
int8_t tmqRspType;
int32_t epoch;
int32_t epoch; // epoch can be used to guard the vgHandle
int32_t vgId;
SMqClientVg* vgHandle;
SMqClientTopic* topicHandle;
uint64_t reqId;
......@@ -1329,6 +1330,7 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
pRspWrapper->topicHandle = pTopic;
pRspWrapper->reqId = requestId;
pRspWrapper->pEpset = pMsg->pEpSet;
pRspWrapper->vgId = pVg->vgId;
pMsg->pEpSet = NULL;
if (rspType == TMQ_MSG_TYPE__POLL_RSP) {
......@@ -1747,7 +1749,7 @@ static int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
for (int j = 0; j < numOfVg; j++) {
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
if (taosGetTimestampMs() - pVg->emptyBlockReceiveTs < EMPTY_BLOCK_POLL_IDLE_DURATION) { // less than 100ms
tscTrace("consumer:0x%" PRIx64 " epoch %d, vgId:%d idle for 100ms before start next poll", tmq->consumerId, tmq->epoch,
tscTrace("consumer:0x%" PRIx64 " epoch %d, vgId:%d idle for 10ms before start next poll", tmq->consumerId, tmq->epoch,
pVg->vgId);
continue;
}
......@@ -1864,9 +1866,8 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
return pRsp;
}
} else {
SMqClientVg* pVg = pollRspWrapper->vgHandle;
tscDebug("consumer:0x%" PRIx64 " vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
tmq->consumerId, pVg->vgId, pDataRsp->head.epoch, consumerEpoch);
tmq->consumerId, pollRspWrapper->vgId, pDataRsp->head.epoch, consumerEpoch);
pRspWrapper = tmqFreeRspWrapper(pRspWrapper);
taosFreeQitem(pollRspWrapper);
}
......@@ -1886,7 +1887,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
return pRsp;
} else {
tscDebug("consumer:0x%" PRIx64 " vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
tmq->consumerId, pollRspWrapper->vgHandle->vgId, pollRspWrapper->metaRsp.head.epoch, consumerEpoch);
tmq->consumerId, pollRspWrapper->vgId, pollRspWrapper->metaRsp.head.epoch, consumerEpoch);
pRspWrapper = tmqFreeRspWrapper(pRspWrapper);
taosFreeQitem(pollRspWrapper);
}
......@@ -1933,7 +1934,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
} else {
tscDebug("consumer:0x%" PRIx64 " vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
tmq->consumerId, pollRspWrapper->vgHandle->vgId, pollRspWrapper->taosxRsp.head.epoch, consumerEpoch);
tmq->consumerId, pollRspWrapper->vgId, pollRspWrapper->taosxRsp.head.epoch, consumerEpoch);
pRspWrapper = tmqFreeRspWrapper(pRspWrapper);
taosFreeQitem(pollRspWrapper);
}
......@@ -1955,7 +1956,7 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
void* rspObj;
int64_t startTime = taosGetTimestampMs();
tscDebug("consumer:0x%" PRIx64 " start to poll at %" PRId64, tmq->consumerId, startTime);
tscDebug("consumer:0x%" PRIx64 " start to poll at %"PRId64", timeout:%" PRId64, tmq->consumerId, startTime, timeout);
#if 0
tmqHandleAllDelayedTask(tmq);
......@@ -2017,37 +2018,43 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
}
}
int32_t tmq_consumer_close(tmq_t* tmq) {
tscDebug("consumer:0x%" PRIx64" start to close consumer, status:%d", tmq->consumerId, tmq->status);
static void displayConsumeStatistics(const tmq_t* pTmq) {
int32_t numOfTopics = taosArrayGetSize(pTmq->clientTopics);
tscDebug("consumer:0x%" PRIx64 " closing poll:%" PRId64 " rows:%" PRId64 " topics:%d, final epoch:%d",
pTmq->consumerId, pTmq->pollCnt, pTmq->totalRows, numOfTopics, pTmq->epoch);
if (tmq->status == TMQ_CONSUMER_STATUS__READY) {
int32_t rsp = tmq_commit_sync(tmq, NULL);
if (rsp != 0) {
return rsp;
tscDebug("consumer:0x%" PRIx64 " rows dist begin: ", pTmq->consumerId);
for (int32_t i = 0; i < numOfTopics; ++i) {
SMqClientTopic* pTopics = taosArrayGet(pTmq->clientTopics, i);
tscDebug("consumer:0x%" PRIx64 " topic:%d", pTmq->consumerId, i);
int32_t numOfVgs = taosArrayGetSize(pTopics->vgs);
for (int32_t j = 0; j < numOfVgs; ++j) {
SMqClientVg* pVg = taosArrayGet(pTopics->vgs, j);
tscDebug("topic:%s, %d. vgId:%d rows:%" PRId64, pTopics->topicName, j, pVg->vgId, pVg->numOfRows);
}
}
int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics);
tscDebug("consumer:0x%" PRIx64 " closing poll:%" PRId64 " rows:%" PRId64 " topics:%d, final epoch:%d",
tmq->consumerId, tmq->pollCnt, tmq->totalRows, numOfTopics, tmq->epoch);
tscDebug("consumer:0x%" PRIx64 " rows dist end", pTmq->consumerId);
}
tscDebug("consumer:0x%" PRIx64 " rows dist begin: ", tmq->consumerId);
for (int32_t i = 0; i < numOfTopics; ++i) {
SMqClientTopic* pTopics = taosArrayGet(tmq->clientTopics, i);
int32_t tmq_consumer_close(tmq_t* tmq) {
tscDebug("consumer:0x%" PRIx64" start to close consumer, status:%d", tmq->consumerId, tmq->status);
displayConsumeStatistics(tmq);
tscDebug("consumer:0x%" PRIx64 " topic:%d", tmq->consumerId, i);
int32_t numOfVgs = taosArrayGetSize(pTopics->vgs);
for (int32_t j = 0; j < numOfVgs; ++j) {
SMqClientVg* pVg = taosArrayGet(pTopics->vgs, j);
tscDebug("topic:%s, %d. vgId:%d rows:%" PRId64, pTopics->topicName, j, pVg->vgId, pVg->numOfRows);
if (tmq->status == TMQ_CONSUMER_STATUS__READY) {
// if auto commit is set, commit before close consumer. Otherwise, do nothing.
if (tmq->autoCommit) {
int32_t rsp = tmq_commit_sync(tmq, NULL);
if (rsp != 0) {
return rsp;
}
}
tscDebug("consumer:0x%" PRIx64 " rows dist end", tmq->consumerId);
int32_t retryCnt = 0;
tmq_list_t* lst = tmq_list_new();
while (1) {
rsp = tmq_subscribe(tmq, lst);
int32_t rsp = tmq_subscribe(tmq, lst);
if (rsp != TSDB_CODE_MND_CONSUMER_NOT_READY || retryCnt > 5) {
break;
} else {
......@@ -2057,6 +2064,8 @@ int32_t tmq_consumer_close(tmq_t* tmq) {
}
tmq_list_destroy(lst);
} else {
tscWarn("consumer:0x%" PRIx64" not in ready state, close it directly", tmq->consumerId);
}
taosRemoveRef(tmqMgmt.rsetId, tmq->refId);
......
......@@ -141,7 +141,7 @@ int32_t tDecodeSTqHandle(SDecoder* pDecoder, STqHandle* pHandle);
// tqRead
int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMetaRsp* pMetaRsp, STqOffsetVal* offset);
int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* pOffset);
int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHead** pHeadWithCkSum);
int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHead** pHeadWithCkSum, uint64_t reqId);
// tqExec
int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxRsp* pRsp);
......
......@@ -309,70 +309,6 @@ int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, con
return 0;
}
//int32_t tqSendTaosxRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const STaosxRsp* pRsp) {
//#if 0
// A(taosArrayGetSize(pRsp->blockData) == pRsp->blockNum);
// A(taosArrayGetSize(pRsp->blockDataLen) == pRsp->blockNum);
//
// if (pRsp->withSchema) {
// A(taosArrayGetSize(pRsp->blockSchema) == pRsp->blockNum);
// } else {
// A(taosArrayGetSize(pRsp->blockSchema) == 0);
// }
//
// if (pRsp->reqOffset.type == TMQ_OFFSET__LOG) {
// if (pRsp->blockNum > 0) {
// A(pRsp->rspOffset.version > pRsp->reqOffset.version);
// } else {
// A(pRsp->rspOffset.version >= pRsp->reqOffset.version);
// }
// }
//#endif
//
// int32_t len = 0;
// int32_t code = 0;
// tEncodeSize(tEncodeSTaosxRsp, pRsp, len, code);
// if (code < 0) {
// return -1;
// }
//
// int32_t tlen = sizeof(SMqRspHead) + len;
// void* buf = rpcMallocCont(tlen);
// if (buf == NULL) {
// terrno = TSDB_CODE_OUT_OF_MEMORY;
// return -1;
// }
//
// ((SMqRspHead*)buf)->mqMsgType = TMQ_MSG_TYPE__TAOSX_RSP;
// ((SMqRspHead*)buf)->epoch = pReq->epoch;
// ((SMqRspHead*)buf)->consumerId = pReq->consumerId;
//
// void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
//
// SEncoder encoder = {0};
// tEncoderInit(&encoder, abuf, len);
// tEncodeSTaosxRsp(&encoder, pRsp);
// tEncoderClear(&encoder);
//
// SRpcMsg rsp = {
// .info = pMsg->info,
// .pCont = buf,
// .contLen = tlen,
// .code = 0,
// };
//
// tmsgSendRsp(&rsp);
//
// char buf1[80] = {0};
// char buf2[80] = {0};
// tFormatOffset(buf1, 80, &pRsp->reqOffset);
// tFormatOffset(buf2, 80, &pRsp->rspOffset);
//
// tqDebug("taosx rsp, vgId:%d, consumer:0x%" PRIx64 " (epoch %d) send rsp, numOfBlks:%d, req:%s, rsp:%s",
// TD_VID(pTq->pVnode), pReq->consumerId, pReq->epoch, pRsp->blockNum, buf1, buf2);
// return 0;
//}
static FORCE_INLINE bool tqOffsetLessOrEqual(const STqOffset* pLeft, const STqOffset* pRight) {
return pLeft->val.type == TMQ_OFFSET__LOG && pRight->val.type == TMQ_OFFSET__LOG &&
pLeft->val.version <= pRight->val.version;
......@@ -615,9 +551,9 @@ static int32_t extractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq*
// NOTE: this pHandle->consumerId may have been changed already.
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, rsp block:%d, offset type:%d, uid/version:%" PRId64
", ts:%" PRId64,
", ts:%" PRId64", reqId:0x%"PRIx64,
consumerId, pHandle->subKey, vgId, dataRsp.blockNum, dataRsp.rspOffset.type, dataRsp.rspOffset.uid,
dataRsp.rspOffset.ts);
dataRsp.rspOffset.ts, pRequest->reqId);
tDeleteSMqDataRsp(&dataRsp);
return code;
......@@ -680,17 +616,12 @@ static int32_t extractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq*
break;
}
if (tqFetchLog(pTq, pHandle, &fetchVer, &pCkHead) < 0) {
if (tqFetchLog(pTq, pHandle, &fetchVer, &pCkHead, pRequest->reqId) < 0) {
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
// if (terrno == 0) { // failed to seek to given ver, but no errors happen.
// code = tqRegisterPushEntry(pTq, pHandle, pRequest, pMsg, (SMqDataRsp*) &taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP);
// return code;
// } else { // error happens, return to consumers
code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP);
tDeleteSTaosxRsp(&taosxRsp);
taosMemoryFreeClear(pCkHead);
return code;
// }
code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP);
tDeleteSTaosxRsp(&taosxRsp);
taosMemoryFreeClear(pCkHead);
return code;
}
SWalCont* pHead = &pCkHead->head;
......
......@@ -183,22 +183,24 @@ end:
return tbSuid == realTbSuid;
}
int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHead** ppCkHead) {
int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHead** ppCkHead, uint64_t reqId) {
int32_t code = 0;
int32_t vgId = TD_VID(pTq->pVnode);
taosThreadMutexLock(&pHandle->pWalReader->mutex);
int64_t offset = *fetchOffset;
while (1) {
if (walFetchHead(pHandle->pWalReader, offset, *ppCkHead) < 0) {
tqDebug("tmq poll: consumer:%" PRIx64 ", (epoch %d) vgId:%d offset %" PRId64 ", no more log to return",
pHandle->consumerId, pHandle->epoch, TD_VID(pTq->pVnode), offset);
tqDebug("tmq poll: consumer:0x%" PRIx64 ", (epoch %d) vgId:%d offset %" PRId64 ", no more log to return, reqId:0x%"PRIx64,
pHandle->consumerId, pHandle->epoch, vgId, offset, reqId);
*fetchOffset = offset - 1;
code = -1;
goto END;
}
tqDebug("vgId:%d, taosx get msg ver %" PRId64 ", type: %s", pTq->pVnode->config.vgId, offset,
TMSG_INFO((*ppCkHead)->head.msgType));
tqDebug("vgId:%d, consumer:0x%" PRIx64 " taosx get msg ver %" PRId64 ", type: %s, reqId:0x%" PRIx64, vgId,
pHandle->consumerId, offset, TMSG_INFO((*ppCkHead)->head.msgType), reqId);
if ((*ppCkHead)->head.msgType == TDMT_VND_SUBMIT) {
code = walFetchBody(pHandle->pWalReader, ppCkHead);
......
......@@ -134,7 +134,6 @@ static int32_t setTableSchema(SCacheRowsReader* p, uint64_t suid, const char* id
// all queried tables have been dropped already, return immediately.
if (p->pSchema == NULL) {
taosMemoryFree(p);
tsdbWarn("all queried tables has been dropped, try next group, %s", idstr);
return TSDB_CODE_PAR_TABLE_NOT_EXIST;
}
......
......@@ -922,7 +922,7 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockN
pBlockNum->numOfBlocks += 1;
}
if ((pScanInfo->pBlockList != NULL) && (taosArrayGetSize(pScanInfo->pBlockList) > 0)) {
if (taosArrayGetSize(pScanInfo->pBlockList) > 0) {
numOfQTable += 1;
}
}
......@@ -4220,12 +4220,8 @@ int32_t tsdbReaderSuspend(STsdbReader* pReader) {
if (pStatus->loadFromFile) {
SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter);
if (pBlockInfo != NULL) {
pBlockScanInfo =
*(STableBlockScanInfo**)taosHashGet(pStatus->pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
pBlockScanInfo = getTableBlockScanInfo(pStatus->pTableMap, pBlockInfo->uid, pReader->idStr);
if (pBlockScanInfo == NULL) {
code = TSDB_CODE_INVALID_PARA;
tsdbError("failed to locate the uid:%" PRIu64 " in query table uid list, total tables:%d, %s", pBlockInfo->uid,
taosHashGetSize(pReader->status.pTableMap), pReader->idStr);
goto _err;
}
} else {
......
......@@ -1194,6 +1194,8 @@ static int32_t getPreSessionWindow(SStreamAggSupporter* pAggSup, TSKEY startTs,
if (code != TSDB_CODE_SUCCESS) {
SET_SESSION_WIN_KEY_INVALID(pKey);
}
taosMemoryFree(pCur);
return code;
}
......
......@@ -688,16 +688,17 @@ int32_t notifyMainScript(SThreadInfo* pInfo, int32_t cmdId) {
}
static int32_t g_once_commit_flag = 0;
static void tmq_commit_cb_print(tmq_t* tmq, int32_t code, void* param) {
taosFprintfFile(g_fp, "tmq_commit_cb_print() commit %d\n", code);
if (0 == g_once_commit_flag) {
g_once_commit_flag = 1;
notifyMainScript((SThreadInfo*)param, (int32_t)NOTIFY_CMD_START_COMMIT);
static void tmq_commit_cb_print(tmq_t* tmq, int32_t code, void* param) {
taosFprintfFile(g_fp, "tmq_commit_cb_print() commit %d\n", code);
if (0 == g_once_commit_flag) {
g_once_commit_flag = 1;
notifyMainScript((SThreadInfo*)param, (int32_t)NOTIFY_CMD_START_COMMIT);
}
char tmpString[128];
taosFprintfFile(g_fp, "%s tmq_commit_cb_print() be called\n", getCurrentTimeString(tmpString));
char tmpString[128];
taosFprintfFile(g_fp, "%s tmq_commit_cb_print() be called\n", getCurrentTimeString(tmpString));
}
void build_consumer(SThreadInfo* pInfo) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册