提交 ad57b7c7 编写于 作者: S shenglian zhou

Merge remote-tracking branch 'origin/develop' into szhou/hotfix/sml-tiny-batch

......@@ -156,7 +156,9 @@ static int32_t buildSmlKvSchema(TAOS_SML_KV* smlKv, SHashObj* hash, SArray* arra
static int32_t getSmlMd5ChildTableName(TAOS_SML_DATA_POINT* point, char* tableName, int* tableNameLen,
SSmlLinesInfo* info) {
tscDebug("SML:0x%"PRIx64" taos_sml_insert get child table name through md5", info->id);
if (point->tagNum) {
qsort(point->tags, point->tagNum, sizeof(TAOS_SML_KV), compareSmlColKv);
}
SStringBuilder sb; memset(&sb, 0, sizeof(sb));
char sTableName[TSDB_TABLE_NAME_LEN + TS_ESCAPE_CHAR_SIZE] = {0};
......@@ -189,6 +191,17 @@ static int32_t getSmlMd5ChildTableName(TAOS_SML_DATA_POINT* point, char* tableNa
return 0;
}
static int32_t buildSmlChildTableName(TAOS_SML_DATA_POINT* point, SSmlLinesInfo* info) {
tscDebug("SML:0x%"PRIx64" taos_sml_insert build child table name", info->id);
char childTableName[TSDB_TABLE_NAME_LEN + TS_ESCAPE_CHAR_SIZE];
int32_t tableNameLen = TSDB_TABLE_NAME_LEN + TS_ESCAPE_CHAR_SIZE;
getSmlMd5ChildTableName(point, childTableName, &tableNameLen, info);
point->childTableName = calloc(1, tableNameLen+1);
strncpy(point->childTableName, childTableName, tableNameLen);
point->childTableName[tableNameLen] = '\0';
return 0;
}
static int32_t buildDataPointSchemas(TAOS_SML_DATA_POINT* points, int numPoint, SArray* stableSchemas, SSmlLinesInfo* info) {
int32_t code = 0;
SHashObj* sname2shema = taosHashInit(32,
......@@ -220,12 +233,7 @@ static int32_t buildDataPointSchemas(TAOS_SML_DATA_POINT* points, int numPoint,
for (int j = 0; j < point->tagNum; ++j) {
TAOS_SML_KV* tagKv = point->tags + j;
if (!point->childTableName) {
char childTableName[TSDB_TABLE_NAME_LEN + TS_ESCAPE_CHAR_SIZE];
int32_t tableNameLen = TSDB_TABLE_NAME_LEN + TS_ESCAPE_CHAR_SIZE;
getSmlMd5ChildTableName(point, childTableName, &tableNameLen, info);
point->childTableName = calloc(1, tableNameLen+1);
strncpy(point->childTableName, childTableName, tableNameLen);
point->childTableName[tableNameLen] = '\0';
buildSmlChildTableName(point, info);
}
code = buildSmlKvSchema(tagKv, pStableSchema->tagHash, pStableSchema->tags, info);
......@@ -235,6 +243,27 @@ static int32_t buildDataPointSchemas(TAOS_SML_DATA_POINT* points, int numPoint,
}
}
//for Line Protocol tags may be omitted, add a tag with NULL value
if (point->tagNum == 0) {
if (!point->childTableName) {
buildSmlChildTableName(point, info);
}
char tagNullName[TSDB_COL_NAME_LEN] = {0};
size_t nameLen = strlen(tsSmlTagNullName);
strncpy(tagNullName, tsSmlTagNullName, nameLen);
addEscapeCharToString(tagNullName, (int32_t)nameLen);
size_t* pTagNullIdx = taosHashGet(pStableSchema->tagHash, tagNullName, nameLen + TS_ESCAPE_CHAR_SIZE);
if (!pTagNullIdx) {
SSchema tagNull = {0};
tagNull.type = TSDB_DATA_TYPE_NCHAR;
tagNull.bytes = TSDB_NCHAR_SIZE + VARSTR_HEADER_SIZE;
strncpy(tagNull.name, tagNullName, nameLen + TS_ESCAPE_CHAR_SIZE);
taosArrayPush(pStableSchema->tags, &tagNull);
size_t tagNullIdx = taosArrayGetSize(pStableSchema->tags) - 1;
taosHashPut(pStableSchema->tagHash, tagNull.name, nameLen + TS_ESCAPE_CHAR_SIZE, &tagNullIdx, sizeof(tagNullIdx));
}
}
for (int j = 0; j < point->fieldNum; ++j) {
TAOS_SML_KV* fieldKv = point->fields + j;
code = buildSmlKvSchema(fieldKv, pStableSchema->fieldHash, pStableSchema->fields, info);
......
......@@ -237,6 +237,7 @@ extern int8_t tsDeadLockKillQuery;
// schemaless
extern char tsDefaultJSONStrType[];
extern char tsSmlChildTableName[];
extern char tsSmlTagNullName[];
typedef struct {
......
......@@ -291,7 +291,11 @@ int8_t tsDeadLockKillQuery = 0;
// default JSON string type
char tsDefaultJSONStrType[7] = "nchar";
char tsSmlChildTableName[TSDB_TABLE_NAME_LEN] = ""; //user defined child table name can be specified in tag value. If set to empty system will generate table name using MD5 hash.
char tsSmlChildTableName[TSDB_TABLE_NAME_LEN] = ""; //user defined child table name can be specified in tag value.
//If set to empty system will generate table name using MD5 hash.
char tsSmlTagNullName[TSDB_COL_NAME_LEN] = "_tag_null"; //for line protocol if tag is omitted, add a tag with NULL value
//to make sure inserted records belongs to the same measurement
//default name is _tag_null and can be user configurable
int32_t (*monStartSystemFp)() = NULL;
void (*monStopSystemFp)() = NULL;
......@@ -1701,6 +1705,17 @@ static void doInitGlobalConfig(void) {
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg);
// name for a NULL value tag added for Line Protocol when tag fields are omitted
cfg.option = "smlTagNullName";
cfg.ptr = tsSmlTagNullName;
cfg.valType = TAOS_CFG_VTYPE_STRING;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW | TSDB_CFG_CTYPE_B_CLIENT;
cfg.minValue = 0;
cfg.maxValue = 0;
cfg.ptrLength = tListLen(tsSmlTagNullName);
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg);
// flush vnode wal file if walSize > walFlushSize and walSize > cache*0.5*blocks
cfg.option = "walFlushSize";
cfg.ptr = &tsdbWalFlushSize;
......
Subproject commit 25f8683ece07897fea12c347d369602b2235665f
Subproject commit b8f76da4a708d158ec3cc4b844571dc4414e36b4
......@@ -51,7 +51,7 @@ conn.close()
import taos
conn = taos.connect()
conn.exec("create database if not exists pytest")
conn.execute("create database if not exists pytest")
result = conn.query("show databases")
num_of_fields = result.field_count
......@@ -60,7 +60,7 @@ for field in result.fields:
for row in result:
print(row)
result.close()
conn.exec("drop database pytest")
conn.execute("drop database pytest")
conn.close()
```
......@@ -136,11 +136,11 @@ from taos import *
conn = connect()
dbname = "pytest_taos_stmt"
conn.exec("drop database if exists %s" % dbname)
conn.exec("create database if not exists %s" % dbname)
conn.execute("drop database if exists %s" % dbname)
conn.execute("create database if not exists %s" % dbname)
conn.select_db(dbname)
conn.exec(
conn.execute(
"create table if not exists log(ts timestamp, bo bool, nil tinyint, \
ti tinyint, si smallint, ii int, bi bigint, tu tinyint unsigned, \
su smallint unsigned, iu int unsigned, bu bigint unsigned, \
......@@ -196,11 +196,11 @@ from taos import *
conn = connect()
dbname = "pytest_taos_stmt"
conn.exec("drop database if exists %s" % dbname)
conn.exec("create database if not exists %s" % dbname)
conn.execute("drop database if exists %s" % dbname)
conn.execute("create database if not exists %s" % dbname)
conn.select_db(dbname)
conn.exec(
conn.execute(
"create table if not exists log(ts timestamp, bo bool, nil tinyint, \
ti tinyint, si smallint, ii int, bi bigint, tu tinyint unsigned, \
su smallint unsigned, iu int unsigned, bu bigint unsigned, \
......@@ -249,12 +249,12 @@ import taos
conn = taos.connect()
dbname = "pytest_taos_subscribe_callback"
conn.exec("drop database if exists %s" % dbname)
conn.exec("create database if not exists %s" % dbname)
conn.execute("drop database if exists %s" % dbname)
conn.execute("create database if not exists %s" % dbname)
conn.select_db(dbname)
conn.exec("create table if not exists log(ts timestamp, n int)")
conn.execute("create table if not exists log(ts timestamp, n int)")
for i in range(10):
conn.exec("insert into log values(now, %d)" % i)
conn.execute("insert into log values(now, %d)" % i)
sub = conn.subscribe(True, "test", "select * from log", 1000)
print("# consume from begin")
......@@ -263,14 +263,14 @@ for ts, n in sub.consume():
print("# consume new data")
for i in range(5):
conn.exec("insert into log values(now, %d)(now+1s, %d)" % (i, i))
conn.execute("insert into log values(now, %d)(now+1s, %d)" % (i, i))
result = sub.consume()
for ts, n in result:
print(ts, n)
print("# consume with a stop condition")
for i in range(10):
conn.exec("insert into log values(now, %d)" % int(random() * 10))
conn.execute("insert into log values(now, %d)" % int(random() * 10))
result = sub.consume()
try:
ts, n = next(result)
......@@ -284,7 +284,7 @@ for i in range(10):
sub.close()
conn.exec("drop database if exists %s" % dbname)
conn.execute("drop database if exists %s" % dbname)
conn.close()
```
......@@ -311,23 +311,23 @@ def test_subscribe_callback(conn):
# type: (TaosConnection) -> None
dbname = "pytest_taos_subscribe_callback"
try:
conn.exec("drop database if exists %s" % dbname)
conn.exec("create database if not exists %s" % dbname)
conn.execute("drop database if exists %s" % dbname)
conn.execute("create database if not exists %s" % dbname)
conn.select_db(dbname)
conn.exec("create table if not exists log(ts timestamp, n int)")
conn.execute("create table if not exists log(ts timestamp, n int)")
print("# subscribe with callback")
sub = conn.subscribe(False, "test", "select * from log", 1000, subscribe_callback)
for i in range(10):
conn.exec("insert into log values(now, %d)" % i)
conn.execute("insert into log values(now, %d)" % i)
time.sleep(0.7)
sub.close()
conn.exec("drop database if exists %s" % dbname)
conn.execute("drop database if exists %s" % dbname)
conn.close()
except Exception as err:
conn.exec("drop database if exists %s" % dbname)
conn.execute("drop database if exists %s" % dbname)
conn.close()
raise err
......@@ -374,10 +374,10 @@ def test_stream(conn):
# type: (TaosConnection) -> None
dbname = "pytest_taos_stream"
try:
conn.exec("drop database if exists %s" % dbname)
conn.exec("create database if not exists %s" % dbname)
conn.execute("drop database if exists %s" % dbname)
conn.execute("create database if not exists %s" % dbname)
conn.select_db(dbname)
conn.exec("create table if not exists log(ts timestamp, n int)")
conn.execute("create table if not exists log(ts timestamp, n int)")
result = conn.query("select count(*) from log interval(5s)")
assert result.field_count == 2
......@@ -386,13 +386,13 @@ def test_stream(conn):
stream = conn.stream("select count(*) from log interval(5s)", stream_callback, param=byref(counter))
for _ in range(0, 20):
conn.exec("insert into log values(now,0)(now+1s, 1)(now + 2s, 2)")
conn.execute("insert into log values(now,0)(now+1s, 1)(now + 2s, 2)")
time.sleep(2)
stream.close()
conn.exec("drop database if exists %s" % dbname)
conn.execute("drop database if exists %s" % dbname)
conn.close()
except Exception as err:
conn.exec("drop database if exists %s" % dbname)
conn.execute("drop database if exists %s" % dbname)
conn.close()
raise err
......@@ -408,8 +408,8 @@ import taos
conn = taos.connect()
dbname = "pytest_line"
conn.exec("drop database if exists %s" % dbname)
conn.exec("create database if not exists %s precision 'us'" % dbname)
conn.execute("drop database if exists %s" % dbname)
conn.execute("create database if not exists %s precision 'us'" % dbname)
conn.select_db(dbname)
lines = [
......@@ -431,7 +431,7 @@ for row in result:
result.close()
conn.exec("drop database if exists %s" % dbname)
conn.execute("drop database if exists %s" % dbname)
conn.close()
```
......
......@@ -129,7 +129,7 @@ class TDTestCase:
print("schemaless_insert result {}".format(code))
tdSql.query("describe stb0_3")
tdSql.checkData(1, 1, "BINARY")
tdSql.checkData(1, 1, "NCHAR")
payload = ['''
{
......@@ -835,7 +835,7 @@ class TDTestCase:
code = self._conn.schemaless_insert(payload, TDSmlProtocolType.JSON.value, TDSmlTimestampType.NOT_CONFIGURED.value)
print("schemaless_insert result {}".format(code))
tdSql.query("describe `stable`")
tdSql.query("describe `STABLE`")
tdSql.checkRows(9)
#tdSql.query("select * from `key`")
......
......@@ -333,7 +333,7 @@ class TDTestCase:
tdSql.query('describe `!@#$.%^&*()`')
tdSql.checkRows(9)
tdSql.query('describe `stable`')
tdSql.query('describe `STABLE`')
tdSql.checkRows(9)
#tdSql.query('select * from `123`')
......
......@@ -86,6 +86,67 @@ class TDTestCase:
#tdSql.query('select tbname, * from childtable')
#tdSql.checkRows(1)
###Test when tag is omitted
lines3 = [ "sti c1=4i64,c3=L\"passit\",c2=false,c4=4f64 1626006833639000000",
"sti c1=3i64,c3=L\"passitagin\",c2=true,c4=5f64,c5=5f64 1626006833640000000"
]
code = self._conn.schemaless_insert(lines3, TDSmlProtocolType.LINE.value, TDSmlTimestampType.NANO_SECOND.value)
print("schemaless_insert result {}".format(code))
tdSql.query('select * from sti')
tdSql.checkRows(2)
tdSql.query('select tbname from sti')
tdSql.checkRows(1)
lines4 = [ "stp c1=4i64,c3=L\"passit\",c2=false,c4=4f64 1626006833639000000",
"stp c1=3i64,c3=L\"passitagin\",c2=true,c4=5f64,c5=5f64 1626006833640000000"
]
code = self._conn.schemaless_insert([ lines4[0] ], TDSmlProtocolType.LINE.value, TDSmlTimestampType.NANO_SECOND.value)
print("schemaless_insert result {}".format(code))
code = self._conn.schemaless_insert([ lines4[1] ], TDSmlProtocolType.LINE.value, TDSmlTimestampType.NANO_SECOND.value)
print("schemaless_insert result {}".format(code))
tdSql.query('select * from stp')
tdSql.checkRows(2)
tdSql.query('select tbname from stp')
tdSql.checkRows(1)
lines5 = [ "stq c1=4i64,c3=L\"passit\",c2=false,c4=4f64 1626006833639000000",
"stq,t1=abc c1=3i64,c3=L\"passitagin\",c2=true,c4=5f64,c5=5f64 1626006833640000000",
"stq,t2=abc c1=3i64,c3=L\"passitagin\",c4=5f64,c5=5f64,c6=true 1626006833640000000"
]
code = self._conn.schemaless_insert([ lines5[0] ], TDSmlProtocolType.LINE.value, TDSmlTimestampType.NANO_SECOND.value)
print("schemaless_insert result {}".format(code))
code = self._conn.schemaless_insert([ lines5[1] ], TDSmlProtocolType.LINE.value, TDSmlTimestampType.NANO_SECOND.value)
print("schemaless_insert result {}".format(code))
code = self._conn.schemaless_insert([ lines5[2] ], TDSmlProtocolType.LINE.value, TDSmlTimestampType.NANO_SECOND.value)
print("schemaless_insert result {}".format(code))
tdSql.query('select * from stq')
tdSql.checkRows(3)
tdSql.query('select tbname from stq')
tdSql.checkRows(3)
lines6 = [ "str c1=4i64,c3=L\"passit\",c2=false,c4=4f64 1626006833639000000",
"str,t1=abc c1=3i64,c3=L\"passitagin\",c2=true,c4=5f64,c5=5f64 1626006833640000000",
"str,t2=abc c1=3i64,c3=L\"passitagin\",c4=5f64,c5=5f64,c6=true 1626006833640000000"
]
code = self._conn.schemaless_insert(lines6, TDSmlProtocolType.LINE.value, TDSmlTimestampType.NANO_SECOND.value)
print("schemaless_insert result {}".format(code))
tdSql.query('select * from str')
tdSql.checkRows(3)
tdSql.query('select tbname from str')
tdSql.checkRows(3)
###Special Character and keyss
self._conn.schemaless_insert([
"1234,id=3456,abc=4i64,def=3i64 123=3i64,int=2i64,bool=false,into=5f64,column=7u64,!@#$.%^&*()=false 1626006933641",
......@@ -112,7 +173,7 @@ class TDTestCase:
tdSql.query('describe `!@#$.%^&*()`')
tdSql.checkRows(9)
tdSql.query('describe `stable`')
tdSql.query('describe `STABLE`')
tdSql.checkRows(9)
#tdSql.query('select * from `3456`')
......
......@@ -30,35 +30,50 @@ sql create dnode $hostname2
system sh/exec.sh -n dnode2 -s start
sql create dnode $hostname3
system sh/exec.sh -n dnode3 -s start
sleep 5000
sleep 3000
$x = 0
show1:
$x = $x + 1
sleep 1000
if $x == 30 then
return -1
endi
sql show dnodes
print dnode1 $data5_1
print dnode1 $data5_2
print dnode1 $data5_3
print dnode2 $data5_2
print dnode3 $data5_3
if $data5_1 != mnode then
return -1
goto show1
endi
if $data5_2 != vnode then
return -1
goto show1
endi
if $data5_3 != any then
return -1
goto show1
endi
show2:
$x = $x + 1
sleep 1000
if $x == 30 then
return -1
endi
sql show mnodes
print dnode1 ==> $data2_1
print dnode2 ==> $data2_2
print dnode3 ==> $data2_3
if $data2_1 != master then
return -1
goto show2
endi
if $data2_2 != null then
return -1
goto show2
endi
if $data2_3 != slave then
return -1
goto show2
endi
print ========== step2
......@@ -72,26 +87,28 @@ sql create table d1.t6 (ts timestamp, i int)
sql create table d1.t7 (ts timestamp, i int)
sql create table d1.t8 (ts timestamp, i int)
show3:
$x = $x + 1
sleep 1000
if $x == 30 then
return -1
endi
sql show dnodes
print dnode1 $data2_1
print dnode2 $data2_2
print dnode3 $data2_3
if $data2_1 != 0 then
return -1
goto show3
endi
if $data2_2 != 1 then
return -1
goto show3
endi
if $data2_3 != 1 then
return -1
goto show3
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode2 -s stop -x SIGINT
system sh/exec.sh -n dnode3 -s stop -x SIGINT
\ No newline at end of file
system sh/exec.sh -n dnode4 -s stop -x SIGINT
system sh/exec.sh -n dnode5 -s stop -x SIGINT
system sh/exec.sh -n dnode6 -s stop -x SIGINT
system sh/exec.sh -n dnode7 -s stop -x SIGINT
system sh/exec.sh -n dnode8 -s stop -x SIGINT
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册