未验证 提交 562a736d 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #18255 from taosdata/fix/liao_cov

refactor: do some internal refactor.
...@@ -163,7 +163,7 @@ typedef struct { ...@@ -163,7 +163,7 @@ typedef struct {
SArray* pStopInfo; SArray* pStopInfo;
} STaskStopInfo; } STaskStopInfo;
typedef struct SExecTaskInfo { struct SExecTaskInfo {
STaskIdInfo id; STaskIdInfo id;
uint32_t status; uint32_t status;
STimeWindow window; STimeWindow window;
...@@ -182,7 +182,7 @@ typedef struct SExecTaskInfo { ...@@ -182,7 +182,7 @@ typedef struct SExecTaskInfo {
struct SOperatorInfo* pRoot; struct SOperatorInfo* pRoot;
SLocalFetch localFetch; SLocalFetch localFetch;
STaskStopInfo stopInfo; STaskStopInfo stopInfo;
} SExecTaskInfo; };
enum { enum {
OP_NOT_OPENED = 0x0, OP_NOT_OPENED = 0x0,
...@@ -315,37 +315,39 @@ typedef struct STableMetaCacheInfo { ...@@ -315,37 +315,39 @@ typedef struct STableMetaCacheInfo {
uint64_t cacheHit; uint64_t cacheHit;
} STableMetaCacheInfo; } STableMetaCacheInfo;
typedef struct STableScanInfo { typedef struct STableScanBase {
STsdbReader* dataReader; STsdbReader* dataReader;
SReadHandle readHandle;
SLimitInfo limitInfo;
SFileBlockLoadRecorder readRecorder; SFileBlockLoadRecorder readRecorder;
SScanInfo scanInfo; SQueryTableDataCond cond;
int32_t scanTimes; SAggOptrPushDownInfo pdInfo;
SSDataBlock* pResBlock;
SColMatchInfo matchInfo; SColMatchInfo matchInfo;
SReadHandle readHandle;
SExprSupp pseudoSup; SExprSupp pseudoSup;
SQueryTableDataCond cond; STableMetaCacheInfo metaCache;
int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan
int32_t dataBlockLoadFlag; int32_t dataBlockLoadFlag;
SLimitInfo limitInfo;
} STableScanBase;
typedef struct STableScanInfo {
STableScanBase base;
SScanInfo scanInfo;
int32_t scanTimes;
SSDataBlock* pResBlock;
SSampleExecInfo sample; // sample execution info SSampleExecInfo sample; // sample execution info
int32_t currentGroupId; int32_t currentGroupId;
int32_t currentTable; int32_t currentTable;
int8_t scanMode; int8_t scanMode;
SAggOptrPushDownInfo pdInfo;
int8_t assignBlockUid; int8_t assignBlockUid;
STableMetaCacheInfo metaCache;
} STableScanInfo; } STableScanInfo;
typedef struct STableMergeScanInfo { typedef struct STableMergeScanInfo {
STableListInfo* tableListInfo;
int32_t tableStartIndex; int32_t tableStartIndex;
int32_t tableEndIndex; int32_t tableEndIndex;
bool hasGroupId; bool hasGroupId;
uint64_t groupId; uint64_t groupId;
SArray* queryConds; // array of queryTableDataCond SArray* queryConds; // array of queryTableDataCond
STsdbReader* pReader; STableScanBase base;
SReadHandle readHandle;
int32_t bufPageSize; int32_t bufPageSize;
uint32_t sortBufSize; // max buffer size for in-memory sort uint32_t sortBufSize; // max buffer size for in-memory sort
SArray* pSortInfo; SArray* pSortInfo;
...@@ -354,27 +356,12 @@ typedef struct STableMergeScanInfo { ...@@ -354,27 +356,12 @@ typedef struct STableMergeScanInfo {
int64_t startTs; // sort start time int64_t startTs; // sort start time
SArray* sortSourceParams; SArray* sortSourceParams;
SLimitInfo limitInfo; SLimitInfo limitInfo;
SFileBlockLoadRecorder readRecorder;
int64_t numOfRows; int64_t numOfRows;
SScanInfo scanInfo; SScanInfo scanInfo;
int32_t scanTimes; int32_t scanTimes;
SqlFunctionCtx* pCtx; // which belongs to the direct upstream operator operator query context
SResultRowInfo* pResultRowInfo;
int32_t* rowEntryInfoOffset;
SExprInfo* pExpr;
SSDataBlock* pResBlock; SSDataBlock* pResBlock;
SColMatchInfo matchInfo; SSampleExecInfo sample; // sample execution info
int32_t numOfOutput; SSortExecInfo sortExecInfo;
SExprSupp pseudoSup;
SQueryTableDataCond cond;
int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan
int32_t dataBlockLoadFlag;
// if the upstream is an interval operator, the interval info is also kept here to get the time
// window to check if current data block needs to be loaded.
SInterval interval;
SSampleExecInfo sample; // sample execution info
SSortExecInfo sortExecInfo;
} STableMergeScanInfo; } STableMergeScanInfo;
typedef struct STagScanInfo { typedef struct STagScanInfo {
...@@ -387,17 +374,17 @@ typedef struct STagScanInfo { ...@@ -387,17 +374,17 @@ typedef struct STagScanInfo {
} STagScanInfo; } STagScanInfo;
typedef struct SLastrowScanInfo { typedef struct SLastrowScanInfo {
SSDataBlock* pRes; SSDataBlock* pRes;
SReadHandle readHandle; SReadHandle readHandle;
void* pLastrowReader; void* pLastrowReader;
SColMatchInfo matchInfo; SColMatchInfo matchInfo;
int32_t* pSlotIds; int32_t* pSlotIds;
SExprSupp pseudoExprSup; SExprSupp pseudoExprSup;
int32_t retrieveType; int32_t retrieveType;
int32_t currentGroupIndex; int32_t currentGroupIndex;
SSDataBlock* pBufferredRes; SSDataBlock* pBufferredRes;
SArray* pUidList; SArray* pUidList;
int32_t indexOfBufferedRes; int32_t indexOfBufferedRes;
} SLastrowScanInfo; } SLastrowScanInfo;
typedef enum EStreamScanMode { typedef enum EStreamScanMode {
...@@ -414,13 +401,6 @@ enum { ...@@ -414,13 +401,6 @@ enum {
PROJECT_RETRIEVE_DONE = 0x2, PROJECT_RETRIEVE_DONE = 0x2,
}; };
typedef struct SCatchSupporter {
SHashObj* pWindowHashTable; // quick locate the window object for each window
SDiskbasedBuf* pDataBuf; // buffer based on blocked-wised disk file
int32_t keySize;
int64_t* pKeyBuf;
} SCatchSupporter;
typedef struct SStreamAggSupporter { typedef struct SStreamAggSupporter {
int32_t resultRowSize; // the result buffer size for each result row, with the meta data size for each row int32_t resultRowSize; // the result buffer size for each result row, with the meta data size for each row
SSDataBlock* pScanBlock; SSDataBlock* pScanBlock;
...@@ -1042,8 +1022,8 @@ int32_t finalizeResultRows(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPos ...@@ -1042,8 +1022,8 @@ int32_t finalizeResultRows(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPos
SOperatorInfo* createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSortPhysiNode* pSortPhyNode, SOperatorInfo* createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSortPhysiNode* pSortPhyNode,
SExecTaskInfo* pTaskInfo); SExecTaskInfo* pTaskInfo);
SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, STableListInfo* pTableListInfo, SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* readHandle,
SReadHandle* readHandle, SExecTaskInfo* pTaskInfo); SExecTaskInfo* pTaskInfo);
void copyUpdateDataBlock(SSDataBlock* pDest, SSDataBlock* pSource, int32_t tsColIndex); void copyUpdateDataBlock(SSDataBlock* pDest, SSDataBlock* pSource, int32_t tsColIndex);
......
...@@ -1026,8 +1026,8 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT ...@@ -1026,8 +1026,8 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
SStreamScanInfo* pInfo = pOperator->info; SStreamScanInfo* pInfo = pOperator->info;
if (pOffset->type == TMQ_OFFSET__LOG) { if (pOffset->type == TMQ_OFFSET__LOG) {
STableScanInfo* pTSInfo = pInfo->pTableScanOp->info; STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
tsdbReaderClose(pTSInfo->dataReader); tsdbReaderClose(pTSInfo->base.dataReader);
pTSInfo->dataReader = NULL; pTSInfo->base.dataReader = NULL;
#if 0 #if 0
if (tOffsetEqual(pOffset, &pTaskInfo->streamInfo.lastStatus) && if (tOffsetEqual(pOffset, &pTaskInfo->streamInfo.lastStatus) &&
pInfo->tqReader->pWalReader->curVersion != pOffset->version) { pInfo->tqReader->pWalReader->curVersion != pOffset->version) {
...@@ -1079,23 +1079,23 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT ...@@ -1079,23 +1079,23 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
// TODO after dropping table, table may not found // TODO after dropping table, table may not found
ASSERT(found); ASSERT(found);
if (pTableScanInfo->dataReader == NULL) { if (pTableScanInfo->base.dataReader == NULL) {
STableKeyInfo* pList = tableListGetInfo(pTaskInfo->pTableInfoList, 0); STableKeyInfo* pList = tableListGetInfo(pTaskInfo->pTableInfoList, 0);
int32_t num = tableListGetSize(pTaskInfo->pTableInfoList); int32_t num = tableListGetSize(pTaskInfo->pTableInfoList);
if (tsdbReaderOpen(pTableScanInfo->readHandle.vnode, &pTableScanInfo->cond, pList, num, if (tsdbReaderOpen(pTableScanInfo->base.readHandle.vnode, &pTableScanInfo->base.cond, pList, num,
&pTableScanInfo->dataReader, NULL) < 0 || &pTableScanInfo->base.dataReader, NULL) < 0 ||
pTableScanInfo->dataReader == NULL) { pTableScanInfo->base.dataReader == NULL) {
ASSERT(0); ASSERT(0);
} }
} }
STableKeyInfo tki = {.uid = uid}; STableKeyInfo tki = {.uid = uid};
tsdbSetTableList(pTableScanInfo->dataReader, &tki, 1); tsdbSetTableList(pTableScanInfo->base.dataReader, &tki, 1);
int64_t oldSkey = pTableScanInfo->cond.twindows.skey; int64_t oldSkey = pTableScanInfo->base.cond.twindows.skey;
pTableScanInfo->cond.twindows.skey = ts + 1; pTableScanInfo->base.cond.twindows.skey = ts + 1;
tsdbReaderReset(pTableScanInfo->dataReader, &pTableScanInfo->cond); tsdbReaderReset(pTableScanInfo->base.dataReader, &pTableScanInfo->base.cond);
pTableScanInfo->cond.twindows.skey = oldSkey; pTableScanInfo->base.cond.twindows.skey = oldSkey;
pTableScanInfo->scanTimes = 0; pTableScanInfo->scanTimes = 0;
qDebug("tsdb reader offset seek to uid %" PRId64 " ts %" PRId64 ", table cur set to %d , all table num %d", uid, qDebug("tsdb reader offset seek to uid %" PRId64 " ts %" PRId64 ", table cur set to %d , all table num %d", uid,
......
...@@ -1000,12 +1000,6 @@ int32_t loadDataBlockOnDemand(SExecTaskInfo* pTaskInfo, STableScanInfo* pTableSc ...@@ -1000,12 +1000,6 @@ int32_t loadDataBlockOnDemand(SExecTaskInfo* pTaskInfo, STableScanInfo* pTableSc
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static void updateTableQueryInfoForReverseScan(STableQueryInfo* pTableQueryInfo) {
if (pTableQueryInfo == NULL) {
return;
}
}
void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status) { void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status) {
if (status == TASK_NOT_COMPLETED) { if (status == TASK_NOT_COMPLETED) {
pTaskInfo->status = status; pTaskInfo->status = status;
...@@ -1652,55 +1646,6 @@ int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t ...@@ -1652,55 +1646,6 @@ int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t
static int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, size_t keyBufSize, static int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, size_t keyBufSize,
const char* pKey); const char* pKey);
static bool needToMerge(SSDataBlock* pBlock, SArray* groupInfo, char** buf, int32_t rowIndex) {
size_t size = taosArrayGetSize(groupInfo);
if (size == 0) {
return true;
}
for (int32_t i = 0; i < size; ++i) {
int32_t* index = taosArrayGet(groupInfo, i);
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, *index);
bool isNull = colDataIsNull(pColInfo, rowIndex, pBlock->info.rows, NULL);
if ((isNull && buf[i] != NULL) || (!isNull && buf[i] == NULL)) {
return false;
}
char* pCell = colDataGetData(pColInfo, rowIndex);
if (IS_VAR_DATA_TYPE(pColInfo->info.type)) {
if (varDataLen(pCell) != varDataLen(buf[i])) {
return false;
} else {
if (memcmp(varDataVal(pCell), varDataVal(buf[i]), varDataLen(pCell)) != 0) {
return false;
}
}
} else {
if (memcmp(pCell, buf[i], pColInfo->info.bytes) != 0) {
return false;
}
}
}
return 0;
}
static bool saveCurrentTuple(char** rowColData, SArray* pColumnList, SSDataBlock* pBlock, int32_t rowIndex) {
int32_t size = (int32_t)taosArrayGetSize(pColumnList);
for (int32_t i = 0; i < size; ++i) {
int32_t* index = taosArrayGet(pColumnList, i);
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, *index);
char* data = colDataGetData(pColInfo, rowIndex);
memcpy(rowColData[i], data, colDataGetLength(pColInfo, rowIndex));
}
return true;
}
int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scanFlag) { int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scanFlag) {
// todo add more information about exchange operation // todo add more information about exchange operation
int32_t type = pOperator->operatorType; int32_t type = pOperator->operatorType;
...@@ -1712,13 +1657,13 @@ int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scan ...@@ -1712,13 +1657,13 @@ int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scan
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} else if (type == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) { } else if (type == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
STableScanInfo* pTableScanInfo = pOperator->info; STableScanInfo* pTableScanInfo = pOperator->info;
*order = pTableScanInfo->cond.order; *order = pTableScanInfo->base.cond.order;
*scanFlag = pTableScanInfo->scanFlag; *scanFlag = pTableScanInfo->base.scanFlag;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} else if (type == QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN) { } else if (type == QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN) {
STableMergeScanInfo* pTableScanInfo = pOperator->info; STableMergeScanInfo* pTableScanInfo = pOperator->info;
*order = pTableScanInfo->cond.order; *order = pTableScanInfo->base.cond.order;
*scanFlag = pTableScanInfo->scanFlag; *scanFlag = pTableScanInfo->base.scanFlag;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} else { } else {
if (pOperator->pDownstream == NULL || pOperator->pDownstream[0] == NULL) { if (pOperator->pDownstream == NULL || pOperator->pDownstream[0] == NULL) {
...@@ -2365,8 +2310,8 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiN ...@@ -2365,8 +2310,8 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiN
if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) { if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
STableScanInfo* pTableScanInfo = downstream->info; STableScanInfo* pTableScanInfo = downstream->info;
pTableScanInfo->pdInfo.pExprSup = &pOperator->exprSupp; pTableScanInfo->base.pdInfo.pExprSup = &pOperator->exprSupp;
pTableScanInfo->pdInfo.pAggSup = &pInfo->aggSup; pTableScanInfo->base.pdInfo.pAggSup = &pInfo->aggSup;
} }
code = appendDownstream(pOperator, &downstream, 1); code = appendDownstream(pOperator, &downstream, 1);
...@@ -2731,7 +2676,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -2731,7 +2676,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
} }
STableScanInfo* pScanInfo = pOperator->info; STableScanInfo* pScanInfo = pOperator->info;
pTaskInfo->cost.pRecoder = &pScanInfo->readRecorder; pTaskInfo->cost.pRecoder = &pScanInfo->base.readRecorder;
} else if (QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN == type) { } else if (QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN == type) {
STableMergeScanPhysiNode* pTableScanNode = (STableMergeScanPhysiNode*)pPhyNode; STableMergeScanPhysiNode* pTableScanNode = (STableMergeScanPhysiNode*)pPhyNode;
...@@ -2749,14 +2694,14 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -2749,14 +2694,14 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
return NULL; return NULL;
} }
pOperator = createTableMergeScanOperatorInfo(pTableScanNode, pTableListInfo, pHandle, pTaskInfo); pOperator = createTableMergeScanOperatorInfo(pTableScanNode, pHandle, pTaskInfo);
if (NULL == pOperator) { if (NULL == pOperator) {
pTaskInfo->code = terrno; pTaskInfo->code = terrno;
return NULL; return NULL;
} }
STableScanInfo* pScanInfo = pOperator->info; STableScanInfo* pScanInfo = pOperator->info;
pTaskInfo->cost.pRecoder = &pScanInfo->readRecorder; pTaskInfo->cost.pRecoder = &pScanInfo->base.readRecorder;
} else if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == type) { } else if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == type) {
pOperator = createExchangeOperatorInfo(pHandle ? pHandle->pMsgCb->clientRpc : NULL, (SExchangePhysiNode*)pPhyNode, pOperator = createExchangeOperatorInfo(pHandle ? pHandle->pMsgCb->clientRpc : NULL, (SExchangePhysiNode*)pPhyNode,
pTaskInfo); pTaskInfo);
......
...@@ -2547,8 +2547,8 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode ...@@ -2547,8 +2547,8 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode
if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) { if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
STableScanInfo* pScanInfo = (STableScanInfo*)downstream->info; STableScanInfo* pScanInfo = (STableScanInfo*)downstream->info;
pScanInfo->cond.twindows = pInfo->win; pScanInfo->base.cond.twindows = pInfo->win;
pScanInfo->cond.type = TIMEWINDOW_RANGE_EXTERNAL; pScanInfo->base.cond.type = TIMEWINDOW_RANGE_EXTERNAL;
} }
setOperatorInfo(pOperator, "TimeSliceOperator", QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC, false, OP_NOT_OPENED, pInfo, setOperatorInfo(pOperator, "TimeSliceOperator", QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC, false, OP_NOT_OPENED, pInfo,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册