提交 875989aa 编写于 作者: H Haojun Liao

refactor: do some internal refactor.

上级 2055879d
...@@ -163,7 +163,7 @@ typedef struct { ...@@ -163,7 +163,7 @@ typedef struct {
SArray* pStopInfo; SArray* pStopInfo;
} STaskStopInfo; } STaskStopInfo;
typedef struct SExecTaskInfo { struct SExecTaskInfo {
STaskIdInfo id; STaskIdInfo id;
uint32_t status; uint32_t status;
STimeWindow window; STimeWindow window;
...@@ -182,7 +182,7 @@ typedef struct SExecTaskInfo { ...@@ -182,7 +182,7 @@ typedef struct SExecTaskInfo {
struct SOperatorInfo* pRoot; struct SOperatorInfo* pRoot;
SLocalFetch localFetch; SLocalFetch localFetch;
STaskStopInfo stopInfo; STaskStopInfo stopInfo;
} SExecTaskInfo; };
enum { enum {
OP_NOT_OPENED = 0x0, OP_NOT_OPENED = 0x0,
...@@ -343,7 +343,6 @@ typedef struct STableScanInfo { ...@@ -343,7 +343,6 @@ typedef struct STableScanInfo {
} STableScanInfo; } STableScanInfo;
typedef struct STableMergeScanInfo { typedef struct STableMergeScanInfo {
STableListInfo* tableListInfo;
int32_t tableStartIndex; int32_t tableStartIndex;
int32_t tableEndIndex; int32_t tableEndIndex;
bool hasGroupId; bool hasGroupId;
...@@ -363,7 +362,6 @@ typedef struct STableMergeScanInfo { ...@@ -363,7 +362,6 @@ typedef struct STableMergeScanInfo {
int32_t scanTimes; int32_t scanTimes;
SSDataBlock* pResBlock; SSDataBlock* pResBlock;
int32_t numOfOutput; int32_t numOfOutput;
int32_t dataBlockLoadFlag;
// if the upstream is an interval operator, the interval info is also kept here to get the time // if the upstream is an interval operator, the interval info is also kept here to get the time
// window to check if current data block needs to be loaded. // window to check if current data block needs to be loaded.
...@@ -1041,8 +1039,8 @@ int32_t finalizeResultRows(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPos ...@@ -1041,8 +1039,8 @@ int32_t finalizeResultRows(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPos
SOperatorInfo* createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSortPhysiNode* pSortPhyNode, SOperatorInfo* createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSortPhysiNode* pSortPhyNode,
SExecTaskInfo* pTaskInfo); SExecTaskInfo* pTaskInfo);
SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, STableListInfo* pTableListInfo, SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* readHandle,
SReadHandle* readHandle, SExecTaskInfo* pTaskInfo); SExecTaskInfo* pTaskInfo);
void copyUpdateDataBlock(SSDataBlock* pDest, SSDataBlock* pSource, int32_t tsColIndex); void copyUpdateDataBlock(SSDataBlock* pDest, SSDataBlock* pSource, int32_t tsColIndex);
......
...@@ -1000,12 +1000,6 @@ int32_t loadDataBlockOnDemand(SExecTaskInfo* pTaskInfo, STableScanInfo* pTableSc ...@@ -1000,12 +1000,6 @@ int32_t loadDataBlockOnDemand(SExecTaskInfo* pTaskInfo, STableScanInfo* pTableSc
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static void updateTableQueryInfoForReverseScan(STableQueryInfo* pTableQueryInfo) {
if (pTableQueryInfo == NULL) {
return;
}
}
void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status) { void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status) {
if (status == TASK_NOT_COMPLETED) { if (status == TASK_NOT_COMPLETED) {
pTaskInfo->status = status; pTaskInfo->status = status;
...@@ -1665,55 +1659,6 @@ int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t ...@@ -1665,55 +1659,6 @@ int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t
static int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, size_t keyBufSize, static int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, size_t keyBufSize,
const char* pKey); const char* pKey);
static bool needToMerge(SSDataBlock* pBlock, SArray* groupInfo, char** buf, int32_t rowIndex) {
size_t size = taosArrayGetSize(groupInfo);
if (size == 0) {
return true;
}
for (int32_t i = 0; i < size; ++i) {
int32_t* index = taosArrayGet(groupInfo, i);
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, *index);
bool isNull = colDataIsNull(pColInfo, rowIndex, pBlock->info.rows, NULL);
if ((isNull && buf[i] != NULL) || (!isNull && buf[i] == NULL)) {
return false;
}
char* pCell = colDataGetData(pColInfo, rowIndex);
if (IS_VAR_DATA_TYPE(pColInfo->info.type)) {
if (varDataLen(pCell) != varDataLen(buf[i])) {
return false;
} else {
if (memcmp(varDataVal(pCell), varDataVal(buf[i]), varDataLen(pCell)) != 0) {
return false;
}
}
} else {
if (memcmp(pCell, buf[i], pColInfo->info.bytes) != 0) {
return false;
}
}
}
return 0;
}
static bool saveCurrentTuple(char** rowColData, SArray* pColumnList, SSDataBlock* pBlock, int32_t rowIndex) {
int32_t size = (int32_t)taosArrayGetSize(pColumnList);
for (int32_t i = 0; i < size; ++i) {
int32_t* index = taosArrayGet(pColumnList, i);
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, *index);
char* data = colDataGetData(pColInfo, rowIndex);
memcpy(rowColData[i], data, colDataGetLength(pColInfo, rowIndex));
}
return true;
}
int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scanFlag) { int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scanFlag) {
// todo add more information about exchange operation // todo add more information about exchange operation
int32_t type = pOperator->operatorType; int32_t type = pOperator->operatorType;
...@@ -2762,7 +2707,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -2762,7 +2707,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
return NULL; return NULL;
} }
pOperator = createTableMergeScanOperatorInfo(pTableScanNode, pTableListInfo, pHandle, pTaskInfo); pOperator = createTableMergeScanOperatorInfo(pTableScanNode, pHandle, pTaskInfo);
if (NULL == pOperator) { if (NULL == pOperator) {
pTaskInfo->code = terrno; pTaskInfo->code = terrno;
return NULL; return NULL;
......
...@@ -4392,7 +4392,7 @@ static SSDataBlock* getTableDataBlockImpl(void* param) { ...@@ -4392,7 +4392,7 @@ static SSDataBlock* getTableDataBlockImpl(void* param) {
int64_t st = taosGetTimestampUs(); int64_t st = taosGetTimestampUs();
void* p = tableListGetInfo(pInfo->tableListInfo, readIdx + pInfo->tableStartIndex); void* p = tableListGetInfo(pTaskInfo->pTableInfoList, readIdx + pInfo->tableStartIndex);
SReadHandle* pHandle = &pInfo->base.readHandle; SReadHandle* pHandle = &pInfo->base.readHandle;
int32_t code = tsdbReaderOpen(pHandle->vnode, pQueryCond, p, 1, &pInfo->base.dataReader, GET_TASKID(pTaskInfo)); int32_t code = tsdbReaderOpen(pHandle->vnode, pQueryCond, p, 1, &pInfo->base.dataReader, GET_TASKID(pTaskInfo));
...@@ -4486,10 +4486,10 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) { ...@@ -4486,10 +4486,10 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
{ {
size_t numOfTables = tableListGetSize(pInfo->tableListInfo); size_t numOfTables = tableListGetSize(pTaskInfo->pTableInfoList);
int32_t i = pInfo->tableStartIndex + 1; int32_t i = pInfo->tableStartIndex + 1;
for (; i < numOfTables; ++i) { for (; i < numOfTables; ++i) {
STableKeyInfo* tableKeyInfo = tableListGetInfo(pInfo->tableListInfo, i); STableKeyInfo* tableKeyInfo = tableListGetInfo(pTaskInfo->pTableInfoList, i);
if (tableKeyInfo->groupId != pInfo->groupId) { if (tableKeyInfo->groupId != pInfo->groupId) {
break; break;
} }
...@@ -4613,7 +4613,7 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) { ...@@ -4613,7 +4613,7 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) {
T_LONG_JMP(pTaskInfo->env, code); T_LONG_JMP(pTaskInfo->env, code);
} }
size_t tableListSize = tableListGetSize(pInfo->tableListInfo); size_t tableListSize = tableListGetSize(pTaskInfo->pTableInfoList);
if (!pInfo->hasGroupId) { if (!pInfo->hasGroupId) {
pInfo->hasGroupId = true; pInfo->hasGroupId = true;
...@@ -4622,7 +4622,7 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) { ...@@ -4622,7 +4622,7 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) {
return NULL; return NULL;
} }
pInfo->tableStartIndex = 0; pInfo->tableStartIndex = 0;
pInfo->groupId = ((STableKeyInfo*)tableListGetInfo(pInfo->tableListInfo, pInfo->tableStartIndex))->groupId; pInfo->groupId = ((STableKeyInfo*)tableListGetInfo(pTaskInfo->pTableInfoList, pInfo->tableStartIndex))->groupId;
startGroupTableMergeScan(pOperator); startGroupTableMergeScan(pOperator);
} }
...@@ -4641,7 +4641,7 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) { ...@@ -4641,7 +4641,7 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) {
break; break;
} }
pInfo->tableStartIndex = pInfo->tableEndIndex + 1; pInfo->tableStartIndex = pInfo->tableEndIndex + 1;
pInfo->groupId = tableListGetInfo(pInfo->tableListInfo, pInfo->tableStartIndex)->groupId; pInfo->groupId = tableListGetInfo(pTaskInfo->pTableInfoList, pInfo->tableStartIndex)->groupId;
startGroupTableMergeScan(pOperator); startGroupTableMergeScan(pOperator);
} }
} }
...@@ -4698,8 +4698,8 @@ int32_t getTableMergeScanExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExpla ...@@ -4698,8 +4698,8 @@ int32_t getTableMergeScanExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExpla
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, STableListInfo* pTableListInfo, SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* readHandle,
SReadHandle* readHandle, SExecTaskInfo* pTaskInfo) { SExecTaskInfo* pTaskInfo) {
STableMergeScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableMergeScanInfo)); STableMergeScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableMergeScanInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) { if (pInfo == NULL || pOperator == NULL) {
...@@ -4733,14 +4733,12 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN ...@@ -4733,14 +4733,12 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
pInfo->interval = extractIntervalInfo(pTableScanNode); pInfo->interval = extractIntervalInfo(pTableScanNode);
pInfo->sample.sampleRatio = pTableScanNode->ratio; pInfo->sample.sampleRatio = pTableScanNode->ratio;
pInfo->sample.seed = taosGetTimestampSec(); pInfo->sample.seed = taosGetTimestampSec();
pInfo->dataBlockLoadFlag = pTableScanNode->dataRequired;
code = filterInitFromNode((SNode*)pTableScanNode->scan.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0); code = filterInitFromNode((SNode*)pTableScanNode->scan.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
pInfo->tableListInfo = pTableListInfo;
pInfo->base.scanFlag = MAIN_SCAN; pInfo->base.scanFlag = MAIN_SCAN;
initResultSizeInfo(&pOperator->resultInfo, 1024); initResultSizeInfo(&pOperator->resultInfo, 1024);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册