diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 9dc541b1a6efce68c4f27fc4cbaed7e8c5b542a8..1c610b67a56440a7d99ae698366fbbf5f5db6488 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -1864,8 +1864,8 @@ static SMemRow tdGenMemRowFromBuilder(SMemRowBuilder* pBuilder) { while (i < nColsBound) { int16_t colId = payloadColId(p); uint8_t colType = payloadColType(p); - tdAppendKvColVal(kvRow, POINTER_SHIFT(pVals,payloadColOffset(p)), colId, colType, toffset); - toffset += sizeof(SColIdx); + tdAppendKvColVal(kvRow, POINTER_SHIFT(pVals,payloadColOffset(p)), colId, colType, &toffset); + //toffset += sizeof(SColIdx); p = payloadNextCol(p); ++i; } diff --git a/src/common/inc/tdataformat.h b/src/common/inc/tdataformat.h index f8394dc2718acf15369ff36fa9cd61f67b2ebc74..47bd8a72b208fd09ef85511ccc6ef8e82e3f354a 100644 --- a/src/common/inc/tdataformat.h +++ b/src/common/inc/tdataformat.h @@ -232,6 +232,83 @@ static FORCE_INLINE void *tdGetRowDataOfCol(SDataRow row, int8_t type, int32_t o } } +static FORCE_INLINE void *tdGetPtrToCol(SDataRow row, STSchema *pSchema, int idx) { + return POINTER_SHIFT(row, TD_DATA_ROW_HEAD_SIZE + pSchema->columns[idx].offset); +} + +static FORCE_INLINE void *tdGetColOfRowBySchema(SDataRow row, STSchema *pSchema, int idx) { + int16_t offset = TD_DATA_ROW_HEAD_SIZE + pSchema->columns[idx].offset; + int8_t type = pSchema->columns[idx].type; + + return tdGetRowDataOfCol(row, type, offset); +} + +static FORCE_INLINE bool tdIsColOfRowNullBySchema(SDataRow row, STSchema *pSchema, int idx) { + int16_t offset = TD_DATA_ROW_HEAD_SIZE + pSchema->columns[idx].offset; + int8_t type = pSchema->columns[idx].type; + + return isNull(tdGetRowDataOfCol(row, type, offset), type); +} + +static FORCE_INLINE void tdSetColOfRowNullBySchema(SDataRow row, STSchema *pSchema, int idx) { + int16_t offset = TD_DATA_ROW_HEAD_SIZE + pSchema->columns[idx].offset; + int8_t type = pSchema->columns[idx].type; + int16_t bytes = pSchema->columns[idx].bytes; + + setNull(tdGetRowDataOfCol(row, type, offset), type, bytes); +} + +static FORCE_INLINE void tdCopyColOfRowBySchema(SDataRow dst, STSchema *pDstSchema, int dstIdx, SDataRow src, STSchema *pSrcSchema, int srcIdx) { + int8_t type = pDstSchema->columns[dstIdx].type; + ASSERT(type == pSrcSchema->columns[srcIdx].type); + void *pData = tdGetPtrToCol(dst, pDstSchema, dstIdx); + void *value = tdGetPtrToCol(src, pSrcSchema, srcIdx); + + switch (type) { + case TSDB_DATA_TYPE_BINARY: + case TSDB_DATA_TYPE_NCHAR: + *(VarDataOffsetT *)pData = *(VarDataOffsetT *)value; + pData = POINTER_SHIFT(dst, *(VarDataOffsetT *)pData); + value = POINTER_SHIFT(src, *(VarDataOffsetT *)value); + memcpy(pData, value, varDataTLen(value)); + break; + case TSDB_DATA_TYPE_NULL: + case TSDB_DATA_TYPE_BOOL: + case TSDB_DATA_TYPE_TINYINT: + case TSDB_DATA_TYPE_UTINYINT: + *(uint8_t *)pData = *(uint8_t *)value; + break; + case TSDB_DATA_TYPE_SMALLINT: + case TSDB_DATA_TYPE_USMALLINT: + *(uint16_t *)pData = *(uint16_t *)value; + break; + case TSDB_DATA_TYPE_INT: + case TSDB_DATA_TYPE_UINT: + *(uint32_t *)pData = *(uint32_t *)value; + break; + case TSDB_DATA_TYPE_BIGINT: + case TSDB_DATA_TYPE_UBIGINT: + *(uint64_t *)pData = *(uint64_t *)value; + break; + case TSDB_DATA_TYPE_FLOAT: + SET_FLOAT_PTR(pData, value); + break; + case TSDB_DATA_TYPE_DOUBLE: + SET_DOUBLE_PTR(pData, value); + break; + case TSDB_DATA_TYPE_TIMESTAMP: + if (pSrcSchema->columns[srcIdx].colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) { + *(TSKEY *)pData = tdGetKey(*(TKEY *)value); + } else { + *(TSKEY *)pData = *(TSKEY *)value; + } + break; + default: + memcpy(pData, value, pSrcSchema->columns[srcIdx].bytes); + } +} + + // ----------------- Data column structure typedef struct SDataCol { int8_t type; // column type @@ -335,7 +412,7 @@ void tdResetDataCols(SDataCols *pCols); int tdInitDataCols(SDataCols *pCols, STSchema *pSchema); SDataCols *tdDupDataCols(SDataCols *pCols, bool keepData); SDataCols *tdFreeDataCols(SDataCols *pCols); -int tdMergeDataCols(SDataCols *target, SDataCols *source, int rowsToMerge, int *pOffset); +int tdMergeDataCols(SDataCols *target, SDataCols *source, int rowsToMerge, int *pOffset, bool forceSetNull); // ----------------- K-V data row structure /* |<-------------------------------------- len -------------------------------------------->| @@ -398,9 +475,9 @@ static FORCE_INLINE void *tdGetKVRowIdxOfCol(SKVRow row, int16_t colId) { } // offset here not include kvRow header length -static FORCE_INLINE int tdAppendKvColVal(SKVRow row, const void *value, int16_t colId, int8_t type, int32_t offset) { +static FORCE_INLINE int tdAppendKvColVal(SKVRow row, const void *value, int16_t colId, int8_t type, int32_t *offset) { ASSERT(value != NULL); - int32_t toffset = offset + TD_KV_ROW_HEAD_SIZE; + int32_t toffset = *offset + TD_KV_ROW_HEAD_SIZE; SColIdx *pColIdx = (SColIdx *)POINTER_SHIFT(row, toffset); char * ptr = (char *)POINTER_SHIFT(row, kvRowLen(row)); @@ -411,7 +488,7 @@ static FORCE_INLINE int tdAppendKvColVal(SKVRow row, const void *value, int16_t memcpy(ptr, value, varDataTLen(value)); kvRowLen(row) += varDataTLen(value); } else { - if (offset == 0) { + if (*offset == 0) { ASSERT(type == TSDB_DATA_TYPE_TIMESTAMP); TKEY tvalue = tdGetTKEY(*(TSKEY *)value); memcpy(ptr, (void *)(&tvalue), TYPE_BYTES[type]); @@ -420,9 +497,27 @@ static FORCE_INLINE int tdAppendKvColVal(SKVRow row, const void *value, int16_t } kvRowLen(row) += TYPE_BYTES[type]; } + *offset += sizeof(SColIdx); return 0; } +// NOTE: offset here including the header size +static FORCE_INLINE void *tdGetKvRowDataOfCol(void *row, int32_t offset) { return POINTER_SHIFT(row, offset); } + +static FORCE_INLINE void *tdGetKVRowValOfColEx(SKVRow row, int16_t colId, int32_t *nIdx) { + while (*nIdx < kvRowNCols(row)) { + SColIdx *pColIdx = kvRowColIdxAt(row, *nIdx); + if (pColIdx->colId == colId) { + ++(*nIdx); + return tdGetKvRowDataOfCol(row, pColIdx->offset); + } else if (pColIdx->colId > colId) { + return NULL; + } else { + ++(*nIdx); + } + } + return NULL; +} // ----------------- K-V data row builder typedef struct { @@ -495,7 +590,7 @@ typedef void *SMemRow; #define TD_MEM_ROW_KV_VER_SIZE sizeof(int16_t) #define TD_MEM_ROW_KV_TYPE_VER_SIZE (TD_MEM_ROW_TYPE_SIZE + TD_MEM_ROW_KV_VER_SIZE) #define TD_MEM_ROW_DATA_HEAD_SIZE (TD_MEM_ROW_TYPE_SIZE + TD_DATA_ROW_HEAD_SIZE) -// #define TD_MEM_ROW_KV_HEAD_SIZE (TD_MEM_ROW_TYPE_SIZE + TD_MEM_ROW_KV_VER_SIZE + TD_KV_ROW_HEAD_SIZE) +#define TD_MEM_ROW_KV_HEAD_SIZE (TD_MEM_ROW_TYPE_SIZE + TD_MEM_ROW_KV_VER_SIZE + TD_KV_ROW_HEAD_SIZE) #define SMEM_ROW_DATA 0U // SDataRow #define SMEM_ROW_KV 1U // SKVRow @@ -538,27 +633,80 @@ typedef void *SMemRow; #define memRowSetType(r, t) (memRowType(r) = (t)) #define memRowSetLen(r, l) (isDataRow(r) ? memRowDataLen(r) = (l) : memRowKvLen(r) = (l)) -#define memRowSetVersion(r, v) (isDataRow(r) ? dataRowSetVersion(memRowDataBody(r), v) : memRowKvSetVersion(r, v)) +#define memRowSetVersion(r, v) (isDataRow(r) ? dataRowSetVersion(memRowDataBody(r), v) : memRowSetKvVersion(r, v)) #define memRowCpy(dst, r) memcpy((dst), (r), memRowTLen(r)) #define memRowMaxBytesFromSchema(s) (schemaTLen(s) + TD_MEM_ROW_DATA_HEAD_SIZE) #define memRowDeleted(r) TKEY_IS_DELETED(memRowTKey(r)) SMemRow tdMemRowDup(SMemRow row); -void tdAppendMemRowToDataCol(SMemRow row, STSchema *pSchema, SDataCols *pCols); -// NOTE: offset here including the header size -static FORCE_INLINE void *tdGetKvRowDataOfCol(void *row, int32_t offset) { return POINTER_SHIFT(row, offset); } +void tdAppendMemRowToDataCol(SMemRow row, STSchema *pSchema, SDataCols *pCols, bool forceSetNull); + // NOTE: offset here including the header size -static FORCE_INLINE void *tdGetMemRowDataOfCol(void *row, int8_t type, int32_t offset) { +static FORCE_INLINE void *tdGetMemRowDataOfCol(void *row, int16_t colId, int8_t colType, uint16_t offset) { if (isDataRow(row)) { - return tdGetRowDataOfCol(row, type, offset); - } else if (isKvRow(row)) { - return tdGetKvRowDataOfCol(row, offset); + return tdGetRowDataOfCol(memRowDataBody(row), colType, offset); } else { - ASSERT(0); + return tdGetKVRowValOfCol(memRowKvBody(row), colId); } - return NULL; } +/** + * NOTE: + * 1. Applicable to scan columns one by one + * 2. offset here including the header size + */ +static FORCE_INLINE void *tdGetMemRowDataOfColEx(void *row, int16_t colId, int8_t colType, int32_t offset, + int32_t *kvNIdx) { + if (isDataRow(row)) { + return tdGetRowDataOfCol(memRowDataBody(row), colType, offset); + } else { + return tdGetKVRowValOfColEx(memRowKvBody(row), colId, kvNIdx); + } +} + +static FORCE_INLINE int tdAppendMemColVal(SMemRow row, const void *value, int16_t colId, int8_t type, int32_t offset, + int32_t *kvOffset) { + if (isDataRow(row)) { + tdAppendColVal(memRowDataBody(row), value, type, offset); + } else { + tdAppendKvColVal(memRowKvBody(row), value, colId, type, kvOffset); + } + return 0; +} + +// make sure schema->flen appended for SDataRow +static FORCE_INLINE int32_t tdGetColAppendLen(uint8_t rowType, const void *value, int8_t colType) { + int32_t len = 0; + if (IS_VAR_DATA_TYPE(colType)) { + len += varDataTLen(value); + if (rowType == SMEM_ROW_KV) { + len += sizeof(SColIdx); + } + } else { + if (rowType == SMEM_ROW_KV) { + len += TYPE_BYTES[colType]; + len += sizeof(SColIdx); + } + } + return len; +} + + +typedef struct { + int16_t colId; + uint8_t colType; + char* colVal; +} SColInfo; + +static FORCE_INLINE void setSColInfo(SColInfo* colInfo, int16_t colId, uint8_t colType, char* colVal) { + colInfo->colId = colId; + colInfo->colType = colType; + colInfo->colVal = colVal; +} + +SMemRow mergeTwoMemRows(void *buffer, SMemRow row1, SMemRow row2, STSchema *pSchema1, STSchema *pSchema2); + + // ----------------- Raw payload structure for row: /* |<------------ Head ------------->|<----------- body of column data tuple ------------------->| * | |<----------------- flen ------------->|<--- value part --->| @@ -608,4 +756,4 @@ static FORCE_INLINE char *payloadNextCol(char *pCol) { return (char *)POINTER_SH } #endif -#endif // _TD_DATA_FORMAT_H_ \ No newline at end of file +#endif // _TD_DATA_FORMAT_H_ diff --git a/src/common/src/tdataformat.c b/src/common/src/tdataformat.c index 5a635886114141ddc44c9a4ce66e1db6c89fd19a..8ef3d083c75c58381fc8a71f076e7e04e976d774 100644 --- a/src/common/src/tdataformat.c +++ b/src/common/src/tdataformat.c @@ -17,9 +17,10 @@ #include "talgo.h" #include "tcoding.h" #include "wchar.h" +#include "tarray.h" static void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, int limit1, SDataCols *src2, int *iter2, - int limit2, int tRows); + int limit2, int tRows, bool forceSetNull); /** * Duplicate the schema and return a new object @@ -418,7 +419,8 @@ void tdResetDataCols(SDataCols *pCols) { } } } -static void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols *pCols) { + +static void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols *pCols, bool forceSetNull) { ASSERT(pCols->numOfRows == 0 || dataColsKeyLast(pCols) < dataRowKey(row)); int rcol = 0; @@ -452,8 +454,10 @@ static void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols } else if (pRowCol->colId < pDataCol->colId) { rcol++; } else { - // dataColSetNullAt(pDataCol, pCols->numOfRows); - dataColAppendVal(pDataCol, getNullValue(pDataCol->type), pCols->numOfRows, pCols->maxPoints); + if(forceSetNull) { + //dataColSetNullAt(pDataCol, pCols->numOfRows); + dataColAppendVal(pDataCol, getNullValue(pDataCol->type), pCols->numOfRows, pCols->maxPoints); + } dcol++; } } @@ -461,7 +465,7 @@ static void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols pCols->numOfRows++; } -static void tdAppendKvRowToDataCol(SKVRow row, STSchema *pSchema, SDataCols *pCols) { +static void tdAppendKvRowToDataCol(SKVRow row, STSchema *pSchema, SDataCols *pCols, bool forceSetNull) { ASSERT(pCols->numOfRows == 0 || dataColsKeyLast(pCols) < kvRowKey(row)); int rcol = 0; @@ -498,8 +502,10 @@ static void tdAppendKvRowToDataCol(SKVRow row, STSchema *pSchema, SDataCols *pCo } else if (colIdx->colId < pDataCol->colId) { ++rcol; } else { - // dataColSetNullAt(pDataCol, pCols->numOfRows); - dataColAppendVal(pDataCol, getNullValue(pDataCol->type), pCols->numOfRows, pCols->maxPoints); + if (forceSetNull) { + // dataColSetNullAt(pDataCol, pCols->numOfRows); + dataColAppendVal(pDataCol, getNullValue(pDataCol->type), pCols->numOfRows, pCols->maxPoints); + } ++dcol; } } @@ -507,17 +513,17 @@ static void tdAppendKvRowToDataCol(SKVRow row, STSchema *pSchema, SDataCols *pCo pCols->numOfRows++; } -void tdAppendMemRowToDataCol(SMemRow row, STSchema *pSchema, SDataCols *pCols) { +void tdAppendMemRowToDataCol(SMemRow row, STSchema *pSchema, SDataCols *pCols, bool forceSetNull) { if (isDataRow(row)) { - tdAppendDataRowToDataCol(memRowDataBody(row), pSchema, pCols); + tdAppendDataRowToDataCol(memRowDataBody(row), pSchema, pCols, forceSetNull); } else if (isKvRow(row)) { - tdAppendKvRowToDataCol(memRowKvBody(row), pSchema, pCols); + tdAppendKvRowToDataCol(memRowKvBody(row), pSchema, pCols, forceSetNull); } else { ASSERT(0); } } -int tdMergeDataCols(SDataCols *target, SDataCols *source, int rowsToMerge, int *pOffset) { +int tdMergeDataCols(SDataCols *target, SDataCols *source, int rowsToMerge, int *pOffset, bool forceSetNull) { ASSERT(rowsToMerge > 0 && rowsToMerge <= source->numOfRows); ASSERT(target->numOfCols == source->numOfCols); int offset = 0; @@ -546,7 +552,7 @@ int tdMergeDataCols(SDataCols *target, SDataCols *source, int rowsToMerge, int * int iter1 = 0; tdMergeTwoDataCols(target, pTarget, &iter1, pTarget->numOfRows, source, pOffset, source->numOfRows, - pTarget->numOfRows + rowsToMerge); + pTarget->numOfRows + rowsToMerge, forceSetNull); } tdFreeDataCols(pTarget); @@ -559,7 +565,7 @@ _err: // src2 data has more priority than src1 static void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, int limit1, SDataCols *src2, int *iter2, - int limit2, int tRows) { + int limit2, int tRows, bool forceSetNull) { tdResetDataCols(target); ASSERT(limit1 <= src1->numOfRows && limit2 <= src2->numOfRows); @@ -588,7 +594,7 @@ static void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, i if ((key1 > key2) || (key1 == key2 && !TKEY_IS_DELETED(tkey2))) { for (int i = 0; i < src2->numOfCols; i++) { ASSERT(target->cols[i].type == src2->cols[i].type); - if (src2->cols[i].len > 0) { + if (src2->cols[i].len > 0 && (forceSetNull || (!forceSetNull && !isNull(src2->cols[i].pData, src2->cols[i].type)))) { dataColAppendVal(&(target->cols[i]), tdGetColDataOfRow(src2->cols + i, *iter2), target->numOfRows, target->maxPoints); } @@ -761,4 +767,97 @@ SKVRow tdGetKVRowFromBuilder(SKVRowBuilder *pBuilder) { memcpy(kvRowValues(row), pBuilder->buf, pBuilder->size); return row; -} \ No newline at end of file +} + +SMemRow mergeTwoMemRows(void *buffer, SMemRow row1, SMemRow row2, STSchema *pSchema1, STSchema *pSchema2) { +#if 0 + ASSERT(memRowKey(row1) == memRowKey(row2)); + ASSERT(schemaVersion(pSchema1) == memRowVersion(row1)); + ASSERT(schemaVersion(pSchema2) == memRowVersion(row2)); + ASSERT(schemaVersion(pSchema1) >= schemaVersion(pSchema2)); +#endif + + SArray *stashRow = taosArrayInit(pSchema1->numOfCols, sizeof(SColInfo)); + if (stashRow == NULL) { + return NULL; + } + + SMemRow pRow = buffer; + SDataRow dataRow = memRowDataBody(pRow); + memRowSetType(pRow, SMEM_ROW_DATA); + dataRowSetVersion(dataRow, schemaVersion(pSchema1)); // use latest schema version + dataRowSetLen(dataRow, (TDRowLenT)(TD_DATA_ROW_HEAD_SIZE + pSchema1->flen)); + + TDRowTLenT dataLen = 0, kvLen = TD_MEM_ROW_KV_HEAD_SIZE; + + int32_t i = 0; // row1 + int32_t j = 0; // row2 + int32_t nCols1 = schemaNCols(pSchema1); + int32_t nCols2 = schemaNCols(pSchema2); + SColInfo colInfo = {0}; + int32_t kvIdx1 = 0, kvIdx2 = 0; + + while (i < nCols1) { + STColumn *pCol = schemaColAt(pSchema1, i); + void * val1 = tdGetMemRowDataOfColEx(row1, pCol->colId, pCol->type, TD_DATA_ROW_HEAD_SIZE + pCol->offset, &kvIdx1); + // if val1 != NULL, use val1; + if (val1 != NULL && !isNull(val1, pCol->type)) { + tdAppendColVal(dataRow, val1, pCol->type, pCol->offset); + kvLen += tdGetColAppendLen(SMEM_ROW_KV, val1, pCol->type); + setSColInfo(&colInfo, pCol->colId, pCol->type, val1); + taosArrayPush(stashRow, &colInfo); + ++i; // next col + continue; + } + + void *val2 = NULL; + while (j < nCols2) { + STColumn *tCol = schemaColAt(pSchema2, j); + if (tCol->colId < pCol->colId) { + ++j; + continue; + } + if (tCol->colId == pCol->colId) { + val2 = tdGetMemRowDataOfColEx(row2, tCol->colId, tCol->type, TD_DATA_ROW_HEAD_SIZE + tCol->offset, &kvIdx2); + } else if (tCol->colId > pCol->colId) { + // set NULL + } + break; + } // end of while(jtype); + } + tdAppendColVal(dataRow, val2, pCol->type, pCol->offset); + if (!isNull(val2, pCol->type)) { + kvLen += tdGetColAppendLen(SMEM_ROW_KV, val2, pCol->type); + setSColInfo(&colInfo, pCol->colId, pCol->type, val2); + taosArrayPush(stashRow, &colInfo); + } + + ++i; // next col + } + + dataLen = memRowTLen(pRow); + + if (kvLen < dataLen) { + // scan stashRow and generate SKVRow + memset(buffer, 0, sizeof(dataLen)); + SMemRow tRow = buffer; + memRowSetType(tRow, SMEM_ROW_KV); + SKVRow kvRow = (SKVRow)memRowKvBody(tRow); + int16_t nKvNCols = (int16_t) taosArrayGetSize(stashRow); + kvRowSetLen(kvRow, (TDRowLenT)(TD_KV_ROW_HEAD_SIZE + sizeof(SColIdx) * nKvNCols)); + kvRowSetNCols(kvRow, nKvNCols); + memRowSetKvVersion(tRow, pSchema1->version); + + int32_t toffset = 0; + int16_t k; + for (k = 0; k < nKvNCols; ++k) { + SColInfo *pColInfo = taosArrayGet(stashRow, k); + tdAppendKvColVal(kvRow, pColInfo->colVal, pColInfo->colId, pColInfo->colType, &toffset); + } + ASSERT(kvLen == memRowTLen(tRow)); + } + taosArrayDestroy(stashRow); + return buffer; +} diff --git a/src/common/src/ttypes.c b/src/common/src/ttypes.c index 13bcfe64800dfee693d2622a42df7ad9c7617fae..eeffe49adc9e5efe97dde580584afad280ee2993 100644 --- a/src/common/src/ttypes.c +++ b/src/common/src/ttypes.c @@ -409,7 +409,7 @@ bool isValidDataType(int32_t type) { return type >= TSDB_DATA_TYPE_NULL && type <= TSDB_DATA_TYPE_UBIGINT; } -void setVardataNull(char* val, int32_t type) { +void setVardataNull(void* val, int32_t type) { if (type == TSDB_DATA_TYPE_BINARY) { varDataSetLen(val, sizeof(int8_t)); *(uint8_t*) varDataVal(val) = TSDB_DATA_BINARY_NULL; @@ -421,75 +421,75 @@ void setVardataNull(char* val, int32_t type) { } } -void setNull(char *val, int32_t type, int32_t bytes) { setNullN(val, type, bytes, 1); } +void setNull(void *val, int32_t type, int32_t bytes) { setNullN(val, type, bytes, 1); } -void setNullN(char *val, int32_t type, int32_t bytes, int32_t numOfElems) { +void setNullN(void *val, int32_t type, int32_t bytes, int32_t numOfElems) { switch (type) { case TSDB_DATA_TYPE_BOOL: for (int32_t i = 0; i < numOfElems; ++i) { - *(uint8_t *)(val + i * tDataTypes[type].bytes) = TSDB_DATA_BOOL_NULL; + *(uint8_t *)(POINTER_SHIFT(val, i * tDataTypes[type].bytes)) = TSDB_DATA_BOOL_NULL; } break; case TSDB_DATA_TYPE_TINYINT: for (int32_t i = 0; i < numOfElems; ++i) { - *(uint8_t *)(val + i * tDataTypes[type].bytes) = TSDB_DATA_TINYINT_NULL; + *(uint8_t *)(POINTER_SHIFT(val, i * tDataTypes[type].bytes)) = TSDB_DATA_TINYINT_NULL; } break; case TSDB_DATA_TYPE_SMALLINT: for (int32_t i = 0; i < numOfElems; ++i) { - *(uint16_t *)(val + i * tDataTypes[type].bytes) = TSDB_DATA_SMALLINT_NULL; + *(uint16_t *)(POINTER_SHIFT(val, i * tDataTypes[type].bytes)) = TSDB_DATA_SMALLINT_NULL; } break; case TSDB_DATA_TYPE_INT: for (int32_t i = 0; i < numOfElems; ++i) { - *(uint32_t *)(val + i * tDataTypes[type].bytes) = TSDB_DATA_INT_NULL; + *(uint32_t *)(POINTER_SHIFT(val, i * tDataTypes[type].bytes)) = TSDB_DATA_INT_NULL; } break; case TSDB_DATA_TYPE_BIGINT: case TSDB_DATA_TYPE_TIMESTAMP: for (int32_t i = 0; i < numOfElems; ++i) { - *(uint64_t *)(val + i * tDataTypes[type].bytes) = TSDB_DATA_BIGINT_NULL; + *(uint64_t *)(POINTER_SHIFT(val, i * tDataTypes[type].bytes)) = TSDB_DATA_BIGINT_NULL; } break; case TSDB_DATA_TYPE_UTINYINT: for (int32_t i = 0; i < numOfElems; ++i) { - *(uint8_t *)(val + i * tDataTypes[type].bytes) = TSDB_DATA_UTINYINT_NULL; + *(uint8_t *)(POINTER_SHIFT(val, i * tDataTypes[type].bytes)) = TSDB_DATA_UTINYINT_NULL; } break; case TSDB_DATA_TYPE_USMALLINT: for (int32_t i = 0; i < numOfElems; ++i) { - *(uint16_t *)(val + i * tDataTypes[type].bytes) = TSDB_DATA_USMALLINT_NULL; + *(uint16_t *)(POINTER_SHIFT(val, i * tDataTypes[type].bytes)) = TSDB_DATA_USMALLINT_NULL; } break; case TSDB_DATA_TYPE_UINT: for (int32_t i = 0; i < numOfElems; ++i) { - *(uint32_t *)(val + i * tDataTypes[type].bytes) = TSDB_DATA_UINT_NULL; + *(uint32_t *)(POINTER_SHIFT(val, i * tDataTypes[type].bytes)) = TSDB_DATA_UINT_NULL; } break; case TSDB_DATA_TYPE_UBIGINT: for (int32_t i = 0; i < numOfElems; ++i) { - *(uint64_t *)(val + i * tDataTypes[type].bytes) = TSDB_DATA_UBIGINT_NULL; + *(uint64_t *)(POINTER_SHIFT(val, i * tDataTypes[type].bytes)) = TSDB_DATA_UBIGINT_NULL; } break; case TSDB_DATA_TYPE_FLOAT: for (int32_t i = 0; i < numOfElems; ++i) { - *(uint32_t *)(val + i * tDataTypes[type].bytes) = TSDB_DATA_FLOAT_NULL; + *(uint32_t *)(POINTER_SHIFT(val, i * tDataTypes[type].bytes)) = TSDB_DATA_FLOAT_NULL; } break; case TSDB_DATA_TYPE_DOUBLE: for (int32_t i = 0; i < numOfElems; ++i) { - *(uint64_t *)(val + i * tDataTypes[type].bytes) = TSDB_DATA_DOUBLE_NULL; + *(uint64_t *)(POINTER_SHIFT(val, i * tDataTypes[type].bytes)) = TSDB_DATA_DOUBLE_NULL; } break; case TSDB_DATA_TYPE_NCHAR: case TSDB_DATA_TYPE_BINARY: for (int32_t i = 0; i < numOfElems; ++i) { - setVardataNull(val + i * bytes, type); + setVardataNull(POINTER_SHIFT(val, i * bytes), type); } break; default: { for (int32_t i = 0; i < numOfElems; ++i) { - *(uint32_t *)(val + i * tDataTypes[TSDB_DATA_TYPE_INT].bytes) = TSDB_DATA_INT_NULL; + *(uint32_t *)(POINTER_SHIFT(val, i * tDataTypes[TSDB_DATA_TYPE_INT].bytes)) = TSDB_DATA_INT_NULL; } break; } diff --git a/src/inc/taosdef.h b/src/inc/taosdef.h index ca8ad3cc09f99d06c1f0406bb7124a8aad6fef55..4a5497795f21252c6e8ed1f54a64d23a1c1aa13b 100644 --- a/src/inc/taosdef.h +++ b/src/inc/taosdef.h @@ -307,7 +307,7 @@ do { \ #define TSDB_DEFAULT_WAL_LEVEL 1 #define TSDB_MIN_DB_UPDATE 0 -#define TSDB_MAX_DB_UPDATE 1 +#define TSDB_MAX_DB_UPDATE 2 #define TSDB_DEFAULT_DB_UPDATE_OPTION 0 #define TSDB_MIN_DB_CACHE_LAST_ROW 0 @@ -435,6 +435,12 @@ typedef enum { TSDB_CHECK_ITEM_MAX } ECheckItemType; +typedef enum { + TD_ROW_DISCARD_UPDATE = 0, + TD_ROW_OVERWRITE_UPDATE = 1, + TD_ROW_PARTIAL_UPDATE = 2 +} TDUpdateConfig; + extern char *qtypeStr[]; #ifdef __cplusplus diff --git a/src/inc/tsdb.h b/src/inc/tsdb.h index 79d9029dbc1603fe145e8b12553e2d03c028ac60..7fd6d8c10c4b2845be833b00d47d9307652f84ae 100644 --- a/src/inc/tsdb.h +++ b/src/inc/tsdb.h @@ -111,7 +111,7 @@ typedef struct { uint64_t superUid; STSchema * schema; STSchema * tagSchema; - SDataRow tagValues; + SKVRow tagValues; char * sql; } STableCfg; diff --git a/src/inc/ttype.h b/src/inc/ttype.h index fcd2651c701c6c7a31d276b807912f28111b4c4f..44e666106a7657691b0d97d259ccb7b61871b9a7 100644 --- a/src/inc/ttype.h +++ b/src/inc/ttype.h @@ -141,7 +141,7 @@ typedef struct { #define IS_VALID_FLOAT(_t) ((_t) >= -FLT_MAX && (_t) <= FLT_MAX) #define IS_VALID_DOUBLE(_t) ((_t) >= -DBL_MAX && (_t) <= DBL_MAX) -static FORCE_INLINE bool isNull(const char *val, int32_t type) { +static FORCE_INLINE bool isNull(const void *val, int32_t type) { switch (type) { case TSDB_DATA_TYPE_BOOL: return *(uint8_t *)val == TSDB_DATA_BOOL_NULL; @@ -193,9 +193,9 @@ extern tDataTypeDescriptor tDataTypes[15]; bool isValidDataType(int32_t type); -void setVardataNull(char* val, int32_t type); -void setNull(char *val, int32_t type, int32_t bytes); -void setNullN(char *val, int32_t type, int32_t bytes, int32_t numOfElems); +void setVardataNull(void* val, int32_t type); +void setNull(void *val, int32_t type, int32_t bytes); +void setNullN(void *val, int32_t type, int32_t bytes, int32_t numOfElems); const void *getNullValue(int32_t type); void assignVal(char *val, const char *src, int32_t len, int32_t type); diff --git a/src/tsdb/inc/tsdbBuffer.h b/src/tsdb/inc/tsdbBuffer.h index 4e18ac711a159cd97fd6255c5be0e8aa6ff4abaf..ec6b057aef142fb938993b3a27717c5e64937258 100644 --- a/src/tsdb/inc/tsdbBuffer.h +++ b/src/tsdb/inc/tsdbBuffer.h @@ -40,7 +40,7 @@ void tsdbFreeBufPool(STsdbBufPool* pBufPool); int tsdbOpenBufPool(STsdbRepo* pRepo); void tsdbCloseBufPool(STsdbRepo* pRepo); SListNode* tsdbAllocBufBlockFromPool(STsdbRepo* pRepo); -int tsdbExpendPool(STsdbRepo* pRepo, int32_t oldTotalBlocks); +int tsdbExpandPool(STsdbRepo* pRepo, int32_t oldTotalBlocks); void tsdbRecycleBufferBlock(STsdbBufPool* pPool, SListNode *pNode); -#endif /* _TD_TSDB_BUFFER_H_ */ \ No newline at end of file +#endif /* _TD_TSDB_BUFFER_H_ */ diff --git a/src/tsdb/inc/tsdbReadImpl.h b/src/tsdb/inc/tsdbReadImpl.h index e7840d9723f7935c69f70f1c236a5ed49c82c146..814c4d130599768e8237145559c47e50e64db4db 100644 --- a/src/tsdb/inc/tsdbReadImpl.h +++ b/src/tsdb/inc/tsdbReadImpl.h @@ -16,6 +16,13 @@ #ifndef _TD_TSDB_READ_IMPL_H_ #define _TD_TSDB_READ_IMPL_H_ +#include "tfs.h" +#include "tsdb.h" +#include "os.h" +#include "tsdbFile.h" +#include "tskiplist.h" +#include "tsdbMeta.h" + typedef struct SReadH SReadH; typedef struct { @@ -143,4 +150,4 @@ static FORCE_INLINE int tsdbMakeRoom(void **ppBuf, size_t size) { return 0; } -#endif /*_TD_TSDB_READ_IMPL_H_*/ \ No newline at end of file +#endif /*_TD_TSDB_READ_IMPL_H_*/ diff --git a/src/tsdb/inc/tsdbRowMergeBuf.h b/src/tsdb/inc/tsdbRowMergeBuf.h new file mode 100644 index 0000000000000000000000000000000000000000..302bf25750fc08367a2840fa9c483919c828fcb5 --- /dev/null +++ b/src/tsdb/inc/tsdbRowMergeBuf.h @@ -0,0 +1,43 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef TSDB_ROW_MERGE_BUF_H +#define TSDB_ROW_MERGE_BUF_H + +#ifdef __cplusplus +extern "C" { +#endif + +#include "tsdb.h" +#include "tchecksum.h" +#include "tsdbReadImpl.h" + +typedef void* SMergeBuf; + +SDataRow tsdbMergeTwoRows(SMergeBuf *pBuf, SMemRow row1, SMemRow row2, STSchema *pSchema1, STSchema *pSchema2); + +static FORCE_INLINE int tsdbMergeBufMakeSureRoom(SMergeBuf *pBuf, STSchema* pSchema1, STSchema* pSchema2) { + return tsdbMakeRoom(pBuf, MAX(dataRowMaxBytesFromSchema(pSchema1), dataRowMaxBytesFromSchema(pSchema2))); +} + +static FORCE_INLINE void tsdbFreeMergeBuf(SMergeBuf buf) { + taosTZfree(buf); +} + +#ifdef __cplusplus +} +#endif + +#endif /* ifndef TSDB_ROW_MERGE_BUF_H */ diff --git a/src/tsdb/inc/tsdbint.h b/src/tsdb/inc/tsdbint.h index dd43e393102c40d4b12a948a9b4d6d72f30f6dad..757a0951e8640656951904ffdcca1024cc27a800 100644 --- a/src/tsdb/inc/tsdbint.h +++ b/src/tsdb/inc/tsdbint.h @@ -68,6 +68,8 @@ extern "C" { #include "tsdbCompact.h" // Commit Queue #include "tsdbCommitQueue.h" + +#include "tsdbRowMergeBuf.h" // Main definitions struct STsdbRepo { uint8_t state; @@ -92,6 +94,7 @@ struct STsdbRepo { pthread_mutex_t mutex; bool repoLocked; int32_t code; // Commit code + SMergeBuf mergeBuf; //used when update=2 bool inCompact; // is in compact process? }; @@ -139,4 +142,4 @@ static FORCE_INLINE int tsdbGetNextMaxTables(int tid) { } #endif -#endif /* _TD_TSDB_INT_H_ */ \ No newline at end of file +#endif /* _TD_TSDB_INT_H_ */ diff --git a/src/tsdb/src/tsdbBuffer.c b/src/tsdb/src/tsdbBuffer.c index 429ea8e0ceb2c687b53ebd0a9d61d02d2a1f3686..e675bf6f9de04021112d43a1db70cf56cf430f08 100644 --- a/src/tsdb/src/tsdbBuffer.c +++ b/src/tsdb/src/tsdbBuffer.c @@ -159,7 +159,7 @@ _err: static void tsdbFreeBufBlock(STsdbBufBlock *pBufBlock) { tfree(pBufBlock); } -int tsdbExpendPool(STsdbRepo* pRepo, int32_t oldTotalBlocks) { +int tsdbExpandPool(STsdbRepo* pRepo, int32_t oldTotalBlocks) { if (oldTotalBlocks == pRepo->config.totalBlocks) { return TSDB_CODE_SUCCESS; } @@ -199,4 +199,4 @@ void tsdbRecycleBufferBlock(STsdbBufPool* pPool, SListNode *pNode) { tsdbFreeBufBlock(pBufBlock); free(pNode); pPool->nBufBlocks--; -} \ No newline at end of file +} diff --git a/src/tsdb/src/tsdbCommit.c b/src/tsdb/src/tsdbCommit.c index 6330da60585afb590ee82018b3204280f62e5f1c..6c98283189b8a3e83ff888bfb9530bb85127c27d 100644 --- a/src/tsdb/src/tsdbCommit.c +++ b/src/tsdb/src/tsdbCommit.c @@ -1290,7 +1290,7 @@ static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIt ASSERT(pSchema != NULL); } - tdAppendMemRowToDataCol(row, pSchema, pTarget); + tdAppendMemRowToDataCol(row, pSchema, pTarget, true); } tSkipListIterNext(pCommitIter->pIter); @@ -1302,7 +1302,7 @@ static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIt ASSERT(pSchema != NULL); } - tdAppendMemRowToDataCol(row, pSchema, pTarget); + tdAppendMemRowToDataCol(row, pSchema, pTarget, update == TD_ROW_OVERWRITE_UPDATE); } } else { ASSERT(!isRowDel); diff --git a/src/tsdb/src/tsdbCommitQueue.c b/src/tsdb/src/tsdbCommitQueue.c index e45ac05e979b119ee7995cf845a6f242a9953cb3..59fb4f334d3006eb7e8807ce193d61905f2322d2 100644 --- a/src/tsdb/src/tsdbCommitQueue.c +++ b/src/tsdb/src/tsdbCommitQueue.c @@ -138,7 +138,7 @@ static void tsdbApplyRepoConfig(STsdbRepo *pRepo) { pSaveCfg->compression, pSaveCfg->keep,pSaveCfg->keep1, pSaveCfg->keep2, pSaveCfg->totalBlocks, oldCfg.cacheLastRow, pSaveCfg->cacheLastRow, oldTotalBlocks, pSaveCfg->totalBlocks); - int err = tsdbExpendPool(pRepo, oldTotalBlocks); + int err = tsdbExpandPool(pRepo, oldTotalBlocks); if (!TAOS_SUCCEEDED(err)) { tsdbError("vgId:%d expand pool from %d to %d fail,reason:%s", REPO_ID(pRepo), oldTotalBlocks, pSaveCfg->totalBlocks, tstrerror(err)); diff --git a/src/tsdb/src/tsdbCompact.c b/src/tsdb/src/tsdbCompact.c index 5211ee3c611bb9a75ba02aa5fdf810bb52b1ab9d..0490f26b5ec649a420d0340990a8f49c14f9deb0 100644 --- a/src/tsdb/src/tsdbCompact.c +++ b/src/tsdb/src/tsdbCompact.c @@ -455,7 +455,7 @@ static int tsdbCompactMeta(STsdbRepo *pRepo) { if (pReadh->pDCols[0]->numOfRows - ridx == 0) break; int rowsToMerge = MIN(pReadh->pDCols[0]->numOfRows - ridx, defaultRows - pComph->pDataCols->numOfRows); - tdMergeDataCols(pComph->pDataCols, pReadh->pDCols[0], rowsToMerge, &ridx); + tdMergeDataCols(pComph->pDataCols, pReadh->pDCols[0], rowsToMerge, &ridx, pCfg->update != TD_ROW_PARTIAL_UPDATE); if (pComph->pDataCols->numOfRows < defaultRows) { break; diff --git a/src/tsdb/src/tsdbMain.c b/src/tsdb/src/tsdbMain.c index f3a7c4b7eecfab36eb5249107e88d0af60992f62..c877bfc7af2620c354c3613977a6b9da7ca96825 100644 --- a/src/tsdb/src/tsdbMain.c +++ b/src/tsdb/src/tsdbMain.c @@ -14,6 +14,7 @@ */ // no test file errors here +#include "taosdef.h" #include "tsdbint.h" #define IS_VALID_PRECISION(precision) \ @@ -106,6 +107,8 @@ STsdbRepo *tsdbOpenRepo(STsdbCfg *pCfg, STsdbAppH *pAppH) { return NULL; } + pRepo->mergeBuf = NULL; + tsdbStartStream(pRepo); tsdbDebug("vgId:%d, TSDB repository opened", REPO_ID(pRepo)); @@ -518,7 +521,8 @@ static int32_t tsdbCheckAndSetDefaultCfg(STsdbCfg *pCfg) { } // update check - if (pCfg->update != 0) pCfg->update = 1; + if (pCfg->update < TD_ROW_DISCARD_UPDATE || pCfg->update > TD_ROW_PARTIAL_UPDATE) + pCfg->update = TD_ROW_DISCARD_UPDATE; // update cacheLastRow if (pCfg->cacheLastRow != 0) { @@ -597,6 +601,7 @@ static void tsdbFreeRepo(STsdbRepo *pRepo) { tsdbFreeFS(pRepo->fs); tsdbFreeBufPool(pRepo->pPool); tsdbFreeMeta(pRepo->tsdbMeta); + tsdbFreeMergeBuf(pRepo->mergeBuf); // tsdbFreeMemTable(pRepo->mem); // tsdbFreeMemTable(pRepo->imem); tsem_destroy(&(pRepo->readyToCommit)); diff --git a/src/tsdb/src/tsdbMemTable.c b/src/tsdb/src/tsdbMemTable.c index ee9dadd9b7e2e560480d289bb029fbf6c1fc8d88..8bb2d1c44e17d7409c5e290adf3cf0e7d1e79528 100644 --- a/src/tsdb/src/tsdbMemTable.c +++ b/src/tsdb/src/tsdbMemTable.c @@ -13,7 +13,11 @@ * along with this program. If not, see . */ +#include "tdataformat.h" +#include "tfunctional.h" #include "tsdbint.h" +#include "tskiplist.h" +#include "tsdbRowMergeBuf.h" #define TSDB_DATA_SKIPLIST_LEVEL 5 #define TSDB_MAX_INSERT_BATCH 512 @@ -30,24 +34,22 @@ typedef struct { void * pMsg; } SSubmitMsgIter; -static SMemTable * tsdbNewMemTable(STsdbRepo *pRepo); -static void tsdbFreeMemTable(SMemTable *pMemTable); -static STableData *tsdbNewTableData(STsdbCfg *pCfg, STable *pTable); -static void tsdbFreeTableData(STableData *pTableData); -static char * tsdbGetTsTupleKey(const void *data); +static SMemTable * tsdbNewMemTable(STsdbRepo *pRepo); +static void tsdbFreeMemTable(SMemTable *pMemTable); +static STableData* tsdbNewTableData(STsdbCfg *pCfg, STable *pTable); +static void tsdbFreeTableData(STableData *pTableData); +static char * tsdbGetTsTupleKey(const void *data); static int tsdbAdjustMemMaxTables(SMemTable *pMemTable, int maxTables); -static int tsdbAppendTableRowToCols(STable *pTable, SDataCols *pCols, STSchema **ppSchema, SMemRow row); +static int tsdbAppendTableRowToCols(STable *pTable, SDataCols *pCols, STSchema **ppSchema, SMemRow row); static int tsdbInitSubmitBlkIter(SSubmitBlk *pBlock, SSubmitBlkIter *pIter); -static SMemRow tsdbGetSubmitBlkNext(SSubmitBlkIter *pIter); +static SMemRow tsdbGetSubmitBlkNext(SSubmitBlkIter *pIter); static int tsdbScanAndConvertSubmitMsg(STsdbRepo *pRepo, SSubmitMsg *pMsg); static int tsdbInsertDataToTable(STsdbRepo *pRepo, SSubmitBlk *pBlock, int32_t *affectedrows); -static int tsdbCopyRowToMem(STsdbRepo *pRepo, SMemRow row, STable *pTable, void **ppRow); static int tsdbInitSubmitMsgIter(SSubmitMsg *pMsg, SSubmitMsgIter *pIter); static int tsdbGetSubmitMsgNext(SSubmitMsgIter *pIter, SSubmitBlk **pPBlock); static int tsdbCheckTableSchema(STsdbRepo *pRepo, SSubmitBlk *pBlock, STable *pTable); -static int tsdbInsertDataToTableImpl(STsdbRepo *pRepo, STable *pTable, void **rows, int rowCounter); -static void tsdbFreeRows(STsdbRepo *pRepo, void **rows, int rowCounter); -static int tsdbUpdateTableLatestInfo(STsdbRepo *pRepo, STable *pTable, SMemRow row); +static int tsdbUpdateTableLatestInfo(STsdbRepo *pRepo, STable *pTable, SMemRow row); + static FORCE_INLINE int tsdbCheckRowRange(STsdbRepo *pRepo, STable *pTable, SMemRow row, TSKEY minKey, TSKEY maxKey, TSKEY now); @@ -342,7 +344,7 @@ int tsdbSyncCommit(STsdbRepo *repo) { * 3. rowsIncreased = rowsInserted - rowsDeleteSucceed >= maxRowsToRead * 4. operations in pCols not exceeds its max capacity if pCols is given * - * The function tries to procceed AS MUSH AS POSSIBLE. + * The function tries to procceed AS MUCH AS POSSIBLE. */ int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey, int maxRowsToRead, SDataCols *pCols, TKEY *filterKeys, int nFilterKeys, bool keepDup, SMergeInfo *pMergeInfo) { @@ -523,9 +525,15 @@ static STableData *tsdbNewTableData(STsdbCfg *pCfg, STable *pTable) { pTableData->keyLast = 0; pTableData->numOfRows = 0; + uint8_t skipListCreateFlags; + if(pCfg->update == TD_ROW_DISCARD_UPDATE) + skipListCreateFlags = SL_DISCARD_DUP_KEY; + else + skipListCreateFlags = SL_UPDATE_DUP_KEY; + pTableData->pData = tSkipListCreate(TSDB_DATA_SKIPLIST_LEVEL, TSDB_DATA_TYPE_TIMESTAMP, TYPE_BYTES[TSDB_DATA_TYPE_TIMESTAMP], - tkeyComparFn, pCfg->update ? SL_UPDATE_DUP_KEY : SL_DISCARD_DUP_KEY, tsdbGetTsTupleKey); + tkeyComparFn, skipListCreateFlags, tsdbGetTsTupleKey); if (pTableData->pData == NULL) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; free(pTableData); @@ -581,7 +589,7 @@ static int tsdbAppendTableRowToCols(STable *pTable, SDataCols *pCols, STSchema * } } - tdAppendMemRowToDataCol(row, *ppSchema, pCols); + tdAppendMemRowToDataCol(row, *ppSchema, pCols, true); } return 0; @@ -693,95 +701,162 @@ static int tsdbScanAndConvertSubmitMsg(STsdbRepo *pRepo, SSubmitMsg *pMsg) { return 0; } -static int tsdbInsertDataToTable(STsdbRepo *pRepo, SSubmitBlk *pBlock, int32_t *affectedrows) { - STsdbMeta * pMeta = pRepo->tsdbMeta; - int64_t points = 0; - STable * pTable = NULL; - SSubmitBlkIter blkIter = {0}; - SMemRow row = NULL; - void * rows[TSDB_MAX_INSERT_BATCH] = {0}; - int rowCounter = 0; +//row1 has higher priority +static SMemRow tsdbInsertDupKeyMerge(SMemRow row1, SMemRow row2, STsdbRepo* pRepo, STSchema **ppSchema1, STSchema **ppSchema2, STable* pTable, int32_t* pAffectedRows, int64_t* pPoints, SMemRow* pLastRow) { + + //for compatiblity, duplicate key inserted when update=0 should be also calculated as affected rows! + if(row1 == NULL && row2 == NULL && pRepo->config.update == TD_ROW_DISCARD_UPDATE) { + (*pAffectedRows)++; + (*pPoints)++; + return NULL; + } - ASSERT(pBlock->tid < pMeta->maxTables); - pTable = pMeta->tables[pBlock->tid]; - ASSERT(pTable != NULL && TABLE_UID(pTable) == pBlock->uid); + if(row2 == NULL || pRepo->config.update != TD_ROW_PARTIAL_UPDATE) { + void* pMem = tsdbAllocBytes(pRepo, memRowTLen(row1)); + if(pMem == NULL) return NULL; + memRowCpy(pMem, row1); + (*pAffectedRows)++; + (*pPoints)++; + *pLastRow = pMem; + return pMem; + } - tsdbInitSubmitBlkIter(pBlock, &blkIter); - while ((row = tsdbGetSubmitBlkNext(&blkIter)) != NULL) { - if (tsdbCopyRowToMem(pRepo, row, pTable, &(rows[rowCounter])) < 0) { - tsdbFreeRows(pRepo, rows, rowCounter); - goto _err; + STSchema *pSchema1 = *ppSchema1; + STSchema *pSchema2 = *ppSchema2; + SMergeBuf * pBuf = &pRepo->mergeBuf; + int dv1 = memRowVersion(row1); + int dv2 = memRowVersion(row2); + if(pSchema1 == NULL || schemaVersion(pSchema1) != dv1) { + if(pSchema2 != NULL && schemaVersion(pSchema2) == dv1) { + *ppSchema1 = pSchema2; + } else { + *ppSchema1 = tsdbGetTableSchemaImpl(pTable, false, false, memRowVersion(row1)); } + pSchema1 = *ppSchema1; + } - (*affectedrows)++; - points++; - - if (rows[rowCounter] != NULL) { - rowCounter++; + if(pSchema2 == NULL || schemaVersion(pSchema2) != dv2) { + if(schemaVersion(pSchema1) == dv2) { + pSchema2 = pSchema1; + } else { + *ppSchema2 = tsdbGetTableSchemaImpl(pTable, false, false, memRowVersion(row2)); + pSchema2 = *ppSchema2; } + } - if (rowCounter == TSDB_MAX_INSERT_BATCH) { - if (tsdbInsertDataToTableImpl(pRepo, pTable, rows, rowCounter) < 0) { - goto _err; - } + SMemRow tmp = tsdbMergeTwoRows(pBuf, row1, row2, pSchema1, pSchema2); - rowCounter = 0; - memset(rows, 0, sizeof(rows)); - } - } + void* pMem = tsdbAllocBytes(pRepo, memRowTLen(tmp)); + if(pMem == NULL) return NULL; + memRowCpy(pMem, tmp); - if (rowCounter > 0 && tsdbInsertDataToTableImpl(pRepo, pTable, rows, rowCounter) < 0) { - goto _err; - } + (*pAffectedRows)++; + (*pPoints)++; - STSchema *pSchema = tsdbGetTableSchemaByVersion(pTable, pBlock->sversion); - pRepo->stat.pointsWritten += points * schemaNCols(pSchema); - pRepo->stat.totalStorage += points * schemaVLen(pSchema); + *pLastRow = pMem; + return pMem; +} - return 0; +static void* tsdbInsertDupKeyMergePacked(void** args) { + return tsdbInsertDupKeyMerge(args[0], args[1], args[2], (STSchema**)&args[3], (STSchema**)&args[4], args[5], args[6], args[7], args[8]); +} -_err: - return -1; +static void tsdbSetupSkipListHookFns(SSkipList* pSkipList, STsdbRepo *pRepo, STable *pTable, int32_t* pAffectedRows, int64_t* pPoints, SMemRow* pLastRow) { + + if(pSkipList->insertHandleFn == NULL) { + tGenericSavedFunc *dupHandleSavedFunc = genericSavedFuncInit((GenericVaFunc)&tsdbInsertDupKeyMergePacked, 9); + dupHandleSavedFunc->args[2] = pRepo; + dupHandleSavedFunc->args[3] = NULL; + dupHandleSavedFunc->args[4] = NULL; + dupHandleSavedFunc->args[5] = pTable; + dupHandleSavedFunc->args[6] = pAffectedRows; + dupHandleSavedFunc->args[7] = pPoints; + dupHandleSavedFunc->args[8] = pLastRow; + pSkipList->insertHandleFn = dupHandleSavedFunc; + } } -static int tsdbCopyRowToMem(STsdbRepo *pRepo, SMemRow row, STable *pTable, void **ppRow) { - STsdbCfg * pCfg = &pRepo->config; - TKEY tkey = memRowTKey(row); - TSKEY key = memRowKey(row); - bool isRowDelete = TKEY_IS_DELETED(tkey); +static int tsdbInsertDataToTable(STsdbRepo* pRepo, SSubmitBlk* pBlock, int32_t *pAffectedRows) { - if (isRowDelete) { - if (!pCfg->update) { - tsdbWarn("vgId:%d vnode is not allowed to update but try to delete a data row", REPO_ID(pRepo)); - terrno = TSDB_CODE_TDB_INVALID_ACTION; + STsdbMeta *pMeta = pRepo->tsdbMeta; + int64_t points = 0; + STable *pTable = NULL; + SSubmitBlkIter blkIter = {0}; + SMemTable *pMemTable = NULL; + STableData *pTableData = NULL; + STsdbCfg *pCfg = &(pRepo->config); + + tsdbInitSubmitBlkIter(pBlock, &blkIter); + if(blkIter.row == NULL) return 0; + TSKEY firstRowKey = memRowKey(blkIter.row); + + tsdbAllocBytes(pRepo, 0); + pMemTable = pRepo->mem; + + ASSERT(pMemTable != NULL); + ASSERT(pBlock->tid < pMeta->maxTables); + + pTable = pMeta->tables[pBlock->tid]; + + ASSERT(pTable != NULL && TABLE_UID(pTable) == pBlock->uid); + + + if (TABLE_TID(pTable) >= pMemTable->maxTables) { + if (tsdbAdjustMemMaxTables(pMemTable, pMeta->maxTables) < 0) { return -1; } + } + pTableData = pMemTable->tData[TABLE_TID(pTable)]; + + if (pTableData == NULL || pTableData->uid != TABLE_UID(pTable)) { + if (pTableData != NULL) { + taosWLockLatch(&(pMemTable->latch)); + pMemTable->tData[TABLE_TID(pTable)] = NULL; + tsdbFreeTableData(pTableData); + taosWUnLockLatch(&(pMemTable->latch)); + } - TSKEY lastKey = tsdbGetTableLastKeyImpl(pTable); - if (key > lastKey) { - tsdbTrace("vgId:%d skip to delete row key %" PRId64 " which is larger than table lastKey %" PRId64, - REPO_ID(pRepo), key, lastKey); - return 0; + pTableData = tsdbNewTableData(pCfg, pTable); + if (pTableData == NULL) { + tsdbError("vgId:%d failed to insert data to table %s uid %" PRId64 " tid %d since %s", REPO_ID(pRepo), + TABLE_CHAR_NAME(pTable), TABLE_UID(pTable), TABLE_TID(pTable), tstrerror(terrno)); + return -1; } - } - void *pRow = tsdbAllocBytes(pRepo, memRowTLen(row)); - if (pRow == NULL) { - tsdbError("vgId:%d failed to insert row with key %" PRId64 " to table %s while allocate %" PRIu32 " bytes since %s", - REPO_ID(pRepo), key, TABLE_CHAR_NAME(pTable), memRowTLen(row), tstrerror(terrno)); - return -1; + pRepo->mem->tData[TABLE_TID(pTable)] = pTableData; } - memRowCpy(pRow, row); - ppRow[0] = pRow; // save the memory address of data rows + ASSERT((pTableData != NULL) && pTableData->uid == TABLE_UID(pTable)); + + SMemRow lastRow = NULL; + int64_t osize = SL_SIZE(pTableData->pData); + tsdbSetupSkipListHookFns(pTableData->pData, pRepo, pTable, pAffectedRows, &points, &lastRow); + tSkipListPutBatchByIter(pTableData->pData, &blkIter, (iter_next_fn_t)tsdbGetSubmitBlkNext); + int64_t dsize = SL_SIZE(pTableData->pData) - osize; - tsdbTrace("vgId:%d a row is %s table %s tid %d uid %" PRIu64 " key %" PRIu64, REPO_ID(pRepo), - isRowDelete ? "deleted from" : "updated in", TABLE_CHAR_NAME(pTable), TABLE_TID(pTable), TABLE_UID(pTable), - key); + + if(lastRow != NULL) { + TSKEY lastRowKey = memRowKey(lastRow); + if (pMemTable->keyFirst > firstRowKey) pMemTable->keyFirst = firstRowKey; + pMemTable->numOfRows += dsize; + + if (pTableData->keyFirst > firstRowKey) pTableData->keyFirst = firstRowKey; + pTableData->numOfRows += dsize; + if (pMemTable->keyLast < lastRowKey) pMemTable->keyLast = lastRowKey; + if (pTableData->keyLast < lastRowKey) pTableData->keyLast = lastRowKey; + if (tsdbUpdateTableLatestInfo(pRepo, pTable, lastRow) < 0) { + return -1; + } + } + + STSchema *pSchema = tsdbGetTableSchemaByVersion(pTable, pBlock->sversion); + pRepo->stat.pointsWritten += points * schemaNCols(pSchema); + pRepo->stat.totalStorage += points * schemaVLen(pSchema); return 0; } + static int tsdbInitSubmitMsgIter(SSubmitMsg *pMsg, SSubmitMsgIter *pIter) { if (pMsg == NULL) { terrno = TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP; @@ -889,106 +964,6 @@ static int tsdbCheckTableSchema(STsdbRepo *pRepo, SSubmitBlk *pBlock, STable *pT return 0; } -static int tsdbInsertDataToTableImpl(STsdbRepo *pRepo, STable *pTable, void **rows, int rowCounter) { - if (rowCounter < 1) return 0; - - SMemTable * pMemTable = NULL; - STableData *pTableData = NULL; - STsdbMeta * pMeta = pRepo->tsdbMeta; - STsdbCfg * pCfg = &(pRepo->config); - - ASSERT(pRepo->mem != NULL); - pMemTable = pRepo->mem; - - if (TABLE_TID(pTable) >= pMemTable->maxTables) { - if (tsdbAdjustMemMaxTables(pMemTable, pMeta->maxTables) < 0) { - tsdbFreeRows(pRepo, rows, rowCounter); - return -1; - } - } - pTableData = pMemTable->tData[TABLE_TID(pTable)]; - - if (pTableData == NULL || pTableData->uid != TABLE_UID(pTable)) { - if (pTableData != NULL) { - taosWLockLatch(&(pMemTable->latch)); - pMemTable->tData[TABLE_TID(pTable)] = NULL; - tsdbFreeTableData(pTableData); - taosWUnLockLatch(&(pMemTable->latch)); - } - - pTableData = tsdbNewTableData(pCfg, pTable); - if (pTableData == NULL) { - tsdbError("vgId:%d failed to insert data to table %s uid %" PRId64 " tid %d since %s", REPO_ID(pRepo), - TABLE_CHAR_NAME(pTable), TABLE_UID(pTable), TABLE_TID(pTable), tstrerror(terrno)); - tsdbFreeRows(pRepo, rows, rowCounter); - return -1; - } - - pRepo->mem->tData[TABLE_TID(pTable)] = pTableData; - } - - ASSERT((pTableData != NULL) && pTableData->uid == TABLE_UID(pTable)); - - int64_t osize = SL_SIZE(pTableData->pData); - tSkipListPutBatch(pTableData->pData, rows, rowCounter); - int64_t dsize = SL_SIZE(pTableData->pData) - osize; - TSKEY keyFirstRow = memRowKey(rows[0]); - TSKEY keyLastRow = memRowKey(rows[rowCounter - 1]); - - if (pMemTable->keyFirst > keyFirstRow) pMemTable->keyFirst = keyFirstRow; - if (pMemTable->keyLast < keyLastRow) pMemTable->keyLast = keyLastRow; - pMemTable->numOfRows += dsize; - - if (pTableData->keyFirst > keyFirstRow) pTableData->keyFirst = keyFirstRow; - if (pTableData->keyLast < keyLastRow) pTableData->keyLast = keyLastRow; - pTableData->numOfRows += dsize; - - // update table latest info - if (tsdbUpdateTableLatestInfo(pRepo, pTable, rows[rowCounter - 1]) < 0) { - return -1; - } - - return 0; -} - -static void tsdbFreeRows(STsdbRepo *pRepo, void **rows, int rowCounter) { - ASSERT(pRepo->mem != NULL); - STsdbBufPool *pBufPool = pRepo->pPool; - - for (int i = rowCounter - 1; i >= 0; --i) { - SMemRow row = (SMemRow)rows[i]; - int bytes = (int)memRowTLen(row); - - if (pRepo->mem->extraBuffList == NULL) { - STsdbBufBlock *pBufBlock = tsdbGetCurrBufBlock(pRepo); - ASSERT(pBufBlock != NULL && pBufBlock->offset >= bytes); - - pBufBlock->offset -= bytes; - pBufBlock->remain += bytes; - ASSERT(row == POINTER_SHIFT(pBufBlock->data, pBufBlock->offset)); - tsdbTrace("vgId:%d free %d bytes to TSDB buffer pool, nBlocks %d offset %d remain %d", REPO_ID(pRepo), bytes, - listNEles(pRepo->mem->bufBlockList), pBufBlock->offset, pBufBlock->remain); - - if (pBufBlock->offset == 0) { // return the block to buffer pool - if (tsdbLockRepo(pRepo) < 0) return; - SListNode *pNode = tdListPopTail(pRepo->mem->bufBlockList); - tdListPrependNode(pBufPool->bufBlockList, pNode); - if (tsdbUnlockRepo(pRepo) < 0) return; - } - } else { - ASSERT(listNEles(pRepo->mem->extraBuffList) > 0); - SListNode *pNode = tdListPopTail(pRepo->mem->extraBuffList); - ASSERT(row == pNode->data); - free(pNode); - tsdbTrace("vgId:%d free %d bytes to SYSTEM buffer pool", REPO_ID(pRepo), bytes); - - if (listNEles(pRepo->mem->extraBuffList) == 0) { - tdListFree(pRepo->mem->extraBuffList); - pRepo->mem->extraBuffList = NULL; - } - } - } -} static void updateTableLatestColumn(STsdbRepo *pRepo, STable *pTable, SMemRow row) { tsdbDebug("vgId:%d updateTableLatestColumn, %s row version:%d", REPO_ID(pRepo), pTable->name->data, @@ -1005,8 +980,8 @@ static void updateTableLatestColumn(STsdbRepo *pRepo, STable *pTable, SMemRow ro } SDataCol *pLatestCols = pTable->lastCols; + int32_t kvIdx = 0; - bool isDataRow = isDataRow(row); for (int16_t j = 0; j < schemaNCols(pSchema); j++) { STColumn *pTCol = schemaColAt(pSchema, j); // ignore not exist colId @@ -1017,16 +992,8 @@ static void updateTableLatestColumn(STsdbRepo *pRepo, STable *pTable, SMemRow ro void *value = NULL; - if (isDataRow) { - value = tdGetRowDataOfCol(memRowDataBody(row), (int8_t)pTCol->type, - TD_DATA_ROW_HEAD_SIZE + pTCol->offset); - } else { - // SKVRow - SColIdx *pColIdx = tdGetKVRowIdxOfCol(memRowKvBody(row), pTCol->colId); - if (pColIdx) { - value = tdGetKvRowDataOfCol(memRowKvBody(row), pColIdx->offset); - } - } + value = tdGetMemRowDataOfColEx(row, pTCol->colId, (int8_t)pTCol->type, + TD_DATA_ROW_HEAD_SIZE + pSchema->columns[j].offset, &kvIdx); if ((value == NULL) || isNull(value, pTCol->type)) { continue; @@ -1055,13 +1022,14 @@ static int tsdbUpdateTableLatestInfo(STsdbRepo *pRepo, STable *pTable, SMemRow r // if cacheLastRow config has been reset, free the lastRow if (!pCfg->cacheLastRow && pTable->lastRow != NULL) { - taosTZfree(pTable->lastRow); + SMemRow cachedLastRow = pTable->lastRow; TSDB_WLOCK_TABLE(pTable); pTable->lastRow = NULL; TSDB_WUNLOCK_TABLE(pTable); + taosTZfree(cachedLastRow); } - if (tsdbGetTableLastKeyImpl(pTable) < memRowKey(row)) { + if (tsdbGetTableLastKeyImpl(pTable) <= memRowKey(row)) { if (CACHE_LAST_ROW(pCfg) || pTable->lastRow != NULL) { SMemRow nrow = pTable->lastRow; if (taosTSizeof(nrow) < memRowTLen(row)) { diff --git a/src/tsdb/src/tsdbRead.c b/src/tsdb/src/tsdbRead.c index 7a595da39a88c3a0934a28161c803664b03e2507..f5c01d86e7a20384c17aa470435511e985fd8a7a 100644 --- a/src/tsdb/src/tsdbRead.c +++ b/src/tsdb/src/tsdbRead.c @@ -14,11 +14,14 @@ */ #include "os.h" +#include "tdataformat.h" +#include "tskiplist.h" #include "tulog.h" #include "talgo.h" #include "tcompare.h" #include "exception.h" +#include "taosdef.h" #include "tlosertree.h" #include "tsdbint.h" #include "texpr.h" @@ -68,6 +71,12 @@ typedef struct SLoadCompBlockInfo { int32_t fileId; } SLoadCompBlockInfo; +enum { + CHECKINFO_CHOSEN_MEM = 0, + CHECKINFO_CHOSEN_IMEM = 1, + CHECKINFO_CHOSEN_BOTH = 2 //for update=2(merge case) +}; + typedef struct STableCheckInfo { STableId tableId; @@ -76,7 +85,7 @@ typedef struct STableCheckInfo { SBlockInfo* pCompInfo; int32_t compSize; int32_t numOfBlocks:29; // number of qualified data blocks not the original blocks - int8_t chosen:2; // indicate which iterator should move forward + uint8_t chosen:2; // indicate which iterator should move forward bool initBuf; // whether to initialize the in-memory skip list iterator or not SSkipListIterator* iter; // mem buffer skip list iterator SSkipListIterator* iiter; // imem buffer skip list iterator @@ -781,7 +790,62 @@ static void destroyTableMemIterator(STableCheckInfo* pCheckInfo) { tSkipListDestroyIter(pCheckInfo->iiter); } -static SMemRow getSDataRowInTableMem(STableCheckInfo* pCheckInfo, int32_t order, int32_t update) { +static TSKEY extractFirstTraverseKey(STableCheckInfo* pCheckInfo, int32_t order, int32_t update) { + SMemRow rmem = NULL, rimem = NULL; + if (pCheckInfo->iter) { + SSkipListNode* node = tSkipListIterGet(pCheckInfo->iter); + if (node != NULL) { + rmem = (SMemRow)SL_GET_NODE_DATA(node); + } + } + + if (pCheckInfo->iiter) { + SSkipListNode* node = tSkipListIterGet(pCheckInfo->iiter); + if (node != NULL) { + rimem = (SMemRow)SL_GET_NODE_DATA(node); + } + } + + if (rmem == NULL && rimem == NULL) { + return TSKEY_INITIAL_VAL; + } + + if (rmem != NULL && rimem == NULL) { + pCheckInfo->chosen = CHECKINFO_CHOSEN_MEM; + return memRowKey(rmem); + } + + if (rmem == NULL && rimem != NULL) { + pCheckInfo->chosen = CHECKINFO_CHOSEN_IMEM; + return memRowKey(rimem); + } + + TSKEY r1 = memRowKey(rmem); + TSKEY r2 = memRowKey(rimem); + + if (r1 == r2) { + if(update == TD_ROW_DISCARD_UPDATE){ + pCheckInfo->chosen = CHECKINFO_CHOSEN_IMEM; + tSkipListIterNext(pCheckInfo->iter); + } + else if(update == TD_ROW_OVERWRITE_UPDATE) { + pCheckInfo->chosen = CHECKINFO_CHOSEN_MEM; + tSkipListIterNext(pCheckInfo->iiter); + } else { + pCheckInfo->chosen = CHECKINFO_CHOSEN_BOTH; + } + return r1; + } else if (r1 < r2 && ASCENDING_TRAVERSE(order)) { + pCheckInfo->chosen = CHECKINFO_CHOSEN_MEM; + return r1; + } + else { + pCheckInfo->chosen = CHECKINFO_CHOSEN_IMEM; + return r2; + } +} + +static SMemRow getSMemRowInTableMem(STableCheckInfo* pCheckInfo, int32_t order, int32_t update, SMemRow* extraRow) { SMemRow rmem = NULL, rimem = NULL; if (pCheckInfo->iter) { SSkipListNode* node = tSkipListIterGet(pCheckInfo->iter); @@ -814,31 +878,35 @@ static SMemRow getSDataRowInTableMem(STableCheckInfo* pCheckInfo, int32_t order, TSKEY r1 = memRowKey(rmem); TSKEY r2 = memRowKey(rimem); - if (r1 == r2) { // data ts are duplicated, ignore the data in mem - if (!update) { + if (r1 == r2) { + if (update == TD_ROW_DISCARD_UPDATE) { tSkipListIterNext(pCheckInfo->iter); - pCheckInfo->chosen = 1; + pCheckInfo->chosen = CHECKINFO_CHOSEN_IMEM; return rimem; - } else { + } else if(update == TD_ROW_OVERWRITE_UPDATE){ tSkipListIterNext(pCheckInfo->iiter); - pCheckInfo->chosen = 0; + pCheckInfo->chosen = CHECKINFO_CHOSEN_MEM; + return rmem; + } else { + pCheckInfo->chosen = CHECKINFO_CHOSEN_BOTH; + extraRow = rimem; return rmem; } } else { if (ASCENDING_TRAVERSE(order)) { if (r1 < r2) { - pCheckInfo->chosen = 0; + pCheckInfo->chosen = CHECKINFO_CHOSEN_MEM; return rmem; } else { - pCheckInfo->chosen = 1; + pCheckInfo->chosen = CHECKINFO_CHOSEN_IMEM; return rimem; } } else { if (r1 < r2) { - pCheckInfo->chosen = 1; + pCheckInfo->chosen = CHECKINFO_CHOSEN_IMEM; return rimem; } else { - pCheckInfo->chosen = 0; + pCheckInfo->chosen = CHECKINFO_CHOSEN_IMEM; return rmem; } } @@ -847,7 +915,7 @@ static SMemRow getSDataRowInTableMem(STableCheckInfo* pCheckInfo, int32_t order, static bool moveToNextRowInMem(STableCheckInfo* pCheckInfo) { bool hasNext = false; - if (pCheckInfo->chosen == 0) { + if (pCheckInfo->chosen == CHECKINFO_CHOSEN_MEM) { if (pCheckInfo->iter != NULL) { hasNext = tSkipListIterNext(pCheckInfo->iter); } @@ -859,7 +927,7 @@ static bool moveToNextRowInMem(STableCheckInfo* pCheckInfo) { if (pCheckInfo->iiter != NULL) { return tSkipListIterGet(pCheckInfo->iiter) != NULL; } - } else { //pCheckInfo->chosen == 1 + } else if (pCheckInfo->chosen == CHECKINFO_CHOSEN_IMEM){ if (pCheckInfo->iiter != NULL) { hasNext = tSkipListIterNext(pCheckInfo->iiter); } @@ -871,6 +939,13 @@ static bool moveToNextRowInMem(STableCheckInfo* pCheckInfo) { if (pCheckInfo->iter != NULL) { return tSkipListIterGet(pCheckInfo->iter) != NULL; } + } else { + if (pCheckInfo->iter != NULL) { + hasNext = tSkipListIterNext(pCheckInfo->iter); + } + if (pCheckInfo->iiter != NULL) { + hasNext = tSkipListIterNext(pCheckInfo->iiter) || hasNext; + } } return hasNext; @@ -891,7 +966,7 @@ static bool hasMoreDataInCache(STsdbQueryHandle* pHandle) { initTableMemIterator(pHandle, pCheckInfo); } - SMemRow row = getSDataRowInTableMem(pCheckInfo, pHandle->order, pCfg->update); + SMemRow row = getSMemRowInTableMem(pCheckInfo, pHandle->order, pCfg->update, NULL); if (row == NULL) { return false; } @@ -1147,25 +1222,28 @@ static int32_t doCopyRowsFromFileBlock(STsdbQueryHandle* pQueryHandle, int32_t c static void moveDataToFront(STsdbQueryHandle* pQueryHandle, int32_t numOfRows, int32_t numOfCols); static void doCheckGeneratedBlockRange(STsdbQueryHandle* pQueryHandle); static void copyAllRemainRowsFromFileBlock(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, SDataBlockInfo* pBlockInfo, int32_t endPos); +static TSKEY extractFirstTraverseKey(STableCheckInfo* pCheckInfo, int32_t order, int32_t update); static int32_t handleDataMergeIfNeeded(STsdbQueryHandle* pQueryHandle, SBlock* pBlock, STableCheckInfo* pCheckInfo){ SQueryFilePos* cur = &pQueryHandle->cur; STsdbCfg* pCfg = &pQueryHandle->pTsdb->config; SDataBlockInfo binfo = GET_FILE_DATA_BLOCK_INFO(pCheckInfo, pBlock); + TSKEY key; int32_t code = TSDB_CODE_SUCCESS; /*bool hasData = */ initTableMemIterator(pQueryHandle, pCheckInfo); - SMemRow row = getSDataRowInTableMem(pCheckInfo, pQueryHandle->order, pCfg->update); assert(cur->pos >= 0 && cur->pos <= binfo.rows); - TSKEY key = (row != NULL) ? memRowKey(row) : TSKEY_INITIAL_VAL; + key = extractFirstTraverseKey(pCheckInfo, pQueryHandle->order, pCfg->update); + if (key != TSKEY_INITIAL_VAL) { tsdbDebug("%p key in mem:%"PRId64", 0x%"PRIx64, pQueryHandle, key, pQueryHandle->qId); } else { tsdbDebug("%p no data in mem, 0x%"PRIx64, pQueryHandle, pQueryHandle->qId); } + if ((ASCENDING_TRAVERSE(pQueryHandle->order) && (key != TSKEY_INITIAL_VAL && key <= binfo.window.ekey)) || (!ASCENDING_TRAVERSE(pQueryHandle->order) && (key != TSKEY_INITIAL_VAL && key >= binfo.window.skey))) { @@ -1190,6 +1268,7 @@ static int32_t handleDataMergeIfNeeded(STsdbQueryHandle* pQueryHandle, SBlock* p return code; } + // return error, add test cases if ((code = doLoadFileDataBlock(pQueryHandle, pBlock, pCheckInfo, cur->slot)) != TSDB_CODE_SUCCESS) { return code; @@ -1452,40 +1531,125 @@ int32_t doCopyRowsFromFileBlock(STsdbQueryHandle* pQueryHandle, int32_t capacity return numOfRows + num; } -static void copyOneRowFromMem(STsdbQueryHandle* pQueryHandle, int32_t capacity, int32_t numOfRows, SMemRow row, - int32_t numOfCols, STable* pTable, STSchema* pSchema) { +// Note: row1 always has high priority +static void mergeTwoRowFromMem(STsdbQueryHandle* pQueryHandle, int32_t capacity, int32_t numOfRows, + SMemRow row1, SMemRow row2, int32_t numOfCols, STable* pTable, + STSchema* pSchema1, STSchema* pSchema2, bool forceSetNull) { char* pData = NULL; + STSchema* pSchema; + SMemRow row; + int16_t colId; + int16_t offset; + + bool isRow1DataRow = isDataRow(row1); + bool isRow2DataRow; + bool isChosenRowDataRow; + int32_t chosen_itr; + void *value; - // the schema version info is embedded in SDataRow, and use latest schema version for SKVRow - int32_t numOfRowCols = 0; - if (pSchema == NULL) { - pSchema = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row)); - numOfRowCols = schemaNCols(pSchema); + // the schema version info is embeded in SDataRow + int32_t numOfColsOfRow1 = 0; + + if (pSchema1 == NULL) { + pSchema1 = tsdbGetTableSchemaByVersion(pTable, dataRowVersion(row1)); + } + if(isRow1DataRow) { + numOfColsOfRow1 = schemaNCols(pSchema1); } else { - numOfRowCols = schemaNCols(pSchema); + numOfColsOfRow1 = kvRowNCols(memRowKvBody(row1)); } - int32_t i = 0; + int32_t numOfColsOfRow2 = 0; + if(row2) { + isRow2DataRow = isDataRow(row2); + if (pSchema2 == NULL) { + pSchema2 = tsdbGetTableSchemaByVersion(pTable, dataRowVersion(row2)); + } + if(isRow2DataRow) { + numOfColsOfRow2 = schemaNCols(pSchema2); + } else { + numOfColsOfRow2 = kvRowNCols(memRowKvBody(row2)); + } + } - if (isDataRow(row)) { - SDataRow dataRow = memRowDataBody(row); - int32_t j = 0; - while (i < numOfCols && j < numOfRowCols) { - SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i); - if (pSchema->columns[j].colId < pColInfo->info.colId) { + + int32_t i = 0, j = 0, k = 0; + while(i < numOfCols && (j < numOfColsOfRow1 || k < numOfColsOfRow2)) { + SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i); + + if (ASCENDING_TRAVERSE(pQueryHandle->order)) { + pData = (char*)pColInfo->pData + numOfRows * pColInfo->info.bytes; + } else { + pData = (char*)pColInfo->pData + (capacity - numOfRows - 1) * pColInfo->info.bytes; + } + + int32_t colIdOfRow1; + if(j >= numOfColsOfRow1) { + colIdOfRow1 = INT32_MAX; + } else if(isRow1DataRow) { + colIdOfRow1 = pSchema1->columns[j].colId; + } else { + void *rowBody = memRowKvBody(row1); + SColIdx *pColIdx = kvRowColIdxAt(rowBody, j); + colIdOfRow1 = pColIdx->colId; + } + + int32_t colIdOfRow2; + if(k >= numOfColsOfRow2) { + colIdOfRow2 = INT32_MAX; + } else if(isRow2DataRow) { + colIdOfRow2 = pSchema2->columns[k].colId; + } else { + void *rowBody = memRowKvBody(row2); + SColIdx *pColIdx = kvRowColIdxAt(rowBody, k); + colIdOfRow2 = pColIdx->colId; + } + + if(colIdOfRow1 == colIdOfRow2) { + if(colIdOfRow1 < pColInfo->info.colId) { j++; + k++; continue; } - - if (ASCENDING_TRAVERSE(pQueryHandle->order)) { - pData = (char*)pColInfo->pData + numOfRows * pColInfo->info.bytes; - } else { - pData = (char*)pColInfo->pData + (capacity - numOfRows - 1) * pColInfo->info.bytes; + row = row1; + pSchema = pSchema1; + isChosenRowDataRow = isRow1DataRow; + chosen_itr = j; + } else if(colIdOfRow1 < colIdOfRow2) { + if(colIdOfRow1 < pColInfo->info.colId) { + j++; + continue; } + row = row1; + pSchema = pSchema1; + isChosenRowDataRow = isRow1DataRow; + chosen_itr = j; + } else { + if(colIdOfRow2 < pColInfo->info.colId) { + k++; + continue; + } + row = row2; + pSchema = pSchema2; + chosen_itr = k; + isChosenRowDataRow = isRow2DataRow; + } + if(isChosenRowDataRow) { + colId = pSchema->columns[chosen_itr].colId; + offset = pSchema->columns[chosen_itr].offset; + void *rowBody = memRowDataBody(row); + value = tdGetRowDataOfCol(rowBody, (int8_t)pColInfo->info.type, TD_DATA_ROW_HEAD_SIZE + offset); + } else { + void *rowBody = memRowKvBody(row); + SColIdx *pColIdx = kvRowColIdxAt(rowBody, chosen_itr); + colId = pColIdx->colId; + offset = pColIdx->offset; + value = tdGetKvRowDataOfCol(rowBody, pColIdx->offset); + } + - if (pSchema->columns[j].colId == pColInfo->info.colId) { - void* value = - tdGetRowDataOfCol(dataRow, (int8_t)pColInfo->info.type, TD_DATA_ROW_HEAD_SIZE + pSchema->columns[j].offset); + if (colId == pColInfo->info.colId) { + if(forceSetNull || (!isNull(value, (int8_t)pColInfo->info.type))) { switch (pColInfo->info.type) { case TSDB_DATA_TYPE_BINARY: case TSDB_DATA_TYPE_NCHAR: @@ -1495,19 +1659,19 @@ static void copyOneRowFromMem(STsdbQueryHandle* pQueryHandle, int32_t capacity, case TSDB_DATA_TYPE_BOOL: case TSDB_DATA_TYPE_TINYINT: case TSDB_DATA_TYPE_UTINYINT: - *(uint8_t*)pData = *(uint8_t*)value; + *(uint8_t *)pData = *(uint8_t *)value; break; case TSDB_DATA_TYPE_SMALLINT: case TSDB_DATA_TYPE_USMALLINT: - *(uint16_t*)pData = *(uint16_t*)value; + *(uint16_t *)pData = *(uint16_t *)value; break; case TSDB_DATA_TYPE_INT: case TSDB_DATA_TYPE_UINT: - *(uint32_t*)pData = *(uint32_t*)value; + *(uint32_t *)pData = *(uint32_t *)value; break; case TSDB_DATA_TYPE_BIGINT: case TSDB_DATA_TYPE_UBIGINT: - *(uint64_t*)pData = *(uint64_t*)value; + *(uint64_t *)pData = *(uint64_t *)value; break; case TSDB_DATA_TYPE_FLOAT: SET_FLOAT_PTR(pData, value); @@ -1517,121 +1681,54 @@ static void copyOneRowFromMem(STsdbQueryHandle* pQueryHandle, int32_t capacity, break; case TSDB_DATA_TYPE_TIMESTAMP: if (pColInfo->info.colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) { - *(TSKEY*)pData = tdGetKey(*(TKEY*)value); + *(TSKEY *)pData = tdGetKey(*(TKEY *)value); } else { - *(TSKEY*)pData = *(TSKEY*)value; + *(TSKEY *)pData = *(TSKEY *)value; } break; default: memcpy(pData, value, pColInfo->info.bytes); } + } + i++; + if(row == row1) { j++; - i++; - } else { // pColInfo->info.colId < pSchema->columns[j].colId, it is a NULL data + } else { + k++; + } + } else { + if(forceSetNull) { if (pColInfo->info.type == TSDB_DATA_TYPE_BINARY || pColInfo->info.type == TSDB_DATA_TYPE_NCHAR) { setVardataNull(pData, pColInfo->info.type); } else { setNull(pData, pColInfo->info.type, pColInfo->info.bytes); } - i++; } + i++; } - } else if (isKvRow(row)) { - SKVRow kvRow = memRowKvBody(row); - int32_t k = 0; - int32_t nKvRowCols = kvRowNCols(kvRow); + } - while (i < numOfCols && k < nKvRowCols) { + if(forceSetNull) { + while (i < numOfCols) { // the remain columns are all null data SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i); - SColIdx* pColIdx = kvRowColIdxAt(kvRow, k); - - if (pColIdx->colId < pColInfo->info.colId) { - ++k; - continue; - } - if (ASCENDING_TRAVERSE(pQueryHandle->order)) { pData = (char*)pColInfo->pData + numOfRows * pColInfo->info.bytes; } else { pData = (char*)pColInfo->pData + (capacity - numOfRows - 1) * pColInfo->info.bytes; } - if (pColIdx->colId == pColInfo->info.colId) { - // offset of pColIdx for SKVRow including the TD_KV_ROW_HEAD_SIZE - void* value = tdGetKvRowDataOfCol(kvRow, pColIdx->offset); - switch (pColInfo->info.type) { - case TSDB_DATA_TYPE_BINARY: - case TSDB_DATA_TYPE_NCHAR: - memcpy(pData, value, varDataTLen(value)); - break; - case TSDB_DATA_TYPE_NULL: - case TSDB_DATA_TYPE_BOOL: - case TSDB_DATA_TYPE_TINYINT: - case TSDB_DATA_TYPE_UTINYINT: - *(uint8_t*)pData = *(uint8_t*)value; - break; - case TSDB_DATA_TYPE_SMALLINT: - case TSDB_DATA_TYPE_USMALLINT: - *(uint16_t*)pData = *(uint16_t*)value; - break; - case TSDB_DATA_TYPE_INT: - case TSDB_DATA_TYPE_UINT: - *(uint32_t*)pData = *(uint32_t*)value; - break; - case TSDB_DATA_TYPE_BIGINT: - case TSDB_DATA_TYPE_UBIGINT: - *(uint64_t*)pData = *(uint64_t*)value; - break; - case TSDB_DATA_TYPE_FLOAT: - SET_FLOAT_PTR(pData, value); - break; - case TSDB_DATA_TYPE_DOUBLE: - SET_DOUBLE_PTR(pData, value); - break; - case TSDB_DATA_TYPE_TIMESTAMP: - if (pColInfo->info.colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) { - *(TSKEY*)pData = tdGetKey(*(TKEY*)value); - } else { - *(TSKEY*)pData = *(TSKEY*)value; - } - break; - default: - memcpy(pData, value, pColInfo->info.bytes); - } - ++k; - ++i; - continue; - } - // If (pColInfo->info.colId < pColIdx->colId), it is NULL data if (pColInfo->info.type == TSDB_DATA_TYPE_BINARY || pColInfo->info.type == TSDB_DATA_TYPE_NCHAR) { setVardataNull(pData, pColInfo->info.type); } else { setNull(pData, pColInfo->info.type, pColInfo->info.bytes); } - ++i; - } - } else { - ASSERT(0); - } - - while (i < numOfCols) { // the remain columns are all null data - SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i); - if (ASCENDING_TRAVERSE(pQueryHandle->order)) { - pData = (char*)pColInfo->pData + numOfRows * pColInfo->info.bytes; - } else { - pData = (char*)pColInfo->pData + (capacity - numOfRows - 1) * pColInfo->info.bytes; - } - if (pColInfo->info.type == TSDB_DATA_TYPE_BINARY || pColInfo->info.type == TSDB_DATA_TYPE_NCHAR) { - setVardataNull(pData, pColInfo->info.type); - } else { - setNull(pData, pColInfo->info.type, pColInfo->info.bytes); + i++; } - - i++; } } + static void moveDataToFront(STsdbQueryHandle* pQueryHandle, int32_t numOfRows, int32_t numOfCols) { if (numOfRows == 0 || ASCENDING_TRAVERSE(pQueryHandle->order)) { return; @@ -1798,8 +1895,10 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* // compared with the data from in-memory buffer, to generate the correct timestamp array list int32_t numOfRows = 0; - int16_t rv = -1; - STSchema* pSchema = NULL; + int16_t rv1 = -1; + int16_t rv2 = -1; + STSchema* pSchema1 = NULL; + STSchema* pSchema2 = NULL; int32_t pos = cur->pos; cur->win = TSWINDOW_INITIALIZER; @@ -1811,12 +1910,13 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* } else if (pCheckInfo->iter != NULL || pCheckInfo->iiter != NULL) { SSkipListNode* node = NULL; do { - SMemRow row = getSDataRowInTableMem(pCheckInfo, pQueryHandle->order, pCfg->update); - if (row == NULL) { + SMemRow row2 = NULL; + SMemRow row1 = getSMemRowInTableMem(pCheckInfo, pQueryHandle->order, pCfg->update, &row2); + if (row1 == NULL) { break; } - TSKEY key = memRowKey(row); + TSKEY key = memRowKey(row1); if ((key > pQueryHandle->window.ekey && ASCENDING_TRAVERSE(pQueryHandle->order)) || (key < pQueryHandle->window.ekey && !ASCENDING_TRAVERSE(pQueryHandle->order))) { break; @@ -1829,12 +1929,16 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* if ((key < tsArray[pos] && ASCENDING_TRAVERSE(pQueryHandle->order)) || (key > tsArray[pos] && !ASCENDING_TRAVERSE(pQueryHandle->order))) { - if (rv != memRowVersion(row)) { - pSchema = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row)); - rv = memRowVersion(row); + if (rv1 != memRowVersion(row1)) { + pSchema1 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row1)); + rv1 = memRowVersion(row1); } - - copyOneRowFromMem(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, row, numOfCols, pTable, pSchema); + if(row2 && rv2 != memRowVersion(row2)) { + pSchema2 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row2)); + rv2 = memRowVersion(row2); + } + + mergeTwoRowFromMem(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, row1, row2, numOfCols, pTable, pSchema1, pSchema2, true); numOfRows += 1; if (cur->win.skey == TSKEY_INITIAL_VAL) { cur->win.skey = key; @@ -1847,12 +1951,20 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* moveToNextRowInMem(pCheckInfo); } else if (key == tsArray[pos]) { // data in buffer has the same timestamp of data in file block, ignore it if (pCfg->update) { - if (rv != memRowVersion(row)) { - pSchema = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row)); - rv = memRowVersion(row); + if(pCfg->update == TD_ROW_PARTIAL_UPDATE) { + doCopyRowsFromFileBlock(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, pos, pos); } - - copyOneRowFromMem(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, row, numOfCols, pTable, pSchema); + if (rv1 != memRowVersion(row1)) { + pSchema1 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row1)); + rv1 = memRowVersion(row1); + } + if(row2 && rv2 != memRowVersion(row2)) { + pSchema2 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row2)); + rv2 = memRowVersion(row2); + } + + bool forceSetNull = pCfg->update != TD_ROW_PARTIAL_UPDATE; + mergeTwoRowFromMem(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, row1, row2, numOfCols, pTable, pSchema1, pSchema2, forceSetNull); numOfRows += 1; if (cur->win.skey == TSKEY_INITIAL_VAL) { cur->win.skey = key; @@ -1877,7 +1989,7 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* assert(end != -1); if (tsArray[end] == key) { // the value of key in cache equals to the end timestamp value, ignore it - if (!pCfg->update) { + if (pCfg->update == TD_ROW_DISCARD_UPDATE) { moveToNextRowInMem(pCheckInfo); } else { end -= step; @@ -2490,7 +2602,7 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int STSchema* pSchema = NULL; do { - SMemRow row = getSDataRowInTableMem(pCheckInfo, pQueryHandle->order, pCfg->update); + SMemRow row = getSMemRowInTableMem(pCheckInfo, pQueryHandle->order, pCfg->update, NULL); if (row == NULL) { break; } @@ -2512,7 +2624,7 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int pSchema = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row)); rv = memRowVersion(row); } - copyOneRowFromMem(pQueryHandle, maxRowsToRead, numOfRows, row, numOfCols, pTable, pSchema); + mergeTwoRowFromMem(pQueryHandle, maxRowsToRead, numOfRows, row, NULL, numOfCols, pTable, pSchema, NULL, true); if (++numOfRows >= maxRowsToRead) { moveToNextRowInMem(pCheckInfo); @@ -2637,7 +2749,7 @@ static bool loadCachedLastRow(STsdbQueryHandle* pQueryHandle) { if (ret != TSDB_CODE_SUCCESS) { return false; } - copyOneRowFromMem(pQueryHandle, pQueryHandle->outputCapacity, 0, pRow, numOfCols, pCheckInfo->pTableObj, NULL); + mergeTwoRowFromMem(pQueryHandle, pQueryHandle->outputCapacity, 0, pRow, NULL, numOfCols, pCheckInfo->pTableObj, NULL, NULL, true); tfree(pRow); // update the last key value @@ -2920,7 +3032,7 @@ static int32_t doGetExternalRow(STsdbQueryHandle* pQueryHandle, int16_t type, SM } } - SArray* row = (type == TSDB_PREV_ROW)? pQueryHandle->prev:pQueryHandle->next; + SArray* row = (type == TSDB_PREV_ROW)? pQueryHandle->prev : pQueryHandle->next; for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* pCol = taosArrayGet(pQueryHandle->pColumns, i); @@ -3787,10 +3899,6 @@ static void* doFreeColumnInfoData(SArray* pColumnInfoData) { } static void* destroyTableCheckInfo(SArray* pTableCheckInfo) { - if (pTableCheckInfo == NULL) { - return NULL; - } - size_t size = taosArrayGetSize(pTableCheckInfo); for (int32_t i = 0; i < size; ++i) { STableCheckInfo* p = taosArrayGet(pTableCheckInfo, i); @@ -3834,6 +3942,7 @@ void tsdbCleanupQueryHandle(TsdbQueryHandleT queryHandle) { pQueryHandle->next = doFreeColumnInfoData(pQueryHandle->next); SIOCostSummary* pCost = &pQueryHandle->cost; + tsdbDebug("%p :io-cost summary: head-file read cnt:%"PRIu64", head-file time:%"PRIu64" us, statis-info:%"PRId64" us, datablock:%" PRId64" us, check data:%"PRId64" us, 0x%"PRIx64, pQueryHandle, pCost->headFileLoad, pCost->headFileLoadTime, pCost->statisInfoLoadTime, pCost->blockLoadTime, pCost->checkForNextTime, pQueryHandle->qId); diff --git a/src/tsdb/src/tsdbReadImpl.c b/src/tsdb/src/tsdbReadImpl.c index dd14dc700f85c3126411cf4613651463745f71d5..666a2d357144431093855479f30a4feefbb9ab3b 100644 --- a/src/tsdb/src/tsdbReadImpl.c +++ b/src/tsdb/src/tsdbReadImpl.c @@ -244,6 +244,7 @@ int tsdbLoadBlockInfo(SReadH *pReadh, void *pTarget) { int tsdbLoadBlockData(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo) { ASSERT(pBlock->numOfSubBlocks > 0); + int8_t update = pReadh->pRepo->config.update; SBlock *iBlock = pBlock; if (pBlock->numOfSubBlocks > 1) { @@ -258,7 +259,7 @@ int tsdbLoadBlockData(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo) { for (int i = 1; i < pBlock->numOfSubBlocks; i++) { iBlock++; if (tsdbLoadBlockDataImpl(pReadh, iBlock, pReadh->pDCols[1]) < 0) return -1; - if (tdMergeDataCols(pReadh->pDCols[0], pReadh->pDCols[1], pReadh->pDCols[1]->numOfRows, NULL) < 0) return -1; + if (tdMergeDataCols(pReadh->pDCols[0], pReadh->pDCols[1], pReadh->pDCols[1]->numOfRows, NULL, update != TD_ROW_PARTIAL_UPDATE) < 0) return -1; } ASSERT(pReadh->pDCols[0]->numOfRows == pBlock->numOfRows); @@ -270,6 +271,7 @@ int tsdbLoadBlockData(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo) { int tsdbLoadBlockDataCols(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo, int16_t *colIds, int numOfColsIds) { ASSERT(pBlock->numOfSubBlocks > 0); + int8_t update = pReadh->pRepo->config.update; SBlock *iBlock = pBlock; if (pBlock->numOfSubBlocks > 1) { @@ -284,7 +286,7 @@ int tsdbLoadBlockDataCols(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo, for (int i = 1; i < pBlock->numOfSubBlocks; i++) { iBlock++; if (tsdbLoadBlockDataColsImpl(pReadh, iBlock, pReadh->pDCols[1], colIds, numOfColsIds) < 0) return -1; - if (tdMergeDataCols(pReadh->pDCols[0], pReadh->pDCols[1], pReadh->pDCols[1]->numOfRows, NULL) < 0) return -1; + if (tdMergeDataCols(pReadh->pDCols[0], pReadh->pDCols[1], pReadh->pDCols[1]->numOfRows, NULL, update != TD_ROW_PARTIAL_UPDATE) < 0) return -1; } ASSERT(pReadh->pDCols[0]->numOfRows == pBlock->numOfRows); @@ -657,4 +659,4 @@ static int tsdbLoadColData(SReadH *pReadh, SDFile *pDFile, SBlock *pBlock, SBloc } return 0; -} \ No newline at end of file +} diff --git a/src/tsdb/src/tsdbRowMergeBuf.c b/src/tsdb/src/tsdbRowMergeBuf.c new file mode 100644 index 0000000000000000000000000000000000000000..5ce580f70f257ae2a0c00865b5bf54fec0f2b14f --- /dev/null +++ b/src/tsdb/src/tsdbRowMergeBuf.c @@ -0,0 +1,30 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "tsdbRowMergeBuf.h" +#include "tdataformat.h" + +// row1 has higher priority +SMemRow tsdbMergeTwoRows(SMergeBuf *pBuf, SMemRow row1, SMemRow row2, STSchema *pSchema1, STSchema *pSchema2) { + if(row2 == NULL) return row1; + if(row1 == NULL) return row2; + ASSERT(pSchema1->version == memRowVersion(row1)); + ASSERT(pSchema2->version == memRowVersion(row2)); + + if(tsdbMergeBufMakeSureRoom(pBuf, pSchema1, pSchema2) < 0) { + return NULL; + } + return mergeTwoMemRows(*pBuf, row1, row2, pSchema1, pSchema2); +} diff --git a/src/util/inc/tfunctional.h b/src/util/inc/tfunctional.h new file mode 100644 index 0000000000000000000000000000000000000000..70f54e921da3166645bd43f4e3fe58bc144fe714 --- /dev/null +++ b/src/util/inc/tfunctional.h @@ -0,0 +1,56 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +#ifndef TD_TFUNCTIONAL_H +#define TD_TFUNCTIONAL_H + +#ifdef __cplusplus +extern "C" { +#endif + +#include "os.h" + +//TODO: hard to use, trying to rewrite it using va_list + +typedef void* (*GenericVaFunc)(void* args[]); +typedef int32_t (*I32VaFunc) (void* args[]); +typedef void (*VoidVaFunc) (void* args[]); + +typedef struct GenericSavedFunc { + GenericVaFunc func; + void * args[]; +} tGenericSavedFunc; + +typedef struct I32SavedFunc { + I32VaFunc func; + void * args[]; +} tI32SavedFunc; + +typedef struct VoidSavedFunc { + VoidVaFunc func; + void * args[]; +} tVoidSavedFunc; + +tGenericSavedFunc* genericSavedFuncInit(GenericVaFunc func, int numOfArgs); +tI32SavedFunc* i32SavedFuncInit(I32VaFunc func, int numOfArgs); +tVoidSavedFunc* voidSavedFuncInit(VoidVaFunc func, int numOfArgs); +void* genericInvoke(tGenericSavedFunc* const pSavedFunc); +int32_t i32Invoke(tI32SavedFunc* const pSavedFunc); +void voidInvoke(tVoidSavedFunc* const pSavedFunc); + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/src/util/inc/tskiplist.h b/src/util/inc/tskiplist.h index 17f5940b4998c8a5b5e277e153cf7485c3cd6129..d9dc001ccd072e9803d7241bd7e0e3e029a77b3e 100644 --- a/src/util/inc/tskiplist.h +++ b/src/util/inc/tskiplist.h @@ -23,6 +23,7 @@ extern "C" { #include "os.h" #include "taosdef.h" #include "tarray.h" +#include "tfunctional.h" #define MAX_SKIP_LIST_LEVEL 15 #define SKIP_LIST_RECORD_PERFORMANCE 0 @@ -30,13 +31,17 @@ extern "C" { // For key property setting #define SL_ALLOW_DUP_KEY (uint8_t)0x0 // Allow duplicate key exists (for tag index usage) #define SL_DISCARD_DUP_KEY (uint8_t)0x1 // Discard duplicate key (for data update=0 case) -#define SL_UPDATE_DUP_KEY (uint8_t)0x2 // Update duplicate key by remove/insert (for data update=1 case) +#define SL_UPDATE_DUP_KEY (uint8_t)0x2 // Update duplicate key by remove/insert (for data update!=0 case) + // For thread safety setting #define SL_THREAD_SAFE (uint8_t)0x4 typedef char *SSkipListKey; typedef char *(*__sl_key_fn_t)(const void *); +typedef void (*sl_patch_row_fn_t)(void * pDst, const void * pSrc); +typedef void* (*iter_next_fn_t)(void *iter); + typedef struct SSkipListNode { uint8_t level; void * pData; @@ -95,6 +100,12 @@ typedef struct tSkipListState { uint64_t nTotalElapsedTimeForInsert; } tSkipListState; +typedef enum { + SSkipListPutSuccess = 0, + SSkipListPutEarlyStop = 1, + SSkipListPutSkipOne = 2 +} SSkipListPutStatus; + typedef struct SSkipList { unsigned int seed; __compar_fn_t comparFn; @@ -111,6 +122,7 @@ typedef struct SSkipList { #if SKIP_LIST_RECORD_PERFORMANCE tSkipListState state; // skiplist state #endif + tGenericSavedFunc* insertHandleFn; } SSkipList; typedef struct SSkipListIterator { @@ -118,7 +130,7 @@ typedef struct SSkipListIterator { SSkipListNode *cur; int32_t step; // the number of nodes that have been checked already int32_t order; // order of the iterator - SSkipListNode *next; // next points to the true qualified node in skip list + SSkipListNode *next; // next points to the true qualified node in skiplist } SSkipListIterator; #define SL_IS_THREAD_SAFE(s) (((s)->flags) & SL_THREAD_SAFE) @@ -132,7 +144,7 @@ SSkipList *tSkipListCreate(uint8_t maxLevel, uint8_t keyType, uint16_t keyLen, _ __sl_key_fn_t fn); void tSkipListDestroy(SSkipList *pSkipList); SSkipListNode * tSkipListPut(SSkipList *pSkipList, void *pData); -void tSkipListPutBatch(SSkipList *pSkipList, void **ppData, int ndata); +void tSkipListPutBatchByIter(SSkipList *pSkipList, void *iter, iter_next_fn_t iterate); SArray * tSkipListGet(SSkipList *pSkipList, SSkipListKey pKey); void tSkipListPrint(SSkipList *pSkipList, int16_t nlevel); SSkipListIterator *tSkipListCreateIter(SSkipList *pSkipList); diff --git a/src/util/src/tfunctional.c b/src/util/src/tfunctional.c new file mode 100644 index 0000000000000000000000000000000000000000..c470a2b8aefc11141c9125e60c1c45fcbb949f09 --- /dev/null +++ b/src/util/src/tfunctional.c @@ -0,0 +1,48 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "tfunctional.h" +#include "tarray.h" + + +tGenericSavedFunc* genericSavedFuncInit(GenericVaFunc func, int numOfArgs) { + tGenericSavedFunc* pSavedFunc = malloc(sizeof(tGenericSavedFunc) + numOfArgs * (sizeof(void*))); + pSavedFunc->func = func; + return pSavedFunc; +} + +tI32SavedFunc* i32SavedFuncInit(I32VaFunc func, int numOfArgs) { + tI32SavedFunc* pSavedFunc = malloc(sizeof(tI32SavedFunc) + numOfArgs * sizeof(void *)); + pSavedFunc->func = func; + return pSavedFunc; +} + +tVoidSavedFunc* voidSavedFuncInit(VoidVaFunc func, int numOfArgs) { + tVoidSavedFunc* pSavedFunc = malloc(sizeof(tVoidSavedFunc) + numOfArgs * sizeof(void*)); + pSavedFunc->func = func; + return pSavedFunc; +} + +FORCE_INLINE void* genericInvoke(tGenericSavedFunc* const pSavedFunc) { + return pSavedFunc->func(pSavedFunc->args); +} + +FORCE_INLINE int32_t i32Invoke(tI32SavedFunc* const pSavedFunc) { + return pSavedFunc->func(pSavedFunc->args); +} + +FORCE_INLINE void voidInvoke(tVoidSavedFunc* const pSavedFunc) { + if(pSavedFunc) pSavedFunc->func(pSavedFunc->args); +} diff --git a/src/util/src/tskiplist.c b/src/util/src/tskiplist.c index 082b454bb59abb6f43008a63078f665fd38ff32d..b464519ba66776ba13ce2964070d19a2a4430bfb 100644 --- a/src/util/src/tskiplist.c +++ b/src/util/src/tskiplist.c @@ -16,6 +16,7 @@ #include "tskiplist.h" #include "os.h" #include "tcompare.h" +#include "tdataformat.h" #include "tulog.h" #include "tutil.h" @@ -31,6 +32,7 @@ static SSkipListNode *tSkipListNewNode(uint8_t level); static SSkipListNode *tSkipListPutImpl(SSkipList *pSkipList, void *pData, SSkipListNode **direction, bool isForward, bool hasDup); + static FORCE_INLINE int tSkipListWLock(SSkipList *pSkipList); static FORCE_INLINE int tSkipListRLock(SSkipList *pSkipList); static FORCE_INLINE int tSkipListUnlock(SSkipList *pSkipList); @@ -80,6 +82,7 @@ SSkipList *tSkipListCreate(uint8_t maxLevel, uint8_t keyType, uint16_t keyLen, _ #if SKIP_LIST_RECORD_PERFORMANCE pSkipList->state.nTotalMemSize += sizeof(SSkipList); #endif + pSkipList->insertHandleFn = NULL; return pSkipList; } @@ -97,6 +100,8 @@ void tSkipListDestroy(SSkipList *pSkipList) { tSkipListFreeNode(pTemp); } + tfree(pSkipList->insertHandleFn); + tSkipListUnlock(pSkipList); if (pSkipList->lock != NULL) { pthread_rwlock_destroy(pSkipList->lock); @@ -124,8 +129,7 @@ SSkipListNode *tSkipListPut(SSkipList *pSkipList, void *pData) { return pNode; } -// Put a batch of data into skiplist. The batch of data must be in ascending order -void tSkipListPutBatch(SSkipList *pSkipList, void **ppData, int ndata) { +void tSkipListPutBatchByIter(SSkipList *pSkipList, void *iter, iter_next_fn_t iterate) { SSkipListNode *backward[MAX_SKIP_LIST_LEVEL] = {0}; SSkipListNode *forward[MAX_SKIP_LIST_LEVEL] = {0}; bool hasDup = false; @@ -135,17 +139,21 @@ void tSkipListPutBatch(SSkipList *pSkipList, void **ppData, int ndata) { tSkipListWLock(pSkipList); + void* pData = iterate(iter); + if(pData == NULL) return; + // backward to put the first data - hasDup = tSkipListGetPosToPut(pSkipList, backward, ppData[0]); - tSkipListPutImpl(pSkipList, ppData[0], backward, false, hasDup); + hasDup = tSkipListGetPosToPut(pSkipList, backward, pData); + + tSkipListPutImpl(pSkipList, pData, backward, false, hasDup); for (int level = 0; level < pSkipList->maxLevel; level++) { forward[level] = SL_NODE_GET_BACKWARD_POINTER(backward[level], level); } // forward to put the rest of data - for (int idata = 1; idata < ndata; idata++) { - pDataKey = pSkipList->keyFn(ppData[idata]); + while ((pData = iterate(iter)) != NULL) { + pDataKey = pSkipList->keyFn(pData); hasDup = false; // Compare max key @@ -186,9 +194,8 @@ void tSkipListPutBatch(SSkipList *pSkipList, void **ppData, int ndata) { } } - tSkipListPutImpl(pSkipList, ppData[idata], forward, true, hasDup); + tSkipListPutImpl(pSkipList, pData, forward, true, hasDup); } - tSkipListUnlock(pSkipList); } @@ -661,18 +668,40 @@ static SSkipListNode *tSkipListPutImpl(SSkipList *pSkipList, void *pData, SSkipL uint8_t dupMode = SL_DUP_MODE(pSkipList); SSkipListNode *pNode = NULL; - if (hasDup && (dupMode == SL_DISCARD_DUP_KEY || dupMode == SL_UPDATE_DUP_KEY)) { + if (hasDup && (dupMode != SL_ALLOW_DUP_KEY)) { if (dupMode == SL_UPDATE_DUP_KEY) { if (isForward) { pNode = SL_NODE_GET_FORWARD_POINTER(direction[0], 0); } else { pNode = SL_NODE_GET_BACKWARD_POINTER(direction[0], 0); } - atomic_store_ptr(&(pNode->pData), pData); + if (pSkipList->insertHandleFn) { + pSkipList->insertHandleFn->args[0] = pData; + pSkipList->insertHandleFn->args[1] = pNode->pData; + pData = genericInvoke(pSkipList->insertHandleFn); + } + if(pData) { + atomic_store_ptr(&(pNode->pData), pData); + } + } else { + //for compatiblity, duplicate key inserted when update=0 should be also calculated as affected rows! + if(pSkipList->insertHandleFn) { + pSkipList->insertHandleFn->args[0] = NULL; + pSkipList->insertHandleFn->args[1] = NULL; + genericInvoke(pSkipList->insertHandleFn); + } } } else { pNode = tSkipListNewNode(getSkipListRandLevel(pSkipList)); if (pNode != NULL) { + // insertHandleFn will be assigned only for timeseries data, + // in which case, pData is pointed to an memory to be freed later; + // while for metadata, the mem alloc will not be called. + if (pSkipList->insertHandleFn) { + pSkipList->insertHandleFn->args[0] = pData; + pSkipList->insertHandleFn->args[1] = NULL; + pData = genericInvoke(pSkipList->insertHandleFn); + } pNode->pData = pData; tSkipListDoInsert(pSkipList, direction, pNode, isForward);