提交 ac625921 编写于 作者: H Hongze Cheng

more code

上级 efff4e77
...@@ -63,7 +63,7 @@ int32_t tsdbSttFileWriterOpen(const SSttFileWriterConfig *config, SSttFileWriter ...@@ -63,7 +63,7 @@ int32_t tsdbSttFileWriterOpen(const SSttFileWriterConfig *config, SSttFileWriter
int32_t tsdbSttFileWriterClose(SSttFileWriter **writer, int8_t abort, TFileOpArray *opArray); int32_t tsdbSttFileWriterClose(SSttFileWriter **writer, int8_t abort, TFileOpArray *opArray);
int32_t tsdbSttFileWriteTSData(SSttFileWriter *writer, SRowInfo *row); int32_t tsdbSttFileWriteTSData(SSttFileWriter *writer, SRowInfo *row);
int32_t tsdbSttFileWriteTSDataBlock(SSttFileWriter *writer, SBlockData *pBlockData); int32_t tsdbSttFileWriteTSDataBlock(SSttFileWriter *writer, SBlockData *pBlockData);
int32_t tsdbSttFileWriteDLData(SSttFileWriter *writer, TABLEID *tbid, SDelData *pDelData); int32_t tsdbSttFileWriteDelRecord(SSttFileWriter *writer, const SDelRecord *record);
struct SSttFileWriterConfig { struct SSttFileWriterConfig {
STsdb *tsdb; STsdb *tsdb;
......
...@@ -63,8 +63,6 @@ int32_t tDelBlockInit(SDelBlock *delBlock); ...@@ -63,8 +63,6 @@ int32_t tDelBlockInit(SDelBlock *delBlock);
int32_t tDelBlockFree(SDelBlock *delBlock); int32_t tDelBlockFree(SDelBlock *delBlock);
int32_t tDelBlockClear(SDelBlock *delBlock); int32_t tDelBlockClear(SDelBlock *delBlock);
int32_t tDelBlockPut(SDelBlock *delBlock, const SDelRecord *delRecord); int32_t tDelBlockPut(SDelBlock *delBlock, const SDelRecord *delRecord);
int32_t tDelBlockEncode(SDelBlock *delBlock, void *buf, int32_t size);
int32_t tDelBlockDecode(const void *buf, SDelBlock *delBlock);
// STbStatisBlock ---------- // STbStatisBlock ----------
#define STATIS_RECORD_NUM_ELEM 9 #define STATIS_RECORD_NUM_ELEM 9
......
...@@ -217,45 +217,47 @@ _exit: ...@@ -217,45 +217,47 @@ _exit:
static int32_t tsdbCommitDelData(SCommitter2 *committer) { static int32_t tsdbCommitDelData(SCommitter2 *committer) {
int32_t code = 0; int32_t code = 0;
int32_t lino; int32_t lino = 0;
return 0; SMemTable *mem = committer->tsdb->imem;
#if 0 if (mem->nDel == 0) goto _exit;
ASSERTS(0, "TODO: Not implemented yet");
int64_t nDel = 0; SRBTreeIter iter[1] = {tRBTreeIterCreate(committer->tsdb->imem->tbDataTree, 1)};
SMemTable *pMem = committer->tsdb->imem;
if (pMem->nDel == 0) { // no del data for (SRBTreeNode *node = tRBTreeIterNext(iter); node; node = tRBTreeIterNext(iter)) {
goto _exit; STbData *tbData = TCONTAINER_OF(node, STbData, rbtn);
} SDelRecord record[1] = {{
.suid = tbData->suid,
for (int32_t iTbData = 0; iTbData < taosArrayGetSize(committer->aTbDataP); iTbData++) { .uid = tbData->uid,
STbData *pTbData = (STbData *)taosArrayGetP(committer->aTbDataP, iTbData); }};
for (SDelData *pDelData = pTbData->pHead; pDelData; pDelData = pDelData->pNext) { for (SDelData *delData = tbData->pHead; delData; delData = delData->pNext) {
if (pDelData->eKey < committer->ctx->minKey) continue; if (delData->eKey < committer->ctx->minKey) continue;
if (pDelData->sKey > committer->ctx->maxKey) { if (delData->sKey > committer->ctx->maxKey) {
committer->ctx->nextKey = TMIN(committer->ctx->nextKey, pDelData->sKey); committer->ctx->nextKey = TMIN(committer->ctx->nextKey, delData->sKey);
continue; continue;
} }
code = tsdbCommitWriteDelData(committer, pTbData->suid, pTbData->uid, pDelData->version, record->version = delData->version;
pDelData->sKey /* TODO */, pDelData->eKey /* TODO */); record->skey = TMAX(delData->sKey, committer->ctx->minKey);
if (delData->eKey > committer->ctx->maxKey) {
committer->ctx->nextKey = TMIN(committer->ctx->nextKey, committer->ctx->maxKey + 1);
record->ekey = committer->ctx->maxKey;
} else {
record->ekey = delData->eKey;
}
code = tsdbSttFileWriteDelRecord(committer->sttWriter, record); // TODO
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
} }
_exit: _exit:
if (code) { if (code) {
tsdbError("vgId:%d failed at line %d since %s", TD_VID(committer->tsdb->pVnode), lino, tstrerror(code)); TSDB_ERROR_LOG(TD_VID(committer->tsdb->pVnode), lino, code);
} else {
tsdbDebug("vgId:%d %s done, fid:%d nDel:%" PRId64, TD_VID(committer->tsdb->pVnode), __func__, committer->ctx->fid,
pMem->nDel);
} }
return code; return code;
#endif
} }
static int32_t tsdbCommitFileSetBegin(SCommitter2 *committer) { static int32_t tsdbCommitFileSetBegin(SCommitter2 *committer) {
......
...@@ -440,51 +440,62 @@ _exit: ...@@ -440,51 +440,62 @@ _exit:
} }
static int32_t tsdbSttFileDoWriteDelBlock(SSttFileWriter *writer) { static int32_t tsdbSttFileDoWriteDelBlock(SSttFileWriter *writer) {
return 0; if (DEL_BLOCK_SIZE(writer->dData) == 0) return 0;
#if 0
if (writer->dData->nRow == 0) return 0;
int32_t code = 0; int32_t code = 0;
int32_t lino; int32_t lino = 0;
SDelBlk delBlk[1]; SDelBlk delBlk[1] = {{
.numRec = DEL_BLOCK_SIZE(writer->dData),
.minTid =
{
.suid = TARRAY2_FIRST(writer->dData->suid),
.uid = TARRAY2_FIRST(writer->dData->uid),
},
.maxTid =
{
.suid = TARRAY2_LAST(writer->dData->suid),
.uid = TARRAY2_LAST(writer->dData->uid),
},
.minVer = TARRAY2_FIRST(writer->dData->version),
.maxVer = TARRAY2_FIRST(writer->dData->version),
.dp[0] =
{
.offset = writer->file->size,
.size = 0,
},
}};
delBlk->nRow = writer->sData->nRow; for (int32_t i = 1; i < DEL_BLOCK_SIZE(writer->dData); i++) {
delBlk->minTid.suid = writer->sData->aData[0][0]; delBlk->minVer = TMIN(delBlk->minVer, TARRAY2_GET(writer->dData->version, i));
delBlk->minTid.uid = writer->sData->aData[1][0]; delBlk->maxVer = TMAX(delBlk->maxVer, TARRAY2_GET(writer->dData->version, i));
delBlk->maxTid.suid = writer->sData->aData[0][writer->sData->nRow - 1];
delBlk->maxTid.uid = writer->sData->aData[1][writer->sData->nRow - 1];
delBlk->minVer = delBlk->maxVer = delBlk->maxVer = writer->sData->aData[2][0];
for (int32_t iRow = 1; iRow < writer->sData->nRow; iRow++) {
if (delBlk->minVer > writer->sData->aData[2][iRow]) delBlk->minVer = writer->sData->aData[2][iRow];
if (delBlk->maxVer < writer->sData->aData[2][iRow]) delBlk->maxVer = writer->sData->aData[2][iRow];
} }
delBlk->dp.offset = writer->file->size; for (int32_t i = 0; i < ARRAY_SIZE(writer->dData->dataArr); i++) {
delBlk->dp.size = 0; // TODO int32_t size;
code = tsdbCmprData((uint8_t *)TARRAY2_DATA(&writer->dData->dataArr[i]),
TARRAY2_DATA_LEN(&writer->dData->dataArr[i]), TSDB_DATA_TYPE_BIGINT, writer->config->cmprAlg,
&writer->config->aBuf[0], 0, &size, &writer->config->aBuf[1]);
TSDB_CHECK_CODE(code, lino, _exit);
int64_t tsize = sizeof(int64_t) * writer->dData->nRow; code = tsdbWriteFile(writer->fd, writer->file->size, writer->config->aBuf[0], size);
for (int32_t i = 0; i < ARRAY_SIZE(writer->dData->aData); i++) {
code = tsdbWriteFile(writer->fd, writer->file->size, (const uint8_t *)writer->dData->aData[i], tsize);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
delBlk->dp.size += tsize; delBlk->size[i] = size;
writer->file->size += tsize; delBlk->dp[0].size += size;
writer->file->size += size;
} }
tDelBlockDestroy(writer->dData);
code = TARRAY2_APPEND_PTR(writer->delBlkArray, delBlk); code = TARRAY2_APPEND_PTR(writer->delBlkArray, delBlk);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
tDelBlockClear(writer->dData);
_exit: _exit:
if (code) { if (code) {
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, lino, TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code);
tstrerror(code));
} else {
// tsdbTrace();
} }
return code; return code;
#endif
} }
static int32_t tsdbSttFileDoWriteSttBlk(SSttFileWriter *writer) { static int32_t tsdbSttFileDoWriteSttBlk(SSttFileWriter *writer) {
...@@ -531,7 +542,7 @@ _exit: ...@@ -531,7 +542,7 @@ _exit:
static int32_t tsdbSttFileDoWriteDelBlk(SSttFileWriter *writer) { static int32_t tsdbSttFileDoWriteDelBlk(SSttFileWriter *writer) {
int32_t code = 0; int32_t code = 0;
int32_t lino; int32_t lino = 0;
writer->footer->delBlkPtr->offset = writer->file->size; writer->footer->delBlkPtr->offset = writer->file->size;
writer->footer->delBlkPtr->size = TARRAY2_DATA_LEN(writer->delBlkArray); writer->footer->delBlkPtr->size = TARRAY2_DATA_LEN(writer->delBlkArray);
...@@ -545,8 +556,7 @@ static int32_t tsdbSttFileDoWriteDelBlk(SSttFileWriter *writer) { ...@@ -545,8 +556,7 @@ static int32_t tsdbSttFileDoWriteDelBlk(SSttFileWriter *writer) {
_exit: _exit:
if (code) { if (code) {
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, lino, TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code);
tstrerror(code));
} }
return code; return code;
} }
...@@ -624,7 +634,6 @@ static int32_t tsdbSttFileDoUpdateHeader(SSttFileWriter *writer) { ...@@ -624,7 +634,6 @@ static int32_t tsdbSttFileDoUpdateHeader(SSttFileWriter *writer) {
static int32_t tsdbSttFWriterCloseCommit(SSttFileWriter *writer, TFileOpArray *opArray) { static int32_t tsdbSttFWriterCloseCommit(SSttFileWriter *writer, TFileOpArray *opArray) {
int32_t lino; int32_t lino;
int32_t code; int32_t code;
int32_t vid = TD_VID(writer->config->tsdb->pVnode);
code = tsdbSttFileDoWriteTSDataBlock(writer); code = tsdbSttFileDoWriteTSDataBlock(writer);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
...@@ -668,7 +677,7 @@ static int32_t tsdbSttFWriterCloseCommit(SSttFileWriter *writer, TFileOpArray *o ...@@ -668,7 +677,7 @@ static int32_t tsdbSttFWriterCloseCommit(SSttFileWriter *writer, TFileOpArray *o
_exit: _exit:
if (code) { if (code) {
TSDB_ERROR_LOG(vid, lino, code); TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code);
} }
return code; return code;
} }
...@@ -837,42 +846,39 @@ _exit: ...@@ -837,42 +846,39 @@ _exit:
return code; return code;
} }
int32_t tsdbSttFileWriteDLData(SSttFileWriter *writer, TABLEID *tbid, SDelData *pDelData) { int32_t tsdbSttFileWriteDelRecord(SSttFileWriter *writer, const SDelRecord *record) {
ASSERTS(0, "TODO: Not implemented yet");
int32_t code; int32_t code;
int32_t lino; int32_t lino;
int32_t vid = TD_VID(writer->config->tsdb->pVnode);
if (!writer->ctx->opened) { if (!writer->ctx->opened) {
code = tsdbSttFWriterDoOpen(writer); code = tsdbSttFWriterDoOpen(writer);
return code; return code;
} }
// end time-series data write
if (writer->bData->nRow > 0) {
code = tsdbSttFileDoWriteTSDataBlock(writer); code = tsdbSttFileDoWriteTSDataBlock(writer);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
}
if (STATIS_BLOCK_SIZE(writer->sData) > 0) {
code = tsdbSttFileDoWriteStatisBlock(writer); code = tsdbSttFileDoWriteStatisBlock(writer);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
}
#if 0 // write SDelRecord
writer->dData[0].aData[0][writer->dData[0].nRow] = tbid->suid; // suid code = tDelBlockPut(writer->dData, record);
writer->dData[0].aData[1][writer->dData[0].nRow] = tbid->uid; // uid TSDB_CHECK_CODE(code, lino, _exit);
writer->dData[0].aData[2][writer->dData[0].nRow] = pDelData->version; // version
writer->dData[0].aData[3][writer->dData[0].nRow] = pDelData->sKey; // skey
writer->dData[0].aData[4][writer->dData[0].nRow] = pDelData->eKey; // ekey
writer->dData[0].nRow++;
if (writer->dData[0].nRow >= writer->config->maxRow) { // write SDelBlock if need
return tsdbSttFileDoWriteDelBlock(writer); if (DEL_BLOCK_SIZE(writer->dData) >= writer->config->maxRow) {
} else { code = tsdbSttFileDoWriteDelBlock(writer);
return 0; TSDB_CHECK_CODE(code, lino, _exit);
} }
#endif
_exit: _exit:
if (code) { if (code) {
TSDB_ERROR_LOG(vid, lino, code); TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code);
} }
return code; return code;
} }
\ No newline at end of file
...@@ -45,16 +45,6 @@ int32_t tDelBlockPut(SDelBlock *delBlock, const SDelRecord *delRecord) { ...@@ -45,16 +45,6 @@ int32_t tDelBlockPut(SDelBlock *delBlock, const SDelRecord *delRecord) {
return 0; return 0;
} }
int32_t tDelBlockEncode(SDelBlock *delBlock, void *buf, int32_t size) {
// TODO
return 0;
}
int32_t tDelBlockDecode(const void *buf, SDelBlock *delBlock) {
// TODO
return 0;
}
// STbStatisBlock ---------- // STbStatisBlock ----------
int32_t tStatisBlockInit(STbStatisBlock *statisBlock) { int32_t tStatisBlockInit(STbStatisBlock *statisBlock) {
for (int32_t i = 0; i < ARRAY_SIZE(statisBlock->dataArr); ++i) { for (int32_t i = 0; i < ARRAY_SIZE(statisBlock->dataArr); ++i) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册