未验证 提交 f9a8579b 编写于 作者: H Haojun Liao 提交者: GitHub

Merge pull request #20163 from taosdata/feature/3_liaohj

refactor: do some internal refactor.
...@@ -80,15 +80,15 @@ int32_t taosTimeCountInterval(int64_t skey, int64_t ekey, int64_t interval, char ...@@ -80,15 +80,15 @@ int32_t taosTimeCountInterval(int64_t skey, int64_t ekey, int64_t interval, char
int32_t parseAbsoluteDuration(const char* token, int32_t tokenlen, int64_t* ts, char* unit, int32_t timePrecision); int32_t parseAbsoluteDuration(const char* token, int32_t tokenlen, int64_t* ts, char* unit, int32_t timePrecision);
int32_t parseNatualDuration(const char* token, int32_t tokenLen, int64_t* duration, char* unit, int32_t timePrecision); int32_t parseNatualDuration(const char* token, int32_t tokenLen, int64_t* duration, char* unit, int32_t timePrecision);
int32_t taosParseTime(const char* timestr, int64_t* time, int32_t len, int32_t timePrec, int8_t dayligth); int32_t taosParseTime(const char* timestr, int64_t* pTime, int32_t len, int32_t timePrec, int8_t dayligth);
void deltaToUtcInitOnce(); void deltaToUtcInitOnce();
char getPrecisionUnit(int32_t precision); char getPrecisionUnit(int32_t precision);
int64_t convertTimePrecision(int64_t time, int32_t fromPrecision, int32_t toPrecision); int64_t convertTimePrecision(int64_t ts, int32_t fromPrecision, int32_t toPrecision);
int64_t convertTimeFromPrecisionToUnit(int64_t time, int32_t fromPrecision, char toUnit); int64_t convertTimeFromPrecisionToUnit(int64_t ts, int32_t fromPrecision, char toUnit);
int32_t convertStringToTimestamp(int16_t type, char* inputData, int64_t timePrec, int64_t* timeVal); int32_t convertStringToTimestamp(int16_t type, char* inputData, int64_t timePrec, int64_t* timeVal);
void taosFormatUtcTime(char* buf, int32_t bufLen, int64_t time, int32_t precision); void taosFormatUtcTime(char* buf, int32_t bufLen, int64_t ts, int32_t precision);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -68,12 +68,12 @@ static int64_t user_mktime64(const uint32_t year0, const uint32_t mon0, const ui ...@@ -68,12 +68,12 @@ static int64_t user_mktime64(const uint32_t year0, const uint32_t mon0, const ui
// ==== mktime() kernel code =================// // ==== mktime() kernel code =================//
static int64_t m_deltaUtc = 0; static int64_t m_deltaUtc = 0;
void deltaToUtcInitOnce() {
struct tm tm = {0};
(void)taosStrpTime("1970-01-01 00:00:00", (const char*)("%Y-%m-%d %H:%M:%S"), &tm); void deltaToUtcInitOnce() {
m_deltaUtc = (int64_t)taosMktime(&tm); struct tm tm = {0};
// printf("====delta:%lld\n\n", seconds); (void)taosStrpTime("1970-01-01 00:00:00", (const char*)("%Y-%m-%d %H:%M:%S"), &tm);
m_deltaUtc = (int64_t)taosMktime(&tm);
// printf("====delta:%lld\n\n", seconds);
} }
static int64_t parseFraction(char* str, char** end, int32_t timePrec); static int64_t parseFraction(char* str, char** end, int32_t timePrec);
......
...@@ -559,6 +559,27 @@ int32_t mndSetConsumerCommitLogs(SMnode *pMnode, STrans *pTrans, SMqConsumerObj ...@@ -559,6 +559,27 @@ int32_t mndSetConsumerCommitLogs(SMnode *pMnode, STrans *pTrans, SMqConsumerObj
return 0; return 0;
} }
static int32_t validateTopics(const SArray* pTopicList, SMnode* pMnode, const char* pUser) {
int32_t numOfTopics = taosArrayGetSize(pTopicList);
for (int32_t i = 0; i < numOfTopics; i++) {
char *pOneTopic = taosArrayGetP(pTopicList, i);
SMqTopicObj *pTopic = mndAcquireTopic(pMnode, pOneTopic);
if (pTopic == NULL) { // terrno has been set by callee function
return -1;
}
if (mndCheckTopicPrivilege(pMnode, pUser, MND_OPER_SUBSCRIBE, pTopic) != 0) {
mndReleaseTopic(pMnode, pTopic);
return -1;
}
mndReleaseTopic(pMnode, pTopic);
}
return 0;
}
int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
SMnode *pMnode = pMsg->info.node; SMnode *pMnode = pMsg->info.node;
char *msgStr = pMsg->pCont; char *msgStr = pMsg->pCont;
...@@ -568,15 +589,15 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { ...@@ -568,15 +589,15 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
uint64_t consumerId = subscribe.consumerId; uint64_t consumerId = subscribe.consumerId;
char *cgroup = subscribe.cgroup; char *cgroup = subscribe.cgroup;
SMqConsumerObj *pConsumerOld = NULL; SMqConsumerObj *pExistedConsumer = NULL;
SMqConsumerObj *pConsumerNew = NULL; SMqConsumerObj *pConsumerNew = NULL;
int32_t code = -1; int32_t code = -1;
SArray *newSub = subscribe.topicNames; SArray *pTopicList = subscribe.topicNames;
taosArraySort(newSub, taosArrayCompareString); taosArraySort(pTopicList, taosArrayCompareString);
taosArrayRemoveDuplicateP(newSub, taosArrayCompareString, taosMemoryFree); taosArrayRemoveDuplicateP(pTopicList, taosArrayCompareString, taosMemoryFree);
int32_t newTopicNum = taosArrayGetSize(newSub); int32_t newTopicNum = taosArrayGetSize(pTopicList);
// check topic existence // check topic existence
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pMsg, "subscribe"); STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pMsg, "subscribe");
...@@ -584,38 +605,24 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { ...@@ -584,38 +605,24 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
goto _over; goto _over;
} }
for (int32_t i = 0; i < newTopicNum; i++) { code = validateTopics(pTopicList, pMnode, pMsg->info.conn.user);
char *topic = taosArrayGetP(newSub, i); if (code != TSDB_CODE_SUCCESS) {
SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic); goto _over;
if (pTopic == NULL) { // terrno has been set by callee function
goto _over;
}
if (mndCheckTopicPrivilege(pMnode, pMsg->info.conn.user, MND_OPER_SUBSCRIBE, pTopic) != 0) {
mndReleaseTopic(pMnode, pTopic);
goto _over;
}
if (mndCheckTopicPrivilege(pMnode, pMsg->info.conn.user, MND_OPER_SUBSCRIBE, pTopic) != 0) {
goto _over;
}
mndReleaseTopic(pMnode, pTopic);
} }
pConsumerOld = mndAcquireConsumer(pMnode, consumerId); pExistedConsumer = mndAcquireConsumer(pMnode, consumerId);
if (pConsumerOld == NULL) { if (pExistedConsumer == NULL) {
mInfo("receive subscribe request from new consumer:%" PRId64, consumerId); mInfo("receive subscribe request from new consumer:0x%" PRIx64" cgroup:%s", consumerId, subscribe.cgroup);
pConsumerNew = tNewSMqConsumerObj(consumerId, cgroup); pConsumerNew = tNewSMqConsumerObj(consumerId, cgroup);
tstrncpy(pConsumerNew->clientId, subscribe.clientId, 256); tstrncpy(pConsumerNew->clientId, subscribe.clientId, 256);
pConsumerNew->updateType = CONSUMER_UPDATE__MODIFY; pConsumerNew->updateType = CONSUMER_UPDATE__MODIFY;
taosArrayDestroy(pConsumerNew->rebNewTopics); taosArrayDestroy(pConsumerNew->rebNewTopics);
pConsumerNew->rebNewTopics = newSub; pConsumerNew->rebNewTopics = pTopicList; // all subscribe topics should re-balance.
subscribe.topicNames = NULL; subscribe.topicNames = NULL;
for (int32_t i = 0; i < newTopicNum; i++) { for (int32_t i = 0; i < newTopicNum; i++) {
char *newTopicCopy = taosStrdup(taosArrayGetP(newSub, i)); char *newTopicCopy = taosStrdup(taosArrayGetP(pTopicList, i));
taosArrayPush(pConsumerNew->assignedTopics, &newTopicCopy); taosArrayPush(pConsumerNew->assignedTopics, &newTopicCopy);
} }
...@@ -623,12 +630,11 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { ...@@ -623,12 +630,11 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
if (mndTransPrepare(pMnode, pTrans) != 0) goto _over; if (mndTransPrepare(pMnode, pTrans) != 0) goto _over;
} else { } else {
/*taosRLockLatch(&pConsumerOld->lock);*/ /*taosRLockLatch(&pExistedConsumer->lock);*/
int32_t status = atomic_load_32(&pExistedConsumer->status);
int32_t status = atomic_load_32(&pConsumerOld->status);
mInfo("receive subscribe request from existing consumer:%" PRId64 ", current status: %s, subscribe topic num: %d", mInfo("receive subscribe request from existed consumer:0x%" PRIx64 " cgroup:%s, current status:%d(%s), subscribe topic num: %d",
consumerId, mndConsumerStatusName(status), newTopicNum); consumerId, subscribe.cgroup, status,mndConsumerStatusName(status), newTopicNum);
if (status != MQ_CONSUMER_STATUS__READY) { if (status != MQ_CONSUMER_STATUS__READY) {
terrno = TSDB_CODE_MND_CONSUMER_NOT_READY; terrno = TSDB_CODE_MND_CONSUMER_NOT_READY;
...@@ -637,36 +643,36 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { ...@@ -637,36 +643,36 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
pConsumerNew = tNewSMqConsumerObj(consumerId, cgroup); pConsumerNew = tNewSMqConsumerObj(consumerId, cgroup);
if (pConsumerNew == NULL) { if (pConsumerNew == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _over; goto _over;
} }
pConsumerNew->updateType = CONSUMER_UPDATE__MODIFY; pConsumerNew->updateType = CONSUMER_UPDATE__MODIFY;
for (int32_t i = 0; i < newTopicNum; i++) { for (int32_t i = 0; i < newTopicNum; i++) {
char *newTopicCopy = taosStrdup(taosArrayGetP(newSub, i)); char *newTopicCopy = taosStrdup(taosArrayGetP(pTopicList, i));
taosArrayPush(pConsumerNew->assignedTopics, &newTopicCopy); taosArrayPush(pConsumerNew->assignedTopics, &newTopicCopy);
} }
int32_t oldTopicNum = 0; int32_t oldTopicNum = 0;
if (pConsumerOld->currentTopics) { if (pExistedConsumer->currentTopics) {
oldTopicNum = taosArrayGetSize(pConsumerOld->currentTopics); oldTopicNum = taosArrayGetSize(pExistedConsumer->currentTopics);
} }
int32_t i = 0, j = 0; int32_t i = 0, j = 0;
while (i < oldTopicNum || j < newTopicNum) { while (i < oldTopicNum || j < newTopicNum) {
if (i >= oldTopicNum) { if (i >= oldTopicNum) {
char *newTopicCopy = taosStrdup(taosArrayGetP(newSub, j)); char *newTopicCopy = taosStrdup(taosArrayGetP(pTopicList, j));
taosArrayPush(pConsumerNew->rebNewTopics, &newTopicCopy); taosArrayPush(pConsumerNew->rebNewTopics, &newTopicCopy);
j++; j++;
continue; continue;
} else if (j >= newTopicNum) { } else if (j >= newTopicNum) {
char *oldTopicCopy = taosStrdup(taosArrayGetP(pConsumerOld->currentTopics, i)); char *oldTopicCopy = taosStrdup(taosArrayGetP(pExistedConsumer->currentTopics, i));
taosArrayPush(pConsumerNew->rebRemovedTopics, &oldTopicCopy); taosArrayPush(pConsumerNew->rebRemovedTopics, &oldTopicCopy);
i++; i++;
continue; continue;
} else { } else {
char *oldTopic = taosArrayGetP(pConsumerOld->currentTopics, i); char *oldTopic = taosArrayGetP(pExistedConsumer->currentTopics, i);
char *newTopic = taosArrayGetP(newSub, j); char *newTopic = taosArrayGetP(pTopicList, j);
int comp = strcmp(oldTopic, newTopic); int comp = strcmp(oldTopic, newTopic);
if (comp == 0) { if (comp == 0) {
i++; i++;
...@@ -686,7 +692,7 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { ...@@ -686,7 +692,7 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
} }
} }
if (pConsumerOld && taosArrayGetSize(pConsumerNew->rebNewTopics) == 0 && if (pExistedConsumer && taosArrayGetSize(pConsumerNew->rebNewTopics) == 0 &&
taosArrayGetSize(pConsumerNew->rebRemovedTopics) == 0) { taosArrayGetSize(pConsumerNew->rebRemovedTopics) == 0) {
/*if (taosArrayGetSize(pConsumerNew->assignedTopics) == 0) {*/ /*if (taosArrayGetSize(pConsumerNew->assignedTopics) == 0) {*/
/*pConsumerNew->updateType = */ /*pConsumerNew->updateType = */
...@@ -703,10 +709,11 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { ...@@ -703,10 +709,11 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
_over: _over:
mndTransDrop(pTrans); mndTransDrop(pTrans);
if (pConsumerOld) { if (pExistedConsumer) {
/*taosRUnLockLatch(&pConsumerOld->lock);*/ /*taosRUnLockLatch(&pExistedConsumer->lock);*/
mndReleaseConsumer(pMnode, pConsumerOld); mndReleaseConsumer(pMnode, pExistedConsumer);
} }
if (pConsumerNew) { if (pConsumerNew) {
tDeleteSMqConsumerObj(pConsumerNew); tDeleteSMqConsumerObj(pConsumerNew);
taosMemoryFree(pConsumerNew); taosMemoryFree(pConsumerNew);
......
...@@ -655,6 +655,7 @@ STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, ETrnConflct conflict, ...@@ -655,6 +655,7 @@ STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, ETrnConflct conflict,
taosArrayPush(pTrans->pRpcArray, &pReq->info); taosArrayPush(pTrans->pRpcArray, &pReq->info);
pTrans->originRpcType = pReq->msgType; pTrans->originRpcType = pReq->msgType;
} }
mTrace("trans:%d, local object is created, data:%p", pTrans->id, pTrans); mTrace("trans:%d, local object is created, data:%p", pTrans->id, pTrans);
return pTrans; return pTrans;
} }
......
...@@ -275,7 +275,7 @@ int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, con ...@@ -275,7 +275,7 @@ int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, con
char buf2[80] = {0}; char buf2[80] = {0};
tFormatOffset(buf1, 80, &pRsp->reqOffset); tFormatOffset(buf1, 80, &pRsp->reqOffset);
tFormatOffset(buf2, 80, &pRsp->rspOffset); tFormatOffset(buf2, 80, &pRsp->rspOffset);
tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) send rsp, block num: %d, reqOffset:%s, rspOffset:%s", tqDebug("vgId:%d consumer:0x%" PRIx64 " (epoch %d), block num:%d, reqOffset:%s, rspOffset:%s",
TD_VID(pTq->pVnode), pReq->consumerId, pReq->epoch, pRsp->blockNum, buf1, buf2); TD_VID(pTq->pVnode), pReq->consumerId, pReq->epoch, pRsp->blockNum, buf1, buf2);
return 0; return 0;
...@@ -336,8 +336,7 @@ int32_t tqSendTaosxRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, co ...@@ -336,8 +336,7 @@ int32_t tqSendTaosxRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, co
char buf2[80] = {0}; char buf2[80] = {0};
tFormatOffset(buf1, 80, &pRsp->reqOffset); tFormatOffset(buf1, 80, &pRsp->reqOffset);
tFormatOffset(buf2, 80, &pRsp->rspOffset); tFormatOffset(buf2, 80, &pRsp->rspOffset);
tqDebug("taosx rsp, vgId:%d, from consumer:0x%" PRIx64 tqDebug("taosx rsp, vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) send rsp, numOfBlks:%d, req:%s, rsp:%s",
" (epoch %d) send rsp, block num: %d, reqOffset:%s, rspOffset:%s",
TD_VID(pTq->pVnode), pReq->consumerId, pReq->epoch, pRsp->blockNum, buf1, buf2); TD_VID(pTq->pVnode), pReq->consumerId, pReq->epoch, pRsp->blockNum, buf1, buf2);
return 0; return 0;
...@@ -496,14 +495,15 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -496,14 +495,15 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
} }
// update epoch if need // update epoch if need
int32_t consumerEpoch = atomic_load_32(&pHandle->epoch); int32_t savedEpoch = atomic_load_32(&pHandle->epoch);
while (consumerEpoch < reqEpoch) { while (savedEpoch < reqEpoch) {
consumerEpoch = atomic_val_compare_exchange_32(&pHandle->epoch, consumerEpoch, reqEpoch); tqDebug("tmq poll: consumer:0x%"PRIx64 " epoch update from %d to %d by poll req", consumerId, savedEpoch, reqEpoch);
savedEpoch = atomic_val_compare_exchange_32(&pHandle->epoch, savedEpoch, reqEpoch);
} }
char buf[80]; char buf[80];
tFormatOffset(buf, 80, &reqOffset); tFormatOffset(buf, 80, &reqOffset);
tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey %s, recv poll req in vg %d, req offset %s", consumerId, tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey %s, recv poll req vgId:%d, req:%s", consumerId,
req.epoch, pHandle->subKey, TD_VID(pTq->pVnode), buf); req.epoch, pHandle->subKey, TD_VID(pTq->pVnode), buf);
// 2.reset offset if needed // 2.reset offset if needed
...@@ -539,7 +539,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -539,7 +539,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
tqInitDataRsp(&dataRsp, &req, pHandle->execHandle.subType); tqInitDataRsp(&dataRsp, &req, pHandle->execHandle.subType);
tqOffsetResetToLog(&dataRsp.rspOffset, walGetLastVer(pTq->pVnode->pWal)); tqOffsetResetToLog(&dataRsp.rspOffset, walGetLastVer(pTq->pVnode->pWal));
tqDebug("tmq poll: consumer:0x %" PRIx64 ", subkey %s, vg %d, offset reset to %" PRId64, consumerId, tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, offset reset to %" PRId64, consumerId,
pHandle->subKey, TD_VID(pTq->pVnode), dataRsp.rspOffset.version); pHandle->subKey, TD_VID(pTq->pVnode), dataRsp.rspOffset.version);
if (tqSendDataRsp(pTq, pMsg, &req, &dataRsp) < 0) { if (tqSendDataRsp(pTq, pMsg, &req, &dataRsp) < 0) {
code = -1; code = -1;
...@@ -576,6 +576,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -576,6 +576,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
} }
#if 1 #if 1
// till now, all data has been rsp to consumer, new data needs to push client once arrived.
if (dataRsp.blockNum == 0 && dataRsp.reqOffset.type == TMQ_OFFSET__LOG && if (dataRsp.blockNum == 0 && dataRsp.reqOffset.type == TMQ_OFFSET__LOG &&
dataRsp.reqOffset.version == dataRsp.rspOffset.version) { dataRsp.reqOffset.version == dataRsp.rspOffset.version) {
STqPushEntry* pPushEntry = taosMemoryCalloc(1, sizeof(STqPushEntry)); STqPushEntry* pPushEntry = taosMemoryCalloc(1, sizeof(STqPushEntry));
...@@ -588,8 +589,9 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -588,8 +589,9 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
pPushEntry->dataRsp.head.epoch = reqEpoch; pPushEntry->dataRsp.head.epoch = reqEpoch;
pPushEntry->dataRsp.head.mqMsgType = TMQ_MSG_TYPE__POLL_RSP; pPushEntry->dataRsp.head.mqMsgType = TMQ_MSG_TYPE__POLL_RSP;
taosHashPut(pTq->pPushMgr, pHandle->subKey, strlen(pHandle->subKey) + 1, &pPushEntry, sizeof(void*)); taosHashPut(pTq->pPushMgr, pHandle->subKey, strlen(pHandle->subKey) + 1, &pPushEntry, sizeof(void*));
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vg %d save handle to push mgr", consumerId,
pHandle->subKey, TD_VID(pTq->pVnode)); tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s offset:%" PRId64 ", vgId:%d save handle to push mgr",
consumerId, pHandle->subKey, dataRsp.reqOffset.version, TD_VID(pTq->pVnode));
// unlock // unlock
taosWUnLockLatch(&pTq->pushLock); taosWUnLockLatch(&pTq->pushLock);
return 0; return 0;
...@@ -602,8 +604,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -602,8 +604,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
code = -1; code = -1;
} }
tqDebug("tmq poll: consumer:0x%" PRIx64 tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, rsp data block:%d, offset type:%d, uid/version:%" PRId64 ", ts:%" PRId64 "",
", subkey %s, vg %d, send data blockNum:%d, offset type:%d, uid/version:%" PRId64 ", ts:%" PRId64 "",
consumerId, pHandle->subKey, TD_VID(pTq->pVnode), dataRsp.blockNum, dataRsp.rspOffset.type, consumerId, pHandle->subKey, TD_VID(pTq->pVnode), dataRsp.blockNum, dataRsp.rspOffset.type,
dataRsp.rspOffset.uid, dataRsp.rspOffset.ts); dataRsp.rspOffset.uid, dataRsp.rspOffset.ts);
...@@ -664,11 +665,11 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -664,11 +665,11 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
walSetReaderCapacity(pHandle->pWalReader, 2048); walSetReaderCapacity(pHandle->pWalReader, 2048);
while (1) { while (1) {
consumerEpoch = atomic_load_32(&pHandle->epoch); savedEpoch = atomic_load_32(&pHandle->epoch);
if (consumerEpoch > reqEpoch) { if (savedEpoch > reqEpoch) {
tqWarn("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey %s, vg %d offset %" PRId64 tqWarn("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey %s, vg %d offset %" PRId64
", found new consumer epoch %d, discard req epoch %d", ", found new consumer epoch %d, discard req epoch %d",
consumerId, req.epoch, pHandle->subKey, TD_VID(pTq->pVnode), fetchVer, consumerEpoch, reqEpoch); consumerId, req.epoch, pHandle->subKey, TD_VID(pTq->pVnode), fetchVer, savedEpoch, reqEpoch);
break; break;
} }
......
...@@ -197,8 +197,8 @@ int32_t tqMetaSaveHandle(STQ* pTq, const char* key, const STqHandle* pHandle) { ...@@ -197,8 +197,8 @@ int32_t tqMetaSaveHandle(STQ* pTq, const char* key, const STqHandle* pHandle) {
return -1; return -1;
} }
tqDebug("tq save %s(%d) consumer %" PRId64 " vgId:%d", pHandle->subKey, (int32_t)strlen(pHandle->subKey), tqDebug("tq save %s(%d) handle consumer:0x%" PRIx64 "epoch:%d vgId:%d", pHandle->subKey,
pHandle->consumerId, TD_VID(pTq->pVnode)); (int32_t)strlen(pHandle->subKey), pHandle->consumerId, pHandle->epoch, TD_VID(pTq->pVnode));
void* buf = taosMemoryCalloc(1, vlen); void* buf = taosMemoryCalloc(1, vlen);
if (buf == NULL) { if (buf == NULL) {
......
...@@ -193,7 +193,7 @@ int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_ ...@@ -193,7 +193,7 @@ int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_
memset(&pHandle->pushHandle.rpcInfo, 0, sizeof(SRpcHandleInfo)); memset(&pHandle->pushHandle.rpcInfo, 0, sizeof(SRpcHandleInfo));
taosWUnLockLatch(&pHandle->pushHandle.lock); taosWUnLockLatch(&pHandle->pushHandle.lock);
tqDebug("vgId:%d, offset %" PRId64 " from consumer:%" PRId64 ", (epoch %d) send rsp, block num: %d, reqOffset:%" PRId64 ", rspOffset:%" PRId64, tqDebug("vgId:%d offset %" PRId64 " from consumer:%" PRId64 ", (epoch %d) send rsp, block num: %d, reqOffset:%" PRId64 ", rspOffset:%" PRId64,
TD_VID(pTq->pVnode), fetchOffset, pHandle->pushHandle.consumerId, pHandle->pushHandle.epoch, rsp.blockNum, TD_VID(pTq->pVnode), fetchOffset, pHandle->pushHandle.consumerId, pHandle->pushHandle.epoch, rsp.blockNum,
rsp.reqOffset, rsp.rspOffset); rsp.reqOffset, rsp.rspOffset);
...@@ -210,14 +210,15 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) ...@@ -210,14 +210,15 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver)
void* pReq = POINTER_SHIFT(msg, sizeof(SSubmitReq2Msg)); void* pReq = POINTER_SHIFT(msg, sizeof(SSubmitReq2Msg));
int32_t len = msgLen - sizeof(SSubmitReq2Msg); int32_t len = msgLen - sizeof(SSubmitReq2Msg);
tqDebug("vgId:%d, tq push msg ver %" PRId64 ", type: %s, p head %p, p body %p, len %d", pTq->pVnode->config.vgId, ver, tqDebug("vgId:%d tq push msg version:%" PRId64 " type: %s, p head %p, p body %p, len %d", pTq->pVnode->config.vgId, ver,
TMSG_INFO(msgType), msg, pReq, len); TMSG_INFO(msgType), msg, pReq, len);
if (msgType == TDMT_VND_SUBMIT) { if (msgType == TDMT_VND_SUBMIT) {
// lock push mgr to avoid potential msg lost // lock push mgr to avoid potential msg lost
taosWLockLatch(&pTq->pushLock); taosWLockLatch(&pTq->pushLock);
tqDebug("vgId:%d, push handle num %d", pTq->pVnode->config.vgId, taosHashGetSize(pTq->pPushMgr));
if (taosHashGetSize(pTq->pPushMgr) != 0) { if (taosHashGetSize(pTq->pPushMgr) != 0) {
tqDebug("vgId:%d, push handle num %d", pTq->pVnode->config.vgId, taosHashGetSize(pTq->pPushMgr));
SArray* cachedKeys = taosArrayInit(0, sizeof(void*)); SArray* cachedKeys = taosArrayInit(0, sizeof(void*));
SArray* cachedKeyLens = taosArrayInit(0, sizeof(size_t)); SArray* cachedKeyLens = taosArrayInit(0, sizeof(size_t));
void* data = taosMemoryMalloc(len); void* data = taosMemoryMalloc(len);
...@@ -241,11 +242,13 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) ...@@ -241,11 +242,13 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver)
tqDebug("vgId:%d, cannot find handle %s", pTq->pVnode->config.vgId, pPushEntry->subKey); tqDebug("vgId:%d, cannot find handle %s", pTq->pVnode->config.vgId, pPushEntry->subKey);
continue; continue;
} }
if (pPushEntry->dataRsp.reqOffset.version >= ver) { if (pPushEntry->dataRsp.reqOffset.version >= ver) {
tqDebug("vgId:%d, push entry req version %" PRId64 ", while push version %" PRId64 ", skip", tqDebug("vgId:%d, push entry req version %" PRId64 ", while push version %" PRId64 ", skip",
pTq->pVnode->config.vgId, pPushEntry->dataRsp.reqOffset.version, ver); pTq->pVnode->config.vgId, pPushEntry->dataRsp.reqOffset.version, ver);
continue; continue;
} }
STqExecHandle* pExec = &pHandle->execHandle; STqExecHandle* pExec = &pHandle->execHandle;
qTaskInfo_t task = pExec->task; qTaskInfo_t task = pExec->task;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册