提交 7a2e667a 编写于 作者: K Kaili Xu

support STSRow

上级 cf859069
...@@ -26,6 +26,9 @@ extern "C" { ...@@ -26,6 +26,9 @@ extern "C" {
// Imported since 3.0 and use bitmap to demonstrate None/Null/Norm, while use Null/Norm below 3.0 without of bitmap. // Imported since 3.0 and use bitmap to demonstrate None/Null/Norm, while use Null/Norm below 3.0 without of bitmap.
#define TD_SUPPORT_BITMAP #define TD_SUPPORT_BITMAP
#define TD_SUPPORT_BACK2 // suppport back compatibility of 2.0
#define TASSERT(x) ASSERT(x)
#define STR_TO_VARSTR(x, str) \ #define STR_TO_VARSTR(x, str) \
do { \ do { \
...@@ -368,6 +371,7 @@ typedef struct SDataCol { ...@@ -368,6 +371,7 @@ typedef struct SDataCol {
} SDataCol; } SDataCol;
#define isAllRowsNull(pCol) ((pCol)->len == 0) #define isAllRowsNull(pCol) ((pCol)->len == 0)
#define isAllRowsNone(pCol) ((pCol)->len == 0)
static FORCE_INLINE void dataColReset(SDataCol *pDataCol) { pDataCol->len = 0; } static FORCE_INLINE void dataColReset(SDataCol *pDataCol) { pDataCol->len = 0; }
int tdAllocMemForCol(SDataCol *pCol, int maxPoints); int tdAllocMemForCol(SDataCol *pCol, int maxPoints);
......
此差异已折叠。
...@@ -29,7 +29,7 @@ int tdAllocMemForCol(SDataCol *pCol, int maxPoints) { ...@@ -29,7 +29,7 @@ int tdAllocMemForCol(SDataCol *pCol, int maxPoints) {
spaceNeeded += sizeof(VarDataOffsetT) * maxPoints; spaceNeeded += sizeof(VarDataOffsetT) * maxPoints;
} }
#ifdef TD_SUPPORT_BITMAP #ifdef TD_SUPPORT_BITMAP
spaceNeeded += (int)TD_BIT_TO_BYTES(maxPoints); spaceNeeded += (int)TD_BITMAP_BYTES(maxPoints);
#endif #endif
if(pCol->spaceSize < spaceNeeded) { if(pCol->spaceSize < spaceNeeded) {
void* ptr = realloc(pCol->pData, spaceNeeded); void* ptr = realloc(pCol->pData, spaceNeeded);
......
...@@ -15,9 +15,16 @@ ...@@ -15,9 +15,16 @@
#include "trow.h" #include "trow.h"
const uint8_t tdVTypeByte[3] = {
TD_VTYPE_NORM_BYTE, // TD_VTYPE_NORM
TD_VTYPE_NONE_BYTE, // TD_VTYPE_NONE
TD_VTYPE_NULL_BYTE, // TD_VTYPE_NULL
};
static void dataColSetNEleNull(SDataCol *pCol, int nEle); static void dataColSetNEleNull(SDataCol *pCol, int nEle);
static void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, int limit1, SDataCols *src2, int *iter2, static void tdMergeTwoDat 6z, z,
int limit2, int tRows, bool forceSetNull); zaCols(SDataCols *target, SDataCols *src1, int *iter1, int limit1, SDataCols *src2, int *iter2, int limit2,
int tRows, bool forceSetNull);
static FORCE_INLINE void dataColSetNullAt(SDataCol *pCol, int index) { static FORCE_INLINE void dataColSetNullAt(SDataCol *pCol, int index) {
if (IS_VAR_DATA_TYPE(pCol->type)) { if (IS_VAR_DATA_TYPE(pCol->type)) {
...@@ -43,6 +50,47 @@ static void dataColSetNEleNull(SDataCol *pCol, int nEle) { ...@@ -43,6 +50,47 @@ static void dataColSetNEleNull(SDataCol *pCol, int nEle) {
} }
} }
static FORCE_INLINE int32_t tdSetBitmapValTypeN(void *pBitmap, int16_t nEle, TDRowValT valType) {
TASSERT(valType <= TD_VTYPE_NULL);
int16_t nBytes = nEle / TD_VTYPE_PARTS;
for (int i = 0; i < nBytes; ++i) {
*(uint8_t *)pBitmap = tdVTypeByte[valType];
pBitmap = POINTER_SHIFT(pBitmap, 1);
}
int16_t nLeft = nEle - nBytes * TD_VTYPE_BITS;
for (int j = 0; j < nLeft; ++j) {
tdSetBitmapValType(pBitmap, j, valType);
}
}
static FORCE_INLINE void dataColSetNoneAt(SDataCol *pCol, int index) {
if (IS_VAR_DATA_TYPE(pCol->type)) {
pCol->dataOff[index] = pCol->len;
char *ptr = POINTER_SHIFT(pCol->pData, pCol->len);
setVardataNull(ptr, pCol->type);
pCol->len += varDataTLen(ptr);
} else {
setNull(POINTER_SHIFT(pCol->pData, TYPE_BYTES[pCol->type] * index), pCol->type, pCol->bytes);
pCol->len += TYPE_BYTES[pCol->type];
}
}
static void dataColSetNEleNone(SDataCol *pCol, int nEle) {
if (IS_VAR_DATA_TYPE(pCol->type)) {
pCol->len = 0;
for (int i = 0; i < nEle; ++i) {
dataColSetNoneAt(pCol, i);
}
} else {
setNullN(pCol->pData, pCol->type, pCol->bytes, nEle);
pCol->len = TYPE_BYTES[pCol->type] * nEle;
}
#ifdef TD_SUPPORT_BITMAP
tdSetBitmapValTypeN(pCol->pBitmap, nEle, TD_VTYPE_NONE);
#endif
}
#if 0 #if 0
void trbSetRowInfo(SRowBuilder *pRB, bool del, uint16_t sver) { void trbSetRowInfo(SRowBuilder *pRB, bool del, uint16_t sver) {
// TODO // TODO
...@@ -436,19 +484,22 @@ void tdResetDataCols(SDataCols *pCols) { ...@@ -436,19 +484,22 @@ void tdResetDataCols(SDataCols *pCols) {
int tdAppendValToDataCol(SDataCol *pCol, TDRowValT valType, const void *val, int numOfRows, int maxPoints) { int tdAppendValToDataCol(SDataCol *pCol, TDRowValT valType, const void *val, int numOfRows, int maxPoints) {
ASSERT(pCol != NULL); ASSERT(pCol != NULL);
if (isAllRowsNull(pCol)) { // Assume that, the columns not specified during insert/upsert is None.
if (tdValIsNone(valType) || tdValIsNull(valType, val, pCol->type)) { if (isAllRowsNone(pCol)) {
// all Null value yet, just return if (tdValIsNone(valType)) {
// all None value yet, just return
return 0; return 0;
} }
if(tdAllocMemForCol(pCol, maxPoints) < 0) return -1; if(tdAllocMemForCol(pCol, maxPoints) < 0) return -1;
if (numOfRows > 0) { if (numOfRows > 0) {
// Find the first not null value, fill all previous values as Null // Find the first not None value, fill all previous values as None
dataColSetNEleNull(pCol, numOfRows); dataColSetNEleNone(pCol, numOfRows);
} }
} }
if (!tdValTypeIsNorm(valType)) {
val = getNullValue(pCol->type);
}
if (IS_VAR_DATA_TYPE(pCol->type)) { if (IS_VAR_DATA_TYPE(pCol->type)) {
// set offset // set offset
pCol->dataOff[numOfRows] = pCol->len; pCol->dataOff[numOfRows] = pCol->len;
...@@ -461,97 +512,114 @@ int tdAppendValToDataCol(SDataCol *pCol, TDRowValT valType, const void *val, int ...@@ -461,97 +512,114 @@ int tdAppendValToDataCol(SDataCol *pCol, TDRowValT valType, const void *val, int
memcpy(POINTER_SHIFT(pCol->pData, pCol->len), val, pCol->bytes); memcpy(POINTER_SHIFT(pCol->pData, pCol->len), val, pCol->bytes);
pCol->len += pCol->bytes; pCol->len += pCol->bytes;
} }
#ifdef TD_SUPPORT_BITMAP
tdSetBitmapValType(pCol->pBitmap, numOfRows, valType);
#endif
return 0; return 0;
} }
static void tdAppendTpRowToDataCol(STSRow *pRow, STSchema *pSchema, SDataCols *pCols, bool forceSetNull) { // internal
static int32_t tdAppendTpRowToDataCol(STSRow *pRow, STSchema *pSchema, SDataCols *pCols) {
ASSERT(pCols->numOfRows == 0 || dataColsKeyLast(pCols) < TD_ROW_TSKEY(pRow)); ASSERT(pCols->numOfRows == 0 || dataColsKeyLast(pCols) < TD_ROW_TSKEY(pRow));
int rcol = 0; int rcol = 1;
int dcol = 0; int dcol = 1;
void* pBitmap = tdGetBitmapAddrTp(pRow, pSchema->flen); void *pBitmap = tdGetBitmapAddrTp(pRow, pSchema->flen);
SDataCol *pDataCol = &(pCols->cols[0]);
if (pDataCol->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
tdAppendValToDataCol(pDataCol, pRow->ts, TD_VTYPE_NORM, pCols->numOfRows, pCols->maxPoints);
}
while (dcol < pCols->numOfCols) { while (dcol < pCols->numOfCols) {
bool setCol = 0; pDataCol = &(pCols->cols[dcol]);
SDataCol *pDataCol = &(pCols->cols[dcol]);
if (rcol >= schemaNCols(pSchema)) { if (rcol >= schemaNCols(pSchema)) {
tdAppendValToDataCol(pDataCol, TD_VTYPE_NULL, NULL, pCols->numOfRows, pCols->maxPoints); tdAppendValToDataCol(pDataCol, TD_VTYPE_NULL, NULL, pCols->numOfRows, pCols->maxPoints);
dcol++; ++dcol;
continue; continue;
} }
STColumn *pRowCol = schemaColAt(pSchema, rcol); STColumn *pRowCol = schemaColAt(pSchema, rcol);
SCellVal sVal = {0}; SCellVal sVal = {0};
if (pRowCol->colId == pDataCol->colId) { if (pRowCol->colId == pDataCol->colId) {
if(tdGetTpRowValOfCol(&sVal, pRow, pBitmap, pRowCol->type, pRowCol->offset + TD_DATA_ROW_HEAD_SIZE, rcol) < 0){ if (tdGetTpRowValOfCol(&sVal, pRow, pBitmap, pRowCol->type, pRowCol->offset - sizeof(TSKEY), rcol - 1) < 0) {
return terrno;
} }
if(!isNull(value, pDataCol->type)) setCol = 1; tdAppendValToDataCol(pDataCol, sVal.valType, sVal.val, pCols->numOfRows, pCols->maxPoints);
tdAppendValToDataCol(pDataCol, value, pCols->numOfRows, pCols->maxPoints); ++dcol;
dcol++; ++rcol;
rcol++;
} else if (pRowCol->colId < pDataCol->colId) { } else if (pRowCol->colId < pDataCol->colId) {
rcol++; ++rcol;
} else { } else {
if(forceSetNull || setCol) { tdAppendValToDataCol(pDataCol, TD_VTYPE_NULL, NULL, pCols->numOfRows, pCols->maxPoints);
tdAppendValToDataCol(pDataCol, getNullValue(pDataCol->type), pCols->numOfRows, pCols->maxPoints); ++dcol;
}
dcol++;
} }
} }
pCols->numOfRows++; ++pCols->numOfRows;
}
static void tdAppendKvRowToDataCol(STSRow *pRow, STSchema *pSchema, SDataCols *pCols, bool forceSetNull) { return TSDB_CODE_SUCCESS;
}
// internal
static void tdAppendKvRowToDataCol(STSRow *pRow, STSchema *pSchema, SDataCols *pCols) {
ASSERT(pCols->numOfRows == 0 || dataColsKeyLast(pCols) < TD_ROW_TSKEY(pRow)); ASSERT(pCols->numOfRows == 0 || dataColsKeyLast(pCols) < TD_ROW_TSKEY(pRow));
int rcol = 0; int rcol = 0;
int dcol = 0; int dcol = 1;
int nRowCols = TD_ROW_NCOLS(pRow); int tRowCols = TD_ROW_NCOLS(pRow) - 1; // the primary TS key not included in kvRowColIdx part
int tSchemaCols = schemaNCols(pSchema) - 1;
void *pBitmap = tdGetBitmapAddrKv(pRow, nRowCols); void *pBitmap = tdGetBitmapAddrKv(pRow, nRowCols);
SDataCol *pDataCol = &(pCols->cols[0]);
if (pDataCol->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
tdAppendValToDataCol(pDataCol, pRow->ts, TD_VTYPE_NORM, pCols->numOfRows, pCols->maxPoints);
}
while (dcol < pCols->numOfCols) { while (dcol < pCols->numOfCols) {
bool setCol = 0; pDataCol = &(pCols->cols[dcol]);
SDataCol *pDataCol = &(pCols->cols[dcol]); if (rcol >= tRowCols || rcol >= tSchemaCols) {
if (rcol >= nRowCols || rcol >= schemaNCols(pSchema)) { tdAppendValToDataCol(pDataCol, TD_VTYPE_NULL, NULL, pCols->numOfRows, pCols->maxPoints);
dataColAppendVal(pDataCol, getNullValue(pDataCol->type), pCols->numOfRows, pCols->maxPoints);
++dcol; ++dcol;
continue; continue;
} }
SColIdx *colIdx = NULL;//kvRowColIdxAt(row, rcol); SKvRowIdx *pIdx = tdKvRowColIdxAt(pRow, rcol);
int16_t colIdx = -1;
if (colIdx->colId == pDataCol->colId) { if (pIdx) {
void *value = NULL; //tdGetKvRowDataOfCol(row, colIdx->offset); colIdx = POINTER_DISTANCE(pRow->data, pIdx) / sizeof(SKvRowIdx);
if(!isNull(value, pDataCol->type)) setCol = 1; }
dataColAppendVal(pDataCol, value, pCols->numOfRows, pCols->maxPoints); SCellVal sVal = {0};
if (pIdx->colId == pDataCol->colId) {
if (tdGetKvRowValOfCol(&sVal, pRow, pBitmap, pDataCol->type, pIdx->offset, colIdx) < 0) {
return terrno;
}
tdAppendValToDataCol(pDataCol, sVal.valType, sVal.val, pCols->numOfRows, pCols->maxPoints);
++dcol; ++dcol;
++rcol; ++rcol;
} else if (colIdx->colId < pDataCol->colId) { } else if (pIdx->colId < pDataCol->colId) {
++rcol; ++rcol;
} else { } else {
if(forceSetNull || setCol) { tdAppendValToDataCol(pDataCol, TD_VTYPE_NULL, NULL, pCols->numOfRows, pCols->maxPoints);
dataColAppendVal(pDataCol, getNullValue(pDataCol->type), pCols->numOfRows, pCols->maxPoints);
}
++dcol; ++dcol;
} }
} }
pCols->numOfRows++; ++pCols->numOfRows;
return TSDB_CODE_SUCCESS;
} }
/** /**
* @brief * @brief exposed
* *
* @param pRow * @param pRow
* @param pSchema * @param pSchema
* @param pCols * @param pCols
* @param forceSetNull * @param forceSetNull
*/ */
void tdAppendSTSRowToDataCol(STSRow *pRow, STSchema *pSchema, SDataCols *pCols, bool forceSetNull) { void tdAppendSTSRowToDataCol(STSRow *pRow, STSchema *pSchema, SDataCols *pCols, bool forceSetNull) {
if (TD_IS_TP_ROW(pRow)) { if (TD_IS_TP_ROW(pRow)) {
tdAppendTpRowToDataCol(pRow, pSchema, pCols, forceSetNull); tdAppendTpRowToDataCol(pRow, pSchema, pCols);
} else if (TD_IS_KV_ROW(pRow)) { } else if (TD_IS_KV_ROW(pRow)) {
tdAppendKvRowToDataCol(pRow, pSchema, pCols, forceSetNull); tdAppendKvRowToDataCol(pRow, pSchema, pCols);
} else { } else {
ASSERT(0); ASSERT(0);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册