提交 6fcbcabd 编写于 作者: H Haojun Liao

refactor: do some internal refactor.

上级 338f6eea
......@@ -727,7 +727,11 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR
char* pData, int16_t bytes, bool masterscan, uint64_t groupId,
SExecTaskInfo* pTaskInfo, bool isIntervalQuery, SAggSupporter* pSup);
SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode* pExNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, tsdbReaderT pDataReader, SReadHandle* pHandle, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* pPhyNode, STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScanPhysiNode *pScanPhyNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SExprInfo* pScalarExprInfo,
int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo);
......@@ -738,9 +742,6 @@ SOperatorInfo *createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pR
SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t numOfDownstream, SExprInfo* pExprInfo, int32_t num, SArray* pSortInfo, SArray* pGroupInfo, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, SSDataBlock* pResBlock, const SName* pName,
SNode* pCondition, SEpSet epset, SArray* colList,
SExecTaskInfo* pTaskInfo, bool showRewrite, int32_t accountId);
SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId,
STimeWindowAggSupp *pTwAggSupp, SExecTaskInfo* pTaskInfo);
......@@ -758,9 +759,8 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx
SExprInfo* pScalarExprInfo, int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStreamScanOperatorInfo(void* pDataReader, SReadHandle* pHandle,
SArray* pTableIdList, STableScanPhysiNode* pTableScanNode, SExecTaskInfo* pTaskInfo,
STimeWindowAggSupp* pTwSup);
SOperatorInfo* createStreamScanOperatorInfo(void* pDataReader, SReadHandle* pHandle, SArray* pTableIdList,
STableScanPhysiNode* pTableScanNode, SExecTaskInfo* pTaskInfo, STimeWindowAggSupp* pTwSup);
SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols,
......@@ -776,7 +776,6 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SExprInfo*
SSDataBlock* pResultBlock, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SNode* pOnCondition, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, SExprInfo* pExpr, int32_t numOfOutput, SSDataBlock* pResBlock, SArray* pColMatchInfo, STableListInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream,
SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, int64_t gap,
......
......@@ -28,18 +28,6 @@ typedef struct SCompSupporter {
int32_t order;
} SCompSupporter;
int32_t getRowNumForMultioutput(STaskAttr* pQueryAttr, bool topBottomQuery, bool stable) {
if (pQueryAttr && (!stable)) {
for (int16_t i = 0; i < pQueryAttr->numOfOutput; ++i) {
// if (pQueryAttr->pExpr1[i].base. == FUNCTION_TOP || pQueryAttr->pExpr1[i].base.functionId == FUNCTION_BOTTOM) {
// return (int32_t)pQueryAttr->pExpr1[i].base.param[0].i;
// }
}
}
return 1;
}
int32_t getOutputInterResultBufSize(STaskAttr* pQueryAttr) {
int32_t size = 0;
......@@ -113,35 +101,6 @@ void closeResultRow(SResultRow* pResultRow) {
pResultRow->closed = true;
}
void clearResultRow(STaskRuntimeEnv *pRuntimeEnv, SResultRow *pResultRow) {
if (pResultRow == NULL) {
return;
}
// the result does not put into the SDiskbasedBuf, ignore it.
if (pResultRow->pageId >= 0) {
SFilePage *page = getBufPage(pRuntimeEnv->pResultBuf, pResultRow->pageId);
int16_t offset = 0;
for (int32_t i = 0; i < pRuntimeEnv->pQueryAttr->numOfOutput; ++i) {
struct SResultRowEntryInfo *pEntryInfo = NULL;//pResultRow->pEntryInfo[i];
// int16_t size = pRuntimeEnv->pQueryAttr->pExpr1[i].base.resSchema.bytes;
// char * s = getPosInResultPage(pRuntimeEnv->pQueryAttr, page, pResultRow->offset, offset);
// memset(s, 0, size);
// offset += size;
cleanupResultRowEntry(pEntryInfo);
}
}
pResultRow->numOfRows = 0;
pResultRow->pageId = -1;
pResultRow->offset = -1;
pResultRow->closed = false;
pResultRow->win = TSWINDOW_INITIALIZER;
}
// TODO refactor: use macro
SResultRowEntryInfo* getResultCell(const SResultRow* pRow, int32_t index, const int32_t* offset) {
assert(index >= 0 && offset != NULL);
......
......@@ -2896,34 +2896,38 @@ static int32_t initDataSource(int32_t numOfSources, SExchangeInfo* pInfo) {
return TSDB_CODE_SUCCESS;
}
SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, const SNodeList* pSources, SSDataBlock* pBlock,
SExecTaskInfo* pTaskInfo) {
SExchangeInfo* pInfo = taosMemoryCalloc(1, sizeof(SExchangeInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
goto _error;
}
static int32_t initExchangeOperator(SExchangePhysiNode* pExNode, SExchangeInfo* pInfo) {
size_t numOfSources = LIST_LENGTH(pExNode->pSrcEndPoints);
size_t numOfSources = LIST_LENGTH(pSources);
pInfo->pSources = taosArrayInit(numOfSources, sizeof(SDownstreamSourceNode));
pInfo->pSourceDataInfo = taosArrayInit(numOfSources, sizeof(SSourceDataInfo));
if (pInfo->pSourceDataInfo == NULL || pInfo->pSources == NULL) {
goto _error;
return TSDB_CODE_OUT_OF_MEMORY;
}
for (int32_t i = 0; i < numOfSources; ++i) {
SNodeListNode* pNode = nodesListGetNode((SNodeList*)pSources, i);
SNodeListNode* pNode = nodesListGetNode((SNodeList*)pExNode->pSrcEndPoints, i);
taosArrayPush(pInfo->pSources, pNode);
}
int32_t code = initDataSource(numOfSources, pInfo);
return initDataSource(numOfSources, pInfo);
}
SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode* pExNode, SExecTaskInfo* pTaskInfo) {
SExchangeInfo* pInfo = taosMemoryCalloc(1, sizeof(SExchangeInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
goto _error;
}
int32_t code = initExchangeOperator(pExNode, pInfo);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
pInfo->pResult = pBlock;
pInfo->seqLoadData = false;
pInfo->pTransporter = pTransporter;
pInfo->pResult = createResDataBlock(pExNode->node.pOutputDataBlockDesc);
tsem_init(&pInfo->ready, 0, 0);
pOperator->name = "ExchangeOperator";
......@@ -2931,17 +2935,16 @@ SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, const SNodeList* p
pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->numOfExprs = pBlock->info.numOfCols;
pOperator->numOfExprs = pInfo->pResult->info.numOfCols;
pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet = createOperatorFpSet(prepareLoadRemoteData, doLoadRemoteData, NULL, NULL,
destroyExchangeOperatorInfo, NULL, NULL, NULL);
pInfo->pTransporter = pTransporter;
return pOperator;
_error:
if (pInfo != NULL) {
destroyExchangeOperatorInfo(pInfo, numOfSources);
destroyExchangeOperatorInfo(pInfo, LIST_LENGTH(pExNode->pSrcEndPoints));
}
taosMemoryFreeClear(pInfo);
......@@ -4394,9 +4397,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
return pOperator;
} else if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == type) {
SExchangePhysiNode* pExchange = (SExchangePhysiNode*)pPhyNode;
SSDataBlock* pResBlock = createResDataBlock(pExchange->node.pOutputDataBlockDesc);
return createExchangeOperatorInfo(pHandle->pMsgCb->clientRpc, pExchange->pSrcEndPoints, pResBlock, pTaskInfo);
return createExchangeOperatorInfo(pHandle->pMsgCb->clientRpc, (SExchangePhysiNode*)pPhyNode, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == type) {
SScanPhysiNode* pScanPhyNode = (SScanPhysiNode*)pPhyNode; // simple child table.
STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode;
......@@ -4424,41 +4425,16 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
return pOperator;
} else if (QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN == type) {
SSystemTableScanPhysiNode* pSysScanPhyNode = (SSystemTableScanPhysiNode*)pPhyNode;
SScanPhysiNode* pScanNode = &pSysScanPhyNode->scan;
SDataBlockDescNode* pDescNode = pScanNode->node.pOutputDataBlockDesc;
SSDataBlock* pResBlock = createResDataBlock(pDescNode);
int32_t numOfOutputCols = 0;
SArray* colList =
extractColMatchInfo(pScanNode->pScanCols, pDescNode, &numOfOutputCols, pTaskInfo, COL_MATCH_FROM_COL_ID);
SOperatorInfo* pOperator = createSysTableScanOperatorInfo(
pHandle, pResBlock, &pScanNode->tableName, pScanNode->node.pConditions, pSysScanPhyNode->mgmtEpSet, colList,
pTaskInfo, pSysScanPhyNode->showRewrite, pSysScanPhyNode->accountId);
return pOperator;
return createSysTableScanOperatorInfo(pHandle, pSysScanPhyNode, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN == type) {
STagScanPhysiNode* pScanPhyNode = (STagScanPhysiNode*)pPhyNode;
SDataBlockDescNode* pDescNode = pScanPhyNode->node.pOutputDataBlockDesc;
SSDataBlock* pResBlock = createResDataBlock(pDescNode);
int32_t code = getTableList(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, pTableListInfo, pTagCond);
if (code != TSDB_CODE_SUCCESS) {
return NULL;
}
int32_t num = 0;
SExprInfo* pExprInfo = createExprInfo(pScanPhyNode->pScanPseudoCols, NULL, &num);
int32_t numOfOutputCols = 0;
SArray* colList = extractColMatchInfo(pScanPhyNode->pScanPseudoCols, pDescNode, &numOfOutputCols, pTaskInfo,
COL_MATCH_FROM_COL_ID);
SOperatorInfo* pOperator =
createTagScanOperatorInfo(pHandle, pExprInfo, num, pResBlock, colList, pTableListInfo, pTaskInfo);
return pOperator;
return createTagScanOperatorInfo(pHandle, pScanPhyNode, pTableListInfo, pTaskInfo);
} else {
ASSERT(0);
}
......@@ -5016,45 +4992,6 @@ _complete:
return code;
}
void setResultBufSize(STaskAttr* pQueryAttr, SResultInfo* pResultInfo) {
const int32_t DEFAULT_RESULT_MSG_SIZE = 1024 * (1024 + 512);
// the minimum number of rows for projection query
const int32_t MIN_ROWS_FOR_PRJ_QUERY = 8192;
const int32_t DEFAULT_MIN_ROWS = 4096;
const float THRESHOLD_RATIO = 0.85f;
// if (isProjQuery(pQueryAttr)) {
// int32_t numOfRes = DEFAULT_RESULT_MSG_SIZE / pQueryAttr->resultRowSize;
// if (numOfRes < MIN_ROWS_FOR_PRJ_QUERY) {
// numOfRes = MIN_ROWS_FOR_PRJ_QUERY;
// }
//
// pResultInfo->capacity = numOfRes;
// } else { // in case of non-prj query, a smaller output buffer will be used.
// pResultInfo->capacity = DEFAULT_MIN_ROWS;
// }
pResultInfo->threshold = (int32_t)(pResultInfo->capacity * THRESHOLD_RATIO);
pResultInfo->totalRows = 0;
}
// TODO refactor
void freeColumnFilterInfo(SColumnFilterInfo* pFilter, int32_t numOfFilters) {
if (pFilter == NULL || numOfFilters == 0) {
return;
}
for (int32_t i = 0; i < numOfFilters; i++) {
if (pFilter[i].filterstr && pFilter[i].pz) {
taosMemoryFree((void*)(pFilter[i].pz));
}
}
taosMemoryFree(pFilter);
}
static void doDestroyTableList(STableListInfo* pTableqinfoList) {
taosArrayDestroy(pTableqinfoList->pTableList);
taosHashCleanup(pTableqinfoList->map);
......
......@@ -36,11 +36,6 @@
#define SET_REVERSE_SCAN_FLAG(_info) ((_info)->scanFlag = REVERSE_SCAN)
#define SWITCH_ORDER(n) (((n) = ((n) == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC))
typedef struct SWindowPosition {
int32_t pageId;
int32_t rowId;
} SWindowPosition;
static int32_t buildSysDbTableInfo(const SSysTableScanInfo* pInfo, int32_t capacity);
static int32_t buildDbTableInfoBlock(const SSDataBlock* p, const SSysTableMeta* pSysDbTableMeta, size_t size,
const char* dbName);
......@@ -1430,34 +1425,38 @@ int32_t buildDbTableInfoBlock(const SSDataBlock* p, const SSysTableMeta* pSysDbT
return numOfRows;
}
SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSDataBlock* pResBlock, const SName* pName,
SNode* pCondition, SEpSet epset, SArray* colList,
SExecTaskInfo* pTaskInfo, bool showRewrite, int32_t accountId) {
SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScanPhysiNode *pScanPhyNode, SExecTaskInfo* pTaskInfo) {
SSysTableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SSysTableScanInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
taosMemoryFreeClear(pInfo);
taosMemoryFreeClear(pOperator);
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
return NULL;
goto _error;
}
pInfo->accountId = accountId;
pInfo->showRewrite = showRewrite;
SScanPhysiNode* pScanNode = &pScanPhyNode->scan;
SDataBlockDescNode* pDescNode = pScanNode->node.pOutputDataBlockDesc;
SSDataBlock* pResBlock = createResDataBlock(pDescNode);
int32_t num = 0;
SArray* colList = extractColMatchInfo(pScanNode->pScanCols, pDescNode, &num, pTaskInfo, COL_MATCH_FROM_COL_ID);
pInfo->accountId = pScanPhyNode->accountId;
pInfo->showRewrite = pScanPhyNode->showRewrite;
pInfo->pRes = pResBlock;
pInfo->pCondition = pCondition;
pInfo->pCondition = pScanNode->node.pConditions;
pInfo->scanCols = colList;
initResultSizeInfo(pOperator, 4096);
tNameAssign(&pInfo->name, pName);
tNameAssign(&pInfo->name, &pScanNode->tableName);
const char* name = tNameGetTableName(&pInfo->name);
if (strncasecmp(name, TSDB_INS_TABLE_USER_TABLES, TSDB_TABLE_FNAME_LEN) == 0) {
pInfo->readHandle = *(SReadHandle*)readHandle;
blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
} else {
tsem_init(&pInfo->ready, 0, 0);
pInfo->epSet = epset;
pInfo->epSet = pScanPhyNode->mgmtEpSet;
pInfo->readHandle = *(SReadHandle*)readHandle;
}
......@@ -1467,11 +1466,18 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSDataBlock* pRe
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->numOfExprs = pResBlock->info.numOfCols;
pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet =
createOperatorFpSet(operatorDummyOpenFn, doSysTableScan, NULL, NULL, destroySysScanOperator, NULL, NULL, NULL);
pOperator->pTaskInfo = pTaskInfo;
return pOperator;
_error:
taosMemoryFreeClear(pInfo);
taosMemoryFreeClear(pOperator);
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
return NULL;
}
static SSDataBlock* doTagScan(SOperatorInfo* pOperator) {
......@@ -1626,18 +1632,24 @@ static void destroyTagScanOperatorInfo(void* param, int32_t numOfOutput) {
pInfo->pRes = blockDataDestroy(pInfo->pRes);
}
SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, SExprInfo* pExpr, int32_t numOfOutput,
SSDataBlock* pResBlock, SArray* pColMatchInfo, STableListInfo* pTableListInfo,
SExecTaskInfo* pTaskInfo) {
SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* pPhyNode, STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo) {
STagScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STagScanInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
goto _error;
}
SDataBlockDescNode* pDescNode = pPhyNode->node.pOutputDataBlockDesc;
int32_t numOfExprs = 0;
SExprInfo* pExprInfo = createExprInfo(pPhyNode->pScanPseudoCols, NULL, &numOfExprs);
int32_t num = 0;
SArray* colList = extractColMatchInfo(pPhyNode->pScanPseudoCols, pDescNode, &num, pTaskInfo, COL_MATCH_FROM_COL_ID);
pInfo->pTableList = pTableListInfo;
pInfo->pColMatchInfo = pColMatchInfo;
pInfo->pRes = pResBlock;
pInfo->pColMatchInfo = colList;
pInfo->pRes = createResDataBlock(pDescNode);;
pInfo->readHandle = *pReadHandle;
pInfo->curPos = 0;
pOperator->name = "TagScanOperator";
......@@ -1645,8 +1657,8 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, SExprInfo* pE
pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->pExpr = pExpr;
pOperator->numOfExprs = numOfOutput;
pOperator->pExpr = pExprInfo;
pOperator->numOfExprs = numOfExprs;
pOperator->pTaskInfo = pTaskInfo;
initResultSizeInfo(pOperator, 4096);
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tdatablock.h"
#include "executorimpl.h"
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册