未验证 提交 e5f3835e 编写于 作者: W wade zhang 提交者: GitHub

Merge pull request #22229 from taosdata/fix/TS-3724

fix:modify dot to underline in schemaless supertable name
......@@ -169,6 +169,8 @@ extern char tsUdfdLdLibPath[];
// schemaless
extern char tsSmlChildTableName[];
extern char tsSmlTagName[];
extern bool tsSmlDot2Underline;
extern char tsSmlTsDefaultName[];
// extern bool tsSmlDataFormat;
// extern int32_t tsSmlBatchSize;
......
......@@ -64,8 +64,8 @@ extern "C" {
#define IS_INVALID_COL_LEN(len) ((len) <= 0 || (len) >= TSDB_COL_NAME_LEN)
#define IS_INVALID_TABLE_LEN(len) ((len) <= 0 || (len) >= TSDB_TABLE_NAME_LEN)
#define TS "_ts"
#define TS_LEN 3
//#define TS "_ts"
//#define TS_LEN 3
#define VALUE "_value"
#define VALUE_LEN 6
......@@ -258,6 +258,7 @@ int32_t smlParseInfluxString(SSmlHandle *info, char *sql, char *sqlEnd, SSmlLine
int32_t smlParseTelnetString(SSmlHandle *info, char *sql, char *sqlEnd, SSmlLineInfo *elements);
int32_t smlParseJSON(SSmlHandle *info, char *payload);
void smlStrReplace(char* src, int32_t len);
#ifdef __cplusplus
}
#endif
......
......@@ -104,7 +104,7 @@ static int32_t smlCheckAuth(SSmlHandle *info, SRequestConnInfo* conn, const cha
SUserAuthRes authRes = {0};
code = catalogChkAuth(info->pCatalog, conn, &pAuth, &authRes);
nodesDestroyNode(authRes.pCond);
return (code == TSDB_CODE_SUCCESS) ? (authRes.pass ? TSDB_CODE_SUCCESS : TSDB_CODE_PAR_PERMISSION_DENIED) : code;
......@@ -114,6 +114,15 @@ inline bool smlDoubleToInt64OverFlow(double num) {
return false;
}
void smlStrReplace(char* src, int32_t len){
if (!tsSmlDot2Underline) return;
for(int i = 0; i < len; i++){
if(src[i] == '.'){
src[i] = '_';
}
}
}
int32_t smlBuildInvalidDataMsg(SSmlMsgBuf *pBuf, const char *msg1, const char *msg2) {
if (pBuf->buf) {
memset(pBuf->buf, 0, pBuf->len);
......@@ -193,6 +202,9 @@ static int32_t smlParseTableName(SArray *tags, char *childTableName) {
if (childTableNameLen == tag->keyLen && strncmp(tag->key, tsSmlChildTableName, tag->keyLen) == 0) {
memset(childTableName, 0, TSDB_TABLE_NAME_LEN);
strncpy(childTableName, tag->value, (tag->length < TSDB_TABLE_NAME_LEN ? tag->length : TSDB_TABLE_NAME_LEN));
if(tsSmlDot2Underline){
smlStrReplace(childTableName, strlen(childTableName));
}
taosArrayRemove(tags, i);
break;
}
......@@ -838,6 +850,7 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) {
char *measure = taosMemoryMalloc(superTableLen);
memcpy(measure, superTable, superTableLen);
PROCESS_SLASH_IN_MEASUREMENT(measure, superTableLen);
smlStrReplace(measure, superTableLen);
memset(pName.tname, 0, TSDB_TABLE_NAME_LEN);
memcpy(pName.tname, measure, superTableLen);
taosMemoryFree(measure);
......@@ -1051,7 +1064,8 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) {
taosMemoryFreeClear(sTableData->tableMeta);
sTableData->tableMeta = pTableMeta;
uDebug("SML:0x%" PRIx64 "modify schema uid:%" PRIu64 ", sversion:%d, tversion:%d", info->id, pTableMeta->uid,
pTableMeta->sversion, pTableMeta->tversion) tmp = (SSmlSTableMeta **)taosHashIterate(info->superTables, tmp);
pTableMeta->sversion, pTableMeta->tversion);
tmp = (SSmlSTableMeta **)taosHashIterate(info->superTables, tmp);
}
uDebug("SML:0x%" PRIx64 " smlModifyDBSchemas end success, format:%d, needModifySchema:%d", info->id, info->dataFormat,
info->needModifySchema);
......@@ -1394,7 +1408,14 @@ static int32_t smlInsertData(SSmlHandle *info) {
SSmlTableInfo **oneTable = (SSmlTableInfo **)taosHashIterate(info->childTables, NULL);
while (oneTable) {
SSmlTableInfo *tableData = *oneTable;
tstrncpy(pName.tname, tableData->sTableName, tableData->sTableNameLen + 1);
int measureLen = tableData->sTableNameLen;
char *measure = (char *)taosMemoryMalloc(tableData->sTableNameLen);
memcpy(measure, tableData->sTableName, tableData->sTableNameLen);
PROCESS_SLASH_IN_MEASUREMENT(measure, measureLen);
smlStrReplace(measure, measureLen);
memset(pName.tname, 0, TSDB_TABLE_NAME_LEN);
memcpy(pName.tname, measure, measureLen);
if (info->pRequest->tableList == NULL) {
info->pRequest->tableList = taosArrayInit(1, sizeof(SName));
......@@ -1411,6 +1432,7 @@ static int32_t smlInsertData(SSmlHandle *info) {
code = smlCheckAuth(info, &conn, pName.tname, AUTH_TYPE_WRITE);
if(code != TSDB_CODE_SUCCESS){
taosMemoryFree(measure);
return code;
}
......@@ -1418,6 +1440,7 @@ static int32_t smlInsertData(SSmlHandle *info) {
code = catalogGetTableHashVgroup(info->pCatalog, &conn, &pName, &vg);
if (code != TSDB_CODE_SUCCESS) {
uError("SML:0x%" PRIx64 " catalogGetTableHashVgroup failed. table name: %s", info->id, tableData->childTableName);
taosMemoryFree(measure);
return code;
}
taosHashPut(info->pVgHash, (const char *)&vg.vgId, sizeof(vg.vgId), (char *)&vg, sizeof(vg));
......@@ -1426,6 +1449,7 @@ static int32_t smlInsertData(SSmlHandle *info) {
(SSmlSTableMeta **)taosHashGet(info->superTables, tableData->sTableName, tableData->sTableNameLen);
if (unlikely(NULL == pMeta || NULL == (*pMeta)->tableMeta)) {
uError("SML:0x%" PRIx64 " NULL == pMeta. table name: %s", info->id, tableData->childTableName);
taosMemoryFree(measure);
return TSDB_CODE_SML_INTERNAL_ERROR;
}
......@@ -1435,11 +1459,6 @@ static int32_t smlInsertData(SSmlHandle *info) {
uDebug("SML:0x%" PRIx64 " smlInsertData table:%s, uid:%" PRIu64 ", format:%d", info->id, pName.tname,
tableData->uid, info->dataFormat);
int measureLen = tableData->sTableNameLen;
char *measure = (char *)taosMemoryMalloc(tableData->sTableNameLen);
memcpy(measure, tableData->sTableName, tableData->sTableNameLen);
PROCESS_SLASH_IN_MEASUREMENT(measure, measureLen);
code = smlBindData(info->pQuery, info->dataFormat, tableData->tags, (*pMeta)->cols, tableData->cols,
(*pMeta)->tableMeta, tableData->childTableName, measure, measureLen, info->ttl, info->msgBuf.buf,
info->msgBuf.len);
......
......@@ -996,8 +996,8 @@ static int32_t smlParseJSONStringExt(SSmlHandle *info, cJSON *root, SSmlLineInfo
uError("OTD:0x%" PRIx64 " Unable to parse timestamp from JSON payload", info->id);
return TSDB_CODE_INVALID_TIMESTAMP;
}
SSmlKv kvTs = {.key = TS,
.keyLen = TS_LEN,
SSmlKv kvTs = {.key = tsSmlTsDefaultName,
.keyLen = strlen(tsSmlTsDefaultName),
.type = TSDB_DATA_TYPE_TIMESTAMP,
.i = ts,
.length = (size_t)tDataTypes[TSDB_DATA_TYPE_TIMESTAMP].bytes};
......@@ -1200,8 +1200,8 @@ static int32_t smlParseJSONString(SSmlHandle *info, char **start, SSmlLineInfo *
return TSDB_CODE_INVALID_TIMESTAMP;
}
}
SSmlKv kvTs = {.key = TS,
.keyLen = TS_LEN,
SSmlKv kvTs = {.key = tsSmlTsDefaultName,
.keyLen = strlen(tsSmlTsDefaultName),
.type = TSDB_DATA_TYPE_TIMESTAMP,
.i = ts,
.length = (size_t)tDataTypes[TSDB_DATA_TYPE_TIMESTAMP].bytes};
......
......@@ -157,6 +157,7 @@ static int32_t smlParseTagKv(SSmlHandle *info, char **sql, char *sqlEnd, SSmlLin
measure = (char *)taosMemoryMalloc(currElement->measureLen);
memcpy(measure, currElement->measure, currElement->measureLen);
PROCESS_SLASH_IN_MEASUREMENT(measure, measureLen);
smlStrReplace(measure, measureLen);
}
STableMeta *pTableMeta = smlGetMeta(info, measure, measureLen);
if (currElement->measureEscaped) {
......@@ -365,6 +366,7 @@ static int32_t smlParseColKv(SSmlHandle *info, char **sql, char *sqlEnd, SSmlLin
measure = (char *)taosMemoryMalloc(currElement->measureLen);
memcpy(measure, currElement->measure, currElement->measureLen);
PROCESS_SLASH_IN_MEASUREMENT(measure, measureLen);
smlStrReplace(measure, measureLen);
}
STableMeta *pTableMeta = smlGetMeta(info, measure, measureLen);
if (currElement->measureEscaped) {
......@@ -651,8 +653,8 @@ int32_t smlParseInfluxString(SSmlHandle *info, char *sql, char *sqlEnd, SSmlLine
return TSDB_CODE_INVALID_TIMESTAMP;
}
// add ts to
SSmlKv kv = {.key = TS,
.keyLen = TS_LEN,
SSmlKv kv = {.key = tsSmlTsDefaultName,
.keyLen = strlen(tsSmlTsDefaultName),
.type = TSDB_DATA_TYPE_TIMESTAMP,
.i = ts,
.length = (size_t)tDataTypes[TSDB_DATA_TYPE_TIMESTAMP].bytes,
......
......@@ -260,8 +260,8 @@ int32_t smlParseTelnetString(SSmlHandle *info, char *sql, char *sqlEnd, SSmlLine
smlBuildInvalidDataMsg(&info->msgBuf, "invalid timestamp", sql);
return TSDB_CODE_INVALID_TIMESTAMP;
}
SSmlKv kvTs = {.key = TS,
.keyLen = TS_LEN,
SSmlKv kvTs = {.key = tsSmlTsDefaultName,
.keyLen = strlen(tsSmlTsDefaultName),
.type = TSDB_DATA_TYPE_TIMESTAMP,
.i = ts,
.length = (size_t)tDataTypes[TSDB_DATA_TYPE_TIMESTAMP].bytes};
......
......@@ -105,6 +105,8 @@ char *tsClientCrashReportUri = "/ccrashreport";
char *tsSvrCrashReportUri = "/dcrashreport";
// schemaless
bool tsSmlDot2Underline = true;
char tsSmlTsDefaultName[TSDB_COL_NAME_LEN] = "_ts";
char tsSmlTagName[TSDB_COL_NAME_LEN] = "_tag_null";
char tsSmlChildTableName[TSDB_TABLE_NAME_LEN] = ""; // user defined child table name can be specified in tag value.
// If set to empty system will generate table name using MD5 hash.
......@@ -366,6 +368,8 @@ static int32_t taosAddClientCfg(SConfig *pCfg) {
if (cfgAddBool(pCfg, "keepColumnName", tsKeepColumnName, CFG_SCOPE_CLIENT) != 0) return -1;
if (cfgAddString(pCfg, "smlChildTableName", "", CFG_SCOPE_CLIENT) != 0) return -1;
if (cfgAddString(pCfg, "smlTagName", tsSmlTagName, CFG_SCOPE_CLIENT) != 0) return -1;
if (cfgAddString(pCfg, "smlTsDefaultName", tsSmlTsDefaultName, CFG_SCOPE_CLIENT) != 0) return -1;
if (cfgAddBool(pCfg, "smlDot2Underline", tsSmlDot2Underline, CFG_SCOPE_CLIENT) != 0) return -1;
// if (cfgAddBool(pCfg, "smlDataFormat", tsSmlDataFormat, CFG_SCOPE_CLIENT) != 0) return -1;
// if (cfgAddInt32(pCfg, "smlBatchSize", tsSmlBatchSize, 1, INT32_MAX, CFG_SCOPE_CLIENT) != 0) return -1;
if (cfgAddInt32(pCfg, "maxInsertBatchRows", tsMaxInsertBatchRows, 1, INT32_MAX, CFG_SCOPE_CLIENT) != 0) return -1;
......@@ -801,6 +805,8 @@ static int32_t taosSetClientCfg(SConfig *pCfg) {
tstrncpy(tsSmlChildTableName, cfgGetItem(pCfg, "smlChildTableName")->str, TSDB_TABLE_NAME_LEN);
tstrncpy(tsSmlTagName, cfgGetItem(pCfg, "smlTagName")->str, TSDB_COL_NAME_LEN);
tstrncpy(tsSmlTsDefaultName, cfgGetItem(pCfg, "smlTsDefaultName")->str, TSDB_COL_NAME_LEN);
tsSmlDot2Underline = cfgGetItem(pCfg, "smlDot2Underline")->bval;
// tsSmlDataFormat = cfgGetItem(pCfg, "smlDataFormat")->bval;
// tsSmlBatchSize = cfgGetItem(pCfg, "smlBatchSize")->i32;
......@@ -1243,6 +1249,10 @@ int32_t taosApplyLocalCfg(SConfig *pCfg, char *name) {
// tsSmlDataFormat = cfgGetItem(pCfg, "smlDataFormat")->bval;
// } else if (strcasecmp("smlBatchSize", name) == 0) {
// tsSmlBatchSize = cfgGetItem(pCfg, "smlBatchSize")->i32;
} else if(strcasecmp("smlTsDefaultName", name) == 0) {
tstrncpy(tsSmlTsDefaultName, cfgGetItem(pCfg, "smlTsDefaultName")->str, TSDB_COL_NAME_LEN);
} else if(strcasecmp("smlDot2Underline", name) == 0) {
tsSmlDot2Underline = cfgGetItem(pCfg, "smlDot2Underline")->bval;
} else if (strcasecmp("shellActivityTimer", name) == 0) {
tsShellActivityTimer = cfgGetItem(pCfg, "shellActivityTimer")->i32;
} else if (strcasecmp("supportVnodes", name) == 0) {
......
......@@ -345,6 +345,7 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/smaTest.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/smaTest.py -R
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/sma_index.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/sml_TS-3724.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/sml.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/sml.py -R
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/spread.py
......
......@@ -24,6 +24,8 @@ import threading
import json
class TDTestCase:
updatecfgDict = {'clientCfg': {'smlDot2Underline': 0}}
def init(self, conn, logSql, replicaVar=1):
self.replicaVar = int(replicaVar)
tdLog.debug("start to execute %s" % __file__)
......
......@@ -28,6 +28,8 @@ if platform.system().lower() == 'windows':
sys.stdout = io.TextIOWrapper(sys.stdout.buffer,encoding='utf8')
class TDTestCase:
updatecfgDict = {'clientCfg': {'smlDot2Underline': 0}}
def init(self, conn, logSql, replicaVar=1):
self.replicaVar = int(replicaVar)
tdLog.debug("start to execute %s" % __file__)
......
......@@ -15,7 +15,7 @@ sys.path.append("./7-tmq")
from tmqCommon import *
class TDTestCase:
updatecfgDict = {'clientCfg': {'smlChildTableName': 'dataModelName', 'fqdn': 'localhost'}, 'fqdn': 'localhost'}
updatecfgDict = {'clientCfg': {'smlChildTableName': 'dataModelName', 'fqdn': 'localhost', 'smlDot2Underline': 0}, 'fqdn': 'localhost'}
print("===================: ", updatecfgDict)
def init(self, conn, logSql, replicaVar=1):
......@@ -101,6 +101,15 @@ class TDTestCase:
tdSql.query(f"desc {dbname}.macylr")
tdSql.checkRows(25)
tdSql.query(f"select * from ts3724.`.stb2`")
tdSql.checkRows(1)
tdSql.query(f"select * from ts3724.`stb.2`")
tdSql.checkRows(1)
tdSql.query(f"select * from ts3724.`stb2.`")
tdSql.checkRows(1)
return
def run(self):
......
import taos
import sys
import time
import socket
import os
import threading
from util.log import *
from util.sql import *
from util.cases import *
from util.dnodes import *
from util.common import *
sys.path.append("./7-tmq")
from tmqCommon import *
class TDTestCase:
updatecfgDict = {'clientCfg': {'smlChildTableName': 'dataModelName', 'fqdn': 'localhost', 'smlTsDefaultName': "times"}, 'fqdn': 'localhost'}
print("===================: ", updatecfgDict)
def init(self, conn, logSql, replicaVar=1):
self.replicaVar = int(replicaVar)
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor(), True)
#tdSql.init(conn.cursor(), logSql) # output sql.txt file
def checkContent(self, dbname="sml_db"):
simClientCfg="%s/taos.cfg"%tdDnodes.getSimCfgPath()
buildPath = tdCom.getBuildPath()
cmdStr = '%s/build/bin/sml_test %s'%(buildPath, simClientCfg)
print("cmdStr:", cmdStr)
tdLog.info(cmdStr)
ret = os.system(cmdStr)
if ret != 0:
tdLog.info("sml_test ret != 0")
tdSql.query(f"select * from ts3303.stb2")
tdSql.query(f"select * from ts3303.meters")
# tdSql.execute('use sml_db')
tdSql.query(f"select * from {dbname}.t_b7d815c9222ca64cdf2614c61de8f211")
tdSql.checkRows(1)
tdSql.checkData(0, 0, '2016-01-01 08:00:07.000')
tdSql.checkData(0, 1, 2000)
tdSql.checkData(0, 2, 200)
tdSql.checkData(0, 3, 15)
tdSql.checkData(0, 4, 24.5208)
tdSql.checkData(0, 5, 28.09377)
tdSql.checkData(0, 6, 428)
tdSql.checkData(0, 7, 0)
tdSql.checkData(0, 8, 304)
tdSql.checkData(0, 9, 0)
tdSql.checkData(0, 10, 25)
tdSql.query(f"select * from {dbname}.readings")
tdSql.checkRows(9)
tdSql.query(f"select distinct tbname from {dbname}.readings")
tdSql.checkRows(4)
tdSql.query(f"select * from {dbname}.t_0799064f5487946e5d22164a822acfc8 order by times")
tdSql.checkRows(2)
tdSql.checkData(0, 3, "kk")
tdSql.checkData(1, 3, "")
tdSql.query(f"select distinct tbname from {dbname}.`sys_if_bytes_out`")
tdSql.checkRows(2)
tdSql.query(f"select * from {dbname}.t_fc70dec6677d4277c5d9799c4da806da order by times")
tdSql.checkRows(2)
tdSql.checkData(0, 1, 1.300000000)
tdSql.checkData(1, 1, 13.000000000)
tdSql.query(f"select * from {dbname}.`sys_procs_running`")
tdSql.checkRows(1)
tdSql.checkData(0, 1, 42.000000000)
tdSql.checkData(0, 2, "web01")
tdSql.query(f"select distinct tbname from {dbname}.`sys_cpu_nice`")
tdSql.checkRows(3)
tdSql.query(f"select * from {dbname}.`sys_cpu_nice` order by times")
tdSql.checkRows(4)
tdSql.checkData(0, 1, 13.000000000)
tdSql.checkData(0, 2, "web01")
tdSql.checkData(0, 3, None)
tdSql.checkData(0, 4, "lga")
tdSql.checkData(1, 1, 9.000000000)
tdSql.checkData(1, 2, "web02")
tdSql.checkData(3, 3, "t1")
tdSql.checkData(0, 4, "lga")
tdSql.query(f"select * from {dbname}.macylr")
tdSql.checkRows(2)
tdSql.query(f"select * from {dbname}.qelhxo")
tdSql.checkRows(5)
tdSql.query(f"desc {dbname}.macylr")
tdSql.checkRows(25)
tdSql.query(f"select * from ts3724._stb2")
tdSql.checkRows(1)
tdSql.query(f"select * from ts3724.stb_2")
tdSql.checkRows(1)
tdSql.query(f"select * from ts3724.stb2_")
tdSql.checkRows(1)
return
def run(self):
tdSql.prepare()
self.checkContent()
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())
......@@ -1522,6 +1522,36 @@ int sml_ts2385_Test() {
return code;
}
int sml_ts3724_Test() {
TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 0);
TAOS_RES *pRes = taos_query(taos, "drop database if exists ts3724");
taos_free_result(pRes);
pRes = taos_query(taos, "create database if not exists ts3724");
taos_free_result(pRes);
const char *sql[] = {
"stb.2,t1=1 f1=283i32 1632299372000",
".stb2,t1=1 f1=106i32 1632299378000",
"stb2.,t1=1 f1=106i32 1632299378000",
};
pRes = taos_query(taos, "use ts3724");
taos_free_result(pRes);
pRes = taos_schemaless_insert(taos, (char **)sql, sizeof(sql) / sizeof(sql[0]), TSDB_SML_LINE_PROTOCOL,
TSDB_SML_TIMESTAMP_MILLI_SECONDS);
int code = taos_errno(pRes);
printf("%s result0:%s\n", __FUNCTION__, taos_errstr(pRes));
taos_free_result(pRes);
taos_close(taos);
return code;
}
int main(int argc, char *argv[]) {
if (argc == 2) {
taos_options(TSDB_OPTION_CONFIGDIR, argv[1]);
......@@ -1579,5 +1609,8 @@ int main(int argc, char *argv[]) {
ASSERT(!ret);
ret = sml_19221_Test();
ASSERT(!ret);
ret = sml_ts3724_Test();
ASSERT(!ret);
return ret;
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册