提交 393ab4de 编写于 作者: H Hongze Cheng

more code

上级 abd7f45f
...@@ -20,6 +20,32 @@ typedef struct { ...@@ -20,6 +20,32 @@ typedef struct {
STSchema *pTSchema; STSchema *pTSchema;
} SSkmInfo; } SSkmInfo;
typedef struct {
int64_t suid;
int64_t uid;
TSDBROW row;
} SRowInfo;
typedef struct {
SRBTreeNode n;
SRowInfo r;
int8_t type;
union {
struct {
SArray *aTbDataP;
int32_t iTbDataP;
STbDataIter iter;
}; // memory data iter
struct {
int32_t iLast;
SArray *aBlockL;
int32_t iBlockL;
SBlockData bData;
int32_t iRow;
}; // last file data iter
};
} SDataIter;
typedef struct { typedef struct {
STsdb *pTsdb; STsdb *pTsdb;
int8_t toMerge; int8_t toMerge;
...@@ -47,6 +73,10 @@ typedef struct { ...@@ -47,6 +73,10 @@ typedef struct {
SMapData mBlock; // SMapData<SBlock> SMapData mBlock; // SMapData<SBlock>
SBlockData bData; SBlockData bData;
} dReader; } dReader;
struct {
SDataIter *pIter;
SRBTree rbt;
};
struct { struct {
SDataFWriter *pWriter; SDataFWriter *pWriter;
SArray *aBlockIdx; // SArray<SBlockIdx> SArray *aBlockIdx; // SArray<SBlockIdx>
...@@ -65,6 +95,9 @@ typedef struct { ...@@ -65,6 +95,9 @@ typedef struct {
SArray *aDelData; // SArray<SDelData> SArray *aDelData; // SArray<SDelData>
} SCommitter; } SCommitter;
extern int32_t tsdbReadLastBlockEx(SDataFReader *pReader, int32_t iLast, SBlockL *pBlockL,
SBlockData *pBlockData); // todo
static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter); static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter);
static int32_t tsdbCommitData(SCommitter *pCommitter); static int32_t tsdbCommitData(SCommitter *pCommitter);
static int32_t tsdbCommitDel(SCommitter *pCommitter); static int32_t tsdbCommitDel(SCommitter *pCommitter);
...@@ -1289,28 +1322,7 @@ _err: ...@@ -1289,28 +1322,7 @@ _err:
return code; return code;
} }
// Merger ===================================================================== // ================================================================================
typedef struct {
int64_t suid;
int64_t uid;
TSDBROW row;
} SRowInfo;
typedef struct {
SRowInfo rowInfo;
SDataFReader *pReader;
int32_t iLast;
SArray *aBlockL; // SArray<SBlockL>
int32_t iBlockL;
SBlockData bData;
int32_t iRow;
} SLDataIter;
typedef struct {
SRBTreeNode *pNode;
SRBTree rbt;
} SDataMerger;
static int32_t tRowInfoCmprFn(const void *p1, const void *p2) { static int32_t tRowInfoCmprFn(const void *p1, const void *p2) {
SRowInfo *pInfo1 = (SRowInfo *)p1; SRowInfo *pInfo1 = (SRowInfo *)p1;
SRowInfo *pInfo2 = (SRowInfo *)p2; SRowInfo *pInfo2 = (SRowInfo *)p2;
...@@ -1330,6 +1342,165 @@ static int32_t tRowInfoCmprFn(const void *p1, const void *p2) { ...@@ -1330,6 +1342,165 @@ static int32_t tRowInfoCmprFn(const void *p1, const void *p2) {
return tsdbRowCmprFn(&pInfo1->row, &pInfo2->row); return tsdbRowCmprFn(&pInfo1->row, &pInfo2->row);
} }
static int32_t tsdbNextCommitRow(SCommitter *pCommitter, SRowInfo **ppInfo) {
int32_t code = 0;
if (pCommitter->pIter) {
SDataIter *pIter = pCommitter->pIter;
if (pCommitter->pIter->type == 0) { // memory
tsdbTbDataIterNext(&pIter->iter);
TSDBROW *pRow = tsdbTbDataIterGet(&pIter->iter);
while (true) {
if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) {
pCommitter->nextKey = TMIN(pCommitter->nextKey, TSDBROW_TS(pRow));
pRow = NULL;
}
if (pRow) {
pIter->r.suid = pIter->iter.pTbData->suid;
pIter->r.uid = pIter->iter.pTbData->uid;
pIter->r.row = *pRow;
break;
}
pIter->iTbDataP++;
if (pIter->iTbDataP < taosArrayGetSize(pIter->aTbDataP)) {
STbData *pTbData = (STbData *)taosArrayGetP(pIter->aTbDataP, pIter->iTbDataP);
TSDBKEY keyFrom = {.ts = pCommitter->minKey, .version = VERSION_MIN};
tsdbTbDataIterOpen(pTbData, &keyFrom, 0, &pIter->iter);
pRow = tsdbTbDataIterGet(&pIter->iter);
continue;
} else {
pCommitter->pIter = NULL;
break;
}
}
} else if (pCommitter->pIter->type == 1) { // last file
pIter->iRow++;
if (pIter->iRow < pIter->bData.nRow) {
pIter->r.uid = pIter->bData.uid ? pIter->bData.uid : pIter->bData.aUid[pIter->iRow];
pIter->r.row = tsdbRowFromBlockData(&pIter->bData, pIter->iRow);
} else {
pIter->iBlockL++;
if (pIter->iBlockL < taosArrayGetSize(pIter->aBlockL)) {
SBlockL *pBlockL = (SBlockL *)taosArrayGet(pIter->aBlockL, pIter->iBlockL);
code = tsdbReadLastBlockEx(pCommitter->dReader.pReader, pIter->iLast, pBlockL, &pIter->bData);
if (code) goto _exit;
pIter->iRow = 0;
pIter->r.suid = pIter->bData.suid;
pIter->r.uid = pIter->bData.uid ? pIter->bData.uid : pIter->bData.aUid[0];
pIter->r.row = tsdbRowFromBlockData(&pIter->bData, 0);
} else {
pCommitter->pIter = NULL;
}
}
} else {
ASSERT(0);
}
// compare with min in RB Tree
pIter = (SDataIter *)tRBTreeMin(&pCommitter->rbt);
if (pCommitter->pIter && pIter) {
int32_t c = tRowInfoCmprFn(&pCommitter->pIter->r, &pIter->r);
if (c > 0) {
tRBTreePut(&pCommitter->rbt, (SRBTreeNode *)pCommitter->pIter);
pCommitter->pIter = NULL;
} else {
ASSERT(c);
}
}
}
if (pCommitter->pIter == NULL) {
pCommitter->pIter = (SDataIter *)tRBTreeMin(&pCommitter->rbt);
if (pCommitter->pIter) {
tRBTreeDrop(&pCommitter->rbt, (SRBTreeNode *)pCommitter->pIter);
}
}
if (pCommitter->pIter) {
*ppInfo = &pCommitter->pIter->r;
} else {
*ppInfo = NULL;
}
_exit:
return code;
}
static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter) {
int32_t code = 0;
SRowInfo *pRowInfo = NULL;
TABLEID id = {0};
while (true) {
code = tsdbNextCommitRow(pCommitter, &pRowInfo);
if (code) goto _err;
if (pRowInfo == NULL) {
// end the commit (todo)
break;
}
if (id.suid != pRowInfo->suid || id.uid != pRowInfo->uid) {
// table changed, end current table commit (todo)
// prepare the new
id.suid = pRowInfo->suid;
id.uid = pRowInfo->uid;
}
SBlockIdx *pBlockIdx = pCommitter->dReader.pBlockIdx;
if (pBlockIdx && pBlockIdx->suid == id.suid && pBlockIdx->uid == id.uid) {
while (true) {
/* code */
}
}
if (pRowInfo->row.type == 0) {
code = tsdbCommitterUpdateRowSchema(pCommitter, pRowInfo->suid, pRowInfo->uid, TSDBROW_SVERSION(&pRowInfo->row));
if (code) goto _err;
}
code = tBlockDataAppendRow(&pCommitter->dWriter.bData, &pRowInfo->row, pCommitter->skmRow.pTSchema, pRowInfo->uid);
if (code) goto _err;
if (pCommitter->dWriter.bData.nRow >= pCommitter->maxRow * 4 / 5) {
if (1 /*toLastOnly*/) {
code = tsdbCommitLastBlock(pCommitter);
if (code) goto _err;
} else {
code = tsdbCommitDataBlock(pCommitter, NULL);
if (code) goto _err;
}
}
}
return code;
_err:
tsdbError("vgId:%d tsdb commit file data impl failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
return code;
}
// ================================================================================
typedef struct {
SRowInfo rowInfo;
SDataFReader *pReader;
int32_t iLast;
SArray *aBlockL; // SArray<SBlockL>
int32_t iBlockL;
SBlockData bData;
int32_t iRow;
} SLDataIter;
typedef struct {
SRBTreeNode *pNode;
SRBTree rbt;
} SDataMerger;
static void tDataMergerInit(SDataMerger *pMerger, SArray *aNodeP) { static void tDataMergerInit(SDataMerger *pMerger, SArray *aNodeP) {
pMerger->pNode = NULL; pMerger->pNode = NULL;
tRBTreeCreate(&pMerger->rbt, tRowInfoCmprFn); tRBTreeCreate(&pMerger->rbt, tRowInfoCmprFn);
...@@ -1341,9 +1512,6 @@ static void tDataMergerInit(SDataMerger *pMerger, SArray *aNodeP) { ...@@ -1341,9 +1512,6 @@ static void tDataMergerInit(SDataMerger *pMerger, SArray *aNodeP) {
} }
} }
extern int32_t tsdbReadLastBlockEx(SDataFReader *pReader, int32_t iLast, SBlockL *pBlockL,
SBlockData *pBlockData); // todo
static int32_t tDataMergeNext(SDataMerger *pMerger, SRowInfo **ppInfo) { static int32_t tDataMergeNext(SDataMerger *pMerger, SRowInfo **ppInfo) {
int32_t code = 0; int32_t code = 0;
...@@ -1401,7 +1569,6 @@ _exit: ...@@ -1401,7 +1569,6 @@ _exit:
return code; return code;
} }
// ================================================================================
typedef struct { typedef struct {
STsdb *pTsdb; STsdb *pTsdb;
int8_t maxLast; int8_t maxLast;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册