提交 b279db6c 编写于 作者: S Shengliang Guan

Merge remote-tracking branch 'origin/3.0' into fix/TD-20631

...@@ -361,7 +361,7 @@ pipeline { ...@@ -361,7 +361,7 @@ pipeline {
} }
parallel { parallel {
stage('check docs') { stage('check docs') {
agent{label " worker03 || slave215 || slave217 || slave219 || Mac_catalina "} agent{label " slave1_47 || slave1_48 || slave1_49 || slave1_52 || worker03 || slave215 || slave217 || slave219 || Mac_catalina "}
steps { steps {
check_docs() check_docs()
} }
...@@ -407,7 +407,7 @@ pipeline { ...@@ -407,7 +407,7 @@ pipeline {
} }
} }
stage('linux test') { stage('linux test') {
agent{label " worker03 || slave215 || slave217 || slave219 "} agent{label " slave1_47 || slave1_48 || slave1_49 || slave1_52 || worker03 || slave215 || slave217 || slave219 "}
options { skipDefaultCheckout() } options { skipDefaultCheckout() }
when { when {
changeRequest() changeRequest()
......
...@@ -33,8 +33,8 @@ typedef struct STColumn STColumn; ...@@ -33,8 +33,8 @@ typedef struct STColumn STColumn;
typedef struct STSchema STSchema; typedef struct STSchema STSchema;
typedef struct SValue SValue; typedef struct SValue SValue;
typedef struct SColVal SColVal; typedef struct SColVal SColVal;
typedef struct STSRow2 STSRow2; typedef struct SRow SRow;
typedef struct STSRowBuilder STSRowBuilder; typedef struct SRowIter SRowIter;
typedef struct STagVal STagVal; typedef struct STagVal STagVal;
typedef struct STag STag; typedef struct STag STag;
typedef struct SColData SColData; typedef struct SColData SColData;
...@@ -68,13 +68,10 @@ struct SBuffer { ...@@ -68,13 +68,10 @@ struct SBuffer {
void tBufferDestroy(SBuffer *pBuffer); void tBufferDestroy(SBuffer *pBuffer);
int32_t tBufferInit(SBuffer *pBuffer, int64_t size); int32_t tBufferInit(SBuffer *pBuffer, int64_t size);
int32_t tBufferPut(SBuffer *pBuffer, const void *pData, int64_t nData); int32_t tBufferPut(SBuffer *pBuffer, const void *pData, int64_t nData);
int32_t tBufferReserve(SBuffer *pBuffer, int64_t nData, void **ppData);
// STSchema ================================ // STSchema ================================
int32_t tTSchemaCreate(int32_t sver, SSchema *pSchema, int32_t nCols, STSchema **ppTSchema); void tDestroyTSchema(STSchema *pTSchema);
void tTSchemaDestroy(STSchema *pTSchema);
// SValue ================================
static FORCE_INLINE int32_t tGetValue(uint8_t *p, SValue *pValue, int8_t type);
// SColVal ================================ // SColVal ================================
#define CV_FLAG_VALUE ((int8_t)0x0) #define CV_FLAG_VALUE ((int8_t)0x0)
...@@ -89,26 +86,14 @@ static FORCE_INLINE int32_t tGetValue(uint8_t *p, SValue *pValue, int8_t type); ...@@ -89,26 +86,14 @@ static FORCE_INLINE int32_t tGetValue(uint8_t *p, SValue *pValue, int8_t type);
#define COL_VAL_IS_NULL(CV) ((CV)->flag == CV_FLAG_NULL) #define COL_VAL_IS_NULL(CV) ((CV)->flag == CV_FLAG_NULL)
#define COL_VAL_IS_VALUE(CV) ((CV)->flag == CV_FLAG_VALUE) #define COL_VAL_IS_VALUE(CV) ((CV)->flag == CV_FLAG_VALUE)
// STSRow2 ================================ // SRow ================================
#define TSROW_LEN(PROW, V) tGetI32v((uint8_t *)(PROW)->data, (V) ? &(V) : NULL) int32_t tRowBuild(SArray *aColVal, STSchema *pTSchema, SBuffer *pBuffer);
#define TSROW_SVER(PROW, V) tGetI32v((PROW)->data + TSROW_LEN(PROW, NULL), (V) ? &(V) : NULL) void tRowGet(SRow *pRow, STSchema *pTSchema, int32_t iCol, SColVal *pColVal);
int32_t tTSRowNew(STSRowBuilder *pBuilder, SArray *pArray, STSchema *pTSchema, STSRow2 **ppRow); // SRowIter ================================
int32_t tTSRowClone(const STSRow2 *pRow, STSRow2 **ppRow); int32_t tRowIterOpen(SRow *pRow, STSchema *pTSchema, SRowIter **ppIter);
void tTSRowFree(STSRow2 *pRow); void tRowIterClose(SRowIter **ppIter);
void tTSRowGet(STSRow2 *pRow, STSchema *pTSchema, int32_t iCol, SColVal *pColVal); SColVal *tRowIterNext(SRowIter *pIter);
int32_t tTSRowToArray(STSRow2 *pRow, STSchema *pTSchema, SArray **ppArray);
int32_t tPutTSRow(uint8_t *p, STSRow2 *pRow);
int32_t tGetTSRow(uint8_t *p, STSRow2 **ppRow);
// STSRowBuilder ================================
#define tsRowBuilderInit() ((STSRowBuilder){0})
#define tsRowBuilderClear(B) \
do { \
if ((B)->pBuf) { \
taosMemoryFree((B)->pBuf); \
} \
} while (0)
// STag ================================ // STag ================================
int32_t tTagNew(SArray *pArray, int32_t version, int8_t isJson, STag **ppTag); int32_t tTagNew(SArray *pArray, int32_t version, int8_t isJson, STag **ppTag);
...@@ -147,30 +132,18 @@ struct STSchema { ...@@ -147,30 +132,18 @@ struct STSchema {
int32_t numOfCols; int32_t numOfCols;
int32_t version; int32_t version;
int32_t flen; int32_t flen;
int32_t vlen;
int32_t tlen; int32_t tlen;
STColumn columns[]; STColumn columns[];
}; };
#define TSROW_HAS_NONE ((uint8_t)0x1) struct SRow {
#define TSROW_HAS_NULL ((uint8_t)0x2U) uint8_t flag;
#define TSROW_HAS_VAL ((uint8_t)0x4U) uint8_t rsv;
#define TSROW_KV_SMALL ((uint8_t)0x10U) uint16_t sver;
#define TSROW_KV_MID ((uint8_t)0x20U) uint32_t len;
#define TSROW_KV_BIG ((uint8_t)0x40U)
#pragma pack(push, 1)
struct STSRow2 {
TSKEY ts; TSKEY ts;
uint8_t flags;
uint8_t data[]; uint8_t data[];
}; };
#pragma pack(pop)
struct STSRowBuilder {
// STSRow2 tsRow;
int32_t szBuf;
uint8_t *pBuf;
};
struct SValue { struct SValue {
union { union {
...@@ -258,37 +231,17 @@ typedef struct { ...@@ -258,37 +231,17 @@ typedef struct {
int32_t nCols; int32_t nCols;
schema_ver_t version; schema_ver_t version;
uint16_t flen; uint16_t flen;
int32_t vlen;
int32_t tlen; int32_t tlen;
STColumn *columns; STColumn *columns;
} STSchemaBuilder; } STSchemaBuilder;
// use 2 bits for bitmap(default: STSRow/sub block)
#define TD_VTYPE_BITS 2
#define TD_VTYPE_PARTS 4 // PARTITIONS: 1 byte / 2 bits
#define TD_VTYPE_OPTR 3 // OPERATOR: 4 - 1, utilize to get remainder
#define TD_BITMAP_BYTES(cnt) (((cnt) + TD_VTYPE_OPTR) >> 2)
// use 1 bit for bitmap(super block)
#define TD_VTYPE_BITS_I 1
#define TD_VTYPE_PARTS_I 8 // PARTITIONS: 1 byte / 1 bit
#define TD_VTYPE_OPTR_I 7 // OPERATOR: 8 - 1, utilize to get remainder
#define TD_BITMAP_BYTES_I(cnt) (((cnt) + TD_VTYPE_OPTR_I) >> 3)
int32_t tdInitTSchemaBuilder(STSchemaBuilder *pBuilder, schema_ver_t version); int32_t tdInitTSchemaBuilder(STSchemaBuilder *pBuilder, schema_ver_t version);
void tdDestroyTSchemaBuilder(STSchemaBuilder *pBuilder); void tdDestroyTSchemaBuilder(STSchemaBuilder *pBuilder);
void tdResetTSchemaBuilder(STSchemaBuilder *pBuilder, schema_ver_t version); void tdResetTSchemaBuilder(STSchemaBuilder *pBuilder, schema_ver_t version);
int32_t tdAddColToSchema(STSchemaBuilder *pBuilder, int8_t type, int8_t flags, col_id_t colId, col_bytes_t bytes); int32_t tdAddColToSchema(STSchemaBuilder *pBuilder, int8_t type, int8_t flags, col_id_t colId, col_bytes_t bytes);
STSchema *tdGetSchemaFromBuilder(STSchemaBuilder *pBuilder); STSchema *tdGetSchemaFromBuilder(STSchemaBuilder *pBuilder);
static FORCE_INLINE int32_t tGetValue(uint8_t *p, SValue *pValue, int8_t type) { STSchema *tBuildTSchema(SSchema *aSchema, int32_t numOfCols, int32_t version);
if (IS_VAR_DATA_TYPE(type)) {
return tGetBinary(p, &pValue->pData, pValue ? &pValue->nData : NULL);
} else {
memcpy(&pValue->val, p, tDataTypes[type].bytes);
return tDataTypes[type].bytes;
}
}
#endif #endif
......
...@@ -55,6 +55,14 @@ typedef struct STSRow { ...@@ -55,6 +55,14 @@ typedef struct STSRow {
#define TD_ROW_TP 0x0U // default #define TD_ROW_TP 0x0U // default
#define TD_ROW_KV 0x01U #define TD_ROW_KV 0x01U
#define TD_VTYPE_PARTS 4 // PARTITIONS: 1 byte / 2 bits
#define TD_VTYPE_OPTR 3 // OPERATOR: 4 - 1, utilize to get remainder
#define TD_BITMAP_BYTES(cnt) (((cnt) + TD_VTYPE_OPTR) >> 2)
#define TD_VTYPE_PARTS_I 8 // PARTITIONS: 1 byte / 1 bit
#define TD_VTYPE_OPTR_I 7 // OPERATOR: 8 - 1, utilize to get remainder
#define TD_BITMAP_BYTES_I(cnt) (((cnt) + TD_VTYPE_OPTR_I) >> 3)
/** /**
* @brief value type * @brief value type
* - for data from client input and STSRow in memory, 3 types of value none/null/norm available * - for data from client input and STSRow in memory, 3 types of value none/null/norm available
...@@ -244,7 +252,7 @@ int32_t tdGetBitmapValTypeI(const void *pBitmap, int16_t colIdx, TDRowValT *pVal ...@@ -244,7 +252,7 @@ int32_t tdGetBitmapValTypeI(const void *pBitmap, int16_t colIdx, TDRowValT *pVal
*/ */
static FORCE_INLINE void *tdGetBitmapAddrTp(STSRow *pRow, uint32_t flen) { static FORCE_INLINE void *tdGetBitmapAddrTp(STSRow *pRow, uint32_t flen) {
// The primary TS key is stored separatedly. // The primary TS key is stored separatedly.
return POINTER_SHIFT(TD_ROW_DATA(pRow), flen - sizeof(TSKEY)); return POINTER_SHIFT(TD_ROW_DATA(pRow), flen);
// return POINTER_SHIFT(pRow->ts, flen); // return POINTER_SHIFT(pRow->ts, flen);
} }
......
...@@ -41,6 +41,7 @@ extern "C" { ...@@ -41,6 +41,7 @@ extern "C" {
#define SNAPSHOT_WAIT_MS 1000 * 30 #define SNAPSHOT_WAIT_MS 1000 * 30
#define SYNC_APPEND_ENTRIES_TIMEOUT_MS 10000 #define SYNC_APPEND_ENTRIES_TIMEOUT_MS 10000
#define SYNC_HEART_TIMEOUT_MS 1000 * 8
#define SYNC_MAX_BATCH_SIZE 1 #define SYNC_MAX_BATCH_SIZE 1
#define SYNC_INDEX_BEGIN 0 #define SYNC_INDEX_BEGIN 0
......
...@@ -1275,6 +1275,7 @@ int taos_write_raw_block(TAOS* taos, int rows, char* pData, const char* tbname) ...@@ -1275,6 +1275,7 @@ int taos_write_raw_block(TAOS* taos, int rows, char* pData, const char* tbname)
nVar++; nVar++;
} }
} }
fLen -= sizeof(TSKEY);
int32_t extendedRowSize = rowSize + TD_ROW_HEAD_LEN - sizeof(TSKEY) + nVar * sizeof(VarDataOffsetT) + int32_t extendedRowSize = rowSize + TD_ROW_HEAD_LEN - sizeof(TSKEY) + nVar * sizeof(VarDataOffsetT) +
(int32_t)TD_BITMAP_BYTES(numOfCols - 1); (int32_t)TD_BITMAP_BYTES(numOfCols - 1);
...@@ -1333,8 +1334,10 @@ int taos_write_raw_block(TAOS* taos, int rows, char* pData, const char* tbname) ...@@ -1333,8 +1334,10 @@ int taos_write_raw_block(TAOS* taos, int rows, char* pData, const char* tbname)
} }
} }
if (pColumn->colId != PRIMARYKEY_TIMESTAMP_COL_ID) {
offset += TYPE_BYTES[pColumn->type]; offset += TYPE_BYTES[pColumn->type];
} }
}
tdSRowEnd(&rb); tdSRowEnd(&rb);
int32_t rowLen = TD_ROW_LEN(rowData); int32_t rowLen = TD_ROW_LEN(rowData);
rowData = POINTER_SHIFT(rowData, rowLen); rowData = POINTER_SHIFT(rowData, rowLen);
...@@ -1503,6 +1506,7 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) { ...@@ -1503,6 +1506,7 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) {
nVar++; nVar++;
} }
} }
fLen -= sizeof(TSKEY);
int32_t rows = rspObj.resInfo.numOfRows; int32_t rows = rspObj.resInfo.numOfRows;
int32_t extendedRowSize = rowSize + TD_ROW_HEAD_LEN - sizeof(TSKEY) + nVar * sizeof(VarDataOffsetT) + int32_t extendedRowSize = rowSize + TD_ROW_HEAD_LEN - sizeof(TSKEY) + nVar * sizeof(VarDataOffsetT) +
...@@ -1585,9 +1589,10 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) { ...@@ -1585,9 +1589,10 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) {
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NORM, colData, true, offset, k); tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NORM, colData, true, offset, k);
} }
} }
if (pColumn->colId != PRIMARYKEY_TIMESTAMP_COL_ID) {
offset += TYPE_BYTES[pColumn->type]; offset += TYPE_BYTES[pColumn->type];
} }
}
tdSRowEnd(&rb); tdSRowEnd(&rb);
int32_t rowLen = TD_ROW_LEN(rowData); int32_t rowLen = TD_ROW_LEN(rowData);
rowData = POINTER_SHIFT(rowData, rowLen); rowData = POINTER_SHIFT(rowData, rowLen);
...@@ -1803,6 +1808,7 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen) ...@@ -1803,6 +1808,7 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
nVar++; nVar++;
} }
} }
fLen -= sizeof(TSKEY);
int32_t rows = rspObj.resInfo.numOfRows; int32_t rows = rspObj.resInfo.numOfRows;
int32_t extendedRowSize = rowSize + TD_ROW_HEAD_LEN - sizeof(TSKEY) + nVar * sizeof(VarDataOffsetT) + int32_t extendedRowSize = rowSize + TD_ROW_HEAD_LEN - sizeof(TSKEY) + nVar * sizeof(VarDataOffsetT) +
...@@ -1888,9 +1894,10 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen) ...@@ -1888,9 +1894,10 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NORM, colData, true, offset, k); tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NORM, colData, true, offset, k);
} }
} }
if (pColumn->colId != PRIMARYKEY_TIMESTAMP_COL_ID) {
offset += TYPE_BYTES[pColumn->type]; offset += TYPE_BYTES[pColumn->type];
} }
}
tdSRowEnd(&rb); tdSRowEnd(&rb);
int32_t rowLen = TD_ROW_LEN(rowData); int32_t rowLen = TD_ROW_LEN(rowData);
rowData = POINTER_SHIFT(rowData, rowLen); rowData = POINTER_SHIFT(rowData, rowLen);
......
...@@ -2052,6 +2052,7 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SSDataBlock* pDataB ...@@ -2052,6 +2052,7 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SSDataBlock* pDataB
isStartKey = true; isStartKey = true;
tdAppendColValToRow(&rb, PRIMARYKEY_TIMESTAMP_COL_ID, TSDB_DATA_TYPE_TIMESTAMP, TD_VTYPE_NORM, var, true, tdAppendColValToRow(&rb, PRIMARYKEY_TIMESTAMP_COL_ID, TSDB_DATA_TYPE_TIMESTAMP, TD_VTYPE_NORM, var, true,
offset, k); offset, k);
continue; // offset should keep 0 for next column
} else if (colDataIsNull_s(pColInfoData, j)) { } else if (colDataIsNull_s(pColInfoData, j)) {
tdAppendColValToRow(&rb, PRIMARYKEY_TIMESTAMP_COL_ID + k, TSDB_DATA_TYPE_TIMESTAMP, TD_VTYPE_NULL, NULL, tdAppendColValToRow(&rb, PRIMARYKEY_TIMESTAMP_COL_ID + k, TSDB_DATA_TYPE_TIMESTAMP, TD_VTYPE_NULL, NULL,
......
此差异已折叠。
...@@ -192,7 +192,7 @@ bool tdSTpRowGetVal(STSRow *pRow, col_id_t colId, col_type_t colType, int32_t fl ...@@ -192,7 +192,7 @@ bool tdSTpRowGetVal(STSRow *pRow, col_id_t colId, col_type_t colType, int32_t fl
return true; return true;
} }
void *pBitmap = tdGetBitmapAddrTp(pRow, flen); void *pBitmap = tdGetBitmapAddrTp(pRow, flen);
tdGetTpRowValOfCol(pVal, pRow, pBitmap, colType, offset - sizeof(TSKEY), colIdx); tdGetTpRowValOfCol(pVal, pRow, pBitmap, colType, offset, colIdx);
return true; return true;
} }
...@@ -217,7 +217,7 @@ bool tdSTSRowIterFetch(STSRowIter *pIter, col_id_t colId, col_type_t colType, SC ...@@ -217,7 +217,7 @@ bool tdSTSRowIterFetch(STSRowIter *pIter, col_id_t colId, col_type_t colType, SC
return false; return false;
} }
} }
tdSTSRowIterGetTpVal(pIter, pCol->type, pCol->offset - sizeof(TSKEY), pVal); tdSTSRowIterGetTpVal(pIter, pCol->type, pCol->offset, pVal);
++pIter->colIdx; ++pIter->colIdx;
} else if (TD_IS_KV_ROW(pIter->pRow)) { } else if (TD_IS_KV_ROW(pIter->pRow)) {
return tdSTSRowIterGetKvVal(pIter, colId, &pIter->kvIdx, pVal); return tdSTSRowIterGetKvVal(pIter, colId, &pIter->kvIdx, pVal);
...@@ -244,7 +244,7 @@ bool tdSTSRowIterNext(STSRowIter *pIter, SCellVal *pVal) { ...@@ -244,7 +244,7 @@ bool tdSTSRowIterNext(STSRowIter *pIter, SCellVal *pVal) {
} }
if (TD_IS_TP_ROW(pIter->pRow)) { if (TD_IS_TP_ROW(pIter->pRow)) {
tdSTSRowIterGetTpVal(pIter, pCol->type, pCol->offset - sizeof(TSKEY), pVal); tdSTSRowIterGetTpVal(pIter, pCol->type, pCol->offset, pVal);
} else if (TD_IS_KV_ROW(pIter->pRow)) { } else if (TD_IS_KV_ROW(pIter->pRow)) {
tdSTSRowIterGetKvVal(pIter, pCol->colId, &pIter->kvIdx, pVal); tdSTSRowIterGetKvVal(pIter, pCol->colId, &pIter->kvIdx, pVal);
} else { } else {
...@@ -469,7 +469,7 @@ bool tdSTSRowGetVal(STSRowIter *pIter, col_id_t colId, col_type_t colType, SCell ...@@ -469,7 +469,7 @@ bool tdSTSRowGetVal(STSRowIter *pIter, col_id_t colId, col_type_t colType, SCell
#ifdef TD_SUPPORT_BITMAP #ifdef TD_SUPPORT_BITMAP
colIdx = POINTER_DISTANCE(pCol, pSchema->columns) / sizeof(STColumn); colIdx = POINTER_DISTANCE(pCol, pSchema->columns) / sizeof(STColumn);
#endif #endif
tdGetTpRowValOfCol(pVal, pRow, pIter->pBitmap, pCol->type, pCol->offset - sizeof(TSKEY), colIdx - 1); tdGetTpRowValOfCol(pVal, pRow, pIter->pBitmap, pCol->type, pCol->offset, colIdx - 1);
} else if (TD_IS_KV_ROW(pRow)) { } else if (TD_IS_KV_ROW(pRow)) {
SKvRowIdx *pIdx = (SKvRowIdx *)taosbsearch(&colId, TD_ROW_COL_IDX(pRow), tdRowGetNCols(pRow), sizeof(SKvRowIdx), SKvRowIdx *pIdx = (SKvRowIdx *)taosbsearch(&colId, TD_ROW_COL_IDX(pRow), tdRowGetNCols(pRow), sizeof(SKvRowIdx),
compareKvRowColId, TD_EQ); compareKvRowColId, TD_EQ);
...@@ -757,11 +757,10 @@ int32_t tdAppendColValToKvRow(SRowBuilder *pBuilder, TDRowValT valType, const vo ...@@ -757,11 +757,10 @@ int32_t tdAppendColValToKvRow(SRowBuilder *pBuilder, TDRowValT valType, const vo
int32_t tdAppendColValToTpRow(SRowBuilder *pBuilder, TDRowValT valType, const void *val, bool isCopyVarData, int32_t tdAppendColValToTpRow(SRowBuilder *pBuilder, TDRowValT valType, const void *val, bool isCopyVarData,
int8_t colType, int16_t colIdx, int32_t offset) { int8_t colType, int16_t colIdx, int32_t offset) {
if ((offset < (int32_t)sizeof(TSKEY)) || (colIdx < 1)) { if (colIdx < 1) {
terrno = TSDB_CODE_INVALID_PARA; terrno = TSDB_CODE_INVALID_PARA;
return terrno; return terrno;
} }
offset -= sizeof(TSKEY);
--colIdx; --colIdx;
#ifdef TD_SUPPORT_BITMAP #ifdef TD_SUPPORT_BITMAP
...@@ -853,7 +852,7 @@ int32_t tdSRowResetBuf(SRowBuilder *pBuilder, void *pBuf) { ...@@ -853,7 +852,7 @@ int32_t tdSRowResetBuf(SRowBuilder *pBuilder, void *pBuf) {
memset(pBuilder->pBitmap, TD_VTYPE_NONE_BYTE_II, pBuilder->nBitmaps); memset(pBuilder->pBitmap, TD_VTYPE_NONE_BYTE_II, pBuilder->nBitmaps);
#endif #endif
// the primary TS key is stored separatedly // the primary TS key is stored separatedly
len = TD_ROW_HEAD_LEN + pBuilder->flen - sizeof(TSKEY) + pBuilder->nBitmaps; len = TD_ROW_HEAD_LEN + pBuilder->flen + pBuilder->nBitmaps;
TD_ROW_SET_LEN(pBuilder->pBuf, len); TD_ROW_SET_LEN(pBuilder->pBuf, len);
TD_ROW_SET_SVER(pBuilder->pBuf, pBuilder->sver); TD_ROW_SET_SVER(pBuilder->pBuf, pBuilder->sver);
break; break;
......
...@@ -61,7 +61,7 @@ tDataTypeDescriptor tDataTypes[TSDB_DATA_TYPE_MAX] = { ...@@ -61,7 +61,7 @@ tDataTypeDescriptor tDataTypes[TSDB_DATA_TYPE_MAX] = {
static float floatMin = -FLT_MAX, floatMax = FLT_MAX; static float floatMin = -FLT_MAX, floatMax = FLT_MAX;
static double doubleMin = -DBL_MAX, doubleMax = DBL_MAX; static double doubleMin = -DBL_MAX, doubleMax = DBL_MAX;
FORCE_INLINE void *getDataMin(int32_t type, void* value) { FORCE_INLINE void *getDataMin(int32_t type, void *value) {
switch (type) { switch (type) {
case TSDB_DATA_TYPE_FLOAT: case TSDB_DATA_TYPE_FLOAT:
*(float *)value = floatMin; *(float *)value = floatMin;
...@@ -77,7 +77,7 @@ FORCE_INLINE void *getDataMin(int32_t type, void* value) { ...@@ -77,7 +77,7 @@ FORCE_INLINE void *getDataMin(int32_t type, void* value) {
return value; return value;
} }
FORCE_INLINE void *getDataMax(int32_t type, void* value) { FORCE_INLINE void *getDataMax(int32_t type, void *value) {
switch (type) { switch (type) {
case TSDB_DATA_TYPE_FLOAT: case TSDB_DATA_TYPE_FLOAT:
*(float *)value = floatMax; *(float *)value = floatMax;
......
...@@ -196,7 +196,7 @@ SArray *mmGetMsgHandles() { ...@@ -196,7 +196,7 @@ SArray *mmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_SYNC_PRE_SNAPSHOT, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_PRE_SNAPSHOT, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_PRE_SNAPSHOT_REPLY, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_PRE_SNAPSHOT_REPLY, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT, mmPutMsgToSyncCtrlQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT, mmPutMsgToSyncCtrlQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT_REPLY, mmPutMsgToSyncCtrlQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT_REPLY, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER;
code = 0; code = 0;
......
...@@ -464,7 +464,7 @@ SArray *vmGetMsgHandles() { ...@@ -464,7 +464,7 @@ SArray *vmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_SYNC_PRE_SNAPSHOT_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_PRE_SNAPSHOT_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT, vmPutMsgToSyncCtrlQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT, vmPutMsgToSyncCtrlQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT_REPLY, vmPutMsgToSyncCtrlQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
code = 0; code = 0;
......
...@@ -56,7 +56,7 @@ typedef struct SDataFWriter SDataFWriter; ...@@ -56,7 +56,7 @@ typedef struct SDataFWriter SDataFWriter;
typedef struct SDataFReader SDataFReader; typedef struct SDataFReader SDataFReader;
typedef struct SDelFWriter SDelFWriter; typedef struct SDelFWriter SDelFWriter;
typedef struct SDelFReader SDelFReader; typedef struct SDelFReader SDelFReader;
typedef struct SRowIter SRowIter; typedef struct STSDBRowIter STSDBRowIter;
typedef struct STsdbFS STsdbFS; typedef struct STsdbFS STsdbFS;
typedef struct SRowMerger SRowMerger; typedef struct SRowMerger SRowMerger;
typedef struct STsdbReadSnap STsdbReadSnap; typedef struct STsdbReadSnap STsdbReadSnap;
...@@ -111,9 +111,9 @@ static FORCE_INLINE int64_t tsdbLogicToFileSize(int64_t lSize, int32_t szPage) { ...@@ -111,9 +111,9 @@ static FORCE_INLINE int64_t tsdbLogicToFileSize(int64_t lSize, int32_t szPage) {
void tsdbRowGetColVal(TSDBROW *pRow, STSchema *pTSchema, int32_t iCol, SColVal *pColVal); void tsdbRowGetColVal(TSDBROW *pRow, STSchema *pTSchema, int32_t iCol, SColVal *pColVal);
// int32_t tPutTSDBRow(uint8_t *p, TSDBROW *pRow); // int32_t tPutTSDBRow(uint8_t *p, TSDBROW *pRow);
int32_t tsdbRowCmprFn(const void *p1, const void *p2); int32_t tsdbRowCmprFn(const void *p1, const void *p2);
// SRowIter // STSDBRowIter
void tRowIterInit(SRowIter *pIter, TSDBROW *pRow, STSchema *pTSchema); void tsdbRowIterInit(STSDBRowIter *pIter, TSDBROW *pRow, STSchema *pTSchema);
SColVal *tRowIterNext(SRowIter *pIter); SColVal *tsdbRowIterNext(STSDBRowIter *pIter);
// SRowMerger // SRowMerger
int32_t tRowMergerInit2(SRowMerger *pMerger, STSchema *pResTSchema, TSDBROW *pRow, STSchema *pTSchema); int32_t tRowMergerInit2(SRowMerger *pMerger, STSchema *pResTSchema, TSDBROW *pRow, STSchema *pTSchema);
int32_t tRowMergerAdd(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema); int32_t tRowMergerAdd(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema);
...@@ -562,7 +562,7 @@ struct SDFileSet { ...@@ -562,7 +562,7 @@ struct SDFileSet {
SSttFile *aSttF[TSDB_MAX_STT_TRIGGER]; SSttFile *aSttF[TSDB_MAX_STT_TRIGGER];
}; };
struct SRowIter { struct STSDBRowIter {
TSDBROW *pRow; TSDBROW *pRow;
STSchema *pTSchema; STSchema *pTSchema;
SColVal colVal; SColVal colVal;
......
...@@ -341,7 +341,7 @@ int32_t tsdbUpdateTableSchema(SMeta *pMeta, int64_t suid, int64_t uid, SSkmInfo ...@@ -341,7 +341,7 @@ int32_t tsdbUpdateTableSchema(SMeta *pMeta, int64_t suid, int64_t uid, SSkmInfo
pSkmInfo->suid = suid; pSkmInfo->suid = suid;
pSkmInfo->uid = uid; pSkmInfo->uid = uid;
tTSchemaDestroy(pSkmInfo->pTSchema); tDestroyTSchema(pSkmInfo->pTSchema);
code = metaGetTbTSchemaEx(pMeta, suid, uid, -1, &pSkmInfo->pTSchema); code = metaGetTbTSchemaEx(pMeta, suid, uid, -1, &pSkmInfo->pTSchema);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
...@@ -365,7 +365,7 @@ static int32_t tsdbCommitterUpdateRowSchema(SCommitter *pCommitter, int64_t suid ...@@ -365,7 +365,7 @@ static int32_t tsdbCommitterUpdateRowSchema(SCommitter *pCommitter, int64_t suid
pCommitter->skmRow.suid = suid; pCommitter->skmRow.suid = suid;
pCommitter->skmRow.uid = uid; pCommitter->skmRow.uid = uid;
tTSchemaDestroy(pCommitter->skmRow.pTSchema); tDestroyTSchema(pCommitter->skmRow.pTSchema);
code = metaGetTbTSchemaEx(pCommitter->pTsdb->pVnode->pMeta, suid, uid, sver, &pCommitter->skmRow.pTSchema); code = metaGetTbTSchemaEx(pCommitter->pTsdb->pVnode->pMeta, suid, uid, sver, &pCommitter->skmRow.pTSchema);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
...@@ -623,7 +623,8 @@ int32_t tsdbWriteDataBlock(SDataFWriter *pWriter, SBlockData *pBlockData, SMapDa ...@@ -623,7 +623,8 @@ int32_t tsdbWriteDataBlock(SDataFWriter *pWriter, SBlockData *pBlockData, SMapDa
_exit: _exit:
if (code) { if (code) {
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code)); tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino,
tstrerror(code));
} }
return code; return code;
} }
...@@ -666,7 +667,8 @@ int32_t tsdbWriteSttBlock(SDataFWriter *pWriter, SBlockData *pBlockData, SArray ...@@ -666,7 +667,8 @@ int32_t tsdbWriteSttBlock(SDataFWriter *pWriter, SBlockData *pBlockData, SArray
_exit: _exit:
if (code) { if (code) {
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code)); tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino,
tstrerror(code));
} }
return code; return code;
} }
...@@ -706,7 +708,8 @@ static int32_t tsdbCommitSttBlk(SDataFWriter *pWriter, SDiskDataBuilder *pBuilde ...@@ -706,7 +708,8 @@ static int32_t tsdbCommitSttBlk(SDataFWriter *pWriter, SDiskDataBuilder *pBuilde
_exit: _exit:
if (code) { if (code) {
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code)); tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino,
tstrerror(code));
} }
return code; return code;
} }
...@@ -919,8 +922,8 @@ static void tsdbCommitDataEnd(SCommitter *pCommitter) { ...@@ -919,8 +922,8 @@ static void tsdbCommitDataEnd(SCommitter *pCommitter) {
#else #else
tBlockDataDestroy(&pCommitter->dWriter.bDatal, 1); tBlockDataDestroy(&pCommitter->dWriter.bDatal, 1);
#endif #endif
tTSchemaDestroy(pCommitter->skmTable.pTSchema); tDestroyTSchema(pCommitter->skmTable.pTSchema);
tTSchemaDestroy(pCommitter->skmRow.pTSchema); tDestroyTSchema(pCommitter->skmRow.pTSchema);
} }
static int32_t tsdbCommitData(SCommitter *pCommitter) { static int32_t tsdbCommitData(SCommitter *pCommitter) {
......
...@@ -595,21 +595,21 @@ int32_t tDiskDataAddRow(SDiskDataBuilder *pBuilder, TSDBROW *pRow, STSchema *pTS ...@@ -595,21 +595,21 @@ int32_t tDiskDataAddRow(SDiskDataBuilder *pBuilder, TSDBROW *pRow, STSchema *pTS
if (pBuilder->bi.minKey > kRow.ts) pBuilder->bi.minKey = kRow.ts; if (pBuilder->bi.minKey > kRow.ts) pBuilder->bi.minKey = kRow.ts;
if (pBuilder->bi.maxKey < kRow.ts) pBuilder->bi.maxKey = kRow.ts; if (pBuilder->bi.maxKey < kRow.ts) pBuilder->bi.maxKey = kRow.ts;
SRowIter iter = {0}; STSDBRowIter iter = {0};
tRowIterInit(&iter, pRow, pTSchema); tsdbRowIterInit(&iter, pRow, pTSchema);
SColVal *pColVal = tRowIterNext(&iter); SColVal *pColVal = tsdbRowIterNext(&iter);
for (int32_t iBuilder = 0; iBuilder < pBuilder->nBuilder; iBuilder++) { for (int32_t iBuilder = 0; iBuilder < pBuilder->nBuilder; iBuilder++) {
SDiskColBuilder *pDCBuilder = (SDiskColBuilder *)taosArrayGet(pBuilder->aBuilder, iBuilder); SDiskColBuilder *pDCBuilder = (SDiskColBuilder *)taosArrayGet(pBuilder->aBuilder, iBuilder);
while (pColVal && pColVal->cid < pDCBuilder->cid) { while (pColVal && pColVal->cid < pDCBuilder->cid) {
pColVal = tRowIterNext(&iter); pColVal = tsdbRowIterNext(&iter);
} }
if (pColVal && pColVal->cid == pDCBuilder->cid) { if (pColVal && pColVal->cid == pDCBuilder->cid) {
code = tDiskColAddVal(pDCBuilder, pColVal); code = tDiskColAddVal(pDCBuilder, pColVal);
if (code) return code; if (code) return code;
pColVal = tRowIterNext(&iter); pColVal = tsdbRowIterNext(&iter);
} else { } else {
code = tDiskColAddVal(pDCBuilder, &COL_VAL_NONE(pDCBuilder->cid, pDCBuilder->type)); code = tDiskColAddVal(pDCBuilder, &COL_VAL_NONE(pDCBuilder->cid, pDCBuilder->type));
if (code) return code; if (code) return code;
......
...@@ -555,7 +555,7 @@ int32_t tsdbSnapReaderClose(STsdbSnapReader** ppReader) { ...@@ -555,7 +555,7 @@ int32_t tsdbSnapReaderClose(STsdbSnapReader** ppReader) {
} }
tBlockDataDestroy(&pReader->bData, 1); tBlockDataDestroy(&pReader->bData, 1);
tTSchemaDestroy(pReader->skmTable.pTSchema); tDestroyTSchema(pReader->skmTable.pTSchema);
// del // del
if (pReader->pDelFReader) tsdbDelFReaderClose(&pReader->pDelFReader); if (pReader->pDelFReader) tsdbDelFReaderClose(&pReader->pDelFReader);
...@@ -1416,7 +1416,7 @@ int32_t tsdbSnapWriterClose(STsdbSnapWriter** ppWriter, int8_t rollback) { ...@@ -1416,7 +1416,7 @@ int32_t tsdbSnapWriterClose(STsdbSnapWriter** ppWriter, int8_t rollback) {
taosArrayDestroy(pWriter->dReader.aBlockIdx); taosArrayDestroy(pWriter->dReader.aBlockIdx);
tBlockDataDestroy(&pWriter->bData, 1); tBlockDataDestroy(&pWriter->bData, 1);
tTSchemaDestroy(pWriter->skmTable.pTSchema); tDestroyTSchema(pWriter->skmTable.pTSchema);
for (int32_t iBuf = 0; iBuf < sizeof(pWriter->aBuf) / sizeof(uint8_t*); iBuf++) { for (int32_t iBuf = 0; iBuf < sizeof(pWriter->aBuf) / sizeof(uint8_t*); iBuf++) {
tFree(pWriter->aBuf[iBuf]); tFree(pWriter->aBuf[iBuf]);
......
...@@ -579,8 +579,8 @@ int32_t tsdbRowCmprFn(const void *p1, const void *p2) { ...@@ -579,8 +579,8 @@ int32_t tsdbRowCmprFn(const void *p1, const void *p2) {
return tsdbKeyCmprFn(&TSDBROW_KEY((TSDBROW *)p1), &TSDBROW_KEY((TSDBROW *)p2)); return tsdbKeyCmprFn(&TSDBROW_KEY((TSDBROW *)p1), &TSDBROW_KEY((TSDBROW *)p2));
} }
// SRowIter ====================================================== // STSDBRowIter ======================================================
void tRowIterInit(SRowIter *pIter, TSDBROW *pRow, STSchema *pTSchema) { void tsdbRowIterInit(STSDBRowIter *pIter, TSDBROW *pRow, STSchema *pTSchema) {
pIter->pRow = pRow; pIter->pRow = pRow;
if (pRow->type == 0) { if (pRow->type == 0) {
ASSERT(pTSchema); ASSERT(pTSchema);
...@@ -594,7 +594,7 @@ void tRowIterInit(SRowIter *pIter, TSDBROW *pRow, STSchema *pTSchema) { ...@@ -594,7 +594,7 @@ void tRowIterInit(SRowIter *pIter, TSDBROW *pRow, STSchema *pTSchema) {
} }
} }
SColVal *tRowIterNext(SRowIter *pIter) { SColVal *tsdbRowIterNext(STSDBRowIter *pIter) {
if (pIter->pRow->type == 0) { if (pIter->pRow->type == 0) {
if (pIter->i < pIter->pTSchema->numOfCols) { if (pIter->i < pIter->pTSchema->numOfCols) {
tTSRowGetVal(pIter->pRow->pTSRow, pIter->pTSchema, pIter->i, &pIter->colVal); tTSRowGetVal(pIter->pRow->pTSRow, pIter->pTSchema, pIter->i, &pIter->colVal);
...@@ -1084,11 +1084,11 @@ static int32_t tBlockDataAppendTPRow(SBlockData *pBlockData, STSRow *pRow, STSch ...@@ -1084,11 +1084,11 @@ static int32_t tBlockDataAppendTPRow(SBlockData *pBlockData, STSRow *pRow, STSch
cv.flag = CV_FLAG_VALUE; cv.flag = CV_FLAG_VALUE;
if (IS_VAR_DATA_TYPE(pTColumn->type)) { if (IS_VAR_DATA_TYPE(pTColumn->type)) {
void *pData = (char *)pRow + *(int32_t *)(pRow->data + pTColumn->offset - sizeof(TSKEY)); void *pData = (char *)pRow + *(int32_t *)(pRow->data + pTColumn->offset);
cv.value.nData = varDataLen(pData); cv.value.nData = varDataLen(pData);
cv.value.pData = varDataVal(pData); cv.value.pData = varDataVal(pData);
} else { } else {
memcpy(&cv.value.val, pRow->data + pTColumn->offset - sizeof(TSKEY), pTColumn->bytes); memcpy(&cv.value.val, pRow->data + pTColumn->offset, pTColumn->bytes);
} }
code = tColDataAppendValue(pColData, &cv); code = tColDataAppendValue(pColData, &cv);
...@@ -1106,11 +1106,11 @@ static int32_t tBlockDataAppendTPRow(SBlockData *pBlockData, STSRow *pRow, STSch ...@@ -1106,11 +1106,11 @@ static int32_t tBlockDataAppendTPRow(SBlockData *pBlockData, STSRow *pRow, STSch
cv.flag = CV_FLAG_VALUE; cv.flag = CV_FLAG_VALUE;
if (IS_VAR_DATA_TYPE(pTColumn->type)) { if (IS_VAR_DATA_TYPE(pTColumn->type)) {
void *pData = (char *)pRow + *(int32_t *)(pRow->data + pTColumn->offset - sizeof(TSKEY)); void *pData = (char *)pRow + *(int32_t *)(pRow->data + pTColumn->offset);
cv.value.nData = varDataLen(pData); cv.value.nData = varDataLen(pData);
cv.value.pData = varDataVal(pData); cv.value.pData = varDataVal(pData);
} else { } else {
memcpy(&cv.value.val, pRow->data + pTColumn->offset - sizeof(TSKEY), pTColumn->bytes); memcpy(&cv.value.val, pRow->data + pTColumn->offset, pTColumn->bytes);
} }
code = tColDataAppendValue(pColData, &cv); code = tColDataAppendValue(pColData, &cv);
......
...@@ -47,6 +47,7 @@ int32_t qBindStmtTagsValue(void* pBlock, void* boundTags, int64_t suid, const ch ...@@ -47,6 +47,7 @@ int32_t qBindStmtTagsValue(void* pBlock, void* boundTags, int64_t suid, const ch
TAOS_MULTI_BIND* bind, char* msgBuf, int32_t msgBufLen) { TAOS_MULTI_BIND* bind, char* msgBuf, int32_t msgBufLen) {
STableDataBlocks* pDataBlock = (STableDataBlocks*)pBlock; STableDataBlocks* pDataBlock = (STableDataBlocks*)pBlock;
SMsgBuf pBuf = {.buf = msgBuf, .len = msgBufLen}; SMsgBuf pBuf = {.buf = msgBuf, .len = msgBufLen};
int32_t code = TSDB_CODE_SUCCESS;
SParsedDataColInfo* tags = (SParsedDataColInfo*)boundTags; SParsedDataColInfo* tags = (SParsedDataColInfo*)boundTags;
if (NULL == tags) { if (NULL == tags) {
return TSDB_CODE_QRY_APP_ERROR; return TSDB_CODE_QRY_APP_ERROR;
...@@ -59,10 +60,10 @@ int32_t qBindStmtTagsValue(void* pBlock, void* boundTags, int64_t suid, const ch ...@@ -59,10 +60,10 @@ int32_t qBindStmtTagsValue(void* pBlock, void* boundTags, int64_t suid, const ch
SArray* tagName = taosArrayInit(8, TSDB_COL_NAME_LEN); SArray* tagName = taosArrayInit(8, TSDB_COL_NAME_LEN);
if (!tagName) { if (!tagName) {
return buildInvalidOperationMsg(&pBuf, "out of memory"); code = buildInvalidOperationMsg(&pBuf, "out of memory");
goto end;
} }
int32_t code = TSDB_CODE_SUCCESS;
SSchema* pSchema = getTableTagSchema(pDataBlock->pTableMeta); SSchema* pSchema = getTableTagSchema(pDataBlock->pTableMeta);
bool isJson = false; bool isJson = false;
...@@ -77,6 +78,10 @@ int32_t qBindStmtTagsValue(void* pBlock, void* boundTags, int64_t suid, const ch ...@@ -77,6 +78,10 @@ int32_t qBindStmtTagsValue(void* pBlock, void* boundTags, int64_t suid, const ch
int32_t colLen = pTagSchema->bytes; int32_t colLen = pTagSchema->bytes;
if (IS_VAR_DATA_TYPE(pTagSchema->type)) { if (IS_VAR_DATA_TYPE(pTagSchema->type)) {
colLen = bind[c].length[0]; colLen = bind[c].length[0];
if ((colLen + VARSTR_HEADER_SIZE) > pTagSchema->bytes) {
code = buildInvalidOperationMsg(&pBuf, "tag length is too big");
goto end;
}
} }
taosArrayPush(tagName, pTagSchema->name); taosArrayPush(tagName, pTagSchema->name);
if (pTagSchema->type == TSDB_DATA_TYPE_JSON) { if (pTagSchema->type == TSDB_DATA_TYPE_JSON) {
......
...@@ -139,8 +139,8 @@ void insSetBoundColumnInfo(SParsedDataColInfo* pColList, SSchema* pSchema, col_i ...@@ -139,8 +139,8 @@ void insSetBoundColumnInfo(SParsedDataColInfo* pColList, SSchema* pSchema, col_i
if (i > 0) { if (i > 0) {
pColList->cols[i].offset = pColList->cols[i - 1].offset + pSchema[i - 1].bytes; pColList->cols[i].offset = pColList->cols[i - 1].offset + pSchema[i - 1].bytes;
pColList->cols[i].toffset = pColList->flen; pColList->cols[i].toffset = pColList->flen;
}
pColList->flen += TYPE_BYTES[type]; pColList->flen += TYPE_BYTES[type];
}
switch (type) { switch (type) {
case TSDB_DATA_TYPE_BINARY: case TSDB_DATA_TYPE_BINARY:
pColList->allNullLen += (VARSTR_HEADER_SIZE + CHAR_BYTES); pColList->allNullLen += (VARSTR_HEADER_SIZE + CHAR_BYTES);
......
...@@ -743,7 +743,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { ...@@ -743,7 +743,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
} }
QW_LOCK(QW_WRITE, &ctx->lock); QW_LOCK(QW_WRITE, &ctx->lock);
if (queryStop || code || 0 == atomic_load_8((int8_t *)&ctx->queryContinue)) { if ((queryStop && (0 == atomic_load_8((int8_t *)&ctx->queryContinue))) || code || 0 == atomic_load_8((int8_t *)&ctx->queryContinue)) {
// Note: query is not running anymore // Note: query is not running anymore
QW_SET_PHASE(ctx, 0); QW_SET_PHASE(ctx, 0);
QW_UNLOCK(QW_WRITE, &ctx->lock); QW_UNLOCK(QW_WRITE, &ctx->lock);
......
...@@ -232,6 +232,7 @@ int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, S ...@@ -232,6 +232,7 @@ int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, S
int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, SRpcMsg* pMsg); int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, SRpcMsg* pMsg);
SyncIndex syncMinMatchIndex(SSyncNode* pSyncNode); SyncIndex syncMinMatchIndex(SSyncNode* pSyncNode);
int32_t syncCacheEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry, LRUHandle** h); int32_t syncCacheEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry, LRUHandle** h);
bool syncNodeHeartbeatTimeout(SSyncNode* pSyncNode);
// raft state change -------------- // raft state change --------------
void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term); void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term);
......
...@@ -137,6 +137,7 @@ typedef struct SyncHeartbeatReply { ...@@ -137,6 +137,7 @@ typedef struct SyncHeartbeatReply {
SyncTerm term; SyncTerm term;
SyncTerm privateTerm; SyncTerm privateTerm;
int64_t startTime; int64_t startTime;
int64_t timeStamp;
} SyncHeartbeatReply; } SyncHeartbeatReply;
typedef struct SyncPreSnapshot { typedef struct SyncPreSnapshot {
......
...@@ -50,9 +50,10 @@ void syncIndexMgrClear(SSyncIndexMgr *pSyncIndexMgr) { ...@@ -50,9 +50,10 @@ void syncIndexMgrClear(SSyncIndexMgr *pSyncIndexMgr) {
memset(pSyncIndexMgr->privateTerm, 0, sizeof(pSyncIndexMgr->privateTerm)); memset(pSyncIndexMgr->privateTerm, 0, sizeof(pSyncIndexMgr->privateTerm));
// int64_t timeNow = taosGetMonotonicMs(); // int64_t timeNow = taosGetMonotonicMs();
int64_t timeNow = taosGetTimestampMs();
for (int i = 0; i < pSyncIndexMgr->replicaNum; ++i) { for (int i = 0; i < pSyncIndexMgr->replicaNum; ++i) {
pSyncIndexMgr->startTimeArr[i] = 0; pSyncIndexMgr->startTimeArr[i] = 0;
pSyncIndexMgr->recvTimeArr[i] = 0; pSyncIndexMgr->recvTimeArr[i] = timeNow;
} }
/* /*
...@@ -147,7 +148,7 @@ int64_t syncIndexMgrGetRecvTime(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRa ...@@ -147,7 +148,7 @@ int64_t syncIndexMgrGetRecvTime(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRa
return recvTime; return recvTime;
} }
} }
ASSERT(0);
return -1; return -1;
} }
......
...@@ -639,6 +639,14 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) { ...@@ -639,6 +639,14 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) {
return -1; return -1;
} }
// heartbeat timeout
if (syncNodeHeartbeatTimeout(pSyncNode)) {
terrno = TSDB_CODE_SYN_PROPOSE_NOT_READY;
sNError(pSyncNode, "failed to sync propose since hearbeat timeout, type:%s, last:%" PRId64 ", cmt:%" PRId64,
TMSG_INFO(pMsg->msgType), syncNodeGetLastIndex(pSyncNode), pSyncNode->commitIndex);
return -1;
}
// optimized one replica // optimized one replica
if (syncNodeIsOptimizedOneReplica(pSyncNode, pMsg)) { if (syncNodeIsOptimizedOneReplica(pSyncNode, pMsg)) {
SyncIndex retIndex; SyncIndex retIndex;
...@@ -2086,6 +2094,29 @@ int32_t syncCacheEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry, LRUHand ...@@ -2086,6 +2094,29 @@ int32_t syncCacheEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry, LRUHand
return code; return code;
} }
bool syncNodeHeartbeatTimeout(SSyncNode* pSyncNode) {
if (pSyncNode->replicaNum == 1) {
return false;
}
int32_t toCount = 0;
int64_t tsNow = taosGetTimestampMs();
for (int32_t i = 0; i < pSyncNode->peersNum; ++i) {
int64_t recvTime = syncIndexMgrGetRecvTime(pSyncNode->pMatchIndex, &(pSyncNode->peersId[i]));
if (recvTime == 0 || recvTime == -1) {
continue;
}
if (tsNow - recvTime > SYNC_HEART_TIMEOUT_MS) {
toCount++;
}
}
bool b = (toCount >= pSyncNode->quorum ? true : false);
return b;
}
static int32_t syncNodeAppendNoop(SSyncNode* ths) { static int32_t syncNodeAppendNoop(SSyncNode* ths) {
int32_t ret = 0; int32_t ret = 0;
...@@ -2127,6 +2158,7 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) { ...@@ -2127,6 +2158,7 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
pMsgReply->srcId = ths->myRaftId; pMsgReply->srcId = ths->myRaftId;
pMsgReply->term = ths->pRaftStore->currentTerm; pMsgReply->term = ths->pRaftStore->currentTerm;
pMsgReply->privateTerm = 8864; // magic number pMsgReply->privateTerm = 8864; // magic number
pMsgReply->timeStamp = taosGetTimestampMs();
if (pMsg->term == ths->pRaftStore->currentTerm && ths->state != TAOS_SYNC_STATE_LEADER) { if (pMsg->term == ths->pRaftStore->currentTerm && ths->state != TAOS_SYNC_STATE_LEADER) {
syncNodeResetElectTimer(ths); syncNodeResetElectTimer(ths);
...@@ -2191,7 +2223,7 @@ int32_t syncNodeOnHeartbeatReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) { ...@@ -2191,7 +2223,7 @@ int32_t syncNodeOnHeartbeatReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
syncLogRecvHeartbeatReply(ths, pMsg, ""); syncLogRecvHeartbeatReply(ths, pMsg, "");
// update last reply time, make decision whether the other node is alive or not // update last reply time, make decision whether the other node is alive or not
syncIndexMgrSetRecvTime(ths->pMatchIndex, &pMsg->destId, pMsg->startTime); syncIndexMgrSetRecvTime(ths->pMatchIndex, &pMsg->srcId, pMsg->timeStamp);
return 0; return 0;
} }
......
...@@ -424,8 +424,8 @@ void syncLogRecvHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* p ...@@ -424,8 +424,8 @@ void syncLogRecvHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* p
char host[64]; char host[64];
uint16_t port; uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
sNTrace(pSyncNode, "recv sync-heartbeat-reply from %s:%d {term:%" PRId64 ", pterm:%" PRId64 "}, %s", host, port, sNTrace(pSyncNode, "recv sync-heartbeat-reply from %s:%d {term:%" PRId64 ", ts:%" PRId64 "}, %s", host, port,
pMsg->term, pMsg->privateTerm, s); pMsg->term, pMsg->timeStamp, s);
} }
void syncLogSendSyncPreSnapshot(SSyncNode* pSyncNode, const SyncPreSnapshot* pMsg, const char* s) { void syncLogSendSyncPreSnapshot(SSyncNode* pSyncNode, const SyncPreSnapshot* pMsg, const char* s) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册