diff --git a/src/common/inc/tdataformat.h b/src/common/inc/tdataformat.h index f77d3c6dc7e3d77bd7e175672fde774831907ecc..4b8940536fcc3794f162a3494cff407017fe2a72 100644 --- a/src/common/inc/tdataformat.h +++ b/src/common/inc/tdataformat.h @@ -67,6 +67,13 @@ int tdGetSchemaEncodeSize(STSchema *pSchema); void * tdEncodeSchema(void *dst, STSchema *pSchema); STSchema *tdDecodeSchema(void **psrc); +// ----------------- For variable data types such as TSDB_DATA_TYPE_BINARY and TSDB_DATA_TYPE_NCHAR +typedef int32_t VarDataOffsetT; +typedef int16_t VarDataLenT; +#define varDataLen(v) ((VarDataLenT *)(v))[0] +#define varDataTLen(v) (sizeof(VarDataLenT) + varDataLen(v)) +#define varDataVal(v) ((void *)((char *)v + sizeof(VarDataLenT))) + // ----------------- Data row structure /* A data row, the format is like below: @@ -111,18 +118,25 @@ static FORCE_INLINE void *tdGetRowDataOfCol(SDataRow row, int8_t type, int32_t o // ----------------- Data column structure typedef struct SDataCol { - int8_t type; - int16_t colId; - int bytes; - int len; - int offset; - void * pData; // Original data + int8_t type; // column type + int16_t colId; // column ID + int bytes; // column data bytes defined + int offset; // data offset in a SDataRow + int spaceSize; // Total space size for this column + int len; // column data length + VarDataOffsetT *dataOff; // For binary and nchar data, the offset in the data column + void * pData; // Actual data pointer } SDataCol; +static FORCE_INLINE void dataColReset(SDataCol *pDataCol) { pDataCol->len = 0; } + +void dataColInit(SDataCol *pDataCol, STColumn *pCol, void **pBuf, int maxPoints); void dataColAppendVal(SDataCol *pCol, void *value, int numOfPoints, int maxPoints); +void dataColPopPoints(SDataCol *pCol, int pointsToPop, int numOfPoints); +void dataColSetOffset(SDataCol *pCol, int nEle); + bool isNEleNull(SDataCol *pCol, int nEle); void dataColSetNEleNull(SDataCol *pCol, int nEle, int maxPoints); -void dataColSetOffset(SDataCol *pCol, int nEle, int maxPoints); // Get the data pointer from a column-wised data static FORCE_INLINE void *tdGetColDataOfRow(SDataCol *pCol, int row) { @@ -130,7 +144,7 @@ static FORCE_INLINE void *tdGetColDataOfRow(SDataCol *pCol, int row) { { case TSDB_DATA_TYPE_BINARY: case TSDB_DATA_TYPE_NCHAR: - return (void *)((char *)(pCol->pData) + ((int32_t *)(pCol->pData))[row]); + return (void *)((char *)(pCol->pData) + pCol->dataOff[row]); break; default: @@ -139,20 +153,17 @@ static FORCE_INLINE void *tdGetColDataOfRow(SDataCol *pCol, int row) { } } -static FORCE_INLINE void dataColGetNEleStartAndLen(SDataCol *pDataCol, int rows, void **pStart, int32_t *len, int32_t maxPoints) { +static FORCE_INLINE int32_t dataColGetNEleLen(SDataCol *pDataCol, int rows) { void *ptr = NULL; switch (pDataCol->type) { case TSDB_DATA_TYPE_BINARY: case TSDB_DATA_TYPE_NCHAR: ptr = tdGetColDataOfRow(pDataCol, rows - 1); - *pStart = (char *)(pDataCol->pData) + sizeof(int32_t) * maxPoints; - *len = (char *)ptr - (char *)(*pStart) + sizeof(int16_t) + *(int16_t *)ptr; + return ((VarDataOffsetT *)(pDataCol->pData))[rows-1] + varDataTLen(ptr); break; default: - *pStart = pDataCol->pData; - *len = TYPE_BYTES[pDataCol->type] * rows; - break; + return TYPE_BYTES[pDataCol->type] * rows; } } @@ -161,6 +172,7 @@ typedef struct { int maxRowSize; int maxCols; // max number of columns int maxPoints; // max number of points + int bufSize; int numOfPoints; int numOfCols; // Total number of cols diff --git a/src/common/src/tdataformat.c b/src/common/src/tdataformat.c index c5965347fc00c508eabe6c9f389dc63b7e370043..3034532d20700dab721d10fedff146b315fbebb1 100644 --- a/src/common/src/tdataformat.c +++ b/src/common/src/tdataformat.c @@ -213,28 +213,66 @@ SDataRow tdDataRowDup(SDataRow row) { return trow; } +void dataColInit(SDataCol *pDataCol, STColumn *pCol, void **pBuf, int maxPoints) { + pDataCol->type = colType(pCol); + pDataCol->colId = colColId(pCol); + pDataCol->bytes = colBytes(pCol); + pDataCol->offset = colOffset(pCol); + + pDataCol->len = 0; + if (pDataCol->type == TSDB_DATA_TYPE_BINARY || pDataCol->type == TSDB_DATA_TYPE_NCHAR) { + pDataCol->spaceSize = (sizeof(int32_t) + sizeof(int16_t) + pDataCol->bytes) * maxPoints; + pDataCol->dataOff = (VarDataOffsetT *)(*pBuf); + pDataCol->pData = (void *)((char *)(*pBuf) + sizeof(int32_t) * maxPoints); + } else { + pDataCol->spaceSize = pDataCol->bytes * maxPoints; + pDataCol->dataOff = NULL; + pDataCol->pData = *pBuf; + } + + *pBuf = (void *)((char *)(*pBuf) + pDataCol->spaceSize); +} + void dataColAppendVal(SDataCol *pCol, void *value, int numOfPoints, int maxPoints) { ASSERT(pCol != NULL && value != NULL); switch (pCol->type) { case TSDB_DATA_TYPE_BINARY: case TSDB_DATA_TYPE_NCHAR: - if (pCol->len == 0) pCol->len = sizeof(int32_t) * maxPoints; // set offset ((int32_t *)(pCol->pData))[numOfPoints] = pCol->len; // Copy data - memcpy(pCol->pData + pCol->len, value, sizeof(int16_t) + *(int16_t *)value); + memcpy((void *)((char *)pCol->pData + pCol->len), value, varDataTLen(value)); // Update the length - pCol->len += (sizeof(int16_t) + *(int16_t *)value); + pCol->len += varDataTLen(value); break; default: ASSERT(pCol->len == TYPE_BYTES[pCol->type] * numOfPoints); - memcpy(pCol->pData + pCol->len, value, pCol->bytes); + memcpy((void *)((char *)pCol->pData + pCol->len), value, pCol->bytes); pCol->len += pCol->bytes; break; } } +void dataColPopPoints(SDataCol *pCol, int pointsToPop, int numOfPoints) { + int pointsLeft = numOfPoints - pointsToPop; + + ASSERT(pointsLeft > 0); + + if (pCol->type == TSDB_DATA_TYPE_BINARY || pCol->type == TSDB_DATA_TYPE_NCHAR) { + ASSERT(pCol->len > 0); + VarDataOffsetT toffset = ((VarDataOffsetT *)(pCol->pData))[pointsToPop]; + pCol->len = pCol->len - toffset; + ASSERT(pCol->len > 0); + memmove(pCol->pData, (void *)((char *)(pCol->pData) + toffset), pCol->len); + dataColSetOffset(pCol, pointsLeft); + } else { + ASSERT(pCol->len == TYPE_BYTES[pCol->type] * numOfPoints); + pCol->len = TYPE_BYTES[pCol->type] * pointsLeft; + memmove(pCol->pData, (void *)((char *)(pCol->pData) + TYPE_BYTES[pCol->type] * pointsToPop), pCol->len); + } +} + bool isNEleNull(SDataCol *pCol, int nEle) { void *ptr = NULL; switch (pCol->type) { @@ -242,8 +280,7 @@ bool isNEleNull(SDataCol *pCol, int nEle) { case TSDB_DATA_TYPE_NCHAR: for (int i = 0; i < nEle; i++) { ptr = tdGetColDataOfRow(pCol, i); - ptr = (void *)((char *)ptr + sizeof(int16_t)); - if (!isNull(ptr, pCol->type)) return false; + if (!isNull(varDataVal(ptr), pCol->type)) return false; } return true; default: @@ -259,16 +296,15 @@ void dataColSetNEleNull(SDataCol *pCol, int nEle, int maxPoints) { switch (pCol->type) { case TSDB_DATA_TYPE_BINARY: case TSDB_DATA_TYPE_NCHAR: - pCol->len = sizeof(int32_t) * maxPoints; + pCol->len = 0; for (int i = 0; i < nEle; i++) { - ((int32_t *)(pCol->pData))[i] = pCol->len; - - ptr = ((char *)pCol->pData) + pCol->len; - *(int16_t *)ptr = (pCol->type == TSDB_DATA_TYPE_BINARY) ? sizeof(char) : TSDB_NCHAR_SIZE; - setNull(ptr + sizeof(int16_t), pCol->type, pCol->bytes); - - pCol->len += (sizeof(int16_t) + ((int16_t *)ptr)[0]); + pCol->dataOff[i] = pCol->len; + ptr = (char *)pCol->pData + pCol->len; + varDataLen(ptr) = (pCol->type == TSDB_DATA_TYPE_BINARY) ? sizeof(char) : TSDB_NCHAR_SIZE; + setNull(ptr + sizeof(VarDataLenT), pCol->type, pCol->bytes); + pCol->len += varDataTLen(ptr); } + break; default: setNullN(pCol->pData, pCol->type, pCol->bytes, nEle); @@ -277,13 +313,16 @@ void dataColSetNEleNull(SDataCol *pCol, int nEle, int maxPoints) { } } -void dataColSetOffset(SDataCol *pCol, int nEle, int maxPoints) { - ASSERT(nEle <= maxPoints && ((pCol->type == TSDB_DATA_TYPE_BINARY) || (pCol->type == TSDB_DATA_TYPE_NCHAR))); +void dataColSetOffset(SDataCol *pCol, int nEle) { + ASSERT(((pCol->type == TSDB_DATA_TYPE_BINARY) || (pCol->type == TSDB_DATA_TYPE_NCHAR))); + + char *tptr = (char *)(pCol->pData); - char *tptr = (char *)(pCol->pData) + sizeof(int32_t) * maxPoints; + VarDataOffsetT offset = 0; for (int i = 0; i < nEle; i++) { - ((int32_t *)(pCol->pData))[i] = tptr - (char *)(pCol->pData); - tptr = tptr + *(int16_t *)tptr + sizeof(int16_t); + ((VarDataOffsetT *)(pCol->pData))[i] = offset; + offset += varDataTLen(tptr); + tptr = tptr + varDataTLen(tptr); } } @@ -294,8 +333,9 @@ SDataCols *tdNewDataCols(int maxRowSize, int maxCols, int maxRows) { pCols->maxRowSize = maxRowSize; pCols->maxCols = maxCols; pCols->maxPoints = maxRows; + pCols->bufSize = maxRowSize * maxRows; - pCols->buf = malloc(maxRowSize * maxRows); + pCols->buf = malloc(pCols->bufSize); if (pCols->buf == NULL) { free(pCols); return NULL; @@ -311,16 +351,8 @@ void tdInitDataCols(SDataCols *pCols, STSchema *pSchema) { void *ptr = pCols->buf; for (int i = 0; i < schemaNCols(pSchema); i++) { - pCols->cols[i].type = colType(schemaColAt(pSchema, i)); - pCols->cols[i].bytes = colBytes(schemaColAt(pSchema, i)); - pCols->cols[i].offset = colOffset(schemaColAt(pSchema, i)) + TD_DATA_ROW_HEAD_SIZE; - pCols->cols[i].colId = colColId(schemaColAt(pSchema, i)); - pCols->cols[i].pData = ptr; - - ptr = ptr + colBytes(schemaColAt(pSchema, i)) * pCols->maxPoints; - if (colType(schemaColAt(pSchema, i)) == TSDB_DATA_TYPE_BINARY || - colType(schemaColAt(pSchema, i)) == TSDB_DATA_TYPE_NCHAR) - ptr = ptr + (sizeof(int32_t) + sizeof(int16_t)) * pCols->maxPoints; + dataColInit(pCols->cols + i, schemaColAt(pSchema, i), &ptr, pCols->maxPoints); + ASSERT((char *)ptr - (char *)pCols <= pCols->bufSize); } } @@ -332,8 +364,7 @@ void tdFreeDataCols(SDataCols *pCols) { } SDataCols *tdDupDataCols(SDataCols *pDataCols, bool keepData) { - SDataCols *pRet = - tdNewDataCols(pDataCols->maxRowSize, pDataCols->maxCols, pDataCols->maxPoints); + SDataCols *pRet = tdNewDataCols(pDataCols->maxRowSize, pDataCols->maxCols, pDataCols->maxPoints); if (pRet == NULL) return NULL; pRet->numOfCols = pDataCols->numOfCols; @@ -344,11 +375,24 @@ SDataCols *tdDupDataCols(SDataCols *pDataCols, bool keepData) { pRet->cols[i].type = pDataCols->cols[i].type; pRet->cols[i].colId = pDataCols->cols[i].colId; pRet->cols[i].bytes = pDataCols->cols[i].bytes; - pRet->cols[i].len = pDataCols->cols[i].len; pRet->cols[i].offset = pDataCols->cols[i].offset; + + pRet->cols[i].spaceSize = pDataCols->cols[i].spaceSize; pRet->cols[i].pData = (void *)((char *)pRet->buf + ((char *)(pDataCols->cols[i].pData) - (char *)(pDataCols->buf))); - if (keepData) memcpy(pRet->cols[i].pData, pDataCols->cols[i].pData, pDataCols->cols[i].len); + if (pRet->cols[i].type == TSDB_DATA_TYPE_BINARY || pRet->cols[i].type == TSDB_DATA_TYPE_NCHAR) { + ASSERT(pDataCols->cols[i].dataOff != NULL); + pRet->cols[i].dataOff = + (int32_t *)((char *)pRet->buf + ((char *)(pDataCols->cols[i].dataOff) - (char *)(pDataCols->buf))); + } + + if (keepData) { + pRet->cols[i].len = pDataCols->cols[i].len; + memcpy(pRet->cols[i].pData, pDataCols->cols[i].pData, pDataCols->cols[i].len); + if (pRet->cols[i].type == TSDB_DATA_TYPE_BINARY || pRet->cols[i].type == TSDB_DATA_TYPE_NCHAR) { + memcpy(pRet->cols[i].dataOff, pDataCols->cols[i].dataOff, sizeof(int32_t) * pDataCols->maxPoints); + } + } } return pRet; @@ -357,7 +401,7 @@ SDataCols *tdDupDataCols(SDataCols *pDataCols, bool keepData) { void tdResetDataCols(SDataCols *pCols) { pCols->numOfPoints = 0; for (int i = 0; i < pCols->maxCols; i++) { - pCols->cols[i].len = 0; + dataColReset(pCols->cols + i); } } @@ -381,37 +425,9 @@ void tdPopDataColsPoints(SDataCols *pCols, int pointsToPop) { return; } - int32_t offsetSize = sizeof(int32_t) * pCols->maxPoints; - int32_t toffset = 0; - int tlen = 0; for (int iCol = 0; iCol < pCols->numOfCols; iCol++) { SDataCol *pCol = pCols->cols + iCol; - ASSERT(pCol->len > 0); - - switch (pCol->type) { - case TSDB_DATA_TYPE_BINARY: - case TSDB_DATA_TYPE_NCHAR: - // memmove offset part - memmove(pCol->pData, pCol->pData + sizeof(int32_t) * pointsToPop, sizeof(int32_t) * pointsLeft); - // memmove string part - toffset = *(int32_t *)pCol->pData; - ASSERT(toffset >= offsetSize); - tlen = pCol->len - toffset; - memmove(pCol->pData + offsetSize, pCol->pData + toffset, tlen); - // update offset part - for (int i = 0; i < pointsLeft; i++) { - ((int32_t *)(pCol->pData))[i] -= (toffset - offsetSize); - } - // Update length - pCol->len = offsetSize + tlen; - break; - default: - ASSERT(pCol->len == TYPE_BYTES[pCol->type] * pCols->numOfPoints); - pCol->len = TYPE_BYTES[pCol->type] * pointsLeft; - memmove((void *)(pCol->pData), (void *)((char *)(pCol->pData) + TYPE_BYTES[pCol->type] * pointsToPop), - pCol->len); - break; - } + dataColPopPoints(pCol, pointsToPop, pCols->numOfPoints); } pCols->numOfPoints = pointsLeft; } diff --git a/src/tsdb/src/tsdbRWHelper.c b/src/tsdb/src/tsdbRWHelper.c index d1ee5113fd3a8b94900087904663daa7b0ff2f4b..61c463801cff570dd8091241e327551267d9e7ea 100644 --- a/src/tsdb/src/tsdbRWHelper.c +++ b/src/tsdb/src/tsdbRWHelper.c @@ -578,7 +578,7 @@ static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, char *content, int32 INT32_MAX, comp, buffer, bufferSize); if (pDataCol->type == TSDB_DATA_TYPE_BINARY || pDataCol->type == TSDB_DATA_TYPE_NCHAR) { pDataCol->len += (sizeof(int32_t) * maxPoints); - dataColSetOffset(pDataCol, numOfPoints, maxPoints); + dataColSetOffset(pDataCol, numOfPoints); } } else { // No need to decompress, just memcpy it @@ -588,7 +588,7 @@ static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, char *content, int32 pDataCol->len = sizeof(int32_t) * maxPoints; memcpy((char *)pDataCol->pData + pDataCol->len, content, len - sizeof(TSCKSUM)); pDataCol->len += (len - sizeof(TSCKSUM)); - dataColSetOffset(pDataCol, numOfPoints, maxPoints); + dataColSetOffset(pDataCol, numOfPoints); break; default: @@ -736,12 +736,8 @@ static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDa pCompCol->offset = toffset; - void *pStart = NULL; - int32_t tlen = 0; - - dataColGetNEleStartAndLen(pDataCol, rowsToWrite, &pStart, &tlen, pDataCols->maxPoints); + int32_t tlen = dataColGetNEleLen(pDataCol, rowsToWrite); - // TODO: compresee the data if (pHelper->config.compress) { if (pHelper->config.compress == TWO_STAGE_COMP) { pHelper->compBuffer = trealloc(pHelper->compBuffer, tlen + COMP_OVERFLOW_BYTES); @@ -749,11 +745,11 @@ static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDa } pCompCol->len = (*(tDataTypeDesc[pDataCol->type].compFunc))( - (char *)pStart, tlen, rowsToWrite, tptr, tsizeof(pHelper->blockBuffer) - lsize, pHelper->config.compress, - pHelper->compBuffer, tsizeof(pHelper->compBuffer)); + (char *)pDataCol->pData, tlen, rowsToWrite, tptr, tsizeof(pHelper->blockBuffer) - lsize, + pHelper->config.compress, pHelper->compBuffer, tsizeof(pHelper->compBuffer)); } else { pCompCol->len = tlen; - memcpy(tptr, pStart, pCompCol->len); + memcpy(tptr, pDataCol->pData, pCompCol->len); } // Add checksum