From 89afece7884a8bf89961f903c5688782ddfd859f Mon Sep 17 00:00:00 2001 From: Yifan Hao Date: Sun, 21 Nov 2021 22:42:54 -0800 Subject: [PATCH] Hash table cleanup [3/n] A few cleanups: 1. Move implementation details (macro definition, SHashEntry and SHashObj definition) to hash.c. Only leave relevant APIs in hash.h. 2. Complete function header comments. 3. Correct variable naming from "newSize" to "newCapacity". --- src/query/src/qExecutor.c | 159 +++++++++++++++++++------------------- src/util/inc/hash.h | 138 +++++++++++++++++---------------- src/util/src/hash.c | 82 ++++++++++++++------ 3 files changed, 211 insertions(+), 168 deletions(-) diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 6a89a2f823..d870eeb6dd 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -114,7 +114,7 @@ do { \ a = a + b; \ } \ } \ -} while(0) +} while(0) #define TSKEY_MIN_SUB(a,b) \ do { \ @@ -1277,7 +1277,7 @@ void doTimeWindowInterpolation(SOperatorInfo* pOperator, SOptrBasicInfo* pInfo, } else { COPY_DATA(&pCtx[k].start.val, (char *)pColInfo->pData + prevRowIndex * pColInfo->info.bytes); } - + pCtx[k].start.key = prevTs; if (pColInfo->info.type == TSDB_DATA_TYPE_BINARY || pColInfo->info.type == TSDB_DATA_TYPE_NCHAR) { @@ -1294,7 +1294,7 @@ void doTimeWindowInterpolation(SOperatorInfo* pOperator, SOptrBasicInfo* pInfo, } else { COPY_DATA(&pCtx[k].end.val, (char *)pColInfo->pData + curRowIndex * pColInfo->info.bytes); } - + pCtx[k].end.key = curTs; if (pColInfo->info.type == TSDB_DATA_TYPE_BINARY || pColInfo->info.type == TSDB_DATA_TYPE_NCHAR) { @@ -1309,9 +1309,9 @@ void doTimeWindowInterpolation(SOperatorInfo* pOperator, SOptrBasicInfo* pInfo, } else { GET_TYPED_DATA(v1, double, pColInfo->info.type, (char *)pColInfo->pData + prevRowIndex * pColInfo->info.bytes); } - + GET_TYPED_DATA(v2, double, pColInfo->info.type, (char *)pColInfo->pData + curRowIndex * pColInfo->info.bytes); - + SPoint point1 = (SPoint){.key = prevTs, .val = &v1}; SPoint 
point2 = (SPoint){.key = curTs, .val = &v2}; SPoint point = (SPoint){.key = windowKey, .val = &v }; @@ -2123,7 +2123,7 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf } case OP_StateWindow: { - pRuntimeEnv->proot = createStatewindowOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput); + pRuntimeEnv->proot = createStatewindowOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput); int32_t opType = pRuntimeEnv->proot->upstream[0]->operatorType; if (opType != OP_DummyInput) { setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream[0]->info, pRuntimeEnv->proot); @@ -2253,7 +2253,7 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) { if (!pRuntimeEnv->udfIsCopy) { destroyUdfInfo(pRuntimeEnv->pUdfInfo); } - + destroyResultBuf(pRuntimeEnv->pResultBuf); doFreeQueryHandle(pRuntimeEnv); @@ -2855,7 +2855,7 @@ void filterRowsInDataBlock(SQueryRuntimeEnv* pRuntimeEnv, SSingleColumnFilterInf if (i < (numOfRows - 1)) { all = false; } - + break; } } @@ -2880,9 +2880,9 @@ void filterColRowsInDataBlock(SQueryRuntimeEnv* pRuntimeEnv, SSDataBlock* pBlock bool all = true; if (pRuntimeEnv->pTsBuf != NULL) { - SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, 0); + SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, 0); p = calloc(numOfRows, sizeof(int8_t)); - + TSKEY* k = (TSKEY*) pColInfoData->pData; for (int32_t i = 0; i < numOfRows; ++i) { int32_t offset = ascQuery? 
i:(numOfRows - i - 1); @@ -2897,7 +2897,7 @@ void filterColRowsInDataBlock(SQueryRuntimeEnv* pRuntimeEnv, SSDataBlock* pBlock p[offset] = true; } - if (!tsBufNextPos(pRuntimeEnv->pTsBuf)) { + if (!tsBufNextPos(pRuntimeEnv->pTsBuf)) { if (i < (numOfRows - 1)) { all = false; } @@ -2905,7 +2905,7 @@ void filterColRowsInDataBlock(SQueryRuntimeEnv* pRuntimeEnv, SSDataBlock* pBlock break; } } - + // save the cursor status pRuntimeEnv->current->cur = tsBufGetCursor(pRuntimeEnv->pTsBuf); } else { @@ -2924,7 +2924,7 @@ void filterColRowsInDataBlock(SQueryRuntimeEnv* pRuntimeEnv, SSDataBlock* pBlock tfree(p); } - + static SColumnInfo* doGetTagColumnInfoById(SColumnInfo* pTagColList, int32_t numOfTags, int16_t colId); static void doSetTagValueInParam(void* pTable, int32_t tagColId, tVariant *tag, int16_t type, int16_t bytes); @@ -2970,7 +2970,7 @@ void doSetFilterColumnInfo(SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFi FORCE_INLINE int32_t getColumnDataFromId(void *param, int32_t id, void **data) { int32_t numOfCols = ((SColumnDataParam *)param)->numOfCols; SArray* pDataBlock = ((SColumnDataParam *)param)->pDataBlock; - + for (int32_t j = 0; j < numOfCols; ++j) { SColumnInfoData* pColInfo = taosArrayGet(pDataBlock, j); if (id == pColInfo->info.colId) { @@ -3128,7 +3128,7 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv* pRuntimeEnv, STableScanInfo* pTa SColumnDataParam param = {.numOfCols = pBlock->info.numOfCols, .pDataBlock = pBlock->pDataBlock}; filterSetColFieldData(pQueryAttr->pFilters, ¶m, getColumnDataFromId); } - + if (pQueryAttr->pFilters != NULL || pRuntimeEnv->pTsBuf != NULL) { filterColRowsInDataBlock(pRuntimeEnv, pBlock, ascQuery); } @@ -3281,7 +3281,7 @@ void setTagValue(SOperatorInfo* pOperatorInfo, void *pTable, SQLFunctionCtx* pCt } else { if (pCtx[idx].tag.pz != NULL) { memcpy(pRuntimeEnv->tagVal + offset, pCtx[idx].tag.pz, pCtx[idx].tag.nLen); - } + } } offset += pLocalExprInfo->base.resBytes; @@ -4035,13 +4035,13 @@ void 
setIntervalQueryRange(SQueryRuntimeEnv *pRuntimeEnv, STimeWindow* winx, int } TSKEY key = QUERY_IS_ASC_QUERY(pQueryAttr)? winx->skey:winx->ekey; - + qDebug("0x%"PRIx64" update query window, tid:%d, %"PRId64" - %"PRId64", old:%"PRId64" - %"PRId64, GET_QID(pRuntimeEnv), tid, key, pTableQueryInfo->win.ekey, pTableQueryInfo->win.skey, pTableQueryInfo->win.ekey); pTableQueryInfo->win.skey = key; STimeWindow win = {.skey = key, .ekey = pQueryAttr->window.ekey}; - + /** * In handling the both ascending and descending order super table query, we need to find the first qualified * timestamp of this table, and then set the first qualified start timestamp. @@ -4657,7 +4657,7 @@ static int32_t setupQueryHandle(void* tsdb, SQueryRuntimeEnv* pRuntimeEnv, int64 STsdbQueryCond cond = createTsdbQueryCond(pQueryAttr, &pQueryAttr->window); if (pQueryAttr->tsCompQuery || pQueryAttr->pointInterpQuery) { - cond.type = BLOCK_LOAD_TABLE_SEQ_ORDER; + cond.type = BLOCK_LOAD_TABLE_SEQ_ORDER; } if (!isSTableQuery @@ -4786,7 +4786,7 @@ int32_t doInitQInfo(SQInfo* pQInfo, STSBuf* pTsBuf, void* tsdb, void* sourceOptr int16_t order = (pQueryAttr->order.order == pRuntimeEnv->pTsBuf->tsOrder) ? 
TSDB_ORDER_ASC : TSDB_ORDER_DESC; tsBufResetPos(pRuntimeEnv->pTsBuf); tsBufSetTraverseOrder(pRuntimeEnv->pTsBuf, order); - tsBufNextPos(pTsBuf); + tsBufNextPos(pTsBuf); } int32_t ps = DEFAULT_PAGE_SIZE; @@ -5169,7 +5169,7 @@ void setTableScanFilterOperatorInfo(STableScanInfo* pTableScanInfo, SOperatorInf pTableScanInfo->rowCellInfoOffset = pIntervalInfo->rowCellInfoOffset; } else if (pDownstream->operatorType == OP_TimeEvery) { STimeEveryOperatorInfo *pEveryInfo = pDownstream->info; - + pTableScanInfo->pCtx = pEveryInfo->binfo.pCtx; pTableScanInfo->pResultRowInfo = &pEveryInfo->binfo.resultRowInfo; pTableScanInfo->rowCellInfoOffset = pEveryInfo->binfo.rowCellInfoOffset; @@ -5223,7 +5223,7 @@ SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbQueryHandle, SQueryRuntime if (pRuntimeEnv->pQueryAttr->pointInterpQuery) { pRuntimeEnv->enableGroupData = true; } - + SOperatorInfo* pOptr = calloc(1, sizeof(SOperatorInfo)); pOptr->name = "DataBlocksOptimizedScanOperator"; pOptr->operatorType = OP_DataBlocksOptScan; @@ -5500,7 +5500,7 @@ static SSDataBlock* doSort(void* param, bool* newgroup) { if (pInfo->pDataBlock->info.rows) { taoscQSort(pCols, pSchema, numOfCols, pInfo->pDataBlock->info.rows, pInfo->colIndex, comp); } - + tfree(pCols); tfree(pSchema); return (pInfo->pDataBlock->info.rows > 0)? 
pInfo->pDataBlock:NULL; @@ -5648,7 +5648,7 @@ static SSDataBlock* doSTableAggregate(void* param, bool* newgroup) { key = pBlock->info.window.skey; TSKEY_MIN_SUB(key, -1); } - + setExecutionContext(pRuntimeEnv, pInfo, pOperator->numOfOutput, pRuntimeEnv->current->groupIndex, key); doAggregateImpl(pOperator, pQueryAttr->window.skey, pInfo->pCtx, pBlock); } @@ -5913,7 +5913,7 @@ static SSDataBlock* doIntervalAgg(void* param, bool* newgroup) { if (pIntervalInfo->resultRowInfo.size > 0 && pQueryAttr->needSort) { qsort(pIntervalInfo->resultRowInfo.pResult, pIntervalInfo->resultRowInfo.size, POINTER_BYTES, resRowCompare); } - + closeAllResultRows(&pIntervalInfo->resultRowInfo); setQueryStatus(pRuntimeEnv, QUERY_COMPLETED); finalizeQueryResult(pOperator, pIntervalInfo->pCtx, &pIntervalInfo->resultRowInfo, pIntervalInfo->rowCellInfoOffset); @@ -5942,7 +5942,7 @@ static void everyApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *p static int64_t getEveryStartTs(bool ascQuery, STimeWindow *range, STimeWindow *blockWin, SQueryAttr *pQueryAttr) { int64_t startTs = range->skey, ekey = 0; - + assert(range->skey != INT64_MIN); if (ascQuery) { @@ -5980,15 +5980,15 @@ static bool doEveryInterpolation(SOperatorInfo* pOperatorInfo, SSDataBlock* pBlo SQLFunctionCtx* pCtx = NULL; *needApply = false; - + if (!pQueryAttr->pointInterpQuery) { goto group_finished_exit; } assert(pOperatorInfo->numOfOutput > 1); - + for (int32_t i = 1; i < pOperatorInfo->numOfOutput; ++i) { - assert(pEveryInfo->binfo.pCtx[i].functionId == TSDB_FUNC_INTERP + assert(pEveryInfo->binfo.pCtx[i].functionId == TSDB_FUNC_INTERP || pEveryInfo->binfo.pCtx[i].functionId == TSDB_FUNC_TS_DUMMY || pEveryInfo->binfo.pCtx[i].functionId == TSDB_FUNC_TAG_DUMMY); @@ -5997,7 +5997,7 @@ static bool doEveryInterpolation(SOperatorInfo* pOperatorInfo, SSDataBlock* pBlo break; } } - + TSKEY* tsCols = NULL; if (pBlock && pBlock->pDataBlock != NULL) { SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, 0); @@ 
-6007,7 +6007,7 @@ static bool doEveryInterpolation(SOperatorInfo* pOperatorInfo, SSDataBlock* pBlo if (pCtx->startTs == INT64_MIN) { if (pQueryAttr->range.skey == INT64_MIN) { - if (NULL == tsCols) { + if (NULL == tsCols) { goto group_finished_exit; } @@ -6046,12 +6046,12 @@ static bool doEveryInterpolation(SOperatorInfo* pOperatorInfo, SSDataBlock* pBlo } else { pCtx->startTs = ascQuery ? pCtx->startTs + pQueryAttr->interval.interval : pCtx->startTs - pQueryAttr->interval.interval; } - - if (ascQuery && pQueryAttr->range.ekey != INT64_MIN && pCtx->startTs > pQueryAttr->range.ekey) { + + if (ascQuery && pQueryAttr->range.ekey != INT64_MIN && pCtx->startTs > pQueryAttr->range.ekey) { goto group_finished_exit; } - if ((!ascQuery) && pQueryAttr->range.skey != INT64_MIN && pCtx->startTs < pQueryAttr->range.skey) { + if ((!ascQuery) && pQueryAttr->range.skey != INT64_MIN && pCtx->startTs < pQueryAttr->range.skey) { goto group_finished_exit; } } else { @@ -6065,8 +6065,8 @@ static bool doEveryInterpolation(SOperatorInfo* pOperatorInfo, SSDataBlock* pBlo if ((ascQuery && pQueryAttr->range.ekey == INT64_MIN) || ((!ascQuery) && pQueryAttr->range.skey == INT64_MIN)) { goto group_finished_exit; } - - if (pQueryAttr->fillType == TSDB_FILL_NONE || pQueryAttr->fillType == TSDB_FILL_LINEAR + + if (pQueryAttr->fillType == TSDB_FILL_NONE || pQueryAttr->fillType == TSDB_FILL_LINEAR || ((ascQuery && pQueryAttr->fillType == TSDB_FILL_NEXT) || ((!ascQuery) && pQueryAttr->fillType == TSDB_FILL_PREV))) { goto group_finished_exit; } @@ -6088,11 +6088,11 @@ static bool doEveryInterpolation(SOperatorInfo* pOperatorInfo, SSDataBlock* pBlo } *needApply = true; - + for (int32_t i = 0; i < pOperatorInfo->numOfOutput; ++i) { pEveryInfo->binfo.pCtx[i].startTs = pCtx->startTs; } - + return false; } @@ -6115,14 +6115,14 @@ static bool doEveryInterpolation(SOperatorInfo* pOperatorInfo, SSDataBlock* pBlo } else { if (tsCols[startPos] == pCtx->startTs) { doTimeWindowInterpolation(pOperatorInfo, 
pOperatorInfo->info, pBlock->pDataBlock, pCtx->startTs, startPos, INT64_MIN, 0, 0, RESULT_ROW_START_INTERP); - } else { + } else { doTimeWindowInterpolation(pOperatorInfo, pOperatorInfo->info, pBlock->pDataBlock, tsCols[startPos - 1], startPos - 1, INT64_MIN, 0, 0, RESULT_ROW_START_INTERP); } } if (pQueryAttr->fillType != TSDB_FILL_LINEAR) { *needApply = true; - } + } } if ((!ascQuery) && (pQueryAttr->fillType == TSDB_FILL_LINEAR || pQueryAttr->fillType == TSDB_FILL_NEXT) && pCtx->end.key == INT64_MIN) { @@ -6132,7 +6132,7 @@ static bool doEveryInterpolation(SOperatorInfo* pOperatorInfo, SSDataBlock* pBlo } else if (startPos == (pBlock->info.rows - 1)) { if (tsCols[startPos] == pCtx->startTs) { doTimeWindowInterpolation(pOperatorInfo, pOperatorInfo->info, pBlock->pDataBlock, INT64_MIN, 0, pCtx->startTs, startPos, 0, RESULT_ROW_END_INTERP); - } else { + } else { TSKEY lastTs = *(TSKEY *) pRuntimeEnv->prevRow[0]; if (lastTs != INT64_MIN) { doTimeWindowInterpolation(pOperatorInfo, pOperatorInfo->info, pBlock->pDataBlock, INT64_MIN, 0, lastTs, -1, 0, RESULT_ROW_END_INTERP); @@ -6145,17 +6145,17 @@ static bool doEveryInterpolation(SOperatorInfo* pOperatorInfo, SSDataBlock* pBlo doTimeWindowInterpolation(pOperatorInfo, pOperatorInfo->info, pBlock->pDataBlock, INT64_MIN, 0, tsCols[startPos + 1], startPos + 1, 0, RESULT_ROW_END_INTERP); } } - + if (pQueryAttr->fillType != TSDB_FILL_LINEAR) { *needApply = true; - } + } } - + if (ascQuery && (pQueryAttr->fillType == TSDB_FILL_LINEAR || pQueryAttr->fillType == TSDB_FILL_NEXT) && pCtx->end.key == INT64_MIN) { if (startPos < 0) { return true; } - + doTimeWindowInterpolation(pOperatorInfo, pOperatorInfo->info, pBlock->pDataBlock, INT64_MIN, 0, tsCols[startPos], startPos, 0, RESULT_ROW_END_INTERP); *needApply = true; @@ -6165,7 +6165,7 @@ static bool doEveryInterpolation(SOperatorInfo* pOperatorInfo, SSDataBlock* pBlo if (startPos < 0) { return true; } - + doTimeWindowInterpolation(pOperatorInfo, pOperatorInfo->info, 
pBlock->pDataBlock, tsCols[startPos], startPos, INT64_MIN, 0, 0, RESULT_ROW_START_INTERP); *needApply = true; @@ -6186,7 +6186,7 @@ group_finished_exit: if (pQueryAttr->needReverseScan) { pQueryAttr->range.skey = INT64_MIN; } - + pEveryInfo->groupDone = true; if (pCtx) { @@ -6194,7 +6194,7 @@ group_finished_exit: pCtx->start.key = INT64_MIN; pCtx->end.key = INT64_MIN; } - + return true; } @@ -6219,7 +6219,7 @@ static void doTimeEveryImpl(SOperatorInfo* pOperator, SQLFunctionCtx *pCtx, SSDa setQueryStatus(pRuntimeEnv, QUERY_COMPLETED); return; } - + tsCols = (int64_t*) pColDataInfo->pData; assert(tsCols[0] == pBlock->info.window.skey && tsCols[pBlock->info.rows - 1] == pBlock->info.window.ekey); @@ -6233,13 +6233,13 @@ static void doTimeEveryImpl(SOperatorInfo* pOperator, SQLFunctionCtx *pCtx, SSDa if (needApply) { everyApplyFunctions(pRuntimeEnv, pEveryInfo->binfo.pCtx, numOfOutput); - + pRes->info.rows = getNumOfResult(pRuntimeEnv, pEveryInfo->binfo.pCtx, pOperator->numOfOutput); if (pRes->info.rows >= pRuntimeEnv->resultInfo.threshold) { pEveryInfo->lastBlock = pBlock; break; } - + updateOutputBuf(&pEveryInfo->binfo, &pEveryInfo->bufCapacity, 0); } } @@ -6267,8 +6267,8 @@ static SSDataBlock* doTimeEvery(void* param, bool* newgroup) { clearNumOfRes(pInfo->pCtx, pOperator->numOfOutput); return pInfo->pRes; } - - if (pRes->info.rows > 0) { + + if (pRes->info.rows > 0) { copyTsColoum(pRes, pInfo->pCtx, pOperator->numOfOutput); clearNumOfRes(pInfo->pCtx, pOperator->numOfOutput); return pInfo->pRes; @@ -6327,9 +6327,9 @@ static SSDataBlock* doTimeEvery(void* param, bool* newgroup) { if (pRes->info.rows >= pRuntimeEnv->resultInfo.threshold) { break; } - + assert(pEveryInfo->groupDone); - + if (pRes->info.rows > 0) { break; } @@ -7027,12 +7027,12 @@ SColumnInfo* extractColumnFilterInfo(SExprInfo* pExpr, int32_t numOfOutput, int3 pCols[i].colId = pExpr[i].base.resColId; pCols[i].flist.numOfFilters = pExpr[i].base.flist.numOfFilters; - if (pCols[i].flist.numOfFilters != 0) 
{ + if (pCols[i].flist.numOfFilters != 0) { pCols[i].flist.filterInfo = calloc(pCols[i].flist.numOfFilters, sizeof(SColumnFilterInfo)); memcpy(pCols[i].flist.filterInfo, pExpr[i].base.flist.filterInfo, pCols[i].flist.numOfFilters * sizeof(SColumnFilterInfo)); } else { // avoid runtime error - pCols[i].flist.filterInfo = NULL; + pCols[i].flist.filterInfo = NULL; } } @@ -7126,7 +7126,7 @@ SOperatorInfo* createTimeEveryOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOpera if (pQueryAttr->needReverseScan) { pInfo->rangeStart = taosHashInit(256, taosGetDefaultHashFunction(TSDB_DATA_TYPE_TIMESTAMP), false, false); } - + initResultRowInfo(&pBInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT); setDefaultOutputBuf(pRuntimeEnv, pBInfo, pInfo->seed, MASTER_SCAN); @@ -7496,11 +7496,11 @@ SOperatorInfo* createTagScanOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SExprInf } static bool initMultiDistinctInfo(SDistinctOperatorInfo *pInfo, SOperatorInfo* pOperator, SSDataBlock *pBlock) { if (taosArrayGetSize(pInfo->pDistinctDataInfo) == pOperator->numOfOutput) { - // distinct info already inited + // distinct info already inited return true; } for (int i = 0; i < pOperator->numOfOutput; i++) { - pInfo->totalBytes += pOperator->pExpr[i].base.colBytes; + pInfo->totalBytes += pOperator->pExpr[i].base.colBytes; } for (int i = 0; i < pOperator->numOfOutput; i++) { int numOfBlock = (int)(taosArrayGetSize(pBlock->pDataBlock)); @@ -7520,14 +7520,14 @@ static bool initMultiDistinctInfo(SDistinctOperatorInfo *pInfo, SOperatorInfo* p static void buildMultiDistinctKey(SDistinctOperatorInfo *pInfo, SSDataBlock *pBlock, int32_t rowId) { char *p = pInfo->buf; - memset(p, 0, pInfo->totalBytes); + memset(p, 0, pInfo->totalBytes); for (int i = 0; i < taosArrayGetSize(pInfo->pDistinctDataInfo); i++) { - SDistinctDataInfo* pDistDataInfo = (SDistinctDataInfo *)taosArrayGet(pInfo->pDistinctDataInfo, i); + SDistinctDataInfo* pDistDataInfo = (SDistinctDataInfo *)taosArrayGet(pInfo->pDistinctDataInfo, i); 
SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, pDistDataInfo->index); char *val = ((char *)pColDataInfo->pData) + pColDataInfo->info.bytes * rowId; - if (isNull(val, pDistDataInfo->type)) { - p += pDistDataInfo->bytes; + if (isNull(val, pDistDataInfo->type)) { + p += pDistDataInfo->bytes; continue; } if (IS_VAR_DATA_TYPE(pDistDataInfo->type)) { @@ -7553,7 +7553,7 @@ static SSDataBlock* hashDistinct(void* param, bool* newgroup) { pRes->info.rows = 0; SSDataBlock* pBlock = NULL; - + while(1) { publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC); pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup); @@ -7567,7 +7567,7 @@ static SSDataBlock* hashDistinct(void* param, bool* newgroup) { doSetOperatorCompleted(pOperator); break; } - // ensure result output buf + // ensure result output buf if (pRes->info.rows + pBlock->info.rows > pInfo->outputCapacity) { int32_t newSize = pRes->info.rows + pBlock->info.rows; for (int i = 0; i < taosArrayGetSize(pRes->pDataBlock); i++) { @@ -7591,14 +7591,14 @@ static SSDataBlock* hashDistinct(void* param, bool* newgroup) { for (int j = 0; j < taosArrayGetSize(pRes->pDataBlock); j++) { SDistinctDataInfo* pDistDataInfo = taosArrayGet(pInfo->pDistinctDataInfo, j); // distinct meta info SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pDistDataInfo->index); //src - SColumnInfoData* pResultColInfoData = taosArrayGet(pRes->pDataBlock, j); // dist + SColumnInfoData* pResultColInfoData = taosArrayGet(pRes->pDataBlock, j); // dist char* val = ((char*)pColInfoData->pData) + pDistDataInfo->bytes * i; - char *start = pResultColInfoData->pData + pDistDataInfo->bytes * pInfo->pRes->info.rows; + char *start = pResultColInfoData->pData + pDistDataInfo->bytes * pInfo->pRes->info.rows; memcpy(start, val, pDistDataInfo->bytes); } pRes->info.rows += 1; - } + } } if (pRes->info.rows >= pInfo->threshold) { @@ -7614,10 +7614,10 @@ SOperatorInfo* 
createDistinctOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperat pInfo->buf = NULL; pInfo->threshold = tsMaxNumOfDistinctResults; // distinct result threshold pInfo->outputCapacity = 4096; - pInfo->pDistinctDataInfo = taosArrayInit(numOfOutput, sizeof(SDistinctDataInfo)); + pInfo->pDistinctDataInfo = taosArrayInit(numOfOutput, sizeof(SDistinctDataInfo)); pInfo->pSet = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); pInfo->pRes = createOutputBuf(pExpr, numOfOutput, (int32_t) pInfo->outputCapacity); - + SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); pOperator->name = "DistinctOperator"; @@ -7629,7 +7629,7 @@ SOperatorInfo* createDistinctOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperat pOperator->info = pInfo; pOperator->pRuntimeEnv = pRuntimeEnv; pOperator->exec = hashDistinct; - pOperator->pExpr = pExpr; + pOperator->pExpr = pExpr; pOperator->cleanup = destroyDistinctOperatorInfo; appendUpstream(pOperator, upstream); @@ -7808,7 +7808,7 @@ int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SQueryParam* param) { pQueryMsg->window.ekey = htobe64(pQueryMsg->window.ekey); pQueryMsg->range.skey = htobe64(pQueryMsg->range.skey); pQueryMsg->range.ekey = htobe64(pQueryMsg->range.ekey); - + pQueryMsg->interval.interval = htobe64(pQueryMsg->interval.interval); pQueryMsg->interval.sliding = htobe64(pQueryMsg->interval.sliding); pQueryMsg->interval.offset = htobe64(pQueryMsg->interval.offset); @@ -7879,7 +7879,7 @@ int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SQueryParam* param) { if (code != TSDB_CODE_SUCCESS) { goto _cleanup; } -*/ +*/ } if (pQueryMsg->colCondLen > 0) { @@ -8309,14 +8309,14 @@ int32_t initUdfInfo(SUdfInfo* pUdfInfo) { if (pUdfInfo->path) { unlink(pUdfInfo->path); } - + tfree(pUdfInfo->path); pUdfInfo->path = strdup(path); if (pUdfInfo->handle) { taosCloseDll(pUdfInfo->handle); } - + pUdfInfo->handle = taosLoadDll(path); if (NULL == pUdfInfo->handle) { @@ -8477,7 +8477,7 @@ int32_t 
createQueryFunc(SQueriedTableInfo* pTableInfo, int32_t numOfOutput, SExp int32_t createQueryFilter(char *data, int32_t len, void** pFilters) { tExprNode* expr = NULL; - + TRY(TSDB_MAX_TAG_CONDITIONS) { expr = exprTreeFromBinary(data, len); } CATCH( code ) { @@ -8784,7 +8784,7 @@ SQInfo* createQInfoImpl(SQueryTableMsg* pQueryMsg, SGroupbyExpr* pGroupbyExpr, S pQueryAttr->vgId = vgId; pQueryAttr->pFilters = pFilters; pQueryAttr->range = pQueryMsg->range; - + pQueryAttr->tableCols = calloc(numOfCols, sizeof(SSingleColumnFilterInfo)); if (pQueryAttr->tableCols == NULL) { goto _cleanup; @@ -9227,6 +9227,9 @@ static void doSetTagValueToResultBuf(char* output, const char* val, int16_t type static int64_t getQuerySupportBufSize(size_t numOfTables) { size_t s1 = sizeof(STableQueryInfo); + + // TODO: struct SHashNode is an internal implementation of + // hash table. The implementation should not leak here. size_t s2 = sizeof(SHashNode); // size_t s3 = sizeof(STableCheckInfo); buffer consumption in tsdb diff --git a/src/util/inc/hash.h b/src/util/inc/hash.h index c85441c324..93cf90e17e 100644 --- a/src/util/inc/hash.h +++ b/src/util/inc/hash.h @@ -24,12 +24,8 @@ extern "C" { #include "hashfunc.h" #include "tlockfree.h" -#define HASH_MAX_CAPACITY (1024 * 1024 * 16) -#define HASH_DEFAULT_LOAD_FACTOR (0.75) -#define HASH_INDEX(v, c) ((v) & ((c)-1)) - -typedef void (*_hash_free_fn_t)(void *param); - +// TODO: SHashNode is an internal implementation and should not +// be in the public header file. 
typedef struct SHashNode { struct SHashNode *next; uint32_t hashVal; // the hash value of key @@ -40,48 +36,27 @@ typedef struct SHashNode { char data[]; } SHashNode; -#define GET_HASH_NODE_KEY(_n) ((char*)(_n) + sizeof(SHashNode) + (_n)->dataLen) -#define GET_HASH_NODE_DATA(_n) ((char*)(_n) + sizeof(SHashNode)) -#define GET_HASH_PNODE(_n) ((SHashNode *)((char*)(_n) - sizeof(SHashNode))) - typedef enum SHashLockTypeE { HASH_NO_LOCK = 0, HASH_ENTRY_LOCK = 1, } SHashLockTypeE; -typedef struct SHashEntry { - int32_t num; // number of elements in current entry - SRWLatch latch; // entry latch - SHashNode *next; -} SHashEntry; - -typedef struct SHashObj { - SHashEntry **hashList; - size_t capacity; // number of slots - size_t size; // number of elements in hash table - _hash_fn_t hashFp; // hash function - _hash_free_fn_t freeFp; // hash node free callback function - _equal_fn_t equalFp; // equal function - - SRWLatch lock; // read-write spin lock - SHashLockTypeE type; // lock type - bool enableUpdate; // enable update - SArray *pMemBlock; // memory block allocated for SHashEntry -} SHashObj; +typedef struct SHashObj SHashObj; /** - * init the hash table + * initialize a hash table * - * @param capacity initial capacity of the hash table - * @param fn hash function to generate the hash value - * @param threadsafe thread safe or not - * @return + * @param capacity initial capacity of the hash table + * @param fn hash function + * @param update whether the hash table allows in place update + * @param type whether the hash table has per entry lock + * @return hash table object */ SHashObj *taosHashInit(size_t capacity, _hash_fn_t fn, bool update, SHashLockTypeE type); - /** * set equal func of the hash table + * * @param pHashObj * @param equalFp * @return @@ -90,6 +65,7 @@ void taosHashSetEqualFp(SHashObj *pHashObj, _equal_fn_t fp); /** * return the size of hash table + * * @param pHashObj * @return */ @@ -97,73 +73,105 @@ int32_t taosHashGetSize(const SHashObj 
*pHashObj); /** * put element into hash table, if the element with the same key exists, update it - * @param pHashObj - * @param key - * @param keyLen - * @param data - * @param size - * @return + * + * @param pHashObj hash table object + * @param key key + * @param keyLen length of key + * @param data data + * @param size size of data + * @return 0 if success, -1 otherwise */ int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, void *data, size_t size); /** * return the payload data with the specified key * - * @param pHashObj - * @param key - * @param keyLen - * @return + * @param pHashObj hash table object + * @param key key + * @param keyLen length of key + * @return pointer to data */ void *taosHashGet(SHashObj *pHashObj, const void *key, size_t keyLen); /** - * apply the udf before return the result - * @param pHashObj - * @param key - * @param keyLen - * @param fp - * @param d - * @return + * Get the data associated with "key". Note that caller needs to make sure + * "d" has enough capacity to accommodate the data. + * + * @param pHashObj hash table object + * @param key key + * @param keyLen length of key + * @param fp function to be called on hash node when the data is found + * @param d buffer + * @return pointer to data */ void* taosHashGetClone(SHashObj *pHashObj, const void *key, size_t keyLen, void (*fp)(void *), void* d); /** - * @param pHashObj - * @param key - * @param keyLen - * @param fp - * @param d - * @param sz - * @return + * Get the data associated with "key". Note that caller needs to take ownership + * of the data "d" and make sure it is deallocated. 
+ * + * @param pHashObj hash table object + * @param key key + * @param keyLen length of key + * @param fp function to be called on hash node when the data is found + * @param d buffer + * @param sz size of the data buffer + * @return pointer to data */ void* taosHashGetCloneExt(SHashObj *pHashObj, const void *key, size_t keyLen, void (*fp)(void *), void** d, size_t *sz); + /** * remove item with the specified key - * @param pHashObj - * @param key - * @param keyLen + * + * @param pHashObj hash table object + * @param key key + * @param keyLen length of key + * @return 0 if success, -1 otherwise */ int32_t taosHashRemove(SHashObj *pHashObj, const void *key, size_t keyLen); +/** + * remove item with the specified key + * + * @param pHashObj hash table object + * @param key key + * @param keyLen length of key + * @param data buffer for data + * @param dsize size of data buffer + * @return 0 if success, -1 otherwise + */ int32_t taosHashRemoveWithData(SHashObj *pHashObj, const void *key, size_t keyLen, void* data, size_t dsize); int32_t taosHashCondTraverse(SHashObj *pHashObj, bool (*fp)(void *, void *), void *param); +/** + * clear the contents of the hash table + * + * @param pHashObj hash table object + */ void taosHashClear(SHashObj *pHashObj); /** * clean up hash table - * @param handle + * + * @param pHashObj hash table object */ void taosHashCleanup(SHashObj *pHashObj); /** + * return the number of collisions in the hash table * - * @param pHashObj - * @return + * @param pHashObj hash table object + * @return maximum number of collisions */ int32_t taosHashGetMaxOverflowLinkLength(const SHashObj *pHashObj); +/** + * return the consumed memory of the hash table + * + * @param pHashObj hash table object + * @return consumed memory of the hash table + */ size_t taosHashGetMemSize(const SHashObj *pHashObj); void *taosHashIterate(SHashObj *pHashObj, void *p); diff --git a/src/util/src/hash.c b/src/util/src/hash.c index 6530bba25a..bfe74ba9d2 100644 --- 
a/src/util/src/hash.c +++ b/src/util/src/hash.c @@ -19,6 +19,9 @@ #include "taosdef.h" #define EXT_SIZE 1024 +#define HASH_MAX_CAPACITY (1024 * 1024 * 16) +#define HASH_DEFAULT_LOAD_FACTOR (0.75) +#define HASH_INDEX(v, c) ((v) & ((c)-1)) #define HASH_NEED_RESIZE(_h) ((_h)->size >= (_h)->capacity * HASH_DEFAULT_LOAD_FACTOR) @@ -36,6 +39,32 @@ DO_FREE_HASH_NODE(_n); \ } while (0); +#define GET_HASH_NODE_KEY(_n) ((char*)(_n) + sizeof(SHashNode) + (_n)->dataLen) +#define GET_HASH_NODE_DATA(_n) ((char*)(_n) + sizeof(SHashNode)) +#define GET_HASH_PNODE(_n) ((SHashNode *)((char*)(_n) - sizeof(SHashNode))) + +typedef void (*_hash_free_fn_t)(void *param); + +typedef struct SHashEntry { + int32_t num; // number of elements in current entry + SRWLatch latch; // entry latch + SHashNode *next; +} SHashEntry; + +typedef struct SHashObj { + SHashEntry **hashList; + size_t capacity; // number of slots + size_t size; // number of elements in hash table + _hash_fn_t hashFp; // hash function + _hash_free_fn_t freeFp; // hash node free callback function + _equal_fn_t equalFp; // equal function + + SRWLatch lock; // read-write spin lock + SHashLockTypeE type; // lock type + bool enableUpdate; // enable update + SArray *pMemBlock; // memory block allocated for SHashEntry +} SHashObj; + static FORCE_INLINE void __wr_lock(void *lock, int32_t type) { if (type == HASH_NO_LOCK) { return; @@ -97,26 +126,26 @@ static FORCE_INLINE SHashNode *doSearchInEntryList(SHashObj *pHashObj, SHashEntr static void taosHashTableResize(SHashObj *pHashObj); /** + * allocate and initialize a hash node + * * @param key key of object for hash, usually a null-terminated string * @param keyLen length of key - * @param pData actually data. Requires a consecutive memory block, no pointer is allowed in pData. - * Pointer copy causes memory access error. 
+ * @param pData data to be stored in hash node * @param dsize size of data - * @return SHashNode + * @return SHashNode */ static SHashNode *doCreateHashNode(const void *key, size_t keyLen, const void *pData, size_t dsize, uint32_t hashVal); /** - * Update the hash node + * update the hash node * - * @param pNode hash node - * @param key key for generate hash value - * @param keyLen key length - * @param pData actual data - * @param dsize size of actual data - * @return hash node + * @param pHashObj hash table object + * @param pe hash table entry to operate on + * @param prev previous node + * @param pNode the old node with requested key + * @param pNewNode the new node with requested key */ -static FORCE_INLINE SHashNode *doUpdateHashNode(SHashObj *pHashObj, SHashEntry* pe, SHashNode* prev, SHashNode *pNode, SHashNode *pNewNode) { +static FORCE_INLINE void doUpdateHashNode(SHashObj *pHashObj, SHashEntry* pe, SHashNode* prev, SHashNode *pNode, SHashNode *pNewNode) { assert(pNode->keyLen == pNewNode->keyLen); atomic_sub_fetch_32(&pNode->refCount, 1); @@ -134,12 +163,10 @@ static FORCE_INLINE SHashNode *doUpdateHashNode(SHashObj *pHashObj, SHashEntry* pe->num++; atomic_add_fetch_64(&pHashObj->size, 1); } - - return pNewNode; } /** - * insert the hash node at the front of the linked list + * Insert the hash node at the front of the linked list * * @param pHashObj * @param pNode @@ -155,16 +182,21 @@ static void pushfrontNodeInEntryList(SHashEntry *pEntry, SHashNode *pNode); static FORCE_INLINE bool taosHashTableEmpty(const SHashObj *pHashObj); /** - * Initialize a hash table + * initialize a hash table * - * @param capacity Initial capacity of the hash table - * @param fn Hash function - * @param update Whether the hash table allows in place update - * @param type Whether the hash table has per entry lock - * @return Hash table object + * @param capacity initial capacity of the hash table + * @param fn hash function + * @param update whether the hash table allows in 
place update + * @param type whether the hash table has per entry lock + * @return hash table object */ SHashObj *taosHashInit(size_t capacity, _hash_fn_t fn, bool update, SHashLockTypeE type) { assert(fn != NULL); + if (fn == NULL) { + uError("hash table must have a valid hash function"); + return NULL; + } + if (capacity == 0) { capacity = 4; } @@ -646,15 +678,15 @@ void taosHashTableResize(SHashObj *pHashObj) { SHashNode *pNode = NULL; SHashNode *pNext = NULL; - int32_t newSize = (int32_t)(pHashObj->capacity << 1u); - if (newSize > HASH_MAX_CAPACITY) { + int32_t newCapacity = (int32_t)(pHashObj->capacity << 1u); + if (newCapacity > HASH_MAX_CAPACITY) { // uDebug("current capacity:%d, maximum capacity:%d, no resize applied due to limitation is reached", // pHashObj->capacity, HASH_MAX_CAPACITY); return; } int64_t st = taosGetTimestampUs(); - void *pNewEntryList = realloc(pHashObj->hashList, sizeof(void *) * newSize); + void *pNewEntryList = realloc(pHashObj->hashList, sizeof(void *) * newCapacity); if (pNewEntryList == NULL) { // todo handle error // uDebug("cache resize failed due to out of memory, capacity remain:%d", pHashObj->capacity); return; @@ -662,7 +694,7 @@ void taosHashTableResize(SHashObj *pHashObj) { pHashObj->hashList = pNewEntryList; - size_t inc = newSize - pHashObj->capacity; + size_t inc = newCapacity - pHashObj->capacity; void * p = calloc(inc, sizeof(SHashEntry)); for (int32_t i = 0; i < inc; ++i) { @@ -671,7 +703,7 @@ void taosHashTableResize(SHashObj *pHashObj) { taosArrayPush(pHashObj->pMemBlock, &p); - pHashObj->capacity = newSize; + pHashObj->capacity = newCapacity; for (int32_t i = 0; i < pHashObj->capacity; ++i) { SHashEntry *pe = pHashObj->hashList[i]; -- GitLab