提交 21a8b091 编写于 作者: K kailixu

appendKvRow with NULL optimization

上级 f97d290b
...@@ -265,10 +265,11 @@ typedef struct SDataCol { ...@@ -265,10 +265,11 @@ typedef struct SDataCol {
TSKEY ts; // only used in last NULL column TSKEY ts; // only used in last NULL column
} SDataCol; } SDataCol;
#define isAllRowsNull(pCol) ((pCol)->len == 0)
static FORCE_INLINE void dataColReset(SDataCol *pDataCol) { pDataCol->len = 0; } static FORCE_INLINE void dataColReset(SDataCol *pDataCol) { pDataCol->len = 0; }
void dataColInit(SDataCol *pDataCol, STColumn *pCol, void **pBuf, int maxPoints); void dataColInit(SDataCol *pDataCol, STColumn *pCol, void **pBuf, int maxPoints);
void dataColAppendVal(SDataCol *pCol, void *value, int numOfRows, int maxPoints); void dataColAppendVal(SDataCol *pCol, const void *value, int numOfRows, int maxPoints);
void dataColSetOffset(SDataCol *pCol, int nEle); void dataColSetOffset(SDataCol *pCol, int nEle);
bool isNEleNull(SDataCol *pCol, int nEle); bool isNEleNull(SDataCol *pCol, int nEle);
...@@ -277,7 +278,10 @@ void dataColSetNEleNull(SDataCol *pCol, int nEle, int maxPoints); ...@@ -277,7 +278,10 @@ void dataColSetNEleNull(SDataCol *pCol, int nEle, int maxPoints);
const void *tdGetNullVal(int8_t type); const void *tdGetNullVal(int8_t type);
// Get the data pointer from a column-wised data // Get the data pointer from a column-wised data
static FORCE_INLINE void *tdGetColDataOfRow(SDataCol *pCol, int row) { static FORCE_INLINE const void *tdGetColDataOfRow(SDataCol *pCol, int row) {
if (isAllRowsNull(pCol)) {
return tdGetNullVal(pCol->type);
}
if (IS_VAR_DATA_TYPE(pCol->type)) { if (IS_VAR_DATA_TYPE(pCol->type)) {
return POINTER_SHIFT(pCol->pData, pCol->dataOff[row]); return POINTER_SHIFT(pCol->pData, pCol->dataOff[row]);
} else { } else {
......
...@@ -241,9 +241,21 @@ void dataColInit(SDataCol *pDataCol, STColumn *pCol, void **pBuf, int maxPoints) ...@@ -241,9 +241,21 @@ void dataColInit(SDataCol *pDataCol, STColumn *pCol, void **pBuf, int maxPoints)
} }
} }
// value from timestamp should be TKEY here instead of TSKEY // value from timestamp should be TKEY here instead of TSKEY
void dataColAppendVal(SDataCol *pCol, void *value, int numOfRows, int maxPoints) { void dataColAppendVal(SDataCol *pCol, const void *value, int numOfRows, int maxPoints) {
ASSERT(pCol != NULL && value != NULL); ASSERT(pCol != NULL && value != NULL);
if (isAllRowsNull(pCol)) {
if (isNull(value, pCol->type)) {
// all null value yet, just return
return;
}
if (numOfRows > 0) {
// Find the first not null value, fill all previouse values as NULL
dataColSetNEleNull(pCol, numOfRows, maxPoints);
}
}
if (IS_VAR_DATA_TYPE(pCol->type)) { if (IS_VAR_DATA_TYPE(pCol->type)) {
// set offset // set offset
pCol->dataOff[numOfRows] = pCol->len; pCol->dataOff[numOfRows] = pCol->len;
...@@ -440,7 +452,8 @@ static void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols ...@@ -440,7 +452,8 @@ static void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols
while (dcol < pCols->numOfCols) { while (dcol < pCols->numOfCols) {
SDataCol *pDataCol = &(pCols->cols[dcol]); SDataCol *pDataCol = &(pCols->cols[dcol]);
if (rcol >= schemaNCols(pSchema)) { if (rcol >= schemaNCols(pSchema)) {
dataColSetNullAt(pDataCol, pCols->numOfRows); // dataColSetNullAt(pDataCol, pCols->numOfRows);
dataColAppendVal(pDataCol, tdGetNullVal(pDataCol->type), pCols->numOfRows, pCols->maxPoints);
dcol++; dcol++;
continue; continue;
} }
...@@ -454,7 +467,8 @@ static void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols ...@@ -454,7 +467,8 @@ static void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols
} else if (pRowCol->colId < pDataCol->colId) { } else if (pRowCol->colId < pDataCol->colId) {
rcol++; rcol++;
} else { } else {
dataColSetNullAt(pDataCol, pCols->numOfRows); // dataColSetNullAt(pDataCol, pCols->numOfRows);
dataColAppendVal(pDataCol, tdGetNullVal(pDataCol->type), pCols->numOfRows, pCols->maxPoints);
dcol++; dcol++;
} }
} }
...@@ -483,7 +497,8 @@ static void tdAppendKvRowToDataCol(SKVRow row, STSchema *pSchema, SDataCols *pCo ...@@ -483,7 +497,8 @@ static void tdAppendKvRowToDataCol(SKVRow row, STSchema *pSchema, SDataCols *pCo
while (dcol < pCols->numOfCols) { while (dcol < pCols->numOfCols) {
SDataCol *pDataCol = &(pCols->cols[dcol]); SDataCol *pDataCol = &(pCols->cols[dcol]);
if (rcol >= nRowCols || rcol >= schemaNCols(pSchema)) { if (rcol >= nRowCols || rcol >= schemaNCols(pSchema)) {
dataColSetNullAt(pDataCol, pCols->numOfRows); // dataColSetNullAt(pDataCol, pCols->numOfRows);
dataColAppendVal(pDataCol, tdGetNullVal(pDataCol->type), pCols->numOfRows, pCols->maxPoints);
++dcol; ++dcol;
continue; continue;
} }
...@@ -498,7 +513,8 @@ static void tdAppendKvRowToDataCol(SKVRow row, STSchema *pSchema, SDataCols *pCo ...@@ -498,7 +513,8 @@ static void tdAppendKvRowToDataCol(SKVRow row, STSchema *pSchema, SDataCols *pCo
} else if (colIdx->colId < pDataCol->colId) { } else if (colIdx->colId < pDataCol->colId) {
++rcol; ++rcol;
} else { } else {
dataColSetNullAt(pDataCol, pCols->numOfRows); // dataColSetNullAt(pDataCol, pCols->numOfRows);
dataColAppendVal(pDataCol, tdGetNullVal(pDataCol->type), pCols->numOfRows, pCols->maxPoints);
++dcol; ++dcol;
} }
} }
......
...@@ -920,7 +920,8 @@ int tsdbWriteBlockImpl(STsdbRepo *pRepo, STable *pTable, SDFile *pDFile, SDataCo ...@@ -920,7 +920,8 @@ int tsdbWriteBlockImpl(STsdbRepo *pRepo, STable *pTable, SDFile *pDFile, SDataCo
SDataCol * pDataCol = pDataCols->cols + ncol; SDataCol * pDataCol = pDataCols->cols + ncol;
SBlockCol *pBlockCol = pBlockData->cols + nColsNotAllNull; SBlockCol *pBlockCol = pBlockData->cols + nColsNotAllNull;
if (isNEleNull(pDataCol, rowsToWrite)) { // all data to commit are NULL, just ignore it // if (isNEleNull(pDataCol, rowsToWrite)) { // all data to commit are NULL, just ignore it
if (isAllRowsNull(pDataCol)) { // all data to commit are NULL, just ignore it
continue; continue;
} }
......
...@@ -1391,7 +1391,7 @@ int32_t doCopyRowsFromFileBlock(STsdbQueryHandle* pQueryHandle, int32_t capacity ...@@ -1391,7 +1391,7 @@ int32_t doCopyRowsFromFileBlock(STsdbQueryHandle* pQueryHandle, int32_t capacity
// todo refactor, only copy one-by-one // todo refactor, only copy one-by-one
for (int32_t k = start; k < num + start; ++k) { for (int32_t k = start; k < num + start; ++k) {
char* p = tdGetColDataOfRow(src, k); const char* p = tdGetColDataOfRow(src, k);
memcpy(dst, p, varDataTLen(p)); memcpy(dst, p, varDataTLen(p));
dst += bytes; dst += bytes;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册