提交 f48a1f1f 编写于 作者: C Cary Xu

[TS-493-M]<enhance>: postpone cache last to query time

上级 fc8b47ea
......@@ -41,6 +41,7 @@ typedef struct STable {
int16_t restoreColumnNum;
bool hasRestoreLastColumn;
int lastColSVersion;
int16_t cacheLastConfigVersion;
T_REF_DECLARE()
} STable;
......
......@@ -79,8 +79,8 @@ struct STsdbRepo {
STsdbCfg save_config; // save apply config
bool config_changed; // config changed flag
pthread_mutex_t save_mutex; // protect save config
uint8_t hasCachedLastColumn;
int16_t cacheLastConfigVersion;
STsdbAppH appH;
STsdbStat stat;
......@@ -110,7 +110,8 @@ int tsdbUnlockRepo(STsdbRepo* pRepo);
STsdbMeta* tsdbGetMeta(STsdbRepo* pRepo);
int tsdbCheckCommit(STsdbRepo* pRepo);
int tsdbRestoreInfo(STsdbRepo* pRepo);
int tsdbCacheLastData(STsdbRepo *pRepo, STsdbCfg* oldCfg);
UNUSED_FUNC int tsdbCacheLastData(STsdbRepo *pRepo, STsdbCfg* oldCfg);
int32_t tsdbLoadLastCache(STsdbRepo *pRepo, STable* pTable);
void tsdbGetRootDir(int repoid, char dirName[]);
void tsdbGetDataDir(int repoid, char dirName[]);
......
......@@ -146,7 +146,9 @@ static void tsdbApplyRepoConfig(STsdbRepo *pRepo) {
if (oldCfg.cacheLastRow != pRepo->config.cacheLastRow) {
if (tsdbLockRepo(pRepo) < 0) return;
tsdbCacheLastData(pRepo, &oldCfg);
// tsdbCacheLastData(pRepo, &oldCfg);
// lazy load last cache when query or update
pRepo->cacheLastConfigVersion += 1;
tsdbUnlockRepo(pRepo);
}
......
......@@ -562,7 +562,7 @@ static STsdbRepo *tsdbNewRepo(STsdbCfg *pCfg, STsdbAppH *pAppH) {
return NULL;
}
pRepo->config_changed = false;
atomic_store_8(&pRepo->hasCachedLastColumn, 0);
pRepo->cacheLastConfigVersion = 0;
code = tsem_init(&(pRepo->readyToCommit), 0, 1);
if (code != 0) {
......@@ -788,21 +788,30 @@ static int tsdbRestoreLastRow(STsdbRepo *pRepo, STable *pTable, SReadH* pReadh,
// Get the data in row
STSchema *pSchema = tsdbGetTableSchema(pTable);
pTable->lastRow = taosTMalloc(memRowMaxBytesFromSchema(pSchema));
if (pTable->lastRow == NULL) {
SMemRow lastRow = taosTMalloc(memRowMaxBytesFromSchema(pSchema));
if (lastRow == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
memRowSetType(pTable->lastRow, SMEM_ROW_DATA);
tdInitDataRow(memRowDataBody(pTable->lastRow), pSchema);
memRowSetType(lastRow, SMEM_ROW_DATA);
tdInitDataRow(memRowDataBody(lastRow), pSchema);
for (int icol = 0; icol < schemaNCols(pSchema); icol++) {
STColumn *pCol = schemaColAt(pSchema, icol);
SDataCol *pDataCol = pReadh->pDCols[0]->cols + icol;
tdAppendColVal(memRowDataBody(pTable->lastRow), tdGetColDataOfRow(pDataCol, pBlock->numOfRows - 1), pCol->type,
tdAppendColVal(memRowDataBody(lastRow), tdGetColDataOfRow(pDataCol, pBlock->numOfRows - 1), pCol->type,
pCol->offset);
}
TSKEY lastKey = memRowKey(lastRow);
pTable->lastKey = memRowKey(pTable->lastRow);
// during the load data in file, new data would be inserted and last row has been updated
if (tsdbGetTableLastKeyImpl(pTable) <= lastKey) {
TSDB_WLOCK_TABLE(pTable);
pTable->lastRow = lastRow;
pTable->lastKey = lastKey;
TSDB_WUNLOCK_TABLE(pTable);
} else {
taosTZfree(lastRow);
}
return 0;
}
......@@ -874,14 +883,99 @@ int tsdbRestoreInfo(STsdbRepo *pRepo) {
tsdbDestroyReadH(&readh);
if (CACHE_LAST_NULL_COLUMN(pCfg)) {
atomic_store_8(&pRepo->hasCachedLastColumn, 1);
// if (CACHE_LAST_NULL_COLUMN(pCfg)) {
// atomic_store_8(&pRepo->hasCachedLastColumn, 1);
// }
return 0;
}
int32_t tsdbLoadLastCache(STsdbRepo *pRepo, STable* pTable) {
bool cacheLastRow = CACHE_LAST_ROW(&(pRepo->config));
bool cacheLastCol = CACHE_LAST_NULL_COLUMN(&(pRepo->config));
SFSIter fsiter;
SReadH readh;
SDFileSet *pSet;
int cacheLastRowTableNum = 0;
int cacheLastColTableNum = 0;
tsdbDebug("tsdbLoadLastCache for %s, cacheLastRow:%d, cacheLastCol:%d", pTable->name->data, cacheLastRow, cacheLastCol);
pTable->cacheLastConfigVersion = pRepo->cacheLastConfigVersion;
if (!cacheLastRow && pTable->lastRow != NULL) {
taosTZfree(pTable->lastRow);
pTable->lastRow = NULL;
}
if (!cacheLastCol && pTable->lastCols != NULL) {
tsdbFreeLastColumns(pTable);
}
if (!cacheLastRow && !cacheLastCol) {
return 0;
}
cacheLastRowTableNum = (cacheLastRow && pTable->lastRow == NULL) ? 1 : 0;
cacheLastColTableNum = (cacheLastCol && pTable->lastCols == NULL) ? 1 : 0;
if (cacheLastRowTableNum == 0 && cacheLastColTableNum == 0) {
return 0;
}
if (tsdbInitReadH(&readh, pRepo) < 0) {
return -1;
}
tsdbFSIterInit(&fsiter, REPO_FS(pRepo), TSDB_FS_ITER_BACKWARD);
while ((pSet = tsdbFSIterNext(&fsiter)) != NULL && (cacheLastRowTableNum > 0 || cacheLastColTableNum > 0)) {
if (tsdbSetAndOpenReadFSet(&readh, pSet) < 0) {
tsdbDestroyReadH(&readh);
return -1;
}
if (tsdbLoadBlockIdx(&readh) < 0) {
tsdbDestroyReadH(&readh);
return -1;
}
//tsdbInfo("tsdbRestoreInfo restore vgId:%d,table:%s", REPO_ID(pRepo), pTable->name->data);
if (tsdbSetReadTable(&readh, pTable) < 0) {
tsdbDestroyReadH(&readh);
return -1;
}
SBlockIdx *pIdx = readh.pBlkIdx;
if (cacheLastRow && pIdx && pTable->lastRow == NULL && cacheLastRowTableNum > 0) {
pTable->lastKey = pIdx->maxKey;
if (tsdbRestoreLastRow(pRepo, pTable, &readh, pIdx) != 0) {
tsdbDestroyReadH(&readh);
return -1;
}
cacheLastRowTableNum -= 1;
}
// restore NULL columns
if (pIdx && cacheLastColTableNum > 0 && !pTable->hasRestoreLastColumn) {
if (tsdbRestoreLastColumns(pRepo, pTable, &readh) != 0) {
tsdbDestroyReadH(&readh);
return -1;
}
if (pTable->hasRestoreLastColumn) {
cacheLastColTableNum -= 1;
}
}
}
tsdbDestroyReadH(&readh);
return 0;
}
int tsdbCacheLastData(STsdbRepo *pRepo, STsdbCfg* oldCfg) {
UNUSED_FUNC int tsdbCacheLastData(STsdbRepo *pRepo, STsdbCfg* oldCfg) {
bool cacheLastRow = false, cacheLastCol = false;
SFSIter fsiter;
SReadH readh;
......@@ -915,9 +1009,9 @@ int tsdbCacheLastData(STsdbRepo *pRepo, STsdbCfg* oldCfg) {
// if close last option,need to free data
if (need_free_last_row || need_free_last_col) {
if (need_free_last_col) {
atomic_store_8(&pRepo->hasCachedLastColumn, 0);
}
// if (need_free_last_col) {
// atomic_store_8(&pRepo->hasCachedLastColumn, 0);
// }
tsdbInfo("free cache last data since cacheLast option changed");
for (int i = 1; i <= maxTableIdx; i++) {
STable *pTable = pMeta->tables[i];
......@@ -995,9 +1089,9 @@ int tsdbCacheLastData(STsdbRepo *pRepo, STsdbCfg* oldCfg) {
tsdbDestroyReadH(&readh);
if (cacheLastCol) {
atomic_store_8(&pRepo->hasCachedLastColumn, 1);
}
// if (cacheLastCol) {
// atomic_store_8(&pRepo->hasCachedLastColumn, 1);
// }
return 0;
}
......@@ -1058,5 +1058,8 @@ static int tsdbUpdateTableLatestInfo(STsdbRepo *pRepo, STable *pTable, SMemRow r
updateTableLatestColumn(pRepo, pTable, row);
}
}
pTable->cacheLastConfigVersion = pRepo->cacheLastConfigVersion;
return 0;
}
......@@ -797,6 +797,7 @@ static STable *tsdbNewTable() {
pTable->lastCols = NULL;
pTable->restoreColumnNum = 0;
pTable->cacheLastConfigVersion = 0;
pTable->maxColNum = 0;
pTable->hasRestoreLastColumn = false;
pTable->lastColSVersion = -1;
......
......@@ -156,6 +156,7 @@ typedef struct STableGroupSupporter {
static STimeWindow updateLastrowForEachGroup(STableGroupInfo *groupList);
static int32_t checkForCachedLastRow(STsdbQueryHandle* pQueryHandle, STableGroupInfo *groupList);
static int32_t checkForCachedLast(STsdbQueryHandle* pQueryHandle);
static int32_t lazyLoadCacheLast(STsdbQueryHandle* pQueryHandle);
static int32_t tsdbGetCachedLastRow(STable* pTable, SMemRow* pRes, TSKEY* lastKey);
static void changeQueryHandleForInterpQuery(TsdbQueryHandleT pHandle);
......@@ -602,6 +603,8 @@ TsdbQueryHandleT tsdbQueryLastRow(STsdbRepo *tsdb, STsdbQueryCond *pCond, STable
return NULL;
}
lazyLoadCacheLast(pQueryHandle);
int32_t code = checkForCachedLastRow(pQueryHandle, groupList);
if (code != TSDB_CODE_SUCCESS) { // set the numOfTables to be 0
terrno = code;
......@@ -616,6 +619,29 @@ TsdbQueryHandleT tsdbQueryLastRow(STsdbRepo *tsdb, STsdbQueryCond *pCond, STable
return pQueryHandle;
}
static int32_t lazyLoadCacheLast(STsdbQueryHandle* pQueryHandle) {
STsdbRepo* pRepo = pQueryHandle->pTsdb;
size_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo);
size_t i = 0;
int32_t code = 0;
for (i = 0; i < numOfTables; ++i) {
STableCheckInfo* pCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, i);
STable* pTable = pCheckInfo->pTableObj;
if (pTable->cacheLastConfigVersion == pRepo->cacheLastConfigVersion) {
continue;
}
if (tsdbLockRepo(pRepo) < 0) return -1;
code = tsdbLoadLastCache(pRepo, pTable);
if (tsdbUnlockRepo(pRepo) != 0) return -1;
if (code != 0) {
break;
}
}
return code;
}
TsdbQueryHandleT tsdbQueryCacheLast(STsdbRepo *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList, uint64_t qId, SMemRef* pMemRef) {
STsdbQueryHandle *pQueryHandle = (STsdbQueryHandle*) tsdbQueryTables(tsdb, pCond, groupList, qId, pMemRef);
......@@ -623,6 +649,8 @@ TsdbQueryHandleT tsdbQueryCacheLast(STsdbRepo *tsdb, STsdbQueryCond *pCond, STab
return NULL;
}
lazyLoadCacheLast(pQueryHandle);
int32_t code = checkForCachedLast(pQueryHandle);
if (code != TSDB_CODE_SUCCESS) { // set the numOfTables to be 0
terrno = code;
......@@ -3198,7 +3226,9 @@ int32_t checkForCachedLast(STsdbQueryHandle* pQueryHandle) {
int32_t code = 0;
if (pQueryHandle->pTsdb && atomic_load_8(&pQueryHandle->pTsdb->hasCachedLastColumn)){
STsdbRepo* pRepo = pQueryHandle->pTsdb;
if (pRepo && CACHE_LAST_NULL_COLUMN(&(pRepo->config))) {
pQueryHandle->cachelastrow = TSDB_CACHED_TYPE_LAST;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册