提交 5d6494d5 编写于 作者: wmmhello's avatar wmmhello

opti:modify logic for telnet and json in schemaless

上级 ef6e9f48
......@@ -89,6 +89,8 @@
#define VALUE "_value"
#define VALUE_LEN 6
#define JSON_METERS_NAME "__JM"
#define BINARY_ADD_LEN 2 // "binary" 2 means " "
#define NCHAR_ADD_LEN 3 // L"nchar" 3 means L" "
......@@ -118,12 +120,21 @@ typedef struct NodeList{
struct NodeList* next;
}NodeList;
static void* nodeListGet(NodeList* list, const void *key, int32_t len){
typedef int32_t (*_equal_fn_sml)(const void *, const void *);
static void* nodeListGet(NodeList* list, const void *key, int32_t len, _equal_fn_sml fn){
NodeList *tmp = list;
while(tmp){
if(fn == NULL){
if(tmp->data.used && tmp->data.keyLen == len && memcmp(tmp->data.key, key, len) == 0) {
return tmp->data.value;
}
}else{
if(tmp->data.used && fn(tmp->data.key, key) == 0) {
return tmp->data.value;
}
}
tmp = tmp->next;
}
return NULL;
......@@ -170,10 +181,10 @@ static int nodeListSize(NodeList* list){
/*********************** list end *********************************/
typedef struct {
const char *measure;
const char *tags;
const char *cols;
const char *timestamp;
char *measure;
char *tags;
char *cols;
char *timestamp;
int32_t measureLen;
int32_t measureTagsLen;
......@@ -256,13 +267,6 @@ typedef struct {
SSmlLineInfo *lines; // element is SSmlLineInfo
//
NodeList *superTableTagKeyStr;
NodeList *superTableColKeyStr;
void *currentLineTagKeys;
void *preLineTagKeys;
void *currentLineColKeys;
void *preLineColKeys;
SArray *preLineTagKV;
SArray *preLineColKV;
......@@ -846,48 +850,30 @@ static bool smlIsNchar(const char *pVal, uint16_t len) {
/******************************* parse basic type function end **********************/
/******************************* time function **********************/
static int8_t precisionConvert[7] = {TSDB_TIME_PRECISION_NANO, TSDB_TIME_PRECISION_HOURS, TSDB_TIME_PRECISION_MINUTES,
static uint8_t smlPrecisionConvert[7] = {TSDB_TIME_PRECISION_NANO, TSDB_TIME_PRECISION_HOURS, TSDB_TIME_PRECISION_MINUTES,
TSDB_TIME_PRECISION_SECONDS, TSDB_TIME_PRECISION_MILLI, TSDB_TIME_PRECISION_MICRO,
TSDB_TIME_PRECISION_NANO};
static int64_t smlFactorNS[3] = {NANOSECOND_PER_MSEC, NANOSECOND_PER_USEC, 1};
static int64_t smlFactorS[3] = {1000LL, 1000000LL, 1000000000LL};
static int64_t smlToMilli[3] = {3600LL, 60LL, 1LL};
static int64_t smlGetTimeValue(const char *value, int32_t len, int8_t type) {
static int64_t smlGetTimeValue(const char *value, int32_t len, uint8_t fromPrecision, uint8_t toPrecision) {
char *endPtr = NULL;
int64_t tsInt64 = taosStr2Int64(value, &endPtr, 10);
if (value + len != endPtr) {
if (unlikely(value + len != endPtr)) {
return -1;
}
double ts = tsInt64;
switch (type) {
case TSDB_TIME_PRECISION_HOURS:
ts *= NANOSECOND_PER_HOUR;
tsInt64 *= NANOSECOND_PER_HOUR;
break;
case TSDB_TIME_PRECISION_MINUTES:
ts *= NANOSECOND_PER_MINUTE;
tsInt64 *= NANOSECOND_PER_MINUTE;
break;
case TSDB_TIME_PRECISION_SECONDS:
ts *= NANOSECOND_PER_SEC;
tsInt64 *= NANOSECOND_PER_SEC;
break;
case TSDB_TIME_PRECISION_MILLI:
ts *= NANOSECOND_PER_MSEC;
tsInt64 *= NANOSECOND_PER_MSEC;
break;
case TSDB_TIME_PRECISION_MICRO:
ts *= NANOSECOND_PER_USEC;
tsInt64 *= NANOSECOND_PER_USEC;
break;
case TSDB_TIME_PRECISION_NANO:
break;
default:
ASSERT(0);
}
if (ts >= (double)INT64_MAX || ts < 0) {
if(unlikely(fromPrecision >= TSDB_TIME_PRECISION_HOURS)){
fromPrecision = TSDB_TIME_PRECISION_MILLI;
int64_t unit = smlToMilli[fromPrecision - TSDB_TIME_PRECISION_HOURS];
if(unit > INT64_MAX / tsInt64){
return -1;
}
tsInt64 *= unit;
}
return tsInt64;
return convertTimePrecision(tsInt64, fromPrecision, toPrecision);
}
static int8_t smlGetTsTypeByLen(int32_t len) {
......@@ -901,18 +887,16 @@ static int8_t smlGetTsTypeByLen(int32_t len) {
}
static int64_t smlParseInfluxTime(SSmlHandle *info, const char *data, int32_t len) {
if (len == 0 || (len == 1 && data[0] == '0')) {
return taosGetTimestampNs();
}
uint8_t toPrecision = info->currSTableMeta ? info->currSTableMeta->tableInfo.precision : TSDB_TIME_PRECISION_NANO;
int8_t tsType = precisionConvert[info->precision];
if (tsType == -1) {
smlBuildInvalidDataMsg(&info->msgBuf, "invalid timestamp precision", NULL);
return -1;
if(unlikely(len == 0 || (len == 1 && data[0] == '0'))){
return taosGetTimestampNs()/smlFactorNS[toPrecision];
}
int64_t ts = smlGetTimeValue(data, len, tsType);
if (ts == -1) {
uint8_t fromPrecision = smlPrecisionConvert[info->precision];
int64_t ts = smlGetTimeValue(data, len, fromPrecision, toPrecision);
if (unlikely(ts == -1)) {
smlBuildInvalidDataMsg(&info->msgBuf, "invalid timestamp", data);
return -1;
}
......@@ -920,28 +904,30 @@ static int64_t smlParseInfluxTime(SSmlHandle *info, const char *data, int32_t le
}
static int64_t smlParseOpenTsdbTime(SSmlHandle *info, const char *data, int32_t len) {
if (!data) {
uint8_t toPrecision = info->currSTableMeta ? info->currSTableMeta->tableInfo.precision : TSDB_TIME_PRECISION_NANO;
if (unlikely(!data)) {
smlBuildInvalidDataMsg(&info->msgBuf, "timestamp can not be null", NULL);
return -1;
}
if (len == 1 && data[0] == '0') {
return taosGetTimestampNs();
if (unlikely(len == 1 && data[0] == '0')) {
return taosGetTimestampNs()/smlFactorNS[toPrecision];
}
int8_t tsType = smlGetTsTypeByLen(len);
if (tsType == -1) {
uint8_t fromPrecision = smlGetTsTypeByLen(len);
if (unlikely(fromPrecision == -1)) {
smlBuildInvalidDataMsg(&info->msgBuf,
"timestamp precision can only be seconds(10 digits) or milli seconds(13 digits)", data);
return -1;
}
int64_t ts = smlGetTimeValue(data, len, tsType);
if (ts == -1) {
int64_t ts = smlGetTimeValue(data, len, fromPrecision, toPrecision);
if (unlikely(ts == -1)) {
smlBuildInvalidDataMsg(&info->msgBuf, "invalid timestamp", data);
return -1;
}
return ts;
}
static int32_t smlParseTS(SSmlHandle *info, const char *data, int32_t len, SArray *cols) {
static int64_t smlParseTS(SSmlHandle *info, const char *data, int32_t len) {
int64_t ts = 0;
if (info->protocol == TSDB_SML_LINE_PROTOCOL) {
// uError("SML:data:%s,len:%d", data, len);
......@@ -953,21 +939,7 @@ static int32_t smlParseTS(SSmlHandle *info, const char *data, int32_t len, SArra
}
uDebug("SML:0x%" PRIx64 " smlParseTS:%" PRId64, info->id, ts);
if (ts <= 0) {
uError("SML:0x%" PRIx64 " smlParseTS error:%" PRId64, info->id, ts);
return TSDB_CODE_INVALID_TIMESTAMP;
}
// add ts to
SSmlKv kv = { .key = TS, .keyLen = TS_LEN, .i = ts, .type = TSDB_DATA_TYPE_TIMESTAMP, .length = (int16_t)tDataTypes[TSDB_DATA_TYPE_TIMESTAMP].bytes};
if(info->dataFormat){
kv.i = convertTimePrecision(kv.i, TSDB_TIME_PRECISION_NANO, info->currSTableMeta->tableInfo.precision);
smlBuildCol(info->currTableDataCtx, info->currSTableMeta->schema, &kv, 0);
}else{
taosArraySet(cols, 0, &kv);
}
return TSDB_CODE_SUCCESS;
return ts;
}
/******************************* time function end **********************/
......@@ -1047,10 +1019,12 @@ static int32_t smlSetCTableName(SSmlTableInfo *oneTable){
smlParseTableName(oneTable->tags, oneTable->childTableName);
if (strlen(oneTable->childTableName) == 0) {
RandTableName rName = {oneTable->tags, oneTable->sTableName, (uint8_t)oneTable->sTableNameLen,
SArray* dst = taosArrayDup(oneTable->tags, NULL);
RandTableName rName = {dst, oneTable->sTableName, (uint8_t)oneTable->sTableNameLen,
oneTable->childTableName, 0};
buildChildTableName(&rName);
taosArrayDestroy(dst);
oneTable->uid = rName.uid;
} else {
oneTable->uid = *(uint64_t *)(oneTable->childTableName);
......@@ -1106,32 +1080,6 @@ static void smlInsertMeta(SHashObj *metaHash, SArray *metaArray, SArray *cols){
}
}
bool smlFormatJudge(NodeList **superTableKeyStr, void* preLineKeys, void* currentLineKeys,
SSmlLineInfo *currElements, bool isSameMeasure, int32_t len){
// same measure
if(isSameMeasure){
if(varDataTLen(preLineKeys) != varDataTLen(currentLineKeys)
|| memcmp(preLineKeys, currentLineKeys, varDataTLen(preLineKeys)) != 0){
return false;
}
}else{ // diff measure
void *keyStr = nodeListGet(*superTableKeyStr, currElements->measure, currElements->measureLen);
if(unlikely(keyStr == NULL)){
keyStr = taosMemoryMalloc(len);
varDataCopy(keyStr, currentLineKeys);
nodeListSet(superTableKeyStr, currElements->measure, currElements->measureLen, keyStr);
}else{
if(varDataTLen(keyStr) != varDataTLen(currentLineKeys)
&& memcmp(keyStr, currentLineKeys, varDataTLen(currentLineKeys)) != 0){
return false;
}
}
}
varDataCopy(preLineKeys, currentLineKeys);
return true;
}
static STableMeta* smlGetMeta(SSmlHandle *info, const void* measure, int32_t measureLen){
STableMeta *pTableMeta = NULL;
......@@ -1197,42 +1145,33 @@ static int32_t smlParseValue(SSmlKv *pVal, SSmlMsgBuf *msg) {
return TSDB_CODE_TSC_INVALID_VALUE;
}
static int32_t smlParseKv(SSmlHandle *info, const char **sql, const char *sqlEnd,
SSmlLineInfo* currElement, bool isTag){
bool isSameMeasure = false;
bool isSameCTable = false;
int cnt = 0;
void *keyStr = NULL;
// bool isPreLineKVNULL = false;
SArray *preLineKV = NULL;
bool isSuperKVInit = false;
SArray *superKV = NULL;
if(info->dataFormat){
if(currElement->measureTagsLen == info->preLine.measureTagsLen
&& memcmp(currElement->measure, info->preLine.measure, currElement->measureTagsLen) == 0){
isSameCTable = true;
if(isTag) return TSDB_CODE_SUCCESS;
}else if(!isTag){
SSmlTableInfo *oneTable = (SSmlTableInfo *)nodeListGet(info->childTables, currElement->measure, currElement->measureTagsLen);
if (unlikely(oneTable == NULL)) {
smlBuildInvalidDataMsg(&info->msgBuf, "child table should inside", currElement->measure);
return TSDB_CODE_SML_INVALID_DATA;
}
info->currTableDataCtx = oneTable->tableDataCtx;
}
int32_t is_same_child_table_json(const void *a, const void *b){
return (cJSON_Compare((const cJSON *)a, (const cJSON *)b, true)) ? 0 : 1;
}
#define IS_SAME_CHILD_TABLE (elements->measureTagsLen == info->preLine.measureTagsLen \
&& memcmp(elements->measure, info->preLine.measure, elements->measureTagsLen) == 0)
#define IS_SAME_SUPER_TABLE (elements->measureLen == info->preLine.measureLen \
&& memcmp(elements->measure, info->preLine.measure, elements->measureLen) == 0)
#define IS_SAME_KEY (preKV->keyLen == kv.keyLen && memcmp(preKV->key, kv.key, kv.keyLen) == 0)
static int32_t smlParseTagKv(SSmlHandle *info, char **sql, char *sqlEnd,
SSmlLineInfo* currElement, bool isSameMeasure, bool isSameCTable){
if(isSameCTable){
isSameMeasure = true;
}else if(currElement->measureLen == info->preLine.measureLen
&& memcmp(currElement->measure, info->preLine.measure, currElement->measureLen) == 0){
isSameMeasure = true;
return TSDB_CODE_SUCCESS;
}
int cnt = 0;
SArray *preLineKV = info->preLineTagKV;
bool isSuperKVInit = true;
SArray *superKV = NULL;
if(info->dataFormat){
if(!isSameMeasure){
SSmlSTableMeta *sMeta = (SSmlSTableMeta *)nodeListGet(info->superTables, currElement->measure, currElement->measureLen);
SSmlSTableMeta *sMeta = (SSmlSTableMeta *)nodeListGet(info->superTables, currElement->measure, currElement->measureLen, NULL);
if(sMeta == NULL){
if(unlikely(sMeta == NULL)){
sMeta = smlBuildSTableMeta(info->dataFormat);
STableMeta * pTableMeta = smlGetMeta(info, currElement->measure, currElement->measureLen);
sMeta->tableMeta = pTableMeta;
......@@ -1244,228 +1183,141 @@ static int32_t smlParseKv(SSmlHandle *info, const char **sql, const char *sqlEnd
nodeListSet(&info->superTables, currElement->measure, currElement->measureLen, sMeta);
}
info->currSTableMeta = sMeta->tableMeta;
if(isTag){
superKV = sMeta->tags;
}else{
superKV = sMeta->cols;
}
if(unlikely(taosArrayGetSize(superKV) == 0)){
isSuperKVInit = true;
}
}
if(isTag){
// prepare for judging if tag or col is the same for each line
if(unlikely(info->currentLineTagKeys == NULL)){ // sml todo size need remalloc
info->currentLineTagKeys = taosMemoryMalloc(sqlEnd - *sql);
}
if(info->preLineTagKeys == NULL){
info->preLineTagKeys = taosMemoryMalloc(sqlEnd - *sql);
}
keyStr = info->currentLineTagKeys;
if(info->preLineTagKV == NULL){
info->preLineTagKV = taosArrayInit(8, sizeof(SSmlKv));
// isPreLineKVNULL = true;
}
preLineKV = info->preLineTagKV;
}else{
if(unlikely(info->currentLineColKeys == NULL)){ // sml todo size need remalloc
info->currentLineColKeys = taosMemoryMalloc(sqlEnd - *sql);
}
if(info->preLineColKeys == NULL){
info->preLineColKeys = taosMemoryMalloc(sqlEnd - *sql);
}
keyStr = info->currentLineColKeys;
if(info->preLineColKV == NULL){
info->preLineColKV = taosArrayInit(8, sizeof(SSmlKv));
// isPreLineKVNULL = true;
}
preLineKV = info->preLineColKV;
if(unlikely(taosArrayGetSize(superKV) == 0)){
isSuperKVInit = false;
}
if(!isSameMeasure){
taosArraySetSize(preLineKV, 0);
}
varDataLen(keyStr) = 0; // clear keys
}else{
preLineKV = taosArrayInit(8, sizeof(SSmlKv));
taosArraySetSize(preLineKV, 0);
}
while (*sql < sqlEnd) {
if (IS_SPACE(*sql)) {
if (unlikely(IS_SPACE(*sql))) {
break;
}
bool hasSlash = false;
// parse key
const char *key = *sql;
int32_t keyLen = 0;
while (*sql < sqlEnd) {
if (IS_COMMA(*sql)) {
if (unlikely(IS_COMMA(*sql))) {
smlBuildInvalidDataMsg(&info->msgBuf, "invalid data", *sql);
return TSDB_CODE_SML_INVALID_DATA;
}
if (IS_EQUAL(*sql)) {
if (unlikely(IS_EQUAL(*sql))) {
keyLen = *sql - key;
(*sql)++;
break;
}
if(!hasSlash){
hasSlash = (*(*sql) == SLASH);
}
(*sql)++;
}
if(unlikely(hasSlash)) {
PROCESS_SLASH(key, keyLen)
}
if (IS_INVALID_COL_LEN(keyLen)) {
if (unlikely(IS_INVALID_COL_LEN(keyLen))) {
smlBuildInvalidDataMsg(&info->msgBuf, "invalid key or key is too long than 64", key);
return TSDB_CODE_TSC_INVALID_COLUMN_LENGTH;
}
if(info->dataFormat){
memcpy(keyStr + varDataTLen(keyStr), key, keyLen + 1); // use = symbol
varDataLen(keyStr) += keyLen + 1;
}
// parse value
const char *value = *sql;
int32_t valueLen = 0;
bool isInQuote = false;
hasSlash = false;
while (*sql < sqlEnd) {
// parse value
if (!isTag && IS_QUOTE(*sql)) {
isInQuote = !isInQuote;
(*sql)++;
continue;
}
if (!isInQuote){
if (IS_SPACE(*sql)) {
break;
}else if (IS_COMMA(*sql)) {
if (unlikely(IS_SPACE(*sql) || IS_COMMA(*sql))) {
break;
}else if (IS_EQUAL(*sql)) {
}else if (unlikely(IS_EQUAL(*sql))) {
smlBuildInvalidDataMsg(&info->msgBuf, "invalid data", *sql);
return TSDB_CODE_SML_INVALID_DATA;
}
if(!hasSlash){
hasSlash = (*(*sql) == SLASH);
}
(*sql)++;
}
valueLen = *sql - value;
if (isInQuote) {
smlBuildInvalidDataMsg(&info->msgBuf, "only one quote", value);
return TSDB_CODE_SML_INVALID_DATA;
}
if (valueLen == 0) {
if (unlikely(valueLen == 0)) {
smlBuildInvalidDataMsg(&info->msgBuf, "invalid value", value);
return TSDB_CODE_SML_INVALID_DATA;
}
PROCESS_SLASH(key, keyLen)
PROCESS_SLASH(value, valueLen)
SSmlKv kv = {.key = key, .keyLen = keyLen, .value = value, .length = valueLen};
if (!isTag) {
int32_t ret = smlParseValue(&kv, &info->msgBuf);
if (ret != TSDB_CODE_SUCCESS) {
return ret;
if(unlikely(hasSlash)) {
PROCESS_SLASH(value, valueLen)
}
} else {
if (valueLen > (TSDB_MAX_NCHAR_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE) {
if (unlikely(valueLen > (TSDB_MAX_NCHAR_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE)) {
return TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN;
}
kv.type = TSDB_DATA_TYPE_NCHAR;
}
SSmlKv kv = {.key = key, .type = TSDB_DATA_TYPE_NCHAR, .keyLen = keyLen, .value = value, .length = valueLen};
if(info->dataFormat){
if(!isTag && cnt + 1 > info->currSTableMeta->tableInfo.numOfColumns){
smlBuildInvalidDataMsg(&info->msgBuf, "col more than meta", NULL);
return TSDB_CODE_PAR_TOO_MANY_COLUMNS;
}
if(isTag && cnt + 1 > info->currSTableMeta->tableInfo.numOfTags){
smlBuildInvalidDataMsg(&info->msgBuf, "tag more than meta", NULL);
return TSDB_CODE_PAR_TOO_MANY_COLUMNS;
}
// bind data
if(!isTag){
int ret = smlBuildCol(info->currTableDataCtx, info->currSTableMeta->schema, &kv, cnt + 1);
if (ret != TSDB_CODE_SUCCESS) {
smlBuildInvalidDataMsg(&info->msgBuf, "smlBuildCol error", NULL);
return ret;
}
if(unlikely(cnt + 1 > info->currSTableMeta->tableInfo.numOfTags)){
info->needModifySchema = true;
}
do {
if(isSameMeasure){
if(cnt >= taosArrayGetSize(preLineKV)) {
if(unlikely(cnt >= taosArrayGetSize(preLineKV))) {
info->dataFormat = false;
info->reRun = true;
return TSDB_CODE_SUCCESS;
}
SSmlKv *preKV = taosArrayGet(preLineKV, cnt);
if(!isTag && kv.type != preKV->type){
info->dataFormat = false;
info->reRun = true;
return TSDB_CODE_SUCCESS;
}
if(IS_VAR_DATA_TYPE(kv.type) && kv.length > preKV->length){
if(unlikely(kv.length > preKV->length)){
preKV->length = kv.length;
SSmlSTableMeta *tableMeta = (SSmlSTableMeta *)nodeListGet(info->superTables, currElement->measure, currElement->measureLen);
if(tableMeta == NULL){
smlBuildInvalidDataMsg(&info->msgBuf, "measure should has inside", value);
return TSDB_CODE_SML_INVALID_DATA;
}
SSmlSTableMeta *tableMeta = (SSmlSTableMeta *)nodeListGet(info->superTables, currElement->measure, currElement->measureLen, NULL);
ASSERT(tableMeta != NULL);
if(isTag){
superKV = tableMeta->tags;
}else{
superKV = tableMeta->cols;
}
SSmlKv *oldKV = taosArrayGet(superKV, cnt);
SSmlKv *oldKV = taosArrayGet(tableMeta->tags, cnt);
oldKV->length = kv.length;
info->needModifySchema = true;
}
}else{
if(isSuperKVInit){
taosArrayPush(superKV, &kv);
}else{
if(cnt >= taosArrayGetSize(superKV)) {
if(unlikely(!IS_SAME_KEY)){
info->dataFormat = false;
info->reRun = true;
return TSDB_CODE_SUCCESS;
}
SSmlKv *preKV = taosArrayGet(superKV, cnt);
if(!isTag && kv.type != preKV->type){
}else{
if(isSuperKVInit){
if(unlikely(cnt >= taosArrayGetSize(superKV))) {
info->dataFormat = false;
info->reRun = true;
return TSDB_CODE_SUCCESS;
}
if(IS_VAR_DATA_TYPE(kv.type)){
if(kv.length > preKV->length) {
SSmlKv *preKV = taosArrayGet(superKV, cnt);
if(unlikely(kv.length > preKV->length)) {
preKV->length = kv.length;
}else{
kv.length = preKV->length;
}
info->needModifySchema = true;
if(unlikely(!IS_SAME_KEY)){
info->dataFormat = false;
info->reRun = true;
return TSDB_CODE_SUCCESS;
}
}else{
taosArrayPush(superKV, &kv);
}
taosArrayPush(preLineKV, &kv);
}
break;
}while(0);
}else{
taosArrayPush(preLineKV, &kv);
}
if(!info->dataFormat && !isTag){
if(currElement->colArray == NULL){
currElement->colArray = taosArrayInit(16, sizeof(SSmlKv));
taosArraySetSize(currElement->colArray, 1);
}
taosArrayPush(currElement->colArray, &kv); //reserve for timestamp
}
cnt++;
if(IS_SPACE(*sql)){
break;
......@@ -1473,27 +1325,11 @@ static int32_t smlParseKv(SSmlHandle *info, const char **sql, const char *sqlEnd
(*sql)++;
}
if(isTag && cnt > TSDB_MAX_TAGS){
smlBuildInvalidDataMsg(&info->msgBuf, "too many tags than 128", NULL);
return TSDB_CODE_PAR_INVALID_TAGS_NUM;
}
if(info->dataFormat){
if(isTag){
info->dataFormat = smlFormatJudge(&info->superTableTagKeyStr, info->preLineTagKeys,
info->currentLineTagKeys, currElement, isSameMeasure, sqlEnd - currElement->tags);
if(!info->dataFormat) {
info->reRun = true;
void* oneTable = nodeListGet(info->childTables, currElement->measure, currElement->measureTagsLen, NULL);
if ((oneTable != NULL)) {
return TSDB_CODE_SUCCESS;
}
if(!isSameCTable){
if(taosArrayGetSize(preLineKV) > TSDB_MAX_TAGS){
smlBuildInvalidDataMsg(&info->msgBuf, "too many tags than 128", NULL);
return TSDB_CODE_PAR_INVALID_TAGS_NUM;
}
void* oneTable = nodeListGet(info->childTables, currElement->measure, currElement->measureTagsLen);
if (unlikely(oneTable == NULL)) {
SSmlTableInfo *tinfo = smlBuildTableInfo(1, currElement->measure, currElement->measureLen);
if (!tinfo) {
return TSDB_CODE_OUT_OF_MEMORY;
......@@ -1502,80 +1338,270 @@ static int32_t smlParseKv(SSmlHandle *info, const char **sql, const char *sqlEnd
taosArrayPush(tinfo->tags, taosArrayGet(preLineKV, i));
}
smlSetCTableName(tinfo);
if(info->dataFormat) {
info->currSTableMeta->uid = tinfo->uid;
tinfo->tableDataCtx = smlInitTableDataCtx(info->pQuery, info->currSTableMeta);
if(tinfo->tableDataCtx == NULL){
smlBuildInvalidDataMsg(&info->msgBuf, "smlInitTableDataCtx error", NULL);
return TSDB_CODE_SML_INVALID_DATA;
}
}
nodeListSet(&info->childTables, currElement->measure, currElement->measureTagsLen, tinfo);
return TSDB_CODE_SUCCESS;
}
static int32_t smlParseColKv(SSmlHandle *info, char **sql, char *sqlEnd,
SSmlLineInfo* currElement, bool isSameMeasure, bool isSameCTable){
int cnt = 0;
SArray *preLineKV = info->preLineColKV;
bool isSuperKVInit = true;
SArray *superKV = NULL;
if(info->dataFormat){
if(unlikely(!isSameCTable)){
SSmlTableInfo *oneTable = (SSmlTableInfo *)nodeListGet(info->childTables, currElement->measure, currElement->measureTagsLen, NULL);
if (unlikely(oneTable == NULL)) {
smlBuildInvalidDataMsg(&info->msgBuf, "child table should inside", currElement->measure);
return TSDB_CODE_SML_INVALID_DATA;
}
info->currTableDataCtx = oneTable->tableDataCtx;
}
}else{
info->dataFormat = smlFormatJudge(&info->superTableColKeyStr, info->preLineColKeys,
info->currentLineColKeys, currElement, isSameMeasure, sqlEnd - currElement->cols);
if(!info->dataFormat) {
if(unlikely(!isSameMeasure)){
SSmlSTableMeta *sMeta = (SSmlSTableMeta *)nodeListGet(info->superTables, currElement->measure, currElement->measureLen, NULL);
if(unlikely(sMeta == NULL)){
sMeta = smlBuildSTableMeta(info->dataFormat);
STableMeta * pTableMeta = smlGetMeta(info, currElement->measure, currElement->measureLen);
sMeta->tableMeta = pTableMeta;
if(pTableMeta == NULL){
info->dataFormat = false;
info->reRun = true;
return TSDB_CODE_SUCCESS;
}
nodeListSet(&info->superTables, currElement->measure, currElement->measureLen, sMeta);
}
}else{
void* oneTable = nodeListGet(info->childTables, currElement->measure, currElement->measureTagsLen);
if (unlikely(oneTable == NULL)) {
SSmlTableInfo *tinfo = smlBuildTableInfo(info->lineNum / 2, currElement->measure, currElement->measureLen);
if (!tinfo) {
return TSDB_CODE_OUT_OF_MEMORY;
}
for(int i = 0; i < taosArrayGetSize(preLineKV); i++){
taosArrayPush(tinfo->tags, taosArrayGet(preLineKV, i));
info->currSTableMeta = sMeta->tableMeta;
superKV = sMeta->cols;
if(unlikely(taosArrayGetSize(superKV) == 0)){
isSuperKVInit = false;
}
smlSetCTableName(tinfo);
nodeListSet(&info->childTables, currElement->measure, currElement->measureTagsLen, tinfo);
taosArraySetSize(preLineKV, 0);
}
taosArrayDestroy(preLineKV); // smltodo
}
return TSDB_CODE_SUCCESS;
}
static int32_t smlParseInfluxString(SSmlHandle *info, const char *sql, const char *sqlEnd, SSmlLineInfo *elements) {
if (!sql) return TSDB_CODE_SML_INVALID_DATA;
JUMP_SPACE(sql, sqlEnd)
if (*sql == COMMA) return TSDB_CODE_SML_INVALID_DATA;
elements->measure = sql;
// parse measure
while (sql < sqlEnd) {
if ((sql != elements->measure) && IS_SLASH_LETTER(sql)) {
MOVE_FORWARD_ONE(sql, sqlEnd - sql);
sqlEnd--;
continue;
}
if (IS_COMMA(sql)) {
while (*sql < sqlEnd) {
if (unlikely(IS_SPACE(*sql))) {
break;
}
if (IS_SPACE(sql)) {
bool hasSlash = false;
// parse key
const char *key = *sql;
int32_t keyLen = 0;
while (*sql < sqlEnd) {
if (unlikely(IS_COMMA(*sql))) {
smlBuildInvalidDataMsg(&info->msgBuf, "invalid data", *sql);
return TSDB_CODE_SML_INVALID_DATA;
}
if (unlikely(IS_EQUAL(*sql))) {
keyLen = *sql - key;
(*sql)++;
break;
}
sql++;
if(!hasSlash){
hasSlash = (*(*sql) == SLASH);
}
elements->measureLen = sql - elements->measure;
if (IS_INVALID_TABLE_LEN(elements->measureLen)) {
smlBuildInvalidDataMsg(&info->msgBuf, "measure is empty or too large than 192", NULL);
return TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH;
(*sql)++;
}
// to get measureTagsLen before
const char* tmp = sql;
while (tmp < sqlEnd){
if (IS_SPACE(tmp)) {
break;
if(unlikely(hasSlash)) {
PROCESS_SLASH(key, keyLen)
}
tmp++;
if (unlikely(IS_INVALID_COL_LEN(keyLen))) {
smlBuildInvalidDataMsg(&info->msgBuf, "invalid key or key is too long than 64", key);
return TSDB_CODE_TSC_INVALID_COLUMN_LENGTH;
}
elements->measureTagsLen = tmp - elements->measure;
// parse value
const char *value = *sql;
int32_t valueLen = 0;
hasSlash = false;
bool isInQuote = false;
while (*sql < sqlEnd) {
// parse value
if (IS_QUOTE(*sql)) {
isInQuote = !isInQuote;
(*sql)++;
continue;
}
if (!isInQuote){
if (unlikely(IS_SPACE(*sql) || IS_COMMA(*sql))) {
break;
} else if (unlikely(IS_EQUAL(*sql))) {
smlBuildInvalidDataMsg(&info->msgBuf, "invalid data", *sql);
return TSDB_CODE_SML_INVALID_DATA;
}
}
if(!hasSlash){
hasSlash = (*(*sql) == SLASH);
}
(*sql)++;
}
valueLen = *sql - value;
if (unlikely(isInQuote)) {
smlBuildInvalidDataMsg(&info->msgBuf, "only one quote", value);
return TSDB_CODE_SML_INVALID_DATA;
}
if (unlikely(valueLen == 0)) {
smlBuildInvalidDataMsg(&info->msgBuf, "invalid value", value);
return TSDB_CODE_SML_INVALID_DATA;
}
if(unlikely(hasSlash)) {
PROCESS_SLASH(value, valueLen)
}
SSmlKv kv = {.key = key, .keyLen = keyLen, .value = value, .length = valueLen};
int32_t ret = smlParseValue(&kv, &info->msgBuf);
if (ret != TSDB_CODE_SUCCESS) {
return ret;
}
if(info->dataFormat){
//cnt begin 0, add ts so + 2
if(unlikely(cnt + 2 > info->currSTableMeta->tableInfo.numOfColumns)){
info->needModifySchema = true;
}
// bind data
ret = smlBuildCol(info->currTableDataCtx, info->currSTableMeta->schema, &kv, cnt + 1);
if (unlikely(ret != TSDB_CODE_SUCCESS)) {
smlBuildInvalidDataMsg(&info->msgBuf, "smlBuildCol error", NULL);
return ret;
}
if(isSameMeasure){
if(cnt >= taosArrayGetSize(preLineKV)) {
info->dataFormat = false;
info->reRun = true;
return TSDB_CODE_SUCCESS;
}
SSmlKv *preKV = taosArrayGet(preLineKV, cnt);
if(kv.type != preKV->type){
info->dataFormat = false;
info->reRun = true;
return TSDB_CODE_SUCCESS;
}
if(unlikely(IS_VAR_DATA_TYPE(kv.type) && kv.length > preKV->length)){
preKV->length = kv.length;
SSmlSTableMeta *tableMeta = (SSmlSTableMeta *)nodeListGet(info->superTables, currElement->measure, currElement->measureLen, NULL);
ASSERT(tableMeta != NULL);
SSmlKv *oldKV = taosArrayGet(tableMeta->cols, cnt);
oldKV->length = kv.length;
info->needModifySchema = true;
}
if(unlikely(!IS_SAME_KEY)){
info->dataFormat = false;
info->reRun = true;
return TSDB_CODE_SUCCESS;
}
}else{
if(isSuperKVInit){
if(unlikely(cnt >= taosArrayGetSize(superKV))) {
info->dataFormat = false;
info->reRun = true;
return TSDB_CODE_SUCCESS;
}
SSmlKv *preKV = taosArrayGet(superKV, cnt);
if(unlikely(kv.type != preKV->type)){
info->dataFormat = false;
info->reRun = true;
return TSDB_CODE_SUCCESS;
}
if(IS_VAR_DATA_TYPE(kv.type)){
if(kv.length > preKV->length) {
preKV->length = kv.length;
}else{
kv.length = preKV->length;
}
info->needModifySchema = true;
}
if(unlikely(!IS_SAME_KEY)){
info->dataFormat = false;
info->reRun = true;
return TSDB_CODE_SUCCESS;
}
}else{
taosArrayPush(superKV, &kv);
}
taosArrayPush(preLineKV, &kv);
}
}else{
taosArraySetSize(currElement->colArray, 1);
taosArrayPush(currElement->colArray, &kv); //reserve for timestamp
}
cnt++;
if(IS_SPACE(*sql)){
break;
}
(*sql)++;
}
return TSDB_CODE_SUCCESS;
}
static int32_t smlParseInfluxString(SSmlHandle *info, char *sql, char *sqlEnd, SSmlLineInfo *elements) {
if (!sql) return TSDB_CODE_SML_INVALID_DATA;
JUMP_SPACE(sql, sqlEnd)
if (unlikely(*sql == COMMA)) return TSDB_CODE_SML_INVALID_DATA;
elements->measure = sql;
// parse measure
while (sql < sqlEnd) {
if ((sql != elements->measure) && IS_SLASH_LETTER(sql)) {
MOVE_FORWARD_ONE(sql, sqlEnd - sql);
sqlEnd--;
continue;
}
if (IS_COMMA(sql)) {
break;
}
if (IS_SPACE(sql)) {
break;
}
sql++;
}
elements->measureLen = sql - elements->measure;
if (unlikely(IS_INVALID_TABLE_LEN(elements->measureLen))) {
smlBuildInvalidDataMsg(&info->msgBuf, "measure is empty or too large than 192", NULL);
return TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH;
}
// to get measureTagsLen before
const char* tmp = sql;
while (tmp < sqlEnd){
if (IS_SPACE(tmp)) {
break;
}
tmp++;
}
elements->measureTagsLen = tmp - elements->measure;
bool isSameCTable = false;
bool isSameMeasure = false;
if(IS_SAME_CHILD_TABLE){
isSameCTable = true;
isSameMeasure = true;
}else if(info->dataFormat) {
isSameMeasure = IS_SAME_SUPER_TABLE;
}
// parse tag
if (*sql == SPACE) {
elements->tagsLen = 0;
......@@ -1584,16 +1610,16 @@ static int32_t smlParseInfluxString(SSmlHandle *info, const char *sql, const cha
elements->tags = sql;
// tinfo != NULL means child table has never occur before
int ret = smlParseKv(info, &sql, sqlEnd, elements, true);
if(ret != TSDB_CODE_SUCCESS){
int ret = smlParseTagKv(info, &sql, sqlEnd, elements, isSameMeasure, isSameCTable);
if(unlikely(ret != TSDB_CODE_SUCCESS)){
return ret;
}
sql = elements->measure + elements->measureTagsLen;
if(info->reRun){
if(unlikely(info->reRun)){
return TSDB_CODE_SUCCESS;
}
sql = elements->measure + elements->measureTagsLen;
elements->tagsLen = sql - elements->tags;
}
......@@ -1601,17 +1627,17 @@ static int32_t smlParseInfluxString(SSmlHandle *info, const char *sql, const cha
JUMP_SPACE(sql, sqlEnd)
elements->cols = sql;
int ret = smlParseKv(info, &sql, sqlEnd, elements, false);
if(ret != TSDB_CODE_SUCCESS){
int ret = smlParseColKv(info, &sql, sqlEnd, elements, isSameMeasure, isSameCTable);
if(unlikely(ret != TSDB_CODE_SUCCESS)){
return ret;
}
if(info->reRun){
if(unlikely(info->reRun)){
return TSDB_CODE_SUCCESS;
}
elements->colsLen = sql - elements->cols;
if (elements->colsLen == 0) {
if (unlikely(elements->colsLen == 0)) {
smlBuildInvalidDataMsg(&info->msgBuf, "cols is empty", NULL);
return TSDB_CODE_SML_INVALID_DATA;
}
......@@ -1627,25 +1653,29 @@ static int32_t smlParseInfluxString(SSmlHandle *info, const char *sql, const cha
}
elements->timestampLen = sql - elements->timestamp;
ret = smlParseTS(info, elements->timestamp, elements->timestampLen, elements->colArray);
if (ret != TSDB_CODE_SUCCESS) {
uError("SML:0x%" PRIx64 " smlParseTS failed", info->id);
return ret;
int64_t ts = smlParseTS(info, elements->timestamp, elements->timestampLen);
if (ts <= 0) {
uError("SML:0x%" PRIx64 " smlParseTS error:%" PRId64, info->id, ts);
return TSDB_CODE_INVALID_TIMESTAMP;
}
// add ts to
SSmlKv kv = { .key = TS, .keyLen = TS_LEN, .i = ts, .type = TSDB_DATA_TYPE_TIMESTAMP, .length = (int16_t)tDataTypes[TSDB_DATA_TYPE_TIMESTAMP].bytes};
if(info->dataFormat){
smlBuildCol(info->currTableDataCtx, info->currSTableMeta->schema, &kv, 0);
smlBuildRow(info->currTableDataCtx);
info->preLine = *elements;
}else{
taosArraySet(elements->colArray, 0, &kv);
}
info->preLine = *elements;
return TSDB_CODE_SUCCESS;
return ret;
}
static void smlParseTelnetElement(const char **sql, const char *sqlEnd, const char **data, int32_t *len) {
static void smlParseTelnetElement(char **sql, char *sqlEnd, char **data, int32_t *len) {
while (*sql < sqlEnd) {
if (**sql != SPACE && !(*data)) {
if (unlikely((**sql != SPACE && !(*data)))) {
*data = *sql;
} else if (**sql == SPACE && *data) {
} else if (unlikely(**sql == SPACE && *data)) {
*len = *sql - *data;
break;
}
......@@ -1653,25 +1683,60 @@ static void smlParseTelnetElement(const char **sql, const char *sqlEnd, const ch
}
}
static int32_t smlParseTelnetTags(const char *data, const char *sqlEnd, SArray *cols, char *childTableName,
SHashObj *dumplicateKey, SSmlMsgBuf *msg) {
if (!cols) return TSDB_CODE_OUT_OF_MEMORY;
static int32_t smlParseTelnetTags(SSmlHandle *info, char *data, char *sqlEnd, SSmlLineInfo *elements, SSmlMsgBuf *msg) {
if(IS_SAME_CHILD_TABLE){
return TSDB_CODE_SUCCESS;
}
bool isSameMeasure = IS_SAME_SUPER_TABLE;
int cnt = 0;
SArray *preLineKV = info->preLineTagKV;
bool isSuperKVInit = true;
SArray *superKV = NULL;
if(info->dataFormat){
if(!isSameMeasure){
SSmlSTableMeta *sMeta = (SSmlSTableMeta *)nodeListGet(info->superTables, elements->measure, elements->measureLen, NULL);
if(unlikely(sMeta == NULL)){
sMeta = smlBuildSTableMeta(info->dataFormat);
STableMeta * pTableMeta = smlGetMeta(info, elements->measure, elements->measureLen);
sMeta->tableMeta = pTableMeta;
if(pTableMeta == NULL){
info->dataFormat = false;
info->reRun = true;
return TSDB_CODE_SUCCESS;
}
nodeListSet(&info->superTables, elements->measure, elements->measureLen, sMeta);
}
info->currSTableMeta = sMeta->tableMeta;
superKV = sMeta->tags;
if(unlikely(taosArrayGetSize(superKV) == 0)){
isSuperKVInit = false;
}
taosArraySetSize(preLineKV, 0);
}
}else{
taosArraySetSize(preLineKV, 0);
}
const char *sql = data;
size_t childTableNameLen = strlen(tsSmlChildTableName);
while (sql < sqlEnd) {
JUMP_SPACE(sql, sqlEnd)
if (*sql == '\0') break;
if (unlikely(*sql == '\0')) break;
const char *key = sql;
int32_t keyLen = 0;
// parse key
while (sql < sqlEnd) {
if (*sql == SPACE) {
if (unlikely(*sql == SPACE)) {
smlBuildInvalidDataMsg(msg, "invalid data", sql);
return TSDB_CODE_SML_INVALID_DATA;
}
if (*sql == EQUAL) {
if (unlikely(*sql == EQUAL)) {
keyLen = sql - key;
sql++;
break;
......@@ -1679,24 +1744,24 @@ static int32_t smlParseTelnetTags(const char *data, const char *sqlEnd, SArray *
sql++;
}
if (IS_INVALID_COL_LEN(keyLen)) {
if (unlikely(IS_INVALID_COL_LEN(keyLen))) {
smlBuildInvalidDataMsg(msg, "invalid key or key is too long than 64", key);
return TSDB_CODE_TSC_INVALID_COLUMN_LENGTH;
}
if (smlCheckDuplicateKey(key, keyLen, dumplicateKey)) {
smlBuildInvalidDataMsg(msg, "dumplicate key", key);
return TSDB_CODE_TSC_DUP_NAMES;
}
// if (smlCheckDuplicateKey(key, keyLen, dumplicateKey)) {
// smlBuildInvalidDataMsg(msg, "dumplicate key", key);
// return TSDB_CODE_TSC_DUP_NAMES;
// }
// parse value
const char *value = sql;
int32_t valueLen = 0;
while (sql < sqlEnd) {
// parse value
if (*sql == SPACE) {
if (unlikely(*sql == SPACE)) {
break;
}
if (*sql == EQUAL) {
if (unlikely(*sql == EQUAL)) {
smlBuildInvalidDataMsg(msg, "invalid data", sql);
return TSDB_CODE_SML_INVALID_DATA;
}
......@@ -1704,91 +1769,176 @@ static int32_t smlParseTelnetTags(const char *data, const char *sqlEnd, SArray *
}
valueLen = sql - value;
if (valueLen == 0) {
if (unlikely(valueLen == 0)) {
smlBuildInvalidDataMsg(msg, "invalid value", value);
return TSDB_CODE_TSC_INVALID_VALUE;
}
// handle child table name
if (childTableNameLen != 0 && strncmp(key, tsSmlChildTableName, keyLen) == 0) {
memset(childTableName, 0, TSDB_TABLE_NAME_LEN);
strncpy(childTableName, value, (valueLen < TSDB_TABLE_NAME_LEN ? valueLen : TSDB_TABLE_NAME_LEN));
continue;
if (unlikely(valueLen > (TSDB_MAX_NCHAR_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE)) {
return TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN;
}
if (valueLen > (TSDB_MAX_NCHAR_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE) {
return TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN;
SSmlKv kv = {.key = key, .keyLen = keyLen, .value = value, .length = valueLen, .type = TSDB_DATA_TYPE_NCHAR};
if(info->dataFormat){
if(unlikely(cnt + 1 > info->currSTableMeta->tableInfo.numOfTags)){
info->needModifySchema = true;
}
// add kv to SSmlKv
SSmlKv *kv = (SSmlKv *)taosMemoryCalloc(sizeof(SSmlKv), 1);
if (!kv) return TSDB_CODE_OUT_OF_MEMORY;
kv->key = key;
kv->keyLen = keyLen;
kv->value = value;
kv->length = valueLen;
kv->type = TSDB_DATA_TYPE_NCHAR;
if(isSameMeasure){
if(unlikely(cnt >= taosArrayGetSize(preLineKV))) {
info->dataFormat = false;
info->reRun = true;
return TSDB_CODE_SUCCESS;
}
SSmlKv *preKV = taosArrayGet(preLineKV, cnt);
if(unlikely(kv.length > preKV->length)){
preKV->length = kv.length;
SSmlSTableMeta *tableMeta = (SSmlSTableMeta *)nodeListGet(info->superTables, elements->measure, elements->measureLen, NULL);
ASSERT(tableMeta != NULL);
SSmlKv *oldKV = taosArrayGet(tableMeta->tags, cnt);
oldKV->length = kv.length;
info->needModifySchema = true;
}
if(unlikely(!IS_SAME_KEY)){
info->dataFormat = false;
info->reRun = true;
return TSDB_CODE_SUCCESS;
}
}else{
if(isSuperKVInit){
if(unlikely(cnt >= taosArrayGetSize(superKV))) {
info->dataFormat = false;
info->reRun = true;
return TSDB_CODE_SUCCESS;
}
SSmlKv *preKV = taosArrayGet(superKV, cnt);
if(unlikely(kv.length > preKV->length)) {
preKV->length = kv.length;
}else{
kv.length = preKV->length;
}
info->needModifySchema = true;
taosArrayPush(cols, &kv);
if(unlikely(!IS_SAME_KEY)){
info->dataFormat = false;
info->reRun = true;
return TSDB_CODE_SUCCESS;
}
}else{
taosArrayPush(superKV, &kv);
}
taosArrayPush(preLineKV, &kv);
}
}else{
taosArrayPush(preLineKV, &kv);
}
cnt++;
}
SSmlTableInfo *tinfo = (SSmlTableInfo *)nodeListGet(info->childTables, elements->measure, elements->measureTagsLen, NULL);
if (unlikely(tinfo == NULL)) {
tinfo = smlBuildTableInfo(1, elements->measure, elements->measureLen);
if (!tinfo) {
return TSDB_CODE_OUT_OF_MEMORY;
}
for(int i = 0; i < taosArrayGetSize(preLineKV); i++){
taosArrayPush(tinfo->tags, taosArrayGet(preLineKV, i));
}
smlSetCTableName(tinfo);
if (info->dataFormat) {
info->currSTableMeta->uid = tinfo->uid;
tinfo->tableDataCtx = smlInitTableDataCtx(info->pQuery, info->currSTableMeta);
if (tinfo->tableDataCtx == NULL) {
smlBuildInvalidDataMsg(&info->msgBuf, "smlInitTableDataCtx error", NULL);
return TSDB_CODE_SML_INVALID_DATA;
}
}
nodeListSet(&info->childTables, elements->measure, elements->measureTagsLen, tinfo);
}
return TSDB_CODE_SUCCESS;
}
// format: <metric> <timestamp> <value> <tagk_1>=<tagv_1>[ <tagk_n>=<tagv_n>]
static int32_t smlParseTelnetString(SSmlHandle *info, const char *sql, const char *sqlEnd, SSmlTableInfo *tinfo,
SArray *cols) {
static int32_t smlParseTelnetString(SSmlHandle *info, char *sql, char *sqlEnd, SSmlLineInfo *elements) {
if (!sql) return TSDB_CODE_SML_INVALID_DATA;
// parse metric
smlParseTelnetElement(&sql, sqlEnd, &tinfo->sTableName, &tinfo->sTableNameLen);
if (!(tinfo->sTableName) || IS_INVALID_TABLE_LEN(tinfo->sTableNameLen)) {
smlParseTelnetElement(&sql, sqlEnd, &elements->measure, &elements->measureLen);
if (unlikely((!(elements->measure) || IS_INVALID_TABLE_LEN(elements->measureLen)))) {
smlBuildInvalidDataMsg(&info->msgBuf, "invalid data", sql);
return TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH;
}
// parse timestamp
const char *timestamp = NULL;
int32_t tLen = 0;
smlParseTelnetElement(&sql, sqlEnd, &timestamp, &tLen);
if (!timestamp || tLen == 0) {
smlParseTelnetElement(&sql, sqlEnd, &elements->timestamp, &elements->timestampLen);
if (unlikely(!elements->timestamp || elements->timestampLen == 0)) {
smlBuildInvalidDataMsg(&info->msgBuf, "invalid timestamp", sql);
return TSDB_CODE_SML_INVALID_DATA;
}
int32_t ret = smlParseTS(info, timestamp, tLen, cols);
if (ret != TSDB_CODE_SUCCESS) {
bool needConverTime = false; // get TS before parse tag(get meta), so need conver time
if(info->dataFormat && info->currSTableMeta == NULL){
needConverTime = true;
}
int64_t ts = smlParseTS(info, elements->timestamp, elements->timestampLen);
if (unlikely(ts < 0)) {
smlBuildInvalidDataMsg(&info->msgBuf, "invalid timestamp", sql);
return ret;
return TSDB_CODE_INVALID_TIMESTAMP;
}
SSmlKv kvTs = { .key = TS, .keyLen = TS_LEN, .i = ts, .type = TSDB_DATA_TYPE_TIMESTAMP, .length = (int16_t)tDataTypes[TSDB_DATA_TYPE_TIMESTAMP].bytes};
// parse value
const char *value = NULL;
int32_t valueLen = 0;
smlParseTelnetElement(&sql, sqlEnd, &value, &valueLen);
if (!value || valueLen == 0) {
smlParseTelnetElement(&sql, sqlEnd, &elements->cols, &elements->colsLen);
if (unlikely(!elements->cols || elements->colsLen == 0)) {
smlBuildInvalidDataMsg(&info->msgBuf, "invalid value", sql);
return TSDB_CODE_TSC_INVALID_VALUE;
}
SSmlKv *kv = (SSmlKv *)taosMemoryCalloc(sizeof(SSmlKv), 1);
if (!kv) return TSDB_CODE_OUT_OF_MEMORY;
taosArrayPush(cols, &kv);
kv->key = VALUE;
kv->keyLen = VALUE_LEN;
kv->value = value;
kv->length = valueLen;
if ((ret = smlParseValue(kv, &info->msgBuf)) != TSDB_CODE_SUCCESS) {
SSmlKv kv = {.key = VALUE, .keyLen = VALUE_LEN, .value = elements->cols, .length = elements->colsLen};
if (smlParseNumber(&kv, &info->msgBuf)) {
kv.length = (int16_t)tDataTypes[kv.type].bytes;
return TSDB_CODE_SUCCESS;
}else{
return TSDB_CODE_TSC_INVALID_VALUE;
}
// move measure before tags to combine keys to identify child table
memcpy(sql - elements->measureLen, elements->measure, elements->measureLen);
elements->measure = sql - elements->measureLen;
elements->measureLen += sqlEnd - sql;
int ret = smlParseTelnetTags(info, sql, sqlEnd, elements, &info->msgBuf);
if (unlikely(ret != TSDB_CODE_SUCCESS)) {
return ret;
}
// parse tags sml todo
ret = smlParseTelnetTags(sql, sqlEnd, tinfo->tags, tinfo->childTableName, NULL, &info->msgBuf);
// ret = smlParseTelnetTags(sql, sqlEnd, tinfo->tags, tinfo->childTableName, info->dumplicateKey, &info->msgBuf);
if (ret != TSDB_CODE_SUCCESS) {
smlBuildInvalidDataMsg(&info->msgBuf, "invalid data", sql);
if(unlikely(info->reRun)){
return TSDB_CODE_SUCCESS;
}
if(info->dataFormat){
if(needConverTime) {
kvTs.i = convertTimePrecision(kvTs.i, TSDB_TIME_PRECISION_NANO, info->currSTableMeta->tableInfo.precision);
}
ret = smlBuildCol(info->currTableDataCtx, info->currSTableMeta->schema, &kvTs, 0);
if(ret == TSDB_CODE_SUCCESS){
ret = smlBuildCol(info->currTableDataCtx, info->currSTableMeta->schema, &kv, 1);
}
if(ret == TSDB_CODE_SUCCESS){
ret = smlBuildRow(info->currTableDataCtx);
}
if (unlikely(ret != TSDB_CODE_SUCCESS)) {
smlBuildInvalidDataMsg(&info->msgBuf, "smlBuildCol error", NULL);
return ret;
}
}else{
taosArrayPush(elements->colArray, &kvTs);
taosArrayPush(elements->colArray, &kv);
}
info->preLine = *elements;
return TSDB_CODE_SUCCESS;
}
......@@ -1881,30 +2031,6 @@ void smlDestroyInfo(SSmlHandle *info) {
// destroy info->pVgHash
taosHashCleanup(info->pVgHash);
tmp = info->superTableTagKeyStr;
while (tmp) {
if(tmp->data.used) {
taosMemoryFree(tmp->data.value);
}
NodeList* t = tmp->next;
taosMemoryFree(tmp);
tmp = tmp->next;
}
tmp = info->superTableColKeyStr;
while (tmp) {
if(tmp->data.used) {
taosMemoryFree(tmp->data.value);
}
NodeList* t = tmp->next;
taosMemoryFree(tmp);
tmp = tmp->next;
}
taosMemoryFree(info->currentLineTagKeys);
taosMemoryFree(info->preLineTagKeys);
taosMemoryFree(info->currentLineColKeys);
taosMemoryFree(info->preLineColKeys);
taosArrayDestroy(info->preLineTagKV);
taosArrayDestroy(info->preLineColKV);
......@@ -1936,6 +2062,9 @@ static SSmlHandle *smlBuildSmlInfo(TAOS *taos) {
info->pQuery = smlInitHandle();
info->dataFormat = true;
info->preLineTagKV = taosArrayInit(8, sizeof(SSmlKv));
info->preLineColKV = taosArrayInit(8, sizeof(SSmlKv));
if (NULL == info->pVgHash) {
uError("create SSmlHandle failed");
goto cleanup;
......@@ -1949,96 +2078,86 @@ cleanup:
}
/************* TSDB_SML_JSON_PROTOCOL function start **************/
static int32_t smlParseMetricFromJSON(SSmlHandle *info, cJSON *root, SSmlTableInfo *tinfo) {
static int32_t smlParseMetricFromJSON(SSmlHandle *info, cJSON *root, SSmlLineInfo *elements) {
cJSON *metric = cJSON_GetObjectItem(root, "metric");
if (!cJSON_IsString(metric)) {
return TSDB_CODE_TSC_INVALID_JSON;
}
tinfo->sTableNameLen = strlen(metric->valuestring);
if (IS_INVALID_TABLE_LEN(tinfo->sTableNameLen)) {
elements->measureLen = strlen(metric->valuestring);
if (IS_INVALID_TABLE_LEN(elements->measureLen)) {
uError("OTD:0x%" PRIx64 " Metric lenght is 0 or large than 192", info->id);
return TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH;
}
tinfo->sTableName = metric->valuestring;
elements->measure = metric->valuestring;
return TSDB_CODE_SUCCESS;
}
static int32_t smlParseTSFromJSONObj(SSmlHandle *info, cJSON *root, int64_t *tsVal) {
static int64_t smlParseTSFromJSONObj(SSmlHandle *info, cJSON *root, int32_t toPrecision) {
int32_t size = cJSON_GetArraySize(root);
if (size != OTD_JSON_SUB_FIELDS_NUM) {
return TSDB_CODE_TSC_INVALID_JSON;
if (unlikely(size != OTD_JSON_SUB_FIELDS_NUM)) {
smlBuildInvalidDataMsg(&info->msgBuf, "invalidate json", NULL);
return -1;
}
cJSON *value = cJSON_GetObjectItem(root, "value");
if (!cJSON_IsNumber(value)) {
return TSDB_CODE_TSC_INVALID_JSON;
if (unlikely(!cJSON_IsNumber(value))) {
smlBuildInvalidDataMsg(&info->msgBuf, "invalidate json", NULL);
return -1;
}
cJSON *type = cJSON_GetObjectItem(root, "type");
if (!cJSON_IsString(type)) {
return TSDB_CODE_TSC_INVALID_JSON;
if (unlikely(!cJSON_IsString(type))) {
smlBuildInvalidDataMsg(&info->msgBuf, "invalidate json", NULL);
return -1;
}
double timeDouble = value->valuedouble;
if (smlDoubleToInt64OverFlow(timeDouble)) {
if (unlikely(smlDoubleToInt64OverFlow(timeDouble))) {
smlBuildInvalidDataMsg(&info->msgBuf, "timestamp is too large", NULL);
return TSDB_CODE_INVALID_TIMESTAMP;
return -1;
}
if (timeDouble == 0) {
*tsVal = taosGetTimestampNs();
return TSDB_CODE_SUCCESS;
return taosGetTimestampNs()/smlFactorNS[toPrecision];
}
if (timeDouble < 0) {
return TSDB_CODE_INVALID_TIMESTAMP;
return timeDouble;
}
*tsVal = timeDouble;
int64_t tsInt64 = timeDouble;
size_t typeLen = strlen(type->valuestring);
if (typeLen == 1 && (type->valuestring[0] == 's' || type->valuestring[0] == 'S')) {
// seconds
*tsVal = *tsVal * NANOSECOND_PER_SEC;
timeDouble = timeDouble * NANOSECOND_PER_SEC;
if (smlDoubleToInt64OverFlow(timeDouble)) {
smlBuildInvalidDataMsg(&info->msgBuf, "timestamp is too large", NULL);
return TSDB_CODE_INVALID_TIMESTAMP;
int8_t fromPrecision = TSDB_TIME_PRECISION_SECONDS;
if(smlFactorS[toPrecision] < INT64_MAX / tsInt64){
return tsInt64 * smlFactorS[toPrecision];
}
return -1;
} else if (typeLen == 2 && (type->valuestring[1] == 's' || type->valuestring[1] == 'S')) {
switch (type->valuestring[0]) {
case 'm':
case 'M':
// milliseconds
*tsVal = *tsVal * NANOSECOND_PER_MSEC;
timeDouble = timeDouble * NANOSECOND_PER_MSEC;
if (smlDoubleToInt64OverFlow(timeDouble)) {
smlBuildInvalidDataMsg(&info->msgBuf, "timestamp is too large", NULL);
return TSDB_CODE_INVALID_TIMESTAMP;
}
return convertTimePrecision(tsInt64, TSDB_TIME_PRECISION_MILLI, toPrecision);
break;
case 'u':
case 'U':
// microseconds
*tsVal = *tsVal * NANOSECOND_PER_USEC;
timeDouble = timeDouble * NANOSECOND_PER_USEC;
if (smlDoubleToInt64OverFlow(timeDouble)) {
smlBuildInvalidDataMsg(&info->msgBuf, "timestamp is too large", NULL);
return TSDB_CODE_INVALID_TIMESTAMP;
}
return convertTimePrecision(tsInt64, TSDB_TIME_PRECISION_MICRO, toPrecision);
break;
case 'n':
case 'N':
return convertTimePrecision(tsInt64, TSDB_TIME_PRECISION_NANO, toPrecision);
break;
default:
return TSDB_CODE_TSC_INVALID_JSON;
return -1;
}
} else {
return TSDB_CODE_TSC_INVALID_JSON;
return -1;
}
return TSDB_CODE_SUCCESS;
}
static uint8_t smlGetTimestampLen(int64_t num) {
......@@ -2050,60 +2169,42 @@ static uint8_t smlGetTimestampLen(int64_t num) {
return len;
}
static int32_t smlParseTSFromJSON(SSmlHandle *info, cJSON *root, SArray *cols) {
static int64_t smlParseTSFromJSON(SSmlHandle *info, cJSON *root) {
// Timestamp must be the first KV to parse
int64_t tsVal = 0;
int32_t toPrecision = info->currSTableMeta ? info->currSTableMeta->tableInfo.precision : TSDB_TIME_PRECISION_NANO;
cJSON *timestamp = cJSON_GetObjectItem(root, "timestamp");
if (cJSON_IsNumber(timestamp)) {
// timestamp value 0 indicates current system time
double timeDouble = timestamp->valuedouble;
if (smlDoubleToInt64OverFlow(timeDouble)) {
if (unlikely(smlDoubleToInt64OverFlow(timeDouble))) {
smlBuildInvalidDataMsg(&info->msgBuf, "timestamp is too large", NULL);
return TSDB_CODE_INVALID_TIMESTAMP;
return -1;
}
if (timeDouble < 0) {
return TSDB_CODE_INVALID_TIMESTAMP;
if (unlikely(timeDouble < 0)) {
smlBuildInvalidDataMsg(&info->msgBuf,
"timestamp is negative", NULL);
return timeDouble;
}else if (unlikely(timeDouble == 0)) {
return taosGetTimestampNs()/smlFactorNS[toPrecision];
}
uint8_t tsLen = smlGetTimestampLen((int64_t)timeDouble);
tsVal = (int64_t)timeDouble;
if (tsLen == TSDB_TIME_PRECISION_SEC_DIGITS) {
tsVal = tsVal * NANOSECOND_PER_SEC;
timeDouble = timeDouble * NANOSECOND_PER_SEC;
if (smlDoubleToInt64OverFlow(timeDouble)) {
smlBuildInvalidDataMsg(&info->msgBuf, "timestamp is too large", NULL);
return TSDB_CODE_INVALID_TIMESTAMP;
}
} else if (tsLen == TSDB_TIME_PRECISION_MILLI_DIGITS) {
tsVal = tsVal * NANOSECOND_PER_MSEC;
timeDouble = timeDouble * NANOSECOND_PER_MSEC;
if (smlDoubleToInt64OverFlow(timeDouble)) {
smlBuildInvalidDataMsg(&info->msgBuf, "timestamp is too large", NULL);
return TSDB_CODE_INVALID_TIMESTAMP;
}
} else if (timeDouble == 0) {
tsVal = taosGetTimestampNs();
} else {
return TSDB_CODE_INVALID_TIMESTAMP;
int8_t fromPrecision = smlGetTsTypeByLen(tsLen);
if (unlikely(fromPrecision == -1)) {
smlBuildInvalidDataMsg(&info->msgBuf,
"timestamp precision can only be seconds(10 digits) or milli seconds(13 digits)", NULL);
return -1;
}
return convertTimePrecision(timeDouble, fromPrecision, toPrecision);
} else if (cJSON_IsObject(timestamp)) {
int32_t ret = smlParseTSFromJSONObj(info, timestamp, &tsVal);
if (ret != TSDB_CODE_SUCCESS) {
uError("SML:0x%" PRIx64 " Failed to parse timestamp from JSON Obj", info->id);
return ret;
}
return smlParseTSFromJSONObj(info, timestamp, toPrecision);
} else {
return TSDB_CODE_TSC_INVALID_JSON;
smlBuildInvalidDataMsg(&info->msgBuf,
"invalidate json", NULL);
return -1;
}
// add ts to
SSmlKv kv = {.key = TS, .keyLen = TS_LEN, .i = tsVal,
.type = TSDB_DATA_TYPE_TIMESTAMP, .length = (int16_t)tDataTypes[TSDB_DATA_TYPE_TIMESTAMP].bytes};
taosArrayPush(cols, &kv);
return TSDB_CODE_SUCCESS;
}
static int32_t smlConvertJSONBool(SSmlKv *pVal, char *typeStr, cJSON *value) {
......@@ -2299,123 +2400,243 @@ static int32_t smlParseValueFromJSON(cJSON *root, SSmlKv *kv) {
return TSDB_CODE_SUCCESS;
}
static int32_t smlParseColsFromJSON(cJSON *root, SArray *cols) {
if (!cols) return TSDB_CODE_OUT_OF_MEMORY;
static int32_t smlParseColsFromJSON(cJSON *root, SSmlKv *kv) {
cJSON *metricVal = cJSON_GetObjectItem(root, "value");
if (metricVal == NULL) {
return TSDB_CODE_TSC_INVALID_JSON;
}
SSmlKv kv = {.key = VALUE, .keyLen = VALUE_LEN};
int32_t ret = smlParseValueFromJSON(metricVal, &kv);
int32_t ret = smlParseValueFromJSON(metricVal, kv);
if (ret != TSDB_CODE_SUCCESS) {
return ret;
}
taosArrayPush(cols, &kv);
return TSDB_CODE_SUCCESS;
}
static int32_t smlParseTagsFromJSON(cJSON *root, SArray *pKVs, char *childTableName, SHashObj *dumplicateKey,
SSmlMsgBuf *msg) {
static int32_t smlParseTagsFromJSON(SSmlHandle *info, cJSON *root, SSmlLineInfo *elements) {
int32_t ret = TSDB_CODE_SUCCESS;
if (!pKVs) {
return TSDB_CODE_OUT_OF_MEMORY;
}
cJSON *tags = cJSON_GetObjectItem(root, "tags");
if (tags == NULL || tags->type != cJSON_Object) {
if (unlikely(tags == NULL || tags->type != cJSON_Object)) {
return TSDB_CODE_TSC_INVALID_JSON;
}
size_t childTableNameLen = strlen(tsSmlChildTableName);
// add measure to tags to identify one child table
cJSON *cMeasure = cJSON_AddStringToObject(tags, JSON_METERS_NAME, elements->measure);
if(unlikely(cMeasure == NULL)){
return TSDB_CODE_TSC_INVALID_JSON;
}
if(is_same_child_table_json(elements->tags, info->preLine.tags) == 0){
return TSDB_CODE_SUCCESS;
}
bool isSameMeasure = IS_SAME_SUPER_TABLE;
int cnt = 0;
SArray *preLineKV = info->preLineTagKV;
bool isSuperKVInit = true;
SArray *superKV = NULL;
if(info->dataFormat){
if(unlikely(!isSameMeasure)){
SSmlSTableMeta *sMeta = (SSmlSTableMeta *)nodeListGet(info->superTables, elements->measure, elements->measureLen, NULL);
if(unlikely(sMeta == NULL)){
sMeta = smlBuildSTableMeta(info->dataFormat);
STableMeta * pTableMeta = smlGetMeta(info, elements->measure, elements->measureLen);
sMeta->tableMeta = pTableMeta;
if(pTableMeta == NULL){
info->dataFormat = false;
info->reRun = true;
return TSDB_CODE_SUCCESS;
}
nodeListSet(&info->superTables, elements->measure, elements->measureLen, sMeta);
}
info->currSTableMeta = sMeta->tableMeta;
superKV = sMeta->tags;
if(unlikely(taosArrayGetSize(superKV) == 0)){
isSuperKVInit = false;
}
taosArraySetSize(preLineKV, 0);
}
}else{
taosArraySetSize(preLineKV, 0);
}
int32_t tagNum = cJSON_GetArraySize(tags);
for (int32_t i = 0; i < tagNum; ++i) {
cJSON *tag = cJSON_GetArrayItem(tags, i);
if (tag == NULL) {
if (unlikely(tag == NULL)) {
return TSDB_CODE_TSC_INVALID_JSON;
}
if(unlikely(tag == cMeasure)) continue;
size_t keyLen = strlen(tag->string);
if (IS_INVALID_COL_LEN(keyLen)) {
if (unlikely(IS_INVALID_COL_LEN(keyLen))) {
uError("OTD:Tag key length is 0 or too large than 64");
return TSDB_CODE_TSC_INVALID_COLUMN_LENGTH;
}
// check duplicate keys
if (smlCheckDuplicateKey(tag->string, keyLen, dumplicateKey)) {
return TSDB_CODE_TSC_DUP_NAMES;
}
// handle child table name
if (childTableNameLen != 0 && strcmp(tag->string, tsSmlChildTableName) == 0) {
if (!cJSON_IsString(tag)) {
uError("OTD:ID must be JSON string");
return TSDB_CODE_TSC_INVALID_JSON;
}
memset(childTableName, 0, TSDB_TABLE_NAME_LEN);
tstrncpy(childTableName, tag->valuestring, TSDB_TABLE_NAME_LEN);
continue;
}
// add kv to SSmlKv
SSmlKv kv ={.key = tag->string, .keyLen = keyLen};
// value
ret = smlParseValueFromJSON(tag, &kv);
if (ret != TSDB_CODE_SUCCESS) {
if (unlikely(ret != TSDB_CODE_SUCCESS)) {
return ret;
}
taosArrayPush(pKVs, &kv);
if(info->dataFormat){
if(unlikely(cnt + 1 > info->currSTableMeta->tableInfo.numOfTags)){
info->needModifySchema = true;
}
if(isSameMeasure){
if(unlikely(cnt >= taosArrayGetSize(preLineKV))) {
info->dataFormat = false;
info->reRun = true;
return TSDB_CODE_SUCCESS;
}
SSmlKv *preKV = taosArrayGet(preLineKV, cnt);
if(unlikely(kv.length > preKV->length)){
preKV->length = kv.length;
SSmlSTableMeta *tableMeta = (SSmlSTableMeta *)nodeListGet(info->superTables, elements->measure, elements->measureLen, NULL);
ASSERT(tableMeta != NULL);
SSmlKv *oldKV = taosArrayGet(tableMeta->tags, cnt);
oldKV->length = kv.length;
info->needModifySchema = true;
}
if(unlikely(!IS_SAME_KEY)){
info->dataFormat = false;
info->reRun = true;
return TSDB_CODE_SUCCESS;
}
}else{
if(isSuperKVInit){
if(unlikely(cnt >= taosArrayGetSize(superKV))) {
info->dataFormat = false;
info->reRun = true;
return TSDB_CODE_SUCCESS;
}
SSmlKv *preKV = taosArrayGet(superKV, cnt);
if(unlikely(kv.length > preKV->length)) {
preKV->length = kv.length;
}else{
kv.length = preKV->length;
}
info->needModifySchema = true;
if(unlikely(!IS_SAME_KEY)){
info->dataFormat = false;
info->reRun = true;
return TSDB_CODE_SUCCESS;
}
}else{
taosArrayPush(superKV, &kv);
}
taosArrayPush(preLineKV, &kv);
}
}else{
taosArrayPush(preLineKV, &kv);
}
cnt++;
}
void* oneTable = nodeListGet(info->childTables, elements->tags, POINTER_BYTES, is_same_child_table_json);
if ((oneTable != NULL)) {
return TSDB_CODE_SUCCESS;
}
SSmlTableInfo *tinfo = smlBuildTableInfo(1, elements->measure, elements->measureLen);
if (unlikely(!tinfo)) {
return TSDB_CODE_OUT_OF_MEMORY;
}
for(int i = 0; i < taosArrayGetSize(preLineKV); i++){
taosArrayPush(tinfo->tags, taosArrayGet(preLineKV, i));
}
smlSetCTableName(tinfo);
if(info->dataFormat) {
info->currSTableMeta->uid = tinfo->uid;
tinfo->tableDataCtx = smlInitTableDataCtx(info->pQuery, info->currSTableMeta);
if(tinfo->tableDataCtx == NULL){
smlBuildInvalidDataMsg(&info->msgBuf, "smlInitTableDataCtx error", NULL);
return TSDB_CODE_SML_INVALID_DATA;
}
}
nodeListSet(&info->childTables, tags, POINTER_BYTES, tinfo);
return ret;
}
static int32_t smlParseJSONString(SSmlHandle *info, cJSON *root, SSmlTableInfo *tinfo, SArray *cols) {
static int32_t smlParseJSONString(SSmlHandle *info, cJSON *root, SSmlLineInfo *elements) {
int32_t ret = TSDB_CODE_SUCCESS;
if (!cJSON_IsObject(root)) {
uError("OTD:0x%" PRIx64 " data point needs to be JSON object", info->id);
return TSDB_CODE_TSC_INVALID_JSON;
}
int32_t size = cJSON_GetArraySize(root);
// outmost json fields has to be exactly 4
if (size != OTD_JSON_FIELDS_NUM) {
if (unlikely(size != OTD_JSON_FIELDS_NUM)) {
uError("OTD:0x%" PRIx64 " Invalid number of JSON fields in data point %d", info->id, size);
return TSDB_CODE_TSC_INVALID_JSON;
}
// Parse metric
ret = smlParseMetricFromJSON(info, root, tinfo);
if (ret != TSDB_CODE_SUCCESS) {
ret = smlParseMetricFromJSON(info, root, elements);
if (unlikely(ret != TSDB_CODE_SUCCESS)) {
uError("OTD:0x%" PRIx64 " Unable to parse metric from JSON payload", info->id);
return ret;
}
uDebug("OTD:0x%" PRIx64 " Parse metric from JSON payload finished", info->id);
// Parse timestamp
ret = smlParseTSFromJSON(info, root, cols);
if (ret) {
uError("OTD:0x%" PRIx64 " Unable to parse timestamp from JSON payload", info->id);
return ret;
}
uDebug("OTD:0x%" PRIx64 " Parse timestamp from JSON payload finished", info->id);
// Parse metric value
ret = smlParseColsFromJSON(root, cols);
if (ret) {
SSmlKv kv = {.key = VALUE, .keyLen = VALUE_LEN};
ret = smlParseColsFromJSON(root, &kv);
if (unlikely(ret)) {
uError("OTD:0x%" PRIx64 " Unable to parse metric value from JSON payload", info->id);
return ret;
}
uDebug("OTD:0x%" PRIx64 " Parse metric value from JSON payload finished", info->id);
// Parse tags sml todo
ret = smlParseTagsFromJSON(root, tinfo->tags, tinfo->childTableName, NULL, &info->msgBuf);
// ret = smlParseTagsFromJSON(root, tinfo->tags, tinfo->childTableName, info->dumplicateKey, &info->msgBuf);
if (ret) {
// Parse tags
ret = smlParseTagsFromJSON(info, root, elements);
if (unlikely(ret)) {
uError("OTD:0x%" PRIx64 " Unable to parse tags from JSON payload", info->id);
return ret;
}
uDebug("OTD:0x%" PRIx64 " Parse tags from JSON payload finished", info->id);
if(unlikely(info->reRun)){
return TSDB_CODE_SUCCESS;
}
// Parse timestamp
// notice!!! put ts back to tag to ensure get meta->precision
int64_t ts = smlParseTSFromJSON(info, root);
if (unlikely(ts < 0)) {
uError("OTD:0x%" PRIx64 " Unable to parse timestamp from JSON payload", info->id);
return TSDB_CODE_INVALID_TIMESTAMP;
}
uDebug("OTD:0x%" PRIx64 " Parse timestamp from JSON payload finished", info->id);
SSmlKv kvTs = { .key = TS, .keyLen = TS_LEN, .i = ts, .type = TSDB_DATA_TYPE_TIMESTAMP, .length = (int16_t)tDataTypes[TSDB_DATA_TYPE_TIMESTAMP].bytes};
if(info->dataFormat){
ret = smlBuildCol(info->currTableDataCtx, info->currSTableMeta->schema, &kvTs, 0);
if(ret == TSDB_CODE_SUCCESS){
ret = smlBuildCol(info->currTableDataCtx, info->currSTableMeta->schema, &kv, 1);
}
if(ret == TSDB_CODE_SUCCESS){
ret = smlBuildRow(info->currTableDataCtx);
}
if (unlikely(ret != TSDB_CODE_SUCCESS)) {
smlBuildInvalidDataMsg(&info->msgBuf, "smlBuildCol error", NULL);
return ret;
}
}else{
taosArrayPush(elements->colArray, &kvTs);
taosArrayPush(elements->colArray, &kv);
}
info->preLine = *elements;
return TSDB_CODE_SUCCESS;
}
/************* TSDB_SML_JSON_PROTOCOL function end **************/
......@@ -2424,14 +2645,24 @@ static int32_t smlParseLineBottom(SSmlHandle *info) {
for(int32_t i = 0; i < info->lineNum; i ++){
SSmlLineInfo* elements = info->lines + i;
SSmlTableInfo *tinfo =
(SSmlTableInfo *)nodeListGet(info->childTables, elements->measure, elements->measureTagsLen);
SSmlTableInfo *tinfo = NULL;
if(info->protocol != TSDB_SML_JSON_PROTOCOL){
tinfo = (SSmlTableInfo *)nodeListGet(info->childTables, elements->measure, elements->measureTagsLen, NULL);
}else{
tinfo = (SSmlTableInfo *)nodeListGet(info->childTables, elements->tags, POINTER_BYTES, is_same_child_table_json);
}
if(tinfo == NULL){
uError("SML:0x%" PRIx64 "get oneTable failed, line num:%d", info->id, i);
smlBuildInvalidDataMsg(&info->msgBuf, "get oneTable failed", elements->measure);
return TSDB_CODE_SML_INVALID_DATA;
}
if (taosArrayGetSize(tinfo->tags) > TSDB_MAX_TAGS) {
smlBuildInvalidDataMsg(&info->msgBuf, "too many tags than 128", NULL);
return TSDB_CODE_PAR_INVALID_TAGS_NUM;
}
if (taosArrayGetSize(elements->colArray) + taosArrayGetSize(tinfo->tags) > TSDB_MAX_COLUMNS) {
smlBuildInvalidDataMsg(&info->msgBuf, "too many columns than 4096", NULL);
return TSDB_CODE_PAR_TOO_MANY_COLUMNS;
......@@ -2442,7 +2673,7 @@ static int32_t smlParseLineBottom(SSmlHandle *info) {
return ret;
}
SSmlSTableMeta *tableMeta = (SSmlSTableMeta *)nodeListGet(info->superTables, elements->measure, elements->measureLen);
SSmlSTableMeta *tableMeta = (SSmlSTableMeta *)nodeListGet(info->superTables, elements->measure, elements->measureLen, NULL);
if (tableMeta) { // update meta
ret = smlUpdateMeta(tableMeta->colHash, tableMeta->cols, elements->colArray, false, &info->msgBuf);
if (ret == TSDB_CODE_SUCCESS) {
......@@ -2469,117 +2700,85 @@ static int32_t smlParseLineBottom(SSmlHandle *info) {
return TSDB_CODE_SUCCESS;
}
static int32_t smlParseTelnetLine(SSmlHandle *info, void *data, const int len) {
int ret = TSDB_CODE_SUCCESS;
SSmlTableInfo *tinfo = smlBuildTableInfo(1, "", 0);
if (!tinfo) {
return TSDB_CODE_OUT_OF_MEMORY;
}
SArray *cols = taosArrayInit(16, POINTER_BYTES);
if (cols == NULL) {
uError("SML:0x%" PRIx64 " smlParseTelnetLine failed to allocate memory", info->id);
return TSDB_CODE_OUT_OF_MEMORY;
}
if (info->protocol == TSDB_SML_TELNET_PROTOCOL) {
ret = smlParseTelnetString(info, (const char *)data, (char *)data + len, tinfo, cols);
} else if (info->protocol == TSDB_SML_JSON_PROTOCOL) {
ret = smlParseJSONString(info, (cJSON *)data, tinfo, cols);
} else {
ASSERT(0);
}
if (ret != TSDB_CODE_SUCCESS) {
uError("SML:0x%" PRIx64 " smlParseTelnetLine failed", info->id);
smlDestroyTableInfo(tinfo);
taosArrayDestroy(cols);
return ret;
}
if (taosArrayGetSize(tinfo->tags) <= 0 || taosArrayGetSize(tinfo->tags) > TSDB_MAX_TAGS) {
smlBuildInvalidDataMsg(&info->msgBuf, "invalidate tags length:[1,128]", NULL);
smlDestroyTableInfo(tinfo);
taosArrayDestroy(cols);
return TSDB_CODE_PAR_INVALID_TAGS_NUM;
}
if (strlen(tinfo->childTableName) == 0) {
RandTableName rName = {tinfo->tags, tinfo->sTableName, (uint8_t)tinfo->sTableNameLen, tinfo->childTableName, 0};
buildChildTableName(&rName);
tinfo->uid = rName.uid;
} else {
tinfo->uid = *(uint64_t *)(tinfo->childTableName); // generate uid by name simple
}
bool hasTable = true;
SSmlTableInfo *oneTable =
(SSmlTableInfo *)nodeListGet(info->childTables, tinfo->childTableName, strlen(tinfo->childTableName));
if (!oneTable) {
nodeListSet(&info->childTables, tinfo->childTableName, strlen(tinfo->childTableName), tinfo);
oneTable = tinfo;
hasTable = false;
} else {
smlDestroyTableInfo(tinfo);
}
taosArrayPush(oneTable->cols, &cols);
SSmlSTableMeta *tableMeta =
(SSmlSTableMeta *)nodeListGet(info->superTables, oneTable->sTableName, oneTable->sTableNameLen);
if (tableMeta) { // update meta
ret = smlUpdateMeta(tableMeta->colHash, tableMeta->cols, cols, false, &info->msgBuf);
if (!hasTable && ret == TSDB_CODE_SUCCESS) {
ret = smlUpdateMeta(tableMeta->tagHash, tableMeta->tags, oneTable->tags, true, &info->msgBuf);
}
if (ret != TSDB_CODE_SUCCESS) {
uError("SML:0x%" PRIx64 " smlUpdateMeta failed", info->id);
return ret;
}
} else {
SSmlSTableMeta *meta = smlBuildSTableMeta(false);
smlInsertMeta(meta->tagHash, meta->tags, oneTable->tags);
smlInsertMeta(meta->colHash, meta->cols, cols);
nodeListSet(&info->superTables, oneTable->sTableName, oneTable->sTableNameLen, meta);
}
return TSDB_CODE_SUCCESS;
}
static int32_t smlParseJSON(SSmlHandle *info, char *payload) {
int32_t payloadNum = 0;
int32_t ret = TSDB_CODE_SUCCESS;
if (payload == NULL) {
if (unlikely(payload == NULL)) {
uError("SML:0x%" PRIx64 " empty JSON Payload", info->id);
return TSDB_CODE_TSC_INVALID_JSON;
}
info->root = cJSON_Parse(payload);
if (info->root == NULL) {
if (unlikely(info->root == NULL)) {
uError("SML:0x%" PRIx64 " parse json failed:%s", info->id, payload);
return TSDB_CODE_TSC_INVALID_JSON;
}
// multiple data points must be sent in JSON array
if (cJSON_IsObject(info->root)) {
payloadNum = 1;
} else if (cJSON_IsArray(info->root)) {
if (cJSON_IsArray(info->root)) {
payloadNum = cJSON_GetArraySize(info->root);
} else if (cJSON_IsObject(info->root)) {
payloadNum = 1;
} else {
uError("SML:0x%" PRIx64 " Invalid JSON Payload", info->id);
ret = TSDB_CODE_TSC_INVALID_JSON;
goto end;
return TSDB_CODE_TSC_INVALID_JSON;
}
for (int32_t i = 0; i < payloadNum; ++i) {
int32_t i = 0;
while (i < payloadNum) {
cJSON *dataPoint = (payloadNum == 1 && cJSON_IsObject(info->root)) ? info->root : cJSON_GetArrayItem(info->root, i);
ret = smlParseTelnetLine(info, dataPoint, -1);
if (ret != TSDB_CODE_SUCCESS) {
if(info->dataFormat) {
SSmlLineInfo element = {0};
ret = smlParseJSONString(info, dataPoint, &element);
}else{
ret = smlParseJSONString(info, dataPoint, info->lines + i);
}
if (unlikely(ret != TSDB_CODE_SUCCESS)) {
uError("SML:0x%" PRIx64 " Invalid JSON Payload", info->id);
goto end;
return ret;
}
if(unlikely(info->reRun)){
i = 0;
info->reRun = false;
// clear info->childTables
NodeList* pList = info->childTables;
while (pList) {
if(pList->data.used) {
smlDestroyTableInfo(pList->data.value);
pList->data.used = false;
}
pList = pList->next;
}
end:
return ret;
// clear info->superTables
pList = info->superTables;
while (pList) {
if(pList->data.used) {
smlDestroySTableMeta(pList->data.value);
pList->data.used = false;
}
pList = pList->next;
}
if(unlikely(info->lines != NULL)){
uError("SML:0x%" PRIx64 " info->lines != NULL", info->id);
return TSDB_CODE_SML_INVALID_DATA;
}
info->lineNum = payloadNum;
info->lines = taosMemoryCalloc(info->lineNum, sizeof(SSmlLineInfo));
for(int j = 0; j < info->lineNum; j++){
info->lines[j].colArray = taosArrayInit(8, sizeof(SSmlKv));
}
memset(&info->preLine, 0, sizeof(SSmlLineInfo));
SVnodeModifOpStmt* stmt= (SVnodeModifOpStmt*)(info->pQuery->pRoot);
stmt->freeHashFunc(stmt->pTableBlockHashObj);
stmt->pTableBlockHashObj = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
continue;
}
}
return TSDB_CODE_SUCCESS;
}
static int32_t smlInsertData(SSmlHandle *info) {
......@@ -2608,7 +2807,7 @@ static int32_t smlInsertData(SSmlHandle *info) {
taosHashPut(info->pVgHash, (const char *)&vg.vgId, sizeof(vg.vgId), (char *)&vg, sizeof(vg));
SSmlSTableMeta *pMeta =
(SSmlSTableMeta *)nodeListGet(info->superTables, tableData->sTableName, tableData->sTableNameLen);
(SSmlSTableMeta *)nodeListGet(info->superTables, tableData->sTableName, tableData->sTableNameLen, NULL);
ASSERT(NULL != pMeta);
// use tablemeta of stable to save vgid and uid of child table
......@@ -2697,7 +2896,13 @@ static int32_t smlParseLine(SSmlHandle *info, char *lines[], char *rawLine, char
code = smlParseInfluxString(info, tmp, tmp + len, info->lines + i);
}
} else if (info->protocol == TSDB_SML_TELNET_PROTOCOL) {
code = smlParseTelnetLine(info, tmp, len);
if(info->dataFormat) {
SSmlLineInfo element = {0};
code = smlParseTelnetString(info, (char *)tmp, (char *)tmp + len, &element);
}else{
code = smlParseTelnetString(info, (char *)tmp, (char *)tmp + len, info->lines + i);
}
} else {
ASSERT(0);
}
......@@ -2734,6 +2939,13 @@ static int32_t smlParseLine(SSmlHandle *info, char *lines[], char *rawLine, char
}
info->lines = taosMemoryCalloc(info->lineNum, sizeof(SSmlLineInfo));
for(int j = 0; j < info->lineNum; j++){
info->lines[j].colArray = taosArrayInit(8, sizeof(SSmlKv));
}
memset(&info->preLine, 0, sizeof(SSmlLineInfo));
SVnodeModifOpStmt* stmt= (SVnodeModifOpStmt*)(info->pQuery->pRoot);
stmt->freeHashFunc(stmt->pTableBlockHashObj);
stmt->pTableBlockHashObj = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
continue;
}
i++;
......
......@@ -301,7 +301,9 @@ int32_t smlBindData(SQuery* query, bool dataFormat, SArray* tags, SArray* colsSc
SSchema* pColSchema = &pSchema[pTableCxt->boundColsInfo.pColIndex[c]];
SColVal* pVal = taosArrayGet(pTableCxt->pValues, pTableCxt->boundColsInfo.pColIndex[c]);
void** p = taosHashGet(rowData, pColSchema->name, strlen(pColSchema->name));
ASSERT(p != NULL);
if (p == NULL) {
continue;
}
SSmlKv *kv = *(SSmlKv **)p;
if (pColSchema->type == TSDB_DATA_TYPE_TIMESTAMP) {
......@@ -365,7 +367,7 @@ SQuery* smlInitHandle() {
qDestroyQuery(pQuery);
return NULL;
}
stmt->pTableBlockHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
stmt->pTableBlockHashObj = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
stmt->freeHashFunc = insDestroyTableDataCxtHashMap;
stmt->freeArrayFunc = insDestroyVgroupDataCxtList;
......
......@@ -1091,9 +1091,10 @@ int sml_ts2164_Test() {
taos_free_result(pRes);
const char *sql[] = {
// "meters,location=la,groupid=ca current=11.8,voltage=221,phase=0.27",
"meters,location=la,groupid=ca current=11.8,voltage=221",
"meters,location=la,groupid=ca current=11.8,voltage=221,phase=0.27",
"meters,location=la,groupid=ca current=11.8,voltage=221,phase=0.27",
"meters,location=la,groupid=cb current=11.8,voltage=221,phase=0.27",
// "meters,location=la,groupid=cb current=11.8,voltage=221,phase=0.27",
};
pRes = taos_query(taos, "use line_test");
......@@ -1150,8 +1151,8 @@ int sml_ttl_Test() {
int main(int argc, char *argv[]) {
int ret = 0;
ret = sml_ttl_Test();
ASSERT(!ret);
// ret = sml_ttl_Test();
// ASSERT(!ret);
ret = sml_ts2164_Test();
ASSERT(!ret);
ret = smlProcess_influx_Test();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册