提交 cc50dcec 编写于 作者: D dapan1121

update

上级 46055438
......@@ -62,12 +62,6 @@ typedef struct SLoadCompBlockInfo {
int32_t fileId;
} SLoadCompBlockInfo;
typedef struct SCacheLastColInfo {
int16_t size;
int16_t num;
int16_t fetchIdx;
int16_t *idx;
} SCacheLastColInfo;
typedef struct STableCheckInfo {
STableId tableId;
......@@ -75,7 +69,6 @@ typedef struct STableCheckInfo {
STable* pTableObj;
SBlockInfo* pCompInfo;
int32_t compSize;
SCacheLastColInfo cacheLast; // cache last column chosen
int32_t numOfBlocks:29; // number of qualified data blocks not the original blocks
int8_t chosen:2; // indicate which iterator should move forward
bool initBuf; // whether to initialize the in-memory skip list iterator or not
......@@ -101,6 +94,12 @@ typedef struct SIOCostSummary {
int64_t checkForNextTime;
} SIOCostSummary;
typedef struct SCacheLastColInfo {
int16_t i;
int16_t j;
} SCacheLastColInfo;
typedef struct STsdbQueryHandle {
STsdbRepo* pTsdb;
SQueryFilePos cur; // current position
......@@ -126,6 +125,7 @@ typedef struct STsdbQueryHandle {
SReadH rhelper;
STableBlockInfo* pDataBlockInfo;
SCacheLastColInfo lastCols;
SDataCols *pDataCols; // in order to hold current file data block
int32_t allocSize; // allocated data block size
SMemRef *pMemRef;
......@@ -546,98 +546,16 @@ TsdbQueryHandleT tsdbQueryLastRow(STsdbRepo *tsdb, STsdbQueryCond *pCond, STable
}
STimeWindow updateCacheLastForEachGroup(STableGroupInfo *groupList) {
STimeWindow window = {INT64_MAX, INT64_MIN};
int32_t totalNumOfTable = 0;
// NOTE: starts from the buffer in case of descending timestamp order check data blocks
size_t numOfGroups = taosArrayGetSize(groupList->pGroupList);
for(int32_t j = 0; j < numOfGroups; ++j) {
SArray* pGroup = taosArrayGetP(groupList->pGroupList, j);
TSKEY key = TSKEY_INITIAL_VAL;
STableKeyInfo keyInfo = {0};
size_t numOfTables = taosArrayGetSize(pGroup);
for(int32_t i = 0; i < numOfTables; ++i) {
STableKeyInfo* pInfo = (STableKeyInfo*) taosArrayGet(pGroup, i);
// if the lastKey equals to INT64_MIN, there is no data in this table
TSKEY lastKey = ((STable*)(pInfo->pTable))->lastKey;
if (key < lastKey) {
key = lastKey;
keyInfo.pTable = pInfo->pTable;
keyInfo.lastKey = key;
pInfo->lastKey = key;
if (key < window.skey) {
window.skey = key;
}
if (key > window.ekey) {
window.ekey = key;
}
}
}
// clear current group, unref unused table
for (int32_t i = 0; i < numOfTables; ++i) {
STableKeyInfo* pInfo = (STableKeyInfo*)taosArrayGet(pGroup, i);
// keyInfo.pTable may be NULL here.
if (pInfo->pTable != keyInfo.pTable) {
tsdbUnRefTable(pInfo->pTable);
}
}
taosArrayClear(pGroup);
// more than one table in each group, only one table left for each group
if (keyInfo.pTable != NULL) {
totalNumOfTable++;
taosArrayPush(pGroup, &keyInfo);
} else {
taosArrayDestroy(pGroup);
taosArrayRemove(groupList->pGroupList, j);
numOfGroups -= 1;
j -= 1;
}
}
// window does not being updated, so set the original
if (window.skey == INT64_MAX && window.ekey == INT64_MIN) {
window = TSWINDOW_INITIALIZER;
assert(totalNumOfTable == 0 && taosArrayGetSize(groupList->pGroupList) == 0);
}
groupList->numOfTables = totalNumOfTable;
return window;
}
TsdbQueryHandleT tsdbQueryCacheLast(STsdbRepo *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList, uint64_t qId, SMemRef* pMemRef) {
pCond->twindow = updateCacheLastForEachGroup(groupList);
// no qualified table
if (groupList->numOfTables == 0) {
return NULL;
}
STsdbQueryHandle *pQueryHandle = (STsdbQueryHandle*) tsdbQueryTables(tsdb, pCond, groupList, qId, pMemRef);
int32_t code = checkForCachedLastRow(pQueryHandle, groupList);
int32_t code = checkForCachedLast(pQueryHandle, groupList);
if (code != TSDB_CODE_SUCCESS) { // set the numOfTables to be 0
terrno = code;
return NULL;
}
assert(pCond->order == TSDB_ORDER_ASC && pCond->twindow.skey <= pCond->twindow.ekey);
if (pQueryHandle->cachelastrow == 2) {
pQueryHandle->type = TSDB_QUERY_TYPE_LAST;
}
pQueryHandle->type = TSDB_QUERY_TYPE_LAST;
return pQueryHandle;
}
......@@ -2572,6 +2490,115 @@ static bool loadCachedLastRow(STsdbQueryHandle* pQueryHandle) {
}
static int32_t copyColsFromCacheMem(STsdbQueryHandle* pQueryHandle, int32_t capacity, int32_t numOfRows, STable* pTable) {
char* pData = NULL;
STSchema* pSchema = tsdbGetTableSchema(pTable);
int32_t numOfCols = schemaNCols(pSchema);
int32_t tgNumOfCols = QH_GET_NUM_OF_COLS(pQueryHandle);
assert(numOfCols == pTable->restoreColumnNum);
assert(pTable->lastCols != NULL);
int32_t i = pQueryHandle->lastCols.i, j = pQueryHandle->lastCols.j;
while(i < tgNumOfCols && j < numOfCols) {
SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i);
if (pTable->lastCols[j].colId < pColInfo->info.colId) {
j++;
continue;
} else if (pTable->lastCols[j].colId > pColInfo->info.colId) {
i++;
continue;
}
if (ASCENDING_TRAVERSE(pQueryHandle->order)) {
pData = (char*)pColInfo->pData + numOfRows * pColInfo->info.bytes;
} else {
pData = (char*)pColInfo->pData + (capacity - numOfRows - 1) * pColInfo->info.bytes;
}
if (pTable->lastCols[j].bytes > 0) {
void* value = pTable->lastCols[j].pData;
switch (pColInfo->info.type) {
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR:
memcpy(pData, value, varDataTLen(value));
break;
case TSDB_DATA_TYPE_NULL:
case TSDB_DATA_TYPE_BOOL:
case TSDB_DATA_TYPE_TINYINT:
case TSDB_DATA_TYPE_UTINYINT:
*(uint8_t *)pData = *(uint8_t *)value;
break;
case TSDB_DATA_TYPE_SMALLINT:
case TSDB_DATA_TYPE_USMALLINT:
*(uint16_t *)pData = *(uint16_t *)value;
break;
case TSDB_DATA_TYPE_INT:
case TSDB_DATA_TYPE_UINT:
*(uint32_t *)pData = *(uint32_t *)value;
break;
case TSDB_DATA_TYPE_BIGINT:
case TSDB_DATA_TYPE_UBIGINT:
*(uint64_t *)pData = *(uint64_t *)value;
break;
case TSDB_DATA_TYPE_FLOAT:
SET_FLOAT_PTR(pData, value);
break;
case TSDB_DATA_TYPE_DOUBLE:
SET_DOUBLE_PTR(pData, value);
break;
case TSDB_DATA_TYPE_TIMESTAMP:
if (pColInfo->info.colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) {
*(TSKEY *)pData = tdGetKey(*(TKEY *)value);
} else {
*(TSKEY *)pData = *(TSKEY *)value;
}
break;
default:
memcpy(pData, value, pColInfo->info.bytes);
}
for (int32_t n = 0; n < tgNumOfCols; ++n) {
if (n == i) {
continue;
}
SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i);
if (pColInfo->info.type == TSDB_DATA_TYPE_BINARY || pColInfo->info.type == TSDB_DATA_TYPE_NCHAR) {
setVardataNull(pData, pColInfo->info.type);
} else {
setNull(pData, pColInfo->info.type, pColInfo->info.bytes);
}
}
++i;
++j;
if (i >= tgNumOfCols || j >= numOfCols) {
pQueryHandle->lastCols.i = 0;
pQueryHandle->lastCols.j = 0;
pQueryHandle->activeIndex++;
} else {
pQueryHandle->lastCols.i = i;
pQueryHandle->lastCols.j = j;
}
return 1;
}
i++;
j++;
}
pQueryHandle->lastCols.i = 0;
pQueryHandle->lastCols.j = 0;
pQueryHandle->activeIndex++;
return 0;
}
static bool loadCachedLast(STsdbQueryHandle* pQueryHandle) {
// the last row is cached in buffer, return it directly.
// here note that the pQueryHandle->window must be the TS_INITIALIZER
......@@ -2581,42 +2608,15 @@ static bool loadCachedLast(STsdbQueryHandle* pQueryHandle) {
SQueryFilePos* cur = &pQueryHandle->cur;
SDataRow pRow = NULL;
TSKEY key = TSKEY_INITIAL_VAL;
int32_t step = ASCENDING_TRAVERSE(pQueryHandle->order)? 1:-1;
if (pQueryHandle->activeIndex < 0) {
updateCacheLastForEachGroup(pQueryHandle);
}
if (pQueryHandle->activeIndex < numOfTables) {
while (pQueryHandle->activeIndex < numOfTables) {
STableCheckInfo* pCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, pQueryHandle->activeIndex);
if (pQueryHandle->cachelastrow == 1) {
int32_t ret = tsdbGetCachedLastRow(pCheckInfo->pTableObj, &pRow, &key);
if (ret != TSDB_CODE_SUCCESS) {
return false;
}
copyOneRowFromMem(pQueryHandle, pQueryHandle->outputCapacity, 0, pRow, numOfCols, pCheckInfo->pTableObj, NULL);
tfree(pRow);
// update the last key value
pCheckInfo->lastKey = key + step;
cur->rows = 1; // only one row
cur->lastKey = key + step;
cur->mixBlock = true;
cur->win.skey = key;
cur->win.ekey = key;
} else if (pQueryHandle->cachelastrow == 2) {
} else {
tsdbError("invalid cachelastrow:%d", pQueryHandle->cachelastrow);
return false;
if (copyColsFromCacheMem(pQueryHandle, pQueryHandle->outputCapacity, 0, numOfCols, pCheckInfo->pTableObj, NULL)) {
return true;
}
return true;
}
return false;
......@@ -2662,7 +2662,7 @@ bool tsdbNextDataBlock(TsdbQueryHandleT pHandle) {
if (pQueryHandle->type == TSDB_QUERY_TYPE_LAST) {
if (pQueryHandle->cachelastrow == 1) {
return loadCachedLastRow(pQueryHandle);
} if (pQueryHandle->cachelastrow == 2) else {
} else if (pQueryHandle->cachelastrow == 2) {
return loadCachedLast(pQueryHandle);
}
}
......@@ -2875,17 +2875,14 @@ int32_t checkForCachedLastRow(STsdbQueryHandle* pQueryHandle, STableGroupInfo *g
int32_t code = 0;
if (((STable*)pInfo->pTable)->lastRow == 1) {
if (((STable*)pInfo->pTable)->lastRow) {
code = tsdbGetCachedLastRow(pInfo->pTable, &pRow, &key);
if (code != TSDB_CODE_SUCCESS) {
pQueryHandle->cachelastrow = 0;
} else {
pQueryHandle->cachelastrow = ((STable*)pInfo->pTable)->lastRow;
pQueryHandle->cachelastrow = 1;
}
} else if (((STable*)pInfo->pTable)->lastCols && ((STable*)pInfo->pTable)->lastColNum > 0 && ((STable*)pInfo->pTable)->lastRow == 2){
pQueryHandle->cachelastrow = ((STable*)pInfo->pTable)->lastRow;
}
// update the tsdb query time range
if (pQueryHandle->cachelastrow) {
......@@ -2898,6 +2895,36 @@ int32_t checkForCachedLastRow(STsdbQueryHandle* pQueryHandle, STableGroupInfo *g
return code;
}
int32_t checkForCachedLast(STsdbQueryHandle* pQueryHandle, STableGroupInfo *groupList) {
assert(pQueryHandle != NULL && groupList != NULL);
SDataRow pRow = NULL;
TSKEY key = TSKEY_INITIAL_VAL;
SArray* group = taosArrayGetP(groupList->pGroupList, 0);
assert(group != NULL);
STableKeyInfo* pInfo = (STableKeyInfo*)taosArrayGet(group, 0);
int32_t code = 0;
if (((STable*)pInfo->pTable)->lastCols && ((STable*)pInfo->pTable)->lastColNum > 0){
pQueryHandle->cachelastrow = 2;
}
// update the tsdb query time range
if (pQueryHandle->cachelastrow) {
pQueryHandle->window = TSWINDOW_INITIALIZER;
pQueryHandle->checkFiles = false;
pQueryHandle->activeIndex = 0; // start from -1
}
tfree(pRow);
return code;
}
STimeWindow updateLastrowForEachGroup(STableGroupInfo *groupList) {
STimeWindow window = {INT64_MAX, INT64_MIN};
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册