未验证 提交 12a787f9 编写于 作者: D dapan1121 提交者: GitHub

Merge pull request #8807 from taosdata/enhance/TD-10870

[TD-10870]<enhance>: [schemaless]add a NULL tag to accommodate influxDB tag is optional
......@@ -152,7 +152,9 @@ static int32_t buildSmlKvSchema(TAOS_SML_KV* smlKv, SHashObj* hash, SArray* arra
static int32_t getSmlMd5ChildTableName(TAOS_SML_DATA_POINT* point, char* tableName, int* tableNameLen,
SSmlLinesInfo* info) {
tscDebug("SML:0x%"PRIx64" taos_sml_insert get child table name through md5", info->id);
qsort(point->tags, point->tagNum, sizeof(TAOS_SML_KV), compareSmlColKv);
if (point->tagNum) {
qsort(point->tags, point->tagNum, sizeof(TAOS_SML_KV), compareSmlColKv);
}
SStringBuilder sb; memset(&sb, 0, sizeof(sb));
char sTableName[TSDB_TABLE_NAME_LEN + TS_ESCAPE_CHAR_SIZE] = {0};
......@@ -185,6 +187,18 @@ static int32_t getSmlMd5ChildTableName(TAOS_SML_DATA_POINT* point, char* tableNa
return 0;
}
static int32_t buildSmlChildTableName(TAOS_SML_DATA_POINT* point, SSmlLinesInfo* info) {
tscDebug("SML:0x%"PRIx64" taos_sml_insert build child table name", info->id);
char childTableName[TSDB_TABLE_NAME_LEN + TS_ESCAPE_CHAR_SIZE];
int32_t tableNameLen = TSDB_TABLE_NAME_LEN + TS_ESCAPE_CHAR_SIZE;
getSmlMd5ChildTableName(point, childTableName, &tableNameLen, info);
point->childTableName = calloc(1, tableNameLen+1);
strncpy(point->childTableName, childTableName, tableNameLen);
point->childTableName[tableNameLen] = '\0';
return 0;
}
static int32_t buildDataPointSchemas(TAOS_SML_DATA_POINT* points, int numPoint, SArray* stableSchemas, SSmlLinesInfo* info) {
int32_t code = 0;
SHashObj* sname2shema = taosHashInit(32,
......@@ -216,12 +230,7 @@ static int32_t buildDataPointSchemas(TAOS_SML_DATA_POINT* points, int numPoint,
for (int j = 0; j < point->tagNum; ++j) {
TAOS_SML_KV* tagKv = point->tags + j;
if (!point->childTableName) {
char childTableName[TSDB_TABLE_NAME_LEN + TS_ESCAPE_CHAR_SIZE];
int32_t tableNameLen = TSDB_TABLE_NAME_LEN + TS_ESCAPE_CHAR_SIZE;
getSmlMd5ChildTableName(point, childTableName, &tableNameLen, info);
point->childTableName = calloc(1, tableNameLen+1);
strncpy(point->childTableName, childTableName, tableNameLen);
point->childTableName[tableNameLen] = '\0';
buildSmlChildTableName(point, info);
}
code = buildSmlKvSchema(tagKv, pStableSchema->tagHash, pStableSchema->tags, info);
......@@ -231,6 +240,27 @@ static int32_t buildDataPointSchemas(TAOS_SML_DATA_POINT* points, int numPoint,
}
}
//for Line Protocol tags may be omitted, add a tag with NULL value
if (point->tagNum == 0) {
if (!point->childTableName) {
buildSmlChildTableName(point, info);
}
char tagNullName[TSDB_COL_NAME_LEN] = {0};
size_t nameLen = strlen(tsSmlTagNullName);
strncpy(tagNullName, tsSmlTagNullName, nameLen);
addEscapeCharToString(tagNullName, (int32_t)nameLen);
size_t* pTagNullIdx = taosHashGet(pStableSchema->tagHash, tagNullName, nameLen + TS_ESCAPE_CHAR_SIZE);
if (!pTagNullIdx) {
SSchema tagNull = {0};
tagNull.type = TSDB_DATA_TYPE_NCHAR;
tagNull.bytes = TSDB_NCHAR_SIZE + VARSTR_HEADER_SIZE;
strncpy(tagNull.name, tagNullName, nameLen + TS_ESCAPE_CHAR_SIZE);
taosArrayPush(pStableSchema->tags, &tagNull);
size_t tagNullIdx = taosArrayGetSize(pStableSchema->tags) - 1;
taosHashPut(pStableSchema->tagHash, tagNull.name, nameLen + TS_ESCAPE_CHAR_SIZE, &tagNullIdx, sizeof(tagNullIdx));
}
}
for (int j = 0; j < point->fieldNum; ++j) {
TAOS_SML_KV* fieldKv = point->fields + j;
code = buildSmlKvSchema(fieldKv, pStableSchema->fieldHash, pStableSchema->fields, info);
......@@ -951,7 +981,7 @@ static int32_t applyChildTableTags(TAOS* taos, char* cTableName, char* sTableNam
tagKVs[kv->fieldSchemaIdx] = kv;
}
}
SArray* tagBinds = taosArrayInit(numTags, sizeof(TAOS_BIND));
taosArraySetSize(tagBinds, numTags);
int isNullColBind = TSDB_TRUE;
......
......@@ -237,6 +237,7 @@ extern int8_t tsDeadLockKillQuery;
// schemaless
extern char tsDefaultJSONStrType[];
extern char tsSmlChildTableName[];
extern char tsSmlTagNullName[];
typedef struct {
......
......@@ -291,7 +291,11 @@ int8_t tsDeadLockKillQuery = 0;
// default JSON string type
char tsDefaultJSONStrType[7] = "nchar";
char tsSmlChildTableName[TSDB_TABLE_NAME_LEN] = ""; //user defined child table name can be specified in tag value. If set to empty system will generate table name using MD5 hash.
char tsSmlChildTableName[TSDB_TABLE_NAME_LEN] = ""; //user defined child table name can be specified in tag value.
//If set to empty system will generate table name using MD5 hash.
char tsSmlTagNullName[TSDB_COL_NAME_LEN] = "_tag_null"; //for line protocol if tag is omitted, add a tag with NULL value
//to make sure inserted records belongs to the same measurement
//default name is _tag_null and can be user configurable
int32_t (*monStartSystemFp)() = NULL;
void (*monStopSystemFp)() = NULL;
......@@ -1701,6 +1705,17 @@ static void doInitGlobalConfig(void) {
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg);
// name for a NULL value tag added for Line Protocol when tag fields are omitted
cfg.option = "smlTagNullName";
cfg.ptr = tsSmlTagNullName;
cfg.valType = TAOS_CFG_VTYPE_STRING;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW | TSDB_CFG_CTYPE_B_CLIENT;
cfg.minValue = 0;
cfg.maxValue = 0;
cfg.ptrLength = tListLen(tsSmlTagNullName);
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg);
// flush vnode wal file if walSize > walFlushSize and walSize > cache*0.5*blocks
cfg.option = "walFlushSize";
cfg.ptr = &tsdbWalFlushSize;
......
......@@ -129,7 +129,7 @@ class TDTestCase:
print("schemaless_insert result {}".format(code))
tdSql.query("describe stb0_3")
tdSql.checkData(1, 1, "BINARY")
tdSql.checkData(1, 1, "NCHAR")
payload = ['''
{
......@@ -835,7 +835,7 @@ class TDTestCase:
code = self._conn.schemaless_insert(payload, TDSmlProtocolType.JSON.value, TDSmlTimestampType.NOT_CONFIGURED.value)
print("schemaless_insert result {}".format(code))
tdSql.query("describe `stable`")
tdSql.query("describe `STABLE`")
tdSql.checkRows(9)
#tdSql.query("select * from `key`")
......
......@@ -333,7 +333,7 @@ class TDTestCase:
tdSql.query('describe `!@#$.%^&*()`')
tdSql.checkRows(9)
tdSql.query('describe `stable`')
tdSql.query('describe `STABLE`')
tdSql.checkRows(9)
#tdSql.query('select * from `123`')
......
......@@ -86,6 +86,67 @@ class TDTestCase:
#tdSql.query('select tbname, * from childtable')
#tdSql.checkRows(1)
###Test when tag is omitted
lines3 = [ "sti c1=4i64,c3=L\"passit\",c2=false,c4=4f64 1626006833639000000",
"sti c1=3i64,c3=L\"passitagin\",c2=true,c4=5f64,c5=5f64 1626006833640000000"
]
code = self._conn.schemaless_insert(lines3, TDSmlProtocolType.LINE.value, TDSmlTimestampType.NANO_SECOND.value)
print("schemaless_insert result {}".format(code))
tdSql.query('select * from sti')
tdSql.checkRows(2)
tdSql.query('select tbname from sti')
tdSql.checkRows(1)
lines4 = [ "stp c1=4i64,c3=L\"passit\",c2=false,c4=4f64 1626006833639000000",
"stp c1=3i64,c3=L\"passitagin\",c2=true,c4=5f64,c5=5f64 1626006833640000000"
]
code = self._conn.schemaless_insert([ lines4[0] ], TDSmlProtocolType.LINE.value, TDSmlTimestampType.NANO_SECOND.value)
print("schemaless_insert result {}".format(code))
code = self._conn.schemaless_insert([ lines4[1] ], TDSmlProtocolType.LINE.value, TDSmlTimestampType.NANO_SECOND.value)
print("schemaless_insert result {}".format(code))
tdSql.query('select * from stp')
tdSql.checkRows(2)
tdSql.query('select tbname from stp')
tdSql.checkRows(1)
lines5 = [ "stq c1=4i64,c3=L\"passit\",c2=false,c4=4f64 1626006833639000000",
"stq,t1=abc c1=3i64,c3=L\"passitagin\",c2=true,c4=5f64,c5=5f64 1626006833640000000",
"stq,t2=abc c1=3i64,c3=L\"passitagin\",c4=5f64,c5=5f64,c6=true 1626006833640000000"
]
code = self._conn.schemaless_insert([ lines5[0] ], TDSmlProtocolType.LINE.value, TDSmlTimestampType.NANO_SECOND.value)
print("schemaless_insert result {}".format(code))
code = self._conn.schemaless_insert([ lines5[1] ], TDSmlProtocolType.LINE.value, TDSmlTimestampType.NANO_SECOND.value)
print("schemaless_insert result {}".format(code))
code = self._conn.schemaless_insert([ lines5[2] ], TDSmlProtocolType.LINE.value, TDSmlTimestampType.NANO_SECOND.value)
print("schemaless_insert result {}".format(code))
tdSql.query('select * from stq')
tdSql.checkRows(3)
tdSql.query('select tbname from stq')
tdSql.checkRows(3)
lines6 = [ "str c1=4i64,c3=L\"passit\",c2=false,c4=4f64 1626006833639000000",
"str,t1=abc c1=3i64,c3=L\"passitagin\",c2=true,c4=5f64,c5=5f64 1626006833640000000",
"str,t2=abc c1=3i64,c3=L\"passitagin\",c4=5f64,c5=5f64,c6=true 1626006833640000000"
]
code = self._conn.schemaless_insert(lines6, TDSmlProtocolType.LINE.value, TDSmlTimestampType.NANO_SECOND.value)
print("schemaless_insert result {}".format(code))
tdSql.query('select * from str')
tdSql.checkRows(3)
tdSql.query('select tbname from str')
tdSql.checkRows(3)
###Special Character and keyss
self._conn.schemaless_insert([
"1234,id=3456,abc=4i64,def=3i64 123=3i64,int=2i64,bool=false,into=5f64,column=7u64,!@#$.%^&*()=false 1626006933641",
......@@ -112,7 +173,7 @@ class TDTestCase:
tdSql.query('describe `!@#$.%^&*()`')
tdSql.checkRows(9)
tdSql.query('describe `stable`')
tdSql.query('describe `STABLE`')
tdSql.checkRows(9)
#tdSql.query('select * from `3456`')
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册