diff --git a/src/common/src/dataformat.c b/src/common/src/dataformat.c index 8d7a7cb1d066ed6a07d49025d4b5fb4930f86ba6..bff041df1b98c955b67bf9e8a44fa360362908f4 100644 --- a/src/common/src/dataformat.c +++ b/src/common/src/dataformat.c @@ -346,7 +346,6 @@ void tdResetDataCols(SDataCols *pCols) { } void tdAppendDataRowToDataCol(SDataRow row, SDataCols *pCols) { - TSKEY key = dataRowKey(row); for (int i = 0; i < pCols->numOfCols; i++) { SDataCol *pCol = pCols->cols + i; memcpy((void *)((char *)(pCol->pData) + pCol->len), dataRowAt(row, pCol->offset), pCol->bytes); @@ -384,5 +383,5 @@ static int tdFLenFromSchema(STSchema *pSchema) { } int tdMergeDataCols(SDataCols *target, SDataCols *source) { - + return 0; } \ No newline at end of file diff --git a/src/vnode/main/src/vnodeMain.c b/src/vnode/main/src/vnodeMain.c index 1be9bbb64be2807c3c44d3ca83d789b82f563bb7..ea1859729e55ab4ca35e0aa86461164afa7feb51 100644 --- a/src/vnode/main/src/vnodeMain.c +++ b/src/vnode/main/src/vnodeMain.c @@ -32,6 +32,7 @@ static void *tsDnodeVnodesHash; static void vnodeCleanUp(SVnodeObj *pVnode); static void vnodeBuildVloadMsg(char *pNode, void * param); +static int vnodeWALCallback(void *arg); static int tsOpennedVnodes; static pthread_once_t vnodeModuleInit = PTHREAD_ONCE_INIT; @@ -120,24 +121,29 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { vnodeObj.version = 0; SVnodeObj *pVnode = (SVnodeObj *)taosAddIntHash(tsDnodeVnodesHash, vnodeObj.vgId, (char *)(&vnodeObj)); - sprintf(temp, "%s/tsdb", rootDir); - void *pTsdb = tsdbOpenRepo(temp); - if (pTsdb == NULL) { - dError("pVnode:%p vgId:%d, failed to open tsdb at %s(%s)", pVnode, pVnode->vgId, temp, tstrerror(terrno)); - taosDeleteIntHash(tsDnodeVnodesHash, pVnode->vgId); - return terrno; - } - pVnode->wqueue = dnodeAllocateWqueue(pVnode); pVnode->rqueue = dnodeAllocateRqueue(pVnode); sprintf(temp, "%s/wal", rootDir); pVnode->wal = walOpen(temp, 3, tsCommitLog); - pVnode->tsdb = pTsdb; pVnode->sync = NULL; pVnode->events = NULL; pVnode->cq = NULL; + STsdbAppH appH = {0}; + appH.appH = (void *)pVnode; + appH.walCallBack = vnodeWALCallback; + + sprintf(temp, "%s/tsdb", rootDir); + void *pTsdb = tsdbOpenRepo(temp, &appH); + if (pTsdb == NULL) { + dError("pVnode:%p vgId:%d, failed to open tsdb at %s(%s)", pVnode, pVnode->vgId, temp, tstrerror(terrno)); + taosDeleteIntHash(tsDnodeVnodesHash, pVnode->vgId); + return terrno; + } + + pVnode->tsdb = pTsdb; + walRestore(pVnode->wal, pVnode, vnodeWriteToQueue); pVnode->status = VN_STATUS_READY; @@ -249,3 +255,8 @@ static void vnodeCleanUp(SVnodeObj *pVnode) { vnodeRelease(pVnode); } + +static int vnodeWALCallback(void *arg) { + SVnodeObj *pVnode = arg; + return walRenew(pVnode->wal); +} \ No newline at end of file diff --git a/src/vnode/tsdb/inc/tsdb.h b/src/vnode/tsdb/inc/tsdb.h index 85575e1a8b0f77f2456e29acb207ba70d7afd1f5..b20711df1dc4b4a515dad83105bc9977588f5483 100644 --- a/src/vnode/tsdb/inc/tsdb.h +++ b/src/vnode/tsdb/inc/tsdb.h @@ -34,6 +34,15 @@ extern "C" { #define TSDB_INVALID_SUPER_TABLE_ID -1 +// --------- TSDB APPLICATION HANDLE DEFINITION +typedef struct { + // WAL handle + void *appH; + int (*walCallBack)(void *); + int (*eventCallBack)(void *); + int (*cqueryCallBack)(void *); +} STsdbAppH; + // --------- TSDB REPOSITORY CONFIGURATION DEFINITION typedef struct { int8_t precision; @@ -55,7 +64,7 @@ typedef void tsdb_repo_t; // use void to hide implementation details from outsi int tsdbCreateRepo(char *rootDir, STsdbCfg *pCfg, void *limiter); int32_t tsdbDropRepo(tsdb_repo_t *repo); -tsdb_repo_t * tsdbOpenRepo(char *tsdbDir); +tsdb_repo_t * tsdbOpenRepo(char *tsdbDir, STsdbAppH *pAppH); int32_t tsdbCloseRepo(tsdb_repo_t *repo); int32_t tsdbConfigRepo(tsdb_repo_t *repo, STsdbCfg *pCfg); int32_t tsdbTriggerCommit(tsdb_repo_t *repo); diff --git a/src/vnode/tsdb/inc/tsdbMain.h b/src/vnode/tsdb/inc/tsdbMain.h index a9021550451c387aa85643b17bec81f02fa03ae3..06f62ea6f728620f01b70a16794b566d6eae273b 100644 --- a/src/vnode/tsdb/inc/tsdbMain.h +++ b/src/vnode/tsdb/inc/tsdbMain.h @@ -322,6 +322,8 @@ typedef struct _tsdb_repo { // TSDB configuration STsdbCfg config; + STsdbAppH appH; + // The meter meta handle of this TSDB repository STsdbMeta *tsdbMeta; diff --git a/src/vnode/tsdb/src/tsdbMain.c b/src/vnode/tsdb/src/tsdbMain.c index cb59c004efed1533326a8517e657f177aef16c29..1a8e50d0eeab0ded741d19ae71b796d9f19343fd 100644 --- a/src/vnode/tsdb/src/tsdbMain.c +++ b/src/vnode/tsdb/src/tsdbMain.c @@ -177,7 +177,7 @@ int32_t tsdbDropRepo(tsdb_repo_t *repo) { * * @return a TSDB repository handle on success, NULL for failure and the error number is set */ -tsdb_repo_t *tsdbOpenRepo(char *tsdbDir) { +tsdb_repo_t *tsdbOpenRepo(char *tsdbDir, STsdbAppH *pAppH) { char dataDir[128] = "\0"; if (access(tsdbDir, F_OK | W_OK | R_OK) < 0) { return NULL; @@ -191,6 +191,7 @@ tsdb_repo_t *tsdbOpenRepo(char *tsdbDir) { pRepo->rootDir = strdup(tsdbDir); tsdbRestoreCfg(pRepo, &(pRepo->config)); + if (pAppH) pRepo->appH = *pAppH; pRepo->tsdbMeta = tsdbInitMeta(tsdbDir, pRepo->config.maxTables); if (pRepo->tsdbMeta == NULL) { diff --git a/src/vnode/tsdb/tests/tsdbTests.cpp b/src/vnode/tsdb/tests/tsdbTests.cpp index d3a5b680aad5999d573442278f1a8928fad29ed9..9ee49d6a708a65b3344fc5fd56f458b0a9dd23cc 100644 --- a/src/vnode/tsdb/tests/tsdbTests.cpp +++ b/src/vnode/tsdb/tests/tsdbTests.cpp @@ -140,7 +140,7 @@ TEST(TsdbTest, createRepo) { // TEST(TsdbTest, DISABLED_openRepo) { TEST(TsdbTest, openRepo) { - tsdb_repo_t *repo = tsdbOpenRepo("/home/ubuntu/work/ttest/vnode0"); + tsdb_repo_t *repo = tsdbOpenRepo("/home/ubuntu/work/ttest/vnode0", NULL); ASSERT_NE(repo, nullptr); STsdbRepo *pRepo = (STsdbRepo *)repo;