提交 34e14f77 编写于 作者: C Cary Xu

enh: rsma batch process

上级 dab6c817
...@@ -17,7 +17,7 @@ ...@@ -17,7 +17,7 @@
#define RSMA_QTASKINFO_BUFSIZE 32768 #define RSMA_QTASKINFO_BUFSIZE 32768
#define RSMA_QTASKINFO_HEAD_LEN (sizeof(int32_t) + sizeof(int8_t) + sizeof(int64_t)) // len + type + suid #define RSMA_QTASKINFO_HEAD_LEN (sizeof(int32_t) + sizeof(int8_t) + sizeof(int64_t)) // len + type + suid
#define RSMA_QTASKEXEC_BUFSIZ 1 // * 1048576 // 8 MB #define RSMA_QTASKEXEC_BUFSIZ 10 * 1048576 // 8 MB
SSmaMgmt smaMgmt = { SSmaMgmt smaMgmt = {
.inited = 0, .inited = 0,
...@@ -373,8 +373,6 @@ int32_t tdProcessRSmaCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, con ...@@ -373,8 +373,6 @@ int32_t tdProcessRSmaCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, con
if (!(pRSmaInfo->queue = taosOpenQueue())) { if (!(pRSmaInfo->queue = taosOpenQueue())) {
goto _err; goto _err;
} }
smaError("vgId:%d init bufSize:%" PRIi64 ", qMemSize:%" PRIi64, SMA_VID(pSma), atomic_load_64(&pStat->qBufSize),
taosQueueMemorySize(pRSmaInfo->queue));
if (!(pRSmaInfo->qall = taosAllocateQall())) { if (!(pRSmaInfo->qall = taosAllocateQall())) {
goto _err; goto _err;
...@@ -723,7 +721,7 @@ static int32_t tdExecuteRSmaImplAsync(SSma *pSma, const void *pMsg, int32_t inpu ...@@ -723,7 +721,7 @@ static int32_t tdExecuteRSmaImplAsync(SSma *pSma, const void *pMsg, int32_t inpu
tb_uid_t suid) { tb_uid_t suid) {
const SSubmitReq *pReq = (const SSubmitReq *)pMsg; const SSubmitReq *pReq = (const SSubmitReq *)pMsg;
void *qItem = taosAllocateQitem(pReq->length, DEF_QITEM); void *qItem = taosAllocateQitem(pReq->header.contLen, DEF_QITEM);
if (!qItem) { if (!qItem) {
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
...@@ -733,9 +731,8 @@ static int32_t tdExecuteRSmaImplAsync(SSma *pSma, const void *pMsg, int32_t inpu ...@@ -733,9 +731,8 @@ static int32_t tdExecuteRSmaImplAsync(SSma *pSma, const void *pMsg, int32_t inpu
taosWriteQitem(pInfo->queue, qItem); taosWriteQitem(pInfo->queue, qItem);
SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pSma); SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pSma);
int64_t size = atomic_fetch_add_64(&pRSmaStat->qBufSize, taosQueueMemorySize(pInfo->queue)); atomic_fetch_add_64(&pRSmaStat->qBufSize, taosQueueMemorySize(pInfo->queue));
smaError("vgId:%d originSize:%" PRIi64 ", after push size is:%" PRIi64, SMA_VID(pSma), size,
atomic_load_64(&pRSmaStat->qBufSize));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -899,11 +896,8 @@ static int32_t tdRSmaExecCheck(SSma *pSma) { ...@@ -899,11 +896,8 @@ static int32_t tdRSmaExecCheck(SSma *pSma) {
int64_t bufSize = atomic_load_64(&pRsmaStat->qBufSize); int64_t bufSize = atomic_load_64(&pRsmaStat->qBufSize);
if ((pRsmaStat->execStat == 1) || (bufSize < RSMA_QTASKEXEC_BUFSIZ)) { if ((pRsmaStat->execStat == 1) || (bufSize < RSMA_QTASKEXEC_BUFSIZ)) {
smaError("vgId:%d, return directly as execStat:%" PRIi8 ", bufSize:%" PRIi64, SMA_VID(pSma), pRsmaStat->execStat,
bufSize);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
smaError("vgId:%d, go on exec as execStat:%" PRIi8 ", bufSize:%" PRIi64, SMA_VID(pSma), pRsmaStat->execStat, bufSize);
pRsmaStat->execStat = 1; pRsmaStat->execStat = 1;
...@@ -1726,7 +1720,6 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma) { ...@@ -1726,7 +1720,6 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma) {
atomic_store_64(&pRSmaStat->qBufSize, 0); atomic_store_64(&pRSmaStat->qBufSize, 0);
taosWUnLockLatch(SMA_ENV_LOCK(pEnv)); taosWUnLockLatch(SMA_ENV_LOCK(pEnv));
smaError("vgId:%d after exec qBufSize is:%" PRIi64, SMA_VID(pSma), atomic_load_64(&pRSmaStat->qBufSize));
int32_t qSize = taosArrayGetSize(pSubmitQArr); int32_t qSize = taosArrayGetSize(pSubmitQArr);
for (int32_t i = 0; i < qSize; ++i) { for (int32_t i = 0; i < qSize; ++i) {
...@@ -1748,7 +1741,7 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma) { ...@@ -1748,7 +1741,7 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma) {
if (size > 0) { if (size > 0) {
SRSmaInfo *pInfo = *(SRSmaInfo **)pItem->pRSmaInfo; SRSmaInfo *pInfo = *(SRSmaInfo **)pItem->pRSmaInfo;
for (int32_t i = 1; i <= TSDB_RETENTION_L2; ++i) { for (int32_t i = 1; i <= TSDB_RETENTION_L2; ++i) {
if (tdExecuteRSmaImpl(pSma, *(SSubmitReq**)pSubmitArr->pData, size, STREAM_INPUT__DATA_SUBMIT, pInfo, pInfo->suid, i) < 0) { if (tdExecuteRSmaImpl(pSma, pSubmitArr->pData, size, STREAM_INPUT__MERGED_SUBMIT, pInfo, pInfo->suid, i) < 0) {
tdFreeRSmaSubmitItems(pSubmitArr); tdFreeRSmaSubmitItems(pSubmitArr);
goto _err; goto _err;
} }
......
...@@ -22,7 +22,6 @@ ...@@ -22,7 +22,6 @@
#include "tbuffer.h" #include "tbuffer.h"
#include "tcommon.h" #include "tcommon.h"
#include "tpagedbuf.h" #include "tpagedbuf.h"
#include "tsimplehash.h"
#define SET_RES_WINDOW_KEY(_k, _ori, _len, _uid) \ #define SET_RES_WINDOW_KEY(_k, _ori, _len, _uid) \
do { \ do { \
...@@ -103,7 +102,7 @@ static FORCE_INLINE void setResultBufPageDirty(SDiskbasedBuf* pBuf, SResultRowPo ...@@ -103,7 +102,7 @@ static FORCE_INLINE void setResultBufPageDirty(SDiskbasedBuf* pBuf, SResultRowPo
setBufPageDirty(pPage, true); setBufPageDirty(pPage, true);
} }
void initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SSHashObj* pHashmap, int32_t order); void initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SHashObj* pHashmap, int32_t order);
void cleanupGroupResInfo(SGroupResInfo* pGroupResInfo); void cleanupGroupResInfo(SGroupResInfo* pGroupResInfo);
void initMultiResInfoFromArrayList(SGroupResInfo* pGroupResInfo, SArray* pArrayList); void initMultiResInfoFromArrayList(SGroupResInfo* pGroupResInfo, SArray* pArrayList);
...@@ -141,4 +140,4 @@ int32_t resultrowComparAsc(const void* p1, const void* p2); ...@@ -141,4 +140,4 @@ int32_t resultrowComparAsc(const void* p1, const void* p2);
int32_t isQualifiedTable(STableKeyInfo* info, SNode* pTagCond, void* metaHandle, bool* pQualified); int32_t isQualifiedTable(STableKeyInfo* info, SNode* pTagCond, void* metaHandle, bool* pQualified);
#endif // TDENGINE_QUERYUTIL_H #endif // TDENGINE_QUERYUTIL_H
\ No newline at end of file
...@@ -296,7 +296,7 @@ enum { ...@@ -296,7 +296,7 @@ enum {
}; };
typedef struct SAggSupporter { typedef struct SAggSupporter {
SSHashObj* pResultRowHashTable; // quick locate the window object for each result SHashObj* pResultRowHashTable; // quick locate the window object for each result
char* keyBuf; // window key buffer char* keyBuf; // window key buffer
SDiskbasedBuf* pResultBuf; // query result buffer based on blocked-wised disk file SDiskbasedBuf* pResultBuf; // query result buffer based on blocked-wised disk file
int32_t resultRowSize; // the result buffer size for each result row, with the meta data size for each row int32_t resultRowSize; // the result buffer size for each result row, with the meta data size for each row
...@@ -1045,4 +1045,4 @@ void* destroySqlFunctionCtx(SqlFunctionCtx* pCtx, int32_t numOfOutput); ...@@ -1045,4 +1045,4 @@ void* destroySqlFunctionCtx(SqlFunctionCtx* pCtx, int32_t numOfOutput);
} }
#endif #endif
#endif // TDENGINE_EXECUTORIMPL_H #endif // TDENGINE_EXECUTORIMPL_H
\ No newline at end of file
...@@ -97,7 +97,7 @@ int32_t resultrowComparAsc(const void* p1, const void* p2) { ...@@ -97,7 +97,7 @@ int32_t resultrowComparAsc(const void* p1, const void* p2) {
static int32_t resultrowComparDesc(const void* p1, const void* p2) { return resultrowComparAsc(p2, p1); } static int32_t resultrowComparDesc(const void* p1, const void* p2) { return resultrowComparAsc(p2, p1); }
void initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SSHashObj* pHashmap, int32_t order) { void initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SHashObj* pHashmap, int32_t order) {
if (pGroupResInfo->pRows != NULL) { if (pGroupResInfo->pRows != NULL) {
taosArrayDestroy(pGroupResInfo->pRows); taosArrayDestroy(pGroupResInfo->pRows);
} }
...@@ -106,10 +106,9 @@ void initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SSHashObj* pHashmap, in ...@@ -106,10 +106,9 @@ void initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SSHashObj* pHashmap, in
void* pData = NULL; void* pData = NULL;
pGroupResInfo->pRows = taosArrayInit(10, POINTER_BYTES); pGroupResInfo->pRows = taosArrayInit(10, POINTER_BYTES);
size_t keyLen = 0; size_t keyLen = 0;
int32_t iter = 0; while ((pData = taosHashIterate(pHashmap, pData)) != NULL) {
while ((pData = tSimpleHashIterate(pHashmap, pData, &iter)) != NULL) { void* key = taosHashGetKey(pData, &keyLen);
void* key = tSimpleHashGetKey(pData, &keyLen);
SResKeyPos* p = taosMemoryMalloc(keyLen + sizeof(SResultRowPosition)); SResKeyPos* p = taosMemoryMalloc(keyLen + sizeof(SResultRowPosition));
...@@ -987,4 +986,4 @@ void initLimitInfo(const SNode* pLimit, const SNode* pSLimit, SLimitInfo* pLimit ...@@ -987,4 +986,4 @@ void initLimitInfo(const SNode* pLimit, const SNode* pSLimit, SLimitInfo* pLimit
pLimitInfo->slimit = slimit; pLimitInfo->slimit = slimit;
pLimitInfo->remainOffset = limit.offset; pLimitInfo->remainOffset = limit.offset;
pLimitInfo->remainGroupOffset = slimit.offset; pLimitInfo->remainGroupOffset = slimit.offset;
} }
\ No newline at end of file
...@@ -250,7 +250,7 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR ...@@ -250,7 +250,7 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR
SET_RES_WINDOW_KEY(pSup->keyBuf, pData, bytes, groupId); SET_RES_WINDOW_KEY(pSup->keyBuf, pData, bytes, groupId);
SResultRowPosition* p1 = SResultRowPosition* p1 =
(SResultRowPosition*)tSimpleHashGet(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes)); (SResultRowPosition*)taosHashGet(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes));
SResultRow* pResult = NULL; SResultRow* pResult = NULL;
...@@ -292,7 +292,7 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR ...@@ -292,7 +292,7 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR
// add a new result set for a new group // add a new result set for a new group
SResultRowPosition pos = {.pageId = pResult->pageId, .offset = pResult->offset}; SResultRowPosition pos = {.pageId = pResult->pageId, .offset = pResult->offset};
tSimpleHashPut(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes), &pos, taosHashPut(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes), &pos,
sizeof(SResultRowPosition)); sizeof(SResultRowPosition));
} }
...@@ -301,7 +301,7 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR ...@@ -301,7 +301,7 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR
// too many time window in query // too many time window in query
if (pTaskInfo->execModel == OPTR_EXEC_MODEL_BATCH && if (pTaskInfo->execModel == OPTR_EXEC_MODEL_BATCH &&
tSimpleHashGetSize(pSup->pResultRowHashTable) > MAX_INTERVAL_TIME_WINDOW) { taosHashGetSize(pSup->pResultRowHashTable) > MAX_INTERVAL_TIME_WINDOW) {
longjmp(pTaskInfo->env, TSDB_CODE_QRY_TOO_MANY_TIMEWINDOW); longjmp(pTaskInfo->env, TSDB_CODE_QRY_TOO_MANY_TIMEWINDOW);
} }
...@@ -3017,7 +3017,7 @@ int32_t aggEncodeResultRow(SOperatorInfo* pOperator, char** result, int32_t* len ...@@ -3017,7 +3017,7 @@ int32_t aggEncodeResultRow(SOperatorInfo* pOperator, char** result, int32_t* len
} }
SOptrBasicInfo* pInfo = (SOptrBasicInfo*)(pOperator->info); SOptrBasicInfo* pInfo = (SOptrBasicInfo*)(pOperator->info);
SAggSupporter* pSup = (SAggSupporter*)POINTER_SHIFT(pOperator->info, sizeof(SOptrBasicInfo)); SAggSupporter* pSup = (SAggSupporter*)POINTER_SHIFT(pOperator->info, sizeof(SOptrBasicInfo));
int32_t size = tSimpleHashGetSize(pSup->pResultRowHashTable); int32_t size = taosHashGetSize(pSup->pResultRowHashTable);
size_t keyLen = sizeof(uint64_t) * 2; // estimate the key length size_t keyLen = sizeof(uint64_t) * 2; // estimate the key length
int32_t totalSize = int32_t totalSize =
sizeof(int32_t) + sizeof(int32_t) + size * (sizeof(int32_t) + keyLen + sizeof(int32_t) + pSup->resultRowSize); sizeof(int32_t) + sizeof(int32_t) + size * (sizeof(int32_t) + keyLen + sizeof(int32_t) + pSup->resultRowSize);
...@@ -3045,10 +3045,9 @@ int32_t aggEncodeResultRow(SOperatorInfo* pOperator, char** result, int32_t* len ...@@ -3045,10 +3045,9 @@ int32_t aggEncodeResultRow(SOperatorInfo* pOperator, char** result, int32_t* len
setBufPageDirty(pPage, true); setBufPageDirty(pPage, true);
releaseBufPage(pSup->pResultBuf, pPage); releaseBufPage(pSup->pResultBuf, pPage);
int32_t iter = 0; void* pIter = taosHashIterate(pSup->pResultRowHashTable, NULL);
void* pIter = tSimpleHashIterate(pSup->pResultRowHashTable, NULL, &iter);
while (pIter) { while (pIter) {
void* key = tSimpleHashGetKey(pIter, &keyLen); void* key = taosHashGetKey(pIter, &keyLen);
SResultRowPosition* p1 = (SResultRowPosition*)pIter; SResultRowPosition* p1 = (SResultRowPosition*)pIter;
pPage = (SFilePage*)getBufPage(pSup->pResultBuf, p1->pageId); pPage = (SFilePage*)getBufPage(pSup->pResultBuf, p1->pageId);
...@@ -3080,7 +3079,7 @@ int32_t aggEncodeResultRow(SOperatorInfo* pOperator, char** result, int32_t* len ...@@ -3080,7 +3079,7 @@ int32_t aggEncodeResultRow(SOperatorInfo* pOperator, char** result, int32_t* len
memcpy(*result + offset, pRow, pSup->resultRowSize); memcpy(*result + offset, pRow, pSup->resultRowSize);
offset += pSup->resultRowSize; offset += pSup->resultRowSize;
pIter = tSimpleHashIterate(pSup->pResultRowHashTable, pIter, &iter); pIter = taosHashIterate(pSup->pResultRowHashTable, pIter);
} }
*(int32_t*)(*result) = offset; *(int32_t*)(*result) = offset;
...@@ -3115,7 +3114,7 @@ int32_t aggDecodeResultRow(SOperatorInfo* pOperator, char* result) { ...@@ -3115,7 +3114,7 @@ int32_t aggDecodeResultRow(SOperatorInfo* pOperator, char* result) {
// add a new result set for a new group // add a new result set for a new group
SResultRowPosition pos = {.pageId = resultRow->pageId, .offset = resultRow->offset}; SResultRowPosition pos = {.pageId = resultRow->pageId, .offset = resultRow->offset};
tSimpleHashPut(pSup->pResultRowHashTable, result + offset, keyLen, &pos, sizeof(SResultRowPosition)); taosHashPut(pSup->pResultRowHashTable, result + offset, keyLen, &pos, sizeof(SResultRowPosition));
offset += keyLen; offset += keyLen;
int32_t valueLen = *(int32_t*)(result + offset); int32_t valueLen = *(int32_t*)(result + offset);
...@@ -3454,8 +3453,7 @@ int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t n ...@@ -3454,8 +3453,7 @@ int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t n
pAggSup->resultRowSize = getResultRowSize(pCtx, numOfOutput); pAggSup->resultRowSize = getResultRowSize(pCtx, numOfOutput);
pAggSup->keyBuf = taosMemoryCalloc(1, keyBufSize + POINTER_BYTES + sizeof(int64_t)); pAggSup->keyBuf = taosMemoryCalloc(1, keyBufSize + POINTER_BYTES + sizeof(int64_t));
// pAggSup->pResultRowHashTable = taosHashInit(10, hashFn, true, HASH_NO_LOCK); pAggSup->pResultRowHashTable = taosHashInit(10, hashFn, true, HASH_NO_LOCK);
pAggSup->pResultRowHashTable = tSimpleHashInit(100000, hashFn);
if (pAggSup->keyBuf == NULL || pAggSup->pResultRowHashTable == NULL) { if (pAggSup->keyBuf == NULL || pAggSup->pResultRowHashTable == NULL) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
...@@ -3481,7 +3479,7 @@ int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t n ...@@ -3481,7 +3479,7 @@ int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t n
void cleanupAggSup(SAggSupporter* pAggSup) { void cleanupAggSup(SAggSupporter* pAggSup) {
taosMemoryFreeClear(pAggSup->keyBuf); taosMemoryFreeClear(pAggSup->keyBuf);
tSimpleHashCleanup(pAggSup->pResultRowHashTable); taosHashCleanup(pAggSup->pResultRowHashTable);
destroyDiskbasedBuf(pAggSup->pResultBuf); destroyDiskbasedBuf(pAggSup->pResultBuf);
} }
...@@ -4779,4 +4777,4 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, const char* pKey, SqlF ...@@ -4779,4 +4777,4 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, const char* pKey, SqlF
pCtx[i].pBuf = pSup->pResultBuf; pCtx[i].pBuf = pSup->pResultBuf;
} }
return code; return code;
} }
\ No newline at end of file
...@@ -178,8 +178,8 @@ static SResultRow* getTableGroupOutputBuf(SOperatorInfo* pOperator, uint64_t gro ...@@ -178,8 +178,8 @@ static SResultRow* getTableGroupOutputBuf(SOperatorInfo* pOperator, uint64_t gro
STableScanInfo* pTableScanInfo = pOperator->info; STableScanInfo* pTableScanInfo = pOperator->info;
SResultRowPosition* p1 = (SResultRowPosition*)tSimpleHashGet(pTableScanInfo->pdInfo.pAggSup->pResultRowHashTable, buf, SResultRowPosition* p1 =
GET_RES_WINDOW_KEY_LEN(sizeof(groupId))); (SResultRowPosition*)taosHashGet(pTableScanInfo->pdInfo.pAggSup->pResultRowHashTable, buf, GET_RES_WINDOW_KEY_LEN(sizeof(groupId)));
if (p1 == NULL) { if (p1 == NULL) {
return NULL; return NULL;
...@@ -3128,4 +3128,4 @@ _error: ...@@ -3128,4 +3128,4 @@ _error:
taosMemoryFree(pInfo); taosMemoryFree(pInfo);
taosMemoryFree(pOperator); taosMemoryFree(pOperator);
return NULL; return NULL;
} }
\ No newline at end of file
...@@ -1385,7 +1385,7 @@ bool doClearWindow(SAggSupporter* pAggSup, SExprSupp* pSup, char* pData, int16_t ...@@ -1385,7 +1385,7 @@ bool doClearWindow(SAggSupporter* pAggSup, SExprSupp* pSup, char* pData, int16_t
int32_t numOfOutput) { int32_t numOfOutput) {
SET_RES_WINDOW_KEY(pAggSup->keyBuf, pData, bytes, groupId); SET_RES_WINDOW_KEY(pAggSup->keyBuf, pData, bytes, groupId);
SResultRowPosition* p1 = SResultRowPosition* p1 =
(SResultRowPosition*)tSimpleHashGet(pAggSup->pResultRowHashTable, pAggSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes)); (SResultRowPosition*)taosHashGet(pAggSup->pResultRowHashTable, pAggSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes));
if (!p1) { if (!p1) {
// window has been closed // window has been closed
return false; return false;
...@@ -1398,14 +1398,14 @@ bool doDeleteIntervalWindow(SAggSupporter* pAggSup, TSKEY ts, uint64_t groupId) ...@@ -1398,14 +1398,14 @@ bool doDeleteIntervalWindow(SAggSupporter* pAggSup, TSKEY ts, uint64_t groupId)
size_t bytes = sizeof(TSKEY); size_t bytes = sizeof(TSKEY);
SET_RES_WINDOW_KEY(pAggSup->keyBuf, &ts, bytes, groupId); SET_RES_WINDOW_KEY(pAggSup->keyBuf, &ts, bytes, groupId);
SResultRowPosition* p1 = SResultRowPosition* p1 =
(SResultRowPosition*)tSimpleHashGet(pAggSup->pResultRowHashTable, pAggSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes)); (SResultRowPosition*)taosHashGet(pAggSup->pResultRowHashTable, pAggSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes));
if (!p1) { if (!p1) {
// window has been closed // window has been closed
return false; return false;
} }
// SFilePage* bufPage = getBufPage(pAggSup->pResultBuf, p1->pageId); // SFilePage* bufPage = getBufPage(pAggSup->pResultBuf, p1->pageId);
// dBufSetBufPageRecycled(pAggSup->pResultBuf, bufPage); // dBufSetBufPageRecycled(pAggSup->pResultBuf, bufPage);
tSimpleHashRemove(pAggSup->pResultRowHashTable, pAggSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes)); taosHashRemove(pAggSup->pResultRowHashTable, pAggSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes));
return true; return true;
} }
...@@ -1455,12 +1455,11 @@ static void doClearWindows(SAggSupporter* pAggSup, SExprSupp* pSup1, SInterval* ...@@ -1455,12 +1455,11 @@ static void doClearWindows(SAggSupporter* pAggSup, SExprSupp* pSup1, SInterval*
} }
} }
static int32_t getAllIntervalWindow(SSHashObj* pHashMap, SHashObj* resWins) { static int32_t getAllIntervalWindow(SHashObj* pHashMap, SHashObj* resWins) {
void* pIte = NULL; void* pIte = NULL;
size_t keyLen = 0; size_t keyLen = 0;
int32_t iter = 0; while ((pIte = taosHashIterate(pHashMap, pIte)) != NULL) {
while ((pIte = tSimpleHashIterate(pHashMap, pIte, &iter)) != NULL) { void* key = taosHashGetKey(pIte, &keyLen);
void* key = tSimpleHashGetKey(pIte, &keyLen);
uint64_t groupId = *(uint64_t*)key; uint64_t groupId = *(uint64_t*)key;
ASSERT(keyLen == GET_RES_WINDOW_KEY_LEN(sizeof(TSKEY))); ASSERT(keyLen == GET_RES_WINDOW_KEY_LEN(sizeof(TSKEY)));
TSKEY ts = *(int64_t*)((char*)key + sizeof(uint64_t)); TSKEY ts = *(int64_t*)((char*)key + sizeof(uint64_t));
...@@ -1473,18 +1472,16 @@ static int32_t getAllIntervalWindow(SSHashObj* pHashMap, SHashObj* resWins) { ...@@ -1473,18 +1472,16 @@ static int32_t getAllIntervalWindow(SSHashObj* pHashMap, SHashObj* resWins) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t closeIntervalWindow(SSHashObj* pHashMap, STimeWindowAggSupp* pSup, SInterval* pInterval, static int32_t closeIntervalWindow(SHashObj* pHashMap, STimeWindowAggSupp* pSup, SInterval* pInterval,
SHashObj* pPullDataMap, SHashObj* closeWins, SArray* pRecyPages, SHashObj* pPullDataMap, SHashObj* closeWins, SArray* pRecyPages,
SDiskbasedBuf* pDiscBuf) { SDiskbasedBuf* pDiscBuf) {
qDebug("===stream===close interval window"); qDebug("===stream===close interval window");
void* pIte = NULL; void* pIte = NULL;
void* key = NULL; size_t keyLen = 0;
size_t keyLen = GET_RES_WINDOW_KEY_LEN(sizeof(TSKEY)); while ((pIte = taosHashIterate(pHashMap, pIte)) != NULL) {
int32_t iter = 0; void* key = taosHashGetKey(pIte, &keyLen);
while ((pIte = tSimpleHashIterateKV(pHashMap, pIte, &key, &iter)) != NULL) {
// void* key = tSimpleHashGetKey(pIte, &keyLen);
uint64_t groupId = *(uint64_t*)key; uint64_t groupId = *(uint64_t*)key;
// ASSERT(keyLen == GET_RES_WINDOW_KEY_LEN(sizeof(TSKEY))); ASSERT(keyLen == GET_RES_WINDOW_KEY_LEN(sizeof(TSKEY)));
TSKEY ts = *(int64_t*)((char*)key + sizeof(uint64_t)); TSKEY ts = *(int64_t*)((char*)key + sizeof(uint64_t));
STimeWindow win; STimeWindow win;
win.skey = ts; win.skey = ts;
...@@ -1520,7 +1517,7 @@ static int32_t closeIntervalWindow(SSHashObj* pHashMap, STimeWindowAggSupp* pSup ...@@ -1520,7 +1517,7 @@ static int32_t closeIntervalWindow(SSHashObj* pHashMap, STimeWindowAggSupp* pSup
} }
char keyBuf[GET_RES_WINDOW_KEY_LEN(sizeof(TSKEY))]; char keyBuf[GET_RES_WINDOW_KEY_LEN(sizeof(TSKEY))];
SET_RES_WINDOW_KEY(keyBuf, &ts, sizeof(TSKEY), groupId); SET_RES_WINDOW_KEY(keyBuf, &ts, sizeof(TSKEY), groupId);
tSimpleHashRemove(pHashMap, keyBuf, keyLen); taosHashRemove(pHashMap, keyBuf, keyLen);
} }
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -2850,7 +2847,7 @@ bool hasIntervalWindow(SAggSupporter* pSup, TSKEY ts, uint64_t groupId) { ...@@ -2850,7 +2847,7 @@ bool hasIntervalWindow(SAggSupporter* pSup, TSKEY ts, uint64_t groupId) {
int32_t bytes = sizeof(TSKEY); int32_t bytes = sizeof(TSKEY);
SET_RES_WINDOW_KEY(pSup->keyBuf, &ts, bytes, groupId); SET_RES_WINDOW_KEY(pSup->keyBuf, &ts, bytes, groupId);
SResultRowPosition* p1 = SResultRowPosition* p1 =
(SResultRowPosition*)tSimpleHashGet(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes)); (SResultRowPosition*)taosHashGet(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes));
return p1 != NULL; return p1 != NULL;
} }
...@@ -2891,9 +2888,8 @@ static void rebuildIntervalWindow(SStreamFinalIntervalOperatorInfo* pInfo, SExpr ...@@ -2891,9 +2888,8 @@ static void rebuildIntervalWindow(SStreamFinalIntervalOperatorInfo* pInfo, SExpr
bool isDeletedWindow(STimeWindow* pWin, uint64_t groupId, SAggSupporter* pSup) { bool isDeletedWindow(STimeWindow* pWin, uint64_t groupId, SAggSupporter* pSup) {
SET_RES_WINDOW_KEY(pSup->keyBuf, &pWin->skey, sizeof(int64_t), groupId); SET_RES_WINDOW_KEY(pSup->keyBuf, &pWin->skey, sizeof(int64_t), groupId);
SResultRowPosition* p1 = (SResultRowPosition*)tSimpleHashGet(pSup->pResultRowHashTable, pSup->keyBuf, SResultRowPosition* p1 = (SResultRowPosition*)taosHashGet(pSup->pResultRowHashTable, pSup->keyBuf,
GET_RES_WINDOW_KEY_LEN(sizeof(int64_t))); GET_RES_WINDOW_KEY_LEN(sizeof(int64_t)));
return p1 == NULL; return p1 == NULL;
} }
...@@ -3021,7 +3017,7 @@ static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBloc ...@@ -3021,7 +3017,7 @@ static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBloc
} }
static void clearStreamIntervalOperator(SStreamFinalIntervalOperatorInfo* pInfo) { static void clearStreamIntervalOperator(SStreamFinalIntervalOperatorInfo* pInfo) {
tSimpleHashClear(pInfo->aggSup.pResultRowHashTable); taosHashClear(pInfo->aggSup.pResultRowHashTable);
clearDiskbasedBuf(pInfo->aggSup.pResultBuf); clearDiskbasedBuf(pInfo->aggSup.pResultBuf);
cleanupResultRowInfo(&pInfo->binfo.resultRowInfo); cleanupResultRowInfo(&pInfo->binfo.resultRowInfo);
initResultRowInfo(&pInfo->binfo.resultRowInfo); initResultRowInfo(&pInfo->binfo.resultRowInfo);
...@@ -4932,14 +4928,14 @@ static int32_t outputMergeAlignedIntervalResult(SOperatorInfo* pOperatorInfo, ui ...@@ -4932,14 +4928,14 @@ static int32_t outputMergeAlignedIntervalResult(SOperatorInfo* pOperatorInfo, ui
SExprSupp* pSup = &pOperatorInfo->exprSupp; SExprSupp* pSup = &pOperatorInfo->exprSupp;
SET_RES_WINDOW_KEY(iaInfo->aggSup.keyBuf, &wstartTs, TSDB_KEYSIZE, tableGroupId); SET_RES_WINDOW_KEY(iaInfo->aggSup.keyBuf, &wstartTs, TSDB_KEYSIZE, tableGroupId);
SResultRowPosition* p1 = (SResultRowPosition*)tSimpleHashGet( SResultRowPosition* p1 = (SResultRowPosition*)taosHashGet(iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf,
iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf, GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE)); GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE));
ASSERT(p1 != NULL); ASSERT(p1 != NULL);
finalizeResultRowIntoResultDataBlock(iaInfo->aggSup.pResultBuf, p1, pSup->pCtx, pSup->pExprInfo, pSup->numOfExprs, finalizeResultRowIntoResultDataBlock(iaInfo->aggSup.pResultBuf, p1, pSup->pCtx, pSup->pExprInfo, pSup->numOfExprs,
pSup->rowEntryInfoOffset, pResultBlock, pTaskInfo); pSup->rowEntryInfoOffset, pResultBlock, pTaskInfo);
tSimpleHashRemove(iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf, GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE)); taosHashRemove(iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf, GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE));
ASSERT(tSimpleHashGetSize(iaInfo->aggSup.pResultRowHashTable) == 0); ASSERT(taosHashGetSize(iaInfo->aggSup.pResultRowHashTable) == 0);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -4962,7 +4958,7 @@ static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultR ...@@ -4962,7 +4958,7 @@ static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultR
// there is an result exists // there is an result exists
if (miaInfo->curTs != INT64_MIN) { if (miaInfo->curTs != INT64_MIN) {
ASSERT(tSimpleHashGetSize(iaInfo->aggSup.pResultRowHashTable) == 1); ASSERT(taosHashGetSize(iaInfo->aggSup.pResultRowHashTable) == 1);
if (ts != miaInfo->curTs) { if (ts != miaInfo->curTs) {
outputMergeAlignedIntervalResult(pOperatorInfo, tableGroupId, pResultBlock, miaInfo->curTs); outputMergeAlignedIntervalResult(pOperatorInfo, tableGroupId, pResultBlock, miaInfo->curTs);
...@@ -4970,7 +4966,7 @@ static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultR ...@@ -4970,7 +4966,7 @@ static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultR
} }
} else { } else {
miaInfo->curTs = ts; miaInfo->curTs = ts;
ASSERT(tSimpleHashGetSize(iaInfo->aggSup.pResultRowHashTable) == 0); ASSERT(taosHashGetSize(iaInfo->aggSup.pResultRowHashTable) == 0);
} }
STimeWindow win = {0}; STimeWindow win = {0};
...@@ -5046,7 +5042,7 @@ static void doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) { ...@@ -5046,7 +5042,7 @@ static void doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) {
if (pBlock == NULL) { if (pBlock == NULL) {
// close last unfinalized time window // close last unfinalized time window
if (miaInfo->curTs != INT64_MIN) { if (miaInfo->curTs != INT64_MIN) {
ASSERT(tSimpleHashGetSize(iaInfo->aggSup.pResultRowHashTable) == 1); ASSERT(taosHashGetSize(iaInfo->aggSup.pResultRowHashTable) == 1);
outputMergeAlignedIntervalResult(pOperator, miaInfo->groupId, pRes, miaInfo->curTs); outputMergeAlignedIntervalResult(pOperator, miaInfo->groupId, pRes, miaInfo->curTs);
miaInfo->curTs = INT64_MIN; miaInfo->curTs = INT64_MIN;
} }
...@@ -5223,12 +5219,12 @@ static int32_t finalizeWindowResult(SOperatorInfo* pOperatorInfo, uint64_t table ...@@ -5223,12 +5219,12 @@ static int32_t finalizeWindowResult(SOperatorInfo* pOperatorInfo, uint64_t table
SExprSupp* pExprSup = &pOperatorInfo->exprSupp; SExprSupp* pExprSup = &pOperatorInfo->exprSupp;
SET_RES_WINDOW_KEY(iaInfo->aggSup.keyBuf, &win->skey, TSDB_KEYSIZE, tableGroupId); SET_RES_WINDOW_KEY(iaInfo->aggSup.keyBuf, &win->skey, TSDB_KEYSIZE, tableGroupId);
SResultRowPosition* p1 = (SResultRowPosition*)tSimpleHashGet(iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf, SResultRowPosition* p1 = (SResultRowPosition*)taosHashGet(iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf,
GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE)); GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE));
ASSERT(p1 != NULL); ASSERT(p1 != NULL);
finalizeResultRowIntoResultDataBlock(iaInfo->aggSup.pResultBuf, p1, pExprSup->pCtx, pExprSup->pExprInfo, finalizeResultRowIntoResultDataBlock(iaInfo->aggSup.pResultBuf, p1, pExprSup->pCtx, pExprSup->pExprInfo,
pExprSup->numOfExprs, pExprSup->rowEntryInfoOffset, pResultBlock, pTaskInfo); pExprSup->numOfExprs, pExprSup->rowEntryInfoOffset, pResultBlock, pTaskInfo);
tSimpleHashRemove(iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf, GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE)); taosHashRemove(iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf, GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -5491,4 +5487,4 @@ _error: ...@@ -5491,4 +5487,4 @@ _error:
taosMemoryFreeClear(pOperator); taosMemoryFreeClear(pOperator);
pTaskInfo->code = code; pTaskInfo->code = code;
return NULL; return NULL;
} }
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册