未验证 提交 1ac6bbf5 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #2187 from taosdata/feature/2.0tsdb

Feature/2.0tsdb
......@@ -74,14 +74,16 @@ void tsdbEncodeTable(STable *pTable, char *buf, int *contLen) {
STable *tsdbDecodeTable(void *cont, int contLen) {
STable *pTable = (STable *)calloc(1, sizeof(STable));
if (pTable == NULL) return NULL;
pTable->schema = (STSchema **)malloc(sizeof(STSchema *) * TSDB_MAX_TABLE_SCHEMAS);
if (pTable->schema == NULL) {
free(pTable);
return NULL;
}
void *ptr = cont;
T_READ_MEMBER(ptr, int8_t, pTable->type);
if (pTable->type != TSDB_CHILD_TABLE) {
pTable->schema = (STSchema **)malloc(sizeof(STSchema *) * TSDB_MAX_TABLE_SCHEMAS);
if (pTable->schema == NULL) {
free(pTable);
return NULL;
}
}
int len = *(int *)ptr;
ptr = (char *)ptr + sizeof(int);
pTable->name = calloc(1, len + VARSTR_HEADER_SIZE + 1);
......@@ -620,7 +622,10 @@ static int tsdbFreeTable(STable *pTable) {
if (pTable->type == TSDB_CHILD_TABLE) {
kvRowFree(pTable->tagVal);
} else {
for (int i = 0; i < pTable->numOfSchemas; i++) tdFreeSchema(pTable->schema[i]);
if (pTable->schema) {
for (int i = 0; i < pTable->numOfSchemas; i++) tdFreeSchema(pTable->schema[i]);
free(pTable->schema);
}
}
if (pTable->type == TSDB_STREAM_TABLE) {
......
......@@ -764,8 +764,8 @@ static bool tsdbShouldCreateNewLast(SRWHelper *pHelper) {
static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDataCols, int rowsToWrite, SCompBlock *pCompBlock,
bool isLast, bool isSuperBlock) {
ASSERT(rowsToWrite > 0 && rowsToWrite <= pDataCols->numOfRows &&
rowsToWrite <= pHelper->config.maxRowsPerFileBlock);
ASSERT(rowsToWrite > 0 && rowsToWrite <= pDataCols->numOfRows && rowsToWrite <= pHelper->config.maxRowsPerFileBlock);
ASSERT(isLast ? rowsToWrite < pHelper->config.minRowsPerFileBlock : true);
SCompData *pCompData = (SCompData *)(pHelper->pBuffer);
int64_t offset = 0;
......@@ -905,14 +905,15 @@ static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDa
rowsWritten = MIN((defaultRowsToWrite - blockAtIdx(pHelper, blkIdx)->numOfRows), pDataCols->numOfRows);
if ((blockAtIdx(pHelper, blkIdx)->numOfSubBlocks < TSDB_MAX_SUBBLOCKS) &&
(blockAtIdx(pHelper, blkIdx)->numOfRows + rowsWritten < pHelper->config.minRowsPerFileBlock) && (pHelper->files.nLastF.fd) > 0) {
(blockAtIdx(pHelper, blkIdx)->numOfRows + rowsWritten < pHelper->config.minRowsPerFileBlock) &&
(pHelper->files.nLastF.fd) < 0) {
if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.lastF), pDataCols, rowsWritten, &compBlock, true, false) < 0)
goto _err;
if (tsdbAddSubBlock(pHelper, &compBlock, blkIdx, rowsWritten) < 0) goto _err;
} else {
// Load
if (tsdbLoadBlockData(pHelper, blockAtIdx(pHelper, blkIdx), NULL) < 0) goto _err;
ASSERT(pHelper->pDataCols[0]->numOfRows == blockAtIdx(pHelper, blkIdx)->numOfRows);
ASSERT(pHelper->pDataCols[0]->numOfRows <= blockAtIdx(pHelper, blkIdx)->numOfRows);
// Merge
if (tdMergeDataCols(pHelper->pDataCols[0], pDataCols, rowsWritten) < 0) goto _err;
// Write
......@@ -936,21 +937,21 @@ static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDa
// Key must overlap with the block
ASSERT(keyFirst <= blockAtIdx(pHelper, blkIdx)->keyLast);
TSKEY keyLimit =
(blkIdx == pIdx->numOfBlocks - 1) ? INT64_MAX : pHelper->pCompInfo->blocks[blkIdx + 1].keyFirst - 1;
TSKEY keyLimit = (blkIdx == pIdx->numOfBlocks - 1) ? INT64_MAX : blockAtIdx(pHelper, blkIdx + 1)->keyFirst - 1;
// rows1: number of rows must merge in this block
int rows1 = tsdbGetRowsInRange(pDataCols, blockAtIdx(pHelper, blkIdx)->keyFirst, blockAtIdx(pHelper, blkIdx)->keyLast);
// rows2: max nuber of rows the block can have more
// rows2: max number of rows the block can have more
int rows2 = pHelper->config.maxRowsPerFileBlock - blockAtIdx(pHelper, blkIdx)->numOfRows;
// rows3: number of rows between this block and the next block
int rows3 = tsdbGetRowsInRange(pDataCols, blockAtIdx(pHelper, blkIdx)->keyFirst, keyLimit);
ASSERT(rows3 >= rows1);
if ((rows2 >= rows1) &&
(( blockAtIdx(pHelper, blkIdx)->last) ||
((rows1 + blockAtIdx(pHelper, blkIdx)->numOfRows < pHelper->config.minRowsPerFileBlock) && (pHelper->files.nLastF.fd < 0)))) {
if ((rows2 >= rows1) && (blockAtIdx(pHelper, blkIdx)->numOfSubBlocks < TSDB_MAX_SUBBLOCKS) &&
((!blockAtIdx(pHelper, blkIdx)->last) ||
((rows1 + blockAtIdx(pHelper, blkIdx)->numOfRows < pHelper->config.minRowsPerFileBlock) &&
(pHelper->files.nLastF.fd < 0)))) {
rowsWritten = rows1;
bool isLast = false;
SFile *pFile = NULL;
......@@ -964,7 +965,7 @@ static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDa
if (tsdbWriteBlockToFile(pHelper, pFile, pDataCols, rows1, &compBlock, isLast, false) < 0) goto _err;
if (tsdbAddSubBlock(pHelper, &compBlock, blkIdx, rowsWritten) < 0) goto _err;
} else { // Load-Merge-Write
} else { // Load-Merge-Write
// Load
if (tsdbLoadBlockData(pHelper, blockAtIdx(pHelper, blkIdx), NULL) < 0) goto _err;
if (blockAtIdx(pHelper, blkIdx)->last) pHelper->hasOldLastBlock = false;
......@@ -1106,16 +1107,16 @@ static int tsdbAddSubBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkId
for (int i = blkIdx + 1; i < pIdx->numOfBlocks; i++) {
SCompBlock *pTCompBlock = pHelper->pCompInfo->blocks + i;
if (pTCompBlock->numOfSubBlocks > 1) {
ptr = (void *)((char *)(pHelper->pCompInfo) + pTCompBlock->offset + pTCompBlock->len);
ptr = POINTER_SHIFT(pHelper->pCompInfo, pTCompBlock->offset);
break;
}
}
if (ptr == NULL) ptr = (void *)((char *)(pHelper->pCompInfo) + pIdx->len - sizeof(TSCKSUM));
if (ptr == NULL) ptr = POINTER_SHIFT(pHelper->pCompInfo, pIdx->len-sizeof(TSCKSUM));
size_t tsize = pIdx->len - ((char *)ptr - (char *)(pHelper->pCompInfo));
if (tsize > 0) {
memmove((void *)((char *)ptr + sizeof(SCompBlock) * 2), ptr, tsize);
memmove(POINTER_SHIFT(ptr, sizeof(SCompBlock) * 2), ptr, tsize);
for (int i = blkIdx + 1; i < pIdx->numOfBlocks; i++) {
SCompBlock *pTCompBlock = pHelper->pCompInfo->blocks + i;
if (pTCompBlock->numOfSubBlocks > 1) pTCompBlock->offset += (sizeof(SCompBlock) * 2);
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_KVSTORE_H_
#define _TD_KVSTORE_H_
#ifdef __cplusplus
extern "C" {
#endif
#include <stdint.h>
typedef int (*iterFunc)(void *, void *cont, int contLen);
typedef void (*afterFunc)(void *);
typedef struct {
int64_t size;
int64_t tombSize;
int64_t nRecords;
int64_t nDels;
} SStoreInfo;
typedef struct {
char * fname;
int fd;
char * fsnap;
int sfd;
char * fnew;
int nfd;
SHashObj * map;
iterFunc iFunc;
afterFunc aFunc;
void * appH;
SStoreInfo info;
} SKVStore;
int tdCreateKVStore(char *fname);
int tdDestroyKVStore();
SKVStore *tdOpenKVStore(char *fname, iterFunc iFunc, afterFunc aFunc, void *appH);
void tdCloseKVStore(SKVStore *pStore);
int tdKVStoreStartCommit(SKVStore *pStore);
int tdUpdateRecordInKVStore(SKVStore *pStore, uint64_t uid, void *cont, int contLen);
int tdKVStoreEndCommit(SKVStore *pStore);
#ifdef __cplusplus
}
#endif
#endif
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <errno.h>
#include <fcntl.h>
#include <libgen.h>
#include <string.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
#include "hash.h"
#include "os.h"
#include "taoserror.h"
#include "tchecksum.h"
#include "tcoding.h"
#include "tkvstore.h"
#include "tulog.h"
#define TD_KVSTORE_HEADER_SIZE 512
#define TD_KVSTORE_MAJOR_VERSION 1
#define TD_KVSTORE_MAINOR_VERSION 0
#define TD_KVSTORE_SNAP_SUFFIX ".snap"
#define TD_KVSTORE_NEW_SUFFIX ".new"
static int tdInitKVStoreHeader(int fd, char *fname);
static void * tdEncodeStoreInfo(void *buf, SStoreInfo *pInfo);
// static void * tdDecodeStoreInfo(void *buf, SStoreInfo *pInfo);
static SKVStore *tdNewKVStore(char *fname, iterFunc iFunc, afterFunc aFunc, void *appH);
static char * tdGetKVStoreSnapshotFname(char *fdata);
static char * tdGetKVStoreNewFname(char *fdata);
static void tdFreeKVStore(SKVStore *pStore);
static int tdUpdateKVStoreHeader(int fd, char *fname, SStoreInfo *pInfo);
int tdCreateKVStore(char *fname) {
char *tname = strdup(fname);
if (tname == NULL) return TSDB_CODE_COM_OUT_OF_MEMORY;
int fd = open(fname, O_RDWR | O_CREAT, 0755);
if (fd < 0) {
uError("failed to open file %s since %s", fname, strerror(errno));
return TAOS_SYSTEM_ERROR(errno);
}
int code = tdInitKVStoreHeader(fd, fname);
if (code != TSDB_CODE_SUCCESS) return code;
if (fsync(fd) < 0) {
uError("failed to fsync file %s since %s", fname, strerror(errno));
return TAOS_SYSTEM_ERROR(errno);
}
if (close(fd) < 0) {
uError("failed to close file %s since %s", fname, strerror(errno));
return TAOS_SYSTEM_ERROR(errno);
}
return TSDB_CODE_SUCCESS;
}
SKVStore *tdOpenKVStore(char *fname, iterFunc iFunc, afterFunc aFunc, void *appH) {
SKVStore *pStore = tdNewKVStore(fname, iFunc, aFunc, appH);
if (pStore == NULL) return NULL;
pStore->fd = open(pStore->fname, O_RDWR);
if (pStore->fd < 0) {
uError("failed to open file %s since %s", pStore->fname, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
if (access(pStore->fsnap, F_OK) == 0) {
uTrace("file %s exists, try to recover the KV store", pStore->fsnap);
pStore->sfd = open(pStore->fsnap, O_RDONLY);
if (pStore->sfd < 0) {
uError("failed to open file %s since %s", pStore->fsnap, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
// TODO: rewind the file
close(pStore->sfd);
pStore->sfd = -1;
remove(pStore->fsnap);
}
// TODO: Recover from the file
return pStore;
_err:
if (pStore->fd > 0) {
close(pStore->fd);
pStore->fd = -1;
}
if (pStore->sfd > 0) {
close(pStore->sfd);
pStore->sfd = -1;
}
tdFreeKVStore(pStore);
return NULL;
}
int tdKVStoreStartCommit(SKVStore *pStore) {
pStore->fd = open(pStore->fname, O_RDWR);
if (pStore->fd < 0) {
uError("failed to open file %s since %s", pStore->fname, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
pStore->sfd = open(pStore->fsnap, O_WRONLY | O_CREAT, 0755);
if (pStore->sfd < 0) {
uError("failed to open file %s since %s", pStore->fsnap, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
if (tsendfile(pStore->sfd, pStore->fd, NULL, TD_KVSTORE_HEADER_SIZE) < TD_KVSTORE_HEADER_SIZE) {
uError("failed to send file %d bytes since %s", TD_KVSTORE_HEADER_SIZE, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
if (fsync(pStore->sfd) < 0) {
uError("failed to fsync file %s since %s", pStore->fsnap, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
if (close(pStore->sfd) < 0) {
uError("failed to close file %s since %s", pStore->fsnap, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
pStore->sfd = -1;
return 0;
_err:
if (pStore->sfd > 0) {
close(pStore->sfd);
pStore->sfd = -1;
remove(pStore->fsnap);
}
if (pStore->fd > 0) {
close(pStore->fd);
pStore->fd = -1;
}
return -1;
}
int tdKVStoreEndCommit(SKVStore *pStore) {
ASSERT(pStore->fd > 0);
terrno = tdUpdateKVStoreHeader(pStore->fd, pStore->fname, &(pStore->info));
if (terrno != TSDB_CODE_SUCCESS) return -1;
if (fsync(pStore->fd) < 0) {
uError("failed to fsync file %s since %s", pStore->fname, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
if (close(pStore->fd) < 0) {
uError("failed to close file %s since %s", pStore->fname, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
remove(pStore->fsnap);
return 0;
}
static int tdUpdateKVStoreHeader(int fd, char *fname, SStoreInfo *pInfo) {
char buf[TD_KVSTORE_HEADER_SIZE] = "\0";
if (lseek(fd, 0, SEEK_SET) < 0) {
uError("failed to lseek file %s since %s", fname, strerror(errno));
return TAOS_SYSTEM_ERROR(errno);
}
tdEncodeStoreInfo(buf, pInfo);
taosCalcChecksumAppend(0, (uint8_t *)buf, TD_KVSTORE_HEADER_SIZE);
if (twrite(fd, buf, TD_KVSTORE_HEADER_SIZE) < TD_KVSTORE_HEADER_SIZE) {
uError("failed to write file %s %d bytes since %s", fname, TD_KVSTORE_HEADER_SIZE, strerror(errno));
return TAOS_SYSTEM_ERROR(errno);
}
return TSDB_CODE_SUCCESS;
}
static int tdInitKVStoreHeader(int fd, char *fname) {
SStoreInfo info = {TD_KVSTORE_HEADER_SIZE, 0, 0, 0};
return tdUpdateKVStoreHeader(fd, fname, &info);
}
static void *tdEncodeStoreInfo(void *buf, SStoreInfo *pInfo) {
buf = taosEncodeVariantI64(buf, pInfo->size);
buf = taosEncodeVariantI64(buf, pInfo->tombSize);
buf = taosEncodeVariantI64(buf, pInfo->nRecords);
buf = taosEncodeVariantI64(buf, pInfo->nDels);
return buf;
}
// static void *tdDecodeStoreInfo(void *buf, SStoreInfo *pInfo) {
// buf = taosDecodeVariantI64(buf, &(pInfo->size));
// buf = taosDecodeVariantI64(buf, &(pInfo->tombSize));
// buf = taosDecodeVariantI64(buf, &(pInfo->nRecords));
// buf = taosDecodeVariantI64(buf, &(pInfo->nDels));
// return buf;
// }
static SKVStore *tdNewKVStore(char *fname, iterFunc iFunc, afterFunc aFunc, void *appH) {
SKVStore *pStore = (SKVStore *)malloc(sizeof(SKVStore));
if (pStore == NULL) goto _err;
pStore->fname = strdup(fname);
if (pStore->map == NULL) goto _err;
pStore->fsnap = tdGetKVStoreSnapshotFname(fname);
if (pStore->fsnap == NULL) goto _err;
pStore->fnew = tdGetKVStoreNewFname(fname);
if (pStore->fnew == NULL) goto _err;
pStore->fd = -1;
pStore->sfd = -1;
pStore->nfd = -1;
pStore->iFunc = iFunc;
pStore->aFunc = aFunc;
pStore->appH = appH;
pStore->map = taosHashInit(4096, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false);
if (pStore->map == NULL) {
terrno = TSDB_CODE_COM_OUT_OF_MEMORY;
goto _err;
}
return pStore;
_err:
terrno = TSDB_CODE_COM_OUT_OF_MEMORY;
tdFreeKVStore(pStore);
return NULL;
}
static void tdFreeKVStore(SKVStore *pStore) {
if (pStore) {
tfree(pStore->fname);
tfree(pStore->fsnap);
tfree(pStore->fnew);
taosHashCleanup(pStore->map);
free(pStore);
}
}
static char *tdGetKVStoreSnapshotFname(char *fdata) {
size_t size = strlen(fdata) + strlen(TD_KVSTORE_SNAP_SUFFIX) + 1;
char * fname = malloc(size);
if (fname == NULL) {
terrno = TSDB_CODE_COM_OUT_OF_MEMORY;
return NULL;
}
sprintf(fname, "%s%s", fdata, TD_KVSTORE_SNAP_SUFFIX);
return fname;
}
static char *tdGetKVStoreNewFname(char *fdata) {
size_t size = strlen(fdata) + strlen(TD_KVSTORE_NEW_SUFFIX) + 1;
char * fname = malloc(size);
if (fname == NULL) {
terrno = TSDB_CODE_COM_OUT_OF_MEMORY;
return NULL;
}
sprintf(fname, "%s%s", fdata, TD_KVSTORE_NEW_SUFFIX);
return fname;
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册