未验证 提交 c4aa0be0 编写于 作者: H Haojun Liao 提交者: GitHub

Merge pull request #8760 from haoyifan/hashtable_cleanup

Hashtable cleanup
......@@ -117,7 +117,7 @@ do { \
a = a + b; \
} \
} \
} while(0)
} while(0)
#define TSKEY_MIN_SUB(a,b) \
do { \
......@@ -1332,7 +1332,7 @@ void doTimeWindowInterpolation(SOperatorInfo* pOperator, SOptrBasicInfo* pInfo,
} else {
COPY_DATA(&pCtx[k].start.val, (char *)pColInfo->pData + prevRowIndex * pColInfo->info.bytes);
}
pCtx[k].start.key = prevTs;
if (pColInfo->info.type == TSDB_DATA_TYPE_BINARY || pColInfo->info.type == TSDB_DATA_TYPE_NCHAR) {
......@@ -1349,7 +1349,7 @@ void doTimeWindowInterpolation(SOperatorInfo* pOperator, SOptrBasicInfo* pInfo,
} else {
COPY_DATA(&pCtx[k].end.val, (char *)pColInfo->pData + curRowIndex * pColInfo->info.bytes);
}
pCtx[k].end.key = curTs;
if (pColInfo->info.type == TSDB_DATA_TYPE_BINARY || pColInfo->info.type == TSDB_DATA_TYPE_NCHAR) {
......@@ -1364,9 +1364,9 @@ void doTimeWindowInterpolation(SOperatorInfo* pOperator, SOptrBasicInfo* pInfo,
} else {
GET_TYPED_DATA(v1, double, pColInfo->info.type, (char *)pColInfo->pData + prevRowIndex * pColInfo->info.bytes);
}
GET_TYPED_DATA(v2, double, pColInfo->info.type, (char *)pColInfo->pData + curRowIndex * pColInfo->info.bytes);
SPoint point1 = (SPoint){.key = prevTs, .val = &v1};
SPoint point2 = (SPoint){.key = curTs, .val = &v2};
SPoint point = (SPoint){.key = windowKey, .val = &v };
......@@ -2235,6 +2235,7 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
if (pRuntimeEnv->proot == NULL) {
goto _clean;
}
int32_t opType = pRuntimeEnv->proot->upstream[0]->operatorType;
if (opType != OP_DummyInput) {
setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream[0]->info, pRuntimeEnv->proot);
......@@ -2397,7 +2398,7 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) {
if (!pRuntimeEnv->udfIsCopy) {
destroyUdfInfo(pRuntimeEnv->pUdfInfo);
}
destroyResultBuf(pRuntimeEnv->pResultBuf);
doFreeQueryHandle(pRuntimeEnv);
......@@ -3007,7 +3008,7 @@ void filterRowsInDataBlock(SQueryRuntimeEnv* pRuntimeEnv, SSingleColumnFilterInf
if (i < (numOfRows - 1)) {
all = false;
}
break;
}
}
......@@ -3032,9 +3033,9 @@ void filterColRowsInDataBlock(SQueryRuntimeEnv* pRuntimeEnv, SSDataBlock* pBlock
bool all = true;
if (pRuntimeEnv->pTsBuf != NULL) {
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, 0);
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, 0);
p = calloc(numOfRows, sizeof(int8_t));
TSKEY* k = (TSKEY*) pColInfoData->pData;
for (int32_t i = 0; i < numOfRows; ++i) {
int32_t offset = ascQuery? i:(numOfRows - i - 1);
......@@ -3049,7 +3050,7 @@ void filterColRowsInDataBlock(SQueryRuntimeEnv* pRuntimeEnv, SSDataBlock* pBlock
p[offset] = true;
}
if (!tsBufNextPos(pRuntimeEnv->pTsBuf)) {
if (!tsBufNextPos(pRuntimeEnv->pTsBuf)) {
if (i < (numOfRows - 1)) {
all = false;
}
......@@ -3057,7 +3058,7 @@ void filterColRowsInDataBlock(SQueryRuntimeEnv* pRuntimeEnv, SSDataBlock* pBlock
break;
}
}
// save the cursor status
pRuntimeEnv->current->cur = tsBufGetCursor(pRuntimeEnv->pTsBuf);
} else {
......@@ -3076,7 +3077,7 @@ void filterColRowsInDataBlock(SQueryRuntimeEnv* pRuntimeEnv, SSDataBlock* pBlock
tfree(p);
}
static SColumnInfo* doGetTagColumnInfoById(SColumnInfo* pTagColList, int32_t numOfTags, int16_t colId);
static void doSetTagValueInParam(void* pTable, char* param, int32_t paraLen, int32_t tagColId, tVariant *tag, int16_t type, int16_t bytes);
......@@ -3122,7 +3123,7 @@ void doSetFilterColumnInfo(SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFi
FORCE_INLINE int32_t getColumnDataFromId(void *param, int32_t id, void **data) {
int32_t numOfCols = ((SColumnDataParam *)param)->numOfCols;
SArray* pDataBlock = ((SColumnDataParam *)param)->pDataBlock;
for (int32_t j = 0; j < numOfCols; ++j) {
SColumnInfoData* pColInfo = taosArrayGet(pDataBlock, j);
if (id == pColInfo->info.colId) {
......@@ -3283,7 +3284,7 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv* pRuntimeEnv, STableScanInfo* pTa
SColumnDataParam param = {.numOfCols = pBlock->info.numOfCols, .pDataBlock = pBlock->pDataBlock};
filterSetColFieldData(pQueryAttr->pFilters, &param, getColumnDataFromId);
}
if (pQueryAttr->pFilters != NULL || pRuntimeEnv->pTsBuf != NULL) {
filterColRowsInDataBlock(pRuntimeEnv, pBlock, ascQuery);
}
......@@ -3445,7 +3446,7 @@ void setTagValue(SOperatorInfo* pOperatorInfo, void *pTable, SQLFunctionCtx* pCt
} else {
if (pCtx[idx].tag.pz != NULL) {
memcpy(pRuntimeEnv->tagVal + offset, pCtx[idx].tag.pz, pCtx[idx].tag.nLen);
}
}
}
offset += pLocalExprInfo->base.resBytes;
......@@ -4283,13 +4284,13 @@ void setIntervalQueryRange(SQueryRuntimeEnv *pRuntimeEnv, STimeWindow* winx, int
}
TSKEY key = QUERY_IS_ASC_QUERY(pQueryAttr)? winx->skey:winx->ekey;
qDebug("0x%"PRIx64" update query window, tid:%d, %"PRId64" - %"PRId64", old:%"PRId64" - %"PRId64, GET_QID(pRuntimeEnv), tid, key, pTableQueryInfo->win.ekey,
pTableQueryInfo->win.skey, pTableQueryInfo->win.ekey);
pTableQueryInfo->win.skey = key;
STimeWindow win = {.skey = key, .ekey = pQueryAttr->window.ekey};
/**
* In handling the both ascending and descending order super table query, we need to find the first qualified
* timestamp of this table, and then set the first qualified start timestamp.
......@@ -4905,7 +4906,7 @@ static int32_t setupQueryHandle(void* tsdb, SQueryRuntimeEnv* pRuntimeEnv, int64
STsdbQueryCond cond = createTsdbQueryCond(pQueryAttr, &pQueryAttr->window);
if (pQueryAttr->tsCompQuery || pQueryAttr->pointInterpQuery) {
cond.type = BLOCK_LOAD_TABLE_SEQ_ORDER;
cond.type = BLOCK_LOAD_TABLE_SEQ_ORDER;
}
if (!isSTableQuery
......@@ -5046,7 +5047,7 @@ int32_t doInitQInfo(SQInfo* pQInfo, STSBuf* pTsBuf, void* tsdb, void* sourceOptr
int16_t order = (pQueryAttr->order.order == pRuntimeEnv->pTsBuf->tsOrder) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC;
tsBufResetPos(pRuntimeEnv->pTsBuf);
tsBufSetTraverseOrder(pRuntimeEnv->pTsBuf, order);
tsBufNextPos(pTsBuf);
tsBufNextPos(pTsBuf);
}
int32_t ps = DEFAULT_PAGE_SIZE;
......@@ -5467,7 +5468,7 @@ void setTableScanFilterOperatorInfo(STableScanInfo* pTableScanInfo, SOperatorInf
pTableScanInfo->rowCellInfoOffset = pIntervalInfo->rowCellInfoOffset;
} else if (pDownstream->operatorType == OP_TimeEvery) {
STimeEveryOperatorInfo *pEveryInfo = pDownstream->info;
pTableScanInfo->pCtx = pEveryInfo->binfo.pCtx;
pTableScanInfo->pResultRowInfo = &pEveryInfo->binfo.resultRowInfo;
pTableScanInfo->rowCellInfoOffset = pEveryInfo->binfo.rowCellInfoOffset;
......@@ -5525,7 +5526,7 @@ SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbQueryHandle, SQueryRuntime
if (pRuntimeEnv->pQueryAttr->pointInterpQuery) {
pRuntimeEnv->enableGroupData = true;
}
SOperatorInfo* pOptr = calloc(1, sizeof(SOperatorInfo));
if (pOptr == NULL) {
tfree(pInfo);
......@@ -5873,7 +5874,7 @@ static SSDataBlock* doSort(void* param, bool* newgroup) {
if (pInfo->pDataBlock->info.rows) {
taoscQSort(pCols, pSchema, numOfCols, pInfo->pDataBlock->info.rows, pInfo->colIndex, comp);
}
tfree(pCols);
tfree(pSchema);
return (pInfo->pDataBlock->info.rows > 0)? pInfo->pDataBlock:NULL;
......@@ -6057,7 +6058,7 @@ static SSDataBlock* doSTableAggregate(void* param, bool* newgroup) {
key = pBlock->info.window.skey;
TSKEY_MIN_SUB(key, -1);
}
setExecutionContext(pRuntimeEnv, pInfo, pOperator->numOfOutput, pRuntimeEnv->current->groupIndex, key);
doAggregateImpl(pOperator, pQueryAttr->window.skey, pInfo->pCtx, pBlock);
}
......@@ -6354,7 +6355,7 @@ static SSDataBlock* doIntervalAgg(void* param, bool* newgroup) {
if (pIntervalInfo->resultRowInfo.size > 0 && pQueryAttr->needSort) {
qsort(pIntervalInfo->resultRowInfo.pResult, pIntervalInfo->resultRowInfo.size, POINTER_BYTES, resRowCompare);
}
closeAllResultRows(&pIntervalInfo->resultRowInfo);
setQueryStatus(pRuntimeEnv, QUERY_COMPLETED);
finalizeQueryResult(pOperator, pIntervalInfo->pCtx, &pIntervalInfo->resultRowInfo, pIntervalInfo->rowCellInfoOffset);
......@@ -6383,7 +6384,7 @@ static void everyApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *p
static int64_t getEveryStartTs(bool ascQuery, STimeWindow *range, STimeWindow *blockWin, SQueryAttr *pQueryAttr) {
int64_t startTs = range->skey, ekey = 0;
assert(range->skey != INT64_MIN);
if (ascQuery) {
......@@ -6421,15 +6422,15 @@ static bool doEveryInterpolation(SOperatorInfo* pOperatorInfo, SSDataBlock* pBlo
SQLFunctionCtx* pCtx = NULL;
*needApply = false;
if (!pQueryAttr->pointInterpQuery) {
goto group_finished_exit;
}
assert(pOperatorInfo->numOfOutput > 1);
for (int32_t i = 1; i < pOperatorInfo->numOfOutput; ++i) {
assert(pEveryInfo->binfo.pCtx[i].functionId == TSDB_FUNC_INTERP
assert(pEveryInfo->binfo.pCtx[i].functionId == TSDB_FUNC_INTERP
|| pEveryInfo->binfo.pCtx[i].functionId == TSDB_FUNC_TS_DUMMY
|| pEveryInfo->binfo.pCtx[i].functionId == TSDB_FUNC_TAG
|| pEveryInfo->binfo.pCtx[i].functionId == TSDB_FUNC_TAG_DUMMY);
......@@ -6439,7 +6440,7 @@ static bool doEveryInterpolation(SOperatorInfo* pOperatorInfo, SSDataBlock* pBlo
break;
}
}
TSKEY* tsCols = NULL;
if (pBlock && pBlock->pDataBlock != NULL) {
SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, 0);
......@@ -6449,7 +6450,7 @@ static bool doEveryInterpolation(SOperatorInfo* pOperatorInfo, SSDataBlock* pBlo
if (pCtx->startTs == INT64_MIN) {
if (pQueryAttr->range.skey == INT64_MIN) {
if (NULL == tsCols) {
if (NULL == tsCols) {
goto group_finished_exit;
}
......@@ -6488,12 +6489,12 @@ static bool doEveryInterpolation(SOperatorInfo* pOperatorInfo, SSDataBlock* pBlo
} else {
pCtx->startTs = ascQuery ? pCtx->startTs + pQueryAttr->interval.interval : pCtx->startTs - pQueryAttr->interval.interval;
}
if (ascQuery && pQueryAttr->range.ekey != INT64_MIN && pCtx->startTs > pQueryAttr->range.ekey) {
if (ascQuery && pQueryAttr->range.ekey != INT64_MIN && pCtx->startTs > pQueryAttr->range.ekey) {
goto group_finished_exit;
}
if ((!ascQuery) && pQueryAttr->range.skey != INT64_MIN && pCtx->startTs < pQueryAttr->range.skey) {
if ((!ascQuery) && pQueryAttr->range.skey != INT64_MIN && pCtx->startTs < pQueryAttr->range.skey) {
goto group_finished_exit;
}
} else {
......@@ -6507,8 +6508,8 @@ static bool doEveryInterpolation(SOperatorInfo* pOperatorInfo, SSDataBlock* pBlo
if ((ascQuery && pQueryAttr->range.ekey == INT64_MIN) || ((!ascQuery) && pQueryAttr->range.skey == INT64_MIN)) {
goto group_finished_exit;
}
if (pQueryAttr->fillType == TSDB_FILL_NONE || pQueryAttr->fillType == TSDB_FILL_LINEAR
if (pQueryAttr->fillType == TSDB_FILL_NONE || pQueryAttr->fillType == TSDB_FILL_LINEAR
|| ((ascQuery && pQueryAttr->fillType == TSDB_FILL_NEXT) || ((!ascQuery) && pQueryAttr->fillType == TSDB_FILL_PREV))) {
goto group_finished_exit;
}
......@@ -6530,11 +6531,11 @@ static bool doEveryInterpolation(SOperatorInfo* pOperatorInfo, SSDataBlock* pBlo
}
*needApply = true;
for (int32_t i = 0; i < pOperatorInfo->numOfOutput; ++i) {
pEveryInfo->binfo.pCtx[i].startTs = pCtx->startTs;
}
return false;
}
......@@ -6557,14 +6558,14 @@ static bool doEveryInterpolation(SOperatorInfo* pOperatorInfo, SSDataBlock* pBlo
} else {
if (tsCols[startPos] == pCtx->startTs) {
doTimeWindowInterpolation(pOperatorInfo, pOperatorInfo->info, pBlock->pDataBlock, pCtx->startTs, startPos, INT64_MIN, 0, 0, RESULT_ROW_START_INTERP);
} else {
} else {
doTimeWindowInterpolation(pOperatorInfo, pOperatorInfo->info, pBlock->pDataBlock, tsCols[startPos - 1], startPos - 1, INT64_MIN, 0, 0, RESULT_ROW_START_INTERP);
}
}
if (pQueryAttr->fillType != TSDB_FILL_LINEAR) {
*needApply = true;
}
}
}
if ((!ascQuery) && (pQueryAttr->fillType == TSDB_FILL_LINEAR || pQueryAttr->fillType == TSDB_FILL_NEXT) && pCtx->end.key == INT64_MIN) {
......@@ -6574,7 +6575,7 @@ static bool doEveryInterpolation(SOperatorInfo* pOperatorInfo, SSDataBlock* pBlo
} else if (startPos == (pBlock->info.rows - 1)) {
if (tsCols[startPos] == pCtx->startTs) {
doTimeWindowInterpolation(pOperatorInfo, pOperatorInfo->info, pBlock->pDataBlock, INT64_MIN, 0, pCtx->startTs, startPos, 0, RESULT_ROW_END_INTERP);
} else {
} else {
TSKEY lastTs = *(TSKEY *) pRuntimeEnv->prevRow[0];
if (lastTs != INT64_MIN) {
doTimeWindowInterpolation(pOperatorInfo, pOperatorInfo->info, pBlock->pDataBlock, INT64_MIN, 0, lastTs, -1, 0, RESULT_ROW_END_INTERP);
......@@ -6587,17 +6588,17 @@ static bool doEveryInterpolation(SOperatorInfo* pOperatorInfo, SSDataBlock* pBlo
doTimeWindowInterpolation(pOperatorInfo, pOperatorInfo->info, pBlock->pDataBlock, INT64_MIN, 0, tsCols[startPos + 1], startPos + 1, 0, RESULT_ROW_END_INTERP);
}
}
if (pQueryAttr->fillType != TSDB_FILL_LINEAR) {
*needApply = true;
}
}
}
if (ascQuery && (pQueryAttr->fillType == TSDB_FILL_LINEAR || pQueryAttr->fillType == TSDB_FILL_NEXT) && pCtx->end.key == INT64_MIN) {
if (startPos < 0) {
return true;
}
doTimeWindowInterpolation(pOperatorInfo, pOperatorInfo->info, pBlock->pDataBlock, INT64_MIN, 0, tsCols[startPos], startPos, 0, RESULT_ROW_END_INTERP);
*needApply = true;
......@@ -6607,7 +6608,7 @@ static bool doEveryInterpolation(SOperatorInfo* pOperatorInfo, SSDataBlock* pBlo
if (startPos < 0) {
return true;
}
doTimeWindowInterpolation(pOperatorInfo, pOperatorInfo->info, pBlock->pDataBlock, tsCols[startPos], startPos, INT64_MIN, 0, 0, RESULT_ROW_START_INTERP);
*needApply = true;
......@@ -6628,7 +6629,7 @@ group_finished_exit:
if (pQueryAttr->needReverseScan) {
pQueryAttr->range.skey = INT64_MIN;
}
pEveryInfo->groupDone = true;
if (pCtx) {
......@@ -6636,7 +6637,7 @@ group_finished_exit:
pCtx->start.key = INT64_MIN;
pCtx->end.key = INT64_MIN;
}
return true;
}
......@@ -6671,7 +6672,7 @@ static void doTimeEveryImpl(SOperatorInfo* pOperator, SQLFunctionCtx *pCtx, SSDa
setQueryStatus(pRuntimeEnv, QUERY_COMPLETED);
return;
}
tsCols = (int64_t*) pColDataInfo->pData;
assert(tsCols[0] == pBlock->info.window.skey &&
tsCols[pBlock->info.rows - 1] == pBlock->info.window.ekey);
......@@ -6685,7 +6686,7 @@ static void doTimeEveryImpl(SOperatorInfo* pOperator, SQLFunctionCtx *pCtx, SSDa
if (needApply) {
everyApplyFunctions(pRuntimeEnv, pEveryInfo->binfo.pCtx, numOfOutput);
pRes->info.rows = getNumOfResult(pRuntimeEnv, pEveryInfo->binfo.pCtx, pOperator->numOfOutput);
if (pRes->info.rows >= pRuntimeEnv->resultInfo.threshold) {
pEveryInfo->lastBlock = pBlock;
......@@ -6719,8 +6720,8 @@ static SSDataBlock* doTimeEvery(void* param, bool* newgroup) {
clearNumOfRes(pInfo->pCtx, pOperator->numOfOutput);
return pInfo->pRes;
}
if (pRes->info.rows > 0) {
if (pRes->info.rows > 0) {
copyTsColoum(pRes, pInfo->pCtx, pOperator->numOfOutput);
clearNumOfRes(pInfo->pCtx, pOperator->numOfOutput);
return pInfo->pRes;
......@@ -6779,9 +6780,9 @@ static SSDataBlock* doTimeEvery(void* param, bool* newgroup) {
if (pRes->info.rows >= pRuntimeEnv->resultInfo.threshold) {
break;
}
assert(pEveryInfo->groupDone);
if (pRes->info.rows > 0) {
break;
}
......@@ -7587,12 +7588,12 @@ SColumnInfo* extractColumnFilterInfo(SExprInfo* pExpr, int32_t numOfOutput, int3
pCols[i].colId = pExpr[i].base.resColId;
pCols[i].flist.numOfFilters = pExpr[i].base.flist.numOfFilters;
if (pCols[i].flist.numOfFilters != 0) {
if (pCols[i].flist.numOfFilters != 0) {
pCols[i].flist.filterInfo = calloc(pCols[i].flist.numOfFilters, sizeof(SColumnFilterInfo));
memcpy(pCols[i].flist.filterInfo, pExpr[i].base.flist.filterInfo, pCols[i].flist.numOfFilters * sizeof(SColumnFilterInfo));
} else {
// avoid runtime error
pCols[i].flist.filterInfo = NULL;
pCols[i].flist.filterInfo = NULL;
}
}
......@@ -7726,7 +7727,7 @@ SOperatorInfo* createTimeEveryOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOpera
if (pQueryAttr->needReverseScan) {
pInfo->rangeStart = taosHashInit(256, taosGetDefaultHashFunction(TSDB_DATA_TYPE_TIMESTAMP), false, false);
}
initResultRowInfo(&pBInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT);
if (pBInfo->pRes == NULL || pBInfo->pCtx == NULL || pBInfo->resultRowInfo.pResult == NULL ||
......@@ -8262,11 +8263,11 @@ _clean:
static bool initMultiDistinctInfo(SDistinctOperatorInfo *pInfo, SOperatorInfo* pOperator, SSDataBlock *pBlock) {
if (taosArrayGetSize(pInfo->pDistinctDataInfo) == pOperator->numOfOutput) {
// distinct info already inited
// distinct info already inited
return true;
}
for (int i = 0; i < pOperator->numOfOutput; i++) {
pInfo->totalBytes += pOperator->pExpr[i].base.colBytes;
pInfo->totalBytes += pOperator->pExpr[i].base.colBytes;
}
for (int i = 0; i < pOperator->numOfOutput; i++) {
int numOfBlock = (int)(taosArrayGetSize(pBlock->pDataBlock));
......@@ -8286,14 +8287,14 @@ static bool initMultiDistinctInfo(SDistinctOperatorInfo *pInfo, SOperatorInfo* p
static void buildMultiDistinctKey(SDistinctOperatorInfo *pInfo, SSDataBlock *pBlock, int32_t rowId) {
char *p = pInfo->buf;
memset(p, 0, pInfo->totalBytes);
memset(p, 0, pInfo->totalBytes);
for (int i = 0; i < taosArrayGetSize(pInfo->pDistinctDataInfo); i++) {
SDistinctDataInfo* pDistDataInfo = (SDistinctDataInfo *)taosArrayGet(pInfo->pDistinctDataInfo, i);
SDistinctDataInfo* pDistDataInfo = (SDistinctDataInfo *)taosArrayGet(pInfo->pDistinctDataInfo, i);
SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, pDistDataInfo->index);
char *val = ((char *)pColDataInfo->pData) + pColDataInfo->info.bytes * rowId;
if (isNull(val, pDistDataInfo->type)) {
p += pDistDataInfo->bytes;
if (isNull(val, pDistDataInfo->type)) {
p += pDistDataInfo->bytes;
continue;
}
if (IS_VAR_DATA_TYPE(pDistDataInfo->type)) {
......@@ -8319,7 +8320,7 @@ static SSDataBlock* hashDistinct(void* param, bool* newgroup) {
pRes->info.rows = 0;
SSDataBlock* pBlock = NULL;
while(1) {
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC);
pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup);
......@@ -8333,7 +8334,7 @@ static SSDataBlock* hashDistinct(void* param, bool* newgroup) {
doSetOperatorCompleted(pOperator);
break;
}
// ensure result output buf
// ensure result output buf
if (pRes->info.rows + pBlock->info.rows > pInfo->outputCapacity) {
int32_t newSize = pRes->info.rows + pBlock->info.rows;
for (int i = 0; i < taosArrayGetSize(pRes->pDataBlock); i++) {
......@@ -8357,14 +8358,14 @@ static SSDataBlock* hashDistinct(void* param, bool* newgroup) {
for (int j = 0; j < taosArrayGetSize(pRes->pDataBlock); j++) {
SDistinctDataInfo* pDistDataInfo = taosArrayGet(pInfo->pDistinctDataInfo, j); // distinct meta info
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pDistDataInfo->index); //src
SColumnInfoData* pResultColInfoData = taosArrayGet(pRes->pDataBlock, j); // dist
SColumnInfoData* pResultColInfoData = taosArrayGet(pRes->pDataBlock, j); // dist
char* val = ((char*)pColInfoData->pData) + pDistDataInfo->bytes * i;
char *start = pResultColInfoData->pData + pDistDataInfo->bytes * pInfo->pRes->info.rows;
char *start = pResultColInfoData->pData + pDistDataInfo->bytes * pInfo->pRes->info.rows;
memcpy(start, val, pDistDataInfo->bytes);
}
pRes->info.rows += 1;
}
}
}
if (pRes->info.rows >= pInfo->threshold) {
......@@ -8384,10 +8385,10 @@ SOperatorInfo* createDistinctOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperat
pInfo->buf = NULL;
pInfo->threshold = tsMaxNumOfDistinctResults; // distinct result threshold
pInfo->outputCapacity = 4096;
pInfo->pDistinctDataInfo = taosArrayInit(numOfOutput, sizeof(SDistinctDataInfo));
pInfo->pDistinctDataInfo = taosArrayInit(numOfOutput, sizeof(SDistinctDataInfo));
pInfo->pSet = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
pInfo->pRes = createOutputBuf(pExpr, numOfOutput, (int32_t) pInfo->outputCapacity);
if (pInfo->pDistinctDataInfo == NULL || pInfo->pSet == NULL || pInfo->pRes == NULL) {
goto _clean;
}
......@@ -8406,7 +8407,7 @@ SOperatorInfo* createDistinctOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperat
pOperator->info = pInfo;
pOperator->pRuntimeEnv = pRuntimeEnv;
pOperator->exec = hashDistinct;
pOperator->pExpr = pExpr;
pOperator->pExpr = pExpr;
pOperator->cleanup = destroyDistinctOperatorInfo;
appendUpstream(pOperator, upstream);
......@@ -8587,7 +8588,7 @@ int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SQueryParam* param) {
pQueryMsg->window.ekey = htobe64(pQueryMsg->window.ekey);
pQueryMsg->range.skey = htobe64(pQueryMsg->range.skey);
pQueryMsg->range.ekey = htobe64(pQueryMsg->range.ekey);
pQueryMsg->interval.interval = htobe64(pQueryMsg->interval.interval);
pQueryMsg->interval.sliding = htobe64(pQueryMsg->interval.sliding);
pQueryMsg->interval.offset = htobe64(pQueryMsg->interval.offset);
......@@ -8658,7 +8659,7 @@ int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SQueryParam* param) {
if (code != TSDB_CODE_SUCCESS) {
goto _cleanup;
}
*/
*/
}
if (pQueryMsg->colCondLen > 0) {
......@@ -9116,14 +9117,14 @@ int32_t initUdfInfo(SUdfInfo* pUdfInfo) {
if (pUdfInfo->path) {
unlink(pUdfInfo->path);
}
tfree(pUdfInfo->path);
pUdfInfo->path = strdup(path);
if (pUdfInfo->handle) {
taosCloseDll(pUdfInfo->handle);
}
pUdfInfo->handle = taosLoadDll(path);
if (NULL == pUdfInfo->handle) {
......@@ -9281,7 +9282,7 @@ int32_t createQueryFunc(SQueriedTableInfo* pTableInfo, int32_t numOfOutput, SExp
int32_t createQueryFilter(char *data, int32_t len, void** pFilters) {
tExprNode* expr = NULL;
TRY(TSDB_MAX_TAG_CONDITIONS) {
expr = exprTreeFromBinary(data, len);
} CATCH( code ) {
......@@ -10041,6 +10042,9 @@ static void doSetTagValueToResultBuf(char* output, const char* val, int16_t type
static int64_t getQuerySupportBufSize(size_t numOfTables) {
size_t s1 = sizeof(STableQueryInfo);
// TODO: struct SHashNode is an internal implementation of
// hash table. The implementation should not leak here.
size_t s2 = sizeof(SHashNode);
// size_t s3 = sizeof(STableCheckInfo); buffer consumption in tsdb
......
......@@ -24,24 +24,18 @@ extern "C" {
#include "hashfunc.h"
#include "tlockfree.h"
#define HASH_MAX_CAPACITY (1024 * 1024 * 16)
#define HASH_DEFAULT_LOAD_FACTOR (0.75)
#define HASH_INDEX(v, c) ((v) & ((c)-1))
typedef void (*_hash_free_fn_t)(void *param);
// TODO: SHashNode is an internal implementation and should not
// be in the public header file.
typedef struct SHashNode {
struct SHashNode *next;
uint32_t hashVal; // the hash value of key
uint32_t dataLen; // length of data
uint32_t keyLen; // length of the key
int8_t removed; // flag to indicate removed
int32_t count; // reference count
int32_t refCount; // reference count
char data[];
} SHashNode;
#define GET_HASH_NODE_KEY(_n) ((char*)(_n) + sizeof(SHashNode) + (_n)->dataLen)
#define GET_HASH_NODE_DATA(_n) ((char*)(_n) + sizeof(SHashNode))
#define GET_HASH_PNODE(_n) ((SHashNode *)((char*)(_n) - sizeof(SHashNode)))
typedef enum SHashLockTypeE {
......@@ -49,41 +43,24 @@ typedef enum SHashLockTypeE {
HASH_ENTRY_LOCK = 1,
} SHashLockTypeE;
typedef struct SHashEntry {
int32_t num; // number of elements in current entry
SRWLatch latch; // entry latch
SHashNode *next;
} SHashEntry;
typedef struct SHashObj {
SHashEntry **hashList;
size_t capacity; // number of slots
size_t size; // number of elements in hash table
_hash_fn_t hashFp; // hash function
_hash_free_fn_t freeFp; // hash node free callback function
_equal_fn_t equalFp; // equal function
SRWLatch lock; // read-write spin lock
SHashLockTypeE type; // lock type
bool enableUpdate; // enable update
SArray *pMemBlock; // memory block allocated for SHashEntry
} SHashObj;
typedef struct SHashObj SHashObj;
/**
* init the hash table
* initialize a hash table
*
* @param capacity initial capacity of the hash table
* @param fn hash function to generate the hash value
* @param threadsafe thread safe or not
* @return
* @param capacity initial capacity of the hash table
* @param fn hash function
* @param update whether the hash table allows in place update
* @param type whether the hash table has per entry lock
* @return hash table object
*/
SHashObj *taosHashInit(size_t capacity, _hash_fn_t fn, bool update, SHashLockTypeE type);
/**
* set equal func of the hash table
* @param pHashObj
* @param equalFp
* set equal func of the hash table
*
* @param pHashObj
* @param equalFp
* @return
*/
void taosHashSetEqualFp(SHashObj *pHashObj, _equal_fn_t fp);
......@@ -92,6 +69,7 @@ void taosHashSetFreeFp(SHashObj *pHashObj, _hash_free_fn_t fp);
/**
* return the size of hash table
*
* @param pHashObj
* @return
*/
......@@ -99,73 +77,114 @@ int32_t taosHashGetSize(const SHashObj *pHashObj);
/**
* put element into hash table, if the element with the same key exists, update it
* @param pHashObj
* @param key
* @param keyLen
* @param data
* @param size
* @return
*
* @param pHashObj hash table object
* @param key key
* @param keyLen length of key
* @param data data
* @param size size of data
* @return 0 if success, -1 otherwise
*/
int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, void *data, size_t size);
/**
* return the payload data with the specified key
*
* @param pHashObj
* @param key
* @param keyLen
* @return
* @param pHashObj hash table object
* @param key key
* @param keyLen length of key
* @return pointer to data
*/
void *taosHashGet(SHashObj *pHashObj, const void *key, size_t keyLen);
/**
* apply the udf before return the result
* @param pHashObj
* @param key
* @param keyLen
* @param fp
* @param d
* @return
* Get the data associated with "key". Note that caller needs to make sure
* "d" has enough capacity to accomodate the data.
*
* @param pHashObj hash table object
* @param key key
* @param keyLen length of key
* @param fp function to be called on hash node when the data is found
* @param d buffer
* @return pointer to data
*/
void* taosHashGetClone(SHashObj *pHashObj, const void *key, size_t keyLen, void (*fp)(void *), void* d);
/**
* @param pHashObj
* @param key
* @param keyLen
* @param fp
* @param d
* @param sz
* @return
* Get the data associated with "key". Note that caller needs to take ownership
* of the data "d" and make sure it is deallocated.
*
* @param pHashObj hash table object
* @param key key
* @param keyLen length of key
* @param fp function to be called on hash node when the data is found
* @param d buffer
* @param sz size of the data buffer
* @return pointer to data
*/
void* taosHashGetCloneExt(SHashObj *pHashObj, const void *key, size_t keyLen, void (*fp)(void *), void** d, size_t *sz);
/**
* remove item with the specified key
* @param pHashObj
* @param key
* @param keyLen
*
* @param pHashObj hash table object
* @param key key
* @param keyLen length of key
* @return 0 if success, -1 otherwise
*/
int32_t taosHashRemove(SHashObj *pHashObj, const void *key, size_t keyLen);
/**
* remove item with the specified key
*
* @param pHashObj hash table object
* @param key key
* @param keyLen length of key
* @param data buffer for data
* @param dsize size of data buffer
* @return 0 if success, -1 otherwise
*/
int32_t taosHashRemoveWithData(SHashObj *pHashObj, const void *key, size_t keyLen, void* data, size_t dsize);
int32_t taosHashCondTraverse(SHashObj *pHashObj, bool (*fp)(void *, void *), void *param);
/**
* traverse through all objects in the hash table and apply "fp" on each node.
* If "fp" returns false when applied on top of a node, the node will also be
* removed from table.
*
* @param pHashObj hash table object
* @param fp function pointer applied on each node
* @param param parameter fed into "fp"
*/
void taosHashCondTraverse(SHashObj *pHashObj, bool (*fp)(void *, void *), void *param);
/**
* clear the contents of the hash table
*
* @param pHashObj hash table object
*/
void taosHashClear(SHashObj *pHashObj);
/**
* clean up hash table
* @param handle
*
* @param pHashObj hash table object
*/
void taosHashCleanup(SHashObj *pHashObj);
/**
* return the number of collisions in the hash table
*
* @param pHashObj
* @return
* @param pHashObj hash table object
* @return maximum number of collisions
*/
int32_t taosHashGetMaxOverflowLinkLength(const SHashObj *pHashObj);
int32_t taosHashGetMaxOverflowLinkLength(SHashObj *pHashObj);
/**
* return the consumed memory of the hash table
*
* @param pHashObj hash table object
* @return consumed memory of the hash table
*/
size_t taosHashGetMemSize(const SHashObj *pHashObj);
void *taosHashIterate(SHashObj *pHashObj, void *p);
......
......@@ -22,6 +22,8 @@ typedef uint32_t (*_hash_fn_t)(const char *, uint32_t);
typedef int32_t (*_equal_fn_t)(const void *a, const void *b, size_t sz);
typedef void (*_hash_free_fn_t)(void *param);
/**
* murmur hash algorithm
* @key usually string
......
......@@ -18,53 +18,117 @@
#include "tulog.h"
#include "taosdef.h"
#define EXT_SIZE 1024
/*
* Macro definition
*/
#define HASH_NEED_RESIZE(_h) ((_h)->size >= (_h)->capacity * HASH_DEFAULT_LOAD_FACTOR)
#define HASH_MAX_CAPACITY (1024 * 1024 * 16)
#define HASH_DEFAULT_LOAD_FACTOR (0.75)
#define HASH_INDEX(v, c) ((v) & ((c)-1))
#define DO_FREE_HASH_NODE(_n) \
do { \
tfree(_n); \
} while (0)
#define HASH_NEED_RESIZE(_h) ((_h)->size >= (_h)->capacity * HASH_DEFAULT_LOAD_FACTOR)
#define FREE_HASH_NODE(_h, _n) \
#define FREE_HASH_NODE(_n) \
do { \
if ((_h)->freeFp) { \
(_h)->freeFp(GET_HASH_NODE_DATA(_n)); \
} \
\
DO_FREE_HASH_NODE(_n); \
tfree(_n); \
} while (0);
static FORCE_INLINE void __wr_lock(void *lock, int32_t type) {
if (type == HASH_NO_LOCK) {
#define GET_HASH_NODE_KEY(_n) ((char*)(_n) + sizeof(SHashNode) + (_n)->dataLen)
#define GET_HASH_NODE_DATA(_n) ((char*)(_n) + sizeof(SHashNode))
#define GET_HASH_PNODE(_n) ((SHashNode *)((char*)(_n) - sizeof(SHashNode)))
/*
* typedef
*/
typedef struct SHashEntry {
int32_t num; // number of elements in current entry
SRWLatch latch; // entry latch
SHashNode *next;
} SHashEntry;
typedef struct SHashObj {
SHashEntry **hashList;
size_t capacity; // number of slots
size_t size; // number of elements in hash table
_hash_fn_t hashFp; // hash function
_equal_fn_t equalFp; // equal function
_hash_free_fn_t freeFp; // hash node free callback function
SRWLatch lock; // read-write spin lock
SHashLockTypeE type; // lock type
bool enableUpdate; // enable update
SArray *pMemBlock; // memory block allocated for SHashEntry
} SHashObj;
/*
* Function definition
*/
static FORCE_INLINE void taosHashWLock(SHashObj *pHashObj) {
if (pHashObj->type == HASH_NO_LOCK) {
return;
}
taosWLockLatch(&pHashObj->lock);
}
static FORCE_INLINE void taosHashWUnlock(SHashObj *pHashObj) {
if (pHashObj->type == HASH_NO_LOCK) {
return;
}
taosWUnLockLatch(&pHashObj->lock);
}
static FORCE_INLINE void taosHashRLock(SHashObj *pHashObj) {
if (pHashObj->type == HASH_NO_LOCK) {
return;
}
taosRLockLatch(&pHashObj->lock);
}
static FORCE_INLINE void taosHashRUnlock(SHashObj *pHashObj) {
if (pHashObj->type == HASH_NO_LOCK) {
return;
}
taosRUnLockLatch(&pHashObj->lock);
}
static FORCE_INLINE void
taosHashEntryWLock(const SHashObj *pHashObj, SHashEntry* pe) {
if (pHashObj->type == HASH_NO_LOCK) {
return;
}
taosWLockLatch(lock);
taosWLockLatch(&pe->latch);
}
static FORCE_INLINE void __rd_lock(void *lock, int32_t type) {
if (type == HASH_NO_LOCK) {
static FORCE_INLINE void
taosHashEntryWUnlock(const SHashObj *pHashObj, SHashEntry* pe) {
if (pHashObj->type == HASH_NO_LOCK) {
return;
}
taosRLockLatch(lock);
taosWUnLockLatch(&pe->latch);
}
static FORCE_INLINE void __rd_unlock(void *lock, int32_t type) {
if (type == HASH_NO_LOCK) {
static FORCE_INLINE void
taosHashEntryRLock(const SHashObj *pHashObj, SHashEntry* pe) {
if (pHashObj->type == HASH_NO_LOCK) {
return;
}
taosRUnLockLatch(lock);
taosRLockLatch(&pe->latch);
}
static FORCE_INLINE void __wr_unlock(void *lock, int32_t type) {
if (type == HASH_NO_LOCK) {
static FORCE_INLINE void
taosHashEntryRUnlock(const SHashObj *pHashObj, SHashEntry* pe) {
if (pHashObj->type == HASH_NO_LOCK) {
return;
}
taosWUnLockLatch(lock);
taosRUnLockLatch(&pe->latch);
}
static FORCE_INLINE int32_t taosHashCapacity(int32_t length) {
......@@ -75,10 +139,13 @@ static FORCE_INLINE int32_t taosHashCapacity(int32_t length) {
return i;
}
static FORCE_INLINE SHashNode *doSearchInEntryList(SHashObj *pHashObj, SHashEntry *pe, const void *key, size_t keyLen, uint32_t hashVal) {
static FORCE_INLINE SHashNode *
doSearchInEntryList(SHashObj *pHashObj, SHashEntry *pe, const void *key, size_t keyLen, uint32_t hashVal) {
SHashNode *pNode = pe->next;
while (pNode) {
if ((pNode->keyLen == keyLen) && ((*(pHashObj->equalFp))(GET_HASH_NODE_KEY(pNode), key, keyLen) == 0) && pNode->removed == 0) {
if ((pNode->keyLen == keyLen) &&
((*(pHashObj->equalFp))(GET_HASH_NODE_KEY(pNode), key, keyLen) == 0) &&
pNode->removed == 0) {
assert(pNode->hashVal == hashVal);
break;
}
......@@ -90,59 +157,57 @@ static FORCE_INLINE SHashNode *doSearchInEntryList(SHashObj *pHashObj, SHashEntr
}
/**
* Resize the hash list if the threshold is reached
* resize the hash list if the threshold is reached
*
* @param pHashObj
*/
static void taosHashTableResize(SHashObj *pHashObj);
/**
* allocate and initialize a hash node
*
* @param key key of object for hash, usually a null-terminated string
* @param keyLen length of key
* @param pData actually data. Requires a consecutive memory block, no pointer is allowed in pData.
* Pointer copy causes memory access error.
* @param pData data to be stored in hash node
* @param dsize size of data
* @return SHashNode
*/
static SHashNode *doCreateHashNode(const void *key, size_t keyLen, const void *pData, size_t dsize, uint32_t hashVal);
/**
* Update the hash node
* update the hash node
*
* @param pNode hash node
* @param key key for generate hash value
* @param keyLen key length
* @param pData actual data
* @param dsize size of actual data
* @return hash node
* @param pHashObj hash table object
* @param pe hash table entry to operate on
* @param prev previous node
* @param pNode the old node with requested key
* @param pNewNode the new node with requested key
*/
static FORCE_INLINE SHashNode *doUpdateHashNode(SHashObj *pHashObj, SHashEntry* pe, SHashNode* prev, SHashNode *pNode, SHashNode *pNewNode) {
static FORCE_INLINE void doUpdateHashNode(SHashObj *pHashObj, SHashEntry* pe, SHashNode* prev, SHashNode *pNode, SHashNode *pNewNode) {
assert(pNode->keyLen == pNewNode->keyLen);
atomic_sub_fetch_32(&pNode->count, 1);
atomic_sub_fetch_32(&pNode->refCount, 1);
if (prev != NULL) {
prev->next = pNewNode;
} else {
pe->next = pNewNode;
}
if (pNode->count <= 0) {
if (pNode->refCount <= 0) {
pNewNode->next = pNode->next;
DO_FREE_HASH_NODE(pNode);
FREE_HASH_NODE(pNode);
} else {
pNewNode->next = pNode;
pNewNode->next = pNode;
pe->num++;
atomic_add_fetch_64(&pHashObj->size, 1);
}
return pNewNode;
}
/**
* insert the hash node at the front of the linked list
*
* @param pHashObj
* @param pNode
* @param pHashObj hash table object
* @param pNode the old node with requested key
*/
static void pushfrontNodeInEntryList(SHashEntry *pEntry, SHashNode *pNode);
......@@ -155,13 +220,21 @@ static void pushfrontNodeInEntryList(SHashEntry *pEntry, SHashNode *pNode);
static FORCE_INLINE bool taosHashTableEmpty(const SHashObj *pHashObj);
/**
* Get the next element in hash table for iterator
* @param pIter
* @return
* initialize a hash table
*
* @param capacity initial capacity of the hash table
* @param fn hash function
* @param update whether the hash table allows in place update
* @param type whether the hash table has per entry lock
* @return hash table object
*/
SHashObj *taosHashInit(size_t capacity, _hash_fn_t fn, bool update, SHashLockTypeE type) {
assert(fn != NULL);
if (fn == NULL) {
uError("hash table must have a valid hash function");
assert(0);
return NULL;
}
if (capacity == 0) {
capacity = 4;
}
......@@ -174,28 +247,43 @@ SHashObj *taosHashInit(size_t capacity, _hash_fn_t fn, bool update, SHashLockTyp
// the max slots is not defined by user
pHashObj->capacity = taosHashCapacity((int32_t)capacity);
assert((pHashObj->capacity & (pHashObj->capacity - 1)) == 0);
pHashObj->equalFp = memcmp;
pHashObj->hashFp = fn;
pHashObj->type = type;
pHashObj->enableUpdate = update;
assert((pHashObj->capacity & (pHashObj->capacity - 1)) == 0);
pHashObj->hashList = (SHashEntry **)calloc(pHashObj->capacity, sizeof(void *));
if (pHashObj->hashList == NULL) {
free(pHashObj);
uError("failed to allocate memory, reason:%s", strerror(errno));
return NULL;
} else {
pHashObj->pMemBlock = taosArrayInit(8, sizeof(void *));
}
void *p = calloc(pHashObj->capacity, sizeof(SHashEntry));
for (int32_t i = 0; i < pHashObj->capacity; ++i) {
pHashObj->hashList[i] = (void *)((char *)p + i * sizeof(SHashEntry));
}
pHashObj->pMemBlock = taosArrayInit(8, sizeof(void *));
if (pHashObj->pMemBlock == NULL) {
free(pHashObj->hashList);
free(pHashObj);
uError("failed to allocate memory, reason:%s", strerror(errno));
return NULL;
}
taosArrayPush(pHashObj->pMemBlock, &p);
void *p = calloc(pHashObj->capacity, sizeof(SHashEntry));
if (p == NULL) {
taosArrayDestroy(&pHashObj->pMemBlock);
free(pHashObj->hashList);
free(pHashObj);
uError("failed to allocate memory, reason:%s", strerror(errno));
return NULL;
}
for (int32_t i = 0; i < pHashObj->capacity; ++i) {
pHashObj->hashList[i] = (void *)((char *)p + i * sizeof(SHashEntry));
}
taosArrayPush(pHashObj->pMemBlock, &p);
return pHashObj;
}
......@@ -212,7 +300,7 @@ void taosHashSetFreeFp(SHashObj *pHashObj, _hash_free_fn_t fp) {
}
int32_t taosHashGetSize(const SHashObj *pHashObj) {
if (!pHashObj) {
if (pHashObj == NULL) {
return 0;
}
return (int32_t)atomic_load_64(&pHashObj->size);
......@@ -223,6 +311,10 @@ static FORCE_INLINE bool taosHashTableEmpty(const SHashObj *pHashObj) {
}
int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, void *data, size_t size) {
if (pHashObj == NULL || key == NULL || keyLen == 0 || data == NULL || size == 0) {
return -1;
}
uint32_t hashVal = (*pHashObj->hashFp)(key, (uint32_t)keyLen);
SHashNode *pNewNode = doCreateHashNode(key, keyLen, data, size, hashVal);
if (pNewNode == NULL) {
......@@ -231,19 +323,17 @@ int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, void *da
// need the resize process, write lock applied
if (HASH_NEED_RESIZE(pHashObj)) {
__wr_lock(&pHashObj->lock, pHashObj->type);
taosHashWLock(pHashObj);
taosHashTableResize(pHashObj);
__wr_unlock(&pHashObj->lock, pHashObj->type);
taosHashWUnlock(pHashObj);
}
__rd_lock(&pHashObj->lock, pHashObj->type);
taosHashRLock(pHashObj);
int32_t slot = HASH_INDEX(hashVal, pHashObj->capacity);
SHashEntry *pe = pHashObj->hashList[slot];
if (pHashObj->type == HASH_ENTRY_LOCK) {
taosWLockLatch(&pe->latch);
}
taosHashEntryWLock(pHashObj, pe);
SHashNode *pNode = pe->next;
if (pe->num > 0) {
......@@ -254,7 +344,9 @@ int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, void *da
SHashNode* prev = NULL;
while (pNode) {
if ((pNode->keyLen == keyLen) && ((*(pHashObj->equalFp))(GET_HASH_NODE_KEY(pNode), key, keyLen) == 0) && pNode->removed == 0) {
if ((pNode->keyLen == keyLen) &&
(*(pHashObj->equalFp))(GET_HASH_NODE_KEY(pNode), key, keyLen) == 0 &&
pNode->removed == 0) {
assert(pNode->hashVal == hashVal);
break;
}
......@@ -267,18 +359,12 @@ int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, void *da
// no data in hash table with the specified key, add it into hash table
pushfrontNodeInEntryList(pe, pNewNode);
if (pe->num == 0) {
assert(pe->next == NULL);
} else {
assert(pe->next != NULL);
}
assert(pe->next != NULL);
if (pHashObj->type == HASH_ENTRY_LOCK) {
taosWUnLockLatch(&pe->latch);
}
taosHashEntryWUnlock(pHashObj, pe);
// enable resize
__rd_unlock(&pHashObj->lock, pHashObj->type);
taosHashRUnlock(pHashObj);
atomic_add_fetch_64(&pHashObj->size, 1);
return 0;
......@@ -287,15 +373,13 @@ int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, void *da
if (pHashObj->enableUpdate) {
doUpdateHashNode(pHashObj, pe, prev, pNode, pNewNode);
} else {
DO_FREE_HASH_NODE(pNewNode);
FREE_HASH_NODE(pNewNode);
}
if (pHashObj->type == HASH_ENTRY_LOCK) {
taosWUnLockLatch(&pe->latch);
}
taosHashEntryWUnlock(pHashObj, pe);
// enable resize
__rd_unlock(&pHashObj->lock, pHashObj->type);
taosHashRUnlock(pHashObj);
return pHashObj->enableUpdate ? 0 : -1;
}
......@@ -306,30 +390,27 @@ void *taosHashGet(SHashObj *pHashObj, const void *key, size_t keyLen) {
}
//TODO(yihaoDeng), merge with taosHashGetClone
void* taosHashGetCloneExt(SHashObj *pHashObj, const void *key, size_t keyLen, void (*fp)(void *), void** d, size_t *sz) {
if (taosHashTableEmpty(pHashObj) || keyLen == 0 || key == NULL) {
if (pHashObj == NULL || taosHashTableEmpty(pHashObj) || keyLen == 0 || key == NULL) {
return NULL;
}
uint32_t hashVal = (*pHashObj->hashFp)(key, (uint32_t)keyLen);
// only add the read lock to disable the resize process
__rd_lock(&pHashObj->lock, pHashObj->type);
taosHashRLock(pHashObj);
int32_t slot = HASH_INDEX(hashVal, pHashObj->capacity);
SHashEntry *pe = pHashObj->hashList[slot];
// no data, return directly
if (atomic_load_32(&pe->num) == 0) {
__rd_unlock(&pHashObj->lock, pHashObj->type);
taosHashRUnlock(pHashObj);
return NULL;
}
char *data = NULL;
// lock entry
if (pHashObj->type == HASH_ENTRY_LOCK) {
taosRLockLatch(&pe->latch);
}
taosHashEntryRLock(pHashObj, pe);
if (pe->num > 0) {
assert(pe->next != NULL);
......@@ -342,56 +423,47 @@ void* taosHashGetCloneExt(SHashObj *pHashObj, const void *key, size_t keyLen, vo
if (fp != NULL) {
fp(GET_HASH_NODE_DATA(pNode));
}
if (*d == NULL) {
*sz = pNode->dataLen + EXT_SIZE;
*d = calloc(1, *sz);
*sz = pNode->dataLen;
*d = calloc(1, *sz);
} else if (*sz < pNode->dataLen){
*sz = pNode->dataLen + EXT_SIZE;
*d = realloc(*d, *sz);
*sz = pNode->dataLen;
*d = realloc(*d, *sz);
}
memcpy((char *)(*d), GET_HASH_NODE_DATA(pNode), pNode->dataLen);
// just make runtime happy
if ((*sz) - pNode->dataLen > 0) {
memset((char *)(*d) + pNode->dataLen, 0, (*sz) - pNode->dataLen);
}
data = GET_HASH_NODE_DATA(pNode);
}
if (pHashObj->type == HASH_ENTRY_LOCK) {
taosRUnLockLatch(&pe->latch);
}
taosHashEntryRUnlock(pHashObj, pe);
taosHashRUnlock(pHashObj);
__rd_unlock(&pHashObj->lock, pHashObj->type);
return data;
}
void* taosHashGetClone(SHashObj *pHashObj, const void *key, size_t keyLen, void (*fp)(void *), void* d) {
if (taosHashTableEmpty(pHashObj) || keyLen == 0 || key == NULL) {
if (pHashObj == NULL || taosHashTableEmpty(pHashObj) || keyLen == 0 || key == NULL) {
return NULL;
}
uint32_t hashVal = (*pHashObj->hashFp)(key, (uint32_t)keyLen);
// only add the read lock to disable the resize process
__rd_lock(&pHashObj->lock, pHashObj->type);
taosHashRLock(pHashObj);
int32_t slot = HASH_INDEX(hashVal, pHashObj->capacity);
SHashEntry *pe = pHashObj->hashList[slot];
// no data, return directly
if (atomic_load_32(&pe->num) == 0) {
__rd_unlock(&pHashObj->lock, pHashObj->type);
taosHashRUnlock(pHashObj);
return NULL;
}
char *data = NULL;
// lock entry
if (pHashObj->type == HASH_ENTRY_LOCK) {
taosRLockLatch(&pe->latch);
}
taosHashEntryRLock(pHashObj, pe);
if (pe->num > 0) {
assert(pe->next != NULL);
......@@ -412,11 +484,9 @@ void* taosHashGetClone(SHashObj *pHashObj, const void *key, size_t keyLen, void
data = GET_HASH_NODE_DATA(pNode);
}
if (pHashObj->type == HASH_ENTRY_LOCK) {
taosRUnLockLatch(&pe->latch);
}
taosHashEntryRUnlock(pHashObj, pe);
taosHashRUnlock(pHashObj);
__rd_unlock(&pHashObj->lock, pHashObj->type);
return data;
}
......@@ -425,28 +495,26 @@ int32_t taosHashRemove(SHashObj *pHashObj, const void *key, size_t keyLen) {
}
int32_t taosHashRemoveWithData(SHashObj *pHashObj, const void *key, size_t keyLen, void *data, size_t dsize) {
if (pHashObj == NULL || taosHashTableEmpty(pHashObj)) {
if (pHashObj == NULL || taosHashTableEmpty(pHashObj) || key == NULL || keyLen == 0) {
return -1;
}
uint32_t hashVal = (*pHashObj->hashFp)(key, (uint32_t)keyLen);
// disable the resize process
__rd_lock(&pHashObj->lock, pHashObj->type);
taosHashRLock(pHashObj);
int32_t slot = HASH_INDEX(hashVal, pHashObj->capacity);
SHashEntry *pe = pHashObj->hashList[slot];
if (pHashObj->type == HASH_ENTRY_LOCK) {
taosWLockLatch(&pe->latch);
}
taosHashEntryWLock(pHashObj, pe);
// double check after locked
if (pe->num == 0) {
assert(pe->next == NULL);
taosWUnLockLatch(&pe->latch);
__rd_unlock(&pHashObj->lock, pHashObj->type);
taosHashEntryWUnlock(pHashObj, pe);
taosHashRUnlock(pHashObj);
return -1;
}
......@@ -455,49 +523,46 @@ int32_t taosHashRemoveWithData(SHashObj *pHashObj, const void *key, size_t keyLe
SHashNode *prevNode = NULL;
while (pNode) {
if ((pNode->keyLen == keyLen) && ((*(pHashObj->equalFp))(GET_HASH_NODE_KEY(pNode), key, keyLen) == 0) && pNode->removed == 0)
break;
prevNode = pNode;
pNode = pNode->next;
}
if ((pNode->keyLen == keyLen) &&
((*(pHashObj->equalFp))(GET_HASH_NODE_KEY(pNode), key, keyLen) == 0) &&
pNode->removed == 0) {
code = 0; // it is found
atomic_sub_fetch_32(&pNode->refCount, 1);
pNode->removed = 1;
if (pNode->refCount <= 0) {
if (prevNode == NULL) {
pe->next = pNode->next;
} else {
prevNode->next = pNode->next;
}
if (pNode) {
code = 0; // it is found
if (data) memcpy(data, GET_HASH_NODE_DATA(pNode), dsize);
atomic_sub_fetch_32(&pNode->count, 1);
pNode->removed = 1;
if (pNode->count <= 0) {
if (prevNode) {
prevNode->next = pNode->next;
} else {
pe->next = pNode->next;
pe->num--;
atomic_sub_fetch_64(&pHashObj->size, 1);
FREE_HASH_NODE(pNode);
}
if (data) memcpy(data, GET_HASH_NODE_DATA(pNode), dsize);
pe->num--;
atomic_sub_fetch_64(&pHashObj->size, 1);
FREE_HASH_NODE(pHashObj, pNode);
} else {
prevNode = pNode;
pNode = pNode->next;
}
}
if (pHashObj->type == HASH_ENTRY_LOCK) {
taosWUnLockLatch(&pe->latch);
}
__rd_unlock(&pHashObj->lock, pHashObj->type);
taosHashEntryWUnlock(pHashObj, pe);
taosHashRUnlock(pHashObj);
return code;
}
int32_t taosHashCondTraverse(SHashObj *pHashObj, bool (*fp)(void *, void *), void *param) {
if (pHashObj == NULL || taosHashTableEmpty(pHashObj)) {
return 0;
void taosHashCondTraverse(SHashObj *pHashObj, bool (*fp)(void *, void *), void *param) {
if (pHashObj == NULL || taosHashTableEmpty(pHashObj) || fp == NULL) {
return;
}
// disable the resize process
__rd_lock(&pHashObj->lock, pHashObj->type);
taosHashRLock(pHashObj);
int32_t numOfEntries = (int32_t)pHashObj->capacity;
for (int32_t i = 0; i < numOfEntries; ++i) {
......@@ -506,63 +571,32 @@ int32_t taosHashCondTraverse(SHashObj *pHashObj, bool (*fp)(void *, void *), voi
continue;
}
if (pHashObj->type == HASH_ENTRY_LOCK) {
taosWLockLatch(&pEntry->latch);
}
// todo remove the first node
SHashNode *pNode = NULL;
while((pNode = pEntry->next) != NULL) {
if (fp && (!fp(param, GET_HASH_NODE_DATA(pNode)))) {
pEntry->num -= 1;
atomic_sub_fetch_64(&pHashObj->size, 1);
pEntry->next = pNode->next;
if (pEntry->num == 0) {
assert(pEntry->next == NULL);
} else {
assert(pEntry->next != NULL);
}
taosHashEntryWLock(pHashObj, pEntry);
FREE_HASH_NODE(pHashObj, pNode);
SHashNode *pPrevNode = NULL;
SHashNode *pNode = pEntry->next;
while (pNode != NULL) {
if (fp(param, GET_HASH_NODE_DATA(pNode))) {
pPrevNode = pNode;
pNode = pNode->next;
} else {
break;
}
}
// handle the following node
if (pNode != NULL) {
assert(pNode == pEntry->next);
SHashNode *pNext = NULL;
while ((pNext = pNode->next) != NULL) {
// not qualified, remove it
if (fp && (!fp(param, GET_HASH_NODE_DATA(pNext)))) {
pNode->next = pNext->next;
pEntry->num -= 1;
atomic_sub_fetch_64(&pHashObj->size, 1);
if (pEntry->num == 0) {
assert(pEntry->next == NULL);
} else {
assert(pEntry->next != NULL);
}
FREE_HASH_NODE(pHashObj, pNext);
if (pPrevNode == NULL) {
pEntry->next = pNode->next;
} else {
pNode = pNext;
pPrevNode->next = pNode->next;
}
pEntry->num -= 1;
atomic_sub_fetch_64(&pHashObj->size, 1);
SHashNode *next = pNode->next;
FREE_HASH_NODE(pNode);
pNode = next;
}
}
if (pHashObj->type == HASH_ENTRY_LOCK) {
taosWUnLockLatch(&pEntry->latch);
}
taosHashEntryWUnlock(pHashObj, pEntry);
}
__rd_unlock(&pHashObj->lock, pHashObj->type);
return 0;
taosHashRUnlock(pHashObj);
}
void taosHashClear(SHashObj *pHashObj) {
......@@ -572,12 +606,12 @@ void taosHashClear(SHashObj *pHashObj) {
SHashNode *pNode, *pNext;
__wr_lock(&pHashObj->lock, pHashObj->type);
taosHashWLock(pHashObj);
for (int32_t i = 0; i < pHashObj->capacity; ++i) {
SHashEntry *pEntry = pHashObj->hashList[i];
if (pEntry->num == 0) {
assert(pEntry->next == 0);
assert(pEntry->next == NULL);
continue;
}
......@@ -586,7 +620,7 @@ void taosHashClear(SHashObj *pHashObj) {
while (pNode) {
pNext = pNode->next;
FREE_HASH_NODE(pHashObj, pNode);
FREE_HASH_NODE(pNode);
pNode = pNext;
}
......@@ -596,7 +630,7 @@ void taosHashClear(SHashObj *pHashObj) {
}
pHashObj->size = 0;
__wr_unlock(&pHashObj->lock, pHashObj->type);
taosHashWUnlock(pHashObj);
}
// the input paras should be SHashObj **, so the origin input will be set by tfree(*pHashObj)
......@@ -616,25 +650,28 @@ void taosHashCleanup(SHashObj *pHashObj) {
}
taosArrayDestroy(&pHashObj->pMemBlock);
memset(pHashObj, 0, sizeof(SHashObj));
tfree(pHashObj);
free(pHashObj);
}
// for profile only
int32_t taosHashGetMaxOverflowLinkLength(const SHashObj *pHashObj) {
int32_t taosHashGetMaxOverflowLinkLength(SHashObj *pHashObj) {
if (pHashObj == NULL || taosHashTableEmpty(pHashObj)) {
return 0;
}
int32_t num = 0;
taosHashRLock(pHashObj);
for (int32_t i = 0; i < pHashObj->size; ++i) {
SHashEntry *pEntry = pHashObj->hashList[i];
// fine grain per entry lock is not held since this is used
// for profiling only and doesn't need an accurate count.
if (num < pEntry->num) {
num = pEntry->num;
}
}
taosHashRUnlock(pHashObj);
return num;
}
......@@ -644,27 +681,23 @@ void taosHashTableResize(SHashObj *pHashObj) {
return;
}
// double the original capacity
SHashNode *pNode = NULL;
SHashNode *pNext = NULL;
int32_t newSize = (int32_t)(pHashObj->capacity << 1u);
if (newSize > HASH_MAX_CAPACITY) {
// uDebug("current capacity:%d, maximum capacity:%d, no resize applied due to limitation is reached",
// pHashObj->capacity, HASH_MAX_CAPACITY);
int32_t newCapacity = (int32_t)(pHashObj->capacity << 1u);
if (newCapacity > HASH_MAX_CAPACITY) {
uDebug("current capacity:%zu, maximum capacity:%d, no resize applied due to limitation is reached",
pHashObj->capacity, HASH_MAX_CAPACITY);
return;
}
int64_t st = taosGetTimestampUs();
void *pNewEntryList = realloc(pHashObj->hashList, sizeof(void *) * newSize);
if (pNewEntryList == NULL) { // todo handle error
// uDebug("cache resize failed due to out of memory, capacity remain:%d", pHashObj->capacity);
void *pNewEntryList = realloc(pHashObj->hashList, sizeof(void *) * newCapacity);
if (pNewEntryList == NULL) {
uDebug("cache resize failed due to out of memory, capacity remain:%zu", pHashObj->capacity);
return;
}
pHashObj->hashList = pNewEntryList;
size_t inc = newSize - pHashObj->capacity;
size_t inc = newCapacity - pHashObj->capacity;
void * p = calloc(inc, sizeof(SHashEntry));
for (int32_t i = 0; i < inc; ++i) {
......@@ -673,78 +706,46 @@ void taosHashTableResize(SHashObj *pHashObj) {
taosArrayPush(pHashObj->pMemBlock, &p);
pHashObj->capacity = newSize;
for (int32_t i = 0; i < pHashObj->capacity; ++i) {
SHashEntry *pe = pHashObj->hashList[i];
if (pe->num == 0) {
assert(pe->next == NULL);
} else {
assert(pe->next != NULL);
}
pHashObj->capacity = newCapacity;
for (int32_t idx = 0; idx < pHashObj->capacity; ++idx) {
SHashEntry *pe = pHashObj->hashList[idx];
SHashNode *pNode;
SHashNode *pNext;
SHashNode *pPrev = NULL;
if (pe->num == 0) {
assert(pe->next == NULL);
continue;
}
while ((pNode = pe->next) != NULL) {
int32_t j = HASH_INDEX(pNode->hashVal, pHashObj->capacity);
if (j != i) {
pe->num -= 1;
pe->next = pNode->next;
if (pe->num == 0) {
assert(pe->next == NULL);
} else {
assert(pe->next != NULL);
}
SHashEntry *pNewEntry = pHashObj->hashList[j];
pushfrontNodeInEntryList(pNewEntry, pNode);
} else {
break;
}
}
if (pNode != NULL) {
while ((pNext = pNode->next) != NULL) {
int32_t j = HASH_INDEX(pNext->hashVal, pHashObj->capacity);
if (j != i) {
pe->num -= 1;
pNode->next = pNext->next;
pNext->next = NULL;
pNode = pe->next;
// added into new slot
SHashEntry *pNewEntry = pHashObj->hashList[j];
if (pNewEntry->num == 0) {
assert(pNewEntry->next == NULL);
} else {
assert(pNewEntry->next != NULL);
}
assert(pNode != NULL);
pushfrontNodeInEntryList(pNewEntry, pNext);
while (pNode != NULL) {
int32_t newIdx = HASH_INDEX(pNode->hashVal, pHashObj->capacity);
pNext = pNode->next;
if (newIdx != idx) {
pe->num -= 1;
if (pPrev == NULL) {
pe->next = pNext;
} else {
pNode = pNext;
pPrev->next = pNext;
}
}
if (pe->num == 0) {
assert(pe->next == NULL);
SHashEntry *pNewEntry = pHashObj->hashList[newIdx];
pushfrontNodeInEntryList(pNewEntry, pNode);
} else {
assert(pe->next != NULL);
pPrev = pNode;
}
pNode = pNext;
}
}
int64_t et = taosGetTimestampUs();
uDebug("hash table resize completed, new capacity:%d, load factor:%f, elapsed time:%fms", (int32_t)pHashObj->capacity,
((double)pHashObj->size) / pHashObj->capacity, (et - st) / 1000.0);
((double)pHashObj->size) / pHashObj->capacity, (et - st) / 1000.0);
}
SHashNode *doCreateHashNode(const void *key, size_t keyLen, const void *pData, size_t dsize, uint32_t hashVal) {
......@@ -757,8 +758,8 @@ SHashNode *doCreateHashNode(const void *key, size_t keyLen, const void *pData, s
pNewNode->keyLen = (uint32_t)keyLen;
pNewNode->hashVal = hashVal;
pNewNode->dataLen = (uint32_t) dsize;
pNewNode->count = 1;
pNewNode->dataLen = (uint32_t)dsize;
pNewNode->refCount = 1;
pNewNode->removed = 0;
pNewNode->next = NULL;
......@@ -805,40 +806,37 @@ static void *taosHashReleaseNode(SHashObj *pHashObj, void *p, int *slot) {
*slot = HASH_INDEX(pOld->hashVal, pHashObj->capacity);
SHashEntry *pe = pHashObj->hashList[*slot];
// lock entry
if (pHashObj->type == HASH_ENTRY_LOCK) {
taosWLockLatch(&pe->latch);
}
taosHashEntryWLock(pHashObj, pe);
SHashNode *pNode = pe->next;
while (pNode) {
if (pNode == pOld)
if (pNode == pOld)
break;
prevNode = pNode;
pNode = pNode->next;
}
if (pNode) {
if (pNode) {
pNode = pNode->next;
while (pNode) {
if (pNode->removed == 0) break;
pNode = pNode->next;
}
atomic_sub_fetch_32(&pOld->count, 1);
if (pOld->count <=0) {
atomic_sub_fetch_32(&pOld->refCount, 1);
if (pOld->refCount <=0) {
if (prevNode) {
prevNode->next = pOld->next;
} else {
pe->next = pOld->next;
}
pe->num--;
atomic_sub_fetch_64(&pHashObj->size, 1);
FREE_HASH_NODE(pHashObj, pOld);
}
FREE_HASH_NODE(pOld);
}
} else {
uError("pNode:%p data:%p is not there!!!", pNode, p);
}
......@@ -847,22 +845,20 @@ static void *taosHashReleaseNode(SHashObj *pHashObj, void *p, int *slot) {
}
void *taosHashIterate(SHashObj *pHashObj, void *p) {
if (pHashObj == NULL) return NULL;
if (pHashObj == NULL) return NULL;
int slot = 0;
char *data = NULL;
// only add the read lock to disable the resize process
__rd_lock(&pHashObj->lock, pHashObj->type);
taosHashRLock(pHashObj);
SHashNode *pNode = NULL;
if (p) {
pNode = taosHashReleaseNode(pHashObj, p, &slot);
if (pNode == NULL) {
SHashEntry *pe = pHashObj->hashList[slot];
if (pHashObj->type == HASH_ENTRY_LOCK) {
taosWUnLockLatch(&pe->latch);
}
taosHashEntryWUnlock(pHashObj, pe);
slot = slot + 1;
}
......@@ -872,10 +868,7 @@ void *taosHashIterate(SHashObj *pHashObj, void *p) {
for (; slot < pHashObj->capacity; ++slot) {
SHashEntry *pe = pHashObj->hashList[slot];
// lock entry
if (pHashObj->type == HASH_ENTRY_LOCK) {
taosWLockLatch(&pe->latch);
}
taosHashEntryWLock(pHashObj, pe);
pNode = pe->next;
while (pNode) {
......@@ -885,22 +878,18 @@ void *taosHashIterate(SHashObj *pHashObj, void *p) {
if (pNode) break;
if (pHashObj->type == HASH_ENTRY_LOCK) {
taosWUnLockLatch(&pe->latch);
}
taosHashEntryWUnlock(pHashObj, pe);
}
}
if (pNode) {
SHashEntry *pe = pHashObj->hashList[slot];
atomic_add_fetch_32(&pNode->count, 1);
atomic_add_fetch_32(&pNode->refCount, 1);
data = GET_HASH_NODE_DATA(pNode);
if (pHashObj->type == HASH_ENTRY_LOCK) {
taosWUnLockLatch(&pe->latch);
}
taosHashEntryWUnlock(pHashObj, pe);
}
__rd_unlock(&pHashObj->lock, pHashObj->type);
taosHashRUnlock(pHashObj);
return data;
}
......@@ -909,15 +898,13 @@ void taosHashCancelIterate(SHashObj *pHashObj, void *p) {
if (pHashObj == NULL || p == NULL) return;
// only add the read lock to disable the resize process
__rd_lock(&pHashObj->lock, pHashObj->type);
taosHashRLock(pHashObj);
int slot;
taosHashReleaseNode(pHashObj, p, &slot);
SHashEntry *pe = pHashObj->hashList[slot];
if (pHashObj->type == HASH_ENTRY_LOCK) {
taosWUnLockLatch(&pe->latch);
}
__rd_unlock(&pHashObj->lock, pHashObj->type);
taosHashEntryWUnlock(pHashObj, pe);
taosHashRUnlock(pHashObj);
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册