提交 f318fdfd 编写于 作者: H Hongze Cheng

more

上级 7d3a57f5
......@@ -41,6 +41,8 @@ typedef struct {
SDataCols * pDataCols;
} SCommitH;
#define TSDB_DEFAULT_BLOCK_ROWS(maxRows) ((maxRows)*4 / 5)
#define TSDB_COMMIT_REPO(ch) TSDB_READ_REPO(&(ch->readh))
#define TSDB_COMMIT_REPO_ID(ch) REPO_ID(TSDB_READ_REPO(&(ch->readh)))
#define TSDB_COMMIT_WRITE_FSET(ch) (&((ch)->wSet))
......@@ -83,16 +85,16 @@ static int tsdbSetCommitTable(SCommitH *pCommith, STable *pTable);
static int tsdbComparKeyBlock(const void *arg1, const void *arg2);
// static int tsdbWriteBlockInfo(SCommitH *pCommih);
// static int tsdbCommitMemData(SCommitH *pCommith, SCommitIter *pIter, TSKEY keyLimit, bool toData);
// static int tsdbMergeMemData(SCommitH *pCommith, SCommitIter *pIter, int bidx);
static int tsdbMergeMemData(SCommitH *pCommith, SCommitIter *pIter, int bidx);
static int tsdbMoveBlock(SCommitH *pCommith, int bidx);
static int tsdbCommitAddBlock(SCommitH *pCommith, const SBlock *pSupBlock, const SBlock *pSubBlocks, int nSubBlocks);
static int tsdbMergeBlockData(SCommitH *pCommith, SCommitIter *pIter, SDataCols *pDataCols, TSKEY keyLimit,
bool isLastOneBlock);
static void tsdbResetCommitTable(SCommitH *pCommith);
static void tsdbCloseCommitFile(SCommitH *pCommith, bool hasError);
// static bool tsdbCanAddSubBlock(SCommitH *pCommith, SBlock *pBlock, SMergeInfo *pInfo);
// static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIter *pCommitIter, SDataCols *pTarget,
// TSKEY maxKey, int maxRows, int8_t update);
static bool tsdbCanAddSubBlock(SCommitH *pCommith, SBlock *pBlock, SMergeInfo *pInfo);
static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIter *pCommitIter, SDataCols *pTarget,
TSKEY maxKey, int maxRows, int8_t update);
int tsdbApplyRtnOnFSet(STsdb *pRepo, SDFileSet *pSet, SRtn *pRtn) {
SDiskID did;
......@@ -1063,19 +1065,18 @@ static int tsdbCommitToTable(SCommitH *pCommith, int tid) {
pBlock = NULL;
}
} else if (pBlock && (pBlock->last || tsdbComparKeyBlock((void *)(&nextKey), pBlock) == 0)) {
// // merge pBlock data and memory data
// if (tsdbMergeMemData(pCommith, pIter, bidx) < 0) {
// TSDB_RUNLOCK_TABLE(pIter->pTable);
// return -1;
// }
// merge pBlock data and memory data
if (tsdbMergeMemData(pCommith, pIter, bidx) < 0) {
return -1;
}
// bidx++;
// if (bidx < nBlocks) {
// pBlock = pCommith->readh.pBlkInfo->blocks + bidx;
// } else {
// pBlock = NULL;
// }
// nextKey = tsdbNextIterKey(pIter->pIter);
bidx++;
if (bidx < nBlocks) {
pBlock = pCommith->readh.pBlkInfo->blocks + bidx;
} else {
pBlock = NULL;
}
nextKey = tsdbNextIterKey(pIter->pIter);
} else {
// // Only commit memory data
// if (pBlock == NULL) {
......@@ -1329,77 +1330,76 @@ static int tsdbWriteBlock(SCommitH *pCommith, SDFile *pDFile, SDataCols *pDataCo
// return 0;
// }
// static int tsdbMergeMemData(SCommitH *pCommith, SCommitIter *pIter, int bidx) {
// STsdbRepo *pRepo = TSDB_COMMIT_REPO(pCommith);
// STsdbCfg * pCfg = REPO_CFG(pRepo);
// int nBlocks = pCommith->readh.pBlkIdx->numOfBlocks;
// SBlock * pBlock = pCommith->readh.pBlkInfo->blocks + bidx;
// TSKEY keyLimit;
// int16_t colId = 0;
// SMergeInfo mInfo;
// SBlock subBlocks[TSDB_MAX_SUBBLOCKS];
// SBlock block, supBlock;
// SDFile * pDFile;
// if (bidx == nBlocks - 1) {
// keyLimit = pCommith->maxKey;
// } else {
// keyLimit = pBlock[1].keyFirst - 1;
// }
static int tsdbMergeMemData(SCommitH *pCommith, SCommitIter *pIter, int bidx) {
STsdb * pRepo = TSDB_COMMIT_REPO(pCommith);
STsdbCfg * pCfg = REPO_CFG(pRepo);
int nBlocks = pCommith->readh.pBlkIdx->numOfBlocks;
SBlock * pBlock = pCommith->readh.pBlkInfo->blocks + bidx;
TSKEY keyLimit;
int16_t colId = 0;
SMergeInfo mInfo;
SBlock subBlocks[TSDB_MAX_SUBBLOCKS];
SBlock block, supBlock;
SDFile * pDFile;
if (bidx == nBlocks - 1) {
keyLimit = pCommith->maxKey;
} else {
keyLimit = pBlock[1].keyFirst - 1;
}
// SSkipListIterator titer = *(pIter->pIter);
// if (tsdbLoadBlockDataCols(&(pCommith->readh), pBlock, NULL, &colId, 1) < 0) return -1;
SSkipListIterator titer = *(pIter->pIter);
if (tsdbLoadBlockDataCols(&(pCommith->readh), pBlock, NULL, &colId, 1) < 0) return -1;
// tsdbLoadDataFromCache(pIter->pTable, &titer, keyLimit, INT32_MAX, NULL, pCommith->readh.pDCols[0]->cols[0].pData,
// pCommith->readh.pDCols[0]->numOfRows, pCfg->update, &mInfo);
tsdbLoadDataFromCache(pIter->pTable, &titer, keyLimit, INT32_MAX, NULL, pCommith->readh.pDCols[0]->cols[0].pData,
pCommith->readh.pDCols[0]->numOfRows, pCfg->update, &mInfo);
// if (mInfo.nOperations == 0) {
// // no new data to insert (all updates denied)
// if (tsdbMoveBlock(pCommith, bidx) < 0) {
// return -1;
// }
// *(pIter->pIter) = titer;
// } else if (pBlock->numOfRows + mInfo.rowsInserted - mInfo.rowsDeleteSucceed == 0) {
// // Ignore the block
// ASSERT(0);
// *(pIter->pIter) = titer;
// } else if (tsdbCanAddSubBlock(pCommith, pBlock, &mInfo)) {
// // Add a sub-block
// tsdbLoadDataFromCache(pIter->pTable, pIter->pIter, keyLimit, INT32_MAX, pCommith->pDataCols,
// pCommith->readh.pDCols[0]->cols[0].pData, pCommith->readh.pDCols[0]->numOfRows,
// pCfg->update, &mInfo);
// if (pBlock->last) {
// pDFile = TSDB_COMMIT_LAST_FILE(pCommith);
// } else {
// pDFile = TSDB_COMMIT_DATA_FILE(pCommith);
// }
if (mInfo.nOperations == 0) {
// no new data to insert (all updates denied)
if (tsdbMoveBlock(pCommith, bidx) < 0) {
return -1;
}
*(pIter->pIter) = titer;
} else if (pBlock->numOfRows + mInfo.rowsInserted - mInfo.rowsDeleteSucceed == 0) {
// Ignore the block
ASSERT(0);
*(pIter->pIter) = titer;
} else if (tsdbCanAddSubBlock(pCommith, pBlock, &mInfo)) {
// Add a sub-block
tsdbLoadDataFromCache(pIter->pTable, pIter->pIter, keyLimit, INT32_MAX, pCommith->pDataCols,
pCommith->readh.pDCols[0]->cols[0].pData, pCommith->readh.pDCols[0]->numOfRows, pCfg->update,
&mInfo);
if (pBlock->last) {
pDFile = TSDB_COMMIT_LAST_FILE(pCommith);
} else {
pDFile = TSDB_COMMIT_DATA_FILE(pCommith);
}
// if (tsdbWriteBlock(pCommith, pDFile, pCommith->pDataCols, &block, pBlock->last, false) < 0) return -1;
if (tsdbWriteBlock(pCommith, pDFile, pCommith->pDataCols, &block, pBlock->last, false) < 0) return -1;
// if (pBlock->numOfSubBlocks == 1) {
// subBlocks[0] = *pBlock;
// subBlocks[0].numOfSubBlocks = 0;
// } else {
// memcpy(subBlocks, POINTER_SHIFT(pCommith->readh.pBlkInfo, pBlock->offset),
// sizeof(SBlock) * pBlock->numOfSubBlocks);
// }
// subBlocks[pBlock->numOfSubBlocks] = block;
// supBlock = *pBlock;
// supBlock.keyFirst = mInfo.keyFirst;
// supBlock.keyLast = mInfo.keyLast;
// supBlock.numOfSubBlocks++;
// supBlock.numOfRows = pBlock->numOfRows + mInfo.rowsInserted - mInfo.rowsDeleteSucceed;
// supBlock.offset = taosArrayGetSize(pCommith->aSubBlk) * sizeof(SBlock);
// if (tsdbCommitAddBlock(pCommith, &supBlock, subBlocks, supBlock.numOfSubBlocks) < 0) return -1;
// } else {
// if (tsdbLoadBlockData(&(pCommith->readh), pBlock, NULL) < 0) return -1;
// if (tsdbMergeBlockData(pCommith, pIter, pCommith->readh.pDCols[0], keyLimit, bidx == (nBlocks - 1)) < 0) return
// -1;
// }
if (pBlock->numOfSubBlocks == 1) {
subBlocks[0] = *pBlock;
subBlocks[0].numOfSubBlocks = 0;
} else {
memcpy(subBlocks, POINTER_SHIFT(pCommith->readh.pBlkInfo, pBlock->offset),
sizeof(SBlock) * pBlock->numOfSubBlocks);
}
subBlocks[pBlock->numOfSubBlocks] = block;
supBlock = *pBlock;
supBlock.keyFirst = mInfo.keyFirst;
supBlock.keyLast = mInfo.keyLast;
supBlock.numOfSubBlocks++;
supBlock.numOfRows = pBlock->numOfRows + mInfo.rowsInserted - mInfo.rowsDeleteSucceed;
supBlock.offset = taosArrayGetSize(pCommith->aSubBlk) * sizeof(SBlock);
if (tsdbCommitAddBlock(pCommith, &supBlock, subBlocks, supBlock.numOfSubBlocks) < 0) return -1;
} else {
if (tsdbLoadBlockData(&(pCommith->readh), pBlock, NULL) < 0) return -1;
if (tsdbMergeBlockData(pCommith, pIter, pCommith->readh.pDCols[0], keyLimit, bidx == (nBlocks - 1)) < 0) return -1;
}
// return 0;
// }
return 0;
}
static int tsdbMoveBlock(SCommitH *pCommith, int bidx) {
SBlock *pBlock = pCommith->readh.pBlkInfo->blocks + bidx;
......@@ -1454,107 +1454,107 @@ static int tsdbCommitAddBlock(SCommitH *pCommith, const SBlock *pSupBlock, const
return 0;
}
// static int tsdbMergeBlockData(SCommitH *pCommith, SCommitIter *pIter, SDataCols *pDataCols, TSKEY keyLimit, bool
// isLastOneBlock) {
// STsdbRepo *pRepo = TSDB_COMMIT_REPO(pCommith);
// STsdbCfg * pCfg = REPO_CFG(pRepo);
// SBlock block;
// SDFile * pDFile;
// bool isLast;
// int32_t defaultRows = TSDB_COMMIT_DEFAULT_ROWS(pCommith);
static int tsdbMergeBlockData(SCommitH *pCommith, SCommitIter *pIter, SDataCols *pDataCols, TSKEY keyLimit,
bool isLastOneBlock) {
STsdb * pRepo = TSDB_COMMIT_REPO(pCommith);
STsdbCfg *pCfg = REPO_CFG(pRepo);
SBlock block;
SDFile * pDFile;
bool isLast;
int32_t defaultRows = TSDB_COMMIT_DEFAULT_ROWS(pCommith);
// int biter = 0;
// while (true) {
// tsdbLoadAndMergeFromCache(pCommith->readh.pDCols[0], &biter, pIter, pCommith->pDataCols, keyLimit, defaultRows,
// pCfg->update);
// if (pCommith->pDataCols->numOfRows == 0) break;
// if (isLastOneBlock) {
// if (pCommith->pDataCols->numOfRows < pCfg->minRowsPerFileBlock) {
// pDFile = TSDB_COMMIT_LAST_FILE(pCommith);
// isLast = true;
// } else {
// pDFile = TSDB_COMMIT_DATA_FILE(pCommith);
// isLast = false;
// }
// } else {
// pDFile = TSDB_COMMIT_DATA_FILE(pCommith);
// isLast = false;
// }
int biter = 0;
while (true) {
tsdbLoadAndMergeFromCache(pCommith->readh.pDCols[0], &biter, pIter, pCommith->pDataCols, keyLimit, defaultRows,
pCfg->update);
// if (tsdbWriteBlock(pCommith, pDFile, pCommith->pDataCols, &block, isLast, true) < 0) return -1;
// if (tsdbCommitAddBlock(pCommith, &block, NULL, 0) < 0) return -1;
// }
if (pCommith->pDataCols->numOfRows == 0) break;
// return 0;
// }
if (isLastOneBlock) {
if (pCommith->pDataCols->numOfRows < pCfg->minRowsPerFileBlock) {
pDFile = TSDB_COMMIT_LAST_FILE(pCommith);
isLast = true;
} else {
pDFile = TSDB_COMMIT_DATA_FILE(pCommith);
isLast = false;
}
} else {
pDFile = TSDB_COMMIT_DATA_FILE(pCommith);
isLast = false;
}
// static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIter *pCommitIter, SDataCols *pTarget,
// TSKEY maxKey, int maxRows, int8_t update) {
// TSKEY key1 = INT64_MAX;
// TSKEY key2 = INT64_MAX;
// STSchema *pSchema = NULL;
if (tsdbWriteBlock(pCommith, pDFile, pCommith->pDataCols, &block, isLast, true) < 0) return -1;
if (tsdbCommitAddBlock(pCommith, &block, NULL, 0) < 0) return -1;
}
// ASSERT(maxRows > 0 && dataColsKeyLast(pDataCols) <= maxKey);
// tdResetDataCols(pTarget);
return 0;
}
// while (true) {
// key1 = (*iter >= pDataCols->numOfRows) ? INT64_MAX : dataColsKeyAt(pDataCols, *iter);
// SMemRow row = tsdbNextIterRow(pCommitIter->pIter);
// if (row == NULL || memRowKey(row) > maxKey) {
// key2 = INT64_MAX;
// } else {
// key2 = memRowKey(row);
// }
static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIter *pCommitIter, SDataCols *pTarget,
TSKEY maxKey, int maxRows, int8_t update) {
TSKEY key1 = INT64_MAX;
TSKEY key2 = INT64_MAX;
STSchema *pSchema = NULL;
// if (key1 == INT64_MAX && key2 == INT64_MAX) break;
ASSERT(maxRows > 0 && dataColsKeyLast(pDataCols) <= maxKey);
tdResetDataCols(pTarget);
// if (key1 < key2) {
// for (int i = 0; i < pDataCols->numOfCols; i++) {
// //TODO: dataColAppendVal may fail
// dataColAppendVal(pTarget->cols + i, tdGetColDataOfRow(pDataCols->cols + i, *iter), pTarget->numOfRows,
// pTarget->maxPoints);
// }
while (true) {
key1 = (*iter >= pDataCols->numOfRows) ? INT64_MAX : dataColsKeyAt(pDataCols, *iter);
SMemRow row = tsdbNextIterRow(pCommitIter->pIter);
if (row == NULL || memRowKey(row) > maxKey) {
key2 = INT64_MAX;
} else {
key2 = memRowKey(row);
}
// pTarget->numOfRows++;
// (*iter)++;
// } else if (key1 > key2) {
// if (pSchema == NULL || schemaVersion(pSchema) != memRowVersion(row)) {
// pSchema = tsdbGetTableSchemaImpl(pCommitIter->pTable, false, false, memRowVersion(row));
// ASSERT(pSchema != NULL);
// }
if (key1 == INT64_MAX && key2 == INT64_MAX) break;
// tdAppendMemRowToDataCol(row, pSchema, pTarget, true);
if (key1 < key2) {
for (int i = 0; i < pDataCols->numOfCols; i++) {
// TODO: dataColAppendVal may fail
dataColAppendVal(pTarget->cols + i, tdGetColDataOfRow(pDataCols->cols + i, *iter), pTarget->numOfRows,
pTarget->maxPoints);
}
// tSkipListIterNext(pCommitIter->pIter);
// } else {
// if (update != TD_ROW_OVERWRITE_UPDATE) {
// //copy disk data
// for (int i = 0; i < pDataCols->numOfCols; i++) {
// //TODO: dataColAppendVal may fail
// dataColAppendVal(pTarget->cols + i, tdGetColDataOfRow(pDataCols->cols + i, *iter), pTarget->numOfRows,
// pTarget->maxPoints);
// }
// if(update == TD_ROW_DISCARD_UPDATE) pTarget->numOfRows++;
// }
// if (update != TD_ROW_DISCARD_UPDATE) {
// //copy mem data
// if (pSchema == NULL || schemaVersion(pSchema) != memRowVersion(row)) {
// pSchema = tsdbGetTableSchemaImpl(pCommitIter->pTable, false, false, memRowVersion(row));
// ASSERT(pSchema != NULL);
// }
// tdAppendMemRowToDataCol(row, pSchema, pTarget, update == TD_ROW_OVERWRITE_UPDATE);
// }
// (*iter)++;
// tSkipListIterNext(pCommitIter->pIter);
// }
pTarget->numOfRows++;
(*iter)++;
} else if (key1 > key2) {
if (pSchema == NULL || schemaVersion(pSchema) != memRowVersion(row)) {
pSchema = tsdbGetTableSchemaImpl(pCommitIter->pTable, false, false, memRowVersion(row));
ASSERT(pSchema != NULL);
}
// if (pTarget->numOfRows >= maxRows) break;
// }
// }
tdAppendMemRowToDataCol(row, pSchema, pTarget, true);
tSkipListIterNext(pCommitIter->pIter);
} else {
if (update != TD_ROW_OVERWRITE_UPDATE) {
// copy disk data
for (int i = 0; i < pDataCols->numOfCols; i++) {
// TODO: dataColAppendVal may fail
dataColAppendVal(pTarget->cols + i, tdGetColDataOfRow(pDataCols->cols + i, *iter), pTarget->numOfRows,
pTarget->maxPoints);
}
if (update == TD_ROW_DISCARD_UPDATE) pTarget->numOfRows++;
}
if (update != TD_ROW_DISCARD_UPDATE) {
// copy mem data
if (pSchema == NULL || schemaVersion(pSchema) != memRowVersion(row)) {
pSchema = tsdbGetTableSchemaImpl(pCommitIter->pTable, false, false, memRowVersion(row));
ASSERT(pSchema != NULL);
}
tdAppendMemRowToDataCol(row, pSchema, pTarget, update == TD_ROW_OVERWRITE_UPDATE);
}
(*iter)++;
tSkipListIterNext(pCommitIter->pIter);
}
if (pTarget->numOfRows >= maxRows) break;
}
}
static void tsdbResetCommitTable(SCommitH *pCommith) {
taosArrayClear(pCommith->aSubBlk);
......@@ -1573,23 +1573,23 @@ static void tsdbCloseCommitFile(SCommitH *pCommith, bool hasError) {
tsdbCloseDFileSet(TSDB_COMMIT_WRITE_FSET(pCommith));
}
// static bool tsdbCanAddSubBlock(SCommitH *pCommith, SBlock *pBlock, SMergeInfo *pInfo) {
// STsdbRepo *pRepo = TSDB_COMMIT_REPO(pCommith);
// STsdbCfg * pCfg = REPO_CFG(pRepo);
// int mergeRows = pBlock->numOfRows + pInfo->rowsInserted - pInfo->rowsDeleteSucceed;
static bool tsdbCanAddSubBlock(SCommitH *pCommith, SBlock *pBlock, SMergeInfo *pInfo) {
STsdb * pRepo = TSDB_COMMIT_REPO(pCommith);
STsdbCfg *pCfg = REPO_CFG(pRepo);
int mergeRows = pBlock->numOfRows + pInfo->rowsInserted - pInfo->rowsDeleteSucceed;
// ASSERT(mergeRows > 0);
ASSERT(mergeRows > 0);
// if (pBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS && pInfo->nOperations <= pCfg->maxRowsPerFileBlock) {
// if (pBlock->last) {
// if (pCommith->isLFileSame && mergeRows < pCfg->minRowsPerFileBlock) return true;
// } else {
// if (pCommith->isDFileSame && mergeRows <= pCfg->maxRowsPerFileBlock) return true;
// }
// }
if (pBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS && pInfo->nOperations <= pCfg->maxRowsPerFileBlock) {
if (pBlock->last) {
if (pCommith->isLFileSame && mergeRows < pCfg->minRowsPerFileBlock) return true;
} else {
if (pCommith->isDFileSame && mergeRows <= pCfg->maxRowsPerFileBlock) return true;
}
}
// return false;
// }
return false;
}
// int tsdbApplyRtn(STsdbRepo *pRepo) {
// SRtn rtn;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册