diff --git a/src/inc/hash.h b/src/inc/hash.h new file mode 100644 index 0000000000000000000000000000000000000000..54a43fb6ebc3f692c642e1270a948016b4244194 --- /dev/null +++ b/src/inc/hash.h @@ -0,0 +1,72 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef TDENGINE_HASH_H +#define TDENGINE_HASH_H + +#include "hashutil.h" + +#define HASH_MAX_CAPACITY (1024 * 1024 * 16) +#define HASH_VALUE_IN_TRASH (-1) +#define HASH_DEFAULT_LOAD_FACTOR (0.75) +#define HASH_INDEX(v, c) ((v) & ((c)-1)) + +typedef struct SHashNode { + char *key; // null-terminated string + union { + struct SHashNode * prev; + struct SHashEntry *prev1; + }; + + struct SHashNode *next; + uint32_t hashVal; // the hash value of key, if hashVal == HASH_VALUE_IN_TRASH, this node is moved to trash + uint32_t keyLen; // length of the key + char data[]; +} SHashNode; + +typedef struct SHashEntry { + SHashNode *next; + uint32_t num; +} SHashEntry; + +typedef struct HashObj { + SHashEntry **hashList; + uint32_t capacity; + int size; + _hash_fn_t hashFp; + bool multithreadSafe; // enable lock + +#if defined LINUX + pthread_rwlock_t lock; +#else + pthread_mutex_t lock; +#endif + +} HashObj; + +void *taosInitHashTable(uint32_t capacity, _hash_fn_t fn, bool multithreadSafe); + +int32_t taosAddToHashTable(HashObj *pObj, const char *key, uint32_t keyLen, void *data, uint32_t size); +void taosDeleteFromHashTable(HashObj *pObj, const char *key, uint32_t keyLen); + +char *taosGetDataFromHash(HashObj *pObj, const char *key, uint32_t keyLen); + +void taosCleanUpHashTable(void *handle); + +int32_t taosGetHashMaxOverflowLength(HashObj *pObj); + +int32_t taosCheckHashTable(HashObj *pObj); + +#endif // TDENGINE_HASH_H diff --git a/src/inc/hashutil.h b/src/inc/hashutil.h new file mode 100644 index 0000000000000000000000000000000000000000..063881b0b4338862d8552205b462b824b710e514 --- /dev/null +++ b/src/inc/hashutil.h @@ -0,0 +1,44 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef TDENGINE_HASHUTIL_H +#define TDENGINE_HASHUTIL_H + + +typedef uint32_t (*_hash_fn_t)(const char *, uint32_t); + +/** + * murmur hash algorithm + * @key usually string + * @len key length + * @seed hash seed + * @out an int32 value + */ +uint32_t MurmurHash3_32(const char *key, uint32_t len); + +/** + * + * @param key + * @param len + * @return + */ +uint32_t taosIntHash_32(const char *key, uint32_t len); + + +uint32_t taosIntHash_64(const char *key, uint32_t len); + +_hash_fn_t taosGetDefaultHashFunction(int32_t type); + +#endif //TDENGINE_HASHUTIL_H diff --git a/src/system/detail/src/vnodeQueryImpl.c b/src/system/detail/src/vnodeQueryImpl.c index 25fb0374b8840f0aa1511b60bfbf10090dcece4b..7da2e60a385fa7b76ffa181dadd870f205fea683 100644 --- a/src/system/detail/src/vnodeQueryImpl.c +++ b/src/system/detail/src/vnodeQueryImpl.c @@ -17,6 +17,8 @@ #include "taosmsg.h" #include "textbuffer.h" #include "ttime.h" +#include "hash.h" +#include "hashutil.h" #include "tinterpolation.h" #include "tscJoinProcess.h" @@ -52,8 +54,6 @@ enum { static int32_t readDataFromDiskFile(int fd, SQInfo *pQInfo, SQueryFilesInfo *pQueryFile, char *buf, uint64_t offset, int32_t size); -//__read_data_fn_t readDataFunctor[2] = {copyDataFromMMapBuffer, readDataFromDiskFile}; - static void vnodeInitLoadCompBlockInfo(SQueryLoadCompBlockInfo *pCompBlockLoadInfo); static int32_t moveToNextBlock(SQueryRuntimeEnv *pRuntimeEnv, int32_t step, __block_search_fn_t searchFn, bool loadData); @@ -163,6 +163,30 @@ bool isGroupbyNormalCol(SSqlGroupbyExpr *pGroupbyExpr) { return false; } +int16_t getGroupbyColumnType(SQuery* pQuery, SSqlGroupbyExpr *pGroupbyExpr) { + assert(pGroupbyExpr != NULL); + + int32_t colId = -2; + int16_t type = TSDB_DATA_TYPE_NULL; + + for(int32_t i = 0; i < pGroupbyExpr->numOfGroupCols; ++i) { + SColIndexEx *pColIndex = &pGroupbyExpr->columnInfo[i]; + if (pColIndex->flag == TSDB_COL_NORMAL) { + colId = pColIndex->colId; + break; + } + } + + for(int32_t i = 0; i < pQuery->numOfCols; ++i) { + if (colId == pQuery->colList[i].data.colId) { + type = pQuery->colList[i].data.type; + break; + } + } + + return type; +} + bool isSelectivityWithTagsQuery(SQuery *pQuery) { bool hasTags = false; int32_t numOfSelectivity = 0; @@ -1446,7 +1470,7 @@ static bool needToLoadDataBlock(SQuery *pQuery, SField *pField, SQLFunctionCtx * return true; } -static int32_t setGroupResultForKey(SQueryRuntimeEnv *pRuntimeEnv, char *pData, int16_t type, char *columnData) { +static int32_t setGroupResultForKey(SQueryRuntimeEnv *pRuntimeEnv, char *pData, int16_t type, int16_t bytes) { SOutputRes *pOutputRes = NULL; // ignore the null value @@ -1454,35 +1478,17 @@ static int32_t setGroupResultForKey(SQueryRuntimeEnv *pRuntimeEnv, char *pData, return -1; } - int64_t t = 0; - switch (type) { - case TSDB_DATA_TYPE_TINYINT: - t = GET_INT8_VAL(pData); - break; - case TSDB_DATA_TYPE_BIGINT: - t = GET_INT64_VAL(pData); - break; - case TSDB_DATA_TYPE_SMALLINT: - t = GET_INT16_VAL(pData); - break; - case TSDB_DATA_TYPE_INT: - default: - t = GET_INT32_VAL(pData); - break; - } - - SOutputRes **p1 = (SOutputRes **)taosGetIntHashData(pRuntimeEnv->hashList, t); + SOutputRes **p1 = (SOutputRes **)taosGetDataFromHash(pRuntimeEnv->hashList, pData, bytes); if (p1 != NULL) { pOutputRes = *p1; - } else { - // more than the threshold number, discard data that are not belong to current groups + } else { // more than the threshold number, discard data that are not belong to current groups if (pRuntimeEnv->usedIndex >= 10000) { return -1; } // add a new result set for a new group - char *b = (char *)&pRuntimeEnv->pResult[pRuntimeEnv->usedIndex++]; - pOutputRes = *(SOutputRes **)taosAddIntHash(pRuntimeEnv->hashList, t, (char *)&b); + pOutputRes = &pRuntimeEnv->pResult[pRuntimeEnv->usedIndex++]; + taosAddToHashTable(pRuntimeEnv->hashList, pData, bytes, (char *)&pOutputRes, POINTER_BYTES); } setGroupOutputBuffer(pRuntimeEnv, pOutputRes); @@ -1686,7 +1692,7 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t * if (groupbyStateValue) { char *stateVal = groupbyColumnData + bytes * offset; - int32_t ret = setGroupResultForKey(pRuntimeEnv, stateVal, type, groupbyColumnData); + int32_t ret = setGroupResultForKey(pRuntimeEnv, stateVal, type, bytes); if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code continue; } @@ -2229,7 +2235,7 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) { tfree(pRuntimeEnv->secondaryUnzipBuffer); - taosCleanUpIntHash(pRuntimeEnv->hashList); + taosCleanUpHashTable(pRuntimeEnv->hashList); if (pRuntimeEnv->pCtx != NULL) { for (int32_t i = 0; i < pRuntimeEnv->pQuery->numOfOutputCols; ++i) { @@ -3742,9 +3748,11 @@ int32_t vnodeQuerySingleMeterPrepare(SQInfo *pQInfo, SMeterObj *pMeterObj, SMete // check data in file or cache bool dataInCache = true; bool dataInDisk = true; - pSupporter->runtimeEnv.pQuery = pQuery; + + SQueryRuntimeEnv* pRuntimeEnv = &pSupporter->runtimeEnv; + pRuntimeEnv->pQuery = pQuery; - vnodeCheckIfDataExists(&pSupporter->runtimeEnv, pMeterObj, &dataInDisk, &dataInCache); + vnodeCheckIfDataExists(pRuntimeEnv, pMeterObj, &dataInDisk, &dataInCache); /* data in file or cache is not qualified for the query. abort */ if (!(dataInCache || dataInDisk)) { @@ -3755,11 +3763,11 @@ int32_t vnodeQuerySingleMeterPrepare(SQInfo *pQInfo, SMeterObj *pMeterObj, SMete return TSDB_CODE_SUCCESS; } - pSupporter->runtimeEnv.pTSBuf = param; - pSupporter->runtimeEnv.cur.vnodeIndex = -1; + pRuntimeEnv->pTSBuf = param; + pRuntimeEnv->cur.vnodeIndex = -1; if (param != NULL) { - int16_t order = (pQuery->order.order == pSupporter->runtimeEnv.pTSBuf->tsOrder) ? TSQL_SO_ASC : TSQL_SO_DESC; - tsBufSetTraverseOrder(pSupporter->runtimeEnv.pTSBuf, order); + int16_t order = (pQuery->order.order == pRuntimeEnv->pTSBuf->tsOrder) ? TSQL_SO_ASC : TSQL_SO_DESC; + tsBufSetTraverseOrder(pRuntimeEnv->pTSBuf, order); } // create runtime environment @@ -3775,9 +3783,13 @@ int32_t vnodeQuerySingleMeterPrepare(SQInfo *pQInfo, SMeterObj *pMeterObj, SMete return ret; } - pSupporter->runtimeEnv.hashList = taosInitIntHash(10039, sizeof(void *), taosHashInt); - pSupporter->runtimeEnv.usedIndex = 0; - pSupporter->runtimeEnv.pResult = pSupporter->pResult; + int16_t type = getGroupbyColumnType(pQuery, pQuery->pGroupbyExpr); + _hash_fn_t fn = taosGetDefaultHashFunction(type); + + pRuntimeEnv->hashList = taosInitHashTable(10039, fn, false); + + pRuntimeEnv->usedIndex = 0; + pRuntimeEnv->pResult = pSupporter->pResult; } // in case of last_row query, we set the query timestamp to pMeterObj->lastKey; @@ -3820,7 +3832,7 @@ int32_t vnodeQuerySingleMeterPrepare(SQInfo *pQInfo, SMeterObj *pMeterObj, SMete int64_t rs = taosGetIntervalStartTimestamp(pSupporter->rawSKey, pQuery->nAggTimeInterval, pQuery->intervalTimeUnit, pQuery->precision); - taosInitInterpoInfo(&pSupporter->runtimeEnv.interpoInfo, pQuery->order.order, rs, 0, 0); + taosInitInterpoInfo(&pRuntimeEnv->interpoInfo, pQuery->order.order, rs, 0, 0); allocMemForInterpo(pSupporter, pQuery, pMeterObj); if (!isPointInterpoQuery(pQuery)) { @@ -3843,9 +3855,9 @@ void vnodeQueryFreeQInfoEx(SQInfo *pQInfo) { teardownQueryRuntimeEnv(&pSupporter->runtimeEnv); tfree(pSupporter->pMeterSidExtInfo); - if (pSupporter->pMeterObj != NULL) { - taosCleanUpIntHash(pSupporter->pMeterObj); - pSupporter->pMeterObj = NULL; + if (pSupporter->pMetersHashTable != NULL) { + taosCleanUpHashTable(pSupporter->pMetersHashTable); + pSupporter->pMetersHashTable = NULL; } if (pSupporter->pSidSet != NULL || isGroupbyNormalCol(pQInfo->query.pGroupbyExpr)) { @@ -3904,10 +3916,11 @@ int32_t vnodeMultiMeterQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param) pQuery->pointsRead = 0; changeExecuteScanOrder(pQuery, true); + SQueryRuntimeEnv* pRuntimeEnv = &pSupporter->runtimeEnv; - doInitQueryFileInfoFD(&pSupporter->runtimeEnv.vnodeFileInfo); - vnodeInitDataBlockInfo(&pSupporter->runtimeEnv.loadBlockInfo); - vnodeInitLoadCompBlockInfo(&pSupporter->runtimeEnv.loadCompBlockInfo); + doInitQueryFileInfoFD(&pRuntimeEnv->vnodeFileInfo); + vnodeInitDataBlockInfo(&pRuntimeEnv->loadBlockInfo); + vnodeInitLoadCompBlockInfo(&pRuntimeEnv->loadCompBlockInfo); /* * since we employ the output control mechanism in main loop. @@ -3929,15 +3942,15 @@ int32_t vnodeMultiMeterQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param) } // get one queried meter - SMeterObj *pMeter = getMeterObj(pSupporter->pMeterObj, pSupporter->pSidSet->pSids[0]->sid); + SMeterObj *pMeter = getMeterObj(pSupporter->pMetersHashTable, pSupporter->pSidSet->pSids[0]->sid); - pSupporter->runtimeEnv.pTSBuf = param; - pSupporter->runtimeEnv.cur.vnodeIndex = -1; + pRuntimeEnv->pTSBuf = param; + pRuntimeEnv->cur.vnodeIndex = -1; // set the ts-comp file traverse order if (param != NULL) { - int16_t order = (pQuery->order.order == pSupporter->runtimeEnv.pTSBuf->tsOrder) ? TSQL_SO_ASC : TSQL_SO_DESC; - tsBufSetTraverseOrder(pSupporter->runtimeEnv.pTSBuf, order); + int16_t order = (pQuery->order.order == pRuntimeEnv->pTSBuf->tsOrder) ? TSQL_SO_ASC : TSQL_SO_DESC; + tsBufSetTraverseOrder(pRuntimeEnv->pTSBuf, order); } int32_t ret = setupQueryRuntimeEnv(pMeter, pQuery, &pSupporter->runtimeEnv, pTagSchema, TSQL_SO_ASC, true); @@ -3953,9 +3966,9 @@ int32_t vnodeMultiMeterQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param) } if (isGroupbyNormalCol(pQuery->pGroupbyExpr)) { // group by columns not tags; - pSupporter->runtimeEnv.hashList = taosInitIntHash(10039, sizeof(void *), taosHashInt); - pSupporter->runtimeEnv.usedIndex = 0; - pSupporter->runtimeEnv.pResult = pSupporter->pResult; + pRuntimeEnv->hashList = taosInitHashTable(10039, taosIntHash_64, false); + pRuntimeEnv->usedIndex = 0; + pRuntimeEnv->pResult = pSupporter->pResult; } if (pQuery->nAggTimeInterval != 0) { @@ -3976,7 +3989,7 @@ int32_t vnodeMultiMeterQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param) return TSDB_CODE_SERV_NO_DISKSPACE; } - pSupporter->runtimeEnv.numOfRowsPerPage = (DEFAULT_INTERN_BUF_SIZE - sizeof(tFilePage)) / pQuery->rowSize; + pRuntimeEnv->numOfRowsPerPage = (DEFAULT_INTERN_BUF_SIZE - sizeof(tFilePage)) / pQuery->rowSize; pSupporter->lastPageId = -1; pSupporter->bufSize = pSupporter->numOfPages * DEFAULT_INTERN_BUF_SIZE; @@ -3995,7 +4008,7 @@ int32_t vnodeMultiMeterQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param) TSKEY revisedStime = taosGetIntervalStartTimestamp(pSupporter->rawSKey, pQuery->nAggTimeInterval, pQuery->intervalTimeUnit, pQuery->precision); - taosInitInterpoInfo(&pSupporter->runtimeEnv.interpoInfo, pQuery->order.order, revisedStime, 0, 0); + taosInitInterpoInfo(&pRuntimeEnv->interpoInfo, pQuery->order.order, revisedStime, 0, 0); return TSDB_CODE_SUCCESS; } @@ -4014,7 +4027,7 @@ void vnodeDecMeterRefcnt(SQInfo *pQInfo) { } else { int32_t num = 0; for (int32_t i = 0; i < pSupporter->numOfMeters; ++i) { - SMeterObj *pMeter = getMeterObj(pSupporter->pMeterObj, pSupporter->pSidSet->pSids[i]->sid); + SMeterObj *pMeter = getMeterObj(pSupporter->pMetersHashTable, pSupporter->pSidSet->pSids[i]->sid); atomic_fetch_sub_32(&(pMeter->numOfQueries), 1); if (pMeter->numOfQueries > 0) { @@ -5060,7 +5073,7 @@ int32_t doCloseAllOpenedResults(SMeterQuerySupportObj *pSupporter) { if (pMeterInfo[i].pMeterQInfo != NULL && pMeterInfo[i].pMeterQInfo->lastResRows > 0) { int32_t index = pMeterInfo[i].meterOrderIdx; - pRuntimeEnv->pMeterObj = getMeterObj(pSupporter->pMeterObj, pSupporter->pSidSet->pSids[index]->sid); + pRuntimeEnv->pMeterObj = getMeterObj(pSupporter->pMetersHashTable, pSupporter->pSidSet->pSids[index]->sid); assert(pRuntimeEnv->pMeterObj == pMeterInfo[i].pMeterObj); int32_t ret = setIntervalQueryExecutionContext(pSupporter, i, pMeterInfo[i].pMeterQInfo); @@ -5666,7 +5679,7 @@ int32_t vnodeFilterQualifiedMeters(SQInfo *pQInfo, int32_t vid, tSidSet *pSidSet TSKEY skey, ekey; for (int32_t i = 0; i < pSidSet->numOfSids; ++i) { // load all meter meta info - SMeterObj *pMeterObj = getMeterObj(pSupporter->pMeterObj, pMeterSidExtInfo[i]->sid); + SMeterObj *pMeterObj = getMeterObj(pSupporter->pMetersHashTable, pMeterSidExtInfo[i]->sid); if (pMeterObj == NULL) { dError("QInfo:%p failed to find required sid:%d", pQInfo, pMeterSidExtInfo[i]->sid); continue; diff --git a/src/system/detail/src/vnodeQueryProcess.c b/src/system/detail/src/vnodeQueryProcess.c index c69b27537e82fc7a58aff764276bc8d932f18a1c..da17b0dd189a7d5f09762ed267462f0b8efdfccc 100644 --- a/src/system/detail/src/vnodeQueryProcess.c +++ b/src/system/detail/src/vnodeQueryProcess.c @@ -92,7 +92,7 @@ static SMeterDataInfo *queryOnMultiDataCache(SQInfo *pQInfo, SMeterDataInfo *pMe SMeterSidExtInfo **pMeterSidExtInfo = pSupporter->pMeterSidExtInfo; - SMeterObj *pTempMeterObj = getMeterObj(pSupporter->pMeterObj, pMeterSidExtInfo[0]->sid); + SMeterObj *pTempMeterObj = getMeterObj(pSupporter->pMetersHashTable, pMeterSidExtInfo[0]->sid); assert(pTempMeterObj != NULL); __block_search_fn_t searchFn = vnodeSearchKeyFunc[pTempMeterObj->searchAlgorithm]; @@ -111,7 +111,7 @@ static SMeterDataInfo *queryOnMultiDataCache(SQInfo *pQInfo, SMeterDataInfo *pMe } for (int32_t k = start; k <= end; ++k) { - SMeterObj *pMeterObj = getMeterObj(pSupporter->pMeterObj, pMeterSidExtInfo[k]->sid); + SMeterObj *pMeterObj = getMeterObj(pSupporter->pMetersHashTable, pMeterSidExtInfo[k]->sid); if (pMeterObj == NULL) { dError("QInfo:%p failed to find meterId:%d, continue", pQInfo, pMeterSidExtInfo[k]->sid); continue; @@ -266,7 +266,7 @@ static SMeterDataInfo *queryOnMultiDataFiles(SQInfo *pQInfo, SMeterDataInfo *pMe SMeterDataBlockInfoEx *pDataBlockInfoEx = NULL; int32_t nAllocBlocksInfoSize = 0; - SMeterObj * pTempMeter = getMeterObj(pSupporter->pMeterObj, pSupporter->pMeterSidExtInfo[0]->sid); + SMeterObj * pTempMeter = getMeterObj(pSupporter->pMetersHashTable, pSupporter->pMeterSidExtInfo[0]->sid); __block_search_fn_t searchFn = vnodeSearchKeyFunc[pTempMeter->searchAlgorithm]; int32_t vnodeId = pTempMeter->vnode; @@ -475,7 +475,7 @@ static bool multimeterMultioutputHelper(SQInfo *pQInfo, bool *dataInDisk, bool * setQueryStatus(pQuery, QUERY_NOT_COMPLETED); - SMeterObj *pMeterObj = getMeterObj(pSupporter->pMeterObj, pMeterSidExtInfo[index]->sid); + SMeterObj *pMeterObj = getMeterObj(pSupporter->pMetersHashTable, pMeterSidExtInfo[index]->sid); if (pMeterObj == NULL) { dError("QInfo:%p do not find required meter id: %d, all meterObjs id is:", pQInfo, pMeterSidExtInfo[index]->sid); return false; @@ -576,7 +576,7 @@ static void vnodeMultiMeterMultiOutputProcessor(SQInfo *pQInfo) { SQuery * pQuery = &pQInfo->query; tSidSet *pSids = pSupporter->pSidSet; - SMeterObj *pOneMeter = getMeterObj(pSupporter->pMeterObj, pMeterSidExtInfo[0]->sid); + SMeterObj *pOneMeter = getMeterObj(pSupporter->pMetersHashTable, pMeterSidExtInfo[0]->sid); resetCtxOutputBuf(pRuntimeEnv); @@ -604,7 +604,7 @@ static void vnodeMultiMeterMultiOutputProcessor(SQInfo *pQInfo) { } // get the last key of meters that belongs to this group - SMeterObj *pMeterObj = getMeterObj(pSupporter->pMeterObj, pMeterSidExtInfo[k]->sid); + SMeterObj *pMeterObj = getMeterObj(pSupporter->pMetersHashTable, pMeterSidExtInfo[k]->sid); if (pMeterObj != NULL) { if (key < pMeterObj->lastKey) { key = pMeterObj->lastKey; @@ -674,10 +674,10 @@ static void vnodeMultiMeterMultiOutputProcessor(SQInfo *pQInfo) { } pRuntimeEnv->usedIndex = 0; - taosCleanUpIntHash(pRuntimeEnv->hashList); + taosCleanUpHashTable(pRuntimeEnv->hashList); int32_t primeHashSlot = 10039; - pRuntimeEnv->hashList = taosInitIntHash(primeHashSlot, POINTER_BYTES, taosHashInt); + pRuntimeEnv->hashList = taosInitHashTable(primeHashSlot, taosIntHash_32, false); while (pSupporter->meterIdx < pSupporter->numOfMeters) { int32_t k = pSupporter->meterIdx; diff --git a/src/system/detail/src/vnodeRead.c b/src/system/detail/src/vnodeRead.c index bbd3e9465c32174566f08b809545ae4b5d7e5f65..a76faf8a8d1ebfed7f7dd55f9c4facf768710581 100644 --- a/src/system/detail/src/vnodeRead.c +++ b/src/system/detail/src/vnodeRead.c @@ -25,6 +25,8 @@ #include "vnode.h" #include "vnodeRead.h" #include "vnodeUtil.h" +#include "hash.h" +#include "hashutil.h" int (*pQueryFunc[])(SMeterObj *, SQuery *) = {vnodeQueryFromCache, vnodeQueryFromFile}; @@ -655,8 +657,9 @@ void *vnodeQueryOnSingleTable(SMeterObj **pMetersObj, SSqlGroupbyExpr *pGroupbyE SMeterQuerySupportObj *pSupporter = (SMeterQuerySupportObj *)calloc(1, sizeof(SMeterQuerySupportObj)); pSupporter->numOfMeters = 1; - pSupporter->pMeterObj = taosInitIntHash(pSupporter->numOfMeters, POINTER_BYTES, taosHashInt); - taosAddIntHash(pSupporter->pMeterObj, pMetersObj[0]->sid, (char *)&pMetersObj[0]); + pSupporter->pMetersHashTable = taosInitHashTable(pSupporter->numOfMeters, taosIntHash_32, false); + taosAddToHashTable(pSupporter->pMetersHashTable, (const char*) &pMetersObj[0]->sid, sizeof(pMeterObj[0].sid), + (char *)&pMetersObj[0], POINTER_BYTES); pSupporter->pSidSet = NULL; pSupporter->subgroupIdx = -1; @@ -748,9 +751,10 @@ void *vnodeQueryOnMultiMeters(SMeterObj **pMetersObj, SSqlGroupbyExpr *pGroupbyE SMeterQuerySupportObj *pSupporter = (SMeterQuerySupportObj *)calloc(1, sizeof(SMeterQuerySupportObj)); pSupporter->numOfMeters = pQueryMsg->numOfSids; - pSupporter->pMeterObj = taosInitIntHash(pSupporter->numOfMeters, POINTER_BYTES, taosHashInt); + pSupporter->pMetersHashTable = taosInitHashTable(pSupporter->numOfMeters, taosIntHash_32, false); for (int32_t i = 0; i < pSupporter->numOfMeters; ++i) { - taosAddIntHash(pSupporter->pMeterObj, pMetersObj[i]->sid, (char *)&pMetersObj[i]); + taosAddToHashTable(pSupporter->pMetersHashTable, (const char*) &pMetersObj[i]->sid, sizeof(pMetersObj[i]->sid), (char *)&pMetersObj[i], + POINTER_BYTES); } pSupporter->pMeterSidExtInfo = (SMeterSidExtInfo **)pQueryMsg->pSidExtInfo; diff --git a/src/system/detail/src/vnodeShell.c b/src/system/detail/src/vnodeShell.c index ce1cabe1415c37173b3db1a1d3cc138b7cbef6f7..8a15cc896051a2459d4fd874866109025e44fb6d 100644 --- a/src/system/detail/src/vnodeShell.c +++ b/src/system/detail/src/vnodeShell.c @@ -472,7 +472,8 @@ void vnodeExecuteRetrieveReq(SSchedMsg *pSched) { assert(code != TSDB_CODE_ACTION_IN_PROGRESS); - if (numOfRows == 0 && (pRetrieve->qhandle == (uint64_t)pObj->qhandle) && (code != TSDB_CODE_ACTION_IN_PROGRESS)) { + if (numOfRows == 0 && (pRetrieve->qhandle == (uint64_t)pObj->qhandle) && (code != TSDB_CODE_ACTION_IN_PROGRESS) && + pRetrieve->qhandle != 0) { dTrace("QInfo:%p %s free qhandle code:%d", pObj->qhandle, __FUNCTION__, code); vnodeDecRefCount(pObj->qhandle); pObj->qhandle = NULL; diff --git a/src/util/src/hash.c b/src/util/src/hash.c new file mode 100644 index 0000000000000000000000000000000000000000..a0ca4a96bbf5e8ca07e6cae063d8423af16d2a9c --- /dev/null +++ b/src/util/src/hash.c @@ -0,0 +1,537 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "os.h" + +#include "hash.h" +#include "tlog.h" +#include "ttime.h" +#include "tutil.h" + +static FORCE_INLINE void __wr_lock(void *lock) { +#if defined LINUX + pthread_rwlock_wrlock(lock); +#else + pthread_mutex_lock(lock); +#endif +} + +static FORCE_INLINE void __rd_lock(void *lock) { +#if defined LINUX + pthread_rwlock_rdlock(lock); +#else + pthread_mutex_lock(lock); +#endif +} + +static FORCE_INLINE void __unlock(void *lock) { +#if defined LINUX + pthread_rwlock_unlock(lock); +#else + pthread_mutex_unlock(lock); +#endif +} + +static FORCE_INLINE int32_t __lock_init(void *lock) { +#if defined LINUX + return pthread_rwlock_init(lock, NULL); +#else + return pthread_mutex_init(lock, NULL); +#endif +} + +static FORCE_INLINE void __lock_destroy(void *lock) { +#if defined LINUX + pthread_rwlock_destroy(lock); +#else + pthread_mutex_destroy(lock); +#endif +} + +static FORCE_INLINE int32_t taosHashCapacity(int32_t length) { + int32_t len = MIN(length, HASH_MAX_CAPACITY); + + uint32_t i = 4; + while (i < len) i = (i << 1U); + return i; +} + +/** + * hash key function + * + * @param key key string + * @param len length of key + * @return hash value + */ +static FORCE_INLINE uint32_t taosHashKey(const char *key, uint32_t len) { return MurmurHash3_32(key, len); } + +/** + * inplace update node in hash table + * @param pObj hash table object + * @param pNode data node + */ +static void doUpdateHashTable(HashObj *pObj, SHashNode *pNode) { + if (pNode->prev1) { + pNode->prev1->next = pNode; + } + + if (pNode->next) { + (pNode->next)->prev = pNode; + } + + pTrace("key:%s %p update hash table", pNode->key, pNode); +} + +/** + * get SHashNode from hashlist, nodes from trash are not included. + * @param pObj Cache objection + * @param key key for hash + * @param keyLen key length + * @return + */ +static SHashNode *doGetNodeFromHashTable(HashObj *pObj, const char *key, uint32_t keyLen, uint32_t *hashVal) { + uint32_t hash = (*pObj->hashFp)(key, keyLen); + + int32_t slot = HASH_INDEX(hash, pObj->capacity); + SHashEntry *pEntry = pObj->hashList[slot]; + + SHashNode *pNode = pEntry->next; + while (pNode) { + if ((pNode->keyLen == keyLen) && (memcmp(pNode->key, key, keyLen) == 0)) { + break; + } + + pNode = pNode->next; + } + + if (pNode) { + assert(HASH_INDEX(pNode->hashVal, pObj->capacity) == slot); + } + + // return the calculated hash value, to avoid calculating it again in other functions + if (hashVal != NULL) { + *hashVal = hash; + } + + return pNode; +} + +/** + * resize the hash list if the threshold is reached + * + * @param pObj + */ +static void taosHashTableResize(HashObj *pObj) { + if (pObj->size < pObj->capacity * HASH_DEFAULT_LOAD_FACTOR) { + return; + } + + // double the original capacity + SHashNode *pNode = NULL; + SHashNode *pNext = NULL; + + int32_t newSize = pObj->capacity << 1U; + if (newSize > HASH_MAX_CAPACITY) { + pTrace("current capacity:%d, maximum capacity:%d, no resize applied due to limitation is reached", pObj->capacity, + HASH_MAX_CAPACITY); + return; + } + + int64_t st = taosGetTimestampUs(); + + SHashEntry **pNewEntry = realloc(pObj->hashList, sizeof(SHashEntry*) * newSize); + if (pNewEntry == NULL) { + pTrace("cache resize failed due to out of memory, capacity remain:%d", pObj->capacity); + return; + } + + pObj->hashList = pNewEntry; + for(int32_t i = pObj->capacity; i < newSize; ++i) { + pObj->hashList[i] = calloc(1, sizeof(SHashEntry)); + } + + pObj->capacity = newSize; + + for (int32_t i = 0; i < pObj->capacity; ++i) { + SHashEntry *pEntry = pObj->hashList[i]; + + pNode = pEntry->next; + if (pNode != NULL) { + assert(pNode->prev1 == pEntry && pEntry->num > 0); + } + + while (pNode) { + int32_t j = HASH_INDEX(pNode->hashVal, pObj->capacity); + if (j == i) { // this key resides in the same slot, no need to relocate it + pNode = pNode->next; + } else { + pNext = pNode->next; + + // remove from current slot + assert(pNode->prev1 != NULL); + + if (pNode->prev1 == pEntry) { // first node of the overflow linked list + pEntry->next = pNode->next; + } else { + pNode->prev->next = pNode->next; + } + + pEntry->num--; + assert(pEntry->num >= 0); + + if (pNode->next != NULL) { + (pNode->next)->prev = pNode->prev; + } + + // added into new slot + pNode->next = NULL; + pNode->prev1 = NULL; + + SHashEntry *pNewIndexEntry = pObj->hashList[j]; + + if (pNewIndexEntry->next != NULL) { + assert(pNewIndexEntry->next->prev1 == pNewIndexEntry); + + pNewIndexEntry->next->prev = pNode; + } + + pNode->next = pNewIndexEntry->next; + pNode->prev1 = pNewIndexEntry; + + pNewIndexEntry->next = pNode; + pNewIndexEntry->num++; + + // continue + pNode = pNext; + } + } + } + + int64_t et = taosGetTimestampUs(); + + pTrace("hash table resize completed, new capacity:%d, load factor:%f, elapsed time:%fms", pObj->capacity, + ((double)pObj->size) / pObj->capacity, (et - st) / 1000.0); +} + +/** + * @param capacity maximum slots available for hash elements + * @param fn hash function + * @return + */ +void *taosInitHashTable(uint32_t capacity, _hash_fn_t fn, bool multithreadSafe) { + if (capacity == 0 || fn == NULL) { + return NULL; + } + + HashObj *pObj = (HashObj *)calloc(1, sizeof(HashObj)); + if (pObj == NULL) { + pError("failed to allocate memory, reason:%s", strerror(errno)); + return NULL; + } + + // the max slots is not defined by user + pObj->capacity = taosHashCapacity(capacity); + assert((pObj->capacity & (pObj->capacity - 1)) == 0); + + pObj->hashFp = fn; + + pObj->hashList = (SHashEntry **)calloc(pObj->capacity, sizeof(SHashEntry*)); + if (pObj->hashList == NULL) { + free(pObj); + pError("failed to allocate memory, reason:%s", strerror(errno)); + return NULL; + } + + for(int32_t i = 0; i < pObj->capacity; ++i) { + pObj->hashList[i] = calloc(1, sizeof(SHashEntry)); + } + + if (multithreadSafe && (__lock_init(pObj) != 0)) { + free(pObj->hashList); + free(pObj); + + pError("failed to init lock, reason:%s", strerror(errno)); + return NULL; + } + + return (void *)pObj; +} + +/** + * @param key key of object for hash, usually a null-terminated string + * @param keyLen length of key + * @param pData actually data. required a consecutive memory block, no pointer is allowed + * in pData. Pointer copy causes memory access error. + * @param size size of block + * @return SHashNode + */ +static SHashNode *doCreateHashNode(const char *key, uint32_t keyLen, const char *pData, size_t dataSize, + uint32_t hashVal) { + size_t totalSize = dataSize + sizeof(SHashNode) + keyLen; + + SHashNode *pNewNode = calloc(1, totalSize); + if (pNewNode == NULL) { + pError("failed to allocate memory, reason:%s", strerror(errno)); + return NULL; + } + + memcpy(pNewNode->data, pData, dataSize); + + pNewNode->key = pNewNode->data + dataSize; + memcpy(pNewNode->key, key, keyLen); + pNewNode->keyLen = keyLen; + + pNewNode->hashVal = hashVal; + + return pNewNode; +} + +static SHashNode *doUpdateHashNode(SHashNode *pNode, const char *key, uint32_t keyLen, const char *pData, + size_t dataSize) { + size_t size = dataSize + sizeof(SHashNode) + keyLen; + + SHashNode *pNewNode = (SHashNode *)realloc(pNode, size); + if (pNewNode == NULL) { + return NULL; + } + + memcpy(pNewNode->data, pData, dataSize); + + pNewNode->key = pNewNode->data + dataSize; + + assert(memcmp(pNewNode->key, key, keyLen) == 0 && keyLen == pNewNode->keyLen); + + memcpy(pNewNode->key, key, keyLen); + return pNewNode; +} + +/** + * insert the hash node at the front of the linked list + * + * @param pObj + * @param pNode + */ +static void doAddToHashTable(HashObj *pObj, SHashNode *pNode) { + assert(pNode != NULL); + + int32_t index = HASH_INDEX(pNode->hashVal, pObj->capacity); + SHashEntry *pEntry = pObj->hashList[index]; + + pNode->next = pEntry->next; + + if (pEntry->next) { + pEntry->next->prev = pNode; + } + + pEntry->next = pNode; + pNode->prev1 = pEntry; + + pEntry->num++; + + pObj->size++; + + char key[512] = {0}; + memcpy(key, pNode->key, MIN(512, pNode->keyLen)); + + pTrace("key:%s %p add to hash table", key, pNode); +} + +/** + * add data node into hash table + * @param pObj hash object + * @param pNode hash node + */ +int32_t taosAddToHashTable(HashObj *pObj, const char *key, uint32_t keyLen, void *data, uint32_t size) { + if (pObj->multithreadSafe) { + __wr_lock(&pObj->lock); + } + + uint32_t hashVal = 0; + SHashNode *pNode = doGetNodeFromHashTable(pObj, key, keyLen, &hashVal); + + if (pNode == NULL) { // no data in hash table with the specified key, add it into hash table + taosHashTableResize(pObj); + + SHashNode *pNewNode = doCreateHashNode(key, keyLen, data, size, hashVal); + if (pNewNode == NULL) { + if (pObj->multithreadSafe) { + __unlock(&pObj->lock); + } + + return -1; + } + + doAddToHashTable(pObj, pNewNode); + } else { + SHashNode *pNewNode = doUpdateHashNode(pNode, key, keyLen, data, size); + if (pNewNode == NULL) { + if (pObj->multithreadSafe) { + __unlock(&pObj->lock); + } + + return -1; + } + + doUpdateHashTable(pObj, pNewNode); + } + + if (pObj->multithreadSafe) { + __unlock(&pObj->lock); + } + + return 0; +} + +char *taosGetDataFromHash(HashObj *pObj, const char *key, uint32_t keyLen) { + if (pObj->multithreadSafe) { + __rd_lock(&pObj->lock); + } + + uint32_t hashVal = 0; + SHashNode *pNode = doGetNodeFromHashTable(pObj, key, keyLen, &hashVal); + + if (pObj->multithreadSafe) { + __unlock(&pObj->lock); + } + + if (pNode != NULL) { + assert(pNode->hashVal == hashVal); + + return pNode->data; + } else { + return NULL; + } +} + +/** + * remove node in hash list + * @param pObj + * @param pNode + */ +void taosDeleteFromHashTable(HashObj *pObj, const char *key, uint32_t keyLen) { + if (pObj->multithreadSafe) { + __wr_lock(&pObj->lock); + } + + SHashNode *pNode = doGetNodeFromHashTable(pObj, key, keyLen, NULL); + if (pNode == NULL) { + if (pObj->multithreadSafe) { + __unlock(&pObj->lock); + } + + return; + } + + SHashNode *pNext = pNode->next; + if (pNode->prev != NULL) { + pNode->prev->next = pNext; + } + + if (pNext != NULL) { + pNext->prev = pNode->prev; + } + + uint32_t index = HASH_INDEX(pNode->hashVal, pObj->capacity); + SHashEntry *pEntry = pObj->hashList[index]; + pEntry->num--; + + pObj->size--; + + pNode->next = NULL; + pNode->prev = NULL; + + pTrace("key:%s %p remove from hash table", pNode->key, pNode); + tfree(pNode); + + if (pObj->multithreadSafe) { + __unlock(&pObj->lock); + } +} + +void taosCleanUpHashTable(void *handle) { + HashObj *pObj = (HashObj *)handle; + if (pObj == NULL || pObj->capacity <= 0) return; + + SHashNode *pNode, *pNext; + + if (pObj->multithreadSafe) { + __wr_lock(&pObj->lock); + } + + if (pObj->hashList) { + for (int32_t i = 0; i < pObj->capacity; ++i) { + SHashEntry *pEntry = pObj->hashList[i]; + pNode = pEntry->next; + + while (pNode) { + pNext = pNode->next; + free(pNode); + pNode = pNext; + } + } + + free(pObj->hashList); + } + + if (pObj->multithreadSafe) { + __unlock(&pObj->lock); + __lock_destroy(&pObj->lock); + } + + memset(pObj, 0, sizeof(HashObj)); + free(pObj); +} + +// for profile only +int32_t taosGetHashMaxOverflowLength(HashObj* pObj) { + if (pObj == NULL || pObj->size == 0) { + return 0; + } + + int32_t num = 0; + + for(int32_t i = 0; i < pObj->size; ++i) { + SHashEntry *pEntry = pObj->hashList[i]; + if (num < pEntry->num) { + num = pEntry->num; + } + } + + return num; +} + +int32_t taosCheckHashTable(HashObj *pObj) { + for(int32_t i = 0; i < pObj->capacity; ++i) { + SHashEntry *pEntry = pObj->hashList[i]; + + SHashNode* pNode = pEntry->next; + if (pNode != NULL) { + assert(pEntry == pNode->prev1); + int32_t num = 1; + + SHashNode* pNext = pNode->next; + + while(pNext) { + assert(pNext->prev == pNode); + + pNode = pNext; + pNext = pNext->next; + num ++; + } + + assert(num == pEntry->num); + } + } +} diff --git a/src/util/src/thashutil.c b/src/util/src/thashutil.c index b6b3ea682ef945a838f67ca227c8033624234725..6166929c729e584c8c34a77a3b1ce5df760845f4 100644 --- a/src/util/src/thashutil.c +++ b/src/util/src/thashutil.c @@ -8,6 +8,7 @@ * */ #include "tutil.h" +#include "hashutil.h" #define ROTL32(x, r) ((x) << (r) | (x) >> (32 - (r))) @@ -67,7 +68,7 @@ static void MurmurHash3_32_s(const void *key, int len, uint32_t seed, void *out) *(uint32_t *)out = h1; } -uint32_t MurmurHash3_32(const void *key, int len) { +uint32_t MurmurHash3_32(const char *key, uint32_t len) { const int32_t hashSeed = 0x12345678; uint32_t val = 0; @@ -75,3 +76,26 @@ uint32_t MurmurHash3_32(const void *key, int len) { return val; } + +uint32_t taosIntHash_32(const char *key, uint32_t UNUSED_PARAM(len)) { return *(uint32_t *)key; } + +uint32_t taosIntHash_64(const char *key, uint32_t UNUSED_PARAM(len)) { + uint64_t val = *(uint64_t *)key; + + uint64_t hash = val >> 16U; + hash += (val & 0xFFFFU); + + return hash; +} + +_hash_fn_t taosGetDefaultHashFunction(int32_t type) { + _hash_fn_t fn = NULL; + switch(type) { + case TSDB_DATA_TYPE_BIGINT: fn = taosIntHash_64;break; + case TSDB_DATA_TYPE_BINARY: fn = MurmurHash3_32;break; + case TSDB_DATA_TYPE_INT: fn = taosIntHash_32; break; + default: fn = taosIntHash_32;break; + } + + return fn; + } \ No newline at end of file diff --git a/src/util/src/ttokenizer.c b/src/util/src/ttokenizer.c index 0fbc4dc93503db57dd1fd98e79516cfbd89a5515..1caec2736e8a92aa41696334cfdc660a18844868 100644 --- a/src/util/src/ttokenizer.c +++ b/src/util/src/ttokenizer.c @@ -14,11 +14,13 @@ */ #include "os.h" +#include "hashutil.h" #include "shash.h" #include "tutil.h" #include "tsqldef.h" #include "tstoken.h" #include "ttypes.h" +#include "hash.h" // All the keywords of the SQL language are stored in a hash table typedef struct SKeyword { @@ -225,11 +227,9 @@ static SKeyword keywordTable[] = { {"STABLE", TK_STABLE}, {"FILE", TK_FILE}, {"VNODES", TK_VNODES}, + {"UNION", TK_UNION}, }; -/* This is the hash table */ -static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; - static const char isIdChar[] = { /* x0 x1 x2 x3 x4 x5 x6 x7 x8 x9 xA xB xC xD xE xF */ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, /* 0x */ @@ -243,27 +243,22 @@ static const char isIdChar[] = { }; static void* KeywordHashTable = NULL; -int tSQLKeywordCode(const char* z, int n) { - int i; - static char needInit = 1; - if (needInit) { - // Initialize the keyword hash table - pthread_mutex_lock(&mutex); - - // double check - if (needInit) { - int nk = tListLen(keywordTable); - - KeywordHashTable = taosInitStrHash(nk, POINTER_BYTES, taosHashStringStep1); - for (i = 0; i < nk; i++) { - keywordTable[i].len = strlen(keywordTable[i].name); - void* ptr = &keywordTable[i]; - taosAddStrHash(KeywordHashTable, (char*)keywordTable[i].name, (void*)&ptr); - } - needInit = 0; - } - pthread_mutex_unlock(&mutex); + +static void doInitKeywordsTable() { + int numOfEntries = tListLen(keywordTable); + + KeywordHashTable = taosInitHashTable(numOfEntries, MurmurHash3_32, false); + for (int32_t i = 0; i < numOfEntries; i++) { + keywordTable[i].len = strlen(keywordTable[i].name); + void* ptr = &keywordTable[i]; + taosAddToHashTable(KeywordHashTable, keywordTable[i].name, keywordTable[i].len, (void*)&ptr, POINTER_BYTES); } +} + +static pthread_once_t keywordsHashTableInit = PTHREAD_ONCE_INIT; + +int tSQLKeywordCode(const char* z, int n) { + pthread_once(&keywordsHashTableInit, doInitKeywordsTable); char key[128] = {0}; for (int32_t j = 0; j < n; ++j) { @@ -274,7 +269,7 @@ int tSQLKeywordCode(const char* z, int n) { } } - SKeyword** pKey = (SKeyword**)taosGetStrHashData(KeywordHashTable, key); + SKeyword** pKey = (SKeyword**)taosGetDataFromHash(KeywordHashTable, key, n); if (pKey != NULL) { return (*pKey)->type; } else {