提交 3d42cd54 编写于 作者: C Cary Xu

code optimization

上级 d71fece2
......@@ -49,9 +49,17 @@ extern "C" {
#define TD_VTYPE_NORM 0x02U // normal val: not none, not null
#define TD_VTYPE_MAX 0x03U //
#define TD_VTYPE_NORM_BYTE 0x0U
#define TD_VTYPE_NONE_BYTE 0x55U
#define TD_VTYPE_NULL_BYTE 0xAAU
#define TD_VTYPE_NONE_BYTE 0x0U
#define TD_VTYPE_NULL_BYTE 0x55U
#define TD_VTYPE_NORM_BYTE 0xAAU
#define TD_ROWS_ALL_NORM 0x01U
#define TD_ROWS_NULL_NORM 0x0U
#define TD_COL_ROWS_NORM(c) ((c)->bitmap == TD_ROWS_ALL_NORM) // all rows of SDataCol/SBlockCol is NORM
#define TD_SET_COL_ROWS_BTIMAP(c, v) ((c)->bitmap = (v))
#define TD_SET_COL_ROWS_NORM(c) TD_SET_COL_ROWS_BTIMAP((c), TD_ROWS_ALL_NORM)
#define TD_SET_COL_ROWS_MISC(c) TD_SET_COL_ROWS_BTIMAP((c), TD_ROWS_NULL_NORM)
#define KvConvertRatio (0.9f)
#define isSelectKVRow(klen, tlen) ((klen) < ((tlen)*KvConvertRatio))
......@@ -128,10 +136,8 @@ typedef struct {
};
/// row total length
uint32_t len;
/// nCols of SRow(only valid for K-V row)
uint64_t ncols : 16;
/// row version
uint64_t ver : 48;
uint64_t ver;
/// timestamp
TSKEY ts;
/// the inline data, maybe a tuple or a k-v tuple
......@@ -157,12 +163,13 @@ typedef struct {
} SRowBuilder;
#define TD_ROW_HEAD_LEN (sizeof(STSRow))
#define TD_ROW_NCOLS_LEN (sizeof(col_id_t))
#define TD_ROW_TYPE(r) ((r)->type)
#define TD_ROW_DELETE(r) ((r)->del)
#define TD_ROW_ENDIAN(r) ((r)->endian)
#define TD_ROW_SVER(r) ((r)->sver)
#define TD_ROW_NCOLS(r) ((r)->ncols)
#define TD_ROW_NCOLS(r) ((r)->data) // only valid for SKvRow
#define TD_ROW_DATA(r) ((r)->data)
#define TD_ROW_LEN(r) ((r)->len)
#define TD_ROW_KEY(r) ((r)->ts)
......@@ -176,6 +183,7 @@ typedef struct {
#define TD_ROW_SET_DELETE(r) (TD_ROW_DELETE(r) = 1)
#define TD_ROW_SET_SVER(r, v) (TD_ROW_SVER(r) = (v))
#define TD_ROW_SET_LEN(r, l) (TD_ROW_LEN(r) = (l))
#define TD_ROW_SET_NCOLS(r, n) (*(col_id_t *)TD_ROW_NCOLS(r) = (n))
#define TD_ROW_IS_DELETED(r) (TD_ROW_DELETE(r) == 1)
#define TD_IS_TP_ROW(r) (TD_ROW_TYPE(r) == TD_ROW_TP)
......@@ -186,15 +194,20 @@ typedef struct {
#define TD_BOOL_STR(b) ((b) ? "true" : "false")
#define isUtilizeKVRow(k, d) ((k) < ((d)*KVRatioConvert))
#define TD_KV_ROW_COL_IDX(r) TD_ROW_DATA(r)
#define TD_ROW_COL_IDX(r) POINTER_SHIFT(TD_ROW_DATA(r), sizeof(col_id_t))
static FORCE_INLINE void tdRowSetVal(SCellVal *pVal, uint8_t valType, void *val) {
pVal->valType = valType;
pVal->val = val;
}
static FORCE_INLINE col_id_t tdRowGetNCols(STSRow *pRow) { return *(col_id_t *)TD_ROW_NCOLS(pRow); }
static FORCE_INLINE void tdRowCpy(void *dst, const STSRow *pRow) { memcpy(dst, pRow, TD_ROW_LEN(pRow)); }
static FORCE_INLINE const char *tdRowEnd(STSRow *pRow) { return (const char *)POINTER_SHIFT(pRow, TD_ROW_LEN(pRow)); }
STSRow *tdRowDup(STSRow *row);
static FORCE_INLINE SKvRowIdx *tdKvRowColIdxAt(STSRow *pRow, uint16_t idx) {
return (SKvRowIdx *)TD_KV_ROW_COL_IDX(pRow) + idx;
static FORCE_INLINE SKvRowIdx *tdKvRowColIdxAt(STSRow *pRow, col_id_t idx) {
return (SKvRowIdx *)TD_ROW_COL_IDX(pRow) + idx;
}
static FORCE_INLINE void *tdKVRowColVal(STSRow *pRow, SKvRowIdx *pIdx) { return POINTER_SHIFT(pRow, pIdx->offset); }
......@@ -221,13 +234,13 @@ int32_t tdAppendSTSRowToDataCol(STSRow *pRow, STSchema *pSchema, SDataCols *pCol
*/
static FORCE_INLINE void *tdGetBitmapAddrTp(STSRow *pRow, uint32_t flen) {
// The primary TS key is stored separatedly.
return POINTER_SHIFT(pRow->data, flen - sizeof(TSKEY));
return POINTER_SHIFT(TD_ROW_DATA(pRow), flen - sizeof(TSKEY));
// return POINTER_SHIFT(pRow->ts, flen);
}
static FORCE_INLINE void *tdGetBitmapAddrKv(STSRow *pRow, col_id_t nKvCols) {
// The primary TS key is stored separatedly and is Norm value, thus should minus 1 firstly
return POINTER_SHIFT(pRow->data, (--nKvCols) * sizeof(SKvRowIdx));
return POINTER_SHIFT(TD_ROW_COL_IDX(pRow), (--nKvCols) * sizeof(SKvRowIdx));
}
static FORCE_INLINE void *tdGetBitmapAddr(STSRow *pRow, uint8_t rowType, uint32_t flen, col_id_t nKvCols) {
#ifdef TD_SUPPORT_BITMAP
......@@ -470,9 +483,11 @@ static int32_t tdSRowResetBuf(SRowBuilder *pBuilder, void *pBuf) {
#ifdef TD_SUPPORT_BITMAP
pBuilder->pBitmap = tdGetBitmapAddrKv(pBuilder->pBuf, pBuilder->nBoundCols);
#endif
len = TD_ROW_HEAD_LEN + (pBuilder->nBoundCols - 1) * sizeof(SKvRowIdx) + pBuilder->nBoundBitmaps;
len = TD_ROW_HEAD_LEN + TD_ROW_NCOLS_LEN + (pBuilder->nBoundCols - 1) * sizeof(SKvRowIdx) +
pBuilder->nBoundBitmaps; // add
TD_ROW_SET_LEN(pBuilder->pBuf, len);
TD_ROW_SET_SVER(pBuilder->pBuf, pBuilder->sver);
TD_ROW_SET_NCOLS(pBuilder->pBuf, pBuilder->nBoundCols);
break;
default:
TASSERT(0);
......@@ -541,13 +556,13 @@ static FORCE_INLINE int32_t tdAppendColValToTpRow(SRowBuilder *pBuilder, TDRowVa
// TODO: The layout of new data types imported since 3.0 like blob/medium blob is the same with binary/nchar.
if (IS_VAR_DATA_TYPE(colType)) {
// ts key stored in STSRow.ts
*(VarDataOffsetT *)POINTER_SHIFT(row->data, offset) = TD_ROW_LEN(row);
*(VarDataOffsetT *)POINTER_SHIFT(TD_ROW_DATA(row), offset) = TD_ROW_LEN(row);
if (isCopyVarData) {
memcpy(POINTER_SHIFT(row, TD_ROW_LEN(row)), val, varDataTLen(val));
}
TD_ROW_LEN(row) += varDataTLen(val);
} else {
memcpy(POINTER_SHIFT(row->data, offset), val, TYPE_BYTES[colType]);
memcpy(POINTER_SHIFT(TD_ROW_DATA(row), offset), val, TYPE_BYTES[colType]);
}
}
#ifdef TD_SUPPORT_BACK2
......@@ -557,14 +572,14 @@ static FORCE_INLINE int32_t tdAppendColValToTpRow(SRowBuilder *pBuilder, TDRowVa
const void *nullVal = getNullValue(colType);
if (IS_VAR_DATA_TYPE(colType)) {
// ts key stored in STSRow.ts
*(VarDataOffsetT *)POINTER_SHIFT(row->data, offset) = TD_ROW_LEN(row);
*(VarDataOffsetT *)POINTER_SHIFT(TD_ROW_DATA(row), offset) = TD_ROW_LEN(row);
if (isCopyVarData) {
memcpy(POINTER_SHIFT(row, TD_ROW_LEN(row)), nullVal, varDataTLen(nullVal));
}
TD_ROW_LEN(row) += varDataTLen(nullVal);
} else {
memcpy(POINTER_SHIFT(row->data, offset), nullVal, TYPE_BYTES[colType]);
memcpy(POINTER_SHIFT(TD_ROW_DATA(row), offset), nullVal, TYPE_BYTES[colType]);
}
}
#endif
......@@ -594,7 +609,7 @@ static FORCE_INLINE int32_t tdAppendColValToKvRow(SRowBuilder *pBuilder, TDRowVa
// No need to store None/Null values.
if (tdValIsNorm(valType, val, colType)) {
// ts key stored in STSRow.ts
SKvRowIdx *pColIdx = (SKvRowIdx *)POINTER_SHIFT(row->data, offset);
SKvRowIdx *pColIdx = (SKvRowIdx *)POINTER_SHIFT(TD_ROW_COL_IDX(row), offset);
char * ptr = (char *)POINTER_SHIFT(row, TD_ROW_LEN(row));
pColIdx->colId = colId;
pColIdx->offset = TD_ROW_LEN(row); // the offset include the TD_ROW_HEAD_LEN
......@@ -612,7 +627,7 @@ static FORCE_INLINE int32_t tdAppendColValToKvRow(SRowBuilder *pBuilder, TDRowVa
#ifdef TD_SUPPORT_BACK2
// NULL/None value
else {
SKvRowIdx *pColIdx = (SKvRowIdx *)POINTER_SHIFT(row->data, offset);
SKvRowIdx *pColIdx = (SKvRowIdx *)POINTER_SHIFT(TD_ROW_COL_IDX(row), offset);
char * ptr = (char *)POINTER_SHIFT(row, TD_ROW_LEN(row));
pColIdx->colId = colId;
pColIdx->offset = TD_ROW_LEN(row); // the offset include the TD_ROW_HEAD_LEN
......@@ -692,16 +707,16 @@ static FORCE_INLINE int32_t tdGetTpRowValOfCol(SCellVal *output, STSRow *pRow, v
}
if (tdValTypeIsNorm(output->valType)) {
if (IS_VAR_DATA_TYPE(colType)) {
output->val = POINTER_SHIFT(pRow, *(VarDataOffsetT *)POINTER_SHIFT(pRow->data, offset));
output->val = POINTER_SHIFT(pRow, *(VarDataOffsetT *)POINTER_SHIFT(TD_ROW_DATA(pRow), offset));
} else {
output->val = POINTER_SHIFT(pRow->data, offset);
output->val = POINTER_SHIFT(TD_ROW_DATA(pRow), offset);
}
}
#else
if (IS_VAR_DATA_TYPE(colType)) {
output->val = POINTER_SHIFT(pRow, *(VarDataOffsetT *)POINTER_SHIFT(pRow->data, offset));
output->val = POINTER_SHIFT(pRow, *(VarDataOffsetT *)POINTER_SHIFT(TD_ROW_DATA(pRow), offset));
} else {
output->val = POINTER_SHIFT(pRow->data, offset);
output->val = POINTER_SHIFT(TD_ROW_DATA(pRow), offset);
}
output->valType = isNull(output->val, colType) ? TD_VTYPE_NULL : TD_VTYPE_NORM;
#endif
......@@ -718,10 +733,10 @@ static FORCE_INLINE int compareKvRowColId(const void *key1, const void *key2) {
}
}
// internal
static FORCE_INLINE int32_t tdGetKvRowValOfCol(SCellVal *output, STSRow *pRow, void *pBitmap, col_type_t colType,
int32_t offset, int16_t colIdx) {
static FORCE_INLINE int32_t tdGetKvRowValOfCol(SCellVal *output, STSRow *pRow, void *pBitmap, int32_t offset,
int16_t colIdx) {
#ifdef TD_SUPPORT_BITMAP
TASSERT(colIdx < TD_ROW_NCOLS(pRow) - 1);
TASSERT(colIdx < tdRowGetNCols(pRow) - 1);
if (tdGetBitmapValType(pBitmap, colIdx, &output->valType) != TSDB_CODE_SUCCESS) {
output->valType = TD_VTYPE_NONE;
return terrno;
......@@ -735,6 +750,7 @@ static FORCE_INLINE int32_t tdGetKvRowValOfCol(SCellVal *output, STSRow *pRow, v
output->val = POINTER_SHIFT(pRow, offset);
}
#else
TASSERT(0);
if (offset < 0) {
terrno = TSDB_CODE_INVALID_PARA;
output->valType = TD_VTYPE_NONE;
......@@ -746,15 +762,6 @@ static FORCE_INLINE int32_t tdGetKvRowValOfCol(SCellVal *output, STSRow *pRow, v
return TSDB_CODE_SUCCESS;
}
// static FORCE_INLINE int32_t tdGetSRowValOfCol(SCellVal *output, STSRow *row, void *pBitmap, int8_t colType,
// int32_t offset, uint16_t flen, int16_t colIdx) {
// if (TD_IS_TP_ROW(row)) {
// return tdGetTpRowValOfCol(output, row, pBitmap, colType, offset, colIdx);
// } else {
// return tdGetKvRowValOfCol(output, row, pBitmap, colType, offset, colIdx);
// }
// }
typedef struct {
STSchema *pSchema;
STSRow * pRow;
......@@ -767,7 +774,7 @@ typedef struct {
static FORCE_INLINE void tdSTSRowIterReset(STSRowIter *pIter, STSRow *pRow) {
pIter->pRow = pRow;
pIter->pBitmap = tdGetBitmapAddr(pRow, pRow->type, pIter->pSchema->flen, pRow->ncols);
pIter->pBitmap = tdGetBitmapAddr(pRow, pRow->type, pIter->pSchema->flen, tdRowGetNCols(pRow));
pIter->offset = 0;
pIter->colIdx = PRIMARYKEY_TIMESTAMP_COL_ID;
pIter->kvIdx = 0;
......@@ -824,14 +831,14 @@ static FORCE_INLINE bool tdSTSRowGetVal(STSRowIter *pIter, col_id_t colId, col_t
#endif
tdGetTpRowValOfCol(pVal, pRow, pIter->pBitmap, pCol->type, pCol->offset - sizeof(TSKEY), colIdx - 1);
} else if (TD_IS_KV_ROW(pRow)) {
SKvRowIdx *pIdx =
(SKvRowIdx *)taosbsearch(&colId, pRow->data, pRow->ncols, sizeof(SKvRowIdx), compareKvRowColId, TD_EQ);
SKvRowIdx *pIdx = (SKvRowIdx *)taosbsearch(&colId, TD_ROW_COL_IDX(pRow), tdRowGetNCols(pRow), sizeof(SKvRowIdx),
compareKvRowColId, TD_EQ);
#ifdef TD_SUPPORT_BITMAP
if (pIdx) {
colIdx = POINTER_DISTANCE(pRow->data, pIdx) / sizeof(SKvRowIdx);
colIdx = POINTER_DISTANCE(TD_ROW_COL_IDX(pRow), pIdx) / sizeof(SKvRowIdx);
}
#endif
tdGetKvRowValOfCol(pVal, pRow, pIter->pBitmap, colType, pIdx ? pIdx->offset : -1, colIdx);
tdGetKvRowValOfCol(pVal, pRow, pIter->pBitmap, pIdx ? pIdx->offset : -1, colIdx);
} else {
if (COL_REACH_END(colId, pIter->maxColId)) return false;
pVal->valType = TD_VTYPE_NONE;
......@@ -844,9 +851,9 @@ static FORCE_INLINE bool tdSTSRowGetVal(STSRowIter *pIter, col_id_t colId, col_t
static FORCE_INLINE bool tdGetTpRowDataOfCol(STSRowIter *pIter, col_type_t colType, int32_t offset, SCellVal *pVal) {
STSRow *pRow = pIter->pRow;
if (IS_VAR_DATA_TYPE(colType)) {
pVal->val = POINTER_SHIFT(pRow, *(VarDataOffsetT *)POINTER_SHIFT(pRow->data, offset));
pVal->val = POINTER_SHIFT(pRow, *(VarDataOffsetT *)POINTER_SHIFT(TD_ROW_DATA(pRow), offset));
} else {
pVal->val = POINTER_SHIFT(pRow->data, offset);
pVal->val = POINTER_SHIFT(TD_ROW_DATA(pRow), offset);
}
#ifdef TD_SUPPORT_BITMAP
......@@ -866,8 +873,9 @@ static FORCE_INLINE bool tdGetKvRowValOfColEx(STSRowIter *pIter, col_id_t colId,
STSRow * pRow = pIter->pRow;
SKvRowIdx *pKvIdx = NULL;
bool colFound = false;
while (*nIdx < pRow->ncols) {
pKvIdx = (SKvRowIdx *)POINTER_SHIFT(pRow->data, *nIdx * sizeof(SKvRowIdx));
col_id_t kvNCols = tdRowGetNCols(pRow);
while (*nIdx < kvNCols) {
pKvIdx = (SKvRowIdx *)POINTER_SHIFT(TD_ROW_COL_IDX(pRow), *nIdx * sizeof(SKvRowIdx));
if (pKvIdx->colId == colId) {
++(*nIdx);
pVal->val = POINTER_SHIFT(pRow, pKvIdx->offset);
......@@ -885,7 +893,7 @@ static FORCE_INLINE bool tdGetKvRowValOfColEx(STSRowIter *pIter, col_id_t colId,
#ifdef TD_SUPPORT_BITMAP
int16_t colIdx = -1;
if (pKvIdx) colIdx = POINTER_DISTANCE(pRow->data, pKvIdx) / sizeof(SKvRowIdx);
if (pKvIdx) colIdx = POINTER_DISTANCE(TD_ROW_COL_IDX(pRow), pKvIdx) / sizeof(SKvRowIdx);
if (tdGetBitmapValType(pIter->pBitmap, colIdx, &pVal->valType) != TSDB_CODE_SUCCESS) {
pVal->valType = TD_VTYPE_NONE;
}
......@@ -956,7 +964,7 @@ static FORCE_INLINE int32_t tdGetColDataOfRow(SCellVal *pVal, SDataCol *pCol, in
if (tdGetBitmapValType(pCol->pBitmap, row, &(pVal->valType)) < 0) {
return terrno;
}
if ((pCol->bitmap == 1) || tdValTypeIsNorm(pVal->valType)) {
if (TD_COL_ROWS_NORM(pCol) || tdValTypeIsNorm(pVal->valType)) {
if (IS_VAR_DATA_TYPE(pCol->type)) {
pVal->val = POINTER_SHIFT(pCol->pData, pCol->dataOff[row]);
} else {
......@@ -973,6 +981,27 @@ static FORCE_INLINE int32_t tdGetColDataOfRow(SCellVal *pVal, SDataCol *pCol, in
return TSDB_CODE_SUCCESS;
}
static FORCE_INLINE bool tdSTpRowGetVal(STSRow *pRow, col_id_t colId, col_type_t colType, int32_t flen, uint32_t offset,
col_id_t colIdx, SCellVal *pVal) {
if (colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
tdRowSetVal(pVal, TD_VTYPE_NORM, TD_ROW_KEY_ADDR(pRow));
return true;
}
void *pBitmap = tdGetBitmapAddrTp(pRow, flen);
tdGetTpRowValOfCol(pVal, pRow, pBitmap, colType, offset - sizeof(TSKEY), colIdx - 1);
return true;
}
static FORCE_INLINE bool tdSKvRowGetVal(STSRow *pRow, col_id_t colId, uint32_t offset, col_id_t colIdx, SCellVal *pVal) {
if (colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
tdRowSetVal(pVal, TD_VTYPE_NORM, TD_ROW_KEY_ADDR(pRow));
return true;
}
void *pBitmap = tdGetBitmapAddrKv(pRow, tdRowGetNCols(pRow));
tdGetKvRowValOfCol(pVal, pRow, pBitmap, offset, colIdx - 1);
return true;
}
static FORCE_INLINE int32_t dataColGetNEleLen(SDataCol *pDataCol, int rows) {
ASSERT(rows > 0);
int32_t result = 0;
......
......@@ -213,9 +213,9 @@ static int32_t tdAppendKvRowToDataCol(STSRow *pRow, STSchema *pSchema, SDataCols
int rcol = 0;
int dcol = 1;
int tRowCols = TD_ROW_NCOLS(pRow) - 1; // the primary TS key not included in kvRowColIdx part
int tRowCols = tdRowGetNCols(pRow) - 1; // the primary TS key not included in kvRowColIdx part
int tSchemaCols = schemaNCols(pSchema) - 1;
void *pBitmap = tdGetBitmapAddrKv(pRow, TD_ROW_NCOLS(pRow));
void *pBitmap = tdGetBitmapAddrKv(pRow, tdRowGetNCols(pRow));
SDataCol *pDataCol = &(pCols->cols[0]);
if (pDataCol->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
......@@ -237,7 +237,7 @@ static int32_t tdAppendKvRowToDataCol(STSRow *pRow, STSchema *pSchema, SDataCols
}
SCellVal sVal = {0};
if (pIdx->colId == pDataCol->colId) {
if (tdGetKvRowValOfCol(&sVal, pRow, pBitmap, pDataCol->type, pIdx->offset, colIdx) < 0) {
if (tdGetKvRowValOfCol(&sVal, pRow, pBitmap, pIdx->offset, colIdx) < 0) {
return terrno;
}
tdAppendValToDataCol(pDataCol, sVal.valType, sVal.val, pCols->numOfRows, pCols->maxPoints);
......
......@@ -1163,12 +1163,12 @@ int tsdbWriteBlockImpl(STsdb *pRepo, STable *pTable, SDFile *pDFile, SDataCols *
&(pBlockCol->sum), &(pBlockCol->minIndex), &(pBlockCol->maxIndex),
&(pBlockCol->numOfNull));
if (pBlockCol->numOfNull == 0) {
pBlockCol->bitmap = 1;
TD_SET_COL_ROWS_NORM(pBlockCol);
} else {
pBlockCol->bitmap = 0;
TD_SET_COL_ROWS_MISC(pBlockCol);
}
} else {
pBlockCol->bitmap = 0;
TD_SET_COL_ROWS_MISC(pBlockCol);
}
nColsNotAllNull++;
}
......@@ -1198,7 +1198,7 @@ int tsdbWriteBlockImpl(STsdb *pRepo, STable *pTable, SDFile *pDFile, SDataCols *
#ifdef TD_SUPPORT_BITMAP
int32_t tBitmaps = 0;
if ((ncol != 0) && (pBlockCol->bitmap == 0)) {
if ((ncol != 0) && !TD_COL_ROWS_NORM(pBlockCol)) {
if (IS_VAR_DATA_TYPE(pDataCol->type)) {
tBitmaps = nBitmaps;
tlen += tBitmaps;
......
......@@ -1476,42 +1476,42 @@ static int32_t doCopyRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, int32_t
static void mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capacity, int32_t numOfRows, STSRow* row1,
STSRow* row2, int32_t numOfCols, uint64_t uid, STSchema* pSchema1, STSchema* pSchema2,
bool forceSetNull) {
#if 0
char* pData = NULL;
STSchema* pSchema;
STSRow* row;
int16_t colId;
int16_t offset;
#if 1
char* pData = NULL;
STSchema* pSchema;
STSRow* row;
int16_t colId;
int16_t offset;
bool isRow1DataRow = TD_IS_TP_ROW(row1);
bool isRow2DataRow;
bool isChosenRowDataRow;
int32_t chosen_itr;
void *value;
SCellVal sVal = {0};
// the schema version info is embeded in SDataRow
// the schema version info is embeded in STSRow
int32_t numOfColsOfRow1 = 0;
if (pSchema1 == NULL) {
pSchema1 = metaGetTbTSchema(pTsdbReadHandle->pTsdb->pMeta, uid, 0);
pSchema1 = metaGetTbTSchema(pTsdbReadHandle->pTsdb->pMeta, uid, TD_ROW_SVER(row1));
}
if(isRow1DataRow) {
numOfColsOfRow1 = schemaNCols(pSchema1);
} else {
numOfColsOfRow1 = TD_ROW_NCOLS(row1);
numOfColsOfRow1 = tdRowGetNCols(row1);
}
int32_t numOfColsOfRow2 = 0;
if(row2) {
isRow2DataRow = TD_IS_TP_ROW(row2);
if (pSchema2 == NULL) {
pSchema2 = metaGetTbTSchema(pTsdbReadHandle->pTsdb->pMeta, uid, 0);
pSchema2 = metaGetTbTSchema(pTsdbReadHandle->pTsdb->pMeta, uid, TD_ROW_SVER(row2));
}
if(isRow2DataRow) {
numOfColsOfRow2 = schemaNCols(pSchema2);
} else {
numOfColsOfRow2 = TD_ROW_NCOLS(row2);
numOfColsOfRow2 = tdRowGetNCols(row2);
}
}
......@@ -1532,8 +1532,7 @@ static void mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capacit
} else if(isRow1DataRow) {
colIdOfRow1 = pSchema1->columns[j].colId;
} else {
void *rowBody = memRowKvBody(row1);
SColIdx *pColIdx = kvRowColIdxAt(rowBody, j);
SKvRowIdx *pColIdx = tdKvRowColIdxAt(row1, j);
colIdOfRow1 = pColIdx->colId;
}
......@@ -1543,8 +1542,7 @@ static void mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capacit
} else if(isRow2DataRow) {
colIdOfRow2 = pSchema2->columns[k].colId;
} else {
void *rowBody = memRowKvBody(row2);
SColIdx *pColIdx = kvRowColIdxAt(rowBody, k);
SKvRowIdx *pColIdx = tdKvRowColIdxAt(row2, k);
colIdOfRow2 = pColIdx->colId;
}
......@@ -1580,60 +1578,57 @@ static void mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capacit
if(isChosenRowDataRow) {
colId = pSchema->columns[chosen_itr].colId;
offset = pSchema->columns[chosen_itr].offset;
void *rowBody = memRowDataBody(row);
value = tdGetRowDataOfCol(rowBody, (int8_t)pColInfo->info.type, TD_DATA_ROW_HEAD_SIZE + offset);
tdSTpRowGetVal(row, colId, pSchema->columns[chosen_itr].type, pSchema->flen, offset, chosen_itr, &sVal);
} else {
void *rowBody = memRowKvBody(row);
SColIdx *pColIdx = kvRowColIdxAt(rowBody, chosen_itr);
SKvRowIdx *pColIdx = tdKvRowColIdxAt(row, chosen_itr);
colId = pColIdx->colId;
offset = pColIdx->offset;
value = tdGetKvRowDataOfCol(rowBody, pColIdx->offset);
tdSKvRowGetVal(row, colId, offset, chosen_itr, &sVal);
}
if (colId == pColInfo->info.colId) {
if(forceSetNull || (!isNull(value, (int8_t)pColInfo->info.type))) {
if (tdValTypeIsNorm(sVal.valType)) {
switch (pColInfo->info.type) {
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR:
memcpy(pData, value, varDataTLen(value));
memcpy(pData, sVal.val, varDataTLen(sVal.val));
break;
case TSDB_DATA_TYPE_NULL:
case TSDB_DATA_TYPE_BOOL:
case TSDB_DATA_TYPE_TINYINT:
case TSDB_DATA_TYPE_UTINYINT:
*(uint8_t *)pData = *(uint8_t *)value;
*(uint8_t *)pData = *(uint8_t *)sVal.val;
break;
case TSDB_DATA_TYPE_SMALLINT:
case TSDB_DATA_TYPE_USMALLINT:
*(uint16_t *)pData = *(uint16_t *)value;
*(uint16_t *)pData = *(uint16_t *)sVal.val;
break;
case TSDB_DATA_TYPE_INT:
case TSDB_DATA_TYPE_UINT:
*(uint32_t *)pData = *(uint32_t *)value;
*(uint32_t *)pData = *(uint32_t *)sVal.val;
break;
case TSDB_DATA_TYPE_BIGINT:
case TSDB_DATA_TYPE_UBIGINT:
*(uint64_t *)pData = *(uint64_t *)value;
*(uint64_t *)pData = *(uint64_t *)sVal.val;
break;
case TSDB_DATA_TYPE_FLOAT:
SET_FLOAT_PTR(pData, value);
SET_FLOAT_PTR(pData, sVal.val);
break;
case TSDB_DATA_TYPE_DOUBLE:
SET_DOUBLE_PTR(pData, value);
SET_DOUBLE_PTR(pData, sVal.val);
break;
case TSDB_DATA_TYPE_TIMESTAMP:
#if 0 // only TSKEY supported since 3.0
if (pColInfo->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
*(TSKEY *)pData = tdGetKey(*(TKEY *)value);
} else {
*(TSKEY *)pData = *(TSKEY *)value;
}
#endif
*(TSKEY*)pData = *(TSKEY*)value;
*(TSKEY*)pData = *(TSKEY*)sVal.val;
break;
default:
memcpy(pData, value, pColInfo->info.bytes);
memcpy(pData, sVal.val, pColInfo->info.bytes);
}
} else if (forceSetNull) {
if (pColInfo->info.type == TSDB_DATA_TYPE_BINARY || pColInfo->info.type == TSDB_DATA_TYPE_NCHAR) {
setVardataNull(pData, pColInfo->info.type);
} else {
setNull(pData, pColInfo->info.type, pColInfo->info.bytes);
}
}
i++;
......
......@@ -487,12 +487,12 @@ static int tsdbLoadBlockDataImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *pDat
pDataCol->bitmap = pBlockCol->bitmap;
} else {
ASSERT(pDataCol->colId == tcolId);
pDataCol->bitmap = 1;
TD_SET_COL_ROWS_NORM(pDataCol);
}
int32_t tBitmaps = 0;
int32_t tLenBitmap = 0;
if ((dcol != 0) && (pBlockCol->bitmap == 0)) {
if ((dcol != 0) && !TD_COL_ROWS_NORM(pBlockCol)) {
if (IS_VAR_DATA_TYPE(pDataCol->type)) {
tBitmaps = nBitmaps;
tLenBitmap = tBitmaps;
......@@ -622,7 +622,7 @@ static int tsdbLoadBlockDataColsImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *
if (colId == PRIMARYKEY_TIMESTAMP_COL_ID) { // load the key row
blockCol.colId = colId;
blockCol.bitmap = 0; // default is NORM for the primary key column
TD_SET_COL_ROWS_NORM(&blockCol); // default is NORM for the primary key column
blockCol.len = pBlock->keyLen;
blockCol.type = pDataCol->type;
blockCol.offset = TSDB_KEY_COL_OFFSET;
......@@ -670,7 +670,7 @@ static int tsdbLoadColData(SReadH *pReadh, SDFile *pDFile, SBlock *pBlock, SBloc
int32_t tBitmaps = 0;
int32_t tLenBitmap = 0;
if (pBlockCol->bitmap == 0) {
if (!TD_COL_ROWS_NORM(pBlockCol)) {
if (IS_VAR_DATA_TYPE(pDataCol->type)) {
tBitmaps = nBitmaps;
tLenBitmap = tBitmaps;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册