提交 44721767 编写于 作者: H Hongze Cheng

more refact

上级 dc2d5541
...@@ -604,182 +604,90 @@ static int tsdbCommitToTable(SCommitH *pCommith, int tid) { ...@@ -604,182 +604,90 @@ static int tsdbCommitToTable(SCommitH *pCommith, int tid) {
SCommitIter *pIter = pCommith->iters + tid; SCommitIter *pIter = pCommith->iters + tid;
TSKEY nextKey = tsdbNextIterKey(pIter->pIter); TSKEY nextKey = tsdbNextIterKey(pIter->pIter);
tsdbResetCommitTable(pCommith);
TSDB_RLOCK_TABLE(pIter->pTable); TSDB_RLOCK_TABLE(pIter->pTable);
// Set commit table // Set commit table
tsdbResetCommitTable(pCommith);
if (tsdbSetCommitTable(pCommith, pIter->pTable) < 0) { if (tsdbSetCommitTable(pCommith, pIter->pTable) < 0) {
TSDB_RUNLOCK_TABLE(pIter->pTable); TSDB_RUNLOCK_TABLE(pIter->pTable);
return -1; return -1;
} }
// No disk data and no memory data, just return
if (pCommith->readh.pBlkIdx == NULL && (nextKey == TSDB_DATA_TIMESTAMP_NULL || nextKey > pCommith->maxKey)) { if (pCommith->readh.pBlkIdx == NULL && (nextKey == TSDB_DATA_TIMESTAMP_NULL || nextKey > pCommith->maxKey)) {
// No disk data and no memory data
TSDB_RUNLOCK_TABLE(pIter->pTable); TSDB_RUNLOCK_TABLE(pIter->pTable);
return 0; return 0;
} else {
// Must has disk data, maybe has memory data
int nBlocks;
int bidx = 0;
SBlock *pBlock;
if (pCommith->readh.pBlkIdx) {
if (tsdbLoadBlockInfo(&(pCommith->readh), NULL) < 0) {
TSDB_RUNLOCK_TABLE(pIter->pTable);
return -1;
}
nBlocks = pCommith->readh.pBlkIdx->numOfBlocks;
} else {
nBlocks = 0;
}
if (bidx < nBlocks) {
pBlock = pCommith->readh.pBlkInfo->blocks + bidx;
} else {
pBlock = NULL;
}
while (true) {
if (pBlock == NULL && (nextKey == TSDB_DATA_TIMESTAMP_NULL || nextKey > pCommith->maxKey)) break;
if ((nextKey == TSDB_DATA_TIMESTAMP_NULL || nextKey > pCommith->maxKey) ||
(pBlock && (!pBlock->last) && tsdbComparKeyBlock((void *)(&nextKey), pBlock) > 0)) {
if (tsdbMoveBlock(pCommith, bidx) < 0) {
TSDB_RUNLOCK_TABLE(pIter->pTable);
return -1;
}
bidx++;
if (bidx < nBlocks) {
pBlock = pCommith->readh.pBlkInfo->blocks + bidx;
} else {
pBlock = NULL;
}
} else if (pBlock && (pBlock->last || tsdbComparKeyBlock((void *)(&nextKey), pBlock) == 0)) {
// merge pBlock data and memory data
if (tsdbMergeMemData(pCommith, pIter, bidx) < 0) {
TSDB_RUNLOCK_TABLE(pIter->pTable);
return -1;
}
bidx++;
if (bidx < nBlocks) {
pBlock = pCommith->readh.pBlkInfo->blocks + bidx;
} else {
pBlock = NULL;
}
nextKey = tsdbNextIterKey(pIter->pIter);
} else {
// Only commit memory data
if (pBlock == NULL) {
if (tsdbCommitMemData(pCommith, pIter, pCommith->maxKey, false) < 0) {
TSDB_RUNLOCK_TABLE(pIter->pTable);
return -1;
}
} else {
if (tsdbCommitMemData(pCommith, pIter, pBlock->keyFirst - 1, true) < 0) {
TSDB_RUNLOCK_TABLE(pIter->pTable);
return -1;
}
}
nextKey = tsdbNextIterKey(pIter->pIter);
}
}
} }
#if 0 // Must has disk data or has memory data
if (!pCommith->isRFileSet) { int nBlocks;
if (pIter->pIter == NULL) { int bidx = 0;
// No memory data SBlock *pBlock;
TSDB_RUNLOCK_TABLE(pIter->pTable);
return 0;
} else {
// TODO: think about no data committed at all
if (tsdbCommitMemData(pCommith, pIter, pCommith->maxKey, true) < 0) {
TSDB_RUNLOCK_TABLE(pIter->pTable);
return -1;
}
if (pCommith->readh.pBlkIdx) {
if (tsdbLoadBlockInfo(&(pCommith->readh), NULL) < 0) {
TSDB_RUNLOCK_TABLE(pIter->pTable); TSDB_RUNLOCK_TABLE(pIter->pTable);
if (tsdbWriteBlockInfo(pCommith) < 0) { return -1;
return -1;
}
return 0;
} }
}
// No memory data and no disk data, just return nBlocks = pCommith->readh.pBlkIdx->numOfBlocks;
if (pIter->pIter == NULL && pCommith->readh.pBlkIdx == NULL) { } else {
TSDB_RUNLOCK_TABLE(pIter->pTable); nBlocks = 0;
return 0;
}
if (tsdbLoadBlockInfo(&(pCommith->readh), NULL) < 0) {
TSDB_RUNLOCK_TABLE(pIter->pTable);
return -1;
} }
// Process merge commit if (bidx < nBlocks) {
int nBlocks = (pCommith->readh.pBlkIdx == NULL) ? 0 : pCommith->readh.pBlkIdx->numOfBlocks; pBlock = pCommith->readh.pBlkInfo->blocks + bidx;
TSKEY nextKey = tsdbNextIterKey(pIter->pIter);
int cidx = 0;
void * ptr = NULL;
SBlock *pBlock;
if (cidx < nBlocks) {
pBlock = pCommith->readh.pBlkInfo->blocks + cidx;
} else { } else {
pBlock = NULL; pBlock = NULL;
} }
while (true) { while (true) {
if ((nextKey == TSDB_DATA_TIMESTAMP_NULL || nextKey > pCommith->maxKey) && (pBlock == NULL)) break; if (pBlock == NULL && (nextKey == TSDB_DATA_TIMESTAMP_NULL || nextKey > pCommith->maxKey)) break;
if ((nextKey == TSDB_DATA_TIMESTAMP_NULL || nextKey > pCommith->maxKey) || if ((nextKey == TSDB_DATA_TIMESTAMP_NULL || nextKey > pCommith->maxKey) ||
(pBlock && (!pBlock->last) && tsdbComparKeyBlock((void *)(&nextKey), pBlock) > 0)) { (pBlock && (!pBlock->last) && tsdbComparKeyBlock((void *)(&nextKey), pBlock) > 0)) {
if (tsdbMoveBlock(pCommith, cidx) < 0) { if (tsdbMoveBlock(pCommith, bidx) < 0) {
TSDB_RUNLOCK_TABLE(pIter->pTable); TSDB_RUNLOCK_TABLE(pIter->pTable);
return -1; return -1;
} }
cidx++; bidx++;
if (cidx < nBlocks) { if (bidx < nBlocks) {
pBlock = pCommith->readh.pBlkInfo->blocks + cidx; pBlock = pCommith->readh.pBlkInfo->blocks + bidx;
} else { } else {
pBlock = NULL; pBlock = NULL;
} }
} else if ((cidx < nBlocks) && (pBlock->last || tsdbComparKeyBlock((void *)(&nextKey), pBlock) == 0)) { } else if (pBlock && (pBlock->last || tsdbComparKeyBlock((void *)(&nextKey), pBlock) == 0)) {
if (tsdbMergeMemData(pCommith, pIter, cidx) < 0) { // merge pBlock data and memory data
if (tsdbMergeMemData(pCommith, pIter, bidx) < 0) {
TSDB_RUNLOCK_TABLE(pIter->pTable); TSDB_RUNLOCK_TABLE(pIter->pTable);
return -1; return -1;
} }
cidx++; bidx++;
if (cidx < nBlocks) { if (bidx < nBlocks) {
pBlock = pCommith->readh.pBlkInfo->blocks + cidx; pBlock = pCommith->readh.pBlkInfo->blocks + bidx;
} else { } else {
pBlock = NULL; pBlock = NULL;
} }
nextKey = tsdbNextIterKey(pIter->pIter); nextKey = tsdbNextIterKey(pIter->pIter);
} else { } else {
// Only commit memory data
if (pBlock == NULL) { if (pBlock == NULL) {
if (tsdbCommitMemData(pCommith, pIter, pCommith->maxKey, false) < 0) { if (tsdbCommitMemData(pCommith, pIter, pCommith->maxKey, false) < 0) {
TSDB_RUNLOCK_TABLE(pIter->pTable); TSDB_RUNLOCK_TABLE(pIter->pTable);
return -1; return -1;
} }
nextKey = tsdbNextIterKey(pIter->pIter);
} else { } else {
if (tsdbCommitMemData(pCommith, pIter, pBlock->keyFirst - 1, true) < 0) { if (tsdbCommitMemData(pCommith, pIter, pBlock->keyFirst - 1, true) < 0) {
TSDB_RUNLOCK_TABLE(pIter->pTable); TSDB_RUNLOCK_TABLE(pIter->pTable);
return -1; return -1;
} }
nextKey = tsdbNextIterKey(pIter->pIter);
} }
nextKey = tsdbNextIterKey(pIter->pIter);
} }
} }
#endif
TSDB_RUNLOCK_TABLE(pIter->pTable); TSDB_RUNLOCK_TABLE(pIter->pTable);
...@@ -961,8 +869,7 @@ static int tsdbWriteBlock(SCommitH *pCommith, SDFile *pDFile, SDataCols *pDataCo ...@@ -961,8 +869,7 @@ static int tsdbWriteBlock(SCommitH *pCommith, SDFile *pDFile, SDataCols *pDataCo
} }
static int tsdbWriteBlockInfo(SCommitH *pCommih) { static int tsdbWriteBlockInfo(SCommitH *pCommih) {
SDFile *pHeadf = TSDB_COMMIT_HEAD_FILE(pCommih); SDFile * pHeadf = TSDB_COMMIT_HEAD_FILE(pCommih);
SBlockIdx blkIdx; SBlockIdx blkIdx;
STable * pTable = TSDB_COMMIT_TABLE(pCommih); STable * pTable = TSDB_COMMIT_TABLE(pCommih);
SBlock * pBlock; SBlock * pBlock;
...@@ -974,9 +881,13 @@ static int tsdbWriteBlockInfo(SCommitH *pCommih) { ...@@ -974,9 +881,13 @@ static int tsdbWriteBlockInfo(SCommitH *pCommih) {
nSupBlocks = taosArrayGetSize(pCommih->aSupBlk); nSupBlocks = taosArrayGetSize(pCommih->aSupBlk);
nSubBlocks = taosArrayGetSize(pCommih->aSubBlk); nSubBlocks = taosArrayGetSize(pCommih->aSubBlk);
tlen = sizeof(SBlockInfo) + sizeof(SBlock) * (nSupBlocks + nSubBlocks) + sizeof(TSCKSUM);
ASSERT(nSupBlocks > 0); if (nSupBlocks <= 0) {
// No data (data all deleted)
return 0;
}
tlen = sizeof(SBlockInfo) + sizeof(SBlock) * (nSupBlocks + nSubBlocks) + sizeof(TSCKSUM);
// Write SBlockInfo part // Write SBlockInfo part
if (tsdbMakeRoom((void **)(&(TSDB_COMMIT_BUF(pCommih))), tlen) < 0) return -1; if (tsdbMakeRoom((void **)(&(TSDB_COMMIT_BUF(pCommih))), tlen) < 0) return -1;
...@@ -1001,16 +912,7 @@ static int tsdbWriteBlockInfo(SCommitH *pCommih) { ...@@ -1001,16 +912,7 @@ static int tsdbWriteBlockInfo(SCommitH *pCommih) {
taosCalcChecksumAppend(0, (uint8_t *)pBlkInfo, tlen); taosCalcChecksumAppend(0, (uint8_t *)pBlkInfo, tlen);
offset = tsdbSeekDFile(pHeadf, 0, SEEK_END); if (tsdbAppendDFile(pHeadf, TSDB_COMMIT_BUF(pCommih), tlen, &offset) < 0) {
if (offset < 0) {
tsdbError("vgId:%d failed to write block info part to file %s while seek since %s", TSDB_COMMIT_REPO_ID(pCommih),
TSDB_FILE_FULL_NAME(pHeadf), tstrerror(terrno));
return -1;
}
if (tsdbWriteDFile(pHeadf, TSDB_COMMIT_BUF(pCommih), tlen) < tlen) {
tsdbError("vgId:%d failed to write block info part to file %s since %s", TSDB_COMMIT_REPO_ID(pCommih),
TSDB_FILE_FULL_NAME(pHeadf), tstrerror(terrno));
return -1; return -1;
} }
...@@ -1044,7 +946,10 @@ static int tsdbWriteBlockIdx(SCommitH *pCommih) { ...@@ -1044,7 +946,10 @@ static int tsdbWriteBlockIdx(SCommitH *pCommih) {
int tlen = 0, size; int tlen = 0, size;
int64_t offset; int64_t offset;
ASSERT(nidx > 0); if (nidx <= 0) {
// All data are deleted
return 0;
}
for (size_t i = 0; i < nidx; i++) { for (size_t i = 0; i < nidx; i++) {
pBlkIdx = (SBlockIdx *)taosArrayGet(pCommih->aBlkIdx, i); pBlkIdx = (SBlockIdx *)taosArrayGet(pCommih->aBlkIdx, i);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册