“befa257a869ba903619761d19d2c949d35350739”上不存在“src/wal/git@gitcode.net:qq_37101384/tdengine.git”
提交 d2b5bf71 编写于 作者: H Hongze Cheng

refactor more code

上级 fd645261
...@@ -286,8 +286,6 @@ typedef struct { ...@@ -286,8 +286,6 @@ typedef struct {
typedef struct { typedef struct {
STsdbRepo* pRepo; STsdbRepo* pRepo;
SFileGroup fGroup; SFileGroup fGroup;
TSKEY minKey;
TSKEY maxKey;
SBlockIdx* pBlockIdx; SBlockIdx* pBlockIdx;
int nBlockIdx; int nBlockIdx;
int cBlockIdx; int cBlockIdx;
...@@ -301,6 +299,7 @@ typedef struct { ...@@ -301,6 +299,7 @@ typedef struct {
} SReadHandle; } SReadHandle;
#define TSDB_BLOCK_DATA_LEN(nCols) (sizeof(SBlockData) + sizeof(SBlockCol) * (nCols) + sizeof(TSCKSUM)) #define TSDB_BLOCK_DATA_LEN(nCols) (sizeof(SBlockData) + sizeof(SBlockCol) * (nCols) + sizeof(TSCKSUM))
#define TSDB_BLOCK_INFO_LEN(nBlocks) (sizeof(SBlockInfo) + sizeof(SBlock) * (nBlocks) + sizeof(TSCKSUM))
// Operations // Operations
// ------------------ tsdbMeta.c // ------------------ tsdbMeta.c
...@@ -509,8 +508,27 @@ int tsdbLoadBlockDataCols(SReadHandle* pReadH, SBlock* pBlock, SBlockInfo* pBloc ...@@ -509,8 +508,27 @@ int tsdbLoadBlockDataCols(SReadHandle* pReadH, SBlock* pBlock, SBlockInfo* pBloc
int tsdbLoadBlockDataInfo(SReadHandle* pReadH, SBlock* pBlock); int tsdbLoadBlockDataInfo(SReadHandle* pReadH, SBlock* pBlock);
#define TSDB_FILE_IN_FGROUP(pGroup, type) (&((pGroup)->files[(type)])) #define TSDB_FILE_IN_FGROUP(pGroup, type) (&((pGroup)->files[(type)]))
#define TSDB_KEY_BEYOND_RANGE(key, maxKey) ((key) < 0 || (key) > (maxKey))
int tsdbAllocBuf(void **ppBuf, uint32_t size); static FORCE_INLINE int tsdbAllocBuf(void **ppBuf, uint32_t size) {
ASSERT(size > 0);
void *pBuf = *pBuf;
uint32_t tsize = taosTSizeof(pBuf);
if (tsize >= size) return 0;
if (tsize == 0) tsize = 1024;
while (tsize < size) {
tsize *= 2;
}
*ppBuf = taosTRealloc(pBuf, tsize);
if (*ppBuf == NULL) return -1;
}
int tsdbEncodeBlockIdx(void** buf, SBlockIdx* pBlockIdx);
void* tsdbDecodeBlockIdx(void* buf, SBlockIdx* pBlockIdx);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -17,8 +17,9 @@ ...@@ -17,8 +17,9 @@
#include <sys/stat.h> #include <sys/stat.h>
#include <sys/types.h> #include <sys/types.h>
#include "tsdbMain.h"
#include "tchecksum.h" #include "tchecksum.h"
#include "tscompression.h"
#include "tsdbMain.h"
#define TSDB_DATA_FILE_CHANGE 0 #define TSDB_DATA_FILE_CHANGE 0
#define TSDB_META_FILE_CHANGE 1 #define TSDB_META_FILE_CHANGE 1
...@@ -31,11 +32,15 @@ typedef struct { ...@@ -31,11 +32,15 @@ typedef struct {
SCommitIter *pIters; SCommitIter *pIters;
SReadHandle *pReadH; SReadHandle *pReadH;
SFileGroup * pFGroup; SFileGroup * pFGroup;
TSKEY minKey;
TSKEY maxKey;
SBlockIdx * pBlockIdx; SBlockIdx * pBlockIdx;
int nBlockIdx; int nBlockIdx;
SBlockIdx newBlockIdx;
SBlockInfo * pBlockInfo; SBlockInfo * pBlockInfo;
int nBlocks;
SBlock * pSubBlock; SBlock * pSubBlock;
int nSubBlock; int nSubBlocks;
SDataCols * pDataCols; SDataCols * pDataCols;
} STSCommitHandle; } STSCommitHandle;
...@@ -164,24 +169,28 @@ static int tsdbCommitTimeSeriesData(SCommitHandle *pCommitH) { ...@@ -164,24 +169,28 @@ static int tsdbCommitTimeSeriesData(SCommitHandle *pCommitH) {
if (pTSCh == NULL) return -1; if (pTSCh == NULL) return -1;
// Seek skip over data beyond retention // Seek skip over data beyond retention
tsdbGetFidKeyRange(pCfg->daysPerFile, pCfg->precision, fid, &minKey, &maxKey); tsdbGetFidKeyRange(pCfg->daysPerFile, pCfg->precision, mfid, &minKey, &maxKey);
tsdbSeekTSCommitHandle(pTSCh, maxKey); tsdbSeekTSCommitHandle(pTSCh, minKey);
// Commit Time-Series data file by file // Commit Time-Series data file by file
int sfid = (int)(TSDB_KEY_FILEID(pMem->keyFirst, pCfg->daysPerFile, pCfg->precision)); int sfid = (int)(TSDB_KEY_FILEID(pMem->keyFirst, pCfg->daysPerFile, pCfg->precision));
int efid = (int)(TSDB_KEY_FILEID(pMem->keyLast, pCfg->daysPerFile, pCfg->precision)); int efid = (int)(TSDB_KEY_FILEID(pMem->keyLast, pCfg->daysPerFile, pCfg->precision));
for (int fid = sfid; fid <= efid; fid++) { for (int fid = sfid; fid <= efid; fid++) {
// Skip files beyond retention
if (fid < mfid) continue; if (fid < mfid) continue;
if (!tsdbHasDataToCommit(tsCommitH.pIters, pMem->maxTables, minKey, maxKey)) continue; if (!tsdbHasDataToCommit(pTSCh, minKey, maxKey)) continue;
// TODO : set pOldGroup and pNewGroup
SFileGroup *pOldGroup = NULL;
SFileGroup *pNewGroup = NULL;
if (tsdbLogTSFileChange(pCommitH, fid) < 0) { if (tsdbLogTSFileChange(pCommitH, fid) < 0) {
tsdbFreeTSCommitHandle(pTSCh); tsdbFreeTSCommitHandle(pTSCh);
return -1; return -1;
} }
if (tsdbCommitToFileGroup(pRepo, NULL, NULL, &tsCommitH) < 0) { if (tsdbCommitToFileGroup(pTSCh, pOldGroup, pNewGroup) < 0) {
tsdbFreeTSCommitHandle(pTSCh); tsdbFreeTSCommitHandle(pTSCh);
return -1; return -1;
} }
...@@ -295,8 +304,8 @@ static void tsdbDestroyCommitIters(SCommitIter *iters, int maxTables) { ...@@ -295,8 +304,8 @@ static void tsdbDestroyCommitIters(SCommitIter *iters, int maxTables) {
free(iters); free(iters);
} }
static int tsdbCommitToFileGroup(STsdbRepo *pRepo, SFileGroup *pOldGroup, SFileGroup *pNewGroup, STSCommitHandle *pTSCh) { static int tsdbCommitToFileGroup(STSCommitHandle *pTSCh, SFileGroup *pOldGroup, SFileGroup *pNewGroup) {
SCommitIter *iters = pTSCh->pIters; SCommitIter *pIters = pTSCh->pIters;
if (tsdbSetAndOpenCommitFGroup(pTSCh, pOldGroup, pNewGroup) < 0) return -1; if (tsdbSetAndOpenCommitFGroup(pTSCh, pOldGroup, pNewGroup) < 0) return -1;
...@@ -306,7 +315,7 @@ static int tsdbCommitToFileGroup(STsdbRepo *pRepo, SFileGroup *pOldGroup, SFileG ...@@ -306,7 +315,7 @@ static int tsdbCommitToFileGroup(STsdbRepo *pRepo, SFileGroup *pOldGroup, SFileG
} }
for (int tid = 1; tid < pTSCh->maxIters; tid++) { for (int tid = 1; tid < pTSCh->maxIters; tid++) {
SCommitIter *pIter = iters + tid; SCommitIter *pIter = pIters + tid;
if (pIter->pTable == NULL) continue; if (pIter->pTable == NULL) continue;
if (tsdbCommitTableData(pTSCh, tid) < 0) { if (tsdbCommitTableData(pTSCh, tid) < 0) {
...@@ -320,12 +329,20 @@ static int tsdbCommitToFileGroup(STsdbRepo *pRepo, SFileGroup *pOldGroup, SFileG ...@@ -320,12 +329,20 @@ static int tsdbCommitToFileGroup(STsdbRepo *pRepo, SFileGroup *pOldGroup, SFileG
return -1; return -1;
} }
if (tsdbUpdateFileGroupInfo(pNewGroup) < 0) {
tsdbCloseAndUnsetCommitFGroup(pTSCh, true /* hasError = true */);
return -1;
}
tsdbCloseAndUnsetCommitFGroup(pTSCh, false /* hasError = false */); tsdbCloseAndUnsetCommitFGroup(pTSCh, false /* hasError = false */);
return 0; return 0;
} }
static int tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TSKEY maxKey) { static int tsdbHasDataToCommit(STSCommitHandle *pTSCh, TSKEY minKey, TSKEY maxKey) {
int nIters = pTSCh->maxIters;
SCommitIter *iters = pTSCh->pIters;
for (int i = 0; i < nIters; i++) { for (int i = 0; i < nIters; i++) {
SCommitIter *pIter = iters + i; SCommitIter *pIter = iters + i;
if (pIter->pTable == NULL) continue; if (pIter->pTable == NULL) continue;
...@@ -619,6 +636,7 @@ static int tsdbCommitTableData(STSCommitHandle *pTSCh, int tid) { ...@@ -619,6 +636,7 @@ static int tsdbCommitTableData(STSCommitHandle *pTSCh, int tid) {
SCommitIter *pIter = pTSCh->pIters + tid; SCommitIter *pIter = pTSCh->pIters + tid;
SReadHandle *pReadH = pTSCh->pReadH; SReadHandle *pReadH = pTSCh->pReadH;
SDataCols * pDataCols = pTSCh->pDataCols; SDataCols * pDataCols = pTSCh->pDataCols;
TSKEY keyNext = tsdbNextIterKey(pIter->pIter);
taosRLockLatch(&(pIter->pTable->latch)); taosRLockLatch(&(pIter->pTable->latch));
...@@ -627,7 +645,13 @@ static int tsdbCommitTableData(STSCommitHandle *pTSCh, int tid) { ...@@ -627,7 +645,13 @@ static int tsdbCommitTableData(STSCommitHandle *pTSCh, int tid) {
return -1; return -1;
} }
if (tsdbLoadBlockInfo(pReadH) < 0) { if (pReadH->pCurBlockIdx == NULL && TSDB_KEY_BEYOND_RANGE(keyNext, pTSCh->maxKey)) {
// no data in memory and no old data in file, just skip the table
taosRUnLockLatch(&(pIter->pTable->latch));
return 0;
}
if (tsdbLoadBlockInfo(pReadH, tid) < 0) {
taosRUnLockLatch(&(pIter->pTable->latch)); taosRUnLockLatch(&(pIter->pTable->latch));
return -1; return -1;
} }
...@@ -642,6 +666,12 @@ static int tsdbCommitTableData(STSCommitHandle *pTSCh, int tid) { ...@@ -642,6 +666,12 @@ static int tsdbCommitTableData(STSCommitHandle *pTSCh, int tid) {
return -1; return -1;
} }
// Append a new blockIdx
if (tsdbAppendBlockIdx(pTSCh) < 0) {
taosRUnLockLatch(&(pIter->pTable->latch));
return -1;
}
taosRUnLockLatch(&(pIter->pTable->latch)); taosRUnLockLatch(&(pIter->pTable->latch));
return 0; return 0;
} }
...@@ -689,23 +719,19 @@ static int tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIte ...@@ -689,23 +719,19 @@ static int tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIte
return numOfRows; return numOfRows;
} }
static int tsdbWriteBlockToProperFile(SRWHelper *pHelper, SDataCols *pDataCols, SBlock *pCompBlock) { static int tsdbWriteBlockToRightFile(STSCommitHandle *pTSCh, SDataCols *pDataCols, SBlock *pBlock) {
STsdbCfg *pCfg = &(pHelper->pRepo->config); STsdbCfg *pCfg = &(pTSCh->pReadH->pRepo->config);
SFile * pFile = NULL; SFile * pFile = NULL;
bool isLast = false; bool islast = false;
ASSERT(pDataCols->numOfRows > 0);
if (pDataCols->numOfRows >= pCfg->minRowsPerFileBlock) { if (pDataCols->numOfRows >= pCfg->minRowsPerFileBlock) {
pFile = helperDataF(pHelper); pFile = TSDB_FILE_IN_FGROUP(pTSCh->pFGroup, TSDB_FILE_TYPE_DATA);
} else { } else {
isLast = true; islast = true;
pFile = TSDB_NLAST_FILE_OPENED(pHelper) ? helperNewLastF(pHelper) : helperLastF(pHelper); pFile = TSDB_FILE_IN_FGROUP(pTSCh->pFGroup, TSDB_FILE_TYPE_LAST);
} }
ASSERT(pFile->fd > 0); if (tsdbWriteBlockToFile(pTSCh, pFile, pDataCols, pBlock, islast, true) < 0) return -1;
if (tsdbWriteBlockToFile(pHelper, pFile, pDataCols, pCompBlock, isLast, true) < 0) return -1;
return 0; return 0;
} }
...@@ -877,8 +903,11 @@ static int tsdbUpdateSuperBlock(SRWHelper *pHelper, SBlock *pCompBlock, int blkI ...@@ -877,8 +903,11 @@ static int tsdbUpdateSuperBlock(SRWHelper *pHelper, SBlock *pCompBlock, int blkI
return 0; return 0;
} }
static int tsdbSetAndOpenCommitFGroup(STSCommitHandle *pTSCh, STsdbRepo *pOldGroup, STsdbRepo *pNewGroup) { static int tsdbSetAndOpenCommitFGroup(STSCommitHandle *pTSCh, SFileGroup *pOldGroup, SFileGroup *pNewGroup) {
ASSERT(pOldGroup->fileId == pNewGroup->fileId);
STsdbRepo *pRepo = pTSCh->pReadH->pRepo; STsdbRepo *pRepo = pTSCh->pReadH->pRepo;
STsdbCfg * pCfg = &(pRepo->config);
if (tsdbSetAndOpenReadFGroup(pTSCh->pReadH, pOldGroup) < 0) { if (tsdbSetAndOpenReadFGroup(pTSCh->pReadH, pOldGroup) < 0) {
tsdbError("vgId:%d failed to set and open commit file group since %s", REPO_ID(pRepo), tstrerror(terrno)); tsdbError("vgId:%d failed to set and open commit file group since %s", REPO_ID(pRepo), tstrerror(terrno));
...@@ -913,6 +942,8 @@ static int tsdbSetAndOpenCommitFGroup(STSCommitHandle *pTSCh, STsdbRepo *pOldGro ...@@ -913,6 +942,8 @@ static int tsdbSetAndOpenCommitFGroup(STSCommitHandle *pTSCh, STsdbRepo *pOldGro
pTSCh->pFGroup = pNewGroup; pTSCh->pFGroup = pNewGroup;
pTSCh->nBlockIdx = 0; pTSCh->nBlockIdx = 0;
tsdbGetFidKeyRange(pCfg->daysPerFile, pCfg->precision, pOldGroup->fileId, &(pTSCh->minKey), &(pTSCh->maxKey));
return 0; return 0;
} }
...@@ -934,12 +965,91 @@ static void tsdbCloseAndUnsetCommitFGroup(STSCommitHandle *pTSCh, bool hasError) ...@@ -934,12 +965,91 @@ static void tsdbCloseAndUnsetCommitFGroup(STSCommitHandle *pTSCh, bool hasError)
} }
static int tsdbWriteBlockInfo(STSCommitHandle *pTSCh) { static int tsdbWriteBlockInfo(STSCommitHandle *pTSCh) {
// TODO ASSERT(pTSCh->nBlocks > 0);
SReadHandle *pReadH = pTSCh->pReadH;
SBlockInfo * pBlockInfo = pTSCh->pBlockInfo;
SFile * pFile = TSDB_FILE_IN_FGROUP(pTSCh->pFGroup, TSDB_FILE_TYPE_HEAD);
int tlen = TSDB_BLOCK_INFO_LEN(pTSCh->nBlocks, pTSCh->nSubBlocks);
pBlockInfo->delimiter = TSDB_FILE_DELIMITER;
pBlockInfo->uid = TABLE_UID(pReadH->pTable);
pBlockInfo->tid = TABLE_TID(pReadH->pTable);
if (pTSCh->nSubBlocks > 0) {
if (tsdbAllocBuf(&((void *)pTSCh->pBlockInfo), tlen) < 0) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
memcpy(POINTER_SHIFT(pTSCh->pBlockInfo, sizeof(SBlockInfo) + sizeof(SBlock) * pTSCh->nBlocks),
(void *)pTSCh->pSubBlock, sizeof(SBlock) * pTSCh->nSubBlocks);
}
taosCalcChecksumAppend(0, (uint8_t *)(pTSCh->pBlockInfo), tlen);
if (taosTWrite(pFile->fd, (void *)pTSCh->pBlockInfo, tlen) < tlen) {
tsdbError("vgId:%d failed to write %d bytes to file %s since %s", REPO_ID(pRepo), tlen, pFile->fname, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
pTSCh->newBlockIdx.tid = TABLE_TID(pReadH->pTable);
pTSCh->newBlockIdx.uid = TABLE_UID(pReadH->pTable);
pTSCh->newBlockIdx.offset = (uint32_t)(pFile->info.size);
pTSCh->newBlockIdx.numOfBlocks = pTSCh->nBlocks;
pTSCh->newBlockIdx.len = tlen;
pTSCh->newBlockIdx.hasLast = pTSCh->pBlockInfo->blocks[pTSCh->nBlocks - 1].last;
pTSCh->newBlockIdx.maxKey = pTSCh->pBlockInfo->blocks[pTSCh->nBlocks - 1].keyLast;
pFile->info.size += tlen;
pFile->info.magic = taosCalcChecksum(
pFile->info.magic, (uint8_t *)POINTER_SHIFT(pTSCh->pBlockInfo, tlen - sizeof(TSCKSUM)), sizeof(TSCKSUM));
return 0; return 0;
} }
static int tsdbWriteBlockIdx(STSCommitHandle *pTSCh) { static int tsdbWriteBlockIdx(STSCommitHandle *pTSCh) {
SFile *pFile = TSDB_FILE_IN_FGROUP(pTSCh->pFGroup, TSDB_FILE_TYPE_HEAD); ASSERT(pTSCh->nBlockIdx > 0);
SReadHandle *pReadH = pTSCh->pReadH;
STsdbRepo * pRepo = pReadH->pRepo;
SFile * pFile = TSDB_FILE_IN_FGROUP(pTSCh->pFGroup, TSDB_FILE_TYPE_HEAD);
int len = tsdbEncodeBlockIdxArray(pTSCh);
if (len < 0) return -1;
// label checksum
len += sizeof(TSCKSUM);
if (tsdbAllocBuf(&(pReadH->pBuf), len) < 0) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
taosCalcChecksumAppend(0, (uint8_t *)(pReadH->pBuf), len);
off_t offset = lseek(pFile->fd, 0, SEEK_END);
if (offset < 0) {
tsdbError("vgId:%d failed to lseek to end of file %s since %s", REPO_ID(pRepo), pFile->fname, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
if (taosTWrite(pFile->fd, pReadH->pBuf, len) < len) {
tsdbError("vgId:%d failed to write %d bytes to file %s since %s", REPO_ID(pRepo), len, pFile->fname,
strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
// Update pFile->info
pFile->info.size += len;
pFile->info.offset = (uint32_t)offset;
pFile->info.len = len;
pFile->info.magic = taosCalcChecksum(pFile->info.magic,
(uint8_t *)POINTER_SHIFT(pReadH->pBuf, len - sizeof(TSCKSUM), sizeof(TSCKSUM)));
ASSERT(pFile->info.size == pFile->info.offset + pFile->info.len);
return 0; return 0;
} }
...@@ -958,29 +1068,37 @@ static int tsdbCommitTableDataImpl(STSCommitHandle *pTSCh, int tid) { ...@@ -958,29 +1068,37 @@ static int tsdbCommitTableDataImpl(STSCommitHandle *pTSCh, int tid) {
SReadHandle *pReadH = pTSCh->pReadH; SReadHandle *pReadH = pTSCh->pReadH;
SDataCols * pDataCols = pTSCh->pDataCols; SDataCols * pDataCols = pTSCh->pDataCols;
SBlockIdx * pOldIdx = pReadH->pCurBlockIdx; SBlockIdx * pOldIdx = pReadH->pCurBlockIdx;
TSKEY keyNext = tsdbNextIterKey(pIter->pIter);
ASSERT((pOldIdx == NULL && (!TSDB_KEY_BEYOND_RANGE(keyNext, pTSCh->maxKey))) || pOldIdx->numOfBlocks > 0);
ASSERT(pOldIdx == NULL || pOldIdx->numOfBlocks > 0); // Initialize
memset((void *)(&(pTSCh->newBlockIdx)), 0, sizeof(pTSCh->newBlockIdx));
pTSCh->nBlocks = 0;
pTSCh->nSubBlocks = 0;
int sidx = 0; int sidx = 0;
int eidx = (pOldIdx == NULL) ? 0 : pOldIdx->numOfBlocks; int eidx = (pOldIdx == NULL) ? 0 : pOldIdx->numOfBlocks;
while (true) { while (true) {
TSKEY keyNext = tsdbNextIterKey(pIter->pIter); if (TSDB_KEY_BEYOND_RANGE(keyNext, pTSCh->maxKey)) break;
if (keyNext > pReadH->maxKey) break; ASSERT(pTSCh->nBlocks == 0 || keyNext > pTSCh->pBlockInfo->blocks[pTSCh->nBlocks-1].keyLast);
void *ptr = taosbsearch((void *)keyNext, (void *)(pReadH->pBlockInfo->blocks + sidx), eidx - sidx, sizeof(SBlock), void *ptr = NULL;
NULL, TD_GE); if (eidx > sidx) {
if (ptr == NULL) { ptr = taosbsearch((void *)keyNext, (void *)(pReadH->pBlockInfo->blocks + sidx), eidx - sidx, sizeof(SBlock),
if (sidx < eidx && pOldIdx->hasLast) { compareKeyBlock, TD_GE);
ptr = pReadH->pBlockInfo->blocks + eidx - 1; }
}
if (ptr == NULL && sidx < eidx && pOldIdx->hasLast) {
ptr = pReadH->pBlockInfo->blocks + eidx - 1;
} }
int bidx = 0; int bidx = 0;
if (ptr == NULL) { if (ptr == NULL) {
bidx = eidx; bidx = eidx;
} else { } else {
bidx = POINTER_DISTANCE(ptr, (void *)pReadH->pBlockInfo->blocks) / sizeof(SBlock); bidx = POINTER_DISTANCE(ptr, (void *)(pReadH->pBlockInfo->blocks)) / sizeof(SBlock);
} }
if (tsdbCopyBlocks(pTSCh, sidx, bidx) < 0) return -1; if (tsdbCopyBlocks(pTSCh, sidx, bidx) < 0) return -1;
...@@ -992,8 +1110,12 @@ static int tsdbCommitTableDataImpl(STSCommitHandle *pTSCh, int tid) { ...@@ -992,8 +1110,12 @@ static int tsdbCommitTableDataImpl(STSCommitHandle *pTSCh, int tid) {
if (tsdbMergeCommit(pTSCh, (SBlock *)ptr) < 0) return -1; if (tsdbMergeCommit(pTSCh, (SBlock *)ptr) < 0) return -1;
sidx++; sidx++;
} }
// Update keyNext
keyNext = tsdbNextIterKey(pIter->pIter);
} }
// Move remaining blocks
if (tsdbCopyBlocks(pTSCh, sidx, eidx) < 0) return -1; if (tsdbCopyBlocks(pTSCh, sidx, eidx) < 0) return -1;
return 0; return 0;
...@@ -1001,28 +1123,29 @@ static int tsdbCommitTableDataImpl(STSCommitHandle *pTSCh, int tid) { ...@@ -1001,28 +1123,29 @@ static int tsdbCommitTableDataImpl(STSCommitHandle *pTSCh, int tid) {
static int tsdbCopyBlocks(STSCommitHandle *pTSCh, int sidx, int eidx) { static int tsdbCopyBlocks(STSCommitHandle *pTSCh, int sidx, int eidx) {
ASSERT(sidx <= eidx); ASSERT(sidx <= eidx);
for (int idx = sidx; idx < eidx; idx++) { for (int idx = sidx; idx < eidx; idx++) {
// TODO if (tsdbCopyBlock(pTSCh, idx) < 0) return -1;
} }
return 0; return 0;
} }
static int tsdbAppendCommit(STSCommitHandle *pTSCh) { static int tsdbAppendCommit(STSCommitHandle *pTSCh) {
SDataCols * pDataCols = pTSCh->pDataCols; SDataCols * pDataCols = pTSCh->pDataCols;
SReadHandle *pReadH = pTSCh->pReadH; SReadHandle *pReadH = pTSCh->pReadH;
STsdbRepo * pRepo = pReadH->pRepo;
STable * pTable = pReadH->pTable; STable * pTable = pReadH->pTable;
SBlock block = {0}; SBlock block = {0};
SBlock * pBlock = &block; SBlock * pBlock = &block;
STsdbCfg * pCfg = &(pRepo->config); STsdbCfg * pCfg = &(pReadH->pRepo->config);
SCommitIter *pIter = pTSCh->pIters + TABLE_TID(pTable); SCommitIter *pIter = pTSCh->pIters + TABLE_TID(pTable);
int blockCommitRows = TSDB_DEFAULT_ROWS_TO_COMMIT(pCfg->maxRowsPerFileBlock); int dbrows = TSDB_DEFAULT_ROWS_TO_COMMIT(pCfg->maxRowsPerFileBlock); // default block rows
tdResetDataCols(pDataCols); tdResetDataCols(pDataCols);
int rowsToRead = tsdbLoadDataFromCache(pTable, pIter->pIter, pReadH->maxKey, blockCommitRows, pDataCols, NULL, 0); int rows = tsdbLoadDataFromCache(pTable, pIter->pIter, pReadH->maxKey, dbrows, pDataCols, NULL, 0);
ASSERT(rowsToRead > 0); ASSERT(rows > 0 && pDataCols->numOfRows == rows);
if (tsdbWriteBlockToProperFile(pTSCh, pDataCols, pBlock) < 0) return -1; if (tsdbWriteBlockToRightFile(pTSCh, pDataCols, pBlock) < 0) return -1;
if (tsdbAddSuperBlock(pTSCh, pBlock) < 0) return -1; if (tsdbAddSuperBlock(pTSCh, pBlock) < 0) return -1;
return -1; return -1;
...@@ -1043,112 +1166,111 @@ static int tsdbWriteBlockToFile(STSCommitHandle *pTSCh, SFile *pFile, SDataCols ...@@ -1043,112 +1166,111 @@ static int tsdbWriteBlockToFile(STSCommitHandle *pTSCh, SFile *pFile, SDataCols
SReadHandle *pReadH = pTSCh->pReadH; SReadHandle *pReadH = pTSCh->pReadH;
STsdbRepo * pRepo = pReadH->pRepo; STsdbRepo * pRepo = pReadH->pRepo;
STsdbCfg * pCfg = &(pRepo->config); STsdbCfg * pCfg = &(pRepo->config);
int64_t offset = 0; int64_t offset = pFile->info.size;
SBlockData * pBlockData = NULL;
int nColsNotAllNull = 0; int nColsNotAllNull = 0;
int csize = TSDB_BLOCK_DATA_LEN(nColsNotAllNull); // column size
int32_t keyLen = 0;
ASSERT(pFile->info.size == lseek(pFile->fd, 0, SEEK_END)); ASSERT(offset == lseek(pFile->fd, 0, SEEK_END));
ASSERT(pDataCols->numOfRows > 0 && pDataCols->numOfRows <= pCfg->maxRowsPerFileBlock); ASSERT(pDataCols->numOfRows > 0 && pDataCols->numOfRows <= pCfg->maxRowsPerFileBlock);
ASSERT(isLast ? pDataCols->numOfRows < pCfg->minRowsPerFileBlock : true); ASSERT(isLast ? pDataCols->numOfRows < pCfg->minRowsPerFileBlock : true);
offset = pFile->info.size; if (tsdbAllocBuf(&((void *)pReadH->pBlockData), csize) < 0) {
int32_t bsize = TSDB_BLOCK_DATA_LEN(0); // total block size terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
int32_t csize = 0; // SBlockCol part size return -1;
}
for (int ncol = 1; ncol < pDataCols->numOfCols; ncol++) { // ncol from 1, we skip the timestamp column int32_t coffset = 0; // column data offset
for (int ncol = 0; ncol < pDataCols->numOfCols; ncol++) { // ncol from 1, we skip the timestamp column
SDataCol * pDataCol = pDataCols->cols + ncol; SDataCol * pDataCol = pDataCols->cols + ncol;
SBlockCol *pBlockCol = NULL;
if (isNEleNull(pDataCol, pDataCols->numOfRows)) { // all data to commit are NULL, just ignore it if (ncol != 0) {
continue; if (isNEleNull(pDataCol, pDataCols->numOfRows)) { // all data to commit are NULL, just ignore it
} continue;
}
nColsNotAllNull++; nColsNotAllNull++;
bsize = TSDB_BLOCK_DATA_LEN(nColsNotAllNull); csize = TSDB_BLOCK_DATA_LEN(nColsNotAllNull);
if (tsdbAllocBuf(&(pReadH->pBuf), bsize) < 0) { if (tsdbAllocBuf(&((void *)pReadH->pBlockData), csize) < 0) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1; return -1;
} }
pBlockData = (SBlockData *)(pReadH->pBuf);
SBlockCol *pBlockCol = pBlockData->cols + (nColsNotAllNull - 1);
memset(pBlockCol, 0, sizeof(*pBlockCol)); pBlockCol = pReadH->pBlockData->cols + nColsNotAllNull - 1;
memset(pBlockCol, 0, sizeof(*pBlockCol));
pBlockCol->colId = pDataCol->colId; pBlockCol->colId = pDataCol->colId;
pBlockCol->type = pDataCol->type; pBlockCol->type = pDataCol->type;
if (tDataTypeDesc[pDataCol->type].getStatisFunc) { if (tDataTypeDesc[pDataCol->type].getStatisFunc) {
(*tDataTypeDesc[pDataCol->type].getStatisFunc)( (*tDataTypeDesc[pDataCol->type].getStatisFunc)((TSKEY *)(pDataCols->cols[0].pData), pDataCol->pData,
(TSKEY *)(pDataCols->cols[0].pData), pDataCol->pData, rowsToWrite, &(pBlockCol->min), &(pBlockCol->max), pDataCols->numOfRows, &(pBlockCol->min), &(pBlockCol->max),
&(pBlockCol->sum), &(pBlockCol->minIndex), &(pBlockCol->maxIndex), &(pBlockCol->numOfNull)); &(pBlockCol->sum), &(pBlockCol->minIndex),
&(pBlockCol->maxIndex), &(pBlockCol->numOfNull));
}
} }
}
csize = bsize;
ASSERT(nColsNotAllNull >= 0 && nColsNotAllNull <= pDataCols->numOfCols);
// Compress the data if neccessary
int tcol = 0;
int32_t toffset = 0;
int32_t keyLen = 0;
for (int ncol = 0; ncol < pDataCols->numOfCols; ncol++) {
if (ncol != 0 && tcol >= nColsNotAllNull) break;
SDataCol * pDataCol = pDataCols->cols + ncol; // compress data if needed
SBlockCol *pBlockCol = pBlockData->cols + tcol; int32_t olen = dataColGetNEleLen(pDataCol, pDataCols->numOfRows);
int32_t blen = olen + COMP_OVERFLOW_BYTES; // allocated buffer length
int32_t clen = 0;
if (ncol != 0 && (pDataCol->colId != pBlockCol->colId)) continue; if (tsdbAllocBuf(&(pReadH->pBuf), coffset + blen + sizeof(TSCKSUM)) < 0) {
void *tptr = POINTER_SHIFT(pBlockData, lsize); terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
int32_t flen = 0; // final length void *pData = POINTER_SHIFT(pReadH->pBuf, coffset);
int32_t tlen = dataColGetNEleLen(pDataCol, rowsToWrite);
if (pCfg->compression) { if (pCfg->compression) {
if (pCfg->compression == TWO_STAGE_COMP) { if (pCfg->compression == TWO_STAGE_COMP) {
pHelper->compBuffer = taosTRealloc(pHelper->compBuffer, tlen + COMP_OVERFLOW_BYTES); if (tsdbAllocBuf(&(pReadH->pCBuf), blen) < 0) {
if (pHelper->compBuffer == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
goto _err; return -1;
} }
} }
flen = (*(tDataTypeDesc[pDataCol->type].compFunc))( clen = (*(tDataTypeDesc[pDataCol->type].compFunc))((char *)pDataCol->pData, olen, pDataCols->numOfRows, pData,
(char *)pDataCol->pData, tlen, rowsToWrite, tptr, (int32_t)taosTSizeof(pHelper->pBuffer) - lsize, blen, pCfg->compression, pReadH->pCBuf, blen);
pCfg->compression, pHelper->compBuffer, (int32_t)taosTSizeof(pHelper->compBuffer));
} else { } else {
flen = tlen; clen = olen;
memcpy(tptr, pDataCol->pData, flen); memcpy(pData, olen);
} }
// Add checksum ASSERT(clen > 0 && clen <= blen);
ASSERT(flen > 0);
flen += sizeof(TSCKSUM); clen += sizeof(TSCKSUM);
taosCalcChecksumAppend(0, (uint8_t *)tptr, flen); taosCalcChecksumAppend(0, (uint8_t *)pData, clen);
pFile->info.magic = pFile->info.magic =
taosCalcChecksum(pFile->info.magic, (uint8_t *)POINTER_SHIFT(tptr, flen - sizeof(TSCKSUM)), sizeof(TSCKSUM)); taosCalcChecksum(pFile->info.magic, (uint8_t *)POINTER_SHIFT(pData, clen - sizeof(TSCKSUM)), sizeof(TSCKSUM));
if (ncol != 0) { if (ncol != 0) {
pBlockCol->offset = toffset; pReadH->pBlockData->cols[ncol].offset = coffset;
pBlockCol->len = flen; pReadH->pBlockData->cols[ncol].len = clen;
tcol++;
} else { } else {
keyLen = flen; keyLen = clen;
} }
toffset += flen; coffset += clen;
lsize += flen;
} }
ASSERT(nColsNotAllNull >= 0 && nColsNotAllNull <= pDataCols->numOfCols);
pReadH->pBlockData->delimiter = TSDB_FILE_DELIMITER;
pReadH->pBlockData->uid = TABLE_UID(pReadH->pTable);
pReadH->pBlockData->numOfCols = nColsNotAllNull;
pBlockData->delimiter = TSDB_FILE_DELIMITER; taosCalcChecksumAppend(0, (uint8_t *)pReadH->pBlockData, csize);
pBlockData->uid = pHelper->tableInfo.uid; pFile->info.magic = taosCalcChecksum(
pBlockData->numOfCols = nColsNotAllNull; pFile->info.magic, (uint8_t *)POINTER_SHIFT(pReadH->pBlockData, csize - sizeof(TSCKSUM)), sizeof(TSCKSUM));
taosCalcChecksumAppend(0, (uint8_t *)pBlockData, tsize); if (taosTWrite(pFile->fd, (void *)pReadH->pBlockData, csize) < csize) {
pFile->info.magic = taosCalcChecksum(pFile->info.magic, (uint8_t *)POINTER_SHIFT(pBlockData, tsize - sizeof(TSCKSUM)), tsdbError("vgId:%d failed to write %d bytes to file %s since %s", REPO_ID(pRepo), csize, pFile->fname, strerror(errno));
sizeof(TSCKSUM)); terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
// Write the whole block to file if (taosTWrite(pFile->fd, pReadH->pBuf, coffset) < coffset) {
if (taosTWrite(pFile->fd, (void *)pBlockData, lsize) < lsize) { tsdbError("vgId:%d failed to write %d bytes to file %s since %s", REPO_ID(pRepo), coffset, pFile->fname,
tsdbError("vgId:%d failed to write %d bytes to file %s since %s", REPO_ID(helperRepo(pHelper)), lsize, pFile->fname,
strerror(errno)); strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
return -1; return -1;
...@@ -1158,21 +1280,20 @@ static int tsdbWriteBlockToFile(STSCommitHandle *pTSCh, SFile *pFile, SDataCols ...@@ -1158,21 +1280,20 @@ static int tsdbWriteBlockToFile(STSCommitHandle *pTSCh, SFile *pFile, SDataCols
pBlock->last = isLast; pBlock->last = isLast;
pBlock->offset = offset; pBlock->offset = offset;
pBlock->algorithm = pCfg->compression; pBlock->algorithm = pCfg->compression;
pBlock->numOfRows = rowsToWrite; pBlock->numOfRows = pDataCols->numOfRows;
pBlock->len = lsize; pBlock->len = coffset+csize;
pBlock->keyLen = keyLen; pBlock->keyLen = keyLen;
pBlock->numOfSubBlocks = isSuperBlock ? 1 : 0; pBlock->numOfSubBlocks = isSuperBlock ? 1 : 0;
pBlock->numOfCols = nColsNotAllNull; pBlock->numOfCols = nColsNotAllNull;
pBlock->keyFirst = dataColsKeyFirst(pDataCols); pBlock->keyFirst = dataColsKeyFirst(pDataCols);
pBlock->keyLast = dataColsKeyAt(pDataCols, rowsToWrite - 1); pBlock->keyLast = dataColsKeyLast(pDataCols);
pFile->info.size += pBlock->len;
tsdbDebug("vgId:%d tid:%d a block of data is written to file %s, offset %" PRId64 tsdbDebug("vgId:%d tid:%d a block of data is written to file %s, offset %" PRId64
" numOfRows %d len %d numOfCols %" PRId16 " keyFirst %" PRId64 " keyLast %" PRId64, " numOfRows %d len %d numOfCols %" PRId16 " keyFirst %" PRId64 " keyLast %" PRId64,
REPO_ID(helperRepo(pHelper)), pHelper->tableInfo.tid, pFile->fname, (int64_t)(pBlock->offset), REPO_ID(pRepo), TABLE_TID(pReadH->pTable), pFile->fname, (int64_t)(pBlock->offset),
(int)(pBlock->numOfRows), pBlock->len, pBlock->numOfCols, pBlock->keyFirst, (int)(pBlock->numOfRows), pBlock->len, pBlock->numOfCols, pBlock->keyFirst, pBlock->keyLast);
pBlock->keyLast);
pFile->info.size += pBlock->len;
return 0; return 0;
} }
...@@ -1319,5 +1440,103 @@ static int tsdbCommitMergeDataBlock(STSCommitHandle *pTSCh, SBlock *pBlock) { ...@@ -1319,5 +1440,103 @@ static int tsdbCommitMergeDataBlock(STSCommitHandle *pTSCh, SBlock *pBlock) {
} }
} }
} }
return 0;
}
static int tsdbEncodeBlockIdxArray(STSCommitHandle *pTSCh) {
SReadHandle *pReadH = pTSCh->pReadH;
int len = 0;
for (int i = 0; i < pTSCh->nBlockIdx; i++) {
int tlen = tsdbEncodeBlockIdx(NULL, pTSCh->pBlockIdx + i);
if (tsdbAllocBuf(&(pReadH->pBuf), tlen + len) < 0) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
void *ptr = POINTER_SHIFT(pReadH->pBuf, len);
tsdbEncodeBlockIdx(&ptr, pTSCh->pBlockIdx + i);
len += tlen;
}
return len;
}
static int tsdbUpdateFileGroupInfo(SFileGroup *pFileGroup) {
for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) {
SFile *pFile = TSDB_FILE_IN_FGROUP(pFileGroup, type);
if (tsdbUpdateFileHeader(pFile) < 0) return -1;
}
return 0;
}
static int tsdbAppendBlockIdx(STSCommitHandle *pTSCh) {
if (tsdbAllocBuf(&((void *)(pTSCh->pBlockIdx)), sizeof(SBlockIdx) * (pTSCh->nBlockIdx + 1)) < 0) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
pTSCh->pBlockIdx[pTSCh->nBlockIdx++] = pTSCh->newBlockIdx;
return 0;
}
static int tsdbCopyBlock(STSCommitHandle *pTSCh, int bidx) {
SReadHandle *pReadH = pTSCh->pReadH;
SBlock *pBlock = pReadH->pBlockInfo->blocks + bidx;
SFile * pWFile = NULL;
SFile * pRFile = NULL;
SBlock newBlock = {0};
if (pBlock->last) {
pWFile = TSDB_FILE_IN_FGROUP(pTSCh->pFGroup, TSDB_FILE_TYPE_LAST);
pRFile = TSDB_FILE_IN_FGROUP(&(pReadH->fGroup), TSDB_FILE_TYPE_LAST);
} else {
pWFile = TSDB_FILE_IN_FGROUP(pTSCh->pFGroup, TSDB_FILE_TYPE_DATA);
pRFile = TSDB_FILE_IN_FGROUP(&(pReadH->fGroup), TSDB_FILE_TYPE_DATA);
}
// TODO: use flag to omit this string compare. this may cause a lot of time
if (strncmp(pWFile->fname, pRFile->fname, TSDB_FILENAME_LEN) == 0) {
if (pBlock->numOfSubBlocks == 1) {
} else { // need to copy both super block and sub-blocks
}
} else {
if (tsdbLoadBlockData(pReadH, pBlock, NULL) < 0) return -1;
if (tsdbWriteBlockToFile(pTSCh, pWFile, pReadH->pDataCols[0], &newBlock, pBlock->last, true) < 0) return -1;
// TODO: add a super block
}
return 0;
}
static int compareKeyBlock(const void *arg1, const void *arg2) {
TSKEY key = *(TSKEY *)arg1;
SBlock *pBlock = (SBlock *)arg2;
if (key < pBlock->keyFirst) {
return -1;
} else if (key > pBlock->keyLast) {
return 1;
}
return 0;
}
static int tsdbAddSuperBlock(STSCommitHandle *pTSCh, SBlock *pBlock) {
int tsize = sizeof(SBlockInfo) + sizeof(SBlock) * (pTSCh->nBlocks + 1) + sizeof(TSCKSUM);
if (tsdbAllocBuf(&((void *)pTSCh->pBlockInfo), tsize) < 0) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
ASSERT(pTSCh->nBlocks == 0 || pBlock->keyFirst > pTSCh->pBlockInfo->blocks[pTSCh->nBlocks - 1].keyLast);
pTSCh->pBlockInfo->blocks[pTSCh->nBlocks++] = *pBlock;
return 0; return 0;
} }
\ No newline at end of file
...@@ -95,8 +95,6 @@ int tsdbSetAndOpenReadFGroup(SReadHandle *pReadH, SFileGroup *pFGroup) { ...@@ -95,8 +95,6 @@ int tsdbSetAndOpenReadFGroup(SReadHandle *pReadH, SFileGroup *pFGroup) {
} }
} }
tsdbGetFidKeyRange(pCfg->daysPerFile, pCfg->precision, pFGroup->fileId, &(pReadH->minKey), &(pReadH->maxKey));
return 0; return 0;
} }
......
...@@ -111,21 +111,4 @@ void tsdbResetFGroupFd(SFileGroup *pFGroup) { ...@@ -111,21 +111,4 @@ void tsdbResetFGroupFd(SFileGroup *pFGroup) {
for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) { for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) {
TSDB_FILE_IN_FGROUP(pFGroup, type)->fd = -1; TSDB_FILE_IN_FGROUP(pFGroup, type)->fd = -1;
} }
} }
\ No newline at end of file
int tsdbAllocBuf(void **ppBuf, uint32_t size) {
ASSERT(size > 0);
void *pBuf = *pBuf;
uint32_t tsize = taosTSizeof(pBuf);
if (tsize >= size) return 0;
if (tsize == 0) tsize = 1024;
while (tsize < size) {
tsize *= 2;
}
*ppBuf = taosTRealloc(pBuf, tsize);
if (*ppBuf == NULL) return -1;
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册