提交 c734d040 编写于 作者: L Liu Jicong

fix(query): projection for null input

上级 247342c4
...@@ -539,7 +539,7 @@ int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, bo ...@@ -539,7 +539,7 @@ int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, bo
taosArrayPush(pTaskInfo->pResultBlockList, &p1); taosArrayPush(pTaskInfo->pResultBlockList, &p1);
p = p1; p = p1;
} else { } else {
p = *(SSDataBlock**) taosArrayGet(pTaskInfo->pResultBlockList, blockIndex); p = *(SSDataBlock**)taosArrayGet(pTaskInfo->pResultBlockList, blockIndex);
copyDataBlock(p, pRes); copyDataBlock(p, pRes);
} }
...@@ -576,7 +576,7 @@ void qCleanExecTaskBlockBuf(qTaskInfo_t tinfo) { ...@@ -576,7 +576,7 @@ void qCleanExecTaskBlockBuf(qTaskInfo_t tinfo) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
SArray* pList = pTaskInfo->pResultBlockList; SArray* pList = pTaskInfo->pResultBlockList;
size_t num = taosArrayGetSize(pList); size_t num = taosArrayGetSize(pList);
for(int32_t i = 0; i < num; ++i) { for (int32_t i = 0; i < num; ++i) {
SSDataBlock** p = taosArrayGet(pTaskInfo->pResultBlockList, i); SSDataBlock** p = taosArrayGet(pTaskInfo->pResultBlockList, i);
blockDataDestroy(*p); blockDataDestroy(*p);
} }
...@@ -747,11 +747,11 @@ int32_t qSerializeTaskStatus(qTaskInfo_t tinfo, char** pOutput, int32_t* len) { ...@@ -747,11 +747,11 @@ int32_t qSerializeTaskStatus(qTaskInfo_t tinfo, char** pOutput, int32_t* len) {
} }
int32_t nOptrWithVal = 0; int32_t nOptrWithVal = 0;
// int32_t code = encodeOperator(pTaskInfo->pRoot, pOutput, len, &nOptrWithVal); // int32_t code = encodeOperator(pTaskInfo->pRoot, pOutput, len, &nOptrWithVal);
// if ((code == TSDB_CODE_SUCCESS) && (nOptrWithVal == 0)) { // if ((code == TSDB_CODE_SUCCESS) && (nOptrWithVal == 0)) {
// taosMemoryFreeClear(*pOutput); // taosMemoryFreeClear(*pOutput);
// *len = 0; // *len = 0;
// } // }
return 0; return 0;
} }
...@@ -763,7 +763,7 @@ int32_t qDeserializeTaskStatus(qTaskInfo_t tinfo, const char* pInput, int32_t le ...@@ -763,7 +763,7 @@ int32_t qDeserializeTaskStatus(qTaskInfo_t tinfo, const char* pInput, int32_t le
} }
return 0; return 0;
// return decodeOperator(pTaskInfo->pRoot, pInput, len); // return decodeOperator(pTaskInfo->pRoot, pInput, len);
} }
int32_t qExtractStreamScanner(qTaskInfo_t tinfo, void** scanner) { int32_t qExtractStreamScanner(qTaskInfo_t tinfo, void** scanner) {
...@@ -890,35 +890,35 @@ int32_t qStreamRestoreParam(qTaskInfo_t tinfo) { ...@@ -890,35 +890,35 @@ int32_t qStreamRestoreParam(qTaskInfo_t tinfo) {
pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL || pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL ||
pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL) { pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL) {
SStreamIntervalOperatorInfo* pInfo = pOperator->info; SStreamIntervalOperatorInfo* pInfo = pOperator->info;
ASSERT(pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE); /*ASSERT(pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE);*/
ASSERT(pInfo->twAggSup.deleteMark == INT64_MAX); /*ASSERT(pInfo->twAggSup.deleteMark == INT64_MAX);*/
pInfo->twAggSup.calTrigger = pInfo->twAggSup.calTriggerSaved; pInfo->twAggSup.calTrigger = pInfo->twAggSup.calTriggerSaved;
pInfo->twAggSup.deleteMark = pInfo->twAggSup.deleteMarkSaved; pInfo->twAggSup.deleteMark = pInfo->twAggSup.deleteMarkSaved;
ASSERT(pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE || /*ASSERT(pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE ||*/
pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE); /*pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE);*/
qInfo("restore stream param for interval: %d, %" PRId64, pInfo->twAggSup.calTrigger, pInfo->twAggSup.deleteMark); qInfo("restore stream param for interval: %d, %" PRId64, pInfo->twAggSup.calTrigger, pInfo->twAggSup.deleteMark);
} else if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION || } else if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION ||
pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION || pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION ||
pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) { pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) {
SStreamSessionAggOperatorInfo* pInfo = pOperator->info; SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
ASSERT(pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE); /*ASSERT(pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE);*/
ASSERT(pInfo->twAggSup.deleteMark == INT64_MAX); /*ASSERT(pInfo->twAggSup.deleteMark == INT64_MAX);*/
pInfo->twAggSup.calTrigger = pInfo->twAggSup.calTriggerSaved; pInfo->twAggSup.calTrigger = pInfo->twAggSup.calTriggerSaved;
pInfo->twAggSup.deleteMark = pInfo->twAggSup.deleteMarkSaved; pInfo->twAggSup.deleteMark = pInfo->twAggSup.deleteMarkSaved;
ASSERT(pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE || /*ASSERT(pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE ||*/
pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE); /*pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE);*/
qInfo("restore stream param for session: %d, %" PRId64, pInfo->twAggSup.calTrigger, pInfo->twAggSup.deleteMark); qInfo("restore stream param for session: %d, %" PRId64, pInfo->twAggSup.calTrigger, pInfo->twAggSup.deleteMark);
} else if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE) { } else if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE) {
SStreamStateAggOperatorInfo* pInfo = pOperator->info; SStreamStateAggOperatorInfo* pInfo = pOperator->info;
ASSERT(pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE); /*ASSERT(pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE);*/
ASSERT(pInfo->twAggSup.deleteMark == INT64_MAX); /*ASSERT(pInfo->twAggSup.deleteMark == INT64_MAX);*/
pInfo->twAggSup.calTrigger = pInfo->twAggSup.calTriggerSaved; pInfo->twAggSup.calTrigger = pInfo->twAggSup.calTriggerSaved;
pInfo->twAggSup.deleteMark = pInfo->twAggSup.deleteMarkSaved; pInfo->twAggSup.deleteMark = pInfo->twAggSup.deleteMarkSaved;
ASSERT(pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE || /*ASSERT(pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE ||*/
pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE); /*pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE);*/
qInfo("restore stream param for state: %d, %" PRId64, pInfo->twAggSup.calTrigger, pInfo->twAggSup.deleteMark); qInfo("restore stream param for state: %d, %" PRId64, pInfo->twAggSup.calTrigger, pInfo->twAggSup.deleteMark);
} }
...@@ -926,7 +926,7 @@ int32_t qStreamRestoreParam(qTaskInfo_t tinfo) { ...@@ -926,7 +926,7 @@ int32_t qStreamRestoreParam(qTaskInfo_t tinfo) {
if (pOperator->numOfDownstream != 1 || pOperator->pDownstream[0] == NULL) { if (pOperator->numOfDownstream != 1 || pOperator->pDownstream[0] == NULL) {
if (pOperator->numOfDownstream > 1) { if (pOperator->numOfDownstream > 1) {
qError("unexpected stream, multiple downstream"); qError("unexpected stream, multiple downstream");
ASSERT(0); /*ASSERT(0);*/
return -1; return -1;
} }
return 0; return 0;
......
...@@ -13,8 +13,8 @@ ...@@ -13,8 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "filter.h"
#include "executorimpl.h" #include "executorimpl.h"
#include "filter.h"
#include "functionMgt.h" #include "functionMgt.h"
typedef struct SProjectOperatorInfo { typedef struct SProjectOperatorInfo {
...@@ -90,7 +90,7 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys ...@@ -90,7 +90,7 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys
pInfo->binfo.pRes = pResBlock; pInfo->binfo.pRes = pResBlock;
pInfo->pFinalRes = createOneDataBlock(pResBlock, false); pInfo->pFinalRes = createOneDataBlock(pResBlock, false);
pInfo->mergeDataBlocks = (pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM)? false:pProjPhyNode->mergeDataBlock; pInfo->mergeDataBlocks = (pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM) ? false : pProjPhyNode->mergeDataBlock;
int32_t numOfRows = 4096; int32_t numOfRows = 4096;
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
...@@ -117,9 +117,10 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys ...@@ -117,9 +117,10 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys
pInfo->pPseudoColInfo = setRowTsColumnOutputInfo(pOperator->exprSupp.pCtx, numOfCols); pInfo->pPseudoColInfo = setRowTsColumnOutputInfo(pOperator->exprSupp.pCtx, numOfCols);
setOperatorInfo(pOperator, "ProjectOperator", QUERY_NODE_PHYSICAL_PLAN_PROJECT, false, OP_NOT_OPENED, pInfo, pTaskInfo); setOperatorInfo(pOperator, "ProjectOperator", QUERY_NODE_PHYSICAL_PLAN_PROJECT, false, OP_NOT_OPENED, pInfo,
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doProjectOperation, NULL, pTaskInfo);
destroyProjectOperatorInfo, optrDefaultBufFn, NULL); pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doProjectOperation, NULL, destroyProjectOperatorInfo,
optrDefaultBufFn, NULL);
code = appendDownstream(pOperator, &downstream, 1); code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
...@@ -414,8 +415,10 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy ...@@ -414,8 +415,10 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy
pInfo->binfo.pRes = pResBlock; pInfo->binfo.pRes = pResBlock;
pInfo->pPseudoColInfo = setRowTsColumnOutputInfo(pSup->pCtx, numOfExpr); pInfo->pPseudoColInfo = setRowTsColumnOutputInfo(pSup->pCtx, numOfExpr);
setOperatorInfo(pOperator, "IndefinitOperator", QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC, false, OP_NOT_OPENED, pInfo, pTaskInfo); setOperatorInfo(pOperator, "IndefinitOperator", QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC, false, OP_NOT_OPENED, pInfo,
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doApplyIndefinitFunction, NULL, destroyIndefinitOperatorInfo, optrDefaultBufFn, NULL); pTaskInfo);
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doApplyIndefinitFunction, NULL, destroyIndefinitOperatorInfo,
optrDefaultBufFn, NULL);
code = appendDownstream(pOperator, &downstream, 1); code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
...@@ -697,13 +700,30 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc ...@@ -697,13 +700,30 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc
if (pExpr[k].pExpr->nodeType == QUERY_NODE_COLUMN) { // it is a project query if (pExpr[k].pExpr->nodeType == QUERY_NODE_COLUMN) { // it is a project query
SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, outputSlotId); SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, outputSlotId);
if (pResult->info.rows > 0 && !createNewColModel) { if (pResult->info.rows > 0 && !createNewColModel) {
if (pInputData->pData[0] == NULL) {
int32_t slotId = pfCtx->param[0].pCol->slotId;
SColumnInfoData* pInput = taosArrayGet(pSrcBlock->pDataBlock, slotId);
colDataMergeCol(pColInfoData, pResult->info.rows, (int32_t*)&pResult->info.capacity, pInput,
pSrcBlock->info.rows);
} else {
colDataMergeCol(pColInfoData, pResult->info.rows, (int32_t*)&pResult->info.capacity, pInputData->pData[0], colDataMergeCol(pColInfoData, pResult->info.rows, (int32_t*)&pResult->info.capacity, pInputData->pData[0],
pInputData->numOfRows); pInputData->numOfRows);
} else {
colDataAssign(pColInfoData, pInputData->pData[0], pInputData->numOfRows, &pResult->info);
} }
} else {
if (pInputData->pData[0] == NULL) {
int32_t slotId = pfCtx->param[0].pCol->slotId;
SColumnInfoData* pInput = taosArrayGet(pSrcBlock->pDataBlock, slotId);
colDataAssign(pColInfoData, pInput, pSrcBlock->info.rows, &pResult->info);
numOfRows = pSrcBlock->info.rows;
} else {
colDataAssign(pColInfoData, pInputData->pData[0], pInputData->numOfRows, &pResult->info);
numOfRows = pInputData->numOfRows; numOfRows = pInputData->numOfRows;
}
}
} else if (pExpr[k].pExpr->nodeType == QUERY_NODE_VALUE) { } else if (pExpr[k].pExpr->nodeType == QUERY_NODE_VALUE) {
SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, outputSlotId); SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, outputSlotId);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册