提交 1c94bbb5 编写于 作者: H Haojun Liao

enh(query): set the status of stream scan operator.

上级 2288d351
...@@ -52,6 +52,10 @@ int32_t tGetSubmitMsgNext(SSubmitMsgIter *pIter, SSubmitBlk **pPBlock) { ...@@ -52,6 +52,10 @@ int32_t tGetSubmitMsgNext(SSubmitMsgIter *pIter, SSubmitBlk **pPBlock) {
if (pIter->len == 0) { if (pIter->len == 0) {
pIter->len += sizeof(SSubmitReq); pIter->len += sizeof(SSubmitReq);
} else { } else {
if (pIter->len >= pIter->totalLen) {
ASSERT(0);
}
SSubmitBlk *pSubmitBlk = (SSubmitBlk *)POINTER_SHIFT(pIter->pMsg, pIter->len); SSubmitBlk *pSubmitBlk = (SSubmitBlk *)POINTER_SHIFT(pIter->pMsg, pIter->len);
pIter->len += (sizeof(SSubmitBlk) + pSubmitBlk->dataLen + pSubmitBlk->schemaLen); pIter->len += (sizeof(SSubmitBlk) + pSubmitBlk->dataLen + pSubmitBlk->schemaLen);
ASSERT(pIter->len > 0); ASSERT(pIter->len > 0);
......
...@@ -34,6 +34,8 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu ...@@ -34,6 +34,8 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
pOperator->status = OP_NOT_OPENED; pOperator->status = OP_NOT_OPENED;
return doSetStreamBlock(pOperator->pDownstream[0], input, numOfBlocks, type, id); return doSetStreamBlock(pOperator->pDownstream[0], input, numOfBlocks, type, id);
} else { } else {
pOperator->status = OP_NOT_OPENED;
SStreamBlockScanInfo* pInfo = pOperator->info; SStreamBlockScanInfo* pInfo = pOperator->info;
// the block type can not be changed in the streamscan operators // the block type can not be changed in the streamscan operators
......
...@@ -4931,6 +4931,10 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator, bool* newgroup) ...@@ -4931,6 +4931,10 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator, bool* newgroup)
SSDataBlock* pRes = pInfo->pRes; SSDataBlock* pRes = pInfo->pRes;
blockDataCleanup(pRes); blockDataCleanup(pRes);
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
#if 0 #if 0
if (pProjectInfo->existDataBlock) { // TODO refactor if (pProjectInfo->existDataBlock) { // TODO refactor
......
...@@ -539,7 +539,7 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator, bool* newgroup) ...@@ -539,7 +539,7 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator, bool* newgroup)
SStreamBlockScanInfo* pInfo = pOperator->info; SStreamBlockScanInfo* pInfo = pOperator->info;
pTaskInfo->code = pOperator->_openFn(pOperator); pTaskInfo->code = pOperator->_openFn(pOperator);
if (pTaskInfo->code != TSDB_CODE_SUCCESS) { if (pTaskInfo->code != TSDB_CODE_SUCCESS || pOperator->status == OP_EXEC_DONE) {
return NULL; return NULL;
} }
...@@ -547,6 +547,7 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator, bool* newgroup) ...@@ -547,6 +547,7 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator, bool* newgroup)
size_t total = taosArrayGetSize(pInfo->pBlockLists); size_t total = taosArrayGetSize(pInfo->pBlockLists);
if (pInfo->validBlockIndex >= total) { if (pInfo->validBlockIndex >= total) {
doClearBufferedBlocks(pInfo); doClearBufferedBlocks(pInfo);
pOperator->status = OP_EXEC_DONE;
return NULL; return NULL;
} }
...@@ -560,11 +561,12 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator, bool* newgroup) ...@@ -560,11 +561,12 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator, bool* newgroup)
pTaskInfo->code = tqRetrieveDataBlockInfo(pInfo->readerHandle, pBlockInfo); pTaskInfo->code = tqRetrieveDataBlockInfo(pInfo->readerHandle, pBlockInfo);
if (pTaskInfo->code != TSDB_CODE_SUCCESS) { if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
terrno = pTaskInfo->code; terrno = pTaskInfo->code;
pOperator->status = OP_EXEC_DONE;
return NULL; return NULL;
} }
if (pBlockInfo->rows == 0) { if (pBlockInfo->rows == 0) {
return NULL; break;
} }
SArray* pCols = tqRetrieveDataBlock(pInfo->readerHandle); SArray* pCols = tqRetrieveDataBlock(pInfo->readerHandle);
...@@ -583,6 +585,7 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator, bool* newgroup) ...@@ -583,6 +585,7 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator, bool* newgroup)
if (pInfo->pRes->pDataBlock == NULL) { if (pInfo->pRes->pDataBlock == NULL) {
// TODO add log // TODO add log
pOperator->status = OP_EXEC_DONE;
pTaskInfo->code = terrno; pTaskInfo->code = terrno;
return NULL; return NULL;
} }
...@@ -594,6 +597,10 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator, bool* newgroup) ...@@ -594,6 +597,10 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator, bool* newgroup)
pInfo->numOfExec++; pInfo->numOfExec++;
pInfo->numOfRows += pBlockInfo->rows; pInfo->numOfRows += pBlockInfo->rows;
if (pBlockInfo->rows == 0) {
pOperator->status = OP_EXEC_DONE;
}
return (pBlockInfo->rows == 0) ? NULL : pInfo->pRes; return (pBlockInfo->rows == 0) ? NULL : pInfo->pRes;
} }
} }
......
...@@ -866,10 +866,6 @@ int32_t firstFunction(SqlFunctionCtx *pCtx) { ...@@ -866,10 +866,6 @@ int32_t firstFunction(SqlFunctionCtx *pCtx) {
} }
int32_t lastFunction(SqlFunctionCtx *pCtx) { int32_t lastFunction(SqlFunctionCtx *pCtx) {
if (pCtx->order != TSDB_ORDER_DESC) {
return 0;
}
int32_t numOfElems = 0; int32_t numOfElems = 0;
SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx); SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册