提交 d07f73d5 编写于 作者: H hzcheng

TD-166

上级 dcb64f4e
......@@ -67,6 +67,13 @@ int tdGetSchemaEncodeSize(STSchema *pSchema);
void * tdEncodeSchema(void *dst, STSchema *pSchema);
STSchema *tdDecodeSchema(void **psrc);
// ----------------- For variable data types such as TSDB_DATA_TYPE_BINARY and TSDB_DATA_TYPE_NCHAR
typedef int32_t VarDataOffsetT;
typedef int16_t VarDataLenT;
#define varDataLen(v) ((VarDataLenT *)(v))[0]
#define varDataTLen(v) (sizeof(VarDataLenT) + varDataLen(v))
#define varDataVal(v) ((void *)((char *)v + sizeof(VarDataLenT)))
// ----------------- Data row structure
/* A data row, the format is like below:
......@@ -111,18 +118,25 @@ static FORCE_INLINE void *tdGetRowDataOfCol(SDataRow row, int8_t type, int32_t o
// ----------------- Data column structure
typedef struct SDataCol {
int8_t type;
int16_t colId;
int bytes;
int len;
int offset;
void * pData; // Original data
int8_t type; // column type
int16_t colId; // column ID
int bytes; // column data bytes defined
int offset; // data offset in a SDataRow
int spaceSize; // Total space size for this column
int len; // column data length
VarDataOffsetT *dataOff; // For binary and nchar data, the offset in the data column
void * pData; // Actual data pointer
} SDataCol;
static FORCE_INLINE void dataColReset(SDataCol *pDataCol) { pDataCol->len = 0; }
void dataColInit(SDataCol *pDataCol, STColumn *pCol, void **pBuf, int maxPoints);
void dataColAppendVal(SDataCol *pCol, void *value, int numOfPoints, int maxPoints);
void dataColPopPoints(SDataCol *pCol, int pointsToPop, int numOfPoints);
void dataColSetOffset(SDataCol *pCol, int nEle);
bool isNEleNull(SDataCol *pCol, int nEle);
void dataColSetNEleNull(SDataCol *pCol, int nEle, int maxPoints);
void dataColSetOffset(SDataCol *pCol, int nEle, int maxPoints);
// Get the data pointer from a column-wised data
static FORCE_INLINE void *tdGetColDataOfRow(SDataCol *pCol, int row) {
......@@ -130,7 +144,7 @@ static FORCE_INLINE void *tdGetColDataOfRow(SDataCol *pCol, int row) {
{
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR:
return (void *)((char *)(pCol->pData) + ((int32_t *)(pCol->pData))[row]);
return (void *)((char *)(pCol->pData) + pCol->dataOff[row]);
break;
default:
......@@ -139,20 +153,17 @@ static FORCE_INLINE void *tdGetColDataOfRow(SDataCol *pCol, int row) {
}
}
static FORCE_INLINE void dataColGetNEleStartAndLen(SDataCol *pDataCol, int rows, void **pStart, int32_t *len, int32_t maxPoints) {
static FORCE_INLINE int32_t dataColGetNEleLen(SDataCol *pDataCol, int rows) {
void *ptr = NULL;
switch (pDataCol->type) {
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR:
ptr = tdGetColDataOfRow(pDataCol, rows - 1);
*pStart = (char *)(pDataCol->pData) + sizeof(int32_t) * maxPoints;
*len = (char *)ptr - (char *)(*pStart) + sizeof(int16_t) + *(int16_t *)ptr;
return ((VarDataOffsetT *)(pDataCol->pData))[rows-1] + varDataTLen(ptr);
break;
default:
*pStart = pDataCol->pData;
*len = TYPE_BYTES[pDataCol->type] * rows;
break;
return TYPE_BYTES[pDataCol->type] * rows;
}
}
......@@ -161,6 +172,7 @@ typedef struct {
int maxRowSize;
int maxCols; // max number of columns
int maxPoints; // max number of points
int bufSize;
int numOfPoints;
int numOfCols; // Total number of cols
......
......@@ -213,28 +213,66 @@ SDataRow tdDataRowDup(SDataRow row) {
return trow;
}
void dataColInit(SDataCol *pDataCol, STColumn *pCol, void **pBuf, int maxPoints) {
pDataCol->type = colType(pCol);
pDataCol->colId = colColId(pCol);
pDataCol->bytes = colBytes(pCol);
pDataCol->offset = colOffset(pCol);
pDataCol->len = 0;
if (pDataCol->type == TSDB_DATA_TYPE_BINARY || pDataCol->type == TSDB_DATA_TYPE_NCHAR) {
pDataCol->spaceSize = (sizeof(int32_t) + sizeof(int16_t) + pDataCol->bytes) * maxPoints;
pDataCol->dataOff = (VarDataOffsetT *)(*pBuf);
pDataCol->pData = (void *)((char *)(*pBuf) + sizeof(int32_t) * maxPoints);
} else {
pDataCol->spaceSize = pDataCol->bytes * maxPoints;
pDataCol->dataOff = NULL;
pDataCol->pData = *pBuf;
}
*pBuf = (void *)((char *)(*pBuf) + pDataCol->spaceSize);
}
void dataColAppendVal(SDataCol *pCol, void *value, int numOfPoints, int maxPoints) {
ASSERT(pCol != NULL && value != NULL);
switch (pCol->type) {
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR:
if (pCol->len == 0) pCol->len = sizeof(int32_t) * maxPoints;
// set offset
((int32_t *)(pCol->pData))[numOfPoints] = pCol->len;
// Copy data
memcpy(pCol->pData + pCol->len, value, sizeof(int16_t) + *(int16_t *)value);
memcpy((void *)((char *)pCol->pData + pCol->len), value, varDataTLen(value));
// Update the length
pCol->len += (sizeof(int16_t) + *(int16_t *)value);
pCol->len += varDataTLen(value);
break;
default:
ASSERT(pCol->len == TYPE_BYTES[pCol->type] * numOfPoints);
memcpy(pCol->pData + pCol->len, value, pCol->bytes);
memcpy((void *)((char *)pCol->pData + pCol->len), value, pCol->bytes);
pCol->len += pCol->bytes;
break;
}
}
void dataColPopPoints(SDataCol *pCol, int pointsToPop, int numOfPoints) {
int pointsLeft = numOfPoints - pointsToPop;
ASSERT(pointsLeft > 0);
if (pCol->type == TSDB_DATA_TYPE_BINARY || pCol->type == TSDB_DATA_TYPE_NCHAR) {
ASSERT(pCol->len > 0);
VarDataOffsetT toffset = ((VarDataOffsetT *)(pCol->pData))[pointsToPop];
pCol->len = pCol->len - toffset;
ASSERT(pCol->len > 0);
memmove(pCol->pData, (void *)((char *)(pCol->pData) + toffset), pCol->len);
dataColSetOffset(pCol, pointsLeft);
} else {
ASSERT(pCol->len == TYPE_BYTES[pCol->type] * numOfPoints);
pCol->len = TYPE_BYTES[pCol->type] * pointsLeft;
memmove(pCol->pData, (void *)((char *)(pCol->pData) + TYPE_BYTES[pCol->type] * pointsToPop), pCol->len);
}
}
bool isNEleNull(SDataCol *pCol, int nEle) {
void *ptr = NULL;
switch (pCol->type) {
......@@ -242,8 +280,7 @@ bool isNEleNull(SDataCol *pCol, int nEle) {
case TSDB_DATA_TYPE_NCHAR:
for (int i = 0; i < nEle; i++) {
ptr = tdGetColDataOfRow(pCol, i);
ptr = (void *)((char *)ptr + sizeof(int16_t));
if (!isNull(ptr, pCol->type)) return false;
if (!isNull(varDataVal(ptr), pCol->type)) return false;
}
return true;
default:
......@@ -259,16 +296,15 @@ void dataColSetNEleNull(SDataCol *pCol, int nEle, int maxPoints) {
switch (pCol->type) {
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR:
pCol->len = sizeof(int32_t) * maxPoints;
pCol->len = 0;
for (int i = 0; i < nEle; i++) {
((int32_t *)(pCol->pData))[i] = pCol->len;
ptr = ((char *)pCol->pData) + pCol->len;
*(int16_t *)ptr = (pCol->type == TSDB_DATA_TYPE_BINARY) ? sizeof(char) : TSDB_NCHAR_SIZE;
setNull(ptr + sizeof(int16_t), pCol->type, pCol->bytes);
pCol->len += (sizeof(int16_t) + ((int16_t *)ptr)[0]);
pCol->dataOff[i] = pCol->len;
ptr = (char *)pCol->pData + pCol->len;
varDataLen(ptr) = (pCol->type == TSDB_DATA_TYPE_BINARY) ? sizeof(char) : TSDB_NCHAR_SIZE;
setNull(ptr + sizeof(VarDataLenT), pCol->type, pCol->bytes);
pCol->len += varDataTLen(ptr);
}
break;
default:
setNullN(pCol->pData, pCol->type, pCol->bytes, nEle);
......@@ -277,13 +313,16 @@ void dataColSetNEleNull(SDataCol *pCol, int nEle, int maxPoints) {
}
}
void dataColSetOffset(SDataCol *pCol, int nEle, int maxPoints) {
ASSERT(nEle <= maxPoints && ((pCol->type == TSDB_DATA_TYPE_BINARY) || (pCol->type == TSDB_DATA_TYPE_NCHAR)));
void dataColSetOffset(SDataCol *pCol, int nEle) {
ASSERT(((pCol->type == TSDB_DATA_TYPE_BINARY) || (pCol->type == TSDB_DATA_TYPE_NCHAR)));
char *tptr = (char *)(pCol->pData);
char *tptr = (char *)(pCol->pData) + sizeof(int32_t) * maxPoints;
VarDataOffsetT offset = 0;
for (int i = 0; i < nEle; i++) {
((int32_t *)(pCol->pData))[i] = tptr - (char *)(pCol->pData);
tptr = tptr + *(int16_t *)tptr + sizeof(int16_t);
((VarDataOffsetT *)(pCol->pData))[i] = offset;
offset += varDataTLen(tptr);
tptr = tptr + varDataTLen(tptr);
}
}
......@@ -294,8 +333,9 @@ SDataCols *tdNewDataCols(int maxRowSize, int maxCols, int maxRows) {
pCols->maxRowSize = maxRowSize;
pCols->maxCols = maxCols;
pCols->maxPoints = maxRows;
pCols->bufSize = maxRowSize * maxRows;
pCols->buf = malloc(maxRowSize * maxRows);
pCols->buf = malloc(pCols->bufSize);
if (pCols->buf == NULL) {
free(pCols);
return NULL;
......@@ -311,16 +351,8 @@ void tdInitDataCols(SDataCols *pCols, STSchema *pSchema) {
void *ptr = pCols->buf;
for (int i = 0; i < schemaNCols(pSchema); i++) {
pCols->cols[i].type = colType(schemaColAt(pSchema, i));
pCols->cols[i].bytes = colBytes(schemaColAt(pSchema, i));
pCols->cols[i].offset = colOffset(schemaColAt(pSchema, i)) + TD_DATA_ROW_HEAD_SIZE;
pCols->cols[i].colId = colColId(schemaColAt(pSchema, i));
pCols->cols[i].pData = ptr;
ptr = ptr + colBytes(schemaColAt(pSchema, i)) * pCols->maxPoints;
if (colType(schemaColAt(pSchema, i)) == TSDB_DATA_TYPE_BINARY ||
colType(schemaColAt(pSchema, i)) == TSDB_DATA_TYPE_NCHAR)
ptr = ptr + (sizeof(int32_t) + sizeof(int16_t)) * pCols->maxPoints;
dataColInit(pCols->cols + i, schemaColAt(pSchema, i), &ptr, pCols->maxPoints);
ASSERT((char *)ptr - (char *)pCols <= pCols->bufSize);
}
}
......@@ -332,8 +364,7 @@ void tdFreeDataCols(SDataCols *pCols) {
}
SDataCols *tdDupDataCols(SDataCols *pDataCols, bool keepData) {
SDataCols *pRet =
tdNewDataCols(pDataCols->maxRowSize, pDataCols->maxCols, pDataCols->maxPoints);
SDataCols *pRet = tdNewDataCols(pDataCols->maxRowSize, pDataCols->maxCols, pDataCols->maxPoints);
if (pRet == NULL) return NULL;
pRet->numOfCols = pDataCols->numOfCols;
......@@ -344,11 +375,24 @@ SDataCols *tdDupDataCols(SDataCols *pDataCols, bool keepData) {
pRet->cols[i].type = pDataCols->cols[i].type;
pRet->cols[i].colId = pDataCols->cols[i].colId;
pRet->cols[i].bytes = pDataCols->cols[i].bytes;
pRet->cols[i].len = pDataCols->cols[i].len;
pRet->cols[i].offset = pDataCols->cols[i].offset;
pRet->cols[i].spaceSize = pDataCols->cols[i].spaceSize;
pRet->cols[i].pData = (void *)((char *)pRet->buf + ((char *)(pDataCols->cols[i].pData) - (char *)(pDataCols->buf)));
if (keepData) memcpy(pRet->cols[i].pData, pDataCols->cols[i].pData, pDataCols->cols[i].len);
if (pRet->cols[i].type == TSDB_DATA_TYPE_BINARY || pRet->cols[i].type == TSDB_DATA_TYPE_NCHAR) {
ASSERT(pDataCols->cols[i].dataOff != NULL);
pRet->cols[i].dataOff =
(int32_t *)((char *)pRet->buf + ((char *)(pDataCols->cols[i].dataOff) - (char *)(pDataCols->buf)));
}
if (keepData) {
pRet->cols[i].len = pDataCols->cols[i].len;
memcpy(pRet->cols[i].pData, pDataCols->cols[i].pData, pDataCols->cols[i].len);
if (pRet->cols[i].type == TSDB_DATA_TYPE_BINARY || pRet->cols[i].type == TSDB_DATA_TYPE_NCHAR) {
memcpy(pRet->cols[i].dataOff, pDataCols->cols[i].dataOff, sizeof(int32_t) * pDataCols->maxPoints);
}
}
}
return pRet;
......@@ -357,7 +401,7 @@ SDataCols *tdDupDataCols(SDataCols *pDataCols, bool keepData) {
void tdResetDataCols(SDataCols *pCols) {
pCols->numOfPoints = 0;
for (int i = 0; i < pCols->maxCols; i++) {
pCols->cols[i].len = 0;
dataColReset(pCols->cols + i);
}
}
......@@ -381,37 +425,9 @@ void tdPopDataColsPoints(SDataCols *pCols, int pointsToPop) {
return;
}
int32_t offsetSize = sizeof(int32_t) * pCols->maxPoints;
int32_t toffset = 0;
int tlen = 0;
for (int iCol = 0; iCol < pCols->numOfCols; iCol++) {
SDataCol *pCol = pCols->cols + iCol;
ASSERT(pCol->len > 0);
switch (pCol->type) {
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR:
// memmove offset part
memmove(pCol->pData, pCol->pData + sizeof(int32_t) * pointsToPop, sizeof(int32_t) * pointsLeft);
// memmove string part
toffset = *(int32_t *)pCol->pData;
ASSERT(toffset >= offsetSize);
tlen = pCol->len - toffset;
memmove(pCol->pData + offsetSize, pCol->pData + toffset, tlen);
// update offset part
for (int i = 0; i < pointsLeft; i++) {
((int32_t *)(pCol->pData))[i] -= (toffset - offsetSize);
}
// Update length
pCol->len = offsetSize + tlen;
break;
default:
ASSERT(pCol->len == TYPE_BYTES[pCol->type] * pCols->numOfPoints);
pCol->len = TYPE_BYTES[pCol->type] * pointsLeft;
memmove((void *)(pCol->pData), (void *)((char *)(pCol->pData) + TYPE_BYTES[pCol->type] * pointsToPop),
pCol->len);
break;
}
dataColPopPoints(pCol, pointsToPop, pCols->numOfPoints);
}
pCols->numOfPoints = pointsLeft;
}
......
......@@ -578,7 +578,7 @@ static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, char *content, int32
INT32_MAX, comp, buffer, bufferSize);
if (pDataCol->type == TSDB_DATA_TYPE_BINARY || pDataCol->type == TSDB_DATA_TYPE_NCHAR) {
pDataCol->len += (sizeof(int32_t) * maxPoints);
dataColSetOffset(pDataCol, numOfPoints, maxPoints);
dataColSetOffset(pDataCol, numOfPoints);
}
} else {
// No need to decompress, just memcpy it
......@@ -588,7 +588,7 @@ static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, char *content, int32
pDataCol->len = sizeof(int32_t) * maxPoints;
memcpy((char *)pDataCol->pData + pDataCol->len, content, len - sizeof(TSCKSUM));
pDataCol->len += (len - sizeof(TSCKSUM));
dataColSetOffset(pDataCol, numOfPoints, maxPoints);
dataColSetOffset(pDataCol, numOfPoints);
break;
default:
......@@ -736,12 +736,8 @@ static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDa
pCompCol->offset = toffset;
void *pStart = NULL;
int32_t tlen = 0;
dataColGetNEleStartAndLen(pDataCol, rowsToWrite, &pStart, &tlen, pDataCols->maxPoints);
int32_t tlen = dataColGetNEleLen(pDataCol, rowsToWrite);
// TODO: compresee the data
if (pHelper->config.compress) {
if (pHelper->config.compress == TWO_STAGE_COMP) {
pHelper->compBuffer = trealloc(pHelper->compBuffer, tlen + COMP_OVERFLOW_BYTES);
......@@ -749,11 +745,11 @@ static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDa
}
pCompCol->len = (*(tDataTypeDesc[pDataCol->type].compFunc))(
(char *)pStart, tlen, rowsToWrite, tptr, tsizeof(pHelper->blockBuffer) - lsize, pHelper->config.compress,
pHelper->compBuffer, tsizeof(pHelper->compBuffer));
(char *)pDataCol->pData, tlen, rowsToWrite, tptr, tsizeof(pHelper->blockBuffer) - lsize,
pHelper->config.compress, pHelper->compBuffer, tsizeof(pHelper->compBuffer));
} else {
pCompCol->len = tlen;
memcpy(tptr, pStart, pCompCol->len);
memcpy(tptr, pDataCol->pData, pCompCol->len);
}
// Add checksum
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册