提交 f37a072e 编写于 作者: H Hongze Cheng

more work

上级 4f06c4bb
...@@ -53,7 +53,6 @@ typedef struct { ...@@ -53,7 +53,6 @@ typedef struct {
// last // last
SArray *aBlockL; // SArray<SBlockL> SArray *aBlockL; // SArray<SBlockL>
int32_t iBlockL; int32_t iBlockL;
SBlockL *pBlockL;
SBlockData bDatal; SBlockData bDatal;
int32_t iRow; int32_t iRow;
SRowInfo *pRowInfo; SRowInfo *pRowInfo;
...@@ -290,6 +289,54 @@ _err: ...@@ -290,6 +289,54 @@ _err:
return code; return code;
} }
static int32_t tsdbCommitterUpdateTableSchema(SCommitter *pCommitter, int64_t suid, int64_t uid, int32_t sver) {
int32_t code = 0;
if (pCommitter->skmTable.pTSchema) {
if (pCommitter->skmTable.suid == suid) {
if (suid == 0) {
if (pCommitter->skmTable.uid == uid && sver == pCommitter->skmTable.pTSchema->version) goto _exit;
} else {
if (sver == pCommitter->skmTable.pTSchema->version) goto _exit;
}
}
}
pCommitter->skmTable.suid = suid;
pCommitter->skmTable.uid = uid;
tTSchemaDestroy(pCommitter->skmTable.pTSchema);
code = metaGetTbTSchemaEx(pCommitter->pTsdb->pVnode->pMeta, suid, uid, sver, &pCommitter->skmTable.pTSchema);
if (code) goto _exit;
_exit:
return code;
}
static int32_t tsdbCommitterUpdateRowSchema(SCommitter *pCommitter, int64_t suid, int64_t uid, int32_t sver) {
int32_t code = 0;
if (pCommitter->skmRow.pTSchema) {
if (pCommitter->skmRow.suid == suid) {
if (suid == 0) {
if (pCommitter->skmRow.uid == uid && sver == pCommitter->skmRow.pTSchema->version) goto _exit;
} else {
if (sver == pCommitter->skmRow.pTSchema->version) goto _exit;
}
}
}
pCommitter->skmRow.suid = suid;
pCommitter->skmRow.uid = uid;
tTSchemaDestroy(pCommitter->skmRow.pTSchema);
code = metaGetTbTSchemaEx(pCommitter->pTsdb->pVnode->pMeta, suid, uid, sver, &pCommitter->skmRow.pTSchema);
if (code) {
goto _exit;
}
_exit:
return code;
}
static int32_t tsdbCommitterNextLastRow(SCommitter *pCommitter) { static int32_t tsdbCommitterNextLastRow(SCommitter *pCommitter) {
int32_t code = 0; int32_t code = 0;
...@@ -308,8 +355,17 @@ static int32_t tsdbCommitterNextLastRow(SCommitter *pCommitter) { ...@@ -308,8 +355,17 @@ static int32_t tsdbCommitterNextLastRow(SCommitter *pCommitter) {
} else { } else {
pCommitter->dReader.iBlockL++; pCommitter->dReader.iBlockL++;
if (pCommitter->dReader.iBlockL < taosArrayGetSize(pCommitter->dReader.aBlockL)) { if (pCommitter->dReader.iBlockL < taosArrayGetSize(pCommitter->dReader.aBlockL)) {
pCommitter->dReader.pBlockL = (SBlockL *)taosArrayGet(pCommitter->dReader.aBlockL, pCommitter->dReader.iBlockL); SBlockL *pBlockL = (SBlockL *)taosArrayGet(pCommitter->dReader.aBlockL, pCommitter->dReader.iBlockL);
code = tsdbReadLastBlock(pCommitter->dReader.pReader, pCommitter->dReader.pBlockL, pBlockDatal); int64_t suid = pBlockL->suid;
int64_t uid = pBlockL->maxUid;
code = tsdbCommitterUpdateTableSchema(pCommitter, suid, uid, 1 /*TODO*/);
if (code) goto _exit;
code = tBlockDataInit(pBlockDatal, suid, suid ? 0 : uid, pCommitter->skmTable.pTSchema);
if (code) goto _exit;
code = tsdbReadLastBlock(pCommitter->dReader.pReader, pBlockL, pBlockDatal);
if (code) goto _exit; if (code) goto _exit;
pCommitter->dReader.iRow = 0; pCommitter->dReader.iRow = 0;
...@@ -444,54 +500,6 @@ _err: ...@@ -444,54 +500,6 @@ _err:
return code; return code;
} }
static int32_t tsdbCommitterUpdateTableSchema(SCommitter *pCommitter, int64_t suid, int64_t uid, int32_t sver) {
int32_t code = 0;
if (pCommitter->skmTable.pTSchema) {
if (pCommitter->skmTable.suid == suid) {
if (suid == 0) {
if (pCommitter->skmTable.uid == uid && sver == pCommitter->skmTable.pTSchema->version) goto _exit;
} else {
if (sver == pCommitter->skmTable.pTSchema->version) goto _exit;
}
}
}
pCommitter->skmTable.suid = suid;
pCommitter->skmTable.uid = uid;
tTSchemaDestroy(pCommitter->skmTable.pTSchema);
code = metaGetTbTSchemaEx(pCommitter->pTsdb->pVnode->pMeta, suid, uid, sver, &pCommitter->skmTable.pTSchema);
if (code) goto _exit;
_exit:
return code;
}
static int32_t tsdbCommitterUpdateRowSchema(SCommitter *pCommitter, int64_t suid, int64_t uid, int32_t sver) {
int32_t code = 0;
if (pCommitter->skmRow.pTSchema) {
if (pCommitter->skmRow.suid == suid) {
if (suid == 0) {
if (pCommitter->skmRow.uid == uid && sver == pCommitter->skmRow.pTSchema->version) goto _exit;
} else {
if (sver == pCommitter->skmRow.pTSchema->version) goto _exit;
}
}
}
pCommitter->skmRow.suid = suid;
pCommitter->skmRow.uid = uid;
tTSchemaDestroy(pCommitter->skmRow.pTSchema);
code = metaGetTbTSchemaEx(pCommitter->pTsdb->pVnode->pMeta, suid, uid, sver, &pCommitter->skmRow.pTSchema);
if (code) {
goto _exit;
}
_exit:
return code;
}
static int32_t tsdbCommitDataBlock(SCommitter *pCommitter, SBlock *pBlock) { static int32_t tsdbCommitDataBlock(SCommitter *pCommitter, SBlock *pBlock) {
int32_t code = 0; int32_t code = 0;
SBlockData *pBlockData = &pCommitter->dWriter.bData; SBlockData *pBlockData = &pCommitter->dWriter.bData;
...@@ -779,13 +787,13 @@ static int32_t tsdbMergeCommitLast(SCommitter *pCommitter, STbDataIter *pIter) { ...@@ -779,13 +787,13 @@ static int32_t tsdbMergeCommitLast(SCommitter *pCommitter, STbDataIter *pIter) {
STbData *pTbData = pIter->pTbData; STbData *pTbData = pIter->pTbData;
int32_t nRow = tsdbGetNumOfRowsLessThan(pIter, (TSDBKEY){.ts = pCommitter->maxKey + 1, .version = VERSION_MIN}); int32_t nRow = tsdbGetNumOfRowsLessThan(pIter, (TSDBKEY){.ts = pCommitter->maxKey + 1, .version = VERSION_MIN});
if (pCommitter->dReader.pRowInfo) { if (pCommitter->dReader.pRowInfo && tTABLEIDCmprFn(pTbData, pCommitter->dReader.pRowInfo) == 0) {
if (pCommitter->dReader.pRowInfo->suid) { if (pCommitter->dReader.pRowInfo->suid) { // super table
for (int32_t iRow = pCommitter->dReader.iRow; iRow < pCommitter->dReader.bDatal.nRow; iRow++) { for (int32_t iRow = pCommitter->dReader.iRow; iRow < pCommitter->dReader.bDatal.nRow; iRow++) {
if (pTbData->uid != pCommitter->dReader.bDatal.aUid[iRow]) break; if (pTbData->uid != pCommitter->dReader.bDatal.aUid[iRow]) break;
nRow++; nRow++;
} }
} else { } else { // normal table
ASSERT(pCommitter->dReader.iRow == 0); ASSERT(pCommitter->dReader.iRow == 0);
nRow += pCommitter->dReader.bDatal.nRow; nRow += pCommitter->dReader.bDatal.nRow;
} }
...@@ -812,10 +820,13 @@ static int32_t tsdbMergeCommitLast(SCommitter *pCommitter, STbDataIter *pIter) { ...@@ -812,10 +820,13 @@ static int32_t tsdbMergeCommitLast(SCommitter *pCommitter, STbDataIter *pIter) {
pBlockData = &pCommitter->dWriter.bDatal; pBlockData = &pCommitter->dWriter.bDatal;
// commit and reset block data schema if need // commit and reset block data schema if need
if (pBlockData->nRow > 0) { // QUESTION: Is there a case that pBlockData->nRow == 0 but need to change schema ?
if (pBlockData->suid != pTbData->suid || pBlockData->suid == 0) { if (pBlockData->suid || pBlockData->uid) {
code = tsdbCommitLastBlock(pCommitter); if (pBlockData->suid != pTbData->uid || pBlockData->suid == 0) {
if (code) goto _err; if (pBlockData->nRow > 0) {
code = tsdbCommitLastBlock(pCommitter);
if (code) goto _err;
}
tBlockDataReset(pBlockData); tBlockDataReset(pBlockData);
} }
...@@ -962,10 +973,12 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData) { ...@@ -962,10 +973,12 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData) {
pBlock = NULL; pBlock = NULL;
} }
code = tsdbCommitterUpdateTableSchema(pCommitter, pTbData->suid, pTbData->uid, pTbData->maxSkmVer); code = tsdbCommitterUpdateTableSchema(pCommitter, pTbData->suid, pTbData->uid, pTbData->maxSkmVer /*TODO*/);
if (code) goto _err; if (code) goto _err;
tMapDataReset(&pCommitter->dWriter.mBlock); tMapDataReset(&pCommitter->dWriter.mBlock);
code = tBlockDataInit(&pCommitter->dReader.bData, pTbData->suid, pTbData->uid, pCommitter->skmTable.pTSchema);
if (code) goto _err;
code = tBlockDataInit(&pCommitter->dWriter.bData, pTbData->suid, pTbData->uid, pCommitter->skmTable.pTSchema); code = tBlockDataInit(&pCommitter->dWriter.bData, pTbData->suid, pTbData->uid, pCommitter->skmTable.pTSchema);
if (code) goto _err; if (code) goto _err;
...@@ -1130,11 +1143,12 @@ static int32_t tsdbMoveCommitData(SCommitter *pCommitter, TABLEID toTable) { ...@@ -1130,11 +1143,12 @@ static int32_t tsdbMoveCommitData(SCommitter *pCommitter, TABLEID toTable) {
ASSERT(pBlockDataR->nRow > 0); ASSERT(pBlockDataR->nRow > 0);
// commit and reset block data schema if need // commit and reset block data schema if need
if (pBlockDataW->nRow > 0) { if (pBlockDataW->suid || pBlockDataW->uid) {
if (pBlockDataW->suid != pCommitter->dReader.pRowInfo->suid || pBlockDataW->suid == 0) { if (pBlockDataW->suid != suid || pBlockDataW->suid == 0) {
code = tsdbCommitLastBlock(pCommitter); if (pBlockDataW->nRow > 0) {
if (code) goto _err; code = tsdbCommitLastBlock(pCommitter);
if (code) goto _err;
}
tBlockDataReset(pBlockDataW); tBlockDataReset(pBlockDataW);
} }
} }
...@@ -1151,9 +1165,10 @@ static int32_t tsdbMoveCommitData(SCommitter *pCommitter, TABLEID toTable) { ...@@ -1151,9 +1165,10 @@ static int32_t tsdbMoveCommitData(SCommitter *pCommitter, TABLEID toTable) {
// check if it can make sure that one table data in one block // check if it can make sure that one table data in one block
int32_t nRow = 0; int32_t nRow = 0;
if (pBlockDataR->suid) { if (pBlockDataR->suid) {
for (int32_t iRow = pCommitter->dReader.iRow; (iRow < pBlockDataR->nRow) && (pBlockDataR->aUid[iRow] == uid); int32_t iRow = pCommitter->dReader.iRow;
iRow++) { while ((iRow < pBlockDataR->nRow) && (pBlockDataR->aUid[iRow] == uid)) {
nRow++; nRow++;
iRow++;
} }
} else { } else {
ASSERT(pCommitter->dReader.iRow == 0); ASSERT(pCommitter->dReader.iRow == 0);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册