提交 52eaa880 编写于 作者: C Cary Xu

code optimization

上级 6a6f67c0
......@@ -340,13 +340,7 @@ int tdAllocMemForCol(SDataCol *pCol, int maxPoints);
void dataColInit(SDataCol *pDataCol, STColumn *pCol, int maxPoints);
// value from timestamp should be TKEY here instead of TSKEY
int dataColAppendValEx(SDataCol *pCol, const void *value, int numOfRows, int maxPoints, bool isAppend);
// value from timestamp should be TKEY here instead of TSKEY
FORCE_INLINE int dataColAppendVal(SDataCol *pCol, const void *value, int numOfRows, int maxPoints) {
return dataColAppendValEx(pCol, value, numOfRows, maxPoints, true);
}
int dataColAppendVal(SDataCol *pCol, const void *value, int numOfRows, int maxPoints, int rowOffset);
void dataColSetOffset(SDataCol *pCol, int nEle);
......@@ -678,7 +672,7 @@ static FORCE_INLINE char *memRowEnd(SMemRow row) {
#define memRowDeleted(r) TKEY_IS_DELETED(memRowTKey(r))
SMemRow tdMemRowDup(SMemRow row);
void tdAppendMemRowToDataCol(SMemRow row, STSchema *pSchema, SDataCols *pCols, bool forceSetNull, bool isAppend);
void tdAppendMemRowToDataCol(SMemRow row, STSchema *pSchema, SDataCols *pCols, bool forceSetNull, int rowOffset);
// NOTE: offset here including the header size
static FORCE_INLINE void *tdGetMemRowDataOfCol(void *row, int16_t colId, int8_t colType, uint16_t offset) {
......
......@@ -239,9 +239,12 @@ void dataColInit(SDataCol *pDataCol, STColumn *pCol, int maxPoints) {
pDataCol->len = 0;
}
// value from timestamp should be TKEY here instead of TSKEY
int dataColAppendValEx(SDataCol *pCol, const void *value, int numOfRows, int maxPoints, bool isAppend) {
ASSERT(pCol != NULL && value != NULL);
/**
* value from timestamp should be TKEY here instead of TSKEY.
* - rowOffset: 0 for current row, -1 for previous row
*/
int dataColAppendVal(SDataCol *pCol, const void *value, int numOfRows, int maxPoints, int rowOffset) {
ASSERT(pCol != NULL && value != NULL && (rowOffset == 0 || rowOffset == -1));
if (isAllRowsNull(pCol)) {
if (isNull(value, pCol->type)) {
......@@ -257,21 +260,28 @@ int dataColAppendValEx(SDataCol *pCol, const void *value, int numOfRows, int max
}
if (IS_VAR_DATA_TYPE(pCol->type)) {
// set offset
pCol->dataOff[numOfRows] = pCol->len;
// Copy data
memcpy(POINTER_SHIFT(pCol->pData, pCol->len), value, varDataTLen(value));
// Update the length
pCol->len += varDataTLen(value);
if (rowOffset == 0) {
// set offset
pCol->dataOff[numOfRows] = pCol->len;
// Copy data
memcpy(POINTER_SHIFT(pCol->pData, pCol->len), value, varDataTLen(value));
// Update the length
pCol->len += varDataTLen(value);
} else {
// Copy data
void *lastValue = POINTER_SHIFT(pCol->pData, pCol->dataOff[numOfRows]);
int lastValLen = varDataTLen(lastValue);
memcpy(lastValue, value, varDataTLen(value));
// Update the length
pCol->len -= lastValLen;
pCol->len += varDataTLen(value);
}
} else {
if (isAppend) { // append the value
ASSERT(pCol->len == TYPE_BYTES[pCol->type] * numOfRows);
memcpy(POINTER_SHIFT(pCol->pData, pCol->len), value, pCol->bytes);
// update the value of last row with increasing the pCol->len and keeping the numOfRows for paritial update
ASSERT(pCol->len == (TYPE_BYTES[pCol->type] * (numOfRows - rowOffset)));
memcpy(POINTER_SHIFT(pCol->pData, (pCol->len + rowOffset * TYPE_BYTES[pCol->type])), value, pCol->bytes);
if (rowOffset == 0) {
pCol->len += pCol->bytes;
} else {
// update the value of last row by increasing the pCol->len and keeping the original numOfRows
ASSERT(pCol->len == (TYPE_BYTES[pCol->type] * (numOfRows + 1)));
memcpy(POINTER_SHIFT(pCol->pData, pCol->len - TYPE_BYTES[pCol->type]), value, pCol->bytes);
}
}
return 0;
......@@ -448,7 +458,7 @@ void tdResetDataCols(SDataCols *pCols) {
}
static void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols *pCols, bool forceSetNull,
bool isAppend) {
int rowOffset) {
ASSERT(pCols->numOfRows == 0 || dataColsKeyLast(pCols) < dataRowKey(row));
int rcol = 0;
......@@ -458,7 +468,7 @@ static void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols
bool setCol = 0;
SDataCol *pDataCol = &(pCols->cols[dcol]);
if (rcol >= schemaNCols(pSchema)) {
dataColAppendValEx(pDataCol, getNullValue(pDataCol->type), pCols->numOfRows, pCols->maxPoints, isAppend);
dataColAppendVal(pDataCol, getNullValue(pDataCol->type), pCols->numOfRows, pCols->maxPoints, rowOffset);
dcol++;
continue;
}
......@@ -467,14 +477,14 @@ static void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols
if (pRowCol->colId == pDataCol->colId) {
void *value = tdGetRowDataOfCol(row, pRowCol->type, pRowCol->offset + TD_DATA_ROW_HEAD_SIZE);
if(!isNull(value, pDataCol->type)) setCol = 1;
dataColAppendValEx(pDataCol, value, pCols->numOfRows, pCols->maxPoints, isAppend);
dataColAppendVal(pDataCol, value, pCols->numOfRows, pCols->maxPoints, rowOffset);
dcol++;
rcol++;
} else if (pRowCol->colId < pDataCol->colId) {
rcol++;
} else {
if(forceSetNull || setCol) {
dataColAppendValEx(pDataCol, getNullValue(pDataCol->type), pCols->numOfRows, pCols->maxPoints, isAppend);
dataColAppendVal(pDataCol, getNullValue(pDataCol->type), pCols->numOfRows, pCols->maxPoints, rowOffset);
}
dcol++;
}
......@@ -482,7 +492,7 @@ static void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols
pCols->numOfRows++;
}
static void tdAppendKvRowToDataCol(SKVRow row, STSchema *pSchema, SDataCols *pCols, bool forceSetNull, bool isAppend) {
static void tdAppendKvRowToDataCol(SKVRow row, STSchema *pSchema, SDataCols *pCols, bool forceSetNull, int rowOffset) {
ASSERT(pCols->numOfRows == 0 || dataColsKeyLast(pCols) < kvRowKey(row));
int rcol = 0;
......@@ -494,7 +504,7 @@ static void tdAppendKvRowToDataCol(SKVRow row, STSchema *pSchema, SDataCols *pCo
bool setCol = 0;
SDataCol *pDataCol = &(pCols->cols[dcol]);
if (rcol >= nRowCols || rcol >= schemaNCols(pSchema)) {
dataColAppendValEx(pDataCol, getNullValue(pDataCol->type), pCols->numOfRows, pCols->maxPoints, isAppend);
dataColAppendVal(pDataCol, getNullValue(pDataCol->type), pCols->numOfRows, pCols->maxPoints, rowOffset);
++dcol;
continue;
}
......@@ -504,14 +514,14 @@ static void tdAppendKvRowToDataCol(SKVRow row, STSchema *pSchema, SDataCols *pCo
if (colIdx->colId == pDataCol->colId) {
void *value = tdGetKvRowDataOfCol(row, colIdx->offset);
if(!isNull(value, pDataCol->type)) setCol = 1;
dataColAppendValEx(pDataCol, value, pCols->numOfRows, pCols->maxPoints, isAppend);
dataColAppendVal(pDataCol, value, pCols->numOfRows, pCols->maxPoints, rowOffset);
++dcol;
++rcol;
} else if (colIdx->colId < pDataCol->colId) {
++rcol;
} else {
if (forceSetNull || setCol) {
dataColAppendValEx(pDataCol, getNullValue(pDataCol->type), pCols->numOfRows, pCols->maxPoints, isAppend);
dataColAppendVal(pDataCol, getNullValue(pDataCol->type), pCols->numOfRows, pCols->maxPoints, rowOffset);
}
++dcol;
}
......@@ -519,11 +529,11 @@ static void tdAppendKvRowToDataCol(SKVRow row, STSchema *pSchema, SDataCols *pCo
pCols->numOfRows++;
}
void tdAppendMemRowToDataCol(SMemRow row, STSchema *pSchema, SDataCols *pCols, bool forceSetNull, bool isAppend) {
void tdAppendMemRowToDataCol(SMemRow row, STSchema *pSchema, SDataCols *pCols, bool forceSetNull, int rowOffset) {
if (isDataRow(row)) {
tdAppendDataRowToDataCol(memRowDataBody(row), pSchema, pCols, forceSetNull, isAppend);
tdAppendDataRowToDataCol(memRowDataBody(row), pSchema, pCols, forceSetNull, rowOffset);
} else if (isKvRow(row)) {
tdAppendKvRowToDataCol(memRowKvBody(row), pSchema, pCols, forceSetNull, isAppend);
tdAppendKvRowToDataCol(memRowKvBody(row), pSchema, pCols, forceSetNull, rowOffset);
} else {
ASSERT(0);
}
......@@ -546,7 +556,7 @@ int tdMergeDataCols(SDataCols *target, SDataCols *source, int rowsToMerge, int *
for (int j = 0; j < source->numOfCols; j++) {
if (source->cols[j].len > 0 || target->cols[j].len > 0) {
dataColAppendVal(target->cols + j, tdGetColDataOfRow(source->cols + j, i + (*pOffset)), target->numOfRows,
target->maxPoints);
target->maxPoints, 0);
}
}
target->numOfRows++;
......@@ -590,7 +600,7 @@ static void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, i
ASSERT(target->cols[i].type == src1->cols[i].type);
if (src1->cols[i].len > 0 || target->cols[i].len > 0) {
dataColAppendVal(&(target->cols[i]), tdGetColDataOfRow(src1->cols + i, *iter1), target->numOfRows,
target->maxPoints);
target->maxPoints, 0);
}
}
......@@ -602,10 +612,10 @@ static void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, i
ASSERT(target->cols[i].type == src2->cols[i].type);
if (src2->cols[i].len > 0 && !isNull(src2->cols[i].pData, src2->cols[i].type)) {
dataColAppendVal(&(target->cols[i]), tdGetColDataOfRow(src2->cols + i, *iter2), target->numOfRows,
target->maxPoints);
target->maxPoints, 0);
} else if(!forceSetNull && key1 == key2 && src1->cols[i].len > 0) {
dataColAppendVal(&(target->cols[i]), tdGetColDataOfRow(src1->cols + i, *iter1), target->numOfRows,
target->maxPoints);
target->maxPoints, 0);
} else if(target->cols[i].len > 0) {
dataColSetNullAt(&target->cols[i], target->numOfRows);
}
......
......@@ -1276,7 +1276,7 @@ static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIt
for (int i = 0; i < pDataCols->numOfCols; i++) {
//TODO: dataColAppendVal may fail
dataColAppendVal(pTarget->cols + i, tdGetColDataOfRow(pDataCols->cols + i, *iter), pTarget->numOfRows,
pTarget->maxPoints);
pTarget->maxPoints, 0);
}
pTarget->numOfRows++;
......@@ -1288,7 +1288,7 @@ static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIt
ASSERT(pSchema != NULL);
}
tdAppendMemRowToDataCol(row, pSchema, pTarget, true, true);
tdAppendMemRowToDataCol(row, pSchema, pTarget, true, 0);
tSkipListIterNext(pCommitIter->pIter);
} else {
......@@ -1297,7 +1297,7 @@ static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIt
for (int i = 0; i < pDataCols->numOfCols; i++) {
//TODO: dataColAppendVal may fail
dataColAppendVal(pTarget->cols + i, tdGetColDataOfRow(pDataCols->cols + i, *iter), pTarget->numOfRows,
pTarget->maxPoints);
pTarget->maxPoints, 0);
}
if(update == TD_ROW_DISCARD_UPDATE) pTarget->numOfRows++;
......@@ -1311,7 +1311,7 @@ static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIt
}
tdAppendMemRowToDataCol(row, pSchema, pTarget, update == TD_ROW_OVERWRITE_UPDATE,
update != TD_ROW_PARTIAL_UPDATE);
update != TD_ROW_PARTIAL_UPDATE ? 0 : -1);
}
(*iter)++;
tSkipListIterNext(pCommitIter->pIter);
......
......@@ -589,7 +589,7 @@ static int tsdbAppendTableRowToCols(STable *pTable, SDataCols *pCols, STSchema *
}
}
tdAppendMemRowToDataCol(row, *ppSchema, pCols, true, true);
tdAppendMemRowToDataCol(row, *ppSchema, pCols, true, 0);
}
return 0;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册