提交 b271fee3 编写于 作者: H Haojun Liao

[td-11818] add log.

上级 1444f0b4
...@@ -89,7 +89,7 @@ enum { ...@@ -89,7 +89,7 @@ enum {
}; };
enum { enum {
MASTER_SCAN = 0x0u, MAIN_SCAN = 0x0u,
REVERSE_SCAN = 0x1u, REVERSE_SCAN = 0x1u,
REPEAT_SCAN = 0x2u, //repeat scan belongs to the master scan REPEAT_SCAN = 0x2u, //repeat scan belongs to the master scan
MERGE_STAGE = 0x20u, MERGE_STAGE = 0x20u,
......
...@@ -64,6 +64,33 @@ enum { ...@@ -64,6 +64,33 @@ enum {
QUERY_OVER = 0x4u, QUERY_OVER = 0x4u,
}; };
enum OPERATOR_TYPE_E {
OP_TableScan = 1,
OP_DataBlocksOptScan = 2,
OP_TableSeqScan = 3,
OP_TagScan = 4,
OP_TableBlockInfoScan= 5,
OP_Aggregate = 6,
OP_Project = 7,
OP_Groupby = 8,
OP_Limit = 9,
OP_SLimit = 10,
OP_TimeWindow = 11,
OP_SessionWindow = 12,
OP_Fill = 13,
OP_MultiTableAggregate = 14,
OP_MultiTableTimeInterval = 15,
OP_DummyInput = 16, //TODO remove it after fully refactor.
OP_MultiwayMergeSort = 17, // multi-way data merge into one input stream.
OP_GlobalAggregate = 18, // global merge for the multi-way data sources.
OP_Filter = 19,
OP_Distinct = 20,
OP_Join = 21,
OP_StateWindow = 22,
OP_AllTimeWindow = 23,
OP_AllMultiTableTimeInterval = 24,
OP_Order = 25,
};
typedef struct SResultRowCell { typedef struct SResultRowCell {
uint64_t groupId; uint64_t groupId;
...@@ -99,7 +126,7 @@ typedef struct STableQueryInfo { ...@@ -99,7 +126,7 @@ typedef struct STableQueryInfo {
TSKEY lastKey; TSKEY lastKey;
int32_t groupIndex; // group id in table list int32_t groupIndex; // group id in table list
SVariant tag; SVariant tag;
STimeWindow win; STimeWindow win; // todo remove it later
STSCursor cur; STSCursor cur;
void* pTable; // for retrieve the page id list void* pTable; // for retrieve the page id list
SResultRowInfo resInfo; SResultRowInfo resInfo;
...@@ -127,31 +154,34 @@ typedef struct { ...@@ -127,31 +154,34 @@ typedef struct {
int64_t sumRunTimes; int64_t sumRunTimes;
} SOperatorProfResult; } SOperatorProfResult;
typedef struct SQueryCostInfo { typedef struct STaskCostInfo {
uint64_t loadStatisTime; int64_t start;
uint64_t loadFileBlockTime; int64_t end;
uint64_t loadDataInCacheTime;
uint64_t loadStatisSize; uint64_t loadStatisTime;
uint64_t loadFileBlockSize; uint64_t loadFileBlockTime;
uint64_t loadDataInCacheSize; uint64_t loadDataInCacheTime;
uint64_t loadStatisSize;
uint64_t loadDataTime; uint64_t loadFileBlockSize;
uint64_t totalRows; uint64_t loadDataInCacheSize;
uint64_t totalCheckedRows;
uint32_t totalBlocks; uint64_t loadDataTime;
uint32_t loadBlocks; uint64_t totalRows;
uint32_t loadBlockStatis; uint64_t totalCheckedRows;
uint32_t discardBlocks; uint32_t totalBlocks;
uint64_t elapsedTime; uint32_t loadBlocks;
uint64_t firstStageMergeTime; uint32_t loadBlockStatis;
uint64_t winInfoSize; uint32_t discardBlocks;
uint64_t tableInfoSize; uint64_t elapsedTime;
uint64_t hashSize; uint64_t firstStageMergeTime;
uint64_t numOfTimeWindows; uint64_t winInfoSize;
uint64_t tableInfoSize;
SArray* queryProfEvents; //SArray<SQueryProfEvent> uint64_t hashSize;
SHashObj* operatorProfResults; //map<operator_type, SQueryProfEvent> uint64_t numOfTimeWindows;
} SQueryCostInfo;
SArray *queryProfEvents; //SArray<SQueryProfEvent>
SHashObj *operatorProfResults; //map<operator_type, SQueryProfEvent>
} STaskCostInfo;
typedef struct { typedef struct {
int64_t vgroupLimit; int64_t vgroupLimit;
...@@ -235,9 +265,33 @@ typedef void (*__optr_cleanup_fn_t)(void* param, int32_t num); ...@@ -235,9 +265,33 @@ typedef void (*__optr_cleanup_fn_t)(void* param, int32_t num);
struct SOperatorInfo; struct SOperatorInfo;
typedef struct STaskIdInfo {
uint64_t queryId; // this is also a request id
uint64_t subplanId;
uint64_t templateId;
uint64_t taskId; // this is a subplan id
} STaskIdInfo;
typedef struct STaskInfo {
STaskIdInfo id;
char *content;
uint32_t status;
STimeWindow window;
STaskCostInfo cost;
int64_t owner; // if it is in execution
STableGroupInfo tableqinfoGroupInfo; // this is a group array list, including SArray<STableQueryInfo*> structure
pthread_mutex_t lock; // used to synchronize the rsp/query threads
// tsem_t ready;
// int32_t dataReady; // denote if query result is ready or not
// void* rspContext; // response context
char *sql; // query sql string
jmp_buf env;
} STaskInfo;
typedef struct STaskRuntimeEnv { typedef struct STaskRuntimeEnv {
jmp_buf env; jmp_buf env;
STaskAttr* pQueryAttr; STaskAttr* pQueryAttr;
uint32_t status; // query status uint32_t status; // query status
void* qinfo; void* qinfo;
uint8_t scanFlag; // denotes reversed scan of data or not uint8_t scanFlag; // denotes reversed scan of data or not
...@@ -287,9 +341,10 @@ typedef struct SOperatorInfo { ...@@ -287,9 +341,10 @@ typedef struct SOperatorInfo {
void *info; // extension attribution void *info; // extension attribution
SExprInfo *pExpr; SExprInfo *pExpr;
STaskRuntimeEnv *pRuntimeEnv; STaskRuntimeEnv *pRuntimeEnv;
STaskInfo *pTaskInfo;
struct SOperatorInfo ** pDownstream; // upstream pointer list struct SOperatorInfo **pDownstream; // downstram pointer list
int32_t numOfUpstream; // number of upstream. The value is always ONE expect for join operator int32_t numOfDownstream; // number of downstream. The value is always ONE expect for join operator
__operator_fn_t exec; __operator_fn_t exec;
__optr_cleanup_fn_t cleanup; __optr_cleanup_fn_t cleanup;
} SOperatorInfo; } SOperatorInfo;
...@@ -321,7 +376,7 @@ typedef struct SQInfo { ...@@ -321,7 +376,7 @@ typedef struct SQInfo {
void* rspContext; // response context void* rspContext; // response context
int64_t startExecTs; // start to exec timestamp int64_t startExecTs; // start to exec timestamp
char* sql; // query sql string char* sql; // query sql string
SQueryCostInfo summary; STaskCostInfo summary;
} SQInfo; } SQInfo;
typedef struct STaskParam { typedef struct STaskParam {
...@@ -365,9 +420,12 @@ typedef struct STableScanInfo { ...@@ -365,9 +420,12 @@ typedef struct STableScanInfo {
SSDataBlock block; SSDataBlock block;
int32_t numOfOutput; int32_t numOfOutput;
int64_t elapsedTime; int64_t elapsedTime;
int32_t tableIndex; int32_t tableIndex;
int32_t prevGroupId; // previous table group id
int32_t prevGroupId; // previous table group id
int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan
STimeWindow window;
} STableScanInfo; } STableScanInfo;
typedef struct STagScanInfo { typedef struct STagScanInfo {
...@@ -512,7 +570,7 @@ typedef struct SOrderOperatorInfo { ...@@ -512,7 +570,7 @@ typedef struct SOrderOperatorInfo {
void appendUpstream(SOperatorInfo* p, SOperatorInfo* pUpstream); void appendUpstream(SOperatorInfo* p, SOperatorInfo* pUpstream);
SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbQueryHandle, STaskRuntimeEnv* pRuntimeEnv, int32_t repeatTime, int32_t reverseTime); SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbQueryHandle, STaskRuntimeEnv* pRuntimeEnv, int32_t repeatTime, int32_t reverseTime);
SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, STaskRuntimeEnv* pRuntimeEnv, int32_t repeatTime); SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, int32_t order, int32_t numOfOutput, int32_t repeatTime);
SOperatorInfo* createTableSeqScanOperator(void* pTsdbQueryHandle, STaskRuntimeEnv* pRuntimeEnv); SOperatorInfo* createTableSeqScanOperator(void* pTsdbQueryHandle, STaskRuntimeEnv* pRuntimeEnv);
SOperatorInfo* createAggregateOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); SOperatorInfo* createAggregateOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "os.h"
#include "tcache.h"
#include "tglobal.h"
#include "tmsg.h"
#include "exception.h"
#include "thash.h"
#include "executorimpl.h"
#include "executor.h"
#include "tlosertree.h"
#include "ttypes.h"
#include "query.h"
typedef struct STaskMgmt {
pthread_mutex_t lock;
SCacheObj *qinfoPool; // query handle pool
int32_t vgId;
bool closed;
} STaskMgmt;
static void taskMgmtKillTaskFn(void* handle, void* param1) {
void** fp = (void**)handle;
qKillTask(*fp);
}
static void freeqinfoFn(void *qhandle) {
void** handle = qhandle;
if (handle == NULL || *handle == NULL) {
return;
}
qKillTask(*handle);
qDestroyTask(*handle);
}
void freeParam(STaskParam *param) {
tfree(param->sql);
tfree(param->tagCond);
tfree(param->tbnameCond);
tfree(param->pTableIdList);
taosArrayDestroy(param->pOperator);
tfree(param->pExprs);
tfree(param->pSecExprs);
tfree(param->pExpr);
tfree(param->pSecExpr);
tfree(param->pGroupColIndex);
tfree(param->pTagColumnInfo);
tfree(param->pGroupbyExpr);
tfree(param->prevResult);
}
// todo parse json to get the operator tree.
int32_t qCreateTask(void* tsdb, int32_t vgId, void* pQueryMsg, qTaskInfo_t* pTaskInfo, uint64_t taskId) {
assert(pQueryMsg != NULL && tsdb != NULL);
int32_t code = TSDB_CODE_SUCCESS;
#if 0
STaskParam param = {0};
code = convertQueryMsg(pQueryMsg, &param);
if (code != TSDB_CODE_SUCCESS) {
goto _over;
}
if (pQueryMsg->numOfTables <= 0) {
qError("Invalid number of tables to query, numOfTables:%d", pQueryMsg->numOfTables);
code = TSDB_CODE_QRY_INVALID_MSG;
goto _over;
}
if (param.pTableIdList == NULL || taosArrayGetSize(param.pTableIdList) == 0) {
qError("qmsg:%p, SQueryTableMsg wrong format", pQueryMsg);
code = TSDB_CODE_QRY_INVALID_MSG;
goto _over;
}
SQueriedTableInfo info = { .numOfTags = pQueryMsg->numOfTags, .numOfCols = pQueryMsg->numOfCols, .colList = pQueryMsg->tableCols};
if ((code = createQueryFunc(&info, pQueryMsg->numOfOutput, &param.pExprs, param.pExpr, param.pTagColumnInfo,
pQueryMsg->queryType, pQueryMsg, param.pUdfInfo)) != TSDB_CODE_SUCCESS) {
goto _over;
}
if (param.pSecExpr != NULL) {
if ((code = createIndirectQueryFuncExprFromMsg(pQueryMsg, pQueryMsg->secondStageOutput, &param.pSecExprs, param.pSecExpr, param.pExprs, param.pUdfInfo)) != TSDB_CODE_SUCCESS) {
goto _over;
}
}
if (param.colCond != NULL) {
if ((code = createQueryFilter(param.colCond, pQueryMsg->colCondLen, &param.pFilters)) != TSDB_CODE_SUCCESS) {
goto _over;
}
}
param.pGroupbyExpr = createGroupbyExprFromMsg(pQueryMsg, param.pGroupColIndex, &code);
if ((param.pGroupbyExpr == NULL && pQueryMsg->numOfGroupCols != 0) || code != TSDB_CODE_SUCCESS) {
goto _over;
}
bool isSTableQuery = false;
STableGroupInfo tableGroupInfo = {0};
int64_t st = taosGetTimestampUs();
if (TSDB_QUERY_HAS_TYPE(pQueryMsg->queryType, TSDB_QUERY_TYPE_TABLE_QUERY)) {
STableIdInfo *id = taosArrayGet(param.pTableIdList, 0);
qDebug("qmsg:%p query normal table, uid:%"PRId64", tid:%d", pQueryMsg, id->uid, id->tid);
if ((code = tsdbGetOneTableGroup(tsdb, id->uid, pQueryMsg->window.skey, &tableGroupInfo)) != TSDB_CODE_SUCCESS) {
goto _over;
}
} else if (TSDB_QUERY_HAS_TYPE(pQueryMsg->queryType, TSDB_QUERY_TYPE_MULTITABLE_QUERY|TSDB_QUERY_TYPE_STABLE_QUERY)) {
isSTableQuery = true;
// also note there's possibility that only one table in the super table
if (!TSDB_QUERY_HAS_TYPE(pQueryMsg->queryType, TSDB_QUERY_TYPE_MULTITABLE_QUERY)) {
STableIdInfo *id = taosArrayGet(param.pTableIdList, 0);
// group by normal column, do not pass the group by condition to tsdb to group table into different group
int32_t numOfGroupByCols = pQueryMsg->numOfGroupCols;
if (pQueryMsg->numOfGroupCols == 1 && !TSDB_COL_IS_TAG(param.pGroupColIndex->flag)) {
numOfGroupByCols = 0;
}
qDebug("qmsg:%p query stable, uid:%"PRIu64", tid:%d", pQueryMsg, id->uid, id->tid);
code = tsdbQuerySTableByTagCond(tsdb, id->uid, pQueryMsg->window.skey, param.tagCond, pQueryMsg->tagCondLen,
pQueryMsg->tagNameRelType, param.tbnameCond, &tableGroupInfo, param.pGroupColIndex, numOfGroupByCols);
if (code != TSDB_CODE_SUCCESS) {
qError("qmsg:%p failed to query stable, reason: %s", pQueryMsg, tstrerror(code));
goto _over;
}
} else {
code = tsdbGetTableGroupFromIdList(tsdb, param.pTableIdList, &tableGroupInfo);
if (code != TSDB_CODE_SUCCESS) {
goto _over;
}
qDebug("qmsg:%p query on %u tables in one group from client", pQueryMsg, tableGroupInfo.numOfTables);
}
int64_t el = taosGetTimestampUs() - st;
qDebug("qmsg:%p tag filter completed, numOfTables:%u, elapsed time:%"PRId64"us", pQueryMsg, tableGroupInfo.numOfTables, el);
} else {
assert(0);
}
code = checkForQueryBuf(tableGroupInfo.numOfTables);
if (code != TSDB_CODE_SUCCESS) { // not enough query buffer, abort
goto _over;
}
assert(pQueryMsg->stableQuery == isSTableQuery);
(*pTaskInfo) = createQInfoImpl(pQueryMsg, param.pGroupbyExpr, param.pExprs, param.pSecExprs, &tableGroupInfo,
param.pTagColumnInfo, param.pFilters, vgId, param.sql, qId, param.pUdfInfo);
param.sql = NULL;
param.pExprs = NULL;
param.pSecExprs = NULL;
param.pGroupbyExpr = NULL;
param.pTagColumnInfo = NULL;
param.pFilters = NULL;
if ((*pTaskInfo) == NULL) {
code = TSDB_CODE_QRY_OUT_OF_MEMORY;
goto _over;
}
param.pUdfInfo = NULL;
code = initQInfo(&pQueryMsg->tsBuf, tsdb, NULL, *pTaskInfo, &param, (char*)pQueryMsg, pQueryMsg->prevResultLen, NULL);
_over:
if (param.pGroupbyExpr != NULL) {
taosArrayDestroy(param.pGroupbyExpr->columnInfo);
}
tfree(param.colCond);
destroyUdfInfo(param.pUdfInfo);
taosArrayDestroy(param.pTableIdList);
param.pTableIdList = NULL;
freeParam(&param);
for (int32_t i = 0; i < pQueryMsg->numOfCols; i++) {
SColumnInfo* column = pQueryMsg->tableCols + i;
freeColumnFilterInfo(column->flist.filterInfo, column->flist.numOfFilters);
}
filterFreeInfo(param.pFilters);
//pTaskInfo already freed in initQInfo, but *pTaskInfo may not pointer to null;
if (code != TSDB_CODE_SUCCESS) {
*pTaskInfo = NULL;
}
#endif
// if failed to add ref for all tables in this query, abort current query
return code;
}
#ifdef TEST_IMPL
// wait moment
int waitMoment(SQInfo* pQInfo){
if(pQInfo->sql) {
int ms = 0;
char* pcnt = strstr(pQInfo->sql, " count(*)");
if(pcnt) return 0;
char* pos = strstr(pQInfo->sql, " t_");
if(pos){
pos += 3;
ms = atoi(pos);
while(*pos >= '0' && *pos <= '9'){
pos ++;
}
char unit_char = *pos;
if(unit_char == 'h'){
ms *= 3600*1000;
} else if(unit_char == 'm'){
ms *= 60*1000;
} else if(unit_char == 's'){
ms *= 1000;
}
}
if(ms == 0) return 0;
printf("test wait sleep %dms. sql=%s ...\n", ms, pQInfo->sql);
if(ms < 1000) {
taosMsleep(ms);
} else {
int used_ms = 0;
while(used_ms < ms) {
taosMsleep(1000);
used_ms += 1000;
if(isQueryKilled(pQInfo)){
printf("test check query is canceled, sleep break.%s\n", pQInfo->sql);
break;
}
}
}
}
return 1;
}
#endif
bool qExecTask(qTaskInfo_t qinfo, uint64_t *qId) {
SQInfo *pQInfo = (SQInfo *)qinfo;
assert(pQInfo && pQInfo->signature == pQInfo);
int64_t threadId = taosGetSelfPthreadId();
int64_t curOwner = 0;
if ((curOwner = atomic_val_compare_exchange_64(&pQInfo->owner, 0, threadId)) != 0) {
qError("QInfo:0x%"PRIx64"-%p qhandle is now executed by thread:%p", pQInfo->qId, pQInfo, (void*) curOwner);
pQInfo->code = TSDB_CODE_QRY_IN_EXEC;
return false;
}
*qId = pQInfo->qId;
if(pQInfo->startExecTs == 0)
pQInfo->startExecTs = taosGetTimestampMs();
if (isQueryKilled(pQInfo)) {
qDebug("QInfo:0x%"PRIx64" it is already killed, abort", pQInfo->qId);
return doBuildResCheck(pQInfo);
}
STaskRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv;
if (pRuntimeEnv->tableqinfoGroupInfo.numOfTables == 0) {
qDebug("QInfo:0x%"PRIx64" no table exists for query, abort", pQInfo->qId);
setTaskStatus(pRuntimeEnv, QUERY_COMPLETED);
return doBuildResCheck(pQInfo);
}
// error occurs, record the error code and return to client
int32_t ret = setjmp(pQInfo->runtimeEnv.env);
if (ret != TSDB_CODE_SUCCESS) {
publishQueryAbortEvent(pQInfo, ret);
pQInfo->code = ret;
qDebug("QInfo:0x%"PRIx64" query abort due to error/cancel occurs, code:%s", pQInfo->qId, tstrerror(pQInfo->code));
return doBuildResCheck(pQInfo);
}
qDebug("QInfo:0x%"PRIx64" query task is launched", pQInfo->qId);
bool newgroup = false;
publishOperatorProfEvent(pRuntimeEnv->proot, QUERY_PROF_BEFORE_OPERATOR_EXEC);
int64_t st = taosGetTimestampUs();
pRuntimeEnv->outputBuf = pRuntimeEnv->proot->exec(pRuntimeEnv->proot, &newgroup);
pQInfo->summary.elapsedTime += (taosGetTimestampUs() - st);
#ifdef TEST_IMPL
waitMoment(pQInfo);
#endif
publishOperatorProfEvent(pRuntimeEnv->proot, QUERY_PROF_AFTER_OPERATOR_EXEC);
pRuntimeEnv->resultInfo.total += GET_NUM_OF_RESULTS(pRuntimeEnv);
if (isQueryKilled(pQInfo)) {
qDebug("QInfo:0x%"PRIx64" query is killed", pQInfo->qId);
} else if (GET_NUM_OF_RESULTS(pRuntimeEnv) == 0) {
qDebug("QInfo:0x%"PRIx64" over, %u tables queried, total %"PRId64" rows returned", pQInfo->qId, pRuntimeEnv->tableqinfoGroupInfo.numOfTables,
pRuntimeEnv->resultInfo.total);
} else {
qDebug("QInfo:0x%"PRIx64" query paused, %d rows returned, total:%" PRId64 " rows", pQInfo->qId,
GET_NUM_OF_RESULTS(pRuntimeEnv), pRuntimeEnv->resultInfo.total);
}
return doBuildResCheck(pQInfo);
}
int32_t qRetrieveQueryResultInfo(qTaskInfo_t qinfo, bool* buildRes, void* pRspContext) {
SQInfo *pQInfo = (SQInfo *)qinfo;
if (pQInfo == NULL || !isValidQInfo(pQInfo)) {
qError("QInfo invalid qhandle");
return TSDB_CODE_QRY_INVALID_QHANDLE;
}
*buildRes = false;
if (IS_QUERY_KILLED(pQInfo)) {
qDebug("QInfo:0x%"PRIx64" query is killed, code:0x%08x", pQInfo->qId, pQInfo->code);
return pQInfo->code;
}
int32_t code = TSDB_CODE_SUCCESS;
if (tsRetrieveBlockingModel) {
pQInfo->rspContext = pRspContext;
tsem_wait(&pQInfo->ready);
*buildRes = true;
code = pQInfo->code;
} else {
STaskRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv;
STaskAttr *pQueryAttr = pQInfo->runtimeEnv.pQueryAttr;
pthread_mutex_lock(&pQInfo->lock);
assert(pQInfo->rspContext == NULL);
if (pQInfo->dataReady == QUERY_RESULT_READY) {
*buildRes = true;
qDebug("QInfo:0x%"PRIx64" retrieve result info, rowsize:%d, rows:%d, code:%s", pQInfo->qId, pQueryAttr->resultRowSize,
GET_NUM_OF_RESULTS(pRuntimeEnv), tstrerror(pQInfo->code));
} else {
*buildRes = false;
qDebug("QInfo:0x%"PRIx64" retrieve req set query return result after paused", pQInfo->qId);
pQInfo->rspContext = pRspContext;
assert(pQInfo->rspContext != NULL);
}
code = pQInfo->code;
pthread_mutex_unlock(&pQInfo->lock);
}
return code;
}
void* qGetResultRetrieveMsg(qTaskInfo_t qinfo) {
SQInfo* pQInfo = (SQInfo*) qinfo;
assert(pQInfo != NULL);
return pQInfo->rspContext;
}
int32_t qKillTask(qTaskInfo_t qinfo) {
SQInfo *pQInfo = (SQInfo *)qinfo;
if (pQInfo == NULL || !isValidQInfo(pQInfo)) {
return TSDB_CODE_QRY_INVALID_QHANDLE;
}
qDebug("QInfo:0x%"PRIx64" query killed", pQInfo->qId);
setQueryKilled(pQInfo);
// Wait for the query executing thread being stopped/
// Once the query is stopped, the owner of qHandle will be cleared immediately.
while (pQInfo->owner != 0) {
taosMsleep(100);
}
return TSDB_CODE_SUCCESS;
}
int32_t qIsTaskCompleted(qTaskInfo_t qinfo) {
SQInfo *pQInfo = (SQInfo *)qinfo;
if (pQInfo == NULL || !isValidQInfo(pQInfo)) {
return TSDB_CODE_QRY_INVALID_QHANDLE;
}
return isQueryKilled(pQInfo) || Q_STATUS_EQUAL(pQInfo->runtimeEnv.status, QUERY_OVER);
}
void qDestroyTask(qTaskInfo_t qHandle) {
SQInfo* pQInfo = (SQInfo*) qHandle;
if (!isValidQInfo(pQInfo)) {
return;
}
qDebug("QInfo:0x%"PRIx64" query completed", pQInfo->qId);
queryCostStatis(pQInfo); // print the query cost summary
doDestroyTask(pQInfo);
}
void* qOpenTaskMgmt(int32_t vgId) {
const int32_t refreshHandleInterval = 30; // every 30 seconds, refresh handle pool
char cacheName[128] = {0};
sprintf(cacheName, "qhandle_%d", vgId);
STaskMgmt* pTaskMgmt = calloc(1, sizeof(STaskMgmt));
if (pTaskMgmt == NULL) {
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
return NULL;
}
pTaskMgmt->qinfoPool = taosCacheInit(TSDB_CACHE_PTR_KEY, refreshHandleInterval, true, freeqinfoFn, cacheName);
pTaskMgmt->closed = false;
pTaskMgmt->vgId = vgId;
pthread_mutex_init(&pTaskMgmt->lock, NULL);
qDebug("vgId:%d, open queryTaskMgmt success", vgId);
return pTaskMgmt;
}
void qTaskMgmtNotifyClosing(void* pQMgmt) {
if (pQMgmt == NULL) {
return;
}
STaskMgmt* pQueryMgmt = pQMgmt;
qInfo("vgId:%d, set querymgmt closed, wait for all queries cancelled", pQueryMgmt->vgId);
pthread_mutex_lock(&pQueryMgmt->lock);
pQueryMgmt->closed = true;
pthread_mutex_unlock(&pQueryMgmt->lock);
taosCacheRefresh(pQueryMgmt->qinfoPool, taskMgmtKillTaskFn, NULL);
}
void qQueryMgmtReOpen(void *pQMgmt) {
if (pQMgmt == NULL) {
return;
}
STaskMgmt *pQueryMgmt = pQMgmt;
qInfo("vgId:%d, set querymgmt reopen", pQueryMgmt->vgId);
pthread_mutex_lock(&pQueryMgmt->lock);
pQueryMgmt->closed = false;
pthread_mutex_unlock(&pQueryMgmt->lock);
}
void qCleanupTaskMgmt(void* pQMgmt) {
if (pQMgmt == NULL) {
return;
}
STaskMgmt* pQueryMgmt = pQMgmt;
int32_t vgId = pQueryMgmt->vgId;
assert(pQueryMgmt->closed);
SCacheObj* pqinfoPool = pQueryMgmt->qinfoPool;
pQueryMgmt->qinfoPool = NULL;
taosCacheCleanup(pqinfoPool);
pthread_mutex_destroy(&pQueryMgmt->lock);
tfree(pQueryMgmt);
qDebug("vgId:%d, queryMgmt cleanup completed", vgId);
}
void** qRegisterTask(void* pMgmt, uint64_t qId, void *qInfo) {
if (pMgmt == NULL) {
terrno = TSDB_CODE_VND_INVALID_VGROUP_ID;
return NULL;
}
STaskMgmt *pQueryMgmt = pMgmt;
if (pQueryMgmt->qinfoPool == NULL) {
qError("QInfo:0x%"PRIx64"-%p failed to add qhandle into qMgmt, since qMgmt is closed", qId, (void*)qInfo);
terrno = TSDB_CODE_VND_INVALID_VGROUP_ID;
return NULL;
}
pthread_mutex_lock(&pQueryMgmt->lock);
if (pQueryMgmt->closed) {
pthread_mutex_unlock(&pQueryMgmt->lock);
qError("QInfo:0x%"PRIx64"-%p failed to add qhandle into cache, since qMgmt is colsing", qId, (void*)qInfo);
terrno = TSDB_CODE_VND_INVALID_VGROUP_ID;
return NULL;
} else {
void** handle = taosCachePut(pQueryMgmt->qinfoPool, &qId, sizeof(qId), &qInfo, sizeof(TSDB_CACHE_PTR_TYPE),
(getMaximumIdleDurationSec()*1000));
pthread_mutex_unlock(&pQueryMgmt->lock);
return handle;
}
}
void** qAcquireTask(void* pMgmt, uint64_t _key) {
STaskMgmt *pQueryMgmt = pMgmt;
if (pQueryMgmt->closed) {
terrno = TSDB_CODE_VND_INVALID_VGROUP_ID;
return NULL;
}
if (pQueryMgmt->qinfoPool == NULL) {
terrno = TSDB_CODE_QRY_INVALID_QHANDLE;
return NULL;
}
void** handle = taosCacheAcquireByKey(pQueryMgmt->qinfoPool, &_key, sizeof(_key));
if (handle == NULL || *handle == NULL) {
terrno = TSDB_CODE_QRY_INVALID_QHANDLE;
return NULL;
} else {
return handle;
}
}
void** qReleaseTask(void* pMgmt, void* pQInfo, bool freeHandle) {
STaskMgmt *pQueryMgmt = pMgmt;
if (pQueryMgmt->qinfoPool == NULL) {
return NULL;
}
taosCacheRelease(pQueryMgmt->qinfoPool, pQInfo, freeHandle);
return 0;
}
#if 0
//kill by qid
int32_t qKillQueryByQId(void* pMgmt, int64_t qId, int32_t waitMs, int32_t waitCount) {
int32_t error = TSDB_CODE_SUCCESS;
void** handle = qAcquireTask(pMgmt, qId);
if(handle == NULL) return terrno;
SQInfo* pQInfo = (SQInfo*)(*handle);
if (pQInfo == NULL || !isValidQInfo(pQInfo)) {
return TSDB_CODE_QRY_INVALID_QHANDLE;
}
qWarn("QId:0x%"PRIx64" be killed(no memory commit).", pQInfo->qId);
setQueryKilled(pQInfo);
// wait query stop
int32_t loop = 0;
while (pQInfo->owner != 0) {
taosMsleep(waitMs);
if(loop++ > waitCount){
error = TSDB_CODE_FAILED;
break;
}
}
qReleaseTask(pMgmt, (void **)&handle, true);
return error;
}
#endif
\ No newline at end of file
...@@ -18,18 +18,19 @@ ...@@ -18,18 +18,19 @@
#include "ttime.h" #include "ttime.h"
#include "exception.h" #include "exception.h"
#include "function.h" #include "../../../../contrib/cJson/cJSON.h"
#include "executorimpl.h" #include "executorimpl.h"
#include "thash.h"
#include "function.h" #include "function.h"
#include "tcompare.h" #include "tcompare.h"
#include "tcompression.h" #include "tcompression.h"
#include "thash.h"
#include "ttypes.h" #include "ttypes.h"
#include "query.h"
#define IS_MASTER_SCAN(runtime) ((runtime)->scanFlag == MASTER_SCAN) #define IS_MAIN_SCAN(runtime) ((runtime)->scanFlag == MAIN_SCAN)
#define IS_REVERSE_SCAN(runtime) ((runtime)->scanFlag == REVERSE_SCAN) #define IS_REVERSE_SCAN(runtime) ((runtime)->scanFlag == REVERSE_SCAN)
#define IS_REPEAT_SCAN(runtime) ((runtime)->scanFlag == REPEAT_SCAN) #define IS_REPEAT_SCAN(runtime) ((runtime)->scanFlag == REPEAT_SCAN)
#define SET_MASTER_SCAN_FLAG(runtime) ((runtime)->scanFlag = MASTER_SCAN) #define SET_MAIN_SCAN_FLAG(runtime) ((runtime)->scanFlag = MAIN_SCAN)
#define SET_REVERSE_SCAN_FLAG(runtime) ((runtime)->scanFlag = REVERSE_SCAN) #define SET_REVERSE_SCAN_FLAG(runtime) ((runtime)->scanFlag = REVERSE_SCAN)
#define TSWINDOW_IS_EQUAL(t1, t2) (((t1).skey == (t2).skey) && ((t1).ekey == (t2).ekey)) #define TSWINDOW_IS_EQUAL(t1, t2) (((t1).skey == (t2).skey) && ((t1).ekey == (t2).ekey))
...@@ -203,10 +204,12 @@ static void destroyStateWindowOperatorInfo(void* param, int32_t numOfOutput); ...@@ -203,10 +204,12 @@ static void destroyStateWindowOperatorInfo(void* param, int32_t numOfOutput);
static void destroyAggOperatorInfo(void* param, int32_t numOfOutput); static void destroyAggOperatorInfo(void* param, int32_t numOfOutput);
static void destroyOperatorInfo(SOperatorInfo* pOperator); static void destroyOperatorInfo(SOperatorInfo* pOperator);
void setTaskStatus(STaskInfo *pTaskInfo, int8_t status);
static void doSetOperatorCompleted(SOperatorInfo* pOperator) { static void doSetOperatorCompleted(SOperatorInfo* pOperator) {
pOperator->status = OP_EXEC_DONE; pOperator->status = OP_EXEC_DONE;
if (pOperator->pRuntimeEnv != NULL) { if (pOperator->pTaskInfo != NULL) {
setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED); setTaskStatus(pOperator->pTaskInfo, QUERY_COMPLETED);
} }
} }
...@@ -1332,7 +1335,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul ...@@ -1332,7 +1335,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
TSKEY ts = getStartTsKey(pQueryAttr, &pSDataBlock->info.window, tsCols, pSDataBlock->info.rows); TSKEY ts = getStartTsKey(pQueryAttr, &pSDataBlock->info.window, tsCols, pSDataBlock->info.rows);
STimeWindow win = getActiveTimeWindow(pResultRowInfo, ts, pQueryAttr); STimeWindow win = getActiveTimeWindow(pResultRowInfo, ts, pQueryAttr);
bool masterScan = IS_MASTER_SCAN(pRuntimeEnv); bool masterScan = IS_MAIN_SCAN(pRuntimeEnv);
SResultRow* pResult = NULL; SResultRow* pResult = NULL;
int32_t ret = setResultOutputBufByKey(pRuntimeEnv, pResultRowInfo, pSDataBlock->info.uid, &win, masterScan, &pResult, tableGroupId, pInfo->pCtx, int32_t ret = setResultOutputBufByKey(pRuntimeEnv, pResultRowInfo, pSDataBlock->info.uid, &win, masterScan, &pResult, tableGroupId, pInfo->pCtx,
...@@ -1440,7 +1443,7 @@ static void hashAllIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe ...@@ -1440,7 +1443,7 @@ static void hashAllIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe
TSKEY ts = getStartTsKey(pQueryAttr, &pSDataBlock->info.window, tsCols, pSDataBlock->info.rows); TSKEY ts = getStartTsKey(pQueryAttr, &pSDataBlock->info.window, tsCols, pSDataBlock->info.rows);
STimeWindow win = getCurrentActiveTimeWindow(pResultRowInfo, ts, pQueryAttr); STimeWindow win = getCurrentActiveTimeWindow(pResultRowInfo, ts, pQueryAttr);
bool masterScan = IS_MASTER_SCAN(pRuntimeEnv); bool masterScan = IS_MAIN_SCAN(pRuntimeEnv);
SResultRow* pResult = NULL; SResultRow* pResult = NULL;
int32_t forwardStep = 0; int32_t forwardStep = 0;
...@@ -1584,7 +1587,7 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSWindowOperatorInf ...@@ -1584,7 +1587,7 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSWindowOperatorInf
// primary timestamp column // primary timestamp column
SColumnInfoData* pColInfoData = taosArrayGet(pSDataBlock->pDataBlock, 0); SColumnInfoData* pColInfoData = taosArrayGet(pSDataBlock->pDataBlock, 0);
bool masterScan = IS_MASTER_SCAN(pRuntimeEnv); bool masterScan = IS_MAIN_SCAN(pRuntimeEnv);
SOptrBasicInfo* pBInfo = &pInfo->binfo; SOptrBasicInfo* pBInfo = &pInfo->binfo;
int64_t gap = pOperator->pRuntimeEnv->pQueryAttr->sw.gap; int64_t gap = pOperator->pRuntimeEnv->pQueryAttr->sw.gap;
...@@ -1990,30 +1993,30 @@ static int32_t setupQueryRuntimeEnv(STaskRuntimeEnv *pRuntimeEnv, int32_t numOfT ...@@ -1990,30 +1993,30 @@ static int32_t setupQueryRuntimeEnv(STaskRuntimeEnv *pRuntimeEnv, int32_t numOfT
// case OP_MultiTableTimeInterval: { // case OP_MultiTableTimeInterval: {
// pRuntimeEnv->proot = // pRuntimeEnv->proot =
// createMultiTableTimeIntervalOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput); // createMultiTableTimeIntervalOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput);
// setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream[0]->info, pRuntimeEnv->proot); // setTableScanFilterOperatorInfo(pRuntimeEnv->proot->downstream[0]->info, pRuntimeEnv->proot);
// break; // break;
// } // }
// case OP_AllMultiTableTimeInterval: { // case OP_AllMultiTableTimeInterval: {
// pRuntimeEnv->proot = // pRuntimeEnv->proot =
// createAllMultiTableTimeIntervalOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput); // createAllMultiTableTimeIntervalOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput);
// setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream[0]->info, pRuntimeEnv->proot); // setTableScanFilterOperatorInfo(pRuntimeEnv->proot->downstream[0]->info, pRuntimeEnv->proot);
// break; // break;
// } // }
// case OP_TimeWindow: { // case OP_TimeWindow: {
// pRuntimeEnv->proot = // pRuntimeEnv->proot =
// createTimeIntervalOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput); // createTimeIntervalOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput);
// int32_t opType = pRuntimeEnv->proot->upstream[0]->operatorType; // int32_t opType = pRuntimeEnv->proot->downstream[0]->operatorType;
// if (opType != OP_DummyInput && opType != OP_Join) { // if (opType != OP_DummyInput && opType != OP_Join) {
// setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream[0]->info, pRuntimeEnv->proot); // setTableScanFilterOperatorInfo(pRuntimeEnv->proot->downstream[0]->info, pRuntimeEnv->proot);
// } // }
// break; // break;
// } // }
// case OP_AllTimeWindow: { // case OP_AllTimeWindow: {
// pRuntimeEnv->proot = // pRuntimeEnv->proot =
// createAllTimeIntervalOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput); // createAllTimeIntervalOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput);
// int32_t opType = pRuntimeEnv->proot->upstream[0]->operatorType; // int32_t opType = pRuntimeEnv->proot->downstream[0]->operatorType;
// if (opType != OP_DummyInput && opType != OP_Join) { // if (opType != OP_DummyInput && opType != OP_Join) {
// setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream[0]->info, pRuntimeEnv->proot); // setTableScanFilterOperatorInfo(pRuntimeEnv->proot->downstream[0]->info, pRuntimeEnv->proot);
// } // }
// break; // break;
// } // }
...@@ -2021,34 +2024,34 @@ static int32_t setupQueryRuntimeEnv(STaskRuntimeEnv *pRuntimeEnv, int32_t numOfT ...@@ -2021,34 +2024,34 @@ static int32_t setupQueryRuntimeEnv(STaskRuntimeEnv *pRuntimeEnv, int32_t numOfT
// pRuntimeEnv->proot = // pRuntimeEnv->proot =
// createGroupbyOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput); // createGroupbyOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput);
// //
// int32_t opType = pRuntimeEnv->proot->upstream[0]->operatorType; // int32_t opType = pRuntimeEnv->proot->downstream[0]->operatorType;
// if (opType != OP_DummyInput) { // if (opType != OP_DummyInput) {
// setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream[0]->info, pRuntimeEnv->proot); // setTableScanFilterOperatorInfo(pRuntimeEnv->proot->downstream[0]->info, pRuntimeEnv->proot);
// } // }
// break; // break;
// } // }
// case OP_SessionWindow: { // case OP_SessionWindow: {
// pRuntimeEnv->proot = // pRuntimeEnv->proot =
// createSWindowOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput); // createSWindowOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput);
// int32_t opType = pRuntimeEnv->proot->upstream[0]->operatorType; // int32_t opType = pRuntimeEnv->proot->downstream[0]->operatorType;
// if (opType != OP_DummyInput) { // if (opType != OP_DummyInput) {
// setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream[0]->info, pRuntimeEnv->proot); // setTableScanFilterOperatorInfo(pRuntimeEnv->proot->downstream[0]->info, pRuntimeEnv->proot);
// } // }
// break; // break;
// } // }
// case OP_MultiTableAggregate: { // case OP_MultiTableAggregate: {
// pRuntimeEnv->proot = // pRuntimeEnv->proot =
// createMultiTableAggOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput); // createMultiTableAggOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput);
// setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream[0]->info, pRuntimeEnv->proot); // setTableScanFilterOperatorInfo(pRuntimeEnv->proot->downstream[0]->info, pRuntimeEnv->proot);
// break; // break;
// } // }
// case OP_Aggregate: { // case OP_Aggregate: {
// pRuntimeEnv->proot = // pRuntimeEnv->proot =
// createAggregateOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput); // createAggregateOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput);
// //
// int32_t opType = pRuntimeEnv->proot->upstream[0]->operatorType; // int32_t opType = pRuntimeEnv->proot->downstream[0]->operatorType;
// if (opType != OP_DummyInput && opType != OP_Join) { // if (opType != OP_DummyInput && opType != OP_Join) {
// setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream[0]->info, pRuntimeEnv->proot); // setTableScanFilterOperatorInfo(pRuntimeEnv->proot->downstream[0]->info, pRuntimeEnv->proot);
// } // }
// break; // break;
// } // }
...@@ -2070,9 +2073,9 @@ static int32_t setupQueryRuntimeEnv(STaskRuntimeEnv *pRuntimeEnv, int32_t numOfT ...@@ -2070,9 +2073,9 @@ static int32_t setupQueryRuntimeEnv(STaskRuntimeEnv *pRuntimeEnv, int32_t numOfT
// //
// case OP_StateWindow: { // case OP_StateWindow: {
// pRuntimeEnv->proot = createStatewindowOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput); // pRuntimeEnv->proot = createStatewindowOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput);
// int32_t opType = pRuntimeEnv->proot->upstream[0]->operatorType; // int32_t opType = pRuntimeEnv->proot->downstream[0]->operatorType;
// if (opType != OP_DummyInput) { // if (opType != OP_DummyInput) {
// setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream[0]->info, pRuntimeEnv->proot); // setTableScanFilterOperatorInfo(pRuntimeEnv->proot->downstream[0]->info, pRuntimeEnv->proot);
// } // }
// break; // break;
// } // }
...@@ -2884,22 +2887,35 @@ void doSetFilterColumnInfo(SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFi ...@@ -2884,22 +2887,35 @@ void doSetFilterColumnInfo(SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFi
} }
} }
int32_t loadDataBlockOnDemand(STaskRuntimeEnv* pRuntimeEnv, STableScanInfo* pTableScanInfo, SSDataBlock* pBlock, int32_t loadDataBlock(STaskInfo *pTaskInfo, STableScanInfo* pTableScanInfo, SSDataBlock* pBlock, uint32_t* status) {
uint32_t* status) { STaskCostInfo* pCost = &pTaskInfo->cost;
pCost->totalBlocks += 1;
pCost->totalRows += pBlock->info.rows;
pCost->totalCheckedRows += pBlock->info.rows;
pCost->loadBlocks += 1;
// pBlock->pDataBlock = tsdbRetrieveDataBlock(pTableScanInfo->pQueryHandle, NULL);
if (pBlock->pDataBlock == NULL) {
return terrno;
}
}
int32_t loadDataBlockOnDemand(STaskInfo *pTaskInfo, STableScanInfo* pTableScanInfo, SSDataBlock* pBlock, uint32_t* status) {
*status = BLK_DATA_NO_NEEDED; *status = BLK_DATA_NO_NEEDED;
pBlock->pDataBlock = NULL;
pBlock->pBlockAgg = NULL;
STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; pBlock->pDataBlock = NULL;
int64_t groupId = pRuntimeEnv->current->groupIndex; pBlock->pBlockAgg = NULL;
bool ascQuery = QUERY_IS_ASC_QUERY(pQueryAttr);
// int64_t groupId = pRuntimeEnv->current->groupIndex;
// bool ascQuery = QUERY_IS_ASC_QUERY(pQueryAttr);
SQInfo* pQInfo = pRuntimeEnv->qinfo; STaskCostInfo* pCost = &pTaskInfo->cost;
SQueryCostInfo* pCost = &pQInfo->summary;
pCost->totalBlocks += 1; pCost->totalBlocks += 1;
pCost->totalRows += pBlock->info.rows; pCost->totalRows += pBlock->info.rows;
#if 0
if (pRuntimeEnv->pTsBuf != NULL) { if (pRuntimeEnv->pTsBuf != NULL) {
(*status) = BLK_DATA_ALL_NEEDED; (*status) = BLK_DATA_ALL_NEEDED;
...@@ -2924,7 +2940,7 @@ int32_t loadDataBlockOnDemand(STaskRuntimeEnv* pRuntimeEnv, STableScanInfo* pTab ...@@ -2924,7 +2940,7 @@ int32_t loadDataBlockOnDemand(STaskRuntimeEnv* pRuntimeEnv, STableScanInfo* pTab
// Calculate all time windows that are overlapping or contain current data block. // Calculate all time windows that are overlapping or contain current data block.
// If current data block is contained by all possible time window, do not load current data block. // If current data block is contained by all possible time window, do not load current data block.
if (/*pQueryAttr->pFilters || */pQueryAttr->groupbyColumn || pQueryAttr->sw.gap > 0 || if (/*pQueryAttr->pFilters || */pQueryAttr->groupbyColumn || pQueryAttr->sw.gap > 0 ||
(QUERY_IS_INTERVAL_QUERY(pQueryAttr) && overlapWithTimeWindow(pQueryAttr, &pBlock->info))) { (QUERY_IS_INTERVAL_QUERY(pQueryAttr) && overlapWithTimeWindow(pTaskInfo, &pBlock->info))) {
(*status) = BLK_DATA_ALL_NEEDED; (*status) = BLK_DATA_ALL_NEEDED;
} }
...@@ -2937,7 +2953,7 @@ int32_t loadDataBlockOnDemand(STaskRuntimeEnv* pRuntimeEnv, STableScanInfo* pTab ...@@ -2937,7 +2953,7 @@ int32_t loadDataBlockOnDemand(STaskRuntimeEnv* pRuntimeEnv, STableScanInfo* pTab
if (QUERY_IS_INTERVAL_QUERY(pQueryAttr)) { if (QUERY_IS_INTERVAL_QUERY(pQueryAttr)) {
SResultRow* pResult = NULL; SResultRow* pResult = NULL;
bool masterScan = IS_MASTER_SCAN(pRuntimeEnv); bool masterScan = IS_MAIN_SCAN(pRuntimeEnv);
TSKEY k = ascQuery? pBlock->info.window.skey : pBlock->info.window.ekey; TSKEY k = ascQuery? pBlock->info.window.skey : pBlock->info.window.ekey;
STimeWindow win = getActiveTimeWindow(pTableScanInfo->pResultRowInfo, k, pQueryAttr); STimeWindow win = getActiveTimeWindow(pTableScanInfo->pResultRowInfo, k, pQueryAttr);
...@@ -2966,7 +2982,7 @@ int32_t loadDataBlockOnDemand(STaskRuntimeEnv* pRuntimeEnv, STableScanInfo* pTab ...@@ -2966,7 +2982,7 @@ int32_t loadDataBlockOnDemand(STaskRuntimeEnv* pRuntimeEnv, STableScanInfo* pTab
} }
SDataBlockInfo* pBlockInfo = &pBlock->info; SDataBlockInfo* pBlockInfo = &pBlock->info;
*status = updateBlockLoadStatus(pRuntimeEnv->pQueryAttr, *status); // *status = updateBlockLoadStatus(pRuntimeEnv->pQueryAttr, *status);
if ((*status) == BLK_DATA_NO_NEEDED || (*status) == BLK_DATA_DISCARD) { if ((*status) == BLK_DATA_NO_NEEDED || (*status) == BLK_DATA_DISCARD) {
//qDebug("QInfo:0x%"PRIx64" data block discard, brange:%" PRId64 "-%" PRId64 ", rows:%d", pQInfo->qId, pBlockInfo->window.skey, //qDebug("QInfo:0x%"PRIx64" data block discard, brange:%" PRId64 "-%" PRId64 ", rows:%d", pQInfo->qId, pBlockInfo->window.skey,
...@@ -2993,7 +3009,7 @@ int32_t loadDataBlockOnDemand(STaskRuntimeEnv* pRuntimeEnv, STableScanInfo* pTab ...@@ -2993,7 +3009,7 @@ int32_t loadDataBlockOnDemand(STaskRuntimeEnv* pRuntimeEnv, STableScanInfo* pTab
if (QUERY_IS_INTERVAL_QUERY(pQueryAttr)) { if (QUERY_IS_INTERVAL_QUERY(pQueryAttr)) {
SResultRow* pResult = NULL; SResultRow* pResult = NULL;
bool masterScan = IS_MASTER_SCAN(pRuntimeEnv); bool masterScan = IS_MAIN_SCAN(pRuntimeEnv);
TSKEY k = ascQuery? pBlock->info.window.skey : pBlock->info.window.ekey; TSKEY k = ascQuery? pBlock->info.window.skey : pBlock->info.window.ekey;
STimeWindow win = getActiveTimeWindow(pTableScanInfo->pResultRowInfo, k, pQueryAttr); STimeWindow win = getActiveTimeWindow(pTableScanInfo->pResultRowInfo, k, pQueryAttr);
...@@ -3045,7 +3061,7 @@ int32_t loadDataBlockOnDemand(STaskRuntimeEnv* pRuntimeEnv, STableScanInfo* pTab ...@@ -3045,7 +3061,7 @@ int32_t loadDataBlockOnDemand(STaskRuntimeEnv* pRuntimeEnv, STableScanInfo* pTab
// filterColRowsInDataBlock(pRuntimeEnv, pBlock, ascQuery); // filterColRowsInDataBlock(pRuntimeEnv, pBlock, ascQuery);
// } // }
} }
#endif
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -3264,9 +3280,8 @@ static void updateTableQueryInfoForReverseScan(STableQueryInfo *pTableQueryInfo) ...@@ -3264,9 +3280,8 @@ static void updateTableQueryInfoForReverseScan(STableQueryInfo *pTableQueryInfo)
} }
} }
static void setupQueryRangeForReverseScan(STaskRuntimeEnv* pRuntimeEnv) { static void setupQueryRangeForReverseScan(STableScanInfo* pTableScanInfo) {
STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; #if 0
int32_t numOfGroups = (int32_t)(GET_NUM_OF_TABLEGROUP(pRuntimeEnv)); int32_t numOfGroups = (int32_t)(GET_NUM_OF_TABLEGROUP(pRuntimeEnv));
for(int32_t i = 0; i < numOfGroups; ++i) { for(int32_t i = 0; i < numOfGroups; ++i) {
SArray *group = GET_TABLEGROUP(pRuntimeEnv, i); SArray *group = GET_TABLEGROUP(pRuntimeEnv, i);
...@@ -3285,6 +3300,8 @@ static void setupQueryRangeForReverseScan(STaskRuntimeEnv* pRuntimeEnv) { ...@@ -3285,6 +3300,8 @@ static void setupQueryRangeForReverseScan(STaskRuntimeEnv* pRuntimeEnv) {
// assert(pCheckInfo->pTable == pTableKeyInfo->pTable); // assert(pCheckInfo->pTable == pTableKeyInfo->pTable);
} }
} }
#endif
} }
void switchCtxOrder(SQLFunctionCtx* pCtx, int32_t numOfOutput) { void switchCtxOrder(SQLFunctionCtx* pCtx, int32_t numOfOutput) {
...@@ -3432,35 +3449,33 @@ void initCtxOutputBuffer(SQLFunctionCtx* pCtx, int32_t size) { ...@@ -3432,35 +3449,33 @@ void initCtxOutputBuffer(SQLFunctionCtx* pCtx, int32_t size) {
} }
} }
void setQueryStatus(STaskRuntimeEnv *pRuntimeEnv, int8_t status) { void setTaskStatus(STaskInfo *pTaskInfo, int8_t status) {
if (status == QUERY_NOT_COMPLETED) { if (status == QUERY_NOT_COMPLETED) {
pRuntimeEnv->status = status; pTaskInfo->status = status;
} else { } else {
// QUERY_NOT_COMPLETED is not compatible with any other status, so clear its position first // QUERY_NOT_COMPLETED is not compatible with any other status, so clear its position first
CLEAR_QUERY_STATUS(pRuntimeEnv, QUERY_NOT_COMPLETED); CLEAR_QUERY_STATUS(pTaskInfo, QUERY_NOT_COMPLETED);
pRuntimeEnv->status |= status; pTaskInfo->status |= status;
} }
} }
static void setupEnvForReverseScan(STaskRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRowInfo, SQLFunctionCtx* pCtx, int32_t numOfOutput) { static void setupEnvForReverseScan(STableScanInfo *pTableScanInfo, SQLFunctionCtx* pCtx, int32_t numOfOutput) {
STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr; // if (pRuntimeEnv->pTsBuf) {
// SWITCH_ORDER(pRuntimeEnv->pTsBuf->cur.order);
if (pRuntimeEnv->pTsBuf) { // bool ret = tsBufNextPos(pRuntimeEnv->pTsBuf);
SWITCH_ORDER(pRuntimeEnv->pTsBuf->cur.order); // assert(ret);
bool ret = tsBufNextPos(pRuntimeEnv->pTsBuf); // }
assert(ret);
}
// reverse order time range // reverse order time range
SWAP(pQueryAttr->window.skey, pQueryAttr->window.ekey, TSKEY); SWAP(pTableScanInfo->window.skey, pTableScanInfo->window.ekey, TSKEY);
SET_REVERSE_SCAN_FLAG(pRuntimeEnv); SET_REVERSE_SCAN_FLAG(pTableScanInfo);
setQueryStatus(pRuntimeEnv, QUERY_NOT_COMPLETED); // setTaskStatus(pTableScanInfo, QUERY_NOT_COMPLETED);
switchCtxOrder(pCtx, numOfOutput); switchCtxOrder(pCtx, numOfOutput);
SWITCH_ORDER(pQueryAttr->order.order);
setupQueryRangeForReverseScan(pRuntimeEnv); SWITCH_ORDER(pTableScanInfo->order);
setupQueryRangeForReverseScan(pTableScanInfo);
} }
void finalizeQueryResult(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SResultRowInfo* pResultRowInfo, int32_t* rowCellInfoOffset) { void finalizeQueryResult(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SResultRowInfo* pResultRowInfo, int32_t* rowCellInfoOffset) {
...@@ -4041,7 +4056,7 @@ static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data ...@@ -4041,7 +4056,7 @@ static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data
// Check if query is completed or not for stable query or normal table query respectively. // Check if query is completed or not for stable query or normal table query respectively.
if (Q_STATUS_EQUAL(pRuntimeEnv->status, QUERY_COMPLETED) && pRuntimeEnv->proot->status == OP_EXEC_DONE) { if (Q_STATUS_EQUAL(pRuntimeEnv->status, QUERY_COMPLETED) && pRuntimeEnv->proot->status == OP_EXEC_DONE) {
setQueryStatus(pRuntimeEnv, QUERY_OVER); // setTaskStatus(pOperator->pTaskInfo, QUERY_OVER);
} }
} }
...@@ -4159,7 +4174,7 @@ void calculateOperatorProfResults(SQInfo* pQInfo) { ...@@ -4159,7 +4174,7 @@ void calculateOperatorProfResults(SQInfo* pQInfo) {
void queryCostStatis(SQInfo *pQInfo) { void queryCostStatis(SQInfo *pQInfo) {
STaskRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; STaskRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
SQueryCostInfo *pSummary = &pQInfo->summary; STaskCostInfo *pSummary = &pQInfo->summary;
uint64_t hashSize = taosHashGetMemSize(pQInfo->runtimeEnv.pResultRowHashTable); uint64_t hashSize = taosHashGetMemSize(pQInfo->runtimeEnv.pResultRowHashTable);
hashSize += taosHashGetMemSize(pRuntimeEnv->tableqinfoGroupInfo.map); hashSize += taosHashGetMemSize(pRuntimeEnv->tableqinfoGroupInfo.map);
...@@ -4435,11 +4450,11 @@ void queryCostStatis(SQInfo *pQInfo) { ...@@ -4435,11 +4450,11 @@ void queryCostStatis(SQInfo *pQInfo) {
void appendUpstream(SOperatorInfo* p, SOperatorInfo* pUpstream) { void appendUpstream(SOperatorInfo* p, SOperatorInfo* pUpstream) {
if (p->pDownstream == NULL) { if (p->pDownstream == NULL) {
assert(p->numOfUpstream == 0); assert(p->numOfDownstream == 0);
} }
p->pDownstream = realloc(p->pDownstream, POINTER_BYTES * (p->numOfUpstream + 1)); p->pDownstream = realloc(p->pDownstream, POINTER_BYTES * (p->numOfDownstream + 1));
p->pDownstream[p->numOfUpstream++] = pUpstream; p->pDownstream[p->numOfDownstream++] = pUpstream;
} }
static void doDestroyTableQueryInfo(STableGroupInfo* pTableqinfoGroupInfo); static void doDestroyTableQueryInfo(STableGroupInfo* pTableqinfoGroupInfo);
...@@ -4587,21 +4602,21 @@ int32_t doInitQInfo(SQInfo* pQInfo, STSBuf* pTsBuf, void* tsdb, void* sourceOptr ...@@ -4587,21 +4602,21 @@ int32_t doInitQInfo(SQInfo* pQInfo, STSBuf* pTsBuf, void* tsdb, void* sourceOptr
return code; return code;
} }
setQueryStatus(pRuntimeEnv, QUERY_NOT_COMPLETED); // setTaskStatus(pOperator->pTaskInfo, QUERY_NOT_COMPLETED);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static void doTableQueryInfoTimeWindowCheck(STaskAttr* pQueryAttr, STableQueryInfo* pTableQueryInfo) { static void doTableQueryInfoTimeWindowCheck(STaskInfo* pTaskInfo, STableQueryInfo* pTableQueryInfo, int32_t order) {
if (QUERY_IS_ASC_QUERY(pQueryAttr)) { if (order == TSDB_ORDER_ASC) {
assert( assert(
(pTableQueryInfo->win.skey <= pTableQueryInfo->win.ekey) && (pTableQueryInfo->win.skey <= pTableQueryInfo->win.ekey) &&
(pTableQueryInfo->lastKey >= pTableQueryInfo->win.skey) && (pTableQueryInfo->lastKey >= pTaskInfo->window.skey) &&
(pTableQueryInfo->win.skey >= pQueryAttr->window.skey && pTableQueryInfo->win.ekey <= pQueryAttr->window.ekey)); (pTableQueryInfo->win.skey >= pTaskInfo->window.skey && pTableQueryInfo->win.ekey <= pTaskInfo->window.ekey));
} else { } else {
assert( assert(
(pTableQueryInfo->win.skey >= pTableQueryInfo->win.ekey) && (pTableQueryInfo->win.skey >= pTableQueryInfo->win.ekey) &&
(pTableQueryInfo->lastKey <= pTableQueryInfo->win.skey) && (pTableQueryInfo->lastKey <= pTaskInfo->window.skey) &&
(pTableQueryInfo->win.skey <= pQueryAttr->window.skey && pTableQueryInfo->win.ekey >= pQueryAttr->window.ekey)); (pTableQueryInfo->win.skey <= pTaskInfo->window.skey && pTableQueryInfo->win.ekey >= pTaskInfo->window.ekey));
} }
} }
...@@ -4664,44 +4679,37 @@ static SSDataBlock* doTableScanImpl(void* param, bool* newgroup) { ...@@ -4664,44 +4679,37 @@ static SSDataBlock* doTableScanImpl(void* param, bool* newgroup) {
SOperatorInfo *pOperator = (SOperatorInfo*) param; SOperatorInfo *pOperator = (SOperatorInfo*) param;
STableScanInfo *pTableScanInfo = pOperator->info; STableScanInfo *pTableScanInfo = pOperator->info;
STaskInfo *pTaskInfo = pOperator->pTaskInfo;
SSDataBlock *pBlock = &pTableScanInfo->block; SSDataBlock *pBlock = &pTableScanInfo->block;
STaskRuntimeEnv *pRuntimeEnv = pOperator->pRuntimeEnv; STableGroupInfo *pTableGroupInfo = &pOperator->pTaskInfo->tableqinfoGroupInfo;
STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
STableGroupInfo *pTableGroupInfo = &pOperator->pRuntimeEnv->tableqinfoGroupInfo;
*newgroup = false; *newgroup = false;
#if 0
while (tsdbNextDataBlock(pTableScanInfo->pQueryHandle)) { while (/*tsdbNextDataBlock(pTableScanInfo->pQueryHandle)*/1) {
if (isQueryKilled(pOperator->pRuntimeEnv->qinfo)) { if (isQueryKilled(pOperator->pRuntimeEnv->qinfo)) {
longjmp(pOperator->pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED); longjmp(pOperator->pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED);
} }
pTableScanInfo->numOfBlocks += 1; pTableScanInfo->numOfBlocks += 1;
tsdbRetrieveDataBlockInfo(pTableScanInfo->pQueryHandle, &pBlock->info); // tsdbRetrieveDataBlockInfo(pTableScanInfo->pQueryHandle, &pBlock->info);
// todo opt // todo opt
if (pTableGroupInfo->numOfTables > 1 || (pRuntimeEnv->current == NULL && pTableGroupInfo->numOfTables == 1)) { // if (pTableGroupInfo->numOfTables > 1 || (pRuntimeEnv->current == NULL && pTableGroupInfo->numOfTables == 1)) {
STableQueryInfo** pTableQueryInfo = // STableQueryInfo** pTableQueryInfo =
(STableQueryInfo**)taosHashGet(pTableGroupInfo->map, &pBlock->info.uid, sizeof(pBlock->info.uid)); // (STableQueryInfo**)taosHashGet(pTableGroupInfo->map, &pBlock->info.uid, sizeof(pBlock->info.uid));
if (pTableQueryInfo == NULL) { // if (pTableQueryInfo == NULL) {
break; // break;
} // }
//
pRuntimeEnv->current = *pTableQueryInfo; // pRuntimeEnv->current = *pTableQueryInfo;
doTableQueryInfoTimeWindowCheck(pQueryAttr, *pTableQueryInfo); // doTableQueryInfoTimeWindowCheck(pTaskInfo, *pTableQueryInfo, pTableScanInfo->order);
// }
if (pRuntimeEnv->enableGroupData) {
if(pTableScanInfo->prevGroupId != -1 && pTableScanInfo->prevGroupId != (*pTableQueryInfo)->groupIndex) {
*newgroup = true;
}
}
pTableScanInfo->prevGroupId = (*pTableQueryInfo)->groupIndex;
}
// this function never returns error? // this function never returns error?
uint32_t status; uint32_t status;
int32_t code = loadDataBlockOnDemand(pOperator->pRuntimeEnv, pTableScanInfo, pBlock, &status); int32_t code = loadDataBlock(pTaskInfo, pTableScanInfo, pBlock, &status);
// int32_t code = loadDataBlockOnDemand(pOperator->pRuntimeEnv, pTableScanInfo, pBlock, &status);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
longjmp(pOperator->pRuntimeEnv->env, code); longjmp(pOperator->pRuntimeEnv->env, code);
} }
...@@ -4713,7 +4721,6 @@ static SSDataBlock* doTableScanImpl(void* param, bool* newgroup) { ...@@ -4713,7 +4721,6 @@ static SSDataBlock* doTableScanImpl(void* param, bool* newgroup) {
return pBlock; return pBlock;
} }
#endif
return NULL; return NULL;
} }
...@@ -4721,9 +4728,8 @@ static SSDataBlock* doTableScanImpl(void* param, bool* newgroup) { ...@@ -4721,9 +4728,8 @@ static SSDataBlock* doTableScanImpl(void* param, bool* newgroup) {
static SSDataBlock* doTableScan(void* param, bool *newgroup) { static SSDataBlock* doTableScan(void* param, bool *newgroup) {
SOperatorInfo* pOperator = (SOperatorInfo*) param; SOperatorInfo* pOperator = (SOperatorInfo*) param;
STableScanInfo *pTableScanInfo = pOperator->info; STableScanInfo *pTableScanInfo = pOperator->info;
STaskRuntimeEnv *pRuntimeEnv = pOperator->pRuntimeEnv; STaskInfo *pTaskInfo = pOperator->pTaskInfo;
STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
SResultRowInfo* pResultRowInfo = pTableScanInfo->pResultRowInfo; SResultRowInfo* pResultRowInfo = pTableScanInfo->pResultRowInfo;
*newgroup = false; *newgroup = false;
...@@ -4746,14 +4752,14 @@ static SSDataBlock* doTableScan(void* param, bool *newgroup) { ...@@ -4746,14 +4752,14 @@ static SSDataBlock* doTableScan(void* param, bool *newgroup) {
// STsdbQueryCond cond = createTsdbQueryCond(pQueryAttr, &pQueryAttr->window); // STsdbQueryCond cond = createTsdbQueryCond(pQueryAttr, &pQueryAttr->window);
// tsdbResetQueryHandle(pTableScanInfo->pQueryHandle, &cond); // tsdbResetQueryHandle(pTableScanInfo->pQueryHandle, &cond);
setQueryStatus(pRuntimeEnv, QUERY_NOT_COMPLETED); setTaskStatus(pTaskInfo, QUERY_NOT_COMPLETED);
pRuntimeEnv->scanFlag = REPEAT_SCAN; pTableScanInfo->scanFlag = REPEAT_SCAN;
if (pRuntimeEnv->pTsBuf) {
bool ret = tsBufNextPos(pRuntimeEnv->pTsBuf);
assert(ret);
}
// if (pTaskInfo->pTsBuf) {
// bool ret = tsBufNextPos(pRuntimeEnv->pTsBuf);
// assert(ret);
// }
//
if (pResultRowInfo->size > 0) { if (pResultRowInfo->size > 0) {
pResultRowInfo->curPos = 0; pResultRowInfo->curPos = 0;
} }
...@@ -4763,17 +4769,15 @@ static SSDataBlock* doTableScan(void* param, bool *newgroup) { ...@@ -4763,17 +4769,15 @@ static SSDataBlock* doTableScan(void* param, bool *newgroup) {
} }
SSDataBlock *p = NULL; SSDataBlock *p = NULL;
// todo refactor
if (pTableScanInfo->reverseTimes > 0) { if (pTableScanInfo->reverseTimes > 0) {
setupEnvForReverseScan(pRuntimeEnv, pTableScanInfo->pResultRowInfo, pTableScanInfo->pCtx, pTableScanInfo->numOfOutput); setupEnvForReverseScan(pTableScanInfo, pTableScanInfo->pCtx, pTableScanInfo->numOfOutput);
// STsdbQueryCond cond = createTsdbQueryCond(pQueryAttr, &pQueryAttr->window); // STsdbQueryCond cond = createTsdbQueryCond(pQueryAttr, &pQueryAttr->window);
// tsdbResetQueryHandle(pTableScanInfo->pQueryHandle, &cond); // tsdbResetQueryHandle(pTableScanInfo->pQueryHandle, &cond);
//qDebug("QInfo:0x%"PRIx64" start to reverse scan data blocks due to query func required, qrange:%" PRId64 "-%" PRId64, //qDebug("QInfo:0x%"PRIx64" start to reverse scan data blocks due to query func required, qrange:%" PRId64 "-%" PRId64,
// GET_QID(pRuntimeEnv), cond.twindow.skey, cond.twindow.ekey); // GET_QID(pRuntimeEnv), cond.twindow.skey, cond.twindow.ekey);
pRuntimeEnv->scanFlag = REVERSE_SCAN;
pTableScanInfo->times = 1; pTableScanInfo->times = 1;
pTableScanInfo->current = 0; pTableScanInfo->current = 0;
pTableScanInfo->reverseTimes = 0; pTableScanInfo->reverseTimes = 0;
...@@ -4838,25 +4842,25 @@ static SSDataBlock* doBlockInfoScan(void* param, bool* newgroup) { ...@@ -4838,25 +4842,25 @@ static SSDataBlock* doBlockInfoScan(void* param, bool* newgroup) {
} }
SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, STaskRuntimeEnv* pRuntimeEnv, int32_t repeatTime) { SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, int32_t order, int32_t numOfOutput, int32_t repeatTime) {
assert(repeatTime > 0); assert(repeatTime > 0 && numOfOutput > 0);
STableScanInfo* pInfo = calloc(1, sizeof(STableScanInfo)); STableScanInfo* pInfo = calloc(1, sizeof(STableScanInfo));
pInfo->pQueryHandle = pTsdbQueryHandle; pInfo->pQueryHandle = pTsdbQueryHandle;
pInfo->times = repeatTime; pInfo->times = repeatTime;
pInfo->reverseTimes = 0; pInfo->reverseTimes = 0;
pInfo->order = pRuntimeEnv->pQueryAttr->order.order; pInfo->order = order;
pInfo->current = 0; pInfo->current = 0;
// pInfo->prevGroupId = -1; pInfo->scanFlag = MAIN_SCAN;
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "TableScanOperator"; pOperator->name = "TableScanOperator";
// pOperator->operatorType = OP_TableScan; pOperator->operatorType = OP_TableScan;
pOperator->blockingOptr = false; pOperator->blockingOptr = false;
pOperator->status = OP_IN_EXECUTING; pOperator->status = OP_IN_EXECUTING;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->numOfOutput = pRuntimeEnv->pQueryAttr->numOfCols; pOperator->numOfOutput = numOfOutput;
pOperator->pRuntimeEnv = pRuntimeEnv; pOperator->pRuntimeEnv = NULL;
pOperator->exec = doTableScan; pOperator->exec = doTableScan;
return pOperator; return pOperator;
...@@ -5080,7 +5084,7 @@ static void destroySlimitOperatorInfo(void* param, int32_t numOfOutput) { ...@@ -5080,7 +5084,7 @@ static void destroySlimitOperatorInfo(void* param, int32_t numOfOutput) {
tfree(pInfo->prevRow); tfree(pInfo->prevRow);
} }
SOperatorInfo* createGlobalAggregateOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SOperatorInfo* createGlobalAggregateOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream,
SExprInfo* pExpr, int32_t numOfOutput, void* param, SArray* pUdfInfo, bool groupResultMixedUp) { SExprInfo* pExpr, int32_t numOfOutput, void* param, SArray* pUdfInfo, bool groupResultMixedUp) {
SMultiwayMergeInfo* pInfo = calloc(1, sizeof(SMultiwayMergeInfo)); SMultiwayMergeInfo* pInfo = calloc(1, sizeof(SMultiwayMergeInfo));
...@@ -5143,7 +5147,7 @@ SOperatorInfo* createGlobalAggregateOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, S ...@@ -5143,7 +5147,7 @@ SOperatorInfo* createGlobalAggregateOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, S
pOperator->exec = doGlobalAggregate; pOperator->exec = doGlobalAggregate;
pOperator->cleanup = destroyGlobalAggOperatorInfo; pOperator->cleanup = destroyGlobalAggOperatorInfo;
appendUpstream(pOperator, upstream); appendUpstream(pOperator, downstream);
return pOperator; return pOperator;
} }
...@@ -5259,7 +5263,7 @@ static SSDataBlock* doSort(void* param, bool* newgroup) { ...@@ -5259,7 +5263,7 @@ static SSDataBlock* doSort(void* param, bool* newgroup) {
return (pInfo->pDataBlock->info.rows > 0)? pInfo->pDataBlock:NULL; return (pInfo->pDataBlock->info.rows > 0)? pInfo->pDataBlock:NULL;
} }
SOperatorInfo *createOrderOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, SOrder* pOrderVal) { SOperatorInfo *createOrderOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput, SOrder* pOrderVal) {
SOrderOperatorInfo* pInfo = calloc(1, sizeof(SOrderOperatorInfo)); SOrderOperatorInfo* pInfo = calloc(1, sizeof(SOrderOperatorInfo));
{ {
...@@ -5292,7 +5296,7 @@ SOperatorInfo *createOrderOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorIn ...@@ -5292,7 +5296,7 @@ SOperatorInfo *createOrderOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorIn
pOperator->cleanup = destroyOrderOperatorInfo; pOperator->cleanup = destroyOrderOperatorInfo;
pOperator->pRuntimeEnv = pRuntimeEnv; pOperator->pRuntimeEnv = pRuntimeEnv;
appendUpstream(pOperator, upstream); appendUpstream(pOperator, downstream);
return pOperator; return pOperator;
} }
...@@ -5315,12 +5319,12 @@ static SSDataBlock* doAggregate(void* param, bool* newgroup) { ...@@ -5315,12 +5319,12 @@ static SSDataBlock* doAggregate(void* param, bool* newgroup) {
STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
int32_t order = pQueryAttr->order.order; int32_t order = pQueryAttr->order.order;
SOperatorInfo* upstream = pOperator->pDownstream[0]; SOperatorInfo* downstream = pOperator->pDownstream[0];
while(1) { while(1) {
publishOperatorProfEvent(upstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = upstream->exec(upstream, newgroup); SSDataBlock* pBlock = downstream->exec(downstream, newgroup);
publishOperatorProfEvent(upstream, QUERY_PROF_AFTER_OPERATOR_EXEC); publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) { if (pBlock == NULL) {
break; break;
...@@ -5330,8 +5334,8 @@ static SSDataBlock* doAggregate(void* param, bool* newgroup) { ...@@ -5330,8 +5334,8 @@ static SSDataBlock* doAggregate(void* param, bool* newgroup) {
setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->pCtx, pOperator->numOfOutput); setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->pCtx, pOperator->numOfOutput);
} }
// if (upstream->operatorType == OP_DataBlocksOptScan) { // if (downstream->operatorType == OP_DataBlocksOptScan) {
// STableScanInfo* pScanInfo = upstream->info; // STableScanInfo* pScanInfo = downstream->info;
// order = getTableScanOrder(pScanInfo); // order = getTableScanOrder(pScanInfo);
// } // }
...@@ -5372,12 +5376,12 @@ static SSDataBlock* doSTableAggregate(void* param, bool* newgroup) { ...@@ -5372,12 +5376,12 @@ static SSDataBlock* doSTableAggregate(void* param, bool* newgroup) {
STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
int32_t order = pQueryAttr->order.order; int32_t order = pQueryAttr->order.order;
SOperatorInfo* upstream = pOperator->pDownstream[0]; SOperatorInfo* downstream = pOperator->pDownstream[0];
while(1) { while(1) {
publishOperatorProfEvent(upstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = upstream->exec(upstream, newgroup); SSDataBlock* pBlock = downstream->exec(downstream, newgroup);
publishOperatorProfEvent(upstream, QUERY_PROF_AFTER_OPERATOR_EXEC); publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) { if (pBlock == NULL) {
break; break;
...@@ -5385,8 +5389,8 @@ static SSDataBlock* doSTableAggregate(void* param, bool* newgroup) { ...@@ -5385,8 +5389,8 @@ static SSDataBlock* doSTableAggregate(void* param, bool* newgroup) {
setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->pCtx, pOperator->numOfOutput); setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->pCtx, pOperator->numOfOutput);
// if (upstream->operatorType == OP_DataBlocksOptScan) { // if (downstream->operatorType == OP_DataBlocksOptScan) {
// STableScanInfo* pScanInfo = upstream->info; // STableScanInfo* pScanInfo = downstream->info;
// order = getTableScanOrder(pScanInfo); // order = getTableScanOrder(pScanInfo);
// } // }
...@@ -5463,7 +5467,7 @@ static SSDataBlock* doProjectOperation(void* param, bool* newgroup) { ...@@ -5463,7 +5467,7 @@ static SSDataBlock* doProjectOperation(void* param, bool* newgroup) {
while(1) { while(1) {
bool prevVal = *newgroup; bool prevVal = *newgroup;
// The upstream exec may change the value of the newgroup, so use a local variable instead. // The downstream exec may change the value of the newgroup, so use a local variable instead.
publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC); publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = pOperator->pDownstream[0]->exec(pOperator->pDownstream[0], newgroup); SSDataBlock* pBlock = pOperator->pDownstream[0]->exec(pOperator->pDownstream[0], newgroup);
publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC); publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC);
...@@ -5472,7 +5476,7 @@ static SSDataBlock* doProjectOperation(void* param, bool* newgroup) { ...@@ -5472,7 +5476,7 @@ static SSDataBlock* doProjectOperation(void* param, bool* newgroup) {
assert(*newgroup == false); assert(*newgroup == false);
*newgroup = prevVal; *newgroup = prevVal;
setQueryStatus(pRuntimeEnv, QUERY_COMPLETED); setTaskStatus(pOperator->pTaskInfo, QUERY_COMPLETED);
break; break;
} }
...@@ -5616,12 +5620,12 @@ static SSDataBlock* doIntervalAgg(void* param, bool* newgroup) { ...@@ -5616,12 +5620,12 @@ static SSDataBlock* doIntervalAgg(void* param, bool* newgroup) {
int32_t order = pQueryAttr->order.order; int32_t order = pQueryAttr->order.order;
STimeWindow win = pQueryAttr->window; STimeWindow win = pQueryAttr->window;
SOperatorInfo* upstream = pOperator->pDownstream[0]; SOperatorInfo* downstream = pOperator->pDownstream[0];
while(1) { while(1) {
publishOperatorProfEvent(upstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = upstream->exec(upstream, newgroup); SSDataBlock* pBlock = downstream->exec(downstream, newgroup);
publishOperatorProfEvent(upstream, QUERY_PROF_AFTER_OPERATOR_EXEC); publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) { if (pBlock == NULL) {
break; break;
...@@ -5640,7 +5644,7 @@ static SSDataBlock* doIntervalAgg(void* param, bool* newgroup) { ...@@ -5640,7 +5644,7 @@ static SSDataBlock* doIntervalAgg(void* param, bool* newgroup) {
pOperator->status = OP_RES_TO_RETURN; pOperator->status = OP_RES_TO_RETURN;
closeAllResultRows(&pIntervalInfo->resultRowInfo); closeAllResultRows(&pIntervalInfo->resultRowInfo);
setQueryStatus(pRuntimeEnv, QUERY_COMPLETED); setTaskStatus(pOperator->pTaskInfo, QUERY_COMPLETED);
finalizeQueryResult(pOperator, pIntervalInfo->pCtx, &pIntervalInfo->resultRowInfo, pIntervalInfo->rowCellInfoOffset); finalizeQueryResult(pOperator, pIntervalInfo->pCtx, &pIntervalInfo->resultRowInfo, pIntervalInfo->rowCellInfoOffset);
initGroupResInfo(&pRuntimeEnv->groupResInfo, &pIntervalInfo->resultRowInfo); initGroupResInfo(&pRuntimeEnv->groupResInfo, &pIntervalInfo->resultRowInfo);
...@@ -5676,12 +5680,12 @@ static SSDataBlock* doAllIntervalAgg(void* param, bool* newgroup) { ...@@ -5676,12 +5680,12 @@ static SSDataBlock* doAllIntervalAgg(void* param, bool* newgroup) {
int32_t order = pQueryAttr->order.order; int32_t order = pQueryAttr->order.order;
STimeWindow win = pQueryAttr->window; STimeWindow win = pQueryAttr->window;
SOperatorInfo* upstream = pOperator->pDownstream[0]; SOperatorInfo* downstream = pOperator->pDownstream[0];
while(1) { while(1) {
publishOperatorProfEvent(upstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = upstream->exec(upstream, newgroup); SSDataBlock* pBlock = downstream->exec(downstream, newgroup);
publishOperatorProfEvent(upstream, QUERY_PROF_AFTER_OPERATOR_EXEC); publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) { if (pBlock == NULL) {
break; break;
...@@ -5700,7 +5704,7 @@ static SSDataBlock* doAllIntervalAgg(void* param, bool* newgroup) { ...@@ -5700,7 +5704,7 @@ static SSDataBlock* doAllIntervalAgg(void* param, bool* newgroup) {
pOperator->status = OP_RES_TO_RETURN; pOperator->status = OP_RES_TO_RETURN;
closeAllResultRows(&pIntervalInfo->resultRowInfo); closeAllResultRows(&pIntervalInfo->resultRowInfo);
setQueryStatus(pRuntimeEnv, QUERY_COMPLETED); setTaskStatus(pOperator->pTaskInfo, QUERY_COMPLETED);
finalizeQueryResult(pOperator, pIntervalInfo->pCtx, &pIntervalInfo->resultRowInfo, pIntervalInfo->rowCellInfoOffset); finalizeQueryResult(pOperator, pIntervalInfo->pCtx, &pIntervalInfo->resultRowInfo, pIntervalInfo->rowCellInfoOffset);
initGroupResInfo(&pRuntimeEnv->groupResInfo, &pIntervalInfo->resultRowInfo); initGroupResInfo(&pRuntimeEnv->groupResInfo, &pIntervalInfo->resultRowInfo);
...@@ -5739,12 +5743,12 @@ static SSDataBlock* doSTableIntervalAgg(void* param, bool* newgroup) { ...@@ -5739,12 +5743,12 @@ static SSDataBlock* doSTableIntervalAgg(void* param, bool* newgroup) {
STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
int32_t order = pQueryAttr->order.order; int32_t order = pQueryAttr->order.order;
SOperatorInfo* upstream = pOperator->pDownstream[0]; SOperatorInfo* downstream = pOperator->pDownstream[0];
while(1) { while(1) {
publishOperatorProfEvent(upstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = upstream->exec(upstream, newgroup); SSDataBlock* pBlock = downstream->exec(downstream, newgroup);
publishOperatorProfEvent(upstream, QUERY_PROF_AFTER_OPERATOR_EXEC); publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) { if (pBlock == NULL) {
break; break;
...@@ -5763,7 +5767,7 @@ static SSDataBlock* doSTableIntervalAgg(void* param, bool* newgroup) { ...@@ -5763,7 +5767,7 @@ static SSDataBlock* doSTableIntervalAgg(void* param, bool* newgroup) {
pOperator->status = OP_RES_TO_RETURN; pOperator->status = OP_RES_TO_RETURN;
pQueryAttr->order.order = order; // TODO : restore the order pQueryAttr->order.order = order; // TODO : restore the order
doCloseAllTimeWindow(pRuntimeEnv); doCloseAllTimeWindow(pRuntimeEnv);
setQueryStatus(pRuntimeEnv, QUERY_COMPLETED); setTaskStatus(pOperator->pTaskInfo, QUERY_COMPLETED);
copyToSDataBlock(pRuntimeEnv, 3000, pIntervalInfo->pRes, pIntervalInfo->rowCellInfoOffset); copyToSDataBlock(pRuntimeEnv, 3000, pIntervalInfo->pRes, pIntervalInfo->rowCellInfoOffset);
if (pIntervalInfo->pRes->info.rows == 0 || !hasRemainData(&pRuntimeEnv->groupResInfo)) { if (pIntervalInfo->pRes->info.rows == 0 || !hasRemainData(&pRuntimeEnv->groupResInfo)) {
...@@ -5794,12 +5798,12 @@ static SSDataBlock* doAllSTableIntervalAgg(void* param, bool* newgroup) { ...@@ -5794,12 +5798,12 @@ static SSDataBlock* doAllSTableIntervalAgg(void* param, bool* newgroup) {
STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
int32_t order = pQueryAttr->order.order; int32_t order = pQueryAttr->order.order;
SOperatorInfo* upstream = pOperator->pDownstream[0]; SOperatorInfo* downstream = pOperator->pDownstream[0];
while(1) { while(1) {
publishOperatorProfEvent(upstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = upstream->exec(upstream, newgroup); SSDataBlock* pBlock = downstream->exec(downstream, newgroup);
publishOperatorProfEvent(upstream, QUERY_PROF_AFTER_OPERATOR_EXEC); publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) { if (pBlock == NULL) {
break; break;
...@@ -5818,7 +5822,7 @@ static SSDataBlock* doAllSTableIntervalAgg(void* param, bool* newgroup) { ...@@ -5818,7 +5822,7 @@ static SSDataBlock* doAllSTableIntervalAgg(void* param, bool* newgroup) {
pOperator->status = OP_RES_TO_RETURN; pOperator->status = OP_RES_TO_RETURN;
pQueryAttr->order.order = order; // TODO : restore the order pQueryAttr->order.order = order; // TODO : restore the order
doCloseAllTimeWindow(pRuntimeEnv); doCloseAllTimeWindow(pRuntimeEnv);
setQueryStatus(pRuntimeEnv, QUERY_COMPLETED); setTaskStatus(pOperator->pTaskInfo, QUERY_COMPLETED);
int64_t st = taosGetTimestampUs(); int64_t st = taosGetTimestampUs();
copyToSDataBlock(pRuntimeEnv, 3000, pIntervalInfo->pRes, pIntervalInfo->rowCellInfoOffset); copyToSDataBlock(pRuntimeEnv, 3000, pIntervalInfo->pRes, pIntervalInfo->rowCellInfoOffset);
...@@ -5839,7 +5843,7 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI ...@@ -5839,7 +5843,7 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI
SOptrBasicInfo* pBInfo = &pInfo->binfo; SOptrBasicInfo* pBInfo = &pInfo->binfo;
bool masterScan = IS_MASTER_SCAN(pRuntimeEnv); bool masterScan = IS_MAIN_SCAN(pRuntimeEnv);
int16_t bytes = pColInfoData->info.bytes; int16_t bytes = pColInfoData->info.bytes;
int16_t type = pColInfoData->info.type; int16_t type = pColInfoData->info.type;
...@@ -5930,11 +5934,11 @@ static SSDataBlock* doStateWindowAgg(void *param, bool* newgroup) { ...@@ -5930,11 +5934,11 @@ static SSDataBlock* doStateWindowAgg(void *param, bool* newgroup) {
STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
int32_t order = pQueryAttr->order.order; int32_t order = pQueryAttr->order.order;
STimeWindow win = pQueryAttr->window; STimeWindow win = pQueryAttr->window;
SOperatorInfo* upstream = pOperator->pDownstream[0]; SOperatorInfo* downstream = pOperator->pDownstream[0];
while (1) { while (1) {
publishOperatorProfEvent(upstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = upstream->exec(upstream, newgroup); SSDataBlock* pBlock = downstream->exec(downstream, newgroup);
publishOperatorProfEvent(upstream, QUERY_PROF_AFTER_OPERATOR_EXEC); publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) { if (pBlock == NULL) {
break; break;
...@@ -5952,7 +5956,7 @@ static SSDataBlock* doStateWindowAgg(void *param, bool* newgroup) { ...@@ -5952,7 +5956,7 @@ static SSDataBlock* doStateWindowAgg(void *param, bool* newgroup) {
pOperator->status = OP_RES_TO_RETURN; pOperator->status = OP_RES_TO_RETURN;
closeAllResultRows(&pBInfo->resultRowInfo); closeAllResultRows(&pBInfo->resultRowInfo);
setQueryStatus(pRuntimeEnv, QUERY_COMPLETED); setTaskStatus(pOperator->pTaskInfo, QUERY_COMPLETED);
finalizeQueryResult(pOperator, pBInfo->pCtx, &pBInfo->resultRowInfo, pBInfo->rowCellInfoOffset); finalizeQueryResult(pOperator, pBInfo->pCtx, &pBInfo->resultRowInfo, pBInfo->rowCellInfoOffset);
initGroupResInfo(&pRuntimeEnv->groupResInfo, &pBInfo->resultRowInfo); initGroupResInfo(&pRuntimeEnv->groupResInfo, &pBInfo->resultRowInfo);
...@@ -5991,12 +5995,12 @@ static SSDataBlock* doSessionWindowAgg(void* param, bool* newgroup) { ...@@ -5991,12 +5995,12 @@ static SSDataBlock* doSessionWindowAgg(void* param, bool* newgroup) {
int32_t order = pQueryAttr->order.order; int32_t order = pQueryAttr->order.order;
STimeWindow win = pQueryAttr->window; STimeWindow win = pQueryAttr->window;
SOperatorInfo* upstream = pOperator->pDownstream[0]; SOperatorInfo* downstream = pOperator->pDownstream[0];
while(1) { while(1) {
publishOperatorProfEvent(upstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = upstream->exec(upstream, newgroup); SSDataBlock* pBlock = downstream->exec(downstream, newgroup);
publishOperatorProfEvent(upstream, QUERY_PROF_AFTER_OPERATOR_EXEC); publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) { if (pBlock == NULL) {
break; break;
} }
...@@ -6012,7 +6016,7 @@ static SSDataBlock* doSessionWindowAgg(void* param, bool* newgroup) { ...@@ -6012,7 +6016,7 @@ static SSDataBlock* doSessionWindowAgg(void* param, bool* newgroup) {
pOperator->status = OP_RES_TO_RETURN; pOperator->status = OP_RES_TO_RETURN;
closeAllResultRows(&pBInfo->resultRowInfo); closeAllResultRows(&pBInfo->resultRowInfo);
setQueryStatus(pRuntimeEnv, QUERY_COMPLETED); // setTaskStatus(pOperator->pTaskInfo, QUERY_COMPLETED);
finalizeQueryResult(pOperator, pBInfo->pCtx, &pBInfo->resultRowInfo, pBInfo->rowCellInfoOffset); finalizeQueryResult(pOperator, pBInfo->pCtx, &pBInfo->resultRowInfo, pBInfo->rowCellInfoOffset);
initGroupResInfo(&pRuntimeEnv->groupResInfo, &pBInfo->resultRowInfo); initGroupResInfo(&pRuntimeEnv->groupResInfo, &pBInfo->resultRowInfo);
...@@ -6044,12 +6048,12 @@ static SSDataBlock* hashGroupbyAggregate(void* param, bool* newgroup) { ...@@ -6044,12 +6048,12 @@ static SSDataBlock* hashGroupbyAggregate(void* param, bool* newgroup) {
return pInfo->binfo.pRes; return pInfo->binfo.pRes;
} }
SOperatorInfo* upstream = pOperator->pDownstream[0]; SOperatorInfo* downstream = pOperator->pDownstream[0];
while(1) { while(1) {
publishOperatorProfEvent(upstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = upstream->exec(upstream, newgroup); SSDataBlock* pBlock = downstream->exec(downstream, newgroup);
publishOperatorProfEvent(upstream, QUERY_PROF_AFTER_OPERATOR_EXEC); publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) { if (pBlock == NULL) {
break; break;
} }
...@@ -6066,7 +6070,7 @@ static SSDataBlock* hashGroupbyAggregate(void* param, bool* newgroup) { ...@@ -6066,7 +6070,7 @@ static SSDataBlock* hashGroupbyAggregate(void* param, bool* newgroup) {
pOperator->status = OP_RES_TO_RETURN; pOperator->status = OP_RES_TO_RETURN;
closeAllResultRows(&pInfo->binfo.resultRowInfo); closeAllResultRows(&pInfo->binfo.resultRowInfo);
setQueryStatus(pRuntimeEnv, QUERY_COMPLETED); // setTaskStatus(pOperator->pTaskInfo, QUERY_COMPLETED);
if (!pRuntimeEnv->pQueryAttr->stableQuery) { // finalize include the update of result rows if (!pRuntimeEnv->pQueryAttr->stableQuery) { // finalize include the update of result rows
finalizeQueryResult(pOperator, pInfo->binfo.pCtx, &pInfo->binfo.resultRowInfo, pInfo->binfo.rowCellInfoOffset); finalizeQueryResult(pOperator, pInfo->binfo.pCtx, &pInfo->binfo.resultRowInfo, pInfo->binfo.rowCellInfoOffset);
...@@ -6212,19 +6216,19 @@ static void destroyOperatorInfo(SOperatorInfo* pOperator) { ...@@ -6212,19 +6216,19 @@ static void destroyOperatorInfo(SOperatorInfo* pOperator) {
} }
if (pOperator->pDownstream != NULL) { if (pOperator->pDownstream != NULL) {
for(int32_t i = 0; i < pOperator->numOfUpstream; ++i) { for(int32_t i = 0; i < pOperator->numOfDownstream; ++i) {
destroyOperatorInfo(pOperator->pDownstream[i]); destroyOperatorInfo(pOperator->pDownstream[i]);
} }
tfree(pOperator->pDownstream); tfree(pOperator->pDownstream);
pOperator->numOfUpstream = 0; pOperator->numOfDownstream = 0;
} }
tfree(pOperator->info); tfree(pOperator->info);
tfree(pOperator); tfree(pOperator);
} }
SOperatorInfo* createAggregateOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { SOperatorInfo* createAggregateOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput) {
SAggOperatorInfo* pInfo = calloc(1, sizeof(SAggOperatorInfo)); SAggOperatorInfo* pInfo = calloc(1, sizeof(SAggOperatorInfo));
STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
...@@ -6236,7 +6240,7 @@ SOperatorInfo* createAggregateOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperat ...@@ -6236,7 +6240,7 @@ SOperatorInfo* createAggregateOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperat
initResultRowInfo(&pInfo->binfo.resultRowInfo, 8, TSDB_DATA_TYPE_INT); initResultRowInfo(&pInfo->binfo.resultRowInfo, 8, TSDB_DATA_TYPE_INT);
pInfo->seed = rand(); pInfo->seed = rand();
setDefaultOutputBuf(pRuntimeEnv, &pInfo->binfo, pInfo->seed, MASTER_SCAN); setDefaultOutputBuf(pRuntimeEnv, &pInfo->binfo, pInfo->seed, MAIN_SCAN);
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "TableAggregate"; pOperator->name = "TableAggregate";
...@@ -6250,7 +6254,7 @@ SOperatorInfo* createAggregateOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperat ...@@ -6250,7 +6254,7 @@ SOperatorInfo* createAggregateOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperat
pOperator->exec = doAggregate; pOperator->exec = doAggregate;
pOperator->cleanup = destroyAggOperatorInfo; pOperator->cleanup = destroyAggOperatorInfo;
appendUpstream(pOperator, upstream); appendUpstream(pOperator, downstream);
return pOperator; return pOperator;
} }
...@@ -6324,7 +6328,7 @@ static void destroyDistinctOperatorInfo(void* param, int32_t numOfOutput) { ...@@ -6324,7 +6328,7 @@ static void destroyDistinctOperatorInfo(void* param, int32_t numOfOutput) {
pInfo->pRes = destroyOutputBuf(pInfo->pRes); pInfo->pRes = destroyOutputBuf(pInfo->pRes);
} }
SOperatorInfo* createMultiTableAggOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { SOperatorInfo* createMultiTableAggOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput) {
SAggOperatorInfo* pInfo = calloc(1, sizeof(SAggOperatorInfo)); SAggOperatorInfo* pInfo = calloc(1, sizeof(SAggOperatorInfo));
size_t tableGroup = GET_NUM_OF_TABLEGROUP(pRuntimeEnv); size_t tableGroup = GET_NUM_OF_TABLEGROUP(pRuntimeEnv);
...@@ -6345,12 +6349,12 @@ SOperatorInfo* createMultiTableAggOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOp ...@@ -6345,12 +6349,12 @@ SOperatorInfo* createMultiTableAggOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOp
pOperator->exec = doSTableAggregate; pOperator->exec = doSTableAggregate;
pOperator->cleanup = destroyAggOperatorInfo; pOperator->cleanup = destroyAggOperatorInfo;
appendUpstream(pOperator, upstream); appendUpstream(pOperator, downstream);
return pOperator; return pOperator;
} }
SOperatorInfo* createProjectOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { SOperatorInfo* createProjectOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput) {
SProjectOperatorInfo* pInfo = calloc(1, sizeof(SProjectOperatorInfo)); SProjectOperatorInfo* pInfo = calloc(1, sizeof(SProjectOperatorInfo));
pInfo->seed = rand(); pInfo->seed = rand();
...@@ -6361,7 +6365,7 @@ SOperatorInfo* createProjectOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperator ...@@ -6361,7 +6365,7 @@ SOperatorInfo* createProjectOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperator
pBInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pBInfo->rowCellInfoOffset); pBInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pBInfo->rowCellInfoOffset);
initResultRowInfo(&pBInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT); initResultRowInfo(&pBInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT);
setDefaultOutputBuf(pRuntimeEnv, pBInfo, pInfo->seed, MASTER_SCAN); setDefaultOutputBuf(pRuntimeEnv, pBInfo, pInfo->seed, MAIN_SCAN);
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "ProjectOperator"; pOperator->name = "ProjectOperator";
...@@ -6375,7 +6379,7 @@ SOperatorInfo* createProjectOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperator ...@@ -6375,7 +6379,7 @@ SOperatorInfo* createProjectOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperator
pOperator->exec = doProjectOperation; pOperator->exec = doProjectOperation;
pOperator->cleanup = destroyProjectOperatorInfo; pOperator->cleanup = destroyProjectOperatorInfo;
appendUpstream(pOperator, upstream); appendUpstream(pOperator, downstream);
return pOperator; return pOperator;
} }
...@@ -6413,7 +6417,7 @@ SColumnInfo* extractColumnFilterInfo(SExprInfo* pExpr, int32_t numOfOutput, int3 ...@@ -6413,7 +6417,7 @@ SColumnInfo* extractColumnFilterInfo(SExprInfo* pExpr, int32_t numOfOutput, int3
return 0; return 0;
} }
SOperatorInfo* createFilterOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, SOperatorInfo* createFilterOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr,
int32_t numOfOutput, SColumnInfo* pCols, int32_t numOfFilter) { int32_t numOfOutput, SColumnInfo* pCols, int32_t numOfFilter) {
SFilterOperatorInfo* pInfo = calloc(1, sizeof(SFilterOperatorInfo)); SFilterOperatorInfo* pInfo = calloc(1, sizeof(SFilterOperatorInfo));
...@@ -6433,12 +6437,12 @@ SOperatorInfo* createFilterOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorI ...@@ -6433,12 +6437,12 @@ SOperatorInfo* createFilterOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorI
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->pRuntimeEnv = pRuntimeEnv; pOperator->pRuntimeEnv = pRuntimeEnv;
pOperator->cleanup = destroyConditionOperatorInfo; pOperator->cleanup = destroyConditionOperatorInfo;
appendUpstream(pOperator, upstream); appendUpstream(pOperator, downstream);
return pOperator; return pOperator;
} }
SOperatorInfo* createLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream) { SOperatorInfo* createLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream) {
SLimitOperatorInfo* pInfo = calloc(1, sizeof(SLimitOperatorInfo)); SLimitOperatorInfo* pInfo = calloc(1, sizeof(SLimitOperatorInfo));
pInfo->limit = pRuntimeEnv->pQueryAttr->limit.limit; pInfo->limit = pRuntimeEnv->pQueryAttr->limit.limit;
...@@ -6451,12 +6455,12 @@ SOperatorInfo* createLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorIn ...@@ -6451,12 +6455,12 @@ SOperatorInfo* createLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorIn
pOperator->exec = doLimit; pOperator->exec = doLimit;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->pRuntimeEnv = pRuntimeEnv; pOperator->pRuntimeEnv = pRuntimeEnv;
appendUpstream(pOperator, upstream); appendUpstream(pOperator, downstream);
return pOperator; return pOperator;
} }
SOperatorInfo* createTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { SOperatorInfo* createTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput) {
STableIntervalOperatorInfo* pInfo = calloc(1, sizeof(STableIntervalOperatorInfo)); STableIntervalOperatorInfo* pInfo = calloc(1, sizeof(STableIntervalOperatorInfo));
pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->rowCellInfoOffset); pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->rowCellInfoOffset);
...@@ -6476,12 +6480,12 @@ SOperatorInfo* createTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOpe ...@@ -6476,12 +6480,12 @@ SOperatorInfo* createTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOpe
pOperator->exec = doIntervalAgg; pOperator->exec = doIntervalAgg;
pOperator->cleanup = destroyBasicOperatorInfo; pOperator->cleanup = destroyBasicOperatorInfo;
appendUpstream(pOperator, upstream); appendUpstream(pOperator, downstream);
return pOperator; return pOperator;
} }
SOperatorInfo* createAllTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { SOperatorInfo* createAllTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput) {
STableIntervalOperatorInfo* pInfo = calloc(1, sizeof(STableIntervalOperatorInfo)); STableIntervalOperatorInfo* pInfo = calloc(1, sizeof(STableIntervalOperatorInfo));
pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->rowCellInfoOffset); pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->rowCellInfoOffset);
...@@ -6501,11 +6505,11 @@ SOperatorInfo* createAllTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, S ...@@ -6501,11 +6505,11 @@ SOperatorInfo* createAllTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, S
pOperator->exec = doAllIntervalAgg; pOperator->exec = doAllIntervalAgg;
pOperator->cleanup = destroyBasicOperatorInfo; pOperator->cleanup = destroyBasicOperatorInfo;
appendUpstream(pOperator, upstream); appendUpstream(pOperator, downstream);
return pOperator; return pOperator;
} }
SOperatorInfo* createStatewindowOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { SOperatorInfo* createStatewindowOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput) {
SStateWindowOperatorInfo* pInfo = calloc(1, sizeof(SStateWindowOperatorInfo)); SStateWindowOperatorInfo* pInfo = calloc(1, sizeof(SStateWindowOperatorInfo));
pInfo->colIndex = -1; pInfo->colIndex = -1;
pInfo->reptScan = false; pInfo->reptScan = false;
...@@ -6525,10 +6529,10 @@ SOperatorInfo* createStatewindowOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOper ...@@ -6525,10 +6529,10 @@ SOperatorInfo* createStatewindowOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOper
pOperator->exec = doStateWindowAgg; pOperator->exec = doStateWindowAgg;
pOperator->cleanup = destroyStateWindowOperatorInfo; pOperator->cleanup = destroyStateWindowOperatorInfo;
appendUpstream(pOperator, upstream); appendUpstream(pOperator, downstream);
return pOperator; return pOperator;
} }
SOperatorInfo* createSWindowOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { SOperatorInfo* createSWindowOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput) {
SSWindowOperatorInfo* pInfo = calloc(1, sizeof(SSWindowOperatorInfo)); SSWindowOperatorInfo* pInfo = calloc(1, sizeof(SSWindowOperatorInfo));
pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset); pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset);
...@@ -6550,11 +6554,11 @@ SOperatorInfo* createSWindowOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperator ...@@ -6550,11 +6554,11 @@ SOperatorInfo* createSWindowOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperator
pOperator->exec = doSessionWindowAgg; pOperator->exec = doSessionWindowAgg;
pOperator->cleanup = destroySWindowOperatorInfo; pOperator->cleanup = destroySWindowOperatorInfo;
appendUpstream(pOperator, upstream); appendUpstream(pOperator, downstream);
return pOperator; return pOperator;
} }
SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput) {
STableIntervalOperatorInfo* pInfo = calloc(1, sizeof(STableIntervalOperatorInfo)); STableIntervalOperatorInfo* pInfo = calloc(1, sizeof(STableIntervalOperatorInfo));
pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->rowCellInfoOffset); pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->rowCellInfoOffset);
...@@ -6574,11 +6578,11 @@ SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntim ...@@ -6574,11 +6578,11 @@ SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntim
pOperator->exec = doSTableIntervalAgg; pOperator->exec = doSTableIntervalAgg;
pOperator->cleanup = destroyBasicOperatorInfo; pOperator->cleanup = destroyBasicOperatorInfo;
appendUpstream(pOperator, upstream); appendUpstream(pOperator, downstream);
return pOperator; return pOperator;
} }
SOperatorInfo* createAllMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { SOperatorInfo* createAllMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput) {
STableIntervalOperatorInfo* pInfo = calloc(1, sizeof(STableIntervalOperatorInfo)); STableIntervalOperatorInfo* pInfo = calloc(1, sizeof(STableIntervalOperatorInfo));
pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->rowCellInfoOffset); pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->rowCellInfoOffset);
...@@ -6598,13 +6602,13 @@ SOperatorInfo* createAllMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRun ...@@ -6598,13 +6602,13 @@ SOperatorInfo* createAllMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRun
pOperator->exec = doAllSTableIntervalAgg; pOperator->exec = doAllSTableIntervalAgg;
pOperator->cleanup = destroyBasicOperatorInfo; pOperator->cleanup = destroyBasicOperatorInfo;
appendUpstream(pOperator, upstream); appendUpstream(pOperator, downstream);
return pOperator; return pOperator;
} }
SOperatorInfo* createGroupbyOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { SOperatorInfo* createGroupbyOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput) {
SGroupbyOperatorInfo* pInfo = calloc(1, sizeof(SGroupbyOperatorInfo)); SGroupbyOperatorInfo* pInfo = calloc(1, sizeof(SGroupbyOperatorInfo));
pInfo->colIndex = -1; // group by column index pInfo->colIndex = -1; // group by column index
...@@ -6631,11 +6635,11 @@ SOperatorInfo* createGroupbyOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperator ...@@ -6631,11 +6635,11 @@ SOperatorInfo* createGroupbyOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperator
pOperator->exec = hashGroupbyAggregate; pOperator->exec = hashGroupbyAggregate;
pOperator->cleanup = destroyGroupbyOperatorInfo; pOperator->cleanup = destroyGroupbyOperatorInfo;
appendUpstream(pOperator, upstream); appendUpstream(pOperator, downstream);
return pOperator; return pOperator;
} }
SOperatorInfo* createFillOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, bool multigroupResult) { SOperatorInfo* createFillOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput, bool multigroupResult) {
SFillOperatorInfo* pInfo = calloc(1, sizeof(SFillOperatorInfo)); SFillOperatorInfo* pInfo = calloc(1, sizeof(SFillOperatorInfo));
pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity); pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity);
pInfo->multigroupResult = multigroupResult; pInfo->multigroupResult = multigroupResult;
...@@ -6670,11 +6674,11 @@ SOperatorInfo* createFillOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInf ...@@ -6670,11 +6674,11 @@ SOperatorInfo* createFillOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInf
pOperator->exec = doFill; pOperator->exec = doFill;
pOperator->cleanup = destroySFillOperatorInfo; pOperator->cleanup = destroySFillOperatorInfo;
appendUpstream(pOperator, upstream); appendUpstream(pOperator, downstream);
return pOperator; return pOperator;
} }
SOperatorInfo* createSLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, void* pMerger, bool multigroupResult) { SOperatorInfo* createSLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput, void* pMerger, bool multigroupResult) {
SSLimitOperatorInfo* pInfo = calloc(1, sizeof(SSLimitOperatorInfo)); SSLimitOperatorInfo* pInfo = calloc(1, sizeof(SSLimitOperatorInfo));
STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
...@@ -6718,7 +6722,7 @@ SOperatorInfo* createSLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorI ...@@ -6718,7 +6722,7 @@ SOperatorInfo* createSLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorI
pOperator->pRuntimeEnv = pRuntimeEnv; pOperator->pRuntimeEnv = pRuntimeEnv;
pOperator->cleanup = destroySlimitOperatorInfo; pOperator->cleanup = destroySlimitOperatorInfo;
appendUpstream(pOperator, upstream); appendUpstream(pOperator, downstream);
return pOperator; return pOperator;
} }
...@@ -6845,7 +6849,7 @@ static SSDataBlock* doTagScan(void* param, bool* newgroup) { ...@@ -6845,7 +6849,7 @@ static SSDataBlock* doTagScan(void* param, bool* newgroup) {
} }
if (pOperator->status == OP_EXEC_DONE) { if (pOperator->status == OP_EXEC_DONE) {
setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED); setTaskStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED);
} }
pRes->info.rows = count; pRes->info.rows = count;
...@@ -6992,7 +6996,7 @@ static SSDataBlock* hashDistinct(void* param, bool* newgroup) { ...@@ -6992,7 +6996,7 @@ static SSDataBlock* hashDistinct(void* param, bool* newgroup) {
return (pInfo->pRes->info.rows > 0)? pInfo->pRes:NULL; return (pInfo->pRes->info.rows > 0)? pInfo->pRes:NULL;
} }
SOperatorInfo* createDistinctOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { SOperatorInfo* createDistinctOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput) {
SDistinctOperatorInfo* pInfo = calloc(1, sizeof(SDistinctOperatorInfo)); SDistinctOperatorInfo* pInfo = calloc(1, sizeof(SDistinctOperatorInfo));
pInfo->totalBytes = 0; pInfo->totalBytes = 0;
pInfo->buf = NULL; pInfo->buf = NULL;
...@@ -7016,7 +7020,7 @@ SOperatorInfo* createDistinctOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperato ...@@ -7016,7 +7020,7 @@ SOperatorInfo* createDistinctOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperato
pOperator->pExpr = pExpr; pOperator->pExpr = pExpr;
pOperator->cleanup = destroyDistinctOperatorInfo; pOperator->cleanup = destroyDistinctOperatorInfo;
appendUpstream(pOperator, upstream); appendUpstream(pOperator, downstream);
return pOperator; return pOperator;
} }
...@@ -7170,6 +7174,92 @@ static int32_t deserializeColFilterInfo(SColumnFilterInfo* pColFilters, int16_t ...@@ -7170,6 +7174,92 @@ static int32_t deserializeColFilterInfo(SColumnFilterInfo* pColFilters, int16_t
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
/**
* {
"Id": {
"QueryId": 20,
"TemplateId": 0,
"SubplanId": 0
},
"Node": {
"Name": "TableScan",
"InputSchema": [{
"Type": 9,
"ColId": 1,
"Bytes": 8
}, {
"Type": 4,
"ColId": 2,
"Bytes": 4
}, {
"Type": 8,
"ColId": 3,
"Bytes": 20
}],
"TableScan": {
"TableId": 1,
"TableType": 3,
"Flag": 0,
"Window": {
"StartKey": 0,
"EndKey": 0
}
}
},
"DataSink": {
"Name": "Dispatch",
"Dispatch": {
}
}
}
*/
int32_t parseTaskInfo(const char* msg, int32_t len) {
cJSON* pJson = cJSON_Parse(msg);
if (NULL == pJson) {
return TSDB_CODE_INVALID_MSG;
}
cJSON* pSub = cJSON_GetObjectItem(pJson, "ID");
if (NULL != pSub) {
printf("Id : %s\n", pSub->valuestring);
}
cJSON* pNode = cJSON_GetObjectItem(pJson, "Node");
if (pNode == NULL) {
return TSDB_CODE_INVALID_MSG;
}
cJSON* pNodeName = cJSON_GetObjectItem(pNode, "name");
if (pNodeName == NULL) {
return TSDB_CODE_INVALID_MSG;
}
printf("node name is: %s\n", pNodeName->valuestring);
cJSON* pNodeSchema = cJSON_GetObjectItem(pNode, "InputSchema");
if (pNodeSchema == NULL) {
return TSDB_CODE_INVALID_MSG;
}
cJSON* pOperator = cJSON_GetObjectItem(pNode, pNodeName->valuestring);
if (pOperator == NULL) {
return TSDB_CODE_INVALID_MSG;
}
cJSON* pTableId = cJSON_GetObjectItem(pOperator, "tableId");
if (pTableId == NULL) {
return TSDB_CODE_INVALID_MSG;
}
cJSON* pTimeWindow = cJSON_GetObjectItem(pOperator, "window");
if (pTimeWindow == NULL) {
return TSDB_CODE_INVALID_MSG;
}
}
/** /**
* pQueryMsg->head has been converted before this function is called. * pQueryMsg->head has been converted before this function is called.
* *
...@@ -8242,7 +8332,7 @@ int32_t initQInfo(STsBufInfo* pTsBufInfo, void* tsdb, void* sourceOptr, SQInfo* ...@@ -8242,7 +8332,7 @@ int32_t initQInfo(STsBufInfo* pTsBufInfo, void* tsdb, void* sourceOptr, SQInfo*
(!QUERY_IS_ASC_QUERY(pQueryAttr) && (pQueryAttr->window.ekey > pQueryAttr->window.skey))) { (!QUERY_IS_ASC_QUERY(pQueryAttr) && (pQueryAttr->window.ekey > pQueryAttr->window.skey))) {
//qDebug("QInfo:0x%"PRIx64" no result in time range %" PRId64 "-%" PRId64 ", order %d", pQInfo->qId, pQueryAttr->window.skey, //qDebug("QInfo:0x%"PRIx64" no result in time range %" PRId64 "-%" PRId64 ", order %d", pQInfo->qId, pQueryAttr->window.skey,
// pQueryAttr->window.ekey, pQueryAttr->order.order); // pQueryAttr->window.ekey, pQueryAttr->order.order);
setQueryStatus(pRuntimeEnv, QUERY_COMPLETED); // setTaskStatus(pOperator->pTaskInfo, QUERY_COMPLETED);
pRuntimeEnv->tableqinfoGroupInfo.numOfTables = 0; pRuntimeEnv->tableqinfoGroupInfo.numOfTables = 0;
// todo free memory // todo free memory
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -8250,7 +8340,7 @@ int32_t initQInfo(STsBufInfo* pTsBufInfo, void* tsdb, void* sourceOptr, SQInfo* ...@@ -8250,7 +8340,7 @@ int32_t initQInfo(STsBufInfo* pTsBufInfo, void* tsdb, void* sourceOptr, SQInfo*
if (pRuntimeEnv->tableqinfoGroupInfo.numOfTables == 0) { if (pRuntimeEnv->tableqinfoGroupInfo.numOfTables == 0) {
//qDebug("QInfo:0x%"PRIx64" no table qualified for tag filter, abort query", pQInfo->qId); //qDebug("QInfo:0x%"PRIx64" no table qualified for tag filter, abort query", pQInfo->qId);
setQueryStatus(pRuntimeEnv, QUERY_COMPLETED); // setTaskStatus(pOperator->pTaskInfo, QUERY_COMPLETED);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -8416,7 +8506,7 @@ int32_t doDumpQueryResult(SQInfo *pQInfo, char *data, int8_t compressed, int32_t ...@@ -8416,7 +8506,7 @@ int32_t doDumpQueryResult(SQInfo *pQInfo, char *data, int8_t compressed, int32_t
// all data returned, set query over // all data returned, set query over
if (Q_STATUS_EQUAL(pRuntimeEnv->status, QUERY_COMPLETED)) { if (Q_STATUS_EQUAL(pRuntimeEnv->status, QUERY_COMPLETED)) {
setQueryStatus(pRuntimeEnv, QUERY_OVER); // setTaskStatus(pOperator->pTaskInfo, QUERY_OVER);
} }
} else { } else {
doCopyQueryResultToMsg(pQInfo, (int32_t)pRuntimeEnv->outputBuf->info.rows, data, compressed, compLen); doCopyQueryResultToMsg(pQInfo, (int32_t)pRuntimeEnv->outputBuf->info.rows, data, compressed, compLen);
...@@ -8427,7 +8517,7 @@ int32_t doDumpQueryResult(SQInfo *pQInfo, char *data, int8_t compressed, int32_t ...@@ -8427,7 +8517,7 @@ int32_t doDumpQueryResult(SQInfo *pQInfo, char *data, int8_t compressed, int32_t
if (pQueryAttr->limit.limit > 0 && pQueryAttr->limit.limit == pRuntimeEnv->resultInfo.total) { if (pQueryAttr->limit.limit > 0 && pQueryAttr->limit.limit == pRuntimeEnv->resultInfo.total) {
//qDebug("QInfo:0x%"PRIx64" results limitation reached, limitation:%"PRId64, pQInfo->qId, pQueryAttr->limit.limit); //qDebug("QInfo:0x%"PRIx64" results limitation reached, limitation:%"PRId64, pQInfo->qId, pQueryAttr->limit.limit);
setQueryStatus(pRuntimeEnv, QUERY_OVER); // setTaskStatus(pOperator->pTaskInfo, QUERY_OVER);
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
......
...@@ -149,7 +149,7 @@ static SPhyNode* createTagScanNode(SQueryPlanNode* pPlanNode) { ...@@ -149,7 +149,7 @@ static SPhyNode* createTagScanNode(SQueryPlanNode* pPlanNode) {
static uint8_t getScanFlag(SQueryPlanNode* pPlanNode, SQueryTableInfo* pTable) { static uint8_t getScanFlag(SQueryPlanNode* pPlanNode, SQueryTableInfo* pTable) {
// todo // todo
return MASTER_SCAN; return MAIN_SCAN;
} }
static SPhyNode* createUserTableScanNode(SQueryPlanNode* pPlanNode, SQueryTableInfo* pTable, int32_t op) { static SPhyNode* createUserTableScanNode(SQueryPlanNode* pPlanNode, SQueryTableInfo* pTable, int32_t op) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册