diff --git a/include/common/tdatablock.h b/include/common/tdatablock.h index 6f933c663eba534250bc9cb77cfd545212551dd8..15f3246013c5d289aa6689d55de162c435153344 100644 --- a/include/common/tdatablock.h +++ b/include/common/tdatablock.h @@ -203,11 +203,10 @@ void colInfoDataCleanup(SColumnInfoData* pColumn, uint32_t numOfRows); void blockDataCleanup(SSDataBlock* pDataBlock); size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize); -void* blockDataDestroy(SSDataBlock* pBlock); int32_t blockDataTrimFirstNRows(SSDataBlock* pBlock, size_t n); -SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock); +SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData); void blockDebugShowData(const SArray* dataBlocks); diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 52262eeaef17f19f5d6d0fb875eb9f8d4f604ade..96a7230ff3df8ef336d457b8a23231522f2ad216 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -596,12 +596,15 @@ void doSetOneRowPtr(SReqResultInfo* pResultInfo) { pResultInfo->row[i] = varDataVal(pStart); } else { pResultInfo->row[i] = NULL; + pResultInfo->length[i] = 0; } } else { if (!colDataIsNull_f(pCol->nullbitmap, pResultInfo->current)) { pResultInfo->row[i] = pResultInfo->pCol[i].pData + bytes * pResultInfo->current; + pResultInfo->length[i] = bytes; } else { pResultInfo->row[i] = NULL; + pResultInfo->length[i] = 0; } } } diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index 0b88a75a7b293df68fb4825ba0e23dcdb60588dd..e293c8923d3d5f1b329cbe71cb5f7e44ef301820 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -410,7 +410,11 @@ bool taos_is_null(TAOS_RES *res, int32_t row, int32_t col) { } SResultColumn *pCol = &pResultInfo->pCol[col]; - return colDataIsNull_f(pCol->nullbitmap, row); + if (IS_VAR_DATA_TYPE(pResultInfo->fields[col].type)) { + return (pCol->offset[row] == -1); + } else { + return colDataIsNull_f(pCol->nullbitmap, row); + } } bool taos_is_update_query(TAOS_RES *res) { return taos_num_fields(res) == 0; } diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index f5d8d7ec67be194a2aeb685359af0c7d65958ecf..2730723a1734bd37c0da324538fbba48bb4f34ec 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -459,11 +459,10 @@ TEST(testCase, create_multiple_tables) { taos_free_result(pRes); - for (int32_t i = 0; i < 25000; ++i) { + for (int32_t i = 0; i < 500; i += 2) { char sql[512] = {0}; snprintf(sql, tListLen(sql), - "create table t_x_%d using st1 tags(2) t_x_%d using st1 tags(5) t_x_%d using st1 tags(911)", i, - (i + 1) * 30, (i + 2) * 40); + "create table t_x_%d using st1 tags(2) t_x_%d using st1 tags(5)", i, i + 1); TAOS_RES* pres = taos_query(pConn, sql); if (taos_errno(pres) != 0) { printf("failed to create table %d\n, reason:%s", i, taos_errstr(pres)); @@ -653,6 +652,7 @@ TEST(testCase, projection_query_stables) { taos_free_result(pRes); taos_close(pConn); } + #endif TEST(testCase, agg_query_tables) { @@ -662,7 +662,7 @@ TEST(testCase, agg_query_tables) { TAOS_RES* pRes = taos_query(pConn, "use abc1"); taos_free_result(pRes); - pRes = taos_query(pConn, "select length('abc') from tu"); + pRes = taos_query(pConn, "select * from test_block_raw.all_type"); if (taos_errno(pRes) != 0) { printf("failed to select from table, reason:%s\n", taos_errstr(pRes)); taos_free_result(pRes); @@ -673,6 +673,10 @@ TEST(testCase, agg_query_tables) { TAOS_FIELD* pFields = taos_fetch_fields(pRes); int32_t numOfFields = taos_num_fields(pRes); + int32_t n = 0; + void* data = NULL; + int32_t code = taos_fetch_raw_block(pRes, &n, &data); + char str[512] = {0}; while ((pRow = taos_fetch_row(pRes)) != NULL) { int32_t* length = taos_fetch_lengths(pRes); diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index dad5805c6698b1c43920e9e4f854ab01d8217e29..beabc1b6eb276b06c4b3e87c917b09b11df41e29 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -1149,10 +1149,11 @@ void* blockDataDestroy(SSDataBlock* pBlock) { return NULL; } -SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock) { +SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData) { if(pDataBlock == NULL){ return NULL; } + int32_t numOfCols = pDataBlock->info.numOfCols; SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock)); @@ -1160,6 +1161,7 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock) { pBlock->info.numOfCols = numOfCols; pBlock->info.hasVarCol = pDataBlock->info.hasVarCol; + pBlock->info.rowSize = pDataBlock->info.rows; for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData colInfo = {0}; @@ -1168,6 +1170,23 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock) { taosArrayPush(pBlock->pDataBlock, &colInfo); } + if (copyData) { + for (int32_t i = 0; i < numOfCols; ++i) { + SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, i); + SColumnInfoData* pSrc = taosArrayGet(pDataBlock->pDataBlock, i); + + int32_t code = colInfoDataEnsureCapacity(pDst, pDataBlock->info.rows); + if (code != TSDB_CODE_SUCCESS) { + return NULL; + } + + colDataAssign(pDst, pSrc, pDataBlock->info.rows); + } + + pBlock->info.rows = pDataBlock->info.rows; + pBlock->info.capacity = pDataBlock->info.rows; + } + return pBlock; } diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index cb787e77a62c83e7c1670c09f43e0adc17cbfb07..cbd2dc66f5f9dfa8e2c7219ab2a67e659ec3779f 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -48,8 +48,6 @@ typedef int32_t (*__block_search_fn_t)(char* data, int32_t num, int64_t key, int #define GET_TABLEGROUP(q, _index) ((SArray*)taosArrayGetP((q)->tableqinfoGroupInfo.pGroupList, (_index))) -#define GET_NUM_OF_RESULTS(_r) (((_r)->outputBuf) == NULL ? 0 : ((_r)->outputBuf)->info.rows) - #define NEEDTO_COMPRESS_QUERY(size) ((size) > tsCompressColData ? 1 : 0) enum { @@ -84,21 +82,6 @@ typedef struct SResultInfo { // TODO refactor int32_t threshold; // result size threshold in rows. } SResultInfo; -typedef struct SColumnFilterElem { - int16_t bytes; // column length - __filter_func_t fp; - SColumnFilterInfo filterInfo; - void* q; -} SColumnFilterElem; - -typedef struct SSingleColumnFilterInfo { - void* pData; - void* pData2; // used for nchar column - int32_t numOfFilters; - SColumnInfo info; - SColumnFilterElem* pFilters; -} SSingleColumnFilterInfo; - typedef struct STableQueryInfo { TSKEY lastKey; // last check ts uint64_t uid; // table uid @@ -169,66 +152,36 @@ typedef struct SOperatorCostInfo { uint64_t totalCost; } SOperatorCostInfo; -typedef struct SOrder { - uint32_t order; - SColumn col; -} SOrder; - // The basic query information extracted from the SQueryInfo tree to support the // execution of query in a data node. typedef struct STaskAttr { - SLimit limit; - SLimit slimit; - - // todo comment it - bool stableQuery; // super table query or not - bool topBotQuery; // TODO used bitwise flag - bool groupbyColumn; // denote if this is a groupby normal column query - bool hasTagResults; // if there are tag values in final result or not - bool timeWindowInterpo; // if the time window start/end required interpolation - bool queryBlockDist; // if query data block distribution - bool stabledev; // super table stddev query - bool tsCompQuery; // is tscomp query - bool diffQuery; // is diff query - bool simpleAgg; - bool pointInterpQuery; // point interpolation query - bool needReverseScan; // need reverse scan - bool distinct; // distinct query or not - bool stateWindow; // window State on sub/normal table - bool createFilterOperator; // if filter operator is needed - bool multigroupResult; // multigroup result can exist in one SSDataBlock - int32_t interBufSize; // intermediate buffer sizse - - int32_t havingNum; // having expr number - - SOrder order; - int16_t numOfCols; - int16_t numOfTags; - - STimeWindow window; - SInterval interval; - int16_t precision; - int16_t numOfOutput; - int16_t fillType; - - int32_t srcRowSize; // todo extract struct - int32_t resultRowSize; - int32_t intermediateResultRowSize; // intermediate result row size, in case of top-k query. - int32_t maxTableColumnWidth; - int32_t tagLen; // tag value length of current query - - SExprInfo* pExpr1; - - SColumnInfo* tableCols; - SColumnInfo* tagColList; - int32_t numOfFilterCols; - int64_t* fillVal; - - SSingleColumnFilterInfo* pFilterInfo; + SLimit limit; + SLimit slimit; + bool stableQuery; // super table query or not + bool topBotQuery; // TODO used bitwise flag + bool groupbyColumn; // denote if this is a groupby normal column query + bool timeWindowInterpo; // if the time window start/end required interpolation + bool tsCompQuery; // is tscomp query + bool diffQuery; // is diff query + bool pointInterpQuery; // point interpolation query + int32_t havingNum; // having expr number + int16_t numOfCols; + int16_t numOfTags; + STimeWindow window; + SInterval interval; + int16_t precision; + int16_t numOfOutput; + int16_t fillType; + int32_t resultRowSize; + int32_t tagLen; // tag value length of current query + + SExprInfo *pExpr1; + SColumnInfo* tagColList; + int32_t numOfFilterCols; + int64_t* fillVal; void* tsdb; STableGroupInfo tableGroupInfo; // table list SArray int32_t vgId; - SArray* pUdfInfo; // no need to free } STaskAttr; struct SOperatorInfo; @@ -252,7 +205,6 @@ typedef struct STaskIdInfo { typedef struct SExecTaskInfo { STaskIdInfo id; - char* content; uint32_t status; STimeWindow window; STaskCostInfo cost; @@ -262,7 +214,7 @@ typedef struct SExecTaskInfo { STableGroupInfo tableqinfoGroupInfo; // this is a group array list, including SArray structure char* sql; // query sql string jmp_buf env; // jump to this position when error happens. - EOPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model] + EOPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model] struct SOperatorInfo* pRoot; } SExecTaskInfo; @@ -297,7 +249,7 @@ typedef struct STaskRuntimeEnv { int64_t currentOffset; // dynamic offset value STableQueryInfo* current; - SResultInfo resultInfo; + SResultInfo resultInfo; SHashObj* pTableRetrieveTsMap; struct SUdfInfo* pUdfInfo; } STaskRuntimeEnv; @@ -339,25 +291,6 @@ typedef struct { SColumnInfo* colList; } SQueriedTableInfo; -typedef struct SQInfo { - void* signature; - uint64_t qId; - int32_t code; // error code to returned to client - int64_t owner; // if it is in execution - - STaskRuntimeEnv runtimeEnv; - STaskAttr query; - void* pBuf; // allocated buffer for STableQueryInfo, sizeof(STableQueryInfo)*numOfTables; - - TdThreadMutex lock; // used to synchronize the rsp/query threads - tsem_t ready; - int32_t dataReady; // denote if query result is ready or not - void* rspContext; // response context - int64_t startExecTs; // start to exec timestamp - char* sql; // query sql string - STaskCostInfo summary; -} SQInfo; - typedef enum { EX_SOURCE_DATA_NOT_READY = 0x1, EX_SOURCE_DATA_READY = 0x2, @@ -523,24 +456,6 @@ typedef struct SProjectOperatorInfo { int64_t curOutput; } SProjectOperatorInfo; -typedef struct SSLimitOperatorInfo { - int64_t groupTotal; - int64_t currentGroupOffset; - int64_t rowsTotal; - int64_t currentOffset; - SLimit limit; - SLimit slimit; - char** prevRow; - SArray* orderColumnList; - bool hasPrev; - bool ignoreCurrentGroup; - bool multigroupResult; - SSDataBlock* pRes; // result buffer - SSDataBlock* pPrevBlock; - int64_t capacity; - int64_t threshold; -} SSLimitOperatorInfo; - typedef struct SFillOperatorInfo { struct SFillInfo* pFillInfo; SSDataBlock* pRes; @@ -553,10 +468,10 @@ typedef struct SFillOperatorInfo { } SFillOperatorInfo; typedef struct { - char *pData; - bool isNull; - int16_t type; - int32_t bytes; + char *pData; + bool isNull; + int16_t type; + int32_t bytes; } SGroupKeys, SStateKeys; typedef struct SGroupbyOperatorInfo { @@ -582,19 +497,19 @@ typedef struct SDataGroupInfo { // The sort in partition may be needed later. typedef struct SPartitionOperatorInfo { - SOptrBasicInfo binfo; - SArray* pGroupCols; - SArray* pGroupColVals; // current group column values, SArray - char* keyBuf; // group by keys for hash - int32_t groupKeyLen; // total group by column width - SHashObj* pGroupSet; // quick locate the window object for each result - - SDiskbasedBuf* pBuf; // query result buffer based on blocked-wised disk file - int32_t rowCapacity; // maximum number of rows for each buffer page - int32_t* columnOffset; // start position for each column data - - void* pGroupIter; // group iterator - int32_t pageIndex; // page index of current group + SOptrBasicInfo binfo; + SArray* pGroupCols; + SArray* pGroupColVals; // current group column values, SArray + char* keyBuf; // group by keys for hash + int32_t groupKeyLen; // total group by column width + SHashObj* pGroupSet; // quick locate the window object for each result + + SDiskbasedBuf* pBuf; // query result buffer based on blocked-wised disk file + int32_t rowCapacity; // maximum number of rows for each buffer page + int32_t* columnOffset; // start position for each column data + + void* pGroupIter; // group iterator + int32_t pageIndex; // page index of current group } SPartitionOperatorInfo; typedef struct SWindowRowsSup { @@ -633,26 +548,21 @@ typedef struct SStateWindowOperatorInfo { } SStateWindowOperatorInfo; typedef struct SSortedMergeOperatorInfo { - SOptrBasicInfo binfo; - bool hasVarCol; + SOptrBasicInfo binfo; + bool hasVarCol; - SArray* pSortInfo; - int32_t numOfSources; - - SSortHandle *pSortHandle; - - int32_t bufPageSize; - uint32_t sortBufSize; // max buffer size for in-memory sort - - int32_t resultRowFactor; - bool hasGroupVal; - - SDiskbasedBuf *pTupleStore; // keep the final results - int32_t numOfResPerPage; - - char** groupVal; - SArray *groupInfo; - SAggSupporter aggSup; + SArray* pSortInfo; + int32_t numOfSources; + SSortHandle *pSortHandle; + int32_t bufPageSize; + uint32_t sortBufSize; // max buffer size for in-memory sort + int32_t resultRowFactor; + bool hasGroupVal; + SDiskbasedBuf *pTupleStore; // keep the final results + int32_t numOfResPerPage; + char** groupVal; + SArray *groupInfo; + SAggSupporter aggSup; } SSortedMergeOperatorInfo; typedef struct SSortOperatorInfo { @@ -680,8 +590,7 @@ int32_t initAggInfo(SOptrBasicInfo* pBasicInfo, SAggSupporter* pAggSup, SExprInf void toSDatablock(SSDataBlock* pBlock, int32_t rowCapacity, SGroupResInfo* pGroupResInfo, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf, int32_t* rowCellOffset); void finalizeMultiTupleQueryResult(SqlFunctionCtx* pCtx, int32_t numOfOutput, SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowInfo, int32_t* rowCellInfoOffset); void doApplyFunctions(SqlFunctionCtx* pCtx, STimeWindow* pWin, SColumnInfoData* pTimeWindowData, int32_t offset, int32_t forwardStep, TSKEY* tsCol, int32_t numOfTotal, int32_t numOfOutput, int32_t order); -int32_t setGroupResultOutputBuf_rv(SOptrBasicInfo* binfo, int32_t numOfCols, char* pData, int16_t type, - int16_t bytes, int32_t groupId, SDiskbasedBuf* pBuf, SExecTaskInfo* pTaskInfo, SAggSupporter* pAggSup); +int32_t setGroupResultOutputBuf(SOptrBasicInfo* binfo, int32_t numOfCols, char* pData, int16_t type, int16_t bytes, int32_t groupId, SDiskbasedBuf* pBuf, SExecTaskInfo* pTaskInfo, SAggSupporter* pAggSup); void doDestroyBasicInfo(SOptrBasicInfo* pInfo, int32_t numOfOutput); int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadInfo, int32_t numOfRows, char* pData, int32_t compLen, int32_t numOfOutput, int64_t startTs, @@ -748,7 +657,6 @@ void setTaskKilled(SExecTaskInfo* pTaskInfo); void publishOperatorProfEvent(SOperatorInfo* operatorInfo, EQueryProfEventType eventType); void publishQueryAbortEvent(SExecTaskInfo* pTaskInfo, int32_t code); -void calculateOperatorProfResults(SQInfo* pQInfo); void queryCostStatis(SExecTaskInfo* pTaskInfo); void doDestroyTask(SExecTaskInfo* pTaskInfo); diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 40e86c7840c43dccc801fe9117bb7273ad6ce461..1377b2f72916b9bb60acbf52ba0a329b23500795 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -337,11 +337,11 @@ int32_t tsDescOrder(const void* p1, const void* p2) { void orderTheResultRows(STaskRuntimeEnv* pRuntimeEnv) { __compar_fn_t fn = NULL; - if (pRuntimeEnv->pQueryAttr->order.order == TSDB_ORDER_ASC) { - fn = tsAscOrder; - } else { - fn = tsDescOrder; - } +// if (pRuntimeEnv->pQueryAttr->order.order == TSDB_ORDER_ASC) { +// fn = tsAscOrder; +// } else { +// fn = tsDescOrder; +// } taosArraySort(pRuntimeEnv->pResultRowArrayList, fn); } @@ -377,7 +377,7 @@ static int32_t mergeIntoGroupResultImplRv(STaskRuntimeEnv *pRuntimeEnv, SGroupRe static UNUSED_FUNC int32_t mergeIntoGroupResultImpl(STaskRuntimeEnv *pRuntimeEnv, SGroupResInfo* pGroupResInfo, SArray *pTableList, int32_t* rowCellInfoOffset) { - bool ascQuery = QUERY_IS_ASC_QUERY(pRuntimeEnv->pQueryAttr); + bool ascQuery = true; int32_t code = TSDB_CODE_SUCCESS; @@ -413,7 +413,8 @@ static UNUSED_FUNC int32_t mergeIntoGroupResultImpl(STaskRuntimeEnv *pRuntimeEnv goto _end; } - SCompSupporter cs = {pTableQueryInfoList, posList, pRuntimeEnv->pQueryAttr->order.order}; + int32_t order = TSDB_ORDER_ASC; + SCompSupporter cs = {pTableQueryInfoList, posList, order}; int32_t ret = tMergeTreeCreate(&pTree, numOfTables, &cs, tableResultComparFn); if (ret != TSDB_CODE_SUCCESS) { diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 15203d91cafc3611fd316c91db8f0f4a895cfb02..4af7e563e6eb85dceae04be58c069c5087060b49 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -52,7 +52,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu for (int32_t i = 0; i < numOfBlocks; ++i) { SSDataBlock* pDataBlock = &((SSDataBlock*)input)[i]; - SSDataBlock* p = createOneDataBlock(pDataBlock); + SSDataBlock* p = createOneDataBlock(pDataBlock, false); p->info = pDataBlock->info; taosArrayClear(p->pDataBlock); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index afcffa3d286d3bcefbcac96b24a63911db567eea..14acff320418e68602a20842333c4865f55bffb6 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -990,7 +990,8 @@ static int32_t getNextQualifiedWindow(SInterval* pInterval, STimeWindow* pNext, static FORCE_INLINE TSKEY reviseWindowEkey(STaskAttr* pQueryAttr, STimeWindow* pWindow) { TSKEY ekey = -1; - if (QUERY_IS_ASC_QUERY(pQueryAttr)) { + int32_t order = TSDB_ORDER_ASC; + if (order == TSDB_ORDER_ASC) { ekey = pWindow->ekey; if (ekey > pQueryAttr->window.ekey) { ekey = pQueryAttr->window.ekey; @@ -1700,9 +1701,8 @@ static void setResultRowKey(SResultRow* pResultRow, char* pData, int16_t type) { } } -int32_t setGroupResultOutputBuf_rv(SOptrBasicInfo* binfo, int32_t numOfCols, char* pData, int16_t type, - int16_t bytes, int32_t groupId, SDiskbasedBuf* pBuf, SExecTaskInfo* pTaskInfo, - SAggSupporter* pAggSup) { +int32_t setGroupResultOutputBuf(SOptrBasicInfo* binfo, int32_t numOfCols, char* pData, int16_t type, int16_t bytes, + int32_t groupId, SDiskbasedBuf* pBuf, SExecTaskInfo* pTaskInfo, SAggSupporter* pAggSup) { SResultRowInfo* pResultRowInfo = &binfo->resultRowInfo; SqlFunctionCtx* pCtx = binfo->pCtx; @@ -1961,7 +1961,8 @@ static bool isCachedLastQuery(STaskAttr* pQueryAttr) { return false; } - if (pQueryAttr->order.order != TSDB_ORDER_DESC || !TSWINDOW_IS_EQUAL(pQueryAttr->window, TSWINDOW_DESC_INITIALIZER)) { + int32_t order = TSDB_ORDER_ASC; + if (order != TSDB_ORDER_DESC || !TSWINDOW_IS_EQUAL(pQueryAttr->window, TSWINDOW_DESC_INITIALIZER)) { return false; } @@ -2187,7 +2188,7 @@ static bool overlapWithTimeWindow(STaskAttr* pQueryAttr, SDataBlockInfo* pBlockI TSKEY sk = TMIN(pQueryAttr->window.skey, pQueryAttr->window.ekey); TSKEY ek = TMAX(pQueryAttr->window.skey, pQueryAttr->window.ekey); - if (QUERY_IS_ASC_QUERY(pQueryAttr)) { + if (true) { // getAlignQueryTimeWindow(pQueryAttr, pBlockInfo->window.skey, sk, ek, &w); assert(w.ekey >= pBlockInfo->window.skey); @@ -2257,57 +2258,6 @@ static int32_t doTSJoinFilter(STaskRuntimeEnv* pRuntimeEnv, TSKEY key, bool ascQ return TS_JOIN_TS_EQUAL; } -bool doFilterDataBlock(SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFilterCols, int32_t numOfRows, int8_t* p) { - bool all = true; - - for (int32_t i = 0; i < numOfRows; ++i) { - bool qualified = false; - - for (int32_t k = 0; k < numOfFilterCols; ++k) { - char* pElem = (char*)pFilterInfo[k].pData + pFilterInfo[k].info.bytes * i; - - qualified = false; - for (int32_t j = 0; j < pFilterInfo[k].numOfFilters; ++j) { - SColumnFilterElem* pFilterElem = NULL; - // SColumnFilterElem* pFilterElem = &pFilterInfo[k].pFilters[j]; - - bool isnull = isNull(pElem, pFilterInfo[k].info.type); - if (isnull) { - // if (pFilterElem->fp == isNullOperator) { - // qualified = true; - // break; - // } else { - // continue; - // } - } else { - // if (pFilterElem->fp == notNullOperator) { - // qualified = true; - // break; - // } else if (pFilterElem->fp == isNullOperator) { - // continue; - // } - } - - if (pFilterElem->fp(pFilterElem, pElem, pElem, pFilterInfo[k].info.type)) { - qualified = true; - break; - } - } - - if (!qualified) { - break; - } - } - - p[i] = qualified ? 1 : 0; - if (!qualified) { - all = false; - } - } - - return all; -} - void doCompactSDataBlock(SSDataBlock* pBlock, int32_t numOfRows, int8_t* p) { int32_t len = 0; int32_t start = 0; @@ -2357,49 +2307,6 @@ void doCompactSDataBlock(SSDataBlock* pBlock, int32_t numOfRows, int8_t* p) { } } -void filterRowsInDataBlock(STaskRuntimeEnv* pRuntimeEnv, SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFilterCols, - SSDataBlock* pBlock, bool ascQuery) { - int32_t numOfRows = pBlock->info.rows; - - int8_t* p = taosMemoryCalloc(numOfRows, sizeof(int8_t)); - bool all = true; -#if 0 - if (pRuntimeEnv->pTsBuf != NULL) { - SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, 0); - - TSKEY* k = (TSKEY*) pColInfoData->pData; - for (int32_t i = 0; i < numOfRows; ++i) { - int32_t offset = ascQuery? i:(numOfRows - i - 1); - int32_t ret = doTSJoinFilter(pRuntimeEnv, k[offset], ascQuery); - if (ret == TS_JOIN_TAG_NOT_EQUALS) { - break; - } else if (ret == TS_JOIN_TS_NOT_EQUALS) { - all = false; - continue; - } else { - assert(ret == TS_JOIN_TS_EQUAL); - p[offset] = true; - } - - if (!tsBufNextPos(pRuntimeEnv->pTsBuf)) { - break; - } - } - - // save the cursor status - pRuntimeEnv->current->cur = tsBufGetCursor(pRuntimeEnv->pTsBuf); - } else { - all = doFilterDataBlock(pFilterInfo, numOfFilterCols, numOfRows, p); - } -#endif - - if (!all) { - doCompactSDataBlock(pBlock, numOfRows, p); - } - - taosMemoryFreeClear(p); -} - void filterColRowsInDataBlock(STaskRuntimeEnv* pRuntimeEnv, SSDataBlock* pBlock, bool ascQuery) { int32_t numOfRows = pBlock->info.rows; @@ -3131,7 +3038,7 @@ void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock) { bool keep = filterExecute(filter, pBlock, &rowRes, NULL, param1.numOfCols); filterFreeInfo(filter); - SSDataBlock* px = createOneDataBlock(pBlock); + SSDataBlock* px = createOneDataBlock(pBlock, false); blockDataEnsureCapacity(px, pBlock->info.rows); // todo extract method @@ -3509,22 +3416,22 @@ static void doOperatorExecProfOnce(SOperatorStackItem* item, SQueryProfEvent* ev } } -void calculateOperatorProfResults(SQInfo* pQInfo) { - if (pQInfo->summary.queryProfEvents == NULL) { - // qDebug("QInfo:0x%"PRIx64" query prof events array is null", pQInfo->qId); - return; - } - - if (pQInfo->summary.operatorProfResults == NULL) { - // qDebug("QInfo:0x%"PRIx64" operator prof results hash is null", pQInfo->qId); - return; - } +void calculateOperatorProfResults(void) { +// if (pQInfo->summary.queryProfEvents == NULL) { +// // qDebug("QInfo:0x%"PRIx64" query prof events array is null", pQInfo->qId); +// return; +// } +// +// if (pQInfo->summary.operatorProfResults == NULL) { +// // qDebug("QInfo:0x%"PRIx64" operator prof results hash is null", pQInfo->qId); +// return; +// } SArray* opStack = taosArrayInit(32, sizeof(SOperatorStackItem)); if (opStack == NULL) { return; } - +#if 0 size_t size = taosArrayGetSize(pQInfo->summary.queryProfEvents); SHashObj* profResults = pQInfo->summary.operatorProfResults; @@ -3547,7 +3454,7 @@ void calculateOperatorProfResults(SQInfo* pQInfo) { } } } - +#endif taosArrayDestroy(opStack); } @@ -4507,13 +4414,6 @@ static void destroySortedMergeOperatorInfo(void* param, int32_t numOfOutput) { cleanupAggSup(&pInfo->aggSup); } -static void destroySlimitOperatorInfo(void* param, int32_t numOfOutput) { - SSLimitOperatorInfo* pInfo = (SSLimitOperatorInfo*)param; - taosArrayDestroy(pInfo->orderColumnList); - pInfo->pRes = blockDataDestroy(pInfo->pRes); - taosMemoryFreeClear(pInfo->prevRow); -} - static void assignExprInfo(SExprInfo* dst, const SExprInfo* src) { assert(dst != NULL && src != NULL); @@ -4713,7 +4613,7 @@ static SSDataBlock* doMerge(SOperatorInfo* pOperator) { SSortedMergeOperatorInfo* pInfo = pOperator->info; SSortHandle* pHandle = pInfo->pSortHandle; - SSDataBlock* pDataBlock = createOneDataBlock(pInfo->binfo.pRes); + SSDataBlock* pDataBlock = createOneDataBlock(pInfo->binfo.pRes, false); blockDataEnsureCapacity(pDataBlock, pInfo->binfo.capacity); while (1) { @@ -5546,8 +5446,6 @@ static SSDataBlock* doSTableIntervalAgg(SOperatorInfo* pOperator, bool* newgroup } STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; - int32_t order = pQueryAttr->order.order; - SOperatorInfo* downstream = pOperator->pDownstream[0]; while (1) { @@ -5563,14 +5461,13 @@ static SSDataBlock* doSTableIntervalAgg(SOperatorInfo* pOperator, bool* newgroup STableQueryInfo* pTableQueryInfo = pRuntimeEnv->current; // setTagValue(pOperator, pTableQueryInfo->pTable, pIntervalInfo->pCtx, pOperator->numOfOutput); - setInputDataBlock(pOperator, pIntervalInfo->binfo.pCtx, pBlock, pQueryAttr->order.order); + setInputDataBlock(pOperator, pIntervalInfo->binfo.pCtx, pBlock, TSDB_ORDER_ASC); setIntervalQueryRange(pRuntimeEnv, pBlock->info.window.skey); hashIntervalAgg(pOperator, &pTableQueryInfo->resInfo, pBlock, pTableQueryInfo->groupIndex); } pOperator->status = OP_RES_TO_RETURN; - pQueryAttr->order.order = order; // TODO : restore the order doCloseAllTimeWindow(pRuntimeEnv); setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED); diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index c8a6697d4e944c44923ba0a09f8f995a266eeeef..cd79d719dad44f51cfa407889c06837bdf56850b 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -227,7 +227,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) { } len = buildGroupKeys(pInfo->keyBuf, pInfo->pGroupColVals); - int32_t ret = setGroupResultOutputBuf_rv(&(pInfo->binfo), pOperator->numOfOutput, pInfo->keyBuf, TSDB_DATA_TYPE_VARCHAR, len, 0, pInfo->aggSup.pResultBuf, pTaskInfo, &pInfo->aggSup); + int32_t ret = setGroupResultOutputBuf(&(pInfo->binfo), pOperator->numOfOutput, pInfo->keyBuf, TSDB_DATA_TYPE_VARCHAR, len, 0, pInfo->aggSup.pResultBuf, pTaskInfo, &pInfo->aggSup); if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR); } @@ -244,7 +244,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) { if (num > 0) { len = buildGroupKeys(pInfo->keyBuf, pInfo->pGroupColVals); int32_t ret = - setGroupResultOutputBuf_rv(&(pInfo->binfo), pOperator->numOfOutput, pInfo->keyBuf, TSDB_DATA_TYPE_VARCHAR, len, + setGroupResultOutputBuf(&(pInfo->binfo), pOperator->numOfOutput, pInfo->keyBuf, TSDB_DATA_TYPE_VARCHAR, len, 0, pInfo->aggSup.pResultBuf, pTaskInfo, &pInfo->aggSup); if (ret != TSDB_CODE_SUCCESS) { longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index f7f1d470bbf1ccb7bd087929e6b1747dfc020ff9..59ef31b2ff494ae766463908a8a16f62e5cc442a 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -258,16 +258,14 @@ SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, return pOperator; } -SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv) { +SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle) { STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo)); pInfo->dataReader = pTsdbReadHandle; pInfo->times = 1; pInfo->reverseTimes = 0; - pInfo->order = pRuntimeEnv->pQueryAttr->order.order; pInfo->current = 0; pInfo->prevGroupId = -1; - pRuntimeEnv->enableGroupData = true; SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); pOperator->name = "TableSeqScanOperator"; @@ -275,8 +273,6 @@ SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntim pOperator->blockingOptr = false; pOperator->status = OP_NOT_OPENED; pOperator->info = pInfo; - pOperator->numOfOutput = pRuntimeEnv->pQueryAttr->numOfCols; - pOperator->pRuntimeEnv = pRuntimeEnv; pOperator->getNextFn = doTableScanImpl; return pOperator; @@ -594,7 +590,7 @@ static SSDataBlock* doFilterResult(SSysTableScanInfo* pInfo) { bool keep = filterExecute(filter, pInfo->pRes, &rowRes, NULL, param1.numOfCols); filterFreeInfo(filter); - SSDataBlock* px = createOneDataBlock(pInfo->pRes); + SSDataBlock* px = createOneDataBlock(pInfo->pRes, false); blockDataEnsureCapacity(px, pInfo->pRes->info.rows); // TODO refactor @@ -683,71 +679,70 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator, bool* newgroup) { return NULL; } - int64_t startTs = taosGetTimestampUs(); + while (1) { + int64_t startTs = taosGetTimestampUs(); - _retry: - pInfo->req.type = pInfo->type; - strncpy(pInfo->req.tb, tNameGetTableName(&pInfo->name), tListLen(pInfo->req.tb)); - if (pInfo->showRewrite) { - char dbName[TSDB_DB_NAME_LEN] = {0}; - getDBNameFromCondition(pInfo->pCondition, dbName); - sprintf(pInfo->req.db, "%d.%s", pInfo->accountId, dbName); - } + pInfo->req.type = pInfo->type; + strncpy(pInfo->req.tb, tNameGetTableName(&pInfo->name), tListLen(pInfo->req.tb)); - int32_t contLen = tSerializeSRetrieveTableReq(NULL, 0, &pInfo->req); - char* buf1 = taosMemoryCalloc(1, contLen); - tSerializeSRetrieveTableReq(buf1, contLen, &pInfo->req); + if (pInfo->showRewrite) { + char dbName[TSDB_DB_NAME_LEN] = {0}; + getDBNameFromCondition(pInfo->pCondition, dbName); + sprintf(pInfo->req.db, "%d.%s", pInfo->accountId, dbName); + } - // send the fetch remote task result reques - SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo)); - if (NULL == pMsgSendInfo) { - qError("%s prepare message %d failed", GET_TASKID(pTaskInfo), (int32_t)sizeof(SMsgSendInfo)); - pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY; - return NULL; - } + int32_t contLen = tSerializeSRetrieveTableReq(NULL, 0, &pInfo->req); + char* buf1 = taosMemoryCalloc(1, contLen); + tSerializeSRetrieveTableReq(buf1, contLen, &pInfo->req); + + // send the fetch remote task result reques + SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo)); + if (NULL == pMsgSendInfo) { + qError("%s prepare message %d failed", GET_TASKID(pTaskInfo), (int32_t)sizeof(SMsgSendInfo)); + pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY; + return NULL; + } - pMsgSendInfo->param = pOperator; - pMsgSendInfo->msgInfo.pData = buf1; - pMsgSendInfo->msgInfo.len = contLen; - pMsgSendInfo->msgType = TDMT_MND_SYSTABLE_RETRIEVE; - pMsgSendInfo->fp = loadSysTableContentCb; + pMsgSendInfo->param = pOperator; + pMsgSendInfo->msgInfo.pData = buf1; + pMsgSendInfo->msgInfo.len = contLen; + pMsgSendInfo->msgType = TDMT_MND_SYSTABLE_RETRIEVE; + pMsgSendInfo->fp = loadSysTableContentCb; - int64_t transporterId = 0; - int32_t code = asyncSendMsgToServer(pInfo->pTransporter, &pInfo->epSet, &transporterId, pMsgSendInfo); - tsem_wait(&pInfo->ready); + int64_t transporterId = 0; + int32_t code = asyncSendMsgToServer(pInfo->pTransporter, &pInfo->epSet, &transporterId, pMsgSendInfo); + tsem_wait(&pInfo->ready); - if (pTaskInfo->code) { - return NULL; - } + if (pTaskInfo->code) { + qDebug("%s load meta data from mnode failed, totalRows:%" PRIu64 ", code:%s", GET_TASKID(pTaskInfo), + pInfo->loadInfo.totalRows, tstrerror(pTaskInfo->code)); + return NULL; + } - SRetrieveMetaTableRsp* pRsp = pInfo->pRsp; - pInfo->req.showId = pRsp->handle; + SRetrieveMetaTableRsp* pRsp = pInfo->pRsp; + pInfo->req.showId = pRsp->handle; - if (pRsp->numOfRows == 0 || pRsp->completed) { - pOperator->status = OP_EXEC_DONE; - } + if (pRsp->numOfRows == 0 || pRsp->completed) { + pOperator->status = OP_EXEC_DONE; + qDebug("%s load meta data from mnode completed, rowsOfSource:%d, totalRows:%" PRIu64 " ", GET_TASKID(pTaskInfo), + pRsp->numOfRows, pInfo->loadInfo.totalRows); - if (pRsp->numOfRows == 0) { - // qDebug("%s vgId:%d, taskID:0x%"PRIx64" %d of total completed, rowsOfSource:%"PRIu64", totalRows:%"PRIu64" - // try next", - // GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pExchangeInfo->current + 1, - // pDataInfo->totalRows, pExchangeInfo->totalRows); - return NULL; - } + if (pRsp->numOfRows == 0) { + return NULL; + } + } - SRetrieveMetaTableRsp* pTableRsp = pInfo->pRsp; - setSDataBlockFromFetchRsp(pInfo->pRes, &pInfo->loadInfo, pTableRsp->numOfRows, pTableRsp->data, pTableRsp->compLen, - pOperator->numOfOutput, startTs, NULL, pInfo->scanCols); + SRetrieveMetaTableRsp* pTableRsp = pInfo->pRsp; + setSDataBlockFromFetchRsp(pInfo->pRes, &pInfo->loadInfo, pTableRsp->numOfRows, pTableRsp->data, + pTableRsp->compLen, pOperator->numOfOutput, startTs, NULL, pInfo->scanCols); - doFilterResult(pInfo); - if (pInfo->pRes->info.rows == 0) { - goto _retry; + // todo log the filter info + doFilterResult(pInfo); + if (pInfo->pRes->info.rows > 0) { + return pInfo->pRes; + } } - - return pInfo->pRes; } - - return NULL; } SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, SSDataBlock* pResBlock, const SName* pName, diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index 885112d440aac01f5bdfad1018fd5bb2ca5feac7..2ac28200e79d3b365fc75d055f785e0a99963355 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -99,7 +99,7 @@ SSortHandle* tsortCreateSortHandle(SArray* pSortInfo, SArray* pIndexMap, int32_t pSortHandle->numOfPages = numOfPages; pSortHandle->pSortInfo = pSortInfo; pSortHandle->pIndexMap = pIndexMap; - pSortHandle->pDataBlock = createOneDataBlock(pBlock); + pSortHandle->pDataBlock = createOneDataBlock(pBlock, false); pSortHandle->pOrderedSource = taosArrayInit(4, POINTER_BYTES); pSortHandle->cmpParam.orderInfo = pSortInfo; @@ -206,7 +206,7 @@ static int32_t doAddToBuf(SSDataBlock* pDataBlock, SSortHandle* pHandle) { blockDataCleanup(pDataBlock); - SSDataBlock* pBlock = createOneDataBlock(pDataBlock); + SSDataBlock* pBlock = createOneDataBlock(pDataBlock, false); return doAddNewExternalMemSource(pHandle->pBuf, pHandle->pOrderedSource, pBlock, &pHandle->sourceId); } @@ -488,7 +488,7 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) { tMergeTreeDestroy(pHandle->pMergeTree); pHandle->numOfCompletedSources = 0; - SSDataBlock* pBlock = createOneDataBlock(pHandle->pDataBlock); + SSDataBlock* pBlock = createOneDataBlock(pHandle->pDataBlock, false); code = doAddNewExternalMemSource(pHandle->pBuf, pResList, pBlock, &pHandle->sourceId); if (code != 0) { return code; @@ -531,7 +531,7 @@ static int32_t createInitialSortedMultiSources(SSortHandle* pHandle) { } if (pHandle->pDataBlock == NULL) { - pHandle->pDataBlock = createOneDataBlock(pBlock); + pHandle->pDataBlock = createOneDataBlock(pBlock, false); } int32_t code = blockDataMerge(pHandle->pDataBlock, pBlock, pHandle->pIndexMap); diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 11f8e880a669170130155d4a7683dd71cf5421d9..c5258afcc18bc1e23333cee63a8fc88c18b6ea28 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -1100,6 +1100,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch SSubmitRsp *rsp = (SSubmitRsp *)msg; SCH_ERR_JRET(rsp->code); } + SCH_ERR_JRET(rspCode); SSubmitRsp *rsp = (SSubmitRsp *)msg; @@ -1298,7 +1299,6 @@ int32_t schHandleCallback(void *param, const SDataBuf *pMsg, int32_t msgType, in SCH_ERR_JRET(schHandleResponseMsg(pJob, pTask, msgType, pMsg->pData, pMsg->len, rspCode)); _return: - if (pJob) { schReleaseJob(pParam->refId); } diff --git a/tools/taosTools-1.4.1-Linux-x64.tar.gz b/tools/taosTools-1.4.1-Linux-x64.tar.gz deleted file mode 100644 index caefa3d95768cc0551bdf2f9ba9b5631942b9e5a..0000000000000000000000000000000000000000 Binary files a/tools/taosTools-1.4.1-Linux-x64.tar.gz and /dev/null differ