未验证 提交 2790368a 编写于 作者: H Haojun Liao 提交者: GitHub

Merge pull request #20269 from taosdata/feature/3_liaohj

fix(qurey): fix som errors and add some logs.
......@@ -66,6 +66,15 @@ extern int32_t tMsgDict[];
typedef uint16_t tmsg_t;
static inline bool tmsgIsValid(tmsg_t type) {
if (type < TDMT_DND_MAX_MSG || type < TDMT_MND_MAX_MSG || type < TDMT_VND_MAX_MSG || type < TDMT_SCH_MAX_MSG ||
type < TDMT_STREAM_MAX_MSG || type < TDMT_MON_MAX_MSG || type < TDMT_SYNC_MAX_MSG || type < TDMT_VND_STREAM_MSG ||
type < TDMT_VND_TMQ_MSG || type < TDMT_VND_TMQ_MAX_MSG) {
return true;
} else {
return false;
}
}
static inline bool vnodeIsMsgBlock(tmsg_t type) {
return (type == TDMT_VND_CREATE_TABLE) || (type == TDMT_VND_ALTER_TABLE) || (type == TDMT_VND_DROP_TABLE) ||
(type == TDMT_VND_UPDATE_TAG_VAL) || (type == TDMT_VND_ALTER_CONFIRM);
......@@ -1911,10 +1920,10 @@ typedef struct {
} SMqConsumerLostMsg, SMqConsumerRecoverMsg, SMqConsumerClearMsg;
typedef struct {
int64_t consumerId;
char cgroup[TSDB_CGROUP_LEN];
char clientId[256];
SArray* topicNames; // SArray<char**>
int64_t consumerId;
char cgroup[TSDB_CGROUP_LEN];
char clientId[256];
SArray* topicNames; // SArray<char**>
} SCMSubscribeReq;
static FORCE_INLINE int32_t tSerializeSCMSubscribeReq(void** buf, const SCMSubscribeReq* pReq) {
......@@ -2691,7 +2700,7 @@ typedef struct {
char subKey[TSDB_SUBSCRIBE_KEY_LEN];
int8_t subType;
int8_t withMeta;
char* qmsg; //SubPlanToString
char* qmsg; // SubPlanToString
int64_t suid;
} SMqRebVgReq;
......
......@@ -159,9 +159,9 @@ void *openTransporter(const char *user, const char *auth, int32_t numOfThread) {
rpcInit.retryMaxInterval = tsRedirectMaxPeriod;
rpcInit.retryMaxTimouet = tsMaxRetryWaitTime;
int32_t connLimitNum = tsNumOfRpcSessions / (tsNumOfRpcThreads * 5);
int32_t connLimitNum = tsNumOfRpcSessions / (tsNumOfRpcThreads * 3);
connLimitNum = TMAX(connLimitNum, 10);
connLimitNum = TMIN(connLimitNum, 500);
connLimitNum = TMIN(connLimitNum, 1000);
rpcInit.connLimitNum = connLimitNum;
rpcInit.timeToGetConn = tsTimeToGetAvailableConn;
......
......@@ -271,6 +271,8 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) {
SReqResultInfo *pResultInfo;
if (msg->resIter == -1) {
pResultInfo = tmqGetNextResInfo(res, true);
tscDebug("consumer:0x%" PRIx64 ", vgId:%d, numOfRows:%" PRId64 ", total rows:%" PRId64, msg->rsp.head.consumerId,
msg->vgId, pResultInfo->numOfRows, pResultInfo->totalRows);
} else {
pResultInfo = tmqGetCurResInfo(res);
}
......@@ -281,7 +283,13 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) {
return pResultInfo->row;
} else {
pResultInfo = tmqGetNextResInfo(res, true);
if (pResultInfo == NULL) return NULL;
if (pResultInfo == NULL) {
return NULL;
}
tscDebug("consumer:0x%" PRIx64 " vgId:%d, numOfRows:%" PRId64 ", total rows:%" PRId64, msg->rsp.head.consumerId,
msg->vgId, pResultInfo->numOfRows, pResultInfo->totalRows);
doSetOneRowPtr(pResultInfo);
pResultInfo->current += 1;
return pResultInfo->row;
......
......@@ -192,6 +192,7 @@ typedef struct {
SMqClientTopic* pTopic;
int32_t vgId;
tsem_t rspSem;
uint64_t requestId; // request id for debug purpose
} SMqPollCbParam;
typedef struct {
......@@ -1054,7 +1055,7 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
pTmq->hbLiveTimer = taosTmrStart(tmqSendHbReq, 1000, pRefId, tmqMgmt.timer);
}
tscInfo("consumer:0x%" PRIx64 " is setup, consumer groupId %s", pTmq->consumerId, pTmq->groupId);
tscInfo("consumer:0x%" PRIx64 " is setup, groupId:%s", pTmq->consumerId, pTmq->groupId);
return pTmq;
FAIL:
......@@ -1075,7 +1076,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
SCMSubscribeReq req = {0};
int32_t code = 0;
tscDebug("consumer:0x%" PRIx64 " subscribe %d topics", tmq->consumerId, sz);
tscDebug("consumer:0x%" PRIx64 " cgroup:%s, subscribe %d topics", tmq->consumerId, tmq->groupId, sz);
req.consumerId = tmq->consumerId;
tstrncpy(req.clientId, tmq->clientId, 256);
......@@ -1161,7 +1162,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
int32_t retryCnt = 0;
while (TSDB_CODE_MND_CONSUMER_NOT_READY == tmqAskEp(tmq, false)) {
if (retryCnt++ > 10) {
if (retryCnt++ > 40) {
goto FAIL;
}
......@@ -1212,30 +1213,39 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
return -1;
}
int32_t epoch = pParam->epoch;
int32_t vgId = pParam->vgId;
int32_t epoch = pParam->epoch;
int32_t vgId = pParam->vgId;
uint64_t requestId = pParam->requestId;
taosMemoryFree(pParam);
if (code != 0) {
tscWarn("msg discard from vgId:%d, epoch %d, since %s", vgId, epoch, terrstr());
tscWarn("consumer:0x%"PRIx64" msg from vgId:%d discarded, epoch %d, since %s, reqId:0x%"PRIx64, tmq->consumerId, vgId,
epoch, tstrerror(code), requestId);
if (pMsg->pData) taosMemoryFree(pMsg->pData);
if (pMsg->pEpSet) taosMemoryFree(pMsg->pEpSet);
// in case of consumer mismatch, wait for 500ms and retry
if (code == TSDB_CODE_TMQ_CONSUMER_MISMATCH) {
taosMsleep(500);
atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__RECOVER);
goto CREATE_MSG_FAIL;
}
if (code == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) {
tscDebug("consumer:0x%" PRIx64" wait for the re-balance, wait for 500ms and set status to be RECOVER", tmq->consumerId);
} else if (code == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) {
SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM, 0);
if (pRspWrapper == NULL) {
tscWarn("msg discard from vgId:%d, epoch %d since out of memory", vgId, epoch);
tscWarn("consumer:0x%" PRIx64 " msg from vgId:%d discarded, epoch %d since out of memory, reqId:0x%" PRIx64,
tmq->consumerId, vgId, epoch, requestId);
goto CREATE_MSG_FAIL;
}
pRspWrapper->tmqRspType = TMQ_MSG_TYPE__END_RSP;
/*pRspWrapper->vgHandle = pVg;*/
/*pRspWrapper->topicHandle = pTopic;*/
taosWriteQitem(tmq->mqueue, pRspWrapper);
tsem_post(&tmq->rspSem);
}
goto CREATE_MSG_FAIL;
}
......@@ -1243,8 +1253,9 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
int32_t tmqEpoch = atomic_load_32(&tmq->epoch);
if (msgEpoch < tmqEpoch) {
// do not write into queue since updating epoch reset
tscWarn("msg discard from vgId:%d since from earlier epoch, rsp epoch %d, current epoch %d", vgId, msgEpoch,
tmqEpoch);
tscWarn("consumer:0x%" PRIx64 " msg discard from vgId:%d since from earlier epoch, rsp epoch %d, current epoch %d, reqId:0x%"PRIx64,
tmq->consumerId, vgId, msgEpoch, tmqEpoch, requestId);
tsem_post(&tmq->rspSem);
taosMemoryFree(pMsg->pData);
taosMemoryFree(pMsg->pEpSet);
......@@ -1252,7 +1263,8 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
}
if (msgEpoch != tmqEpoch) {
tscWarn("mismatch rsp from vgId:%d, epoch %d, current epoch %d", vgId, msgEpoch, tmqEpoch);
tscWarn("consumer:0x%"PRIx64" mismatch rsp from vgId:%d, epoch %d, current epoch %d, reqId:0x%"PRIx64, tmq->consumerId, vgId,
msgEpoch, tmqEpoch, requestId);
}
// handle meta rsp
......@@ -1262,7 +1274,7 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
if (pRspWrapper == NULL) {
taosMemoryFree(pMsg->pData);
taosMemoryFree(pMsg->pEpSet);
tscWarn("msg discard from vgId:%d, epoch %d since out of memory", vgId, epoch);
tscWarn("consumer:0x%"PRIx64" msg discard from vgId:%d, epoch %d since out of memory", tmq->consumerId, vgId, epoch);
goto CREATE_MSG_FAIL;
}
......@@ -1277,10 +1289,9 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
tDecoderClear(&decoder);
memcpy(&pRspWrapper->dataRsp, pMsg->pData, sizeof(SMqRspHead));
tscDebug("consumer:0x%" PRIx64 ", recv poll: vgId:%d, req offset %" PRId64 ", rsp offset %" PRId64 " type %d",
tscDebug("consumer:0x%" PRIx64 " recv poll rsp, vgId:%d, req offset:%" PRId64 ", rsp offset:%" PRId64 " type %d, reqId:0x%"PRIx64,
tmq->consumerId, pVg->vgId, pRspWrapper->dataRsp.reqOffset.version, pRspWrapper->dataRsp.rspOffset.version,
rspType);
rspType, requestId);
} else if (rspType == TMQ_MSG_TYPE__POLL_META_RSP) {
SDecoder decoder;
tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead));
......@@ -1298,16 +1309,19 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
taosMemoryFree(pMsg->pData);
taosMemoryFree(pMsg->pEpSet);
tscDebug("consumer:0x%" PRIx64 ", put poll res into mqueue %p", tmq->consumerId, pRspWrapper);
tscDebug("consumer:0x%" PRIx64 ", put poll res into mqueue, total in queue:%d, reqId:0x%" PRIx64, tmq->consumerId,
tmq->mqueue->numOfItems, requestId);
taosWriteQitem(tmq->mqueue, pRspWrapper);
tsem_post(&tmq->rspSem);
return 0;
CREATE_MSG_FAIL:
if (epoch == tmq->epoch) {
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
}
tsem_post(&tmq->rspSem);
return -1;
}
......@@ -1344,7 +1358,7 @@ bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) {
sprintf(vgKey, "%s:%d", pTopicCur->topicName, pVgCur->vgId);
char buf[80];
tFormatOffset(buf, 80, &pVgCur->currentOffset);
tscDebug("consumer:0x%" PRIx64 ", epoch %d vgId:%d vgKey is %s, offset is %s", tmq->consumerId, epoch,
tscDebug("consumer:0x%" PRIx64 ", epoch:%d vgId:%d vgKey:%s, offset:%s", tmq->consumerId, epoch,
pVgCur->vgId, vgKey, buf);
taosHashPut(pHash, vgKey, strlen(vgKey), &pVgCur->currentOffset, sizeof(STqOffsetVal));
}
......@@ -1552,7 +1566,7 @@ int32_t tmqAskEp(tmq_t* tmq, bool async) {
.handle = NULL,
};
sendInfo->requestId = tmq->consumerId;
sendInfo->requestId = generateRequestId();
sendInfo->requestObjRefId = 0;
sendInfo->param = pParam;
sendInfo->fp = tmqAskEpCb;
......@@ -1560,7 +1574,7 @@ int32_t tmqAskEp(tmq_t* tmq, bool async) {
SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);
tscDebug("consumer:0x%" PRIx64 " ask ep from mnode, async:%d, reqId:0x%" PRIx64, tmq->consumerId, async,
tmq->consumerId);
sendInfo->requestId);
int64_t transporterId = 0;
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
......@@ -1674,6 +1688,7 @@ static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* p
pParam->pVg = pVg; // pVg may be released,fix it
pParam->pTopic = pTopic;
pParam->vgId = pVg->vgId;
pParam->requestId = req.reqId;
SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
if (sendInfo == NULL) {
......@@ -1698,7 +1713,7 @@ static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* p
char offsetFormatBuf[80];
tFormatOffset(offsetFormatBuf, tListLen(offsetFormatBuf), &pVg->currentOffset);
tscDebug("consumer:0x%" PRIx64 " send poll to %s vgId:%d, epoch %d, req offset:%s, reqId:0x%" PRIx64,
tscDebug("consumer:0x%" PRIx64 " send poll to %s vgId:%d, epoch %d, req:%s, reqId:0x%" PRIx64,
pTmq->consumerId, pTopic->topicName, pVg->vgId, pTmq->epoch, offsetFormatBuf, req.reqId);
asyncSendMsgToServer(pTmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo);
......@@ -1722,10 +1737,9 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
int32_t vgStatus = atomic_val_compare_exchange_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE, TMQ_VG_STATUS__WAIT);
if (vgStatus == TMQ_VG_STATUS__WAIT) {
int32_t vgSkipCnt = atomic_add_fetch_32(&pVg->vgSkipCnt, 1);
tscDebug("consumer:0x%" PRIx64 " epoch %d wait poll-rsp, skip vgId:%d skip cnt %d", tmq->consumerId, tmq->epoch,
tscTrace("consumer:0x%" PRIx64 " epoch %d wait poll-rsp, skip vgId:%d skip cnt %d", tmq->consumerId, tmq->epoch,
pVg->vgId, vgSkipCnt);
continue;
/*if (vgSkipCnt < 10000) continue;*/
#if 0
if (skipCnt < 30000) {
continue;
......@@ -1767,7 +1781,7 @@ int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper, bool* pReset)
}
void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
tscDebug("consumer:0x%" PRIx64 " start to handle the rsp", tmq->consumerId);
tscDebug("consumer:0x%" PRIx64 " start to handle the rsp, total:%d", tmq->consumerId, tmq->qall->numOfItems);
while (1) {
SMqRspWrapper* rspWrapper = NULL;
......@@ -1785,25 +1799,32 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__END_RSP) {
taosFreeQitem(rspWrapper);
terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET;
tscError("consumer:0x%" PRIx64 " unexpected rsp from poll, code:%s", tmq->consumerId, tstrerror(terrno));
return NULL;
} else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_RSP) {
SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)rspWrapper;
tscDebug("consumer:0x%" PRIx64 " process poll rsp", tmq->consumerId);
/*atomic_sub_fetch_32(&tmq->readyRequest, 1);*/
int32_t consumerEpoch = atomic_load_32(&tmq->epoch);
if (pollRspWrapper->dataRsp.head.epoch == consumerEpoch) {
SMqClientVg* pVg = pollRspWrapper->vgHandle;
/*printf("vgId:%d, offset %" PRId64 " up to %" PRId64 "\n", pVg->vgId, pVg->currentOffset,
* rspMsg->msg.rspOffset);*/
pVg->currentOffset = pollRspWrapper->dataRsp.rspOffset;
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
if (pollRspWrapper->dataRsp.blockNum == 0) {
tscDebug("consumer:0x%" PRIx64 " empty block received, vgId:%d", tmq->consumerId, pVg->vgId);
taosFreeQitem(pollRspWrapper);
rspWrapper = NULL;
continue;
}
// build rsp
char buf[80];
tFormatOffset(buf, 80, &pVg->currentOffset);
SMqRspObj* pRsp = tmqBuildRspFromWrapper(pollRspWrapper);
tscDebug("consumer:0x%" PRIx64 " process poll rsp, vgId:%d, offset:%s, blocks:%d", tmq->consumerId,
pVg->vgId, buf, pollRspWrapper->dataRsp.blockNum);
taosFreeQitem(pollRspWrapper);
return pRsp;
} else {
......@@ -1876,6 +1897,8 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
}
}
}
tscDebug("consumer:0x%" PRIx64 " handle the rsp completed", tmq->consumerId);
}
TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
......@@ -1896,17 +1919,18 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
// in no topic status, delayed task also need to be processed
if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__INIT) {
tscDebug("consumer:0x%" PRIx64 " poll return since consumer is init", tmq->consumerId);
taosMsleep(500); // sleep for a while
return NULL;
}
if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__RECOVER) {
int32_t retryCnt = 0;
while (TSDB_CODE_MND_CONSUMER_NOT_READY == tmqAskEp(tmq, false)) {
if (retryCnt++ > 10) {
if (retryCnt++ > 40) {
return NULL;
}
tscDebug("consumer:0x%" PRIx64 " not ready, retry:%d/10 in 500ms", tmq->consumerId, retryCnt);
tscDebug("consumer:0x%" PRIx64 " not ready, retry:%d/40 in 500ms", tmq->consumerId, retryCnt);
taosMsleep(500);
}
}
......
......@@ -925,7 +925,7 @@ TEST(clientCase, subscription_test) {
// 创建订阅 topics 列表
tmq_list_t* topicList = tmq_list_new();
tmq_list_append(topicList, "topic_t1");
// tmq_list_append(topicList, "topic_t1");
// 启动订阅
tmq_subscribe(tmq, topicList);
......@@ -938,6 +938,8 @@ TEST(clientCase, subscription_test) {
int32_t msgCnt = 0;
int32_t timeout = 5000;
int32_t count = 0;
while (1) {
TAOS_RES* pRes = tmq_consumer_poll(tmq, timeout);
if (pRes) {
......@@ -952,6 +954,11 @@ TEST(clientCase, subscription_test) {
printf("db: %s\n", dbName);
printf("vgroup id: %d\n", vgroupId);
if (count ++ > 20) {
tmq_unsubscribe(tmq);
break;
}
while (1) {
TAOS_ROW row = taos_fetch_row(pRes);
if (row == NULL) break;
......
......@@ -41,7 +41,7 @@ bool tsPrintAuth = false;
// queue & threads
int32_t tsNumOfRpcThreads = 1;
int32_t tsNumOfRpcSessions = 6000;
int32_t tsNumOfRpcSessions = 10000;
int32_t tsTimeToGetAvailableConn = 500000;
int32_t tsNumOfCommitThreads = 2;
int32_t tsNumOfTaskQueueThreads = 4;
......
......@@ -6584,7 +6584,7 @@ int32_t tFormatOffset(char *buf, int32_t maxLen, const STqOffsetVal *pVal) {
} else if (pVal->type == TMQ_OFFSET__LOG) {
snprintf(buf, maxLen, "offset(log) ver:%" PRId64, pVal->version);
} else if (pVal->type == TMQ_OFFSET__SNAPSHOT_DATA || pVal->type == TMQ_OFFSET__SNAPSHOT_META) {
snprintf(buf, maxLen, "offset(ss data) uid:%" PRId64 ", ts:%" PRId64, pVal->uid, pVal->ts);
snprintf(buf, maxLen, "offset(snapshot) uid:%" PRId64 " ts:%" PRId64, pVal->uid, pVal->ts);
} else {
ASSERT(0);
}
......
......@@ -26,18 +26,18 @@
#define MND_CONSUMER_VER_NUMBER 1
#define MND_CONSUMER_RESERVE_SIZE 64
#define MND_CONSUMER_LOST_HB_CNT 3
#define MND_CONSUMER_LOST_HB_CNT 6
#define MND_CONSUMER_LOST_CLEAR_THRESHOLD 43200
static int8_t mqRebInExecCnt = 0;
static int32_t mqRebInExecCnt = 0;
static const char *mndConsumerStatusName(int status);
static int32_t mndConsumerActionInsert(SSdb *pSdb, SMqConsumerObj *pConsumer);
static int32_t mndConsumerActionDelete(SSdb *pSdb, SMqConsumerObj *pConsumer);
static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pConsumer, SMqConsumerObj *pNewConsumer);
static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, SMqConsumerObj *pNewConsumer);
static int32_t mndProcessConsumerMetaMsg(SRpcMsg *pMsg);
static int32_t mndRetrieveConsumer(SRpcMsg *pMsg, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
static void mndCancelGetNextConsumer(SMnode *pMnode, void *pIter);
static int32_t mndProcessSubscribeReq(SRpcMsg *pMsg);
......@@ -76,15 +76,36 @@ int32_t mndInitConsumer(SMnode *pMnode) {
void mndCleanupConsumer(SMnode *pMnode) {}
bool mndRebTryStart() {
int8_t old = atomic_val_compare_exchange_8(&mqRebInExecCnt, 0, 1);
int32_t old = atomic_val_compare_exchange_32(&mqRebInExecCnt, 0, 1);
mInfo("tq timer, rebalance counter old val:%d", old);
return old == 0;
}
void mndRebEnd() { atomic_sub_fetch_8(&mqRebInExecCnt, 1); }
void mndRebEnd() {
mndRebCntDec();
}
void mndRebCntInc() { atomic_add_fetch_8(&mqRebInExecCnt, 1); }
void mndRebCntInc() {
int32_t val = atomic_add_fetch_32(&mqRebInExecCnt, 1);
mInfo("rebalance trans start, rebalance counter:%d", val);
}
void mndRebCntDec() { atomic_sub_fetch_8(&mqRebInExecCnt, 1); }
void mndRebCntDec() {
while (1) {
int32_t val = atomic_load_32(&mqRebInExecCnt);
if (val <= 0) {
mError("rebalance trans end, rebalance counter:%d should not be less equalled than 0, ignore counter desc", val);
break;
}
int32_t newVal = val - 1;
int32_t oldVal = atomic_val_compare_exchange_32(&mqRebInExecCnt, val, newVal);
if (oldVal == val) {
mInfo("rebalance trans end, rebalance counter:%d", newVal);
break;
}
}
}
static int32_t mndProcessConsumerLostMsg(SRpcMsg *pMsg) {
SMnode *pMnode = pMsg->info.node;
......@@ -298,6 +319,7 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) {
taosRUnLockLatch(&pConsumer->lock);
} else if (status == MQ_CONSUMER_STATUS__MODIFY) {
taosRLockLatch(&pConsumer->lock);
int32_t newTopicNum = taosArrayGetSize(pConsumer->rebNewTopics);
for (int32_t i = 0; i < newTopicNum; i++) {
char key[TSDB_SUBSCRIBE_KEY_LEN];
......@@ -334,7 +356,7 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) {
} else {
taosHashCleanup(pRebMsg->rebSubHash);
rpcFreeCont(pRebMsg);
mTrace("mq rebalance finished, no modification");
mInfo("mq rebalance finished, no modification");
mndRebEnd();
}
return 0;
......@@ -611,10 +633,11 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
pExistedConsumer = mndAcquireConsumer(pMnode, consumerId);
if (pExistedConsumer == NULL) {
mInfo("receive subscribe request from new consumer:0x%" PRIx64" cgroup:%s", consumerId, subscribe.cgroup);
mInfo("receive subscribe request from new consumer:0x%" PRIx64" cgroup:%s, numOfTopics:%d", consumerId,
subscribe.cgroup, (int32_t) taosArrayGetSize(pTopicList));
pConsumerNew = tNewSMqConsumerObj(consumerId, cgroup);
tstrncpy(pConsumerNew->clientId, subscribe.clientId, 256);
tstrncpy(pConsumerNew->clientId, subscribe.clientId, tListLen(pConsumerNew->clientId));
// set the update type
pConsumerNew->updateType = CONSUMER_UPDATE__MODIFY;
......@@ -943,8 +966,9 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer,
pOldConsumer->rebalanceTime = pNewConsumer->upTime;
atomic_add_fetch_32(&pOldConsumer->epoch, 1);
mDebug("consumer:0x%" PRIx64 " state %s -> %s, new epoch:%d, reb-time:%" PRId64 ", current topics:%d",
pOldConsumer->consumerId, mndConsumerStatusName(status), mndConsumerStatusName(pOldConsumer->status),
mDebug("consumer:0x%" PRIx64 " state (%d)%s -> (%d)%s, new epoch:%d, reb-time:%" PRId64 ", current topics:%d",
pOldConsumer->consumerId, status, mndConsumerStatusName(status), pOldConsumer->status,
mndConsumerStatusName(pOldConsumer->status),
pOldConsumer->epoch, pOldConsumer->rebalanceTime, (int)taosArrayGetSize(pOldConsumer->currentTopics));
} else if (pNewConsumer->updateType == CONSUMER_UPDATE__REMOVE) {
/*A(taosArrayGetSize(pNewConsumer->rebNewTopics) == 0);*/
......@@ -1002,8 +1026,9 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer,
pOldConsumer->rebalanceTime = pNewConsumer->upTime;
atomic_add_fetch_32(&pOldConsumer->epoch, 1);
mDebug("consumer:0x%" PRIx64 " state %s -> %s, new epoch:%d, reb-time:%" PRId64 ", current topics:%d",
pOldConsumer->consumerId, mndConsumerStatusName(status), mndConsumerStatusName(pOldConsumer->status),
mDebug("consumer:0x%" PRIx64 " state %d(%s) -> %d(%s), new epoch:%d, reb-time:%" PRId64 ", current topics:%d",
pOldConsumer->consumerId, status, mndConsumerStatusName(status), pOldConsumer->status,
mndConsumerStatusName(pOldConsumer->status),
pOldConsumer->epoch, pOldConsumer->rebalanceTime, (int)taosArrayGetSize(pOldConsumer->currentTopics));
}
......@@ -1044,7 +1069,6 @@ static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *
}
taosRLockLatch(&pConsumer->lock);
mDebug("showing consumer:0x%" PRIx64, pConsumer->consumerId);
int32_t topicSz = taosArrayGetSize(pConsumer->assignedTopics);
......
......@@ -706,6 +706,9 @@ int32_t mndProcessRpcMsg(SRpcMsg *pMsg) {
} else if (code == 0) {
mGTrace("msg:%p, successfully processed", pMsg);
} else {
if (code == -1) {
code = terrno;
}
mGError("msg:%p, failed to process since %s, app:%p type:%s", pMsg, tstrerror(code), pMsg->info.ahandle,
TMSG_INFO(pMsg->msgType));
}
......
......@@ -224,7 +224,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
.pVgEp = pVgEp,
};
taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &outputVg, sizeof(SMqRebOutputVg));
mInfo("sub:%s mq re-balance remove vgId:%d from consumer:%" PRId64, sub, pVgEp->vgId, consumerId);
mInfo("sub:%s mq re-balance remove vgId:%d from consumer:%" PRIx64, sub, pVgEp->vgId, consumerId);
}
taosArrayDestroy(pConsumerEp->vgs);
taosHashRemove(pOutput->pSub->consumerHash, &consumerId, sizeof(int64_t));
......@@ -263,7 +263,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
imbConsumerNum = totalVgNum % afterRebConsumerNum;
}
mInfo("sub:%s mq re-balance %d consumers: at least %d vg each, %d consumer has more vg", sub,
mInfo("sub:%s mq re-balance %d consumers: at least %d vgs each, %d consumers has more vgs", sub,
afterRebConsumerNum, minVgCnt, imbConsumerNum);
// 4. first scan: remove consumer more than wanted, put to remove hash
......@@ -296,7 +296,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
.pVgEp = pVgEp,
};
taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &outputVg, sizeof(SMqRebOutputVg));
mInfo("sub:%s mq rebalance remove vgId:%d from consumer:%" PRId64 ",(first scan)", sub, pVgEp->vgId,
mInfo("sub:%s mq rebalance remove vgId:%d from consumer:0x%" PRIx64 ",(first scan)", sub, pVgEp->vgId,
pConsumerEp->consumerId);
}
imbCnt++;
......@@ -311,7 +311,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
.pVgEp = pVgEp,
};
taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &outputVg, sizeof(SMqRebOutputVg));
mInfo("sub:%s mq rebalance remove vgId:%d from consumer:%" PRId64 ",(first scan)", sub, pVgEp->vgId,
mInfo("sub:%s mq rebalance remove vgId:%d from consumer:0x%" PRIx64 ",(first scan)", sub, pVgEp->vgId,
pConsumerEp->consumerId);
}
}
......@@ -329,7 +329,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
newConsumerEp.vgs = taosArrayInit(0, sizeof(void *));
taosHashPut(pOutput->pSub->consumerHash, &consumerId, sizeof(int64_t), &newConsumerEp, sizeof(SMqConsumerEp));
taosArrayPush(pOutput->newConsumers, &consumerId);
mInfo("sub:%s mq rebalance add new consumer:%" PRId64, sub, consumerId);
mInfo("sub:%s mq rebalance add new consumer:%" PRIx64, sub, consumerId);
}
}
......@@ -357,7 +357,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
taosArrayPush(pConsumerEp->vgs, &pRebVg->pVgEp);
pRebVg->newConsumerId = pConsumerEp->consumerId;
taosArrayPush(pOutput->rebVgs, pRebVg);
mInfo("mq rebalance: add vgId:%d to consumer:%" PRId64 " (second scan) (not enough)", pRebVg->pVgEp->vgId,
mInfo("mq rebalance: add vgId:%d to consumer:%" PRIx64 " (second scan) (not enough)", pRebVg->pVgEp->vgId,
pConsumerEp->consumerId);
}
}
......@@ -387,12 +387,12 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
taosArrayPush(pConsumerEp->vgs, &pRebVg->pVgEp);
pRebVg->newConsumerId = pConsumerEp->consumerId;
if (pRebVg->newConsumerId == pRebVg->oldConsumerId) {
mInfo("mq rebalance: skip vg %d for same consumer:%" PRId64 " (second scan)", pRebVg->pVgEp->vgId,
mInfo("mq rebalance: skip vg %d for same consumer:%" PRIx64 " (second scan)", pRebVg->pVgEp->vgId,
pConsumerEp->consumerId);
continue;
}
taosArrayPush(pOutput->rebVgs, pRebVg);
mInfo("mq rebalance: add vgId:%d to consumer:%" PRId64 " (second scan) (unassigned)", pRebVg->pVgEp->vgId,
mInfo("mq rebalance: add vgId:%d to consumer:%" PRIx64 " (second scan) (unassigned)", pRebVg->pVgEp->vgId,
pConsumerEp->consumerId);
}
} else {
......@@ -427,10 +427,10 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
if (pIter == NULL) break;
SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
int32_t sz = taosArrayGetSize(pConsumerEp->vgs);
mInfo("sub:%s mq re-balance final cfg: consumer:0x%" PRId64 " has %d vg", sub, pConsumerEp->consumerId, sz);
mInfo("sub:%s mq re-balance final cfg: consumer:0x%" PRIx64 " has %d vg", sub, pConsumerEp->consumerId, sz);
for (int32_t i = 0; i < sz; i++) {
SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, i);
mInfo("sub:%s mq re-balance final cfg: vg %d to consumer:0x%" PRId64, sub, pVgEp->vgId,
mInfo("sub:%s mq re-balance final cfg: vg %d to consumer:0x%" PRIx64, sub, pVgEp->vgId,
pConsumerEp->consumerId);
}
}
......@@ -444,7 +444,9 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOutputObj *pOutput) {
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB_INSIDE, pMsg, "tmq-reb");
if (pTrans == NULL) return -1;
if (pTrans == NULL) {
return -1;
}
mndTransSetDbName(pTrans, pOutput->pSub->dbName, NULL);
if (mndTrancCheckConflict(pMnode, pTrans) != 0) {
......@@ -591,13 +593,13 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) {
rebOutput.pSub = mndCreateSubscription(pMnode, pTopic, pRebInfo->key);
if (rebOutput.pSub == NULL) {
mError("mq rebalance %s failed create sub since %s, abort", pRebInfo->key, terrstr());
mError("mq rebalance %s failed create sub since %s, ignore", pRebInfo->key, terrstr());
taosRUnLockLatch(&pTopic->lock);
mndReleaseTopic(pMnode, pTopic);
continue;
}
memcpy(rebOutput.pSub->dbName, pTopic->db, TSDB_DB_FNAME_LEN);
memcpy(rebOutput.pSub->dbName, pTopic->db, TSDB_DB_FNAME_LEN);
taosRUnLockLatch(&pTopic->lock);
mndReleaseTopic(pMnode, pTopic);
......@@ -616,9 +618,9 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) {
// if add more consumer to balanced subscribe,
// possibly no vg is changed
// when each topic is re-balanced, issue an trans to save the results in sdb.
if (mndPersistRebResult(pMnode, pMsg, &rebOutput) < 0) {
mError("mq re-balance persist re-balance output error, possibly vnode splitted or dropped");
mError("mq re-balance persist output error, possibly vnode splitted or dropped");
}
taosArrayDestroy(pRebInfo->lostConsumers);
......@@ -1017,7 +1019,7 @@ int32_t mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)&pConsumerEp->consumerId, false);
mDebug("mnd show subscriptions: topic %s, consumer %" PRId64 " cgroup %s vgid %d", varDataVal(topic),
mDebug("mnd show subscriptions: topic %s, consumer:%" PRIx64 " cgroup %s vgid %d", varDataVal(topic),
pConsumerEp->consumerId, varDataVal(cgroup), pVgEp->vgId);
// offset
......
......@@ -505,7 +505,8 @@ static TransCbFp mndTransGetCbFp(ETrnFunc ftype) {
}
static int32_t mndTransActionInsert(SSdb *pSdb, STrans *pTrans) {
mTrace("trans:%d, perform insert action, row:%p stage:%s", pTrans->id, pTrans, mndTransStr(pTrans->stage));
mInfo("trans:%d, perform insert action, row:%p stage:%s, callfunc:1, startFunc:%d", pTrans->id, pTrans, mndTransStr(pTrans->stage),
pTrans->startFunc);
if (pTrans->startFunc > 0) {
TransCbFp fp = mndTransGetCbFp(pTrans->startFunc);
......@@ -548,8 +549,9 @@ static void mndTransDropData(STrans *pTrans) {
}
static int32_t mndTransActionDelete(SSdb *pSdb, STrans *pTrans, bool callFunc) {
mTrace("trans:%d, perform delete action, row:%p stage:%s callfunc:%d", pTrans->id, pTrans, mndTransStr(pTrans->stage),
callFunc);
mInfo("trans:%d, perform delete action, row:%p stage:%s callfunc:%d, stopFunc:%d", pTrans->id, pTrans,
mndTransStr(pTrans->stage), callFunc, pTrans->stopFunc);
if (pTrans->stopFunc > 0 && callFunc) {
TransCbFp fp = mndTransGetCbFp(pTrans->stopFunc);
if (fp) {
......@@ -574,7 +576,7 @@ static void mndTransUpdateActions(SArray *pOldArray, SArray *pNewArray) {
}
static int32_t mndTransActionUpdate(SSdb *pSdb, STrans *pOld, STrans *pNew) {
mTrace("trans:%d, perform update action, old row:%p stage:%s create:%" PRId64 ", new row:%p stage:%s create:%" PRId64,
mInfo("trans:%d, perform update action, old row:%p stage:%s create:%" PRId64 ", new row:%p stage:%s create:%" PRId64,
pOld->id, pOld, mndTransStr(pOld->stage), pOld->createdTime, pNew, mndTransStr(pNew->stage),
pNew->createdTime);
......@@ -873,6 +875,7 @@ int32_t mndTrancCheckConflict(SMnode *pMnode, STrans *pTrans) {
}
}
if (mndCheckTransConflict(pMnode, pTrans)) {
terrno = TSDB_CODE_MND_TRANS_CONFLICT;
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
......
......@@ -264,7 +264,7 @@ int32_t tqReaderSetTbUidList(STqReader *pReader, const SArray *tbUidList);
int32_t tqReaderAddTbUidList(STqReader *pReader, const SArray *tbUidList);
int32_t tqReaderRemoveTbUidList(STqReader *pReader, const SArray *tbUidList);
int32_t tqSeekVer(STqReader *pReader, int64_t ver);
int32_t tqSeekVer(STqReader *pReader, int64_t ver, const char* id);
int32_t tqNextBlock(STqReader *pReader, SFetchRet *ret);
int32_t tqReaderSetSubmitReq2(STqReader *pReader, void *msgStr, int32_t msgLen, int64_t ver);
......
......@@ -488,7 +488,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
// 2. check rebalance
if (pHandle->consumerId != consumerId) {
tqError("tmq poll: consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64,
tqDebug("ERROR tmq poll: consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64,
consumerId, TD_VID(pTq->pVnode), req.subKey, pHandle->consumerId);
terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
return -1;
......@@ -822,6 +822,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
pHandle = &tqHandle;
/*taosInitRWLatch(&pExec->lock);*/
uint64_t oldConsumerId = pHandle->consumerId;
memcpy(pHandle->subKey, req.subKey, TSDB_SUBSCRIBE_KEY_LEN);
pHandle->consumerId = req.newConsumerId;
pHandle->epoch = -1;
......@@ -884,13 +885,16 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
(SSnapContext**)(&handle.sContext));
pHandle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &handle, NULL, NULL);
}
taosHashPut(pTq->pHandle, req.subKey, strlen(req.subKey), pHandle, sizeof(STqHandle));
tqDebug("try to persist handle %s consumer:0x%" PRIx64, req.subKey, pHandle->consumerId);
tqDebug("try to persist handle %s consumer:0x%" PRIx64" , old consumer:0x%"PRIx64, req.subKey, pHandle->consumerId,
oldConsumerId);
if (tqMetaSaveHandle(pTq, req.subKey, pHandle) < 0) {
return -1;
}
} else {
// TODO handle qmsg and exec modification
tqInfo("update the consumer info, old consumer id:0x%"PRIx64", new Id:0x%"PRIx64, pHandle->consumerId, req.newConsumerId);
atomic_store_32(&pHandle->epoch, -1);
atomic_store_64(&pHandle->consumerId, req.newConsumerId);
atomic_add_fetch_32(&pHandle->epoch, 1);
......@@ -898,6 +902,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
qStreamCloseTsdbReader(pHandle->execHandle.task);
}
if (tqMetaSaveHandle(pTq, req.subKey, pHandle) < 0) {
return -1;
}
......
......@@ -290,14 +290,14 @@ void tqCloseReader(STqReader* pReader) {
taosMemoryFree(pReader);
}
int32_t tqSeekVer(STqReader* pReader, int64_t ver) {
int32_t tqSeekVer(STqReader* pReader, int64_t ver, const char* id) {
// todo set the correct vgId
tqDebug("tmq poll: vgId:%d wal seek to version:%"PRId64, 0, ver);
tqDebug("tmq poll: wal seek to version:%"PRId64" %s", ver, id);
if (walReadSeekVer(pReader->pWalReader, ver) < 0) {
tqError("tmq poll: wal reader failed to seek to ver:%"PRId64, ver);
tqError("tmq poll: wal reader failed to seek to ver:%"PRId64" code:%s, %s", ver, tstrerror(terrno), id);
return -1;
} else {
tqDebug("tmq poll: wal reader seek to ver:%"PRId64, ver);
tqDebug("tmq poll: wal reader seek to ver:%"PRId64" %s", ver, id);
return 0;
}
}
......@@ -308,13 +308,17 @@ int32_t tqNextBlock(STqReader* pReader, SFetchRet* ret) {
while (1) {
if (!fromProcessedMsg) {
if (walNextValidMsg(pReader->pWalReader) < 0) {
pReader->ver =
pReader->pWalReader->curVersion - pReader->pWalReader->curStopped;
// pReader->pWalReader->curVersion - (pReader->pWalReader->curInvalid | pReader->pWalReader->curStopped);
// pReader->ver = pReader->pWalReader->curVersion - pReader->pWalReader->curStopped;
if(pReader->pWalReader->curInvalid == 0){
pReader->ver = pReader->pWalReader->curVersion - pReader->pWalReader->curStopped;
}else{
pReader->ver = walGetLastVer(pReader->pWalReader->pWal);
}
ret->offset.type = TMQ_OFFSET__LOG;
ret->offset.version = pReader->ver;
ret->fetchType = FETCH_TYPE__NONE;
tqDebug("return offset %" PRId64 ", no more valid", ret->offset.version);
tqDebug("return offset %" PRId64 ", no more valid msg in wal", ret->offset.version);
return -1;
}
......
......@@ -1080,7 +1080,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
tsdbReaderClose(pTSInfo->base.dataReader);
pTSInfo->base.dataReader = NULL;
// let's seek to the next version in wal file
if (tqSeekVer(pInfo->tqReader, pOffset->version + 1) < 0) {
if (tqSeekVer(pInfo->tqReader, pOffset->version + 1, pTaskInfo->id.str) < 0) {
return -1;
}
} else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
......
......@@ -1609,6 +1609,8 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
if (pResult && pResult->info.rows > 0) {
qDebug("queue scan tsdb return %d rows min:%" PRId64 " max:%" PRId64, pResult->info.rows,
pResult->info.window.skey, pResult->info.window.ekey);
qDebug("queue scan tsdb return %d rows min:%" PRId64 " max:%" PRId64 " wal curVersion:%" PRId64, pResult->info.rows,
pResult->info.window.skey, pResult->info.window.ekey, pInfo->tqReader->pWalReader->curVersion);
pTaskInfo->streamInfo.returned = 1;
return pResult;
} else {
......@@ -1618,7 +1620,7 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
pTSInfo->base.dataReader = NULL;
tqOffsetResetToLog(&pTaskInfo->streamInfo.prepareStatus, pTaskInfo->streamInfo.snapshotVer);
qDebug("queue scan tsdb over, switch to wal ver %" PRId64 "", pTaskInfo->streamInfo.snapshotVer + 1);
if (tqSeekVer(pInfo->tqReader, pTaskInfo->streamInfo.snapshotVer + 1) < 0) {
if (tqSeekVer(pInfo->tqReader, pTaskInfo->streamInfo.snapshotVer + 1, pTaskInfo->id.str) < 0) {
tqOffsetResetToLog(&pTaskInfo->streamInfo.lastStatus, pTaskInfo->streamInfo.snapshotVer);
return NULL;
}
......@@ -1632,8 +1634,12 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
while (1) {
SFetchRet ret = {0};
if (tqNextBlock(pInfo->tqReader, &ret) < 0) {
qError("failed to get next log block since %s", terrstr());
// if the end is reached, terrno is 0
if (terrno != 0) {
qError("failed to get next log block since %s", terrstr());
}
}
if (ret.fetchType == FETCH_TYPE__DATA) {
blockDataCleanup(pInfo->pRes);
setBlockIntoRes(pInfo, &ret.data, true);
......
......@@ -13,7 +13,6 @@
*/
#include "transComm.h"
#include "tutil.h"
typedef struct {
int32_t numOfConn;
......@@ -121,6 +120,9 @@ typedef struct SCliThrd {
SCliMsg* stopMsg;
bool quit;
int newConnCount;
SHashObj* msgCount;
} SCliThrd;
typedef struct SCliObj {
......@@ -423,6 +425,21 @@ void cliHandleResp(SCliConn* conn) {
tDebug("%s conn %p ref by app", CONN_GET_INST_LABEL(conn), conn);
}
// if (TMSG_INFO(pHead->msgType - 1) != 0) {
// char buf[128] = {0};
// sprintf(buf, "%s", TMSG_INFO(pHead->msgType - 1));
// int* count = taosHashGet(pThrd->msgCount, TMSG_INFO(pHead->msgType - 1), strlen(TMSG_INFO(pHead->msgType - 1)));
// if (NULL == 0) {
// int localCount = 1;
// taosHashPut(pThrd->msgCount, TMSG_INFO(pHead->msgType - 1), strlen(TMSG_INFO(pHead->msgType - 1)), &localCount,
// sizeof(localCount));
// } else {
// int localCount = *count - 1;
// taosHashPut(pThrd->msgCount, TMSG_INFO(pHead->msgType - 1), strlen(TMSG_INFO(pHead->msgType - 1)), &localCount,
// sizeof(localCount));
// }
// }
STraceId* trace = &transMsg.info.traceId;
tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, code str:%s", CONN_GET_INST_LABEL(conn), conn,
TMSG_INFO(pHead->msgType), conn->dst, conn->src, pHead->msgLen, tstrerror(transMsg.code));
......@@ -675,6 +692,7 @@ static SCliConn* getConnFromPool2(SCliThrd* pThrd, char* key, SCliMsg** pMsg) {
}
list->numOfConn++;
}
tTrace("%s numOfConn: %d, limit: %d", pTransInst->label, list->numOfConn, pTransInst->connLimitNum);
return NULL;
}
......@@ -786,7 +804,6 @@ static int32_t specifyConnRef(SCliConn* conn, bool update, int64_t handle) {
static void cliAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
SCliConn* conn = handle->data;
SConnBuffer* pBuf = &conn->readBuf;
tTrace("%s conn %p alloc read buf", CONN_GET_INST_LABEL(conn), conn);
transAllocBuffer(pBuf, buf);
}
static void cliRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) {
......@@ -860,7 +877,7 @@ static SCliConn* cliCreateConn(SCliThrd* pThrd) {
static void cliDestroyConn(SCliConn* conn, bool clear) {
SCliThrd* pThrd = conn->hostThrd;
tTrace("%s conn %p remove from conn pool", CONN_GET_INST_LABEL(conn), conn);
conn->broken = true;
QUEUE_REMOVE(&conn->q);
QUEUE_INIT(&conn->q);
......@@ -875,6 +892,7 @@ static void cliDestroyConn(SCliConn* conn, bool clear) {
connList->list->numOfConn--;
}
conn->list = NULL;
pThrd->newConnCount--;
transReleaseExHandle(transGetRefMgt(), conn->refId);
transRemoveExHandle(transGetRefMgt(), conn->refId);
......@@ -1100,6 +1118,19 @@ void cliSend(SCliConn* pConn) {
msgLen = (int32_t)ntohl((uint32_t)(pHead->msgLen));
}
// if (tmsgIsValid(pHead->msgType)) {
// char buf[128] = {0};
// sprintf(buf, "%s", TMSG_INFO(pHead->msgType));
// int* count = taosHashGet(pThrd->msgCount, buf, sizeof(buf));
// if (NULL == 0) {
// int localCount = 1;
// taosHashPut(pThrd->msgCount, buf, sizeof(buf), &localCount, sizeof(localCount));
// } else {
// int localCount = *count + 1;
// taosHashPut(pThrd->msgCount, buf, sizeof(buf), &localCount, sizeof(localCount));
// }
// }
tGDebug("%s conn %p %s is sent to %s, local info %s, len:%d", CONN_GET_INST_LABEL(pConn), pConn,
TMSG_INFO(pHead->msgType), pConn->dst, pConn->src, msgLen);
......@@ -1175,6 +1206,7 @@ static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd) {
addr.sin_port = (uint16_t)htons(pList->port);
tTrace("%s conn %p try to connect to %s", pTransInst->label, conn, pList->dst);
pThrd->newConnCount++;
int32_t fd = taosCreateSocketWithTimeout(TRANS_CONN_TIMEOUT * 10);
if (fd == -1) {
tError("%s conn %p failed to create socket, reason:%s", transLabel(pTransInst), conn,
......@@ -1493,6 +1525,18 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) {
destroyCmsg(pMsg);
return;
}
if (tmsgIsValid(pMsg->msg.msgType)) {
char buf[128] = {0};
sprintf(buf, "%s", TMSG_INFO(pMsg->msg.msgType));
int* count = taosHashGet(pThrd->msgCount, buf, sizeof(buf));
if (NULL == 0) {
int localCount = 1;
taosHashPut(pThrd->msgCount, buf, sizeof(buf), &localCount, sizeof(localCount));
} else {
int localCount = *count + 1;
taosHashPut(pThrd->msgCount, buf, sizeof(buf), &localCount, sizeof(localCount));
}
}
char* fqdn = EPSET_GET_INUSE_IP(&pMsg->ctx->epSet);
uint16_t port = EPSET_GET_INUSE_PORT(&pMsg->ctx->epSet);
......@@ -1548,6 +1592,7 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) {
addr.sin_port = (uint16_t)htons(port);
tGTrace("%s conn %p try to connect to %s", pTransInst->label, conn, conn->ip);
pThrd->newConnCount++;
int32_t fd = taosCreateSocketWithTimeout(TRANS_CONN_TIMEOUT * 4);
if (fd == -1) {
tGError("%s conn %p failed to create socket, reason:%s", transLabel(pTransInst), conn,
......@@ -1737,6 +1782,19 @@ static void cliAsyncCb(uv_async_t* handle) {
QUEUE_MOVE(&item->qmsg, &wq);
taosThreadMutexUnlock(&item->mtx);
void* pIter = taosHashIterate(pThrd->msgCount, NULL);
while (pIter != NULL) {
int* count = pIter;
size_t len = 0;
char* key = taosHashGetKey(pIter, &len);
if (*count != 0) {
tDebug("key: %s count: %d", key, *count);
}
pIter = taosHashIterate(pThrd->msgCount, pIter);
}
tDebug("all conn count: %d", pThrd->newConnCount);
int8_t supportBatch = pTransInst->supportBatch;
if (supportBatch == 0) {
cliNoBatchDealReq(&wq, pThrd);
......@@ -1971,6 +2029,9 @@ static SCliThrd* createThrdObj(void* trans) {
pThrd->batchCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
pThrd->quit = false;
pThrd->newConnCount = 0;
pThrd->msgCount = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
return pThrd;
}
static void destroyThrdObj(SCliThrd* pThrd) {
......@@ -2016,6 +2077,7 @@ static void destroyThrdObj(SCliThrd* pThrd) {
pIter = (void**)taosHashIterate(pThrd->batchCache, pIter);
}
taosHashCleanup(pThrd->batchCache);
taosHashCleanup(pThrd->msgCount);
taosMemoryFree(pThrd);
}
......@@ -2318,6 +2380,18 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
}
}
if (tmsgIsValid(pResp->msgType - 1)) {
char buf[128] = {0};
sprintf(buf, "%s", TMSG_INFO(pResp->msgType - 1));
int* count = taosHashGet(pThrd->msgCount, buf, sizeof(buf));
if (NULL == 0) {
int localCount = 0;
taosHashPut(pThrd->msgCount, buf, sizeof(buf), &localCount, sizeof(localCount));
} else {
int localCount = *count - 1;
taosHashPut(pThrd->msgCount, buf, sizeof(buf), &localCount, sizeof(localCount));
}
}
if (pCtx->pSem != NULL) {
tGTrace("%s conn %p(sync) handle resp", CONN_GET_INST_LABEL(pConn), pConn);
if (pCtx->pRsp == NULL) {
......
......@@ -435,7 +435,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TQ_META_KEY_NOT_IN_TXN, "TQ meta key not in tx
TAOS_DEFINE_ERROR(TSDB_CODE_TQ_META_KEY_DUP_IN_TXN, "TQ met key dup in txn")
TAOS_DEFINE_ERROR(TSDB_CODE_TQ_GROUP_NOT_SET, "TQ group not exist")
TAOS_DEFINE_ERROR(TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND, "TQ table schema not found")
TAOS_DEFINE_ERROR(TSDB_CODE_TQ_NO_COMMITTED_OFFSET, "TQ no commited offset")
TAOS_DEFINE_ERROR(TSDB_CODE_TQ_NO_COMMITTED_OFFSET, "TQ no committed offset")
// wal
TAOS_DEFINE_ERROR(TSDB_CODE_WAL_FILE_CORRUPTED, "WAL file is corrupted")
......
......@@ -79,15 +79,15 @@ CFG_DIR=$PRG_DIR/cfg
LOG_DIR=$PRG_DIR/log
echo "------------------------------------------------------------------------"
echo "TOP_DIR: $TOP_DIR"
echo "BUILD_DIR: $BUILD_DIR"
echo "SIM_DIR : $SIM_DIR"
echo "CFG_DIR : $CFG_DIR"
echo "PROGRAM: $PROGRAM
echo "CFG_DIR: $CFG_DIR
echo "POLL_DELAY: $POLL_DELAY
echo "DB_NAME: $DB_NAME
echo "PROGRAM: $PROGRAM"
echo "CFG_DIR: $CFG_DIR"
echo "POLL_DELAY: $POLL_DELAY"
echo "DB_NAME: $DB_NAME"
echo "------------------------------------------------------------------------"
if [ "$EXEC_OPTON" = "start" ]; then
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册