From fe8b9e03c317ca9ba82345c17ec1b6b2f60df539 Mon Sep 17 00:00:00 2001 From: Tao Liu Date: Tue, 9 Jun 2020 03:13:58 +0000 Subject: [PATCH] [TD-553] add usage statistics function --- src/common/inc/tdataformat.h | 7 +++++-- src/common/src/tdataformat.c | 10 +++++++++- src/inc/tsdb.h | 16 ++++++++++++++++ src/tsdb/inc/tsdbMain.h | 2 ++ src/tsdb/src/tsdbMain.c | 16 +++++++++++++++- src/vnode/inc/vnodeInt.h | 3 --- src/vnode/src/vnodeMain.c | 10 ++++++---- 7 files changed, 53 insertions(+), 11 deletions(-) diff --git a/src/common/inc/tdataformat.h b/src/common/inc/tdataformat.h index ec4e544e18..441af9a215 100644 --- a/src/common/inc/tdataformat.h +++ b/src/common/inc/tdataformat.h @@ -69,7 +69,8 @@ typedef struct { int version; // version int numOfCols; // Number of columns appended int tlen; // maximum length of a SDataRow without the header part - int flen; // First part length in a SDataRow after the header part + int16_t flen; // First part length in a SDataRow after the header part + int16_t vlen; // pure value part length, excluded the overhead STColumn columns[]; } STSchema; @@ -77,6 +78,7 @@ typedef struct { #define schemaVersion(s) ((s)->version) #define schemaTLen(s) ((s)->tlen) #define schemaFLen(s) ((s)->flen) +#define schemaVLen(s) ((s)->vlen) #define schemaColAt(s, i) ((s)->columns + i) #define tdFreeSchema(s) tfree((s)) @@ -105,7 +107,8 @@ typedef struct { int tCols; int nCols; int tlen; - int flen; + int16_t flen; + int16_t vlen; int version; STColumn *columns; } STSchemaBuilder; diff --git a/src/common/src/tdataformat.c b/src/common/src/tdataformat.c index 7880a4b302..cb84c376eb 100644 --- a/src/common/src/tdataformat.c +++ b/src/common/src/tdataformat.c @@ -37,6 +37,7 @@ void *tdEncodeSchema(void *dst, STSchema *pSchema) { T_APPEND_MEMBER(dst, pSchema, STSchema, version); T_APPEND_MEMBER(dst, pSchema, STSchema, numOfCols); + T_APPEND_MEMBER(dst, pSchema, STSchema, vlen); for (int i = 0; i < schemaNCols(pSchema); i++) { STColumn *pCol = schemaColAt(pSchema, i); T_APPEND_MEMBER(dst, pCol, STColumn, type); @@ -53,10 +54,12 @@ void *tdEncodeSchema(void *dst, STSchema *pSchema) { STSchema *tdDecodeSchema(void **psrc) { int totalCols = 0; int version = 0; + int16_t vlen = 0; STSchemaBuilder schemaBuilder = {0}; T_READ_MEMBER(*psrc, int, version); T_READ_MEMBER(*psrc, int, totalCols); + T_READ_MEMBER(*psrc, int16_t, vlen); if (tdInitTSchemaBuilder(&schemaBuilder, version) < 0) return NULL; @@ -75,6 +78,7 @@ STSchema *tdDecodeSchema(void **psrc) { } STSchema *pSchema = tdGetSchemaFromBuilder(&schemaBuilder); + pSchema->vlen = vlen; tdDestroyTSchemaBuilder(&schemaBuilder); return pSchema; } @@ -100,6 +104,7 @@ void tdResetTSchemaBuilder(STSchemaBuilder *pBuilder, int32_t version) { pBuilder->nCols = 0; pBuilder->tlen = 0; pBuilder->flen = 0; + pBuilder->vlen = 0; pBuilder->version = version; } @@ -124,10 +129,12 @@ int tdAddColToSchema(STSchemaBuilder *pBuilder, int8_t type, int16_t colId, int3 if (IS_VAR_DATA_TYPE(type)) { colSetBytes(pCol, bytes); - pBuilder->tlen += (TYPE_BYTES[type] + sizeof(VarDataLenT) + bytes); + pBuilder->tlen += (TYPE_BYTES[type] + bytes); + pBuilder->vlen += bytes - sizeof(VarDataLenT); } else { colSetBytes(pCol, TYPE_BYTES[type]); pBuilder->tlen += TYPE_BYTES[type]; + pBuilder->vlen += TYPE_BYTES[type]; } pBuilder->nCols++; @@ -150,6 +157,7 @@ STSchema *tdGetSchemaFromBuilder(STSchemaBuilder *pBuilder) { schemaNCols(pSchema) = pBuilder->nCols; schemaTLen(pSchema) = pBuilder->tlen; schemaFLen(pSchema) = pBuilder->flen; + schemaVLen(pSchema) = pBuilder->vlen; memcpy(schemaColAt(pSchema, 0), pBuilder->columns, sizeof(STColumn) * pBuilder->nCols); diff --git a/src/inc/tsdb.h b/src/inc/tsdb.h index a678f213bb..bdb61e6ee1 100644 --- a/src/inc/tsdb.h +++ b/src/inc/tsdb.h @@ -65,6 +65,13 @@ typedef struct { int8_t compression; } STsdbCfg; +// --------- TSDB REPOSITORY USAGE STATISTICS +typedef struct { + int64_t totalStorage; //total bytes occupie + int64_t compStorage; + int64_t pointsWritten; //total data points written +}STsdbStat; + typedef void TsdbRepoT; // use void to hide implementation details from outside void tsdbSetDefaultCfg(STsdbCfg *pCfg); @@ -306,6 +313,15 @@ int32_t tsdbGetOneTableGroup(TsdbRepoT *tsdb, uint64_t uid, STableGroupInfo *pGr */ void tsdbCleanupQueryHandle(TsdbQueryHandleT queryHandle); +/** + * get the statistics of repo usage + * @param repo. point to the tsdbrepo + * @param totalPoints. total data point written + * @param totalStorage. total bytes took by the tsdb + * @param compStorage. total bytes took by the tsdb after compressed + */ +void tsdbReportStat(void *repo, int64_t *totalPoints, int64_t *totalStorage, int64_t *compStorage); + #ifdef __cplusplus } #endif diff --git a/src/tsdb/inc/tsdbMain.h b/src/tsdb/inc/tsdbMain.h index e06778a872..5926f2f243 100644 --- a/src/tsdb/inc/tsdbMain.h +++ b/src/tsdb/inc/tsdbMain.h @@ -357,6 +357,8 @@ typedef struct STsdbRepo { STsdbAppH appH; + STsdbStat stat; + // The meter meta handle of this TSDB repository STsdbMeta *tsdbMeta; diff --git a/src/tsdb/src/tsdbMain.c b/src/tsdb/src/tsdbMain.c index 5e32a9e7d7..5526ad0d6e 100644 --- a/src/tsdb/src/tsdbMain.c +++ b/src/tsdb/src/tsdbMain.c @@ -953,6 +953,7 @@ static int32_t tdInsertRowToTable(STsdbRepo *pRepo, SDataRow row, STable *pTable static int32_t tsdbInsertDataToTable(TsdbRepoT *repo, SSubmitBlk *pBlock, TSKEY now, int32_t *affectedrows) { STsdbRepo *pRepo = (STsdbRepo *)repo; STsdbMeta *pMeta = pRepo->tsdbMeta; + int64_t points = 0; STableId tableId = {.uid = pBlock->uid, .tid = pBlock->tid}; STable *pTable = tsdbIsValidTableToInsert(pRepo->tsdbMeta, tableId); @@ -964,7 +965,9 @@ static int32_t tsdbInsertDataToTable(TsdbRepoT *repo, SSubmitBlk *pBlock, TSKEY // Check schema version int32_t tversion = pBlock->sversion; - int16_t nversion = schemaVersion(tsdbGetTableSchema(pMeta, pTable)); + STSchema * pSchema = tsdbGetTableSchema(pMeta, pTable); + ASSERT(pSchema != NULL); + int16_t nversion = schemaVersion(pSchema); if (tversion > nversion) { tsdbTrace("vgId:%d table:%s tid:%d server schema version %d is older than clien version %d, try to config.", pRepo->config.tsdbId, varDataVal(pTable->name), pTable->tableId.tid, nversion, tversion); @@ -1014,7 +1017,10 @@ static int32_t tsdbInsertDataToTable(TsdbRepoT *repo, SSubmitBlk *pBlock, TSKEY return -1; } (*affectedrows)++; + points++; } + atomic_fetch_add_64(&(pRepo->stat.pointsWritten), points * (pSchema->numOfCols)); + atomic_fetch_add_64(&(pRepo->stat.totalStorage), points * pSchema->vlen); return TSDB_CODE_SUCCESS; } @@ -1381,3 +1387,11 @@ uint32_t tsdbGetFileInfo(TsdbRepoT *repo, char *name, uint32_t *index, uint32_t return magic; } + +void tsdbReportStat(void *repo, int64_t *totalPoints, int64_t *totalStorage, int64_t *compStorage){ + ASSERT(repo != NULL); + STsdbRepo * pRepo = repo; + *totalPoints = pRepo->stat.pointsWritten; + *totalStorage = pRepo->stat.totalStorage; + *compStorage = pRepo->stat.compStorage; +} \ No newline at end of file diff --git a/src/vnode/inc/vnodeInt.h b/src/vnode/inc/vnodeInt.h index af7f764717..7c95e81cf5 100644 --- a/src/vnode/inc/vnodeInt.h +++ b/src/vnode/inc/vnodeInt.h @@ -39,9 +39,6 @@ typedef struct { int8_t role; int64_t version; // current version int64_t fversion; // version on saved data file - int64_t totalStorage; //total bytes occupie - int64_t compStorage; - int64_t pointsWritten; //total data points written void *wqueue; void *rqueue; void *wal; diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c index 256ef3c72b..59f2a21b37 100644 --- a/src/vnode/src/vnodeMain.c +++ b/src/vnode/src/vnodeMain.c @@ -371,16 +371,18 @@ void *vnodeGetWal(void *pVnode) { static void vnodeBuildVloadMsg(SVnodeObj *pVnode, SDMStatusMsg *pStatus) { if (pVnode->status == TAOS_VN_STATUS_DELETING) return; if (pStatus->openVnodes >= TSDB_MAX_VNODES) return; + int64_t totalStorage, compStorage, pointsWritten = 0; + tsdbReportStat(pVnode->tsdb, &pointsWritten, &totalStorage, &compStorage); SVnodeLoad *pLoad = &pStatus->load[pStatus->openVnodes++]; pLoad->vgId = htonl(pVnode->vgId); pLoad->cfgVersion = htonl(pVnode->cfgVersion); - pLoad->totalStorage = htobe64(pLoad->totalStorage); - pLoad->compStorage = htobe64(pLoad->compStorage); - pLoad->pointsWritten = htobe64(pLoad->pointsWritten); + pLoad->totalStorage = htobe64(totalStorage); + pLoad->compStorage = htobe64(compStorage); + pLoad->pointsWritten = htobe64(pointsWritten); pLoad->status = pVnode->status; pLoad->role = pVnode->role; - pLoad->replica = pVnode->syncCfg.replica; + pLoad->replica = pVnode->syncCfg.replica; } void vnodeBuildStatusMsg(void *param) { -- GitLab