提交 906b50df 编写于 作者: H Haojun Liao

Merge remote-tracking branch 'origin/feat/tsdb_refact' into feat/tsdb_refact

......@@ -63,6 +63,8 @@ typedef struct SDataFWriter SDataFWriter;
typedef struct SDataFReader SDataFReader;
typedef struct SDelFWriter SDelFWriter;
typedef struct SDelFReader SDelFReader;
typedef struct SRowIter SRowIter;
typedef struct STsdbFS STsdbFS;
#define TSDB_MAX_SUBBLOCKS 8
......@@ -78,6 +80,9 @@ TSDBKEY tsdbRowKey(TSDBROW *pRow);
void tsdbRowGetColVal(TSDBROW *pRow, STSchema *pTSchema, int32_t iCol, SColVal *pColVal);
int32_t tPutTSDBRow(uint8_t *p, TSDBROW *pRow);
int32_t tGetTSDBRow(uint8_t *p, TSDBROW *pRow);
// SRowIter
#define tRowIterInit(ROW, SCHEMA) ((SRowIter){.pRow = (ROW), .pSchema = (SCHEMA)})
SColVal *tRowIterNext(SRowIter *pIter);
// TABLEID
int32_t tTABLEIDCmprFn(const void *p1, const void *p2);
// TSDBKEY
......@@ -107,11 +112,11 @@ int32_t tGetBlockIdx(uint8_t *p, void *ph);
// SColdata
#define tColDataInit() ((SColData){0})
void tColDataReset(SColData *pColData);
void tColDataClear(SColData *pColData);
void tColDataClear(void *ph);
int32_t tColDataAppendValue(SColData *pColData, SColVal *pColVal);
int32_t tColDataCmprFn(const void *p1, const void *p2);
// SBlockData
#define tBlockDataInit() ((SBlockData){0})
int32_t tBlockDataInit(SBlockData *pBlockData);
void tBlockDataReset(SBlockData *pBlockData);
void tBlockDataClear(SBlockData *pBlockData);
int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema);
......@@ -160,14 +165,15 @@ void tsdbLastFileName(STsdb *pTsdb, SLastFile *pFile, char fname[]);
// SSmaFile
void tsdbSmaFileName(STsdb *pTsdb, SSmaFile *pFile, char fname[]);
// SDelFile
#define tsdbDelFileCreate() ((SDelFile){.info = KEYINFO_INIT_VAL, .size = 0, .offset = 0})
#define tsdbDelFileCreate() \
((SDelFile){ \
.maxKey = TSKEY_MIN, .minKey = TSKEY_MAX, .maxVersion = -1, .minVersion = INT64_MAX, .size = 0, .offset = 0})
void tsdbDelFileName(STsdb *pTsdb, SDelFile *pFile, char fname[]);
// tsdbFS.c ==============================================================================================
typedef struct STsdbFS STsdbFS;
int32_t tsdbFSOpen(STsdb *pTsdb, STsdbFS **ppFS);
int32_t tsdbFSClose(STsdbFS *pFS);
int32_t tsdbFSStart(STsdbFS *pFS);
int32_t tsdbFSEnd(STsdbFS *pFS, int8_t rollback);
int32_t tsdbFSOpen(STsdb *pTsdb, STsdbFS **ppFS);
int32_t tsdbFSClose(STsdbFS *pFS);
int32_t tsdbFSStart(STsdbFS *pFS);
int32_t tsdbFSEnd(STsdbFS *pFS, int8_t rollback);
// tsdbReaderWriter.c ==============================================================================================
// SDataFWriter
int32_t tsdbDataFWriterOpen(SDataFWriter **ppWriter, STsdb *pTsdb, SDFileSet *pSet);
......@@ -200,7 +206,7 @@ int32_t tsdbReadDelIdx(SDelFReader *pReader, SMapData *pDelIdxMap, uint8_t **ppB
// tsdbCache
int32_t tsdbOpenCache(STsdb *pTsdb);
void tsdbCloseCache(SLRUCache *pCache);
void tsdbCloseCache(SLRUCache *pCache);
int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, tb_uid_t uid, STSRow *row);
int32_t tsdbCacheGetLastrow(SLRUCache *pCache, tb_uid_t uid, STSRow **ppRow);
int32_t tsdbCacheDeleteLastrow(SLRUCache *pCache, tb_uid_t uid);
......@@ -366,14 +372,12 @@ struct SColData {
};
struct SBlockData {
int32_t maxRow;
int32_t nRow;
int64_t *aVersion;
TSKEY *aTSKEY;
int32_t maxCol;
int32_t nColData;
SColData **apColData;
SColData *aColData;
int32_t maxRow;
int32_t nRow;
int64_t *aVersion;
TSKEY *aTSKEY;
SArray *apColData;
SArray *aColData;
};
// ================== TSDB global config
......@@ -414,7 +418,10 @@ struct SDelIdx {
};
struct SDelFile {
KEYINFO info;
TSKEY minKey;
TSKEY maxKey;
int64_t minVersion;
int64_t maxVersion;
int64_t size;
int64_t offset;
};
......@@ -463,11 +470,19 @@ struct SSmaFile {
struct SDFileSet {
SDiskID diskId;
int32_t nRef;
int32_t fid;
SHeadFile *pHeadFile;
SDataFile *pDataFile;
SLastFile *pLastFile;
SSmaFile *pSmaFile;
int32_t nRef;
};
struct SRowIter {
TSDBROW *pRow;
STSchema *pSchema;
SColVal colVal;
int32_t i;
};
#ifdef __cplusplus
......
......@@ -15,25 +15,364 @@
#include "tsdb.h"
typedef struct {
SDelFile *pDelFile;
SArray *aDFileSet; // SArray<aDFileSet>
} STsdbFSState;
struct STsdbFS {
STsdb *pTsdb;
TdThreadRwlock lock;
int64_t minVersion;
int64_t maxVersion;
SDelFile *pTombstoneF;
STsdbCacheFile *pCacheF;
SArray *pArray;
STsdb *pTsdb;
TdThreadRwlock lock;
int8_t inTxn;
STsdbFSState *cState;
STsdbFSState *nState;
};
int32_t tsdbFSOpen(STsdb *pTsdb, STsdbFS **ppFS) {
// =================================================================================================
static int32_t tsdbDelFileToJson(const void *pObj, SJson *pJson) {
int32_t code = 0;
SDelFile *pDelFile = (SDelFile *)pObj;
if (tjsonAddIntegerToObject(pJson, "minKey", pDelFile->minKey) < 0) goto _err;
if (tjsonAddIntegerToObject(pJson, "maxKey", pDelFile->maxKey) < 0) goto _err;
if (tjsonAddIntegerToObject(pJson, "minVer", pDelFile->minVersion) < 0) goto _err;
if (tjsonAddIntegerToObject(pJson, "maxVer", pDelFile->maxVersion) < 0) goto _err;
return code;
_err:
code = TSDB_CODE_OUT_OF_MEMORY;
return code;
}
static int32_t tsdbHeadFileToJson(const void *pObj, SJson *pJson) {
int32_t code = 0;
SHeadFile *pHeadFile = (SHeadFile *)pObj;
if (tjsonAddIntegerToObject(pJson, "size", pHeadFile->size) < 0) goto _err;
if (tjsonAddIntegerToObject(pJson, "offset", pHeadFile->offset) < 0) goto _err;
return code;
_err:
code = TSDB_CODE_OUT_OF_MEMORY;
return code;
}
static int32_t tsdbDataFileToJson(const void *pObj, SJson *pJson) {
int32_t code = 0;
SDataFile *pDataFile = (SDataFile *)pObj;
if (tjsonAddIntegerToObject(pJson, "size", pDataFile->size) < 0) goto _err;
return code;
_err:
code = TSDB_CODE_OUT_OF_MEMORY;
return code;
}
static int32_t tsdbLastFileToJson(const void *pObj, SJson *pJson) {
int32_t code = 0;
SLastFile *pLastFile = (SLastFile *)pObj;
if (tjsonAddIntegerToObject(pJson, "size", pLastFile->size) < 0) goto _err;
return code;
_err:
code = TSDB_CODE_OUT_OF_MEMORY;
return code;
}
static int32_t tsdbSmaFileToJson(const void *pObj, SJson *pJson) {
int32_t code = 0;
SSmaFile *pSmaFile = (SSmaFile *)pObj;
if (tjsonAddIntegerToObject(pJson, "size", pSmaFile->size) < 0) goto _err;
return code;
_err:
code = TSDB_CODE_OUT_OF_MEMORY;
return code;
}
static int32_t tsdbDFileSetToJson(const void *pObj, SJson *pJson) {
int32_t code = 0;
SDFileSet *pDFileSet = (SDFileSet *)pObj;
if (tjsonAddIntegerToObject(pJson, "level", pDFileSet->diskId.level) < 0) goto _err;
if (tjsonAddIntegerToObject(pJson, "id", pDFileSet->diskId.id) < 0) goto _err;
if (tjsonAddIntegerToObject(pJson, "fid", pDFileSet->fid) < 0) goto _err;
if (tjsonAddObject(pJson, "head", tsdbHeadFileToJson, pDFileSet->pHeadFile) < 0) goto _err;
if (tjsonAddObject(pJson, "data", tsdbDataFileToJson, pDFileSet->pDataFile) < 0) goto _err;
if (tjsonAddObject(pJson, "last", tsdbLastFileToJson, pDFileSet->pLastFile) < 0) goto _err;
if (tjsonAddObject(pJson, "sma", tsdbSmaFileToJson, pDFileSet->pSmaFile) < 0) goto _err;
return code;
_err:
code = TSDB_CODE_OUT_OF_MEMORY;
return code;
}
static int32_t tsdbFSStateToJsonStr(STsdbFSState *pState, char **ppData) {
int32_t code = 0;
SJson *pJson = NULL;
pJson = tjsonCreateObject();
if (pJson == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
if (tjsonAddObject(pJson, "DelFile", tsdbDelFileToJson, pState->pDelFile) < 0) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
if (tjsonAddTArray(pJson, "DFileSet", tsdbDFileSetToJson, pState->aDFileSet) < 0) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
*ppData = tjsonToString(pJson);
if (*ppData == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
tjsonDelete(pJson);
return code;
_err:
return code;
}
static int32_t tsdbJsonStrToFSState(char *pData, STsdbFSState *pState) {
int32_t code = 0;
SJson *pJson = NULL;
pJson = tjsonParse(pData);
if (pJson == NULL) goto _err;
// if (tjsonToObject(pJson, "DelFile", tsdbJsonToDelFile, &pState->pDelFile) < 0) goto _err;
// if (tjsonToTArray(pJson, "DFIleSet", tsdbJsonToDFileSet, ) < 0) goto _err;
ASSERT(0);
tjsonDelete(pJson);
return code;
_err:
code = TSDB_CODE_OUT_OF_MEMORY;
return code;
}
static int32_t tsdbCreateEmptyCurrent(const char *fname, STsdbFSState *pState) {
int32_t code = 0;
int64_t n;
int64_t size;
char *pData = NULL;
TdFilePtr pFD = NULL;
// to json str
code = tsdbFSStateToJsonStr(pState, &pData);
if (code) goto _err;
// create and write
pFD = taosOpenFile(fname, TD_FILE_WRITE | TD_FILE_CREATE);
if (pFD == NULL) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
size = strlen(pData);
n = taosWriteFile(pFD, pData, size);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
if (taosFsyncFile(pFD) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
taosCloseFile(&pFD);
if (pData) taosMemoryFree(pData);
return code;
_err:
tsdbError("create empry current failed since %s", tstrerror(code));
if (pData) taosMemoryFree(pData);
return code;
}
static int32_t tsdbLoadCurrentState(STsdbFS *pFS, STsdbFSState *pState) {
int32_t code = 0;
int64_t size;
int64_t n;
char fname[TSDB_FILENAME_LEN];
char *pData = NULL;
TdFilePtr pFD;
snprintf(fname, TSDB_FILENAME_LEN - 1, "%s/CURRENT", pFS->pTsdb->path);
if (!taosCheckExistFile(fname)) {
// create an empry CURRENT file if not exists
code = tsdbCreateEmptyCurrent(fname, pState);
if (code) goto _err;
} else {
// open the file and load
pFD = taosOpenFile(fname, TD_FILE_READ);
if (pFD == NULL) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
if (taosFStatFile(pFD, &size, NULL) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
pData = taosMemoryMalloc(size + 1);
if (pData == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
pData[size] = '\0';
n = taosReadFile(pFD, pData, size);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
taosCloseFile(&pFD);
// decode
code = tsdbJsonStrToFSState(pData, pState);
if (code) goto _err;
}
if (pData) taosMemoryFree(pData);
return code;
_err:
tsdbError("vgId:%d tsdb load current state failed since %s", TD_VID(pFS->pTsdb->pVnode), tstrerror(code));
if (pData) taosMemoryFree(pData);
return code;
}
static int32_t tsdbFSOpenImpl(STsdbFS *pFS) {
int32_t code = 0;
int64_t size;
int64_t n;
// read CURRENT file
code = tsdbLoadCurrentState(pFS, pFS->cState);
if (code) goto _err;
// decode the statue file
// code = tsdbDecodeFSState(pData, pFS->cState);
// if (code) goto _err;
// // scan and fix invalid tsdb FS
// code = tsdbScanAndFixFS(pFS);
// if (code) goto _err;
// if (pData) taosMemoryFree(pData);
return code;
_err:
// if (pData) taosMemoryFree(pData);
tsdbError("vgId:%d tsdb fs open impl failed since %s", TD_VID(pFS->pTsdb->pVnode), tstrerror(code));
return code;
}
int32_t tsdbFSCloseImpl(STsdbFS *pFS) {
int32_t code = 0;
// TODO
return code;
}
// EXPOSED APIS ====================================================================================
int32_t tsdbFSOpen(STsdb *pTsdb, STsdbFS **ppFS) {
int32_t code = 0;
STsdbFS *pFS = NULL;
pFS = (STsdbFS *)taosMemoryCalloc(1, sizeof(*pFS));
if (pFS == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
pFS->pTsdb = pTsdb;
code = taosThreadRwlockInit(&pFS->lock, NULL);
if (code) {
code = TAOS_SYSTEM_ERROR(code);
goto _err;
}
pFS->inTxn = 0;
pFS->cState = (STsdbFSState *)taosMemoryCalloc(1, sizeof(STsdbFSState));
if (pFS->cState == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
pFS->cState->aDFileSet = taosArrayInit(0, sizeof(SDFileSet));
if (pFS->cState->aDFileSet == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
pFS->nState = (STsdbFSState *)taosMemoryCalloc(1, sizeof(STsdbFSState));
if (pFS->nState == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
pFS->nState->aDFileSet = taosArrayInit(0, sizeof(SDFileSet));
if (pFS->nState->aDFileSet == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
code = tsdbFSOpenImpl(pFS);
if (code) goto _err;
*ppFS = pFS;
return code;
_err:
*ppFS = NULL;
tsdbError("vgId:%d tsdb fs open failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
return code;
}
int32_t tsdbFSClose(STsdbFS *pFS) {
int32_t code = 0;
// TODO
if (pFS) {
code = tsdbFSCloseImpl(pFS);
if (code) goto _err;
taosArrayDestroy(pFS->nState->aDFileSet);
taosMemoryFree(pFS->nState);
taosArrayDestroy(pFS->cState->aDFileSet);
taosMemoryFree(pFS->cState);
taosThreadRwlockDestroy(&pFS->lock);
taosMemoryFree(pFS);
}
return code;
_err:
tsdbError("vgId:%d tsdb fs close failed since %s", TD_VID(pFS->pTsdb->pVnode), tstrerror(code));
return code;
}
......
......@@ -922,8 +922,8 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, uint8_
// other columns
offset = 0;
tMapDataClear(&pSubBlock->mBlockCol);
for (int32_t iCol = 0; iCol < pBlockData->nColData; iCol++) {
SColData *pColData = &pBlockData->aColData[iCol];
for (int32_t iCol = 0; iCol < taosArrayGetSize(pBlockData->apColData); iCol++) {
SColData *pColData = (SColData *)taosArrayGetP(pBlockData->apColData, iCol);
ASSERT(pColData->flags);
......
......@@ -445,7 +445,10 @@ int32_t tGetDelData(uint8_t *p, void *ph) {
int32_t tPutDelFileHdr(uint8_t *p, SDelFile *pDelFile) {
int32_t n = 0;
n += tPutKEYINFO(p ? p + n : p, &pDelFile->info);
n += tPutI64(p ? p + n : p, pDelFile->minKey);
n += tPutI64(p ? p + n : p, pDelFile->maxKey);
n += tPutI64v(p ? p + n : p, pDelFile->minVersion);
n += tPutI64v(p ? p + n : p, pDelFile->maxVersion);
n += tPutI64v(p ? p + n : p, pDelFile->size);
n += tPutI64v(p ? p + n : p, pDelFile->offset);
......@@ -455,7 +458,10 @@ int32_t tPutDelFileHdr(uint8_t *p, SDelFile *pDelFile) {
int32_t tGetDelFileHdr(uint8_t *p, SDelFile *pDelFile) {
int32_t n = 0;
n += tGetKEYINFO(p + n, &pDelFile->info);
n += tGetI64(p + n, &pDelFile->minKey);
n += tGetI64(p + n, &pDelFile->maxKey);
n += tGetI64v(p + n, &pDelFile->minVersion);
n += tGetI64v(p + n, &pDelFile->maxVersion);
n += tGetI64v(p + n, &pDelFile->size);
n += tGetI64v(p + n, &pDelFile->offset);
......@@ -500,14 +506,18 @@ void tsdbRowGetColVal(TSDBROW *pRow, STSchema *pTSchema, int32_t iCol, SColVal *
STColumn *pTColumn = &pTSchema->columns[iCol];
SValue value;
ASSERT(iCol > 0);
if (pRow->type == 0) {
// get from row (todo);
} else if (pRow->type == 1) {
SColData *pColData;
void *p;
void *p = NULL;
p = taosbsearch(&(SColData){.cid = pTColumn->colId}, pRow->pBlockData->aColData, pRow->pBlockData->nColData,
sizeof(SBlockCol), tColDataCmprFn, TD_EQ);
// TODO
ASSERT(0);
// p = taosbsearch(&(SColData){.cid = pTColumn->colId}, pRow->pBlockData->apColData, pRow->pBlockData->nColData,
// sizeof(SBlockCol), tColDataCmprFn, TD_EQ);
if (p) {
pColData = (SColData *)p;
ASSERT(pColData->flags);
......@@ -707,8 +717,12 @@ void tColDataReset(SColData *pColData) {
// TODO
}
void tColDataClear(SColData *pColData) {
// TODO
void tColDataClear(void *ph) {
SColData *pColData = (SColData *)ph;
tsdbFree(pColData->pBitMap);
tsdbFree((uint8_t *)pColData->pOfst);
tsdbFree(pColData->pData);
}
int32_t tColDataAppendValue(SColData *pColData, SColVal *pColVal) {
......@@ -730,50 +744,77 @@ int32_t tColDataCmprFn(const void *p1, const void *p2) {
// SBlockData ======================================================
static int32_t tBlockDataAddColData(SBlockData *pBlockData, int32_t iColData) {
int32_t code = 0;
int32_t nColData = pBlockData->nColData;
// int32_t nColData = pBlockData->nColData;
// pBlockData->nColData++;
// if (pBlockData->nColData > pBlockData->maxCol) {
// if (pBlockData->maxCol == 0) {
// pBlockData->maxCol = 16;
// } else {
// pBlockData->maxCol *= 2;
// }
// code = tsdbRealloc((uint8_t **)&pBlockData->apColData, sizeof(SColData *) * pBlockData->maxCol);
// if (code) goto _exit;
// code = tsdbRealloc((uint8_t **)&pBlockData->aColData, sizeof(SColData) * pBlockData->maxCol);
// if (code) goto _exit;
// for (int32_t iColData = nColData; iColData < pBlockData->maxCol; iColData++) {
// pBlockData->aColData[iColData] = tColDataInit();
// }
// }
pBlockData->nColData++;
if (pBlockData->nColData > pBlockData->maxCol) {
if (pBlockData->maxCol == 0) {
pBlockData->maxCol = 16;
} else {
pBlockData->maxCol *= 2;
}
// // memmove (todo)
// // int32_t size = sizeof(SColData *) * (nColData - iColData);
// // if (size) {
// // memmove();
// // }
code = tsdbRealloc((uint8_t **)&pBlockData->apColData, sizeof(SColData *) * pBlockData->maxCol);
if (code) goto _exit;
code = tsdbRealloc((uint8_t **)&pBlockData->aColData, sizeof(SColData) * pBlockData->maxCol);
if (code) goto _exit;
// pBlockData->apColData[iColData] = &pBlockData->aColData[nColData];
for (int32_t iColData = nColData; iColData < pBlockData->maxCol; iColData++) {
pBlockData->aColData[iColData] = tColDataInit();
}
}
_exit:
return code;
}
// memmove (todo)
// int32_t size = sizeof(SColData *) * (nColData - iColData);
// if (size) {
// memmove();
// }
int32_t tBlockDataInit(SBlockData *pBlockData) {
int32_t code = 0;
pBlockData->apColData[iColData] = &pBlockData->aColData[nColData];
*pBlockData = (SBlockData){0};
pBlockData->apColData = taosArrayInit(0, sizeof(SColData *));
if (pBlockData->apColData == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
pBlockData->aColData = taosArrayInit(0, sizeof(SColData));
if (pBlockData->aColData == NULL) {
taosArrayDestroy(pBlockData->apColData);
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
_exit:
return code;
}
static int32_t tBlockDataAppendRow0(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema) {
int32_t code = 0;
int32_t nRow = pBlockData->nRow;
TSDBKEY key = tsdbRowKey(pRow);
int32_t iColumn;
int32_t nColumn;
int32_t iColData;
SColVal colVal;
SColVal *pColVal = &colVal;
void tBlockDataReset(SBlockData *pBlockData) {
pBlockData->nRow = 0;
taosArrayClear(pBlockData->apColData);
}
ASSERT(pTSchema);
ASSERT(pTSchema->version == TSDBROW_SVERSION(pRow));
void tBlockDataClear(SBlockData *pBlockData) {
tsdbFree((uint8_t *)pBlockData->aVersion);
tsdbFree((uint8_t *)pBlockData->aTSKEY);
taosArrayDestroy(pBlockData->apColData);
taosArrayDestroyEx(pBlockData->aColData, tColDataClear);
}
static int32_t tBlockDataAppendRow0(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema);
static int32_t tBlockDataAppendRow1(SBlockData *pBlockData, TSDBROW *pRow);
int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema) {
int32_t code = 0;
int32_t nRow = pBlockData->nRow;
TSDBKEY key = tsdbRowKey(pRow);
pBlockData->nRow++;
......@@ -785,126 +826,45 @@ static int32_t tBlockDataAppendRow0(SBlockData *pBlockData, TSDBROW *pRow, STSch
pBlockData->maxRow *= 2;
}
ASSERT(pBlockData->maxRow >= pBlockData->nRow);
code = tsdbRealloc((uint8_t **)&pBlockData->aTSKEY, sizeof(TSKEY) * pBlockData->maxRow);
if (code) goto _err;
code = tsdbRealloc((uint8_t **)&pBlockData->aVersion, sizeof(int64_t) * pBlockData->maxRow);
if (code) goto _err;
code = tsdbRealloc((uint8_t **)&pBlockData->aTSKEY, sizeof(TSKEY) * pBlockData->maxRow);
if (code) goto _err;
}
pBlockData->aVersion[nRow] = key.version;
pBlockData->aTSKEY[nRow] = key.ts;
// other cols
iColumn = 1;
nColumn = pTSchema->numOfCols;
iColData = 0;
while (iColumn < nColumn || iColData < pBlockData->nColData) {
STColumn *pTColumn = NULL;
SColData *pColData = NULL;
if (iColumn < nColumn) {
pTColumn = &pTSchema->columns[iColumn];
}
if (iColData < pBlockData->nColData) {
pColData = &pBlockData->aColData[iColData];
}
if (pTColumn && pColData) {
if (pTColumn->colId == pColData->cid) {
// tsdbRowGetColVal(pRow, pTSchema, iColumn, &cv);
} else if (pTColumn->colId < pColData->cid) {
// add a new SColData, and append the column value cv to the SColData
} else {
// add a None to the column value
}
} else if (pTColumn) {
// tsdbRowGetColVal(pRow, pTSchema, iColumn, &cv);
// add a new SColData, and append the column value cv to the SColData
} else {
iColData++;
}
}
return code;
_err:
return code;
}
static int32_t tBlockDataAppendRow1(SBlockData *pBlockData, TSDBROW *pRow) {
int32_t code = 0;
int32_t nRow = pBlockData->nRow;
TSDBKEY key = tsdbRowKey(pRow);
int32_t iColData;
int32_t iColDataRow;
int32_t nColDataRow;
SColVal colVal;
SColVal *pColVal = &colVal;
pBlockData->nRow++;
// aKey (TODO)
pBlockData->aVersion[nRow] = key.version;
pBlockData->aTSKEY[nRow] = key.ts;
// other cols
iColData = 0;
iColDataRow = 0;
nColDataRow = pRow->pBlockData->nColData;
while (iColData < pBlockData->nColData || iColDataRow < nColDataRow) {
SColData *pColData = NULL;
SColData *pColDataRow = NULL;
if (iColData < pBlockData->nColData) {
pColData = &pBlockData->aColData[iColData];
}
if (iColDataRow < nColDataRow) {
pColDataRow = &pRow->pBlockData->aColData[iColDataRow];
}
if (pColData && pColDataRow) {
if (pColData->cid == pColDataRow->cid) {
// TODO
} else if (pColData->cid < pColDataRow->cid) {
// TODO
// OTHER
#if 0
int32_t iColData = 0;
int32_t nColData = taosArrayGetSize(pBlockData->apColData);
SRowIter ri = tRowIterInit(pRow, pTSchema);
SColData *pColData = iColData < nColData ? (SColData *)taosArrayGetP(pBlockData->apColData, iColData) : NULL;
SColVal *pColVal = tRowIterNext(&ri);
while (true) {
if (pColData && pColVal) {
if (pColData->cid == pColVal->cid) {
// append SColVal to SColData
pColVal = tRowIterNext(&ri);
iColData++;
pColData = iColData < nColData ? (SColData *)taosArrayGetP(pBlockData->apColData, iColData) : NULL;
} else if (pColData->cid < pColVal->cid) {
// append a NONE
iColData++;
} else {
// TODO
// add a new SColData
}
} else if (pColData) {
// TODO
// add a NONE
} else {
// TODO
// add a new SColData and append value
}
}
#endif
return code;
}
int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema) {
int32_t code = 0;
TSDBKEY key = tsdbRowKey(pRow);
if (pRow->type == 0) {
code = tBlockDataAppendRow0(pBlockData, pRow, pTSchema);
} else if (pRow->type == 1) {
code = tBlockDataAppendRow1(pBlockData, pRow);
}
_err:
return code;
}
void tBlockDataReset(SBlockData *pBlockData) {
pBlockData->nRow = 0;
pBlockData->nColData = 0;
}
void tBlockDataClear(SBlockData *pBlockData) {
tsdbFree((uint8_t *)pBlockData->aVersion);
tsdbFree((uint8_t *)pBlockData->aTSKEY);
for (int32_t iCol = 0; iCol < pBlockData->nColData; iCol++) {
tsdbFree(pBlockData->aColData[iCol].pBitMap);
tsdbFree(pBlockData->aColData[iCol].pData);
}
}
}
\ No newline at end of file
......@@ -79,7 +79,7 @@ int vnodeEncodeConfig(const void *pObj, SJson *pJson) {
SJson *pNodeRetentions = tjsonCreateArray();
tjsonAddItemToObject(pJson, "retentions", pNodeRetentions);
for (int32_t i = 0; i < nRetention; ++i) {
SJson *pNodeRetention = tjsonCreateObject();
SJson *pNodeRetention = tjsonCreateObject();
const SRetention *pRetention = pCfg->tsdbCfg.retentions + i;
tjsonAddIntegerToObject(pNodeRetention, "freq", pRetention->freq);
tjsonAddIntegerToObject(pNodeRetention, "freqUnit", pRetention->freqUnit);
......@@ -118,45 +118,45 @@ int vnodeDecodeConfig(const SJson *pJson, void *pObj) {
int32_t code;
tjsonGetNumberValue(pJson, "vgId", pCfg->vgId, code);
if(code < 0) return -1;
if (code < 0) return -1;
if (tjsonGetStringValue(pJson, "dbname", pCfg->dbname) < 0) return -1;
tjsonGetNumberValue(pJson, "dbId", pCfg->dbId, code);
if(code < 0) return -1;
if (code < 0) return -1;
tjsonGetNumberValue(pJson, "szPage", pCfg->szPage, code);
if(code < 0) return -1;
if (code < 0) return -1;
tjsonGetNumberValue(pJson, "szCache", pCfg->szCache, code);
if(code < 0) return -1;
if (code < 0) return -1;
tjsonGetNumberValue(pJson, "szBuf", pCfg->szBuf, code);
if(code < 0) return -1;
if (code < 0) return -1;
tjsonGetNumberValue(pJson, "isHeap", pCfg->isHeap, code);
if(code < 0) return -1;
if (code < 0) return -1;
tjsonGetNumberValue(pJson, "isWeak", pCfg->isWeak, code);
if(code < 0) return -1;
if (code < 0) return -1;
tjsonGetNumberValue(pJson, "isTsma", pCfg->isTsma, code);
if(code < 0) return -1;
if (code < 0) return -1;
tjsonGetNumberValue(pJson, "isRsma", pCfg->isRsma, code);
if(code < 0) return -1;
if (code < 0) return -1;
tjsonGetNumberValue(pJson, "precision", pCfg->tsdbCfg.precision, code);
if(code < 0) return -1;
if (code < 0) return -1;
tjsonGetNumberValue(pJson, "update", pCfg->tsdbCfg.update, code);
if(code < 0) return -1;
if (code < 0) return -1;
tjsonGetNumberValue(pJson, "compression", pCfg->tsdbCfg.compression, code);
if(code < 0) return -1;
if (code < 0) return -1;
tjsonGetNumberValue(pJson, "slLevel", pCfg->tsdbCfg.slLevel, code);
if(code < 0) return -1;
if (code < 0) return -1;
tjsonGetNumberValue(pJson, "daysPerFile", pCfg->tsdbCfg.days, code);
if(code < 0) return -1;
if (code < 0) return -1;
tjsonGetNumberValue(pJson, "minRows", pCfg->tsdbCfg.minRows, code);
if(code < 0) return -1;
if (code < 0) return -1;
tjsonGetNumberValue(pJson, "maxRows", pCfg->tsdbCfg.maxRows, code);
if(code < 0) return -1;
if (code < 0) return -1;
tjsonGetNumberValue(pJson, "keep0", pCfg->tsdbCfg.keep0, code);
if(code < 0) return -1;
if (code < 0) return -1;
tjsonGetNumberValue(pJson, "keep1", pCfg->tsdbCfg.keep1, code);
if(code < 0) return -1;
if (code < 0) return -1;
tjsonGetNumberValue(pJson, "keep2", pCfg->tsdbCfg.keep2, code);
if(code < 0) return -1;
SJson *pNodeRetentions = tjsonGetObjectItem(pJson, "retentions");
if (code < 0) return -1;
SJson *pNodeRetentions = tjsonGetObjectItem(pJson, "retentions");
int32_t nRetention = tjsonGetArraySize(pNodeRetentions);
if (nRetention > TSDB_RETENTION_MAX) {
nRetention = TSDB_RETENTION_MAX;
......@@ -170,30 +170,30 @@ int vnodeDecodeConfig(const SJson *pJson, void *pObj) {
tjsonGetNumberValue(pNodeRetention, "keepUnit", (pCfg->tsdbCfg.retentions)[i].keepUnit, code);
}
tjsonGetNumberValue(pJson, "wal.vgId", pCfg->walCfg.vgId, code);
if(code < 0) return -1;
if (code < 0) return -1;
tjsonGetNumberValue(pJson, "wal.fsyncPeriod", pCfg->walCfg.fsyncPeriod, code);
if(code < 0) return -1;
if (code < 0) return -1;
tjsonGetNumberValue(pJson, "wal.retentionPeriod", pCfg->walCfg.retentionPeriod, code);
if(code < 0) return -1;
if (code < 0) return -1;
tjsonGetNumberValue(pJson, "wal.rollPeriod", pCfg->walCfg.rollPeriod, code);
if(code < 0) return -1;
if (code < 0) return -1;
tjsonGetNumberValue(pJson, "wal.retentionSize", pCfg->walCfg.retentionSize, code);
if(code < 0) return -1;
if (code < 0) return -1;
tjsonGetNumberValue(pJson, "wal.segSize", pCfg->walCfg.segSize, code);
if(code < 0) return -1;
if (code < 0) return -1;
tjsonGetNumberValue(pJson, "wal.level", pCfg->walCfg.level, code);
if(code < 0) return -1;
if (code < 0) return -1;
tjsonGetNumberValue(pJson, "hashBegin", pCfg->hashBegin, code);
if(code < 0) return -1;
if (code < 0) return -1;
tjsonGetNumberValue(pJson, "hashEnd", pCfg->hashEnd, code);
if(code < 0) return -1;
if (code < 0) return -1;
tjsonGetNumberValue(pJson, "hashMethod", pCfg->hashMethod, code);
if(code < 0) return -1;
if (code < 0) return -1;
tjsonGetNumberValue(pJson, "syncCfg.replicaNum", pCfg->syncCfg.replicaNum, code);
if(code < 0) return -1;
if (code < 0) return -1;
tjsonGetNumberValue(pJson, "syncCfg.myIndex", pCfg->syncCfg.myIndex, code);
if(code < 0) return -1;
if (code < 0) return -1;
SJson *pNodeInfoArr = tjsonGetObjectItem(pJson, "syncCfg.nodeInfo");
int arraySize = tjsonGetArraySize(pNodeInfoArr);
......
......@@ -311,9 +311,9 @@ static int vnodeDecodeState(const SJson *pJson, void *pObj) {
int32_t code;
tjsonGetNumberValue(pJson, "commit version", pState->committed, code);
if(code < 0) return -1;
if (code < 0) return -1;
tjsonGetNumberValue(pJson, "applied version", pState->applied, code);
if(code < 0) return -1;
if (code < 0) return -1;
return 0;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册