提交 167f4081 编写于 作者: wmmhello's avatar wmmhello

refactor: add telnet and json protocol for schemaless

上级 f6889f4b
...@@ -75,7 +75,7 @@ int32_t qCreateSName(SName* pName, const char* pTableName, int32_t acctId, char* ...@@ -75,7 +75,7 @@ int32_t qCreateSName(SName* pName, const char* pTableName, int32_t acctId, char*
void* smlInitHandle(SQuery* pQuery); void* smlInitHandle(SQuery* pQuery);
void smlDestroyHandle(void* pHandle); void smlDestroyHandle(void* pHandle);
int32_t smlBindData(void* handle, SArray* tags, SArray* colsFormat, SArray* colsSchema, SArray* cols, bool format, int32_t smlBindData(void* handle, SArray* tags, SArray* colsSchema, SArray* cols, bool format,
STableMeta* pTableMeta, char* tableName, char* msgBuf, int16_t msgBufLen); STableMeta* pTableMeta, char* tableName, char* msgBuf, int16_t msgBufLen);
int32_t smlBuildOutput(void* handle, SHashObj* pVgHash); int32_t smlBuildOutput(void* handle, SHashObj* pVgHash);
......
...@@ -16,6 +16,7 @@ ...@@ -16,6 +16,7 @@
#include "catalog.h" #include "catalog.h"
#include "clientInt.h" #include "clientInt.h"
#include "tname.h" #include "tname.h"
#include "cJSON.h"
//================================================================================================= //=================================================================================================
#define SPACE ' ' #define SPACE ' '
...@@ -25,6 +26,22 @@ ...@@ -25,6 +26,22 @@
#define SLASH '\\' #define SLASH '\\'
#define tsMaxSQLStringLen (1024*1024) #define tsMaxSQLStringLen (1024*1024)
#define OTD_MAX_FIELDS_NUM 2
#define OTD_JSON_SUB_FIELDS_NUM 2
#define OTD_JSON_FIELDS_NUM 4
#define OTD_TIMESTAMP_COLUMN_NAME "ts"
#define OTD_METRIC_VALUE_COLUMN_NAME "value"
#define TS "_ts"
#define TS_LEN 3
#define TAG "_tagNone"
#define TAG_LEN 8
#define VALUE "value"
#define VALUE_LEN 5
#define BINARY_ADD_LEN 2 // "binary" 2 means " "
#define NCHAR_ADD_LEN 3 // L"nchar" 3 means L" "
//================================================================================================= //=================================================================================================
typedef TSDB_SML_PROTOCOL_TYPE SMLProtocolType; typedef TSDB_SML_PROTOCOL_TYPE SMLProtocolType;
...@@ -70,17 +87,15 @@ typedef struct { ...@@ -70,17 +87,15 @@ typedef struct {
typedef struct { typedef struct {
const char *sTableName; // super table name const char *sTableName; // super table name
uint8_t sTableNameLen; int32_t sTableNameLen;
char childTableName[TSDB_TABLE_NAME_LEN]; char childTableName[TSDB_TABLE_NAME_LEN];
uint64_t uid; uint64_t uid;
SArray *tags; SArray *tags;
// colsFormat store cols formated, for quick parse, if info->formatData is true // if info->formatData is true, elements are SArray<SSmlKv*>.
SArray *colsFormat; // elements are SArray<SSmlKv*> // if info->formatData is false, elements are SHashObj<cols key string, SSmlKv*> for find by key quickly
SArray *cols;
// cols store cols un formated
SArray *cols; // elements are SHashObj<cols key string, SSmlKv*> for find by key quickly
} SSmlTableInfo; } SSmlTableInfo;
typedef struct { typedef struct {
...@@ -114,11 +129,11 @@ typedef struct { ...@@ -114,11 +129,11 @@ typedef struct {
} SSmlCostInfo; } SSmlCostInfo;
typedef struct { typedef struct {
uint64_t id; int64_t id;
SMLProtocolType protocol; SMLProtocolType protocol;
int8_t precision; int8_t precision;
bool dataFormat; // true means that the name, number and order of keys in each line are the same bool dataFormat; // true means that the name, number and order of keys in each line are the same(only for influx protocol)
SHashObj *childTables; SHashObj *childTables;
SHashObj *superTables; SHashObj *superTables;
...@@ -134,16 +149,12 @@ typedef struct { ...@@ -134,16 +149,12 @@ typedef struct {
int32_t affectedRows; int32_t affectedRows;
SSmlMsgBuf msgBuf; SSmlMsgBuf msgBuf;
SHashObj *dumplicateKey; // for dumplicate key SHashObj *dumplicateKey; // for dumplicate key
SArray *colsContainer; // for cols parse, if is dataFormat == false SArray *colsContainer; // for cols parse, if dataFormat == false
} SSmlHandle; } SSmlHandle;
//================================================================================================= //=================================================================================================
static volatile int64_t linesSmlHandleId = 0;
static const char* TS = "_ts";
static const char* TAG = "_tagNone";
//================================================================================================= //=================================================================================================
static volatile int64_t linesSmlHandleId = 0;
static int64_t smlGenId() { static int64_t smlGenId() {
int64_t id; int64_t id;
...@@ -154,6 +165,20 @@ static int64_t smlGenId() { ...@@ -154,6 +165,20 @@ static int64_t smlGenId() {
return id; return id;
} }
static inline bool smlDoubleToInt64OverFlow(double num) {
if(num >= (double)INT64_MAX || num <= (double)INT64_MIN) return true;
return false;
}
static inline bool smlCheckDuplicateKey(const char *key, int32_t keyLen, SHashObj *pHash) {
void *val = taosHashGet(pHash, key, keyLen);
if (val) {
return true;
}
taosHashPut(pHash, key, keyLen, key, 1);
return false;
}
static int32_t smlBuildInvalidDataMsg(SSmlMsgBuf* pBuf, const char *msg1, const char *msg2) { static int32_t smlBuildInvalidDataMsg(SSmlMsgBuf* pBuf, const char *msg1, const char *msg2) {
if(msg1) strncat(pBuf->buf, msg1, pBuf->len); if(msg1) strncat(pBuf->buf, msg1, pBuf->len);
int32_t left = pBuf->len - strlen(pBuf->buf); int32_t left = pBuf->len - strlen(pBuf->buf);
...@@ -480,294 +505,99 @@ static int32_t smlModifyDBSchemas(SSmlHandle* info) { ...@@ -480,294 +505,99 @@ static int32_t smlModifyDBSchemas(SSmlHandle* info) {
// *pos = cur; // *pos = cur;
//} //}
static bool smlParseTinyInt(SSmlKv *kvVal, bool *isValid, SSmlMsgBuf *msg) { static bool smlParseNumber(SSmlKv *kvVal, SSmlMsgBuf *msg){
const char *pVal = kvVal->value; const char *pVal = kvVal->value;
int32_t len = kvVal->valueLen; int32_t len = kvVal->valueLen;
if (len <= 2) { char *endptr = NULL;
double result = strtod(pVal, &endptr);
if(pVal == endptr){
smlBuildInvalidDataMsg(msg, "invalid data", pVal);
return false; return false;
} }
const char *signalPos = pVal + len - 2;
if (!strncasecmp(signalPos, "i8", 2)) {
char *endptr = NULL;
int64_t result = strtoll(pVal, &endptr, 10);
if(endptr != signalPos){ // 78ri8
*isValid = false;
smlBuildInvalidDataMsg(msg, "invalid tiny int", endptr);
}else if(!IS_VALID_TINYINT(result)){
*isValid = false;
smlBuildInvalidDataMsg(msg, "tiny int out of range[-128,127]", endptr);
}else{
kvVal->i = result;
*isValid = true;
}
return true;
}
return false;
}
static bool smlParseTinyUint(SSmlKv *kvVal, bool *isValid, SSmlMsgBuf *msg) { int32_t left = len - (endptr - pVal);
const char *pVal = kvVal->value; if(left == 0 || (left == 3 && strncasecmp(endptr, "f64", left) == 0)){
int32_t len = kvVal->valueLen; kvVal->type = TSDB_DATA_TYPE_DOUBLE;
if (len <= 2) { kvVal->d = result;
return false; }else if ((left == 3 && strncasecmp(endptr, "f32", left) == 0)){
} if(!IS_VALID_FLOAT(result)){
if (pVal[0] == '-') { smlBuildInvalidDataMsg(msg, "float out of range[-3.402823466e+38,3.402823466e+38]", pVal);
return false; return false;
}
const char *signalPos = pVal + len - 2;
if (!strncasecmp(signalPos, "u8", 2)) {
char *endptr = NULL;
int64_t result = strtoll(pVal, &endptr, 10);
if(endptr != signalPos){ // 78ri8
*isValid = false;
smlBuildInvalidDataMsg(msg, "invalid unsigned tiny int", endptr);
}else if(!IS_VALID_UTINYINT(result)){
*isValid = false;
smlBuildInvalidDataMsg(msg, "unsigned tiny int out of range[0,255]", endptr);
}else{
kvVal->i = result;
*isValid = true;
} }
return true; kvVal->type = TSDB_DATA_TYPE_FLOAT;
} kvVal->f = (float)result;
return false; }else if ((left == 1 && *endptr == 'i') || (left == 3 && strncasecmp(endptr, "i64", left) == 0)){
} if(smlDoubleToInt64OverFlow(result)){
smlBuildInvalidDataMsg(msg, "big int is too large, out of precision", pVal);
static bool smlParseSmallInt(SSmlKv *kvVal, bool *isValid, SSmlMsgBuf *msg) { return false;
const char *pVal = kvVal->value;
int32_t len = kvVal->valueLen;
if (len <= 3) {
return false;
}
const char *signalPos = pVal + len - 3;
if (!strncasecmp(signalPos, "i16", 3)) {
char *endptr = NULL;
int64_t result = strtoll(pVal, &endptr, 10);
if(endptr != signalPos){ // 78ri8
*isValid = false;
smlBuildInvalidDataMsg(msg, "invalid small int", endptr);
}else if(!IS_VALID_SMALLINT(result)){
*isValid = false;
smlBuildInvalidDataMsg(msg, "small int our of range[-32768,32767]", endptr);
}else{
kvVal->i = result;
*isValid = true;
} }
return true; kvVal->type = TSDB_DATA_TYPE_BIGINT;
} kvVal->i = (int64_t)result;
return false; }else if ((left == 3 && strncasecmp(endptr, "u64", left) == 0)){
} if(result >= (double)UINT64_MAX || result < 0){
smlBuildInvalidDataMsg(msg, "unsigned big int is too large, out of precision", pVal);
static bool smlParseSmallUint(SSmlKv *kvVal, bool *isValid, SSmlMsgBuf *msg) { return false;
const char *pVal = kvVal->value;
int32_t len = kvVal->valueLen;
if (len <= 3) {
return false;
}
if (pVal[0] == '-') {
return false;
}
const char *signalPos = pVal + len - 3;
if (strncasecmp(signalPos, "u16", 3) == 0) {
char *endptr = NULL;
int64_t result = strtoll(pVal, &endptr, 10);
if(endptr != signalPos){ // 78ri8
*isValid = false;
smlBuildInvalidDataMsg(msg, "invalid unsigned small int", endptr);
}else if(!IS_VALID_USMALLINT(result)){
*isValid = false;
smlBuildInvalidDataMsg(msg, "unsigned small int out of rang[0,65535]", endptr);
}else{
kvVal->i = result;
*isValid = true;
} }
return true; kvVal->type = TSDB_DATA_TYPE_UBIGINT;
} kvVal->u = result;
return false; }else if (left == 3 && strncasecmp(endptr, "i32", left) == 0){
} if(!IS_VALID_INT(result)){
smlBuildInvalidDataMsg(msg, "int out of range[-2147483648,2147483647]", pVal);
static bool smlParseInt(SSmlKv *kvVal, bool *isValid, SSmlMsgBuf *msg) { return false;
const char *pVal = kvVal->value;
int32_t len = kvVal->valueLen;
if (len <= 3) {
return false;
}
const char *signalPos = pVal + len - 3;
if (strncasecmp(signalPos, "i32", 3) == 0) {
char *endptr = NULL;
int64_t result = strtoll(pVal, &endptr, 10);
if(endptr != signalPos){ // 78ri8
*isValid = false;
smlBuildInvalidDataMsg(msg, "invalid int", endptr);
}else if(!IS_VALID_INT(result)){
*isValid = false;
smlBuildInvalidDataMsg(msg, "int out of range[-2147483648,2147483647]", endptr);
}else{
kvVal->i = result;
*isValid = true;
} }
return true; kvVal->type = TSDB_DATA_TYPE_INT;
} kvVal->i = result;
return false; }else if (left == 3 && strncasecmp(endptr, "u32", left) == 0){
} if(!IS_VALID_UINT(result)){
smlBuildInvalidDataMsg(msg, "unsigned int out of range[0,4294967295]", pVal);
static bool smlParseUint(SSmlKv *kvVal, bool *isValid, SSmlMsgBuf *msg) { return false;
const char *pVal = kvVal->value;
int32_t len = kvVal->valueLen;
if (len <= 3) {
return false;
}
if (pVal[0] == '-') {
return false;
}
const char *signalPos = pVal + len - 3;
if (strncasecmp(signalPos, "u32", 3) == 0) {
char *endptr = NULL;
int64_t result = strtoll(pVal, &endptr, 10);
if(endptr != signalPos){ // 78ri8
*isValid = false;
smlBuildInvalidDataMsg(msg, "invalid unsigned int", endptr);
}else if(!IS_VALID_UINT(result)){
*isValid = false;
smlBuildInvalidDataMsg(msg, "unsigned int out of range[0,4294967295]", endptr);
}else{
kvVal->i = result;
*isValid = true;
} }
return true; kvVal->type = TSDB_DATA_TYPE_UINT;
} kvVal->u = result;
return false; }else if (left == 3 && strncasecmp(endptr, "i16", left) == 0){
} if(!IS_VALID_SMALLINT(result)){
smlBuildInvalidDataMsg(msg, "small int our of range[-32768,32767]", pVal);
static bool smlParseBigInt(SSmlKv *kvVal, bool *isValid, SSmlMsgBuf *msg) { return false;
const char *pVal = kvVal->value;
int32_t len = kvVal->valueLen;
if (len > 3 && strncasecmp(pVal + len - 3, "i64", 3) == 0) {
char *endptr = NULL;
errno = 0;
int64_t result = strtoll(pVal, &endptr, 10);
if(endptr != pVal + len - 3){ // 78ri8
*isValid = false;
smlBuildInvalidDataMsg(msg, "invalid big int", endptr);
}else if(errno == ERANGE || !IS_VALID_BIGINT(result)){
*isValid = false;
smlBuildInvalidDataMsg(msg, "big int out of range[-9223372036854775808,9223372036854775807]", endptr);
}else{
kvVal->i = result;
*isValid = true;
} }
return true; kvVal->type = TSDB_DATA_TYPE_SMALLINT;
}else if (len > 1 && pVal[len - 1] == 'i') { kvVal->i = result;
char *endptr = NULL; }else if (left == 3 && strncasecmp(endptr, "u16", left) == 0){
errno = 0; if(!IS_VALID_USMALLINT(result)){
int64_t result = strtoll(pVal, &endptr, 10); smlBuildInvalidDataMsg(msg, "unsigned small int out of rang[0,65535]", pVal);
if(endptr != pVal + len - 1){ // 78ri8 return false;
*isValid = false;
smlBuildInvalidDataMsg(msg, "invalid big int", endptr);
}else if(errno == ERANGE || !IS_VALID_BIGINT(result)){
*isValid = false;
smlBuildInvalidDataMsg(msg, "big int out of range[-9223372036854775808,9223372036854775807]", endptr);
}else{
kvVal->i = result;
*isValid = true;
} }
return true; kvVal->type = TSDB_DATA_TYPE_USMALLINT;
} kvVal->u = result;
return false; }else if (left == 2 && strncasecmp(endptr, "i8", left) == 0){
} if(!IS_VALID_TINYINT(result)){
smlBuildInvalidDataMsg(msg, "tiny int out of range[-128,127]", pVal);
static bool smlParseBigUint(SSmlKv *kvVal, bool *isValid, SSmlMsgBuf *msg) { return false;
const char *pVal = kvVal->value;
int32_t len = kvVal->valueLen;
if (len <= 3) {
return false;
}
if (pVal[0] == '-') {
return false;
}
const char *signalPos = pVal + len - 3;
if (strncasecmp(signalPos, "u64", 3) == 0) {
char *endptr = NULL;
errno = 0;
uint64_t result = strtoull(pVal, &endptr, 10);
if(endptr != signalPos){ // 78ri8
*isValid = false;
smlBuildInvalidDataMsg(msg, "invalid unsigned big int", endptr);
}else if(errno == ERANGE || !IS_VALID_UBIGINT(result)){
*isValid = false;
smlBuildInvalidDataMsg(msg, "unsigned big int out of range[0,18446744073709551615]", endptr);
}else{
kvVal->u = result;
*isValid = true;
} }
return true; kvVal->type = TSDB_DATA_TYPE_TINYINT;
} kvVal->i = result;
return false; }else if (left == 2 && strncasecmp(endptr, "u8", left) == 0){
} if(!IS_VALID_UTINYINT(result)){
smlBuildInvalidDataMsg(msg, "unsigned tiny int out of range[0,255]", pVal);
static bool smlParseFloat(SSmlKv *kvVal, bool *isValid, SSmlMsgBuf *msg) { return false;
const char *pVal = kvVal->value;
int32_t len = kvVal->valueLen;
char *endptr = NULL;
errno = 0;
float result = strtof(pVal, &endptr);
if(endptr == pVal + len && errno != ERANGE && IS_VALID_FLOAT(result)){ // 78
kvVal->f = result;
*isValid = true;
return true;
}
if (len > 3 && strncasecmp(pVal + len - 3, "f32", 3) == 0) {
if(endptr != pVal + len - 3){ // 78ri8
*isValid = false;
smlBuildInvalidDataMsg(msg, "invalid float", endptr);
}else if(errno == ERANGE || !IS_VALID_FLOAT(result)){
*isValid = false;
smlBuildInvalidDataMsg(msg, "float out of range[-3.402823466e+38,3.402823466e+38]", endptr);
}else{
kvVal->f = result;
*isValid = true;
} }
return true; kvVal->type = TSDB_DATA_TYPE_UTINYINT;
} kvVal->u = result;
return false; }else{
} smlBuildInvalidDataMsg(msg, "invalid data", pVal);
static bool smlParseDouble(SSmlKv *kvVal, bool *isValid, SSmlMsgBuf *msg) {
const char *pVal = kvVal->value;
int32_t len = kvVal->valueLen;
if (len <= 3) {
return false; return false;
} }
const char *signalPos = pVal + len - 3; return true;
if (strncasecmp(signalPos, "f64", 3) == 0) {
char *endptr = NULL;
errno = 0;
double result = strtod(pVal, &endptr);
if(endptr != signalPos){ // 78ri8
*isValid = false;
smlBuildInvalidDataMsg(msg, "invalid double", endptr);
}else if(errno == ERANGE || !IS_VALID_DOUBLE(result)){
*isValid = false;
smlBuildInvalidDataMsg(msg, "double out of range[-1.7976931348623158e+308,1.7976931348623158e+308]", endptr);
}else{
kvVal->d = result;
*isValid = true;
}
return true;
}
return false;
} }
static bool smlParseBool(SSmlKv *kvVal) { static bool smlParseBool(SSmlKv *kvVal) {
const char *pVal = kvVal->value; const char *pVal = kvVal->value;
int32_t len = kvVal->valueLen; int32_t len = kvVal->valueLen;
if ((len == 1) && pVal[len - 1] == 't') { if ((len == 1) && pVal[0] == 't') {
kvVal->i = true; kvVal->i = true;
return true; return true;
} }
if ((len == 1) && pVal[len - 1] == 'f') { if ((len == 1) && pVal[0] == 'f') {
kvVal->i = false; kvVal->i = false;
return true; return true;
} }
...@@ -805,102 +635,181 @@ static bool smlIsNchar(const char *pVal, uint16_t len) { ...@@ -805,102 +635,181 @@ static bool smlIsNchar(const char *pVal, uint16_t len) {
return false; return false;
} }
static bool smlParseValue(SSmlKv *pVal, SSmlMsgBuf *msg) { static int64_t smlGetTimeValue(const char *value, int32_t len, int8_t type) {
// put high probability matching type first char *endPtr = NULL;
bool isValid = false; double ts = (double)strtoll(value, &endPtr, 10);
if(value + len != endPtr){
//binary return -1;
if (smlIsBinary(pVal->value, pVal->valueLen)) {
pVal->type = TSDB_DATA_TYPE_BINARY;
pVal->valueLen -= 2;
pVal->length = pVal->valueLen;
pVal->value++;
return true;
} }
//nchar switch (type) {
if (smlIsNchar(pVal->value, pVal->valueLen)) { case TSDB_TIME_PRECISION_HOURS:
pVal->type = TSDB_DATA_TYPE_NCHAR; ts *= (3600 * 1e9);
pVal->valueLen -= 3; break;
pVal->length = pVal->valueLen; case TSDB_TIME_PRECISION_MINUTES:
pVal->value += 2; ts *= (60 * 1e9);
return true; break;
case TSDB_TIME_PRECISION_SECONDS:
ts *= (1e9);
break;
case TSDB_TIME_PRECISION_MILLI:
ts *= (1e6);
break;
case TSDB_TIME_PRECISION_MICRO:
ts *= (1e3);
break;
case TSDB_TIME_PRECISION_NANO:
break;
default:
ASSERT(0);
} }
//float if(ts >= (double)INT64_MAX || ts <= 0){
if (smlParseFloat(pVal, &isValid, msg)) { return -1;
if(!isValid) return false;
pVal->type = TSDB_DATA_TYPE_FLOAT;
pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
return true;
} }
//double
if (smlParseDouble(pVal, &isValid, msg)) {
if(!isValid) return false;
pVal->type = TSDB_DATA_TYPE_DOUBLE;
pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
return true; return (int64_t)ts;
} }
//bool
if (smlParseBool(pVal)) { static int64_t smlGetTimeNow(int8_t precision) {
pVal->type = TSDB_DATA_TYPE_BOOL; switch (precision) {
pVal->length = (int16_t)tDataTypes[pVal->type].bytes; case TSDB_TIME_PRECISION_HOURS:
return true; return taosGetTimestampMs()/1000/3600;
case TSDB_TIME_PRECISION_MINUTES:
return taosGetTimestampMs()/1000/60;
case TSDB_TIME_PRECISION_SECONDS:
return taosGetTimestampMs()/1000;
case TSDB_TIME_PRECISION_MILLI:
case TSDB_TIME_PRECISION_MICRO:
case TSDB_TIME_PRECISION_NANO:
return taosGetTimestamp(precision);
default:
ASSERT(0);
} }
}
if (smlParseTinyInt(pVal, &isValid, msg)) { static int8_t smlGetTsTypeByLen(int32_t len) {
if(!isValid) return false; if (len == TSDB_TIME_PRECISION_SEC_DIGITS) {
pVal->type = TSDB_DATA_TYPE_TINYINT; return TSDB_TIME_PRECISION_SECONDS;
pVal->length = (int16_t)tDataTypes[pVal->type].bytes; } else if (len == TSDB_TIME_PRECISION_MILLI_DIGITS) {
return true; return TSDB_TIME_PRECISION_MILLI_DIGITS;
} else {
return -1;
} }
if (smlParseTinyUint(pVal, &isValid, msg)) { }
if(!isValid) return false;
pVal->type = TSDB_DATA_TYPE_UTINYINT; static int8_t smlGetTsTypeByPrecision(int8_t precision) {
pVal->length = (int16_t)tDataTypes[pVal->type].bytes; switch (precision) {
return true; case TSDB_SML_TIMESTAMP_HOURS:
return TSDB_TIME_PRECISION_HOURS;
case TSDB_SML_TIMESTAMP_MILLI_SECONDS:
return TSDB_TIME_PRECISION_MILLI;
case TSDB_SML_TIMESTAMP_NANO_SECONDS:
case TSDB_SML_TIMESTAMP_NOT_CONFIGURED:
return TSDB_TIME_PRECISION_NANO;
case TSDB_SML_TIMESTAMP_MICRO_SECONDS:
return TSDB_TIME_PRECISION_MICRO;
case TSDB_SML_TIMESTAMP_SECONDS:
return TSDB_TIME_PRECISION_SECONDS;
case TSDB_SML_TIMESTAMP_MINUTES:
return TSDB_TIME_PRECISION_MINUTES;
default:
return -1;
} }
if (smlParseSmallInt(pVal, &isValid, msg)) { }
if(!isValid) return false;
pVal->type = TSDB_DATA_TYPE_SMALLINT; static int64_t smlParseInfluxTime(SSmlHandle* info, const char* data, int32_t len){
pVal->length = (int16_t)tDataTypes[pVal->type].bytes; int8_t tsType = smlGetTsTypeByPrecision(info->precision);
return true; if (tsType == -1) {
smlBuildInvalidDataMsg(&info->msgBuf, "invalid timestamp precision", NULL);
return -1;
} }
if (smlParseSmallUint(pVal, &isValid, msg)) { if(!data){
if(!isValid) return false; return smlGetTimeNow(tsType);
pVal->type = TSDB_DATA_TYPE_USMALLINT;
pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
return true;
} }
if (smlParseInt(pVal, &isValid, msg)) {
if(!isValid) return false; int64_t ts = smlGetTimeValue(data, len, tsType);
pVal->type = TSDB_DATA_TYPE_INT; if(ts == -1){
pVal->length = (int16_t)tDataTypes[pVal->type].bytes; smlBuildInvalidDataMsg(&info->msgBuf, "invalid timestamp", data);
return true; return -1;
} }
if (smlParseUint(pVal, &isValid, msg)) { return ts;
if(!isValid) return false; }
pVal->type = TSDB_DATA_TYPE_UINT;
pVal->length = (int16_t)tDataTypes[pVal->type].bytes; static int64_t smlParseOpenTsdbTime(SSmlHandle* info, const char* data, int32_t len){
return true; if(!data){
smlBuildInvalidDataMsg(&info->msgBuf, "timestamp can not be null", NULL);
return -1;
} }
if (smlParseBigInt(pVal, &isValid, msg)) { int8_t tsType = smlGetTsTypeByLen(len);
if(!isValid) return false; if (tsType == -1) {
pVal->type = TSDB_DATA_TYPE_BIGINT; smlBuildInvalidDataMsg(&info->msgBuf, "timestamp precision can only be seconds(10 digits) or milli seconds(13 digits)", data);
pVal->length = (int16_t)tDataTypes[pVal->type].bytes; return -1;
}
int64_t ts = smlGetTimeValue(data, len, tsType);
if(ts == -1){
smlBuildInvalidDataMsg(&info->msgBuf, "invalid timestamp", data);
return -1;
}
return ts;
}
static int32_t smlParseTS(SSmlHandle* info, const char* data, int32_t len, SArray *cols){
int64_t ts = 0;
if(info->protocol == TSDB_SML_LINE_PROTOCOL){
ts = smlParseInfluxTime(info, data, len);
}else{
ts = smlParseOpenTsdbTime(info, data, len);
}
if(ts == -1) return TSDB_CODE_TSC_INVALID_TIME_STAMP;
// add ts to
SSmlKv *kv = (SSmlKv *)taosMemoryCalloc(sizeof(SSmlKv), 1);
if(!kv){
return TSDB_CODE_OUT_OF_MEMORY;
}
kv->key = TS;
kv->keyLen = TS_LEN;
kv->i = ts;
kv->type = TSDB_DATA_TYPE_TIMESTAMP;
kv->length = (int16_t)tDataTypes[kv->type].bytes;
if(cols) taosArrayPush(cols, &kv);
return TSDB_CODE_SUCCESS;
}
static bool smlParseValue(SSmlKv *pVal, SSmlMsgBuf *msg) {
//binary
if (smlIsBinary(pVal->value, pVal->valueLen)) {
pVal->type = TSDB_DATA_TYPE_BINARY;
pVal->valueLen -= BINARY_ADD_LEN;
pVal->length = pVal->valueLen;
pVal->value += (BINARY_ADD_LEN - 1);
return true;
}
//nchar
if (smlIsNchar(pVal->value, pVal->valueLen)) {
pVal->type = TSDB_DATA_TYPE_NCHAR;
pVal->valueLen -= NCHAR_ADD_LEN;
pVal->length = pVal->valueLen;
pVal->value += (NCHAR_ADD_LEN - 1);
return true;
}
//bool
if (smlParseBool(pVal)) {
pVal->type = TSDB_DATA_TYPE_BOOL;
pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
return true; return true;
} }
if (smlParseBigUint(pVal, &isValid, msg)) { //number
if(!isValid) return false; if (smlParseNumber(pVal, msg)) {
pVal->type = TSDB_DATA_TYPE_UBIGINT;
pVal->length = (int16_t)tDataTypes[pVal->type].bytes; pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
return true; return true;
} }
smlBuildInvalidDataMsg(msg, "invalid data", pVal->value);
return false; return false;
} }
static int32_t smlParseString(const char* sql, SSmlLineInfo *elements, SSmlMsgBuf *msg){ static int32_t smlParseInfluxString(const char* sql, SSmlLineInfo *elements, SSmlMsgBuf *msg){
if(!sql) return TSDB_CODE_SML_INVALID_DATA; if(!sql) return TSDB_CODE_SML_INVALID_DATA;
while (*sql != '\0') { // jump the space at the begining while (*sql != '\0') { // jump the space at the begining
if(*sql != SPACE) { if(*sql != SPACE) {
...@@ -989,13 +898,137 @@ static int32_t smlParseString(const char* sql, SSmlLineInfo *elements, SSmlMsgBu ...@@ -989,13 +898,137 @@ static int32_t smlParseString(const char* sql, SSmlLineInfo *elements, SSmlMsgBu
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static void smlParseTelnetElement(const char **sql, const char **data, int32_t *len){
while (**sql != '\0') {
if(**sql != SPACE && !(*data)) {
*data = *sql;
}else if (**sql == SPACE && *data) {
*len = *sql - *data;
break;
}
(*sql)++;
}
}
static int32_t smlParseTelnetTags(const char* data, int32_t len, SArray *cols, SHashObj *dumplicateKey, SSmlMsgBuf *msg){
for(int i = 0; i < len; i++){
// parse key
const char *key = data + i;
int32_t keyLen = 0;
while(i < len){
if(data[i] == EQUAL){
keyLen = data + i - key;
break;
}
i++;
}
if(keyLen == 0 || keyLen >= TSDB_COL_NAME_LEN){
smlBuildInvalidDataMsg(msg, "invalid key or key is too long than 64", key);
return TSDB_CODE_SML_INVALID_DATA;
}
if(smlCheckDuplicateKey(key, keyLen, dumplicateKey)){
smlBuildInvalidDataMsg(msg, "dumplicate key", key);
return TSDB_CODE_TSC_DUP_TAG_NAMES;
}
// parse value
i++;
const char *value = data + i;
while(i < len){
if(data[i] == SPACE){
break;
}
i++;
}
int32_t valueLen = data + i - value;
if(valueLen == 0){
smlBuildInvalidDataMsg(msg, "invalid value", value);
return TSDB_CODE_SML_INVALID_DATA;
}
// add kv to SSmlKv
SSmlKv *kv = (SSmlKv *)taosMemoryCalloc(sizeof(SSmlKv), 1);
if(!kv) return TSDB_CODE_OUT_OF_MEMORY;
kv->key = key;
kv->keyLen = keyLen;
kv->value = value;
kv->valueLen = valueLen;
kv->type = TSDB_DATA_TYPE_NCHAR;
if(cols) taosArrayPush(cols, &kv);
}
return TSDB_CODE_SUCCESS;
}
// format: <metric> <timestamp> <value> <tagk_1>=<tagv_1>[ <tagk_n>=<tagv_n>]
static int32_t smlParseTelnetString(SSmlHandle *info, const char* sql, SSmlTableInfo *tinfo, SArray *cols){
if(!sql) return TSDB_CODE_SML_INVALID_DATA;
// parse metric
smlParseTelnetElement(&sql, &tinfo->sTableName, &tinfo->sTableNameLen);
if (!(tinfo->sTableName) || tinfo->sTableNameLen == 0) {
smlBuildInvalidDataMsg(&info->msgBuf, "invalid data", sql);
return TSDB_CODE_SML_INVALID_DATA;
}
// parse timestamp
const char *timestamp = NULL;
int32_t tLen = 0;
smlParseTelnetElement(&sql, &timestamp, &tLen);
if (!timestamp || tLen == 0) {
smlBuildInvalidDataMsg(&info->msgBuf, "invalid timestamp", sql);
return TSDB_CODE_SML_INVALID_DATA;
}
int32_t ret = smlParseTS(info, timestamp, tLen, cols);
if (ret != TSDB_CODE_SUCCESS) {
smlBuildInvalidDataMsg(&info->msgBuf, "invalid timestamp", sql);
return TSDB_CODE_SML_INVALID_DATA;
}
// parse value
const char *value = NULL;
int32_t valueLen = 0;
smlParseTelnetElement(&sql, &value, &valueLen);
if (!value || valueLen == 0) {
smlBuildInvalidDataMsg(&info->msgBuf, "invalid value", sql);
return TSDB_CODE_SML_INVALID_DATA;
}
SSmlKv *kv = (SSmlKv *)taosMemoryCalloc(sizeof(SSmlKv), 1);
if(!kv) return TSDB_CODE_OUT_OF_MEMORY;
taosArrayPush(cols, &kv);
kv->key = VALUE;
kv->keyLen = VALUE_LEN;
kv->value = value;
kv->valueLen = valueLen;
if(!smlParseValue(kv, &info->msgBuf) || kv->type == TSDB_DATA_TYPE_BINARY
|| kv->type == TSDB_DATA_TYPE_NCHAR || kv->type == TSDB_DATA_TYPE_BOOL){
return TSDB_CODE_SML_INVALID_DATA;
}
// parse tags
while(*sql == SPACE){
sql++;
}
ret = smlParseTelnetTags(sql, strlen(sql), tinfo->tags, info->dumplicateKey, &info->msgBuf);
if (ret != TSDB_CODE_SUCCESS) {
smlBuildInvalidDataMsg(&info->msgBuf, "invalid data", sql);
return TSDB_CODE_SML_INVALID_DATA;
}
return TSDB_CODE_SUCCESS;
}
static int32_t smlParseCols(const char* data, int32_t len, SArray *cols, bool isTag, SHashObj *dumplicateKey, SSmlMsgBuf *msg){ static int32_t smlParseCols(const char* data, int32_t len, SArray *cols, bool isTag, SHashObj *dumplicateKey, SSmlMsgBuf *msg){
if(isTag && len == 0){ if(isTag && len == 0){
SSmlKv *kv = (SSmlKv *)taosMemoryCalloc(sizeof(SSmlKv), 1); SSmlKv *kv = (SSmlKv *)taosMemoryCalloc(sizeof(SSmlKv), 1);
if(!kv) return TSDB_CODE_OUT_OF_MEMORY;
kv->key = TAG; kv->key = TAG;
kv->keyLen = strlen(TAG); kv->keyLen = TAG_LEN;
kv->value = TAG; kv->value = TAG;
kv->valueLen = strlen(TAG); kv->valueLen = TAG_LEN;
kv->type = TSDB_DATA_TYPE_NCHAR; kv->type = TSDB_DATA_TYPE_NCHAR;
if(cols) taosArrayPush(cols, &kv); if(cols) taosArrayPush(cols, &kv);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -1017,11 +1050,9 @@ static int32_t smlParseCols(const char* data, int32_t len, SArray *cols, bool is ...@@ -1017,11 +1050,9 @@ static int32_t smlParseCols(const char* data, int32_t len, SArray *cols, bool is
return TSDB_CODE_SML_INVALID_DATA; return TSDB_CODE_SML_INVALID_DATA;
} }
if(taosHashGet(dumplicateKey, key, keyLen)){ if(smlCheckDuplicateKey(key, keyLen, dumplicateKey)){
smlBuildInvalidDataMsg(msg, "dumplicate key", key); smlBuildInvalidDataMsg(msg, "dumplicate key", key);
return TSDB_CODE_SML_INVALID_DATA; return TSDB_CODE_TSC_DUP_TAG_NAMES;
}else{
taosHashPut(dumplicateKey, key, keyLen, key, CHAR_BYTES);
} }
// parse value // parse value
...@@ -1029,7 +1060,7 @@ static int32_t smlParseCols(const char* data, int32_t len, SArray *cols, bool is ...@@ -1029,7 +1060,7 @@ static int32_t smlParseCols(const char* data, int32_t len, SArray *cols, bool is
const char *value = data + i; const char *value = data + i;
bool isInQuote = false; bool isInQuote = false;
while(i < len){ while(i < len){
if(data[i] == QUOTE && data[i-1] != SLASH){ if(!isTag && data[i] == QUOTE && data[i-1] != SLASH){
isInQuote = !isInQuote; isInQuote = !isInQuote;
} }
if(!isInQuote && data[i] == COMMA && i > 0 && data[i-1] != SLASH){ if(!isInQuote && data[i] == COMMA && i > 0 && data[i-1] != SLASH){
...@@ -1037,7 +1068,7 @@ static int32_t smlParseCols(const char* data, int32_t len, SArray *cols, bool is ...@@ -1037,7 +1068,7 @@ static int32_t smlParseCols(const char* data, int32_t len, SArray *cols, bool is
} }
i++; i++;
} }
if(isInQuote){ if(!isTag && isInQuote){
smlBuildInvalidDataMsg(msg, "only one quote", value); smlBuildInvalidDataMsg(msg, "only one quote", value);
return TSDB_CODE_SML_INVALID_DATA; return TSDB_CODE_SML_INVALID_DATA;
} }
...@@ -1049,6 +1080,9 @@ static int32_t smlParseCols(const char* data, int32_t len, SArray *cols, bool is ...@@ -1049,6 +1080,9 @@ static int32_t smlParseCols(const char* data, int32_t len, SArray *cols, bool is
// add kv to SSmlKv // add kv to SSmlKv
SSmlKv *kv = (SSmlKv *)taosMemoryCalloc(sizeof(SSmlKv), 1); SSmlKv *kv = (SSmlKv *)taosMemoryCalloc(sizeof(SSmlKv), 1);
if(!kv) return TSDB_CODE_OUT_OF_MEMORY;
if(cols) taosArrayPush(cols, &kv);
kv->key = key; kv->key = key;
kv->keyLen = keyLen; kv->keyLen = keyLen;
kv->value = value; kv->value = value;
...@@ -1060,386 +1094,833 @@ static int32_t smlParseCols(const char* data, int32_t len, SArray *cols, bool is ...@@ -1060,386 +1094,833 @@ static int32_t smlParseCols(const char* data, int32_t len, SArray *cols, bool is
return TSDB_CODE_SML_INVALID_DATA; return TSDB_CODE_SML_INVALID_DATA;
} }
} }
if(cols) taosArrayPush(cols, &kv);
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int64_t smlGetTimeValue(const char *value, int32_t len, int8_t type) { //static int32_t parseSmlCols(const char* data, SArray *cols){
char *endPtr = NULL; // while(*data != '\0'){
double ts = (double)strtoll(value, &endPtr, 10); // if(*data == EQUAL) return TSDB_CODE_SML_INVALID_DATA;
if(value + len != endPtr){ // const char *key = data;
return -1; // int32_t keyLen = 0;
} // while(*data != '\0'){
switch (type) { // if(*data == EQUAL && *(data-1) != SLASH){
case TSDB_TIME_PRECISION_HOURS: // keyLen = data - key;
ts *= (3600 * 1e9); // data ++;
break; // break;
case TSDB_TIME_PRECISION_MINUTES: // }
ts *= (60 * 1e9); // data++;
break; // }
case TSDB_TIME_PRECISION_SECONDS: // if(keyLen == 0){
ts *= (1e9); // return TSDB_CODE_SML_INVALID_DATA;
break; // }
case TSDB_TIME_PRECISION_MILLI: //
ts *= (1e6); // if(*data == COMMA) return TSDB_CODE_SML_INVALID_DATA;
break; // const char *value = data;
case TSDB_TIME_PRECISION_MICRO: // int32_t valueLen = 0;
ts *= (1e3); // while(*data != '\0'){
break; // if(*data == COMMA && *(data-1) != SLASH){
case TSDB_TIME_PRECISION_NANO: // valueLen = data - value;
break; // data ++;
default: // break;
ASSERT(0); // }
} // data++;
if(ts > (double)INT64_MAX || ts < 0){ // }
return -1; // if(valueLen == 0){
} // return TSDB_CODE_SML_INVALID_DATA;
// }
//
// TAOS_SML_KV *kv = taosMemoryCalloc(sizeof(TAOS_SML_KV), 1);
// kv->key = key;
// kv->keyLen = keyLen;
// kv->value = value;
// kv->valueLen = valueLen;
// kv->type = TSDB_DATA_TYPE_NCHAR;
// if(cols) taosArrayPush(cols, &kv);
// }
// return TSDB_CODE_SUCCESS;
//}
return (int64_t)ts; static bool smlUpdateMeta(SSmlSTableMeta* tableMeta, SArray *tags, SArray *cols, SSmlMsgBuf *msg){
} if(tags){
for (int i = 0; i < taosArrayGetSize(tags); ++i) {
SSmlKv *kv = (SSmlKv *)taosArrayGetP(tags, i);
ASSERT(kv->type == TSDB_DATA_TYPE_NCHAR);
static int64_t smlGetTimeNow(int8_t precision) { uint8_t *index = (uint8_t *)taosHashGet(tableMeta->tagHash, kv->key, kv->keyLen);
switch (precision) { if(index){
case TSDB_TIME_PRECISION_HOURS: SSmlKv **value = (SSmlKv **)taosArrayGet(tableMeta->tags, *index);
return taosGetTimestampMs()/1000/3600; ASSERT((*value)->type == TSDB_DATA_TYPE_NCHAR);
case TSDB_TIME_PRECISION_MINUTES: if(kv->valueLen > (*value)->valueLen){ // tags type is nchar
return taosGetTimestampMs()/1000/60; *value = kv;
case TSDB_TIME_PRECISION_SECONDS: }
return taosGetTimestampMs()/1000; }else{
case TSDB_TIME_PRECISION_MILLI: size_t tmp = taosArrayGetSize(tableMeta->tags);
case TSDB_TIME_PRECISION_MICRO: ASSERT(tmp <= UINT8_MAX);
case TSDB_TIME_PRECISION_NANO: uint8_t size = tmp;
return taosGetTimestamp(precision); taosArrayPush(tableMeta->tags, &kv);
default: taosHashPut(tableMeta->tagHash, kv->key, kv->keyLen, &size, CHAR_BYTES);
ASSERT(0); }
}
} }
}
static int8_t smlGetTsTypeByLen(int32_t len) { if(cols){
if (len == TSDB_TIME_PRECISION_SEC_DIGITS) { for (int i = 1; i < taosArrayGetSize(cols); ++i) { //jump timestamp
return TSDB_TIME_PRECISION_SECONDS; SSmlKv *kv = (SSmlKv *)taosArrayGetP(cols, i);
} else if (len == TSDB_TIME_PRECISION_MILLI_DIGITS) {
return TSDB_TIME_PRECISION_MILLI_DIGITS;
} else {
return -1;
}
}
static int8_t smlGetTsTypeByPrecision(int8_t precision) { int16_t *index = (int16_t *)taosHashGet(tableMeta->fieldHash, kv->key, kv->keyLen);
switch (precision) { if(index){
case TSDB_SML_TIMESTAMP_HOURS: SSmlKv **value = (SSmlKv **)taosArrayGet(tableMeta->cols, *index);
return TSDB_TIME_PRECISION_HOURS; if(kv->type != (*value)->type){
case TSDB_SML_TIMESTAMP_MILLI_SECONDS: smlBuildInvalidDataMsg(msg, "the type is not the same like before", kv->key);
return TSDB_TIME_PRECISION_MILLI; return false;
case TSDB_SML_TIMESTAMP_NANO_SECONDS: }else{
case TSDB_SML_TIMESTAMP_NOT_CONFIGURED: if(IS_VAR_DATA_TYPE(kv->type)){ // update string len, if bigger
return TSDB_TIME_PRECISION_NANO; if(kv->valueLen > (*value)->valueLen){
case TSDB_SML_TIMESTAMP_MICRO_SECONDS: *value = kv;
return TSDB_TIME_PRECISION_MICRO; }
case TSDB_SML_TIMESTAMP_SECONDS: }
return TSDB_TIME_PRECISION_SECONDS; }
case TSDB_SML_TIMESTAMP_MINUTES: }else{
return TSDB_TIME_PRECISION_MINUTES; size_t tmp = taosArrayGetSize(tableMeta->cols);
default: ASSERT(tmp <= INT16_MAX);
return -1; int16_t size = tmp;
taosArrayPush(tableMeta->cols, &kv);
taosHashPut(tableMeta->fieldHash, kv->key, kv->keyLen, &size, SHORT_BYTES);
}
}
} }
return true;
} }
static int64_t smlParseInfluxTime(SSmlHandle* info, const char* data, int32_t len){ static void smlInsertMeta(SSmlSTableMeta* tableMeta, SArray *tags, SArray *cols){
int8_t tsType = smlGetTsTypeByPrecision(info->precision); if(tags){
if (tsType == -1) { for (uint8_t i = 0; i < taosArrayGetSize(tags); ++i) {
smlBuildInvalidDataMsg(&info->msgBuf, "invalid timestamp precision", NULL); SSmlKv *kv = (SSmlKv *)taosArrayGetP(tags, i);
return -1; taosArrayPush(tableMeta->tags, &kv);
} taosHashPut(tableMeta->tagHash, kv->key, kv->keyLen, &i, CHAR_BYTES);
if(!data){ }
return smlGetTimeNow(tsType);
} }
int64_t ts = smlGetTimeValue(data, len, tsType); if(cols){
if(ts == -1){ for (int16_t i = 0; i < taosArrayGetSize(cols); ++i) {
smlBuildInvalidDataMsg(&info->msgBuf, "invalid timestamp", data); SSmlKv *kv = (SSmlKv *)taosArrayGetP(cols, i);
return -1; taosArrayPush(tableMeta->cols, &kv);
taosHashPut(tableMeta->fieldHash, kv->key, kv->keyLen, &i, SHORT_BYTES);
}
} }
return ts;
} }
static int64_t smlParseOpenTsdbTime(SSmlHandle* info, const char* data, int32_t len){ static SSmlTableInfo* smlBuildTableInfo(){
if(!data){ SSmlTableInfo *tag = (SSmlTableInfo *)taosMemoryCalloc(sizeof(SSmlTableInfo), 1);
smlBuildInvalidDataMsg(&info->msgBuf, "timestamp can not be null", NULL); if(!tag){
return -1; return NULL;
} }
int8_t tsType = smlGetTsTypeByLen(len);
if (tsType == -1) { tag->cols = taosArrayInit(16, POINTER_BYTES);
smlBuildInvalidDataMsg(&info->msgBuf, "timestamp precision can only be seconds(10 digits) or milli seconds(13 digits)", data); if (tag->cols == NULL) {
return -1; uError("SML:smlBuildTableInfo failed to allocate memory");
goto cleanup;
} }
int64_t ts = smlGetTimeValue(data, len, tsType);
if(ts == -1){ tag->tags = taosArrayInit(16, POINTER_BYTES);
smlBuildInvalidDataMsg(&info->msgBuf, "invalid timestamp", data); if (tag->tags == NULL) {
return -1; uError("SML:smlBuildTableInfo failed to allocate memory");
goto cleanup;
} }
return ts; return tag;
cleanup:
taosMemoryFree(tag);
return NULL;
} }
static int32_t smlParseTS(SSmlHandle* info, const char* data, int32_t len, SArray *cols){ static void smlDestroyTableInfo(SSmlTableInfo *tag, bool format){
int64_t ts = 0; if(format){
if(info->protocol == TSDB_SML_LINE_PROTOCOL){ for(size_t i = 0; i < taosArrayGetSize(tag->cols); i++){
ts = smlParseInfluxTime(info, data, len); SArray *kvArray = (SArray *)taosArrayGetP(tag->cols, i);
for (int j = 0; j < taosArrayGetSize(kvArray); ++j) {
void *p = taosArrayGetP(kvArray, j);
taosMemoryFree(p);
}
taosArrayDestroy(kvArray);
}
}else{ }else{
ts = smlParseOpenTsdbTime(info, data, len); for(size_t i = 0; i < taosArrayGetSize(tag->cols); i++){
SHashObj *kvHash = (SHashObj *)taosArrayGetP(tag->cols, i);
void** p1 = (void**)taosHashIterate(kvHash, NULL);
while (p1) {
taosMemoryFree(*p1);
p1 = (void**)taosHashIterate(kvHash, p1);
}
taosHashCleanup(kvHash);
}
}
taosArrayDestroy(tag->cols);
taosArrayDestroy(tag->tags);
taosMemoryFree(tag);
}
static int32_t smlDealCols(SSmlTableInfo* oneTable, bool dataFormat, SArray *cols){
if(dataFormat){
taosArrayPush(oneTable->cols, &cols);
return TSDB_CODE_SUCCESS;
}
SHashObj *kvHash = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
if(!kvHash){
uError("SML:smlDealCols failed to allocate memory");
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
for(size_t i = 0; i < taosArrayGetSize(cols); i++){
SSmlKv *kv = (SSmlKv *)taosArrayGetP(cols, i);
taosHashPut(kvHash, kv->key, kv->keyLen, &kv, POINTER_BYTES); // todo key need escape, like \=, because find by schema name later
}
taosArrayPush(oneTable->cols, &kvHash);
return TSDB_CODE_SUCCESS;
}
static SSmlSTableMeta* smlBuildSTableMeta(){
SSmlSTableMeta* meta = (SSmlSTableMeta*)taosMemoryCalloc(sizeof(SSmlSTableMeta), 1);
if(!meta){
return NULL;
}
meta->tagHash = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
if (meta->tagHash == NULL) {
uError("SML:smlBuildSTableMeta failed to allocate memory");
goto cleanup;
}
meta->fieldHash = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
if (meta->fieldHash == NULL) {
uError("SML:smlBuildSTableMeta failed to allocate memory");
goto cleanup;
}
meta->tags = taosArrayInit(32, POINTER_BYTES);
if (meta->tags == NULL) {
uError("SML:smlBuildSTableMeta failed to allocate memory");
goto cleanup;
}
meta->cols = taosArrayInit(32, POINTER_BYTES);
if (meta->cols == NULL) {
uError("SML:smlBuildSTableMeta failed to allocate memory");
goto cleanup;
}
return meta;
cleanup:
taosMemoryFree(meta);
return NULL;
}
static void smlDestroySTableMeta(SSmlSTableMeta *meta){
taosHashCleanup(meta->tagHash);
taosHashCleanup(meta->fieldHash);
taosArrayDestroy(meta->tags);
taosArrayDestroy(meta->cols);
taosMemoryFree(meta->tableMeta);
}
static void smlDestroyCols(SArray *cols) {
if (!cols) return;
for (int i = 0; i < taosArrayGetSize(cols); ++i) {
void *kv = taosArrayGet(cols, i);
taosMemoryFree(kv);
}
}
static void smlDestroyInfo(SSmlHandle* info){
if(!info) return;
qDestroyQuery(info->pQuery);
smlDestroyHandle(info->exec);
// destroy info->childTables
void** p1 = (void**)taosHashIterate(info->childTables, NULL);
while (p1) {
smlDestroyTableInfo((SSmlTableInfo*)(*p1), info->dataFormat);
p1 = (void**)taosHashIterate(info->childTables, p1);
}
taosHashCleanup(info->childTables);
// destroy info->superTables
p1 = (void**)taosHashIterate(info->superTables, NULL);
while (p1) {
smlDestroySTableMeta((SSmlSTableMeta*)(*p1));
p1 = (void**)taosHashIterate(info->superTables, p1);
}
taosHashCleanup(info->superTables);
// destroy info->pVgHash
taosHashCleanup(info->pVgHash);
taosHashCleanup(info->dumplicateKey);
taosMemoryFreeClear(info);
}
static SSmlHandle* smlBuildSmlInfo(TAOS* taos, SRequestObj* request, SMLProtocolType protocol, int8_t precision, bool dataFormat){
int32_t code = TSDB_CODE_SUCCESS;
SSmlHandle* info = (SSmlHandle*)taosMemoryCalloc(1, sizeof(SSmlHandle));
if (NULL == info) {
return NULL;
}
info->id = smlGenId();
info->pQuery = (SQuery *)taosMemoryCalloc(1, sizeof(SQuery));
if (NULL == info->pQuery) {
uError("SML:0x%"PRIx64" create info->pQuery error", info->id);
goto cleanup;
}
info->pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
info->pQuery->haveResultSet = false;
info->pQuery->msgType = TDMT_VND_SUBMIT;
info->pQuery->pRoot = (SNode*)nodesMakeNode(QUERY_NODE_VNODE_MODIF_STMT);
if(NULL == info->pQuery->pRoot){
uError("SML:0x%"PRIx64" create info->pQuery->pRoot error", info->id);
goto cleanup;
}
((SVnodeModifOpStmt*)(info->pQuery->pRoot))->payloadType = PAYLOAD_TYPE_KV;
info->taos = (STscObj *)taos;
code = catalogGetHandle(info->taos->pAppInfo->clusterId, &info->pCatalog);
if(code != TSDB_CODE_SUCCESS){
uError("SML:0x%"PRIx64" get catalog error %d", info->id, code);
goto cleanup;
}
info->precision = precision;
info->protocol = protocol;
info->dataFormat = dataFormat;
info->pRequest = request;
info->msgBuf.buf = info->pRequest->msgBuf;
info->msgBuf.len = ERROR_MSG_BUF_DEFAULT_SIZE;
info->exec = smlInitHandle(info->pQuery);
info->childTables = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
info->superTables = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
info->pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
info->dumplicateKey = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
if(!dataFormat){
info->colsContainer = taosArrayInit(32, POINTER_BYTES);
if(NULL == info->colsContainer){
uError("SML:0x%"PRIx64" create info failed", info->id);
goto cleanup;
}
}
if(NULL == info->exec || NULL == info->childTables
|| NULL == info->superTables || NULL == info->pVgHash
|| NULL == info->dumplicateKey){
uError("SML:0x%"PRIx64" create info failed", info->id);
goto cleanup;
}
return info;
cleanup:
smlDestroyInfo(info);
return NULL;
}
/************* TSDB_SML_JSON_PROTOCOL function start **************/
static int32_t smlJsonCreateSring(const char **output, char *input, int32_t inputLen){
*output = taosMemoryMalloc(inputLen);
if (*output == NULL){
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
memcpy((void*)(*output), input, inputLen);
return TSDB_CODE_SUCCESS;
}
static int32_t smlParseMetricFromJSON(SSmlHandle *info, cJSON *root, SSmlTableInfo *tinfo) {
cJSON *metric = cJSON_GetObjectItem(root, "metric");
if (!cJSON_IsString(metric)) {
return TSDB_CODE_TSC_INVALID_JSON;
}
tinfo->sTableNameLen = strlen(metric->valuestring);
if (tinfo->sTableNameLen >= TSDB_TABLE_NAME_LEN) {
uError("OTD:0x%"PRIx64" Metric cannot exceeds %d characters in JSON", info->id, TSDB_TABLE_NAME_LEN - 1);
return TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH;
}
return smlJsonCreateSring(&tinfo->sTableName, metric->valuestring, tinfo->sTableNameLen);
}
static int32_t smlParseTSFromJSONObj(SSmlHandle *info, cJSON *root, int64_t *tsVal) {
int32_t size = cJSON_GetArraySize(root);
if (size != OTD_JSON_SUB_FIELDS_NUM) {
return TSDB_CODE_TSC_INVALID_JSON;
}
cJSON *value = cJSON_GetObjectItem(root, "value");
if (!cJSON_IsNumber(value)) {
return TSDB_CODE_TSC_INVALID_JSON;
}
cJSON *type = cJSON_GetObjectItem(root, "type");
if (!cJSON_IsString(type)) {
return TSDB_CODE_TSC_INVALID_JSON;
}
double timeDouble = value->valuedouble;
if(smlDoubleToInt64OverFlow(timeDouble)){
smlBuildInvalidDataMsg(&info->msgBuf, "timestamp is too large", NULL);
return TSDB_CODE_TSC_INVALID_TIME_STAMP;
}
if(timeDouble <= 0){
return TSDB_CODE_TSC_INVALID_TIME_STAMP;
}
size_t typeLen = strlen(type->valuestring);
if (typeLen == 1 && type->valuestring[0] == 's') {
//seconds
timeDouble = timeDouble * 1e9;
if(smlDoubleToInt64OverFlow(timeDouble)){
smlBuildInvalidDataMsg(&info->msgBuf, "timestamp is too large", NULL);
return TSDB_CODE_TSC_INVALID_TIME_STAMP;
}
*tsVal = timeDouble;
} else if (typeLen == 2 && type->valuestring[1] == 's') {
switch (type->valuestring[0]) {
case 'm':
//milliseconds
timeDouble = timeDouble * 1e6;
if(smlDoubleToInt64OverFlow(timeDouble)){
smlBuildInvalidDataMsg(&info->msgBuf, "timestamp is too large", NULL);
return TSDB_CODE_TSC_INVALID_TIME_STAMP;
}
*tsVal = timeDouble;
break;
case 'u':
//microseconds
timeDouble = timeDouble * 1e3;
if(smlDoubleToInt64OverFlow(timeDouble)){
smlBuildInvalidDataMsg(&info->msgBuf, "timestamp is too large", NULL);
return TSDB_CODE_TSC_INVALID_TIME_STAMP;
}
*tsVal = timeDouble;
break;
case 'n':
//nanoseconds
*tsVal = timeDouble;
break;
default:
return TSDB_CODE_TSC_INVALID_JSON;
}
} else {
return TSDB_CODE_TSC_INVALID_JSON;
}
return TSDB_CODE_SUCCESS;
}
static uint8_t smlGetTimestampLen(int64_t num) {
uint8_t len = 0;
while ((num /= 10) != 0) {
len++;
}
len++;
return len;
}
static int32_t smlParseTSFromJSON(SSmlHandle *info, cJSON *root, SArray *cols) {
//Timestamp must be the first KV to parse
int64_t tsVal = 0;
cJSON *timestamp = cJSON_GetObjectItem(root, "timestamp");
if (cJSON_IsNumber(timestamp)) {
//timestamp value 0 indicates current system time
double timeDouble = timestamp->valuedouble;
if(smlDoubleToInt64OverFlow(timeDouble)){
smlBuildInvalidDataMsg(&info->msgBuf, "timestamp is too large", NULL);
return TSDB_CODE_TSC_INVALID_TIME_STAMP;
}
if(timeDouble <= 0){
return TSDB_CODE_TSC_INVALID_TIME_STAMP;
}
uint8_t tsLen = smlGetTimestampLen((int64_t)timeDouble);
if (tsLen == TSDB_TIME_PRECISION_SEC_DIGITS) {
timeDouble = timeDouble * 1e9;
if(smlDoubleToInt64OverFlow(timeDouble)){
smlBuildInvalidDataMsg(&info->msgBuf, "timestamp is too large", NULL);
return TSDB_CODE_TSC_INVALID_TIME_STAMP;
}
tsVal = timeDouble;
} else if (tsLen == TSDB_TIME_PRECISION_MILLI_DIGITS) {
timeDouble = timeDouble * 1e6;
if(smlDoubleToInt64OverFlow(timeDouble)){
smlBuildInvalidDataMsg(&info->msgBuf, "timestamp is too large", NULL);
return TSDB_CODE_TSC_INVALID_TIME_STAMP;
}
tsVal = timeDouble;
} else {
return TSDB_CODE_TSC_INVALID_TIME_STAMP;
}
} else if (cJSON_IsObject(timestamp)) {
int32_t ret = smlParseTSFromJSONObj(info, timestamp, &tsVal);
if (ret != TSDB_CODE_SUCCESS) {
uError("SML:0x%"PRIx64" Failed to parse timestamp from JSON Obj", info->id);
return ret;
}
} else {
return TSDB_CODE_TSC_INVALID_JSON;
} }
if(ts == -1) return TSDB_CODE_TSC_INVALID_TIME_STAMP;
// add ts to // add ts to
SSmlKv *kv = (SSmlKv *)taosMemoryCalloc(sizeof(SSmlKv), 1); SSmlKv *kv = (SSmlKv *)taosMemoryCalloc(sizeof(SSmlKv), 1);
if(!kv){ if(!kv){
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
kv->key = TS; kv->key = TS;
kv->keyLen = strlen(kv->key); kv->keyLen = TS_LEN;
kv->i = ts; kv->i = tsVal;
kv->type = TSDB_DATA_TYPE_TIMESTAMP; kv->type = TSDB_DATA_TYPE_TIMESTAMP;
kv->length = (int16_t)tDataTypes[kv->type].bytes; kv->length = (int16_t)tDataTypes[kv->type].bytes;
if(cols) taosArrayPush(cols, &kv); if(cols) taosArrayPush(cols, &kv);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
//static int32_t parseSmlCols(const char* data, SArray *cols){ static int32_t smlConvertJSONBool(SSmlKv *pVal, char* typeStr, cJSON *value) {
// while(*data != '\0'){ if (strcasecmp(typeStr, "bool") != 0) {
// if(*data == EQUAL) return TSDB_CODE_SML_INVALID_DATA; uError("OTD:invalid type(%s) for JSON Bool", typeStr);
// const char *key = data; return TSDB_CODE_TSC_INVALID_JSON_TYPE;
// int32_t keyLen = 0; }
// while(*data != '\0'){ pVal->type = TSDB_DATA_TYPE_BOOL;
// if(*data == EQUAL && *(data-1) != SLASH){ pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
// keyLen = data - key; pVal->i = value->valueint;
// data ++;
// break;
// }
// data++;
// }
// if(keyLen == 0){
// return TSDB_CODE_SML_INVALID_DATA;
// }
//
// if(*data == COMMA) return TSDB_CODE_SML_INVALID_DATA;
// const char *value = data;
// int32_t valueLen = 0;
// while(*data != '\0'){
// if(*data == COMMA && *(data-1) != SLASH){
// valueLen = data - value;
// data ++;
// break;
// }
// data++;
// }
// if(valueLen == 0){
// return TSDB_CODE_SML_INVALID_DATA;
// }
//
// TAOS_SML_KV *kv = taosMemoryCalloc(sizeof(TAOS_SML_KV), 1);
// kv->key = key;
// kv->keyLen = keyLen;
// kv->value = value;
// kv->valueLen = valueLen;
// kv->type = TSDB_DATA_TYPE_NCHAR;
// if(cols) taosArrayPush(cols, &kv);
// }
// return TSDB_CODE_SUCCESS;
//}
static bool smlUpdateMeta(SSmlSTableMeta* tableMeta, SArray *tags, SArray *cols, SSmlMsgBuf *msg){ return TSDB_CODE_SUCCESS;
if(tags){ }
for (int i = 0; i < taosArrayGetSize(tags); ++i) {
SSmlKv *kv = (SSmlKv *)taosArrayGetP(tags, i);
ASSERT(kv->type == TSDB_DATA_TYPE_NCHAR);
uint8_t *index = (uint8_t *)taosHashGet(tableMeta->tagHash, kv->key, kv->keyLen); static int32_t smlConvertJSONNumber(SSmlKv *pVal, char* typeStr, cJSON *value) {
if(index){ //tinyint
SSmlKv **value = (SSmlKv **)taosArrayGet(tableMeta->tags, *index); if (strcasecmp(typeStr, "i8") == 0 ||
ASSERT((*value)->type == TSDB_DATA_TYPE_NCHAR); strcasecmp(typeStr, "tinyint") == 0) {
if(kv->valueLen > (*value)->valueLen){ // tags type is nchar if (!IS_VALID_TINYINT(value->valuedouble)) {
*value = kv; uError("OTD:JSON value(%f) cannot fit in type(tinyint)", value->valuedouble);
} return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE;
}else{ }
size_t tmp = taosArrayGetSize(tableMeta->tags); pVal->type = TSDB_DATA_TYPE_TINYINT;
ASSERT(tmp <= UINT8_MAX); pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
uint8_t size = tmp; pVal->i = value->valuedouble;
taosArrayPush(tableMeta->tags, &kv); return TSDB_CODE_SUCCESS;
taosHashPut(tableMeta->tagHash, kv->key, kv->keyLen, &size, CHAR_BYTES); }
} //smallint
if (strcasecmp(typeStr, "i16") == 0 ||
strcasecmp(typeStr, "smallint") == 0) {
if (!IS_VALID_SMALLINT(value->valuedouble)) {
uError("OTD:JSON value(%f) cannot fit in type(smallint)", value->valuedouble);
return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE;
}
pVal->type = TSDB_DATA_TYPE_SMALLINT;
pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
pVal->i = value->valuedouble;
return TSDB_CODE_SUCCESS;
}
//int
if (strcasecmp(typeStr, "i32") == 0 ||
strcasecmp(typeStr, "int") == 0) {
if (!IS_VALID_INT(value->valuedouble)) {
uError("OTD:JSON value(%f) cannot fit in type(int)", value->valuedouble);
return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE;
}
pVal->type = TSDB_DATA_TYPE_INT;
pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
pVal->i = value->valuedouble;
return TSDB_CODE_SUCCESS;
}
//bigint
if (strcasecmp(typeStr, "i64") == 0 ||
strcasecmp(typeStr, "bigint") == 0) {
pVal->type = TSDB_DATA_TYPE_BIGINT;
pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
if(smlDoubleToInt64OverFlow(value->valuedouble)){
uError("OTD:JSON value(%f) cannot fit in type(big int)", value->valuedouble);
return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE;
}
pVal->i = value->valuedouble;
return TSDB_CODE_SUCCESS;
}
//float
if (strcasecmp(typeStr, "f32") == 0 ||
strcasecmp(typeStr, "float") == 0) {
if (!IS_VALID_FLOAT(value->valuedouble)) {
uError("OTD:JSON value(%f) cannot fit in type(float)", value->valuedouble);
return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE;
} }
pVal->type = TSDB_DATA_TYPE_FLOAT;
pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
pVal->f = value->valuedouble;
return TSDB_CODE_SUCCESS;
}
//double
if (strcasecmp(typeStr, "f64") == 0 ||
strcasecmp(typeStr, "double") == 0) {
pVal->type = TSDB_DATA_TYPE_DOUBLE;
pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
pVal->d = value->valuedouble;
return TSDB_CODE_SUCCESS;
} }
if(cols){ //if reach here means type is unsupported
for (int i = 1; i < taosArrayGetSize(cols); ++i) { //jump timestamp uError("OTD:invalid type(%s) for JSON Number", typeStr);
SSmlKv *kv = (SSmlKv *)taosArrayGetP(cols, i); return TSDB_CODE_TSC_INVALID_JSON_TYPE;
}
int16_t *index = (int16_t *)taosHashGet(tableMeta->fieldHash, kv->key, kv->keyLen); static int32_t smlConvertJSONString(SSmlKv *pVal, char* typeStr, cJSON *value) {
if(index){ if (strcasecmp(typeStr, "binary") == 0) {
SSmlKv **value = (SSmlKv **)taosArrayGet(tableMeta->cols, *index); pVal->type = TSDB_DATA_TYPE_BINARY;
if(kv->type != (*value)->type){ } else if (strcasecmp(typeStr, "nchar") == 0) {
smlBuildInvalidDataMsg(msg, "the type is not the same like before", kv->key); pVal->type = TSDB_DATA_TYPE_NCHAR;
return false; } else {
}else{ uError("OTD:invalid type(%s) for JSON String", typeStr);
if(IS_VAR_DATA_TYPE(kv->type)){ // update string len, if bigger return TSDB_CODE_TSC_INVALID_JSON_TYPE;
if(kv->valueLen > (*value)->valueLen){ }
*value = kv; pVal->length = (int16_t)strlen(value->valuestring);
} pVal->valueLen = pVal->length;
} return smlJsonCreateSring(&pVal->value, value->valuestring, pVal->valueLen);
} }
}else{
size_t tmp = taosArrayGetSize(tableMeta->cols); static int32_t smlParseValueFromJSONObj(cJSON *root, SSmlKv *kv) {
ASSERT(tmp <= INT16_MAX); int32_t ret = TSDB_CODE_SUCCESS;
int16_t size = tmp; int32_t size = cJSON_GetArraySize(root);
taosArrayPush(tableMeta->cols, &kv);
taosHashPut(tableMeta->fieldHash, kv->key, kv->keyLen, &size, SHORT_BYTES); if (size != OTD_JSON_SUB_FIELDS_NUM) {
return TSDB_CODE_TSC_INVALID_JSON;
}
cJSON *value = cJSON_GetObjectItem(root, "value");
if (value == NULL) {
return TSDB_CODE_TSC_INVALID_JSON;
}
cJSON *type = cJSON_GetObjectItem(root, "type");
if (!cJSON_IsString(type)) {
return TSDB_CODE_TSC_INVALID_JSON;
}
switch (value->type) {
case cJSON_True:
case cJSON_False: {
ret = smlConvertJSONBool(kv, type->valuestring, value);
if (ret != TSDB_CODE_SUCCESS) {
return ret;
} }
break;
} }
case cJSON_Number: {
ret = smlConvertJSONNumber(kv, type->valuestring, value);
if (ret != TSDB_CODE_SUCCESS) {
return ret;
}
break;
}
case cJSON_String: {
ret = smlConvertJSONString(kv, type->valuestring, value);
if (ret != TSDB_CODE_SUCCESS) {
return ret;
}
break;
}
default:
return TSDB_CODE_TSC_INVALID_JSON_TYPE;
} }
return true;
return TSDB_CODE_SUCCESS;
} }
static void smlInsertMeta(SSmlSTableMeta* tableMeta, SArray *tags, SArray *cols){ static int32_t smlParseValueFromJSON(cJSON *root, SSmlKv *kv) {
if(tags){ switch (root->type) {
for (uint8_t i = 0; i < taosArrayGetSize(tags); ++i) { case cJSON_True:
SSmlKv *kv = (SSmlKv *)taosArrayGetP(tags, i); case cJSON_False: {
taosArrayPush(tableMeta->tags, &kv); kv->type = TSDB_DATA_TYPE_BOOL;
taosHashPut(tableMeta->tagHash, kv->key, kv->keyLen, &i, CHAR_BYTES); kv->length = (int16_t)tDataTypes[kv->type].bytes;
kv->i = root->valueint;
break;
} }
} case cJSON_Number: {
kv->type = TSDB_DATA_TYPE_DOUBLE;
kv->length = (int16_t)tDataTypes[kv->type].bytes;
kv->d = root->valuedouble;
break;
}
case cJSON_String: {
/* set default JSON type to binary/nchar according to
* user configured parameter tsDefaultJSONStrType
*/
if(cols){ char *tsDefaultJSONStrType = "nchar"; //todo
for (int16_t i = 0; i < taosArrayGetSize(cols); ++i) { smlConvertJSONString(kv, tsDefaultJSONStrType, root);
SSmlKv *kv = (SSmlKv *)taosArrayGetP(cols, i); break;
taosArrayPush(tableMeta->cols, &kv);
taosHashPut(tableMeta->fieldHash, kv->key, kv->keyLen, &i, SHORT_BYTES);
} }
case cJSON_Object: {
int32_t ret = smlParseValueFromJSONObj(root, kv);
if (ret != TSDB_CODE_SUCCESS) {
uError("OTD:Failed to parse value from JSON Obj");
return ret;
}
break;
}
default:
return TSDB_CODE_TSC_INVALID_JSON;
} }
return TSDB_CODE_SUCCESS;
}
static int32_t smlParseColsFromJSON(cJSON *root, SArray *cols) {
cJSON *metricVal = cJSON_GetObjectItem(root, "value");
if (metricVal == NULL) {
return TSDB_CODE_TSC_INVALID_JSON;
}
SSmlKv *kv = (SSmlKv *)taosMemoryCalloc(sizeof(SSmlKv), 1);
if(!kv){
return TSDB_CODE_OUT_OF_MEMORY;
}
if(cols) taosArrayPush(cols, &kv);
kv->key = VALUE;
kv->keyLen = VALUE_LEN;
int32_t ret = smlParseValueFromJSON(metricVal, kv);
if (ret != TSDB_CODE_SUCCESS) {
return ret;
}
return TSDB_CODE_SUCCESS;
} }
static SSmlTableInfo* smlBuildTableInfo(bool format){ static int32_t smlParseTagsFromJSON(cJSON *root, SArray *pKVs, SHashObj *dumplicateKey, SSmlMsgBuf *msg) {
SSmlTableInfo *tag = (SSmlTableInfo *)taosMemoryCalloc(sizeof(SSmlTableInfo), 1); int32_t ret = TSDB_CODE_SUCCESS;
if(!tag){
return NULL; cJSON *tags = cJSON_GetObjectItem(root, "tags");
} if (tags == NULL || tags->type != cJSON_Object) {
return TSDB_CODE_TSC_INVALID_JSON;
}
//handle child table name todo
// size_t childTableNameLen = strlen(tsSmlChildTableName);
// char childTbName[TSDB_TABLE_NAME_LEN] = {0};
// if (childTableNameLen != 0) {
// memcpy(childTbName, tsSmlChildTableName, childTableNameLen);
// cJSON *id = cJSON_GetObjectItem(tags, childTbName);
// if (id != NULL) {
// if (!cJSON_IsString(id)) {
// tscError("OTD:0x%"PRIx64" ID must be JSON string", info->id);
// return TSDB_CODE_TSC_INVALID_JSON;
// }
// size_t idLen = strlen(id->valuestring);
// *childTableName = tcalloc(idLen + TS_BACKQUOTE_CHAR_SIZE + 1, sizeof(char));
// memcpy(*childTableName, id->valuestring, idLen);
// addEscapeCharToString(*childTableName, (int32_t)idLen);
//
// //check duplicate IDs
// cJSON_DeleteItemFromObject(tags, childTbName);
// id = cJSON_GetObjectItem(tags, childTbName);
// if (id != NULL) {
// return TSDB_CODE_TSC_DUP_TAG_NAMES;
// }
// }
// }
if(format){ int32_t tagNum = cJSON_GetArraySize(tags);
tag->colsFormat = taosArrayInit(16, POINTER_BYTES); for (int32_t i = 0; i < tagNum; ++i) {
if (tag->colsFormat == NULL) { cJSON *tag = cJSON_GetArrayItem(tags, i);
uError("SML:smlParseLine failed to allocate memory"); if (tag == NULL) {
goto cleanup; return TSDB_CODE_TSC_INVALID_JSON;
} }
}else{ //check duplicate keys
tag->cols = taosArrayInit(16, POINTER_BYTES); if (smlCheckDuplicateKey(tag->string, strlen(tag->string), dumplicateKey)) {
if (tag->cols == NULL) { return TSDB_CODE_TSC_DUP_TAG_NAMES;
uError("SML:smlParseLine failed to allocate memory");
goto cleanup;
} }
}
tag->tags = taosArrayInit(16, POINTER_BYTES); // add kv to SSmlKv
if (tag->tags == NULL) { SSmlKv *kv = (SSmlKv *)taosMemoryCalloc(sizeof(SSmlKv), 1);
uError("SML:smlParseLine failed to allocate memory"); if(!kv) return TSDB_CODE_OUT_OF_MEMORY;
goto cleanup; if(pKVs) taosArrayPush(pKVs, &kv);
//key
kv->keyLen = strlen(tag->string);
if (kv->keyLen >= TSDB_COL_NAME_LEN) {
uError("OTD:Tag key cannot exceeds %d characters in JSON", TSDB_COL_NAME_LEN - 1);
return TSDB_CODE_TSC_INVALID_COLUMN_LENGTH;
}
ret = smlJsonCreateSring(&kv->key, tag->string, kv->keyLen);
if (ret != TSDB_CODE_SUCCESS) {
return ret;
}
//value
ret = smlParseValueFromJSON(tag, kv);
if (ret != TSDB_CODE_SUCCESS) {
return ret;
}
} }
return tag;
cleanup: return ret;
taosMemoryFree(tag);
return NULL;
}
static void smlDestroyBuildTableInfo(SSmlTableInfo *tag, bool format){
if(format){
taosArrayDestroy(tag->colsFormat);
}else{
tag->cols = taosArrayInit(16, POINTER_BYTES);
for(size_t i = 0; i < taosArrayGetSize(tag->cols); i++){
SHashObj *kvHash = (SHashObj *)taosArrayGetP(tag->cols, i);
void** p1 = (void**)taosHashIterate(kvHash, NULL);
while (p1) {
taosMemoryFree(*p1);
p1 = (void**)taosHashIterate(kvHash, p1);
}
taosHashCleanup(kvHash);
}
}
taosArrayDestroy(tag->tags);
taosMemoryFree(tag);
} }
static int32_t smlDealCols(SSmlTableInfo* oneTable, bool dataFormat, SArray *cols){ static int32_t smlParseJSONString(SSmlHandle *info, cJSON *root, SSmlTableInfo *tinfo, SArray *cols) {
if(dataFormat){ int32_t ret = TSDB_CODE_SUCCESS;
taosArrayPush(oneTable->colsFormat, &cols);
return TSDB_CODE_SUCCESS;
}
SHashObj *kvHash = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); if (!cJSON_IsObject(root)) {
if(!kvHash){ uError("OTD:0x%"PRIx64" data point needs to be JSON object", info->id);
uError("SML:smlDealCols failed to allocate memory"); return TSDB_CODE_TSC_INVALID_JSON;
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
for(size_t i = 0; i < taosArrayGetSize(cols); i++){
SSmlKv *kv = (SSmlKv *)taosArrayGetP(cols, i);
taosHashPut(kvHash, kv->key, kv->keyLen, &kv, POINTER_BYTES); // todo key need escape, like \=, because find by schema name later
} }
taosArrayPush(oneTable->cols, &kvHash);
return TSDB_CODE_SUCCESS; int32_t size = cJSON_GetArraySize(root);
} //outmost json fields has to be exactly 4
if (size != OTD_JSON_FIELDS_NUM) {
static SSmlSTableMeta* smlBuildSTableMeta(){ uError("OTD:0x%"PRIx64" Invalid number of JSON fields in data point %d", info->id, size);
SSmlSTableMeta* meta = (SSmlSTableMeta*)taosMemoryCalloc(sizeof(SSmlSTableMeta), 1); return TSDB_CODE_TSC_INVALID_JSON;
if(!meta){
return NULL;
} }
meta->tagHash = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
if (meta->tagHash == NULL) { //Parse metric
uError("SML:smlBuildSTableMeta failed to allocate memory"); ret = smlParseMetricFromJSON(info, root, tinfo);
goto cleanup; if (ret != TSDB_CODE_SUCCESS) {
uError("OTD:0x%"PRIx64" Unable to parse metric from JSON payload", info->id);
return ret;
} }
uDebug("OTD:0x%"PRIx64" Parse metric from JSON payload finished", info->id);
meta->fieldHash = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); //Parse timestamp
if (meta->fieldHash == NULL) { ret = smlParseTSFromJSON(info, root, cols);
uError("SML:smlBuildSTableMeta failed to allocate memory"); if (ret) {
goto cleanup; uError("OTD:0x%"PRIx64" Unable to parse timestamp from JSON payload", info->id);
return ret;
} }
uDebug("OTD:0x%"PRIx64" Parse timestamp from JSON payload finished", info->id);
meta->tags = taosArrayInit(32, POINTER_BYTES); //Parse metric value
if (meta->tags == NULL) { ret = smlParseColsFromJSON(root, cols);
uError("SML:smlBuildSTableMeta failed to allocate memory"); if (ret) {
goto cleanup; uError("OTD:0x%"PRIx64" Unable to parse metric value from JSON payload", info->id);
return ret;
} }
uDebug("OTD:0x%"PRIx64" Parse metric value from JSON payload finished", info->id);
meta->cols = taosArrayInit(32, POINTER_BYTES); //Parse tags
if (meta->cols == NULL) { ret = smlParseTagsFromJSON(root, tinfo->tags, info->dumplicateKey, &info->msgBuf);
uError("SML:smlBuildSTableMeta failed to allocate memory"); if (ret) {
goto cleanup; uError("OTD:0x%"PRIx64" Unable to parse tags from JSON payload", info->id);
return ret;
} }
return meta; uDebug("OTD:0x%"PRIx64" Parse tags from JSON payload finished", info->id);
cleanup: return TSDB_CODE_SUCCESS;
taosMemoryFree(meta);
return NULL;
} }
/************* TSDB_SML_JSON_PROTOCOL function end **************/
static void smlDestroySTableMeta(SSmlSTableMeta *meta){
taosHashCleanup(meta->tagHash);
taosHashCleanup(meta->fieldHash);
taosArrayDestroy(meta->tags);
taosArrayDestroy(meta->cols);
taosMemoryFree(meta->tableMeta);
}
static int32_t smlParseLine(SSmlHandle* info, const char* sql) {
static int32_t smlParseInfluxLine(SSmlHandle* info, const char* sql) {
SSmlLineInfo elements = {0}; SSmlLineInfo elements = {0};
int ret = smlParseString(sql, &elements, &info->msgBuf); int ret = smlParseInfluxString(sql, &elements, &info->msgBuf);
if(ret != TSDB_CODE_SUCCESS){ if(ret != TSDB_CODE_SUCCESS){
uError("SML:0x%"PRIx64" smlParseString failed", info->id); uError("SML:0x%"PRIx64" smlParseInfluxLine failed", info->id);
return ret; return ret;
} }
...@@ -1447,7 +1928,7 @@ static int32_t smlParseLine(SSmlHandle* info, const char* sql) { ...@@ -1447,7 +1928,7 @@ static int32_t smlParseLine(SSmlHandle* info, const char* sql) {
if(info->dataFormat){ // if dataFormat, cols need new memory to save data if(info->dataFormat){ // if dataFormat, cols need new memory to save data
cols = taosArrayInit(16, POINTER_BYTES); cols = taosArrayInit(16, POINTER_BYTES);
if (cols == NULL) { if (cols == NULL) {
uError("SML:0x%"PRIx64" smlParseLine failed to allocate memory", info->id); uError("SML:0x%"PRIx64" smlParseInfluxLine failed to allocate memory", info->id);
return TSDB_CODE_TSC_OUT_OF_MEMORY; return TSDB_CODE_TSC_OUT_OF_MEMORY;
} }
}else{ // if dataFormat is false, cols do not need to save data, there is another new memory to save data }else{ // if dataFormat is false, cols do not need to save data, there is another new memory to save data
...@@ -1457,11 +1938,14 @@ static int32_t smlParseLine(SSmlHandle* info, const char* sql) { ...@@ -1457,11 +1938,14 @@ static int32_t smlParseLine(SSmlHandle* info, const char* sql) {
ret = smlParseTS(info, elements.timestamp, elements.timestampLen, cols); ret = smlParseTS(info, elements.timestamp, elements.timestampLen, cols);
if(ret != TSDB_CODE_SUCCESS){ if(ret != TSDB_CODE_SUCCESS){
uError("SML:0x%"PRIx64" smlParseTS failed", info->id); uError("SML:0x%"PRIx64" smlParseTS failed", info->id);
if(info->dataFormat) taosArrayDestroy(cols);
return ret; return ret;
} }
ret = smlParseCols(elements.cols, elements.colsLen, cols, false, info->dumplicateKey, &info->msgBuf); ret = smlParseCols(elements.cols, elements.colsLen, cols, false, info->dumplicateKey, &info->msgBuf);
if(ret != TSDB_CODE_SUCCESS){ if(ret != TSDB_CODE_SUCCESS){
uError("SML:0x%"PRIx64" smlParseCols parse cloums fields failed", info->id); uError("SML:0x%"PRIx64" smlParseCols parse cloums fields failed", info->id);
smlDestroyCols(cols);
if(info->dataFormat) taosArrayDestroy(cols);
return ret; return ret;
} }
if(taosArrayGetSize(cols) > TSDB_MAX_COLUMNS){ if(taosArrayGetSize(cols) > TSDB_MAX_COLUMNS){
...@@ -1469,61 +1953,54 @@ static int32_t smlParseLine(SSmlHandle* info, const char* sql) { ...@@ -1469,61 +1953,54 @@ static int32_t smlParseLine(SSmlHandle* info, const char* sql) {
return TSDB_CODE_SML_INVALID_DATA; return TSDB_CODE_SML_INVALID_DATA;
} }
bool hasTable = true;
SSmlTableInfo **oneTable = (SSmlTableInfo **)taosHashGet(info->childTables, elements.measure, elements.measureTagsLen); SSmlTableInfo **oneTable = (SSmlTableInfo **)taosHashGet(info->childTables, elements.measure, elements.measureTagsLen);
if(oneTable){ if(!oneTable){
SSmlSTableMeta** tableMeta = (SSmlSTableMeta**)taosHashGet(info->superTables, elements.measure, elements.measureLen); SSmlTableInfo *tinfo = smlBuildTableInfo();
ASSERT(tableMeta);
ret = smlUpdateMeta(*tableMeta, NULL, cols, &info->msgBuf); // update meta cols
if(!ret){
uError("SML:0x%"PRIx64" smlUpdateMeta cols failed", info->id);
return TSDB_CODE_SML_INVALID_DATA;
}
ret = smlDealCols(*oneTable, info->dataFormat, cols);
if(ret != TSDB_CODE_SUCCESS){
return ret;
}
}else{
SSmlTableInfo *tinfo = smlBuildTableInfo(info->dataFormat);
if(!tinfo){ if(!tinfo){
return TSDB_CODE_TSC_OUT_OF_MEMORY; return TSDB_CODE_TSC_OUT_OF_MEMORY;
} }
ret = smlDealCols(tinfo, info->dataFormat, cols); taosHashPut(info->childTables, elements.measure, elements.measureTagsLen, &tinfo, POINTER_BYTES);
if(ret != TSDB_CODE_SUCCESS){ *oneTable = tinfo;
return ret; hasTable = false;
} }
ret = smlDealCols(*oneTable, info->dataFormat, cols);
if(ret != TSDB_CODE_SUCCESS){
return ret;
}
ret = smlParseCols(elements.tags, elements.tagsLen, tinfo->tags, true, info->dumplicateKey, &info->msgBuf); if(!hasTable){
ret = smlParseCols(elements.tags, elements.tagsLen, (*oneTable)->tags, true, info->dumplicateKey, &info->msgBuf);
if(ret != TSDB_CODE_SUCCESS){ if(ret != TSDB_CODE_SUCCESS){
uError("SML:0x%"PRIx64" smlParseCols parse tag fields failed", info->id); uError("SML:0x%"PRIx64" smlParseCols parse tag fields failed", info->id);
return ret; return ret;
} }
if(taosArrayGetSize(tinfo->tags) > TSDB_MAX_TAGS){ if(taosArrayGetSize((*oneTable)->tags) > TSDB_MAX_TAGS){
smlBuildInvalidDataMsg(&info->msgBuf, "too many tags than 128", NULL); smlBuildInvalidDataMsg(&info->msgBuf, "too many tags than 128", NULL);
return TSDB_CODE_SML_INVALID_DATA; return TSDB_CODE_SML_INVALID_DATA;
} }
tinfo->sTableName = elements.measure; (*oneTable)->sTableName = elements.measure;
tinfo->sTableNameLen = elements.measureLen; (*oneTable)->sTableNameLen = elements.measureLen;
RandTableName rName = {.tags=tinfo->tags, .sTableName=tinfo->sTableName, .sTableNameLen=tinfo->sTableNameLen, RandTableName rName = {.tags=(*oneTable)->tags, .sTableName=(*oneTable)->sTableName, .sTableNameLen=(*oneTable)->sTableNameLen,
.childTableName=tinfo->childTableName}; .childTableName=(*oneTable)->childTableName};
buildChildTableName(&rName); buildChildTableName(&rName);
tinfo->uid = rName.uid; (*oneTable)->uid = rName.uid;
}
SSmlSTableMeta** tableMeta = (SSmlSTableMeta**)taosHashGet(info->superTables, elements.measure, elements.measureLen); SSmlSTableMeta** tableMeta = (SSmlSTableMeta**)taosHashGet(info->superTables, elements.measure, elements.measureLen);
if(tableMeta){ // update meta if(tableMeta){ // update meta
ret = smlUpdateMeta(*tableMeta, tinfo->tags, cols, &info->msgBuf); ret = smlUpdateMeta(*tableMeta, hasTable ? NULL : (*oneTable)->tags, cols, &info->msgBuf);
if(!ret){ if(!ret){
uError("SML:0x%"PRIx64" smlUpdateMeta failed", info->id); uError("SML:0x%"PRIx64" smlUpdateMeta failed", info->id);
return TSDB_CODE_SML_INVALID_DATA; return TSDB_CODE_SML_INVALID_DATA;
}
}else{
SSmlSTableMeta *meta = smlBuildSTableMeta();
smlInsertMeta(meta, tinfo->tags, cols);
taosHashPut(info->superTables, elements.measure, elements.measureLen, &meta, POINTER_BYTES);
} }
}else{
taosHashPut(info->childTables, elements.measure, elements.measureTagsLen, &tinfo, POINTER_BYTES); SSmlSTableMeta *meta = smlBuildSTableMeta();
smlInsertMeta(meta, (*oneTable)->tags, cols);
taosHashPut(info->superTables, elements.measure, elements.measureLen, &meta, POINTER_BYTES);
} }
if(!info->dataFormat){ if(!info->dataFormat){
...@@ -1533,96 +2010,111 @@ static int32_t smlParseLine(SSmlHandle* info, const char* sql) { ...@@ -1533,96 +2010,111 @@ static int32_t smlParseLine(SSmlHandle* info, const char* sql) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static void smlDestroyInfo(SSmlHandle* info){ static int32_t smlParseTelnetLine(SSmlHandle* info, void *data) {
if(!info) return; int ret = TSDB_CODE_SUCCESS;
qDestroyQuery(info->pQuery); SSmlTableInfo *tinfo = smlBuildTableInfo();
smlDestroyHandle(info->exec); if(!tinfo){
return TSDB_CODE_TSC_OUT_OF_MEMORY;
// destroy info->childTables
void** p1 = (void**)taosHashIterate(info->childTables, NULL);
while (p1) {
smlDestroyBuildTableInfo((SSmlTableInfo*)(*p1), info->dataFormat);
p1 = (void**)taosHashIterate(info->childTables, p1);
} }
taosHashCleanup(info->childTables);
// destroy info->superTables SArray *cols = taosArrayInit(16, POINTER_BYTES);
p1 = (void**)taosHashIterate(info->superTables, NULL); if (cols == NULL) {
while (p1) { uError("SML:0x%"PRIx64" smlParseTelnetLine failed to allocate memory", info->id);
smlDestroySTableMeta((SSmlSTableMeta*)(*p1)); return TSDB_CODE_TSC_OUT_OF_MEMORY;
p1 = (void**)taosHashIterate(info->superTables, p1);
} }
taosHashCleanup(info->superTables);
// destroy info->pVgHash
taosHashCleanup(info->pVgHash);
taosHashCleanup(info->dumplicateKey);
taosMemoryFreeClear(info); if(info->protocol == TSDB_SML_TELNET_PROTOCOL){
} smlParseTelnetString(info, (const char*)data, tinfo, cols);
}else if(info->protocol == TSDB_SML_JSON_PROTOCOL){
static SSmlHandle* smlBuildSmlInfo(TAOS* taos, SRequestObj* request, SMLProtocolType protocol, int8_t precision, bool dataFormat){ smlParseJSONString(info, (cJSON *)data, tinfo, cols);
int32_t code = TSDB_CODE_SUCCESS; }else{
SSmlHandle* info = (SSmlHandle*)taosMemoryCalloc(1, sizeof(SSmlHandle)); ASSERT(0);
if (NULL == info) { }
return NULL; if(ret != TSDB_CODE_SUCCESS){
uError("SML:0x%"PRIx64" smlParseInflux failed", info->id);
smlDestroyTableInfo(tinfo, true);
taosArrayDestroy(cols);
return ret;
} }
info->id = smlGenId();
info->pQuery = (SQuery *)taosMemoryCalloc(1, sizeof(SQuery)); if(taosArrayGetSize(tinfo->tags) <= 0 || taosArrayGetSize(tinfo->tags) > TSDB_MAX_TAGS){
if (NULL == info->pQuery) { smlBuildInvalidDataMsg(&info->msgBuf, "invalidate tags length:[1,128]", NULL);
uError("SML:0x%"PRIx64" create info->pQuery error", info->id); return TSDB_CODE_SML_INVALID_DATA;
goto cleanup;
} }
info->pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE; taosHashClear(info->dumplicateKey);
info->pQuery->haveResultSet = false;
info->pQuery->msgType = TDMT_VND_SUBMIT; RandTableName rName = {.tags=tinfo->tags, .sTableName=tinfo->sTableName, .sTableNameLen=tinfo->sTableNameLen,
info->pQuery->pRoot = (SNode*)nodesMakeNode(QUERY_NODE_VNODE_MODIF_STMT); .childTableName=tinfo->childTableName};
if(NULL == info->pQuery->pRoot){ buildChildTableName(&rName);
uError("SML:0x%"PRIx64" create info->pQuery->pRoot error", info->id); tinfo->uid = rName.uid;
goto cleanup;
bool hasTable = true;
SSmlTableInfo **oneTable = taosHashGet(info->childTables, tinfo->childTableName, strlen(tinfo->childTableName));
if(!oneTable) {
taosHashPut(info->childTables, tinfo->childTableName, strlen(tinfo->childTableName), &tinfo, POINTER_BYTES);
*oneTable = tinfo;
hasTable = false;
}else{
smlDestroyTableInfo(tinfo, true);
} }
((SVnodeModifOpStmt*)(info->pQuery->pRoot))->payloadType = PAYLOAD_TYPE_KV;
info->taos = (STscObj *)taos; taosArrayPush((*oneTable)->cols, &cols);
code = catalogGetHandle(info->taos->pAppInfo->clusterId, &info->pCatalog); SSmlSTableMeta** tableMeta = taosHashGet(info->superTables, (*oneTable)->sTableName, (*oneTable)->sTableNameLen);
if(code != TSDB_CODE_SUCCESS){ if(tableMeta){ // update meta
uError("SML:0x%"PRIx64" get catalog error %d", info->id, code); ret = smlUpdateMeta(*tableMeta, hasTable ? NULL : (*oneTable)->tags, cols, &info->msgBuf);
goto cleanup; if(!ret){
uError("SML:0x%"PRIx64" smlUpdateMeta failed", info->id);
return TSDB_CODE_SML_INVALID_DATA;
}
}else{
SSmlSTableMeta *meta = smlBuildSTableMeta();
smlInsertMeta(meta, (*oneTable)->tags, cols);
taosHashPut(info->superTables, (*oneTable)->sTableName, (*oneTable)->sTableNameLen, &meta, POINTER_BYTES);
} }
info->precision = precision; return TSDB_CODE_SUCCESS;
info->protocol = protocol; }
info->dataFormat = dataFormat;
info->pRequest = request;
info->msgBuf.buf = info->pRequest->msgBuf;
info->msgBuf.len = ERROR_MSG_BUF_DEFAULT_SIZE;
info->exec = smlInitHandle(info->pQuery); static int32_t smlParseJSON(SSmlHandle *info, char* payload) {
info->childTables = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); int32_t payloadNum = 0;
info->superTables = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); int32_t ret = TSDB_CODE_SUCCESS;
info->pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
info->dumplicateKey = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); if (payload == NULL) {
if(!dataFormat){ uError("SML:0x%"PRIx64" empty JSON Payload", info->id);
info->colsContainer = taosArrayInit(32, POINTER_BYTES); return TSDB_CODE_TSC_INVALID_JSON;
if(NULL == info->colsContainer){
uError("SML:0x%"PRIx64" create info failed", info->id);
goto cleanup;
}
} }
if(NULL == info->exec || NULL == info->childTables
|| NULL == info->superTables || NULL == info->pVgHash cJSON *root = cJSON_Parse(payload);
|| NULL == info->dumplicateKey){ if (root == NULL) {
uError("SML:0x%"PRIx64" create info failed", info->id); uError("SML:0x%"PRIx64" parse json failed:%s", info->id, payload);
goto cleanup; return TSDB_CODE_TSC_INVALID_JSON;
}
//multiple data points must be sent in JSON array
if (cJSON_IsObject(root)) {
payloadNum = 1;
} else if (cJSON_IsArray(root)) {
payloadNum = cJSON_GetArraySize(root);
} else {
uError("SML:0x%"PRIx64" Invalid JSON Payload", info->id);
ret = TSDB_CODE_TSC_INVALID_JSON;
goto end;
} }
return info; for (int32_t i = 0; i < payloadNum; ++i) {
cleanup: cJSON *dataPoint = (payloadNum == 1 && cJSON_IsObject(root)) ? root : cJSON_GetArrayItem(root, i);
smlDestroyInfo(info); ret = smlParseTelnetLine(info, dataPoint);
return NULL; if(ret != TSDB_CODE_SUCCESS){
uError("SML:0x%"PRIx64" Invalid JSON Payload", info->id);
goto end;
}
}
end:
cJSON_Delete(root);
return ret;
} }
static int32_t smlInsertData(SSmlHandle* info) { static int32_t smlInsertData(SSmlHandle* info) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
...@@ -1636,7 +2128,7 @@ static int32_t smlInsertData(SSmlHandle* info) { ...@@ -1636,7 +2128,7 @@ static int32_t smlInsertData(SSmlHandle* info) {
SEpSet ep = getEpSet_s(&info->taos->pAppInfo->mgmtEp); SEpSet ep = getEpSet_s(&info->taos->pAppInfo->mgmtEp);
SVgroupInfo vg; SVgroupInfo vg;
code = catalogGetTableHashVgroup(info->pCatalog, info->taos->pAppInfo->pTransporter, &ep, &pName, &vg); code = catalogGetTableHashVgroup(info->pCatalog, info->taos->pAppInfo->pTransporter, &ep, &pName, &vg);
if (code != 0) { if (code != TSDB_CODE_SUCCESS) {
uError("SML:0x%"PRIx64" catalogGetTableHashVgroup failed. table name: %s", info->id, tableData->childTableName); uError("SML:0x%"PRIx64" catalogGetTableHashVgroup failed. table name: %s", info->id, tableData->childTableName);
return code; return code;
} }
...@@ -1649,15 +2141,19 @@ static int32_t smlInsertData(SSmlHandle* info) { ...@@ -1649,15 +2141,19 @@ static int32_t smlInsertData(SSmlHandle* info) {
(*pMeta)->tableMeta->vgId = vg.vgId; (*pMeta)->tableMeta->vgId = vg.vgId;
(*pMeta)->tableMeta->uid = tableData->uid; // one table merge data block together according uid (*pMeta)->tableMeta->uid = tableData->uid; // one table merge data block together according uid
code = smlBindData(info->exec, tableData->tags, tableData->colsFormat, (*pMeta)->cols, code = smlBindData(info->exec, tableData->tags, (*pMeta)->cols, tableData->cols, info->dataFormat,
tableData->cols, info->dataFormat, (*pMeta)->tableMeta, tableData->childTableName, info->msgBuf.buf, info->msgBuf.len); (*pMeta)->tableMeta, tableData->childTableName, info->msgBuf.buf, info->msgBuf.len);
if(code != TSDB_CODE_SUCCESS){ if(code != TSDB_CODE_SUCCESS){
return code; return code;
} }
oneTable = (SSmlTableInfo**)taosHashIterate(info->childTables, oneTable); oneTable = (SSmlTableInfo**)taosHashIterate(info->childTables, oneTable);
} }
smlBuildOutput(info->exec, info->pVgHash); code = smlBuildOutput(info->exec, info->pVgHash);
if (code != TSDB_CODE_SUCCESS) {
uError("SML:0x%"PRIx64" smlBuildOutput failed", info->id);
return code;
}
info->cost.insertRpcTime = taosGetTimestampUs(); info->cost.insertRpcTime = taosGetTimestampUs();
launchQueryImpl(info->pRequest, info->pQuery, TSDB_CODE_SUCCESS, true); launchQueryImpl(info->pRequest, info->pQuery, TSDB_CODE_SUCCESS, true);
...@@ -1675,23 +2171,41 @@ static void smlPrintStatisticInfo(SSmlHandle *info){ ...@@ -1675,23 +2171,41 @@ static void smlPrintStatisticInfo(SSmlHandle *info){
info->cost.endTime-info->cost.parseTime); info->cost.endTime-info->cost.parseTime);
} }
static int smlInsertLines(SSmlHandle *info, char* lines[], int numLines) { static int32_t smlParseLine(SSmlHandle *info, char* lines[], int numLines){
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
if (info->protocol == TSDB_SML_JSON_PROTOCOL) {
if (numLines <= 0 || numLines > 65536) { code = smlParseJSON(info, *lines);
uError("SML:0x%"PRIx64" smlInsertLines numLines should be between 1 and 65536. numLines: %d", info->id, numLines); if (code != TSDB_CODE_SUCCESS) {
code = TSDB_CODE_TSC_APP_ERROR; uError("SML:0x%" PRIx64 " smlParseJSON failed:%s", info->id, *lines);
goto cleanup; return code;
}
} }
info->cost.parseTime = taosGetTimestampUs();
for (int32_t i = 0; i < numLines; ++i) { for (int32_t i = 0; i < numLines; ++i) {
code = smlParseLine(info, lines[i]); if(info->protocol == TSDB_SML_LINE_PROTOCOL){
code = smlParseInfluxLine(info, lines[i]);
}else if(info->protocol == TSDB_SML_TELNET_PROTOCOL){
code = smlParseTelnetLine(info, lines[i]);
}else{
ASSERT(0);
}
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
uError("SML:0x%"PRIx64" smlParseLine failed. line %d : %s", info->id, i, lines[i]); uError("SML:0x%" PRIx64 " smlParseLine failed. line %d : %s", info->id, i, lines[i]);
goto cleanup; return code;
} }
} }
return code;
}
static int smlProcess(SSmlHandle *info, char* lines[], int numLines) {
int32_t code = TSDB_CODE_SUCCESS;
info->cost.parseTime = taosGetTimestampUs();
code = smlParseLine(info, lines, numLines);
if (code != 0) {
uError("SML:0x%"PRIx64" smlParseLine error : %s", info->id, tstrerror(code));
goto cleanup;
}
info->cost.lineNum = numLines; info->cost.lineNum = numLines;
info->cost.numOfSTables = taosHashGetSize(info->superTables); info->cost.numOfSTables = taosHashGetSize(info->superTables);
...@@ -1742,6 +2256,7 @@ cleanup: ...@@ -1742,6 +2256,7 @@ cleanup:
TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int protocol, int precision) { TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int protocol, int precision) {
SRequestObj* request = (SRequestObj*)createRequest((STscObj *)taos, NULL, NULL, TSDB_SQL_INSERT); SRequestObj* request = (SRequestObj*)createRequest((STscObj *)taos, NULL, NULL, TSDB_SQL_INSERT);
if(!request){ if(!request){
uError("SML:taos_schemaless_insert error request is null");
return NULL; return NULL;
} }
...@@ -1750,22 +2265,28 @@ TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int pr ...@@ -1750,22 +2265,28 @@ TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int pr
return (TAOS_RES*)request; return (TAOS_RES*)request;
} }
switch (protocol) { if (numLines <= 0 || numLines > 65536) {
case TSDB_SML_LINE_PROTOCOL:{ request->code = TSDB_CODE_SML_INVALID_DATA;
smlInsertLines(info, lines, numLines); smlBuildInvalidDataMsg(&info->msgBuf, "numLines should be between 1 and 65536", NULL);
break; goto end;
} }
case TSDB_SML_TELNET_PROTOCOL:
//code = taos_insert_telnet_lines(taos, lines, numLines, protocol, tsType, &affected_rows); if(protocol < TSDB_SML_LINE_PROTOCOL || protocol > TSDB_SML_JSON_PROTOCOL){
break; request->code = TSDB_CODE_SML_INVALID_PROTOCOL_TYPE;
case TSDB_SML_JSON_PROTOCOL: smlBuildInvalidDataMsg(&info->msgBuf, "protocol invalidate", NULL);
//code = taos_insert_json_payload(taos, *lines, protocol, tsType, &affected_rows); goto end;
break;
default:
break;
} }
smlDestroyInfo(info);
if(protocol == TSDB_SML_LINE_PROTOCOL && (precision < TSDB_SML_TIMESTAMP_HOURS || precision > TSDB_SML_TIMESTAMP_NANO_SECONDS)){
request->code = TSDB_CODE_SML_INVALID_PRECISION_TYPE;
smlBuildInvalidDataMsg(&info->msgBuf, "precision invalidate for line protocol", NULL);
goto end;
}
info->pRequest->code = smlProcess(info, lines, numLines);
end:
smlDestroyInfo(info);
return (TAOS_RES*)request; return (TAOS_RES*)request;
} }
...@@ -33,7 +33,7 @@ int main(int argc, char **argv) { ...@@ -33,7 +33,7 @@ int main(int argc, char **argv) {
return RUN_ALL_TESTS(); return RUN_ALL_TESTS();
} }
TEST(testCase, smlParseString_Test) { TEST(testCase, smlParseInfluxString_Test) {
char msg[256] = {0}; char msg[256] = {0};
SSmlMsgBuf msgBuf; SSmlMsgBuf msgBuf;
msgBuf.buf = msg; msgBuf.buf = msg;
...@@ -42,7 +42,7 @@ TEST(testCase, smlParseString_Test) { ...@@ -42,7 +42,7 @@ TEST(testCase, smlParseString_Test) {
// case 1 // case 1
char *sql = "st,t1=3,t2=4,t3=t3 c1=3i64,c3=\"passit hello,c1=2\",c2=false,c4=4f64 1626006833639000000 ,32,c=3"; char *sql = "st,t1=3,t2=4,t3=t3 c1=3i64,c3=\"passit hello,c1=2\",c2=false,c4=4f64 1626006833639000000 ,32,c=3";
int ret = smlParseString(sql, &elements, &msgBuf); int ret = smlParseInfluxString(sql, &elements, &msgBuf);
ASSERT_EQ(ret, 0); ASSERT_EQ(ret, 0);
ASSERT_EQ(elements.measure, sql); ASSERT_EQ(elements.measure, sql);
ASSERT_EQ(elements.measureLen, strlen("st")); ASSERT_EQ(elements.measureLen, strlen("st"));
...@@ -60,13 +60,13 @@ TEST(testCase, smlParseString_Test) { ...@@ -60,13 +60,13 @@ TEST(testCase, smlParseString_Test) {
// case 2 false // case 2 false
sql = "st,t1=3,t2=4,t3=t3 c1=3i64,c3=\"passit hello,c1=2,c2=false,c4=4f64 1626006833639000000"; sql = "st,t1=3,t2=4,t3=t3 c1=3i64,c3=\"passit hello,c1=2,c2=false,c4=4f64 1626006833639000000";
memset(&elements, 0, sizeof(SSmlLineInfo)); memset(&elements, 0, sizeof(SSmlLineInfo));
ret = smlParseString(sql, &elements, &msgBuf); ret = smlParseInfluxString(sql, &elements, &msgBuf);
ASSERT_NE(ret, 0); ASSERT_NE(ret, 0);
// case 3 false // case 3 false
sql = "st, t1=3,t2=4,t3=t3 c1=3i64,c3=\"passit hello,c1=2,c2=false,c4=4f64 1626006833639000000"; sql = "st, t1=3,t2=4,t3=t3 c1=3i64,c3=\"passit hello,c1=2,c2=false,c4=4f64 1626006833639000000";
memset(&elements, 0, sizeof(SSmlLineInfo)); memset(&elements, 0, sizeof(SSmlLineInfo));
ret = smlParseString(sql, &elements, &msgBuf); ret = smlParseInfluxString(sql, &elements, &msgBuf);
ASSERT_EQ(ret, 0); ASSERT_EQ(ret, 0);
ASSERT_EQ(elements.cols, sql + elements.measureTagsLen + 2); ASSERT_EQ(elements.cols, sql + elements.measureTagsLen + 2);
ASSERT_EQ(elements.colsLen, strlen("t1=3,t2=4,t3=t3")); ASSERT_EQ(elements.colsLen, strlen("t1=3,t2=4,t3=t3"));
...@@ -74,7 +74,7 @@ TEST(testCase, smlParseString_Test) { ...@@ -74,7 +74,7 @@ TEST(testCase, smlParseString_Test) {
// case 4 tag is null // case 4 tag is null
sql = "st, c1=3i64,c3=\"passit hello,c1=2\",c2=false,c4=4f64 1626006833639000000"; sql = "st, c1=3i64,c3=\"passit hello,c1=2\",c2=false,c4=4f64 1626006833639000000";
memset(&elements, 0, sizeof(SSmlLineInfo)); memset(&elements, 0, sizeof(SSmlLineInfo));
ret = smlParseString(sql, &elements, &msgBuf); ret = smlParseInfluxString(sql, &elements, &msgBuf);
ASSERT_EQ(ret, 0); ASSERT_EQ(ret, 0);
ASSERT_EQ(elements.measure, sql); ASSERT_EQ(elements.measure, sql);
ASSERT_EQ(elements.measureLen, strlen("st")); ASSERT_EQ(elements.measureLen, strlen("st"));
...@@ -92,7 +92,7 @@ TEST(testCase, smlParseString_Test) { ...@@ -92,7 +92,7 @@ TEST(testCase, smlParseString_Test) {
// case 5 tag is null // case 5 tag is null
sql = " st c1=3i64,c3=\"passit hello,c1=2\",c2=false,c4=4f64 1626006833639000000 "; sql = " st c1=3i64,c3=\"passit hello,c1=2\",c2=false,c4=4f64 1626006833639000000 ";
memset(&elements, 0, sizeof(SSmlLineInfo)); memset(&elements, 0, sizeof(SSmlLineInfo));
ret = smlParseString(sql, &elements, &msgBuf); ret = smlParseInfluxString(sql, &elements, &msgBuf);
sql++; sql++;
ASSERT_EQ(ret, 0); ASSERT_EQ(ret, 0);
ASSERT_EQ(elements.measure, sql); ASSERT_EQ(elements.measure, sql);
...@@ -111,13 +111,13 @@ TEST(testCase, smlParseString_Test) { ...@@ -111,13 +111,13 @@ TEST(testCase, smlParseString_Test) {
// case 6 // case 6
sql = " st c1=3i64,c3=\"passit hello,c1=2\",c2=false,c4=4f64 "; sql = " st c1=3i64,c3=\"passit hello,c1=2\",c2=false,c4=4f64 ";
memset(&elements, 0, sizeof(SSmlLineInfo)); memset(&elements, 0, sizeof(SSmlLineInfo));
ret = smlParseString(sql, &elements, &msgBuf); ret = smlParseInfluxString(sql, &elements, &msgBuf);
ASSERT_EQ(ret, 0); ASSERT_EQ(ret, 0);
// case 7 // case 7
sql = " st , "; sql = " st , ";
memset(&elements, 0, sizeof(SSmlLineInfo)); memset(&elements, 0, sizeof(SSmlLineInfo));
ret = smlParseString(sql, &elements, &msgBuf); ret = smlParseInfluxString(sql, &elements, &msgBuf);
sql++; sql++;
ASSERT_EQ(ret, 0); ASSERT_EQ(ret, 0);
ASSERT_EQ(elements.cols, sql + elements.measureTagsLen + 3); ASSERT_EQ(elements.cols, sql + elements.measureTagsLen + 3);
...@@ -126,7 +126,7 @@ TEST(testCase, smlParseString_Test) { ...@@ -126,7 +126,7 @@ TEST(testCase, smlParseString_Test) {
// case 8 false // case 8 false
sql = ", st , "; sql = ", st , ";
memset(&elements, 0, sizeof(SSmlLineInfo)); memset(&elements, 0, sizeof(SSmlLineInfo));
ret = smlParseString(sql, &elements, &msgBuf); ret = smlParseInfluxString(sql, &elements, &msgBuf);
ASSERT_NE(ret, 0); ASSERT_NE(ret, 0);
} }
...@@ -467,7 +467,7 @@ TEST(testCase, smlParseCols_Test) { ...@@ -467,7 +467,7 @@ TEST(testCase, smlParseCols_Test) {
taosHashCleanup(dumplicateKey); taosHashCleanup(dumplicateKey);
} }
TEST(testCase, smlParseLine_Test) { TEST(testCase, smlProcess_influx_Test) {
TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 0); TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT_NE(taos, nullptr); ASSERT_NE(taos, nullptr);
...@@ -483,7 +483,7 @@ TEST(testCase, smlParseLine_Test) { ...@@ -483,7 +483,7 @@ TEST(testCase, smlParseLine_Test) {
SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_LINE_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS, true); SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_LINE_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS, true);
ASSERT_NE(info, nullptr); ASSERT_NE(info, nullptr);
const char *sql[9] = { const char *sql[11] = {
"readings,name=truck_0,fleet=South,driver=Trish,model=H-2,device_version=v2.3 load_capacity=1500,fuel_capacity=150,nominal_fuel_consumption=12,latitude=52.31854,longitude=4.72037,elevation=124,velocity=0,heading=221,grade=0 1451606400000000000", "readings,name=truck_0,fleet=South,driver=Trish,model=H-2,device_version=v2.3 load_capacity=1500,fuel_capacity=150,nominal_fuel_consumption=12,latitude=52.31854,longitude=4.72037,elevation=124,velocity=0,heading=221,grade=0 1451606400000000000",
"readings,name=truck_0,fleet=South,driver=Trish,model=H-2,device_version=v2.3 load_capacity=1500,fuel_capacity=150,nominal_fuel_consumption=12,latitude=52.31854,longitude=4.72037,elevation=124,velocity=0,heading=221,grade=0,fuel_consumption=25 1451607400000000000", "readings,name=truck_0,fleet=South,driver=Trish,model=H-2,device_version=v2.3 load_capacity=1500,fuel_capacity=150,nominal_fuel_consumption=12,latitude=52.31854,longitude=4.72037,elevation=124,velocity=0,heading=221,grade=0,fuel_consumption=25 1451607400000000000",
"readings,name=truck_0,fleet=South,driver=Trish,model=H-2,device_version=v2.3 load_capacity=1500,fuel_capacity=150,nominal_fuel_consumption=12,latitude=52.31854,longitude=4.72037,elevation=124,heading=221,grade=0,fuel_consumption=25 1451608400000000000", "readings,name=truck_0,fleet=South,driver=Trish,model=H-2,device_version=v2.3 load_capacity=1500,fuel_capacity=150,nominal_fuel_consumption=12,latitude=52.31854,longitude=4.72037,elevation=124,heading=221,grade=0,fuel_consumption=25 1451608400000000000",
...@@ -492,14 +492,33 @@ TEST(testCase, smlParseLine_Test) { ...@@ -492,14 +492,33 @@ TEST(testCase, smlParseLine_Test) {
"readings,name=truck_1,fleet=South,driver=Albert,model=F-150,device_version=v1.5 load_capacity=2000,fuel_capacity=200,nominal_fuel_consumption=15,latitude=72.45258,longitude=68.83761,elevation=255,velocity=0,heading=181,grade=0,fuel_consumption=25 1451606400000000000", "readings,name=truck_1,fleet=South,driver=Albert,model=F-150,device_version=v1.5 load_capacity=2000,fuel_capacity=200,nominal_fuel_consumption=15,latitude=72.45258,longitude=68.83761,elevation=255,velocity=0,heading=181,grade=0,fuel_consumption=25 1451606400000000000",
"readings,name=truck_2,driver=Derek,model=F-150,device_version=v1.5 load_capacity=2000,fuel_capacity=200,nominal_fuel_consumption=15,latitude=24.5208,longitude=28.09377,elevation=428,velocity=0,heading=304,grade=0,fuel_consumption=25 1451606400000000000", "readings,name=truck_2,driver=Derek,model=F-150,device_version=v1.5 load_capacity=2000,fuel_capacity=200,nominal_fuel_consumption=15,latitude=24.5208,longitude=28.09377,elevation=428,velocity=0,heading=304,grade=0,fuel_consumption=25 1451606400000000000",
"readings,name=truck_2,fleet=North,driver=Derek,model=F-150 load_capacity=2000,fuel_capacity=200,nominal_fuel_consumption=15,latitude=24.5208,longitude=28.09377,elevation=428,velocity=0,heading=304,grade=0,fuel_consumption=25 1451609400000000000", "readings,name=truck_2,fleet=North,driver=Derek,model=F-150 load_capacity=2000,fuel_capacity=200,nominal_fuel_consumption=15,latitude=24.5208,longitude=28.09377,elevation=428,velocity=0,heading=304,grade=0,fuel_consumption=25 1451609400000000000",
"readings,fleet=South,name=truck_0,driver=Trish,model=H-2,device_version=v2.3 fuel_consumption=25,grade=0 1451629400000000000" "readings,fleet=South,name=truck_0,driver=Trish,model=H-2,device_version=v2.3 fuel_consumption=25,grade=0 1451629400000000000",
"stable,t1=t1,t2=t2,t3=t3 c1=1,c2=2,c3=3,c4=4 1451629500000000000",
"stable,t2=t2,t1=t1,t3=t3 c1=1,c3=3,c4=4 1451629600000000000"
}; };
smlInsertLines(info, (char**)sql, 9); smlProcess(info, (char**)sql, 11);
// for (int i = 0; i < 3; i++) {
// smlParseLine(info, sql[i]); TAOS_RES *res = taos_query(taos, "select * from");
// } ASSERT_NE(res, nullptr);
int fieldNum = taos_field_count(res);
ASSERT_EQ(fieldNum, 11);
int rowNum = taos_affected_rows(res);
for (int i = 0; i < rowNum; ++i) {
TAOS_ROW rows = taos_fetch_row(res);
}
res = taos_query(taos, "select * from");
ASSERT_NE(res, nullptr);
fieldNum = taos_field_count(res);
ASSERT_EQ(fieldNum, 4);
rowNum = taos_affected_rows(res);
ASSERT_EQ(rowNum, 2);
for (int i = 0; i < rowNum; ++i) {
TAOS_ROW rows = taos_fetch_row(res);
}
} }
// different types
TEST(testCase, smlParseLine_error_Test) { TEST(testCase, smlParseLine_error_Test) {
TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 0); TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT_NE(taos, nullptr); ASSERT_NE(taos, nullptr);
...@@ -520,24 +539,251 @@ TEST(testCase, smlParseLine_error_Test) { ...@@ -520,24 +539,251 @@ TEST(testCase, smlParseLine_error_Test) {
"measure,t1=3 c1=8", "measure,t1=3 c1=8",
"measure,t2=3 c1=8u8" "measure,t2=3 c1=8u8"
}; };
int ret = smlInsertLines(info, (char **)sql, 2); int ret = smlProcess(info, (char **)sql, 2);
ASSERT_NE(ret, 0); ASSERT_NE(ret, 0);
} }
// TEST(testCase, smlParseTS_Test) { TEST(testCase, smlGetTimestampLen_Test) {
// char msg[256] = {0}; uint8_t len = smlGetTimestampLen(0);
// SSmlMsgBuf msgBuf; ASSERT_EQ(len, 1);
// msgBuf.buf = msg;
// msgBuf.len = 256; len = smlGetTimestampLen(1);
// SSmlLineInfo elements = {0}; ASSERT_EQ(len, 1);
//
// SSmlHandle* info = smlBuildSmlInfo(taos, request, protocol, precision, dataFormat); len = smlGetTimestampLen(10);
// if(!info){ ASSERT_EQ(len, 2);
// return (TAOS_RES*)request;
// } len = smlGetTimestampLen(390);
// ret = smlParseTS(info, elements.timestamp, elements.timestampLen, cols); ASSERT_EQ(len, 3);
// if(ret != TSDB_CODE_SUCCESS){
// uError("SML:0x%"PRIx64" smlParseTS failed", info->id); len = smlGetTimestampLen(-1);
// return ret; ASSERT_EQ(len, 1);
// }
// } len = smlGetTimestampLen(-10);
ASSERT_EQ(len, 2);
len = smlGetTimestampLen(-390);
ASSERT_EQ(len, 3);
}
TEST(testCase, smlProcess_telnet_Test) {
TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT_NE(taos, nullptr);
TAOS_RES* pRes = taos_query(taos, "create database if not exists sml_db");
taos_free_result(pRes);
pRes = taos_query(taos, "use sml_db");
taos_free_result(pRes);
SRequestObj *request = (SRequestObj *)createRequest((STscObj*)taos, NULL, NULL, TSDB_SQL_INSERT);
ASSERT_NE(request, nullptr);
SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_TELNET_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS, true);
ASSERT_NE(info, nullptr);
const char *sql[11] = {
"sys.if.bytes.out 1479496100 1.3E3 host=web01 interface=eth0"
"sys.if.bytes.out 1479496200 1.3E3 interface=eth0 host=web01 ",
"sys.if.bytes.out 1479496300 1.3E3 network=tcp"
"sys.procs.running 1479496400 42 host=web01",
};
int ret = smlProcess(info, (char**)sql, 11);
ASSERT_EQ(ret, 0);
TAOS_RES *res = taos_query(taos, "select * from");
ASSERT_NE(res, nullptr);
int fieldNum = taos_field_count(res);
ASSERT_EQ(fieldNum, 11);
int rowNum = taos_affected_rows(res);
for (int i = 0; i < rowNum; ++i) {
TAOS_ROW rows = taos_fetch_row(res);
}
res = taos_query(taos, "select * from");
ASSERT_NE(res, nullptr);
fieldNum = taos_field_count(res);
ASSERT_EQ(fieldNum, 4);
rowNum = taos_affected_rows(res);
ASSERT_EQ(rowNum, 2);
for (int i = 0; i < rowNum; ++i) {
TAOS_ROW rows = taos_fetch_row(res);
}
}
TEST(testCase, smlProcess_json_Test) {
TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT_NE(taos, nullptr);
TAOS_RES* pRes = taos_query(taos, "create database if not exists sml_db");
taos_free_result(pRes);
pRes = taos_query(taos, "use sml_db");
taos_free_result(pRes);
SRequestObj *request = (SRequestObj *)createRequest((STscObj*)taos, NULL, NULL, TSDB_SQL_INSERT);
ASSERT_NE(request, nullptr);
SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_LINE_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS, true);
ASSERT_NE(info, nullptr);
const char *sql = "[\n"
" {\n"
" \"metric\": \"sys.cpu.nice\",\n"
" \"timestamp\": 1346846400,\n"
" \"value\": 18,\n"
" \"tags\": {\n"
" \"host\": \"web01\",\n"
" \"dc\": \"lga\"\n"
" }\n"
" },\n"
" {\n"
" \"metric\": \"sys.cpu.nice\",\n"
" \"timestamp\": 1346846400,\n"
" \"value\": 9,\n"
" \"tags\": {\n"
" \"host\": \"web02\",\n"
" \"dc\": \"lga\"\n"
" }\n"
" }\n"
"]";
smlProcess(info, (char**)sql, 11);
TAOS_RES *res = taos_query(taos, "select * from");
ASSERT_NE(res, nullptr);
int fieldNum = taos_field_count(res);
ASSERT_EQ(fieldNum, 11);
int rowNum = taos_affected_rows(res);
for (int i = 0; i < rowNum; ++i) {
TAOS_ROW rows = taos_fetch_row(res);
}
res = taos_query(taos, "select * from");
ASSERT_NE(res, nullptr);
fieldNum = taos_field_count(res);
ASSERT_EQ(fieldNum, 4);
rowNum = taos_affected_rows(res);
ASSERT_EQ(rowNum, 2);
for (int i = 0; i < rowNum; ++i) {
TAOS_ROW rows = taos_fetch_row(res);
}
sql = "{\n"
" \"metric\": \"meter_current\",\n"
" \"timestamp\": {\n"
" \"value\" : 1346846400,\n"
" \"type\" : \"s\"\n"
" },\n"
" \"value\": {\n"
" \"value\" : 10.3,\n"
" \"type\" : \"i64\"\n"
" },\n"
" \"tags\": {\n"
" \"groupid\": { \n"
" \"value\" : 2,\n"
" \"type\" : \"bigint\"\n"
" },\n"
" \"location\": { \n"
" \"value\" : \"北京\",\n"
" \"type\" : \"nchar\"\n"
" },\n"
" \"id\": \"d1001\"\n"
" }\n"
"}";
smlProcess(info, (char**)sql, 11);
sql = "{\n"
" \"metric\": \"meter_current\",\n"
" \"timestamp\": {\n"
" \"value\" : 1346846400,\n"
" \"type\" : \"s\"\n"
" },\n"
" \"value\": {\n"
" \"value\" : 10.3,\n"
" \"type\" : \"i64\"\n"
" },\n"
" \"tags\": {\n"
" \"t1\": { \n"
" \"value\" : 2,\n"
" \"type\" : \"bigint\"\n"
" },\n"
" \"t2\": { \n"
" \"value\" : 2,\n"
" \"type\" : \"int\"\n"
" },\n"
" \"t3\": { \n"
" \"value\" : 2,\n"
" \"type\" : \"i16\"\n"
" },\n"
" \"t4\": { \n"
" \"value\" : 2,\n"
" \"type\" : \"i8\"\n"
" },\n"
" \"t5\": { \n"
" \"value\" : 2,\n"
" \"type\" : \"f32\"\n"
" },\n"
" \"t6\": { \n"
" \"value\" : 2,\n"
" \"type\" : \"double\"\n"
" },\n"
" \"t7\": { \n"
" \"value\" : \"8323\",\n"
" \"type\" : \"binary\"\n"
" },\n"
" \"t8\": { \n"
" \"value\" : \"北京\",\n"
" \"type\" : \"nchar\"\n"
" },\n"
" \"t9\": { \n"
" \"value\" : true,\n"
" \"type\" : \"bool\"\n"
" },\n"
" \"id\": \"d1001\"\n"
" }\n"
"}";
smlProcess(info, (char**)sql, 11);
sql = "{\n"
" \"metric\": \"meter_current\",\n"
" \"timestamp\": {\n"
" \"value\" : 1346846400000,\n"
" \"type\" : \"ms\"\n"
" },\n"
" \"value\": \"ni\",\n"
" \"tags\": {\n"
" \"t1\": { \n"
" \"value\" : 20,\n"
" \"type\" : \"i64\"\n"
" },\n"
" \"t2\": { \n"
" \"value\" : 25,\n"
" \"type\" : \"i32\"\n"
" },\n"
" \"t3\": { \n"
" \"value\" : 2,\n"
" \"type\" : \"smallint\"\n"
" },\n"
" \"t4\": { \n"
" \"value\" : 2,\n"
" \"type\" : \"tinyint\"\n"
" },\n"
" \"t5\": { \n"
" \"value\" : 2,\n"
" \"type\" : \"float\"\n"
" },\n"
" \"t6\": { \n"
" \"value\" : 0.2,\n"
" \"type\" : \"f64\"\n"
" },\n"
" \"t7\": \"nsj\",\n"
" \"t8\": { \n"
" \"value\" : \"北京\",\n"
" \"type\" : \"nchar\"\n"
" },\n"
" \"t9\": false,\n"
" \"id\": \"d1001\"\n"
" }\n"
"}";
smlProcess(info, (char**)sql, 11);
}
...@@ -1652,7 +1652,7 @@ static int32_t smlBuildTagRow(SArray *cols, SKVRowBuilder *tagsBuilder, SParsedD ...@@ -1652,7 +1652,7 @@ static int32_t smlBuildTagRow(SArray *cols, SKVRowBuilder *tagsBuilder, SParsedD
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t smlBindData(void *handle, SArray *tags, SArray *colsFormat, SArray *colsSchema, SArray *cols, bool format, int32_t smlBindData(void *handle, SArray *tags, SArray *colsSchema, SArray *cols, bool format,
STableMeta *pTableMeta, char *tableName, char *msgBuf, int16_t msgBufLen) { STableMeta *pTableMeta, char *tableName, char *msgBuf, int16_t msgBufLen) {
SMsgBuf pBuf = {.buf = msgBuf, .len = msgBufLen}; SMsgBuf pBuf = {.buf = msgBuf, .len = msgBufLen};
...@@ -1695,7 +1695,7 @@ int32_t smlBindData(void *handle, SArray *tags, SArray *colsFormat, SArray *cols ...@@ -1695,7 +1695,7 @@ int32_t smlBindData(void *handle, SArray *tags, SArray *colsFormat, SArray *cols
initRowBuilder(&pDataBlock->rowBuilder, pDataBlock->pTableMeta->sversion, &pDataBlock->boundColumnInfo); initRowBuilder(&pDataBlock->rowBuilder, pDataBlock->pTableMeta->sversion, &pDataBlock->boundColumnInfo);
int32_t rowNum = format ? taosArrayGetSize(colsFormat) : taosArrayGetSize(cols); int32_t rowNum = taosArrayGetSize(cols);
if(rowNum <= 0) { if(rowNum <= 0) {
return buildInvalidOperationMsg(&pBuf, "cols size <= 0"); return buildInvalidOperationMsg(&pBuf, "cols size <= 0");
} }
...@@ -1707,13 +1707,10 @@ int32_t smlBindData(void *handle, SArray *tags, SArray *colsFormat, SArray *cols ...@@ -1707,13 +1707,10 @@ int32_t smlBindData(void *handle, SArray *tags, SArray *colsFormat, SArray *cols
for (int32_t r = 0; r < rowNum; ++r) { for (int32_t r = 0; r < rowNum; ++r) {
STSRow* row = (STSRow*)(pDataBlock->pData + pDataBlock->size); // skip the SSubmitBlk header STSRow* row = (STSRow*)(pDataBlock->pData + pDataBlock->size); // skip the SSubmitBlk header
tdSRowResetBuf(pBuilder, row); tdSRowResetBuf(pBuilder, row);
void *rowData = NULL; void *rowData = taosArrayGetP(cols, r);
size_t rowDataSize = 0; size_t rowDataSize = 0;
if(format){ if(format){
rowData = taosArrayGetP(colsFormat, r);
rowDataSize = taosArrayGetSize(rowData); rowDataSize = taosArrayGetSize(rowData);
}else{
rowData = taosArrayGetP(cols, r);
} }
// 1. set the parsed value from sql string // 1. set the parsed value from sql string
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册