未验证 提交 c0d6c718 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #4060 from taosdata/feature/update

Feature/update
...@@ -5311,6 +5311,7 @@ static void setCreateDBOption(SCreateDbMsg* pMsg, SCreateDBInfo* pCreateDb) { ...@@ -5311,6 +5311,7 @@ static void setCreateDBOption(SCreateDbMsg* pMsg, SCreateDBInfo* pCreateDb) {
pMsg->replications = pCreateDb->replica; pMsg->replications = pCreateDb->replica;
pMsg->quorum = pCreateDb->quorum; pMsg->quorum = pCreateDb->quorum;
pMsg->ignoreExist = pCreateDb->ignoreExists; pMsg->ignoreExist = pCreateDb->ignoreExists;
pMsg->update = pCreateDb->update;
} }
int32_t parseCreateDBOptions(SSqlCmd* pCmd, SCreateDBInfo* pCreateDbSql) { int32_t parseCreateDBOptions(SSqlCmd* pCmd, SCreateDBInfo* pCreateDbSql) {
......
...@@ -119,6 +119,33 @@ void tdResetTSchemaBuilder(STSchemaBuilder *pBuilder, int32_t version); ...@@ -119,6 +119,33 @@ void tdResetTSchemaBuilder(STSchemaBuilder *pBuilder, int32_t version);
int tdAddColToSchema(STSchemaBuilder *pBuilder, int8_t type, int16_t colId, int16_t bytes); int tdAddColToSchema(STSchemaBuilder *pBuilder, int8_t type, int16_t colId, int16_t bytes);
STSchema *tdGetSchemaFromBuilder(STSchemaBuilder *pBuilder); STSchema *tdGetSchemaFromBuilder(STSchemaBuilder *pBuilder);
// ----------------- Semantic timestamp key definition
typedef uint64_t TKEY;
#define TKEY_INVALID UINT64_MAX
#define TKEY_NULL TKEY_INVALID
#define TKEY_NEGATIVE_FLAG (((TKEY)1) << 63)
#define TKEY_DELETE_FLAG (((TKEY)1) << 62)
#define TKEY_VALUE_FILTER (~(TKEY_NEGATIVE_FLAG | TKEY_DELETE_FLAG))
#define TKEY_IS_NEGATIVE(tkey) (((tkey)&TKEY_NEGATIVE_FLAG) != 0)
#define TKEY_IS_DELETED(tkey) (((tkey)&TKEY_DELETE_FLAG) != 0)
#define tdSetTKEYDeleted(tkey) ((tkey) | TKEY_DELETE_FLAG)
#define tdGetTKEY(key) (((TKEY)ABS(key)) | (TKEY_NEGATIVE_FLAG & (TKEY)(key)))
#define tdGetKey(tkey) (((TSKEY)((tkey)&TKEY_VALUE_FILTER)) * (TKEY_IS_NEGATIVE(tkey) ? -1 : 1))
static FORCE_INLINE int tkeyComparFn(const void *tkey1, const void *tkey2) {
TSKEY key1 = tdGetKey(*(TKEY *)tkey1);
TSKEY key2 = tdGetKey(*(TKEY *)tkey2);
if (key1 < key2) {
return -1;
} else if (key1 > key2) {
return 1;
} else {
return 0;
}
}
// ----------------- Data row structure // ----------------- Data row structure
/* A data row, the format is like below: /* A data row, the format is like below:
...@@ -129,6 +156,8 @@ STSchema *tdGetSchemaFromBuilder(STSchemaBuilder *pBuilder); ...@@ -129,6 +156,8 @@ STSchema *tdGetSchemaFromBuilder(STSchemaBuilder *pBuilder);
* +----------+----------+---------------------------------+---------------------------------+ * +----------+----------+---------------------------------+---------------------------------+
* | len | sversion | First part | Second part | * | len | sversion | First part | Second part |
* +----------+----------+---------------------------------+---------------------------------+ * +----------+----------+---------------------------------+---------------------------------+
*
* NOTE: timestamp in this row structure is TKEY instead of TSKEY
*/ */
typedef void *SDataRow; typedef void *SDataRow;
...@@ -137,11 +166,13 @@ typedef void *SDataRow; ...@@ -137,11 +166,13 @@ typedef void *SDataRow;
#define dataRowLen(r) (*(uint16_t *)(r)) #define dataRowLen(r) (*(uint16_t *)(r))
#define dataRowVersion(r) *(int16_t *)POINTER_SHIFT(r, sizeof(int16_t)) #define dataRowVersion(r) *(int16_t *)POINTER_SHIFT(r, sizeof(int16_t))
#define dataRowTuple(r) POINTER_SHIFT(r, TD_DATA_ROW_HEAD_SIZE) #define dataRowTuple(r) POINTER_SHIFT(r, TD_DATA_ROW_HEAD_SIZE)
#define dataRowKey(r) (*(TSKEY *)(dataRowTuple(r))) #define dataRowTKey(r) (*(TKEY *)(dataRowTuple(r)))
#define dataRowKey(r) tdGetKey(dataRowTKey(r))
#define dataRowSetLen(r, l) (dataRowLen(r) = (l)) #define dataRowSetLen(r, l) (dataRowLen(r) = (l))
#define dataRowSetVersion(r, v) (dataRowVersion(r) = (v)) #define dataRowSetVersion(r, v) (dataRowVersion(r) = (v))
#define dataRowCpy(dst, r) memcpy((dst), (r), dataRowLen(r)) #define dataRowCpy(dst, r) memcpy((dst), (r), dataRowLen(r))
#define dataRowMaxBytesFromSchema(s) (schemaTLen(s) + TD_DATA_ROW_HEAD_SIZE) #define dataRowMaxBytesFromSchema(s) (schemaTLen(s) + TD_DATA_ROW_HEAD_SIZE)
#define dataRowDeleted(r) TKEY_IS_DELETED(dataRowTKey(r))
SDataRow tdNewDataRowFromSchema(STSchema *pSchema); SDataRow tdNewDataRowFromSchema(STSchema *pSchema);
void tdFreeDataRow(SDataRow row); void tdFreeDataRow(SDataRow row);
...@@ -154,16 +185,18 @@ static FORCE_INLINE int tdAppendColVal(SDataRow row, void *value, int8_t type, i ...@@ -154,16 +185,18 @@ static FORCE_INLINE int tdAppendColVal(SDataRow row, void *value, int8_t type, i
int32_t toffset = offset + TD_DATA_ROW_HEAD_SIZE; int32_t toffset = offset + TD_DATA_ROW_HEAD_SIZE;
char * ptr = (char *)POINTER_SHIFT(row, dataRowLen(row)); char * ptr = (char *)POINTER_SHIFT(row, dataRowLen(row));
switch (type) { if (IS_VAR_DATA_TYPE(type)) {
case TSDB_DATA_TYPE_BINARY: *(VarDataOffsetT *)POINTER_SHIFT(row, toffset) = dataRowLen(row);
case TSDB_DATA_TYPE_NCHAR: memcpy(ptr, value, varDataTLen(value));
*(VarDataOffsetT *)POINTER_SHIFT(row, toffset) = dataRowLen(row); dataRowLen(row) += varDataTLen(value);
memcpy(ptr, value, varDataTLen(value)); } else {
dataRowLen(row) += varDataTLen(value); if (offset == 0) {
break; ASSERT(type == TSDB_DATA_TYPE_TIMESTAMP);
default: TKEY tvalue = tdGetTKEY(*(TSKEY *)value);
memcpy(POINTER_SHIFT(row, toffset), (void *)(&tvalue), TYPE_BYTES[type]);
} else {
memcpy(POINTER_SHIFT(row, toffset), value, TYPE_BYTES[type]); memcpy(POINTER_SHIFT(row, toffset), value, TYPE_BYTES[type]);
break; }
} }
return 0; return 0;
...@@ -171,12 +204,10 @@ static FORCE_INLINE int tdAppendColVal(SDataRow row, void *value, int8_t type, i ...@@ -171,12 +204,10 @@ static FORCE_INLINE int tdAppendColVal(SDataRow row, void *value, int8_t type, i
// NOTE: offset here including the header size // NOTE: offset here including the header size
static FORCE_INLINE void *tdGetRowDataOfCol(SDataRow row, int8_t type, int32_t offset) { static FORCE_INLINE void *tdGetRowDataOfCol(SDataRow row, int8_t type, int32_t offset) {
switch (type) { if (IS_VAR_DATA_TYPE(type)) {
case TSDB_DATA_TYPE_BINARY: return POINTER_SHIFT(row, *(VarDataOffsetT *)POINTER_SHIFT(row, offset));
case TSDB_DATA_TYPE_NCHAR: } else {
return POINTER_SHIFT(row, *(VarDataOffsetT *)POINTER_SHIFT(row, offset)); return POINTER_SHIFT(row, offset);
default:
return POINTER_SHIFT(row, offset);
} }
} }
...@@ -196,7 +227,6 @@ static FORCE_INLINE void dataColReset(SDataCol *pDataCol) { pDataCol->len = 0; } ...@@ -196,7 +227,6 @@ static FORCE_INLINE void dataColReset(SDataCol *pDataCol) { pDataCol->len = 0; }
void dataColInit(SDataCol *pDataCol, STColumn *pCol, void **pBuf, int maxPoints); void dataColInit(SDataCol *pDataCol, STColumn *pCol, void **pBuf, int maxPoints);
void dataColAppendVal(SDataCol *pCol, void *value, int numOfRows, int maxPoints); void dataColAppendVal(SDataCol *pCol, void *value, int numOfRows, int maxPoints);
void dataColPopPoints(SDataCol *pCol, int pointsToPop, int numOfRows);
void dataColSetOffset(SDataCol *pCol, int nEle); void dataColSetOffset(SDataCol *pCol, int nEle);
bool isNEleNull(SDataCol *pCol, int nEle); bool isNEleNull(SDataCol *pCol, int nEle);
...@@ -204,28 +234,20 @@ void dataColSetNEleNull(SDataCol *pCol, int nEle, int maxPoints); ...@@ -204,28 +234,20 @@ void dataColSetNEleNull(SDataCol *pCol, int nEle, int maxPoints);
// Get the data pointer from a column-wised data // Get the data pointer from a column-wised data
static FORCE_INLINE void *tdGetColDataOfRow(SDataCol *pCol, int row) { static FORCE_INLINE void *tdGetColDataOfRow(SDataCol *pCol, int row) {
switch (pCol->type) { if (IS_VAR_DATA_TYPE(pCol->type)) {
case TSDB_DATA_TYPE_BINARY: return POINTER_SHIFT(pCol->pData, pCol->dataOff[row]);
case TSDB_DATA_TYPE_NCHAR: } else {
return POINTER_SHIFT(pCol->pData, pCol->dataOff[row]); return POINTER_SHIFT(pCol->pData, TYPE_BYTES[pCol->type] * row);
break;
default:
return POINTER_SHIFT(pCol->pData, TYPE_BYTES[pCol->type] * row);
break;
} }
} }
static FORCE_INLINE int32_t dataColGetNEleLen(SDataCol *pDataCol, int rows) { static FORCE_INLINE int32_t dataColGetNEleLen(SDataCol *pDataCol, int rows) {
ASSERT(rows > 0); ASSERT(rows > 0);
switch (pDataCol->type) { if (IS_VAR_DATA_TYPE(pDataCol->type)) {
case TSDB_DATA_TYPE_BINARY: return pDataCol->dataOff[rows - 1] + varDataTLen(tdGetColDataOfRow(pDataCol, rows - 1));
case TSDB_DATA_TYPE_NCHAR: } else {
return pDataCol->dataOff[rows - 1] + varDataTLen(tdGetColDataOfRow(pDataCol, rows - 1)); return TYPE_BYTES[pDataCol->type] * rows;
break;
default:
return TYPE_BYTES[pDataCol->type] * rows;
} }
} }
...@@ -243,9 +265,14 @@ typedef struct { ...@@ -243,9 +265,14 @@ typedef struct {
} SDataCols; } SDataCols;
#define keyCol(pCols) (&((pCols)->cols[0])) // Key column #define keyCol(pCols) (&((pCols)->cols[0])) // Key column
#define dataColsKeyAt(pCols, idx) ((TSKEY *)(keyCol(pCols)->pData))[(idx)] #define dataColsTKeyAt(pCols, idx) ((TKEY *)(keyCol(pCols)->pData))[(idx)]
#define dataColsKeyFirst(pCols) dataColsKeyAt(pCols, 0) #define dataColsKeyAt(pCols, idx) tdGetKey(dataColsTKeyAt(pCols, idx))
#define dataColsKeyLast(pCols) ((pCols->numOfRows == 0) ? 0 : dataColsKeyAt(pCols, (pCols)->numOfRows - 1)) #define dataColsTKeyFirst(pCols) (((pCols)->numOfRows == 0) ? TKEY_INVALID : dataColsTKeyAt(pCols, 0))
#define dataColsKeyFirst(pCols) (((pCols)->numOfRows == 0) ? TSDB_DATA_TIMESTAMP_NULL : dataColsKeyAt(pCols, 0))
#define dataColsTKeyLast(pCols) \
(((pCols)->numOfRows == 0) ? TKEY_INVALID : dataColsTKeyAt(pCols, (pCols)->numOfRows - 1))
#define dataColsKeyLast(pCols) \
(((pCols)->numOfRows == 0) ? TSDB_DATA_TIMESTAMP_NULL : dataColsKeyAt(pCols, (pCols)->numOfRows - 1))
SDataCols *tdNewDataCols(int maxRowSize, int maxCols, int maxRows); SDataCols *tdNewDataCols(int maxRowSize, int maxCols, int maxRows);
void tdResetDataCols(SDataCols *pCols); void tdResetDataCols(SDataCols *pCols);
...@@ -253,10 +280,7 @@ int tdInitDataCols(SDataCols *pCols, STSchema *pSchema); ...@@ -253,10 +280,7 @@ int tdInitDataCols(SDataCols *pCols, STSchema *pSchema);
SDataCols *tdDupDataCols(SDataCols *pCols, bool keepData); SDataCols *tdDupDataCols(SDataCols *pCols, bool keepData);
void tdFreeDataCols(SDataCols *pCols); void tdFreeDataCols(SDataCols *pCols);
void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols *pCols); void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols *pCols);
void tdPopDataColsPoints(SDataCols *pCols, int pointsToPop); //!!!!
int tdMergeDataCols(SDataCols *target, SDataCols *src, int rowsToMerge); int tdMergeDataCols(SDataCols *target, SDataCols *src, int rowsToMerge);
void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, int limit1, SDataCols *src2, int *iter2,
int limit2, int tRows);
// ----------------- K-V data row structure // ----------------- K-V data row structure
/* /*
......
...@@ -88,6 +88,7 @@ extern int16_t tsWAL; ...@@ -88,6 +88,7 @@ extern int16_t tsWAL;
extern int32_t tsFsyncPeriod; extern int32_t tsFsyncPeriod;
extern int32_t tsReplications; extern int32_t tsReplications;
extern int32_t tsQuorum; extern int32_t tsQuorum;
extern int32_t tsUpdate;
// balance // balance
extern int32_t tsEnableBalance; extern int32_t tsEnableBalance;
......
...@@ -18,6 +18,9 @@ ...@@ -18,6 +18,9 @@
#include "tcoding.h" #include "tcoding.h"
#include "wchar.h" #include "wchar.h"
static void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, int limit1, SDataCols *src2, int *iter2,
int limit2, int tRows);
/** /**
* Duplicate the schema and return a new object * Duplicate the schema and return a new object
*/ */
...@@ -202,7 +205,7 @@ void dataColInit(SDataCol *pDataCol, STColumn *pCol, void **pBuf, int maxPoints) ...@@ -202,7 +205,7 @@ void dataColInit(SDataCol *pDataCol, STColumn *pCol, void **pBuf, int maxPoints)
pDataCol->offset = colOffset(pCol) + TD_DATA_ROW_HEAD_SIZE; pDataCol->offset = colOffset(pCol) + TD_DATA_ROW_HEAD_SIZE;
pDataCol->len = 0; pDataCol->len = 0;
if (pDataCol->type == TSDB_DATA_TYPE_BINARY || pDataCol->type == TSDB_DATA_TYPE_NCHAR) { if (IS_VAR_DATA_TYPE(pDataCol->type)) {
pDataCol->dataOff = (VarDataOffsetT *)(*pBuf); pDataCol->dataOff = (VarDataOffsetT *)(*pBuf);
pDataCol->pData = POINTER_SHIFT(*pBuf, sizeof(VarDataOffsetT) * maxPoints); pDataCol->pData = POINTER_SHIFT(*pBuf, sizeof(VarDataOffsetT) * maxPoints);
pDataCol->spaceSize = pDataCol->bytes * maxPoints; pDataCol->spaceSize = pDataCol->bytes * maxPoints;
...@@ -215,60 +218,29 @@ void dataColInit(SDataCol *pDataCol, STColumn *pCol, void **pBuf, int maxPoints) ...@@ -215,60 +218,29 @@ void dataColInit(SDataCol *pDataCol, STColumn *pCol, void **pBuf, int maxPoints)
} }
} }
// value from timestamp should be TKEY here instead of TSKEY
void dataColAppendVal(SDataCol *pCol, void *value, int numOfRows, int maxPoints) { void dataColAppendVal(SDataCol *pCol, void *value, int numOfRows, int maxPoints) {
ASSERT(pCol != NULL && value != NULL); ASSERT(pCol != NULL && value != NULL);
switch (pCol->type) { if (IS_VAR_DATA_TYPE(pCol->type)) {
case TSDB_DATA_TYPE_BINARY: // set offset
case TSDB_DATA_TYPE_NCHAR: pCol->dataOff[numOfRows] = pCol->len;
// set offset // Copy data
pCol->dataOff[numOfRows] = pCol->len; memcpy(POINTER_SHIFT(pCol->pData, pCol->len), value, varDataTLen(value));
// Copy data // Update the length
memcpy(POINTER_SHIFT(pCol->pData, pCol->len), value, varDataTLen(value)); pCol->len += varDataTLen(value);
// Update the length
pCol->len += varDataTLen(value);
break;
default:
ASSERT(pCol->len == TYPE_BYTES[pCol->type] * numOfRows);
memcpy(POINTER_SHIFT(pCol->pData, pCol->len), value, pCol->bytes);
pCol->len += pCol->bytes;
break;
}
}
void dataColPopPoints(SDataCol *pCol, int pointsToPop, int numOfRows) {
int pointsLeft = numOfRows - pointsToPop;
ASSERT(pointsLeft > 0);
if (pCol->type == TSDB_DATA_TYPE_BINARY || pCol->type == TSDB_DATA_TYPE_NCHAR) {
ASSERT(pCol->len > 0);
VarDataOffsetT toffset = pCol->dataOff[pointsToPop];
pCol->len = pCol->len - toffset;
ASSERT(pCol->len > 0);
memmove(pCol->pData, POINTER_SHIFT(pCol->pData, toffset), pCol->len);
dataColSetOffset(pCol, pointsLeft);
} else { } else {
ASSERT(pCol->len == TYPE_BYTES[pCol->type] * numOfRows); ASSERT(pCol->len == TYPE_BYTES[pCol->type] * numOfRows);
pCol->len = TYPE_BYTES[pCol->type] * pointsLeft; memcpy(POINTER_SHIFT(pCol->pData, pCol->len), value, pCol->bytes);
memmove(pCol->pData, POINTER_SHIFT(pCol->pData, TYPE_BYTES[pCol->type] * pointsToPop), pCol->len); pCol->len += pCol->bytes;
} }
} }
bool isNEleNull(SDataCol *pCol, int nEle) { bool isNEleNull(SDataCol *pCol, int nEle) {
switch (pCol->type) { for (int i = 0; i < nEle; i++) {
case TSDB_DATA_TYPE_BINARY: if (!isNull(tdGetColDataOfRow(pCol, i), pCol->type)) return false;
case TSDB_DATA_TYPE_NCHAR:
for (int i = 0; i < nEle; i++) {
if (!isNull(tdGetColDataOfRow(pCol, i), pCol->type)) return false;
}
return true;
default:
for (int i = 0; i < nEle; i++) {
if (!isNull(tdGetColDataOfRow(pCol, i), pCol->type)) return false;
}
return true;
} }
return true;
} }
void dataColSetNullAt(SDataCol *pCol, int index) { void dataColSetNullAt(SDataCol *pCol, int index) {
...@@ -390,7 +362,7 @@ SDataCols *tdDupDataCols(SDataCols *pDataCols, bool keepData) { ...@@ -390,7 +362,7 @@ SDataCols *tdDupDataCols(SDataCols *pDataCols, bool keepData) {
pRet->cols[i].spaceSize = pDataCols->cols[i].spaceSize; pRet->cols[i].spaceSize = pDataCols->cols[i].spaceSize;
pRet->cols[i].pData = (void *)((char *)pRet->buf + ((char *)(pDataCols->cols[i].pData) - (char *)(pDataCols->buf))); pRet->cols[i].pData = (void *)((char *)pRet->buf + ((char *)(pDataCols->cols[i].pData) - (char *)(pDataCols->buf)));
if (pRet->cols[i].type == TSDB_DATA_TYPE_BINARY || pRet->cols[i].type == TSDB_DATA_TYPE_NCHAR) { if (IS_VAR_DATA_TYPE(pRet->cols[i].type)) {
ASSERT(pDataCols->cols[i].dataOff != NULL); ASSERT(pDataCols->cols[i].dataOff != NULL);
pRet->cols[i].dataOff = pRet->cols[i].dataOff =
(int32_t *)((char *)pRet->buf + ((char *)(pDataCols->cols[i].dataOff) - (char *)(pDataCols->buf))); (int32_t *)((char *)pRet->buf + ((char *)(pDataCols->cols[i].dataOff) - (char *)(pDataCols->buf)));
...@@ -400,7 +372,7 @@ SDataCols *tdDupDataCols(SDataCols *pDataCols, bool keepData) { ...@@ -400,7 +372,7 @@ SDataCols *tdDupDataCols(SDataCols *pDataCols, bool keepData) {
pRet->cols[i].len = pDataCols->cols[i].len; pRet->cols[i].len = pDataCols->cols[i].len;
if (pDataCols->cols[i].len > 0) { if (pDataCols->cols[i].len > 0) {
memcpy(pRet->cols[i].pData, pDataCols->cols[i].pData, pDataCols->cols[i].len); memcpy(pRet->cols[i].pData, pDataCols->cols[i].pData, pDataCols->cols[i].len);
if (pRet->cols[i].type == TSDB_DATA_TYPE_BINARY || pRet->cols[i].type == TSDB_DATA_TYPE_NCHAR) { if (IS_VAR_DATA_TYPE(pRet->cols[i].type)) {
memcpy(pRet->cols[i].dataOff, pDataCols->cols[i].dataOff, sizeof(VarDataOffsetT) * pDataCols->maxPoints); memcpy(pRet->cols[i].dataOff, pDataCols->cols[i].dataOff, sizeof(VarDataOffsetT) * pDataCols->maxPoints);
} }
} }
...@@ -420,58 +392,54 @@ void tdResetDataCols(SDataCols *pCols) { ...@@ -420,58 +392,54 @@ void tdResetDataCols(SDataCols *pCols) {
} }
void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols *pCols) { void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols *pCols) {
ASSERT(dataColsKeyLast(pCols) < dataRowKey(row)); ASSERT(pCols->numOfRows == 0 || dataColsKeyLast(pCols) < dataRowKey(row));
int rcol = 0; int rcol = 0;
int dcol = 0; int dcol = 0;
while (dcol < pCols->numOfCols) { if (dataRowDeleted(row)) {
SDataCol *pDataCol = &(pCols->cols[dcol]); for (; dcol < pCols->numOfCols; dcol++) {
if (rcol >= schemaNCols(pSchema)) { SDataCol *pDataCol = &(pCols->cols[dcol]);
dataColSetNullAt(pDataCol, pCols->numOfRows); if (dcol == 0) {
dcol++; dataColAppendVal(pDataCol, dataRowTuple(row), pCols->numOfRows, pCols->maxPoints);
continue; } else {
dataColSetNullAt(pDataCol, pCols->numOfRows);
}
} }
} else {
while (dcol < pCols->numOfCols) {
SDataCol *pDataCol = &(pCols->cols[dcol]);
if (rcol >= schemaNCols(pSchema)) {
dataColSetNullAt(pDataCol, pCols->numOfRows);
dcol++;
continue;
}
STColumn *pRowCol = schemaColAt(pSchema, rcol); STColumn *pRowCol = schemaColAt(pSchema, rcol);
if (pRowCol->colId == pDataCol->colId) { if (pRowCol->colId == pDataCol->colId) {
void *value = tdGetRowDataOfCol(row, pRowCol->type, pRowCol->offset+TD_DATA_ROW_HEAD_SIZE); void *value = tdGetRowDataOfCol(row, pRowCol->type, pRowCol->offset + TD_DATA_ROW_HEAD_SIZE);
dataColAppendVal(pDataCol, value, pCols->numOfRows, pCols->maxPoints); dataColAppendVal(pDataCol, value, pCols->numOfRows, pCols->maxPoints);
dcol++; dcol++;
rcol++; rcol++;
} else if (pRowCol->colId < pDataCol->colId) { } else if (pRowCol->colId < pDataCol->colId) {
rcol++; rcol++;
} else { } else {
dataColSetNullAt(pDataCol, pCols->numOfRows); dataColSetNullAt(pDataCol, pCols->numOfRows);
dcol++; dcol++;
}
} }
} }
pCols->numOfRows++; pCols->numOfRows++;
} }
// Pop pointsToPop points from the SDataCols
void tdPopDataColsPoints(SDataCols *pCols, int pointsToPop) {
int pointsLeft = pCols->numOfRows - pointsToPop;
if (pointsLeft <= 0) {
tdResetDataCols(pCols);
return;
}
for (int iCol = 0; iCol < pCols->numOfCols; iCol++) {
SDataCol *pCol = pCols->cols + iCol;
dataColPopPoints(pCol, pointsToPop, pCols->numOfRows);
}
pCols->numOfRows = pointsLeft;
}
int tdMergeDataCols(SDataCols *target, SDataCols *source, int rowsToMerge) { int tdMergeDataCols(SDataCols *target, SDataCols *source, int rowsToMerge) {
ASSERT(rowsToMerge > 0 && rowsToMerge <= source->numOfRows); ASSERT(rowsToMerge > 0 && rowsToMerge <= source->numOfRows);
ASSERT(target->numOfRows + rowsToMerge <= target->maxPoints);
ASSERT(target->numOfCols == source->numOfCols); ASSERT(target->numOfCols == source->numOfCols);
SDataCols *pTarget = NULL; SDataCols *pTarget = NULL;
if (dataColsKeyLast(target) < dataColsKeyFirst(source)) { // No overlap if (dataColsKeyLast(target) < dataColsKeyFirst(source)) { // No overlap
ASSERT(target->numOfRows + rowsToMerge <= target->maxPoints);
for (int i = 0; i < rowsToMerge; i++) { for (int i = 0; i < rowsToMerge; i++) {
for (int j = 0; j < source->numOfCols; j++) { for (int j = 0; j < source->numOfCols; j++) {
if (source->cols[j].len > 0) { if (source->cols[j].len > 0) {
...@@ -499,17 +467,23 @@ _err: ...@@ -499,17 +467,23 @@ _err:
return -1; return -1;
} }
void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, int limit1, SDataCols *src2, int *iter2, int limit2, int tRows) { // src2 data has more priority than src1
static void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, int limit1, SDataCols *src2, int *iter2,
int limit2, int tRows) {
tdResetDataCols(target); tdResetDataCols(target);
ASSERT(limit1 <= src1->numOfRows && limit2 <= src2->numOfRows); ASSERT(limit1 <= src1->numOfRows && limit2 <= src2->numOfRows);
while (target->numOfRows < tRows) { while (target->numOfRows < tRows) {
if (*iter1 >= limit1 && *iter2 >= limit2) break; if (*iter1 >= limit1 && *iter2 >= limit2) break;
TSKEY key1 = (*iter1 >= limit1) ? INT64_MAX : ((TSKEY *)(src1->cols[0].pData))[*iter1]; TSKEY key1 = (*iter1 >= limit1) ? INT64_MAX : dataColsKeyAt(src1, *iter1);
TSKEY key2 = (*iter2 >= limit2) ? INT64_MAX : ((TSKEY *)(src2->cols[0].pData))[*iter2]; TKEY tkey1 = (*iter1 >= limit1) ? TKEY_NULL : dataColsTKeyAt(src1, *iter1);
TSKEY key2 = (*iter2 >= limit2) ? INT64_MAX : dataColsKeyAt(src2, *iter2);
TKEY tkey2 = (*iter2 >= limit2) ? TKEY_NULL : dataColsTKeyAt(src2, *iter2);
if (key1 <= key2) { ASSERT(tkey1 == TKEY_NULL || (!TKEY_IS_DELETED(tkey1)));
if (key1 < key2) {
for (int i = 0; i < src1->numOfCols; i++) { for (int i = 0; i < src1->numOfCols; i++) {
ASSERT(target->cols[i].type == src1->cols[i].type); ASSERT(target->cols[i].type == src1->cols[i].type);
if (src1->cols[i].len > 0) { if (src1->cols[i].len > 0) {
...@@ -520,19 +494,23 @@ void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, int limi ...@@ -520,19 +494,23 @@ void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, int limi
target->numOfRows++; target->numOfRows++;
(*iter1)++; (*iter1)++;
if (key1 == key2) (*iter2)++; } else if (key1 >= key2) {
} else { if ((key1 > key2) || (key1 == key2 && !TKEY_IS_DELETED(tkey2))) {
for (int i = 0; i < src2->numOfCols; i++) { for (int i = 0; i < src2->numOfCols; i++) {
ASSERT(target->cols[i].type == src2->cols[i].type); ASSERT(target->cols[i].type == src2->cols[i].type);
if (src2->cols[i].len > 0) { if (src2->cols[i].len > 0) {
dataColAppendVal(&(target->cols[i]), tdGetColDataOfRow(src2->cols + i, *iter2), target->numOfRows, dataColAppendVal(&(target->cols[i]), tdGetColDataOfRow(src2->cols + i, *iter2), target->numOfRows,
target->maxPoints); target->maxPoints);
}
} }
target->numOfRows++;
} }
target->numOfRows++;
(*iter2)++; (*iter2)++;
if (key1 == key2) (*iter1)++;
} }
ASSERT(target->numOfRows <= target->maxPoints);
} }
} }
......
...@@ -120,6 +120,7 @@ int16_t tsWAL = TSDB_DEFAULT_WAL_LEVEL; ...@@ -120,6 +120,7 @@ int16_t tsWAL = TSDB_DEFAULT_WAL_LEVEL;
int32_t tsFsyncPeriod = TSDB_DEFAULT_FSYNC_PERIOD; int32_t tsFsyncPeriod = TSDB_DEFAULT_FSYNC_PERIOD;
int32_t tsReplications = TSDB_DEFAULT_DB_REPLICA_OPTION; int32_t tsReplications = TSDB_DEFAULT_DB_REPLICA_OPTION;
int32_t tsQuorum = TSDB_DEFAULT_DB_QUORUM_OPTION; int32_t tsQuorum = TSDB_DEFAULT_DB_QUORUM_OPTION;
int32_t tsUpdate = TSDB_DEFAULT_DB_UPDATE_OPTION;
int32_t tsMaxVgroupsPerDb = 0; int32_t tsMaxVgroupsPerDb = 0;
int32_t tsMinTablePerVnode = TSDB_TABLES_STEP; int32_t tsMinTablePerVnode = TSDB_TABLES_STEP;
int32_t tsMaxTablePerVnode = TSDB_DEFAULT_TABLES; int32_t tsMaxTablePerVnode = TSDB_DEFAULT_TABLES;
...@@ -786,6 +787,16 @@ static void doInitGlobalConfig(void) { ...@@ -786,6 +787,16 @@ static void doInitGlobalConfig(void) {
cfg.unitType = TAOS_CFG_UTYPE_NONE; cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg); taosInitConfigOption(cfg);
cfg.option = "update";
cfg.ptr = &tsUpdate;
cfg.valType = TAOS_CFG_VTYPE_INT32;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW;
cfg.minValue = TSDB_MIN_DB_UPDATE;
cfg.maxValue = TSDB_MAX_DB_UPDATE;
cfg.ptrLength = 0;
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg);
cfg.option = "mqttHostName"; cfg.option = "mqttHostName";
cfg.ptr = tsMqttHostName; cfg.ptr = tsMqttHostName;
cfg.valType = TAOS_CFG_VTYPE_STRING; cfg.valType = TAOS_CFG_VTYPE_STRING;
......
...@@ -75,6 +75,7 @@ extern const int32_t TYPE_BYTES[11]; ...@@ -75,6 +75,7 @@ extern const int32_t TYPE_BYTES[11];
#define TSDB_DATA_SMALLINT_NULL 0x8000 #define TSDB_DATA_SMALLINT_NULL 0x8000
#define TSDB_DATA_INT_NULL 0x80000000L #define TSDB_DATA_INT_NULL 0x80000000L
#define TSDB_DATA_BIGINT_NULL 0x8000000000000000L #define TSDB_DATA_BIGINT_NULL 0x8000000000000000L
#define TSDB_DATA_TIMESTAMP_NULL TSDB_DATA_BIGINT_NULL
#define TSDB_DATA_FLOAT_NULL 0x7FF00000 // it is an NAN #define TSDB_DATA_FLOAT_NULL 0x7FF00000 // it is an NAN
#define TSDB_DATA_DOUBLE_NULL 0x7FFFFF0000000000L // an NAN #define TSDB_DATA_DOUBLE_NULL 0x7FFFFF0000000000L // an NAN
...@@ -363,6 +364,10 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size, void* buf ...@@ -363,6 +364,10 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size, void* buf
#define TSDB_MAX_WAL_LEVEL 2 #define TSDB_MAX_WAL_LEVEL 2
#define TSDB_DEFAULT_WAL_LEVEL 1 #define TSDB_DEFAULT_WAL_LEVEL 1
#define TSDB_MIN_DB_UPDATE 0
#define TSDB_MAX_DB_UPDATE 1
#define TSDB_DEFAULT_DB_UPDATE_OPTION 0
#define TSDB_MIN_FSYNC_PERIOD 0 #define TSDB_MIN_FSYNC_PERIOD 0
#define TSDB_MAX_FSYNC_PERIOD 180000 // millisecond #define TSDB_MAX_FSYNC_PERIOD 180000 // millisecond
#define TSDB_DEFAULT_FSYNC_PERIOD 3000 // three second #define TSDB_DEFAULT_FSYNC_PERIOD 3000 // three second
......
...@@ -544,6 +544,8 @@ typedef struct { ...@@ -544,6 +544,8 @@ typedef struct {
int8_t replications; int8_t replications;
int8_t quorum; int8_t quorum;
int8_t ignoreExist; int8_t ignoreExist;
int8_t update;
int8_t reserve[9];
} SCreateDbMsg, SAlterDbMsg; } SCreateDbMsg, SAlterDbMsg;
typedef struct { typedef struct {
...@@ -654,7 +656,8 @@ typedef struct { ...@@ -654,7 +656,8 @@ typedef struct {
int8_t replications; int8_t replications;
int8_t wals; int8_t wals;
int8_t quorum; int8_t quorum;
int8_t reserved[16]; int8_t update;
int8_t reserved[15];
} SVnodeCfg; } SVnodeCfg;
typedef struct { typedef struct {
......
...@@ -65,6 +65,7 @@ typedef struct { ...@@ -65,6 +65,7 @@ typedef struct {
int32_t maxRowsPerFileBlock; // maximum rows per file block int32_t maxRowsPerFileBlock; // maximum rows per file block
int8_t precision; int8_t precision;
int8_t compression; int8_t compression;
int8_t update;
} STsdbCfg; } STsdbCfg;
// --------- TSDB REPOSITORY USAGE STATISTICS // --------- TSDB REPOSITORY USAGE STATISTICS
......
...@@ -114,114 +114,115 @@ ...@@ -114,114 +114,115 @@
#define TK_FSYNC 95 #define TK_FSYNC 95
#define TK_COMP 96 #define TK_COMP 96
#define TK_PRECISION 97 #define TK_PRECISION 97
#define TK_LP 98 #define TK_UPDATE 98
#define TK_RP 99 #define TK_LP 99
#define TK_TAGS 100 #define TK_RP 100
#define TK_USING 101 #define TK_TAGS 101
#define TK_AS 102 #define TK_USING 102
#define TK_COMMA 103 #define TK_AS 103
#define TK_NULL 104 #define TK_COMMA 104
#define TK_SELECT 105 #define TK_NULL 105
#define TK_UNION 106 #define TK_SELECT 106
#define TK_ALL 107 #define TK_UNION 107
#define TK_FROM 108 #define TK_ALL 108
#define TK_VARIABLE 109 #define TK_FROM 109
#define TK_INTERVAL 110 #define TK_VARIABLE 110
#define TK_FILL 111 #define TK_INTERVAL 111
#define TK_SLIDING 112 #define TK_FILL 112
#define TK_ORDER 113 #define TK_SLIDING 113
#define TK_BY 114 #define TK_ORDER 114
#define TK_ASC 115 #define TK_BY 115
#define TK_DESC 116 #define TK_ASC 116
#define TK_GROUP 117 #define TK_DESC 117
#define TK_HAVING 118 #define TK_GROUP 118
#define TK_LIMIT 119 #define TK_HAVING 119
#define TK_OFFSET 120 #define TK_LIMIT 120
#define TK_SLIMIT 121 #define TK_OFFSET 121
#define TK_SOFFSET 122 #define TK_SLIMIT 122
#define TK_WHERE 123 #define TK_SOFFSET 123
#define TK_NOW 124 #define TK_WHERE 124
#define TK_RESET 125 #define TK_NOW 125
#define TK_QUERY 126 #define TK_RESET 126
#define TK_ADD 127 #define TK_QUERY 127
#define TK_COLUMN 128 #define TK_ADD 128
#define TK_TAG 129 #define TK_COLUMN 129
#define TK_CHANGE 130 #define TK_TAG 130
#define TK_SET 131 #define TK_CHANGE 131
#define TK_KILL 132 #define TK_SET 132
#define TK_CONNECTION 133 #define TK_KILL 133
#define TK_STREAM 134 #define TK_CONNECTION 134
#define TK_COLON 135 #define TK_STREAM 135
#define TK_ABORT 136 #define TK_COLON 136
#define TK_AFTER 137 #define TK_ABORT 137
#define TK_ATTACH 138 #define TK_AFTER 138
#define TK_BEFORE 139 #define TK_ATTACH 139
#define TK_BEGIN 140 #define TK_BEFORE 140
#define TK_CASCADE 141 #define TK_BEGIN 141
#define TK_CLUSTER 142 #define TK_CASCADE 142
#define TK_CONFLICT 143 #define TK_CLUSTER 143
#define TK_COPY 144 #define TK_CONFLICT 144
#define TK_DEFERRED 145 #define TK_COPY 145
#define TK_DELIMITERS 146 #define TK_DEFERRED 146
#define TK_DETACH 147 #define TK_DELIMITERS 147
#define TK_EACH 148 #define TK_DETACH 148
#define TK_END 149 #define TK_EACH 149
#define TK_EXPLAIN 150 #define TK_END 150
#define TK_FAIL 151 #define TK_EXPLAIN 151
#define TK_FOR 152 #define TK_FAIL 152
#define TK_IGNORE 153 #define TK_FOR 153
#define TK_IMMEDIATE 154 #define TK_IGNORE 154
#define TK_INITIALLY 155 #define TK_IMMEDIATE 155
#define TK_INSTEAD 156 #define TK_INITIALLY 156
#define TK_MATCH 157 #define TK_INSTEAD 157
#define TK_KEY 158 #define TK_MATCH 158
#define TK_OF 159 #define TK_KEY 159
#define TK_RAISE 160 #define TK_OF 160
#define TK_REPLACE 161 #define TK_RAISE 161
#define TK_RESTRICT 162 #define TK_REPLACE 162
#define TK_ROW 163 #define TK_RESTRICT 163
#define TK_STATEMENT 164 #define TK_ROW 164
#define TK_TRIGGER 165 #define TK_STATEMENT 165
#define TK_VIEW 166 #define TK_TRIGGER 166
#define TK_COUNT 167 #define TK_VIEW 167
#define TK_SUM 168 #define TK_COUNT 168
#define TK_AVG 169 #define TK_SUM 169
#define TK_MIN 170 #define TK_AVG 170
#define TK_MAX 171 #define TK_MIN 171
#define TK_FIRST 172 #define TK_MAX 172
#define TK_LAST 173 #define TK_FIRST 173
#define TK_TOP 174 #define TK_LAST 174
#define TK_BOTTOM 175 #define TK_TOP 175
#define TK_STDDEV 176 #define TK_BOTTOM 176
#define TK_PERCENTILE 177 #define TK_STDDEV 177
#define TK_APERCENTILE 178 #define TK_PERCENTILE 178
#define TK_LEASTSQUARES 179 #define TK_APERCENTILE 179
#define TK_HISTOGRAM 180 #define TK_LEASTSQUARES 180
#define TK_DIFF 181 #define TK_HISTOGRAM 181
#define TK_SPREAD 182 #define TK_DIFF 182
#define TK_TWA 183 #define TK_SPREAD 183
#define TK_INTERP 184 #define TK_TWA 184
#define TK_LAST_ROW 185 #define TK_INTERP 185
#define TK_RATE 186 #define TK_LAST_ROW 186
#define TK_IRATE 187 #define TK_RATE 187
#define TK_SUM_RATE 188 #define TK_IRATE 188
#define TK_SUM_IRATE 189 #define TK_SUM_RATE 189
#define TK_AVG_RATE 190 #define TK_SUM_IRATE 190
#define TK_AVG_IRATE 191 #define TK_AVG_RATE 191
#define TK_TBID 192 #define TK_AVG_IRATE 192
#define TK_SEMI 193 #define TK_TBID 193
#define TK_NONE 194 #define TK_SEMI 194
#define TK_PREV 195 #define TK_NONE 195
#define TK_LINEAR 196 #define TK_PREV 196
#define TK_IMPORT 197 #define TK_LINEAR 197
#define TK_METRIC 198 #define TK_IMPORT 198
#define TK_TBNAME 199 #define TK_METRIC 199
#define TK_JOIN 200 #define TK_TBNAME 200
#define TK_METRICS 201 #define TK_JOIN 201
#define TK_STABLE 202 #define TK_METRICS 202
#define TK_INSERT 203 #define TK_STABLE 203
#define TK_INTO 204 #define TK_INSERT 204
#define TK_VALUES 205 #define TK_INTO 205
#define TK_VALUES 206
#define TK_SPACE 300 #define TK_SPACE 300
#define TK_COMMENT 301 #define TK_COMMENT 301
......
...@@ -172,7 +172,8 @@ typedef struct { ...@@ -172,7 +172,8 @@ typedef struct {
int8_t walLevel; int8_t walLevel;
int8_t replications; int8_t replications;
int8_t quorum; int8_t quorum;
int8_t reserved[12]; int8_t update;
int8_t reserved[11];
} SDbCfg; } SDbCfg;
typedef struct SDbObj { typedef struct SDbObj {
......
...@@ -319,6 +319,11 @@ static int32_t mnodeCheckDbCfg(SDbCfg *pCfg) { ...@@ -319,6 +319,11 @@ static int32_t mnodeCheckDbCfg(SDbCfg *pCfg) {
} }
#endif #endif
if (pCfg->update < TSDB_MIN_DB_UPDATE || pCfg->update > TSDB_MAX_DB_UPDATE) {
mError("invalid db option update:%d valid range: [%d, %d]", pCfg->update, TSDB_MIN_DB_UPDATE, TSDB_MAX_DB_UPDATE);
return TSDB_CODE_MND_INVALID_DB_OPTION;
}
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -339,6 +344,7 @@ static void mnodeSetDefaultDbCfg(SDbCfg *pCfg) { ...@@ -339,6 +344,7 @@ static void mnodeSetDefaultDbCfg(SDbCfg *pCfg) {
if (pCfg->walLevel < 0) pCfg->walLevel = tsWAL; if (pCfg->walLevel < 0) pCfg->walLevel = tsWAL;
if (pCfg->replications < 0) pCfg->replications = tsReplications; if (pCfg->replications < 0) pCfg->replications = tsReplications;
if (pCfg->quorum < 0) pCfg->quorum = tsQuorum; if (pCfg->quorum < 0) pCfg->quorum = tsQuorum;
if (pCfg->update < 0) pCfg->update = tsUpdate;
} }
static int32_t mnodeCreateDbCb(SMnodeMsg *pMsg, int32_t code) { static int32_t mnodeCreateDbCb(SMnodeMsg *pMsg, int32_t code) {
...@@ -391,7 +397,8 @@ static int32_t mnodeCreateDb(SAcctObj *pAcct, SCreateDbMsg *pCreate, SMnodeMsg * ...@@ -391,7 +397,8 @@ static int32_t mnodeCreateDb(SAcctObj *pAcct, SCreateDbMsg *pCreate, SMnodeMsg *
.compression = pCreate->compression, .compression = pCreate->compression,
.walLevel = pCreate->walLevel, .walLevel = pCreate->walLevel,
.replications = pCreate->replications, .replications = pCreate->replications,
.quorum = pCreate->quorum .quorum = pCreate->quorum,
.update = pCreate->update
}; };
mnodeSetDefaultDbCfg(&pDb->cfg); mnodeSetDefaultDbCfg(&pDb->cfg);
...@@ -610,6 +617,12 @@ static int32_t mnodeGetDbMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn ...@@ -610,6 +617,12 @@ static int32_t mnodeGetDbMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn
pSchema[cols].bytes = htons(pShow->bytes[cols]); pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++; cols++;
pShow->bytes[cols] = 1;
pSchema[cols].type = TSDB_DATA_TYPE_TINYINT;
strcpy(pSchema[cols].name, "update");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 10 + VARSTR_HEADER_SIZE; pShow->bytes[cols] = 10 + VARSTR_HEADER_SIZE;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY; pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "status"); strcpy(pSchema[cols].name, "status");
...@@ -749,6 +762,10 @@ static int32_t mnodeRetrieveDbs(SShowObj *pShow, char *data, int32_t rows, void ...@@ -749,6 +762,10 @@ static int32_t mnodeRetrieveDbs(SShowObj *pShow, char *data, int32_t rows, void
STR_WITH_SIZE_TO_VARSTR(pWrite, prec, 2); STR_WITH_SIZE_TO_VARSTR(pWrite, prec, 2);
cols++; cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int8_t *)pWrite = pDb->cfg.update;
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
if (pDb->status == TSDB_DB_STATUS_READY) { if (pDb->status == TSDB_DB_STATUS_READY) {
const char *src = "ready"; const char *src = "ready";
...@@ -848,6 +865,7 @@ static SDbCfg mnodeGetAlterDbOption(SDbObj *pDb, SAlterDbMsg *pAlter) { ...@@ -848,6 +865,7 @@ static SDbCfg mnodeGetAlterDbOption(SDbObj *pDb, SAlterDbMsg *pAlter) {
int8_t replications = pAlter->replications; int8_t replications = pAlter->replications;
int8_t quorum = pAlter->quorum; int8_t quorum = pAlter->quorum;
int8_t precision = pAlter->precision; int8_t precision = pAlter->precision;
int8_t update = pAlter->update;
terrno = TSDB_CODE_SUCCESS; terrno = TSDB_CODE_SUCCESS;
...@@ -950,6 +968,16 @@ static SDbCfg mnodeGetAlterDbOption(SDbObj *pDb, SAlterDbMsg *pAlter) { ...@@ -950,6 +968,16 @@ static SDbCfg mnodeGetAlterDbOption(SDbObj *pDb, SAlterDbMsg *pAlter) {
newCfg.quorum = quorum; newCfg.quorum = quorum;
} }
if (update >= 0 && update != pDb->cfg.update) {
#if 0
mDebug("db:%s, update:%d change to %d", pDb->name, pDb->cfg.update, update);
newCfg.update = update;
#else
mError("db:%s, can't alter update option", pDb->name);
terrno = TSDB_CODE_MND_INVALID_DB_OPTION;
#endif
}
return newCfg; return newCfg;
} }
......
...@@ -850,6 +850,7 @@ static SCreateVnodeMsg *mnodeBuildVnodeMsg(SVgObj *pVgroup) { ...@@ -850,6 +850,7 @@ static SCreateVnodeMsg *mnodeBuildVnodeMsg(SVgObj *pVgroup) {
pCfg->replications = (int8_t) pVgroup->numOfVnodes; pCfg->replications = (int8_t) pVgroup->numOfVnodes;
pCfg->wals = 3; pCfg->wals = 3;
pCfg->quorum = pDb->cfg.quorum; pCfg->quorum = pDb->cfg.quorum;
pCfg->update = pDb->cfg.update;
SVnodeDesc *pNodes = pVnode->nodes; SVnodeDesc *pNodes = pVnode->nodes;
for (int32_t j = 0; j < pVgroup->numOfVnodes; ++j) { for (int32_t j = 0; j < pVgroup->numOfVnodes; ++j) {
......
...@@ -129,6 +129,7 @@ typedef struct SCreateDBInfo { ...@@ -129,6 +129,7 @@ typedef struct SCreateDBInfo {
int32_t compressionLevel; int32_t compressionLevel;
SStrToken precision; SStrToken precision;
bool ignoreExists; bool ignoreExists;
int8_t update;
tVariantList *keep; tVariantList *keep;
} SCreateDBInfo; } SCreateDBInfo;
......
...@@ -239,6 +239,7 @@ wal(Y) ::= WAL INTEGER(X). { Y = X; } ...@@ -239,6 +239,7 @@ wal(Y) ::= WAL INTEGER(X). { Y = X; }
fsync(Y) ::= FSYNC INTEGER(X). { Y = X; } fsync(Y) ::= FSYNC INTEGER(X). { Y = X; }
comp(Y) ::= COMP INTEGER(X). { Y = X; } comp(Y) ::= COMP INTEGER(X). { Y = X; }
prec(Y) ::= PRECISION STRING(X). { Y = X; } prec(Y) ::= PRECISION STRING(X). { Y = X; }
update(Y) ::= UPDATE INTEGER(X). { Y = X; }
%type db_optr {SCreateDBInfo} %type db_optr {SCreateDBInfo}
db_optr(Y) ::= . {setDefaultCreateDbOption(&Y);} db_optr(Y) ::= . {setDefaultCreateDbOption(&Y);}
...@@ -256,6 +257,7 @@ db_optr(Y) ::= db_optr(Z) fsync(X). { Y = Z; Y.fsyncPeriod = strtol(X.z ...@@ -256,6 +257,7 @@ db_optr(Y) ::= db_optr(Z) fsync(X). { Y = Z; Y.fsyncPeriod = strtol(X.z
db_optr(Y) ::= db_optr(Z) comp(X). { Y = Z; Y.compressionLevel = strtol(X.z, NULL, 10); } db_optr(Y) ::= db_optr(Z) comp(X). { Y = Z; Y.compressionLevel = strtol(X.z, NULL, 10); }
db_optr(Y) ::= db_optr(Z) prec(X). { Y = Z; Y.precision = X; } db_optr(Y) ::= db_optr(Z) prec(X). { Y = Z; Y.precision = X; }
db_optr(Y) ::= db_optr(Z) keep(X). { Y = Z; Y.keep = X; } db_optr(Y) ::= db_optr(Z) keep(X). { Y = Z; Y.keep = X; }
db_optr(Y) ::= db_optr(Z) update(X). { Y = Z; Y.update = strtol(X.z, NULL, 10); }
%type alter_db_optr {SCreateDBInfo} %type alter_db_optr {SCreateDBInfo}
alter_db_optr(Y) ::= . { setDefaultCreateDbOption(&Y);} alter_db_optr(Y) ::= . { setDefaultCreateDbOption(&Y);}
...@@ -267,6 +269,7 @@ alter_db_optr(Y) ::= alter_db_optr(Z) blocks(X). { Y = Z; Y.numOfBlocks = s ...@@ -267,6 +269,7 @@ alter_db_optr(Y) ::= alter_db_optr(Z) blocks(X). { Y = Z; Y.numOfBlocks = s
alter_db_optr(Y) ::= alter_db_optr(Z) comp(X). { Y = Z; Y.compressionLevel = strtol(X.z, NULL, 10); } alter_db_optr(Y) ::= alter_db_optr(Z) comp(X). { Y = Z; Y.compressionLevel = strtol(X.z, NULL, 10); }
alter_db_optr(Y) ::= alter_db_optr(Z) wal(X). { Y = Z; Y.walLevel = strtol(X.z, NULL, 10); } alter_db_optr(Y) ::= alter_db_optr(Z) wal(X). { Y = Z; Y.walLevel = strtol(X.z, NULL, 10); }
alter_db_optr(Y) ::= alter_db_optr(Z) fsync(X). { Y = Z; Y.fsyncPeriod = strtol(X.z, NULL, 10); } alter_db_optr(Y) ::= alter_db_optr(Z) fsync(X). { Y = Z; Y.fsyncPeriod = strtol(X.z, NULL, 10); }
alter_db_optr(Y) ::= alter_db_optr(Z) update(X). { Y = Z; Y.update = strtol(X.z, NULL, 10); }
%type typename {TAOS_FIELD} %type typename {TAOS_FIELD}
typename(A) ::= ids(X). { typename(A) ::= ids(X). {
......
...@@ -414,9 +414,9 @@ static void tQueryIndexColumn(SSkipList* pSkipList, tQueryInfo* pQueryInfo, SArr ...@@ -414,9 +414,9 @@ static void tQueryIndexColumn(SSkipList* pSkipList, tQueryInfo* pQueryInfo, SArr
} }
if (cond.start != NULL) { if (cond.start != NULL) {
iter = tSkipListCreateIterFromVal(pSkipList, (char*) cond.start->v, pSkipList->keyInfo.type, TSDB_ORDER_ASC); iter = tSkipListCreateIterFromVal(pSkipList, (char*) cond.start->v, pSkipList->type, TSDB_ORDER_ASC);
} else { } else {
iter = tSkipListCreateIterFromVal(pSkipList, (char*)(cond.end ? cond.end->v: NULL), pSkipList->keyInfo.type, TSDB_ORDER_DESC); iter = tSkipListCreateIterFromVal(pSkipList, (char*)(cond.end ? cond.end->v: NULL), pSkipList->type, TSDB_ORDER_DESC);
} }
if (cond.start != NULL) { if (cond.start != NULL) {
...@@ -431,7 +431,7 @@ static void tQueryIndexColumn(SSkipList* pSkipList, tQueryInfo* pQueryInfo, SArr ...@@ -431,7 +431,7 @@ static void tQueryIndexColumn(SSkipList* pSkipList, tQueryInfo* pQueryInfo, SArr
break; break;
} }
STableKeyInfo info = {.pTable = *(void**)SL_GET_NODE_DATA(pNode), .lastKey = TSKEY_INITIAL_VAL}; STableKeyInfo info = {.pTable = (void*)SL_GET_NODE_DATA(pNode), .lastKey = TSKEY_INITIAL_VAL};
taosArrayPush(result, &info); taosArrayPush(result, &info);
} }
} else if (optr == TSDB_RELATION_GREATER || optr == TSDB_RELATION_GREATER_EQUAL) { // greater equal } else if (optr == TSDB_RELATION_GREATER || optr == TSDB_RELATION_GREATER_EQUAL) { // greater equal
...@@ -449,7 +449,7 @@ static void tQueryIndexColumn(SSkipList* pSkipList, tQueryInfo* pQueryInfo, SArr ...@@ -449,7 +449,7 @@ static void tQueryIndexColumn(SSkipList* pSkipList, tQueryInfo* pQueryInfo, SArr
if (ret == 0 && optr == TSDB_RELATION_GREATER) { if (ret == 0 && optr == TSDB_RELATION_GREATER) {
continue; continue;
} else { } else {
STableKeyInfo info = {.pTable = *(void**)SL_GET_NODE_DATA(pNode), .lastKey = TSKEY_INITIAL_VAL}; STableKeyInfo info = {.pTable = (void*)SL_GET_NODE_DATA(pNode), .lastKey = TSKEY_INITIAL_VAL};
taosArrayPush(result, &info); taosArrayPush(result, &info);
comp = false; comp = false;
} }
...@@ -464,22 +464,22 @@ static void tQueryIndexColumn(SSkipList* pSkipList, tQueryInfo* pQueryInfo, SArr ...@@ -464,22 +464,22 @@ static void tQueryIndexColumn(SSkipList* pSkipList, tQueryInfo* pQueryInfo, SArr
continue; continue;
} }
STableKeyInfo info = {.pTable = *(void**)SL_GET_NODE_DATA(pNode), .lastKey = TSKEY_INITIAL_VAL}; STableKeyInfo info = {.pTable = (void*)SL_GET_NODE_DATA(pNode), .lastKey = TSKEY_INITIAL_VAL};
taosArrayPush(result, &info); taosArrayPush(result, &info);
} }
tSkipListDestroyIter(iter); tSkipListDestroyIter(iter);
comp = true; comp = true;
iter = tSkipListCreateIterFromVal(pSkipList, (char*) cond.start->v, pSkipList->keyInfo.type, TSDB_ORDER_DESC); iter = tSkipListCreateIterFromVal(pSkipList, (char*) cond.start->v, pSkipList->type, TSDB_ORDER_DESC);
while(tSkipListIterNext(iter)) { while(tSkipListIterNext(iter)) {
SSkipListNode* pNode = tSkipListIterGet(iter); SSkipListNode* pNode = tSkipListIterGet(iter);
comp = comp && (pQueryInfo->compare(SL_GET_NODE_KEY(pSkipList, pNode), cond.start->v) == 0); comp = comp && (pQueryInfo->compare(SL_GET_NODE_KEY(pSkipList, pNode), cond.start->v) == 0);
if (comp) { if (comp) {
continue; continue;
} }
STableKeyInfo info = {.pTable = *(void**)SL_GET_NODE_DATA(pNode), .lastKey = TSKEY_INITIAL_VAL}; STableKeyInfo info = {.pTable = (void*)SL_GET_NODE_DATA(pNode), .lastKey = TSKEY_INITIAL_VAL};
taosArrayPush(result, &info); taosArrayPush(result, &info);
} }
...@@ -503,7 +503,7 @@ static void tQueryIndexColumn(SSkipList* pSkipList, tQueryInfo* pQueryInfo, SArr ...@@ -503,7 +503,7 @@ static void tQueryIndexColumn(SSkipList* pSkipList, tQueryInfo* pQueryInfo, SArr
if (ret == 0 && optr == TSDB_RELATION_LESS) { if (ret == 0 && optr == TSDB_RELATION_LESS) {
continue; continue;
} else { } else {
STableKeyInfo info = {.pTable = *(void **)SL_GET_NODE_DATA(pNode), .lastKey = TSKEY_INITIAL_VAL}; STableKeyInfo info = {.pTable = (void *)SL_GET_NODE_DATA(pNode), .lastKey = TSKEY_INITIAL_VAL};
taosArrayPush(result, &info); taosArrayPush(result, &info);
comp = false; // no need to compare anymore comp = false; // no need to compare anymore
} }
...@@ -517,7 +517,7 @@ static void tQueryIndexColumn(SSkipList* pSkipList, tQueryInfo* pQueryInfo, SArr ...@@ -517,7 +517,7 @@ static void tQueryIndexColumn(SSkipList* pSkipList, tQueryInfo* pQueryInfo, SArr
bool isnull = isNull(SL_GET_NODE_KEY(pSkipList, pNode), pQueryInfo->sch.type); bool isnull = isNull(SL_GET_NODE_KEY(pSkipList, pNode), pQueryInfo->sch.type);
if ((pQueryInfo->optr == TSDB_RELATION_ISNULL && isnull) || if ((pQueryInfo->optr == TSDB_RELATION_ISNULL && isnull) ||
(pQueryInfo->optr == TSDB_RELATION_NOTNULL && (!isnull))) { (pQueryInfo->optr == TSDB_RELATION_NOTNULL && (!isnull))) {
STableKeyInfo info = {.pTable = *(void **)SL_GET_NODE_DATA(pNode), .lastKey = TSKEY_INITIAL_VAL}; STableKeyInfo info = {.pTable = (void *)SL_GET_NODE_DATA(pNode), .lastKey = TSKEY_INITIAL_VAL};
taosArrayPush(result, &info); taosArrayPush(result, &info);
} }
} }
...@@ -681,7 +681,7 @@ static void tSQLBinaryTraverseOnSkipList(tExprNode *pExpr, SArray *pResult, SSki ...@@ -681,7 +681,7 @@ static void tSQLBinaryTraverseOnSkipList(tExprNode *pExpr, SArray *pResult, SSki
while (tSkipListIterNext(iter)) { while (tSkipListIterNext(iter)) {
SSkipListNode *pNode = tSkipListIterGet(iter); SSkipListNode *pNode = tSkipListIterGet(iter);
if (filterItem(pExpr, pNode, param)) { if (filterItem(pExpr, pNode, param)) {
taosArrayPush(pResult, SL_GET_NODE_DATA(pNode)); taosArrayPush(pResult, &(SL_GET_NODE_DATA(pNode)));
} }
} }
tSkipListDestroyIter(iter); tSkipListDestroyIter(iter);
...@@ -696,7 +696,7 @@ static void tQueryIndexlessColumn(SSkipList* pSkipList, tQueryInfo* pQueryInfo, ...@@ -696,7 +696,7 @@ static void tQueryIndexlessColumn(SSkipList* pSkipList, tQueryInfo* pQueryInfo,
SSkipListNode *pNode = tSkipListIterGet(iter); SSkipListNode *pNode = tSkipListIterGet(iter);
char * pData = SL_GET_NODE_DATA(pNode); char * pData = SL_GET_NODE_DATA(pNode);
tstr *name = (tstr*) tsdbGetTableName(*(void**) pData); tstr *name = (tstr*) tsdbGetTableName((void*) pData);
// todo speed up by using hash // todo speed up by using hash
if (pQueryInfo->sch.colId == TSDB_TBNAME_COLUMN_INDEX) { if (pQueryInfo->sch.colId == TSDB_TBNAME_COLUMN_INDEX) {
...@@ -710,7 +710,7 @@ static void tQueryIndexlessColumn(SSkipList* pSkipList, tQueryInfo* pQueryInfo, ...@@ -710,7 +710,7 @@ static void tQueryIndexlessColumn(SSkipList* pSkipList, tQueryInfo* pQueryInfo,
} }
if (addToResult) { if (addToResult) {
STableKeyInfo info = {.pTable = *(void**)pData, .lastKey = TSKEY_INITIAL_VAL}; STableKeyInfo info = {.pTable = (void*)pData, .lastKey = TSKEY_INITIAL_VAL};
taosArrayPush(res, &info); taosArrayPush(res, &info);
} }
} }
......
...@@ -872,5 +872,6 @@ void setDefaultCreateDbOption(SCreateDBInfo *pDBInfo) { ...@@ -872,5 +872,6 @@ void setDefaultCreateDbOption(SCreateDBInfo *pDBInfo) {
pDBInfo->quorum = -1; pDBInfo->quorum = -1;
pDBInfo->keep = NULL; pDBInfo->keep = NULL;
pDBInfo->update = -1;
memset(&pDBInfo->precision, 0, sizeof(SStrToken)); memset(&pDBInfo->precision, 0, sizeof(SStrToken));
} }
...@@ -155,6 +155,7 @@ static SKeyword keywordTable[] = { ...@@ -155,6 +155,7 @@ static SKeyword keywordTable[] = {
{"INSERT", TK_INSERT}, {"INSERT", TK_INSERT},
{"INTO", TK_INTO}, {"INTO", TK_INTO},
{"VALUES", TK_VALUES}, {"VALUES", TK_VALUES},
{"UPDATE", TK_UPDATE},
{"RESET", TK_RESET}, {"RESET", TK_RESET},
{"QUERY", TK_QUERY}, {"QUERY", TK_QUERY},
{"ADD", TK_ADD}, {"ADD", TK_ADD},
...@@ -664,4 +665,4 @@ void taosCleanupKeywordsTable() { ...@@ -664,4 +665,4 @@ void taosCleanupKeywordsTable() {
if (m != NULL && atomic_val_compare_exchange_ptr(&keywordHashTable, m, 0) == m) { if (m != NULL && atomic_val_compare_exchange_ptr(&keywordHashTable, m, 0) == m) {
taosHashCleanup(m); taosHashCleanup(m);
} }
} }
\ No newline at end of file
此差异已折叠。
...@@ -320,6 +320,15 @@ typedef struct { ...@@ -320,6 +320,15 @@ typedef struct {
void* compBuffer; // Buffer for temperary compress/decompress purpose void* compBuffer; // Buffer for temperary compress/decompress purpose
} SRWHelper; } SRWHelper;
typedef struct {
int rowsInserted;
int rowsUpdated;
int rowsDeleteSucceed;
int rowsDeleteFailed;
int nOperations;
TSKEY keyFirst;
TSKEY keyLast;
} SMergeInfo;
// ------------------ tsdbScan.c // ------------------ tsdbScan.c
typedef struct { typedef struct {
SFileGroup fGroup; SFileGroup fGroup;
...@@ -422,7 +431,7 @@ void tsdbCloseBufPool(STsdbRepo* pRepo); ...@@ -422,7 +431,7 @@ void tsdbCloseBufPool(STsdbRepo* pRepo);
SListNode* tsdbAllocBufBlockFromPool(STsdbRepo* pRepo); SListNode* tsdbAllocBufBlockFromPool(STsdbRepo* pRepo);
// ------------------ tsdbMemTable.c // ------------------ tsdbMemTable.c
int tsdbInsertRowToMem(STsdbRepo* pRepo, SDataRow row, STable* pTable); int tsdbUpdateRowInMem(STsdbRepo* pRepo, SDataRow row, STable* pTable);
int tsdbRefMemTable(STsdbRepo* pRepo, SMemTable* pMemTable); int tsdbRefMemTable(STsdbRepo* pRepo, SMemTable* pMemTable);
int tsdbUnRefMemTable(STsdbRepo* pRepo, SMemTable* pMemTable); int tsdbUnRefMemTable(STsdbRepo* pRepo, SMemTable* pMemTable);
int tsdbTakeMemSnapshot(STsdbRepo* pRepo, SMemTable** pMem, SMemTable** pIMem); int tsdbTakeMemSnapshot(STsdbRepo* pRepo, SMemTable** pMem, SMemTable** pIMem);
...@@ -430,7 +439,7 @@ void tsdbUnTakeMemSnapShot(STsdbRepo* pRepo, SMemTable* pMem, SMemTable* pIMem) ...@@ -430,7 +439,7 @@ void tsdbUnTakeMemSnapShot(STsdbRepo* pRepo, SMemTable* pMem, SMemTable* pIMem)
void* tsdbAllocBytes(STsdbRepo* pRepo, int bytes); void* tsdbAllocBytes(STsdbRepo* pRepo, int bytes);
int tsdbAsyncCommit(STsdbRepo* pRepo); int tsdbAsyncCommit(STsdbRepo* pRepo);
int tsdbLoadDataFromCache(STable* pTable, SSkipListIterator* pIter, TSKEY maxKey, int maxRowsToRead, SDataCols* pCols, int tsdbLoadDataFromCache(STable* pTable, SSkipListIterator* pIter, TSKEY maxKey, int maxRowsToRead, SDataCols* pCols,
TSKEY* filterKeys, int nFilterKeys); TKEY* filterKeys, int nFilterKeys, bool keepDup, SMergeInfo* pMergeInfo);
static FORCE_INLINE SDataRow tsdbNextIterRow(SSkipListIterator* pIter) { static FORCE_INLINE SDataRow tsdbNextIterRow(SSkipListIterator* pIter) {
if (pIter == NULL) return NULL; if (pIter == NULL) return NULL;
...@@ -438,16 +447,23 @@ static FORCE_INLINE SDataRow tsdbNextIterRow(SSkipListIterator* pIter) { ...@@ -438,16 +447,23 @@ static FORCE_INLINE SDataRow tsdbNextIterRow(SSkipListIterator* pIter) {
SSkipListNode* node = tSkipListIterGet(pIter); SSkipListNode* node = tSkipListIterGet(pIter);
if (node == NULL) return NULL; if (node == NULL) return NULL;
return *(SDataRow *)SL_GET_NODE_DATA(node); return (SDataRow)SL_GET_NODE_DATA(node);
} }
static FORCE_INLINE TSKEY tsdbNextIterKey(SSkipListIterator* pIter) { static FORCE_INLINE TSKEY tsdbNextIterKey(SSkipListIterator* pIter) {
SDataRow row = tsdbNextIterRow(pIter); SDataRow row = tsdbNextIterRow(pIter);
if (row == NULL) return -1; if (row == NULL) return TSDB_DATA_TIMESTAMP_NULL;
return dataRowKey(row); return dataRowKey(row);
} }
static FORCE_INLINE TKEY tsdbNextIterTKey(SSkipListIterator* pIter) {
SDataRow row = tsdbNextIterRow(pIter);
if (row == NULL) return TKEY_NULL;
return dataRowTKey(row);
}
static FORCE_INLINE STsdbBufBlock* tsdbGetCurrBufBlock(STsdbRepo* pRepo) { static FORCE_INLINE STsdbBufBlock* tsdbGetCurrBufBlock(STsdbRepo* pRepo) {
ASSERT(pRepo != NULL); ASSERT(pRepo != NULL);
if (pRepo->mem == NULL) return NULL; if (pRepo->mem == NULL) return NULL;
......
...@@ -512,6 +512,9 @@ static int32_t tsdbCheckAndSetDefaultCfg(STsdbCfg *pCfg) { ...@@ -512,6 +512,9 @@ static int32_t tsdbCheckAndSetDefaultCfg(STsdbCfg *pCfg) {
} }
} }
// update check
if (pCfg->update != 0) pCfg->update = 1;
return 0; return 0;
_err: _err:
...@@ -762,7 +765,7 @@ static int32_t tsdbInsertDataToTable(STsdbRepo *pRepo, SSubmitBlk *pBlock, TSKEY ...@@ -762,7 +765,7 @@ static int32_t tsdbInsertDataToTable(STsdbRepo *pRepo, SSubmitBlk *pBlock, TSKEY
return -1; return -1;
} }
if (tsdbInsertRowToMem(pRepo, row, pTable) < 0) return -1; if (tsdbUpdateRowInMem(pRepo, row, pTable) < 0) return -1;
(*affectedrows)++; (*affectedrows)++;
points++; points++;
...@@ -923,6 +926,7 @@ static int tsdbEncodeCfg(void **buf, STsdbCfg *pCfg) { ...@@ -923,6 +926,7 @@ static int tsdbEncodeCfg(void **buf, STsdbCfg *pCfg) {
tlen += taosEncodeVariantI32(buf, pCfg->maxRowsPerFileBlock); tlen += taosEncodeVariantI32(buf, pCfg->maxRowsPerFileBlock);
tlen += taosEncodeFixedI8(buf, pCfg->precision); tlen += taosEncodeFixedI8(buf, pCfg->precision);
tlen += taosEncodeFixedI8(buf, pCfg->compression); tlen += taosEncodeFixedI8(buf, pCfg->compression);
tlen += taosEncodeFixedI8(buf, pCfg->update);
return tlen; return tlen;
} }
...@@ -939,6 +943,7 @@ static void *tsdbDecodeCfg(void *buf, STsdbCfg *pCfg) { ...@@ -939,6 +943,7 @@ static void *tsdbDecodeCfg(void *buf, STsdbCfg *pCfg) {
buf = taosDecodeVariantI32(buf, &(pCfg->maxRowsPerFileBlock)); buf = taosDecodeVariantI32(buf, &(pCfg->maxRowsPerFileBlock));
buf = taosDecodeFixedI8(buf, &(pCfg->precision)); buf = taosDecodeFixedI8(buf, &(pCfg->precision));
buf = taosDecodeFixedI8(buf, &(pCfg->compression)); buf = taosDecodeFixedI8(buf, &(pCfg->compression));
buf = taosDecodeFixedI8(buf, &(pCfg->update));
return buf; return buf;
} }
......
...@@ -32,43 +32,40 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHe ...@@ -32,43 +32,40 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHe
static SCommitIter *tsdbCreateCommitIters(STsdbRepo *pRepo); static SCommitIter *tsdbCreateCommitIters(STsdbRepo *pRepo);
static void tsdbDestroyCommitIters(SCommitIter *iters, int maxTables); static void tsdbDestroyCommitIters(SCommitIter *iters, int maxTables);
static int tsdbAdjustMemMaxTables(SMemTable *pMemTable, int maxTables); static int tsdbAdjustMemMaxTables(SMemTable *pMemTable, int maxTables);
static int tsdbAppendTableRowToCols(STable *pTable, SDataCols *pCols, STSchema **ppSchema, SDataRow row);
// ---------------- INTERNAL FUNCTIONS ---------------- // ---------------- INTERNAL FUNCTIONS ----------------
int tsdbInsertRowToMem(STsdbRepo *pRepo, SDataRow row, STable *pTable) { int tsdbUpdateRowInMem(STsdbRepo *pRepo, SDataRow row, STable *pTable) {
STsdbCfg * pCfg = &pRepo->config; STsdbCfg * pCfg = &pRepo->config;
STsdbMeta * pMeta = pRepo->tsdbMeta; STsdbMeta * pMeta = pRepo->tsdbMeta;
int32_t level = 0; TKEY tkey = dataRowTKey(row);
int32_t headSize = 0;
TSKEY key = dataRowKey(row); TSKEY key = dataRowKey(row);
SMemTable * pMemTable = pRepo->mem; SMemTable * pMemTable = pRepo->mem;
STableData *pTableData = NULL; STableData *pTableData = NULL;
SSkipList * pSList = NULL; bool isRowDelete = TKEY_IS_DELETED(tkey);
if (pMemTable != NULL && TABLE_TID(pTable) < pMemTable->maxTables && pMemTable->tData[TABLE_TID(pTable)] != NULL && if (isRowDelete) {
pMemTable->tData[TABLE_TID(pTable)]->uid == TABLE_UID(pTable)) { if (!pCfg->update) {
pTableData = pMemTable->tData[TABLE_TID(pTable)]; tsdbWarn("vgId:%d vnode is not allowed to update but try to delete a data row", REPO_ID(pRepo));
pSList = pTableData->pData; terrno = TSDB_CODE_TDB_INVALID_ACTION;
} return -1;
}
tSkipListNewNodeInfo(pSList, &level, &headSize);
SSkipListNode *pNode = (SSkipListNode *)malloc(headSize + sizeof(SDataRow *)); if (key > TABLE_LASTKEY(pTable)) {
if (pNode == NULL) { tsdbTrace("vgId:%d skip to delete row key %" PRId64 " which is larger than table lastKey %" PRId64,
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; REPO_ID(pRepo), key, TABLE_LASTKEY(pTable));
return -1; return 0;
}
} }
void *pRow = tsdbAllocBytes(pRepo, dataRowLen(row)); void *pRow = tsdbAllocBytes(pRepo, dataRowLen(row));
if (pRow == NULL) { if (pRow == NULL) {
tsdbError("vgId:%d failed to insert row with key %" PRId64 " to table %s while allocate %d bytes since %s", tsdbError("vgId:%d failed to insert row with key %" PRId64 " to table %s while allocate %d bytes since %s",
REPO_ID(pRepo), key, TABLE_CHAR_NAME(pTable), dataRowLen(row), tstrerror(terrno)); REPO_ID(pRepo), key, TABLE_CHAR_NAME(pTable), dataRowLen(row), tstrerror(terrno));
free(pNode);
return -1; return -1;
} }
pNode->level = level;
dataRowCpy(pRow, row); dataRowCpy(pRow, row);
*(SDataRow *)SL_GET_NODE_DATA(pNode) = pRow;
// Operations above may change pRepo->mem, retake those values // Operations above may change pRepo->mem, retake those values
ASSERT(pRepo->mem != NULL); ASSERT(pRepo->mem != NULL);
...@@ -77,7 +74,6 @@ int tsdbInsertRowToMem(STsdbRepo *pRepo, SDataRow row, STable *pTable) { ...@@ -77,7 +74,6 @@ int tsdbInsertRowToMem(STsdbRepo *pRepo, SDataRow row, STable *pTable) {
if (TABLE_TID(pTable) >= pMemTable->maxTables) { if (TABLE_TID(pTable) >= pMemTable->maxTables) {
if (tsdbAdjustMemMaxTables(pMemTable, pMeta->maxTables) < 0) { if (tsdbAdjustMemMaxTables(pMemTable, pMeta->maxTables) < 0) {
tsdbFreeBytes(pRepo, pRow, dataRowLen(row)); tsdbFreeBytes(pRepo, pRow, dataRowLen(row));
free(pNode);
return -1; return -1;
} }
} }
...@@ -97,7 +93,6 @@ int tsdbInsertRowToMem(STsdbRepo *pRepo, SDataRow row, STable *pTable) { ...@@ -97,7 +93,6 @@ int tsdbInsertRowToMem(STsdbRepo *pRepo, SDataRow row, STable *pTable) {
" to table %s while create new table data object since %s", " to table %s while create new table data object since %s",
REPO_ID(pRepo), key, TABLE_CHAR_NAME(pTable), tstrerror(terrno)); REPO_ID(pRepo), key, TABLE_CHAR_NAME(pTable), tstrerror(terrno));
tsdbFreeBytes(pRepo, (void *)pRow, dataRowLen(row)); tsdbFreeBytes(pRepo, (void *)pRow, dataRowLen(row));
free(pNode);
return -1; return -1;
} }
...@@ -106,24 +101,31 @@ int tsdbInsertRowToMem(STsdbRepo *pRepo, SDataRow row, STable *pTable) { ...@@ -106,24 +101,31 @@ int tsdbInsertRowToMem(STsdbRepo *pRepo, SDataRow row, STable *pTable) {
ASSERT((pTableData != NULL) && pTableData->uid == TABLE_UID(pTable)); ASSERT((pTableData != NULL) && pTableData->uid == TABLE_UID(pTable));
if (tSkipListPut(pTableData->pData, pNode) == NULL) { int64_t oldSize = SL_SIZE(pTableData->pData);
if (tSkipListPut(pTableData->pData, pRow) == NULL) {
tsdbFreeBytes(pRepo, (void *)pRow, dataRowLen(row)); tsdbFreeBytes(pRepo, (void *)pRow, dataRowLen(row));
free(pNode);
} else { } else {
if (TABLE_LASTKEY(pTable) < key) TABLE_LASTKEY(pTable) = key; int64_t deltaSize = SL_SIZE(pTableData->pData) - oldSize;
if (isRowDelete) {
if (TABLE_LASTKEY(pTable) == key) {
// TODO: need to update table last key here (may from file)
}
} else {
if (TABLE_LASTKEY(pTable) < key) TABLE_LASTKEY(pTable) = key;
}
if (pMemTable->keyFirst > key) pMemTable->keyFirst = key; if (pMemTable->keyFirst > key) pMemTable->keyFirst = key;
if (pMemTable->keyLast < key) pMemTable->keyLast = key; if (pMemTable->keyLast < key) pMemTable->keyLast = key;
pMemTable->numOfRows++; pMemTable->numOfRows += deltaSize;
if (pTableData->keyFirst > key) pTableData->keyFirst = key; if (pTableData->keyFirst > key) pTableData->keyFirst = key;
if (pTableData->keyLast < key) pTableData->keyLast = key; if (pTableData->keyLast < key) pTableData->keyLast = key;
pTableData->numOfRows++; pTableData->numOfRows += deltaSize;
ASSERT(pTableData->numOfRows == tSkipListGetSize(pTableData->pData));
} }
tsdbTrace("vgId:%d a row is inserted to table %s tid %d uid %" PRIu64 " key %" PRIu64, REPO_ID(pRepo), tsdbTrace("vgId:%d a row is %s table %s tid %d uid %" PRIu64 " key %" PRIu64, REPO_ID(pRepo),
TABLE_CHAR_NAME(pTable), TABLE_TID(pTable), TABLE_UID(pTable), key); isRowDelete ? "deleted from" : "updated in", TABLE_CHAR_NAME(pTable), TABLE_TID(pTable), TABLE_UID(pTable),
key);
return 0; return 0;
} }
...@@ -295,63 +297,124 @@ int tsdbAsyncCommit(STsdbRepo *pRepo) { ...@@ -295,63 +297,124 @@ int tsdbAsyncCommit(STsdbRepo *pRepo) {
return 0; return 0;
} }
/**
* This is an important function to load data or try to load data from memory skiplist iterator.
*
* This function load memory data until:
* 1. iterator ends
* 2. data key exceeds maxKey
* 3. rowsIncreased = rowsInserted - rowsDeleteSucceed >= maxRowsToRead
* 4. operations in pCols not exceeds its max capacity if pCols is given
*
* The function tries to procceed AS MUSH AS POSSIBLE.
*/
int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey, int maxRowsToRead, SDataCols *pCols, int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey, int maxRowsToRead, SDataCols *pCols,
TSKEY *filterKeys, int nFilterKeys) { TKEY *filterKeys, int nFilterKeys, bool keepDup, SMergeInfo *pMergeInfo) {
ASSERT(maxRowsToRead > 0 && nFilterKeys >= 0); ASSERT(maxRowsToRead > 0 && nFilterKeys >= 0 && pMergeInfo != NULL);
if (pIter == NULL) return 0; if (pIter == NULL) return 0;
STSchema *pSchema = NULL; STSchema *pSchema = NULL;
int numOfRows = 0; TSKEY rowKey = 0;
TSKEY keyNext = 0; TSKEY fKey = 0;
bool isRowDel = false;
int filterIter = 0; int filterIter = 0;
SDataRow row = NULL;
memset(pMergeInfo, 0, sizeof(*pMergeInfo));
pMergeInfo->keyFirst = INT64_MAX;
pMergeInfo->keyLast = INT64_MIN;
if (pCols) tdResetDataCols(pCols);
row = tsdbNextIterRow(pIter);
if (row == NULL || dataRowKey(row) > maxKey) {
rowKey = INT64_MAX;
isRowDel = false;
} else {
rowKey = dataRowKey(row);
isRowDel = dataRowDeleted(row);
}
if (nFilterKeys != 0) { // for filter purpose if (filterIter >= nFilterKeys) {
ASSERT(filterKeys != NULL); fKey = INT64_MAX;
keyNext = tsdbNextIterKey(pIter); } else {
if (keyNext < 0 || keyNext > maxKey) return numOfRows; fKey = tdGetKey(filterKeys[filterIter]);
void *ptr = taosbsearch((void *)(&keyNext), (void *)filterKeys, nFilterKeys, sizeof(TSKEY), compTSKEY, TD_GE); }
filterIter = (ptr == NULL) ? nFilterKeys : (int)((POINTER_DISTANCE(ptr, filterKeys) / sizeof(TSKEY)));
} while (true) {
if (fKey == INT64_MAX && rowKey == INT64_MAX) break;
do {
SDataRow row = tsdbNextIterRow(pIter); if (fKey < rowKey) {
if (row == NULL) break; pMergeInfo->keyFirst = MIN(pMergeInfo->keyFirst, fKey);
pMergeInfo->keyLast = MAX(pMergeInfo->keyLast, fKey);
keyNext = dataRowKey(row);
if (keyNext > maxKey) break; filterIter++;
if (filterIter >= nFilterKeys) {
bool keyFiltered = false; fKey = INT64_MAX;
if (nFilterKeys != 0) { } else {
while (true) { fKey = tdGetKey(filterKeys[filterIter]);
if (filterIter >= nFilterKeys) break; }
if (keyNext == filterKeys[filterIter]) { } else if (fKey > rowKey) {
keyFiltered = true; if (isRowDel) {
filterIter++; pMergeInfo->rowsDeleteFailed++;
break; } else {
} else if (keyNext < filterKeys[filterIter]) { if (pMergeInfo->rowsInserted - pMergeInfo->rowsDeleteSucceed >= maxRowsToRead) break;
break; if (pCols && pMergeInfo->nOperations >= pCols->maxPoints) break;
pMergeInfo->rowsInserted++;
pMergeInfo->nOperations++;
pMergeInfo->keyFirst = MIN(pMergeInfo->keyFirst, rowKey);
pMergeInfo->keyLast = MAX(pMergeInfo->keyLast, rowKey);
tsdbAppendTableRowToCols(pTable, pCols, &pSchema, row);
}
tSkipListIterNext(pIter);
row = tsdbNextIterRow(pIter);
if (row == NULL || dataRowKey(row) > maxKey) {
rowKey = INT64_MAX;
isRowDel = false;
} else {
rowKey = dataRowKey(row);
isRowDel = dataRowDeleted(row);
}
} else {
if (isRowDel) {
ASSERT(!keepDup);
if (pCols && pMergeInfo->nOperations >= pCols->maxPoints) break;
pMergeInfo->rowsDeleteSucceed++;
pMergeInfo->nOperations++;
tsdbAppendTableRowToCols(pTable, pCols, &pSchema, row);
} else {
if (keepDup) {
if (pCols && pMergeInfo->nOperations >= pCols->maxPoints) break;
pMergeInfo->rowsUpdated++;
pMergeInfo->nOperations++;
pMergeInfo->keyFirst = MIN(pMergeInfo->keyFirst, rowKey);
pMergeInfo->keyLast = MAX(pMergeInfo->keyLast, rowKey);
tsdbAppendTableRowToCols(pTable, pCols, &pSchema, row);
} else { } else {
filterIter++; pMergeInfo->keyFirst = MIN(pMergeInfo->keyFirst, fKey);
pMergeInfo->keyLast = MAX(pMergeInfo->keyLast, fKey);
} }
} }
}
if (!keyFiltered) { tSkipListIterNext(pIter);
if (numOfRows >= maxRowsToRead) break; row = tsdbNextIterRow(pIter);
if (pCols) { if (row == NULL || dataRowKey(row) > maxKey) {
if (pSchema == NULL || schemaVersion(pSchema) != dataRowVersion(row)) { rowKey = INT64_MAX;
pSchema = tsdbGetTableSchemaImpl(pTable, false, false, dataRowVersion(row)); isRowDel = false;
if (pSchema == NULL) { } else {
ASSERT(0); rowKey = dataRowKey(row);
} isRowDel = dataRowDeleted(row);
} }
tdAppendDataRowToDataCol(row, pSchema, pCols); filterIter++;
if (filterIter >= nFilterKeys) {
fKey = INT64_MAX;
} else {
fKey = tdGetKey(filterKeys[filterIter]);
} }
numOfRows++;
} }
} while (tSkipListIterNext(pIter)); }
return numOfRows; return 0;
} }
// ---------------- LOCAL FUNCTIONS ---------------- // ---------------- LOCAL FUNCTIONS ----------------
...@@ -440,8 +503,9 @@ static STableData *tsdbNewTableData(STsdbCfg *pCfg, STable *pTable) { ...@@ -440,8 +503,9 @@ static STableData *tsdbNewTableData(STsdbCfg *pCfg, STable *pTable) {
pTableData->keyLast = 0; pTableData->keyLast = 0;
pTableData->numOfRows = 0; pTableData->numOfRows = 0;
pTableData->pData = tSkipListCreate(TSDB_DATA_SKIPLIST_LEVEL, TSDB_DATA_TYPE_TIMESTAMP, pTableData->pData =
TYPE_BYTES[TSDB_DATA_TYPE_TIMESTAMP], 0, 0, 1, tsdbGetTsTupleKey); tSkipListCreate(TSDB_DATA_SKIPLIST_LEVEL, TSDB_DATA_TYPE_TIMESTAMP, TYPE_BYTES[TSDB_DATA_TYPE_TIMESTAMP],
tkeyComparFn, pCfg->update ? SL_UPDATE_DUP_KEY : SL_DISCARD_DUP_KEY, tsdbGetTsTupleKey);
if (pTableData->pData == NULL) { if (pTableData->pData == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
goto _err; goto _err;
...@@ -461,7 +525,7 @@ static void tsdbFreeTableData(STableData *pTableData) { ...@@ -461,7 +525,7 @@ static void tsdbFreeTableData(STableData *pTableData) {
} }
} }
static char *tsdbGetTsTupleKey(const void *data) { return dataRowTuple(*(SDataRow *)data); } static char *tsdbGetTsTupleKey(const void *data) { return dataRowTuple((SDataRow)data); }
static void *tsdbCommitData(void *arg) { static void *tsdbCommitData(void *arg) {
STsdbRepo * pRepo = (STsdbRepo *)arg; STsdbRepo * pRepo = (STsdbRepo *)arg;
...@@ -583,7 +647,7 @@ static void tsdbEndCommit(STsdbRepo *pRepo) { ...@@ -583,7 +647,7 @@ static void tsdbEndCommit(STsdbRepo *pRepo) {
static int tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TSKEY maxKey) { static int tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TSKEY maxKey) {
for (int i = 0; i < nIters; i++) { for (int i = 0; i < nIters; i++) {
TSKEY nextKey = tsdbNextIterKey((iters + i)->pIter); TSKEY nextKey = tsdbNextIterKey((iters + i)->pIter);
if (nextKey > 0 && (nextKey >= minKey && nextKey <= maxKey)) return 1; if (nextKey != TSDB_DATA_TIMESTAMP_NULL && (nextKey >= minKey && nextKey <= maxKey)) return 1;
} }
return 0; return 0;
} }
...@@ -779,5 +843,21 @@ static int tsdbAdjustMemMaxTables(SMemTable *pMemTable, int maxTables) { ...@@ -779,5 +843,21 @@ static int tsdbAdjustMemMaxTables(SMemTable *pMemTable, int maxTables) {
taosTFree(tData); taosTFree(tData);
return 0;
}
static int tsdbAppendTableRowToCols(STable *pTable, SDataCols *pCols, STSchema **ppSchema, SDataRow row) {
if (pCols) {
if (*ppSchema == NULL || schemaVersion(*ppSchema) != dataRowVersion(row)) {
*ppSchema = tsdbGetTableSchemaImpl(pTable, false, false, dataRowVersion(row));
if (*ppSchema == NULL) {
ASSERT(false);
return -1;
}
}
tdAppendDataRowToDataCol(row, *ppSchema, pCols);
}
return 0; return 0;
} }
\ No newline at end of file
...@@ -86,7 +86,8 @@ int tsdbCreateTable(TSDB_REPO_T *repo, STableCfg *pCfg) { ...@@ -86,7 +86,8 @@ int tsdbCreateTable(TSDB_REPO_T *repo, STableCfg *pCfg) {
if (pTable != NULL) { if (pTable != NULL) {
tsdbError("vgId:%d table %s already exists, tid %d uid %" PRId64, REPO_ID(pRepo), TABLE_CHAR_NAME(pTable), tsdbError("vgId:%d table %s already exists, tid %d uid %" PRId64, REPO_ID(pRepo), TABLE_CHAR_NAME(pTable),
TABLE_TID(pTable), TABLE_UID(pTable)); TABLE_TID(pTable), TABLE_UID(pTable));
return TSDB_CODE_TDB_TABLE_ALREADY_EXIST; terrno = TSDB_CODE_TDB_TABLE_ALREADY_EXIST;
goto _err;
} }
if (pCfg->type == TSDB_CHILD_TABLE) { if (pCfg->type == TSDB_CHILD_TABLE) {
...@@ -643,7 +644,7 @@ static void tsdbOrgMeta(void *pHandle) { ...@@ -643,7 +644,7 @@ static void tsdbOrgMeta(void *pHandle) {
} }
static char *getTagIndexKey(const void *pData) { static char *getTagIndexKey(const void *pData) {
STable *pTable = *(STable **)pData; STable *pTable = (STable *)pData;
STSchema *pSchema = tsdbGetTableTagSchema(pTable); STSchema *pSchema = tsdbGetTableTagSchema(pTable);
STColumn *pCol = schemaColAt(pSchema, DEFAULT_TAG_INDEX_COLUMN); STColumn *pCol = schemaColAt(pSchema, DEFAULT_TAG_INDEX_COLUMN);
...@@ -700,7 +701,7 @@ static STable *tsdbCreateTableFromCfg(STableCfg *pCfg, bool isSuper) { ...@@ -700,7 +701,7 @@ static STable *tsdbCreateTableFromCfg(STableCfg *pCfg, bool isSuper) {
} }
pTable->tagVal = NULL; pTable->tagVal = NULL;
STColumn *pCol = schemaColAt(pTable->tagSchema, DEFAULT_TAG_INDEX_COLUMN); STColumn *pCol = schemaColAt(pTable->tagSchema, DEFAULT_TAG_INDEX_COLUMN);
pTable->pIndex = tSkipListCreate(TSDB_SUPER_TABLE_SL_LEVEL, colType(pCol), (uint8_t)(colBytes(pCol)), 1, 0, 1, getTagIndexKey); pTable->pIndex = tSkipListCreate(TSDB_SUPER_TABLE_SL_LEVEL, colType(pCol), (uint8_t)(colBytes(pCol)), NULL, SL_ALLOW_DUP_KEY, getTagIndexKey);
if (pTable->pIndex == NULL) { if (pTable->pIndex == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
goto _err; goto _err;
...@@ -900,23 +901,8 @@ static int tsdbAddTableIntoIndex(STsdbMeta *pMeta, STable *pTable, bool refSuper ...@@ -900,23 +901,8 @@ static int tsdbAddTableIntoIndex(STsdbMeta *pMeta, STable *pTable, bool refSuper
pTable->pSuper = pSTable; pTable->pSuper = pSTable;
int32_t level = 0; tSkipListPut(pSTable->pIndex, (void *)pTable);
int32_t headSize = 0;
tSkipListNewNodeInfo(pSTable->pIndex, &level, &headSize);
// NOTE: do not allocate the space for key, since in each skip list node, only keep the pointer to pTable, not the
// actual key value, and the key value will be retrieved during query through the pTable and getTagIndexKey function
SSkipListNode *pNode = calloc(1, headSize + sizeof(STable *));
if (pNode == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
pNode->level = level;
memcpy(SL_GET_NODE_DATA(pNode), &pTable, sizeof(STable *));
tSkipListPut(pSTable->pIndex, pNode);
if (refSuper) T_REF_INC(pSTable); if (refSuper) T_REF_INC(pSTable);
return 0; return 0;
} }
...@@ -940,7 +926,7 @@ static int tsdbRemoveTableFromIndex(STsdbMeta *pMeta, STable *pTable) { ...@@ -940,7 +926,7 @@ static int tsdbRemoveTableFromIndex(STsdbMeta *pMeta, STable *pTable) {
SSkipListNode *pNode = taosArrayGetP(res, i); SSkipListNode *pNode = taosArrayGetP(res, i);
// STableIndexElem* pElem = (STableIndexElem*) SL_GET_NODE_DATA(pNode); // STableIndexElem* pElem = (STableIndexElem*) SL_GET_NODE_DATA(pNode);
if (*(STable **)SL_GET_NODE_DATA(pNode) == pTable) { // this is the exact what we need if ((STable *)SL_GET_NODE_DATA(pNode) == pTable) { // this is the exact what we need
tSkipListRemoveNode(pSTable->pIndex, pNode); tSkipListRemoveNode(pSTable->pIndex, pNode);
} }
} }
...@@ -1170,8 +1156,8 @@ static void *tsdbDecodeTable(void *buf, STable **pRTable) { ...@@ -1170,8 +1156,8 @@ static void *tsdbDecodeTable(void *buf, STable **pRTable) {
if (TABLE_TYPE(pTable) == TSDB_SUPER_TABLE) { if (TABLE_TYPE(pTable) == TSDB_SUPER_TABLE) {
buf = tdDecodeSchema(buf, &(pTable->tagSchema)); buf = tdDecodeSchema(buf, &(pTable->tagSchema));
STColumn *pCol = schemaColAt(pTable->tagSchema, DEFAULT_TAG_INDEX_COLUMN); STColumn *pCol = schemaColAt(pTable->tagSchema, DEFAULT_TAG_INDEX_COLUMN);
pTable->pIndex = pTable->pIndex = tSkipListCreate(TSDB_SUPER_TABLE_SL_LEVEL, colType(pCol), (uint8_t)(colBytes(pCol)), NULL,
tSkipListCreate(TSDB_SUPER_TABLE_SL_LEVEL, colType(pCol), (uint8_t)(colBytes(pCol)), 1, 0, 1, getTagIndexKey); SL_ALLOW_DUP_KEY, getTagIndexKey);
if (pTable->pIndex == NULL) { if (pTable->pIndex == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
tsdbFreeTable(pTable); tsdbFreeTable(pTable);
...@@ -1197,7 +1183,7 @@ static int tsdbGetTableEncodeSize(int8_t act, STable *pTable) { ...@@ -1197,7 +1183,7 @@ static int tsdbGetTableEncodeSize(int8_t act, STable *pTable) {
tlen = sizeof(SListNode) + sizeof(SActObj) + sizeof(SActCont) + tsdbEncodeTable(NULL, pTable) + sizeof(TSCKSUM); tlen = sizeof(SListNode) + sizeof(SActObj) + sizeof(SActCont) + tsdbEncodeTable(NULL, pTable) + sizeof(TSCKSUM);
} else { } else {
if (TABLE_TYPE(pTable) == TSDB_SUPER_TABLE) { if (TABLE_TYPE(pTable) == TSDB_SUPER_TABLE) {
tlen = (int)((sizeof(SListNode) + sizeof(SActObj)) * (tSkipListGetSize(pTable->pIndex) + 1)); tlen = (int)((sizeof(SListNode) + sizeof(SActObj)) * (SL_SIZE(pTable->pIndex) + 1));
} else { } else {
tlen = sizeof(SListNode) + sizeof(SActObj); tlen = sizeof(SListNode) + sizeof(SActObj);
} }
...@@ -1244,7 +1230,7 @@ static int tsdbRemoveTableFromStore(STsdbRepo *pRepo, STable *pTable) { ...@@ -1244,7 +1230,7 @@ static int tsdbRemoveTableFromStore(STsdbRepo *pRepo, STable *pTable) {
} }
while (tSkipListIterNext(pIter)) { while (tSkipListIterNext(pIter)) {
STable *tTable = *(STable **)SL_GET_NODE_DATA(tSkipListIterGet(pIter)); STable *tTable = (STable *)SL_GET_NODE_DATA(tSkipListIterGet(pIter));
ASSERT(TABLE_TYPE(tTable) == TSDB_CHILD_TABLE); ASSERT(TABLE_TYPE(tTable) == TSDB_CHILD_TABLE);
pBuf = tsdbInsertTableAct(pRepo, TSDB_DROP_META, pBuf, tTable); pBuf = tsdbInsertTableAct(pRepo, TSDB_DROP_META, pBuf, tTable);
} }
...@@ -1269,7 +1255,7 @@ static int tsdbRmTableFromMeta(STsdbRepo *pRepo, STable *pTable) { ...@@ -1269,7 +1255,7 @@ static int tsdbRmTableFromMeta(STsdbRepo *pRepo, STable *pTable) {
tsdbWLockRepoMeta(pRepo); tsdbWLockRepoMeta(pRepo);
while (tSkipListIterNext(pIter)) { while (tSkipListIterNext(pIter)) {
STable *tTable = *(STable **)SL_GET_NODE_DATA(tSkipListIterGet(pIter)); STable *tTable = (STable *)SL_GET_NODE_DATA(tSkipListIterGet(pIter));
tsdbRemoveTableFromMeta(pRepo, tTable, false, false); tsdbRemoveTableFromMeta(pRepo, tTable, false, false);
} }
......
此差异已折叠。
...@@ -457,7 +457,7 @@ static bool initTableMemIterator(STsdbQueryHandle* pHandle, STableCheckInfo* pCh ...@@ -457,7 +457,7 @@ static bool initTableMemIterator(STsdbQueryHandle* pHandle, STableCheckInfo* pCh
SSkipListNode* node = tSkipListIterGet(pCheckInfo->iter); SSkipListNode* node = tSkipListIterGet(pCheckInfo->iter);
assert(node != NULL); assert(node != NULL);
SDataRow row = *(SDataRow *)SL_GET_NODE_DATA(node); SDataRow row = (SDataRow)SL_GET_NODE_DATA(node);
TSKEY key = dataRowKey(row); // first timestamp in buffer TSKEY key = dataRowKey(row); // first timestamp in buffer
tsdbDebug("%p uid:%" PRId64 ", tid:%d check data in mem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64 tsdbDebug("%p uid:%" PRId64 ", tid:%d check data in mem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64
"-%" PRId64 ", lastKey:%" PRId64 ", numOfRows:%"PRId64", %p", "-%" PRId64 ", lastKey:%" PRId64 ", numOfRows:%"PRId64", %p",
...@@ -479,7 +479,7 @@ static bool initTableMemIterator(STsdbQueryHandle* pHandle, STableCheckInfo* pCh ...@@ -479,7 +479,7 @@ static bool initTableMemIterator(STsdbQueryHandle* pHandle, STableCheckInfo* pCh
SSkipListNode* node = tSkipListIterGet(pCheckInfo->iiter); SSkipListNode* node = tSkipListIterGet(pCheckInfo->iiter);
assert(node != NULL); assert(node != NULL);
SDataRow row = *(SDataRow *)SL_GET_NODE_DATA(node); SDataRow row = (SDataRow)SL_GET_NODE_DATA(node);
TSKEY key = dataRowKey(row); // first timestamp in buffer TSKEY key = dataRowKey(row); // first timestamp in buffer
tsdbDebug("%p uid:%" PRId64 ", tid:%d check data in imem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64 tsdbDebug("%p uid:%" PRId64 ", tid:%d check data in imem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64
"-%" PRId64 ", lastKey:%" PRId64 ", numOfRows:%"PRId64", %p", "-%" PRId64 ", lastKey:%" PRId64 ", numOfRows:%"PRId64", %p",
...@@ -504,19 +504,19 @@ static void destroyTableMemIterator(STableCheckInfo* pCheckInfo) { ...@@ -504,19 +504,19 @@ static void destroyTableMemIterator(STableCheckInfo* pCheckInfo) {
tSkipListDestroyIter(pCheckInfo->iiter); tSkipListDestroyIter(pCheckInfo->iiter);
} }
static SDataRow getSDataRowInTableMem(STableCheckInfo* pCheckInfo, int32_t order) { static SDataRow getSDataRowInTableMem(STableCheckInfo* pCheckInfo, int32_t order, int32_t update) {
SDataRow rmem = NULL, rimem = NULL; SDataRow rmem = NULL, rimem = NULL;
if (pCheckInfo->iter) { if (pCheckInfo->iter) {
SSkipListNode* node = tSkipListIterGet(pCheckInfo->iter); SSkipListNode* node = tSkipListIterGet(pCheckInfo->iter);
if (node != NULL) { if (node != NULL) {
rmem = *(SDataRow *)SL_GET_NODE_DATA(node); rmem = (SDataRow)SL_GET_NODE_DATA(node);
} }
} }
if (pCheckInfo->iiter) { if (pCheckInfo->iiter) {
SSkipListNode* node = tSkipListIterGet(pCheckInfo->iiter); SSkipListNode* node = tSkipListIterGet(pCheckInfo->iiter);
if (node != NULL) { if (node != NULL) {
rimem = *(SDataRow *)SL_GET_NODE_DATA(node); rimem = (SDataRow)SL_GET_NODE_DATA(node);
} }
} }
...@@ -538,9 +538,15 @@ static SDataRow getSDataRowInTableMem(STableCheckInfo* pCheckInfo, int32_t order ...@@ -538,9 +538,15 @@ static SDataRow getSDataRowInTableMem(STableCheckInfo* pCheckInfo, int32_t order
TSKEY r2 = dataRowKey(rimem); TSKEY r2 = dataRowKey(rimem);
if (r1 == r2) { // data ts are duplicated, ignore the data in mem if (r1 == r2) { // data ts are duplicated, ignore the data in mem
tSkipListIterNext(pCheckInfo->iter); if (!update) {
pCheckInfo->chosen = 1; tSkipListIterNext(pCheckInfo->iter);
return rimem; pCheckInfo->chosen = 1;
return rimem;
} else {
tSkipListIterNext(pCheckInfo->iiter);
pCheckInfo->chosen = 0;
return rmem;
}
} else { } else {
if (ASCENDING_TRAVERSE(order)) { if (ASCENDING_TRAVERSE(order)) {
if (r1 < r2) { if (r1 < r2) {
...@@ -594,6 +600,7 @@ static bool moveToNextRowInMem(STableCheckInfo* pCheckInfo) { ...@@ -594,6 +600,7 @@ static bool moveToNextRowInMem(STableCheckInfo* pCheckInfo) {
} }
static bool hasMoreDataInCache(STsdbQueryHandle* pHandle) { static bool hasMoreDataInCache(STsdbQueryHandle* pHandle) {
STsdbCfg *pCfg = &pHandle->pTsdb->config;
size_t size = taosArrayGetSize(pHandle->pTableCheckInfo); size_t size = taosArrayGetSize(pHandle->pTableCheckInfo);
assert(pHandle->activeIndex < size && pHandle->activeIndex >= 0 && size >= 1); assert(pHandle->activeIndex < size && pHandle->activeIndex >= 0 && size >= 1);
pHandle->cur.fid = -1; pHandle->cur.fid = -1;
...@@ -607,7 +614,7 @@ static bool hasMoreDataInCache(STsdbQueryHandle* pHandle) { ...@@ -607,7 +614,7 @@ static bool hasMoreDataInCache(STsdbQueryHandle* pHandle) {
initTableMemIterator(pHandle, pCheckInfo); initTableMemIterator(pHandle, pCheckInfo);
} }
SDataRow row = getSDataRowInTableMem(pCheckInfo, pHandle->order); SDataRow row = getSDataRowInTableMem(pCheckInfo, pHandle->order, pCfg->update);
if (row == NULL) { if (row == NULL) {
return false; return false;
} }
...@@ -827,11 +834,12 @@ static void copyAllRemainRowsFromFileBlock(STsdbQueryHandle* pQueryHandle, STabl ...@@ -827,11 +834,12 @@ static void copyAllRemainRowsFromFileBlock(STsdbQueryHandle* pQueryHandle, STabl
static int32_t handleDataMergeIfNeeded(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock, STableCheckInfo* pCheckInfo){ static int32_t handleDataMergeIfNeeded(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock, STableCheckInfo* pCheckInfo){
SQueryFilePos* cur = &pQueryHandle->cur; SQueryFilePos* cur = &pQueryHandle->cur;
STsdbCfg* pCfg = &pQueryHandle->pTsdb->config;
SDataBlockInfo binfo = GET_FILE_DATA_BLOCK_INFO(pCheckInfo, pBlock); SDataBlockInfo binfo = GET_FILE_DATA_BLOCK_INFO(pCheckInfo, pBlock);
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
/*bool hasData = */ initTableMemIterator(pQueryHandle, pCheckInfo); /*bool hasData = */ initTableMemIterator(pQueryHandle, pCheckInfo);
SDataRow row = getSDataRowInTableMem(pCheckInfo, pQueryHandle->order); SDataRow row = getSDataRowInTableMem(pCheckInfo, pQueryHandle->order, pCfg->update);
assert(cur->pos >= 0 && cur->pos <= binfo.rows); assert(cur->pos >= 0 && cur->pos <= binfo.rows);
...@@ -1316,6 +1324,7 @@ int32_t getEndPosInDataBlock(STsdbQueryHandle* pQueryHandle, SDataBlockInfo* pBl ...@@ -1316,6 +1324,7 @@ int32_t getEndPosInDataBlock(STsdbQueryHandle* pQueryHandle, SDataBlockInfo* pBl
static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, SCompBlock* pBlock) { static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, SCompBlock* pBlock) {
SQueryFilePos* cur = &pQueryHandle->cur; SQueryFilePos* cur = &pQueryHandle->cur;
SDataBlockInfo blockInfo = GET_FILE_DATA_BLOCK_INFO(pCheckInfo, pBlock); SDataBlockInfo blockInfo = GET_FILE_DATA_BLOCK_INFO(pCheckInfo, pBlock);
STsdbCfg* pCfg = &pQueryHandle->pTsdb->config;
initTableMemIterator(pQueryHandle, pCheckInfo); initTableMemIterator(pQueryHandle, pCheckInfo);
...@@ -1353,7 +1362,7 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* ...@@ -1353,7 +1362,7 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
} else if (pCheckInfo->iter != NULL || pCheckInfo->iiter != NULL) { } else if (pCheckInfo->iter != NULL || pCheckInfo->iiter != NULL) {
SSkipListNode* node = NULL; SSkipListNode* node = NULL;
do { do {
SDataRow row = getSDataRowInTableMem(pCheckInfo, pQueryHandle->order); SDataRow row = getSDataRowInTableMem(pCheckInfo, pQueryHandle->order, pCfg->update);
if (row == NULL) { if (row == NULL) {
break; break;
} }
...@@ -1383,7 +1392,22 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* ...@@ -1383,7 +1392,22 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
moveToNextRowInMem(pCheckInfo); moveToNextRowInMem(pCheckInfo);
} else if (key == tsArray[pos]) { // data in buffer has the same timestamp of data in file block, ignore it } else if (key == tsArray[pos]) { // data in buffer has the same timestamp of data in file block, ignore it
moveToNextRowInMem(pCheckInfo); if (pCfg->update) {
copyOneRowFromMem(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, row, numOfCols, pTable);
numOfRows += 1;
if (cur->win.skey == TSKEY_INITIAL_VAL) {
cur->win.skey = key;
}
cur->win.ekey = key;
cur->lastKey = key + step;
cur->mixBlock = true;
moveToNextRowInMem(pCheckInfo);
pos += step;
} else {
moveToNextRowInMem(pCheckInfo);
}
} else if ((key > tsArray[pos] && ASCENDING_TRAVERSE(pQueryHandle->order)) || } else if ((key > tsArray[pos] && ASCENDING_TRAVERSE(pQueryHandle->order)) ||
(key < tsArray[pos] && !ASCENDING_TRAVERSE(pQueryHandle->order))) { (key < tsArray[pos] && !ASCENDING_TRAVERSE(pQueryHandle->order))) {
if (cur->win.skey == TSKEY_INITIAL_VAL) { if (cur->win.skey == TSKEY_INITIAL_VAL) {
...@@ -1394,7 +1418,11 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* ...@@ -1394,7 +1418,11 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
assert(end != -1); assert(end != -1);
if (tsArray[end] == key) { // the value of key in cache equals to the end timestamp value, ignore it if (tsArray[end] == key) { // the value of key in cache equals to the end timestamp value, ignore it
moveToNextRowInMem(pCheckInfo); if (!pCfg->update) {
moveToNextRowInMem(pCheckInfo);
} else {
end -= step;
}
} }
int32_t qstart = 0, qend = 0; int32_t qstart = 0, qend = 0;
...@@ -1414,8 +1442,8 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* ...@@ -1414,8 +1442,8 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
* copy them all to result buffer, since it may be overlapped with file data block. * copy them all to result buffer, since it may be overlapped with file data block.
*/ */
if (node == NULL || if (node == NULL ||
((dataRowKey(*(SDataRow *)SL_GET_NODE_DATA(node)) > pQueryHandle->window.ekey) && ASCENDING_TRAVERSE(pQueryHandle->order)) || ((dataRowKey((SDataRow)SL_GET_NODE_DATA(node)) > pQueryHandle->window.ekey) && ASCENDING_TRAVERSE(pQueryHandle->order)) ||
((dataRowKey(*(SDataRow *)SL_GET_NODE_DATA(node)) < pQueryHandle->window.ekey) && !ASCENDING_TRAVERSE(pQueryHandle->order))) { ((dataRowKey((SDataRow)SL_GET_NODE_DATA(node)) < pQueryHandle->window.ekey) && !ASCENDING_TRAVERSE(pQueryHandle->order))) {
// no data in cache or data in cache is greater than the ekey of time window, load data from file block // no data in cache or data in cache is greater than the ekey of time window, load data from file block
if (cur->win.skey == TSKEY_INITIAL_VAL) { if (cur->win.skey == TSKEY_INITIAL_VAL) {
cur->win.skey = tsArray[pos]; cur->win.skey = tsArray[pos];
...@@ -1862,13 +1890,14 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int ...@@ -1862,13 +1890,14 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int
STsdbQueryHandle* pQueryHandle) { STsdbQueryHandle* pQueryHandle) {
int numOfRows = 0; int numOfRows = 0;
int32_t numOfCols = (int32_t)taosArrayGetSize(pQueryHandle->pColumns); int32_t numOfCols = (int32_t)taosArrayGetSize(pQueryHandle->pColumns);
STsdbCfg* pCfg = &pQueryHandle->pTsdb->config;
win->skey = TSKEY_INITIAL_VAL; win->skey = TSKEY_INITIAL_VAL;
int64_t st = taosGetTimestampUs(); int64_t st = taosGetTimestampUs();
STable* pTable = pCheckInfo->pTableObj; STable* pTable = pCheckInfo->pTableObj;
do { do {
SDataRow row = getSDataRowInTableMem(pCheckInfo, pQueryHandle->order); SDataRow row = getSDataRowInTableMem(pCheckInfo, pQueryHandle->order, pCfg->update);
if (row == NULL) { if (row == NULL) {
break; break;
} }
...@@ -1919,9 +1948,9 @@ static int32_t getAllTableList(STable* pSuperTable, SArray* list) { ...@@ -1919,9 +1948,9 @@ static int32_t getAllTableList(STable* pSuperTable, SArray* list) {
while (tSkipListIterNext(iter)) { while (tSkipListIterNext(iter)) {
SSkipListNode* pNode = tSkipListIterGet(iter); SSkipListNode* pNode = tSkipListIterGet(iter);
STable** pTable = (STable**) SL_GET_NODE_DATA((SSkipListNode*) pNode); STable* pTable = (STable*) SL_GET_NODE_DATA((SSkipListNode*) pNode);
STableKeyInfo info = {.pTable = *pTable, .lastKey = TSKEY_INITIAL_VAL}; STableKeyInfo info = {.pTable = pTable, .lastKey = TSKEY_INITIAL_VAL};
taosArrayPush(list, &info); taosArrayPush(list, &info);
} }
...@@ -2464,7 +2493,7 @@ SArray* createTableGroup(SArray* pTableList, STSchema* pTagSchema, SColIndex* pC ...@@ -2464,7 +2493,7 @@ SArray* createTableGroup(SArray* pTableList, STSchema* pTagSchema, SColIndex* pC
static bool indexedNodeFilterFp(const void* pNode, void* param) { static bool indexedNodeFilterFp(const void* pNode, void* param) {
tQueryInfo* pInfo = (tQueryInfo*) param; tQueryInfo* pInfo = (tQueryInfo*) param;
STable* pTable = *(STable**)(SL_GET_NODE_DATA((SSkipListNode*)pNode)); STable* pTable = (STable*)(SL_GET_NODE_DATA((SSkipListNode*)pNode));
char* val = NULL; char* val = NULL;
......
...@@ -8,7 +8,7 @@ TARGET_LINK_LIBRARIES(tutil pthread osdetail lz4 z) ...@@ -8,7 +8,7 @@ TARGET_LINK_LIBRARIES(tutil pthread osdetail lz4 z)
IF (TD_LINUX) IF (TD_LINUX)
TARGET_LINK_LIBRARIES(tutil m rt) TARGET_LINK_LIBRARIES(tutil m rt)
ADD_SUBDIRECTORY(tests) # ADD_SUBDIRECTORY(tests)
FIND_PATH(ICONV_INCLUDE_EXIST iconv.h /usr/include/ /usr/local/include/) FIND_PATH(ICONV_INCLUDE_EXIST iconv.h /usr/include/ /usr/local/include/)
IF (ICONV_INCLUDE_EXIST) IF (ICONV_INCLUDE_EXIST)
......
此差异已折叠。
此差异已折叠。
...@@ -247,7 +247,7 @@ void skiplistPerformanceTest() { ...@@ -247,7 +247,7 @@ void skiplistPerformanceTest() {
printf("total:%" PRIu64 " ms, avg:%f\n", e - s, (e - s) / (double)size); printf("total:%" PRIu64 " ms, avg:%f\n", e - s, (e - s) / (double)size);
printf("max level of skiplist:%d, actually level:%d\n ", pSkipList->maxLevel, pSkipList->level); printf("max level of skiplist:%d, actually level:%d\n ", pSkipList->maxLevel, pSkipList->level);
assert(tSkipListGetSize(pSkipList) == size); assert(SL_GET_SIZE(pSkipList) == size);
// printf("the level of skiplist is:\n"); // printf("the level of skiplist is:\n");
// //
...@@ -273,7 +273,7 @@ void skiplistPerformanceTest() { ...@@ -273,7 +273,7 @@ void skiplistPerformanceTest() {
int64_t et = taosGetTimestampMs(); int64_t et = taosGetTimestampMs();
printf("delete %d data from skiplist, elapased time:%" PRIu64 "ms\n", 10000, et - st); printf("delete %d data from skiplist, elapased time:%" PRIu64 "ms\n", 10000, et - st);
assert(tSkipListGetSize(pSkipList) == size); assert(SL_GET_SIZE(pSkipList) == size);
tSkipListDestroy(pSkipList); tSkipListDestroy(pSkipList);
taosTFree(total); taosTFree(total);
......
...@@ -140,6 +140,7 @@ int32_t vnodeCreate(SCreateVnodeMsg *pVnodeCfg) { ...@@ -140,6 +140,7 @@ int32_t vnodeCreate(SCreateVnodeMsg *pVnodeCfg) {
tsdbCfg.maxRowsPerFileBlock = pVnodeCfg->cfg.maxRowsPerFileBlock; tsdbCfg.maxRowsPerFileBlock = pVnodeCfg->cfg.maxRowsPerFileBlock;
tsdbCfg.precision = pVnodeCfg->cfg.precision; tsdbCfg.precision = pVnodeCfg->cfg.precision;
tsdbCfg.compression = pVnodeCfg->cfg.compression; tsdbCfg.compression = pVnodeCfg->cfg.compression;
tsdbCfg.update = pVnodeCfg->cfg.update;
char tsdbDir[TSDB_FILENAME_LEN] = {0}; char tsdbDir[TSDB_FILENAME_LEN] = {0};
sprintf(tsdbDir, "%s/vnode%d/tsdb", tsVnodeDir, pVnodeCfg->cfg.vgId); sprintf(tsdbDir, "%s/vnode%d/tsdb", tsVnodeDir, pVnodeCfg->cfg.vgId);
......
# -*- coding: utf-8 -*-
import random
import string
import subprocess
import sys
from util.log import *
from util.cases import *
from util.sql import *
class TDTestCase:
def init(self, conn, logSql):
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor(), logSql)
def run(self):
tdLog.debug("check database")
tdSql.prepare()
# check default update value
sql = "create database if not exists db"
tdSql.execute(sql)
tdSql.query('show databases')
tdSql.checkRows(1)
tdSql.checkData(0,16,0)
sql = "alter database db update 1"
# check update value
tdSql.execute(sql)
tdSql.query('show databases')
tdSql.checkRows(1)
tdSql.checkData(0,16,1)
sql = "alter database db update 0"
tdSql.execute(sql)
tdSql.query('show databases')
tdSql.checkRows(1)
tdSql.checkData(0,16,0)
sql = "alter database db update -1"
tdSql.error(sql)
sql = "alter database db update 100"
tdSql.error(sql)
tdSql.query('show databases')
tdSql.checkRows(1)
tdSql.checkData(0,16,0)
tdSql.execute('drop database db')
tdSql.error('create database db update 100')
tdSql.error('create database db update -1')
tdSql.execute('create database db update 1')
tdSql.query('show databases')
tdSql.checkRows(1)
tdSql.checkData(0,16,1)
tdSql.execute('drop database db')
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
# update
python3 ./test.py -f update/allow_update.py
python3 ./test.py -f update/allow_update-0.py
python3 ./test.py -f update/append_commit_data.py
python3 ./test.py -f update/append_commit_last-0.py
python3 ./test.py -f update/append_commit_last.py
python3 ./test.py -f update/merge_commit_data.py
python3 ./test.py -f update/merge_commit_data2.py
python3 ./test.py -f update/merge_commit_last-0.py
python3 ./test.py -f update/merge_commit_last.py
\ No newline at end of file
...@@ -81,7 +81,7 @@ print =============== step2 - no db ...@@ -81,7 +81,7 @@ print =============== step2 - no db
#11 #11
system_content curl -H 'Authorization: Taosd /KfeAzX/f9na8qdtNZmtONryp201ma04bEl8LcvLUd7a8qdtNZmtONryp201ma04' -d 'show databases' 127.0.0.1:7111/rest/sql system_content curl -H 'Authorization: Taosd /KfeAzX/f9na8qdtNZmtONryp201ma04bEl8LcvLUd7a8qdtNZmtONryp201ma04' -d 'show databases' 127.0.0.1:7111/rest/sql
print 11-> $system_content print 11-> $system_content
if $system_content != @{"status":"succ","head":["name","created_time","ntables","vgroups","replica","quorum","days","keep1,keep2,keep(D)","cache(MB)","blocks","minrows","maxrows","wallevel","fsync","comp","precision","status"],"data":[],"rows":0}@ then if $system_content != @{"status":"succ","head":["name","created_time","ntables","vgroups","replica","quorum","days","keep1,keep2,keep(D)","cache(MB)","blocks","minrows","maxrows","wallevel","fsync","comp","precision","update","status"],"data":[],"rows":0}@ then
return -1 return -1
endi endi
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册