提交 61ea7bae 编写于 作者: H Hongze Cheng

Merge branch 'develop' into feature/2.0tsdb

...@@ -12,12 +12,10 @@ ...@@ -12,12 +12,10 @@
* You should have received a copy of the GNU Affero General Public License * You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include <taosmsg.h>
#include "os.h" #include "os.h"
#include "qfill.h" #include "qfill.h"
#include "hash.h" #include "hash.h"
#include "hashfunc.h"
#include "qExecutor.h" #include "qExecutor.h"
#include "qUtil.h" #include "qUtil.h"
#include "qast.h" #include "qast.h"
...@@ -25,7 +23,6 @@ ...@@ -25,7 +23,6 @@
#include "query.h" #include "query.h"
#include "queryLog.h" #include "queryLog.h"
#include "taosmsg.h" #include "taosmsg.h"
#include "tdataformat.h"
#include "tlosertree.h" #include "tlosertree.h"
#include "tscUtil.h" // todo move the function to common module #include "tscUtil.h" // todo move the function to common module
#include "tscompression.h" #include "tscompression.h"
...@@ -91,6 +88,9 @@ typedef struct { ...@@ -91,6 +88,9 @@ typedef struct {
} SQueryStatusInfo; } SQueryStatusInfo;
#define CLEAR_QUERY_STATUS(q, st) ((q)->status &= (~(st))) #define CLEAR_QUERY_STATUS(q, st) ((q)->status &= (~(st)))
#define GET_NUM_OF_TABLEGROUP(q) taosArrayGetSize((q)->tableqinfoGroupInfo.pGroupList)
#define GET_TABLEGROUP(q, _index) ((SArray*) taosArrayGetP((q)->tableqinfoGroupInfo.pGroupList, (_index)))
static void setQueryStatus(SQuery *pQuery, int8_t status); static void setQueryStatus(SQuery *pQuery, int8_t status);
static bool isIntervalQuery(SQuery *pQuery) { return pQuery->intervalTime > 0; } static bool isIntervalQuery(SQuery *pQuery) { return pQuery->intervalTime > 0; }
...@@ -1708,7 +1708,23 @@ static bool onlyFirstQuery(SQuery *pQuery) { return onlyOneQueryType(pQuery, TSD ...@@ -1708,7 +1708,23 @@ static bool onlyFirstQuery(SQuery *pQuery) { return onlyOneQueryType(pQuery, TSD
static bool onlyLastQuery(SQuery *pQuery) { return onlyOneQueryType(pQuery, TSDB_FUNC_LAST, TSDB_FUNC_LAST_DST); } static bool onlyLastQuery(SQuery *pQuery) { return onlyOneQueryType(pQuery, TSDB_FUNC_LAST, TSDB_FUNC_LAST_DST); }
static void changeExecuteScanOrder(SQuery *pQuery, bool stableQuery) { // todo refactor, add iterator
static void doExchangeTimeWindow(SQInfo* pQInfo) {
size_t t = GET_NUM_OF_TABLEGROUP(pQInfo);
for(int32_t i = 0; i < t; ++i) {
SArray* p1 = GET_TABLEGROUP(pQInfo, i);
size_t len = taosArrayGetSize(p1);
for(int32_t j = 0; j < len; ++j) {
STableQueryInfo* pTableQueryInfo = (STableQueryInfo*) taosArrayGetP(p1, j);
SWAP(pTableQueryInfo->win.skey, pTableQueryInfo->win.ekey, TSKEY);
}
}
}
static void changeExecuteScanOrder(SQInfo *pQInfo, bool stableQuery) {
SQuery* pQuery = pQInfo->runtimeEnv.pQuery;
// in case of point-interpolation query, use asc order scan // in case of point-interpolation query, use asc order scan
char msg[] = "QInfo:%p scan order changed for %s query, old:%d, new:%d, qrange exchanged, old qrange:%" PRId64 char msg[] = "QInfo:%p scan order changed for %s query, old:%d, new:%d, qrange exchanged, old qrange:%" PRId64
"-%" PRId64 ", new qrange:%" PRId64 "-%" PRId64; "-%" PRId64 ", new qrange:%" PRId64 "-%" PRId64;
...@@ -1748,6 +1764,7 @@ static void changeExecuteScanOrder(SQuery *pQuery, bool stableQuery) { ...@@ -1748,6 +1764,7 @@ static void changeExecuteScanOrder(SQuery *pQuery, bool stableQuery) {
pQuery->window.ekey, pQuery->window.ekey, pQuery->window.skey); pQuery->window.ekey, pQuery->window.ekey, pQuery->window.skey);
SWAP(pQuery->window.skey, pQuery->window.ekey, TSKEY); SWAP(pQuery->window.skey, pQuery->window.ekey, TSKEY);
doExchangeTimeWindow(pQInfo);
} }
pQuery->order.order = TSDB_ORDER_ASC; pQuery->order.order = TSDB_ORDER_ASC;
...@@ -1757,6 +1774,7 @@ static void changeExecuteScanOrder(SQuery *pQuery, bool stableQuery) { ...@@ -1757,6 +1774,7 @@ static void changeExecuteScanOrder(SQuery *pQuery, bool stableQuery) {
pQuery->window.ekey, pQuery->window.ekey, pQuery->window.skey); pQuery->window.ekey, pQuery->window.ekey, pQuery->window.skey);
SWAP(pQuery->window.skey, pQuery->window.ekey, TSKEY); SWAP(pQuery->window.skey, pQuery->window.ekey, TSKEY);
doExchangeTimeWindow(pQInfo);
} }
pQuery->order.order = TSDB_ORDER_DESC; pQuery->order.order = TSDB_ORDER_DESC;
...@@ -2489,10 +2507,10 @@ int32_t mergeIntoGroupResult(SQInfo *pQInfo) { ...@@ -2489,10 +2507,10 @@ int32_t mergeIntoGroupResult(SQInfo *pQInfo) {
int64_t st = taosGetTimestampMs(); int64_t st = taosGetTimestampMs();
int32_t ret = TSDB_CODE_SUCCESS; int32_t ret = TSDB_CODE_SUCCESS;
int32_t numOfGroups = taosArrayGetSize(pQInfo->tableqinfoGroupInfo.pGroupList); int32_t numOfGroups = GET_NUM_OF_TABLEGROUP(pQInfo);
while (pQInfo->groupIndex < numOfGroups) { while (pQInfo->groupIndex < numOfGroups) {
SArray *group = taosArrayGetP(pQInfo->tableqinfoGroupInfo.pGroupList, pQInfo->groupIndex); SArray *group = GET_TABLEGROUP(pQInfo, pQInfo->groupIndex);
ret = mergeIntoGroupResultImpl(pQInfo, group); ret = mergeIntoGroupResultImpl(pQInfo, group);
if (ret < 0) { // not enough disk space to save the data into disk if (ret < 0) { // not enough disk space to save the data into disk
return -1; return -1;
...@@ -2525,7 +2543,7 @@ void copyResToQueryResultBuf(SQInfo *pQInfo, SQuery *pQuery) { ...@@ -2525,7 +2543,7 @@ void copyResToQueryResultBuf(SQInfo *pQInfo, SQuery *pQuery) {
} }
// check if all results has been sent to client // check if all results has been sent to client
int32_t numOfGroup = taosArrayGetSize(pQInfo->tableqinfoGroupInfo.pGroupList); int32_t numOfGroup = GET_NUM_OF_TABLEGROUP(pQInfo);
if (pQInfo->numOfGroupResultPages == 0 && pQInfo->groupIndex == numOfGroup) { if (pQInfo->numOfGroupResultPages == 0 && pQInfo->groupIndex == numOfGroup) {
pQInfo->tableIndex = pQInfo->tableqinfoGroupInfo.numOfTables; // set query completed pQInfo->tableIndex = pQInfo->tableqinfoGroupInfo.numOfTables; // set query completed
return; return;
...@@ -2859,10 +2877,10 @@ void disableFuncInReverseScan(SQInfo *pQInfo) { ...@@ -2859,10 +2877,10 @@ void disableFuncInReverseScan(SQInfo *pQInfo) {
} }
} }
int32_t numOfGroups = taosArrayGetSize(pQInfo->tableqinfoGroupInfo.pGroupList); int32_t numOfGroups = GET_NUM_OF_TABLEGROUP(pQInfo);
for(int32_t i = 0; i < numOfGroups; ++i) { for(int32_t i = 0; i < numOfGroups; ++i) {
SArray *group = taosArrayGetP(pQInfo->tableqinfoGroupInfo.pGroupList, i); SArray *group = GET_TABLEGROUP(pQInfo, i);
size_t t = taosArrayGetSize(group); size_t t = taosArrayGetSize(group);
for (int32_t j = 0; j < t; ++j) { for (int32_t j = 0; j < t; ++j) {
...@@ -3349,7 +3367,7 @@ void setWindowResOutputBufInitCtx(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult * ...@@ -3349,7 +3367,7 @@ void setWindowResOutputBufInitCtx(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *
SQLFunctionCtx *pCtx = &pRuntimeEnv->pCtx[i]; SQLFunctionCtx *pCtx = &pRuntimeEnv->pCtx[i];
pCtx->resultInfo = &pResult->resultInfo[i]; pCtx->resultInfo = &pResult->resultInfo[i];
if (pCtx->resultInfo->complete) { if (pCtx->resultInfo->initialized && pCtx->resultInfo->complete) {
continue; continue;
} }
...@@ -3479,7 +3497,7 @@ static int32_t getNumOfSubset(SQInfo *pQInfo) { ...@@ -3479,7 +3497,7 @@ static int32_t getNumOfSubset(SQInfo *pQInfo) {
if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || (isIntervalQuery(pQuery))) { if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || (isIntervalQuery(pQuery))) {
totalSubset = numOfClosedTimeWindow(&pQInfo->runtimeEnv.windowResInfo); totalSubset = numOfClosedTimeWindow(&pQInfo->runtimeEnv.windowResInfo);
} else { } else {
totalSubset = taosArrayGetSize(pQInfo->tableqinfoGroupInfo.pGroupList); totalSubset = GET_NUM_OF_TABLEGROUP(pQInfo);
} }
return totalSubset; return totalSubset;
...@@ -3619,36 +3637,40 @@ void stableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBlockInfo * ...@@ -3619,36 +3637,40 @@ void stableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBlockInfo *
bool queryHasRemainResults(SQueryRuntimeEnv* pRuntimeEnv) { bool queryHasRemainResults(SQueryRuntimeEnv* pRuntimeEnv) {
SQuery *pQuery = pRuntimeEnv->pQuery; SQuery *pQuery = pRuntimeEnv->pQuery;
SFillInfo *pFillInfo = pRuntimeEnv->pFillInfo; SFillInfo *pFillInfo = pRuntimeEnv->pFillInfo;
// todo refactor
if (pQuery->fillType == TSDB_FILL_NONE || (pQuery->fillType != TSDB_FILL_NONE && isPointInterpoQuery(pQuery))) {
assert(pFillInfo == NULL);
return false;
}
if (pQuery->limit.limit > 0 && pQuery->rec.rows >= pQuery->limit.limit) { if (pQuery->limit.limit > 0 && pQuery->rec.total >= pQuery->limit.limit) {
return false; return false;
} }
// There are results not returned to client, fill operation applied to the remain result set in the if (pQuery->fillType != TSDB_FILL_NONE && !isPointInterpoQuery(pQuery)) {
// first place is required. // There are results not returned to client yet, so filling operation applied to the remain result is required
int32_t remain = taosNumOfRemainRows(pFillInfo); // in the first place.
if (remain > 0) { int32_t remain = taosNumOfRemainRows(pFillInfo);
return true; if (remain > 0) {
} return true;
}
/*
* While the code reaches here, there are no results returned to client now. /*
* If query is not completed yet, the gaps between two results blocks need to be handled after next data block * While the code reaches here, there are no results remains now.
* is retrieved from TSDB. * If query is not completed yet, the gaps between two results blocks need to be handled after next data block
* * is retrieved from TSDB.
* NOTE: If the result set is not the first block, the gap in front of the result set will be filled. If the result *
* set is the FIRST result block, the gap between the start time of query time window and the timestamp of the * NOTE: If the result set is not the first block, the gap in front of the result set will be filled. If the result
* first result row in the actual result set will fill nothing. * set is the FIRST result block, the gap between the start time of query time window and the timestamp of the
*/ * first result row in the actual result set will fill nothing.
if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) { */
int32_t numOfTotal = getFilledNumOfRes(pFillInfo, pQuery->window.ekey, pQuery->rec.capacity); if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) {
return numOfTotal > 0; int32_t numOfTotal = getFilledNumOfRes(pFillInfo, pQuery->window.ekey, pQuery->rec.capacity);
return numOfTotal > 0;
}
} else {
// there are results waiting for returned to client.
if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED) &&
(isGroupbyNormalCol(pQuery->pGroupbyExpr) || isIntervalQuery(pQuery)) &&
(pRuntimeEnv->windowResInfo.size > 0)) {
return true;
}
} }
return false; return false;
...@@ -3690,7 +3712,7 @@ static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data ...@@ -3690,7 +3712,7 @@ static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data
} }
} }
int32_t doFillGapsInResults(SQueryRuntimeEnv* pRuntimeEnv, tFilePage **pDst, int32_t *numOfInterpo) { int32_t doFillGapsInResults(SQueryRuntimeEnv* pRuntimeEnv, tFilePage **pDst, int32_t *numOfFilled) {
SQInfo* pQInfo = GET_QINFO_ADDR(pRuntimeEnv); SQInfo* pQInfo = GET_QINFO_ADDR(pRuntimeEnv);
SQuery *pQuery = pRuntimeEnv->pQuery; SQuery *pQuery = pRuntimeEnv->pQuery;
SFillInfo* pFillInfo = pRuntimeEnv->pFillInfo; SFillInfo* pFillInfo = pRuntimeEnv->pFillInfo;
...@@ -3995,7 +4017,7 @@ static void setupQueryHandle(void* tsdb, SQInfo* pQInfo, bool isSTableQuery) { ...@@ -3995,7 +4017,7 @@ static void setupQueryHandle(void* tsdb, SQInfo* pQInfo, bool isSTableQuery) {
&& (!isGroupbyNormalCol(pQuery->pGroupbyExpr)) && (!isGroupbyNormalCol(pQuery->pGroupbyExpr))
&& (!isFixedOutputQuery(pQuery)) && (!isFixedOutputQuery(pQuery))
) { ) {
SArray* pa = taosArrayGetP(pQInfo->tableqinfoGroupInfo.pGroupList, 0); SArray* pa = GET_TABLEGROUP(pQInfo, 0);
STableQueryInfo* pCheckInfo = taosArrayGetP(pa, 0); STableQueryInfo* pCheckInfo = taosArrayGetP(pa, 0);
cond.twindow = pCheckInfo->win; cond.twindow = pCheckInfo->win;
} }
...@@ -4039,7 +4061,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, void *param, void *tsdb, int32_t vgId, bool ...@@ -4039,7 +4061,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, void *param, void *tsdb, int32_t vgId, bool
pQuery->precision = tsdbGetCfg(tsdb)->precision; pQuery->precision = tsdbGetCfg(tsdb)->precision;
setScanLimitationByResultBuffer(pQuery); setScanLimitationByResultBuffer(pQuery);
changeExecuteScanOrder(pQuery, false); changeExecuteScanOrder(pQInfo, false);
setupQueryHandle(tsdb, pQInfo, isSTableQuery); setupQueryHandle(tsdb, pQInfo, isSTableQuery);
pQInfo->tsdb = tsdb; pQInfo->tsdb = tsdb;
...@@ -4142,9 +4164,9 @@ static int64_t scanMultiTableDataBlocks(SQInfo *pQInfo) { ...@@ -4142,9 +4164,9 @@ static int64_t scanMultiTableDataBlocks(SQInfo *pQInfo) {
STableQueryInfo *pTableQueryInfo = NULL; STableQueryInfo *pTableQueryInfo = NULL;
// todo opt performance using hash table // todo opt performance using hash table
size_t numOfGroup = taosArrayGetSize(pQInfo->tableqinfoGroupInfo.pGroupList); size_t numOfGroup = GET_NUM_OF_TABLEGROUP(pQInfo);
for (int32_t i = 0; i < numOfGroup; ++i) { for (int32_t i = 0; i < numOfGroup; ++i) {
SArray *group = taosArrayGetP(pQInfo->tableqinfoGroupInfo.pGroupList, i); SArray *group = GET_TABLEGROUP(pQInfo, i);
size_t num = taosArrayGetSize(group); size_t num = taosArrayGetSize(group);
for (int32_t j = 0; j < num; ++j) { for (int32_t j = 0; j < num; ++j) {
...@@ -4197,7 +4219,7 @@ static bool multiTableMultioutputHelper(SQInfo *pQInfo, int32_t index) { ...@@ -4197,7 +4219,7 @@ static bool multiTableMultioutputHelper(SQInfo *pQInfo, int32_t index) {
SQuery * pQuery = pRuntimeEnv->pQuery; SQuery * pQuery = pRuntimeEnv->pQuery;
setQueryStatus(pQuery, QUERY_NOT_COMPLETED); setQueryStatus(pQuery, QUERY_NOT_COMPLETED);
SArray *group = taosArrayGetP(pQInfo->tableqinfoGroupInfo.pGroupList, 0); SArray *group = GET_TABLEGROUP(pQInfo, 0);
STableQueryInfo* pCheckInfo = taosArrayGetP(group, index); STableQueryInfo* pCheckInfo = taosArrayGetP(group, index);
setTagVal(pRuntimeEnv, pCheckInfo->pTable, pQInfo->tsdb); setTagVal(pRuntimeEnv, pCheckInfo->pTable, pQInfo->tsdb);
...@@ -4261,7 +4283,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) { ...@@ -4261,7 +4283,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
SQuery * pQuery = pRuntimeEnv->pQuery; SQuery * pQuery = pRuntimeEnv->pQuery;
setQueryStatus(pQuery, QUERY_COMPLETED); setQueryStatus(pQuery, QUERY_COMPLETED);
size_t numOfGroups = taosArrayGetSize(pQInfo->tableqinfoGroupInfo.pGroupList); size_t numOfGroups = GET_NUM_OF_TABLEGROUP(pQInfo);
if (isPointInterpoQuery(pQuery) || isFirstLastRowQuery(pQuery)) { if (isPointInterpoQuery(pQuery) || isFirstLastRowQuery(pQuery)) {
resetCtxOutputBuf(pRuntimeEnv); resetCtxOutputBuf(pRuntimeEnv);
...@@ -4311,7 +4333,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) { ...@@ -4311,7 +4333,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
taosArrayDestroy(s); taosArrayDestroy(s);
// here we simply set the first table as current table // here we simply set the first table as current table
pQuery->current = (STableQueryInfo*) taosArrayGetP(pQInfo->tableqinfoGroupInfo.pGroupList, 0); pQuery->current = (STableQueryInfo*) GET_TABLEGROUP(pQInfo, 0);
scanOneTableDataBlocks(pRuntimeEnv, pQuery->current->lastKey); scanOneTableDataBlocks(pRuntimeEnv, pQuery->current->lastKey);
int64_t numOfRes = getNumOfResult(pRuntimeEnv); int64_t numOfRes = getNumOfResult(pRuntimeEnv);
...@@ -4424,7 +4446,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) { ...@@ -4424,7 +4446,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
resetCtxOutputBuf(pRuntimeEnv); resetCtxOutputBuf(pRuntimeEnv);
resetTimeWindowInfo(pRuntimeEnv, &pRuntimeEnv->windowResInfo); resetTimeWindowInfo(pRuntimeEnv, &pRuntimeEnv->windowResInfo);
SArray *group = taosArrayGetP(pQInfo->tableqinfoGroupInfo.pGroupList, 0); SArray *group = GET_TABLEGROUP(pQInfo, 0);
assert(taosArrayGetSize(group) == pQInfo->tableqinfoGroupInfo.numOfTables && assert(taosArrayGetSize(group) == pQInfo->tableqinfoGroupInfo.numOfTables &&
1 == taosArrayGetSize(pQInfo->tableqinfoGroupInfo.pGroupList)); 1 == taosArrayGetSize(pQInfo->tableqinfoGroupInfo.pGroupList));
...@@ -4575,9 +4597,9 @@ static void doCloseAllTimeWindowAfterScan(SQInfo *pQInfo) { ...@@ -4575,9 +4597,9 @@ static void doCloseAllTimeWindowAfterScan(SQInfo *pQInfo) {
SQuery *pQuery = pQInfo->runtimeEnv.pQuery; SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
if (isIntervalQuery(pQuery)) { if (isIntervalQuery(pQuery)) {
size_t numOfGroup = taosArrayGetSize(pQInfo->tableqinfoGroupInfo.pGroupList); size_t numOfGroup = GET_NUM_OF_TABLEGROUP(pQInfo);
for (int32_t i = 0; i < numOfGroup; ++i) { for (int32_t i = 0; i < numOfGroup; ++i) {
SArray *group = taosArrayGetP(pQInfo->tableqinfoGroupInfo.pGroupList, i); SArray *group = GET_TABLEGROUP(pQInfo, i);
size_t num = taosArrayGetSize(group); size_t num = taosArrayGetSize(group);
for (int32_t j = 0; j < num; ++j) { for (int32_t j = 0; j < num; ++j) {
...@@ -4794,7 +4816,7 @@ static void tableIntervalProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) { ...@@ -4794,7 +4816,7 @@ static void tableIntervalProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) {
SQuery *pQuery = pRuntimeEnv->pQuery; SQuery *pQuery = pRuntimeEnv->pQuery;
pQuery->current = pTableInfo; pQuery->current = pTableInfo;
int32_t numOfInterpo = 0; int32_t numOfFilled = 0;
TSKEY newStartKey = TSKEY_INITIAL_VAL; TSKEY newStartKey = TSKEY_INITIAL_VAL;
// skip blocks without load the actual data block from file if no filter condition present // skip blocks without load the actual data block from file if no filter condition present
...@@ -4822,9 +4844,9 @@ static void tableIntervalProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) { ...@@ -4822,9 +4844,9 @@ static void tableIntervalProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) {
} else { } else {
taosFillSetStartInfo(pRuntimeEnv->pFillInfo, pQuery->rec.rows, pQuery->window.ekey); taosFillSetStartInfo(pRuntimeEnv->pFillInfo, pQuery->rec.rows, pQuery->window.ekey);
taosFillCopyInputDataFromFilePage(pRuntimeEnv->pFillInfo, (tFilePage**) pQuery->sdata); taosFillCopyInputDataFromFilePage(pRuntimeEnv->pFillInfo, (tFilePage**) pQuery->sdata);
numOfInterpo = 0; numOfFilled = 0;
pQuery->rec.rows = doFillGapsInResults(pRuntimeEnv, (tFilePage **)pQuery->sdata, &numOfInterpo); pQuery->rec.rows = doFillGapsInResults(pRuntimeEnv, (tFilePage **)pQuery->sdata, &numOfFilled);
if (pQuery->rec.rows > 0 || Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) { if (pQuery->rec.rows > 0 || Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) {
limitResults(pRuntimeEnv); limitResults(pRuntimeEnv);
break; break;
...@@ -4843,7 +4865,7 @@ static void tableIntervalProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) { ...@@ -4843,7 +4865,7 @@ static void tableIntervalProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) {
clearFirstNTimeWindow(pRuntimeEnv, pQInfo->groupIndex); clearFirstNTimeWindow(pRuntimeEnv, pQInfo->groupIndex);
} }
pQInfo->pointsInterpo += numOfInterpo; pQInfo->pointsInterpo += numOfFilled;
} }
static void tableQueryImpl(SQInfo *pQInfo) { static void tableQueryImpl(SQInfo *pQInfo) {
...@@ -4851,45 +4873,41 @@ static void tableQueryImpl(SQInfo *pQInfo) { ...@@ -4851,45 +4873,41 @@ static void tableQueryImpl(SQInfo *pQInfo) {
SQuery * pQuery = pRuntimeEnv->pQuery; SQuery * pQuery = pRuntimeEnv->pQuery;
if (queryHasRemainResults(pRuntimeEnv)) { if (queryHasRemainResults(pRuntimeEnv)) {
/*
* There are remain results that are not returned due to result interpolation
* So, we do keep in this procedure instead of launching retrieve procedure for next results.
*/
int32_t numOfInterpo = 0;
pQuery->rec.rows = doFillGapsInResults(pRuntimeEnv, (tFilePage **)pQuery->sdata, &numOfInterpo);
if (pQuery->rec.rows > 0) {
limitResults(pRuntimeEnv);
}
qDebug("QInfo:%p current:%" PRId64 " returned, total:%" PRId64, pQInfo, pQuery->rec.rows, pQuery->rec.total);
return;
}
// here we have scan all qualified data in both data file and cache if (pQuery->fillType != TSDB_FILL_NONE) {
if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) { /*
// continue to get push data from the group result * There are remain results that are not returned due to result interpolation
if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || * So, we do keep in this procedure instead of launching retrieve procedure for next results.
((isIntervalQuery(pQuery) && pQuery->rec.total < pQuery->limit.limit))) { */
// todo limit the output for interval query? int32_t numOfFilled = 0;
pQuery->rec.rows = doFillGapsInResults(pRuntimeEnv, (tFilePage **)pQuery->sdata, &numOfFilled);
if (pQuery->rec.rows > 0) {
limitResults(pRuntimeEnv);
}
qDebug("QInfo:%p current:%" PRId64 " returned, total:%" PRId64, pQInfo, pQuery->rec.rows, pQuery->rec.total);
return;
} else {
pQuery->rec.rows = 0; pQuery->rec.rows = 0;
pQInfo->groupIndex = 0; // always start from 0 pQInfo->groupIndex = 0; // always start from 0
if (pRuntimeEnv->windowResInfo.size > 0) { if (pRuntimeEnv->windowResInfo.size > 0) {
copyFromWindowResToSData(pQInfo, pRuntimeEnv->windowResInfo.pResult); copyFromWindowResToSData(pQInfo, pRuntimeEnv->windowResInfo.pResult);
pQuery->rec.rows += pQuery->rec.rows;
clearFirstNTimeWindow(pRuntimeEnv, pQInfo->groupIndex); clearFirstNTimeWindow(pRuntimeEnv, pQInfo->groupIndex);
if (pQuery->rec.rows > 0) { if (pQuery->rec.rows > 0) {
qDebug("QInfo:%p %"PRId64" rows returned from group results, total:%"PRId64"", pQInfo, pQuery->rec.rows, pQuery->rec.total); qDebug("QInfo:%p %"PRId64" rows returned from group results, total:%"PRId64"", pQInfo, pQuery->rec.rows, pQuery->rec.total);
// there are not data remains
if (pRuntimeEnv->windowResInfo.size <= 0) {
qDebug("QInfo:%p query over, %"PRId64" rows are returned", pQInfo, pQuery->rec.total);
}
return; return;
} }
} }
} }
qDebug("QInfo:%p query over, %"PRId64" rows are returned", pQInfo, pQuery->rec.total);
return;
} }
// number of points returned during this query // number of points returned during this query
...@@ -4897,7 +4915,7 @@ static void tableQueryImpl(SQInfo *pQInfo) { ...@@ -4897,7 +4915,7 @@ static void tableQueryImpl(SQInfo *pQInfo) {
int64_t st = taosGetTimestampUs(); int64_t st = taosGetTimestampUs();
assert(pQInfo->tableqinfoGroupInfo.numOfTables == 1); assert(pQInfo->tableqinfoGroupInfo.numOfTables == 1);
SArray* g = taosArrayGetP(pQInfo->tableqinfoGroupInfo.pGroupList, 0); SArray* g = GET_TABLEGROUP(pQInfo, 0);
STableQueryInfo* item = taosArrayGetP(g, 0); STableQueryInfo* item = taosArrayGetP(g, 0);
// group by normal column, sliding window query, interval query are handled by interval query processor // group by normal column, sliding window query, interval query are handled by interval query processor
...@@ -5658,26 +5676,6 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList, ...@@ -5658,26 +5676,6 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList,
return pQInfo; return pQInfo;
_cleanup: _cleanup:
//tfree(pQuery->fillVal);
//if (pQuery->sdata != NULL) {
// for (int16_t col = 0; col < pQuery->numOfOutput; ++col) {
// tfree(pQuery->sdata[col]);
// }
//}
//
//tfree(pQuery->sdata);
//tfree(pQuery->pFilterInfo);
//tfree(pQuery->colList);
//tfree(pExprs);
//tfree(pGroupbyExpr);
//taosArrayDestroy(pQInfo->arrTableIdInfo);
//tsdbDestoryTableGroup(&pQInfo->tableGroupInfo);
//
//tfree(pQInfo);
freeQInfo(pQInfo); freeQInfo(pQInfo);
return NULL; return NULL;
...@@ -5712,7 +5710,6 @@ static int32_t initQInfo(SQueryTableMsg *pQueryMsg, void *tsdb, int32_t vgId, SQ ...@@ -5712,7 +5710,6 @@ static int32_t initQInfo(SQueryTableMsg *pQueryMsg, void *tsdb, int32_t vgId, SQ
UNUSED(ret); UNUSED(ret);
} }
// only the successful complete requries the sem_post/over = 1 operations.
if ((QUERY_IS_ASC_QUERY(pQuery) && (pQuery->window.skey > pQuery->window.ekey)) || if ((QUERY_IS_ASC_QUERY(pQuery) && (pQuery->window.skey > pQuery->window.ekey)) ||
(!QUERY_IS_ASC_QUERY(pQuery) && (pQuery->window.ekey > pQuery->window.skey))) { (!QUERY_IS_ASC_QUERY(pQuery) && (pQuery->window.ekey > pQuery->window.skey))) {
qDebug("QInfo:%p no result in time range %" PRId64 "-%" PRId64 ", order %d", pQInfo, pQuery->window.skey, qDebug("QInfo:%p no result in time range %" PRId64 "-%" PRId64 ", order %d", pQInfo, pQuery->window.skey,
...@@ -5722,7 +5719,7 @@ static int32_t initQInfo(SQueryTableMsg *pQueryMsg, void *tsdb, int32_t vgId, SQ ...@@ -5722,7 +5719,7 @@ static int32_t initQInfo(SQueryTableMsg *pQueryMsg, void *tsdb, int32_t vgId, SQ
sem_post(&pQInfo->dataReady); sem_post(&pQInfo->dataReady);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
if (pQInfo->tableqinfoGroupInfo.numOfTables == 0) { if (pQInfo->tableqinfoGroupInfo.numOfTables == 0) {
qDebug("QInfo:%p no table qualified for tag filter, abort query", pQInfo); qDebug("QInfo:%p no table qualified for tag filter, abort query", pQInfo);
setQueryStatus(pQuery, QUERY_COMPLETED); setQueryStatus(pQuery, QUERY_COMPLETED);
...@@ -5784,9 +5781,9 @@ static void freeQInfo(SQInfo *pQInfo) { ...@@ -5784,9 +5781,9 @@ static void freeQInfo(SQInfo *pQInfo) {
} }
// todo refactor, extract method to destroytableDataInfo // todo refactor, extract method to destroytableDataInfo
int32_t numOfGroups = taosArrayGetSize(pQInfo->tableqinfoGroupInfo.pGroupList); int32_t numOfGroups = GET_NUM_OF_TABLEGROUP(pQInfo);
for (int32_t i = 0; i < numOfGroups; ++i) { for (int32_t i = 0; i < numOfGroups; ++i) {
SArray *p = taosArrayGetP(pQInfo->tableqinfoGroupInfo.pGroupList, i); SArray *p = GET_TABLEGROUP(pQInfo, i);;
size_t num = taosArrayGetSize(p); size_t num = taosArrayGetSize(p);
for(int32_t j = 0; j < num; ++j) { for(int32_t j = 0; j < num; ++j) {
...@@ -6053,6 +6050,11 @@ void qTableQuery(qinfo_t qinfo, void (*fp)(void*), void* param) { ...@@ -6053,6 +6050,11 @@ void qTableQuery(qinfo_t qinfo, void (*fp)(void*), void* param) {
return; return;
} }
if (pQInfo->tableqinfoGroupInfo.numOfTables == 0) {
qDebug("QInfo:%p no table exists for query, abort", pQInfo);
return;
}
qDebug("QInfo:%p query task is launched", pQInfo); qDebug("QInfo:%p query task is launched", pQInfo);
if (onlyQueryTags(pQInfo->runtimeEnv.pQuery)) { if (onlyQueryTags(pQInfo->runtimeEnv.pQuery)) {
...@@ -6175,14 +6177,14 @@ static void buildTagQueryResult(SQInfo* pQInfo) { ...@@ -6175,14 +6177,14 @@ static void buildTagQueryResult(SQInfo* pQInfo) {
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery * pQuery = pRuntimeEnv->pQuery; SQuery * pQuery = pRuntimeEnv->pQuery;
size_t numOfGroup = taosArrayGetSize(pQInfo->tableqinfoGroupInfo.pGroupList); size_t numOfGroup = GET_NUM_OF_TABLEGROUP(pQInfo);
assert(numOfGroup == 0 || numOfGroup == 1); assert(numOfGroup == 0 || numOfGroup == 1);
if (numOfGroup == 0) { if (numOfGroup == 0) {
return; return;
} }
SArray* pa = taosArrayGetP(pQInfo->tableqinfoGroupInfo.pGroupList, 0); SArray* pa = GET_TABLEGROUP(pQInfo, 0);
size_t num = taosArrayGetSize(pa); size_t num = taosArrayGetSize(pa);
assert(num == pQInfo->tableqinfoGroupInfo.numOfTables); assert(num == pQInfo->tableqinfoGroupInfo.numOfTables);
......
...@@ -133,7 +133,6 @@ void clearFirstNTimeWindow(SQueryRuntimeEnv *pRuntimeEnv, int32_t num) { ...@@ -133,7 +133,6 @@ void clearFirstNTimeWindow(SQueryRuntimeEnv *pRuntimeEnv, int32_t num) {
} }
pWindowResInfo->size = remain; pWindowResInfo->size = remain;
printf("---------------size:%ld\n", taosHashGetSize(pWindowResInfo->hashList));
for (int32_t k = 0; k < pWindowResInfo->size; ++k) { for (int32_t k = 0; k < pWindowResInfo->size; ++k) {
SWindowResult *pResult = &pWindowResInfo->pResult[k]; SWindowResult *pResult = &pWindowResInfo->pResult[k];
int32_t *p = (int32_t *)taosHashGet(pWindowResInfo->hashList, (const char *)&pResult->window.skey, int32_t *p = (int32_t *)taosHashGet(pWindowResInfo->hashList, (const char *)&pResult->window.skey,
......
...@@ -121,7 +121,7 @@ int tsdbSetAndOpenHelperFile(SRWHelper *pHelper, SFileGroup *pGroup) { ...@@ -121,7 +121,7 @@ int tsdbSetAndOpenHelperFile(SRWHelper *pHelper, SFileGroup *pGroup) {
if (tsendfile(pHelper->files.nHeadF.fd, pHelper->files.headF.fd, NULL, TSDB_FILE_HEAD_SIZE) < TSDB_FILE_HEAD_SIZE) { if (tsendfile(pHelper->files.nHeadF.fd, pHelper->files.headF.fd, NULL, TSDB_FILE_HEAD_SIZE) < TSDB_FILE_HEAD_SIZE) {
tsdbError("vgId:%d failed to sendfile %d bytes from file %s to %s since %s", REPO_ID(pHelper->pRepo), tsdbError("vgId:%d failed to sendfile %d bytes from file %s to %s since %s", REPO_ID(pHelper->pRepo),
TSDB_FILE_HEAD_SIZE, pHelper->files.headF.fname, pHelper->files.nHeadF.fname, strerror(errno)); TSDB_FILE_HEAD_SIZE, pHelper->files.headF.fname, pHelper->files.nHeadF.fname, strerror(errno));
errno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
goto _err; goto _err;
} }
...@@ -144,7 +144,7 @@ int tsdbSetAndOpenHelperFile(SRWHelper *pHelper, SFileGroup *pGroup) { ...@@ -144,7 +144,7 @@ int tsdbSetAndOpenHelperFile(SRWHelper *pHelper, SFileGroup *pGroup) {
return tsdbLoadCompIdx(pHelper, NULL); return tsdbLoadCompIdx(pHelper, NULL);
_err: _err:
return -1; return terrno;
} }
int tsdbCloseHelperFile(SRWHelper *pHelper, bool hasError) { int tsdbCloseHelperFile(SRWHelper *pHelper, bool hasError) {
......
...@@ -148,7 +148,7 @@ TsdbQueryHandleT* tsdbQueryTables(TSDB_REPO_T* tsdb, STsdbQueryCond* pCond, STab ...@@ -148,7 +148,7 @@ TsdbQueryHandleT* tsdbQueryTables(TSDB_REPO_T* tsdb, STsdbQueryCond* pCond, STab
pQueryHandle->type = TSDB_QUERY_TYPE_ALL; pQueryHandle->type = TSDB_QUERY_TYPE_ALL;
pQueryHandle->cur.fid = -1; pQueryHandle->cur.fid = -1;
pQueryHandle->cur.win = TSWINDOW_INITIALIZER; pQueryHandle->cur.win = TSWINDOW_INITIALIZER;
pQueryHandle->checkFiles = true;//ASCENDING_TRAVERSE(pQueryHandle->order); pQueryHandle->checkFiles = true;
pQueryHandle->activeIndex = 0; // current active table index pQueryHandle->activeIndex = 0; // current active table index
pQueryHandle->qinfo = qinfo; pQueryHandle->qinfo = qinfo;
pQueryHandle->outputCapacity = ((STsdbRepo*)tsdb)->config.maxRowsPerFileBlock; pQueryHandle->outputCapacity = ((STsdbRepo*)tsdb)->config.maxRowsPerFileBlock;
...@@ -475,11 +475,15 @@ static int32_t binarySearchForBlock(SCompBlock* pBlock, int32_t numOfBlocks, TSK ...@@ -475,11 +475,15 @@ static int32_t binarySearchForBlock(SCompBlock* pBlock, int32_t numOfBlocks, TSK
} }
static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlocks, int32_t type) { static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlocks, int32_t type) {
// todo check open file failed
SFileGroup* fileGroup = pQueryHandle->pFileGroup; SFileGroup* fileGroup = pQueryHandle->pFileGroup;
assert(fileGroup->files[TSDB_FILE_TYPE_HEAD].fname > 0); assert(fileGroup->files[TSDB_FILE_TYPE_HEAD].fname > 0);
tsdbSetAndOpenHelperFile(&pQueryHandle->rhelper, fileGroup);
int32_t code = tsdbSetAndOpenHelperFile(&pQueryHandle->rhelper, fileGroup);
//open file failed, return error code to client
if (code != TSDB_CODE_SUCCESS) {
return code;
}
// load all the comp offset value for all tables in this file // load all the comp offset value for all tables in this file
*numOfBlocks = 0; *numOfBlocks = 0;
...@@ -593,8 +597,7 @@ static bool doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlo ...@@ -593,8 +597,7 @@ static bool doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlo
if (pCheckInfo->pDataCols == NULL) { if (pCheckInfo->pDataCols == NULL) {
STsdbMeta* pMeta = tsdbGetMeta(pRepo); STsdbMeta* pMeta = tsdbGetMeta(pRepo);
// TODO // TODO
pCheckInfo->pDataCols = pCheckInfo->pDataCols = tdNewDataCols(pMeta->maxRowBytes, pMeta->maxCols, pRepo->config.maxRowsPerFileBlock);
tdNewDataCols(pMeta->maxRowBytes, pMeta->maxCols, pRepo->config.maxRowsPerFileBlock);
} }
tdInitDataCols(pCheckInfo->pDataCols, tsdbGetTableSchema(pCheckInfo->pTableObj)); tdInitDataCols(pCheckInfo->pDataCols, tsdbGetTableSchema(pCheckInfo->pTableObj));
...@@ -1359,16 +1362,18 @@ static int32_t createDataBlocksInfo(STsdbQueryHandle* pQueryHandle, int32_t numO ...@@ -1359,16 +1362,18 @@ static int32_t createDataBlocksInfo(STsdbQueryHandle* pQueryHandle, int32_t numO
} }
// todo opt for only one table case // todo opt for only one table case
static bool getDataBlocksInFilesImpl(STsdbQueryHandle* pQueryHandle) { static int32_t getDataBlocksInFilesImpl(STsdbQueryHandle* pQueryHandle, bool* exists) {
pQueryHandle->numOfBlocks = 0; pQueryHandle->numOfBlocks = 0;
SQueryFilePos* cur = &pQueryHandle->cur; SQueryFilePos* cur = &pQueryHandle->cur;
int32_t code = TSDB_CODE_SUCCESS;
int32_t numOfBlocks = 0; int32_t numOfBlocks = 0;
int32_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo); int32_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo);
while ((pQueryHandle->pFileGroup = tsdbGetFileGroupNext(&pQueryHandle->fileIter)) != NULL) { while ((pQueryHandle->pFileGroup = tsdbGetFileGroupNext(&pQueryHandle->fileIter)) != NULL) {
int32_t type = ASCENDING_TRAVERSE(pQueryHandle->order)? QUERY_RANGE_GREATER_EQUAL:QUERY_RANGE_LESS_EQUAL; int32_t type = ASCENDING_TRAVERSE(pQueryHandle->order)? QUERY_RANGE_GREATER_EQUAL:QUERY_RANGE_LESS_EQUAL;
if (getFileCompInfo(pQueryHandle, &numOfBlocks, type) != TSDB_CODE_SUCCESS) { if ((code = getFileCompInfo(pQueryHandle, &numOfBlocks, type)) != TSDB_CODE_SUCCESS) {
break; break;
} }
...@@ -1393,20 +1398,25 @@ static bool getDataBlocksInFilesImpl(STsdbQueryHandle* pQueryHandle) { ...@@ -1393,20 +1398,25 @@ static bool getDataBlocksInFilesImpl(STsdbQueryHandle* pQueryHandle) {
// no data in file anymore // no data in file anymore
if (pQueryHandle->numOfBlocks <= 0) { if (pQueryHandle->numOfBlocks <= 0) {
assert(pQueryHandle->pFileGroup == NULL); if (code == TSDB_CODE_SUCCESS) {
assert(pQueryHandle->pFileGroup == NULL);
}
cur->fid = -1; // denote that there are no data in file anymore cur->fid = -1; // denote that there are no data in file anymore
*exists = false;
return false; return code;
} }
cur->slot = ASCENDING_TRAVERSE(pQueryHandle->order)? 0:pQueryHandle->numOfBlocks-1; cur->slot = ASCENDING_TRAVERSE(pQueryHandle->order)? 0:pQueryHandle->numOfBlocks-1;
cur->fid = pQueryHandle->pFileGroup->fileId; cur->fid = pQueryHandle->pFileGroup->fileId;
STableBlockInfo* pBlockInfo = &pQueryHandle->pDataBlockInfo[cur->slot]; STableBlockInfo* pBlockInfo = &pQueryHandle->pDataBlockInfo[cur->slot];
return loadFileDataBlock(pQueryHandle, pBlockInfo->compBlock, pBlockInfo->pTableCheckInfo); *exists = loadFileDataBlock(pQueryHandle, pBlockInfo->compBlock, pBlockInfo->pTableCheckInfo);
return TSDB_CODE_SUCCESS;
} }
static bool getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle) { static int32_t getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle, bool* exists) {
STsdbFileH* pFileHandle = tsdbGetFile(pQueryHandle->pTsdb); STsdbFileH* pFileHandle = tsdbGetFile(pQueryHandle->pTsdb);
SQueryFilePos* cur = &pQueryHandle->cur; SQueryFilePos* cur = &pQueryHandle->cur;
...@@ -1419,7 +1429,7 @@ static bool getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle) { ...@@ -1419,7 +1429,7 @@ static bool getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle) {
tsdbInitFileGroupIter(pFileHandle, &pQueryHandle->fileIter, pQueryHandle->order); tsdbInitFileGroupIter(pFileHandle, &pQueryHandle->fileIter, pQueryHandle->order);
tsdbSeekFileGroupIter(&pQueryHandle->fileIter, fid); tsdbSeekFileGroupIter(&pQueryHandle->fileIter, fid);
return getDataBlocksInFilesImpl(pQueryHandle); return getDataBlocksInFilesImpl(pQueryHandle, exists);
} else { } else {
// check if current file block is all consumed // check if current file block is all consumed
STableBlockInfo* pBlockInfo = &pQueryHandle->pDataBlockInfo[cur->slot]; STableBlockInfo* pBlockInfo = &pQueryHandle->pDataBlockInfo[cur->slot];
...@@ -1430,7 +1440,7 @@ static bool getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle) { ...@@ -1430,7 +1440,7 @@ static bool getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle) {
if ((cur->slot == pQueryHandle->numOfBlocks - 1 && ASCENDING_TRAVERSE(pQueryHandle->order)) || if ((cur->slot == pQueryHandle->numOfBlocks - 1 && ASCENDING_TRAVERSE(pQueryHandle->order)) ||
(cur->slot == 0 && !ASCENDING_TRAVERSE(pQueryHandle->order))) { (cur->slot == 0 && !ASCENDING_TRAVERSE(pQueryHandle->order))) {
// all data blocks in current file has been checked already, try next file if exists // all data blocks in current file has been checked already, try next file if exists
return getDataBlocksInFilesImpl(pQueryHandle); return getDataBlocksInFilesImpl(pQueryHandle, exists);
} else { } else {
// next block of the same file // next block of the same file
int32_t step = ASCENDING_TRAVERSE(pQueryHandle->order) ? 1 : -1; int32_t step = ASCENDING_TRAVERSE(pQueryHandle->order) ? 1 : -1;
...@@ -1440,11 +1450,15 @@ static bool getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle) { ...@@ -1440,11 +1450,15 @@ static bool getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle) {
cur->blockCompleted = false; cur->blockCompleted = false;
STableBlockInfo* pNext = &pQueryHandle->pDataBlockInfo[cur->slot]; STableBlockInfo* pNext = &pQueryHandle->pDataBlockInfo[cur->slot];
return loadFileDataBlock(pQueryHandle, pNext->compBlock, pNext->pTableCheckInfo); *exists = loadFileDataBlock(pQueryHandle, pNext->compBlock, pNext->pTableCheckInfo);
return TSDB_CODE_SUCCESS;
} }
} else { } else {
handleDataMergeIfNeeded(pQueryHandle, pBlockInfo->compBlock, pCheckInfo); handleDataMergeIfNeeded(pQueryHandle, pBlockInfo->compBlock, pCheckInfo);
return pQueryHandle->realNumOfRows > 0; *exists = pQueryHandle->realNumOfRows > 0;
return TSDB_CODE_SUCCESS;
} }
} }
} }
...@@ -1576,8 +1590,14 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) { ...@@ -1576,8 +1590,14 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) {
} }
if (pQueryHandle->checkFiles) { if (pQueryHandle->checkFiles) {
if (getDataBlocksInFiles(pQueryHandle)) { bool exists = true;
return true; int32_t code = getDataBlocksInFiles(pQueryHandle, &exists);
if (code != TSDB_CODE_SUCCESS) {
return false;
}
if (exists) {
return exists;
} }
pQueryHandle->activeIndex = 0; pQueryHandle->activeIndex = 0;
......
...@@ -11,7 +11,7 @@ ...@@ -11,7 +11,7 @@
4. pip install ../src/connector/python/linux/python2 ; pip3 install 4. pip install ../src/connector/python/linux/python2 ; pip3 install
../src/connector/python/linux/python3 ../src/connector/python/linux/python3
5. pip install numpy; pip3 install numpy 5. pip install numpy; pip3 install numpy (numpy is required only if you need to run querySort.py)
> Note: Both Python2 and Python3 are currently supported by the Python test > Note: Both Python2 and Python3 are currently supported by the Python test
> framework. Since Python2 is no longer officially supported by Python Software > framework. Since Python2 is no longer officially supported by Python Software
......
...@@ -77,11 +77,7 @@ class TDTestCase: ...@@ -77,11 +77,7 @@ class TDTestCase:
# join queries # join queries
tdSql.query( tdSql.query(
"select * from stb_p, stb_t where stb_p.ts=stb_t.ts and stb_p.id = stb_t.id") "select * from stb_p, stb_t where stb_p.ts=stb_t.ts and stb_p.id = stb_t.id")
tdSql.checkRows(6) tdSql.checkRows(6)
tdSql.query(
"select * from stb_p, stb_t where stb_p.ts=stb_t.ts and stb_p.id = stb_t.id order by ts desc")
tdSql.checkColumnSorted(0, "desc")
tdSql.error( tdSql.error(
"select ts, pressure, temperature, id, dscrption from stb_p, stb_t where stb_p.ts=stb_t.ts and stb_p.id = stb_t.id") "select ts, pressure, temperature, id, dscrption from stb_p, stb_t where stb_p.ts=stb_t.ts and stb_p.id = stb_t.id")
......
...@@ -16,6 +16,7 @@ import taos ...@@ -16,6 +16,7 @@ import taos
from util.log import * from util.log import *
from util.cases import * from util.cases import *
from util.sql import * from util.sql import *
import numpy as np
class TDTestCase: class TDTestCase:
...@@ -26,6 +27,46 @@ class TDTestCase: ...@@ -26,6 +27,46 @@ class TDTestCase:
self.rowNum = 10 self.rowNum = 10
self.ts = 1537146000000 self.ts = 1537146000000
def checkColumnSorted(self, col, order):
frame = inspect.stack()[1]
callerModule = inspect.getmodule(frame[0])
callerFilename = callerModule.__file__
if col < 0:
tdLog.exit(
"%s failed: sql:%s, col:%d is smaller than zero" %
(callerFilename, tdSql.sql, col))
if col > tdSql.queryCols:
tdLog.exit(
"%s failed: sql:%s, col:%d is larger than queryCols:%d" %
(callerFilename, tdSql.sql, col, tdSql.queryCols))
matrix = np.array(tdSql.queryResult)
list = matrix[:, 0]
if order == "" or order.upper() == "ASC":
if all(sorted(list) == list):
tdLog.info(
"sql:%s, column :%d is sorted in accending order as expected" %
(tdSql.sql, col))
else:
tdLog.exit(
"%s failed: sql:%s, col:%d is not sorted in accesnind order" %
(callerFilename, tdSql.sql, col))
elif order.upper() == "DESC":
if all(sorted(list, reverse=True) == list):
tdLog.info(
"sql:%s, column :%d is sorted in decending order as expected" %
(tdSql.sql, col))
else:
tdLog.exit(
"%s failed: sql:%s, col:%d is not sorted in decending order" %
(callerFilename, tdSql.sql, col))
else:
tdLog.exit(
"%s failed: sql:%s, the order provided for col:%d is not correct" %
(callerFilename, tdSql.sql, col))
def run(self): def run(self):
tdSql.prepare() tdSql.prepare()
...@@ -49,11 +90,11 @@ class TDTestCase: ...@@ -49,11 +90,11 @@ class TDTestCase:
print("======= step 2: verify order for each column =========") print("======= step 2: verify order for each column =========")
# sort for timestamp in asc order # sort for timestamp in asc order
tdSql.query("select * from st order by ts asc") tdSql.query("select * from st order by ts asc")
tdSql.checkColumnSorted(0, "asc") self.checkColumnSorted(0, "asc")
# sort for timestamp in desc order # sort for timestamp in desc order
tdSql.query("select * from st order by ts desc") tdSql.query("select * from st order by ts desc")
tdSql.checkColumnSorted(0, "desc") self.checkColumnSorted(0, "desc")
for i in range(1, 10): for i in range(1, 10):
tdSql.error("select * from st order by tbcol%d" % i) tdSql.error("select * from st order by tbcol%d" % i)
...@@ -63,17 +104,17 @@ class TDTestCase: ...@@ -63,17 +104,17 @@ class TDTestCase:
tdSql.query( tdSql.query(
"select avg(tbcol1) from st group by tagcol%d order by tagcol%d" % "select avg(tbcol1) from st group by tagcol%d order by tagcol%d" %
(i, i)) (i, i))
tdSql.checkColumnSorted(1, "") self.checkColumnSorted(1, "")
tdSql.query( tdSql.query(
"select avg(tbcol1) from st group by tagcol%d order by tagcol%d asc" % "select avg(tbcol1) from st group by tagcol%d order by tagcol%d asc" %
(i, i)) (i, i))
tdSql.checkColumnSorted(1, "asc") self.checkColumnSorted(1, "asc")
tdSql.query( tdSql.query(
"select avg(tbcol1) from st group by tagcol%d order by tagcol%d desc" % "select avg(tbcol1) from st group by tagcol%d order by tagcol%d desc" %
(i, i)) (i, i))
tdSql.checkColumnSorted(1, "desc") self.checkColumnSorted(1, "desc")
def stop(self): def stop(self):
tdSql.close() tdSql.close()
......
...@@ -43,7 +43,7 @@ python3 ./test.py -f tag_lite/commit.py ...@@ -43,7 +43,7 @@ python3 ./test.py -f tag_lite/commit.py
python3 ./test.py -f tag_lite/create.py python3 ./test.py -f tag_lite/create.py
python3 ./test.py -f tag_lite/datatype.py python3 ./test.py -f tag_lite/datatype.py
python3 ./test.py -f tag_lite/datatype-without-alter.py python3 ./test.py -f tag_lite/datatype-without-alter.py
# python3 ./test.py -f tag_lite/delete.py python3 ./test.py -f tag_lite/delete.py
python3 ./test.py -f tag_lite/double.py python3 ./test.py -f tag_lite/double.py
python3 ./test.py -f tag_lite/float.py python3 ./test.py -f tag_lite/float.py
python3 ./test.py -f tag_lite/int_binary.py python3 ./test.py -f tag_lite/int_binary.py
...@@ -134,9 +134,10 @@ python3 ./test.py -f table/del_stable.py ...@@ -134,9 +134,10 @@ python3 ./test.py -f table/del_stable.py
python3 ./test.py -f query/filter.py python3 ./test.py -f query/filter.py
python3 ./test.py -f query/filterAllIntTypes.py python3 ./test.py -f query/filterAllIntTypes.py
python3 ./test.py -f query/filterFloatAndDouble.py python3 ./test.py -f query/filterFloatAndDouble.py
python3 ./test.py -f query/filterOtherTypes.py
python3 ./test.py -f query/queryError.py
python3 ./test.py -f query/querySort.py python3 ./test.py -f query/querySort.py
#stream #stream
python3 ./test.py -f stream/stream1.py python3 ./test.py -f stream/stream1.py
python3 ./test.py -f stream/stream2.py python3 ./test.py -f stream/stream2.py
......
...@@ -8,24 +8,8 @@ python3 ./test.py $1 -s && sleep 1 ...@@ -8,24 +8,8 @@ python3 ./test.py $1 -s && sleep 1
# insert # insert
python3 ./test.py $1 -f insert/basic.py python3 ./test.py $1 -f insert/basic.py
python3 ./test.py $1 -s && sleep 1 python3 ./test.py $1 -s && sleep 1
python3 ./test.py $1 -f insert/int.py
python3 ./test.py $1 -s && sleep 1
python3 ./test.py $1 -f insert/float.py
python3 ./test.py $1 -s && sleep 1
python3 ./test.py $1 -f insert/bigint.py python3 ./test.py $1 -f insert/bigint.py
python3 ./test.py $1 -s && sleep 1 python3 ./test.py $1 -s && sleep 1
python3 ./test.py $1 -f insert/bool.py
python3 ./test.py $1 -s && sleep 1
python3 ./test.py $1 -f insert/double.py
python3 ./test.py $1 -s && sleep 1
python3 ./test.py $1 -f insert/smallint.py
python3 ./test.py $1 -s && sleep 1
python3 ./test.py $1 -f insert/tinyint.py
python3 ./test.py $1 -s && sleep 1
python3 ./test.py $1 -f insert/binary.py
python3 ./test.py $1 -s && sleep 1
python3 ./test.py $1 -f insert/date.py
python3 ./test.py $1 -s && sleep 1
python3 ./test.py $1 -f insert/nchar.py python3 ./test.py $1 -f insert/nchar.py
python3 ./test.py $1 -s && sleep 1 python3 ./test.py $1 -s && sleep 1
python3 ./test.py $1 -f insert/multi.py python3 ./test.py $1 -f insert/multi.py
...@@ -42,18 +26,6 @@ python3 ./test.py $1 -s && sleep 1 ...@@ -42,18 +26,6 @@ python3 ./test.py $1 -s && sleep 1
# import # import
python3 ./test.py $1 -f import_merge/importDataLastSub.py python3 ./test.py $1 -f import_merge/importDataLastSub.py
python3 ./test.py $1 -s && sleep 1 python3 ./test.py $1 -s && sleep 1
python3 ./test.py $1 -f import_merge/importHead.py
python3 ./test.py $1 -s && sleep 1
python3 ./test.py $1 -f import_merge/importLastT.py
python3 ./test.py $1 -s && sleep 1
python3 ./test.py $1 -f import_merge/importSpan.py
python3 ./test.py $1 -s && sleep 1
python3 ./test.py $1 -f import_merge/importTail.py
python3 ./test.py $1 -s && sleep 1
python3 ./test.py $1 -f import_merge/importTRestart.py
python3 ./test.py $1 -s && sleep 1
python3 ./test.py $1 -f import_merge/importInsertThenImport.py
python3 ./test.py $1 -s && sleep 1
#tag #tag
python3 ./test.py $1 -f tag_lite/filter.py python3 ./test.py $1 -f tag_lite/filter.py
......
...@@ -17,7 +17,6 @@ import time ...@@ -17,7 +17,6 @@ import time
import datetime import datetime
import inspect import inspect
from util.log import * from util.log import *
import numpy as np
class TDSql: class TDSql:
...@@ -199,47 +198,7 @@ class TDSql: ...@@ -199,47 +198,7 @@ class TDSql:
"%s failed: sql:%s, affectedRows:%d != expect:%d" % "%s failed: sql:%s, affectedRows:%d != expect:%d" %
(callerFilename, self.sql, self.affectedRows, expectAffectedRows)) (callerFilename, self.sql, self.affectedRows, expectAffectedRows))
tdLog.info("sql:%s, affectedRows:%d == expect:%d" % tdLog.info("sql:%s, affectedRows:%d == expect:%d" %
(self.sql, self.affectedRows, expectAffectedRows)) (self.sql, self.affectedRows, expectAffectedRows))
def checkColumnSorted(self, col, order):
frame = inspect.stack()[1]
callerModule = inspect.getmodule(frame[0])
callerFilename = callerModule.__file__
if col < 0:
tdLog.exit(
"%s failed: sql:%s, col:%d is smaller than zero" %
(callerFilename, self.sql, col))
if col > self.queryCols:
tdLog.exit(
"%s failed: sql:%s, col:%d is larger than queryCols:%d" %
(callerFilename, self.sql, col, self.queryCols))
matrix = np.array(self.queryResult)
list = matrix[:, 0]
if order == "" or order.upper() == "ASC":
if all(sorted(list) == list):
tdLog.info(
"sql:%s, column :%d is sorted in accending order as expected" %
(self.sql, col))
else:
tdLog.exit(
"%s failed: sql:%s, col:%d is not sorted in accesnind order" %
(callerFilename, self.sql, col))
elif order.upper() == "DESC":
if all(sorted(list, reverse=True) == list):
tdLog.info(
"sql:%s, column :%d is sorted in decending order as expected" %
(self.sql, col))
else:
tdLog.exit(
"%s failed: sql:%s, col:%d is not sorted in decending order" %
(callerFilename, self.sql, col))
else:
tdLog.exit(
"%s failed: sql:%s, the order provided for col:%d is not correct" %
(callerFilename, self.sql, col))
tdSql = TDSql() tdSql = TDSql()
run general/cache/new_metrics.sim run general/cache/new_metrics.sim
run general/column/commit.sim
run general/compress/compress.sim
run general/compute/interval.sim run general/compute/interval.sim
run general/db/basic4.sim run general/http/restful_full.sim
run general/field/binary.sim
run general/http/restful_insert.sim
run general/import/commit.sim run general/import/commit.sim
run general/import/replica1.sim run general/import/replica1.sim
run general/parser/auto_create_tb_drop_tb.sim run general/parser/auto_create_tb_drop_tb.sim
run general/parser/binary_escapeCharacter.sim run general/parser/binary_escapeCharacter.sim
run general/parser/select_from_cache_disk.sim run general/parser/select_from_cache_disk.sim
run general/parser/projection_limit_offset.sim
run general/parser/join.sim
run general/parser/where.sim
run general/parser/projection_limit_offset.sim
run general/stable/vnode3.sim run general/stable/vnode3.sim
run general/table/autocreate.sim
run general/table/fill.sim
run general/table/vgroup.sim
run general/tag/filter.sim run general/tag/filter.sim
run general/table/vgroup.sim
run general/user/authority.sim run general/user/authority.sim
run general/user/pass_alter.sim
run general/vector/metrics_mix.sim run general/vector/metrics_mix.sim
run general/vector/table_field.sim run general/vector/table_field.sim
...@@ -232,24 +232,3 @@ cd ../../../debug; make ...@@ -232,24 +232,3 @@ cd ../../../debug; make
./test.sh -f general/vector/table_mix.sim ./test.sh -f general/vector/table_mix.sim
./test.sh -f general/vector/table_query.sim ./test.sh -f general/vector/table_query.sim
./test.sh -f general/vector/table_time.sim ./test.sh -f general/vector/table_time.sim
./test.sh -f unique/account/account_create.sim
./test.sh -f unique/account/account_delete.sim
./test.sh -f unique/account/account_len.sim
./test.sh -f unique/account/authority.sim
./test.sh -f unique/account/basic.sim
./test.sh -f unique/account/paras.sim
./test.sh -f unique/account/pass_alter.sim
./test.sh -f unique/account/pass_len.sim
./test.sh -f unique/account/usage.sim
./test.sh -f unique/account/user_create.sim
./test.sh -f unique/account/user_len.sim
./test.sh -f unique/cluster/balance1.sim
./test.sh -f unique/cluster/balance2.sim
./test.sh -f unique/dnode/balance1.sim
./test.sh -f unique/dnode/balance2.sim
./test.sh -f unique/stable/dnode3.sim
./test.sh -f unique/mnode/mgmt22.sim
./test.sh -f unique/mnode/mgmt33.sim
./test.sh -f unique/vnode/many.sim
\ No newline at end of file
...@@ -102,9 +102,10 @@ cd ../../../debug; make ...@@ -102,9 +102,10 @@ cd ../../../debug; make
./test.sh -f unique/arbitrator/dn3_mn1_replica_change_dropDnod.sim ./test.sh -f unique/arbitrator/dn3_mn1_replica_change_dropDnod.sim
./test.sh -f unique/arbitrator/dn3_mn1_replica_change.sim ./test.sh -f unique/arbitrator/dn3_mn1_replica_change.sim
./test.sh -f unique/arbitrator/dn3_mn1_stopDnode_timeout.sim ./test.sh -f unique/arbitrator/dn3_mn1_stopDnode_timeout.sim
./test.sh -f unique/arbitrator/dn3_mn1_vnode_change.sim #./test.sh -f unique/arbitrator/dn3_mn1_vnode_change.sim
./test.sh -f unique/arbitrator/dn3_mn1_vnode_corruptFile_offline.sim #./test.sh -f unique/arbitrator/dn3_mn1_vnode_corruptFile_offline.sim
./test.sh -f unique/arbitrator/dn3_mn1_vnode_corruptFile_online.sim #./test.sh -f unique/arbitrator/dn3_mn1_vnode_corruptFile_online.sim
#./test.sh -f unique/arbitrator/dn3_mn1_vnode_createErrData_online.sim
./test.sh -f unique/arbitrator/dn3_mn1_vnode_noCorruptFile_offline.sim ./test.sh -f unique/arbitrator/dn3_mn1_vnode_noCorruptFile_offline.sim
./test.sh -f unique/arbitrator/dn3_mn1_vnode_delDir.sim ./test.sh -f unique/arbitrator/dn3_mn1_vnode_delDir.sim
./test.sh -f unique/arbitrator/dn3_mn1_r2_vnode_delDir.sim ./test.sh -f unique/arbitrator/dn3_mn1_r2_vnode_delDir.sim
......
##unsupport run general/alter/cached_schema_after_alter.sim ##unsupport run general/alter/cached_schema_after_alter.sim
unsupport run general/alter/count.sim run general/alter/count.sim
unsupport run general/alter/import.sim run general/alter/import.sim
##unsupport run general/alter/insert1.sim ##unsupport run general/alter/insert1.sim
unsupport run general/alter/insert2.sim run general/alter/insert2.sim
unsupport run general/alter/metrics.sim run general/alter/metrics.sim
unsupport run general/alter/table.sim run general/alter/table.sim
run general/cache/new_metrics.sim run general/cache/new_metrics.sim
run general/cache/restart_metrics.sim run general/cache/restart_metrics.sim
run general/cache/restart_table.sim run general/cache/restart_table.sim
...@@ -86,14 +86,14 @@ run general/insert/query_block2_file.sim ...@@ -86,14 +86,14 @@ run general/insert/query_block2_file.sim
run general/insert/query_file_memory.sim run general/insert/query_file_memory.sim
run general/insert/query_multi_file.sim run general/insert/query_multi_file.sim
run general/insert/tcp.sim run general/insert/tcp.sim
##unsupport run general/parser/alter.sim run general/parser/alter.sim
run general/parser/alter1.sim run general/parser/alter1.sim
run general/parser/alter_stable.sim run general/parser/alter_stable.sim
run general/parser/auto_create_tb.sim run general/parser/auto_create_tb.sim
run general/parser/auto_create_tb_drop_tb.sim run general/parser/auto_create_tb_drop_tb.sim
run general/parser/col_arithmetic_operation.sim run general/parser/col_arithmetic_operation.sim
run general/parser/columnValue.sim run general/parser/columnValue.sim
#run general/parser/commit.sim run general/parser/commit.sim
run general/parser/create_db.sim run general/parser/create_db.sim
run general/parser/create_mt.sim run general/parser/create_mt.sim
run general/parser/create_tb.sim run general/parser/create_tb.sim
...@@ -106,7 +106,7 @@ run general/parser/first_last.sim ...@@ -106,7 +106,7 @@ run general/parser/first_last.sim
##unsupport run general/parser/import_file.sim ##unsupport run general/parser/import_file.sim
run general/parser/lastrow.sim run general/parser/lastrow.sim
run general/parser/nchar.sim run general/parser/nchar.sim
##unsupport run general/parser/null_char.sim run general/parser/null_char.sim
run general/parser/single_row_in_tb.sim run general/parser/single_row_in_tb.sim
run general/parser/select_from_cache_disk.sim run general/parser/select_from_cache_disk.sim
run general/parser/limit.sim run general/parser/limit.sim
...@@ -132,7 +132,7 @@ run general/parser/groupby.sim ...@@ -132,7 +132,7 @@ run general/parser/groupby.sim
run general/parser/bug.sim run general/parser/bug.sim
run general/parser/tags_dynamically_specifiy.sim run general/parser/tags_dynamically_specifiy.sim
run general/parser/set_tag_vals.sim run general/parser/set_tag_vals.sim
##unsupport run general/parser/repeatAlter.sim run general/parser/repeatAlter.sim
##unsupport run general/parser/slimit_alter_tags.sim ##unsupport run general/parser/slimit_alter_tags.sim
##unsupport run general/parser/stream_on_sys.sim ##unsupport run general/parser/stream_on_sys.sim
run general/parser/stream.sim run general/parser/stream.sim
......
...@@ -64,7 +64,7 @@ print ======== step7 ...@@ -64,7 +64,7 @@ print ======== step7
$lastRows = $data00 $lastRows = $data00
print ======== loop Times $x print ======== loop Times $x
if $x < 2 then if $x < 5 then
$x = $x + 1 $x = $x + 1
goto loop goto loop
endi endi
......
...@@ -75,7 +75,7 @@ print ======== step8 ...@@ -75,7 +75,7 @@ print ======== step8
$lastRows = $data00 $lastRows = $data00
print ======== loop Times $x print ======== loop Times $x
if $x < 2 then if $x < 5 then
$x = $x + 1 $x = $x + 1
goto loop goto loop
endi endi
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册