提交 e94bb935 编写于 作者: C Cary Xu

add sversion for KV type of SMemRow

上级 dd8aaaf3
...@@ -40,6 +40,8 @@ extern "C" { ...@@ -40,6 +40,8 @@ extern "C" {
#define UTIL_TABLE_IS_TMP_TABLE(metaInfo) \ #define UTIL_TABLE_IS_TMP_TABLE(metaInfo) \
(((metaInfo)->pTableMeta != NULL) && ((metaInfo)->pTableMeta->tableType == TSDB_TEMP_TABLE)) (((metaInfo)->pTableMeta != NULL) && ((metaInfo)->pTableMeta->tableType == TSDB_TEMP_TABLE))
#define KvRowNColsThresh 1 // default 1200. TODO: only for test, restore to default value after test finished
#pragma pack(push,1) #pragma pack(push,1)
// this struct is transfered as binary, padding two bytes to avoid // this struct is transfered as binary, padding two bytes to avoid
// an 'uid' whose low bytes is 0xff being recoginized as NULL, // an 'uid' whose low bytes is 0xff being recoginized as NULL,
......
...@@ -1639,17 +1639,14 @@ int32_t tscGetDataBlockFromList(SHashObj* pHashList, int64_t id, int32_t size, i ...@@ -1639,17 +1639,14 @@ int32_t tscGetDataBlockFromList(SHashObj* pHashList, int64_t id, int32_t size, i
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static FORCE_INLINE uint8_t checkTdRowType(SSchema* pSchema, void* pData, int32_t nCols, int32_t flen,
#define KvRowNColsThresh 1 // default value: 1200 TODO: for test, restore to default value after test finished uint16_t* nColsNotNull) {
static FORCE_INLINE uint8_t tdRowTypeJudger(SSchema* pSchema, void* pData, int32_t nCols, int32_t flen,
uint16_t* nColsNotNull) {
ASSERT(pData != NULL); ASSERT(pData != NULL);
if (nCols < KvRowNColsThresh) { if (nCols < KvRowNColsThresh) {
return SMEM_ROW_DATA; return SMEM_ROW_DATA;
} }
int32_t dataRowLength = flen; int32_t dataRowLength = flen;
int32_t kvRowLength = 0; int32_t kvRowLength = TD_MEM_ROW_KV_VER_SIZE;
uint16_t nColsNull = 0; uint16_t nColsNull = 0;
char* p = (char*)pData; char* p = (char*)pData;
...@@ -1685,7 +1682,6 @@ static FORCE_INLINE uint8_t tdRowTypeJudger(SSchema* pSchema, void* pData, int32 ...@@ -1685,7 +1682,6 @@ static FORCE_INLINE uint8_t tdRowTypeJudger(SSchema* pSchema, void* pData, int32
return SMEM_ROW_DATA; return SMEM_ROW_DATA;
} }
SMemRow tdGenMemRowFromBuilder(SMemRowBuilder* pBuilder) { SMemRow tdGenMemRowFromBuilder(SMemRowBuilder* pBuilder) {
SSchema* pSchema = pBuilder->pSchema; SSchema* pSchema = pBuilder->pSchema;
char* p = (char*)pBuilder->buf; char* p = (char*)pBuilder->buf;
...@@ -1696,13 +1692,13 @@ SMemRow tdGenMemRowFromBuilder(SMemRowBuilder* pBuilder) { ...@@ -1696,13 +1692,13 @@ SMemRow tdGenMemRowFromBuilder(SMemRowBuilder* pBuilder) {
} }
uint16_t nColsNotNull = 0; uint16_t nColsNotNull = 0;
uint8_t memRowType = tdRowTypeJudger(pSchema, p, pBuilder->nCols, pBuilder->flen, &nColsNotNull); uint8_t memRowType = checkTdRowType(pSchema, p, pBuilder->nCols, pBuilder->flen, &nColsNotNull);
// nColsNotNull = pBuilder->nCols;
SMemRow* memRow = (SMemRow)pBuilder->pDataBlock; SMemRow* memRow = (SMemRow)pBuilder->pDataBlock;
memRowSetType(memRow, memRowType); memRowSetType(memRow, memRowType);
if (memRowType == SMEM_ROW_DATA) { if (memRowType == SMEM_ROW_DATA) {
SDataRow trow = (SDataRow)memRowBody(memRow); SDataRow trow = (SDataRow)memRowDataBody(memRow);
dataRowSetLen(trow, (uint16_t)(TD_DATA_ROW_HEAD_SIZE + pBuilder->flen)); dataRowSetLen(trow, (uint16_t)(TD_DATA_ROW_HEAD_SIZE + pBuilder->flen));
dataRowSetVersion(trow, pBuilder->sversion); dataRowSetVersion(trow, pBuilder->sversion);
...@@ -1715,10 +1711,11 @@ SMemRow tdGenMemRowFromBuilder(SMemRowBuilder* pBuilder) { ...@@ -1715,10 +1711,11 @@ SMemRow tdGenMemRowFromBuilder(SMemRowBuilder* pBuilder) {
pBuilder->buf = p; pBuilder->buf = p;
} else if (memRowType == SMEM_ROW_KV) { } else if (memRowType == SMEM_ROW_KV) {
ASSERT(nColsNotNull <= pBuilder->nCols); ASSERT(nColsNotNull <= pBuilder->nCols);
SKVRow kvRow = (SKVRow)memRowBody(memRow); SKVRow kvRow = (SKVRow)memRowKvBody(memRow);
uint16_t tlen = TD_KV_ROW_HEAD_SIZE + sizeof(SColIdx) * nColsNotNull; uint16_t tlen = TD_KV_ROW_HEAD_SIZE + sizeof(SColIdx) * nColsNotNull;
kvRowSetLen(kvRow, tlen); kvRowSetLen(kvRow, tlen);
kvRowSetNCols(kvRow, nColsNotNull); kvRowSetNCols(kvRow, nColsNotNull);
memRowKvSetVersion(memRow, pBuilder->sversion);
p = (char*)pBuilder->buf; p = (char*)pBuilder->buf;
for (int32_t j = 0; j < pBuilder->nCols; ++j) { for (int32_t j = 0; j < pBuilder->nCols; ++j) {
......
...@@ -24,31 +24,6 @@ ...@@ -24,31 +24,6 @@
extern "C" { extern "C" {
#endif #endif
typedef struct {
VarDataLenT len;
uint8_t data;
} SBinaryNullT;
typedef struct {
VarDataLenT len;
uint32_t data;
} SNCharNullT;
extern const uint8_t BoolNull;
extern const uint8_t TinyintNull;
extern const uint16_t SmallintNull;
extern const uint32_t IntNull;
extern const uint64_t BigintNull;
extern const uint64_t TimestampNull;
extern const uint8_t UTinyintNull;
extern const uint16_t USmallintNull;
extern const uint32_t UIntNull;
extern const uint64_t UBigintNull;
extern const uint32_t FloatNull;
extern const uint64_t DoubleNull;
extern const SBinaryNullT BinaryNull;
extern const SNCharNullT NcharNull;
#define STR_TO_VARSTR(x, str) \ #define STR_TO_VARSTR(x, str) \
do { \ do { \
VarDataLenT __len = (VarDataLenT)strlen(str); \ VarDataLenT __len = (VarDataLenT)strlen(str); \
...@@ -215,7 +190,7 @@ typedef void *SDataRow; ...@@ -215,7 +190,7 @@ typedef void *SDataRow;
#define TD_DATA_ROW_HEAD_SIZE (sizeof(uint16_t) + sizeof(int16_t)) #define TD_DATA_ROW_HEAD_SIZE (sizeof(uint16_t) + sizeof(int16_t))
#define dataRowLen(r) (*(uint16_t *)(r)) #define dataRowLen(r) (*(uint16_t *)(r))
#define dataRowVersion(r) *(int16_t *)POINTER_SHIFT(r, sizeof(int16_t)) #define dataRowVersion(r) (*(int16_t *)POINTER_SHIFT(r, sizeof(int16_t)))
#define dataRowTuple(r) POINTER_SHIFT(r, TD_DATA_ROW_HEAD_SIZE) #define dataRowTuple(r) POINTER_SHIFT(r, TD_DATA_ROW_HEAD_SIZE)
#define dataRowTKey(r) (*(TKEY *)(dataRowTuple(r))) #define dataRowTKey(r) (*(TKEY *)(dataRowTuple(r)))
#define dataRowKey(r) tdGetKey(dataRowTKey(r)) #define dataRowKey(r) tdGetKey(dataRowTKey(r))
...@@ -232,7 +207,7 @@ SDataRow tdDataRowDup(SDataRow row); ...@@ -232,7 +207,7 @@ SDataRow tdDataRowDup(SDataRow row);
SMemRow tdMemRowDup(SMemRow row); SMemRow tdMemRowDup(SMemRow row);
// offset here not include dataRow header length // offset here not include dataRow header length
static FORCE_INLINE int tdAppendColVal(SDataRow row, const void *value, int8_t type, int32_t bytes, int32_t offset) { static FORCE_INLINE int tdAppendColVal(SDataRow row, void *value, int8_t type, int32_t bytes, int32_t offset) {
ASSERT(value != NULL); ASSERT(value != NULL);
int32_t toffset = offset + TD_DATA_ROW_HEAD_SIZE; int32_t toffset = offset + TD_DATA_ROW_HEAD_SIZE;
char * ptr = (char *)POINTER_SHIFT(row, dataRowLen(row)); char * ptr = (char *)POINTER_SHIFT(row, dataRowLen(row));
...@@ -276,56 +251,17 @@ typedef struct SDataCol { ...@@ -276,56 +251,17 @@ typedef struct SDataCol {
TSKEY ts; // only used in last NULL column TSKEY ts; // only used in last NULL column
} SDataCol; } SDataCol;
#define isAllRowOfColNull(pCol) ((pCol)->len == 0)
static FORCE_INLINE void dataColReset(SDataCol *pDataCol) { pDataCol->len = 0; } static FORCE_INLINE void dataColReset(SDataCol *pDataCol) { pDataCol->len = 0; }
void dataColInit(SDataCol *pDataCol, STColumn *pCol, void **pBuf, int maxPoints); void dataColInit(SDataCol *pDataCol, STColumn *pCol, void **pBuf, int maxPoints);
void dataColAppendVal(SDataCol *pCol, const void *value, int numOfRows, int maxPoints); void dataColAppendVal(SDataCol *pCol, void *value, int numOfRows, int maxPoints);
void dataColSetOffset(SDataCol *pCol, int nEle); void dataColSetOffset(SDataCol *pCol, int nEle);
bool isNEleNull(SDataCol *pCol, int nEle); bool isNEleNull(SDataCol *pCol, int nEle);
void dataColSetNEleNull(SDataCol *pCol, int nEle, int maxPoints); void dataColSetNEleNull(SDataCol *pCol, int nEle, int maxPoints);
static FORCE_INLINE const void *tdGetNullVal(int8_t type) {
switch (type) {
case TSDB_DATA_TYPE_BOOL:
return &BoolNull;
case TSDB_DATA_TYPE_TINYINT:
return &TinyintNull;
case TSDB_DATA_TYPE_SMALLINT:
return &SmallintNull;
case TSDB_DATA_TYPE_INT:
return &IntNull;
case TSDB_DATA_TYPE_BIGINT:
return &BigintNull;
case TSDB_DATA_TYPE_FLOAT:
return &FloatNull;
case TSDB_DATA_TYPE_DOUBLE:
return &DoubleNull;
case TSDB_DATA_TYPE_BINARY:
return &BinaryNull;
case TSDB_DATA_TYPE_TIMESTAMP:
return &TimestampNull;
case TSDB_DATA_TYPE_NCHAR:
return &NcharNull;
case TSDB_DATA_TYPE_UTINYINT:
return &UTinyintNull;
case TSDB_DATA_TYPE_USMALLINT:
return &USmallintNull;
case TSDB_DATA_TYPE_UINT:
return &UIntNull;
case TSDB_DATA_TYPE_UBIGINT:
return &UBigintNull;
default:
ASSERT(0);
return NULL;
}
}
// Get the data pointer from a column-wised data // Get the data pointer from a column-wised data
static FORCE_INLINE const void *tdGetColDataOfRow(SDataCol *pCol, int row) { static FORCE_INLINE void *tdGetColDataOfRow(SDataCol *pCol, int row) {
if (isAllRowOfColNull(pCol)) {
return tdGetNullVal(pCol->type);
}
if (IS_VAR_DATA_TYPE(pCol->type)) { if (IS_VAR_DATA_TYPE(pCol->type)) {
return POINTER_SHIFT(pCol->pData, pCol->dataOff[row]); return POINTER_SHIFT(pCol->pData, pCol->dataOff[row]);
} else { } else {
...@@ -432,7 +368,6 @@ typedef struct { ...@@ -432,7 +368,6 @@ typedef struct {
#define kvRowColIdxAt(r, i) (kvRowColIdx(r) + (i)) #define kvRowColIdxAt(r, i) (kvRowColIdx(r) + (i))
#define kvRowFree(r) tfree(r) #define kvRowFree(r) tfree(r)
#define kvRowEnd(r) POINTER_SHIFT(r, kvRowLen(r)) #define kvRowEnd(r) POINTER_SHIFT(r, kvRowLen(r))
#define kvRowVersion(r) (-1)
#define kvRowTKey(r) (*(TKEY *)(kvRowValues(r))) #define kvRowTKey(r) (*(TKEY *)(kvRowValues(r)))
#define kvRowKey(r) tdGetKey(kvRowTKey(r)) #define kvRowKey(r) tdGetKey(kvRowTKey(r))
#define kvRowDeleted(r) TKEY_IS_DELETED(kvRowTKey(r)) #define kvRowDeleted(r) TKEY_IS_DELETED(kvRowTKey(r))
...@@ -532,7 +467,7 @@ static FORCE_INLINE int tdAddColToKVRow(SKVRowBuilder *pBuilder, int16_t colId, ...@@ -532,7 +467,7 @@ static FORCE_INLINE int tdAddColToKVRow(SKVRowBuilder *pBuilder, int16_t colId,
return 0; return 0;
} }
// ----------------- Sequential Data row structure // ----------------- SMemRow appended with sequential data row structure
/* /*
* |-------------------------------+--------------------------- len ---------------------------------->| * |-------------------------------+--------------------------- len ---------------------------------->|
* |<-------- Head ------>|<--------- flen -------------->| | * |<-------- Head ------>|<--------- flen -------------->| |
...@@ -545,43 +480,55 @@ static FORCE_INLINE int tdAddColToKVRow(SKVRowBuilder *pBuilder, int16_t colId, ...@@ -545,43 +480,55 @@ static FORCE_INLINE int tdAddColToKVRow(SKVRowBuilder *pBuilder, int16_t colId,
* NOTE: timestamp in this row structure is TKEY instead of TSKEY * NOTE: timestamp in this row structure is TKEY instead of TSKEY
*/ */
// ----------------- K-V data row structure // ----------------- SMemRow appended with extended K-V data row structure
/* /* |
* |--------------------+----------+---------------------------------+---------------------------------+ * |--------------------+----------+--------------------------------------------+---------------------------------+
* | uint8_t | uint16_t | int16_t | | | * | uint8_t | int16_t | uint16_t | int16_t | | |
* |---------+----------+----------+---------------------------------+---------------------------------+ * |---------+----------+----------+----------+---------------------------------+---------------------------------+
* | flag | len | ncols | cols index | data part | * | flag | sversion | len | ncols | cols index | data part |
* |---------+----------+----------+---------------------------------+---------------------------------+ * |---------+----------+----------+----------+---------------------------------+---------------------------------+
*/ */
#define TD_MEM_ROW_TYPE_SIZE sizeof(uint8_t) #define TD_MEM_ROW_TYPE_SIZE sizeof(uint8_t)
#define TD_MEM_ROW_HEAD_SIZE (TD_MEM_ROW_TYPE_SIZE + sizeof(uint16_t) + sizeof(int16_t)) #define TD_MEM_ROW_KV_VER_SIZE sizeof(int16_t)
#define TD_MEM_ROW_KV_TYPE_VER_SIZE (TD_MEM_ROW_TYPE_SIZE + TD_MEM_ROW_KV_VER_SIZE)
#define TD_MEM_ROW_DATA_HEAD_SIZE (TD_MEM_ROW_TYPE_SIZE + TD_DATA_ROW_HEAD_SIZE)
#define TD_MEM_ROW_KV_HEAD_SIZE (TD_MEM_ROW_TYPE_SIZE + TD_MEM_ROW_KV_VER_SIZE + TD_KV_ROW_HEAD_SIZE)
#define SMEM_ROW_DATA 0U // SDataRow #define SMEM_ROW_DATA 0U // SDataRow
#define SMEM_ROW_KV 1U // SKVRow #define SMEM_ROW_KV 1U // SKVRow
#define TD_DO_NOTHING \
do { \
} while (0)
#define memRowType(r) (*(uint8_t *)(r)) #define memRowType(r) (*(uint8_t *)(r))
#define memRowBody(r) POINTER_SHIFT(r, TD_MEM_ROW_TYPE_SIZE)
#define memRowLen(r) (*(uint16_t *)POINTER_SHIFT(r, TD_MEM_ROW_TYPE_SIZE))
#define memRowTLen(r) (*(uint16_t *)POINTER_SHIFT(r, TD_MEM_ROW_TYPE_SIZE) + (uint16_t)TD_MEM_ROW_TYPE_SIZE)
#define isDataRow(r) (SMEM_ROW_DATA == memRowType(r)) #define isDataRow(r) (SMEM_ROW_DATA == memRowType(r))
#define isKvRow(r) (SMEM_ROW_KV == memRowType(r)) #define isKvRow(r) (SMEM_ROW_KV == memRowType(r))
#define memRowVersion(r) (isDataRow(r) ? dataRowVersion(memRowBody(r)) : kvRowVersion(r)) // schema version
#define memRowTuple(r) (isDataRow(r) ? dataRowTuple(memRowBody(r)) : kvRowValues(memRowBody(r))) #define memRowDataBody(r) POINTER_SHIFT(r, TD_MEM_ROW_TYPE_SIZE) // section after flag
#define memRowKvBody(r) \
POINTER_SHIFT(r, TD_MEM_ROW_KV_TYPE_VER_SIZE) // section after flag + sversion as to reuse of SKVRow
// #define memRowBody(r) (isDataRow(r) ? memRowDataBody(r) : memRowKvBody(r))
#define memRowDataLen(r) (*(TDRowLenT *)memRowDataBody(r))
#define memRowKvLen(r) (*(TDRowLenT *)memRowKvBody(r))
#define memRowDataTLen(r) (memRowDataLen(r) + (TDRowLenT)TD_MEM_ROW_TYPE_SIZE)
#define memRowKvTLen(r) (memRowKvLen(r) + (TDRowLenT)TD_MEM_ROW_KV_TYPE_VER_SIZE)
#define memRowLen(r) (isDataRow(r) ? memRowDataLen(r) : memRowKvLen(r))
#define memRowTLen(r) (isDataRow(r) ? memRowDataTLen(r) : memRowKvTLen(r))
#define memRowDataVersion(r) dataRowVersion(memRowDataBody(r))
#define memRowKvVersion(r) (*(int16_t *)POINTER_SHIFT(r, TD_MEM_ROW_TYPE_SIZE))
#define memRowVersion(r) (isDataRow(r) ? memRowDataVersion(r) : memRowKvVersion(r)) // schema version
#define memRowKvSetVersion(r, v) (memRowKvVersion(r) = (v))
#define memRowTuple(r) (isDataRow(r) ? dataRowTuple(memRowDataBody(r)) : kvRowValues(memRowKvBody(r)))
#define memRowTKey(r) (isDataRow(r) ? dataRowTKey(memRowBody(r)) : kvRowTKey(memRowBody(r))) #define memRowTKey(r) (isDataRow(r) ? dataRowTKey(memRowDataBody(r)) : kvRowTKey(memRowKvBody(r)))
#define memRowKey(r) (isDataRow(r) ? dataRowKey(memRowBody(r)) : kvRowKey(memRowBody(r))) #define memRowKey(r) (isDataRow(r) ? dataRowKey(memRowDataBody(r)) : kvRowKey(memRowKvBody(r)))
#define memRowSetType(r, t) (memRowType(r) = (t)) #define memRowSetType(r, t) (memRowType(r) = (t))
#define memRowSetLen(r, l) (memRowLen(r) = (l)) #define memRowSetLen(r, l) (isDataRow(r) ? memRowDataLen(r) = (l) : memRowKvLen(r) = (l))
#define memRowSetVersion(r, v) (isDataRow(r) ? dataRowSetVersion(r, v) : TD_DO_NOTHING) #define memRowSetVersion(r, v) (isDataRow(r) ? dataRowSetVersion(memRowDataBody(r), v) : memRowKvSetVersion(r, v))
#define memRowCpy(dst, r) memcpy((dst), (r), memRowTLen(r)) #define memRowCpy(dst, r) memcpy((dst), (r), memRowTLen(r))
#define memRowMaxBytesFromSchema(s) (schemaTLen(s) + TD_MEM_ROW_HEAD_SIZE) #define memRowMaxBytesFromSchema(s) (schemaTLen(s) + TD_MEM_ROW_DATA_HEAD_SIZE)
#define memRowDeleted(r) TKEY_IS_DELETED(memRowTKey(r)) #define memRowDeleted(r) TKEY_IS_DELETED(memRowTKey(r))
// NOTE: offset here including the header size // NOTE: offset here including the header size
......
...@@ -18,21 +18,6 @@ ...@@ -18,21 +18,6 @@
#include "tcoding.h" #include "tcoding.h"
#include "wchar.h" #include "wchar.h"
const uint8_t BoolNull = TSDB_DATA_BOOL_NULL;
const uint8_t TinyintNull = TSDB_DATA_TINYINT_NULL;
const uint16_t SmallintNull = TSDB_DATA_SMALLINT_NULL;
const uint32_t IntNull = TSDB_DATA_INT_NULL;
const uint64_t BigintNull = TSDB_DATA_BIGINT_NULL;
const uint64_t TimestampNull = TSDB_DATA_BIGINT_NULL;
const uint8_t UTinyintNull = TSDB_DATA_UTINYINT_NULL;
const uint16_t USmallintNull = TSDB_DATA_USMALLINT_NULL;
const uint32_t UIntNull = TSDB_DATA_UINT_NULL;
const uint64_t UBigintNull = TSDB_DATA_UBIGINT_NULL;
const uint32_t FloatNull = TSDB_DATA_FLOAT_NULL;
const uint64_t DoubleNull = TSDB_DATA_DOUBLE_NULL;
const SBinaryNullT BinaryNull = {1, TSDB_DATA_BINARY_NULL};
const SNCharNullT NcharNull = {4, TSDB_DATA_NCHAR_NULL};
static void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, int limit1, SDataCols *src2, int *iter2, static void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, int limit1, SDataCols *src2, int *iter2,
int limit2, int tRows); int limit2, int tRows);
...@@ -241,21 +226,9 @@ void dataColInit(SDataCol *pDataCol, STColumn *pCol, void **pBuf, int maxPoints) ...@@ -241,21 +226,9 @@ void dataColInit(SDataCol *pDataCol, STColumn *pCol, void **pBuf, int maxPoints)
} }
} }
// value from timestamp should be TKEY here instead of TSKEY // value from timestamp should be TKEY here instead of TSKEY
void dataColAppendVal(SDataCol *pCol, const void *value, int numOfRows, int maxPoints) { void dataColAppendVal(SDataCol *pCol, void *value, int numOfRows, int maxPoints) {
ASSERT(pCol != NULL && value != NULL); ASSERT(pCol != NULL && value != NULL);
if (pCol->len == 0) {
if (isNull(value, pCol->type)) {
// all null value yet, just return
return;
}
if (numOfRows > 0) {
// Find the first not null value, fill all previous values as NULL
dataColSetNEleNull(pCol, numOfRows, maxPoints);
}
}
if (IS_VAR_DATA_TYPE(pCol->type)) { if (IS_VAR_DATA_TYPE(pCol->type)) {
// set offset // set offset
pCol->dataOff[numOfRows] = pCol->len; pCol->dataOff[numOfRows] = pCol->len;
...@@ -452,7 +425,7 @@ static void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols ...@@ -452,7 +425,7 @@ static void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols
while (dcol < pCols->numOfCols) { while (dcol < pCols->numOfCols) {
SDataCol *pDataCol = &(pCols->cols[dcol]); SDataCol *pDataCol = &(pCols->cols[dcol]);
if (rcol >= schemaNCols(pSchema)) { if (rcol >= schemaNCols(pSchema)) {
dataColAppendVal(pDataCol, tdGetNullVal(pDataCol->type), pCols->numOfRows, pCols->maxPoints); dataColSetNullAt(pDataCol, pCols->numOfRows);
dcol++; dcol++;
continue; continue;
} }
...@@ -466,7 +439,7 @@ static void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols ...@@ -466,7 +439,7 @@ static void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols
} else if (pRowCol->colId < pDataCol->colId) { } else if (pRowCol->colId < pDataCol->colId) {
rcol++; rcol++;
} else { } else {
dataColAppendVal(pDataCol, tdGetNullVal(pDataCol->type), pCols->numOfRows, pCols->maxPoints); dataColSetNullAt(pDataCol, pCols->numOfRows);
dcol++; dcol++;
} }
} }
...@@ -495,7 +468,7 @@ static void tdAppendKvRowToDataCol(SKVRow row, STSchema *pSchema, SDataCols *pCo ...@@ -495,7 +468,7 @@ static void tdAppendKvRowToDataCol(SKVRow row, STSchema *pSchema, SDataCols *pCo
while (dcol < pCols->numOfCols) { while (dcol < pCols->numOfCols) {
SDataCol *pDataCol = &(pCols->cols[dcol]); SDataCol *pDataCol = &(pCols->cols[dcol]);
if (rcol >= nRowCols || rcol >= schemaNCols(pSchema)) { if (rcol >= nRowCols || rcol >= schemaNCols(pSchema)) {
dataColAppendVal(pDataCol, tdGetNullVal(pDataCol->type), pCols->numOfRows, pCols->maxPoints); dataColSetNullAt(pDataCol, pCols->numOfRows);
++dcol; ++dcol;
continue; continue;
} }
...@@ -510,7 +483,7 @@ static void tdAppendKvRowToDataCol(SKVRow row, STSchema *pSchema, SDataCols *pCo ...@@ -510,7 +483,7 @@ static void tdAppendKvRowToDataCol(SKVRow row, STSchema *pSchema, SDataCols *pCo
} else if (colIdx->colId < pDataCol->colId) { } else if (colIdx->colId < pDataCol->colId) {
++rcol; ++rcol;
} else { } else {
dataColAppendVal(pDataCol, tdGetNullVal(pDataCol->type), pCols->numOfRows, pCols->maxPoints); dataColSetNullAt(pDataCol, pCols->numOfRows);
++dcol; ++dcol;
} }
} }
...@@ -520,9 +493,9 @@ static void tdAppendKvRowToDataCol(SKVRow row, STSchema *pSchema, SDataCols *pCo ...@@ -520,9 +493,9 @@ static void tdAppendKvRowToDataCol(SKVRow row, STSchema *pSchema, SDataCols *pCo
void tdAppendMemRowToDataCol(SMemRow row, STSchema *pSchema, SDataCols *pCols) { void tdAppendMemRowToDataCol(SMemRow row, STSchema *pSchema, SDataCols *pCols) {
if (isDataRow(row)) { if (isDataRow(row)) {
tdAppendDataRowToDataCol(memRowBody(row), pSchema, pCols); tdAppendDataRowToDataCol(memRowDataBody(row), pSchema, pCols);
} else if (isKvRow(row)) { } else if (isKvRow(row)) {
tdAppendKvRowToDataCol(memRowBody(row), pSchema, pCols); tdAppendKvRowToDataCol(memRowKvBody(row), pSchema, pCols);
} else { } else {
ASSERT(0); ASSERT(0);
} }
......
...@@ -476,7 +476,7 @@ static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) { ...@@ -476,7 +476,7 @@ static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) {
cDebug("vgId:%d, id:%d CQ:%s stream result is ready", pContext->vgId, pObj->tid, pObj->sqlStr); cDebug("vgId:%d, id:%d CQ:%s stream result is ready", pContext->vgId, pObj->tid, pObj->sqlStr);
int32_t size = sizeof(SWalHead) + sizeof(SSubmitMsg) + sizeof(SSubmitBlk) + TD_MEM_ROW_HEAD_SIZE + pObj->rowSize; int32_t size = sizeof(SWalHead) + sizeof(SSubmitMsg) + sizeof(SSubmitBlk) + TD_MEM_ROW_DATA_HEAD_SIZE + pObj->rowSize;
char *buffer = calloc(size, 1); char *buffer = calloc(size, 1);
SWalHead *pHead = (SWalHead *)buffer; SWalHead *pHead = (SWalHead *)buffer;
...@@ -484,7 +484,7 @@ static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) { ...@@ -484,7 +484,7 @@ static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) {
SSubmitBlk *pBlk = (SSubmitBlk *) (buffer + sizeof(SWalHead) + sizeof(SSubmitMsg)); SSubmitBlk *pBlk = (SSubmitBlk *) (buffer + sizeof(SWalHead) + sizeof(SSubmitMsg));
SMemRow trow = (SMemRow)pBlk->data; SMemRow trow = (SMemRow)pBlk->data;
SDataRow dataRow = (SDataRow)memRowBody(trow); SDataRow dataRow = (SDataRow)memRowDataBody(trow);
memRowSetType(trow, SMEM_ROW_DATA); memRowSetType(trow, SMEM_ROW_DATA);
tdInitDataRow(dataRow, pSchema); tdInitDataRow(dataRow, pSchema);
...@@ -504,7 +504,7 @@ static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) { ...@@ -504,7 +504,7 @@ static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) {
} }
tdAppendColVal(dataRow, val, c->type, c->bytes, c->offset); tdAppendColVal(dataRow, val, c->type, c->bytes, c->offset);
} }
pBlk->dataLen = htonl(memRowTLen(trow)); pBlk->dataLen = htonl(memRowDataTLen(trow));
pBlk->schemaLen = 0; pBlk->schemaLen = 0;
pBlk->uid = htobe64(pObj->uid); pBlk->uid = htobe64(pObj->uid);
...@@ -513,7 +513,7 @@ static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) { ...@@ -513,7 +513,7 @@ static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) {
pBlk->sversion = htonl(pSchema->version); pBlk->sversion = htonl(pSchema->version);
pBlk->padding = 0; pBlk->padding = 0;
pHead->len = sizeof(SSubmitMsg) + sizeof(SSubmitBlk) + memRowTLen(trow); pHead->len = sizeof(SSubmitMsg) + sizeof(SSubmitBlk) + memRowDataTLen(trow);
pMsg->header.vgId = htonl(pContext->vgId); pMsg->header.vgId = htonl(pContext->vgId);
pMsg->header.contLen = htonl(pHead->len); pMsg->header.contLen = htonl(pHead->len);
......
...@@ -321,8 +321,8 @@ do { \ ...@@ -321,8 +321,8 @@ do { \
#define TSDB_MAX_JOIN_TABLE_NUM 10 #define TSDB_MAX_JOIN_TABLE_NUM 10
#define TSDB_MAX_UNION_CLAUSE 5 #define TSDB_MAX_UNION_CLAUSE 5
#define TSDB_MAX_BINARY_LEN (16384-TSDB_KEYSIZE) // TD-4666: TODO use TSDB_MAX_BYTES_PER_ROW later. #define TSDB_MAX_BINARY_LEN (16384-TSDB_KEYSIZE) // keep 16384
#define TSDB_MAX_NCHAR_LEN (16384-TSDB_KEYSIZE) // TD-4666: TODO use TSDB_MAX_BYTES_PER_ROW later. #define TSDB_MAX_NCHAR_LEN (16384-TSDB_KEYSIZE) // keep 16384
#define PRIMARYKEY_TIMESTAMP_COL_INDEX 0 #define PRIMARYKEY_TIMESTAMP_COL_INDEX 0
#define TSDB_MAX_RPC_THREADS 5 #define TSDB_MAX_RPC_THREADS 5
......
...@@ -12,6 +12,7 @@ extern "C" { ...@@ -12,6 +12,7 @@ extern "C" {
// ----------------- For variable data types such as TSDB_DATA_TYPE_BINARY and TSDB_DATA_TYPE_NCHAR // ----------------- For variable data types such as TSDB_DATA_TYPE_BINARY and TSDB_DATA_TYPE_NCHAR
typedef int32_t VarDataOffsetT; typedef int32_t VarDataOffsetT;
typedef int16_t VarDataLenT; // maxVarDataLen: 32767 typedef int16_t VarDataLenT; // maxVarDataLen: 32767
typedef uint16_t TDRowLenT;
typedef struct tstr { typedef struct tstr {
VarDataLenT len; VarDataLenT len;
......
...@@ -920,21 +920,21 @@ int tsdbWriteBlockImpl(STsdbRepo *pRepo, STable *pTable, SDFile *pDFile, SDataCo ...@@ -920,21 +920,21 @@ int tsdbWriteBlockImpl(STsdbRepo *pRepo, STable *pTable, SDFile *pDFile, SDataCo
SDataCol * pDataCol = pDataCols->cols + ncol; SDataCol * pDataCol = pDataCols->cols + ncol;
SBlockCol *pBlockCol = pBlockData->cols + nColsNotAllNull; SBlockCol *pBlockCol = pBlockData->cols + nColsNotAllNull;
if (isAllRowOfColNull(pDataCol)) { // all data to commit are NULL, just ignore it if (isNEleNull(pDataCol, rowsToWrite)) { // all data to commit are NULL, just ignore it
continue; continue;
} }
memset(pBlockCol, 0, sizeof(*pBlockCol)); memset(pBlockCol, 0, sizeof(*pBlockCol));
pBlockCol->colId = pDataCol->colId; pBlockCol->colId = pDataCol->colId;
pBlockCol->type = pDataCol->type; pBlockCol->type = pDataCol->type;
if (tDataTypes[pDataCol->type].statisFunc) { if (tDataTypes[pDataCol->type].statisFunc) {
(*tDataTypes[pDataCol->type].statisFunc)(pDataCol->pData, rowsToWrite, &(pBlockCol->min), &(pBlockCol->max), (*tDataTypes[pDataCol->type].statisFunc)(pDataCol->pData, rowsToWrite, &(pBlockCol->min), &(pBlockCol->max),
&(pBlockCol->sum), &(pBlockCol->minIndex), &(pBlockCol->maxIndex), &(pBlockCol->sum), &(pBlockCol->minIndex), &(pBlockCol->maxIndex),
&(pBlockCol->numOfNull)); &(pBlockCol->numOfNull));
}
nColsNotAllNull++;
} }
nColsNotAllNull++;
}
ASSERT(nColsNotAllNull >= 0 && nColsNotAllNull <= pDataCols->numOfCols); ASSERT(nColsNotAllNull >= 0 && nColsNotAllNull <= pDataCols->numOfCols);
......
...@@ -663,7 +663,7 @@ static int tsdbRestoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pRea ...@@ -663,7 +663,7 @@ static int tsdbRestoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pRea
} }
memRowSetType(row, SMEM_ROW_DATA); memRowSetType(row, SMEM_ROW_DATA);
tdInitDataRow(memRowBody(row), pSchema); tdInitDataRow(memRowDataBody(row), pSchema);
// first load block index info // first load block index info
if (tsdbLoadBlockInfo(pReadh, NULL) < 0) { if (tsdbLoadBlockInfo(pReadh, NULL) < 0) {
...@@ -720,9 +720,9 @@ static int tsdbRestoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pRea ...@@ -720,9 +720,9 @@ static int tsdbRestoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pRea
// OK,let's load row from backward to get not-null column // OK,let's load row from backward to get not-null column
for (int32_t rowId = pBlock->numOfRows - 1; rowId >= 0; rowId--) { for (int32_t rowId = pBlock->numOfRows - 1; rowId >= 0; rowId--) {
SDataCol *pDataCol = pReadh->pDCols[0]->cols + i; SDataCol *pDataCol = pReadh->pDCols[0]->cols + i;
tdAppendColVal(memRowBody(row), tdGetColDataOfRow(pDataCol, rowId), pCol->type, pCol->bytes, pCol->offset); tdAppendColVal(memRowDataBody(row), tdGetColDataOfRow(pDataCol, rowId), pCol->type, pCol->bytes, pCol->offset);
//SDataCol *pDataCol = readh.pDCols[0]->cols + j; //SDataCol *pDataCol = readh.pDCols[0]->cols + j;
void *value = tdGetRowDataOfCol(memRowBody(row), (int8_t)pCol->type, TD_DATA_ROW_HEAD_SIZE + pCol->offset); void *value = tdGetRowDataOfCol(memRowDataBody(row), (int8_t)pCol->type, TD_DATA_ROW_HEAD_SIZE + pCol->offset);
if (isNull(value, pCol->type)) { if (isNull(value, pCol->type)) {
continue; continue;
} }
...@@ -742,7 +742,7 @@ static int tsdbRestoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pRea ...@@ -742,7 +742,7 @@ static int tsdbRestoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pRea
// save row ts(in column 0) // save row ts(in column 0)
pDataCol = pReadh->pDCols[0]->cols + 0; pDataCol = pReadh->pDCols[0]->cols + 0;
pCol = schemaColAt(pSchema, 0); pCol = schemaColAt(pSchema, 0);
tdAppendColVal(memRowBody(row), tdGetColDataOfRow(pDataCol, rowId), pCol->type, pCol->bytes, pCol->offset); tdAppendColVal(memRowDataBody(row), tdGetColDataOfRow(pDataCol, rowId), pCol->type, pCol->bytes, pCol->offset);
pLastCol->ts = memRowKey(row); pLastCol->ts = memRowKey(row);
pTable->restoreColumnNum += 1; pTable->restoreColumnNum += 1;
...@@ -785,11 +785,11 @@ static int tsdbRestoreLastRow(STsdbRepo *pRepo, STable *pTable, SReadH* pReadh, ...@@ -785,11 +785,11 @@ static int tsdbRestoreLastRow(STsdbRepo *pRepo, STable *pTable, SReadH* pReadh,
return -1; return -1;
} }
memRowSetType(pTable->lastRow, SMEM_ROW_DATA); memRowSetType(pTable->lastRow, SMEM_ROW_DATA);
tdInitDataRow(memRowBody(pTable->lastRow), pSchema); tdInitDataRow(memRowDataBody(pTable->lastRow), pSchema);
for (int icol = 0; icol < schemaNCols(pSchema); icol++) { for (int icol = 0; icol < schemaNCols(pSchema); icol++) {
STColumn *pCol = schemaColAt(pSchema, icol); STColumn *pCol = schemaColAt(pSchema, icol);
SDataCol *pDataCol = pReadh->pDCols[0]->cols + icol; SDataCol *pDataCol = pReadh->pDCols[0]->cols + icol;
tdAppendColVal(memRowBody(pTable->lastRow), tdGetColDataOfRow(pDataCol, pBlock->numOfRows - 1), pCol->type, tdAppendColVal(memRowDataBody(pTable->lastRow), tdGetColDataOfRow(pDataCol, pBlock->numOfRows - 1), pCol->type,
pCol->bytes, pCol->offset); pCol->bytes, pCol->offset);
} }
......
...@@ -932,13 +932,15 @@ static int tsdbInsertDataToTableImpl(STsdbRepo *pRepo, STable *pTable, void **ro ...@@ -932,13 +932,15 @@ static int tsdbInsertDataToTableImpl(STsdbRepo *pRepo, STable *pTable, void **ro
int64_t osize = SL_SIZE(pTableData->pData); int64_t osize = SL_SIZE(pTableData->pData);
tSkipListPutBatch(pTableData->pData, rows, rowCounter); tSkipListPutBatch(pTableData->pData, rows, rowCounter);
int64_t dsize = SL_SIZE(pTableData->pData) - osize; int64_t dsize = SL_SIZE(pTableData->pData) - osize;
TSKEY keyFirstRow = memRowKey(rows[0]);
TSKEY keyLastRow = memRowKey(rows[rowCounter - 1]);
if (pMemTable->keyFirst > memRowKey(rows[0])) pMemTable->keyFirst = memRowKey(rows[0]); if (pMemTable->keyFirst > keyFirstRow) pMemTable->keyFirst = keyFirstRow;
if (pMemTable->keyLast < memRowKey(rows[rowCounter - 1])) pMemTable->keyLast = memRowKey(rows[rowCounter - 1]); if (pMemTable->keyLast < keyLastRow) pMemTable->keyLast = keyLastRow;
pMemTable->numOfRows += dsize; pMemTable->numOfRows += dsize;
if (pTableData->keyFirst > memRowKey(rows[0])) pTableData->keyFirst = memRowKey(rows[0]); if (pTableData->keyFirst > keyFirstRow) pTableData->keyFirst = keyFirstRow;
if (pTableData->keyLast < memRowKey(rows[rowCounter - 1])) pTableData->keyLast = memRowKey(rows[rowCounter - 1]); if (pTableData->keyLast < keyLastRow) pTableData->keyLast = keyLastRow;
pTableData->numOfRows += dsize; pTableData->numOfRows += dsize;
// update table latest info // update table latest info
...@@ -1004,8 +1006,7 @@ static void updateTableLatestColumn(STsdbRepo *pRepo, STable *pTable, SMemRow ro ...@@ -1004,8 +1006,7 @@ static void updateTableLatestColumn(STsdbRepo *pRepo, STable *pTable, SMemRow ro
SDataCol *pLatestCols = pTable->lastCols; SDataCol *pLatestCols = pTable->lastCols;
bool isDataRow = isDataRow(row); bool isDataRow = isDataRow(row);
void *rowBody = memRowBody(row);
for (int16_t j = 0; j < schemaNCols(pSchema); j++) { for (int16_t j = 0; j < schemaNCols(pSchema); j++) {
STColumn *pTCol = schemaColAt(pSchema, j); STColumn *pTCol = schemaColAt(pSchema, j);
// ignore not exist colId // ignore not exist colId
...@@ -1017,12 +1018,13 @@ static void updateTableLatestColumn(STsdbRepo *pRepo, STable *pTable, SMemRow ro ...@@ -1017,12 +1018,13 @@ static void updateTableLatestColumn(STsdbRepo *pRepo, STable *pTable, SMemRow ro
void *value = NULL; void *value = NULL;
if (isDataRow) { if (isDataRow) {
value = tdGetRowDataOfCol(rowBody, (int8_t)pTCol->type, TD_DATA_ROW_HEAD_SIZE + pSchema->columns[j].offset); value = tdGetRowDataOfCol(memRowDataBody(row), (int8_t)pTCol->type,
TD_DATA_ROW_HEAD_SIZE + pSchema->columns[j].offset);
} else { } else {
// SKVRow // SKVRow
SColIdx *pColIdx = tdGetKVRowIdxOfCol(rowBody, pTCol->colId); SColIdx *pColIdx = tdGetKVRowIdxOfCol(memRowKvBody(row), pTCol->colId);
if (pColIdx) { if (pColIdx) {
value = tdGetKvRowDataOfCol(rowBody, pColIdx->offset); value = tdGetKvRowDataOfCol(memRowKvBody(row), pColIdx->offset);
} }
} }
......
...@@ -1392,7 +1392,7 @@ int32_t doCopyRowsFromFileBlock(STsdbQueryHandle* pQueryHandle, int32_t capacity ...@@ -1392,7 +1392,7 @@ int32_t doCopyRowsFromFileBlock(STsdbQueryHandle* pQueryHandle, int32_t capacity
// todo refactor, only copy one-by-one // todo refactor, only copy one-by-one
for (int32_t k = start; k < num + start; ++k) { for (int32_t k = start; k < num + start; ++k) {
const char* p = tdGetColDataOfRow(src, k); char* p = tdGetColDataOfRow(src, k);
memcpy(dst, p, varDataTLen(p)); memcpy(dst, p, varDataTLen(p));
dst += bytes; dst += bytes;
} }
...@@ -1459,7 +1459,7 @@ static void copyOneRowFromMem(STsdbQueryHandle* pQueryHandle, int32_t capacity, ...@@ -1459,7 +1459,7 @@ static void copyOneRowFromMem(STsdbQueryHandle* pQueryHandle, int32_t capacity,
int32_t i = 0; int32_t i = 0;
if (isDataRow(row)) { if (isDataRow(row)) {
SDataRow dataRow = memRowBody(row); SDataRow dataRow = memRowDataBody(row);
int32_t j = 0; int32_t j = 0;
while (i < numOfCols && j < numOfRowCols) { while (i < numOfCols && j < numOfRowCols) {
SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i); SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i);
...@@ -1529,7 +1529,7 @@ static void copyOneRowFromMem(STsdbQueryHandle* pQueryHandle, int32_t capacity, ...@@ -1529,7 +1529,7 @@ static void copyOneRowFromMem(STsdbQueryHandle* pQueryHandle, int32_t capacity,
} }
} }
} else if (isKvRow(row)) { } else if (isKvRow(row)) {
SKVRow kvRow = memRowBody(row); SKVRow kvRow = memRowKvBody(row);
int32_t k = 0; int32_t k = 0;
int32_t nKvRowCols = kvRowNCols(kvRow); int32_t nKvRowCols = kvRowNCols(kvRow);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册