未验证 提交 17c60b6c 编写于 作者: H Haojun Liao 提交者: GitHub

Merge pull request #11483 from taosdata/feature/3.0_liaohj

ehn(query): remove some redundant codes in executor.
...@@ -203,11 +203,10 @@ void colInfoDataCleanup(SColumnInfoData* pColumn, uint32_t numOfRows); ...@@ -203,11 +203,10 @@ void colInfoDataCleanup(SColumnInfoData* pColumn, uint32_t numOfRows);
void blockDataCleanup(SSDataBlock* pDataBlock); void blockDataCleanup(SSDataBlock* pDataBlock);
size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize); size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize);
void* blockDataDestroy(SSDataBlock* pBlock);
int32_t blockDataTrimFirstNRows(SSDataBlock* pBlock, size_t n); int32_t blockDataTrimFirstNRows(SSDataBlock* pBlock, size_t n);
SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock); SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData);
void blockDebugShowData(const SArray* dataBlocks); void blockDebugShowData(const SArray* dataBlocks);
......
...@@ -596,12 +596,15 @@ void doSetOneRowPtr(SReqResultInfo* pResultInfo) { ...@@ -596,12 +596,15 @@ void doSetOneRowPtr(SReqResultInfo* pResultInfo) {
pResultInfo->row[i] = varDataVal(pStart); pResultInfo->row[i] = varDataVal(pStart);
} else { } else {
pResultInfo->row[i] = NULL; pResultInfo->row[i] = NULL;
pResultInfo->length[i] = 0;
} }
} else { } else {
if (!colDataIsNull_f(pCol->nullbitmap, pResultInfo->current)) { if (!colDataIsNull_f(pCol->nullbitmap, pResultInfo->current)) {
pResultInfo->row[i] = pResultInfo->pCol[i].pData + bytes * pResultInfo->current; pResultInfo->row[i] = pResultInfo->pCol[i].pData + bytes * pResultInfo->current;
pResultInfo->length[i] = bytes;
} else { } else {
pResultInfo->row[i] = NULL; pResultInfo->row[i] = NULL;
pResultInfo->length[i] = 0;
} }
} }
} }
......
...@@ -410,7 +410,11 @@ bool taos_is_null(TAOS_RES *res, int32_t row, int32_t col) { ...@@ -410,7 +410,11 @@ bool taos_is_null(TAOS_RES *res, int32_t row, int32_t col) {
} }
SResultColumn *pCol = &pResultInfo->pCol[col]; SResultColumn *pCol = &pResultInfo->pCol[col];
return colDataIsNull_f(pCol->nullbitmap, row); if (IS_VAR_DATA_TYPE(pResultInfo->fields[col].type)) {
return (pCol->offset[row] == -1);
} else {
return colDataIsNull_f(pCol->nullbitmap, row);
}
} }
bool taos_is_update_query(TAOS_RES *res) { return taos_num_fields(res) == 0; } bool taos_is_update_query(TAOS_RES *res) { return taos_num_fields(res) == 0; }
......
...@@ -459,11 +459,10 @@ TEST(testCase, create_multiple_tables) { ...@@ -459,11 +459,10 @@ TEST(testCase, create_multiple_tables) {
taos_free_result(pRes); taos_free_result(pRes);
for (int32_t i = 0; i < 25000; ++i) { for (int32_t i = 0; i < 500; i += 2) {
char sql[512] = {0}; char sql[512] = {0};
snprintf(sql, tListLen(sql), snprintf(sql, tListLen(sql),
"create table t_x_%d using st1 tags(2) t_x_%d using st1 tags(5) t_x_%d using st1 tags(911)", i, "create table t_x_%d using st1 tags(2) t_x_%d using st1 tags(5)", i, i + 1);
(i + 1) * 30, (i + 2) * 40);
TAOS_RES* pres = taos_query(pConn, sql); TAOS_RES* pres = taos_query(pConn, sql);
if (taos_errno(pres) != 0) { if (taos_errno(pres) != 0) {
printf("failed to create table %d\n, reason:%s", i, taos_errstr(pres)); printf("failed to create table %d\n, reason:%s", i, taos_errstr(pres));
...@@ -653,6 +652,7 @@ TEST(testCase, projection_query_stables) { ...@@ -653,6 +652,7 @@ TEST(testCase, projection_query_stables) {
taos_free_result(pRes); taos_free_result(pRes);
taos_close(pConn); taos_close(pConn);
} }
#endif #endif
TEST(testCase, agg_query_tables) { TEST(testCase, agg_query_tables) {
...@@ -662,7 +662,7 @@ TEST(testCase, agg_query_tables) { ...@@ -662,7 +662,7 @@ TEST(testCase, agg_query_tables) {
TAOS_RES* pRes = taos_query(pConn, "use abc1"); TAOS_RES* pRes = taos_query(pConn, "use abc1");
taos_free_result(pRes); taos_free_result(pRes);
pRes = taos_query(pConn, "select length('abc') from tu"); pRes = taos_query(pConn, "select * from test_block_raw.all_type");
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("failed to select from table, reason:%s\n", taos_errstr(pRes)); printf("failed to select from table, reason:%s\n", taos_errstr(pRes));
taos_free_result(pRes); taos_free_result(pRes);
...@@ -673,6 +673,10 @@ TEST(testCase, agg_query_tables) { ...@@ -673,6 +673,10 @@ TEST(testCase, agg_query_tables) {
TAOS_FIELD* pFields = taos_fetch_fields(pRes); TAOS_FIELD* pFields = taos_fetch_fields(pRes);
int32_t numOfFields = taos_num_fields(pRes); int32_t numOfFields = taos_num_fields(pRes);
int32_t n = 0;
void* data = NULL;
int32_t code = taos_fetch_raw_block(pRes, &n, &data);
char str[512] = {0}; char str[512] = {0};
while ((pRow = taos_fetch_row(pRes)) != NULL) { while ((pRow = taos_fetch_row(pRes)) != NULL) {
int32_t* length = taos_fetch_lengths(pRes); int32_t* length = taos_fetch_lengths(pRes);
......
...@@ -1149,10 +1149,11 @@ void* blockDataDestroy(SSDataBlock* pBlock) { ...@@ -1149,10 +1149,11 @@ void* blockDataDestroy(SSDataBlock* pBlock) {
return NULL; return NULL;
} }
SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock) { SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData) {
if(pDataBlock == NULL){ if(pDataBlock == NULL){
return NULL; return NULL;
} }
int32_t numOfCols = pDataBlock->info.numOfCols; int32_t numOfCols = pDataBlock->info.numOfCols;
SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock)); SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
...@@ -1160,6 +1161,7 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock) { ...@@ -1160,6 +1161,7 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock) {
pBlock->info.numOfCols = numOfCols; pBlock->info.numOfCols = numOfCols;
pBlock->info.hasVarCol = pDataBlock->info.hasVarCol; pBlock->info.hasVarCol = pDataBlock->info.hasVarCol;
pBlock->info.rowSize = pDataBlock->info.rows;
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData colInfo = {0}; SColumnInfoData colInfo = {0};
...@@ -1168,6 +1170,23 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock) { ...@@ -1168,6 +1170,23 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock) {
taosArrayPush(pBlock->pDataBlock, &colInfo); taosArrayPush(pBlock->pDataBlock, &colInfo);
} }
if (copyData) {
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, i);
SColumnInfoData* pSrc = taosArrayGet(pDataBlock->pDataBlock, i);
int32_t code = colInfoDataEnsureCapacity(pDst, pDataBlock->info.rows);
if (code != TSDB_CODE_SUCCESS) {
return NULL;
}
colDataAssign(pDst, pSrc, pDataBlock->info.rows);
}
pBlock->info.rows = pDataBlock->info.rows;
pBlock->info.capacity = pDataBlock->info.rows;
}
return pBlock; return pBlock;
} }
......
...@@ -48,8 +48,6 @@ typedef int32_t (*__block_search_fn_t)(char* data, int32_t num, int64_t key, int ...@@ -48,8 +48,6 @@ typedef int32_t (*__block_search_fn_t)(char* data, int32_t num, int64_t key, int
#define GET_TABLEGROUP(q, _index) ((SArray*)taosArrayGetP((q)->tableqinfoGroupInfo.pGroupList, (_index))) #define GET_TABLEGROUP(q, _index) ((SArray*)taosArrayGetP((q)->tableqinfoGroupInfo.pGroupList, (_index)))
#define GET_NUM_OF_RESULTS(_r) (((_r)->outputBuf) == NULL ? 0 : ((_r)->outputBuf)->info.rows)
#define NEEDTO_COMPRESS_QUERY(size) ((size) > tsCompressColData ? 1 : 0) #define NEEDTO_COMPRESS_QUERY(size) ((size) > tsCompressColData ? 1 : 0)
enum { enum {
...@@ -84,21 +82,6 @@ typedef struct SResultInfo { // TODO refactor ...@@ -84,21 +82,6 @@ typedef struct SResultInfo { // TODO refactor
int32_t threshold; // result size threshold in rows. int32_t threshold; // result size threshold in rows.
} SResultInfo; } SResultInfo;
typedef struct SColumnFilterElem {
int16_t bytes; // column length
__filter_func_t fp;
SColumnFilterInfo filterInfo;
void* q;
} SColumnFilterElem;
typedef struct SSingleColumnFilterInfo {
void* pData;
void* pData2; // used for nchar column
int32_t numOfFilters;
SColumnInfo info;
SColumnFilterElem* pFilters;
} SSingleColumnFilterInfo;
typedef struct STableQueryInfo { typedef struct STableQueryInfo {
TSKEY lastKey; // last check ts TSKEY lastKey; // last check ts
uint64_t uid; // table uid uint64_t uid; // table uid
...@@ -169,66 +152,36 @@ typedef struct SOperatorCostInfo { ...@@ -169,66 +152,36 @@ typedef struct SOperatorCostInfo {
uint64_t totalCost; uint64_t totalCost;
} SOperatorCostInfo; } SOperatorCostInfo;
typedef struct SOrder {
uint32_t order;
SColumn col;
} SOrder;
// The basic query information extracted from the SQueryInfo tree to support the // The basic query information extracted from the SQueryInfo tree to support the
// execution of query in a data node. // execution of query in a data node.
typedef struct STaskAttr { typedef struct STaskAttr {
SLimit limit; SLimit limit;
SLimit slimit; SLimit slimit;
bool stableQuery; // super table query or not
// todo comment it bool topBotQuery; // TODO used bitwise flag
bool stableQuery; // super table query or not bool groupbyColumn; // denote if this is a groupby normal column query
bool topBotQuery; // TODO used bitwise flag bool timeWindowInterpo; // if the time window start/end required interpolation
bool groupbyColumn; // denote if this is a groupby normal column query bool tsCompQuery; // is tscomp query
bool hasTagResults; // if there are tag values in final result or not bool diffQuery; // is diff query
bool timeWindowInterpo; // if the time window start/end required interpolation bool pointInterpQuery; // point interpolation query
bool queryBlockDist; // if query data block distribution int32_t havingNum; // having expr number
bool stabledev; // super table stddev query int16_t numOfCols;
bool tsCompQuery; // is tscomp query int16_t numOfTags;
bool diffQuery; // is diff query STimeWindow window;
bool simpleAgg; SInterval interval;
bool pointInterpQuery; // point interpolation query int16_t precision;
bool needReverseScan; // need reverse scan int16_t numOfOutput;
bool distinct; // distinct query or not int16_t fillType;
bool stateWindow; // window State on sub/normal table int32_t resultRowSize;
bool createFilterOperator; // if filter operator is needed int32_t tagLen; // tag value length of current query
bool multigroupResult; // multigroup result can exist in one SSDataBlock
int32_t interBufSize; // intermediate buffer sizse SExprInfo *pExpr1;
SColumnInfo* tagColList;
int32_t havingNum; // having expr number int32_t numOfFilterCols;
int64_t* fillVal;
SOrder order;
int16_t numOfCols;
int16_t numOfTags;
STimeWindow window;
SInterval interval;
int16_t precision;
int16_t numOfOutput;
int16_t fillType;
int32_t srcRowSize; // todo extract struct
int32_t resultRowSize;
int32_t intermediateResultRowSize; // intermediate result row size, in case of top-k query.
int32_t maxTableColumnWidth;
int32_t tagLen; // tag value length of current query
SExprInfo* pExpr1;
SColumnInfo* tableCols;
SColumnInfo* tagColList;
int32_t numOfFilterCols;
int64_t* fillVal;
SSingleColumnFilterInfo* pFilterInfo;
void* tsdb; void* tsdb;
STableGroupInfo tableGroupInfo; // table <tid, last_key> list SArray<STableKeyInfo> STableGroupInfo tableGroupInfo; // table <tid, last_key> list SArray<STableKeyInfo>
int32_t vgId; int32_t vgId;
SArray* pUdfInfo; // no need to free
} STaskAttr; } STaskAttr;
struct SOperatorInfo; struct SOperatorInfo;
...@@ -252,7 +205,6 @@ typedef struct STaskIdInfo { ...@@ -252,7 +205,6 @@ typedef struct STaskIdInfo {
typedef struct SExecTaskInfo { typedef struct SExecTaskInfo {
STaskIdInfo id; STaskIdInfo id;
char* content;
uint32_t status; uint32_t status;
STimeWindow window; STimeWindow window;
STaskCostInfo cost; STaskCostInfo cost;
...@@ -262,7 +214,7 @@ typedef struct SExecTaskInfo { ...@@ -262,7 +214,7 @@ typedef struct SExecTaskInfo {
STableGroupInfo tableqinfoGroupInfo; // this is a group array list, including SArray<STableQueryInfo*> structure STableGroupInfo tableqinfoGroupInfo; // this is a group array list, including SArray<STableQueryInfo*> structure
char* sql; // query sql string char* sql; // query sql string
jmp_buf env; // jump to this position when error happens. jmp_buf env; // jump to this position when error happens.
EOPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model] EOPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model]
struct SOperatorInfo* pRoot; struct SOperatorInfo* pRoot;
} SExecTaskInfo; } SExecTaskInfo;
...@@ -297,7 +249,7 @@ typedef struct STaskRuntimeEnv { ...@@ -297,7 +249,7 @@ typedef struct STaskRuntimeEnv {
int64_t currentOffset; // dynamic offset value int64_t currentOffset; // dynamic offset value
STableQueryInfo* current; STableQueryInfo* current;
SResultInfo resultInfo; SResultInfo resultInfo;
SHashObj* pTableRetrieveTsMap; SHashObj* pTableRetrieveTsMap;
struct SUdfInfo* pUdfInfo; struct SUdfInfo* pUdfInfo;
} STaskRuntimeEnv; } STaskRuntimeEnv;
...@@ -339,25 +291,6 @@ typedef struct { ...@@ -339,25 +291,6 @@ typedef struct {
SColumnInfo* colList; SColumnInfo* colList;
} SQueriedTableInfo; } SQueriedTableInfo;
typedef struct SQInfo {
void* signature;
uint64_t qId;
int32_t code; // error code to returned to client
int64_t owner; // if it is in execution
STaskRuntimeEnv runtimeEnv;
STaskAttr query;
void* pBuf; // allocated buffer for STableQueryInfo, sizeof(STableQueryInfo)*numOfTables;
TdThreadMutex lock; // used to synchronize the rsp/query threads
tsem_t ready;
int32_t dataReady; // denote if query result is ready or not
void* rspContext; // response context
int64_t startExecTs; // start to exec timestamp
char* sql; // query sql string
STaskCostInfo summary;
} SQInfo;
typedef enum { typedef enum {
EX_SOURCE_DATA_NOT_READY = 0x1, EX_SOURCE_DATA_NOT_READY = 0x1,
EX_SOURCE_DATA_READY = 0x2, EX_SOURCE_DATA_READY = 0x2,
...@@ -523,24 +456,6 @@ typedef struct SProjectOperatorInfo { ...@@ -523,24 +456,6 @@ typedef struct SProjectOperatorInfo {
int64_t curOutput; int64_t curOutput;
} SProjectOperatorInfo; } SProjectOperatorInfo;
typedef struct SSLimitOperatorInfo {
int64_t groupTotal;
int64_t currentGroupOffset;
int64_t rowsTotal;
int64_t currentOffset;
SLimit limit;
SLimit slimit;
char** prevRow;
SArray* orderColumnList;
bool hasPrev;
bool ignoreCurrentGroup;
bool multigroupResult;
SSDataBlock* pRes; // result buffer
SSDataBlock* pPrevBlock;
int64_t capacity;
int64_t threshold;
} SSLimitOperatorInfo;
typedef struct SFillOperatorInfo { typedef struct SFillOperatorInfo {
struct SFillInfo* pFillInfo; struct SFillInfo* pFillInfo;
SSDataBlock* pRes; SSDataBlock* pRes;
...@@ -553,10 +468,10 @@ typedef struct SFillOperatorInfo { ...@@ -553,10 +468,10 @@ typedef struct SFillOperatorInfo {
} SFillOperatorInfo; } SFillOperatorInfo;
typedef struct { typedef struct {
char *pData; char *pData;
bool isNull; bool isNull;
int16_t type; int16_t type;
int32_t bytes; int32_t bytes;
} SGroupKeys, SStateKeys; } SGroupKeys, SStateKeys;
typedef struct SGroupbyOperatorInfo { typedef struct SGroupbyOperatorInfo {
...@@ -582,19 +497,19 @@ typedef struct SDataGroupInfo { ...@@ -582,19 +497,19 @@ typedef struct SDataGroupInfo {
// The sort in partition may be needed later. // The sort in partition may be needed later.
typedef struct SPartitionOperatorInfo { typedef struct SPartitionOperatorInfo {
SOptrBasicInfo binfo; SOptrBasicInfo binfo;
SArray* pGroupCols; SArray* pGroupCols;
SArray* pGroupColVals; // current group column values, SArray<SGroupKeys> SArray* pGroupColVals; // current group column values, SArray<SGroupKeys>
char* keyBuf; // group by keys for hash char* keyBuf; // group by keys for hash
int32_t groupKeyLen; // total group by column width int32_t groupKeyLen; // total group by column width
SHashObj* pGroupSet; // quick locate the window object for each result SHashObj* pGroupSet; // quick locate the window object for each result
SDiskbasedBuf* pBuf; // query result buffer based on blocked-wised disk file SDiskbasedBuf* pBuf; // query result buffer based on blocked-wised disk file
int32_t rowCapacity; // maximum number of rows for each buffer page int32_t rowCapacity; // maximum number of rows for each buffer page
int32_t* columnOffset; // start position for each column data int32_t* columnOffset; // start position for each column data
void* pGroupIter; // group iterator void* pGroupIter; // group iterator
int32_t pageIndex; // page index of current group int32_t pageIndex; // page index of current group
} SPartitionOperatorInfo; } SPartitionOperatorInfo;
typedef struct SWindowRowsSup { typedef struct SWindowRowsSup {
...@@ -633,26 +548,21 @@ typedef struct SStateWindowOperatorInfo { ...@@ -633,26 +548,21 @@ typedef struct SStateWindowOperatorInfo {
} SStateWindowOperatorInfo; } SStateWindowOperatorInfo;
typedef struct SSortedMergeOperatorInfo { typedef struct SSortedMergeOperatorInfo {
SOptrBasicInfo binfo; SOptrBasicInfo binfo;
bool hasVarCol; bool hasVarCol;
SArray* pSortInfo; SArray* pSortInfo;
int32_t numOfSources; int32_t numOfSources;
SSortHandle *pSortHandle;
SSortHandle *pSortHandle; int32_t bufPageSize;
uint32_t sortBufSize; // max buffer size for in-memory sort
int32_t bufPageSize; int32_t resultRowFactor;
uint32_t sortBufSize; // max buffer size for in-memory sort bool hasGroupVal;
SDiskbasedBuf *pTupleStore; // keep the final results
int32_t resultRowFactor; int32_t numOfResPerPage;
bool hasGroupVal; char** groupVal;
SArray *groupInfo;
SDiskbasedBuf *pTupleStore; // keep the final results SAggSupporter aggSup;
int32_t numOfResPerPage;
char** groupVal;
SArray *groupInfo;
SAggSupporter aggSup;
} SSortedMergeOperatorInfo; } SSortedMergeOperatorInfo;
typedef struct SSortOperatorInfo { typedef struct SSortOperatorInfo {
...@@ -680,8 +590,7 @@ int32_t initAggInfo(SOptrBasicInfo* pBasicInfo, SAggSupporter* pAggSup, SExprInf ...@@ -680,8 +590,7 @@ int32_t initAggInfo(SOptrBasicInfo* pBasicInfo, SAggSupporter* pAggSup, SExprInf
void toSDatablock(SSDataBlock* pBlock, int32_t rowCapacity, SGroupResInfo* pGroupResInfo, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf, int32_t* rowCellOffset); void toSDatablock(SSDataBlock* pBlock, int32_t rowCapacity, SGroupResInfo* pGroupResInfo, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf, int32_t* rowCellOffset);
void finalizeMultiTupleQueryResult(SqlFunctionCtx* pCtx, int32_t numOfOutput, SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowInfo, int32_t* rowCellInfoOffset); void finalizeMultiTupleQueryResult(SqlFunctionCtx* pCtx, int32_t numOfOutput, SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowInfo, int32_t* rowCellInfoOffset);
void doApplyFunctions(SqlFunctionCtx* pCtx, STimeWindow* pWin, SColumnInfoData* pTimeWindowData, int32_t offset, int32_t forwardStep, TSKEY* tsCol, int32_t numOfTotal, int32_t numOfOutput, int32_t order); void doApplyFunctions(SqlFunctionCtx* pCtx, STimeWindow* pWin, SColumnInfoData* pTimeWindowData, int32_t offset, int32_t forwardStep, TSKEY* tsCol, int32_t numOfTotal, int32_t numOfOutput, int32_t order);
int32_t setGroupResultOutputBuf_rv(SOptrBasicInfo* binfo, int32_t numOfCols, char* pData, int16_t type, int32_t setGroupResultOutputBuf(SOptrBasicInfo* binfo, int32_t numOfCols, char* pData, int16_t type, int16_t bytes, int32_t groupId, SDiskbasedBuf* pBuf, SExecTaskInfo* pTaskInfo, SAggSupporter* pAggSup);
int16_t bytes, int32_t groupId, SDiskbasedBuf* pBuf, SExecTaskInfo* pTaskInfo, SAggSupporter* pAggSup);
void doDestroyBasicInfo(SOptrBasicInfo* pInfo, int32_t numOfOutput); void doDestroyBasicInfo(SOptrBasicInfo* pInfo, int32_t numOfOutput);
int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadInfo, int32_t numOfRows, int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadInfo, int32_t numOfRows,
char* pData, int32_t compLen, int32_t numOfOutput, int64_t startTs, char* pData, int32_t compLen, int32_t numOfOutput, int64_t startTs,
...@@ -748,7 +657,6 @@ void setTaskKilled(SExecTaskInfo* pTaskInfo); ...@@ -748,7 +657,6 @@ void setTaskKilled(SExecTaskInfo* pTaskInfo);
void publishOperatorProfEvent(SOperatorInfo* operatorInfo, EQueryProfEventType eventType); void publishOperatorProfEvent(SOperatorInfo* operatorInfo, EQueryProfEventType eventType);
void publishQueryAbortEvent(SExecTaskInfo* pTaskInfo, int32_t code); void publishQueryAbortEvent(SExecTaskInfo* pTaskInfo, int32_t code);
void calculateOperatorProfResults(SQInfo* pQInfo);
void queryCostStatis(SExecTaskInfo* pTaskInfo); void queryCostStatis(SExecTaskInfo* pTaskInfo);
void doDestroyTask(SExecTaskInfo* pTaskInfo); void doDestroyTask(SExecTaskInfo* pTaskInfo);
......
...@@ -337,11 +337,11 @@ int32_t tsDescOrder(const void* p1, const void* p2) { ...@@ -337,11 +337,11 @@ int32_t tsDescOrder(const void* p1, const void* p2) {
void orderTheResultRows(STaskRuntimeEnv* pRuntimeEnv) { void orderTheResultRows(STaskRuntimeEnv* pRuntimeEnv) {
__compar_fn_t fn = NULL; __compar_fn_t fn = NULL;
if (pRuntimeEnv->pQueryAttr->order.order == TSDB_ORDER_ASC) { // if (pRuntimeEnv->pQueryAttr->order.order == TSDB_ORDER_ASC) {
fn = tsAscOrder; // fn = tsAscOrder;
} else { // } else {
fn = tsDescOrder; // fn = tsDescOrder;
} // }
taosArraySort(pRuntimeEnv->pResultRowArrayList, fn); taosArraySort(pRuntimeEnv->pResultRowArrayList, fn);
} }
...@@ -377,7 +377,7 @@ static int32_t mergeIntoGroupResultImplRv(STaskRuntimeEnv *pRuntimeEnv, SGroupRe ...@@ -377,7 +377,7 @@ static int32_t mergeIntoGroupResultImplRv(STaskRuntimeEnv *pRuntimeEnv, SGroupRe
static UNUSED_FUNC int32_t mergeIntoGroupResultImpl(STaskRuntimeEnv *pRuntimeEnv, SGroupResInfo* pGroupResInfo, SArray *pTableList, static UNUSED_FUNC int32_t mergeIntoGroupResultImpl(STaskRuntimeEnv *pRuntimeEnv, SGroupResInfo* pGroupResInfo, SArray *pTableList,
int32_t* rowCellInfoOffset) { int32_t* rowCellInfoOffset) {
bool ascQuery = QUERY_IS_ASC_QUERY(pRuntimeEnv->pQueryAttr); bool ascQuery = true;
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
...@@ -413,7 +413,8 @@ static UNUSED_FUNC int32_t mergeIntoGroupResultImpl(STaskRuntimeEnv *pRuntimeEnv ...@@ -413,7 +413,8 @@ static UNUSED_FUNC int32_t mergeIntoGroupResultImpl(STaskRuntimeEnv *pRuntimeEnv
goto _end; goto _end;
} }
SCompSupporter cs = {pTableQueryInfoList, posList, pRuntimeEnv->pQueryAttr->order.order}; int32_t order = TSDB_ORDER_ASC;
SCompSupporter cs = {pTableQueryInfoList, posList, order};
int32_t ret = tMergeTreeCreate(&pTree, numOfTables, &cs, tableResultComparFn); int32_t ret = tMergeTreeCreate(&pTree, numOfTables, &cs, tableResultComparFn);
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
......
...@@ -52,7 +52,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu ...@@ -52,7 +52,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
for (int32_t i = 0; i < numOfBlocks; ++i) { for (int32_t i = 0; i < numOfBlocks; ++i) {
SSDataBlock* pDataBlock = &((SSDataBlock*)input)[i]; SSDataBlock* pDataBlock = &((SSDataBlock*)input)[i];
SSDataBlock* p = createOneDataBlock(pDataBlock); SSDataBlock* p = createOneDataBlock(pDataBlock, false);
p->info = pDataBlock->info; p->info = pDataBlock->info;
taosArrayClear(p->pDataBlock); taosArrayClear(p->pDataBlock);
......
...@@ -990,7 +990,8 @@ static int32_t getNextQualifiedWindow(SInterval* pInterval, STimeWindow* pNext, ...@@ -990,7 +990,8 @@ static int32_t getNextQualifiedWindow(SInterval* pInterval, STimeWindow* pNext,
static FORCE_INLINE TSKEY reviseWindowEkey(STaskAttr* pQueryAttr, STimeWindow* pWindow) { static FORCE_INLINE TSKEY reviseWindowEkey(STaskAttr* pQueryAttr, STimeWindow* pWindow) {
TSKEY ekey = -1; TSKEY ekey = -1;
if (QUERY_IS_ASC_QUERY(pQueryAttr)) { int32_t order = TSDB_ORDER_ASC;
if (order == TSDB_ORDER_ASC) {
ekey = pWindow->ekey; ekey = pWindow->ekey;
if (ekey > pQueryAttr->window.ekey) { if (ekey > pQueryAttr->window.ekey) {
ekey = pQueryAttr->window.ekey; ekey = pQueryAttr->window.ekey;
...@@ -1700,9 +1701,8 @@ static void setResultRowKey(SResultRow* pResultRow, char* pData, int16_t type) { ...@@ -1700,9 +1701,8 @@ static void setResultRowKey(SResultRow* pResultRow, char* pData, int16_t type) {
} }
} }
int32_t setGroupResultOutputBuf_rv(SOptrBasicInfo* binfo, int32_t numOfCols, char* pData, int16_t type, int32_t setGroupResultOutputBuf(SOptrBasicInfo* binfo, int32_t numOfCols, char* pData, int16_t type, int16_t bytes,
int16_t bytes, int32_t groupId, SDiskbasedBuf* pBuf, SExecTaskInfo* pTaskInfo, int32_t groupId, SDiskbasedBuf* pBuf, SExecTaskInfo* pTaskInfo, SAggSupporter* pAggSup) {
SAggSupporter* pAggSup) {
SResultRowInfo* pResultRowInfo = &binfo->resultRowInfo; SResultRowInfo* pResultRowInfo = &binfo->resultRowInfo;
SqlFunctionCtx* pCtx = binfo->pCtx; SqlFunctionCtx* pCtx = binfo->pCtx;
...@@ -1961,7 +1961,8 @@ static bool isCachedLastQuery(STaskAttr* pQueryAttr) { ...@@ -1961,7 +1961,8 @@ static bool isCachedLastQuery(STaskAttr* pQueryAttr) {
return false; return false;
} }
if (pQueryAttr->order.order != TSDB_ORDER_DESC || !TSWINDOW_IS_EQUAL(pQueryAttr->window, TSWINDOW_DESC_INITIALIZER)) { int32_t order = TSDB_ORDER_ASC;
if (order != TSDB_ORDER_DESC || !TSWINDOW_IS_EQUAL(pQueryAttr->window, TSWINDOW_DESC_INITIALIZER)) {
return false; return false;
} }
...@@ -2187,7 +2188,7 @@ static bool overlapWithTimeWindow(STaskAttr* pQueryAttr, SDataBlockInfo* pBlockI ...@@ -2187,7 +2188,7 @@ static bool overlapWithTimeWindow(STaskAttr* pQueryAttr, SDataBlockInfo* pBlockI
TSKEY sk = TMIN(pQueryAttr->window.skey, pQueryAttr->window.ekey); TSKEY sk = TMIN(pQueryAttr->window.skey, pQueryAttr->window.ekey);
TSKEY ek = TMAX(pQueryAttr->window.skey, pQueryAttr->window.ekey); TSKEY ek = TMAX(pQueryAttr->window.skey, pQueryAttr->window.ekey);
if (QUERY_IS_ASC_QUERY(pQueryAttr)) { if (true) {
// getAlignQueryTimeWindow(pQueryAttr, pBlockInfo->window.skey, sk, ek, &w); // getAlignQueryTimeWindow(pQueryAttr, pBlockInfo->window.skey, sk, ek, &w);
assert(w.ekey >= pBlockInfo->window.skey); assert(w.ekey >= pBlockInfo->window.skey);
...@@ -2257,57 +2258,6 @@ static int32_t doTSJoinFilter(STaskRuntimeEnv* pRuntimeEnv, TSKEY key, bool ascQ ...@@ -2257,57 +2258,6 @@ static int32_t doTSJoinFilter(STaskRuntimeEnv* pRuntimeEnv, TSKEY key, bool ascQ
return TS_JOIN_TS_EQUAL; return TS_JOIN_TS_EQUAL;
} }
bool doFilterDataBlock(SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFilterCols, int32_t numOfRows, int8_t* p) {
bool all = true;
for (int32_t i = 0; i < numOfRows; ++i) {
bool qualified = false;
for (int32_t k = 0; k < numOfFilterCols; ++k) {
char* pElem = (char*)pFilterInfo[k].pData + pFilterInfo[k].info.bytes * i;
qualified = false;
for (int32_t j = 0; j < pFilterInfo[k].numOfFilters; ++j) {
SColumnFilterElem* pFilterElem = NULL;
// SColumnFilterElem* pFilterElem = &pFilterInfo[k].pFilters[j];
bool isnull = isNull(pElem, pFilterInfo[k].info.type);
if (isnull) {
// if (pFilterElem->fp == isNullOperator) {
// qualified = true;
// break;
// } else {
// continue;
// }
} else {
// if (pFilterElem->fp == notNullOperator) {
// qualified = true;
// break;
// } else if (pFilterElem->fp == isNullOperator) {
// continue;
// }
}
if (pFilterElem->fp(pFilterElem, pElem, pElem, pFilterInfo[k].info.type)) {
qualified = true;
break;
}
}
if (!qualified) {
break;
}
}
p[i] = qualified ? 1 : 0;
if (!qualified) {
all = false;
}
}
return all;
}
void doCompactSDataBlock(SSDataBlock* pBlock, int32_t numOfRows, int8_t* p) { void doCompactSDataBlock(SSDataBlock* pBlock, int32_t numOfRows, int8_t* p) {
int32_t len = 0; int32_t len = 0;
int32_t start = 0; int32_t start = 0;
...@@ -2357,49 +2307,6 @@ void doCompactSDataBlock(SSDataBlock* pBlock, int32_t numOfRows, int8_t* p) { ...@@ -2357,49 +2307,6 @@ void doCompactSDataBlock(SSDataBlock* pBlock, int32_t numOfRows, int8_t* p) {
} }
} }
void filterRowsInDataBlock(STaskRuntimeEnv* pRuntimeEnv, SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFilterCols,
SSDataBlock* pBlock, bool ascQuery) {
int32_t numOfRows = pBlock->info.rows;
int8_t* p = taosMemoryCalloc(numOfRows, sizeof(int8_t));
bool all = true;
#if 0
if (pRuntimeEnv->pTsBuf != NULL) {
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, 0);
TSKEY* k = (TSKEY*) pColInfoData->pData;
for (int32_t i = 0; i < numOfRows; ++i) {
int32_t offset = ascQuery? i:(numOfRows - i - 1);
int32_t ret = doTSJoinFilter(pRuntimeEnv, k[offset], ascQuery);
if (ret == TS_JOIN_TAG_NOT_EQUALS) {
break;
} else if (ret == TS_JOIN_TS_NOT_EQUALS) {
all = false;
continue;
} else {
assert(ret == TS_JOIN_TS_EQUAL);
p[offset] = true;
}
if (!tsBufNextPos(pRuntimeEnv->pTsBuf)) {
break;
}
}
// save the cursor status
pRuntimeEnv->current->cur = tsBufGetCursor(pRuntimeEnv->pTsBuf);
} else {
all = doFilterDataBlock(pFilterInfo, numOfFilterCols, numOfRows, p);
}
#endif
if (!all) {
doCompactSDataBlock(pBlock, numOfRows, p);
}
taosMemoryFreeClear(p);
}
void filterColRowsInDataBlock(STaskRuntimeEnv* pRuntimeEnv, SSDataBlock* pBlock, bool ascQuery) { void filterColRowsInDataBlock(STaskRuntimeEnv* pRuntimeEnv, SSDataBlock* pBlock, bool ascQuery) {
int32_t numOfRows = pBlock->info.rows; int32_t numOfRows = pBlock->info.rows;
...@@ -3131,7 +3038,7 @@ void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock) { ...@@ -3131,7 +3038,7 @@ void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock) {
bool keep = filterExecute(filter, pBlock, &rowRes, NULL, param1.numOfCols); bool keep = filterExecute(filter, pBlock, &rowRes, NULL, param1.numOfCols);
filterFreeInfo(filter); filterFreeInfo(filter);
SSDataBlock* px = createOneDataBlock(pBlock); SSDataBlock* px = createOneDataBlock(pBlock, false);
blockDataEnsureCapacity(px, pBlock->info.rows); blockDataEnsureCapacity(px, pBlock->info.rows);
// todo extract method // todo extract method
...@@ -3509,22 +3416,22 @@ static void doOperatorExecProfOnce(SOperatorStackItem* item, SQueryProfEvent* ev ...@@ -3509,22 +3416,22 @@ static void doOperatorExecProfOnce(SOperatorStackItem* item, SQueryProfEvent* ev
} }
} }
void calculateOperatorProfResults(SQInfo* pQInfo) { void calculateOperatorProfResults(void) {
if (pQInfo->summary.queryProfEvents == NULL) { // if (pQInfo->summary.queryProfEvents == NULL) {
// qDebug("QInfo:0x%"PRIx64" query prof events array is null", pQInfo->qId); // // qDebug("QInfo:0x%"PRIx64" query prof events array is null", pQInfo->qId);
return; // return;
} // }
//
if (pQInfo->summary.operatorProfResults == NULL) { // if (pQInfo->summary.operatorProfResults == NULL) {
// qDebug("QInfo:0x%"PRIx64" operator prof results hash is null", pQInfo->qId); // // qDebug("QInfo:0x%"PRIx64" operator prof results hash is null", pQInfo->qId);
return; // return;
} // }
SArray* opStack = taosArrayInit(32, sizeof(SOperatorStackItem)); SArray* opStack = taosArrayInit(32, sizeof(SOperatorStackItem));
if (opStack == NULL) { if (opStack == NULL) {
return; return;
} }
#if 0
size_t size = taosArrayGetSize(pQInfo->summary.queryProfEvents); size_t size = taosArrayGetSize(pQInfo->summary.queryProfEvents);
SHashObj* profResults = pQInfo->summary.operatorProfResults; SHashObj* profResults = pQInfo->summary.operatorProfResults;
...@@ -3547,7 +3454,7 @@ void calculateOperatorProfResults(SQInfo* pQInfo) { ...@@ -3547,7 +3454,7 @@ void calculateOperatorProfResults(SQInfo* pQInfo) {
} }
} }
} }
#endif
taosArrayDestroy(opStack); taosArrayDestroy(opStack);
} }
...@@ -4507,13 +4414,6 @@ static void destroySortedMergeOperatorInfo(void* param, int32_t numOfOutput) { ...@@ -4507,13 +4414,6 @@ static void destroySortedMergeOperatorInfo(void* param, int32_t numOfOutput) {
cleanupAggSup(&pInfo->aggSup); cleanupAggSup(&pInfo->aggSup);
} }
static void destroySlimitOperatorInfo(void* param, int32_t numOfOutput) {
SSLimitOperatorInfo* pInfo = (SSLimitOperatorInfo*)param;
taosArrayDestroy(pInfo->orderColumnList);
pInfo->pRes = blockDataDestroy(pInfo->pRes);
taosMemoryFreeClear(pInfo->prevRow);
}
static void assignExprInfo(SExprInfo* dst, const SExprInfo* src) { static void assignExprInfo(SExprInfo* dst, const SExprInfo* src) {
assert(dst != NULL && src != NULL); assert(dst != NULL && src != NULL);
...@@ -4713,7 +4613,7 @@ static SSDataBlock* doMerge(SOperatorInfo* pOperator) { ...@@ -4713,7 +4613,7 @@ static SSDataBlock* doMerge(SOperatorInfo* pOperator) {
SSortedMergeOperatorInfo* pInfo = pOperator->info; SSortedMergeOperatorInfo* pInfo = pOperator->info;
SSortHandle* pHandle = pInfo->pSortHandle; SSortHandle* pHandle = pInfo->pSortHandle;
SSDataBlock* pDataBlock = createOneDataBlock(pInfo->binfo.pRes); SSDataBlock* pDataBlock = createOneDataBlock(pInfo->binfo.pRes, false);
blockDataEnsureCapacity(pDataBlock, pInfo->binfo.capacity); blockDataEnsureCapacity(pDataBlock, pInfo->binfo.capacity);
while (1) { while (1) {
...@@ -5546,8 +5446,6 @@ static SSDataBlock* doSTableIntervalAgg(SOperatorInfo* pOperator, bool* newgroup ...@@ -5546,8 +5446,6 @@ static SSDataBlock* doSTableIntervalAgg(SOperatorInfo* pOperator, bool* newgroup
} }
STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
int32_t order = pQueryAttr->order.order;
SOperatorInfo* downstream = pOperator->pDownstream[0]; SOperatorInfo* downstream = pOperator->pDownstream[0];
while (1) { while (1) {
...@@ -5563,14 +5461,13 @@ static SSDataBlock* doSTableIntervalAgg(SOperatorInfo* pOperator, bool* newgroup ...@@ -5563,14 +5461,13 @@ static SSDataBlock* doSTableIntervalAgg(SOperatorInfo* pOperator, bool* newgroup
STableQueryInfo* pTableQueryInfo = pRuntimeEnv->current; STableQueryInfo* pTableQueryInfo = pRuntimeEnv->current;
// setTagValue(pOperator, pTableQueryInfo->pTable, pIntervalInfo->pCtx, pOperator->numOfOutput); // setTagValue(pOperator, pTableQueryInfo->pTable, pIntervalInfo->pCtx, pOperator->numOfOutput);
setInputDataBlock(pOperator, pIntervalInfo->binfo.pCtx, pBlock, pQueryAttr->order.order); setInputDataBlock(pOperator, pIntervalInfo->binfo.pCtx, pBlock, TSDB_ORDER_ASC);
setIntervalQueryRange(pRuntimeEnv, pBlock->info.window.skey); setIntervalQueryRange(pRuntimeEnv, pBlock->info.window.skey);
hashIntervalAgg(pOperator, &pTableQueryInfo->resInfo, pBlock, pTableQueryInfo->groupIndex); hashIntervalAgg(pOperator, &pTableQueryInfo->resInfo, pBlock, pTableQueryInfo->groupIndex);
} }
pOperator->status = OP_RES_TO_RETURN; pOperator->status = OP_RES_TO_RETURN;
pQueryAttr->order.order = order; // TODO : restore the order
doCloseAllTimeWindow(pRuntimeEnv); doCloseAllTimeWindow(pRuntimeEnv);
setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED); setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED);
......
...@@ -227,7 +227,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) { ...@@ -227,7 +227,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
} }
len = buildGroupKeys(pInfo->keyBuf, pInfo->pGroupColVals); len = buildGroupKeys(pInfo->keyBuf, pInfo->pGroupColVals);
int32_t ret = setGroupResultOutputBuf_rv(&(pInfo->binfo), pOperator->numOfOutput, pInfo->keyBuf, TSDB_DATA_TYPE_VARCHAR, len, 0, pInfo->aggSup.pResultBuf, pTaskInfo, &pInfo->aggSup); int32_t ret = setGroupResultOutputBuf(&(pInfo->binfo), pOperator->numOfOutput, pInfo->keyBuf, TSDB_DATA_TYPE_VARCHAR, len, 0, pInfo->aggSup.pResultBuf, pTaskInfo, &pInfo->aggSup);
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR); longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR);
} }
...@@ -244,7 +244,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) { ...@@ -244,7 +244,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
if (num > 0) { if (num > 0) {
len = buildGroupKeys(pInfo->keyBuf, pInfo->pGroupColVals); len = buildGroupKeys(pInfo->keyBuf, pInfo->pGroupColVals);
int32_t ret = int32_t ret =
setGroupResultOutputBuf_rv(&(pInfo->binfo), pOperator->numOfOutput, pInfo->keyBuf, TSDB_DATA_TYPE_VARCHAR, len, setGroupResultOutputBuf(&(pInfo->binfo), pOperator->numOfOutput, pInfo->keyBuf, TSDB_DATA_TYPE_VARCHAR, len,
0, pInfo->aggSup.pResultBuf, pTaskInfo, &pInfo->aggSup); 0, pInfo->aggSup.pResultBuf, pTaskInfo, &pInfo->aggSup);
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR); longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR);
......
...@@ -258,16 +258,14 @@ SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, ...@@ -258,16 +258,14 @@ SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order,
return pOperator; return pOperator;
} }
SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv) { SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle) {
STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo)); STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo));
pInfo->dataReader = pTsdbReadHandle; pInfo->dataReader = pTsdbReadHandle;
pInfo->times = 1; pInfo->times = 1;
pInfo->reverseTimes = 0; pInfo->reverseTimes = 0;
pInfo->order = pRuntimeEnv->pQueryAttr->order.order;
pInfo->current = 0; pInfo->current = 0;
pInfo->prevGroupId = -1; pInfo->prevGroupId = -1;
pRuntimeEnv->enableGroupData = true;
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
pOperator->name = "TableSeqScanOperator"; pOperator->name = "TableSeqScanOperator";
...@@ -275,8 +273,6 @@ SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntim ...@@ -275,8 +273,6 @@ SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntim
pOperator->blockingOptr = false; pOperator->blockingOptr = false;
pOperator->status = OP_NOT_OPENED; pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->numOfOutput = pRuntimeEnv->pQueryAttr->numOfCols;
pOperator->pRuntimeEnv = pRuntimeEnv;
pOperator->getNextFn = doTableScanImpl; pOperator->getNextFn = doTableScanImpl;
return pOperator; return pOperator;
...@@ -594,7 +590,7 @@ static SSDataBlock* doFilterResult(SSysTableScanInfo* pInfo) { ...@@ -594,7 +590,7 @@ static SSDataBlock* doFilterResult(SSysTableScanInfo* pInfo) {
bool keep = filterExecute(filter, pInfo->pRes, &rowRes, NULL, param1.numOfCols); bool keep = filterExecute(filter, pInfo->pRes, &rowRes, NULL, param1.numOfCols);
filterFreeInfo(filter); filterFreeInfo(filter);
SSDataBlock* px = createOneDataBlock(pInfo->pRes); SSDataBlock* px = createOneDataBlock(pInfo->pRes, false);
blockDataEnsureCapacity(px, pInfo->pRes->info.rows); blockDataEnsureCapacity(px, pInfo->pRes->info.rows);
// TODO refactor // TODO refactor
...@@ -683,71 +679,70 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator, bool* newgroup) { ...@@ -683,71 +679,70 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator, bool* newgroup) {
return NULL; return NULL;
} }
int64_t startTs = taosGetTimestampUs(); while (1) {
int64_t startTs = taosGetTimestampUs();
_retry: pInfo->req.type = pInfo->type;
pInfo->req.type = pInfo->type; strncpy(pInfo->req.tb, tNameGetTableName(&pInfo->name), tListLen(pInfo->req.tb));
strncpy(pInfo->req.tb, tNameGetTableName(&pInfo->name), tListLen(pInfo->req.tb));
if (pInfo->showRewrite) {
char dbName[TSDB_DB_NAME_LEN] = {0};
getDBNameFromCondition(pInfo->pCondition, dbName);
sprintf(pInfo->req.db, "%d.%s", pInfo->accountId, dbName);
}
int32_t contLen = tSerializeSRetrieveTableReq(NULL, 0, &pInfo->req); if (pInfo->showRewrite) {
char* buf1 = taosMemoryCalloc(1, contLen); char dbName[TSDB_DB_NAME_LEN] = {0};
tSerializeSRetrieveTableReq(buf1, contLen, &pInfo->req); getDBNameFromCondition(pInfo->pCondition, dbName);
sprintf(pInfo->req.db, "%d.%s", pInfo->accountId, dbName);
}
// send the fetch remote task result reques int32_t contLen = tSerializeSRetrieveTableReq(NULL, 0, &pInfo->req);
SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo)); char* buf1 = taosMemoryCalloc(1, contLen);
if (NULL == pMsgSendInfo) { tSerializeSRetrieveTableReq(buf1, contLen, &pInfo->req);
qError("%s prepare message %d failed", GET_TASKID(pTaskInfo), (int32_t)sizeof(SMsgSendInfo));
pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY; // send the fetch remote task result reques
return NULL; SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
} if (NULL == pMsgSendInfo) {
qError("%s prepare message %d failed", GET_TASKID(pTaskInfo), (int32_t)sizeof(SMsgSendInfo));
pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
return NULL;
}
pMsgSendInfo->param = pOperator; pMsgSendInfo->param = pOperator;
pMsgSendInfo->msgInfo.pData = buf1; pMsgSendInfo->msgInfo.pData = buf1;
pMsgSendInfo->msgInfo.len = contLen; pMsgSendInfo->msgInfo.len = contLen;
pMsgSendInfo->msgType = TDMT_MND_SYSTABLE_RETRIEVE; pMsgSendInfo->msgType = TDMT_MND_SYSTABLE_RETRIEVE;
pMsgSendInfo->fp = loadSysTableContentCb; pMsgSendInfo->fp = loadSysTableContentCb;
int64_t transporterId = 0; int64_t transporterId = 0;
int32_t code = asyncSendMsgToServer(pInfo->pTransporter, &pInfo->epSet, &transporterId, pMsgSendInfo); int32_t code = asyncSendMsgToServer(pInfo->pTransporter, &pInfo->epSet, &transporterId, pMsgSendInfo);
tsem_wait(&pInfo->ready); tsem_wait(&pInfo->ready);
if (pTaskInfo->code) { if (pTaskInfo->code) {
return NULL; qDebug("%s load meta data from mnode failed, totalRows:%" PRIu64 ", code:%s", GET_TASKID(pTaskInfo),
} pInfo->loadInfo.totalRows, tstrerror(pTaskInfo->code));
return NULL;
}
SRetrieveMetaTableRsp* pRsp = pInfo->pRsp; SRetrieveMetaTableRsp* pRsp = pInfo->pRsp;
pInfo->req.showId = pRsp->handle; pInfo->req.showId = pRsp->handle;
if (pRsp->numOfRows == 0 || pRsp->completed) { if (pRsp->numOfRows == 0 || pRsp->completed) {
pOperator->status = OP_EXEC_DONE; pOperator->status = OP_EXEC_DONE;
} qDebug("%s load meta data from mnode completed, rowsOfSource:%d, totalRows:%" PRIu64 " ", GET_TASKID(pTaskInfo),
pRsp->numOfRows, pInfo->loadInfo.totalRows);
if (pRsp->numOfRows == 0) { if (pRsp->numOfRows == 0) {
// qDebug("%s vgId:%d, taskID:0x%"PRIx64" %d of total completed, rowsOfSource:%"PRIu64", totalRows:%"PRIu64" return NULL;
// try next", }
// GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pExchangeInfo->current + 1, }
// pDataInfo->totalRows, pExchangeInfo->totalRows);
return NULL;
}
SRetrieveMetaTableRsp* pTableRsp = pInfo->pRsp; SRetrieveMetaTableRsp* pTableRsp = pInfo->pRsp;
setSDataBlockFromFetchRsp(pInfo->pRes, &pInfo->loadInfo, pTableRsp->numOfRows, pTableRsp->data, pTableRsp->compLen, setSDataBlockFromFetchRsp(pInfo->pRes, &pInfo->loadInfo, pTableRsp->numOfRows, pTableRsp->data,
pOperator->numOfOutput, startTs, NULL, pInfo->scanCols); pTableRsp->compLen, pOperator->numOfOutput, startTs, NULL, pInfo->scanCols);
doFilterResult(pInfo); // todo log the filter info
if (pInfo->pRes->info.rows == 0) { doFilterResult(pInfo);
goto _retry; if (pInfo->pRes->info.rows > 0) {
return pInfo->pRes;
}
} }
return pInfo->pRes;
} }
return NULL;
} }
SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, SSDataBlock* pResBlock, const SName* pName, SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, SSDataBlock* pResBlock, const SName* pName,
......
...@@ -99,7 +99,7 @@ SSortHandle* tsortCreateSortHandle(SArray* pSortInfo, SArray* pIndexMap, int32_t ...@@ -99,7 +99,7 @@ SSortHandle* tsortCreateSortHandle(SArray* pSortInfo, SArray* pIndexMap, int32_t
pSortHandle->numOfPages = numOfPages; pSortHandle->numOfPages = numOfPages;
pSortHandle->pSortInfo = pSortInfo; pSortHandle->pSortInfo = pSortInfo;
pSortHandle->pIndexMap = pIndexMap; pSortHandle->pIndexMap = pIndexMap;
pSortHandle->pDataBlock = createOneDataBlock(pBlock); pSortHandle->pDataBlock = createOneDataBlock(pBlock, false);
pSortHandle->pOrderedSource = taosArrayInit(4, POINTER_BYTES); pSortHandle->pOrderedSource = taosArrayInit(4, POINTER_BYTES);
pSortHandle->cmpParam.orderInfo = pSortInfo; pSortHandle->cmpParam.orderInfo = pSortInfo;
...@@ -206,7 +206,7 @@ static int32_t doAddToBuf(SSDataBlock* pDataBlock, SSortHandle* pHandle) { ...@@ -206,7 +206,7 @@ static int32_t doAddToBuf(SSDataBlock* pDataBlock, SSortHandle* pHandle) {
blockDataCleanup(pDataBlock); blockDataCleanup(pDataBlock);
SSDataBlock* pBlock = createOneDataBlock(pDataBlock); SSDataBlock* pBlock = createOneDataBlock(pDataBlock, false);
return doAddNewExternalMemSource(pHandle->pBuf, pHandle->pOrderedSource, pBlock, &pHandle->sourceId); return doAddNewExternalMemSource(pHandle->pBuf, pHandle->pOrderedSource, pBlock, &pHandle->sourceId);
} }
...@@ -488,7 +488,7 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) { ...@@ -488,7 +488,7 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) {
tMergeTreeDestroy(pHandle->pMergeTree); tMergeTreeDestroy(pHandle->pMergeTree);
pHandle->numOfCompletedSources = 0; pHandle->numOfCompletedSources = 0;
SSDataBlock* pBlock = createOneDataBlock(pHandle->pDataBlock); SSDataBlock* pBlock = createOneDataBlock(pHandle->pDataBlock, false);
code = doAddNewExternalMemSource(pHandle->pBuf, pResList, pBlock, &pHandle->sourceId); code = doAddNewExternalMemSource(pHandle->pBuf, pResList, pBlock, &pHandle->sourceId);
if (code != 0) { if (code != 0) {
return code; return code;
...@@ -531,7 +531,7 @@ static int32_t createInitialSortedMultiSources(SSortHandle* pHandle) { ...@@ -531,7 +531,7 @@ static int32_t createInitialSortedMultiSources(SSortHandle* pHandle) {
} }
if (pHandle->pDataBlock == NULL) { if (pHandle->pDataBlock == NULL) {
pHandle->pDataBlock = createOneDataBlock(pBlock); pHandle->pDataBlock = createOneDataBlock(pBlock, false);
} }
int32_t code = blockDataMerge(pHandle->pDataBlock, pBlock, pHandle->pIndexMap); int32_t code = blockDataMerge(pHandle->pDataBlock, pBlock, pHandle->pIndexMap);
......
...@@ -1100,6 +1100,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch ...@@ -1100,6 +1100,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
SSubmitRsp *rsp = (SSubmitRsp *)msg; SSubmitRsp *rsp = (SSubmitRsp *)msg;
SCH_ERR_JRET(rsp->code); SCH_ERR_JRET(rsp->code);
} }
SCH_ERR_JRET(rspCode); SCH_ERR_JRET(rspCode);
SSubmitRsp *rsp = (SSubmitRsp *)msg; SSubmitRsp *rsp = (SSubmitRsp *)msg;
...@@ -1298,7 +1299,6 @@ int32_t schHandleCallback(void *param, const SDataBuf *pMsg, int32_t msgType, in ...@@ -1298,7 +1299,6 @@ int32_t schHandleCallback(void *param, const SDataBuf *pMsg, int32_t msgType, in
SCH_ERR_JRET(schHandleResponseMsg(pJob, pTask, msgType, pMsg->pData, pMsg->len, rspCode)); SCH_ERR_JRET(schHandleResponseMsg(pJob, pTask, msgType, pMsg->pData, pMsg->len, rspCode));
_return: _return:
if (pJob) { if (pJob) {
schReleaseJob(pParam->refId); schReleaseJob(pParam->refId);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册