diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 92f02d9ed084249f2843df02c27afae7d6531fd1..cb3509e983d77df8b3bc19971e83bb499ce204ed 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -63,6 +63,8 @@ typedef struct SDataFWriter SDataFWriter; typedef struct SDataFReader SDataFReader; typedef struct SDelFWriter SDelFWriter; typedef struct SDelFReader SDelFReader; +typedef struct SRowIter SRowIter; +typedef struct STsdbFS STsdbFS; #define TSDB_MAX_SUBBLOCKS 8 @@ -78,6 +80,9 @@ TSDBKEY tsdbRowKey(TSDBROW *pRow); void tsdbRowGetColVal(TSDBROW *pRow, STSchema *pTSchema, int32_t iCol, SColVal *pColVal); int32_t tPutTSDBRow(uint8_t *p, TSDBROW *pRow); int32_t tGetTSDBRow(uint8_t *p, TSDBROW *pRow); +// SRowIter +#define tRowIterInit(ROW, SCHEMA) ((SRowIter){.pRow = (ROW), .pSchema = (SCHEMA)}) +SColVal *tRowIterNext(SRowIter *pIter); // TABLEID int32_t tTABLEIDCmprFn(const void *p1, const void *p2); // TSDBKEY @@ -107,11 +112,11 @@ int32_t tGetBlockIdx(uint8_t *p, void *ph); // SColdata #define tColDataInit() ((SColData){0}) void tColDataReset(SColData *pColData); -void tColDataClear(SColData *pColData); +void tColDataClear(void *ph); int32_t tColDataAppendValue(SColData *pColData, SColVal *pColVal); int32_t tColDataCmprFn(const void *p1, const void *p2); // SBlockData -#define tBlockDataInit() ((SBlockData){0}) +int32_t tBlockDataInit(SBlockData *pBlockData); void tBlockDataReset(SBlockData *pBlockData); void tBlockDataClear(SBlockData *pBlockData); int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema); @@ -160,14 +165,15 @@ void tsdbLastFileName(STsdb *pTsdb, SLastFile *pFile, char fname[]); // SSmaFile void tsdbSmaFileName(STsdb *pTsdb, SSmaFile *pFile, char fname[]); // SDelFile -#define tsdbDelFileCreate() ((SDelFile){.info = KEYINFO_INIT_VAL, .size = 0, .offset = 0}) +#define tsdbDelFileCreate() \ + ((SDelFile){ \ + .maxKey = TSKEY_MIN, .minKey = TSKEY_MAX, .maxVersion = -1, .minVersion = INT64_MAX, .size = 0, .offset = 0}) void tsdbDelFileName(STsdb *pTsdb, SDelFile *pFile, char fname[]); // tsdbFS.c ============================================================================================== -typedef struct STsdbFS STsdbFS; -int32_t tsdbFSOpen(STsdb *pTsdb, STsdbFS **ppFS); -int32_t tsdbFSClose(STsdbFS *pFS); -int32_t tsdbFSStart(STsdbFS *pFS); -int32_t tsdbFSEnd(STsdbFS *pFS, int8_t rollback); +int32_t tsdbFSOpen(STsdb *pTsdb, STsdbFS **ppFS); +int32_t tsdbFSClose(STsdbFS *pFS); +int32_t tsdbFSStart(STsdbFS *pFS); +int32_t tsdbFSEnd(STsdbFS *pFS, int8_t rollback); // tsdbReaderWriter.c ============================================================================================== // SDataFWriter int32_t tsdbDataFWriterOpen(SDataFWriter **ppWriter, STsdb *pTsdb, SDFileSet *pSet); @@ -200,7 +206,7 @@ int32_t tsdbReadDelIdx(SDelFReader *pReader, SMapData *pDelIdxMap, uint8_t **ppB // tsdbCache int32_t tsdbOpenCache(STsdb *pTsdb); -void tsdbCloseCache(SLRUCache *pCache); +void tsdbCloseCache(SLRUCache *pCache); int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, tb_uid_t uid, STSRow *row); int32_t tsdbCacheGetLastrow(SLRUCache *pCache, tb_uid_t uid, STSRow **ppRow); int32_t tsdbCacheDeleteLastrow(SLRUCache *pCache, tb_uid_t uid); @@ -366,14 +372,12 @@ struct SColData { }; struct SBlockData { - int32_t maxRow; - int32_t nRow; - int64_t *aVersion; - TSKEY *aTSKEY; - int32_t maxCol; - int32_t nColData; - SColData **apColData; - SColData *aColData; + int32_t maxRow; + int32_t nRow; + int64_t *aVersion; + TSKEY *aTSKEY; + SArray *apColData; + SArray *aColData; }; // ================== TSDB global config @@ -414,7 +418,10 @@ struct SDelIdx { }; struct SDelFile { - KEYINFO info; + TSKEY minKey; + TSKEY maxKey; + int64_t minVersion; + int64_t maxVersion; int64_t size; int64_t offset; }; @@ -463,11 +470,19 @@ struct SSmaFile { struct SDFileSet { SDiskID diskId; - int32_t nRef; + int32_t fid; SHeadFile *pHeadFile; SDataFile *pDataFile; SLastFile *pLastFile; SSmaFile *pSmaFile; + int32_t nRef; +}; + +struct SRowIter { + TSDBROW *pRow; + STSchema *pSchema; + SColVal colVal; + int32_t i; }; #ifdef __cplusplus diff --git a/source/dnode/vnode/src/tsdb/tsdbFS.c b/source/dnode/vnode/src/tsdb/tsdbFS.c index f5b89769359fa3abb7120d86c8ba88bafdbef5b3..04041f1dd10a854b9988a125999ad3a34965d74d 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFS.c +++ b/source/dnode/vnode/src/tsdb/tsdbFS.c @@ -15,25 +15,364 @@ #include "tsdb.h" +typedef struct { + SDelFile *pDelFile; + SArray *aDFileSet; // SArray +} STsdbFSState; + struct STsdbFS { - STsdb *pTsdb; - TdThreadRwlock lock; - int64_t minVersion; - int64_t maxVersion; - SDelFile *pTombstoneF; - STsdbCacheFile *pCacheF; - SArray *pArray; + STsdb *pTsdb; + TdThreadRwlock lock; + int8_t inTxn; + STsdbFSState *cState; + STsdbFSState *nState; }; -int32_t tsdbFSOpen(STsdb *pTsdb, STsdbFS **ppFS) { +// ================================================================================================= +static int32_t tsdbDelFileToJson(const void *pObj, SJson *pJson) { + int32_t code = 0; + SDelFile *pDelFile = (SDelFile *)pObj; + + if (tjsonAddIntegerToObject(pJson, "minKey", pDelFile->minKey) < 0) goto _err; + if (tjsonAddIntegerToObject(pJson, "maxKey", pDelFile->maxKey) < 0) goto _err; + if (tjsonAddIntegerToObject(pJson, "minVer", pDelFile->minVersion) < 0) goto _err; + if (tjsonAddIntegerToObject(pJson, "maxVer", pDelFile->maxVersion) < 0) goto _err; + + return code; + +_err: + code = TSDB_CODE_OUT_OF_MEMORY; + return code; +} + +static int32_t tsdbHeadFileToJson(const void *pObj, SJson *pJson) { + int32_t code = 0; + SHeadFile *pHeadFile = (SHeadFile *)pObj; + + if (tjsonAddIntegerToObject(pJson, "size", pHeadFile->size) < 0) goto _err; + if (tjsonAddIntegerToObject(pJson, "offset", pHeadFile->offset) < 0) goto _err; + + return code; + +_err: + code = TSDB_CODE_OUT_OF_MEMORY; + return code; +} + +static int32_t tsdbDataFileToJson(const void *pObj, SJson *pJson) { + int32_t code = 0; + SDataFile *pDataFile = (SDataFile *)pObj; + + if (tjsonAddIntegerToObject(pJson, "size", pDataFile->size) < 0) goto _err; + + return code; + +_err: + code = TSDB_CODE_OUT_OF_MEMORY; + return code; +} + +static int32_t tsdbLastFileToJson(const void *pObj, SJson *pJson) { + int32_t code = 0; + SLastFile *pLastFile = (SLastFile *)pObj; + + if (tjsonAddIntegerToObject(pJson, "size", pLastFile->size) < 0) goto _err; + + return code; + +_err: + code = TSDB_CODE_OUT_OF_MEMORY; + return code; +} + +static int32_t tsdbSmaFileToJson(const void *pObj, SJson *pJson) { + int32_t code = 0; + SSmaFile *pSmaFile = (SSmaFile *)pObj; + + if (tjsonAddIntegerToObject(pJson, "size", pSmaFile->size) < 0) goto _err; + + return code; + +_err: + code = TSDB_CODE_OUT_OF_MEMORY; + return code; +} + +static int32_t tsdbDFileSetToJson(const void *pObj, SJson *pJson) { + int32_t code = 0; + SDFileSet *pDFileSet = (SDFileSet *)pObj; + + if (tjsonAddIntegerToObject(pJson, "level", pDFileSet->diskId.level) < 0) goto _err; + if (tjsonAddIntegerToObject(pJson, "id", pDFileSet->diskId.id) < 0) goto _err; + if (tjsonAddIntegerToObject(pJson, "fid", pDFileSet->fid) < 0) goto _err; + if (tjsonAddObject(pJson, "head", tsdbHeadFileToJson, pDFileSet->pHeadFile) < 0) goto _err; + if (tjsonAddObject(pJson, "data", tsdbDataFileToJson, pDFileSet->pDataFile) < 0) goto _err; + if (tjsonAddObject(pJson, "last", tsdbLastFileToJson, pDFileSet->pLastFile) < 0) goto _err; + if (tjsonAddObject(pJson, "sma", tsdbSmaFileToJson, pDFileSet->pSmaFile) < 0) goto _err; + + return code; + +_err: + code = TSDB_CODE_OUT_OF_MEMORY; + return code; +} + +static int32_t tsdbFSStateToJsonStr(STsdbFSState *pState, char **ppData) { + int32_t code = 0; + SJson *pJson = NULL; + + pJson = tjsonCreateObject(); + if (pJson == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + + if (tjsonAddObject(pJson, "DelFile", tsdbDelFileToJson, pState->pDelFile) < 0) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + + if (tjsonAddTArray(pJson, "DFileSet", tsdbDFileSetToJson, pState->aDFileSet) < 0) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + + *ppData = tjsonToString(pJson); + if (*ppData == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + + tjsonDelete(pJson); + + return code; + +_err: + return code; +} + +static int32_t tsdbJsonStrToFSState(char *pData, STsdbFSState *pState) { + int32_t code = 0; + SJson *pJson = NULL; + + pJson = tjsonParse(pData); + if (pJson == NULL) goto _err; + + // if (tjsonToObject(pJson, "DelFile", tsdbJsonToDelFile, &pState->pDelFile) < 0) goto _err; + // if (tjsonToTArray(pJson, "DFIleSet", tsdbJsonToDFileSet, ) < 0) goto _err; + ASSERT(0); + + tjsonDelete(pJson); + + return code; + +_err: + code = TSDB_CODE_OUT_OF_MEMORY; + return code; +} + +static int32_t tsdbCreateEmptyCurrent(const char *fname, STsdbFSState *pState) { + int32_t code = 0; + int64_t n; + int64_t size; + char *pData = NULL; + TdFilePtr pFD = NULL; + + // to json str + code = tsdbFSStateToJsonStr(pState, &pData); + if (code) goto _err; + + // create and write + pFD = taosOpenFile(fname, TD_FILE_WRITE | TD_FILE_CREATE); + if (pFD == NULL) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + size = strlen(pData); + n = taosWriteFile(pFD, pData, size); + if (n < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + if (taosFsyncFile(pFD) < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + taosCloseFile(&pFD); + + if (pData) taosMemoryFree(pData); + return code; + +_err: + tsdbError("create empry current failed since %s", tstrerror(code)); + if (pData) taosMemoryFree(pData); + return code; +} + +static int32_t tsdbLoadCurrentState(STsdbFS *pFS, STsdbFSState *pState) { + int32_t code = 0; + int64_t size; + int64_t n; + char fname[TSDB_FILENAME_LEN]; + char *pData = NULL; + TdFilePtr pFD; + + snprintf(fname, TSDB_FILENAME_LEN - 1, "%s/CURRENT", pFS->pTsdb->path); + + if (!taosCheckExistFile(fname)) { + // create an empry CURRENT file if not exists + code = tsdbCreateEmptyCurrent(fname, pState); + if (code) goto _err; + } else { + // open the file and load + pFD = taosOpenFile(fname, TD_FILE_READ); + if (pFD == NULL) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + if (taosFStatFile(pFD, &size, NULL) < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + pData = taosMemoryMalloc(size + 1); + if (pData == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + pData[size] = '\0'; + + n = taosReadFile(pFD, pData, size); + if (n < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + taosCloseFile(&pFD); + + // decode + code = tsdbJsonStrToFSState(pData, pState); + if (code) goto _err; + } + + if (pData) taosMemoryFree(pData); + return code; + +_err: + tsdbError("vgId:%d tsdb load current state failed since %s", TD_VID(pFS->pTsdb->pVnode), tstrerror(code)); + if (pData) taosMemoryFree(pData); + return code; +} + +static int32_t tsdbFSOpenImpl(STsdbFS *pFS) { + int32_t code = 0; + int64_t size; + int64_t n; + + // read CURRENT file + code = tsdbLoadCurrentState(pFS, pFS->cState); + if (code) goto _err; + + // decode the statue file + // code = tsdbDecodeFSState(pData, pFS->cState); + // if (code) goto _err; + + // // scan and fix invalid tsdb FS + // code = tsdbScanAndFixFS(pFS); + // if (code) goto _err; + + // if (pData) taosMemoryFree(pData); + return code; + +_err: + // if (pData) taosMemoryFree(pData); + tsdbError("vgId:%d tsdb fs open impl failed since %s", TD_VID(pFS->pTsdb->pVnode), tstrerror(code)); + return code; +} + +int32_t tsdbFSCloseImpl(STsdbFS *pFS) { int32_t code = 0; // TODO return code; } +// EXPOSED APIS ==================================================================================== +int32_t tsdbFSOpen(STsdb *pTsdb, STsdbFS **ppFS) { + int32_t code = 0; + STsdbFS *pFS = NULL; + + pFS = (STsdbFS *)taosMemoryCalloc(1, sizeof(*pFS)); + if (pFS == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + pFS->pTsdb = pTsdb; + + code = taosThreadRwlockInit(&pFS->lock, NULL); + if (code) { + code = TAOS_SYSTEM_ERROR(code); + goto _err; + } + + pFS->inTxn = 0; + + pFS->cState = (STsdbFSState *)taosMemoryCalloc(1, sizeof(STsdbFSState)); + if (pFS->cState == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + pFS->cState->aDFileSet = taosArrayInit(0, sizeof(SDFileSet)); + if (pFS->cState->aDFileSet == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + + pFS->nState = (STsdbFSState *)taosMemoryCalloc(1, sizeof(STsdbFSState)); + if (pFS->nState == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + pFS->nState->aDFileSet = taosArrayInit(0, sizeof(SDFileSet)); + if (pFS->nState->aDFileSet == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + + code = tsdbFSOpenImpl(pFS); + if (code) goto _err; + + *ppFS = pFS; + return code; + +_err: + *ppFS = NULL; + tsdbError("vgId:%d tsdb fs open failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); + return code; +} + int32_t tsdbFSClose(STsdbFS *pFS) { int32_t code = 0; - // TODO + + if (pFS) { + code = tsdbFSCloseImpl(pFS); + if (code) goto _err; + + taosArrayDestroy(pFS->nState->aDFileSet); + taosMemoryFree(pFS->nState); + taosArrayDestroy(pFS->cState->aDFileSet); + taosMemoryFree(pFS->cState); + taosThreadRwlockDestroy(&pFS->lock); + taosMemoryFree(pFS); + } + + return code; + +_err: + tsdbError("vgId:%d tsdb fs close failed since %s", TD_VID(pFS->pTsdb->pVnode), tstrerror(code)); return code; } diff --git a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c index bb5b8769c18a7461c5102d994765ea50464fec71..f68771633b35876df41d05a1497fdb5164abc67d 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c +++ b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c @@ -922,8 +922,8 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, uint8_ // other columns offset = 0; tMapDataClear(&pSubBlock->mBlockCol); - for (int32_t iCol = 0; iCol < pBlockData->nColData; iCol++) { - SColData *pColData = &pBlockData->aColData[iCol]; + for (int32_t iCol = 0; iCol < taosArrayGetSize(pBlockData->apColData); iCol++) { + SColData *pColData = (SColData *)taosArrayGetP(pBlockData->apColData, iCol); ASSERT(pColData->flags); diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil.c b/source/dnode/vnode/src/tsdb/tsdbUtil.c index 5df0773ca43def878b95d0eb568fad317c85cc9d..cbc8c9424545e232c8fbc4b37859b30e558ce1e4 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbUtil.c @@ -445,7 +445,10 @@ int32_t tGetDelData(uint8_t *p, void *ph) { int32_t tPutDelFileHdr(uint8_t *p, SDelFile *pDelFile) { int32_t n = 0; - n += tPutKEYINFO(p ? p + n : p, &pDelFile->info); + n += tPutI64(p ? p + n : p, pDelFile->minKey); + n += tPutI64(p ? p + n : p, pDelFile->maxKey); + n += tPutI64v(p ? p + n : p, pDelFile->minVersion); + n += tPutI64v(p ? p + n : p, pDelFile->maxVersion); n += tPutI64v(p ? p + n : p, pDelFile->size); n += tPutI64v(p ? p + n : p, pDelFile->offset); @@ -455,7 +458,10 @@ int32_t tPutDelFileHdr(uint8_t *p, SDelFile *pDelFile) { int32_t tGetDelFileHdr(uint8_t *p, SDelFile *pDelFile) { int32_t n = 0; - n += tGetKEYINFO(p + n, &pDelFile->info); + n += tGetI64(p + n, &pDelFile->minKey); + n += tGetI64(p + n, &pDelFile->maxKey); + n += tGetI64v(p + n, &pDelFile->minVersion); + n += tGetI64v(p + n, &pDelFile->maxVersion); n += tGetI64v(p + n, &pDelFile->size); n += tGetI64v(p + n, &pDelFile->offset); @@ -500,14 +506,18 @@ void tsdbRowGetColVal(TSDBROW *pRow, STSchema *pTSchema, int32_t iCol, SColVal * STColumn *pTColumn = &pTSchema->columns[iCol]; SValue value; + ASSERT(iCol > 0); + if (pRow->type == 0) { // get from row (todo); } else if (pRow->type == 1) { SColData *pColData; - void *p; + void *p = NULL; - p = taosbsearch(&(SColData){.cid = pTColumn->colId}, pRow->pBlockData->aColData, pRow->pBlockData->nColData, - sizeof(SBlockCol), tColDataCmprFn, TD_EQ); + // TODO + ASSERT(0); + // p = taosbsearch(&(SColData){.cid = pTColumn->colId}, pRow->pBlockData->apColData, pRow->pBlockData->nColData, + // sizeof(SBlockCol), tColDataCmprFn, TD_EQ); if (p) { pColData = (SColData *)p; ASSERT(pColData->flags); @@ -707,8 +717,12 @@ void tColDataReset(SColData *pColData) { // TODO } -void tColDataClear(SColData *pColData) { - // TODO +void tColDataClear(void *ph) { + SColData *pColData = (SColData *)ph; + + tsdbFree(pColData->pBitMap); + tsdbFree((uint8_t *)pColData->pOfst); + tsdbFree(pColData->pData); } int32_t tColDataAppendValue(SColData *pColData, SColVal *pColVal) { @@ -730,50 +744,77 @@ int32_t tColDataCmprFn(const void *p1, const void *p2) { // SBlockData ====================================================== static int32_t tBlockDataAddColData(SBlockData *pBlockData, int32_t iColData) { int32_t code = 0; - int32_t nColData = pBlockData->nColData; + // int32_t nColData = pBlockData->nColData; + + // pBlockData->nColData++; + // if (pBlockData->nColData > pBlockData->maxCol) { + // if (pBlockData->maxCol == 0) { + // pBlockData->maxCol = 16; + // } else { + // pBlockData->maxCol *= 2; + // } + + // code = tsdbRealloc((uint8_t **)&pBlockData->apColData, sizeof(SColData *) * pBlockData->maxCol); + // if (code) goto _exit; + // code = tsdbRealloc((uint8_t **)&pBlockData->aColData, sizeof(SColData) * pBlockData->maxCol); + // if (code) goto _exit; + + // for (int32_t iColData = nColData; iColData < pBlockData->maxCol; iColData++) { + // pBlockData->aColData[iColData] = tColDataInit(); + // } + // } - pBlockData->nColData++; - if (pBlockData->nColData > pBlockData->maxCol) { - if (pBlockData->maxCol == 0) { - pBlockData->maxCol = 16; - } else { - pBlockData->maxCol *= 2; - } + // // memmove (todo) + // // int32_t size = sizeof(SColData *) * (nColData - iColData); + // // if (size) { + // // memmove(); + // // } - code = tsdbRealloc((uint8_t **)&pBlockData->apColData, sizeof(SColData *) * pBlockData->maxCol); - if (code) goto _exit; - code = tsdbRealloc((uint8_t **)&pBlockData->aColData, sizeof(SColData) * pBlockData->maxCol); - if (code) goto _exit; + // pBlockData->apColData[iColData] = &pBlockData->aColData[nColData]; - for (int32_t iColData = nColData; iColData < pBlockData->maxCol; iColData++) { - pBlockData->aColData[iColData] = tColDataInit(); - } - } +_exit: + return code; +} - // memmove (todo) - // int32_t size = sizeof(SColData *) * (nColData - iColData); - // if (size) { - // memmove(); - // } +int32_t tBlockDataInit(SBlockData *pBlockData) { + int32_t code = 0; - pBlockData->apColData[iColData] = &pBlockData->aColData[nColData]; + *pBlockData = (SBlockData){0}; + pBlockData->apColData = taosArrayInit(0, sizeof(SColData *)); + if (pBlockData->apColData == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; + } + pBlockData->aColData = taosArrayInit(0, sizeof(SColData)); + if (pBlockData->aColData == NULL) { + taosArrayDestroy(pBlockData->apColData); + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; + } _exit: return code; } -static int32_t tBlockDataAppendRow0(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema) { - int32_t code = 0; - int32_t nRow = pBlockData->nRow; - TSDBKEY key = tsdbRowKey(pRow); - int32_t iColumn; - int32_t nColumn; - int32_t iColData; - SColVal colVal; - SColVal *pColVal = &colVal; +void tBlockDataReset(SBlockData *pBlockData) { + pBlockData->nRow = 0; + taosArrayClear(pBlockData->apColData); +} - ASSERT(pTSchema); - ASSERT(pTSchema->version == TSDBROW_SVERSION(pRow)); +void tBlockDataClear(SBlockData *pBlockData) { + tsdbFree((uint8_t *)pBlockData->aVersion); + tsdbFree((uint8_t *)pBlockData->aTSKEY); + taosArrayDestroy(pBlockData->apColData); + taosArrayDestroyEx(pBlockData->aColData, tColDataClear); +} + +static int32_t tBlockDataAppendRow0(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema); +static int32_t tBlockDataAppendRow1(SBlockData *pBlockData, TSDBROW *pRow); + +int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema) { + int32_t code = 0; + int32_t nRow = pBlockData->nRow; + TSDBKEY key = tsdbRowKey(pRow); pBlockData->nRow++; @@ -785,126 +826,45 @@ static int32_t tBlockDataAppendRow0(SBlockData *pBlockData, TSDBROW *pRow, STSch pBlockData->maxRow *= 2; } - ASSERT(pBlockData->maxRow >= pBlockData->nRow); - - code = tsdbRealloc((uint8_t **)&pBlockData->aTSKEY, sizeof(TSKEY) * pBlockData->maxRow); - if (code) goto _err; - code = tsdbRealloc((uint8_t **)&pBlockData->aVersion, sizeof(int64_t) * pBlockData->maxRow); if (code) goto _err; + code = tsdbRealloc((uint8_t **)&pBlockData->aTSKEY, sizeof(TSKEY) * pBlockData->maxRow); + if (code) goto _err; } pBlockData->aVersion[nRow] = key.version; pBlockData->aTSKEY[nRow] = key.ts; - // other cols - iColumn = 1; - nColumn = pTSchema->numOfCols; - iColData = 0; - while (iColumn < nColumn || iColData < pBlockData->nColData) { - STColumn *pTColumn = NULL; - SColData *pColData = NULL; - - if (iColumn < nColumn) { - pTColumn = &pTSchema->columns[iColumn]; - } - if (iColData < pBlockData->nColData) { - pColData = &pBlockData->aColData[iColData]; - } - - if (pTColumn && pColData) { - if (pTColumn->colId == pColData->cid) { - // tsdbRowGetColVal(pRow, pTSchema, iColumn, &cv); - } else if (pTColumn->colId < pColData->cid) { - // add a new SColData, and append the column value cv to the SColData - } else { - // add a None to the column value - } - } else if (pTColumn) { - // tsdbRowGetColVal(pRow, pTSchema, iColumn, &cv); - // add a new SColData, and append the column value cv to the SColData - } else { - iColData++; - } - } - - return code; - -_err: - return code; -} - -static int32_t tBlockDataAppendRow1(SBlockData *pBlockData, TSDBROW *pRow) { - int32_t code = 0; - int32_t nRow = pBlockData->nRow; - TSDBKEY key = tsdbRowKey(pRow); - int32_t iColData; - int32_t iColDataRow; - int32_t nColDataRow; - SColVal colVal; - SColVal *pColVal = &colVal; - - pBlockData->nRow++; - - // aKey (TODO) - pBlockData->aVersion[nRow] = key.version; - pBlockData->aTSKEY[nRow] = key.ts; - - // other cols - iColData = 0; - iColDataRow = 0; - nColDataRow = pRow->pBlockData->nColData; - while (iColData < pBlockData->nColData || iColDataRow < nColDataRow) { - SColData *pColData = NULL; - SColData *pColDataRow = NULL; - - if (iColData < pBlockData->nColData) { - pColData = &pBlockData->aColData[iColData]; - } - if (iColDataRow < nColDataRow) { - pColDataRow = &pRow->pBlockData->aColData[iColDataRow]; - } - - if (pColData && pColDataRow) { - if (pColData->cid == pColDataRow->cid) { - // TODO - } else if (pColData->cid < pColDataRow->cid) { - // TODO +// OTHER +#if 0 + int32_t iColData = 0; + int32_t nColData = taosArrayGetSize(pBlockData->apColData); + SRowIter ri = tRowIterInit(pRow, pTSchema); + + SColData *pColData = iColData < nColData ? (SColData *)taosArrayGetP(pBlockData->apColData, iColData) : NULL; + SColVal *pColVal = tRowIterNext(&ri); + while (true) { + if (pColData && pColVal) { + if (pColData->cid == pColVal->cid) { + // append SColVal to SColData + pColVal = tRowIterNext(&ri); + iColData++; + pColData = iColData < nColData ? (SColData *)taosArrayGetP(pBlockData->apColData, iColData) : NULL; + } else if (pColData->cid < pColVal->cid) { + // append a NONE + iColData++; } else { - // TODO + // add a new SColData } } else if (pColData) { - // TODO + // add a NONE } else { - // TODO + // add a new SColData and append value } } +#endif return code; -} - -int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema) { - int32_t code = 0; - TSDBKEY key = tsdbRowKey(pRow); - - if (pRow->type == 0) { - code = tBlockDataAppendRow0(pBlockData, pRow, pTSchema); - } else if (pRow->type == 1) { - code = tBlockDataAppendRow1(pBlockData, pRow); - } +_err: return code; -} - -void tBlockDataReset(SBlockData *pBlockData) { - pBlockData->nRow = 0; - pBlockData->nColData = 0; -} - -void tBlockDataClear(SBlockData *pBlockData) { - tsdbFree((uint8_t *)pBlockData->aVersion); - tsdbFree((uint8_t *)pBlockData->aTSKEY); - for (int32_t iCol = 0; iCol < pBlockData->nColData; iCol++) { - tsdbFree(pBlockData->aColData[iCol].pBitMap); - tsdbFree(pBlockData->aColData[iCol].pData); - } -} +} \ No newline at end of file diff --git a/source/dnode/vnode/src/vnd/vnodeCfg.c b/source/dnode/vnode/src/vnd/vnodeCfg.c index 79e63d5abc2a8947e7e8de3feeea267829c5f2f8..0ad388e726eb12902d080d714f25d943c993708d 100644 --- a/source/dnode/vnode/src/vnd/vnodeCfg.c +++ b/source/dnode/vnode/src/vnd/vnodeCfg.c @@ -79,7 +79,7 @@ int vnodeEncodeConfig(const void *pObj, SJson *pJson) { SJson *pNodeRetentions = tjsonCreateArray(); tjsonAddItemToObject(pJson, "retentions", pNodeRetentions); for (int32_t i = 0; i < nRetention; ++i) { - SJson *pNodeRetention = tjsonCreateObject(); + SJson *pNodeRetention = tjsonCreateObject(); const SRetention *pRetention = pCfg->tsdbCfg.retentions + i; tjsonAddIntegerToObject(pNodeRetention, "freq", pRetention->freq); tjsonAddIntegerToObject(pNodeRetention, "freqUnit", pRetention->freqUnit); @@ -118,45 +118,45 @@ int vnodeDecodeConfig(const SJson *pJson, void *pObj) { int32_t code; tjsonGetNumberValue(pJson, "vgId", pCfg->vgId, code); - if(code < 0) return -1; + if (code < 0) return -1; if (tjsonGetStringValue(pJson, "dbname", pCfg->dbname) < 0) return -1; tjsonGetNumberValue(pJson, "dbId", pCfg->dbId, code); - if(code < 0) return -1; + if (code < 0) return -1; tjsonGetNumberValue(pJson, "szPage", pCfg->szPage, code); - if(code < 0) return -1; + if (code < 0) return -1; tjsonGetNumberValue(pJson, "szCache", pCfg->szCache, code); - if(code < 0) return -1; + if (code < 0) return -1; tjsonGetNumberValue(pJson, "szBuf", pCfg->szBuf, code); - if(code < 0) return -1; + if (code < 0) return -1; tjsonGetNumberValue(pJson, "isHeap", pCfg->isHeap, code); - if(code < 0) return -1; + if (code < 0) return -1; tjsonGetNumberValue(pJson, "isWeak", pCfg->isWeak, code); - if(code < 0) return -1; + if (code < 0) return -1; tjsonGetNumberValue(pJson, "isTsma", pCfg->isTsma, code); - if(code < 0) return -1; + if (code < 0) return -1; tjsonGetNumberValue(pJson, "isRsma", pCfg->isRsma, code); - if(code < 0) return -1; + if (code < 0) return -1; tjsonGetNumberValue(pJson, "precision", pCfg->tsdbCfg.precision, code); - if(code < 0) return -1; + if (code < 0) return -1; tjsonGetNumberValue(pJson, "update", pCfg->tsdbCfg.update, code); - if(code < 0) return -1; + if (code < 0) return -1; tjsonGetNumberValue(pJson, "compression", pCfg->tsdbCfg.compression, code); - if(code < 0) return -1; + if (code < 0) return -1; tjsonGetNumberValue(pJson, "slLevel", pCfg->tsdbCfg.slLevel, code); - if(code < 0) return -1; + if (code < 0) return -1; tjsonGetNumberValue(pJson, "daysPerFile", pCfg->tsdbCfg.days, code); - if(code < 0) return -1; + if (code < 0) return -1; tjsonGetNumberValue(pJson, "minRows", pCfg->tsdbCfg.minRows, code); - if(code < 0) return -1; + if (code < 0) return -1; tjsonGetNumberValue(pJson, "maxRows", pCfg->tsdbCfg.maxRows, code); - if(code < 0) return -1; + if (code < 0) return -1; tjsonGetNumberValue(pJson, "keep0", pCfg->tsdbCfg.keep0, code); - if(code < 0) return -1; + if (code < 0) return -1; tjsonGetNumberValue(pJson, "keep1", pCfg->tsdbCfg.keep1, code); - if(code < 0) return -1; + if (code < 0) return -1; tjsonGetNumberValue(pJson, "keep2", pCfg->tsdbCfg.keep2, code); - if(code < 0) return -1; - SJson *pNodeRetentions = tjsonGetObjectItem(pJson, "retentions"); + if (code < 0) return -1; + SJson *pNodeRetentions = tjsonGetObjectItem(pJson, "retentions"); int32_t nRetention = tjsonGetArraySize(pNodeRetentions); if (nRetention > TSDB_RETENTION_MAX) { nRetention = TSDB_RETENTION_MAX; @@ -170,30 +170,30 @@ int vnodeDecodeConfig(const SJson *pJson, void *pObj) { tjsonGetNumberValue(pNodeRetention, "keepUnit", (pCfg->tsdbCfg.retentions)[i].keepUnit, code); } tjsonGetNumberValue(pJson, "wal.vgId", pCfg->walCfg.vgId, code); - if(code < 0) return -1; + if (code < 0) return -1; tjsonGetNumberValue(pJson, "wal.fsyncPeriod", pCfg->walCfg.fsyncPeriod, code); - if(code < 0) return -1; + if (code < 0) return -1; tjsonGetNumberValue(pJson, "wal.retentionPeriod", pCfg->walCfg.retentionPeriod, code); - if(code < 0) return -1; + if (code < 0) return -1; tjsonGetNumberValue(pJson, "wal.rollPeriod", pCfg->walCfg.rollPeriod, code); - if(code < 0) return -1; + if (code < 0) return -1; tjsonGetNumberValue(pJson, "wal.retentionSize", pCfg->walCfg.retentionSize, code); - if(code < 0) return -1; + if (code < 0) return -1; tjsonGetNumberValue(pJson, "wal.segSize", pCfg->walCfg.segSize, code); - if(code < 0) return -1; + if (code < 0) return -1; tjsonGetNumberValue(pJson, "wal.level", pCfg->walCfg.level, code); - if(code < 0) return -1; + if (code < 0) return -1; tjsonGetNumberValue(pJson, "hashBegin", pCfg->hashBegin, code); - if(code < 0) return -1; + if (code < 0) return -1; tjsonGetNumberValue(pJson, "hashEnd", pCfg->hashEnd, code); - if(code < 0) return -1; + if (code < 0) return -1; tjsonGetNumberValue(pJson, "hashMethod", pCfg->hashMethod, code); - if(code < 0) return -1; + if (code < 0) return -1; tjsonGetNumberValue(pJson, "syncCfg.replicaNum", pCfg->syncCfg.replicaNum, code); - if(code < 0) return -1; + if (code < 0) return -1; tjsonGetNumberValue(pJson, "syncCfg.myIndex", pCfg->syncCfg.myIndex, code); - if(code < 0) return -1; + if (code < 0) return -1; SJson *pNodeInfoArr = tjsonGetObjectItem(pJson, "syncCfg.nodeInfo"); int arraySize = tjsonGetArraySize(pNodeInfoArr); diff --git a/source/dnode/vnode/src/vnd/vnodeCommit.c b/source/dnode/vnode/src/vnd/vnodeCommit.c index 3715866bb88f3ae030d5f65c7ce938e69120f466..3d9d2d8efbdd5bd5d54afa554a1ddcba6bdc4c59 100644 --- a/source/dnode/vnode/src/vnd/vnodeCommit.c +++ b/source/dnode/vnode/src/vnd/vnodeCommit.c @@ -311,9 +311,9 @@ static int vnodeDecodeState(const SJson *pJson, void *pObj) { int32_t code; tjsonGetNumberValue(pJson, "commit version", pState->committed, code); - if(code < 0) return -1; + if (code < 0) return -1; tjsonGetNumberValue(pJson, "applied version", pState->applied, code); - if(code < 0) return -1; + if (code < 0) return -1; return 0; }