提交 aafb6098 编写于 作者: H Haojun Liao

[TD-225] refactor.

上级 eb97f122
...@@ -9,6 +9,7 @@ extern "C" { ...@@ -9,6 +9,7 @@ extern "C" {
#define GET_TYPED_DATA(_v, _finalType, _type, _data) \ #define GET_TYPED_DATA(_v, _finalType, _type, _data) \
switch (_type) { \ switch (_type) { \
case TSDB_DATA_TYPE_BOOL: \
case TSDB_DATA_TYPE_TINYINT: \ case TSDB_DATA_TYPE_TINYINT: \
(_v) = (_finalType)GET_INT8_VAL(_data); \ (_v) = (_finalType)GET_INT8_VAL(_data); \
break; \ break; \
......
...@@ -29,31 +29,30 @@ ...@@ -29,31 +29,30 @@
int32_t getOutputInterResultBufSize(SQuery* pQuery); int32_t getOutputInterResultBufSize(SQuery* pQuery);
void clearResultRow(SQueryRuntimeEnv* pRuntimeEnv, SResultRow* pResultRow, int16_t type); size_t getResultRowSize(SQueryRuntimeEnv* pRuntimeEnv);
void copyResultRow(SQueryRuntimeEnv* pRuntimeEnv, SResultRow* dst, const SResultRow* src, int16_t type); int32_t initResultRowInfo(SResultRowInfo* pResultRowInfo, int32_t size, int16_t type);
SResultRowCellInfo* getResultCell(SQueryRuntimeEnv* pRuntimeEnv, const SResultRow* pRow, int32_t index); void cleanupResultRowInfo(SResultRowInfo* pResultRowInfo);
int32_t initWindowResInfo(SResultRowInfo* pResultRowInfo, int32_t size, int16_t type); void resetResultRowInfo(SQueryRuntimeEnv* pRuntimeEnv, SResultRowInfo* pResultRowInfo);
void popFrontResultRow(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRowInfo, int32_t num);
void clearClosedResultRows(SQueryRuntimeEnv* pRuntimeEnv, SResultRowInfo *pResultRowInfo);
int32_t numOfClosedResultRows(SResultRowInfo* pResultRowInfo);
void closeAllResultRows(SResultRowInfo* pResultRowInfo);
void removeRedundantResultRows(SResultRowInfo *pResultRowInfo, TSKEY lastKey, int32_t order);
void cleanupTimeWindowInfo(SResultRowInfo* pResultRowInfo); int32_t initResultRow(SResultRow *pResultRow);
void resetTimeWindowInfo(SQueryRuntimeEnv* pRuntimeEnv, SResultRowInfo* pResultRowInfo); void closeResultRow(SResultRowInfo* pResultRowInfo, int32_t slot);
void clearFirstNWindowRes(SQueryRuntimeEnv *pRuntimeEnv, int32_t num); bool isResultRowClosed(SResultRowInfo *pResultRowInfo, int32_t slot);
void clearResultRow(SQueryRuntimeEnv* pRuntimeEnv, SResultRow* pResultRow, int16_t type);
void copyResultRow(SQueryRuntimeEnv* pRuntimeEnv, SResultRow* dst, const SResultRow* src, int16_t type);
void clearClosedTimeWindow(SQueryRuntimeEnv* pRuntimeEnv); SResultRowCellInfo* getResultCell(SQueryRuntimeEnv* pRuntimeEnv, const SResultRow* pRow, int32_t index);
int32_t numOfClosedTimeWindow(SResultRowInfo* pResultRowInfo);
void closeTimeWindow(SResultRowInfo* pResultRowInfo, int32_t slot);
void closeAllTimeWindow(SResultRowInfo* pResultRowInfo);
void removeRedundantWindow(SResultRowInfo *pResultRowInfo, TSKEY lastKey, int32_t order);
static FORCE_INLINE SResultRow *getResultRow(SResultRowInfo *pResultRowInfo, int32_t slot) { static FORCE_INLINE SResultRow *getResultRow(SResultRowInfo *pResultRowInfo, int32_t slot) {
assert(pResultRowInfo != NULL && slot >= 0 && slot < pResultRowInfo->size); assert(pResultRowInfo != NULL && slot >= 0 && slot < pResultRowInfo->size);
return pResultRowInfo->pResult[slot]; return pResultRowInfo->pResult[slot];
} }
bool isWindowResClosed(SResultRowInfo *pResultRowInfo, int32_t slot);
int32_t initResultRow(SResultRow *pResultRow);
static FORCE_INLINE char *getPosInResultPage(SQueryRuntimeEnv *pRuntimeEnv, int32_t columnIndex, SResultRow *pResult, static FORCE_INLINE char *getPosInResultPage(SQueryRuntimeEnv *pRuntimeEnv, int32_t columnIndex, SResultRow *pResult,
tFilePage* page) { tFilePage* page) {
assert(pResult != NULL && pRuntimeEnv != NULL); assert(pResult != NULL && pRuntimeEnv != NULL);
...@@ -71,8 +70,6 @@ bool notNull_filter(SColumnFilterElem *pFilter, char* minval, char* maxval); ...@@ -71,8 +70,6 @@ bool notNull_filter(SColumnFilterElem *pFilter, char* minval, char* maxval);
__filter_func_t *getRangeFilterFuncArray(int32_t type); __filter_func_t *getRangeFilterFuncArray(int32_t type);
__filter_func_t *getValueFilterFuncArray(int32_t type); __filter_func_t *getValueFilterFuncArray(int32_t type);
size_t getWindowResultSize(SQueryRuntimeEnv* pRuntimeEnv);
SResultRowPool* initResultRowPool(size_t size); SResultRowPool* initResultRowPool(size_t size);
SResultRow* getNewResultRow(SResultRowPool* p); SResultRow* getNewResultRow(SResultRowPool* p);
int64_t getResultRowPoolMemSize(SResultRowPool* p); int64_t getResultRowPoolMemSize(SResultRowPool* p);
......
...@@ -614,14 +614,14 @@ static int32_t addNewWindowResultBuf(SResultRow *pWindowRes, SDiskbasedResultBuf ...@@ -614,14 +614,14 @@ static int32_t addNewWindowResultBuf(SResultRow *pWindowRes, SDiskbasedResultBuf
return 0; return 0;
} }
static int32_t setWindowOutputBufByKey(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pWindowResInfo, SDataBlockInfo* pBockInfo, static int32_t setWindowOutputBufByKey(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRowInfo, SDataBlockInfo* pBockInfo,
STimeWindow *win, bool masterscan, bool* newWind, SResultRow** pResult) { STimeWindow *win, bool masterscan, bool* newWind, SResultRow** pResult) {
assert(win->skey <= win->ekey); assert(win->skey <= win->ekey);
SDiskbasedResultBuf *pResultBuf = pRuntimeEnv->pResultBuf; SDiskbasedResultBuf *pResultBuf = pRuntimeEnv->pResultBuf;
// todo refactor // todo refactor
int64_t uid = getResultInfoUId(pRuntimeEnv); int64_t uid = getResultInfoUId(pRuntimeEnv);
SResultRow *pResultRow = doPrepareResultRowFromKey(pRuntimeEnv, pWindowResInfo, (char *)&win->skey, TSDB_KEYSIZE, masterscan, uid); SResultRow *pResultRow = doPrepareResultRowFromKey(pRuntimeEnv, pResultRowInfo, (char *)&win->skey, TSDB_KEYSIZE, masterscan, uid);
if (pResultRow == NULL) { if (pResultRow == NULL) {
*newWind = false; *newWind = false;
...@@ -717,7 +717,7 @@ static int32_t updateResultRowCurrentIndex(SResultRowInfo* pWindowResInfo, TSKEY ...@@ -717,7 +717,7 @@ static int32_t updateResultRowCurrentIndex(SResultRowInfo* pWindowResInfo, TSKEY
TSKEY ekey = pResult->win.ekey; TSKEY ekey = pResult->win.ekey;
if ((ekey <= lastKey && ascQuery) || (pResult->win.skey >= lastKey && !ascQuery)) { if ((ekey <= lastKey && ascQuery) || (pResult->win.skey >= lastKey && !ascQuery)) {
closeTimeWindow(pWindowResInfo, i); closeResultRow(pWindowResInfo, i);
} else { } else {
skey = pResult->win.skey; skey = pResult->win.skey;
break; break;
...@@ -751,7 +751,7 @@ static int32_t doCheckQueryCompleted(SQueryRuntimeEnv *pRuntimeEnv, TSKEY lastKe ...@@ -751,7 +751,7 @@ static int32_t doCheckQueryCompleted(SQueryRuntimeEnv *pRuntimeEnv, TSKEY lastKe
// query completed // query completed
if ((lastKey >= pQuery->current->win.ekey && ascQuery) || (lastKey <= pQuery->current->win.ekey && (!ascQuery))) { if ((lastKey >= pQuery->current->win.ekey && ascQuery) || (lastKey <= pQuery->current->win.ekey && (!ascQuery))) {
closeAllTimeWindow(pWindowResInfo); closeAllResultRows(pWindowResInfo);
pWindowResInfo->curIndex = pWindowResInfo->size - 1; pWindowResInfo->curIndex = pWindowResInfo->size - 1;
setQueryStatus(pQuery, QUERY_COMPLETED | QUERY_RESBUF_FULL); setQueryStatus(pQuery, QUERY_COMPLETED | QUERY_RESBUF_FULL);
...@@ -1351,14 +1351,7 @@ static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, char *pDat ...@@ -1351,14 +1351,7 @@ static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, char *pDat
} }
int64_t v = -1; int64_t v = -1;
switch(type) { GET_TYPED_DATA(v, int64_t, type, pData);
case TSDB_DATA_TYPE_BOOL:
case TSDB_DATA_TYPE_TINYINT: v = GET_INT8_VAL(pData); break;
case TSDB_DATA_TYPE_SMALLINT: v = GET_INT16_VAL(pData); break;
case TSDB_DATA_TYPE_INT: v = GET_INT32_VAL(pData); break;
case TSDB_DATA_TYPE_BIGINT: v = GET_INT64_VAL(pData); break;
}
if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) { if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) {
if (pResultRow->key == NULL) { if (pResultRow->key == NULL) {
pResultRow->key = malloc(varDataTLen(pData)); pResultRow->key = malloc(varDataTLen(pData));
...@@ -1790,7 +1783,7 @@ static int32_t tableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBl ...@@ -1790,7 +1783,7 @@ static int32_t tableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBl
if (QUERY_IS_INTERVAL_QUERY(pQuery)) { if (QUERY_IS_INTERVAL_QUERY(pQuery)) {
numOfRes = doCheckQueryCompleted(pRuntimeEnv, lastKey, pWindowResInfo); numOfRes = doCheckQueryCompleted(pRuntimeEnv, lastKey, pWindowResInfo);
} else if (pRuntimeEnv->groupbyNormalCol) { } else if (pRuntimeEnv->groupbyNormalCol) {
closeAllTimeWindow(pWindowResInfo); closeAllResultRows(pWindowResInfo);
numOfRes = pWindowResInfo->size; numOfRes = pWindowResInfo->size;
} else { // projection query } else { // projection query
numOfRes = (int32_t)getNumOfResult(pRuntimeEnv); numOfRes = (int32_t)getNumOfResult(pRuntimeEnv);
...@@ -2094,7 +2087,7 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) { ...@@ -2094,7 +2087,7 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) {
SQInfo* pQInfo = (SQInfo*) GET_QINFO_ADDR(pRuntimeEnv); SQInfo* pQInfo = (SQInfo*) GET_QINFO_ADDR(pRuntimeEnv);
qDebug("QInfo:%p teardown runtime env", pQInfo); qDebug("QInfo:%p teardown runtime env", pQInfo);
cleanupTimeWindowInfo(&pRuntimeEnv->windowResInfo); cleanupResultRowInfo(&pRuntimeEnv->windowResInfo);
if (pRuntimeEnv->pCtx != NULL) { if (pRuntimeEnv->pCtx != NULL) {
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
...@@ -2928,7 +2921,7 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { ...@@ -2928,7 +2921,7 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
if (QUERY_IS_INTERVAL_QUERY(pQuery) && (IS_MASTER_SCAN(pRuntimeEnv)|| pRuntimeEnv->scanFlag == REPEAT_SCAN)) { if (QUERY_IS_INTERVAL_QUERY(pQuery) && (IS_MASTER_SCAN(pRuntimeEnv)|| pRuntimeEnv->scanFlag == REPEAT_SCAN)) {
if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) { if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) {
closeAllTimeWindow(&pRuntimeEnv->windowResInfo); closeAllResultRows(&pRuntimeEnv->windowResInfo);
pRuntimeEnv->windowResInfo.curIndex = pRuntimeEnv->windowResInfo.size - 1; // point to the last time window pRuntimeEnv->windowResInfo.curIndex = pRuntimeEnv->windowResInfo.size - 1; // point to the last time window
} else { } else {
assert(Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL)); assert(Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL));
...@@ -4057,12 +4050,12 @@ void finalizeQueryResult(SQueryRuntimeEnv *pRuntimeEnv) { ...@@ -4057,12 +4050,12 @@ void finalizeQueryResult(SQueryRuntimeEnv *pRuntimeEnv) {
// for each group result, call the finalize function for each column // for each group result, call the finalize function for each column
SResultRowInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo; SResultRowInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo;
if (pRuntimeEnv->groupbyNormalCol) { if (pRuntimeEnv->groupbyNormalCol) {
closeAllTimeWindow(pWindowResInfo); closeAllResultRows(pWindowResInfo);
} }
for (int32_t i = 0; i < pWindowResInfo->size; ++i) { for (int32_t i = 0; i < pWindowResInfo->size; ++i) {
SResultRow *buf = pWindowResInfo->pResult[i]; SResultRow *buf = pWindowResInfo->pResult[i];
if (!isWindowResClosed(pWindowResInfo, i)) { if (!isResultRowClosed(pWindowResInfo, i)) {
continue; continue;
} }
...@@ -4112,7 +4105,7 @@ static STableQueryInfo *createTableQueryInfo(SQueryRuntimeEnv *pRuntimeEnv, void ...@@ -4112,7 +4105,7 @@ static STableQueryInfo *createTableQueryInfo(SQueryRuntimeEnv *pRuntimeEnv, void
// set more initial size of interval/groupby query // set more initial size of interval/groupby query
if (QUERY_IS_INTERVAL_QUERY(pQuery) || pRuntimeEnv->groupbyNormalCol) { if (QUERY_IS_INTERVAL_QUERY(pQuery) || pRuntimeEnv->groupbyNormalCol) {
int32_t initialSize = 128; int32_t initialSize = 128;
int32_t code = initWindowResInfo(&pTableQueryInfo->windowResInfo, initialSize, TSDB_DATA_TYPE_INT); int32_t code = initResultRowInfo(&pTableQueryInfo->windowResInfo, initialSize, TSDB_DATA_TYPE_INT);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return NULL; return NULL;
} }
...@@ -4128,7 +4121,7 @@ void destroyTableQueryInfoImpl(STableQueryInfo *pTableQueryInfo) { ...@@ -4128,7 +4121,7 @@ void destroyTableQueryInfoImpl(STableQueryInfo *pTableQueryInfo) {
} }
tVariantDestroy(&pTableQueryInfo->tag); tVariantDestroy(&pTableQueryInfo->tag);
cleanupTimeWindowInfo(&pTableQueryInfo->windowResInfo); cleanupResultRowInfo(&pTableQueryInfo->windowResInfo);
} }
/** /**
...@@ -4360,7 +4353,7 @@ static int32_t doCopyToSData(SQInfo *pQInfo, SResultRowInfo *pResultInfo, int32_ ...@@ -4360,7 +4353,7 @@ static int32_t doCopyToSData(SQInfo *pQInfo, SResultRowInfo *pResultInfo, int32_
int32_t step = -1; int32_t step = -1;
qDebug("QInfo:%p start to copy data from windowResInfo to query buf", pQInfo); qDebug("QInfo:%p start to copy data from windowResInfo to query buf", pQInfo);
int32_t totalSet = numOfClosedTimeWindow(pResultInfo); int32_t totalSet = numOfClosedResultRows(pResultInfo);
SResultRow** result = pResultInfo->pResult; SResultRow** result = pResultInfo->pResult;
if (orderType == TSDB_ORDER_ASC) { if (orderType == TSDB_ORDER_ASC) {
...@@ -4481,7 +4474,7 @@ static void stableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBloc ...@@ -4481,7 +4474,7 @@ static void stableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBloc
// TODO refactor // TODO refactor
if ((pTableQueryInfo->lastKey >= pTableQueryInfo->win.ekey && ascQuery) || (pTableQueryInfo->lastKey <= pTableQueryInfo->win.ekey && (!ascQuery))) { if ((pTableQueryInfo->lastKey >= pTableQueryInfo->win.ekey && ascQuery) || (pTableQueryInfo->lastKey <= pTableQueryInfo->win.ekey && (!ascQuery))) {
closeAllTimeWindow(pWindowResInfo); closeAllResultRows(pWindowResInfo);
pWindowResInfo->curIndex = pWindowResInfo->size - 1; pWindowResInfo->curIndex = pWindowResInfo->size - 1;
} else { } else {
updateResultRowCurrentIndex(pWindowResInfo, pTableQueryInfo->lastKey, ascQuery); updateResultRowCurrentIndex(pWindowResInfo, pTableQueryInfo->lastKey, ascQuery);
...@@ -5031,7 +5024,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bo ...@@ -5031,7 +5024,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bo
type = TSDB_DATA_TYPE_INT; // group id type = TSDB_DATA_TYPE_INT; // group id
} }
code = initWindowResInfo(&pRuntimeEnv->windowResInfo, 8, type); code = initResultRowInfo(&pRuntimeEnv->windowResInfo, 8, type);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
...@@ -5051,7 +5044,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bo ...@@ -5051,7 +5044,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bo
type = TSDB_DATA_TYPE_TIMESTAMP; type = TSDB_DATA_TYPE_TIMESTAMP;
} }
code = initWindowResInfo(&pRuntimeEnv->windowResInfo, numOfResultRows, type); code = initResultRowInfo(&pRuntimeEnv->windowResInfo, numOfResultRows, type);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
...@@ -5479,7 +5472,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) { ...@@ -5479,7 +5472,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
pQInfo->groupIndex = currentGroupIndex; // restore the group index pQInfo->groupIndex = currentGroupIndex; // restore the group index
assert(pQuery->rec.rows == pWindowResInfo->size); assert(pQuery->rec.rows == pWindowResInfo->size);
clearClosedTimeWindow(pRuntimeEnv); clearClosedResultRows(pRuntimeEnv, &pRuntimeEnv->windowResInfo);
break; break;
} }
} else if (pRuntimeEnv->queryWindowIdentical && pRuntimeEnv->pTSBuf == NULL && !isTSCompQuery(pQuery)) { } else if (pRuntimeEnv->queryWindowIdentical && pRuntimeEnv->pTSBuf == NULL && !isTSCompQuery(pQuery)) {
...@@ -5641,7 +5634,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) { ...@@ -5641,7 +5634,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
} }
resetDefaultResInfoOutputBuf(pRuntimeEnv); resetDefaultResInfoOutputBuf(pRuntimeEnv);
resetTimeWindowInfo(pRuntimeEnv, &pRuntimeEnv->windowResInfo); resetResultRowInfo(pRuntimeEnv, &pRuntimeEnv->windowResInfo);
SArray *group = GET_TABLEGROUP(pQInfo, 0); SArray *group = GET_TABLEGROUP(pQInfo, 0);
assert(taosArrayGetSize(group) == pQInfo->tableqinfoGroupInfo.numOfTables && assert(taosArrayGetSize(group) == pQInfo->tableqinfoGroupInfo.numOfTables &&
...@@ -5796,11 +5789,11 @@ static void doCloseAllTimeWindowAfterScan(SQInfo *pQInfo) { ...@@ -5796,11 +5789,11 @@ static void doCloseAllTimeWindowAfterScan(SQInfo *pQInfo) {
size_t num = taosArrayGetSize(group); size_t num = taosArrayGetSize(group);
for (int32_t j = 0; j < num; ++j) { for (int32_t j = 0; j < num; ++j) {
STableQueryInfo* item = taosArrayGetP(group, j); STableQueryInfo* item = taosArrayGetP(group, j);
closeAllTimeWindow(&item->windowResInfo); closeAllResultRows(&item->windowResInfo);
} }
} }
} else { // close results for group result } else { // close results for group result
closeAllTimeWindow(&pQInfo->runtimeEnv.windowResInfo); closeAllResultRows(&pQInfo->runtimeEnv.windowResInfo);
} }
} }
...@@ -6048,10 +6041,10 @@ static void tableIntervalProcessImpl(SQueryRuntimeEnv *pRuntimeEnv, TSKEY start) ...@@ -6048,10 +6041,10 @@ static void tableIntervalProcessImpl(SQueryRuntimeEnv *pRuntimeEnv, TSKEY start)
if ((pQuery->numOfFilterCols > 0 || pRuntimeEnv->pTSBuf != NULL) && pQuery->limit.offset > 0 && if ((pQuery->numOfFilterCols > 0 || pRuntimeEnv->pTSBuf != NULL) && pQuery->limit.offset > 0 &&
pQuery->fillType == TSDB_FILL_NONE) { pQuery->fillType == TSDB_FILL_NONE) {
// maxOutput <= 0, means current query does not generate any results // maxOutput <= 0, means current query does not generate any results
int32_t numOfClosed = numOfClosedTimeWindow(&pRuntimeEnv->windowResInfo); int32_t numOfClosed = numOfClosedResultRows(&pRuntimeEnv->windowResInfo);
int32_t c = (int32_t)(MIN(numOfClosed, pQuery->limit.offset)); int32_t c = (int32_t)(MIN(numOfClosed, pQuery->limit.offset));
clearFirstNWindowRes(pRuntimeEnv, c); popFrontResultRow(pRuntimeEnv, &pRuntimeEnv->windowResInfo, c);
pQuery->limit.offset -= c; pQuery->limit.offset -= c;
} }
...@@ -6088,7 +6081,7 @@ static void tableIntervalProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) { ...@@ -6088,7 +6081,7 @@ static void tableIntervalProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) {
pQuery->rec.rows = 0; pQuery->rec.rows = 0;
copyFromWindowResToSData(pQInfo, &pRuntimeEnv->windowResInfo); copyFromWindowResToSData(pQInfo, &pRuntimeEnv->windowResInfo);
clearFirstNWindowRes(pRuntimeEnv, pQInfo->groupIndex); popFrontResultRow(pRuntimeEnv, &pRuntimeEnv->windowResInfo, pQInfo->groupIndex);
} }
// no result generated, abort // no result generated, abort
...@@ -6121,16 +6114,16 @@ static void tableIntervalProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) { ...@@ -6121,16 +6114,16 @@ static void tableIntervalProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) {
// all data scanned, the group by normal column can return // all data scanned, the group by normal column can return
if (pRuntimeEnv->groupbyNormalCol) { // todo refactor with merge interval time result if (pRuntimeEnv->groupbyNormalCol) { // todo refactor with merge interval time result
// maxOutput <= 0, means current query does not generate any results // maxOutput <= 0, means current query does not generate any results
int32_t numOfClosed = numOfClosedTimeWindow(&pRuntimeEnv->windowResInfo); int32_t numOfClosed = numOfClosedResultRows(&pRuntimeEnv->windowResInfo);
if ((pQuery->limit.offset > 0 && pQuery->limit.offset < numOfClosed) || pQuery->limit.offset == 0) { if ((pQuery->limit.offset > 0 && pQuery->limit.offset < numOfClosed) || pQuery->limit.offset == 0) {
// skip offset result rows // skip offset result rows
clearFirstNWindowRes(pRuntimeEnv, (int32_t) pQuery->limit.offset); popFrontResultRow(pRuntimeEnv, &pRuntimeEnv->windowResInfo, (int32_t) pQuery->limit.offset);
pQuery->rec.rows = 0; pQuery->rec.rows = 0;
pQInfo->groupIndex = 0; pQInfo->groupIndex = 0;
copyFromWindowResToSData(pQInfo, &pRuntimeEnv->windowResInfo); copyFromWindowResToSData(pQInfo, &pRuntimeEnv->windowResInfo);
clearFirstNWindowRes(pRuntimeEnv, pQInfo->groupIndex); popFrontResultRow(pRuntimeEnv, &pRuntimeEnv->windowResInfo, pQInfo->groupIndex);
doSecondaryArithmeticProcess(pQuery); doSecondaryArithmeticProcess(pQuery);
limitResults(pRuntimeEnv); limitResults(pRuntimeEnv);
...@@ -6164,7 +6157,7 @@ static void tableQueryImpl(SQInfo *pQInfo) { ...@@ -6164,7 +6157,7 @@ static void tableQueryImpl(SQInfo *pQInfo) {
if (pRuntimeEnv->windowResInfo.size > 0) { if (pRuntimeEnv->windowResInfo.size > 0) {
copyFromWindowResToSData(pQInfo, &pRuntimeEnv->windowResInfo); copyFromWindowResToSData(pQInfo, &pRuntimeEnv->windowResInfo);
clearFirstNWindowRes(pRuntimeEnv, pQInfo->groupIndex); popFrontResultRow(pRuntimeEnv, &pRuntimeEnv->windowResInfo, pQInfo->groupIndex);
if (pQuery->rec.rows > 0) { if (pQuery->rec.rows > 0) {
qDebug("QInfo:%p %"PRId64" rows returned from group results, total:%"PRId64"", pQInfo, pQuery->rec.rows, pQuery->rec.total); qDebug("QInfo:%p %"PRId64" rows returned from group results, total:%"PRId64"", pQInfo, pQuery->rec.rows, pQuery->rec.total);
...@@ -7029,7 +7022,7 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou ...@@ -7029,7 +7022,7 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou
pQInfo->runtimeEnv.pResultRowHashTable = taosHashInit(pTableGroupInfo->numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); pQInfo->runtimeEnv.pResultRowHashTable = taosHashInit(pTableGroupInfo->numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
pQInfo->runtimeEnv.keyBuf = malloc(TSDB_MAX_BYTES_PER_ROW); pQInfo->runtimeEnv.keyBuf = malloc(TSDB_MAX_BYTES_PER_ROW);
pQInfo->runtimeEnv.pool = initResultRowPool(getWindowResultSize(&pQInfo->runtimeEnv)); pQInfo->runtimeEnv.pool = initResultRowPool(getResultRowSize(&pQInfo->runtimeEnv));
pQInfo->runtimeEnv.prevRow = malloc(POINTER_BYTES * pQuery->numOfCols + srcSize); pQInfo->runtimeEnv.prevRow = malloc(POINTER_BYTES * pQuery->numOfCols + srcSize);
char* start = POINTER_BYTES * pQuery->numOfCols + (char*) pQInfo->runtimeEnv.prevRow; char* start = POINTER_BYTES * pQuery->numOfCols + (char*) pQInfo->runtimeEnv.prevRow;
......
...@@ -43,7 +43,7 @@ int32_t getOutputInterResultBufSize(SQuery* pQuery) { ...@@ -43,7 +43,7 @@ int32_t getOutputInterResultBufSize(SQuery* pQuery) {
return size; return size;
} }
int32_t initWindowResInfo(SResultRowInfo *pResultRowInfo, int32_t size, int16_t type) { int32_t initResultRowInfo(SResultRowInfo *pResultRowInfo, int32_t size, int16_t type) {
pResultRowInfo->capacity = size; pResultRowInfo->capacity = size;
pResultRowInfo->type = type; pResultRowInfo->type = type;
...@@ -59,10 +59,11 @@ int32_t initWindowResInfo(SResultRowInfo *pResultRowInfo, int32_t size, int16_t ...@@ -59,10 +59,11 @@ int32_t initWindowResInfo(SResultRowInfo *pResultRowInfo, int32_t size, int16_t
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
void cleanupTimeWindowInfo(SResultRowInfo *pResultRowInfo) { void cleanupResultRowInfo(SResultRowInfo *pResultRowInfo) {
if (pResultRowInfo == NULL) { if (pResultRowInfo == NULL) {
return; return;
} }
if (pResultRowInfo->capacity == 0) { if (pResultRowInfo->capacity == 0) {
assert(pResultRowInfo->pResult == NULL); assert(pResultRowInfo->pResult == NULL);
return; return;
...@@ -77,7 +78,7 @@ void cleanupTimeWindowInfo(SResultRowInfo *pResultRowInfo) { ...@@ -77,7 +78,7 @@ void cleanupTimeWindowInfo(SResultRowInfo *pResultRowInfo) {
tfree(pResultRowInfo->pResult); tfree(pResultRowInfo->pResult);
} }
void resetTimeWindowInfo(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRowInfo) { void resetResultRowInfo(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRowInfo) {
if (pResultRowInfo == NULL || pResultRowInfo->capacity == 0) { if (pResultRowInfo == NULL || pResultRowInfo->capacity == 0) {
return; return;
} }
...@@ -100,13 +101,12 @@ void resetTimeWindowInfo(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultR ...@@ -100,13 +101,12 @@ void resetTimeWindowInfo(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultR
pResultRowInfo->prevSKey = TSKEY_INITIAL_VAL; pResultRowInfo->prevSKey = TSKEY_INITIAL_VAL;
} }
void clearFirstNWindowRes(SQueryRuntimeEnv *pRuntimeEnv, int32_t num) { void popFrontResultRow(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRowInfo, int32_t num) {
SResultRowInfo *pResultRowInfo = &pRuntimeEnv->windowResInfo;
if (pResultRowInfo == NULL || pResultRowInfo->capacity == 0 || pResultRowInfo->size == 0 || num == 0) { if (pResultRowInfo == NULL || pResultRowInfo->capacity == 0 || pResultRowInfo->size == 0 || num == 0) {
return; return;
} }
int32_t numOfClosed = numOfClosedTimeWindow(pResultRowInfo); int32_t numOfClosed = numOfClosedResultRows(pResultRowInfo);
assert(num >= 0 && num <= numOfClosed); assert(num >= 0 && num <= numOfClosed);
int16_t type = pResultRowInfo->type; int16_t type = pResultRowInfo->type;
...@@ -159,17 +159,16 @@ void clearFirstNWindowRes(SQueryRuntimeEnv *pRuntimeEnv, int32_t num) { ...@@ -159,17 +159,16 @@ void clearFirstNWindowRes(SQueryRuntimeEnv *pRuntimeEnv, int32_t num) {
pResultRowInfo->curIndex = -1; pResultRowInfo->curIndex = -1;
} }
void clearClosedTimeWindow(SQueryRuntimeEnv *pRuntimeEnv) { void clearClosedResultRows(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRowInfo) {
SResultRowInfo *pResultRowInfo = &pRuntimeEnv->windowResInfo;
if (pResultRowInfo == NULL || pResultRowInfo->capacity == 0 || pResultRowInfo->size == 0) { if (pResultRowInfo == NULL || pResultRowInfo->capacity == 0 || pResultRowInfo->size == 0) {
return; return;
} }
int32_t numOfClosed = numOfClosedTimeWindow(pResultRowInfo); int32_t numOfClosed = numOfClosedResultRows(pResultRowInfo);
clearFirstNWindowRes(pRuntimeEnv, numOfClosed); popFrontResultRow(pRuntimeEnv, &pRuntimeEnv->windowResInfo, numOfClosed);
} }
int32_t numOfClosedTimeWindow(SResultRowInfo *pResultRowInfo) { int32_t numOfClosedResultRows(SResultRowInfo *pResultRowInfo) {
int32_t i = 0; int32_t i = 0;
while (i < pResultRowInfo->size && pResultRowInfo->pResult[i]->closed) { while (i < pResultRowInfo->size && pResultRowInfo->pResult[i]->closed) {
++i; ++i;
...@@ -178,7 +177,7 @@ int32_t numOfClosedTimeWindow(SResultRowInfo *pResultRowInfo) { ...@@ -178,7 +177,7 @@ int32_t numOfClosedTimeWindow(SResultRowInfo *pResultRowInfo) {
return i; return i;
} }
void closeAllTimeWindow(SResultRowInfo *pResultRowInfo) { void closeAllResultRows(SResultRowInfo *pResultRowInfo) {
assert(pResultRowInfo->size >= 0 && pResultRowInfo->capacity >= pResultRowInfo->size); assert(pResultRowInfo->size >= 0 && pResultRowInfo->capacity >= pResultRowInfo->size);
for (int32_t i = 0; i < pResultRowInfo->size; ++i) { for (int32_t i = 0; i < pResultRowInfo->size; ++i) {
...@@ -195,7 +194,7 @@ void closeAllTimeWindow(SResultRowInfo *pResultRowInfo) { ...@@ -195,7 +194,7 @@ void closeAllTimeWindow(SResultRowInfo *pResultRowInfo) {
* the last qualified time stamp in case of sliding query, which the sliding time is not equalled to the interval time. * the last qualified time stamp in case of sliding query, which the sliding time is not equalled to the interval time.
* NOTE: remove redundant, only when the result set order equals to traverse order * NOTE: remove redundant, only when the result set order equals to traverse order
*/ */
void removeRedundantWindow(SResultRowInfo *pResultRowInfo, TSKEY lastKey, int32_t order) { void removeRedundantResultRows(SResultRowInfo *pResultRowInfo, TSKEY lastKey, int32_t order) {
assert(pResultRowInfo->size >= 0 && pResultRowInfo->capacity >= pResultRowInfo->size); assert(pResultRowInfo->size >= 0 && pResultRowInfo->capacity >= pResultRowInfo->size);
if (pResultRowInfo->size <= 1) { if (pResultRowInfo->size <= 1) {
return; return;
...@@ -224,11 +223,11 @@ void removeRedundantWindow(SResultRowInfo *pResultRowInfo, TSKEY lastKey, int32_ ...@@ -224,11 +223,11 @@ void removeRedundantWindow(SResultRowInfo *pResultRowInfo, TSKEY lastKey, int32_
} }
} }
bool isWindowResClosed(SResultRowInfo *pResultRowInfo, int32_t slot) { bool isResultRowClosed(SResultRowInfo *pResultRowInfo, int32_t slot) {
return (getResultRow(pResultRowInfo, slot)->closed == true); return (getResultRow(pResultRowInfo, slot)->closed == true);
} }
void closeTimeWindow(SResultRowInfo *pResultRowInfo, int32_t slot) { void closeResultRow(SResultRowInfo *pResultRowInfo, int32_t slot) {
getResultRow(pResultRowInfo, slot)->closed = true; getResultRow(pResultRowInfo, slot)->closed = true;
} }
...@@ -310,7 +309,7 @@ SResultRowCellInfo* getResultCell(SQueryRuntimeEnv* pRuntimeEnv, const SResultRo ...@@ -310,7 +309,7 @@ SResultRowCellInfo* getResultCell(SQueryRuntimeEnv* pRuntimeEnv, const SResultRo
return (SResultRowCellInfo*)((char*) pRow->pCellInfo + pRuntimeEnv->rowCellInfoOffset[index]); return (SResultRowCellInfo*)((char*) pRow->pCellInfo + pRuntimeEnv->rowCellInfoOffset[index]);
} }
size_t getWindowResultSize(SQueryRuntimeEnv* pRuntimeEnv) { size_t getResultRowSize(SQueryRuntimeEnv* pRuntimeEnv) {
return (pRuntimeEnv->pQuery->numOfOutput * sizeof(SResultRowCellInfo)) + pRuntimeEnv->interBufSize + sizeof(SResultRow); return (pRuntimeEnv->pQuery->numOfOutput * sizeof(SResultRowCellInfo)) + pRuntimeEnv->interBufSize + sizeof(SResultRow);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册