未验证 提交 30ea017b 编写于 作者: H Haojun Liao 提交者: GitHub

Merge pull request #6861 from taosdata/feature/td-4647-2

[TD-4647] support auto adding columns through line protocol
......@@ -412,6 +412,7 @@ int32_t tscSQLSyntaxErrMsg(char* msg, const char* additionalInfo, const char* s
int32_t tscValidateSqlInfo(SSqlObj *pSql, struct SSqlInfo *pInfo);
int32_t tsSetBlockInfo(SSubmitBlk *pBlocks, const STableMeta *pTableMeta, int32_t numOfRows);
extern int32_t sentinel;
extern SHashObj *tscVgroupMap;
extern SHashObj *tscTableMetaInfo;
......
......@@ -946,3 +946,34 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_setTableNameTagsI
return JNI_SUCCESS;
}
JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_insertLinesImp(JNIEnv *env, jobject jobj,
jobjectArray lines, jlong conn) {
TAOS *taos = (TAOS *)conn;
if (taos == NULL) {
jniError("jobj:%p, connection already closed", jobj);
return JNI_CONNECTION_NULL;
}
int numLines = (*env)->GetArrayLength(env, lines);
char** c_lines = calloc(numLines, sizeof(char*));
for (int i = 0; i < numLines; ++i) {
jstring line = (jstring) ((*env)->GetObjectArrayElement(env, lines, i));
c_lines[i] = (char*)(*env)->GetStringUTFChars(env, line, 0);
}
int code = taos_insert_lines(taos, c_lines, numLines);
for (int i = 0; i < numLines; ++i) {
jstring line = (jstring) ((*env)->GetObjectArrayElement(env, lines, i));
(*env)->ReleaseStringUTFChars(env, line, c_lines[i]);
}
if (code != TSDB_CODE_SUCCESS) {
jniError("jobj:%p, conn:%p, code:%s", jobj, taos, tstrerror(code));
return JNI_TDENGINE_ERROR;
}
return code;
}
\ No newline at end of file
......@@ -17,6 +17,7 @@
#include "tscLog.h"
#include "taos.h"
typedef struct {
char sTableName[TSDB_TABLE_NAME_LEN];
SHashObj* tagHash;
......@@ -33,7 +34,7 @@ typedef struct {
char* value;
//===================================
SSchema* schema;
size_t fieldSchemaIdx;
} TAOS_SML_KV;
typedef struct {
......@@ -48,9 +49,17 @@ typedef struct {
int fieldNum;
//================================
SSmlSTableSchema* schema;
size_t schemaIdx;
} TAOS_SML_DATA_POINT;
typedef enum {
SML_TIME_STAMP_NOW,
SML_TIME_STAMP_SECONDS,
SML_TIME_STAMP_MILLI_SECONDS,
SML_TIME_STAMP_MICRO_SECONDS,
SML_TIME_STAMP_NANO_SECONDS
} SMLTimeStampType;
//=================================================================================================
int compareSmlColKv(const void* p1, const void* p2) {
......@@ -117,10 +126,12 @@ static int32_t getFieldBytesFromSmlKv(TAOS_SML_KV* kv, int32_t* bytes) {
static int32_t buildSmlKvSchema(TAOS_SML_KV* smlKv, SHashObj* hash, SArray* array) {
SSchema* pField = NULL;
SSchema** ppField = taosHashGet(hash, smlKv->key, strlen(smlKv->key));
size_t* pFieldIdx = taosHashGet(hash, smlKv->key, strlen(smlKv->key));
size_t fieldIdx = -1;
int32_t code = 0;
if (ppField) {
pField = *ppField;
if (pFieldIdx) {
fieldIdx = *pFieldIdx;
pField = taosArrayGet(array, fieldIdx);
if (pField->type != smlKv->type) {
tscError("type mismatch. key %s, type %d. type before %d", smlKv->key, smlKv->type, pField->type);
......@@ -149,10 +160,11 @@ static int32_t buildSmlKvSchema(TAOS_SML_KV* smlKv, SHashObj* hash, SArray* arra
field.bytes = bytes;
pField = taosArrayPush(array, &field);
taosHashPut(hash, field.name, tagKeyLen, &pField, POINTER_BYTES);
fieldIdx = taosArrayGetSize(array) - 1;
taosHashPut(hash, field.name, tagKeyLen, &fieldIdx, sizeof(fieldIdx));
}
smlKv->schema = pField;
smlKv->fieldSchemaIdx = fieldIdx;
return 0;
}
......@@ -165,10 +177,12 @@ static int32_t buildDataPointSchemas(TAOS_SML_DATA_POINT* points, int numPoint,
for (int i = 0; i < numPoint; ++i) {
TAOS_SML_DATA_POINT* point = &points[i];
size_t stableNameLen = strlen(point->stableName);
SSmlSTableSchema** ppStableSchema = taosHashGet(sname2shema, point->stableName, stableNameLen);
size_t* pStableIdx = taosHashGet(sname2shema, point->stableName, stableNameLen);
SSmlSTableSchema* pStableSchema = NULL;
if (ppStableSchema) {
pStableSchema= *ppStableSchema;
size_t stableIdx = -1;
if (pStableIdx) {
pStableSchema= taosArrayGet(stableSchemas, *pStableIdx);
stableIdx = *pStableIdx;
} else {
SSmlSTableSchema schema;
strncpy(schema.sTableName, point->stableName, stableNameLen);
......@@ -179,7 +193,8 @@ static int32_t buildDataPointSchemas(TAOS_SML_DATA_POINT* points, int numPoint,
schema.fieldHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false);
pStableSchema = taosArrayPush(stableSchemas, &schema);
taosHashPut(sname2shema, schema.sTableName, stableNameLen, &pStableSchema, POINTER_BYTES);
stableIdx = taosArrayGetSize(stableSchemas) - 1;
taosHashPut(sname2shema, schema.sTableName, stableNameLen, &stableIdx, sizeof(size_t));
}
for (int j = 0; j < point->tagNum; ++j) {
......@@ -200,7 +215,7 @@ static int32_t buildDataPointSchemas(TAOS_SML_DATA_POINT* points, int numPoint,
}
}
point->schema = pStableSchema;
point->schemaIdx = stableIdx;
}
size_t numStables = taosArrayGetSize(stableSchemas);
......@@ -221,11 +236,12 @@ static int32_t buildDataPointSchemas(TAOS_SML_DATA_POINT* points, int numPoint,
return 0;
}
static int32_t generateSchemaAction(SSchema* pointColField, SHashObj* dbAttrHash, bool isTag, char sTableName[],
static int32_t generateSchemaAction(SSchema* pointColField, SHashObj* dbAttrHash, SArray* dbAttrArray, bool isTag, char sTableName[],
SSchemaAction* action, bool* actionNeeded) {
SSchema** ppDbAttr = taosHashGet(dbAttrHash, pointColField->name, strlen(pointColField->name));
if (ppDbAttr) {
SSchema* dbAttr = *ppDbAttr;
size_t* pDbIndex = taosHashGet(dbAttrHash, pointColField->name, strlen(pointColField->name));
if (pDbIndex) {
SSchema* dbAttr = taosArrayGet(dbAttrArray, *pDbIndex);
assert(strcasecmp(dbAttr->name, pointColField->name) == 0);
if (pointColField->type != dbAttr->type) {
tscError("point type and db type mismatch. key: %s. point type: %d, db type: %d", pointColField->name,
pointColField->type, dbAttr->type);
......@@ -437,8 +453,9 @@ int32_t loadTableMeta(TAOS* taos, char* tableName, SSmlSTableSchema* schema) {
tstrncpy(field.name, tableMeta->schema[i].name, strlen(tableMeta->schema[i].name)+1);
field.type = tableMeta->schema[i].type;
field.bytes = tableMeta->schema[i].bytes;
SSchema* pField = taosArrayPush(schema->fields, &field);
taosHashPut(schema->fieldHash, field.name, strlen(field.name), &pField, POINTER_BYTES);
taosArrayPush(schema->fields, &field);
size_t fieldIndex = taosArrayGetSize(schema->fields) - 1;
taosHashPut(schema->fieldHash, field.name, strlen(field.name), &fieldIndex, sizeof(fieldIndex));
}
for (int i=0; i<tableMeta->tableInfo.numOfTags; ++i) {
......@@ -447,8 +464,9 @@ int32_t loadTableMeta(TAOS* taos, char* tableName, SSmlSTableSchema* schema) {
tstrncpy(field.name, tableMeta->schema[j].name, strlen(tableMeta->schema[j].name)+1);
field.type = tableMeta->schema[j].type;
field.bytes = tableMeta->schema[j].bytes;
SSchema* pField = taosArrayPush(schema->tags, &field);
taosHashPut(schema->tagHash, field.name, strlen(field.name), &pField, POINTER_BYTES);
taosArrayPush(schema->tags, &field);
size_t tagIndex = taosArrayGetSize(schema->tags) - 1;
taosHashPut(schema->tagHash, field.name, strlen(field.name), &tagIndex, sizeof(tagIndex));
}
tscDebug("load table meta succeed. %s, columns number: %d, tag number: %d, precision: %d",
tableName, tableMeta->tableInfo.numOfColumns, tableMeta->tableInfo.numOfTags, schema->precision);
......@@ -476,6 +494,7 @@ static int32_t reconcileDBSchemas(TAOS* taos, SArray* stableSchemas) {
code = loadTableMeta(taos, pointSchema->sTableName, &dbSchema);
if (code != 0) {
tscError("reconcile point schema failed. can not create %s", pointSchema->sTableName);
return code;
} else {
pointSchema->precision = dbSchema.precision;
destroySmlSTableSchema(&dbSchema);
......@@ -491,7 +510,7 @@ static int32_t reconcileDBSchemas(TAOS* taos, SArray* stableSchemas) {
SSchema* pointTag = taosArrayGet(pointSchema->tags, j);
SSchemaAction schemaAction = {0};
bool actionNeeded = false;
generateSchemaAction(pointTag, dbTagHash, true, pointSchema->sTableName, &schemaAction, &actionNeeded);
generateSchemaAction(pointTag, dbTagHash, dbSchema.tags, true, pointSchema->sTableName, &schemaAction, &actionNeeded);
if (actionNeeded) {
applySchemaAction(taos, &schemaAction);
}
......@@ -505,7 +524,7 @@ static int32_t reconcileDBSchemas(TAOS* taos, SArray* stableSchemas) {
SSchema* pointCol = taosArrayGet(pointSchema->fields, j);
SSchemaAction schemaAction = {0};
bool actionNeeded = false;
generateSchemaAction(pointCol, dbFieldHash, false, pointSchema->sTableName, &schemaAction, &actionNeeded);
generateSchemaAction(pointCol, dbFieldHash, dbSchema.fields,false, pointSchema->sTableName, &schemaAction, &actionNeeded);
if (actionNeeded) {
applySchemaAction(taos, &schemaAction);
}
......@@ -522,7 +541,8 @@ static int32_t reconcileDBSchemas(TAOS* taos, SArray* stableSchemas) {
return 0;
}
static int32_t getChildTableName(TAOS_SML_DATA_POINT* point, char* tableName, int* tableNameLen) {
static int32_t getSmlMd5ChildTableName(TAOS_SML_DATA_POINT* point, char* tableName, int* tableNameLen) {
tscDebug("taos_sml_insert get child table name through md5");
qsort(point->tags, point->tagNum, sizeof(TAOS_SML_KV), compareSmlColKv);
SStringBuilder sb; memset(&sb, 0, sizeof(sb));
......@@ -592,8 +612,12 @@ static int32_t creatChildTableIfNotExists(TAOS* taos, const char* cTableName, co
return code;
}
taos_stmt_close(stmt);
return 0;
code = taos_stmt_close(stmt);
if (code != 0) {
tscError("%s", taos_stmt_errstr(stmt));
return code;
}
return code;
}
static int32_t insertChildTableBatch(TAOS* taos, char* cTableName, SArray* colsSchema, SArray* rowsBind) {
......@@ -619,7 +643,7 @@ static int32_t insertChildTableBatch(TAOS* taos, char* cTableName, SArray* cols
int32_t try = 0;
TAOS_STMT* stmt = taos_stmt_init(taos);
code = taos_stmt_prepare(stmt, sql, (unsigned long)strlen(sql));
if (code != 0) {
tscError("%s", taos_stmt_errstr(stmt));
......@@ -665,23 +689,26 @@ static int32_t insertChildTableBatch(TAOS* taos, char* cTableName, SArray* cols
return code;
}
static int32_t arrangePointsByChildTableName(TAOS_SML_DATA_POINT* points, int numPoints, SHashObj* cname2points) {
static int32_t arrangePointsByChildTableName(TAOS_SML_DATA_POINT* points, int numPoints,
SHashObj* cname2points, SArray* stableSchemas) {
for (int32_t i = 0; i < numPoints; ++i) {
TAOS_SML_DATA_POINT * point = points + i;
if (!point->childTableName) {
char childTableName[TSDB_TABLE_NAME_LEN];
int32_t tableNameLen = TSDB_TABLE_NAME_LEN;
getChildTableName(point, childTableName, &tableNameLen);
getSmlMd5ChildTableName(point, childTableName, &tableNameLen);
point->childTableName = calloc(1, tableNameLen+1);
strncpy(point->childTableName, childTableName, tableNameLen);
point->childTableName[tableNameLen] = '\0';
}
SSmlSTableSchema* stableSchema = taosArrayGet(stableSchemas, point->schemaIdx);
for (int j = 0; j < point->tagNum; ++j) {
TAOS_SML_KV* kv = point->tags + j;
if (kv->type == TSDB_DATA_TYPE_TIMESTAMP) {
int64_t ts = *(int64_t*)(kv->value);
ts = convertTimePrecision(ts, TSDB_TIME_PRECISION_NANO, point->schema->precision);
ts = convertTimePrecision(ts, TSDB_TIME_PRECISION_NANO, stableSchema->precision);
*(int64_t*)(kv->value) = ts;
}
}
......@@ -690,7 +717,7 @@ static int32_t arrangePointsByChildTableName(TAOS_SML_DATA_POINT* points, int nu
TAOS_SML_KV* kv = point->fields + j;
if (kv->type == TSDB_DATA_TYPE_TIMESTAMP) {
int64_t ts = *(int64_t*)(kv->value);
ts = convertTimePrecision(ts, TSDB_TIME_PRECISION_NANO, point->schema->precision);
ts = convertTimePrecision(ts, TSDB_TIME_PRECISION_NANO, stableSchema->precision);
*(int64_t*)(kv->value) = ts;
}
}
......@@ -709,10 +736,12 @@ static int32_t arrangePointsByChildTableName(TAOS_SML_DATA_POINT* points, int nu
return 0;
}
static int32_t insertPoints(TAOS* taos, TAOS_SML_DATA_POINT* points, int32_t numPoints) {
static int32_t insertPoints(TAOS* taos, TAOS_SML_DATA_POINT* points, int32_t numPoints, SArray* stableSchemas) {
int32_t code = TSDB_CODE_SUCCESS;
SHashObj* cname2points = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY),
true, false);
arrangePointsByChildTableName(points, numPoints, cname2points);
arrangePointsByChildTableName(points, numPoints, cname2points, stableSchemas);
int isNullColBind = TSDB_TRUE;
SArray** pCTablePoints = taosHashIterate(cname2points, NULL);
......@@ -720,8 +749,9 @@ static int32_t insertPoints(TAOS* taos, TAOS_SML_DATA_POINT* points, int32_t num
SArray* cTablePoints = *pCTablePoints;
TAOS_SML_DATA_POINT * point = taosArrayGetP(cTablePoints, 0);
size_t numTags = taosArrayGetSize(point->schema->tags);
size_t numCols = taosArrayGetSize(point->schema->fields);
SSmlSTableSchema* sTableSchema = taosArrayGet(stableSchemas, point->schemaIdx);
size_t numTags = taosArrayGetSize(sTableSchema->tags);
size_t numCols = taosArrayGetSize(sTableSchema->fields);
SArray* tagBinds = taosArrayInit(numTags, sizeof(TAOS_BIND));
taosArraySetSize(tagBinds, numTags);
......@@ -731,8 +761,7 @@ static int32_t insertPoints(TAOS* taos, TAOS_SML_DATA_POINT* points, int32_t num
}
for (int j = 0; j < point->tagNum; ++j) {
TAOS_SML_KV* kv = point->tags + j;
size_t idx = TARRAY_ELEM_IDX(point->schema->tags, kv->schema);
TAOS_BIND* bind = taosArrayGet(tagBinds, idx);
TAOS_BIND* bind = taosArrayGet(tagBinds, kv->fieldSchemaIdx);
bind->buffer_type = kv->type;
bind->length = malloc(sizeof(uintptr_t*));
*bind->length = kv->length;
......@@ -747,14 +776,17 @@ static int32_t insertPoints(TAOS* taos, TAOS_SML_DATA_POINT* points, int32_t num
point = taosArrayGetP(cTablePoints, i);
TAOS_BIND* colBinds = calloc(numCols, sizeof(TAOS_BIND));
if (colBinds == NULL) {
tscError("taos_sml_insert insert points, failed to allocated memory for TAOS_BIND, "
"num of rows: %zu, num of cols: %zu", rows, numCols);
}
for (int j = 0; j < numCols; ++j) {
TAOS_BIND* bind = colBinds + j;
bind->is_null = &isNullColBind;
}
for (int j = 0; j < point->fieldNum; ++j) {
TAOS_SML_KV* kv = point->fields + j;
size_t idx = TARRAY_ELEM_IDX(point->schema->fields, kv->schema);
TAOS_BIND* bind = colBinds + idx;
TAOS_BIND* bind = colBinds + kv->fieldSchemaIdx;
bind->buffer_type = kv->type;
bind->length = malloc(sizeof(uintptr_t*));
*bind->length = kv->length;
......@@ -764,14 +796,21 @@ static int32_t insertPoints(TAOS* taos, TAOS_SML_DATA_POINT* points, int32_t num
taosArrayPush(rowsBind, &colBinds);
}
creatChildTableIfNotExists(taos, point->childTableName, point->stableName, point->schema->tags, tagBinds);
code = creatChildTableIfNotExists(taos, point->childTableName, point->stableName, sTableSchema->tags, tagBinds);
if (code == 0) {
code = insertChildTableBatch(taos, point->childTableName, sTableSchema->fields, rowsBind);
if (code != 0) {
tscError("insert into child table %s failed. error %s", point->childTableName, tstrerror(code));
}
} else {
tscError("Create Child Table %s failed, error %s", point->childTableName, tstrerror(code));
}
for (int i = 0; i < taosArrayGetSize(tagBinds); ++i) {
TAOS_BIND* bind = taosArrayGet(tagBinds, i);
free(bind->length);
}
taosArrayDestroy(tagBinds);
insertChildTableBatch(taos, point->childTableName, point->schema->fields, rowsBind);
for (int i = 0; i < rows; ++i) {
TAOS_BIND* colBinds = taosArrayGetP(rowsBind, i);
for (int j = 0; j < numCols; ++j) {
......@@ -782,12 +821,14 @@ static int32_t insertPoints(TAOS* taos, TAOS_SML_DATA_POINT* points, int32_t num
}
taosArrayDestroy(rowsBind);
taosArrayDestroy(cTablePoints);
if (code != 0) {
break;
}
pCTablePoints = taosHashIterate(cname2points, pCTablePoints);
}
taosHashCleanup(cname2points);
return 0;
return code;
}
int taos_sml_insert(TAOS* taos, TAOS_SML_DATA_POINT* points, int numPoint) {
......@@ -808,7 +849,7 @@ int taos_sml_insert(TAOS* taos, TAOS_SML_DATA_POINT* points, int numPoint) {
goto clean_up;
}
code = insertPoints(taos, points, numPoint);
code = insertPoints(taos, points, numPoint, stableSchemas);
if (code != 0) {
tscError("error insert points : %s", tstrerror(code));
}
......@@ -825,305 +866,886 @@ clean_up:
//=========================================================================
typedef enum {
LP_ITEM_TAG,
LP_ITEM_FIELD
} LPItemKind;
/* Field Escape charaters
1: measurement Comma,Space
2: tag_key, tag_value, field_key Comma,Equal Sign,Space
3: field_value Double quote,Backslash
*/
static void escapeSpecialCharacter(uint8_t field, const char **pos) {
const char *cur = *pos;
if (*cur != '\\') {
return;
}
switch (field) {
case 1:
switch (*(cur + 1)) {
case ',':
case ' ':
cur++;
break;
default:
break;
}
break;
case 2:
switch (*(cur + 1)) {
case ',':
case ' ':
case '=':
cur++;
break;
default:
break;
}
break;
case 3:
switch (*(cur + 1)) {
case '"':
case '\\':
cur++;
break;
default:
break;
}
break;
default:
break;
}
*pos = cur;
}
typedef struct {
SStrToken keyToken;
SStrToken valueToken;
static bool isValidInteger(char *str) {
char *c = str;
if (*c != '+' && *c != '-' && !isdigit(*c)) {
return false;
}
c++;
while (*c != '\0') {
if (!isdigit(*c)) {
return false;
}
c++;
}
return true;
}
char key[TSDB_COL_NAME_LEN];
int8_t type;
int16_t length;
static bool isValidFloat(char *str) {
char *c = str;
uint8_t has_dot, has_exp, has_sign;
has_dot = 0;
has_exp = 0;
has_sign = 0;
char* value;
}SLPItem;
if (*c != '+' && *c != '-' && *c != '.' && !isdigit(*c)) {
return false;
}
if (*c == '.' && isdigit(*(c + 1))) {
has_dot = 1;
}
c++;
while (*c != '\0') {
if (!isdigit(*c)) {
switch (*c) {
case '.': {
if (!has_dot && !has_exp && isdigit(*(c + 1))) {
has_dot = 1;
} else {
return false;
}
break;
}
case 'e':
case 'E': {
if (!has_exp && isdigit(*(c - 1)) &&
(isdigit(*(c + 1)) ||
*(c + 1) == '+' ||
*(c + 1) == '-')) {
has_exp = 1;
} else {
return false;
}
break;
}
case '+':
case '-': {
if (!has_sign && has_exp && isdigit(*(c + 1))) {
has_sign = 1;
} else {
return false;
}
break;
}
default: {
return false;
}
}
}
c++;
} //while
return true;
}
typedef struct {
SStrToken measToken;
SStrToken tsToken;
static bool isTinyInt(char *pVal, uint16_t len) {
if (len <= 2) {
return false;
}
if (!strcmp(&pVal[len - 2], "i8")) {
//printf("Type is int8(%s)\n", pVal);
return true;
}
return false;
}
char sTableName[TSDB_TABLE_NAME_LEN];
SArray* tags;
SArray* fields;
int64_t ts;
static bool isTinyUint(char *pVal, uint16_t len) {
if (len <= 2) {
return false;
}
if (pVal[0] == '-') {
return false;
}
if (!strcmp(&pVal[len - 2], "u8")) {
//printf("Type is uint8(%s)\n", pVal);
return true;
}
return false;
}
} SLPPoint;
static bool isSmallInt(char *pVal, uint16_t len) {
if (len <= 3) {
return false;
}
if (!strcmp(&pVal[len - 3], "i16")) {
//printf("Type is int16(%s)\n", pVal);
return true;
}
return false;
}
typedef enum {
LP_MEASUREMENT,
LP_TAG_KEY,
LP_TAG_VALUE,
LP_FIELD_KEY,
LP_FIELD_VALUE
} LPPart;
static bool isSmallUint(char *pVal, uint16_t len) {
if (len <= 3) {
return false;
}
if (pVal[0] == '-') {
return false;
}
if (strcmp(&pVal[len - 3], "u16") == 0) {
//printf("Type is uint16(%s)\n", pVal);
return true;
}
return false;
}
int32_t scanToCommaOrSpace(SStrToken s, int32_t start, int32_t* index, LPPart part) {
for (int32_t i = start; i < s.n; ++i) {
if (s.z[i] == ',' || s.z[i] == ' ') {
*index = i;
return 0;
}
static bool isInt(char *pVal, uint16_t len) {
if (len <= 3) {
return false;
}
if (strcmp(&pVal[len - 3], "i32") == 0) {
//printf("Type is int32(%s)\n", pVal);
return true;
}
return -1;
return false;
}
int32_t scanToEqual(SStrToken s, int32_t start, int32_t* index) {
for (int32_t i = start; i < s.n; ++i) {
if (s.z[i] == '=') {
*index = i;
return 0;
}
static bool isUint(char *pVal, uint16_t len) {
if (len <= 3) {
return false;
}
if (pVal[0] == '-') {
return false;
}
return -1;
if (strcmp(&pVal[len - 3], "u32") == 0) {
//printf("Type is uint32(%s)\n", pVal);
return true;
}
return false;
}
int32_t setPointMeasurement(SLPPoint* point, SStrToken token) {
point->measToken = token;
if (point->measToken.n < TSDB_TABLE_NAME_LEN) {
strncpy(point->sTableName, point->measToken.z, point->measToken.n);
point->sTableName[point->measToken.n] = '\0';
static bool isBigInt(char *pVal, uint16_t len) {
if (len <= 3) {
return false;
}
return 0;
if (strcmp(&pVal[len - 3], "i64") == 0) {
//printf("Type is int64(%s)\n", pVal);
return true;
}
return false;
}
int32_t setItemKey(SLPItem* item, SStrToken key, LPPart part) {
item->keyToken = key;
if (item->keyToken.n < TSDB_COL_NAME_LEN) {
strncpy(item->key, item->keyToken.z, item->keyToken.n);
item->key[item->keyToken.n] = '\0';
static bool isBigUint(char *pVal, uint16_t len) {
if (len <= 3) {
return false;
}
return 0;
if (pVal[0] == '-') {
return false;
}
if (strcmp(&pVal[len - 3], "u64") == 0) {
//printf("Type is uint64(%s)\n", pVal);
return true;
}
return false;
}
int32_t setItemValue(SLPItem* item, SStrToken value, LPPart part) {
item->valueToken = value;
return 0;
static bool isFloat(char *pVal, uint16_t len) {
if (len <= 3) {
return false;
}
if (strcmp(&pVal[len - 3], "f32") == 0) {
//printf("Type is float(%s)\n", pVal);
return true;
}
return false;
}
int32_t parseItemValue(SLPItem* item, LPItemKind kind) {
char* sv = item->valueToken.z;
char* last = item->valueToken.z + item->valueToken.n - 1;
if (isdigit(sv[0]) || sv[0] == '-') {
if (*last == 'i') {
item->type = TSDB_DATA_TYPE_BIGINT;
item->length = (int16_t)tDataTypes[item->type].bytes;
item->value = malloc(item->length);
char* endptr = NULL;
*(int64_t*)(item->value) = strtoll(sv, &endptr, 10);
} else if (*last == 'u') {
item->type = TSDB_DATA_TYPE_UBIGINT;
item->length = (int16_t)tDataTypes[item->type].bytes;
item->value = malloc(item->length);
char* endptr = NULL;
*(uint64_t*)(item->value) = (uint64_t)strtoull(sv, &endptr, 10);
} else if (*last == 'b') {
item->type = TSDB_DATA_TYPE_TINYINT;
item->length = (int16_t)tDataTypes[item->type].bytes;
item->value = malloc(item->length);
char* endptr = NULL;
*(int8_t*)(item->value) = (int8_t)strtoll(sv, &endptr, 10);
} else if (*last == 's') {
item->type = TSDB_DATA_TYPE_SMALLINT;
item->length = (int16_t)tDataTypes[item->type].bytes;
item->value = malloc(item->length);
char* endptr = NULL;
*(int16_t*)(item->value) = (int16_t)strtoll(sv, &endptr, 10);
} else if (*last == 'w') {
item->type = TSDB_DATA_TYPE_INT;
item->length = (int16_t)tDataTypes[item->type].bytes;
item->value = malloc(item->length);
char* endptr = NULL;
*(int32_t*)(item->value) = (int32_t)strtoll(sv, &endptr, 10);
} else if (*last == 'f') {
item->type = TSDB_DATA_TYPE_FLOAT;
item->length = (int16_t)tDataTypes[item->type].bytes;
item->value = malloc(item->length);
char* endptr = NULL;
*(float*)(item->value) = (float)strtold(sv, &endptr);
} else {
item->type = TSDB_DATA_TYPE_DOUBLE;
item->length = (int16_t)tDataTypes[item->type].bytes;
item->value = malloc(item->length);
char* endptr = NULL;
*(double*)(item->value) = strtold(sv, &endptr);
}
} else if ((sv[0] == 'L' && sv[1] =='"') || sv[0] == '"' ) {
if (sv[0] == 'L') {
item->type = TSDB_DATA_TYPE_NCHAR;
uint32_t bytes = item->valueToken.n - 3;
item->length = bytes;
item->value = malloc(bytes);
memcpy(item->value, sv+2, bytes);
} else if (sv[0]=='"'){
item->type = TSDB_DATA_TYPE_BINARY;
uint32_t bytes = item->valueToken.n - 2;
item->length = bytes;
item->value = malloc(bytes);
memcpy(item->value, sv+1, bytes);
}
} else if (sv[0] == 't' || sv[0] == 'f' || sv[0]=='T' || sv[0] == 'F') {
item->type = TSDB_DATA_TYPE_BOOL;
item->length = tDataTypes[item->type].bytes;
item->value = malloc(tDataTypes[item->type].bytes);
*(uint8_t*)(item->value) = tolower(sv[0])=='t' ? TSDB_TRUE : TSDB_FALSE;
static bool isDouble(char *pVal, uint16_t len) {
if (len <= 3) {
return false;
}
return 0;
if (strcmp(&pVal[len - 3], "f64") == 0) {
//printf("Type is double(%s)\n", pVal);
return true;
}
return false;
}
int32_t compareLPItemKey(const void* p1, const void* p2) {
const SLPItem* t1 = p1;
const SLPItem* t2 = p2;
uint32_t min = (t1->keyToken.n < t2->keyToken.n) ? t1->keyToken.n : t2->keyToken.n;
int res = strncmp(t1->keyToken.z, t2->keyToken.z, min);
if (res != 0) {
return res;
} else {
return (int)(t1->keyToken.n) - (int)(t2->keyToken.n);
static bool isBool(char *pVal, uint16_t len, bool *bVal) {
if ((len == 1) &&
(pVal[len - 1] == 't' ||
pVal[len - 1] == 'T')) {
//printf("Type is bool(%c)\n", pVal[len - 1]);
*bVal = true;
return true;
}
if ((len == 1) &&
(pVal[len - 1] == 'f' ||
pVal[len - 1] == 'F')) {
//printf("Type is bool(%c)\n", pVal[len - 1]);
*bVal = false;
return true;
}
if((len == 4) &&
(!strcmp(&pVal[len - 4], "true") ||
!strcmp(&pVal[len - 4], "True") ||
!strcmp(&pVal[len - 4], "TRUE"))) {
//printf("Type is bool(%s)\n", &pVal[len - 4]);
*bVal = true;
return true;
}
if((len == 5) &&
(!strcmp(&pVal[len - 5], "false") ||
!strcmp(&pVal[len - 5], "False") ||
!strcmp(&pVal[len - 5], "FALSE"))) {
//printf("Type is bool(%s)\n", &pVal[len - 5]);
*bVal = false;
return true;
}
return false;
}
int32_t setPointTimeStamp(SLPPoint* point, SStrToken tsToken) {
point->tsToken = tsToken;
return 0;
static bool isBinary(char *pVal, uint16_t len) {
//binary: "abc"
if (len < 2) {
return false;
}
//binary
if (pVal[0] == '"' && pVal[len - 1] == '"') {
//printf("Type is binary(%s)\n", pVal);
return true;
}
return false;
}
static bool isNchar(char *pVal, uint16_t len) {
//nchar: L"abc"
if (len < 3) {
return false;
}
if (pVal[0] == 'L' && pVal[1] == '"' && pVal[len - 1] == '"') {
//printf("Type is nchar(%s)\n", pVal);
return true;
}
return false;
}
static bool isTimeStamp(char *pVal, uint16_t len, SMLTimeStampType *tsType) {
if (len == 0) {
return true;
}
if ((len == 1) && pVal[0] == '0') {
*tsType = SML_TIME_STAMP_NOW;
//printf("Type is timestamp(%s)\n", pVal);
return true;
}
if (len < 2) {
return false;
}
//No appendix use usec as default
if (isdigit(pVal[len - 1]) && isdigit(pVal[len - 2])) {
*tsType = SML_TIME_STAMP_MICRO_SECONDS;
//printf("Type is timestamp(%s)\n", pVal);
return true;
}
if (pVal[len - 1] == 's') {
switch (pVal[len - 2]) {
case 'm':
*tsType = SML_TIME_STAMP_MILLI_SECONDS;
break;
case 'u':
*tsType = SML_TIME_STAMP_MICRO_SECONDS;
break;
case 'n':
*tsType = SML_TIME_STAMP_NANO_SECONDS;
break;
default:
if (isdigit(pVal[len - 2])) {
*tsType = SML_TIME_STAMP_SECONDS;
break;
} else {
return false;
}
}
//printf("Type is timestamp(%s)\n", pVal);
return true;
}
return false;
}
//len does not include '\0' from value.
static bool convertSmlValueType(TAOS_SML_KV *pVal, char *value,
uint16_t len) {
if (len <= 0) {
return false;
}
//integer number
if (isTinyInt(value, len)) {
pVal->type = TSDB_DATA_TYPE_TINYINT;
pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
value[len - 2] = '\0';
if (!isValidInteger(value)) {
return false;
}
pVal->value = calloc(pVal->length, 1);
int8_t val = (int8_t)strtoll(value, NULL, 10);
memcpy(pVal->value, &val, pVal->length);
return true;
}
if (isTinyUint(value, len)) {
pVal->type = TSDB_DATA_TYPE_UTINYINT;
pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
value[len - 2] = '\0';
if (!isValidInteger(value)) {
return false;
}
pVal->value = calloc(pVal->length, 1);
uint8_t val = (uint8_t)strtoul(value, NULL, 10);
memcpy(pVal->value, &val, pVal->length);
return true;
}
if (isSmallInt(value, len)) {
pVal->type = TSDB_DATA_TYPE_SMALLINT;
pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
value[len - 3] = '\0';
if (!isValidInteger(value)) {
return false;
}
pVal->value = calloc(pVal->length, 1);
int16_t val = (int16_t)strtoll(value, NULL, 10);
memcpy(pVal->value, &val, pVal->length);
return true;
}
if (isSmallUint(value, len)) {
pVal->type = TSDB_DATA_TYPE_USMALLINT;
pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
value[len - 3] = '\0';
if (!isValidInteger(value)) {
return false;
}
pVal->value = calloc(pVal->length, 1);
uint16_t val = (uint16_t)strtoul(value, NULL, 10);
memcpy(pVal->value, &val, pVal->length);
//memcpy(pVal->value, &val, pVal->length);
return true;
}
if (isInt(value, len)) {
pVal->type = TSDB_DATA_TYPE_INT;
pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
value[len - 3] = '\0';
if (!isValidInteger(value)) {
return false;
}
pVal->value = calloc(pVal->length, 1);
int32_t val = (int32_t)strtoll(value, NULL, 10);
memcpy(pVal->value, &val, pVal->length);
return true;
}
if (isUint(value, len)) {
pVal->type = TSDB_DATA_TYPE_UINT;
pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
value[len - 3] = '\0';
if (!isValidInteger(value)) {
return false;
}
pVal->value = calloc(pVal->length, 1);
uint32_t val = (uint32_t)strtoul(value, NULL, 10);
memcpy(pVal->value, &val, pVal->length);
return true;
}
if (isBigInt(value, len)) {
pVal->type = TSDB_DATA_TYPE_BIGINT;
pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
value[len - 3] = '\0';
if (!isValidInteger(value)) {
return false;
}
pVal->value = calloc(pVal->length, 1);
int64_t val = (int64_t)strtoll(value, NULL, 10);
memcpy(pVal->value, &val, pVal->length);
return true;
}
if (isBigUint(value, len)) {
pVal->type = TSDB_DATA_TYPE_UBIGINT;
pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
value[len - 3] = '\0';
if (!isValidInteger(value)) {
return false;
}
pVal->value = calloc(pVal->length, 1);
uint64_t val = (uint64_t)strtoul(value, NULL, 10);
memcpy(pVal->value, &val, pVal->length);
return true;
}
//floating number
if (isFloat(value, len)) {
pVal->type = TSDB_DATA_TYPE_FLOAT;
pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
value[len - 3] = '\0';
if (!isValidFloat(value)) {
return false;
}
pVal->value = calloc(pVal->length, 1);
float val = (float)strtold(value, NULL);
memcpy(pVal->value, &val, pVal->length);
return true;
}
if (isDouble(value, len)) {
pVal->type = TSDB_DATA_TYPE_DOUBLE;
pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
value[len - 3] = '\0';
if (!isValidFloat(value)) {
return false;
}
pVal->value = calloc(pVal->length, 1);
double val = (double)strtold(value, NULL);
memcpy(pVal->value, &val, pVal->length);
return true;
}
//binary
if (isBinary(value, len)) {
pVal->type = TSDB_DATA_TYPE_BINARY;
pVal->length = len - 2;
pVal->value = calloc(pVal->length, 1);
//copy after "
memcpy(pVal->value, value + 1, pVal->length);
return true;
}
//nchar
if (isNchar(value, len)) {
pVal->type = TSDB_DATA_TYPE_NCHAR;
pVal->length = len - 3;
pVal->value = calloc(pVal->length, 1);
//copy after L"
memcpy(pVal->value, value + 2, pVal->length);
return true;
}
//bool
bool bVal;
if (isBool(value, len, &bVal)) {
pVal->type = TSDB_DATA_TYPE_BOOL;
pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
pVal->value = calloc(pVal->length, 1);
memcpy(pVal->value, &bVal, pVal->length);
return true;
}
//Handle default(no appendix) as float
if (isValidInteger(value) || isValidFloat(value)) {
pVal->type = TSDB_DATA_TYPE_FLOAT;
pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
pVal->value = calloc(pVal->length, 1);
float val = (float)strtold(value, NULL);
memcpy(pVal->value, &val, pVal->length);
return true;
}
return false;
}
int32_t parsePointTime(SLPPoint* point) {
if (point->tsToken.n <= 0) {
point->ts = taosGetTimestampNs();
static int32_t getTimeStampValue(char *value, uint16_t len,
SMLTimeStampType type, int64_t *ts) {
if (len >= 2) {
for (int i = 0; i < len - 2; ++i) {
if(!isdigit(value[i])) {
return TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
}
}
}
//No appendix or no timestamp given (len = 0)
if (len >= 1 && isdigit(value[len - 1]) && type != SML_TIME_STAMP_NOW) {
type = SML_TIME_STAMP_MICRO_SECONDS;
}
if (len != 0) {
*ts = (int64_t)strtoll(value, NULL, 10);
} else {
char* endptr = NULL;
point->ts = strtoll(point->tsToken.z, &endptr, 10);
char* last = point->tsToken.z + point->tsToken.n - 1;
if (*last == 's') {
point->ts *= (int64_t)1e9;
} else if (*last == 'a') {
point->ts *= (int64_t)1e6;
} else if (*last == 'u') {
point->ts *= (int64_t)1e3;
} else if (*last == 'b') {
point->ts *= 1;
type = SML_TIME_STAMP_NOW;
}
switch (type) {
case SML_TIME_STAMP_NOW: {
*ts = taosGetTimestampNs();
break;
}
case SML_TIME_STAMP_SECONDS: {
*ts = (int64_t)(*ts * 1e9);
break;
}
case SML_TIME_STAMP_MILLI_SECONDS: {
*ts = convertTimePrecision(*ts, TSDB_TIME_PRECISION_MILLI, TSDB_TIME_PRECISION_NANO);
break;
}
case SML_TIME_STAMP_MICRO_SECONDS: {
*ts = convertTimePrecision(*ts, TSDB_TIME_PRECISION_MICRO, TSDB_TIME_PRECISION_NANO);
break;
}
case SML_TIME_STAMP_NANO_SECONDS: {
*ts = *ts * 1;
break;
}
default: {
return TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
}
}
return 0;
return TSDB_CODE_SUCCESS;
}
int32_t tscParseLine(SStrToken line, SLPPoint* point) {
int32_t pos = 0;
static int32_t convertSmlTimeStamp(TAOS_SML_KV *pVal, char *value,
uint16_t len) {
int32_t ret;
SMLTimeStampType type;
int64_t tsVal;
int32_t start = 0;
int32_t err = scanToCommaOrSpace(line, start, &pos, LP_MEASUREMENT);
if (err != 0) {
tscError("a");
return err;
if (!isTimeStamp(value, len, &type)) {
return TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
}
SStrToken measurement = {.z = line.z+start, .n = pos-start};
setPointMeasurement(point, measurement);
point->tags = taosArrayInit(64, sizeof(SLPItem));
start = pos;
while (line.z[start] == ',') {
SLPItem item;
ret = getTimeStampValue(value, len, type, &tsVal);
if (ret) {
return ret;
}
tscDebug("Timestamp after conversion:%"PRId64, tsVal);
start++;
err = scanToEqual(line, start, &pos);
if (err != 0) {
tscError("b");
goto error;
pVal->type = TSDB_DATA_TYPE_TIMESTAMP;
pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
pVal->value = calloc(pVal->length, 1);
memcpy(pVal->value, &tsVal, pVal->length);
return TSDB_CODE_SUCCESS;
}
static int32_t parseSmlTimeStamp(TAOS_SML_KV **pTS, const char **index) {
const char *start, *cur;
int32_t ret = TSDB_CODE_SUCCESS;
int len = 0;
char key[] = "_ts";
char *value = NULL;
start = cur = *index;
*pTS = calloc(1, sizeof(TAOS_SML_KV));
while(*cur != '\0') {
cur++;
len++;
}
if (len > 0) {
value = calloc(len + 1, 1);
memcpy(value, start, len);
}
ret = convertSmlTimeStamp(*pTS, value, len);
if (ret) {
free(value);
free(*pTS);
return ret;
}
free(value);
(*pTS)->key = calloc(sizeof(key), 1);
memcpy((*pTS)->key, key, sizeof(key));
return ret;
}
static int32_t parseSmlKey(TAOS_SML_KV *pKV, const char **index) {
const char *cur = *index;
char key[TSDB_COL_NAME_LEN];
uint16_t len = 0;
//key field cannot start with digit
if (isdigit(*cur)) {
tscError("Tag key cannnot start with digit\n");
return TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
}
while (*cur != '\0') {
if (len > TSDB_COL_NAME_LEN) {
tscDebug("Key field cannot exceeds 65 characters");
return TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
}
//unescaped '=' identifies a tag key
if (*cur == '=' && *(cur - 1) != '\\') {
break;
}
//Escape special character
if (*cur == '\\') {
escapeSpecialCharacter(2, &cur);
}
key[len] = *cur;
cur++;
len++;
}
key[len] = '\0';
pKV->key = calloc(len + 1, 1);
memcpy(pKV->key, key, len + 1);
//tscDebug("Key:%s|len:%d", pKV->key, len);
*index = cur + 1;
return TSDB_CODE_SUCCESS;
}
SStrToken tagKey = {.z = line.z + start, .n = pos-start};
setItemKey(&item, tagKey, LP_TAG_KEY);
start = pos + 1;
err = scanToCommaOrSpace(line, start, &pos, LP_TAG_VALUE);
if (err != 0) {
tscError("c");
goto error;
static bool parseSmlValue(TAOS_SML_KV *pKV, const char **index,
bool *is_last_kv) {
const char *start, *cur;
char *value = NULL;
uint16_t len = 0;
start = cur = *index;
while (1) {
// unescaped ',' or ' ' or '\0' identifies a value
if ((*cur == ',' || *cur == ' ' || *cur == '\0') && *(cur - 1) != '\\') {
//unescaped ' ' or '\0' indicates end of value
*is_last_kv = (*cur == ' ' || *cur == '\0') ? true : false;
break;
}
//Escape special character
if (*cur == '\\') {
escapeSpecialCharacter(2, &cur);
}
cur++;
len++;
}
SStrToken tagValue = {.z = line.z + start, .n = pos-start};
setItemValue(&item, tagValue, LP_TAG_VALUE);
value = calloc(len + 1, 1);
memcpy(value, start, len);
value[len] = '\0';
if (!convertSmlValueType(pKV, value, len)) {
//free previous alocated key field
free(pKV->key);
free(value);
return TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
}
free(value);
parseItemValue(&item, LP_ITEM_TAG);
taosArrayPush(point->tags, &item);
*index = (*cur == '\0') ? cur : cur + 1;
return TSDB_CODE_SUCCESS;
}
start = pos;
static int32_t parseSmlMeasurement(TAOS_SML_DATA_POINT *pSml, const char **index,
uint8_t *has_tags) {
const char *cur = *index;
uint16_t len = 0;
pSml->stableName = calloc(TSDB_TABLE_NAME_LEN, 1);
if (isdigit(*cur)) {
tscError("Measurement field cannnot start with digit");
free(pSml->stableName);
pSml->stableName = NULL;
return TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
}
taosArraySort(point->tags, compareLPItemKey);
while (*cur != '\0') {
if (len > TSDB_TABLE_NAME_LEN) {
tscError("Measurement field cannot exceeds 193 characters");
free(pSml->stableName);
pSml->stableName = NULL;
return TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
}
//first unescaped comma or space identifies measurement
//if space detected first, meaning no tag in the input
if (*cur == ',' && *(cur - 1) != '\\') {
*has_tags = 1;
break;
}
if (*cur == ' ' && *(cur - 1) != '\\') {
break;
}
//Comma, Space, Backslash needs to be escaped if any
if (*cur == '\\') {
escapeSpecialCharacter(1, &cur);
}
pSml->stableName[len] = *cur;
cur++;
len++;
}
pSml->stableName[len] = '\0';
*index = cur + 1;
tscDebug("Stable name in measurement:%s|len:%d", pSml->stableName, len);
point->fields = taosArrayInit(64, sizeof(SLPItem));
return TSDB_CODE_SUCCESS;
}
start++;
do {
SLPItem item;
static int32_t parseSmlKvPairs(TAOS_SML_KV **pKVs, int *num_kvs,
const char **index, bool isField, TAOS_SML_DATA_POINT* smlData) {
const char *cur = *index;
int32_t ret = TSDB_CODE_SUCCESS;
TAOS_SML_KV *pkv;
bool is_last_kv = false;
int32_t capacity = 0;
if (isField) {
capacity = 64;
*pKVs = calloc(capacity, sizeof(TAOS_SML_KV));
// leave space for timestamp;
pkv = *pKVs;
pkv++;
} else {
capacity = 8;
*pKVs = calloc(capacity, sizeof(TAOS_SML_KV));
pkv = *pKVs;
}
err = scanToEqual(line, start, &pos);
if (err != 0) {
while (*cur != '\0') {
ret = parseSmlKey(pkv, &cur);
if (ret) {
tscError("Unable to parse key field");
goto error;
}
SStrToken fieldKey = {.z = line.z + start, .n = pos- start};
setItemKey(&item, fieldKey, LP_FIELD_KEY);
start = pos + 1;
err = scanToCommaOrSpace(line, start, &pos, LP_FIELD_VALUE);
if (err != 0) {
ret = parseSmlValue(pkv, &cur, &is_last_kv);
if (ret) {
tscError("Unable to parse value field");
goto error;
}
SStrToken fieldValue = {.z = line.z + start, .n = pos - start};
setItemValue(&item, fieldValue, LP_TAG_VALUE);
parseItemValue(&item, LP_ITEM_FIELD);
taosArrayPush(point->fields, &item);
start = pos + 1;
} while (line.z[pos] == ',');
if (!isField &&
(strcasecmp(pkv->key, "ID") == 0) && pkv->type == TSDB_DATA_TYPE_BINARY) {
smlData->childTableName = malloc( pkv->length + 1);
memcpy(smlData->childTableName, pkv->value, pkv->length);
smlData->childTableName[pkv->length] = '\0';
free(pkv->key);
free(pkv->value);
} else {
*num_kvs += 1;
}
if (is_last_kv) {
//tscDebug("last key-value field detected");
goto done;
}
taosArraySort(point->fields, compareLPItemKey);
//reallocate addtional memory for more kvs
TAOS_SML_KV *more_kvs = NULL;
SStrToken tsToken = {.z = line.z+start, .n = line.n-start};
setPointTimeStamp(point, tsToken);
parsePointTime(point);
if (isField) {
if ((*num_kvs + 2) > capacity) {
capacity *= 3; capacity /= 2;
more_kvs = realloc(*pKVs, capacity * sizeof(TAOS_SML_KV));
} else {
more_kvs = *pKVs;
}
} else {
if ((*num_kvs + 1) > capacity) {
capacity *= 3; capacity /= 2;
more_kvs = realloc(*pKVs, capacity * sizeof(TAOS_SML_KV));
} else {
more_kvs = *pKVs;
}
}
if (!more_kvs) {
goto error;
}
*pKVs = more_kvs;
//move pKV points to next TAOS_SML_KV block
if (isField) {
pkv = *pKVs + *num_kvs + 1;
} else {
pkv = *pKVs + *num_kvs;
}
}
goto done;
error:
// free array
return err;
done:
return 0;
error:
return ret;
done:
*index = cur;
return ret;
}
static void moveTimeStampToFirstKv(TAOS_SML_DATA_POINT** smlData, TAOS_SML_KV *ts) {
TAOS_SML_KV* tsField = (*smlData)->fields;
tsField->length = ts->length;
tsField->type = ts->type;
tsField->value = malloc(ts->length);
tsField->key = malloc(strlen(ts->key) + 1);
memcpy(tsField->key, ts->key, strlen(ts->key) + 1);
memcpy(tsField->value, ts->value, ts->length);
(*smlData)->fieldNum = (*smlData)->fieldNum + 1;
free(ts->key);
free(ts->value);
free(ts);
}
int32_t tscParseLine(const char* sql, TAOS_SML_DATA_POINT* smlData) {
const char* index = sql;
int32_t ret = TSDB_CODE_SUCCESS;
uint8_t has_tags = 0;
TAOS_SML_KV *timestamp = NULL;
int32_t tscParseLines(char* lines[], int numLines, SArray* points, SArray* failedLines) {
for (int32_t i = 0; i < numLines; ++i) {
SStrToken tkLine = {.z = lines[i], .n = (uint32_t)strlen(lines[i])};
SLPPoint point;
tscParseLine(tkLine, &point);
taosArrayPush(points, &point);
ret = parseSmlMeasurement(smlData, &index, &has_tags);
if (ret) {
tscError("Unable to parse measurement");
return ret;
}
return 0;
}
tscDebug("Parse measurement finished, has_tags:%d", has_tags);
//Parse Tags
if (has_tags) {
ret = parseSmlKvPairs(&smlData->tags, &smlData->tagNum, &index, false, smlData);
if (ret) {
tscError("Unable to parse tag");
return ret;
}
}
tscDebug("Parse tags finished, num of tags:%d", smlData->tagNum);
void destroyLPPoint(void* p) {
SLPPoint* lpPoint = p;
for (int i=0; i<taosArrayGetSize(lpPoint->fields); ++i) {
SLPItem* item = taosArrayGet(lpPoint->fields, i);
free(item->value);
//Parse fields
ret = parseSmlKvPairs(&smlData->fields, &smlData->fieldNum, &index, true, smlData);
if (ret) {
tscError("Unable to parse field");
return ret;
}
taosArrayDestroy(lpPoint->fields);
tscDebug("Parse fields finished, num of fields:%d", smlData->fieldNum);
for (int i=0; i<taosArrayGetSize(lpPoint->tags); ++i) {
SLPItem* item = taosArrayGet(lpPoint->tags, i);
free(item->value);
//Parse timestamp
ret = parseSmlTimeStamp(&timestamp, &index);
if (ret) {
tscError("Unable to parse timestamp");
return ret;
}
taosArrayDestroy(lpPoint->tags);
moveTimeStampToFirstKv(&smlData, timestamp);
tscDebug("Parse timestamp finished");
return TSDB_CODE_SUCCESS;
}
//=========================================================================
void destroySmlDataPoint(TAOS_SML_DATA_POINT* point) {
for (int i=0; i<point->tagNum; ++i) {
free((point->tags+i)->key);
......@@ -1139,76 +1761,67 @@ void destroySmlDataPoint(TAOS_SML_DATA_POINT* point) {
free(point->childTableName);
}
int taos_insert_lines(TAOS* taos, char* lines[], int numLines) {
SArray* lpPoints = taosArrayInit(numLines, sizeof(SLPPoint));
tscParseLines(lines, numLines, lpPoints, NULL);
size_t numPoints = taosArrayGetSize(lpPoints);
TAOS_SML_DATA_POINT* points = calloc(numPoints, sizeof(TAOS_SML_DATA_POINT));
for (int i = 0; i < numPoints; ++i) {
SLPPoint* lpPoint = taosArrayGet(lpPoints, i);
TAOS_SML_DATA_POINT* point = points+i;
point->stableName = calloc(1, strlen(lpPoint->sTableName)+1);
strncpy(point->stableName, lpPoint->sTableName, strlen(lpPoint->sTableName));
point->stableName[strlen(lpPoint->sTableName)] = '\0';
size_t lpTagSize = taosArrayGetSize(lpPoint->tags);
point->tags = calloc(lpTagSize, sizeof(TAOS_SML_KV));
point->tagNum = (int)lpTagSize;
for (int j=0; j<lpTagSize; ++j) {
SLPItem* lpTag = taosArrayGet(lpPoint->tags, j);
TAOS_SML_KV* tagKv = point->tags + j;
int32_t tscParseLines(char* lines[], int numLines, SArray* points, SArray* failedLines) {
for (int32_t i = 0; i < numLines; ++i) {
TAOS_SML_DATA_POINT point = {0};
int32_t code = tscParseLine(lines[i], &point);
if (code != TSDB_CODE_SUCCESS) {
tscError("data point line parse failed. line %d : %s", i, lines[i]);
destroySmlDataPoint(&point);
return TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
} else {
tscDebug("data point line parse success. line %d", i);
}
size_t kenLen = strlen(lpTag->key);
tagKv->key = calloc(1, kenLen+1);
strncpy(tagKv->key, lpTag->key, kenLen);
tagKv->key[kenLen] = '\0';
taosArrayPush(points, &point);
}
return 0;
}
tagKv->type = lpTag->type;
tagKv->length = lpTag->length;
tagKv->value = malloc(tagKv->length);
memcpy(tagKv->value, lpTag->value, tagKv->length);
}
int taos_insert_lines(TAOS* taos, char* lines[], int numLines) {
int32_t code = 0;
size_t lpFieldsSize = taosArrayGetSize(lpPoint->fields);
point->fields = calloc(lpFieldsSize + 1, sizeof(TAOS_SML_KV));
point->fieldNum = (int)(lpFieldsSize + 1);
if (numLines <= 0 || numLines > 65536) {
tscError("taos_insert_lines numLines should be between 1 and 65536. numLines: %d", numLines);
code = TSDB_CODE_TSC_APP_ERROR;
return code;
}
TAOS_SML_KV* tsField = point->fields + 0;
char tsKey[256];
snprintf(tsKey, 256, "_%s_ts", point->stableName);
size_t tsKeyLen = strlen(tsKey);
tsField->key = calloc(1, tsKeyLen+1);
strncpy(tsField->key, tsKey, tsKeyLen);
tsField->key[tsKeyLen] = '\0';
tsField->type = TSDB_DATA_TYPE_TIMESTAMP;
tsField->length = tDataTypes[TSDB_DATA_TYPE_TIMESTAMP].bytes;
tsField->value = malloc(tsField->length);
memcpy(tsField->value, &(lpPoint->ts), tsField->length);
for (int i = 0; i < numLines; ++i) {
if (lines[i] == NULL) {
tscError("taos_insert_lines line %d is NULL", i);
code = TSDB_CODE_TSC_APP_ERROR;
return code;
}
}
for (int j=0; j<lpFieldsSize; ++j) {
SLPItem* lpField = taosArrayGet(lpPoint->fields, j);
TAOS_SML_KV* fieldKv = point->fields + j + 1;
SArray* lpPoints = taosArrayInit(numLines, sizeof(TAOS_SML_DATA_POINT));
if (lpPoints == NULL) {
tscError("taos_insert_lines failed to allocate memory");
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
size_t kenLen = strlen(lpField->key);
fieldKv->key = calloc(1, kenLen+1);
strncpy(fieldKv->key, lpField->key, kenLen);
fieldKv->key[kenLen] = '\0';
tscDebug("taos_insert_lines begin inserting %d lines, first line: %s", numLines, lines[0]);
code = tscParseLines(lines, numLines, lpPoints, NULL);
size_t numPoints = taosArrayGetSize(lpPoints);
fieldKv->type = lpField->type;
fieldKv->length = lpField->length;
fieldKv->value = malloc(fieldKv->length);
memcpy(fieldKv->value, lpField->value, fieldKv->length);
}
if (code != 0) {
goto cleanup;
}
taos_sml_insert(taos, points, (int)numPoints);
TAOS_SML_DATA_POINT* points = TARRAY_GET_START(lpPoints);
code = taos_sml_insert(taos, points, (int)numPoints);
if (code != 0) {
tscError("taos_sml_insert error: %s", tstrerror((code)));
}
cleanup:
tscDebug("taos_insert_lines finish inserting %d lines. code: %d", numLines, code);
for (int i=0; i<numPoints; ++i) {
destroySmlDataPoint(points+i);
}
free(points);
taosArrayDestroyEx(lpPoints, destroyLPPoint);
return 0;
taosArrayDestroy(lpPoints);
return code;
}
......@@ -1617,6 +1617,8 @@ int taos_stmt_set_tbname_tags(TAOS_STMT* stmt, const char* name, TAOS_BIND* tags
(*t1)->prevTS = INT64_MIN;
}
tsSetBlockInfo(pBlk, (*t1)->pTableMeta, pBlk->numOfRows);
taosHashPut(pCmd->insertParam.pTableBlockHashList, (void *)&pStmt->mtb.currentUid, sizeof(pStmt->mtb.currentUid), (void*)t1, POINTER_BYTES);
tscDebug("0x%"PRIx64" table:%s is already prepared, uid:%" PRIu64, pSql->self, name, pStmt->mtb.currentUid);
......
......@@ -348,4 +348,13 @@ public class TSDBJNIConnector {
}
private native int closeStmt(long stmt, long con);
public void insertLines(String[] lines) throws SQLException {
int code = insertLinesImp(lines, this.taos);
if (code != TSDBConstants.JNI_SUCCESS) {
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNKNOWN, "failed to insertLines");
}
}
private native int insertLinesImp(String[] lines, long conn);
}
......@@ -114,6 +114,10 @@ public class TSDBJNIConnectorTest {
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_JNI_RESULT_SET_NULL);
}
// close statement
connector.executeQuery("use d");
String[] lines = new String[] {"st,t1=3i64,t2=4f64,t3=\"t3\" c1=3i64,c3=L\"passit\",c2=false,c4=4f64 1626006833639000000ns",
"st,t1=4i64,t3=\"t4\",t2=5f64,t4=5f64 c1=3i64,c3=L\"passitagin\",c2=true,c4=5f64,c5=5f64 1626006833640000000ns"};
connector.insertLines(lines);
// close connection
connector.closeConnection();
......
......@@ -403,6 +403,20 @@ class CTaosInterface(object):
"""
return CTaosInterface.libtaos.taos_affected_rows(result)
@staticmethod
def insertLines(connection, lines):
'''
insert through lines protocol
@lines: list of str
@rtype: tsdb error codes
'''
numLines = len(lines)
c_lines_type = ctypes.c_char_p*numLines
c_lines = c_lines_type()
for i in range(numLines):
c_lines[i] = ctypes.c_char_p(lines[i].encode('utf-8'))
return CTaosInterface.libtaos.taos_insert_lines(connection, c_lines, ctypes.c_int(numLines))
@staticmethod
def subscribe(connection, restart, topic, sql, interval):
"""Create a subscription
......
......@@ -66,6 +66,14 @@ class TDengineConnection(object):
self._conn, restart, topic, sql, interval)
return TDengineSubscription(sub)
def insertLines(self, lines):
"""
insert lines through line protocol
"""
if self._conn is None:
return None
return CTaosInterface.insertLines(self._conn, lines)
def cursor(self):
"""Return a new Cursor object using the connection.
"""
......
......@@ -101,6 +101,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_TSC_INVALID_TABLE_NAME TAOS_DEF_ERROR_CODE(0, 0x0218) //"Table does not exist")
#define TSDB_CODE_TSC_EXCEED_SQL_LIMIT TAOS_DEF_ERROR_CODE(0, 0x0219) //"SQL statement too long check maxSQLLength config")
#define TSDB_CODE_TSC_FILE_EMPTY TAOS_DEF_ERROR_CODE(0, 0x021A) //"File is empty")
#define TSDB_CODE_TSC_LINE_SYNTAX_ERROR TAOS_DEF_ERROR_CODE(0, 0x021B) //"Syntax error in Line")
// mnode
#define TSDB_CODE_MND_MSG_NOT_PROCESSED TAOS_DEF_ERROR_CODE(0, 0x0300) //"Message not processed")
......
......@@ -5,6 +5,8 @@ IF (TD_LINUX)
AUX_SOURCE_DIRECTORY(. SRC)
ADD_EXECUTABLE(demo apitest.c)
TARGET_LINK_LIBRARIES(demo taos_static trpc tutil pthread )
ADD_EXECUTABLE(sml schemaless.c)
TARGET_LINK_LIBRARIES(sml taos_static trpc tutil pthread )
ADD_EXECUTABLE(subscribe subscribe.c)
TARGET_LINK_LIBRARIES(subscribe taos_static trpc tutil pthread )
ADD_EXECUTABLE(epoll epoll.c)
......
......@@ -964,21 +964,31 @@ int32_t verify_schema_less(TAOS* taos) {
usleep(100000);
char* lines[] = {
"st,t1=3i,t2=4,t3=\"t3\" c1=3i,c3=L\"passit\",c2=false,c4=4 1626006833639000000",
"st,t1=4i,t3=\"t4\",t2=5,t4=5 c1=3i,c3=L\"passitagin\",c2=true,c4=5,c5=5 1626006833640000000",
"ste,t2=5,t3=L\"ste\" c1=true,c2=4,c3=\"iam\" 1626056811823316532",
"st,t1=4i,t2=5,t3=\"t4\" c1=3i,c3=L\"passitagain\",c2=true,c4=5 1626006833642000000",
"ste,t2=5,t3=L\"ste2\" c3=\"iamszhou\",c4=false 1626056811843316532",
"ste,t2=5,t3=L\"ste2\" c3=\"iamszhou\",c4=false,c5=32b,c6=64s,c7=32w,c8=88.88f 1626056812843316532",
"st,t1=4i,t3=\"t4\",t2=5,t4=5 c1=3i,c3=L\"passitagin\",c2=true,c4=5,c5=5,c6=7u 1626006933640000000",
"stf,t1=4i,t3=\"t4\",t2=5,t4=5 c1=3i,c3=L\"passitagin\",c2=true,c4=5,c5=5,c6=7u 1626006933640000000",
"stf,t1=4i,t3=\"t4\",t2=5,t4=5 c1=3i,c3=L\"passitagin_stf\",c2=false,c5=5,c6=7u 1626006933641a"
"st,t1=3i64,t2=4f64,t3=\"t3\" c1=3i64,c3=L\"passit\",c2=false,c4=4f64 1626006833639000000ns",
"st,t1=4i64,t3=\"t4\",t2=5f64,t4=5f64 c1=3i64,c3=L\"passitagin\",c2=true,c4=5f64,c5=5f64 1626006833640000000ns",
"ste,t2=5f64,t3=L\"ste\" c1=true,c2=4i64,c3=\"iam\" 1626056811823316532ns",
"st,t1=4i64,t2=5f64,t3=\"t4\" c1=3i64,c3=L\"passitagain\",c2=true,c4=5f64 1626006833642000000ns",
"ste,t2=5f64,t3=L\"ste2\" c3=\"iamszhou\",c4=false 1626056811843316532ns",
"ste,t2=5f64,t3=L\"ste2\" c3=\"iamszhou\",c4=false,c5=32i8,c6=64i16,c7=32i32,c8=88.88f32 1626056812843316532ns",
"st,t1=4i64,t3=\"t4\",t2=5f64,t4=5f64 c1=3i64,c3=L\"passitagin\",c2=true,c4=5f64,c5=5f64,c6=7u64 1626006933640000000ns",
"stf,t1=4i64,t3=\"t4\",t2=5f64,t4=5f64 c1=3i64,c3=L\"passitagin\",c2=true,c4=5f64,c5=5f64,c6=7u64 1626006933640000000ns",
"stf,t1=4i64,t3=\"t4\",t2=5f64,t4=5f64 c1=3i64,c3=L\"passitagin_stf\",c2=false,c5=5f64,c6=7u64 1626006933641000000ns"
};
// int code = taos_insert_lines(taos, lines , sizeof(lines)/sizeof(char*));
int code = taos_insert_lines(taos, &lines[0], 1);
code = taos_insert_lines(taos, &lines[1], 1);
int code = 0;
code = taos_insert_lines(taos, lines , sizeof(lines)/sizeof(char*));
char* lines2[] = {
"stg,t1=3i64,t2=4f64,t3=\"t3\" c1=3i64,c3=L\"passit\",c2=false,c4=4f64 1626006833639000000ns",
"stg,t1=4i64,t3=\"t4\",t2=5f64,t4=5f64 c1=3i64,c3=L\"passitagin\",c2=true,c4=5f64,c5=5f64 1626006833640000000ns"
};
code = taos_insert_lines(taos, &lines2[0], 1);
code = taos_insert_lines(taos, &lines2[1], 1);
char* lines3[] = {
"sth,t1=4i64,t2=5f64,t4=5f64,ID=\"childtable\" c1=3i64,c3=L\"passitagin_stf\",c2=false,c5=5f64,c6=7u64 1626006933641ms",
"sth,t1=4i64,t2=5f64,t4=5f64 c1=3i64,c3=L\"passitagin_stf\",c2=false,c5=5f64,c6=7u64 1626006933654ms"
};
code = taos_insert_lines(taos, lines3, 2);
return code;
}
......@@ -1000,10 +1010,8 @@ int main(int argc, char *argv[]) {
printf("client info: %s\n", info);
printf("************ verify shemaless *************\n");
int code = verify_schema_less(taos);
if (code == 0) {
return code;
}
verify_schema_less(taos);
printf("************ verify query *************\n");
verify_query(taos);
......
#include "taos.h"
#include "taoserror.h"
#include "os.h"
#include <stdio.h>
#include <stdlib.h>
#include <sys/time.h>
#include <time.h>
#include <unistd.h>
int numSuperTables = 8;
int numChildTables = 1024;
int numRowsPerChildTable = 128;
void shuffle(char**lines, size_t n)
{
if (n > 1)
{
size_t i;
for (i = 0; i < n - 1; i++)
{
size_t j = i + rand() / (RAND_MAX / (n - i) + 1);
char* t = lines[j];
lines[j] = lines[i];
lines[i] = t;
}
}
}
static int64_t getTimeInUs() {
struct timeval systemTime;
gettimeofday(&systemTime, NULL);
return (int64_t)systemTime.tv_sec * 1000000L + (int64_t)systemTime.tv_usec;
}
int main(int argc, char* argv[]) {
TAOS_RES *result;
const char* host = "127.0.0.1";
const char* user = "root";
const char* passwd = "taosdata";
taos_options(TSDB_OPTION_TIMEZONE, "GMT-8");
TAOS* taos = taos_connect(host, user, passwd, "", 0);
if (taos == NULL) {
printf("\033[31mfailed to connect to db, reason:%s\033[0m\n", taos_errstr(taos));
exit(1);
}
char* info = taos_get_server_info(taos);
printf("server info: %s\n", info);
info = taos_get_client_info(taos);
printf("client info: %s\n", info);
result = taos_query(taos, "drop database if exists db;");
taos_free_result(result);
usleep(100000);
result = taos_query(taos, "create database db precision 'ms';");
taos_free_result(result);
usleep(100000);
(void)taos_select_db(taos, "db");
time_t ct = time(0);
int64_t ts = ct * 1000;
char* lineFormat = "sta%d,t0=true,t1=127i8,t2=32767i16,t3=%di32,t4=9223372036854775807i64,t9=11.12345f32,t10=22.123456789f64,t11=\"binaryTagValue\",t12=L\"ncharTagValue\" c0=true,c1=127i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c5=255u8,c6=32770u16,c7=2147483699u32,c8=9223372036854775899u64,c9=11.12345f32,c10=22.123456789f64,c11=\"binaryValue\",c12=L\"ncharValue\" %lldms";
char** lines = calloc(numSuperTables * numChildTables * numRowsPerChildTable, sizeof(char*));
int l = 0;
for (int i = 0; i < numSuperTables; ++i) {
for (int j = 0; j < numChildTables; ++j) {
for (int k = 0; k < numRowsPerChildTable; ++k) {
char* line = calloc(512, 1);
snprintf(line, 512, lineFormat, i, j, ts + 10 * l);
lines[l] = line;
++l;
}
}
}
shuffle(lines, numSuperTables * numChildTables * numRowsPerChildTable);
printf("%s\n", "begin taos_insert_lines");
int64_t begin = getTimeInUs();
int32_t code = taos_insert_lines(taos, lines, numSuperTables * numChildTables * numRowsPerChildTable);
int64_t end = getTimeInUs();
printf("code: %d, %s. time used: %"PRId64"\n", code, tstrerror(code), end-begin);
char* lines_000_0[] = {
"sta1,id=sta1_1,t0=true,t1=127i8,t2=32767i16,t3=2147483647i32,t4=9223372036854775807i64,t5=255u8,t6=32770u16,t7=2147483699u32,t8=9223372036854775899u64,t9=11.12345f32,t10=22.123456789f64,t11=\"binaryTagValue\",t12=L\"ncharTagValue\" c0=true,c1=127i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c5=255u8,c6=32770u16,c7=2147483699u32,c8=9223372036854775899u64,c9=11.12345f32,c10=22.123456789f64,c11=\"binaryValue\",c12=L\"ncharValue\" 1626006833639000us"
};
code = taos_insert_lines(taos, lines_000_0 , sizeof(lines_000_0)/sizeof(char*));
if (0 == code) {
printf("taos_insert_lines() lines_000_0 should return error\n");
return -1;
}
char* lines_000_1[] = {
"sta2,id=\"sta2_1\",t0=true,t1=127i8,t2=32767i16,t3=2147483647i32,t4=9223372036854775807i64,t5=255u8,t6=32770u16,t7=2147483699u32,t8=9223372036854775899u64,t9=11.12345f32,t10=22.123456789f64,t11=\"binaryTagValue\",t12=L\"ncharTagValue\" c0=true,c1=127i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c5=255u8,c6=32770u16,c7=2147483699u32,c8=9223372036854775899u64,c9=11.12345f32,c10=22.123456789f64,c11=\"binaryValue\",c12=L\"ncharValue\" 1626006833639001"
};
code = taos_insert_lines(taos, lines_000_1 , sizeof(lines_000_1)/sizeof(char*));
if (0 == code) {
printf("taos_insert_lines() lines_000_1 should return error\n");
return -1;
}
char* lines_000_2[] = {
"sta3,id=\"sta3_1\",t0=true,t1=127i8,t2=32767i16,t3=2147483647i32,t4=9223372036854775807i64,t9=11.12345f32,t10=22.123456789f64,t11=\"binaryTagValue\",t12=L\"ncharTagValue\" c0=true,c1=127i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c5=255u8,c6=32770u16,c7=2147483699u32,c8=9223372036854775899u64,c9=11.12345f32,c10=22.123456789f64,c11=\"binaryValue\",c12=L\"ncharValue\" 0"
};
code = taos_insert_lines(taos, lines_000_2 , sizeof(lines_000_2)/sizeof(char*));
if (0 != code) {
printf("taos_insert_lines() lines_000_2 return code:%d (%s)\n", code, (char*)tstrerror(code));
return -1;
}
char* lines_001_0[] = {
"sta4,t0=true,t1=127i8,t2=32767i16,t3=2147483647i32,t4=9223372036854775807i64,t9=11.12345f32,t10=22.123456789f64,t11=\"binaryTagValue\",t12=L\"ncharTagValue\" c0=true,c1=127i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c9=11.12345f32,c10=22.123456789f64,c11=\"binaryValue\",c12=L\"ncharValue\" 1626006833639000us",
};
code = taos_insert_lines(taos, lines_001_0 , sizeof(lines_001_0)/sizeof(char*));
if (0 != code) {
printf("taos_insert_lines() lines_001_0 return code:%d (%s)\n", code, (char*)tstrerror(code));
return -1;
}
char* lines_001_1[] = {
"sta5,id=\"sta5_1\",t0=true,t1=127i8,t2=32767i16,t3=2147483647i32,t4=9223372036854775807i64,t9=11.12345f32,t10=22.123456789f64,t11=\"binaryTagValue\",t12=L\"ncharTagValue\" c0=true,c1=127i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c9=11.12345f32,c10=22.123456789f64,c11=\"binaryValue\",c12=L\"ncharValue\" 1626006833639001"
};
code = taos_insert_lines(taos, lines_001_1 , sizeof(lines_001_1)/sizeof(char*));
if (0 != code) {
printf("taos_insert_lines() lines_001_1 return code:%d (%s)\n", code, (char*)tstrerror(code));
return -1;
}
char* lines_001_2[] = {
"sta6,id=\"sta6_1\",t0=true,t1=127i8,t2=32767i16,t3=2147483647i32,t4=9223372036854775807i64,t9=11.12345f32,t10=22.123456789f64,t11=\"binaryTagValue\",t12=L\"ncharTagValue\" c0=true,c1=127i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c9=11.12345f32,c10=22.123456789f64,c11=\"binaryValue\",c12=L\"ncharValue\" 0"
};
code = taos_insert_lines(taos, lines_001_2 , sizeof(lines_001_2)/sizeof(char*));
if (0 != code) {
printf("taos_insert_lines() lines_001_2 return code:%d (%s)\n", code, (char*)tstrerror(code));
return -1;
}
char* lines_002[] = {
"stb,id=\"stb_1\",t20=t,t21=T,t22=true,t23=True,t24=TRUE,t25=f,t26=F,t27=false,t28=False,t29=FALSE,t10=33.12345,t11=\"binaryTagValue\",t12=L\"ncharTagValue\" c20=t,c21=T,c22=true,c23=True,c24=TRUE,c25=f,c26=F,c27=false,c28=False,c29=FALSE,c10=33.12345,c11=\"binaryValue\",c12=L\"ncharValue\" 1626006833639000000ns",
"stc,id=\"stc_1\",t20=t,t21=T,t22=true,t23=True,t24=TRUE,t25=f,t26=F,t27=false,t28=False,t29=FALSE,t10=33.12345,t11=\"binaryTagValue\",t12=L\"ncharTagValue\" c20=t,c21=T,c22=true,c23=True,c24=TRUE,c25=f,c26=F,c27=false,c28=False,c29=FALSE,c10=33.12345,c11=\"binaryValue\",c12=L\"ncharValue\" 1626006833639019us",
"stc,id=\"stc_1\",t20=t,t21=T,t22=true,t23=True,t24=TRUE,t25=f,t26=F,t27=false,t28=False,t29=FALSE,t10=33.12345,t11=\"binaryTagValue\",t12=L\"ncharTagValue\" c20=t,c21=T,c22=true,c23=True,c24=TRUE,c25=f,c26=F,c27=false,c28=False,c29=FALSE,c10=33.12345,c11=\"binaryValue\",c12=L\"ncharValue\" 1626006833640ms",
"stc,id=\"stc_1\",t20=t,t21=T,t22=true,t23=True,t24=TRUE,t25=f,t26=F,t27=false,t28=False,t29=FALSE,t10=33.12345,t11=\"binaryTagValue\",t12=L\"ncharTagValue\" c20=t,c21=T,c22=true,c23=True,c24=TRUE,c25=f,c26=F,c27=false,c28=False,c29=FALSE,c10=33.12345,c11=\"binaryValue\",c12=L\"ncharValue\" 1626006834s"
};
code = taos_insert_lines(taos, lines_002 , sizeof(lines_002)/sizeof(char*));
if (0 != code) {
printf("taos_insert_lines() lines_002 return code:%d (%s)\n", code, (char*)tstrerror(code));
return -1;
}
return 0;
}
......@@ -27,6 +27,7 @@ python3 ./test.py -f insert/bug3654.py
python3 ./test.py -f insert/insertDynamicColBeforeVal.py
python3 ./test.py -f insert/in_function.py
python3 ./test.py -f insert/modify_column.py
python3 ./test.py -f insert/line_insert.py
#table
python3 ./test.py -f table/alter_wal0.py
......
###################################################################
# Copyright (c) 2021 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import sys
from util.log import *
from util.cases import *
from util.sql import *
class TDTestCase:
def init(self, conn, logSql):
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor(), logSql)
self._conn = conn
def run(self):
print("running {}".format(__file__))
tdSql.execute("drop database if exists test")
tdSql.execute("create database if not exists test precision 'us'")
tdSql.execute('use test')
tdSql.execute('create stable ste(ts timestamp, f int) tags(t1 bigint)')
lines = [ "st,t1=3i64,t2=4f64,t3=\"t3\" c1=3i64,c3=L\"passit\",c2=false,c4=4f64 1626006833639000000ns",
"st,t1=4i64,t3=\"t4\",t2=5f64,t4=5f64 c1=3i64,c3=L\"passitagin\",c2=true,c4=5f64,c5=5f64 1626006833640000000ns",
"ste,t2=5f64,t3=L\"ste\" c1=true,c2=4i64,c3=\"iam\" 1626056811823316532ns",
"stf,t1=4i64,t3=\"t4\",t2=5f64,t4=5f64 c1=3i64,c3=L\"passitagin\",c2=true,c4=5f64,c5=5f64,c6=7u64 1626006933640000000ns",
"st,t1=4i64,t2=5f64,t3=\"t4\" c1=3i64,c3=L\"passitagain\",c2=true,c4=5f64 1626006833642000000ns",
"ste,t2=5f64,t3=L\"ste2\" c3=\"iamszhou\",c4=false 1626056811843316532ns",
"ste,t2=5f64,t3=L\"ste2\" c3=\"iamszhou\",c4=false,c5=32i8,c6=64i16,c7=32i32,c8=88.88f32 1626056812843316532ns",
"st,t1=4i64,t3=\"t4\",t2=5f64,t4=5f64 c1=3i64,c3=L\"passitagin\",c2=true,c4=5f64,c5=5f64,c6=7u64 1626006933640000000ns",
"stf,t1=4i64,t3=\"t4\",t2=5f64,t4=5f64 c1=3i64,c3=L\"passitagin_stf\",c2=false,c5=5f64,c6=7u64 1626006933641000000ns"
]
code = self._conn.insertLines(lines)
print("insertLines result {}".format(code))
lines2 = [ "stg,t1=3i64,t2=4f64,t3=\"t3\" c1=3i64,c3=L\"passit\",c2=false,c4=4f64 1626006833639000000ns",
"stg,t1=4i64,t3=\"t4\",t2=5f64,t4=5f64 c1=3i64,c3=L\"passitagin\",c2=true,c4=5f64,c5=5f64 1626006833640000000ns"
]
code = self._conn.insertLines([ lines2[0] ])
print("insertLines result {}".format(code))
self._conn.insertLines([ lines2[1] ])
print("insertLines result {}".format(code))
tdSql.query("select * from st")
tdSql.checkRows(4)
tdSql.query("select * from ste")
tdSql.checkRows(3)
tdSql.query("select * from stf")
tdSql.checkRows(2)
tdSql.query("select * from stg")
tdSql.checkRows(2)
tdSql.query("show tables")
tdSql.checkRows(8)
tdSql.query("describe stf")
tdSql.checkData(2, 2, 14)
self._conn.insertLines([
"sth,t1=4i64,t2=5f64,t4=5f64,ID=\"childtable\" c1=3i64,c3=L\"passitagin_stf\",c2=false,c5=5f64,c6=7u64 1626006933641ms",
"sth,t1=4i64,t2=5f64,t4=5f64 c1=3i64,c3=L\"passitagin_stf\",c2=false,c5=5f64,c6=7u64 1626006933654ms"
])
tdSql.query('select tbname, * from sth')
tdSql.checkRows(2)
tdSql.query('select tbname, * from childtable')
tdSql.checkRows(1)
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())
......@@ -105,6 +105,7 @@ run general/parser/import_commit2.sim
run general/parser/import_commit3.sim
run general/parser/insert_tb.sim
run general/parser/first_last.sim
run general/parser/line_insert.sim
#unsupport run general/parser/import_file.sim
run general/parser/lastrow.sim
run general/parser/nchar.sim
......
......@@ -16,11 +16,10 @@ sql create database $db precision 'us'
sql use $db
sql create stable $mte (ts timestamp, f int) TAGS(t1 bigint)
line_insert st,t1=3i,t2=4,t3="t3" c1=3i,c3=L"passit",c2=false,c4=4 1626006833639000000
line_insert st,t1=4i,t3="t41",t2=5 c1=3i,c3=L"passiT",c2=true,c4=5 1626006833640000000
line_insert stf,t1=4i,t2=5,t3="t4" c1=3i,c3=L"passitagain",c2=true,c4=5 1626006833642000000
line_insert ste,t2=5,t3=L"ste" c1=true,c2=4,c3="iam" 1626056811823316532
line_insert st,t1=3i64,t2=4f64,t3="t3" c1=3i64,c3=L"passit",c2=false,c4=4f64 1626006833639000000ns
line_insert st,t1=4i64,t3="t4",t2=5f64,t4=5f64 c1=3i64,c3=L"passitagin",c2=true,c4=5f64,c5=5f64 1626006833640000000ns
line_insert ste,t2=5f64,t3=L"ste" c1=true,c2=4i64,c3="iam" 1626056811823316532ns
line_insert stf,t1=4i64,t3="t4",t2=5f64,t4=5f64 c1=3i64,c3=L"passitagin",c2=true,c4=5f64,c5=5f64,c6=7u64 1626006933640000000ns
sql select * from st
if $rows != 2 then
return -1
......@@ -30,7 +29,7 @@ if $data00 != @21-07-11 20:33:53.639000@ then
return -1
endi
if $data03 != @passit@ then
if $data02 != @passit@ then
return -1
endi
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册