提交 0354ac2f 编写于 作者: H Haojun Liao

[td-13039] refactor.

上级 ae6224d9
......@@ -253,11 +253,6 @@ typedef struct STaskIdInfo {
char* str;
} STaskIdInfo;
typedef struct STaskBufInfo {
int32_t bufSize; // total available buffer size in bytes
int32_t remainBuf; // remain buffer size
} STaskBufInfo;
typedef struct SExecTaskInfo {
STaskIdInfo id;
char* content;
......@@ -269,8 +264,7 @@ typedef struct SExecTaskInfo {
uint64_t totalRows; // total number of rows
STableGroupInfo tableqinfoGroupInfo; // this is a group array list, including SArray<STableQueryInfo*> structure
char* sql; // query sql string
jmp_buf env; // when error occurs, abort
STaskBufInfo bufInfo; // available buffer info this task
jmp_buf env; //
struct SOperatorInfo* pRoot;
} SExecTaskInfo;
......@@ -313,11 +307,9 @@ typedef struct STaskRuntimeEnv {
} STaskRuntimeEnv;
enum {
OP_NOT_OPENED = 0x0,
OP_OPENED = 0x1,
OP_IN_EXECUTING = 0x3,
OP_RES_TO_RETURN = 0x5,
OP_EXEC_DONE = 0x9,
OP_IN_EXECUTING = 1,
OP_RES_TO_RETURN = 2,
OP_EXEC_DONE = 3,
};
typedef struct SOperatorInfo {
......@@ -330,14 +322,12 @@ typedef struct SOperatorInfo {
SExprInfo* pExpr;
STaskRuntimeEnv* pRuntimeEnv; // todo remove it
SExecTaskInfo* pTaskInfo;
SOperatorCostInfo cost;
struct SOperatorInfo** pDownstream; // downstram pointer list
int32_t numOfDownstream; // number of downstream. The value is always ONE expect for join operator
__optr_fn_t getNextFn;
__optr_fn_t cleanupFn;
__optr_open_fn_t openFn;
__optr_fn_t nextDataFn;
__optr_close_fn_t closeFn;
__optr_open_fn_t _openFn; // DO NOT invoke this function directly
} SOperatorInfo;
typedef struct {
......@@ -366,9 +356,9 @@ typedef struct SQInfo {
} SQInfo;
enum {
EX_SOURCE_DATA_NOT_READY = 0x1,
EX_SOURCE_DATA_READY = 0x2,
EX_SOURCE_DATA_EXHAUSTED = 0x3,
DATA_NOT_READY = 0x1,
DATA_READY = 0x2,
DATA_EXHAUSTED = 0x3,
};
typedef struct SSourceDataInfo {
......@@ -385,6 +375,12 @@ typedef struct SLoadRemoteDataInfo {
uint64_t totalElapsed; // total elapsed time
} SLoadRemoteDataInfo;
enum {
EX_SOURCE_DATA_NOT_READY = 0x1,
EX_SOURCE_DATA_READY = 0x2,
EX_SOURCE_DATA_EXHAUSTED = 0x3,
};
typedef struct SExchangeInfo {
SArray* pSources;
SArray* pSourceDataInfo;
......@@ -439,13 +435,14 @@ typedef struct SSysTableScanInfo {
};
SRetrieveMetaTableRsp *pRsp;
void *pCur; // cursor
SRetrieveTableReq req;
SEpSet epSet;
int32_t type; // show type
SName name;
tsem_t ready;
SSchema* pSchema;
SNode* pCondition; // db_name filter condition, to discard data that are not in current database
void *pCur; // cursor for iterate the local table meta store.
int32_t type; // show type, TODO remove it
SName name;
SSDataBlock* pRes;
int32_t capacity;
int64_t numOfBlocks; // extract basic running information.
......@@ -630,14 +627,12 @@ SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntim
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SSDataBlock* pResultBlock, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo);
SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SSDataBlock* pResultBlock, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo);
SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, SSDataBlock* pResBlock, const SName* pName, SEpSet epset,
SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, SSDataBlock* pResBlock, const SName* pName, SNode* pCondition, SEpSet epset,
SExecTaskInfo* pTaskInfo);
SOperatorInfo* createLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream);
SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SInterval* pInterval, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream);
SOperatorInfo* createAllTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream,
SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createSWindowOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr,
......@@ -682,6 +677,7 @@ void* doDestroyFilterInfo(SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFil
void setInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order);
void finalizeQueryResult(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SResultRowInfo* pResultRowInfo,
int32_t* rowCellInfoOffset);
void updateOutputBuf(SOptrBasicInfo* pBInfo, int32_t* bufCapacity, int32_t numOfInputRows);
void clearOutputBuf(SOptrBasicInfo* pBInfo, int32_t* bufCapacity);
void copyTsColoum(SSDataBlock* pRes, SqlFunctionCtx* pCtx, int32_t numOfOutput);
......
......@@ -158,7 +158,7 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) {
int64_t st = 0;
st = taosGetTimestampUs();
*pRes = pTaskInfo->pRoot->getNextFn(pTaskInfo->pRoot, &newgroup);
*pRes = pTaskInfo->pRoot->nextDataFn(pTaskInfo->pRoot, &newgroup);
uint64_t el = (taosGetTimestampUs() - st);
pTaskInfo->cost.elapsedTime += el;
......
......@@ -13,8 +13,10 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <filter.h>
#include <function.h>
#include <functionMgt.h>
#include <querynodes.h>
#include <tname.h>
#include "os.h"
......@@ -211,9 +213,6 @@ static void destroyOrderOperatorInfo(void* param, int32_t numOfOutput);
static void destroySWindowOperatorInfo(void* param, int32_t numOfOutput);
static void destroyStateWindowOperatorInfo(void* param, int32_t numOfOutput);
static void destroyAggOperatorInfo(void* param, int32_t numOfOutput);
static void destroyExchangeOperatorInfo(void* param, int32_t numOfOutput);
static void destroyConditionOperatorInfo(void* param, int32_t numOfOutput);
static void destroyOperatorInfo(SOperatorInfo* pOperator);
static void destroySysTableScannerOperatorInfo(void* param, int32_t numOfOutput);
......@@ -224,17 +223,6 @@ static void doSetOperatorCompleted(SOperatorInfo* pOperator) {
}
}
#define OPTR_IS_OPENED(_optr) (((_optr)->status & OP_OPENED) == OP_OPENED)
#define OPTR_SET_OPENED(_optr) ((_optr)->status |= OP_OPENED)
static int32_t operatorDummyOpenFn(void* param) {
SOperatorInfo* pOperator = (SOperatorInfo*) param;
OPTR_SET_OPENED(pOperator);
return TSDB_CODE_SUCCESS;
}
static void operatorDummyCloseFn(void* param, int32_t numOfCols) {}
static int32_t doCopyToSDataBlock(SDiskbasedBuf *pBuf, SGroupResInfo* pGroupResInfo, int32_t orderType, SSDataBlock* pBlock, int32_t rowCapacity);
static int32_t getGroupbyColumnIndex(SGroupbyExpr *pGroupbyExpr, SSDataBlock* pDataBlock);
......@@ -4737,11 +4725,6 @@ static SSDataBlock* doTableScan(void* param, bool *newgroup) {
STableScanInfo *pTableScanInfo = pOperator->info;
SExecTaskInfo *pTaskInfo = pOperator->pTaskInfo;
pTaskInfo->code = pOperator->_openFn(pOperator);
if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
return NULL;
}
// The read handle is not initialized yet, since no qualified tables exists
if (pTableScanInfo->pTsdbReadHandle == NULL) {
return NULL;
......@@ -4859,11 +4842,6 @@ static SSDataBlock* doStreamBlockScan(void* param, bool* newgroup) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStreamBlockScanInfo* pInfo = pOperator->info;
pTaskInfo->code = pOperator->_openFn(pOperator);
if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
return NULL;
}
SDataBlockInfo* pBlockInfo = &pInfo->pRes->info;
pBlockInfo->rows = 0;
......@@ -4977,20 +4955,15 @@ static int32_t doSendFetchDataRequest(SExchangeInfo *pExchangeInfo, SExecTaskInf
static int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadInfo, int32_t numOfRows, char* pData, int32_t compLen,
int32_t numOfOutput, int64_t startTs, uint64_t* total) {
// char* pData = pRsp->data;
blockDataEnsureCapacity(pRes, numOfRows);
for (int32_t i = 0; i < numOfOutput; ++i) {
SColumnInfoData* pColInfoData = taosArrayGet(pRes->pDataBlock, i);
char* tmp = realloc(pColInfoData->pData, pColInfoData->info.bytes * numOfRows);
if (tmp == NULL) {
return TSDB_CODE_QRY_OUT_OF_MEMORY;
for(int32_t j = 0; j < numOfRows; ++j) {
colDataAppend(pColInfoData, j, pData, false);
pData += pColInfoData->info.bytes;
}
size_t len = numOfRows * pColInfoData->info.bytes;
memcpy(tmp, pData, len);
pColInfoData->pData = tmp;
pData += len;
}
pRes->info.rows = numOfRows;
......@@ -5034,12 +5007,12 @@ static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo *pOperator, SEx
for (int32_t i = 0; i < totalSources; ++i) {
SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, i);
if (pDataInfo->status == EX_SOURCE_DATA_EXHAUSTED) {
if (pDataInfo->status == DATA_EXHAUSTED) {
completed += 1;
continue;
}
if (pDataInfo->status != EX_SOURCE_DATA_READY) {
if (pDataInfo->status != DATA_READY) {
continue;
}
......@@ -5052,7 +5025,7 @@ static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo *pOperator, SEx
qDebug("%s vgId:%d, taskID:0x%" PRIx64 " index:%d completed, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 " try next",
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, i + 1, pDataInfo->totalRows,
pExchangeInfo->loadInfo.totalRows);
pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
pDataInfo->status = DATA_EXHAUSTED;
completed += 1;
continue;
}
......@@ -5070,15 +5043,15 @@ static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo *pOperator, SEx
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pRes->info.rows,
pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize, i + 1,
totalSources);
pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
pDataInfo->status = DATA_EXHAUSTED;
} else {
qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " numOfRows:%d, totalRows:%" PRIu64 ", totalBytes:%" PRIu64,
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pRes->info.rows, pLoadInfo->totalRows,
pLoadInfo->totalSize);
}
if (pDataInfo->status != EX_SOURCE_DATA_EXHAUSTED) {
pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
if (pDataInfo->status != DATA_EXHAUSTED) {
pDataInfo->status = DATA_NOT_READY;
code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, i);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
......@@ -5098,9 +5071,13 @@ _error:
return NULL;
}
static int32_t prepareConcurrentlyLoad(SOperatorInfo *pOperator) {
static SSDataBlock* concurrentlyLoadRemoteData(SOperatorInfo *pOperator) {
SExchangeInfo *pExchangeInfo = pOperator->info;
SExecTaskInfo *pTaskInfo = pOperator->pTaskInfo;
if (pOperator->status == OP_RES_TO_RETURN) {
return concurrentlyLoadRemoteDataImpl(pOperator, pExchangeInfo, pTaskInfo);
}
size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources);
int64_t startTs = taosGetTimestampUs();
......@@ -5109,8 +5086,7 @@ static int32_t prepareConcurrentlyLoad(SOperatorInfo *pOperator) {
for(int32_t i = 0; i < totalSources; ++i) {
int32_t code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, i);
if (code != TSDB_CODE_SUCCESS) {
pTaskInfo->code = code;
return code;
return NULL;
}
}
......@@ -5118,9 +5094,9 @@ static int32_t prepareConcurrentlyLoad(SOperatorInfo *pOperator) {
qDebug("%s send all fetch request to %"PRIzu" sources completed, elapsed:%"PRId64, GET_TASKID(pTaskInfo), totalSources, endTs - startTs);
tsem_wait(&pExchangeInfo->ready);
pOperator->cost.openCost = taosGetTimestampUs() - startTs;
return TSDB_CODE_SUCCESS;
pOperator->status = OP_RES_TO_RETURN;
return concurrentlyLoadRemoteDataImpl(pOperator, pExchangeInfo, pTaskInfo);
}
static SSDataBlock* seqLoadRemoteData(SOperatorInfo *pOperator) {
......@@ -5150,7 +5126,7 @@ static SSDataBlock* seqLoadRemoteData(SOperatorInfo *pOperator) {
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pExchangeInfo->current + 1,
pDataInfo->totalRows, pLoadInfo->totalRows);
pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
pDataInfo->status = DATA_EXHAUSTED;
pExchangeInfo->current += 1;
continue;
}
......@@ -5167,7 +5143,7 @@ static SSDataBlock* seqLoadRemoteData(SOperatorInfo *pOperator) {
pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize, pExchangeInfo->current + 1,
totalSources);
pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
pDataInfo->status = DATA_EXHAUSTED;
pExchangeInfo->current += 1;
} else {
qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " numOfRows:%d, totalRows:%" PRIu64 ", totalBytes:%" PRIu64,
......@@ -5178,26 +5154,6 @@ static SSDataBlock* seqLoadRemoteData(SOperatorInfo *pOperator) {
}
}
static int32_t prepareLoadRemoteData(void* param) {
SOperatorInfo *pOperator = (SOperatorInfo*) param;
if (OPTR_IS_OPENED(pOperator)) {
return TSDB_CODE_SUCCESS;
}
SExchangeInfo *pExchangeInfo = pOperator->info;
if (pExchangeInfo->seqLoadData) {
// do nothing for sequentially load data
} else {
int32_t code = prepareConcurrentlyLoad(pOperator);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
OPTR_SET_OPENED(pOperator);
return TSDB_CODE_SUCCESS;
}
static SSDataBlock* doLoadRemoteData(void* param, bool* newgroup) {
SOperatorInfo *pOperator = (SOperatorInfo*) param;
......@@ -5206,6 +5162,7 @@ static SSDataBlock* doLoadRemoteData(void* param, bool* newgroup) {
size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources);
SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
if (pOperator->status == OP_EXEC_DONE) {
qDebug("%s all %"PRIzu" source(s) are exhausted, total rows:%"PRIu64" bytes:%"PRIu64", elapsed:%.2f ms", GET_TASKID(pTaskInfo), totalSources,
pLoadInfo->totalRows, pLoadInfo->totalSize, pLoadInfo->totalElapsed/1000.0);
......@@ -5213,10 +5170,11 @@ static SSDataBlock* doLoadRemoteData(void* param, bool* newgroup) {
}
*newgroup = false;
if (pExchangeInfo->seqLoadData) {
return seqLoadRemoteData(pOperator);
} else {
return concurrentlyLoadRemoteDataImpl(pOperator, pExchangeInfo, pTaskInfo);
return concurrentlyLoadRemoteData(pOperator);
}
#if 0
......@@ -5229,35 +5187,16 @@ static SSDataBlock* doLoadRemoteData(void* param, bool* newgroup) {
#endif
}
static int32_t initDataSource(int32_t numOfSources, SExchangeInfo* pInfo) {
pInfo->pSourceDataInfo = taosArrayInit(numOfSources, sizeof(SSourceDataInfo));
if (pInfo->pSourceDataInfo == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
for(int32_t i = 0; i < numOfSources; ++i) {
SSourceDataInfo dataInfo = {0};
dataInfo.status = EX_SOURCE_DATA_NOT_READY;
dataInfo.pEx = pInfo;
dataInfo.index = i;
void* ret = taosArrayPush(pInfo->pSourceDataInfo, &dataInfo);
if (ret == NULL) {
taosArrayDestroy(pInfo->pSourceDataInfo);
return TSDB_CODE_OUT_OF_MEMORY;
}
}
return TSDB_CODE_SUCCESS;
}
// TODO handle the error
SOperatorInfo* createExchangeOperatorInfo(const SNodeList* pSources, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo) {
SExchangeInfo* pInfo = calloc(1, sizeof(SExchangeInfo));
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
goto _error;
tfree(pInfo);
tfree(pOperator);
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
return NULL;
}
size_t numOfSources = LIST_LENGTH(pSources);
......@@ -5284,28 +5223,29 @@ SOperatorInfo* createExchangeOperatorInfo(const SNodeList* pSources, SSDataBlock
return NULL;
}
int32_t code = initDataSource(numOfSources, pInfo);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
for(int32_t i = 0; i < numOfSources; ++i) {
SSourceDataInfo dataInfo = {0};
dataInfo.status = DATA_NOT_READY;
dataInfo.pEx = pInfo;
dataInfo.index = i;
taosArrayPush(pInfo->pSourceDataInfo, &dataInfo);
}
size_t size = pBlock->info.numOfCols;
pInfo->pResult = pBlock;
pInfo->seqLoadData = true;
pInfo->seqLoadData = true; // sequentially load data from the source node
tsem_init(&pInfo->ready, 0, 0);
pOperator->name = "ExchangeOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE;
pOperator->blockingOptr = false;
pOperator->status = OP_NOT_OPENED;
pOperator->status = OP_IN_EXECUTING;
pOperator->info = pInfo;
pOperator->numOfOutput = size;
pOperator->nextDataFn = doLoadRemoteData;
pOperator->pTaskInfo = pTaskInfo;
pOperator->_openFn = prepareLoadRemoteData; // assign a dummy function.
pOperator->getNextFn = doLoadRemoteData;
pOperator->closeFn = destroyExchangeOperatorInfo;
#if 1
{ // todo refactor
......@@ -5331,16 +5271,6 @@ SOperatorInfo* createExchangeOperatorInfo(const SNodeList* pSources, SSDataBlock
#endif
return pOperator;
_error:
if (pInfo != NULL) {
destroyExchangeOperatorInfo(pInfo, 0);
}
tfree(pInfo);
tfree(pOperator);
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
return NULL;
}
SSDataBlock* createResultDataBlock(const SArray* pExprInfo) {
......@@ -5391,12 +5321,10 @@ SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order,
pOperator->name = "TableScanOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;
pOperator->blockingOptr = false;
pOperator->status = OP_NOT_OPENED;
pOperator->status = OP_IN_EXECUTING;
pOperator->info = pInfo;
pOperator->numOfOutput = numOfOutput;
pOperator->_openFn = operatorDummyOpenFn;
pOperator->getNextFn = doTableScan;
pOperator->closeFn = operatorDummyCloseFn;
pOperator->nextDataFn = doTableScan;
pOperator->pTaskInfo = pTaskInfo;
return pOperator;
......@@ -5417,11 +5345,11 @@ SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntim
pOperator->name = "TableSeqScanOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN;
pOperator->blockingOptr = false;
pOperator->status = OP_NOT_OPENED;
pOperator->status = OP_IN_EXECUTING;
pOperator->info = pInfo;
pOperator->numOfOutput = pRuntimeEnv->pQueryAttr->numOfCols;
pOperator->pRuntimeEnv = pRuntimeEnv;
pOperator->getNextFn = doTableScanImpl;
pOperator->nextDataFn = doTableScanImpl;
return pOperator;
}
......@@ -5442,10 +5370,10 @@ SOperatorInfo* createTableBlockInfoScanOperator(void* pTsdbReadHandle, STaskRunt
pOperator->name = "TableBlockInfoScanOperator";
// pOperator->operatorType = OP_TableBlockInfoScan;
pOperator->blockingOptr = false;
pOperator->status = OP_NOT_OPENED;
pOperator->status = OP_IN_EXECUTING;
pOperator->info = pInfo;
// pOperator->numOfOutput = pRuntimeEnv->pQueryAttr->numOfCols;
pOperator->getNextFn = doBlockInfoScan;
pOperator->nextDataFn = doBlockInfoScan;
return pOperator;
}
......@@ -5475,13 +5403,10 @@ SOperatorInfo* createStreamScanOperatorInfo(void *streamReadHandle, SSDataBlock*
pOperator->name = "StreamBlockScanOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN;
pOperator->blockingOptr = false;
pOperator->status = OP_NOT_OPENED;
pOperator->status = OP_IN_EXECUTING;
pOperator->info = pInfo;
pOperator->numOfOutput = pResBlock->info.numOfCols;
pOperator->_openFn = operatorDummyOpenFn;
pOperator->getNextFn = doStreamBlockScan;
pOperator->closeFn = operatorDummyCloseFn;
pOperator->nextDataFn = doStreamBlockScan;
pOperator->pTaskInfo = pTaskInfo;
return pOperator;
}
......@@ -5571,6 +5496,41 @@ static SSDataBlock* doSysTableScan(void* param, bool* newgroup) {
setSDataBlockFromFetchRsp(pInfo->pRes, &pInfo->loadInfo, pTableRsp->numOfRows,
pTableRsp->data, pTableRsp->compLen, pOperator->numOfOutput, startTs, NULL);
// do filter the qualified results
{
SFilterInfo *filter = NULL;
code = filterInitFromNode(pInfo->pCondition, &filter, 0);
SFilterColumnParam param1 = {.numOfCols= pInfo->pRes->info.numOfCols, .pDataBlock = pInfo->pRes->pDataBlock};
code = filterSetDataFromSlotId(filter, &param1);
int8_t *rowRes = NULL;
bool keep = filterExecute(filter, pInfo->pRes, &rowRes, NULL, param1.numOfCols);
printf("%d, %d\n", rowRes[0], rowRes[1]);
SSDataBlock* px = createOneDataBlock(pInfo->pRes);
blockDataEnsureCapacity(px, pInfo->pRes->info.rows);
int32_t numOfRow = 0;
for(int32_t i = 0; i < pInfo->pRes->info.numOfCols; ++i) {
SColumnInfoData* pDest = taosArrayGet(px->pDataBlock, i);
SColumnInfoData* pSrc = taosArrayGet(pInfo->pRes->pDataBlock, i);
numOfRow = 0;
for(int32_t j = 0; j < pInfo->pRes->info.rows; ++j) {
if (rowRes[j] == 0) {
continue;
}
colDataAppend(pDest, numOfRow, colDataGetData(pSrc, j), false);
numOfRow += 1;
}
}
px->info.rows = numOfRow;
pInfo->pRes = px;
}
return pInfo->pRes;
}
......@@ -5578,7 +5538,7 @@ static SSDataBlock* doSysTableScan(void* param, bool* newgroup) {
}
SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, SSDataBlock* pResBlock, const SName* pName,
SEpSet epset, SExecTaskInfo* pTaskInfo) {
SNode* pCondition, SEpSet epset, SExecTaskInfo* pTaskInfo) {
SSysTableScanInfo* pInfo = calloc(1, sizeof(SSysTableScanInfo));
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
......@@ -5588,9 +5548,11 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, SSDataB
return NULL;
}
pInfo->pRes = pResBlock;
pInfo->capacity = 4096;
pInfo->pRes = pResBlock;
pInfo->capacity = 4096;
pInfo->pCondition = pCondition;
// TODO remove it
int32_t tableType = 0;
const char* name = tNameGetTableName(pName);
if (strncasecmp(name, TSDB_INS_TABLE_USER_DATABASES, tListLen(pName->tname)) == 0) {
......@@ -5640,7 +5602,7 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, SSDataB
pOperator->status = OP_IN_EXECUTING;
pOperator->info = pInfo;
pOperator->numOfOutput = pResBlock->info.numOfCols;
pOperator->getNextFn = doSysTableScan;
pOperator->nextDataFn = doSysTableScan;
pOperator->closeFn = destroySysTableScannerOperatorInfo;
pOperator->pTaskInfo = pTaskInfo;
......@@ -5833,7 +5795,7 @@ SSDataBlock* loadNextDataBlock(void* param) {
SOperatorInfo* pOperator = (SOperatorInfo*) param;
bool newgroup = false;
return pOperator->getNextFn(pOperator, &newgroup);
return pOperator->nextDataFn(pOperator, &newgroup);
}
static bool needToMerge(SSDataBlock* pBlock, SArray* groupInfo, char **buf, int32_t rowIndex) {
......@@ -6147,13 +6109,13 @@ SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t
pOperator->name = "SortedMerge";
// pOperator->operatorType = OP_SortedMerge;
pOperator->blockingOptr = true;
pOperator->status = OP_NOT_OPENED;
pOperator->status = OP_IN_EXECUTING;
pOperator->info = pInfo;
pOperator->numOfOutput = numOfOutput;
pOperator->pExpr = exprArrayDup(pExprInfo);
pOperator->pTaskInfo = pTaskInfo;
pOperator->getNextFn = doSortedMerge;
pOperator->nextDataFn = doSortedMerge;
pOperator->closeFn = destroySortedMergeOperatorInfo;
code = appendDownstream(pOperator, downstream, numOfDownstream);
......@@ -6245,11 +6207,11 @@ SOperatorInfo *createOrderOperatorInfo(SOperatorInfo* downstream, SArray* pExprI
pOperator->name = "Order";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SORT;
pOperator->blockingOptr = true;
pOperator->status = OP_NOT_OPENED;
pOperator->status = OP_IN_EXECUTING;
pOperator->info = pInfo;
pOperator->pTaskInfo = pTaskInfo;
pOperator->getNextFn = doSort;
pOperator->nextDataFn = doSort;
pOperator->closeFn = destroyOrderOperatorInfo;
int32_t code = appendDownstream(pOperator, &downstream, 1);
......@@ -6275,7 +6237,7 @@ static SSDataBlock* doAggregate(void* param, bool* newgroup) {
while(1) {
publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = downstream->getNextFn(downstream, newgroup);
SSDataBlock* pBlock = downstream->nextDataFn(downstream, newgroup);
publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
......@@ -6325,7 +6287,7 @@ static SSDataBlock* doMultiTableAggregate(void* param, bool* newgroup) {
while(1) {
publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = downstream->getNextFn(downstream, newgroup);
SSDataBlock* pBlock = downstream->nextDataFn(downstream, newgroup);
publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
......@@ -6411,7 +6373,7 @@ static SSDataBlock* doProjectOperation(void* param, bool* newgroup) {
// The downstream exec may change the value of the newgroup, so use a local variable instead.
publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = pOperator->pDownstream[0]->getNextFn(pOperator->pDownstream[0], newgroup);
SSDataBlock* pBlock = pOperator->pDownstream[0]->nextDataFn(pOperator->pDownstream[0], newgroup);
publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
......@@ -6469,7 +6431,7 @@ static SSDataBlock* doLimit(void* param, bool* newgroup) {
SSDataBlock* pBlock = NULL;
while (1) {
publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC);
pBlock = pOperator->pDownstream[0]->getNextFn(pOperator->pDownstream[0], newgroup);
pBlock = pOperator->pDownstream[0]->nextDataFn(pOperator->pDownstream[0], newgroup);
publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
......@@ -6520,7 +6482,7 @@ static SSDataBlock* doFilter(void* param, bool* newgroup) {
while (1) {
publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock *pBlock = pOperator->pDownstream[0]->getNextFn(pOperator->pDownstream[0], newgroup);
SSDataBlock *pBlock = pOperator->pDownstream[0]->nextDataFn(pOperator->pDownstream[0], newgroup);
publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
......@@ -6563,7 +6525,7 @@ static SSDataBlock* doIntervalAgg(void* param, bool* newgroup) {
while(1) {
publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = downstream->getNextFn(downstream, newgroup);
SSDataBlock* pBlock = downstream->nextDataFn(downstream, newgroup);
publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
......@@ -6623,7 +6585,7 @@ static SSDataBlock* doAllIntervalAgg(void* param, bool* newgroup) {
while(1) {
publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = downstream->getNextFn(downstream, newgroup);
SSDataBlock* pBlock = downstream->nextDataFn(downstream, newgroup);
publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
......@@ -6686,7 +6648,7 @@ static SSDataBlock* doSTableIntervalAgg(void* param, bool* newgroup) {
while(1) {
publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = downstream->getNextFn(downstream, newgroup);
SSDataBlock* pBlock = downstream->nextDataFn(downstream, newgroup);
publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
......@@ -6741,7 +6703,7 @@ static SSDataBlock* doAllSTableIntervalAgg(void* param, bool* newgroup) {
while(1) {
publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = downstream->getNextFn(downstream, newgroup);
SSDataBlock* pBlock = downstream->nextDataFn(downstream, newgroup);
publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
......@@ -6876,7 +6838,7 @@ static SSDataBlock* doStateWindowAgg(void *param, bool* newgroup) {
SOperatorInfo* downstream = pOperator->pDownstream[0];
while (1) {
publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = downstream->getNextFn(downstream, newgroup);
SSDataBlock* pBlock = downstream->nextDataFn(downstream, newgroup);
publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
......@@ -6938,7 +6900,7 @@ static SSDataBlock* doSessionWindowAgg(void* param, bool* newgroup) {
while(1) {
publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = downstream->getNextFn(downstream, newgroup);
SSDataBlock* pBlock = downstream->nextDataFn(downstream, newgroup);
publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
break;
......@@ -6991,7 +6953,7 @@ static SSDataBlock* hashGroupbyAggregate(void* param, bool* newgroup) {
while(1) {
publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = downstream->getNextFn(downstream, newgroup);
SSDataBlock* pBlock = downstream->nextDataFn(downstream, newgroup);
publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
break;
......@@ -7076,7 +7038,7 @@ static SSDataBlock* doFill(void* param, bool* newgroup) {
while(1) {
publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = pOperator->pDownstream[0]->getNextFn(pOperator->pDownstream[0], newgroup);
SSDataBlock* pBlock = pOperator->pDownstream[0]->nextDataFn(pOperator->pDownstream[0], newgroup);
publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC);
if (*newgroup) {
......@@ -7233,13 +7195,13 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SArray* pE
pOperator->name = "TableAggregate";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_AGG;
pOperator->blockingOptr = true;
pOperator->status = OP_NOT_OPENED;
pOperator->status = OP_IN_EXECUTING;
pOperator->info = pInfo;
pOperator->pExpr = exprArrayDup(pExprInfo);
pOperator->numOfOutput = taosArrayGetSize(pExprInfo);
pOperator->pTaskInfo = pTaskInfo;
pOperator->getNextFn = doAggregate;
pOperator->nextDataFn = doAggregate;
pOperator->closeFn = destroyAggOperatorInfo;
int32_t code = appendDownstream(pOperator, &downstream, 1);
......@@ -7288,17 +7250,17 @@ static void destroyGroupbyOperatorInfo(void* param, int32_t numOfOutput) {
tfree(pInfo->prevData);
}
void destroyProjectOperatorInfo(void* param, int32_t numOfOutput) {
static void destroyProjectOperatorInfo(void* param, int32_t numOfOutput) {
SProjectOperatorInfo* pInfo = (SProjectOperatorInfo*) param;
doDestroyBasicInfo(&pInfo->binfo, numOfOutput);
}
void destroyTagScanOperatorInfo(void* param, int32_t numOfOutput) {
static void destroyTagScanOperatorInfo(void* param, int32_t numOfOutput) {
STagScanInfo* pInfo = (STagScanInfo*) param;
pInfo->pRes = blockDataDestroy(pInfo->pRes);
}
void destroyOrderOperatorInfo(void* param, int32_t numOfOutput) {
static void destroyOrderOperatorInfo(void* param, int32_t numOfOutput) {
SOrderOperatorInfo* pInfo = (SOrderOperatorInfo*) param;
pInfo->pDataBlock = blockDataDestroy(pInfo->pDataBlock);
......@@ -7328,17 +7290,6 @@ static void destroySysTableScannerOperatorInfo(void* param, int32_t numOfOutput)
}
}
void destroyExchangeOperatorInfo(void* param, int32_t numOfOutput) {
SExchangeInfo* pExInfo = (SExchangeInfo*) param;
taosArrayDestroy(pExInfo->pSources);
taosArrayDestroy(pExInfo->pSourceDataInfo);
if (pExInfo->pResult != NULL) {
blockDataDestroy(pExInfo->pResult);
}
tsem_destroy(&pExInfo->ready);
}
SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo) {
SAggOperatorInfo* pInfo = calloc(1, sizeof(SAggOperatorInfo));
......@@ -7353,13 +7304,13 @@ SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SArray
pOperator->name = "MultiTableAggregate";
// pOperator->operatorType = OP_MultiTableAggregate;
pOperator->blockingOptr = true;
pOperator->status = OP_NOT_OPENED;
pOperator->status = OP_IN_EXECUTING;
pOperator->info = pInfo;
pOperator->pExpr = exprArrayDup(pExprInfo);
pOperator->numOfOutput = numOfOutput;
pOperator->pTaskInfo = pTaskInfo;
pOperator->getNextFn = doMultiTableAggregate;
pOperator->nextDataFn = doMultiTableAggregate;
pOperator->closeFn = destroyAggOperatorInfo;
int32_t code = appendDownstream(pOperator, &downstream, 1);
......@@ -7379,12 +7330,12 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SArray* pExp
pOperator->name = "ProjectOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PROJECT;
pOperator->blockingOptr = false;
pOperator->status = OP_NOT_OPENED;
pOperator->status = OP_IN_EXECUTING;
pOperator->info = pInfo;
pOperator->pExpr = exprArrayDup(pExprInfo);
pOperator->numOfOutput = taosArrayGetSize(pExprInfo);
pOperator->getNextFn = doProjectOperation;
pOperator->nextDataFn = doProjectOperation;
pOperator->pTaskInfo = pTaskInfo;
pOperator->closeFn = destroyProjectOperatorInfo;
int32_t code = appendDownstream(pOperator, &downstream, 1);
......@@ -7434,11 +7385,11 @@ SOperatorInfo* createLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorIn
pOperator->name = "LimitOperator";
// pOperator->operatorType = OP_Limit;
pOperator->blockingOptr = false;
pOperator->status = OP_NOT_OPENED;
pOperator->getNextFn = doLimit;
pOperator->status = OP_IN_EXECUTING;
pOperator->nextDataFn = doLimit;
pOperator->info = pInfo;
pOperator->pRuntimeEnv = pRuntimeEnv;
int32_t code = appendDownstream(pOperator, &downstream, 1);
int32_t code = appendDownstream(pOperator, &downstream, 1);
return pOperator;
}
......@@ -7466,13 +7417,13 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SArray* pEx
pOperator->name = "TimeIntervalAggOperator";
// pOperator->operatorType = OP_TimeWindow;
pOperator->blockingOptr = true;
pOperator->status = OP_NOT_OPENED;
pOperator->status = OP_IN_EXECUTING;
pOperator->pExpr = exprArrayDup(pExprInfo);
pOperator->pTaskInfo = pTaskInfo;
pOperator->numOfOutput = taosArrayGetSize(pExprInfo);
pOperator->info = pInfo;
pOperator->getNextFn = doIntervalAgg;
pOperator->nextDataFn = doIntervalAgg;
pOperator->closeFn = destroyBasicOperatorInfo;
code = appendDownstream(pOperator, &downstream, 1);
......@@ -7491,12 +7442,12 @@ SOperatorInfo* createAllTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, S
pOperator->name = "AllTimeIntervalAggOperator";
// pOperator->operatorType = OP_AllTimeWindow;
pOperator->blockingOptr = true;
pOperator->status = OP_NOT_OPENED;
pOperator->status = OP_IN_EXECUTING;
pOperator->pExpr = pExpr;
pOperator->numOfOutput = numOfOutput;
pOperator->info = pInfo;
pOperator->pRuntimeEnv = pRuntimeEnv;
pOperator->getNextFn = doAllIntervalAgg;
pOperator->nextDataFn = doAllIntervalAgg;
pOperator->closeFn = destroyBasicOperatorInfo;
int32_t code = appendDownstream(pOperator, &downstream, 1);
......@@ -7515,12 +7466,12 @@ SOperatorInfo* createStatewindowOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOper
pOperator->name = "StateWindowOperator";
// pOperator->operatorType = OP_StateWindow;
pOperator->blockingOptr = true;
pOperator->status = OP_NOT_OPENED;
pOperator->status = OP_IN_EXECUTING;
pOperator->pExpr = pExpr;
pOperator->numOfOutput = numOfOutput;
pOperator->info = pInfo;
pOperator->pRuntimeEnv = pRuntimeEnv;
pOperator->getNextFn = doStateWindowAgg;
pOperator->nextDataFn = doStateWindowAgg;
pOperator->closeFn = destroyStateWindowOperatorInfo;
int32_t code = appendDownstream(pOperator, &downstream, 1);
......@@ -7540,12 +7491,12 @@ SOperatorInfo* createSWindowOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperator
pOperator->name = "SessionWindowAggOperator";
// pOperator->operatorType = OP_SessionWindow;
pOperator->blockingOptr = true;
pOperator->status = OP_NOT_OPENED;
pOperator->status = OP_IN_EXECUTING;
pOperator->pExpr = pExpr;
pOperator->numOfOutput = numOfOutput;
pOperator->info = pInfo;
pOperator->pRuntimeEnv = pRuntimeEnv;
pOperator->getNextFn = doSessionWindowAgg;
pOperator->nextDataFn = doSessionWindowAgg;
pOperator->closeFn = destroySWindowOperatorInfo;
int32_t code = appendDownstream(pOperator, &downstream, 1);
......@@ -7563,13 +7514,13 @@ SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntim
pOperator->name = "MultiTableTimeIntervalOperator";
// pOperator->operatorType = OP_MultiTableTimeInterval;
pOperator->blockingOptr = true;
pOperator->status = OP_NOT_OPENED;
pOperator->status = OP_IN_EXECUTING;
pOperator->pExpr = pExpr;
pOperator->numOfOutput = numOfOutput;
pOperator->info = pInfo;
pOperator->pRuntimeEnv = pRuntimeEnv;
pOperator->getNextFn = doSTableIntervalAgg;
pOperator->nextDataFn = doSTableIntervalAgg;
pOperator->closeFn = destroyBasicOperatorInfo;
int32_t code = appendDownstream(pOperator, &downstream, 1);
......@@ -7587,13 +7538,13 @@ SOperatorInfo* createAllMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRun
pOperator->name = "AllMultiTableTimeIntervalOperator";
// pOperator->operatorType = OP_AllMultiTableTimeInterval;
pOperator->blockingOptr = true;
pOperator->status = OP_NOT_OPENED;
pOperator->status = OP_IN_EXECUTING;
pOperator->pExpr = pExpr;
pOperator->numOfOutput = numOfOutput;
pOperator->info = pInfo;
pOperator->pRuntimeEnv = pRuntimeEnv;
pOperator->getNextFn = doAllSTableIntervalAgg;
pOperator->nextDataFn = doAllSTableIntervalAgg;
pOperator->closeFn = destroyBasicOperatorInfo;
int32_t code = appendDownstream(pOperator, &downstream, 1);
......@@ -7619,13 +7570,13 @@ SOperatorInfo* createGroupbyOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperator
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "GroupbyAggOperator";
pOperator->blockingOptr = true;
pOperator->status = OP_NOT_OPENED;
pOperator->status = OP_IN_EXECUTING;
// pOperator->operatorType = OP_Groupby;
pOperator->pExpr = pExpr;
pOperator->numOfOutput = numOfOutput;
pOperator->info = pInfo;
pOperator->pRuntimeEnv = pRuntimeEnv;
pOperator->getNextFn = hashGroupbyAggregate;
pOperator->nextDataFn = hashGroupbyAggregate;
pOperator->closeFn = destroyGroupbyOperatorInfo;
int32_t code = appendDownstream(pOperator, &downstream, 1);
......@@ -7658,13 +7609,13 @@ SOperatorInfo* createFillOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInf
pOperator->name = "FillOperator";
pOperator->blockingOptr = false;
pOperator->status = OP_NOT_OPENED;
pOperator->status = OP_IN_EXECUTING;
// pOperator->operatorType = OP_Fill;
pOperator->pExpr = pExpr;
pOperator->numOfOutput = numOfOutput;
pOperator->info = pInfo;
pOperator->pRuntimeEnv = pRuntimeEnv;
pOperator->getNextFn = doFill;
pOperator->nextDataFn = doFill;
pOperator->closeFn = destroySFillOperatorInfo;
int32_t code = appendDownstream(pOperator, &downstream, 1);
......@@ -7709,12 +7660,13 @@ SOperatorInfo* createSLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorI
pOperator->name = "SLimitOperator";
// pOperator->operatorType = OP_SLimit;
pOperator->blockingOptr = false;
pOperator->status = OP_NOT_OPENED;
pOperator->status = OP_IN_EXECUTING;
// pOperator->exec = doSLimit;
pOperator->info = pInfo;
pOperator->pRuntimeEnv = pRuntimeEnv;
pOperator->closeFn = destroySlimitOperatorInfo;
int32_t code = appendDownstream(pOperator, &downstream, 1);
int32_t code = appendDownstream(pOperator, &downstream, 1);
return pOperator;
}
......@@ -7848,7 +7800,6 @@ static SSDataBlock* doTagScan(void* param, bool* newgroup) {
return (pRes->info.rows == 0)? NULL:pInfo->pRes;
#endif
return 0;
}
SOperatorInfo* createTagScanOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SExprInfo* pExpr, int32_t numOfOutput) {
......@@ -7865,9 +7816,9 @@ SOperatorInfo* createTagScanOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SExprInfo
pOperator->name = "SeqTableTagScan";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN;
pOperator->blockingOptr = false;
pOperator->status = OP_NOT_OPENED;
pOperator->status = OP_IN_EXECUTING;
pOperator->info = pInfo;
pOperator->getNextFn = doTagScan;
pOperator->nextDataFn = doTagScan;
pOperator->pExpr = pExpr;
pOperator->numOfOutput = numOfOutput;
pOperator->pRuntimeEnv = pRuntimeEnv;
......@@ -7937,7 +7888,7 @@ static SSDataBlock* hashDistinct(void* param, bool* newgroup) {
while(1) {
publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC);
pBlock = pOperator->pDownstream[0]->getNextFn(pOperator->pDownstream[0], newgroup);
pBlock = pOperator->pDownstream[0]->nextDataFn(pOperator->pDownstream[0], newgroup);
publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
......@@ -8003,13 +7954,13 @@ SOperatorInfo* createDistinctOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperato
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "DistinctOperator";
pOperator->blockingOptr = false;
pOperator->status = OP_NOT_OPENED;
pOperator->status = OP_IN_EXECUTING;
// pOperator->operatorType = OP_Distinct;
pOperator->pExpr = pExpr;
pOperator->numOfOutput = numOfOutput;
pOperator->info = pInfo;
pOperator->pRuntimeEnv = pRuntimeEnv;
pOperator->getNextFn = hashDistinct;
pOperator->nextDataFn = hashDistinct;
pOperator->pExpr = pExpr;
pOperator->closeFn = destroyDistinctOperatorInfo;
......@@ -8174,9 +8125,7 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTa
size_t numOfCols = LIST_LENGTH(pScanPhyNode->pScanCols);
tsdbReaderT pDataReader = doCreateDataReader((STableScanPhysiNode*)pPhyNode, pHandle, (uint64_t)queryId, taskId);
if (NULL == pDataReader) {
return NULL;
}
int32_t code = doCreateTableGroup(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, pTableGroupInfo, queryId, taskId);
return createTableScanOperatorInfo(pDataReader, pScanPhyNode->order, numOfCols, pScanPhyNode->count,
pScanPhyNode->reverse, pTaskInfo);
......@@ -8202,7 +8151,7 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTa
SSystemTableScanPhysiNode * pSysScanPhyNode = (SSystemTableScanPhysiNode*)pPhyNode;
SSDataBlock* pResBlock = createOutputBuf_rv1(pSysScanPhyNode->scan.node.pOutputDataBlockDesc);
SOperatorInfo* pOperator = createSysTableScanOperatorInfo(NULL, pResBlock, &pSysScanPhyNode->scan.tableName, pSysScanPhyNode->mgmtEpSet, pTaskInfo);
SOperatorInfo* pOperator = createSysTableScanOperatorInfo(NULL, pResBlock, &pSysScanPhyNode->scan.tableName, pSysScanPhyNode->scan.node.pConditions, pSysScanPhyNode->mgmtEpSet, pTaskInfo);
return pOperator;
} else {
ASSERT(0);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册