提交 89afece7 编写于 作者: Y Yifan Hao

Hash table cleanup [3/n]

A few cleanups:
1. Move implementation details (macro definition, SHashEntry and SHashObj
   definition) to hash.c. Only leave relevant APIs in hash.h.
2. Complete function header comments.
3. Correct variable naming from "newSize" to "newCapacity".
上级 376e30ae
......@@ -114,7 +114,7 @@ do { \
a = a + b; \
} \
} \
} while(0)
} while(0)
#define TSKEY_MIN_SUB(a,b) \
do { \
......@@ -1277,7 +1277,7 @@ void doTimeWindowInterpolation(SOperatorInfo* pOperator, SOptrBasicInfo* pInfo,
} else {
COPY_DATA(&pCtx[k].start.val, (char *)pColInfo->pData + prevRowIndex * pColInfo->info.bytes);
}
pCtx[k].start.key = prevTs;
if (pColInfo->info.type == TSDB_DATA_TYPE_BINARY || pColInfo->info.type == TSDB_DATA_TYPE_NCHAR) {
......@@ -1294,7 +1294,7 @@ void doTimeWindowInterpolation(SOperatorInfo* pOperator, SOptrBasicInfo* pInfo,
} else {
COPY_DATA(&pCtx[k].end.val, (char *)pColInfo->pData + curRowIndex * pColInfo->info.bytes);
}
pCtx[k].end.key = curTs;
if (pColInfo->info.type == TSDB_DATA_TYPE_BINARY || pColInfo->info.type == TSDB_DATA_TYPE_NCHAR) {
......@@ -1309,9 +1309,9 @@ void doTimeWindowInterpolation(SOperatorInfo* pOperator, SOptrBasicInfo* pInfo,
} else {
GET_TYPED_DATA(v1, double, pColInfo->info.type, (char *)pColInfo->pData + prevRowIndex * pColInfo->info.bytes);
}
GET_TYPED_DATA(v2, double, pColInfo->info.type, (char *)pColInfo->pData + curRowIndex * pColInfo->info.bytes);
SPoint point1 = (SPoint){.key = prevTs, .val = &v1};
SPoint point2 = (SPoint){.key = curTs, .val = &v2};
SPoint point = (SPoint){.key = windowKey, .val = &v };
......@@ -2123,7 +2123,7 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
}
case OP_StateWindow: {
pRuntimeEnv->proot = createStatewindowOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput);
pRuntimeEnv->proot = createStatewindowOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput);
int32_t opType = pRuntimeEnv->proot->upstream[0]->operatorType;
if (opType != OP_DummyInput) {
setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream[0]->info, pRuntimeEnv->proot);
......@@ -2253,7 +2253,7 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) {
if (!pRuntimeEnv->udfIsCopy) {
destroyUdfInfo(pRuntimeEnv->pUdfInfo);
}
destroyResultBuf(pRuntimeEnv->pResultBuf);
doFreeQueryHandle(pRuntimeEnv);
......@@ -2855,7 +2855,7 @@ void filterRowsInDataBlock(SQueryRuntimeEnv* pRuntimeEnv, SSingleColumnFilterInf
if (i < (numOfRows - 1)) {
all = false;
}
break;
}
}
......@@ -2880,9 +2880,9 @@ void filterColRowsInDataBlock(SQueryRuntimeEnv* pRuntimeEnv, SSDataBlock* pBlock
bool all = true;
if (pRuntimeEnv->pTsBuf != NULL) {
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, 0);
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, 0);
p = calloc(numOfRows, sizeof(int8_t));
TSKEY* k = (TSKEY*) pColInfoData->pData;
for (int32_t i = 0; i < numOfRows; ++i) {
int32_t offset = ascQuery? i:(numOfRows - i - 1);
......@@ -2897,7 +2897,7 @@ void filterColRowsInDataBlock(SQueryRuntimeEnv* pRuntimeEnv, SSDataBlock* pBlock
p[offset] = true;
}
if (!tsBufNextPos(pRuntimeEnv->pTsBuf)) {
if (!tsBufNextPos(pRuntimeEnv->pTsBuf)) {
if (i < (numOfRows - 1)) {
all = false;
}
......@@ -2905,7 +2905,7 @@ void filterColRowsInDataBlock(SQueryRuntimeEnv* pRuntimeEnv, SSDataBlock* pBlock
break;
}
}
// save the cursor status
pRuntimeEnv->current->cur = tsBufGetCursor(pRuntimeEnv->pTsBuf);
} else {
......@@ -2924,7 +2924,7 @@ void filterColRowsInDataBlock(SQueryRuntimeEnv* pRuntimeEnv, SSDataBlock* pBlock
tfree(p);
}
static SColumnInfo* doGetTagColumnInfoById(SColumnInfo* pTagColList, int32_t numOfTags, int16_t colId);
static void doSetTagValueInParam(void* pTable, int32_t tagColId, tVariant *tag, int16_t type, int16_t bytes);
......@@ -2970,7 +2970,7 @@ void doSetFilterColumnInfo(SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFi
FORCE_INLINE int32_t getColumnDataFromId(void *param, int32_t id, void **data) {
int32_t numOfCols = ((SColumnDataParam *)param)->numOfCols;
SArray* pDataBlock = ((SColumnDataParam *)param)->pDataBlock;
for (int32_t j = 0; j < numOfCols; ++j) {
SColumnInfoData* pColInfo = taosArrayGet(pDataBlock, j);
if (id == pColInfo->info.colId) {
......@@ -3128,7 +3128,7 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv* pRuntimeEnv, STableScanInfo* pTa
SColumnDataParam param = {.numOfCols = pBlock->info.numOfCols, .pDataBlock = pBlock->pDataBlock};
filterSetColFieldData(pQueryAttr->pFilters, &param, getColumnDataFromId);
}
if (pQueryAttr->pFilters != NULL || pRuntimeEnv->pTsBuf != NULL) {
filterColRowsInDataBlock(pRuntimeEnv, pBlock, ascQuery);
}
......@@ -3281,7 +3281,7 @@ void setTagValue(SOperatorInfo* pOperatorInfo, void *pTable, SQLFunctionCtx* pCt
} else {
if (pCtx[idx].tag.pz != NULL) {
memcpy(pRuntimeEnv->tagVal + offset, pCtx[idx].tag.pz, pCtx[idx].tag.nLen);
}
}
}
offset += pLocalExprInfo->base.resBytes;
......@@ -4035,13 +4035,13 @@ void setIntervalQueryRange(SQueryRuntimeEnv *pRuntimeEnv, STimeWindow* winx, int
}
TSKEY key = QUERY_IS_ASC_QUERY(pQueryAttr)? winx->skey:winx->ekey;
qDebug("0x%"PRIx64" update query window, tid:%d, %"PRId64" - %"PRId64", old:%"PRId64" - %"PRId64, GET_QID(pRuntimeEnv), tid, key, pTableQueryInfo->win.ekey,
pTableQueryInfo->win.skey, pTableQueryInfo->win.ekey);
pTableQueryInfo->win.skey = key;
STimeWindow win = {.skey = key, .ekey = pQueryAttr->window.ekey};
/**
* In handling the both ascending and descending order super table query, we need to find the first qualified
* timestamp of this table, and then set the first qualified start timestamp.
......@@ -4657,7 +4657,7 @@ static int32_t setupQueryHandle(void* tsdb, SQueryRuntimeEnv* pRuntimeEnv, int64
STsdbQueryCond cond = createTsdbQueryCond(pQueryAttr, &pQueryAttr->window);
if (pQueryAttr->tsCompQuery || pQueryAttr->pointInterpQuery) {
cond.type = BLOCK_LOAD_TABLE_SEQ_ORDER;
cond.type = BLOCK_LOAD_TABLE_SEQ_ORDER;
}
if (!isSTableQuery
......@@ -4786,7 +4786,7 @@ int32_t doInitQInfo(SQInfo* pQInfo, STSBuf* pTsBuf, void* tsdb, void* sourceOptr
int16_t order = (pQueryAttr->order.order == pRuntimeEnv->pTsBuf->tsOrder) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC;
tsBufResetPos(pRuntimeEnv->pTsBuf);
tsBufSetTraverseOrder(pRuntimeEnv->pTsBuf, order);
tsBufNextPos(pTsBuf);
tsBufNextPos(pTsBuf);
}
int32_t ps = DEFAULT_PAGE_SIZE;
......@@ -5169,7 +5169,7 @@ void setTableScanFilterOperatorInfo(STableScanInfo* pTableScanInfo, SOperatorInf
pTableScanInfo->rowCellInfoOffset = pIntervalInfo->rowCellInfoOffset;
} else if (pDownstream->operatorType == OP_TimeEvery) {
STimeEveryOperatorInfo *pEveryInfo = pDownstream->info;
pTableScanInfo->pCtx = pEveryInfo->binfo.pCtx;
pTableScanInfo->pResultRowInfo = &pEveryInfo->binfo.resultRowInfo;
pTableScanInfo->rowCellInfoOffset = pEveryInfo->binfo.rowCellInfoOffset;
......@@ -5223,7 +5223,7 @@ SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbQueryHandle, SQueryRuntime
if (pRuntimeEnv->pQueryAttr->pointInterpQuery) {
pRuntimeEnv->enableGroupData = true;
}
SOperatorInfo* pOptr = calloc(1, sizeof(SOperatorInfo));
pOptr->name = "DataBlocksOptimizedScanOperator";
pOptr->operatorType = OP_DataBlocksOptScan;
......@@ -5500,7 +5500,7 @@ static SSDataBlock* doSort(void* param, bool* newgroup) {
if (pInfo->pDataBlock->info.rows) {
taoscQSort(pCols, pSchema, numOfCols, pInfo->pDataBlock->info.rows, pInfo->colIndex, comp);
}
tfree(pCols);
tfree(pSchema);
return (pInfo->pDataBlock->info.rows > 0)? pInfo->pDataBlock:NULL;
......@@ -5648,7 +5648,7 @@ static SSDataBlock* doSTableAggregate(void* param, bool* newgroup) {
key = pBlock->info.window.skey;
TSKEY_MIN_SUB(key, -1);
}
setExecutionContext(pRuntimeEnv, pInfo, pOperator->numOfOutput, pRuntimeEnv->current->groupIndex, key);
doAggregateImpl(pOperator, pQueryAttr->window.skey, pInfo->pCtx, pBlock);
}
......@@ -5913,7 +5913,7 @@ static SSDataBlock* doIntervalAgg(void* param, bool* newgroup) {
if (pIntervalInfo->resultRowInfo.size > 0 && pQueryAttr->needSort) {
qsort(pIntervalInfo->resultRowInfo.pResult, pIntervalInfo->resultRowInfo.size, POINTER_BYTES, resRowCompare);
}
closeAllResultRows(&pIntervalInfo->resultRowInfo);
setQueryStatus(pRuntimeEnv, QUERY_COMPLETED);
finalizeQueryResult(pOperator, pIntervalInfo->pCtx, &pIntervalInfo->resultRowInfo, pIntervalInfo->rowCellInfoOffset);
......@@ -5942,7 +5942,7 @@ static void everyApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *p
static int64_t getEveryStartTs(bool ascQuery, STimeWindow *range, STimeWindow *blockWin, SQueryAttr *pQueryAttr) {
int64_t startTs = range->skey, ekey = 0;
assert(range->skey != INT64_MIN);
if (ascQuery) {
......@@ -5980,15 +5980,15 @@ static bool doEveryInterpolation(SOperatorInfo* pOperatorInfo, SSDataBlock* pBlo
SQLFunctionCtx* pCtx = NULL;
*needApply = false;
if (!pQueryAttr->pointInterpQuery) {
goto group_finished_exit;
}
assert(pOperatorInfo->numOfOutput > 1);
for (int32_t i = 1; i < pOperatorInfo->numOfOutput; ++i) {
assert(pEveryInfo->binfo.pCtx[i].functionId == TSDB_FUNC_INTERP
assert(pEveryInfo->binfo.pCtx[i].functionId == TSDB_FUNC_INTERP
|| pEveryInfo->binfo.pCtx[i].functionId == TSDB_FUNC_TS_DUMMY
|| pEveryInfo->binfo.pCtx[i].functionId == TSDB_FUNC_TAG_DUMMY);
......@@ -5997,7 +5997,7 @@ static bool doEveryInterpolation(SOperatorInfo* pOperatorInfo, SSDataBlock* pBlo
break;
}
}
TSKEY* tsCols = NULL;
if (pBlock && pBlock->pDataBlock != NULL) {
SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, 0);
......@@ -6007,7 +6007,7 @@ static bool doEveryInterpolation(SOperatorInfo* pOperatorInfo, SSDataBlock* pBlo
if (pCtx->startTs == INT64_MIN) {
if (pQueryAttr->range.skey == INT64_MIN) {
if (NULL == tsCols) {
if (NULL == tsCols) {
goto group_finished_exit;
}
......@@ -6046,12 +6046,12 @@ static bool doEveryInterpolation(SOperatorInfo* pOperatorInfo, SSDataBlock* pBlo
} else {
pCtx->startTs = ascQuery ? pCtx->startTs + pQueryAttr->interval.interval : pCtx->startTs - pQueryAttr->interval.interval;
}
if (ascQuery && pQueryAttr->range.ekey != INT64_MIN && pCtx->startTs > pQueryAttr->range.ekey) {
if (ascQuery && pQueryAttr->range.ekey != INT64_MIN && pCtx->startTs > pQueryAttr->range.ekey) {
goto group_finished_exit;
}
if ((!ascQuery) && pQueryAttr->range.skey != INT64_MIN && pCtx->startTs < pQueryAttr->range.skey) {
if ((!ascQuery) && pQueryAttr->range.skey != INT64_MIN && pCtx->startTs < pQueryAttr->range.skey) {
goto group_finished_exit;
}
} else {
......@@ -6065,8 +6065,8 @@ static bool doEveryInterpolation(SOperatorInfo* pOperatorInfo, SSDataBlock* pBlo
if ((ascQuery && pQueryAttr->range.ekey == INT64_MIN) || ((!ascQuery) && pQueryAttr->range.skey == INT64_MIN)) {
goto group_finished_exit;
}
if (pQueryAttr->fillType == TSDB_FILL_NONE || pQueryAttr->fillType == TSDB_FILL_LINEAR
if (pQueryAttr->fillType == TSDB_FILL_NONE || pQueryAttr->fillType == TSDB_FILL_LINEAR
|| ((ascQuery && pQueryAttr->fillType == TSDB_FILL_NEXT) || ((!ascQuery) && pQueryAttr->fillType == TSDB_FILL_PREV))) {
goto group_finished_exit;
}
......@@ -6088,11 +6088,11 @@ static bool doEveryInterpolation(SOperatorInfo* pOperatorInfo, SSDataBlock* pBlo
}
*needApply = true;
for (int32_t i = 0; i < pOperatorInfo->numOfOutput; ++i) {
pEveryInfo->binfo.pCtx[i].startTs = pCtx->startTs;
}
return false;
}
......@@ -6115,14 +6115,14 @@ static bool doEveryInterpolation(SOperatorInfo* pOperatorInfo, SSDataBlock* pBlo
} else {
if (tsCols[startPos] == pCtx->startTs) {
doTimeWindowInterpolation(pOperatorInfo, pOperatorInfo->info, pBlock->pDataBlock, pCtx->startTs, startPos, INT64_MIN, 0, 0, RESULT_ROW_START_INTERP);
} else {
} else {
doTimeWindowInterpolation(pOperatorInfo, pOperatorInfo->info, pBlock->pDataBlock, tsCols[startPos - 1], startPos - 1, INT64_MIN, 0, 0, RESULT_ROW_START_INTERP);
}
}
if (pQueryAttr->fillType != TSDB_FILL_LINEAR) {
*needApply = true;
}
}
}
if ((!ascQuery) && (pQueryAttr->fillType == TSDB_FILL_LINEAR || pQueryAttr->fillType == TSDB_FILL_NEXT) && pCtx->end.key == INT64_MIN) {
......@@ -6132,7 +6132,7 @@ static bool doEveryInterpolation(SOperatorInfo* pOperatorInfo, SSDataBlock* pBlo
} else if (startPos == (pBlock->info.rows - 1)) {
if (tsCols[startPos] == pCtx->startTs) {
doTimeWindowInterpolation(pOperatorInfo, pOperatorInfo->info, pBlock->pDataBlock, INT64_MIN, 0, pCtx->startTs, startPos, 0, RESULT_ROW_END_INTERP);
} else {
} else {
TSKEY lastTs = *(TSKEY *) pRuntimeEnv->prevRow[0];
if (lastTs != INT64_MIN) {
doTimeWindowInterpolation(pOperatorInfo, pOperatorInfo->info, pBlock->pDataBlock, INT64_MIN, 0, lastTs, -1, 0, RESULT_ROW_END_INTERP);
......@@ -6145,17 +6145,17 @@ static bool doEveryInterpolation(SOperatorInfo* pOperatorInfo, SSDataBlock* pBlo
doTimeWindowInterpolation(pOperatorInfo, pOperatorInfo->info, pBlock->pDataBlock, INT64_MIN, 0, tsCols[startPos + 1], startPos + 1, 0, RESULT_ROW_END_INTERP);
}
}
if (pQueryAttr->fillType != TSDB_FILL_LINEAR) {
*needApply = true;
}
}
}
if (ascQuery && (pQueryAttr->fillType == TSDB_FILL_LINEAR || pQueryAttr->fillType == TSDB_FILL_NEXT) && pCtx->end.key == INT64_MIN) {
if (startPos < 0) {
return true;
}
doTimeWindowInterpolation(pOperatorInfo, pOperatorInfo->info, pBlock->pDataBlock, INT64_MIN, 0, tsCols[startPos], startPos, 0, RESULT_ROW_END_INTERP);
*needApply = true;
......@@ -6165,7 +6165,7 @@ static bool doEveryInterpolation(SOperatorInfo* pOperatorInfo, SSDataBlock* pBlo
if (startPos < 0) {
return true;
}
doTimeWindowInterpolation(pOperatorInfo, pOperatorInfo->info, pBlock->pDataBlock, tsCols[startPos], startPos, INT64_MIN, 0, 0, RESULT_ROW_START_INTERP);
*needApply = true;
......@@ -6186,7 +6186,7 @@ group_finished_exit:
if (pQueryAttr->needReverseScan) {
pQueryAttr->range.skey = INT64_MIN;
}
pEveryInfo->groupDone = true;
if (pCtx) {
......@@ -6194,7 +6194,7 @@ group_finished_exit:
pCtx->start.key = INT64_MIN;
pCtx->end.key = INT64_MIN;
}
return true;
}
......@@ -6219,7 +6219,7 @@ static void doTimeEveryImpl(SOperatorInfo* pOperator, SQLFunctionCtx *pCtx, SSDa
setQueryStatus(pRuntimeEnv, QUERY_COMPLETED);
return;
}
tsCols = (int64_t*) pColDataInfo->pData;
assert(tsCols[0] == pBlock->info.window.skey &&
tsCols[pBlock->info.rows - 1] == pBlock->info.window.ekey);
......@@ -6233,13 +6233,13 @@ static void doTimeEveryImpl(SOperatorInfo* pOperator, SQLFunctionCtx *pCtx, SSDa
if (needApply) {
everyApplyFunctions(pRuntimeEnv, pEveryInfo->binfo.pCtx, numOfOutput);
pRes->info.rows = getNumOfResult(pRuntimeEnv, pEveryInfo->binfo.pCtx, pOperator->numOfOutput);
if (pRes->info.rows >= pRuntimeEnv->resultInfo.threshold) {
pEveryInfo->lastBlock = pBlock;
break;
}
updateOutputBuf(&pEveryInfo->binfo, &pEveryInfo->bufCapacity, 0);
}
}
......@@ -6267,8 +6267,8 @@ static SSDataBlock* doTimeEvery(void* param, bool* newgroup) {
clearNumOfRes(pInfo->pCtx, pOperator->numOfOutput);
return pInfo->pRes;
}
if (pRes->info.rows > 0) {
if (pRes->info.rows > 0) {
copyTsColoum(pRes, pInfo->pCtx, pOperator->numOfOutput);
clearNumOfRes(pInfo->pCtx, pOperator->numOfOutput);
return pInfo->pRes;
......@@ -6327,9 +6327,9 @@ static SSDataBlock* doTimeEvery(void* param, bool* newgroup) {
if (pRes->info.rows >= pRuntimeEnv->resultInfo.threshold) {
break;
}
assert(pEveryInfo->groupDone);
if (pRes->info.rows > 0) {
break;
}
......@@ -7027,12 +7027,12 @@ SColumnInfo* extractColumnFilterInfo(SExprInfo* pExpr, int32_t numOfOutput, int3
pCols[i].colId = pExpr[i].base.resColId;
pCols[i].flist.numOfFilters = pExpr[i].base.flist.numOfFilters;
if (pCols[i].flist.numOfFilters != 0) {
if (pCols[i].flist.numOfFilters != 0) {
pCols[i].flist.filterInfo = calloc(pCols[i].flist.numOfFilters, sizeof(SColumnFilterInfo));
memcpy(pCols[i].flist.filterInfo, pExpr[i].base.flist.filterInfo, pCols[i].flist.numOfFilters * sizeof(SColumnFilterInfo));
} else {
// avoid runtime error
pCols[i].flist.filterInfo = NULL;
pCols[i].flist.filterInfo = NULL;
}
}
......@@ -7126,7 +7126,7 @@ SOperatorInfo* createTimeEveryOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOpera
if (pQueryAttr->needReverseScan) {
pInfo->rangeStart = taosHashInit(256, taosGetDefaultHashFunction(TSDB_DATA_TYPE_TIMESTAMP), false, false);
}
initResultRowInfo(&pBInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT);
setDefaultOutputBuf(pRuntimeEnv, pBInfo, pInfo->seed, MASTER_SCAN);
......@@ -7496,11 +7496,11 @@ SOperatorInfo* createTagScanOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SExprInf
}
static bool initMultiDistinctInfo(SDistinctOperatorInfo *pInfo, SOperatorInfo* pOperator, SSDataBlock *pBlock) {
if (taosArrayGetSize(pInfo->pDistinctDataInfo) == pOperator->numOfOutput) {
// distinct info already inited
// distinct info already inited
return true;
}
for (int i = 0; i < pOperator->numOfOutput; i++) {
pInfo->totalBytes += pOperator->pExpr[i].base.colBytes;
pInfo->totalBytes += pOperator->pExpr[i].base.colBytes;
}
for (int i = 0; i < pOperator->numOfOutput; i++) {
int numOfBlock = (int)(taosArrayGetSize(pBlock->pDataBlock));
......@@ -7520,14 +7520,14 @@ static bool initMultiDistinctInfo(SDistinctOperatorInfo *pInfo, SOperatorInfo* p
static void buildMultiDistinctKey(SDistinctOperatorInfo *pInfo, SSDataBlock *pBlock, int32_t rowId) {
char *p = pInfo->buf;
memset(p, 0, pInfo->totalBytes);
memset(p, 0, pInfo->totalBytes);
for (int i = 0; i < taosArrayGetSize(pInfo->pDistinctDataInfo); i++) {
SDistinctDataInfo* pDistDataInfo = (SDistinctDataInfo *)taosArrayGet(pInfo->pDistinctDataInfo, i);
SDistinctDataInfo* pDistDataInfo = (SDistinctDataInfo *)taosArrayGet(pInfo->pDistinctDataInfo, i);
SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, pDistDataInfo->index);
char *val = ((char *)pColDataInfo->pData) + pColDataInfo->info.bytes * rowId;
if (isNull(val, pDistDataInfo->type)) {
p += pDistDataInfo->bytes;
if (isNull(val, pDistDataInfo->type)) {
p += pDistDataInfo->bytes;
continue;
}
if (IS_VAR_DATA_TYPE(pDistDataInfo->type)) {
......@@ -7553,7 +7553,7 @@ static SSDataBlock* hashDistinct(void* param, bool* newgroup) {
pRes->info.rows = 0;
SSDataBlock* pBlock = NULL;
while(1) {
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC);
pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup);
......@@ -7567,7 +7567,7 @@ static SSDataBlock* hashDistinct(void* param, bool* newgroup) {
doSetOperatorCompleted(pOperator);
break;
}
// ensure result output buf
// ensure result output buf
if (pRes->info.rows + pBlock->info.rows > pInfo->outputCapacity) {
int32_t newSize = pRes->info.rows + pBlock->info.rows;
for (int i = 0; i < taosArrayGetSize(pRes->pDataBlock); i++) {
......@@ -7591,14 +7591,14 @@ static SSDataBlock* hashDistinct(void* param, bool* newgroup) {
for (int j = 0; j < taosArrayGetSize(pRes->pDataBlock); j++) {
SDistinctDataInfo* pDistDataInfo = taosArrayGet(pInfo->pDistinctDataInfo, j); // distinct meta info
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pDistDataInfo->index); //src
SColumnInfoData* pResultColInfoData = taosArrayGet(pRes->pDataBlock, j); // dist
SColumnInfoData* pResultColInfoData = taosArrayGet(pRes->pDataBlock, j); // dist
char* val = ((char*)pColInfoData->pData) + pDistDataInfo->bytes * i;
char *start = pResultColInfoData->pData + pDistDataInfo->bytes * pInfo->pRes->info.rows;
char *start = pResultColInfoData->pData + pDistDataInfo->bytes * pInfo->pRes->info.rows;
memcpy(start, val, pDistDataInfo->bytes);
}
pRes->info.rows += 1;
}
}
}
if (pRes->info.rows >= pInfo->threshold) {
......@@ -7614,10 +7614,10 @@ SOperatorInfo* createDistinctOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperat
pInfo->buf = NULL;
pInfo->threshold = tsMaxNumOfDistinctResults; // distinct result threshold
pInfo->outputCapacity = 4096;
pInfo->pDistinctDataInfo = taosArrayInit(numOfOutput, sizeof(SDistinctDataInfo));
pInfo->pDistinctDataInfo = taosArrayInit(numOfOutput, sizeof(SDistinctDataInfo));
pInfo->pSet = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
pInfo->pRes = createOutputBuf(pExpr, numOfOutput, (int32_t) pInfo->outputCapacity);
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "DistinctOperator";
......@@ -7629,7 +7629,7 @@ SOperatorInfo* createDistinctOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperat
pOperator->info = pInfo;
pOperator->pRuntimeEnv = pRuntimeEnv;
pOperator->exec = hashDistinct;
pOperator->pExpr = pExpr;
pOperator->pExpr = pExpr;
pOperator->cleanup = destroyDistinctOperatorInfo;
appendUpstream(pOperator, upstream);
......@@ -7808,7 +7808,7 @@ int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SQueryParam* param) {
pQueryMsg->window.ekey = htobe64(pQueryMsg->window.ekey);
pQueryMsg->range.skey = htobe64(pQueryMsg->range.skey);
pQueryMsg->range.ekey = htobe64(pQueryMsg->range.ekey);
pQueryMsg->interval.interval = htobe64(pQueryMsg->interval.interval);
pQueryMsg->interval.sliding = htobe64(pQueryMsg->interval.sliding);
pQueryMsg->interval.offset = htobe64(pQueryMsg->interval.offset);
......@@ -7879,7 +7879,7 @@ int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SQueryParam* param) {
if (code != TSDB_CODE_SUCCESS) {
goto _cleanup;
}
*/
*/
}
if (pQueryMsg->colCondLen > 0) {
......@@ -8309,14 +8309,14 @@ int32_t initUdfInfo(SUdfInfo* pUdfInfo) {
if (pUdfInfo->path) {
unlink(pUdfInfo->path);
}
tfree(pUdfInfo->path);
pUdfInfo->path = strdup(path);
if (pUdfInfo->handle) {
taosCloseDll(pUdfInfo->handle);
}
pUdfInfo->handle = taosLoadDll(path);
if (NULL == pUdfInfo->handle) {
......@@ -8477,7 +8477,7 @@ int32_t createQueryFunc(SQueriedTableInfo* pTableInfo, int32_t numOfOutput, SExp
int32_t createQueryFilter(char *data, int32_t len, void** pFilters) {
tExprNode* expr = NULL;
TRY(TSDB_MAX_TAG_CONDITIONS) {
expr = exprTreeFromBinary(data, len);
} CATCH( code ) {
......@@ -8784,7 +8784,7 @@ SQInfo* createQInfoImpl(SQueryTableMsg* pQueryMsg, SGroupbyExpr* pGroupbyExpr, S
pQueryAttr->vgId = vgId;
pQueryAttr->pFilters = pFilters;
pQueryAttr->range = pQueryMsg->range;
pQueryAttr->tableCols = calloc(numOfCols, sizeof(SSingleColumnFilterInfo));
if (pQueryAttr->tableCols == NULL) {
goto _cleanup;
......@@ -9227,6 +9227,9 @@ static void doSetTagValueToResultBuf(char* output, const char* val, int16_t type
static int64_t getQuerySupportBufSize(size_t numOfTables) {
size_t s1 = sizeof(STableQueryInfo);
// TODO: struct SHashNode is an internal implementation of
// hash table. The implementation should not leak here.
size_t s2 = sizeof(SHashNode);
// size_t s3 = sizeof(STableCheckInfo); buffer consumption in tsdb
......
......@@ -24,12 +24,8 @@ extern "C" {
#include "hashfunc.h"
#include "tlockfree.h"
#define HASH_MAX_CAPACITY (1024 * 1024 * 16)
#define HASH_DEFAULT_LOAD_FACTOR (0.75)
#define HASH_INDEX(v, c) ((v) & ((c)-1))
typedef void (*_hash_free_fn_t)(void *param);
// TODO: SHashNode is an internal implementation and should not
// be in the public header file.
typedef struct SHashNode {
struct SHashNode *next;
uint32_t hashVal; // the hash value of key
......@@ -40,48 +36,27 @@ typedef struct SHashNode {
char data[];
} SHashNode;
#define GET_HASH_NODE_KEY(_n) ((char*)(_n) + sizeof(SHashNode) + (_n)->dataLen)
#define GET_HASH_NODE_DATA(_n) ((char*)(_n) + sizeof(SHashNode))
#define GET_HASH_PNODE(_n) ((SHashNode *)((char*)(_n) - sizeof(SHashNode)))
typedef enum SHashLockTypeE {
HASH_NO_LOCK = 0,
HASH_ENTRY_LOCK = 1,
} SHashLockTypeE;
typedef struct SHashEntry {
int32_t num; // number of elements in current entry
SRWLatch latch; // entry latch
SHashNode *next;
} SHashEntry;
typedef struct SHashObj {
SHashEntry **hashList;
size_t capacity; // number of slots
size_t size; // number of elements in hash table
_hash_fn_t hashFp; // hash function
_hash_free_fn_t freeFp; // hash node free callback function
_equal_fn_t equalFp; // equal function
SRWLatch lock; // read-write spin lock
SHashLockTypeE type; // lock type
bool enableUpdate; // enable update
SArray *pMemBlock; // memory block allocated for SHashEntry
} SHashObj;
typedef struct SHashObj SHashObj;
/**
* init the hash table
* initialize a hash table
*
* @param capacity initial capacity of the hash table
* @param fn hash function to generate the hash value
* @param threadsafe thread safe or not
* @return
* @param capacity initial capacity of the hash table
* @param fn hash function
* @param update whether the hash table allows in place update
* @param type whether the hash table has per entry lock
* @return hash table object
*/
SHashObj *taosHashInit(size_t capacity, _hash_fn_t fn, bool update, SHashLockTypeE type);
/**
* set equal func of the hash table
*
* @param pHashObj
* @param equalFp
* @return
......@@ -90,6 +65,7 @@ void taosHashSetEqualFp(SHashObj *pHashObj, _equal_fn_t fp);
/**
* return the size of hash table
*
* @param pHashObj
* @return
*/
......@@ -97,73 +73,105 @@ int32_t taosHashGetSize(const SHashObj *pHashObj);
/**
* put element into hash table, if the element with the same key exists, update it
* @param pHashObj
* @param key
* @param keyLen
* @param data
* @param size
* @return
*
* @param pHashObj hash table object
* @param key key
* @param keyLen length of key
* @param data data
* @param size size of data
* @return 0 if success, -1 otherwise
*/
int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, void *data, size_t size);
/**
* return the payload data with the specified key
*
* @param pHashObj
* @param key
* @param keyLen
* @return
* @param pHashObj hash table object
* @param key key
* @param keyLen length of key
* @return pointer to data
*/
void *taosHashGet(SHashObj *pHashObj, const void *key, size_t keyLen);
/**
* apply the udf before return the result
* @param pHashObj
* @param key
* @param keyLen
* @param fp
* @param d
* @return
* Get the data associated with "key". Note that caller needs to make sure
* "d" has enough capacity to accomodate the data.
*
* @param pHashObj hash table object
* @param key key
* @param keyLen length of key
* @param fp function to be called on hash node when the data is found
* @param d buffer
* @return pointer to data
*/
void* taosHashGetClone(SHashObj *pHashObj, const void *key, size_t keyLen, void (*fp)(void *), void* d);
/**
* @param pHashObj
* @param key
* @param keyLen
* @param fp
* @param d
* @param sz
* @return
* Get the data associated with "key". Note that caller needs to take ownership
* of the data "d" and make sure it is deallocated.
*
* @param pHashObj hash table object
* @param key key
* @param keyLen length of key
* @param fp function to be called on hash node when the data is found
* @param d buffer
* @param sz size of the data buffer
* @return pointer to data
*/
void* taosHashGetCloneExt(SHashObj *pHashObj, const void *key, size_t keyLen, void (*fp)(void *), void** d, size_t *sz);
/**
* remove item with the specified key
* @param pHashObj
* @param key
* @param keyLen
*
* @param pHashObj hash table object
* @param key key
* @param keyLen length of key
* @return 0 if success, -1 otherwise
*/
int32_t taosHashRemove(SHashObj *pHashObj, const void *key, size_t keyLen);
/**
* remove item with the specified key
*
* @param pHashObj hash table object
* @param key key
* @param keyLen length of key
* @param data buffer for data
* @param dsize size of data buffer
* @return 0 if success, -1 otherwise
*/
int32_t taosHashRemoveWithData(SHashObj *pHashObj, const void *key, size_t keyLen, void* data, size_t dsize);
int32_t taosHashCondTraverse(SHashObj *pHashObj, bool (*fp)(void *, void *), void *param);
/**
* clear the contents of the hash table
*
* @param pHashObj hash table object
*/
void taosHashClear(SHashObj *pHashObj);
/**
* clean up hash table
* @param handle
*
* @param pHashObj hash table object
*/
void taosHashCleanup(SHashObj *pHashObj);
/**
* return the number of collisions in the hash table
*
* @param pHashObj
* @return
* @param pHashObj hash table object
* @return maximum number of collisions
*/
int32_t taosHashGetMaxOverflowLinkLength(const SHashObj *pHashObj);
/**
* return the consumed memory of the hash table
*
* @param pHashObj hash table object
* @return consumed memory of the hash table
*/
size_t taosHashGetMemSize(const SHashObj *pHashObj);
void *taosHashIterate(SHashObj *pHashObj, void *p);
......
......@@ -19,6 +19,9 @@
#include "taosdef.h"
#define EXT_SIZE 1024
#define HASH_MAX_CAPACITY (1024 * 1024 * 16)
#define HASH_DEFAULT_LOAD_FACTOR (0.75)
#define HASH_INDEX(v, c) ((v) & ((c)-1))
#define HASH_NEED_RESIZE(_h) ((_h)->size >= (_h)->capacity * HASH_DEFAULT_LOAD_FACTOR)
......@@ -36,6 +39,32 @@
DO_FREE_HASH_NODE(_n); \
} while (0);
#define GET_HASH_NODE_KEY(_n) ((char*)(_n) + sizeof(SHashNode) + (_n)->dataLen)
#define GET_HASH_NODE_DATA(_n) ((char*)(_n) + sizeof(SHashNode))
#define GET_HASH_PNODE(_n) ((SHashNode *)((char*)(_n) - sizeof(SHashNode)))
typedef void (*_hash_free_fn_t)(void *param);
typedef struct SHashEntry {
int32_t num; // number of elements in current entry
SRWLatch latch; // entry latch
SHashNode *next;
} SHashEntry;
typedef struct SHashObj {
SHashEntry **hashList;
size_t capacity; // number of slots
size_t size; // number of elements in hash table
_hash_fn_t hashFp; // hash function
_hash_free_fn_t freeFp; // hash node free callback function
_equal_fn_t equalFp; // equal function
SRWLatch lock; // read-write spin lock
SHashLockTypeE type; // lock type
bool enableUpdate; // enable update
SArray *pMemBlock; // memory block allocated for SHashEntry
} SHashObj;
static FORCE_INLINE void __wr_lock(void *lock, int32_t type) {
if (type == HASH_NO_LOCK) {
return;
......@@ -97,26 +126,26 @@ static FORCE_INLINE SHashNode *doSearchInEntryList(SHashObj *pHashObj, SHashEntr
static void taosHashTableResize(SHashObj *pHashObj);
/**
* allocate and initialize a hash node
*
* @param key key of object for hash, usually a null-terminated string
* @param keyLen length of key
* @param pData actually data. Requires a consecutive memory block, no pointer is allowed in pData.
* Pointer copy causes memory access error.
* @param pData data to be stored in hash node
* @param dsize size of data
* @return SHashNode
* @return sHashNode
*/
static SHashNode *doCreateHashNode(const void *key, size_t keyLen, const void *pData, size_t dsize, uint32_t hashVal);
/**
* Update the hash node
* update the hash node
*
* @param pNode hash node
* @param key key for generate hash value
* @param keyLen key length
* @param pData actual data
* @param dsize size of actual data
* @return hash node
* @param pHashObj hash table object
* @param pe hash table entry to operate on
* @param prev previous node
* @param pNode the old node with requested key
* @param pNewNode the new node with requested key
*/
static FORCE_INLINE SHashNode *doUpdateHashNode(SHashObj *pHashObj, SHashEntry* pe, SHashNode* prev, SHashNode *pNode, SHashNode *pNewNode) {
static FORCE_INLINE void doUpdateHashNode(SHashObj *pHashObj, SHashEntry* pe, SHashNode* prev, SHashNode *pNode, SHashNode *pNewNode) {
assert(pNode->keyLen == pNewNode->keyLen);
atomic_sub_fetch_32(&pNode->refCount, 1);
......@@ -134,12 +163,10 @@ static FORCE_INLINE SHashNode *doUpdateHashNode(SHashObj *pHashObj, SHashEntry*
pe->num++;
atomic_add_fetch_64(&pHashObj->size, 1);
}
return pNewNode;
}
/**
* insert the hash node at the front of the linked list
* Insert the hash node at the front of the linked list
*
* @param pHashObj
* @param pNode
......@@ -155,16 +182,21 @@ static void pushfrontNodeInEntryList(SHashEntry *pEntry, SHashNode *pNode);
static FORCE_INLINE bool taosHashTableEmpty(const SHashObj *pHashObj);
/**
* Initialize a hash table
* initialize a hash table
*
* @param capacity Initial capacity of the hash table
* @param fn Hash function
* @param update Whether the hash table allows in place update
* @param type Whether the hash table has per entry lock
* @return Hash table object
* @param capacity initial capacity of the hash table
* @param fn hash function
* @param update whether the hash table allows in place update
* @param type whether the hash table has per entry lock
* @return hash table object
*/
SHashObj *taosHashInit(size_t capacity, _hash_fn_t fn, bool update, SHashLockTypeE type) {
assert(fn != NULL);
if (fn == NULL) {
uError("hash table must have a valid hash function");
return NULL;
}
if (capacity == 0) {
capacity = 4;
}
......@@ -646,15 +678,15 @@ void taosHashTableResize(SHashObj *pHashObj) {
SHashNode *pNode = NULL;
SHashNode *pNext = NULL;
int32_t newSize = (int32_t)(pHashObj->capacity << 1u);
if (newSize > HASH_MAX_CAPACITY) {
int32_t newCapacity = (int32_t)(pHashObj->capacity << 1u);
if (newCapacity > HASH_MAX_CAPACITY) {
// uDebug("current capacity:%d, maximum capacity:%d, no resize applied due to limitation is reached",
// pHashObj->capacity, HASH_MAX_CAPACITY);
return;
}
int64_t st = taosGetTimestampUs();
void *pNewEntryList = realloc(pHashObj->hashList, sizeof(void *) * newSize);
void *pNewEntryList = realloc(pHashObj->hashList, sizeof(void *) * newCapacity);
if (pNewEntryList == NULL) { // todo handle error
// uDebug("cache resize failed due to out of memory, capacity remain:%d", pHashObj->capacity);
return;
......@@ -662,7 +694,7 @@ void taosHashTableResize(SHashObj *pHashObj) {
pHashObj->hashList = pNewEntryList;
size_t inc = newSize - pHashObj->capacity;
size_t inc = newCapacity - pHashObj->capacity;
void * p = calloc(inc, sizeof(SHashEntry));
for (int32_t i = 0; i < inc; ++i) {
......@@ -671,7 +703,7 @@ void taosHashTableResize(SHashObj *pHashObj) {
taosArrayPush(pHashObj->pMemBlock, &p);
pHashObj->capacity = newSize;
pHashObj->capacity = newCapacity;
for (int32_t i = 0; i < pHashObj->capacity; ++i) {
SHashEntry *pe = pHashObj->hashList[i];
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册