From 802de9d5450a66001cf50964adad71e7ea118de3 Mon Sep 17 00:00:00 2001 From: hzcheng Date: Mon, 27 Apr 2020 17:40:57 +0800 Subject: [PATCH] TD-166 --- src/client/src/tscUtil.c | 2 +- src/common/inc/tdataformat.h | 4 +- src/common/src/tdataformat.c | 107 ++++++++++++++++++++++++----------- src/tsdb/src/tsdbMain.c | 5 +- src/tsdb/src/tsdbRWHelper.c | 4 +- src/tsdb/src/tsdbRead.c | 2 +- src/util/inc/tscompression.h | 1 + 7 files changed, 87 insertions(+), 38 deletions(-) diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 455eb8df5c..6bcf70dc99 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -656,7 +656,7 @@ static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock) { int toffset = 0; for (int32_t j = 0; j < tinfo.numOfColumns; j++) { - tdAppendColVal(trow, isNull(p, pSchema[j].type) ? NULL : p, pSchema[j].type, pSchema[j].bytes, toffset); + tdAppendColVal(trow, p, pSchema[j].type, pSchema[j].bytes, toffset); toffset += TYPE_BYTES[pSchema[j].type]; p += pSchema[j].bytes; } diff --git a/src/common/inc/tdataformat.h b/src/common/inc/tdataformat.h index 3ce43f9dba..c938c1cfb1 100644 --- a/src/common/inc/tdataformat.h +++ b/src/common/inc/tdataformat.h @@ -110,6 +110,8 @@ typedef struct { int maxRowSize; int maxCols; // max number of columns int maxPoints; // max number of points + int exColBytes; // extra column bytes to allocate for each column + int numOfPoints; int numOfCols; // Total number of cols int sversion; // TODO: set sversion @@ -122,7 +124,7 @@ typedef struct { #define dataColsKeyFirst(pCols) dataColsKeyAt(pCols, 0) #define dataColsKeyLast(pCols) dataColsKeyAt(pCols, (pCols)->numOfPoints - 1) -SDataCols *tdNewDataCols(int maxRowSize, int maxCols, int maxRows); +SDataCols *tdNewDataCols(int maxRowSize, int maxCols, int maxRows, int exColBytes); void tdResetDataCols(SDataCols *pCols); void tdInitDataCols(SDataCols *pCols, STSchema *pSchema); SDataCols *tdDupDataCols(SDataCols *pCols, bool keepData); diff --git a/src/common/src/tdataformat.c b/src/common/src/tdataformat.c index 27de4e0e54..1baf048f93 100644 --- a/src/common/src/tdataformat.c +++ b/src/common/src/tdataformat.c @@ -166,36 +166,38 @@ void tdFreeDataRow(SDataRow row) { * @param offset: offset in the data row tuple, not including the data row header */ int tdAppendColVal(SDataRow row, void *value, int8_t type, int32_t bytes, int32_t offset) { + ASSERT(value != NULL); int32_t toffset = offset + TD_DATA_ROW_HEAD_SIZE; char * ptr = dataRowAt(row, dataRowLen(row)); switch (type) { case TSDB_DATA_TYPE_BINARY: case TSDB_DATA_TYPE_NCHAR: - if (value == NULL) { - *(int32_t *)dataRowAt(row, toffset) = -1; + // set offset + *(int32_t *)dataRowAt(row, toffset) = dataRowLen(row); + + // set length + int16_t slen = 0; + if (isNull(value, type)) { + slen = (type == TSDB_DATA_TYPE_BINARY) ? sizeof(int8_t) : sizeof(int32_t); } else { - int16_t slen = 0; if (type == TSDB_DATA_TYPE_BINARY) { slen = strnlen((char *)value, bytes); } else { slen = wcsnlen((wchar_t *)value, (bytes) / TSDB_NCHAR_SIZE) * TSDB_NCHAR_SIZE; } - if (slen > bytes) return -1; - - *(int32_t *)dataRowAt(row, toffset) = dataRowLen(row); - *(int16_t *)ptr = slen; - ptr += sizeof(int16_t); - memcpy((void *)ptr, value, slen); - dataRowLen(row) += (sizeof(int16_t) + slen); } + + ASSERT(slen <= bytes); + *(int16_t *)ptr = slen; + ptr += sizeof(int16_t); + + memcpy((void *)ptr, value, slen); + dataRowLen(row) += (sizeof(int16_t) + slen); + break; default: - if (value == NULL) { - setNull(dataRowAt(row, toffset), type, bytes); - } else { - memcpy(dataRowAt(row, toffset), value, TYPE_BYTES[type]); - } + memcpy(dataRowAt(row, toffset), value, TYPE_BYTES[type]); break; } @@ -212,15 +214,16 @@ SDataRow tdDataRowDup(SDataRow row) { return trow; } -SDataCols *tdNewDataCols(int maxRowSize, int maxCols, int maxRows) { +SDataCols *tdNewDataCols(int maxRowSize, int maxCols, int maxRows, int exColBytes) { SDataCols *pCols = (SDataCols *)calloc(1, sizeof(SDataCols) + sizeof(SDataCol) * maxCols); if (pCols == NULL) return NULL; pCols->maxRowSize = maxRowSize; pCols->maxCols = maxCols; pCols->maxPoints = maxRows; + pCols->exColBytes = exColBytes; - pCols->buf = malloc(maxRowSize * maxRows); + pCols->buf = malloc(maxRowSize * maxRows + exColBytes * maxCols); if (pCols->buf == NULL) { free(pCols); return NULL; @@ -234,30 +237,34 @@ void tdInitDataCols(SDataCols *pCols, STSchema *pSchema) { tdResetDataCols(pCols); pCols->numOfCols = schemaNCols(pSchema); - pCols->cols[0].pData = pCols->buf; - int offset = TD_DATA_ROW_HEAD_SIZE; + void *ptr = pCols->buf; for (int i = 0; i < schemaNCols(pSchema); i++) { if (i > 0) { pCols->cols[i].pData = (char *)(pCols->cols[i - 1].pData) + schemaColAt(pSchema, i - 1)->bytes * pCols->maxPoints; } pCols->cols[i].type = colType(schemaColAt(pSchema, i)); pCols->cols[i].bytes = colBytes(schemaColAt(pSchema, i)); - pCols->cols[i].offset = offset; + pCols->cols[i].offset = colOffset(schemaColAt(pSchema, i)) + TD_DATA_ROW_HEAD_SIZE; pCols->cols[i].colId = colColId(schemaColAt(pSchema, i)); + pCols->cols[i].pData = ptr; - offset += TYPE_BYTES[pCols->cols[i].type]; + ptr = ptr + pCols->exColBytes + colBytes(schemaColAt(pSchema, i)) * pCols->maxPoints; + if (colType(schemaColAt(pSchema, i)) == TSDB_DATA_TYPE_BINARY || + colType(schemaColAt(pSchema, i)) == TSDB_DATA_TYPE_NCHAR) + ptr = ptr + (sizeof(int32_t) + sizeof(int16_t)) * pCols->maxPoints; } } void tdFreeDataCols(SDataCols *pCols) { if (pCols) { - if (pCols->buf) free(pCols->buf); + tfree(pCols->buf); free(pCols); } } SDataCols *tdDupDataCols(SDataCols *pDataCols, bool keepData) { - SDataCols *pRet = tdNewDataCols(pDataCols->maxRowSize, pDataCols->maxCols, pDataCols->maxPoints); + SDataCols *pRet = + tdNewDataCols(pDataCols->maxRowSize, pDataCols->maxCols, pDataCols->maxPoints, pDataCols->exColBytes); if (pRet == NULL) return NULL; pRet->numOfCols = pDataCols->numOfCols; @@ -272,7 +279,7 @@ SDataCols *tdDupDataCols(SDataCols *pDataCols, bool keepData) { pRet->cols[i].offset = pDataCols->cols[i].offset; pRet->cols[i].pData = (void *)((char *)pRet->buf + ((char *)(pDataCols->cols[i].pData) - (char *)(pDataCols->buf))); - if (keepData) memcpy(pRet->cols[i].pData, pDataCols->cols[i].pData, pRet->cols[i].bytes * pDataCols->numOfPoints); + if (keepData) memcpy(pRet->cols[i].pData, pDataCols->cols[i].pData, pDataCols->cols[i].len); } return pRet; @@ -288,22 +295,58 @@ void tdResetDataCols(SDataCols *pCols) { void tdAppendDataRowToDataCol(SDataRow row, SDataCols *pCols) { for (int i = 0; i < pCols->numOfCols; i++) { SDataCol *pCol = pCols->cols + i; - memcpy((void *)((char *)(pCol->pData) + pCol->len), dataRowAt(row, pCol->offset), pCol->bytes); - pCol->len += pCol->bytes; + void *ptr = NULL; + int32_t toffset = 0; + + switch (pCol->type) + { + case TSDB_DATA_TYPE_BINARY: + case TSDB_DATA_TYPE_NCHAR: + if (pCols->numOfPoints == 0) pCol->len = sizeof(int32_t) * pCols->maxPoints; + toffset = *(int32_t *)dataRowAt(row, pCol->offset); + if (toffset < 0) { + // It is a NULL value + // TODO: make interface and macros to hide literal thing + ((int32_t *)pCol->pData)[pCols->numOfPoints] = -1; + } else { + ptr = dataRowAt(row, toffset); + // TODO: use interface to avoid int16_t stuff + memcpy(pCol->pData, ptr, *(int16_t *)ptr); + ((int32_t *)pCol->pData)[pCols->numOfPoints] = pCol->len; + } + break; + default: + ASSERT(pCol->len == TYPE_BYTES[pCol->type] * pCols->numOfPoints); + memcpy(pCol->pData + pCol->len, dataRowAt(row, pCol->offset), pCol->bytes); + pCol->len += pCol->bytes; + break; + } } pCols->numOfPoints++; } // Pop pointsToPop points from the SDataCols void tdPopDataColsPoints(SDataCols *pCols, int pointsToPop) { int pointsLeft = pCols->numOfPoints - pointsToPop; + if (pointsLeft < 0) return; + if (pointsLeft == 0) { + tdResetDataCols(pCols); + return; + } for (int iCol = 0; iCol < pCols->numOfCols; iCol++) { - SDataCol *p_col = pCols->cols + iCol; - if (p_col->len > 0) { - p_col->len = TYPE_BYTES[p_col->type] * pointsLeft; - if (pointsLeft > 0) { - memmove((void *)(p_col->pData), (void *)((char *)(p_col->pData) + TYPE_BYTES[p_col->type] * pointsToPop), p_col->len); - } + SDataCol *pCol = pCols->cols + iCol; + ASSERT(pCol->len > 0); + switch (pCol->type) { + case TSDB_DATA_TYPE_BINARY: + case TSDB_DATA_TYPE_NCHAR: + /* code */ + break; + default: + ASSERT(pCol->len == TYPE_BYTES[pCol->type] * pCols->numOfPoints); + pCol->len = TYPE_BYTES[pCol->type] * pointsLeft; + memmove((void *)(pCol->pData), (void *)((char *)(pCol->pData) + TYPE_BYTES[pCol->type] * pointsToPop), + pCol->len); + break; } } pCols->numOfPoints = pointsLeft; diff --git a/src/tsdb/src/tsdbMain.c b/src/tsdb/src/tsdbMain.c index 64c051df3f..299084e2ec 100644 --- a/src/tsdb/src/tsdbMain.c +++ b/src/tsdb/src/tsdbMain.c @@ -5,6 +5,7 @@ #include "tsdb.h" #include "tsdbMain.h" #include "tscompression.h" +#include "tchecksum.h" #define TSDB_DEFAULT_PRECISION TSDB_PRECISION_MILLI // default precision #define IS_VALID_PRECISION(precision) (((precision) >= TSDB_PRECISION_MILLI) && ((precision) <= TSDB_PRECISION_NANO)) @@ -878,7 +879,9 @@ static void *tsdbCommitData(void *arg) { } if (tsdbInitWriteHelper(&whelper, pRepo) < 0) goto _exit; - if ((pDataCols = tdNewDataCols(pMeta->maxRowBytes, pMeta->maxCols, pCfg->maxRowsPerFileBlock)) == NULL) goto _exit; + if ((pDataCols = tdNewDataCols(pMeta->maxRowBytes, pMeta->maxCols, pCfg->maxRowsPerFileBlock, + sizeof(TSCKSUM) + COMP_OVERFLOW_BYTES)) == NULL) + goto _exit; int sfid = tsdbGetKeyFileId(pCache->imem->keyFirst, pCfg->daysPerFile, pCfg->precision); int efid = tsdbGetKeyFileId(pCache->imem->keyLast, pCfg->daysPerFile, pCfg->precision); diff --git a/src/tsdb/src/tsdbRWHelper.c b/src/tsdb/src/tsdbRWHelper.c index 079b09c3a3..3bcaa8a8d7 100644 --- a/src/tsdb/src/tsdbRWHelper.c +++ b/src/tsdb/src/tsdbRWHelper.c @@ -90,8 +90,8 @@ static void tsdbResetHelperBlock(SRWHelper *pHelper) { } static int tsdbInitHelperBlock(SRWHelper *pHelper) { - pHelper->pDataCols[0] = tdNewDataCols(pHelper->config.maxRowSize, pHelper->config.maxCols, pHelper->config.maxRows); - pHelper->pDataCols[1] = tdNewDataCols(pHelper->config.maxRowSize, pHelper->config.maxCols, pHelper->config.maxRows); + pHelper->pDataCols[0] = tdNewDataCols(pHelper->config.maxRowSize, pHelper->config.maxCols, pHelper->config.maxRows, sizeof(TSCKSUM) + COMP_OVERFLOW_BYTES); + pHelper->pDataCols[1] = tdNewDataCols(pHelper->config.maxRowSize, pHelper->config.maxCols, pHelper->config.maxRows, sizeof(TSCKSUM) + COMP_OVERFLOW_BYTES); if (pHelper->pDataCols[0] == NULL || pHelper->pDataCols[1] == NULL) return -1; tsdbResetHelperBlockImpl(pHelper); diff --git a/src/tsdb/src/tsdbRead.c b/src/tsdb/src/tsdbRead.c index eb35be5383..b168055107 100644 --- a/src/tsdb/src/tsdbRead.c +++ b/src/tsdb/src/tsdbRead.c @@ -406,7 +406,7 @@ static bool doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlo SArray* sa = getDefaultLoadColumns(pQueryHandle, true); if (pCheckInfo->pDataCols == NULL) { - pCheckInfo->pDataCols = tdNewDataCols(1000, 2, 4096); + pCheckInfo->pDataCols = tdNewDataCols(1000, 2, 4096, 0); } tdInitDataCols(pCheckInfo->pDataCols, tsdbGetTableSchema(tsdbGetMeta(pQueryHandle->pTsdb), pCheckInfo->pTableObj)); diff --git a/src/util/inc/tscompression.h b/src/util/inc/tscompression.h index 55e282296f..a1a3c060be 100644 --- a/src/util/inc/tscompression.h +++ b/src/util/inc/tscompression.h @@ -22,6 +22,7 @@ extern "C" { #include "taosdef.h" +#define COMP_OVERFLOW_BYTES 2 #define BITS_PER_BYTE 8 // Masks #define INT64MASK(_x) ((1ul << _x) - 1) -- GitLab