提交 d6fca036 编写于 作者: H Haojun Liao

refactor: do some internal refactor.

上级 de0f55e8
......@@ -80,15 +80,15 @@ int32_t taosTimeCountInterval(int64_t skey, int64_t ekey, int64_t interval, char
int32_t parseAbsoluteDuration(const char* token, int32_t tokenlen, int64_t* ts, char* unit, int32_t timePrecision);
int32_t parseNatualDuration(const char* token, int32_t tokenLen, int64_t* duration, char* unit, int32_t timePrecision);
int32_t taosParseTime(const char* timestr, int64_t* time, int32_t len, int32_t timePrec, int8_t dayligth);
int32_t taosParseTime(const char* timestr, int64_t* pTime, int32_t len, int32_t timePrec, int8_t dayligth);
void deltaToUtcInitOnce();
char getPrecisionUnit(int32_t precision);
int64_t convertTimePrecision(int64_t time, int32_t fromPrecision, int32_t toPrecision);
int64_t convertTimeFromPrecisionToUnit(int64_t time, int32_t fromPrecision, char toUnit);
int64_t convertTimePrecision(int64_t ts, int32_t fromPrecision, int32_t toPrecision);
int64_t convertTimeFromPrecisionToUnit(int64_t ts, int32_t fromPrecision, char toUnit);
int32_t convertStringToTimestamp(int16_t type, char* inputData, int64_t timePrec, int64_t* timeVal);
void taosFormatUtcTime(char* buf, int32_t bufLen, int64_t time, int32_t precision);
void taosFormatUtcTime(char* buf, int32_t bufLen, int64_t ts, int32_t precision);
#ifdef __cplusplus
}
......
......@@ -68,9 +68,9 @@ static int64_t user_mktime64(const uint32_t year0, const uint32_t mon0, const ui
// ==== mktime() kernel code =================//
static int64_t m_deltaUtc = 0;
void deltaToUtcInitOnce() {
struct tm tm = {0};
(void)taosStrpTime("1970-01-01 00:00:00", (const char*)("%Y-%m-%d %H:%M:%S"), &tm);
m_deltaUtc = (int64_t)taosMktime(&tm);
// printf("====delta:%lld\n\n", seconds);
......
......@@ -557,6 +557,27 @@ int32_t mndSetConsumerCommitLogs(SMnode *pMnode, STrans *pTrans, SMqConsumerObj
return 0;
}
static int32_t validateTopics(const SArray* pTopicList, SMnode* pMnode, const char* pUser) {
int32_t numOfTopics = taosArrayGetSize(pTopicList);
for (int32_t i = 0; i < numOfTopics; i++) {
char *pOneTopic = taosArrayGetP(pTopicList, i);
SMqTopicObj *pTopic = mndAcquireTopic(pMnode, pOneTopic);
if (pTopic == NULL) { // terrno has been set by callee function
return -1;
}
if (mndCheckTopicPrivilege(pMnode, pUser, MND_OPER_SUBSCRIBE, pTopic) != 0) {
mndReleaseTopic(pMnode, pTopic);
return -1;
}
mndReleaseTopic(pMnode, pTopic);
}
return 0;
}
int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
SMnode *pMnode = pMsg->info.node;
char *msgStr = pMsg->pCont;
......@@ -570,11 +591,11 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
SMqConsumerObj *pConsumerNew = NULL;
int32_t code = -1;
SArray *newSub = subscribe.topicNames;
taosArraySort(newSub, taosArrayCompareString);
taosArrayRemoveDuplicateP(newSub, taosArrayCompareString, taosMemoryFree);
SArray *pTopicList = subscribe.topicNames;
taosArraySort(pTopicList, taosArrayCompareString);
taosArrayRemoveDuplicateP(pTopicList, taosArrayCompareString, taosMemoryFree);
int32_t newTopicNum = taosArrayGetSize(newSub);
int32_t newTopicNum = taosArrayGetSize(pTopicList);
// check topic existence
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pMsg, "subscribe");
......@@ -582,34 +603,24 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
goto _over;
}
for (int32_t i = 0; i < newTopicNum; i++) {
char *topic = taosArrayGetP(newSub, i);
SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic);
if (pTopic == NULL) { // terrno has been set by callee function
code = validateTopics(pTopicList, pMnode, pMsg->info.conn.user);
if (code != TSDB_CODE_SUCCESS) {
goto _over;
}
if (mndCheckTopicPrivilege(pMnode, pMsg->info.conn.user, MND_OPER_SUBSCRIBE, pTopic) != 0) {
mndReleaseTopic(pMnode, pTopic);
goto _over;
}
mndReleaseTopic(pMnode, pTopic);
}
pConsumerOld = mndAcquireConsumer(pMnode, consumerId);
if (pConsumerOld == NULL) {
mInfo("receive subscribe request from new consumer:%" PRId64, consumerId);
mInfo("receive subscribe request from new consumer:0x%" PRIx64" cgroup:%s", consumerId, subscribe.cgroup);
pConsumerNew = tNewSMqConsumerObj(consumerId, cgroup);
tstrncpy(pConsumerNew->clientId, subscribe.clientId, 256);
pConsumerNew->updateType = CONSUMER_UPDATE__MODIFY;
taosArrayDestroy(pConsumerNew->rebNewTopics);
pConsumerNew->rebNewTopics = newSub;
pConsumerNew->rebNewTopics = pTopicList; // all subscribe topics should re-balance.
subscribe.topicNames = NULL;
for (int32_t i = 0; i < newTopicNum; i++) {
char *newTopicCopy = strdup(taosArrayGetP(newSub, i));
char *newTopicCopy = strdup(taosArrayGetP(pTopicList, i));
taosArrayPush(pConsumerNew->assignedTopics, &newTopicCopy);
}
......@@ -621,7 +632,7 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
int32_t status = atomic_load_32(&pConsumerOld->status);
mInfo("receive subscribe request from existing consumer:%" PRId64 ", current status: %s, subscribe topic num: %d",
mInfo("receive subscribe request from existing consumer:0x%" PRIx64 ", current status: %s, subscribe topic num: %d",
consumerId, mndConsumerStatusName(status), newTopicNum);
if (status != MQ_CONSUMER_STATUS__READY) {
......@@ -637,7 +648,7 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
pConsumerNew->updateType = CONSUMER_UPDATE__MODIFY;
for (int32_t i = 0; i < newTopicNum; i++) {
char *newTopicCopy = strdup(taosArrayGetP(newSub, i));
char *newTopicCopy = strdup(taosArrayGetP(pTopicList, i));
taosArrayPush(pConsumerNew->assignedTopics, &newTopicCopy);
}
......@@ -649,7 +660,7 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
int32_t i = 0, j = 0;
while (i < oldTopicNum || j < newTopicNum) {
if (i >= oldTopicNum) {
char *newTopicCopy = strdup(taosArrayGetP(newSub, j));
char *newTopicCopy = strdup(taosArrayGetP(pTopicList, j));
taosArrayPush(pConsumerNew->rebNewTopics, &newTopicCopy);
j++;
continue;
......@@ -660,7 +671,7 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
continue;
} else {
char *oldTopic = taosArrayGetP(pConsumerOld->currentTopics, i);
char *newTopic = taosArrayGetP(newSub, j);
char *newTopic = taosArrayGetP(pTopicList, j);
int comp = compareLenPrefixedStr(oldTopic, newTopic);
if (comp == 0) {
i++;
......
......@@ -275,7 +275,7 @@ int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, con
char buf2[80] = {0};
tFormatOffset(buf1, 80, &pRsp->reqOffset);
tFormatOffset(buf2, 80, &pRsp->rspOffset);
tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) send rsp, block num: %d, reqOffset:%s, rspOffset:%s",
tqDebug("vgId:%d consumer:0x%" PRIx64 " (epoch %d), block num:%d, reqOffset:%s, rspOffset:%s",
TD_VID(pTq->pVnode), pReq->consumerId, pReq->epoch, pRsp->blockNum, buf1, buf2);
return 0;
......@@ -334,7 +334,7 @@ int32_t tqSendTaosxRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, co
char buf2[80] = {0};
tFormatOffset(buf1, 80, &pRsp->reqOffset);
tFormatOffset(buf2, 80, &pRsp->rspOffset);
tqDebug("taosx rsp, vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) send rsp, block num: %d, reqOffset:%s, rspOffset:%s",
tqDebug("taosx rsp, vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) send rsp, numOfBlks:%d, req:%s, rsp:%s",
TD_VID(pTq->pVnode), pReq->consumerId, pReq->epoch, pRsp->blockNum, buf1, buf2);
return 0;
......@@ -495,14 +495,15 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
}
// update epoch if need
int32_t consumerEpoch = atomic_load_32(&pHandle->epoch);
while (consumerEpoch < reqEpoch) {
consumerEpoch = atomic_val_compare_exchange_32(&pHandle->epoch, consumerEpoch, reqEpoch);
int32_t savedEpoch = atomic_load_32(&pHandle->epoch);
while (savedEpoch < reqEpoch) {
tqDebug("tmq poll: consumer:0x%"PRIx64 " epoch update from %d to %d by poll req", consumerId, savedEpoch, reqEpoch);
savedEpoch = atomic_val_compare_exchange_32(&pHandle->epoch, savedEpoch, reqEpoch);
}
char buf[80];
tFormatOffset(buf, 80, &reqOffset);
tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey %s, recv poll req in vg %d, req offset %s", consumerId,
tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey %s, recv poll req vgId:%d, req:%s", consumerId,
req.epoch, pHandle->subKey, TD_VID(pTq->pVnode), buf);
// 2.reset offset if needed
......@@ -538,7 +539,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
tqInitDataRsp(&dataRsp, &req, pHandle->execHandle.subType);
tqOffsetResetToLog(&dataRsp.rspOffset, walGetLastVer(pTq->pVnode->pWal));
tqDebug("tmq poll: consumer:0x %" PRIx64 ", subkey %s, vg %d, offset reset to %" PRId64, consumerId,
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, offset reset to %" PRId64, consumerId,
pHandle->subKey, TD_VID(pTq->pVnode), dataRsp.rspOffset.version);
if (tqSendDataRsp(pTq, pMsg, &req, &dataRsp) < 0) {
code = -1;
......@@ -573,6 +574,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
tqScanData(pTq, pHandle, &dataRsp, &fetchOffsetNew);
#if 1
// till now, all data has been rsp to consumer, new data needs to push client once arrived.
if (dataRsp.blockNum == 0 && dataRsp.reqOffset.type == TMQ_OFFSET__LOG &&
dataRsp.reqOffset.version == dataRsp.rspOffset.version) {
STqPushEntry* pPushEntry = taosMemoryCalloc(1, sizeof(STqPushEntry));
......@@ -585,8 +587,9 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
pPushEntry->dataRsp.head.epoch = reqEpoch;
pPushEntry->dataRsp.head.mqMsgType = TMQ_MSG_TYPE__POLL_RSP;
taosHashPut(pTq->pPushMgr, pHandle->subKey, strlen(pHandle->subKey) + 1, &pPushEntry, sizeof(void*));
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vg %d save handle to push mgr", consumerId, pHandle->subKey,
TD_VID(pTq->pVnode));
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s offset:%" PRId64 ", vgId:%d save handle to push mgr",
consumerId, pHandle->subKey, dataRsp.reqOffset.version, TD_VID(pTq->pVnode));
// unlock
taosWUnLockLatch(&pTq->pushLock);
return 0;
......@@ -599,7 +602,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
code = -1;
}
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vg %d, send data blockNum:%d, offset type:%d, uid/version:%" PRId64 ", ts:%" PRId64 "",
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, rsp data block:%d, offset type:%d, uid/version:%" PRId64 ", ts:%" PRId64 "",
consumerId, pHandle->subKey, TD_VID(pTq->pVnode), dataRsp.blockNum, dataRsp.rspOffset.type,
dataRsp.rspOffset.uid, dataRsp.rspOffset.ts);
......@@ -658,11 +661,11 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
walSetReaderCapacity(pHandle->pWalReader, 2048);
while (1) {
consumerEpoch = atomic_load_32(&pHandle->epoch);
if (consumerEpoch > reqEpoch) {
savedEpoch = atomic_load_32(&pHandle->epoch);
if (savedEpoch > reqEpoch) {
tqWarn("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey %s, vg %d offset %" PRId64
", found new consumer epoch %d, discard req epoch %d",
consumerId, req.epoch, pHandle->subKey, TD_VID(pTq->pVnode), fetchVer, consumerEpoch, reqEpoch);
consumerId, req.epoch, pHandle->subKey, TD_VID(pTq->pVnode), fetchVer, savedEpoch, reqEpoch);
break;
}
......
......@@ -195,8 +195,8 @@ int32_t tqMetaSaveHandle(STQ* pTq, const char* key, const STqHandle* pHandle) {
int32_t vlen;
tEncodeSize(tEncodeSTqHandle, pHandle, vlen, code);
tqDebug("tq save %s(%d) consumer %" PRId64 " vgId:%d", pHandle->subKey, (int32_t)strlen(pHandle->subKey),
pHandle->consumerId, TD_VID(pTq->pVnode));
tqDebug("tq save %s(%d) handle consumer:0x%" PRIx64 "epoch:%d vgId:%d", pHandle->subKey,
(int32_t)strlen(pHandle->subKey), pHandle->consumerId, pHandle->epoch, TD_VID(pTq->pVnode));
void* buf = taosMemoryCalloc(1, vlen);
if (buf == NULL) {
......
......@@ -199,7 +199,7 @@ int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_
memset(&pHandle->pushHandle.rpcInfo, 0, sizeof(SRpcHandleInfo));
taosWUnLockLatch(&pHandle->pushHandle.lock);
tqDebug("vgId:%d, offset %" PRId64 " from consumer:%" PRId64 ", (epoch %d) send rsp, block num: %d, reqOffset:%" PRId64 ", rspOffset:%" PRId64,
tqDebug("vgId:%d offset %" PRId64 " from consumer:%" PRId64 ", (epoch %d) send rsp, block num: %d, reqOffset:%" PRId64 ", rspOffset:%" PRId64,
TD_VID(pTq->pVnode), fetchOffset, pHandle->pushHandle.consumerId, pHandle->pushHandle.epoch, rsp.blockNum,
rsp.reqOffset, rsp.rspOffset);
......@@ -213,13 +213,14 @@ int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_
#endif
int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) {
tqDebug("vgId:%d, tq push msg ver %" PRId64 ", type: %s", pTq->pVnode->config.vgId, ver, TMSG_INFO(msgType));
tqDebug("vgId:%d tq push msg version:%" PRId64 " type: %s", pTq->pVnode->config.vgId, ver, TMSG_INFO(msgType));
if (msgType == TDMT_VND_SUBMIT) {
// lock push mgr to avoid potential msg lost
taosWLockLatch(&pTq->pushLock);
tqDebug("vgId:%d, push handle num %d", pTq->pVnode->config.vgId, taosHashGetSize(pTq->pPushMgr));
if (taosHashGetSize(pTq->pPushMgr) != 0) {
tqDebug("vgId:%d, push handle num %d", pTq->pVnode->config.vgId, taosHashGetSize(pTq->pPushMgr));
SArray* cachedKeys = taosArrayInit(0, sizeof(void*));
SArray* cachedKeyLens = taosArrayInit(0, sizeof(size_t));
void* data = taosMemoryMalloc(msgLen);
......@@ -245,11 +246,13 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver)
tqDebug("vgId:%d, cannot find handle %s", pTq->pVnode->config.vgId, pPushEntry->subKey);
continue;
}
if (pPushEntry->dataRsp.reqOffset.version >= ver) {
tqDebug("vgId:%d, push entry req version %" PRId64 ", while push version %" PRId64 ", skip",
pTq->pVnode->config.vgId, pPushEntry->dataRsp.reqOffset.version, ver);
continue;
}
STqExecHandle* pExec = &pHandle->execHandle;
qTaskInfo_t task = pExec->task;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册