提交 33e51fda 编写于 作者: H Haojun Liao

ehn(query): refactor the executor module.

上级 1704147a
......@@ -461,6 +461,7 @@ int taos_fetch_raw_block(TAOS_RES *res, int *numOfRows, void **pData) {
if (res == NULL) {
return 0;
if (TD_RES_QUERY(res)) {
SRequestObj *pRequest = (SRequestObj *)res;
......@@ -48,8 +48,6 @@ typedef int32_t (*__block_search_fn_t)(char* data, int32_t num, int64_t key, int
#define GET_TABLEGROUP(q, _index) ((SArray*)taosArrayGetP((q)->tableqinfoGroupInfo.pGroupList, (_index)))
#define GET_NUM_OF_RESULTS(_r) (((_r)->outputBuf) == NULL ? 0 : ((_r)->outputBuf)->info.rows)
#define NEEDTO_COMPRESS_QUERY(size) ((size) > tsCompressColData ? 1 : 0)
enum {
......@@ -84,21 +82,6 @@ typedef struct SResultInfo { // TODO refactor
int32_t threshold; // result size threshold in rows.
} SResultInfo;
typedef struct SColumnFilterElem {
int16_t bytes; // column length
__filter_func_t fp;
SColumnFilterInfo filterInfo;
void* q;
} SColumnFilterElem;
typedef struct SSingleColumnFilterInfo {
void* pData;
void* pData2; // used for nchar column
int32_t numOfFilters;
SColumnInfo info;
SColumnFilterElem* pFilters;
} SSingleColumnFilterInfo;
typedef struct STableQueryInfo {
TSKEY lastKey; // last check ts
uint64_t uid; // table uid
......@@ -177,58 +160,34 @@ typedef struct SOrder {
// The basic query information extracted from the SQueryInfo tree to support the
// execution of query in a data node.
typedef struct STaskAttr {
SLimit limit;
SLimit slimit;
// todo comment it
bool stableQuery; // super table query or not
bool topBotQuery; // TODO used bitwise flag
bool groupbyColumn; // denote if this is a groupby normal column query
bool hasTagResults; // if there are tag values in final result or not
bool timeWindowInterpo; // if the time window start/end required interpolation
bool queryBlockDist; // if query data block distribution
bool stabledev; // super table stddev query
bool tsCompQuery; // is tscomp query
bool diffQuery; // is diff query
bool simpleAgg;
bool pointInterpQuery; // point interpolation query
bool needReverseScan; // need reverse scan
bool distinct; // distinct query or not
bool stateWindow; // window State on sub/normal table
bool createFilterOperator; // if filter operator is needed
bool multigroupResult; // multigroup result can exist in one SSDataBlock
int32_t interBufSize; // intermediate buffer sizse
int32_t havingNum; // having expr number
SOrder order;
int16_t numOfCols;
int16_t numOfTags;
STimeWindow window;
SInterval interval;
int16_t precision;
int16_t numOfOutput;
int16_t fillType;
int32_t srcRowSize; // todo extract struct
int32_t resultRowSize;
int32_t intermediateResultRowSize; // intermediate result row size, in case of top-k query.
int32_t maxTableColumnWidth;
int32_t tagLen; // tag value length of current query
SExprInfo* pExpr1;
SColumnInfo* tableCols;
SColumnInfo* tagColList;
int32_t numOfFilterCols;
int64_t* fillVal;
SSingleColumnFilterInfo* pFilterInfo;
SLimit limit;
SLimit slimit;
bool stableQuery; // super table query or not
bool topBotQuery; // TODO used bitwise flag
bool groupbyColumn; // denote if this is a groupby normal column query
bool timeWindowInterpo; // if the time window start/end required interpolation
bool tsCompQuery; // is tscomp query
bool diffQuery; // is diff query
bool pointInterpQuery; // point interpolation query
int32_t havingNum; // having expr number
SOrder order;
int16_t numOfCols;
int16_t numOfTags;
STimeWindow window;
SInterval interval;
int16_t precision;
int16_t numOfOutput;
int16_t fillType;
int32_t resultRowSize;
int32_t tagLen; // tag value length of current query
SExprInfo *pExpr1;
SColumnInfo* tagColList;
int32_t numOfFilterCols;
int64_t* fillVal;
void* tsdb;
STableGroupInfo tableGroupInfo; // table <tid, last_key> list SArray<STableKeyInfo>
int32_t vgId;
SArray* pUdfInfo; // no need to free
} STaskAttr;
struct SOperatorInfo;
......@@ -252,7 +211,6 @@ typedef struct STaskIdInfo {
typedef struct SExecTaskInfo {
STaskIdInfo id;
char* content;
uint32_t status;
STimeWindow window;
STaskCostInfo cost;
......@@ -262,7 +220,7 @@ typedef struct SExecTaskInfo {
STableGroupInfo tableqinfoGroupInfo; // this is a group array list, including SArray<STableQueryInfo*> structure
char* sql; // query sql string
jmp_buf env; // jump to this position when error happens.
EOPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model]
EOPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model]
struct SOperatorInfo* pRoot;
} SExecTaskInfo;
......@@ -297,7 +255,7 @@ typedef struct STaskRuntimeEnv {
int64_t currentOffset; // dynamic offset value
STableQueryInfo* current;
SResultInfo resultInfo;
SResultInfo resultInfo;
SHashObj* pTableRetrieveTsMap;
struct SUdfInfo* pUdfInfo;
} STaskRuntimeEnv;
......@@ -339,25 +297,6 @@ typedef struct {
SColumnInfo* colList;
} SQueriedTableInfo;
typedef struct SQInfo {
void* signature;
uint64_t qId;
int32_t code; // error code to returned to client
int64_t owner; // if it is in execution
STaskRuntimeEnv runtimeEnv;
STaskAttr query;
void* pBuf; // allocated buffer for STableQueryInfo, sizeof(STableQueryInfo)*numOfTables;
TdThreadMutex lock; // used to synchronize the rsp/query threads
tsem_t ready;
int32_t dataReady; // denote if query result is ready or not
void* rspContext; // response context
int64_t startExecTs; // start to exec timestamp
char* sql; // query sql string
STaskCostInfo summary;
} SQInfo;
typedef enum {
......@@ -523,24 +462,6 @@ typedef struct SProjectOperatorInfo {
int64_t curOutput;
} SProjectOperatorInfo;
typedef struct SSLimitOperatorInfo {
int64_t groupTotal;
int64_t currentGroupOffset;
int64_t rowsTotal;
int64_t currentOffset;
SLimit limit;
SLimit slimit;
char** prevRow;
SArray* orderColumnList;
bool hasPrev;
bool ignoreCurrentGroup;
bool multigroupResult;
SSDataBlock* pRes; // result buffer
SSDataBlock* pPrevBlock;
int64_t capacity;
int64_t threshold;
} SSLimitOperatorInfo;
typedef struct SFillOperatorInfo {
struct SFillInfo* pFillInfo;
SSDataBlock* pRes;
......@@ -638,18 +559,13 @@ typedef struct SSortedMergeOperatorInfo {
SArray* pSortInfo;
int32_t numOfSources;
SSortHandle *pSortHandle;
int32_t bufPageSize;
uint32_t sortBufSize; // max buffer size for in-memory sort
int32_t resultRowFactor;
bool hasGroupVal;
SDiskbasedBuf *pTupleStore; // keep the final results
int32_t numOfResPerPage;
char** groupVal;
SArray *groupInfo;
SAggSupporter aggSup;
......@@ -748,7 +664,6 @@ void setTaskKilled(SExecTaskInfo* pTaskInfo);
void publishOperatorProfEvent(SOperatorInfo* operatorInfo, EQueryProfEventType eventType);
void publishQueryAbortEvent(SExecTaskInfo* pTaskInfo, int32_t code);
void calculateOperatorProfResults(SQInfo* pQInfo);
void queryCostStatis(SExecTaskInfo* pTaskInfo);
void doDestroyTask(SExecTaskInfo* pTaskInfo);
......@@ -2257,57 +2257,6 @@ static int32_t doTSJoinFilter(STaskRuntimeEnv* pRuntimeEnv, TSKEY key, bool ascQ
bool doFilterDataBlock(SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFilterCols, int32_t numOfRows, int8_t* p) {
bool all = true;
for (int32_t i = 0; i < numOfRows; ++i) {
bool qualified = false;
for (int32_t k = 0; k < numOfFilterCols; ++k) {
char* pElem = (char*)pFilterInfo[k].pData + pFilterInfo[k].info.bytes * i;
qualified = false;
for (int32_t j = 0; j < pFilterInfo[k].numOfFilters; ++j) {
SColumnFilterElem* pFilterElem = NULL;
// SColumnFilterElem* pFilterElem = &pFilterInfo[k].pFilters[j];
bool isnull = isNull(pElem, pFilterInfo[k].info.type);
if (isnull) {
// if (pFilterElem->fp == isNullOperator) {
// qualified = true;
// break;
// } else {
// continue;
// }
} else {
// if (pFilterElem->fp == notNullOperator) {
// qualified = true;
// break;
// } else if (pFilterElem->fp == isNullOperator) {
// continue;
// }
if (pFilterElem->fp(pFilterElem, pElem, pElem, pFilterInfo[k].info.type)) {
qualified = true;
if (!qualified) {
p[i] = qualified ? 1 : 0;
if (!qualified) {
all = false;
return all;
void doCompactSDataBlock(SSDataBlock* pBlock, int32_t numOfRows, int8_t* p) {
int32_t len = 0;
int32_t start = 0;
......@@ -2357,49 +2306,6 @@ void doCompactSDataBlock(SSDataBlock* pBlock, int32_t numOfRows, int8_t* p) {
void filterRowsInDataBlock(STaskRuntimeEnv* pRuntimeEnv, SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFilterCols,
SSDataBlock* pBlock, bool ascQuery) {
int32_t numOfRows = pBlock->info.rows;
int8_t* p = taosMemoryCalloc(numOfRows, sizeof(int8_t));
bool all = true;
#if 0
if (pRuntimeEnv->pTsBuf != NULL) {
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, 0);
TSKEY* k = (TSKEY*) pColInfoData->pData;
for (int32_t i = 0; i < numOfRows; ++i) {
int32_t offset = ascQuery? i:(numOfRows - i - 1);
int32_t ret = doTSJoinFilter(pRuntimeEnv, k[offset], ascQuery);
if (ret == TS_JOIN_TAG_NOT_EQUALS) {
} else if (ret == TS_JOIN_TS_NOT_EQUALS) {
all = false;
} else {
assert(ret == TS_JOIN_TS_EQUAL);
p[offset] = true;
if (!tsBufNextPos(pRuntimeEnv->pTsBuf)) {
// save the cursor status
pRuntimeEnv->current->cur = tsBufGetCursor(pRuntimeEnv->pTsBuf);
} else {
all = doFilterDataBlock(pFilterInfo, numOfFilterCols, numOfRows, p);
if (!all) {
doCompactSDataBlock(pBlock, numOfRows, p);
void filterColRowsInDataBlock(STaskRuntimeEnv* pRuntimeEnv, SSDataBlock* pBlock, bool ascQuery) {
int32_t numOfRows = pBlock->info.rows;
......@@ -3509,22 +3415,22 @@ static void doOperatorExecProfOnce(SOperatorStackItem* item, SQueryProfEvent* ev
void calculateOperatorProfResults(SQInfo* pQInfo) {
if (pQInfo->summary.queryProfEvents == NULL) {
// qDebug("QInfo:0x%"PRIx64" query prof events array is null", pQInfo->qId);
if (pQInfo->summary.operatorProfResults == NULL) {
// qDebug("QInfo:0x%"PRIx64" operator prof results hash is null", pQInfo->qId);
void calculateOperatorProfResults(void) {
// if (pQInfo->summary.queryProfEvents == NULL) {
// // qDebug("QInfo:0x%"PRIx64" query prof events array is null", pQInfo->qId);
// return;
// }
// if (pQInfo->summary.operatorProfResults == NULL) {
// // qDebug("QInfo:0x%"PRIx64" operator prof results hash is null", pQInfo->qId);
// return;
// }
SArray* opStack = taosArrayInit(32, sizeof(SOperatorStackItem));
if (opStack == NULL) {
#if 0
size_t size = taosArrayGetSize(pQInfo->summary.queryProfEvents);
SHashObj* profResults = pQInfo->summary.operatorProfResults;
......@@ -3547,7 +3453,7 @@ void calculateOperatorProfResults(SQInfo* pQInfo) {
......@@ -4507,13 +4413,6 @@ static void destroySortedMergeOperatorInfo(void* param, int32_t numOfOutput) {
static void destroySlimitOperatorInfo(void* param, int32_t numOfOutput) {
SSLimitOperatorInfo* pInfo = (SSLimitOperatorInfo*)param;
pInfo->pRes = blockDataDestroy(pInfo->pRes);
static void assignExprInfo(SExprInfo* dst, const SExprInfo* src) {
assert(dst != NULL && src != NULL);
......@@ -258,16 +258,14 @@ SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order,
return pOperator;
SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv) {
SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle) {
STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo));
pInfo->dataReader = pTsdbReadHandle;
pInfo->times = 1;
pInfo->reverseTimes = 0;
pInfo->order = pRuntimeEnv->pQueryAttr->order.order;
pInfo->current = 0;
pInfo->prevGroupId = -1;
pRuntimeEnv->enableGroupData = true;
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
pOperator->name = "TableSeqScanOperator";
......@@ -275,8 +273,6 @@ SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntim
pOperator->blockingOptr = false;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->numOfOutput = pRuntimeEnv->pQueryAttr->numOfCols;
pOperator->pRuntimeEnv = pRuntimeEnv;
pOperator->getNextFn = doTableScanImpl;
return pOperator;
......@@ -683,71 +679,70 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator, bool* newgroup) {
return NULL;
int64_t startTs = taosGetTimestampUs();
while (1) {
int64_t startTs = taosGetTimestampUs();
pInfo->req.type = pInfo->type;
strncpy(pInfo->req.tb, tNameGetTableName(&pInfo->name), tListLen(pInfo->req.tb));
if (pInfo->showRewrite) {
char dbName[TSDB_DB_NAME_LEN] = {0};
getDBNameFromCondition(pInfo->pCondition, dbName);
sprintf(pInfo->req.db, "%d.%s", pInfo->accountId, dbName);
pInfo->req.type = pInfo->type;
strncpy(pInfo->req.tb, tNameGetTableName(&pInfo->name), tListLen(pInfo->req.tb));
int32_t contLen = tSerializeSRetrieveTableReq(NULL, 0, &pInfo->req);
char* buf1 = taosMemoryCalloc(1, contLen);
tSerializeSRetrieveTableReq(buf1, contLen, &pInfo->req);
if (pInfo->showRewrite) {
char dbName[TSDB_DB_NAME_LEN] = {0};
getDBNameFromCondition(pInfo->pCondition, dbName);
sprintf(pInfo->req.db, "%d.%s", pInfo->accountId, dbName);
// send the fetch remote task result reques
SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
if (NULL == pMsgSendInfo) {
qError("%s prepare message %d failed", GET_TASKID(pTaskInfo), (int32_t)sizeof(SMsgSendInfo));
return NULL;
int32_t contLen = tSerializeSRetrieveTableReq(NULL, 0, &pInfo->req);
char* buf1 = taosMemoryCalloc(1, contLen);
tSerializeSRetrieveTableReq(buf1, contLen, &pInfo->req);
// send the fetch remote task result reques
SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
if (NULL == pMsgSendInfo) {
qError("%s prepare message %d failed", GET_TASKID(pTaskInfo), (int32_t)sizeof(SMsgSendInfo));
return NULL;
pMsgSendInfo->param = pOperator;
pMsgSendInfo->msgInfo.pData = buf1;
pMsgSendInfo->msgInfo.len = contLen;
pMsgSendInfo->fp = loadSysTableContentCb;
pMsgSendInfo->param = pOperator;
pMsgSendInfo->msgInfo.pData = buf1;
pMsgSendInfo->msgInfo.len = contLen;
pMsgSendInfo->fp = loadSysTableContentCb;
int64_t transporterId = 0;
int32_t code = asyncSendMsgToServer(pInfo->pTransporter, &pInfo->epSet, &transporterId, pMsgSendInfo);
int64_t transporterId = 0;
int32_t code = asyncSendMsgToServer(pInfo->pTransporter, &pInfo->epSet, &transporterId, pMsgSendInfo);
if (pTaskInfo->code) {
return NULL;
if (pTaskInfo->code) {
qDebug("%s load meta data from mnode failed, totalRows:%" PRIu64 ", code:%s", GET_TASKID(pTaskInfo),
pInfo->loadInfo.totalRows, tstrerror(pTaskInfo->code));
return NULL;
SRetrieveMetaTableRsp* pRsp = pInfo->pRsp;
pInfo->req.showId = pRsp->handle;
SRetrieveMetaTableRsp* pRsp = pInfo->pRsp;
pInfo->req.showId = pRsp->handle;
if (pRsp->numOfRows == 0 || pRsp->completed) {
pOperator->status = OP_EXEC_DONE;
if (pRsp->numOfRows == 0 || pRsp->completed) {
pOperator->status = OP_EXEC_DONE;
qDebug("%s load meta data from mnode completed, rowsOfSource:%d, totalRows:%" PRIu64 " ", GET_TASKID(pTaskInfo),
pRsp->numOfRows, pInfo->loadInfo.totalRows);
if (pRsp->numOfRows == 0) {
// qDebug("%s vgId:%d, taskID:0x%"PRIx64" %d of total completed, rowsOfSource:%"PRIu64", totalRows:%"PRIu64"
// try next",
// GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pExchangeInfo->current + 1,
// pDataInfo->totalRows, pExchangeInfo->totalRows);
return NULL;
if (pRsp->numOfRows == 0) {
return NULL;
SRetrieveMetaTableRsp* pTableRsp = pInfo->pRsp;
setSDataBlockFromFetchRsp(pInfo->pRes, &pInfo->loadInfo, pTableRsp->numOfRows, pTableRsp->data, pTableRsp->compLen,
pOperator->numOfOutput, startTs, NULL, pInfo->scanCols);
SRetrieveMetaTableRsp* pTableRsp = pInfo->pRsp;
setSDataBlockFromFetchRsp(pInfo->pRes, &pInfo->loadInfo, pTableRsp->numOfRows, pTableRsp->data,
pTableRsp->compLen, pOperator->numOfOutput, startTs, NULL, pInfo->scanCols);
if (pInfo->pRes->info.rows == 0) {
goto _retry;
// todo log the filter info
if (pInfo->pRes->info.rows > 0) {
return pInfo->pRes;
return pInfo->pRes;
return NULL;
SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, SSDataBlock* pResBlock, const SName* pName,
......@@ -1100,6 +1100,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
SSubmitRsp *rsp = (SSubmitRsp *)msg;
SSubmitRsp *rsp = (SSubmitRsp *)msg;
......@@ -1298,7 +1299,6 @@ int32_t schHandleCallback(void *param, const SDataBuf *pMsg, int32_t msgType, in
SCH_ERR_JRET(schHandleResponseMsg(pJob, pTask, msgType, pMsg->pData, pMsg->len, rspCode));
if (pJob) {
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
想要评论请 注册