diff --git a/src/common/src/ttypes.c b/src/common/src/ttypes.c index 6587a27760bd009d6d41e4b26a6218b22502ff51..6fa27a029bfd5356cca3e34dffe8d3018ade9fd8 100644 --- a/src/common/src/ttypes.c +++ b/src/common/src/ttypes.c @@ -417,18 +417,6 @@ void setVardataNull(char* val, int32_t type) { } } -bool isVardataNull(char* val, int32_t type) { - if (type == TSDB_DATA_TYPE_BINARY) { - return *(uint8_t*) varDataVal(val) == TSDB_DATA_BINARY_NULL; - } else if (type == TSDB_DATA_TYPE_NCHAR) { - return *(uint32_t*) varDataVal(val) == TSDB_DATA_NCHAR_NULL; - } else { - assert(0); - } - - return false; -} - void setNull(char *val, int32_t type, int32_t bytes) { setNullN(val, type, bytes, 1); } void setNullN(char *val, int32_t type, int32_t bytes, int32_t numOfElems) { @@ -504,55 +492,6 @@ void setNullN(char *val, int32_t type, int32_t bytes, int32_t numOfElems) { } } -bool isNullN(char *val, int32_t type) { - switch (type) { - case TSDB_DATA_TYPE_BOOL: - return *(uint8_t *)(val) == TSDB_DATA_BOOL_NULL; - break; - case TSDB_DATA_TYPE_TINYINT: - return *(uint8_t *)(val) == TSDB_DATA_TINYINT_NULL; - break; - case TSDB_DATA_TYPE_SMALLINT: - return *(uint16_t *)(val) == TSDB_DATA_SMALLINT_NULL; - break; - case TSDB_DATA_TYPE_INT: - return *(uint32_t *)(val) == TSDB_DATA_INT_NULL; - break; - case TSDB_DATA_TYPE_BIGINT: - case TSDB_DATA_TYPE_TIMESTAMP: - return *(uint64_t *)(val) == TSDB_DATA_BIGINT_NULL; - break; - case TSDB_DATA_TYPE_UTINYINT: - return *(uint8_t *)(val) == TSDB_DATA_UTINYINT_NULL; - break; - case TSDB_DATA_TYPE_USMALLINT: - return *(uint16_t *)(val) == TSDB_DATA_USMALLINT_NULL; - break; - case TSDB_DATA_TYPE_UINT: - return *(uint32_t *)(val) == TSDB_DATA_UINT_NULL; - break; - case TSDB_DATA_TYPE_UBIGINT: - return *(uint64_t *)(val) == TSDB_DATA_UBIGINT_NULL; - break; - case TSDB_DATA_TYPE_FLOAT: - return *(uint32_t *)(val) == TSDB_DATA_FLOAT_NULL; - break; - case TSDB_DATA_TYPE_DOUBLE: - return *(uint64_t *)(val) == TSDB_DATA_DOUBLE_NULL; - break; - case TSDB_DATA_TYPE_NCHAR: - case TSDB_DATA_TYPE_BINARY: - return isVardataNull(val, type); - break; - default: { - return *(uint32_t *)(val) == TSDB_DATA_INT_NULL; - break; - } - } - - return false; -} - static uint8_t nullBool = TSDB_DATA_BOOL_NULL; static uint8_t nullTinyInt = TSDB_DATA_TINYINT_NULL; static uint16_t nullSmallInt = TSDB_DATA_SMALLINT_NULL; diff --git a/src/inc/tsdb.h b/src/inc/tsdb.h index 7c28d3e485b70051ecb1a3bba315b7fc882c8bb1..5f2d866fdba1bfb2947aabe626612fba6366cd07 100644 --- a/src/inc/tsdb.h +++ b/src/inc/tsdb.h @@ -69,7 +69,7 @@ typedef struct { int8_t precision; int8_t compression; int8_t update; - int8_t cacheLastRow; // 0:no cache, 1: cache last row, 2: cache last NULL column + int8_t cacheLastRow; // 0:no cache, 1: cache last row, 2: cache last NULL column 3: 1&2 } STsdbCfg; #define CACHE_NO_LAST(c) ((c)->cacheLastRow == 0) diff --git a/src/inc/ttype.h b/src/inc/ttype.h index 43dbeb76403777927ab70970ab5f1bbd898450eb..9949f31c59f5805e381fbfbd8d5a65ba24eb40eb 100644 --- a/src/inc/ttype.h +++ b/src/inc/ttype.h @@ -178,9 +178,6 @@ void setNull(char *val, int32_t type, int32_t bytes); void setNullN(char *val, int32_t type, int32_t bytes, int32_t numOfElems); void *getNullValue(int32_t type); -bool isVardataNull(char* val, int32_t type); -bool isNullN(char *val, int32_t type); - void assignVal(char *val, const char *src, int32_t len, int32_t type); void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size, void* buf); diff --git a/src/tsdb/inc/tsdbMeta.h b/src/tsdb/inc/tsdbMeta.h index 14d5a417684e40ff9cca18eaf672033075ced656..45bbd5a7c6911fed4ea7309a77d3ac144109a34b 100644 --- a/src/tsdb/inc/tsdbMeta.h +++ b/src/tsdb/inc/tsdbMeta.h @@ -45,9 +45,6 @@ typedef struct STable { T_REF_DECLARE() } STable; -#define TSDB_LATEST_COLUMN_ARRAY_SIZE 20 -#define TSDB_LATEST_COLUMN_ARRAY_ADD_SIZE 5 - typedef struct { pthread_rwlock_t rwLock; diff --git a/src/tsdb/src/tsdbMain.c b/src/tsdb/src/tsdbMain.c index cc2fca420c09520a6a5c1e3d1c04ce8d954ce644..afbedd5b2fd231606902db104916e4ff4f10ba67 100644 --- a/src/tsdb/src/tsdbMain.c +++ b/src/tsdb/src/tsdbMain.c @@ -27,6 +27,7 @@ static void tsdbFreeRepo(STsdbRepo *pRepo); static void tsdbStartStream(STsdbRepo *pRepo); static void tsdbStopStream(STsdbRepo *pRepo); static int tsdbRestoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pReadh); +static int tsdbRestoreLastRow(STsdbRepo *pRepo, STable *pTable, SReadH* pReadh, SBlockIdx *pIdx); // Function declaration int32_t tsdbCreateRepo(int repoid) { @@ -270,9 +271,7 @@ int32_t tsdbConfigRepo(STsdbRepo *repo, STsdbCfg *pCfg) { pthread_mutex_unlock(&repo->save_mutex); // schedule a commit msg then the new config will be applied immediatly - if (tsdbLockRepo(repo) < 0) return -1; - tsdbScheduleCommit(repo); - if (tsdbUnlockRepo(repo) < 0) return -1; + tsdbAsyncCommit(repo); return 0; #if 0 @@ -645,6 +644,7 @@ static int tsdbRestoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pRea numColumns = schemaNCols(pSchema); if (numColumns <= pTable->restoreColumnNum) { + pTable->hasRestoreLastColumn = true; return 0; } if (pTable->lastColSVersion != schemaVersion(pSchema)) { @@ -719,7 +719,7 @@ static int tsdbRestoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pRea tdAppendColVal(row, tdGetColDataOfRow(pDataCol, rowId), pCol->type, pCol->bytes, pCol->offset); //SDataCol *pDataCol = readh.pDCols[0]->cols + j; void* value = tdGetRowDataOfCol(row, (int8_t)pCol->type, TD_DATA_ROW_HEAD_SIZE + pCol->offset); - if (isNullN(value, pCol->type)) { + if (isNull(value, pCol->type)) { continue; } @@ -753,16 +753,51 @@ out: taosTZfree(row); tfree(pBlockStatis); + if (err == 0 && numColumns <= pTable->restoreColumnNum) { + pTable->hasRestoreLastColumn = true; + } + return err; } +static int tsdbRestoreLastRow(STsdbRepo *pRepo, STable *pTable, SReadH* pReadh, SBlockIdx *pIdx) { + ASSERT(pTable->lastRow == NULL); + if (tsdbLoadBlockInfo(pReadh, NULL) < 0) { + return -1; + } + + SBlock* pBlock = pReadh->pBlkInfo->blocks + pIdx->numOfBlocks - 1; + + if (tsdbLoadBlockData(pReadh, pBlock, NULL) < 0) { + return -1; + } + + // Get the data in row + + STSchema *pSchema = tsdbGetTableSchema(pTable); + pTable->lastRow = taosTMalloc(dataRowMaxBytesFromSchema(pSchema)); + if (pTable->lastRow == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + return -1; + } + + tdInitDataRow(pTable->lastRow, pSchema); + for (int icol = 0; icol < schemaNCols(pSchema); icol++) { + STColumn *pCol = schemaColAt(pSchema, icol); + SDataCol *pDataCol = pReadh->pDCols[0]->cols + icol; + tdAppendColVal(pTable->lastRow, tdGetColDataOfRow(pDataCol, pBlock->numOfRows - 1), pCol->type, pCol->bytes, + pCol->offset); + } + + return 0; +} + int tsdbRestoreInfo(STsdbRepo *pRepo) { SFSIter fsiter; SReadH readh; SDFileSet *pSet; STsdbMeta *pMeta = pRepo->tsdbMeta; STsdbCfg * pCfg = REPO_CFG(pRepo); - SBlock * pBlock; if (tsdbInitReadH(&readh, pRepo) < 0) { return -1; @@ -805,41 +840,14 @@ int tsdbRestoreInfo(STsdbRepo *pRepo) { if (pIdx && lastKey < pIdx->maxKey) { pTable->lastKey = pIdx->maxKey; - if (CACHE_LAST_ROW(pCfg)) { - if (tsdbLoadBlockInfo(&readh, NULL) < 0) { - tsdbDestroyReadH(&readh); - return -1; - } - - pBlock = readh.pBlkInfo->blocks + pIdx->numOfBlocks - 1; - - if (tsdbLoadBlockData(&readh, pBlock, NULL) < 0) { - tsdbDestroyReadH(&readh); - return -1; - } - - // Get the data in row - ASSERT(pTable->lastRow == NULL); - STSchema *pSchema = tsdbGetTableSchema(pTable); - pTable->lastRow = taosTMalloc(dataRowMaxBytesFromSchema(pSchema)); - if (pTable->lastRow == NULL) { - terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - tsdbDestroyReadH(&readh); - return -1; - } - - tdInitDataRow(pTable->lastRow, pSchema); - for (int icol = 0; icol < schemaNCols(pSchema); icol++) { - STColumn *pCol = schemaColAt(pSchema, icol); - SDataCol *pDataCol = readh.pDCols[0]->cols + icol; - tdAppendColVal(pTable->lastRow, tdGetColDataOfRow(pDataCol, pBlock->numOfRows - 1), pCol->type, pCol->bytes, - pCol->offset); - } + if (CACHE_LAST_ROW(pCfg) && tsdbRestoreLastRow(pRepo, pTable, &readh, pIdx) != 0) { + tsdbDestroyReadH(&readh); + return -1; } } // restore NULL columns - if (pIdx && CACHE_LAST_NULL_COLUMN(pCfg)) { + if (pIdx && CACHE_LAST_NULL_COLUMN(pCfg) && !pTable->hasRestoreLastColumn) { if (tsdbRestoreLastColumns(pRepo, pTable, &readh) != 0) { tsdbDestroyReadH(&readh); return -1; @@ -865,8 +873,6 @@ int tsdbCacheLastData(STsdbRepo *pRepo, STsdbCfg* oldCfg) { SReadH readh; SDFileSet *pSet; STsdbMeta *pMeta = pRepo->tsdbMeta; - //STsdbCfg * pCfg = REPO_CFG(pRepo); - SBlock * pBlock; int tableNum = 0; int maxTableIdx = 0; int cacheLastRowTableNum = 0; @@ -955,35 +961,10 @@ int tsdbCacheLastData(STsdbRepo *pRepo, STsdbCfg* oldCfg) { if (pIdx && cacheLastRowTableNum > 0 && pTable->lastRow == NULL) { pTable->lastKey = pIdx->maxKey; - if (tsdbLoadBlockInfo(&readh, NULL) < 0) { - tsdbDestroyReadH(&readh); - return -1; - } - - pBlock = readh.pBlkInfo->blocks + pIdx->numOfBlocks - 1; - - if (tsdbLoadBlockData(&readh, pBlock, NULL) < 0) { - tsdbDestroyReadH(&readh); - return -1; - } - - // Get the data in row - ASSERT(pTable->lastRow == NULL); - STSchema *pSchema = tsdbGetTableSchema(pTable); - pTable->lastRow = taosTMalloc(dataRowMaxBytesFromSchema(pSchema)); - if (pTable->lastRow == NULL) { - terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + if (tsdbRestoreLastRow(pRepo, pTable, &readh, pIdx) != 0) { tsdbDestroyReadH(&readh); return -1; } - - tdInitDataRow(pTable->lastRow, pSchema); - for (int icol = 0; icol < schemaNCols(pSchema); icol++) { - STColumn *pCol = schemaColAt(pSchema, icol); - SDataCol *pDataCol = readh.pDCols[0]->cols + icol; - tdAppendColVal(pTable->lastRow, tdGetColDataOfRow(pDataCol, pBlock->numOfRows - 1), pCol->type, pCol->bytes, - pCol->offset); - } cacheLastRowTableNum -= 1; } diff --git a/src/tsdb/src/tsdbMemTable.c b/src/tsdb/src/tsdbMemTable.c index ff2a870f3f4272ce371659c0a767eabb1a48e468..79dbb8be5d8719d7a9df7fe71695d013dffdefc1 100644 --- a/src/tsdb/src/tsdbMemTable.c +++ b/src/tsdb/src/tsdbMemTable.c @@ -274,7 +274,7 @@ void *tsdbAllocBytes(STsdbRepo *pRepo, int bytes) { int tsdbAsyncCommit(STsdbRepo *pRepo) { tsem_wait(&(pRepo->readyToCommit)); - ASSERT(pRepo->imem == NULL); + //ASSERT(pRepo->imem == NULL); if (pRepo->mem == NULL) { tsem_post(&(pRepo->readyToCommit)); return 0; @@ -965,7 +965,7 @@ static void tsdbFreeRows(STsdbRepo *pRepo, void **rows, int rowCounter) { } static void updateTableLatestColumn(STsdbRepo *pRepo, STable *pTable, SDataRow row) { - tsdbInfo("vgId:%d updateTableLatestColumn, %s row version:%d", REPO_ID(pRepo), pTable->name->data, dataRowVersion(row)); + tsdbDebug("vgId:%d updateTableLatestColumn, %s row version:%d", REPO_ID(pRepo), pTable->name->data, dataRowVersion(row)); STSchema* pSchema = tsdbGetTableLatestSchema(pTable); if (tsdbUpdateLastColSchema(pTable, pSchema) < 0) { @@ -988,7 +988,7 @@ static void updateTableLatestColumn(STsdbRepo *pRepo, STable *pTable, SDataRow r } void* value = tdGetRowDataOfCol(row, (int8_t)pTCol->type, TD_DATA_ROW_HEAD_SIZE + pSchema->columns[j].offset); - if (isNullN(value, pTCol->type)) { + if (isNull(value, pTCol->type)) { continue; }