提交 1062357e 编写于 作者: L lichuang

[TD-4034]when cacheLast option changed,restore or free cache last data

上级 fbb16057
...@@ -38,8 +38,9 @@ typedef struct STable { ...@@ -38,8 +38,9 @@ typedef struct STable {
SRWLatch latch; // TODO: implementa latch functions SRWLatch latch; // TODO: implementa latch functions
SDataCol *lastCols; SDataCol *lastCols;
int16_t lastColNum; int16_t maxColNum;
int16_t maxColumnNum; int16_t restoreColumnNum;
bool hasRestoreLastColumn;
int lastColSVersion; int lastColSVersion;
T_REF_DECLARE() T_REF_DECLARE()
} STable; } STable;
...@@ -90,6 +91,7 @@ int tsdbInitColIdCacheWithSchema(STable* pTable, STSchema* pSchema); ...@@ -90,6 +91,7 @@ int tsdbInitColIdCacheWithSchema(STable* pTable, STSchema* pSchema);
int16_t tsdbGetLastColumnsIndexByColId(STable* pTable, int16_t colId); int16_t tsdbGetLastColumnsIndexByColId(STable* pTable, int16_t colId);
int tsdbUpdateLastColSchema(STable *pTable, STSchema *pNewSchema); int tsdbUpdateLastColSchema(STable *pTable, STSchema *pNewSchema);
STSchema* tsdbGetTableLatestSchema(STable *pTable); STSchema* tsdbGetTableLatestSchema(STable *pTable);
void tsdbFreeLastColumns(STable* pTable);
static FORCE_INLINE int tsdbCompareSchemaVersion(const void *key1, const void *key2) { static FORCE_INLINE int tsdbCompareSchemaVersion(const void *key1, const void *key2) {
if (*(int16_t *)key1 < schemaVersion(*(STSchema **)key2)) { if (*(int16_t *)key1 < schemaVersion(*(STSchema **)key2)) {
......
...@@ -76,6 +76,9 @@ struct STsdbRepo { ...@@ -76,6 +76,9 @@ struct STsdbRepo {
bool config_changed; // config changed flag bool config_changed; // config changed flag
pthread_mutex_t save_mutex; // protect save config pthread_mutex_t save_mutex; // protect save config
uint8_t hasCachedLastRow;
uint8_t hasCachedLastColumn;
STsdbAppH appH; STsdbAppH appH;
STsdbStat stat; STsdbStat stat;
STsdbMeta* tsdbMeta; STsdbMeta* tsdbMeta;
...@@ -100,6 +103,7 @@ int tsdbUnlockRepo(STsdbRepo* pRepo); ...@@ -100,6 +103,7 @@ int tsdbUnlockRepo(STsdbRepo* pRepo);
STsdbMeta* tsdbGetMeta(STsdbRepo* pRepo); STsdbMeta* tsdbGetMeta(STsdbRepo* pRepo);
int tsdbCheckCommit(STsdbRepo* pRepo); int tsdbCheckCommit(STsdbRepo* pRepo);
int tsdbRestoreInfo(STsdbRepo* pRepo); int tsdbRestoreInfo(STsdbRepo* pRepo);
int tsdbCacheLastData(STsdbRepo *pRepo, STsdbCfg* oldCfg);
void tsdbGetRootDir(int repoid, char dirName[]); void tsdbGetRootDir(int repoid, char dirName[]);
void tsdbGetDataDir(int repoid, char dirName[]); void tsdbGetDataDir(int repoid, char dirName[]);
......
...@@ -113,11 +113,15 @@ int tsdbScheduleCommit(STsdbRepo *pRepo) { ...@@ -113,11 +113,15 @@ int tsdbScheduleCommit(STsdbRepo *pRepo) {
} }
static void tsdbApplyRepoConfig(STsdbRepo *pRepo) { static void tsdbApplyRepoConfig(STsdbRepo *pRepo) {
pthread_mutex_lock(&pRepo->save_mutex);
pRepo->config_changed = false; pRepo->config_changed = false;
STsdbCfg * pSaveCfg = &pRepo->save_config; STsdbCfg * pSaveCfg = &pRepo->save_config;
STsdbCfg oldCfg;
int32_t oldTotalBlocks = pRepo->config.totalBlocks; int32_t oldTotalBlocks = pRepo->config.totalBlocks;
memcpy(&oldCfg, &(pRepo->config), sizeof(STsdbCfg));
pRepo->config.compression = pRepo->save_config.compression; pRepo->config.compression = pRepo->save_config.compression;
pRepo->config.keep = pRepo->save_config.keep; pRepo->config.keep = pRepo->save_config.keep;
pRepo->config.keep1 = pRepo->save_config.keep1; pRepo->config.keep1 = pRepo->save_config.keep1;
...@@ -125,10 +129,12 @@ static void tsdbApplyRepoConfig(STsdbRepo *pRepo) { ...@@ -125,10 +129,12 @@ static void tsdbApplyRepoConfig(STsdbRepo *pRepo) {
pRepo->config.cacheLastRow = pRepo->save_config.cacheLastRow; pRepo->config.cacheLastRow = pRepo->save_config.cacheLastRow;
pRepo->config.totalBlocks = pRepo->save_config.totalBlocks; pRepo->config.totalBlocks = pRepo->save_config.totalBlocks;
tsdbInfo("vgId:%d apply new config: compression(%d), keep(%d,%d,%d), totalBlocks(%d), cacheLastRow(%d),totalBlocks(%d)", pthread_mutex_unlock(&pRepo->save_mutex);
tsdbInfo("vgId:%d apply new config: compression(%d), keep(%d,%d,%d), totalBlocks(%d), cacheLastRow(%d->%d),totalBlocks(%d->%d)",
REPO_ID(pRepo), REPO_ID(pRepo),
pSaveCfg->compression, pSaveCfg->keep,pSaveCfg->keep1, pSaveCfg->keep2, pSaveCfg->compression, pSaveCfg->keep,pSaveCfg->keep1, pSaveCfg->keep2,
pSaveCfg->totalBlocks, pSaveCfg->cacheLastRow, pSaveCfg->totalBlocks); pSaveCfg->totalBlocks, oldCfg.cacheLastRow, pSaveCfg->cacheLastRow, oldTotalBlocks, pSaveCfg->totalBlocks);
int err = tsdbExpendPool(pRepo, oldTotalBlocks); int err = tsdbExpendPool(pRepo, oldTotalBlocks);
if (!TAOS_SUCCEEDED(err)) { if (!TAOS_SUCCEEDED(err)) {
...@@ -136,6 +142,12 @@ static void tsdbApplyRepoConfig(STsdbRepo *pRepo) { ...@@ -136,6 +142,12 @@ static void tsdbApplyRepoConfig(STsdbRepo *pRepo) {
REPO_ID(pRepo), oldTotalBlocks, pSaveCfg->totalBlocks, tstrerror(err)); REPO_ID(pRepo), oldTotalBlocks, pSaveCfg->totalBlocks, tstrerror(err));
} }
if (oldCfg.cacheLastRow != pRepo->config.cacheLastRow) {
if (tsdbLockRepo(pRepo) < 0) return;
tsdbCacheLastData(pRepo, &oldCfg);
tsdbUnlockRepo(pRepo);
}
} }
static void *tsdbLoopCommit(void *arg) { static void *tsdbLoopCommit(void *arg) {
...@@ -166,9 +178,7 @@ static void *tsdbLoopCommit(void *arg) { ...@@ -166,9 +178,7 @@ static void *tsdbLoopCommit(void *arg) {
// check if need to apply new config // check if need to apply new config
if (pRepo->config_changed) { if (pRepo->config_changed) {
pthread_mutex_lock(&pRepo->save_mutex);
tsdbApplyRepoConfig(pRepo); tsdbApplyRepoConfig(pRepo);
pthread_mutex_unlock(&pRepo->save_mutex);
} }
tsdbCommitData(pRepo); tsdbCommitData(pRepo);
......
...@@ -548,6 +548,8 @@ static STsdbRepo *tsdbNewRepo(STsdbCfg *pCfg, STsdbAppH *pAppH) { ...@@ -548,6 +548,8 @@ static STsdbRepo *tsdbNewRepo(STsdbCfg *pCfg, STsdbAppH *pAppH) {
return NULL; return NULL;
} }
pRepo->config_changed = false; pRepo->config_changed = false;
atomic_store_8(&pRepo->hasCachedLastRow, 0);
atomic_store_8(&pRepo->hasCachedLastColumn, 0);
code = tsem_init(&(pRepo->readyToCommit), 0, 1); code = tsem_init(&(pRepo->readyToCommit), 0, 1);
if (code != 0) { if (code != 0) {
...@@ -636,7 +638,7 @@ static int tsdbRestoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pRea ...@@ -636,7 +638,7 @@ static int tsdbRestoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pRea
int err = 0; int err = 0;
numColumns = schemaNCols(pSchema); numColumns = schemaNCols(pSchema);
if (numColumns <= pTable->maxColumnNum) { if (numColumns <= pTable->restoreColumnNum) {
return 0; return 0;
} }
if (pTable->lastColSVersion != schemaVersion(pSchema)) { if (pTable->lastColSVersion != schemaVersion(pSchema)) {
...@@ -675,7 +677,7 @@ static int tsdbRestoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pRea ...@@ -675,7 +677,7 @@ static int tsdbRestoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pRea
SBlockIdx *pIdx = pReadh->pBlkIdx; SBlockIdx *pIdx = pReadh->pBlkIdx;
blockIdx = (int32_t)(pIdx->numOfBlocks - 1); blockIdx = (int32_t)(pIdx->numOfBlocks - 1);
while (numColumns > pTable->maxColumnNum && blockIdx >= 0) { while (numColumns > pTable->restoreColumnNum && blockIdx >= 0) {
bool loadStatisData = false; bool loadStatisData = false;
pBlock = pReadh->pBlkInfo->blocks + blockIdx; pBlock = pReadh->pBlkInfo->blocks + blockIdx;
blockIdx -= 1; blockIdx -= 1;
...@@ -693,7 +695,7 @@ static int tsdbRestoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pRea ...@@ -693,7 +695,7 @@ static int tsdbRestoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pRea
loadStatisData = true; loadStatisData = true;
} }
for (int16_t i = 0; i < numColumns && numColumns > pTable->maxColumnNum; ++i) { for (int16_t i = 0; i < numColumns && numColumns > pTable->restoreColumnNum; ++i) {
STColumn *pCol = schemaColAt(pSchema, i); STColumn *pCol = schemaColAt(pSchema, i);
// ignore loaded columns // ignore loaded columns
if (pTable->lastCols[i].bytes != 0) { if (pTable->lastCols[i].bytes != 0) {
...@@ -733,7 +735,7 @@ static int tsdbRestoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pRea ...@@ -733,7 +735,7 @@ static int tsdbRestoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pRea
tdAppendColVal(row, tdGetColDataOfRow(pDataCol, rowId), pCol->type, pCol->bytes, pCol->offset); tdAppendColVal(row, tdGetColDataOfRow(pDataCol, rowId), pCol->type, pCol->bytes, pCol->offset);
pLastCol->ts = dataRowKey(row); pLastCol->ts = dataRowKey(row);
pTable->maxColumnNum += 1; pTable->restoreColumnNum += 1;
tsdbDebug("tsdbRestoreLastColumns restore vgId:%d,table:%s cache column %d, %" PRId64, REPO_ID(pRepo), pTable->name->data, pLastCol->colId, pLastCol->ts); tsdbDebug("tsdbRestoreLastColumns restore vgId:%d,table:%s cache column %d, %" PRId64, REPO_ID(pRepo), pTable->name->data, pLastCol->colId, pLastCol->ts);
break; break;
...@@ -766,7 +768,7 @@ int tsdbRestoreInfo(STsdbRepo *pRepo) { ...@@ -766,7 +768,7 @@ int tsdbRestoreInfo(STsdbRepo *pRepo) {
for (int i = 1; i < pMeta->maxTables; i++) { for (int i = 1; i < pMeta->maxTables; i++) {
STable *pTable = pMeta->tables[i]; STable *pTable = pMeta->tables[i];
if (pTable == NULL) continue; if (pTable == NULL) continue;
pTable->maxColumnNum = 0; pTable->restoreColumnNum = 0;
} }
} }
...@@ -841,5 +843,165 @@ int tsdbRestoreInfo(STsdbRepo *pRepo) { ...@@ -841,5 +843,165 @@ int tsdbRestoreInfo(STsdbRepo *pRepo) {
} }
tsdbDestroyReadH(&readh); tsdbDestroyReadH(&readh);
if (CACHE_LAST_ROW(pCfg)) {
atomic_store_8(&pRepo->hasCachedLastRow, 1);
}
if (CACHE_LAST_NULL_COLUMN(pCfg)) {
atomic_store_8(&pRepo->hasCachedLastColumn, 1);
}
return 0;
}
int tsdbCacheLastData(STsdbRepo *pRepo, STsdbCfg* oldCfg) {
bool cacheLastRow = false, cacheLastCol = false;
SFSIter fsiter;
SReadH readh;
SDFileSet *pSet;
STsdbMeta *pMeta = pRepo->tsdbMeta;
//STsdbCfg * pCfg = REPO_CFG(pRepo);
SBlock * pBlock;
int tableNum = 0;
int maxTableIdx = 0;
int cacheLastRowTableNum = 0;
int cacheLastColTableNum = 0;
bool need_free_last_row = CACHE_LAST_ROW(oldCfg) && !CACHE_LAST_ROW(&(pRepo->config));
bool need_free_last_col = CACHE_LAST_NULL_COLUMN(oldCfg) && !CACHE_LAST_NULL_COLUMN(&(pRepo->config));
if (CACHE_LAST_ROW(&(pRepo->config)) || CACHE_LAST_NULL_COLUMN(&(pRepo->config))) {
tsdbInfo("tsdbCacheLastData cache last data since cacheLast option changed");
cacheLastRow = !CACHE_LAST_ROW(oldCfg) && CACHE_LAST_ROW(&(pRepo->config));
cacheLastCol = !CACHE_LAST_NULL_COLUMN(oldCfg) && CACHE_LAST_NULL_COLUMN(&(pRepo->config));
}
// calc max table idx and table num
for (int i = 1; i < pMeta->maxTables; i++) {
STable *pTable = pMeta->tables[i];
if (pTable == NULL) continue;
tableNum += 1;
maxTableIdx = i;
if (cacheLastCol) {
pTable->restoreColumnNum = 0;
}
}
// if close last option,need to free data
if (need_free_last_row || need_free_last_col) {
if (need_free_last_row) {
atomic_store_8(&pRepo->hasCachedLastRow, 0);
}
if (need_free_last_col) {
atomic_store_8(&pRepo->hasCachedLastColumn, 0);
}
tsdbInfo("free cache last data since cacheLast option changed");
for (int i = 1; i < maxTableIdx; i++) {
STable *pTable = pMeta->tables[i];
if (pTable == NULL) continue;
if (need_free_last_row) {
taosTZfree(pTable->lastRow);
pTable->lastRow = NULL;
pTable->lastKey = TSKEY_INITIAL_VAL;
}
if (need_free_last_col) {
tsdbFreeLastColumns(pTable);
}
}
}
if (!cacheLastRow && !cacheLastCol) {
return 0;
}
cacheLastRowTableNum = cacheLastRow ? tableNum : 0;
cacheLastColTableNum = cacheLastCol ? tableNum : 0;
if (tsdbInitReadH(&readh, pRepo) < 0) {
return -1;
}
tsdbFSIterInit(&fsiter, REPO_FS(pRepo), TSDB_FS_ITER_BACKWARD);
while ((pSet = tsdbFSIterNext(&fsiter)) != NULL && (cacheLastRowTableNum > 0 || cacheLastColTableNum > 0)) {
if (tsdbSetAndOpenReadFSet(&readh, pSet) < 0) {
tsdbDestroyReadH(&readh);
return -1;
}
if (tsdbLoadBlockIdx(&readh) < 0) {
tsdbDestroyReadH(&readh);
return -1;
}
for (int i = 1; i <= maxTableIdx; i++) {
STable *pTable = pMeta->tables[i];
if (pTable == NULL) continue;
//tsdbInfo("tsdbRestoreInfo restore vgId:%d,table:%s", REPO_ID(pRepo), pTable->name->data);
if (tsdbSetReadTable(&readh, pTable) < 0) {
tsdbDestroyReadH(&readh);
return -1;
}
SBlockIdx *pIdx = readh.pBlkIdx;
if (pIdx && cacheLastRowTableNum > 0 && pTable->lastRow == NULL) {
pTable->lastKey = pIdx->maxKey;
if (tsdbLoadBlockInfo(&readh, NULL) < 0) {
tsdbDestroyReadH(&readh);
return -1;
}
pBlock = readh.pBlkInfo->blocks + pIdx->numOfBlocks - 1;
if (tsdbLoadBlockData(&readh, pBlock, NULL) < 0) {
tsdbDestroyReadH(&readh);
return -1;
}
// Get the data in row
ASSERT(pTable->lastRow == NULL);
STSchema *pSchema = tsdbGetTableSchema(pTable);
pTable->lastRow = taosTMalloc(dataRowMaxBytesFromSchema(pSchema));
if (pTable->lastRow == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
tsdbDestroyReadH(&readh);
return -1;
}
tdInitDataRow(pTable->lastRow, pSchema);
for (int icol = 0; icol < schemaNCols(pSchema); icol++) {
STColumn *pCol = schemaColAt(pSchema, icol);
SDataCol *pDataCol = readh.pDCols[0]->cols + icol;
tdAppendColVal(pTable->lastRow, tdGetColDataOfRow(pDataCol, pBlock->numOfRows - 1), pCol->type, pCol->bytes,
pCol->offset);
}
cacheLastRowTableNum -= 1;
}
// restore NULL columns
if (pIdx && cacheLastColTableNum > 0 && !pTable->hasRestoreLastColumn) {
if (tsdbRestoreLastColumns(pRepo, pTable, &readh) != 0) {
tsdbDestroyReadH(&readh);
return -1;
}
if (pTable->hasRestoreLastColumn) {
cacheLastColTableNum -= 1;
}
}
}
}
tsdbDestroyReadH(&readh);
if (cacheLastRow) {
atomic_store_8(&pRepo->hasCachedLastRow, 1);
}
if (cacheLastCol) {
atomic_store_8(&pRepo->hasCachedLastColumn, 1);
}
return 0; return 0;
} }
\ No newline at end of file
...@@ -44,7 +44,6 @@ static int tsdbRemoveTableFromStore(STsdbRepo *pRepo, STable *pTable); ...@@ -44,7 +44,6 @@ static int tsdbRemoveTableFromStore(STsdbRepo *pRepo, STable *pTable);
static int tsdbRmTableFromMeta(STsdbRepo *pRepo, STable *pTable); static int tsdbRmTableFromMeta(STsdbRepo *pRepo, STable *pTable);
static int tsdbAdjustMetaTables(STsdbRepo *pRepo, int tid); static int tsdbAdjustMetaTables(STsdbRepo *pRepo, int tid);
static int tsdbCheckTableTagVal(SKVRow *pKVRow, STSchema *pSchema); static int tsdbCheckTableTagVal(SKVRow *pKVRow, STSchema *pSchema);
static void tsdbFreeLastColumns(STable* pTable);
// ------------------ OUTER FUNCTIONS ------------------ // ------------------ OUTER FUNCTIONS ------------------
int tsdbCreateTable(STsdbRepo *repo, STableCfg *pCfg) { int tsdbCreateTable(STsdbRepo *repo, STableCfg *pCfg) {
...@@ -590,12 +589,12 @@ void tsdbUnRefTable(STable *pTable) { ...@@ -590,12 +589,12 @@ void tsdbUnRefTable(STable *pTable) {
} }
} }
static void tsdbFreeLastColumns(STable* pTable) { void tsdbFreeLastColumns(STable* pTable) {
if (pTable->lastCols == NULL) { if (pTable->lastCols == NULL) {
return; return;
} }
for (int i = 0; i < pTable->lastColNum; ++i) { for (int i = 0; i < pTable->maxColNum; ++i) {
if (pTable->lastCols[i].bytes == 0) { if (pTable->lastCols[i].bytes == 0) {
continue; continue;
} }
...@@ -605,14 +604,16 @@ static void tsdbFreeLastColumns(STable* pTable) { ...@@ -605,14 +604,16 @@ static void tsdbFreeLastColumns(STable* pTable) {
} }
tfree(pTable->lastCols); tfree(pTable->lastCols);
pTable->lastCols = NULL; pTable->lastCols = NULL;
pTable->lastColNum = 0; pTable->maxColNum = 0;
pTable->lastColSVersion = -1;
pTable->restoreColumnNum = 0;
} }
int16_t tsdbGetLastColumnsIndexByColId(STable* pTable, int16_t colId) { int16_t tsdbGetLastColumnsIndexByColId(STable* pTable, int16_t colId) {
if (pTable->lastCols == NULL) { if (pTable->lastCols == NULL) {
return -1; return -1;
} }
for (int16_t i = 0; i < pTable->lastColNum; ++i) { for (int16_t i = 0; i < pTable->maxColNum; ++i) {
if (pTable->lastCols[i].colId == colId) { if (pTable->lastCols[i].colId == colId) {
return i; return i;
} }
...@@ -640,8 +641,8 @@ int tsdbInitColIdCacheWithSchema(STable* pTable, STSchema* pSchema) { ...@@ -640,8 +641,8 @@ int tsdbInitColIdCacheWithSchema(STable* pTable, STSchema* pSchema) {
} }
pTable->lastColSVersion = schemaVersion(pSchema); pTable->lastColSVersion = schemaVersion(pSchema);
pTable->lastColNum = numOfColumn; pTable->maxColNum = numOfColumn;
pTable->maxColumnNum = 0; pTable->restoreColumnNum = 0;
return 0; return 0;
} }
...@@ -682,11 +683,11 @@ int tsdbUpdateLastColSchema(STable *pTable, STSchema *pNewSchema) { ...@@ -682,11 +683,11 @@ int tsdbUpdateLastColSchema(STable *pTable, STSchema *pNewSchema) {
} }
SDataCol *oldLastCols = pTable->lastCols; SDataCol *oldLastCols = pTable->lastCols;
int16_t oldLastColNum = pTable->lastColNum; int16_t oldLastColNum = pTable->maxColNum;
pTable->lastColSVersion = schemaVersion(pNewSchema); pTable->lastColSVersion = schemaVersion(pNewSchema);
pTable->lastCols = lastCols; pTable->lastCols = lastCols;
pTable->lastColNum = numOfCols; pTable->maxColNum = numOfCols;
if (oldLastCols == NULL) { if (oldLastCols == NULL) {
TSDB_WUNLOCK_TABLE(pTable); TSDB_WUNLOCK_TABLE(pTable);
...@@ -797,8 +798,8 @@ static STable *tsdbNewTable() { ...@@ -797,8 +798,8 @@ static STable *tsdbNewTable() {
pTable->lastKey = TSKEY_INITIAL_VAL; pTable->lastKey = TSKEY_INITIAL_VAL;
pTable->lastCols = NULL; pTable->lastCols = NULL;
pTable->maxColumnNum = 0; pTable->restoreColumnNum = 0;
pTable->lastColNum = 0; pTable->maxColNum = 0;
pTable->lastColSVersion = -1; pTable->lastColSVersion = -1;
return pTable; return pTable;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册