未验证 提交 61fb7742 编写于 作者: L Liu Jicong 提交者: GitHub

[TD-5062] Feature/row partial update (#6970)

* [TD-5062]<feature>: support partial update
上级 3aa6acfc
......@@ -1864,8 +1864,8 @@ static SMemRow tdGenMemRowFromBuilder(SMemRowBuilder* pBuilder) {
while (i < nColsBound) {
int16_t colId = payloadColId(p);
uint8_t colType = payloadColType(p);
tdAppendKvColVal(kvRow, POINTER_SHIFT(pVals,payloadColOffset(p)), colId, colType, toffset);
toffset += sizeof(SColIdx);
tdAppendKvColVal(kvRow, POINTER_SHIFT(pVals,payloadColOffset(p)), colId, colType, &toffset);
//toffset += sizeof(SColIdx);
p = payloadNextCol(p);
++i;
}
......
......@@ -232,6 +232,83 @@ static FORCE_INLINE void *tdGetRowDataOfCol(SDataRow row, int8_t type, int32_t o
}
}
static FORCE_INLINE void *tdGetPtrToCol(SDataRow row, STSchema *pSchema, int idx) {
return POINTER_SHIFT(row, TD_DATA_ROW_HEAD_SIZE + pSchema->columns[idx].offset);
}
static FORCE_INLINE void *tdGetColOfRowBySchema(SDataRow row, STSchema *pSchema, int idx) {
int16_t offset = TD_DATA_ROW_HEAD_SIZE + pSchema->columns[idx].offset;
int8_t type = pSchema->columns[idx].type;
return tdGetRowDataOfCol(row, type, offset);
}
static FORCE_INLINE bool tdIsColOfRowNullBySchema(SDataRow row, STSchema *pSchema, int idx) {
int16_t offset = TD_DATA_ROW_HEAD_SIZE + pSchema->columns[idx].offset;
int8_t type = pSchema->columns[idx].type;
return isNull(tdGetRowDataOfCol(row, type, offset), type);
}
static FORCE_INLINE void tdSetColOfRowNullBySchema(SDataRow row, STSchema *pSchema, int idx) {
int16_t offset = TD_DATA_ROW_HEAD_SIZE + pSchema->columns[idx].offset;
int8_t type = pSchema->columns[idx].type;
int16_t bytes = pSchema->columns[idx].bytes;
setNull(tdGetRowDataOfCol(row, type, offset), type, bytes);
}
static FORCE_INLINE void tdCopyColOfRowBySchema(SDataRow dst, STSchema *pDstSchema, int dstIdx, SDataRow src, STSchema *pSrcSchema, int srcIdx) {
int8_t type = pDstSchema->columns[dstIdx].type;
ASSERT(type == pSrcSchema->columns[srcIdx].type);
void *pData = tdGetPtrToCol(dst, pDstSchema, dstIdx);
void *value = tdGetPtrToCol(src, pSrcSchema, srcIdx);
switch (type) {
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR:
*(VarDataOffsetT *)pData = *(VarDataOffsetT *)value;
pData = POINTER_SHIFT(dst, *(VarDataOffsetT *)pData);
value = POINTER_SHIFT(src, *(VarDataOffsetT *)value);
memcpy(pData, value, varDataTLen(value));
break;
case TSDB_DATA_TYPE_NULL:
case TSDB_DATA_TYPE_BOOL:
case TSDB_DATA_TYPE_TINYINT:
case TSDB_DATA_TYPE_UTINYINT:
*(uint8_t *)pData = *(uint8_t *)value;
break;
case TSDB_DATA_TYPE_SMALLINT:
case TSDB_DATA_TYPE_USMALLINT:
*(uint16_t *)pData = *(uint16_t *)value;
break;
case TSDB_DATA_TYPE_INT:
case TSDB_DATA_TYPE_UINT:
*(uint32_t *)pData = *(uint32_t *)value;
break;
case TSDB_DATA_TYPE_BIGINT:
case TSDB_DATA_TYPE_UBIGINT:
*(uint64_t *)pData = *(uint64_t *)value;
break;
case TSDB_DATA_TYPE_FLOAT:
SET_FLOAT_PTR(pData, value);
break;
case TSDB_DATA_TYPE_DOUBLE:
SET_DOUBLE_PTR(pData, value);
break;
case TSDB_DATA_TYPE_TIMESTAMP:
if (pSrcSchema->columns[srcIdx].colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) {
*(TSKEY *)pData = tdGetKey(*(TKEY *)value);
} else {
*(TSKEY *)pData = *(TSKEY *)value;
}
break;
default:
memcpy(pData, value, pSrcSchema->columns[srcIdx].bytes);
}
}
// ----------------- Data column structure
typedef struct SDataCol {
int8_t type; // column type
......@@ -335,7 +412,7 @@ void tdResetDataCols(SDataCols *pCols);
int tdInitDataCols(SDataCols *pCols, STSchema *pSchema);
SDataCols *tdDupDataCols(SDataCols *pCols, bool keepData);
SDataCols *tdFreeDataCols(SDataCols *pCols);
int tdMergeDataCols(SDataCols *target, SDataCols *source, int rowsToMerge, int *pOffset);
int tdMergeDataCols(SDataCols *target, SDataCols *source, int rowsToMerge, int *pOffset, bool forceSetNull);
// ----------------- K-V data row structure
/* |<-------------------------------------- len -------------------------------------------->|
......@@ -398,9 +475,9 @@ static FORCE_INLINE void *tdGetKVRowIdxOfCol(SKVRow row, int16_t colId) {
}
// offset here not include kvRow header length
static FORCE_INLINE int tdAppendKvColVal(SKVRow row, const void *value, int16_t colId, int8_t type, int32_t offset) {
static FORCE_INLINE int tdAppendKvColVal(SKVRow row, const void *value, int16_t colId, int8_t type, int32_t *offset) {
ASSERT(value != NULL);
int32_t toffset = offset + TD_KV_ROW_HEAD_SIZE;
int32_t toffset = *offset + TD_KV_ROW_HEAD_SIZE;
SColIdx *pColIdx = (SColIdx *)POINTER_SHIFT(row, toffset);
char * ptr = (char *)POINTER_SHIFT(row, kvRowLen(row));
......@@ -411,7 +488,7 @@ static FORCE_INLINE int tdAppendKvColVal(SKVRow row, const void *value, int16_t
memcpy(ptr, value, varDataTLen(value));
kvRowLen(row) += varDataTLen(value);
} else {
if (offset == 0) {
if (*offset == 0) {
ASSERT(type == TSDB_DATA_TYPE_TIMESTAMP);
TKEY tvalue = tdGetTKEY(*(TSKEY *)value);
memcpy(ptr, (void *)(&tvalue), TYPE_BYTES[type]);
......@@ -420,9 +497,27 @@ static FORCE_INLINE int tdAppendKvColVal(SKVRow row, const void *value, int16_t
}
kvRowLen(row) += TYPE_BYTES[type];
}
*offset += sizeof(SColIdx);
return 0;
}
// NOTE: offset here including the header size
static FORCE_INLINE void *tdGetKvRowDataOfCol(void *row, int32_t offset) { return POINTER_SHIFT(row, offset); }
static FORCE_INLINE void *tdGetKVRowValOfColEx(SKVRow row, int16_t colId, int32_t *nIdx) {
while (*nIdx < kvRowNCols(row)) {
SColIdx *pColIdx = kvRowColIdxAt(row, *nIdx);
if (pColIdx->colId == colId) {
++(*nIdx);
return tdGetKvRowDataOfCol(row, pColIdx->offset);
} else if (pColIdx->colId > colId) {
return NULL;
} else {
++(*nIdx);
}
}
return NULL;
}
// ----------------- K-V data row builder
typedef struct {
......@@ -495,7 +590,7 @@ typedef void *SMemRow;
#define TD_MEM_ROW_KV_VER_SIZE sizeof(int16_t)
#define TD_MEM_ROW_KV_TYPE_VER_SIZE (TD_MEM_ROW_TYPE_SIZE + TD_MEM_ROW_KV_VER_SIZE)
#define TD_MEM_ROW_DATA_HEAD_SIZE (TD_MEM_ROW_TYPE_SIZE + TD_DATA_ROW_HEAD_SIZE)
// #define TD_MEM_ROW_KV_HEAD_SIZE (TD_MEM_ROW_TYPE_SIZE + TD_MEM_ROW_KV_VER_SIZE + TD_KV_ROW_HEAD_SIZE)
#define TD_MEM_ROW_KV_HEAD_SIZE (TD_MEM_ROW_TYPE_SIZE + TD_MEM_ROW_KV_VER_SIZE + TD_KV_ROW_HEAD_SIZE)
#define SMEM_ROW_DATA 0U // SDataRow
#define SMEM_ROW_KV 1U // SKVRow
......@@ -538,27 +633,80 @@ typedef void *SMemRow;
#define memRowSetType(r, t) (memRowType(r) = (t))
#define memRowSetLen(r, l) (isDataRow(r) ? memRowDataLen(r) = (l) : memRowKvLen(r) = (l))
#define memRowSetVersion(r, v) (isDataRow(r) ? dataRowSetVersion(memRowDataBody(r), v) : memRowKvSetVersion(r, v))
#define memRowSetVersion(r, v) (isDataRow(r) ? dataRowSetVersion(memRowDataBody(r), v) : memRowSetKvVersion(r, v))
#define memRowCpy(dst, r) memcpy((dst), (r), memRowTLen(r))
#define memRowMaxBytesFromSchema(s) (schemaTLen(s) + TD_MEM_ROW_DATA_HEAD_SIZE)
#define memRowDeleted(r) TKEY_IS_DELETED(memRowTKey(r))
SMemRow tdMemRowDup(SMemRow row);
void tdAppendMemRowToDataCol(SMemRow row, STSchema *pSchema, SDataCols *pCols);
// NOTE: offset here including the header size
static FORCE_INLINE void *tdGetKvRowDataOfCol(void *row, int32_t offset) { return POINTER_SHIFT(row, offset); }
void tdAppendMemRowToDataCol(SMemRow row, STSchema *pSchema, SDataCols *pCols, bool forceSetNull);
// NOTE: offset here including the header size
static FORCE_INLINE void *tdGetMemRowDataOfCol(void *row, int8_t type, int32_t offset) {
static FORCE_INLINE void *tdGetMemRowDataOfCol(void *row, int16_t colId, int8_t colType, uint16_t offset) {
if (isDataRow(row)) {
return tdGetRowDataOfCol(row, type, offset);
} else if (isKvRow(row)) {
return tdGetKvRowDataOfCol(row, offset);
return tdGetRowDataOfCol(memRowDataBody(row), colType, offset);
} else {
ASSERT(0);
return tdGetKVRowValOfCol(memRowKvBody(row), colId);
}
return NULL;
}
/**
* NOTE:
* 1. Applicable to scan columns one by one
* 2. offset here including the header size
*/
static FORCE_INLINE void *tdGetMemRowDataOfColEx(void *row, int16_t colId, int8_t colType, int32_t offset,
int32_t *kvNIdx) {
if (isDataRow(row)) {
return tdGetRowDataOfCol(memRowDataBody(row), colType, offset);
} else {
return tdGetKVRowValOfColEx(memRowKvBody(row), colId, kvNIdx);
}
}
static FORCE_INLINE int tdAppendMemColVal(SMemRow row, const void *value, int16_t colId, int8_t type, int32_t offset,
int32_t *kvOffset) {
if (isDataRow(row)) {
tdAppendColVal(memRowDataBody(row), value, type, offset);
} else {
tdAppendKvColVal(memRowKvBody(row), value, colId, type, kvOffset);
}
return 0;
}
// make sure schema->flen appended for SDataRow
static FORCE_INLINE int32_t tdGetColAppendLen(uint8_t rowType, const void *value, int8_t colType) {
int32_t len = 0;
if (IS_VAR_DATA_TYPE(colType)) {
len += varDataTLen(value);
if (rowType == SMEM_ROW_KV) {
len += sizeof(SColIdx);
}
} else {
if (rowType == SMEM_ROW_KV) {
len += TYPE_BYTES[colType];
len += sizeof(SColIdx);
}
}
return len;
}
typedef struct {
int16_t colId;
uint8_t colType;
char* colVal;
} SColInfo;
static FORCE_INLINE void setSColInfo(SColInfo* colInfo, int16_t colId, uint8_t colType, char* colVal) {
colInfo->colId = colId;
colInfo->colType = colType;
colInfo->colVal = colVal;
}
SMemRow mergeTwoMemRows(void *buffer, SMemRow row1, SMemRow row2, STSchema *pSchema1, STSchema *pSchema2);
// ----------------- Raw payload structure for row:
/* |<------------ Head ------------->|<----------- body of column data tuple ------------------->|
* | |<----------------- flen ------------->|<--- value part --->|
......@@ -608,4 +756,4 @@ static FORCE_INLINE char *payloadNextCol(char *pCol) { return (char *)POINTER_SH
}
#endif
#endif // _TD_DATA_FORMAT_H_
\ No newline at end of file
#endif // _TD_DATA_FORMAT_H_
......@@ -17,9 +17,10 @@
#include "talgo.h"
#include "tcoding.h"
#include "wchar.h"
#include "tarray.h"
static void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, int limit1, SDataCols *src2, int *iter2,
int limit2, int tRows);
int limit2, int tRows, bool forceSetNull);
/**
* Duplicate the schema and return a new object
......@@ -418,7 +419,8 @@ void tdResetDataCols(SDataCols *pCols) {
}
}
}
static void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols *pCols) {
static void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols *pCols, bool forceSetNull) {
ASSERT(pCols->numOfRows == 0 || dataColsKeyLast(pCols) < dataRowKey(row));
int rcol = 0;
......@@ -452,8 +454,10 @@ static void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols
} else if (pRowCol->colId < pDataCol->colId) {
rcol++;
} else {
// dataColSetNullAt(pDataCol, pCols->numOfRows);
dataColAppendVal(pDataCol, getNullValue(pDataCol->type), pCols->numOfRows, pCols->maxPoints);
if(forceSetNull) {
//dataColSetNullAt(pDataCol, pCols->numOfRows);
dataColAppendVal(pDataCol, getNullValue(pDataCol->type), pCols->numOfRows, pCols->maxPoints);
}
dcol++;
}
}
......@@ -461,7 +465,7 @@ static void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols
pCols->numOfRows++;
}
static void tdAppendKvRowToDataCol(SKVRow row, STSchema *pSchema, SDataCols *pCols) {
static void tdAppendKvRowToDataCol(SKVRow row, STSchema *pSchema, SDataCols *pCols, bool forceSetNull) {
ASSERT(pCols->numOfRows == 0 || dataColsKeyLast(pCols) < kvRowKey(row));
int rcol = 0;
......@@ -498,8 +502,10 @@ static void tdAppendKvRowToDataCol(SKVRow row, STSchema *pSchema, SDataCols *pCo
} else if (colIdx->colId < pDataCol->colId) {
++rcol;
} else {
// dataColSetNullAt(pDataCol, pCols->numOfRows);
dataColAppendVal(pDataCol, getNullValue(pDataCol->type), pCols->numOfRows, pCols->maxPoints);
if (forceSetNull) {
// dataColSetNullAt(pDataCol, pCols->numOfRows);
dataColAppendVal(pDataCol, getNullValue(pDataCol->type), pCols->numOfRows, pCols->maxPoints);
}
++dcol;
}
}
......@@ -507,17 +513,17 @@ static void tdAppendKvRowToDataCol(SKVRow row, STSchema *pSchema, SDataCols *pCo
pCols->numOfRows++;
}
void tdAppendMemRowToDataCol(SMemRow row, STSchema *pSchema, SDataCols *pCols) {
void tdAppendMemRowToDataCol(SMemRow row, STSchema *pSchema, SDataCols *pCols, bool forceSetNull) {
if (isDataRow(row)) {
tdAppendDataRowToDataCol(memRowDataBody(row), pSchema, pCols);
tdAppendDataRowToDataCol(memRowDataBody(row), pSchema, pCols, forceSetNull);
} else if (isKvRow(row)) {
tdAppendKvRowToDataCol(memRowKvBody(row), pSchema, pCols);
tdAppendKvRowToDataCol(memRowKvBody(row), pSchema, pCols, forceSetNull);
} else {
ASSERT(0);
}
}
int tdMergeDataCols(SDataCols *target, SDataCols *source, int rowsToMerge, int *pOffset) {
int tdMergeDataCols(SDataCols *target, SDataCols *source, int rowsToMerge, int *pOffset, bool forceSetNull) {
ASSERT(rowsToMerge > 0 && rowsToMerge <= source->numOfRows);
ASSERT(target->numOfCols == source->numOfCols);
int offset = 0;
......@@ -546,7 +552,7 @@ int tdMergeDataCols(SDataCols *target, SDataCols *source, int rowsToMerge, int *
int iter1 = 0;
tdMergeTwoDataCols(target, pTarget, &iter1, pTarget->numOfRows, source, pOffset, source->numOfRows,
pTarget->numOfRows + rowsToMerge);
pTarget->numOfRows + rowsToMerge, forceSetNull);
}
tdFreeDataCols(pTarget);
......@@ -559,7 +565,7 @@ _err:
// src2 data has more priority than src1
static void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, int limit1, SDataCols *src2, int *iter2,
int limit2, int tRows) {
int limit2, int tRows, bool forceSetNull) {
tdResetDataCols(target);
ASSERT(limit1 <= src1->numOfRows && limit2 <= src2->numOfRows);
......@@ -588,7 +594,7 @@ static void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, i
if ((key1 > key2) || (key1 == key2 && !TKEY_IS_DELETED(tkey2))) {
for (int i = 0; i < src2->numOfCols; i++) {
ASSERT(target->cols[i].type == src2->cols[i].type);
if (src2->cols[i].len > 0) {
if (src2->cols[i].len > 0 && (forceSetNull || (!forceSetNull && !isNull(src2->cols[i].pData, src2->cols[i].type)))) {
dataColAppendVal(&(target->cols[i]), tdGetColDataOfRow(src2->cols + i, *iter2), target->numOfRows,
target->maxPoints);
}
......@@ -761,4 +767,97 @@ SKVRow tdGetKVRowFromBuilder(SKVRowBuilder *pBuilder) {
memcpy(kvRowValues(row), pBuilder->buf, pBuilder->size);
return row;
}
\ No newline at end of file
}
SMemRow mergeTwoMemRows(void *buffer, SMemRow row1, SMemRow row2, STSchema *pSchema1, STSchema *pSchema2) {
#if 0
ASSERT(memRowKey(row1) == memRowKey(row2));
ASSERT(schemaVersion(pSchema1) == memRowVersion(row1));
ASSERT(schemaVersion(pSchema2) == memRowVersion(row2));
ASSERT(schemaVersion(pSchema1) >= schemaVersion(pSchema2));
#endif
SArray *stashRow = taosArrayInit(pSchema1->numOfCols, sizeof(SColInfo));
if (stashRow == NULL) {
return NULL;
}
SMemRow pRow = buffer;
SDataRow dataRow = memRowDataBody(pRow);
memRowSetType(pRow, SMEM_ROW_DATA);
dataRowSetVersion(dataRow, schemaVersion(pSchema1)); // use latest schema version
dataRowSetLen(dataRow, (TDRowLenT)(TD_DATA_ROW_HEAD_SIZE + pSchema1->flen));
TDRowTLenT dataLen = 0, kvLen = TD_MEM_ROW_KV_HEAD_SIZE;
int32_t i = 0; // row1
int32_t j = 0; // row2
int32_t nCols1 = schemaNCols(pSchema1);
int32_t nCols2 = schemaNCols(pSchema2);
SColInfo colInfo = {0};
int32_t kvIdx1 = 0, kvIdx2 = 0;
while (i < nCols1) {
STColumn *pCol = schemaColAt(pSchema1, i);
void * val1 = tdGetMemRowDataOfColEx(row1, pCol->colId, pCol->type, TD_DATA_ROW_HEAD_SIZE + pCol->offset, &kvIdx1);
// if val1 != NULL, use val1;
if (val1 != NULL && !isNull(val1, pCol->type)) {
tdAppendColVal(dataRow, val1, pCol->type, pCol->offset);
kvLen += tdGetColAppendLen(SMEM_ROW_KV, val1, pCol->type);
setSColInfo(&colInfo, pCol->colId, pCol->type, val1);
taosArrayPush(stashRow, &colInfo);
++i; // next col
continue;
}
void *val2 = NULL;
while (j < nCols2) {
STColumn *tCol = schemaColAt(pSchema2, j);
if (tCol->colId < pCol->colId) {
++j;
continue;
}
if (tCol->colId == pCol->colId) {
val2 = tdGetMemRowDataOfColEx(row2, tCol->colId, tCol->type, TD_DATA_ROW_HEAD_SIZE + tCol->offset, &kvIdx2);
} else if (tCol->colId > pCol->colId) {
// set NULL
}
break;
} // end of while(j<nCols2)
if (val2 == NULL) {
val2 = (void *)getNullValue(pCol->type);
}
tdAppendColVal(dataRow, val2, pCol->type, pCol->offset);
if (!isNull(val2, pCol->type)) {
kvLen += tdGetColAppendLen(SMEM_ROW_KV, val2, pCol->type);
setSColInfo(&colInfo, pCol->colId, pCol->type, val2);
taosArrayPush(stashRow, &colInfo);
}
++i; // next col
}
dataLen = memRowTLen(pRow);
if (kvLen < dataLen) {
// scan stashRow and generate SKVRow
memset(buffer, 0, sizeof(dataLen));
SMemRow tRow = buffer;
memRowSetType(tRow, SMEM_ROW_KV);
SKVRow kvRow = (SKVRow)memRowKvBody(tRow);
int16_t nKvNCols = (int16_t) taosArrayGetSize(stashRow);
kvRowSetLen(kvRow, (TDRowLenT)(TD_KV_ROW_HEAD_SIZE + sizeof(SColIdx) * nKvNCols));
kvRowSetNCols(kvRow, nKvNCols);
memRowSetKvVersion(tRow, pSchema1->version);
int32_t toffset = 0;
int16_t k;
for (k = 0; k < nKvNCols; ++k) {
SColInfo *pColInfo = taosArrayGet(stashRow, k);
tdAppendKvColVal(kvRow, pColInfo->colVal, pColInfo->colId, pColInfo->colType, &toffset);
}
ASSERT(kvLen == memRowTLen(tRow));
}
taosArrayDestroy(stashRow);
return buffer;
}
......@@ -409,7 +409,7 @@ bool isValidDataType(int32_t type) {
return type >= TSDB_DATA_TYPE_NULL && type <= TSDB_DATA_TYPE_UBIGINT;
}
void setVardataNull(char* val, int32_t type) {
void setVardataNull(void* val, int32_t type) {
if (type == TSDB_DATA_TYPE_BINARY) {
varDataSetLen(val, sizeof(int8_t));
*(uint8_t*) varDataVal(val) = TSDB_DATA_BINARY_NULL;
......@@ -421,75 +421,75 @@ void setVardataNull(char* val, int32_t type) {
}
}
void setNull(char *val, int32_t type, int32_t bytes) { setNullN(val, type, bytes, 1); }
void setNull(void *val, int32_t type, int32_t bytes) { setNullN(val, type, bytes, 1); }
void setNullN(char *val, int32_t type, int32_t bytes, int32_t numOfElems) {
void setNullN(void *val, int32_t type, int32_t bytes, int32_t numOfElems) {
switch (type) {
case TSDB_DATA_TYPE_BOOL:
for (int32_t i = 0; i < numOfElems; ++i) {
*(uint8_t *)(val + i * tDataTypes[type].bytes) = TSDB_DATA_BOOL_NULL;
*(uint8_t *)(POINTER_SHIFT(val, i * tDataTypes[type].bytes)) = TSDB_DATA_BOOL_NULL;
}
break;
case TSDB_DATA_TYPE_TINYINT:
for (int32_t i = 0; i < numOfElems; ++i) {
*(uint8_t *)(val + i * tDataTypes[type].bytes) = TSDB_DATA_TINYINT_NULL;
*(uint8_t *)(POINTER_SHIFT(val, i * tDataTypes[type].bytes)) = TSDB_DATA_TINYINT_NULL;
}
break;
case TSDB_DATA_TYPE_SMALLINT:
for (int32_t i = 0; i < numOfElems; ++i) {
*(uint16_t *)(val + i * tDataTypes[type].bytes) = TSDB_DATA_SMALLINT_NULL;
*(uint16_t *)(POINTER_SHIFT(val, i * tDataTypes[type].bytes)) = TSDB_DATA_SMALLINT_NULL;
}
break;
case TSDB_DATA_TYPE_INT:
for (int32_t i = 0; i < numOfElems; ++i) {
*(uint32_t *)(val + i * tDataTypes[type].bytes) = TSDB_DATA_INT_NULL;
*(uint32_t *)(POINTER_SHIFT(val, i * tDataTypes[type].bytes)) = TSDB_DATA_INT_NULL;
}
break;
case TSDB_DATA_TYPE_BIGINT:
case TSDB_DATA_TYPE_TIMESTAMP:
for (int32_t i = 0; i < numOfElems; ++i) {
*(uint64_t *)(val + i * tDataTypes[type].bytes) = TSDB_DATA_BIGINT_NULL;
*(uint64_t *)(POINTER_SHIFT(val, i * tDataTypes[type].bytes)) = TSDB_DATA_BIGINT_NULL;
}
break;
case TSDB_DATA_TYPE_UTINYINT:
for (int32_t i = 0; i < numOfElems; ++i) {
*(uint8_t *)(val + i * tDataTypes[type].bytes) = TSDB_DATA_UTINYINT_NULL;
*(uint8_t *)(POINTER_SHIFT(val, i * tDataTypes[type].bytes)) = TSDB_DATA_UTINYINT_NULL;
}
break;
case TSDB_DATA_TYPE_USMALLINT:
for (int32_t i = 0; i < numOfElems; ++i) {
*(uint16_t *)(val + i * tDataTypes[type].bytes) = TSDB_DATA_USMALLINT_NULL;
*(uint16_t *)(POINTER_SHIFT(val, i * tDataTypes[type].bytes)) = TSDB_DATA_USMALLINT_NULL;
}
break;
case TSDB_DATA_TYPE_UINT:
for (int32_t i = 0; i < numOfElems; ++i) {
*(uint32_t *)(val + i * tDataTypes[type].bytes) = TSDB_DATA_UINT_NULL;
*(uint32_t *)(POINTER_SHIFT(val, i * tDataTypes[type].bytes)) = TSDB_DATA_UINT_NULL;
}
break;
case TSDB_DATA_TYPE_UBIGINT:
for (int32_t i = 0; i < numOfElems; ++i) {
*(uint64_t *)(val + i * tDataTypes[type].bytes) = TSDB_DATA_UBIGINT_NULL;
*(uint64_t *)(POINTER_SHIFT(val, i * tDataTypes[type].bytes)) = TSDB_DATA_UBIGINT_NULL;
}
break;
case TSDB_DATA_TYPE_FLOAT:
for (int32_t i = 0; i < numOfElems; ++i) {
*(uint32_t *)(val + i * tDataTypes[type].bytes) = TSDB_DATA_FLOAT_NULL;
*(uint32_t *)(POINTER_SHIFT(val, i * tDataTypes[type].bytes)) = TSDB_DATA_FLOAT_NULL;
}
break;
case TSDB_DATA_TYPE_DOUBLE:
for (int32_t i = 0; i < numOfElems; ++i) {
*(uint64_t *)(val + i * tDataTypes[type].bytes) = TSDB_DATA_DOUBLE_NULL;
*(uint64_t *)(POINTER_SHIFT(val, i * tDataTypes[type].bytes)) = TSDB_DATA_DOUBLE_NULL;
}
break;
case TSDB_DATA_TYPE_NCHAR:
case TSDB_DATA_TYPE_BINARY:
for (int32_t i = 0; i < numOfElems; ++i) {
setVardataNull(val + i * bytes, type);
setVardataNull(POINTER_SHIFT(val, i * bytes), type);
}
break;
default: {
for (int32_t i = 0; i < numOfElems; ++i) {
*(uint32_t *)(val + i * tDataTypes[TSDB_DATA_TYPE_INT].bytes) = TSDB_DATA_INT_NULL;
*(uint32_t *)(POINTER_SHIFT(val, i * tDataTypes[TSDB_DATA_TYPE_INT].bytes)) = TSDB_DATA_INT_NULL;
}
break;
}
......
......@@ -307,7 +307,7 @@ do { \
#define TSDB_DEFAULT_WAL_LEVEL 1
#define TSDB_MIN_DB_UPDATE 0
#define TSDB_MAX_DB_UPDATE 1
#define TSDB_MAX_DB_UPDATE 2
#define TSDB_DEFAULT_DB_UPDATE_OPTION 0
#define TSDB_MIN_DB_CACHE_LAST_ROW 0
......@@ -435,6 +435,12 @@ typedef enum {
TSDB_CHECK_ITEM_MAX
} ECheckItemType;
typedef enum {
TD_ROW_DISCARD_UPDATE = 0,
TD_ROW_OVERWRITE_UPDATE = 1,
TD_ROW_PARTIAL_UPDATE = 2
} TDUpdateConfig;
extern char *qtypeStr[];
#ifdef __cplusplus
......
......@@ -111,7 +111,7 @@ typedef struct {
uint64_t superUid;
STSchema * schema;
STSchema * tagSchema;
SDataRow tagValues;
SKVRow tagValues;
char * sql;
} STableCfg;
......
......@@ -141,7 +141,7 @@ typedef struct {
#define IS_VALID_FLOAT(_t) ((_t) >= -FLT_MAX && (_t) <= FLT_MAX)
#define IS_VALID_DOUBLE(_t) ((_t) >= -DBL_MAX && (_t) <= DBL_MAX)
static FORCE_INLINE bool isNull(const char *val, int32_t type) {
static FORCE_INLINE bool isNull(const void *val, int32_t type) {
switch (type) {
case TSDB_DATA_TYPE_BOOL:
return *(uint8_t *)val == TSDB_DATA_BOOL_NULL;
......@@ -193,9 +193,9 @@ extern tDataTypeDescriptor tDataTypes[15];
bool isValidDataType(int32_t type);
void setVardataNull(char* val, int32_t type);
void setNull(char *val, int32_t type, int32_t bytes);
void setNullN(char *val, int32_t type, int32_t bytes, int32_t numOfElems);
void setVardataNull(void* val, int32_t type);
void setNull(void *val, int32_t type, int32_t bytes);
void setNullN(void *val, int32_t type, int32_t bytes, int32_t numOfElems);
const void *getNullValue(int32_t type);
void assignVal(char *val, const char *src, int32_t len, int32_t type);
......
......@@ -40,7 +40,7 @@ void tsdbFreeBufPool(STsdbBufPool* pBufPool);
int tsdbOpenBufPool(STsdbRepo* pRepo);
void tsdbCloseBufPool(STsdbRepo* pRepo);
SListNode* tsdbAllocBufBlockFromPool(STsdbRepo* pRepo);
int tsdbExpendPool(STsdbRepo* pRepo, int32_t oldTotalBlocks);
int tsdbExpandPool(STsdbRepo* pRepo, int32_t oldTotalBlocks);
void tsdbRecycleBufferBlock(STsdbBufPool* pPool, SListNode *pNode);
#endif /* _TD_TSDB_BUFFER_H_ */
\ No newline at end of file
#endif /* _TD_TSDB_BUFFER_H_ */
......@@ -16,6 +16,13 @@
#ifndef _TD_TSDB_READ_IMPL_H_
#define _TD_TSDB_READ_IMPL_H_
#include "tfs.h"
#include "tsdb.h"
#include "os.h"
#include "tsdbFile.h"
#include "tskiplist.h"
#include "tsdbMeta.h"
typedef struct SReadH SReadH;
typedef struct {
......@@ -143,4 +150,4 @@ static FORCE_INLINE int tsdbMakeRoom(void **ppBuf, size_t size) {
return 0;
}
#endif /*_TD_TSDB_READ_IMPL_H_*/
\ No newline at end of file
#endif /*_TD_TSDB_READ_IMPL_H_*/
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TSDB_ROW_MERGE_BUF_H
#define TSDB_ROW_MERGE_BUF_H
#ifdef __cplusplus
extern "C" {
#endif
#include "tsdb.h"
#include "tchecksum.h"
#include "tsdbReadImpl.h"
typedef void* SMergeBuf;
SDataRow tsdbMergeTwoRows(SMergeBuf *pBuf, SMemRow row1, SMemRow row2, STSchema *pSchema1, STSchema *pSchema2);
static FORCE_INLINE int tsdbMergeBufMakeSureRoom(SMergeBuf *pBuf, STSchema* pSchema1, STSchema* pSchema2) {
return tsdbMakeRoom(pBuf, MAX(dataRowMaxBytesFromSchema(pSchema1), dataRowMaxBytesFromSchema(pSchema2)));
}
static FORCE_INLINE void tsdbFreeMergeBuf(SMergeBuf buf) {
taosTZfree(buf);
}
#ifdef __cplusplus
}
#endif
#endif /* ifndef TSDB_ROW_MERGE_BUF_H */
......@@ -68,6 +68,8 @@ extern "C" {
#include "tsdbCompact.h"
// Commit Queue
#include "tsdbCommitQueue.h"
#include "tsdbRowMergeBuf.h"
// Main definitions
struct STsdbRepo {
uint8_t state;
......@@ -92,6 +94,7 @@ struct STsdbRepo {
pthread_mutex_t mutex;
bool repoLocked;
int32_t code; // Commit code
SMergeBuf mergeBuf; //used when update=2
bool inCompact; // is in compact process?
};
......@@ -139,4 +142,4 @@ static FORCE_INLINE int tsdbGetNextMaxTables(int tid) {
}
#endif
#endif /* _TD_TSDB_INT_H_ */
\ No newline at end of file
#endif /* _TD_TSDB_INT_H_ */
......@@ -159,7 +159,7 @@ _err:
static void tsdbFreeBufBlock(STsdbBufBlock *pBufBlock) { tfree(pBufBlock); }
int tsdbExpendPool(STsdbRepo* pRepo, int32_t oldTotalBlocks) {
int tsdbExpandPool(STsdbRepo* pRepo, int32_t oldTotalBlocks) {
if (oldTotalBlocks == pRepo->config.totalBlocks) {
return TSDB_CODE_SUCCESS;
}
......@@ -199,4 +199,4 @@ void tsdbRecycleBufferBlock(STsdbBufPool* pPool, SListNode *pNode) {
tsdbFreeBufBlock(pBufBlock);
free(pNode);
pPool->nBufBlocks--;
}
\ No newline at end of file
}
......@@ -1290,7 +1290,7 @@ static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIt
ASSERT(pSchema != NULL);
}
tdAppendMemRowToDataCol(row, pSchema, pTarget);
tdAppendMemRowToDataCol(row, pSchema, pTarget, true);
}
tSkipListIterNext(pCommitIter->pIter);
......@@ -1302,7 +1302,7 @@ static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIt
ASSERT(pSchema != NULL);
}
tdAppendMemRowToDataCol(row, pSchema, pTarget);
tdAppendMemRowToDataCol(row, pSchema, pTarget, update == TD_ROW_OVERWRITE_UPDATE);
}
} else {
ASSERT(!isRowDel);
......
......@@ -138,7 +138,7 @@ static void tsdbApplyRepoConfig(STsdbRepo *pRepo) {
pSaveCfg->compression, pSaveCfg->keep,pSaveCfg->keep1, pSaveCfg->keep2,
pSaveCfg->totalBlocks, oldCfg.cacheLastRow, pSaveCfg->cacheLastRow, oldTotalBlocks, pSaveCfg->totalBlocks);
int err = tsdbExpendPool(pRepo, oldTotalBlocks);
int err = tsdbExpandPool(pRepo, oldTotalBlocks);
if (!TAOS_SUCCEEDED(err)) {
tsdbError("vgId:%d expand pool from %d to %d fail,reason:%s",
REPO_ID(pRepo), oldTotalBlocks, pSaveCfg->totalBlocks, tstrerror(err));
......
......@@ -455,7 +455,7 @@ static int tsdbCompactMeta(STsdbRepo *pRepo) {
if (pReadh->pDCols[0]->numOfRows - ridx == 0) break;
int rowsToMerge = MIN(pReadh->pDCols[0]->numOfRows - ridx, defaultRows - pComph->pDataCols->numOfRows);
tdMergeDataCols(pComph->pDataCols, pReadh->pDCols[0], rowsToMerge, &ridx);
tdMergeDataCols(pComph->pDataCols, pReadh->pDCols[0], rowsToMerge, &ridx, pCfg->update != TD_ROW_PARTIAL_UPDATE);
if (pComph->pDataCols->numOfRows < defaultRows) {
break;
......
......@@ -14,6 +14,7 @@
*/
// no test file errors here
#include "taosdef.h"
#include "tsdbint.h"
#define IS_VALID_PRECISION(precision) \
......@@ -106,6 +107,8 @@ STsdbRepo *tsdbOpenRepo(STsdbCfg *pCfg, STsdbAppH *pAppH) {
return NULL;
}
pRepo->mergeBuf = NULL;
tsdbStartStream(pRepo);
tsdbDebug("vgId:%d, TSDB repository opened", REPO_ID(pRepo));
......@@ -518,7 +521,8 @@ static int32_t tsdbCheckAndSetDefaultCfg(STsdbCfg *pCfg) {
}
// update check
if (pCfg->update != 0) pCfg->update = 1;
if (pCfg->update < TD_ROW_DISCARD_UPDATE || pCfg->update > TD_ROW_PARTIAL_UPDATE)
pCfg->update = TD_ROW_DISCARD_UPDATE;
// update cacheLastRow
if (pCfg->cacheLastRow != 0) {
......@@ -597,6 +601,7 @@ static void tsdbFreeRepo(STsdbRepo *pRepo) {
tsdbFreeFS(pRepo->fs);
tsdbFreeBufPool(pRepo->pPool);
tsdbFreeMeta(pRepo->tsdbMeta);
tsdbFreeMergeBuf(pRepo->mergeBuf);
// tsdbFreeMemTable(pRepo->mem);
// tsdbFreeMemTable(pRepo->imem);
tsem_destroy(&(pRepo->readyToCommit));
......
......@@ -13,7 +13,11 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tdataformat.h"
#include "tfunctional.h"
#include "tsdbint.h"
#include "tskiplist.h"
#include "tsdbRowMergeBuf.h"
#define TSDB_DATA_SKIPLIST_LEVEL 5
#define TSDB_MAX_INSERT_BATCH 512
......@@ -30,24 +34,22 @@ typedef struct {
void * pMsg;
} SSubmitMsgIter;
static SMemTable * tsdbNewMemTable(STsdbRepo *pRepo);
static void tsdbFreeMemTable(SMemTable *pMemTable);
static STableData *tsdbNewTableData(STsdbCfg *pCfg, STable *pTable);
static void tsdbFreeTableData(STableData *pTableData);
static char * tsdbGetTsTupleKey(const void *data);
static SMemTable * tsdbNewMemTable(STsdbRepo *pRepo);
static void tsdbFreeMemTable(SMemTable *pMemTable);
static STableData* tsdbNewTableData(STsdbCfg *pCfg, STable *pTable);
static void tsdbFreeTableData(STableData *pTableData);
static char * tsdbGetTsTupleKey(const void *data);
static int tsdbAdjustMemMaxTables(SMemTable *pMemTable, int maxTables);
static int tsdbAppendTableRowToCols(STable *pTable, SDataCols *pCols, STSchema **ppSchema, SMemRow row);
static int tsdbAppendTableRowToCols(STable *pTable, SDataCols *pCols, STSchema **ppSchema, SMemRow row);
static int tsdbInitSubmitBlkIter(SSubmitBlk *pBlock, SSubmitBlkIter *pIter);
static SMemRow tsdbGetSubmitBlkNext(SSubmitBlkIter *pIter);
static SMemRow tsdbGetSubmitBlkNext(SSubmitBlkIter *pIter);
static int tsdbScanAndConvertSubmitMsg(STsdbRepo *pRepo, SSubmitMsg *pMsg);
static int tsdbInsertDataToTable(STsdbRepo *pRepo, SSubmitBlk *pBlock, int32_t *affectedrows);
static int tsdbCopyRowToMem(STsdbRepo *pRepo, SMemRow row, STable *pTable, void **ppRow);
static int tsdbInitSubmitMsgIter(SSubmitMsg *pMsg, SSubmitMsgIter *pIter);
static int tsdbGetSubmitMsgNext(SSubmitMsgIter *pIter, SSubmitBlk **pPBlock);
static int tsdbCheckTableSchema(STsdbRepo *pRepo, SSubmitBlk *pBlock, STable *pTable);
static int tsdbInsertDataToTableImpl(STsdbRepo *pRepo, STable *pTable, void **rows, int rowCounter);
static void tsdbFreeRows(STsdbRepo *pRepo, void **rows, int rowCounter);
static int tsdbUpdateTableLatestInfo(STsdbRepo *pRepo, STable *pTable, SMemRow row);
static int tsdbUpdateTableLatestInfo(STsdbRepo *pRepo, STable *pTable, SMemRow row);
static FORCE_INLINE int tsdbCheckRowRange(STsdbRepo *pRepo, STable *pTable, SMemRow row, TSKEY minKey, TSKEY maxKey,
TSKEY now);
......@@ -342,7 +344,7 @@ int tsdbSyncCommit(STsdbRepo *repo) {
* 3. rowsIncreased = rowsInserted - rowsDeleteSucceed >= maxRowsToRead
* 4. operations in pCols not exceeds its max capacity if pCols is given
*
* The function tries to procceed AS MUSH AS POSSIBLE.
* The function tries to procceed AS MUCH AS POSSIBLE.
*/
int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey, int maxRowsToRead, SDataCols *pCols,
TKEY *filterKeys, int nFilterKeys, bool keepDup, SMergeInfo *pMergeInfo) {
......@@ -523,9 +525,15 @@ static STableData *tsdbNewTableData(STsdbCfg *pCfg, STable *pTable) {
pTableData->keyLast = 0;
pTableData->numOfRows = 0;
uint8_t skipListCreateFlags;
if(pCfg->update == TD_ROW_DISCARD_UPDATE)
skipListCreateFlags = SL_DISCARD_DUP_KEY;
else
skipListCreateFlags = SL_UPDATE_DUP_KEY;
pTableData->pData =
tSkipListCreate(TSDB_DATA_SKIPLIST_LEVEL, TSDB_DATA_TYPE_TIMESTAMP, TYPE_BYTES[TSDB_DATA_TYPE_TIMESTAMP],
tkeyComparFn, pCfg->update ? SL_UPDATE_DUP_KEY : SL_DISCARD_DUP_KEY, tsdbGetTsTupleKey);
tkeyComparFn, skipListCreateFlags, tsdbGetTsTupleKey);
if (pTableData->pData == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
free(pTableData);
......@@ -581,7 +589,7 @@ static int tsdbAppendTableRowToCols(STable *pTable, SDataCols *pCols, STSchema *
}
}
tdAppendMemRowToDataCol(row, *ppSchema, pCols);
tdAppendMemRowToDataCol(row, *ppSchema, pCols, true);
}
return 0;
......@@ -693,95 +701,162 @@ static int tsdbScanAndConvertSubmitMsg(STsdbRepo *pRepo, SSubmitMsg *pMsg) {
return 0;
}
static int tsdbInsertDataToTable(STsdbRepo *pRepo, SSubmitBlk *pBlock, int32_t *affectedrows) {
STsdbMeta * pMeta = pRepo->tsdbMeta;
int64_t points = 0;
STable * pTable = NULL;
SSubmitBlkIter blkIter = {0};
SMemRow row = NULL;
void * rows[TSDB_MAX_INSERT_BATCH] = {0};
int rowCounter = 0;
//row1 has higher priority
static SMemRow tsdbInsertDupKeyMerge(SMemRow row1, SMemRow row2, STsdbRepo* pRepo, STSchema **ppSchema1, STSchema **ppSchema2, STable* pTable, int32_t* pAffectedRows, int64_t* pPoints, SMemRow* pLastRow) {
//for compatiblity, duplicate key inserted when update=0 should be also calculated as affected rows!
if(row1 == NULL && row2 == NULL && pRepo->config.update == TD_ROW_DISCARD_UPDATE) {
(*pAffectedRows)++;
(*pPoints)++;
return NULL;
}
ASSERT(pBlock->tid < pMeta->maxTables);
pTable = pMeta->tables[pBlock->tid];
ASSERT(pTable != NULL && TABLE_UID(pTable) == pBlock->uid);
if(row2 == NULL || pRepo->config.update != TD_ROW_PARTIAL_UPDATE) {
void* pMem = tsdbAllocBytes(pRepo, memRowTLen(row1));
if(pMem == NULL) return NULL;
memRowCpy(pMem, row1);
(*pAffectedRows)++;
(*pPoints)++;
*pLastRow = pMem;
return pMem;
}
tsdbInitSubmitBlkIter(pBlock, &blkIter);
while ((row = tsdbGetSubmitBlkNext(&blkIter)) != NULL) {
if (tsdbCopyRowToMem(pRepo, row, pTable, &(rows[rowCounter])) < 0) {
tsdbFreeRows(pRepo, rows, rowCounter);
goto _err;
STSchema *pSchema1 = *ppSchema1;
STSchema *pSchema2 = *ppSchema2;
SMergeBuf * pBuf = &pRepo->mergeBuf;
int dv1 = memRowVersion(row1);
int dv2 = memRowVersion(row2);
if(pSchema1 == NULL || schemaVersion(pSchema1) != dv1) {
if(pSchema2 != NULL && schemaVersion(pSchema2) == dv1) {
*ppSchema1 = pSchema2;
} else {
*ppSchema1 = tsdbGetTableSchemaImpl(pTable, false, false, memRowVersion(row1));
}
pSchema1 = *ppSchema1;
}
(*affectedrows)++;
points++;
if (rows[rowCounter] != NULL) {
rowCounter++;
if(pSchema2 == NULL || schemaVersion(pSchema2) != dv2) {
if(schemaVersion(pSchema1) == dv2) {
pSchema2 = pSchema1;
} else {
*ppSchema2 = tsdbGetTableSchemaImpl(pTable, false, false, memRowVersion(row2));
pSchema2 = *ppSchema2;
}
}
if (rowCounter == TSDB_MAX_INSERT_BATCH) {
if (tsdbInsertDataToTableImpl(pRepo, pTable, rows, rowCounter) < 0) {
goto _err;
}
SMemRow tmp = tsdbMergeTwoRows(pBuf, row1, row2, pSchema1, pSchema2);
rowCounter = 0;
memset(rows, 0, sizeof(rows));
}
}
void* pMem = tsdbAllocBytes(pRepo, memRowTLen(tmp));
if(pMem == NULL) return NULL;
memRowCpy(pMem, tmp);
if (rowCounter > 0 && tsdbInsertDataToTableImpl(pRepo, pTable, rows, rowCounter) < 0) {
goto _err;
}
(*pAffectedRows)++;
(*pPoints)++;
STSchema *pSchema = tsdbGetTableSchemaByVersion(pTable, pBlock->sversion);
pRepo->stat.pointsWritten += points * schemaNCols(pSchema);
pRepo->stat.totalStorage += points * schemaVLen(pSchema);
*pLastRow = pMem;
return pMem;
}
return 0;
static void* tsdbInsertDupKeyMergePacked(void** args) {
return tsdbInsertDupKeyMerge(args[0], args[1], args[2], (STSchema**)&args[3], (STSchema**)&args[4], args[5], args[6], args[7], args[8]);
}
_err:
return -1;
static void tsdbSetupSkipListHookFns(SSkipList* pSkipList, STsdbRepo *pRepo, STable *pTable, int32_t* pAffectedRows, int64_t* pPoints, SMemRow* pLastRow) {
if(pSkipList->insertHandleFn == NULL) {
tGenericSavedFunc *dupHandleSavedFunc = genericSavedFuncInit((GenericVaFunc)&tsdbInsertDupKeyMergePacked, 9);
dupHandleSavedFunc->args[2] = pRepo;
dupHandleSavedFunc->args[3] = NULL;
dupHandleSavedFunc->args[4] = NULL;
dupHandleSavedFunc->args[5] = pTable;
dupHandleSavedFunc->args[6] = pAffectedRows;
dupHandleSavedFunc->args[7] = pPoints;
dupHandleSavedFunc->args[8] = pLastRow;
pSkipList->insertHandleFn = dupHandleSavedFunc;
}
}
static int tsdbCopyRowToMem(STsdbRepo *pRepo, SMemRow row, STable *pTable, void **ppRow) {
STsdbCfg * pCfg = &pRepo->config;
TKEY tkey = memRowTKey(row);
TSKEY key = memRowKey(row);
bool isRowDelete = TKEY_IS_DELETED(tkey);
static int tsdbInsertDataToTable(STsdbRepo* pRepo, SSubmitBlk* pBlock, int32_t *pAffectedRows) {
if (isRowDelete) {
if (!pCfg->update) {
tsdbWarn("vgId:%d vnode is not allowed to update but try to delete a data row", REPO_ID(pRepo));
terrno = TSDB_CODE_TDB_INVALID_ACTION;
STsdbMeta *pMeta = pRepo->tsdbMeta;
int64_t points = 0;
STable *pTable = NULL;
SSubmitBlkIter blkIter = {0};
SMemTable *pMemTable = NULL;
STableData *pTableData = NULL;
STsdbCfg *pCfg = &(pRepo->config);
tsdbInitSubmitBlkIter(pBlock, &blkIter);
if(blkIter.row == NULL) return 0;
TSKEY firstRowKey = memRowKey(blkIter.row);
tsdbAllocBytes(pRepo, 0);
pMemTable = pRepo->mem;
ASSERT(pMemTable != NULL);
ASSERT(pBlock->tid < pMeta->maxTables);
pTable = pMeta->tables[pBlock->tid];
ASSERT(pTable != NULL && TABLE_UID(pTable) == pBlock->uid);
if (TABLE_TID(pTable) >= pMemTable->maxTables) {
if (tsdbAdjustMemMaxTables(pMemTable, pMeta->maxTables) < 0) {
return -1;
}
}
pTableData = pMemTable->tData[TABLE_TID(pTable)];
if (pTableData == NULL || pTableData->uid != TABLE_UID(pTable)) {
if (pTableData != NULL) {
taosWLockLatch(&(pMemTable->latch));
pMemTable->tData[TABLE_TID(pTable)] = NULL;
tsdbFreeTableData(pTableData);
taosWUnLockLatch(&(pMemTable->latch));
}
TSKEY lastKey = tsdbGetTableLastKeyImpl(pTable);
if (key > lastKey) {
tsdbTrace("vgId:%d skip to delete row key %" PRId64 " which is larger than table lastKey %" PRId64,
REPO_ID(pRepo), key, lastKey);
return 0;
pTableData = tsdbNewTableData(pCfg, pTable);
if (pTableData == NULL) {
tsdbError("vgId:%d failed to insert data to table %s uid %" PRId64 " tid %d since %s", REPO_ID(pRepo),
TABLE_CHAR_NAME(pTable), TABLE_UID(pTable), TABLE_TID(pTable), tstrerror(terrno));
return -1;
}
}
void *pRow = tsdbAllocBytes(pRepo, memRowTLen(row));
if (pRow == NULL) {
tsdbError("vgId:%d failed to insert row with key %" PRId64 " to table %s while allocate %" PRIu32 " bytes since %s",
REPO_ID(pRepo), key, TABLE_CHAR_NAME(pTable), memRowTLen(row), tstrerror(terrno));
return -1;
pRepo->mem->tData[TABLE_TID(pTable)] = pTableData;
}
memRowCpy(pRow, row);
ppRow[0] = pRow; // save the memory address of data rows
ASSERT((pTableData != NULL) && pTableData->uid == TABLE_UID(pTable));
SMemRow lastRow = NULL;
int64_t osize = SL_SIZE(pTableData->pData);
tsdbSetupSkipListHookFns(pTableData->pData, pRepo, pTable, pAffectedRows, &points, &lastRow);
tSkipListPutBatchByIter(pTableData->pData, &blkIter, (iter_next_fn_t)tsdbGetSubmitBlkNext);
int64_t dsize = SL_SIZE(pTableData->pData) - osize;
tsdbTrace("vgId:%d a row is %s table %s tid %d uid %" PRIu64 " key %" PRIu64, REPO_ID(pRepo),
isRowDelete ? "deleted from" : "updated in", TABLE_CHAR_NAME(pTable), TABLE_TID(pTable), TABLE_UID(pTable),
key);
if(lastRow != NULL) {
TSKEY lastRowKey = memRowKey(lastRow);
if (pMemTable->keyFirst > firstRowKey) pMemTable->keyFirst = firstRowKey;
pMemTable->numOfRows += dsize;
if (pTableData->keyFirst > firstRowKey) pTableData->keyFirst = firstRowKey;
pTableData->numOfRows += dsize;
if (pMemTable->keyLast < lastRowKey) pMemTable->keyLast = lastRowKey;
if (pTableData->keyLast < lastRowKey) pTableData->keyLast = lastRowKey;
if (tsdbUpdateTableLatestInfo(pRepo, pTable, lastRow) < 0) {
return -1;
}
}
STSchema *pSchema = tsdbGetTableSchemaByVersion(pTable, pBlock->sversion);
pRepo->stat.pointsWritten += points * schemaNCols(pSchema);
pRepo->stat.totalStorage += points * schemaVLen(pSchema);
return 0;
}
static int tsdbInitSubmitMsgIter(SSubmitMsg *pMsg, SSubmitMsgIter *pIter) {
if (pMsg == NULL) {
terrno = TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP;
......@@ -889,106 +964,6 @@ static int tsdbCheckTableSchema(STsdbRepo *pRepo, SSubmitBlk *pBlock, STable *pT
return 0;
}
static int tsdbInsertDataToTableImpl(STsdbRepo *pRepo, STable *pTable, void **rows, int rowCounter) {
if (rowCounter < 1) return 0;
SMemTable * pMemTable = NULL;
STableData *pTableData = NULL;
STsdbMeta * pMeta = pRepo->tsdbMeta;
STsdbCfg * pCfg = &(pRepo->config);
ASSERT(pRepo->mem != NULL);
pMemTable = pRepo->mem;
if (TABLE_TID(pTable) >= pMemTable->maxTables) {
if (tsdbAdjustMemMaxTables(pMemTable, pMeta->maxTables) < 0) {
tsdbFreeRows(pRepo, rows, rowCounter);
return -1;
}
}
pTableData = pMemTable->tData[TABLE_TID(pTable)];
if (pTableData == NULL || pTableData->uid != TABLE_UID(pTable)) {
if (pTableData != NULL) {
taosWLockLatch(&(pMemTable->latch));
pMemTable->tData[TABLE_TID(pTable)] = NULL;
tsdbFreeTableData(pTableData);
taosWUnLockLatch(&(pMemTable->latch));
}
pTableData = tsdbNewTableData(pCfg, pTable);
if (pTableData == NULL) {
tsdbError("vgId:%d failed to insert data to table %s uid %" PRId64 " tid %d since %s", REPO_ID(pRepo),
TABLE_CHAR_NAME(pTable), TABLE_UID(pTable), TABLE_TID(pTable), tstrerror(terrno));
tsdbFreeRows(pRepo, rows, rowCounter);
return -1;
}
pRepo->mem->tData[TABLE_TID(pTable)] = pTableData;
}
ASSERT((pTableData != NULL) && pTableData->uid == TABLE_UID(pTable));
int64_t osize = SL_SIZE(pTableData->pData);
tSkipListPutBatch(pTableData->pData, rows, rowCounter);
int64_t dsize = SL_SIZE(pTableData->pData) - osize;
TSKEY keyFirstRow = memRowKey(rows[0]);
TSKEY keyLastRow = memRowKey(rows[rowCounter - 1]);
if (pMemTable->keyFirst > keyFirstRow) pMemTable->keyFirst = keyFirstRow;
if (pMemTable->keyLast < keyLastRow) pMemTable->keyLast = keyLastRow;
pMemTable->numOfRows += dsize;
if (pTableData->keyFirst > keyFirstRow) pTableData->keyFirst = keyFirstRow;
if (pTableData->keyLast < keyLastRow) pTableData->keyLast = keyLastRow;
pTableData->numOfRows += dsize;
// update table latest info
if (tsdbUpdateTableLatestInfo(pRepo, pTable, rows[rowCounter - 1]) < 0) {
return -1;
}
return 0;
}
static void tsdbFreeRows(STsdbRepo *pRepo, void **rows, int rowCounter) {
ASSERT(pRepo->mem != NULL);
STsdbBufPool *pBufPool = pRepo->pPool;
for (int i = rowCounter - 1; i >= 0; --i) {
SMemRow row = (SMemRow)rows[i];
int bytes = (int)memRowTLen(row);
if (pRepo->mem->extraBuffList == NULL) {
STsdbBufBlock *pBufBlock = tsdbGetCurrBufBlock(pRepo);
ASSERT(pBufBlock != NULL && pBufBlock->offset >= bytes);
pBufBlock->offset -= bytes;
pBufBlock->remain += bytes;
ASSERT(row == POINTER_SHIFT(pBufBlock->data, pBufBlock->offset));
tsdbTrace("vgId:%d free %d bytes to TSDB buffer pool, nBlocks %d offset %d remain %d", REPO_ID(pRepo), bytes,
listNEles(pRepo->mem->bufBlockList), pBufBlock->offset, pBufBlock->remain);
if (pBufBlock->offset == 0) { // return the block to buffer pool
if (tsdbLockRepo(pRepo) < 0) return;
SListNode *pNode = tdListPopTail(pRepo->mem->bufBlockList);
tdListPrependNode(pBufPool->bufBlockList, pNode);
if (tsdbUnlockRepo(pRepo) < 0) return;
}
} else {
ASSERT(listNEles(pRepo->mem->extraBuffList) > 0);
SListNode *pNode = tdListPopTail(pRepo->mem->extraBuffList);
ASSERT(row == pNode->data);
free(pNode);
tsdbTrace("vgId:%d free %d bytes to SYSTEM buffer pool", REPO_ID(pRepo), bytes);
if (listNEles(pRepo->mem->extraBuffList) == 0) {
tdListFree(pRepo->mem->extraBuffList);
pRepo->mem->extraBuffList = NULL;
}
}
}
}
static void updateTableLatestColumn(STsdbRepo *pRepo, STable *pTable, SMemRow row) {
tsdbDebug("vgId:%d updateTableLatestColumn, %s row version:%d", REPO_ID(pRepo), pTable->name->data,
......@@ -1005,8 +980,8 @@ static void updateTableLatestColumn(STsdbRepo *pRepo, STable *pTable, SMemRow ro
}
SDataCol *pLatestCols = pTable->lastCols;
int32_t kvIdx = 0;
bool isDataRow = isDataRow(row);
for (int16_t j = 0; j < schemaNCols(pSchema); j++) {
STColumn *pTCol = schemaColAt(pSchema, j);
// ignore not exist colId
......@@ -1017,16 +992,8 @@ static void updateTableLatestColumn(STsdbRepo *pRepo, STable *pTable, SMemRow ro
void *value = NULL;
if (isDataRow) {
value = tdGetRowDataOfCol(memRowDataBody(row), (int8_t)pTCol->type,
TD_DATA_ROW_HEAD_SIZE + pTCol->offset);
} else {
// SKVRow
SColIdx *pColIdx = tdGetKVRowIdxOfCol(memRowKvBody(row), pTCol->colId);
if (pColIdx) {
value = tdGetKvRowDataOfCol(memRowKvBody(row), pColIdx->offset);
}
}
value = tdGetMemRowDataOfColEx(row, pTCol->colId, (int8_t)pTCol->type,
TD_DATA_ROW_HEAD_SIZE + pSchema->columns[j].offset, &kvIdx);
if ((value == NULL) || isNull(value, pTCol->type)) {
continue;
......@@ -1055,13 +1022,14 @@ static int tsdbUpdateTableLatestInfo(STsdbRepo *pRepo, STable *pTable, SMemRow r
// if cacheLastRow config has been reset, free the lastRow
if (!pCfg->cacheLastRow && pTable->lastRow != NULL) {
taosTZfree(pTable->lastRow);
SMemRow cachedLastRow = pTable->lastRow;
TSDB_WLOCK_TABLE(pTable);
pTable->lastRow = NULL;
TSDB_WUNLOCK_TABLE(pTable);
taosTZfree(cachedLastRow);
}
if (tsdbGetTableLastKeyImpl(pTable) < memRowKey(row)) {
if (tsdbGetTableLastKeyImpl(pTable) <= memRowKey(row)) {
if (CACHE_LAST_ROW(pCfg) || pTable->lastRow != NULL) {
SMemRow nrow = pTable->lastRow;
if (taosTSizeof(nrow) < memRowTLen(row)) {
......
此差异已折叠。
......@@ -244,6 +244,7 @@ int tsdbLoadBlockInfo(SReadH *pReadh, void *pTarget) {
int tsdbLoadBlockData(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo) {
ASSERT(pBlock->numOfSubBlocks > 0);
int8_t update = pReadh->pRepo->config.update;
SBlock *iBlock = pBlock;
if (pBlock->numOfSubBlocks > 1) {
......@@ -258,7 +259,7 @@ int tsdbLoadBlockData(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo) {
for (int i = 1; i < pBlock->numOfSubBlocks; i++) {
iBlock++;
if (tsdbLoadBlockDataImpl(pReadh, iBlock, pReadh->pDCols[1]) < 0) return -1;
if (tdMergeDataCols(pReadh->pDCols[0], pReadh->pDCols[1], pReadh->pDCols[1]->numOfRows, NULL) < 0) return -1;
if (tdMergeDataCols(pReadh->pDCols[0], pReadh->pDCols[1], pReadh->pDCols[1]->numOfRows, NULL, update != TD_ROW_PARTIAL_UPDATE) < 0) return -1;
}
ASSERT(pReadh->pDCols[0]->numOfRows == pBlock->numOfRows);
......@@ -270,6 +271,7 @@ int tsdbLoadBlockData(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo) {
int tsdbLoadBlockDataCols(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo, int16_t *colIds, int numOfColsIds) {
ASSERT(pBlock->numOfSubBlocks > 0);
int8_t update = pReadh->pRepo->config.update;
SBlock *iBlock = pBlock;
if (pBlock->numOfSubBlocks > 1) {
......@@ -284,7 +286,7 @@ int tsdbLoadBlockDataCols(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo,
for (int i = 1; i < pBlock->numOfSubBlocks; i++) {
iBlock++;
if (tsdbLoadBlockDataColsImpl(pReadh, iBlock, pReadh->pDCols[1], colIds, numOfColsIds) < 0) return -1;
if (tdMergeDataCols(pReadh->pDCols[0], pReadh->pDCols[1], pReadh->pDCols[1]->numOfRows, NULL) < 0) return -1;
if (tdMergeDataCols(pReadh->pDCols[0], pReadh->pDCols[1], pReadh->pDCols[1]->numOfRows, NULL, update != TD_ROW_PARTIAL_UPDATE) < 0) return -1;
}
ASSERT(pReadh->pDCols[0]->numOfRows == pBlock->numOfRows);
......@@ -657,4 +659,4 @@ static int tsdbLoadColData(SReadH *pReadh, SDFile *pDFile, SBlock *pBlock, SBloc
}
return 0;
}
\ No newline at end of file
}
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tsdbRowMergeBuf.h"
#include "tdataformat.h"
// row1 has higher priority
SMemRow tsdbMergeTwoRows(SMergeBuf *pBuf, SMemRow row1, SMemRow row2, STSchema *pSchema1, STSchema *pSchema2) {
if(row2 == NULL) return row1;
if(row1 == NULL) return row2;
ASSERT(pSchema1->version == memRowVersion(row1));
ASSERT(pSchema2->version == memRowVersion(row2));
if(tsdbMergeBufMakeSureRoom(pBuf, pSchema1, pSchema2) < 0) {
return NULL;
}
return mergeTwoMemRows(*pBuf, row1, row2, pSchema1, pSchema2);
}
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TD_TFUNCTIONAL_H
#define TD_TFUNCTIONAL_H
#ifdef __cplusplus
extern "C" {
#endif
#include "os.h"
//TODO: hard to use, trying to rewrite it using va_list
typedef void* (*GenericVaFunc)(void* args[]);
typedef int32_t (*I32VaFunc) (void* args[]);
typedef void (*VoidVaFunc) (void* args[]);
typedef struct GenericSavedFunc {
GenericVaFunc func;
void * args[];
} tGenericSavedFunc;
typedef struct I32SavedFunc {
I32VaFunc func;
void * args[];
} tI32SavedFunc;
typedef struct VoidSavedFunc {
VoidVaFunc func;
void * args[];
} tVoidSavedFunc;
tGenericSavedFunc* genericSavedFuncInit(GenericVaFunc func, int numOfArgs);
tI32SavedFunc* i32SavedFuncInit(I32VaFunc func, int numOfArgs);
tVoidSavedFunc* voidSavedFuncInit(VoidVaFunc func, int numOfArgs);
void* genericInvoke(tGenericSavedFunc* const pSavedFunc);
int32_t i32Invoke(tI32SavedFunc* const pSavedFunc);
void voidInvoke(tVoidSavedFunc* const pSavedFunc);
#ifdef __cplusplus
}
#endif
#endif
......@@ -23,6 +23,7 @@ extern "C" {
#include "os.h"
#include "taosdef.h"
#include "tarray.h"
#include "tfunctional.h"
#define MAX_SKIP_LIST_LEVEL 15
#define SKIP_LIST_RECORD_PERFORMANCE 0
......@@ -30,13 +31,17 @@ extern "C" {
// For key property setting
#define SL_ALLOW_DUP_KEY (uint8_t)0x0 // Allow duplicate key exists (for tag index usage)
#define SL_DISCARD_DUP_KEY (uint8_t)0x1 // Discard duplicate key (for data update=0 case)
#define SL_UPDATE_DUP_KEY (uint8_t)0x2 // Update duplicate key by remove/insert (for data update=1 case)
#define SL_UPDATE_DUP_KEY (uint8_t)0x2 // Update duplicate key by remove/insert (for data update!=0 case)
// For thread safety setting
#define SL_THREAD_SAFE (uint8_t)0x4
typedef char *SSkipListKey;
typedef char *(*__sl_key_fn_t)(const void *);
typedef void (*sl_patch_row_fn_t)(void * pDst, const void * pSrc);
typedef void* (*iter_next_fn_t)(void *iter);
typedef struct SSkipListNode {
uint8_t level;
void * pData;
......@@ -95,6 +100,12 @@ typedef struct tSkipListState {
uint64_t nTotalElapsedTimeForInsert;
} tSkipListState;
typedef enum {
SSkipListPutSuccess = 0,
SSkipListPutEarlyStop = 1,
SSkipListPutSkipOne = 2
} SSkipListPutStatus;
typedef struct SSkipList {
unsigned int seed;
__compar_fn_t comparFn;
......@@ -111,6 +122,7 @@ typedef struct SSkipList {
#if SKIP_LIST_RECORD_PERFORMANCE
tSkipListState state; // skiplist state
#endif
tGenericSavedFunc* insertHandleFn;
} SSkipList;
typedef struct SSkipListIterator {
......@@ -118,7 +130,7 @@ typedef struct SSkipListIterator {
SSkipListNode *cur;
int32_t step; // the number of nodes that have been checked already
int32_t order; // order of the iterator
SSkipListNode *next; // next points to the true qualified node in skip list
SSkipListNode *next; // next points to the true qualified node in skiplist
} SSkipListIterator;
#define SL_IS_THREAD_SAFE(s) (((s)->flags) & SL_THREAD_SAFE)
......@@ -132,7 +144,7 @@ SSkipList *tSkipListCreate(uint8_t maxLevel, uint8_t keyType, uint16_t keyLen, _
__sl_key_fn_t fn);
void tSkipListDestroy(SSkipList *pSkipList);
SSkipListNode * tSkipListPut(SSkipList *pSkipList, void *pData);
void tSkipListPutBatch(SSkipList *pSkipList, void **ppData, int ndata);
void tSkipListPutBatchByIter(SSkipList *pSkipList, void *iter, iter_next_fn_t iterate);
SArray * tSkipListGet(SSkipList *pSkipList, SSkipListKey pKey);
void tSkipListPrint(SSkipList *pSkipList, int16_t nlevel);
SSkipListIterator *tSkipListCreateIter(SSkipList *pSkipList);
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tfunctional.h"
#include "tarray.h"
tGenericSavedFunc* genericSavedFuncInit(GenericVaFunc func, int numOfArgs) {
tGenericSavedFunc* pSavedFunc = malloc(sizeof(tGenericSavedFunc) + numOfArgs * (sizeof(void*)));
pSavedFunc->func = func;
return pSavedFunc;
}
tI32SavedFunc* i32SavedFuncInit(I32VaFunc func, int numOfArgs) {
tI32SavedFunc* pSavedFunc = malloc(sizeof(tI32SavedFunc) + numOfArgs * sizeof(void *));
pSavedFunc->func = func;
return pSavedFunc;
}
tVoidSavedFunc* voidSavedFuncInit(VoidVaFunc func, int numOfArgs) {
tVoidSavedFunc* pSavedFunc = malloc(sizeof(tVoidSavedFunc) + numOfArgs * sizeof(void*));
pSavedFunc->func = func;
return pSavedFunc;
}
FORCE_INLINE void* genericInvoke(tGenericSavedFunc* const pSavedFunc) {
return pSavedFunc->func(pSavedFunc->args);
}
FORCE_INLINE int32_t i32Invoke(tI32SavedFunc* const pSavedFunc) {
return pSavedFunc->func(pSavedFunc->args);
}
FORCE_INLINE void voidInvoke(tVoidSavedFunc* const pSavedFunc) {
if(pSavedFunc) pSavedFunc->func(pSavedFunc->args);
}
......@@ -16,6 +16,7 @@
#include "tskiplist.h"
#include "os.h"
#include "tcompare.h"
#include "tdataformat.h"
#include "tulog.h"
#include "tutil.h"
......@@ -31,6 +32,7 @@ static SSkipListNode *tSkipListNewNode(uint8_t level);
static SSkipListNode *tSkipListPutImpl(SSkipList *pSkipList, void *pData, SSkipListNode **direction, bool isForward,
bool hasDup);
static FORCE_INLINE int tSkipListWLock(SSkipList *pSkipList);
static FORCE_INLINE int tSkipListRLock(SSkipList *pSkipList);
static FORCE_INLINE int tSkipListUnlock(SSkipList *pSkipList);
......@@ -80,6 +82,7 @@ SSkipList *tSkipListCreate(uint8_t maxLevel, uint8_t keyType, uint16_t keyLen, _
#if SKIP_LIST_RECORD_PERFORMANCE
pSkipList->state.nTotalMemSize += sizeof(SSkipList);
#endif
pSkipList->insertHandleFn = NULL;
return pSkipList;
}
......@@ -97,6 +100,8 @@ void tSkipListDestroy(SSkipList *pSkipList) {
tSkipListFreeNode(pTemp);
}
tfree(pSkipList->insertHandleFn);
tSkipListUnlock(pSkipList);
if (pSkipList->lock != NULL) {
pthread_rwlock_destroy(pSkipList->lock);
......@@ -124,8 +129,7 @@ SSkipListNode *tSkipListPut(SSkipList *pSkipList, void *pData) {
return pNode;
}
// Put a batch of data into skiplist. The batch of data must be in ascending order
void tSkipListPutBatch(SSkipList *pSkipList, void **ppData, int ndata) {
void tSkipListPutBatchByIter(SSkipList *pSkipList, void *iter, iter_next_fn_t iterate) {
SSkipListNode *backward[MAX_SKIP_LIST_LEVEL] = {0};
SSkipListNode *forward[MAX_SKIP_LIST_LEVEL] = {0};
bool hasDup = false;
......@@ -135,17 +139,21 @@ void tSkipListPutBatch(SSkipList *pSkipList, void **ppData, int ndata) {
tSkipListWLock(pSkipList);
void* pData = iterate(iter);
if(pData == NULL) return;
// backward to put the first data
hasDup = tSkipListGetPosToPut(pSkipList, backward, ppData[0]);
tSkipListPutImpl(pSkipList, ppData[0], backward, false, hasDup);
hasDup = tSkipListGetPosToPut(pSkipList, backward, pData);
tSkipListPutImpl(pSkipList, pData, backward, false, hasDup);
for (int level = 0; level < pSkipList->maxLevel; level++) {
forward[level] = SL_NODE_GET_BACKWARD_POINTER(backward[level], level);
}
// forward to put the rest of data
for (int idata = 1; idata < ndata; idata++) {
pDataKey = pSkipList->keyFn(ppData[idata]);
while ((pData = iterate(iter)) != NULL) {
pDataKey = pSkipList->keyFn(pData);
hasDup = false;
// Compare max key
......@@ -186,9 +194,8 @@ void tSkipListPutBatch(SSkipList *pSkipList, void **ppData, int ndata) {
}
}
tSkipListPutImpl(pSkipList, ppData[idata], forward, true, hasDup);
tSkipListPutImpl(pSkipList, pData, forward, true, hasDup);
}
tSkipListUnlock(pSkipList);
}
......@@ -661,18 +668,40 @@ static SSkipListNode *tSkipListPutImpl(SSkipList *pSkipList, void *pData, SSkipL
uint8_t dupMode = SL_DUP_MODE(pSkipList);
SSkipListNode *pNode = NULL;
if (hasDup && (dupMode == SL_DISCARD_DUP_KEY || dupMode == SL_UPDATE_DUP_KEY)) {
if (hasDup && (dupMode != SL_ALLOW_DUP_KEY)) {
if (dupMode == SL_UPDATE_DUP_KEY) {
if (isForward) {
pNode = SL_NODE_GET_FORWARD_POINTER(direction[0], 0);
} else {
pNode = SL_NODE_GET_BACKWARD_POINTER(direction[0], 0);
}
atomic_store_ptr(&(pNode->pData), pData);
if (pSkipList->insertHandleFn) {
pSkipList->insertHandleFn->args[0] = pData;
pSkipList->insertHandleFn->args[1] = pNode->pData;
pData = genericInvoke(pSkipList->insertHandleFn);
}
if(pData) {
atomic_store_ptr(&(pNode->pData), pData);
}
} else {
//for compatiblity, duplicate key inserted when update=0 should be also calculated as affected rows!
if(pSkipList->insertHandleFn) {
pSkipList->insertHandleFn->args[0] = NULL;
pSkipList->insertHandleFn->args[1] = NULL;
genericInvoke(pSkipList->insertHandleFn);
}
}
} else {
pNode = tSkipListNewNode(getSkipListRandLevel(pSkipList));
if (pNode != NULL) {
// insertHandleFn will be assigned only for timeseries data,
// in which case, pData is pointed to an memory to be freed later;
// while for metadata, the mem alloc will not be called.
if (pSkipList->insertHandleFn) {
pSkipList->insertHandleFn->args[0] = pData;
pSkipList->insertHandleFn->args[1] = NULL;
pData = genericInvoke(pSkipList->insertHandleFn);
}
pNode->pData = pData;
tSkipListDoInsert(pSkipList, direction, pNode, isForward);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册