diff --git a/include/common/ttime.h b/include/common/ttime.h index eaf44c2771e2ba4512db63bed2f398a2e613bc34..4a7c47d1725dcc3dd1fde1e5c17a021e992156fb 100644 --- a/include/common/ttime.h +++ b/include/common/ttime.h @@ -80,15 +80,15 @@ int32_t taosTimeCountInterval(int64_t skey, int64_t ekey, int64_t interval, char int32_t parseAbsoluteDuration(const char* token, int32_t tokenlen, int64_t* ts, char* unit, int32_t timePrecision); int32_t parseNatualDuration(const char* token, int32_t tokenLen, int64_t* duration, char* unit, int32_t timePrecision); -int32_t taosParseTime(const char* timestr, int64_t* time, int32_t len, int32_t timePrec, int8_t dayligth); +int32_t taosParseTime(const char* timestr, int64_t* pTime, int32_t len, int32_t timePrec, int8_t dayligth); void deltaToUtcInitOnce(); char getPrecisionUnit(int32_t precision); -int64_t convertTimePrecision(int64_t time, int32_t fromPrecision, int32_t toPrecision); -int64_t convertTimeFromPrecisionToUnit(int64_t time, int32_t fromPrecision, char toUnit); +int64_t convertTimePrecision(int64_t ts, int32_t fromPrecision, int32_t toPrecision); +int64_t convertTimeFromPrecisionToUnit(int64_t ts, int32_t fromPrecision, char toUnit); int32_t convertStringToTimestamp(int16_t type, char* inputData, int64_t timePrec, int64_t* timeVal); -void taosFormatUtcTime(char* buf, int32_t bufLen, int64_t time, int32_t precision); +void taosFormatUtcTime(char* buf, int32_t bufLen, int64_t ts, int32_t precision); #ifdef __cplusplus } diff --git a/source/common/src/ttime.c b/source/common/src/ttime.c index 559ffd2aaf24e3628110a6f5418cd7d8094ed397..7996498d451f95df794fab75c644eed044d37c4b 100644 --- a/source/common/src/ttime.c +++ b/source/common/src/ttime.c @@ -68,12 +68,12 @@ static int64_t user_mktime64(const uint32_t year0, const uint32_t mon0, const ui // ==== mktime() kernel code =================// static int64_t m_deltaUtc = 0; -void deltaToUtcInitOnce() { - struct tm tm = {0}; - (void)taosStrpTime("1970-01-01 00:00:00", (const char*)("%Y-%m-%d %H:%M:%S"), &tm); - m_deltaUtc = (int64_t)taosMktime(&tm); - // printf("====delta:%lld\n\n", seconds); +void deltaToUtcInitOnce() { + struct tm tm = {0}; + (void)taosStrpTime("1970-01-01 00:00:00", (const char*)("%Y-%m-%d %H:%M:%S"), &tm); + m_deltaUtc = (int64_t)taosMktime(&tm); + // printf("====delta:%lld\n\n", seconds); } static int64_t parseFraction(char* str, char** end, int32_t timePrec); diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 8fd52d52c68bc26368af321ff7c8f274a46889bb..6621b6a3f9e0efe1536172607441c11e34eeacd6 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -559,6 +559,27 @@ int32_t mndSetConsumerCommitLogs(SMnode *pMnode, STrans *pTrans, SMqConsumerObj return 0; } +static int32_t validateTopics(const SArray* pTopicList, SMnode* pMnode, const char* pUser) { + int32_t numOfTopics = taosArrayGetSize(pTopicList); + + for (int32_t i = 0; i < numOfTopics; i++) { + char *pOneTopic = taosArrayGetP(pTopicList, i); + SMqTopicObj *pTopic = mndAcquireTopic(pMnode, pOneTopic); + if (pTopic == NULL) { // terrno has been set by callee function + return -1; + } + + if (mndCheckTopicPrivilege(pMnode, pUser, MND_OPER_SUBSCRIBE, pTopic) != 0) { + mndReleaseTopic(pMnode, pTopic); + return -1; + } + + mndReleaseTopic(pMnode, pTopic); + } + + return 0; +} + int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { SMnode *pMnode = pMsg->info.node; char *msgStr = pMsg->pCont; @@ -568,15 +589,15 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { uint64_t consumerId = subscribe.consumerId; char *cgroup = subscribe.cgroup; - SMqConsumerObj *pConsumerOld = NULL; + SMqConsumerObj *pExistedConsumer = NULL; SMqConsumerObj *pConsumerNew = NULL; int32_t code = -1; - SArray *newSub = subscribe.topicNames; - taosArraySort(newSub, taosArrayCompareString); - taosArrayRemoveDuplicateP(newSub, taosArrayCompareString, taosMemoryFree); + SArray *pTopicList = subscribe.topicNames; + taosArraySort(pTopicList, taosArrayCompareString); + taosArrayRemoveDuplicateP(pTopicList, taosArrayCompareString, taosMemoryFree); - int32_t newTopicNum = taosArrayGetSize(newSub); + int32_t newTopicNum = taosArrayGetSize(pTopicList); // check topic existence STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pMsg, "subscribe"); @@ -584,38 +605,24 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { goto _over; } - for (int32_t i = 0; i < newTopicNum; i++) { - char *topic = taosArrayGetP(newSub, i); - SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic); - if (pTopic == NULL) { // terrno has been set by callee function - goto _over; - } - - if (mndCheckTopicPrivilege(pMnode, pMsg->info.conn.user, MND_OPER_SUBSCRIBE, pTopic) != 0) { - mndReleaseTopic(pMnode, pTopic); - goto _over; - } - - if (mndCheckTopicPrivilege(pMnode, pMsg->info.conn.user, MND_OPER_SUBSCRIBE, pTopic) != 0) { - goto _over; - } - - mndReleaseTopic(pMnode, pTopic); + code = validateTopics(pTopicList, pMnode, pMsg->info.conn.user); + if (code != TSDB_CODE_SUCCESS) { + goto _over; } - pConsumerOld = mndAcquireConsumer(pMnode, consumerId); - if (pConsumerOld == NULL) { - mInfo("receive subscribe request from new consumer:%" PRId64, consumerId); + pExistedConsumer = mndAcquireConsumer(pMnode, consumerId); + if (pExistedConsumer == NULL) { + mInfo("receive subscribe request from new consumer:0x%" PRIx64" cgroup:%s", consumerId, subscribe.cgroup); pConsumerNew = tNewSMqConsumerObj(consumerId, cgroup); tstrncpy(pConsumerNew->clientId, subscribe.clientId, 256); pConsumerNew->updateType = CONSUMER_UPDATE__MODIFY; taosArrayDestroy(pConsumerNew->rebNewTopics); - pConsumerNew->rebNewTopics = newSub; + pConsumerNew->rebNewTopics = pTopicList; // all subscribe topics should re-balance. subscribe.topicNames = NULL; for (int32_t i = 0; i < newTopicNum; i++) { - char *newTopicCopy = taosStrdup(taosArrayGetP(newSub, i)); + char *newTopicCopy = taosStrdup(taosArrayGetP(pTopicList, i)); taosArrayPush(pConsumerNew->assignedTopics, &newTopicCopy); } @@ -623,12 +630,11 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { if (mndTransPrepare(pMnode, pTrans) != 0) goto _over; } else { - /*taosRLockLatch(&pConsumerOld->lock);*/ - - int32_t status = atomic_load_32(&pConsumerOld->status); + /*taosRLockLatch(&pExistedConsumer->lock);*/ + int32_t status = atomic_load_32(&pExistedConsumer->status); - mInfo("receive subscribe request from existing consumer:%" PRId64 ", current status: %s, subscribe topic num: %d", - consumerId, mndConsumerStatusName(status), newTopicNum); + mInfo("receive subscribe request from existed consumer:0x%" PRIx64 " cgroup:%s, current status:%d(%s), subscribe topic num: %d", + consumerId, subscribe.cgroup, status,mndConsumerStatusName(status), newTopicNum); if (status != MQ_CONSUMER_STATUS__READY) { terrno = TSDB_CODE_MND_CONSUMER_NOT_READY; @@ -637,36 +643,36 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { pConsumerNew = tNewSMqConsumerObj(consumerId, cgroup); if (pConsumerNew == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; goto _over; } + pConsumerNew->updateType = CONSUMER_UPDATE__MODIFY; for (int32_t i = 0; i < newTopicNum; i++) { - char *newTopicCopy = taosStrdup(taosArrayGetP(newSub, i)); + char *newTopicCopy = taosStrdup(taosArrayGetP(pTopicList, i)); taosArrayPush(pConsumerNew->assignedTopics, &newTopicCopy); } int32_t oldTopicNum = 0; - if (pConsumerOld->currentTopics) { - oldTopicNum = taosArrayGetSize(pConsumerOld->currentTopics); + if (pExistedConsumer->currentTopics) { + oldTopicNum = taosArrayGetSize(pExistedConsumer->currentTopics); } int32_t i = 0, j = 0; while (i < oldTopicNum || j < newTopicNum) { if (i >= oldTopicNum) { - char *newTopicCopy = taosStrdup(taosArrayGetP(newSub, j)); + char *newTopicCopy = taosStrdup(taosArrayGetP(pTopicList, j)); taosArrayPush(pConsumerNew->rebNewTopics, &newTopicCopy); j++; continue; } else if (j >= newTopicNum) { - char *oldTopicCopy = taosStrdup(taosArrayGetP(pConsumerOld->currentTopics, i)); + char *oldTopicCopy = taosStrdup(taosArrayGetP(pExistedConsumer->currentTopics, i)); taosArrayPush(pConsumerNew->rebRemovedTopics, &oldTopicCopy); i++; continue; } else { - char *oldTopic = taosArrayGetP(pConsumerOld->currentTopics, i); - char *newTopic = taosArrayGetP(newSub, j); + char *oldTopic = taosArrayGetP(pExistedConsumer->currentTopics, i); + char *newTopic = taosArrayGetP(pTopicList, j); int comp = strcmp(oldTopic, newTopic); if (comp == 0) { i++; @@ -686,7 +692,7 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { } } - if (pConsumerOld && taosArrayGetSize(pConsumerNew->rebNewTopics) == 0 && + if (pExistedConsumer && taosArrayGetSize(pConsumerNew->rebNewTopics) == 0 && taosArrayGetSize(pConsumerNew->rebRemovedTopics) == 0) { /*if (taosArrayGetSize(pConsumerNew->assignedTopics) == 0) {*/ /*pConsumerNew->updateType = */ @@ -703,10 +709,11 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { _over: mndTransDrop(pTrans); - if (pConsumerOld) { - /*taosRUnLockLatch(&pConsumerOld->lock);*/ - mndReleaseConsumer(pMnode, pConsumerOld); + if (pExistedConsumer) { + /*taosRUnLockLatch(&pExistedConsumer->lock);*/ + mndReleaseConsumer(pMnode, pExistedConsumer); } + if (pConsumerNew) { tDeleteSMqConsumerObj(pConsumerNew); taosMemoryFree(pConsumerNew); diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index cfe7b643a90c4b492738aaa64930f7b51788df10..a34dfff4d6ec679e9ddd450df4efa465ed3fc7eb 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -655,6 +655,7 @@ STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, ETrnConflct conflict, taosArrayPush(pTrans->pRpcArray, &pReq->info); pTrans->originRpcType = pReq->msgType; } + mTrace("trans:%d, local object is created, data:%p", pTrans->id, pTrans); return pTrans; } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 284d2129cf9de8e81717a95d06759d77683e4a19..c9bbb30380a23be201eea3969b0b09beb580b0ed 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -275,7 +275,7 @@ int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, con char buf2[80] = {0}; tFormatOffset(buf1, 80, &pRsp->reqOffset); tFormatOffset(buf2, 80, &pRsp->rspOffset); - tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) send rsp, block num: %d, reqOffset:%s, rspOffset:%s", + tqDebug("vgId:%d consumer:0x%" PRIx64 " (epoch %d), block num:%d, reqOffset:%s, rspOffset:%s", TD_VID(pTq->pVnode), pReq->consumerId, pReq->epoch, pRsp->blockNum, buf1, buf2); return 0; @@ -336,8 +336,7 @@ int32_t tqSendTaosxRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, co char buf2[80] = {0}; tFormatOffset(buf1, 80, &pRsp->reqOffset); tFormatOffset(buf2, 80, &pRsp->rspOffset); - tqDebug("taosx rsp, vgId:%d, from consumer:0x%" PRIx64 - " (epoch %d) send rsp, block num: %d, reqOffset:%s, rspOffset:%s", + tqDebug("taosx rsp, vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) send rsp, numOfBlks:%d, req:%s, rsp:%s", TD_VID(pTq->pVnode), pReq->consumerId, pReq->epoch, pRsp->blockNum, buf1, buf2); return 0; @@ -496,14 +495,15 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { } // update epoch if need - int32_t consumerEpoch = atomic_load_32(&pHandle->epoch); - while (consumerEpoch < reqEpoch) { - consumerEpoch = atomic_val_compare_exchange_32(&pHandle->epoch, consumerEpoch, reqEpoch); + int32_t savedEpoch = atomic_load_32(&pHandle->epoch); + while (savedEpoch < reqEpoch) { + tqDebug("tmq poll: consumer:0x%"PRIx64 " epoch update from %d to %d by poll req", consumerId, savedEpoch, reqEpoch); + savedEpoch = atomic_val_compare_exchange_32(&pHandle->epoch, savedEpoch, reqEpoch); } char buf[80]; tFormatOffset(buf, 80, &reqOffset); - tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey %s, recv poll req in vg %d, req offset %s", consumerId, + tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey %s, recv poll req vgId:%d, req:%s", consumerId, req.epoch, pHandle->subKey, TD_VID(pTq->pVnode), buf); // 2.reset offset if needed @@ -539,7 +539,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { tqInitDataRsp(&dataRsp, &req, pHandle->execHandle.subType); tqOffsetResetToLog(&dataRsp.rspOffset, walGetLastVer(pTq->pVnode->pWal)); - tqDebug("tmq poll: consumer:0x %" PRIx64 ", subkey %s, vg %d, offset reset to %" PRId64, consumerId, + tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, offset reset to %" PRId64, consumerId, pHandle->subKey, TD_VID(pTq->pVnode), dataRsp.rspOffset.version); if (tqSendDataRsp(pTq, pMsg, &req, &dataRsp) < 0) { code = -1; @@ -576,6 +576,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { } #if 1 + // till now, all data has been rsp to consumer, new data needs to push client once arrived. if (dataRsp.blockNum == 0 && dataRsp.reqOffset.type == TMQ_OFFSET__LOG && dataRsp.reqOffset.version == dataRsp.rspOffset.version) { STqPushEntry* pPushEntry = taosMemoryCalloc(1, sizeof(STqPushEntry)); @@ -588,8 +589,9 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { pPushEntry->dataRsp.head.epoch = reqEpoch; pPushEntry->dataRsp.head.mqMsgType = TMQ_MSG_TYPE__POLL_RSP; taosHashPut(pTq->pPushMgr, pHandle->subKey, strlen(pHandle->subKey) + 1, &pPushEntry, sizeof(void*)); - tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vg %d save handle to push mgr", consumerId, - pHandle->subKey, TD_VID(pTq->pVnode)); + + tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s offset:%" PRId64 ", vgId:%d save handle to push mgr", + consumerId, pHandle->subKey, dataRsp.reqOffset.version, TD_VID(pTq->pVnode)); // unlock taosWUnLockLatch(&pTq->pushLock); return 0; @@ -602,8 +604,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { code = -1; } - tqDebug("tmq poll: consumer:0x%" PRIx64 - ", subkey %s, vg %d, send data blockNum:%d, offset type:%d, uid/version:%" PRId64 ", ts:%" PRId64 "", + tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, rsp data block:%d, offset type:%d, uid/version:%" PRId64 ", ts:%" PRId64 "", consumerId, pHandle->subKey, TD_VID(pTq->pVnode), dataRsp.blockNum, dataRsp.rspOffset.type, dataRsp.rspOffset.uid, dataRsp.rspOffset.ts); @@ -664,11 +665,11 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { walSetReaderCapacity(pHandle->pWalReader, 2048); while (1) { - consumerEpoch = atomic_load_32(&pHandle->epoch); - if (consumerEpoch > reqEpoch) { + savedEpoch = atomic_load_32(&pHandle->epoch); + if (savedEpoch > reqEpoch) { tqWarn("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey %s, vg %d offset %" PRId64 ", found new consumer epoch %d, discard req epoch %d", - consumerId, req.epoch, pHandle->subKey, TD_VID(pTq->pVnode), fetchVer, consumerEpoch, reqEpoch); + consumerId, req.epoch, pHandle->subKey, TD_VID(pTq->pVnode), fetchVer, savedEpoch, reqEpoch); break; } diff --git a/source/dnode/vnode/src/tq/tqMeta.c b/source/dnode/vnode/src/tq/tqMeta.c index c505a7d0ae58fe41e3cb663a42789e5b57de3b36..12fd15c63a5088a8da2ceb4de9b57f653976bc8a 100644 --- a/source/dnode/vnode/src/tq/tqMeta.c +++ b/source/dnode/vnode/src/tq/tqMeta.c @@ -197,8 +197,8 @@ int32_t tqMetaSaveHandle(STQ* pTq, const char* key, const STqHandle* pHandle) { return -1; } - tqDebug("tq save %s(%d) consumer %" PRId64 " vgId:%d", pHandle->subKey, (int32_t)strlen(pHandle->subKey), - pHandle->consumerId, TD_VID(pTq->pVnode)); + tqDebug("tq save %s(%d) handle consumer:0x%" PRIx64 "epoch:%d vgId:%d", pHandle->subKey, + (int32_t)strlen(pHandle->subKey), pHandle->consumerId, pHandle->epoch, TD_VID(pTq->pVnode)); void* buf = taosMemoryCalloc(1, vlen); if (buf == NULL) { diff --git a/source/dnode/vnode/src/tq/tqPush.c b/source/dnode/vnode/src/tq/tqPush.c index cc1f147ac28b3d6722ca1e132fd0191d3bef41c4..e74726227794b2aab7072e81203e1747f2eb4ea7 100644 --- a/source/dnode/vnode/src/tq/tqPush.c +++ b/source/dnode/vnode/src/tq/tqPush.c @@ -193,7 +193,7 @@ int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_ memset(&pHandle->pushHandle.rpcInfo, 0, sizeof(SRpcHandleInfo)); taosWUnLockLatch(&pHandle->pushHandle.lock); - tqDebug("vgId:%d, offset %" PRId64 " from consumer:%" PRId64 ", (epoch %d) send rsp, block num: %d, reqOffset:%" PRId64 ", rspOffset:%" PRId64, + tqDebug("vgId:%d offset %" PRId64 " from consumer:%" PRId64 ", (epoch %d) send rsp, block num: %d, reqOffset:%" PRId64 ", rspOffset:%" PRId64, TD_VID(pTq->pVnode), fetchOffset, pHandle->pushHandle.consumerId, pHandle->pushHandle.epoch, rsp.blockNum, rsp.reqOffset, rsp.rspOffset); @@ -210,14 +210,15 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) void* pReq = POINTER_SHIFT(msg, sizeof(SSubmitReq2Msg)); int32_t len = msgLen - sizeof(SSubmitReq2Msg); - tqDebug("vgId:%d, tq push msg ver %" PRId64 ", type: %s, p head %p, p body %p, len %d", pTq->pVnode->config.vgId, ver, + tqDebug("vgId:%d tq push msg version:%" PRId64 " type: %s, p head %p, p body %p, len %d", pTq->pVnode->config.vgId, ver, TMSG_INFO(msgType), msg, pReq, len); if (msgType == TDMT_VND_SUBMIT) { // lock push mgr to avoid potential msg lost taosWLockLatch(&pTq->pushLock); - tqDebug("vgId:%d, push handle num %d", pTq->pVnode->config.vgId, taosHashGetSize(pTq->pPushMgr)); if (taosHashGetSize(pTq->pPushMgr) != 0) { + + tqDebug("vgId:%d, push handle num %d", pTq->pVnode->config.vgId, taosHashGetSize(pTq->pPushMgr)); SArray* cachedKeys = taosArrayInit(0, sizeof(void*)); SArray* cachedKeyLens = taosArrayInit(0, sizeof(size_t)); void* data = taosMemoryMalloc(len); @@ -241,11 +242,13 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) tqDebug("vgId:%d, cannot find handle %s", pTq->pVnode->config.vgId, pPushEntry->subKey); continue; } + if (pPushEntry->dataRsp.reqOffset.version >= ver) { tqDebug("vgId:%d, push entry req version %" PRId64 ", while push version %" PRId64 ", skip", pTq->pVnode->config.vgId, pPushEntry->dataRsp.reqOffset.version, ver); continue; } + STqExecHandle* pExec = &pHandle->execHandle; qTaskInfo_t task = pExec->task; diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 366b026f8d2af86643d218f1a74ea484844b5b84..a158bbfa8dac8d31c61015b72128b664caca622e 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -155,6 +155,7 @@ typedef struct SBlockInfoBuf { int32_t currentIndex; SArray* pData; int32_t numPerBucket; + int32_t numOfTables; } SBlockInfoBuf; struct STsdbReader { @@ -302,6 +303,47 @@ static int32_t initBlockScanInfoBuf(SBlockInfoBuf* pBuf, int32_t numOfTables) { taosArrayPush(pBuf->pData, &p); } + pBuf->numOfTables = numOfTables; + + return TSDB_CODE_SUCCESS; +} + +static int32_t ensureBlockScanInfoBuf(SBlockInfoBuf* pBuf, int32_t numOfTables) { + if (numOfTables <= pBuf->numOfTables) { + return TSDB_CODE_SUCCESS; + } + + if (pBuf->numOfTables > 0) { + STableBlockScanInfo **p = (STableBlockScanInfo**)taosArrayPop(pBuf->pData); + taosMemoryFree(*p); + pBuf->numOfTables /= pBuf->numPerBucket; + } + + int32_t num = (numOfTables - pBuf->numOfTables) / pBuf->numPerBucket; + int32_t remainder = (numOfTables - pBuf->numOfTables) % pBuf->numPerBucket; + if (pBuf->pData == NULL) { + pBuf->pData = taosArrayInit(num + 1, POINTER_BYTES); + } + + for (int32_t i = 0; i < num; ++i) { + char* p = taosMemoryCalloc(pBuf->numPerBucket, sizeof(STableBlockScanInfo)); + if (p == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + taosArrayPush(pBuf->pData, &p); + } + + if (remainder > 0) { + char* p = taosMemoryCalloc(remainder, sizeof(STableBlockScanInfo)); + if (p == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + taosArrayPush(pBuf->pData, &p); + } + + pBuf->numOfTables = numOfTables; + return TSDB_CODE_SUCCESS; } @@ -3876,8 +3918,13 @@ int32_t tsdbSetTableList(STsdbReader* pReader, const void* pTableList, int32_t n clearBlockScanInfo(*p); } - // todo handle the case where size is less than the value of num - ASSERT(size >= num); + if (size < num) { + int32_t code = ensureBlockScanInfoBuf(&pReader->blockInfoBuf, num); + if (code) { + return code; + } + pReader->status.uidList.tableUidList = (uint64_t*)taosMemoryRealloc(pReader->status.uidList.tableUidList, sizeof(uint64_t) * num); + } taosHashClear(pReader->status.pTableMap); STableUidList* pUidList = &pReader->status.uidList; diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index 797983bbbda40c0d67eb3c490e039ed598e4a263..d708227cbac0ec83e0667c1b8ace43507de399bf 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -183,6 +183,7 @@ ,,y,script,./test.sh -f tsim/query/event.sim ,,y,script,./test.sh -f tsim/query/forceFill.sim ,,y,script,./test.sh -f tsim/query/emptyTsRange.sim +,,y,script,./test.sh -f tsim/query/partitionby.sim ,,y,script,./test.sh -f tsim/qnode/basic1.sim ,,y,script,./test.sh -f tsim/snode/basic1.sim ,,y,script,./test.sh -f tsim/mnode/basic1.sim diff --git a/tests/script/tsim/query/partitionby.sim b/tests/script/tsim/query/partitionby.sim new file mode 100644 index 0000000000000000000000000000000000000000..8babd1aa8de8e0ad943312e6712d297c3c5242b4 --- /dev/null +++ b/tests/script/tsim/query/partitionby.sim @@ -0,0 +1,39 @@ +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 +system sh/exec.sh -n dnode1 -s start +sql connect + +$dbPrefix = db +$tbPrefix1 = tba +$tbPrefix2 = tbb +$mtPrefix = stb +$tbNum = 10 +$rowNum = 2 + +print =============== step1 +$i = 0 +$db = $dbPrefix . $i +$mt1 = $mtPrefix . $i +$i = 1 +$mt2 = $mtPrefix . $i + +sql drop database $db -x step1 +step1: +sql create database $db vgroups 3 +sql use $db +sql create table $mt1 (ts timestamp, f1 int) TAGS(tag1 int, tag2 binary(500)) +sql create table tb0 using $mt1 tags(0, 'a'); +sql create table tb1 using $mt1 tags(1, 'b'); +sql create table tb2 using $mt1 tags(1, 'a'); +sql create table tb3 using $mt1 tags(1, 'a'); +sql create table tb4 using $mt1 tags(3, 'b'); +sql create table tb5 using $mt1 tags(3, 'a'); +sql create table tb6 using $mt1 tags(3, 'b'); +sql create table tb7 using $mt1 tags(3, 'b'); + +sql select * from $mt1 partition by tag1,tag2 limit 1; +if $rows != 0 then + return -1 +endi + +system sh/exec.sh -n dnode1 -s stop -x SIGINT