提交 3cd5a350 编写于 作者: L Liu Jicong

refactor(stream): internal refactor

上级 866e4c4b
......@@ -55,7 +55,8 @@ enum {
enum {
STREAM_INPUT__DATA_SUBMIT = 1,
STREAM_INPUT__DATA_BLOCK,
STREAM_INPUT__DATA_SCAN,
STREAM_INPUT__TABLE_SCAN,
STREAM_INPUT__TQ_SCAN,
STREAM_INPUT__DATA_RETRIEVE,
STREAM_INPUT__TRIGGER,
STREAM_INPUT__CHECKPOINT,
......@@ -124,7 +125,8 @@ enum {
};
typedef struct {
int8_t fetchType;
int8_t fetchType;
STqOffsetVal offset;
union {
SSDataBlock data;
void* meta;
......
......@@ -231,7 +231,7 @@ SSDataBlock* createDataBlock();
int32_t blockDataAppendColInfo(SSDataBlock* pBlock, SColumnInfoData* pColInfoData);
SColumnInfoData createColumnInfoData(int16_t type, int32_t bytes, int16_t colId);
SColumnInfoData* bdGetColumnInfoData(SSDataBlock* pBlock, int32_t index);
SColumnInfoData* bdGetColumnInfoData(const SSDataBlock* pBlock, int32_t index);
void blockEncode(const SSDataBlock* pBlock, char* data, int32_t* dataLen, int32_t numOfCols, int8_t needCompress);
const char* blockDecode(SSDataBlock* pBlock, int32_t numOfCols, int32_t numOfRows, const char* pData);
......
......@@ -174,7 +174,13 @@ int32_t qDeserializeTaskStatus(qTaskInfo_t tinfo, const char* pInput, int32_t le
*/
int32_t qGetStreamScanStatus(qTaskInfo_t tinfo, uint64_t* uid, int64_t* ts);
int32_t qStreamPrepareScan(qTaskInfo_t tinfo, uint64_t uid, int64_t ts);
int32_t qStreamPrepareTsdbScan(qTaskInfo_t tinfo, uint64_t uid, int64_t ts);
int32_t qStreamPrepareScan1(qTaskInfo_t tinfo, const STqOffsetVal* pOffset);
int32_t qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset);
void* qStreamExtractMetaMsg(qTaskInfo_t tinfo);
void* qExtractReaderFromStreamScanner(void* scanner);
int32_t qExtractStreamScanner(qTaskInfo_t tinfo, void** scanner);
......
......@@ -194,6 +194,7 @@ int32_t walRestoreFromSnapshot(SWal *, int64_t ver);
SWalReader *walOpenReader(SWal *, SWalFilterCond *pCond);
void walCloseReader(SWalReader *pRead);
int32_t walReadVer(SWalReader *pRead, int64_t ver);
int32_t walReadSeekVer(SWalReader *pRead, int64_t ver);
int32_t walNextValidMsg(SWalReader *pRead);
// only for tq usage
......
......@@ -1356,7 +1356,7 @@ SColumnInfoData createColumnInfoData(int16_t type, int32_t bytes, int16_t colId)
return col;
}
SColumnInfoData* bdGetColumnInfoData(SSDataBlock* pBlock, int32_t index) {
SColumnInfoData* bdGetColumnInfoData(const SSDataBlock* pBlock, int32_t index) {
ASSERT(pBlock != NULL);
if (index >= taosArrayGetSize(pBlock->pDataBlock)) {
return NULL;
......
......@@ -546,7 +546,11 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) {
char cgroup[TSDB_CGROUP_LEN];
mndSplitSubscribeKey(pRebInfo->key, topic, cgroup, true);
SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic);
ASSERT(pTopic);
/*ASSERT(pTopic);*/
if (pTopic == NULL) {
mError("rebalance %s failed since topic %s was dropped, abort", pRebInfo->key, topic);
continue;
}
taosRLockLatch(&pTopic->lock);
rebOutput.pSub = mndCreateSub(pMnode, pTopic, pRebInfo->key);
......
......@@ -173,6 +173,9 @@ int32_t tqReaderSetTbUidList(STqReader *pReader, const SArray *tbUidList);
int32_t tqReaderAddTbUidList(STqReader *pReader, const SArray *tbUidList);
int32_t tqReaderRemoveTbUidList(STqReader *pReader, const SArray *tbUidList);
int32_t tqSeekVer(STqReader *pReader, int64_t ver);
int32_t tqNextBlock(STqReader *pReader, SFetchRet *ret);
int32_t tqReaderSetDataMsg(STqReader *pReader, SSubmitReq *pMsg, int64_t ver);
bool tqNextDataBlock(STqReader *pReader);
bool tqNextDataBlockFilterOut(STqReader *pReader, SHashObj *filterOutUids);
......
......@@ -129,6 +129,7 @@ typedef struct {
static STqMgmt tqMgmt = {0};
// tqRead
int64_t tqScanLog(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, STqOffsetVal* offset);
int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHead** pHeadWithCkSum);
// tqExec
......
......@@ -307,7 +307,22 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
SMqDataRsp dataRsp = {0};
tqInitDataRsp(&dataRsp, pReq, pHandle->execHandle.subType);
if (fetchOffsetNew.type == TMQ_OFFSET__LOG) {
if (!pHandle->fetchMeta && fetchOffsetNew.type == TMQ_OFFSET__LOG) {
if (tqScanLog(pTq, &pHandle->execHandle, &dataRsp, &fetchOffsetNew) < 0) {
ASSERT(0);
code = -1;
goto OVER;
}
if (dataRsp.blockNum == 0) {
// add to async task
}
if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) {
code = -1;
}
goto OVER;
}
if (pHandle->fetchMeta && fetchOffsetNew.type == TMQ_OFFSET__LOG) {
int64_t fetchVer = fetchOffsetNew.version + 1;
SWalCkHead* pCkHead = taosMemoryMalloc(sizeof(SWalCkHead) + 2048);
if (pCkHead == NULL) {
......
......@@ -46,7 +46,7 @@ static int32_t tqAddBlockSchemaToRsp(const STqExecHandle* pExec, int32_t workerI
return 0;
}
static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, SMqDataRsp* pRsp, int32_t workerId) {
static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, SMqDataRsp* pRsp) {
SMetaReader mr = {0};
metaReaderInit(&mr, pTq->pVnode->pMeta, 0);
if (metaGetTableEntryByUid(&mr, uid) < 0) {
......@@ -59,6 +59,46 @@ static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, SMqDataRsp* pRsp, i
return 0;
}
int64_t tqScanLog(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, STqOffsetVal* pOffset) {
qTaskInfo_t task = pExec->execCol.task[0];
if (qStreamPrepareScan1(task, pOffset) < 0) {
ASSERT(0);
}
while (1) {
SSDataBlock* pDataBlock = NULL;
uint64_t ts = 0;
if (qExecTask(task, &pDataBlock, &ts) < 0) {
ASSERT(0);
}
if (pDataBlock != NULL) {
tqAddBlockDataToRsp(pDataBlock, pRsp);
if (pRsp->withTbName) {
int64_t uid = pExec->pExecReader[0]->msgIter.uid;
tqAddTbNameToRsp(pTq, uid, pRsp);
}
pRsp->blockNum++;
continue;
}
void* meta = qStreamExtractMetaMsg(task);
if (meta != NULL) {
// tq add meta to rsp
}
if (qStreamExtractOffset(task, &pRsp->rspOffset) < 0) {
ASSERT(0);
}
ASSERT(pRsp->rspOffset.type != 0);
break;
}
return 0;
}
int32_t tqScanSnapshot(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, STqOffsetVal offset, int32_t workerId) {
ASSERT(pExec->subType == TOPIC_SUB_TYPE__COLUMN);
qTaskInfo_t task = pExec->execCol.task[workerId];
......@@ -67,7 +107,7 @@ int32_t tqScanSnapshot(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, S
/*ASSERT(0);*/
/*}*/
if (qStreamPrepareScan(task, offset.uid, offset.ts) < 0) {
if (qStreamPrepareTsdbScan(task, offset.uid, offset.ts) < 0) {
ASSERT(0);
}
......@@ -93,7 +133,7 @@ int32_t tqScanSnapshot(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, S
if (qGetStreamScanStatus(task, &uid, &ts) < 0) {
ASSERT(0);
}
tqAddTbNameToRsp(pTq, uid, pRsp, workerId);
tqAddTbNameToRsp(pTq, uid, pRsp);
#endif
}
pRsp->blockNum++;
......@@ -129,7 +169,7 @@ int32_t tqLogScanExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataR
tqAddBlockDataToRsp(pDataBlock, pRsp);
if (pRsp->withTbName) {
int64_t uid = pExec->pExecReader[workerId]->msgIter.uid;
tqAddTbNameToRsp(pTq, uid, pRsp, workerId);
tqAddTbNameToRsp(pTq, uid, pRsp);
}
pRsp->blockNum++;
}
......@@ -146,7 +186,7 @@ int32_t tqLogScanExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataR
tqAddBlockDataToRsp(&block, pRsp);
if (pRsp->withTbName) {
int64_t uid = pExec->pExecReader[workerId]->msgIter.uid;
tqAddTbNameToRsp(pTq, uid, pRsp, workerId);
tqAddTbNameToRsp(pTq, uid, pRsp);
}
tqAddBlockSchemaToRsp(pExec, workerId, pRsp);
pRsp->blockNum++;
......@@ -164,7 +204,7 @@ int32_t tqLogScanExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataR
tqAddBlockDataToRsp(&block, pRsp);
if (pRsp->withTbName) {
int64_t uid = pExec->pExecReader[workerId]->msgIter.uid;
tqAddTbNameToRsp(pTq, uid, pRsp, workerId);
tqAddTbNameToRsp(pTq, uid, pRsp);
}
tqAddBlockSchemaToRsp(pExec, workerId, pRsp);
pRsp->blockNum++;
......
......@@ -15,11 +15,6 @@
#include "tq.h"
int64_t tqScanLog(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, STqOffsetVal offset) {
/*if ()*/
return 0;
}
int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHead** ppCkHead) {
int32_t code = 0;
taosThreadMutexLock(&pHandle->pWalReader->mutex);
......@@ -84,8 +79,10 @@ STqReader* tqOpenReader(SVnode* pVnode) {
return NULL;
}
// TODO open
/*pReader->pWalReader = walOpenReader(pVnode->pWal, NULL);*/
pReader->pWalReader = walOpenReader(pVnode->pWal, NULL);
if (pReader->pWalReader == NULL) {
return NULL;
}
pReader->pVnodeMeta = pVnode->pMeta;
pReader->pMsg = NULL;
......@@ -106,12 +103,19 @@ void tqCloseReader(STqReader* pReader) {
taosMemoryFree(pReader);
}
int32_t tqSeekVer(STqReader* pReader, int64_t ver) {
//
return walReadSeekVer(pReader->pWalReader, ver);
}
int32_t tqNextBlock(STqReader* pReader, SFetchRet* ret) {
bool fromProcessedMsg = pReader->pMsg != NULL;
while (1) {
if (!fromProcessedMsg) {
if (walNextValidMsg(pReader->pWalReader) < 0) {
ret->offset.type = TMQ_OFFSET__LOG;
ret->offset.version = pReader->ver;
ret->fetchType = FETCH_TYPE__NONE;
return -1;
}
......@@ -130,19 +134,23 @@ int32_t tqNextBlock(STqReader* pReader, SFetchRet* ret) {
memset(&ret->data, 0, sizeof(SSDataBlock));
int32_t code = tqRetrieveDataBlock(&ret->data, pReader);
if (code != 0 || ret->data.info.rows == 0) {
ASSERT(0);
#if 0
if (fromProcessedMsg) {
ret->fetchType = FETCH_TYPE__NONE;
return 0;
} else {
break;
}
#endif
}
ret->fetchType = FETCH_TYPE__DATA;
return 0;
}
if (fromProcessedMsg) {
ret->offset.type = TMQ_OFFSET__LOG;
ret->offset.version = pReader->ver;
ret->fetchType = FETCH_TYPE__NONE;
return 0;
}
......
......@@ -40,8 +40,8 @@ extern "C" {
#include "tpagedbuf.h"
#include "tstreamUpdate.h"
#include "vnode.h"
#include "executorInt.h"
#include "vnode.h"
typedef int32_t (*__block_search_fn_t)(char* data, int32_t num, int64_t key, int32_t order);
......@@ -51,13 +51,12 @@ typedef int32_t (*__block_search_fn_t)(char* data, int32_t num, int64_t key, int
#define NEEDTO_COMPRESS_QUERY(size) ((size) > tsCompressColData ? 1 : 0)
#define START_TS_COLUMN_INDEX 0
#define END_TS_COLUMN_INDEX 1
#define UID_COLUMN_INDEX 2
#define GROUPID_COLUMN_INDEX UID_COLUMN_INDEX
#define START_TS_COLUMN_INDEX 0
#define END_TS_COLUMN_INDEX 1
#define UID_COLUMN_INDEX 2
#define GROUPID_COLUMN_INDEX UID_COLUMN_INDEX
#define DELETE_GROUPID_COLUMN_INDEX 2
enum {
// when this task starts to execute, this status will set
TASK_NOT_COMPLETED = 0x1u,
......@@ -81,8 +80,8 @@ typedef struct SResultInfo { // TODO refactor
} SResultInfo;
typedef struct STableQueryInfo {
TSKEY lastKey; // last check ts, todo remove it later
SResultRowPosition pos; // current active time window
TSKEY lastKey; // last check ts, todo remove it later
SResultRowPosition pos; // current active time window
} STableQueryInfo;
typedef struct SLimit {
......@@ -105,7 +104,7 @@ typedef struct STaskCostInfo {
uint64_t loadDataTime;
SFileBlockLoadRecorder* pRecoder;
uint64_t elapsedTime;
uint64_t elapsedTime;
uint64_t firstStageMergeTime;
uint64_t winInfoSize;
......@@ -118,8 +117,8 @@ typedef struct STaskCostInfo {
} STaskCostInfo;
typedef struct SOperatorCostInfo {
double openCost;
double totalCost;
double openCost;
double totalCost;
} SOperatorCostInfo;
struct SOperatorInfo;
......@@ -139,70 +138,81 @@ typedef struct STaskIdInfo {
char* str;
} STaskIdInfo;
typedef struct {
STqOffsetVal prepareStatus; // for tmq
STqOffsetVal lastStatus; // for tmq
void* metaBlk; // for tmq fetching meta
SSDataBlock* pullOverBlk; // for streaming
SWalFilterCond cond;
} SStreamTaskInfo;
typedef struct SExecTaskInfo {
STaskIdInfo id;
uint32_t status;
STimeWindow window;
STaskCostInfo cost;
int64_t owner; // if it is in execution
int32_t code;
STaskIdInfo id;
uint32_t status;
STimeWindow window;
STaskCostInfo cost;
int64_t owner; // if it is in execution
int32_t code;
SStreamTaskInfo streamInfo;
struct {
char *tablename;
char *dbname;
int32_t tversion;
SSchemaWrapper*sw;
char* tablename;
char* dbname;
int32_t tversion;
SSchemaWrapper* sw;
} schemaVer;
STableListInfo tableqinfoList; // this is a table list
const char* sql; // query sql string
jmp_buf env; // jump to this position when error happens.
EOPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model]
STableListInfo tableqinfoList; // this is a table list
const char* sql; // query sql string
jmp_buf env; // jump to this position when error happens.
EOPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model]
struct SOperatorInfo* pRoot;
} SExecTaskInfo;
enum {
OP_NOT_OPENED = 0x0,
OP_OPENED = 0x1,
OP_NOT_OPENED = 0x0,
OP_OPENED = 0x1,
OP_RES_TO_RETURN = 0x5,
OP_EXEC_DONE = 0x9,
OP_EXEC_DONE = 0x9,
};
typedef struct SOperatorFpSet {
__optr_open_fn_t _openFn; // DO NOT invoke this function directly
__optr_fn_t getNextFn;
__optr_fn_t getStreamResFn; // execute the aggregate in the stream model, todo remove it
__optr_fn_t cleanupFn; // call this function to release the allocated resources ASAP
__optr_close_fn_t closeFn;
__optr_encode_fn_t encodeResultRow;
__optr_decode_fn_t decodeResultRow;
__optr_explain_fn_t getExplainFn;
__optr_open_fn_t _openFn; // DO NOT invoke this function directly
__optr_fn_t getNextFn;
__optr_fn_t getStreamResFn; // execute the aggregate in the stream model, todo remove it
__optr_fn_t cleanupFn; // call this function to release the allocated resources ASAP
__optr_close_fn_t closeFn;
__optr_encode_fn_t encodeResultRow;
__optr_decode_fn_t decodeResultRow;
__optr_explain_fn_t getExplainFn;
} SOperatorFpSet;
typedef struct SExprSupp {
SExprInfo* pExprInfo;
int32_t numOfExprs; // the number of scalar expression in group operator
int32_t numOfExprs; // the number of scalar expression in group operator
SqlFunctionCtx* pCtx;
int32_t* rowEntryInfoOffset; // offset value for each row result cell info
} SExprSupp;
typedef struct SOperatorInfo {
uint8_t operatorType;
bool blocking; // block operator or not
uint8_t status; // denote if current operator is completed
char* name; // name, for debug purpose
void* info; // extension attribution
SExprSupp exprSupp;
SExecTaskInfo* pTaskInfo;
SOperatorCostInfo cost;
SResultInfo resultInfo;
struct SOperatorInfo** pDownstream; // downstram pointer list
int32_t numOfDownstream; // number of downstream. The value is always ONE expect for join operator
SOperatorFpSet fpSet;
uint8_t operatorType;
bool blocking; // block operator or not
uint8_t status; // denote if current operator is completed
char* name; // name, for debug purpose
void* info; // extension attribution
SExprSupp exprSupp;
SExecTaskInfo* pTaskInfo;
SOperatorCostInfo cost;
SResultInfo resultInfo;
struct SOperatorInfo** pDownstream; // downstram pointer list
int32_t numOfDownstream; // number of downstream. The value is always ONE expect for join operator
SOperatorFpSet fpSet;
} SOperatorInfo;
typedef enum {
EX_SOURCE_DATA_NOT_READY = 0x1,
EX_SOURCE_DATA_READY = 0x2,
EX_SOURCE_DATA_READY = 0x2,
EX_SOURCE_DATA_EXHAUSTED = 0x3,
} EX_SOURCE_STATUS;
......@@ -210,12 +220,12 @@ typedef enum {
#define COL_MATCH_FROM_SLOT_ID 0x2
typedef struct SSourceDataInfo {
int32_t index;
SRetrieveTableRsp* pRsp;
uint64_t totalRows;
int32_t code;
EX_SOURCE_STATUS status;
const char* taskId;
int32_t index;
SRetrieveTableRsp* pRsp;
uint64_t totalRows;
int32_t code;
EX_SOURCE_STATUS status;
const char* taskId;
} SSourceDataInfo;
typedef struct SLoadRemoteDataInfo {
......@@ -237,22 +247,22 @@ typedef struct SExchangeInfo {
} SExchangeInfo;
typedef struct SColMatchInfo {
int32_t srcSlotId; // source slot id
int32_t srcSlotId; // source slot id
int32_t colId;
int32_t targetSlotId;
bool output; // todo remove this?
bool output; // todo remove this?
bool reserved;
int32_t matchType; // determinate the source according to col id or slot id
int32_t matchType; // determinate the source according to col id or slot id
} SColMatchInfo;
typedef struct SScanInfo {
int32_t numOfAsc;
int32_t numOfDesc;
int32_t numOfAsc;
int32_t numOfDesc;
} SScanInfo;
typedef struct SSampleExecInfo {
double sampleRatio; // data block sample ratio, 1 by default
uint32_t seed; // random seed value
double sampleRatio; // data block sample ratio, 1 by default
uint32_t seed; // random seed value
} SSampleExecInfo;
enum {
......@@ -261,36 +271,38 @@ enum {
};
typedef struct STableScanInfo {
STsdbReader* dataReader;
SReadHandle readHandle;
STsdbReader* dataReader;
SReadHandle readHandle;
SFileBlockLoadRecorder readRecorder;
SScanInfo scanInfo;
int32_t scanTimes;
SNode* pFilterNode; // filter info, which is push down by optimizer
SqlFunctionCtx* pCtx; // which belongs to the direct upstream operator operator query context,todo: remove this by using SExprSup
int32_t* rowEntryInfoOffset; // todo: remove this by using SExprSup
SExprInfo* pExpr;// todo: remove this by using SExprSup
SSDataBlock* pResBlock;
SArray* pColMatchInfo;
SExprSupp pseudoSup;
SScanInfo scanInfo;
int32_t scanTimes;
SNode* pFilterNode; // filter info, which is push down by optimizer
SqlFunctionCtx*
pCtx; // which belongs to the direct upstream operator operator query context,todo: remove this by using SExprSup
int32_t* rowEntryInfoOffset; // todo: remove this by using SExprSup
SExprInfo* pExpr; // todo: remove this by using SExprSup
SSDataBlock* pResBlock;
SArray* pColMatchInfo;
SExprSupp pseudoSup;
SQueryTableDataCond cond;
int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan
int32_t dataBlockLoadFlag;
SInterval interval; // if the upstream is an interval operator, the interval info is also kept here to get the time window to check if current data block needs to be loaded.
int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan
int32_t dataBlockLoadFlag;
SInterval interval; // if the upstream is an interval operator, the interval info is also kept here to get the time
// window to check if current data block needs to be loaded.
SSampleExecInfo sample; // sample execution info
SSampleExecInfo sample; // sample execution info
int32_t curTWinIdx;
int32_t currentGroupId;
int32_t currentTable;
uint64_t queryId; // todo remove it
uint64_t taskId; // todo remove it
int32_t currentGroupId;
int32_t currentTable;
uint64_t queryId; // todo remove it
uint64_t taskId; // todo remove it
struct {
uint64_t uid;
int64_t ts;
int64_t ts;
} lastStatus;
int8_t scanMode;
......@@ -298,37 +310,37 @@ typedef struct STableScanInfo {
} STableScanInfo;
typedef struct STagScanInfo {
SColumnInfo *pCols;
SSDataBlock *pRes;
SArray *pColMatchInfo;
int32_t curPos;
SReadHandle readHandle;
STableListInfo *pTableList;
SColumnInfo* pCols;
SSDataBlock* pRes;
SArray* pColMatchInfo;
int32_t curPos;
SReadHandle readHandle;
STableListInfo* pTableList;
} STagScanInfo;
typedef struct SLastrowScanInfo {
SSDataBlock *pRes;
SArray *pTableList;
SReadHandle readHandle;
void *pLastrowReader;
SArray *pColMatchInfo;
int32_t *pSlotIds;
SSDataBlock* pRes;
SArray* pTableList;
SReadHandle readHandle;
void* pLastrowReader;
SArray* pColMatchInfo;
int32_t* pSlotIds;
} SLastrowScanInfo;
typedef enum EStreamScanMode {
STREAM_SCAN_FROM_READERHANDLE = 1,
STREAM_SCAN_FROM_RES,
STREAM_SCAN_FROM_UPDATERES,
STREAM_SCAN_FROM_DATAREADER, // todo(liuyao) delete it
STREAM_SCAN_FROM_DATAREADER, // todo(liuyao) delete it
STREAM_SCAN_FROM_DATAREADER_RETRIEVE,
STREAM_SCAN_FROM_DATAREADER_RANGE,
} EStreamScanMode;
typedef struct SCatchSupporter {
SHashObj* pWindowHashTable; // quick locate the window object for each window
SDiskbasedBuf* pDataBuf; // buffer based on blocked-wised disk file
int32_t keySize;
int64_t* pKeyBuf;
SHashObj* pWindowHashTable; // quick locate the window object for each window
SDiskbasedBuf* pDataBuf; // buffer based on blocked-wised disk file
int32_t keySize;
int64_t* pKeyBuf;
} SCatchSupporter;
typedef struct SStreamAggSupporter {
......@@ -336,56 +348,56 @@ typedef struct SStreamAggSupporter {
SArray* pCurWins;
int32_t valueSize;
int32_t keySize;
char* pKeyBuf; // window key buffer
SDiskbasedBuf* pResultBuf; // query result buffer based on blocked-wised disk file
int32_t resultRowSize; // the result buffer size for each result row, with the meta data size for each row
char* pKeyBuf; // window key buffer
SDiskbasedBuf* pResultBuf; // query result buffer based on blocked-wised disk file
int32_t resultRowSize; // the result buffer size for each result row, with the meta data size for each row
SArray* pScanWindow;
} SStreamAggSupporter;
typedef struct SessionWindowSupporter {
SStreamAggSupporter* pStreamAggSup;
int64_t gap;
uint8_t parentType;
int64_t gap;
uint8_t parentType;
} SessionWindowSupporter;
typedef struct SStreamScanInfo {
uint64_t tableUid; // queried super table uid
SExprInfo* pPseudoExpr;
int32_t numOfPseudoExpr;
int32_t primaryTsIndex; // primary time stamp slot id
SReadHandle readHandle;
SInterval interval; // if the upstream is an interval operator, the interval info is also kept here.
SArray* pColMatchInfo; //
SNode* pCondition;
SArray* pBlockLists; // multiple SSDatablock.
SSDataBlock* pRes; // result SSDataBlock
SSDataBlock* pUpdateRes; // update SSDataBlock
int32_t updateResIndex;
int32_t blockType; // current block type
int32_t validBlockIndex; // Is current data has returned?
uint64_t numOfExec; // execution times
STqReader* tqReader;
int32_t tsArrayIndex;
SArray* tsArray;
uint64_t groupId;
SUpdateInfo* pUpdateInfo;
EStreamScanMode scanMode;
SOperatorInfo* pStreamScanOp;
SOperatorInfo* pTableScanOp;
SArray* childIds;
uint64_t tableUid; // queried super table uid
SExprInfo* pPseudoExpr;
int32_t numOfPseudoExpr;
int32_t primaryTsIndex; // primary time stamp slot id
SReadHandle readHandle;
SInterval interval; // if the upstream is an interval operator, the interval info is also kept here.
SArray* pColMatchInfo; //
SNode* pCondition;
SArray* pBlockLists; // multiple SSDatablock.
SSDataBlock* pRes; // result SSDataBlock
SSDataBlock* pUpdateRes; // update SSDataBlock
int32_t updateResIndex;
int32_t blockType; // current block type
int32_t validBlockIndex; // Is current data has returned?
uint64_t numOfExec; // execution times
STqReader* tqReader;
int32_t tsArrayIndex;
SArray* tsArray;
uint64_t groupId;
SUpdateInfo* pUpdateInfo;
EStreamScanMode scanMode;
SOperatorInfo* pStreamScanOp;
SOperatorInfo* pTableScanOp;
SArray* childIds;
SessionWindowSupporter sessionSup;
bool assignBlockUid; // assign block uid to groupId, temporarily used for generating rollup SMA.
int32_t scanWinIndex; // for state operator
int32_t pullDataResIndex;
SSDataBlock* pPullDataRes; // pull data SSDataBlock
SSDataBlock* pDeleteDataRes; // delete data SSDataBlock
int32_t deleteDataIndex;
bool assignBlockUid; // assign block uid to groupId, temporarily used for generating rollup SMA.
int32_t scanWinIndex; // for state operator
int32_t pullDataResIndex;
SSDataBlock* pPullDataRes; // pull data SSDataBlock
SSDataBlock* pDeleteDataRes; // delete data SSDataBlock
int32_t deleteDataIndex;
// status for tmq
//SSchemaWrapper schema;
// SSchemaWrapper schema;
STqOffset offset;
} SStreamScanInfo;
......@@ -412,27 +424,27 @@ typedef struct SBlockDistInfo {
SSDataBlock* pResBlock;
void* pHandle;
SReadHandle readHandle;
uint64_t uid; // table uid
uint64_t uid; // table uid
} SBlockDistInfo;
// todo remove this
typedef struct SOptrBasicInfo {
SResultRowInfo resultRowInfo;
SSDataBlock* pRes;
SResultRowInfo resultRowInfo;
SSDataBlock* pRes;
} SOptrBasicInfo;
typedef struct SAggSupporter {
SHashObj* pResultRowHashTable; // quick locate the window object for each result
char* keyBuf; // window key buffer
SDiskbasedBuf* pResultBuf; // query result buffer based on blocked-wised disk file
int32_t resultRowSize; // the result buffer size for each result row, with the meta data size for each row
int32_t resultRowSize; // the result buffer size for each result row, with the meta data size for each row
} SAggSupporter;
typedef struct STimeWindowSupp {
int8_t calTrigger;
int64_t waterMark;
TSKEY maxTs;
SColumnInfoData timeWindowData; // query time window info for scalar function execution.
int8_t calTrigger;
int64_t waterMark;
TSKEY maxTs;
SColumnInfoData timeWindowData; // query time window info for scalar function execution.
} STimeWindowAggSupp;
typedef struct SIntervalAggOperatorInfo {
......@@ -451,78 +463,78 @@ typedef struct SIntervalAggOperatorInfo {
EOPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model]
STimeWindowAggSupp twAggSup;
bool invertible;
SArray* pPrevValues; // SArray<SGroupKeys> used to keep the previous not null value for interpolation.
SArray* pPrevValues; // SArray<SGroupKeys> used to keep the previous not null value for interpolation.
bool ignoreExpiredData;
SArray* pRecycledPages;
SArray* pDelWins; // SWinRes
SArray* pDelWins; // SWinRes
int32_t delIndex;
SSDataBlock* pDelRes;
SNode *pCondition;
SNode* pCondition;
} SIntervalAggOperatorInfo;
typedef struct SStreamFinalIntervalOperatorInfo {
// SOptrBasicInfo should be first, SAggSupporter should be second for stream encode
SOptrBasicInfo binfo; // basic info
SAggSupporter aggSup; // aggregate supporter
SOptrBasicInfo binfo; // basic info
SAggSupporter aggSup; // aggregate supporter
SGroupResInfo groupResInfo; // multiple results build supporter
SInterval interval; // interval info
int32_t primaryTsIndex; // primary time stamp slot id from result of downstream operator.
int32_t order; // current SSDataBlock scan order
SGroupResInfo groupResInfo; // multiple results build supporter
SInterval interval; // interval info
int32_t primaryTsIndex; // primary time stamp slot id from result of downstream operator.
int32_t order; // current SSDataBlock scan order
STimeWindowAggSupp twAggSup;
SArray* pChildren;
SSDataBlock* pUpdateRes;
bool returnUpdate;
SPhysiNode* pPhyNode; // create new child
SPhysiNode* pPhyNode; // create new child
bool isFinal;
SHashObj* pPullDataMap;
SArray* pPullWins; // SPullWindowInfo
SArray* pPullWins; // SPullWindowInfo
int32_t pullIndex;
SSDataBlock* pPullDataRes;
bool ignoreExpiredData;
SArray* pRecycledPages;
SArray* pDelWins; // SWinRes
SArray* pDelWins; // SWinRes
int32_t delIndex;
SSDataBlock* pDelRes;
} SStreamFinalIntervalOperatorInfo;
typedef struct SAggOperatorInfo {
// SOptrBasicInfo should be first, SAggSupporter should be second for stream encode
SOptrBasicInfo binfo;
SAggSupporter aggSup;
SOptrBasicInfo binfo;
SAggSupporter aggSup;
STableQueryInfo *current;
uint64_t groupId;
SGroupResInfo groupResInfo;
SExprSupp scalarExprSup;
STableQueryInfo* current;
uint64_t groupId;
SGroupResInfo groupResInfo;
SExprSupp scalarExprSup;
SNode *pCondition;
SNode* pCondition;
} SAggOperatorInfo;
typedef struct SProjectOperatorInfo {
// SOptrBasicInfo should be first, SAggSupporter should be second for stream encode
SOptrBasicInfo binfo;
SAggSupporter aggSup;
SNode* pFilterNode; // filter info, which is push down by optimizer
SSDataBlock* existDataBlock;
SArray* pPseudoColInfo;
SLimit limit;
SLimit slimit;
uint64_t groupId;
int64_t curSOffset;
int64_t curGroupOutput;
int64_t curOffset;
int64_t curOutput;
SOptrBasicInfo binfo;
SAggSupporter aggSup;
SNode* pFilterNode; // filter info, which is push down by optimizer
SSDataBlock* existDataBlock;
SArray* pPseudoColInfo;
SLimit limit;
SLimit slimit;
uint64_t groupId;
int64_t curSOffset;
int64_t curGroupOutput;
int64_t curOffset;
int64_t curOutput;
} SProjectOperatorInfo;
typedef struct SIndefOperatorInfo {
SOptrBasicInfo binfo;
SAggSupporter aggSup;
SArray* pPseudoColInfo;
SExprSupp scalarSup;
SOptrBasicInfo binfo;
SAggSupporter aggSup;
SArray* pPseudoColInfo;
SExprSupp scalarSup;
} SIndefOperatorInfo;
typedef struct SFillOperatorInfo {
......@@ -536,23 +548,23 @@ typedef struct SFillOperatorInfo {
typedef struct SGroupbyOperatorInfo {
// SOptrBasicInfo should be first, SAggSupporter should be second for stream encode
SOptrBasicInfo binfo;
SAggSupporter aggSup;
SArray* pGroupCols; // group by columns, SArray<SColumn>
SArray* pGroupColVals; // current group column values, SArray<SGroupKeys>
SNode* pCondition;
bool isInit; // denote if current val is initialized or not
char* keyBuf; // group by keys for hash
int32_t groupKeyLen; // total group by column width
SGroupResInfo groupResInfo;
SExprSupp scalarSup;
SOptrBasicInfo binfo;
SAggSupporter aggSup;
SArray* pGroupCols; // group by columns, SArray<SColumn>
SArray* pGroupColVals; // current group column values, SArray<SGroupKeys>
SNode* pCondition;
bool isInit; // denote if current val is initialized or not
char* keyBuf; // group by keys for hash
int32_t groupKeyLen; // total group by column width
SGroupResInfo groupResInfo;
SExprSupp scalarSup;
} SGroupbyOperatorInfo;
typedef struct SDataGroupInfo {
uint64_t groupId;
int64_t numOfRows;
SArray* pPageList;
uint64_t groupId;
int64_t numOfRows;
SArray* pPageList;
} SDataGroupInfo;
// The sort in partition may be needed later.
......@@ -564,12 +576,12 @@ typedef struct SPartitionOperatorInfo {
int32_t groupKeyLen; // total group by column width
SHashObj* pGroupSet; // quick locate the window object for each result
SDiskbasedBuf* pBuf; // query result buffer based on blocked-wised disk file
int32_t rowCapacity; // maximum number of rows for each buffer page
int32_t* columnOffset; // start position for each column data
SArray* sortedGroupArray; // SDataGroupInfo sorted by group id
int32_t groupIndex; // group index
int32_t pageIndex; // page index of current group
SDiskbasedBuf* pBuf; // query result buffer based on blocked-wised disk file
int32_t rowCapacity; // maximum number of rows for each buffer page
int32_t* columnOffset; // start position for each column data
SArray* sortedGroupArray; // SDataGroupInfo sorted by group id
int32_t groupIndex; // group index
int32_t pageIndex; // page index of current group
SSDataBlock* pUpdateRes;
SExprSupp scalarSup;
} SPartitionOperatorInfo;
......@@ -583,67 +595,67 @@ typedef struct SWindowRowsSup {
typedef struct SSessionAggOperatorInfo {
// SOptrBasicInfo should be first, SAggSupporter should be second for stream encode
SOptrBasicInfo binfo;
SAggSupporter aggSup;
SOptrBasicInfo binfo;
SAggSupporter aggSup;
SGroupResInfo groupResInfo;
SWindowRowsSup winSup;
bool reptScan; // next round scan
int64_t gap; // session window gap
int32_t tsSlotId; // primary timestamp slot id
bool reptScan; // next round scan
int64_t gap; // session window gap
int32_t tsSlotId; // primary timestamp slot id
STimeWindowAggSupp twAggSup;
} SSessionAggOperatorInfo;
typedef struct SResultWindowInfo {
SResultRowPosition pos;
STimeWindow win;
bool isOutput;
bool isClosed;
STimeWindow win;
bool isOutput;
bool isClosed;
} SResultWindowInfo;
typedef struct SStateWindowInfo {
SResultWindowInfo winInfo;
SStateKeys stateKey;
SStateKeys stateKey;
} SStateWindowInfo;
typedef struct SStreamSessionAggOperatorInfo {
SOptrBasicInfo binfo;
SStreamAggSupporter streamAggSup;
SGroupResInfo groupResInfo;
int64_t gap; // session window gap
int32_t primaryTsIndex; // primary timestamp slot id
int32_t endTsIndex; // window end timestamp slot id
int32_t order; // current SSDataBlock scan order
STimeWindowAggSupp twAggSup;
SSDataBlock* pWinBlock; // window result
SqlFunctionCtx* pDummyCtx; // for combine
SSDataBlock* pDelRes; // delete result
bool returnDelete;
SSDataBlock* pUpdateRes; // update window
SHashObj* pStDeleted;
void* pDelIterator;
SArray* pChildren; // cache for children's result; final stream operator
SPhysiNode* pPhyNode; // create new child
bool isFinal;
bool ignoreExpiredData;
SOptrBasicInfo binfo;
SStreamAggSupporter streamAggSup;
SGroupResInfo groupResInfo;
int64_t gap; // session window gap
int32_t primaryTsIndex; // primary timestamp slot id
int32_t endTsIndex; // window end timestamp slot id
int32_t order; // current SSDataBlock scan order
STimeWindowAggSupp twAggSup;
SSDataBlock* pWinBlock; // window result
SqlFunctionCtx* pDummyCtx; // for combine
SSDataBlock* pDelRes; // delete result
bool returnDelete;
SSDataBlock* pUpdateRes; // update window
SHashObj* pStDeleted;
void* pDelIterator;
SArray* pChildren; // cache for children's result; final stream operator
SPhysiNode* pPhyNode; // create new child
bool isFinal;
bool ignoreExpiredData;
} SStreamSessionAggOperatorInfo;
typedef struct STimeSliceOperatorInfo {
SSDataBlock* pRes;
STimeWindow win;
SInterval interval;
int64_t current;
SArray* pPrevRow; // SArray<SGroupValue>
int32_t fillType; // fill type
SColumn tsCol; // primary timestamp column
SExprSupp scalarSup; // scalar calculation
struct SFillColInfo* pFillColInfo; // fill column info
SSDataBlock* pRes;
STimeWindow win;
SInterval interval;
int64_t current;
SArray* pPrevRow; // SArray<SGroupValue>
int32_t fillType; // fill type
SColumn tsCol; // primary timestamp column
SExprSupp scalarSup; // scalar calculation
struct SFillColInfo* pFillColInfo; // fill column info
} STimeSliceOperatorInfo;
typedef struct SStateWindowOperatorInfo {
// SOptrBasicInfo should be first, SAggSupporter should be second for stream encode
SOptrBasicInfo binfo;
SAggSupporter aggSup;
SOptrBasicInfo binfo;
SAggSupporter aggSup;
SGroupResInfo groupResInfo;
SWindowRowsSup winSup;
......@@ -656,52 +668,52 @@ typedef struct SStateWindowOperatorInfo {
} SStateWindowOperatorInfo;
typedef struct SStreamStateAggOperatorInfo {
SOptrBasicInfo binfo;
SStreamAggSupporter streamAggSup;
SGroupResInfo groupResInfo;
int32_t primaryTsIndex; // primary timestamp slot id
int32_t order; // current SSDataBlock scan order
STimeWindowAggSupp twAggSup;
SColumn stateCol; // start row index
SqlFunctionCtx* pDummyCtx; // for combine
SSDataBlock* pDelRes;
SHashObj* pSeDeleted;
void* pDelIterator;
SArray* pScanWindow;
SArray* pChildren; // cache for children's result;
bool ignoreExpiredData;
SOptrBasicInfo binfo;
SStreamAggSupporter streamAggSup;
SGroupResInfo groupResInfo;
int32_t primaryTsIndex; // primary timestamp slot id
int32_t order; // current SSDataBlock scan order
STimeWindowAggSupp twAggSup;
SColumn stateCol; // start row index
SqlFunctionCtx* pDummyCtx; // for combine
SSDataBlock* pDelRes;
SHashObj* pSeDeleted;
void* pDelIterator;
SArray* pScanWindow;
SArray* pChildren; // cache for children's result;
bool ignoreExpiredData;
} SStreamStateAggOperatorInfo;
typedef struct SSortedMergeOperatorInfo {
// SOptrBasicInfo should be first, SAggSupporter should be second for stream encode
SOptrBasicInfo binfo;
SAggSupporter aggSup;
SArray* pSortInfo;
int32_t numOfSources;
SSortHandle *pSortHandle;
int32_t bufPageSize;
uint32_t sortBufSize; // max buffer size for in-memory sort
int32_t resultRowFactor;
bool hasGroupVal;
SDiskbasedBuf *pTupleStore; // keep the final results
int32_t numOfResPerPage;
char** groupVal;
SArray *groupInfo;
SOptrBasicInfo binfo;
SAggSupporter aggSup;
SArray* pSortInfo;
int32_t numOfSources;
SSortHandle* pSortHandle;
int32_t bufPageSize;
uint32_t sortBufSize; // max buffer size for in-memory sort
int32_t resultRowFactor;
bool hasGroupVal;
SDiskbasedBuf* pTupleStore; // keep the final results
int32_t numOfResPerPage;
char** groupVal;
SArray* groupInfo;
} SSortedMergeOperatorInfo;
typedef struct SSortOperatorInfo {
SOptrBasicInfo binfo;
uint32_t sortBufSize; // max buffer size for in-memory sort
SArray* pSortInfo;
SSortHandle* pSortHandle;
SArray* pColMatchInfo; // for index map from table scan output
int32_t bufPageSize;
uint32_t sortBufSize; // max buffer size for in-memory sort
SArray* pSortInfo;
SSortHandle* pSortHandle;
SArray* pColMatchInfo; // for index map from table scan output
int32_t bufPageSize;
int64_t startTs; // sort start time
uint64_t sortElapsed; // sort elapsed time, time to flush to disk not included.
int64_t startTs; // sort start time
uint64_t sortElapsed; // sort elapsed time, time to flush to disk not included.
SNode* pCondition;
SNode* pCondition;
} SSortOperatorInfo;
typedef struct STagFilterOperatorInfo {
......@@ -709,17 +721,17 @@ typedef struct STagFilterOperatorInfo {
} STagFilterOperatorInfo;
typedef struct SJoinOperatorInfo {
SSDataBlock *pRes;
int32_t joinType;
SSDataBlock* pRes;
int32_t joinType;
SSDataBlock *pLeft;
int32_t leftPos;
SColumnInfo leftCol;
SSDataBlock* pLeft;
int32_t leftPos;
SColumnInfo leftCol;
SSDataBlock *pRight;
int32_t rightPos;
SColumnInfo rightCol;
SNode *pCondAfterMerge;
SSDataBlock* pRight;
int32_t rightPos;
SColumnInfo rightCol;
SNode* pCondAfterMerge;
} SJoinOperatorInfo;
#define OPTR_IS_OPENED(_optr) (((_optr)->status & OP_OPENED) == OP_OPENED)
......@@ -728,8 +740,8 @@ typedef struct SJoinOperatorInfo {
void doDestroyExchangeOperatorInfo(void* param);
SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn, __optr_fn_t streamFn,
__optr_fn_t cleanup, __optr_close_fn_t closeFn, __optr_encode_fn_t encode,
__optr_decode_fn_t decode, __optr_explain_fn_t explain);
__optr_fn_t cleanup, __optr_close_fn_t closeFn, __optr_encode_fn_t encode,
__optr_decode_fn_t decode, __optr_explain_fn_t explain);
int32_t operatorDummyOpenFn(SOperatorInfo* pOperator);
void operatorDummyCloseFn(void* param, int32_t numOfCols);
......@@ -739,28 +751,30 @@ void initBasicInfo(SOptrBasicInfo* pInfo, SSDataBlock* pBlock);
void cleanupBasicInfo(SOptrBasicInfo* pInfo);
int32_t initExprSupp(SExprSupp* pSup, SExprInfo* pExprInfo, int32_t numOfExpr);
void cleanupExprSupp(SExprSupp* pSup);
int32_t initAggInfo(SExprSupp *pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, size_t keyBufSize,
int32_t initAggInfo(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, size_t keyBufSize,
const char* pkey);
void initResultSizeInfo(SOperatorInfo* pOperator, int32_t numOfRows);
void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo, SDiskbasedBuf* pBuf);
void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo,
SDiskbasedBuf* pBuf);
void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, STimeWindow* pWin, SColumnInfoData* pTimeWindowData, int32_t offset,
int32_t forwardStep, TSKEY* tsCol, int32_t numOfTotal, int32_t numOfOutput, int32_t order);
void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, STimeWindow* pWin,
SColumnInfoData* pTimeWindowData, int32_t offset, int32_t forwardStep, TSKEY* tsCol,
int32_t numOfTotal, int32_t numOfOutput, int32_t order);
int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadInfo, int32_t numOfRows, char* pData,
int32_t compLen, int32_t numOfOutput, int64_t startTs, uint64_t* total,
SArray* pColList);
int32_t compLen, int32_t numOfOutput, int64_t startTs, uint64_t* total,
SArray* pColList);
void getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int64_t key, STimeWindow* win);
int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t *order, int32_t* scanFlag);
int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scanFlag);
int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaultBufsz);
void doSetOperatorCompleted(SOperatorInfo* pOperator);
void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock);
void doSetOperatorCompleted(SOperatorInfo* pOperator);
void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock);
void cleanupAggSup(SAggSupporter* pAggSup);
void destroyBasicOperatorInfo(void* param, int32_t numOfOutput);
void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle);
void setTbNameColData(void* pMeta, const SSDataBlock* pBlock, SColumnInfoData* pColInfoData, int32_t functionId);
void cleanupAggSup(SAggSupporter* pAggSup);
void destroyBasicOperatorInfo(void* param, int32_t numOfOutput);
void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle);
void setTbNameColData(void* pMeta, const SSDataBlock* pBlock, SColumnInfoData* pColInfoData, int32_t functionId);
int32_t doPrepareScan(SOperatorInfo* pOperator, uint64_t uid, int64_t ts);
int32_t doGetScanStatus(SOperatorInfo* pOperator, uint64_t* uid, int64_t* ts);
......@@ -769,92 +783,108 @@ SSDataBlock* loadNextDataBlock(void* param);
void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowEntryInfoOffset);
SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pResultRowInfo,
char* pData, int16_t bytes, bool masterscan, uint64_t groupId,
SExecTaskInfo* pTaskInfo, bool isIntervalQuery, SAggSupporter* pSup);
SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pResultRowInfo, char* pData,
int16_t bytes, bool masterscan, uint64_t groupId, SExecTaskInfo* pTaskInfo,
bool isIntervalQuery, SAggSupporter* pSup);
SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode* pExNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle,
SExecTaskInfo* pTaskInfo);
SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* pPhyNode,
STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScanPhysiNode *pScanPhyNode, const char* pUser, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScanPhysiNode* pScanPhyNode,
const char* pUser, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SNode* pCondition, SExprInfo* pScalarExprInfo,
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResultBlock, SNode* pCondition, SExprInfo* pScalarExprInfo,
int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhysiNode *pNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhysiNode* pProjPhyNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortPhyNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** dowStreams, size_t numStreams, SMergePhysiNode* pMergePhysiNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t numOfDownstream, SExprInfo* pExprInfo, int32_t num, SArray* pSortInfo, SArray* pGroupInfo, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pNode,
SExecTaskInfo* pTaskInfo);
SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhysiNode* pProjPhyNode,
SExecTaskInfo* pTaskInfo);
SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortPhyNode,
SExecTaskInfo* pTaskInfo);
SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** dowStreams, size_t numStreams,
SMergePhysiNode* pMergePhysiNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t numOfDownstream, SExprInfo* pExprInfo,
int32_t num, SArray* pSortInfo, SArray* pGroupInfo,
SExecTaskInfo* pTaskInfo);
SOperatorInfo* createLastrowScanOperator(SLastRowScanPhysiNode* pTableScanNode, SReadHandle* readHandle,
SArray* pTableList, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId,
STimeWindowAggSupp* pTwAggSupp, SIntervalPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, bool isStream);
STimeWindowAggSupp* pTwAggSupp, SIntervalPhysiNode* pPhyNode,
SExecTaskInfo* pTaskInfo, bool isStream);
SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId,
SExecTaskInfo* pTaskInfo);
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId,
SExecTaskInfo* pTaskInfo);
SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId,
SExecTaskInfo* pTaskInfo);
SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo,
int32_t numOfCols, SSDataBlock* pResBlock, SInterval* pInterval,
int32_t primaryTsSlotId, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, int32_t numOfChild);
SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,
SExecTaskInfo* pTaskInfo, int32_t numOfChild);
SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId,
STimeWindowAggSupp *pTwAggSupp, SExecTaskInfo* pTaskInfo);
STimeWindowAggSupp* pTwAggSupp, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResBlock, int64_t gap, int32_t tsSlotId, STimeWindowAggSupp* pTwAggSupp,
SExecTaskInfo* pTaskInfo);
SSDataBlock* pResBlock, int64_t gap, int32_t tsSlotId,
STimeWindowAggSupp* pTwAggSupp, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResultBlock, SArray* pGroupColList, SNode* pCondition,
SExprInfo* pScalarExprInfo, int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SReadHandle* readHandle, uint64_t uid, SBlockDistScanPhysiNode* pBlockScanNode,
SExecTaskInfo* pTaskInfo);
SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SReadHandle* readHandle, uint64_t uid,
SBlockDistScanPhysiNode* pBlockScanNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle,
STableScanPhysiNode* pTableScanNode, SExecTaskInfo* pTaskInfo, STimeWindowAggSupp* pTwSup, uint64_t queryId, uint64_t taskId);
SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* pTableScanNode,
SExecTaskInfo* pTaskInfo, STimeWindowAggSupp* pTwSup, uint64_t queryId,
uint64_t taskId);
SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pPhyFillNode, bool multigroupResult,
SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols,
SSDataBlock* pResBlock, STimeWindowAggSupp *pTwAggSupp, int32_t tsSlotId, SColumn* pStateKeyCol, SExecTaskInfo* pTaskInfo);
SSDataBlock* pResBlock, STimeWindowAggSupp* pTwAggSupp, int32_t tsSlotId,
SColumn* pStateKeyCol, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartitionPhysiNode* pPartNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pNode, /*SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResultBlock, const SNodeListNode* pValNode, */SExecTaskInfo* pTaskInfo);
SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartitionPhysiNode* pPartNode,
SExecTaskInfo* pTaskInfo);
SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SJoinPhysiNode* pJoinNode,
SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pNode, /*SExprInfo* pExprInfo, int32_t
numOfCols, SSDataBlock* pResultBlock, const SNodeListNode* pValNode, */
SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream,
SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream,
SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, int32_t numOfChild);
SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream,
SJoinPhysiNode* pJoinNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,
SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,
SExecTaskInfo* pTaskInfo, int32_t numOfChild);
SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,
SExecTaskInfo* pTaskInfo);
#if 0
SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv);
#endif
int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock, SqlFunctionCtx* pCtx,
int32_t numOfOutput, SArray* pPseudoList);
int32_t numOfOutput, SArray* pPseudoList);
void setInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order, int32_t scanFlag, bool createDummyCol);
void setInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order,
int32_t scanFlag, bool createDummyCol);
bool isTaskKilled(SExecTaskInfo* pTaskInfo);
int32_t checkForQueryBuf(size_t numOfTables);
void setTaskKilled(SExecTaskInfo* pTaskInfo);
void queryCostStatis(SExecTaskInfo* pTaskInfo);
void setTaskKilled(SExecTaskInfo* pTaskInfo);
void queryCostStatis(SExecTaskInfo* pTaskInfo);
void doDestroyTask(SExecTaskInfo* pTaskInfo);
int32_t getMaximumIdleDurationSec();
......@@ -866,7 +896,7 @@ int32_t getMaximumIdleDurationSec();
* nOptrWithVal: *nOptrWithVal save the number of optr with value
* return: result code, 0 means success
*/
int32_t encodeOperator(SOperatorInfo* ops, char** data, int32_t *length, int32_t *nOptrWithVal);
int32_t encodeOperator(SOperatorInfo* ops, char** data, int32_t* length, int32_t* nOptrWithVal);
/*
* ops: root operator, created by caller
......@@ -879,7 +909,7 @@ int32_t decodeOperator(SOperatorInfo* ops, const char* data, int32_t length);
void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status);
int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId,
const char* sql, EOPTR_EXEC_MODEL model);
int32_t createDataSinkParam(SDataSinkNode *pNode, void **pParam, qTaskInfo_t* pTaskInfo);
int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, qTaskInfo_t* pTaskInfo);
int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SExplainExecInfo** pRes, int32_t* capacity,
int32_t* resNum);
......@@ -889,35 +919,37 @@ int32_t aggEncodeResultRow(SOperatorInfo* pOperator, char** result, int32_t* len
STimeWindow getActiveTimeWindow(SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowInfo, int64_t ts, SInterval* pInterval,
int32_t precision, STimeWindow* win);
int32_t getNumOfRowsInTimeWindow(SDataBlockInfo* pDataBlockInfo, TSKEY* pPrimaryColumn, int32_t startPos, TSKEY ekey,
__block_search_fn_t searchFn, STableQueryInfo* item, int32_t order);
__block_search_fn_t searchFn, STableQueryInfo* item, int32_t order);
int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order);
int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, const char* pKey, SqlFunctionCtx* pCtx, int32_t numOfOutput,
int32_t size);
SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int64_t tableGroupId, int32_t interBufSize);
SResultWindowInfo* getSessionTimeWindow(SStreamAggSupporter* pAggSup, TSKEY startTs,
TSKEY endTs, uint64_t groupId, int64_t gap, int32_t* pIndex);
SResultWindowInfo* getCurSessionWindow(SStreamAggSupporter* pAggSup, TSKEY startTs,
TSKEY endTs, uint64_t groupId, int64_t gap, int32_t* pIndex);
bool isInTimeWindow(STimeWindow* pWin, TSKEY ts, int64_t gap);
int32_t updateSessionWindowInfo(SResultWindowInfo* pWinInfo, TSKEY* pStartTs,
TSKEY* pEndTs, int32_t rows, int32_t start, int64_t gap, SHashObj* pStDeleted);
bool functionNeedToExecute(SqlFunctionCtx* pCtx);
int32_t size);
SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int64_t tableGroupId, int32_t interBufSize);
SResultWindowInfo* getSessionTimeWindow(SStreamAggSupporter* pAggSup, TSKEY startTs, TSKEY endTs, uint64_t groupId,
int64_t gap, int32_t* pIndex);
SResultWindowInfo* getCurSessionWindow(SStreamAggSupporter* pAggSup, TSKEY startTs, TSKEY endTs, uint64_t groupId,
int64_t gap, int32_t* pIndex);
bool isInTimeWindow(STimeWindow* pWin, TSKEY ts, int64_t gap);
int32_t updateSessionWindowInfo(SResultWindowInfo* pWinInfo, TSKEY* pStartTs, TSKEY* pEndTs, int32_t rows,
int32_t start, int64_t gap, SHashObj* pStDeleted);
bool functionNeedToExecute(SqlFunctionCtx* pCtx);
int32_t compareTimeWindow(const void* p1, const void* p2, const void* param);
int32_t finalizeResultRowIntoResultDataBlock(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition,
SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, int32_t numOfExprs, const int32_t* rowCellOffset,
SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo);
SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, int32_t numOfExprs,
const int32_t* rowCellOffset, SSDataBlock* pBlock,
SExecTaskInfo* pTaskInfo);
int32_t createScanTableListInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle,
STableListInfo* pTableListInfo, uint64_t queryId, uint64_t taskId);
int32_t createScanTableListInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle,
STableListInfo* pTableListInfo, uint64_t queryId, uint64_t taskId);
SOperatorInfo* createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSortPhysiNode* pSortPhyNode,
SExecTaskInfo* pTaskInfo);
SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, STableListInfo *pTableListInfo,
SReadHandle* readHandle, SExecTaskInfo* pTaskInfo, uint64_t queryId, uint64_t taskId);
SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, STableListInfo* pTableListInfo,
SReadHandle* readHandle, SExecTaskInfo* pTaskInfo, uint64_t queryId,
uint64_t taskId);
void copyUpdateDataBlock(SSDataBlock* pDest, SSDataBlock* pSource, int32_t tsColIndex);
int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, SNodeList* groupKey);
int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, SNodeList* groupKey);
SSDataBlock* createPullDataBlock();
#ifdef __cplusplus
......
......@@ -60,9 +60,9 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
taosArrayAddAll(p->pDataBlock, pDataBlock->pDataBlock);
taosArrayPush(pInfo->pBlockLists, &p);
}
} else if (type == STREAM_INPUT__DATA_SCAN) {
} else if (type == STREAM_INPUT__TABLE_SCAN) {
// do nothing
ASSERT(pInfo->blockType == STREAM_INPUT__DATA_SCAN);
ASSERT(pInfo->blockType == STREAM_INPUT__TABLE_SCAN);
} else {
ASSERT(0);
}
......@@ -76,7 +76,7 @@ int32_t qStreamScanSnapshot(qTaskInfo_t tinfo) {
return TSDB_CODE_QRY_APP_ERROR;
}
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
return doSetStreamBlock(pTaskInfo->pRoot, NULL, 0, STREAM_INPUT__DATA_SCAN, 0, NULL);
return doSetStreamBlock(pTaskInfo->pRoot, NULL, 0, STREAM_INPUT__TABLE_SCAN, 0, NULL);
}
int32_t qSetStreamInput(qTaskInfo_t tinfo, const void* input, int32_t type, bool assignUid) {
......
......@@ -267,7 +267,44 @@ const STqOffset* qExtractStatusFromStreamScanner(void* scanner) {
return &pInfo->offset;
}
int32_t qStreamPrepareScan(qTaskInfo_t tinfo, uint64_t uid, int64_t ts) {
void* qStreamExtractMetaMsg(qTaskInfo_t tinfo) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM);
return pTaskInfo->streamInfo.metaBlk;
}
int32_t qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM);
memcpy(pOffset, &pTaskInfo->streamInfo.lastStatus, sizeof(STqOffsetVal));
return 0;
}
int32_t qStreamPrepareScan1(qTaskInfo_t tinfo, const STqOffsetVal* pOffset) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
SOperatorInfo* pOperator = pTaskInfo->pRoot;
ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM);
pTaskInfo->streamInfo.prepareStatus = *pOffset;
// TODO: optimize
/*if (pTaskInfo->streamInfo.lastStatus.type != pOffset->type ||*/
/*pTaskInfo->streamInfo.prepareStatus.version != pTaskInfo->streamInfo.lastStatus.version) {*/
while (1) {
uint8_t type = pOperator->operatorType;
pOperator->status = OP_OPENED;
if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
SStreamScanInfo* pInfo = pOperator->info;
tqSeekVer(pInfo->tqReader, pOffset->version);
return 0;
} else {
ASSERT(pOperator->numOfDownstream == 1);
pOperator = pOperator->pDownstream[0];
}
}
/*}*/
return 0;
}
int32_t qStreamPrepareTsdbScan(qTaskInfo_t tinfo, uint64_t uid, int64_t ts) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
if (uid == 0) {
......
......@@ -2848,7 +2848,7 @@ int32_t doPrepareScan(SOperatorInfo* pOperator, uint64_t uid, int64_t ts) {
if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
SStreamScanInfo* pScanInfo = pOperator->info;
pScanInfo->blockType = STREAM_INPUT__DATA_SCAN;
pScanInfo->blockType = STREAM_INPUT__TABLE_SCAN;
pScanInfo->pTableScanOp->status = OP_OPENED;
......@@ -3283,7 +3283,10 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
// The downstream exec may change the value of the newgroup, so use a local variable instead.
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
if (pBlock == NULL) {
// TODO optimize
/*if (pTaskInfo->execModel != OPTR_EXEC_MODEL_STREAM) {*/
doSetOperatorCompleted(pOperator);
/*}*/
break;
}
if (pBlock->info.type == STREAM_RETRIEVE) {
......
......@@ -1123,15 +1123,116 @@ static void setBlockGroupId(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32
uidCol[i] = getGroupId(pOperator, uidCol[i]);
}
}
static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock) {
SDataBlockInfo* pBlockInfo = &pInfo->pRes->info;
SOperatorInfo* pOperator = pInfo->pStreamScanOp;
SExecTaskInfo* pTaskInfo = pInfo->pStreamScanOp->pTaskInfo;
pInfo->pRes->info.rows = pBlock->info.rows;
pInfo->pRes->info.uid = pBlock->info.uid;
pInfo->pRes->info.type = STREAM_NORMAL;
pInfo->pRes->info.capacity = pBlock->info.rows;
// for generating rollup SMA result, each time is an independent time serie.
// TODO temporarily used, when the statement of "partition by tbname" is ready, remove this
if (pInfo->assignBlockUid) {
pInfo->pRes->info.groupId = pBlock->info.uid;
}
uint64_t* groupIdPre = taosHashGet(pOperator->pTaskInfo->tableqinfoList.map, &pBlock->info.uid, sizeof(int64_t));
if (groupIdPre) {
pInfo->pRes->info.groupId = *groupIdPre;
} else {
pInfo->pRes->info.groupId = 0;
}
// todo extract method
for (int32_t i = 0; i < taosArrayGetSize(pInfo->pColMatchInfo); ++i) {
SColMatchInfo* pColMatchInfo = taosArrayGet(pInfo->pColMatchInfo, i);
if (!pColMatchInfo->output) {
continue;
}
bool colExists = false;
for (int32_t j = 0; j < blockDataGetNumOfCols(pBlock); ++j) {
SColumnInfoData* pResCol = bdGetColumnInfoData(pBlock, j);
if (pResCol->info.colId == pColMatchInfo->colId) {
taosArraySet(pInfo->pRes->pDataBlock, pColMatchInfo->targetSlotId, pResCol);
colExists = true;
break;
}
}
// the required column does not exists in submit block, let's set it to be all null value
if (!colExists) {
SColumnInfoData* pDst = taosArrayGet(pInfo->pRes->pDataBlock, pColMatchInfo->targetSlotId);
colDataAppendNNULL(pDst, 0, pBlockInfo->rows);
}
}
taosArrayDestroy(pBlock->pDataBlock);
ASSERT(pInfo->pRes->pDataBlock != NULL);
#if 0
if (pInfo->pRes->pDataBlock == NULL) {
// TODO add log
updateInfoDestoryColseWinSBF(pInfo->pUpdateInfo);
pOperator->status = OP_EXEC_DONE;
pTaskInfo->code = terrno;
return -1;
}
#endif
// currently only the tbname pseudo column
if (pInfo->numOfPseudoExpr > 0) {
addTagPseudoColumnData(&pInfo->readHandle, pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, pInfo->pRes);
}
doFilter(pInfo->pCondition, pInfo->pRes);
blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex);
if (pBlockInfo->rows > 0) {
return 0;
}
return 0;
}
static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
// NOTE: this operator does never check if current status is done or not
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStreamScanInfo* pInfo = pOperator->info;
pTaskInfo->code = pOperator->fpSet._openFn(pOperator);
if (pTaskInfo->code != TSDB_CODE_SUCCESS || pOperator->status == OP_EXEC_DONE) {
return NULL;
/*pTaskInfo->code = pOperator->fpSet._openFn(pOperator);*/
/*if (pTaskInfo->code != TSDB_CODE_SUCCESS || pOperator->status == OP_EXEC_DONE) {*/
/*return NULL;*/
/*}*/
if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__LOG) {
while (1) {
SFetchRet ret = {0};
tqNextBlock(pInfo->tqReader, &ret);
if (ret.fetchType == FETCH_TYPE__DATA) {
blockDataCleanup(pInfo->pRes);
if (setBlockIntoRes(pInfo, &ret.data) < 0) {
ASSERT(0);
}
pTaskInfo->streamInfo.lastStatus = ret.offset;
if (pInfo->pRes->info.rows > 0) {
return pInfo->pRes;
} else {
tDeleteSSDataBlock(&ret.data);
}
} else if (ret.fetchType == FETCH_TYPE__META) {
ASSERT(0);
pTaskInfo->streamInfo.lastStatus = ret.offset;
pTaskInfo->streamInfo.metaBlk = ret.meta;
return NULL;
} else if (ret.fetchType == FETCH_TYPE__NONE) {
pTaskInfo->streamInfo.lastStatus = ret.offset;
return NULL;
} else {
ASSERT(0);
}
}
}
size_t total = taosArrayGetSize(pInfo->pBlockLists);
......@@ -1139,7 +1240,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
if (pInfo->blockType == STREAM_INPUT__DATA_BLOCK) {
if (pInfo->validBlockIndex >= total) {
/*doClearBufferedBlocks(pInfo);*/
pOperator->status = OP_EXEC_DONE;
/*pOperator->status = OP_EXEC_DONE;*/
return NULL;
}
......@@ -1286,6 +1387,9 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
}
taosArrayDestroy(block.pDataBlock);
ASSERT(pInfo->pRes->pDataBlock != NULL);
#if 0
if (pInfo->pRes->pDataBlock == NULL) {
// TODO add log
updateInfoDestoryColseWinSBF(pInfo->pUpdateInfo);
......@@ -1293,6 +1397,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
pTaskInfo->code = terrno;
return NULL;
}
#endif
// currently only the tbname pseudo column
if (pInfo->numOfPseudoExpr > 0) {
......@@ -1312,7 +1417,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
if (pBlockInfo->rows == 0) {
updateInfoDestoryColseWinSBF(pInfo->pUpdateInfo);
pOperator->status = OP_EXEC_DONE;
/*pOperator->status = OP_EXEC_DONE;*/
} else if (pInfo->pUpdateInfo) {
pInfo->tsArrayIndex = 0;
checkUpdateData(pInfo, true, pInfo->pRes, true);
......@@ -1330,7 +1435,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
return (pBlockInfo->rows == 0) ? NULL : pInfo->pRes;
} else if (pInfo->blockType == STREAM_INPUT__DATA_SCAN) {
} else if (pInfo->blockType == STREAM_INPUT__TABLE_SCAN) {
// check reader last status
// if not match, reset status
SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp);
......
......@@ -147,7 +147,7 @@ static int32_t walReadChangeFile(SWalReader *pRead, int64_t fileFirstVer) {
return 0;
}
static int32_t walReadSeekVer(SWalReader *pRead, int64_t ver) {
int32_t walReadSeekVer(SWalReader *pRead, int64_t ver) {
SWal *pWal = pRead->pWal;
if (ver == pRead->curVersion) {
return 0;
......@@ -355,22 +355,6 @@ int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead) {
return 0;
}
int32_t walReadWithHandle_s(SWalReader *pRead, int64_t ver, SWalCont **ppHead) {
taosThreadMutexLock(&pRead->mutex);
if (walReadVer(pRead, ver) < 0) {
taosThreadMutexUnlock(&pRead->mutex);
return -1;
}
*ppHead = taosMemoryMalloc(sizeof(SWalCont) + pRead->pHead->head.bodyLen);
if (*ppHead == NULL) {
taosThreadMutexUnlock(&pRead->mutex);
return -1;
}
memcpy(*ppHead, &pRead->pHead->head, sizeof(SWalCont) + pRead->pHead->head.bodyLen);
taosThreadMutexUnlock(&pRead->mutex);
return 0;
}
int32_t walReadVer(SWalReader *pRead, int64_t ver) {
int64_t code;
......
......@@ -80,7 +80,7 @@
./test.sh -f tsim/mnode/basic1.sim
./test.sh -f tsim/mnode/basic2.sim
./test.sh -f tsim/mnode/basic3.sim
./test.sh -f tsim/mnode/basic4.sim
#./test.sh -f tsim/mnode/basic4.sim
./test.sh -f tsim/mnode/basic5.sim
# ---- show
......
#python3 ./test.py -f 2-query/last.py -Q 3
#./test.sh -f tsim/mnode/basic4.sim
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册