未验证 提交 04750d90 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #18905 from taosdata/feature/3_liaohj

refactor: do multiple refactor and improve some query perf.
......@@ -123,21 +123,37 @@ ELSE ()
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Werror -Wno-literal-suffix -Werror=return-type -fPIC -gdwarf-2 -g3 -Wformat=2 -Wno-format-nonliteral -Wno-format-truncation -Wno-format-y2k")
ENDIF ()
IF (TD_INTEL_64 OR TD_INTEL_32)
ADD_DEFINITIONS("-msse4.2")
IF("${FMA_SUPPORT}" MATCHES "true")
MESSAGE(STATUS "fma function supported")
ADD_DEFINITIONS("-mfma")
ELSE ()
MESSAGE(STATUS "fma function NOT supported")
INCLUDE(CheckCCompilerFlag)
IF (("${CMAKE_C_COMPILER_ID}" MATCHES "Clang") OR ("${CMAKE_C_COMPILER_ID}" MATCHES "AppleClang"))
SET(COMPILER_SUPPORT_SSE42 true)
MESSAGE(STATUS "Always enable sse4.2 for Clang/AppleClang")
ELSE()
CHECK_C_COMPILER_FLAG("-msse4.2" COMPILER_SUPPORT_SSE42)
ENDIF()
CHECK_C_COMPILER_FLAG("-mfma" COMPILER_SUPPORT_FMA)
CHECK_C_COMPILER_FLAG("-mavx" COMPILER_SUPPORT_AVX)
CHECK_C_COMPILER_FLAG("-mavx2" COMPILER_SUPPORT_AVX2)
IF (COMPILER_SUPPORT_SSE42)
SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -msse4.2")
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -msse4.2")
ENDIF()
IF (COMPILER_SUPPORT_FMA)
SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -mfma")
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -mfma")
ENDIF()
IF ("${SIMD_SUPPORT}" MATCHES "true")
IF (COMPILER_SUPPORT_AVX)
SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -mavx")
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -mavx")
ENDIF()
IF("${SIMD_SUPPORT}" MATCHES "true")
ADD_DEFINITIONS("-mavx -mavx2")
MESSAGE(STATUS "SIMD instructions (AVX/AVX2) is ACTIVATED")
ELSE()
MESSAGE(STATUS "SIMD instruction (AVX/AVX2)is NOT ACTIVATED")
IF (COMPILER_SUPPORT_AVX2)
SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -mavx2")
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -mavx2")
ENDIF()
ENDIF ()
MESSAGE(STATUS "SIMD instructions (AVX/AVX2) is ACTIVATED")
ENDIF()
ENDIF ()
......@@ -147,7 +147,7 @@ ELSE ()
ENDIF ()
ENDIF ()
MESSAGE(STATUS "platform arch:" ${PLATFORM_ARCH_STR})
MESSAGE(STATUS "Platform arch:" ${PLATFORM_ARCH_STR})
MESSAGE("C Compiler: ${CMAKE_C_COMPILER} (${CMAKE_C_COMPILER_ID}, ${CMAKE_C_COMPILER_VERSION})")
MESSAGE("CXX Compiler: ${CMAKE_CXX_COMPILER} (${CMAKE_C_COMPILER_ID}, ${CMAKE_CXX_COMPILER_VERSION})")
......@@ -195,6 +195,7 @@ typedef struct SDataBlockInfo {
uint32_t capacity;
SBlockID id;
int16_t hasVarCol;
int16_t dataLoad; // denote if the data is loaded or not
// TODO: optimize and remove following
int64_t version; // used for stream, and need serialization
......
......@@ -36,7 +36,7 @@ extern int64_t tsStreamMax;
extern float tsNumOfCores;
extern int64_t tsTotalMemoryKB;
extern char *tsProcPath;
extern char tsSIMDEnable;
extern char tsSIMDBuiltins;
extern char tsSSE42Enable;
extern char tsAVXEnable;
extern char tsAVX2Enable;
......
......@@ -358,7 +358,11 @@ size_t blockDataGetNumOfCols(const SSDataBlock* pBlock) { return taosArrayGetSiz
size_t blockDataGetNumOfRows(const SSDataBlock* pBlock) { return pBlock->info.rows; }
int32_t blockDataUpdateTsWindow(SSDataBlock* pDataBlock, int32_t tsColumnIndex) {
if (pDataBlock == NULL || pDataBlock->info.rows <= 0) {
if (pDataBlock->info.rows > 0) {
// ASSERT(pDataBlock->info.dataLoad == 1);
}
if (pDataBlock == NULL || pDataBlock->info.rows <= 0 || pDataBlock->info.dataLoad == 0) {
return 0;
}
......@@ -1157,13 +1161,14 @@ void blockDataEmpty(SSDataBlock* pDataBlock) {
}
pInfo->rows = 0;
pInfo->dataLoad = 0;
pInfo->window.ekey = 0;
pInfo->window.skey = 0;
}
// todo temporarily disable it
static int32_t doEnsureCapacity(SColumnInfoData* pColumn, const SDataBlockInfo* pBlockInfo, uint32_t numOfRows, bool clearPayload) {
ASSERT(numOfRows > 0 /*&& pBlockInfo->capacity >= pBlockInfo->rows*/);
ASSERT(numOfRows > 0);
if (numOfRows <= pBlockInfo->capacity) {
return TSDB_CODE_SUCCESS;
}
......@@ -1220,7 +1225,7 @@ static int32_t doEnsureCapacity(SColumnInfoData* pColumn, const SDataBlockInfo*
return TSDB_CODE_SUCCESS;
}
void colInfoDataCleanup(SColumnInfoData* pColumn, uint32_t numOfRows) {
void colInfoDataCleanup(SColumnInfoData* pColumn, uint32_t numOfRows) {
pColumn->hasNull = false;
if (IS_VAR_DATA_TYPE(pColumn->info.type)) {
......@@ -2427,6 +2432,7 @@ const char* blockDecode(SSDataBlock* pBlock, const char* pData) {
pStart += colLen[i];
}
pBlock->info.dataLoad = 1;
pBlock->info.rows = numOfRows;
ASSERT(pStart - pData == dataLen);
return pStart;
......
......@@ -341,7 +341,7 @@ static int32_t taosAddSystemCfg(SConfig *pCfg) {
if (cfgAddBool(pCfg, "AVX", tsAVXEnable, 0) != 0) return -1;
if (cfgAddBool(pCfg, "AVX2", tsAVX2Enable, 0) != 0) return -1;
if (cfgAddBool(pCfg, "FMA", tsFMAEnable, 0) != 0) return -1;
if (cfgAddBool(pCfg, "SIMD-Supported", tsSIMDEnable, 0) != 0) return -1;
if (cfgAddBool(pCfg, "SIMD-builtins", tsSIMDBuiltins, 0) != 0) return -1;
if (cfgAddInt64(pCfg, "openMax", tsOpenMax, 0, INT64_MAX, 1) != 0) return -1;
if (cfgAddInt64(pCfg, "streamMax", tsStreamMax, 0, INT64_MAX, 1) != 0) return -1;
......
......@@ -533,6 +533,7 @@ int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReader* pReader) {
pBlock->info.id.uid = pReader->msgIter.uid;
pBlock->info.rows = pReader->msgIter.numOfRows;
pBlock->info.version = pReader->pMsg->version;
pBlock->info.dataLoad = 1;
while ((row = tGetSubmitBlkNext(&pReader->blkIter)) != NULL) {
tdSTSRowIterReset(&iter, row);
......
......@@ -1143,6 +1143,7 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanIn
i += 1;
}
pResBlock->info.dataLoad = 1;
pResBlock->info.rows = dumpedRows;
pDumpInfo->rowIndex += step * dumpedRows;
......@@ -2538,6 +2539,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
_end:
pResBlock->info.id.uid = (pBlockScanInfo != NULL) ? pBlockScanInfo->uid : 0;
pResBlock->info.dataLoad = 1;
blockDataUpdateTsWindow(pResBlock, pReader->suppInfo.slotId[0]);
setComposedBlockFlag(pReader, true);
......@@ -3622,6 +3624,7 @@ int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow*
i += 1;
}
pBlock->info.dataLoad = 1;
pBlock->info.rows += 1;
pScanInfo->lastKey = pTSRow->ts;
return TSDB_CODE_SUCCESS;
......@@ -3669,6 +3672,7 @@ int32_t doAppendRowFromFileBlock(SSDataBlock* pResBlock, STsdbReader* pReader, S
i += 1;
}
pResBlock->info.dataLoad = 1;
pResBlock->info.rows += 1;
return TSDB_CODE_SUCCESS;
}
......
......@@ -510,6 +510,7 @@ int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, char* pData, SArray* pCo
blockDataEnsureCapacity(pRes, pBlock->info.rows);
// data from mnode
pRes->info.dataLoad = 1;
pRes->info.rows = pBlock->info.rows;
relocateColumnData(pRes, pColList, pBlock->pDataBlock, false);
blockDataDestroy(pBlock);
......
......@@ -1080,7 +1080,7 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprS
qDebug("%s result generated, rows:%d, groupId:%" PRIu64, GET_TASKID(pTaskInfo), pBlock->info.rows,
pBlock->info.id.groupId);
pBlock->info.dataLoad = 1;
blockDataUpdateTsWindow(pBlock, 0);
return 0;
}
......@@ -2546,6 +2546,7 @@ int32_t buildDataBlockFromGroupRes(SOperatorInfo* pOperator, SStreamState* pStat
pBlock->info.rows += pRow->numOfRows;
releaseOutputBuf(pState, &key, pRow);
}
pBlock->info.dataLoad = 1;
blockDataUpdateTsWindow(pBlock, 0);
return TSDB_CODE_SUCCESS;
}
......@@ -2635,6 +2636,7 @@ int32_t buildSessionResultDataBlock(SOperatorInfo* pOperator, SStreamState* pSta
}
}
pBlock->info.dataLoad = 1;
pBlock->info.rows += pRow->numOfRows;
// saveSessionDiscBuf(pState, pKey, pVal, size);
releaseOutputBuf(pState, NULL, pRow);
......
......@@ -698,6 +698,7 @@ static SSDataBlock* buildPartitionResult(SOperatorInfo* pOperator) {
pInfo->pageIndex += 1;
releaseBufPage(pInfo->pBuf, page);
pInfo->binfo.pRes->info.dataLoad = 1;
blockDataUpdateTsWindow(pInfo->binfo.pRes, 0);
pInfo->binfo.pRes->info.id.groupId = pGroupInfo->groupId;
......@@ -960,6 +961,8 @@ static SSDataBlock* buildStreamPartitionResult(SOperatorInfo* pOperator) {
}
taosArrayDestroy(pParInfo->rowIds);
pParInfo->rowIds = NULL;
pDest->info.dataLoad = 1;
blockDataUpdateTsWindow(pDest, pInfo->tsColIndex);
pDest->info.id.groupId = pParInfo->groupId;
pOperator->resultInfo.totalRows += pDest->info.rows;
......
......@@ -87,11 +87,11 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t
}
int32_t numOfCols = 0;
SSDataBlock* pResBlock = createDataBlockFromDescNode(pJoinNode->node.pOutputDataBlockDesc);
pInfo->pRes = createDataBlockFromDescNode(pJoinNode->node.pOutputDataBlockDesc);
SExprInfo* pExprInfo = createExprInfo(pJoinNode->pTargets, NULL, &numOfCols);
initResultSizeInfo(&pOperator->resultInfo, 4096);
pInfo->pRes = pResBlock;
blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
setOperatorInfo(pOperator, "MergeJoinOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN, false, OP_NOT_OPENED, pInfo, pTaskInfo);
pOperator->exprSupp.pExprInfo = pExprInfo;
......@@ -401,6 +401,7 @@ static void doMergeJoinImpl(struct SOperatorInfo* pOperator, SSDataBlock* pRes)
// the pDataBlock are always the same one, no need to call this again
pRes->info.rows = nrows;
pRes->info.dataLoad = 1;
if (pRes->info.rows >= pOperator->resultInfo.threshold) {
break;
}
......@@ -412,7 +413,7 @@ SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator) {
SSDataBlock* pRes = pJoinInfo->pRes;
blockDataCleanup(pRes);
blockDataEnsureCapacity(pRes, 4096);
while (true) {
int32_t numOfRowsBefore = pRes->info.rows;
doMergeJoinImpl(pOperator, pRes);
......
......@@ -654,6 +654,7 @@ static void setPseudoOutputColInfo(SSDataBlock* pResult, SqlFunctionCtx* pCtx, S
int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock, SqlFunctionCtx* pCtx,
int32_t numOfOutput, SArray* pPseudoList) {
setPseudoOutputColInfo(pResult, pCtx, pPseudoList);
pResult->info.dataLoad = 1;
if (pSrcBlock == NULL) {
for (int32_t k = 0; k < numOfOutput; ++k) {
......
......@@ -110,9 +110,9 @@ static bool overlapWithTimeWindow(SInterval* pInterval, SDataBlockInfo* pBlockIn
if (order == TSDB_ORDER_ASC) {
w = getAlignQueryTimeWindow(pInterval, pInterval->precision, pBlockInfo->window.skey);
assert(w.ekey >= pBlockInfo->window.skey);
ASSERT(w.ekey >= pBlockInfo->window.skey);
if (TMAX(w.skey, pBlockInfo->window.skey) <= TMIN(w.ekey, pBlockInfo->window.ekey)) {
if (w.ekey < pBlockInfo->window.ekey) {
return true;
}
......@@ -122,16 +122,16 @@ static bool overlapWithTimeWindow(SInterval* pInterval, SDataBlockInfo* pBlockIn
break;
}
assert(w.ekey > pBlockInfo->window.ekey);
ASSERT(w.ekey > pBlockInfo->window.ekey);
if (TMAX(w.skey, pBlockInfo->window.skey) <= pBlockInfo->window.ekey) {
return true;
}
}
} else {
w = getAlignQueryTimeWindow(pInterval, pInterval->precision, pBlockInfo->window.ekey);
assert(w.skey <= pBlockInfo->window.ekey);
ASSERT(w.skey <= pBlockInfo->window.ekey);
if (TMAX(w.skey, pBlockInfo->window.skey) <= TMIN(w.ekey, pBlockInfo->window.ekey)) {
if (w.skey > pBlockInfo->window.skey) {
return true;
}
......@@ -1342,6 +1342,7 @@ static int32_t generateScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock,
}
pDestBlock->info.type = STREAM_CLEAR;
pDestBlock->info.version = pSrcBlock->info.version;
pDestBlock->info.dataLoad = 1;
blockDataUpdateTsWindow(pDestBlock, 0);
return code;
}
......@@ -1450,6 +1451,7 @@ static void checkUpdateData(SStreamScanInfo* pInfo, bool invertible, SSDataBlock
}
if (out && pInfo->pUpdateDataRes->info.rows > 0) {
pInfo->pUpdateDataRes->info.version = pBlock->info.version;
pInfo->pUpdateDataRes->info.dataLoad = 1;
blockDataUpdateTsWindow(pInfo->pUpdateDataRes, 0);
pInfo->pUpdateDataRes->info.type = pInfo->partitionSup.needCalc ? STREAM_DELETE_DATA : STREAM_CLEAR;
}
......@@ -1512,6 +1514,7 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock
doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
}
pInfo->pRes->info.dataLoad = 1;
blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex);
blockDataFreeRes((SSDataBlock*)pBlock);
......@@ -1793,6 +1796,7 @@ FETCH_NEXT_BLOCK:
// TODO move into scan
pBlock->info.calWin.skey = INT64_MIN;
pBlock->info.calWin.ekey = INT64_MAX;
pBlock->info.dataLoad = 1;
blockDataUpdateTsWindow(pBlock, 0);
switch (pBlock->info.type) {
case STREAM_NORMAL:
......@@ -1970,6 +1974,7 @@ FETCH_NEXT_BLOCK:
}
doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
pInfo->pRes->info.dataLoad = 1;
blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex);
if (pBlockInfo->rows > 0 || pInfo->pUpdateDataRes->info.rows > 0) {
......
......@@ -105,6 +105,7 @@ void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle) {
}
}
pBlock->info.dataLoad = 1;
pBlock->info.rows += 1;
}
......@@ -698,6 +699,7 @@ SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pData
pInfo->limitInfo.numOfOutputRows += p->info.rows;
pDataBlock->info.rows = p->info.rows;
pDataBlock->info.id.groupId = pInfo->groupId;
pDataBlock->info.dataLoad = 1;
}
qDebug("%s get sorted block, groupId:0x%" PRIx64 " rows:%d", GET_TASKID(pTaskInfo), pDataBlock->info.id.groupId,
......
......@@ -1037,9 +1037,10 @@ SResultRowPosition addToOpenWindowList(SResultRowInfo* pResultRowInfo, const SRe
int64_t* extractTsCol(SSDataBlock* pBlock, const SIntervalAggOperatorInfo* pInfo) {
TSKEY* tsCols = NULL;
if (pBlock->pDataBlock != NULL) {
if (pBlock->pDataBlock != NULL && pBlock->info.dataLoad == 1) {
SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex);
tsCols = (int64_t*)pColDataInfo->pData;
ASSERT(tsCols[0] != 0);
// no data in primary ts
if (tsCols[0] == 0 && tsCols[pBlock->info.rows - 1] == 0) {
......@@ -1083,8 +1084,6 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) {
// the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pSup, pBlock, pInfo->inputOrder, scanFlag, true);
blockDataUpdateTsWindow(pBlock, pInfo->primaryTsIndex);
hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, scanFlag);
}
......
......@@ -166,6 +166,7 @@ SSDataBlock* get2ColsDummyBlock(SOperatorInfo* pOperator) {
pInfo->current += 1;
pBlock->info.dataLoad = 1;
blockDataUpdateTsWindow(pBlock, 0);
return pBlock;
}
......
......@@ -514,7 +514,7 @@ int32_t avgFunction(SqlFunctionCtx* pCtx) {
numOfElem = pInput->numOfRows;
pAvgRes->count += pInput->numOfRows;
bool simdAvailable = tsAVXEnable && tsSIMDEnable && (numOfRows > THRESHOLD_SIZE);
bool simdAvailable = tsAVXEnable && tsSIMDBuiltins && (numOfRows > THRESHOLD_SIZE);
switch(type) {
case TSDB_DATA_TYPE_UTINYINT:
......
......@@ -369,7 +369,7 @@ static int32_t findFirstValPosition(const SColumnInfoData* pCol, int32_t start,
static void handleInt8Col(const void* data, int32_t start, int32_t numOfRows, SMinmaxResInfo* pBuf, bool isMinFunc,
bool signVal) {
// AVX2 version to speedup the loop
if (tsAVX2Enable && tsSIMDEnable) {
if (tsAVX2Enable && tsSIMDBuiltins) {
pBuf->v = i8VectorCmpAVX2(data, numOfRows, isMinFunc, signVal);
} else {
if (!pBuf->assign) {
......@@ -403,7 +403,7 @@ static void handleInt8Col(const void* data, int32_t start, int32_t numOfRows, SM
static void handleInt16Col(const void* data, int32_t start, int32_t numOfRows, SMinmaxResInfo* pBuf, bool isMinFunc,
bool signVal) {
// AVX2 version to speedup the loop
if (tsAVX2Enable && tsSIMDEnable) {
if (tsAVX2Enable && tsSIMDBuiltins) {
pBuf->v = i16VectorCmpAVX2(data, numOfRows, isMinFunc, signVal);
} else {
if (!pBuf->assign) {
......@@ -437,7 +437,7 @@ static void handleInt16Col(const void* data, int32_t start, int32_t numOfRows, S
static void handleInt32Col(const void* data, int32_t start, int32_t numOfRows, SMinmaxResInfo* pBuf, bool isMinFunc,
bool signVal) {
// AVX2 version to speedup the loop
if (tsAVX2Enable && tsSIMDEnable) {
if (tsAVX2Enable && tsSIMDBuiltins) {
pBuf->v = i32VectorCmpAVX2(data, numOfRows, isMinFunc, signVal);
} else {
if (!pBuf->assign) {
......@@ -500,7 +500,7 @@ static void handleFloatCol(SColumnInfoData* pCol, int32_t start, int32_t numOfRo
float* val = (float*)&pBuf->v;
// AVX version to speedup the loop
if (tsAVXEnable && tsSIMDEnable) {
if (tsAVXEnable && tsSIMDBuiltins) {
*val = floatVectorCmpAVX(pData, numOfRows, isMinFunc);
} else {
if (!pBuf->assign) {
......@@ -530,7 +530,7 @@ static void handleDoubleCol(SColumnInfoData* pCol, int32_t start, int32_t numOfR
double* val = (double*)&pBuf->v;
// AVX version to speedup the loop
if (tsAVXEnable && tsSIMDEnable) {
if (tsAVXEnable && tsSIMDBuiltins) {
*val = (double)doubleVectorCmpAVX(pData, numOfRows, isMinFunc);
} else {
if (!pBuf->assign) {
......
......@@ -37,7 +37,7 @@ float tsNumOfCores = 0;
int64_t tsTotalMemoryKB = 0;
char *tsProcPath = NULL;
char tsSIMDEnable = 0;
char tsSIMDBuiltins = 0;
char tsSSE42Enable = 0;
char tsAVXEnable = 0;
char tsAVX2Enable = 0;
......
......@@ -485,11 +485,11 @@ int32_t taosGetCpuInstructions(char* sse42, char* avx, char* avx2, char* fma) {
#ifdef _TD_X86_
// Since the compiler is not support avx/avx2 instructions, the global variables always need to be
// set to be false
#if __AVX__ || __AVX2__
tsSIMDEnable = true;
#else
tsSIMDEnable = false;
#endif
//#if __AVX__ || __AVX2__
// tsSIMDBuiltins = true;
//#else
// tsSIMDBuiltins = false;
//#endif
uint32_t eax = 0, ebx = 0, ecx = 0, edx = 0;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册