提交 e8c11c58 编写于 作者: H Haojun Liao

fix(tq): add some logs.

上级 3730ea4b
......@@ -271,6 +271,8 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) {
SReqResultInfo *pResultInfo;
if (msg->resIter == -1) {
pResultInfo = tmqGetNextResInfo(res, true);
tscDebug("consumer:0x%" PRIx64 ", vgId:%d, numOfRows:%" PRId64 ", total rows:%" PRId64, msg->rsp.head.consumerId,
msg->vgId, pResultInfo->numOfRows, pResultInfo->totalRows);
} else {
pResultInfo = tmqGetCurResInfo(res);
......@@ -281,7 +283,13 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) {
return pResultInfo->row;
} else {
pResultInfo = tmqGetNextResInfo(res, true);
if (pResultInfo == NULL) return NULL;
if (pResultInfo == NULL) {
return NULL;
tscDebug("consumer:0x%" PRIx64 " vgId:%d, numOfRows:%" PRId64 ", total rows:%" PRId64, msg->rsp.head.consumerId,
msg->vgId, pResultInfo->numOfRows, pResultInfo->totalRows);
pResultInfo->current += 1;
return pResultInfo->row;
......@@ -192,6 +192,7 @@ typedef struct {
SMqClientTopic* pTopic;
int32_t vgId;
tsem_t rspSem;
uint64_t requestId; // request id for debug purpose
} SMqPollCbParam;
typedef struct {
......@@ -1054,7 +1055,7 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
pTmq->hbLiveTimer = taosTmrStart(tmqSendHbReq, 1000, pRefId, tmqMgmt.timer);
tscInfo("consumer:0x%" PRIx64 " is setup, consumer groupId %s", pTmq->consumerId, pTmq->groupId);
tscInfo("consumer:0x%" PRIx64 " is setup, groupId:%s", pTmq->consumerId, pTmq->groupId);
return pTmq;
......@@ -1224,6 +1225,7 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__RECOVER);
SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM, 0);
if (pRspWrapper == NULL) {
......@@ -1277,10 +1279,9 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
memcpy(&pRspWrapper->dataRsp, pMsg->pData, sizeof(SMqRspHead));
tscDebug("consumer:0x%" PRIx64 ", recv poll: vgId:%d, req offset %" PRId64 ", rsp offset %" PRId64 " type %d",
tscDebug("consumer:0x%" PRIx64 " recv poll rsp, vgId:%d, req offset:%" PRId64 ", rsp offset:%" PRId64 " type %d, reqId:0x%"PRIx64,
tmq->consumerId, pVg->vgId, pRspWrapper->dataRsp.reqOffset.version, pRspWrapper->dataRsp.rspOffset.version,
rspType, pParam->requestId);
} else if (rspType == TMQ_MSG_TYPE__POLL_META_RSP) {
SDecoder decoder;
tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead));
......@@ -1298,7 +1299,7 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
tscDebug("consumer:0x%" PRIx64 ", put poll res into mqueue %p", tmq->consumerId, pRspWrapper);
tscDebug("consumer:0x%" PRIx64 ", put poll res into mqueue, total in queue:%d", tmq->consumerId, tmq->mqueue->numOfItems);
taosWriteQitem(tmq->mqueue, pRspWrapper);
......@@ -1344,7 +1345,7 @@ bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) {
sprintf(vgKey, "%s:%d", pTopicCur->topicName, pVgCur->vgId);
char buf[80];
tFormatOffset(buf, 80, &pVgCur->currentOffset);
tscDebug("consumer:0x%" PRIx64 ", epoch %d vgId:%d vgKey is %s, offset is %s", tmq->consumerId, epoch,
tscDebug("consumer:0x%" PRIx64 ", epoch:%d vgId:%d vgKey:%s, offset:%s", tmq->consumerId, epoch,
pVgCur->vgId, vgKey, buf);
taosHashPut(pHash, vgKey, strlen(vgKey), &pVgCur->currentOffset, sizeof(STqOffsetVal));
......@@ -1552,7 +1553,7 @@ int32_t tmqAskEp(tmq_t* tmq, bool async) {
.handle = NULL,
sendInfo->requestId = tmq->consumerId;
sendInfo->requestId = generateRequestId();
sendInfo->requestObjRefId = 0;
sendInfo->param = pParam;
sendInfo->fp = tmqAskEpCb;
......@@ -1560,7 +1561,7 @@ int32_t tmqAskEp(tmq_t* tmq, bool async) {
SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);
tscDebug("consumer:0x%" PRIx64 " ask ep from mnode, async:%d, reqId:0x%" PRIx64, tmq->consumerId, async,
int64_t transporterId = 0;
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
......@@ -1674,6 +1675,7 @@ static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* p
pParam->pVg = pVg; // pVg may be released,fix it
pParam->pTopic = pTopic;
pParam->vgId = pVg->vgId;
pParam->requestId = req.reqId;
SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
if (sendInfo == NULL) {
......@@ -1698,7 +1700,7 @@ static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* p
char offsetFormatBuf[80];
tFormatOffset(offsetFormatBuf, tListLen(offsetFormatBuf), &pVg->currentOffset);
tscDebug("consumer:0x%" PRIx64 " send poll to %s vgId:%d, epoch %d, req offset:%s, reqId:0x%" PRIx64,
tscDebug("consumer:0x%" PRIx64 " send poll to %s vgId:%d, epoch %d, req:%s, reqId:0x%" PRIx64,
pTmq->consumerId, pTopic->topicName, pVg->vgId, pTmq->epoch, offsetFormatBuf, req.reqId);
asyncSendMsgToServer(pTmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo);
......@@ -1722,10 +1724,9 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
int32_t vgStatus = atomic_val_compare_exchange_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE, TMQ_VG_STATUS__WAIT);
if (vgStatus == TMQ_VG_STATUS__WAIT) {
int32_t vgSkipCnt = atomic_add_fetch_32(&pVg->vgSkipCnt, 1);
tscDebug("consumer:0x%" PRIx64 " epoch %d wait poll-rsp, skip vgId:%d skip cnt %d", tmq->consumerId, tmq->epoch,
tscTrace("consumer:0x%" PRIx64 " epoch %d wait poll-rsp, skip vgId:%d skip cnt %d", tmq->consumerId, tmq->epoch,
pVg->vgId, vgSkipCnt);
/*if (vgSkipCnt < 10000) continue;*/
#if 0
if (skipCnt < 30000) {
......@@ -1767,7 +1768,7 @@ int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper, bool* pReset)
void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
tscDebug("consumer:0x%" PRIx64 " start to handle the rsp", tmq->consumerId);
tscDebug("consumer:0x%" PRIx64 " start to handle the rsp, total:%d", tmq->consumerId, tmq->qall->numOfItems);
while (1) {
SMqRspWrapper* rspWrapper = NULL;
......@@ -1785,25 +1786,33 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__END_RSP) {
tscError("consumer:0x%" PRIx64 " unexpected rsp from poll, code:%s", tmq->consumerId, tstrerror(terrno));
return NULL;
} else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_RSP) {
SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)rspWrapper;
tscDebug("consumer:0x%" PRIx64 " process poll rsp", tmq->consumerId);
/*atomic_sub_fetch_32(&tmq->readyRequest, 1);*/
int32_t consumerEpoch = atomic_load_32(&tmq->epoch);
if (pollRspWrapper->dataRsp.head.epoch == consumerEpoch) {
SMqClientVg* pVg = pollRspWrapper->vgHandle;
/*printf("vgId:%d, offset %" PRId64 " up to %" PRId64 "\n", pVg->vgId, pVg->currentOffset,
* rspMsg->msg.rspOffset);*/
pVg->currentOffset = pollRspWrapper->dataRsp.rspOffset;
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
if (pollRspWrapper->dataRsp.blockNum == 0) {
tscDebug("consumer:0x%" PRIx64 " empty block received in poll rsp", tmq->consumerId);
rspWrapper = NULL;
// build rsp
char buf[80];
tFormatOffset(buf, 80, &pVg->currentOffset);
SMqRspObj* pRsp = tmqBuildRspFromWrapper(pollRspWrapper);
tscDebug("consumer:0x%" PRIx64 " process poll rsp, vgId:%d, offset:%s, blocks:%d", tmq->consumerId,
pVg->vgId, buf, pollRspWrapper->dataRsp.blockNum);
return pRsp;
} else {
......@@ -1876,6 +1885,8 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
tscDebug("consumer:0x%" PRIx64 " handle the rsp completed", tmq->consumerId);
TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
......@@ -6570,7 +6570,7 @@ int32_t tFormatOffset(char *buf, int32_t maxLen, const STqOffsetVal *pVal) {
} else if (pVal->type == TMQ_OFFSET__LOG) {
snprintf(buf, maxLen, "offset(log) ver:%" PRId64, pVal->version);
} else if (pVal->type == TMQ_OFFSET__SNAPSHOT_DATA || pVal->type == TMQ_OFFSET__SNAPSHOT_META) {
snprintf(buf, maxLen, "offset(ss data) uid:%" PRId64 ", ts:%" PRId64, pVal->uid, pVal->ts);
snprintf(buf, maxLen, "offset(snapshot) uid:%" PRId64 " ts:%" PRId64, pVal->uid, pVal->ts);
} else {
......@@ -29,7 +29,7 @@
static int8_t mqRebInExecCnt = 0;
static int32_t mqRebInExecCnt = 0;
static const char *mndConsumerStatusName(int status);
......@@ -76,15 +76,16 @@ int32_t mndInitConsumer(SMnode *pMnode) {
void mndCleanupConsumer(SMnode *pMnode) {}
bool mndRebTryStart() {
int8_t old = atomic_val_compare_exchange_8(&mqRebInExecCnt, 0, 1);
int32_t old = atomic_val_compare_exchange_32(&mqRebInExecCnt, 0, 1);
mInfo("tq timer, rebalance counter old val:%d", old);
return old == 0;
void mndRebEnd() { atomic_sub_fetch_8(&mqRebInExecCnt, 1); }
void mndRebEnd() { int32_t val = atomic_sub_fetch_32(&mqRebInExecCnt, 1); mInfo("rebalance end, rebalance counter:%d", val); }
void mndRebCntInc() { atomic_add_fetch_8(&mqRebInExecCnt, 1); }
void mndRebCntInc() { int32_t val = atomic_add_fetch_32(&mqRebInExecCnt, 1); mInfo("rebalance trans start, rebalance count:%d", val);}
void mndRebCntDec() { atomic_sub_fetch_8(&mqRebInExecCnt, 1); }
void mndRebCntDec() { int32_t val = atomic_sub_fetch_32(&mqRebInExecCnt, 1); mInfo("rebalance trans end, rebalance count:%d", val); }
static int32_t mndProcessConsumerLostMsg(SRpcMsg *pMsg) {
SMnode *pMnode = pMsg->info.node;
......@@ -334,7 +335,7 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) {
} else {
mTrace("mq rebalance finished, no modification");
mInfo("mq rebalance finished, no modification");
return 0;
......@@ -503,7 +503,8 @@ static TransCbFp mndTransGetCbFp(ETrnFunc ftype) {
static int32_t mndTransActionInsert(SSdb *pSdb, STrans *pTrans) {
mTrace("trans:%d, perform insert action, row:%p stage:%s", pTrans->id, pTrans, mndTransStr(pTrans->stage));
mInfo("trans:%d, perform insert action, row:%p stage:%s, startFunc:%d, callfunc:1", pTrans->id, pTrans, mndTransStr(pTrans->stage),
if (pTrans->startFunc > 0) {
TransCbFp fp = mndTransGetCbFp(pTrans->startFunc);
......@@ -546,8 +547,9 @@ static void mndTransDropData(STrans *pTrans) {
static int32_t mndTransActionDelete(SSdb *pSdb, STrans *pTrans, bool callFunc) {
mTrace("trans:%d, perform delete action, row:%p stage:%s callfunc:%d", pTrans->id, pTrans, mndTransStr(pTrans->stage),
mInfo("trans:%d, perform delete action, row:%p stage:%s callfunc:%d, stopFunc:%d", pTrans->id, pTrans, mndTransStr(pTrans->stage),
pTrans->stopFunc, callFunc);
if (pTrans->stopFunc > 0 && callFunc) {
TransCbFp fp = mndTransGetCbFp(pTrans->stopFunc);
if (fp) {
......@@ -572,7 +574,7 @@ static void mndTransUpdateActions(SArray *pOldArray, SArray *pNewArray) {
static int32_t mndTransActionUpdate(SSdb *pSdb, STrans *pOld, STrans *pNew) {
mTrace("trans:%d, perform update action, old row:%p stage:%s create:%" PRId64 ", new row:%p stage:%s create:%" PRId64,
mInfo("trans:%d, perform update action, old row:%p stage:%s create:%" PRId64 ", new row:%p stage:%s create:%" PRId64,
pOld->id, pOld, mndTransStr(pOld->stage), pOld->createdTime, pNew, mndTransStr(pNew->stage),
......@@ -488,7 +488,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
// 2. check rebalance
if (pHandle->consumerId != consumerId) {
tqError("tmq poll: consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64,
tqDebug("ERROR tmq poll: consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64,
consumerId, TD_VID(pTq->pVnode), req.subKey, pHandle->consumerId);
return -1;
......@@ -822,6 +822,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
pHandle = &tqHandle;
uint64_t oldConsumerId = pHandle->consumerId;
memcpy(pHandle->subKey, req.subKey, TSDB_SUBSCRIBE_KEY_LEN);
pHandle->consumerId = req.newConsumerId;
pHandle->epoch = -1;
......@@ -884,13 +885,16 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
pHandle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &handle, NULL, NULL);
taosHashPut(pTq->pHandle, req.subKey, strlen(req.subKey), pHandle, sizeof(STqHandle));
tqDebug("try to persist handle %s consumer:0x%" PRIx64, req.subKey, pHandle->consumerId);
tqDebug("try to persist handle %s consumer:0x%" PRIx64" , old consumer:0x%"PRIx64, req.subKey, pHandle->consumerId,
if (tqMetaSaveHandle(pTq, req.subKey, pHandle) < 0) {
return -1;
} else {
// TODO handle qmsg and exec modification
tqInfo("update the consumer info, old consumer id:0x%"PRIx64", new Id:0x%"PRIx64, pHandle->consumerId, req.newConsumerId);
atomic_store_32(&pHandle->epoch, -1);
atomic_store_64(&pHandle->consumerId, req.newConsumerId);
atomic_add_fetch_32(&pHandle->epoch, 1);
......@@ -898,6 +902,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
if (tqMetaSaveHandle(pTq, req.subKey, pHandle) < 0) {
return -1;
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
想要评论请 注册