提交 c95e2d89 编写于 作者: C Cary Xu

code optimization

上级 76721f8a
......@@ -148,7 +148,7 @@ static void tsdbApplyRepoConfig(STsdbRepo *pRepo) {
if (tsdbLockRepo(pRepo) < 0) return;
// tsdbCacheLastData(pRepo, &oldCfg);
// lazy load last cache when query or update
pRepo->cacheLastConfigVersion += 1;
++pRepo->cacheLastConfigVersion;
tsdbUnlockRepo(pRepo);
}
......
......@@ -658,6 +658,7 @@ static int tsdbRestoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pRea
}
if (pTable->lastColSVersion != schemaVersion(pSchema)) {
if (tsdbInitColIdCacheWithSchema(pTable, pSchema) < 0) {
TSDB_WUNLOCK_TABLE(pTable);
return -1;
}
}
......@@ -711,7 +712,7 @@ static int tsdbRestoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pRea
tsdbGetBlockStatis(pReadh, pBlockStatis, (int)numColumns);
loadStatisData = true;
}
TSDB_WLOCK_TABLE(pTable); // lock when update pTable->lastCols[]
for (int16_t i = 0; i < numColumns && numColumns > pTable->restoreColumnNum; ++i) {
STColumn *pCol = schemaColAt(pSchema, i);
// ignore loaded columns
......@@ -760,6 +761,7 @@ static int tsdbRestoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pRea
break;
}
}
TSDB_WUNLOCK_TABLE(pTable);
}
out:
......@@ -800,23 +802,17 @@ static int tsdbRestoreLastRow(STsdbRepo *pRepo, STable *pTable, SReadH* pReadh,
tdAppendColVal(memRowDataBody(lastRow), tdGetColDataOfRow(pDataCol, pBlock->numOfRows - 1), pCol->type,
pCol->offset);
}
TSKEY lastKey = memRowKey(lastRow);
if (!pCfg->cacheLastRow && pTable->lastRow != NULL) {
SMemRow cachedLastRow = pTable->lastRow;
TSDB_WLOCK_TABLE(pTable);
pTable->lastRow = NULL;
TSDB_WUNLOCK_TABLE(pTable);
taosTZfree(cachedLastRow);
}
// during the load data in file, new data would be inserted and last row has been updated
if (tsdbGetTableLastKeyImpl(pTable) <= lastKey) {
TSDB_WLOCK_TABLE(pTable);
pTable->lastRow = lastRow;
TSDB_WLOCK_TABLE(pTable);
if (tsdbGetTableLastKeyImpl(pTable) < lastKey) {
pTable->lastKey = lastKey;
pTable->lastRow = lastRow;
TSDB_WUNLOCK_TABLE(pTable);
} else {
TSDB_WUNLOCK_TABLE(pTable);
taosTZfree(lastRow);
}
......@@ -897,15 +893,16 @@ int tsdbRestoreInfo(STsdbRepo *pRepo) {
return 0;
}
int32_t tsdbLoadLastCache(STsdbRepo *pRepo, STable* pTable) {
bool cacheLastRow = CACHE_LAST_ROW(&(pRepo->config));
bool cacheLastCol = CACHE_LAST_NULL_COLUMN(&(pRepo->config));
int32_t tsdbLoadLastCache(STsdbRepo *pRepo, STable *pTable) {
SFSIter fsiter;
SReadH readh;
SDFileSet *pSet;
int cacheLastRowTableNum = 0;
int cacheLastColTableNum = 0;
bool cacheLastRow = CACHE_LAST_ROW(&(pRepo->config));
bool cacheLastCol = CACHE_LAST_NULL_COLUMN(&(pRepo->config));
tsdbDebug("tsdbLoadLastCache for %s, cacheLastRow:%d, cacheLastCol:%d", pTable->name->data, cacheLastRow, cacheLastCol);
pTable->cacheLastConfigVersion = pRepo->cacheLastConfigVersion;
......@@ -933,10 +930,12 @@ int32_t tsdbLoadLastCache(STsdbRepo *pRepo, STable* pTable) {
return -1;
}
tsdbRLockFS(REPO_FS(pRepo));
tsdbFSIterInit(&fsiter, REPO_FS(pRepo), TSDB_FS_ITER_BACKWARD);
while ((pSet = tsdbFSIterNext(&fsiter)) != NULL && (cacheLastRowTableNum > 0 || cacheLastColTableNum > 0)) {
while ((cacheLastRowTableNum > 0 || cacheLastColTableNum > 0) && (pSet = tsdbFSIterNext(&fsiter)) != NULL) {
if (tsdbSetAndOpenReadFSet(&readh, pSet) < 0) {
tsdbUnLockFS(REPO_FS(pRepo));
tsdbDestroyReadH(&readh);
return -1;
}
......@@ -946,19 +945,25 @@ int32_t tsdbLoadLastCache(STsdbRepo *pRepo, STable* pTable) {
return -1;
}
//tsdbInfo("tsdbRestoreInfo restore vgId:%d,table:%s", REPO_ID(pRepo), pTable->name->data);
// tsdbDebug("tsdbRestoreInfo restore vgId:%d,table:%s", REPO_ID(pRepo), pTable->name->data);
if (tsdbSetReadTable(&readh, pTable) < 0) {
tsdbUnLockFS(REPO_FS(pRepo));
tsdbDestroyReadH(&readh);
return -1;
}
SBlockIdx *pIdx = readh.pBlkIdx;
if (cacheLastRow && pIdx && pTable->lastRow == NULL && cacheLastRowTableNum > 0) {
if (pIdx && (cacheLastRowTableNum > 0) && (pTable->lastRow == NULL)) {
if (tsdbGetTableLastKeyImpl(pTable) < pTable->lastKey) {
TSDB_WLOCK_TABLE(pTable);
pTable->lastKey = pIdx->maxKey;
TSDB_WUNLOCK_TABLE(pTable);
}
if (tsdbRestoreLastRow(pRepo, pTable, &readh, pIdx) != 0) {
tsdbUnLockFS(REPO_FS(pRepo));
tsdbDestroyReadH(&readh);
return -1;
}
......@@ -966,8 +971,9 @@ int32_t tsdbLoadLastCache(STsdbRepo *pRepo, STable* pTable) {
}
// restore NULL columns
if (pIdx && cacheLastColTableNum > 0 && !pTable->hasRestoreLastColumn) {
if (pIdx && (cacheLastColTableNum > 0) && !pTable->hasRestoreLastColumn) {
if (tsdbRestoreLastColumns(pRepo, pTable, &readh) != 0) {
tsdbUnLockFS(REPO_FS(pRepo));
tsdbDestroyReadH(&readh);
return -1;
}
......@@ -977,6 +983,7 @@ int32_t tsdbLoadLastCache(STsdbRepo *pRepo, STable* pTable) {
}
}
tsdbUnLockFS(REPO_FS(pRepo));
tsdbDestroyReadH(&readh);
return 0;
......
......@@ -996,7 +996,8 @@ static void updateTableLatestColumn(STsdbRepo *pRepo, STable *pTable, SMemRow ro
if ((value == NULL) || isNull(value, pTCol->type)) {
continue;
}
// lock
TSDB_WLOCK_TABLE(pTable);
SDataCol *pDataCol = &(pLatestCols[idx]);
if (pDataCol->pData == NULL) {
pDataCol->pData = malloc(pTCol->bytes);
......@@ -1012,6 +1013,8 @@ static void updateTableLatestColumn(STsdbRepo *pRepo, STable *pTable, SMemRow ro
memcpy(pDataCol->pData, value, bytes);
//tsdbInfo("updateTableLatestColumn vgId:%d cache column %d for %d,%s", REPO_ID(pRepo), j, pDataCol->bytes, (char*)pDataCol->pData);
pDataCol->ts = memRowKey(row);
// unlock
TSDB_WUNLOCK_TABLE(pTable);
}
}
......
......@@ -627,27 +627,30 @@ int16_t tsdbGetLastColumnsIndexByColId(STable* pTable, int16_t colId) {
}
int tsdbInitColIdCacheWithSchema(STable* pTable, STSchema* pSchema) {
ASSERT(pTable->lastCols == NULL);
TSDB_WLOCK_TABLE(pTable);
if (pTable->lastCols == NULL) {
int16_t numOfColumn = pSchema->numOfCols;
int16_t numOfColumn = pSchema->numOfCols;
pTable->lastCols = (SDataCol *)malloc(numOfColumn * sizeof(SDataCol));
if (pTable->lastCols == NULL) {
TSDB_WUNLOCK_TABLE(pTable);
return -1;
}
pTable->lastCols = (SDataCol*)malloc(numOfColumn * sizeof(SDataCol));
if (pTable->lastCols == NULL) {
return -1;
}
for (int16_t i = 0; i < numOfColumn; ++i) {
STColumn *pCol = schemaColAt(pSchema, i);
SDataCol *pDataCol = &(pTable->lastCols[i]);
pDataCol->bytes = 0;
pDataCol->pData = NULL;
pDataCol->colId = pCol->colId;
}
for (int16_t i = 0; i < numOfColumn; ++i) {
STColumn *pCol = schemaColAt(pSchema, i);
SDataCol* pDataCol = &(pTable->lastCols[i]);
pDataCol->bytes = 0;
pDataCol->pData = NULL;
pDataCol->colId = pCol->colId;
pTable->lastColSVersion = schemaVersion(pSchema);
pTable->maxColNum = numOfColumn;
pTable->restoreColumnNum = 0;
pTable->hasRestoreLastColumn = false;
}
pTable->lastColSVersion = schemaVersion(pSchema);
pTable->maxColNum = numOfColumn;
pTable->restoreColumnNum = 0;
pTable->hasRestoreLastColumn = false;
TSDB_WUNLOCK_TABLE(pTable);
return 0;
}
......
......@@ -590,6 +590,28 @@ void tsdbResetQueryHandleForNewTable(TsdbQueryHandleT queryHandle, STsdbQueryCon
pQueryHandle->next = doFreeColumnInfoData(pQueryHandle->next);
}
static int32_t lazyLoadCacheLast(STsdbQueryHandle* pQueryHandle) {
STsdbRepo* pRepo = pQueryHandle->pTsdb;
size_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo);
int32_t code = 0;
for (size_t i = 0; i < numOfTables; ++i) {
STableCheckInfo* pCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, i);
STable* pTable = pCheckInfo->pTableObj;
if (pTable->cacheLastConfigVersion == pRepo->cacheLastConfigVersion) {
continue;
}
code = tsdbLoadLastCache(pRepo, pTable);
if (code != 0) {
tsdbError("%p uid:%" PRId64 ", tid:%d, failed to load last cache since %s", pQueryHandle, pTable->tableId.uid,
pTable->tableId.tid, tstrerror(terrno));
break;
}
}
return code;
}
TsdbQueryHandleT tsdbQueryLastRow(STsdbRepo *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList, uint64_t qId, SMemRef* pMemRef) {
pCond->twindow = updateLastrowForEachGroup(groupList);
......@@ -619,30 +641,6 @@ TsdbQueryHandleT tsdbQueryLastRow(STsdbRepo *tsdb, STsdbQueryCond *pCond, STable
return pQueryHandle;
}
static int32_t lazyLoadCacheLast(STsdbQueryHandle* pQueryHandle) {
STsdbRepo* pRepo = pQueryHandle->pTsdb;
size_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo);
size_t i = 0;
int32_t code = 0;
for (i = 0; i < numOfTables; ++i) {
STableCheckInfo* pCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, i);
STable* pTable = pCheckInfo->pTableObj;
if (pTable->cacheLastConfigVersion == pRepo->cacheLastConfigVersion) {
continue;
}
if (tsdbLockRepo(pRepo) < 0) return -1;
code = tsdbLoadLastCache(pRepo, pTable);
if (tsdbUnlockRepo(pRepo) != 0) return -1;
if (code != 0) {
break;
}
}
return code;
}
TsdbQueryHandleT tsdbQueryCacheLast(STsdbRepo *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList, uint64_t qId, SMemRef* pMemRef) {
STsdbQueryHandle *pQueryHandle = (STsdbQueryHandle*) tsdbQueryTables(tsdb, pCond, groupList, qId, pMemRef);
if (pQueryHandle == NULL) {
......@@ -2809,6 +2807,9 @@ static bool loadCachedLast(STsdbQueryHandle* pQueryHandle) {
}
int32_t i = 0, j = 0;
// lock pTable->lastCols[i] as it would be released when schema update(tsdbUpdateLastColSchema)
TSDB_RLOCK_TABLE(pTable);
while(i < tgNumOfCols && j < numOfCols) {
pColInfo = taosArrayGet(pQueryHandle->pColumns, i);
if (pTable->lastCols[j].colId < pColInfo->info.colId) {
......@@ -2895,6 +2896,7 @@ static bool loadCachedLast(STsdbQueryHandle* pQueryHandle) {
i++;
j++;
}
TSDB_RUNLOCK_TABLE(pTable);
// leave the real ts column as the last row, because last function only (not stable) use the last row as res
if (priKey != TSKEY_INITIAL_VAL) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册