未验证 提交 b067079d 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #18007 from taosdata/feature/3_liaohj

refactor: do some internal refactor.
...@@ -65,13 +65,14 @@ ELSE () ...@@ -65,13 +65,14 @@ ELSE ()
ENDIF () ENDIF ()
MESSAGE(STATUS "============= compile version parameter information start ============= ") MESSAGE(STATUS "============= compile version parameter information start ============= ")
MESSAGE(STATUS "ver number:" ${TD_VER_NUMBER}) MESSAGE(STATUS "version: " ${TD_VER_NUMBER})
MESSAGE(STATUS "compatible ver number:" ${TD_VER_COMPATIBLE}) MESSAGE(STATUS "compatible: " ${TD_VER_COMPATIBLE})
MESSAGE(STATUS "communit commit id:" ${TD_VER_GIT}) MESSAGE(STATUS "commit id: " ${TD_VER_GIT})
MESSAGE(STATUS "build date:" ${TD_VER_DATE}) MESSAGE(STATUS "build date: " ${TD_VER_DATE})
MESSAGE(STATUS "ver type:" ${TD_VER_VERTYPE}) MESSAGE(STATUS "build type: " ${CMAKE_BUILD_TYPE})
MESSAGE(STATUS "ver cpu:" ${TD_VER_CPUTYPE}) MESSAGE(STATUS "type: " ${TD_VER_VERTYPE})
MESSAGE(STATUS "os type:" ${TD_VER_OSTYPE}) MESSAGE(STATUS "cpu: " ${TD_VER_CPUTYPE})
MESSAGE(STATUS "os: " ${TD_VER_OSTYPE})
MESSAGE(STATUS "============= compile version parameter information end ============= ") MESSAGE(STATUS "============= compile version parameter information end ============= ")
STRING(REPLACE "." "_" TD_LIB_VER_NUMBER ${TD_VER_NUMBER}) STRING(REPLACE "." "_" TD_LIB_VER_NUMBER ${TD_VER_NUMBER})
...@@ -244,7 +244,7 @@ int32_t blockDataAppendColInfo(SSDataBlock* pBlock, SColumnInfoData* pColIn ...@@ -244,7 +244,7 @@ int32_t blockDataAppendColInfo(SSDataBlock* pBlock, SColumnInfoData* pColIn
SColumnInfoData createColumnInfoData(int16_t type, int32_t bytes, int16_t colId); SColumnInfoData createColumnInfoData(int16_t type, int32_t bytes, int16_t colId);
SColumnInfoData* bdGetColumnInfoData(const SSDataBlock* pBlock, int32_t index); SColumnInfoData* bdGetColumnInfoData(const SSDataBlock* pBlock, int32_t index);
void blockEncode(const SSDataBlock* pBlock, char* data, int32_t* dataLen, int32_t numOfCols, int8_t needCompress); int32_t blockEncode(const SSDataBlock* pBlock, char* data, int32_t numOfCols);
const char* blockDecode(SSDataBlock* pBlock, const char* pData); const char* blockDecode(SSDataBlock* pBlock, const char* pData);
void blockDebugShowDataBlock(SSDataBlock* pBlock, const char* flag); void blockDebugShowDataBlock(SSDataBlock* pBlock, const char* flag);
......
...@@ -442,8 +442,7 @@ static int32_t buildShowVariablesRsp(SArray* pVars, SRetrieveTableRsp** pRsp) { ...@@ -442,8 +442,7 @@ static int32_t buildShowVariablesRsp(SArray* pVars, SRetrieveTableRsp** pRsp) {
(*pRsp)->numOfRows = htonl(pBlock->info.rows); (*pRsp)->numOfRows = htonl(pBlock->info.rows);
(*pRsp)->numOfCols = htonl(SHOW_VARIABLES_RESULT_COLS); (*pRsp)->numOfCols = htonl(SHOW_VARIABLES_RESULT_COLS);
int32_t len = 0; int32_t len = blockEncode(pBlock, (*pRsp)->data, SHOW_VARIABLES_RESULT_COLS);
blockEncode(pBlock, (*pRsp)->data, &len, SHOW_VARIABLES_RESULT_COLS, false);
ASSERT(len == rspSize - sizeof(SRetrieveTableRsp)); ASSERT(len == rspSize - sizeof(SRetrieveTableRsp));
blockDataDestroy(pBlock); blockDataDestroy(pBlock);
......
...@@ -2197,7 +2197,9 @@ char* buildCtbNameByGroupId(const char* stbFullName, uint64_t groupId) { ...@@ -2197,7 +2197,9 @@ char* buildCtbNameByGroupId(const char* stbFullName, uint64_t groupId) {
return rname.ctbShortName; return rname.ctbShortName;
} }
void blockEncode(const SSDataBlock* pBlock, char* data, int32_t* dataLen, int32_t numOfCols, int8_t needCompress) { int32_t blockEncode(const SSDataBlock* pBlock, char* data, int32_t numOfCols) {
int32_t dataLen = 0;
// todo extract method // todo extract method
int32_t* version = (int32_t*)data; int32_t* version = (int32_t*)data;
*version = 1; *version = 1;
...@@ -2238,7 +2240,7 @@ void blockEncode(const SSDataBlock* pBlock, char* data, int32_t* dataLen, int32_ ...@@ -2238,7 +2240,7 @@ void blockEncode(const SSDataBlock* pBlock, char* data, int32_t* dataLen, int32_
int32_t* colSizes = (int32_t*)data; int32_t* colSizes = (int32_t*)data;
data += numOfCols * sizeof(int32_t); data += numOfCols * sizeof(int32_t);
*dataLen = blockDataGetSerialMetaSize(numOfCols); dataLen = blockDataGetSerialMetaSize(numOfCols);
int32_t numOfRows = pBlock->info.rows; int32_t numOfRows = pBlock->info.rows;
for (int32_t col = 0; col < numOfCols; ++col) { for (int32_t col = 0; col < numOfCols; ++col) {
...@@ -2255,26 +2257,23 @@ void blockEncode(const SSDataBlock* pBlock, char* data, int32_t* dataLen, int32_ ...@@ -2255,26 +2257,23 @@ void blockEncode(const SSDataBlock* pBlock, char* data, int32_t* dataLen, int32_
} }
data += metaSize; data += metaSize;
(*dataLen) += metaSize; dataLen += metaSize;
if (needCompress) { colSizes[col] = colDataGetLength(pColRes, numOfRows);
colSizes[col] = blockCompressColData(pColRes, numOfRows, data, needCompress); dataLen += colSizes[col];
data += colSizes[col]; memmove(data, pColRes->pData, colSizes[col]);
(*dataLen) += colSizes[col]; data += colSizes[col];
} else {
colSizes[col] = colDataGetLength(pColRes, numOfRows);
(*dataLen) += colSizes[col];
memmove(data, pColRes->pData, colSizes[col]);
data += colSizes[col];
}
colSizes[col] = htonl(colSizes[col]); colSizes[col] = htonl(colSizes[col]);
} }
*actualLen = *dataLen; *actualLen = dataLen;
*groupId = pBlock->info.groupId; *groupId = pBlock->info.groupId;
ASSERT(*dataLen > 0); ASSERT(dataLen > 0);
uDebug("build data block, actualLen:%d, rows:%d, cols:%d", *dataLen, *rows, *cols);
uDebug("build data block, actualLen:%d, rows:%d, cols:%d", dataLen, *rows, *cols);
return dataLen;
} }
const char* blockDecode(SSDataBlock* pBlock, const char* pData) { const char* blockDecode(SSDataBlock* pBlock, const char* pData) {
......
...@@ -307,8 +307,7 @@ int32_t dmProcessRetrieve(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) { ...@@ -307,8 +307,7 @@ int32_t dmProcessRetrieve(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
pStart += sizeof(SSysTableSchema); pStart += sizeof(SSysTableSchema);
} }
int32_t len = 0; int32_t len = blockEncode(pBlock, pStart, numOfCols);
blockEncode(pBlock, pStart, &len, numOfCols, false);
pRsp->numOfRows = htonl(pBlock->info.rows); pRsp->numOfRows = htonl(pBlock->info.rows);
pRsp->precision = TSDB_TIME_PRECISION_MILLI; // millisecond time precision pRsp->precision = TSDB_TIME_PRECISION_MILLI; // millisecond time precision
......
...@@ -303,8 +303,7 @@ static int32_t mndProcessRetrieveSysTableReq(SRpcMsg *pReq) { ...@@ -303,8 +303,7 @@ static int32_t mndProcessRetrieveSysTableReq(SRpcMsg *pReq) {
pStart += sizeof(SSysTableSchema); pStart += sizeof(SSysTableSchema);
} }
int32_t len = 0; int32_t len = blockEncode(pBlock, pStart, pShow->pMeta->numOfColumns);
blockEncode(pBlock, pStart, &len, pShow->pMeta->numOfColumns, false);
} }
pRsp->numOfRows = htonl(rowsRead); pRsp->numOfRows = htonl(rowsRead);
......
...@@ -27,9 +27,7 @@ int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t ...@@ -27,9 +27,7 @@ int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t
pRetrieve->completed = 1; pRetrieve->completed = 1;
pRetrieve->numOfRows = htonl(pBlock->info.rows); pRetrieve->numOfRows = htonl(pBlock->info.rows);
// TODO enable compress int32_t actualLen = blockEncode(pBlock, pRetrieve->data, numOfCols);
int32_t actualLen = 0;
blockEncode(pBlock, pRetrieve->data, &actualLen, numOfCols, false);
actualLen += sizeof(SRetrieveTableRsp); actualLen += sizeof(SRetrieveTableRsp);
ASSERT(actualLen <= dataStrLen); ASSERT(actualLen <= dataStrLen);
taosArrayPush(pRsp->blockDataLen, &actualLen); taosArrayPush(pRsp->blockDataLen, &actualLen);
......
...@@ -973,7 +973,7 @@ static void copyNumericCols(const SColData* pData, SFileBlockDumpInfo* pDumpInfo ...@@ -973,7 +973,7 @@ static void copyNumericCols(const SColData* pData, SFileBlockDumpInfo* pDumpInfo
int32_t mid = dumpedRows >> 1u; int32_t mid = dumpedRows >> 1u;
int8_t* pts = (int8_t*)pColData->pData; int8_t* pts = (int8_t*)pColData->pData;
for (int32_t j = 0; j < mid; ++j) { for (int32_t j = 0; j < mid; ++j) {
int64_t t = pts[j]; int8_t t = pts[j];
pts[j] = pts[dumpedRows - j - 1]; pts[j] = pts[dumpedRows - j - 1];
pts[dumpedRows - j - 1] = t; pts[dumpedRows - j - 1] = t;
} }
...@@ -998,7 +998,7 @@ static void copyNumericCols(const SColData* pData, SFileBlockDumpInfo* pDumpInfo ...@@ -998,7 +998,7 @@ static void copyNumericCols(const SColData* pData, SFileBlockDumpInfo* pDumpInfo
int32_t mid = dumpedRows >> 1u; int32_t mid = dumpedRows >> 1u;
int32_t* pts = (int32_t*)pColData->pData; int32_t* pts = (int32_t*)pColData->pData;
for (int32_t j = 0; j < mid; ++j) { for (int32_t j = 0; j < mid; ++j) {
int64_t t = pts[j]; int32_t t = pts[j];
pts[j] = pts[dumpedRows - j - 1]; pts[j] = pts[dumpedRows - j - 1];
pts[dumpedRows - j - 1] = t; pts[dumpedRows - j - 1] = t;
} }
......
...@@ -39,8 +39,7 @@ static int32_t buildRetrieveTableRsp(SSDataBlock* pBlock, int32_t numOfCols, SRe ...@@ -39,8 +39,7 @@ static int32_t buildRetrieveTableRsp(SSDataBlock* pBlock, int32_t numOfCols, SRe
(*pRsp)->numOfRows = htonl(pBlock->info.rows); (*pRsp)->numOfRows = htonl(pBlock->info.rows);
(*pRsp)->numOfCols = htonl(numOfCols); (*pRsp)->numOfCols = htonl(numOfCols);
int32_t len = 0; int32_t len = blockEncode(pBlock, (*pRsp)->data, numOfCols);
blockEncode(pBlock, (*pRsp)->data, &len, numOfCols, false);
ASSERT(len == rspSize - sizeof(SRetrieveTableRsp)); ASSERT(len == rspSize - sizeof(SRetrieveTableRsp));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
......
...@@ -1610,8 +1610,7 @@ int32_t qExplainGetRspFromCtx(void *ctx, SRetrieveTableRsp **pRsp) { ...@@ -1610,8 +1610,7 @@ int32_t qExplainGetRspFromCtx(void *ctx, SRetrieveTableRsp **pRsp) {
rsp->completed = 1; rsp->completed = 1;
rsp->numOfRows = htonl(rowNum); rsp->numOfRows = htonl(rowNum);
int32_t len = 0; int32_t len = blockEncode(pBlock, rsp->data, taosArrayGetSize(pBlock->pDataBlock));
blockEncode(pBlock, rsp->data, &len, taosArrayGetSize(pBlock->pDataBlock), 0);
ASSERT(len == rspSize - sizeof(SRetrieveTableRsp)); ASSERT(len == rspSize - sizeof(SRetrieveTableRsp));
rsp->compLen = htonl(len); rsp->compLen = htonl(len);
......
...@@ -842,10 +842,8 @@ typedef struct SJoinOperatorInfo { ...@@ -842,10 +842,8 @@ typedef struct SJoinOperatorInfo {
#define OPTR_IS_OPENED(_optr) (((_optr)->status & OP_OPENED) == OP_OPENED) #define OPTR_IS_OPENED(_optr) (((_optr)->status & OP_OPENED) == OP_OPENED)
#define OPTR_SET_OPENED(_optr) ((_optr)->status |= OP_OPENED) #define OPTR_SET_OPENED(_optr) ((_optr)->status |= OP_OPENED)
void doDestroyExchangeOperatorInfo(void* param); SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn, __optr_fn_t cleanup,
__optr_close_fn_t closeFn, __optr_explain_fn_t explain);
SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn, __optr_fn_t streamFn,
__optr_fn_t cleanup, __optr_close_fn_t closeFn, __optr_explain_fn_t explain);
int32_t operatorDummyOpenFn(SOperatorInfo* pOperator); int32_t operatorDummyOpenFn(SOperatorInfo* pOperator);
int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t num); int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t num);
...@@ -881,7 +879,11 @@ STimeWindow getFirstQualifiedTimeWindow(int64_t ts, STimeWindow* pWindow, SInter ...@@ -881,7 +879,11 @@ STimeWindow getFirstQualifiedTimeWindow(int64_t ts, STimeWindow* pWindow, SInter
int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scanFlag); int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scanFlag);
int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaultBufsz); int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaultBufsz);
void doSetOperatorCompleted(SOperatorInfo* pOperator); void doDestroyExchangeOperatorInfo(void* param);
void setOperatorCompleted(SOperatorInfo* pOperator);
void setOperatorInfo(SOperatorInfo* pOperator, const char* name, int32_t type, bool blocking, int32_t status, void* pInfo,
SExecTaskInfo* pTaskInfo);
void doFilter(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SColMatchInfo* pColMatchInfo); void doFilter(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SColMatchInfo* pColMatchInfo);
int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int32_t numOfExpr, int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int32_t numOfExpr,
SSDataBlock* pBlock, int32_t rows, const char* idStr, STableMetaCacheInfo * pCache); SSDataBlock* pBlock, int32_t rows, const char* idStr, STableMetaCacheInfo * pCache);
......
...@@ -93,16 +93,11 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe ...@@ -93,16 +93,11 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe
p->pCtx = createSqlFunctionCtx(p->pExprInfo, p->numOfExprs, &p->rowEntryInfoOffset); p->pCtx = createSqlFunctionCtx(p->pExprInfo, p->numOfExprs, &p->rowEntryInfoOffset);
} }
pOperator->name = "LastrowScanOperator"; setOperatorInfo(pOperator, "CachedRowScanOperator", QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN, false, OP_NOT_OPENED, pInfo, pTaskInfo);
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN;
pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->pTaskInfo = pTaskInfo;
pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock); pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock);
pOperator->fpSet = pOperator->fpSet =
createOperatorFpSet(operatorDummyOpenFn, doScanCache, NULL, NULL, destroyLastrowScanOperator, NULL); createOperatorFpSet(operatorDummyOpenFn, doScanCache, NULL, destroyLastrowScanOperator, NULL);
pOperator->cost.openCost = 0; pOperator->cost.openCost = 0;
return pOperator; return pOperator;
...@@ -126,7 +121,7 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) { ...@@ -126,7 +121,7 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
uint64_t suid = tableListGetSuid(pTableList); uint64_t suid = tableListGetSuid(pTableList);
int32_t size = tableListGetSize(pTableList); int32_t size = tableListGetSize(pTableList);
if (size == 0) { if (size == 0) {
doSetOperatorCompleted(pOperator); setOperatorCompleted(pOperator);
return NULL; return NULL;
} }
...@@ -182,7 +177,7 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) { ...@@ -182,7 +177,7 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
pInfo->indexOfBufferedRes += 1; pInfo->indexOfBufferedRes += 1;
return pRes; return pRes;
} else { } else {
doSetOperatorCompleted(pOperator); setOperatorCompleted(pOperator);
return NULL; return NULL;
} }
} else { } else {
...@@ -234,7 +229,7 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) { ...@@ -234,7 +229,7 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
} }
} }
doSetOperatorCompleted(pOperator); setOperatorCompleted(pOperator);
return NULL; return NULL;
} }
} }
......
...@@ -76,7 +76,7 @@ static void toDataCacheEntry(SDataDispatchHandle* pHandle, const SInputData* pIn ...@@ -76,7 +76,7 @@ static void toDataCacheEntry(SDataDispatchHandle* pHandle, const SInputData* pIn
pEntry->dataLen = 0; pEntry->dataLen = 0;
pBuf->useSize = sizeof(SDataCacheEntry); pBuf->useSize = sizeof(SDataCacheEntry);
blockEncode(pInput->pData, pEntry->data, &pEntry->dataLen, numOfCols, pEntry->compressed); pEntry->dataLen = blockEncode(pInput->pData, pEntry->data, numOfCols);
ASSERT(pEntry->numOfRows == *(int32_t*)(pEntry->data + 8)); ASSERT(pEntry->numOfRows == *(int32_t*)(pEntry->data + 8));
ASSERT(pEntry->numOfCols == *(int32_t*)(pEntry->data + 8 + 4)); ASSERT(pEntry->numOfCols == *(int32_t*)(pEntry->data + 8 + 4));
......
此差异已折叠。
...@@ -1106,3 +1106,24 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT ...@@ -1106,3 +1106,24 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
} }
return 0; return 0;
} }
void qProcessRspMsg(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
SMsgSendInfo* pSendInfo = (SMsgSendInfo*)pMsg->info.ahandle;
assert(pMsg->info.ahandle != NULL);
SDataBuf buf = {.len = pMsg->contLen, .pData = NULL};
if (pMsg->contLen > 0) {
buf.pData = taosMemoryCalloc(1, pMsg->contLen);
if (buf.pData == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
pMsg->code = TSDB_CODE_OUT_OF_MEMORY;
} else {
memcpy(buf.pData, pMsg->pCont, pMsg->contLen);
}
}
pSendInfo->fp(pSendInfo->param, &buf, pMsg->code);
rpcFreeCont(pMsg->pCont);
destroySendMsgInfo(pSendInfo);
}
\ No newline at end of file
...@@ -316,7 +316,7 @@ static SSDataBlock* buildGroupResultDataBlock(SOperatorInfo* pOperator) { ...@@ -316,7 +316,7 @@ static SSDataBlock* buildGroupResultDataBlock(SOperatorInfo* pOperator) {
doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL); doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL);
if (!hasRemainResults(&pInfo->groupResInfo)) { if (!hasRemainResults(&pInfo->groupResInfo)) {
doSetOperatorCompleted(pOperator); setOperatorCompleted(pOperator);
break; break;
} }
...@@ -438,15 +438,10 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* ...@@ -438,15 +438,10 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode*
} }
initResultRowInfo(&pInfo->binfo.resultRowInfo); initResultRowInfo(&pInfo->binfo.resultRowInfo);
setOperatorInfo(pOperator, "GroupbyAggOperator", 0, true, OP_NOT_OPENED, pInfo, pTaskInfo);
pOperator->name = "GroupbyAggOperator";
pOperator->blocking = true;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet = pOperator->fpSet =
createOperatorFpSet(operatorDummyOpenFn, hashGroupbyAggregate, NULL, NULL, destroyGroupOperatorInfo, NULL); createOperatorFpSet(operatorDummyOpenFn, hashGroupbyAggregate, NULL, destroyGroupOperatorInfo, NULL);
code = appendDownstream(pOperator, &downstream, 1); code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
...@@ -654,7 +649,7 @@ static SSDataBlock* buildPartitionResult(SOperatorInfo* pOperator) { ...@@ -654,7 +649,7 @@ static SSDataBlock* buildPartitionResult(SOperatorInfo* pOperator) {
// try next group data // try next group data
++pInfo->groupIndex; ++pInfo->groupIndex;
if (pInfo->groupIndex >= taosArrayGetSize(pInfo->sortedGroupArray)) { if (pInfo->groupIndex >= taosArrayGetSize(pInfo->sortedGroupArray)) {
doSetOperatorCompleted(pOperator); setOperatorCompleted(pOperator);
clearPartitionOperator(pInfo); clearPartitionOperator(pInfo);
return NULL; return NULL;
} }
...@@ -821,17 +816,12 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartition ...@@ -821,17 +816,12 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartition
goto _error; goto _error;
} }
pOperator->name = "PartitionOperator"; setOperatorInfo(pOperator, "PartitionOperator", QUERY_NODE_PHYSICAL_PLAN_PARTITION, false, OP_NOT_OPENED, pInfo, pTaskInfo);
pOperator->blocking = true;
pOperator->status = OP_NOT_OPENED;
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PARTITION;
pOperator->exprSupp.numOfExprs = numOfCols; pOperator->exprSupp.numOfExprs = numOfCols;
pOperator->exprSupp.pExprInfo = pExprInfo; pOperator->exprSupp.pExprInfo = pExprInfo;
pOperator->info = pInfo;
pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet = pOperator->fpSet =
createOperatorFpSet(operatorDummyOpenFn, hashPartition, NULL, NULL, destroyPartitionOperatorInfo, NULL); createOperatorFpSet(operatorDummyOpenFn, hashPartition, NULL, destroyPartitionOperatorInfo, NULL);
code = appendDownstream(pOperator, &downstream, 1); code = appendDownstream(pOperator, &downstream, 1);
return pOperator; return pOperator;
...@@ -965,7 +955,7 @@ static SSDataBlock* doStreamHashPartition(SOperatorInfo* pOperator) { ...@@ -965,7 +955,7 @@ static SSDataBlock* doStreamHashPartition(SOperatorInfo* pOperator) {
pInfo->pInputDataBlock = NULL; pInfo->pInputDataBlock = NULL;
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
if (pBlock == NULL) { if (pBlock == NULL) {
doSetOperatorCompleted(pOperator); setOperatorCompleted(pOperator);
return NULL; return NULL;
} }
printDataBlock(pBlock, "stream partitionby recv"); printDataBlock(pBlock, "stream partitionby recv");
...@@ -1106,15 +1096,10 @@ SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStr ...@@ -1106,15 +1096,10 @@ SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStr
int32_t numOfCols = 0; int32_t numOfCols = 0;
SExprInfo* pExprInfo = createExprInfo(pPartNode->part.pTargets, NULL, &numOfCols); SExprInfo* pExprInfo = createExprInfo(pPartNode->part.pTargets, NULL, &numOfCols);
pOperator->name = "StreamPartitionOperator"; setOperatorInfo(pOperator, "StreamPartitionOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION, false, OP_NOT_OPENED, pInfo, pTaskInfo);
pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED;
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION;
pOperator->exprSupp.numOfExprs = numOfCols; pOperator->exprSupp.numOfExprs = numOfCols;
pOperator->exprSupp.pExprInfo = pExprInfo; pOperator->exprSupp.pExprInfo = pExprInfo;
pOperator->info = pInfo; pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStreamHashPartition, NULL,
pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStreamHashPartition, NULL, NULL,
destroyStreamPartitionOperatorInfo, NULL); destroyStreamPartitionOperatorInfo, NULL);
initParDownStream(downstream, &pInfo->partitionSup, &pInfo->scalarSup); initParDownStream(downstream, &pInfo->partitionSup, &pInfo->scalarSup);
......
...@@ -73,14 +73,10 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t ...@@ -73,14 +73,10 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t
initResultSizeInfo(&pOperator->resultInfo, 4096); initResultSizeInfo(&pOperator->resultInfo, 4096);
pInfo->pRes = pResBlock; pInfo->pRes = pResBlock;
pOperator->name = "MergeJoinOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN; setOperatorInfo(pOperator, "MergeJoinOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN, false, OP_NOT_OPENED, pInfo, pTaskInfo);
pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED;
pOperator->exprSupp.pExprInfo = pExprInfo; pOperator->exprSupp.pExprInfo = pExprInfo;
pOperator->exprSupp.numOfExprs = numOfCols; pOperator->exprSupp.numOfExprs = numOfCols;
pOperator->info = pInfo;
pOperator->pTaskInfo = pTaskInfo;
extractTimeCondition(pInfo, pDownstream, numOfDownstream, pJoinNode); extractTimeCondition(pInfo, pDownstream, numOfDownstream, pJoinNode);
...@@ -121,8 +117,7 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t ...@@ -121,8 +117,7 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t
pInfo->inputOrder = TSDB_ORDER_DESC; pInfo->inputOrder = TSDB_ORDER_DESC;
} }
pOperator->fpSet = pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doMergeJoin, NULL, destroyMergeJoinOperator, NULL);
createOperatorFpSet(operatorDummyOpenFn, doMergeJoin, NULL, NULL, destroyMergeJoinOperator, NULL);
code = appendDownstream(pOperator, pDownstream, numOfDownstream); code = appendDownstream(pOperator, pDownstream, numOfDownstream);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
...@@ -372,13 +367,13 @@ static void doMergeJoinImpl(struct SOperatorInfo* pOperator, SSDataBlock* pRes) ...@@ -372,13 +367,13 @@ static void doMergeJoinImpl(struct SOperatorInfo* pOperator, SSDataBlock* pRes)
if (leftTs == rightTs) { if (leftTs == rightTs) {
mergeJoinJoinDownstreamTsRanges(pOperator, leftTs, pRes, &nrows); mergeJoinJoinDownstreamTsRanges(pOperator, leftTs, pRes, &nrows);
} else if (asc && leftTs < rightTs || !asc && leftTs > rightTs) { } else if ((asc && leftTs < rightTs) || (!asc && leftTs > rightTs)) {
pJoinInfo->leftPos += 1; pJoinInfo->leftPos += 1;
if (pJoinInfo->leftPos >= pJoinInfo->pLeft->info.rows) { if (pJoinInfo->leftPos >= pJoinInfo->pLeft->info.rows) {
continue; continue;
} }
} else if (asc && leftTs > rightTs || !asc && leftTs < rightTs) { } else if ((asc && leftTs > rightTs) || (!asc && leftTs < rightTs)) {
pJoinInfo->rightPos += 1; pJoinInfo->rightPos += 1;
if (pJoinInfo->rightPos >= pJoinInfo->pRight->info.rows) { if (pJoinInfo->rightPos >= pJoinInfo->pRight->info.rows) {
continue; continue;
......
...@@ -98,13 +98,9 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys ...@@ -98,13 +98,9 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys
} }
pInfo->pPseudoColInfo = setRowTsColumnOutputInfo(pOperator->exprSupp.pCtx, numOfCols); pInfo->pPseudoColInfo = setRowTsColumnOutputInfo(pOperator->exprSupp.pCtx, numOfCols);
pOperator->name = "ProjectOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PROJECT;
pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doProjectOperation, NULL, NULL, setOperatorInfo(pOperator, "ProjectOperator", QUERY_NODE_PHYSICAL_PLAN_PROJECT, false, OP_NOT_OPENED, pInfo, pTaskInfo);
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doProjectOperation, NULL,
destroyProjectOperatorInfo, NULL); destroyProjectOperatorInfo, NULL);
code = appendDownstream(pOperator, &downstream, 1); code = appendDownstream(pOperator, &downstream, 1);
...@@ -153,7 +149,7 @@ static int32_t setInfoForNewGroup(SSDataBlock* pBlock, SLimitInfo* pLimitInfo, S ...@@ -153,7 +149,7 @@ static int32_t setInfoForNewGroup(SSDataBlock* pBlock, SLimitInfo* pLimitInfo, S
if (pLimitInfo->currentGroupId != 0 && pLimitInfo->currentGroupId != pBlock->info.groupId) { if (pLimitInfo->currentGroupId != 0 && pLimitInfo->currentGroupId != pBlock->info.groupId) {
pLimitInfo->numOfOutputGroups += 1; pLimitInfo->numOfOutputGroups += 1;
if ((pLimitInfo->slimit.limit > 0) && (pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups)) { if ((pLimitInfo->slimit.limit > 0) && (pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups)) {
doSetOperatorCompleted(pOperator); setOperatorCompleted(pOperator);
return PROJECT_RETRIEVE_DONE; return PROJECT_RETRIEVE_DONE;
} }
...@@ -187,7 +183,7 @@ static int32_t doIngroupLimitOffset(SLimitInfo* pLimitInfo, uint64_t groupId, SS ...@@ -187,7 +183,7 @@ static int32_t doIngroupLimitOffset(SLimitInfo* pLimitInfo, uint64_t groupId, SS
// TODO: optimize it later when partition by + limit // TODO: optimize it later when partition by + limit
if ((pLimitInfo->slimit.limit == -1 && pLimitInfo->currentGroupId == 0) || if ((pLimitInfo->slimit.limit == -1 && pLimitInfo->currentGroupId == 0) ||
(pLimitInfo->slimit.limit > 0 && pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups)) { (pLimitInfo->slimit.limit > 0 && pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups)) {
doSetOperatorCompleted(pOperator); setOperatorCompleted(pOperator);
} }
} }
...@@ -252,7 +248,7 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { ...@@ -252,7 +248,7 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
} }
qDebug("set op close, exec %d, status %d rows %d", pTaskInfo->execModel, pOperator->status, qDebug("set op close, exec %d, status %d rows %d", pTaskInfo->execModel, pOperator->status,
pFinalRes->info.rows); pFinalRes->info.rows);
doSetOperatorCompleted(pOperator); setOperatorCompleted(pOperator);
break; break;
} }
if (pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE) { if (pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE) {
...@@ -400,14 +396,8 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy ...@@ -400,14 +396,8 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy
pInfo->binfo.pRes = pResBlock; pInfo->binfo.pRes = pResBlock;
pInfo->pPseudoColInfo = setRowTsColumnOutputInfo(pSup->pCtx, numOfExpr); pInfo->pPseudoColInfo = setRowTsColumnOutputInfo(pSup->pCtx, numOfExpr);
pOperator->name = "IndefinitOperator"; setOperatorInfo(pOperator, "IndefinitOperator", QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC, false, OP_NOT_OPENED, pInfo, pTaskInfo);
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC; pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doApplyIndefinitFunction, NULL, destroyIndefinitOperatorInfo, NULL);
pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doApplyIndefinitFunction, NULL, NULL,
destroyIndefinitOperatorInfo, NULL);
code = appendDownstream(pOperator, &downstream, 1); code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
...@@ -499,7 +489,7 @@ SSDataBlock* doApplyIndefinitFunction(SOperatorInfo* pOperator) { ...@@ -499,7 +489,7 @@ SSDataBlock* doApplyIndefinitFunction(SOperatorInfo* pOperator) {
// The downstream exec may change the value of the newgroup, so use a local variable instead. // The downstream exec may change the value of the newgroup, so use a local variable instead.
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
if (pBlock == NULL) { if (pBlock == NULL) {
doSetOperatorCompleted(pOperator); setOperatorCompleted(pOperator);
break; break;
} }
...@@ -628,7 +618,7 @@ SSDataBlock* doGenerateSourceData(SOperatorInfo* pOperator) { ...@@ -628,7 +618,7 @@ SSDataBlock* doGenerateSourceData(SOperatorInfo* pOperator) {
pOperator->resultInfo.totalRows += pRes->info.rows; pOperator->resultInfo.totalRows += pRes->info.rows;
doSetOperatorCompleted(pOperator); setOperatorCompleted(pOperator);
if (pOperator->cost.openCost == 0) { if (pOperator->cost.openCost == 0) {
pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0; pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
} }
......
...@@ -820,7 +820,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { ...@@ -820,7 +820,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
} else { // scan table group by group sequentially } else { // scan table group by group sequentially
if (pInfo->currentGroupId == -1) { if (pInfo->currentGroupId == -1) {
if ((++pInfo->currentGroupId) >= tableListGetOutputGroups(pTaskInfo->pTableInfoList)) { if ((++pInfo->currentGroupId) >= tableListGetOutputGroups(pTaskInfo->pTableInfoList)) {
doSetOperatorCompleted(pOperator); setOperatorCompleted(pOperator);
return NULL; return NULL;
} }
...@@ -843,7 +843,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { ...@@ -843,7 +843,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
} }
if ((++pInfo->currentGroupId) >= tableListGetOutputGroups(pTaskInfo->pTableInfoList)) { if ((++pInfo->currentGroupId) >= tableListGetOutputGroups(pTaskInfo->pTableInfoList)) {
doSetOperatorCompleted(pOperator); setOperatorCompleted(pOperator);
return NULL; return NULL;
} }
...@@ -865,7 +865,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { ...@@ -865,7 +865,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
return result; return result;
} }
doSetOperatorCompleted(pOperator); setOperatorCompleted(pOperator);
return NULL; return NULL;
} }
} }
...@@ -947,13 +947,8 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, ...@@ -947,13 +947,8 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode,
pInfo->currentGroupId = -1; pInfo->currentGroupId = -1;
pInfo->assignBlockUid = pTableScanNode->assignBlockUid; pInfo->assignBlockUid = pTableScanNode->assignBlockUid;
pOperator->name = "TableScanOperator"; // for debug purpose setOperatorInfo(pOperator, "TableScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, false, OP_NOT_OPENED, pInfo, pTaskInfo);
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;
pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->exprSupp.numOfExprs = numOfCols; pOperator->exprSupp.numOfExprs = numOfCols;
pOperator->pTaskInfo = pTaskInfo;
pInfo->metaCache.pTableMetaEntryCache = taosLRUCacheInit(1024*128, -1, .5); pInfo->metaCache.pTableMetaEntryCache = taosLRUCacheInit(1024*128, -1, .5);
if (pInfo->metaCache.pTableMetaEntryCache == NULL) { if (pInfo->metaCache.pTableMetaEntryCache == NULL) {
...@@ -962,7 +957,7 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, ...@@ -962,7 +957,7 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode,
} }
taosLRUCacheSetStrictCapacity(pInfo->metaCache.pTableMetaEntryCache, false); taosLRUCacheSetStrictCapacity(pInfo->metaCache.pTableMetaEntryCache, false);
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableScan, NULL, NULL, destroyTableScanOperatorInfo, pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableScan, NULL, destroyTableScanOperatorInfo,
getTableScannerExecInfo); getTableScannerExecInfo);
// for non-blocking operator, the open cost is always 0 // for non-blocking operator, the open cost is always 0
...@@ -986,14 +981,8 @@ SOperatorInfo* createTableSeqScanOperatorInfo(void* pReadHandle, SExecTaskInfo* ...@@ -986,14 +981,8 @@ SOperatorInfo* createTableSeqScanOperatorInfo(void* pReadHandle, SExecTaskInfo*
pInfo->dataReader = pReadHandle; pInfo->dataReader = pReadHandle;
// pInfo->prevGroupId = -1; // pInfo->prevGroupId = -1;
pOperator->name = "TableSeqScanOperator"; setOperatorInfo(pOperator, "TableSeqScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN, false, OP_NOT_OPENED, pInfo, pTaskInfo);
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN; pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableScanImpl, NULL, NULL, NULL);
pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableScanImpl, NULL, NULL, NULL, NULL);
return pOperator; return pOperator;
} }
...@@ -1148,15 +1137,8 @@ SOperatorInfo* createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDi ...@@ -1148,15 +1137,8 @@ SOperatorInfo* createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDi
goto _error; goto _error;
} }
pOperator->name = "DataBlockDistScanOperator"; setOperatorInfo(pOperator, "DataBlockDistScanOperator", QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN, false, OP_NOT_OPENED, pInfo, pTaskInfo);
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN; pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doBlockInfoScan, NULL, destroyBlockDistScanOperatorInfo, NULL);
pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet =
createOperatorFpSet(operatorDummyOpenFn, doBlockInfoScan, NULL, NULL, destroyBlockDistScanOperatorInfo, NULL);
return pOperator; return pOperator;
_error: _error:
...@@ -2368,11 +2350,9 @@ SOperatorInfo* createRawScanOperatorInfo(SReadHandle* pHandle, SExecTaskInfo* pT ...@@ -2368,11 +2350,9 @@ SOperatorInfo* createRawScanOperatorInfo(SReadHandle* pHandle, SExecTaskInfo* pT
pInfo->vnode = pHandle->vnode; pInfo->vnode = pHandle->vnode;
pInfo->sContext = pHandle->sContext; pInfo->sContext = pHandle->sContext;
pOperator->name = "RawScanOperator"; setOperatorInfo(pOperator, "RawScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, false, OP_NOT_OPENED, pInfo, pTaskInfo);
pOperator->info = pInfo;
pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet = createOperatorFpSet(NULL, doRawScan, NULL, NULL, destroyRawScanOperatorInfo, NULL); pOperator->fpSet = createOperatorFpSet(NULL, doRawScan, NULL, destroyRawScanOperatorInfo, NULL);
return pOperator; return pOperator;
_end: _end:
...@@ -2556,16 +2536,11 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys ...@@ -2556,16 +2536,11 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
pInfo->assignBlockUid = pTableScanNode->assignBlockUid; pInfo->assignBlockUid = pTableScanNode->assignBlockUid;
pInfo->partitionSup.needCalc = false; pInfo->partitionSup.needCalc = false;
pOperator->name = "StreamScanOperator"; setOperatorInfo(pOperator, "StreamScanOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN, false, OP_NOT_OPENED, pInfo, pTaskInfo);
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN;
pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock); pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock);
pOperator->pTaskInfo = pTaskInfo;
__optr_fn_t nextFn = pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM ? doStreamScan : doQueueScan; __optr_fn_t nextFn = pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM ? doStreamScan : doQueueScan;
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, nextFn, NULL, NULL, destroyStreamScanOperatorInfo, NULL); pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, nextFn, NULL, destroyStreamScanOperatorInfo, NULL);
return pOperator; return pOperator;
...@@ -2900,7 +2875,7 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) { ...@@ -2900,7 +2875,7 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) {
} }
blockDataDestroy(dataBlock); blockDataDestroy(dataBlock);
pInfo->loadInfo.totalRows += pInfo->pRes->info.rows; pInfo->loadInfo.totalRows += pInfo->pRes->info.rows;
doSetOperatorCompleted(pOperator); setOperatorCompleted(pOperator);
return (pInfo->pRes->info.rows == 0) ? NULL : pInfo->pRes; return (pInfo->pRes->info.rows == 0) ? NULL : pInfo->pRes;
} }
...@@ -2953,7 +2928,7 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) { ...@@ -2953,7 +2928,7 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) {
if (ret != 0) { if (ret != 0) {
metaCloseTbCursor(pInfo->pCur); metaCloseTbCursor(pInfo->pCur);
pInfo->pCur = NULL; pInfo->pCur = NULL;
doSetOperatorCompleted(pOperator); setOperatorCompleted(pOperator);
} }
pInfo->loadInfo.totalRows += pInfo->pRes->info.rows; pInfo->loadInfo.totalRows += pInfo->pRes->info.rows;
...@@ -3743,7 +3718,7 @@ static SSDataBlock* sysTableBuildUserTablesByUids(SOperatorInfo* pOperator) { ...@@ -3743,7 +3718,7 @@ static SSDataBlock* sysTableBuildUserTablesByUids(SOperatorInfo* pOperator) {
} }
if (i >= taosArrayGetSize(pIdx->uids)) { if (i >= taosArrayGetSize(pIdx->uids)) {
doSetOperatorCompleted(pOperator); setOperatorCompleted(pOperator);
} else { } else {
pIdx->lastIdx = i + 1; pIdx->lastIdx = i + 1;
} }
...@@ -3925,7 +3900,7 @@ static SSDataBlock* sysTableBuildUserTables(SOperatorInfo* pOperator) { ...@@ -3925,7 +3900,7 @@ static SSDataBlock* sysTableBuildUserTables(SOperatorInfo* pOperator) {
if (ret != 0) { if (ret != 0) {
metaCloseTbCursor(pInfo->pCur); metaCloseTbCursor(pInfo->pCur);
pInfo->pCur = NULL; pInfo->pCur = NULL;
doSetOperatorCompleted(pOperator); setOperatorCompleted(pOperator);
} }
pInfo->loadInfo.totalRows += pInfo->pRes->info.rows; pInfo->loadInfo.totalRows += pInfo->pRes->info.rows;
...@@ -3947,7 +3922,7 @@ static SSDataBlock* sysTableScanUserTables(SOperatorInfo* pOperator) { ...@@ -3947,7 +3922,7 @@ static SSDataBlock* sysTableScanUserTables(SOperatorInfo* pOperator) {
doFilterResult(pInfo->pRes, pOperator->exprSupp.pFilterInfo); doFilterResult(pInfo->pRes, pOperator->exprSupp.pFilterInfo);
pInfo->loadInfo.totalRows += pInfo->pRes->info.rows; pInfo->loadInfo.totalRows += pInfo->pRes->info.rows;
doSetOperatorCompleted(pOperator); setOperatorCompleted(pOperator);
return (pInfo->pRes->info.rows == 0) ? NULL : pInfo->pRes; return (pInfo->pRes->info.rows == 0) ? NULL : pInfo->pRes;
} else { } else {
if (pInfo->showRewrite == false) { if (pInfo->showRewrite == false) {
...@@ -4199,15 +4174,9 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScan ...@@ -4199,15 +4174,9 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScan
pInfo->readHandle = *(SReadHandle*)readHandle; pInfo->readHandle = *(SReadHandle*)readHandle;
} }
pOperator->name = "SysTableScanOperator"; setOperatorInfo(pOperator, "SysTableScanOperator", QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN, false, OP_NOT_OPENED, pInfo, pTaskInfo);
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN;
pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock); pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock);
pOperator->pTaskInfo = pTaskInfo; pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doSysTableScan, NULL, destroySysScanOperator, NULL);
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doSysTableScan, NULL, NULL, destroySysScanOperator, NULL);
return pOperator; return pOperator;
_error: _error:
...@@ -4283,7 +4252,7 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) { ...@@ -4283,7 +4252,7 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) {
count += 1; count += 1;
if (++pInfo->curPos >= size) { if (++pInfo->curPos >= size) {
doSetOperatorCompleted(pOperator); setOperatorCompleted(pOperator);
} }
} }
...@@ -4335,18 +4304,11 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi ...@@ -4335,18 +4304,11 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi
pInfo->readHandle = *pReadHandle; pInfo->readHandle = *pReadHandle;
pInfo->curPos = 0; pInfo->curPos = 0;
pOperator->name = "TagScanOperator"; setOperatorInfo(pOperator, "TagScanOperator", QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN, false, OP_NOT_OPENED, pInfo, pTaskInfo);
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN;
pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->pTaskInfo = pTaskInfo;
initResultSizeInfo(&pOperator->resultInfo, 4096); initResultSizeInfo(&pOperator->resultInfo, 4096);
blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity); blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTagScan, NULL, NULL, destroyTagScanOperatorInfo, NULL); pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTagScan, NULL, destroyTagScanOperatorInfo, NULL);
return pOperator; return pOperator;
...@@ -4713,7 +4675,7 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) { ...@@ -4713,7 +4675,7 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) {
pInfo->hasGroupId = true; pInfo->hasGroupId = true;
if (tableListSize == 0) { if (tableListSize == 0) {
doSetOperatorCompleted(pOperator); setOperatorCompleted(pOperator);
return NULL; return NULL;
} }
pInfo->tableStartIndex = 0; pInfo->tableStartIndex = 0;
...@@ -4732,7 +4694,7 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) { ...@@ -4732,7 +4694,7 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) {
} else { } else {
stopGroupTableMergeScan(pOperator); stopGroupTableMergeScan(pOperator);
if (pInfo->tableEndIndex >= tableListSize - 1) { if (pInfo->tableEndIndex >= tableListSize - 1) {
doSetOperatorCompleted(pOperator); setOperatorCompleted(pOperator);
break; break;
} }
pInfo->tableStartIndex = pInfo->tableEndIndex + 1; pInfo->tableStartIndex = pInfo->tableEndIndex + 1;
...@@ -4853,15 +4815,10 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN ...@@ -4853,15 +4815,10 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
int32_t rowSize = pInfo->pResBlock->info.rowSize; int32_t rowSize = pInfo->pResBlock->info.rowSize;
pInfo->bufPageSize = getProperSortPageSize(rowSize); pInfo->bufPageSize = getProperSortPageSize(rowSize);
pOperator->name = "TableMergeScanOperator"; setOperatorInfo(pOperator, "TableMergeScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN, false, OP_NOT_OPENED, pInfo, pTaskInfo);
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN;
pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->exprSupp.numOfExprs = numOfCols; pOperator->exprSupp.numOfExprs = numOfCols;
pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableMergeScan, NULL, NULL, pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableMergeScan, NULL,
destroyTableMergeScanOperatorInfo, getTableMergeScanExplainExecInfo); destroyTableMergeScanOperatorInfo, getTableMergeScanExplainExecInfo);
pOperator->cost.openCost = 0; pOperator->cost.openCost = 0;
return pOperator; return pOperator;
......
...@@ -53,11 +53,7 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* ...@@ -53,11 +53,7 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode*
pInfo->pSortInfo = createSortInfo(pSortNode->pSortKeys); pInfo->pSortInfo = createSortInfo(pSortNode->pSortKeys);
initLimitInfo(pSortNode->node.pLimit, pSortNode->node.pSlimit, &pInfo->limitInfo); initLimitInfo(pSortNode->node.pLimit, pSortNode->node.pSlimit, &pInfo->limitInfo);
pOperator->name = "SortOperator"; setOperatorInfo(pOperator, "SortOperator", QUERY_NODE_PHYSICAL_PLAN_SORT, true, OP_NOT_OPENED, pInfo, pTaskInfo);
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SORT;
pOperator->blocking = true;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->exprSupp.pExprInfo = pExprInfo; pOperator->exprSupp.pExprInfo = pExprInfo;
pOperator->exprSupp.numOfExprs = numOfCols; pOperator->exprSupp.numOfExprs = numOfCols;
...@@ -67,7 +63,7 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* ...@@ -67,7 +63,7 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode*
// TODO dynamic set the available sort buffer // TODO dynamic set the available sort buffer
pOperator->fpSet = pOperator->fpSet =
createOperatorFpSet(doOpenSortOperator, doSort, NULL, NULL, destroySortOperatorInfo, getExplainExecInfo); createOperatorFpSet(doOpenSortOperator, doSort, NULL, destroySortOperatorInfo, getExplainExecInfo);
code = appendDownstream(pOperator, &downstream, 1); code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
...@@ -214,7 +210,7 @@ SSDataBlock* doSort(SOperatorInfo* pOperator) { ...@@ -214,7 +210,7 @@ SSDataBlock* doSort(SOperatorInfo* pOperator) {
pBlock = getSortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity, pBlock = getSortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity,
pInfo->matchInfo.pList, pInfo); pInfo->matchInfo.pList, pInfo);
if (pBlock == NULL) { if (pBlock == NULL) {
doSetOperatorCompleted(pOperator); setOperatorCompleted(pOperator);
return NULL; return NULL;
} }
...@@ -428,7 +424,7 @@ SSDataBlock* doGroupSort(SOperatorInfo* pOperator) { ...@@ -428,7 +424,7 @@ SSDataBlock* doGroupSort(SOperatorInfo* pOperator) {
pInfo->prefetchedSortInput = pOperator->pDownstream[0]->fpSet.getNextFn(pOperator->pDownstream[0]); pInfo->prefetchedSortInput = pOperator->pDownstream[0]->fpSet.getNextFn(pOperator->pDownstream[0]);
if (pInfo->prefetchedSortInput == NULL) { if (pInfo->prefetchedSortInput == NULL) {
doSetOperatorCompleted(pOperator); setOperatorCompleted(pOperator);
return NULL; return NULL;
} }
pInfo->currGroupId = pInfo->prefetchedSortInput->info.groupId; pInfo->currGroupId = pInfo->prefetchedSortInput->info.groupId;
...@@ -453,7 +449,7 @@ SSDataBlock* doGroupSort(SOperatorInfo* pOperator) { ...@@ -453,7 +449,7 @@ SSDataBlock* doGroupSort(SOperatorInfo* pOperator) {
beginSortGroup(pOperator); beginSortGroup(pOperator);
} else if (pInfo->childOpStatus == CHILD_OP_FINISHED) { } else if (pInfo->childOpStatus == CHILD_OP_FINISHED) {
finishSortGroup(pOperator); finishSortGroup(pOperator);
doSetOperatorCompleted(pOperator); setOperatorCompleted(pOperator);
return NULL; return NULL;
} }
} }
...@@ -509,15 +505,8 @@ SOperatorInfo* createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSort ...@@ -509,15 +505,8 @@ SOperatorInfo* createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSort
} }
pInfo->pSortInfo = createSortInfo(pSortPhyNode->pSortKeys); pInfo->pSortInfo = createSortInfo(pSortPhyNode->pSortKeys);
setOperatorInfo(pOperator, "GroupSortOperator", QUERY_NODE_PHYSICAL_PLAN_GROUP_SORT, false, OP_NOT_OPENED, pInfo, pTaskInfo);
pOperator->name = "GroupSortOperator"; pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doGroupSort, NULL, destroyGroupSortOperatorInfo,
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_GROUP_SORT;
pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doGroupSort, NULL, NULL, destroyGroupSortOperatorInfo,
getGroupSortExplainExecInfo); getGroupSortExplainExecInfo);
code = appendDownstream(pOperator, &downstream, 1); code = appendDownstream(pOperator, &downstream, 1);
...@@ -705,7 +694,7 @@ SSDataBlock* doMultiwayMerge(SOperatorInfo* pOperator) { ...@@ -705,7 +694,7 @@ SSDataBlock* doMultiwayMerge(SOperatorInfo* pOperator) {
if (pBlock != NULL) { if (pBlock != NULL) {
pOperator->resultInfo.totalRows += pBlock->info.rows; pOperator->resultInfo.totalRows += pBlock->info.rows;
} else { } else {
doSetOperatorCompleted(pOperator); setOperatorCompleted(pOperator);
} }
return pBlock; return pBlock;
...@@ -774,14 +763,8 @@ SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size ...@@ -774,14 +763,8 @@ SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size
pInfo->bufPageSize = getProperSortPageSize(rowSize); pInfo->bufPageSize = getProperSortPageSize(rowSize);
pInfo->sortBufSize = pInfo->bufPageSize * (numStreams + 1); // one additional is reserved for merged result. pInfo->sortBufSize = pInfo->bufPageSize * (numStreams + 1); // one additional is reserved for merged result.
pOperator->name = "MultiwayMerge"; setOperatorInfo(pOperator, "MultiwayMergeOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE, false, OP_NOT_OPENED, pInfo, pTaskInfo);
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_MERGE; pOperator->fpSet = createOperatorFpSet(doOpenMultiwayMergeOperator, doMultiwayMerge, NULL,
pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet = createOperatorFpSet(doOpenMultiwayMergeOperator, doMultiwayMerge, NULL, NULL,
destroyMultiwayMergeOperatorInfo, getMultiwayMergeExplainExecInfo); destroyMultiwayMergeOperatorInfo, getMultiwayMergeExplainExecInfo);
code = appendDownstream(pOperator, downStreams, numStreams); code = appendDownstream(pOperator, downStreams, numStreams);
......
...@@ -1443,7 +1443,7 @@ static SSDataBlock* doStreamFill(SOperatorInfo* pOperator) { ...@@ -1443,7 +1443,7 @@ static SSDataBlock* doStreamFill(SOperatorInfo* pOperator) {
printDataBlock(pInfo->pRes, "stream fill"); printDataBlock(pInfo->pRes, "stream fill");
return pInfo->pRes; return pInfo->pRes;
} }
doSetOperatorCompleted(pOperator); setOperatorCompleted(pOperator);
resetStreamFillInfo(pInfo); resetStreamFillInfo(pInfo);
return NULL; return NULL;
} }
...@@ -1512,7 +1512,7 @@ static SSDataBlock* doStreamFill(SOperatorInfo* pOperator) { ...@@ -1512,7 +1512,7 @@ static SSDataBlock* doStreamFill(SOperatorInfo* pOperator) {
} }
if (pInfo->pRes->info.rows == 0) { if (pInfo->pRes->info.rows == 0) {
doSetOperatorCompleted(pOperator); setOperatorCompleted(pOperator);
resetStreamFillInfo(pInfo); resetStreamFillInfo(pInfo);
return NULL; return NULL;
} }
...@@ -1690,15 +1690,9 @@ SOperatorInfo* createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFi ...@@ -1690,15 +1690,9 @@ SOperatorInfo* createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFi
} }
pInfo->srcRowIndex = 0; pInfo->srcRowIndex = 0;
setOperatorInfo(pOperator, "StreamFillOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_FILL, false, OP_NOT_OPENED, pInfo, pTaskInfo);
pOperator->name = "StreamFillOperator";
pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED;
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_FILL;
pOperator->info = pInfo;
pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet = pOperator->fpSet =
createOperatorFpSet(operatorDummyOpenFn, doStreamFill, NULL, NULL, destroyStreamFillOperatorInfo, NULL); createOperatorFpSet(operatorDummyOpenFn, doStreamFill, NULL, destroyStreamFillOperatorInfo, NULL);
code = appendDownstream(pOperator, &downstream, 1); code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
......
...@@ -1221,7 +1221,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) { ...@@ -1221,7 +1221,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) {
pTaskInfo->code = pOperator->fpSet._openFn(pOperator); pTaskInfo->code = pOperator->fpSet._openFn(pOperator);
if (pTaskInfo->code != TSDB_CODE_SUCCESS) { if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
doSetOperatorCompleted(pOperator); setOperatorCompleted(pOperator);
return NULL; return NULL;
} }
...@@ -1232,7 +1232,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) { ...@@ -1232,7 +1232,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) {
bool hasRemain = hasRemainResults(&pInfo->groupResInfo); bool hasRemain = hasRemainResults(&pInfo->groupResInfo);
if (!hasRemain) { if (!hasRemain) {
doSetOperatorCompleted(pOperator); setOperatorCompleted(pOperator);
break; break;
} }
...@@ -1269,7 +1269,7 @@ static SSDataBlock* doBuildIntervalResult(SOperatorInfo* pOperator) { ...@@ -1269,7 +1269,7 @@ static SSDataBlock* doBuildIntervalResult(SOperatorInfo* pOperator) {
bool hasRemain = hasRemainResults(&pInfo->groupResInfo); bool hasRemain = hasRemainResults(&pInfo->groupResInfo);
if (!hasRemain) { if (!hasRemain) {
doSetOperatorCompleted(pOperator); setOperatorCompleted(pOperator);
break; break;
} }
...@@ -1739,7 +1739,6 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPh ...@@ -1739,7 +1739,6 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPh
ASSERT(as.calTrigger != STREAM_TRIGGER_MAX_DELAY); ASSERT(as.calTrigger != STREAM_TRIGGER_MAX_DELAY);
pOperator->pTaskInfo = pTaskInfo;
pInfo->win = pTaskInfo->window; pInfo->win = pTaskInfo->window;
pInfo->inputOrder = (pPhyNode->window.inputTsOrder == ORDER_ASC) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC; pInfo->inputOrder = (pPhyNode->window.inputTsOrder == ORDER_ASC) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC;
pInfo->resultTsOrder = (pPhyNode->window.outputTsOrder == ORDER_ASC) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC; pInfo->resultTsOrder = (pPhyNode->window.outputTsOrder == ORDER_ASC) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC;
...@@ -1777,15 +1776,10 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPh ...@@ -1777,15 +1776,10 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPh
} }
initResultRowInfo(&pInfo->binfo.resultRowInfo); initResultRowInfo(&pInfo->binfo.resultRowInfo);
setOperatorInfo(pOperator, "TimeIntervalAggOperator", QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL, true, OP_NOT_OPENED, pInfo, pTaskInfo);
pOperator->name = "TimeIntervalAggOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL;
pOperator->blocking = true;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->fpSet = pOperator->fpSet =
createOperatorFpSet(doOpenIntervalAgg, doBuildIntervalResult, NULL, NULL, destroyIntervalOperatorInfo, NULL); createOperatorFpSet(doOpenIntervalAgg, doBuildIntervalResult, NULL, destroyIntervalOperatorInfo, NULL);
code = appendDownstream(pOperator, &downstream, 1); code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
...@@ -1890,7 +1884,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) { ...@@ -1890,7 +1884,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) {
bool hasRemain = hasRemainResults(&pInfo->groupResInfo); bool hasRemain = hasRemainResults(&pInfo->groupResInfo);
if (!hasRemain) { if (!hasRemain) {
doSetOperatorCompleted(pOperator); setOperatorCompleted(pOperator);
break; break;
} }
...@@ -1933,7 +1927,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) { ...@@ -1933,7 +1927,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) {
bool hasRemain = hasRemainResults(&pInfo->groupResInfo); bool hasRemain = hasRemainResults(&pInfo->groupResInfo);
if (!hasRemain) { if (!hasRemain) {
doSetOperatorCompleted(pOperator); setOperatorCompleted(pOperator);
break; break;
} }
...@@ -2281,7 +2275,7 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { ...@@ -2281,7 +2275,7 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
} }
if (pSliceInfo->current > pSliceInfo->win.ekey) { if (pSliceInfo->current > pSliceInfo->win.ekey) {
doSetOperatorCompleted(pOperator); setOperatorCompleted(pOperator);
break; break;
} }
...@@ -2330,7 +2324,7 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { ...@@ -2330,7 +2324,7 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
} }
if (pSliceInfo->current > pSliceInfo->win.ekey) { if (pSliceInfo->current > pSliceInfo->win.ekey) {
doSetOperatorCompleted(pOperator); setOperatorCompleted(pOperator);
break; break;
} }
} }
...@@ -2342,7 +2336,7 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { ...@@ -2342,7 +2336,7 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
pSliceInfo->current = pSliceInfo->current =
taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision); taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision);
if (pSliceInfo->current > pSliceInfo->win.ekey) { if (pSliceInfo->current > pSliceInfo->win.ekey) {
doSetOperatorCompleted(pOperator); setOperatorCompleted(pOperator);
break; break;
} }
} }
...@@ -2365,7 +2359,7 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { ...@@ -2365,7 +2359,7 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
} }
if (pSliceInfo->current > pSliceInfo->win.ekey) { if (pSliceInfo->current > pSliceInfo->win.ekey) {
doSetOperatorCompleted(pOperator); setOperatorCompleted(pOperator);
break; break;
} }
} }
...@@ -2386,7 +2380,7 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { ...@@ -2386,7 +2380,7 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
} }
if (pSliceInfo->current > pSliceInfo->win.ekey) { if (pSliceInfo->current > pSliceInfo->win.ekey) {
doSetOperatorCompleted(pOperator); setOperatorCompleted(pOperator);
break; break;
} }
} else { } else {
...@@ -2448,7 +2442,7 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { ...@@ -2448,7 +2442,7 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
} }
if (pSliceInfo->current > pSliceInfo->win.ekey) { if (pSliceInfo->current > pSliceInfo->win.ekey) {
doSetOperatorCompleted(pOperator); setOperatorCompleted(pOperator);
break; break;
} }
} }
...@@ -2463,7 +2457,7 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { ...@@ -2463,7 +2457,7 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
} }
if (pSliceInfo->current > pSliceInfo->win.ekey) { if (pSliceInfo->current > pSliceInfo->win.ekey) {
doSetOperatorCompleted(pOperator); setOperatorCompleted(pOperator);
break; break;
} }
} }
...@@ -2557,15 +2551,9 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode ...@@ -2557,15 +2551,9 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode
pScanInfo->cond.twindows = pInfo->win; pScanInfo->cond.twindows = pInfo->win;
pScanInfo->cond.type = TIMEWINDOW_RANGE_EXTERNAL; pScanInfo->cond.type = TIMEWINDOW_RANGE_EXTERNAL;
pOperator->name = "TimeSliceOperator"; setOperatorInfo(pOperator, "TimeSliceOperator", QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC, false, OP_NOT_OPENED, pInfo, pTaskInfo);
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC;
pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet = pOperator->fpSet =
createOperatorFpSet(operatorDummyOpenFn, doTimeslice, NULL, NULL, destroyTimeSliceOperatorInfo, NULL); createOperatorFpSet(operatorDummyOpenFn, doTimeslice, NULL, destroyTimeSliceOperatorInfo, NULL);
blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity); blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
...@@ -2633,15 +2621,10 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWi ...@@ -2633,15 +2621,10 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWi
initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window); initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
pInfo->tsSlotId = tsSlotId; pInfo->tsSlotId = tsSlotId;
pOperator->name = "StateWindowOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE;
pOperator->blocking = true;
pOperator->status = OP_NOT_OPENED;
pOperator->pTaskInfo = pTaskInfo;
pOperator->info = pInfo;
setOperatorInfo(pOperator, "StateWindowOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE, true, OP_NOT_OPENED, pInfo, pTaskInfo);
pOperator->fpSet = pOperator->fpSet =
createOperatorFpSet(openStateWindowAggOptr, doStateWindowAgg, NULL, NULL, destroyStateWindowOperatorInfo, NULL); createOperatorFpSet(openStateWindowAggOptr, doStateWindowAgg, NULL, destroyStateWindowOperatorInfo, NULL);
code = appendDownstream(pOperator, &downstream, 1); code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
...@@ -2711,14 +2694,9 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionW ...@@ -2711,14 +2694,9 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionW
goto _error; goto _error;
} }
pOperator->name = "SessionWindowAggOperator"; setOperatorInfo(pOperator, "SessionWindowAggOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION, true, OP_NOT_OPENED, pInfo, pTaskInfo);
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION;
pOperator->blocking = true;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->fpSet = pOperator->fpSet =
createOperatorFpSet(operatorDummyOpenFn, doSessionWindowAgg, NULL, NULL, destroySWindowOperatorInfo, NULL); createOperatorFpSet(operatorDummyOpenFn, doSessionWindowAgg, NULL, destroySWindowOperatorInfo, NULL);
pOperator->pTaskInfo = pTaskInfo; pOperator->pTaskInfo = pTaskInfo;
code = appendDownstream(pOperator, &downstream, 1); code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
...@@ -3134,7 +3112,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { ...@@ -3134,7 +3112,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
return pInfo->binfo.pRes; return pInfo->binfo.pRes;
} }
doSetOperatorCompleted(pOperator); setOperatorCompleted(pOperator);
if (!IS_FINAL_OP(pInfo)) { if (!IS_FINAL_OP(pInfo)) {
clearFunctionContext(&pOperator->exprSupp); clearFunctionContext(&pOperator->exprSupp);
// semi interval operator clear disk buffer // semi interval operator clear disk buffer
...@@ -3403,7 +3381,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, ...@@ -3403,7 +3381,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->fpSet = pOperator->fpSet =
createOperatorFpSet(NULL, doStreamFinalIntervalAgg, NULL, NULL, destroyStreamFinalIntervalOperatorInfo, NULL); createOperatorFpSet(NULL, doStreamFinalIntervalAgg, NULL, destroyStreamFinalIntervalOperatorInfo, NULL);
if (pPhyNode->type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL) { if (pPhyNode->type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL) {
initIntervalDownStream(downstream, pPhyNode->type, &pInfo->aggSup, &pInfo->interval, &pInfo->twAggSup); initIntervalDownStream(downstream, pPhyNode->type, &pInfo->aggSup, &pInfo->interval, &pInfo->twAggSup);
} }
...@@ -4027,7 +4005,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { ...@@ -4027,7 +4005,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
return pBInfo->pRes; return pBInfo->pRes;
} }
doSetOperatorCompleted(pOperator); setOperatorCompleted(pOperator);
return NULL; return NULL;
} }
...@@ -4133,7 +4111,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { ...@@ -4133,7 +4111,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
return pBInfo->pRes; return pBInfo->pRes;
} }
doSetOperatorCompleted(pOperator); setOperatorCompleted(pOperator);
return NULL; return NULL;
} }
...@@ -4200,13 +4178,11 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh ...@@ -4200,13 +4178,11 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh
pInfo->pGroupIdTbNameMap = pInfo->pGroupIdTbNameMap =
taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_NO_LOCK); taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_NO_LOCK);
pOperator->name = "StreamSessionWindowAggOperator"; setOperatorInfo(pOperator, "StreamSessionWindowAggOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION, true,
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION; OP_NOT_OPENED, pInfo, pTaskInfo);
pOperator->blocking = true; pOperator->fpSet =
pOperator->status = OP_NOT_OPENED; createOperatorFpSet(operatorDummyOpenFn, doStreamSessionAgg, NULL, destroyStreamSessionAggOperatorInfo, NULL);
pOperator->info = pInfo;
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStreamSessionAgg, NULL, NULL,
destroyStreamSessionAggOperatorInfo, NULL);
if (downstream) { if (downstream) {
initDownStream(downstream, &pInfo->streamAggSup, pInfo->twAggSup.waterMark, pOperator->operatorType, initDownStream(downstream, &pInfo->streamAggSup, pInfo->twAggSup.waterMark, pOperator->operatorType,
pInfo->primaryTsIndex); pInfo->primaryTsIndex);
...@@ -4257,7 +4233,7 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) { ...@@ -4257,7 +4233,7 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
clearFunctionContext(&pOperator->exprSupp); clearFunctionContext(&pOperator->exprSupp);
// semi interval operator clear disk buffer // semi interval operator clear disk buffer
clearStreamSessionOperator(pInfo); clearStreamSessionOperator(pInfo);
doSetOperatorCompleted(pOperator); setOperatorCompleted(pOperator);
return NULL; return NULL;
} }
} }
...@@ -4336,7 +4312,7 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) { ...@@ -4336,7 +4312,7 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
clearFunctionContext(&pOperator->exprSupp); clearFunctionContext(&pOperator->exprSupp);
// semi interval operator clear disk buffer // semi interval operator clear disk buffer
clearStreamSessionOperator(pInfo); clearStreamSessionOperator(pInfo);
doSetOperatorCompleted(pOperator); setOperatorCompleted(pOperator);
return NULL; return NULL;
} }
...@@ -4347,20 +4323,21 @@ SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream ...@@ -4347,20 +4323,21 @@ SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream
if (pOperator == NULL) { if (pOperator == NULL) {
goto _error; goto _error;
} }
SStreamSessionAggOperatorInfo* pInfo = pOperator->info; SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
if (pPhyNode->type == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) { pInfo->isFinal = (pPhyNode->type == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION);
pInfo->isFinal = true; char* name = (pInfo->isFinal)? "StreamSessionFinalAggOperator":"StreamSessionSemiAggOperator";
pOperator->name = "StreamSessionFinalAggOperator";
} else { if (pPhyNode->type != QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) {
pInfo->isFinal = false;
pInfo->pUpdateRes = createSpecialDataBlock(STREAM_CLEAR); pInfo->pUpdateRes = createSpecialDataBlock(STREAM_CLEAR);
blockDataEnsureCapacity(pInfo->pUpdateRes, 128); blockDataEnsureCapacity(pInfo->pUpdateRes, 128);
pOperator->name = "StreamSessionSemiAggOperator"; pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStreamSessionSemiAgg, NULL,
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStreamSessionSemiAgg, NULL, NULL,
destroyStreamSessionAggOperatorInfo, NULL); destroyStreamSessionAggOperatorInfo, NULL);
} }
setOperatorInfo(pOperator, name, pPhyNode->type , false, OP_NOT_OPENED, pInfo, pTaskInfo);
pInfo->pGroupIdTbNameMap = pInfo->pGroupIdTbNameMap =
taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_NO_LOCK); taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_NO_LOCK);
...@@ -4590,7 +4567,7 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) { ...@@ -4590,7 +4567,7 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
return pBInfo->pRes; return pBInfo->pRes;
} }
doSetOperatorCompleted(pOperator); setOperatorCompleted(pOperator);
return NULL; return NULL;
} }
...@@ -4656,7 +4633,7 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) { ...@@ -4656,7 +4633,7 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
printDataBlock(pBInfo->pRes, "single state"); printDataBlock(pBInfo->pRes, "single state");
return pBInfo->pRes; return pBInfo->pRes;
} }
doSetOperatorCompleted(pOperator); setOperatorCompleted(pOperator);
return NULL; return NULL;
} }
...@@ -4721,14 +4698,9 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys ...@@ -4721,14 +4698,9 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys
pInfo->pGroupIdTbNameMap = pInfo->pGroupIdTbNameMap =
taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_NO_LOCK); taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_NO_LOCK);
pOperator->name = "StreamStateAggOperator"; setOperatorInfo(pOperator, "StreamStateAggOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE, true, OP_NOT_OPENED, pInfo, pTaskInfo);
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE;
pOperator->blocking = true;
pOperator->status = OP_NOT_OPENED;
pOperator->pTaskInfo = pTaskInfo;
pOperator->info = pInfo;
pOperator->fpSet = pOperator->fpSet =
createOperatorFpSet(operatorDummyOpenFn, doStreamStateAgg, NULL, NULL, destroyStreamStateOperatorInfo, NULL); createOperatorFpSet(operatorDummyOpenFn, doStreamStateAgg, NULL, destroyStreamStateOperatorInfo, NULL);
initDownStream(downstream, &pInfo->streamAggSup, pInfo->twAggSup.waterMark, pOperator->operatorType, initDownStream(downstream, &pInfo->streamAggSup, pInfo->twAggSup.waterMark, pOperator->operatorType,
pInfo->primaryTsIndex); pInfo->primaryTsIndex);
code = appendDownstream(pOperator, &downstream, 1); code = appendDownstream(pOperator, &downstream, 1);
...@@ -4876,7 +4848,7 @@ static void doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) { ...@@ -4876,7 +4848,7 @@ static void doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) {
cleanupAfterGroupResultGen(pMiaInfo, pRes); cleanupAfterGroupResultGen(pMiaInfo, pRes);
} }
doSetOperatorCompleted(pOperator); setOperatorCompleted(pOperator);
break; break;
} }
...@@ -5001,16 +4973,10 @@ SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, ...@@ -5001,16 +4973,10 @@ SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream,
initResultRowInfo(&iaInfo->binfo.resultRowInfo); initResultRowInfo(&iaInfo->binfo.resultRowInfo);
blockDataEnsureCapacity(iaInfo->binfo.pRes, pOperator->resultInfo.capacity); blockDataEnsureCapacity(iaInfo->binfo.pRes, pOperator->resultInfo.capacity);
setOperatorInfo(pOperator, "TimeMergeAlignedIntervalAggOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL, false, OP_NOT_OPENED, miaInfo, pTaskInfo);
pOperator->name = "TimeMergeAlignedIntervalAggOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL;
pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED;
pOperator->pTaskInfo = pTaskInfo;
pOperator->info = miaInfo;
pOperator->fpSet = pOperator->fpSet =
createOperatorFpSet(operatorDummyOpenFn, mergeAlignedIntervalAgg, NULL, NULL, destroyMAIOperatorInfo, NULL); createOperatorFpSet(operatorDummyOpenFn, mergeAlignedIntervalAgg, NULL, destroyMAIOperatorInfo, NULL);
code = appendDownstream(pOperator, &downstream, 1); code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
...@@ -5254,7 +5220,7 @@ static SSDataBlock* doMergeIntervalAgg(SOperatorInfo* pOperator) { ...@@ -5254,7 +5220,7 @@ static SSDataBlock* doMergeIntervalAgg(SOperatorInfo* pOperator) {
} }
if (pRes->info.rows == 0) { if (pRes->info.rows == 0) {
doSetOperatorCompleted(pOperator); setOperatorCompleted(pOperator);
} }
size_t rows = pRes->info.rows; size_t rows = pRes->info.rows;
...@@ -5313,16 +5279,9 @@ SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMerge ...@@ -5313,16 +5279,9 @@ SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMerge
} }
initResultRowInfo(&pIntervalInfo->binfo.resultRowInfo); initResultRowInfo(&pIntervalInfo->binfo.resultRowInfo);
setOperatorInfo(pOperator, "TimeMergeIntervalAggOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL, false, OP_NOT_OPENED, pMergeIntervalInfo, pTaskInfo);
pOperator->name = "TimeMergeIntervalAggOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL;
pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED;
pOperator->pTaskInfo = pTaskInfo;
pOperator->info = pMergeIntervalInfo;
pOperator->fpSet = pOperator->fpSet =
createOperatorFpSet(operatorDummyOpenFn, doMergeIntervalAgg, NULL, NULL, destroyMergeIntervalOperatorInfo, NULL); createOperatorFpSet(operatorDummyOpenFn, doMergeIntervalAgg, NULL, destroyMergeIntervalOperatorInfo, NULL);
code = appendDownstream(pOperator, &downstream, 1); code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
...@@ -5366,7 +5325,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { ...@@ -5366,7 +5325,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
} }
deleteIntervalDiscBuf(pInfo->pState, NULL, pInfo->twAggSup.maxTs - pInfo->twAggSup.deleteMark, &pInfo->interval, deleteIntervalDiscBuf(pInfo->pState, NULL, pInfo->twAggSup.maxTs - pInfo->twAggSup.deleteMark, &pInfo->interval,
&pInfo->delKey); &pInfo->delKey);
doSetOperatorCompleted(pOperator); setOperatorCompleted(pOperator);
streamStateCommit(pTaskInfo->streamInfo.pState); streamStateCommit(pTaskInfo->streamInfo.pState);
return NULL; return NULL;
} }
...@@ -5550,12 +5509,8 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys ...@@ -5550,12 +5509,8 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
pInfo->pGroupIdTbNameMap = pInfo->pGroupIdTbNameMap =
taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_NO_LOCK); taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_NO_LOCK);
pOperator->name = "StreamIntervalOperator"; setOperatorInfo(pOperator, "StreamIntervalOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL, true, OP_NOT_OPENED, pInfo, pTaskInfo);
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL; pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStreamIntervalAgg, NULL,
pOperator->blocking = true;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStreamIntervalAgg, NULL, NULL,
destroyStreamFinalIntervalOperatorInfo, NULL); destroyStreamFinalIntervalOperatorInfo, NULL);
initIntervalDownStream(downstream, pPhyNode->type, &pInfo->aggSup, &pInfo->interval, &pInfo->twAggSup); initIntervalDownStream(downstream, pPhyNode->type, &pInfo->aggSup, &pInfo->interval, &pInfo->twAggSup);
......
...@@ -3096,27 +3096,86 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) { ...@@ -3096,27 +3096,86 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) {
} }
#else #else
int64_t* pts = (int64_t*)pInput->pPTS->pData; int64_t* pts = (int64_t*)pInput->pPTS->pData;
for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; ++i) {
if (pInputCol->hasNull && colDataIsNull(pInputCol, pInput->totalRows, i, pColAgg)) { #if 0
continue; for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; ++i) {
if (pInputCol->hasNull && colDataIsNull(pInputCol, pInput->totalRows, i, pColAgg)) {
continue;
}
numOfElems++;
if (pResInfo->numOfRes == 0 || pInfo->ts < pts[i]) {
char* data = colDataGetData(pInputCol, i);
doSaveCurrentVal(pCtx, i, pts[i], type, data);
pResInfo->numOfRes = 1;
}
} }
#else
if (!pInputCol->hasNull) {
numOfElems = 1;
numOfElems++; int32_t round = pInput->numOfRows >> 2;
int32_t reminder = pInput->numOfRows & 0x03;
char* data = colDataGetData(pInputCol, i); int32_t tick = 0;
TSKEY cts = pts[i]; for (int32_t i = pInput->startRowIndex; tick < round; i += 4, tick += 1) {
if (pResInfo->numOfRes == 0 || pInfo->ts < cts) { int64_t cts = pts[i];
doSaveCurrentVal(pCtx, i, cts, type, data); int32_t chosen = i;
pResInfo->numOfRes = 1;
if (cts < pts[i + 1]) {
cts = pts[i + 1];
chosen = i + 1;
}
if (cts < pts[i + 2]) {
cts = pts[i + 2];
chosen = i + 2;
}
if (cts < pts[i + 3]) {
cts = pts[i + 3];
chosen = i + 3;
}
if (pResInfo->numOfRes == 0 || pInfo->ts < cts) {
char* data = colDataGetData(pInputCol, chosen);
doSaveCurrentVal(pCtx, i, cts, type, data);
pResInfo->numOfRes = 1;
}
}
for (int32_t i = pInput->startRowIndex + round * 4; i < pInput->startRowIndex + pInput->numOfRows; ++i) {
if (pResInfo->numOfRes == 0 || pInfo->ts < pts[i]) {
char* data = colDataGetData(pInputCol, i);
doSaveCurrentVal(pCtx, i, pts[i], type, data);
pResInfo->numOfRes = 1;
}
}
} else {
for (int32_t i = pInput->startRowIndex; i < pInput->startRowIndex + pInput->numOfRows; ++i) {
if (pInputCol->hasNull && colDataIsNull(pInputCol, pInput->totalRows, i, pColAgg)) {
continue;
}
numOfElems++;
if (pResInfo->numOfRes == 0 || pInfo->ts < pts[i]) {
char* data = colDataGetData(pInputCol, i);
doSaveCurrentVal(pCtx, i, pts[i], type, data);
pResInfo->numOfRes = 1;
}
}
} }
} #endif
#endif #endif
// save selectivity value for column consisted of all null values // save selectivity value for column consisted of all null values
if (numOfElems == 0) { if (numOfElems == 0) {
firstlastSaveTupleData(pCtx->pSrcBlock, pInput->startRowIndex, pCtx, pInfo); firstlastSaveTupleData(pCtx->pSrcBlock, pInput->startRowIndex, pCtx, pInfo);
} }
SET_VAL(pResInfo, numOfElems, 1);
// SET_VAL(pResInfo, numOfElems, 1);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -3266,8 +3325,8 @@ int32_t lastRowFunction(SqlFunctionCtx* pCtx) { ...@@ -3266,8 +3325,8 @@ int32_t lastRowFunction(SqlFunctionCtx* pCtx) {
#if 0 #if 0
int32_t blockDataOrder = (startKey <= endKey) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC; int32_t blockDataOrder = (startKey <= endKey) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC;
// the optimized version only function if all tuples in one block are monotonious increasing or descreasing. // the optimized version only valid if all tuples in one block are monotonious increasing or descreasing.
// this is NOT always works if project operator exists in downstream. // this assumption is NOT always works if project operator exists in downstream.
if (blockDataOrder == TSDB_ORDER_ASC) { if (blockDataOrder == TSDB_ORDER_ASC) {
for (int32_t i = pInput->numOfRows + pInput->startRowIndex - 1; i >= pInput->startRowIndex; --i) { for (int32_t i = pInput->numOfRows + pInput->startRowIndex - 1; i >= pInput->startRowIndex; --i) {
char* data = colDataGetData(pInputCol, i); char* data = colDataGetData(pInputCol, i);
......
...@@ -118,8 +118,7 @@ int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock) ...@@ -118,8 +118,7 @@ int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock)
pRetrieve->ekey = htobe64(pBlock->info.window.ekey); pRetrieve->ekey = htobe64(pBlock->info.window.ekey);
pRetrieve->version = htobe64(pBlock->info.version); pRetrieve->version = htobe64(pBlock->info.version);
int32_t actualLen = 0; int32_t actualLen = blockEncode(pBlock, pRetrieve->data, numOfCols);
blockEncode(pBlock, pRetrieve->data, &actualLen, numOfCols, false);
SStreamRetrieveReq req = { SStreamRetrieveReq req = {
.streamId = pTask->streamId, .streamId = pTask->streamId,
...@@ -200,8 +199,7 @@ static int32_t streamAddBlockToDispatchMsg(const SSDataBlock* pBlock, SStreamDis ...@@ -200,8 +199,7 @@ static int32_t streamAddBlockToDispatchMsg(const SSDataBlock* pBlock, SStreamDis
int32_t numOfCols = (int32_t)taosArrayGetSize(pBlock->pDataBlock); int32_t numOfCols = (int32_t)taosArrayGetSize(pBlock->pDataBlock);
pRetrieve->numOfCols = htonl(numOfCols); pRetrieve->numOfCols = htonl(numOfCols);
int32_t actualLen = 0; int32_t actualLen = blockEncode(pBlock, pRetrieve->data, numOfCols);
blockEncode(pBlock, pRetrieve->data, &actualLen, numOfCols, false);
actualLen += sizeof(SRetrieveTableRsp); actualLen += sizeof(SRetrieveTableRsp);
ASSERT(actualLen <= dataStrLen); ASSERT(actualLen <= dataStrLen);
taosArrayPush(pReq->dataLen, &actualLen); taosArrayPush(pReq->dataLen, &actualLen);
......
...@@ -586,7 +586,7 @@ static int32_t taosPushLogBuffer(SLogBuff *pLogBuf, const char *msg, int32_t msg ...@@ -586,7 +586,7 @@ static int32_t taosPushLogBuffer(SLogBuff *pLogBuf, const char *msg, int32_t msg
int32_t end = 0; int32_t end = 0;
int32_t remainSize = 0; int32_t remainSize = 0;
static int64_t lostLine = 0; static int64_t lostLine = 0;
char tmpBuf[40] = {0}; char tmpBuf[128] = {0};
int32_t tmpBufLen = 0; int32_t tmpBufLen = 0;
if (pLogBuf == NULL || pLogBuf->stop) return -1; if (pLogBuf == NULL || pLogBuf->stop) return -1;
......
...@@ -733,6 +733,7 @@ sql select avg(f1),count(tb1.*),sum(f1),stddev(f1),LEASTSQUARES(f1,1,1) from tb1 ...@@ -733,6 +733,7 @@ sql select avg(f1),count(tb1.*),sum(f1),stddev(f1),LEASTSQUARES(f1,1,1) from tb1
sql select avg(f1),count(tb1.*),sum(f1),stddev(f1),LEASTSQUARES(f1,1,1) from tb1 group by f1 having sum(f1) > 2 order by f1; sql select avg(f1),count(tb1.*),sum(f1),stddev(f1),LEASTSQUARES(f1,1,1) from tb1 group by f1 having sum(f1) > 2 order by f1;
if $rows != 3 then if $rows != 3 then
print expect 3 , actual: $rows
return -1 return -1
endi endi
if $data00 != 2.000000000 then if $data00 != 2.000000000 then
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册