提交 29e65ce7 编写于 作者: H Hongze Cheng

more tsdb fs work

上级 a55f40a1
...@@ -63,6 +63,8 @@ typedef struct SDataFWriter SDataFWriter; ...@@ -63,6 +63,8 @@ typedef struct SDataFWriter SDataFWriter;
typedef struct SDataFReader SDataFReader; typedef struct SDataFReader SDataFReader;
typedef struct SDelFWriter SDelFWriter; typedef struct SDelFWriter SDelFWriter;
typedef struct SDelFReader SDelFReader; typedef struct SDelFReader SDelFReader;
typedef struct SRowIter SRowIter;
typedef struct STsdbFS STsdbFS;
#define TSDB_MAX_SUBBLOCKS 8 #define TSDB_MAX_SUBBLOCKS 8
...@@ -78,6 +80,9 @@ TSDBKEY tsdbRowKey(TSDBROW *pRow); ...@@ -78,6 +80,9 @@ TSDBKEY tsdbRowKey(TSDBROW *pRow);
void tsdbRowGetColVal(TSDBROW *pRow, STSchema *pTSchema, int32_t iCol, SColVal *pColVal); void tsdbRowGetColVal(TSDBROW *pRow, STSchema *pTSchema, int32_t iCol, SColVal *pColVal);
int32_t tPutTSDBRow(uint8_t *p, TSDBROW *pRow); int32_t tPutTSDBRow(uint8_t *p, TSDBROW *pRow);
int32_t tGetTSDBRow(uint8_t *p, TSDBROW *pRow); int32_t tGetTSDBRow(uint8_t *p, TSDBROW *pRow);
// SRowIter
#define tRowIterInit(ROW, SCHEMA) ((SRowIter){.pRow = (ROW), .pSchema = (SCHEMA)})
SColVal *tRowIterNext(SRowIter *pIter);
// TABLEID // TABLEID
int32_t tTABLEIDCmprFn(const void *p1, const void *p2); int32_t tTABLEIDCmprFn(const void *p1, const void *p2);
// TSDBKEY // TSDBKEY
...@@ -107,11 +112,11 @@ int32_t tGetBlockIdx(uint8_t *p, void *ph); ...@@ -107,11 +112,11 @@ int32_t tGetBlockIdx(uint8_t *p, void *ph);
// SColdata // SColdata
#define tColDataInit() ((SColData){0}) #define tColDataInit() ((SColData){0})
void tColDataReset(SColData *pColData); void tColDataReset(SColData *pColData);
void tColDataClear(SColData *pColData); void tColDataClear(void *ph);
int32_t tColDataAppendValue(SColData *pColData, SColVal *pColVal); int32_t tColDataAppendValue(SColData *pColData, SColVal *pColVal);
int32_t tColDataCmprFn(const void *p1, const void *p2); int32_t tColDataCmprFn(const void *p1, const void *p2);
// SBlockData // SBlockData
#define tBlockDataInit() ((SBlockData){0}) int32_t tBlockDataInit(SBlockData *pBlockData);
void tBlockDataReset(SBlockData *pBlockData); void tBlockDataReset(SBlockData *pBlockData);
void tBlockDataClear(SBlockData *pBlockData); void tBlockDataClear(SBlockData *pBlockData);
int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema); int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema);
...@@ -160,14 +165,15 @@ void tsdbLastFileName(STsdb *pTsdb, SLastFile *pFile, char fname[]); ...@@ -160,14 +165,15 @@ void tsdbLastFileName(STsdb *pTsdb, SLastFile *pFile, char fname[]);
// SSmaFile // SSmaFile
void tsdbSmaFileName(STsdb *pTsdb, SSmaFile *pFile, char fname[]); void tsdbSmaFileName(STsdb *pTsdb, SSmaFile *pFile, char fname[]);
// SDelFile // SDelFile
#define tsdbDelFileCreate() ((SDelFile){.info = KEYINFO_INIT_VAL, .size = 0, .offset = 0}) #define tsdbDelFileCreate() \
((SDelFile){ \
.maxKey = TSKEY_MIN, .minKey = TSKEY_MAX, .maxVersion = -1, .minVersion = INT64_MAX, .size = 0, .offset = 0})
void tsdbDelFileName(STsdb *pTsdb, SDelFile *pFile, char fname[]); void tsdbDelFileName(STsdb *pTsdb, SDelFile *pFile, char fname[]);
// tsdbFS.c ============================================================================================== // tsdbFS.c ==============================================================================================
typedef struct STsdbFS STsdbFS; int32_t tsdbFSOpen(STsdb *pTsdb, STsdbFS **ppFS);
int32_t tsdbFSOpen(STsdb *pTsdb, STsdbFS **ppFS); int32_t tsdbFSClose(STsdbFS *pFS);
int32_t tsdbFSClose(STsdbFS *pFS); int32_t tsdbFSStart(STsdbFS *pFS);
int32_t tsdbFSStart(STsdbFS *pFS); int32_t tsdbFSEnd(STsdbFS *pFS, int8_t rollback);
int32_t tsdbFSEnd(STsdbFS *pFS, int8_t rollback);
// tsdbReaderWriter.c ============================================================================================== // tsdbReaderWriter.c ==============================================================================================
// SDataFWriter // SDataFWriter
int32_t tsdbDataFWriterOpen(SDataFWriter **ppWriter, STsdb *pTsdb, SDFileSet *pSet); int32_t tsdbDataFWriterOpen(SDataFWriter **ppWriter, STsdb *pTsdb, SDFileSet *pSet);
...@@ -200,7 +206,7 @@ int32_t tsdbReadDelIdx(SDelFReader *pReader, SMapData *pDelIdxMap, uint8_t **ppB ...@@ -200,7 +206,7 @@ int32_t tsdbReadDelIdx(SDelFReader *pReader, SMapData *pDelIdxMap, uint8_t **ppB
// tsdbCache // tsdbCache
int32_t tsdbOpenCache(STsdb *pTsdb); int32_t tsdbOpenCache(STsdb *pTsdb);
void tsdbCloseCache(SLRUCache *pCache); void tsdbCloseCache(SLRUCache *pCache);
int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, tb_uid_t uid, STSRow *row); int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, tb_uid_t uid, STSRow *row);
int32_t tsdbCacheGetLastrow(SLRUCache *pCache, tb_uid_t uid, STSRow **ppRow); int32_t tsdbCacheGetLastrow(SLRUCache *pCache, tb_uid_t uid, STSRow **ppRow);
int32_t tsdbCacheDeleteLastrow(SLRUCache *pCache, tb_uid_t uid); int32_t tsdbCacheDeleteLastrow(SLRUCache *pCache, tb_uid_t uid);
...@@ -366,14 +372,12 @@ struct SColData { ...@@ -366,14 +372,12 @@ struct SColData {
}; };
struct SBlockData { struct SBlockData {
int32_t maxRow; int32_t maxRow;
int32_t nRow; int32_t nRow;
int64_t *aVersion; int64_t *aVersion;
TSKEY *aTSKEY; TSKEY *aTSKEY;
int32_t maxCol; SArray *apColData;
int32_t nColData; SArray *aColData;
SColData **apColData;
SColData *aColData;
}; };
// ================== TSDB global config // ================== TSDB global config
...@@ -414,7 +418,10 @@ struct SDelIdx { ...@@ -414,7 +418,10 @@ struct SDelIdx {
}; };
struct SDelFile { struct SDelFile {
KEYINFO info; TSKEY minKey;
TSKEY maxKey;
int64_t minVersion;
int64_t maxVersion;
int64_t size; int64_t size;
int64_t offset; int64_t offset;
}; };
...@@ -463,11 +470,19 @@ struct SSmaFile { ...@@ -463,11 +470,19 @@ struct SSmaFile {
struct SDFileSet { struct SDFileSet {
SDiskID diskId; SDiskID diskId;
int32_t nRef; int32_t fid;
SHeadFile *pHeadFile; SHeadFile *pHeadFile;
SDataFile *pDataFile; SDataFile *pDataFile;
SLastFile *pLastFile; SLastFile *pLastFile;
SSmaFile *pSmaFile; SSmaFile *pSmaFile;
int32_t nRef;
};
struct SRowIter {
TSDBROW *pRow;
STSchema *pSchema;
SColVal colVal;
int32_t i;
}; };
#ifdef __cplusplus #ifdef __cplusplus
......
...@@ -15,25 +15,364 @@ ...@@ -15,25 +15,364 @@
#include "tsdb.h" #include "tsdb.h"
typedef struct {
SDelFile *pDelFile;
SArray *aDFileSet; // SArray<aDFileSet>
} STsdbFSState;
struct STsdbFS { struct STsdbFS {
STsdb *pTsdb; STsdb *pTsdb;
TdThreadRwlock lock; TdThreadRwlock lock;
int64_t minVersion; int8_t inTxn;
int64_t maxVersion; STsdbFSState *cState;
SDelFile *pTombstoneF; STsdbFSState *nState;
STsdbCacheFile *pCacheF;
SArray *pArray;
}; };
int32_t tsdbFSOpen(STsdb *pTsdb, STsdbFS **ppFS) { // =================================================================================================
static int32_t tsdbDelFileToJson(const void *pObj, SJson *pJson) {
int32_t code = 0;
SDelFile *pDelFile = (SDelFile *)pObj;
if (tjsonAddIntegerToObject(pJson, "minKey", pDelFile->minKey) < 0) goto _err;
if (tjsonAddIntegerToObject(pJson, "maxKey", pDelFile->maxKey) < 0) goto _err;
if (tjsonAddIntegerToObject(pJson, "minVer", pDelFile->minVersion) < 0) goto _err;
if (tjsonAddIntegerToObject(pJson, "maxVer", pDelFile->maxVersion) < 0) goto _err;
return code;
_err:
code = TSDB_CODE_OUT_OF_MEMORY;
return code;
}
static int32_t tsdbHeadFileToJson(const void *pObj, SJson *pJson) {
int32_t code = 0;
SHeadFile *pHeadFile = (SHeadFile *)pObj;
if (tjsonAddIntegerToObject(pJson, "size", pHeadFile->size) < 0) goto _err;
if (tjsonAddIntegerToObject(pJson, "offset", pHeadFile->offset) < 0) goto _err;
return code;
_err:
code = TSDB_CODE_OUT_OF_MEMORY;
return code;
}
static int32_t tsdbDataFileToJson(const void *pObj, SJson *pJson) {
int32_t code = 0;
SDataFile *pDataFile = (SDataFile *)pObj;
if (tjsonAddIntegerToObject(pJson, "size", pDataFile->size) < 0) goto _err;
return code;
_err:
code = TSDB_CODE_OUT_OF_MEMORY;
return code;
}
static int32_t tsdbLastFileToJson(const void *pObj, SJson *pJson) {
int32_t code = 0;
SLastFile *pLastFile = (SLastFile *)pObj;
if (tjsonAddIntegerToObject(pJson, "size", pLastFile->size) < 0) goto _err;
return code;
_err:
code = TSDB_CODE_OUT_OF_MEMORY;
return code;
}
static int32_t tsdbSmaFileToJson(const void *pObj, SJson *pJson) {
int32_t code = 0;
SSmaFile *pSmaFile = (SSmaFile *)pObj;
if (tjsonAddIntegerToObject(pJson, "size", pSmaFile->size) < 0) goto _err;
return code;
_err:
code = TSDB_CODE_OUT_OF_MEMORY;
return code;
}
static int32_t tsdbDFileSetToJson(const void *pObj, SJson *pJson) {
int32_t code = 0;
SDFileSet *pDFileSet = (SDFileSet *)pObj;
if (tjsonAddIntegerToObject(pJson, "level", pDFileSet->diskId.level) < 0) goto _err;
if (tjsonAddIntegerToObject(pJson, "id", pDFileSet->diskId.id) < 0) goto _err;
if (tjsonAddIntegerToObject(pJson, "fid", pDFileSet->fid) < 0) goto _err;
if (tjsonAddObject(pJson, "head", tsdbHeadFileToJson, pDFileSet->pHeadFile) < 0) goto _err;
if (tjsonAddObject(pJson, "data", tsdbDataFileToJson, pDFileSet->pDataFile) < 0) goto _err;
if (tjsonAddObject(pJson, "last", tsdbLastFileToJson, pDFileSet->pLastFile) < 0) goto _err;
if (tjsonAddObject(pJson, "sma", tsdbSmaFileToJson, pDFileSet->pSmaFile) < 0) goto _err;
return code;
_err:
code = TSDB_CODE_OUT_OF_MEMORY;
return code;
}
static int32_t tsdbFSStateToJsonStr(STsdbFSState *pState, char **ppData) {
int32_t code = 0;
SJson *pJson = NULL;
pJson = tjsonCreateObject();
if (pJson == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
if (tjsonAddObject(pJson, "DelFile", tsdbDelFileToJson, pState->pDelFile) < 0) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
if (tjsonAddTArray(pJson, "DFileSet", tsdbDFileSetToJson, pState->aDFileSet) < 0) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
*ppData = tjsonToString(pJson);
if (*ppData == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
tjsonDelete(pJson);
return code;
_err:
return code;
}
static int32_t tsdbJsonStrToFSState(char *pData, STsdbFSState *pState) {
int32_t code = 0;
SJson *pJson = NULL;
pJson = tjsonParse(pData);
if (pJson == NULL) goto _err;
// if (tjsonToObject(pJson, "DelFile", tsdbJsonToDelFile, &pState->pDelFile) < 0) goto _err;
// if (tjsonToTArray(pJson, "DFIleSet", tsdbJsonToDFileSet, ) < 0) goto _err;
ASSERT(0);
tjsonDelete(pJson);
return code;
_err:
code = TSDB_CODE_OUT_OF_MEMORY;
return code;
}
static int32_t tsdbCreateEmptyCurrent(const char *fname, STsdbFSState *pState) {
int32_t code = 0;
int64_t n;
int64_t size;
char *pData = NULL;
TdFilePtr pFD = NULL;
// to json str
code = tsdbFSStateToJsonStr(pState, &pData);
if (code) goto _err;
// create and write
pFD = taosOpenFile(fname, TD_FILE_WRITE | TD_FILE_CREATE);
if (pFD == NULL) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
size = strlen(pData);
n = taosWriteFile(pFD, pData, size);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
if (taosFsyncFile(pFD) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
taosCloseFile(&pFD);
if (pData) taosMemoryFree(pData);
return code;
_err:
tsdbError("create empry current failed since %s", tstrerror(code));
if (pData) taosMemoryFree(pData);
return code;
}
static int32_t tsdbLoadCurrentState(STsdbFS *pFS, STsdbFSState *pState) {
int32_t code = 0;
int64_t size;
int64_t n;
char fname[TSDB_FILENAME_LEN];
char *pData = NULL;
TdFilePtr pFD;
snprintf(fname, TSDB_FILENAME_LEN - 1, "%s/CURRENT", pFS->pTsdb->path);
if (!taosCheckExistFile(fname)) {
// create an empry CURRENT file if not exists
code = tsdbCreateEmptyCurrent(fname, pState);
if (code) goto _err;
} else {
// open the file and load
pFD = taosOpenFile(fname, TD_FILE_READ);
if (pFD == NULL) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
if (taosFStatFile(pFD, &size, NULL) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
pData = taosMemoryMalloc(size + 1);
if (pData == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
pData[size] = '\0';
n = taosReadFile(pFD, pData, size);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
taosCloseFile(&pFD);
// decode
code = tsdbJsonStrToFSState(pData, pState);
if (code) goto _err;
}
if (pData) taosMemoryFree(pData);
return code;
_err:
tsdbError("vgId:%d tsdb load current state failed since %s", TD_VID(pFS->pTsdb->pVnode), tstrerror(code));
if (pData) taosMemoryFree(pData);
return code;
}
static int32_t tsdbFSOpenImpl(STsdbFS *pFS) {
int32_t code = 0;
int64_t size;
int64_t n;
// read CURRENT file
code = tsdbLoadCurrentState(pFS, pFS->cState);
if (code) goto _err;
// decode the statue file
// code = tsdbDecodeFSState(pData, pFS->cState);
// if (code) goto _err;
// // scan and fix invalid tsdb FS
// code = tsdbScanAndFixFS(pFS);
// if (code) goto _err;
// if (pData) taosMemoryFree(pData);
return code;
_err:
// if (pData) taosMemoryFree(pData);
tsdbError("vgId:%d tsdb fs open impl failed since %s", TD_VID(pFS->pTsdb->pVnode), tstrerror(code));
return code;
}
int32_t tsdbFSCloseImpl(STsdbFS *pFS) {
int32_t code = 0; int32_t code = 0;
// TODO // TODO
return code; return code;
} }
// EXPOSED APIS ====================================================================================
int32_t tsdbFSOpen(STsdb *pTsdb, STsdbFS **ppFS) {
int32_t code = 0;
STsdbFS *pFS = NULL;
pFS = (STsdbFS *)taosMemoryCalloc(1, sizeof(*pFS));
if (pFS == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
pFS->pTsdb = pTsdb;
code = taosThreadRwlockInit(&pFS->lock, NULL);
if (code) {
code = TAOS_SYSTEM_ERROR(code);
goto _err;
}
pFS->inTxn = 0;
pFS->cState = (STsdbFSState *)taosMemoryCalloc(1, sizeof(STsdbFSState));
if (pFS->cState == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
pFS->cState->aDFileSet = taosArrayInit(0, sizeof(SDFileSet));
if (pFS->cState->aDFileSet == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
pFS->nState = (STsdbFSState *)taosMemoryCalloc(1, sizeof(STsdbFSState));
if (pFS->nState == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
pFS->nState->aDFileSet = taosArrayInit(0, sizeof(SDFileSet));
if (pFS->nState->aDFileSet == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
code = tsdbFSOpenImpl(pFS);
if (code) goto _err;
*ppFS = pFS;
return code;
_err:
*ppFS = NULL;
tsdbError("vgId:%d tsdb fs open failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
return code;
}
int32_t tsdbFSClose(STsdbFS *pFS) { int32_t tsdbFSClose(STsdbFS *pFS) {
int32_t code = 0; int32_t code = 0;
// TODO
if (pFS) {
code = tsdbFSCloseImpl(pFS);
if (code) goto _err;
taosArrayDestroy(pFS->nState->aDFileSet);
taosMemoryFree(pFS->nState);
taosArrayDestroy(pFS->cState->aDFileSet);
taosMemoryFree(pFS->cState);
taosThreadRwlockDestroy(&pFS->lock);
taosMemoryFree(pFS);
}
return code;
_err:
tsdbError("vgId:%d tsdb fs close failed since %s", TD_VID(pFS->pTsdb->pVnode), tstrerror(code));
return code; return code;
} }
......
...@@ -922,8 +922,8 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, uint8_ ...@@ -922,8 +922,8 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, uint8_
// other columns // other columns
offset = 0; offset = 0;
tMapDataClear(&pSubBlock->mBlockCol); tMapDataClear(&pSubBlock->mBlockCol);
for (int32_t iCol = 0; iCol < pBlockData->nColData; iCol++) { for (int32_t iCol = 0; iCol < taosArrayGetSize(pBlockData->apColData); iCol++) {
SColData *pColData = &pBlockData->aColData[iCol]; SColData *pColData = (SColData *)taosArrayGetP(pBlockData->apColData, iCol);
ASSERT(pColData->flags); ASSERT(pColData->flags);
......
...@@ -445,7 +445,10 @@ int32_t tGetDelData(uint8_t *p, void *ph) { ...@@ -445,7 +445,10 @@ int32_t tGetDelData(uint8_t *p, void *ph) {
int32_t tPutDelFileHdr(uint8_t *p, SDelFile *pDelFile) { int32_t tPutDelFileHdr(uint8_t *p, SDelFile *pDelFile) {
int32_t n = 0; int32_t n = 0;
n += tPutKEYINFO(p ? p + n : p, &pDelFile->info); n += tPutI64(p ? p + n : p, pDelFile->minKey);
n += tPutI64(p ? p + n : p, pDelFile->maxKey);
n += tPutI64v(p ? p + n : p, pDelFile->minVersion);
n += tPutI64v(p ? p + n : p, pDelFile->maxVersion);
n += tPutI64v(p ? p + n : p, pDelFile->size); n += tPutI64v(p ? p + n : p, pDelFile->size);
n += tPutI64v(p ? p + n : p, pDelFile->offset); n += tPutI64v(p ? p + n : p, pDelFile->offset);
...@@ -455,7 +458,10 @@ int32_t tPutDelFileHdr(uint8_t *p, SDelFile *pDelFile) { ...@@ -455,7 +458,10 @@ int32_t tPutDelFileHdr(uint8_t *p, SDelFile *pDelFile) {
int32_t tGetDelFileHdr(uint8_t *p, SDelFile *pDelFile) { int32_t tGetDelFileHdr(uint8_t *p, SDelFile *pDelFile) {
int32_t n = 0; int32_t n = 0;
n += tGetKEYINFO(p + n, &pDelFile->info); n += tGetI64(p + n, &pDelFile->minKey);
n += tGetI64(p + n, &pDelFile->maxKey);
n += tGetI64v(p + n, &pDelFile->minVersion);
n += tGetI64v(p + n, &pDelFile->maxVersion);
n += tGetI64v(p + n, &pDelFile->size); n += tGetI64v(p + n, &pDelFile->size);
n += tGetI64v(p + n, &pDelFile->offset); n += tGetI64v(p + n, &pDelFile->offset);
...@@ -500,14 +506,18 @@ void tsdbRowGetColVal(TSDBROW *pRow, STSchema *pTSchema, int32_t iCol, SColVal * ...@@ -500,14 +506,18 @@ void tsdbRowGetColVal(TSDBROW *pRow, STSchema *pTSchema, int32_t iCol, SColVal *
STColumn *pTColumn = &pTSchema->columns[iCol]; STColumn *pTColumn = &pTSchema->columns[iCol];
SValue value; SValue value;
ASSERT(iCol > 0);
if (pRow->type == 0) { if (pRow->type == 0) {
// get from row (todo); // get from row (todo);
} else if (pRow->type == 1) { } else if (pRow->type == 1) {
SColData *pColData; SColData *pColData;
void *p; void *p = NULL;
p = taosbsearch(&(SColData){.cid = pTColumn->colId}, pRow->pBlockData->aColData, pRow->pBlockData->nColData, // TODO
sizeof(SBlockCol), tColDataCmprFn, TD_EQ); ASSERT(0);
// p = taosbsearch(&(SColData){.cid = pTColumn->colId}, pRow->pBlockData->apColData, pRow->pBlockData->nColData,
// sizeof(SBlockCol), tColDataCmprFn, TD_EQ);
if (p) { if (p) {
pColData = (SColData *)p; pColData = (SColData *)p;
ASSERT(pColData->flags); ASSERT(pColData->flags);
...@@ -707,8 +717,12 @@ void tColDataReset(SColData *pColData) { ...@@ -707,8 +717,12 @@ void tColDataReset(SColData *pColData) {
// TODO // TODO
} }
void tColDataClear(SColData *pColData) { void tColDataClear(void *ph) {
// TODO SColData *pColData = (SColData *)ph;
tsdbFree(pColData->pBitMap);
tsdbFree((uint8_t *)pColData->pOfst);
tsdbFree(pColData->pData);
} }
int32_t tColDataAppendValue(SColData *pColData, SColVal *pColVal) { int32_t tColDataAppendValue(SColData *pColData, SColVal *pColVal) {
...@@ -730,50 +744,77 @@ int32_t tColDataCmprFn(const void *p1, const void *p2) { ...@@ -730,50 +744,77 @@ int32_t tColDataCmprFn(const void *p1, const void *p2) {
// SBlockData ====================================================== // SBlockData ======================================================
static int32_t tBlockDataAddColData(SBlockData *pBlockData, int32_t iColData) { static int32_t tBlockDataAddColData(SBlockData *pBlockData, int32_t iColData) {
int32_t code = 0; int32_t code = 0;
int32_t nColData = pBlockData->nColData; // int32_t nColData = pBlockData->nColData;
// pBlockData->nColData++;
// if (pBlockData->nColData > pBlockData->maxCol) {
// if (pBlockData->maxCol == 0) {
// pBlockData->maxCol = 16;
// } else {
// pBlockData->maxCol *= 2;
// }
// code = tsdbRealloc((uint8_t **)&pBlockData->apColData, sizeof(SColData *) * pBlockData->maxCol);
// if (code) goto _exit;
// code = tsdbRealloc((uint8_t **)&pBlockData->aColData, sizeof(SColData) * pBlockData->maxCol);
// if (code) goto _exit;
// for (int32_t iColData = nColData; iColData < pBlockData->maxCol; iColData++) {
// pBlockData->aColData[iColData] = tColDataInit();
// }
// }
pBlockData->nColData++; // // memmove (todo)
if (pBlockData->nColData > pBlockData->maxCol) { // // int32_t size = sizeof(SColData *) * (nColData - iColData);
if (pBlockData->maxCol == 0) { // // if (size) {
pBlockData->maxCol = 16; // // memmove();
} else { // // }
pBlockData->maxCol *= 2;
}
code = tsdbRealloc((uint8_t **)&pBlockData->apColData, sizeof(SColData *) * pBlockData->maxCol); // pBlockData->apColData[iColData] = &pBlockData->aColData[nColData];
if (code) goto _exit;
code = tsdbRealloc((uint8_t **)&pBlockData->aColData, sizeof(SColData) * pBlockData->maxCol);
if (code) goto _exit;
for (int32_t iColData = nColData; iColData < pBlockData->maxCol; iColData++) { _exit:
pBlockData->aColData[iColData] = tColDataInit(); return code;
} }
}
// memmove (todo) int32_t tBlockDataInit(SBlockData *pBlockData) {
// int32_t size = sizeof(SColData *) * (nColData - iColData); int32_t code = 0;
// if (size) {
// memmove();
// }
pBlockData->apColData[iColData] = &pBlockData->aColData[nColData]; *pBlockData = (SBlockData){0};
pBlockData->apColData = taosArrayInit(0, sizeof(SColData *));
if (pBlockData->apColData == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
pBlockData->aColData = taosArrayInit(0, sizeof(SColData));
if (pBlockData->aColData == NULL) {
taosArrayDestroy(pBlockData->apColData);
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
_exit: _exit:
return code; return code;
} }
static int32_t tBlockDataAppendRow0(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema) { void tBlockDataReset(SBlockData *pBlockData) {
int32_t code = 0; pBlockData->nRow = 0;
int32_t nRow = pBlockData->nRow; taosArrayClear(pBlockData->apColData);
TSDBKEY key = tsdbRowKey(pRow); }
int32_t iColumn;
int32_t nColumn;
int32_t iColData;
SColVal colVal;
SColVal *pColVal = &colVal;
ASSERT(pTSchema); void tBlockDataClear(SBlockData *pBlockData) {
ASSERT(pTSchema->version == TSDBROW_SVERSION(pRow)); tsdbFree((uint8_t *)pBlockData->aVersion);
tsdbFree((uint8_t *)pBlockData->aTSKEY);
taosArrayDestroy(pBlockData->apColData);
taosArrayDestroyEx(pBlockData->aColData, tColDataClear);
}
static int32_t tBlockDataAppendRow0(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema);
static int32_t tBlockDataAppendRow1(SBlockData *pBlockData, TSDBROW *pRow);
int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema) {
int32_t code = 0;
int32_t nRow = pBlockData->nRow;
TSDBKEY key = tsdbRowKey(pRow);
pBlockData->nRow++; pBlockData->nRow++;
...@@ -785,126 +826,45 @@ static int32_t tBlockDataAppendRow0(SBlockData *pBlockData, TSDBROW *pRow, STSch ...@@ -785,126 +826,45 @@ static int32_t tBlockDataAppendRow0(SBlockData *pBlockData, TSDBROW *pRow, STSch
pBlockData->maxRow *= 2; pBlockData->maxRow *= 2;
} }
ASSERT(pBlockData->maxRow >= pBlockData->nRow);
code = tsdbRealloc((uint8_t **)&pBlockData->aTSKEY, sizeof(TSKEY) * pBlockData->maxRow);
if (code) goto _err;
code = tsdbRealloc((uint8_t **)&pBlockData->aVersion, sizeof(int64_t) * pBlockData->maxRow); code = tsdbRealloc((uint8_t **)&pBlockData->aVersion, sizeof(int64_t) * pBlockData->maxRow);
if (code) goto _err; if (code) goto _err;
code = tsdbRealloc((uint8_t **)&pBlockData->aTSKEY, sizeof(TSKEY) * pBlockData->maxRow);
if (code) goto _err;
} }
pBlockData->aVersion[nRow] = key.version; pBlockData->aVersion[nRow] = key.version;
pBlockData->aTSKEY[nRow] = key.ts; pBlockData->aTSKEY[nRow] = key.ts;
// other cols // OTHER
iColumn = 1; #if 0
nColumn = pTSchema->numOfCols; int32_t iColData = 0;
iColData = 0; int32_t nColData = taosArrayGetSize(pBlockData->apColData);
while (iColumn < nColumn || iColData < pBlockData->nColData) { SRowIter ri = tRowIterInit(pRow, pTSchema);
STColumn *pTColumn = NULL;
SColData *pColData = NULL; SColData *pColData = iColData < nColData ? (SColData *)taosArrayGetP(pBlockData->apColData, iColData) : NULL;
SColVal *pColVal = tRowIterNext(&ri);
if (iColumn < nColumn) { while (true) {
pTColumn = &pTSchema->columns[iColumn]; if (pColData && pColVal) {
} if (pColData->cid == pColVal->cid) {
if (iColData < pBlockData->nColData) { // append SColVal to SColData
pColData = &pBlockData->aColData[iColData]; pColVal = tRowIterNext(&ri);
} iColData++;
pColData = iColData < nColData ? (SColData *)taosArrayGetP(pBlockData->apColData, iColData) : NULL;
if (pTColumn && pColData) { } else if (pColData->cid < pColVal->cid) {
if (pTColumn->colId == pColData->cid) { // append a NONE
// tsdbRowGetColVal(pRow, pTSchema, iColumn, &cv); iColData++;
} else if (pTColumn->colId < pColData->cid) {
// add a new SColData, and append the column value cv to the SColData
} else {
// add a None to the column value
}
} else if (pTColumn) {
// tsdbRowGetColVal(pRow, pTSchema, iColumn, &cv);
// add a new SColData, and append the column value cv to the SColData
} else {
iColData++;
}
}
return code;
_err:
return code;
}
static int32_t tBlockDataAppendRow1(SBlockData *pBlockData, TSDBROW *pRow) {
int32_t code = 0;
int32_t nRow = pBlockData->nRow;
TSDBKEY key = tsdbRowKey(pRow);
int32_t iColData;
int32_t iColDataRow;
int32_t nColDataRow;
SColVal colVal;
SColVal *pColVal = &colVal;
pBlockData->nRow++;
// aKey (TODO)
pBlockData->aVersion[nRow] = key.version;
pBlockData->aTSKEY[nRow] = key.ts;
// other cols
iColData = 0;
iColDataRow = 0;
nColDataRow = pRow->pBlockData->nColData;
while (iColData < pBlockData->nColData || iColDataRow < nColDataRow) {
SColData *pColData = NULL;
SColData *pColDataRow = NULL;
if (iColData < pBlockData->nColData) {
pColData = &pBlockData->aColData[iColData];
}
if (iColDataRow < nColDataRow) {
pColDataRow = &pRow->pBlockData->aColData[iColDataRow];
}
if (pColData && pColDataRow) {
if (pColData->cid == pColDataRow->cid) {
// TODO
} else if (pColData->cid < pColDataRow->cid) {
// TODO
} else { } else {
// TODO // add a new SColData
} }
} else if (pColData) { } else if (pColData) {
// TODO // add a NONE
} else { } else {
// TODO // add a new SColData and append value
} }
} }
#endif
return code; return code;
}
int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema) {
int32_t code = 0;
TSDBKEY key = tsdbRowKey(pRow);
if (pRow->type == 0) {
code = tBlockDataAppendRow0(pBlockData, pRow, pTSchema);
} else if (pRow->type == 1) {
code = tBlockDataAppendRow1(pBlockData, pRow);
}
_err:
return code; return code;
} }
\ No newline at end of file
void tBlockDataReset(SBlockData *pBlockData) {
pBlockData->nRow = 0;
pBlockData->nColData = 0;
}
void tBlockDataClear(SBlockData *pBlockData) {
tsdbFree((uint8_t *)pBlockData->aVersion);
tsdbFree((uint8_t *)pBlockData->aTSKEY);
for (int32_t iCol = 0; iCol < pBlockData->nColData; iCol++) {
tsdbFree(pBlockData->aColData[iCol].pBitMap);
tsdbFree(pBlockData->aColData[iCol].pData);
}
}
...@@ -79,7 +79,7 @@ int vnodeEncodeConfig(const void *pObj, SJson *pJson) { ...@@ -79,7 +79,7 @@ int vnodeEncodeConfig(const void *pObj, SJson *pJson) {
SJson *pNodeRetentions = tjsonCreateArray(); SJson *pNodeRetentions = tjsonCreateArray();
tjsonAddItemToObject(pJson, "retentions", pNodeRetentions); tjsonAddItemToObject(pJson, "retentions", pNodeRetentions);
for (int32_t i = 0; i < nRetention; ++i) { for (int32_t i = 0; i < nRetention; ++i) {
SJson *pNodeRetention = tjsonCreateObject(); SJson *pNodeRetention = tjsonCreateObject();
const SRetention *pRetention = pCfg->tsdbCfg.retentions + i; const SRetention *pRetention = pCfg->tsdbCfg.retentions + i;
tjsonAddIntegerToObject(pNodeRetention, "freq", pRetention->freq); tjsonAddIntegerToObject(pNodeRetention, "freq", pRetention->freq);
tjsonAddIntegerToObject(pNodeRetention, "freqUnit", pRetention->freqUnit); tjsonAddIntegerToObject(pNodeRetention, "freqUnit", pRetention->freqUnit);
...@@ -118,45 +118,45 @@ int vnodeDecodeConfig(const SJson *pJson, void *pObj) { ...@@ -118,45 +118,45 @@ int vnodeDecodeConfig(const SJson *pJson, void *pObj) {
int32_t code; int32_t code;
tjsonGetNumberValue(pJson, "vgId", pCfg->vgId, code); tjsonGetNumberValue(pJson, "vgId", pCfg->vgId, code);
if(code < 0) return -1; if (code < 0) return -1;
if (tjsonGetStringValue(pJson, "dbname", pCfg->dbname) < 0) return -1; if (tjsonGetStringValue(pJson, "dbname", pCfg->dbname) < 0) return -1;
tjsonGetNumberValue(pJson, "dbId", pCfg->dbId, code); tjsonGetNumberValue(pJson, "dbId", pCfg->dbId, code);
if(code < 0) return -1; if (code < 0) return -1;
tjsonGetNumberValue(pJson, "szPage", pCfg->szPage, code); tjsonGetNumberValue(pJson, "szPage", pCfg->szPage, code);
if(code < 0) return -1; if (code < 0) return -1;
tjsonGetNumberValue(pJson, "szCache", pCfg->szCache, code); tjsonGetNumberValue(pJson, "szCache", pCfg->szCache, code);
if(code < 0) return -1; if (code < 0) return -1;
tjsonGetNumberValue(pJson, "szBuf", pCfg->szBuf, code); tjsonGetNumberValue(pJson, "szBuf", pCfg->szBuf, code);
if(code < 0) return -1; if (code < 0) return -1;
tjsonGetNumberValue(pJson, "isHeap", pCfg->isHeap, code); tjsonGetNumberValue(pJson, "isHeap", pCfg->isHeap, code);
if(code < 0) return -1; if (code < 0) return -1;
tjsonGetNumberValue(pJson, "isWeak", pCfg->isWeak, code); tjsonGetNumberValue(pJson, "isWeak", pCfg->isWeak, code);
if(code < 0) return -1; if (code < 0) return -1;
tjsonGetNumberValue(pJson, "isTsma", pCfg->isTsma, code); tjsonGetNumberValue(pJson, "isTsma", pCfg->isTsma, code);
if(code < 0) return -1; if (code < 0) return -1;
tjsonGetNumberValue(pJson, "isRsma", pCfg->isRsma, code); tjsonGetNumberValue(pJson, "isRsma", pCfg->isRsma, code);
if(code < 0) return -1; if (code < 0) return -1;
tjsonGetNumberValue(pJson, "precision", pCfg->tsdbCfg.precision, code); tjsonGetNumberValue(pJson, "precision", pCfg->tsdbCfg.precision, code);
if(code < 0) return -1; if (code < 0) return -1;
tjsonGetNumberValue(pJson, "update", pCfg->tsdbCfg.update, code); tjsonGetNumberValue(pJson, "update", pCfg->tsdbCfg.update, code);
if(code < 0) return -1; if (code < 0) return -1;
tjsonGetNumberValue(pJson, "compression", pCfg->tsdbCfg.compression, code); tjsonGetNumberValue(pJson, "compression", pCfg->tsdbCfg.compression, code);
if(code < 0) return -1; if (code < 0) return -1;
tjsonGetNumberValue(pJson, "slLevel", pCfg->tsdbCfg.slLevel, code); tjsonGetNumberValue(pJson, "slLevel", pCfg->tsdbCfg.slLevel, code);
if(code < 0) return -1; if (code < 0) return -1;
tjsonGetNumberValue(pJson, "daysPerFile", pCfg->tsdbCfg.days, code); tjsonGetNumberValue(pJson, "daysPerFile", pCfg->tsdbCfg.days, code);
if(code < 0) return -1; if (code < 0) return -1;
tjsonGetNumberValue(pJson, "minRows", pCfg->tsdbCfg.minRows, code); tjsonGetNumberValue(pJson, "minRows", pCfg->tsdbCfg.minRows, code);
if(code < 0) return -1; if (code < 0) return -1;
tjsonGetNumberValue(pJson, "maxRows", pCfg->tsdbCfg.maxRows, code); tjsonGetNumberValue(pJson, "maxRows", pCfg->tsdbCfg.maxRows, code);
if(code < 0) return -1; if (code < 0) return -1;
tjsonGetNumberValue(pJson, "keep0", pCfg->tsdbCfg.keep0, code); tjsonGetNumberValue(pJson, "keep0", pCfg->tsdbCfg.keep0, code);
if(code < 0) return -1; if (code < 0) return -1;
tjsonGetNumberValue(pJson, "keep1", pCfg->tsdbCfg.keep1, code); tjsonGetNumberValue(pJson, "keep1", pCfg->tsdbCfg.keep1, code);
if(code < 0) return -1; if (code < 0) return -1;
tjsonGetNumberValue(pJson, "keep2", pCfg->tsdbCfg.keep2, code); tjsonGetNumberValue(pJson, "keep2", pCfg->tsdbCfg.keep2, code);
if(code < 0) return -1; if (code < 0) return -1;
SJson *pNodeRetentions = tjsonGetObjectItem(pJson, "retentions"); SJson *pNodeRetentions = tjsonGetObjectItem(pJson, "retentions");
int32_t nRetention = tjsonGetArraySize(pNodeRetentions); int32_t nRetention = tjsonGetArraySize(pNodeRetentions);
if (nRetention > TSDB_RETENTION_MAX) { if (nRetention > TSDB_RETENTION_MAX) {
nRetention = TSDB_RETENTION_MAX; nRetention = TSDB_RETENTION_MAX;
...@@ -170,30 +170,30 @@ int vnodeDecodeConfig(const SJson *pJson, void *pObj) { ...@@ -170,30 +170,30 @@ int vnodeDecodeConfig(const SJson *pJson, void *pObj) {
tjsonGetNumberValue(pNodeRetention, "keepUnit", (pCfg->tsdbCfg.retentions)[i].keepUnit, code); tjsonGetNumberValue(pNodeRetention, "keepUnit", (pCfg->tsdbCfg.retentions)[i].keepUnit, code);
} }
tjsonGetNumberValue(pJson, "wal.vgId", pCfg->walCfg.vgId, code); tjsonGetNumberValue(pJson, "wal.vgId", pCfg->walCfg.vgId, code);
if(code < 0) return -1; if (code < 0) return -1;
tjsonGetNumberValue(pJson, "wal.fsyncPeriod", pCfg->walCfg.fsyncPeriod, code); tjsonGetNumberValue(pJson, "wal.fsyncPeriod", pCfg->walCfg.fsyncPeriod, code);
if(code < 0) return -1; if (code < 0) return -1;
tjsonGetNumberValue(pJson, "wal.retentionPeriod", pCfg->walCfg.retentionPeriod, code); tjsonGetNumberValue(pJson, "wal.retentionPeriod", pCfg->walCfg.retentionPeriod, code);
if(code < 0) return -1; if (code < 0) return -1;
tjsonGetNumberValue(pJson, "wal.rollPeriod", pCfg->walCfg.rollPeriod, code); tjsonGetNumberValue(pJson, "wal.rollPeriod", pCfg->walCfg.rollPeriod, code);
if(code < 0) return -1; if (code < 0) return -1;
tjsonGetNumberValue(pJson, "wal.retentionSize", pCfg->walCfg.retentionSize, code); tjsonGetNumberValue(pJson, "wal.retentionSize", pCfg->walCfg.retentionSize, code);
if(code < 0) return -1; if (code < 0) return -1;
tjsonGetNumberValue(pJson, "wal.segSize", pCfg->walCfg.segSize, code); tjsonGetNumberValue(pJson, "wal.segSize", pCfg->walCfg.segSize, code);
if(code < 0) return -1; if (code < 0) return -1;
tjsonGetNumberValue(pJson, "wal.level", pCfg->walCfg.level, code); tjsonGetNumberValue(pJson, "wal.level", pCfg->walCfg.level, code);
if(code < 0) return -1; if (code < 0) return -1;
tjsonGetNumberValue(pJson, "hashBegin", pCfg->hashBegin, code); tjsonGetNumberValue(pJson, "hashBegin", pCfg->hashBegin, code);
if(code < 0) return -1; if (code < 0) return -1;
tjsonGetNumberValue(pJson, "hashEnd", pCfg->hashEnd, code); tjsonGetNumberValue(pJson, "hashEnd", pCfg->hashEnd, code);
if(code < 0) return -1; if (code < 0) return -1;
tjsonGetNumberValue(pJson, "hashMethod", pCfg->hashMethod, code); tjsonGetNumberValue(pJson, "hashMethod", pCfg->hashMethod, code);
if(code < 0) return -1; if (code < 0) return -1;
tjsonGetNumberValue(pJson, "syncCfg.replicaNum", pCfg->syncCfg.replicaNum, code); tjsonGetNumberValue(pJson, "syncCfg.replicaNum", pCfg->syncCfg.replicaNum, code);
if(code < 0) return -1; if (code < 0) return -1;
tjsonGetNumberValue(pJson, "syncCfg.myIndex", pCfg->syncCfg.myIndex, code); tjsonGetNumberValue(pJson, "syncCfg.myIndex", pCfg->syncCfg.myIndex, code);
if(code < 0) return -1; if (code < 0) return -1;
SJson *pNodeInfoArr = tjsonGetObjectItem(pJson, "syncCfg.nodeInfo"); SJson *pNodeInfoArr = tjsonGetObjectItem(pJson, "syncCfg.nodeInfo");
int arraySize = tjsonGetArraySize(pNodeInfoArr); int arraySize = tjsonGetArraySize(pNodeInfoArr);
......
...@@ -311,9 +311,9 @@ static int vnodeDecodeState(const SJson *pJson, void *pObj) { ...@@ -311,9 +311,9 @@ static int vnodeDecodeState(const SJson *pJson, void *pObj) {
int32_t code; int32_t code;
tjsonGetNumberValue(pJson, "commit version", pState->committed, code); tjsonGetNumberValue(pJson, "commit version", pState->committed, code);
if(code < 0) return -1; if (code < 0) return -1;
tjsonGetNumberValue(pJson, "applied version", pState->applied, code); tjsonGetNumberValue(pJson, "applied version", pState->applied, code);
if(code < 0) return -1; if (code < 0) return -1;
return 0; return 0;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册