提交 e2f11026 编写于 作者: D dapan1121

init

上级 6f1b157b
...@@ -1979,6 +1979,37 @@ static bool isFirstLastRowQuery(SQueryAttr *pQueryAttr) { ...@@ -1979,6 +1979,37 @@ static bool isFirstLastRowQuery(SQueryAttr *pQueryAttr) {
return false; return false;
} }
static bool isCachedLastQuery(SQueryAttr *pQueryAttr) {
for (int32_t i = 0; i < pQueryAttr->numOfOutput; ++i) {
int32_t functionID = pQueryAttr->pExpr1[i].base.functionId;
if (functionID == TSDB_FUNC_LAST || functionID == TSDB_FUNC_LAST_DST) {
continue;
}
return false;
}
if (!TSWINDOW_IS_EQUAL(pQueryAttr->window, TSWINDOW_INITIALIZER)) {
return false;
}
if (pQueryAttr->groupbyColumn) {
return false;
}
if (pQueryAttr->interval.interval > 0) {
return false;
}
if (pQueryAttr->numOfFilterCols > 0 || pQueryAttr->havingNum > 0) {
return false;
}
return true;
}
/** /**
* The following 4 kinds of query are treated as the tags query * The following 4 kinds of query are treated as the tags query
* tagprj, tid_tag query, count(tbname), 'abc' (user defined constant value column) query * tagprj, tid_tag query, count(tbname), 'abc' (user defined constant value column) query
...@@ -3963,6 +3994,8 @@ static int32_t setupQueryHandle(void* tsdb, SQueryRuntimeEnv* pRuntimeEnv, int64 ...@@ -3963,6 +3994,8 @@ static int32_t setupQueryHandle(void* tsdb, SQueryRuntimeEnv* pRuntimeEnv, int64
} }
} }
} }
} else if (isCachedLastQuery(pQueryAttr)) {
pRuntimeEnv->pQueryHandle = tsdbQueryCacheLast(tsdb, &cond, &pQueryAttr->tableGroupInfo, qId, &pQueryAttr->memRef);
} else if (pQueryAttr->pointInterpQuery) { } else if (pQueryAttr->pointInterpQuery) {
pRuntimeEnv->pQueryHandle = tsdbQueryRowsInExternalWindow(tsdb, &cond, &pQueryAttr->tableGroupInfo, qId, &pQueryAttr->memRef); pRuntimeEnv->pQueryHandle = tsdbQueryRowsInExternalWindow(tsdb, &cond, &pQueryAttr->tableGroupInfo, qId, &pQueryAttr->memRef);
} else { } else {
......
...@@ -62,12 +62,20 @@ typedef struct SLoadCompBlockInfo { ...@@ -62,12 +62,20 @@ typedef struct SLoadCompBlockInfo {
int32_t fileId; int32_t fileId;
} SLoadCompBlockInfo; } SLoadCompBlockInfo;
typedef struct SCacheLastColInfo {
int16_t size;
int16_t num;
int16_t fetchIdx;
int16_t *idx;
} SCacheLastColInfo;
typedef struct STableCheckInfo { typedef struct STableCheckInfo {
STableId tableId; STableId tableId;
TSKEY lastKey; TSKEY lastKey;
STable* pTableObj; STable* pTableObj;
SBlockInfo* pCompInfo; SBlockInfo* pCompInfo;
int32_t compSize; int32_t compSize;
SCacheLastColInfo cacheLast; // cache last column chosen
int32_t numOfBlocks:29; // number of qualified data blocks not the original blocks int32_t numOfBlocks:29; // number of qualified data blocks not the original blocks
int8_t chosen:2; // indicate which iterator should move forward int8_t chosen:2; // indicate which iterator should move forward
bool initBuf; // whether to initialize the in-memory skip list iterator or not bool initBuf; // whether to initialize the in-memory skip list iterator or not
...@@ -107,7 +115,7 @@ typedef struct STsdbQueryHandle { ...@@ -107,7 +115,7 @@ typedef struct STsdbQueryHandle {
SArray* pTableCheckInfo; // SArray<STableCheckInfo> SArray* pTableCheckInfo; // SArray<STableCheckInfo>
int32_t activeIndex; int32_t activeIndex;
bool checkFiles; // check file stage bool checkFiles; // check file stage
bool cachelastrow; // check if last row cached int8_t cachelastrow; // check if last row cached
bool loadExternalRow; // load time window external data rows bool loadExternalRow; // load time window external data rows
bool currentLoadExternalRows; // current load external rows bool currentLoadExternalRows; // current load external rows
int32_t loadType; // block load type int32_t loadType; // block load type
...@@ -512,6 +520,8 @@ void tsdbResetQueryHandleForNewTable(TsdbQueryHandleT queryHandle, STsdbQueryCon ...@@ -512,6 +520,8 @@ void tsdbResetQueryHandleForNewTable(TsdbQueryHandleT queryHandle, STsdbQueryCon
pQueryHandle->next = doFreeColumnInfoData(pQueryHandle->next); pQueryHandle->next = doFreeColumnInfoData(pQueryHandle->next);
} }
TsdbQueryHandleT tsdbQueryLastRow(STsdbRepo *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList, uint64_t qId, SMemRef* pMemRef) { TsdbQueryHandleT tsdbQueryLastRow(STsdbRepo *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList, uint64_t qId, SMemRef* pMemRef) {
pCond->twindow = updateLastrowForEachGroup(groupList); pCond->twindow = updateLastrowForEachGroup(groupList);
...@@ -528,10 +538,111 @@ TsdbQueryHandleT tsdbQueryLastRow(STsdbRepo *tsdb, STsdbQueryCond *pCond, STable ...@@ -528,10 +538,111 @@ TsdbQueryHandleT tsdbQueryLastRow(STsdbRepo *tsdb, STsdbQueryCond *pCond, STable
} }
assert(pCond->order == TSDB_ORDER_ASC && pCond->twindow.skey <= pCond->twindow.ekey); assert(pCond->order == TSDB_ORDER_ASC && pCond->twindow.skey <= pCond->twindow.ekey);
pQueryHandle->type = TSDB_QUERY_TYPE_LAST; if (pQueryHandle->cachelastrow) {
pQueryHandle->type = TSDB_QUERY_TYPE_LAST;
}
return pQueryHandle; return pQueryHandle;
} }
STimeWindow updateCacheLastForEachGroup(STableGroupInfo *groupList) {
STimeWindow window = {INT64_MAX, INT64_MIN};
int32_t totalNumOfTable = 0;
// NOTE: starts from the buffer in case of descending timestamp order check data blocks
size_t numOfGroups = taosArrayGetSize(groupList->pGroupList);
for(int32_t j = 0; j < numOfGroups; ++j) {
SArray* pGroup = taosArrayGetP(groupList->pGroupList, j);
TSKEY key = TSKEY_INITIAL_VAL;
STableKeyInfo keyInfo = {0};
size_t numOfTables = taosArrayGetSize(pGroup);
for(int32_t i = 0; i < numOfTables; ++i) {
STableKeyInfo* pInfo = (STableKeyInfo*) taosArrayGet(pGroup, i);
// if the lastKey equals to INT64_MIN, there is no data in this table
TSKEY lastKey = ((STable*)(pInfo->pTable))->lastKey;
if (key < lastKey) {
key = lastKey;
keyInfo.pTable = pInfo->pTable;
keyInfo.lastKey = key;
pInfo->lastKey = key;
if (key < window.skey) {
window.skey = key;
}
if (key > window.ekey) {
window.ekey = key;
}
}
}
// clear current group, unref unused table
for (int32_t i = 0; i < numOfTables; ++i) {
STableKeyInfo* pInfo = (STableKeyInfo*)taosArrayGet(pGroup, i);
// keyInfo.pTable may be NULL here.
if (pInfo->pTable != keyInfo.pTable) {
tsdbUnRefTable(pInfo->pTable);
}
}
taosArrayClear(pGroup);
// more than one table in each group, only one table left for each group
if (keyInfo.pTable != NULL) {
totalNumOfTable++;
taosArrayPush(pGroup, &keyInfo);
} else {
taosArrayDestroy(pGroup);
taosArrayRemove(groupList->pGroupList, j);
numOfGroups -= 1;
j -= 1;
}
}
// window does not being updated, so set the original
if (window.skey == INT64_MAX && window.ekey == INT64_MIN) {
window = TSWINDOW_INITIALIZER;
assert(totalNumOfTable == 0 && taosArrayGetSize(groupList->pGroupList) == 0);
}
groupList->numOfTables = totalNumOfTable;
return window;
}
TsdbQueryHandleT tsdbQueryCacheLast(STsdbRepo *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList, uint64_t qId, SMemRef* pMemRef) {
pCond->twindow = updateCacheLastForEachGroup(groupList);
// no qualified table
if (groupList->numOfTables == 0) {
return NULL;
}
STsdbQueryHandle *pQueryHandle = (STsdbQueryHandle*) tsdbQueryTables(tsdb, pCond, groupList, qId, pMemRef);
int32_t code = checkForCachedLastRow(pQueryHandle, groupList);
if (code != TSDB_CODE_SUCCESS) { // set the numOfTables to be 0
terrno = code;
return NULL;
}
assert(pCond->order == TSDB_ORDER_ASC && pCond->twindow.skey <= pCond->twindow.ekey);
if (pQueryHandle->cachelastrow == 2) {
pQueryHandle->type = TSDB_QUERY_TYPE_LAST;
}
return pQueryHandle;
}
SArray* tsdbGetQueriedTableList(TsdbQueryHandleT *pHandle) { SArray* tsdbGetQueriedTableList(TsdbQueryHandleT *pHandle) {
assert(pHandle != NULL); assert(pHandle != NULL);
...@@ -2460,6 +2571,58 @@ static bool loadCachedLastRow(STsdbQueryHandle* pQueryHandle) { ...@@ -2460,6 +2571,58 @@ static bool loadCachedLastRow(STsdbQueryHandle* pQueryHandle) {
return false; return false;
} }
static bool loadCachedLast(STsdbQueryHandle* pQueryHandle) {
// the last row is cached in buffer, return it directly.
// here note that the pQueryHandle->window must be the TS_INITIALIZER
int32_t numOfCols = (int32_t)(QH_GET_NUM_OF_COLS(pQueryHandle));
size_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo);
assert(numOfTables > 0 && numOfCols > 0);
SQueryFilePos* cur = &pQueryHandle->cur;
SDataRow pRow = NULL;
TSKEY key = TSKEY_INITIAL_VAL;
int32_t step = ASCENDING_TRAVERSE(pQueryHandle->order)? 1:-1;
if (pQueryHandle->activeIndex < 0) {
updateCacheLastForEachGroup(pQueryHandle);
}
if (pQueryHandle->activeIndex < numOfTables) {
STableCheckInfo* pCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, pQueryHandle->activeIndex);
if (pQueryHandle->cachelastrow == 1) {
int32_t ret = tsdbGetCachedLastRow(pCheckInfo->pTableObj, &pRow, &key);
if (ret != TSDB_CODE_SUCCESS) {
return false;
}
copyOneRowFromMem(pQueryHandle, pQueryHandle->outputCapacity, 0, pRow, numOfCols, pCheckInfo->pTableObj, NULL);
tfree(pRow);
// update the last key value
pCheckInfo->lastKey = key + step;
cur->rows = 1; // only one row
cur->lastKey = key + step;
cur->mixBlock = true;
cur->win.skey = key;
cur->win.ekey = key;
} else if (pQueryHandle->cachelastrow == 2) {
} else {
tsdbError("invalid cachelastrow:%d", pQueryHandle->cachelastrow);
return false;
}
return true;
}
return false;
}
static bool loadDataBlockFromTableSeq(STsdbQueryHandle* pQueryHandle) { static bool loadDataBlockFromTableSeq(STsdbQueryHandle* pQueryHandle) {
size_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo); size_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo);
assert(numOfTables > 0); assert(numOfTables > 0);
...@@ -2496,8 +2659,12 @@ bool tsdbNextDataBlock(TsdbQueryHandleT pHandle) { ...@@ -2496,8 +2659,12 @@ bool tsdbNextDataBlock(TsdbQueryHandleT pHandle) {
int64_t stime = taosGetTimestampUs(); int64_t stime = taosGetTimestampUs();
int64_t elapsedTime = stime; int64_t elapsedTime = stime;
if (pQueryHandle->type == TSDB_QUERY_TYPE_LAST && pQueryHandle->cachelastrow) { if (pQueryHandle->type == TSDB_QUERY_TYPE_LAST) {
return loadCachedLastRow(pQueryHandle); if (pQueryHandle->cachelastrow == 1) {
return loadCachedLastRow(pQueryHandle);
} if (pQueryHandle->cachelastrow == 2) else {
return loadCachedLast(pQueryHandle);
}
} }
if (pQueryHandle->loadType == BLOCK_LOAD_TABLE_SEQ_ORDER) { if (pQueryHandle->loadType == BLOCK_LOAD_TABLE_SEQ_ORDER) {
...@@ -2683,7 +2850,7 @@ int32_t tsdbGetCachedLastRow(STable* pTable, SDataRow* pRes, TSKEY* lastKey) { ...@@ -2683,7 +2850,7 @@ int32_t tsdbGetCachedLastRow(STable* pTable, SDataRow* pRes, TSKEY* lastKey) {
TSDB_RLOCK_TABLE(pTable); TSDB_RLOCK_TABLE(pTable);
*lastKey = pTable->lastKey; *lastKey = pTable->lastKey;
if ((*lastKey) != TSKEY_INITIAL_VAL && pTable->lastRow) { if ((*lastKey) != TSKEY_INITIAL_VAL && pTable->lastRow == 1) {
*pRes = tdDataRowDup(pTable->lastRow); *pRes = tdDataRowDup(pTable->lastRow);
if (*pRes == NULL) { if (*pRes == NULL) {
TSDB_RUNLOCK_TABLE(pTable); TSDB_RUNLOCK_TABLE(pTable);
...@@ -2706,12 +2873,19 @@ int32_t checkForCachedLastRow(STsdbQueryHandle* pQueryHandle, STableGroupInfo *g ...@@ -2706,12 +2873,19 @@ int32_t checkForCachedLastRow(STsdbQueryHandle* pQueryHandle, STableGroupInfo *g
STableKeyInfo* pInfo = (STableKeyInfo*)taosArrayGet(group, 0); STableKeyInfo* pInfo = (STableKeyInfo*)taosArrayGet(group, 0);
int32_t code = tsdbGetCachedLastRow(pInfo->pTable, &pRow, &key); int32_t code = 0;
if (code != TSDB_CODE_SUCCESS) {
pQueryHandle->cachelastrow = false; if (((STable*)pInfo->pTable)->lastRow == 1) {
} else { code = tsdbGetCachedLastRow(pInfo->pTable, &pRow, &key);
pQueryHandle->cachelastrow = (pRow != NULL); if (code != TSDB_CODE_SUCCESS) {
pQueryHandle->cachelastrow = 0;
} else {
pQueryHandle->cachelastrow = ((STable*)pInfo->pTable)->lastRow;
}
} else if (((STable*)pInfo->pTable)->lastCols && ((STable*)pInfo->pTable)->lastColNum > 0 && ((STable*)pInfo->pTable)->lastRow == 2){
pQueryHandle->cachelastrow = ((STable*)pInfo->pTable)->lastRow;
} }
// update the tsdb query time range // update the tsdb query time range
if (pQueryHandle->cachelastrow) { if (pQueryHandle->cachelastrow) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册