提交 5f376274 编写于 作者: H Haojun Liao

[td-13039] fix bug in project query.

上级 c62dfe64
......@@ -334,9 +334,10 @@ typedef struct SOperatorInfo {
struct SOperatorInfo** pDownstream; // downstram pointer list
int32_t numOfDownstream; // number of downstream. The value is always ONE expect for join operator
__optr_open_fn_t openFn;
__optr_fn_t getNextFn;
__optr_fn_t cleanupFn;
__optr_close_fn_t closeFn;
__optr_open_fn_t _openFn; // DO NOT invoke this function directly
} SOperatorInfo;
typedef struct {
......
......@@ -224,9 +224,12 @@ static void doSetOperatorCompleted(SOperatorInfo* pOperator) {
}
}
#define OPTR_IS_OPENED(_optr) (((_optr)->status & OP_OPENED) == OP_OPENED)
#define OPTR_SET_OPENED(_optr) ((_optr)->status |= OP_OPENED)
static int32_t operatorDummyOpenFn(void* param) {
SOperatorInfo* pOperator = (SOperatorInfo*) param;
pOperator->status = OP_OPENED;
OPTR_SET_OPENED(pOperator);
return TSDB_CODE_SUCCESS;
}
......@@ -4734,6 +4737,11 @@ static SSDataBlock* doTableScan(void* param, bool *newgroup) {
STableScanInfo *pTableScanInfo = pOperator->info;
SExecTaskInfo *pTaskInfo = pOperator->pTaskInfo;
pTaskInfo->code = pOperator->_openFn(pOperator);
if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
return NULL;
}
// The read handle is not initialized yet, since no qualified tables exists
if (pTableScanInfo->pTsdbReadHandle == NULL) {
return NULL;
......@@ -4851,6 +4859,11 @@ static SSDataBlock* doStreamBlockScan(void* param, bool* newgroup) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStreamBlockScanInfo* pInfo = pOperator->info;
pTaskInfo->code = pOperator->_openFn(pOperator);
if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
return NULL;
}
SDataBlockInfo* pBlockInfo = &pInfo->pRes->info;
pBlockInfo->rows = 0;
......@@ -5159,7 +5172,7 @@ static SSDataBlock* seqLoadRemoteData(SOperatorInfo *pOperator) {
static int32_t prepareLoadRemoteData(void* param) {
SOperatorInfo *pOperator = (SOperatorInfo*) param;
if ((pOperator->status & OP_OPENED) == OP_OPENED) {
if (OPTR_IS_OPENED(pOperator)) {
return TSDB_CODE_SUCCESS;
}
......@@ -5173,7 +5186,7 @@ static int32_t prepareLoadRemoteData(void* param) {
}
}
pOperator->status = OP_OPENED;
OPTR_SET_OPENED(pOperator);
return TSDB_CODE_SUCCESS;
}
......@@ -5183,12 +5196,13 @@ static SSDataBlock* doLoadRemoteData(void* param, bool* newgroup) {
SExchangeInfo *pExchangeInfo = pOperator->info;
SExecTaskInfo *pTaskInfo = pOperator->pTaskInfo;
size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources);
if ((pOperator->status & OP_OPENED) != OP_OPENED) {
pTaskInfo->code = TSDB_CODE_QRY_IN_EXEC; // TODO add a new error code
int32_t code = pOperator->_openFn(pOperator);
if (code != TSDB_CODE_SUCCESS) {
pTaskInfo->code = code;
return NULL;
}
size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources);
if (pOperator->status == OP_EXEC_DONE) {
qDebug("%s all %"PRIzu" source(s) are exhausted, total rows:%"PRIu64" bytes:%"PRIu64", elapsed:%.2f ms", GET_TASKID(pTaskInfo), totalSources,
pExchangeInfo->totalRows, pExchangeInfo->totalSize, pExchangeInfo->totalElapsed/1000.0);
......@@ -5286,7 +5300,7 @@ SOperatorInfo* createExchangeOperatorInfo(const SNodeList* pSources, SSDataBlock
pOperator->info = pInfo;
pOperator->numOfOutput = size;
pOperator->pTaskInfo = pTaskInfo;
pOperator->openFn = prepareLoadRemoteData; // assign a dummy function.
pOperator->_openFn = prepareLoadRemoteData; // assign a dummy function.
pOperator->getNextFn = doLoadRemoteData;
pOperator->closeFn = destroyExchangeOperatorInfo;
......@@ -5377,7 +5391,7 @@ SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order,
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->numOfOutput = numOfOutput;
pOperator->openFn = operatorDummyOpenFn;
pOperator->_openFn = operatorDummyOpenFn;
pOperator->getNextFn = doTableScan;
pOperator->closeFn = operatorDummyCloseFn;
pOperator->pTaskInfo = pTaskInfo;
......@@ -5461,7 +5475,7 @@ SOperatorInfo* createStreamScanOperatorInfo(void *streamReadHandle, SSDataBlock*
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->numOfOutput = pResBlock->info.numOfCols;
pOperator->openFn = operatorDummyOpenFn;
pOperator->_openFn = operatorDummyOpenFn;
pOperator->getNextFn = doStreamBlockScan;
pOperator->closeFn = operatorDummyCloseFn;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册