提交 d1eb6f83 编写于 作者: H Haojun Liao

[td-2895] refactor.

上级 ac08e23b
......@@ -33,7 +33,6 @@
#define SET_MASTER_SCAN_FLAG(runtime) ((runtime)->scanFlag = MASTER_SCAN)
#define SET_REVERSE_SCAN_FLAG(runtime) ((runtime)->scanFlag = REVERSE_SCAN)
#define GET_COL_DATA_POS(query, index, step) ((query)->pos + (index) * (step))
#define SWITCH_ORDER(n) (((n) = ((n) == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC))
#define SDATA_BLOCK_INITIALIZER (SDataBlockInfo) {{0}, 0}
......@@ -54,14 +53,6 @@ typedef enum SResultTsInterpType {
RESULT_ROW_END_INTERP = 2,
} SResultTsInterpType;
typedef struct {
int32_t status; // query status
TSKEY lastKey; // the lastKey value before query executed
STimeWindow w; // whole query time window
int32_t windowIndex; // index of active time window result for interval query
STSCursor cur;
} SQueryStatusInfo;
#if 0
static UNUSED_FUNC void *u_malloc (size_t __size) {
uint32_t v = rand();
......@@ -695,13 +686,14 @@ static void doUpdateResultRowIndex(SResultRowInfo*pResultRowInfo, TSKEY lastKey,
}
}
static UNUSED_FUNC void updateResultRowIndex(SResultRowInfo* pResultRowInfo, STableQueryInfo* pTableQueryInfo, bool ascQuery, bool timeWindowInterpo) {
if ((pTableQueryInfo->lastKey > pTableQueryInfo->win.ekey && ascQuery) || (pTableQueryInfo->lastKey < pTableQueryInfo->win.ekey && (!ascQuery))) {
static void updateResultRowInfoActiveIndex(SResultRowInfo* pResultRowInfo, STimeWindow* pWindow, TSKEY lastKey,
bool ascQuery, bool timeWindowInterpo) {
if ((lastKey > pWindow->ekey && ascQuery) || (lastKey < pWindow->ekey && (!ascQuery))) {
closeAllResultRows(pResultRowInfo);
pResultRowInfo->curIndex = pResultRowInfo->size - 1;
} else {
int32_t step = ascQuery? 1:-1;
doUpdateResultRowIndex(pResultRowInfo, pTableQueryInfo->lastKey - step, ascQuery, timeWindowInterpo);
int32_t step = ascQuery ? 1 : -1;
doUpdateResultRowIndex(pResultRowInfo, lastKey - step, ascQuery, timeWindowInterpo);
}
}
......@@ -1179,13 +1171,16 @@ static void doWindowBorderInterpolation(SOperatorInfo* pOperatorInfo, SSDataBloc
}
}
static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo,
STableIntervalOperatorInfo* pInfo, SSDataBlock* pSDataBlock, int32_t groupId) {
static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pSDataBlock, int32_t groupId) {
STableIntervalOperatorInfo* pInfo = (STableIntervalOperatorInfo*) pOperatorInfo->info;
SQueryRuntimeEnv* pRuntimeEnv = pOperatorInfo->pRuntimeEnv;
int32_t numOfOutput = pOperatorInfo->numOfOutput;
SQuery* pQuery = pRuntimeEnv->pQuery;
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
bool ascQuery = QUERY_IS_ASC_QUERY(pQuery);
int32_t prevIndex = curTimeWindowIndex(pResultRowInfo);
TSKEY* tsCols = NULL;
......@@ -1196,11 +1191,11 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
tsCols[pSDataBlock->info.rows - 1] == pSDataBlock->info.window.ekey);
}
int32_t startPos = QUERY_IS_ASC_QUERY(pQuery) ? 0 : (pSDataBlock->info.rows - 1);
int32_t startPos = ascQuery? 0 : (pSDataBlock->info.rows - 1);
TSKEY ts = getStartTsKey(pQuery, &pSDataBlock->info.window, tsCols, pSDataBlock->info.rows);
STimeWindow win = getActiveTimeWindow(pResultRowInfo, ts, pQuery);
bool masterScan = (pRuntimeEnv->scanFlag == MASTER_SCAN) ? true : false;
bool masterScan = IS_MASTER_SCAN(pRuntimeEnv);
SResultRow* pResult = NULL;
int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, pResultRowInfo, &win, masterScan, &pResult, groupId, pInfo->pCtx,
......@@ -1277,9 +1272,11 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
}
if (pQuery->timeWindowInterpo) {
int32_t rowIndex = QUERY_IS_ASC_QUERY(pQuery)? pSDataBlock->info.rows-1:0;
int32_t rowIndex = ascQuery? (pSDataBlock->info.rows-1):0;
saveDataBlockLastRow(pRuntimeEnv, &pSDataBlock->info, pSDataBlock->pDataBlock, rowIndex);
}
updateResultRowInfoActiveIndex(pResultRowInfo, &pQuery->window, pQuery->current->lastKey, ascQuery, pQuery->timeWindowInterpo);
}
static void doHashGroupbyAgg(SOperatorInfo* pOperator, SGroupbyOperatorInfo *pInfo, SSDataBlock *pSDataBlock) {
......@@ -3174,7 +3171,7 @@ void setResultRowOutputBufInitCtx(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pRe
void setExecutionContext(SQueryRuntimeEnv* pRuntimeEnv, SOptrBasicInfo* pInfo, int32_t numOfOutput, int32_t groupIndex,
TSKEY nextKey) {
STableQueryInfo *pTableQueryInfo = pRuntimeEnv->pQuery->current;
STableQueryInfo *pTableQueryInfo = pRuntimeEnv->pQuery->current;
// lastKey needs to be updated
pTableQueryInfo->lastKey = nextKey;
......@@ -3183,8 +3180,8 @@ void setExecutionContext(SQueryRuntimeEnv* pRuntimeEnv, SOptrBasicInfo* pInfo, i
}
int64_t uid = 0;
SResultRow *pResultRow = doPrepareResultRowFromKey(pRuntimeEnv, &pInfo->resultRowInfo, (char *)&groupIndex,
sizeof(groupIndex), true, uid);
SResultRow* pResultRow =
doPrepareResultRowFromKey(pRuntimeEnv, &pInfo->resultRowInfo, (char*)&groupIndex, sizeof(groupIndex), true, uid);
assert (pResultRow != NULL);
/*
......@@ -3356,16 +3353,6 @@ void setIntervalQueryRange(SQueryRuntimeEnv *pRuntimeEnv, TSKEY key) {
pTableQueryInfo->lastKey = pTableQueryInfo->win.skey;
}
bool requireTimestamp(SQuery *pQuery) {
for (int32_t i = 0; i < pQuery->numOfOutput; i++) {
int32_t functionId = pQuery->pExpr1[i].base.functionId;
if ((aAggs[functionId].status & TSDB_FUNCSTATE_NEED_TS) != 0) {
return true;
}
}
return false;
}
/**
* copyToOutputBuf support copy data in ascending/descending order
* For interval query of both super table and table, copy the data in ascending order, since the output results are
......@@ -3801,49 +3788,6 @@ void queryCostStatis(SQInfo *pQInfo) {
static void doDestroyTableQueryInfo(STableGroupInfo* pTableqinfoGroupInfo);
static UNUSED_FUNC void setTableQueryHandle(SQueryRuntimeEnv* pRuntimeEnv, int32_t tableIndex) {
SQuery* pQuery = pRuntimeEnv->pQuery;
int32_t numOfGroup = (int32_t) GET_NUM_OF_TABLEGROUP(pRuntimeEnv);
STableQueryInfo* pCheckInfo = NULL;
if (numOfGroup == 1) {
SArray *group = GET_TABLEGROUP(pRuntimeEnv, 0);
pCheckInfo = taosArrayGetP(group, tableIndex);
} else {
assert(numOfGroup == pRuntimeEnv->tableqinfoGroupInfo.numOfTables);
SArray *group = GET_TABLEGROUP(pRuntimeEnv, tableIndex);
pCheckInfo = taosArrayGetP(group, 0);
}
// handle the first table
STsdbQueryCond cond = {
.twindow = {pCheckInfo->lastKey, pCheckInfo->win.ekey},
.order = pQuery->order.order,
.colList = pQuery->colList,
.numOfCols = pQuery->numOfCols,
.loadExternalRows = isPointInterpoQuery(pQuery),
};
SArray *g1 = taosArrayInit(1, POINTER_BYTES);
SArray *tx = taosArrayInit(1, sizeof(STableKeyInfo));
STableKeyInfo info = {.pTable = pCheckInfo->pTable, .lastKey = pCheckInfo->lastKey};
taosArrayPush(tx, &info);
taosArrayPush(g1, &tx);
STableGroupInfo gp = {.numOfTables = 1, .pGroupList = g1};
if (pRuntimeEnv->pQueryHandle == NULL) {
pRuntimeEnv->pQueryHandle = tsdbQueryTables(pQuery->tsdb, &cond, &gp, pRuntimeEnv->qinfo, &pQuery->memRef);
} else {
tsdbResetQueryHandleForNewTable(pRuntimeEnv->pQueryHandle, &cond, &gp);
}
taosArrayDestroy(tx);
taosArrayDestroy(g1);
}
static int32_t setupQueryHandle(void* tsdb, SQInfo* pQInfo, bool isSTableQuery) {
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
......@@ -4437,7 +4381,7 @@ static SSDataBlock* doSTableAggregate(void* param) {
// the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order);
TSKEY k = (pQuery->order.order == TSDB_ORDER_ASC)? pBlock->info.window.ekey + 1:pBlock->info.window.skey-1;
TSKEY k = QUERY_IS_ASC_QUERY(pQuery)? pBlock->info.window.ekey + 1:pBlock->info.window.skey-1;
setExecutionContext(pRuntimeEnv, pInfo, pOperator->numOfOutput, pQuery->current->groupIndex, k);
doAggregateImpl(pOperator, pQuery->window.skey, pInfo->pCtx, pBlock);
}
......@@ -4593,7 +4537,7 @@ static SSDataBlock* doIntervalAgg(void* param) {
// the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pOperator, pIntervalInfo->pCtx, pBlock, pQuery->order.order);
hashIntervalAgg(pOperator, &pIntervalInfo->resultRowInfo, pIntervalInfo, pBlock, 0);
hashIntervalAgg(pOperator, &pIntervalInfo->resultRowInfo, pBlock, 0);
}
// restore the value
......@@ -4651,7 +4595,7 @@ static SSDataBlock* doSTableIntervalAgg(void* param) {
setInputDataBlock(pOperator, pIntervalInfo->pCtx, pBlock, pQuery->order.order);
setIntervalQueryRange(pRuntimeEnv, pBlock->info.window.skey);
hashIntervalAgg(pOperator, &pTableQueryInfo->resInfo, pIntervalInfo, pBlock, pTableQueryInfo->groupIndex);
hashIntervalAgg(pOperator, &pTableQueryInfo->resInfo, pBlock, pTableQueryInfo->groupIndex);
}
pOperator->status = OP_RES_TO_RETURN;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册