提交 d630c7d0 编写于 作者: H Hongze Cheng

more work

上级 8c5a3a95
...@@ -93,7 +93,7 @@ struct SReadH { ...@@ -93,7 +93,7 @@ struct SReadH {
#define TSDB_READ_REPO(rh) ((rh)->pRepo) #define TSDB_READ_REPO(rh) ((rh)->pRepo)
#define TSDB_READ_REPO_ID(rh) REPO_ID(TSDB_READ_REPO(rh)) #define TSDB_READ_REPO_ID(rh) REPO_ID(TSDB_READ_REPO(rh))
#define TSDB_READ_FSET(rh) (&((rh)->rSet)) #define TSDB_READ_FSET(rh) (&((rh)->rSet))
#define TSDB_READ_TABLE(ch) ((rh)->pTable) #define TSDB_READ_TABLE(rh) ((rh)->pTable)
#define TSDB_READ_HEAD_FILE(rh) TSDB_DFILE_IN_SET(TSDB_READ_FSET(rh), TSDB_FILE_HEAD) #define TSDB_READ_HEAD_FILE(rh) TSDB_DFILE_IN_SET(TSDB_READ_FSET(rh), TSDB_FILE_HEAD)
#define TSDB_READ_DATA_FILE(rh) TSDB_DFILE_IN_SET(TSDB_READ_FSET(rh), TSDB_FILE_DATA) #define TSDB_READ_DATA_FILE(rh) TSDB_DFILE_IN_SET(TSDB_READ_FSET(rh), TSDB_FILE_DATA)
#define TSDB_READ_LAST_FILE(rh) TSDB_DFILE_IN_SET(TSDB_READ_FSET(rh), TSDB_FILE_LAST) #define TSDB_READ_LAST_FILE(rh) TSDB_DFILE_IN_SET(TSDB_READ_FSET(rh), TSDB_FILE_LAST)
......
...@@ -706,42 +706,48 @@ static int tsdbComparKeyBlock(const void *arg1, const void *arg2) { ...@@ -706,42 +706,48 @@ static int tsdbComparKeyBlock(const void *arg1, const void *arg2) {
} }
} }
static int tsdbWriteBlock(SCommitH *pCommih, SDFile *pDFile, SDataCols *pDataCols, SBlock *pBlock, bool isLast, static int tsdbWriteBlock(SCommitH *pCommith, SDFile *pDFile, SDataCols *pDataCols, SBlock *pBlock, bool isLast,
bool isSuper) { bool isSuper) {
// TODO STsdbRepo * pRepo = TSDB_COMMIT_REPO(pCommith);
STsdbCfg * pCfg = &(pHelper->pRepo->config); STsdbCfg * pCfg = REPO_CFG(pRepo);
SBlockData *pCompData = (SBlockData *)(pHelper->pBuffer); SBlockData *pBlockData;
int64_t offset = 0; int64_t offset = 0;
STable * pTable = TSDB_COMMIT_TABLE(pCommith);
int rowsToWrite = pDataCols->numOfRows; int rowsToWrite = pDataCols->numOfRows;
ASSERT(rowsToWrite > 0 && rowsToWrite <= pCfg->maxRowsPerFileBlock); ASSERT(rowsToWrite > 0 && rowsToWrite <= pCfg->maxRowsPerFileBlock);
ASSERT(isLast ? rowsToWrite < pCfg->minRowsPerFileBlock : true); ASSERT((!isLast) || rowsToWrite < pCfg->minRowsPerFileBlock);
offset = lseek(pFile->fd, 0, SEEK_END); // Seek file
offset = tsdbSeekDFile(pDFile, 0, SEEK_END);
if (offset < 0) { if (offset < 0) {
tsdbError("vgId:%d failed to write block to file %s since %s", REPO_ID(pHelper->pRepo), TSDB_FILE_NAME(pFile), return -1;
strerror(errno)); }
terrno = TAOS_SYSTEM_ERROR(errno);
goto _err; // Make buffer space
if (tsdbMakeRoom((void **)(&TSDB_COMMIT_BUF(pCommith)), TSDB_BLOCK_STATIS_SIZE(pDataCols->numOfCols)) < 0) {
return -1;
} }
pBlockData = (SBlockData *)TSDB_COMMIT_BUF(pCommith);
// Get # of cols not all NULL(not including key column)
int nColsNotAllNull = 0; int nColsNotAllNull = 0;
for (int ncol = 1; ncol < pDataCols->numOfCols; ncol++) { // ncol from 1, we skip the timestamp column for (int ncol = 1; ncol < pDataCols->numOfCols; ncol++) { // ncol from 1, we skip the timestamp column
SDataCol * pDataCol = pDataCols->cols + ncol; SDataCol * pDataCol = pDataCols->cols + ncol;
SBlockCol *pCompCol = pCompData->cols + nColsNotAllNull; SBlockCol *pBlockCol = pBlockData->cols + nColsNotAllNull;
if (isNEleNull(pDataCol, rowsToWrite)) { // all data to commit are NULL, just ignore it if (isNEleNull(pDataCol, rowsToWrite)) { // all data to commit are NULL, just ignore it
continue; continue;
} }
memset(pCompCol, 0, sizeof(*pCompCol)); memset(pBlockCol, 0, sizeof(*pBlockCol));
pCompCol->colId = pDataCol->colId; pBlockCol->colId = pDataCol->colId;
pCompCol->type = pDataCol->type; pBlockCol->type = pDataCol->type;
if (tDataTypeDesc[pDataCol->type].getStatisFunc) { if (tDataTypeDesc[pDataCol->type].getStatisFunc) {
(*tDataTypeDesc[pDataCol->type].getStatisFunc)( (*tDataTypeDesc[pDataCol->type].getStatisFunc)(
(TSKEY *)(pDataCols->cols[0].pData), pDataCol->pData, rowsToWrite, &(pCompCol->min), &(pCompCol->max), (TSKEY *)(pDataCols->cols[0].pData), pDataCol->pData, rowsToWrite, &(pBlockCol->min), &(pBlockCol->max),
&(pCompCol->sum), &(pCompCol->minIndex), &(pCompCol->maxIndex), &(pCompCol->numOfNull)); &(pBlockCol->sum), &(pBlockCol->minIndex), &(pBlockCol->maxIndex), &(pBlockCol->numOfNull));
} }
nColsNotAllNull++; nColsNotAllNull++;
} }
...@@ -749,35 +755,41 @@ static int tsdbWriteBlock(SCommitH *pCommih, SDFile *pDFile, SDataCols *pDataCol ...@@ -749,35 +755,41 @@ static int tsdbWriteBlock(SCommitH *pCommih, SDFile *pDFile, SDataCols *pDataCol
ASSERT(nColsNotAllNull >= 0 && nColsNotAllNull <= pDataCols->numOfCols); ASSERT(nColsNotAllNull >= 0 && nColsNotAllNull <= pDataCols->numOfCols);
// Compress the data if neccessary // Compress the data if neccessary
int tcol = 0; int tcol = 0; // counter of not all NULL and written columns
int32_t toffset = 0; int32_t toffset = 0;
int32_t tsize = TSDB_GET_COMPCOL_LEN(nColsNotAllNull); int32_t tsize = TSDB_BLOCK_STATIS_SIZE(nColsNotAllNull);
int32_t lsize = tsize; int32_t lsize = tsize;
int32_t keyLen = 0; int32_t keyLen = 0;
for (int ncol = 0; ncol < pDataCols->numOfCols; ncol++) { for (int ncol = 0; ncol < pDataCols->numOfCols; ncol++) {
// All not NULL columns finish
if (ncol != 0 && tcol >= nColsNotAllNull) break; if (ncol != 0 && tcol >= nColsNotAllNull) break;
SDataCol * pDataCol = pDataCols->cols + ncol; SDataCol * pDataCol = pDataCols->cols + ncol;
SBlockCol *pCompCol = pCompData->cols + tcol; SBlockCol *pBlockCol = pBlockData->cols + tcol;
if (ncol != 0 && (pDataCol->colId != pCompCol->colId)) continue; if (ncol != 0 && (pDataCol->colId != pBlockCol->colId)) continue;
void *tptr = POINTER_SHIFT(pCompData, lsize);
int32_t flen = 0; // final length int32_t flen; // final length
int32_t tlen = dataColGetNEleLen(pDataCol, rowsToWrite); int32_t tlen = dataColGetNEleLen(pDataCol, rowsToWrite);
void * tptr;
if (pCfg->compression) { // Make room
if (pCfg->compression == TWO_STAGE_COMP) { if (tsdbMakeRoom((void **)TSDB_COMMIT_BUF(pCommith), lsize + tlen + COMP_OVERFLOW_BYTES + sizeof(TSCKSUM)) < 0) {
pHelper->compBuffer = taosTRealloc(pHelper->compBuffer, tlen + COMP_OVERFLOW_BYTES); return -1;
if (pHelper->compBuffer == NULL) { }
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; pBlockData = (SBlockData *)TSDB_COMMIT_BUF(pCommith);
goto _err; tptr = POINTER_SHIFT(pBlockData, lsize);
}
}
flen = (*(tDataTypeDesc[pDataCol->type].compFunc))( if (pCfg->compression == TWO_STAGE_COMP &&
(char *)pDataCol->pData, tlen, rowsToWrite, tptr, (int32_t)taosTSizeof(pHelper->pBuffer) - lsize, tsdbMakeRoom((void **)TSDB_COMMIT_COMP_BUF(pCommith), tlen + COMP_OVERFLOW_BYTES) < 0) {
pCfg->compression, pHelper->compBuffer, (int32_t)taosTSizeof(pHelper->compBuffer)); return -1;
}
// Compress or just copy
if (pCfg->compression) {
flen = (*(tDataTypeDesc[pDataCol->type].compFunc))((char *)pDataCol->pData, tlen, rowsToWrite, tptr,
tlen + COMP_OVERFLOW_BYTES, pCfg->compression,
TSDB_COMMIT_COMP_BUF(pCommith), tlen + COMP_OVERFLOW_BYTES);
} else { } else {
flen = tlen; flen = tlen;
memcpy(tptr, pDataCol->pData, flen); memcpy(tptr, pDataCol->pData, flen);
...@@ -787,12 +799,11 @@ static int tsdbWriteBlock(SCommitH *pCommih, SDFile *pDFile, SDataCols *pDataCol ...@@ -787,12 +799,11 @@ static int tsdbWriteBlock(SCommitH *pCommih, SDFile *pDFile, SDataCols *pDataCol
ASSERT(flen > 0); ASSERT(flen > 0);
flen += sizeof(TSCKSUM); flen += sizeof(TSCKSUM);
taosCalcChecksumAppend(0, (uint8_t *)tptr, flen); taosCalcChecksumAppend(0, (uint8_t *)tptr, flen);
pFile->info.magic = tsdbUpdateDFileMagic(pDFile, POINTER_SHIFT(tptr, flen - sizeof(TSCKSUM)));
taosCalcChecksum(pFile->info.magic, (uint8_t *)POINTER_SHIFT(tptr, flen - sizeof(TSCKSUM)), sizeof(TSCKSUM));
if (ncol != 0) { if (ncol != 0) {
pCompCol->offset = toffset; pBlockCol->offset = toffset;
pCompCol->len = flen; pBlockCol->len = flen;
tcol++; tcol++;
} else { } else {
keyLen = flen; keyLen = flen;
...@@ -802,20 +813,16 @@ static int tsdbWriteBlock(SCommitH *pCommih, SDFile *pDFile, SDataCols *pDataCol ...@@ -802,20 +813,16 @@ static int tsdbWriteBlock(SCommitH *pCommih, SDFile *pDFile, SDataCols *pDataCol
lsize += flen; lsize += flen;
} }
pCompData->delimiter = TSDB_FILE_DELIMITER; pBlockData->delimiter = TSDB_FILE_DELIMITER;
pCompData->uid = pHelper->tableInfo.uid; pBlockData->uid = TABLE_UID(pTable);
pCompData->numOfCols = nColsNotAllNull; pBlockData->numOfCols = nColsNotAllNull;
taosCalcChecksumAppend(0, (uint8_t *)pCompData, tsize); taosCalcChecksumAppend(0, (uint8_t *)pBlockData, tsize);
pFile->info.magic = taosCalcChecksum(pFile->info.magic, (uint8_t *)POINTER_SHIFT(pCompData, tsize - sizeof(TSCKSUM)), tsdbUpdateDFileMagic(pDFile, POINTER_SHIFT(pBlockData, tsize - sizeof(TSCKSUM)));
sizeof(TSCKSUM));
// Write the whole block to file // Write the whole block to file
if (taosWrite(pFile->fd, (void *)pCompData, lsize) < lsize) { if (tsdbWriteDFile(pDFile, (void *)pBlockData, lsize < lsize)) {
tsdbError("vgId:%d failed to write %d bytes to file %s since %s", REPO_ID(helperRepo(pHelper)), lsize, return -1;
TSDB_FILE_NAME(pFile), strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
goto _err;
} }
// Update pBlock membership vairables // Update pBlock membership vairables
...@@ -828,20 +835,16 @@ static int tsdbWriteBlock(SCommitH *pCommih, SDFile *pDFile, SDataCols *pDataCol ...@@ -828,20 +835,16 @@ static int tsdbWriteBlock(SCommitH *pCommih, SDFile *pDFile, SDataCols *pDataCol
pBlock->numOfSubBlocks = isSuper ? 1 : 0; pBlock->numOfSubBlocks = isSuper ? 1 : 0;
pBlock->numOfCols = nColsNotAllNull; pBlock->numOfCols = nColsNotAllNull;
pBlock->keyFirst = dataColsKeyFirst(pDataCols); pBlock->keyFirst = dataColsKeyFirst(pDataCols);
pBlock->keyLast = dataColsKeyAt(pDataCols, rowsToWrite - 1); pBlock->keyLast = dataColsKeyLast(pDataCols);
pDFile->info.size += pBlock->len;
tsdbDebug("vgId:%d tid:%d a block of data is written to file %s, offset %" PRId64 tsdbDebug("vgId:%d tid:%d a block of data is written to file %s, offset %" PRId64
" numOfRows %d len %d numOfCols %" PRId16 " keyFirst %" PRId64 " keyLast %" PRId64, " numOfRows %d len %d numOfCols %" PRId16 " keyFirst %" PRId64 " keyLast %" PRId64,
REPO_ID(helperRepo(pHelper)), pHelper->tableInfo.tid, TSDB_FILE_NAME(pFile), (int64_t)(pBlock->offset), REPO_ID(pRepo), TABLE_TID(pTable), TSDB_FILE_FULL_NAME(pDFile), offset, rowsToWrite, pBlock->len,
(int)(pBlock->numOfRows), pBlock->len, pBlock->numOfCols, pBlock->keyFirst, pBlock->keyLast); pBlock->numOfCols, pBlock->keyFirst, pBlock->keyLast);
pFile->info.size += pBlock->len;
// ASSERT(pFile->info.size == lseek(pFile->fd, 0, SEEK_CUR));
return 0; return 0;
_err:
return -1;
} }
static int tsdbWriteBlockInfo(SCommitH *pCommih) { static int tsdbWriteBlockInfo(SCommitH *pCommih) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册