提交 fcb058ce 编写于 作者: H Haojun Liao

[td-13039] support scan ssdatablock.

上级 7163b99c
...@@ -32,6 +32,9 @@ typedef struct SReadHandle { ...@@ -32,6 +32,9 @@ typedef struct SReadHandle {
void* meta; void* meta;
} SReadHandle; } SReadHandle;
#define STREAM_DATA_TYPE_SUBMIT_BLOCK 0x1
#define STREAM_DATA_TYPE_SSDAT_BLOCK 0x2
/** /**
* Create the exec task for streaming mode * Create the exec task for streaming mode
* @param pMsg * @param pMsg
...@@ -44,9 +47,10 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void *msg, void* streamReadHandle); ...@@ -44,9 +47,10 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void *msg, void* streamReadHandle);
* Set the input data block for the stream scan. * Set the input data block for the stream scan.
* @param tinfo * @param tinfo
* @param input * @param input
* @param type
* @return * @return
*/ */
int32_t qSetStreamInput(qTaskInfo_t tinfo, const void* input); int32_t qSetStreamInput(qTaskInfo_t tinfo, const void* input, int32_t type);
/** /**
* Update the table id list, add or remove. * Update the table id list, add or remove.
......
...@@ -282,7 +282,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -282,7 +282,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
if (pHead->head.msgType == TDMT_VND_SUBMIT) { if (pHead->head.msgType == TDMT_VND_SUBMIT) {
SSubmitReq* pCont = (SSubmitReq*)&pHead->head.body; SSubmitReq* pCont = (SSubmitReq*)&pHead->head.body;
qTaskInfo_t task = pTopic->buffer.output[pos].task; qTaskInfo_t task = pTopic->buffer.output[pos].task;
qSetStreamInput(task, pCont); qSetStreamInput(task, pCont, STREAM_DATA_TYPE_SUBMIT_BLOCK);
SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock)); SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock));
while (1) { while (1) {
SSDataBlock* pDataBlock; SSDataBlock* pDataBlock;
......
...@@ -422,6 +422,8 @@ typedef struct STagScanInfo { ...@@ -422,6 +422,8 @@ typedef struct STagScanInfo {
typedef struct SStreamBlockScanInfo { typedef struct SStreamBlockScanInfo {
SSDataBlock* pRes; // result SSDataBlock SSDataBlock* pRes; // result SSDataBlock
int32_t blockType; // current block type
bool blockValid; // Is current data has returned?
SColumnInfo* pCols; // the output column info SColumnInfo* pCols; // the output column info
uint64_t numOfRows; // total scanned rows uint64_t numOfRows; // total scanned rows
uint64_t numOfExec; // execution times uint64_t numOfExec; // execution times
......
...@@ -18,7 +18,7 @@ ...@@ -18,7 +18,7 @@
#include "planner.h" #include "planner.h"
#include "tq.h" #include "tq.h"
static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, char* id) { static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, int32_t type, char* id) {
ASSERT(pOperator != NULL); ASSERT(pOperator != NULL);
if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
if (pOperator->numOfDownstream == 0) { if (pOperator->numOfDownstream == 0) {
...@@ -31,18 +31,40 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, char* id) ...@@ -31,18 +31,40 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, char* id)
return TSDB_CODE_QRY_APP_ERROR; return TSDB_CODE_QRY_APP_ERROR;
} }
return doSetStreamBlock(pOperator->pDownstream[0], input, id); return doSetStreamBlock(pOperator->pDownstream[0], input, type, id);
} else { } else {
SStreamBlockScanInfo* pInfo = pOperator->info; SStreamBlockScanInfo* pInfo = pOperator->info;
if (tqReadHandleSetMsg(pInfo->readerHandle, input, 0) < 0) {
qError("submit msg messed up when initing stream block, %s" PRIx64, id); // the block type can not be changed in the streamscan operators
if (pInfo->blockType == 0) {
pInfo->blockType = type;
} else if (pInfo->blockType != type) {
return TSDB_CODE_QRY_APP_ERROR; return TSDB_CODE_QRY_APP_ERROR;
} }
if (type == STREAM_DATA_TYPE_SUBMIT_BLOCK) {
if (tqReadHandleSetMsg(pInfo->readerHandle, input, 0) < 0) {
qError("submit msg messed up when initing stream block, %s" PRIx64, id);
return TSDB_CODE_QRY_APP_ERROR;
}
} else {
ASSERT(!pInfo->blockValid);
SSDataBlock* pDataBlock = input;
pInfo->pRes->info = pDataBlock->info;
for(int32_t i = 0; i < pInfo->pRes->info.numOfCols; ++i) {
pInfo->pRes->pDataBlock = pDataBlock->pDataBlock;
}
// set current block valid.
pInfo->blockValid = true;
}
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
} }
int32_t qSetStreamInput(qTaskInfo_t tinfo, const void* input) { int32_t qSetStreamInput(qTaskInfo_t tinfo, const void* input, int32_t type) {
if (tinfo == NULL) { if (tinfo == NULL) {
return TSDB_CODE_QRY_APP_ERROR; return TSDB_CODE_QRY_APP_ERROR;
} }
...@@ -53,7 +75,7 @@ int32_t qSetStreamInput(qTaskInfo_t tinfo, const void* input) { ...@@ -53,7 +75,7 @@ int32_t qSetStreamInput(qTaskInfo_t tinfo, const void* input) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
int32_t code = doSetStreamBlock(pTaskInfo->pRoot, (void*)input, GET_TASKID(pTaskInfo)); int32_t code = doSetStreamBlock(pTaskInfo->pRoot, (void*)input, type, GET_TASKID(pTaskInfo));
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
qError("%s failed to set the stream block data", GET_TASKID(pTaskInfo)); qError("%s failed to set the stream block data", GET_TASKID(pTaskInfo));
} else { } else {
......
...@@ -4869,17 +4869,26 @@ static SSDataBlock* doBlockInfoScan(SOperatorInfo *pOperator, bool* newgroup) { ...@@ -4869,17 +4869,26 @@ static SSDataBlock* doBlockInfoScan(SOperatorInfo *pOperator, bool* newgroup) {
} }
static SSDataBlock* doStreamBlockScan(SOperatorInfo *pOperator, bool* newgroup) { static SSDataBlock* doStreamBlockScan(SOperatorInfo *pOperator, bool* newgroup) {
// NOTE: this operator never check if current status is done or not // NOTE: this operator does never check if current status is done or not
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStreamBlockScanInfo* pInfo = pOperator->info; SStreamBlockScanInfo* pInfo = pOperator->info;
if (pInfo->blockType == STREAM_DATA_TYPE_SSDAT_BLOCK) {
if (pInfo->blockValid) {
pInfo->blockValid = false; // this block can only be used once.
return pInfo->pRes;
} else {
return NULL;
}
}
pTaskInfo->code = pOperator->_openFn(pOperator); pTaskInfo->code = pOperator->_openFn(pOperator);
if (pTaskInfo->code != TSDB_CODE_SUCCESS) { if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
return NULL; return NULL;
} }
SDataBlockInfo* pBlockInfo = &pInfo->pRes->info; SDataBlockInfo* pBlockInfo = &pInfo->pRes->info;
pBlockInfo->rows = 0; blockDataClearup(pInfo->pRes);
while (tqNextDataBlock(pInfo->readerHandle)) { while (tqNextDataBlock(pInfo->readerHandle)) {
pTaskInfo->code = tqRetrieveDataBlockInfo(pInfo->readerHandle, pBlockInfo); pTaskInfo->code = tqRetrieveDataBlockInfo(pInfo->readerHandle, pBlockInfo);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册