提交 c060d18f 编写于 作者: H hzcheng

Merge branch 'develop' into feature/2.0tsdb

......@@ -96,18 +96,20 @@ enum {
static int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray* group);
static void setWindowResOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pResult);
static void resetMergeResultBuf(SQuery *pQuery, SQLFunctionCtx *pCtx, SResultInfo *pResultInfo);
static int32_t flushFromResultBuf(SQInfo *pQInfo);
static bool functionNeedToExecute(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *pCtx, int32_t functionId);
static void getNextTimeWindow(SQuery *pQuery, STimeWindow *pTimeWindow);
static void resetMergeResultBuf(SQuery *pQuery, SQLFunctionCtx *pCtx, SResultInfo *pResultInfo);
static bool functionNeedToExecute(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *pCtx, int32_t functionId);
static void getNextTimeWindow(SQuery *pQuery, STimeWindow *pTimeWindow);
static void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, void *inputData, TSKEY *tsCol, int32_t size,
int32_t functionId, SDataStatis *pStatis, bool hasNull, void *param, int32_t scanFlag);
static void initCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv);
static void destroyMeterQueryInfo(STableQueryInfo *pTableQueryInfo, int32_t numOfCols);
static void resetCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv);
static bool hasMainOutput(SQuery *pQuery);
static void createTableDataInfo(SQInfo* pQInfo);
static int32_t setAdditionalInfo(SQInfo *pQInfo, STable* pTable, STableQueryInfo *pTableQueryInfo);
static void resetCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv);
static bool hasMainOutput(SQuery *pQuery);
static int32_t flushFromResultBuf(SQInfo *pQInfo);
bool getNeighborPoints(SQInfo *pQInfo, void *pMeterObj, SPointInterpoSupporter *pPointInterpSupporter) {
#if 0
......@@ -2662,22 +2664,19 @@ static void updatelastkey(SQuery *pQuery, STableQueryInfo *pTableQInfo) { pTable
* set tag value in SQLFunctionCtx
* e.g.,tag information into input buffer
*/
static void doSetTagValueInParam(void* tsdb, STableId id, int32_t tagColIdx, tVariant *param) {
assert(tagColIdx >= 0);
static void doSetTagValueInParam(void* tsdb, STableId id, int32_t tagColId, tVariant *param) {
tVariantDestroy(param);
char* val = NULL;
int16_t bytes = 0;
int16_t type = 0;
int16_t type = 0;
tsdbGetTableTagVal(tsdb, id, tagColIdx, &type, &bytes, &val);
tsdbGetTableTagVal(tsdb, id, tagColId, &type, &bytes, &val);
tVariantCreateFromBinary(param, val, bytes, type);
}
void vnodeSetTagValueInParam(STableGroupInfo *groupList, SQueryRuntimeEnv *pRuntimeEnv, STableId id, void* tsdb) {
void setTagVal(SQueryRuntimeEnv *pRuntimeEnv, STableId id, void* tsdb) {
SQuery * pQuery = pRuntimeEnv->pQuery;
// SColumnModel *pTagSchema = NULL;//pSidSet->pColumnModel;
SSqlFuncExprMsg *pFuncMsg = &pQuery->pSelectExpr[0].pBase;
if (pQuery->numOfOutputCols == 1 && pFuncMsg->functionId == TSDB_FUNC_TS_COMP) {
......@@ -2686,20 +2685,23 @@ void vnodeSetTagValueInParam(STableGroupInfo *groupList, SQueryRuntimeEnv *pRunt
} else {
// set tag value, by which the results are aggregated.
for (int32_t idx = 0; idx < pQuery->numOfOutputCols; ++idx) {
SColIndex *pColEx = &pQuery->pSelectExpr[idx].pBase.colInfo;
SColIndex *pCol = &pQuery->pSelectExpr[idx].pBase.colInfo;
// ts_comp column required the tag value for join filter
if (!TSDB_COL_IS_TAG(pColEx->flag)) {
if (!TSDB_COL_IS_TAG(pCol->flag)) {
continue;
}
doSetTagValueInParam(tsdb, id, pColEx->colIndex, &pRuntimeEnv->pCtx[idx].tag);
// todo use tag column index to optimize performance
doSetTagValueInParam(tsdb, id, pCol->colId, &pRuntimeEnv->pCtx[idx].tag);
}
// set the join tag for first column
if (pFuncMsg->functionId == TSDB_FUNC_TS && pFuncMsg->colInfo.colIndex == PRIMARYKEY_TIMESTAMP_COL_INDEX &&
pRuntimeEnv->pTSBuf != NULL) {
assert(pFuncMsg->numOfParams == 1);
assert(0); // to do fix me
// doSetTagValueInParam(pTagSchema, pFuncMsg->arg->argValue.i64, pMeterSidInfo, &pRuntimeEnv->pCtx[0].tag);
}
}
......@@ -2902,17 +2904,13 @@ int32_t tableResultComparFn(const void *pLeft, const void *pRight, void *param)
}
int32_t mergeIntoGroupResult(SQInfo *pQInfo) {
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery * pQuery = pRuntimeEnv->pQuery;
// int64_t st = taosGetTimestampMs();
int64_t st = taosGetTimestampMs();
int32_t ret = TSDB_CODE_SUCCESS;
// while (pQInfo->subgroupIdx < pQInfo->pSidSet->numOfSubSet) {
// int32_t start = 0;//pQInfo->pSidSet->starterPos[pQInfo->subgroupIdx];
// int32_t end = pQInfo->groupInfo.numOfTables - 1;//pQInfo->pSidSet->starterPos[pQInfo->subgroupIdx + 1];
SArray* group = taosArrayGetP(pQInfo->groupInfo.pGroupList, 0);
int32_t numOfGroups = taosArrayGetSize(pQInfo->groupInfo.pGroupList);
while (pQInfo->subgroupIdx < numOfGroups) {
SArray* group = taosArrayGetP(pQInfo->groupInfo.pGroupList, pQInfo->subgroupIdx);
ret = mergeIntoGroupResultImpl(pQInfo, group);
if (ret < 0) { // not enough disk space to save the data into disk
return -1;
......@@ -2921,16 +2919,16 @@ int32_t mergeIntoGroupResult(SQInfo *pQInfo) {
pQInfo->subgroupIdx += 1;
// this group generates at least one result, return results
// if (ret > 0) {
// break;
// }
if (ret > 0) {
break;
}
assert(pQInfo->numOfGroupResultPages == 0);
dTrace("QInfo:%p no result in group %d, continue", GET_QINFO_ADDR(pQuery), pQInfo->subgroupIdx - 1);
// }
dTrace("QInfo:%p no result in group %d, continue", pQInfo, pQInfo->subgroupIdx - 1);
}
// dTrace("QInfo:%p merge res data into group, index:%d, total group:%d, elapsed time:%lldms",
// GET_QINFO_ADDR(pQuery), pQInfo->subgroupIdx - 1, pQInfo->pSidSet->numOfSubSet, taosGetTimestampMs() - st);
dTrace("QInfo:%p merge res data into group, index:%d, total group:%d, elapsed time:%lldms",
pQInfo, pQInfo->subgroupIdx - 1, numOfGroups, taosGetTimestampMs() - st);
return TSDB_CODE_SUCCESS;
}
......@@ -3025,11 +3023,11 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray* pGroup) {
int32_t numOfTables = 0;
for (int32_t i = 0; i < size; ++i) {
SPair* p = taosArrayGet(pGroup, i);
STableQueryInfo* pInfo = p->sec;
STableDataInfo* pInfo = p->sec;
SIDList list = getDataBufPagesIdList(pRuntimeEnv->pResultBuf, pInfo->tid);
if (list.size > 0 && pInfo->windowResInfo.size > 0) {
// pTableList[numOfTables] = &pTableDataInfo[i];
SIDList list = getDataBufPagesIdList(pRuntimeEnv->pResultBuf, pInfo->pTableQInfo->tid);
if (list.size > 0 && pInfo->pTableQInfo->windowResInfo.size > 0) {
pTableList[numOfTables] = pInfo;
numOfTables += 1;
}
}
......@@ -3151,8 +3149,7 @@ int32_t flushFromResultBuf(SQInfo *pQInfo) {
// the base value for group result, since the maximum number of table for each vnode will not exceed 100,000.
int32_t pageId = -1;
assert(0);
int32_t remain = 0;//pQuery->sdata[0]->num;
int32_t remain = pQuery->sdata[0]->num;
int32_t offset = 0;
while (remain > 0) {
......@@ -3188,7 +3185,7 @@ void resetMergeResultBuf(SQuery *pQuery, SQLFunctionCtx *pCtx, SResultInfo *pRes
pCtx[k].startOffset = 0;
pCtx[k].resultInfo = &pResultInfo[k];
pQuery->sdata[k] = 0;
pQuery->sdata[k]->num = 0;
}
}
......@@ -3516,7 +3513,6 @@ void scanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
setQueryStatus(pQuery, QUERY_NOT_COMPLETED);
// store the start query position
// void *pos = tsdbDataBlockTell(pRuntimeEnv->pQueryHandle);
SQInfo* pQInfo = (SQInfo*) GET_QINFO_ADDR(pRuntimeEnv);
int64_t skey = pQuery->lastKey;
......@@ -3755,7 +3751,7 @@ int32_t setAdditionalInfo(SQInfo *pQInfo, STable* pTable, STableQueryInfo *pTabl
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
assert(pTableQueryInfo->lastKey > 0);
vnodeSetTagValueInParam(&pQInfo->groupInfo, pRuntimeEnv, pTable->tableId, pQInfo->tsdb);
setTagVal(pRuntimeEnv, pTable->tableId, pQInfo->tsdb);
// both the master and supplement scan needs to set the correct ts comp start position
if (pRuntimeEnv->pTSBuf != NULL) {
......@@ -4316,7 +4312,7 @@ static UNUSED_FUNC bool doCheckWithPrevQueryRange(SQuery *pQuery, TSKEY nextKey)
return true;
}
static UNUSED_FUNC void enableExecutionForNextTable(SQueryRuntimeEnv *pRuntimeEnv) {
static void enableExecutionForNextTable(SQueryRuntimeEnv *pRuntimeEnv) {
SQuery *pQuery = pRuntimeEnv->pQuery;
for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) {
......@@ -4389,41 +4385,46 @@ static int64_t queryOnDataBlocks(SQInfo *pQInfo) {
return et - st;
}
static bool multimeterMultioutputHelper(SQInfo *pQInfo, bool *dataInDisk, bool *dataInCache, int32_t index,
int32_t start) {
// STableIdInfo **pMeterSidExtInfo = pQInfo->pMeterSidExtInfo;
// SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
// SQuery * pQuery = pRuntimeEnv->pQuery;
#if 0
static bool multiTableMultioutputHelper(SQInfo *pQInfo, int32_t index) {
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery *pQuery = pRuntimeEnv->pQuery;
setQueryStatus(pQuery, QUERY_NOT_COMPLETED);
SArray* group = taosArrayGetP(pQInfo->groupInfo.pGroupList, 0);
SPair* p = taosArrayGet(group, index);
SMeterObj *pMeterObj = getMeterObj(pSupporter->pMetersHashTable, pMeterSidExtInfo[index]->sid);
if (pMeterObj == NULL) {
dError("QInfo:%p do not find required meter id: %d, all meterObjs id is:", pQInfo, pMeterSidExtInfo[index]->sid);
return false;
}
STable* pTable = p->first;
STableDataInfo* pInfo = p->sec;
vnodeSetTagValueInParam(pSupporter->pSidSet, pRuntimeEnv, pMeterSidExtInfo[index]);
setTagVal(pRuntimeEnv, pTable->tableId, pQInfo->tsdb);
dTrace("QInfo:%p query on (%d): vid:%d sid:%d meterId:%s, qrange:%" PRId64 "-%" PRId64, pQInfo, index - start,
pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->skey, pQuery->ekey);
dTrace("QInfo:%p query on (%d): uid:%" PRIu64 ", tid:%d, qrange:%" PRId64 "-%" PRId64, pQInfo, index,
pTable->tableId.uid, pInfo->pTableQInfo->lastKey, pInfo->pTableQInfo->win.ekey);
pQInfo->pObj = pMeterObj;
pQuery->lastKey = pQuery->skey;
pRuntimeEnv->pMeterObj = pMeterObj;
vnodeUpdateQueryColumnIndex(pQuery, pRuntimeEnv->pMeterObj);
vnodeUpdateFilterColumnIndex(pQuery);
vnodeCheckIfDataExists(pRuntimeEnv, pMeterObj, dataInDisk, dataInCache);
STsdbQueryCond cond = {
.twindow = {pInfo->pTableQInfo->lastKey, pInfo->pTableQInfo->win.ekey},
.order = pQuery->order.order,
.colList = pQuery->colList,
};
// data in file or cache is not qualified for the query. abort
if (!(dataInCache || dataInDisk)) {
dTrace("QInfo:%p vid:%d sid:%d meterId:%s, qrange:%" PRId64 "-%" PRId64 ", nores, %p", pQInfo, pMeterObj->vnode,
pMeterObj->sid, pMeterObj->meterId, pQuery->skey, pQuery->ekey, pQuery);
return false;
SArray *cols = taosArrayInit(pQuery->numOfCols, sizeof(pQuery->colList[0]));
for (int32_t i = 0; i < pQuery->numOfCols; ++i) {
taosArrayPush(cols, &pQuery->colList[i]);
}
SArray* g1 = taosArrayInit(1, POINTER_BYTES);
STableGroupInfo gp = {.numOfTables = 1, .pGroupList = g1};
SArray* tx = taosArrayInit(1, sizeof(SPair));
taosArrayPush(tx, p);
taosArrayPush(g1, &tx);
// include only current table
pRuntimeEnv->pQueryHandle = tsdbQueryTables(pQInfo->tsdb, &cond, &gp, cols);
// vnodeUpdateQueryColumnIndex(pQuery, pRuntimeEnv->pMeterObj);
// vnodeUpdateFilterColumnIndex(pQuery);
if (pRuntimeEnv->pTSBuf != NULL) {
if (pRuntimeEnv->cur.vnodeIndex == -1) {
int64_t tag = pRuntimeEnv->pCtx[0].tag.i64Key;
......@@ -4438,9 +4439,7 @@ static bool multimeterMultioutputHelper(SQInfo *pQInfo, bool *dataInDisk, bool *
}
}
#endif
// initCtxOutputBuf(pRuntimeEnv);
initCtxOutputBuf(pRuntimeEnv);
return true;
}
......@@ -4448,9 +4447,7 @@ static UNUSED_FUNC int64_t doCheckMetersInGroup(SQInfo *pQInfo, int32_t index, i
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery * pQuery = pRuntimeEnv->pQuery;
bool dataInDisk = true;
bool dataInCache = true;
if (!multimeterMultioutputHelper(pQInfo, &dataInDisk, &dataInCache, index, start)) {
if (!multiTableMultioutputHelper(pQInfo, index)) {
return 0;
}
......@@ -4494,61 +4491,47 @@ static UNUSED_FUNC int64_t doCheckMetersInGroup(SQInfo *pQInfo, int32_t index, i
*
* @param pQInfo
*/
static void vnodeSTableSeqProcessor(SQInfo *pQInfo) {
static void sequentialTableProcess(SQInfo *pQInfo) {
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery* pQuery = pRuntimeEnv->pQuery;
setQueryStatus(pQuery, QUERY_COMPLETED);
#if 0
// STableGroupInfo *pTableIdList = pSupporter->pSidSet;
int32_t vid = getMeterObj(pSupporter->pMetersHashTable, pMeterSidExtInfo[0]->sid)->vnode;
size_t numOfGroups = taosArrayGetSize(pQInfo->groupInfo.pGroupList);
if (isPointInterpoQuery(pQuery)) {
resetCtxOutputBuf(pRuntimeEnv);
assert(pQuery->limit.offset == 0 && pQuery->limit.limit != 0);
while (pSupporter->subgroupIdx < pTableIdList->numOfSubSet) {
int32_t start = pTableIdList->starterPos[pSupporter->subgroupIdx];
int32_t end = pTableIdList->starterPos[pSupporter->subgroupIdx + 1] - 1;
#if 0
while (pQInfo->subgroupIdx < numOfGroups) {
SArray* group = taosArrayGetP(pQInfo->groupInfo.pGroupList, pQInfo->subgroupIdx);
size_t numOfTable = taosArrayGetSize(group);
if (isFirstLastRowQuery(pQuery)) {
dTrace("QInfo:%p last_row query on vid:%d, numOfGroups:%d, current group:%d", pQInfo, vid, pTableIdList->numOfSubSet,
pSupporter->subgroupIdx);
pQInfo->subgroupIdx);
TSKEY key = -1;
int32_t index = -1;
// choose the last key for one group
pSupporter->meterIdx = start;
pQInfo->tableIndex = 0;
for (int32_t k = start; k <= end; ++k, pSupporter->meterIdx++) {
for (int32_t k = 0; k < numOfTable; ++k, pQInfo->tableIndex++) {
if (isQueryKilled(pQInfo)) {
setQueryStatus(pQuery, QUERY_NO_DATA_TO_CHECK);
return;
}
// get the last key of meters that belongs to this group
SMeterObj *pMeterObj = getMeterObj(pSupporter->pMetersHashTable, pMeterSidExtInfo[k]->sid);
if (pMeterObj != NULL) {
if (key < pMeterObj->lastKey) {
key = pMeterObj->lastKey;
index = k;
}
}
}
pQuery->skey = key;
pQuery->ekey = key;
pSupporter->rawSKey = key;
pSupporter->rawEKey = key;
pQuery->window.skey = key;
pQuery->window.ekey = key;
int64_t num = doCheckMetersInGroup(pQInfo, index, start);
assert(num >= 0);
// int64_t num = doCheckMetersInGroup(pQInfo, index, start);
// assert(num >= 0);
} else {
dTrace("QInfo:%p interp query on vid:%d, numOfGroups:%d, current group:%d", pQInfo, vid, pTableIdList->numOfSubSet,
pSupporter->subgroupIdx);
pQInfo->subgroupIdx);
for (int32_t k = start; k <= end; ++k) {
if (isQueryKilled(pQInfo)) {
......@@ -4573,129 +4556,110 @@ static void vnodeSTableSeqProcessor(SQInfo *pQInfo) {
break;
}
}
#endif
} else {
/*
* 1. super table projection query, 2. group-by on normal columns query, 3. ts-comp query
*/
assert(pSupporter->meterIdx >= 0);
createTableDataInfo(pQInfo);
/*
* 1. super table projection query, 2. group-by on normal columns query, 3. ts-comp query
* if the subgroup index is larger than 0, results generated by group by tbname,k is existed.
* we need to return it to client in the first place.
*/
if (pSupporter->subgroupIdx > 0) {
if (pQInfo->subgroupIdx > 0) {
copyFromWindowResToSData(pQInfo, pRuntimeEnv->windowResInfo.pResult);
pQInfo->size += pQuery->size;
pQuery->rec.total += pQuery->rec.rows;
if (pQuery->size > 0) {
if (pQuery->rec.rows > 0) {
return;
}
}
if (pSupporter->meterIdx >= pTableIdList->numOfTables) {
// all data have returned already
if (pQInfo->tableIndex >= pQInfo->groupInfo.numOfTables) {
return;
}
resetCtxOutputBuf(pRuntimeEnv);
resetTimeWindowInfo(pRuntimeEnv, &pRuntimeEnv->windowResInfo);
while (pSupporter->meterIdx < pSupporter->numOfTables) {
int32_t k = pSupporter->meterIdx;
SArray* group = taosArrayGetP(pQInfo->groupInfo.pGroupList, 0);
assert(taosArrayGetSize(group) == pQInfo->groupInfo.numOfTables && 1 == taosArrayGetSize(pQInfo->groupInfo.pGroupList));
while (pQInfo->tableIndex < pQInfo->groupInfo.numOfTables) {
int32_t k = pQInfo->tableIndex;
if (isQueryKilled(pQInfo)) {
setQueryStatus(pQuery, QUERY_NO_DATA_TO_CHECK);
return;
}
TSKEY skey = pQInfo->pTableQuerySupporter->pMeterSidExtInfo[k]->key;
SPair *p = taosArrayGet(group, k);
STableDataInfo* pInfo = p->sec;
TSKEY skey = pInfo->pTableQInfo->lastKey;
if (skey > 0) {
pQuery->skey = skey;
pQuery->window.skey = skey;
}
bool dataInDisk = true;
bool dataInCache = true;
if (!multimeterMultioutputHelper(pQInfo, &dataInDisk, &dataInCache, k, 0)) {
pQuery->skey = pSupporter->rawSKey;
pQuery->ekey = pSupporter->rawEKey;
pSupporter->meterIdx++;
if (!multiTableMultioutputHelper(pQInfo, k)) {
pQInfo->tableIndex++;
continue;
}
#if DEFAULT_IO_ENGINE == IO_ENGINE_MMAP
for (int32_t i = 0; i < pRuntimeEnv->numOfFiles; ++i) {
resetMMapWindow(&pRuntimeEnv->pVnodeFiles[i]);
}
#endif
SPointInterpoSupporter pointInterpSupporter = {0};
assert(0);
// if (normalizedFirstQueryRange(dataInDisk, dataInCache, pSupporter, &pointInterpSupporter, NULL) == false) {
// pQuery->skey = pSupporter->rawSKey;
// pQuery->ekey = pSupporter->rawEKey;
//
// pSupporter->meterIdx++;
// continue;
// }
// SPointInterpoSupporter pointInterpSupporter = {0};
// TODO handle the limit problem
if (pQuery->numOfFilterCols == 0 && pQuery->limit.offset > 0) {
forwardQueryStartPosition(pRuntimeEnv);
// forwardQueryStartPosition(pRuntimeEnv);
if (Q_STATUS_EQUAL(pQuery->over, QUERY_NO_DATA_TO_CHECK | QUERY_COMPLETED)) {
pQuery->skey = pSupporter->rawSKey;
pQuery->ekey = pSupporter->rawEKey;
pSupporter->meterIdx++;
if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) {
pQInfo->tableIndex++;
continue;
}
}
scanAllDataBlocks(pRuntimeEnv);
pQuery->size = getNumOfResult(pRuntimeEnv);
pQuery->rec.rows = getNumOfResult(pRuntimeEnv);
doSkipResults(pRuntimeEnv);
// the limitation of output result is reached, set the query completed
if (doRevisedResultsByLimit(pQInfo)) {
pSupporter->meterIdx = pSupporter->pSidSet->numOfTables;
pQInfo->tableIndex = pQInfo->groupInfo.numOfTables;
break;
}
// enable execution for next table, when handling the projection query
enableExecutionForNextTable(pRuntimeEnv);
if (Q_STATUS_EQUAL(pQuery->over, QUERY_NO_DATA_TO_CHECK | QUERY_COMPLETED)) {
if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) {
/*
* query range is identical in terms of all meters involved in query,
* so we need to restore them at the *beginning* of query on each meter,
* not the consecutive query on meter on which is aborted due to buffer limitation
* to ensure that, we can reset the query range once query on a meter is completed.
*/
pQuery->skey = pSupporter->rawSKey;
pQuery->ekey = pSupporter->rawEKey;
pSupporter->meterIdx++;
pQInfo->pTableQuerySupporter->pMeterSidExtInfo[k]->key = pQuery->lastKey;
pQInfo->tableIndex++;
pInfo->pTableQInfo->lastKey = pQuery->lastKey;
// if the buffer is full or group by each table, we need to jump out of the loop
if (Q_STATUS_EQUAL(pQuery->over, QUERY_RESBUF_FULL) ||
isGroupbyEachTable(pQuery->pGroupbyExpr, pSupporter->pSidSet)) {
if (Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL) /*||
isGroupbyEachTable(pQuery->pGroupbyExpr, pSupporter->pSidSet)*/) {
break;
}
} else { // forward query range
pQuery->skey = pQuery->lastKey;
pQuery->window.skey = pQuery->lastKey;
// all data in the result buffer are skipped due to the offset, continue to retrieve data from current meter
if (pQuery->size == 0) {
assert(!Q_STATUS_EQUAL(pQuery->over, QUERY_RESBUF_FULL));
if (pQuery->rec.rows == 0) {
assert(!Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL));
continue;
} else {
pQInfo->pTableQuerySupporter->pMeterSidExtInfo[k]->key = pQuery->lastKey;
// buffer is full, wait for the next round to retrieve data from current meter
assert(Q_STATUS_EQUAL(pQuery->over, QUERY_RESBUF_FULL));
break;
// pQInfo->pTableQuerySupporter->pMeterSidExtInfo[k]->key = pQuery->lastKey;
// // buffer is full, wait for the next round to retrieve data from current meter
// assert(Q_STATUS_EQUAL(pQuery->over, QUERY_RESBUF_FULL));
// break;
}
}
}
......@@ -4734,20 +4698,16 @@ static void vnodeSTableSeqProcessor(SQInfo *pQInfo) {
}
}
pQInfo->pTableQuerySupporter->subgroupIdx = 0;
pQuery->size = 0;
pQInfo->subgroupIdx = 0;
pQuery->rec.rows = 0;
copyFromWindowResToSData(pQInfo, pWindowResInfo->pResult);
}
pQInfo->size += pQuery->size;
pQuery->pointsOffset = pQuery->pointsToRead;
pQuery->rec.total += pQuery->rec.rows;
dTrace(
"QInfo %p vid:%d, numOfTables:%d, index:%d, numOfGroups:%d, %d points returned, totalRead:%d totalReturn:%d,"
"next skey:%" PRId64 ", offset:%" PRId64,
pQInfo, vid, pTableIdList->numOfTables, pSupporter->meterIdx, pTableIdList->numOfSubSet, pQuery->size, pQInfo->size,
pQInfo->pointsReturned, pQuery->skey, pQuery->limit.offset);
#endif
dTrace( "QInfo %p, numOfTables:%d, index:%d, numOfGroups:%d, %d points returned, total:%d totalReturn:%d,"
" offset:%" PRId64, pQInfo, pQInfo->groupInfo.numOfTables, pQInfo->tableIndex, numOfGroups,
pQuery->rec.rows, pQuery->rec.total, pQuery->limit.offset);
}
static void createTableDataInfo(SQInfo* pQInfo) {
......@@ -4755,6 +4715,7 @@ static void createTableDataInfo(SQInfo* pQInfo) {
// todo make sure the table are added the reference count to gauranteed that all involved tables are valid
size_t numOfGroups = taosArrayGetSize(pQInfo->groupInfo.pGroupList);
int32_t index = 0;
for (int32_t i = 0; i < numOfGroups; ++i) { // load all meter meta info
SArray *group = *(SArray**) taosArrayGet(pQInfo->groupInfo.pGroupList, i);
......@@ -4762,12 +4723,19 @@ static void createTableDataInfo(SQInfo* pQInfo) {
size_t s = taosArrayGetSize(group);
for(int32_t j = 0; j < s; ++j) {
SPair* p = (SPair*) taosArrayGet(group, j);
// STableDataInfo has been created for each table
if (p->sec != NULL) { // todo refactor
return;
}
STableDataInfo* pInfo = calloc(1, sizeof(STableDataInfo));
setTableDataInfo(pInfo, index, i);
pInfo->pTableQInfo = createTableQueryInfo(&pQInfo->runtimeEnv, ((STable*)(p->first))->tableId.tid, pQuery->window);
p->sec = pInfo;
index += 1;
}
}
......@@ -5182,7 +5150,7 @@ static void stableQueryImpl(SQInfo* pQInfo) {
assert((pQuery->checkBuffer == 1 && pQuery->intervalTime == 0) || isPointInterpoQuery(pQuery) ||
isGroupbyNormalCol(pQuery->pGroupbyExpr));
vnodeSTableSeqProcessor(pQInfo);
sequentialTableProcess(pQInfo);
}
// record the total elapsed time
......
......@@ -225,18 +225,27 @@ STSchema * tsdbGetTableTagSchema(STsdbMeta *pMeta, STable *pTable) {
}
}
int32_t tsdbGetTableTagVal(tsdb_repo_t* repo, STableId id, int32_t col, int16_t* type, int16_t* bytes, char** val) {
int32_t tsdbGetTableTagVal(tsdb_repo_t* repo, STableId id, int32_t colId, int16_t* type, int16_t* bytes, char** val) {
STsdbMeta* pMeta = tsdbGetMeta(repo);
STable* pTable = tsdbGetTableByUid(pMeta, id.uid);
STSchema* pSchema = tsdbGetTableTagSchema(pMeta, pTable);
STColumn* pCol = schemaColAt(pSchema, col);
STColumn* pCol = NULL;
for(int32_t col = 0; col < schemaNCols(pSchema); ++col) {
STColumn* p = schemaColAt(pSchema, col);
if (p->colId == colId) {
pCol = p;
}
}
assert(pCol != NULL);
SDataRow row = (SDataRow)pTable->tagVal;
char* d = dataRowAt(row, TD_DATA_ROW_HEAD_SIZE);
*val = d;
*type = pCol->type;
*type = pCol->type;
*bytes = pCol->bytes;
return 0;
......
......@@ -98,31 +98,23 @@ typedef struct SBlockOrderSupporter {
typedef struct STsdbQueryHandle {
STsdbRepo* pTsdb;
SQueryFilePos cur; // current position
SQueryFilePos start; // the start position, used for secondary/third iteration
SDataBlockLoadInfo dataBlockLoadInfo; /* record current block load information */
SLoadCompBlockInfo compBlockLoadInfo; /* record current compblock information in SQuery */
int16_t numOfRowsPerPage;
uint16_t flag; // denotes reversed scan of data or not
int16_t order;
STimeWindow window; // the primary query time window that applies to all queries
int32_t blockBufferSize;
SCompBlock* pBlock;
int32_t numOfBlocks;
SField** pFields;
SArray* pColumns; // column list, SColumnInfoData array list
bool locateStart;
int32_t realNumOfRows;
bool loadDataAfterSeek; // load data after seek.
SArray* pTableCheckInfo;
int32_t activeIndex;
bool checkFiles; // check file stage
int32_t tableIndex;
bool isFirstSlot;
void* qinfo; // query info handle, for debug purpose
bool checkFiles; // check file stage
void* qinfo; // query info handle, for debug purpose
STableBlockInfo* pDataBlockInfo;
SFileGroup* pFileGroup;
......@@ -154,8 +146,6 @@ tsdb_query_handle_t* tsdbQueryTables(tsdb_repo_t* tsdb, STsdbQueryCond* pCond, S
pQueryHandle->compIndex = calloc(10000, sizeof(SCompIdx));
tsdbInitReadHelper(&pQueryHandle->rhelper, (STsdbRepo*) tsdb);
pQueryHandle->loadDataAfterSeek = false;
pQueryHandle->isFirstSlot = true;
pQueryHandle->cur.fid = -1;
size_t sizeOfGroup = taosArrayGetSize(groupList->pGroupList);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册