提交 7153c8e4 编写于 作者: H Haojun Liao

[td-2895] refactor.

上级 b7a27d6d
...@@ -39,8 +39,6 @@ ...@@ -39,8 +39,6 @@
#define SET_MASTER_SCAN_FLAG(runtime) ((runtime)->scanFlag = MASTER_SCAN) #define SET_MASTER_SCAN_FLAG(runtime) ((runtime)->scanFlag = MASTER_SCAN)
#define SET_REVERSE_SCAN_FLAG(runtime) ((runtime)->scanFlag = REVERSE_SCAN) #define SET_REVERSE_SCAN_FLAG(runtime) ((runtime)->scanFlag = REVERSE_SCAN)
#define GET_QINFO_ADDR(x) ((SQInfo *)((char *)(x)-offsetof(SQInfo, runtimeEnv)))
#define GET_COL_DATA_POS(query, index, step) ((query)->pos + (index) * (step)) #define GET_COL_DATA_POS(query, index, step) ((query)->pos + (index) * (step))
#define SWITCH_ORDER(n) (((n) = ((n) == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC)) #define SWITCH_ORDER(n) (((n) = ((n) == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC))
...@@ -163,15 +161,12 @@ static void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, void* inputData, ...@@ -163,15 +161,12 @@ static void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, void* inputData,
SDataStatis *pStatis, SExprInfo* pExprInfo); SDataStatis *pStatis, SExprInfo* pExprInfo);
static void setBlockStatisInfo(SQLFunctionCtx *pCtx, SSDataBlock* pSDataBlock, SColIndex* pColIndex); static void setBlockStatisInfo(SQLFunctionCtx *pCtx, SSDataBlock* pSDataBlock, SColIndex* pColIndex);
//static void initCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx* pCtx);
static void destroyTableQueryInfoImpl(STableQueryInfo *pTableQueryInfo); static void destroyTableQueryInfoImpl(STableQueryInfo *pTableQueryInfo);
//static void resetDefaultResInfoOutputBuf(SQueryRuntimeEnv *pRuntimeEnv);
static bool hasMainOutput(SQuery *pQuery); static bool hasMainOutput(SQuery *pQuery);
//static int32_t setTimestampListJoinInfo(SQueryRuntimeEnv* pRuntimeEnv, STableQueryInfo *pTableQueryInfo); //static int32_t setTimestampListJoinInfo(SQueryRuntimeEnv* pRuntimeEnv, STableQueryInfo *pTableQueryInfo);
static void releaseQueryBuf(size_t numOfTables); static void releaseQueryBuf(size_t numOfTables);
static int32_t binarySearchForKey(char *pValue, int num, TSKEY key, int order); static int32_t binarySearchForKey(char *pValue, int num, TSKEY key, int order);
//static void doRowwiseTimeWindowInterpolation(SQueryRuntimeEnv* pRuntimeEnv, SArray* pDataBlock, TSKEY prevTs, int32_t prevRowIndex, TSKEY curTs, int32_t curRowIndex, TSKEY windowKey, int32_t type);
static STsdbQueryCond createTsdbQueryCond(SQuery* pQuery, STimeWindow* win); static STsdbQueryCond createTsdbQueryCond(SQuery* pQuery, STimeWindow* win);
static STableIdInfo createTableIdInfo(SQuery* pQuery); static STableIdInfo createTableIdInfo(SQuery* pQuery);
...@@ -409,7 +404,6 @@ static bool isProjQuery(SQuery *pQuery) { ...@@ -409,7 +404,6 @@ static bool isProjQuery(SQuery *pQuery) {
static bool isTsCompQuery(SQuery *pQuery) { return pQuery->pExpr1[0].base.functionId == TSDB_FUNC_TS_COMP; } static bool isTsCompQuery(SQuery *pQuery) { return pQuery->pExpr1[0].base.functionId == TSDB_FUNC_TS_COMP; }
static bool isTopBottomQuery(SQuery *pQuery) { static bool isTopBottomQuery(SQuery *pQuery) {
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
int32_t functionId = pQuery->pExpr1[i].base.functionId; int32_t functionId = pQuery->pExpr1[i].base.functionId;
...@@ -1438,7 +1432,7 @@ static int32_t setGroupResultOutputBuf_rv(SQueryRuntimeEnv *pRuntimeEnv, SResult ...@@ -1438,7 +1432,7 @@ static int32_t setGroupResultOutputBuf_rv(SQueryRuntimeEnv *pRuntimeEnv, SResult
d = varDataVal(pData); d = varDataVal(pData);
len = varDataLen(pData); len = varDataLen(pData);
} else if (type == TSDB_DATA_TYPE_FLOAT || type == TSDB_DATA_TYPE_DOUBLE) { } else if (type == TSDB_DATA_TYPE_FLOAT || type == TSDB_DATA_TYPE_DOUBLE) {
SQInfo* pQInfo = GET_QINFO_ADDR(pRuntimeEnv); SQInfo* pQInfo = pRuntimeEnv->qinfo;
qError("QInfo:%p group by not supported on double/float columns, abort", pQInfo); qError("QInfo:%p group by not supported on double/float columns, abort", pQInfo);
return -1; return -1;
} }
...@@ -1798,7 +1792,7 @@ static void* destroySQLFunctionCtx(SQLFunctionCtx* pCtx, int32_t numOfOutput) { ...@@ -1798,7 +1792,7 @@ static void* destroySQLFunctionCtx(SQLFunctionCtx* pCtx, int32_t numOfOutput) {
static void calResultBufSize(SQuery* pQuery, SRspResultInfo* pResultInfo); static void calResultBufSize(SQuery* pQuery, SRspResultInfo* pResultInfo);
static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOfTables) { static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOfTables) {
qDebug("QInfo:%p setup runtime env", GET_QINFO_ADDR(pRuntimeEnv)); qDebug("QInfo:%p setup runtime env", pRuntimeEnv->qinfo);
SQuery *pQuery = pRuntimeEnv->pQuery; SQuery *pQuery = pRuntimeEnv->pQuery;
pRuntimeEnv->prevGroupId = INT32_MIN; pRuntimeEnv->prevGroupId = INT32_MIN;
...@@ -1817,9 +1811,8 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf ...@@ -1817,9 +1811,8 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
pRuntimeEnv->sasArray = calloc(pQuery->numOfOutput, sizeof(SArithmeticSupport)); pRuntimeEnv->sasArray = calloc(pQuery->numOfOutput, sizeof(SArithmeticSupport));
if (/*pRuntimeEnv->rowCellInfoOffset == NULL || */pRuntimeEnv->sasArray == NULL || if (pRuntimeEnv->sasArray == NULL || pRuntimeEnv->pResultRowHashTable == NULL || pRuntimeEnv->keyBuf == NULL ||
pRuntimeEnv->pResultRowHashTable == NULL || pRuntimeEnv->keyBuf == NULL || pRuntimeEnv->prevRow == NULL || pRuntimeEnv->prevRow == NULL || pRuntimeEnv->tagVal == NULL) {
pRuntimeEnv->tagVal == NULL) {
goto _clean; goto _clean;
} }
...@@ -1833,7 +1826,7 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf ...@@ -1833,7 +1826,7 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
*(int64_t*) pRuntimeEnv->prevRow[0] = INT64_MIN; *(int64_t*) pRuntimeEnv->prevRow[0] = INT64_MIN;
} }
qDebug("QInfo:%p init runtime completed", GET_QINFO_ADDR(pRuntimeEnv)); qDebug("QInfo:%p init runtime completed", pRuntimeEnv->qinfo);
// group by normal column, sliding window query, interval query are handled by interval query processor // group by normal column, sliding window query, interval query are handled by interval query processor
// interval (down sampling operation) // interval (down sampling operation)
...@@ -1915,7 +1908,7 @@ static void doFreeQueryHandle(SQInfo* pQInfo) { ...@@ -1915,7 +1908,7 @@ static void doFreeQueryHandle(SQInfo* pQInfo) {
static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) { static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) {
SQuery *pQuery = pRuntimeEnv->pQuery; SQuery *pQuery = pRuntimeEnv->pQuery;
SQInfo* pQInfo = (SQInfo*) GET_QINFO_ADDR(pRuntimeEnv); SQInfo* pQInfo = (SQInfo*) pRuntimeEnv->qinfo;
qDebug("QInfo:%p teardown runtime env", pQInfo); qDebug("QInfo:%p teardown runtime env", pQInfo);
...@@ -2135,11 +2128,11 @@ static void setScanLimitationByResultBuffer(SQuery *pQuery) { ...@@ -2135,11 +2128,11 @@ static void setScanLimitationByResultBuffer(SQuery *pQuery) {
/* /*
* todo add more parameters to check soon.. * todo add more parameters to check soon..
*/ */
bool colIdCheck(SQuery *pQuery) { bool colIdCheck(SQuery *pQuery, void* qinfo) {
// load data column information is incorrect // load data column information is incorrect
for (int32_t i = 0; i < pQuery->numOfCols - 1; ++i) { for (int32_t i = 0; i < pQuery->numOfCols - 1; ++i) {
if (pQuery->colList[i].colId == pQuery->colList[i + 1].colId) { if (pQuery->colList[i].colId == pQuery->colList[i + 1].colId) {
qError("QInfo:%p invalid data load column for query", GET_QINFO_ADDR(pQuery)); qError("QInfo:%p invalid data load column for query", qinfo);
return false; return false;
} }
} }
...@@ -2220,8 +2213,7 @@ static void changeExecuteScanOrder(SQInfo *pQInfo, SQueryTableMsg* pQueryMsg, bo ...@@ -2220,8 +2213,7 @@ static void changeExecuteScanOrder(SQInfo *pQInfo, SQueryTableMsg* pQueryMsg, bo
if (isPointInterpoQuery(pQuery) && pQuery->interval.interval == 0) { if (isPointInterpoQuery(pQuery) && pQuery->interval.interval == 0) {
if (!QUERY_IS_ASC_QUERY(pQuery)) { if (!QUERY_IS_ASC_QUERY(pQuery)) {
qDebug(msg, GET_QINFO_ADDR(pQuery), "interp", pQuery->order.order, TSDB_ORDER_ASC, pQuery->window.skey, qDebug(msg, pQInfo, "interp", pQuery->order.order, TSDB_ORDER_ASC, pQuery->window.skey, pQuery->window.ekey, pQuery->window.ekey, pQuery->window.skey);
pQuery->window.ekey, pQuery->window.ekey, pQuery->window.skey);
SWAP(pQuery->window.skey, pQuery->window.ekey, TSKEY); SWAP(pQuery->window.skey, pQuery->window.ekey, TSKEY);
} }
...@@ -2232,7 +2224,7 @@ static void changeExecuteScanOrder(SQInfo *pQInfo, SQueryTableMsg* pQueryMsg, bo ...@@ -2232,7 +2224,7 @@ static void changeExecuteScanOrder(SQInfo *pQInfo, SQueryTableMsg* pQueryMsg, bo
if (pQuery->interval.interval == 0) { if (pQuery->interval.interval == 0) {
if (onlyFirstQuery(pQuery)) { if (onlyFirstQuery(pQuery)) {
if (!QUERY_IS_ASC_QUERY(pQuery)) { if (!QUERY_IS_ASC_QUERY(pQuery)) {
qDebug(msg, GET_QINFO_ADDR(pQuery), "only-first", pQuery->order.order, TSDB_ORDER_ASC, pQuery->window.skey, qDebug(msg, pQInfo, "only-first", pQuery->order.order, TSDB_ORDER_ASC, pQuery->window.skey,
pQuery->window.ekey, pQuery->window.ekey, pQuery->window.skey); pQuery->window.ekey, pQuery->window.ekey, pQuery->window.skey);
SWAP(pQuery->window.skey, pQuery->window.ekey, TSKEY); SWAP(pQuery->window.skey, pQuery->window.ekey, TSKEY);
...@@ -2242,7 +2234,7 @@ static void changeExecuteScanOrder(SQInfo *pQInfo, SQueryTableMsg* pQueryMsg, bo ...@@ -2242,7 +2234,7 @@ static void changeExecuteScanOrder(SQInfo *pQInfo, SQueryTableMsg* pQueryMsg, bo
pQuery->order.order = TSDB_ORDER_ASC; pQuery->order.order = TSDB_ORDER_ASC;
} else if (onlyLastQuery(pQuery)) { } else if (onlyLastQuery(pQuery)) {
if (QUERY_IS_ASC_QUERY(pQuery)) { if (QUERY_IS_ASC_QUERY(pQuery)) {
qDebug(msg, GET_QINFO_ADDR(pQuery), "only-last", pQuery->order.order, TSDB_ORDER_DESC, pQuery->window.skey, qDebug(msg, pQInfo, "only-last", pQuery->order.order, TSDB_ORDER_DESC, pQuery->window.skey,
pQuery->window.ekey, pQuery->window.ekey, pQuery->window.skey); pQuery->window.ekey, pQuery->window.ekey, pQuery->window.skey);
SWAP(pQuery->window.skey, pQuery->window.ekey, TSKEY); SWAP(pQuery->window.skey, pQuery->window.ekey, TSKEY);
...@@ -2256,7 +2248,7 @@ static void changeExecuteScanOrder(SQInfo *pQInfo, SQueryTableMsg* pQueryMsg, bo ...@@ -2256,7 +2248,7 @@ static void changeExecuteScanOrder(SQInfo *pQInfo, SQueryTableMsg* pQueryMsg, bo
if (stableQuery) { if (stableQuery) {
if (onlyFirstQuery(pQuery)) { if (onlyFirstQuery(pQuery)) {
if (!QUERY_IS_ASC_QUERY(pQuery)) { if (!QUERY_IS_ASC_QUERY(pQuery)) {
qDebug(msg, GET_QINFO_ADDR(pQuery), "only-first stable", pQuery->order.order, TSDB_ORDER_ASC, qDebug(msg, pQInfo, "only-first stable", pQuery->order.order, TSDB_ORDER_ASC,
pQuery->window.skey, pQuery->window.ekey, pQuery->window.ekey, pQuery->window.skey); pQuery->window.skey, pQuery->window.ekey, pQuery->window.ekey, pQuery->window.skey);
SWAP(pQuery->window.skey, pQuery->window.ekey, TSKEY); SWAP(pQuery->window.skey, pQuery->window.ekey, TSKEY);
...@@ -2266,7 +2258,7 @@ static void changeExecuteScanOrder(SQInfo *pQInfo, SQueryTableMsg* pQueryMsg, bo ...@@ -2266,7 +2258,7 @@ static void changeExecuteScanOrder(SQInfo *pQInfo, SQueryTableMsg* pQueryMsg, bo
pQuery->order.order = TSDB_ORDER_ASC; pQuery->order.order = TSDB_ORDER_ASC;
} else if (onlyLastQuery(pQuery)) { } else if (onlyLastQuery(pQuery)) {
if (QUERY_IS_ASC_QUERY(pQuery)) { if (QUERY_IS_ASC_QUERY(pQuery)) {
qDebug(msg, GET_QINFO_ADDR(pQuery), "only-last stable", pQuery->order.order, TSDB_ORDER_DESC, qDebug(msg, pQInfo, "only-last stable", pQuery->order.order, TSDB_ORDER_DESC,
pQuery->window.skey, pQuery->window.ekey, pQuery->window.ekey, pQuery->window.skey); pQuery->window.skey, pQuery->window.ekey, pQuery->window.ekey, pQuery->window.skey);
SWAP(pQuery->window.skey, pQuery->window.ekey, TSKEY); SWAP(pQuery->window.skey, pQuery->window.ekey, TSKEY);
...@@ -2576,7 +2568,7 @@ int32_t loadDataBlockOnDemand_rv(SQueryRuntimeEnv *pRuntimeEnv, STableScanInfo* ...@@ -2576,7 +2568,7 @@ int32_t loadDataBlockOnDemand_rv(SQueryRuntimeEnv *pRuntimeEnv, STableScanInfo*
SQuery* pQuery = pRuntimeEnv->pQuery; SQuery* pQuery = pRuntimeEnv->pQuery;
int64_t groupId = pQuery->current->groupIndex; int64_t groupId = pQuery->current->groupIndex;
SQInfo* pQInfo = GET_QINFO_ADDR(pRuntimeEnv); SQInfo* pQInfo = pRuntimeEnv->qinfo;
SQueryCostInfo* pCost = &pQInfo->summary; SQueryCostInfo* pCost = &pQInfo->summary;
if (pRuntimeEnv->pTsBuf != NULL && pQuery->stableQuery) { if (pRuntimeEnv->pTsBuf != NULL && pQuery->stableQuery) {
...@@ -3081,34 +3073,6 @@ static void updateTableQueryInfoForReverseScan(SQuery *pQuery, STableQueryInfo * ...@@ -3081,34 +3073,6 @@ static void updateTableQueryInfoForReverseScan(SQuery *pQuery, STableQueryInfo *
pTableQueryInfo->resInfo.curIndex = pTableQueryInfo->resInfo.size - 1; pTableQueryInfo->resInfo.curIndex = pTableQueryInfo->resInfo.size - 1;
} }
#if 0
static void disableFuncInReverseScanImpl(SQueryRuntimeEnv* pRuntimeEnv, SResultRowInfo *pWindowResInfo, int32_t order, int32_t numOfOutput) {
SQuery* pQuery = pRuntimeEnv->pQuery;
for (int32_t i = 0; i < pWindowResInfo->size; ++i) {
bool closed = getResultRowStatus(pWindowResInfo, i);
if (!closed) {
continue;
}
SResultRow *pRow = getResultRow(pWindowResInfo, i);
// open/close the specified query for each group result
for (int32_t j = 0; j < pQuery->numOfOutput; ++j) {
int32_t functId = pQuery->pExpr1[j].base.functionId;
SResultRowCellInfo* pInfo = getResultCell(pRow, numOfOutput, j);
if (((functId == TSDB_FUNC_FIRST || functId == TSDB_FUNC_FIRST_DST) && order == TSDB_ORDER_ASC) ||
((functId == TSDB_FUNC_LAST || functId == TSDB_FUNC_LAST_DST) && order == TSDB_ORDER_DESC)) {
pInfo->complete = false;
} else if (functId != TSDB_FUNC_TS && functId != TSDB_FUNC_TAG) {
pInfo->complete = true;
}
}
}
}
#endif
static void setupQueryRangeForReverseScan(SQueryRuntimeEnv* pRuntimeEnv) { static void setupQueryRangeForReverseScan(SQueryRuntimeEnv* pRuntimeEnv) {
SQuery* pQuery = pRuntimeEnv->pQuery; SQuery* pQuery = pRuntimeEnv->pQuery;
int32_t numOfGroups = (int32_t)(GET_NUM_OF_TABLEGROUP(pRuntimeEnv)); int32_t numOfGroups = (int32_t)(GET_NUM_OF_TABLEGROUP(pRuntimeEnv));
...@@ -3565,7 +3529,7 @@ bool needPrimaryTimestampCol(SQuery *pQuery, SDataBlockInfo *pDataBlockInfo) { ...@@ -3565,7 +3529,7 @@ bool needPrimaryTimestampCol(SQuery *pQuery, SDataBlockInfo *pDataBlockInfo) {
} }
//static int32_t doCopyToSData(SQueryRuntimeEnv* pRuntimeEnv, SGroupResInfo* pGroupResInfo, int32_t orderType) { //static int32_t doCopyToSData(SQueryRuntimeEnv* pRuntimeEnv, SGroupResInfo* pGroupResInfo, int32_t orderType) {
// void* qinfo = GET_QINFO_ADDR(pRuntimeEnv); // void* qinfo = pRuntimeEnv->qinfo;
// SQuery *pQuery = pRuntimeEnv->pQuery; // SQuery *pQuery = pRuntimeEnv->pQuery;
// //
// int32_t numOfRows = getNumOfTotalRes(pGroupResInfo); // int32_t numOfRows = getNumOfTotalRes(pGroupResInfo);
...@@ -3595,7 +3559,7 @@ bool needPrimaryTimestampCol(SQuery *pQuery, SDataBlockInfo *pDataBlockInfo) { ...@@ -3595,7 +3559,7 @@ bool needPrimaryTimestampCol(SQuery *pQuery, SDataBlockInfo *pDataBlockInfo) {
// //current output space is not enough to accommodate all data of this page, prepare more space // //current output space is not enough to accommodate all data of this page, prepare more space
// if (numOfRowsToCopy > (pRuntimeEnv->resultInfo.capacity - numOfResult)) { // if (numOfRowsToCopy > (pRuntimeEnv->resultInfo.capacity - numOfResult)) {
// int32_t newSize = pRuntimeEnv->resultInfo.capacity + (numOfRowsToCopy - numOfResult); // int32_t newSize = pRuntimeEnv->resultInfo.capacity + (numOfRowsToCopy - numOfResult);
// expandBuffer(pRuntimeEnv, newSize, GET_QINFO_ADDR(pRuntimeEnv)); // expandBuffer(pRuntimeEnv, newSize, pRuntimeEnv->qinfo);
// } // }
// //
// pGroupResInfo->index += 1; // pGroupResInfo->index += 1;
...@@ -3659,7 +3623,7 @@ static int32_t doCopyToSData_rv(SQueryRuntimeEnv* pRuntimeEnv, SGroupResInfo* pG ...@@ -3659,7 +3623,7 @@ static int32_t doCopyToSData_rv(SQueryRuntimeEnv* pRuntimeEnv, SGroupResInfo* pG
//current output space is not enough to accommodate all data of this page, prepare more space //current output space is not enough to accommodate all data of this page, prepare more space
// if (numOfRowsToCopy > (pRuntimeEnv->resultInfo.capacity - numOfResult)) { // if (numOfRowsToCopy > (pRuntimeEnv->resultInfo.capacity - numOfResult)) {
// int32_t newSize = pRuntimeEnv->resultInfo.capacity + (numOfRowsToCopy - numOfResult); // int32_t newSize = pRuntimeEnv->resultInfo.capacity + (numOfRowsToCopy - numOfResult);
// expandBuffer(pRuntimeEnv, newSize, GET_QINFO_ADDR(pRuntimeEnv)); // expandBuffer(pRuntimeEnv, newSize, pRuntimeEnv->qinfo);
// } // }
pGroupResInfo->index += 1; pGroupResInfo->index += 1;
...@@ -3927,7 +3891,7 @@ void queryCostStatis(SQInfo *pQInfo) { ...@@ -3927,7 +3891,7 @@ void queryCostStatis(SQInfo *pQInfo) {
// //
// int32_t numOfRes = tableApplyFunctionsOnBlock(pRuntimeEnv, pBlockInfo, NULL, binarySearchForKey, pDataBlock); // int32_t numOfRes = tableApplyFunctionsOnBlock(pRuntimeEnv, pBlockInfo, NULL, binarySearchForKey, pDataBlock);
// //
// qDebug("QInfo:%p check data block, brange:%" PRId64 "-%" PRId64 ", numOfRows:%d, numOfRes:%d, lastKey:%"PRId64, GET_QINFO_ADDR(pRuntimeEnv), // qDebug("QInfo:%p check data block, brange:%" PRId64 "-%" PRId64 ", numOfRows:%d, numOfRes:%d, lastKey:%"PRId64, pRuntimeEnv->qinfo,
// pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows, numOfRes, pQuery->current->lastKey); // pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows, numOfRes, pQuery->current->lastKey);
//} //}
...@@ -4000,7 +3964,7 @@ static void generateBlockDistResult(STableBlockDist *pTableBlockDist) { ...@@ -4000,7 +3964,7 @@ static void generateBlockDistResult(STableBlockDist *pTableBlockDist) {
// //
// SDataBlockInfo blockInfo = SDATA_BLOCK_INITIALIZER; // SDataBlockInfo blockInfo = SDATA_BLOCK_INITIALIZER;
// while (tsdbNextDataBlock(pQueryHandle)) { // while (tsdbNextDataBlock(pQueryHandle)) {
// if (isQueryKilled(GET_QINFO_ADDR(pRuntimeEnv))) { // if (isQueryKilled(pRuntimeEnv->qinfo)) {
// longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED); // longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED);
// } // }
// //
...@@ -4011,7 +3975,7 @@ static void generateBlockDistResult(STableBlockDist *pTableBlockDist) { ...@@ -4011,7 +3975,7 @@ static void generateBlockDistResult(STableBlockDist *pTableBlockDist) {
// pTableQueryInfo->lastKey = (QUERY_IS_ASC_QUERY(pQuery)) ? blockInfo.window.ekey : blockInfo.window.skey; // pTableQueryInfo->lastKey = (QUERY_IS_ASC_QUERY(pQuery)) ? blockInfo.window.ekey : blockInfo.window.skey;
// pTableQueryInfo->lastKey += step; // pTableQueryInfo->lastKey += step;
// //
// qDebug("QInfo:%p skip rows:%d, offset:%" PRId64, GET_QINFO_ADDR(pRuntimeEnv), blockInfo.rows, // qDebug("QInfo:%p skip rows:%d, offset:%" PRId64, pRuntimeEnv->qinfo, blockInfo.rows,
// pQuery->limit.offset); // pQuery->limit.offset);
// } else { // find the appropriated start position in current block // } else { // find the appropriated start position in current block
// updateOffsetVal(pRuntimeEnv, &blockInfo); // updateOffsetVal(pRuntimeEnv, &blockInfo);
...@@ -4060,7 +4024,7 @@ static void generateBlockDistResult(STableBlockDist *pTableBlockDist) { ...@@ -4060,7 +4024,7 @@ static void generateBlockDistResult(STableBlockDist *pTableBlockDist) {
// pRuntimeEnv->resultRowInfo.curIndex = index; // restore the window index // pRuntimeEnv->resultRowInfo.curIndex = index; // restore the window index
// //
// qDebug("QInfo:%p check data block, brange:%" PRId64 "-%" PRId64 ", numOfRows:%d, numOfRes:%d, lastKey:%" PRId64, // qDebug("QInfo:%p check data block, brange:%" PRId64 "-%" PRId64 ", numOfRows:%d, numOfRes:%d, lastKey:%" PRId64,
// GET_QINFO_ADDR(pRuntimeEnv), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows, numOfRes, // pRuntimeEnv->qinfo, pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows, numOfRes,
// pQuery->current->lastKey); // pQuery->current->lastKey);
// //
// return key; // return key;
...@@ -4234,6 +4198,7 @@ static int32_t setupQueryHandle(void* tsdb, SQInfo* pQInfo, bool isSTableQuery) ...@@ -4234,6 +4198,7 @@ static int32_t setupQueryHandle(void* tsdb, SQInfo* pQInfo, bool isSTableQuery)
} }
STsdbQueryCond cond = createTsdbQueryCond(pQuery, &pQuery->window); STsdbQueryCond cond = createTsdbQueryCond(pQuery, &pQuery->window);
cond.loadExternalRows = isPointInterpoQuery(pQuery);
if (!isSTableQuery if (!isSTableQuery
&& (pRuntimeEnv->tableqinfoGroupInfo.numOfTables == 1) && (pRuntimeEnv->tableqinfoGroupInfo.numOfTables == 1)
...@@ -4338,9 +4303,10 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, SArray* prevResult, void *ts ...@@ -4338,9 +4303,10 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, SArray* prevResult, void *ts
pRuntimeEnv->cur.vgroupIndex = -1; pRuntimeEnv->cur.vgroupIndex = -1;
if (onlyQueryTags(pQuery)) { if (onlyQueryTags(pQuery)) {
// TODO refactor.
pRuntimeEnv->resultInfo.capacity = 4096; pRuntimeEnv->resultInfo.capacity = 4096;
pRuntimeEnv->proot = createTagScanOperatorInfo(pRuntimeEnv, pQuery->pExpr1, pQuery->numOfOutput); pRuntimeEnv->proot = createTagScanOperatorInfo(pRuntimeEnv, pQuery->pExpr1, pQuery->numOfOutput);
} else if (isTsCompQuery(pQuery)) { } else if (isTsCompQuery(pQuery) || isPointInterpoQuery(pQuery)) {
pRuntimeEnv->pTableScanner = createSeqTableBlockScanOperator(pRuntimeEnv->pQueryHandle, pRuntimeEnv); pRuntimeEnv->pTableScanner = createSeqTableBlockScanOperator(pRuntimeEnv->pQueryHandle, pRuntimeEnv);
} else if (needReverseScan(pQuery)) { } else if (needReverseScan(pQuery)) {
pRuntimeEnv->pTableScanner = createBiDirectionTableScanInfo(pRuntimeEnv->pQueryHandle, pRuntimeEnv, getNumOfScanTimes(pQuery), 1); pRuntimeEnv->pTableScanner = createBiDirectionTableScanInfo(pRuntimeEnv->pQueryHandle, pRuntimeEnv, getNumOfScanTimes(pQuery), 1);
...@@ -4411,195 +4377,6 @@ static void doTableQueryInfoTimeWindowCheck(SQuery* pQuery, STableQueryInfo* pTa ...@@ -4411,195 +4377,6 @@ static void doTableQueryInfoTimeWindowCheck(SQuery* pQuery, STableQueryInfo* pTa
} }
} }
#if 0
static int64_t scanMultiTableDataBlocks(SQInfo *pQInfo) {
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery* pQuery = pRuntimeEnv->pQuery;
SQueryCostInfo* summary = &pQInfo->summary;
int64_t st = taosGetTimestampMs();
TsdbQueryHandleT pQueryHandle = IS_MASTER_SCAN(pRuntimeEnv)? pRuntimeEnv->pQueryHandle : pRuntimeEnv->pSecQueryHandle;
SDataBlockInfo blockInfo = SDATA_BLOCK_INITIALIZER;
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
while (tsdbNextDataBlock(pQueryHandle)) {
summary->totalBlocks += 1;
if (isQueryKilled(pQInfo)) {
longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED);
}
tsdbRetrieveDataBlockInfo(pQueryHandle, &blockInfo);
STableQueryInfo **pTableQueryInfo = (STableQueryInfo**) taosHashGet(pRuntimeEnv->tableqinfoGroupInfo.map, &blockInfo.tid, sizeof(blockInfo.tid));
if(pTableQueryInfo == NULL) {
break;
}
pQuery->current = *pTableQueryInfo;
doTableQueryInfoTimeWindowCheck(pQuery, *pTableQueryInfo);
if (!pQuery->groupbyColumn) {
setEnvForEachBlock(pRuntimeEnv, *pTableQueryInfo, &blockInfo);
}
if (pQuery->stabledev) {
for(int32_t i = 0; i < pQuery->numOfOutput; ++i) {
if (pQuery->pExpr1[i].base.functionId == TSDB_FUNC_STDDEV_DST) {
setParamValue(pRuntimeEnv);
break;
}
}
}
uint32_t status = 0;
SDataStatis *pStatis = NULL;
SArray *pDataBlock = NULL;
int32_t ret = loadDataBlockOnDemand(pRuntimeEnv, &pQuery->current->resInfo, pQueryHandle, &blockInfo, &pStatis, &pDataBlock, &status);
if (ret != TSDB_CODE_SUCCESS) {
break;
}
if (status == BLK_DATA_DISCARD) {
pQuery->current->lastKey = QUERY_IS_ASC_QUERY(pQuery)? blockInfo.window.ekey + step : blockInfo.window.skey + step;
continue;
}
summary->totalRows += blockInfo.rows;
stableApplyFunctionsOnBlock(pRuntimeEnv, &blockInfo, pStatis, pDataBlock, binarySearchForKey);
qDebug("QInfo:%p check data block completed, uid:%"PRId64", tid:%d, brange:%" PRId64 "-%" PRId64 ", numOfRows:%d, "
"lastKey:%" PRId64,
pQInfo, blockInfo.uid, blockInfo.tid, blockInfo.window.skey, blockInfo.window.ekey, blockInfo.rows,
pQuery->current->lastKey);
}
if (terrno != TSDB_CODE_SUCCESS) {
longjmp(pRuntimeEnv->env, terrno);
}
updateWindowResNumOfRes(pRuntimeEnv);
int64_t et = taosGetTimestampMs();
return et - st;
}
static UNUSED_FUNC bool multiTableMultioutputHelper(SQInfo *pQInfo, int32_t index) {
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery * pQuery = pRuntimeEnv->pQuery;
setQueryStatus(pQuery, QUERY_NOT_COMPLETED);
SArray *group = GET_TABLEGROUP(pRuntimeEnv, 0);
STableQueryInfo* pCheckInfo = taosArrayGetP(group, index);
if (pQuery->hasTagResults || pRuntimeEnv->pTsBuf != NULL) {
setTagVal(pRuntimeEnv, pCheckInfo->pTable);
}
STableId* id = TSDB_TABLEID(pCheckInfo->pTable);
qDebug("QInfo:%p query on (%d): uid:%" PRIu64 ", tid:%d, qrange:%" PRId64 "-%" PRId64, pQInfo, index,
id->uid, id->tid, pCheckInfo->lastKey, pCheckInfo->win.ekey);
STsdbQueryCond cond = {
.twindow = {pCheckInfo->lastKey, pCheckInfo->win.ekey},
.order = pQuery->order.order,
.colList = pQuery->colList,
.numOfCols = pQuery->numOfCols,
.loadExternalRows = false,
};
// todo refactor
SArray *g1 = taosArrayInit(1, POINTER_BYTES);
SArray *tx = taosArrayInit(1, sizeof(STableKeyInfo));
STableKeyInfo info = {.pTable = pCheckInfo->pTable, .lastKey = pCheckInfo->lastKey};
taosArrayPush(tx, &info);
taosArrayPush(g1, &tx);
STableGroupInfo gp = {.numOfTables = 1, .pGroupList = g1};
// include only current table
if (pRuntimeEnv->pQueryHandle != NULL) {
tsdbCleanupQueryHandle(pRuntimeEnv->pQueryHandle);
pRuntimeEnv->pQueryHandle = NULL;
}
pRuntimeEnv->pQueryHandle = tsdbQueryTables(pQuery->tsdb, &cond, &gp, pQInfo, &pQuery->memRef);
taosArrayDestroy(tx);
taosArrayDestroy(g1);
if (pRuntimeEnv->pQueryHandle == NULL) {
longjmp(pRuntimeEnv->env, terrno);
}
if (pRuntimeEnv->pTsBuf != NULL) {
tVariant* pTag = &pRuntimeEnv->pCtx[0].tag;
if (pRuntimeEnv->cur.vgroupIndex == -1) {
STSElem elem = tsBufGetElemStartPos(pRuntimeEnv->pTsBuf, pQuery->vgId, pTag);
// failed to find data with the specified tag value and vnodeId
if (!tsBufIsValidElem(&elem)) {
if (pTag->nType == TSDB_DATA_TYPE_BINARY || pTag->nType == TSDB_DATA_TYPE_NCHAR) {
qError("QInfo:%p failed to find tag:%s in ts_comp", pQInfo, pTag->pz);
} else {
qError("QInfo:%p failed to find tag:%"PRId64" in ts_comp", pQInfo, pTag->i64);
}
return false;
} else {
STSCursor cur = tsBufGetCursor(pRuntimeEnv->pTsBuf);
if (pTag->nType == TSDB_DATA_TYPE_BINARY || pTag->nType == TSDB_DATA_TYPE_NCHAR) {
qDebug("QInfo:%p find tag:%s start pos in ts_comp, blockIndex:%d, tsIndex:%d", pQInfo, pTag->pz,
cur.blockIndex, cur.tsIndex);
} else {
qDebug("QInfo:%p find tag:%"PRId64" start pos in ts_comp, blockIndex:%d, tsIndex:%d", pQInfo, pTag->i64,
cur.blockIndex, cur.tsIndex);
}
}
} else {
STSElem elem = tsBufGetElem(pRuntimeEnv->pTsBuf);
if (tVariantCompare(elem.tag, &pRuntimeEnv->pCtx[0].tag) != 0) {
STSElem elem1 = tsBufGetElemStartPos(pRuntimeEnv->pTsBuf, pQuery->vgId, pTag);
// failed to find data with the specified tag value and vnodeId
if (!tsBufIsValidElem(&elem1)) {
if (pTag->nType == TSDB_DATA_TYPE_BINARY || pTag->nType == TSDB_DATA_TYPE_NCHAR) {
qError("QInfo:%p failed to find tag:%s in ts_comp", pQInfo, pTag->pz);
} else {
qError("QInfo:%p failed to find tag:%"PRId64" in ts_comp", pQInfo, pTag->i64);
}
return false;
} else {
STSCursor cur = tsBufGetCursor(pRuntimeEnv->pTsBuf);
if (pTag->nType == TSDB_DATA_TYPE_BINARY || pTag->nType == TSDB_DATA_TYPE_NCHAR) {
qDebug("QInfo:%p find tag:%s start pos in ts_comp, blockIndex:%d, tsIndex:%d", pQInfo, pTag->pz, cur.blockIndex, cur.tsIndex);
} else {
qDebug("QInfo:%p find tag:%"PRId64" start pos in ts_comp, blockIndex:%d, tsIndex:%d", pQInfo, pTag->i64, cur.blockIndex, cur.tsIndex);
}
}
} else {
tsBufSetCursor(pRuntimeEnv->pTsBuf, &pRuntimeEnv->cur);
STSCursor cur = tsBufGetCursor(pRuntimeEnv->pTsBuf);
if (pTag->nType == TSDB_DATA_TYPE_BINARY || pTag->nType == TSDB_DATA_TYPE_NCHAR) {
qDebug("QInfo:%p continue scan ts_comp file, tag:%s blockIndex:%d, tsIndex:%d", pQInfo, pTag->pz, cur.blockIndex, cur.tsIndex);
} else {
qDebug("QInfo:%p continue scan ts_comp file, tag:%"PRId64" blockIndex:%d, tsIndex:%d", pQInfo, pTag->i64, cur.blockIndex, cur.tsIndex);
}
}
}
}
initCtxOutputBuf(pRuntimeEnv, pRuntimeEnv->pCtx);
return true;
}
#endif
STsdbQueryCond createTsdbQueryCond(SQuery* pQuery, STimeWindow* win) { STsdbQueryCond createTsdbQueryCond(SQuery* pQuery, STimeWindow* win) {
STsdbQueryCond cond = { STsdbQueryCond cond = {
.colList = pQuery->colList, .colList = pQuery->colList,
...@@ -4636,478 +4413,19 @@ static UNUSED_FUNC void updateTableIdInfo(SQuery* pQuery, SHashObj* pTableIdInfo ...@@ -4636,478 +4413,19 @@ static UNUSED_FUNC void updateTableIdInfo(SQuery* pQuery, SHashObj* pTableIdInfo
} }
} }
#if 0 static void doCloseAllTimeWindow(SQueryRuntimeEnv* pRuntimeEnv) {
/** size_t numOfGroup = GET_NUM_OF_TABLEGROUP(pRuntimeEnv);
* super table query handler for (int32_t i = 0; i < numOfGroup; ++i) {
* 1. super table projection query, group-by on normal columns query, ts-comp query SArray* group = GET_TABLEGROUP(pRuntimeEnv, i);
* 2. point interpolation query, last row query
*
* @param pQInfo
*/
static UNUSED_FUNC void sequentialTableProcess(SQInfo *pQInfo) {
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery * pQuery = pRuntimeEnv->pQuery;
setQueryStatus(pQuery, QUERY_COMPLETED);
size_t numOfGroups = GET_NUM_OF_TABLEGROUP(pRuntimeEnv);
if (isPointInterpoQuery(pQuery)) {
resetDefaultResInfoOutputBuf(pRuntimeEnv);
assert(pQuery->limit.offset == 0 && pQuery->limit.limit != 0);
while (pRuntimeEnv->groupIndex < numOfGroups) {
SArray *group = taosArrayGetP(pQuery->tableGroupInfo.pGroupList, pRuntimeEnv->groupIndex);
qDebug("QInfo:%p point interpolation query on group:%d, total group:%" PRIzu ", current group:%p", pQInfo,
pRuntimeEnv->groupIndex, numOfGroups, group);
STsdbQueryCond cond = createTsdbQueryCond(pQuery, &pQuery->window);
SArray *g1 = taosArrayInit(1, POINTER_BYTES);
SArray *tx = taosArrayDup(group);
taosArrayPush(g1, &tx);
STableGroupInfo gp = {.numOfTables = taosArrayGetSize(tx), .pGroupList = g1};
// include only current table
if (pRuntimeEnv->pQueryHandle != NULL) {
tsdbCleanupQueryHandle(pRuntimeEnv->pQueryHandle);
pRuntimeEnv->pQueryHandle = NULL;
}
pRuntimeEnv->pQueryHandle = tsdbQueryRowsInExternalWindow(pQuery->tsdb, &cond, &gp, pQInfo, &pQuery->memRef);
taosArrayDestroy(tx);
taosArrayDestroy(g1);
if (pRuntimeEnv->pQueryHandle == NULL) {
longjmp(pRuntimeEnv->env, terrno);
}
initCtxOutputBuf(pRuntimeEnv, pRuntimeEnv->pCtx);
SArray *s = tsdbGetQueriedTableList(pRuntimeEnv->pQueryHandle);
assert(taosArrayGetSize(s) >= 1);
setTagVal(pRuntimeEnv, taosArrayGetP(s, 0));
taosArrayDestroy(s);
// here we simply set the first table as current table
SArray *first = GET_TABLEGROUP(pRuntimeEnv, pRuntimeEnv->groupIndex);
pQuery->current = taosArrayGetP(first, 0);
scanOneTableDataBlocks(pRuntimeEnv, pQuery->current->lastKey);
int64_t numOfRes = getNumOfResult(pRuntimeEnv);
if (numOfRes > 0) {
pRuntimeEnv->resultInfo.rows += numOfRes;
forwardCtxOutputBuf(pRuntimeEnv, numOfRes);
}
skipResults(pRuntimeEnv);
pRuntimeEnv->groupIndex += 1;
// enable execution for next table, when handling the projection query
enableExecutionForNextTable(pRuntimeEnv);
if (pRuntimeEnv->resultInfo.rows >= pRuntimeEnv->resultInfo.capacity) {
setQueryStatus(pQuery, QUERY_RESBUF_FULL);
break;
}
}
} else if (pQuery->groupbyColumn) { // group-by on normal columns query
while (pRuntimeEnv->groupIndex < numOfGroups) {
SArray *group = taosArrayGetP(pQuery->tableGroupInfo.pGroupList, pRuntimeEnv->groupIndex);
qDebug("QInfo:%p group by normal columns group:%d, total group:%" PRIzu "", pQInfo, pRuntimeEnv->groupIndex,
numOfGroups);
STsdbQueryCond cond = createTsdbQueryCond(pQuery, &pQuery->window);
SArray *g1 = taosArrayInit(1, POINTER_BYTES);
SArray *tx = taosArrayDup(group);
taosArrayPush(g1, &tx);
STableGroupInfo gp = {.numOfTables = taosArrayGetSize(tx), .pGroupList = g1};
// include only current table
if (pRuntimeEnv->pQueryHandle != NULL) {
tsdbCleanupQueryHandle(pRuntimeEnv->pQueryHandle);
pRuntimeEnv->pQueryHandle = NULL;
}
// no need to update the lastkey for each table
pRuntimeEnv->pQueryHandle = tsdbQueryTables(pQuery->tsdb, &cond, &gp, pQInfo, &pQuery->memRef);
taosArrayDestroy(g1);
taosArrayDestroy(tx);
if (pRuntimeEnv->pQueryHandle == NULL) {
longjmp(pRuntimeEnv->env, terrno);
}
SArray *s = tsdbGetQueriedTableList(pRuntimeEnv->pQueryHandle);
assert(taosArrayGetSize(s) >= 1);
setTagVal(pRuntimeEnv, taosArrayGetP(s, 0));
// here we simply set the first table as current table
scanMultiTableDataBlocks(pQInfo);
pRuntimeEnv->groupIndex += 1;
taosArrayDestroy(s);
// no results generated for current group, continue to try the next group
SResultRowInfo *pWindowResInfo = &pRuntimeEnv->resultRowInfo;
if (pWindowResInfo->size <= 0) {
continue;
}
for (int32_t i = 0; i < pWindowResInfo->size; ++i) {
pWindowResInfo->pResult[i]->closed = true; // enable return all results for group by normal columns
SResultRow *pResult = pWindowResInfo->pResult[i];
for (int32_t j = 0; j < pQuery->numOfOutput; ++j) {
SResultRowCellInfo *pCell = getResultCell(pRuntimeEnv, pResult, j);
pResult->numOfRows = (uint16_t)(MAX(pResult->numOfRows, pCell->numOfRes));
}
}
qDebug("QInfo:%p generated groupby columns results %d rows for group %d completed", pQInfo, pWindowResInfo->size,
pRuntimeEnv->groupIndex);
pRuntimeEnv->resultInfo.rows = 0;
if (pWindowResInfo->size > pRuntimeEnv->resultInfo.capacity) {
expandBuffer(pRuntimeEnv, pWindowResInfo->size, pQInfo);
}
initGroupResInfo(&pRuntimeEnv->groupResInfo, &pRuntimeEnv->resultRowInfo, 0);
copyToOutputBuf(pRuntimeEnv, pWindowResInfo);
assert(pRuntimeEnv->resultInfo.rows == pWindowResInfo->size);
resetResultRowInfo(pRuntimeEnv, &pRuntimeEnv->resultRowInfo);
cleanupGroupResInfo(&pRuntimeEnv->groupResInfo);
break;
}
} else if (pQuery->queryWindowIdentical && pRuntimeEnv->pTsBuf == NULL && !isTsCompQuery(pQuery)) {
//super table projection query with identical query time range for all tables.
SDataBlockInfo blockInfo = SDATA_BLOCK_INITIALIZER;
resetDefaultResInfoOutputBuf(pRuntimeEnv);
SArray *group = GET_TABLEGROUP(pRuntimeEnv, 0);
assert(taosArrayGetSize(group) == pRuntimeEnv->tableqinfoGroupInfo.numOfTables &&
1 == taosArrayGetSize(pRuntimeEnv->tableqinfoGroupInfo.pGroupList));
void *pQueryHandle = pRuntimeEnv->pQueryHandle;
if (pQueryHandle == NULL) {
STsdbQueryCond con = createTsdbQueryCond(pQuery, &pQuery->window);
pRuntimeEnv->pQueryHandle = tsdbQueryTables(pQuery->tsdb, &con, &pQuery->tableGroupInfo, pQInfo, &pQuery->memRef);
pQueryHandle = pRuntimeEnv->pQueryHandle;
}
// skip blocks without load the actual data block from file if no filter condition present
// skipBlocks(&pQInfo->runtimeEnv);
// if (pQuery->limit.offset > 0 && pQuery->numOfFilterCols == 0) {
// setQueryStatus(pQuery, QUERY_COMPLETED);
// return;
// }
if (pQuery->prjInfo.vgroupLimit != -1) {
assert(pQuery->limit.limit == -1 && pQuery->limit.offset == 0);
} else if (pQuery->limit.limit != -1) {
assert(pQuery->prjInfo.vgroupLimit == -1);
}
bool hasMoreBlock = true;
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
SQueryCostInfo *summary = &pQInfo->summary;
while ((hasMoreBlock = tsdbNextDataBlock(pQueryHandle)) == true) {
summary->totalBlocks += 1;
if (isQueryKilled(pQInfo)) {
longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED);
}
tsdbRetrieveDataBlockInfo(pQueryHandle, &blockInfo);
STableQueryInfo **pTableQueryInfo =
(STableQueryInfo **) taosHashGet(pRuntimeEnv->tableqinfoGroupInfo.map, &blockInfo.tid, sizeof(blockInfo.tid));
if (pTableQueryInfo == NULL) {
break;
}
pQuery->current = *pTableQueryInfo;
doTableQueryInfoTimeWindowCheck(pQuery, *pTableQueryInfo);
if (pQuery->hasTagResults) {
setTagVal(pRuntimeEnv, pQuery->current->pTable);
}
if (pQuery->prjInfo.vgroupLimit > 0 && pQuery->current->resInfo.size > pQuery->prjInfo.vgroupLimit) {
pQuery->current->lastKey =
QUERY_IS_ASC_QUERY(pQuery) ? blockInfo.window.ekey + step : blockInfo.window.skey + step;
continue;
}
// it is a super table ordered projection query, check for the number of output for each vgroup
if (pQuery->prjInfo.vgroupLimit > 0 && pRuntimeEnv->resultInfo.rows >= pQuery->prjInfo.vgroupLimit) {
if (QUERY_IS_ASC_QUERY(pQuery) && blockInfo.window.skey >= pQuery->prjInfo.ts) {
pQuery->current->lastKey =
QUERY_IS_ASC_QUERY(pQuery) ? blockInfo.window.ekey + step : blockInfo.window.skey + step;
continue;
} else if (!QUERY_IS_ASC_QUERY(pQuery) && blockInfo.window.ekey <= pQuery->prjInfo.ts) {
pQuery->current->lastKey =
QUERY_IS_ASC_QUERY(pQuery) ? blockInfo.window.ekey + step : blockInfo.window.skey + step;
continue;
}
}
uint32_t status = 0;
SDataStatis *pStatis = NULL;
SArray *pDataBlock = NULL;
int32_t ret = loadDataBlockOnDemand(pRuntimeEnv, &pQuery->current->resInfo, pQueryHandle, &blockInfo,
&pStatis, &pDataBlock, &status);
if (ret != TSDB_CODE_SUCCESS) {
break;
}
if(status == BLK_DATA_DISCARD) {
pQuery->current->lastKey =
QUERY_IS_ASC_QUERY(pQuery) ? blockInfo.window.ekey + step : blockInfo.window.skey + step;
continue;
}
ensureOutputBuffer(pRuntimeEnv, blockInfo.rows);
int64_t prev = getNumOfResult(pRuntimeEnv);
pQuery->pos = QUERY_IS_ASC_QUERY(pQuery) ? 0 : blockInfo.rows - 1;
int32_t numOfRes = tableApplyFunctionsOnBlock(pRuntimeEnv, &blockInfo, pStatis, binarySearchForKey, pDataBlock);
summary->totalRows += blockInfo.rows;
qDebug("QInfo:%p check data block, brange:%" PRId64 "-%" PRId64 ", numOfRows:%d, numOfRes:%d, lastKey:%" PRId64,
GET_QINFO_ADDR(pRuntimeEnv), blockInfo.window.skey, blockInfo.window.ekey, blockInfo.rows, numOfRes,
pQuery->current->lastKey);
pRuntimeEnv->resultInfo.rows = getNumOfResult(pRuntimeEnv);
int64_t inc = pRuntimeEnv->resultInfo.rows - prev;
pQuery->current->resInfo.size += (int32_t) inc;
// the flag may be set by tableApplyFunctionsOnBlock, clear it here
CLEAR_QUERY_STATUS(pQuery, QUERY_COMPLETED);
updateTableIdInfo(pQuery, pQInfo->arrTableIdInfo);
if (pQuery->prjInfo.vgroupLimit >= 0) {
if (((pRuntimeEnv->resultInfo.rows + pRuntimeEnv->resultInfo.total) < pQuery->prjInfo.vgroupLimit) || ((pRuntimeEnv->resultInfo.rows + pRuntimeEnv->resultInfo.total) > pQuery->prjInfo.vgroupLimit && prev < pQuery->prjInfo.vgroupLimit)) {
if (QUERY_IS_ASC_QUERY(pQuery) && pQuery->prjInfo.ts < blockInfo.window.ekey) {
pQuery->prjInfo.ts = blockInfo.window.ekey;
} else if (!QUERY_IS_ASC_QUERY(pQuery) && pQuery->prjInfo.ts > blockInfo.window.skey) {
pQuery->prjInfo.ts = blockInfo.window.skey;
}
}
} else {
// the limitation of output result is reached, set the query completed
skipResults(pRuntimeEnv);
if (limitOperator(pQuery, pQInfo)) {
SET_STABLE_QUERY_OVER(pRuntimeEnv);
break;
}
}
// while the output buffer is full or limit/offset is applied, query may be paused here
if (Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL|QUERY_COMPLETED)) {
break;
}
}
if (!hasMoreBlock) {
setQueryStatus(pQuery, QUERY_COMPLETED);
SET_STABLE_QUERY_OVER(pRuntimeEnv);
}
} else {
/*
* the following two cases handled here.
* 1. ts-comp query, and 2. the super table projection query with different query time range for each table.
* If the subgroup index is larger than 0, results generated by group by tbname,k is existed.
* we need to return it to client in the first place.
*/
if (hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) {
copyToOutputBuf(pRuntimeEnv, &pRuntimeEnv->resultRowInfo);
pRuntimeEnv->resultInfo.total += pRuntimeEnv->resultInfo.rows;
if (pRuntimeEnv->resultInfo.rows > 0) {
return;
}
}
// all data have returned already
if (pRuntimeEnv->tableIndex >= pRuntimeEnv->tableqinfoGroupInfo.numOfTables) {
return;
}
resetDefaultResInfoOutputBuf(pRuntimeEnv);
resetResultRowInfo(pRuntimeEnv, &pRuntimeEnv->resultRowInfo);
SArray *group = GET_TABLEGROUP(pRuntimeEnv, 0);
assert(taosArrayGetSize(group) == pRuntimeEnv->tableqinfoGroupInfo.numOfTables &&
1 == taosArrayGetSize(pRuntimeEnv->tableqinfoGroupInfo.pGroupList));
while (pRuntimeEnv->tableIndex < pRuntimeEnv->tableqinfoGroupInfo.numOfTables) {
if (isQueryKilled(pQInfo)) {
longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED);
}
pQuery->current = taosArrayGetP(group, pRuntimeEnv->tableIndex);
if (!multiTableMultioutputHelper(pQInfo, pRuntimeEnv->tableIndex)) {
pRuntimeEnv->tableIndex++;
continue;
}
// TODO handle the limit offset problem
if (pQuery->numOfFilterCols == 0 && pQuery->limit.offset > 0) {
if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) {
pRuntimeEnv->tableIndex++;
continue;
}
}
scanOneTableDataBlocks(pRuntimeEnv, pQuery->current->lastKey);
skipResults(pRuntimeEnv);
// the limitation of output result is reached, set the query completed
if (limitOperator(pQuery, pQInfo)) {
SET_STABLE_QUERY_OVER(pRuntimeEnv);
break;
}
// enable execution for next table, when handling the projection query
enableExecutionForNextTable(pRuntimeEnv);
if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) {
/*
* query range is identical in terms of all meters involved in query,
* so we need to restore them at the *beginning* of query on each meter,
* not the consecutive query on meter on which is aborted due to buffer limitation
* to ensure that, we can reset the query range once query on a meter is completed.
*/
pRuntimeEnv->tableIndex++;
updateTableIdInfo(pQuery, pQInfo->arrTableIdInfo);
// if the buffer is full or group by each table, we need to jump out of the loop
if (Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL)) {
break;
}
if (pRuntimeEnv->pTsBuf != NULL) {
pRuntimeEnv->cur = pRuntimeEnv->pTsBuf->cur;
}
} else {
// all data in the result buffer are skipped due to the offset, continue to retrieve data from current meter
if (pRuntimeEnv->resultInfo.rows == 0) {
assert(!Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL));
continue;
} else {
// buffer is full, wait for the next round to retrieve data from current meter
assert(Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL));
break;
}
}
}
if (pRuntimeEnv->tableIndex >= pRuntimeEnv->tableqinfoGroupInfo.numOfTables) {
setQueryStatus(pQuery, QUERY_COMPLETED);
}
/*
* 1. super table projection query, group-by on normal columns query, ts-comp query
* 2. point interpolation query, last row query
*
* group-by on normal columns query and last_row query do NOT invoke the finalizer here,
* since the finalize stage will be done at the client side.
*
* projection query, point interpolation query do not need the finalizer.
*
* Only the ts-comp query requires the finalizer function to be executed here.
*/
if (isTsCompQuery(pQuery)) {
finalizeQueryResult(pRuntimeEnv);
}
if (pRuntimeEnv->pTsBuf != NULL) { size_t num = taosArrayGetSize(group);
pRuntimeEnv->cur = pRuntimeEnv->pTsBuf->cur; for (int32_t j = 0; j < num; ++j) {
STableQueryInfo* item = taosArrayGetP(group, j);
closeAllResultRows(&item->resInfo);
} }
qDebug("QInfo %p numOfTables:%" PRIu64 ", index:%d, numOfGroups:%" PRIzu ", %" PRId64
" points returned, total:%" PRId64 ", offset:%" PRId64,
pQInfo, (uint64_t)pRuntimeEnv->tableqinfoGroupInfo.numOfTables, pRuntimeEnv->tableIndex, numOfGroups, pRuntimeEnv->resultInfo.rows,
pRuntimeEnv->resultInfo.total, pQuery->limit.offset);
} }
} }
static int32_t doSaveContext(SQInfo *pQInfo) {
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery * pQuery = pRuntimeEnv->pQuery;
SET_REVERSE_SCAN_FLAG(pRuntimeEnv);
SWAP(pQuery->window.skey, pQuery->window.ekey, TSKEY);
SWITCH_ORDER(pQuery->order.order);
if (pRuntimeEnv->pTsBuf != NULL) {
SWITCH_ORDER(pRuntimeEnv->pTsBuf->cur.order);
}
STsdbQueryCond cond = createTsdbQueryCond(pQuery, &pQuery->window);
// clean unused handle
if (pRuntimeEnv->pSecQueryHandle != NULL) {
tsdbCleanupQueryHandle(pRuntimeEnv->pSecQueryHandle);
}
setQueryStatus(pQuery, QUERY_NOT_COMPLETED);
switchCtxOrder(pRuntimeEnv);
disableFuncInReverseScan(pRuntimeEnv);
setupQueryRangeForReverseScan(pRuntimeEnv);
pRuntimeEnv->prevGroupId = INT32_MIN;
pRuntimeEnv->pSecQueryHandle = tsdbQueryTables(pQuery->tsdb, &cond, &pQuery->tableGroupInfo, pQInfo, &pQuery->memRef);
return (pRuntimeEnv->pSecQueryHandle == NULL)? -1:0;
}
static UNUSED_FUNC void doRestoreContext(SQInfo *pQInfo) {
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery * pQuery = pRuntimeEnv->pQuery;
SWAP(pQuery->window.skey, pQuery->window.ekey, TSKEY);
SWITCH_ORDER(pQuery->order.order);
if (pRuntimeEnv->pTsBuf != NULL) {
SWITCH_ORDER(pRuntimeEnv->pTsBuf->cur.order);
}
switchCtxOrder(pRuntimeEnv);
SET_MASTER_SCAN_FLAG(pRuntimeEnv);
}
#endif
static void doCloseAllTimeWindow(SQueryRuntimeEnv* pRuntimeEnv) {
// if (QUERY_IS_INTERVAL_QUERY(pRuntimeEnv->pQuery)) {
size_t numOfGroup = GET_NUM_OF_TABLEGROUP(pRuntimeEnv);
for (int32_t i = 0; i < numOfGroup; ++i) {
SArray* group = GET_TABLEGROUP(pRuntimeEnv, i);
size_t num = taosArrayGetSize(group);
for (int32_t j = 0; j < num; ++j) {
STableQueryInfo* item = taosArrayGetP(group, j);
closeAllResultRows(&item->resInfo);
}
}
// } else { // close results for group result
// closeAllResultRows(&pQInfo->runtimeEnv.resultRowInfo);
// }
// }
}
static SSDataBlock* doTableScanImpl(STableScanInfo *pTableScanInfo) { static SSDataBlock* doTableScanImpl(STableScanInfo *pTableScanInfo) {
SSDataBlock *pBlock = &pTableScanInfo->block; SSDataBlock *pBlock = &pTableScanInfo->block;
SQuery* pQuery = pTableScanInfo->pRuntimeEnv->pQuery; SQuery* pQuery = pTableScanInfo->pRuntimeEnv->pQuery;
...@@ -5817,7 +5135,7 @@ static SOperatorInfo* createAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, ...@@ -5817,7 +5135,7 @@ static SOperatorInfo* createAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv,
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "TableAggregate"; pOperator->name = "TableAggregate";
pOperator->blockingOptr = true; pOperator->blockingOptr = true;
pOperator->status = OP_IN_EXECUTING; pOperator->status = OP_IN_EXECUTING;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->upstream = upstream; pOperator->upstream = upstream;
pOperator->pExpr = pExpr; pOperator->pExpr = pExpr;
...@@ -6216,7 +5534,7 @@ void buildTableBlockDistResult(SQInfo *pQInfo) { ...@@ -6216,7 +5534,7 @@ void buildTableBlockDistResult(SQInfo *pQInfo) {
int64_t startTime = taosGetTimestampUs(); int64_t startTime = taosGetTimestampUs();
while (tsdbNextDataBlockWithoutMerge(pQueryHandle)) { while (tsdbNextDataBlockWithoutMerge(pQueryHandle)) {
if (isQueryKilled(GET_QINFO_ADDR(pRuntimeEnv))) { if (isQueryKilled(pRuntimeEnv->qinfo)) {
freeTableBlockDist(pTableBlockDist); freeTableBlockDist(pTableBlockDist);
longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED); longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED);
} }
...@@ -7192,7 +6510,7 @@ SQInfo* createQInfoImpl(SQueryTableMsg* pQueryMsg, SSqlGroupbyExpr* pGroupbyExpr ...@@ -7192,7 +6510,7 @@ SQInfo* createQInfoImpl(SQueryTableMsg* pQueryMsg, SSqlGroupbyExpr* pGroupbyExpr
} }
} }
colIdCheck(pQuery); colIdCheck(pQuery, pQInfo);
// todo refactor // todo refactor
pQInfo->query.queryBlockDist = (numOfOutput == 1 && pExprs[0].base.colInfo.colId == TSDB_BLOCK_DIST_COLUMN_INDEX); pQInfo->query.queryBlockDist = (numOfOutput == 1 && pExprs[0].base.colInfo.colId == TSDB_BLOCK_DIST_COLUMN_INDEX);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册