提交 496cbcc6 编写于 作者: H Hongze Cheng

partial work

上级 779b12b2
......@@ -26,6 +26,7 @@
#include "tsdb.h"
#include "tskiplist.h"
#include "tutil.h"
#include "tchecksum.h"
#include "tfs.h"
#ifdef __cplusplus
......@@ -355,9 +356,11 @@ int tsdbOpenDFile(SDFile* pDFile, int flags);
void tsdbCloseDFile(SDFile* pDFile);
int64_t tsdbSeekDFile(SDFile* pDFile, int64_t offset, int whence);
int64_t tsdbWriteDFile(SDFile* pDFile, void* buf, int64_t nbyte);
int64_t tsdbAppendDFile(SDFile* pDFile, void* buf, int64_t nbyte, int64_t* offset);
int64_t tsdbTellDFile(SDFile* pDFile);
int tsdbEncodeDFile(void** buf, SDFile* pDFile);
void* tsdbDecodeDFile(void* buf, SDFile* pDFile);
void tsdbUpdateDFileMagic(SDFile* pDFile, void* pCksm);
typedef struct {
int fid;
......
......@@ -93,11 +93,12 @@ struct SReadH {
#define TSDB_READ_REPO(rh) ((rh)->pRepo)
#define TSDB_READ_REPO_ID(rh) REPO_ID(TSDB_READ_REPO(rh))
#define TSDB_READ_FSET(rh) &((rh)->rSet)
#define TSDB_READ_TABLE(ch) ((rh)->pTable)
#define TSDB_READ_HEAD_FILE(rh) TSDB_DFILE_IN_SET(TSDB_READ_FSET(rh), TSDB_FILE_HEAD)
#define TSDB_READ_DATA_FILE(rh) TSDB_DFILE_IN_SET(TSDB_READ_FSET(rh), TSDB_FILE_DATA)
#define TSDB_READ_LAST_FILE(rh) TSDB_DFILE_IN_SET(TSDB_READ_FSET(rh), TSDB_FILE_LAST)
#define TSDB_READ_BUF(rh) (rh)->pBuf
#define TSDB_READ_COMP_BUF(rh) (rh)->pCBuf
#define TSDB_READ_BUF(rh) ((rh)->pBuf)
#define TSDB_READ_COMP_BUF(rh) ((rh)->pCBuf)
#define TSDB_BLOCK_STATIS_SIZE(ncols) (sizeof(SBlockData) + sizeof(SBlockCol) * (ncols) + sizeof(TSCKSUM))
......
......@@ -38,10 +38,14 @@ typedef struct {
} SCommitH;
#define TSDB_COMMIT_REPO(ch) TSDB_READ_REPO(&(ch->readh))
#define TSDB_COMMIT_REPO_ID(ch) REPO_ID(TSDB_READ_REPO(&(ch->readh)))
#define TSDB_COMMIT_WRITE_FSET(ch) ((ch)->pWSet)
#define TSDB_COMMIT_TABLE(ch) TSDB_READ_TABLE(&(ch->readh))
#define TSDB_COMMIT_HEAD_FILE(ch) TSDB_DFILE_IN_SET(TSDB_COMMIT_WRITE_FSET(ch), TSDB_FILE_HEAD)
#define TSDB_COMMIT_DATA_FILE(ch) TSDB_DFILE_IN_SET(TSDB_COMMIT_WRITE_FSET(ch), TSDB_FILE_DATA)
#define TSDB_COMMIT_LAST_FILE(ch) TSDB_DFILE_IN_SET(TSDB_COMMIT_WRITE_FSET(ch), TSDB_FILE_LAST)
#define TSDB_COMMIT_BUF(ch) TSDB_READ_BUF(&(ch->readh))
#define TSDB_COMMIT_COMP_BUF(ch) TSDB_READ_COMP_BUF(&(ch->readh))
void *tsdbCommitData(STsdbRepo *pRepo) {
if (tsdbStartCommit(pRepo) < 0) {
......@@ -659,7 +663,114 @@ _err:
static int tsdbWriteBlockInfo(SCommitH *pCommih) {
SDFile *pHeadf = TSDB_COMMIT_HEAD_FILE(pCommih);
// TODO
SBlockIdx blkIdx;
STable * pTable = TSDB_COMMIT_TABLE(pCommih);
SBlock * pBlock;
size_t nSupBlocks;
size_t nSubBlocks;
uint32_t tlen;
SBlockInfo *pBlkInfo;
int64_t offset;
nSupBlocks = taosArrayGetSize(pCommih->aSupBlk);
nSubBlocks = taosArrayGetSize(pCommih->aSubBlk);
tlen = sizeof(SBlockInfo) + sizeof(SBlock) * (nSupBlocks + nSubBlocks) + sizeof(TSCKSUM);
ASSERT(nSupBlocks > 0);
// Write SBlockInfo part
if (tsdbMakeRoom((void **)(&(TSDB_COMMIT_BUF(pCommih))), tlen) < 0) return -1;
pBlkInfo = TSDB_COMMIT_BUF(pCommih);
pBlkInfo->delimiter = TSDB_FILE_DELIMITER;
pBlkInfo->tid = TABLE_TID(pTable);
pBlkInfo->uid = TABLE_UID(pTable);
memcpy((void *)(pBlkInfo->blocks), taosArrayGet(pCommih->aSupBlk, 0), nSupBlocks * sizeof(SBlock));
if (nSubBlocks > 0) {
memcpy((void *)(pBlkInfo->blocks + nSupBlocks), taosArrayGet(pCommih->aSubBlk, 0), nSubBlocks * sizeof(SBlock));
for (int i = 0; i < nSupBlocks; i++) {
pBlock = pBlkInfo->blocks + i;
if (pBlock->numOfSubBlocks > 1) {
pBlock->offset += (sizeof(SBlockInfo) + sizeof(SBlock) * nSupBlocks);
}
}
}
taosCalcChecksumAppend(0, (uint8_t *)pBlkInfo, tlen);
offset = tsdbSeekDFile(pHeadf, 0, SEEK_END);
if (offset < 0) {
tsdbError("vgId:%d failed to write block info part to file %s while seek since %s", TSDB_COMMIT_REPO_ID(pCommih),
TSDB_FILE_FULL_NAME(pHeadf), tstrerror(terrno));
return -1;
}
if (tsdbWriteDFile(pHeadf, TSDB_COMMIT_BUF(pCommih), tlen) < tlen) {
tsdbError("vgId:%d failed to write block info part to file %s since %s", TSDB_COMMIT_REPO_ID(pCommih),
TSDB_FILE_FULL_NAME(pHeadf), tstrerror(terrno));
return -1;
}
tsdbUpdateDFileMagic(pHeadf, POINTER_SHIFT(pBlkInfo, tlen - sizeof(TSCKSUM)));
// Set blkIdx
pBlock = taosArrayGet(pCommih->aSupBlk, nSupBlocks - 1);
blkIdx.tid = TABLE_TID(pTable);
blkIdx.uid = TABLE_UID(pTable);
blkIdx.hasLast = pBlock->last ? 1 : 0;
blkIdx.maxKey = pBlock->keyLast;
blkIdx.numOfBlocks = nSupBlocks;
blkIdx.len = tlen;
blkIdx.offset = (uint32_t)offset;
ASSERT(blkIdx.numOfBlocks > 0);
if (taosArrayPush(pCommih->aBlkIdx, (void *)(&blkIdx)) == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
return 0;
}
static int tsdbWriteBlockIdx(SCommitH *pCommih) {
SBlockIdx *pBlkIdx;
SDFile * pHeadf = TSDB_COMMIT_HEAD_FILE(pCommih);
size_t nidx = taosArrayGetSize(pCommih->aBlkIdx);
int tlen = 0, size;
int64_t offset;
ASSERT(nidx > 0);
for (size_t i = 0; i < nidx; i++) {
pBlkIdx = (SBlockIdx *)taosArrayGet(pCommih->aBlkIdx, i);
size = tsdbEncodeSBlockIdx(NULL, pBlkIdx);
if (tsdbMakeRoom((void **)(&TSDB_COMMIT_BUF(pCommih)), tlen + size) < 0) return -1;
void *ptr = POINTER_SHIFT(TSDB_COMMIT_BUF(pCommih), tlen);
tsdbEncodeSBlockIdx(&ptr, pBlkIdx);
tlen += size;
}
tlen += sizeof(TSCKSUM);
if (tsdbMakeRoom((void **)(&TSDB_COMMIT_BUF(pCommih)), tlen) < 0) return -1;
taosCalcChecksumAppend(0, (uint8_t *)TSDB_COMMIT_BUF(pCommih), tlen);
if (tsdbAppendDFile(pHeadf, TSDB_COMMIT_BUF(pCommih), tlen, &offset) < tlen) {
tsdbError("vgId:%d failed to write block index part to file %s since %s", TSDB_COMMIT_REPO_ID(pCommih),
TSDB_FILE_FULL_NAME(pHeadf), tstrerror(terrno));
return -1;
}
tsdbUpdateDFileMagic(pHeadf, POINTER_SHIFT(TSDB_COMMIT_BUF(pCommih), tlen - sizeof(TSCKSUM)));
pHeadf->info.offset = offset;
pHeadf->info.len = tlen;
return 0;
}
\ No newline at end of file
......@@ -173,6 +173,19 @@ int64_t tsdbWriteDFile(SDFile *pDFile, void *buf, int64_t nbyte) {
return -1;
}
return nwrite;
}
int64_t tsdbAppendDFile(SDFile *pDFile, void *buf, int64_t nbyte, int64_t *offset) {
ASSERT(TSDB_FILE_OPENED(pDFile));
int64_t nwrite;
*offset = tsdbSeekDFile(pDFile, 0, SEEK_SET);
if (*offset < 0) return -1;
nwrite = tsdbWriteDFile(pDFile, buf, nbyte);
if (nwrite < 0) return nwrite;
pDFile->info.size += nbyte;
return nwrite;
}
......@@ -207,6 +220,10 @@ void *tsdbDecodeDFile(void *buf, SDFile *pDFile) {
return buf;
}
void tsdbUpdateDFileMagic(SDFile *pDFile, void *pCksm) {
pDFile->info.magic = taosCalcChecksum(pDFile->info.magic, (uint8_t *)(pCksm), sizeof(TSCKSUM));
}
static int tsdbEncodeDFInfo(void **buf, SDFInfo *pInfo) {
int tlen = 0;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册