From e94bb935089988dea49389d53d3d9a0335a6b7d9 Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Sat, 3 Jul 2021 14:59:04 +0800 Subject: [PATCH] add sversion for KV type of SMemRow --- src/client/inc/tscUtil.h | 2 + src/client/src/tscUtil.c | 19 +++-- src/common/inc/tdataformat.h | 131 +++++++++++------------------------ src/common/src/tdataformat.c | 41 ++--------- src/cq/src/cqMain.c | 8 +-- src/inc/taosdef.h | 4 +- src/inc/ttype.h | 1 + src/tsdb/src/tsdbCommit.c | 20 +++--- src/tsdb/src/tsdbMain.c | 12 ++-- src/tsdb/src/tsdbMemTable.c | 20 +++--- src/tsdb/src/tsdbRead.c | 6 +- 11 files changed, 93 insertions(+), 171 deletions(-) diff --git a/src/client/inc/tscUtil.h b/src/client/inc/tscUtil.h index 6515c7c4e5..1a9fb4c665 100644 --- a/src/client/inc/tscUtil.h +++ b/src/client/inc/tscUtil.h @@ -40,6 +40,8 @@ extern "C" { #define UTIL_TABLE_IS_TMP_TABLE(metaInfo) \ (((metaInfo)->pTableMeta != NULL) && ((metaInfo)->pTableMeta->tableType == TSDB_TEMP_TABLE)) +#define KvRowNColsThresh 1 // default 1200. TODO: only for test, restore to default value after test finished + #pragma pack(push,1) // this struct is transfered as binary, padding two bytes to avoid // an 'uid' whose low bytes is 0xff being recoginized as NULL, diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 7e4a656622..fb09ceb5cb 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -1639,17 +1639,14 @@ int32_t tscGetDataBlockFromList(SHashObj* pHashList, int64_t id, int32_t size, i return TSDB_CODE_SUCCESS; } - -#define KvRowNColsThresh 1 // default value: 1200 TODO: for test, restore to default value after test finished - -static FORCE_INLINE uint8_t tdRowTypeJudger(SSchema* pSchema, void* pData, int32_t nCols, int32_t flen, - uint16_t* nColsNotNull) { +static FORCE_INLINE uint8_t checkTdRowType(SSchema* pSchema, void* pData, int32_t nCols, int32_t flen, + uint16_t* nColsNotNull) { ASSERT(pData != NULL); if (nCols < KvRowNColsThresh) { return SMEM_ROW_DATA; } int32_t dataRowLength = flen; - int32_t kvRowLength = 0; + int32_t kvRowLength = TD_MEM_ROW_KV_VER_SIZE; uint16_t nColsNull = 0; char* p = (char*)pData; @@ -1685,7 +1682,6 @@ static FORCE_INLINE uint8_t tdRowTypeJudger(SSchema* pSchema, void* pData, int32 return SMEM_ROW_DATA; } - SMemRow tdGenMemRowFromBuilder(SMemRowBuilder* pBuilder) { SSchema* pSchema = pBuilder->pSchema; char* p = (char*)pBuilder->buf; @@ -1696,13 +1692,13 @@ SMemRow tdGenMemRowFromBuilder(SMemRowBuilder* pBuilder) { } uint16_t nColsNotNull = 0; - uint8_t memRowType = tdRowTypeJudger(pSchema, p, pBuilder->nCols, pBuilder->flen, &nColsNotNull); - + uint8_t memRowType = checkTdRowType(pSchema, p, pBuilder->nCols, pBuilder->flen, &nColsNotNull); + // nColsNotNull = pBuilder->nCols; SMemRow* memRow = (SMemRow)pBuilder->pDataBlock; memRowSetType(memRow, memRowType); if (memRowType == SMEM_ROW_DATA) { - SDataRow trow = (SDataRow)memRowBody(memRow); + SDataRow trow = (SDataRow)memRowDataBody(memRow); dataRowSetLen(trow, (uint16_t)(TD_DATA_ROW_HEAD_SIZE + pBuilder->flen)); dataRowSetVersion(trow, pBuilder->sversion); @@ -1715,10 +1711,11 @@ SMemRow tdGenMemRowFromBuilder(SMemRowBuilder* pBuilder) { pBuilder->buf = p; } else if (memRowType == SMEM_ROW_KV) { ASSERT(nColsNotNull <= pBuilder->nCols); - SKVRow kvRow = (SKVRow)memRowBody(memRow); + SKVRow kvRow = (SKVRow)memRowKvBody(memRow); uint16_t tlen = TD_KV_ROW_HEAD_SIZE + sizeof(SColIdx) * nColsNotNull; kvRowSetLen(kvRow, tlen); kvRowSetNCols(kvRow, nColsNotNull); + memRowKvSetVersion(memRow, pBuilder->sversion); p = (char*)pBuilder->buf; for (int32_t j = 0; j < pBuilder->nCols; ++j) { diff --git a/src/common/inc/tdataformat.h b/src/common/inc/tdataformat.h index 3901f3cec0..b0402d4674 100644 --- a/src/common/inc/tdataformat.h +++ b/src/common/inc/tdataformat.h @@ -24,31 +24,6 @@ extern "C" { #endif -typedef struct { - VarDataLenT len; - uint8_t data; -} SBinaryNullT; - -typedef struct { - VarDataLenT len; - uint32_t data; -} SNCharNullT; - -extern const uint8_t BoolNull; -extern const uint8_t TinyintNull; -extern const uint16_t SmallintNull; -extern const uint32_t IntNull; -extern const uint64_t BigintNull; -extern const uint64_t TimestampNull; -extern const uint8_t UTinyintNull; -extern const uint16_t USmallintNull; -extern const uint32_t UIntNull; -extern const uint64_t UBigintNull; -extern const uint32_t FloatNull; -extern const uint64_t DoubleNull; -extern const SBinaryNullT BinaryNull; -extern const SNCharNullT NcharNull; - #define STR_TO_VARSTR(x, str) \ do { \ VarDataLenT __len = (VarDataLenT)strlen(str); \ @@ -215,7 +190,7 @@ typedef void *SDataRow; #define TD_DATA_ROW_HEAD_SIZE (sizeof(uint16_t) + sizeof(int16_t)) #define dataRowLen(r) (*(uint16_t *)(r)) -#define dataRowVersion(r) *(int16_t *)POINTER_SHIFT(r, sizeof(int16_t)) +#define dataRowVersion(r) (*(int16_t *)POINTER_SHIFT(r, sizeof(int16_t))) #define dataRowTuple(r) POINTER_SHIFT(r, TD_DATA_ROW_HEAD_SIZE) #define dataRowTKey(r) (*(TKEY *)(dataRowTuple(r))) #define dataRowKey(r) tdGetKey(dataRowTKey(r)) @@ -232,7 +207,7 @@ SDataRow tdDataRowDup(SDataRow row); SMemRow tdMemRowDup(SMemRow row); // offset here not include dataRow header length -static FORCE_INLINE int tdAppendColVal(SDataRow row, const void *value, int8_t type, int32_t bytes, int32_t offset) { +static FORCE_INLINE int tdAppendColVal(SDataRow row, void *value, int8_t type, int32_t bytes, int32_t offset) { ASSERT(value != NULL); int32_t toffset = offset + TD_DATA_ROW_HEAD_SIZE; char * ptr = (char *)POINTER_SHIFT(row, dataRowLen(row)); @@ -276,56 +251,17 @@ typedef struct SDataCol { TSKEY ts; // only used in last NULL column } SDataCol; -#define isAllRowOfColNull(pCol) ((pCol)->len == 0) static FORCE_INLINE void dataColReset(SDataCol *pDataCol) { pDataCol->len = 0; } void dataColInit(SDataCol *pDataCol, STColumn *pCol, void **pBuf, int maxPoints); -void dataColAppendVal(SDataCol *pCol, const void *value, int numOfRows, int maxPoints); +void dataColAppendVal(SDataCol *pCol, void *value, int numOfRows, int maxPoints); void dataColSetOffset(SDataCol *pCol, int nEle); bool isNEleNull(SDataCol *pCol, int nEle); void dataColSetNEleNull(SDataCol *pCol, int nEle, int maxPoints); -static FORCE_INLINE const void *tdGetNullVal(int8_t type) { - switch (type) { - case TSDB_DATA_TYPE_BOOL: - return &BoolNull; - case TSDB_DATA_TYPE_TINYINT: - return &TinyintNull; - case TSDB_DATA_TYPE_SMALLINT: - return &SmallintNull; - case TSDB_DATA_TYPE_INT: - return &IntNull; - case TSDB_DATA_TYPE_BIGINT: - return &BigintNull; - case TSDB_DATA_TYPE_FLOAT: - return &FloatNull; - case TSDB_DATA_TYPE_DOUBLE: - return &DoubleNull; - case TSDB_DATA_TYPE_BINARY: - return &BinaryNull; - case TSDB_DATA_TYPE_TIMESTAMP: - return &TimestampNull; - case TSDB_DATA_TYPE_NCHAR: - return &NcharNull; - case TSDB_DATA_TYPE_UTINYINT: - return &UTinyintNull; - case TSDB_DATA_TYPE_USMALLINT: - return &USmallintNull; - case TSDB_DATA_TYPE_UINT: - return &UIntNull; - case TSDB_DATA_TYPE_UBIGINT: - return &UBigintNull; - default: - ASSERT(0); - return NULL; - } -} // Get the data pointer from a column-wised data -static FORCE_INLINE const void *tdGetColDataOfRow(SDataCol *pCol, int row) { - if (isAllRowOfColNull(pCol)) { - return tdGetNullVal(pCol->type); - } +static FORCE_INLINE void *tdGetColDataOfRow(SDataCol *pCol, int row) { if (IS_VAR_DATA_TYPE(pCol->type)) { return POINTER_SHIFT(pCol->pData, pCol->dataOff[row]); } else { @@ -432,7 +368,6 @@ typedef struct { #define kvRowColIdxAt(r, i) (kvRowColIdx(r) + (i)) #define kvRowFree(r) tfree(r) #define kvRowEnd(r) POINTER_SHIFT(r, kvRowLen(r)) -#define kvRowVersion(r) (-1) #define kvRowTKey(r) (*(TKEY *)(kvRowValues(r))) #define kvRowKey(r) tdGetKey(kvRowTKey(r)) #define kvRowDeleted(r) TKEY_IS_DELETED(kvRowTKey(r)) @@ -532,7 +467,7 @@ static FORCE_INLINE int tdAddColToKVRow(SKVRowBuilder *pBuilder, int16_t colId, return 0; } -// ----------------- Sequential Data row structure +// ----------------- SMemRow appended with sequential data row structure /* * |-------------------------------+--------------------------- len ---------------------------------->| * |<-------- Head ------>|<--------- flen -------------->| | @@ -545,43 +480,55 @@ static FORCE_INLINE int tdAddColToKVRow(SKVRowBuilder *pBuilder, int16_t colId, * NOTE: timestamp in this row structure is TKEY instead of TSKEY */ -// ----------------- K-V data row structure -/* - * |--------------------+----------+---------------------------------+---------------------------------+ - * | uint8_t | uint16_t | int16_t | | | - * |---------+----------+----------+---------------------------------+---------------------------------+ - * | flag | len | ncols | cols index | data part | - * |---------+----------+----------+---------------------------------+---------------------------------+ +// ----------------- SMemRow appended with extended K-V data row structure +/* | + * |--------------------+----------+--------------------------------------------+---------------------------------+ + * | uint8_t | int16_t | uint16_t | int16_t | | | + * |---------+----------+----------+----------+---------------------------------+---------------------------------+ + * | flag | sversion | len | ncols | cols index | data part | + * |---------+----------+----------+----------+---------------------------------+---------------------------------+ */ #define TD_MEM_ROW_TYPE_SIZE sizeof(uint8_t) -#define TD_MEM_ROW_HEAD_SIZE (TD_MEM_ROW_TYPE_SIZE + sizeof(uint16_t) + sizeof(int16_t)) +#define TD_MEM_ROW_KV_VER_SIZE sizeof(int16_t) +#define TD_MEM_ROW_KV_TYPE_VER_SIZE (TD_MEM_ROW_TYPE_SIZE + TD_MEM_ROW_KV_VER_SIZE) +#define TD_MEM_ROW_DATA_HEAD_SIZE (TD_MEM_ROW_TYPE_SIZE + TD_DATA_ROW_HEAD_SIZE) +#define TD_MEM_ROW_KV_HEAD_SIZE (TD_MEM_ROW_TYPE_SIZE + TD_MEM_ROW_KV_VER_SIZE + TD_KV_ROW_HEAD_SIZE) #define SMEM_ROW_DATA 0U // SDataRow #define SMEM_ROW_KV 1U // SKVRow -#define TD_DO_NOTHING \ - do { \ - } while (0) #define memRowType(r) (*(uint8_t *)(r)) -#define memRowBody(r) POINTER_SHIFT(r, TD_MEM_ROW_TYPE_SIZE) -#define memRowLen(r) (*(uint16_t *)POINTER_SHIFT(r, TD_MEM_ROW_TYPE_SIZE)) -#define memRowTLen(r) (*(uint16_t *)POINTER_SHIFT(r, TD_MEM_ROW_TYPE_SIZE) + (uint16_t)TD_MEM_ROW_TYPE_SIZE) - #define isDataRow(r) (SMEM_ROW_DATA == memRowType(r)) #define isKvRow(r) (SMEM_ROW_KV == memRowType(r)) -#define memRowVersion(r) (isDataRow(r) ? dataRowVersion(memRowBody(r)) : kvRowVersion(r)) // schema version -#define memRowTuple(r) (isDataRow(r) ? dataRowTuple(memRowBody(r)) : kvRowValues(memRowBody(r))) +#define memRowDataBody(r) POINTER_SHIFT(r, TD_MEM_ROW_TYPE_SIZE) // section after flag +#define memRowKvBody(r) \ + POINTER_SHIFT(r, TD_MEM_ROW_KV_TYPE_VER_SIZE) // section after flag + sversion as to reuse of SKVRow +// #define memRowBody(r) (isDataRow(r) ? memRowDataBody(r) : memRowKvBody(r)) + +#define memRowDataLen(r) (*(TDRowLenT *)memRowDataBody(r)) +#define memRowKvLen(r) (*(TDRowLenT *)memRowKvBody(r)) +#define memRowDataTLen(r) (memRowDataLen(r) + (TDRowLenT)TD_MEM_ROW_TYPE_SIZE) +#define memRowKvTLen(r) (memRowKvLen(r) + (TDRowLenT)TD_MEM_ROW_KV_TYPE_VER_SIZE) + +#define memRowLen(r) (isDataRow(r) ? memRowDataLen(r) : memRowKvLen(r)) +#define memRowTLen(r) (isDataRow(r) ? memRowDataTLen(r) : memRowKvTLen(r)) + +#define memRowDataVersion(r) dataRowVersion(memRowDataBody(r)) +#define memRowKvVersion(r) (*(int16_t *)POINTER_SHIFT(r, TD_MEM_ROW_TYPE_SIZE)) +#define memRowVersion(r) (isDataRow(r) ? memRowDataVersion(r) : memRowKvVersion(r)) // schema version +#define memRowKvSetVersion(r, v) (memRowKvVersion(r) = (v)) +#define memRowTuple(r) (isDataRow(r) ? dataRowTuple(memRowDataBody(r)) : kvRowValues(memRowKvBody(r))) -#define memRowTKey(r) (isDataRow(r) ? dataRowTKey(memRowBody(r)) : kvRowTKey(memRowBody(r))) -#define memRowKey(r) (isDataRow(r) ? dataRowKey(memRowBody(r)) : kvRowKey(memRowBody(r))) +#define memRowTKey(r) (isDataRow(r) ? dataRowTKey(memRowDataBody(r)) : kvRowTKey(memRowKvBody(r))) +#define memRowKey(r) (isDataRow(r) ? dataRowKey(memRowDataBody(r)) : kvRowKey(memRowKvBody(r))) #define memRowSetType(r, t) (memRowType(r) = (t)) -#define memRowSetLen(r, l) (memRowLen(r) = (l)) -#define memRowSetVersion(r, v) (isDataRow(r) ? dataRowSetVersion(r, v) : TD_DO_NOTHING) +#define memRowSetLen(r, l) (isDataRow(r) ? memRowDataLen(r) = (l) : memRowKvLen(r) = (l)) +#define memRowSetVersion(r, v) (isDataRow(r) ? dataRowSetVersion(memRowDataBody(r), v) : memRowKvSetVersion(r, v)) #define memRowCpy(dst, r) memcpy((dst), (r), memRowTLen(r)) -#define memRowMaxBytesFromSchema(s) (schemaTLen(s) + TD_MEM_ROW_HEAD_SIZE) +#define memRowMaxBytesFromSchema(s) (schemaTLen(s) + TD_MEM_ROW_DATA_HEAD_SIZE) #define memRowDeleted(r) TKEY_IS_DELETED(memRowTKey(r)) // NOTE: offset here including the header size diff --git a/src/common/src/tdataformat.c b/src/common/src/tdataformat.c index 6061b84209..a46d2e84b0 100644 --- a/src/common/src/tdataformat.c +++ b/src/common/src/tdataformat.c @@ -18,21 +18,6 @@ #include "tcoding.h" #include "wchar.h" -const uint8_t BoolNull = TSDB_DATA_BOOL_NULL; -const uint8_t TinyintNull = TSDB_DATA_TINYINT_NULL; -const uint16_t SmallintNull = TSDB_DATA_SMALLINT_NULL; -const uint32_t IntNull = TSDB_DATA_INT_NULL; -const uint64_t BigintNull = TSDB_DATA_BIGINT_NULL; -const uint64_t TimestampNull = TSDB_DATA_BIGINT_NULL; -const uint8_t UTinyintNull = TSDB_DATA_UTINYINT_NULL; -const uint16_t USmallintNull = TSDB_DATA_USMALLINT_NULL; -const uint32_t UIntNull = TSDB_DATA_UINT_NULL; -const uint64_t UBigintNull = TSDB_DATA_UBIGINT_NULL; -const uint32_t FloatNull = TSDB_DATA_FLOAT_NULL; -const uint64_t DoubleNull = TSDB_DATA_DOUBLE_NULL; -const SBinaryNullT BinaryNull = {1, TSDB_DATA_BINARY_NULL}; -const SNCharNullT NcharNull = {4, TSDB_DATA_NCHAR_NULL}; - static void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, int limit1, SDataCols *src2, int *iter2, int limit2, int tRows); @@ -241,21 +226,9 @@ void dataColInit(SDataCol *pDataCol, STColumn *pCol, void **pBuf, int maxPoints) } } // value from timestamp should be TKEY here instead of TSKEY -void dataColAppendVal(SDataCol *pCol, const void *value, int numOfRows, int maxPoints) { +void dataColAppendVal(SDataCol *pCol, void *value, int numOfRows, int maxPoints) { ASSERT(pCol != NULL && value != NULL); - if (pCol->len == 0) { - if (isNull(value, pCol->type)) { - // all null value yet, just return - return; - } - - if (numOfRows > 0) { - // Find the first not null value, fill all previous values as NULL - dataColSetNEleNull(pCol, numOfRows, maxPoints); - } - } - if (IS_VAR_DATA_TYPE(pCol->type)) { // set offset pCol->dataOff[numOfRows] = pCol->len; @@ -452,7 +425,7 @@ static void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols while (dcol < pCols->numOfCols) { SDataCol *pDataCol = &(pCols->cols[dcol]); if (rcol >= schemaNCols(pSchema)) { - dataColAppendVal(pDataCol, tdGetNullVal(pDataCol->type), pCols->numOfRows, pCols->maxPoints); + dataColSetNullAt(pDataCol, pCols->numOfRows); dcol++; continue; } @@ -466,7 +439,7 @@ static void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols } else if (pRowCol->colId < pDataCol->colId) { rcol++; } else { - dataColAppendVal(pDataCol, tdGetNullVal(pDataCol->type), pCols->numOfRows, pCols->maxPoints); + dataColSetNullAt(pDataCol, pCols->numOfRows); dcol++; } } @@ -495,7 +468,7 @@ static void tdAppendKvRowToDataCol(SKVRow row, STSchema *pSchema, SDataCols *pCo while (dcol < pCols->numOfCols) { SDataCol *pDataCol = &(pCols->cols[dcol]); if (rcol >= nRowCols || rcol >= schemaNCols(pSchema)) { - dataColAppendVal(pDataCol, tdGetNullVal(pDataCol->type), pCols->numOfRows, pCols->maxPoints); + dataColSetNullAt(pDataCol, pCols->numOfRows); ++dcol; continue; } @@ -510,7 +483,7 @@ static void tdAppendKvRowToDataCol(SKVRow row, STSchema *pSchema, SDataCols *pCo } else if (colIdx->colId < pDataCol->colId) { ++rcol; } else { - dataColAppendVal(pDataCol, tdGetNullVal(pDataCol->type), pCols->numOfRows, pCols->maxPoints); + dataColSetNullAt(pDataCol, pCols->numOfRows); ++dcol; } } @@ -520,9 +493,9 @@ static void tdAppendKvRowToDataCol(SKVRow row, STSchema *pSchema, SDataCols *pCo void tdAppendMemRowToDataCol(SMemRow row, STSchema *pSchema, SDataCols *pCols) { if (isDataRow(row)) { - tdAppendDataRowToDataCol(memRowBody(row), pSchema, pCols); + tdAppendDataRowToDataCol(memRowDataBody(row), pSchema, pCols); } else if (isKvRow(row)) { - tdAppendKvRowToDataCol(memRowBody(row), pSchema, pCols); + tdAppendKvRowToDataCol(memRowKvBody(row), pSchema, pCols); } else { ASSERT(0); } diff --git a/src/cq/src/cqMain.c b/src/cq/src/cqMain.c index 6fe530dce5..ee1022ea0c 100644 --- a/src/cq/src/cqMain.c +++ b/src/cq/src/cqMain.c @@ -476,7 +476,7 @@ static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) { cDebug("vgId:%d, id:%d CQ:%s stream result is ready", pContext->vgId, pObj->tid, pObj->sqlStr); - int32_t size = sizeof(SWalHead) + sizeof(SSubmitMsg) + sizeof(SSubmitBlk) + TD_MEM_ROW_HEAD_SIZE + pObj->rowSize; + int32_t size = sizeof(SWalHead) + sizeof(SSubmitMsg) + sizeof(SSubmitBlk) + TD_MEM_ROW_DATA_HEAD_SIZE + pObj->rowSize; char *buffer = calloc(size, 1); SWalHead *pHead = (SWalHead *)buffer; @@ -484,7 +484,7 @@ static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) { SSubmitBlk *pBlk = (SSubmitBlk *) (buffer + sizeof(SWalHead) + sizeof(SSubmitMsg)); SMemRow trow = (SMemRow)pBlk->data; - SDataRow dataRow = (SDataRow)memRowBody(trow); + SDataRow dataRow = (SDataRow)memRowDataBody(trow); memRowSetType(trow, SMEM_ROW_DATA); tdInitDataRow(dataRow, pSchema); @@ -504,7 +504,7 @@ static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) { } tdAppendColVal(dataRow, val, c->type, c->bytes, c->offset); } - pBlk->dataLen = htonl(memRowTLen(trow)); + pBlk->dataLen = htonl(memRowDataTLen(trow)); pBlk->schemaLen = 0; pBlk->uid = htobe64(pObj->uid); @@ -513,7 +513,7 @@ static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) { pBlk->sversion = htonl(pSchema->version); pBlk->padding = 0; - pHead->len = sizeof(SSubmitMsg) + sizeof(SSubmitBlk) + memRowTLen(trow); + pHead->len = sizeof(SSubmitMsg) + sizeof(SSubmitBlk) + memRowDataTLen(trow); pMsg->header.vgId = htonl(pContext->vgId); pMsg->header.contLen = htonl(pHead->len); diff --git a/src/inc/taosdef.h b/src/inc/taosdef.h index 85183666fe..7b53fcc638 100644 --- a/src/inc/taosdef.h +++ b/src/inc/taosdef.h @@ -321,8 +321,8 @@ do { \ #define TSDB_MAX_JOIN_TABLE_NUM 10 #define TSDB_MAX_UNION_CLAUSE 5 -#define TSDB_MAX_BINARY_LEN (16384-TSDB_KEYSIZE) // TD-4666: TODO use TSDB_MAX_BYTES_PER_ROW later. -#define TSDB_MAX_NCHAR_LEN (16384-TSDB_KEYSIZE) // TD-4666: TODO use TSDB_MAX_BYTES_PER_ROW later. +#define TSDB_MAX_BINARY_LEN (16384-TSDB_KEYSIZE) // keep 16384 +#define TSDB_MAX_NCHAR_LEN (16384-TSDB_KEYSIZE) // keep 16384 #define PRIMARYKEY_TIMESTAMP_COL_INDEX 0 #define TSDB_MAX_RPC_THREADS 5 diff --git a/src/inc/ttype.h b/src/inc/ttype.h index 80b8ddcd4e..0900f45350 100644 --- a/src/inc/ttype.h +++ b/src/inc/ttype.h @@ -12,6 +12,7 @@ extern "C" { // ----------------- For variable data types such as TSDB_DATA_TYPE_BINARY and TSDB_DATA_TYPE_NCHAR typedef int32_t VarDataOffsetT; typedef int16_t VarDataLenT; // maxVarDataLen: 32767 +typedef uint16_t TDRowLenT; typedef struct tstr { VarDataLenT len; diff --git a/src/tsdb/src/tsdbCommit.c b/src/tsdb/src/tsdbCommit.c index 5700b87d5e..69d855e57c 100644 --- a/src/tsdb/src/tsdbCommit.c +++ b/src/tsdb/src/tsdbCommit.c @@ -920,21 +920,21 @@ int tsdbWriteBlockImpl(STsdbRepo *pRepo, STable *pTable, SDFile *pDFile, SDataCo SDataCol * pDataCol = pDataCols->cols + ncol; SBlockCol *pBlockCol = pBlockData->cols + nColsNotAllNull; - if (isAllRowOfColNull(pDataCol)) { // all data to commit are NULL, just ignore it + if (isNEleNull(pDataCol, rowsToWrite)) { // all data to commit are NULL, just ignore it continue; } - memset(pBlockCol, 0, sizeof(*pBlockCol)); + memset(pBlockCol, 0, sizeof(*pBlockCol)); - pBlockCol->colId = pDataCol->colId; - pBlockCol->type = pDataCol->type; - if (tDataTypes[pDataCol->type].statisFunc) { - (*tDataTypes[pDataCol->type].statisFunc)(pDataCol->pData, rowsToWrite, &(pBlockCol->min), &(pBlockCol->max), - &(pBlockCol->sum), &(pBlockCol->minIndex), &(pBlockCol->maxIndex), - &(pBlockCol->numOfNull)); + pBlockCol->colId = pDataCol->colId; + pBlockCol->type = pDataCol->type; + if (tDataTypes[pDataCol->type].statisFunc) { + (*tDataTypes[pDataCol->type].statisFunc)(pDataCol->pData, rowsToWrite, &(pBlockCol->min), &(pBlockCol->max), + &(pBlockCol->sum), &(pBlockCol->minIndex), &(pBlockCol->maxIndex), + &(pBlockCol->numOfNull)); + } + nColsNotAllNull++; } - nColsNotAllNull++; - } ASSERT(nColsNotAllNull >= 0 && nColsNotAllNull <= pDataCols->numOfCols); diff --git a/src/tsdb/src/tsdbMain.c b/src/tsdb/src/tsdbMain.c index b34308fd76..272a915d61 100644 --- a/src/tsdb/src/tsdbMain.c +++ b/src/tsdb/src/tsdbMain.c @@ -663,7 +663,7 @@ static int tsdbRestoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pRea } memRowSetType(row, SMEM_ROW_DATA); - tdInitDataRow(memRowBody(row), pSchema); + tdInitDataRow(memRowDataBody(row), pSchema); // first load block index info if (tsdbLoadBlockInfo(pReadh, NULL) < 0) { @@ -720,9 +720,9 @@ static int tsdbRestoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pRea // OK,let's load row from backward to get not-null column for (int32_t rowId = pBlock->numOfRows - 1; rowId >= 0; rowId--) { SDataCol *pDataCol = pReadh->pDCols[0]->cols + i; - tdAppendColVal(memRowBody(row), tdGetColDataOfRow(pDataCol, rowId), pCol->type, pCol->bytes, pCol->offset); + tdAppendColVal(memRowDataBody(row), tdGetColDataOfRow(pDataCol, rowId), pCol->type, pCol->bytes, pCol->offset); //SDataCol *pDataCol = readh.pDCols[0]->cols + j; - void *value = tdGetRowDataOfCol(memRowBody(row), (int8_t)pCol->type, TD_DATA_ROW_HEAD_SIZE + pCol->offset); + void *value = tdGetRowDataOfCol(memRowDataBody(row), (int8_t)pCol->type, TD_DATA_ROW_HEAD_SIZE + pCol->offset); if (isNull(value, pCol->type)) { continue; } @@ -742,7 +742,7 @@ static int tsdbRestoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pRea // save row ts(in column 0) pDataCol = pReadh->pDCols[0]->cols + 0; pCol = schemaColAt(pSchema, 0); - tdAppendColVal(memRowBody(row), tdGetColDataOfRow(pDataCol, rowId), pCol->type, pCol->bytes, pCol->offset); + tdAppendColVal(memRowDataBody(row), tdGetColDataOfRow(pDataCol, rowId), pCol->type, pCol->bytes, pCol->offset); pLastCol->ts = memRowKey(row); pTable->restoreColumnNum += 1; @@ -785,11 +785,11 @@ static int tsdbRestoreLastRow(STsdbRepo *pRepo, STable *pTable, SReadH* pReadh, return -1; } memRowSetType(pTable->lastRow, SMEM_ROW_DATA); - tdInitDataRow(memRowBody(pTable->lastRow), pSchema); + tdInitDataRow(memRowDataBody(pTable->lastRow), pSchema); for (int icol = 0; icol < schemaNCols(pSchema); icol++) { STColumn *pCol = schemaColAt(pSchema, icol); SDataCol *pDataCol = pReadh->pDCols[0]->cols + icol; - tdAppendColVal(memRowBody(pTable->lastRow), tdGetColDataOfRow(pDataCol, pBlock->numOfRows - 1), pCol->type, + tdAppendColVal(memRowDataBody(pTable->lastRow), tdGetColDataOfRow(pDataCol, pBlock->numOfRows - 1), pCol->type, pCol->bytes, pCol->offset); } diff --git a/src/tsdb/src/tsdbMemTable.c b/src/tsdb/src/tsdbMemTable.c index 4314514105..ad097ec0f5 100644 --- a/src/tsdb/src/tsdbMemTable.c +++ b/src/tsdb/src/tsdbMemTable.c @@ -932,13 +932,15 @@ static int tsdbInsertDataToTableImpl(STsdbRepo *pRepo, STable *pTable, void **ro int64_t osize = SL_SIZE(pTableData->pData); tSkipListPutBatch(pTableData->pData, rows, rowCounter); int64_t dsize = SL_SIZE(pTableData->pData) - osize; + TSKEY keyFirstRow = memRowKey(rows[0]); + TSKEY keyLastRow = memRowKey(rows[rowCounter - 1]); - if (pMemTable->keyFirst > memRowKey(rows[0])) pMemTable->keyFirst = memRowKey(rows[0]); - if (pMemTable->keyLast < memRowKey(rows[rowCounter - 1])) pMemTable->keyLast = memRowKey(rows[rowCounter - 1]); + if (pMemTable->keyFirst > keyFirstRow) pMemTable->keyFirst = keyFirstRow; + if (pMemTable->keyLast < keyLastRow) pMemTable->keyLast = keyLastRow; pMemTable->numOfRows += dsize; - if (pTableData->keyFirst > memRowKey(rows[0])) pTableData->keyFirst = memRowKey(rows[0]); - if (pTableData->keyLast < memRowKey(rows[rowCounter - 1])) pTableData->keyLast = memRowKey(rows[rowCounter - 1]); + if (pTableData->keyFirst > keyFirstRow) pTableData->keyFirst = keyFirstRow; + if (pTableData->keyLast < keyLastRow) pTableData->keyLast = keyLastRow; pTableData->numOfRows += dsize; // update table latest info @@ -1004,8 +1006,7 @@ static void updateTableLatestColumn(STsdbRepo *pRepo, STable *pTable, SMemRow ro SDataCol *pLatestCols = pTable->lastCols; - bool isDataRow = isDataRow(row); - void *rowBody = memRowBody(row); + bool isDataRow = isDataRow(row); for (int16_t j = 0; j < schemaNCols(pSchema); j++) { STColumn *pTCol = schemaColAt(pSchema, j); // ignore not exist colId @@ -1017,12 +1018,13 @@ static void updateTableLatestColumn(STsdbRepo *pRepo, STable *pTable, SMemRow ro void *value = NULL; if (isDataRow) { - value = tdGetRowDataOfCol(rowBody, (int8_t)pTCol->type, TD_DATA_ROW_HEAD_SIZE + pSchema->columns[j].offset); + value = tdGetRowDataOfCol(memRowDataBody(row), (int8_t)pTCol->type, + TD_DATA_ROW_HEAD_SIZE + pSchema->columns[j].offset); } else { // SKVRow - SColIdx *pColIdx = tdGetKVRowIdxOfCol(rowBody, pTCol->colId); + SColIdx *pColIdx = tdGetKVRowIdxOfCol(memRowKvBody(row), pTCol->colId); if (pColIdx) { - value = tdGetKvRowDataOfCol(rowBody, pColIdx->offset); + value = tdGetKvRowDataOfCol(memRowKvBody(row), pColIdx->offset); } } diff --git a/src/tsdb/src/tsdbRead.c b/src/tsdb/src/tsdbRead.c index 7f344dc646..2d3e7b9e8b 100644 --- a/src/tsdb/src/tsdbRead.c +++ b/src/tsdb/src/tsdbRead.c @@ -1392,7 +1392,7 @@ int32_t doCopyRowsFromFileBlock(STsdbQueryHandle* pQueryHandle, int32_t capacity // todo refactor, only copy one-by-one for (int32_t k = start; k < num + start; ++k) { - const char* p = tdGetColDataOfRow(src, k); + char* p = tdGetColDataOfRow(src, k); memcpy(dst, p, varDataTLen(p)); dst += bytes; } @@ -1459,7 +1459,7 @@ static void copyOneRowFromMem(STsdbQueryHandle* pQueryHandle, int32_t capacity, int32_t i = 0; if (isDataRow(row)) { - SDataRow dataRow = memRowBody(row); + SDataRow dataRow = memRowDataBody(row); int32_t j = 0; while (i < numOfCols && j < numOfRowCols) { SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i); @@ -1529,7 +1529,7 @@ static void copyOneRowFromMem(STsdbQueryHandle* pQueryHandle, int32_t capacity, } } } else if (isKvRow(row)) { - SKVRow kvRow = memRowBody(row); + SKVRow kvRow = memRowKvBody(row); int32_t k = 0; int32_t nKvRowCols = kvRowNCols(kvRow); -- GitLab