diff --git a/include/common/tdataformat.h b/include/common/tdataformat.h index 11d0f34c4560b0399acac2a455836e7ed9326c9e..e94be797b950be4e4899ac373009543ba862564e 100644 --- a/include/common/tdataformat.h +++ b/include/common/tdataformat.h @@ -372,10 +372,9 @@ static FORCE_INLINE void tdCopyColOfRowBySchema(SDataRow dst, STSchema *pDstSche // ----------------- Data column structure // SDataCol arrangement: data => bitmap => dataOffset typedef struct SDataCol { - int8_t type; // column type - uint8_t bitmap : 1; // 0: no bitmap if all rows are NORM, 1: has bitmap if has NULL/NORM rows - uint8_t bitmapMode : 1; // default is 0(2 bits), otherwise 1(1 bit) - uint8_t reserve : 6; + int8_t type; // column type + uint8_t bitmap : 1; // 0: no bitmap if all rows are NORM, 1: has bitmap if has NULL/NORM rows + uint8_t reserve : 7; int16_t colId; // column ID int32_t bytes; // column data bytes defined int32_t offset; // data offset in a SDataRow (including the header size) @@ -387,8 +386,6 @@ typedef struct SDataCol { TSKEY ts; // only used in last NULL column } SDataCol; - - #define isAllRowsNull(pCol) ((pCol)->len == 0) #define isAllRowsNone(pCol) ((pCol)->len == 0) static FORCE_INLINE void dataColReset(SDataCol *pDataCol) { pDataCol->len = 0; } @@ -482,7 +479,7 @@ void tdResetDataCols(SDataCols *pCols); int32_t tdInitDataCols(SDataCols *pCols, STSchema *pSchema); SDataCols *tdDupDataCols(SDataCols *pCols, bool keepData); SDataCols *tdFreeDataCols(SDataCols *pCols); -int32_t tdMergeDataCols(SDataCols *target, SDataCols *source, int32_t rowsToMerge, int32_t *pOffset, bool forceSetNull, TDRowVerT maxVer); +int32_t tdMergeDataCols(SDataCols *target, SDataCols *source, int32_t rowsToMerge, int32_t *pOffset, bool update, TDRowVerT maxVer); // ----------------- K-V data row structure /* |<-------------------------------------- len -------------------------------------------->| diff --git a/include/common/trow.h b/include/common/trow.h index f87ea1f00926cf480390f82b515708b4cab18dc3..0d34c6e49fa8c2d0dece7e234652a39d43e6eb8b 100644 --- a/include/common/trow.h +++ b/include/common/trow.h @@ -42,7 +42,7 @@ extern "C" { * @brief value type * - for data from client input and STSRow in memory, 3 types of value none/null/norm available */ -#define TD_VTYPE_NORM 0x00U // normal val: not none, not null(no need assign value) +#define TD_VTYPE_NORM 0x00U // normal val: not none, not null #define TD_VTYPE_NULL 0x01U // null val #define TD_VTYPE_NONE 0x02U // none or unknown/undefined #define TD_VTYPE_MAX 0x03U // @@ -140,8 +140,6 @@ typedef struct { }; /// row total length uint32_t len; - /// row version - // uint64_t ver; /// the inline data, maybe a tuple or a k-v tuple char data[]; } STSRow; @@ -241,13 +239,13 @@ static FORCE_INLINE int32_t tdGetBitmapValType(const void *pBitmap, int16_t colI static FORCE_INLINE bool tdIsBitmapValTypeNorm(const void *pBitmap, int16_t idx, int8_t bitmapMode); bool tdIsBitmapBlkNorm(const void *pBitmap, int32_t numOfBits, int8_t bitmapMode); int32_t tdAppendValToDataCol(SDataCol *pCol, TDRowValT valType, const void *val, int32_t numOfRows, int32_t maxPoints, - int8_t bitmapMode); + int8_t bitmapMode, bool isMerge); static FORCE_INLINE int32_t tdAppendColValToTpRow(SRowBuilder *pBuilder, TDRowValT valType, const void *val, bool isCopyVarData, int8_t colType, int16_t colIdx, int32_t offset); static FORCE_INLINE int32_t tdAppendColValToKvRow(SRowBuilder *pBuilder, TDRowValT valType, const void *val, bool isCopyVarData, int8_t colType, int16_t colIdx, int32_t offset, col_id_t colId); -int32_t tdAppendSTSRowToDataCol(STSRow *pRow, STSchema *pSchema, SDataCols *pCols); +int32_t tdAppendSTSRowToDataCol(STSRow *pRow, STSchema *pSchema, SDataCols *pCols, bool isMerge); /** * @brief @@ -718,6 +716,7 @@ static int32_t tdSRowResetBuf(SRowBuilder *pBuilder, void *pBuf) { terrno = TSDB_CODE_INVALID_PARA; return terrno; } + return TSDB_CODE_SUCCESS; } diff --git a/include/util/tdef.h b/include/util/tdef.h index 316640797be355e053117a3e8db7b3de5009fce6..0ab277e0c3dfe8190826847cd02b889d4852f6d0 100644 --- a/include/util/tdef.h +++ b/include/util/tdef.h @@ -29,6 +29,8 @@ extern "C" { #define TSKEY_MAX (INT64_MAX - 1) #define TSKEY_INITIAL_VAL TSKEY_MIN +#define TD_VER_MAX UINT64_MAX // TODO: use the real max version from query handle + // Bytes for each type. extern const int32_t TYPE_BYTES[15]; diff --git a/source/common/src/trow.c b/source/common/src/trow.c index 7157c1e0f0c2ec3953eacc73d07ae979b6b694b7..50e51a40ddac64f5b5daa5461fb9a598f7c821e5 100644 --- a/source/common/src/trow.c +++ b/source/common/src/trow.c @@ -24,7 +24,8 @@ const uint8_t tdVTypeByte[2][3] = {{ }, { // 1 bit - TD_VTYPE_NORM_BYTE_I, TD_VTYPE_NULL_BYTE_I, + TD_VTYPE_NORM_BYTE_I, // normal + TD_VTYPE_NULL_BYTE_I, TD_VTYPE_NULL_BYTE_I, // padding } @@ -33,6 +34,24 @@ const uint8_t tdVTypeByte[2][3] = {{ // declaration static uint8_t tdGetBitmapByte(uint8_t byte); +// static void dataColSetNEleNull(SDataCol *pCol, int nEle); + +/** + * @brief src2 data has more priority than src1 + * + * @param target + * @param src1 + * @param iter1 + * @param limit1 + * @param src2 + * @param iter2 + * @param limit2 + * @param tRows + * @param update + */ +static void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, int limit1, SDataCols *src2, int *iter2, + int limit2, int tRows, bool update); + // implementation /** * @brief Compress bitmap bytes comprised of 2-bits to counterpart of 1-bit. @@ -229,23 +248,23 @@ static uint8_t tdGetMergedBitmapByte(uint8_t byte) { void tdMergeBitmap(uint8_t *srcBitmap, int32_t nBits, uint8_t *dstBitmap) { int32_t i = 0, j = 0; int32_t nBytes = TD_BITMAP_BYTES(nBits); - int32_t nStrictBytes = nBits / 4; - int32_t nPartialBits = nBits - nStrictBytes * 4; + int32_t nRoundBytes = nBits / 4; + int32_t nRemainderBits = nBits - nRoundBytes * 4; - switch (nPartialBits) { + switch (nRemainderBits) { case 0: // NOTHING TODO break; case 1: { - void *lastByte = POINTER_SHIFT(srcBitmap, nStrictBytes); + void *lastByte = POINTER_SHIFT(srcBitmap, nRoundBytes); *(uint8_t *)lastByte &= 0xC0; } break; case 2: { - void *lastByte = POINTER_SHIFT(srcBitmap, nStrictBytes); + void *lastByte = POINTER_SHIFT(srcBitmap, nRoundBytes); *(uint8_t *)lastByte &= 0xF0; } break; case 3: { - void *lastByte = POINTER_SHIFT(srcBitmap, nStrictBytes); + void *lastByte = POINTER_SHIFT(srcBitmap, nRoundBytes); *(uint8_t *)lastByte &= 0xFC; } break; default: @@ -266,10 +285,6 @@ void tdMergeBitmap(uint8_t *srcBitmap, int32_t nBits, uint8_t *dstBitmap) { } } -// static void dataColSetNEleNull(SDataCol *pCol, int nEle); -static void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, int limit1, SDataCols *src2, int *iter2, - int limit2, int tRows, bool forceSetNull); - static FORCE_INLINE void dataColSetNullAt(SDataCol *pCol, int index, bool setBitmap, int8_t bitmapMode) { if (IS_VAR_DATA_TYPE(pCol->type)) { pCol->dataOff[index] = pCol->len; @@ -329,7 +344,7 @@ bool tdIsBitmapBlkNorm(const void *pBitmap, int32_t numOfBits, int8_t bitmapMode if (*((uint8_t *)pBitmap) != vTypeByte) { return false; } - pBitmap = POINTER_SHIFT(pBitmap, 1); + pBitmap = POINTER_SHIFT(pBitmap, i); } int32_t nLeft = numOfBits - nBytes * (bitmapMode == 0 ? TD_VTYPE_BITS : TD_VTYPE_BITS_I); @@ -410,11 +425,12 @@ STSRow *tdRowDup(STSRow *row) { * @param val * @param numOfRows * @param maxPoints - * @param bitmapMode default is 0(2 bits), otherwise 1(1 bit) + * @param bitmapMode default is 0(2 bits), otherwise 1(1 bit) + * @param isMerge merge to current row * @return int */ int tdAppendValToDataCol(SDataCol *pCol, TDRowValT valType, const void *val, int numOfRows, int maxPoints, - int8_t bitmapMode) { + int8_t bitmapMode, bool isMerge) { TASSERT(pCol != NULL); // Assume that the columns not specified during insert/upsert mean None. @@ -430,33 +446,58 @@ int tdAppendValToDataCol(SDataCol *pCol, TDRowValT valType, const void *val, int dataColSetNEleNone(pCol, numOfRows, bitmapMode); } } - if (!tdValTypeIsNorm(valType)) { + const void *value = val; + if (!tdValTypeIsNorm(valType) || !val) { // TODO: // 1. back compatibility and easy to debug with codes of 2.0 to save NULL values. // 2. later on, considering further optimization, don't save Null/None for VarType. - val = getNullValue(pCol->type); + value = getNullValue(pCol->type); } - if (IS_VAR_DATA_TYPE(pCol->type)) { - // set offset - pCol->dataOff[numOfRows] = pCol->len; - // Copy data - memcpy(POINTER_SHIFT(pCol->pData, pCol->len), val, varDataTLen(val)); - // Update the length - pCol->len += varDataTLen(val); - } else { - ASSERT(pCol->len == TYPE_BYTES[pCol->type] * numOfRows); - memcpy(POINTER_SHIFT(pCol->pData, pCol->len), val, pCol->bytes); - pCol->len += pCol->bytes; + if (!isMerge) { + if (IS_VAR_DATA_TYPE(pCol->type)) { + // set offset + pCol->dataOff[numOfRows] = pCol->len; + // Copy data + memcpy(POINTER_SHIFT(pCol->pData, pCol->len), value, varDataTLen(value)); + // Update the length + pCol->len += varDataTLen(value); + } else { + ASSERT(pCol->len == TYPE_BYTES[pCol->type] * numOfRows); + memcpy(POINTER_SHIFT(pCol->pData, pCol->len), value, pCol->bytes); + pCol->len += pCol->bytes; + } + } else if (!tdValTypeIsNone(valType)) { + if (IS_VAR_DATA_TYPE(pCol->type)) { + // keep the last offset + // discard the last var data + int32_t lastVarLen = varDataTLen(POINTER_SHIFT(pCol->pData, pCol->dataOff[numOfRows])); + pCol->len -= lastVarLen; + // Copy data + memcpy(POINTER_SHIFT(pCol->pData, pCol->len), value, varDataTLen(value)); + // Update the length + pCol->len += varDataTLen(value); + } else { + ASSERT(pCol->len - TYPE_BYTES[pCol->type] == TYPE_BYTES[pCol->type] * numOfRows); + memcpy(POINTER_SHIFT(pCol->pData, pCol->len - TYPE_BYTES[pCol->type]), value, pCol->bytes); + } } + #ifdef TD_SUPPORT_BITMAP - tdSetBitmapValType(pCol->pBitmap, numOfRows, valType, bitmapMode); + if (!isMerge || !tdValTypeIsNone(valType)) { + tdSetBitmapValType(pCol->pBitmap, numOfRows, valType, bitmapMode); + } #endif return 0; } // internal -static int32_t tdAppendTpRowToDataCol(STSRow *pRow, STSchema *pSchema, SDataCols *pCols) { +static int32_t tdAppendTpRowToDataCol(STSRow *pRow, STSchema *pSchema, SDataCols *pCols, bool isMerge) { +#if 0 ASSERT(pCols->numOfRows == 0 || dataColsKeyLast(pCols) < TD_ROW_KEY(pRow)); +#endif + + // Multi-Version rows with the same key and different versions supported + ASSERT(pCols->numOfRows == 0 || dataColsKeyLast(pCols) <= TD_ROW_KEY(pRow)); int rcol = 1; int dcol = 1; @@ -464,12 +505,14 @@ static int32_t tdAppendTpRowToDataCol(STSRow *pRow, STSchema *pSchema, SDataCols SDataCol *pDataCol = &(pCols->cols[0]); ASSERT(pDataCol->colId == PRIMARYKEY_TIMESTAMP_COL_ID); - tdAppendValToDataCol(pDataCol, TD_VTYPE_NORM, &pRow->ts, pCols->numOfRows, pCols->maxPoints, pCols->bitmapMode); + tdAppendValToDataCol(pDataCol, TD_VTYPE_NORM, &pRow->ts, pCols->numOfRows, pCols->maxPoints, pCols->bitmapMode, + isMerge); while (dcol < pCols->numOfCols) { pDataCol = &(pCols->cols[dcol]); if (rcol >= schemaNCols(pSchema)) { - tdAppendValToDataCol(pDataCol, TD_VTYPE_NULL, NULL, pCols->numOfRows, pCols->maxPoints, pCols->bitmapMode); + tdAppendValToDataCol(pDataCol, TD_VTYPE_NULL, NULL, pCols->numOfRows, pCols->maxPoints, pCols->bitmapMode, + isMerge); ++dcol; continue; } @@ -480,22 +523,26 @@ static int32_t tdAppendTpRowToDataCol(STSRow *pRow, STSchema *pSchema, SDataCols if (tdGetTpRowValOfCol(&sVal, pRow, pBitmap, pRowCol->type, pRowCol->offset - sizeof(TSKEY), rcol - 1) < 0) { return terrno; } - tdAppendValToDataCol(pDataCol, sVal.valType, sVal.val, pCols->numOfRows, pCols->maxPoints, pCols->bitmapMode); + tdAppendValToDataCol(pDataCol, sVal.valType, sVal.val, pCols->numOfRows, pCols->maxPoints, pCols->bitmapMode, + isMerge); ++dcol; ++rcol; } else if (pRowCol->colId < pDataCol->colId) { ++rcol; } else { - tdAppendValToDataCol(pDataCol, TD_VTYPE_NULL, NULL, pCols->numOfRows, pCols->maxPoints, pCols->bitmapMode); + tdAppendValToDataCol(pDataCol, TD_VTYPE_NULL, NULL, pCols->numOfRows, pCols->maxPoints, pCols->bitmapMode, + isMerge); ++dcol; } } +#if 0 ++pCols->numOfRows; +#endif return TSDB_CODE_SUCCESS; } // internal -static int32_t tdAppendKvRowToDataCol(STSRow *pRow, STSchema *pSchema, SDataCols *pCols) { +static int32_t tdAppendKvRowToDataCol(STSRow *pRow, STSchema *pSchema, SDataCols *pCols, bool isMerge) { ASSERT(pCols->numOfRows == 0 || dataColsKeyLast(pCols) < TD_ROW_KEY(pRow)); int rcol = 0; @@ -506,12 +553,14 @@ static int32_t tdAppendKvRowToDataCol(STSRow *pRow, STSchema *pSchema, SDataCols SDataCol *pDataCol = &(pCols->cols[0]); ASSERT(pDataCol->colId == PRIMARYKEY_TIMESTAMP_COL_ID); - tdAppendValToDataCol(pDataCol, TD_VTYPE_NORM, &pRow->ts, pCols->numOfRows, pCols->maxPoints, pCols->bitmapMode); + tdAppendValToDataCol(pDataCol, TD_VTYPE_NORM, &pRow->ts, pCols->numOfRows, pCols->maxPoints, pCols->bitmapMode, + isMerge); while (dcol < pCols->numOfCols) { pDataCol = &(pCols->cols[dcol]); if (rcol >= tRowCols || rcol >= tSchemaCols) { - tdAppendValToDataCol(pDataCol, TD_VTYPE_NULL, NULL, pCols->numOfRows, pCols->maxPoints, pCols->bitmapMode); + tdAppendValToDataCol(pDataCol, TD_VTYPE_NULL, NULL, pCols->numOfRows, pCols->maxPoints, pCols->bitmapMode, + isMerge); ++dcol; continue; } @@ -527,17 +576,21 @@ static int32_t tdAppendKvRowToDataCol(STSRow *pRow, STSchema *pSchema, SDataCols if (tdGetKvRowValOfCol(&sVal, pRow, pBitmap, pIdx->offset, colIdx) < 0) { return terrno; } - tdAppendValToDataCol(pDataCol, sVal.valType, sVal.val, pCols->numOfRows, pCols->maxPoints, pCols->bitmapMode); + tdAppendValToDataCol(pDataCol, sVal.valType, sVal.val, pCols->numOfRows, pCols->maxPoints, pCols->bitmapMode, + isMerge); ++dcol; ++rcol; } else if (pIdx->colId < pDataCol->colId) { ++rcol; } else { - tdAppendValToDataCol(pDataCol, TD_VTYPE_NULL, NULL, pCols->numOfRows, pCols->maxPoints, pCols->bitmapMode); + tdAppendValToDataCol(pDataCol, TD_VTYPE_NULL, NULL, pCols->numOfRows, pCols->maxPoints, pCols->bitmapMode, + isMerge); ++dcol; } } +#if 0 ++pCols->numOfRows; +#endif return TSDB_CODE_SUCCESS; } @@ -548,20 +601,30 @@ static int32_t tdAppendKvRowToDataCol(STSRow *pRow, STSchema *pSchema, SDataCols * @param pRow * @param pSchema * @param pCols - * @param forceSetNull */ -int32_t tdAppendSTSRowToDataCol(STSRow *pRow, STSchema *pSchema, SDataCols *pCols) { +int32_t tdAppendSTSRowToDataCol(STSRow *pRow, STSchema *pSchema, SDataCols *pCols, bool isMerge) { if (TD_IS_TP_ROW(pRow)) { - return tdAppendTpRowToDataCol(pRow, pSchema, pCols); + return tdAppendTpRowToDataCol(pRow, pSchema, pCols, isMerge); } else if (TD_IS_KV_ROW(pRow)) { - return tdAppendKvRowToDataCol(pRow, pSchema, pCols); + return tdAppendKvRowToDataCol(pRow, pSchema, pCols, isMerge); } else { ASSERT(0); } return TSDB_CODE_SUCCESS; } -int tdMergeDataCols(SDataCols *target, SDataCols *source, int rowsToMerge, int *pOffset, bool forceSetNull, +/** + * @brief source data has more priority than target + * + * @param target + * @param source + * @param rowsToMerge + * @param pOffset + * @param update + * @param maxVer + * @return int + */ +int tdMergeDataCols(SDataCols *target, SDataCols *source, int rowsToMerge, int *pOffset, bool update, TDRowVerT maxVer) { ASSERT(rowsToMerge > 0 && rowsToMerge <= source->numOfRows); ASSERT(target->numOfCols == source->numOfCols); @@ -576,17 +639,38 @@ int tdMergeDataCols(SDataCols *target, SDataCols *source, int rowsToMerge, int * if ((target->numOfRows == 0) || (dataColsKeyLast(target) < dataColsKeyAtRow(source, *pOffset))) { // No overlap ASSERT(target->numOfRows + rowsToMerge <= target->maxPoints); // TODO: filter the maxVer - for (int i = 0; i < rowsToMerge; i++) { + TSKEY lastKey = TSKEY_INITIAL_VAL; + for (int i = 0; i < rowsToMerge; ++i) { + bool merge = false; for (int j = 0; j < source->numOfCols; j++) { if (source->cols[j].len > 0 || target->cols[j].len > 0) { SCellVal sVal = {0}; if (tdGetColDataOfRow(&sVal, source->cols + j, i + (*pOffset), source->bitmapMode) < 0) { TASSERT(0); } + + if (j == 0) { + if (lastKey == *(TSKEY *)sVal.val) { + if (!update) { + break; + } + merge = true; + } else if (lastKey != TSKEY_INITIAL_VAL) { + ++target->numOfRows; + } + + lastKey = *(TSKEY *)sVal.val; + } + if (i == 0) { + (target->cols + j)->bitmap = (source->cols + j)->bitmap; + } + tdAppendValToDataCol(target->cols + j, sVal.valType, sVal.val, target->numOfRows, target->maxPoints, - target->bitmapMode); + target->bitmapMode, merge); } } + } + if (lastKey != TSKEY_INITIAL_VAL) { ++target->numOfRows; } (*pOffset) += rowsToMerge; @@ -596,7 +680,7 @@ int tdMergeDataCols(SDataCols *target, SDataCols *source, int rowsToMerge, int * int iter1 = 0; tdMergeTwoDataCols(target, pTarget, &iter1, pTarget->numOfRows, source, pOffset, source->numOfRows, - pTarget->numOfRows + rowsToMerge, forceSetNull); + pTarget->numOfRows + rowsToMerge, update); } tdFreeDataCols(pTarget); @@ -607,67 +691,95 @@ _err: return -1; } -// src2 data has more priority than src1 +static void tdAppendValToDataCols(SDataCols *target, SDataCols *src, int iter, bool isMerge) { + for (int i = 0; i < src->numOfCols; ++i) { + ASSERT(target->cols[i].type == src->cols[i].type); + if (src->cols[i].len > 0 || target->cols[i].len > 0) { + SCellVal sVal = {0}; + if (tdGetColDataOfRow(&sVal, src->cols + i, iter, src->bitmapMode) < 0) { + TASSERT(0); + } + if (isMerge) { + if (!tdValTypeIsNone(sVal.valType)) { + tdAppendValToDataCol(&(target->cols[i]), sVal.valType, sVal.val, target->numOfRows, target->maxPoints, + target->bitmapMode, isMerge); + } else { + // Keep the origin value for None + } + } else { + tdAppendValToDataCol(&(target->cols[i]), sVal.valType, sVal.val, target->numOfRows, target->maxPoints, + target->bitmapMode, isMerge); + } + } + } +} +/** + * @brief src2 data has more priority than src1 + * + * @param target + * @param src1 + * @param iter1 + * @param limit1 + * @param src2 + * @param iter2 + * @param limit2 + * @param tRows + * @param update + */ static void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, int limit1, SDataCols *src2, int *iter2, - int limit2, int tRows, bool forceSetNull) { + int limit2, int tRows, bool update) { tdResetDataCols(target); + target->bitmapMode = src1->bitmapMode; ASSERT(limit1 <= src1->numOfRows && limit2 <= src2->numOfRows); + int32_t nRows = 0; - while (target->numOfRows < tRows) { + // TODO: filter the maxVer + // TODO: handle the delete function + TSKEY lastKey = TSKEY_INITIAL_VAL; + while (nRows < tRows) { if (*iter1 >= limit1 && *iter2 >= limit2) break; TSKEY key1 = (*iter1 >= limit1) ? INT64_MAX : dataColsKeyAt(src1, *iter1); - TKEY tkey1 = (*iter1 >= limit1) ? TKEY_NULL : dataColsTKeyAt(src1, *iter1); + // TKEY tkey1 = (*iter1 >= limit1) ? TKEY_NULL : dataColsTKeyAt(src1, *iter1); TSKEY key2 = (*iter2 >= limit2) ? INT64_MAX : dataColsKeyAt(src2, *iter2); // TKEY tkey2 = (*iter2 >= limit2) ? TKEY_NULL : dataColsTKeyAt(src2, *iter2); - ASSERT(tkey1 == TKEY_NULL || (!TKEY_IS_DELETED(tkey1))); - // TODO: filter the maxVer - if (key1 < key2) { - for (int i = 0; i < src1->numOfCols; ++i) { - ASSERT(target->cols[i].type == src1->cols[i].type); - if (src1->cols[i].len > 0 || target->cols[i].len > 0) { - SCellVal sVal = {0}; - if (tdGetColDataOfRow(&sVal, src1->cols + i, *iter1, src1->bitmapMode) < 0) { - TASSERT(0); - } - tdAppendValToDataCol(&(target->cols[i]), sVal.valType, sVal.val, target->numOfRows, target->maxPoints, - target->bitmapMode); + // ASSERT(tkey1 == TKEY_NULL || (!TKEY_IS_DELETED(tkey1))); + + if (key1 <= key2) { + // select key1 if not delete + if (update && (lastKey == key1)) { + tdAppendValToDataCols(target, src1, *iter1, true); + } else if (lastKey != key1) { + if (lastKey != TSKEY_INITIAL_VAL) { + ++target->numOfRows; } + tdAppendValToDataCols(target, src1, *iter1, false); } - - ++target->numOfRows; + ++nRows; ++(*iter1); - } else if (key1 >= key2) { - // TODO: filter the maxVer - if ((key1 > key2) || ((key1 == key2) && !TKEY_IS_DELETED(key2))) { - for (int i = 0; i < src2->numOfCols; ++i) { - SCellVal sVal = {0}; - ASSERT(target->cols[i].type == src2->cols[i].type); - if (tdGetColDataOfRow(&sVal, src2->cols + i, *iter2, src2->bitmapMode) < 0) { - TASSERT(0); - } - if (src2->cols[i].len > 0 && !tdValTypeIsNull(sVal.valType)) { - tdAppendValToDataCol(&(target->cols[i]), sVal.valType, sVal.val, target->numOfRows, target->maxPoints, - target->bitmapMode); - } else if (!forceSetNull && key1 == key2 && src1->cols[i].len > 0) { - if (tdGetColDataOfRow(&sVal, src1->cols + i, *iter1, src1->bitmapMode) < 0) { - TASSERT(0); - } - tdAppendValToDataCol(&(target->cols[i]), sVal.valType, sVal.val, target->numOfRows, target->maxPoints, - target->bitmapMode); - } else if (target->cols[i].len > 0) { - dataColSetNullAt(&target->cols[i], target->numOfRows, true, target->bitmapMode); - } + lastKey = key1; + } else { + // use key2 if not deleted + // TODO: handle the delete function + if (update && (lastKey == key2)) { + tdAppendValToDataCols(target, src2, *iter2, true); + } else if (lastKey != key2) { + if (lastKey != TSKEY_INITIAL_VAL) { + ++target->numOfRows; } - ++target->numOfRows; + tdAppendValToDataCols(target, src2, *iter2, false); } + ++nRows; ++(*iter2); - if (key1 == key2) ++(*iter1); + lastKey = key2; } - ASSERT(target->numOfRows <= target->maxPoints); + ASSERT(target->numOfRows <= target->maxPoints - 1); + } + if (lastKey != TSKEY_INITIAL_VAL) { + ++target->numOfRows; } } @@ -777,7 +889,7 @@ SDataCols *tdDupDataCols(SDataCols *pDataCols, bool keepData) { pRet->sversion = pDataCols->sversion; if (keepData) pRet->numOfRows = pDataCols->numOfRows; - for (int i = 0; i < pDataCols->numOfCols; i++) { + for (int i = 0; i < pDataCols->numOfCols; ++i) { pRet->cols[i].type = pDataCols->cols[i].type; pRet->cols[i].bitmap = pDataCols->cols[i].bitmap; pRet->cols[i].colId = pDataCols->cols[i].colId; @@ -797,8 +909,7 @@ SDataCols *tdDupDataCols(SDataCols *pDataCols, bool keepData) { memcpy(pRet->cols[i].dataOff, pDataCols->cols[i].dataOff, dataOffSize); } if (!TD_COL_ROWS_NORM(pRet->cols + i)) { - int32_t nBitmapBytes = (int32_t)TD_BITMAP_BYTES(pDataCols->numOfRows); - memcpy(pRet->cols[i].pBitmap, pDataCols->cols[i].pBitmap, nBitmapBytes); + memcpy(pRet->cols[i].pBitmap, pDataCols->cols[i].pBitmap, TD_BITMAP_BYTES(pDataCols->numOfRows)); } } } diff --git a/source/common/src/ttypes.c b/source/common/src/ttypes.c index e3d67cd488364172f3596460d142038d844f740a..3fd2ef8e730d64327f4a75448743f07248e49de3 100644 --- a/source/common/src/ttypes.c +++ b/source/common/src/ttypes.c @@ -557,7 +557,7 @@ static const void *nullValues[] = { }; const void *getNullValue(int32_t type) { - assert(type >= TSDB_DATA_TYPE_BOOL && type <= TSDB_DATA_TYPE_UBIGINT); + assert(type >= TSDB_DATA_TYPE_BOOL && type <= TSDB_DATA_TYPE_UBIGINT); // TODO: extend the types return nullValues[type - 1]; } diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 2716692bd71d57fb4ebdc24c79dc9c968abee3ac..d84c1b1d0721706c6d2b87c049ff80578c352d44 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -273,7 +273,8 @@ typedef enum { typedef struct { uint8_t last : 1; - uint8_t blkVer : 7; + uint8_t hasDupKey : 1; // 0: no dup TS key, 1: has dup TS key(since supporting Multi-Version) + uint8_t blkVer : 6; uint8_t numOfSubBlocks; col_id_t numOfCols; // not including timestamp column uint32_t len; // data block length diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index 97b796c6a50b9712d35a7b77a328d78c089e4792..a008c4a0e5d09fbf3cecae9ae4ae03ab5e1600ab 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -1330,13 +1330,15 @@ static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIt TSKEY maxKey, int maxRows, int8_t update) { TSKEY key1 = INT64_MAX; TSKEY key2 = INT64_MAX; + TSKEY lastKey = TSKEY_INITIAL_VAL; STSchema *pSchema = NULL; ASSERT(maxRows > 0 && dataColsKeyLast(pDataCols) <= maxKey); tdResetDataCols(pTarget); pTarget->bitmapMode = pDataCols->bitmapMode; - + // TODO: filter Multi-Version + // TODO: support delete function while (true) { key1 = (*iter >= pDataCols->numOfRows) ? INT64_MAX : dataColsKeyAt(pDataCols, *iter); STSRow *row = tsdbNextIterRow(pCommitIter->pIter); @@ -1349,6 +1351,9 @@ static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIt if (key1 == INT64_MAX && key2 == INT64_MAX) break; if (key1 < key2) { + if (lastKey != TSKEY_INITIAL_VAL) { + ++pTarget->numOfRows; + } for (int i = 0; i < pDataCols->numOfCols; ++i) { // TODO: dataColAppendVal may fail SCellVal sVal = {0}; @@ -1356,10 +1361,10 @@ static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIt TASSERT(0); } tdAppendValToDataCol(pTarget->cols + i, sVal.valType, sVal.val, pTarget->numOfRows, pTarget->maxPoints, - pTarget->bitmapMode); + pTarget->bitmapMode, false); } - ++pTarget->numOfRows; + lastKey = key1; ++(*iter); } else if (key1 > key2) { if (pSchema == NULL || schemaVersion(pSchema) != TD_ROW_SVER(row)) { @@ -1367,7 +1372,17 @@ static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIt ASSERT(pSchema != NULL); } - tdAppendSTSRowToDataCol(row, pSchema, pTarget); + if (key2 == lastKey) { + if (TD_SUPPORT_UPDATE(update)) { + tdAppendSTSRowToDataCol(row, pSchema, pTarget, true); + } + } else { + if (lastKey != TSKEY_INITIAL_VAL) { + ++pTarget->numOfRows; + } + tdAppendSTSRowToDataCol(row, pSchema, pTarget, false); + lastKey = key2; + } tSkipListIterNext(pCommitIter->pIter); } else { @@ -1397,6 +1412,12 @@ static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIt ++(*iter); tSkipListIterNext(pCommitIter->pIter); #endif + + if(lastKey != key1) { + lastKey = key1; + ++pTarget->numOfRows; + } + // copy disk data for (int i = 0; i < pDataCols->numOfCols; ++i) { SCellVal sVal = {0}; @@ -1405,7 +1426,7 @@ static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIt } // TODO: tdAppendValToDataCol may fail tdAppendValToDataCol(pTarget->cols + i, sVal.valType, sVal.val, pTarget->numOfRows, pTarget->maxPoints, - pTarget->bitmapMode); + pTarget->bitmapMode, false); } if (TD_SUPPORT_UPDATE(update)) { @@ -1416,26 +1437,17 @@ static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIt } // TODO: merge with Multi-Version - STSRow *curRow = row; - - ++(*iter); - tSkipListIterNext(pCommitIter->pIter); - STSRow *nextRow = tsdbNextIterRow(pCommitIter->pIter); - - if (key2 < TD_ROW_KEY(nextRow)) { - tdAppendSTSRowToDataCol(row, pSchema, pTarget); - } else { - tdAppendSTSRowToDataCol(row, pSchema, pTarget); - } - // TODO: merge with Multi-Version - } else { - ++pTarget->numOfRows; - ++(*iter); - tSkipListIterNext(pCommitIter->pIter); + tdAppendSTSRowToDataCol(row, pSchema, pTarget, true); } + ++(*iter); + tSkipListIterNext(pCommitIter->pIter); } - if (pTarget->numOfRows >= maxRows) break; + if (pTarget->numOfRows >= (maxRows - 1)) break; + } + + if (lastKey != TSKEY_INITIAL_VAL) { + ++pTarget->numOfRows; } } diff --git a/source/dnode/vnode/src/tsdb/tsdbMemTable.c b/source/dnode/vnode/src/tsdb/tsdbMemTable.c index b52a34cfb2f9bf9044700ae4c6d3c102e44e5de0..48d32c0dd3daf5a084c7205c603da479391c03e8 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMemTable.c +++ b/source/dnode/vnode/src/tsdb/tsdbMemTable.c @@ -20,7 +20,7 @@ static void tsdbFreeTbData(STbData *pTbData); static char *tsdbGetTsTupleKey(const void *data); static int tsdbTbDataComp(const void *arg1, const void *arg2); static char *tsdbTbDataGetUid(const void *arg); -static int tsdbAppendTableRowToCols(STable *pTable, SDataCols *pCols, STSchema **ppSchema, STSRow *row); +static int tsdbAppendTableRowToCols(STable *pTable, SDataCols *pCols, STSchema **ppSchema, STSRow *row, bool merge); int tsdbMemTableCreate(STsdb *pTsdb, STsdbMemTable **ppMemTable) { STsdbMemTable *pMemTable; @@ -82,14 +82,19 @@ int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey TKEY *filterKeys, int nFilterKeys, bool keepDup, SMergeInfo *pMergeInfo) { ASSERT(maxRowsToRead > 0 && nFilterKeys >= 0); if (pIter == NULL) return 0; - STSchema *pSchema = NULL; - TSKEY rowKey = 0; - TSKEY fKey = 0; + STSchema *pSchema = NULL; + TSKEY rowKey = 0; + TSKEY fKey = 0; + // only fetch lastKey from mem data as file data not used in this function actually + TSKEY lastKey = TSKEY_INITIAL_VAL; bool isRowDel = false; int filterIter = 0; STSRow *row = NULL; SMergeInfo mInfo; + // TODO: support Multi-Version(the rows with the same TS keys in memory can't be merged if its version refered by + // query handle) + if (pMergeInfo == NULL) pMergeInfo = &mInfo; memset(pMergeInfo, 0, sizeof(*pMergeInfo)); @@ -111,7 +116,8 @@ int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey } else { fKey = tdGetKey(filterKeys[filterIter]); } - + // 1. fkey - no dup since merged up to maxVersion of each query handle by tsdbLoadBlockDataCols + // 2. rowKey - would dup since Multi-Version supported while (true) { if (fKey == INT64_MAX && rowKey == INT64_MAX) break; @@ -125,12 +131,14 @@ int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey } else { fKey = tdGetKey(filterKeys[filterIter]); } +#if 0 } else if (fKey > rowKey) { if (isRowDel) { pMergeInfo->rowsDeleteFailed++; } else { if (pMergeInfo->rowsInserted - pMergeInfo->rowsDeleteSucceed >= maxRowsToRead) break; if (pCols && pMergeInfo->nOperations >= pCols->maxPoints) break; + pMergeInfo->rowsInserted++; pMergeInfo->nOperations++; pMergeInfo->keyFirst = TMIN(pMergeInfo->keyFirst, rowKey); @@ -185,6 +193,94 @@ int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey fKey = tdGetKey(filterKeys[filterIter]); } } +#endif +#if 1 + } else if (fKey > rowKey) { + if (isRowDel) { + // TODO: support delete function + pMergeInfo->rowsDeleteFailed++; + } else { + if (pMergeInfo->rowsInserted - pMergeInfo->rowsDeleteSucceed >= maxRowsToRead) break; + if (pCols && pMergeInfo->nOperations >= pCols->maxPoints) break; + + if (lastKey != rowKey) { + pMergeInfo->rowsInserted++; + pMergeInfo->nOperations++; + pMergeInfo->keyFirst = TMIN(pMergeInfo->keyFirst, rowKey); + pMergeInfo->keyLast = TMAX(pMergeInfo->keyLast, rowKey); + if (pCols) { + if (lastKey != TSKEY_INITIAL_VAL) { + ++pCols->numOfRows; + } + tsdbAppendTableRowToCols(pTable, pCols, &pSchema, row, false); + } + lastKey = rowKey; + } else { + if (keepDup) { + tsdbAppendTableRowToCols(pTable, pCols, &pSchema, row, true); + } else { + // discard + } + } + } + + tSkipListIterNext(pIter); + row = tsdbNextIterRow(pIter); + if (row == NULL || TD_ROW_KEY(row) > maxKey) { + rowKey = INT64_MAX; + isRowDel = false; + } else { + rowKey = TD_ROW_KEY(row); + isRowDel = TD_ROW_IS_DELETED(row); + } + } else { // fkey == rowKey + if (isRowDel) { // TODO: support delete function(How to stands for delete in file? rowVersion = -1?) + ASSERT(!keepDup); + if (pCols && pMergeInfo->nOperations >= pCols->maxPoints) break; + pMergeInfo->rowsDeleteSucceed++; + pMergeInfo->nOperations++; + tsdbAppendTableRowToCols(pTable, pCols, &pSchema, row, false); + } else { + if (keepDup) { + if (pCols && pMergeInfo->nOperations >= pCols->maxPoints) break; + if (lastKey != rowKey) { + pMergeInfo->rowsUpdated++; + pMergeInfo->nOperations++; + pMergeInfo->keyFirst = TMIN(pMergeInfo->keyFirst, rowKey); + pMergeInfo->keyLast = TMAX(pMergeInfo->keyLast, rowKey); + lastKey = rowKey; + ++pCols->numOfRows; + tsdbAppendTableRowToCols(pTable, pCols, &pSchema, row, false); + } else { + tsdbAppendTableRowToCols(pTable, pCols, &pSchema, row, true); + } + } else { + pMergeInfo->keyFirst = TMIN(pMergeInfo->keyFirst, fKey); + pMergeInfo->keyLast = TMAX(pMergeInfo->keyLast, fKey); + } + } + + tSkipListIterNext(pIter); + row = tsdbNextIterRow(pIter); + if (row == NULL || TD_ROW_KEY(row) > maxKey) { + rowKey = INT64_MAX; + isRowDel = false; + } else { + rowKey = TD_ROW_KEY(row); + isRowDel = TD_ROW_IS_DELETED(row); + } + + filterIter++; + if (filterIter >= nFilterKeys) { + fKey = INT64_MAX; + } else { + fKey = tdGetKey(filterKeys[filterIter]); + } + } +#endif + } + if (lastKey != TSKEY_INITIAL_VAL) { + ++pCols->numOfRows; } return 0; @@ -254,9 +350,12 @@ static STbData *tsdbNewTbData(tb_uid_t uid) { pTbData->keyMin = TSKEY_MAX; pTbData->keyMax = TSKEY_MIN; pTbData->nrows = 0; - +#if 0 pTbData->pData = tSkipListCreate(5, TSDB_DATA_TYPE_TIMESTAMP, sizeof(int64_t), tkeyComparFn, SL_DISCARD_DUP_KEY, tsdbGetTsTupleKey); +#endif + pTbData->pData = + tSkipListCreate(5, TSDB_DATA_TYPE_TIMESTAMP, sizeof(int64_t), tkeyComparFn, SL_ALLOW_DUP_KEY, tsdbGetTsTupleKey); if (pTbData->pData == NULL) { taosMemoryFree(pTbData); return NULL; @@ -291,7 +390,7 @@ static char *tsdbTbDataGetUid(const void *arg) { STbData *pTbData = (STbData *)arg; return (char *)(&(pTbData->uid)); } -static int tsdbAppendTableRowToCols(STable *pTable, SDataCols *pCols, STSchema **ppSchema, STSRow *row) { +static int tsdbAppendTableRowToCols(STable *pTable, SDataCols *pCols, STSchema **ppSchema, STSRow *row, bool merge) { if (pCols) { if (*ppSchema == NULL || schemaVersion(*ppSchema) != TD_ROW_SVER(row)) { *ppSchema = tsdbGetTableSchemaImpl(pTable, false, false, TD_ROW_SVER(row)); @@ -301,7 +400,7 @@ static int tsdbAppendTableRowToCols(STable *pTable, SDataCols *pCols, STSchema * } } - tdAppendSTSRowToDataCol(row, *ppSchema, pCols); + tdAppendSTSRowToDataCol(row, *ppSchema, pCols, merge); } return 0; diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 0da18ebdeb36630e59e8c0f4e0176c092b5987b2..aa305dd71c4de04258299de5ddf25f2805c598a6 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -163,7 +163,7 @@ static int32_t tsdbCheckInfoCompar(const void* key1, const void* key2); // static void* destroyTableCheckInfo(SArray* pTableCheckInfo); static bool tsdbGetExternalRow(tsdbReaderT pHandle); -static STsdb* getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* retentions); +static STsdb* getTsdbByRetentions(SVnode* pVnode, STsdbReadHandle* pReadHandle, TSKEY winSKey, SRetention* retentions); static void tsdbInitDataBlockLoadInfo(SDataBlockLoadInfo* pBlockLoadInfo) { pBlockLoadInfo->slot = -1; @@ -351,36 +351,43 @@ static void setQueryTimewindow(STsdbReadHandle* pTsdbReadHandle, SQueryTableData pTsdbReadHandle->window.ekey, pTsdbReadHandle->idStr); } } -#if 0 -int nQUERY = 0; +#if 1 +int nQUERY = 0; #endif -static STsdb* getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* retentions) { +static STsdb* getTsdbByRetentions(SVnode* pVnode, STsdbReadHandle* pReadHandle, TSKEY winSKey, SRetention* retentions) { if (vnodeIsRollup(pVnode)) { int level = 0; -#if 1 +#if 0 int64_t now = taosGetTimestamp(pVnode->config.tsdbCfg.precision); for (int i = 0; i < TSDB_RETENTION_MAX; ++i) { SRetention* pRetention = retentions + i; if (pRetention->keep <= 0 || (now - pRetention->keep) >= winSKey) { break; } + ++level; } #endif -#if 0 - ++nQUERY; - if(nQUERY%3 == 0) { - level = 2; - } else if(nQUERY%2 == 0) { - level = 1; - } else { - level = 0; +#if 1 + switch ((nQUERY++) % 3) { + case 0: + level = 0; + break; + case 1: + level = 1; + break; + default: + level = 2; + break; } #endif if (level == TSDB_RETENTION_L0) { + tsdbDebug("%p rsma level %d is selected to query\n", pReadHandle, level); return VND_RSMA0(pVnode); } else if (level == TSDB_RETENTION_L1) { + tsdbDebug("%p rsma level %d is selected to query\n", pReadHandle, level); return VND_RSMA1(pVnode); } else { + tsdbDebug("%p rsma level %d is selected to query\n", pReadHandle, level); return VND_RSMA2(pVnode); } } @@ -393,7 +400,7 @@ static STsdbReadHandle* tsdbQueryTablesImpl(SVnode* pVnode, SQueryTableDataCond* goto _end; } - STsdb* pTsdb = getTsdbByRetentions(pVnode, pCond->twindow.skey, pVnode->config.tsdbCfg.retentions); + STsdb* pTsdb = getTsdbByRetentions(pVnode, pReadHandle, pCond->twindow.skey, pVnode->config.tsdbCfg.retentions); pReadHandle->order = pCond->order; pReadHandle->pTsdb = pTsdb; @@ -803,12 +810,16 @@ static void destroyTableMemIterator(STableCheckInfo* pCheckInfo) { tSkipListDestroyIter(pCheckInfo->iiter); } -static TSKEY extractFirstTraverseKey(STableCheckInfo* pCheckInfo, int32_t order, int32_t update) { +static TSKEY extractFirstTraverseKey(STableCheckInfo* pCheckInfo, int32_t order, int32_t update, TDRowVerT maxVer) { STSRow *rmem = NULL, *rimem = NULL; if (pCheckInfo->iter) { SSkipListNode* node = tSkipListIterGet(pCheckInfo->iter); if (node != NULL) { rmem = (STSRow*)SL_GET_NODE_DATA(node); + // TODO: filter max version + // if (TD_ROW_VER(rmem) > maxVer) { + // rmem = NULL; + // } } } @@ -816,6 +827,10 @@ static TSKEY extractFirstTraverseKey(STableCheckInfo* pCheckInfo, int32_t order, SSkipListNode* node = tSkipListIterGet(pCheckInfo->iiter); if (node != NULL) { rimem = (STSRow*)SL_GET_NODE_DATA(node); + // TODO: filter max version + // if (TD_ROW_VER(rimem) > maxVer) { + // rimem = NULL; + // } } } @@ -837,6 +852,7 @@ static TSKEY extractFirstTraverseKey(STableCheckInfo* pCheckInfo, int32_t order, TSKEY r2 = TD_ROW_KEY(rimem); if (r1 == r2) { +#if 0 if (update == TD_ROW_DISCARD_UPDATE) { pCheckInfo->chosen = CHECKINFO_CHOSEN_IMEM; tSkipListIterNext(pCheckInfo->iter); @@ -846,6 +862,13 @@ static TSKEY extractFirstTraverseKey(STableCheckInfo* pCheckInfo, int32_t order, } else { pCheckInfo->chosen = CHECKINFO_CHOSEN_BOTH; } +#endif + if (TD_SUPPORT_UPDATE(update)) { + pCheckInfo->chosen = CHECKINFO_CHOSEN_BOTH; + } else { + pCheckInfo->chosen = CHECKINFO_CHOSEN_IMEM; + tSkipListIterNext(pCheckInfo->iter); + } return r1; } else if (r1 < r2 && ASCENDING_TRAVERSE(order)) { pCheckInfo->chosen = CHECKINFO_CHOSEN_MEM; @@ -856,12 +879,18 @@ static TSKEY extractFirstTraverseKey(STableCheckInfo* pCheckInfo, int32_t order, } } -static STSRow* getSRowInTableMem(STableCheckInfo* pCheckInfo, int32_t order, int32_t update, STSRow** extraRow) { + +static STSRow* getSRowInTableMem(STableCheckInfo* pCheckInfo, int32_t order, int32_t update, STSRow** extraRow, TDRowVerT maxVer) { STSRow *rmem = NULL, *rimem = NULL; if (pCheckInfo->iter) { SSkipListNode* node = tSkipListIterGet(pCheckInfo->iter); if (node != NULL) { rmem = (STSRow*)SL_GET_NODE_DATA(node); +#if 0 // TODO: skiplist refactor + if (TD_ROW_VER(rmem) > maxVer) { + rmem = NULL; + } +#endif } } @@ -869,6 +898,11 @@ static STSRow* getSRowInTableMem(STableCheckInfo* pCheckInfo, int32_t order, int SSkipListNode* node = tSkipListIterGet(pCheckInfo->iiter); if (node != NULL) { rimem = (STSRow*)SL_GET_NODE_DATA(node); +#if 0 // TODO: skiplist refactor + if (TD_ROW_VER(rimem) > maxVer) { + rimem = NULL; + } +#endif } } @@ -890,6 +924,7 @@ static STSRow* getSRowInTableMem(STableCheckInfo* pCheckInfo, int32_t order, int TSKEY r2 = TD_ROW_KEY(rimem); if (r1 == r2) { +#if 0 if (update == TD_ROW_DISCARD_UPDATE) { tSkipListIterNext(pCheckInfo->iter); pCheckInfo->chosen = CHECKINFO_CHOSEN_IMEM; @@ -903,6 +938,16 @@ static STSRow* getSRowInTableMem(STableCheckInfo* pCheckInfo, int32_t order, int *extraRow = rimem; return rmem; } +#endif + if (TD_SUPPORT_UPDATE(update)) { + pCheckInfo->chosen = CHECKINFO_CHOSEN_BOTH; + *extraRow = rimem; + return rmem; + } else { + tSkipListIterNext(pCheckInfo->iter); + pCheckInfo->chosen = CHECKINFO_CHOSEN_IMEM; + return rimem; + } } else { if (ASCENDING_TRAVERSE(order)) { if (r1 < r2) { @@ -973,7 +1018,7 @@ static bool hasMoreDataInCache(STsdbReadHandle* pHandle) { initTableMemIterator(pHandle, pCheckInfo); } - STSRow* row = getSRowInTableMem(pCheckInfo, pHandle->order, pCfg->update, NULL); + STSRow* row = getSRowInTableMem(pCheckInfo, pHandle->order, pCfg->update, NULL, TD_VER_MAX); if (row == NULL) { return false; } @@ -1250,7 +1295,7 @@ static int32_t handleDataMergeIfNeeded(STsdbReadHandle* pTsdbReadHandle, SBlock* /*bool hasData = */ initTableMemIterator(pTsdbReadHandle, pCheckInfo); assert(cur->pos >= 0 && cur->pos <= binfo.rows); - key = extractFirstTraverseKey(pCheckInfo, pTsdbReadHandle->order, pCfg->update); + key = extractFirstTraverseKey(pCheckInfo, pTsdbReadHandle->order, pCfg->update, TD_VER_MAX); if (key != TSKEY_INITIAL_VAL) { tsdbDebug("%p key in mem:%" PRId64 ", %s", pTsdbReadHandle, key, pTsdbReadHandle->idStr); @@ -1497,10 +1542,10 @@ static int32_t doCopyRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, int32_t TASSERT(0); } - if (sVal.valType == TD_VTYPE_NULL) { - colDataAppendNULL(pColInfo, rowIndex); - } else { + if (sVal.valType == TD_VTYPE_NORM) { colDataAppend(pColInfo, rowIndex, sVal.val, false); + } else { + colDataAppendNULL(pColInfo, rowIndex); } } } else { // handle the var-string @@ -1513,10 +1558,10 @@ static int32_t doCopyRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, int32_t TASSERT(0); } - if (sVal.valType == TD_VTYPE_NULL) { - colDataAppendNULL(pColInfo, rowIndex); - } else { + if (sVal.valType == TD_VTYPE_NORM) { colDataAppend(pColInfo, rowIndex, sVal.val, false); + } else { + colDataAppendNULL(pColInfo, rowIndex); } } } @@ -1541,11 +1586,26 @@ static int32_t doCopyRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, int32_t return numOfRows + num; } -// TODO fix bug for reverse copy data problem -// Note: row1 always has high priority -static void mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capacity, int32_t numOfRows, STSRow* row1, - STSRow* row2, int32_t numOfCols, uint64_t uid, STSchema* pSchema1, STSchema* pSchema2, - bool forceSetNull) { +/** + * @brief // TODO fix bug for reverse copy data problem + * Note: row1 always has high priority + * + * @param pTsdbReadHandle + * @param capacity + * @param curRow + * @param row1 + * @param row2 + * @param numOfCols + * @param uid + * @param pSchema1 + * @param pSchema2 + * @param update + * @param lastRowKey + * @return int32_t The quantity of rows appended + */ +static int32_t mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capacity, int32_t* curRow, STSRow* row1, + STSRow* row2, int32_t numOfCols, uint64_t uid, STSchema* pSchema1, STSchema* pSchema2, + bool update, TSKEY* lastRowKey) { #if 1 STSchema* pSchema; STSRow* row; @@ -1557,12 +1617,17 @@ static void mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capacit bool isChosenRowDataRow; int32_t chosen_itr; SCellVal sVal = {0}; + TSKEY rowKey = TSKEY_INITIAL_VAL; + int32_t nResult = 0; + bool isMerge = true; // the schema version info is embeded in STSRow int32_t numOfColsOfRow1 = 0; if (pSchema1 == NULL) { - pSchema1 = metaGetTbTSchema(REPO_META(pTsdbReadHandle->pTsdb), uid, TD_ROW_SVER(row1)); + // pSchema1 = metaGetTbTSchema(REPO_META(pTsdbReadHandle->pTsdb), uid, TD_ROW_SVER(row1)); + // TODO: use the real schemaVersion + pSchema1 = metaGetTbTSchema(REPO_META(pTsdbReadHandle->pTsdb), uid, 0); } #ifdef TD_DEBUG_PRINT_ROW @@ -1579,7 +1644,9 @@ static void mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capacit if (row2) { isRow2DataRow = TD_IS_TP_ROW(row2); if (pSchema2 == NULL) { - pSchema2 = metaGetTbTSchema(REPO_META(pTsdbReadHandle->pTsdb), uid, TD_ROW_SVER(row2)); + // pSchema2 = metaGetTbTSchema(REPO_META(pTsdbReadHandle->pTsdb), uid, TD_ROW_SVER(row2)); + // TODO: use the real schemaVersion + pSchema2 = metaGetTbTSchema(REPO_META(pTsdbReadHandle->pTsdb), uid, 0); } if (isRow2DataRow) { numOfColsOfRow2 = schemaNCols(pSchema2); @@ -1610,19 +1677,19 @@ static void mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capacit colIdOfRow2 = tdKvRowColIdAt(row2, k); } - if (colIdOfRow1 == colIdOfRow2) { + if (colIdOfRow1 < colIdOfRow2) { // the most probability if (colIdOfRow1 < pColInfo->info.colId) { - j++; - k++; + ++j; continue; } row = row1; pSchema = pSchema1; isChosenRowDataRow = isRow1DataRow; chosen_itr = j; - } else if (colIdOfRow1 < colIdOfRow2) { + } else if (colIdOfRow1 == colIdOfRow2) { if (colIdOfRow1 < pColInfo->info.colId) { - j++; + ++j; + ++k; continue; } row = row1; @@ -1631,7 +1698,7 @@ static void mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capacit chosen_itr = j; } else { if (colIdOfRow2 < pColInfo->info.colId) { - k++; + ++k; continue; } row = row2; @@ -1639,16 +1706,37 @@ static void mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capacit chosen_itr = k; isChosenRowDataRow = isRow2DataRow; } + if (isChosenRowDataRow) { colId = pSchema->columns[chosen_itr].colId; offset = pSchema->columns[chosen_itr].offset; // TODO: use STSRowIter tdSTpRowGetVal(row, colId, pSchema->columns[chosen_itr].type, pSchema->flen, offset, chosen_itr - 1, &sVal); + if (colId == PRIMARYKEY_TIMESTAMP_COL_ID) { + rowKey = *(TSKEY*)sVal.val; + if (rowKey != *lastRowKey) { + isMerge = false; + if (*lastRowKey != TSKEY_INITIAL_VAL) { + ++(*curRow); + } + ++nResult; + } + *lastRowKey = rowKey; + } } else { // TODO: use STSRowIter if (chosen_itr == 0) { colId = PRIMARYKEY_TIMESTAMP_COL_ID; tdSKvRowGetVal(row, PRIMARYKEY_TIMESTAMP_COL_ID, -1, -1, &sVal); + rowKey = *(TSKEY*)sVal.val; + if (rowKey != *lastRowKey) { + isMerge = false; + if (*lastRowKey != TSKEY_INITIAL_VAL) { + ++(*curRow); + } + ++nResult; + } + *lastRowKey = rowKey; } else { SKvRowIdx* pColIdx = tdKvRowColIdxAt(row, chosen_itr - 1); colId = pColIdx->colId; @@ -1657,35 +1745,46 @@ static void mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capacit } } + ASSERT(rowKey != TSKEY_INITIAL_VAL); + if (colId == pColInfo->info.colId) { if (tdValTypeIsNorm(sVal.valType)) { - colDataAppend(pColInfo, numOfRows, sVal.val, false); - } else if (forceSetNull) { - colDataAppend(pColInfo, numOfRows, NULL, true); + colDataAppend(pColInfo, *curRow, sVal.val, false); + } else if (tdValTypeIsNull(sVal.valType)) { + colDataAppend(pColInfo, *curRow, NULL, true); + } else if (tdValTypeIsNone(sVal.valType)) { + // TODO: Set null if nothing append for this row + if (!isMerge) { + colDataAppend(pColInfo, *curRow, NULL, true); + } + } else { + ASSERT(0); } - i++; + ++i; if (row == row1) { - j++; + ++j; } else { - k++; + ++k; } } else { - if (forceSetNull) { - colDataAppend(pColInfo, numOfRows, NULL, true); + if (!isMerge) { + colDataAppend(pColInfo, *curRow, NULL, true); } - i++; + ++i; } } - if (forceSetNull) { + if (*lastRowKey != rowKey) { while (i < numOfCols) { // the remain columns are all null data SColumnInfoData* pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, i); - colDataAppend(pColInfo, numOfRows, NULL, true); - i++; + colDataAppend(pColInfo, *curRow, NULL, true); + ++i; } } + + return nResult; #endif } @@ -1859,6 +1958,7 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf // compared with the data from in-memory buffer, to generate the correct timestamp array list int32_t numOfRows = 0; + int32_t curRow = 0; int16_t rv1 = -1; int16_t rv2 = -1; @@ -1874,9 +1974,11 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf return; } else if (pCheckInfo->iter != NULL || pCheckInfo->iiter != NULL) { SSkipListNode* node = NULL; + TSKEY lastRowKey = TSKEY_INITIAL_VAL; + do { STSRow* row2 = NULL; - STSRow* row1 = getSRowInTableMem(pCheckInfo, pTsdbReadHandle->order, pCfg->update, &row2); + STSRow* row1 = getSRowInTableMem(pCheckInfo, pTsdbReadHandle->order, pCfg->update, &row2, TD_VER_MAX); if (row1 == NULL) { break; } @@ -1905,9 +2007,9 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf rv2 = TD_ROW_SVER(row2); } - mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, numOfRows, row1, row2, numOfCols, - pCheckInfo->tableId, pSchema1, pSchema2, true); - numOfRows += 1; + numOfRows += mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, &curRow, row1, row2, numOfCols, + pCheckInfo->tableId, pSchema1, pSchema2, true, &lastRowKey); + // numOfRows += 1; if (cur->win.skey == TSKEY_INITIAL_VAL) { cur->win.skey = key; } @@ -1918,6 +2020,7 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf moveToNextRowInMem(pCheckInfo); } else if (key == tsArray[pos]) { // data in buffer has the same timestamp of data in file block, ignore it +#if 0 if (pCfg->update) { if (pCfg->update == TD_ROW_PARTIAL_UPDATE) { doCopyRowsFromFileBlock(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, numOfRows, pos, pos); @@ -1933,7 +2036,7 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf bool forceSetNull = pCfg->update != TD_ROW_PARTIAL_UPDATE; mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, numOfRows, row1, row2, numOfCols, - pCheckInfo->tableId, pSchema1, pSchema2, forceSetNull); + pCheckInfo->tableId, pSchema1, pSchema2, forceSetNull, &lastRowKey); numOfRows += 1; if (cur->win.skey == TSKEY_INITIAL_VAL) { cur->win.skey = key; @@ -1943,6 +2046,35 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf cur->lastKey = key + step; cur->mixBlock = true; + moveToNextRowInMem(pCheckInfo); + pos += step; + } else { + moveToNextRowInMem(pCheckInfo); + } +#endif + if (TD_SUPPORT_UPDATE(pCfg->update)) { + doCopyRowsFromFileBlock(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, curRow, pos, pos); + + if (rv1 != TD_ROW_SVER(row1)) { + // pSchema1 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row1)); + rv1 = TD_ROW_SVER(row1); + } + if (row2 && rv2 != TD_ROW_SVER(row2)) { + // pSchema2 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row2)); + rv2 = TD_ROW_SVER(row2); + } + + numOfRows += mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, &curRow, row1, row2, numOfCols, + pCheckInfo->tableId, pSchema1, pSchema2, true, &lastRowKey); + // ++numOfRows; + if (cur->win.skey == TSKEY_INITIAL_VAL) { + cur->win.skey = key; + } + + cur->win.ekey = key; + cur->lastKey = key + step; + cur->mixBlock = true; + moveToNextRowInMem(pCheckInfo); pos += step; } else { @@ -1958,11 +2090,18 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf assert(end != -1); if (tsArray[end] == key) { // the value of key in cache equals to the end timestamp value, ignore it +#if 0 if (pCfg->update == TD_ROW_DISCARD_UPDATE) { moveToNextRowInMem(pCheckInfo); } else { end -= step; } +#endif + if (!TD_SUPPORT_UPDATE(pCfg->update)) { + moveToNextRowInMem(pCheckInfo); + } else { + end -= step; + } } int32_t qstart = 0, qend = 0; @@ -2572,6 +2711,7 @@ static UNUSED_FUNC void changeQueryHandleForInterpQuery(tsdbReaderT pHandle) { static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int maxRowsToRead, STimeWindow* win, STsdbReadHandle* pTsdbReadHandle) { int numOfRows = 0; + int curRows = 0; int32_t numOfCols = (int32_t)taosArrayGetSize(pTsdbReadHandle->pColumns); STsdbCfg* pCfg = REPO_CFG(pTsdbReadHandle->pTsdb); win->skey = TSKEY_INITIAL_VAL; @@ -2579,9 +2719,11 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int int64_t st = taosGetTimestampUs(); int16_t rv = -1; STSchema* pSchema = NULL; + TSKEY lastRowKey = TSKEY_INITIAL_VAL; + do { - STSRow* row = getSRowInTableMem(pCheckInfo, pTsdbReadHandle->order, pCfg->update, NULL); + STSRow* row = getSRowInTableMem(pCheckInfo, pTsdbReadHandle->order, pCfg->update, NULL, TD_VER_MAX); if (row == NULL) { break; } @@ -2604,16 +2746,18 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int pSchema = metaGetTbTSchema(REPO_META(pTsdbReadHandle->pTsdb), pCheckInfo->tableId, 0); rv = TD_ROW_SVER(row); } - mergeTwoRowFromMem(pTsdbReadHandle, maxRowsToRead, numOfRows, row, NULL, numOfCols, pCheckInfo->tableId, pSchema, - NULL, true); + numOfRows += mergeTwoRowFromMem(pTsdbReadHandle, maxRowsToRead, &curRows, row, NULL, numOfCols, pCheckInfo->tableId, pSchema, + NULL, true, &lastRowKey); - if (++numOfRows >= maxRowsToRead) { + if (numOfRows >= maxRowsToRead) { moveToNextRowInMem(pCheckInfo); break; } } while (moveToNextRowInMem(pCheckInfo)); + taosMemoryFreeClear(pSchema); // free the STSChema + assert(numOfRows <= maxRowsToRead); // if the buffer is not full in case of descending order query, move the data in the front of the buffer @@ -2731,6 +2875,8 @@ static bool loadCachedLastRow(STsdbReadHandle* pTsdbReadHandle) { STSRow* pRow = NULL; TSKEY key = TSKEY_INITIAL_VAL; int32_t step = ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? 1 : -1; + TSKEY lastRowKey = TSKEY_INITIAL_VAL; + int32_t curRow = 0; if (++pTsdbReadHandle->activeIndex < numOfTables) { STableCheckInfo* pCheckInfo = taosArrayGet(pTsdbReadHandle->pTableCheckInfo, pTsdbReadHandle->activeIndex); @@ -2738,8 +2884,8 @@ static bool loadCachedLastRow(STsdbReadHandle* pTsdbReadHandle) { // if (ret != TSDB_CODE_SUCCESS) { // return false; // } - mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, 0, pRow, NULL, numOfCols, pCheckInfo->tableId, - NULL, NULL, true); + mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, &curRow, pRow, NULL, numOfCols, pCheckInfo->tableId, + NULL, NULL, true, &lastRowKey); taosMemoryFreeClear(pRow); // update the last key value diff --git a/source/dnode/vnode/src/tsdb/tsdbReadImpl.c b/source/dnode/vnode/src/tsdb/tsdbReadImpl.c index d7c9a70c183786cce37129cb714d36186eacdf29..49d02adc3ae0520d140dcb4b033c46c725b348d2 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReadImpl.c +++ b/source/dnode/vnode/src/tsdb/tsdbReadImpl.c @@ -246,6 +246,12 @@ int tsdbLoadBlockInfo(SReadH *pReadh, void *pTarget) { return 0; } +static FORCE_INLINE void tsdbSwapDataCols(SDataCols *pDest, SDataCols *pSrc) { + SDataCol *pCols = pDest->cols; + memcpy(pDest, pSrc, sizeof(SDataCols)); + pSrc->cols = pCols; +} + int tsdbLoadBlockData(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo) { ASSERT(pBlock->numOfSubBlocks > 0); STsdbCfg *pCfg = REPO_CFG(pReadh->pRepo); @@ -266,17 +272,29 @@ int tsdbLoadBlockData(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo) { if (tsdbLoadBlockDataImpl(pReadh, iBlock, pReadh->pDCols[1]) < 0) return -1; // TODO: use the real maxVersion to replace the UINT64_MAX to support Multi-Version if (tdMergeDataCols(pReadh->pDCols[0], pReadh->pDCols[1], pReadh->pDCols[1]->numOfRows, NULL, - update != TD_ROW_PARTIAL_UPDATE, UINT64_MAX) < 0) + TD_SUPPORT_UPDATE(update), TD_VER_MAX) < 0) + return -1; + } + // if ((pBlock->numOfSubBlocks == 1) && (iBlock->hasDupKey)) { // TODO: use this line + if (pBlock->numOfSubBlocks == 1) { + tdResetDataCols(pReadh->pDCols[1]); + pReadh->pDCols[1]->bitmapMode = pReadh->pDCols[0]->bitmapMode; + if (tdMergeDataCols(pReadh->pDCols[1], pReadh->pDCols[0], pReadh->pDCols[0]->numOfRows, NULL, + TD_SUPPORT_UPDATE(update), TD_VER_MAX) < 0) { return -1; + } + tsdbSwapDataCols(pReadh->pDCols[0], pReadh->pDCols[1]); + ASSERT(pReadh->pDCols[0]->bitmapMode != 0); } - ASSERT(pReadh->pDCols[0]->numOfRows == pBlock->numOfRows); + ASSERT(pReadh->pDCols[0]->numOfRows <= pBlock->numOfRows); ASSERT(dataColsKeyFirst(pReadh->pDCols[0]) == pBlock->keyFirst); ASSERT(dataColsKeyLast(pReadh->pDCols[0]) == pBlock->keyLast); return 0; } +// TODO: filter by Multi-Version int tsdbLoadBlockDataCols(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo, const int16_t *colIds, int numOfColsIds, bool mergeBitmap) { ASSERT(pBlock->numOfSubBlocks > 0); @@ -297,9 +315,21 @@ int tsdbLoadBlockDataCols(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo, if (tsdbLoadBlockDataColsImpl(pReadh, iBlock, pReadh->pDCols[1], colIds, numOfColsIds) < 0) return -1; // TODO: use the real maxVersion to replace the UINT64_MAX to support Multi-Version if (tdMergeDataCols(pReadh->pDCols[0], pReadh->pDCols[1], pReadh->pDCols[1]->numOfRows, NULL, - update != TD_ROW_PARTIAL_UPDATE, UINT64_MAX) < 0) + TD_SUPPORT_UPDATE(update), TD_VER_MAX) < 0) return -1; } + // if ((pBlock->numOfSubBlocks == 1) && (iBlock->hasDupKey)) { // TODO: use this line + if (pBlock->numOfSubBlocks == 1) { + tdResetDataCols(pReadh->pDCols[1]); + pReadh->pDCols[1]->bitmapMode = pReadh->pDCols[0]->bitmapMode; + if (tdMergeDataCols(pReadh->pDCols[1], pReadh->pDCols[0], pReadh->pDCols[0]->numOfRows, NULL, + TD_SUPPORT_UPDATE(update), TD_VER_MAX) < 0) { + return -1; + } + tsdbSwapDataCols(pReadh->pDCols[0], pReadh->pDCols[1]); + ASSERT(pReadh->pDCols[0]->bitmapMode != 0); + } + if (mergeBitmap && !tdDataColsIsBitmapI(pReadh->pDCols[0])) { for (int i = 0; i < numOfColsIds; ++i) { @@ -312,7 +342,7 @@ int tsdbLoadBlockDataCols(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo, } } - ASSERT(pReadh->pDCols[0]->numOfRows == pBlock->numOfRows); + ASSERT(pReadh->pDCols[0]->numOfRows <= pBlock->numOfRows); ASSERT(dataColsKeyFirst(pReadh->pDCols[0]) == pBlock->keyFirst); ASSERT(dataColsKeyLast(pReadh->pDCols[0]) == pBlock->keyLast); @@ -623,15 +653,15 @@ static int tsdbLoadBlockDataImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *pDat } if (dcol != 0) { - ccol++; + ++ccol; } - dcol++; + ++dcol; } else if (tcolId < pDataCol->colId) { - ccol++; + ++ccol; } else { // Set current column as NULL and forward dataColReset(pDataCol); - dcol++; + ++dcol; } } diff --git a/source/dnode/vnode/src/tsdb/tsdbSma.c b/source/dnode/vnode/src/tsdb/tsdbSma.c index 480b1319146c7ff6f69608374f9092a6477b8605..105bc330d399608cf152c54ede84e32491386201 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSma.c +++ b/source/dnode/vnode/src/tsdb/tsdbSma.c @@ -1635,17 +1635,20 @@ int32_t tsdbCreateTSma(STsdb *pTsdb, char *pMsg) { SSmaCfg vCreateSmaReq = {0}; if (!tDeserializeSVCreateTSmaReq(pMsg, &vCreateSmaReq)) { terrno = TSDB_CODE_OUT_OF_MEMORY; - tsdbWarn("vgId:%d TDMT_VND_CREATE_SMA received but deserialize failed since %s", REPO_ID(pTsdb), terrstr(terrno)); + tsdbWarn("vgId:%d tsma create msg received but deserialize failed since %s", REPO_ID(pTsdb), terrstr(terrno)); return -1; } - tsdbDebug("vgId:%d TDMT_VND_CREATE_SMA msg received for %s:%" PRIi64, REPO_ID(pTsdb), vCreateSmaReq.tSma.indexName, - vCreateSmaReq.tSma.indexUid); + + tsdbDebug("vgId:%d tsma create msg %s:%" PRIi64 " for table %" PRIi64 " received", REPO_ID(pTsdb), + vCreateSmaReq.tSma.indexName, vCreateSmaReq.tSma.indexUid, vCreateSmaReq.tSma.tableUid); // record current timezone of server side vCreateSmaReq.tSma.timezoneInt = tsTimezone; if (metaCreateTSma(REPO_META(pTsdb), &vCreateSmaReq) < 0) { // TODO: handle error + tsdbWarn("vgId:%d tsma %s:%" PRIi64 " create failed for table %" PRIi64 " since %s", REPO_ID(pTsdb), + vCreateSmaReq.tSma.indexName, vCreateSmaReq.tSma.indexUid, vCreateSmaReq.tSma.tableUid, terrstr(terrno)); tdDestroyTSma(&vCreateSmaReq.tSma); return -1; } diff --git a/tests/script/tsim/query/scalarFunction.sim b/tests/script/tsim/query/scalarFunction.sim index 80d2dafa84bf767d7caa8e4a4963d42f0a39f8ed..2946a89ff654f9744b4cfe0f89240b542623ea94 100644 --- a/tests/script/tsim/query/scalarFunction.sim +++ b/tests/script/tsim/query/scalarFunction.sim @@ -102,20 +102,26 @@ print ====> $data60 $data61 $data62 $data63 $data64 $data65 print ====> $data70 $data71 $data72 $data73 $data74 $data75 print ====> $data80 $data81 $data82 $data83 $data84 $data85 print ====> $data90 $data91 $data92 $data93 $data94 $data95 +print ====> rows = $rows and rowNum = $rowNum for ct1 if $rows != $rowNum then return -1 endi + print ====> select c1, abs(c1), c2, abs(c2), c3, abs(c3) from stb sql select c1, abs(c1), c2, abs(c2), c3, abs(c3) from stb +print ====> rows = $rows and totalRows = $totalRows for stb if $rows != $totalRows then return -1 endi + print ====> select c1, abs(c1), c2, abs(c2), c3, abs(c3) from ntb sql select c1, abs(c1), c2, abs(c2), c3, abs(c3) from ntb +print ====> rows = $rows and rowNum = $rowNum for ntb if $rows != $rowNum then return -1 endi + print ====> log sql select c1, log(c1, 10), c2, log(c2, 10), c3, log(c3, 10) from ct1 print ====> select c1, log(c1, 10), c2, log(c2, 10), c3, log(c3, 10) from ct1