提交 02534972 编写于 作者: H Hongze Cheng

partial work

上级 c309bd00
......@@ -608,6 +608,7 @@ struct STsdbRepo {
};
#define REPO_ID(r) (r)->config.tsdbId
#define REPO_CFG(r) (&((r)->config))
#define IS_REPO_LOCKED(r) (r)->repoLocked
#define TSDB_SUBMIT_MSG_HEAD_SIZE sizeof(SSubmitMsg)
......
......@@ -46,6 +46,7 @@ typedef struct {
#define TSDB_COMMIT_LAST_FILE(ch) TSDB_DFILE_IN_SET(TSDB_COMMIT_WRITE_FSET(ch), TSDB_FILE_LAST)
#define TSDB_COMMIT_BUF(ch) TSDB_READ_BUF(&(ch->readh))
#define TSDB_COMMIT_COMP_BUF(ch) TSDB_READ_COMP_BUF(&(ch->readh))
#define TSDB_COMMIT_DEFAULT_ROWS(ch) (TSDB_COMMIT_REPO(ch)->config.maxRowsPerFileBlock * 4 / 5)
void *tsdbCommitData(STsdbRepo *pRepo) {
if (tsdbStartCommit(pRepo) < 0) {
......@@ -469,23 +470,70 @@ static int tsdbCommitToTable(SCommitH *pch, int tid) {
TSKEY nextKey = tsdbNextIterKey(pIter->pIter);
int cidx = 0;
void * ptr = NULL;
SBlock *pBlock = NULL;
SBlock *pBlock;
if (cidx < nBlocks) {
pBlock = pch->readh.pBlkInfo->blocks + cidx;
} else {
pBlock = NULL;
}
while (true) {
if ((nextKey == TSDB_DATA_TIMESTAMP_NULL || nextKey > pch->maxKey) && (cidx >= nBlocks)) break;
if ((nextKey == TSDB_DATA_TIMESTAMP_NULL || nextKey > pch->maxKey) && (pBlock == NULL)) break;
if ((nextKey == TSDB_DATA_TIMESTAMP_NULL || nextKey > pch->maxKey) ||
((cidx < nBlocks) && (!pBlock->last) && tsdbComparKeyBlock((void *)(&nextKey), pBlock) > 0)) {
(pBlock && (!pBlock->last) && tsdbComparKeyBlock((void *)(&nextKey), pBlock) > 0)) {
// TODO: move the block
ASSERT(pBlock->numOfSubBlocks > 0);
if (pBlock->numOfSubBlocks == 1) { // move super block
if (taosArrayPush(pch->aSupBlk, (void *)pBlock) == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
TSDB_RUNLOCK_TABLE(pIter->pTable);
return -1;
}
} else {
}
cidx++;
if (cidx < nBlocks) {
pBlock = pch->readh.pBlkInfo->blocks + cidx;
} else {
pBlock = NULL;
}
} else if ((cidx < nBlocks) && (pBlock->last || tsdbComparKeyBlock((void *)(&nextKey), pBlock) == 0)) {
// TODO: process merge commit
TSKEY keyLimit;
if (cidx == nBlocks - 1) {
keyLimit = pch->maxKey;
} else {
keyLimit = pBlock[1].keyFirst - 1;
}
if (tsdbMergeMemData(pch, pIter, pBlock, keyLimit) < 0) {
TSDB_RUNLOCK_TABLE(pIter->pTable);
return -1;
}
cidx++;
if (cidx < nBlocks) {
pBlock = pch->readh.pBlkInfo->blocks + cidx;
} else {
pBlock = NULL;
}
nextKey = tsdbNextIterKey(pIter->pIter);
} else {
if (pBlock == NULL) {
// commit memory data until pch->maxKey and write to the appropriate file
if (tsdbCommitMemData(pch, pIter, pch->maxKey, false) < 0) {
TSDB_RUNLOCK_TABLE(pIter->pTable);
return -1;
}
nextKey = tsdbNextIterKey(pIter->pIter);
} else {
// commit memory data until pBlock->keyFirst and write to only data file
if (tsdbCommitMemData(pch, pIter, pBlock->keyFirst-1, true) < 0) {
TSDB_RUNLOCK_TABLE(pIter->pTable);
return -1;
}
nextKey = tsdbNextIterKey(pIter->pIter);
}
}
......@@ -512,35 +560,6 @@ static int tsdbCommitToTable(SCommitH *pch, int tid) {
#endif
}
// if (pIter->pIter == NULL) {
// // No memory data but has disk data
// // TODO
// } else {
// TSKEY nextKey = tsdbNextIterKey(pIter->pIter);
// int cidx = 0;
// SBlock *pBlock = NULL;
// void *ptr = taosbsearch((void *)(&nextKey), pch->readh.pBlkInfo->blocks, pch->readh.pBlkIdx->numOfBlocks,
// sizeof(SBlock), tsdbComparKeyBlock, TD_GE);
// while (true) {
// if ((nextKey == TSDB_DATA_TIMESTAMP_NULL || nextKey > pch->maxKey) && (cidx >= pch->readh.pBlkIdx->numOfBlocks))
// break;
// if (tsdbComparKeyBlock((void *)(&nextKey), pBlock) < 0) {
// if (pBlock->last) {
// // merge with the last block
// } else {
// // Commit until pch->maxKey or (pBlock[1].keyFirst-1)
// }
// } else if (tsdbComparKeyBlock((void *)(&nextKey), pBlock) == 0) { // merge the block
// } else {
// }
// }
// }
TSDB_RUNLOCK_TABLE(pIter->pTable);
if (tsdbWriteBlockInfo(pch) < 0) return -1;
......@@ -825,5 +844,82 @@ static int tsdbWriteBlockIdx(SCommitH *pCommih) {
pHeadf->info.offset = offset;
pHeadf->info.len = tlen;
return 0;
}
static int tsdbCommitMemData(SCommitH *pCommith, SCommitIter *pIter, TSKEY keyLimit, bool toData) {
STsdbRepo *pRepo = TSDB_COMMIT_REPO(pCommith);
STsdbCfg * pCfg = REPO_CFG(pRepo);
SMergeInfo mInfo;
int32_t defaultRows = TSDB_COMMIT_DEFAULT_ROWS(pCommith);
SDFile * pDFile;
bool isLast;
SBlock block;
while (true) {
tsdbLoadDataFromCache(pIter->pTable, pIter->pIter, keyLimit, defaultRows, pCommith->pDataCols, NULL, 0,
pCfg->update, &mInfo);
if (pCommith->pDataCols->numOfRows <= 0) break;
if (toData || pCommith->pDataCols->numOfRows >= pCfg->minRowsPerFileBlock) {
pDFile = TSDB_COMMIT_DATA_FILE(pCommith);
isLast = false;
} else {
pDFile = TSDB_COMMIT_LAST_FILE(pCommith);
isLast = true;
}
if (tsdbWriteBlock(pCommith, pDFile, pCommith->pDataCols, &block, isLast, true) < 0) return -1;
if (taosArrayPush(pCommith->aSupBlk, (void *)(&block)) == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
}
return 0;
}
static int tsdbMergeMemData(SCommitH *pCommith, SCommitIter *pIter, SBlock *pBlock, TSKEY keyLimit) {
// TODO
return 0;
}
static int tsdbMoveBlock(SCommitH *pCommith, int bidx) {
SBlock *pBlock = pCommith->readh.pBlkInfo->blocks+bidx;
SDFile *pCommitF = (pBlock->last) ? TSDB_COMMIT_LAST_FILE(pCommith) : TSDB_COMMIT_DATA_FILE(pCommith);
SDFile *pReadF = (pBlock->last) ? TSDB_READ_LAST_FILE(&(pCommith->readh)) : TSDB_READ_DATA_FILE(&(pCommith->readh));
SBlock block;
if (tfsIsSameFile(&(pCommitF->f), &(pReadF->f))) {
if (pBlock->numOfSubBlocks == 1) {
if (taosArrayPush(pCommith->aSupBlk, (void *)pBlock) == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
} else {
block = *pBlock;
block.offset = sizeof(SBlock) * taosArrayGetSize(pCommith->aSupBlock);
if (taosArrayPush(pCommith->aSupBlk, (void *)(&block)) == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
if (taosArrayPushBatch(pCommith->aSubBlk, POINTER_SHIFT(pCommith->readh.pBlkInfo, pBlock->offset), pBlock->numOfSubBlocks) == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
}
} else {
if (tsdbLoadBlockData(&(pCommith->readh), pBlock, NULL) < 0) return -1;
if (tsdbWriteBlock(pCommith, pCommitF, pCommith->readh.pDCols[0], &block, pBlock->last, true) < 0) return -1;
if (taosArrayPush(pCommith->aSupBlk, (void *)(&block)) == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
}
return 0;
}
\ No newline at end of file
......@@ -46,9 +46,20 @@ void* taosArrayInit(size_t size, size_t elemSize);
*
* @param pArray
* @param pData
* @param nEles
* @return
*/
void* taosArrayPush(SArray* pArray, const void* pData);
void *taosArrayPushBatch(SArray *pArray, const void *pData, int nEles);
/**
*
* @param pArray
* @param pData
* @return
*/
static FORCE_INLINE void* taosArrayPush(SArray* pArray, const void* pData) {
return taosArrayPushBatch(pArray, pData, 1);
}
/**
*
......
......@@ -55,24 +55,29 @@ static int32_t taosArrayResize(SArray* pArray) {
return 0;
}
void* taosArrayPush(SArray* pArray, const void* pData) {
void* taosArrayPushBatch(SArray* pArray, const void* pData, int nEles) {
if (pArray == NULL || pData == NULL) {
return NULL;
}
if (pArray->size >= pArray->capacity) {
int32_t ret = taosArrayResize(pArray);
// failed to push data into buffer due to the failure of memory allocation
if (ret != 0) {
if (pArray->size + nEles > pArray->capacity) {
size_t tsize = (pArray->capacity << 1u);
while (pArray->size + nEles > tsize) {
tsize = (tsize << 1u);
}
pArray->pData = realloc(pArray->pData, tsize * pArray->elemSize);
if (pArray->pData == NULL) {
return NULL;
}
pArray->capacity = tsize;
}
void* dst = TARRAY_GET_ELEM(pArray, pArray->size);
memcpy(dst, pData, pArray->elemSize);
memcpy(dst, pData, pArray->elemSize * nEles);
pArray->size += 1;
pArray->size += nEles;
return dst;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册