未验证 提交 0bb787d0 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #1955 from taosdata/feature/2.0tsdb

Feature/2.0tsdb
...@@ -77,8 +77,8 @@ int32_t tsdbConfigRepo(TsdbRepoT *repo, STsdbCfg *pCfg); ...@@ -77,8 +77,8 @@ int32_t tsdbConfigRepo(TsdbRepoT *repo, STsdbCfg *pCfg);
// --------- TSDB TABLE DEFINITION // --------- TSDB TABLE DEFINITION
typedef struct { typedef struct {
int64_t uid; // the unique table ID uint64_t uid; // the unique table ID
int32_t tid; // the table ID in the repository. int32_t tid; // the table ID in the repository.
} STableId; } STableId;
// --------- TSDB TABLE configuration // --------- TSDB TABLE configuration
...@@ -88,14 +88,14 @@ typedef struct { ...@@ -88,14 +88,14 @@ typedef struct {
STableId tableId; STableId tableId;
int32_t sversion; int32_t sversion;
char * sname; // super table name char * sname; // super table name
int64_t superUid; uint64_t superUid;
STSchema * schema; STSchema * schema;
STSchema * tagSchema; STSchema * tagSchema;
SDataRow tagValues; SDataRow tagValues;
} STableCfg; } STableCfg;
int tsdbInitTableCfg(STableCfg *config, ETableType type, int64_t uid, int32_t tid); int tsdbInitTableCfg(STableCfg *config, ETableType type, uint64_t uid, int32_t tid);
int tsdbTableSetSuperUid(STableCfg *config, int64_t uid); int tsdbTableSetSuperUid(STableCfg *config, uint64_t uid);
int tsdbTableSetSchema(STableCfg *config, STSchema *pSchema, bool dup); int tsdbTableSetSchema(STableCfg *config, STSchema *pSchema, bool dup);
int tsdbTableSetTagSchema(STableCfg *config, STSchema *pSchema, bool dup); int tsdbTableSetTagSchema(STableCfg *config, STSchema *pSchema, bool dup);
int tsdbTableSetTagValue(STableCfg *config, SDataRow row, bool dup); int tsdbTableSetTagValue(STableCfg *config, SDataRow row, bool dup);
...@@ -109,7 +109,7 @@ char* tsdbGetTableName(TsdbRepoT *repo, const STableId* id, int16_t* bytes); ...@@ -109,7 +109,7 @@ char* tsdbGetTableName(TsdbRepoT *repo, const STableId* id, int16_t* bytes);
int tsdbCreateTable(TsdbRepoT *repo, STableCfg *pCfg); int tsdbCreateTable(TsdbRepoT *repo, STableCfg *pCfg);
int tsdbDropTable(TsdbRepoT *pRepo, STableId tableId); int tsdbDropTable(TsdbRepoT *pRepo, STableId tableId);
int tsdbAlterTable(TsdbRepoT *repo, STableCfg *pCfg); int tsdbAlterTable(TsdbRepoT *repo, STableCfg *pCfg);
TSKEY tsdbGetTableLastKey(TsdbRepoT *repo, int64_t uid); TSKEY tsdbGetTableLastKey(TsdbRepoT *repo, uint64_t uid);
uint32_t tsdbGetFileInfo(TsdbRepoT *repo, char *name, uint32_t *index, int32_t *size); uint32_t tsdbGetFileInfo(TsdbRepoT *repo, char *name, uint32_t *index, int32_t *size);
...@@ -278,9 +278,9 @@ SArray *tsdbGetTableList(TsdbQueryHandleT *pQueryHandle); ...@@ -278,9 +278,9 @@ SArray *tsdbGetTableList(TsdbQueryHandleT *pQueryHandle);
* @param stableid. super table sid * @param stableid. super table sid
* @param pTagCond. tag query condition * @param pTagCond. tag query condition
*/ */
int32_t tsdbQuerySTableByTagCond(TsdbRepoT *tsdb, int64_t uid, const char *pTagCond, size_t len, int32_t tsdbQuerySTableByTagCond(TsdbRepoT *tsdb, uint64_t uid, const char *pTagCond, size_t len,
int16_t tagNameRelType, const char* tbnameCond, STableGroupInfo *pGroupList, SColIndex *pColIndex, int32_t numOfCols); int16_t tagNameRelType, const char *tbnameCond, STableGroupInfo *pGroupList,
SColIndex *pColIndex, int32_t numOfCols);
/** /**
* create the table group result including only one table, used to handle the normal table query * create the table group result including only one table, used to handle the normal table query
...@@ -290,7 +290,7 @@ int32_t tsdbQuerySTableByTagCond(TsdbRepoT *tsdb, int64_t uid, const char *pTagC ...@@ -290,7 +290,7 @@ int32_t tsdbQuerySTableByTagCond(TsdbRepoT *tsdb, int64_t uid, const char *pTagC
* @param pGroupInfo the generated result * @param pGroupInfo the generated result
* @return * @return
*/ */
int32_t tsdbGetOneTableGroup(TsdbRepoT *tsdb, int64_t uid, STableGroupInfo *pGroupInfo); int32_t tsdbGetOneTableGroup(TsdbRepoT *tsdb, uint64_t uid, STableGroupInfo *pGroupInfo);
/** /**
* clean up the query handle * clean up the query handle
......
...@@ -63,9 +63,9 @@ typedef struct { ...@@ -63,9 +63,9 @@ typedef struct {
} SMetaFile; } SMetaFile;
SMetaFile *tsdbInitMetaFile(char *rootDir, int32_t maxTables, iterFunc iFunc, afterFunc aFunc, void *appH); SMetaFile *tsdbInitMetaFile(char *rootDir, int32_t maxTables, iterFunc iFunc, afterFunc aFunc, void *appH);
int32_t tsdbInsertMetaRecord(SMetaFile *mfh, int64_t uid, void *cont, int32_t contLen); int32_t tsdbInsertMetaRecord(SMetaFile *mfh, uint64_t uid, void *cont, int32_t contLen);
int32_t tsdbDeleteMetaRecord(SMetaFile *mfh, int64_t uid); int32_t tsdbDeleteMetaRecord(SMetaFile *mfh, uint64_t uid);
int32_t tsdbUpdateMetaRecord(SMetaFile *mfh, int64_t uid, void *cont, int32_t contLen); int32_t tsdbUpdateMetaRecord(SMetaFile *mfh, uint64_t uid, void *cont, int32_t contLen);
void tsdbCloseMetaFile(SMetaFile *mfh); void tsdbCloseMetaFile(SMetaFile *mfh);
// ------------------------------ TSDB META INTERFACES ------------------------------ // ------------------------------ TSDB META INTERFACES ------------------------------
...@@ -82,7 +82,7 @@ typedef struct { ...@@ -82,7 +82,7 @@ typedef struct {
typedef struct STable { typedef struct STable {
int8_t type; int8_t type;
STableId tableId; STableId tableId;
int64_t superUid; // Super table UID uint64_t superUid; // Super table UID
int32_t sversion; int32_t sversion;
STSchema * schema; STSchema * schema;
STSchema * tagSchema; STSchema * tagSchema;
...@@ -153,7 +153,7 @@ STsdbMeta *tsdbGetMeta(TsdbRepoT *pRepo); ...@@ -153,7 +153,7 @@ STsdbMeta *tsdbGetMeta(TsdbRepoT *pRepo);
STable *tsdbIsValidTableToInsert(STsdbMeta *pMeta, STableId tableId); STable *tsdbIsValidTableToInsert(STsdbMeta *pMeta, STableId tableId);
// int32_t tsdbInsertRowToTableImpl(SSkipListNode *pNode, STable *pTable); // int32_t tsdbInsertRowToTableImpl(SSkipListNode *pNode, STable *pTable);
STable *tsdbGetTableByUid(STsdbMeta *pMeta, int64_t uid); STable *tsdbGetTableByUid(STsdbMeta *pMeta, uint64_t uid);
char *getTSTupleKey(const void * data); char *getTSTupleKey(const void * data);
typedef struct { typedef struct {
...@@ -210,12 +210,17 @@ typedef enum { ...@@ -210,12 +210,17 @@ typedef enum {
extern const char *tsdbFileSuffix[]; extern const char *tsdbFileSuffix[];
typedef struct { typedef struct {
int64_t size; // total size of the file uint32_t offset;
int64_t tombSize; // unused file size uint32_t len;
int32_t totalBlocks; uint64_t size; // total size of the file
int32_t totalSubBlocks; uint64_t tombSize; // unused file size
uint32_t totalBlocks;
uint32_t totalSubBlocks;
} STsdbFileInfo; } STsdbFileInfo;
void *tsdbEncodeSFileInfo(void *buf, const STsdbFileInfo *pInfo);
void *tsdbDecodeSFileInfo(void *buf, STsdbFileInfo *pInfo);
typedef struct { typedef struct {
int fd; int fd;
char fname[128]; char fname[128];
...@@ -242,8 +247,7 @@ typedef struct { ...@@ -242,8 +247,7 @@ typedef struct {
STsdbFileH *tsdbInitFileH(char *dataDir, STsdbCfg *pCfg); STsdbFileH *tsdbInitFileH(char *dataDir, STsdbCfg *pCfg);
void tsdbCloseFileH(STsdbFileH *pFileH); void tsdbCloseFileH(STsdbFileH *pFileH);
int tsdbCreateFile(char *dataDir, int fileId, const char *suffix, int maxTables, SFile *pFile, int writeHeader, int tsdbCreateFile(char *dataDir, int fileId, const char *suffix, SFile *pFile);
int toClose);
SFileGroup *tsdbCreateFGroup(STsdbFileH *pFileH, char *dataDir, int fid, int maxTables); SFileGroup *tsdbCreateFGroup(STsdbFileH *pFileH, char *dataDir, int fid, int maxTables);
int tsdbOpenFile(SFile *pFile, int oflag); int tsdbOpenFile(SFile *pFile, int oflag);
int tsdbCloseFile(SFile *pFile); int tsdbCloseFile(SFile *pFile);
...@@ -266,15 +270,18 @@ void tsdbSeekFileGroupIter(SFileGroupIter *pIter, int fid); ...@@ -266,15 +270,18 @@ void tsdbSeekFileGroupIter(SFileGroupIter *pIter, int fid);
SFileGroup *tsdbGetFileGroupNext(SFileGroupIter *pIter); SFileGroup *tsdbGetFileGroupNext(SFileGroupIter *pIter);
typedef struct { typedef struct {
int32_t len; uint32_t len;
int32_t offset; uint32_t offset;
int32_t padding; // For padding purpose uint32_t padding; // For padding purpose
int32_t hasLast : 1; uint32_t hasLast : 2;
int32_t numOfBlocks : 31; uint32_t numOfBlocks : 30;
int64_t uid; uint64_t uid;
TSKEY maxKey; TSKEY maxKey;
} SCompIdx; /* sizeof(SCompIdx) = 28 */ } SCompIdx; /* sizeof(SCompIdx) = 28 */
void *tsdbEncodeSCompIdx(void *buf, SCompIdx *pIdx);
void *tsdbDecodeSCompIdx(void *buf, SCompIdx *pIdx);
/** /**
* if numOfSubBlocks == 0, then the SCompBlock is a sub-block * if numOfSubBlocks == 0, then the SCompBlock is a sub-block
* if numOfSubBlocks >= 1, then the SCompBlock is a super-block * if numOfSubBlocks >= 1, then the SCompBlock is a super-block
...@@ -304,7 +311,7 @@ typedef struct { ...@@ -304,7 +311,7 @@ typedef struct {
typedef struct { typedef struct {
int32_t delimiter; // For recovery usage int32_t delimiter; // For recovery usage
int32_t checksum; // TODO: decide if checksum logic in this file or make it one API int32_t checksum; // TODO: decide if checksum logic in this file or make it one API
int64_t uid; uint64_t uid;
SCompBlock blocks[]; SCompBlock blocks[];
} SCompInfo; } SCompInfo;
...@@ -338,7 +345,7 @@ typedef struct { ...@@ -338,7 +345,7 @@ typedef struct {
typedef struct { typedef struct {
int32_t delimiter; // For recovery usage int32_t delimiter; // For recovery usage
int32_t numOfCols; // For recovery usage int32_t numOfCols; // For recovery usage
int64_t uid; // For recovery usage uint64_t uid; // For recovery usage
SCompCol cols[]; SCompCol cols[];
} SCompData; } SCompData;
...@@ -434,9 +441,9 @@ typedef struct { ...@@ -434,9 +441,9 @@ typedef struct {
} SHelperFile; } SHelperFile;
typedef struct { typedef struct {
int64_t uid; uint64_t uid;
int32_t tid; int32_t tid;
int32_t sversion; int32_t sversion;
} SHelperTable; } SHelperTable;
typedef struct { typedef struct {
...@@ -458,7 +465,7 @@ typedef struct { ...@@ -458,7 +465,7 @@ typedef struct {
SCompData *pCompData; SCompData *pCompData;
SDataCols *pDataCols[2]; SDataCols *pDataCols[2];
void *blockBuffer; // Buffer to hold the whole data block void *pBuffer; // Buffer to hold the whole data block
void *compBuffer; // Buffer for temperary compress/decompress purpose void *compBuffer; // Buffer for temperary compress/decompress purpose
} SRWHelper; } SRWHelper;
...@@ -505,6 +512,7 @@ void tsdbFitRetention(STsdbRepo *pRepo); ...@@ -505,6 +512,7 @@ void tsdbFitRetention(STsdbRepo *pRepo);
int tsdbAlterCacheTotalBlocks(STsdbRepo *pRepo, int totalBlocks); int tsdbAlterCacheTotalBlocks(STsdbRepo *pRepo, int totalBlocks);
void tsdbAdjustCacheBlocks(STsdbCache *pCache); void tsdbAdjustCacheBlocks(STsdbCache *pCache);
int32_t tsdbGetMetaFileName(char *rootDir, char *fname); int32_t tsdbGetMetaFileName(char *rootDir, char *fname);
int tsdbUpdateFileHeader(SFile *pFile, uint32_t version);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -37,8 +37,6 @@ const char *tsdbFileSuffix[] = { ...@@ -37,8 +37,6 @@ const char *tsdbFileSuffix[] = {
static int compFGroupKey(const void *key, const void *fgroup); static int compFGroupKey(const void *key, const void *fgroup);
static int compFGroup(const void *arg1, const void *arg2); static int compFGroup(const void *arg1, const void *arg2);
static int tsdbWriteFileHead(SFile *pFile);
static int tsdbWriteHeadFileIdx(SFile *pFile, int maxTables);
static int tsdbOpenFGroup(STsdbFileH *pFileH, char *dataDir, int fid); static int tsdbOpenFGroup(STsdbFileH *pFileH, char *dataDir, int fid);
STsdbFileH *tsdbInitFileH(char *dataDir, STsdbCfg *pCfg) { STsdbFileH *tsdbInitFileH(char *dataDir, STsdbCfg *pCfg) {
...@@ -84,11 +82,23 @@ void tsdbCloseFileH(STsdbFileH *pFileH) { ...@@ -84,11 +82,23 @@ void tsdbCloseFileH(STsdbFileH *pFileH) {
} }
static int tsdbInitFile(char *dataDir, int fid, const char *suffix, SFile *pFile) { static int tsdbInitFile(char *dataDir, int fid, const char *suffix, SFile *pFile) {
uint32_t version;
char buf[512] = "\0";
tsdbGetFileName(dataDir, fid, suffix, pFile->fname); tsdbGetFileName(dataDir, fid, suffix, pFile->fname);
if (access(pFile->fname, F_OK|R_OK|W_OK) < 0) return -1; if (access(pFile->fname, F_OK|R_OK|W_OK) < 0) return -1;
pFile->fd = -1; pFile->fd = -1;
// TODO: recover the file info if (tsdbOpenFile(pFile, O_RDONLY) < 0) return -1;
// pFile->info = {0};
if (tread(pFile->fd, buf, TSDB_FILE_HEAD_SIZE) < TSDB_FILE_HEAD_SIZE) return -1;
if (!taosCheckChecksumWhole((uint8_t *)buf, TSDB_FILE_HEAD_SIZE)) return -1;
void *pBuf = buf;
pBuf = taosDecodeFixed32(pBuf, &version);
pBuf = tsdbDecodeSFileInfo(pBuf, &(pFile->info));
tsdbCloseFile(pFile);
return 0; return 0;
} }
...@@ -121,8 +131,7 @@ SFileGroup *tsdbCreateFGroup(STsdbFileH *pFileH, char *dataDir, int fid, int max ...@@ -121,8 +131,7 @@ SFileGroup *tsdbCreateFGroup(STsdbFileH *pFileH, char *dataDir, int fid, int max
if (pGroup == NULL) { // if not exists, create one if (pGroup == NULL) { // if not exists, create one
pFGroup->fileId = fid; pFGroup->fileId = fid;
for (int type = TSDB_FILE_TYPE_HEAD; type < TSDB_FILE_TYPE_MAX; type++) { for (int type = TSDB_FILE_TYPE_HEAD; type < TSDB_FILE_TYPE_MAX; type++) {
if (tsdbCreateFile(dataDir, fid, tsdbFileSuffix[type], maxTables, &(pFGroup->files[type]), if (tsdbCreateFile(dataDir, fid, tsdbFileSuffix[type], &(pFGroup->files[type])) < 0)
type == TSDB_FILE_TYPE_HEAD ? 1 : 0, 1) < 0)
goto _err; goto _err;
} }
...@@ -286,41 +295,6 @@ static int compFGroup(const void *arg1, const void *arg2) { ...@@ -286,41 +295,6 @@ static int compFGroup(const void *arg1, const void *arg2) {
return ((SFileGroup *)arg1)->fileId - ((SFileGroup *)arg2)->fileId; return ((SFileGroup *)arg1)->fileId - ((SFileGroup *)arg2)->fileId;
} }
static int tsdbWriteFileHead(SFile *pFile) {
char head[TSDB_FILE_HEAD_SIZE] = "\0";
pFile->info.size += TSDB_FILE_HEAD_SIZE;
// TODO: write version and File statistic to the head
lseek(pFile->fd, 0, SEEK_SET);
if (write(pFile->fd, head, TSDB_FILE_HEAD_SIZE) < 0) return -1;
return 0;
}
static int tsdbWriteHeadFileIdx(SFile *pFile, int maxTables) {
int size = sizeof(SCompIdx) * maxTables + sizeof(TSCKSUM);
void *buf = calloc(1, size);
if (buf == NULL) return -1;
if (lseek(pFile->fd, TSDB_FILE_HEAD_SIZE, SEEK_SET) < 0) {
free(buf);
return -1;
}
taosCalcChecksumAppend(0, (uint8_t *)buf, size);
if (write(pFile->fd, buf, size) < 0) {
free(buf);
return -1;
}
pFile->info.size += size;
free(buf);
return 0;
}
int tsdbGetFileName(char *dataDir, int fileId, const char *suffix, char *fname) { int tsdbGetFileName(char *dataDir, int fileId, const char *suffix, char *fname) {
if (dataDir == NULL || fname == NULL) return -1; if (dataDir == NULL || fname == NULL) return -1;
...@@ -354,7 +328,7 @@ SFileGroup * tsdbOpenFilesForCommit(STsdbFileH *pFileH, int fid) { ...@@ -354,7 +328,7 @@ SFileGroup * tsdbOpenFilesForCommit(STsdbFileH *pFileH, int fid) {
return pGroup; return pGroup;
} }
int tsdbCreateFile(char *dataDir, int fileId, const char *suffix, int maxTables, SFile *pFile, int writeHeader, int toClose) { int tsdbCreateFile(char *dataDir, int fileId, const char *suffix, SFile *pFile) {
memset((void *)pFile, 0, sizeof(SFile)); memset((void *)pFile, 0, sizeof(SFile));
pFile->fd = -1; pFile->fd = -1;
...@@ -370,19 +344,14 @@ int tsdbCreateFile(char *dataDir, int fileId, const char *suffix, int maxTables, ...@@ -370,19 +344,14 @@ int tsdbCreateFile(char *dataDir, int fileId, const char *suffix, int maxTables,
return -1; return -1;
} }
if (writeHeader) { pFile->info.size = TSDB_FILE_HEAD_SIZE;
if (tsdbWriteHeadFileIdx(pFile, maxTables) < 0) {
tsdbCloseFile(pFile);
return -1;
}
}
if (tsdbWriteFileHead(pFile) < 0) { if (tsdbUpdateFileHeader(pFile, 0) < 0) {
tsdbCloseFile(pFile); tsdbCloseFile(pFile);
return -1; return -1;
} }
if (toClose) tsdbCloseFile(pFile); tsdbCloseFile(pFile);
return 0; return 0;
} }
......
...@@ -392,7 +392,7 @@ int tsdbAlterTable(TsdbRepoT *pRepo, STableCfg *pCfg) { ...@@ -392,7 +392,7 @@ int tsdbAlterTable(TsdbRepoT *pRepo, STableCfg *pCfg) {
return 0; return 0;
} }
TSKEY tsdbGetTableLastKey(TsdbRepoT *repo, int64_t uid) { TSKEY tsdbGetTableLastKey(TsdbRepoT *repo, uint64_t uid) {
STsdbRepo *pRepo = (STsdbRepo *)repo; STsdbRepo *pRepo = (STsdbRepo *)repo;
STable *pTable = tsdbGetTableByUid(pRepo->tsdbMeta, uid); STable *pTable = tsdbGetTableByUid(pRepo->tsdbMeta, uid);
...@@ -430,7 +430,7 @@ int32_t tsdbInsertData(TsdbRepoT *repo, SSubmitMsg *pMsg, SShellSubmitRspMsg * p ...@@ -430,7 +430,7 @@ int32_t tsdbInsertData(TsdbRepoT *repo, SSubmitMsg *pMsg, SShellSubmitRspMsg * p
/** /**
* Initialize a table configuration * Initialize a table configuration
*/ */
int tsdbInitTableCfg(STableCfg *config, ETableType type, int64_t uid, int32_t tid) { int tsdbInitTableCfg(STableCfg *config, ETableType type, uint64_t uid, int32_t tid) {
if (config == NULL) return -1; if (config == NULL) return -1;
if (type != TSDB_NORMAL_TABLE && type != TSDB_CHILD_TABLE) return -1; if (type != TSDB_NORMAL_TABLE && type != TSDB_CHILD_TABLE) return -1;
...@@ -447,7 +447,7 @@ int tsdbInitTableCfg(STableCfg *config, ETableType type, int64_t uid, int32_t ti ...@@ -447,7 +447,7 @@ int tsdbInitTableCfg(STableCfg *config, ETableType type, int64_t uid, int32_t ti
/** /**
* Set the super table UID of the created table * Set the super table UID of the created table
*/ */
int tsdbTableSetSuperUid(STableCfg *config, int64_t uid) { int tsdbTableSetSuperUid(STableCfg *config, uint64_t uid) {
if (config->type != TSDB_CHILD_TABLE) return -1; if (config->type != TSDB_CHILD_TABLE) return -1;
if (uid == TSDB_INVALID_SUPER_TABLE_ID) return -1; if (uid == TSDB_INVALID_SUPER_TABLE_ID) return -1;
...@@ -1148,8 +1148,16 @@ static void tsdbAlterKeep(STsdbRepo *pRepo, int32_t keep) { ...@@ -1148,8 +1148,16 @@ static void tsdbAlterKeep(STsdbRepo *pRepo, int32_t keep) {
} }
static void tsdbAlterMaxTables(STsdbRepo *pRepo, int32_t maxTables) { static void tsdbAlterMaxTables(STsdbRepo *pRepo, int32_t maxTables) {
// TODO
int oldMaxTables = pRepo->config.maxTables; int oldMaxTables = pRepo->config.maxTables;
if (oldMaxTables < pRepo->config.maxTables) {
// TODO
}
STsdbMeta *pMeta = pRepo->tsdbMeta;
pMeta->maxTables = maxTables;
pMeta->tables = realloc(pMeta->tables, maxTables * sizeof(STable *));
tsdbTrace("vgId:%d, tsdb maxTables is changed from %d to %d!", pRepo->config.tsdbId, oldMaxTables, maxTables); tsdbTrace("vgId:%d, tsdb maxTables is changed from %d to %d!", pRepo->config.tsdbId, oldMaxTables, maxTables);
} }
......
...@@ -87,9 +87,9 @@ STable *tsdbDecodeTable(void *cont, int contLen) { ...@@ -87,9 +87,9 @@ STable *tsdbDecodeTable(void *cont, int contLen) {
memcpy(pTable->name->data, ptr, len); memcpy(pTable->name->data, ptr, len);
ptr = (char *)ptr + len; ptr = (char *)ptr + len;
T_READ_MEMBER(ptr, int64_t, pTable->tableId.uid); T_READ_MEMBER(ptr, uint64_t, pTable->tableId.uid);
T_READ_MEMBER(ptr, int32_t, pTable->tableId.tid); T_READ_MEMBER(ptr, int32_t, pTable->tableId.tid);
T_READ_MEMBER(ptr, int64_t, pTable->superUid); T_READ_MEMBER(ptr, uint64_t, pTable->superUid);
T_READ_MEMBER(ptr, int32_t, pTable->sversion); T_READ_MEMBER(ptr, int32_t, pTable->sversion);
if (pTable->type == TSDB_SUPER_TABLE) { if (pTable->type == TSDB_SUPER_TABLE) {
...@@ -154,7 +154,6 @@ STsdbMeta *tsdbInitMeta(char *rootDir, int32_t maxTables) { ...@@ -154,7 +154,6 @@ STsdbMeta *tsdbInitMeta(char *rootDir, int32_t maxTables) {
STsdbMeta *pMeta = (STsdbMeta *)malloc(sizeof(STsdbMeta)); STsdbMeta *pMeta = (STsdbMeta *)malloc(sizeof(STsdbMeta));
if (pMeta == NULL) return NULL; if (pMeta == NULL) return NULL;
pMeta->maxTables = maxTables;
pMeta->nTables = 0; pMeta->nTables = 0;
pMeta->superList = NULL; pMeta->superList = NULL;
pMeta->tables = (STable **)calloc(maxTables, sizeof(STable *)); pMeta->tables = (STable **)calloc(maxTables, sizeof(STable *));
...@@ -456,7 +455,7 @@ static int32_t tsdbCheckTableCfg(STableCfg *pCfg) { ...@@ -456,7 +455,7 @@ static int32_t tsdbCheckTableCfg(STableCfg *pCfg) {
return 0; return 0;
} }
STable *tsdbGetTableByUid(STsdbMeta *pMeta, int64_t uid) { STable *tsdbGetTableByUid(STsdbMeta *pMeta, uint64_t uid) {
void *ptr = taosHashGet(pMeta->map, (char *)(&uid), sizeof(uid)); void *ptr = taosHashGet(pMeta->map, (char *)(&uid), sizeof(uid));
if (ptr == NULL) return NULL; if (ptr == NULL) return NULL;
...@@ -509,10 +508,7 @@ static int tsdbRemoveTableFromMeta(STsdbMeta *pMeta, STable *pTable) { ...@@ -509,10 +508,7 @@ static int tsdbRemoveTableFromMeta(STsdbMeta *pMeta, STable *pTable) {
ASSERT(tTable != NULL && tTable->type == TSDB_CHILD_TABLE); ASSERT(tTable != NULL && tTable->type == TSDB_CHILD_TABLE);
pMeta->tables[tTable->tableId.tid] = NULL; tsdbRemoveTableFromMeta(pMeta, tTable);
taosHashRemove(pMeta->map, (char *)(&(pTable->tableId.uid)), sizeof(pTable->tableId.uid));
pMeta->nTables--;
tsdbFreeTable(tTable);
} }
tSkipListDestroyIter(pIter); tSkipListDestroyIter(pIter);
...@@ -535,8 +531,8 @@ static int tsdbRemoveTableFromMeta(STsdbMeta *pMeta, STable *pTable) { ...@@ -535,8 +531,8 @@ static int tsdbRemoveTableFromMeta(STsdbMeta *pMeta, STable *pTable) {
pMeta->nTables--; pMeta->nTables--;
} }
tsdbFreeTable(pTable);
taosHashRemove(pMeta->map, (char *)(&(pTable->tableId.uid)), sizeof(pTable->tableId.uid)); taosHashRemove(pMeta->map, (char *)(&(pTable->tableId.uid)), sizeof(pTable->tableId.uid));
tsdbFreeTable(pTable);
return 0; return 0;
} }
......
...@@ -23,9 +23,9 @@ ...@@ -23,9 +23,9 @@
#define TSDB_META_FILE_HEADER_SIZE 512 #define TSDB_META_FILE_HEADER_SIZE 512
typedef struct { typedef struct {
int32_t offset; int32_t offset;
int32_t size; int32_t size;
int64_t uid; uint64_t uid;
} SRecordInfo; } SRecordInfo;
// static int32_t tsdbGetMetaFileName(char *rootDir, char *fname); // static int32_t tsdbGetMetaFileName(char *rootDir, char *fname);
...@@ -76,7 +76,7 @@ SMetaFile *tsdbInitMetaFile(char *rootDir, int32_t maxTables, iterFunc iFunc, af ...@@ -76,7 +76,7 @@ SMetaFile *tsdbInitMetaFile(char *rootDir, int32_t maxTables, iterFunc iFunc, af
return mfh; return mfh;
} }
int32_t tsdbInsertMetaRecord(SMetaFile *mfh, int64_t uid, void *cont, int32_t contLen) { int32_t tsdbInsertMetaRecord(SMetaFile *mfh, uint64_t uid, void *cont, int32_t contLen) {
if (taosHashGet(mfh->map, (char *)(&uid), sizeof(uid)) != NULL) { if (taosHashGet(mfh->map, (char *)(&uid), sizeof(uid)) != NULL) {
return -1; return -1;
} }
...@@ -112,7 +112,7 @@ int32_t tsdbInsertMetaRecord(SMetaFile *mfh, int64_t uid, void *cont, int32_t co ...@@ -112,7 +112,7 @@ int32_t tsdbInsertMetaRecord(SMetaFile *mfh, int64_t uid, void *cont, int32_t co
return 0; return 0;
} }
int32_t tsdbDeleteMetaRecord(SMetaFile *mfh, int64_t uid) { int32_t tsdbDeleteMetaRecord(SMetaFile *mfh, uint64_t uid) {
char *ptr = taosHashGet(mfh->map, (char *)(&uid), sizeof(uid)); char *ptr = taosHashGet(mfh->map, (char *)(&uid), sizeof(uid));
if (ptr == NULL) return -1; if (ptr == NULL) return -1;
...@@ -139,7 +139,7 @@ int32_t tsdbDeleteMetaRecord(SMetaFile *mfh, int64_t uid) { ...@@ -139,7 +139,7 @@ int32_t tsdbDeleteMetaRecord(SMetaFile *mfh, int64_t uid) {
return 0; return 0;
} }
int32_t tsdbUpdateMetaRecord(SMetaFile *mfh, int64_t uid, void *cont, int32_t contLen) { int32_t tsdbUpdateMetaRecord(SMetaFile *mfh, uint64_t uid, void *cont, int32_t contLen) {
char *ptr = taosHashGet(mfh->map, (char *)(&uid), sizeof(uid)); char *ptr = taosHashGet(mfh->map, (char *)(&uid), sizeof(uid));
if (ptr == NULL) return -1; if (ptr == NULL) return -1;
......
...@@ -18,6 +18,7 @@ ...@@ -18,6 +18,7 @@
#include "tchecksum.h" #include "tchecksum.h"
#include "tscompression.h" #include "tscompression.h"
#include "talgo.h" #include "talgo.h"
#include "tcoding.h"
// Local function definitions // Local function definitions
// static int tsdbCheckHelperCfg(SHelperCfg *pCfg); // static int tsdbCheckHelperCfg(SHelperCfg *pCfg);
...@@ -131,10 +132,10 @@ static int tsdbInitHelper(SRWHelper *pHelper, STsdbRepo *pRepo, tsdb_rw_helper_t ...@@ -131,10 +132,10 @@ static int tsdbInitHelper(SRWHelper *pHelper, STsdbRepo *pRepo, tsdb_rw_helper_t
// Init block part // Init block part
if (tsdbInitHelperBlock(pHelper) < 0) goto _err; if (tsdbInitHelperBlock(pHelper) < 0) goto _err;
pHelper->blockBuffer = pHelper->pBuffer =
tmalloc(sizeof(SCompData) + (sizeof(SCompCol) + sizeof(TSCKSUM) + COMP_OVERFLOW_BYTES) * pHelper->config.maxCols + tmalloc(sizeof(SCompData) + (sizeof(SCompCol) + sizeof(TSCKSUM) + COMP_OVERFLOW_BYTES) * pHelper->config.maxCols +
pHelper->config.maxRowSize * pHelper->config.maxRowsPerFileBlock + sizeof(TSCKSUM)); pHelper->config.maxRowSize * pHelper->config.maxRowsPerFileBlock + sizeof(TSCKSUM));
if (pHelper->blockBuffer == NULL) goto _err; if (pHelper->pBuffer == NULL) goto _err;
return 0; return 0;
...@@ -154,7 +155,7 @@ int tsdbInitWriteHelper(SRWHelper *pHelper, STsdbRepo *pRepo) { ...@@ -154,7 +155,7 @@ int tsdbInitWriteHelper(SRWHelper *pHelper, STsdbRepo *pRepo) {
void tsdbDestroyHelper(SRWHelper *pHelper) { void tsdbDestroyHelper(SRWHelper *pHelper) {
if (pHelper) { if (pHelper) {
tzfree(pHelper->blockBuffer); tzfree(pHelper->pBuffer);
tzfree(pHelper->compBuffer); tzfree(pHelper->compBuffer);
tsdbDestroyHelperFile(pHelper); tsdbDestroyHelperFile(pHelper);
tsdbDestroyHelperTable(pHelper); tsdbDestroyHelperTable(pHelper);
...@@ -211,13 +212,15 @@ int tsdbSetAndOpenHelperFile(SRWHelper *pHelper, SFileGroup *pGroup) { ...@@ -211,13 +212,15 @@ int tsdbSetAndOpenHelperFile(SRWHelper *pHelper, SFileGroup *pGroup) {
// Create and open .h // Create and open .h
if (tsdbOpenFile(&(pHelper->files.nHeadF), O_WRONLY | O_CREAT) < 0) return -1; if (tsdbOpenFile(&(pHelper->files.nHeadF), O_WRONLY | O_CREAT) < 0) return -1;
size_t tsize = TSDB_FILE_HEAD_SIZE + sizeof(SCompIdx) * pHelper->config.maxTables + sizeof(TSCKSUM); // size_t tsize = TSDB_FILE_HEAD_SIZE + sizeof(SCompIdx) * pHelper->config.maxTables + sizeof(TSCKSUM);
if (tsendfile(pHelper->files.nHeadF.fd, pHelper->files.headF.fd, NULL, tsize) < tsize) goto _err; if (tsendfile(pHelper->files.nHeadF.fd, pHelper->files.headF.fd, NULL, TSDB_FILE_HEAD_SIZE) < TSDB_FILE_HEAD_SIZE)
goto _err;
// Create and open .l file if should // Create and open .l file if should
if (tsdbShouldCreateNewLast(pHelper)) { if (tsdbShouldCreateNewLast(pHelper)) {
if (tsdbOpenFile(&(pHelper->files.nLastF), O_WRONLY | O_CREAT) < 0) goto _err; if (tsdbOpenFile(&(pHelper->files.nLastF), O_WRONLY | O_CREAT) < 0) goto _err;
if (tsendfile(pHelper->files.nLastF.fd, pHelper->files.lastF.fd, NULL, TSDB_FILE_HEAD_SIZE) < TSDB_FILE_HEAD_SIZE) goto _err; if (tsendfile(pHelper->files.nLastF.fd, pHelper->files.lastF.fd, NULL, TSDB_FILE_HEAD_SIZE) < TSDB_FILE_HEAD_SIZE)
goto _err;
} }
} else { } else {
if (tsdbOpenFile(&(pHelper->files.dataF), O_RDONLY) < 0) goto _err; if (tsdbOpenFile(&(pHelper->files.dataF), O_RDONLY) < 0) goto _err;
...@@ -238,6 +241,7 @@ int tsdbCloseHelperFile(SRWHelper *pHelper, bool hasError) { ...@@ -238,6 +241,7 @@ int tsdbCloseHelperFile(SRWHelper *pHelper, bool hasError) {
pHelper->files.headF.fd = -1; pHelper->files.headF.fd = -1;
} }
if (pHelper->files.dataF.fd > 0) { if (pHelper->files.dataF.fd > 0) {
if (!hasError) tsdbUpdateFileHeader(&(pHelper->files.dataF), 0);
close(pHelper->files.dataF.fd); close(pHelper->files.dataF.fd);
pHelper->files.dataF.fd = -1; pHelper->files.dataF.fd = -1;
} }
...@@ -246,6 +250,7 @@ int tsdbCloseHelperFile(SRWHelper *pHelper, bool hasError) { ...@@ -246,6 +250,7 @@ int tsdbCloseHelperFile(SRWHelper *pHelper, bool hasError) {
pHelper->files.lastF.fd = -1; pHelper->files.lastF.fd = -1;
} }
if (pHelper->files.nHeadF.fd > 0) { if (pHelper->files.nHeadF.fd > 0) {
if (!hasError) tsdbUpdateFileHeader(&(pHelper->files.nHeadF), 0);
close(pHelper->files.nHeadF.fd); close(pHelper->files.nHeadF.fd);
pHelper->files.nHeadF.fd = -1; pHelper->files.nHeadF.fd = -1;
if (hasError) { if (hasError) {
...@@ -257,6 +262,7 @@ int tsdbCloseHelperFile(SRWHelper *pHelper, bool hasError) { ...@@ -257,6 +262,7 @@ int tsdbCloseHelperFile(SRWHelper *pHelper, bool hasError) {
} }
if (pHelper->files.nLastF.fd > 0) { if (pHelper->files.nLastF.fd > 0) {
if (!hasError) tsdbUpdateFileHeader(&(pHelper->files.nLastF), 0);
close(pHelper->files.nLastF.fd); close(pHelper->files.nLastF.fd);
pHelper->files.nLastF.fd = -1; pHelper->files.nLastF.fd = -1;
if (hasError) { if (hasError) {
...@@ -416,7 +422,7 @@ int tsdbWriteCompInfo(SRWHelper *pHelper) { ...@@ -416,7 +422,7 @@ int tsdbWriteCompInfo(SRWHelper *pHelper) {
pIdx->offset = lseek(pHelper->files.nHeadF.fd, 0, SEEK_END); pIdx->offset = lseek(pHelper->files.nHeadF.fd, 0, SEEK_END);
pIdx->uid = pHelper->tableInfo.uid; pIdx->uid = pHelper->tableInfo.uid;
if (pIdx->offset < 0) return -1; if (pIdx->offset < 0) return -1;
ASSERT(pIdx->offset >= tsizeof(pHelper->pCompIdx)); ASSERT(pIdx->offset >= TSDB_FILE_HEAD_SIZE);
if (twrite(pHelper->files.nHeadF.fd, (void *)(pHelper->pCompInfo), pIdx->len) < pIdx->len) return -1; if (twrite(pHelper->files.nHeadF.fd, (void *)(pHelper->pCompInfo), pIdx->len) < pIdx->len) return -1;
} }
...@@ -426,13 +432,27 @@ int tsdbWriteCompInfo(SRWHelper *pHelper) { ...@@ -426,13 +432,27 @@ int tsdbWriteCompInfo(SRWHelper *pHelper) {
int tsdbWriteCompIdx(SRWHelper *pHelper) { int tsdbWriteCompIdx(SRWHelper *pHelper) {
ASSERT(TSDB_HELPER_TYPE(pHelper) == TSDB_WRITE_HELPER); ASSERT(TSDB_HELPER_TYPE(pHelper) == TSDB_WRITE_HELPER);
if (lseek(pHelper->files.nHeadF.fd, TSDB_FILE_HEAD_SIZE, SEEK_SET) < 0) return -1; off_t offset = lseek(pHelper->files.nHeadF.fd, 0, SEEK_END);
if (offset < 0) return -1;
SFile *pFile = &(pHelper->files.nHeadF);
pFile->info.offset = offset;
// TODO: change the implementation of pHelper->pBuffer
void *buf = pHelper->pBuffer;
for (uint32_t i = 0; i < pHelper->config.maxTables; i++) {
SCompIdx *pCompIdx = pHelper->pCompIdx + i;
if (pCompIdx->offset > 0) {
buf = taosEncodeVariant32(buf, i);
buf = tsdbEncodeSCompIdx(buf, pCompIdx);
}
}
ASSERT(tsizeof(pHelper->pCompIdx) == sizeof(SCompIdx) * pHelper->config.maxTables + sizeof(TSCKSUM)); int tsize = (char *)buf - (char *)pHelper->pBuffer + sizeof(TSCKSUM);
taosCalcChecksumAppend(0, (uint8_t *)pHelper->pCompIdx, tsizeof(pHelper->pCompIdx)); taosCalcChecksumAppend(0, (uint8_t *)pHelper->pBuffer, tsize);
if (twrite(pHelper->files.nHeadF.fd, (void *)pHelper->pCompIdx, tsizeof(pHelper->pCompIdx)) < tsizeof(pHelper->pCompIdx)) if (twrite(pHelper->files.nHeadF.fd, (void *)pHelper->pBuffer, tsize) < tsize) return -1;
return -1; pFile->info.len = tsize;
return 0; return 0;
} }
...@@ -441,14 +461,36 @@ int tsdbLoadCompIdx(SRWHelper *pHelper, void *target) { ...@@ -441,14 +461,36 @@ int tsdbLoadCompIdx(SRWHelper *pHelper, void *target) {
if (!helperHasState(pHelper, TSDB_HELPER_IDX_LOAD)) { if (!helperHasState(pHelper, TSDB_HELPER_IDX_LOAD)) {
// If not load from file, just load it in object // If not load from file, just load it in object
int fd = pHelper->files.headF.fd; SFile *pFile = &(pHelper->files.headF);
int fd = pFile->fd;
memset(pHelper->pCompIdx, 0, tsizeof(pHelper->pCompIdx));
if (pFile->info.offset > 0) {
ASSERT(pFile->info.offset > TSDB_FILE_HEAD_SIZE);
if (lseek(fd, pFile->info.offset, SEEK_SET) < 0) return -1;
if (tread(fd, (void *)(pHelper->pBuffer), pFile->info.len) < pFile->info.len)
return -1;
if (!taosCheckChecksumWhole((uint8_t *)(pHelper->pBuffer), pFile->info.len)) {
// TODO: File is broken, try to deal with it
return -1;
}
if (lseek(fd, TSDB_FILE_HEAD_SIZE, SEEK_SET) < 0) return -1; // Decode it
if (tread(fd, (void *)(pHelper->pCompIdx), tsizeof((void *)pHelper->pCompIdx)) < tsizeof(pHelper->pCompIdx)) return -1; void *ptr = pHelper->pBuffer;
if (!taosCheckChecksumWhole((uint8_t *)(pHelper->pCompIdx), tsizeof((void *)pHelper->pCompIdx))) { while (((char *)ptr - (char *)pHelper->pBuffer) < (pFile->info.len - sizeof(TSCKSUM))) {
// TODO: File is broken, try to deal with it uint32_t tid = 0;
return -1; if ((ptr = taosDecodeVariant32(ptr, &tid)) == NULL) return -1;
ASSERT(tid > 0 && tid < pHelper->config.maxTables);
if ((ptr = tsdbDecodeSCompIdx(ptr, pHelper->pCompIdx + tid)) == NULL) return -1;
ASSERT((char *)ptr - (char *)pHelper->pBuffer <= pFile->info.len - sizeof(TSCKSUM));
}
ASSERT(((char *)ptr - (char *)pHelper->pBuffer) == (pFile->info.len - sizeof(TSCKSUM)));
} }
} }
helperSetState(pHelper, TSDB_HELPER_IDX_LOAD); helperSetState(pHelper, TSDB_HELPER_IDX_LOAD);
...@@ -590,9 +632,9 @@ static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, char *content, int32 ...@@ -590,9 +632,9 @@ static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, char *content, int32
static int tsdbLoadBlockDataImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SDataCols *pDataCols) { static int tsdbLoadBlockDataImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SDataCols *pDataCols) {
ASSERT(pCompBlock->numOfSubBlocks <= 1); ASSERT(pCompBlock->numOfSubBlocks <= 1);
ASSERT(tsizeof(pHelper->blockBuffer) >= pCompBlock->len); ASSERT(tsizeof(pHelper->pBuffer) >= pCompBlock->len);
SCompData *pCompData = (SCompData *)pHelper->blockBuffer; SCompData *pCompData = (SCompData *)pHelper->pBuffer;
int fd = (pCompBlock->last) ? pHelper->files.lastF.fd : pHelper->files.dataF.fd; int fd = (pCompBlock->last) ? pHelper->files.lastF.fd : pHelper->files.dataF.fd;
if (lseek(fd, pCompBlock->offset, SEEK_SET) < 0) goto _err; if (lseek(fd, pCompBlock->offset, SEEK_SET) < 0) goto _err;
...@@ -685,7 +727,7 @@ static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDa ...@@ -685,7 +727,7 @@ static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDa
ASSERT(rowsToWrite > 0 && rowsToWrite <= pDataCols->numOfPoints && ASSERT(rowsToWrite > 0 && rowsToWrite <= pDataCols->numOfPoints &&
rowsToWrite <= pHelper->config.maxRowsPerFileBlock); rowsToWrite <= pHelper->config.maxRowsPerFileBlock);
SCompData *pCompData = (SCompData *)(pHelper->blockBuffer); SCompData *pCompData = (SCompData *)(pHelper->pBuffer);
int64_t offset = 0; int64_t offset = 0;
offset = lseek(pFile->fd, 0, SEEK_END); offset = lseek(pFile->fd, 0, SEEK_END);
...@@ -740,7 +782,7 @@ static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDa ...@@ -740,7 +782,7 @@ static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDa
} }
pCompCol->len = (*(tDataTypeDesc[pDataCol->type].compFunc))( pCompCol->len = (*(tDataTypeDesc[pDataCol->type].compFunc))(
(char *)pDataCol->pData, tlen, rowsToWrite, tptr, tsizeof(pHelper->blockBuffer) - lsize, (char *)pDataCol->pData, tlen, rowsToWrite, tptr, tsizeof(pHelper->pBuffer) - lsize,
pHelper->config.compress, pHelper->compBuffer, tsizeof(pHelper->compBuffer)); pHelper->config.compress, pHelper->compBuffer, tsizeof(pHelper->compBuffer));
} else { } else {
pCompCol->len = tlen; pCompCol->len = tlen;
...@@ -1168,4 +1210,73 @@ static int tsdbGetRowsInRange(SDataCols *pDataCols, TSKEY minKey, TSKEY maxKey) ...@@ -1168,4 +1210,73 @@ static int tsdbGetRowsInRange(SDataCols *pDataCols, TSKEY minKey, TSKEY maxKey)
if ((TSKEY *)ptr2 - (TSKEY *)ptr1 < 0) return 0; if ((TSKEY *)ptr2 - (TSKEY *)ptr1 < 0) return 0;
return ((TSKEY *)ptr2 - (TSKEY *)ptr1) + 1; return ((TSKEY *)ptr2 - (TSKEY *)ptr1) + 1;
}
void *tsdbEncodeSCompIdx(void *buf, SCompIdx *pIdx) {
buf = taosEncodeVariant32(buf, pIdx->len);
buf = taosEncodeVariant32(buf, pIdx->offset);
buf = taosEncodeFixed8(buf, pIdx->hasLast);
buf = taosEncodeVariant32(buf, pIdx->numOfBlocks);
buf = taosEncodeFixed64(buf, pIdx->uid);
buf = taosEncodeFixed64(buf, pIdx->maxKey);
return buf;
}
void *tsdbDecodeSCompIdx(void *buf, SCompIdx *pIdx) {
uint8_t hasLast = 0;
uint32_t numOfBlocks = 0;
uint64_t value = 0;
if ((buf = taosDecodeVariant32(buf, &(pIdx->len))) == NULL) return NULL;
if ((buf = taosDecodeVariant32(buf, &(pIdx->offset))) == NULL) return NULL;
if ((buf = taosDecodeFixed8(buf, &(hasLast))) == NULL) return NULL;
pIdx->hasLast = hasLast;
if ((buf = taosDecodeVariant32(buf, &(numOfBlocks))) == NULL) return NULL;
pIdx->numOfBlocks = numOfBlocks;
if ((buf = taosDecodeFixed64(buf, &value)) == NULL) return NULL;
pIdx->uid = (int64_t)value;
if ((buf = taosDecodeFixed64(buf, &value)) == NULL) return NULL;
pIdx->maxKey = (TSKEY)value;
return buf;
}
int tsdbUpdateFileHeader(SFile *pFile, uint32_t version) {
char buf[TSDB_FILE_HEAD_SIZE] = "\0";
void *pBuf = (void *)buf;
pBuf = taosEncodeFixed32(pBuf, version);
pBuf = tsdbEncodeSFileInfo(pBuf, &(pFile->info));
taosCalcChecksumAppend(0, (uint8_t *)buf, TSDB_FILE_HEAD_SIZE);
if (lseek(pFile->fd, 0, SEEK_SET) < 0) return -1;
if (twrite(pFile->fd, (void *)buf, TSDB_FILE_HEAD_SIZE) < TSDB_FILE_HEAD_SIZE) return -1;
return 0;
}
void *tsdbEncodeSFileInfo(void *buf, const STsdbFileInfo *pInfo) {
buf = taosEncodeFixed32(buf, pInfo->offset);
buf = taosEncodeFixed32(buf, pInfo->len);
buf = taosEncodeFixed64(buf, pInfo->size);
buf = taosEncodeFixed64(buf, pInfo->tombSize);
buf = taosEncodeFixed32(buf, pInfo->totalBlocks);
buf = taosEncodeFixed32(buf, pInfo->totalSubBlocks);
return buf;
}
void *tsdbDecodeSFileInfo(void *buf, STsdbFileInfo *pInfo) {
buf = taosDecodeFixed32(buf, &(pInfo->offset));
buf = taosDecodeFixed32(buf, &(pInfo->len));
buf = taosDecodeFixed64(buf, &(pInfo->size));
buf = taosDecodeFixed64(buf, &(pInfo->tombSize));
buf = taosDecodeFixed32(buf, &(pInfo->totalBlocks));
buf = taosDecodeFixed32(buf, &(pInfo->totalSubBlocks));
return buf;
} }
\ No newline at end of file
...@@ -1516,8 +1516,9 @@ static int32_t doQueryTableList(STable* pSTable, SArray* pRes, tExprNode* pExpr) ...@@ -1516,8 +1516,9 @@ static int32_t doQueryTableList(STable* pSTable, SArray* pRes, tExprNode* pExpr)
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t tsdbQuerySTableByTagCond(TsdbRepoT *tsdb, int64_t uid, const char *pTagCond, size_t len, int16_t tagNameRelType, int32_t tsdbQuerySTableByTagCond(TsdbRepoT* tsdb, uint64_t uid, const char* pTagCond, size_t len,
const char* tbnameCond, STableGroupInfo *pGroupInfo, SColIndex *pColIndex, int32_t numOfCols) { int16_t tagNameRelType, const char* tbnameCond, STableGroupInfo* pGroupInfo,
SColIndex* pColIndex, int32_t numOfCols) {
STable* pTable = tsdbGetTableByUid(tsdbGetMeta(tsdb), uid); STable* pTable = tsdbGetTableByUid(tsdbGetMeta(tsdb), uid);
if (pTable == NULL) { if (pTable == NULL) {
uError("%p failed to get stable, uid:%" PRIu64, tsdb, uid); uError("%p failed to get stable, uid:%" PRIu64, tsdb, uid);
...@@ -1589,7 +1590,7 @@ int32_t tsdbQuerySTableByTagCond(TsdbRepoT *tsdb, int64_t uid, const char *pTagC ...@@ -1589,7 +1590,7 @@ int32_t tsdbQuerySTableByTagCond(TsdbRepoT *tsdb, int64_t uid, const char *pTagC
return ret; return ret;
} }
int32_t tsdbGetOneTableGroup(TsdbRepoT* tsdb, int64_t uid, STableGroupInfo* pGroupInfo) { int32_t tsdbGetOneTableGroup(TsdbRepoT* tsdb, uint64_t uid, STableGroupInfo* pGroupInfo) {
STable* pTable = tsdbGetTableByUid(tsdbGetMeta(tsdb), uid); STable* pTable = tsdbGetTableByUid(tsdbGetMeta(tsdb), uid);
if (pTable == NULL) { if (pTable == NULL) {
return TSDB_CODE_INVALID_TABLE_ID; return TSDB_CODE_INVALID_TABLE_ID;
......
...@@ -16,7 +16,7 @@ typedef struct { ...@@ -16,7 +16,7 @@ typedef struct {
TsdbRepoT *pRepo; TsdbRepoT *pRepo;
bool isAscend; bool isAscend;
int tid; int tid;
int64_t uid; uint64_t uid;
int sversion; int sversion;
TSKEY startTime; TSKEY startTime;
TSKEY interval; TSKEY interval;
......
...@@ -29,6 +29,11 @@ extern "C" { ...@@ -29,6 +29,11 @@ extern "C" {
static const int32_t TNUMBER = 1; static const int32_t TNUMBER = 1;
#define IS_LITTLE_ENDIAN() (*(uint8_t *)(&TNUMBER) != 0) #define IS_LITTLE_ENDIAN() (*(uint8_t *)(&TNUMBER) != 0)
static FORCE_INLINE void *taosEncodeFixed8(void *buf, uint8_t value) {
((uint8_t *)buf)[0] = value;
return POINTER_SHIFT(buf, sizeof(value));
}
static FORCE_INLINE void *taosEncodeFixed16(void *buf, uint16_t value) { static FORCE_INLINE void *taosEncodeFixed16(void *buf, uint16_t value) {
if (IS_LITTLE_ENDIAN()) { if (IS_LITTLE_ENDIAN()) {
memcpy(buf, &value, sizeof(value)); memcpy(buf, &value, sizeof(value));
...@@ -70,6 +75,11 @@ static FORCE_INLINE void *taosEncodeFixed64(void *buf, uint64_t value) { ...@@ -70,6 +75,11 @@ static FORCE_INLINE void *taosEncodeFixed64(void *buf, uint64_t value) {
return POINTER_SHIFT(buf, sizeof(value)); return POINTER_SHIFT(buf, sizeof(value));
} }
static FORCE_INLINE void *taosDecodeFixed8(void *buf, uint8_t *value) {
*value = ((uint8_t *)buf)[0];
return POINTER_SHIFT(buf, sizeof(*value));
}
static FORCE_INLINE void *taosDecodeFixed16(void *buf, uint16_t *value) { static FORCE_INLINE void *taosDecodeFixed16(void *buf, uint16_t *value) {
if (IS_LITTLE_ENDIAN()) { if (IS_LITTLE_ENDIAN()) {
memcpy(value, buf, sizeof(*value)); memcpy(value, buf, sizeof(*value));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册