未验证 提交 544f1323 编写于 作者: D dapan1121 提交者: GitHub

Merge pull request #8054 from taosdata/enhance/TD-10445

[TD-10445]<enhance>: Unify OpenTSDB APIs to taos_schemaless_insert
......@@ -73,7 +73,7 @@ typedef struct cJSON
char *string;
//Keep the original string of number
char numberstring[13];
char numberstring[64];
} cJSON;
typedef struct cJSON_Hooks
......
......@@ -290,7 +290,7 @@ loop_end:
input_buffer->offset += (size_t)(after_end - number_c_string);
strncpy(item->numberstring, (const char *)number_c_string, 12);
strncpy(item->numberstring, (const char *)number_c_string, strlen((const char*)number_c_string));
return true;
}
......
......@@ -47,6 +47,12 @@ typedef enum {
SML_TIME_STAMP_NANO_SECONDS
} SMLTimeStampType;
typedef enum {
SML_LINE_PROTOCOL = 0,
SML_TELNET_PROTOCOL = 1,
SML_JSON_PROTOCOL = 2,
} SMLProtocolType;
typedef struct {
uint64_t id;
SHashObj* smlDataToSchema;
......@@ -57,7 +63,7 @@ bool checkDuplicateKey(char *key, SHashObj *pHash, SSmlLinesInfo* info);
bool isValidInteger(char *str);
bool isValidFloat(char *str);
int32_t isValidChildTableName(const char *pTbName, int16_t len);
int32_t isValidChildTableName(const char *pTbName, int16_t len, SSmlLinesInfo* info);
bool convertSmlValueType(TAOS_SML_KV *pVal, char *value,
uint16_t len, SSmlLinesInfo* info);
......@@ -66,6 +72,11 @@ int32_t convertSmlTimeStamp(TAOS_SML_KV *pVal, char *value,
void destroySmlDataPoint(TAOS_SML_DATA_POINT* point);
int taos_insert_sml_lines(TAOS* taos, char* lines[], int numLines);
int taos_insert_telnet_lines(TAOS* taos, char* lines[], int numLines);
int taos_insert_json_payload(TAOS* taos, char* payload);
#ifdef __cplusplus
}
#endif
......
......@@ -17,6 +17,7 @@
#include "taos.h"
#include "tlog.h"
#include "tscUtil.h"
#include "tscParseLine.h"
#include "com_taosdata_jdbc_TSDBJNIConnector.h"
......@@ -1070,7 +1071,7 @@ JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_insertLinesImp(J
c_lines[i] = (char *)(*env)->GetStringUTFChars(env, line, 0);
}
int code = taos_insert_lines(taos, c_lines, numLines);
int code = taos_schemaless_insert(taos, c_lines, numLines, SML_LINE_PROTOCOL);
for (int i = 0; i < numLines; ++i) {
jstring line = (jstring)((*env)->GetObjectArrayElement(env, lines, i));
......@@ -1084,4 +1085,4 @@ JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_insertLinesImp(J
return JNI_TDENGINE_ERROR;
}
return code;
}
\ No newline at end of file
}
......@@ -1811,8 +1811,8 @@ static int32_t parseSmlKey(TAOS_SML_KV *pKV, const char **index, SHashObj *pHash
return TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
}
while (*cur != '\0') {
if (len > TSDB_COL_NAME_LEN) {
tscError("SML:0x%"PRIx64" Key field cannot exceeds 65 characters", info->id);
if (len >= TSDB_COL_NAME_LEN - 1) {
tscError("SML:0x%"PRIx64" Key field cannot exceeds %d characters", info->id, TSDB_COL_NAME_LEN - 1);
return TSDB_CODE_TSC_INVALID_COLUMN_LENGTH;
}
//unescaped '=' identifies a tag key
......@@ -1898,8 +1898,8 @@ static int32_t parseSmlMeasurement(TAOS_SML_DATA_POINT *pSml, const char **index
}
while (*cur != '\0') {
if (len > TSDB_TABLE_NAME_LEN) {
tscError("SML:0x%"PRIx64" Measurement field cannot exceeds 193 characters", info->id);
if (len >= TSDB_TABLE_NAME_LEN - 1) {
tscError("SML:0x%"PRIx64" Measurement field cannot exceeds %d characters", info->id, TSDB_TABLE_NAME_LEN - 1);
free(pSml->stableName);
pSml->stableName = NULL;
return TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH;
......@@ -1917,7 +1917,7 @@ static int32_t parseSmlMeasurement(TAOS_SML_DATA_POINT *pSml, const char **index
if (*cur == '\\') {
escapeSpecialCharacter(1, &cur);
}
pSml->stableName[len] = *cur;
pSml->stableName[len] = tolower(*cur);
cur++;
len++;
}
......@@ -1929,7 +1929,11 @@ static int32_t parseSmlMeasurement(TAOS_SML_DATA_POINT *pSml, const char **index
}
//Table name can only contain digits(0-9),alphebet(a-z),underscore(_)
int32_t isValidChildTableName(const char *pTbName, int16_t len) {
int32_t isValidChildTableName(const char *pTbName, int16_t len, SSmlLinesInfo* info) {
if (len > TSDB_TABLE_NAME_LEN - 1) {
tscError("SML:0x%"PRIx64" child table name cannot exceeds %d characters", info->id, TSDB_TABLE_NAME_LEN - 1);
return TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH;
}
const char *cur = pTbName;
for (int i = 0; i < len; ++i) {
if(!isdigit(cur[i]) && !isalpha(cur[i]) && (cur[i] != '_')) {
......@@ -1975,12 +1979,13 @@ static int32_t parseSmlKvPairs(TAOS_SML_KV **pKVs, int *num_kvs,
}
if (!isField &&
(strcasecmp(pkv->key, "ID") == 0) && pkv->type == TSDB_DATA_TYPE_BINARY) {
ret = isValidChildTableName(pkv->value, pkv->length);
ret = isValidChildTableName(pkv->value, pkv->length, info);
if (ret) {
goto error;
}
smlData->childTableName = malloc( pkv->length + 1);
memcpy(smlData->childTableName, pkv->value, pkv->length);
strntolower_s(smlData->childTableName, smlData->childTableName, (int32_t)pkv->length);
smlData->childTableName[pkv->length] = '\0';
free(pkv->key);
free(pkv->value);
......@@ -2184,3 +2189,43 @@ cleanup:
return code;
}
/**
* taos_schemaless_insert() parse and insert data points into database according to
* different protocol.
*
* @param $lines input array may contain multiple lines, each line indicates a data point.
* If protocol=2 is used input array should contain single JSON
* string(e.g. char *lines[] = {"$JSON_string"}). If need to insert
* multiple data points in JSON format, should include them in $JSON_string
* as a JSON array.
* @param $numLines indicates how many data points in $lines.
* If protocol = 2 is used this param will be ignored as $lines should
* contain single JSON string.
* @param $protocol indicates which protocol to use for parsing:
* 0 - influxDB line protocol
* 1 - OpenTSDB telnet line protocol
* 2 - OpenTSDB JSON format protocol
* @return return zero for successful insertion. Otherwise return none-zero error code of
* failure reason.
*
*/
int taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int protocol) {
int code;
switch (protocol) {
case SML_LINE_PROTOCOL:
code = taos_insert_lines(taos, lines, numLines);
break;
case SML_TELNET_PROTOCOL:
code = taos_insert_telnet_lines(taos, lines, numLines);
break;
case SML_JSON_PROTOCOL:
code = taos_insert_json_payload(taos, *lines);
break;
default:
code = TSDB_CODE_TSC_INVALID_PROTOCOL_TYPE;
break;
}
return code;
}
......@@ -37,7 +37,7 @@ static int32_t parseTelnetMetric(TAOS_SML_DATA_POINT *pSml, const char **index,
const char *cur = *index;
uint16_t len = 0;
pSml->stableName = tcalloc(TSDB_TABLE_NAME_LEN + 1, 1); // +1 to avoid 1772 line over write
pSml->stableName = tcalloc(TSDB_TABLE_NAME_LEN, 1);
if (pSml->stableName == NULL) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
......@@ -48,8 +48,8 @@ static int32_t parseTelnetMetric(TAOS_SML_DATA_POINT *pSml, const char **index,
}
while (*cur != '\0') {
if (len > TSDB_TABLE_NAME_LEN) {
tscError("OTD:0x%"PRIx64" Metric cannot exceeds 193 characters", info->id);
if (len >= TSDB_TABLE_NAME_LEN - 1) {
tscError("OTD:0x%"PRIx64" Metric cannot exceeds %d characters", info->id, TSDB_TABLE_NAME_LEN - 1);
tfree(pSml->stableName);
return TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH;
}
......@@ -62,7 +62,7 @@ static int32_t parseTelnetMetric(TAOS_SML_DATA_POINT *pSml, const char **index,
if (*cur == '.') {
pSml->stableName[len] = '_';
} else {
pSml->stableName[len] = *cur;
pSml->stableName[len] = tolower(*cur);
}
cur++;
......@@ -171,7 +171,7 @@ static int32_t parseTelnetMetricValue(TAOS_SML_KV **pKVs, int *num_kvs, const ch
static int32_t parseTelnetTagKey(TAOS_SML_KV *pKV, const char **index, SHashObj *pHash, SSmlLinesInfo* info) {
const char *cur = *index;
char key[TSDB_COL_NAME_LEN + 1]; // +1 to avoid key[len] over write
char key[TSDB_COL_NAME_LEN];
uint16_t len = 0;
//key field cannot start with digit
......@@ -180,8 +180,8 @@ static int32_t parseTelnetTagKey(TAOS_SML_KV *pKV, const char **index, SHashObj
return TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
}
while (*cur != '\0') {
if (len > TSDB_COL_NAME_LEN) {
tscError("OTD:0x%"PRIx64" Tag key cannot exceeds 65 characters", info->id);
if (len >= TSDB_COL_NAME_LEN - 1) {
tscError("OTD:0x%"PRIx64" Tag key cannot exceeds %d characters", info->id, TSDB_COL_NAME_LEN - 1);
return TSDB_CODE_TSC_INVALID_COLUMN_LENGTH;
}
if (*cur == ' ') {
......@@ -276,13 +276,14 @@ static int32_t parseTelnetTagKvs(TAOS_SML_KV **pKVs, int *num_kvs,
return ret;
}
if ((strcasecmp(pkv->key, "ID") == 0) && pkv->type == TSDB_DATA_TYPE_BINARY) {
ret = isValidChildTableName(pkv->value, pkv->length);
ret = isValidChildTableName(pkv->value, pkv->length, info);
if (ret) {
return ret;
}
*childTableName = malloc(pkv->length + 1);
memcpy(*childTableName, pkv->value, pkv->length);
(*childTableName)[pkv->length] = '\0';
strntolower_s(*childTableName, *childTableName, (int32_t)pkv->length);
tfree(pkv->key);
tfree(pkv->value);
} else {
......@@ -311,7 +312,7 @@ static int32_t parseTelnetTagKvs(TAOS_SML_KV **pKVs, int *num_kvs,
return ret;
}
int32_t tscParseTelnetLine(const char* line, TAOS_SML_DATA_POINT* smlData, SSmlLinesInfo* info) {
static int32_t tscParseTelnetLine(const char* line, TAOS_SML_DATA_POINT* smlData, SSmlLinesInfo* info) {
const char* index = line;
int32_t ret = TSDB_CODE_SUCCESS;
......@@ -354,7 +355,7 @@ int32_t tscParseTelnetLine(const char* line, TAOS_SML_DATA_POINT* smlData, SSmlL
return TSDB_CODE_SUCCESS;
}
int32_t tscParseTelnetLines(char* lines[], int numLines, SArray* points, SArray* failedLines, SSmlLinesInfo* info) {
static int32_t tscParseTelnetLines(char* lines[], int numLines, SArray* points, SArray* failedLines, SSmlLinesInfo* info) {
for (int32_t i = 0; i < numLines; ++i) {
TAOS_SML_DATA_POINT point = {0};
int32_t code = tscParseTelnetLine(lines[i], &point, info);
......@@ -438,15 +439,15 @@ int taos_telnet_insert(TAOS* taos, TAOS_SML_DATA_POINT* points, int numPoint) {
/* telnet style API parser */
int32_t parseMetricFromJSON(cJSON *root, TAOS_SML_DATA_POINT* pSml, SSmlLinesInfo* info) {
static int32_t parseMetricFromJSON(cJSON *root, TAOS_SML_DATA_POINT* pSml, SSmlLinesInfo* info) {
cJSON *metric = cJSON_GetObjectItem(root, "metric");
if (!cJSON_IsString(metric)) {
return TSDB_CODE_TSC_INVALID_JSON;
}
size_t stableLen = strlen(metric->valuestring);
if (stableLen > TSDB_TABLE_NAME_LEN) {
tscError("OTD:0x%"PRIx64" Metric cannot exceeds 193 characters in JSON", info->id);
if (stableLen > TSDB_TABLE_NAME_LEN - 1) {
tscError("OTD:0x%"PRIx64" Metric cannot exceeds %d characters in JSON", info->id, TSDB_TABLE_NAME_LEN - 1);
return TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH;
}
......@@ -462,19 +463,20 @@ int32_t parseMetricFromJSON(cJSON *root, TAOS_SML_DATA_POINT* pSml, SSmlLinesInf
}
//convert dot to underscore for now, will be removed once dot is allowed in tbname.
for (int i = 0; i < strlen(metric->valuestring); ++i) {
for (int i = 0; i < stableLen; ++i) {
if (metric->valuestring[i] == '.') {
metric->valuestring[i] = '_';
}
}
tstrncpy(pSml->stableName, metric->valuestring, stableLen + 1);
strntolower_s(pSml->stableName, pSml->stableName, (int32_t)stableLen);
return TSDB_CODE_SUCCESS;
}
int32_t parseTimestampFromJSONObj(cJSON *root, int64_t *tsVal, SSmlLinesInfo* info) {
static int32_t parseTimestampFromJSONObj(cJSON *root, int64_t *tsVal, SSmlLinesInfo* info) {
int32_t size = cJSON_GetArraySize(root);
if (size != OTD_JSON_SUB_FIELDS_NUM) {
return TSDB_CODE_TSC_INVALID_JSON;
......@@ -490,7 +492,7 @@ int32_t parseTimestampFromJSONObj(cJSON *root, int64_t *tsVal, SSmlLinesInfo* in
return TSDB_CODE_TSC_INVALID_JSON;
}
*tsVal = value->valueint;
*tsVal = strtoll(value->numberstring, NULL, 10);
//if timestamp value is 0 use current system time
if (*tsVal == 0) {
*tsVal = taosGetTimestampNs();
......@@ -526,7 +528,7 @@ int32_t parseTimestampFromJSONObj(cJSON *root, int64_t *tsVal, SSmlLinesInfo* in
return TSDB_CODE_SUCCESS;
}
int32_t parseTimestampFromJSON(cJSON *root, TAOS_SML_KV **pTS, int *num_kvs, SSmlLinesInfo* info) {
static int32_t parseTimestampFromJSON(cJSON *root, TAOS_SML_KV **pTS, int *num_kvs, SSmlLinesInfo* info) {
//Timestamp must be the first KV to parse
assert(*num_kvs == 0);
int64_t tsVal;
......@@ -538,7 +540,8 @@ int32_t parseTimestampFromJSON(cJSON *root, TAOS_SML_KV **pTS, int *num_kvs, SSm
if (timestamp->valueint == 0) {
tsVal = taosGetTimestampNs();
} else {
tsVal = convertTimePrecision(timestamp->valueint, TSDB_TIME_PRECISION_MICRO, TSDB_TIME_PRECISION_NANO);
tsVal = strtoll(timestamp->numberstring, NULL, 10);
tsVal = convertTimePrecision(tsVal, TSDB_TIME_PRECISION_MICRO, TSDB_TIME_PRECISION_NANO);
}
} else if (cJSON_IsObject(timestamp)) {
int32_t ret = parseTimestampFromJSONObj(timestamp, &tsVal, info);
......@@ -567,7 +570,7 @@ int32_t parseTimestampFromJSON(cJSON *root, TAOS_SML_KV **pTS, int *num_kvs, SSm
}
int32_t convertJSONBool(TAOS_SML_KV *pVal, char* typeStr, int64_t valueInt, SSmlLinesInfo* info) {
static int32_t convertJSONBool(TAOS_SML_KV *pVal, char* typeStr, int64_t valueInt, SSmlLinesInfo* info) {
if (strcasecmp(typeStr, "bool") != 0) {
tscError("OTD:0x%"PRIx64" invalid type(%s) for JSON Bool", info->id, typeStr);
return TSDB_CODE_TSC_INVALID_JSON_TYPE;
......@@ -580,7 +583,7 @@ int32_t convertJSONBool(TAOS_SML_KV *pVal, char* typeStr, int64_t valueInt, SSml
return TSDB_CODE_SUCCESS;
}
int32_t convertJSONNumber(TAOS_SML_KV *pVal, char* typeStr, cJSON *value, SSmlLinesInfo* info) {
static int32_t convertJSONNumber(TAOS_SML_KV *pVal, char* typeStr, cJSON *value, SSmlLinesInfo* info) {
//tinyint
if (strcasecmp(typeStr, "i8") == 0 ||
strcasecmp(typeStr, "tinyint") == 0) {
......@@ -623,14 +626,19 @@ int32_t convertJSONNumber(TAOS_SML_KV *pVal, char* typeStr, cJSON *value, SSmlLi
//bigint
if (strcasecmp(typeStr, "i64") == 0 ||
strcasecmp(typeStr, "bigint") == 0) {
if (!IS_VALID_BIGINT(value->valueint)) {
tscError("OTD:0x%"PRIx64" JSON value(%"PRId64") cannot fit in type(bigint)", info->id, value->valueint);
return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE;
}
pVal->type = TSDB_DATA_TYPE_BIGINT;
pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
pVal->value = tcalloc(pVal->length, 1);
*(int64_t *)(pVal->value) = (int64_t)(value->valueint);
/* cJSON conversion of legit BIGINT may overflow,
* use original string to do the conversion.
*/
errno = 0;
int64_t val = (int64_t)strtoll(value->numberstring, NULL, 10);
if (errno == ERANGE || !IS_VALID_BIGINT(val)) {
tscError("OTD:0x%"PRIx64" JSON value(%s) cannot fit in type(bigint)", info->id, value->numberstring);
return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE;
}
*(int64_t *)(pVal->value) = val;
return TSDB_CODE_SUCCESS;
}
//float
......@@ -665,7 +673,7 @@ int32_t convertJSONNumber(TAOS_SML_KV *pVal, char* typeStr, cJSON *value, SSmlLi
return TSDB_CODE_TSC_INVALID_JSON_TYPE;
}
int32_t convertJSONString(TAOS_SML_KV *pVal, char* typeStr, cJSON *value, SSmlLinesInfo* info) {
static int32_t convertJSONString(TAOS_SML_KV *pVal, char* typeStr, cJSON *value, SSmlLinesInfo* info) {
if (strcasecmp(typeStr, "binary") == 0) {
pVal->type = TSDB_DATA_TYPE_BINARY;
} else if (strcasecmp(typeStr, "nchar") == 0) {
......@@ -680,7 +688,7 @@ int32_t convertJSONString(TAOS_SML_KV *pVal, char* typeStr, cJSON *value, SSmlLi
return TSDB_CODE_SUCCESS;
}
int32_t parseValueFromJSONObj(cJSON *root, TAOS_SML_KV *pVal, SSmlLinesInfo* info) {
static int32_t parseValueFromJSONObj(cJSON *root, TAOS_SML_KV *pVal, SSmlLinesInfo* info) {
int32_t ret = TSDB_CODE_SUCCESS;
int32_t size = cJSON_GetArraySize(root);
......@@ -728,7 +736,7 @@ int32_t parseValueFromJSONObj(cJSON *root, TAOS_SML_KV *pVal, SSmlLinesInfo* inf
return TSDB_CODE_SUCCESS;
}
int32_t parseValueFromJSON(cJSON *root, TAOS_SML_KV *pVal, SSmlLinesInfo* info) {
static int32_t parseValueFromJSON(cJSON *root, TAOS_SML_KV *pVal, SSmlLinesInfo* info) {
int type = root->type;
switch (type) {
......@@ -746,7 +754,16 @@ int32_t parseValueFromJSON(cJSON *root, TAOS_SML_KV *pVal, SSmlLinesInfo* info)
pVal->type = TSDB_DATA_TYPE_BIGINT;
pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
pVal->value = tcalloc(pVal->length, 1);
*(int64_t *)(pVal->value) = (int64_t)(root->valuedouble);
/* cJSON conversion of legit BIGINT may overflow,
* use original string to do the conversion.
*/
errno = 0;
int64_t val = (int64_t)strtoll(root->numberstring, NULL, 10);
if (errno == ERANGE || !IS_VALID_BIGINT(val)) {
tscError("OTD:0x%"PRIx64" JSON value(%s) cannot fit in type(bigint)", info->id, root->numberstring);
return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE;
}
*(int64_t *)(pVal->value) = val;
} else if (isValidFloat(root->numberstring)) {
pVal->type = TSDB_DATA_TYPE_DOUBLE;
pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
......@@ -790,7 +807,7 @@ int32_t parseValueFromJSON(cJSON *root, TAOS_SML_KV *pVal, SSmlLinesInfo* info)
return TSDB_CODE_SUCCESS;
}
int32_t parseMetricValueFromJSON(cJSON *root, TAOS_SML_KV **pKVs, int *num_kvs, SSmlLinesInfo* info) {
static int32_t parseMetricValueFromJSON(cJSON *root, TAOS_SML_KV **pKVs, int *num_kvs, SSmlLinesInfo* info) {
//skip timestamp
TAOS_SML_KV *pVal = *pKVs + 1;
char key[] = OTD_METRIC_VALUE_COLUMN_NAME;
......@@ -813,7 +830,9 @@ int32_t parseMetricValueFromJSON(cJSON *root, TAOS_SML_KV **pKVs, int *num_kvs,
}
int32_t parseTagsFromJSON(cJSON *root, TAOS_SML_KV **pKVs, int *num_kvs, char **childTableName, SSmlLinesInfo* info) {
static int32_t parseTagsFromJSON(cJSON *root, TAOS_SML_KV **pKVs, int *num_kvs, char **childTableName,
SHashObj *pHash, SSmlLinesInfo* info) {
int32_t ret = TSDB_CODE_SUCCESS;
cJSON *tags = cJSON_GetObjectItem(root, "tags");
......@@ -825,16 +844,19 @@ int32_t parseTagsFromJSON(cJSON *root, TAOS_SML_KV **pKVs, int *num_kvs, char **
cJSON *id = cJSON_GetObjectItem(tags, "ID");
if (id != NULL) {
size_t idLen = strlen(id->valuestring);
ret = isValidChildTableName(id->valuestring, (int16_t)idLen);
ret = isValidChildTableName(id->valuestring, (int16_t)idLen, info);
if (ret != TSDB_CODE_SUCCESS) {
return ret;
}
*childTableName = tcalloc(idLen + 1, sizeof(char));
memcpy(*childTableName, id->valuestring, idLen);
//remove all ID fields from tags list no case sensitive
while (id != NULL) {
cJSON_DeleteItemFromObject(tags, "ID");
id = cJSON_GetObjectItem(tags, "ID");
strntolower_s(*childTableName, *childTableName, (int32_t)idLen);
//check duplicate IDs
cJSON_DeleteItemFromObject(tags, "ID");
id = cJSON_GetObjectItem(tags, "ID");
if (id != NULL) {
return TSDB_CODE_TSC_DUP_TAG_NAMES;
}
}
......@@ -853,8 +875,16 @@ int32_t parseTagsFromJSON(cJSON *root, TAOS_SML_KV **pKVs, int *num_kvs, char **
if (tag == NULL) {
return TSDB_CODE_TSC_INVALID_JSON;
}
//check duplicate keys
if (checkDuplicateKey(tag->string, pHash, info)) {
return TSDB_CODE_TSC_DUP_TAG_NAMES;
}
//key
size_t keyLen = strlen(tag->string);
if (keyLen > TSDB_COL_NAME_LEN - 1) {
tscError("OTD:0x%"PRIx64" Tag key cannot exceeds %d characters in JSON", info->id, TSDB_COL_NAME_LEN - 1);
return TSDB_CODE_TSC_INVALID_COLUMN_LENGTH;
}
pkv->key = tcalloc(keyLen + 1, sizeof(char));
strncpy(pkv->key, tag->string, keyLen);
//value
......@@ -864,13 +894,14 @@ int32_t parseTagsFromJSON(cJSON *root, TAOS_SML_KV **pKVs, int *num_kvs, char **
}
*num_kvs += 1;
pkv++;
}
return ret;
}
int32_t tscParseJSONPayload(cJSON *root, TAOS_SML_DATA_POINT* pSml, SSmlLinesInfo* info) {
static int32_t tscParseJSONPayload(cJSON *root, TAOS_SML_DATA_POINT* pSml, SSmlLinesInfo* info) {
int32_t ret = TSDB_CODE_SUCCESS;
if (!cJSON_IsObject(root)) {
......@@ -910,17 +941,20 @@ int32_t tscParseJSONPayload(cJSON *root, TAOS_SML_DATA_POINT* pSml, SSmlLinesInf
tscDebug("OTD:0x%"PRIx64" Parse metric value from JSON payload finished", info->id);
//Parse tags
ret = parseTagsFromJSON(root, &pSml->tags, &pSml->tagNum, &pSml->childTableName, info);
SHashObj *keyHashTable = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false);
ret = parseTagsFromJSON(root, &pSml->tags, &pSml->tagNum, &pSml->childTableName, keyHashTable, info);
if (ret) {
tscError("OTD:0x%"PRIx64" Unable to parse tags from JSON payload", info->id);
taosHashCleanup(keyHashTable);
return ret;
}
tscDebug("OTD:0x%"PRIx64" Parse tags from JSON payload finished", info->id);
taosHashCleanup(keyHashTable);
return TSDB_CODE_SUCCESS;
}
int32_t tscParseMultiJSONPayload(char* payload, SArray* points, SSmlLinesInfo* info) {
static int32_t tscParseMultiJSONPayload(char* payload, SArray* points, SSmlLinesInfo* info) {
int32_t payloadNum, ret;
ret = TSDB_CODE_SUCCESS;
......
......@@ -404,13 +404,13 @@ lines = [
'st,t1=4i64,t3="t4",t2=5f64,t4=5f64 c1=3i64,c3=L"pass it again",c2=true,c4=5f64,c5=5f64,c6=7u64 1626006933640000000ns',
'stf,t1=4i64,t3="t4",t2=5f64,t4=5f64 c1=3i64,c3=L"pass it again_stf",c2=false,c5=5f64,c6=7u64 1626006933641000000ns',
]
conn.insert_lines(lines)
conn.schemaless_insert(lines, 0)
print("inserted")
lines = [
'stf,t1=5i64,t3="t4",t2=5f64,t4=5f64 c1=3i64,c3=L"pass it again_stf",c2=false,c5=5f64,c6=7u64 1626006933641000000ns',
]
conn.insert_lines(lines)
conn.schemaless_insert(lines, 0)
result = conn.query("show tables")
for row in result:
......
......@@ -9,10 +9,10 @@ conn.select_db(dbname)
lines = [
'st,t1=3i64,t2=4f64,t3="t3" c1=3i64,c3=L"pass",c2=false,c4=4f64 1626006833639000000ns',
]
conn.insert_lines(lines)
conn.schemaless_insert(lines, 0)
print("inserted")
conn.insert_lines(lines)
conn.schemaless_insert(lines, 0)
result = conn.query("show tables")
for row in result:
......
......@@ -406,13 +406,13 @@ lines = [
'st,t1=4i64,t3="t4",t2=5f64,t4=5f64 c1=3i64,c3=L"passitagin",c2=true,c4=5f64,c5=5f64,c6=7u64 1626006933640000000ns',
'stf,t1=4i64,t3="t4",t2=5f64,t4=5f64 c1=3i64,c3=L"passitagin_stf",c2=false,c5=5f64,c6=7u64 1626006933641000000ns',
]
conn.insert_lines(lines)
conn.schemaless_insert(lines, 0)
print("inserted")
lines = [
'stf,t1=5i64,t3="t4",t2=5f64,t4=5f64 c1=3i64,c3=L"passitagin_stf",c2=false,c5=5f64,c6=7u64 1626006933641000000ns',
]
conn.insert_lines(lines)
conn.schemaless_insert(lines, 0)
result = conn.query("show tables")
for row in result:
......
......@@ -809,40 +809,23 @@ def taos_stmt_use_result(stmt):
return result
try:
_libtaos.taos_insert_lines.restype = c_int
_libtaos.taos_insert_lines.argstype = c_void_p, c_void_p, c_int
_libtaos.taos_schemaless_insert.restype = c_int
_libtaos.taos_schemaless_insert.argstype = c_void_p, c_void_p, c_int
except AttributeError:
print("WARNING: libtaos(%s) does not support insert_lines" % taos_get_client_info())
print("WARNING: libtaos(%s) does not support schemaless_insert" % taos_get_client_info())
def taos_insert_lines(connection, lines):
def taos_schemaless_insert(connection, lines, protocol):
# type: (c_void_p, list[str] | tuple(str)) -> None
num_of_lines = len(lines)
lines = (c_char_p(line.encode("utf-8")) for line in lines)
lines_type = ctypes.c_char_p * num_of_lines
p_lines = lines_type(*lines)
errno = _libtaos.taos_insert_lines(connection, p_lines, num_of_lines)
errno = _libtaos.taos_schemaless_insert(connection, p_lines, num_of_lines, protocol)
if errno != 0:
raise LinesError("insert lines error", errno)
def taos_insert_telnet_lines(connection, lines):
# type: (c_void_p, list[str] | tuple(str)) -> None
num_of_lines = len(lines)
lines = (c_char_p(line.encode("utf-8")) for line in lines)
lines_type = ctypes.c_char_p * num_of_lines
p_lines = lines_type(*lines)
errno = _libtaos.taos_insert_telnet_lines(connection, p_lines, num_of_lines)
if errno != 0:
raise TelnetLinesError("insert telnet lines error", errno)
def taos_insert_json_payload(connection, payload):
# type: (c_void_p, list[str] | tuple(str)) -> None
payload = payload.encode("utf-8")
errno = _libtaos.taos_insert_json_payload(connection, payload)
if errno != 0:
raise JsonPayloadError("insert json payload error", errno)
raise SchemalessError("schemaless insert error", errno)
class CTaosInterface(object):
def __init__(self, config=None):
......
......@@ -117,9 +117,10 @@ class TaosConnection(object):
stream = taos_open_stream(self._conn, sql, callback, stime, param, callback2)
return TaosStream(stream)
def insert_lines(self, lines):
def schemaless_insert(self, lines, protocol):
# type: (list[str]) -> None
"""Line protocol and schemaless support
"""
1.Line protocol and schemaless support
## Example
......@@ -131,34 +132,31 @@ class TaosConnection(object):
lines = [
'ste,t2=5,t3=L"ste" c1=true,c2=4,c3="string" 1626056811855516532',
]
conn.insert_lines(lines)
conn.schemaless_insert(lines, 0)
```
## Exception
```python
try:
conn.insert_lines(lines)
except SchemalessError as err:
print(err)
```
"""
return taos_insert_lines(self._conn, lines)
def insert_telnet_lines(self, lines):
"""OpenTSDB telnet style API format support
2.OpenTSDB telnet style API format support
## Example
cpu_load 1626056811855516532ns 2.0f32 id="tb1",host="host0",interface="eth0"
import taos
conn = taos.connect()
conn.exec("drop database if exists test")
conn.select_db("test")
lines = [
'cpu_load 1626056811855516532ns 2.0f32 id="tb1",host="host0",interface="eth0"',
]
conn.schemaless_insert(lines, 1)
"""
return taos_insert_telnet_lines(self._conn, lines)
def insert_json_payload(self, payload):
"""OpenTSDB HTTP JSON format support
3.OpenTSDB HTTP JSON format support
## Example
"{
import taos
conn = taos.connect()
conn.exec("drop database if exists test")
conn.select_db("test")
payload = ['''
{
"metric": "cpu_load_0",
"timestamp": 1626006833610123,
"value": 55.5,
......@@ -168,10 +166,13 @@ class TaosConnection(object):
"interface": "eth0",
"Id": "tb0"
}
}"
}
''']
conn.schemaless_insert(lines, 2)
"""
return taos_insert_json_payload(self._conn, payload)
return taos_schemaless_insert(self._conn, lines, protocol)
def cursor(self):
# type: () -> TaosCursor
......
......@@ -80,17 +80,7 @@ class ResultError(DatabaseError):
pass
class LinesError(DatabaseError):
"""taos_insert_lines errors."""
pass
class TelnetLinesError(DatabaseError):
"""taos_insert_telnet_lines errors."""
pass
class JsonPayloadError(DatabaseError):
"""taos_insert_json_payload errors."""
class SchemalessError(DatabaseError):
"""taos_schemaless_insert errors."""
pass
......@@ -13,10 +13,10 @@ def conn():
return connect()
def test_insert_lines(conn):
def test_schemaless_insert(conn):
# type: (TaosConnection) -> None
dbname = "pytest_taos_insert_lines"
dbname = "pytest_taos_schemaless_insert"
try:
conn.execute("drop database if exists %s" % dbname)
conn.execute("create database if not exists %s precision 'us'" % dbname)
......@@ -27,13 +27,13 @@ def test_insert_lines(conn):
'st,t1=4i64,t3="t4",t2=5f64,t4=5f64 c1=3i64,c3=L"passitagin",c2=true,c4=5f64,c5=5f64,c6=7u64 1626006933640000000ns',
'stf,t1=4i64,t3="t4",t2=5f64,t4=5f64 c1=3i64,c3=L"passitagin_stf",c2=false,c5=5f64,c6=7u64 1626006933641000000ns',
]
conn.insert_lines(lines)
conn.schemaless_insert(lines, 0)
print("inserted")
lines = [
'stf,t1=5i64,t3="t4",t2=5f64,t4=5f64 c1=3i64,c3=L"passitagin_stf",c2=false,c5=5f64,c6=7u64 1626006933641000000ns',
]
conn.insert_lines(lines)
conn.schemaless_insert(lines, 0)
print("inserted")
result = conn.query("select * from st")
print(*result.fields)
......@@ -54,4 +54,4 @@ def test_insert_lines(conn):
if __name__ == "__main__":
test_insert_lines(connect())
test_schemaless_insert(connect())
......@@ -187,11 +187,7 @@ DLL_EXPORT void taos_close_stream(TAOS_STREAM *tstr);
DLL_EXPORT int taos_load_table_info(TAOS *taos, const char* tableNameList);
DLL_EXPORT int taos_insert_lines(TAOS* taos, char* lines[], int numLines);
DLL_EXPORT int taos_insert_telnet_lines(TAOS* taos, char* lines[], int numLines);
DLL_EXPORT int taos_insert_json_payload(TAOS* taos, char* payload);
DLL_EXPORT int taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int protocol);
#ifdef __cplusplus
}
......
......@@ -112,6 +112,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_TSC_INVALID_JSON_TYPE TAOS_DEF_ERROR_CODE(0, 0x0222) //"Invalid JSON data type")
#define TSDB_CODE_TSC_INVALID_JSON_CONFIG TAOS_DEF_ERROR_CODE(0, 0x0223) //"Invalid JSON configuration")
#define TSDB_CODE_TSC_VALUE_OUT_OF_RANGE TAOS_DEF_ERROR_CODE(0, 0x0224) //"Value out of range")
#define TSDB_CODE_TSC_INVALID_PROTOCOL_TYPE TAOS_DEF_ERROR_CODE(0, 0x0225) //"Invalid line protocol type")
// mnode
#define TSDB_CODE_MND_MSG_NOT_PROCESSED TAOS_DEF_ERROR_CODE(0, 0x0300) //"Message not processed")
......
......@@ -120,6 +120,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INVALID_JSON, "Invalid JSON format")
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INVALID_JSON_TYPE, "Invalid JSON data type")
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INVALID_JSON_CONFIG, "Invalid JSON configuration")
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_VALUE_OUT_OF_RANGE, "Value out of range")
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INVALID_PROTOCOL_TYPE, "Invalid line protocol type")
// mnode
TAOS_DEFINE_ERROR(TSDB_CODE_MND_MSG_NOT_PROCESSED, "Message not processed")
......
......@@ -980,40 +980,40 @@ int32_t verify_schema_less(TAOS* taos) {
"stf,t1=4i64,t3=\"t4\",t2=5f64,t4=5f64 c1=3i64,c3=L\"passitagin_stf\",c2=false,c5=5f64,c6=7u64 1626006933641000000ns"
};
code = taos_insert_lines(taos, lines , sizeof(lines)/sizeof(char*));
code = taos_schemaless_insert(taos, lines , sizeof(lines)/sizeof(char*), 0);
char* lines2[] = {
"stg,t1=3i64,t2=4f64,t3=\"t3\" c1=3i64,c3=L\"passit\",c2=false,c4=4f64 1626006833639000000ns",
"stg,t1=4i64,t3=\"t4\",t2=5f64,t4=5f64 c1=3i64,c3=L\"passitagin\",c2=true,c4=5f64,c5=5f64 1626006833640000000ns"
};
code = taos_insert_lines(taos, &lines2[0], 1);
code = taos_insert_lines(taos, &lines2[1], 1);
code = taos_schemaless_insert(taos, &lines2[0], 1, 0);
code = taos_schemaless_insert(taos, &lines2[1], 1, 0);
char* lines3[] = {
"sth,t1=4i64,t2=5f64,t4=5f64,ID=\"childtable\" c1=3i64,c3=L\"passitagin_stf\",c2=false,c5=5f64,c6=7u64 1626006933641ms",
"sth,t1=4i64,t2=5f64,t4=5f64 c1=3i64,c3=L\"passitagin_stf\",c2=false,c5=5f64,c6=7u64 1626006933654ms"
};
code = taos_insert_lines(taos, lines3, 2);
code = taos_schemaless_insert(taos, lines3, 2, 0);
char* lines4[] = {
"st123456,t1=3i64,t2=4f64,t3=\"t3\" c1=3i64,c3=L\"passit\",c2=false,c4=4f64 1626006833639000000ns",
"dgtyqodr,t2=5f64,t3=L\"ste\" c1=tRue,c2=4i64,c3=\"iam\" 1626056811823316532ns"
};
code = taos_insert_lines(taos, lines4, 2);
code = taos_schemaless_insert(taos, lines4, 2, 0);
char* lines5[] = {
"zqlbgs,id=\"zqlbgs_39302_21680\",t0=f,t1=127i8,t2=32767i16,t3=2147483647i32,t4=9223372036854775807i64,t5=11.12345f32,t6=22.123456789f64,t7=\"binaryTagValue\",t8=L\"ncharTagValue\" c0=f,c1=127i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c5=11.12345f32,c6=22.123456789f64,c7=\"binaryColValue\",c8=L\"ncharColValue\",c9=7u64 1626006833639000000ns",
"zqlbgs,t9=f,id=\"zqlbgs_39302_21680\",t0=f,t1=127i8,t11=127i8,t2=32767i16,t3=2147483647i32,t4=9223372036854775807i64,t5=11.12345f32,t6=22.123456789f64,t7=\"binaryTagValue\",t8=L\"ncharTagValue\",t10=L\"ncharTagValue\" c10=f,c0=f,c1=127i8,c12=127i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c5=11.12345f32,c6=22.123456789f64,c7=\"binaryColValue\",c8=L\"ncharColValue\",c9=7u64,c11=L\"ncharColValue\" 1626006833639000000ns"
};
code = taos_insert_lines(taos, &lines5[0], 1);
code = taos_insert_lines(taos, &lines5[1], 1);
code = taos_schemaless_insert(taos, &lines5[0], 1, 0);
code = taos_schemaless_insert(taos, &lines5[1], 1, 0);
char* lines6[] = {
"st123456,t1=3i64,t2=4f64,t3=\"t3\" c1=3i64,c3=L\"passit\",c2=false,c4=4f64 1626006833639000000ns",
"dgtyqodr,t2=5f64,t3=L\"ste\" c1=tRue,c2=4i64,c3=\"iam\" 1626056811823316532ns"
};
code = taos_insert_lines(taos, lines6, 2);
code = taos_schemaless_insert(taos, lines6, 2, 0);
return (code);
}
......
......@@ -77,9 +77,9 @@ int main(int argc, char* argv[]) {
}
//shuffle(lines, numSuperTables * numChildTables * numRowsPerChildTable);
printf("%s\n", "begin taos_insert_lines");
printf("%s\n", "begin taos_schemaless_insert");
int64_t begin = getTimeInUs();
int32_t code = taos_insert_lines(taos, lines, numSuperTables * numChildTables * numRowsPerChildTable);
int32_t code = taos_schemaless_insert(taos, lines, numSuperTables * numChildTables * numRowsPerChildTable, 0);
int64_t end = getTimeInUs();
printf("code: %d, %s. time used: %"PRId64"\n", code, tstrerror(code), end-begin);
......
......@@ -33,7 +33,7 @@ class TDTestCase:
### Default format ###
### metric ###
print("============= step0 : test metric ================")
payload = '''
payload = ['''
{
"metric": ".stb.0.",
"timestamp": 1626006833610123,
......@@ -45,16 +45,16 @@ class TDTestCase:
"t4": "123_abc_.!@#$%^&*:;,./?|+-=()[]{}<>"
}
}
'''
code = self._conn.insert_json_payload(payload)
print("insert_json_payload result {}".format(code))
''']
code = self._conn.schemaless_insert(payload, 2)
print("schemaless_insert result {}".format(code))
tdSql.query("describe _stb_0_")
tdSql.checkRows(6)
### metric value ###
print("============= step1 : test metric value types ================")
payload = '''
payload = ['''
{
"metric": "stb0_0",
"timestamp": 1626006833610123,
......@@ -66,14 +66,14 @@ class TDTestCase:
"t4": "123_abc_.!@#$%^&*:;,./?|+-=()[]{}<>"
}
}
'''
code = self._conn.insert_json_payload(payload)
print("insert_json_payload result {}".format(code))
''']
code = self._conn.schemaless_insert(payload, 2)
print("schemaless_insert result {}".format(code))
tdSql.query("describe stb0_0")
tdSql.checkData(1, 1, "BIGINT")
payload = '''
payload = ['''
{
"metric": "stb0_1",
"timestamp": 1626006833610123,
......@@ -85,14 +85,14 @@ class TDTestCase:
"t4": "123_abc_.!@#$%^&*:;,./?|+-=()[]{}<>"
}
}
'''
code = self._conn.insert_json_payload(payload)
print("insert_json_payload result {}".format(code))
''']
code = self._conn.schemaless_insert(payload, 2)
print("schemaless_insert result {}".format(code))
tdSql.query("describe stb0_1")
tdSql.checkData(1, 1, "BOOL")
payload = '''
payload = ['''
{
"metric": "stb0_2",
"timestamp": 1626006833610123,
......@@ -104,14 +104,14 @@ class TDTestCase:
"t4": "123_abc_.!@#$%^&*:;,./?|+-=()[]{}<>"
}
}
'''
code = self._conn.insert_json_payload(payload)
print("insert_json_payload result {}".format(code))
''']
code = self._conn.schemaless_insert(payload, 2)
print("schemaless_insert result {}".format(code))
tdSql.query("describe stb0_2")
tdSql.checkData(1, 1, "BOOL")
payload = '''
payload = ['''
{
"metric": "stb0_3",
"timestamp": 1626006833610123,
......@@ -123,14 +123,14 @@ class TDTestCase:
"t4": "123_abc_.!@#$%^&*:;,./?|+-=()[]{}<>"
}
}
'''
code = self._conn.insert_json_payload(payload)
print("insert_json_payload result {}".format(code))
''']
code = self._conn.schemaless_insert(payload, 2)
print("schemaless_insert result {}".format(code))
tdSql.query("describe stb0_3")
tdSql.checkData(1, 1, "BINARY")
payload = '''
payload = ['''
{
"metric": "stb0_4",
"timestamp": 1626006833610123,
......@@ -142,14 +142,14 @@ class TDTestCase:
"t4": "123_abc_.!@#$%^&*:;,./?|+-=()[]{}<>"
}
}
'''
code = self._conn.insert_json_payload(payload)
print("insert_json_payload result {}".format(code))
''']
code = self._conn.schemaless_insert(payload, 2)
print("schemaless_insert result {}".format(code))
tdSql.query("describe stb0_4")
tdSql.checkData(1, 1, "DOUBLE")
payload = '''
payload = ['''
{
"metric": "stb0_5",
"timestamp": 1626006833610123,
......@@ -161,9 +161,9 @@ class TDTestCase:
"t4": "123_abc_.!@#$%^&*:;,./?|+-=()[]{}<>"
}
}
'''
code = self._conn.insert_json_payload(payload)
print("insert_json_payload result {}".format(code))
''']
code = self._conn.schemaless_insert(payload, 2)
print("schemaless_insert result {}".format(code))
tdSql.query("describe stb0_5")
tdSql.checkData(1, 1, "DOUBLE")
......@@ -171,7 +171,7 @@ class TDTestCase:
print("============= step2 : test timestamp ================")
### timestamp 0 ###
payload = '''
payload = ['''
{
"metric": "stb0_6",
"timestamp": 0,
......@@ -183,37 +183,14 @@ class TDTestCase:
"t4": "123_abc_.!@#$%^&*:;,./?|+-=()[]{}<>"
}
}
'''
code = self._conn.insert_json_payload(payload)
print("insert_json_payload result {}".format(code))
''']
code = self._conn.schemaless_insert(payload, 2)
print("schemaless_insert result {}".format(code))
print("============= step3 : test tags ================")
### ID ###
payload = '''
{
"metric": "stb0_7",
"timestamp": 0,
"value": 123,
"tags": {
"ID": "tb0_7",
"t1": true,
"iD": "tb000",
"t2": false,
"t3": 10,
"t4": "123_abc_.!@#$%^&*:;,./?|+-=()[]{}<>",
"id": "tb555"
}
}
'''
code = self._conn.insert_json_payload(payload)
print("insert_json_payload result {}".format(code))
tdSql.query("select tbname from stb0_7")
tdSql.checkData(0, 0, "tb0_7")
### Default tag numeric types ###
payload = '''
payload = ['''
{
"metric": "stb0_8",
"timestamp": 0,
......@@ -222,14 +199,14 @@ class TDTestCase:
"t1": 123
}
}
'''
code = self._conn.insert_json_payload(payload)
print("insert_json_payload result {}".format(code))
''']
code = self._conn.schemaless_insert(payload, 2)
print("schemaless_insert result {}".format(code))
tdSql.query("describe stb0_8")
tdSql.checkData(2, 1, "BIGINT")
payload = '''
payload = ['''
{
"metric": "stb0_9",
"timestamp": 0,
......@@ -238,14 +215,14 @@ class TDTestCase:
"t1": 123.00
}
}
'''
code = self._conn.insert_json_payload(payload)
print("insert_json_payload result {}".format(code))
''']
code = self._conn.schemaless_insert(payload, 2)
print("schemaless_insert result {}".format(code))
tdSql.query("describe stb0_9")
tdSql.checkData(2, 1, "DOUBLE")
payload = '''
payload = ['''
{
"metric": "stb0_10",
"timestamp": 0,
......@@ -254,9 +231,9 @@ class TDTestCase:
"t1": 123E-1
}
}
'''
code = self._conn.insert_json_payload(payload)
print("insert_json_payload result {}".format(code))
''']
code = self._conn.schemaless_insert(payload, 2)
print("schemaless_insert result {}".format(code))
tdSql.query("describe stb0_10")
tdSql.checkData(2, 1, "DOUBLE")
......@@ -265,7 +242,7 @@ class TDTestCase:
print("============= step4 : test nested format ================")
### timestamp ###
#seconds
payload = '''
payload = ['''
{
"metric": "stb1_0",
"timestamp": {
......@@ -280,15 +257,15 @@ class TDTestCase:
"t4": "123_abc_.!@#$%^&*:;,./?|+-=()[]{}<>"
}
}
'''
code = self._conn.insert_json_payload(payload)
print("insert_json_payload result {}".format(code))
''']
code = self._conn.schemaless_insert(payload, 2)
print("schemaless_insert result {}".format(code))
tdSql.query("select ts from stb1_0")
tdSql.checkData(0, 0, "2021-07-11 20:33:53.000000")
#milliseconds
payload = '''
payload = ['''
{
"metric": "stb1_1",
"timestamp": {
......@@ -303,15 +280,15 @@ class TDTestCase:
"t4": "123_abc_.!@#$%^&*:;,./?|+-=()[]{}<>"
}
}
'''
code = self._conn.insert_json_payload(payload)
print("insert_json_payload result {}".format(code))
''']
code = self._conn.schemaless_insert(payload, 2)
print("schemaless_insert result {}".format(code))
tdSql.query("select ts from stb1_1")
tdSql.checkData(0, 0, "2021-07-11 20:33:53.610000")
#microseconds
payload = '''
payload = ['''
{
"metric": "stb1_2",
"timestamp": {
......@@ -326,19 +303,19 @@ class TDTestCase:
"t4": "123_abc_.!@#$%^&*:;,./?|+-=()[]{}<>"
}
}
'''
code = self._conn.insert_json_payload(payload)
print("insert_json_payload result {}".format(code))
''']
code = self._conn.schemaless_insert(payload, 2)
print("schemaless_insert result {}".format(code))
tdSql.query("select ts from stb1_2")
tdSql.checkData(0, 0, "2021-07-11 20:33:53.610123")
#nanoseconds
payload = '''
payload = ['''
{
"metric": "stb1_3",
"timestamp": {
"value": 1.6260068336101233e+18,
"value": 1626006833610123321,
"type": "ns"
},
"value": 10,
......@@ -349,16 +326,16 @@ class TDTestCase:
"t4": "123_abc_.!@#$%^&*:;,./?|+-=()[]{}<>"
}
}
'''
code = self._conn.insert_json_payload(payload)
print("insert_json_payload result {}".format(code))
''']
code = self._conn.schemaless_insert(payload, 2)
print("schemaless_insert result {}".format(code))
tdSql.query("select ts from stb1_3")
tdSql.checkData(0, 0, "2021-07-11 20:33:53.610123")
#now
tdSql.execute('use test')
payload = '''
payload = ['''
{
"metric": "stb1_4",
"timestamp": {
......@@ -373,12 +350,12 @@ class TDTestCase:
"t4": "123_abc_.!@#$%^&*:;,./?|+-=()[]{}<>"
}
}
'''
code = self._conn.insert_json_payload(payload)
print("insert_json_payload result {}".format(code))
''']
code = self._conn.schemaless_insert(payload, 2)
print("schemaless_insert result {}".format(code))
### metric value ###
payload = '''
payload = ['''
{
"metric": "stb2_0",
"timestamp": {
......@@ -396,14 +373,14 @@ class TDTestCase:
"t4": "123_abc_.!@#$%^&*:;,./?|+-=()[]{}<>"
}
}
'''
code = self._conn.insert_json_payload(payload)
print("insert_json_payload result {}".format(code))
''']
code = self._conn.schemaless_insert(payload, 2)
print("schemaless_insert result {}".format(code))
tdSql.query("describe stb2_0")
tdSql.checkData(1, 1, "BOOL")
payload = '''
payload = ['''
{
"metric": "stb2_1",
"timestamp": {
......@@ -421,14 +398,14 @@ class TDTestCase:
"t4": "123_abc_.!@#$%^&*:;,./?|+-=()[]{}<>"
}
}
'''
code = self._conn.insert_json_payload(payload)
print("insert_json_payload result {}".format(code))
''']
code = self._conn.schemaless_insert(payload, 2)
print("schemaless_insert result {}".format(code))
tdSql.query("describe stb2_1")
tdSql.checkData(1, 1, "TINYINT")
payload = '''
payload = ['''
{
"metric": "stb2_2",
"timestamp": {
......@@ -446,14 +423,14 @@ class TDTestCase:
"t4": "123_abc_.!@#$%^&*:;,./?|+-=()[]{}<>"
}
}
'''
code = self._conn.insert_json_payload(payload)
print("insert_json_payload result {}".format(code))
''']
code = self._conn.schemaless_insert(payload, 2)
print("schemaless_insert result {}".format(code))
tdSql.query("describe stb2_2")
tdSql.checkData(1, 1, "SMALLINT")
payload = '''
payload = ['''
{
"metric": "stb2_3",
"timestamp": {
......@@ -471,14 +448,14 @@ class TDTestCase:
"t4": "123_abc_.!@#$%^&*:;,./?|+-=()[]{}<>"
}
}
'''
code = self._conn.insert_json_payload(payload)
print("insert_json_payload result {}".format(code))
''']
code = self._conn.schemaless_insert(payload, 2)
print("schemaless_insert result {}".format(code))
tdSql.query("describe stb2_3")
tdSql.checkData(1, 1, "INT")
payload = '''
payload = ['''
{
"metric": "stb2_4",
"timestamp": {
......@@ -496,14 +473,14 @@ class TDTestCase:
"t4": "123_abc_.!@#$%^&*:;,./?|+-=()[]{}<>"
}
}
'''
code = self._conn.insert_json_payload(payload)
print("insert_json_payload result {}".format(code))
''']
code = self._conn.schemaless_insert(payload, 2)
print("schemaless_insert result {}".format(code))
tdSql.query("describe stb2_4")
tdSql.checkData(1, 1, "BIGINT")
payload = '''
payload = ['''
{
"metric": "stb2_5",
"timestamp": {
......@@ -521,14 +498,14 @@ class TDTestCase:
"t4": "123_abc_.!@#$%^&*:;,./?|+-=()[]{}<>"
}
}
'''
code = self._conn.insert_json_payload(payload)
print("insert_json_payload result {}".format(code))
''']
code = self._conn.schemaless_insert(payload, 2)
print("schemaless_insert result {}".format(code))
tdSql.query("describe stb2_5")
tdSql.checkData(1, 1, "FLOAT")
payload = '''
payload = ['''
{
"metric": "stb2_6",
"timestamp": {
......@@ -546,14 +523,14 @@ class TDTestCase:
"t4": "123_abc_.!@#$%^&*:;,./?|+-=()[]{}<>"
}
}
'''
code = self._conn.insert_json_payload(payload)
print("insert_json_payload result {}".format(code))
''']
code = self._conn.schemaless_insert(payload, 2)
print("schemaless_insert result {}".format(code))
tdSql.query("describe stb2_6")
tdSql.checkData(1, 1, "DOUBLE")
payload = '''
payload = ['''
{
"metric": "stb2_7",
"timestamp": {
......@@ -571,14 +548,14 @@ class TDTestCase:
"t4": "123_abc_.!@#$%^&*:;,./?|+-=()[]{}<>"
}
}
'''
code = self._conn.insert_json_payload(payload)
print("insert_json_payload result {}".format(code))
''']
code = self._conn.schemaless_insert(payload, 2)
print("schemaless_insert result {}".format(code))
tdSql.query("describe stb2_7")
tdSql.checkData(1, 1, "BINARY")
payload = '''
payload = ['''
{
"metric": "stb2_8",
"timestamp": {
......@@ -596,16 +573,16 @@ class TDTestCase:
"t4": "123_abc_.!@#$%^&*:;,./?|+-=()[]{}<>"
}
}
'''
code = self._conn.insert_json_payload(payload)
print("insert_json_payload result {}".format(code))
''']
code = self._conn.schemaless_insert(payload, 2)
print("schemaless_insert result {}".format(code))
tdSql.query("describe stb2_8")
tdSql.checkData(1, 1, "NCHAR")
### tag value ###
payload = '''
payload = ['''
{
"metric": "stb3_0",
"timestamp": {
......@@ -655,9 +632,9 @@ class TDTestCase:
}
}
}
'''
code = self._conn.insert_json_payload(payload)
print("insert_json_payload result {}".format(code))
''']
code = self._conn.schemaless_insert(payload, 2)
print("schemaless_insert result {}".format(code))
tdSql.query("describe stb3_0")
tdSql.checkData(2, 1, "BOOL")
......
......@@ -39,8 +39,8 @@ class TDTestCase:
".stb0.3. 1626006833639000000ns 4i8 host=\"host0\" interface=\"eth0\"",
]
code = self._conn.insert_telnet_lines(lines0)
print("insert_telnet_lines result {}".format(code))
code = self._conn.schemaless_insert(lines0, 1)
print("schemaless_insert result {}".format(code))
tdSql.query("show stables")
tdSql.checkRows(4)
......@@ -68,8 +68,8 @@ class TDTestCase:
"stb1 0 6i8 host=\"host0\"",
]
code = self._conn.insert_telnet_lines(lines1)
print("insert_telnet_lines result {}".format(code))
code = self._conn.schemaless_insert(lines1, 1)
print("schemaless_insert result {}".format(code))
tdSql.query("select * from stb1")
tdSql.checkRows(6)
......@@ -82,8 +82,8 @@ class TDTestCase:
"stb2_0 1626006833651ms -127i8 host=\"host0\"",
"stb2_0 1626006833652ms 127i8 host=\"host0\""
]
code = self._conn.insert_telnet_lines(lines2_0)
print("insert_telnet_lines result {}".format(code))
code = self._conn.schemaless_insert(lines2_0, 1)
print("schemaless_insert result {}".format(code))
tdSql.query("select * from stb2_0")
tdSql.checkRows(2)
......@@ -97,8 +97,8 @@ class TDTestCase:
"stb2_1 1626006833651ms -32767i16 host=\"host0\"",
"stb2_1 1626006833652ms 32767i16 host=\"host0\""
]
code = self._conn.insert_telnet_lines(lines2_1)
print("insert_telnet_lines result {}".format(code))
code = self._conn.schemaless_insert(lines2_1, 1)
print("schemaless_insert result {}".format(code))
tdSql.query("select * from stb2_1")
tdSql.checkRows(2)
......@@ -113,8 +113,8 @@ class TDTestCase:
"stb2_2 1626006833652ms 2147483647i32 host=\"host0\""
]
code = self._conn.insert_telnet_lines(lines2_2)
print("insert_telnet_lines result {}".format(code))
code = self._conn.schemaless_insert(lines2_2, 1)
print("schemaless_insert result {}".format(code))
tdSql.query("select * from stb2_2")
tdSql.checkRows(2)
......@@ -130,8 +130,8 @@ class TDTestCase:
"stb2_3 1626006833662ms 9223372036854775807 host=\"host0\""
]
code = self._conn.insert_telnet_lines(lines2_3)
print("insert_telnet_lines result {}".format(code))
code = self._conn.schemaless_insert(lines2_3, 1)
print("schemaless_insert result {}".format(code))
tdSql.query("select * from stb2_3")
tdSql.checkRows(3)
......@@ -154,8 +154,8 @@ class TDTestCase:
"stb2_4 1626006833710ms -3.4E38f32 host=\"host0\""
]
code = self._conn.insert_telnet_lines(lines2_4)
print("insert_telnet_lines result {}".format(code))
code = self._conn.schemaless_insert(lines2_4, 1)
print("schemaless_insert result {}".format(code))
tdSql.query("select * from stb2_4")
tdSql.checkRows(10)
......@@ -179,8 +179,8 @@ class TDTestCase:
"stb2_5 1626006833710ms 3.15 host=\"host0\""
]
code = self._conn.insert_telnet_lines(lines2_5)
print("insert_telnet_lines result {}".format(code))
code = self._conn.schemaless_insert(lines2_5, 1)
print("schemaless_insert result {}".format(code))
tdSql.query("select * from stb2_5")
tdSql.checkRows(11)
......@@ -203,8 +203,8 @@ class TDTestCase:
"stb2_6 1626006833700ms FALSE host=\"host0\""
]
code = self._conn.insert_telnet_lines(lines2_6)
print("insert_telnet_lines result {}".format(code))
code = self._conn.schemaless_insert(lines2_6, 1)
print("schemaless_insert result {}".format(code))
tdSql.query("select * from stb2_6")
tdSql.checkRows(10)
......@@ -220,8 +220,8 @@ class TDTestCase:
"stb2_7 1626006833630ms \"binary_val.()[]{}<>\" host=\"host0\""
]
code = self._conn.insert_telnet_lines(lines2_7)
print("insert_telnet_lines result {}".format(code))
code = self._conn.schemaless_insert(lines2_7, 1)
print("schemaless_insert result {}".format(code))
tdSql.query("select * from stb2_7")
tdSql.checkRows(3)
......@@ -236,8 +236,8 @@ class TDTestCase:
"stb2_8 1626006833620ms L\"nchar_val数值二\" host=\"host0\""
]
code = self._conn.insert_telnet_lines(lines2_8)
print("insert_telnet_lines result {}".format(code))
code = self._conn.schemaless_insert(lines2_8, 1)
print("schemaless_insert result {}".format(code))
tdSql.query("select * from stb2_8")
tdSql.checkRows(2)
......@@ -254,8 +254,8 @@ class TDTestCase:
"stb3_0 1626006833610ms 2 t1=-127i8 t2=-32767i16 t3=-2147483647i32 t4=-9223372036854775807i64 t5=-3.4E38f32 t6=-1.7E308f64 t7=false t8=\"binary_val_2\" t9=L\"标签值2\""
]
code = self._conn.insert_telnet_lines(lines3_0)
print("insert_telnet_lines result {}".format(code))
code = self._conn.schemaless_insert(lines3_0, 1)
print("schemaless_insert result {}".format(code))
tdSql.query("select * from stb3_0")
tdSql.checkRows(2)
......@@ -298,8 +298,8 @@ class TDTestCase:
"stb3_1 1626006833610ms 3 ID=\"child_table3\" host=\"host3\""
]
code = self._conn.insert_telnet_lines(lines3_1)
print("insert_telnet_lines result {}".format(code))
code = self._conn.schemaless_insert(lines3_1, 1)
print("schemaless_insert result {}".format(code))
tdSql.query("select * from stb3_1")
tdSql.checkRows(3)
......
......@@ -42,18 +42,18 @@ class TDTestCase:
"stf,t1=4i64,t3=\"t4\",t2=5f64,t4=5f64 c1=3i64,c3=L\"passitagin_stf\",c2=false,c5=5f64,c6=7u64 1626006933641000000ns"
]
code = self._conn.insert_lines(lines)
print("insert_lines result {}".format(code))
code = self._conn.schemaless_insert(lines, 0)
print("schemaless_insert result {}".format(code))
lines2 = [ "stg,t1=3i64,t2=4f64,t3=\"t3\" c1=3i64,c3=L\"passit\",c2=false,c4=4f64 1626006833639000000ns",
"stg,t1=4i64,t3=\"t4\",t2=5f64,t4=5f64 c1=3i64,c3=L\"passitagin\",c2=true,c4=5f64,c5=5f64 1626006833640000000ns"
]
code = self._conn.insert_lines([ lines2[0] ])
print("insert_lines result {}".format(code))
self._conn.insert_lines([ lines2[1] ])
print("insert_lines result {}".format(code))
code = self._conn.schemaless_insert([ lines2[0] ], 0)
print("schemaless_insert result {}".format(code))
self._conn.schemaless_insert([ lines2[1] ], 0)
print("schemaless_insert result {}".format(code))
tdSql.query("select * from st")
tdSql.checkRows(4)
......@@ -73,10 +73,10 @@ class TDTestCase:
tdSql.query("describe stf")
tdSql.checkData(2, 2, 14)
self._conn.insert_lines([
self._conn.schemaless_insert([
"sth,t1=4i64,t2=5f64,t4=5f64,ID=\"childtable\" c1=3i64,c3=L\"passitagin_stf\",c2=false,c5=5f64,c6=7u64 1626006933641ms",
"sth,t1=4i64,t2=5f64,t4=5f64 c1=3i64,c3=L\"passitagin_stf\",c2=false,c5=5f64,c6=7u64 1626006933654ms"
])
"sth,t1=4i64,t2=5f64,t4=5f64 c1=3i64,c3=L\"passitagin_stf\",c2=false,c5=5f64,c6=7u64 1626006933654ms"
], 0)
tdSql.execute('reset query cache')
tdSql.query('select tbname, * from sth')
......
......@@ -14,7 +14,7 @@
import traceback
import random
import string
from taos.error import LinesError
from taos.error import SchemalessError
import datetime
import time
from copy import deepcopy
......@@ -172,28 +172,28 @@ class TDTestCase:
def perfTableInsert(self):
table_generator = self.tableGenerator()
for input_sql in table_generator:
self._conn.insert_lines([input_sql])
self._conn.schemaless_insert([input_sql], 0)
# for i in range(10):
# self._conn.insert_lines([input_sql])
# self._conn.schemaless_insert([input_sql], 0)
def perfDataInsert(self, count=4):
table_generator = self.tableGenerator(count=count)
ts = int(time.time())
for input_sql in table_generator:
print("input_sql-----------", input_sql)
self._conn.insert_lines([input_sql])
self._conn.schemaless_insert([input_sql], 0)
for i in range(100000):
ts -= 1
input_sql_new = self.replaceLastStr(input_sql, str(ts)) + 's'
print("input_sql_new---------", input_sql_new)
self._conn.insert_lines([input_sql_new])
self._conn.schemaless_insert([input_sql_new], 0)
def batchInsertTable(self, batch_list):
for insert_list in batch_list:
print(threading.current_thread().name, "length=", len(insert_list))
print(threading.current_thread().name, 'firstline', insert_list[0])
print(threading.current_thread().name, 'lastline:', insert_list[-1])
self._conn.insert_lines(insert_list)
self._conn.schemaless_insert(insert_list, 0)
print(threading.current_thread().name, 'end')
def genTableThread(self, thread_count=10):
......@@ -218,7 +218,7 @@ class TDTestCase:
def createStb(self, count=4):
input_sql = self.getPerfSql(count=count, init=True)
self._conn.insert_lines([input_sql])
self._conn.schemaless_insert([input_sql], 0)
def threadInsertTable(self, end_list, thread_count=10):
threads = list()
......@@ -238,7 +238,7 @@ class TDTestCase:
# def createTb(self, count=4):
# input_sql = self.getPerfSql(count=count)
# for i in range(10000):
# self._conn.insert_lines([input_sql])
# self._conn.schemaless_insert([input_sql], 0)
# def createTb1(self, count=4):
# start_time = time.time()
......@@ -273,8 +273,8 @@ class TDTestCase:
# def test(self):
# sql1 = 'stb,id="init",t0=14865i32,t1="tvnqbjuqck" c0=37i32,c1=217i32,c2=3i32,c3=88i32 1626006833640ms'
# sql2 = 'stb,id="init",t0=14865i32,t1="tvnqbjuqck" c0=38i32,c1=217i32,c2=3i32,c3=88i32 1626006833641ms'
# self._conn.insert_lines([sql1])
# self._conn.insert_lines([sql2])
# self._conn.schemaless_insert([sql1], 0)
# self._conn.schemaless_insert([sql2], 0)
def run(self):
print("running {}".format(__file__))
......
此差异已折叠。
......@@ -1084,7 +1084,7 @@ bool simExecuteLineInsertCmd(SScript *script, char *rest) {
simInfo("script:%s, %s", script->fileName, rest);
simLogSql(buf, true);
char * lines[] = {rest};
int32_t ret = taos_insert_lines(script->taos, lines, 1);
int32_t ret = taos_schemaless_insert(script->taos, lines, 1, 0);
if (ret == TSDB_CODE_SUCCESS) {
simDebug("script:%s, taos:%p, %s executed. success.", script->fileName, script->taos, rest);
script->linePos++;
......@@ -1107,7 +1107,7 @@ bool simExecuteLineInsertErrorCmd(SScript *script, char *rest) {
simInfo("script:%s, %s", script->fileName, rest);
simLogSql(buf, true);
char * lines[] = {rest};
int32_t ret = taos_insert_lines(script->taos, lines, 1);
int32_t ret = taos_schemaless_insert(script->taos, lines, 1, 0);
if (ret == TSDB_CODE_SUCCESS) {
sprintf(script->error, "script:%s, taos:%p, %s executed. expect failed, but success.", script->fileName, script->taos, rest);
script->linePos++;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册