提交 4abb6e14 编写于 作者: C Cary Xu

enh: use simple hash to save agg result

上级 32ce4b31
...@@ -22,6 +22,7 @@ ...@@ -22,6 +22,7 @@
#include "tbuffer.h" #include "tbuffer.h"
#include "tcommon.h" #include "tcommon.h"
#include "tpagedbuf.h" #include "tpagedbuf.h"
#include "tsimplehash.h"
#define T_LONG_JMP(_obj, _c) \ #define T_LONG_JMP(_obj, _c) \
do { \ do { \
...@@ -106,7 +107,7 @@ static FORCE_INLINE void setResultBufPageDirty(SDiskbasedBuf* pBuf, SResultRowPo ...@@ -106,7 +107,7 @@ static FORCE_INLINE void setResultBufPageDirty(SDiskbasedBuf* pBuf, SResultRowPo
setBufPageDirty(pPage, true); setBufPageDirty(pPage, true);
} }
void initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SHashObj* pHashmap, int32_t order); void initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SSHashObj* pHashmap, int32_t order);
void cleanupGroupResInfo(SGroupResInfo* pGroupResInfo); void cleanupGroupResInfo(SGroupResInfo* pGroupResInfo);
void initMultiResInfoFromArrayList(SGroupResInfo* pGroupResInfo, SArray* pArrayList); void initMultiResInfoFromArrayList(SGroupResInfo* pGroupResInfo, SArray* pArrayList);
......
...@@ -297,10 +297,10 @@ enum { ...@@ -297,10 +297,10 @@ enum {
}; };
typedef struct SAggSupporter { typedef struct SAggSupporter {
SHashObj* pResultRowHashTable; // quick locate the window object for each result SSHashObj* pResultRowHashTable; // quick locate the window object for each result
char* keyBuf; // window key buffer char* keyBuf; // window key buffer
SDiskbasedBuf* pResultBuf; // query result buffer based on blocked-wised disk file SDiskbasedBuf* pResultBuf; // query result buffer based on blocked-wised disk file
int32_t resultRowSize; // the result buffer size for each result row, with the meta data size for each row int32_t resultRowSize; // the result buffer size for each result row, with the meta data size for each row
} SAggSupporter; } SAggSupporter;
typedef struct { typedef struct {
......
...@@ -28,7 +28,7 @@ typedef void (*_hash_free_fn_t)(void *); ...@@ -28,7 +28,7 @@ typedef void (*_hash_free_fn_t)(void *);
/** /**
* @brief single thread hash * @brief single thread hash
* *
*/ */
typedef struct SSHashObj SSHashObj; typedef struct SSHashObj SSHashObj;
...@@ -52,13 +52,13 @@ int32_t tSimpleHashPrint(const SSHashObj *pHashObj); ...@@ -52,13 +52,13 @@ int32_t tSimpleHashPrint(const SSHashObj *pHashObj);
/** /**
* @brief put element into hash table, if the element with the same key exists, update it * @brief put element into hash table, if the element with the same key exists, update it
* *
* @param pHashObj * @param pHashObj
* @param key * @param key
* @param keyLen * @param keyLen
* @param data * @param data
* @param dataLen * @param dataLen
* @return int32_t * @return int32_t
*/ */
int32_t tSimpleHashPut(SSHashObj *pHashObj, const void *key, size_t keyLen, const void *data, size_t dataLen); int32_t tSimpleHashPut(SSHashObj *pHashObj, const void *key, size_t keyLen, const void *data, size_t dataLen);
...@@ -80,6 +80,18 @@ void *tSimpleHashGet(SSHashObj *pHashObj, const void *key, size_t keyLen); ...@@ -80,6 +80,18 @@ void *tSimpleHashGet(SSHashObj *pHashObj, const void *key, size_t keyLen);
*/ */
int32_t tSimpleHashRemove(SSHashObj *pHashObj, const void *key, size_t keyLen); int32_t tSimpleHashRemove(SSHashObj *pHashObj, const void *key, size_t keyLen);
/**
* remove item with the specified key during hash iterate
*
* @param pHashObj
* @param key
* @param keyLen
* @param pIter
* @param iter
* @return int32_t
*/
int32_t tSimpleHashIterateRemove(SSHashObj *pHashObj, const void *key, size_t keyLen, void **pIter, int32_t *iter);
/** /**
* Clear the hash table. * Clear the hash table.
* @param pHashObj * @param pHashObj
...@@ -99,13 +111,27 @@ void tSimpleHashCleanup(SSHashObj *pHashObj); ...@@ -99,13 +111,27 @@ void tSimpleHashCleanup(SSHashObj *pHashObj);
*/ */
size_t tSimpleHashGetMemSize(const SSHashObj *pHashObj); size_t tSimpleHashGetMemSize(const SSHashObj *pHashObj);
#pragma pack(push, 4)
typedef struct SHNode{
struct SHNode *next;
uint32_t keyLen : 20;
uint32_t dataLen : 12;
char data[];
} SHNode;
#pragma pack(pop)
/** /**
* Get the corresponding key information for a given data in hash table * Get the corresponding key information for a given data in hash table
* @param data * @param data
* @param keyLen * @param keyLen
* @return * @return
*/ */
void *tSimpleHashGetKey(void *data, size_t* keyLen); static FORCE_INLINE void *tSimpleHashGetKey(void *data, size_t *keyLen) {
SHNode *node = (SHNode *)((char *)data - offsetof(SHNode, data));
if (keyLen) *keyLen = node->keyLen;
return POINTER_SHIFT(data, node->dataLen);
}
/** /**
* Create the hash table iterator * Create the hash table iterator
...@@ -116,17 +142,6 @@ void *tSimpleHashGetKey(void *data, size_t* keyLen); ...@@ -116,17 +142,6 @@ void *tSimpleHashGetKey(void *data, size_t* keyLen);
*/ */
void *tSimpleHashIterate(const SSHashObj *pHashObj, void *data, int32_t *iter); void *tSimpleHashIterate(const SSHashObj *pHashObj, void *data, int32_t *iter);
/**
* Create the hash table iterator
*
* @param pHashObj
* @param data
* @param key
* @param iter
* @return void*
*/
void *tSimpleHashIterateKV(const SSHashObj *pHashObj, void *data, void **key, int32_t *iter);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -83,7 +83,7 @@ int32_t resultrowComparAsc(const void* p1, const void* p2) { ...@@ -83,7 +83,7 @@ int32_t resultrowComparAsc(const void* p1, const void* p2) {
static int32_t resultrowComparDesc(const void* p1, const void* p2) { return resultrowComparAsc(p2, p1); } static int32_t resultrowComparDesc(const void* p1, const void* p2) { return resultrowComparAsc(p2, p1); }
void initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SHashObj* pHashmap, int32_t order) { void initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SSHashObj* pHashmap, int32_t order) {
if (pGroupResInfo->pRows != NULL) { if (pGroupResInfo->pRows != NULL) {
taosArrayDestroy(pGroupResInfo->pRows); taosArrayDestroy(pGroupResInfo->pRows);
} }
...@@ -92,9 +92,10 @@ void initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SHashObj* pHashmap, int ...@@ -92,9 +92,10 @@ void initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SHashObj* pHashmap, int
void* pData = NULL; void* pData = NULL;
pGroupResInfo->pRows = taosArrayInit(10, POINTER_BYTES); pGroupResInfo->pRows = taosArrayInit(10, POINTER_BYTES);
size_t keyLen = 0; size_t keyLen = 0;
while ((pData = taosHashIterate(pHashmap, pData)) != NULL) { int32_t iter = 0;
void* key = taosHashGetKey(pData, &keyLen); while ((pData = tSimpleHashIterate(pHashmap, pData, &iter)) != NULL) {
void* key = tSimpleHashGetKey(pData, &keyLen);
SResKeyPos* p = taosMemoryMalloc(keyLen + sizeof(SResultRowPosition)); SResKeyPos* p = taosMemoryMalloc(keyLen + sizeof(SResultRowPosition));
......
...@@ -234,7 +234,7 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR ...@@ -234,7 +234,7 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR
SET_RES_WINDOW_KEY(pSup->keyBuf, pData, bytes, groupId); SET_RES_WINDOW_KEY(pSup->keyBuf, pData, bytes, groupId);
SResultRowPosition* p1 = SResultRowPosition* p1 =
(SResultRowPosition*)taosHashGet(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes)); (SResultRowPosition*)tSimpleHashGet(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes));
SResultRow* pResult = NULL; SResultRow* pResult = NULL;
...@@ -273,7 +273,7 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR ...@@ -273,7 +273,7 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR
// add a new result set for a new group // add a new result set for a new group
SResultRowPosition pos = {.pageId = pResult->pageId, .offset = pResult->offset}; SResultRowPosition pos = {.pageId = pResult->pageId, .offset = pResult->offset};
taosHashPut(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes), &pos, tSimpleHashPut(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes), &pos,
sizeof(SResultRowPosition)); sizeof(SResultRowPosition));
} }
...@@ -282,7 +282,7 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR ...@@ -282,7 +282,7 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR
// too many time window in query // too many time window in query
if (pTaskInfo->execModel == OPTR_EXEC_MODEL_BATCH && if (pTaskInfo->execModel == OPTR_EXEC_MODEL_BATCH &&
taosHashGetSize(pSup->pResultRowHashTable) > MAX_INTERVAL_TIME_WINDOW) { tSimpleHashGetSize(pSup->pResultRowHashTable) > MAX_INTERVAL_TIME_WINDOW) {
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_TOO_MANY_TIMEWINDOW); T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_TOO_MANY_TIMEWINDOW);
} }
...@@ -3011,7 +3011,7 @@ int32_t aggEncodeResultRow(SOperatorInfo* pOperator, char** result, int32_t* len ...@@ -3011,7 +3011,7 @@ int32_t aggEncodeResultRow(SOperatorInfo* pOperator, char** result, int32_t* len
} }
SOptrBasicInfo* pInfo = (SOptrBasicInfo*)(pOperator->info); SOptrBasicInfo* pInfo = (SOptrBasicInfo*)(pOperator->info);
SAggSupporter* pSup = (SAggSupporter*)POINTER_SHIFT(pOperator->info, sizeof(SOptrBasicInfo)); SAggSupporter* pSup = (SAggSupporter*)POINTER_SHIFT(pOperator->info, sizeof(SOptrBasicInfo));
int32_t size = taosHashGetSize(pSup->pResultRowHashTable); int32_t size = tSimpleHashGetSize(pSup->pResultRowHashTable);
size_t keyLen = sizeof(uint64_t) * 2; // estimate the key length size_t keyLen = sizeof(uint64_t) * 2; // estimate the key length
int32_t totalSize = int32_t totalSize =
sizeof(int32_t) + sizeof(int32_t) + size * (sizeof(int32_t) + keyLen + sizeof(int32_t) + pSup->resultRowSize); sizeof(int32_t) + sizeof(int32_t) + size * (sizeof(int32_t) + keyLen + sizeof(int32_t) + pSup->resultRowSize);
...@@ -3038,10 +3038,11 @@ int32_t aggEncodeResultRow(SOperatorInfo* pOperator, char** result, int32_t* len ...@@ -3038,10 +3038,11 @@ int32_t aggEncodeResultRow(SOperatorInfo* pOperator, char** result, int32_t* len
SResultRow* pRow = (SResultRow*)((char*)pPage + pos->offset); SResultRow* pRow = (SResultRow*)((char*)pPage + pos->offset);
setBufPageDirty(pPage, true); setBufPageDirty(pPage, true);
releaseBufPage(pSup->pResultBuf, pPage); releaseBufPage(pSup->pResultBuf, pPage);
void* pIter = taosHashIterate(pSup->pResultRowHashTable, NULL); int32_t iter = 0;
while (pIter) { void* pIter = NULL;
void* key = taosHashGetKey(pIter, &keyLen); while ((pIter = tSimpleHashIterate(pSup->pResultRowHashTable, pIter, &iter))) {
void* key = tSimpleHashGetKey(pIter, &keyLen);
SResultRowPosition* p1 = (SResultRowPosition*)pIter; SResultRowPosition* p1 = (SResultRowPosition*)pIter;
pPage = (SFilePage*)getBufPage(pSup->pResultBuf, p1->pageId); pPage = (SFilePage*)getBufPage(pSup->pResultBuf, p1->pageId);
...@@ -3072,8 +3073,6 @@ int32_t aggEncodeResultRow(SOperatorInfo* pOperator, char** result, int32_t* len ...@@ -3072,8 +3073,6 @@ int32_t aggEncodeResultRow(SOperatorInfo* pOperator, char** result, int32_t* len
offset += sizeof(int32_t); offset += sizeof(int32_t);
memcpy(*result + offset, pRow, pSup->resultRowSize); memcpy(*result + offset, pRow, pSup->resultRowSize);
offset += pSup->resultRowSize; offset += pSup->resultRowSize;
pIter = taosHashIterate(pSup->pResultRowHashTable, pIter);
} }
*(int32_t*)(*result) = offset; *(int32_t*)(*result) = offset;
...@@ -3108,7 +3107,7 @@ int32_t aggDecodeResultRow(SOperatorInfo* pOperator, char* result) { ...@@ -3108,7 +3107,7 @@ int32_t aggDecodeResultRow(SOperatorInfo* pOperator, char* result) {
// add a new result set for a new group // add a new result set for a new group
SResultRowPosition pos = {.pageId = resultRow->pageId, .offset = resultRow->offset}; SResultRowPosition pos = {.pageId = resultRow->pageId, .offset = resultRow->offset};
taosHashPut(pSup->pResultRowHashTable, result + offset, keyLen, &pos, sizeof(SResultRowPosition)); tSimpleHashPut(pSup->pResultRowHashTable, result + offset, keyLen, &pos, sizeof(SResultRowPosition));
offset += keyLen; offset += keyLen;
int32_t valueLen = *(int32_t*)(result + offset); int32_t valueLen = *(int32_t*)(result + offset);
...@@ -3452,7 +3451,7 @@ int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t n ...@@ -3452,7 +3451,7 @@ int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t n
pAggSup->resultRowSize = getResultRowSize(pCtx, numOfOutput); pAggSup->resultRowSize = getResultRowSize(pCtx, numOfOutput);
pAggSup->keyBuf = taosMemoryCalloc(1, keyBufSize + POINTER_BYTES + sizeof(int64_t)); pAggSup->keyBuf = taosMemoryCalloc(1, keyBufSize + POINTER_BYTES + sizeof(int64_t));
pAggSup->pResultRowHashTable = taosHashInit(10, hashFn, true, HASH_NO_LOCK); pAggSup->pResultRowHashTable = tSimpleHashInit(10, hashFn);
if (pAggSup->keyBuf == NULL || pAggSup->pResultRowHashTable == NULL) { if (pAggSup->keyBuf == NULL || pAggSup->pResultRowHashTable == NULL) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
...@@ -3479,7 +3478,7 @@ int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t n ...@@ -3479,7 +3478,7 @@ int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t n
void cleanupAggSup(SAggSupporter* pAggSup) { void cleanupAggSup(SAggSupporter* pAggSup) {
taosMemoryFreeClear(pAggSup->keyBuf); taosMemoryFreeClear(pAggSup->keyBuf);
taosHashCleanup(pAggSup->pResultRowHashTable); tSimpleHashCleanup(pAggSup->pResultRowHashTable);
destroyDiskbasedBuf(pAggSup->pResultBuf); destroyDiskbasedBuf(pAggSup->pResultBuf);
} }
......
...@@ -178,8 +178,8 @@ static SResultRow* getTableGroupOutputBuf(SOperatorInfo* pOperator, uint64_t gro ...@@ -178,8 +178,8 @@ static SResultRow* getTableGroupOutputBuf(SOperatorInfo* pOperator, uint64_t gro
STableScanInfo* pTableScanInfo = pOperator->info; STableScanInfo* pTableScanInfo = pOperator->info;
SResultRowPosition* p1 = (SResultRowPosition*)taosHashGet(pTableScanInfo->pdInfo.pAggSup->pResultRowHashTable, buf, SResultRowPosition* p1 = (SResultRowPosition*)tSimpleHashGet(pTableScanInfo->pdInfo.pAggSup->pResultRowHashTable, buf,
GET_RES_WINDOW_KEY_LEN(sizeof(groupId))); GET_RES_WINDOW_KEY_LEN(sizeof(groupId)));
if (p1 == NULL) { if (p1 == NULL) {
return NULL; return NULL;
......
...@@ -1380,7 +1380,7 @@ bool doClearWindow(SAggSupporter* pAggSup, SExprSupp* pSup, char* pData, int16_t ...@@ -1380,7 +1380,7 @@ bool doClearWindow(SAggSupporter* pAggSup, SExprSupp* pSup, char* pData, int16_t
int32_t numOfOutput) { int32_t numOfOutput) {
SET_RES_WINDOW_KEY(pAggSup->keyBuf, pData, bytes, groupId); SET_RES_WINDOW_KEY(pAggSup->keyBuf, pData, bytes, groupId);
SResultRowPosition* p1 = SResultRowPosition* p1 =
(SResultRowPosition*)taosHashGet(pAggSup->pResultRowHashTable, pAggSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes)); (SResultRowPosition*)tSimpleHashGet(pAggSup->pResultRowHashTable, pAggSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes));
if (!p1) { if (!p1) {
// window has been closed // window has been closed
return false; return false;
...@@ -1393,14 +1393,14 @@ bool doDeleteIntervalWindow(SAggSupporter* pAggSup, TSKEY ts, uint64_t groupId) ...@@ -1393,14 +1393,14 @@ bool doDeleteIntervalWindow(SAggSupporter* pAggSup, TSKEY ts, uint64_t groupId)
size_t bytes = sizeof(TSKEY); size_t bytes = sizeof(TSKEY);
SET_RES_WINDOW_KEY(pAggSup->keyBuf, &ts, bytes, groupId); SET_RES_WINDOW_KEY(pAggSup->keyBuf, &ts, bytes, groupId);
SResultRowPosition* p1 = SResultRowPosition* p1 =
(SResultRowPosition*)taosHashGet(pAggSup->pResultRowHashTable, pAggSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes)); (SResultRowPosition*)tSimpleHashGet(pAggSup->pResultRowHashTable, pAggSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes));
if (!p1) { if (!p1) {
// window has been closed // window has been closed
return false; return false;
} }
// SFilePage* bufPage = getBufPage(pAggSup->pResultBuf, p1->pageId); // SFilePage* bufPage = getBufPage(pAggSup->pResultBuf, p1->pageId);
// dBufSetBufPageRecycled(pAggSup->pResultBuf, bufPage); // dBufSetBufPageRecycled(pAggSup->pResultBuf, bufPage);
taosHashRemove(pAggSup->pResultRowHashTable, pAggSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes)); tSimpleHashRemove(pAggSup->pResultRowHashTable, pAggSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes));
return true; return true;
} }
...@@ -1450,11 +1450,13 @@ static void doClearWindows(SAggSupporter* pAggSup, SExprSupp* pSup1, SInterval* ...@@ -1450,11 +1450,13 @@ static void doClearWindows(SAggSupporter* pAggSup, SExprSupp* pSup1, SInterval*
} }
} }
static int32_t getAllIntervalWindow(SHashObj* pHashMap, SHashObj* resWins) { static int32_t getAllIntervalWindow(SSHashObj* pHashMap, SHashObj* resWins) {
void* pIte = NULL;
size_t keyLen = 0; void* pIte = NULL;
while ((pIte = taosHashIterate(pHashMap, pIte)) != NULL) { size_t keyLen = 0;
void* key = taosHashGetKey(pIte, &keyLen); int32_t iter = 0;
while ((pIte = tSimpleHashIterate(pHashMap, pIte, &iter)) != NULL) {
void* key = tSimpleHashGetKey(pIte, &keyLen);
uint64_t groupId = *(uint64_t*)key; uint64_t groupId = *(uint64_t*)key;
ASSERT(keyLen == GET_RES_WINDOW_KEY_LEN(sizeof(TSKEY))); ASSERT(keyLen == GET_RES_WINDOW_KEY_LEN(sizeof(TSKEY)));
TSKEY ts = *(int64_t*)((char*)key + sizeof(uint64_t)); TSKEY ts = *(int64_t*)((char*)key + sizeof(uint64_t));
...@@ -1467,14 +1469,15 @@ static int32_t getAllIntervalWindow(SHashObj* pHashMap, SHashObj* resWins) { ...@@ -1467,14 +1469,15 @@ static int32_t getAllIntervalWindow(SHashObj* pHashMap, SHashObj* resWins) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t closeIntervalWindow(SHashObj* pHashMap, STimeWindowAggSupp* pSup, SInterval* pInterval, static int32_t closeIntervalWindow(SSHashObj* pHashMap, STimeWindowAggSupp* pSup, SInterval* pInterval,
SHashObj* pPullDataMap, SHashObj* closeWins, SArray* pRecyPages, SHashObj* pPullDataMap, SHashObj* closeWins, SArray* pRecyPages,
SDiskbasedBuf* pDiscBuf) { SDiskbasedBuf* pDiscBuf) {
qDebug("===stream===close interval window"); qDebug("===stream===close interval window");
void* pIte = NULL; void* pIte = NULL;
size_t keyLen = 0; size_t keyLen = 0;
while ((pIte = taosHashIterate(pHashMap, pIte)) != NULL) { int32_t iter = 0;
void* key = taosHashGetKey(pIte, &keyLen); while ((pIte = tSimpleHashIterate(pHashMap, pIte, &iter)) != NULL) {
void* key = tSimpleHashGetKey(pIte, &keyLen);
uint64_t groupId = *(uint64_t*)key; uint64_t groupId = *(uint64_t*)key;
ASSERT(keyLen == GET_RES_WINDOW_KEY_LEN(sizeof(TSKEY))); ASSERT(keyLen == GET_RES_WINDOW_KEY_LEN(sizeof(TSKEY)));
TSKEY ts = *(int64_t*)((char*)key + sizeof(uint64_t)); TSKEY ts = *(int64_t*)((char*)key + sizeof(uint64_t));
...@@ -1512,7 +1515,7 @@ static int32_t closeIntervalWindow(SHashObj* pHashMap, STimeWindowAggSupp* pSup, ...@@ -1512,7 +1515,7 @@ static int32_t closeIntervalWindow(SHashObj* pHashMap, STimeWindowAggSupp* pSup,
} }
char keyBuf[GET_RES_WINDOW_KEY_LEN(sizeof(TSKEY))]; char keyBuf[GET_RES_WINDOW_KEY_LEN(sizeof(TSKEY))];
SET_RES_WINDOW_KEY(keyBuf, &ts, sizeof(TSKEY), groupId); SET_RES_WINDOW_KEY(keyBuf, &ts, sizeof(TSKEY), groupId);
taosHashRemove(pHashMap, keyBuf, keyLen); tSimpleHashIterateRemove(pHashMap, keyBuf, keyLen, &pIte, &iter);
} }
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -2855,7 +2858,7 @@ bool hasIntervalWindow(SAggSupporter* pSup, TSKEY ts, uint64_t groupId) { ...@@ -2855,7 +2858,7 @@ bool hasIntervalWindow(SAggSupporter* pSup, TSKEY ts, uint64_t groupId) {
int32_t bytes = sizeof(TSKEY); int32_t bytes = sizeof(TSKEY);
SET_RES_WINDOW_KEY(pSup->keyBuf, &ts, bytes, groupId); SET_RES_WINDOW_KEY(pSup->keyBuf, &ts, bytes, groupId);
SResultRowPosition* p1 = SResultRowPosition* p1 =
(SResultRowPosition*)taosHashGet(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes)); (SResultRowPosition*)tSimpleHashGet(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes));
return p1 != NULL; return p1 != NULL;
} }
...@@ -2896,7 +2899,7 @@ static void rebuildIntervalWindow(SStreamFinalIntervalOperatorInfo* pInfo, SExpr ...@@ -2896,7 +2899,7 @@ static void rebuildIntervalWindow(SStreamFinalIntervalOperatorInfo* pInfo, SExpr
bool isDeletedWindow(STimeWindow* pWin, uint64_t groupId, SAggSupporter* pSup) { bool isDeletedWindow(STimeWindow* pWin, uint64_t groupId, SAggSupporter* pSup) {
SET_RES_WINDOW_KEY(pSup->keyBuf, &pWin->skey, sizeof(int64_t), groupId); SET_RES_WINDOW_KEY(pSup->keyBuf, &pWin->skey, sizeof(int64_t), groupId);
SResultRowPosition* p1 = (SResultRowPosition*)taosHashGet(pSup->pResultRowHashTable, pSup->keyBuf, SResultRowPosition* p1 = (SResultRowPosition*)tSimpleHashGet(pSup->pResultRowHashTable, pSup->keyBuf,
GET_RES_WINDOW_KEY_LEN(sizeof(int64_t))); GET_RES_WINDOW_KEY_LEN(sizeof(int64_t)));
return p1 == NULL; return p1 == NULL;
} }
...@@ -3025,7 +3028,7 @@ static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBloc ...@@ -3025,7 +3028,7 @@ static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBloc
} }
static void clearStreamIntervalOperator(SStreamFinalIntervalOperatorInfo* pInfo) { static void clearStreamIntervalOperator(SStreamFinalIntervalOperatorInfo* pInfo) {
taosHashClear(pInfo->aggSup.pResultRowHashTable); tSimpleHashClear(pInfo->aggSup.pResultRowHashTable);
clearDiskbasedBuf(pInfo->aggSup.pResultBuf); clearDiskbasedBuf(pInfo->aggSup.pResultBuf);
initResultRowInfo(&pInfo->binfo.resultRowInfo); initResultRowInfo(&pInfo->binfo.resultRowInfo);
} }
...@@ -4926,14 +4929,14 @@ static int32_t outputMergeAlignedIntervalResult(SOperatorInfo* pOperatorInfo, ui ...@@ -4926,14 +4929,14 @@ static int32_t outputMergeAlignedIntervalResult(SOperatorInfo* pOperatorInfo, ui
SExprSupp* pSup = &pOperatorInfo->exprSupp; SExprSupp* pSup = &pOperatorInfo->exprSupp;
SET_RES_WINDOW_KEY(iaInfo->aggSup.keyBuf, &wstartTs, TSDB_KEYSIZE, tableGroupId); SET_RES_WINDOW_KEY(iaInfo->aggSup.keyBuf, &wstartTs, TSDB_KEYSIZE, tableGroupId);
SResultRowPosition* p1 = (SResultRowPosition*)taosHashGet(iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf, SResultRowPosition* p1 = (SResultRowPosition*)tSimpleHashGet(iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf,
GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE)); GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE));
ASSERT(p1 != NULL); ASSERT(p1 != NULL);
finalizeResultRowIntoResultDataBlock(iaInfo->aggSup.pResultBuf, p1, pSup->pCtx, pSup->pExprInfo, pSup->numOfExprs, finalizeResultRowIntoResultDataBlock(iaInfo->aggSup.pResultBuf, p1, pSup->pCtx, pSup->pExprInfo, pSup->numOfExprs,
pSup->rowEntryInfoOffset, pResultBlock, pTaskInfo); pSup->rowEntryInfoOffset, pResultBlock, pTaskInfo);
taosHashRemove(iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf, GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE)); tSimpleHashRemove(iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf, GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE));
ASSERT(taosHashGetSize(iaInfo->aggSup.pResultRowHashTable) == 0); ASSERT(tSimpleHashGetSize(iaInfo->aggSup.pResultRowHashTable) == 0);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -4956,7 +4959,7 @@ static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultR ...@@ -4956,7 +4959,7 @@ static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultR
// there is an result exists // there is an result exists
if (miaInfo->curTs != INT64_MIN) { if (miaInfo->curTs != INT64_MIN) {
ASSERT(taosHashGetSize(iaInfo->aggSup.pResultRowHashTable) == 1); ASSERT(tSimpleHashGetSize(iaInfo->aggSup.pResultRowHashTable) == 1);
if (ts != miaInfo->curTs) { if (ts != miaInfo->curTs) {
outputMergeAlignedIntervalResult(pOperatorInfo, tableGroupId, pResultBlock, miaInfo->curTs); outputMergeAlignedIntervalResult(pOperatorInfo, tableGroupId, pResultBlock, miaInfo->curTs);
...@@ -4964,7 +4967,7 @@ static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultR ...@@ -4964,7 +4967,7 @@ static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultR
} }
} else { } else {
miaInfo->curTs = ts; miaInfo->curTs = ts;
ASSERT(taosHashGetSize(iaInfo->aggSup.pResultRowHashTable) == 0); ASSERT(tSimpleHashGetSize(iaInfo->aggSup.pResultRowHashTable) == 0);
} }
STimeWindow win = {0}; STimeWindow win = {0};
...@@ -5040,7 +5043,7 @@ static void doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) { ...@@ -5040,7 +5043,7 @@ static void doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) {
if (pBlock == NULL) { if (pBlock == NULL) {
// close last unfinalized time window // close last unfinalized time window
if (miaInfo->curTs != INT64_MIN) { if (miaInfo->curTs != INT64_MIN) {
ASSERT(taosHashGetSize(iaInfo->aggSup.pResultRowHashTable) == 1); ASSERT(tSimpleHashGetSize(iaInfo->aggSup.pResultRowHashTable) == 1);
outputMergeAlignedIntervalResult(pOperator, miaInfo->groupId, pRes, miaInfo->curTs); outputMergeAlignedIntervalResult(pOperator, miaInfo->groupId, pRes, miaInfo->curTs);
miaInfo->curTs = INT64_MIN; miaInfo->curTs = INT64_MIN;
} }
...@@ -5221,12 +5224,12 @@ static int32_t finalizeWindowResult(SOperatorInfo* pOperatorInfo, uint64_t table ...@@ -5221,12 +5224,12 @@ static int32_t finalizeWindowResult(SOperatorInfo* pOperatorInfo, uint64_t table
SExprSupp* pExprSup = &pOperatorInfo->exprSupp; SExprSupp* pExprSup = &pOperatorInfo->exprSupp;
SET_RES_WINDOW_KEY(iaInfo->aggSup.keyBuf, &win->skey, TSDB_KEYSIZE, tableGroupId); SET_RES_WINDOW_KEY(iaInfo->aggSup.keyBuf, &win->skey, TSDB_KEYSIZE, tableGroupId);
SResultRowPosition* p1 = (SResultRowPosition*)taosHashGet(iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf, SResultRowPosition* p1 = (SResultRowPosition*)tSimpleHashGet(iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf,
GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE)); GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE));
ASSERT(p1 != NULL); ASSERT(p1 != NULL);
finalizeResultRowIntoResultDataBlock(iaInfo->aggSup.pResultBuf, p1, pExprSup->pCtx, pExprSup->pExprInfo, finalizeResultRowIntoResultDataBlock(iaInfo->aggSup.pResultBuf, p1, pExprSup->pCtx, pExprSup->pExprInfo,
pExprSup->numOfExprs, pExprSup->rowEntryInfoOffset, pResultBlock, pTaskInfo); pExprSup->numOfExprs, pExprSup->rowEntryInfoOffset, pResultBlock, pTaskInfo);
taosHashRemove(iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf, GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE)); tSimpleHashRemove(iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf, GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
...@@ -31,21 +31,12 @@ ...@@ -31,21 +31,12 @@
taosMemoryFreeClear(_n); \ taosMemoryFreeClear(_n); \
} while (0); } while (0);
#pragma pack(push, 4)
typedef struct SHNode {
struct SHNode *next;
uint32_t keyLen : 20;
uint32_t dataLen : 12;
char data[];
} SHNode;
#pragma pack(pop)
struct SSHashObj { struct SSHashObj {
SHNode **hashList; SHNode **hashList;
size_t capacity; // number of slots size_t capacity; // number of slots
int64_t size; // number of elements in hash table int64_t size; // number of elements in hash table
_hash_fn_t hashFp; // hash function _hash_fn_t hashFp; // hash function
_equal_fn_t equalFp; // equal function _equal_fn_t equalFp; // equal function
}; };
static FORCE_INLINE int32_t taosHashCapacity(int32_t length) { static FORCE_INLINE int32_t taosHashCapacity(int32_t length) {
...@@ -76,7 +67,6 @@ SSHashObj *tSimpleHashInit(size_t capacity, _hash_fn_t fn) { ...@@ -76,7 +67,6 @@ SSHashObj *tSimpleHashInit(size_t capacity, _hash_fn_t fn) {
pHashObj->hashFp = fn; pHashObj->hashFp = fn;
ASSERT((pHashObj->capacity & (pHashObj->capacity - 1)) == 0); ASSERT((pHashObj->capacity & (pHashObj->capacity - 1)) == 0);
pHashObj->hashList = (SHNode **)taosMemoryCalloc(pHashObj->capacity, sizeof(void *)); pHashObj->hashList = (SHNode **)taosMemoryCalloc(pHashObj->capacity, sizeof(void *));
if (!pHashObj->hashList) { if (!pHashObj->hashList) {
taosMemoryFree(pHashObj); taosMemoryFree(pHashObj);
...@@ -285,6 +275,43 @@ int32_t tSimpleHashRemove(SSHashObj *pHashObj, const void *key, size_t keyLen) { ...@@ -285,6 +275,43 @@ int32_t tSimpleHashRemove(SSHashObj *pHashObj, const void *key, size_t keyLen) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t tSimpleHashIterateRemove(SSHashObj *pHashObj, const void *key, size_t keyLen, void **pIter, int32_t *iter) {
if (!pHashObj || !key) {
return TSDB_CODE_FAILED;
}
uint32_t hashVal = (*pHashObj->hashFp)(key, (uint32_t)keyLen);
int32_t slot = HASH_INDEX(hashVal, pHashObj->capacity);
SHNode *pNode = pHashObj->hashList[slot];
SHNode *pPrev = NULL;
while (pNode) {
if ((*(pHashObj->equalFp))(GET_SHASH_NODE_KEY(pNode, pNode->dataLen), key, keyLen) == 0) {
if (!pPrev) {
pHashObj->hashList[slot] = pNode->next;
} else {
pPrev->next = pNode->next;
}
if (pNode->next) {
*pIter = GET_SHASH_NODE_DATA(pNode->next);
} else {
*pIter = NULL;
++(*iter);
}
FREE_HASH_NODE(pNode);
atomic_sub_fetch_64(&pHashObj->size, 1);
break;
}
pPrev = pNode;
pNode = pNode->next;
}
return TSDB_CODE_SUCCESS;
}
void tSimpleHashClear(SSHashObj *pHashObj) { void tSimpleHashClear(SSHashObj *pHashObj) {
if (!pHashObj || taosHashTableEmpty(pHashObj)) { if (!pHashObj || taosHashTableEmpty(pHashObj)) {
return; return;
...@@ -324,15 +351,6 @@ size_t tSimpleHashGetMemSize(const SSHashObj *pHashObj) { ...@@ -324,15 +351,6 @@ size_t tSimpleHashGetMemSize(const SSHashObj *pHashObj) {
return (pHashObj->capacity * sizeof(void *)) + sizeof(SHNode) * tSimpleHashGetSize(pHashObj) + sizeof(SSHashObj); return (pHashObj->capacity * sizeof(void *)) + sizeof(SHNode) * tSimpleHashGetSize(pHashObj) + sizeof(SSHashObj);
} }
void *tSimpleHashGetKey(void *data, size_t *keyLen) {
SHNode *node = (SHNode *)((char *)data - offsetof(SHNode, data));
if (keyLen) {
*keyLen = node->keyLen;
}
return POINTER_SHIFT(data, node->dataLen);
}
void *tSimpleHashIterate(const SSHashObj *pHashObj, void *data, int32_t *iter) { void *tSimpleHashIterate(const SSHashObj *pHashObj, void *data, int32_t *iter) {
if (!pHashObj) { if (!pHashObj) {
return NULL; return NULL;
...@@ -341,7 +359,7 @@ void *tSimpleHashIterate(const SSHashObj *pHashObj, void *data, int32_t *iter) { ...@@ -341,7 +359,7 @@ void *tSimpleHashIterate(const SSHashObj *pHashObj, void *data, int32_t *iter) {
SHNode *pNode = NULL; SHNode *pNode = NULL;
if (!data) { if (!data) {
for (int32_t i = 0; i < pHashObj->capacity; ++i) { for (int32_t i = *iter; i < pHashObj->capacity; ++i) {
pNode = pHashObj->hashList[i]; pNode = pHashObj->hashList[i];
if (!pNode) { if (!pNode) {
continue; continue;
...@@ -368,52 +386,5 @@ void *tSimpleHashIterate(const SSHashObj *pHashObj, void *data, int32_t *iter) { ...@@ -368,52 +386,5 @@ void *tSimpleHashIterate(const SSHashObj *pHashObj, void *data, int32_t *iter) {
return GET_SHASH_NODE_DATA(pNode); return GET_SHASH_NODE_DATA(pNode);
} }
return NULL;
}
void *tSimpleHashIterateKV(const SSHashObj *pHashObj, void *data, void **key, int32_t *iter) {
if (!pHashObj) {
return NULL;
}
SHNode *pNode = NULL;
if (!data) {
for (int32_t i = 0; i < pHashObj->capacity; ++i) {
pNode = pHashObj->hashList[i];
if (!pNode) {
continue;
}
*iter = i;
if (key) {
*key = GET_SHASH_NODE_KEY(pNode, pNode->dataLen);
}
return GET_SHASH_NODE_DATA(pNode);
}
return NULL;
}
pNode = (SHNode *)((char *)data - offsetof(SHNode, data));
if (pNode->next) {
if (key) {
*key = GET_SHASH_NODE_KEY(pNode->next, pNode->next->dataLen);
}
return GET_SHASH_NODE_DATA(pNode->next);
}
++(*iter);
for (int32_t i = *iter; i < pHashObj->capacity; ++i) {
pNode = pHashObj->hashList[i];
if (!pNode) {
continue;
}
*iter = i;
if (key) {
*key = GET_SHASH_NODE_KEY(pNode, pNode->dataLen);
}
return GET_SHASH_NODE_DATA(pNode);
}
return NULL; return NULL;
} }
\ No newline at end of file
...@@ -30,7 +30,7 @@ ...@@ -30,7 +30,7 @@
// return RUN_ALL_TESTS(); // return RUN_ALL_TESTS();
// } // }
TEST(testCase, tSimpleHashTest) { TEST(testCase, tSimpleHashTest_intKey) {
SSHashObj *pHashObj = SSHashObj *pHashObj =
tSimpleHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT)); tSimpleHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
...@@ -57,12 +57,14 @@ TEST(testCase, tSimpleHashTest) { ...@@ -57,12 +57,14 @@ TEST(testCase, tSimpleHashTest) {
int32_t iter = 0; int32_t iter = 0;
int64_t keySum = 0; int64_t keySum = 0;
int64_t dataSum = 0; int64_t dataSum = 0;
size_t kLen = 0;
while ((data = tSimpleHashIterate(pHashObj, data, &iter))) { while ((data = tSimpleHashIterate(pHashObj, data, &iter))) {
void *key = tSimpleHashGetKey(data, NULL); void *key = tSimpleHashGetKey(data, &kLen);
ASSERT_EQ(keyLen, kLen);
keySum += *(int64_t *)key; keySum += *(int64_t *)key;
dataSum += *(int64_t *)data; dataSum += *(int64_t *)data;
} }
ASSERT_EQ(keySum, dataSum); ASSERT_EQ(keySum, dataSum);
ASSERT_EQ(keySum, originKeySum); ASSERT_EQ(keySum, originKeySum);
...@@ -74,4 +76,69 @@ TEST(testCase, tSimpleHashTest) { ...@@ -74,4 +76,69 @@ TEST(testCase, tSimpleHashTest) {
tSimpleHashCleanup(pHashObj); tSimpleHashCleanup(pHashObj);
} }
TEST(testCase, tSimpleHashTest_binaryKey) {
SSHashObj *pHashObj =
tSimpleHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
assert(pHashObj != nullptr);
ASSERT_EQ(0, tSimpleHashGetSize(pHashObj));
typedef struct {
int64_t suid;
int64_t uid;
} SCombineKey;
size_t keyLen = sizeof(SCombineKey);
size_t dataLen = sizeof(int64_t);
int64_t originDataSum = 0;
SCombineKey combineKey = {0};
for (int64_t i = 1; i <= 100; ++i) {
combineKey.suid = i;
combineKey.uid = i + 1;
tSimpleHashPut(pHashObj, (const void *)&combineKey, keyLen, (const void *)&i, dataLen);
originDataSum += i;
ASSERT_EQ(i, tSimpleHashGetSize(pHashObj));
}
for (int64_t i = 1; i <= 100; ++i) {
combineKey.suid = i;
combineKey.uid = i + 1;
void *data = tSimpleHashGet(pHashObj, (const void *)&combineKey, keyLen);
ASSERT_EQ(i, *(int64_t *)data);
}
void *data = NULL;
int32_t iter = 0;
int64_t keySum = 0;
int64_t dataSum = 0;
size_t kLen = 0;
while ((data = tSimpleHashIterate(pHashObj, data, &iter))) {
void *key = tSimpleHashGetKey(data, &kLen);
ASSERT_EQ(keyLen, kLen);
dataSum += *(int64_t *)data;
}
ASSERT_EQ(originDataSum, dataSum);
tSimpleHashRemove(pHashObj, (const void *)&combineKey, keyLen);
while ((data = tSimpleHashIterate(pHashObj, data, &iter))) {
void *key = tSimpleHashGetKey(data, &kLen);
ASSERT_EQ(keyLen, kLen);
}
for (int64_t i = 1; i <= 99; ++i) {
combineKey.suid = i;
combineKey.uid = i + 1;
tSimpleHashRemove(pHashObj, (const void *)&combineKey, keyLen);
ASSERT_EQ(99 - i, tSimpleHashGetSize(pHashObj));
}
tSimpleHashCleanup(pHashObj);
}
#pragma GCC diagnostic pop #pragma GCC diagnostic pop
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册