提交 0a2f1cb7 编写于 作者: wmmhello's avatar wmmhello

refactor:add the configuration of child table name

上级 6dd2a578
...@@ -125,6 +125,9 @@ extern SDiskCfg tsDiskCfg[]; ...@@ -125,6 +125,9 @@ extern SDiskCfg tsDiskCfg[];
// udf // udf
extern bool tsStartUdfd; extern bool tsStartUdfd;
// schemaless
extern char tsSmlChildTableName[];
// internal // internal
extern int32_t tsTransPullupInterval; extern int32_t tsTransPullupInterval;
extern int32_t tsMqRebalanceInterval; extern int32_t tsMqRebalanceInterval;
......
...@@ -16,6 +16,7 @@ ...@@ -16,6 +16,7 @@
#include "clientInt.h" #include "clientInt.h"
#include "tname.h" #include "tname.h"
#include "cJSON.h" #include "cJSON.h"
#include "tglobal.h"
//================================================================================================= //=================================================================================================
#define SPACE ' ' #define SPACE ' '
...@@ -54,6 +55,9 @@ for (int i = 1; i < keyLen; ++i) { \ ...@@ -54,6 +55,9 @@ for (int i = 1; i < keyLen; ++i) { \
} \ } \
} }
#define IS_INVALID_COL_LEN(len) ((len) <= 0 || (len) >= TSDB_COL_NAME_LEN)
#define IS_INVALID_TABLE_LEN(len) ((len) <= 0 || (len) >= TSDB_TABLE_NAME_LEN)
#define OTD_MAX_FIELDS_NUM 2 #define OTD_MAX_FIELDS_NUM 2
#define OTD_JSON_SUB_FIELDS_NUM 2 #define OTD_JSON_SUB_FIELDS_NUM 2
#define OTD_JSON_FIELDS_NUM 4 #define OTD_JSON_FIELDS_NUM 4
...@@ -899,8 +903,8 @@ static int32_t smlParseInfluxString(const char* sql, SSmlLineInfo *elements, SSm ...@@ -899,8 +903,8 @@ static int32_t smlParseInfluxString(const char* sql, SSmlLineInfo *elements, SSm
sql++; sql++;
} }
elements->measureLen = sql - elements->measure; elements->measureLen = sql - elements->measure;
if(elements->measureLen == 0) { if(IS_INVALID_TABLE_LEN(elements->measureLen)) {
smlBuildInvalidDataMsg(msg, "measure is empty", NULL); smlBuildInvalidDataMsg(msg, "measure is empty or too large than 192", NULL);
return TSDB_CODE_SML_INVALID_DATA; return TSDB_CODE_SML_INVALID_DATA;
} }
...@@ -969,8 +973,9 @@ static void smlParseTelnetElement(const char **sql, const char **data, int32_t * ...@@ -969,8 +973,9 @@ static void smlParseTelnetElement(const char **sql, const char **data, int32_t *
} }
} }
static int32_t smlParseTelnetTags(const char* data, SArray *cols, SHashObj *dumplicateKey, SSmlMsgBuf *msg){ static int32_t smlParseTelnetTags(const char* data, SArray *cols, char *childTableName, SHashObj *dumplicateKey, SSmlMsgBuf *msg){
const char *sql = data; const char *sql = data;
size_t childTableNameLen = strlen(tsSmlChildTableName);
while(*sql != '\0'){ while(*sql != '\0'){
JUMP_SPACE(sql) JUMP_SPACE(sql)
if(*sql == '\0') break; if(*sql == '\0') break;
...@@ -992,7 +997,7 @@ static int32_t smlParseTelnetTags(const char* data, SArray *cols, SHashObj *dump ...@@ -992,7 +997,7 @@ static int32_t smlParseTelnetTags(const char* data, SArray *cols, SHashObj *dump
sql++; sql++;
} }
if(keyLen == 0 || keyLen >= TSDB_COL_NAME_LEN){ if(IS_INVALID_COL_LEN(keyLen)){
smlBuildInvalidDataMsg(msg, "invalid key or key is too long than 64", key); smlBuildInvalidDataMsg(msg, "invalid key or key is too long than 64", key);
return TSDB_CODE_SML_INVALID_DATA; return TSDB_CODE_SML_INVALID_DATA;
} }
...@@ -1022,6 +1027,13 @@ static int32_t smlParseTelnetTags(const char* data, SArray *cols, SHashObj *dump ...@@ -1022,6 +1027,13 @@ static int32_t smlParseTelnetTags(const char* data, SArray *cols, SHashObj *dump
return TSDB_CODE_SML_INVALID_DATA; return TSDB_CODE_SML_INVALID_DATA;
} }
//handle child table name
if(childTableNameLen != 0 && strncmp(key, tsSmlChildTableName, keyLen) == 0){
memset(childTableName, 0, TSDB_TABLE_NAME_LEN);
strncpy(childTableName, value, (valueLen < TSDB_TABLE_NAME_LEN ? valueLen : TSDB_TABLE_NAME_LEN));
continue;
}
// add kv to SSmlKv // add kv to SSmlKv
SSmlKv *kv = (SSmlKv *)taosMemoryCalloc(sizeof(SSmlKv), 1); SSmlKv *kv = (SSmlKv *)taosMemoryCalloc(sizeof(SSmlKv), 1);
if(!kv) return TSDB_CODE_OUT_OF_MEMORY; if(!kv) return TSDB_CODE_OUT_OF_MEMORY;
...@@ -1043,7 +1055,7 @@ static int32_t smlParseTelnetString(SSmlHandle *info, const char* sql, SSmlTable ...@@ -1043,7 +1055,7 @@ static int32_t smlParseTelnetString(SSmlHandle *info, const char* sql, SSmlTable
// parse metric // parse metric
smlParseTelnetElement(&sql, &tinfo->sTableName, &tinfo->sTableNameLen); smlParseTelnetElement(&sql, &tinfo->sTableName, &tinfo->sTableNameLen);
if (!(tinfo->sTableName) || tinfo->sTableNameLen == 0) { if (!(tinfo->sTableName) || IS_INVALID_TABLE_LEN(tinfo->sTableNameLen)) {
smlBuildInvalidDataMsg(&info->msgBuf, "invalid data", sql); smlBuildInvalidDataMsg(&info->msgBuf, "invalid data", sql);
return TSDB_CODE_SML_INVALID_DATA; return TSDB_CODE_SML_INVALID_DATA;
} }
...@@ -1085,7 +1097,7 @@ static int32_t smlParseTelnetString(SSmlHandle *info, const char* sql, SSmlTable ...@@ -1085,7 +1097,7 @@ static int32_t smlParseTelnetString(SSmlHandle *info, const char* sql, SSmlTable
} }
// parse tags // parse tags
ret = smlParseTelnetTags(sql, tinfo->tags, info->dumplicateKey, &info->msgBuf); ret = smlParseTelnetTags(sql, tinfo->tags, tinfo->childTableName, info->dumplicateKey, &info->msgBuf);
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
smlBuildInvalidDataMsg(&info->msgBuf, "invalid data", sql); smlBuildInvalidDataMsg(&info->msgBuf, "invalid data", sql);
return TSDB_CODE_SML_INVALID_DATA; return TSDB_CODE_SML_INVALID_DATA;
...@@ -1094,7 +1106,7 @@ static int32_t smlParseTelnetString(SSmlHandle *info, const char* sql, SSmlTable ...@@ -1094,7 +1106,7 @@ static int32_t smlParseTelnetString(SSmlHandle *info, const char* sql, SSmlTable
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t smlParseCols(const char* data, int32_t len, SArray *cols, bool isTag, SHashObj *dumplicateKey, SSmlMsgBuf *msg){ static int32_t smlParseCols(const char* data, int32_t len, SArray *cols, char *childTableName, bool isTag, SHashObj *dumplicateKey, SSmlMsgBuf *msg){
if(isTag && len == 0){ if(isTag && len == 0){
SSmlKv *kv = (SSmlKv *)taosMemoryCalloc(sizeof(SSmlKv), 1); SSmlKv *kv = (SSmlKv *)taosMemoryCalloc(sizeof(SSmlKv), 1);
if(!kv) return TSDB_CODE_OUT_OF_MEMORY; if(!kv) return TSDB_CODE_OUT_OF_MEMORY;
...@@ -1107,6 +1119,7 @@ static int32_t smlParseCols(const char* data, int32_t len, SArray *cols, bool is ...@@ -1107,6 +1119,7 @@ static int32_t smlParseCols(const char* data, int32_t len, SArray *cols, bool is
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
size_t childTableNameLen = strlen(tsSmlChildTableName);
const char *sql = data; const char *sql = data;
while(sql < data + len){ while(sql < data + len){
const char *key = sql; const char *key = sql;
...@@ -1126,7 +1139,7 @@ static int32_t smlParseCols(const char* data, int32_t len, SArray *cols, bool is ...@@ -1126,7 +1139,7 @@ static int32_t smlParseCols(const char* data, int32_t len, SArray *cols, bool is
sql++; sql++;
} }
if(keyLen == 0 || keyLen >= TSDB_COL_NAME_LEN){ if(IS_INVALID_COL_LEN(keyLen)){
smlBuildInvalidDataMsg(msg, "invalid key or key is too long than 64", key); smlBuildInvalidDataMsg(msg, "invalid key or key is too long than 64", key);
return TSDB_CODE_SML_INVALID_DATA; return TSDB_CODE_SML_INVALID_DATA;
} }
...@@ -1169,6 +1182,13 @@ static int32_t smlParseCols(const char* data, int32_t len, SArray *cols, bool is ...@@ -1169,6 +1182,13 @@ static int32_t smlParseCols(const char* data, int32_t len, SArray *cols, bool is
PROCESS_SLASH(key, keyLen) PROCESS_SLASH(key, keyLen)
PROCESS_SLASH(value, valueLen) PROCESS_SLASH(value, valueLen)
//handle child table name
if(childTableNameLen != 0 && strncmp(key, tsSmlChildTableName, keyLen) == 0){
memset(childTableName, 0, TSDB_TABLE_NAME_LEN);
strncpy(childTableName, value, (valueLen < TSDB_TABLE_NAME_LEN ? valueLen : TSDB_TABLE_NAME_LEN));
continue;
}
// add kv to SSmlKv // add kv to SSmlKv
SSmlKv *kv = (SSmlKv *)taosMemoryCalloc(sizeof(SSmlKv), 1); SSmlKv *kv = (SSmlKv *)taosMemoryCalloc(sizeof(SSmlKv), 1);
if(!kv) return TSDB_CODE_OUT_OF_MEMORY; if(!kv) return TSDB_CODE_OUT_OF_MEMORY;
...@@ -1477,8 +1497,8 @@ static int32_t smlParseMetricFromJSON(SSmlHandle *info, cJSON *root, SSmlTableIn ...@@ -1477,8 +1497,8 @@ static int32_t smlParseMetricFromJSON(SSmlHandle *info, cJSON *root, SSmlTableIn
} }
tinfo->sTableNameLen = strlen(metric->valuestring); tinfo->sTableNameLen = strlen(metric->valuestring);
if (tinfo->sTableNameLen >= TSDB_TABLE_NAME_LEN) { if (IS_INVALID_TABLE_LEN(tinfo->sTableNameLen)) {
uError("OTD:0x%"PRIx64" Metric cannot exceeds %d characters in JSON", info->id, TSDB_TABLE_NAME_LEN - 1); uError("OTD:0x%"PRIx64" Metric lenght is 0 or large than 192", info->id);
return TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH; return TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH;
} }
...@@ -1828,60 +1848,49 @@ static int32_t smlParseColsFromJSON(cJSON *root, SArray *cols) { ...@@ -1828,60 +1848,49 @@ static int32_t smlParseColsFromJSON(cJSON *root, SArray *cols) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t smlParseTagsFromJSON(cJSON *root, SArray *pKVs, SHashObj *dumplicateKey, SSmlMsgBuf *msg) { static int32_t smlParseTagsFromJSON(cJSON *root, SArray *pKVs, char *childTableName, SHashObj *dumplicateKey, SSmlMsgBuf *msg) {
int32_t ret = TSDB_CODE_SUCCESS; int32_t ret = TSDB_CODE_SUCCESS;
cJSON *tags = cJSON_GetObjectItem(root, "tags"); cJSON *tags = cJSON_GetObjectItem(root, "tags");
if (tags == NULL || tags->type != cJSON_Object) { if (tags == NULL || tags->type != cJSON_Object) {
return TSDB_CODE_TSC_INVALID_JSON; return TSDB_CODE_TSC_INVALID_JSON;
} }
//handle child table name todo
// size_t childTableNameLen = strlen(tsSmlChildTableName);
// char childTbName[TSDB_TABLE_NAME_LEN] = {0};
// if (childTableNameLen != 0) {
// memcpy(childTbName, tsSmlChildTableName, childTableNameLen);
// cJSON *id = cJSON_GetObjectItem(tags, childTbName);
// if (id != NULL) {
// if (!cJSON_IsString(id)) {
// tscError("OTD:0x%"PRIx64" ID must be JSON string", info->id);
// return TSDB_CODE_TSC_INVALID_JSON;
// }
// size_t idLen = strlen(id->valuestring);
// *childTableName = tcalloc(idLen + TS_BACKQUOTE_CHAR_SIZE + 1, sizeof(char));
// memcpy(*childTableName, id->valuestring, idLen);
// addEscapeCharToString(*childTableName, (int32_t)idLen);
//
// //check duplicate IDs
// cJSON_DeleteItemFromObject(tags, childTbName);
// id = cJSON_GetObjectItem(tags, childTbName);
// if (id != NULL) {
// return TSDB_CODE_TSC_DUP_TAG_NAMES;
// }
// }
// }
size_t childTableNameLen = strlen(tsSmlChildTableName);
int32_t tagNum = cJSON_GetArraySize(tags); int32_t tagNum = cJSON_GetArraySize(tags);
for (int32_t i = 0; i < tagNum; ++i) { for (int32_t i = 0; i < tagNum; ++i) {
cJSON *tag = cJSON_GetArrayItem(tags, i); cJSON *tag = cJSON_GetArrayItem(tags, i);
if (tag == NULL) { if (tag == NULL) {
return TSDB_CODE_TSC_INVALID_JSON; return TSDB_CODE_TSC_INVALID_JSON;
} }
size_t keyLen = strlen(tag->string);
if (IS_INVALID_COL_LEN(keyLen)) {
uError("OTD:Tag key length is 0 or too large than 64");
return TSDB_CODE_TSC_INVALID_COLUMN_LENGTH;
}
//check duplicate keys //check duplicate keys
if (smlCheckDuplicateKey(tag->string, strlen(tag->string), dumplicateKey)) { if (smlCheckDuplicateKey(tag->string, keyLen, dumplicateKey)) {
return TSDB_CODE_TSC_DUP_TAG_NAMES; return TSDB_CODE_TSC_DUP_TAG_NAMES;
} }
//handle child table name
if(childTableNameLen != 0 && strcmp(tag->string, tsSmlChildTableName) == 0){
if (!cJSON_IsString(tag)) {
uError("OTD:ID must be JSON string");
return TSDB_CODE_TSC_INVALID_JSON;
}
memset(childTableName, 0, TSDB_TABLE_NAME_LEN);
strncpy(childTableName, tag->valuestring, TSDB_TABLE_NAME_LEN);
continue;
}
// add kv to SSmlKv // add kv to SSmlKv
SSmlKv *kv = (SSmlKv *)taosMemoryCalloc(sizeof(SSmlKv), 1); SSmlKv *kv = (SSmlKv *)taosMemoryCalloc(sizeof(SSmlKv), 1);
if(!kv) return TSDB_CODE_OUT_OF_MEMORY; if(!kv) return TSDB_CODE_OUT_OF_MEMORY;
if(pKVs) taosArrayPush(pKVs, &kv); if(pKVs) taosArrayPush(pKVs, &kv);
//key //key
kv->keyLen = strlen(tag->string); kv->keyLen = keyLen;
if (kv->keyLen >= TSDB_COL_NAME_LEN) {
uError("OTD:Tag key cannot exceeds %d characters in JSON", TSDB_COL_NAME_LEN - 1);
return TSDB_CODE_TSC_INVALID_COLUMN_LENGTH;
}
ret = smlJsonCreateSring(&kv->key, tag->string, kv->keyLen); ret = smlJsonCreateSring(&kv->key, tag->string, kv->keyLen);
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
return ret; return ret;
...@@ -1937,7 +1946,7 @@ static int32_t smlParseJSONString(SSmlHandle *info, cJSON *root, SSmlTableInfo * ...@@ -1937,7 +1946,7 @@ static int32_t smlParseJSONString(SSmlHandle *info, cJSON *root, SSmlTableInfo *
uDebug("OTD:0x%"PRIx64" Parse metric value from JSON payload finished", info->id); uDebug("OTD:0x%"PRIx64" Parse metric value from JSON payload finished", info->id);
//Parse tags //Parse tags
ret = smlParseTagsFromJSON(root, tinfo->tags, info->dumplicateKey, &info->msgBuf); ret = smlParseTagsFromJSON(root, tinfo->tags, tinfo->childTableName, info->dumplicateKey, &info->msgBuf);
if (ret) { if (ret) {
uError("OTD:0x%"PRIx64" Unable to parse tags from JSON payload", info->id); uError("OTD:0x%"PRIx64" Unable to parse tags from JSON payload", info->id);
return ret; return ret;
...@@ -2019,11 +2028,16 @@ static int32_t smlParseInfluxLine(SSmlHandle* info, const char* sql) { ...@@ -2019,11 +2028,16 @@ static int32_t smlParseInfluxLine(SSmlHandle* info, const char* sql) {
(*oneTable)->sTableName = elements.measure; (*oneTable)->sTableName = elements.measure;
(*oneTable)->sTableNameLen = elements.measureLen; (*oneTable)->sTableNameLen = elements.measureLen;
RandTableName rName = { (*oneTable)->tags, (*oneTable)->sTableName, (uint8_t)(*oneTable)->sTableNameLen, if(strlen((*oneTable)->childTableName) == 0){
(*oneTable)->childTableName, 0 }; RandTableName rName = { (*oneTable)->tags, (*oneTable)->sTableName, (uint8_t)(*oneTable)->sTableNameLen,
(*oneTable)->childTableName, 0 };
buildChildTableName(&rName);
(*oneTable)->uid = rName.uid;
}else{
(*oneTable)->uid = *(uint64_t*)((*oneTable)->childTableName);
}
buildChildTableName(&rName);
(*oneTable)->uid = rName.uid;
} }
SSmlSTableMeta** tableMeta = (SSmlSTableMeta**)taosHashGet(info->superTables, elements.measure, elements.measureLen); SSmlSTableMeta** tableMeta = (SSmlSTableMeta**)taosHashGet(info->superTables, elements.measure, elements.measureLen);
...@@ -2087,10 +2101,15 @@ static int32_t smlParseTelnetLine(SSmlHandle* info, void *data) { ...@@ -2087,10 +2101,15 @@ static int32_t smlParseTelnetLine(SSmlHandle* info, void *data) {
} }
taosHashClear(info->dumplicateKey); taosHashClear(info->dumplicateKey);
RandTableName rName = { tinfo->tags, tinfo->sTableName, (uint8_t)tinfo->sTableNameLen, if(strlen(tinfo->childTableName) == 0){
tinfo->childTableName, 0 }; RandTableName rName = { tinfo->tags, tinfo->sTableName, (uint8_t)tinfo->sTableNameLen,
buildChildTableName(&rName); tinfo->childTableName, 0 };
tinfo->uid = rName.uid; buildChildTableName(&rName);
tinfo->uid = rName.uid;
}else{
tinfo->uid = *(uint64_t*)(tinfo->childTableName); // generate uid by name simple
}
bool hasTable = true; bool hasTable = true;
SSmlTableInfo **oneTable = (SSmlTableInfo **)taosHashGet(info->childTables, tinfo->childTableName, strlen(tinfo->childTableName)); SSmlTableInfo **oneTable = (SSmlTableInfo **)taosHashGet(info->childTables, tinfo->childTableName, strlen(tinfo->childTableName));
...@@ -2313,9 +2332,9 @@ TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int pr ...@@ -2313,9 +2332,9 @@ TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int pr
return (TAOS_RES*)request; return (TAOS_RES*)request;
} }
if (numLines <= 0 || numLines > 65536) { if (!lines) {
request->code = TSDB_CODE_SML_INVALID_DATA; request->code = TSDB_CODE_SML_INVALID_DATA;
smlBuildInvalidDataMsg(&info->msgBuf, "numLines should be between 1 and 65536", NULL); smlBuildInvalidDataMsg(&info->msgBuf, "lines is null", NULL);
goto end; goto end;
} }
......
...@@ -76,6 +76,10 @@ int32_t tsTelemInterval = 86400; ...@@ -76,6 +76,10 @@ int32_t tsTelemInterval = 86400;
char tsTelemServer[TSDB_FQDN_LEN] = "telemetry.taosdata.com"; char tsTelemServer[TSDB_FQDN_LEN] = "telemetry.taosdata.com";
uint16_t tsTelemPort = 80; uint16_t tsTelemPort = 80;
// schemaless
char tsSmlChildTableName[TSDB_TABLE_NAME_LEN] = ""; //user defined child table name can be specified in tag value.
//If set to empty system will generate table name using MD5 hash.
// query // query
int32_t tsQueryPolicy = 1; int32_t tsQueryPolicy = 1;
...@@ -512,6 +516,8 @@ static int32_t taosSetClientCfg(SConfig *pCfg) { ...@@ -512,6 +516,8 @@ static int32_t taosSetClientCfg(SConfig *pCfg) {
return -1; return -1;
} }
tstrncpy(tsSmlChildTableName, cfgGetItem(pCfg, "smlChildTableName")->str, TSDB_TABLE_NAME_LEN);
tsShellActivityTimer = cfgGetItem(pCfg, "shellActivityTimer")->i32; tsShellActivityTimer = cfgGetItem(pCfg, "shellActivityTimer")->i32;
tsCompressMsgSize = cfgGetItem(pCfg, "compressMsgSize")->i32; tsCompressMsgSize = cfgGetItem(pCfg, "compressMsgSize")->i32;
tsCompressColData = cfgGetItem(pCfg, "compressColData")->i32; tsCompressColData = cfgGetItem(pCfg, "compressColData")->i32;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册