From 5808280aceb55fa760c7215d64f7b4b10b0de5cf Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Mon, 21 Dec 2020 07:49:31 +0000 Subject: [PATCH] TD-2354: implement last_row cache --- src/inc/tsdb.h | 3 ++- src/tsdb/inc/tsdbMain.h | 26 ++++++++++++++++++---- src/tsdb/src/tsdbCommit.c | 6 ++--- src/tsdb/src/tsdbMain.c | 41 ++++++++++++++++++++++++++++++---- src/tsdb/src/tsdbMemTable.c | 44 +++++++++++++++++++++++++++++++++---- src/tsdb/src/tsdbMeta.c | 21 ++++++++++++------ 6 files changed, 118 insertions(+), 23 deletions(-) diff --git a/src/inc/tsdb.h b/src/inc/tsdb.h index 04d6c78815..262bf30309 100644 --- a/src/inc/tsdb.h +++ b/src/inc/tsdb.h @@ -66,6 +66,7 @@ typedef struct { int8_t precision; int8_t compression; int8_t update; + int8_t cacheLastRow; } STsdbCfg; // --------- TSDB REPOSITORY USAGE STATISTICS @@ -119,7 +120,7 @@ STableCfg *tsdbCreateTableCfgFromMsg(SMDCreateTableMsg *pMsg); int tsdbCreateTable(TSDB_REPO_T *repo, STableCfg *pCfg); int tsdbDropTable(TSDB_REPO_T *pRepo, STableId tableId); int tsdbUpdateTableTagValue(TSDB_REPO_T *repo, SUpdateTableTagValMsg *pMsg); -TSKEY tsdbGetTableLastKey(TSDB_REPO_T *repo, uint64_t uid); +// TSKEY tsdbGetTableLastKey(TSDB_REPO_T *repo, uint64_t uid); uint32_t tsdbGetFileInfo(TSDB_REPO_T *repo, char *name, uint32_t *index, uint32_t eindex, int64_t *size); diff --git a/src/tsdb/inc/tsdbMain.h b/src/tsdb/inc/tsdbMain.h index 5c978abd1d..45eb25ee15 100644 --- a/src/tsdb/inc/tsdbMain.h +++ b/src/tsdb/inc/tsdbMain.h @@ -66,7 +66,10 @@ typedef struct STable { SSkipList* pIndex; // For TSDB_SUPER_TABLE, it is the skiplist index void* eventHandler; // TODO void* streamHandler; // TODO - TSKEY lastKey; // lastkey inserted in this table, initialized as 0, TODO: make a structure + union { + TSKEY lastKey; + SDataRow lastRow; + }; char* sql; void* cqhandle; SRWLatch latch; // TODO: implementa latch functions @@ -360,8 +363,11 @@ typedef struct { #define TABLE_UID(t) (t)->tableId.uid #define TABLE_TID(t) (t)->tableId.tid #define TABLE_SUID(t) (t)->suid -#define TABLE_LASTKEY(t) (t)->lastKey #define TSDB_META_FILE_MAGIC(m) KVSTORE_MAGIC((m)->pStore) +#define TSDB_RLOCK_TABLE(t) taosRLockLatch(&((t)->latch)) +#define TSDB_RUNLOCK_TABLE(t) taosRUnLockLatch(&((t)->latch)) +#define TSDB_WLOCK_TABLE(t) taosWLockLatch(&((t)->latch)) +#define TSDB_WUNLOCK_TABLE(t) taosWUnLockLatch(&((t)->latch)) STsdbMeta* tsdbNewMeta(STsdbCfg* pCfg); void tsdbFreeMeta(STsdbMeta* pMeta); @@ -391,7 +397,7 @@ static FORCE_INLINE STSchema* tsdbGetTableSchemaImpl(STable* pTable, bool lock, STSchema* pSchema = NULL; STSchema* pTSchema = NULL; - if (lock) taosRLockLatch(&(pDTable->latch)); + if (lock) TSDB_RLOCK_TABLE(pDTable); if (version < 0) { // get the latest version of schema pTSchema = pDTable->schema[pDTable->numOfSchemas - 1]; } else { // get the schema with version @@ -413,7 +419,7 @@ static FORCE_INLINE STSchema* tsdbGetTableSchemaImpl(STable* pTable, bool lock, } _exit: - if (lock) taosRUnLockLatch(&(pDTable->latch)); + if (lock) TSDB_RUNLOCK_TABLE(pDTable); return pSchema; } @@ -433,6 +439,18 @@ static FORCE_INLINE STSchema *tsdbGetTableTagSchema(STable *pTable) { } } +static FORCE_INLINE TSKEY tsdbGetTableLastKeyImpl(STable* pTable, bool cacheLastRow) { + if (cacheLastRow) { + if (pTable->lastRow == NULL) { + return TSKEY_INITIAL_VAL; + } else { + return dataRowKey(pTable->lastRow); + } + } else { + return pTable->lastKey; + } +} + // ------------------ tsdbBuffer.c #define TSDB_BUFFER_RESERVE 1024 // Reseve 1K as commit threshold diff --git a/src/tsdb/src/tsdbCommit.c b/src/tsdb/src/tsdbCommit.c index 637b02cd32..3007d35197 100644 --- a/src/tsdb/src/tsdbCommit.c +++ b/src/tsdb/src/tsdbCommit.c @@ -220,7 +220,7 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHe SCommitIter *pIter = iters + tid; if (pIter->pTable == NULL) continue; - taosRLockLatch(&(pIter->pTable->latch)); + TSDB_RLOCK_TABLE(pIter->pTable); if (tsdbSetHelperTable(pHelper, pIter->pTable, pRepo) < 0) goto _err; @@ -231,7 +231,7 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHe } if (tsdbCommitTableData(pHelper, pIter, pDataCols, maxKey) < 0) { - taosRUnLockLatch(&(pIter->pTable->latch)); + TSDB_RUNLOCK_TABLE(pIter->pTable); tsdbError("vgId:%d failed to write data of table %s tid %d uid %" PRIu64 " since %s", REPO_ID(pRepo), TABLE_CHAR_NAME(pIter->pTable), TABLE_TID(pIter->pTable), TABLE_UID(pIter->pTable), tstrerror(terrno)); @@ -239,7 +239,7 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHe } } - taosRUnLockLatch(&(pIter->pTable->latch)); + TSDB_RUNLOCK_TABLE(pIter->pTable); // Move the last block to the new .l file if neccessary if (tsdbMoveLastBlockIfNeccessary(pHelper) < 0) { diff --git a/src/tsdb/src/tsdbMain.c b/src/tsdb/src/tsdbMain.c index 9d65325001..8c8375c0f1 100644 --- a/src/tsdb/src/tsdbMain.c +++ b/src/tsdb/src/tsdbMain.c @@ -77,9 +77,9 @@ int32_t tsdbCreateRepo(char *rootDir, STsdbCfg *pCfg) { tsdbDebug( "vgId:%d tsdb env create succeed! cacheBlockSize %d totalBlocks %d daysPerFile %d keep " - "%d minRowsPerFileBlock %d maxRowsPerFileBlock %d precision %d compression %d", + "%d minRowsPerFileBlock %d maxRowsPerFileBlock %d precision %d compression %d update %d cacheLastRow %d", pCfg->tsdbId, pCfg->cacheBlockSize, pCfg->totalBlocks, pCfg->daysPerFile, pCfg->keep, pCfg->minRowsPerFileBlock, - pCfg->maxRowsPerFileBlock, pCfg->precision, pCfg->compression); + pCfg->maxRowsPerFileBlock, pCfg->precision, pCfg->compression, pCfg->update, pCfg->cacheLastRow); return 0; } @@ -475,6 +475,9 @@ static int32_t tsdbCheckAndSetDefaultCfg(STsdbCfg *pCfg) { // update check if (pCfg->update != 0) pCfg->update = 1; + // update cacheLastRow + if (pCfg->cacheLastRow != 0) pCfg->cacheLastRow = 1; + return 0; _err: @@ -692,10 +695,12 @@ static void tsdbFreeRepo(STsdbRepo *pRepo) { } } -static int tsdbRestoreInfo(STsdbRepo *pRepo) { +static int tsdbRestoreInfo(STsdbRepo *pRepo) { // TODO STsdbMeta * pMeta = pRepo->tsdbMeta; STsdbFileH *pFileH = pRepo->tsdbFileH; SFileGroup *pFGroup = NULL; + STsdbCfg * pCfg = &(pRepo->config); + SCompBlock *pBlock = NULL; SFileGroupIter iter; SRWHelper rhelper = {0}; @@ -713,7 +718,33 @@ static int tsdbRestoreInfo(STsdbRepo *pRepo) { if (tsdbSetHelperTable(&rhelper, pTable, pRepo) < 0) goto _err; SCompIdx *pIdx = &(rhelper.curCompIdx); - if (pIdx->offset > 0 && pTable->lastKey < pIdx->maxKey) pTable->lastKey = pIdx->maxKey; + TSKEY lastKey = tsdbGetTableLastKeyImpl(pTable, pCfg->cacheLastRow); + if (pIdx->offset > 0 && lastKey < pIdx->maxKey) { + if (pCfg->cacheLastRow) { // load the block of data + if (tsdbLoadCompInfo(&rhelper, NULL) < 0) goto _err; + + pBlock = rhelper.pCompInfo->blocks + pIdx->numOfBlocks - 1; + if (tsdbLoadBlockData(&rhelper, pBlock, NULL) < 0) goto _err; + + // construct the data row + ASSERT(pTable->lastRow == NULL); + STSchema *pSchema = tsdbGetTableSchema(pTable); + pTable->lastRow = taosTMalloc(schemaTLen(pSchema)); + if (pTable->lastRow == NULL) { + goto _err; + } + + tdInitDataRow(pTable->lastRow, pSchema); + for (int icol = 0; icol < schemaNCols(pSchema); icol++) { + STColumn *pCol = schemaColAt(pSchema, icol); + SDataCol *pDataCol = rhelper.pDataCols[0]->cols + icol; + tdAppendColVal(pTable->lastRow, tdGetColDataOfRow(pDataCol, pBlock->numOfRows - 1), pCol->type, pCol->bytes, + pCol->offset); + } + } else { + pTable->lastKey = pIdx->maxKey; + } + } } } @@ -800,6 +831,7 @@ static int tsdbEncodeCfg(void **buf, STsdbCfg *pCfg) { tlen += taosEncodeFixedI8(buf, pCfg->precision); tlen += taosEncodeFixedI8(buf, pCfg->compression); tlen += taosEncodeFixedI8(buf, pCfg->update); + tlen += taosEncodeFixedI8(buf, pCfg->cacheLastRow); return tlen; } @@ -817,6 +849,7 @@ static void *tsdbDecodeCfg(void *buf, STsdbCfg *pCfg) { buf = taosDecodeFixedI8(buf, &(pCfg->precision)); buf = taosDecodeFixedI8(buf, &(pCfg->compression)); buf = taosDecodeFixedI8(buf, &(pCfg->update)); + buf = taosDecodeFixedI8(buf, &(pCfg->cacheLastRow)); return buf; } diff --git a/src/tsdb/src/tsdbMemTable.c b/src/tsdb/src/tsdbMemTable.c index 999e2deb41..9582c0b49e 100644 --- a/src/tsdb/src/tsdbMemTable.c +++ b/src/tsdb/src/tsdbMemTable.c @@ -35,6 +35,7 @@ static int tsdbGetSubmitMsgNext(SSubmitMsgIter *pIter, SSubmitBlk **pPB static int tsdbCheckTableSchema(STsdbRepo *pRepo, SSubmitBlk *pBlock, STable *pTable); static int tsdbInsertDataToTableImpl(STsdbRepo *pRepo, STable *pTable, void **rows, int rowCounter); static void tsdbFreeRows(STsdbRepo *pRepo, void **rows, int rowCounter); +static int tsdbUpdateTableLatestInfo(STsdbRepo *pRepo, STable *pTable, SDataRow row); static FORCE_INLINE int tsdbCheckRowRange(STsdbRepo *pRepo, STable *pTable, SDataRow row, TSKEY minKey, TSKEY maxKey, TSKEY now); @@ -663,9 +664,10 @@ static int tsdbCopyRowToMem(STsdbRepo *pRepo, SDataRow row, STable *pTable, void return -1; } - if (key > TABLE_LASTKEY(pTable)) { + TSKEY lastKey = tsdbGetTableLastKeyImpl(pTable, pCfg->cacheLastRow); + if (key > lastKey) { tsdbTrace("vgId:%d skip to delete row key %" PRId64 " which is larger than table lastKey %" PRId64, - REPO_ID(pRepo), key, TABLE_LASTKEY(pTable)); + REPO_ID(pRepo), key, lastKey); return 0; } } @@ -846,8 +848,10 @@ static int tsdbInsertDataToTableImpl(STsdbRepo *pRepo, STable *pTable, void **ro if (pTableData->keyLast < dataRowKey(rows[rowCounter - 1])) pTableData->keyLast = dataRowKey(rows[rowCounter - 1]); pTableData->numOfRows += dsize; - // TODO: impl delete row thing - if (TABLE_LASTKEY(pTable) < dataRowKey(rows[rowCounter-1])) TABLE_LASTKEY(pTable) = dataRowKey(rows[rowCounter-1]); + // update table latest info + if (tsdbUpdateTableLatestInfo(pRepo, pTable, rows[rowCounter - 1]) < 0) { + return -1; + } return 0; } @@ -889,4 +893,36 @@ static void tsdbFreeRows(STsdbRepo *pRepo, void **rows, int rowCounter) { } } } +} + +static int tsdbUpdateTableLatestInfo(STsdbRepo *pRepo, STable *pTable, SDataRow row) { + STsdbCfg *pCfg = &pRepo->config; + + if (tsdbGetTableLastKeyImpl(pTable, pCfg->cacheLastRow) < dataRowKey(row)) { + if (pCfg->cacheLastRow) { + SDataRow nrow = pTable->lastRow; + if (taosTSizeof(nrow) < dataRowLen(row)) { + SDataRow orow = nrow; + nrow = taosTMalloc(dataRowLen(row)); + if (nrow == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + return -1; + } + + dataRowCpy(nrow, row); + TSDB_WLOCK_TABLE(pTable); + pTable->lastRow = nrow; + TSDB_WUNLOCK_TABLE(pTable); + taosTZfree(orow); + } else { + TSDB_WLOCK_TABLE(pTable); + dataRowCpy(nrow, row); + TSDB_WUNLOCK_TABLE(pTable); + } + } else { + pTable->lastKey = dataRowKey(row); + } + } + + return 0; } \ No newline at end of file diff --git a/src/tsdb/src/tsdbMeta.c b/src/tsdb/src/tsdbMeta.c index 9dfa147c8f..17f5e7052e 100644 --- a/src/tsdb/src/tsdbMeta.c +++ b/src/tsdb/src/tsdbMeta.c @@ -377,11 +377,11 @@ int tsdbUpdateTableTagValue(TSDB_REPO_T *repo, SUpdateTableTagValMsg *pMsg) { // Chage in memory if (pNewSchema != NULL) { // change super table tag schema - taosWLockLatch(&(pTable->pSuper->latch)); + TSDB_WLOCK_TABLE(pTable->pSuper); STSchema *pOldSchema = pTable->pSuper->tagSchema; pTable->pSuper->tagSchema = pNewSchema; tdFreeSchema(pOldSchema); - taosWUnLockLatch(&(pTable->pSuper->latch)); + TSDB_WUNLOCK_TABLE(pTable->pSuper); } bool isChangeIndexCol = (pMsg->colId == colColId(schemaColAt(pTable->pSuper->tagSchema, 0))); @@ -392,9 +392,9 @@ int tsdbUpdateTableTagValue(TSDB_REPO_T *repo, SUpdateTableTagValMsg *pMsg) { tsdbWLockRepoMeta(pRepo); tsdbRemoveTableFromIndex(pMeta, pTable); } - taosWLockLatch(&(pTable->latch)); + TSDB_WLOCK_TABLE(pTable); tdSetKVRowDataOfCol(&(pTable->tagVal), pMsg->colId, pMsg->type, POINTER_SHIFT(pMsg->data, pMsg->schemaLen)); - taosWUnLockLatch(&(pTable->latch)); + TSDB_WUNLOCK_TABLE(pTable); if (isChangeIndexCol) { tsdbAddTableIntoIndex(pMeta, pTable, false); tsdbUnlockRepoMeta(pRepo); @@ -587,7 +587,7 @@ void tsdbUpdateTableSchema(STsdbRepo *pRepo, STable *pTable, STSchema *pSchema, STable *pCTable = (TABLE_TYPE(pTable) == TSDB_CHILD_TABLE) ? pTable->pSuper : pTable; ASSERT(schemaVersion(pSchema) > schemaVersion(pCTable->schema[pCTable->numOfSchemas - 1])); - taosWLockLatch(&(pCTable->latch)); + TSDB_WLOCK_TABLE(pCTable); if (pCTable->numOfSchemas < TSDB_MAX_TABLE_SCHEMAS) { pCTable->schema[pCTable->numOfSchemas++] = pSchema; } else { @@ -599,7 +599,7 @@ void tsdbUpdateTableSchema(STsdbRepo *pRepo, STable *pTable, STSchema *pSchema, if (schemaNCols(pSchema) > pMeta->maxCols) pMeta->maxCols = schemaNCols(pSchema); if (schemaTLen(pSchema) > pMeta->maxRowBytes) pMeta->maxRowBytes = schemaTLen(pSchema); - taosWUnLockLatch(&(pCTable->latch)); + TSDB_WUNLOCK_TABLE(pCTable); if (insertAct) { int tlen = tsdbGetTableEncodeSize(TSDB_UPDATE_META, pCTable); @@ -663,7 +663,7 @@ static STable *tsdbNewTable() { return NULL; } - pTable->lastKey = TSKEY_INITIAL_VAL; + // pTable->lastKey = TSKEY_INITIAL_VAL; return pTable; } @@ -782,6 +782,13 @@ static void tsdbFreeTable(STable *pTable) { static int tsdbAddTableToMeta(STsdbRepo *pRepo, STable *pTable, bool addIdx, bool lock) { STsdbMeta *pMeta = pRepo->tsdbMeta; + STsdbCfg * pCfg = &(pRepo->config); + + if (pCfg->cacheLastRow) { + pTable->lastRow = NULL; + } else { + pTable->lastKey = TSKEY_INITIAL_VAL; + } if (lock && tsdbWLockRepoMeta(pRepo) < 0) { tsdbError("vgId:%d failed to add table %s to meta since %s", REPO_ID(pRepo), TABLE_CHAR_NAME(pTable), -- GitLab