提交 802de9d5 编写于 作者: H hzcheng

TD-166

上级 7ed514b7
......@@ -656,7 +656,7 @@ static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock) {
int toffset = 0;
for (int32_t j = 0; j < tinfo.numOfColumns; j++) {
tdAppendColVal(trow, isNull(p, pSchema[j].type) ? NULL : p, pSchema[j].type, pSchema[j].bytes, toffset);
tdAppendColVal(trow, p, pSchema[j].type, pSchema[j].bytes, toffset);
toffset += TYPE_BYTES[pSchema[j].type];
p += pSchema[j].bytes;
}
......
......@@ -110,6 +110,8 @@ typedef struct {
int maxRowSize;
int maxCols; // max number of columns
int maxPoints; // max number of points
int exColBytes; // extra column bytes to allocate for each column
int numOfPoints;
int numOfCols; // Total number of cols
int sversion; // TODO: set sversion
......@@ -122,7 +124,7 @@ typedef struct {
#define dataColsKeyFirst(pCols) dataColsKeyAt(pCols, 0)
#define dataColsKeyLast(pCols) dataColsKeyAt(pCols, (pCols)->numOfPoints - 1)
SDataCols *tdNewDataCols(int maxRowSize, int maxCols, int maxRows);
SDataCols *tdNewDataCols(int maxRowSize, int maxCols, int maxRows, int exColBytes);
void tdResetDataCols(SDataCols *pCols);
void tdInitDataCols(SDataCols *pCols, STSchema *pSchema);
SDataCols *tdDupDataCols(SDataCols *pCols, bool keepData);
......
......@@ -166,36 +166,38 @@ void tdFreeDataRow(SDataRow row) {
* @param offset: offset in the data row tuple, not including the data row header
*/
int tdAppendColVal(SDataRow row, void *value, int8_t type, int32_t bytes, int32_t offset) {
ASSERT(value != NULL);
int32_t toffset = offset + TD_DATA_ROW_HEAD_SIZE;
char * ptr = dataRowAt(row, dataRowLen(row));
switch (type) {
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR:
if (value == NULL) {
*(int32_t *)dataRowAt(row, toffset) = -1;
// set offset
*(int32_t *)dataRowAt(row, toffset) = dataRowLen(row);
// set length
int16_t slen = 0;
if (isNull(value, type)) {
slen = (type == TSDB_DATA_TYPE_BINARY) ? sizeof(int8_t) : sizeof(int32_t);
} else {
int16_t slen = 0;
if (type == TSDB_DATA_TYPE_BINARY) {
slen = strnlen((char *)value, bytes);
} else {
slen = wcsnlen((wchar_t *)value, (bytes) / TSDB_NCHAR_SIZE) * TSDB_NCHAR_SIZE;
}
if (slen > bytes) return -1;
*(int32_t *)dataRowAt(row, toffset) = dataRowLen(row);
*(int16_t *)ptr = slen;
ptr += sizeof(int16_t);
memcpy((void *)ptr, value, slen);
dataRowLen(row) += (sizeof(int16_t) + slen);
}
ASSERT(slen <= bytes);
*(int16_t *)ptr = slen;
ptr += sizeof(int16_t);
memcpy((void *)ptr, value, slen);
dataRowLen(row) += (sizeof(int16_t) + slen);
break;
default:
if (value == NULL) {
setNull(dataRowAt(row, toffset), type, bytes);
} else {
memcpy(dataRowAt(row, toffset), value, TYPE_BYTES[type]);
}
memcpy(dataRowAt(row, toffset), value, TYPE_BYTES[type]);
break;
}
......@@ -212,15 +214,16 @@ SDataRow tdDataRowDup(SDataRow row) {
return trow;
}
SDataCols *tdNewDataCols(int maxRowSize, int maxCols, int maxRows) {
SDataCols *tdNewDataCols(int maxRowSize, int maxCols, int maxRows, int exColBytes) {
SDataCols *pCols = (SDataCols *)calloc(1, sizeof(SDataCols) + sizeof(SDataCol) * maxCols);
if (pCols == NULL) return NULL;
pCols->maxRowSize = maxRowSize;
pCols->maxCols = maxCols;
pCols->maxPoints = maxRows;
pCols->exColBytes = exColBytes;
pCols->buf = malloc(maxRowSize * maxRows);
pCols->buf = malloc(maxRowSize * maxRows + exColBytes * maxCols);
if (pCols->buf == NULL) {
free(pCols);
return NULL;
......@@ -234,30 +237,34 @@ void tdInitDataCols(SDataCols *pCols, STSchema *pSchema) {
tdResetDataCols(pCols);
pCols->numOfCols = schemaNCols(pSchema);
pCols->cols[0].pData = pCols->buf;
int offset = TD_DATA_ROW_HEAD_SIZE;
void *ptr = pCols->buf;
for (int i = 0; i < schemaNCols(pSchema); i++) {
if (i > 0) {
pCols->cols[i].pData = (char *)(pCols->cols[i - 1].pData) + schemaColAt(pSchema, i - 1)->bytes * pCols->maxPoints;
}
pCols->cols[i].type = colType(schemaColAt(pSchema, i));
pCols->cols[i].bytes = colBytes(schemaColAt(pSchema, i));
pCols->cols[i].offset = offset;
pCols->cols[i].offset = colOffset(schemaColAt(pSchema, i)) + TD_DATA_ROW_HEAD_SIZE;
pCols->cols[i].colId = colColId(schemaColAt(pSchema, i));
pCols->cols[i].pData = ptr;
offset += TYPE_BYTES[pCols->cols[i].type];
ptr = ptr + pCols->exColBytes + colBytes(schemaColAt(pSchema, i)) * pCols->maxPoints;
if (colType(schemaColAt(pSchema, i)) == TSDB_DATA_TYPE_BINARY ||
colType(schemaColAt(pSchema, i)) == TSDB_DATA_TYPE_NCHAR)
ptr = ptr + (sizeof(int32_t) + sizeof(int16_t)) * pCols->maxPoints;
}
}
void tdFreeDataCols(SDataCols *pCols) {
if (pCols) {
if (pCols->buf) free(pCols->buf);
tfree(pCols->buf);
free(pCols);
}
}
SDataCols *tdDupDataCols(SDataCols *pDataCols, bool keepData) {
SDataCols *pRet = tdNewDataCols(pDataCols->maxRowSize, pDataCols->maxCols, pDataCols->maxPoints);
SDataCols *pRet =
tdNewDataCols(pDataCols->maxRowSize, pDataCols->maxCols, pDataCols->maxPoints, pDataCols->exColBytes);
if (pRet == NULL) return NULL;
pRet->numOfCols = pDataCols->numOfCols;
......@@ -272,7 +279,7 @@ SDataCols *tdDupDataCols(SDataCols *pDataCols, bool keepData) {
pRet->cols[i].offset = pDataCols->cols[i].offset;
pRet->cols[i].pData = (void *)((char *)pRet->buf + ((char *)(pDataCols->cols[i].pData) - (char *)(pDataCols->buf)));
if (keepData) memcpy(pRet->cols[i].pData, pDataCols->cols[i].pData, pRet->cols[i].bytes * pDataCols->numOfPoints);
if (keepData) memcpy(pRet->cols[i].pData, pDataCols->cols[i].pData, pDataCols->cols[i].len);
}
return pRet;
......@@ -288,22 +295,58 @@ void tdResetDataCols(SDataCols *pCols) {
void tdAppendDataRowToDataCol(SDataRow row, SDataCols *pCols) {
for (int i = 0; i < pCols->numOfCols; i++) {
SDataCol *pCol = pCols->cols + i;
memcpy((void *)((char *)(pCol->pData) + pCol->len), dataRowAt(row, pCol->offset), pCol->bytes);
pCol->len += pCol->bytes;
void *ptr = NULL;
int32_t toffset = 0;
switch (pCol->type)
{
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR:
if (pCols->numOfPoints == 0) pCol->len = sizeof(int32_t) * pCols->maxPoints;
toffset = *(int32_t *)dataRowAt(row, pCol->offset);
if (toffset < 0) {
// It is a NULL value
// TODO: make interface and macros to hide literal thing
((int32_t *)pCol->pData)[pCols->numOfPoints] = -1;
} else {
ptr = dataRowAt(row, toffset);
// TODO: use interface to avoid int16_t stuff
memcpy(pCol->pData, ptr, *(int16_t *)ptr);
((int32_t *)pCol->pData)[pCols->numOfPoints] = pCol->len;
}
break;
default:
ASSERT(pCol->len == TYPE_BYTES[pCol->type] * pCols->numOfPoints);
memcpy(pCol->pData + pCol->len, dataRowAt(row, pCol->offset), pCol->bytes);
pCol->len += pCol->bytes;
break;
}
}
pCols->numOfPoints++;
}
// Pop pointsToPop points from the SDataCols
void tdPopDataColsPoints(SDataCols *pCols, int pointsToPop) {
int pointsLeft = pCols->numOfPoints - pointsToPop;
if (pointsLeft < 0) return;
if (pointsLeft == 0) {
tdResetDataCols(pCols);
return;
}
for (int iCol = 0; iCol < pCols->numOfCols; iCol++) {
SDataCol *p_col = pCols->cols + iCol;
if (p_col->len > 0) {
p_col->len = TYPE_BYTES[p_col->type] * pointsLeft;
if (pointsLeft > 0) {
memmove((void *)(p_col->pData), (void *)((char *)(p_col->pData) + TYPE_BYTES[p_col->type] * pointsToPop), p_col->len);
}
SDataCol *pCol = pCols->cols + iCol;
ASSERT(pCol->len > 0);
switch (pCol->type) {
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR:
/* code */
break;
default:
ASSERT(pCol->len == TYPE_BYTES[pCol->type] * pCols->numOfPoints);
pCol->len = TYPE_BYTES[pCol->type] * pointsLeft;
memmove((void *)(pCol->pData), (void *)((char *)(pCol->pData) + TYPE_BYTES[pCol->type] * pointsToPop),
pCol->len);
break;
}
}
pCols->numOfPoints = pointsLeft;
......
......@@ -5,6 +5,7 @@
#include "tsdb.h"
#include "tsdbMain.h"
#include "tscompression.h"
#include "tchecksum.h"
#define TSDB_DEFAULT_PRECISION TSDB_PRECISION_MILLI // default precision
#define IS_VALID_PRECISION(precision) (((precision) >= TSDB_PRECISION_MILLI) && ((precision) <= TSDB_PRECISION_NANO))
......@@ -878,7 +879,9 @@ static void *tsdbCommitData(void *arg) {
}
if (tsdbInitWriteHelper(&whelper, pRepo) < 0) goto _exit;
if ((pDataCols = tdNewDataCols(pMeta->maxRowBytes, pMeta->maxCols, pCfg->maxRowsPerFileBlock)) == NULL) goto _exit;
if ((pDataCols = tdNewDataCols(pMeta->maxRowBytes, pMeta->maxCols, pCfg->maxRowsPerFileBlock,
sizeof(TSCKSUM) + COMP_OVERFLOW_BYTES)) == NULL)
goto _exit;
int sfid = tsdbGetKeyFileId(pCache->imem->keyFirst, pCfg->daysPerFile, pCfg->precision);
int efid = tsdbGetKeyFileId(pCache->imem->keyLast, pCfg->daysPerFile, pCfg->precision);
......
......@@ -90,8 +90,8 @@ static void tsdbResetHelperBlock(SRWHelper *pHelper) {
}
static int tsdbInitHelperBlock(SRWHelper *pHelper) {
pHelper->pDataCols[0] = tdNewDataCols(pHelper->config.maxRowSize, pHelper->config.maxCols, pHelper->config.maxRows);
pHelper->pDataCols[1] = tdNewDataCols(pHelper->config.maxRowSize, pHelper->config.maxCols, pHelper->config.maxRows);
pHelper->pDataCols[0] = tdNewDataCols(pHelper->config.maxRowSize, pHelper->config.maxCols, pHelper->config.maxRows, sizeof(TSCKSUM) + COMP_OVERFLOW_BYTES);
pHelper->pDataCols[1] = tdNewDataCols(pHelper->config.maxRowSize, pHelper->config.maxCols, pHelper->config.maxRows, sizeof(TSCKSUM) + COMP_OVERFLOW_BYTES);
if (pHelper->pDataCols[0] == NULL || pHelper->pDataCols[1] == NULL) return -1;
tsdbResetHelperBlockImpl(pHelper);
......
......@@ -406,7 +406,7 @@ static bool doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlo
SArray* sa = getDefaultLoadColumns(pQueryHandle, true);
if (pCheckInfo->pDataCols == NULL) {
pCheckInfo->pDataCols = tdNewDataCols(1000, 2, 4096);
pCheckInfo->pDataCols = tdNewDataCols(1000, 2, 4096, 0);
}
tdInitDataCols(pCheckInfo->pDataCols, tsdbGetTableSchema(tsdbGetMeta(pQueryHandle->pTsdb), pCheckInfo->pTableObj));
......
......@@ -22,6 +22,7 @@ extern "C" {
#include "taosdef.h"
#define COMP_OVERFLOW_BYTES 2
#define BITS_PER_BYTE 8
// Masks
#define INT64MASK(_x) ((1ul << _x) - 1)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册