diff --git a/src/vnode/tsdb/inc/tsdb.h b/src/vnode/tsdb/inc/tsdb.h index e12f51fd44cb768ba530441705acce1f3e1a0cb6..4964ac673f1b25d351d3eb5f0e8e146d510776db 100644 --- a/src/vnode/tsdb/inc/tsdb.h +++ b/src/vnode/tsdb/inc/tsdb.h @@ -58,6 +58,7 @@ int32_t tsdbDropRepo(tsdb_repo_t *repo); tsdb_repo_t * tsdbOpenRepo(char *tsdbDir); int32_t tsdbCloseRepo(tsdb_repo_t *repo); int32_t tsdbConfigRepo(tsdb_repo_t *repo, STsdbCfg *pCfg); +int32_t tsdbTriggerCommit(tsdb_repo_t *repo); // --------- TSDB TABLE DEFINITION typedef struct { diff --git a/src/vnode/tsdb/inc/tsdbMeta.h b/src/vnode/tsdb/inc/tsdbMeta.h index b18d16d0d94a6e242cb2f198ddfc960b366ea629..38f0818dfba826ee2300b6c0f2eee3fe2232423f 100644 --- a/src/vnode/tsdb/inc/tsdbMeta.h +++ b/src/vnode/tsdb/inc/tsdbMeta.h @@ -35,20 +35,21 @@ extern "C" { // ---------- TSDB TABLE DEFINITION typedef struct STable { - int8_t type; - STableId tableId; - int32_t superUid; // Super table UID - int32_t sversion; - STSchema * schema; - STSchema * tagSchema; - SDataRow tagVal; + int8_t type; + STableId tableId; + int32_t superUid; // Super table UID + int32_t sversion; + STSchema *schema; + STSchema *tagSchema; + SDataRow tagVal; union { void *pData; // For TSDB_NORMAL_TABLE and TSDB_CHILD_TABLE, it is the skiplist for cache data void *pIndex; // For TSDB_SUPER_TABLE, it is the skiplist index } content; + void * iData; // Skiplist to commit void * eventHandler; // TODO void * streamHandler; // TODO - struct STable *next; // TODO: remove the next + struct STable *next; // TODO: remove the next } STable; void * tsdbEncodeTable(STable *pTable, int *contLen); diff --git a/src/vnode/tsdb/src/tsdbMain.c b/src/vnode/tsdb/src/tsdbMain.c index 87cd23eb92e502fb959d7338b1f3adcc974e0067..34fca7e428e051be33d855af21949f0618b5aac1 100644 --- a/src/vnode/tsdb/src/tsdbMain.c +++ b/src/vnode/tsdb/src/tsdbMain.c @@ -64,7 +64,10 @@ typedef struct _tsdb_repo { // Disk tier handle for multi-tier storage void *diskTier; - pthread_mutex_t tsdbMutex; + pthread_mutex_t mutex; + + int commit; + pthread_t commitThread; // A limiter to monitor the resources used by tsdb void *limiter; @@ -80,6 +83,7 @@ static int tsdbOpenMetaFile(char *tsdbDir); static int32_t tsdbInsertDataToTable(tsdb_repo_t *repo, SSubmitBlk *pBlock); static int32_t tsdbRestoreCfg(STsdbRepo *pRepo, STsdbCfg *pCfg); static int32_t tsdbGetDataDirName(STsdbRepo *pRepo, char *fname); +static void * tsdbCommitToFile(void *arg); #define TSDB_GET_TABLE_BY_ID(pRepo, sid) (((STSDBRepo *)pRepo)->pTableList)[sid] #define TSDB_GET_TABLE_BY_NAME(pRepo, name) @@ -298,6 +302,18 @@ int32_t tsdbConfigRepo(tsdb_repo_t *repo, STsdbCfg *pCfg) { return 0; } +int32_t tsdbTriggerCommit(tsdb_repo_t *repo) { + STsdbRepo *pRepo = (STsdbRepo *)repo; + + if (pthread_mutex_lock(&(pRepo->mutex)) < 0) return -1; + if (pRepo->commit) return 0; + pRepo->commit = 1; + pthread_create(&(pRepo->commitThread), NULL, tsdbCommitToFile, (void *)repo); + pthread_mutex_unlock(&(pRepo->mutex)); + + return 0; +} + /** * Get the TSDB repository information, including some statistics * @param pRepo the TSDB repository handle @@ -673,4 +689,10 @@ static int32_t tsdbInsertDataToTable(tsdb_repo_t *repo, SSubmitBlk *pBlock) { } return 0; +} + +static void *tsdbCommitToFile(void *arg) { + STsdbRepo *pRepo = (STsdbRepo *)arg; + // TODO + return NULL; } \ No newline at end of file