From d630c7d0d8a0d6a42b63212bf6d4bd2aa4f9adc5 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Mon, 11 Jan 2021 22:58:05 +0800 Subject: [PATCH] more work --- src/tsdb/inc/tsdbReadImpl.h | 2 +- src/tsdb/src/tsdbCommit.c | 117 ++++++++++++++++++------------------ 2 files changed, 61 insertions(+), 58 deletions(-) diff --git a/src/tsdb/inc/tsdbReadImpl.h b/src/tsdb/inc/tsdbReadImpl.h index 99dcef3ffc..63fdb1864a 100644 --- a/src/tsdb/inc/tsdbReadImpl.h +++ b/src/tsdb/inc/tsdbReadImpl.h @@ -93,7 +93,7 @@ struct SReadH { #define TSDB_READ_REPO(rh) ((rh)->pRepo) #define TSDB_READ_REPO_ID(rh) REPO_ID(TSDB_READ_REPO(rh)) #define TSDB_READ_FSET(rh) (&((rh)->rSet)) -#define TSDB_READ_TABLE(ch) ((rh)->pTable) +#define TSDB_READ_TABLE(rh) ((rh)->pTable) #define TSDB_READ_HEAD_FILE(rh) TSDB_DFILE_IN_SET(TSDB_READ_FSET(rh), TSDB_FILE_HEAD) #define TSDB_READ_DATA_FILE(rh) TSDB_DFILE_IN_SET(TSDB_READ_FSET(rh), TSDB_FILE_DATA) #define TSDB_READ_LAST_FILE(rh) TSDB_DFILE_IN_SET(TSDB_READ_FSET(rh), TSDB_FILE_LAST) diff --git a/src/tsdb/src/tsdbCommit.c b/src/tsdb/src/tsdbCommit.c index c894cec442..5dd3dc70bc 100644 --- a/src/tsdb/src/tsdbCommit.c +++ b/src/tsdb/src/tsdbCommit.c @@ -706,42 +706,48 @@ static int tsdbComparKeyBlock(const void *arg1, const void *arg2) { } } -static int tsdbWriteBlock(SCommitH *pCommih, SDFile *pDFile, SDataCols *pDataCols, SBlock *pBlock, bool isLast, +static int tsdbWriteBlock(SCommitH *pCommith, SDFile *pDFile, SDataCols *pDataCols, SBlock *pBlock, bool isLast, bool isSuper) { - // TODO - STsdbCfg * pCfg = &(pHelper->pRepo->config); - SBlockData *pCompData = (SBlockData *)(pHelper->pBuffer); + STsdbRepo * pRepo = TSDB_COMMIT_REPO(pCommith); + STsdbCfg * pCfg = REPO_CFG(pRepo); + SBlockData *pBlockData; int64_t offset = 0; + STable * pTable = TSDB_COMMIT_TABLE(pCommith); int rowsToWrite = pDataCols->numOfRows; ASSERT(rowsToWrite > 0 && rowsToWrite <= pCfg->maxRowsPerFileBlock); - ASSERT(isLast ? rowsToWrite < pCfg->minRowsPerFileBlock : true); + ASSERT((!isLast) || rowsToWrite < pCfg->minRowsPerFileBlock); - offset = lseek(pFile->fd, 0, SEEK_END); + // Seek file + offset = tsdbSeekDFile(pDFile, 0, SEEK_END); if (offset < 0) { - tsdbError("vgId:%d failed to write block to file %s since %s", REPO_ID(pHelper->pRepo), TSDB_FILE_NAME(pFile), - strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); - goto _err; + return -1; + } + + // Make buffer space + if (tsdbMakeRoom((void **)(&TSDB_COMMIT_BUF(pCommith)), TSDB_BLOCK_STATIS_SIZE(pDataCols->numOfCols)) < 0) { + return -1; } + pBlockData = (SBlockData *)TSDB_COMMIT_BUF(pCommith); + // Get # of cols not all NULL(not including key column) int nColsNotAllNull = 0; for (int ncol = 1; ncol < pDataCols->numOfCols; ncol++) { // ncol from 1, we skip the timestamp column SDataCol * pDataCol = pDataCols->cols + ncol; - SBlockCol *pCompCol = pCompData->cols + nColsNotAllNull; + SBlockCol *pBlockCol = pBlockData->cols + nColsNotAllNull; if (isNEleNull(pDataCol, rowsToWrite)) { // all data to commit are NULL, just ignore it continue; } - memset(pCompCol, 0, sizeof(*pCompCol)); + memset(pBlockCol, 0, sizeof(*pBlockCol)); - pCompCol->colId = pDataCol->colId; - pCompCol->type = pDataCol->type; + pBlockCol->colId = pDataCol->colId; + pBlockCol->type = pDataCol->type; if (tDataTypeDesc[pDataCol->type].getStatisFunc) { (*tDataTypeDesc[pDataCol->type].getStatisFunc)( - (TSKEY *)(pDataCols->cols[0].pData), pDataCol->pData, rowsToWrite, &(pCompCol->min), &(pCompCol->max), - &(pCompCol->sum), &(pCompCol->minIndex), &(pCompCol->maxIndex), &(pCompCol->numOfNull)); + (TSKEY *)(pDataCols->cols[0].pData), pDataCol->pData, rowsToWrite, &(pBlockCol->min), &(pBlockCol->max), + &(pBlockCol->sum), &(pBlockCol->minIndex), &(pBlockCol->maxIndex), &(pBlockCol->numOfNull)); } nColsNotAllNull++; } @@ -749,35 +755,41 @@ static int tsdbWriteBlock(SCommitH *pCommih, SDFile *pDFile, SDataCols *pDataCol ASSERT(nColsNotAllNull >= 0 && nColsNotAllNull <= pDataCols->numOfCols); // Compress the data if neccessary - int tcol = 0; + int tcol = 0; // counter of not all NULL and written columns int32_t toffset = 0; - int32_t tsize = TSDB_GET_COMPCOL_LEN(nColsNotAllNull); + int32_t tsize = TSDB_BLOCK_STATIS_SIZE(nColsNotAllNull); int32_t lsize = tsize; int32_t keyLen = 0; for (int ncol = 0; ncol < pDataCols->numOfCols; ncol++) { + // All not NULL columns finish if (ncol != 0 && tcol >= nColsNotAllNull) break; SDataCol * pDataCol = pDataCols->cols + ncol; - SBlockCol *pCompCol = pCompData->cols + tcol; + SBlockCol *pBlockCol = pBlockData->cols + tcol; - if (ncol != 0 && (pDataCol->colId != pCompCol->colId)) continue; - void *tptr = POINTER_SHIFT(pCompData, lsize); + if (ncol != 0 && (pDataCol->colId != pBlockCol->colId)) continue; - int32_t flen = 0; // final length + int32_t flen; // final length int32_t tlen = dataColGetNEleLen(pDataCol, rowsToWrite); + void * tptr; - if (pCfg->compression) { - if (pCfg->compression == TWO_STAGE_COMP) { - pHelper->compBuffer = taosTRealloc(pHelper->compBuffer, tlen + COMP_OVERFLOW_BYTES); - if (pHelper->compBuffer == NULL) { - terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - goto _err; - } - } + // Make room + if (tsdbMakeRoom((void **)TSDB_COMMIT_BUF(pCommith), lsize + tlen + COMP_OVERFLOW_BYTES + sizeof(TSCKSUM)) < 0) { + return -1; + } + pBlockData = (SBlockData *)TSDB_COMMIT_BUF(pCommith); + tptr = POINTER_SHIFT(pBlockData, lsize); - flen = (*(tDataTypeDesc[pDataCol->type].compFunc))( - (char *)pDataCol->pData, tlen, rowsToWrite, tptr, (int32_t)taosTSizeof(pHelper->pBuffer) - lsize, - pCfg->compression, pHelper->compBuffer, (int32_t)taosTSizeof(pHelper->compBuffer)); + if (pCfg->compression == TWO_STAGE_COMP && + tsdbMakeRoom((void **)TSDB_COMMIT_COMP_BUF(pCommith), tlen + COMP_OVERFLOW_BYTES) < 0) { + return -1; + } + + // Compress or just copy + if (pCfg->compression) { + flen = (*(tDataTypeDesc[pDataCol->type].compFunc))((char *)pDataCol->pData, tlen, rowsToWrite, tptr, + tlen + COMP_OVERFLOW_BYTES, pCfg->compression, + TSDB_COMMIT_COMP_BUF(pCommith), tlen + COMP_OVERFLOW_BYTES); } else { flen = tlen; memcpy(tptr, pDataCol->pData, flen); @@ -787,12 +799,11 @@ static int tsdbWriteBlock(SCommitH *pCommih, SDFile *pDFile, SDataCols *pDataCol ASSERT(flen > 0); flen += sizeof(TSCKSUM); taosCalcChecksumAppend(0, (uint8_t *)tptr, flen); - pFile->info.magic = - taosCalcChecksum(pFile->info.magic, (uint8_t *)POINTER_SHIFT(tptr, flen - sizeof(TSCKSUM)), sizeof(TSCKSUM)); + tsdbUpdateDFileMagic(pDFile, POINTER_SHIFT(tptr, flen - sizeof(TSCKSUM))); if (ncol != 0) { - pCompCol->offset = toffset; - pCompCol->len = flen; + pBlockCol->offset = toffset; + pBlockCol->len = flen; tcol++; } else { keyLen = flen; @@ -802,20 +813,16 @@ static int tsdbWriteBlock(SCommitH *pCommih, SDFile *pDFile, SDataCols *pDataCol lsize += flen; } - pCompData->delimiter = TSDB_FILE_DELIMITER; - pCompData->uid = pHelper->tableInfo.uid; - pCompData->numOfCols = nColsNotAllNull; + pBlockData->delimiter = TSDB_FILE_DELIMITER; + pBlockData->uid = TABLE_UID(pTable); + pBlockData->numOfCols = nColsNotAllNull; - taosCalcChecksumAppend(0, (uint8_t *)pCompData, tsize); - pFile->info.magic = taosCalcChecksum(pFile->info.magic, (uint8_t *)POINTER_SHIFT(pCompData, tsize - sizeof(TSCKSUM)), - sizeof(TSCKSUM)); + taosCalcChecksumAppend(0, (uint8_t *)pBlockData, tsize); + tsdbUpdateDFileMagic(pDFile, POINTER_SHIFT(pBlockData, tsize - sizeof(TSCKSUM))); // Write the whole block to file - if (taosWrite(pFile->fd, (void *)pCompData, lsize) < lsize) { - tsdbError("vgId:%d failed to write %d bytes to file %s since %s", REPO_ID(helperRepo(pHelper)), lsize, - TSDB_FILE_NAME(pFile), strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); - goto _err; + if (tsdbWriteDFile(pDFile, (void *)pBlockData, lsize < lsize)) { + return -1; } // Update pBlock membership vairables @@ -828,20 +835,16 @@ static int tsdbWriteBlock(SCommitH *pCommih, SDFile *pDFile, SDataCols *pDataCol pBlock->numOfSubBlocks = isSuper ? 1 : 0; pBlock->numOfCols = nColsNotAllNull; pBlock->keyFirst = dataColsKeyFirst(pDataCols); - pBlock->keyLast = dataColsKeyAt(pDataCols, rowsToWrite - 1); + pBlock->keyLast = dataColsKeyLast(pDataCols); + + pDFile->info.size += pBlock->len; tsdbDebug("vgId:%d tid:%d a block of data is written to file %s, offset %" PRId64 " numOfRows %d len %d numOfCols %" PRId16 " keyFirst %" PRId64 " keyLast %" PRId64, - REPO_ID(helperRepo(pHelper)), pHelper->tableInfo.tid, TSDB_FILE_NAME(pFile), (int64_t)(pBlock->offset), - (int)(pBlock->numOfRows), pBlock->len, pBlock->numOfCols, pBlock->keyFirst, pBlock->keyLast); - - pFile->info.size += pBlock->len; - // ASSERT(pFile->info.size == lseek(pFile->fd, 0, SEEK_CUR)); + REPO_ID(pRepo), TABLE_TID(pTable), TSDB_FILE_FULL_NAME(pDFile), offset, rowsToWrite, pBlock->len, + pBlock->numOfCols, pBlock->keyFirst, pBlock->keyLast); return 0; - -_err: - return -1; } static int tsdbWriteBlockInfo(SCommitH *pCommih) { -- GitLab