diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 88ce13e560a1e2ecdab9566222f282d978b9801f..c0cfa4d3af3fc142bb15afa3d84bff8b99ba35ad 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -631,47 +631,44 @@ int32_t tscGetDataBlockFromList(void* pHashList, SDataBlockList* pDataBlockList, return TSDB_CODE_SUCCESS; } -static void trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock) { - int32_t firstPartLen = 0; - - STableMeta* pTableMeta = pTableDataBlock->pTableMeta; +static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock) { + // TODO: optimize this function + int len = 0; + + STableMeta* pTableMeta = pTableDataBlock->pTableMeta; STableComInfo tinfo = tscGetTableInfo(pTableMeta); - SSchema* pSchema = tscGetTableSchema(pTableMeta); - + SSchema* pSchema = tscGetTableSchema(pTableMeta); + + SSubmitBlk* pBlock = pDataBlock; memcpy(pDataBlock, pTableDataBlock->pData, sizeof(SSubmitBlk)); pDataBlock += sizeof(SSubmitBlk); - - int32_t total = sizeof(int32_t)*2; - for(int32_t i = 0; i < tinfo.numOfColumns; ++i) { - switch (pSchema[i].type) { - case TSDB_DATA_TYPE_NCHAR: - case TSDB_DATA_TYPE_BINARY: { - assert(0); // not support binary yet - firstPartLen += sizeof(int32_t);break; - } - default: - firstPartLen += tDataTypeDesc[pSchema[i].type].nSize; - total += tDataTypeDesc[pSchema[i].type].nSize; - } + + int32_t flen = 0; + for (int32_t i = 0; i < tinfo.numOfColumns; ++i) { + flen += TYPE_BYTES[pSchema[i].type]; } - + char* p = pTableDataBlock->pData + sizeof(SSubmitBlk); - - SSubmitBlk* pBlock = (SSubmitBlk*) pTableDataBlock->pData; - int32_t rows = htons(pBlock->numOfRows); - - for(int32_t i = 0; i < rows; ++i) { - *(int32_t*) pDataBlock = total; - pDataBlock += sizeof(int32_t); - - *(int32_t*) pDataBlock = firstPartLen; - pDataBlock += sizeof(int32_t); - - memcpy(pDataBlock, p, pTableDataBlock->rowSize); - - p += pTableDataBlock->rowSize; - pDataBlock += pTableDataBlock->rowSize; + pBlock->len = 0; 
+ for (int32_t i = 0; i < htons(pBlock->numOfRows); ++i) { + SDataRow trow = (SDataRow)pDataBlock; + dataRowSetLen(trow, TD_DATA_ROW_HEAD_SIZE + flen); + + int toffset = 0; + for (int32_t j = 0; j < tinfo.numOfColumns; j++) { + tdAppendColVal(trow, p, pSchema[j].type, pSchema[j].bytes, toffset); + toffset += TYPE_BYTES[pSchema[j].type]; + p += pSchema[j].bytes; + } + + // p += pTableDataBlock->rowSize; + pDataBlock += dataRowLen(trow); + pBlock->len += dataRowLen(trow); } + + len = pBlock->len; + pBlock->len = htonl(pBlock->len); + return len; } int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SDataBlockList* pTableDataBlockList) { @@ -734,7 +731,7 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SDataBlockList* pTableDataBlockLi pBlocks->len = htonl(len); // erase the empty space reserved for binary data - trimDataBlock(dataBuf->pData + dataBuf->size, pOneTableBlock); + len = trimDataBlock(dataBuf->pData + dataBuf->size, pOneTableBlock); dataBuf->size += (len + sizeof(SSubmitBlk)); dataBuf->numOfTables += 1; } diff --git a/src/common/inc/tdataformat.h b/src/common/inc/tdataformat.h index 17aa19cce7be5d5a35707ef68cb297c7ba6889a0..489635420a1f278ec4e52db5e87513d4a9e55239 100644 --- a/src/common/inc/tdataformat.h +++ b/src/common/inc/tdataformat.h @@ -20,6 +20,7 @@ #include #include "taosdef.h" +#include "tutil.h" #ifdef __cplusplus extern "C" { @@ -30,7 +31,7 @@ typedef struct { int8_t type; // Column type int16_t colId; // column ID int32_t bytes; // column bytes - int32_t offset; // point offset in a row data + int32_t offset; // point offset in SDataRow after the header part } STColumn; #define colType(col) ((col)->type) @@ -43,26 +44,25 @@ typedef struct { #define colSetBytes(col, b) (colBytes(col) = (b)) #define colSetOffset(col, o) (colOffset(col) = (o)) -STColumn *tdNewCol(int8_t type, int16_t colId, int16_t bytes); -void tdFreeCol(STColumn *pCol); -void tdColCpy(STColumn *dst, STColumn *src); -void tdSetCol(STColumn *pCol, int8_t type, int16_t colId, int32_t 
bytes); - // ----------------- TSDB SCHEMA DEFINITION typedef struct { + int totalCols; // Total columns allocated int numOfCols; // Number of columns appended - int padding; // Total columns allocated + int tlen; // maximum length of a SDataRow without the header part + int flen; // First part length in a SDataRow after the header part STColumn columns[]; } STSchema; #define schemaNCols(s) ((s)->numOfCols) +#define schemaTotalCols(s) ((s)->totalCols) +#define schemaTLen(s) ((s)->tlen) +#define schemaFLen(s) ((s)->flen) #define schemaColAt(s, i) ((s)->columns + i) STSchema *tdNewSchema(int32_t nCols); -int tdSchemaAppendCol(STSchema *pSchema, int8_t type, int16_t colId, int32_t bytes); +#define tdFreeSchema(s) tfree((s)) +int tdSchemaAddCol(STSchema *pSchema, int8_t type, int16_t colId, int32_t bytes); STSchema *tdDupSchema(STSchema *pSchema); -void tdFreeSchema(STSchema *pSchema); -void tdUpdateSchema(STSchema *pSchema); int tdGetSchemaEncodeSize(STSchema *pSchema); void * tdEncodeSchema(void *dst, STSchema *pSchema); STSchema *tdDecodeSchema(void **psrc); @@ -70,53 +70,100 @@ STSchema *tdDecodeSchema(void **psrc); // ----------------- Data row structure /* A data row, the format is like below: - * +----------+---------+---------------------------------+---------------------------------+ - * | int32_t | int32_t | | | - * +----------+---------+---------------------------------+---------------------------------+ - * | len | flen | First part | Second part | - * +----------+---------+---------------------------------+---------------------------------+ - * plen: first part length - * len: the length including sizeof(row) + sizeof(len) - * row: actual row data encoding + * |<------------------------------------- len ---------------------------------->| + * |<--Head ->|<--------- flen -------------->| | + * +----------+---------------------------------+---------------------------------+ + * | int32_t | | | + * 
+----------+---------------------------------+---------------------------------+ + * | len | First part | Second part | + * +----------+---------------------------------+---------------------------------+ */ typedef void *SDataRow; - -#define TD_DATA_ROW_HEAD_SIZE (2 * sizeof(int32_t)) +#define TD_DATA_ROW_HEAD_SIZE sizeof(int32_t) #define dataRowLen(r) (*(int32_t *)(r)) -#define dataRowFLen(r) (*(int32_t *)((char *)(r) + sizeof(int32_t))) -#define dataRowTuple(r) ((char *)(r) + TD_DATA_ROW_HEAD_SIZE) +#define dataRowTuple(r) POINTER_DRIFT(r, TD_DATA_ROW_HEAD_SIZE) #define dataRowKey(r) (*(TSKEY *)(dataRowTuple(r))) #define dataRowSetLen(r, l) (dataRowLen(r) = (l)) -#define dataRowSetFLen(r, l) (dataRowFLen(r) = (l)) -#define dataRowIdx(r, i) ((char *)(r) + i) #define dataRowCpy(dst, r) memcpy((dst), (r), dataRowLen(r)) -#define dataRowAt(r, idx) ((char *)(r) + (idx)) +#define dataRowMaxBytesFromSchema(s) (schemaTLen(s) + TD_DATA_ROW_HEAD_SIZE) -void tdInitDataRow(SDataRow row, STSchema *pSchema); -int tdMaxRowBytesFromSchema(STSchema *pSchema); -SDataRow tdNewDataRow(int32_t bytes, STSchema *pSchema); SDataRow tdNewDataRowFromSchema(STSchema *pSchema); void tdFreeDataRow(SDataRow row); -int tdAppendColVal(SDataRow row, void *value, STColumn *pCol); -void tdDataRowReset(SDataRow row, STSchema *pSchema); +void tdInitDataRow(SDataRow row, STSchema *pSchema); +int tdAppendColVal(SDataRow row, void *value, int8_t type, int32_t bytes, int32_t offset); SDataRow tdDataRowDup(SDataRow row); +// NOTE: offset here including the header size +static FORCE_INLINE void *tdGetRowDataOfCol(SDataRow row, int8_t type, int32_t offset) { + switch (type) { + case TSDB_DATA_TYPE_BINARY: + case TSDB_DATA_TYPE_NCHAR: + return POINTER_DRIFT(row, *(VarDataOffsetT *)POINTER_DRIFT(row, offset)); + break; + default: + return POINTER_DRIFT(row, offset); + break; + } +} + // ----------------- Data column structure typedef struct SDataCol { - int8_t type; - int16_t colId; - int bytes; - int len; 
- int offset; - void * pData; // Original data + int8_t type; // column type + int16_t colId; // column ID + int bytes; // column data bytes defined + int offset; // data offset in a SDataRow (including the header size) + int spaceSize; // Total space size for this column + int len; // column data length + VarDataOffsetT *dataOff; // For binary and nchar data, the offset in the data column + void * pData; // Actual data pointer } SDataCol; +static FORCE_INLINE void dataColReset(SDataCol *pDataCol) { pDataCol->len = 0; } + +void dataColInit(SDataCol *pDataCol, STColumn *pCol, void **pBuf, int maxPoints); +void dataColAppendVal(SDataCol *pCol, void *value, int numOfPoints, int maxPoints); +void dataColPopPoints(SDataCol *pCol, int pointsToPop, int numOfPoints); +void dataColSetOffset(SDataCol *pCol, int nEle); + +bool isNEleNull(SDataCol *pCol, int nEle); +void dataColSetNEleNull(SDataCol *pCol, int nEle, int maxPoints); + +// Get the data pointer from a column-wised data +static FORCE_INLINE void *tdGetColDataOfRow(SDataCol *pCol, int row) { + switch (pCol->type) { + case TSDB_DATA_TYPE_BINARY: + case TSDB_DATA_TYPE_NCHAR: + return POINTER_DRIFT(pCol->pData, pCol->dataOff[row]); + break; + + default: + return POINTER_DRIFT(pCol->pData, TYPE_BYTES[pCol->type] * row); + break; + } +} + +static FORCE_INLINE int32_t dataColGetNEleLen(SDataCol *pDataCol, int rows) { + ASSERT(rows > 0); + + switch (pDataCol->type) { + case TSDB_DATA_TYPE_BINARY: + case TSDB_DATA_TYPE_NCHAR: + return pDataCol->dataOff[rows - 1] + varDataTLen(tdGetColDataOfRow(pDataCol, rows - 1)); + break; + default: + return TYPE_BYTES[pDataCol->type] * rows; + } +} + + typedef struct { int maxRowSize; int maxCols; // max number of columns int maxPoints; // max number of points + int bufSize; + int numOfPoints; int numOfCols; // Total number of cols int sversion; // TODO: set sversion @@ -125,7 +172,7 @@ typedef struct { } SDataCols; #define keyCol(pCols) (&((pCols)->cols[0])) // Key column -#define 
dataColsKeyAt(pCols, idx) ((int64_t *)(keyCol(pCols)->pData))[(idx)] +#define dataColsKeyAt(pCols, idx) ((TSKEY *)(keyCol(pCols)->pData))[(idx)] #define dataColsKeyFirst(pCols) dataColsKeyAt(pCols, 0) #define dataColsKeyLast(pCols) dataColsKeyAt(pCols, (pCols)->numOfPoints - 1) diff --git a/src/common/src/tdataformat.c b/src/common/src/tdataformat.c index aff6d7f77369b2e22ada1a4e8bbb3531b2eddd90..7321e1c921bab082f3074b087cab12b30da00c4c 100644 --- a/src/common/src/tdataformat.c +++ b/src/common/src/tdataformat.c @@ -13,72 +13,7 @@ * along with this program. If not, see . */ #include "tdataformat.h" -#include "tutil.h" - -static int tdFLenFromSchema(STSchema *pSchema); - -/** - * Create a new STColumn object - * ASSUMPTIONS: VALID PARAMETERS - * - * @param type column type - * @param colId column ID - * @param bytes maximum bytes the col taken - * - * @return a STColumn object on success - * NULL for failure - */ -STColumn *tdNewCol(int8_t type, int16_t colId, int16_t bytes) { - if (!isValidDataType(type, 0)) return NULL; - - STColumn *pCol = (STColumn *)calloc(1, sizeof(STColumn)); - if (pCol == NULL) return NULL; - - colSetType(pCol, type); - colSetColId(pCol, colId); - colSetOffset(pCol, -1); - switch (type) { - case TSDB_DATA_TYPE_BINARY: - case TSDB_DATA_TYPE_NCHAR: - colSetBytes(pCol, bytes); - break; - default: - colSetBytes(pCol, TYPE_BYTES[type]); - break; - } - - return pCol; -} - -/** - * Free a STColumn object CREATED with tdNewCol - */ -void tdFreeCol(STColumn *pCol) { - if (pCol) free(pCol); -} - -/** - * Copy from source to destinition - */ -void tdColCpy(STColumn *dst, STColumn *src) { memcpy((void *)dst, (void *)src, sizeof(STColumn)); } - -/** - * Set the column - */ -void tdSetCol(STColumn *pCol, int8_t type, int16_t colId, int32_t bytes) { - colSetType(pCol, type); - colSetColId(pCol, colId); - switch (type) - { - case TSDB_DATA_TYPE_BINARY: - case TSDB_DATA_TYPE_NCHAR: - colSetBytes(pCol, bytes); - break; - default: - colSetBytes(pCol, 
TYPE_BYTES[type]); - break; - } -} +#include "wchar.h" /** * Create a SSchema object with nCols columns @@ -90,11 +25,15 @@ void tdSetCol(STColumn *pCol, int8_t type, int16_t colId, int32_t bytes) { * NULL for failure */ STSchema *tdNewSchema(int32_t nCols) { - int32_t size = sizeof(STSchema) + sizeof(STColumn) * nCols; + int32_t size = sizeof(STSchema) + sizeof(STColumn) * nCols; STSchema *pSchema = (STSchema *)calloc(1, size); if (pSchema == NULL) return NULL; + pSchema->numOfCols = 0; + pSchema->totalCols = nCols; + pSchema->flen = 0; + pSchema->tlen = 0; return pSchema; } @@ -102,25 +41,34 @@ STSchema *tdNewSchema(int32_t nCols) { /** * Append a column to the schema */ -int tdSchemaAppendCol(STSchema *pSchema, int8_t type, int16_t colId, int32_t bytes) { - // if (pSchema->numOfCols >= pSchema->totalCols) return -1; - if (!isValidDataType(type, 0)) return -1; +int tdSchemaAddCol(STSchema *pSchema, int8_t type, int16_t colId, int32_t bytes) { + if (!isValidDataType(type, 0) || pSchema->numOfCols >= pSchema->totalCols) return -1; STColumn *pCol = schemaColAt(pSchema, schemaNCols(pSchema)); colSetType(pCol, type); colSetColId(pCol, colId); - colSetOffset(pCol, -1); + if (schemaNCols(pSchema) == 0) { + colSetOffset(pCol, 0); + } else { + STColumn *pTCol = schemaColAt(pSchema, schemaNCols(pSchema)-1); + colSetOffset(pCol, pTCol->offset + TYPE_BYTES[pTCol->type]); + } switch (type) { case TSDB_DATA_TYPE_BINARY: case TSDB_DATA_TYPE_NCHAR: - colSetBytes(pCol, bytes); + colSetBytes(pCol, bytes); // Set as maximum bytes + pSchema->tlen += (TYPE_BYTES[type] + sizeof(VarDataLenT) + bytes); break; default: colSetBytes(pCol, TYPE_BYTES[type]); + pSchema->tlen += TYPE_BYTES[type]; break; } pSchema->numOfCols++; + pSchema->flen += TYPE_BYTES[type]; + + ASSERT(pCol->offset < pSchema->flen); return 0; } @@ -138,40 +86,22 @@ STSchema *tdDupSchema(STSchema *pSchema) { return tSchema; } -/** - * Free the SSchema object created by tdNewSchema or tdDupSchema - */ -void 
tdFreeSchema(STSchema *pSchema) { - if (pSchema != NULL) free(pSchema); -} - -/** - * Function to update each columns's offset field in the schema. - * ASSUMPTIONS: VALID PARAMETERS - */ -void tdUpdateSchema(STSchema *pSchema) { - STColumn *pCol = NULL; - int32_t offset = TD_DATA_ROW_HEAD_SIZE; - for (int i = 0; i < schemaNCols(pSchema); i++) { - pCol = schemaColAt(pSchema, i); - colSetOffset(pCol, offset); - offset += TYPE_BYTES[pCol->type]; - } -} - /** * Return the size of encoded schema */ int tdGetSchemaEncodeSize(STSchema *pSchema) { - return sizeof(STSchema) + schemaNCols(pSchema) * (T_MEMBER_SIZE(STColumn, type) + T_MEMBER_SIZE(STColumn, colId) + - T_MEMBER_SIZE(STColumn, bytes)); + return T_MEMBER_SIZE(STSchema, totalCols) + + schemaNCols(pSchema) * + (T_MEMBER_SIZE(STColumn, type) + T_MEMBER_SIZE(STColumn, colId) + T_MEMBER_SIZE(STColumn, bytes)); } /** * Encode a schema to dst, and return the next pointer */ void *tdEncodeSchema(void *dst, STSchema *pSchema) { - T_APPEND_MEMBER(dst, pSchema, STSchema, numOfCols); + ASSERT(pSchema->numOfCols == pSchema->totalCols); + + T_APPEND_MEMBER(dst, pSchema, STSchema, totalCols); for (int i = 0; i < schemaNCols(pSchema); i++) { STColumn *pCol = schemaColAt(pSchema, i); T_APPEND_MEMBER(dst, pCol, STColumn, type); @@ -186,13 +116,13 @@ void *tdEncodeSchema(void *dst, STSchema *pSchema) { * Decode a schema from a binary. 
*/ STSchema *tdDecodeSchema(void **psrc) { - int numOfCols = 0; + int totalCols = 0; - T_READ_MEMBER(*psrc, int, numOfCols); + T_READ_MEMBER(*psrc, int, totalCols); - STSchema *pSchema = tdNewSchema(numOfCols); + STSchema *pSchema = tdNewSchema(totalCols); if (pSchema == NULL) return NULL; - for (int i = 0; i < numOfCols; i++) { + for (int i = 0; i < totalCols; i++) { int8_t type = 0; int16_t colId = 0; int32_t bytes = 0; @@ -200,7 +130,7 @@ STSchema *tdDecodeSchema(void **psrc) { T_READ_MEMBER(*psrc, int16_t, colId); T_READ_MEMBER(*psrc, int32_t, bytes); - tdSchemaAppendCol(pSchema, type, colId, bytes); + tdSchemaAddCol(pSchema, type, colId, bytes); } return pSchema; @@ -209,53 +139,18 @@ STSchema *tdDecodeSchema(void **psrc) { /** * Initialize a data row */ -void tdInitDataRow(SDataRow row, STSchema *pSchema) { - dataRowSetFLen(row, TD_DATA_ROW_HEAD_SIZE); - dataRowSetLen(row, TD_DATA_ROW_HEAD_SIZE + tdFLenFromSchema(pSchema)); -} +void tdInitDataRow(SDataRow row, STSchema *pSchema) { dataRowSetLen(row, TD_DATA_ROW_HEAD_SIZE + schemaFLen(pSchema)); } -/** - * Create a data row with maximum row length bytes. - * - * NOTE: THE AAPLICATION SHOULD MAKE SURE BYTES IS LARGE ENOUGH TO - * HOLD THE WHOE ROW. 
- * - * @param bytes max bytes a row can take - * @return SDataRow object for success - * NULL for failure - */ -SDataRow tdNewDataRow(int32_t bytes, STSchema *pSchema) { - int32_t size = sizeof(int32_t) + bytes; +SDataRow tdNewDataRowFromSchema(STSchema *pSchema) { + int32_t size = dataRowMaxBytesFromSchema(pSchema); SDataRow row = malloc(size); if (row == NULL) return NULL; tdInitDataRow(row, pSchema); - return row; } -/** - * Get maximum bytes a data row from a schema - * ASSUMPTIONS: VALID PARAMETER - */ -int tdMaxRowBytesFromSchema(STSchema *pSchema) { - // TODO - int bytes = TD_DATA_ROW_HEAD_SIZE; - for (int i = 0; i < schemaNCols(pSchema); i++) { - STColumn *pCol = schemaColAt(pSchema, i); - bytes += TYPE_BYTES[pCol->type]; - - if (pCol->type == TSDB_DATA_TYPE_BINARY || pCol->type == TSDB_DATA_TYPE_NCHAR) { - bytes += pCol->bytes; - } - } - - return bytes; -} - -SDataRow tdNewDataRowFromSchema(STSchema *pSchema) { return tdNewDataRow(tdMaxRowBytesFromSchema(pSchema), pSchema); } - /** * Free the SDataRow object */ @@ -265,28 +160,49 @@ void tdFreeDataRow(SDataRow row) { /** * Append a column value to the data row + * @param type: column type + * @param bytes: column bytes + * @param offset: offset in the data row tuple, not including the data row header */ -int tdAppendColVal(SDataRow row, void *value, STColumn *pCol) { - switch (colType(pCol)) - { - case TSDB_DATA_TYPE_BINARY: - case TSDB_DATA_TYPE_NCHAR: - *(int32_t *)dataRowAt(row, dataRowFLen(row)) = dataRowLen(row); - dataRowFLen(row) += TYPE_BYTES[colType(pCol)]; - memcpy((void *)dataRowAt(row, dataRowLen(row)), value, strlen(value)); - dataRowLen(row) += strlen(value); - break; - default: - memcpy(dataRowAt(row, dataRowFLen(row)), value, TYPE_BYTES[colType(pCol)]); - dataRowFLen(row) += TYPE_BYTES[colType(pCol)]; - break; +int tdAppendColVal(SDataRow row, void *value, int8_t type, int32_t bytes, int32_t offset) { + ASSERT(value != NULL); + int32_t toffset = offset + TD_DATA_ROW_HEAD_SIZE; + char * ptr 
= POINTER_DRIFT(row, dataRowLen(row)); + + switch (type) { + case TSDB_DATA_TYPE_BINARY: + case TSDB_DATA_TYPE_NCHAR: + // set offset + *(VarDataOffsetT *)POINTER_DRIFT(row, toffset) = dataRowLen(row); + + // set length + VarDataLenT slen = 0; + if (isNull(value, type)) { + slen = (type == TSDB_DATA_TYPE_BINARY) ? sizeof(int8_t) : sizeof(int32_t); + } else { + if (type == TSDB_DATA_TYPE_BINARY) { + slen = strnlen((char *)value, bytes); + } else { + slen = wcsnlen((wchar_t *)value, (bytes) / TSDB_NCHAR_SIZE) * TSDB_NCHAR_SIZE; + } + } + + ASSERT(slen <= bytes); + *(VarDataLenT *)ptr = slen; + ptr = POINTER_DRIFT(ptr, sizeof(VarDataLenT)); + + memcpy((void *)ptr, value, slen); + dataRowLen(row) += (sizeof(int16_t) + slen); + + break; + default: + memcpy(POINTER_DRIFT(row, toffset), value, TYPE_BYTES[type]); + break; } return 0; } -void tdDataRowReset(SDataRow row, STSchema *pSchema) { tdInitDataRow(row, pSchema); } - SDataRow tdDataRowDup(SDataRow row) { SDataRow trow = malloc(dataRowLen(row)); if (trow == NULL) return NULL; @@ -295,6 +211,119 @@ SDataRow tdDataRowDup(SDataRow row) { return trow; } +void dataColInit(SDataCol *pDataCol, STColumn *pCol, void **pBuf, int maxPoints) { + pDataCol->type = colType(pCol); + pDataCol->colId = colColId(pCol); + pDataCol->bytes = colBytes(pCol); + pDataCol->offset = colOffset(pCol) + TD_DATA_ROW_HEAD_SIZE; + + pDataCol->len = 0; + if (pDataCol->type == TSDB_DATA_TYPE_BINARY || pDataCol->type == TSDB_DATA_TYPE_NCHAR) { + pDataCol->spaceSize = (sizeof(VarDataLenT) + pDataCol->bytes) * maxPoints; + pDataCol->dataOff = (VarDataOffsetT *)(*pBuf); + pDataCol->pData = POINTER_DRIFT(*pBuf, TYPE_BYTES[pDataCol->type] * maxPoints); + *pBuf = POINTER_DRIFT(*pBuf, pDataCol->spaceSize + TYPE_BYTES[pDataCol->type] * maxPoints); + } else { + pDataCol->spaceSize = pDataCol->bytes * maxPoints; + pDataCol->dataOff = NULL; + pDataCol->pData = *pBuf; + *pBuf = POINTER_DRIFT(*pBuf, pDataCol->spaceSize); + } + +} + +void dataColAppendVal(SDataCol 
*pCol, void *value, int numOfPoints, int maxPoints) { + ASSERT(pCol != NULL && value != NULL); + + switch (pCol->type) { + case TSDB_DATA_TYPE_BINARY: + case TSDB_DATA_TYPE_NCHAR: + // set offset + pCol->dataOff[numOfPoints] = pCol->len; + // Copy data + memcpy(POINTER_DRIFT(pCol->pData, pCol->len), value, varDataTLen(value)); + // Update the length + pCol->len += varDataTLen(value); + break; + default: + ASSERT(pCol->len == TYPE_BYTES[pCol->type] * numOfPoints); + memcpy(POINTER_DRIFT(pCol->pData, pCol->len), value, pCol->bytes); + pCol->len += pCol->bytes; + break; + } +} + +void dataColPopPoints(SDataCol *pCol, int pointsToPop, int numOfPoints) { + int pointsLeft = numOfPoints - pointsToPop; + + ASSERT(pointsLeft > 0); + + if (pCol->type == TSDB_DATA_TYPE_BINARY || pCol->type == TSDB_DATA_TYPE_NCHAR) { + ASSERT(pCol->len > 0); + VarDataOffsetT toffset = pCol->dataOff[pointsToPop]; + pCol->len = pCol->len - toffset; + ASSERT(pCol->len > 0); + memmove(pCol->pData, POINTER_DRIFT(pCol->pData, toffset), pCol->len); + dataColSetOffset(pCol, pointsLeft); + } else { + ASSERT(pCol->len == TYPE_BYTES[pCol->type] * numOfPoints); + pCol->len = TYPE_BYTES[pCol->type] * pointsLeft; + memmove(pCol->pData, POINTER_DRIFT(pCol->pData, TYPE_BYTES[pCol->type] * pointsToPop), pCol->len); + } +} + +bool isNEleNull(SDataCol *pCol, int nEle) { + switch (pCol->type) { + case TSDB_DATA_TYPE_BINARY: + case TSDB_DATA_TYPE_NCHAR: + for (int i = 0; i < nEle; i++) { + if (!isNull(varDataVal(tdGetColDataOfRow(pCol, i)), pCol->type)) return false; + } + return true; + default: + for (int i = 0; i < nEle; i++) { + if (!isNull(tdGetColDataOfRow(pCol, i), pCol->type)) return false; + } + return true; + } +} + +void dataColSetNEleNull(SDataCol *pCol, int nEle, int maxPoints) { + char *ptr = NULL; + switch (pCol->type) { + case TSDB_DATA_TYPE_BINARY: + case TSDB_DATA_TYPE_NCHAR: + pCol->len = 0; + for (int i = 0; i < nEle; i++) { + pCol->dataOff[i] = pCol->len; + ptr = (char *)pCol->pData + 
pCol->len; + varDataLen(ptr) = (pCol->type == TSDB_DATA_TYPE_BINARY) ? sizeof(char) : TSDB_NCHAR_SIZE; + setNull(ptr + sizeof(VarDataLenT), pCol->type, pCol->bytes); + pCol->len += varDataTLen(ptr); + } + + break; + default: + setNullN(pCol->pData, pCol->type, pCol->bytes, nEle); + pCol->len = TYPE_BYTES[pCol->type] * nEle; + break; + } +} + +void dataColSetOffset(SDataCol *pCol, int nEle) { + ASSERT(((pCol->type == TSDB_DATA_TYPE_BINARY) || (pCol->type == TSDB_DATA_TYPE_NCHAR))); + + void * tptr = pCol->pData; + // char *tptr = (char *)(pCol->pData); + + VarDataOffsetT offset = 0; + for (int i = 0; i < nEle; i++) { + pCol->dataOff[i] = offset; + offset += varDataTLen(tptr); + tptr = POINTER_DRIFT(tptr, varDataTLen(tptr)); + } +} + SDataCols *tdNewDataCols(int maxRowSize, int maxCols, int maxRows) { SDataCols *pCols = (SDataCols *)calloc(1, sizeof(SDataCols) + sizeof(SDataCol) * maxCols); if (pCols == NULL) return NULL; @@ -302,8 +331,9 @@ SDataCols *tdNewDataCols(int maxRowSize, int maxCols, int maxRows) { pCols->maxRowSize = maxRowSize; pCols->maxCols = maxCols; pCols->maxPoints = maxRows; + pCols->bufSize = maxRowSize * maxRows; - pCols->buf = malloc(maxRowSize * maxRows); + pCols->buf = malloc(pCols->bufSize); if (pCols->buf == NULL) { free(pCols); return NULL; @@ -317,24 +347,16 @@ void tdInitDataCols(SDataCols *pCols, STSchema *pSchema) { tdResetDataCols(pCols); pCols->numOfCols = schemaNCols(pSchema); - pCols->cols[0].pData = pCols->buf; - int offset = TD_DATA_ROW_HEAD_SIZE; + void *ptr = pCols->buf; for (int i = 0; i < schemaNCols(pSchema); i++) { - if (i > 0) { - pCols->cols[i].pData = (char *)(pCols->cols[i - 1].pData) + schemaColAt(pSchema, i - 1)->bytes * pCols->maxPoints; - } - pCols->cols[i].type = colType(schemaColAt(pSchema, i)); - pCols->cols[i].bytes = colBytes(schemaColAt(pSchema, i)); - pCols->cols[i].offset = offset; - pCols->cols[i].colId = colColId(schemaColAt(pSchema, i)); - - offset += TYPE_BYTES[pCols->cols[i].type]; + 
dataColInit(pCols->cols + i, schemaColAt(pSchema, i), &ptr, pCols->maxPoints); + ASSERT((char *)ptr - (char *)(pCols->buf) <= pCols->bufSize); } } void tdFreeDataCols(SDataCols *pCols) { if (pCols) { - if (pCols->buf) free(pCols->buf); + tfree(pCols->buf); free(pCols); } } @@ -351,11 +373,24 @@ SDataCols *tdDupDataCols(SDataCols *pDataCols, bool keepData) { pRet->cols[i].type = pDataCols->cols[i].type; pRet->cols[i].colId = pDataCols->cols[i].colId; pRet->cols[i].bytes = pDataCols->cols[i].bytes; - pRet->cols[i].len = pDataCols->cols[i].len; pRet->cols[i].offset = pDataCols->cols[i].offset; + + pRet->cols[i].spaceSize = pDataCols->cols[i].spaceSize; pRet->cols[i].pData = (void *)((char *)pRet->buf + ((char *)(pDataCols->cols[i].pData) - (char *)(pDataCols->buf))); - if (keepData) memcpy(pRet->cols[i].pData, pDataCols->cols[i].pData, pRet->cols[i].bytes * pDataCols->numOfPoints); + if (pRet->cols[i].type == TSDB_DATA_TYPE_BINARY || pRet->cols[i].type == TSDB_DATA_TYPE_NCHAR) { + ASSERT(pDataCols->cols[i].dataOff != NULL); + pRet->cols[i].dataOff = + (int32_t *)((char *)pRet->buf + ((char *)(pDataCols->cols[i].dataOff) - (char *)(pDataCols->buf))); + } + + if (keepData) { + pRet->cols[i].len = pDataCols->cols[i].len; + memcpy(pRet->cols[i].pData, pDataCols->cols[i].pData, pDataCols->cols[i].len); + if (pRet->cols[i].type == TSDB_DATA_TYPE_BINARY || pRet->cols[i].type == TSDB_DATA_TYPE_NCHAR) { + memcpy(pRet->cols[i].dataOff, pDataCols->cols[i].dataOff, sizeof(VarDataOffsetT) * pDataCols->maxPoints); + } + } } return pRet; @@ -364,57 +399,60 @@ SDataCols *tdDupDataCols(SDataCols *pDataCols, bool keepData) { void tdResetDataCols(SDataCols *pCols) { pCols->numOfPoints = 0; for (int i = 0; i < pCols->maxCols; i++) { - pCols->cols[i].len = 0; + dataColReset(pCols->cols + i); } } void tdAppendDataRowToDataCol(SDataRow row, SDataCols *pCols) { + ASSERT(dataColsKeyLast(pCols) < dataRowKey(row)); + for (int i = 0; i < pCols->numOfCols; i++) { SDataCol *pCol = pCols->cols + i; 
- memcpy((void *)((char *)(pCol->pData) + pCol->len), dataRowAt(row, pCol->offset), pCol->bytes); - pCol->len += pCol->bytes; + void * value = tdGetRowDataOfCol(row, pCol->type, pCol->offset); + + dataColAppendVal(pCol, value, pCols->numOfPoints, pCols->maxPoints); } pCols->numOfPoints++; } + // Pop pointsToPop points from the SDataCols void tdPopDataColsPoints(SDataCols *pCols, int pointsToPop) { int pointsLeft = pCols->numOfPoints - pointsToPop; + if (pointsLeft <= 0) { + tdResetDataCols(pCols); + return; + } for (int iCol = 0; iCol < pCols->numOfCols; iCol++) { - SDataCol *p_col = pCols->cols + iCol; - if (p_col->len > 0) { - p_col->len = TYPE_BYTES[p_col->type] * pointsLeft; - if (pointsLeft > 0) { - memmove((void *)(p_col->pData), (void *)((char *)(p_col->pData) + TYPE_BYTES[p_col->type] * pointsToPop), p_col->len); - } - } + SDataCol *pCol = pCols->cols + iCol; + dataColPopPoints(pCol, pointsToPop, pCols->numOfPoints); } pCols->numOfPoints = pointsLeft; } -/** - * Return the first part length of a data row for a schema - */ -static int tdFLenFromSchema(STSchema *pSchema) { - int ret = 0; - for (int i = 0; i < schemaNCols(pSchema); i++) { - STColumn *pCol = schemaColAt(pSchema, i); - ret += TYPE_BYTES[pCol->type]; - } - - return ret; -} - int tdMergeDataCols(SDataCols *target, SDataCols *source, int rowsToMerge) { ASSERT(rowsToMerge > 0 && rowsToMerge <= source->numOfPoints); + ASSERT(target->numOfPoints + rowsToMerge <= target->maxPoints); + ASSERT(target->numOfCols == source->numOfCols); - SDataCols *pTarget = tdDupDataCols(target, true); - if (pTarget == NULL) goto _err; - // tdResetDataCols(target); + SDataCols *pTarget = NULL; - int iter1 = 0; - int iter2 = 0; - tdMergeTwoDataCols(target,pTarget, &iter1, source, &iter2, pTarget->numOfPoints + rowsToMerge); + if (dataColsKeyLast(target) < dataColsKeyFirst(source)) { // No overlap + for (int i = 0; i < rowsToMerge; i++) { + for (int j = 0; j < source->numOfCols; j++) { + dataColAppendVal(target->cols + j, 
tdGetColDataOfRow(source->cols + j, i), target->numOfPoints, + target->maxPoints); + } + } + target->numOfPoints++; + } else { + pTarget = tdDupDataCols(target, true); + if (pTarget == NULL) goto _err; + + int iter1 = 0; + int iter2 = 0; + tdMergeTwoDataCols(target, pTarget, &iter1, source, &iter2, pTarget->numOfPoints + rowsToMerge); + } tdFreeDataCols(pTarget); return 0; @@ -425,6 +463,7 @@ _err: } void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, SDataCols *src2, int *iter2, int tRows) { + // TODO: add resolve duplicate key here tdResetDataCols(target); while (target->numOfPoints < tRows) { @@ -436,10 +475,8 @@ void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, SDataCol if (key1 < key2) { for (int i = 0; i < src1->numOfCols; i++) { ASSERT(target->cols[i].type == src1->cols[i].type); - memcpy((void *)((char *)(target->cols[i].pData) + TYPE_BYTES[target->cols[i].type] * target->numOfPoints), - (void *)((char *)(src1->cols[i].pData) + TYPE_BYTES[target->cols[i].type] * (*iter1)), - TYPE_BYTES[target->cols[i].type]); - target->cols[i].len += TYPE_BYTES[target->cols[i].type]; + dataColAppendVal(target->cols[i].pData, tdGetColDataOfRow(src1->cols + i, *iter1), target->numOfPoints, + target->maxPoints); } target->numOfPoints++; @@ -447,15 +484,14 @@ void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, SDataCol } else if (key1 > key2) { for (int i = 0; i < src2->numOfCols; i++) { ASSERT(target->cols[i].type == src2->cols[i].type); - memcpy((void *)((char *)(target->cols[i].pData) + TYPE_BYTES[target->cols[i].type] * target->numOfPoints), - (void *)((char *)(src2->cols[i].pData) + TYPE_BYTES[src2->cols[i].type] * (*iter2)), - TYPE_BYTES[target->cols[i].type]); - target->cols[i].len += TYPE_BYTES[target->cols[i].type]; + dataColAppendVal(target->cols[i].pData, tdGetColDataOfRow(src2->cols + i, *iter2), target->numOfPoints, + target->maxPoints); } target->numOfPoints++; (*iter2)++; } else { + // TODO: deal with 
duplicate keys ASSERT(false); } } diff --git a/src/common/src/ttypes.c b/src/common/src/ttypes.c index 2f4aa6ab765c71fa841ff5246e3e88ae37fd054b..d99e916c73974b81ac8e8f08a305445ef76e31e0 100644 --- a/src/common/src/ttypes.c +++ b/src/common/src/ttypes.c @@ -16,33 +16,34 @@ #include "taosdef.h" #include "ttokendef.h" +#include "tscompression.h" const int32_t TYPE_BYTES[11] = { - -1, // TSDB_DATA_TYPE_NULL - sizeof(int8_t), // TSDB_DATA_TYPE_BOOL - sizeof(int8_t), // TSDB_DATA_TYPE_TINYINT - sizeof(int16_t), // TSDB_DATA_TYPE_SMALLINT - sizeof(int32_t), // TSDB_DATA_TYPE_INT - sizeof(int64_t), // TSDB_DATA_TYPE_BIGINT - sizeof(float), // TSDB_DATA_TYPE_FLOAT - sizeof(double), // TSDB_DATA_TYPE_DOUBLE - sizeof(int32_t), // TSDB_DATA_TYPE_BINARY - sizeof(TSKEY), // TSDB_DATA_TYPE_TIMESTAMP - sizeof(int32_t) // TSDB_DATA_TYPE_NCHAR + -1, // TSDB_DATA_TYPE_NULL + sizeof(int8_t), // TSDB_DATA_TYPE_BOOL + sizeof(int8_t), // TSDB_DATA_TYPE_TINYINT + sizeof(int16_t), // TSDB_DATA_TYPE_SMALLINT + sizeof(int32_t), // TSDB_DATA_TYPE_INT + sizeof(int64_t), // TSDB_DATA_TYPE_BIGINT + sizeof(float), // TSDB_DATA_TYPE_FLOAT + sizeof(double), // TSDB_DATA_TYPE_DOUBLE + sizeof(VarDataOffsetT), // TSDB_DATA_TYPE_BINARY + sizeof(TSKEY), // TSDB_DATA_TYPE_TIMESTAMP + sizeof(VarDataOffsetT) // TSDB_DATA_TYPE_NCHAR }; tDataTypeDescriptor tDataTypeDesc[11] = { - {TSDB_DATA_TYPE_NULL, 6, 1, "NOTYPE"}, - {TSDB_DATA_TYPE_BOOL, 4, CHAR_BYTES, "BOOL"}, - {TSDB_DATA_TYPE_TINYINT, 7, CHAR_BYTES, "TINYINT"}, - {TSDB_DATA_TYPE_SMALLINT, 8, SHORT_BYTES, "SMALLINT"}, - {TSDB_DATA_TYPE_INT, 3, INT_BYTES, "INT"}, - {TSDB_DATA_TYPE_BIGINT, 6, LONG_BYTES, "BIGINT"}, - {TSDB_DATA_TYPE_FLOAT, 5, FLOAT_BYTES, "FLOAT"}, - {TSDB_DATA_TYPE_DOUBLE, 6, DOUBLE_BYTES, "DOUBLE"}, - {TSDB_DATA_TYPE_BINARY, 6, 0, "BINARY"}, - {TSDB_DATA_TYPE_TIMESTAMP, 9, LONG_BYTES, "TIMESTAMP"}, - {TSDB_DATA_TYPE_NCHAR, 5, 8, "NCHAR"}, + {TSDB_DATA_TYPE_NULL, 6, 1, "NOTYPE", NULL, NULL}, + {TSDB_DATA_TYPE_BOOL, 4, CHAR_BYTES, 
"BOOL", tsCompressBool, tsDecompressBool}, + {TSDB_DATA_TYPE_TINYINT, 7, CHAR_BYTES, "TINYINT", tsCompressTinyint, tsDecompressTinyint}, + {TSDB_DATA_TYPE_SMALLINT, 8, SHORT_BYTES, "SMALLINT", tsCompressSmallint, tsDecompressSmallint}, + {TSDB_DATA_TYPE_INT, 3, INT_BYTES, "INT", tsCompressInt, tsDecompressInt}, + {TSDB_DATA_TYPE_BIGINT, 6, LONG_BYTES, "BIGINT", tsCompressBigint, tsDecompressBigint}, + {TSDB_DATA_TYPE_FLOAT, 5, FLOAT_BYTES, "FLOAT", tsCompressFloat, tsDecompressFloat}, + {TSDB_DATA_TYPE_DOUBLE, 6, DOUBLE_BYTES, "DOUBLE", tsCompressDouble, tsDecompressDouble}, + {TSDB_DATA_TYPE_BINARY, 6, 0, "BINARY", tsCompressString, tsDecompressString}, + {TSDB_DATA_TYPE_TIMESTAMP, 9, LONG_BYTES, "TIMESTAMP", tsCompressTimestamp, tsDecompressTimestamp}, + {TSDB_DATA_TYPE_NCHAR, 5, 8, "NCHAR", tsCompressString, tsDecompressString}, }; char tTokenTypeSwitcher[13] = { diff --git a/src/inc/taosdef.h b/src/inc/taosdef.h index 09ae1e730f61990ad6cafd061971f99785a52c6f..ce0d52d737f9d447993f5b96a30e82b732921637 100644 --- a/src/inc/taosdef.h +++ b/src/inc/taosdef.h @@ -32,6 +32,13 @@ extern "C" { #define TSKEY int64_t #endif +// ----------------- For variable data types such as TSDB_DATA_TYPE_BINARY and TSDB_DATA_TYPE_NCHAR +typedef int32_t VarDataOffsetT; +typedef int16_t VarDataLenT; +#define varDataLen(v) ((VarDataLenT *)(v))[0] +#define varDataTLen(v) (sizeof(VarDataLenT) + varDataLen(v)) +#define varDataVal(v) ((void *)((char *)v + sizeof(VarDataLenT))) + // this data type is internally used only in 'in' query to hold the values #define TSDB_DATA_TYPE_ARRAY (TSDB_DATA_TYPE_NCHAR + 1) @@ -121,6 +128,10 @@ typedef struct tDataTypeDescriptor { int16_t nameLen; int32_t nSize; char * aName; + int (*compFunc)(const char *const input, int inputSize, const int nelements, char *const output, int outputSize, + char algorithm, char *const buffer, int bufferSize); + int (*decompFunc)(const char *const input, int compressedSize, const int nelements, char *const output, + int 
outputSize, char algorithm, char *const buffer, int bufferSize); } tDataTypeDescriptor; extern tDataTypeDescriptor tDataTypeDesc[11]; diff --git a/src/query/tests/astTest.cpp b/src/query/tests/astTest.cpp index dee85ef63002b91e08cf32d7f1eb6417c1afac7c..d767e7ad7b427dd99c841c622a26ccda1a8141e9 100644 --- a/src/query/tests/astTest.cpp +++ b/src/query/tests/astTest.cpp @@ -582,7 +582,7 @@ void exprSerializeTest1() { tExprTreeDestroy(&p1, nullptr); tExprTreeDestroy(&p2, nullptr); - tbufClose(&bw); + // tbufClose(&bw); } void exprSerializeTest2() { @@ -627,7 +627,7 @@ void exprSerializeTest2() { tExprTreeDestroy(&p1, nullptr); tExprTreeDestroy(&p2, nullptr); - tbufClose(&bw); + // tbufClose(&bw); } } // namespace TEST(testCase, astTest) { diff --git a/src/tsdb/inc/tsdbMain.h b/src/tsdb/inc/tsdbMain.h index 8e0064a6ac5d5d2abc30c93566f2e327f59fd162..fb77975d25c742b84f2adb41631cc521e152c4eb 100644 --- a/src/tsdb/inc/tsdbMain.h +++ b/src/tsdb/inc/tsdbMain.h @@ -153,17 +153,16 @@ typedef struct { } SCacheMem; typedef struct { - int maxBytes; int cacheBlockSize; int totalCacheBlocks; STsdbCachePool pool; STsdbCacheBlock *curBlock; SCacheMem * mem; SCacheMem * imem; - TsdbRepoT * pRepo; + TsdbRepoT * pRepo; } STsdbCache; -STsdbCache *tsdbInitCache(int maxBytes, int cacheBlockSize, TsdbRepoT *pRepo); +STsdbCache *tsdbInitCache(int cacheBlockSize, int totalBlocks, TsdbRepoT *pRepo); void tsdbFreeCache(STsdbCache *pCache); void * tsdbAllocFromCache(STsdbCache *pCache, int bytes, TSKEY key); @@ -297,7 +296,7 @@ typedef struct { // TODO: take pre-calculation into account typedef struct { int16_t colId; // Column ID - int16_t len; // Column length + int16_t len; // Column length // TODO: int16_t is not enough int32_t type : 8; int32_t offset : 24; } SCompCol; @@ -426,6 +425,8 @@ typedef struct { SCompData *pCompData; SDataCols *pDataCols[2]; + void *blockBuffer; // Buffer to hold the whole data block + void *compBuffer; // Buffer for temperary compress/decompress purpose } 
SRWHelper; // --------- Helper state @@ -445,13 +446,11 @@ typedef struct { int tsdbInitReadHelper(SRWHelper *pHelper, STsdbRepo *pRepo); int tsdbInitWriteHelper(SRWHelper *pHelper, STsdbRepo *pRepo); -// int tsdbInitHelper(SRWHelper *pHelper, SHelperCfg *pCfg); void tsdbDestroyHelper(SRWHelper *pHelper); void tsdbResetHelper(SRWHelper *pHelper); // --------- For set operations int tsdbSetAndOpenHelperFile(SRWHelper *pHelper, SFileGroup *pGroup); -// void tsdbSetHelperTable(SRWHelper *pHelper, SHelperTable *pHelperTable, STSchema *pSchema); void tsdbSetHelperTable(SRWHelper *pHelper, STable *pTable, STsdbRepo *pRepo); int tsdbCloseHelperFile(SRWHelper *pHelper, bool hasError); diff --git a/src/tsdb/src/tsdbCache.c b/src/tsdb/src/tsdbCache.c index 3e241773ed27204a4525254b61e07796b395ce02..9351bc602b5f34746381cbc8c712429b02a6561a 100644 --- a/src/tsdb/src/tsdbCache.c +++ b/src/tsdb/src/tsdbCache.c @@ -21,29 +21,25 @@ static int tsdbAllocBlockFromPool(STsdbCache *pCache); static void tsdbFreeBlockList(SList *list); static void tsdbFreeCacheMem(SCacheMem *mem); -STsdbCache *tsdbInitCache(int maxBytes, int cacheBlockSize, TsdbRepoT *pRepo) { +STsdbCache *tsdbInitCache(int cacheBlockSize, int totalBlocks, TsdbRepoT *pRepo) { STsdbCache *pCache = (STsdbCache *)calloc(1, sizeof(STsdbCache)); if (pCache == NULL) return NULL; if (cacheBlockSize < 0) cacheBlockSize = TSDB_DEFAULT_CACHE_BLOCK_SIZE; cacheBlockSize *= (1024 * 1024); - if (maxBytes < 0) maxBytes = cacheBlockSize * TSDB_DEFAULT_TOTAL_BLOCKS; + if (totalBlocks <= 1) totalBlocks = TSDB_DEFAULT_TOTAL_BLOCKS; - pCache->maxBytes = maxBytes; pCache->cacheBlockSize = cacheBlockSize; + pCache->totalCacheBlocks = totalBlocks; pCache->pRepo = pRepo; - int nBlocks = maxBytes / cacheBlockSize + 1; - if (nBlocks <= 1) nBlocks = 2; - pCache->totalCacheBlocks = nBlocks; - STsdbCachePool *pPool = &(pCache->pool); pPool->index = 0; pPool->memPool = tdListNew(sizeof(STsdbCacheBlock *)); if (pPool->memPool == NULL) goto _err; - for 
(int i = 0; i < nBlocks; i++) { + for (int i = 0; i < totalBlocks; i++) { STsdbCacheBlock *pBlock = (STsdbCacheBlock *)malloc(sizeof(STsdbCacheBlock) + cacheBlockSize); if (pBlock == NULL) { goto _err; diff --git a/src/tsdb/src/tsdbMain.c b/src/tsdb/src/tsdbMain.c index ca0065bf1e69c7816ffe280640def1e9a6f5bbae..22fb5036e21ea2028e6400e39709e102e2902760 100644 --- a/src/tsdb/src/tsdbMain.c +++ b/src/tsdb/src/tsdbMain.c @@ -5,6 +5,7 @@ #include "tsdb.h" #include "tsdbMain.h" #include "tscompression.h" +#include "tchecksum.h" #define TSDB_DEFAULT_PRECISION TSDB_PRECISION_MILLI // default precision #define IS_VALID_PRECISION(precision) (((precision) >= TSDB_PRECISION_MILLI) && ((precision) <= TSDB_PRECISION_NANO)) @@ -202,7 +203,7 @@ TsdbRepoT *tsdbOpenRepo(char *tsdbDir, STsdbAppH *pAppH) { return NULL; } - pRepo->tsdbCache = tsdbInitCache(-1, -1, (TsdbRepoT *)pRepo); + pRepo->tsdbCache = tsdbInitCache(pRepo->config.cacheBlockSize, pRepo->config.totalBlocks, (TsdbRepoT *)pRepo); if (pRepo->tsdbCache == NULL) { tsdbFreeMeta(pRepo->tsdbMeta); free(pRepo->rootDir); diff --git a/src/tsdb/src/tsdbMeta.c b/src/tsdb/src/tsdbMeta.c index 9b606fa50a58ce1f6972baad5b8921fa59a2a255..ecd4c0225be7b513d076ee7870ebe988333df2e8 100644 --- a/src/tsdb/src/tsdbMeta.c +++ b/src/tsdb/src/tsdbMeta.c @@ -242,7 +242,7 @@ int32_t tsdbGetTableTagVal(TsdbRepoT* repo, STableId id, int32_t colId, int16_t* assert(pCol != NULL); SDataRow row = (SDataRow)pTable->tagVal; - char* d = dataRowAt(row, TD_DATA_ROW_HEAD_SIZE); + char* d = dataRowTuple(row); *val = d; *type = pCol->type; @@ -451,9 +451,8 @@ static int tsdbAddTableToMeta(STsdbMeta *pMeta, STable *pTable, bool addIdx) { // Update the pMeta->maxCols and pMeta->maxRowBytes if (pTable->type == TSDB_SUPER_TABLE || pTable->type == TSDB_NORMAL_TABLE) { if (schemaNCols(pTable->schema) > pMeta->maxCols) pMeta->maxCols = schemaNCols(pTable->schema); - int bytes = tdMaxRowBytesFromSchema(pTable->schema); + int bytes = 
dataRowMaxBytesFromSchema(pTable->schema); if (bytes > pMeta->maxRowBytes) pMeta->maxRowBytes = bytes; - tdUpdateSchema(pTable->schema); } return tsdbAddTableIntoMap(pMeta, pTable); @@ -524,5 +523,5 @@ static int tsdbEstimateTableEncodeSize(STable *pTable) { char *getTupleKey(const void * data) { SDataRow row = (SDataRow)data; - return dataRowAt(row, TD_DATA_ROW_HEAD_SIZE); + return POINTER_DRIFT(row, TD_DATA_ROW_HEAD_SIZE); } \ No newline at end of file diff --git a/src/tsdb/src/tsdbRWHelper.c b/src/tsdb/src/tsdbRWHelper.c index 25989a23224993913193e753503a32f3d17a9135..ee2f29ea5558b754147ffc9c0760e8213c854d55 100644 --- a/src/tsdb/src/tsdbRWHelper.c +++ b/src/tsdb/src/tsdbRWHelper.c @@ -131,6 +131,11 @@ static int tsdbInitHelper(SRWHelper *pHelper, STsdbRepo *pRepo, tsdb_rw_helper_t // Init block part if (tsdbInitHelperBlock(pHelper) < 0) goto _err; + pHelper->blockBuffer = + tmalloc(sizeof(SCompData) + (sizeof(SCompCol) + sizeof(TSCKSUM) + COMP_OVERFLOW_BYTES) * pHelper->config.maxCols + + pHelper->config.maxRowSize * pHelper->config.maxRowsPerFileBlock + sizeof(TSCKSUM)); + if (pHelper->blockBuffer == NULL) goto _err; + return 0; _err: @@ -149,6 +154,8 @@ int tsdbInitWriteHelper(SRWHelper *pHelper, STsdbRepo *pRepo) { void tsdbDestroyHelper(SRWHelper *pHelper) { if (pHelper) { + tzfree(pHelper->blockBuffer); + tzfree(pHelper->compBuffer); tsdbDestroyHelperFile(pHelper); tsdbDestroyHelperTable(pHelper); tsdbDestroyHelperBlock(pHelper); @@ -330,7 +337,7 @@ int tsdbWriteDataBlock(SRWHelper *pHelper, SDataCols *pDataCols) { int blkIdx = (pCompBlock == NULL) ? 
(pIdx->numOfBlocks - 1) : (pCompBlock - pHelper->pCompInfo->blocks); if (pCompBlock == NULL) { // No key overlap, must has last block, just merge with the last block - ASSERT(pIdx->hasLast && pHelper->pCompInfo->blocks[pIdx->numOfSuperBlocks - 1].last); + ASSERT(pIdx->hasLast && pHelper->pCompInfo->blocks[pIdx->numOfBlocks - 1].last); rowsToWrite = tsdbMergeDataWithBlock(pHelper, blkIdx, pDataCols); if (rowsToWrite < 0) goto _err; } else { // Has key overlap @@ -552,61 +559,97 @@ int tsdbLoadBlockDataCols(SRWHelper *pHelper, SDataCols *pDataCols, int blkIdx, return 0; } +static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, char *content, int32_t len, int8_t comp, int numOfPoints, + int maxPoints, char *buffer, int bufferSize) { + // Verify by checksum + if (!taosCheckChecksumWhole((uint8_t *)content, len)) return -1; + + // Decode the data + if (comp) { + // // Need to decompress + pDataCol->len = (*(tDataTypeDesc[pDataCol->type].decompFunc))( + content, len - sizeof(TSCKSUM), numOfPoints, pDataCol->pData, pDataCol->spaceSize, comp, buffer, bufferSize); + if (pDataCol->type == TSDB_DATA_TYPE_BINARY || pDataCol->type == TSDB_DATA_TYPE_NCHAR) { + pDataCol->len += (sizeof(int32_t) * maxPoints); + dataColSetOffset(pDataCol, numOfPoints); + } + } else { + // No need to decompress, just memcpy it + switch (pDataCol->type) { + case TSDB_DATA_TYPE_BINARY: + case TSDB_DATA_TYPE_NCHAR: + pDataCol->len = sizeof(int32_t) * maxPoints; + memcpy((char *)pDataCol->pData + pDataCol->len, content, len - sizeof(TSCKSUM)); + pDataCol->len += (len - sizeof(TSCKSUM)); + dataColSetOffset(pDataCol, numOfPoints); + break; + + default: + pDataCol->len = len - sizeof(TSCKSUM); + memcpy(pDataCol->pData, content, pDataCol->len); + break; + } + } + return 0; +} + /** * Interface to read the data of a sub-block OR the data of a super-block of which (numOfSubBlocks == 1) */ static int tsdbLoadBlockDataImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SDataCols *pDataCols) { 
ASSERT(pCompBlock->numOfSubBlocks <= 1); - SCompData *pCompData = (SCompData *)malloc(pCompBlock->len); - if (pCompData == NULL) return -1; + ASSERT(tsizeof(pHelper->blockBuffer) >= pCompBlock->len); + + SCompData *pCompData = (SCompData *)pHelper->blockBuffer; int fd = (pCompBlock->last) ? pHelper->files.lastF.fd : pHelper->files.dataF.fd; if (lseek(fd, pCompBlock->offset, SEEK_SET) < 0) goto _err; if (tread(fd, (void *)pCompData, pCompBlock->len) < pCompBlock->len) goto _err; ASSERT(pCompData->numOfCols == pCompBlock->numOfCols); - // TODO : check the checksum - size_t tsize = sizeof(SCompData) + sizeof(SCompCol) * pCompBlock->numOfCols + sizeof(TSCKSUM); + int32_t tsize = sizeof(SCompData) + sizeof(SCompCol) * pCompBlock->numOfCols + sizeof(TSCKSUM); if (!taosCheckChecksumWhole((uint8_t *)pCompData, tsize)) goto _err; - for (int i = 0; i < pCompData->numOfCols; i++) { - // TODO: check the data checksum - // if (!taosCheckChecksumWhole()) - } - - ASSERT(pCompBlock->numOfCols == pCompData->numOfCols); pDataCols->numOfPoints = pCompBlock->numOfPoints; - int ccol = 0, dcol = 0; - while (true) { - if (ccol >= pDataCols->numOfCols) { - // TODO: Fill rest NULL - break; + // Recover the data + int ccol = 0; + int dcol = 0; + while (dcol < pDataCols->numOfCols) { + SDataCol *pDataCol = &(pDataCols->cols[dcol]); + if (ccol >= pCompData->numOfCols) { + // Set current column as NULL and forward + dataColSetNEleNull(pDataCol, pCompBlock->numOfPoints, pDataCols->maxPoints); + dcol++; + continue; } - if (dcol >= pCompData->numOfCols) break; SCompCol *pCompCol = &(pCompData->cols[ccol]); - SDataCol *pDataCol = &(pDataCols->cols[dcol]); if (pCompCol->colId == pDataCol->colId) { - // TODO: uncompress - memcpy(pDataCol->pData, (void *)(((char *)pCompData) + tsize + pCompCol->offset), pCompCol->len); - ccol++; - dcol++; - } else if (pCompCol->colId > pDataCol->colId) { - // TODO: Fill NULL + if (pCompBlock->algorithm == TWO_STAGE_COMP) { + pHelper->compBuffer = 
trealloc(pHelper->compBuffer, pCompCol->len + COMP_OVERFLOW_BYTES); + if (pHelper->compBuffer == NULL) goto _err; + } + if (tsdbCheckAndDecodeColumnData(pDataCol, (char *)pCompData + tsize + pCompCol->offset, pCompCol->len, + pCompBlock->algorithm, pCompBlock->numOfPoints, pDataCols->maxPoints, + pHelper->compBuffer, tsizeof(pHelper->compBuffer)) < 0) + goto _err; dcol++; - } else { ccol++; + } else if (pCompCol->colId < pDataCol->colId) { + ccol++; + } else { + // Set current column as NULL and forward + dataColSetNEleNull(pDataCol, pCompBlock->numOfPoints, pDataCols->maxPoints); + dcol++; } } - tfree(pCompData); return 0; _err: - tfree(pCompData); return -1; } @@ -634,36 +677,6 @@ _err: return -1; } -// static int tsdbCheckHelperCfg(SHelperCfg *pCfg) { -// // TODO -// return 0; -// } - -// static void tsdbClearHelperFile(SHelperFile *pHFile) { -// pHFile->fid = -1; -// if (pHFile->headF.fd > 0) { -// close(pHFile->headF.fd); -// pHFile->headF.fd = -1; -// } -// if (pHFile->dataF.fd > 0) { -// close(pHFile->dataF.fd); -// pHFile->dataF.fd = -1; -// } -// if (pHFile->lastF.fd > 0) { -// close(pHFile->lastF.fd); -// pHFile->lastF.fd = -1; -// } -// if (pHFile->nHeadF.fd > 0) { -// close(pHFile->nHeadF.fd); -// pHFile->nHeadF.fd = -1; -// } -// if (pHFile->nLastF.fd > 0) { -// close(pHFile->nLastF.fd); -// pHFile->nLastF.fd = -1; -// } - -// } - static bool tsdbShouldCreateNewLast(SRWHelper *pHelper) { ASSERT(pHelper->files.lastF.fd > 0); struct stat st; @@ -677,81 +690,94 @@ static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDa ASSERT(rowsToWrite > 0 && rowsToWrite <= pDataCols->numOfPoints && rowsToWrite <= pHelper->config.maxRowsPerFileBlock); - SCompData *pCompData = NULL; + SCompData *pCompData = (SCompData *)(pHelper->blockBuffer); int64_t offset = 0; offset = lseek(pFile->fd, 0, SEEK_END); if (offset < 0) goto _err; - pCompData = (SCompData *)malloc(sizeof(SCompData) + sizeof(SCompCol) * pDataCols->numOfCols + sizeof(TSCKSUM)); - if 
(pCompData == NULL) goto _err; - int nColsNotAllNull = 0; - int32_t toffset = 0; for (int ncol = 0; ncol < pDataCols->numOfCols; ncol++) { SDataCol *pDataCol = pDataCols->cols + ncol; SCompCol *pCompCol = pCompData->cols + nColsNotAllNull; - if (0) { - // TODO: all data to commit are NULL + if (isNEleNull(pDataCol, rowsToWrite)) { + // all data to commit are NULL, just ignore it continue; } - // Compress the data here - { - // TODO - } - pCompCol->colId = pDataCol->colId; pCompCol->type = pDataCol->type; - pCompCol->len = TYPE_BYTES[pCompCol->type] * rowsToWrite; // TODO: change it - pCompCol->offset = toffset; nColsNotAllNull++; - - toffset += pCompCol->len; } ASSERT(nColsNotAllNull > 0 && nColsNotAllNull <= pDataCols->numOfCols); - pCompData->delimiter = TSDB_FILE_DELIMITER; - pCompData->uid = pHelper->tableInfo.uid; - pCompData->numOfCols = nColsNotAllNull; - - // Write SCompData + SCompCol part - size_t tsize = sizeof(SCompData) + sizeof(SCompCol) * nColsNotAllNull + sizeof(TSCKSUM); - taosCalcChecksumAppend(0, (uint8_t *)pCompData, tsize); - if (twrite(pFile->fd, (void *)pCompData, tsize) < tsize) goto _err; - // Write true data part - int nCompCol = 0; + // Compress the data if neccessary + int tcol = 0; + int32_t toffset = 0; + int32_t tsize = sizeof(SCompData) + sizeof(SCompCol) * nColsNotAllNull + sizeof(TSCKSUM); + int32_t lsize = tsize; for (int ncol = 0; ncol < pDataCols->numOfCols; ncol++) { - ASSERT(nCompCol < nColsNotAllNull); + if (tcol >= nColsNotAllNull) break; SDataCol *pDataCol = pDataCols->cols + ncol; - SCompCol *pCompCol = pCompData->cols + nCompCol; + SCompCol *pCompCol = pCompData->cols + tcol; - if (pDataCol->colId == pCompCol->colId) { - if (twrite(pFile->fd, (void *)(pDataCol->pData), pCompCol->len) < pCompCol->len) goto _err; - tsize += pCompCol->len; - nCompCol++; + if (pDataCol->colId != pCompCol->colId) continue; + void *tptr = (void *)((char *)pCompData + lsize); + + pCompCol->offset = toffset; + + int32_t tlen = 
dataColGetNEleLen(pDataCol, rowsToWrite); + + if (pHelper->config.compress) { + if (pHelper->config.compress == TWO_STAGE_COMP) { + pHelper->compBuffer = trealloc(pHelper->compBuffer, tlen + COMP_OVERFLOW_BYTES); + if (pHelper->compBuffer == NULL) goto _err; + } + + pCompCol->len = (*(tDataTypeDesc[pDataCol->type].compFunc))( + (char *)pDataCol->pData, tlen, rowsToWrite, tptr, tsizeof(pHelper->blockBuffer) - lsize, + pHelper->config.compress, pHelper->compBuffer, tsizeof(pHelper->compBuffer)); + } else { + pCompCol->len = tlen; + memcpy(tptr, pDataCol->pData, pCompCol->len); } + + // Add checksum + pCompCol->len += sizeof(TSCKSUM); + taosCalcChecksumAppend(0, (uint8_t *)tptr, pCompCol->len); + + toffset += pCompCol->len; + lsize += pCompCol->len; + tcol++; } + pCompData->delimiter = TSDB_FILE_DELIMITER; + pCompData->uid = pHelper->tableInfo.uid; + pCompData->numOfCols = nColsNotAllNull; + + taosCalcChecksumAppend(0, (uint8_t *)pCompData, tsize); + + // Write the whole block to file + if (twrite(pFile->fd, (void *)pCompData, lsize) < lsize) goto _err; + + // Update pCompBlock membership vairables pCompBlock->last = isLast; pCompBlock->offset = offset; pCompBlock->algorithm = pHelper->config.compress; pCompBlock->numOfPoints = rowsToWrite; pCompBlock->sversion = pHelper->tableInfo.sversion; - pCompBlock->len = (int32_t)tsize; + pCompBlock->len = (int32_t)lsize; pCompBlock->numOfSubBlocks = isSuperBlock ? 
1 : 0; pCompBlock->numOfCols = nColsNotAllNull; pCompBlock->keyFirst = dataColsKeyFirst(pDataCols); pCompBlock->keyLast = dataColsKeyAt(pDataCols, rowsToWrite - 1); - tfree(pCompData); return 0; _err: - tfree(pCompData); return -1; } @@ -782,7 +808,7 @@ static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDa TSKEY keyFirst = dataColsKeyFirst(pDataCols); SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid; - ASSERT(blkIdx < pIdx->numOfSuperBlocks); + ASSERT(blkIdx < pIdx->numOfBlocks); // SCompBlock *pCompBlock = pHelper->pCompInfo->blocks + blkIdx; ASSERT(blockAtIdx(pHelper, blkIdx)->numOfSubBlocks >= 1); @@ -790,7 +816,7 @@ static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDa // ASSERT(compareKeyBlock((void *)&keyFirst, (void *)pCompBlock) == 0); if (keyFirst > blockAtIdx(pHelper, blkIdx)->keyLast) { // Merge with the last block by append - ASSERT(blockAtIdx(pHelper, blkIdx)->numOfPoints < pHelper->config.minRowsPerFileBlock && blkIdx == pIdx->numOfSuperBlocks-1); + ASSERT(blockAtIdx(pHelper, blkIdx)->numOfPoints < pHelper->config.minRowsPerFileBlock && blkIdx == pIdx->numOfBlocks-1); int defaultRowsToWrite = pHelper->config.maxRowsPerFileBlock * 4 / 5; // TODO: make a interface rowsWritten = MIN((defaultRowsToWrite - blockAtIdx(pHelper, blkIdx)->numOfPoints), pDataCols->numOfPoints); @@ -961,7 +987,7 @@ static int tsdbAdjustInfoSizeIfNeeded(SRWHelper *pHelper, size_t esize) { static int tsdbInsertSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkIdx) { SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid; - ASSERT(blkIdx >= 0 && blkIdx <= pIdx->numOfSuperBlocks); + ASSERT(blkIdx >= 0 && blkIdx <= pIdx->numOfBlocks); ASSERT(pCompBlock->numOfSubBlocks == 1); // Adjust memory if no more room @@ -1004,7 +1030,7 @@ static int tsdbAddSubBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkId ASSERT(pCompBlock->numOfSubBlocks == 0); SCompIdx *pIdx = pHelper->pCompIdx + 
pHelper->tableInfo.tid; - ASSERT(blkIdx >= 0 && blkIdx < pIdx->numOfSuperBlocks); + ASSERT(blkIdx >= 0 && blkIdx < pIdx->numOfBlocks); SCompBlock *pSCompBlock = pHelper->pCompInfo->blocks + blkIdx; ASSERT(pSCompBlock->numOfSubBlocks >= 1 && pSCompBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS); @@ -1088,7 +1114,7 @@ static int tsdbUpdateSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid; - ASSERT(blkIdx >= 0 && blkIdx < pIdx->numOfSuperBlocks); + ASSERT(blkIdx >= 0 && blkIdx < pIdx->numOfBlocks); SCompBlock *pSCompBlock = pHelper->pCompInfo->blocks + blkIdx; diff --git a/src/tsdb/tests/tsdbTests.cpp b/src/tsdb/tests/tsdbTests.cpp index 84711b07f866017dd518120976febe5daa68109e..c7ed6fcae11eb3c86c4728c4fd980a10638c16f7 100644 --- a/src/tsdb/tests/tsdbTests.cpp +++ b/src/tsdb/tests/tsdbTests.cpp @@ -27,7 +27,7 @@ typedef struct { static int insertData(SInsertInfo *pInfo) { SSubmitMsg *pMsg = - (SSubmitMsg *)malloc(sizeof(SSubmitMsg) + sizeof(SSubmitBlk) + tdMaxRowBytesFromSchema(pInfo->pSchema) * pInfo->rowsPerSubmit); + (SSubmitMsg *)malloc(sizeof(SSubmitMsg) + sizeof(SSubmitBlk) + dataRowMaxBytesFromSchema(pInfo->pSchema) * pInfo->rowsPerSubmit); if (pMsg == NULL) return -1; TSKEY start_time = pInfo->startTime; @@ -52,11 +52,12 @@ static int insertData(SInsertInfo *pInfo) { tdInitDataRow(row, pInfo->pSchema); for (int j = 0; j < schemaNCols(pInfo->pSchema); j++) { + STColumn *pTCol = schemaColAt(pInfo->pSchema, j); if (j == 0) { // Just for timestamp - tdAppendColVal(row, (void *)(&start_time), schemaColAt(pInfo->pSchema, j)); + tdAppendColVal(row, (void *)(&start_time), pTCol->type, pTCol->bytes, pTCol->offset); } else { // For int int val = 10; - tdAppendColVal(row, (void *)(&val), schemaColAt(pInfo->pSchema, j)); + tdAppendColVal(row, (void *)(&val), pTCol->type, pTCol->bytes, pTCol->offset); } } pBlock->len += dataRowLen(row); @@ -105,9 +106,9 @@ TEST(TsdbTest, DISABLED_tableEncodeDecode) { for (int 
i = 0; i < nCols; i++) { if (i == 0) { - tdSchemaAppendCol(schema, TSDB_DATA_TYPE_TIMESTAMP, i, -1); + tdSchemaAddCol(schema, TSDB_DATA_TYPE_TIMESTAMP, i, -1); } else { - tdSchemaAppendCol(schema, TSDB_DATA_TYPE_INT, i, -1); + tdSchemaAddCol(schema, TSDB_DATA_TYPE_INT, i, -1); } } @@ -149,9 +150,9 @@ TEST(TsdbTest, createRepo) { for (int i = 0; i < nCols; i++) { if (i == 0) { - tdSchemaAppendCol(schema, TSDB_DATA_TYPE_TIMESTAMP, i, -1); + tdSchemaAddCol(schema, TSDB_DATA_TYPE_TIMESTAMP, i, -1); } else { - tdSchemaAppendCol(schema, TSDB_DATA_TYPE_INT, i, -1); + tdSchemaAddCol(schema, TSDB_DATA_TYPE_INT, i, -1); } } @@ -244,7 +245,7 @@ TEST(TsdbTest, DISABLED_openRepo) { // tsdbLoadCompCols(&pGroup->files[TSDB_FILE_TYPE_DATA], pBlock, (void *)pCompData); // STable *pTable = tsdbGetTableByUid(pRepo->tsdbMeta, pCompData->uid); - // SDataCols *pDataCols = tdNewDataCols(tdMaxRowBytesFromSchema(tsdbGetTableSchema(pRepo->tsdbMeta, pTable)), 5, 10); + // SDataCols *pDataCols = tdNewDataCols(tdMaxRowBytesFromSchema(tsdbGetTableSchema(pRepo->tsdbMeta, pTable)), 5); // tdInitDataCols(pDataCols, tsdbGetTableSchema(pRepo->tsdbMeta, pTable)); // tsdbLoadDataBlock(&pGroup->files[TSDB_FILE_TYPE_DATA], pBlock, 1, pDataCols, pCompData); diff --git a/src/util/CMakeLists.txt b/src/util/CMakeLists.txt index a80e81f09fd48e45520bdb5ce2e9851561fd39e9..d4350fc8b250541aa8eed3d7686fd3363734442e 100644 --- a/src/util/CMakeLists.txt +++ b/src/util/CMakeLists.txt @@ -11,7 +11,7 @@ IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM)) INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/lz4/inc) AUX_SOURCE_DIRECTORY(src SRC) ADD_LIBRARY(tutil ${SRC}) - TARGET_LINK_LIBRARIES(tutil pthread os m rt) + TARGET_LINK_LIBRARIES(tutil pthread os m rt lz4) FIND_PATH(ICONV_INCLUDE_EXIST iconv.h /usr/include/ /usr/local/include/) IF (ICONV_INCLUDE_EXIST) ADD_DEFINITIONS(-DUSE_LIBICONV) @@ -68,7 +68,7 @@ ELSEIF (TD_WINDOWS_64) LIST(APPEND SRC ./src/tutil.c) LIST(APPEND SRC ./src/version.c) ADD_LIBRARY(tutil ${SRC}) - 
TARGET_LINK_LIBRARIES(tutil iconv regex pthread os winmm IPHLPAPI ws2_32) + TARGET_LINK_LIBRARIES(tutil iconv regex pthread os winmm IPHLPAPI ws2_32 lz4) ELSEIF(TD_DARWIN_64) ADD_DEFINITIONS(-DUSE_LIBICONV) LIST(APPEND SRC ./src/hash.c) @@ -105,7 +105,7 @@ ELSEIF(TD_DARWIN_64) LIST(APPEND SRC ./src/version.c) LIST(APPEND SRC ./src/hash.c) ADD_LIBRARY(tutil ${SRC}) - TARGET_LINK_LIBRARIES(tutil iconv pthread os) + TARGET_LINK_LIBRARIES(tutil iconv pthread os lz4) ENDIF() # TARGET_LINK_LIBRARIES(tutil mstorage) diff --git a/src/util/inc/tscompression.h b/src/util/inc/tscompression.h index 55e282296fac6d179580c53f5384136f2ef290c5..9398ff82439445692b2e1c90041fd0595bbc27c7 100644 --- a/src/util/inc/tscompression.h +++ b/src/util/inc/tscompression.h @@ -21,7 +21,9 @@ extern "C" { #endif #include "taosdef.h" +#include "tutil.h" +#define COMP_OVERFLOW_BYTES 2 #define BITS_PER_BYTE 8 // Masks #define INT64MASK(_x) ((1ul << _x) - 1) @@ -32,43 +34,220 @@ extern "C" { #define ONE_STAGE_COMP 1 #define TWO_STAGE_COMP 2 -int tsCompressTinyint(const char* const input, int inputSize, const int nelements, char* const output, int outputSize, char algorithm, - char* const buffer, int bufferSize); -int tsCompressSmallint(const char* const input, int inputSize, const int nelements, char* const output, int outputSize, char algorith, - char* const buffer, int bufferSize); -int tsCompressInt(const char* const input, int inputSize, const int nelements, char* const output, int outputSize, char algorith, - char* const buffer, int bufferSize); -int tsCompressBigint(const char* const input, int inputSize, const int nelements, char* const output, int outputSize, char algorith, - char* const buffer, int bufferSize); -int tsCompressBool(const char* const input, int inputSize, const int nelements, char* const output, int outputSize, char algorithm, - char* const buffer, int bufferSize); -int tsCompressString(const char* const input, int inputSize, const int nelements, char* const output, int 
outputSize, char algorith, - char* const buffer, int bufferSize); -int tsCompressFloat(const char* const input, int inputSize, const int nelements, char* const output, int outputSize, char algorith, - char* const buffer, int bufferSize); -int tsCompressDouble(const char* const input, int inputSize, const int nelements, char* const output, int outputSize, char algorith, - char* const buffer, int bufferSize); -int tsCompressTimestamp(const char* const input, int inputSize, const int nelements, char* const output, int outputSize, char algorith, - char* const buffer, int bufferSize); - -int tsDecompressTinyint(const char* const input, int compressedSize, const int nelements, char* const output, - int outputSize, char algorithm, char* const buffer, int bufferSize); -int tsDecompressSmallint(const char* const input, int compressedSize, const int nelements, char* const output, - int outputSize, char algorithm, char* const buffer, int bufferSize); -int tsDecompressInt(const char* const input, int compressedSize, const int nelements, char* const output, int outputSize, - char algorithm, char* const buffer, int bufferSize); -int tsDecompressBigint(const char* const input, int compressedSize, const int nelements, char* const output, - int outputSize, char algorithm, char* const buffer, int bufferSize); -int tsDecompressBool(const char* const input, int compressedSize, const int nelements, char* const output, - int outputSize, char algorithm, char* const buffer, int bufferSize); -int tsDecompressString(const char* const input, int compressedSize, const int nelements, char* const output, - int outputSize, char algorithm, char* const buffer, int bufferSize); -int tsDecompressFloat(const char* const input, int compressedSize, const int nelements, char* const output, - int outputSize, char algorithm, char* const buffer, int bufferSize); -int tsDecompressDouble(const char* const input, int compressedSize, const int nelements, char* const output, - int outputSize, char algorith, 
char* const buffer, int bufferSize); -int tsDecompressTimestamp(const char* const input, int compressedSize, const int nelements, char* const output, - int outputSize, char algorithm, char* const buffer, int bufferSize); +extern int tsCompressINTImp(const char *const input, const int nelements, char *const output, const char type); +extern int tsDecompressINTImp(const char *const input, const int nelements, char *const output, const char type); +extern int tsCompressBoolImp(const char *const input, const int nelements, char *const output); +extern int tsDecompressBoolImp(const char *const input, const int nelements, char *const output); +extern int tsCompressStringImp(const char *const input, int inputSize, char *const output, int outputSize); +extern int tsDecompressStringImp(const char *const input, int compressedSize, char *const output, int outputSize); +extern int tsCompressTimestampImp(const char *const input, const int nelements, char *const output); +extern int tsDecompressTimestampImp(const char *const input, const int nelements, char *const output); +extern int tsCompressDoubleImp(const char *const input, const int nelements, char *const output); +extern int tsDecompressDoubleImp(const char *const input, const int nelements, char *const output); +extern int tsCompressFloatImp(const char *const input, const int nelements, char *const output); +extern int tsDecompressFloatImp(const char *const input, const int nelements, char *const output); + +/* Per-type wrappers. ONE_STAGE_COMP calls the type codec directly on input/output; TWO_STAGE_COMP chains the type codec and the string codec through the caller-supplied scratch `buffer`. Any other `algorithm` value trips assert(0) and, when asserts are compiled out (NDEBUG), returns -1 instead of falling off the end of a non-void function. */ +static FORCE_INLINE int tsCompressTinyint(const char *const input, int inputSize, const int nelements, char *const output, int outputSize, char algorithm, + char *const buffer, int bufferSize) { + if (algorithm == ONE_STAGE_COMP) { + return tsCompressINTImp(input, nelements, output, TSDB_DATA_TYPE_TINYINT); + } else if (algorithm == TWO_STAGE_COMP) { + int len = tsCompressINTImp(input, nelements, buffer, TSDB_DATA_TYPE_TINYINT); + return tsCompressStringImp(buffer, len, output, outputSize); + } else { + assert(0); + return -1; +
} +} + +static FORCE_INLINE int tsDecompressTinyint(const char *const input, int compressedSize, const int nelements, char *const output, + int outputSize, char algorithm, char *const buffer, int bufferSize) { + if (algorithm == ONE_STAGE_COMP) { + return tsDecompressINTImp(input, nelements, output, TSDB_DATA_TYPE_TINYINT); + } else if (algorithm == TWO_STAGE_COMP) { + tsDecompressStringImp(input, compressedSize, buffer, bufferSize); + return tsDecompressINTImp(buffer, nelements, output, TSDB_DATA_TYPE_TINYINT); + } else { + assert(0); + return -1; + } +} + +static FORCE_INLINE int tsCompressSmallint(const char *const input, int inputSize, const int nelements, char *const output, int outputSize, char algorithm, + char *const buffer, int bufferSize) { + if (algorithm == ONE_STAGE_COMP) { + return tsCompressINTImp(input, nelements, output, TSDB_DATA_TYPE_SMALLINT); + } else if (algorithm == TWO_STAGE_COMP) { + int len = tsCompressINTImp(input, nelements, buffer, TSDB_DATA_TYPE_SMALLINT); + return tsCompressStringImp(buffer, len, output, outputSize); + } else { + assert(0); + return -1; + } +} + +static FORCE_INLINE int tsDecompressSmallint(const char *const input, int compressedSize, const int nelements, char *const output, + int outputSize, char algorithm, char *const buffer, int bufferSize) { + if (algorithm == ONE_STAGE_COMP) { + return tsDecompressINTImp(input, nelements, output, TSDB_DATA_TYPE_SMALLINT); + } else if (algorithm == TWO_STAGE_COMP) { + tsDecompressStringImp(input, compressedSize, buffer, bufferSize); + return tsDecompressINTImp(buffer, nelements, output, TSDB_DATA_TYPE_SMALLINT); + } else { + assert(0); + return -1; + } +} + +static FORCE_INLINE int tsCompressInt(const char *const input, int inputSize, const int nelements, char *const output, int outputSize, char algorithm, + char *const buffer, int bufferSize) { + if (algorithm == ONE_STAGE_COMP) { + return tsCompressINTImp(input, nelements, output, TSDB_DATA_TYPE_INT); + } else if (algorithm == TWO_STAGE_COMP) { + int len = 
tsCompressINTImp(input, nelements, buffer, TSDB_DATA_TYPE_INT); + return tsCompressStringImp(buffer, len, output, outputSize); + } else { + assert(0); + return -1; + } +} + +static FORCE_INLINE int tsDecompressInt(const char *const input, int compressedSize, const int nelements, char *const output, + int outputSize, char algorithm, char *const buffer, int bufferSize) { + if (algorithm == ONE_STAGE_COMP) { + return tsDecompressINTImp(input, nelements, output, TSDB_DATA_TYPE_INT); + } else if (algorithm == TWO_STAGE_COMP) { + tsDecompressStringImp(input, compressedSize, buffer, bufferSize); + return tsDecompressINTImp(buffer, nelements, output, TSDB_DATA_TYPE_INT); + } else { + assert(0); + return -1; + } +} + +static FORCE_INLINE int tsCompressBigint(const char *const input, int inputSize, const int nelements, char *const output, int outputSize, + char algorithm, char *const buffer, int bufferSize) { + if (algorithm == ONE_STAGE_COMP) { + return tsCompressINTImp(input, nelements, output, TSDB_DATA_TYPE_BIGINT); + } else if (algorithm == TWO_STAGE_COMP) { + int len = tsCompressINTImp(input, nelements, buffer, TSDB_DATA_TYPE_BIGINT); + return tsCompressStringImp(buffer, len, output, outputSize); + } else { + assert(0); + return -1; + } +} + +static FORCE_INLINE int tsDecompressBigint(const char *const input, int compressedSize, const int nelements, char *const output, + int outputSize, char algorithm, char *const buffer, int bufferSize) { + if (algorithm == ONE_STAGE_COMP) { + return tsDecompressINTImp(input, nelements, output, TSDB_DATA_TYPE_BIGINT); + } else if (algorithm == TWO_STAGE_COMP) { + tsDecompressStringImp(input, compressedSize, buffer, bufferSize); + return tsDecompressINTImp(buffer, nelements, output, TSDB_DATA_TYPE_BIGINT); + } else { + assert(0); + return -1; + } +} + +static FORCE_INLINE int tsCompressBool(const char *const input, int inputSize, const int nelements, char *const output, int outputSize, + char algorithm, char *const buffer, int bufferSize) { + if (algorithm == ONE_STAGE_COMP) { + return 
tsCompressBoolImp(input, nelements, output); + } else if (algorithm == TWO_STAGE_COMP) { + int len = tsCompressBoolImp(input, nelements, buffer); + return tsCompressStringImp(buffer, len, output, outputSize); + } else { + assert(0); + return -1; + } +} + +static FORCE_INLINE int tsDecompressBool(const char *const input, int compressedSize, const int nelements, char *const output, + int outputSize, char algorithm, char *const buffer, int bufferSize) { + if (algorithm == ONE_STAGE_COMP) { + return tsDecompressBoolImp(input, nelements, output); + } else if (algorithm == TWO_STAGE_COMP) { + tsDecompressStringImp(input, compressedSize, buffer, bufferSize); + return tsDecompressBoolImp(buffer, nelements, output); + } else { + assert(0); + return -1; + } +} + +static FORCE_INLINE int tsCompressString(const char *const input, int inputSize, const int nelements, char *const output, int outputSize, + char algorithm, char *const buffer, int bufferSize) { + return tsCompressStringImp(input, inputSize, output, outputSize); +} + +static FORCE_INLINE int tsDecompressString(const char *const input, int compressedSize, const int nelements, char *const output, + int outputSize, char algorithm, char *const buffer, int bufferSize) { + return tsDecompressStringImp(input, compressedSize, output, outputSize); +} + +static FORCE_INLINE int tsCompressFloat(const char *const input, int inputSize, const int nelements, char *const output, int outputSize, + char algorithm, char *const buffer, int bufferSize) { + if (algorithm == ONE_STAGE_COMP) { + return tsCompressFloatImp(input, nelements, output); + } else if (algorithm == TWO_STAGE_COMP) { + int len = tsCompressFloatImp(input, nelements, buffer); + return tsCompressStringImp(buffer, len, output, outputSize); + } else { + assert(0); + return -1; + } +} + +static FORCE_INLINE int tsDecompressFloat(const char *const input, int compressedSize, const int nelements, char *const output, + int outputSize, char algorithm, char *const buffer, int bufferSize) { + if (algorithm == 
ONE_STAGE_COMP) { + return tsDecompressFloatImp(input, nelements, output); + } else if (algorithm == TWO_STAGE_COMP) { + tsDecompressStringImp(input, compressedSize, buffer, bufferSize); + return tsDecompressFloatImp(buffer, nelements, output); + } else { + assert(0); + return -1; + } +} + +static FORCE_INLINE int tsCompressDouble(const char *const input, int inputSize, const int nelements, char *const output, int outputSize, + char algorithm, char *const buffer, int bufferSize) { + if (algorithm == ONE_STAGE_COMP) { + return tsCompressDoubleImp(input, nelements, output); + } else if (algorithm == TWO_STAGE_COMP) { + int len = tsCompressDoubleImp(input, nelements, buffer); + return tsCompressStringImp(buffer, len, output, outputSize); + } else { + assert(0); + return -1; + } +} + +static FORCE_INLINE int tsDecompressDouble(const char *const input, int compressedSize, const int nelements, char *const output, + int outputSize, char algorithm, char *const buffer, int bufferSize) { + if (algorithm == ONE_STAGE_COMP) { + return tsDecompressDoubleImp(input, nelements, output); + } else if (algorithm == TWO_STAGE_COMP) { + tsDecompressStringImp(input, compressedSize, buffer, bufferSize); + return tsDecompressDoubleImp(buffer, nelements, output); + } else { + assert(0); + return -1; + } +} + +static FORCE_INLINE int tsCompressTimestamp(const char *const input, int inputSize, const int nelements, char *const output, int outputSize, + char algorithm, char *const buffer, int bufferSize) { + if (algorithm == ONE_STAGE_COMP) { + return tsCompressTimestampImp(input, nelements, output); + } else if (algorithm == TWO_STAGE_COMP) { + int len = tsCompressTimestampImp(input, nelements, buffer); + return tsCompressStringImp(buffer, len, output, outputSize); + } else { + assert(0); + return -1; + } +} + +static FORCE_INLINE int tsDecompressTimestamp(const char *const input, int compressedSize, const int nelements, char *const output, + int outputSize, char algorithm, char *const buffer, int bufferSize) { + if (algorithm == 
ONE_STAGE_COMP) { + return tsDecompressTimestampImp(input, nelements, output); + } else if (algorithm == TWO_STAGE_COMP) { + tsDecompressStringImp(input, compressedSize, buffer, bufferSize); + return tsDecompressTimestampImp(buffer, nelements, output); + } else { + assert(0); + return -1; + } +} #ifdef __cplusplus } diff --git a/src/util/inc/tutil.h b/src/util/inc/tutil.h index cdcc6391514cde449059d3bf0d0ba88d9e24285e..55f4496755673d40cd3e4141e59ffbbb1dddf07f 100644 --- a/src/util/inc/tutil.h +++ b/src/util/inc/tutil.h @@ -44,7 +44,10 @@ extern "C" { #define tclose(x) taosCloseSocket(x) -#ifdef ASSERTION +// Pointer p drift right by b bytes +#define POINTER_DRIFT(p, b) ((void *)((char *)(p) + (b))) + +#ifndef NDEBUG #define ASSERT(x) assert(x) #else #define ASSERT(x) diff --git a/src/util/src/tcompression.c b/src/util/src/tcompression.c index 24a53b3fe49a555cf24c605cda4e7c4041086ce8..e3b3d65052e4d34c9c424b565d084ae123efcac0 100644 --- a/src/util/src/tcompression.c +++ b/src/util/src/tcompression.c @@ -56,223 +56,6 @@ const int TEST_NUMBER = 1; #define is_bigendian() ((*(char *)&TEST_NUMBER) == 0) #define SIMPLE8B_MAX_INT64 ((uint64_t)2305843009213693951L) -// Function declarations -int tsCompressINTImp(const char *const input, const int nelements, char *const output, const char type); -int tsDecompressINTImp(const char *const input, const int nelements, char *const output, const char type); -int tsCompressBoolImp(const char *const input, const int nelements, char *const output); -int tsDecompressBoolImp(const char *const input, const int nelements, char *const output); -int tsCompressStringImp(const char *const input, int inputSize, char *const output, int outputSize); -int tsDecompressStringImp(const char *const input, int compressedSize, char *const output, int outputSize); -int tsCompressTimestampImp(const char *const input, const int nelements, char *const output); -int tsDecompressTimestampImp(const char *const input, const int nelements, char *const output); -int 
tsCompressDoubleImp(const char *const input, const int nelements, char *const output); -int tsDecompressDoubleImp(const char *const input, const int nelements, char *const output); -int tsCompressFloatImp(const char *const input, const int nelements, char *const output); -int tsDecompressFloatImp(const char *const input, const int nelements, char *const output); - -/* ----------------------------------------------Compression function used by - * others ---------------------------------------------- */ -int tsCompressTinyint(const char *const input, int inputSize, const int nelements, char *const output, int outputSize, char algorithm, - char *const buffer, int bufferSize) { - if (algorithm == ONE_STAGE_COMP) { - return tsCompressINTImp(input, nelements, output, TSDB_DATA_TYPE_TINYINT); - } else if (algorithm == TWO_STAGE_COMP) { - int len = tsCompressINTImp(input, nelements, buffer, TSDB_DATA_TYPE_TINYINT); - return tsCompressStringImp(buffer, len, output, outputSize); - } else { - assert(0); - } -} - -int tsDecompressTinyint(const char *const input, int compressedSize, const int nelements, char *const output, - int outputSize, char algorithm, char *const buffer, int bufferSize) { - if (algorithm == ONE_STAGE_COMP) { - return tsDecompressINTImp(input, nelements, output, TSDB_DATA_TYPE_TINYINT); - } else if (algorithm == TWO_STAGE_COMP) { - tsDecompressStringImp(input, compressedSize, buffer, bufferSize); - return tsDecompressINTImp(buffer, nelements, output, TSDB_DATA_TYPE_TINYINT); - } else { - assert(0); - } -} - -int tsCompressSmallint(const char *const input, int inputSize, const int nelements, char *const output, int outputSize, char algorithm, - char *const buffer, int bufferSize) { - if (algorithm == ONE_STAGE_COMP) { - return tsCompressINTImp(input, nelements, output, TSDB_DATA_TYPE_SMALLINT); - } else if (algorithm == TWO_STAGE_COMP) { - int len = tsCompressINTImp(input, nelements, buffer, TSDB_DATA_TYPE_SMALLINT); - return tsCompressStringImp(buffer, len, 
output, outputSize); - } else { - assert(0); - } -} - -int tsDecompressSmallint(const char *const input, int compressedSize, const int nelements, char *const output, - int outputSize, char algorithm, char *const buffer, int bufferSize) { - if (algorithm == ONE_STAGE_COMP) { - return tsDecompressINTImp(input, nelements, output, TSDB_DATA_TYPE_SMALLINT); - } else if (algorithm == TWO_STAGE_COMP) { - tsDecompressStringImp(input, compressedSize, buffer, bufferSize); - return tsDecompressINTImp(buffer, nelements, output, TSDB_DATA_TYPE_SMALLINT); - } else { - assert(0); - } -} - -int tsCompressInt(const char *const input, int inputSize, const int nelements, char *const output, int outputSize, char algorithm, - char *const buffer, int bufferSize) { - if (algorithm == ONE_STAGE_COMP) { - return tsCompressINTImp(input, nelements, output, TSDB_DATA_TYPE_INT); - } else if (algorithm == TWO_STAGE_COMP) { - int len = tsCompressINTImp(input, nelements, buffer, TSDB_DATA_TYPE_INT); - return tsCompressStringImp(buffer, len, output, outputSize); - } else { - assert(0); - } -} - -int tsDecompressInt(const char *const input, int compressedSize, const int nelements, char *const output, - int outputSize, char algorithm, char *const buffer, int bufferSize) { - if (algorithm == ONE_STAGE_COMP) { - return tsDecompressINTImp(input, nelements, output, TSDB_DATA_TYPE_INT); - } else if (algorithm == TWO_STAGE_COMP) { - tsDecompressStringImp(input, compressedSize, buffer, bufferSize); - return tsDecompressINTImp(buffer, nelements, output, TSDB_DATA_TYPE_INT); - } else { - assert(0); - } -} - -int tsCompressBigint(const char *const input, int inputSize, const int nelements, char *const output, int outputSize, - char algorithm, char *const buffer, int bufferSize) { - if (algorithm == ONE_STAGE_COMP) { - return tsCompressINTImp(input, nelements, output, TSDB_DATA_TYPE_BIGINT); - } else if (algorithm == TWO_STAGE_COMP) { - int len = tsCompressINTImp(input, nelements, buffer, 
TSDB_DATA_TYPE_BIGINT); - return tsCompressStringImp(buffer, len, output, outputSize); - } else { - assert(0); - } -} - -int tsDecompressBigint(const char *const input, int compressedSize, const int nelements, char *const output, - int outputSize, char algorithm, char *const buffer, int bufferSize) { - if (algorithm == ONE_STAGE_COMP) { - return tsDecompressINTImp(input, nelements, output, TSDB_DATA_TYPE_BIGINT); - } else if (algorithm == TWO_STAGE_COMP) { - tsDecompressStringImp(input, compressedSize, buffer, bufferSize); - return tsDecompressINTImp(buffer, nelements, output, TSDB_DATA_TYPE_BIGINT); - } else { - assert(0); - } -} - -int tsCompressBool(const char *const input, int inputSize, const int nelements, char *const output, int outputSize, - char algorithm, char *const buffer, int bufferSize) { - if (algorithm == ONE_STAGE_COMP) { - return tsCompressBoolImp(input, nelements, output); - } else if (algorithm == TWO_STAGE_COMP) { - int len = tsCompressBoolImp(input, nelements, buffer); - return tsCompressStringImp(buffer, len, output, outputSize); - } else { - assert(0); - } -} - -int tsDecompressBool(const char *const input, int compressedSize, const int nelements, char *const output, - int outputSize, char algorithm, char *const buffer, int bufferSize) { - if (algorithm == ONE_STAGE_COMP) { - return tsDecompressBoolImp(input, nelements, output); - } else if (algorithm == TWO_STAGE_COMP) { - tsDecompressStringImp(input, compressedSize, buffer, bufferSize); - return tsDecompressBoolImp(buffer, nelements, output); - } else { - assert(0); - } -} - -int tsCompressString(const char *const input, int inputSize, const int nelements, char *const output, int outputSize, - char algorithm, char *const buffer, int bufferSize) { - return tsCompressStringImp(input, inputSize, output, outputSize); -} - -int tsDecompressString(const char *const input, int compressedSize, const int nelements, char *const output, - int outputSize, char algorithm, char *const buffer, int 
bufferSize) { - return tsDecompressStringImp(input, compressedSize, output, outputSize); -} - -int tsCompressFloat(const char *const input, int inputSize, const int nelements, char *const output, int outputSize, - char algorithm, char *const buffer, int bufferSize) { - if (algorithm == ONE_STAGE_COMP) { - return tsCompressFloatImp(input, nelements, output); - } else if (algorithm == TWO_STAGE_COMP) { - int len = tsCompressFloatImp(input, nelements, buffer); - return tsCompressStringImp(buffer, len, output, outputSize); - } else { - assert(0); - } -} - -int tsDecompressFloat(const char *const input, int compressedSize, const int nelements, char *const output, - int outputSize, char algorithm, char *const buffer, int bufferSize) { - if (algorithm == ONE_STAGE_COMP) { - return tsDecompressFloatImp(input, nelements, output); - } else if (algorithm == TWO_STAGE_COMP) { - tsDecompressStringImp(input, compressedSize, buffer, bufferSize); - return tsDecompressFloatImp(buffer, nelements, output); - } else { - assert(0); - } -} -int tsCompressDouble(const char *const input, int inputSize, const int nelements, char *const output, int outputSize, - char algorithm, char *const buffer, int bufferSize) { - if (algorithm == ONE_STAGE_COMP) { - return tsCompressDoubleImp(input, nelements, output); - } else if (algorithm == TWO_STAGE_COMP) { - int len = tsCompressDoubleImp(input, nelements, buffer); - return tsCompressStringImp(buffer, len, output, outputSize); - } else { - assert(0); - } -} - -int tsDecompressDouble(const char *const input, int compressedSize, const int nelements, char *const output, - int outputSize, char algorithm, char *const buffer, int bufferSize) { - if (algorithm == ONE_STAGE_COMP) { - return tsDecompressDoubleImp(input, nelements, output); - } else if (algorithm == TWO_STAGE_COMP) { - tsDecompressStringImp(input, compressedSize, buffer, bufferSize); - return tsDecompressDoubleImp(buffer, nelements, output); - } else { - assert(0); - } -} - -int 
tsCompressTimestamp(const char *const input, int inputSize, const int nelements, char *const output, int outputSize, - char algorithm, char *const buffer, int bufferSize) { - if (algorithm == ONE_STAGE_COMP) { - return tsCompressTimestampImp(input, nelements, output); - } else if (algorithm == TWO_STAGE_COMP) { - int len = tsCompressTimestampImp(input, nelements, buffer); - return tsCompressStringImp(buffer, len, output, outputSize); - } else { - assert(0); - } -} - -int tsDecompressTimestamp(const char *const input, int compressedSize, const int nelements, char *const output, - int outputSize, char algorithm, char *const buffer, int bufferSize) { - if (algorithm == ONE_STAGE_COMP) { - return tsDecompressTimestampImp(input, nelements, output); - } else if (algorithm == TWO_STAGE_COMP) { - tsDecompressStringImp(input, compressedSize, buffer, bufferSize); - return tsDecompressTimestampImp(buffer, nelements, output); - } else { - assert(0); - } -} - bool safeInt64Add(int64_t a, int64_t b) { if ((a > 0 && b > INT64_MAX - a) || (a < 0 && b < INT64_MIN - a)) return false; return true; diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c index 2e31b72e8b7eb34f8b98e24d87c5556181a20057..1302ceaff4bd814e163b6048a50b98d90d7f8754 100644 --- a/src/vnode/src/vnodeMain.c +++ b/src/vnode/src/vnodeMain.c @@ -96,14 +96,16 @@ int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg) { } STsdbCfg tsdbCfg = {0}; - tsdbCfg.precision = pVnodeCfg->cfg.precision; - tsdbCfg.compression = pVnodeCfg->cfg.compression;; tsdbCfg.tsdbId = pVnodeCfg->cfg.vgId; + tsdbCfg.cacheBlockSize = pVnodeCfg->cfg.cacheBlockSize; + tsdbCfg.totalBlocks = pVnodeCfg->cfg.totalBlocks; tsdbCfg.maxTables = pVnodeCfg->cfg.maxTables; tsdbCfg.daysPerFile = pVnodeCfg->cfg.daysPerFile; + tsdbCfg.keep = pVnodeCfg->cfg.daysToKeep; tsdbCfg.minRowsPerFileBlock = pVnodeCfg->cfg.minRowsPerFileBlock; tsdbCfg.maxRowsPerFileBlock = pVnodeCfg->cfg.maxRowsPerFileBlock; - tsdbCfg.keep = pVnodeCfg->cfg.daysToKeep; + 
tsdbCfg.precision = pVnodeCfg->cfg.precision; + tsdbCfg.compression = pVnodeCfg->cfg.compression; char tsdbDir[TSDB_FILENAME_LEN] = {0}; sprintf(tsdbDir, "%s/vnode%d/tsdb", tsVnodeDir, pVnodeCfg->cfg.vgId); diff --git a/src/vnode/src/vnodeWrite.c b/src/vnode/src/vnodeWrite.c index cfcc9cd84716b17b2f2cacbae86c29916253159f..b1a49e6e6535d52eb70d92c348ef1d87642243b1 100644 --- a/src/vnode/src/vnodeWrite.c +++ b/src/vnode/src/vnodeWrite.c @@ -123,7 +123,7 @@ static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pCont, SRspRe STSchema *pDestSchema = tdNewSchema(numOfColumns); for (int i = 0; i < numOfColumns; i++) { - tdSchemaAppendCol(pDestSchema, pSchema[i].type, htons(pSchema[i].colId), htons(pSchema[i].bytes)); + tdSchemaAddCol(pDestSchema, pSchema[i].type, htons(pSchema[i].colId), htons(pSchema[i].bytes)); } tsdbTableSetSchema(&tCfg, pDestSchema, false); tsdbTableSetName(&tCfg, pTable->tableId, false); @@ -131,7 +131,7 @@ static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pCont, SRspRe if (numOfTags != 0) { STSchema *pDestTagSchema = tdNewSchema(numOfTags); for (int i = numOfColumns; i < totalCols; i++) { - tdSchemaAppendCol(pDestTagSchema, pSchema[i].type, htons(pSchema[i].colId), htons(pSchema[i].bytes)); + tdSchemaAddCol(pDestTagSchema, pSchema[i].type, htons(pSchema[i].colId), htons(pSchema[i].bytes)); } tsdbTableSetTagSchema(&tCfg, pDestTagSchema, false); tsdbTableSetSName(&tCfg, pTable->superTableId, false); @@ -141,7 +141,8 @@ static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pCont, SRspRe SDataRow dataRow = tdNewDataRowFromSchema(pDestTagSchema); for (int i = 0; i < numOfTags; i++) { - tdAppendColVal(dataRow, pTagData + accumBytes, pDestTagSchema->columns + i); + STColumn *pTCol = schemaColAt(pDestTagSchema, i); /* dataRow was built from the tag schema, so look the column up there */ + tdAppendColVal(dataRow, pTagData + accumBytes, pTCol->type, pTCol->bytes, pTCol->offset); accumBytes += htons(pSchema[i + numOfColumns].bytes); } tsdbTableSetTagValue(&tCfg, dataRow, false); @@ -188,14 
+189,14 @@ static int32_t vnodeProcessAlterTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet STSchema *pDestSchema = tdNewSchema(numOfColumns); for (int i = 0; i < numOfColumns; i++) { - tdSchemaAppendCol(pDestSchema, pSchema[i].type, htons(pSchema[i].colId), htons(pSchema[i].bytes)); + tdSchemaAddCol(pDestSchema, pSchema[i].type, htons(pSchema[i].colId), htons(pSchema[i].bytes)); } tsdbTableSetSchema(&tCfg, pDestSchema, false); if (numOfTags != 0) { STSchema *pDestTagSchema = tdNewSchema(numOfTags); for (int i = numOfColumns; i < totalCols; i++) { - tdSchemaAppendCol(pDestTagSchema, pSchema[i].type, htons(pSchema[i].colId), htons(pSchema[i].bytes)); + tdSchemaAddCol(pDestTagSchema, pSchema[i].type, htons(pSchema[i].colId), htons(pSchema[i].bytes)); } tsdbTableSetTagSchema(&tCfg, pDestTagSchema, false); @@ -204,7 +205,8 @@ static int32_t vnodeProcessAlterTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet SDataRow dataRow = tdNewDataRowFromSchema(pDestTagSchema); for (int i = 0; i < numOfTags; i++) { - tdAppendColVal(dataRow, pTagData + accumBytes, pDestTagSchema->columns + i); + STColumn *pTCol = schemaColAt(pDestTagSchema, i); + tdAppendColVal(dataRow, pTagData + accumBytes, pTCol->type, pTCol->bytes, pTCol->offset); accumBytes += htons(pSchema[i + numOfColumns].bytes); } tsdbTableSetTagValue(&tCfg, dataRow, false);