提交 7edcbfd5 编写于 作者: S Shengliang Guan

Merge branch 'fix/stopFunc' into feature/3_liaohj

......@@ -75,10 +75,10 @@ database_option: {
- TABLE_PREFIX:The prefix length in the table name that is ignored when distributing table to vnode based on table name.
- TABLE_SUFFIX:The suffix length in the table name that is ignored when distributing table to vnode based on table name.
- TSDB_PAGESIZE: The page size of the data storage engine in a vnode. The unit is KB. The default is 4 KB. The range is 1 to 16384, that is, 1 KB to 16 MB.
- WAL_RETENTION_PERIOD: specifies the time after which WAL files are deleted. This parameter is used for data subscription. Enter a time in seconds. The default value of single copy is 0. A value of 0 indicates that each WAL file is deleted immediately after its contents are written to disk. -1: WAL files are never deleted. The default value of multiple copy is 4 days.
- WAL_RETENTION_SIZE: specifies the size at which WAL files are deleted. This parameter is used for data subscription. Enter a size in KB. The default value of single copy is 0. A value of 0 indicates that each WAL file is deleted immediately after its contents are written to disk. -1: WAL files are never deleted. The default value of multiple copy is -1.
- WAL_ROLL_PERIOD: specifies the time after which WAL files are rotated. After this period elapses, a new WAL file is created. The default value of single copy is 0. A value of 0 indicates that a new WAL file is created only after the previous WAL file was written to disk. The default values of multiple copy is 1 day.
- WAL_SEGMENT_SIZE: specifies the maximum size of a WAL file. After the current WAL file reaches this size, a new WAL file is created. The default value is 0. A value of 0 indicates that a new WAL file is created only after the previous WAL file was written to disk.
- WAL_RETENTION_PERIOD: specifies the maximum time of which WAL files are to be kept after consumption. This parameter is used for data subscription. Enter a time in seconds. The default value 0. A value of 0 indicates that WAL files are not required to keep after consumption. -1: the time of WAL files to keep has no upper limit.
- WAL_RETENTION_SIZE: specifies the maximum total size of which WAL files are to be kept after consumption. This parameter is used for data subscription. Enter a size in KB. The default value is 0. A value of 0 indicates that WAL files are not required to keep after consumption. -1: the total size of WAL files to keep has no upper limit.
- WAL_ROLL_PERIOD: specifies the time after which WAL files are rotated. After this period elapses, a new WAL file is created. The default value is 0. A value of 0 indicates that a new WAL file is created only after TSDB data in memory are flushed to disk.
- WAL_SEGMENT_SIZE: specifies the maximum size of a WAL file. After the current WAL file reaches this size, a new WAL file is created. The default value is 0. A value of 0 indicates that a new WAL file is created only after TSDB data in memory are flushed to disk.
### Example Statement
......
......@@ -75,10 +75,10 @@ database_option: {
- TABLE_PREFIX:内部存储引擎根据表名分配存储该表数据的 VNODE 时要忽略的前缀的长度。
- TABLE_SUFFIX:内部存储引擎根据表名分配存储该表数据的 VNODE 时要忽略的后缀的长度。
- TSDB_PAGESIZE:一个 VNODE 中时序数据存储引擎的页大小,单位为 KB,默认为 4 KB。范围为 1 到 16384,即 1 KB到 16 MB。
- WAL_RETENTION_PERIOD:wal 文件的额外保留策略,用于数据订阅。wal 的保存时长,单位为 s。单副本默认为 0,即落盘后立即删除。-1 表示不删除。多副本默认为 4 天
- WAL_RETENTION_SIZE:wal 文件的额外保留策略,用于数据订阅。wal 的保存的最大上限,单位为 KB。单副本默认为 0,即落盘后立即删除。多副本默认为-1,表示不删除
- WAL_ROLL_PERIOD:wal 文件切换时长,单位为 s。当 wal 文件创建并写入后,经过该时间,会自动创建一个新的 wal 文件。单副本默认为 0,即仅在落盘时创建新文件。多副本默认为 1 天
- WAL_SEGMENT_SIZE:wal 单个文件大小,单位为 KB。当前写入文件大小超过上限后会自动创建一个新的 wal 文件。默认为 0,即仅在落盘时创建新文件。
- WAL_RETENTION_PERIOD:数据订阅已消费WAL日志,WAL文件的最大额外保留的时长策略。单位为 s。默认为 0,表示无需额外保留。-1, 表示额外保留,时间无上限
- WAL_RETENTION_SIZE:数据订阅已消费WAL日志,WAL文件的最大额外保留的累计大小策略。单位为 KB。默认为 0,表示无需额外保留。-1, 表示额外保留,累计大小无上限
- WAL_ROLL_PERIOD:wal 文件切换时长,单位为 s。当WAL文件创建并写入后,经过该时间,会自动创建一个新的WAL文件。默认为 0,即仅在TSDB落盘时创建新文件
- WAL_SEGMENT_SIZE:wal 单个文件大小,单位为 KB。当前写入文件大小超过上限后会自动创建一个新的WAL文件。默认为 0,即仅在TSDB落盘时创建新文件。
### 创建数据库示例
......
......@@ -76,6 +76,7 @@ extern int64_t tsVndCommitMaxIntervalMs;
// mnode
extern int64_t tsMndSdbWriteDelta;
extern int64_t tsMndLogRetention;
// monitor
extern bool tsEnableMonitor;
......
......@@ -41,14 +41,15 @@ extern "C" {
#define SHOW_LOCAL_VARIABLES_RESULT_FIELD2_LEN (TSDB_CONFIG_VALUE_LEN + VARSTR_HEADER_SIZE)
#define SHOW_ALIVE_RESULT_COLS 1
#define PRIVILEGE_TYPE_MASK(n) (1 << n)
#define PRIVILEGE_TYPE_ALL PRIVILEGE_TYPE_MASK(0)
#define PRIVILEGE_TYPE_READ PRIVILEGE_TYPE_MASK(1)
#define PRIVILEGE_TYPE_WRITE PRIVILEGE_TYPE_MASK(2)
#define PRIVILEGE_TYPE_SUBSCRIBE PRIVILEGE_TYPE_MASK(3)
#define BIT_FLAG_MASK(n) (1 << n)
#define BIT_FLAG_SET_MASK(val, mask) ((val) |= (mask))
#define BIT_FLAG_TEST_MASK(val, mask) (((val) & (mask)) != 0)
#define PRIVILEGE_TYPE_TEST_MASK(val, mask) (((val) & (mask)) != 0)
#define PRIVILEGE_TYPE_ALL BIT_FLAG_MASK(0)
#define PRIVILEGE_TYPE_READ BIT_FLAG_MASK(1)
#define PRIVILEGE_TYPE_WRITE BIT_FLAG_MASK(2)
#define PRIVILEGE_TYPE_SUBSCRIBE BIT_FLAG_MASK(3)
typedef struct SDatabaseOptions {
ENodeType type;
......@@ -393,6 +394,15 @@ typedef struct SKillQueryStmt {
char queryId[TSDB_QUERY_ID_LEN];
} SKillQueryStmt;
typedef enum EStreamOptionsSetFlag {
SOPT_TRIGGER_TYPE_SET = BIT_FLAG_MASK(0),
SOPT_WATERMARK_SET = BIT_FLAG_MASK(1),
SOPT_DELETE_MARK_SET = BIT_FLAG_MASK(2),
SOPT_FILL_HISTORY_SET = BIT_FLAG_MASK(3),
SOPT_IGNORE_EXPIRED_SET = BIT_FLAG_MASK(4),
SOPT_IGNORE_UPDATE_SET = BIT_FLAG_MASK(5),
} EStreamOptionsSetFlag;
typedef struct SStreamOptions {
ENodeType type;
int8_t triggerType;
......@@ -402,6 +412,7 @@ typedef struct SStreamOptions {
int8_t fillHistory;
int8_t ignoreExpired;
int8_t ignoreUpdate;
int64_t setFlag;
} SStreamOptions;
typedef struct SCreateStreamStmt {
......
......@@ -35,7 +35,6 @@ extern "C" {
#define SYNC_MAX_RECV_TIME_RANGE_MS 1200
#define SYNC_DEL_WAL_MS (1000 * 60)
#define SYNC_ADD_QUORUM_COUNT 3
#define SYNC_MNODE_LOG_RETENTION 10000
#define SYNC_VNODE_LOG_RETENTION (TSDB_SYNC_LOG_BUFFER_RETENTION + 1)
#define SNAPSHOT_MAX_CLOCK_SKEW_MS 1000 * 10
#define SNAPSHOT_WAIT_MS 1000 * 30
......
......@@ -190,7 +190,7 @@ typedef struct {
//
SArray *preLineTagKV;
SArray *maxTagKVs;
SArray *preLineColKV;
SArray *masColKVs;
SSmlLineInfo preLine;
STableMeta *currSTableMeta;
......
......@@ -1073,8 +1073,6 @@ void smlDestroyInfo(SSmlHandle *info) {
taosArrayDestroy(info->valueJsonArray);
taosArrayDestroy(info->preLineTagKV);
taosArrayDestroy(info->maxTagKVs);
taosArrayDestroy(info->preLineColKV);
if (!info->dataFormat) {
for (int i = 0; i < info->lineNum; i++) {
......@@ -1117,8 +1115,6 @@ SSmlHandle *smlBuildSmlInfo(TAOS *taos) {
info->tagJsonArray = taosArrayInit(8, POINTER_BYTES);
info->valueJsonArray = taosArrayInit(8, POINTER_BYTES);
info->preLineTagKV = taosArrayInit(8, sizeof(SSmlKv));
info->maxTagKVs = taosArrayInit(8, sizeof(SSmlKv));
info->preLineColKV = taosArrayInit(8, sizeof(SSmlKv));
if (NULL == info->pVgHash || NULL == info->childTables || NULL == info->superTables) {
uError("create SSmlHandle failed");
......@@ -1141,6 +1137,7 @@ static int32_t smlPushCols(SArray *colsArray, SArray *cols) {
for (size_t i = 0; i < taosArrayGetSize(cols); i++) {
SSmlKv *kv = (SSmlKv *)taosArrayGet(cols, i);
taosHashPut(kvHash, kv->key, kv->keyLen, &kv, POINTER_BYTES);
if(terrno == TSDB_CODE_DUP_KEY){return terrno;}
}
taosArrayPush(colsArray, &kvHash);
......@@ -1204,6 +1201,7 @@ static int32_t smlParseLineBottom(SSmlHandle *info) {
SSmlSTableMeta *meta = smlBuildSTableMeta(info->dataFormat);
smlInsertMeta(meta->tagHash, meta->tags, tinfo->tags);
if(terrno == TSDB_CODE_DUP_KEY){return terrno;}
smlInsertMeta(meta->colHash, meta->cols, elements->colArray);
taosHashPut(info->superTables, elements->measure, elements->measureLen, &meta, POINTER_BYTES);
}
......
......@@ -683,13 +683,10 @@ static int32_t smlParseTagsFromJSON(SSmlHandle *info, cJSON *tags, SSmlLineInfo
int cnt = 0;
SArray *preLineKV = info->preLineTagKV;
SArray *maxKVs = info->maxTagKVs;
bool isSuperKVInit = true;
SArray *superKV = NULL;
if (info->dataFormat) {
if (unlikely(!isSameMeasure)) {
SSmlSTableMeta **tmp = (SSmlSTableMeta **)taosHashGet(info->superTables, elements->measure, elements->measureLen);
SSmlSTableMeta *sMeta = NULL;
SSmlSTableMeta *sMeta = NULL;
if (unlikely(tmp == NULL)) {
STableMeta *pTableMeta = smlGetMeta(info, elements->measure, elements->measureLen);
if (pTableMeta == NULL) {
......@@ -700,18 +697,16 @@ static int32_t smlParseTagsFromJSON(SSmlHandle *info, cJSON *tags, SSmlLineInfo
sMeta = smlBuildSTableMeta(info->dataFormat);
sMeta->tableMeta = pTableMeta;
taosHashPut(info->superTables, elements->measure, elements->measureLen, &sMeta, POINTER_BYTES);
for(int i = pTableMeta->tableInfo.numOfColumns; i < pTableMeta->tableInfo.numOfTags + pTableMeta->tableInfo.numOfColumns; i++){
SSchema *tag = pTableMeta->schema + i;
SSmlKv kv = {.key = tag->name, .keyLen = strlen(tag->name), .type = tag->type, .length = (tag->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE };
taosArrayPush(sMeta->tags, &kv);
}
tmp = &sMeta;
}
info->currSTableMeta = (*tmp)->tableMeta;
superKV = (*tmp)->tags;
if (unlikely(taosArrayGetSize(superKV) == 0)) {
isSuperKVInit = false;
}
taosArrayClear(maxKVs);
info->maxTagKVs = (*tmp)->tags;
}
} else {
taosArrayClear(maxKVs);
}
taosArrayClear(preLineKV);
......@@ -747,58 +742,21 @@ static int32_t smlParseTagsFromJSON(SSmlHandle *info, cJSON *tags, SSmlLineInfo
return TSDB_CODE_SUCCESS;
}
if (isSameMeasure) {
if (unlikely(cnt >= taosArrayGetSize(maxKVs))) {
info->dataFormat = false;
info->reRun = true;
return TSDB_CODE_SUCCESS;
}
SSmlKv *maxKV = (SSmlKv *)taosArrayGet(maxKVs, cnt);
if (unlikely(kv.length > maxKV->length)) {
maxKV->length = kv.length;
SSmlSTableMeta **tableMeta =
(SSmlSTableMeta **)taosHashGet(info->superTables, elements->measure, elements->measureLen);
if (unlikely(NULL == tableMeta)) {
uError("SML:0x%" PRIx64 " NULL == tableMeta", info->id);
return TSDB_CODE_SML_INTERNAL_ERROR;
}
SSmlKv *oldKV = (SSmlKv *)taosArrayGet((*tableMeta)->tags, cnt);
oldKV->length = kv.length;
info->needModifySchema = true;
}
if (unlikely(!IS_SAME_KEY)) {
info->dataFormat = false;
info->reRun = true;
return TSDB_CODE_SUCCESS;
}
} else {
if (isSuperKVInit) {
if (unlikely(cnt >= taosArrayGetSize(superKV))) {
info->dataFormat = false;
info->reRun = true;
return TSDB_CODE_SUCCESS;
}
SSmlKv *maxKV = (SSmlKv *)taosArrayGet(superKV, cnt);
if (unlikely(kv.length > maxKV->length)) {
maxKV->length = kv.length;
} else {
kv.length = maxKV->length;
}
info->needModifySchema = true;
if (unlikely(!IS_SAME_KEY)) {
info->dataFormat = false;
info->reRun = true;
return TSDB_CODE_SUCCESS;
}
} else {
taosArrayPush(superKV, &kv);
}
taosArrayPush(maxKVs, &kv);
if (unlikely(cnt >= taosArrayGetSize(info->maxTagKVs))) {
info->dataFormat = false;
info->reRun = true;
return TSDB_CODE_SUCCESS;
}
SSmlKv *maxKV = (SSmlKv *)taosArrayGet(info->maxTagKVs, cnt);
if (unlikely(!IS_SAME_KEY)) {
info->dataFormat = false;
info->reRun = true;
return TSDB_CODE_SUCCESS;
}
if (unlikely(kv.length > maxKV->length)) {
maxKV->length = kv.length;
info->needModifySchema = true;
}
} else {
taosArrayPush(maxKVs, &kv);
}
taosArrayPush(preLineKV, &kv);
cnt++;
......@@ -1214,7 +1172,7 @@ static int32_t smlParseJSONString(SSmlHandle *info, char **start, SSmlLineInfo *
return TSDB_CODE_INVALID_TIMESTAMP;
} else if (elements->timestamp[0] == '{') {
char tmp = elements->timestamp[elements->timestampLen];
elements->cols[elements->timestampLen] = '\0';
elements->timestamp[elements->timestampLen] = '\0';
cJSON *tsJson = cJSON_Parse(elements->timestamp);
ts = smlParseTSFromJSON(info, tsJson);
if (unlikely(ts < 0)) {
......
......@@ -144,13 +144,11 @@ static int32_t smlParseTagKv(SSmlHandle *info, char **sql, char *sqlEnd, SSmlLin
int cnt = 0;
SArray *preLineKV = info->preLineTagKV;
SArray *maxKVs = info->maxTagKVs;
bool isSuperKVInit = true;
SArray *superKV = NULL;
if (info->dataFormat) {
if (unlikely(!isSameMeasure)) {
SSmlSTableMeta **tmp =
(SSmlSTableMeta **)taosHashGet(info->superTables, currElement->measure, currElement->measureLen);
SSmlSTableMeta *sMeta = NULL;
if (unlikely(tmp == NULL)) {
STableMeta *pTableMeta = smlGetMeta(info, currElement->measure, currElement->measureLen);
......@@ -162,18 +160,16 @@ static int32_t smlParseTagKv(SSmlHandle *info, char **sql, char *sqlEnd, SSmlLin
sMeta = smlBuildSTableMeta(info->dataFormat);
sMeta->tableMeta = pTableMeta;
taosHashPut(info->superTables, currElement->measure, currElement->measureLen, &sMeta, POINTER_BYTES);
for(int i = pTableMeta->tableInfo.numOfColumns; i < pTableMeta->tableInfo.numOfTags + pTableMeta->tableInfo.numOfColumns; i++){
SSchema *tag = pTableMeta->schema + i;
SSmlKv kv = {.key = tag->name, .keyLen = strlen(tag->name), .type = tag->type, .length = (tag->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE };
taosArrayPush(sMeta->tags, &kv);
}
tmp = &sMeta;
}
info->currSTableMeta = (*tmp)->tableMeta;
superKV = (*tmp)->tags;
if (unlikely(taosArrayGetSize(superKV) == 0)) {
isSuperKVInit = false;
}
taosArrayClear(maxKVs);
info->maxTagKVs = (*tmp)->tags;
}
} else {
taosArrayClear(maxKVs);
}
taosArrayClear(preLineKV);
......@@ -252,58 +248,23 @@ static int32_t smlParseTagKv(SSmlHandle *info, char **sql, char *sqlEnd, SSmlLin
return TSDB_CODE_SUCCESS;
}
if (isSameMeasure) {
if (unlikely(cnt >= taosArrayGetSize(maxKVs))) {
info->dataFormat = false;
info->reRun = true;
return TSDB_CODE_SUCCESS;
}
SSmlKv *maxKV = (SSmlKv *)taosArrayGet(maxKVs, cnt);
if (unlikely(kv.length > maxKV->length)) {
maxKV->length = kv.length;
SSmlSTableMeta **tableMeta =
(SSmlSTableMeta **)taosHashGet(info->superTables, currElement->measure, currElement->measureLen);
if (unlikely(NULL == tableMeta)) {
uError("SML:0x%" PRIx64 " NULL == tableMeta", info->id);
return TSDB_CODE_SML_INTERNAL_ERROR;
}
if (unlikely(cnt >= taosArrayGetSize(info->maxTagKVs))) {
info->dataFormat = false;
info->reRun = true;
return TSDB_CODE_SUCCESS;
}
SSmlKv *maxKV = (SSmlKv *)taosArrayGet(info->maxTagKVs, cnt);
SSmlKv *oldKV = (SSmlKv *)taosArrayGet((*tableMeta)->tags, cnt);
oldKV->length = kv.length;
info->needModifySchema = true;
}
if (unlikely(!IS_SAME_KEY)) {
info->dataFormat = false;
info->reRun = true;
return TSDB_CODE_SUCCESS;
}
} else {
if (isSuperKVInit) {
if (unlikely(cnt >= taosArrayGetSize(superKV))) {
info->dataFormat = false;
info->reRun = true;
return TSDB_CODE_SUCCESS;
}
SSmlKv *maxKV = (SSmlKv *)taosArrayGet(superKV, cnt);
if (unlikely(kv.length > maxKV->length)) {
maxKV->length = kv.length;
} else {
kv.length = maxKV->length;
}
info->needModifySchema = true;
if (unlikely(!IS_SAME_KEY)) {
info->dataFormat = false;
info->reRun = true;
return TSDB_CODE_SUCCESS;
}
if (unlikely(!IS_SAME_KEY)) {
info->dataFormat = false;
info->reRun = true;
return TSDB_CODE_SUCCESS;
}
} else {
taosArrayPush(superKV, &kv);
}
taosArrayPush(maxKVs, &kv);
if (unlikely(kv.length > maxKV->length)) {
maxKV->length = kv.length;
info->needModifySchema = true;
}
} else {
taosArrayPush(maxKVs, &kv);
}
taosArrayPush(preLineKV, &kv);
......@@ -344,9 +305,6 @@ static int32_t smlParseTagKv(SSmlHandle *info, char **sql, char *sqlEnd, SSmlLin
static int32_t smlParseColKv(SSmlHandle *info, char **sql, char *sqlEnd, SSmlLineInfo *currElement, bool isSameMeasure,
bool isSameCTable) {
int cnt = 0;
SArray *preLineKV = info->preLineColKV;
bool isSuperKVInit = true;
SArray *superKV = NULL;
if (info->dataFormat) {
if (unlikely(!isSameCTable)) {
SSmlTableInfo **oneTable =
......@@ -361,7 +319,6 @@ static int32_t smlParseColKv(SSmlHandle *info, char **sql, char *sqlEnd, SSmlLin
if (unlikely(!isSameMeasure)) {
SSmlSTableMeta **tmp =
(SSmlSTableMeta **)taosHashGet(info->superTables, currElement->measure, currElement->measureLen);
SSmlSTableMeta *sMeta = NULL;
if (unlikely(tmp == NULL)) {
STableMeta *pTableMeta = smlGetMeta(info, currElement->measure, currElement->measureLen);
if (pTableMeta == NULL) {
......@@ -369,17 +326,23 @@ static int32_t smlParseColKv(SSmlHandle *info, char **sql, char *sqlEnd, SSmlLin
info->reRun = true;
return TSDB_CODE_SUCCESS;
}
sMeta = smlBuildSTableMeta(info->dataFormat);
sMeta->tableMeta = pTableMeta;
taosHashPut(info->superTables, currElement->measure, currElement->measureLen, &sMeta, POINTER_BYTES);
tmp = &sMeta;
*tmp = smlBuildSTableMeta(info->dataFormat);
(*tmp)->tableMeta = pTableMeta;
taosHashPut(info->superTables, currElement->measure, currElement->measureLen, tmp, POINTER_BYTES);
for(int i = 0; i < pTableMeta->tableInfo.numOfColumns; i++){
SSchema *tag = pTableMeta->schema + i;
SSmlKv kv = {.key = tag->name, .keyLen = strlen(tag->name), .type = tag->type };
if(tag->type == TSDB_DATA_TYPE_NCHAR){
kv.length = (tag->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
}else if(tag->type == TSDB_DATA_TYPE_BINARY){
kv.length = tag->bytes - VARSTR_HEADER_SIZE;
}
taosArrayPush((*tmp)->cols, &kv);
}
}
info->currSTableMeta = (*tmp)->tableMeta;
superKV = (*tmp)->cols;
if (unlikely(taosArrayGetSize(superKV) == 0)) {
isSuperKVInit = false;
}
taosArrayClear(preLineKV);
info->masColKVs = (*tmp)->cols;
}
}
......@@ -478,69 +441,26 @@ static int32_t smlParseColKv(SSmlHandle *info, char **sql, char *sqlEnd, SSmlLin
info->reRun = true;
return TSDB_CODE_SUCCESS;
}
if (cnt >= taosArrayGetSize(info->masColKVs)) {
info->dataFormat = false;
info->reRun = true;
return TSDB_CODE_SUCCESS;
}
SSmlKv *maxKV = (SSmlKv *)taosArrayGet(info->masColKVs, cnt);
if (kv.type != maxKV->type) {
info->dataFormat = false;
info->reRun = true;
return TSDB_CODE_SUCCESS;
}
if (unlikely(!IS_SAME_KEY)) {
info->dataFormat = false;
info->reRun = true;
return TSDB_CODE_SUCCESS;
}
if (isSameMeasure) {
if (cnt >= taosArrayGetSize(preLineKV)) {
info->dataFormat = false;
info->reRun = true;
return TSDB_CODE_SUCCESS;
}
SSmlKv *maxKV = (SSmlKv *)taosArrayGet(preLineKV, cnt);
if (kv.type != maxKV->type) {
info->dataFormat = false;
info->reRun = true;
return TSDB_CODE_SUCCESS;
}
if (unlikely(IS_VAR_DATA_TYPE(kv.type) && kv.length > maxKV->length)) {
maxKV->length = kv.length;
SSmlSTableMeta **tableMeta =
(SSmlSTableMeta **)taosHashGet(info->superTables, currElement->measure, currElement->measureLen);
if (unlikely(NULL == tableMeta)) {
uError("SML:0x%" PRIx64 " NULL == tableMeta", info->id);
return TSDB_CODE_SML_INTERNAL_ERROR;
}
SSmlKv *oldKV = (SSmlKv *)taosArrayGet((*tableMeta)->cols, cnt);
oldKV->length = kv.length;
info->needModifySchema = true;
}
if (unlikely(!IS_SAME_KEY)) {
info->dataFormat = false;
info->reRun = true;
return TSDB_CODE_SUCCESS;
}
} else {
if (isSuperKVInit) {
if (unlikely(cnt >= taosArrayGetSize(superKV))) {
info->dataFormat = false;
info->reRun = true;
return TSDB_CODE_SUCCESS;
}
SSmlKv *maxKV = (SSmlKv *)taosArrayGet(superKV, cnt);
if (unlikely(kv.type != maxKV->type)) {
info->dataFormat = false;
info->reRun = true;
return TSDB_CODE_SUCCESS;
}
if (IS_VAR_DATA_TYPE(kv.type)) {
if (kv.length > maxKV->length) {
maxKV->length = kv.length;
} else {
kv.length = maxKV->length;
}
info->needModifySchema = true;
}
if (unlikely(!IS_SAME_KEY)) {
info->dataFormat = false;
info->reRun = true;
return TSDB_CODE_SUCCESS;
}
} else {
taosArrayPush(superKV, &kv);
}
taosArrayPush(preLineKV, &kv);
if (unlikely(IS_VAR_DATA_TYPE(kv.type) && kv.length > maxKV->length)) {
maxKV->length = kv.length;
info->needModifySchema = true;
}
} else {
if (currElement->colArray == NULL) {
......
......@@ -79,13 +79,10 @@ static int32_t smlParseTelnetTags(SSmlHandle *info, char *data, char *sqlEnd, SS
int cnt = 0;
SArray *preLineKV = info->preLineTagKV;
SArray *maxKVs = info->maxTagKVs;
bool isSuperKVInit = true;
SArray *superKV = NULL;
if (info->dataFormat) {
if (!isSameMeasure) {
SSmlSTableMeta **tmp = (SSmlSTableMeta **)taosHashGet(info->superTables, elements->measure, elements->measureLen);
SSmlSTableMeta *sMeta = NULL;
SSmlSTableMeta *sMeta = NULL;
if (unlikely(tmp == NULL)) {
STableMeta *pTableMeta = smlGetMeta(info, elements->measure, elements->measureLen);
if (pTableMeta == NULL) {
......@@ -96,18 +93,16 @@ static int32_t smlParseTelnetTags(SSmlHandle *info, char *data, char *sqlEnd, SS
sMeta = smlBuildSTableMeta(info->dataFormat);
sMeta->tableMeta = pTableMeta;
taosHashPut(info->superTables, elements->measure, elements->measureLen, &sMeta, POINTER_BYTES);
for(int i = pTableMeta->tableInfo.numOfColumns; i < pTableMeta->tableInfo.numOfTags + pTableMeta->tableInfo.numOfColumns; i++){
SSchema *tag = pTableMeta->schema + i;
SSmlKv kv = {.key = tag->name, .keyLen = strlen(tag->name), .type = tag->type, .length = (tag->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE };
taosArrayPush(sMeta->tags, &kv);
}
tmp = &sMeta;
}
info->currSTableMeta = (*tmp)->tableMeta;
superKV = (*tmp)->tags;
if (unlikely(taosArrayGetSize(superKV) == 0)) {
isSuperKVInit = false;
}
taosArrayClear(maxKVs);
info->maxTagKVs = (*tmp)->tags;
}
} else {
taosArrayClear(maxKVs);
}
taosArrayClear(preLineKV);
......@@ -175,59 +170,21 @@ static int32_t smlParseTelnetTags(SSmlHandle *info, char *data, char *sqlEnd, SS
info->reRun = true;
return TSDB_CODE_SUCCESS;
}
if (isSameMeasure) {
if (unlikely(cnt >= taosArrayGetSize(maxKVs))) {
info->dataFormat = false;
info->reRun = true;
return TSDB_CODE_SUCCESS;
}
SSmlKv *maxKV = (SSmlKv *)taosArrayGet(maxKVs, cnt);
if (unlikely(kv.length > maxKV->length)) {
maxKV->length = kv.length;
SSmlSTableMeta **tableMeta =
(SSmlSTableMeta **)taosHashGet(info->superTables, elements->measure, elements->measureLen);
if (unlikely(NULL == tableMeta)) {
uError("SML:0x%" PRIx64 " NULL == tableMeta", info->id);
return TSDB_CODE_SML_INTERNAL_ERROR;
}
SSmlKv *oldKV = (SSmlKv *)taosArrayGet((*tableMeta)->tags, cnt);
oldKV->length = kv.length;
info->needModifySchema = true;
}
if (unlikely(!IS_SAME_KEY)) {
info->dataFormat = false;
info->reRun = true;
return TSDB_CODE_SUCCESS;
}
} else {
if (isSuperKVInit) {
if (unlikely(cnt >= taosArrayGetSize(superKV))) {
info->dataFormat = false;
info->reRun = true;
return TSDB_CODE_SUCCESS;
}
SSmlKv *maxKV = (SSmlKv *)taosArrayGet(superKV, cnt);
if (unlikely(kv.length > maxKV->length)) {
maxKV->length = kv.length;
} else {
kv.length = maxKV->length;
}
info->needModifySchema = true;
if (unlikely(!IS_SAME_KEY)) {
info->dataFormat = false;
info->reRun = true;
return TSDB_CODE_SUCCESS;
}
} else {
taosArrayPush(superKV, &kv);
}
taosArrayPush(maxKVs, &kv);
if (unlikely(cnt >= taosArrayGetSize(info->maxTagKVs))) {
info->dataFormat = false;
info->reRun = true;
return TSDB_CODE_SUCCESS;
}
SSmlKv *maxKV = (SSmlKv *)taosArrayGet(info->maxTagKVs, cnt);
if (unlikely(!IS_SAME_KEY)) {
info->dataFormat = false;
info->reRun = true;
return TSDB_CODE_SUCCESS;
}
if (unlikely(kv.length > maxKV->length)) {
maxKV->length = kv.length;
info->needModifySchema = true;
}
} else {
taosArrayPush(maxKVs, &kv);
}
taosArrayPush(preLineKV, &kv);
cnt++;
......
......@@ -66,7 +66,8 @@ int32_t tsHeartbeatTimeout = 20 * 1000;
int64_t tsVndCommitMaxIntervalMs = 600 * 1000;
// mnode
int64_t tsMndSdbWriteDelta = 2000;
int64_t tsMndSdbWriteDelta = 200;
int64_t tsMndLogRetention = 2000;
// monitor
bool tsEnableMonitor = true;
......@@ -461,6 +462,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
if (cfgAddInt64(pCfg, "vndCommitMaxInterval", tsVndCommitMaxIntervalMs, 1000, 1000 * 60 * 60, 0) != 0) return -1;
if (cfgAddInt64(pCfg, "mndSdbWriteDelta", tsMndSdbWriteDelta, 20, 10000, 0) != 0) return -1;
if (cfgAddInt64(pCfg, "mndLogRetention", tsMndLogRetention, 500, 10000, 0) != 0) return -1;
if (cfgAddBool(pCfg, "monitor", tsEnableMonitor, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "monitorInterval", tsMonitorInterval, 1, 200000, 0) != 0) return -1;
......@@ -808,6 +810,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
tsVndCommitMaxIntervalMs = cfgGetItem(pCfg, "vndCommitMaxInterval")->i64;
tsMndSdbWriteDelta = cfgGetItem(pCfg, "mndSdbWriteDelta")->i64;
tsMndLogRetention = cfgGetItem(pCfg, "mndLogRetention")->i64;
tsStartUdfd = cfgGetItem(pCfg, "udf")->bval;
tstrncpy(tsUdfdResFuncs, cfgGetItem(pCfg, "udfdResFuncs")->str, sizeof(tsUdfdResFuncs));
......@@ -1246,6 +1249,7 @@ int32_t taosCreateLog(const char *logname, int32_t logFileNum, const char *cfgDi
taosSetAllDebugFlag(cfgGetItem(pCfg, "debugFlag")->i32, false);
if (taosMulModeMkDir(tsLogDir, 0777) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
uError("failed to create dir:%s since %s", tsLogDir, terrstr());
cfgCleanup(pCfg);
return -1;
......
......@@ -292,7 +292,7 @@ void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs)
code = vnodePreProcessWriteMsg(pVnode, pMsg);
if (code != 0) {
vGError("vgId:%d, msg:%p failed to pre-process since %s", vgId, pMsg, terrstr());
vGError("vgId:%d, msg:%p failed to pre-process since %s", vgId, pMsg, tstrerror(code));
if (terrno != 0) code = terrno;
vnodeHandleProposeError(pVnode, pMsg, code);
rpcFreeCont(pMsg->pCont);
......
......@@ -215,6 +215,8 @@ SNode* createCreateFunctionStmt(SAstCreateContext* pCxt, bool ignoreExists, bool
const SToken* pLibPath, SDataType dataType, int32_t bufSize);
SNode* createDropFunctionStmt(SAstCreateContext* pCxt, bool ignoreNotExists, const SToken* pFuncName);
SNode* createStreamOptions(SAstCreateContext* pCxt);
SNode* setStreamOptions(SAstCreateContext* pCxt, SNode* pOptions, EStreamOptionsSetFlag setflag, SToken* pToken,
SNode* pNode);
SNode* createCreateStreamStmt(SAstCreateContext* pCxt, bool ignoreExists, SToken* pStreamName, SNode* pRealTable,
SNode* pOptions, SNodeList* pTags, SNode* pSubtable, SNode* pQuery, SNodeList* pCols);
SNode* createDropStreamStmt(SAstCreateContext* pCxt, bool ignoreNotExists, SToken* pStreamName);
......
......@@ -562,14 +562,14 @@ tag_def_or_ref_opt(A) ::= tags_def(B).
tag_def_or_ref_opt(A) ::= TAGS NK_LP col_name_list(B) NK_RP. { A = B; }
stream_options(A) ::= . { A = createStreamOptions(pCxt); }
stream_options(A) ::= stream_options(B) TRIGGER AT_ONCE. { ((SStreamOptions*)B)->triggerType = STREAM_TRIGGER_AT_ONCE; A = B; }
stream_options(A) ::= stream_options(B) TRIGGER WINDOW_CLOSE. { ((SStreamOptions*)B)->triggerType = STREAM_TRIGGER_WINDOW_CLOSE; A = B; }
stream_options(A) ::= stream_options(B) TRIGGER MAX_DELAY duration_literal(C). { ((SStreamOptions*)B)->triggerType = STREAM_TRIGGER_MAX_DELAY; ((SStreamOptions*)B)->pDelay = releaseRawExprNode(pCxt, C); A = B; }
stream_options(A) ::= stream_options(B) WATERMARK duration_literal(C). { ((SStreamOptions*)B)->pWatermark = releaseRawExprNode(pCxt, C); A = B; }
stream_options(A) ::= stream_options(B) IGNORE EXPIRED NK_INTEGER(C). { ((SStreamOptions*)B)->ignoreExpired = taosStr2Int8(C.z, NULL, 10); A = B; }
stream_options(A) ::= stream_options(B) FILL_HISTORY NK_INTEGER(C). { ((SStreamOptions*)B)->fillHistory = taosStr2Int8(C.z, NULL, 10); A = B; }
stream_options(A) ::= stream_options(B) DELETE_MARK duration_literal(C). { ((SStreamOptions*)B)->pDeleteMark = releaseRawExprNode(pCxt, C); A = B; }
stream_options(A) ::= stream_options(B) IGNORE UPDATE NK_INTEGER(C). { ((SStreamOptions*)B)->ignoreUpdate = taosStr2Int8(C.z, NULL, 10); A = B; }
stream_options(A) ::= stream_options(B) TRIGGER AT_ONCE(C). { A = setStreamOptions(pCxt, B, SOPT_TRIGGER_TYPE_SET, &C, NULL); }
stream_options(A) ::= stream_options(B) TRIGGER WINDOW_CLOSE(C). { A = setStreamOptions(pCxt, B, SOPT_TRIGGER_TYPE_SET, &C, NULL); }
stream_options(A) ::= stream_options(B) TRIGGER MAX_DELAY(C) duration_literal(D). { A = setStreamOptions(pCxt, B, SOPT_TRIGGER_TYPE_SET, &C, releaseRawExprNode(pCxt, D)); }
stream_options(A) ::= stream_options(B) WATERMARK duration_literal(C). { A = setStreamOptions(pCxt, B, SOPT_WATERMARK_SET, NULL, releaseRawExprNode(pCxt, C)); }
stream_options(A) ::= stream_options(B) IGNORE EXPIRED NK_INTEGER(C). { A = setStreamOptions(pCxt, B, SOPT_IGNORE_EXPIRED_SET, &C, NULL); }
stream_options(A) ::= stream_options(B) FILL_HISTORY NK_INTEGER(C). { A = setStreamOptions(pCxt, B, SOPT_FILL_HISTORY_SET, &C, NULL); }
stream_options(A) ::= stream_options(B) DELETE_MARK duration_literal(C). { A = setStreamOptions(pCxt, B, SOPT_DELETE_MARK_SET, NULL, releaseRawExprNode(pCxt, C)); }
stream_options(A) ::= stream_options(B) IGNORE UPDATE NK_INTEGER(C). { A = setStreamOptions(pCxt, B, SOPT_IGNORE_UPDATE_SET, &C, NULL); }
subtable_opt(A) ::= . { A = NULL; }
subtable_opt(A) ::= SUBTABLE NK_LP expression(B) NK_RP. { A = releaseRawExprNode(pCxt, B); }
......
......@@ -1817,6 +1817,59 @@ SNode* createStreamOptions(SAstCreateContext* pCxt) {
return (SNode*)pOptions;
}
static int8_t getTriggerType(uint32_t tokenType) {
switch (tokenType) {
case TK_AT_ONCE:
return STREAM_TRIGGER_AT_ONCE;
case TK_WINDOW_CLOSE:
return STREAM_TRIGGER_WINDOW_CLOSE;
case TK_MAX_DELAY:
return STREAM_TRIGGER_MAX_DELAY;
default:
break;
}
return STREAM_TRIGGER_WINDOW_CLOSE;
}
SNode* setStreamOptions(SAstCreateContext* pCxt, SNode* pOptions, EStreamOptionsSetFlag setflag, SToken* pToken,
SNode* pNode) {
SStreamOptions* pStreamOptions = (SStreamOptions*)pOptions;
if (BIT_FLAG_TEST_MASK(setflag, pStreamOptions->setFlag)) {
pCxt->errCode =
generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_SYNTAX_ERROR, "stream options each item is only set once");
return pOptions;
}
switch (setflag) {
case SOPT_TRIGGER_TYPE_SET:
pStreamOptions->triggerType = getTriggerType(pToken->type);
if (STREAM_TRIGGER_MAX_DELAY == pStreamOptions->triggerType) {
pStreamOptions->pDelay = pNode;
}
break;
case SOPT_WATERMARK_SET:
pStreamOptions->pWatermark = pNode;
break;
case SOPT_DELETE_MARK_SET:
pStreamOptions->pDeleteMark = pNode;
break;
case SOPT_FILL_HISTORY_SET:
pStreamOptions->fillHistory = taosStr2Int8(pToken->z, NULL, 10);
break;
case SOPT_IGNORE_EXPIRED_SET:
pStreamOptions->ignoreExpired = taosStr2Int8(pToken->z, NULL, 10);
break;
case SOPT_IGNORE_UPDATE_SET:
pStreamOptions->ignoreUpdate = taosStr2Int8(pToken->z, NULL, 10);
break;
default:
break;
}
BIT_FLAG_SET_MASK(pStreamOptions->setFlag, setflag);
return pOptions;
}
SNode* createCreateStreamStmt(SAstCreateContext* pCxt, bool ignoreExists, SToken* pStreamName, SNode* pRealTable,
SNode* pOptions, SNodeList* pTags, SNode* pSubtable, SNode* pQuery, SNodeList* pCols) {
CHECK_PARSER_STATUS(pCxt);
......
......@@ -6384,15 +6384,15 @@ static int32_t translateDropFunction(STranslateContext* pCxt, SDropFunctionStmt*
static int32_t translateGrant(STranslateContext* pCxt, SGrantStmt* pStmt) {
SAlterUserReq req = {0};
if (PRIVILEGE_TYPE_TEST_MASK(pStmt->privileges, PRIVILEGE_TYPE_ALL) ||
(PRIVILEGE_TYPE_TEST_MASK(pStmt->privileges, PRIVILEGE_TYPE_READ) &&
PRIVILEGE_TYPE_TEST_MASK(pStmt->privileges, PRIVILEGE_TYPE_WRITE))) {
if (BIT_FLAG_TEST_MASK(pStmt->privileges, PRIVILEGE_TYPE_ALL) ||
(BIT_FLAG_TEST_MASK(pStmt->privileges, PRIVILEGE_TYPE_READ) &&
BIT_FLAG_TEST_MASK(pStmt->privileges, PRIVILEGE_TYPE_WRITE))) {
req.alterType = TSDB_ALTER_USER_ADD_ALL_DB;
} else if (PRIVILEGE_TYPE_TEST_MASK(pStmt->privileges, PRIVILEGE_TYPE_READ)) {
} else if (BIT_FLAG_TEST_MASK(pStmt->privileges, PRIVILEGE_TYPE_READ)) {
req.alterType = TSDB_ALTER_USER_ADD_READ_DB;
} else if (PRIVILEGE_TYPE_TEST_MASK(pStmt->privileges, PRIVILEGE_TYPE_WRITE)) {
} else if (BIT_FLAG_TEST_MASK(pStmt->privileges, PRIVILEGE_TYPE_WRITE)) {
req.alterType = TSDB_ALTER_USER_ADD_WRITE_DB;
} else if (PRIVILEGE_TYPE_TEST_MASK(pStmt->privileges, PRIVILEGE_TYPE_SUBSCRIBE)) {
} else if (BIT_FLAG_TEST_MASK(pStmt->privileges, PRIVILEGE_TYPE_SUBSCRIBE)) {
req.alterType = TSDB_ALTER_USER_ADD_SUBSCRIBE_TOPIC;
}
strcpy(req.user, pStmt->userName);
......@@ -6402,15 +6402,15 @@ static int32_t translateGrant(STranslateContext* pCxt, SGrantStmt* pStmt) {
static int32_t translateRevoke(STranslateContext* pCxt, SRevokeStmt* pStmt) {
SAlterUserReq req = {0};
if (PRIVILEGE_TYPE_TEST_MASK(pStmt->privileges, PRIVILEGE_TYPE_ALL) ||
(PRIVILEGE_TYPE_TEST_MASK(pStmt->privileges, PRIVILEGE_TYPE_READ) &&
PRIVILEGE_TYPE_TEST_MASK(pStmt->privileges, PRIVILEGE_TYPE_WRITE))) {
if (BIT_FLAG_TEST_MASK(pStmt->privileges, PRIVILEGE_TYPE_ALL) ||
(BIT_FLAG_TEST_MASK(pStmt->privileges, PRIVILEGE_TYPE_READ) &&
BIT_FLAG_TEST_MASK(pStmt->privileges, PRIVILEGE_TYPE_WRITE))) {
req.alterType = TSDB_ALTER_USER_REMOVE_ALL_DB;
} else if (PRIVILEGE_TYPE_TEST_MASK(pStmt->privileges, PRIVILEGE_TYPE_READ)) {
} else if (BIT_FLAG_TEST_MASK(pStmt->privileges, PRIVILEGE_TYPE_READ)) {
req.alterType = TSDB_ALTER_USER_REMOVE_READ_DB;
} else if (PRIVILEGE_TYPE_TEST_MASK(pStmt->privileges, PRIVILEGE_TYPE_WRITE)) {
} else if (BIT_FLAG_TEST_MASK(pStmt->privileges, PRIVILEGE_TYPE_WRITE)) {
req.alterType = TSDB_ALTER_USER_REMOVE_WRITE_DB;
} else if (PRIVILEGE_TYPE_TEST_MASK(pStmt->privileges, PRIVILEGE_TYPE_SUBSCRIBE)) {
} else if (BIT_FLAG_TEST_MASK(pStmt->privileges, PRIVILEGE_TYPE_SUBSCRIBE)) {
req.alterType = TSDB_ALTER_USER_REMOVE_SUBSCRIBE_TOPIC;
}
strcpy(req.user, pStmt->userName);
......@@ -6482,11 +6482,11 @@ static int32_t translateShowCreateDatabase(STranslateContext* pCxt, SShowCreateD
if (NULL == pStmt->pCfg) {
return TSDB_CODE_OUT_OF_MEMORY;
}
SName name;
tNameSetDbName(&name, pCxt->pParseCxt->acctId, pStmt->dbName, strlen(pStmt->dbName));
tNameGetFullDbName(&name, pStmt->dbFName);
return getDBCfg(pCxt, pStmt->dbName, (SDbCfgInfo*)pStmt->pCfg);
}
......
......@@ -4608,7 +4608,6 @@ static YYACTIONTYPE yy_reduce(
{ yymsp[1].minor.yy42 = createStreamOptions(pCxt); }
break;
case 279: /* sma_stream_opt ::= sma_stream_opt WATERMARK duration_literal */
case 316: /* stream_options ::= stream_options WATERMARK duration_literal */ yytestcase(yyruleno==316);
{ ((SStreamOptions*)yymsp[-2].minor.yy42)->pWatermark = releaseRawExprNode(pCxt, yymsp[0].minor.yy42); yylhsminor.yy42 = yymsp[-2].minor.yy42; }
yymsp[-2].minor.yy42 = yylhsminor.yy42;
break;
......@@ -4617,7 +4616,6 @@ static YYACTIONTYPE yy_reduce(
yymsp[-2].minor.yy42 = yylhsminor.yy42;
break;
case 281: /* sma_stream_opt ::= sma_stream_opt DELETE_MARK duration_literal */
case 319: /* stream_options ::= stream_options DELETE_MARK duration_literal */ yytestcase(yyruleno==319);
{ ((SStreamOptions*)yymsp[-2].minor.yy42)->pDeleteMark = releaseRawExprNode(pCxt, yymsp[0].minor.yy42); yylhsminor.yy42 = yymsp[-2].minor.yy42; }
yymsp[-2].minor.yy42 = yylhsminor.yy42;
break;
......@@ -4677,27 +4675,32 @@ static YYACTIONTYPE yy_reduce(
{ pCxt->pRootNode = createDropStreamStmt(pCxt, yymsp[-1].minor.yy103, &yymsp[0].minor.yy225); }
break;
case 313: /* stream_options ::= stream_options TRIGGER AT_ONCE */
{ ((SStreamOptions*)yymsp[-2].minor.yy42)->triggerType = STREAM_TRIGGER_AT_ONCE; yylhsminor.yy42 = yymsp[-2].minor.yy42; }
yymsp[-2].minor.yy42 = yylhsminor.yy42;
break;
case 314: /* stream_options ::= stream_options TRIGGER WINDOW_CLOSE */
{ ((SStreamOptions*)yymsp[-2].minor.yy42)->triggerType = STREAM_TRIGGER_WINDOW_CLOSE; yylhsminor.yy42 = yymsp[-2].minor.yy42; }
case 314: /* stream_options ::= stream_options TRIGGER WINDOW_CLOSE */ yytestcase(yyruleno==314);
{ yylhsminor.yy42 = setStreamOptions(pCxt, yymsp[-2].minor.yy42, SOPT_TRIGGER_TYPE_SET, &yymsp[0].minor.yy0, NULL); }
yymsp[-2].minor.yy42 = yylhsminor.yy42;
break;
case 315: /* stream_options ::= stream_options TRIGGER MAX_DELAY duration_literal */
{ ((SStreamOptions*)yymsp[-3].minor.yy42)->triggerType = STREAM_TRIGGER_MAX_DELAY; ((SStreamOptions*)yymsp[-3].minor.yy42)->pDelay = releaseRawExprNode(pCxt, yymsp[0].minor.yy42); yylhsminor.yy42 = yymsp[-3].minor.yy42; }
{ yylhsminor.yy42 = setStreamOptions(pCxt, yymsp[-3].minor.yy42, SOPT_TRIGGER_TYPE_SET, &yymsp[-1].minor.yy0, releaseRawExprNode(pCxt, yymsp[0].minor.yy42)); }
yymsp[-3].minor.yy42 = yylhsminor.yy42;
break;
case 316: /* stream_options ::= stream_options WATERMARK duration_literal */
{ yylhsminor.yy42 = setStreamOptions(pCxt, yymsp[-2].minor.yy42, SOPT_WATERMARK_SET, NULL, releaseRawExprNode(pCxt, yymsp[0].minor.yy42)); }
yymsp[-2].minor.yy42 = yylhsminor.yy42;
break;
case 317: /* stream_options ::= stream_options IGNORE EXPIRED NK_INTEGER */
{ ((SStreamOptions*)yymsp[-3].minor.yy42)->ignoreExpired = taosStr2Int8(yymsp[0].minor.yy0.z, NULL, 10); yylhsminor.yy42 = yymsp[-3].minor.yy42; }
{ yylhsminor.yy42 = setStreamOptions(pCxt, yymsp[-3].minor.yy42, SOPT_IGNORE_EXPIRED_SET, &yymsp[0].minor.yy0, NULL); }
yymsp[-3].minor.yy42 = yylhsminor.yy42;
break;
case 318: /* stream_options ::= stream_options FILL_HISTORY NK_INTEGER */
{ ((SStreamOptions*)yymsp[-2].minor.yy42)->fillHistory = taosStr2Int8(yymsp[0].minor.yy0.z, NULL, 10); yylhsminor.yy42 = yymsp[-2].minor.yy42; }
{ yylhsminor.yy42 = setStreamOptions(pCxt, yymsp[-2].minor.yy42, SOPT_FILL_HISTORY_SET, &yymsp[0].minor.yy0, NULL); }
yymsp[-2].minor.yy42 = yylhsminor.yy42;
break;
case 319: /* stream_options ::= stream_options DELETE_MARK duration_literal */
{ yylhsminor.yy42 = setStreamOptions(pCxt, yymsp[-2].minor.yy42, SOPT_DELETE_MARK_SET, NULL, releaseRawExprNode(pCxt, yymsp[0].minor.yy42)); }
yymsp[-2].minor.yy42 = yylhsminor.yy42;
break;
case 320: /* stream_options ::= stream_options IGNORE UPDATE NK_INTEGER */
{ ((SStreamOptions*)yymsp[-3].minor.yy42)->ignoreUpdate = taosStr2Int8(yymsp[0].minor.yy0.z, NULL, 10); yylhsminor.yy42 = yymsp[-3].minor.yy42; }
{ yylhsminor.yy42 = setStreamOptions(pCxt, yymsp[-3].minor.yy42, SOPT_IGNORE_UPDATE_SET, &yymsp[0].minor.yy0, NULL); }
yymsp[-3].minor.yy42 = yylhsminor.yy42;
break;
case 322: /* subtable_opt ::= SUBTABLE NK_LP expression NK_RP */
......
......@@ -285,7 +285,7 @@ int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex) {
if (syncNodeIsMnode(pSyncNode)) {
// mnode
logRetention = SYNC_MNODE_LOG_RETENTION;
logRetention = tsMndLogRetention;
} else {
// vnode
if (pSyncNode->replicaNum > 1) {
......
......@@ -881,6 +881,8 @@ static void cliDestroyConn(SCliConn* conn, bool clear) {
QUEUE_REMOVE(&conn->q);
QUEUE_INIT(&conn->q);
conn->broken = true;
if (conn->list != NULL) {
SConnList* connList = conn->list;
connList->list->numOfConn--;
......
......@@ -137,6 +137,8 @@ int smlProcess_json1_Test() {
for (int i = 0; i < 1; i++) {
taosMemoryFree(sql1[i]);
}
ASSERT(code == 0);
const char *sql2[] = {
"[{\"metric\":\"sys.cpu.nice\",\"timestamp\":1662344041,\"value\":13,\"tags\":{\"host\":\"web01\",\"dc\":\"lga\"}"
......@@ -164,6 +166,34 @@ int smlProcess_json1_Test() {
taosMemoryFree(sql3[i]);
}
ASSERT(code == 0);
// TD-22903
const char *sql4[] = {
"[{\"metric\": \"test_us\", \"timestamp\": {\"value\": 1626006833639, \"type\": \"ms\"}, \"value\": true, \"tags\": {\"t0\": true}}, {\"metric\": \"test_us\", \"timestamp\": {\"value\": 1626006833638, \"type\": \"ms\"}, \"value\": false, \"tags\": {\"t0\": true}}]"
};
char *sql5[1] = {0};
for (int i = 0; i < 1; i++) {
sql5[i] = taosMemoryCalloc(1, 1024);
strncpy(sql5[i], sql4[i], 1023);
}
pRes = taos_schemaless_insert(taos, (char **)sql5, sizeof(sql5) / sizeof(sql5[0]), TSDB_SML_JSON_PROTOCOL,
TSDB_SML_TIMESTAMP_NANO_SECONDS);
code = taos_errno(pRes);
if (code != 0) {
printf("%s result:%s\n", __FUNCTION__, taos_errstr(pRes));
} else {
printf("%s result:success\n", __FUNCTION__);
}
taos_free_result(pRes);
for (int i = 0; i < 1; i++) {
taosMemoryFree(sql5[i]);
}
ASSERT(code == 0);
taos_close(taos);
return code;
......@@ -927,6 +957,71 @@ int sml_ts2164_Test() {
return code;
}
int sml_td22898_Test() {
TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 0);
TAOS_RES *pRes =
taos_query(taos, "CREATE DATABASE IF NOT EXISTS line_test BUFFER 384 MINROWS 1000 PAGES 256 PRECISION 'ns'");
taos_free_result(pRes);
const char *sql[] = {
"svlzxdfutx,id=nyavpjyfas,t0=t,t1=127i8,t2=32767i16,t3=2147483647i32,t4=9223372036854775807i64,t5=11.12345f32,t6=22.123456789f64,t7=\"binaryTagValue\",t8=L\"ncharTagValue\" c0=t,c1=127i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c5=11.12345f32,c6=22.123456789f64,c7=\"binaryColValue\",c8=L\"ncharColValue\",c9=7u64 1626006833639"
};
pRes = taos_query(taos, "use line_test");
taos_free_result(pRes);
pRes = taos_schemaless_insert(taos, (char **)sql, sizeof(sql) / sizeof(sql[0]), TSDB_SML_LINE_PROTOCOL,
TSDB_SML_TIMESTAMP_MILLI_SECONDS);
printf("%s result:%s\n", __FUNCTION__, taos_errstr(pRes));
int code = taos_errno(pRes);
taos_free_result(pRes);
const char *sql1[] = {
"svlzxdfutx,id=nyavpjyfas,t0=True,t1=127i8,t2=32767i16,t3=2147483647i32,t4=9223372036854775807i64,t5=11.12345f32,t6=22.123456789f64,t7=\"tgqkvsws\",t8=L\"ncharTagValue\" c0=f,c1=127i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c5=11.12345f32,c6=22.123456789f64,c7=\"htvnnldm\",c8=L\"ncharColValue\",c9=7u64 1626006833639"
};
pRes = taos_schemaless_insert(taos, (char **)sql1, sizeof(sql1) / sizeof(sql1[0]), TSDB_SML_LINE_PROTOCOL,
TSDB_SML_TIMESTAMP_MILLI_SECONDS);
printf("%s result:%s\n", __FUNCTION__, taos_errstr(pRes));
code = taos_errno(pRes);
taos_free_result(pRes);
pRes = taos_query(taos, "select * from svlzxdfutx");
taos_free_result(pRes);
taos_close(taos);
return code;
}
int sml_td22900_Test() {
TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 0);
TAOS_RES *pRes =
taos_query(taos, "CREATE DATABASE IF NOT EXISTS line_test BUFFER 384 MINROWS 1000 PAGES 256 PRECISION 'ns'");
taos_free_result(pRes);
const char *sql[] = {
"qddkgilwfu,id=qddkgilwfu_42383_49198,t0=t,t1=127i8 c4=9223372036854775807i64,c6=11.12345f32,c6=22.123456789f64 1626006833639"
};
pRes = taos_query(taos, "use line_test");
taos_free_result(pRes);
pRes = taos_schemaless_insert(taos, (char **)sql, sizeof(sql) / sizeof(sql[0]), TSDB_SML_LINE_PROTOCOL,
TSDB_SML_TIMESTAMP_MILLI_SECONDS);
printf("%s result:%s\n", __FUNCTION__, taos_errstr(pRes));
int code = taos_errno(pRes);
taos_free_result(pRes);
taos_close(taos);
return code;
}
int sml_ttl_Test() {
TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 0);
......@@ -1106,6 +1201,10 @@ int main(int argc, char *argv[]) {
ASSERT(!ret);
ret = sml_ts2164_Test();
ASSERT(!ret);
ret = sml_td22898_Test();
ASSERT(!ret);
ret = sml_td22900_Test();
ASSERT(ret);
ret = smlProcess_influx_Test();
ASSERT(!ret);
ret = smlProcess_telnet_Test();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册