未验证 提交 ba621647 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #18396 from taosdata/enh/row_optimize2

enh: row optimize2
...@@ -27,17 +27,17 @@ ...@@ -27,17 +27,17 @@
extern "C" { extern "C" {
#endif #endif
typedef struct SBuffer SBuffer; typedef struct SBuffer SBuffer;
typedef struct SSchema SSchema; typedef struct SSchema SSchema;
typedef struct STColumn STColumn; typedef struct STColumn STColumn;
typedef struct STSchema STSchema; typedef struct STSchema STSchema;
typedef struct SValue SValue; typedef struct SValue SValue;
typedef struct SColVal SColVal; typedef struct SColVal SColVal;
typedef struct STSRow2 STSRow2; typedef struct SRow SRow;
typedef struct STSRowBuilder STSRowBuilder; typedef struct SRowIter SRowIter;
typedef struct STagVal STagVal; typedef struct STagVal STagVal;
typedef struct STag STag; typedef struct STag STag;
typedef struct SColData SColData; typedef struct SColData SColData;
#define HAS_NONE ((uint8_t)0x1) #define HAS_NONE ((uint8_t)0x1)
#define HAS_NULL ((uint8_t)0x2) #define HAS_NULL ((uint8_t)0x2)
...@@ -68,13 +68,10 @@ struct SBuffer { ...@@ -68,13 +68,10 @@ struct SBuffer {
void tBufferDestroy(SBuffer *pBuffer); void tBufferDestroy(SBuffer *pBuffer);
int32_t tBufferInit(SBuffer *pBuffer, int64_t size); int32_t tBufferInit(SBuffer *pBuffer, int64_t size);
int32_t tBufferPut(SBuffer *pBuffer, const void *pData, int64_t nData); int32_t tBufferPut(SBuffer *pBuffer, const void *pData, int64_t nData);
int32_t tBufferReserve(SBuffer *pBuffer, int64_t nData, void **ppData);
// STSchema ================================ // STSchema ================================
int32_t tTSchemaCreate(int32_t sver, SSchema *pSchema, int32_t nCols, STSchema **ppTSchema); void tDestroyTSchema(STSchema *pTSchema);
void tTSchemaDestroy(STSchema *pTSchema);
// SValue ================================
static FORCE_INLINE int32_t tGetValue(uint8_t *p, SValue *pValue, int8_t type);
// SColVal ================================ // SColVal ================================
#define CV_FLAG_VALUE ((int8_t)0x0) #define CV_FLAG_VALUE ((int8_t)0x0)
...@@ -89,26 +86,14 @@ static FORCE_INLINE int32_t tGetValue(uint8_t *p, SValue *pValue, int8_t type); ...@@ -89,26 +86,14 @@ static FORCE_INLINE int32_t tGetValue(uint8_t *p, SValue *pValue, int8_t type);
#define COL_VAL_IS_NULL(CV) ((CV)->flag == CV_FLAG_NULL) #define COL_VAL_IS_NULL(CV) ((CV)->flag == CV_FLAG_NULL)
#define COL_VAL_IS_VALUE(CV) ((CV)->flag == CV_FLAG_VALUE) #define COL_VAL_IS_VALUE(CV) ((CV)->flag == CV_FLAG_VALUE)
// STSRow2 ================================ // SRow ================================
#define TSROW_LEN(PROW, V) tGetI32v((uint8_t *)(PROW)->data, (V) ? &(V) : NULL) int32_t tRowBuild(SArray *aColVal, STSchema *pTSchema, SBuffer *pBuffer);
#define TSROW_SVER(PROW, V) tGetI32v((PROW)->data + TSROW_LEN(PROW, NULL), (V) ? &(V) : NULL) void tRowGet(SRow *pRow, STSchema *pTSchema, int32_t iCol, SColVal *pColVal);
int32_t tTSRowNew(STSRowBuilder *pBuilder, SArray *pArray, STSchema *pTSchema, STSRow2 **ppRow); // SRowIter ================================
int32_t tTSRowClone(const STSRow2 *pRow, STSRow2 **ppRow); int32_t tRowIterOpen(SRow *pRow, STSchema *pTSchema, SRowIter **ppIter);
void tTSRowFree(STSRow2 *pRow); void tRowIterClose(SRowIter **ppIter);
void tTSRowGet(STSRow2 *pRow, STSchema *pTSchema, int32_t iCol, SColVal *pColVal); SColVal *tRowIterNext(SRowIter *pIter);
int32_t tTSRowToArray(STSRow2 *pRow, STSchema *pTSchema, SArray **ppArray);
int32_t tPutTSRow(uint8_t *p, STSRow2 *pRow);
int32_t tGetTSRow(uint8_t *p, STSRow2 **ppRow);
// STSRowBuilder ================================
#define tsRowBuilderInit() ((STSRowBuilder){0})
#define tsRowBuilderClear(B) \
do { \
if ((B)->pBuf) { \
taosMemoryFree((B)->pBuf); \
} \
} while (0)
// STag ================================ // STag ================================
int32_t tTagNew(SArray *pArray, int32_t version, int8_t isJson, STag **ppTag); int32_t tTagNew(SArray *pArray, int32_t version, int8_t isJson, STag **ppTag);
...@@ -147,29 +132,17 @@ struct STSchema { ...@@ -147,29 +132,17 @@ struct STSchema {
int32_t numOfCols; int32_t numOfCols;
int32_t version; int32_t version;
int32_t flen; int32_t flen;
int32_t vlen;
int32_t tlen; int32_t tlen;
STColumn columns[]; STColumn columns[];
}; };
#define TSROW_HAS_NONE ((uint8_t)0x1) struct SRow {
#define TSROW_HAS_NULL ((uint8_t)0x2U) uint8_t flag;
#define TSROW_HAS_VAL ((uint8_t)0x4U) uint8_t rsv;
#define TSROW_KV_SMALL ((uint8_t)0x10U) uint16_t sver;
#define TSROW_KV_MID ((uint8_t)0x20U) uint32_t len;
#define TSROW_KV_BIG ((uint8_t)0x40U) TSKEY ts;
#pragma pack(push, 1) uint8_t data[];
struct STSRow2 {
TSKEY ts;
uint8_t flags;
uint8_t data[];
};
#pragma pack(pop)
struct STSRowBuilder {
// STSRow2 tsRow;
int32_t szBuf;
uint8_t *pBuf;
}; };
struct SValue { struct SValue {
...@@ -258,37 +231,17 @@ typedef struct { ...@@ -258,37 +231,17 @@ typedef struct {
int32_t nCols; int32_t nCols;
schema_ver_t version; schema_ver_t version;
uint16_t flen; uint16_t flen;
int32_t vlen;
int32_t tlen; int32_t tlen;
STColumn *columns; STColumn *columns;
} STSchemaBuilder; } STSchemaBuilder;
// use 2 bits for bitmap(default: STSRow/sub block)
#define TD_VTYPE_BITS 2
#define TD_VTYPE_PARTS 4 // PARTITIONS: 1 byte / 2 bits
#define TD_VTYPE_OPTR 3 // OPERATOR: 4 - 1, utilize to get remainder
#define TD_BITMAP_BYTES(cnt) (((cnt) + TD_VTYPE_OPTR) >> 2)
// use 1 bit for bitmap(super block)
#define TD_VTYPE_BITS_I 1
#define TD_VTYPE_PARTS_I 8 // PARTITIONS: 1 byte / 1 bit
#define TD_VTYPE_OPTR_I 7 // OPERATOR: 8 - 1, utilize to get remainder
#define TD_BITMAP_BYTES_I(cnt) (((cnt) + TD_VTYPE_OPTR_I) >> 3)
int32_t tdInitTSchemaBuilder(STSchemaBuilder *pBuilder, schema_ver_t version); int32_t tdInitTSchemaBuilder(STSchemaBuilder *pBuilder, schema_ver_t version);
void tdDestroyTSchemaBuilder(STSchemaBuilder *pBuilder); void tdDestroyTSchemaBuilder(STSchemaBuilder *pBuilder);
void tdResetTSchemaBuilder(STSchemaBuilder *pBuilder, schema_ver_t version); void tdResetTSchemaBuilder(STSchemaBuilder *pBuilder, schema_ver_t version);
int32_t tdAddColToSchema(STSchemaBuilder *pBuilder, int8_t type, int8_t flags, col_id_t colId, col_bytes_t bytes); int32_t tdAddColToSchema(STSchemaBuilder *pBuilder, int8_t type, int8_t flags, col_id_t colId, col_bytes_t bytes);
STSchema *tdGetSchemaFromBuilder(STSchemaBuilder *pBuilder); STSchema *tdGetSchemaFromBuilder(STSchemaBuilder *pBuilder);
static FORCE_INLINE int32_t tGetValue(uint8_t *p, SValue *pValue, int8_t type) { STSchema *tBuildTSchema(SSchema *aSchema, int32_t numOfCols, int32_t version);
if (IS_VAR_DATA_TYPE(type)) {
return tGetBinary(p, &pValue->pData, pValue ? &pValue->nData : NULL);
} else {
memcpy(&pValue->val, p, tDataTypes[type].bytes);
return tDataTypes[type].bytes;
}
}
#endif #endif
......
...@@ -55,6 +55,14 @@ typedef struct STSRow { ...@@ -55,6 +55,14 @@ typedef struct STSRow {
#define TD_ROW_TP 0x0U // default #define TD_ROW_TP 0x0U // default
#define TD_ROW_KV 0x01U #define TD_ROW_KV 0x01U
#define TD_VTYPE_PARTS 4 // PARTITIONS: 1 byte / 2 bits
#define TD_VTYPE_OPTR 3 // OPERATOR: 4 - 1, utilize to get remainder
#define TD_BITMAP_BYTES(cnt) (((cnt) + TD_VTYPE_OPTR) >> 2)
#define TD_VTYPE_PARTS_I 8 // PARTITIONS: 1 byte / 1 bit
#define TD_VTYPE_OPTR_I 7 // OPERATOR: 8 - 1, utilize to get remainder
#define TD_BITMAP_BYTES_I(cnt) (((cnt) + TD_VTYPE_OPTR_I) >> 3)
/** /**
* @brief value type * @brief value type
* - for data from client input and STSRow in memory, 3 types of value none/null/norm available * - for data from client input and STSRow in memory, 3 types of value none/null/norm available
...@@ -244,7 +252,7 @@ int32_t tdGetBitmapValTypeI(const void *pBitmap, int16_t colIdx, TDRowValT *pVal ...@@ -244,7 +252,7 @@ int32_t tdGetBitmapValTypeI(const void *pBitmap, int16_t colIdx, TDRowValT *pVal
*/ */
static FORCE_INLINE void *tdGetBitmapAddrTp(STSRow *pRow, uint32_t flen) { static FORCE_INLINE void *tdGetBitmapAddrTp(STSRow *pRow, uint32_t flen) {
// The primary TS key is stored separatedly. // The primary TS key is stored separatedly.
return POINTER_SHIFT(TD_ROW_DATA(pRow), flen - sizeof(TSKEY)); return POINTER_SHIFT(TD_ROW_DATA(pRow), flen);
// return POINTER_SHIFT(pRow->ts, flen); // return POINTER_SHIFT(pRow->ts, flen);
} }
......
...@@ -1275,6 +1275,7 @@ int taos_write_raw_block(TAOS* taos, int rows, char* pData, const char* tbname) ...@@ -1275,6 +1275,7 @@ int taos_write_raw_block(TAOS* taos, int rows, char* pData, const char* tbname)
nVar++; nVar++;
} }
} }
fLen -= sizeof(TSKEY);
int32_t extendedRowSize = rowSize + TD_ROW_HEAD_LEN - sizeof(TSKEY) + nVar * sizeof(VarDataOffsetT) + int32_t extendedRowSize = rowSize + TD_ROW_HEAD_LEN - sizeof(TSKEY) + nVar * sizeof(VarDataOffsetT) +
(int32_t)TD_BITMAP_BYTES(numOfCols - 1); (int32_t)TD_BITMAP_BYTES(numOfCols - 1);
...@@ -1333,7 +1334,9 @@ int taos_write_raw_block(TAOS* taos, int rows, char* pData, const char* tbname) ...@@ -1333,7 +1334,9 @@ int taos_write_raw_block(TAOS* taos, int rows, char* pData, const char* tbname)
} }
} }
offset += TYPE_BYTES[pColumn->type]; if (pColumn->colId != PRIMARYKEY_TIMESTAMP_COL_ID) {
offset += TYPE_BYTES[pColumn->type];
}
} }
tdSRowEnd(&rb); tdSRowEnd(&rb);
int32_t rowLen = TD_ROW_LEN(rowData); int32_t rowLen = TD_ROW_LEN(rowData);
...@@ -1503,6 +1506,7 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) { ...@@ -1503,6 +1506,7 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) {
nVar++; nVar++;
} }
} }
fLen -= sizeof(TSKEY);
int32_t rows = rspObj.resInfo.numOfRows; int32_t rows = rspObj.resInfo.numOfRows;
int32_t extendedRowSize = rowSize + TD_ROW_HEAD_LEN - sizeof(TSKEY) + nVar * sizeof(VarDataOffsetT) + int32_t extendedRowSize = rowSize + TD_ROW_HEAD_LEN - sizeof(TSKEY) + nVar * sizeof(VarDataOffsetT) +
...@@ -1585,8 +1589,9 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) { ...@@ -1585,8 +1589,9 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) {
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NORM, colData, true, offset, k); tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NORM, colData, true, offset, k);
} }
} }
if (pColumn->colId != PRIMARYKEY_TIMESTAMP_COL_ID) {
offset += TYPE_BYTES[pColumn->type]; offset += TYPE_BYTES[pColumn->type];
}
} }
tdSRowEnd(&rb); tdSRowEnd(&rb);
int32_t rowLen = TD_ROW_LEN(rowData); int32_t rowLen = TD_ROW_LEN(rowData);
...@@ -1803,6 +1808,7 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen) ...@@ -1803,6 +1808,7 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
nVar++; nVar++;
} }
} }
fLen -= sizeof(TSKEY);
int32_t rows = rspObj.resInfo.numOfRows; int32_t rows = rspObj.resInfo.numOfRows;
int32_t extendedRowSize = rowSize + TD_ROW_HEAD_LEN - sizeof(TSKEY) + nVar * sizeof(VarDataOffsetT) + int32_t extendedRowSize = rowSize + TD_ROW_HEAD_LEN - sizeof(TSKEY) + nVar * sizeof(VarDataOffsetT) +
...@@ -1888,8 +1894,9 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen) ...@@ -1888,8 +1894,9 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NORM, colData, true, offset, k); tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NORM, colData, true, offset, k);
} }
} }
if (pColumn->colId != PRIMARYKEY_TIMESTAMP_COL_ID) {
offset += TYPE_BYTES[pColumn->type]; offset += TYPE_BYTES[pColumn->type];
}
} }
tdSRowEnd(&rb); tdSRowEnd(&rb);
int32_t rowLen = TD_ROW_LEN(rowData); int32_t rowLen = TD_ROW_LEN(rowData);
......
...@@ -2052,6 +2052,7 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SSDataBlock* pDataB ...@@ -2052,6 +2052,7 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SSDataBlock* pDataB
isStartKey = true; isStartKey = true;
tdAppendColValToRow(&rb, PRIMARYKEY_TIMESTAMP_COL_ID, TSDB_DATA_TYPE_TIMESTAMP, TD_VTYPE_NORM, var, true, tdAppendColValToRow(&rb, PRIMARYKEY_TIMESTAMP_COL_ID, TSDB_DATA_TYPE_TIMESTAMP, TD_VTYPE_NORM, var, true,
offset, k); offset, k);
continue; // offset should keep 0 for next column
} else if (colDataIsNull_s(pColInfoData, j)) { } else if (colDataIsNull_s(pColInfoData, j)) {
tdAppendColValToRow(&rb, PRIMARYKEY_TIMESTAMP_COL_ID + k, TSDB_DATA_TYPE_TIMESTAMP, TD_VTYPE_NULL, NULL, tdAppendColValToRow(&rb, PRIMARYKEY_TIMESTAMP_COL_ID + k, TSDB_DATA_TYPE_TIMESTAMP, TD_VTYPE_NULL, NULL,
......
此差异已折叠。
...@@ -192,7 +192,7 @@ bool tdSTpRowGetVal(STSRow *pRow, col_id_t colId, col_type_t colType, int32_t fl ...@@ -192,7 +192,7 @@ bool tdSTpRowGetVal(STSRow *pRow, col_id_t colId, col_type_t colType, int32_t fl
return true; return true;
} }
void *pBitmap = tdGetBitmapAddrTp(pRow, flen); void *pBitmap = tdGetBitmapAddrTp(pRow, flen);
tdGetTpRowValOfCol(pVal, pRow, pBitmap, colType, offset - sizeof(TSKEY), colIdx); tdGetTpRowValOfCol(pVal, pRow, pBitmap, colType, offset, colIdx);
return true; return true;
} }
...@@ -217,7 +217,7 @@ bool tdSTSRowIterFetch(STSRowIter *pIter, col_id_t colId, col_type_t colType, SC ...@@ -217,7 +217,7 @@ bool tdSTSRowIterFetch(STSRowIter *pIter, col_id_t colId, col_type_t colType, SC
return false; return false;
} }
} }
tdSTSRowIterGetTpVal(pIter, pCol->type, pCol->offset - sizeof(TSKEY), pVal); tdSTSRowIterGetTpVal(pIter, pCol->type, pCol->offset, pVal);
++pIter->colIdx; ++pIter->colIdx;
} else if (TD_IS_KV_ROW(pIter->pRow)) { } else if (TD_IS_KV_ROW(pIter->pRow)) {
return tdSTSRowIterGetKvVal(pIter, colId, &pIter->kvIdx, pVal); return tdSTSRowIterGetKvVal(pIter, colId, &pIter->kvIdx, pVal);
...@@ -244,7 +244,7 @@ bool tdSTSRowIterNext(STSRowIter *pIter, SCellVal *pVal) { ...@@ -244,7 +244,7 @@ bool tdSTSRowIterNext(STSRowIter *pIter, SCellVal *pVal) {
} }
if (TD_IS_TP_ROW(pIter->pRow)) { if (TD_IS_TP_ROW(pIter->pRow)) {
tdSTSRowIterGetTpVal(pIter, pCol->type, pCol->offset - sizeof(TSKEY), pVal); tdSTSRowIterGetTpVal(pIter, pCol->type, pCol->offset, pVal);
} else if (TD_IS_KV_ROW(pIter->pRow)) { } else if (TD_IS_KV_ROW(pIter->pRow)) {
tdSTSRowIterGetKvVal(pIter, pCol->colId, &pIter->kvIdx, pVal); tdSTSRowIterGetKvVal(pIter, pCol->colId, &pIter->kvIdx, pVal);
} else { } else {
...@@ -469,7 +469,7 @@ bool tdSTSRowGetVal(STSRowIter *pIter, col_id_t colId, col_type_t colType, SCell ...@@ -469,7 +469,7 @@ bool tdSTSRowGetVal(STSRowIter *pIter, col_id_t colId, col_type_t colType, SCell
#ifdef TD_SUPPORT_BITMAP #ifdef TD_SUPPORT_BITMAP
colIdx = POINTER_DISTANCE(pCol, pSchema->columns) / sizeof(STColumn); colIdx = POINTER_DISTANCE(pCol, pSchema->columns) / sizeof(STColumn);
#endif #endif
tdGetTpRowValOfCol(pVal, pRow, pIter->pBitmap, pCol->type, pCol->offset - sizeof(TSKEY), colIdx - 1); tdGetTpRowValOfCol(pVal, pRow, pIter->pBitmap, pCol->type, pCol->offset, colIdx - 1);
} else if (TD_IS_KV_ROW(pRow)) { } else if (TD_IS_KV_ROW(pRow)) {
SKvRowIdx *pIdx = (SKvRowIdx *)taosbsearch(&colId, TD_ROW_COL_IDX(pRow), tdRowGetNCols(pRow), sizeof(SKvRowIdx), SKvRowIdx *pIdx = (SKvRowIdx *)taosbsearch(&colId, TD_ROW_COL_IDX(pRow), tdRowGetNCols(pRow), sizeof(SKvRowIdx),
compareKvRowColId, TD_EQ); compareKvRowColId, TD_EQ);
...@@ -757,11 +757,10 @@ int32_t tdAppendColValToKvRow(SRowBuilder *pBuilder, TDRowValT valType, const vo ...@@ -757,11 +757,10 @@ int32_t tdAppendColValToKvRow(SRowBuilder *pBuilder, TDRowValT valType, const vo
int32_t tdAppendColValToTpRow(SRowBuilder *pBuilder, TDRowValT valType, const void *val, bool isCopyVarData, int32_t tdAppendColValToTpRow(SRowBuilder *pBuilder, TDRowValT valType, const void *val, bool isCopyVarData,
int8_t colType, int16_t colIdx, int32_t offset) { int8_t colType, int16_t colIdx, int32_t offset) {
if ((offset < (int32_t)sizeof(TSKEY)) || (colIdx < 1)) { if (colIdx < 1) {
terrno = TSDB_CODE_INVALID_PARA; terrno = TSDB_CODE_INVALID_PARA;
return terrno; return terrno;
} }
offset -= sizeof(TSKEY);
--colIdx; --colIdx;
#ifdef TD_SUPPORT_BITMAP #ifdef TD_SUPPORT_BITMAP
...@@ -853,7 +852,7 @@ int32_t tdSRowResetBuf(SRowBuilder *pBuilder, void *pBuf) { ...@@ -853,7 +852,7 @@ int32_t tdSRowResetBuf(SRowBuilder *pBuilder, void *pBuf) {
memset(pBuilder->pBitmap, TD_VTYPE_NONE_BYTE_II, pBuilder->nBitmaps); memset(pBuilder->pBitmap, TD_VTYPE_NONE_BYTE_II, pBuilder->nBitmaps);
#endif #endif
// the primary TS key is stored separatedly // the primary TS key is stored separatedly
len = TD_ROW_HEAD_LEN + pBuilder->flen - sizeof(TSKEY) + pBuilder->nBitmaps; len = TD_ROW_HEAD_LEN + pBuilder->flen + pBuilder->nBitmaps;
TD_ROW_SET_LEN(pBuilder->pBuf, len); TD_ROW_SET_LEN(pBuilder->pBuf, len);
TD_ROW_SET_SVER(pBuilder->pBuf, pBuilder->sver); TD_ROW_SET_SVER(pBuilder->pBuf, pBuilder->sver);
break; break;
......
...@@ -61,7 +61,7 @@ tDataTypeDescriptor tDataTypes[TSDB_DATA_TYPE_MAX] = { ...@@ -61,7 +61,7 @@ tDataTypeDescriptor tDataTypes[TSDB_DATA_TYPE_MAX] = {
static float floatMin = -FLT_MAX, floatMax = FLT_MAX; static float floatMin = -FLT_MAX, floatMax = FLT_MAX;
static double doubleMin = -DBL_MAX, doubleMax = DBL_MAX; static double doubleMin = -DBL_MAX, doubleMax = DBL_MAX;
FORCE_INLINE void *getDataMin(int32_t type, void* value) { FORCE_INLINE void *getDataMin(int32_t type, void *value) {
switch (type) { switch (type) {
case TSDB_DATA_TYPE_FLOAT: case TSDB_DATA_TYPE_FLOAT:
*(float *)value = floatMin; *(float *)value = floatMin;
...@@ -77,7 +77,7 @@ FORCE_INLINE void *getDataMin(int32_t type, void* value) { ...@@ -77,7 +77,7 @@ FORCE_INLINE void *getDataMin(int32_t type, void* value) {
return value; return value;
} }
FORCE_INLINE void *getDataMax(int32_t type, void* value) { FORCE_INLINE void *getDataMax(int32_t type, void *value) {
switch (type) { switch (type) {
case TSDB_DATA_TYPE_FLOAT: case TSDB_DATA_TYPE_FLOAT:
*(float *)value = floatMax; *(float *)value = floatMax;
......
...@@ -56,7 +56,7 @@ typedef struct SDataFWriter SDataFWriter; ...@@ -56,7 +56,7 @@ typedef struct SDataFWriter SDataFWriter;
typedef struct SDataFReader SDataFReader; typedef struct SDataFReader SDataFReader;
typedef struct SDelFWriter SDelFWriter; typedef struct SDelFWriter SDelFWriter;
typedef struct SDelFReader SDelFReader; typedef struct SDelFReader SDelFReader;
typedef struct SRowIter SRowIter; typedef struct STSDBRowIter STSDBRowIter;
typedef struct STsdbFS STsdbFS; typedef struct STsdbFS STsdbFS;
typedef struct SRowMerger SRowMerger; typedef struct SRowMerger SRowMerger;
typedef struct STsdbReadSnap STsdbReadSnap; typedef struct STsdbReadSnap STsdbReadSnap;
...@@ -111,9 +111,9 @@ static FORCE_INLINE int64_t tsdbLogicToFileSize(int64_t lSize, int32_t szPage) { ...@@ -111,9 +111,9 @@ static FORCE_INLINE int64_t tsdbLogicToFileSize(int64_t lSize, int32_t szPage) {
void tsdbRowGetColVal(TSDBROW *pRow, STSchema *pTSchema, int32_t iCol, SColVal *pColVal); void tsdbRowGetColVal(TSDBROW *pRow, STSchema *pTSchema, int32_t iCol, SColVal *pColVal);
// int32_t tPutTSDBRow(uint8_t *p, TSDBROW *pRow); // int32_t tPutTSDBRow(uint8_t *p, TSDBROW *pRow);
int32_t tsdbRowCmprFn(const void *p1, const void *p2); int32_t tsdbRowCmprFn(const void *p1, const void *p2);
// SRowIter // STSDBRowIter
void tRowIterInit(SRowIter *pIter, TSDBROW *pRow, STSchema *pTSchema); void tsdbRowIterInit(STSDBRowIter *pIter, TSDBROW *pRow, STSchema *pTSchema);
SColVal *tRowIterNext(SRowIter *pIter); SColVal *tsdbRowIterNext(STSDBRowIter *pIter);
// SRowMerger // SRowMerger
int32_t tRowMergerInit2(SRowMerger *pMerger, STSchema *pResTSchema, TSDBROW *pRow, STSchema *pTSchema); int32_t tRowMergerInit2(SRowMerger *pMerger, STSchema *pResTSchema, TSDBROW *pRow, STSchema *pTSchema);
int32_t tRowMergerAdd(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema); int32_t tRowMergerAdd(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema);
...@@ -562,7 +562,7 @@ struct SDFileSet { ...@@ -562,7 +562,7 @@ struct SDFileSet {
SSttFile *aSttF[TSDB_MAX_STT_TRIGGER]; SSttFile *aSttF[TSDB_MAX_STT_TRIGGER];
}; };
struct SRowIter { struct STSDBRowIter {
TSDBROW *pRow; TSDBROW *pRow;
STSchema *pTSchema; STSchema *pTSchema;
SColVal colVal; SColVal colVal;
......
...@@ -341,7 +341,7 @@ int32_t tsdbUpdateTableSchema(SMeta *pMeta, int64_t suid, int64_t uid, SSkmInfo ...@@ -341,7 +341,7 @@ int32_t tsdbUpdateTableSchema(SMeta *pMeta, int64_t suid, int64_t uid, SSkmInfo
pSkmInfo->suid = suid; pSkmInfo->suid = suid;
pSkmInfo->uid = uid; pSkmInfo->uid = uid;
tTSchemaDestroy(pSkmInfo->pTSchema); tDestroyTSchema(pSkmInfo->pTSchema);
code = metaGetTbTSchemaEx(pMeta, suid, uid, -1, &pSkmInfo->pTSchema); code = metaGetTbTSchemaEx(pMeta, suid, uid, -1, &pSkmInfo->pTSchema);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
...@@ -365,7 +365,7 @@ static int32_t tsdbCommitterUpdateRowSchema(SCommitter *pCommitter, int64_t suid ...@@ -365,7 +365,7 @@ static int32_t tsdbCommitterUpdateRowSchema(SCommitter *pCommitter, int64_t suid
pCommitter->skmRow.suid = suid; pCommitter->skmRow.suid = suid;
pCommitter->skmRow.uid = uid; pCommitter->skmRow.uid = uid;
tTSchemaDestroy(pCommitter->skmRow.pTSchema); tDestroyTSchema(pCommitter->skmRow.pTSchema);
code = metaGetTbTSchemaEx(pCommitter->pTsdb->pVnode->pMeta, suid, uid, sver, &pCommitter->skmRow.pTSchema); code = metaGetTbTSchemaEx(pCommitter->pTsdb->pVnode->pMeta, suid, uid, sver, &pCommitter->skmRow.pTSchema);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
...@@ -498,7 +498,7 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) { ...@@ -498,7 +498,7 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) {
#if 0 #if 0
ASSERT(pCommitter->minKey <= pCommitter->nextKey && pCommitter->maxKey >= pCommitter->nextKey); ASSERT(pCommitter->minKey <= pCommitter->nextKey && pCommitter->maxKey >= pCommitter->nextKey);
#endif #endif
pCommitter->nextKey = TSKEY_MAX; pCommitter->nextKey = TSKEY_MAX;
// Reader // Reader
...@@ -623,7 +623,8 @@ int32_t tsdbWriteDataBlock(SDataFWriter *pWriter, SBlockData *pBlockData, SMapDa ...@@ -623,7 +623,8 @@ int32_t tsdbWriteDataBlock(SDataFWriter *pWriter, SBlockData *pBlockData, SMapDa
_exit: _exit:
if (code) { if (code) {
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code)); tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino,
tstrerror(code));
} }
return code; return code;
} }
...@@ -666,7 +667,8 @@ int32_t tsdbWriteSttBlock(SDataFWriter *pWriter, SBlockData *pBlockData, SArray ...@@ -666,7 +667,8 @@ int32_t tsdbWriteSttBlock(SDataFWriter *pWriter, SBlockData *pBlockData, SArray
_exit: _exit:
if (code) { if (code) {
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code)); tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino,
tstrerror(code));
} }
return code; return code;
} }
...@@ -706,7 +708,8 @@ static int32_t tsdbCommitSttBlk(SDataFWriter *pWriter, SDiskDataBuilder *pBuilde ...@@ -706,7 +708,8 @@ static int32_t tsdbCommitSttBlk(SDataFWriter *pWriter, SDiskDataBuilder *pBuilde
_exit: _exit:
if (code) { if (code) {
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code)); tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino,
tstrerror(code));
} }
return code; return code;
} }
...@@ -919,8 +922,8 @@ static void tsdbCommitDataEnd(SCommitter *pCommitter) { ...@@ -919,8 +922,8 @@ static void tsdbCommitDataEnd(SCommitter *pCommitter) {
#else #else
tBlockDataDestroy(&pCommitter->dWriter.bDatal, 1); tBlockDataDestroy(&pCommitter->dWriter.bDatal, 1);
#endif #endif
tTSchemaDestroy(pCommitter->skmTable.pTSchema); tDestroyTSchema(pCommitter->skmTable.pTSchema);
tTSchemaDestroy(pCommitter->skmRow.pTSchema); tDestroyTSchema(pCommitter->skmRow.pTSchema);
} }
static int32_t tsdbCommitData(SCommitter *pCommitter) { static int32_t tsdbCommitData(SCommitter *pCommitter) {
......
...@@ -595,21 +595,21 @@ int32_t tDiskDataAddRow(SDiskDataBuilder *pBuilder, TSDBROW *pRow, STSchema *pTS ...@@ -595,21 +595,21 @@ int32_t tDiskDataAddRow(SDiskDataBuilder *pBuilder, TSDBROW *pRow, STSchema *pTS
if (pBuilder->bi.minKey > kRow.ts) pBuilder->bi.minKey = kRow.ts; if (pBuilder->bi.minKey > kRow.ts) pBuilder->bi.minKey = kRow.ts;
if (pBuilder->bi.maxKey < kRow.ts) pBuilder->bi.maxKey = kRow.ts; if (pBuilder->bi.maxKey < kRow.ts) pBuilder->bi.maxKey = kRow.ts;
SRowIter iter = {0}; STSDBRowIter iter = {0};
tRowIterInit(&iter, pRow, pTSchema); tsdbRowIterInit(&iter, pRow, pTSchema);
SColVal *pColVal = tRowIterNext(&iter); SColVal *pColVal = tsdbRowIterNext(&iter);
for (int32_t iBuilder = 0; iBuilder < pBuilder->nBuilder; iBuilder++) { for (int32_t iBuilder = 0; iBuilder < pBuilder->nBuilder; iBuilder++) {
SDiskColBuilder *pDCBuilder = (SDiskColBuilder *)taosArrayGet(pBuilder->aBuilder, iBuilder); SDiskColBuilder *pDCBuilder = (SDiskColBuilder *)taosArrayGet(pBuilder->aBuilder, iBuilder);
while (pColVal && pColVal->cid < pDCBuilder->cid) { while (pColVal && pColVal->cid < pDCBuilder->cid) {
pColVal = tRowIterNext(&iter); pColVal = tsdbRowIterNext(&iter);
} }
if (pColVal && pColVal->cid == pDCBuilder->cid) { if (pColVal && pColVal->cid == pDCBuilder->cid) {
code = tDiskColAddVal(pDCBuilder, pColVal); code = tDiskColAddVal(pDCBuilder, pColVal);
if (code) return code; if (code) return code;
pColVal = tRowIterNext(&iter); pColVal = tsdbRowIterNext(&iter);
} else { } else {
code = tDiskColAddVal(pDCBuilder, &COL_VAL_NONE(pDCBuilder->cid, pDCBuilder->type)); code = tDiskColAddVal(pDCBuilder, &COL_VAL_NONE(pDCBuilder->cid, pDCBuilder->type));
if (code) return code; if (code) return code;
......
...@@ -555,7 +555,7 @@ int32_t tsdbSnapReaderClose(STsdbSnapReader** ppReader) { ...@@ -555,7 +555,7 @@ int32_t tsdbSnapReaderClose(STsdbSnapReader** ppReader) {
} }
tBlockDataDestroy(&pReader->bData, 1); tBlockDataDestroy(&pReader->bData, 1);
tTSchemaDestroy(pReader->skmTable.pTSchema); tDestroyTSchema(pReader->skmTable.pTSchema);
// del // del
if (pReader->pDelFReader) tsdbDelFReaderClose(&pReader->pDelFReader); if (pReader->pDelFReader) tsdbDelFReaderClose(&pReader->pDelFReader);
...@@ -1416,7 +1416,7 @@ int32_t tsdbSnapWriterClose(STsdbSnapWriter** ppWriter, int8_t rollback) { ...@@ -1416,7 +1416,7 @@ int32_t tsdbSnapWriterClose(STsdbSnapWriter** ppWriter, int8_t rollback) {
taosArrayDestroy(pWriter->dReader.aBlockIdx); taosArrayDestroy(pWriter->dReader.aBlockIdx);
tBlockDataDestroy(&pWriter->bData, 1); tBlockDataDestroy(&pWriter->bData, 1);
tTSchemaDestroy(pWriter->skmTable.pTSchema); tDestroyTSchema(pWriter->skmTable.pTSchema);
for (int32_t iBuf = 0; iBuf < sizeof(pWriter->aBuf) / sizeof(uint8_t*); iBuf++) { for (int32_t iBuf = 0; iBuf < sizeof(pWriter->aBuf) / sizeof(uint8_t*); iBuf++) {
tFree(pWriter->aBuf[iBuf]); tFree(pWriter->aBuf[iBuf]);
......
...@@ -579,8 +579,8 @@ int32_t tsdbRowCmprFn(const void *p1, const void *p2) { ...@@ -579,8 +579,8 @@ int32_t tsdbRowCmprFn(const void *p1, const void *p2) {
return tsdbKeyCmprFn(&TSDBROW_KEY((TSDBROW *)p1), &TSDBROW_KEY((TSDBROW *)p2)); return tsdbKeyCmprFn(&TSDBROW_KEY((TSDBROW *)p1), &TSDBROW_KEY((TSDBROW *)p2));
} }
// SRowIter ====================================================== // STSDBRowIter ======================================================
void tRowIterInit(SRowIter *pIter, TSDBROW *pRow, STSchema *pTSchema) { void tsdbRowIterInit(STSDBRowIter *pIter, TSDBROW *pRow, STSchema *pTSchema) {
pIter->pRow = pRow; pIter->pRow = pRow;
if (pRow->type == 0) { if (pRow->type == 0) {
ASSERT(pTSchema); ASSERT(pTSchema);
...@@ -594,7 +594,7 @@ void tRowIterInit(SRowIter *pIter, TSDBROW *pRow, STSchema *pTSchema) { ...@@ -594,7 +594,7 @@ void tRowIterInit(SRowIter *pIter, TSDBROW *pRow, STSchema *pTSchema) {
} }
} }
SColVal *tRowIterNext(SRowIter *pIter) { SColVal *tsdbRowIterNext(STSDBRowIter *pIter) {
if (pIter->pRow->type == 0) { if (pIter->pRow->type == 0) {
if (pIter->i < pIter->pTSchema->numOfCols) { if (pIter->i < pIter->pTSchema->numOfCols) {
tTSRowGetVal(pIter->pRow->pTSRow, pIter->pTSchema, pIter->i, &pIter->colVal); tTSRowGetVal(pIter->pRow->pTSRow, pIter->pTSchema, pIter->i, &pIter->colVal);
...@@ -1084,11 +1084,11 @@ static int32_t tBlockDataAppendTPRow(SBlockData *pBlockData, STSRow *pRow, STSch ...@@ -1084,11 +1084,11 @@ static int32_t tBlockDataAppendTPRow(SBlockData *pBlockData, STSRow *pRow, STSch
cv.flag = CV_FLAG_VALUE; cv.flag = CV_FLAG_VALUE;
if (IS_VAR_DATA_TYPE(pTColumn->type)) { if (IS_VAR_DATA_TYPE(pTColumn->type)) {
void *pData = (char *)pRow + *(int32_t *)(pRow->data + pTColumn->offset - sizeof(TSKEY)); void *pData = (char *)pRow + *(int32_t *)(pRow->data + pTColumn->offset);
cv.value.nData = varDataLen(pData); cv.value.nData = varDataLen(pData);
cv.value.pData = varDataVal(pData); cv.value.pData = varDataVal(pData);
} else { } else {
memcpy(&cv.value.val, pRow->data + pTColumn->offset - sizeof(TSKEY), pTColumn->bytes); memcpy(&cv.value.val, pRow->data + pTColumn->offset, pTColumn->bytes);
} }
code = tColDataAppendValue(pColData, &cv); code = tColDataAppendValue(pColData, &cv);
...@@ -1106,11 +1106,11 @@ static int32_t tBlockDataAppendTPRow(SBlockData *pBlockData, STSRow *pRow, STSch ...@@ -1106,11 +1106,11 @@ static int32_t tBlockDataAppendTPRow(SBlockData *pBlockData, STSRow *pRow, STSch
cv.flag = CV_FLAG_VALUE; cv.flag = CV_FLAG_VALUE;
if (IS_VAR_DATA_TYPE(pTColumn->type)) { if (IS_VAR_DATA_TYPE(pTColumn->type)) {
void *pData = (char *)pRow + *(int32_t *)(pRow->data + pTColumn->offset - sizeof(TSKEY)); void *pData = (char *)pRow + *(int32_t *)(pRow->data + pTColumn->offset);
cv.value.nData = varDataLen(pData); cv.value.nData = varDataLen(pData);
cv.value.pData = varDataVal(pData); cv.value.pData = varDataVal(pData);
} else { } else {
memcpy(&cv.value.val, pRow->data + pTColumn->offset - sizeof(TSKEY), pTColumn->bytes); memcpy(&cv.value.val, pRow->data + pTColumn->offset, pTColumn->bytes);
} }
code = tColDataAppendValue(pColData, &cv); code = tColDataAppendValue(pColData, &cv);
......
...@@ -139,8 +139,8 @@ void insSetBoundColumnInfo(SParsedDataColInfo* pColList, SSchema* pSchema, col_i ...@@ -139,8 +139,8 @@ void insSetBoundColumnInfo(SParsedDataColInfo* pColList, SSchema* pSchema, col_i
if (i > 0) { if (i > 0) {
pColList->cols[i].offset = pColList->cols[i - 1].offset + pSchema[i - 1].bytes; pColList->cols[i].offset = pColList->cols[i - 1].offset + pSchema[i - 1].bytes;
pColList->cols[i].toffset = pColList->flen; pColList->cols[i].toffset = pColList->flen;
pColList->flen += TYPE_BYTES[type];
} }
pColList->flen += TYPE_BYTES[type];
switch (type) { switch (type) {
case TSDB_DATA_TYPE_BINARY: case TSDB_DATA_TYPE_BINARY:
pColList->allNullLen += (VARSTR_HEADER_SIZE + CHAR_BYTES); pColList->allNullLen += (VARSTR_HEADER_SIZE + CHAR_BYTES);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册