From 286c0903f566803e00f648cf7e0f952889ce2925 Mon Sep 17 00:00:00 2001 From: hzcheng Date: Tue, 28 Apr 2020 17:17:02 +0800 Subject: [PATCH] TD-166 --- src/common/inc/tdataformat.h | 21 ++++ src/common/src/tdataformat.c | 52 +++++++++ src/common/src/ttypes.c | 23 ++-- src/inc/taosdef.h | 4 + src/tsdb/inc/tsdbMain.h | 4 +- src/tsdb/src/tsdbRWHelper.c | 202 +++++++++++++++++++---------------- src/util/CMakeLists.txt | 6 +- 7 files changed, 206 insertions(+), 106 deletions(-) diff --git a/src/common/inc/tdataformat.h b/src/common/inc/tdataformat.h index 3e73cc937b..ddacc1ed01 100644 --- a/src/common/inc/tdataformat.h +++ b/src/common/inc/tdataformat.h @@ -120,6 +120,9 @@ typedef struct SDataCol { } SDataCol; void dataColAppendVal(SDataCol *pCol, void *value, int numOfPoints, int maxPoints); +bool isNEleNull(SDataCol *pCol, int nEle); +void dataColSetNEleNull(SDataCol *pCol, int nEle, int maxPoints); +void dataColSetOffset(SDataCol *pCol, int nEle, int maxPoints); // Get the data pointer from a column-wised data static FORCE_INLINE void *tdGetColDataOfRow(SDataCol *pCol, int row) { @@ -136,6 +139,24 @@ static FORCE_INLINE void *tdGetColDataOfRow(SDataCol *pCol, int row) { } } +static FORCE_INLINE void dataColGetNEleStartAndLen(SDataCol *pDataCol, int rows, void **pStart, int32_t *len, int32_t maxPoints) { + void *ptr = NULL; + switch (pDataCol->type) { + case TSDB_DATA_TYPE_BINARY: + case TSDB_DATA_TYPE_NCHAR: + ptr = tdGetColDataOfRow(pDataCol, rows - 1); + *pStart = (char *)(pDataCol->pData) + sizeof(int32_t) * maxPoints; + *len = (char *)ptr - (char *)(*pStart) + sizeof(int16_t) + *(int16_t *)ptr; + break; + + default: + *pStart = pDataCol->pData; + *len = TYPE_BYTES[pDataCol->type] * rows; + break; + } +} + + typedef struct { int maxRowSize; int maxCols; // max number of columns diff --git a/src/common/src/tdataformat.c b/src/common/src/tdataformat.c index 8d12a6e43b..54402528ba 100644 --- a/src/common/src/tdataformat.c +++ b/src/common/src/tdataformat.c @@ -235,6 +235,58 @@ void dataColAppendVal(SDataCol *pCol, void *value, int numOfPoints, int maxPoint } } +bool isNEleNull(SDataCol *pCol, int nEle) { + void *ptr = NULL; + switch (pCol->type) { + case TSDB_DATA_TYPE_BINARY: + case TSDB_DATA_TYPE_NCHAR: + for (int i = 0; i < nEle; i++) { + ptr = tdGetColDataOfRow(pCol, i); + ptr = (void *)((char *)ptr + sizeof(int16_t)); + if (!isNull(ptr, pCol->type)) return false; + } + return true; + default: + for (int i = 0; i < nEle; i++) { + if (!isNull(tdGetColDataOfRow(pCol, i), pCol->type)) return false; + } + return true; + } +} + +void dataColSetNEleNull(SDataCol *pCol, int nEle, int maxPoints) { + char *ptr = NULL; + switch (pCol->type) { + case TSDB_DATA_TYPE_BINARY: + case TSDB_DATA_TYPE_NCHAR: + pCol->len = sizeof(int32_t) * maxPoints; + for (int i = 0; i < nEle; i++) { + ((int32_t *)(pCol->pData))[i] = pCol->len; + + ptr = ((char *)pCol->pData) + pCol->len; + *(int16_t *)ptr = (pCol->type == TSDB_DATA_TYPE_BINARY) ? sizeof(char) : TSDB_NCHAR_SIZE; + setNull(ptr + sizeof(int16_t), pCol->type, pCol->bytes); + + pCol->len += (sizeof(int16_t) + ((int16_t *)ptr)[0]); + } + break; + default: + setNullN(pCol->pData, pCol->type, pCol->bytes, nEle); + pCol->len = TYPE_BYTES[pCol->type] * nEle; + break; + } +} + +void dataColSetOffset(SDataCol *pCol, int nEle, int maxPoints) { + ASSERT(nEle <= maxPoints && ((pCol->type == TSDB_DATA_TYPE_BINARY) || (pCol->type == TSDB_DATA_TYPE_NCHAR))); + + char *tptr = (char *)(pCol->pData) + sizeof(int32_t) * maxPoints; + for (int i = 0; i < nEle; i++) { + ((int32_t *)(pCol->pData))[i] = tptr - (char *)(pCol->pData); + tptr = tptr + *(int16_t *)tptr; + } +} + SDataCols *tdNewDataCols(int maxRowSize, int maxCols, int maxRows, int exColBytes) { SDataCols *pCols = (SDataCols *)calloc(1, sizeof(SDataCols) + sizeof(SDataCol) * maxCols); if (pCols == NULL) return NULL; diff --git a/src/common/src/ttypes.c b/src/common/src/ttypes.c index 2f4aa6ab76..9f392bcae5 100644 --- a/src/common/src/ttypes.c +++ b/src/common/src/ttypes.c @@ -16,6 +16,7 @@ #include "taosdef.h" #include "ttokendef.h" +#include "tscompression.h" const int32_t TYPE_BYTES[11] = { -1, // TSDB_DATA_TYPE_NULL @@ -32,17 +33,17 @@ const int32_t TYPE_BYTES[11] = { }; tDataTypeDescriptor tDataTypeDesc[11] = { - {TSDB_DATA_TYPE_NULL, 6, 1, "NOTYPE"}, - {TSDB_DATA_TYPE_BOOL, 4, CHAR_BYTES, "BOOL"}, - {TSDB_DATA_TYPE_TINYINT, 7, CHAR_BYTES, "TINYINT"}, - {TSDB_DATA_TYPE_SMALLINT, 8, SHORT_BYTES, "SMALLINT"}, - {TSDB_DATA_TYPE_INT, 3, INT_BYTES, "INT"}, - {TSDB_DATA_TYPE_BIGINT, 6, LONG_BYTES, "BIGINT"}, - {TSDB_DATA_TYPE_FLOAT, 5, FLOAT_BYTES, "FLOAT"}, - {TSDB_DATA_TYPE_DOUBLE, 6, DOUBLE_BYTES, "DOUBLE"}, - {TSDB_DATA_TYPE_BINARY, 6, 0, "BINARY"}, - {TSDB_DATA_TYPE_TIMESTAMP, 9, LONG_BYTES, "TIMESTAMP"}, - {TSDB_DATA_TYPE_NCHAR, 5, 8, "NCHAR"}, + {TSDB_DATA_TYPE_NULL, 6, 1, "NOTYPE", NULL, NULL}, + {TSDB_DATA_TYPE_BOOL, 4, CHAR_BYTES, "BOOL", tsCompressBool, tsDecompressBool}, + {TSDB_DATA_TYPE_TINYINT, 7, CHAR_BYTES, "TINYINT", tsCompressTinyint, tsDecompressTinyint}, + {TSDB_DATA_TYPE_SMALLINT, 8, SHORT_BYTES, "SMALLINT", tsCompressSmallint, tsDecompressSmallint}, + {TSDB_DATA_TYPE_INT, 3, INT_BYTES, "INT", tsCompressInt, tsDecompressInt}, + {TSDB_DATA_TYPE_BIGINT, 6, LONG_BYTES, "BIGINT", tsCompressBigint, tsDecompressBigint}, + {TSDB_DATA_TYPE_FLOAT, 5, FLOAT_BYTES, "FLOAT", tsCompressFloat, tsDecompressFloat}, + {TSDB_DATA_TYPE_DOUBLE, 6, DOUBLE_BYTES, "DOUBLE", tsCompressDouble, tsDecompressDouble}, + {TSDB_DATA_TYPE_BINARY, 6, 0, "BINARY", tsCompressString, tsDecompressString}, + {TSDB_DATA_TYPE_TIMESTAMP, 9, LONG_BYTES, "TIMESTAMP", tsCompressTimestamp, tsDecompressTimestamp}, + {TSDB_DATA_TYPE_NCHAR, 5, 8, "NCHAR", tsCompressString, tsDecompressString}, }; char tTokenTypeSwitcher[13] = { diff --git a/src/inc/taosdef.h b/src/inc/taosdef.h index 1a3316cdcf..c078bd570d 100644 --- a/src/inc/taosdef.h +++ b/src/inc/taosdef.h @@ -121,6 +121,10 @@ typedef struct tDataTypeDescriptor { int16_t nameLen; int32_t nSize; char * aName; + int (*compFunc)(const char *const input, int inputSize, const int nelements, char *const output, int outputSize, + char algorithm, char *const buffer, int bufferSize); + int (*decompFunc)(const char *const input, int compressedSize, const int nelements, char *const output, + int outputSize, char algorithm, char *const buffer, int bufferSize); } tDataTypeDescriptor; extern tDataTypeDescriptor tDataTypeDesc[11]; diff --git a/src/tsdb/inc/tsdbMain.h b/src/tsdb/inc/tsdbMain.h index 8e0064a6ac..7a6735291b 100644 --- a/src/tsdb/inc/tsdbMain.h +++ b/src/tsdb/inc/tsdbMain.h @@ -297,7 +297,7 @@ typedef struct { // TODO: take pre-calculation into account typedef struct { int16_t colId; // Column ID - int16_t len; // Column length + int16_t len; // Column length // TODO: int16_t is not enough int32_t type : 8; int32_t offset : 24; } SCompCol; @@ -426,6 +426,8 @@ typedef struct { SCompData *pCompData; SDataCols *pDataCols[2]; + void *blockBuffer; // Buffer to hold the whole data block + void *compBuffer; // Buffer for temperary compress/decompress purpose } SRWHelper; // --------- Helper state diff --git a/src/tsdb/src/tsdbRWHelper.c b/src/tsdb/src/tsdbRWHelper.c index 3bcaa8a8d7..a5cf75ae3b 100644 --- a/src/tsdb/src/tsdbRWHelper.c +++ b/src/tsdb/src/tsdbRWHelper.c @@ -552,61 +552,99 @@ int tsdbLoadBlockDataCols(SRWHelper *pHelper, SDataCols *pDataCols, int blkIdx, return 0; } +static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, char *content, int32_t len, int8_t comp, int numOfPoints, + int maxPoints, char *buffer, int bufferSize) { + // Verify by checksum + if (!taosCheckChecksumWhole((uint8_t *)content, len)) return -1; + + // Decode the data + if (comp) { + // Need to decompress + void *pStart = NULL; + if (pDataCol->type == TSDB_DATA_TYPE_BINARY || pDataCol->type == TSDB_DATA_TYPE_NCHAR) { + pStart = (char *)(pDataCol->pData) + sizeof(int32_t) * maxPoints; + } + // TODO: get rid of INT32_MAX here + pDataCol->len = (*(tDataTypeDesc[pDataCol->type].decompFunc))(content, len - sizeof(TSCKSUM), numOfPoints, pStart, + INT32_MAX, comp, buffer, bufferSize); + if (pDataCol->type == TSDB_DATA_TYPE_BINARY || pDataCol->type == TSDB_DATA_TYPE_NCHAR) { + pDataCol->len += (sizeof(int32_t) * maxPoints); + dataColSetOffset(pDataCol, numOfPoints, maxPoints); + } + } else { + // No need to decompress, just memcpy it + switch (pDataCol->type) { + case TSDB_DATA_TYPE_BINARY: + case TSDB_DATA_TYPE_NCHAR: + pDataCol->len = sizeof(int32_t) * maxPoints; + memcpy((char *)pDataCol->pData + pDataCol->len, content, len - sizeof(TSCKSUM)); + pDataCol->len += (len - sizeof(TSCKSUM)); + dataColSetOffset(pDataCol, numOfPoints, maxPoints); + break; + + default: + pDataCol->len = len - sizeof(TSCKSUM); + memcpy(pDataCol->pData, content, pDataCol->len); + break; + } + } + return 0; +} + /** * Interface to read the data of a sub-block OR the data of a super-block of which (numOfSubBlocks == 1) */ static int tsdbLoadBlockDataImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SDataCols *pDataCols) { ASSERT(pCompBlock->numOfSubBlocks <= 1); - SCompData *pCompData = (SCompData *)malloc(pCompBlock->len); - if (pCompData == NULL) return -1; + pHelper->blockBuffer = trealloc(pHelper->blockBuffer, pCompBlock->len); + if (pHelper->blockBuffer == NULL) return -1; + + SCompData *pCompData = (SCompData *)pHelper->blockBuffer; int fd = (pCompBlock->last) ? pHelper->files.lastF.fd : pHelper->files.dataF.fd; if (lseek(fd, pCompBlock->offset, SEEK_SET) < 0) goto _err; if (tread(fd, (void *)pCompData, pCompBlock->len) < pCompBlock->len) goto _err; ASSERT(pCompData->numOfCols == pCompBlock->numOfCols); - // TODO : check the checksum - size_t tsize = sizeof(SCompData) + sizeof(SCompCol) * pCompBlock->numOfCols + sizeof(TSCKSUM); + int32_t tsize = sizeof(SCompData) + sizeof(SCompCol) * pCompBlock->numOfCols + sizeof(TSCKSUM); if (!taosCheckChecksumWhole((uint8_t *)pCompData, tsize)) goto _err; - for (int i = 0; i < pCompData->numOfCols; i++) { - // TODO: check the data checksum - // if (!taosCheckChecksumWhole()) - } - - ASSERT(pCompBlock->numOfCols == pCompData->numOfCols); pDataCols->numOfPoints = pCompBlock->numOfPoints; - int ccol = 0, dcol = 0; - while (true) { - if (ccol >= pDataCols->numOfCols) { - // TODO: Fill rest NULL - break; + // Recover the data + int ccol = 0; + int dcol = 0; + while (dcol < pDataCols->numOfCols) { + SDataCol *pDataCol = &(pDataCols->cols[dcol]); + if (ccol >= pCompData->numOfCols) { + // Set current column as NULL and forward + dataColSetNEleNull(pDataCol, pCompBlock->numOfPoints, pDataCols->maxPoints); + dcol++; + continue; } - if (dcol >= pCompData->numOfCols) break; SCompCol *pCompCol = &(pCompData->cols[ccol]); - SDataCol *pDataCol = &(pDataCols->cols[dcol]); if (pCompCol->colId == pDataCol->colId) { - // TODO: uncompress - memcpy(pDataCol->pData, (void *)(((char *)pCompData) + tsize + pCompCol->offset), pCompCol->len); - ccol++; - dcol++; - } else if (pCompCol->colId > pDataCol->colId) { - // TODO: Fill NULL + if (tsdbCheckAndDecodeColumnData(pDataCol, (char *)pCompData + tsize + pCompCol->offset, pCompCol->len, + pCompBlock->algorithm, pCompBlock->numOfPoints, pDataCols->maxPoints, pHelper->compBuffer, + tsizeof(pHelper->compBuffer)) < 0) + goto _err; dcol++; - } else { ccol++; + } else if (pCompCol->colId < pDataCol->colId) { + ccol++; + } else { + // Set current column as NULL and forward + dataColSetNEleNull(pDataCol, pCompBlock->numOfPoints, pDataCols->maxPoints); + dcol++; } } - tfree(pCompData); return 0; _err: - tfree(pCompData); return -1; } @@ -634,36 +672,6 @@ _err: return -1; } -// static int tsdbCheckHelperCfg(SHelperCfg *pCfg) { -// // TODO -// return 0; -// } - -// static void tsdbClearHelperFile(SHelperFile *pHFile) { -// pHFile->fid = -1; -// if (pHFile->headF.fd > 0) { -// close(pHFile->headF.fd); -// pHFile->headF.fd = -1; -// } -// if (pHFile->dataF.fd > 0) { -// close(pHFile->dataF.fd); -// pHFile->dataF.fd = -1; -// } -// if (pHFile->lastF.fd > 0) { -// close(pHFile->lastF.fd); -// pHFile->lastF.fd = -1; -// } -// if (pHFile->nHeadF.fd > 0) { -// close(pHFile->nHeadF.fd); -// pHFile->nHeadF.fd = -1; -// } -// if (pHFile->nLastF.fd > 0) { -// close(pHFile->nLastF.fd); -// pHFile->nLastF.fd = -1; -// } - -// } - static bool tsdbShouldCreateNewLast(SRWHelper *pHelper) { ASSERT(pHelper->files.lastF.fd > 0); struct stat st; @@ -677,81 +685,93 @@ static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDa ASSERT(rowsToWrite > 0 && rowsToWrite <= pDataCols->numOfPoints && rowsToWrite <= pHelper->config.maxRowsPerFileBlock); - SCompData *pCompData = NULL; + SCompData *pCompData = (SCompData *)(pHelper->blockBuffer); int64_t offset = 0; offset = lseek(pFile->fd, 0, SEEK_END); if (offset < 0) goto _err; - pCompData = (SCompData *)malloc(sizeof(SCompData) + sizeof(SCompCol) * pDataCols->numOfCols + sizeof(TSCKSUM)); - if (pCompData == NULL) goto _err; - int nColsNotAllNull = 0; - int32_t toffset = 0; for (int ncol = 0; ncol < pDataCols->numOfCols; ncol++) { SDataCol *pDataCol = pDataCols->cols + ncol; SCompCol *pCompCol = pCompData->cols + nColsNotAllNull; - if (0) { - // TODO: all data to commit are NULL + if (isNEleNull(pDataCol, rowsToWrite)) { + // all data to commit are NULL, just ignore it continue; } - // Compress the data here - { - // TODO - } - pCompCol->colId = pDataCol->colId; pCompCol->type = pDataCol->type; - pCompCol->len = TYPE_BYTES[pCompCol->type] * rowsToWrite; // TODO: change it - pCompCol->offset = toffset; nColsNotAllNull++; - - toffset += pCompCol->len; } ASSERT(nColsNotAllNull > 0 && nColsNotAllNull <= pDataCols->numOfCols); - pCompData->delimiter = TSDB_FILE_DELIMITER; - pCompData->uid = pHelper->tableInfo.uid; - pCompData->numOfCols = nColsNotAllNull; - - // Write SCompData + SCompCol part - size_t tsize = sizeof(SCompData) + sizeof(SCompCol) * nColsNotAllNull + sizeof(TSCKSUM); - taosCalcChecksumAppend(0, (uint8_t *)pCompData, tsize); - if (twrite(pFile->fd, (void *)pCompData, tsize) < tsize) goto _err; - // Write true data part - int nCompCol = 0; + // Compress the data if neccessary + int tcol = 0; + int32_t toffset = 0; + int32_t tsize = sizeof(SCompData) + sizeof(SCompCol) * nColsNotAllNull + sizeof(TSCKSUM); + int32_t lsize = tsize; for (int ncol = 0; ncol < pDataCols->numOfCols; ncol++) { - ASSERT(nCompCol < nColsNotAllNull); + if (tcol >= nColsNotAllNull) break; SDataCol *pDataCol = pDataCols->cols + ncol; - SCompCol *pCompCol = pCompData->cols + nCompCol; + SCompCol *pCompCol = pCompData->cols + tcol; + + if (pDataCol->colId != pCompCol->colId) continue; + void *tptr = (void *)((char *)pCompData + lsize); - if (pDataCol->colId == pCompCol->colId) { - if (twrite(pFile->fd, (void *)(pDataCol->pData), pCompCol->len) < pCompCol->len) goto _err; - tsize += pCompCol->len; - nCompCol++; + pCompCol->offset = toffset; + + void *pStart = NULL; + int32_t tlen = 0; + + dataColGetNEleStartAndLen(pDataCol, rowsToWrite, &pStart, &tlen, pDataCols->maxPoints); + + // TODO: compresee the data + if (pHelper->config.compress) { + pCompCol->len = (*(tDataTypeDesc[pDataCol->type].compFunc))( + (char *)pStart, tlen, rowsToWrite, tptr, tsizeof(pHelper->blockBuffer) - lsize, pHelper->config.compress, + pHelper->compBuffer, tsizeof(pHelper->compBuffer)); + } else { + pCompCol->len = tlen; + memcpy(tptr, pStart, pCompCol->len); } + + // Add checksum + pCompCol->len += sizeof(TSCKSUM); + taosCalcChecksumAppend(0, (uint8_t *)tptr, pCompCol->len); + + toffset += pCompCol->len; + lsize += pCompCol->len; + tcol++; } + pCompData->delimiter = TSDB_FILE_DELIMITER; + pCompData->uid = pHelper->tableInfo.uid; + pCompData->numOfCols = nColsNotAllNull; + + taosCalcChecksumAppend(0, (uint8_t *)pCompData, tsize); + + // Write the whole block to file + if (twrite(pFile->fd, (void *)pCompData, lsize) < lsize) goto _err; + + // Update pCompBlock membership vairables pCompBlock->last = isLast; pCompBlock->offset = offset; pCompBlock->algorithm = pHelper->config.compress; pCompBlock->numOfPoints = rowsToWrite; pCompBlock->sversion = pHelper->tableInfo.sversion; - pCompBlock->len = (int32_t)tsize; + pCompBlock->len = (int32_t)lsize; pCompBlock->numOfSubBlocks = isSuperBlock ? 1 : 0; pCompBlock->numOfCols = nColsNotAllNull; pCompBlock->keyFirst = dataColsKeyFirst(pDataCols); pCompBlock->keyLast = dataColsKeyAt(pDataCols, rowsToWrite - 1); - tfree(pCompData); return 0; _err: - tfree(pCompData); return -1; } diff --git a/src/util/CMakeLists.txt b/src/util/CMakeLists.txt index a80e81f09f..d4350fc8b2 100644 --- a/src/util/CMakeLists.txt +++ b/src/util/CMakeLists.txt @@ -11,7 +11,7 @@ IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM)) INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/lz4/inc) AUX_SOURCE_DIRECTORY(src SRC) ADD_LIBRARY(tutil ${SRC}) - TARGET_LINK_LIBRARIES(tutil pthread os m rt) + TARGET_LINK_LIBRARIES(tutil pthread os m rt lz4) FIND_PATH(ICONV_INCLUDE_EXIST iconv.h /usr/include/ /usr/local/include/) IF (ICONV_INCLUDE_EXIST) ADD_DEFINITIONS(-DUSE_LIBICONV) @@ -68,7 +68,7 @@ ELSEIF (TD_WINDOWS_64) LIST(APPEND SRC ./src/tutil.c) LIST(APPEND SRC ./src/version.c) ADD_LIBRARY(tutil ${SRC}) - TARGET_LINK_LIBRARIES(tutil iconv regex pthread os winmm IPHLPAPI ws2_32) + TARGET_LINK_LIBRARIES(tutil iconv regex pthread os winmm IPHLPAPI ws2_32 lz4) ELSEIF(TD_DARWIN_64) ADD_DEFINITIONS(-DUSE_LIBICONV) LIST(APPEND SRC ./src/hash.c) @@ -105,7 +105,7 @@ ELSEIF(TD_DARWIN_64) LIST(APPEND SRC ./src/version.c) LIST(APPEND SRC ./src/hash.c) ADD_LIBRARY(tutil ${SRC}) - TARGET_LINK_LIBRARIES(tutil iconv pthread os) + TARGET_LINK_LIBRARIES(tutil iconv pthread os lz4) ENDIF() # TARGET_LINK_LIBRARIES(tutil mstorage) -- GitLab