diff --git a/src/common/inc/dataformat.h b/src/common/inc/dataformat.h index 4e8afd4f0eec26b42842fd4b4ddfcff05d876a13..04fa7dcc7dd5e1cb8498b4fa4eb033dfcc92e7c3 100644 --- a/src/common/inc/dataformat.h +++ b/src/common/inc/dataformat.h @@ -110,7 +110,8 @@ typedef struct SDataCol { int bytes; int len; int offset; - void * pData; + void * pData; // Original data + void * pCData; // Compressed data } SDataCol; typedef struct { diff --git a/src/common/src/dataformat.c b/src/common/src/dataformat.c index 436426c86b5a3279587cb71e3f7b60bdbb2bc271..bff041df1b98c955b67bf9e8a44fa360362908f4 100644 --- a/src/common/src/dataformat.c +++ b/src/common/src/dataformat.c @@ -317,14 +317,17 @@ void tdInitDataCols(SDataCols *pCols, STSchema *pSchema) { pCols->numOfCols = schemaNCols(pSchema); pCols->cols[0].pData = pCols->buf; + int offset = TD_DATA_ROW_HEAD_SIZE; for (int i = 0; i < schemaNCols(pSchema); i++) { if (i > 0) { pCols->cols[i].pData = (char *)(pCols->cols[i - 1].pData) + schemaColAt(pSchema, i - 1)->bytes * pCols->maxPoints; } pCols->cols[i].type = colType(schemaColAt(pSchema, i)); pCols->cols[i].bytes = colBytes(schemaColAt(pSchema, i)); - pCols->cols[i].offset = colOffset(schemaColAt(pSchema, i)); + pCols->cols[i].offset = offset; pCols->cols[i].colId = colColId(schemaColAt(pSchema, i)); + + offset += TYPE_BYTES[pCols->cols[i].type]; } } @@ -343,7 +346,6 @@ void tdResetDataCols(SDataCols *pCols) { } void tdAppendDataRowToDataCol(SDataRow row, SDataCols *pCols) { - TSKEY key = dataRowKey(row); for (int i = 0; i < pCols->numOfCols; i++) { SDataCol *pCol = pCols->cols + i; memcpy((void *)((char *)(pCol->pData) + pCol->len), dataRowAt(row, pCol->offset), pCol->bytes); @@ -379,3 +381,7 @@ static int tdFLenFromSchema(STSchema *pSchema) { return ret; } + +int tdMergeDataCols(SDataCols *target, SDataCols *source) { + return 0; +} \ No newline at end of file diff --git a/src/os/linux/inc/os.h b/src/os/linux/inc/os.h index d3c401e3cfc3d4e6ee1d1ebca4baa6b7752355ba..ee75e1a0795323e83ed90243beb9f8ff11738485 100644 --- a/src/os/linux/inc/os.h +++ b/src/os/linux/inc/os.h @@ -205,6 +205,8 @@ ssize_t tsendfile(int dfd, int sfd, off_t *offset, size_t size); ssize_t twrite(int fd, void *buf, size_t n); +ssize_t tread(int fd, void *buf, size_t count); + bool taosCheckPthreadValid(pthread_t thread); void taosResetPthread(pthread_t *thread); diff --git a/src/os/linux/src/tlinux.c b/src/os/linux/src/tlinux.c index 780e2903a064c45211ea3877c3fb2fef3a6c8bef..b819514be7299b73158b3ff7e9dbc07f4109ea76 100644 --- a/src/os/linux/src/tlinux.c +++ b/src/os/linux/src/tlinux.c @@ -291,6 +291,30 @@ int taosInitTimer(void (*callback)(int), int ms) { return 0; } +ssize_t tread(int fd, void *buf, size_t count) { + size_t leftbytes = count; + ssize_t readbytes; + char * tbuf = (char *)buf; + + while (leftbytes > 0) { + readbytes = read(fd, (void *)tbuf, leftbytes); + if (readbytes < 0) { + if (errno == EINTR) { + continue; + } else { + return -1; + } + } else if (readbytes == 0) { + return (ssize_t)(count - leftbytes); + } + + leftbytes -= readbytes; + tbuf += readbytes; + } + + return (ssize_t)count; +} + ssize_t tsendfile(int dfd, int sfd, off_t *offset, size_t size) { size_t leftbytes = size; ssize_t sentbytes; @@ -308,6 +332,8 @@ ssize_t tsendfile(int dfd, int sfd, off_t *offset, size_t size) { else { return -1; } + } else if (sentbytes == 0) { + return (ssize_t)(size - leftbytes); } leftbytes -= sentbytes; diff --git a/src/vnode/main/src/vnodeMain.c b/src/vnode/main/src/vnodeMain.c index 1be9bbb64be2807c3c44d3ca83d789b82f563bb7..ea1859729e55ab4ca35e0aa86461164afa7feb51 100644 --- a/src/vnode/main/src/vnodeMain.c +++ b/src/vnode/main/src/vnodeMain.c @@ -32,6 +32,7 @@ static void *tsDnodeVnodesHash; static void vnodeCleanUp(SVnodeObj *pVnode); static void vnodeBuildVloadMsg(char *pNode, void * param); +static int vnodeWALCallback(void *arg); static int tsOpennedVnodes; static pthread_once_t vnodeModuleInit = PTHREAD_ONCE_INIT; @@ -120,24 +121,29 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { vnodeObj.version = 0; SVnodeObj *pVnode = (SVnodeObj *)taosAddIntHash(tsDnodeVnodesHash, vnodeObj.vgId, (char *)(&vnodeObj)); - sprintf(temp, "%s/tsdb", rootDir); - void *pTsdb = tsdbOpenRepo(temp); - if (pTsdb == NULL) { - dError("pVnode:%p vgId:%d, failed to open tsdb at %s(%s)", pVnode, pVnode->vgId, temp, tstrerror(terrno)); - taosDeleteIntHash(tsDnodeVnodesHash, pVnode->vgId); - return terrno; - } - pVnode->wqueue = dnodeAllocateWqueue(pVnode); pVnode->rqueue = dnodeAllocateRqueue(pVnode); sprintf(temp, "%s/wal", rootDir); pVnode->wal = walOpen(temp, 3, tsCommitLog); - pVnode->tsdb = pTsdb; pVnode->sync = NULL; pVnode->events = NULL; pVnode->cq = NULL; + STsdbAppH appH = {0}; + appH.appH = (void *)pVnode; + appH.walCallBack = vnodeWALCallback; + + sprintf(temp, "%s/tsdb", rootDir); + void *pTsdb = tsdbOpenRepo(temp, &appH); + if (pTsdb == NULL) { + dError("pVnode:%p vgId:%d, failed to open tsdb at %s(%s)", pVnode, pVnode->vgId, temp, tstrerror(terrno)); + taosDeleteIntHash(tsDnodeVnodesHash, pVnode->vgId); + return terrno; + } + + pVnode->tsdb = pTsdb; + walRestore(pVnode->wal, pVnode, vnodeWriteToQueue); pVnode->status = VN_STATUS_READY; @@ -249,3 +255,8 @@ static void vnodeCleanUp(SVnodeObj *pVnode) { vnodeRelease(pVnode); } + +static int vnodeWALCallback(void *arg) { + SVnodeObj *pVnode = arg; + return walRenew(pVnode->wal); +} \ No newline at end of file diff --git a/src/vnode/tsdb/inc/tsdb.h b/src/vnode/tsdb/inc/tsdb.h index 85575e1a8b0f77f2456e29acb207ba70d7afd1f5..b20711df1dc4b4a515dad83105bc9977588f5483 100644 --- a/src/vnode/tsdb/inc/tsdb.h +++ b/src/vnode/tsdb/inc/tsdb.h @@ -34,6 +34,15 @@ extern "C" { #define TSDB_INVALID_SUPER_TABLE_ID -1 +// --------- TSDB APPLICATION HANDLE DEFINITION +typedef struct { + // WAL handle + void *appH; + int (*walCallBack)(void *); + int (*eventCallBack)(void *); + int (*cqueryCallBack)(void *); +} STsdbAppH; + // --------- TSDB REPOSITORY CONFIGURATION DEFINITION typedef struct { int8_t precision; @@ -55,7 +64,7 @@ typedef void tsdb_repo_t; // use void to hide implementation details from outsi int tsdbCreateRepo(char *rootDir, STsdbCfg *pCfg, void *limiter); int32_t tsdbDropRepo(tsdb_repo_t *repo); -tsdb_repo_t * tsdbOpenRepo(char *tsdbDir); +tsdb_repo_t * tsdbOpenRepo(char *tsdbDir, STsdbAppH *pAppH); int32_t tsdbCloseRepo(tsdb_repo_t *repo); int32_t tsdbConfigRepo(tsdb_repo_t *repo, STsdbCfg *pCfg); int32_t tsdbTriggerCommit(tsdb_repo_t *repo); diff --git a/src/vnode/tsdb/inc/tsdbMain.h b/src/vnode/tsdb/inc/tsdbMain.h index a9021550451c387aa85643b17bec81f02fa03ae3..06f62ea6f728620f01b70a16794b566d6eae273b 100644 --- a/src/vnode/tsdb/inc/tsdbMain.h +++ b/src/vnode/tsdb/inc/tsdbMain.h @@ -322,6 +322,8 @@ typedef struct _tsdb_repo { // TSDB configuration STsdbCfg config; + STsdbAppH appH; + // The meter meta handle of this TSDB repository STsdbMeta *tsdbMeta; diff --git a/src/vnode/tsdb/src/tsdbMain.c b/src/vnode/tsdb/src/tsdbMain.c index cb59c004efed1533326a8517e657f177aef16c29..1a8e50d0eeab0ded741d19ae71b796d9f19343fd 100644 --- a/src/vnode/tsdb/src/tsdbMain.c +++ b/src/vnode/tsdb/src/tsdbMain.c @@ -177,7 +177,7 @@ int32_t tsdbDropRepo(tsdb_repo_t *repo) { * * @return a TSDB repository handle on success, NULL for failure and the error number is set */ -tsdb_repo_t *tsdbOpenRepo(char *tsdbDir) { +tsdb_repo_t *tsdbOpenRepo(char *tsdbDir, STsdbAppH *pAppH) { char dataDir[128] = "\0"; if (access(tsdbDir, F_OK | W_OK | R_OK) < 0) { return NULL; @@ -191,6 +191,7 @@ tsdb_repo_t *tsdbOpenRepo(char *tsdbDir) { pRepo->rootDir = strdup(tsdbDir); tsdbRestoreCfg(pRepo, &(pRepo->config)); + if (pAppH) pRepo->appH = *pAppH; pRepo->tsdbMeta = tsdbInitMeta(tsdbDir, pRepo->config.maxTables); if (pRepo->tsdbMeta == NULL) { diff --git a/src/vnode/tsdb/tests/tsdbTests.cpp b/src/vnode/tsdb/tests/tsdbTests.cpp index d3a5b680aad5999d573442278f1a8928fad29ed9..9ee49d6a708a65b3344fc5fd56f458b0a9dd23cc 100644 --- a/src/vnode/tsdb/tests/tsdbTests.cpp +++ b/src/vnode/tsdb/tests/tsdbTests.cpp @@ -140,7 +140,7 @@ TEST(TsdbTest, createRepo) { // TEST(TsdbTest, DISABLED_openRepo) { TEST(TsdbTest, openRepo) { - tsdb_repo_t *repo = tsdbOpenRepo("/home/ubuntu/work/ttest/vnode0"); + tsdb_repo_t *repo = tsdbOpenRepo("/home/ubuntu/work/ttest/vnode0", NULL); ASSERT_NE(repo, nullptr); STsdbRepo *pRepo = (STsdbRepo *)repo;