未验证 提交 4ebdb936 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #12448 from taosdata/feature/TD-14761

fea: add test case for schemaless
...@@ -201,18 +201,17 @@ typedef struct SExprInfo { ...@@ -201,18 +201,17 @@ typedef struct SExprInfo {
} SExprInfo; } SExprInfo;
typedef struct { typedef struct {
const char* key; const char* key;
int32_t keyLen; int32_t keyLen;
uint8_t type; uint8_t type;
int16_t length;
union{ union{
const char* value; const char* value;
int64_t i; int64_t i;
uint64_t u; uint64_t u;
double d; double d;
float f; float f;
}; };
int32_t valueLen; int32_t length;
} SSmlKv; } SSmlKv;
#define QUERY_ASC_FORWARD_STEP 1 #define QUERY_ASC_FORWARD_STEP 1
......
...@@ -54,29 +54,29 @@ typedef enum { ...@@ -54,29 +54,29 @@ typedef enum {
} ESchemaAction; } ESchemaAction;
typedef struct { typedef struct {
char sTableName[TSDB_TABLE_NAME_LEN]; char sTableName[TSDB_TABLE_NAME_LEN];
SArray *tags; SArray *tags;
SArray *fields; SArray *fields;
} SCreateSTableActionInfo; } SCreateSTableActionInfo;
typedef struct { typedef struct {
char sTableName[TSDB_TABLE_NAME_LEN]; char sTableName[TSDB_TABLE_NAME_LEN];
SSmlKv * field; SSmlKv *field;
} SAlterSTableActionInfo; } SAlterSTableActionInfo;
typedef struct { typedef struct {
ESchemaAction action; ESchemaAction action;
union { union {
SCreateSTableActionInfo createSTable; SCreateSTableActionInfo createSTable;
SAlterSTableActionInfo alterSTable; SAlterSTableActionInfo alterSTable;
}; };
} SSchemaAction; } SSchemaAction;
typedef struct { typedef struct {
const char* measure; const char *measure;
const char* tags; const char *tags;
const char* cols; const char *cols;
const char* timestamp; const char *timestamp;
int32_t measureLen; int32_t measureLen;
int32_t measureTagsLen; int32_t measureTagsLen;
...@@ -103,7 +103,7 @@ typedef struct { ...@@ -103,7 +103,7 @@ typedef struct {
SHashObj *tagHash; // elements are <key, index in tags> SHashObj *tagHash; // elements are <key, index in tags>
SArray *cols; SArray *cols;
SHashObj *fieldHash; SHashObj *colHash;
STableMeta *tableMeta; STableMeta *tableMeta;
} SSmlSTableMeta; } SSmlSTableMeta;
...@@ -180,6 +180,7 @@ static inline bool smlCheckDuplicateKey(const char *key, int32_t keyLen, SHashOb ...@@ -180,6 +180,7 @@ static inline bool smlCheckDuplicateKey(const char *key, int32_t keyLen, SHashOb
} }
static int32_t smlBuildInvalidDataMsg(SSmlMsgBuf* pBuf, const char *msg1, const char *msg2) { static int32_t smlBuildInvalidDataMsg(SSmlMsgBuf* pBuf, const char *msg1, const char *msg2) {
memset(pBuf->buf, 0 , pBuf->len);
if(msg1) strncat(pBuf->buf, msg1, pBuf->len); if(msg1) strncat(pBuf->buf, msg1, pBuf->len);
int32_t left = pBuf->len - strlen(pBuf->buf); int32_t left = pBuf->len - strlen(pBuf->buf);
if(left > 2 && msg2) { if(left > 2 && msg2) {
...@@ -189,47 +190,39 @@ static int32_t smlBuildInvalidDataMsg(SSmlMsgBuf* pBuf, const char *msg1, const ...@@ -189,47 +190,39 @@ static int32_t smlBuildInvalidDataMsg(SSmlMsgBuf* pBuf, const char *msg1, const
return TSDB_CODE_SML_INVALID_DATA; return TSDB_CODE_SML_INVALID_DATA;
} }
static int32_t smlGenerateSchemaAction(SSchema* pointColField, SHashObj* dbAttrHash, SArray* dbAttrArray, bool isTag, char sTableName[], static int32_t smlGenerateSchemaAction(SSchema* colField, SHashObj* colHash, SSmlKv* kv, bool isTag,
SSchemaAction* action, bool* actionNeeded, SSmlHandle* info) { SSchemaAction* action, bool* actionNeeded, SSmlHandle* info) {
// char fieldName[TSDB_COL_NAME_LEN] = {0}; uint16_t *index = (uint16_t *)taosHashGet(colHash, kv->key, kv->keyLen);
// strcpy(fieldName, pointColField->name); if (index) {
// if (colField[*index].type != kv->type) {
// size_t* pDbIndex = taosHashGet(dbAttrHash, fieldName, strlen(fieldName)); uError("SML:0x%"PRIx64" point type and db type mismatch. key: %s. point type: %d, db type: %d", info->id, kv->key,
// if (pDbIndex) { colField[*index].type, kv->type);
// SSchema* dbAttr = taosArrayGet(dbAttrArray, *pDbIndex); return TSDB_CODE_TSC_INVALID_VALUE;
// assert(strcasecmp(dbAttr->name, pointColField->name) == 0); }
// if (pointColField->type != dbAttr->type) {
// uError("SML:0x%"PRIx64" point type and db type mismatch. key: %s. point type: %d, db type: %d", info->id, pointColField->name, if ((colField[*index].type == TSDB_DATA_TYPE_VARCHAR && (colField[*index].bytes - VARSTR_HEADER_SIZE) < kv->length) ||
// pointColField->type, dbAttr->type); (colField[*index].type == TSDB_DATA_TYPE_NCHAR &&((colField[*index].bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE < kv->length))) {
// return TSDB_CODE_TSC_INVALID_VALUE; if (isTag) {
// } action->action = SCHEMA_ACTION_CHANGE_TAG_SIZE;
// } else {
// if (IS_VAR_DATA_TYPE(pointColField->type) && (pointColField->bytes > dbAttr->bytes)) { action->action = SCHEMA_ACTION_CHANGE_COLUMN_SIZE;
// if (isTag) { }
// action->action = SCHEMA_ACTION_CHANGE_TAG_SIZE; action->alterSTable.field = kv;
// } else { *actionNeeded = true;
// action->action = SCHEMA_ACTION_CHANGE_COLUMN_SIZE; }
// } } else {
// memset(&action->alterSTable, 0, sizeof(SAlterSTableActionInfo)); if (isTag) {
// memcpy(action->alterSTable.sTableName, sTableName, TSDB_TABLE_NAME_LEN); action->action = SCHEMA_ACTION_ADD_TAG;
// action->alterSTable.field = pointColField; } else {
// *actionNeeded = true; action->action = SCHEMA_ACTION_ADD_COLUMN;
// } }
// } else { action->alterSTable.field = kv;
// if (isTag) { *actionNeeded = true;
// action->action = SCHEMA_ACTION_ADD_TAG; }
// } else { if (*actionNeeded) {
// action->action = SCHEMA_ACTION_ADD_COLUMN; uDebug("SML:0x%" PRIx64 " generate schema action. column name: %s, action: %d", info->id, colField->name,
// } action->action);
// memset(&action->alterSTable, 0, sizeof(SAlterSTableActionInfo)); }
// memcpy(action->alterSTable.sTableName, sTableName, TSDB_TABLE_NAME_LEN);
// action->alterSTable.field = pointColField;
// *actionNeeded = true;
// }
// if (*actionNeeded) {
// uDebug("SML:0x%" PRIx64 " generate schema action. column name: %s, action: %d", info->id, fieldName,
// action->action);
// }
return 0; return 0;
} }
...@@ -238,9 +231,9 @@ static int32_t smlBuildColumnDescription(SSmlKv* field, char* buf, int32_t bufSi ...@@ -238,9 +231,9 @@ static int32_t smlBuildColumnDescription(SSmlKv* field, char* buf, int32_t bufSi
char tname[TSDB_TABLE_NAME_LEN] = {0}; char tname[TSDB_TABLE_NAME_LEN] = {0};
memcpy(tname, field->key, field->keyLen); memcpy(tname, field->key, field->keyLen);
if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) { if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) {
int32_t bytes = field->valueLen; // todo int32_t bytes = field->length; // todo
int out = snprintf(buf, bufSize,"`%s` %s(%d)", int out = snprintf(buf, bufSize,"`%s` %s(%d)",
tname,tDataTypes[field->type].name, bytes); tname, tDataTypes[field->type].name, bytes);
*outBytes = out; *outBytes = out;
} else { } else {
int out = snprintf(buf, bufSize, "`%s` %s", tname, tDataTypes[type].name); int out = snprintf(buf, bufSize, "`%s` %s", tname, tDataTypes[type].name);
...@@ -259,7 +252,7 @@ static int32_t smlApplySchemaAction(SSmlHandle* info, SSchemaAction* action) { ...@@ -259,7 +252,7 @@ static int32_t smlApplySchemaAction(SSmlHandle* info, SSchemaAction* action) {
uDebug("SML:0x%"PRIx64" apply schema action. action: %d", info->id, action->action); uDebug("SML:0x%"PRIx64" apply schema action. action: %d", info->id, action->action);
switch (action->action) { switch (action->action) {
case SCHEMA_ACTION_ADD_COLUMN: { case SCHEMA_ACTION_ADD_COLUMN: {
int n = sprintf(result, "alter stable %s add column ", action->alterSTable.sTableName); int n = sprintf(result, "alter stable `%s` add column ", action->alterSTable.sTableName);
smlBuildColumnDescription(action->alterSTable.field, result+n, capacity-n, &outBytes); smlBuildColumnDescription(action->alterSTable.field, result+n, capacity-n, &outBytes);
TAOS_RES* res = taos_query(info->taos, result); //TODO async doAsyncQuery TAOS_RES* res = taos_query(info->taos, result); //TODO async doAsyncQuery
code = taos_errno(res); code = taos_errno(res);
...@@ -282,7 +275,7 @@ static int32_t smlApplySchemaAction(SSmlHandle* info, SSchemaAction* action) { ...@@ -282,7 +275,7 @@ static int32_t smlApplySchemaAction(SSmlHandle* info, SSchemaAction* action) {
break; break;
} }
case SCHEMA_ACTION_ADD_TAG: { case SCHEMA_ACTION_ADD_TAG: {
int n = sprintf(result, "alter stable %s add tag ", action->alterSTable.sTableName); int n = sprintf(result, "alter stable `%s` add tag ", action->alterSTable.sTableName);
smlBuildColumnDescription(action->alterSTable.field, smlBuildColumnDescription(action->alterSTable.field,
result+n, capacity-n, &outBytes); result+n, capacity-n, &outBytes);
TAOS_RES* res = taos_query(info->taos, result); //TODO async doAsyncQuery TAOS_RES* res = taos_query(info->taos, result); //TODO async doAsyncQuery
...@@ -306,7 +299,7 @@ static int32_t smlApplySchemaAction(SSmlHandle* info, SSchemaAction* action) { ...@@ -306,7 +299,7 @@ static int32_t smlApplySchemaAction(SSmlHandle* info, SSchemaAction* action) {
break; break;
} }
case SCHEMA_ACTION_CHANGE_COLUMN_SIZE: { case SCHEMA_ACTION_CHANGE_COLUMN_SIZE: {
int n = sprintf(result, "alter stable %s modify column ", action->alterSTable.sTableName); int n = sprintf(result, "alter stable `%s` modify column ", action->alterSTable.sTableName);
smlBuildColumnDescription(action->alterSTable.field, result+n, smlBuildColumnDescription(action->alterSTable.field, result+n,
capacity-n, &outBytes); capacity-n, &outBytes);
TAOS_RES* res = taos_query(info->taos, result); //TODO async doAsyncQuery TAOS_RES* res = taos_query(info->taos, result); //TODO async doAsyncQuery
...@@ -329,7 +322,7 @@ static int32_t smlApplySchemaAction(SSmlHandle* info, SSchemaAction* action) { ...@@ -329,7 +322,7 @@ static int32_t smlApplySchemaAction(SSmlHandle* info, SSchemaAction* action) {
break; break;
} }
case SCHEMA_ACTION_CHANGE_TAG_SIZE: { case SCHEMA_ACTION_CHANGE_TAG_SIZE: {
int n = sprintf(result, "alter stable %s modify tag ", action->alterSTable.sTableName); int n = sprintf(result, "alter stable `%s` modify tag ", action->alterSTable.sTableName);
smlBuildColumnDescription(action->alterSTable.field, result+n, smlBuildColumnDescription(action->alterSTable.field, result+n,
capacity-n, &outBytes); capacity-n, &outBytes);
TAOS_RES* res = taos_query(info->taos, result); //TODO async doAsyncQuery TAOS_RES* res = taos_query(info->taos, result); //TODO async doAsyncQuery
...@@ -408,6 +401,25 @@ static int32_t smlApplySchemaAction(SSmlHandle* info, SSchemaAction* action) { ...@@ -408,6 +401,25 @@ static int32_t smlApplySchemaAction(SSmlHandle* info, SSchemaAction* action) {
return code; return code;
} }
static int32_t smlProcessSchemaAction(SSmlHandle* info, SSchema* schemaField, SHashObj* schemaHash, SArray *cols, SSchemaAction* action, bool isTag){
int32_t code = TSDB_CODE_SUCCESS;
for (int j = 0; j < taosArrayGetSize(cols); ++j) {
SSmlKv* kv = (SSmlKv*)taosArrayGetP(cols, j);
bool actionNeeded = false;
code = smlGenerateSchemaAction(schemaField, schemaHash, kv, isTag, action, &actionNeeded, info);
if(code != TSDB_CODE_SUCCESS){
return code;
}
if (actionNeeded) {
code = smlApplySchemaAction(info, action);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
}
return TSDB_CODE_SUCCESS;
}
static int32_t smlModifyDBSchemas(SSmlHandle* info) { static int32_t smlModifyDBSchemas(SSmlHandle* info) {
int32_t code = 0; int32_t code = 0;
...@@ -427,27 +439,50 @@ static int32_t smlModifyDBSchemas(SSmlHandle* info) { ...@@ -427,27 +439,50 @@ static int32_t smlModifyDBSchemas(SSmlHandle* info) {
code = catalogGetSTableMeta(info->pCatalog, info->taos->pAppInfo->pTransporter, &ep, &pName, &pTableMeta); code = catalogGetSTableMeta(info->pCatalog, info->taos->pAppInfo->pTransporter, &ep, &pName, &pTableMeta);
if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST || code == TSDB_CODE_MND_INVALID_STB) { if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST || code == TSDB_CODE_MND_INVALID_STB) {
SSchemaAction schemaAction = { SCHEMA_ACTION_CREATE_STABLE, 0}; SSchemaAction schemaAction = { .action = SCHEMA_ACTION_CREATE_STABLE, .createSTable = {0}};
memcpy(schemaAction.createSTable.sTableName, superTable, superTableLen); memcpy(schemaAction.createSTable.sTableName, superTable, superTableLen);
schemaAction.createSTable.tags = sTableData->tags; schemaAction.createSTable.tags = sTableData->tags;
schemaAction.createSTable.fields = sTableData->cols; schemaAction.createSTable.fields = sTableData->cols;
code = smlApplySchemaAction(info, &schemaAction); code = smlApplySchemaAction(info, &schemaAction);
if (code != 0) { if (code != TSDB_CODE_SUCCESS) {
uError("SML:0x%"PRIx64" smlApplySchemaAction failed. can not create %s", info->id, schemaAction.createSTable.sTableName); uError("SML:0x%"PRIx64" smlApplySchemaAction failed. can not create %s", info->id, schemaAction.createSTable.sTableName);
return code; return code;
} }
info->cost.numOfCreateSTables++;
}else if (code == TSDB_CODE_SUCCESS) {
SHashObj *hashTmp = taosHashInit(pTableMeta->tableInfo.numOfTags, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
for(uint16_t i = pTableMeta->tableInfo.numOfColumns; i < pTableMeta->tableInfo.numOfColumns + pTableMeta->tableInfo.numOfTags; i++){
taosHashPut(hashTmp, pTableMeta->schema[i].name, strlen(pTableMeta->schema[i].name), &i, SHORT_BYTES);
}
code = catalogGetSTableMeta(info->pCatalog, info->taos->pAppInfo->pTransporter, &ep, &pName, &pTableMeta); SSchemaAction schemaAction = {.alterSTable = {0}};
if (code != 0) { memcpy(schemaAction.createSTable.sTableName, superTable, superTableLen);
uError("SML:0x%"PRIx64" catalogGetSTableMeta failed. super table name %s", info->id, schemaAction.createSTable.sTableName); code = smlProcessSchemaAction(info, pTableMeta->schema, hashTmp, sTableData->tags, &schemaAction, true);
if (code != TSDB_CODE_SUCCESS) {
taosHashCleanup(hashTmp);
return code;
}
taosHashClear(hashTmp);
for(uint16_t i = 0; i < pTableMeta->tableInfo.numOfColumns; i++){
taosHashPut(hashTmp, pTableMeta->schema[i].name, strlen(pTableMeta->schema[i].name), &i, SHORT_BYTES);
}
code = smlProcessSchemaAction(info, pTableMeta->schema, hashTmp, sTableData->cols, &schemaAction, false);
taosHashCleanup(hashTmp);
if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
info->cost.numOfCreateSTables++;
}else if (code == TSDB_CODE_SUCCESS) {
} else { } else {
uError("SML:0x%"PRIx64" load table meta error: %s", info->id, tstrerror(code)); uError("SML:0x%"PRIx64" load table meta error: %s", info->id, tstrerror(code));
return code; return code;
} }
if(pTableMeta) taosMemoryFree(pTableMeta);
code = catalogGetSTableMeta(info->pCatalog, info->taos->pAppInfo->pTransporter, &ep, &pName, &pTableMeta);
if (code != TSDB_CODE_SUCCESS) {
uError("SML:0x%"PRIx64" catalogGetSTableMeta failed. super table name %s", info->id, (char*)superTable);
return code;
}
sTableData->tableMeta = pTableMeta; sTableData->tableMeta = pTableMeta;
tableMetaSml = (SSmlSTableMeta**)taosHashIterate(info->superTables, tableMetaSml); tableMetaSml = (SSmlSTableMeta**)taosHashIterate(info->superTables, tableMetaSml);
...@@ -507,7 +542,7 @@ static int32_t smlModifyDBSchemas(SSmlHandle* info) { ...@@ -507,7 +542,7 @@ static int32_t smlModifyDBSchemas(SSmlHandle* info) {
static bool smlParseNumber(SSmlKv *kvVal, SSmlMsgBuf *msg){ static bool smlParseNumber(SSmlKv *kvVal, SSmlMsgBuf *msg){
const char *pVal = kvVal->value; const char *pVal = kvVal->value;
int32_t len = kvVal->valueLen; int32_t len = kvVal->length;
char *endptr = NULL; char *endptr = NULL;
double result = strtod(pVal, &endptr); double result = strtod(pVal, &endptr);
if(pVal == endptr){ if(pVal == endptr){
...@@ -591,7 +626,7 @@ static bool smlParseNumber(SSmlKv *kvVal, SSmlMsgBuf *msg){ ...@@ -591,7 +626,7 @@ static bool smlParseNumber(SSmlKv *kvVal, SSmlMsgBuf *msg){
static bool smlParseBool(SSmlKv *kvVal) { static bool smlParseBool(SSmlKv *kvVal) {
const char *pVal = kvVal->value; const char *pVal = kvVal->value;
int32_t len = kvVal->valueLen; int32_t len = kvVal->length;
if ((len == 1) && pVal[0] == 't') { if ((len == 1) && pVal[0] == 't') {
kvVal->i = true; kvVal->i = true;
return true; return true;
...@@ -690,7 +725,7 @@ static int8_t smlGetTsTypeByLen(int32_t len) { ...@@ -690,7 +725,7 @@ static int8_t smlGetTsTypeByLen(int32_t len) {
if (len == TSDB_TIME_PRECISION_SEC_DIGITS) { if (len == TSDB_TIME_PRECISION_SEC_DIGITS) {
return TSDB_TIME_PRECISION_SECONDS; return TSDB_TIME_PRECISION_SECONDS;
} else if (len == TSDB_TIME_PRECISION_MILLI_DIGITS) { } else if (len == TSDB_TIME_PRECISION_MILLI_DIGITS) {
return TSDB_TIME_PRECISION_MILLI_DIGITS; return TSDB_TIME_PRECISION_MILLI;
} else { } else {
return -1; return -1;
} }
...@@ -756,9 +791,12 @@ static int32_t smlParseTS(SSmlHandle* info, const char* data, int32_t len, SArra ...@@ -756,9 +791,12 @@ static int32_t smlParseTS(SSmlHandle* info, const char* data, int32_t len, SArra
int64_t ts = 0; int64_t ts = 0;
if(info->protocol == TSDB_SML_LINE_PROTOCOL){ if(info->protocol == TSDB_SML_LINE_PROTOCOL){
ts = smlParseInfluxTime(info, data, len); ts = smlParseInfluxTime(info, data, len);
}else{ }else if(info->protocol == TSDB_SML_TELNET_PROTOCOL){
ts = smlParseOpenTsdbTime(info, data, len); ts = smlParseOpenTsdbTime(info, data, len);
}else{
ASSERT(0);
} }
if(ts == -1) return TSDB_CODE_TSC_INVALID_TIME_STAMP; if(ts == -1) return TSDB_CODE_TSC_INVALID_TIME_STAMP;
// add ts to // add ts to
...@@ -778,18 +816,16 @@ static int32_t smlParseTS(SSmlHandle* info, const char* data, int32_t len, SArra ...@@ -778,18 +816,16 @@ static int32_t smlParseTS(SSmlHandle* info, const char* data, int32_t len, SArra
static bool smlParseValue(SSmlKv *pVal, SSmlMsgBuf *msg) { static bool smlParseValue(SSmlKv *pVal, SSmlMsgBuf *msg) {
//binary //binary
if (smlIsBinary(pVal->value, pVal->valueLen)) { if (smlIsBinary(pVal->value, pVal->length)) {
pVal->type = TSDB_DATA_TYPE_BINARY; pVal->type = TSDB_DATA_TYPE_BINARY;
pVal->valueLen -= BINARY_ADD_LEN; pVal->length -= BINARY_ADD_LEN;
pVal->length = pVal->valueLen;
pVal->value += (BINARY_ADD_LEN - 1); pVal->value += (BINARY_ADD_LEN - 1);
return true; return true;
} }
//nchar //nchar
if (smlIsNchar(pVal->value, pVal->valueLen)) { if (smlIsNchar(pVal->value, pVal->length)) {
pVal->type = TSDB_DATA_TYPE_NCHAR; pVal->type = TSDB_DATA_TYPE_NCHAR;
pVal->valueLen -= NCHAR_ADD_LEN; pVal->length -= NCHAR_ADD_LEN;
pVal->length = pVal->valueLen;
pVal->value += (NCHAR_ADD_LEN - 1); pVal->value += (NCHAR_ADD_LEN - 1);
return true; return true;
} }
...@@ -953,8 +989,8 @@ static int32_t smlParseTelnetTags(const char* data, int32_t len, SArray *cols, S ...@@ -953,8 +989,8 @@ static int32_t smlParseTelnetTags(const char* data, int32_t len, SArray *cols, S
kv->key = key; kv->key = key;
kv->keyLen = keyLen; kv->keyLen = keyLen;
kv->value = value; kv->value = value;
kv->valueLen = valueLen; kv->length = valueLen;
kv->type = TSDB_DATA_TYPE_NCHAR; kv->type = TSDB_DATA_TYPE_NCHAR; //todo
if(cols) taosArrayPush(cols, &kv); if(cols) taosArrayPush(cols, &kv);
} }
...@@ -1002,7 +1038,7 @@ static int32_t smlParseTelnetString(SSmlHandle *info, const char* sql, SSmlTable ...@@ -1002,7 +1038,7 @@ static int32_t smlParseTelnetString(SSmlHandle *info, const char* sql, SSmlTable
kv->key = VALUE; kv->key = VALUE;
kv->keyLen = VALUE_LEN; kv->keyLen = VALUE_LEN;
kv->value = value; kv->value = value;
kv->valueLen = valueLen; kv->length = valueLen;
if(!smlParseValue(kv, &info->msgBuf) || kv->type == TSDB_DATA_TYPE_BINARY if(!smlParseValue(kv, &info->msgBuf) || kv->type == TSDB_DATA_TYPE_BINARY
|| kv->type == TSDB_DATA_TYPE_NCHAR || kv->type == TSDB_DATA_TYPE_BOOL){ || kv->type == TSDB_DATA_TYPE_NCHAR || kv->type == TSDB_DATA_TYPE_BOOL){
return TSDB_CODE_SML_INVALID_DATA; return TSDB_CODE_SML_INVALID_DATA;
...@@ -1028,7 +1064,7 @@ static int32_t smlParseCols(const char* data, int32_t len, SArray *cols, bool is ...@@ -1028,7 +1064,7 @@ static int32_t smlParseCols(const char* data, int32_t len, SArray *cols, bool is
kv->key = TAG; kv->key = TAG;
kv->keyLen = TAG_LEN; kv->keyLen = TAG_LEN;
kv->value = TAG; kv->value = TAG;
kv->valueLen = TAG_LEN; kv->length = TAG_LEN;
kv->type = TSDB_DATA_TYPE_NCHAR; kv->type = TSDB_DATA_TYPE_NCHAR;
if(cols) taosArrayPush(cols, &kv); if(cols) taosArrayPush(cols, &kv);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -1086,7 +1122,7 @@ static int32_t smlParseCols(const char* data, int32_t len, SArray *cols, bool is ...@@ -1086,7 +1122,7 @@ static int32_t smlParseCols(const char* data, int32_t len, SArray *cols, bool is
kv->key = key; kv->key = key;
kv->keyLen = keyLen; kv->keyLen = keyLen;
kv->value = value; kv->value = value;
kv->valueLen = valueLen; kv->length = valueLen;
if(isTag){ if(isTag){
kv->type = TSDB_DATA_TYPE_NCHAR; kv->type = TSDB_DATA_TYPE_NCHAR;
}else{ }else{
...@@ -1142,73 +1178,40 @@ static int32_t smlParseCols(const char* data, int32_t len, SArray *cols, bool is ...@@ -1142,73 +1178,40 @@ static int32_t smlParseCols(const char* data, int32_t len, SArray *cols, bool is
// return TSDB_CODE_SUCCESS; // return TSDB_CODE_SUCCESS;
//} //}
static bool smlUpdateMeta(SSmlSTableMeta* tableMeta, SArray *tags, SArray *cols, SSmlMsgBuf *msg){ static bool smlUpdateMeta(SHashObj *metaHash, SArray *metaArray, SArray *cols, SSmlMsgBuf *msg){
if(tags){ for (int i = 0; i < taosArrayGetSize(cols); ++i) { //jump timestamp
for (int i = 0; i < taosArrayGetSize(tags); ++i) { SSmlKv *kv = (SSmlKv *)taosArrayGetP(cols, i);
SSmlKv *kv = (SSmlKv *)taosArrayGetP(tags, i);
ASSERT(kv->type == TSDB_DATA_TYPE_NCHAR);
uint8_t *index = (uint8_t *)taosHashGet(tableMeta->tagHash, kv->key, kv->keyLen);
if(index){
SSmlKv **value = (SSmlKv **)taosArrayGet(tableMeta->tags, *index);
ASSERT((*value)->type == TSDB_DATA_TYPE_NCHAR);
if(kv->valueLen > (*value)->valueLen){ // tags type is nchar
*value = kv;
}
}else{
size_t tmp = taosArrayGetSize(tableMeta->tags);
ASSERT(tmp <= UINT8_MAX);
uint8_t size = tmp;
taosArrayPush(tableMeta->tags, &kv);
taosHashPut(tableMeta->tagHash, kv->key, kv->keyLen, &size, CHAR_BYTES);
}
}
}
if(cols){
for (int i = 1; i < taosArrayGetSize(cols); ++i) { //jump timestamp
SSmlKv *kv = (SSmlKv *)taosArrayGetP(cols, i);
int16_t *index = (int16_t *)taosHashGet(tableMeta->fieldHash, kv->key, kv->keyLen); int16_t *index = (int16_t *)taosHashGet(metaHash, kv->key, kv->keyLen);
if(index){ if(index){
SSmlKv **value = (SSmlKv **)taosArrayGet(tableMeta->cols, *index); SSmlKv **value = (SSmlKv **)taosArrayGet(metaArray, *index);
if(kv->type != (*value)->type){ if(kv->type != (*value)->type){
smlBuildInvalidDataMsg(msg, "the type is not the same like before", kv->key); smlBuildInvalidDataMsg(msg, "the type is not the same like before", kv->key);
return false; return false;
}else{ }else{
if(IS_VAR_DATA_TYPE(kv->type)){ // update string len, if bigger if(IS_VAR_DATA_TYPE(kv->type)){ // update string len, if bigger
if(kv->valueLen > (*value)->valueLen){ if(kv->length > (*value)->length){
*value = kv; *value = kv;
}
} }
} }
}else{
size_t tmp = taosArrayGetSize(tableMeta->cols);
ASSERT(tmp <= INT16_MAX);
int16_t size = tmp;
taosArrayPush(tableMeta->cols, &kv);
taosHashPut(tableMeta->fieldHash, kv->key, kv->keyLen, &size, SHORT_BYTES);
} }
}else{
size_t tmp = taosArrayGetSize(metaArray);
ASSERT(tmp <= INT16_MAX);
int16_t size = tmp;
taosArrayPush(metaArray, &kv);
taosHashPut(metaHash, kv->key, kv->keyLen, &size, SHORT_BYTES);
} }
} }
return true; return true;
} }
static void smlInsertMeta(SSmlSTableMeta* tableMeta, SArray *tags, SArray *cols){ static void smlInsertMeta(SHashObj *metaHash, SArray *metaArray, SArray *cols){
if(tags){ for (int16_t i = 0; i < taosArrayGetSize(cols); ++i) {
for (uint8_t i = 0; i < taosArrayGetSize(tags); ++i) { SSmlKv *kv = (SSmlKv *)taosArrayGetP(cols, i);
SSmlKv *kv = (SSmlKv *)taosArrayGetP(tags, i); taosArrayPush(metaArray, &kv);
taosArrayPush(tableMeta->tags, &kv); taosHashPut(metaHash, kv->key, kv->keyLen, &i, SHORT_BYTES);
taosHashPut(tableMeta->tagHash, kv->key, kv->keyLen, &i, CHAR_BYTES);
}
}
if(cols){
for (int16_t i = 0; i < taosArrayGetSize(cols); ++i) {
SSmlKv *kv = (SSmlKv *)taosArrayGetP(cols, i);
taosArrayPush(tableMeta->cols, &kv);
taosHashPut(tableMeta->fieldHash, kv->key, kv->keyLen, &i, SHORT_BYTES);
}
} }
} }
...@@ -1236,12 +1239,16 @@ cleanup: ...@@ -1236,12 +1239,16 @@ cleanup:
return NULL; return NULL;
} }
static void smlDestroyTableInfo(SSmlTableInfo *tag, bool format){ static void smlDestroyTableInfo(SSmlHandle* info, SSmlTableInfo *tag){
if(format){ if(info->dataFormat){
for(size_t i = 0; i < taosArrayGetSize(tag->cols); i++){ for(size_t i = 0; i < taosArrayGetSize(tag->cols); i++){
SArray *kvArray = (SArray *)taosArrayGetP(tag->cols, i); SArray *kvArray = (SArray *)taosArrayGetP(tag->cols, i);
for (int j = 0; j < taosArrayGetSize(kvArray); ++j) { for (int j = 0; j < taosArrayGetSize(kvArray); ++j) {
void *p = taosArrayGetP(kvArray, j); SSmlKv *p = (SSmlKv *)taosArrayGetP(kvArray, j);
if(info->protocol == TSDB_SML_JSON_PROTOCOL &&
(p->type == TSDB_DATA_TYPE_NCHAR || p->type == TSDB_DATA_TYPE_BINARY)){
taosMemoryFree((void*)p->value);
}
taosMemoryFree(p); taosMemoryFree(p);
} }
taosArrayDestroy(kvArray); taosArrayDestroy(kvArray);
...@@ -1257,6 +1264,19 @@ static void smlDestroyTableInfo(SSmlTableInfo *tag, bool format){ ...@@ -1257,6 +1264,19 @@ static void smlDestroyTableInfo(SSmlTableInfo *tag, bool format){
taosHashCleanup(kvHash); taosHashCleanup(kvHash);
} }
} }
for(size_t i = 0; i < taosArrayGetSize(tag->tags); i++){
SSmlKv *p = (SSmlKv *)taosArrayGetP(tag->tags, i);
if(info->protocol == TSDB_SML_JSON_PROTOCOL){
taosMemoryFree((void*)p->key);
if(p->type == TSDB_DATA_TYPE_NCHAR || p->type == TSDB_DATA_TYPE_BINARY){
taosMemoryFree((void*)p->value);
}
}
taosMemoryFree(p);
}
if(info->protocol == TSDB_SML_JSON_PROTOCOL && tag->sTableName){
taosMemoryFree((void*)tag->sTableName);
}
taosArrayDestroy(tag->cols); taosArrayDestroy(tag->cols);
taosArrayDestroy(tag->tags); taosArrayDestroy(tag->tags);
taosMemoryFree(tag); taosMemoryFree(tag);
...@@ -1293,8 +1313,8 @@ static SSmlSTableMeta* smlBuildSTableMeta(){ ...@@ -1293,8 +1313,8 @@ static SSmlSTableMeta* smlBuildSTableMeta(){
goto cleanup; goto cleanup;
} }
meta->fieldHash = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); meta->colHash = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
if (meta->fieldHash == NULL) { if (meta->colHash == NULL) {
uError("SML:smlBuildSTableMeta failed to allocate memory"); uError("SML:smlBuildSTableMeta failed to allocate memory");
goto cleanup; goto cleanup;
} }
...@@ -1319,7 +1339,7 @@ cleanup: ...@@ -1319,7 +1339,7 @@ cleanup:
static void smlDestroySTableMeta(SSmlSTableMeta *meta){ static void smlDestroySTableMeta(SSmlSTableMeta *meta){
taosHashCleanup(meta->tagHash); taosHashCleanup(meta->tagHash);
taosHashCleanup(meta->fieldHash); taosHashCleanup(meta->colHash);
taosArrayDestroy(meta->tags); taosArrayDestroy(meta->tags);
taosArrayDestroy(meta->cols); taosArrayDestroy(meta->cols);
taosMemoryFree(meta->tableMeta); taosMemoryFree(meta->tableMeta);
...@@ -1341,7 +1361,7 @@ static void smlDestroyInfo(SSmlHandle* info){ ...@@ -1341,7 +1361,7 @@ static void smlDestroyInfo(SSmlHandle* info){
// destroy info->childTables // destroy info->childTables
void** p1 = (void**)taosHashIterate(info->childTables, NULL); void** p1 = (void**)taosHashIterate(info->childTables, NULL);
while (p1) { while (p1) {
smlDestroyTableInfo((SSmlTableInfo*)(*p1), info->dataFormat); smlDestroyTableInfo(info, (SSmlTableInfo*)(*p1));
p1 = (void**)taosHashIterate(info->childTables, p1); p1 = (void**)taosHashIterate(info->childTables, p1);
} }
taosHashCleanup(info->childTables); taosHashCleanup(info->childTables);
...@@ -1357,7 +1377,9 @@ static void smlDestroyInfo(SSmlHandle* info){ ...@@ -1357,7 +1377,9 @@ static void smlDestroyInfo(SSmlHandle* info){
// destroy info->pVgHash // destroy info->pVgHash
taosHashCleanup(info->pVgHash); taosHashCleanup(info->pVgHash);
taosHashCleanup(info->dumplicateKey); taosHashCleanup(info->dumplicateKey);
if(!info->dataFormat){
taosArrayDestroy(info->colsContainer);
}
taosMemoryFreeClear(info); taosMemoryFreeClear(info);
} }
...@@ -1682,8 +1704,7 @@ static int32_t smlConvertJSONString(SSmlKv *pVal, char* typeStr, cJSON *value) { ...@@ -1682,8 +1704,7 @@ static int32_t smlConvertJSONString(SSmlKv *pVal, char* typeStr, cJSON *value) {
return TSDB_CODE_TSC_INVALID_JSON_TYPE; return TSDB_CODE_TSC_INVALID_JSON_TYPE;
} }
pVal->length = (int16_t)strlen(value->valuestring); pVal->length = (int16_t)strlen(value->valuestring);
pVal->valueLen = pVal->length; return smlJsonCreateSring(&pVal->value, value->valuestring, pVal->length);
return smlJsonCreateSring(&pVal->value, value->valuestring, pVal->valueLen);
} }
static int32_t smlParseValueFromJSONObj(cJSON *root, SSmlKv *kv) { static int32_t smlParseValueFromJSONObj(cJSON *root, SSmlKv *kv) {
...@@ -1754,7 +1775,7 @@ static int32_t smlParseValueFromJSON(cJSON *root, SSmlKv *kv) { ...@@ -1754,7 +1775,7 @@ static int32_t smlParseValueFromJSON(cJSON *root, SSmlKv *kv) {
* user configured parameter tsDefaultJSONStrType * user configured parameter tsDefaultJSONStrType
*/ */
char *tsDefaultJSONStrType = "binary"; //todo char *tsDefaultJSONStrType = "nchar"; //todo
smlConvertJSONString(kv, tsDefaultJSONStrType, root); smlConvertJSONString(kv, tsDefaultJSONStrType, root);
break; break;
} }
...@@ -1994,14 +2015,18 @@ static int32_t smlParseInfluxLine(SSmlHandle* info, const char* sql) { ...@@ -1994,14 +2015,18 @@ static int32_t smlParseInfluxLine(SSmlHandle* info, const char* sql) {
SSmlSTableMeta** tableMeta = (SSmlSTableMeta**)taosHashGet(info->superTables, elements.measure, elements.measureLen); SSmlSTableMeta** tableMeta = (SSmlSTableMeta**)taosHashGet(info->superTables, elements.measure, elements.measureLen);
if(tableMeta){ // update meta if(tableMeta){ // update meta
ret = smlUpdateMeta(*tableMeta, hasTable ? NULL : (*oneTable)->tags, cols, &info->msgBuf); ret = smlUpdateMeta((*tableMeta)->colHash, (*tableMeta)->cols, cols, &info->msgBuf);
if(!hasTable && ret){
ret = smlUpdateMeta((*tableMeta)->tagHash, (*tableMeta)->tags, (*oneTable)->tags, &info->msgBuf);
}
if(!ret){ if(!ret){
uError("SML:0x%"PRIx64" smlUpdateMeta failed", info->id); uError("SML:0x%"PRIx64" smlUpdateMeta failed", info->id);
return TSDB_CODE_SML_INVALID_DATA; return TSDB_CODE_SML_INVALID_DATA;
} }
}else{ }else{
SSmlSTableMeta *meta = smlBuildSTableMeta(); SSmlSTableMeta *meta = smlBuildSTableMeta();
smlInsertMeta(meta, (*oneTable)->tags, cols); smlInsertMeta(meta->tagHash, meta->tags, (*oneTable)->tags);
smlInsertMeta(meta->colHash, meta->cols, cols);
taosHashPut(info->superTables, elements.measure, elements.measureLen, &meta, POINTER_BYTES); taosHashPut(info->superTables, elements.measure, elements.measureLen, &meta, POINTER_BYTES);
} }
...@@ -2026,15 +2051,15 @@ static int32_t smlParseTelnetLine(SSmlHandle* info, void *data) { ...@@ -2026,15 +2051,15 @@ static int32_t smlParseTelnetLine(SSmlHandle* info, void *data) {
} }
if(info->protocol == TSDB_SML_TELNET_PROTOCOL){ if(info->protocol == TSDB_SML_TELNET_PROTOCOL){
smlParseTelnetString(info, (const char*)data, tinfo, cols); ret = smlParseTelnetString(info, (const char*)data, tinfo, cols);
}else if(info->protocol == TSDB_SML_JSON_PROTOCOL){ }else if(info->protocol == TSDB_SML_JSON_PROTOCOL){
smlParseJSONString(info, (cJSON *)data, tinfo, cols); ret = smlParseJSONString(info, (cJSON *)data, tinfo, cols);
}else{ }else{
ASSERT(0); ASSERT(0);
} }
if(ret != TSDB_CODE_SUCCESS){ if(ret != TSDB_CODE_SUCCESS){
uError("SML:0x%"PRIx64" smlParseTelnetLine failed", info->id); uError("SML:0x%"PRIx64" smlParseTelnetLine failed", info->id);
smlDestroyTableInfo(tinfo, true); smlDestroyTableInfo(info, tinfo);
taosArrayDestroy(cols); taosArrayDestroy(cols);
return ret; return ret;
} }
...@@ -2057,20 +2082,24 @@ static int32_t smlParseTelnetLine(SSmlHandle* info, void *data) { ...@@ -2057,20 +2082,24 @@ static int32_t smlParseTelnetLine(SSmlHandle* info, void *data) {
oneTable = &tinfo; oneTable = &tinfo;
hasTable = false; hasTable = false;
}else{ }else{
smlDestroyTableInfo(tinfo, true); smlDestroyTableInfo(info, tinfo);
} }
taosArrayPush((*oneTable)->cols, &cols); taosArrayPush((*oneTable)->cols, &cols);
SSmlSTableMeta** tableMeta = (SSmlSTableMeta** )taosHashGet(info->superTables, (*oneTable)->sTableName, (*oneTable)->sTableNameLen); SSmlSTableMeta** tableMeta = (SSmlSTableMeta** )taosHashGet(info->superTables, (*oneTable)->sTableName, (*oneTable)->sTableNameLen);
if(tableMeta){ // update meta if(tableMeta){ // update meta
ret = smlUpdateMeta(*tableMeta, hasTable ? NULL : (*oneTable)->tags, cols, &info->msgBuf); ret = smlUpdateMeta((*tableMeta)->colHash, (*tableMeta)->cols, cols, &info->msgBuf);
if(!hasTable && ret){
ret = smlUpdateMeta((*tableMeta)->tagHash, (*tableMeta)->tags, (*oneTable)->tags, &info->msgBuf);
}
if(!ret){ if(!ret){
uError("SML:0x%"PRIx64" smlUpdateMeta failed", info->id); uError("SML:0x%"PRIx64" smlUpdateMeta failed", info->id);
return TSDB_CODE_SML_INVALID_DATA; return TSDB_CODE_SML_INVALID_DATA;
} }
}else{ }else{
SSmlSTableMeta *meta = smlBuildSTableMeta(); SSmlSTableMeta *meta = smlBuildSTableMeta();
smlInsertMeta(meta, (*oneTable)->tags, cols); smlInsertMeta(meta->tagHash, meta->tags, (*oneTable)->tags);
smlInsertMeta(meta->colHash, meta->cols, cols);
taosHashPut(info->superTables, (*oneTable)->sTableName, (*oneTable)->sTableNameLen, &meta, POINTER_BYTES); taosHashPut(info->superTables, (*oneTable)->sTableName, (*oneTable)->sTableNameLen, &meta, POINTER_BYTES);
} }
......
...@@ -228,7 +228,7 @@ TEST(testCase, smlParseCols_tag_Test) { ...@@ -228,7 +228,7 @@ TEST(testCase, smlParseCols_tag_Test) {
ASSERT_EQ(strncasecmp(kv->key, "cbin", 4), 0); ASSERT_EQ(strncasecmp(kv->key, "cbin", 4), 0);
ASSERT_EQ(kv->keyLen, 4); ASSERT_EQ(kv->keyLen, 4);
ASSERT_EQ(kv->type, TSDB_DATA_TYPE_NCHAR); ASSERT_EQ(kv->type, TSDB_DATA_TYPE_NCHAR);
ASSERT_EQ(kv->valueLen, 17); ASSERT_EQ(kv->length, 17);
ASSERT_EQ(strncasecmp(kv->value, "\"passit", 7), 0); ASSERT_EQ(strncasecmp(kv->value, "\"passit", 7), 0);
taosMemoryFree(kv); taosMemoryFree(kv);
...@@ -237,7 +237,7 @@ TEST(testCase, smlParseCols_tag_Test) { ...@@ -237,7 +237,7 @@ TEST(testCase, smlParseCols_tag_Test) {
ASSERT_EQ(strncasecmp(kv->key, "cf64", 4), 0); ASSERT_EQ(strncasecmp(kv->key, "cf64", 4), 0);
ASSERT_EQ(kv->keyLen, 4); ASSERT_EQ(kv->keyLen, 4);
ASSERT_EQ(kv->type, TSDB_DATA_TYPE_NCHAR); ASSERT_EQ(kv->type, TSDB_DATA_TYPE_NCHAR);
ASSERT_EQ(kv->valueLen, 7); ASSERT_EQ(kv->length, 7);
ASSERT_EQ(strncasecmp(kv->value, "4.31f64", 7), 0); ASSERT_EQ(strncasecmp(kv->value, "4.31f64", 7), 0);
taosMemoryFree(kv); taosMemoryFree(kv);
...@@ -259,7 +259,7 @@ TEST(testCase, smlParseCols_tag_Test) { ...@@ -259,7 +259,7 @@ TEST(testCase, smlParseCols_tag_Test) {
ASSERT_EQ(strncasecmp(kv->key, TAG, strlen(TAG)), 0); ASSERT_EQ(strncasecmp(kv->key, TAG, strlen(TAG)), 0);
ASSERT_EQ(kv->keyLen, strlen(TAG)); ASSERT_EQ(kv->keyLen, strlen(TAG));
ASSERT_EQ(kv->type, TSDB_DATA_TYPE_NCHAR); ASSERT_EQ(kv->type, TSDB_DATA_TYPE_NCHAR);
ASSERT_EQ(kv->valueLen, strlen(TAG)); ASSERT_EQ(kv->length, strlen(TAG));
ASSERT_EQ(strncasecmp(kv->value, TAG, strlen(TAG)), 0); ASSERT_EQ(strncasecmp(kv->value, TAG, strlen(TAG)), 0);
taosMemoryFree(kv); taosMemoryFree(kv);
...@@ -499,12 +499,14 @@ TEST(testCase, smlProcess_influx_Test) { ...@@ -499,12 +499,14 @@ TEST(testCase, smlProcess_influx_Test) {
TAOS_RES *res = taos_query(taos, "select * from t_6885c584b98481584ee13dac399e173d"); TAOS_RES *res = taos_query(taos, "select * from t_6885c584b98481584ee13dac399e173d");
ASSERT_NE(res, nullptr); ASSERT_NE(res, nullptr);
int fieldNum = taos_field_count(res); int fieldNum = taos_field_count(res);
ASSERT_EQ(fieldNum, 11); ASSERT_EQ(fieldNum, 5);
int rowNum = taos_affected_rows(res); int rowNum = taos_affected_rows(res);
ASSERT_EQ(rowNum, 2); ASSERT_EQ(rowNum, 2);
for (int i = 0; i < rowNum; ++i) { for (int i = 0; i < rowNum; ++i) {
TAOS_ROW rows = taos_fetch_row(res); TAOS_ROW rows = taos_fetch_row(res);
} }
taos_free_result(res);
smlDestroyInfo(info);
} }
// different types // different types
...@@ -530,245 +532,542 @@ TEST(testCase, smlParseLine_error_Test) { ...@@ -530,245 +532,542 @@ TEST(testCase, smlParseLine_error_Test) {
}; };
int ret = smlProcess(info, (char **)sql, sizeof(sql)/sizeof(sql[0])); int ret = smlProcess(info, (char **)sql, sizeof(sql)/sizeof(sql[0]));
ASSERT_NE(ret, 0); ASSERT_NE(ret, 0);
destroyRequest(request);
smlDestroyInfo(info);
}
TEST(testCase, smlGetTimestampLen_Test) {
uint8_t len = smlGetTimestampLen(0);
ASSERT_EQ(len, 1);
len = smlGetTimestampLen(1);
ASSERT_EQ(len, 1);
len = smlGetTimestampLen(10);
ASSERT_EQ(len, 2);
len = smlGetTimestampLen(390);
ASSERT_EQ(len, 3);
len = smlGetTimestampLen(-1);
ASSERT_EQ(len, 1);
len = smlGetTimestampLen(-10);
ASSERT_EQ(len, 2);
len = smlGetTimestampLen(-390);
ASSERT_EQ(len, 3);
}
TEST(testCase, smlProcess_telnet_Test) {
TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT_NE(taos, nullptr);
TAOS_RES* pRes = taos_query(taos, "create database if not exists sml_db");
taos_free_result(pRes);
pRes = taos_query(taos, "use sml_db");
taos_free_result(pRes);
SRequestObj *request = (SRequestObj *)createRequest((STscObj*)taos, NULL, NULL, TSDB_SQL_INSERT);
ASSERT_NE(request, nullptr);
SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_TELNET_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS, true);
ASSERT_NE(info, nullptr);
const char *sql[5] = {
"sys.if.bytes.out 1479496100 1.3E0 host=web01 interface=eth0",
"sys.if.bytes.out 1479496101 1.3E1 interface=eth0 host=web01",
"sys.if.bytes.out 1479496102 1.3E3 network=tcp",
"sys.procs.running 1479496100 42 host=web01 ",
" sys.procs.running 1479496200 42 host=web01=4"
};
int ret = smlProcess(info, (char**)sql, sizeof(sql)/sizeof(sql[0]));
ASSERT_EQ(ret, 0);
TAOS_RES *res = taos_query(taos, "select * from t_8c30283b3c4131a071d1e16cf6d7094a");
ASSERT_NE(res, nullptr);
int fieldNum = taos_field_count(res);
ASSERT_EQ(fieldNum, 2);
int rowNum = taos_affected_rows(res);
ASSERT_EQ(rowNum, 1);
for (int i = 0; i < rowNum; ++i) {
TAOS_ROW rows = taos_fetch_row(res);
}
taos_free_result(pRes);
res = taos_query(taos, "select * from t_6931529054e5637ca92c78a1ad441961");
ASSERT_NE(res, nullptr);
fieldNum = taos_field_count(res);
ASSERT_EQ(fieldNum, 2);
rowNum = taos_affected_rows(res);
ASSERT_EQ(rowNum, 2);
for (int i = 0; i < rowNum; ++i) {
TAOS_ROW rows = taos_fetch_row(res);
}
taos_free_result(pRes);
smlDestroyInfo(info);
}
TEST(testCase, smlProcess_json1_Test) {
TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT_NE(taos, nullptr);
TAOS_RES *pRes = taos_query(taos, "create database if not exists sml_db");
taos_free_result(pRes);
pRes = taos_query(taos, "use sml_db");
taos_free_result(pRes);
SRequestObj *request = (SRequestObj *)createRequest((STscObj *)taos, NULL, NULL, TSDB_SQL_INSERT);
ASSERT_NE(request, nullptr);
SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_JSON_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS, true);
ASSERT_NE(info, nullptr);
const char *sql =
"[\n"
" {\n"
" \"metric\": \"sys.cpu.nice\",\n"
" \"timestamp\": 1346846400,\n"
" \"value\": 18,\n"
" \"tags\": {\n"
" \"host\": \"web01\",\n"
" \"dc\": \"lga\"\n"
" }\n"
" },\n"
" {\n"
" \"metric\": \"sys.cpu.nice\",\n"
" \"timestamp\": 1346846400,\n"
" \"value\": 9,\n"
" \"tags\": {\n"
" \"host\": \"web02\",\n"
" \"dc\": \"lga\"\n"
" }\n"
" }\n"
"]";
int ret = smlProcess(info, (char **)(&sql), -1);
ASSERT_EQ(ret, 0);
TAOS_RES *res = taos_query(taos, "select * from t_cb27a7198d637b4f1c6464bd73f756a7");
ASSERT_NE(res, nullptr);
int fieldNum = taos_field_count(res);
ASSERT_EQ(fieldNum, 2);
// int rowNum = taos_affected_rows(res);
// ASSERT_EQ(rowNum, 1);
// for (int i = 0; i < rowNum; ++i) {
// TAOS_ROW rows = taos_fetch_row(res);
// }
taos_free_result(pRes);
smlDestroyInfo(info);
}
TEST(testCase, smlProcess_json2_Test) {
TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT_NE(taos, nullptr);
TAOS_RES *pRes = taos_query(taos, "create database if not exists sml_db");
taos_free_result(pRes);
pRes = taos_query(taos, "use sml_db");
taos_free_result(pRes);
SRequestObj *request = (SRequestObj *)createRequest((STscObj *)taos, NULL, NULL, TSDB_SQL_INSERT);
ASSERT_NE(request, nullptr);
SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_JSON_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS, true);
ASSERT_NE(info, nullptr);
const char *sql =
"{\n"
" \"metric\": \"meter_current0\",\n"
" \"timestamp\": {\n"
" \"value\" : 1346846400,\n"
" \"type\" : \"s\"\n"
" },\n"
" \"value\": {\n"
" \"value\" : 10.3,\n"
" \"type\" : \"i64\"\n"
" },\n"
" \"tags\": {\n"
" \"groupid\": { \n"
" \"value\" : 2,\n"
" \"type\" : \"bigint\"\n"
" },\n"
" \"location\": { \n"
" \"value\" : \"北京\",\n"
" \"type\" : \"binary\"\n"
" },\n"
" \"id\": \"d1001\"\n"
" }\n"
"}";
int32_t ret = smlProcess(info, (char **)(&sql), -1);
ASSERT_EQ(ret, 0);
taos_free_result(pRes);
smlDestroyInfo(info);
} }
TEST(testCase, smlGetTimestampLen_Test) { TEST(testCase, smlProcess_json3_Test) {
uint8_t len = smlGetTimestampLen(0); TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT_EQ(len, 1); ASSERT_NE(taos, nullptr);
len = smlGetTimestampLen(1); TAOS_RES *pRes = taos_query(taos, "create database if not exists sml_db");
ASSERT_EQ(len, 1); taos_free_result(pRes);
len = smlGetTimestampLen(10); pRes = taos_query(taos, "use sml_db");
ASSERT_EQ(len, 2); taos_free_result(pRes);
len = smlGetTimestampLen(390); SRequestObj *request = (SRequestObj *)createRequest((STscObj *)taos, NULL, NULL, TSDB_SQL_INSERT);
ASSERT_EQ(len, 3); ASSERT_NE(request, nullptr);
len = smlGetTimestampLen(-1); SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_JSON_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS, true);
ASSERT_EQ(len, 1); ASSERT_NE(info, nullptr);
const char *sql =
len = smlGetTimestampLen(-10); "{\n"
ASSERT_EQ(len, 2); " \"metric\": \"meter_current1\",\n"
" \"timestamp\": {\n"
len = smlGetTimestampLen(-390); " \"value\" : 1346846400,\n"
ASSERT_EQ(len, 3); " \"type\" : \"s\"\n"
} " },\n"
" \"value\": {\n"
TEST(testCase, smlProcess_telnet_Test) { " \"value\" : 10.3,\n"
TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 0); " \"type\" : \"i64\"\n"
ASSERT_NE(taos, nullptr); " },\n"
" \"tags\": {\n"
TAOS_RES* pRes = taos_query(taos, "create database if not exists sml_db"); " \"t1\": { \n"
taos_free_result(pRes); " \"value\" : 2,\n"
" \"type\" : \"bigint\"\n"
pRes = taos_query(taos, "use sml_db"); " },\n"
taos_free_result(pRes); " \"t2\": { \n"
" \"value\" : 2,\n"
SRequestObj *request = (SRequestObj *)createRequest((STscObj*)taos, NULL, NULL, TSDB_SQL_INSERT); " \"type\" : \"int\"\n"
ASSERT_NE(request, nullptr); " },\n"
" \"t3\": { \n"
SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_TELNET_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS, true); " \"value\" : 2,\n"
ASSERT_NE(info, nullptr); " \"type\" : \"i16\"\n"
" },\n"
const char *sql[4] = { " \"t4\": { \n"
"sys.if.bytes.out 1479496100 1.3E0 host=web01 interface=eth0", " \"value\" : 2,\n"
"sys.if.bytes.out 1479496101 1.3E1 interface=eth0 host=web01 ", " \"type\" : \"i8\"\n"
"sys.if.bytes.out 1479496102 1.3E3 network=tcp", " },\n"
"sys.procs.running 1479496100 42 host=web01" " \"t5\": { \n"
}; " \"value\" : 2,\n"
int ret = smlProcess(info, (char**)sql, sizeof(sql)/sizeof(sql[0])); " \"type\" : \"f32\"\n"
ASSERT_EQ(ret, 0); " },\n"
" \"t6\": { \n"
TAOS_RES *res = taos_query(taos, "select * from t_8c30283b3c4131a071d1e16cf6d7094a"); " \"value\" : 2,\n"
ASSERT_NE(res, nullptr); " \"type\" : \"double\"\n"
int fieldNum = taos_field_count(res); " },\n"
ASSERT_EQ(fieldNum, 2); " \"t7\": { \n"
int rowNum = taos_affected_rows(res); " \"value\" : \"8323\",\n"
ASSERT_EQ(rowNum, 1); " \"type\" : \"binary\"\n"
for (int i = 0; i < rowNum; ++i) { " },\n"
TAOS_ROW rows = taos_fetch_row(res); " \"t8\": { \n"
} " \"value\" : \"北京\",\n"
" \"type\" : \"nchar\"\n"
res = taos_query(taos, "select * from t_6931529054e5637ca92c78a1ad441961"); " },\n"
ASSERT_NE(res, nullptr); " \"t9\": { \n"
fieldNum = taos_field_count(res); " \"value\" : true,\n"
ASSERT_EQ(fieldNum, 2); " \"type\" : \"bool\"\n"
rowNum = taos_affected_rows(res); " },\n"
ASSERT_EQ(rowNum, 2); " \"id\": \"d1001\"\n"
for (int i = 0; i < rowNum; ++i) { " }\n"
TAOS_ROW rows = taos_fetch_row(res); "}";
} int32_t ret = smlProcess(info, (char **)(&sql), -1);
} ASSERT_EQ(ret, 0);
taos_free_result(pRes);
TEST(testCase, smlProcess_json_Test) { smlDestroyInfo(info);
TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 0); }
ASSERT_NE(taos, nullptr);
TEST(testCase, smlProcess_json4_Test) {
TAOS_RES* pRes = taos_query(taos, "create database if not exists sml_db"); TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 0);
taos_free_result(pRes); ASSERT_NE(taos, nullptr);
pRes = taos_query(taos, "use sml_db"); TAOS_RES* pRes = taos_query(taos, "create database if not exists sml_db");
taos_free_result(pRes); taos_free_result(pRes);
SRequestObj *request = (SRequestObj *)createRequest((STscObj*)taos, NULL, NULL, TSDB_SQL_INSERT); pRes = taos_query(taos, "use sml_db");
ASSERT_NE(request, nullptr); taos_free_result(pRes);
SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_JSON_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS, true); SRequestObj *request = (SRequestObj *)createRequest((STscObj*)taos, NULL, NULL, TSDB_SQL_INSERT);
ASSERT_NE(info, nullptr); ASSERT_NE(request, nullptr);
const char *sql = "[\n" SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_JSON_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS, true);
" {\n" ASSERT_NE(info, nullptr);
" \"metric\": \"sys.cpu.nice\",\n" const char *sql = "{\n"
" \"timestamp\": 1346846400,\n" " \"metric\": \"meter_current2\",\n"
" \"value\": 18,\n" " \"timestamp\": {\n"
" \"tags\": {\n" " \"value\" : 1346846500000,\n"
" \"host\": \"web01\",\n" " \"type\" : \"ms\"\n"
" \"dc\": \"lga\"\n" " },\n"
" }\n" " \"value\": \"ni\",\n"
" },\n" " \"tags\": {\n"
" {\n" " \"t1\": { \n"
" \"metric\": \"sys.cpu.nice\",\n" " \"value\" : 20,\n"
" \"timestamp\": 1346846400,\n" " \"type\" : \"i64\"\n"
" \"value\": 9,\n" " },\n"
" \"tags\": {\n" " \"t2\": { \n"
" \"host\": \"web02\",\n" " \"value\" : 25,\n"
" \"dc\": \"lga\"\n" " \"type\" : \"i32\"\n"
" }\n" " },\n"
" }\n" " \"t3\": { \n"
"]"; " \"value\" : 2,\n"
int ret = smlProcess(info, (char**)(&sql), -1); " \"type\" : \"smallint\"\n"
ASSERT_EQ(ret, 0); " },\n"
" \"t4\": { \n"
TAOS_RES *res = taos_query(taos, "select * from t_cb27a7198d637b4f1c6464bd73f756a7"); " \"value\" : 2,\n"
ASSERT_NE(res, nullptr); " \"type\" : \"tinyint\"\n"
int fieldNum = taos_field_count(res); " },\n"
ASSERT_EQ(fieldNum, 2); " \"t5\": { \n"
// int rowNum = taos_affected_rows(res); " \"value\" : 2,\n"
// ASSERT_EQ(rowNum, 1); " \"type\" : \"float\"\n"
// for (int i = 0; i < rowNum; ++i) { " },\n"
// TAOS_ROW rows = taos_fetch_row(res); " \"t6\": { \n"
// } " \"value\" : 0.2,\n"
" \"type\" : \"f64\"\n"
sql = "{\n" " },\n"
" \"metric\": \"meter_current\",\n" " \"t7\": \"nsj\",\n"
" \"timestamp\": {\n" " \"t8\": { \n"
" \"value\" : 1346846400,\n" " \"value\" : \"北京\",\n"
" \"type\" : \"s\"\n" " \"type\" : \"nchar\"\n"
" },\n" " },\n"
" \"value\": {\n" " \"t9\": false,\n"
" \"value\" : 10.3,\n" " \"id\": \"d1001\"\n"
" \"type\" : \"i64\"\n" " }\n"
" },\n" "}";
" \"tags\": {\n" int32_t ret = smlProcess(info, (char**)(&sql), -1);
" \"groupid\": { \n" ASSERT_EQ(ret, 0);
" \"value\" : 2,\n" taos_free_result(pRes);
" \"type\" : \"bigint\"\n" smlDestroyInfo(info);
" },\n" }
" \"location\": { \n"
" \"value\" : \"北京\",\n" TEST(testCase, smlParseTelnetLine_error_Test) {
" \"type\" : \"binary\"\n" TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 0);
" },\n" ASSERT_NE(taos, nullptr);
" \"id\": \"d1001\"\n"
" }\n" TAOS_RES* pRes = taos_query(taos, "create database if not exists sml_db");
"}"; taos_free_result(pRes);
ret = smlProcess(info, (char**)(&sql), -1);
ASSERT_EQ(ret, 0); pRes = taos_query(taos, "use sml_db");
taos_free_result(pRes);
sql = "{\n"
" \"metric\": \"meter_current\",\n" SRequestObj *request = (SRequestObj *)createRequest((STscObj*)taos, NULL, NULL, TSDB_SQL_INSERT);
" \"timestamp\": {\n" ASSERT_NE(request, nullptr);
" \"value\" : 1346846400,\n"
" \"type\" : \"s\"\n" SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_TELNET_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS, true);
" },\n" ASSERT_NE(info, nullptr);
" \"value\": {\n"
" \"value\" : 10.3,\n" int32_t ret = 0;
" \"type\" : \"i64\"\n" const char *sql[19] = {
" },\n" "sys.procs.running 14794961040 42 host=web01",
" \"tags\": {\n" "sys.procs.running 14791040 42 host=web01",
" \"t1\": { \n" "sys.procs.running erere 42 host=web01",
" \"value\" : 2,\n" "sys.procs.running 1.6e10 42 host=web01",
" \"type\" : \"bigint\"\n" "sys.procs.running 1.47949610 42 host=web01",
" },\n" "sys.procs.running 147949610i 42 host=web01",
" \"t2\": { \n" "sys.procs.running -147949610 42 host=web01",
" \"value\" : 2,\n" "",
" \"type\" : \"int\"\n" " ",
" },\n" "sys ",
" \"t3\": { \n" "sys.procs.running 1479496100 42 ",
" \"value\" : 2,\n" "sys.procs.running 1479496100 42 host= ",
" \"type\" : \"i16\"\n" "sys.procs.running 1479496100 42or host=web01",
" },\n" "sys.procs.running 1479496100 true host=web01",
" \"t4\": { \n" "sys.procs.running 1479496100 \"binary\" host=web01",
" \"value\" : 2,\n" "sys.procs.running 1479496100 L\"rfr\" host=web01",
" \"type\" : \"i8\"\n" "sys.procs.running 1479496100 42 host=web01 cpu= ",
" },\n" "sys.procs.running 1479496100 42 host=web01 host=w2",
" \"t5\": { \n" "sys.procs.running 1479496100 42 host=web01 host",
" \"value\" : 2,\n" };
" \"type\" : \"f32\"\n" for(int i = 0; i < sizeof(sql)/sizeof(sql[0]); i++){
" },\n" ret = smlParseTelnetLine(info, (void*)sql[i]);
" \"t6\": { \n" ASSERT_NE(ret, 0);
" \"value\" : 2,\n" }
" \"type\" : \"double\"\n"
" },\n" destroyRequest(request);
" \"t7\": { \n" smlDestroyInfo(info);
" \"value\" : \"8323\",\n" }
" \"type\" : \"binary\"\n"
" },\n" TEST(testCase, smlParseTelnetLine_diff_type_Test) {
" \"t8\": { \n" TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 0);
" \"value\" : \"北京\",\n" ASSERT_NE(taos, nullptr);
" \"type\" : \"binary\"\n"
" },\n" TAOS_RES* pRes = taos_query(taos, "create database if not exists sml_db");
" \"t9\": { \n" taos_free_result(pRes);
" \"value\" : true,\n"
" \"type\" : \"bool\"\n" pRes = taos_query(taos, "use sml_db");
" },\n" taos_free_result(pRes);
" \"id\": \"d1001\"\n"
" }\n" SRequestObj *request = (SRequestObj *)createRequest((STscObj*)taos, NULL, NULL, TSDB_SQL_INSERT);
"}"; ASSERT_NE(request, nullptr);
ret = smlProcess(info, (char**)(&sql), -1);
ASSERT_EQ(ret, 0); SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_TELNET_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS, true);
ASSERT_NE(info, nullptr);
sql = "{\n"
" \"metric\": \"meter_current\",\n" const char *sql[2] = {
" \"timestamp\": {\n" "sys.procs.running 1479496104000 42 host=web01",
" \"value\" : 1346846400000,\n" "sys.procs.running 1479496104000 42u8 host=web01"
" \"type\" : \"ms\"\n" };
" },\n" int32_t ret = smlProcess(info, (char**)sql, sizeof(sql)/sizeof(sql[0]));
" \"value\": \"ni\",\n" ASSERT_NE(ret, 0);
" \"tags\": {\n"
" \"t1\": { \n" destroyRequest(request);
" \"value\" : 20,\n" smlDestroyInfo(info);
" \"type\" : \"i64\"\n" }
" },\n"
" \"t2\": { \n" TEST(testCase, smlParseTelnetLine_json_error_Test) {
" \"value\" : 25,\n" TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 0);
" \"type\" : \"i32\"\n" ASSERT_NE(taos, nullptr);
" },\n"
" \"t3\": { \n" TAOS_RES* pRes = taos_query(taos, "create database if not exists sml_db");
" \"value\" : 2,\n" taos_free_result(pRes);
" \"type\" : \"smallint\"\n"
" },\n" pRes = taos_query(taos, "use sml_db");
" \"t4\": { \n" taos_free_result(pRes);
" \"value\" : 2,\n"
" \"type\" : \"tinyint\"\n" SRequestObj *request = (SRequestObj *)createRequest((STscObj*)taos, NULL, NULL, TSDB_SQL_INSERT);
" },\n" ASSERT_NE(request, nullptr);
" \"t5\": { \n"
" \"value\" : 2,\n" SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_TELNET_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS, true);
" \"type\" : \"float\"\n" ASSERT_NE(info, nullptr);
" },\n"
" \"t6\": { \n" int32_t ret = 0;
" \"value\" : 0.2,\n" const char *sql[] = {
" \"type\" : \"f64\"\n" "[\n"
" },\n" " {\n"
" \"t7\": \"nsj\",\n" " \"metric\": \"sys.cpu.nice\",\n"
" \"t8\": { \n" " \"timestamp\": 13468464009999333322222223,\n"
" \"value\" : \"北京\",\n" " \"value\": 18,\n"
" \"type\" : \"binary\"\n" " \"tags\": {\n"
" },\n" " \"host\": \"web01\",\n"
" \"t9\": false,\n" " \"dc\": \"lga\"\n"
" \"id\": \"d1001\"\n" " }\n"
" }\n" " },\n"
"}"; "]",
ret = smlProcess(info, (char**)(&sql), -1); "[\n"
ASSERT_EQ(ret, 0); " {\n"
} " \"metric\": \"sys.cpu.nice\",\n"
" \"timestamp\": 1346846400i,\n"
" \"value\": 18,\n"
" \"tags\": {\n"
" \"host\": \"web01\",\n"
" \"dc\": \"lga\"\n"
" }\n"
" },\n"
"]",
"[\n"
" {\n"
" \"metric\": \"sys.cpu.nice\",\n"
" \"timestamp\": 1346846400,\n"
" \"value\": 18,\n"
" \"tags\": {\n"
" \"groupid\": { \n"
" \"value\" : 2,\n"
" \"type\" : \"nchar\"\n"
" },\n"
" \"location\": { \n"
" \"value\" : \"北京\",\n"
" \"type\" : \"binary\"\n"
" },\n"
" \"id\": \"d1001\"\n"
" }\n"
" },\n"
"]",
};
for(int i = 0; i < sizeof(sql)/sizeof(sql[0]); i++){
ret = smlParseTelnetLine(info, (void*)sql[i]);
ASSERT_NE(ret, 0);
}
destroyRequest(request);
smlDestroyInfo(info);
}
TEST(testCase, smlParseTelnetLine_diff_json_type1_Test) {
TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT_NE(taos, nullptr);
TAOS_RES* pRes = taos_query(taos, "create database if not exists sml_db");
taos_free_result(pRes);
pRes = taos_query(taos, "use sml_db");
taos_free_result(pRes);
SRequestObj *request = (SRequestObj *)createRequest((STscObj*)taos, NULL, NULL, TSDB_SQL_INSERT);
ASSERT_NE(request, nullptr);
SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_TELNET_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS, true);
ASSERT_NE(info, nullptr);
const char *sql[2] = {
"[\n"
" {\n"
" \"metric\": \"sys.cpu.nice\",\n"
" \"timestamp\": 1346846400,\n"
" \"value\": 18,\n"
" \"tags\": {\n"
" \"host\": \"lga\"\n"
" }\n"
" },\n"
"]",
"[\n"
" {\n"
" \"metric\": \"sys.cpu.nice\",\n"
" \"timestamp\": 1346846400,\n"
" \"value\": 18,\n"
" \"tags\": {\n"
" \"host\": 8\n"
" }\n"
" },\n"
"]",
};
int32_t ret = smlProcess(info, (char**)sql, sizeof(sql)/sizeof(sql[0]));
ASSERT_NE(ret, 0);
destroyRequest(request);
smlDestroyInfo(info);
}
TEST(testCase, smlParseTelnetLine_diff_json_type2_Test) {
TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT_NE(taos, nullptr);
TAOS_RES* pRes = taos_query(taos, "create database if not exists sml_db");
taos_free_result(pRes);
pRes = taos_query(taos, "use sml_db");
taos_free_result(pRes);
SRequestObj *request = (SRequestObj *)createRequest((STscObj*)taos, NULL, NULL, TSDB_SQL_INSERT);
ASSERT_NE(request, nullptr);
SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_TELNET_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS, true);
ASSERT_NE(info, nullptr);
const char *sql[2] = {
"[\n"
" {\n"
" \"metric\": \"sys.cpu.nice\",\n"
" \"timestamp\": 1346846400,\n"
" \"value\": 18,\n"
" \"tags\": {\n"
" \"host\": \"lga\"\n"
" }\n"
" },\n"
"]",
"[\n"
" {\n"
" \"metric\": \"sys.cpu.nice\",\n"
" \"timestamp\": 1346846400,\n"
" \"value\": \"18\",\n"
" \"tags\": {\n"
" \"host\": \"fff\"\n"
" }\n"
" },\n"
"]",
};
int32_t ret = smlProcess(info, (char**)sql, sizeof(sql)/sizeof(sql[0]));
ASSERT_NE(ret, 0);
destroyRequest(request);
smlDestroyInfo(info);
}
...@@ -317,7 +317,11 @@ void buildChildTableName(RandTableName* rName) { ...@@ -317,7 +317,11 @@ void buildChildTableName(RandTableName* rName) {
for (int j = 0; j < size; ++j) { for (int j = 0; j < size; ++j) {
SSmlKv* tagKv = taosArrayGetP(rName->tags, j); SSmlKv* tagKv = taosArrayGetP(rName->tags, j);
taosStringBuilderAppendStringLen(&sb, tagKv->key, tagKv->keyLen); taosStringBuilderAppendStringLen(&sb, tagKv->key, tagKv->keyLen);
taosStringBuilderAppendStringLen(&sb, tagKv->value, tagKv->valueLen); if(IS_VAR_DATA_TYPE(tagKv->type)){
taosStringBuilderAppendStringLen(&sb, tagKv->value, tagKv->length);
}else{
taosStringBuilderAppendStringLen(&sb, (char*)(&(tagKv->value)), tagKv->length);
}
} }
size_t len = 0; size_t len = 0;
char* keyJoined = taosStringBuilderGetResult(&sb, &len); char* keyJoined = taosStringBuilderGetResult(&sb, &len);
......
...@@ -758,7 +758,7 @@ static int32_t KvRowAppend(SMsgBuf* pMsgBuf, const void* value, int32_t len, voi ...@@ -758,7 +758,7 @@ static int32_t KvRowAppend(SMsgBuf* pMsgBuf, const void* value, int32_t len, voi
int32_t output = 0; int32_t output = 0;
if (!taosMbsToUcs4(value, len, (TdUcs4*)varDataVal(pa->buf), pa->schema->bytes - VARSTR_HEADER_SIZE, &output)) { if (!taosMbsToUcs4(value, len, (TdUcs4*)varDataVal(pa->buf), pa->schema->bytes - VARSTR_HEADER_SIZE, &output)) {
char buf[512] = {0}; char buf[512] = {0};
snprintf(buf, tListLen(buf), "%s", strerror(errno)); snprintf(buf, tListLen(buf), " taosMbsToUcs4 error:%s", strerror(errno));
return buildSyntaxErrMsg(pMsgBuf, buf, value); return buildSyntaxErrMsg(pMsgBuf, buf, value);
} }
...@@ -1668,7 +1668,11 @@ static int32_t smlBuildTagRow(SArray* cols, SKVRowBuilder* tagsBuilder, SParsedD ...@@ -1668,7 +1668,11 @@ static int32_t smlBuildTagRow(SArray* cols, SKVRowBuilder* tagsBuilder, SParsedD
SSchema* pTagSchema = &pSchema[tags->boundColumns[i] - 1]; // colId starts with 1 SSchema* pTagSchema = &pSchema[tags->boundColumns[i] - 1]; // colId starts with 1
param.schema = pTagSchema; param.schema = pTagSchema;
SSmlKv* kv = taosArrayGetP(cols, i); SSmlKv* kv = taosArrayGetP(cols, i);
KvRowAppend(msg, kv->value, kv->valueLen, &param); if(IS_VAR_DATA_TYPE(kv->type)){
KvRowAppend(msg, kv->value, kv->length, &param);
}else{
KvRowAppend(msg, &(kv->value), kv->length, &param);
}
} }
*row = tdGetKVRowFromBuilder(tagsBuilder); *row = tdGetKVRowFromBuilder(tagsBuilder);
...@@ -1766,14 +1770,16 @@ int32_t smlBindData(void *handle, SArray *tags, SArray *colsSchema, SArray *cols ...@@ -1766,14 +1770,16 @@ int32_t smlBindData(void *handle, SArray *tags, SArray *colsSchema, SArray *cols
if (!kv || kv->length == 0) { if (!kv || kv->length == 0) {
MemRowAppend(&pBuf, NULL, 0, &param); MemRowAppend(&pBuf, NULL, 0, &param);
} else { } else {
int32_t colLen = pColSchema->bytes; int32_t colLen = kv->length;
if (IS_VAR_DATA_TYPE(pColSchema->type)) { if (pColSchema->type == TSDB_DATA_TYPE_TIMESTAMP) {
colLen = kv->length;
} else if (pColSchema->type == TSDB_DATA_TYPE_TIMESTAMP) {
kv->i = convertTimePrecision(kv->i, TSDB_TIME_PRECISION_NANO, pTableMeta->tableInfo.precision); kv->i = convertTimePrecision(kv->i, TSDB_TIME_PRECISION_NANO, pTableMeta->tableInfo.precision);
} }
MemRowAppend(&pBuf, &(kv->value), colLen, &param); if(IS_VAR_DATA_TYPE(kv->type)){
MemRowAppend(&pBuf, kv->value, colLen, &param);
}else{
MemRowAppend(&pBuf, &(kv->value), colLen, &param);
}
} }
if (PRIMARYKEY_TIMESTAMP_COL_ID == pColSchema->colId) { if (PRIMARYKEY_TIMESTAMP_COL_ID == pColSchema->colId) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册