提交 3e4125ae 编写于 作者: S Shengliang Guan

Merge remote-tracking branch 'origin/main' into fix/TD-21446

...@@ -91,6 +91,7 @@ typedef struct SCommitInfo SCommitInfo; ...@@ -91,6 +91,7 @@ typedef struct SCommitInfo SCommitInfo;
// vnd.h // vnd.h
void* vnodeBufPoolMalloc(SVBufPool* pPool, int size); void* vnodeBufPoolMalloc(SVBufPool* pPool, int size);
void* vnodeBufPoolMallocAligned(SVBufPool* pPool, int size);
void vnodeBufPoolFree(SVBufPool* pPool, void* p); void vnodeBufPoolFree(SVBufPool* pPool, void* p);
void vnodeBufPoolRef(SVBufPool* pPool); void vnodeBufPoolRef(SVBufPool* pPool);
void vnodeBufPoolUnRef(SVBufPool* pPool); void vnodeBufPoolUnRef(SVBufPool* pPool);
......
...@@ -54,6 +54,7 @@ struct SMetaCache { ...@@ -54,6 +54,7 @@ struct SMetaCache {
// query cache // query cache
struct STagFilterResCache { struct STagFilterResCache {
TdThreadMutex lock;
SHashObj* pTableEntry; SHashObj* pTableEntry;
SLRUCache* pUidResCache; SLRUCache* pUidResCache;
uint64_t keyBuf[3]; uint64_t keyBuf[3];
...@@ -140,6 +141,8 @@ int32_t metaCacheOpen(SMeta* pMeta) { ...@@ -140,6 +141,8 @@ int32_t metaCacheOpen(SMeta* pMeta) {
} }
taosHashSetFreeFp(pCache->sTagFilterResCache.pTableEntry, freeCacheEntryFp); taosHashSetFreeFp(pCache->sTagFilterResCache.pTableEntry, freeCacheEntryFp);
taosThreadMutexInit(&pCache->sTagFilterResCache.lock, NULL);
pMeta->pCache = pCache; pMeta->pCache = pCache;
return code; return code;
...@@ -159,6 +162,8 @@ void metaCacheClose(SMeta* pMeta) { ...@@ -159,6 +162,8 @@ void metaCacheClose(SMeta* pMeta) {
taosHashCleanup(pMeta->pCache->sTagFilterResCache.pTableEntry); taosHashCleanup(pMeta->pCache->sTagFilterResCache.pTableEntry);
taosLRUCacheCleanup(pMeta->pCache->sTagFilterResCache.pUidResCache); taosLRUCacheCleanup(pMeta->pCache->sTagFilterResCache.pUidResCache);
taosThreadMutexDestroy(&pMeta->pCache->sTagFilterResCache.lock);
taosMemoryFree(pMeta->pCache); taosMemoryFree(pMeta->pCache);
pMeta->pCache = NULL; pMeta->pCache = NULL;
} }
...@@ -422,63 +427,78 @@ int32_t metaStatsCacheGet(SMeta* pMeta, int64_t uid, SMetaStbStats* pInfo) { ...@@ -422,63 +427,78 @@ int32_t metaStatsCacheGet(SMeta* pMeta, int64_t uid, SMetaStbStats* pInfo) {
int32_t metaGetCachedTableUidList(SMeta* pMeta, tb_uid_t suid, const uint8_t* pKey, int32_t keyLen, SArray* pList1, int32_t metaGetCachedTableUidList(SMeta* pMeta, tb_uid_t suid, const uint8_t* pKey, int32_t keyLen, SArray* pList1,
bool* acquireRes) { bool* acquireRes) {
uint64_t* pBuf = pMeta->pCache->sTagFilterResCache.keyBuf;
// generate the composed key for LRU cache // generate the composed key for LRU cache
SLRUCache* pCache = pMeta->pCache->sTagFilterResCache.pUidResCache; SLRUCache* pCache = pMeta->pCache->sTagFilterResCache.pUidResCache;
uint64_t* pBuf = pMeta->pCache->sTagFilterResCache.keyBuf;
SHashObj* pTableMap = pMeta->pCache->sTagFilterResCache.pTableEntry;
TdThreadMutex* pLock = &pMeta->pCache->sTagFilterResCache.lock;
uint32_t times = 0;
*acquireRes = 0;
pBuf[0] = suid; pBuf[0] = suid;
memcpy(&pBuf[1], pKey, keyLen); memcpy(&pBuf[1], pKey, keyLen);
taosThreadMutexLock(pLock);
int32_t len = keyLen + sizeof(uint64_t); int32_t len = keyLen + sizeof(uint64_t);
LRUHandle* pHandle = taosLRUCacheLookup(pCache, pBuf, len); LRUHandle* pHandle = taosLRUCacheLookup(pCache, pBuf, len);
if (pHandle == NULL) { if (pHandle == NULL) {
*acquireRes = 0; taosThreadMutexUnlock(pLock);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} else { // do some book mark work after acquiring the filter result from cache }
STagFilterResEntry** pEntry = taosHashGet(pMeta->pCache->sTagFilterResCache.pTableEntry, &suid, sizeof(uint64_t));
ASSERT(pEntry != NULL);
*acquireRes = 1;
const char* p = taosLRUCacheValue(pMeta->pCache->sTagFilterResCache.pUidResCache, pHandle);
int32_t size = *(int32_t*)p;
taosArrayAddBatch(pList1, p + sizeof(int32_t), size);
(*pEntry)->qTimes += 1;
taosLRUCacheRelease(pCache, pHandle, false);
// check if scanning all items are necessary or not
if ((*pEntry)->qTimes >= 5000 && TD_DLIST_NELES(&(*pEntry)->list) > 10) {
SArray* pList = taosArrayInit(64, POINTER_BYTES);
SListIter iter = {0};
tdListInitIter(&(*pEntry)->list, &iter, TD_LIST_FORWARD);
SListNode* pNode = NULL;
while ((pNode = tdListNext(&iter)) != NULL) {
memcpy(&pBuf[1], pNode->data, keyLen);
// check whether it is existed in LRU cache, and remove it from linked list if not.
LRUHandle* pRes = taosLRUCacheLookup(pCache, pBuf, len);
if (pRes == NULL) { // remove the item in the linked list
taosArrayPush(pList, &pNode);
} else {
taosLRUCacheRelease(pCache, pRes, false);
}
}
// remove the keys, of which query uid lists have been replaced already. // do some book mark work after acquiring the filter result from cache
size_t s = taosArrayGetSize(pList); STagFilterResEntry** pEntry = taosHashGet(pTableMap, &suid, sizeof(uint64_t));
for (int32_t i = 0; i < s; ++i) { ASSERT(pEntry != NULL);
SListNode** p1 = taosArrayGet(pList, i); *acquireRes = 1;
tdListPopNode(&(*pEntry)->list, *p1);
taosMemoryFree(*p1);
}
(*pEntry)->qTimes = 0; // reset the query times const char* p = taosLRUCacheValue(pCache, pHandle);
int32_t size = *(int32_t*)p;
taosArrayDestroy(pList); // set the result into the buffer
taosArrayAddBatch(pList1, p + sizeof(int32_t), size);
times = atomic_add_fetch_32(&(*pEntry)->qTimes, 1);
taosLRUCacheRelease(pCache, pHandle, false);
// unlock meta
taosThreadMutexUnlock(pLock);
// check if scanning all items are necessary or not
if (times >= 5000 && TD_DLIST_NELES(&(*pEntry)->list) > 10) {
taosThreadMutexLock(pLock);
SArray* pInvalidRes = taosArrayInit(64, POINTER_BYTES);
SListIter iter = {0};
tdListInitIter(&(*pEntry)->list, &iter, TD_LIST_FORWARD);
SListNode* pNode = NULL;
while ((pNode = tdListNext(&iter)) != NULL) {
memcpy(&pBuf[1], pNode->data, keyLen);
// check whether it is existed in LRU cache, and remove it from linked list if not.
LRUHandle* pRes = taosLRUCacheLookup(pCache, pBuf, len);
if (pRes == NULL) { // remove the item in the linked list
taosArrayPush(pInvalidRes, &pNode);
} else {
taosLRUCacheRelease(pCache, pRes, false);
}
} }
// remove the keys, of which query uid lists have been replaced already.
size_t s = taosArrayGetSize(pInvalidRes);
for (int32_t i = 0; i < s; ++i) {
SListNode** p1 = taosArrayGet(pInvalidRes, i);
tdListPopNode(&(*pEntry)->list, *p1);
taosMemoryFree(*p1);
}
atomic_store_32(&(*pEntry)->qTimes, 0); // reset the query times
taosArrayDestroy(pInvalidRes);
taosThreadMutexUnlock(pLock);
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -510,8 +530,11 @@ int32_t metaUidFilterCachePut(SMeta* pMeta, uint64_t suid, const void* pKey, int ...@@ -510,8 +530,11 @@ int32_t metaUidFilterCachePut(SMeta* pMeta, uint64_t suid, const void* pKey, int
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
SLRUCache* pCache = pMeta->pCache->sTagFilterResCache.pUidResCache; SLRUCache* pCache = pMeta->pCache->sTagFilterResCache.pUidResCache;
SHashObj* pTableEntry = pMeta->pCache->sTagFilterResCache.pTableEntry; SHashObj* pTableEntry = pMeta->pCache->sTagFilterResCache.pTableEntry;
TdThreadMutex* pLock = &pMeta->pCache->sTagFilterResCache.lock;
taosThreadMutexLock(pLock);
STagFilterResEntry** pEntry = taosHashGet(pTableEntry, &suid, sizeof(uint64_t)); STagFilterResEntry** pEntry = taosHashGet(pTableEntry, &suid, sizeof(uint64_t));
if (pEntry == NULL) { if (pEntry == NULL) {
...@@ -533,6 +556,9 @@ int32_t metaUidFilterCachePut(SMeta* pMeta, uint64_t suid, const void* pKey, int ...@@ -533,6 +556,9 @@ int32_t metaUidFilterCachePut(SMeta* pMeta, uint64_t suid, const void* pKey, int
// add to cache. // add to cache.
taosLRUCacheInsert(pCache, pBuf, sizeof(uint64_t) + keyLen, pPayload, payloadLen, freePayload, NULL, taosLRUCacheInsert(pCache, pBuf, sizeof(uint64_t) + keyLen, pPayload, payloadLen, freePayload, NULL,
TAOS_LRU_PRIORITY_LOW); TAOS_LRU_PRIORITY_LOW);
taosThreadMutexUnlock(pLock);
metaDebug("vgId:%d, suid:%" PRIu64 " list cache added into cache, total:%d, tables:%d", TD_VID(pMeta->pVnode), suid, metaDebug("vgId:%d, suid:%" PRIu64 " list cache added into cache, total:%d, tables:%d", TD_VID(pMeta->pVnode), suid,
(int32_t)taosLRUCacheGetUsage(pCache), taosHashGetSize(pTableEntry)); (int32_t)taosLRUCacheGetUsage(pCache), taosHashGetSize(pTableEntry));
...@@ -541,15 +567,19 @@ int32_t metaUidFilterCachePut(SMeta* pMeta, uint64_t suid, const void* pKey, int ...@@ -541,15 +567,19 @@ int32_t metaUidFilterCachePut(SMeta* pMeta, uint64_t suid, const void* pKey, int
// remove the lru cache that are expired due to the tags value update, or creating, or dropping, of child tables // remove the lru cache that are expired due to the tags value update, or creating, or dropping, of child tables
int32_t metaUidCacheClear(SMeta* pMeta, uint64_t suid) { int32_t metaUidCacheClear(SMeta* pMeta, uint64_t suid) {
int32_t keyLen = sizeof(uint64_t) * 3;
uint64_t p[3] = {0};
p[0] = suid;
TdThreadMutex* pLock = &pMeta->pCache->sTagFilterResCache.lock;
taosThreadMutexLock(pLock);
STagFilterResEntry** pEntry = taosHashGet(pMeta->pCache->sTagFilterResCache.pTableEntry, &suid, sizeof(uint64_t)); STagFilterResEntry** pEntry = taosHashGet(pMeta->pCache->sTagFilterResCache.pTableEntry, &suid, sizeof(uint64_t));
if (pEntry == NULL || listNEles(&(*pEntry)->list) == 0) { if (pEntry == NULL || listNEles(&(*pEntry)->list) == 0) {
taosThreadMutexUnlock(pLock);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t keyLen = sizeof(uint64_t) * 3;
uint64_t p[3] = {0};
p[0] = suid;
SListIter iter = {0}; SListIter iter = {0};
tdListInitIter(&(*pEntry)->list, &iter, TD_LIST_FORWARD); tdListInitIter(&(*pEntry)->list, &iter, TD_LIST_FORWARD);
...@@ -562,5 +592,6 @@ int32_t metaUidCacheClear(SMeta* pMeta, uint64_t suid) { ...@@ -562,5 +592,6 @@ int32_t metaUidCacheClear(SMeta* pMeta, uint64_t suid) {
(*pEntry)->qTimes = 0; (*pEntry)->qTimes = 0;
tdListEmpty(&(*pEntry)->list); tdListEmpty(&(*pEntry)->list);
taosThreadMutexUnlock(pLock);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -15,8 +15,10 @@ ...@@ -15,8 +15,10 @@
#include "meta.h" #include "meta.h"
static FORCE_INLINE void *metaMalloc(void *pPool, size_t size) { return vnodeBufPoolMalloc((SVBufPool *)pPool, size); } static FORCE_INLINE void *metaMalloc(void *pPool, size_t size) {
static FORCE_INLINE void metaFree(void *pPool, void *p) { vnodeBufPoolFree((SVBufPool *)pPool, p); } return vnodeBufPoolMallocAligned((SVBufPool *)pPool, size);
}
static FORCE_INLINE void metaFree(void *pPool, void *p) { vnodeBufPoolFree((SVBufPool *)pPool, p); }
// begin a meta txn // begin a meta txn
int metaBegin(SMeta *pMeta, int8_t heap) { int metaBegin(SMeta *pMeta, int8_t heap) {
......
...@@ -158,9 +158,9 @@ static SBlockData *loadLastBlock(SLDataIter *pIter, const char *idStr) { ...@@ -158,9 +158,9 @@ static SBlockData *loadLastBlock(SLDataIter *pIter, const char *idStr) {
pBlock, el, idStr); pBlock, el, idStr);
pInfo->blockIndex[pInfo->currentLoadBlockIndex] = pIter->iSttBlk; pInfo->blockIndex[pInfo->currentLoadBlockIndex] = pIter->iSttBlk;
tsdbDebug("last block index list:%d, %d, %s", pInfo->blockIndex[0], pInfo->blockIndex[1], idStr);
pIter->iRow = (pIter->backward) ? pInfo->blockData[pInfo->currentLoadBlockIndex].nRow : -1; pIter->iRow = (pIter->backward) ? pInfo->blockData[pInfo->currentLoadBlockIndex].nRow : -1;
tsdbDebug("last block index list:%d, %d, rowIndex:%d %s", pInfo->blockIndex[0], pInfo->blockIndex[1], pIter->iRow, idStr);
return &pInfo->blockData[pInfo->currentLoadBlockIndex]; return &pInfo->blockData[pInfo->currentLoadBlockIndex];
_exit: _exit:
...@@ -419,6 +419,7 @@ static void findNextValidRow(SLDataIter *pIter, const char *idStr) { ...@@ -419,6 +419,7 @@ static void findNextValidRow(SLDataIter *pIter, const char *idStr) {
pBlockData->aUid != NULL) { pBlockData->aUid != NULL) {
i = binarySearchForStartRowIndex((uint64_t *)pBlockData->aUid, pBlockData->nRow, pIter->uid, pIter->backward); i = binarySearchForStartRowIndex((uint64_t *)pBlockData->aUid, pBlockData->nRow, pIter->uid, pIter->backward);
if (i == -1) { if (i == -1) {
tsdbDebug("failed to find the data in pBlockData, uid:%"PRIu64" , %s", pIter->uid, idStr);
pIter->iRow = -1; pIter->iRow = -1;
return; return;
} }
...@@ -500,7 +501,12 @@ bool tLDataIterNextRow(SLDataIter *pIter, const char *idStr) { ...@@ -500,7 +501,12 @@ bool tLDataIterNextRow(SLDataIter *pIter, const char *idStr) {
if (iBlockL != pIter->iSttBlk) { if (iBlockL != pIter->iSttBlk) {
pBlockData = loadLastBlock(pIter, idStr); pBlockData = loadLastBlock(pIter, idStr);
pIter->iRow += step; if (pBlockData == NULL) {
goto _exit;
}
// set start row index
pIter->iRow = pIter->backward? pBlockData->nRow-1:0;
} }
} }
......
...@@ -3522,6 +3522,8 @@ int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* p ...@@ -3522,6 +3522,8 @@ int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* p
} }
int32_t code = tRowMergerGetRow(&merge, pTSRow); int32_t code = tRowMergerGetRow(&merge, pTSRow);
tRowMergerClear(&merge);
return code; return code;
} }
......
...@@ -35,7 +35,7 @@ static int vnodeBufPoolCreate(SVnode *pVnode, int64_t size, SVBufPool **ppPool) ...@@ -35,7 +35,7 @@ static int vnodeBufPoolCreate(SVnode *pVnode, int64_t size, SVBufPool **ppPool)
return -1; return -1;
} }
if (taosThreadSpinInit(pPool->lock, 0) != 0) { if (taosThreadSpinInit(pPool->lock, 0) != 0) {
taosMemoryFree((void*)pPool->lock); taosMemoryFree((void *)pPool->lock);
taosMemoryFree(pPool); taosMemoryFree(pPool);
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
return -1; return -1;
...@@ -62,7 +62,7 @@ static int vnodeBufPoolDestroy(SVBufPool *pPool) { ...@@ -62,7 +62,7 @@ static int vnodeBufPoolDestroy(SVBufPool *pPool) {
vnodeBufPoolReset(pPool); vnodeBufPoolReset(pPool);
if (pPool->lock) { if (pPool->lock) {
taosThreadSpinDestroy(pPool->lock); taosThreadSpinDestroy(pPool->lock);
taosMemoryFree((void*)pPool->lock); taosMemoryFree((void *)pPool->lock);
} }
taosMemoryFree(pPool); taosMemoryFree(pPool);
return 0; return 0;
...@@ -123,6 +123,46 @@ void vnodeBufPoolReset(SVBufPool *pPool) { ...@@ -123,6 +123,46 @@ void vnodeBufPoolReset(SVBufPool *pPool) {
pPool->ptr = pPool->node.data; pPool->ptr = pPool->node.data;
} }
void *vnodeBufPoolMallocAligned(SVBufPool *pPool, int size) {
SVBufPoolNode *pNode;
void *p = NULL;
uint8_t *ptr = NULL;
int paddingLen = 0;
ASSERT(pPool != NULL);
if (pPool->lock) taosThreadSpinLock(pPool->lock);
ptr = pPool->ptr;
paddingLen = (((long)ptr + 7) & ~7) - (long)ptr;
if (pPool->node.size >= pPool->ptr - pPool->node.data + size + paddingLen) {
// allocate from the anchor node
p = pPool->ptr + paddingLen;
size += paddingLen;
pPool->ptr = pPool->ptr + size;
pPool->size += size;
} else {
// allocate a new node
pNode = taosMemoryMalloc(sizeof(*pNode) + size);
if (pNode == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
if (pPool->lock) taosThreadSpinUnlock(pPool->lock);
return NULL;
}
p = pNode->data;
pNode->size = size;
pNode->prev = pPool->pTail;
pNode->pnext = &pPool->pTail;
pPool->pTail->pnext = &pNode->prev;
pPool->pTail = pNode;
pPool->size = pPool->size + sizeof(*pNode) + size;
}
if (pPool->lock) taosThreadSpinUnlock(pPool->lock);
return p;
}
void *vnodeBufPoolMalloc(SVBufPool *pPool, int size) { void *vnodeBufPoolMalloc(SVBufPool *pPool, int size) {
SVBufPoolNode *pNode; SVBufPoolNode *pNode;
void *p = NULL; void *p = NULL;
......
...@@ -657,7 +657,6 @@ typedef struct SStreamFillOperatorInfo { ...@@ -657,7 +657,6 @@ typedef struct SStreamFillOperatorInfo {
SSDataBlock* pRes; SSDataBlock* pRes;
SSDataBlock* pSrcBlock; SSDataBlock* pSrcBlock;
int32_t srcRowIndex; int32_t srcRowIndex;
SSDataBlock* pPrevSrcBlock;
SSDataBlock* pSrcDelBlock; SSDataBlock* pSrcDelBlock;
int32_t srcDelRowIndex; int32_t srcDelRowIndex;
SSDataBlock* pDelRes; SSDataBlock* pDelRes;
......
...@@ -470,7 +470,6 @@ static void destroyStreamFillOperatorInfo(void* param) { ...@@ -470,7 +470,6 @@ static void destroyStreamFillOperatorInfo(void* param) {
pInfo->pFillSup = destroyStreamFillSupporter(pInfo->pFillSup); pInfo->pFillSup = destroyStreamFillSupporter(pInfo->pFillSup);
pInfo->pRes = blockDataDestroy(pInfo->pRes); pInfo->pRes = blockDataDestroy(pInfo->pRes);
pInfo->pSrcBlock = blockDataDestroy(pInfo->pSrcBlock); pInfo->pSrcBlock = blockDataDestroy(pInfo->pSrcBlock);
pInfo->pPrevSrcBlock = blockDataDestroy(pInfo->pPrevSrcBlock);
pInfo->pDelRes = blockDataDestroy(pInfo->pDelRes); pInfo->pDelRes = blockDataDestroy(pInfo->pDelRes);
pInfo->matchInfo.pList = taosArrayDestroy(pInfo->matchInfo.pList); pInfo->matchInfo.pList = taosArrayDestroy(pInfo->matchInfo.pList);
taosMemoryFree(pInfo); taosMemoryFree(pInfo);
...@@ -992,12 +991,6 @@ static void doStreamFillImpl(SOperatorInfo* pOperator) { ...@@ -992,12 +991,6 @@ static void doStreamFillImpl(SOperatorInfo* pOperator) {
if (pInfo->srcRowIndex == 0) { if (pInfo->srcRowIndex == 0) {
keepBlockRowInDiscBuf(pOperator, pFillInfo, pBlock, tsCol, pInfo->srcRowIndex, groupId, pFillSup->rowSize); keepBlockRowInDiscBuf(pOperator, pFillInfo, pBlock, tsCol, pInfo->srcRowIndex, groupId, pFillSup->rowSize);
SSDataBlock* preBlock = pInfo->pPrevSrcBlock;
if (preBlock->info.rows > 0) {
int preRowId = preBlock->info.rows - 1;
SColumnInfoData* pPreTsCol = taosArrayGet(preBlock->pDataBlock, pInfo->primaryTsCol);
doFillResults(pOperator, pFillSup, pFillInfo, preBlock, (TSKEY*)pPreTsCol->pData, preRowId, pRes);
}
pInfo->srcRowIndex++; pInfo->srcRowIndex++;
} }
...@@ -1011,9 +1004,8 @@ static void doStreamFillImpl(SOperatorInfo* pOperator) { ...@@ -1011,9 +1004,8 @@ static void doStreamFillImpl(SOperatorInfo* pOperator) {
} }
pInfo->srcRowIndex++; pInfo->srcRowIndex++;
} }
doFillResults(pOperator, pFillSup, pFillInfo, pBlock, tsCol, pInfo->srcRowIndex - 1, pRes);
blockDataUpdateTsWindow(pRes, pInfo->primaryTsCol); blockDataUpdateTsWindow(pRes, pInfo->primaryTsCol);
blockDataCleanup(pInfo->pPrevSrcBlock);
copyDataBlock(pInfo->pPrevSrcBlock, pInfo->pSrcBlock);
blockDataCleanup(pInfo->pSrcBlock); blockDataCleanup(pInfo->pSrcBlock);
} }
...@@ -1173,7 +1165,6 @@ static void doDeleteFillResult(SOperatorInfo* pOperator) { ...@@ -1173,7 +1165,6 @@ static void doDeleteFillResult(SOperatorInfo* pOperator) {
} }
static void resetStreamFillInfo(SStreamFillOperatorInfo* pInfo) { static void resetStreamFillInfo(SStreamFillOperatorInfo* pInfo) {
blockDataCleanup(pInfo->pPrevSrcBlock);
tSimpleHashClear(pInfo->pFillSup->pResMap); tSimpleHashClear(pInfo->pFillSup->pResMap);
pInfo->pFillSup->hasDelete = false; pInfo->pFillSup->hasDelete = false;
taosArrayClear(pInfo->pFillInfo->delRanges); taosArrayClear(pInfo->pFillInfo->delRanges);
...@@ -1231,13 +1222,6 @@ static SSDataBlock* doStreamFill(SOperatorInfo* pOperator) { ...@@ -1231,13 +1222,6 @@ static SSDataBlock* doStreamFill(SOperatorInfo* pOperator) {
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
if (pBlock == NULL) { if (pBlock == NULL) {
pOperator->status = OP_RES_TO_RETURN; pOperator->status = OP_RES_TO_RETURN;
SSDataBlock* preBlock = pInfo->pPrevSrcBlock;
if (preBlock->info.rows > 0) {
int preRowId = preBlock->info.rows - 1;
SColumnInfoData* pPreTsCol = taosArrayGet(preBlock->pDataBlock, pInfo->primaryTsCol);
doFillResults(pOperator, pInfo->pFillSup, pInfo->pFillInfo, preBlock, (TSKEY*)pPreTsCol->pData, preRowId,
pInfo->pRes);
}
pInfo->pFillInfo->preRowKey = INT64_MIN; pInfo->pFillInfo->preRowKey = INT64_MIN;
if (pInfo->pRes->info.rows > 0) { if (pInfo->pRes->info.rows > 0) {
printDataBlock(pInfo->pRes, "stream fill"); printDataBlock(pInfo->pRes, "stream fill");
...@@ -1411,10 +1395,8 @@ SOperatorInfo* createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFi ...@@ -1411,10 +1395,8 @@ SOperatorInfo* createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFi
initResultSizeInfo(&pOperator->resultInfo, 4096); initResultSizeInfo(&pOperator->resultInfo, 4096);
pInfo->pRes = createDataBlockFromDescNode(pPhyFillNode->node.pOutputDataBlockDesc); pInfo->pRes = createDataBlockFromDescNode(pPhyFillNode->node.pOutputDataBlockDesc);
pInfo->pSrcBlock = createDataBlockFromDescNode(pPhyFillNode->node.pOutputDataBlockDesc); pInfo->pSrcBlock = createDataBlockFromDescNode(pPhyFillNode->node.pOutputDataBlockDesc);
pInfo->pPrevSrcBlock = createDataBlockFromDescNode(pPhyFillNode->node.pOutputDataBlockDesc);
blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity); blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
blockDataEnsureCapacity(pInfo->pSrcBlock, pOperator->resultInfo.capacity); blockDataEnsureCapacity(pInfo->pSrcBlock, pOperator->resultInfo.capacity);
blockDataEnsureCapacity(pInfo->pPrevSrcBlock, pOperator->resultInfo.capacity);
pInfo->pFillInfo = initStreamFillInfo(pInfo->pFillSup, pInfo->pRes); pInfo->pFillInfo = initStreamFillInfo(pInfo->pFillSup, pInfo->pRes);
if (!pInfo->pFillInfo) { if (!pInfo->pFillInfo) {
......
...@@ -351,6 +351,7 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { ...@@ -351,6 +351,7 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
SSDataBlock* p = pProjectInfo->mergeDataBlocks ? pFinalRes : pRes; SSDataBlock* p = pProjectInfo->mergeDataBlocks ? pFinalRes : pRes;
pOperator->resultInfo.totalRows += p->info.rows; pOperator->resultInfo.totalRows += p->info.rows;
p->info.dataLoad = 1;
if (pOperator->cost.openCost == 0) { if (pOperator->cost.openCost == 0) {
pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0; pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
......
...@@ -1037,7 +1037,7 @@ SResultRowPosition addToOpenWindowList(SResultRowInfo* pResultRowInfo, const SRe ...@@ -1037,7 +1037,7 @@ SResultRowPosition addToOpenWindowList(SResultRowInfo* pResultRowInfo, const SRe
int64_t* extractTsCol(SSDataBlock* pBlock, const SIntervalAggOperatorInfo* pInfo) { int64_t* extractTsCol(SSDataBlock* pBlock, const SIntervalAggOperatorInfo* pInfo) {
TSKEY* tsCols = NULL; TSKEY* tsCols = NULL;
if (pBlock->pDataBlock != NULL && pBlock->info.dataLoad == 1) { if (pBlock->pDataBlock != NULL && pBlock->info.dataLoad) {
SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex); SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex);
tsCols = (int64_t*)pColDataInfo->pData; tsCols = (int64_t*)pColDataInfo->pData;
ASSERT(tsCols[0] != 0); ASSERT(tsCols[0] != 0);
......
...@@ -138,7 +138,10 @@ static int32_t adjustScanDataRequirement(SScanLogicNode* pScan, EDataOrderLevel ...@@ -138,7 +138,10 @@ static int32_t adjustScanDataRequirement(SScanLogicNode* pScan, EDataOrderLevel
} else if (TSDB_SUPER_TABLE == pScan->tableType) { } else if (TSDB_SUPER_TABLE == pScan->tableType) {
pScan->scanType = SCAN_TYPE_TABLE_MERGE; pScan->scanType = SCAN_TYPE_TABLE_MERGE;
} }
pScan->node.resultDataOrder = requirement;
if (TSDB_NORMAL_TABLE != pScan->tableType && TSDB_CHILD_TABLE != pScan->tableType) {
pScan->node.resultDataOrder = requirement;
}
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
...@@ -621,11 +621,13 @@ void dBufPrintStatis(const SDiskbasedBuf* pBuf) { ...@@ -621,11 +621,13 @@ void dBufPrintStatis(const SDiskbasedBuf* pBuf) {
const SDiskbasedBufStatis* ps = &pBuf->statis; const SDiskbasedBufStatis* ps = &pBuf->statis;
#if 0
printf( printf(
"Paged buffer closed, total:%.2f Kb (%d Pages), inmem size:%.2f Kb (%d Pages), file size:%.2f Kb, page size:%.2f " "Paged buffer closed, total:%.2f Kb (%d Pages), inmem size:%.2f Kb (%d Pages), file size:%.2f Kb, page size:%.2f "
"Kb, %s\n", "Kb, %s\n",
pBuf->totalBufSize / 1024.0, pBuf->numOfPages, listNEles(pBuf->lruList) * pBuf->pageSize / 1024.0, pBuf->totalBufSize / 1024.0, pBuf->numOfPages, listNEles(pBuf->lruList) * pBuf->pageSize / 1024.0,
listNEles(pBuf->lruList), pBuf->fileSize / 1024.0, pBuf->pageSize / 1024.0f, pBuf->id); listNEles(pBuf->lruList), pBuf->fileSize / 1024.0, pBuf->pageSize / 1024.0f, pBuf->id);
#endif
if (ps->loadPages > 0) { if (ps->loadPages > 0) {
printf( printf(
...@@ -634,7 +636,7 @@ void dBufPrintStatis(const SDiskbasedBuf* pBuf) { ...@@ -634,7 +636,7 @@ void dBufPrintStatis(const SDiskbasedBuf* pBuf) {
ps->getPages, ps->releasePages, ps->flushBytes / 1024.0f, ps->flushPages, ps->loadBytes / 1024.0f, ps->getPages, ps->releasePages, ps->flushBytes / 1024.0f, ps->flushPages, ps->loadBytes / 1024.0f,
ps->loadPages, ps->loadBytes / (1024.0 * ps->loadPages)); ps->loadPages, ps->loadBytes / (1024.0 * ps->loadPages));
} else { } else {
printf("no page loaded\n"); //printf("no page loaded\n");
} }
} }
......
...@@ -628,6 +628,7 @@ ...@@ -628,6 +628,7 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/concat2.py ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/concat2.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/json_tag.py ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/json_tag.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery.py ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQueryInterval.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_str.py ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_str.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_math.py ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_math.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_time.py ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_time.py
...@@ -793,6 +794,7 @@ ...@@ -793,6 +794,7 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/query_cols_tags_and_or.py -Q 2 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/query_cols_tags_and_or.py -Q 2
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/interp.py -Q 2 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/interp.py -Q 2
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery.py -Q 2 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery.py -Q 2
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQueryInterval.py -Q 2
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_str.py -Q 2 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_str.py -Q 2
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_math.py -Q 2 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_math.py -Q 2
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_time.py -Q 2 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_time.py -Q 2
...@@ -886,6 +888,7 @@ ...@@ -886,6 +888,7 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/arctan.py -Q 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/arctan.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/query_cols_tags_and_or.py -Q 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/query_cols_tags_and_or.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery.py -Q 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQueryInterval.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_str.py -Q 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_str.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_math.py -Q 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_math.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_time.py -Q 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_time.py -Q 3
...@@ -979,6 +982,7 @@ ...@@ -979,6 +982,7 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/arctan.py -Q 4 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/arctan.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/query_cols_tags_and_or.py -Q 4 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/query_cols_tags_and_or.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery.py -Q 4 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQueryInterval.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_str.py -Q 4 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_str.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_math.py -Q 4 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_math.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_time.py -Q 4 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_time.py -Q 4
......
...@@ -127,7 +127,114 @@ if $rows != 13 then ...@@ -127,7 +127,114 @@ if $rows != 13 then
goto loop3 goto loop3
endi endi
sql insert into t2 values(1648791217000,11,11,11,11.0,'eee') (1648791219000,11,11,11,11.0,'eee') t1 values(1648791217000,11,11,11,11.0,'eee') (1648791219000,11,11,11,11.0,'eee');
$loop_count = 0
loop4:
sleep 200
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
sql select * from streamt1 order by group_id, ts;
if $rows != 20 then
print ====streamt1=rows1=$rows
goto loop4
endi
if $data04 == 0 then
print ====streamt1=data04=$data04
return -1
endi
sql select group_id,count(*) from streamt1 group by group_id;
if $rows != 2 then
print ====streamt1=rows2=$rows
goto loop4
endi
sql select * from streamt2 order by group_id, ts;
if $rows != 20 then
print ====streamt2=rows2=$rows
goto loop4
endi
if $data04 == 0 then
print ====streamt2=data04=$data04
return -1
endi
sql select group_id,count(*) from streamt2 group by group_id;
if $rows != 2 then
print ====streamt2=rows2=$rows
goto loop4
endi
sql select * from streamt3 order by group_id, ts;
if $rows != 20 then
print ====streamt3=rows3=$rows
goto loop4
endi
if $data04 == 0 then
print ====streamt3=data04=$data04
return -1
endi
sql select group_id,count(*) from streamt3 group by group_id;
if $rows != 2 then
print ====streamt3=rows2=$rows
goto loop4
endi
sql select * from streamt4 order by group_id, ts;
if $rows != 20 then
print ====streamt4=rows4=$rows
goto loop4
endi
if $data04 == 0 then
print ====streamt4=data04=$data04
return -1
endi
sql select group_id,count(*) from streamt4 group by group_id;
if $rows != 2 then
print ====streamt4=rows2=$rows
goto loop4
endi
sql select * from streamt5 order by group_id, ts;
if $rows != 20 then
print ====streamt5=rows5=$rows
goto loop4
endi
if $data04 == 0 then
print ====streamt5=data04=$data04
return -1
endi
sql select group_id,count(*) from streamt5 group by group_id;
if $rows != 2 then
print ====streamt5=rows2=$rows
goto loop4
endi
......
因为 它太大了无法显示 source diff 。你可以改为 查看blob
...@@ -251,28 +251,44 @@ class TDTestCase: ...@@ -251,28 +251,44 @@ class TDTestCase:
# wait db ready # wait db ready
while 1: while 1:
tdSql.query("select * from information_schema.ins_databases") tdSql.query("select * from information_schema.ins_databases")
if tdSql.getRows() == 4: nrows = tdSql.getRows()
print ('==================================================') index = -1
print (tdSql.getData(0,0), tdSql.getData(1,0),tdSql.getData(2,0)) for i in range(nrows):
index = 0 if tdSql.getData(i, 0) == parameterDict['dbName']:
if tdSql.getData(0,0) == parameterDict['dbName']: index = i
index = 0
elif tdSql.getData(1,0) == parameterDict['dbName']:
index = 1
elif tdSql.getData(2,0) == parameterDict['dbName']:
index = 2
elif tdSql.getData(3,0) == parameterDict['dbName']:
index = 3
else:
continue
if tdSql.getData(index,15) == 'ready':
print("******************** index: %d"%index)
break break
if index == -1:
continue continue
else:
time.sleep(1) if tdSql.getData(index,15) == 'ready':
print("******************** index: %d"%index)
break
time.sleep(1)
# if tdSql.getRows() == 4:
# print ('==================================================')
# print (tdSql.getData(0,0), tdSql.getData(1,0),tdSql.getData(2,0))
# index = 0
# if tdSql.getData(0,0) == parameterDict['dbName']:
# index = 0
# elif tdSql.getData(1,0) == parameterDict['dbName']:
# index = 1
# elif tdSql.getData(2,0) == parameterDict['dbName']:
# index = 2
# elif tdSql.getData(3,0) == parameterDict['dbName']:
# index = 3
# else:
# continue
# if tdSql.getData(index,15) == 'ready':
# print("******************** index: %d"%index)
# break
# continue
# else:
# time.sleep(1)
tdSql.query("use %s"%parameterDict['dbName']) tdSql.query("use %s"%parameterDict['dbName'])
# wait stb ready # wait stb ready
...@@ -395,30 +411,46 @@ class TDTestCase: ...@@ -395,30 +411,46 @@ class TDTestCase:
# wait db ready # wait db ready
while 1: while 1:
tdSql.query("select * from information_schema.ins_databases") tdSql.query("select * from information_schema.ins_databases")
if tdSql.getRows() == 5: nrows = tdSql.getRows()
print ('==================================================dbname: %s'%parameterDict['dbName']) index = -1
print (tdSql.getData(0,0), tdSql.getData(1,0),tdSql.getData(2,0),tdSql.getData(3,0),tdSql.getData(4,0)) for i in range(nrows):
index = 0 if tdSql.getData(i, 0) == parameterDict['dbName']:
if tdSql.getData(0,0) == parameterDict['dbName']: index = i
index = 0
elif tdSql.getData(1,0) == parameterDict['dbName']:
index = 1
elif tdSql.getData(2,0) == parameterDict['dbName']:
index = 2
elif tdSql.getData(3,0) == parameterDict['dbName']:
index = 3
elif tdSql.getData(4,0) == parameterDict['dbName']:
index = 4
else:
continue
if tdSql.getData(index,15) == 'ready':
print("******************** index: %d"%index)
break break
if index == -1:
continue continue
else:
time.sleep(1) if tdSql.getData(index,15) == 'ready':
print("******************** index: %d"%index)
break
time.sleep(1)
# if tdSql.getRows() == 5:
# print ('==================================================dbname: %s'%parameterDict['dbName'])
# print (tdSql.getData(0,0), tdSql.getData(1,0),tdSql.getData(2,0),tdSql.getData(3,0),tdSql.getData(4,0))
# index = 0
# if tdSql.getData(0,0) == parameterDict['dbName']:
# index = 0
# elif tdSql.getData(1,0) == parameterDict['dbName']:
# index = 1
# elif tdSql.getData(2,0) == parameterDict['dbName']:
# index = 2
# elif tdSql.getData(3,0) == parameterDict['dbName']:
# index = 3
# elif tdSql.getData(4,0) == parameterDict['dbName']:
# index = 4
# else:
# continue
# if tdSql.getData(index,15) == 'ready':
# print("******************** index: %d"%index)
# break
# continue
# else:
# time.sleep(1)
tdSql.query("use %s"%parameterDict['dbName']) tdSql.query("use %s"%parameterDict['dbName'])
# wait stb ready # wait stb ready
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册