提交 7bc38466 编写于 作者: H Hongze Cheng

more code

上级 2b7fa84e
...@@ -171,3 +171,14 @@ option( ...@@ -171,3 +171,14 @@ option(
ON ON
) )
# open this flag to use dev code, make sure it is off in release version
option(
USE_DEV_CODE
"If use dev code"
ON
)
if (${USE_DEV_CODE})
add_definitions(-DUSE_DEV_CODE)
endif(USE_DEV_CODE)
\ No newline at end of file
...@@ -18,9 +18,9 @@ ...@@ -18,9 +18,9 @@
// extern dependencies // extern dependencies
typedef struct SSttFWriter SSttFWriter; typedef struct SSttFWriter SSttFWriter;
extern int32_t tsdbSttFWriterOpen(STsdb *pTsdb, SSttFile *pSttFile, SSttFWriter **ppWritter); extern int32_t tsdbSttFWriterOpen(STsdb *pTsdb, const SSttFile *pSttFile, SSttFWriter **ppWriter);
extern int32_t tsdbSttFWriterClose(SSttFWriter *pWritter); extern int32_t tsdbSttFWriterClose(SSttFWriter *pWriter);
extern int32_t tsdbSttFWriteRow(SSttFWriter *pWritter, int64_t suid, int64_t uid, TSDBROW *pRow); extern int32_t tsdbSttFWriteTSData(SSttFWriter *pWriter, TABLEID *tbid, TSDBROW *pRow);
typedef struct { typedef struct {
STsdb *pTsdb; STsdb *pTsdb;
...@@ -43,12 +43,23 @@ typedef struct { ...@@ -43,12 +43,23 @@ typedef struct {
} SCommitter; } SCommitter;
static int32_t tsdbCommitOpenWriter(SCommitter *pCommitter) { static int32_t tsdbCommitOpenWriter(SCommitter *pCommitter) {
int32_t code = 0; int32_t code;
// TODO int32_t lino;
SSttFile sttFile = {0}; // TODO
code = tsdbSttFWriterOpen(pCommitter->pTsdb, &sttFile, &pCommitter->pWriter);
TSDB_CHECK_CODE(code, lino, _exit);
_exit:
if (code) {
tsdbError("vgId:%d %s failed at line %d since %s, fid:%d", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino,
tstrerror(code), pCommitter->fid);
}
return code; return code;
} }
static int32_t tsdbCommitWriteTSData(SCommitter *pCommitter, int64_t suid, int64_t uid, TSDBROW *pRow) { static int32_t tsdbCommitWriteTSData(SCommitter *pCommitter, TABLEID *tbid, TSDBROW *pRow) {
int32_t code = 0; int32_t code = 0;
int32_t lino; int32_t lino;
...@@ -57,7 +68,7 @@ static int32_t tsdbCommitWriteTSData(SCommitter *pCommitter, int64_t suid, int64 ...@@ -57,7 +68,7 @@ static int32_t tsdbCommitWriteTSData(SCommitter *pCommitter, int64_t suid, int64
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
code = tsdbSttFWriteRow(pCommitter->pWriter, suid, uid, pRow); code = tsdbSttFWriteTSData(pCommitter->pWriter, tbid, pRow);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
_exit: _exit:
...@@ -65,7 +76,7 @@ _exit: ...@@ -65,7 +76,7 @@ _exit:
tsdbError("vgId:%d failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), lino, tstrerror(code)); tsdbError("vgId:%d failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), lino, tstrerror(code));
} else { } else {
tsdbTrace("vgId:%d %s done, fid:%d suid:%" PRId64 " uid:%" PRId64 " ts:%" PRId64 " version:%" PRId64, tsdbTrace("vgId:%d %s done, fid:%d suid:%" PRId64 " uid:%" PRId64 " ts:%" PRId64 " version:%" PRId64,
TD_VID(pCommitter->pTsdb->pVnode), __func__, pCommitter->fid, suid, uid, TSDBROW_KEY(pRow).ts, TD_VID(pCommitter->pTsdb->pVnode), __func__, pCommitter->fid, tbid->suid, tbid->uid, TSDBROW_KEY(pRow).ts,
TSDBROW_KEY(pRow).version); TSDBROW_KEY(pRow).version);
} }
return 0; return 0;
...@@ -106,7 +117,7 @@ static int32_t tsdbCommitTSData(SCommitter *pCommitter) { ...@@ -106,7 +117,7 @@ static int32_t tsdbCommitTSData(SCommitter *pCommitter) {
nRow++; nRow++;
code = tsdbCommitWriteTSData(pCommitter, pTbData->suid, pTbData->uid, pRow); code = tsdbCommitWriteTSData(pCommitter, (TABLEID *)pTbData, pRow);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
} }
...@@ -161,22 +172,16 @@ _exit: ...@@ -161,22 +172,16 @@ _exit:
} }
static int32_t tsdbCommitFSetStart(SCommitter *pCommitter) { static int32_t tsdbCommitFSetStart(SCommitter *pCommitter) {
int32_t code = 0;
int32_t lino = 0;
pCommitter->fid = tsdbKeyFid(pCommitter->nextKey, pCommitter->minutes, pCommitter->precision); pCommitter->fid = tsdbKeyFid(pCommitter->nextKey, pCommitter->minutes, pCommitter->precision);
tsdbFidKeyRange(pCommitter->fid, pCommitter->minutes, pCommitter->precision, &pCommitter->minKey, tsdbFidKeyRange(pCommitter->fid, pCommitter->minutes, pCommitter->precision, &pCommitter->minKey,
&pCommitter->maxKey); &pCommitter->maxKey);
pCommitter->expLevel = tsdbFidLevel(pCommitter->fid, &pCommitter->pTsdb->keepCfg, taosGetTimestampSec()); pCommitter->expLevel = tsdbFidLevel(pCommitter->fid, &pCommitter->pTsdb->keepCfg, taosGetTimestampSec());
pCommitter->nextKey = TSKEY_MAX; pCommitter->nextKey = TSKEY_MAX;
// TODO tsdbDebug("vgId:%d %s done, fid:%d minKey:%" PRId64 " maxKey:%" PRId64 " expLevel:%d",
TD_VID(pCommitter->pTsdb->pVnode), __func__, pCommitter->fid, pCommitter->minKey, pCommitter->maxKey,
_exit: pCommitter->expLevel);
if (code) { return 0;
tsdbError("vgId:%d failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), lino, tstrerror(code));
}
return code;
} }
static int32_t tsdbCommitFSetEnd(SCommitter *pCommitter) { static int32_t tsdbCommitFSetEnd(SCommitter *pCommitter) {
...@@ -221,13 +226,26 @@ _exit: ...@@ -221,13 +226,26 @@ _exit:
static int32_t tsdbCommitterOpen(STsdb *pTsdb, SCommitInfo *pInfo, SCommitter *pCommitter) { static int32_t tsdbCommitterOpen(STsdb *pTsdb, SCommitInfo *pInfo, SCommitter *pCommitter) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0; int32_t lino;
// set config
memset(pCommitter, 0, sizeof(SCommitter)); memset(pCommitter, 0, sizeof(SCommitter));
pCommitter->pTsdb = pTsdb; pCommitter->pTsdb = pTsdb;
pCommitter->nextKey = pTsdb->imem->minKey; // TODO pCommitter->minutes = pTsdb->keepCfg.days;
pCommitter->precision = pTsdb->keepCfg.precision;
pCommitter->minRow = pInfo->info.config.tsdbCfg.minRows;
pCommitter->maxRow = pInfo->info.config.tsdbCfg.maxRows;
pCommitter->cmprAlg = pInfo->info.config.tsdbCfg.compression;
pCommitter->sttTrigger = 0; // TODO
pCommitter->aTbDataP = tsdbMemTableGetTbDataArray(pTsdb->imem);
if (pCommitter->aTbDataP == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
// TODO // start loop
pCommitter->nextKey = pTsdb->imem->minKey; // TODO
_exit: _exit:
if (code) { if (code) {
......
...@@ -7,44 +7,97 @@ extern int32_t tsdbOpenFile(const char *path, int32_t szPage, int32_t flag, STsd ...@@ -7,44 +7,97 @@ extern int32_t tsdbOpenFile(const char *path, int32_t szPage, int32_t flag, STsd
extern void tsdbCloseFile(STsdbFD **ppFD); extern void tsdbCloseFile(STsdbFD **ppFD);
struct SSttFWriter { struct SSttFWriter {
STsdb *pTsdb; STsdb *pTsdb;
STsdbFD *pFd;
SSttFile file; SSttFile file;
SBlockData bData; SBlockData bData;
SArray *aSttBlk; SArray *aSttBlk;
STsdbFD *pFd;
SSkmInfo *pSkmTb;
SSkmInfo *pSkmRow;
int32_t maxRow;
}; };
int32_t tsdbSttFWriterOpen(STsdb *pTsdb, SSttFile *pSttFile, SSttFWriter **ppWritter) { static int32_t tsdbSttFWriteTSDataBlock(SSttFWriter *pWriter) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0; // TODO
return code;
}
int32_t tsdbSttFWriterOpen(STsdb *pTsdb, const SSttFile *pSttFile, SSttFWriter **ppWriter) {
int32_t code = 0;
int32_t lino = 0;
int32_t szPage = pTsdb->pVnode->config.tsdbPageSize; int32_t szPage = pTsdb->pVnode->config.tsdbPageSize;
int32_t flag = TD_FILE_READ | TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC; // TODO int32_t flag = TD_FILE_READ | TD_FILE_WRITE;
char fname[TSDB_FILENAME_LEN];
ppWritter[0] = taosMemoryCalloc(1, sizeof(SSttFWriter)); ppWriter[0] = taosMemoryCalloc(1, sizeof(SSttFWriter));
if (ppWritter[0] == NULL) { if (ppWriter[0] == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
ppWritter[0]->pTsdb = pTsdb; ppWriter[0]->pTsdb = pTsdb;
ppWritter[0]->file = pSttFile[0]; ppWriter[0]->file = pSttFile[0];
code = tsdbOpenFile(NULL, szPage, flag, &ppWritter[0]->pFd); tBlockDataCreate(&ppWriter[0]->bData);
ppWriter[0]->aSttBlk = taosArrayInit(64, sizeof(SSttBlk));
if (ppWriter[0]->aSttBlk == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
if (pSttFile->size == 0) flag |= (TD_FILE_CREATE | TD_FILE_TRUNC);
tsdbSttFileName(pTsdb, (SDiskID){0}, 0, &ppWriter[0]->file, fname); // TODO
code = tsdbOpenFile(fname, szPage, flag, &ppWriter[0]->pFd);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
_exit: _exit:
if (code) { if (code) {
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
if (ppWriter[0]) {
tBlockDataDestroy(&ppWriter[0]->bData);
taosArrayDestroy(ppWriter[0]->aSttBlk);
taosMemoryFree(ppWriter[0]);
ppWriter[0] = NULL;
}
} }
return 0; return 0;
} }
int32_t tsdbSttFWriterClose(SSttFWriter *pWritter) { int32_t tsdbSttFWriterClose(SSttFWriter *pWriter) {
// TODO // TODO
return 0; return 0;
} }
int32_t tsdbSttFWriteRow(SSttFWriter *pWritter, int64_t suid, int64_t uid, TSDBROW *pRow) { int32_t tsdbSttFWriteTSData(SSttFWriter *pWriter, TABLEID *tbid, TSDBROW *pRow) {
int32_t code = 0;
int32_t lino = 0;
if (!TABLE_SAME_SCHEMA(pWriter->bData.suid, pWriter->bData.uid, tbid->suid, tbid->uid)) {
if (pWriter->bData.nRow > 0) {
code = tsdbSttFWriteTSDataBlock(pWriter);
TSDB_CHECK_CODE(code, lino, _exit);
}
code = tBlockDataInit(&pWriter->bData, tbid, NULL /* TODO */, NULL, 0);
TSDB_CHECK_CODE(code, lino, _exit);
}
code = tBlockDataAppendRow(&pWriter->bData, pRow, NULL /* TODO */, tbid->uid);
TSDB_CHECK_CODE(code, lino, _exit);
if (pWriter->bData.nRow >= pWriter->maxRow) {
code = tsdbSttFWriteTSDataBlock(pWriter);
TSDB_CHECK_CODE(code, lino, _exit);
}
_exit:
if (code) {
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code));
}
return 0;
}
int32_t tsdbSttFWriteDLData(SSttFWriter *pWriter, int64_t suid, int64_t uid, int64_t version) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0; int32_t lino = 0;
......
...@@ -1161,8 +1161,6 @@ int32_t tsdbBuildDeleteSkyline2(SArray *aDelData, int32_t sidx, int32_t eidx, SA ...@@ -1161,8 +1161,6 @@ int32_t tsdbBuildDeleteSkyline2(SArray *aDelData, int32_t sidx, int32_t eidx, SA
// SBlockData ====================================================== // SBlockData ======================================================
int32_t tBlockDataCreate(SBlockData *pBlockData) { int32_t tBlockDataCreate(SBlockData *pBlockData) {
int32_t code = 0;
pBlockData->suid = 0; pBlockData->suid = 0;
pBlockData->uid = 0; pBlockData->uid = 0;
pBlockData->nRow = 0; pBlockData->nRow = 0;
...@@ -1171,9 +1169,7 @@ int32_t tBlockDataCreate(SBlockData *pBlockData) { ...@@ -1171,9 +1169,7 @@ int32_t tBlockDataCreate(SBlockData *pBlockData) {
pBlockData->aTSKEY = NULL; pBlockData->aTSKEY = NULL;
pBlockData->nColData = 0; pBlockData->nColData = 0;
pBlockData->aColData = NULL; pBlockData->aColData = NULL;
return 0;
_exit:
return code;
} }
void tBlockDataDestroy(SBlockData *pBlockData) { void tBlockDataDestroy(SBlockData *pBlockData) {
......
...@@ -16,6 +16,11 @@ ...@@ -16,6 +16,11 @@
#include "vnd.h" #include "vnd.h"
#include "vnodeInt.h" #include "vnodeInt.h"
#ifdef USE_DEV_CODE
extern int32_t tsdbPreCommit(STsdb *pTsdb);
extern int32_t tsdbCommitBegin(STsdb *pTsdb, SCommitInfo *pInfo);
#endif
#define VND_INFO_FNAME_TMP "vnode_tmp.json" #define VND_INFO_FNAME_TMP "vnode_tmp.json"
static int vnodeEncodeInfo(const SVnodeInfo *pInfo, char **ppData); static int vnodeEncodeInfo(const SVnodeInfo *pInfo, char **ppData);
...@@ -314,7 +319,11 @@ static int32_t vnodePrepareCommit(SVnode *pVnode, SCommitInfo *pInfo) { ...@@ -314,7 +319,11 @@ static int32_t vnodePrepareCommit(SVnode *pVnode, SCommitInfo *pInfo) {
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
#ifdef USE_DEV_CODE
tsdbPreCommit(pVnode->pTsdb);
#else
tsdbPrepareCommit(pVnode->pTsdb); tsdbPrepareCommit(pVnode->pTsdb);
#endif
metaPrepareAsyncCommit(pVnode->pMeta); metaPrepareAsyncCommit(pVnode->pMeta);
...@@ -449,8 +458,12 @@ static int vnodeCommitImpl(SCommitInfo *pInfo) { ...@@ -449,8 +458,12 @@ static int vnodeCommitImpl(SCommitInfo *pInfo) {
syncBeginSnapshot(pVnode->sync, pInfo->info.state.committed); syncBeginSnapshot(pVnode->sync, pInfo->info.state.committed);
// commit each sub-system // commit each sub-system
#ifdef USE_DEV_CODE
code = tsdbCommitBegin(pVnode->pTsdb, pInfo);
#else
code = tsdbCommit(pVnode->pTsdb, pInfo); code = tsdbCommit(pVnode->pTsdb, pInfo);
#endif
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
if (VND_IS_RSMA(pVnode)) { if (VND_IS_RSMA(pVnode)) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册