提交 b4875268 编写于 作者: H Haojun Liao

refactor: do some internal refactor.

上级 459bed5f
...@@ -244,7 +244,7 @@ int32_t blockDataAppendColInfo(SSDataBlock* pBlock, SColumnInfoData* pColIn ...@@ -244,7 +244,7 @@ int32_t blockDataAppendColInfo(SSDataBlock* pBlock, SColumnInfoData* pColIn
SColumnInfoData createColumnInfoData(int16_t type, int32_t bytes, int16_t colId); SColumnInfoData createColumnInfoData(int16_t type, int32_t bytes, int16_t colId);
SColumnInfoData* bdGetColumnInfoData(const SSDataBlock* pBlock, int32_t index); SColumnInfoData* bdGetColumnInfoData(const SSDataBlock* pBlock, int32_t index);
void blockEncode(const SSDataBlock* pBlock, char* data, int32_t* dataLen, int32_t numOfCols, int8_t needCompress); int32_t blockEncode(const SSDataBlock* pBlock, char* data, int32_t numOfCols);
const char* blockDecode(SSDataBlock* pBlock, const char* pData); const char* blockDecode(SSDataBlock* pBlock, const char* pData);
void blockDebugShowDataBlock(SSDataBlock* pBlock, const char* flag); void blockDebugShowDataBlock(SSDataBlock* pBlock, const char* flag);
......
...@@ -442,8 +442,7 @@ static int32_t buildShowVariablesRsp(SArray* pVars, SRetrieveTableRsp** pRsp) { ...@@ -442,8 +442,7 @@ static int32_t buildShowVariablesRsp(SArray* pVars, SRetrieveTableRsp** pRsp) {
(*pRsp)->numOfRows = htonl(pBlock->info.rows); (*pRsp)->numOfRows = htonl(pBlock->info.rows);
(*pRsp)->numOfCols = htonl(SHOW_VARIABLES_RESULT_COLS); (*pRsp)->numOfCols = htonl(SHOW_VARIABLES_RESULT_COLS);
int32_t len = 0; int32_t len = blockEncode(pBlock, (*pRsp)->data, SHOW_VARIABLES_RESULT_COLS);
blockEncode(pBlock, (*pRsp)->data, &len, SHOW_VARIABLES_RESULT_COLS, false);
ASSERT(len == rspSize - sizeof(SRetrieveTableRsp)); ASSERT(len == rspSize - sizeof(SRetrieveTableRsp));
blockDataDestroy(pBlock); blockDataDestroy(pBlock);
......
...@@ -2197,7 +2197,9 @@ char* buildCtbNameByGroupId(const char* stbFullName, uint64_t groupId) { ...@@ -2197,7 +2197,9 @@ char* buildCtbNameByGroupId(const char* stbFullName, uint64_t groupId) {
return rname.ctbShortName; return rname.ctbShortName;
} }
void blockEncode(const SSDataBlock* pBlock, char* data, int32_t* dataLen, int32_t numOfCols, int8_t needCompress) { int32_t blockEncode(const SSDataBlock* pBlock, char* data, int32_t numOfCols) {
int32_t dataLen = 0;
// todo extract method // todo extract method
int32_t* version = (int32_t*)data; int32_t* version = (int32_t*)data;
*version = 1; *version = 1;
...@@ -2238,7 +2240,7 @@ void blockEncode(const SSDataBlock* pBlock, char* data, int32_t* dataLen, int32_ ...@@ -2238,7 +2240,7 @@ void blockEncode(const SSDataBlock* pBlock, char* data, int32_t* dataLen, int32_
int32_t* colSizes = (int32_t*)data; int32_t* colSizes = (int32_t*)data;
data += numOfCols * sizeof(int32_t); data += numOfCols * sizeof(int32_t);
*dataLen = blockDataGetSerialMetaSize(numOfCols); dataLen = blockDataGetSerialMetaSize(numOfCols);
int32_t numOfRows = pBlock->info.rows; int32_t numOfRows = pBlock->info.rows;
for (int32_t col = 0; col < numOfCols; ++col) { for (int32_t col = 0; col < numOfCols; ++col) {
...@@ -2255,26 +2257,23 @@ void blockEncode(const SSDataBlock* pBlock, char* data, int32_t* dataLen, int32_ ...@@ -2255,26 +2257,23 @@ void blockEncode(const SSDataBlock* pBlock, char* data, int32_t* dataLen, int32_
} }
data += metaSize; data += metaSize;
(*dataLen) += metaSize; dataLen += metaSize;
if (needCompress) { colSizes[col] = colDataGetLength(pColRes, numOfRows);
colSizes[col] = blockCompressColData(pColRes, numOfRows, data, needCompress); dataLen += colSizes[col];
data += colSizes[col]; memmove(data, pColRes->pData, colSizes[col]);
(*dataLen) += colSizes[col]; data += colSizes[col];
} else {
colSizes[col] = colDataGetLength(pColRes, numOfRows);
(*dataLen) += colSizes[col];
memmove(data, pColRes->pData, colSizes[col]);
data += colSizes[col];
}
colSizes[col] = htonl(colSizes[col]); colSizes[col] = htonl(colSizes[col]);
} }
*actualLen = *dataLen; *actualLen = dataLen;
*groupId = pBlock->info.groupId; *groupId = pBlock->info.groupId;
ASSERT(*dataLen > 0); ASSERT(dataLen > 0);
uDebug("build data block, actualLen:%d, rows:%d, cols:%d", *dataLen, *rows, *cols);
uDebug("build data block, actualLen:%d, rows:%d, cols:%d", dataLen, *rows, *cols);
return dataLen;
} }
const char* blockDecode(SSDataBlock* pBlock, const char* pData) { const char* blockDecode(SSDataBlock* pBlock, const char* pData) {
......
...@@ -307,8 +307,7 @@ int32_t dmProcessRetrieve(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) { ...@@ -307,8 +307,7 @@ int32_t dmProcessRetrieve(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
pStart += sizeof(SSysTableSchema); pStart += sizeof(SSysTableSchema);
} }
int32_t len = 0; int32_t len = blockEncode(pBlock, pStart, numOfCols);
blockEncode(pBlock, pStart, &len, numOfCols, false);
pRsp->numOfRows = htonl(pBlock->info.rows); pRsp->numOfRows = htonl(pBlock->info.rows);
pRsp->precision = TSDB_TIME_PRECISION_MILLI; // millisecond time precision pRsp->precision = TSDB_TIME_PRECISION_MILLI; // millisecond time precision
......
...@@ -303,8 +303,7 @@ static int32_t mndProcessRetrieveSysTableReq(SRpcMsg *pReq) { ...@@ -303,8 +303,7 @@ static int32_t mndProcessRetrieveSysTableReq(SRpcMsg *pReq) {
pStart += sizeof(SSysTableSchema); pStart += sizeof(SSysTableSchema);
} }
int32_t len = 0; int32_t len = blockEncode(pBlock, pStart, pShow->pMeta->numOfColumns);
blockEncode(pBlock, pStart, &len, pShow->pMeta->numOfColumns, false);
} }
pRsp->numOfRows = htonl(rowsRead); pRsp->numOfRows = htonl(rowsRead);
......
...@@ -27,9 +27,7 @@ int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t ...@@ -27,9 +27,7 @@ int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t
pRetrieve->completed = 1; pRetrieve->completed = 1;
pRetrieve->numOfRows = htonl(pBlock->info.rows); pRetrieve->numOfRows = htonl(pBlock->info.rows);
// TODO enable compress int32_t actualLen = blockEncode(pBlock, pRetrieve->data, numOfCols);
int32_t actualLen = 0;
blockEncode(pBlock, pRetrieve->data, &actualLen, numOfCols, false);
actualLen += sizeof(SRetrieveTableRsp); actualLen += sizeof(SRetrieveTableRsp);
ASSERT(actualLen <= dataStrLen); ASSERT(actualLen <= dataStrLen);
taosArrayPush(pRsp->blockDataLen, &actualLen); taosArrayPush(pRsp->blockDataLen, &actualLen);
......
...@@ -39,8 +39,7 @@ static int32_t buildRetrieveTableRsp(SSDataBlock* pBlock, int32_t numOfCols, SRe ...@@ -39,8 +39,7 @@ static int32_t buildRetrieveTableRsp(SSDataBlock* pBlock, int32_t numOfCols, SRe
(*pRsp)->numOfRows = htonl(pBlock->info.rows); (*pRsp)->numOfRows = htonl(pBlock->info.rows);
(*pRsp)->numOfCols = htonl(numOfCols); (*pRsp)->numOfCols = htonl(numOfCols);
int32_t len = 0; int32_t len = blockEncode(pBlock, (*pRsp)->data, numOfCols);
blockEncode(pBlock, (*pRsp)->data, &len, numOfCols, false);
ASSERT(len == rspSize - sizeof(SRetrieveTableRsp)); ASSERT(len == rspSize - sizeof(SRetrieveTableRsp));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
......
...@@ -1610,8 +1610,7 @@ int32_t qExplainGetRspFromCtx(void *ctx, SRetrieveTableRsp **pRsp) { ...@@ -1610,8 +1610,7 @@ int32_t qExplainGetRspFromCtx(void *ctx, SRetrieveTableRsp **pRsp) {
rsp->completed = 1; rsp->completed = 1;
rsp->numOfRows = htonl(rowNum); rsp->numOfRows = htonl(rowNum);
int32_t len = 0; int32_t len = blockEncode(pBlock, rsp->data, taosArrayGetSize(pBlock->pDataBlock));
blockEncode(pBlock, rsp->data, &len, taosArrayGetSize(pBlock->pDataBlock), 0);
ASSERT(len == rspSize - sizeof(SRetrieveTableRsp)); ASSERT(len == rspSize - sizeof(SRetrieveTableRsp));
rsp->compLen = htonl(len); rsp->compLen = htonl(len);
......
...@@ -842,10 +842,8 @@ typedef struct SJoinOperatorInfo { ...@@ -842,10 +842,8 @@ typedef struct SJoinOperatorInfo {
#define OPTR_IS_OPENED(_optr) (((_optr)->status & OP_OPENED) == OP_OPENED) #define OPTR_IS_OPENED(_optr) (((_optr)->status & OP_OPENED) == OP_OPENED)
#define OPTR_SET_OPENED(_optr) ((_optr)->status |= OP_OPENED) #define OPTR_SET_OPENED(_optr) ((_optr)->status |= OP_OPENED)
void doDestroyExchangeOperatorInfo(void* param); SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn, __optr_fn_t cleanup,
__optr_close_fn_t closeFn, __optr_explain_fn_t explain);
SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn, __optr_fn_t streamFn,
__optr_fn_t cleanup, __optr_close_fn_t closeFn, __optr_explain_fn_t explain);
int32_t operatorDummyOpenFn(SOperatorInfo* pOperator); int32_t operatorDummyOpenFn(SOperatorInfo* pOperator);
int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t num); int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t num);
...@@ -881,6 +879,8 @@ STimeWindow getFirstQualifiedTimeWindow(int64_t ts, STimeWindow* pWindow, SInter ...@@ -881,6 +879,8 @@ STimeWindow getFirstQualifiedTimeWindow(int64_t ts, STimeWindow* pWindow, SInter
int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scanFlag); int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scanFlag);
int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaultBufsz); int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaultBufsz);
void doDestroyExchangeOperatorInfo(void* param);
void doSetOperatorCompleted(SOperatorInfo* pOperator); void doSetOperatorCompleted(SOperatorInfo* pOperator);
void doFilter(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SColMatchInfo* pColMatchInfo); void doFilter(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SColMatchInfo* pColMatchInfo);
int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int32_t numOfExpr, int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int32_t numOfExpr,
......
...@@ -102,7 +102,7 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe ...@@ -102,7 +102,7 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe
pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock); pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock);
pOperator->fpSet = pOperator->fpSet =
createOperatorFpSet(operatorDummyOpenFn, doScanCache, NULL, NULL, destroyLastrowScanOperator, NULL); createOperatorFpSet(operatorDummyOpenFn, doScanCache, NULL, destroyLastrowScanOperator, NULL);
pOperator->cost.openCost = 0; pOperator->cost.openCost = 0;
return pOperator; return pOperator;
......
...@@ -76,7 +76,7 @@ static void toDataCacheEntry(SDataDispatchHandle* pHandle, const SInputData* pIn ...@@ -76,7 +76,7 @@ static void toDataCacheEntry(SDataDispatchHandle* pHandle, const SInputData* pIn
pEntry->dataLen = 0; pEntry->dataLen = 0;
pBuf->useSize = sizeof(SDataCacheEntry); pBuf->useSize = sizeof(SDataCacheEntry);
blockEncode(pInput->pData, pEntry->data, &pEntry->dataLen, numOfCols, pEntry->compressed); pEntry->dataLen = blockEncode(pInput->pData, pEntry->data, numOfCols);
ASSERT(pEntry->numOfRows == *(int32_t*)(pEntry->data + 8)); ASSERT(pEntry->numOfRows == *(int32_t*)(pEntry->data + 8));
ASSERT(pEntry->numOfCols == *(int32_t*)(pEntry->data + 8 + 4)); ASSERT(pEntry->numOfCols == *(int32_t*)(pEntry->data + 8 + 4));
......
...@@ -1106,3 +1106,24 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT ...@@ -1106,3 +1106,24 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
} }
return 0; return 0;
} }
void qProcessRspMsg(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
SMsgSendInfo* pSendInfo = (SMsgSendInfo*)pMsg->info.ahandle;
assert(pMsg->info.ahandle != NULL);
SDataBuf buf = {.len = pMsg->contLen, .pData = NULL};
if (pMsg->contLen > 0) {
buf.pData = taosMemoryCalloc(1, pMsg->contLen);
if (buf.pData == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
pMsg->code = TSDB_CODE_OUT_OF_MEMORY;
} else {
memcpy(buf.pData, pMsg->pCont, pMsg->contLen);
}
}
pSendInfo->fp(pSendInfo->param, &buf, pMsg->code);
rpcFreeCont(pMsg->pCont);
destroySendMsgInfo(pSendInfo);
}
\ No newline at end of file
...@@ -446,7 +446,7 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* ...@@ -446,7 +446,7 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode*
pOperator->pTaskInfo = pTaskInfo; pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet = pOperator->fpSet =
createOperatorFpSet(operatorDummyOpenFn, hashGroupbyAggregate, NULL, NULL, destroyGroupOperatorInfo, NULL); createOperatorFpSet(operatorDummyOpenFn, hashGroupbyAggregate, NULL, destroyGroupOperatorInfo, NULL);
code = appendDownstream(pOperator, &downstream, 1); code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
...@@ -831,7 +831,7 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartition ...@@ -831,7 +831,7 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartition
pOperator->pTaskInfo = pTaskInfo; pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet = pOperator->fpSet =
createOperatorFpSet(operatorDummyOpenFn, hashPartition, NULL, NULL, destroyPartitionOperatorInfo, NULL); createOperatorFpSet(operatorDummyOpenFn, hashPartition, NULL, destroyPartitionOperatorInfo, NULL);
code = appendDownstream(pOperator, &downstream, 1); code = appendDownstream(pOperator, &downstream, 1);
return pOperator; return pOperator;
...@@ -1111,7 +1111,7 @@ SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStr ...@@ -1111,7 +1111,7 @@ SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStr
pOperator->exprSupp.pExprInfo = pExprInfo; pOperator->exprSupp.pExprInfo = pExprInfo;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->pTaskInfo = pTaskInfo; pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStreamHashPartition, NULL, NULL, pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStreamHashPartition, NULL,
destroyStreamPartitionOperatorInfo, NULL); destroyStreamPartitionOperatorInfo, NULL);
initParDownStream(downstream, &pInfo->partitionSup, &pInfo->scalarSup); initParDownStream(downstream, &pInfo->partitionSup, &pInfo->scalarSup);
......
...@@ -121,8 +121,7 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t ...@@ -121,8 +121,7 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t
pInfo->inputOrder = TSDB_ORDER_DESC; pInfo->inputOrder = TSDB_ORDER_DESC;
} }
pOperator->fpSet = pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doMergeJoin, NULL, destroyMergeJoinOperator, NULL);
createOperatorFpSet(operatorDummyOpenFn, doMergeJoin, NULL, NULL, destroyMergeJoinOperator, NULL);
code = appendDownstream(pOperator, pDownstream, numOfDownstream); code = appendDownstream(pOperator, pDownstream, numOfDownstream);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
...@@ -372,13 +371,13 @@ static void doMergeJoinImpl(struct SOperatorInfo* pOperator, SSDataBlock* pRes) ...@@ -372,13 +371,13 @@ static void doMergeJoinImpl(struct SOperatorInfo* pOperator, SSDataBlock* pRes)
if (leftTs == rightTs) { if (leftTs == rightTs) {
mergeJoinJoinDownstreamTsRanges(pOperator, leftTs, pRes, &nrows); mergeJoinJoinDownstreamTsRanges(pOperator, leftTs, pRes, &nrows);
} else if (asc && leftTs < rightTs || !asc && leftTs > rightTs) { } else if ((asc && leftTs < rightTs) || (!asc && leftTs > rightTs)) {
pJoinInfo->leftPos += 1; pJoinInfo->leftPos += 1;
if (pJoinInfo->leftPos >= pJoinInfo->pLeft->info.rows) { if (pJoinInfo->leftPos >= pJoinInfo->pLeft->info.rows) {
continue; continue;
} }
} else if (asc && leftTs > rightTs || !asc && leftTs < rightTs) { } else if ((asc && leftTs > rightTs) || (!asc && leftTs < rightTs)) {
pJoinInfo->rightPos += 1; pJoinInfo->rightPos += 1;
if (pJoinInfo->rightPos >= pJoinInfo->pRight->info.rows) { if (pJoinInfo->rightPos >= pJoinInfo->pRight->info.rows) {
continue; continue;
......
...@@ -104,7 +104,7 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys ...@@ -104,7 +104,7 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys
pOperator->status = OP_NOT_OPENED; pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doProjectOperation, NULL, NULL, pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doProjectOperation, NULL,
destroyProjectOperatorInfo, NULL); destroyProjectOperatorInfo, NULL);
code = appendDownstream(pOperator, &downstream, 1); code = appendDownstream(pOperator, &downstream, 1);
...@@ -406,8 +406,7 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy ...@@ -406,8 +406,7 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy
pOperator->status = OP_NOT_OPENED; pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doApplyIndefinitFunction, NULL, NULL, pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doApplyIndefinitFunction, NULL, destroyIndefinitOperatorInfo, NULL);
destroyIndefinitOperatorInfo, NULL);
code = appendDownstream(pOperator, &downstream, 1); code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
......
...@@ -962,7 +962,7 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, ...@@ -962,7 +962,7 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode,
} }
taosLRUCacheSetStrictCapacity(pInfo->metaCache.pTableMetaEntryCache, false); taosLRUCacheSetStrictCapacity(pInfo->metaCache.pTableMetaEntryCache, false);
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableScan, NULL, NULL, destroyTableScanOperatorInfo, pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableScan, NULL, destroyTableScanOperatorInfo,
getTableScannerExecInfo); getTableScannerExecInfo);
// for non-blocking operator, the open cost is always 0 // for non-blocking operator, the open cost is always 0
...@@ -993,7 +993,7 @@ SOperatorInfo* createTableSeqScanOperatorInfo(void* pReadHandle, SExecTaskInfo* ...@@ -993,7 +993,7 @@ SOperatorInfo* createTableSeqScanOperatorInfo(void* pReadHandle, SExecTaskInfo*
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->pTaskInfo = pTaskInfo; pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableScanImpl, NULL, NULL, NULL, NULL); pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableScanImpl, NULL, NULL, NULL);
return pOperator; return pOperator;
} }
...@@ -1155,8 +1155,7 @@ SOperatorInfo* createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDi ...@@ -1155,8 +1155,7 @@ SOperatorInfo* createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDi
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->pTaskInfo = pTaskInfo; pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet = pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doBlockInfoScan, NULL, destroyBlockDistScanOperatorInfo, NULL);
createOperatorFpSet(operatorDummyOpenFn, doBlockInfoScan, NULL, NULL, destroyBlockDistScanOperatorInfo, NULL);
return pOperator; return pOperator;
_error: _error:
...@@ -2372,7 +2371,7 @@ SOperatorInfo* createRawScanOperatorInfo(SReadHandle* pHandle, SExecTaskInfo* pT ...@@ -2372,7 +2371,7 @@ SOperatorInfo* createRawScanOperatorInfo(SReadHandle* pHandle, SExecTaskInfo* pT
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->pTaskInfo = pTaskInfo; pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet = createOperatorFpSet(NULL, doRawScan, NULL, NULL, destroyRawScanOperatorInfo, NULL); pOperator->fpSet = createOperatorFpSet(NULL, doRawScan, NULL, destroyRawScanOperatorInfo, NULL);
return pOperator; return pOperator;
_end: _end:
...@@ -2565,7 +2564,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys ...@@ -2565,7 +2564,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
pOperator->pTaskInfo = pTaskInfo; pOperator->pTaskInfo = pTaskInfo;
__optr_fn_t nextFn = pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM ? doStreamScan : doQueueScan; __optr_fn_t nextFn = pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM ? doStreamScan : doQueueScan;
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, nextFn, NULL, NULL, destroyStreamScanOperatorInfo, NULL); pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, nextFn, NULL, destroyStreamScanOperatorInfo, NULL);
return pOperator; return pOperator;
...@@ -4207,7 +4206,7 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScan ...@@ -4207,7 +4206,7 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScan
pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock); pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock);
pOperator->pTaskInfo = pTaskInfo; pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doSysTableScan, NULL, NULL, destroySysScanOperator, NULL); pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doSysTableScan, NULL, destroySysScanOperator, NULL);
return pOperator; return pOperator;
_error: _error:
...@@ -4346,7 +4345,7 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi ...@@ -4346,7 +4345,7 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi
initResultSizeInfo(&pOperator->resultInfo, 4096); initResultSizeInfo(&pOperator->resultInfo, 4096);
blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity); blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTagScan, NULL, NULL, destroyTagScanOperatorInfo, NULL); pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTagScan, NULL, destroyTagScanOperatorInfo, NULL);
return pOperator; return pOperator;
...@@ -4861,7 +4860,7 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN ...@@ -4861,7 +4860,7 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
pOperator->exprSupp.numOfExprs = numOfCols; pOperator->exprSupp.numOfExprs = numOfCols;
pOperator->pTaskInfo = pTaskInfo; pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableMergeScan, NULL, NULL, pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableMergeScan, NULL,
destroyTableMergeScanOperatorInfo, getTableMergeScanExplainExecInfo); destroyTableMergeScanOperatorInfo, getTableMergeScanExplainExecInfo);
pOperator->cost.openCost = 0; pOperator->cost.openCost = 0;
return pOperator; return pOperator;
......
...@@ -67,7 +67,7 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* ...@@ -67,7 +67,7 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode*
// TODO dynamic set the available sort buffer // TODO dynamic set the available sort buffer
pOperator->fpSet = pOperator->fpSet =
createOperatorFpSet(doOpenSortOperator, doSort, NULL, NULL, destroySortOperatorInfo, getExplainExecInfo); createOperatorFpSet(doOpenSortOperator, doSort, NULL, destroySortOperatorInfo, getExplainExecInfo);
code = appendDownstream(pOperator, &downstream, 1); code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
...@@ -517,7 +517,7 @@ SOperatorInfo* createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSort ...@@ -517,7 +517,7 @@ SOperatorInfo* createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSort
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->pTaskInfo = pTaskInfo; pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doGroupSort, NULL, NULL, destroyGroupSortOperatorInfo, pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doGroupSort, NULL, destroyGroupSortOperatorInfo,
getGroupSortExplainExecInfo); getGroupSortExplainExecInfo);
code = appendDownstream(pOperator, &downstream, 1); code = appendDownstream(pOperator, &downstream, 1);
...@@ -781,7 +781,7 @@ SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size ...@@ -781,7 +781,7 @@ SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->pTaskInfo = pTaskInfo; pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet = createOperatorFpSet(doOpenMultiwayMergeOperator, doMultiwayMerge, NULL, NULL, pOperator->fpSet = createOperatorFpSet(doOpenMultiwayMergeOperator, doMultiwayMerge, NULL,
destroyMultiwayMergeOperatorInfo, getMultiwayMergeExplainExecInfo); destroyMultiwayMergeOperatorInfo, getMultiwayMergeExplainExecInfo);
code = appendDownstream(pOperator, downStreams, numStreams); code = appendDownstream(pOperator, downStreams, numStreams);
......
...@@ -1698,7 +1698,7 @@ SOperatorInfo* createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFi ...@@ -1698,7 +1698,7 @@ SOperatorInfo* createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFi
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->pTaskInfo = pTaskInfo; pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet = pOperator->fpSet =
createOperatorFpSet(operatorDummyOpenFn, doStreamFill, NULL, NULL, destroyStreamFillOperatorInfo, NULL); createOperatorFpSet(operatorDummyOpenFn, doStreamFill, NULL, destroyStreamFillOperatorInfo, NULL);
code = appendDownstream(pOperator, &downstream, 1); code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
......
...@@ -1785,7 +1785,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPh ...@@ -1785,7 +1785,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPh
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->fpSet = pOperator->fpSet =
createOperatorFpSet(doOpenIntervalAgg, doBuildIntervalResult, NULL, NULL, destroyIntervalOperatorInfo, NULL); createOperatorFpSet(doOpenIntervalAgg, doBuildIntervalResult, NULL, destroyIntervalOperatorInfo, NULL);
code = appendDownstream(pOperator, &downstream, 1); code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
...@@ -2565,7 +2565,7 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode ...@@ -2565,7 +2565,7 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode
pOperator->pTaskInfo = pTaskInfo; pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet = pOperator->fpSet =
createOperatorFpSet(operatorDummyOpenFn, doTimeslice, NULL, NULL, destroyTimeSliceOperatorInfo, NULL); createOperatorFpSet(operatorDummyOpenFn, doTimeslice, NULL, destroyTimeSliceOperatorInfo, NULL);
blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity); blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
...@@ -2641,7 +2641,7 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWi ...@@ -2641,7 +2641,7 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWi
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->fpSet = pOperator->fpSet =
createOperatorFpSet(openStateWindowAggOptr, doStateWindowAgg, NULL, NULL, destroyStateWindowOperatorInfo, NULL); createOperatorFpSet(openStateWindowAggOptr, doStateWindowAgg, NULL, destroyStateWindowOperatorInfo, NULL);
code = appendDownstream(pOperator, &downstream, 1); code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
...@@ -2718,7 +2718,7 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionW ...@@ -2718,7 +2718,7 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionW
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->fpSet = pOperator->fpSet =
createOperatorFpSet(operatorDummyOpenFn, doSessionWindowAgg, NULL, NULL, destroySWindowOperatorInfo, NULL); createOperatorFpSet(operatorDummyOpenFn, doSessionWindowAgg, NULL, destroySWindowOperatorInfo, NULL);
pOperator->pTaskInfo = pTaskInfo; pOperator->pTaskInfo = pTaskInfo;
code = appendDownstream(pOperator, &downstream, 1); code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
...@@ -3403,7 +3403,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, ...@@ -3403,7 +3403,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->fpSet = pOperator->fpSet =
createOperatorFpSet(NULL, doStreamFinalIntervalAgg, NULL, NULL, destroyStreamFinalIntervalOperatorInfo, NULL); createOperatorFpSet(NULL, doStreamFinalIntervalAgg, NULL, destroyStreamFinalIntervalOperatorInfo, NULL);
if (pPhyNode->type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL) { if (pPhyNode->type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL) {
initIntervalDownStream(downstream, pPhyNode->type, &pInfo->aggSup, &pInfo->interval, &pInfo->twAggSup); initIntervalDownStream(downstream, pPhyNode->type, &pInfo->aggSup, &pInfo->interval, &pInfo->twAggSup);
} }
...@@ -4196,7 +4196,7 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh ...@@ -4196,7 +4196,7 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh
pOperator->blocking = true; pOperator->blocking = true;
pOperator->status = OP_NOT_OPENED; pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStreamSessionAgg, NULL, NULL, pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStreamSessionAgg, NULL,
destroyStreamSessionAggOperatorInfo, NULL); destroyStreamSessionAggOperatorInfo, NULL);
if (downstream) { if (downstream) {
initDownStream(downstream, &pInfo->streamAggSup, pInfo->twAggSup.waterMark, pOperator->operatorType, initDownStream(downstream, &pInfo->streamAggSup, pInfo->twAggSup.waterMark, pOperator->operatorType,
...@@ -4342,7 +4342,7 @@ SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream ...@@ -4342,7 +4342,7 @@ SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream
pInfo->pUpdateRes = createSpecialDataBlock(STREAM_CLEAR); pInfo->pUpdateRes = createSpecialDataBlock(STREAM_CLEAR);
blockDataEnsureCapacity(pInfo->pUpdateRes, 128); blockDataEnsureCapacity(pInfo->pUpdateRes, 128);
pOperator->name = "StreamSessionSemiAggOperator"; pOperator->name = "StreamSessionSemiAggOperator";
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStreamSessionSemiAgg, NULL, NULL, pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStreamSessionSemiAgg, NULL,
destroyStreamSessionAggOperatorInfo, NULL); destroyStreamSessionAggOperatorInfo, NULL);
} }
...@@ -4713,7 +4713,7 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys ...@@ -4713,7 +4713,7 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys
pOperator->pTaskInfo = pTaskInfo; pOperator->pTaskInfo = pTaskInfo;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->fpSet = pOperator->fpSet =
createOperatorFpSet(operatorDummyOpenFn, doStreamStateAgg, NULL, NULL, destroyStreamStateOperatorInfo, NULL); createOperatorFpSet(operatorDummyOpenFn, doStreamStateAgg, NULL, destroyStreamStateOperatorInfo, NULL);
initDownStream(downstream, &pInfo->streamAggSup, pInfo->twAggSup.waterMark, pOperator->operatorType, initDownStream(downstream, &pInfo->streamAggSup, pInfo->twAggSup.waterMark, pOperator->operatorType,
pInfo->primaryTsIndex); pInfo->primaryTsIndex);
code = appendDownstream(pOperator, &downstream, 1); code = appendDownstream(pOperator, &downstream, 1);
...@@ -4995,7 +4995,7 @@ SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, ...@@ -4995,7 +4995,7 @@ SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream,
pOperator->info = miaInfo; pOperator->info = miaInfo;
pOperator->fpSet = pOperator->fpSet =
createOperatorFpSet(operatorDummyOpenFn, mergeAlignedIntervalAgg, NULL, NULL, destroyMAIOperatorInfo, NULL); createOperatorFpSet(operatorDummyOpenFn, mergeAlignedIntervalAgg, NULL, destroyMAIOperatorInfo, NULL);
code = appendDownstream(pOperator, &downstream, 1); code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
...@@ -5307,7 +5307,7 @@ SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMerge ...@@ -5307,7 +5307,7 @@ SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMerge
pOperator->info = pMergeIntervalInfo; pOperator->info = pMergeIntervalInfo;
pOperator->fpSet = pOperator->fpSet =
createOperatorFpSet(operatorDummyOpenFn, doMergeIntervalAgg, NULL, NULL, destroyMergeIntervalOperatorInfo, NULL); createOperatorFpSet(operatorDummyOpenFn, doMergeIntervalAgg, NULL, destroyMergeIntervalOperatorInfo, NULL);
code = appendDownstream(pOperator, &downstream, 1); code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
...@@ -5540,7 +5540,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys ...@@ -5540,7 +5540,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
pOperator->blocking = true; pOperator->blocking = true;
pOperator->status = OP_NOT_OPENED; pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStreamIntervalAgg, NULL, NULL, pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStreamIntervalAgg, NULL,
destroyStreamFinalIntervalOperatorInfo, NULL); destroyStreamFinalIntervalOperatorInfo, NULL);
initIntervalDownStream(downstream, pPhyNode->type, &pInfo->aggSup, &pInfo->interval, &pInfo->twAggSup); initIntervalDownStream(downstream, pPhyNode->type, &pInfo->aggSup, &pInfo->interval, &pInfo->twAggSup);
......
...@@ -118,8 +118,7 @@ int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock) ...@@ -118,8 +118,7 @@ int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock)
pRetrieve->ekey = htobe64(pBlock->info.window.ekey); pRetrieve->ekey = htobe64(pBlock->info.window.ekey);
pRetrieve->version = htobe64(pBlock->info.version); pRetrieve->version = htobe64(pBlock->info.version);
int32_t actualLen = 0; int32_t actualLen = blockEncode(pBlock, pRetrieve->data, numOfCols);
blockEncode(pBlock, pRetrieve->data, &actualLen, numOfCols, false);
SStreamRetrieveReq req = { SStreamRetrieveReq req = {
.streamId = pTask->streamId, .streamId = pTask->streamId,
...@@ -200,8 +199,7 @@ static int32_t streamAddBlockToDispatchMsg(const SSDataBlock* pBlock, SStreamDis ...@@ -200,8 +199,7 @@ static int32_t streamAddBlockToDispatchMsg(const SSDataBlock* pBlock, SStreamDis
int32_t numOfCols = (int32_t)taosArrayGetSize(pBlock->pDataBlock); int32_t numOfCols = (int32_t)taosArrayGetSize(pBlock->pDataBlock);
pRetrieve->numOfCols = htonl(numOfCols); pRetrieve->numOfCols = htonl(numOfCols);
int32_t actualLen = 0; int32_t actualLen = blockEncode(pBlock, pRetrieve->data, numOfCols);
blockEncode(pBlock, pRetrieve->data, &actualLen, numOfCols, false);
actualLen += sizeof(SRetrieveTableRsp); actualLen += sizeof(SRetrieveTableRsp);
ASSERT(actualLen <= dataStrLen); ASSERT(actualLen <= dataStrLen);
taosArrayPush(pReq->dataLen, &actualLen); taosArrayPush(pReq->dataLen, &actualLen);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册