提交 8c2766e3 编写于 作者: H hjxilinx

support the sliding query [tbase-266]

上级 c6b0d6b0
......@@ -31,10 +31,6 @@ extern "C" {
#include "tsqlfunction.h"
#include "tutil.h"
//#define TSC_GET_RESPTR_BASE(res, _queryinfo, col, ord) \
// ((res->data + tscFieldInfoGetOffset(_queryinfo, col) * res->numOfRows) + \
// (1 - ord.order) * (res->numOfRows - 1) * tscFieldInfoGetField(_queryinfo, col)->bytes)
#define TSC_GET_RESPTR_BASE(res, _queryinfo, col, ord) \
(res->data + tscFieldInfoGetOffset(_queryinfo, col) * res->numOfRows)
......@@ -117,13 +113,6 @@ typedef struct SColumnBaseInfo {
struct SLocalReducer;
// todo move to utility
typedef struct SString {
int32_t alloc;
int32_t n;
char * z;
} SString;
typedef struct SCond {
uint64_t uid;
char * cond;
......
......@@ -2890,24 +2890,17 @@ static FORCE_INLINE void date_col_output_function_f(SQLFunctionCtx *pCtx, int32_
static void col_project_function(SQLFunctionCtx *pCtx) {
INC_INIT_VAL(pCtx, pCtx->size);
char *pDest = 0;
// if (pCtx->order == TSQL_SO_ASC) {
pDest = pCtx->aOutputBuf;
// } else {
// pDest = pCtx->aOutputBuf - (pCtx->size - 1) * pCtx->inputBytes;
// }
char *pData = GET_INPUT_CHAR(pCtx);
if (pCtx->order == TSQL_SO_ASC) {
memcpy(pDest, pData, (size_t)pCtx->size * pCtx->inputBytes);
memcpy(pCtx->aOutputBuf, pData, (size_t)pCtx->size * pCtx->inputBytes);
} else {
for(int32_t i = 0; i < pCtx->size; ++i) {
memcpy(pDest + (pCtx->size - 1 - i) * pCtx->inputBytes, pData + i * pCtx->inputBytes,
memcpy(pCtx->aOutputBuf + (pCtx->size - 1 - i) * pCtx->inputBytes, pData + i * pCtx->inputBytes,
pCtx->inputBytes);
}
}
pCtx->aOutputBuf += pCtx->size * pCtx->outputBytes/* * GET_FORWARD_DIRECTION_FACTOR(pCtx->order)*/;
pCtx->aOutputBuf += pCtx->size * pCtx->outputBytes;
}
static void col_project_function_f(SQLFunctionCtx *pCtx, int32_t index) {
......@@ -3229,7 +3222,7 @@ static void diff_function_f(SQLFunctionCtx *pCtx, int32_t index) {
GET_RES_INFO(pCtx)->numOfRes += 1;
}
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pCtx->order);
int32_t step = 1/*GET_FORWARD_DIRECTION_FACTOR(pCtx->order)*/;
switch (pCtx->inputType) {
case TSDB_DATA_TYPE_INT: {
......
......@@ -37,7 +37,6 @@ uint32_t MurmurHash3_32(const char *key, uint32_t len);
*/
uint32_t taosIntHash_32(const char *key, uint32_t len);
uint32_t taosIntHash_64(const char *key, uint32_t len);
_hash_fn_t taosGetDefaultHashFunction(int32_t type);
......
......@@ -1485,9 +1485,6 @@ static int32_t blockwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t
}
TSKEY ts = QUERY_IS_ASC_QUERY(pQuery) ? pRuntimeEnv->intervalSKey : pRuntimeEnv->intervalEKey;
// int64_t alignedTimestamp =
// taosGetIntervalStartTimestamp(ts, pQuery->nAggTimeInterval, pQuery->intervalTimeUnit, pQuery->precision);
setExecParams(pQuery, &pCtx[k], ts, dataBlock, (char *)primaryKeyCol, forwardStep, functionId, tpField, hasNull,
pRuntimeEnv->blockStatus, &sasArray[k], pRuntimeEnv->scanFlag);
}
......@@ -1980,7 +1977,8 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t *
SWindowStatus* pStatus = getCurrentSWindow(&pRuntimeEnv->swindowResInfo);
if (!IS_MASTER_SCAN(pRuntimeEnv) && !pStatus->closed) {
qTrace("not completed in supplementary scan, ignore\n");
// qTrace("QInfo:%p not completed in supplementary scan, ignore funcId:%d, window:%lld-%lld",
// GET_QINFO_ADDR(pQuery), functionId, pStatus->window.skey, pStatus->window.ekey);
continue;
}
......@@ -2012,7 +2010,8 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t *
SWindowStatus* pStatus = getCurrentSWindow(&pRuntimeEnv->swindowResInfo);
if (!IS_MASTER_SCAN(pRuntimeEnv) && !pStatus->closed) {
qTrace("not completed in supplementary scan, ignore");
// qTrace("QInfo:%p not completed in supplementary scan, ignore funcId:%d, window:%lld-%lld",
// GET_QINFO_ADDR(pQuery), functionId, pStatus->window.skey, pStatus->window.ekey);
continue;
}
......@@ -2068,7 +2067,7 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t *
free(sasArray);
if (pQuery->slidingTime > 0 && pQuery->nAggTimeInterval > 0) {
if (pQuery->slidingTime > 0 && pQuery->nAggTimeInterval > 0 && IS_MASTER_SCAN(pRuntimeEnv)) {
SSlidingWindowResInfo *pWindowResInfo = &pRuntimeEnv->swindowResInfo;
// query completed
......@@ -2099,9 +2098,12 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t *
pWindowResInfo->prevSKey = skey;
// the number of completed slots are larger than the threshold, dump to client immediately.
if (numOfResFromResWindowInfo(pWindowResInfo) > pWindowResInfo->threshold) {
int32_t v = numOfResFromResWindowInfo(pWindowResInfo);
if (v > pWindowResInfo->threshold) {
setQueryStatus(pQuery, QUERY_RESBUF_FULL);
}
dTrace("QInfo:%p total window:%d, closed:%d", GET_QINFO_ADDR(pQuery), pWindowResInfo->size, v);
}
}
......@@ -2110,7 +2112,7 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t *
* because the results of group by normal column is put into intermediate buffer.
*/
int32_t num = 0;
if (!groupbyStateValue) {
if (!groupbyStateValue && !(pQuery->nAggTimeInterval > 0 && pQuery->slidingTime > 0)) {
num = getNumOfResult(pRuntimeEnv) - prevNumOfRes;
}
......@@ -4500,7 +4502,8 @@ int32_t vnodeMultiMeterQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param)
}
if (isGroupbyNormalCol(pQuery->pGroupbyExpr)) { // group by columns not tags;
initResWindowInfo(&pRuntimeEnv->swindowResInfo, 10039, TSDB_DATA_TYPE_BIGINT, pSupporter->pResult);
int16_t type = getGroupbyColumnType(pQuery, pQuery->pGroupbyExpr);
initResWindowInfo(&pRuntimeEnv->swindowResInfo, 10039, type, pSupporter->pResult);
}
if (pQuery->nAggTimeInterval != 0) {
......@@ -4917,10 +4920,10 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
while (1) {
// check if query is killed or not set the status of query to pass the status check
// if (isQueryKilled(pQuery)) {
// setQueryStatus(pQuery, QUERY_NO_DATA_TO_CHECK);
// return cnt;
// }
if (isQueryKilled(pQuery)) {
setQueryStatus(pQuery, QUERY_NO_DATA_TO_CHECK);
return cnt;
}
int32_t numOfRes = 0;
SBlockInfo blockInfo = {0};
......@@ -5745,7 +5748,7 @@ void resetCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv) {
void forwardCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, int64_t output) {
SQuery *pQuery = pRuntimeEnv->pQuery;
int32_t factor = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
// int32_t factor = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
// reset the execution contexts
for (int32_t j = 0; j < pQuery->numOfOutputCols; ++j) {
......@@ -5765,7 +5768,7 @@ void forwardCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, int64_t output) {
*
* diff function is handled in multi-output function
*/
pRuntimeEnv->pCtx[j].ptsOutputBuf += TSDB_KEYSIZE * output * factor;
pRuntimeEnv->pCtx[j].ptsOutputBuf += TSDB_KEYSIZE * output/* * factor*/;
}
resetResultInfo(pRuntimeEnv->pCtx[j].resultInfo);
......@@ -5856,12 +5859,15 @@ static void queryStatusSave(SQueryRuntimeEnv *pRuntimeEnv, SQueryStatus *pStatus
SWAP(pQuery->skey, pQuery->ekey, TSKEY);
pQuery->lastKey = pQuery->skey;
pRuntimeEnv->startPos = pRuntimeEnv->endPos;
SWAP(pRuntimeEnv->intervalSKey, pRuntimeEnv->intervalEKey, TSKEY);
}
static void queryStatusRestore(SQueryRuntimeEnv *pRuntimeEnv, SQueryStatus *pStatus) {
SQuery *pQuery = pRuntimeEnv->pQuery;
SWAP(pQuery->skey, pQuery->ekey, TSKEY);
SWAP(pRuntimeEnv->intervalSKey, pRuntimeEnv->intervalEKey, TSKEY);
pQuery->lastKey = pStatus->lastKey;
pQuery->skey = pStatus->skey;
pQuery->ekey = pStatus->ekey;
......@@ -5883,6 +5889,8 @@ static void doSingleMeterSupplementScan(SQueryRuntimeEnv *pRuntimeEnv) {
return;
}
dTrace("QInfo:%p start to supp scan", GET_QINFO_ADDR(pQuery));
SET_SUPPLEMENT_SCAN_FLAG(pRuntimeEnv);
// usually this load operation will incur load disk block operation
......@@ -5923,7 +5931,8 @@ void vnodeScanAllData(SQueryRuntimeEnv *pRuntimeEnv) {
/* store the start query position */
savePointPosition(&pRuntimeEnv->startPos, pQuery->fileId, pQuery->slot, pQuery->pos);
int64_t skey = pQuery->lastKey;
while (1) {
doScanAllDataBlocks(pRuntimeEnv);
......@@ -5978,26 +5987,14 @@ void vnodeScanAllData(SQueryRuntimeEnv *pRuntimeEnv) {
}
}
int64_t newSkey = pQuery->skey;
pQuery->skey = skey;
doSingleMeterSupplementScan(pRuntimeEnv);
// reset status code
if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || (pQuery->nAggTimeInterval > 0 && pQuery->slidingTime > 0)) {
SSlidingWindowResInfo *pWindowResInfo = &pRuntimeEnv->swindowResInfo;
for (int32_t i = 0; i < pWindowResInfo->size; ++i) {
SOutputRes *buf = &pWindowResInfo->pResult[i];
for (int32_t j = 0; j < pQuery->numOfOutputCols; ++j) {
buf->resultInfo[j].complete = false;
}
}
} else {
for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) {
SResultInfo *pResInfo = GET_RES_INFO(&pRuntimeEnv->pCtx[i]);
if (pResInfo != NULL) {
pResInfo->complete = false;
}
}
}
// update the pQuery->skey/pQuery->ekey to limit the scan scope of sliding query during
// supplementary scan
pQuery->skey = newSkey;
}
void doFinalizeResult(SQueryRuntimeEnv *pRuntimeEnv) {
......@@ -6006,10 +6003,15 @@ void doFinalizeResult(SQueryRuntimeEnv *pRuntimeEnv) {
if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || (pQuery->nAggTimeInterval > 0 && pQuery->slidingTime > 0)) {
// for each group result, call the finalize function for each column
SSlidingWindowResInfo *pWindowResInfo = &pRuntimeEnv->swindowResInfo;
bool groupbyCol = isGroupbyNormalCol(pQuery->pGroupbyExpr);
for (int32_t i = 0; i < pWindowResInfo->size; ++i) {
SOutputRes *buf = &pWindowResInfo->pResult[i];
SWindowStatus* pStatus = &pWindowResInfo->pStatus[i];
if (groupbyCol) {
pStatus->closed = true;
}
if (!pStatus->closed) {
continue;
}
......
......@@ -85,6 +85,29 @@ static void setStartPositionForCacheBlock(SQuery *pQuery, SCacheBlock *pBlock, b
}
}
static void enableExecutionForNextTable(SQueryRuntimeEnv *pRuntimeEnv) {
SQuery* pQuery = pRuntimeEnv->pQuery;
// enable execution for next table
if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || (pQuery->nAggTimeInterval > 0 && pQuery->slidingTime > 0)) {
SSlidingWindowResInfo *pWindowResInfo = &pRuntimeEnv->swindowResInfo;
for (int32_t i = 0; i < pWindowResInfo->size; ++i) {
SOutputRes *buf = &pWindowResInfo->pResult[i];
for (int32_t j = 0; j < pQuery->numOfOutputCols; ++j) {
buf->resultInfo[j].complete = false;
}
}
} else {
for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) {
SResultInfo *pResInfo = GET_RES_INFO(&pRuntimeEnv->pCtx[i]);
if (pResInfo != NULL) {
pResInfo->complete = false;
}
}
}
}
static SMeterDataInfo *queryOnMultiDataCache(SQInfo *pQInfo, SMeterDataInfo *pMeterInfo) {
SQuery * pQuery = &pQInfo->query;
SMeterQuerySupportObj *pSupporter = pQInfo->pMeterQuerySupporter;
......@@ -548,7 +571,10 @@ static int64_t doCheckMetersInGroup(SQInfo *pQInfo, int32_t index, int32_t start
pointInterpSupporterDestroy(&pointInterpSupporter);
vnodeScanAllData(pRuntimeEnv);
// enable execution for next table
enableExecutionForNextTable(pRuntimeEnv);
// first/last_row query, do not invoke the finalize for super table query
if (!isFirstLastRowQuery(pQuery)) {
doFinalizeResult(pRuntimeEnv);
......@@ -725,7 +751,10 @@ static void vnodeMultiMeterMultiOutputProcessor(SQInfo *pQInfo) {
pSupporter->meterIdx = pSupporter->pSidSet->numOfSids;
break;
}
// enable execution for next table
enableExecutionForNextTable(pRuntimeEnv);
if (Q_STATUS_EQUAL(pQuery->over, QUERY_NO_DATA_TO_CHECK | QUERY_COMPLETED)) {
/*
* query range is identical in terms of all meters involved in query,
......@@ -738,10 +767,10 @@ static void vnodeMultiMeterMultiOutputProcessor(SQInfo *pQInfo) {
pSupporter->meterIdx++;
// if the buffer is full or group by each table, we need to jump out of the loop
// if (Q_STATUS_EQUAL(pQuery->over, QUERY_RESBUF_FULL) ||
// isGroupbyEachTable(pQuery->pGroupbyExpr, pSupporter->pSidSet)) {
if (Q_STATUS_EQUAL(pQuery->over, QUERY_RESBUF_FULL) ||
isGroupbyEachTable(pQuery->pGroupbyExpr, pSupporter->pSidSet)) {
break;
// }
}
} else {
// forward query range
......@@ -768,11 +797,14 @@ static void vnodeMultiMeterMultiOutputProcessor(SQInfo *pQInfo) {
pRuntimeEnv->cur = pRuntimeEnv->pTSBuf->cur;
}
// todo refactor
if (isGroupbyNormalCol(pQuery->pGroupbyExpr)) {
SSlidingWindowResInfo* pWindowResInfo = &pRuntimeEnv->swindowResInfo;
for (int32_t i = 0; i < pWindowResInfo->size; ++i) {
SOutputRes *buf = &pWindowResInfo->pResult[i];
pWindowResInfo->pStatus[i].closed = true; // enable return all results for group by normal columns
for (int32_t j = 0; j < pQuery->numOfOutputCols; ++j) {
buf->numOfRows = MAX(buf->numOfRows, buf->resultInfo[j].numOfRes);
}
......
......@@ -103,4 +103,4 @@ _hash_fn_t taosGetDefaultHashFunction(int32_t type) {
}
return fn;
}
\ No newline at end of file
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册