未验证 提交 aeef63f0 编写于 作者: S slguan 提交者: GitHub

Merge pull request #1537 from taosdata/feature/2.0tsdb

Feature/2.0tsdb
...@@ -110,7 +110,8 @@ typedef struct SDataCol { ...@@ -110,7 +110,8 @@ typedef struct SDataCol {
int bytes; int bytes;
int len; int len;
int offset; int offset;
void * pData; void * pData; // Original data
void * pCData; // Compressed data
} SDataCol; } SDataCol;
typedef struct { typedef struct {
......
...@@ -317,14 +317,17 @@ void tdInitDataCols(SDataCols *pCols, STSchema *pSchema) { ...@@ -317,14 +317,17 @@ void tdInitDataCols(SDataCols *pCols, STSchema *pSchema) {
pCols->numOfCols = schemaNCols(pSchema); pCols->numOfCols = schemaNCols(pSchema);
pCols->cols[0].pData = pCols->buf; pCols->cols[0].pData = pCols->buf;
int offset = TD_DATA_ROW_HEAD_SIZE;
for (int i = 0; i < schemaNCols(pSchema); i++) { for (int i = 0; i < schemaNCols(pSchema); i++) {
if (i > 0) { if (i > 0) {
pCols->cols[i].pData = (char *)(pCols->cols[i - 1].pData) + schemaColAt(pSchema, i - 1)->bytes * pCols->maxPoints; pCols->cols[i].pData = (char *)(pCols->cols[i - 1].pData) + schemaColAt(pSchema, i - 1)->bytes * pCols->maxPoints;
} }
pCols->cols[i].type = colType(schemaColAt(pSchema, i)); pCols->cols[i].type = colType(schemaColAt(pSchema, i));
pCols->cols[i].bytes = colBytes(schemaColAt(pSchema, i)); pCols->cols[i].bytes = colBytes(schemaColAt(pSchema, i));
pCols->cols[i].offset = colOffset(schemaColAt(pSchema, i)); pCols->cols[i].offset = offset;
pCols->cols[i].colId = colColId(schemaColAt(pSchema, i)); pCols->cols[i].colId = colColId(schemaColAt(pSchema, i));
offset += TYPE_BYTES[pCols->cols[i].type];
} }
} }
...@@ -343,7 +346,6 @@ void tdResetDataCols(SDataCols *pCols) { ...@@ -343,7 +346,6 @@ void tdResetDataCols(SDataCols *pCols) {
} }
void tdAppendDataRowToDataCol(SDataRow row, SDataCols *pCols) { void tdAppendDataRowToDataCol(SDataRow row, SDataCols *pCols) {
TSKEY key = dataRowKey(row);
for (int i = 0; i < pCols->numOfCols; i++) { for (int i = 0; i < pCols->numOfCols; i++) {
SDataCol *pCol = pCols->cols + i; SDataCol *pCol = pCols->cols + i;
memcpy((void *)((char *)(pCol->pData) + pCol->len), dataRowAt(row, pCol->offset), pCol->bytes); memcpy((void *)((char *)(pCol->pData) + pCol->len), dataRowAt(row, pCol->offset), pCol->bytes);
...@@ -379,3 +381,7 @@ static int tdFLenFromSchema(STSchema *pSchema) { ...@@ -379,3 +381,7 @@ static int tdFLenFromSchema(STSchema *pSchema) {
return ret; return ret;
} }
int tdMergeDataCols(SDataCols *target, SDataCols *source) {
return 0;
}
\ No newline at end of file
...@@ -205,6 +205,8 @@ ssize_t tsendfile(int dfd, int sfd, off_t *offset, size_t size); ...@@ -205,6 +205,8 @@ ssize_t tsendfile(int dfd, int sfd, off_t *offset, size_t size);
ssize_t twrite(int fd, void *buf, size_t n); ssize_t twrite(int fd, void *buf, size_t n);
ssize_t tread(int fd, void *buf, size_t count);
bool taosCheckPthreadValid(pthread_t thread); bool taosCheckPthreadValid(pthread_t thread);
void taosResetPthread(pthread_t *thread); void taosResetPthread(pthread_t *thread);
......
...@@ -291,6 +291,30 @@ int taosInitTimer(void (*callback)(int), int ms) { ...@@ -291,6 +291,30 @@ int taosInitTimer(void (*callback)(int), int ms) {
return 0; return 0;
} }
ssize_t tread(int fd, void *buf, size_t count) {
size_t leftbytes = count;
ssize_t readbytes;
char * tbuf = (char *)buf;
while (leftbytes > 0) {
readbytes = read(fd, (void *)tbuf, leftbytes);
if (readbytes < 0) {
if (errno == EINTR) {
continue;
} else {
return -1;
}
} else if (readbytes == 0) {
return (ssize_t)(count - leftbytes);
}
leftbytes -= readbytes;
tbuf += readbytes;
}
return (ssize_t)count;
}
ssize_t tsendfile(int dfd, int sfd, off_t *offset, size_t size) { ssize_t tsendfile(int dfd, int sfd, off_t *offset, size_t size) {
size_t leftbytes = size; size_t leftbytes = size;
ssize_t sentbytes; ssize_t sentbytes;
...@@ -308,6 +332,8 @@ ssize_t tsendfile(int dfd, int sfd, off_t *offset, size_t size) { ...@@ -308,6 +332,8 @@ ssize_t tsendfile(int dfd, int sfd, off_t *offset, size_t size) {
else { else {
return -1; return -1;
} }
} else if (sentbytes == 0) {
return (ssize_t)(size - leftbytes);
} }
leftbytes -= sentbytes; leftbytes -= sentbytes;
......
...@@ -32,6 +32,7 @@ ...@@ -32,6 +32,7 @@
static void *tsDnodeVnodesHash; static void *tsDnodeVnodesHash;
static void vnodeCleanUp(SVnodeObj *pVnode); static void vnodeCleanUp(SVnodeObj *pVnode);
static void vnodeBuildVloadMsg(char *pNode, void * param); static void vnodeBuildVloadMsg(char *pNode, void * param);
static int vnodeWALCallback(void *arg);
static int tsOpennedVnodes; static int tsOpennedVnodes;
static pthread_once_t vnodeModuleInit = PTHREAD_ONCE_INIT; static pthread_once_t vnodeModuleInit = PTHREAD_ONCE_INIT;
...@@ -120,24 +121,29 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { ...@@ -120,24 +121,29 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
vnodeObj.version = 0; vnodeObj.version = 0;
SVnodeObj *pVnode = (SVnodeObj *)taosAddIntHash(tsDnodeVnodesHash, vnodeObj.vgId, (char *)(&vnodeObj)); SVnodeObj *pVnode = (SVnodeObj *)taosAddIntHash(tsDnodeVnodesHash, vnodeObj.vgId, (char *)(&vnodeObj));
sprintf(temp, "%s/tsdb", rootDir);
void *pTsdb = tsdbOpenRepo(temp);
if (pTsdb == NULL) {
dError("pVnode:%p vgId:%d, failed to open tsdb at %s(%s)", pVnode, pVnode->vgId, temp, tstrerror(terrno));
taosDeleteIntHash(tsDnodeVnodesHash, pVnode->vgId);
return terrno;
}
pVnode->wqueue = dnodeAllocateWqueue(pVnode); pVnode->wqueue = dnodeAllocateWqueue(pVnode);
pVnode->rqueue = dnodeAllocateRqueue(pVnode); pVnode->rqueue = dnodeAllocateRqueue(pVnode);
sprintf(temp, "%s/wal", rootDir); sprintf(temp, "%s/wal", rootDir);
pVnode->wal = walOpen(temp, 3, tsCommitLog); pVnode->wal = walOpen(temp, 3, tsCommitLog);
pVnode->tsdb = pTsdb;
pVnode->sync = NULL; pVnode->sync = NULL;
pVnode->events = NULL; pVnode->events = NULL;
pVnode->cq = NULL; pVnode->cq = NULL;
STsdbAppH appH = {0};
appH.appH = (void *)pVnode;
appH.walCallBack = vnodeWALCallback;
sprintf(temp, "%s/tsdb", rootDir);
void *pTsdb = tsdbOpenRepo(temp, &appH);
if (pTsdb == NULL) {
dError("pVnode:%p vgId:%d, failed to open tsdb at %s(%s)", pVnode, pVnode->vgId, temp, tstrerror(terrno));
taosDeleteIntHash(tsDnodeVnodesHash, pVnode->vgId);
return terrno;
}
pVnode->tsdb = pTsdb;
walRestore(pVnode->wal, pVnode, vnodeWriteToQueue); walRestore(pVnode->wal, pVnode, vnodeWriteToQueue);
pVnode->status = VN_STATUS_READY; pVnode->status = VN_STATUS_READY;
...@@ -249,3 +255,8 @@ static void vnodeCleanUp(SVnodeObj *pVnode) { ...@@ -249,3 +255,8 @@ static void vnodeCleanUp(SVnodeObj *pVnode) {
vnodeRelease(pVnode); vnodeRelease(pVnode);
} }
static int vnodeWALCallback(void *arg) {
SVnodeObj *pVnode = arg;
return walRenew(pVnode->wal);
}
\ No newline at end of file
...@@ -34,6 +34,15 @@ extern "C" { ...@@ -34,6 +34,15 @@ extern "C" {
#define TSDB_INVALID_SUPER_TABLE_ID -1 #define TSDB_INVALID_SUPER_TABLE_ID -1
// --------- TSDB APPLICATION HANDLE DEFINITION
typedef struct {
// WAL handle
void *appH;
int (*walCallBack)(void *);
int (*eventCallBack)(void *);
int (*cqueryCallBack)(void *);
} STsdbAppH;
// --------- TSDB REPOSITORY CONFIGURATION DEFINITION // --------- TSDB REPOSITORY CONFIGURATION DEFINITION
typedef struct { typedef struct {
int8_t precision; int8_t precision;
...@@ -55,7 +64,7 @@ typedef void tsdb_repo_t; // use void to hide implementation details from outsi ...@@ -55,7 +64,7 @@ typedef void tsdb_repo_t; // use void to hide implementation details from outsi
int tsdbCreateRepo(char *rootDir, STsdbCfg *pCfg, void *limiter); int tsdbCreateRepo(char *rootDir, STsdbCfg *pCfg, void *limiter);
int32_t tsdbDropRepo(tsdb_repo_t *repo); int32_t tsdbDropRepo(tsdb_repo_t *repo);
tsdb_repo_t * tsdbOpenRepo(char *tsdbDir); tsdb_repo_t * tsdbOpenRepo(char *tsdbDir, STsdbAppH *pAppH);
int32_t tsdbCloseRepo(tsdb_repo_t *repo); int32_t tsdbCloseRepo(tsdb_repo_t *repo);
int32_t tsdbConfigRepo(tsdb_repo_t *repo, STsdbCfg *pCfg); int32_t tsdbConfigRepo(tsdb_repo_t *repo, STsdbCfg *pCfg);
int32_t tsdbTriggerCommit(tsdb_repo_t *repo); int32_t tsdbTriggerCommit(tsdb_repo_t *repo);
......
...@@ -322,6 +322,8 @@ typedef struct _tsdb_repo { ...@@ -322,6 +322,8 @@ typedef struct _tsdb_repo {
// TSDB configuration // TSDB configuration
STsdbCfg config; STsdbCfg config;
STsdbAppH appH;
// The meter meta handle of this TSDB repository // The meter meta handle of this TSDB repository
STsdbMeta *tsdbMeta; STsdbMeta *tsdbMeta;
......
...@@ -177,7 +177,7 @@ int32_t tsdbDropRepo(tsdb_repo_t *repo) { ...@@ -177,7 +177,7 @@ int32_t tsdbDropRepo(tsdb_repo_t *repo) {
* *
* @return a TSDB repository handle on success, NULL for failure and the error number is set * @return a TSDB repository handle on success, NULL for failure and the error number is set
*/ */
tsdb_repo_t *tsdbOpenRepo(char *tsdbDir) { tsdb_repo_t *tsdbOpenRepo(char *tsdbDir, STsdbAppH *pAppH) {
char dataDir[128] = "\0"; char dataDir[128] = "\0";
if (access(tsdbDir, F_OK | W_OK | R_OK) < 0) { if (access(tsdbDir, F_OK | W_OK | R_OK) < 0) {
return NULL; return NULL;
...@@ -191,6 +191,7 @@ tsdb_repo_t *tsdbOpenRepo(char *tsdbDir) { ...@@ -191,6 +191,7 @@ tsdb_repo_t *tsdbOpenRepo(char *tsdbDir) {
pRepo->rootDir = strdup(tsdbDir); pRepo->rootDir = strdup(tsdbDir);
tsdbRestoreCfg(pRepo, &(pRepo->config)); tsdbRestoreCfg(pRepo, &(pRepo->config));
if (pAppH) pRepo->appH = *pAppH;
pRepo->tsdbMeta = tsdbInitMeta(tsdbDir, pRepo->config.maxTables); pRepo->tsdbMeta = tsdbInitMeta(tsdbDir, pRepo->config.maxTables);
if (pRepo->tsdbMeta == NULL) { if (pRepo->tsdbMeta == NULL) {
......
...@@ -140,7 +140,7 @@ TEST(TsdbTest, createRepo) { ...@@ -140,7 +140,7 @@ TEST(TsdbTest, createRepo) {
// TEST(TsdbTest, DISABLED_openRepo) { // TEST(TsdbTest, DISABLED_openRepo) {
TEST(TsdbTest, openRepo) { TEST(TsdbTest, openRepo) {
tsdb_repo_t *repo = tsdbOpenRepo("/home/ubuntu/work/ttest/vnode0"); tsdb_repo_t *repo = tsdbOpenRepo("/home/ubuntu/work/ttest/vnode0", NULL);
ASSERT_NE(repo, nullptr); ASSERT_NE(repo, nullptr);
STsdbRepo *pRepo = (STsdbRepo *)repo; STsdbRepo *pRepo = (STsdbRepo *)repo;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册