diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index 5b5224e92eed6c18409d063222d9a6220ad6dcdf..83ec28898cde23256f1048b1f2eba55b5f22fd33 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -412,6 +412,7 @@ int32_t tscSQLSyntaxErrMsg(char* msg, const char* additionalInfo, const char* s int32_t tscValidateSqlInfo(SSqlObj *pSql, struct SSqlInfo *pInfo); +int32_t tsSetBlockInfo(SSubmitBlk *pBlocks, const STableMeta *pTableMeta, int32_t numOfRows); extern int32_t sentinel; extern SHashObj *tscVgroupMap; extern SHashObj *tscTableMetaInfo; diff --git a/src/client/src/TSDBJNIConnector.c b/src/client/src/TSDBJNIConnector.c index 379cf86301f6d52f202cb0fc4916db33d8d5896c..c9b00800e6f3fe950895279d17ca5b7fbb587be5 100644 --- a/src/client/src/TSDBJNIConnector.c +++ b/src/client/src/TSDBJNIConnector.c @@ -946,3 +946,34 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_setTableNameTagsI return JNI_SUCCESS; } + +JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_insertLinesImp(JNIEnv *env, jobject jobj, + jobjectArray lines, jlong conn) { + TAOS *taos = (TAOS *)conn; + if (taos == NULL) { + jniError("jobj:%p, connection already closed", jobj); + return JNI_CONNECTION_NULL; + } + + int numLines = (*env)->GetArrayLength(env, lines); + char** c_lines = calloc(numLines, sizeof(char*)); + + for (int i = 0; i < numLines; ++i) { + jstring line = (jstring) ((*env)->GetObjectArrayElement(env, lines, i)); + c_lines[i] = (char*)(*env)->GetStringUTFChars(env, line, 0); + } + + int code = taos_insert_lines(taos, c_lines, numLines); + + for (int i = 0; i < numLines; ++i) { + jstring line = (jstring) ((*env)->GetObjectArrayElement(env, lines, i)); + (*env)->ReleaseStringUTFChars(env, line, c_lines[i]); + } + + if (code != TSDB_CODE_SUCCESS) { + jniError("jobj:%p, conn:%p, code:%s", jobj, taos, tstrerror(code)); + return JNI_TDENGINE_ERROR; + } + + return code; +} \ No newline at end of file diff --git a/src/client/src/tscParseLineProtocol.c b/src/client/src/tscParseLineProtocol.c index 37264e8eaa6b22444af893f12205d85dec8ad795..ce76f5d82c6c1deed77c7c61cf10712c3652875f 100644 --- a/src/client/src/tscParseLineProtocol.c +++ b/src/client/src/tscParseLineProtocol.c @@ -17,6 +17,7 @@ #include "tscLog.h" #include "taos.h" + typedef struct { char sTableName[TSDB_TABLE_NAME_LEN]; SHashObj* tagHash; @@ -33,7 +34,7 @@ typedef struct { char* value; //=================================== - SSchema* schema; + size_t fieldSchemaIdx; } TAOS_SML_KV; typedef struct { @@ -48,9 +49,17 @@ typedef struct { int fieldNum; //================================ - SSmlSTableSchema* schema; + size_t schemaIdx; } TAOS_SML_DATA_POINT; +typedef enum { + SML_TIME_STAMP_NOW, + SML_TIME_STAMP_SECONDS, + SML_TIME_STAMP_MILLI_SECONDS, + SML_TIME_STAMP_MICRO_SECONDS, + SML_TIME_STAMP_NANO_SECONDS +} SMLTimeStampType; + //================================================================================================= int compareSmlColKv(const void* p1, const void* p2) { @@ -117,10 +126,12 @@ static int32_t getFieldBytesFromSmlKv(TAOS_SML_KV* kv, int32_t* bytes) { static int32_t buildSmlKvSchema(TAOS_SML_KV* smlKv, SHashObj* hash, SArray* array) { SSchema* pField = NULL; - SSchema** ppField = taosHashGet(hash, smlKv->key, strlen(smlKv->key)); + size_t* pFieldIdx = taosHashGet(hash, smlKv->key, strlen(smlKv->key)); + size_t fieldIdx = -1; int32_t code = 0; - if (ppField) { - pField = *ppField; + if (pFieldIdx) { + fieldIdx = *pFieldIdx; + pField = taosArrayGet(array, fieldIdx); if (pField->type != smlKv->type) { tscError("type mismatch. key %s, type %d. type before %d", smlKv->key, smlKv->type, pField->type); @@ -149,10 +160,11 @@ static int32_t buildSmlKvSchema(TAOS_SML_KV* smlKv, SHashObj* hash, SArray* arra field.bytes = bytes; pField = taosArrayPush(array, &field); - taosHashPut(hash, field.name, tagKeyLen, &pField, POINTER_BYTES); + fieldIdx = taosArrayGetSize(array) - 1; + taosHashPut(hash, field.name, tagKeyLen, &fieldIdx, sizeof(fieldIdx)); } - smlKv->schema = pField; + smlKv->fieldSchemaIdx = fieldIdx; return 0; } @@ -165,10 +177,12 @@ static int32_t buildDataPointSchemas(TAOS_SML_DATA_POINT* points, int numPoint, for (int i = 0; i < numPoint; ++i) { TAOS_SML_DATA_POINT* point = &points[i]; size_t stableNameLen = strlen(point->stableName); - SSmlSTableSchema** ppStableSchema = taosHashGet(sname2shema, point->stableName, stableNameLen); + size_t* pStableIdx = taosHashGet(sname2shema, point->stableName, stableNameLen); SSmlSTableSchema* pStableSchema = NULL; - if (ppStableSchema) { - pStableSchema= *ppStableSchema; + size_t stableIdx = -1; + if (pStableIdx) { + pStableSchema= taosArrayGet(stableSchemas, *pStableIdx); + stableIdx = *pStableIdx; } else { SSmlSTableSchema schema; strncpy(schema.sTableName, point->stableName, stableNameLen); @@ -179,7 +193,8 @@ static int32_t buildDataPointSchemas(TAOS_SML_DATA_POINT* points, int numPoint, schema.fieldHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false); pStableSchema = taosArrayPush(stableSchemas, &schema); - taosHashPut(sname2shema, schema.sTableName, stableNameLen, &pStableSchema, POINTER_BYTES); + stableIdx = taosArrayGetSize(stableSchemas) - 1; + taosHashPut(sname2shema, schema.sTableName, stableNameLen, &stableIdx, sizeof(size_t)); } for (int j = 0; j < point->tagNum; ++j) { @@ -200,7 +215,7 @@ static int32_t buildDataPointSchemas(TAOS_SML_DATA_POINT* points, int numPoint, } } - point->schema = pStableSchema; + point->schemaIdx = stableIdx; } size_t numStables = taosArrayGetSize(stableSchemas); @@ -221,11 +236,12 @@ static int32_t buildDataPointSchemas(TAOS_SML_DATA_POINT* points, int numPoint, return 0; } -static int32_t generateSchemaAction(SSchema* pointColField, SHashObj* dbAttrHash, bool isTag, char sTableName[], +static int32_t generateSchemaAction(SSchema* pointColField, SHashObj* dbAttrHash, SArray* dbAttrArray, bool isTag, char sTableName[], SSchemaAction* action, bool* actionNeeded) { - SSchema** ppDbAttr = taosHashGet(dbAttrHash, pointColField->name, strlen(pointColField->name)); - if (ppDbAttr) { - SSchema* dbAttr = *ppDbAttr; + size_t* pDbIndex = taosHashGet(dbAttrHash, pointColField->name, strlen(pointColField->name)); + if (pDbIndex) { + SSchema* dbAttr = taosArrayGet(dbAttrArray, *pDbIndex); + assert(strcasecmp(dbAttr->name, pointColField->name) == 0); if (pointColField->type != dbAttr->type) { tscError("point type and db type mismatch. key: %s. point type: %d, db type: %d", pointColField->name, pointColField->type, dbAttr->type); @@ -437,8 +453,9 @@ int32_t loadTableMeta(TAOS* taos, char* tableName, SSmlSTableSchema* schema) { tstrncpy(field.name, tableMeta->schema[i].name, strlen(tableMeta->schema[i].name)+1); field.type = tableMeta->schema[i].type; field.bytes = tableMeta->schema[i].bytes; - SSchema* pField = taosArrayPush(schema->fields, &field); - taosHashPut(schema->fieldHash, field.name, strlen(field.name), &pField, POINTER_BYTES); + taosArrayPush(schema->fields, &field); + size_t fieldIndex = taosArrayGetSize(schema->fields) - 1; + taosHashPut(schema->fieldHash, field.name, strlen(field.name), &fieldIndex, sizeof(fieldIndex)); } for (int i=0; itableInfo.numOfTags; ++i) { @@ -447,8 +464,9 @@ int32_t loadTableMeta(TAOS* taos, char* tableName, SSmlSTableSchema* schema) { tstrncpy(field.name, tableMeta->schema[j].name, strlen(tableMeta->schema[j].name)+1); field.type = tableMeta->schema[j].type; field.bytes = tableMeta->schema[j].bytes; - SSchema* pField = taosArrayPush(schema->tags, &field); - taosHashPut(schema->tagHash, field.name, strlen(field.name), &pField, POINTER_BYTES); + taosArrayPush(schema->tags, &field); + size_t tagIndex = taosArrayGetSize(schema->tags) - 1; + taosHashPut(schema->tagHash, field.name, strlen(field.name), &tagIndex, sizeof(tagIndex)); } tscDebug("load table meta succeed. %s, columns number: %d, tag number: %d, precision: %d", tableName, tableMeta->tableInfo.numOfColumns, tableMeta->tableInfo.numOfTags, schema->precision); @@ -476,6 +494,7 @@ static int32_t reconcileDBSchemas(TAOS* taos, SArray* stableSchemas) { code = loadTableMeta(taos, pointSchema->sTableName, &dbSchema); if (code != 0) { tscError("reconcile point schema failed. can not create %s", pointSchema->sTableName); + return code; } else { pointSchema->precision = dbSchema.precision; destroySmlSTableSchema(&dbSchema); @@ -491,7 +510,7 @@ static int32_t reconcileDBSchemas(TAOS* taos, SArray* stableSchemas) { SSchema* pointTag = taosArrayGet(pointSchema->tags, j); SSchemaAction schemaAction = {0}; bool actionNeeded = false; - generateSchemaAction(pointTag, dbTagHash, true, pointSchema->sTableName, &schemaAction, &actionNeeded); + generateSchemaAction(pointTag, dbTagHash, dbSchema.tags, true, pointSchema->sTableName, &schemaAction, &actionNeeded); if (actionNeeded) { applySchemaAction(taos, &schemaAction); } @@ -505,7 +524,7 @@ static int32_t reconcileDBSchemas(TAOS* taos, SArray* stableSchemas) { SSchema* pointCol = taosArrayGet(pointSchema->fields, j); SSchemaAction schemaAction = {0}; bool actionNeeded = false; - generateSchemaAction(pointCol, dbFieldHash, false, pointSchema->sTableName, &schemaAction, &actionNeeded); + generateSchemaAction(pointCol, dbFieldHash, dbSchema.fields,false, pointSchema->sTableName, &schemaAction, &actionNeeded); if (actionNeeded) { applySchemaAction(taos, &schemaAction); } @@ -522,7 +541,8 @@ static int32_t reconcileDBSchemas(TAOS* taos, SArray* stableSchemas) { return 0; } -static int32_t getChildTableName(TAOS_SML_DATA_POINT* point, char* tableName, int* tableNameLen) { +static int32_t getSmlMd5ChildTableName(TAOS_SML_DATA_POINT* point, char* tableName, int* tableNameLen) { + tscDebug("taos_sml_insert get child table name through md5"); qsort(point->tags, point->tagNum, sizeof(TAOS_SML_KV), compareSmlColKv); SStringBuilder sb; memset(&sb, 0, sizeof(sb)); @@ -592,8 +612,12 @@ static int32_t creatChildTableIfNotExists(TAOS* taos, const char* cTableName, co return code; } - taos_stmt_close(stmt); - return 0; + code = taos_stmt_close(stmt); + if (code != 0) { + tscError("%s", taos_stmt_errstr(stmt)); + return code; + } + return code; } static int32_t insertChildTableBatch(TAOS* taos, char* cTableName, SArray* colsSchema, SArray* rowsBind) { @@ -619,7 +643,7 @@ static int32_t insertChildTableBatch(TAOS* taos, char* cTableName, SArray* cols int32_t try = 0; TAOS_STMT* stmt = taos_stmt_init(taos); - + code = taos_stmt_prepare(stmt, sql, (unsigned long)strlen(sql)); if (code != 0) { tscError("%s", taos_stmt_errstr(stmt)); @@ -665,23 +689,26 @@ static int32_t insertChildTableBatch(TAOS* taos, char* cTableName, SArray* cols return code; } -static int32_t arrangePointsByChildTableName(TAOS_SML_DATA_POINT* points, int numPoints, SHashObj* cname2points) { +static int32_t arrangePointsByChildTableName(TAOS_SML_DATA_POINT* points, int numPoints, + SHashObj* cname2points, SArray* stableSchemas) { for (int32_t i = 0; i < numPoints; ++i) { TAOS_SML_DATA_POINT * point = points + i; if (!point->childTableName) { char childTableName[TSDB_TABLE_NAME_LEN]; int32_t tableNameLen = TSDB_TABLE_NAME_LEN; - getChildTableName(point, childTableName, &tableNameLen); + getSmlMd5ChildTableName(point, childTableName, &tableNameLen); point->childTableName = calloc(1, tableNameLen+1); strncpy(point->childTableName, childTableName, tableNameLen); point->childTableName[tableNameLen] = '\0'; } + SSmlSTableSchema* stableSchema = taosArrayGet(stableSchemas, point->schemaIdx); + for (int j = 0; j < point->tagNum; ++j) { TAOS_SML_KV* kv = point->tags + j; if (kv->type == TSDB_DATA_TYPE_TIMESTAMP) { int64_t ts = *(int64_t*)(kv->value); - ts = convertTimePrecision(ts, TSDB_TIME_PRECISION_NANO, point->schema->precision); + ts = convertTimePrecision(ts, TSDB_TIME_PRECISION_NANO, stableSchema->precision); *(int64_t*)(kv->value) = ts; } } @@ -690,7 +717,7 @@ static int32_t arrangePointsByChildTableName(TAOS_SML_DATA_POINT* points, int nu TAOS_SML_KV* kv = point->fields + j; if (kv->type == TSDB_DATA_TYPE_TIMESTAMP) { int64_t ts = *(int64_t*)(kv->value); - ts = convertTimePrecision(ts, TSDB_TIME_PRECISION_NANO, point->schema->precision); + ts = convertTimePrecision(ts, TSDB_TIME_PRECISION_NANO, stableSchema->precision); *(int64_t*)(kv->value) = ts; } } @@ -709,10 +736,12 @@ static int32_t arrangePointsByChildTableName(TAOS_SML_DATA_POINT* points, int nu return 0; } -static int32_t insertPoints(TAOS* taos, TAOS_SML_DATA_POINT* points, int32_t numPoints) { +static int32_t insertPoints(TAOS* taos, TAOS_SML_DATA_POINT* points, int32_t numPoints, SArray* stableSchemas) { + int32_t code = TSDB_CODE_SUCCESS; + SHashObj* cname2points = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false); - arrangePointsByChildTableName(points, numPoints, cname2points); + arrangePointsByChildTableName(points, numPoints, cname2points, stableSchemas); int isNullColBind = TSDB_TRUE; SArray** pCTablePoints = taosHashIterate(cname2points, NULL); @@ -720,8 +749,9 @@ static int32_t insertPoints(TAOS* taos, TAOS_SML_DATA_POINT* points, int32_t num SArray* cTablePoints = *pCTablePoints; TAOS_SML_DATA_POINT * point = taosArrayGetP(cTablePoints, 0); - size_t numTags = taosArrayGetSize(point->schema->tags); - size_t numCols = taosArrayGetSize(point->schema->fields); + SSmlSTableSchema* sTableSchema = taosArrayGet(stableSchemas, point->schemaIdx); + size_t numTags = taosArrayGetSize(sTableSchema->tags); + size_t numCols = taosArrayGetSize(sTableSchema->fields); SArray* tagBinds = taosArrayInit(numTags, sizeof(TAOS_BIND)); taosArraySetSize(tagBinds, numTags); @@ -731,8 +761,7 @@ static int32_t insertPoints(TAOS* taos, TAOS_SML_DATA_POINT* points, int32_t num } for (int j = 0; j < point->tagNum; ++j) { TAOS_SML_KV* kv = point->tags + j; - size_t idx = TARRAY_ELEM_IDX(point->schema->tags, kv->schema); - TAOS_BIND* bind = taosArrayGet(tagBinds, idx); + TAOS_BIND* bind = taosArrayGet(tagBinds, kv->fieldSchemaIdx); bind->buffer_type = kv->type; bind->length = malloc(sizeof(uintptr_t*)); *bind->length = kv->length; @@ -747,14 +776,17 @@ static int32_t insertPoints(TAOS* taos, TAOS_SML_DATA_POINT* points, int32_t num point = taosArrayGetP(cTablePoints, i); TAOS_BIND* colBinds = calloc(numCols, sizeof(TAOS_BIND)); + if (colBinds == NULL) { + tscError("taos_sml_insert insert points, failed to allocated memory for TAOS_BIND, " + "num of rows: %zu, num of cols: %zu", rows, numCols); + } for (int j = 0; j < numCols; ++j) { TAOS_BIND* bind = colBinds + j; bind->is_null = &isNullColBind; } for (int j = 0; j < point->fieldNum; ++j) { TAOS_SML_KV* kv = point->fields + j; - size_t idx = TARRAY_ELEM_IDX(point->schema->fields, kv->schema); - TAOS_BIND* bind = colBinds + idx; + TAOS_BIND* bind = colBinds + kv->fieldSchemaIdx; bind->buffer_type = kv->type; bind->length = malloc(sizeof(uintptr_t*)); *bind->length = kv->length; @@ -764,14 +796,21 @@ static int32_t insertPoints(TAOS* taos, TAOS_SML_DATA_POINT* points, int32_t num taosArrayPush(rowsBind, &colBinds); } - creatChildTableIfNotExists(taos, point->childTableName, point->stableName, point->schema->tags, tagBinds); + code = creatChildTableIfNotExists(taos, point->childTableName, point->stableName, sTableSchema->tags, tagBinds); + if (code == 0) { + code = insertChildTableBatch(taos, point->childTableName, sTableSchema->fields, rowsBind); + if (code != 0) { + tscError("insert into child table %s failed. error %s", point->childTableName, tstrerror(code)); + } + } else { + tscError("Create Child Table %s failed, error %s", point->childTableName, tstrerror(code)); + } + for (int i = 0; i < taosArrayGetSize(tagBinds); ++i) { TAOS_BIND* bind = taosArrayGet(tagBinds, i); free(bind->length); } taosArrayDestroy(tagBinds); - - insertChildTableBatch(taos, point->childTableName, point->schema->fields, rowsBind); for (int i = 0; i < rows; ++i) { TAOS_BIND* colBinds = taosArrayGetP(rowsBind, i); for (int j = 0; j < numCols; ++j) { @@ -782,12 +821,14 @@ static int32_t insertPoints(TAOS* taos, TAOS_SML_DATA_POINT* points, int32_t num } taosArrayDestroy(rowsBind); taosArrayDestroy(cTablePoints); - + if (code != 0) { + break; + } pCTablePoints = taosHashIterate(cname2points, pCTablePoints); } taosHashCleanup(cname2points); - return 0; + return code; } int taos_sml_insert(TAOS* taos, TAOS_SML_DATA_POINT* points, int numPoint) { @@ -808,7 +849,7 @@ int taos_sml_insert(TAOS* taos, TAOS_SML_DATA_POINT* points, int numPoint) { goto clean_up; } - code = insertPoints(taos, points, numPoint); + code = insertPoints(taos, points, numPoint, stableSchemas); if (code != 0) { tscError("error insert points : %s", tstrerror(code)); } @@ -825,305 +866,886 @@ clean_up: //========================================================================= -typedef enum { - LP_ITEM_TAG, - LP_ITEM_FIELD -} LPItemKind; +/* Field Escape charaters + 1: measurement Comma,Space + 2: tag_key, tag_value, field_key Comma,Equal Sign,Space + 3: field_value Double quote,Backslash +*/ +static void escapeSpecialCharacter(uint8_t field, const char **pos) { + const char *cur = *pos; + if (*cur != '\\') { + return; + } + switch (field) { + case 1: + switch (*(cur + 1)) { + case ',': + case ' ': + cur++; + break; + default: + break; + } + break; + case 2: + switch (*(cur + 1)) { + case ',': + case ' ': + case '=': + cur++; + break; + default: + break; + } + break; + case 3: + switch (*(cur + 1)) { + case '"': + case '\\': + cur++; + break; + default: + break; + } + break; + default: + break; + } + *pos = cur; +} -typedef struct { - SStrToken keyToken; - SStrToken valueToken; +static bool isValidInteger(char *str) { + char *c = str; + if (*c != '+' && *c != '-' && !isdigit(*c)) { + return false; + } + c++; + while (*c != '\0') { + if (!isdigit(*c)) { + return false; + } + c++; + } + return true; +} - char key[TSDB_COL_NAME_LEN]; - int8_t type; - int16_t length; +static bool isValidFloat(char *str) { + char *c = str; + uint8_t has_dot, has_exp, has_sign; + has_dot = 0; + has_exp = 0; + has_sign = 0; - char* value; -}SLPItem; + if (*c != '+' && *c != '-' && *c != '.' && !isdigit(*c)) { + return false; + } + if (*c == '.' && isdigit(*(c + 1))) { + has_dot = 1; + } + c++; + while (*c != '\0') { + if (!isdigit(*c)) { + switch (*c) { + case '.': { + if (!has_dot && !has_exp && isdigit(*(c + 1))) { + has_dot = 1; + } else { + return false; + } + break; + } + case 'e': + case 'E': { + if (!has_exp && isdigit(*(c - 1)) && + (isdigit(*(c + 1)) || + *(c + 1) == '+' || + *(c + 1) == '-')) { + has_exp = 1; + } else { + return false; + } + break; + } + case '+': + case '-': { + if (!has_sign && has_exp && isdigit(*(c + 1))) { + has_sign = 1; + } else { + return false; + } + break; + } + default: { + return false; + } + } + } + c++; + } //while + return true; +} -typedef struct { - SStrToken measToken; - SStrToken tsToken; +static bool isTinyInt(char *pVal, uint16_t len) { + if (len <= 2) { + return false; + } + if (!strcmp(&pVal[len - 2], "i8")) { + //printf("Type is int8(%s)\n", pVal); + return true; + } + return false; +} - char sTableName[TSDB_TABLE_NAME_LEN]; - SArray* tags; - SArray* fields; - int64_t ts; +static bool isTinyUint(char *pVal, uint16_t len) { + if (len <= 2) { + return false; + } + if (pVal[0] == '-') { + return false; + } + if (!strcmp(&pVal[len - 2], "u8")) { + //printf("Type is uint8(%s)\n", pVal); + return true; + } + return false; +} -} SLPPoint; +static bool isSmallInt(char *pVal, uint16_t len) { + if (len <= 3) { + return false; + } + if (!strcmp(&pVal[len - 3], "i16")) { + //printf("Type is int16(%s)\n", pVal); + return true; + } + return false; +} -typedef enum { - LP_MEASUREMENT, - LP_TAG_KEY, - LP_TAG_VALUE, - LP_FIELD_KEY, - LP_FIELD_VALUE -} LPPart; +static bool isSmallUint(char *pVal, uint16_t len) { + if (len <= 3) { + return false; + } + if (pVal[0] == '-') { + return false; + } + if (strcmp(&pVal[len - 3], "u16") == 0) { + //printf("Type is uint16(%s)\n", pVal); + return true; + } + return false; +} -int32_t scanToCommaOrSpace(SStrToken s, int32_t start, int32_t* index, LPPart part) { - for (int32_t i = start; i < s.n; ++i) { - if (s.z[i] == ',' || s.z[i] == ' ') { - *index = i; - return 0; - } +static bool isInt(char *pVal, uint16_t len) { + if (len <= 3) { + return false; + } + if (strcmp(&pVal[len - 3], "i32") == 0) { + //printf("Type is int32(%s)\n", pVal); + return true; } - return -1; + return false; } -int32_t scanToEqual(SStrToken s, int32_t start, int32_t* index) { - for (int32_t i = start; i < s.n; ++i) { - if (s.z[i] == '=') { - *index = i; - return 0; - } +static bool isUint(char *pVal, uint16_t len) { + if (len <= 3) { + return false; + } + if (pVal[0] == '-') { + return false; } - return -1; + if (strcmp(&pVal[len - 3], "u32") == 0) { + //printf("Type is uint32(%s)\n", pVal); + return true; + } + return false; } -int32_t setPointMeasurement(SLPPoint* point, SStrToken token) { - point->measToken = token; - if (point->measToken.n < TSDB_TABLE_NAME_LEN) { - strncpy(point->sTableName, point->measToken.z, point->measToken.n); - point->sTableName[point->measToken.n] = '\0'; +static bool isBigInt(char *pVal, uint16_t len) { + if (len <= 3) { + return false; } - return 0; + if (strcmp(&pVal[len - 3], "i64") == 0) { + //printf("Type is int64(%s)\n", pVal); + return true; + } + return false; } -int32_t setItemKey(SLPItem* item, SStrToken key, LPPart part) { - item->keyToken = key; - if (item->keyToken.n < TSDB_COL_NAME_LEN) { - strncpy(item->key, item->keyToken.z, item->keyToken.n); - item->key[item->keyToken.n] = '\0'; +static bool isBigUint(char *pVal, uint16_t len) { + if (len <= 3) { + return false; } - return 0; + if (pVal[0] == '-') { + return false; + } + if (strcmp(&pVal[len - 3], "u64") == 0) { + //printf("Type is uint64(%s)\n", pVal); + return true; + } + return false; } -int32_t setItemValue(SLPItem* item, SStrToken value, LPPart part) { - item->valueToken = value; - return 0; +static bool isFloat(char *pVal, uint16_t len) { + if (len <= 3) { + return false; + } + if (strcmp(&pVal[len - 3], "f32") == 0) { + //printf("Type is float(%s)\n", pVal); + return true; + } + return false; } -int32_t parseItemValue(SLPItem* item, LPItemKind kind) { - char* sv = item->valueToken.z; - char* last = item->valueToken.z + item->valueToken.n - 1; - - if (isdigit(sv[0]) || sv[0] == '-') { - if (*last == 'i') { - item->type = TSDB_DATA_TYPE_BIGINT; - item->length = (int16_t)tDataTypes[item->type].bytes; - item->value = malloc(item->length); - char* endptr = NULL; - *(int64_t*)(item->value) = strtoll(sv, &endptr, 10); - } else if (*last == 'u') { - item->type = TSDB_DATA_TYPE_UBIGINT; - item->length = (int16_t)tDataTypes[item->type].bytes; - item->value = malloc(item->length); - char* endptr = NULL; - *(uint64_t*)(item->value) = (uint64_t)strtoull(sv, &endptr, 10); - } else if (*last == 'b') { - item->type = TSDB_DATA_TYPE_TINYINT; - item->length = (int16_t)tDataTypes[item->type].bytes; - item->value = malloc(item->length); - char* endptr = NULL; - *(int8_t*)(item->value) = (int8_t)strtoll(sv, &endptr, 10); - } else if (*last == 's') { - item->type = TSDB_DATA_TYPE_SMALLINT; - item->length = (int16_t)tDataTypes[item->type].bytes; - item->value = malloc(item->length); - char* endptr = NULL; - *(int16_t*)(item->value) = (int16_t)strtoll(sv, &endptr, 10); - } else if (*last == 'w') { - item->type = TSDB_DATA_TYPE_INT; - item->length = (int16_t)tDataTypes[item->type].bytes; - item->value = malloc(item->length); - char* endptr = NULL; - *(int32_t*)(item->value) = (int32_t)strtoll(sv, &endptr, 10); - } else if (*last == 'f') { - item->type = TSDB_DATA_TYPE_FLOAT; - item->length = (int16_t)tDataTypes[item->type].bytes; - item->value = malloc(item->length); - char* endptr = NULL; - *(float*)(item->value) = (float)strtold(sv, &endptr); - } else { - item->type = TSDB_DATA_TYPE_DOUBLE; - item->length = (int16_t)tDataTypes[item->type].bytes; - item->value = malloc(item->length); - char* endptr = NULL; - *(double*)(item->value) = strtold(sv, &endptr); - } - } else if ((sv[0] == 'L' && sv[1] =='"') || sv[0] == '"' ) { - if (sv[0] == 'L') { - item->type = TSDB_DATA_TYPE_NCHAR; - uint32_t bytes = item->valueToken.n - 3; - item->length = bytes; - item->value = malloc(bytes); - memcpy(item->value, sv+2, bytes); - } else if (sv[0]=='"'){ - item->type = TSDB_DATA_TYPE_BINARY; - uint32_t bytes = item->valueToken.n - 2; - item->length = bytes; - item->value = malloc(bytes); - memcpy(item->value, sv+1, bytes); - } - } else if (sv[0] == 't' || sv[0] == 'f' || sv[0]=='T' || sv[0] == 'F') { - item->type = TSDB_DATA_TYPE_BOOL; - item->length = tDataTypes[item->type].bytes; - item->value = malloc(tDataTypes[item->type].bytes); - *(uint8_t*)(item->value) = tolower(sv[0])=='t' ? TSDB_TRUE : TSDB_FALSE; +static bool isDouble(char *pVal, uint16_t len) { + if (len <= 3) { + return false; } - return 0; + if (strcmp(&pVal[len - 3], "f64") == 0) { + //printf("Type is double(%s)\n", pVal); + return true; + } + return false; } -int32_t compareLPItemKey(const void* p1, const void* p2) { - const SLPItem* t1 = p1; - const SLPItem* t2 = p2; - uint32_t min = (t1->keyToken.n < t2->keyToken.n) ? t1->keyToken.n : t2->keyToken.n; - int res = strncmp(t1->keyToken.z, t2->keyToken.z, min); - if (res != 0) { - return res; - } else { - return (int)(t1->keyToken.n) - (int)(t2->keyToken.n); +static bool isBool(char *pVal, uint16_t len, bool *bVal) { + if ((len == 1) && + (pVal[len - 1] == 't' || + pVal[len - 1] == 'T')) { + //printf("Type is bool(%c)\n", pVal[len - 1]); + *bVal = true; + return true; + } + + if ((len == 1) && + (pVal[len - 1] == 'f' || + pVal[len - 1] == 'F')) { + //printf("Type is bool(%c)\n", pVal[len - 1]); + *bVal = false; + return true; + } + + if((len == 4) && + (!strcmp(&pVal[len - 4], "true") || + !strcmp(&pVal[len - 4], "True") || + !strcmp(&pVal[len - 4], "TRUE"))) { + //printf("Type is bool(%s)\n", &pVal[len - 4]); + *bVal = true; + return true; + } + if((len == 5) && + (!strcmp(&pVal[len - 5], "false") || + !strcmp(&pVal[len - 5], "False") || + !strcmp(&pVal[len - 5], "FALSE"))) { + //printf("Type is bool(%s)\n", &pVal[len - 5]); + *bVal = false; + return true; } + return false; } -int32_t setPointTimeStamp(SLPPoint* point, SStrToken tsToken) { - point->tsToken = tsToken; - return 0; +static bool isBinary(char *pVal, uint16_t len) { + //binary: "abc" + if (len < 2) { + return false; + } + //binary + if (pVal[0] == '"' && pVal[len - 1] == '"') { + //printf("Type is binary(%s)\n", pVal); + return true; + } + return false; +} + +static bool isNchar(char *pVal, uint16_t len) { + //nchar: L"abc" + if (len < 3) { + return false; + } + if (pVal[0] == 'L' && pVal[1] == '"' && pVal[len - 1] == '"') { + //printf("Type is nchar(%s)\n", pVal); + return true; + } + return false; +} + +static bool isTimeStamp(char *pVal, uint16_t len, SMLTimeStampType *tsType) { + if (len == 0) { + return true; + } + if ((len == 1) && pVal[0] == '0') { + *tsType = SML_TIME_STAMP_NOW; + //printf("Type is timestamp(%s)\n", pVal); + return true; + } + if (len < 2) { + return false; + } + //No appendix use usec as default + if (isdigit(pVal[len - 1]) && isdigit(pVal[len - 2])) { + *tsType = SML_TIME_STAMP_MICRO_SECONDS; + //printf("Type is timestamp(%s)\n", pVal); + return true; + } + if (pVal[len - 1] == 's') { + switch (pVal[len - 2]) { + case 'm': + *tsType = SML_TIME_STAMP_MILLI_SECONDS; + break; + case 'u': + *tsType = SML_TIME_STAMP_MICRO_SECONDS; + break; + case 'n': + *tsType = SML_TIME_STAMP_NANO_SECONDS; + break; + default: + if (isdigit(pVal[len - 2])) { + *tsType = SML_TIME_STAMP_SECONDS; + break; + } else { + return false; + } + } + //printf("Type is timestamp(%s)\n", pVal); + return true; + } + return false; +} + +//len does not include '\0' from value. +static bool convertSmlValueType(TAOS_SML_KV *pVal, char *value, + uint16_t len) { + if (len <= 0) { + return false; + } + + //integer number + if (isTinyInt(value, len)) { + pVal->type = TSDB_DATA_TYPE_TINYINT; + pVal->length = (int16_t)tDataTypes[pVal->type].bytes; + value[len - 2] = '\0'; + if (!isValidInteger(value)) { + return false; + } + pVal->value = calloc(pVal->length, 1); + int8_t val = (int8_t)strtoll(value, NULL, 10); + memcpy(pVal->value, &val, pVal->length); + return true; + } + if (isTinyUint(value, len)) { + pVal->type = TSDB_DATA_TYPE_UTINYINT; + pVal->length = (int16_t)tDataTypes[pVal->type].bytes; + value[len - 2] = '\0'; + if (!isValidInteger(value)) { + return false; + } + pVal->value = calloc(pVal->length, 1); + uint8_t val = (uint8_t)strtoul(value, NULL, 10); + memcpy(pVal->value, &val, pVal->length); + return true; + } + if (isSmallInt(value, len)) { + pVal->type = TSDB_DATA_TYPE_SMALLINT; + pVal->length = (int16_t)tDataTypes[pVal->type].bytes; + value[len - 3] = '\0'; + if (!isValidInteger(value)) { + return false; + } + pVal->value = calloc(pVal->length, 1); + int16_t val = (int16_t)strtoll(value, NULL, 10); + memcpy(pVal->value, &val, pVal->length); + return true; + } + if (isSmallUint(value, len)) { + pVal->type = TSDB_DATA_TYPE_USMALLINT; + pVal->length = (int16_t)tDataTypes[pVal->type].bytes; + value[len - 3] = '\0'; + if (!isValidInteger(value)) { + return false; + } + pVal->value = calloc(pVal->length, 1); + uint16_t val = (uint16_t)strtoul(value, NULL, 10); + memcpy(pVal->value, &val, pVal->length); + //memcpy(pVal->value, &val, pVal->length); + return true; + } + if (isInt(value, len)) { + pVal->type = TSDB_DATA_TYPE_INT; + pVal->length = (int16_t)tDataTypes[pVal->type].bytes; + value[len - 3] = '\0'; + if (!isValidInteger(value)) { + return false; + } + pVal->value = calloc(pVal->length, 1); + int32_t val = (int32_t)strtoll(value, NULL, 10); + memcpy(pVal->value, &val, pVal->length); + return true; + } + if (isUint(value, len)) { + pVal->type = TSDB_DATA_TYPE_UINT; + pVal->length = (int16_t)tDataTypes[pVal->type].bytes; + value[len - 3] = '\0'; + if (!isValidInteger(value)) { + return false; + } + pVal->value = calloc(pVal->length, 1); + uint32_t val = (uint32_t)strtoul(value, NULL, 10); + memcpy(pVal->value, &val, pVal->length); + return true; + } + if (isBigInt(value, len)) { + pVal->type = TSDB_DATA_TYPE_BIGINT; + pVal->length = (int16_t)tDataTypes[pVal->type].bytes; + value[len - 3] = '\0'; + if (!isValidInteger(value)) { + return false; + } + pVal->value = calloc(pVal->length, 1); + int64_t val = (int64_t)strtoll(value, NULL, 10); + memcpy(pVal->value, &val, pVal->length); + return true; + } + if (isBigUint(value, len)) { + pVal->type = TSDB_DATA_TYPE_UBIGINT; + pVal->length = (int16_t)tDataTypes[pVal->type].bytes; + value[len - 3] = '\0'; + if (!isValidInteger(value)) { + return false; + } + pVal->value = calloc(pVal->length, 1); + uint64_t val = (uint64_t)strtoul(value, NULL, 10); + memcpy(pVal->value, &val, pVal->length); + return true; + } + //floating number + if (isFloat(value, len)) { + pVal->type = TSDB_DATA_TYPE_FLOAT; + pVal->length = (int16_t)tDataTypes[pVal->type].bytes; + value[len - 3] = '\0'; + if (!isValidFloat(value)) { + return false; + } + pVal->value = calloc(pVal->length, 1); + float val = (float)strtold(value, NULL); + memcpy(pVal->value, &val, pVal->length); + return true; + } + if (isDouble(value, len)) { + pVal->type = TSDB_DATA_TYPE_DOUBLE; + pVal->length = (int16_t)tDataTypes[pVal->type].bytes; + value[len - 3] = '\0'; + if (!isValidFloat(value)) { + return false; + } + pVal->value = calloc(pVal->length, 1); + double val = (double)strtold(value, NULL); + memcpy(pVal->value, &val, pVal->length); + return true; + } + //binary + if (isBinary(value, len)) { + pVal->type = TSDB_DATA_TYPE_BINARY; + pVal->length = len - 2; + pVal->value = calloc(pVal->length, 1); + //copy after " + memcpy(pVal->value, value + 1, pVal->length); + return true; + } + //nchar + if (isNchar(value, len)) { + pVal->type = TSDB_DATA_TYPE_NCHAR; + pVal->length = len - 3; + pVal->value = calloc(pVal->length, 1); + //copy after L" + memcpy(pVal->value, value + 2, pVal->length); + return true; + } + //bool + bool bVal; + if (isBool(value, len, &bVal)) { + pVal->type = TSDB_DATA_TYPE_BOOL; + pVal->length = (int16_t)tDataTypes[pVal->type].bytes; + pVal->value = calloc(pVal->length, 1); + memcpy(pVal->value, &bVal, pVal->length); + return true; + } + //Handle default(no appendix) as float + if (isValidInteger(value) || isValidFloat(value)) { + pVal->type = TSDB_DATA_TYPE_FLOAT; + pVal->length = (int16_t)tDataTypes[pVal->type].bytes; + pVal->value = calloc(pVal->length, 1); + float val = (float)strtold(value, NULL); + memcpy(pVal->value, &val, pVal->length); + return true; + } + return false; } -int32_t parsePointTime(SLPPoint* point) { - if (point->tsToken.n <= 0) { - point->ts = taosGetTimestampNs(); +static int32_t getTimeStampValue(char *value, uint16_t len, + SMLTimeStampType type, int64_t *ts) { + + if (len >= 2) { + for (int i = 0; i < len - 2; ++i) { + if(!isdigit(value[i])) { + return TSDB_CODE_TSC_LINE_SYNTAX_ERROR; + } + } + } + //No appendix or no timestamp given (len = 0) + if (len >= 1 && isdigit(value[len - 1]) && type != SML_TIME_STAMP_NOW) { + type = SML_TIME_STAMP_MICRO_SECONDS; + } + if (len != 0) { + *ts = (int64_t)strtoll(value, NULL, 10); } else { - char* endptr = NULL; - point->ts = strtoll(point->tsToken.z, &endptr, 10); - char* last = point->tsToken.z + point->tsToken.n - 1; - if (*last == 's') { - point->ts *= (int64_t)1e9; - } else if (*last == 'a') { - point->ts *= (int64_t)1e6; - } else if (*last == 'u') { - point->ts *= (int64_t)1e3; - } else if (*last == 'b') { - point->ts *= 1; + type = SML_TIME_STAMP_NOW; + } + switch (type) { + case SML_TIME_STAMP_NOW: { + *ts = taosGetTimestampNs(); + break; + } + case SML_TIME_STAMP_SECONDS: { + *ts = (int64_t)(*ts * 1e9); + break; + } + case SML_TIME_STAMP_MILLI_SECONDS: { + *ts = convertTimePrecision(*ts, TSDB_TIME_PRECISION_MILLI, TSDB_TIME_PRECISION_NANO); + break; + } + case SML_TIME_STAMP_MICRO_SECONDS: { + *ts = convertTimePrecision(*ts, TSDB_TIME_PRECISION_MICRO, TSDB_TIME_PRECISION_NANO); + break; + } + case SML_TIME_STAMP_NANO_SECONDS: { + *ts = *ts * 1; + break; + } + default: { + return TSDB_CODE_TSC_LINE_SYNTAX_ERROR; } } - return 0; + return TSDB_CODE_SUCCESS; } -int32_t tscParseLine(SStrToken line, SLPPoint* point) { - int32_t pos = 0; +static int32_t convertSmlTimeStamp(TAOS_SML_KV *pVal, char *value, + uint16_t len) { + int32_t ret; + SMLTimeStampType type; + int64_t tsVal; - int32_t start = 0; - int32_t err = scanToCommaOrSpace(line, start, &pos, LP_MEASUREMENT); - if (err != 0) { - tscError("a"); - return err; + if (!isTimeStamp(value, len, &type)) { + return TSDB_CODE_TSC_LINE_SYNTAX_ERROR; } - SStrToken measurement = {.z = line.z+start, .n = pos-start}; - setPointMeasurement(point, measurement); - point->tags = taosArrayInit(64, sizeof(SLPItem)); - start = pos; - while (line.z[start] == ',') { - SLPItem item; + ret = getTimeStampValue(value, len, type, &tsVal); + if (ret) { + return ret; + } + tscDebug("Timestamp after conversion:%"PRId64, tsVal); - start++; - err = scanToEqual(line, start, &pos); - if (err != 0) { - tscError("b"); - goto error; + pVal->type = TSDB_DATA_TYPE_TIMESTAMP; + pVal->length = (int16_t)tDataTypes[pVal->type].bytes; + pVal->value = calloc(pVal->length, 1); + memcpy(pVal->value, &tsVal, pVal->length); + return TSDB_CODE_SUCCESS; +} + +static int32_t parseSmlTimeStamp(TAOS_SML_KV **pTS, const char **index) { + const char *start, *cur; + int32_t ret = TSDB_CODE_SUCCESS; + int len = 0; + char key[] = "_ts"; + char *value = NULL; + + start = cur = *index; + *pTS = calloc(1, sizeof(TAOS_SML_KV)); + + while(*cur != '\0') { + cur++; + len++; + } + + if (len > 0) { + value = calloc(len + 1, 1); + memcpy(value, start, len); + } + + ret = convertSmlTimeStamp(*pTS, value, len); + if (ret) { + free(value); + free(*pTS); + return ret; + } + free(value); + + (*pTS)->key = calloc(sizeof(key), 1); + memcpy((*pTS)->key, key, sizeof(key)); + return ret; +} + +static int32_t parseSmlKey(TAOS_SML_KV *pKV, const char **index) { + const char *cur = *index; + char key[TSDB_COL_NAME_LEN]; + uint16_t len = 0; + + //key field cannot start with digit + if (isdigit(*cur)) { + tscError("Tag key cannnot start with digit\n"); + return TSDB_CODE_TSC_LINE_SYNTAX_ERROR; + } + while (*cur != '\0') { + if (len > TSDB_COL_NAME_LEN) { + tscDebug("Key field cannot exceeds 65 characters"); + return TSDB_CODE_TSC_LINE_SYNTAX_ERROR; + } + //unescaped '=' identifies a tag key + if (*cur == '=' && *(cur - 1) != '\\') { + break; + } + //Escape special character + if (*cur == '\\') { + escapeSpecialCharacter(2, &cur); } + key[len] = *cur; + cur++; + len++; + } + key[len] = '\0'; + + pKV->key = calloc(len + 1, 1); + memcpy(pKV->key, key, len + 1); + //tscDebug("Key:%s|len:%d", pKV->key, len); + *index = cur + 1; + return TSDB_CODE_SUCCESS; +} - SStrToken tagKey = {.z = line.z + start, .n = pos-start}; - setItemKey(&item, tagKey, LP_TAG_KEY); - start = pos + 1; - err = scanToCommaOrSpace(line, start, &pos, LP_TAG_VALUE); - if (err != 0) { - tscError("c"); - goto error; +static bool parseSmlValue(TAOS_SML_KV *pKV, const char **index, + bool *is_last_kv) { + const char *start, *cur; + char *value = NULL; + uint16_t len = 0; + start = cur = *index; + + while (1) { + // unescaped ',' or ' ' or '\0' identifies a value + if ((*cur == ',' || *cur == ' ' || *cur == '\0') && *(cur - 1) != '\\') { + //unescaped ' ' or '\0' indicates end of value + *is_last_kv = (*cur == ' ' || *cur == '\0') ? true : false; + break; } + //Escape special character + if (*cur == '\\') { + escapeSpecialCharacter(2, &cur); + } + cur++; + len++; + } - SStrToken tagValue = {.z = line.z + start, .n = pos-start}; - setItemValue(&item, tagValue, LP_TAG_VALUE); + value = calloc(len + 1, 1); + memcpy(value, start, len); + value[len] = '\0'; + if (!convertSmlValueType(pKV, value, len)) { + //free previous alocated key field + free(pKV->key); + free(value); + return TSDB_CODE_TSC_LINE_SYNTAX_ERROR; + } + free(value); - parseItemValue(&item, LP_ITEM_TAG); - taosArrayPush(point->tags, &item); + *index = (*cur == '\0') ? cur : cur + 1; + return TSDB_CODE_SUCCESS; +} - start = pos; +static int32_t parseSmlMeasurement(TAOS_SML_DATA_POINT *pSml, const char **index, + uint8_t *has_tags) { + const char *cur = *index; + uint16_t len = 0; + + pSml->stableName = calloc(TSDB_TABLE_NAME_LEN, 1); + if (isdigit(*cur)) { + tscError("Measurement field cannnot start with digit"); + free(pSml->stableName); + pSml->stableName = NULL; + return TSDB_CODE_TSC_LINE_SYNTAX_ERROR; } - taosArraySort(point->tags, compareLPItemKey); + while (*cur != '\0') { + if (len > TSDB_TABLE_NAME_LEN) { + tscError("Measurement field cannot exceeds 193 characters"); + free(pSml->stableName); + pSml->stableName = NULL; + return TSDB_CODE_TSC_LINE_SYNTAX_ERROR; + } + //first unescaped comma or space identifies measurement + //if space detected first, meaning no tag in the input + if (*cur == ',' && *(cur - 1) != '\\') { + *has_tags = 1; + break; + } + if (*cur == ' ' && *(cur - 1) != '\\') { + break; + } + //Comma, Space, Backslash needs to be escaped if any + if (*cur == '\\') { + escapeSpecialCharacter(1, &cur); + } + pSml->stableName[len] = *cur; + cur++; + len++; + } + pSml->stableName[len] = '\0'; + *index = cur + 1; + tscDebug("Stable name in measurement:%s|len:%d", pSml->stableName, len); - point->fields = taosArrayInit(64, sizeof(SLPItem)); + return TSDB_CODE_SUCCESS; +} - start++; - do { - SLPItem item; +static int32_t parseSmlKvPairs(TAOS_SML_KV **pKVs, int *num_kvs, + const char **index, bool isField, TAOS_SML_DATA_POINT* smlData) { + const char *cur = *index; + int32_t ret = TSDB_CODE_SUCCESS; + TAOS_SML_KV *pkv; + bool is_last_kv = false; + + int32_t capacity = 0; + if (isField) { + capacity = 64; + *pKVs = calloc(capacity, sizeof(TAOS_SML_KV)); + // leave space for timestamp; + pkv = *pKVs; + pkv++; + } else { + capacity = 8; + *pKVs = calloc(capacity, sizeof(TAOS_SML_KV)); + pkv = *pKVs; + } - err = scanToEqual(line, start, &pos); - if (err != 0) { + while (*cur != '\0') { + ret = parseSmlKey(pkv, &cur); + if (ret) { + tscError("Unable to parse key field"); goto error; } - SStrToken fieldKey = {.z = line.z + start, .n = pos- start}; - setItemKey(&item, fieldKey, LP_FIELD_KEY); - - start = pos + 1; - err = scanToCommaOrSpace(line, start, &pos, LP_FIELD_VALUE); - if (err != 0) { + ret = parseSmlValue(pkv, &cur, &is_last_kv); + if (ret) { + tscError("Unable to parse value field"); goto error; } - SStrToken fieldValue = {.z = line.z + start, .n = pos - start}; - setItemValue(&item, fieldValue, LP_TAG_VALUE); - - parseItemValue(&item, LP_ITEM_FIELD); - taosArrayPush(point->fields, &item); - - start = pos + 1; - } while (line.z[pos] == ','); + if (!isField && + (strcasecmp(pkv->key, "ID") == 0) && pkv->type == TSDB_DATA_TYPE_BINARY) { + smlData->childTableName = malloc( pkv->length + 1); + memcpy(smlData->childTableName, pkv->value, pkv->length); + smlData->childTableName[pkv->length] = '\0'; + free(pkv->key); + free(pkv->value); + } else { + *num_kvs += 1; + } + if (is_last_kv) { + //tscDebug("last key-value field detected"); + goto done; + } - taosArraySort(point->fields, compareLPItemKey); + //reallocate addtional memory for more kvs + TAOS_SML_KV *more_kvs = NULL; - SStrToken tsToken = {.z = line.z+start, .n = line.n-start}; - setPointTimeStamp(point, tsToken); - parsePointTime(point); + if (isField) { + if ((*num_kvs + 2) > capacity) { + capacity *= 3; capacity /= 2; + more_kvs = realloc(*pKVs, capacity * sizeof(TAOS_SML_KV)); + } else { + more_kvs = *pKVs; + } + } else { + if ((*num_kvs + 1) > capacity) { + capacity *= 3; capacity /= 2; + more_kvs = realloc(*pKVs, capacity * sizeof(TAOS_SML_KV)); + } else { + more_kvs = *pKVs; + } + } + if (!more_kvs) { + goto error; + } + *pKVs = more_kvs; + //move pKV points to next TAOS_SML_KV block + if (isField) { + pkv = *pKVs + *num_kvs + 1; + } else { + pkv = *pKVs + *num_kvs; + } + } goto done; - error: - // free array - return err; - done: - return 0; +error: + return ret; +done: + *index = cur; + return ret; +} + +static void moveTimeStampToFirstKv(TAOS_SML_DATA_POINT** smlData, TAOS_SML_KV *ts) { + TAOS_SML_KV* tsField = (*smlData)->fields; + tsField->length = ts->length; + tsField->type = ts->type; + tsField->value = malloc(ts->length); + tsField->key = malloc(strlen(ts->key) + 1); + memcpy(tsField->key, ts->key, strlen(ts->key) + 1); + memcpy(tsField->value, ts->value, ts->length); + (*smlData)->fieldNum = (*smlData)->fieldNum + 1; + + free(ts->key); + free(ts->value); + free(ts); } +int32_t tscParseLine(const char* sql, TAOS_SML_DATA_POINT* smlData) { + const char* index = sql; + int32_t ret = TSDB_CODE_SUCCESS; + uint8_t has_tags = 0; + TAOS_SML_KV *timestamp = NULL; -int32_t tscParseLines(char* lines[], int numLines, SArray* points, SArray* failedLines) { - for (int32_t i = 0; i < numLines; ++i) { - SStrToken tkLine = {.z = lines[i], .n = (uint32_t)strlen(lines[i])}; - SLPPoint point; - tscParseLine(tkLine, &point); - taosArrayPush(points, &point); + ret = parseSmlMeasurement(smlData, &index, &has_tags); + if (ret) { + tscError("Unable to parse measurement"); + return ret; } - return 0; -} + tscDebug("Parse measurement finished, has_tags:%d", has_tags); + + //Parse Tags + if (has_tags) { + ret = parseSmlKvPairs(&smlData->tags, &smlData->tagNum, &index, false, smlData); + if (ret) { + tscError("Unable to parse tag"); + return ret; + } + } + tscDebug("Parse tags finished, num of tags:%d", smlData->tagNum); -void destroyLPPoint(void* p) { - SLPPoint* lpPoint = p; - for (int i=0; ifields); ++i) { - SLPItem* item = taosArrayGet(lpPoint->fields, i); - free(item->value); + //Parse fields + ret = parseSmlKvPairs(&smlData->fields, &smlData->fieldNum, &index, true, smlData); + if (ret) { + tscError("Unable to parse field"); + return ret; } - taosArrayDestroy(lpPoint->fields); + tscDebug("Parse fields finished, num of fields:%d", smlData->fieldNum); - for (int i=0; itags); ++i) { - SLPItem* item = taosArrayGet(lpPoint->tags, i); - free(item->value); + //Parse timestamp + ret = parseSmlTimeStamp(×tamp, &index); + if (ret) { + tscError("Unable to parse timestamp"); + return ret; } - taosArrayDestroy(lpPoint->tags); + moveTimeStampToFirstKv(&smlData, timestamp); + tscDebug("Parse timestamp finished"); + + return TSDB_CODE_SUCCESS; } +//========================================================================= + void destroySmlDataPoint(TAOS_SML_DATA_POINT* point) { for (int i=0; itagNum; ++i) { free((point->tags+i)->key); @@ -1139,76 +1761,67 @@ void destroySmlDataPoint(TAOS_SML_DATA_POINT* point) { free(point->childTableName); } -int taos_insert_lines(TAOS* taos, char* lines[], int numLines) { - SArray* lpPoints = taosArrayInit(numLines, sizeof(SLPPoint)); - tscParseLines(lines, numLines, lpPoints, NULL); - - size_t numPoints = taosArrayGetSize(lpPoints); - TAOS_SML_DATA_POINT* points = calloc(numPoints, sizeof(TAOS_SML_DATA_POINT)); - for (int i = 0; i < numPoints; ++i) { - SLPPoint* lpPoint = taosArrayGet(lpPoints, i); - TAOS_SML_DATA_POINT* point = points+i; - point->stableName = calloc(1, strlen(lpPoint->sTableName)+1); - strncpy(point->stableName, lpPoint->sTableName, strlen(lpPoint->sTableName)); - point->stableName[strlen(lpPoint->sTableName)] = '\0'; - - size_t lpTagSize = taosArrayGetSize(lpPoint->tags); - point->tags = calloc(lpTagSize, sizeof(TAOS_SML_KV)); - point->tagNum = (int)lpTagSize; - for (int j=0; jtags, j); - TAOS_SML_KV* tagKv = point->tags + j; +int32_t tscParseLines(char* lines[], int numLines, SArray* points, SArray* failedLines) { + for (int32_t i = 0; i < numLines; ++i) { + TAOS_SML_DATA_POINT point = {0}; + int32_t code = tscParseLine(lines[i], &point); + if (code != TSDB_CODE_SUCCESS) { + tscError("data point line parse failed. line %d : %s", i, lines[i]); + destroySmlDataPoint(&point); + return TSDB_CODE_TSC_LINE_SYNTAX_ERROR; + } else { + tscDebug("data point line parse success. line %d", i); + } - size_t kenLen = strlen(lpTag->key); - tagKv->key = calloc(1, kenLen+1); - strncpy(tagKv->key, lpTag->key, kenLen); - tagKv->key[kenLen] = '\0'; + taosArrayPush(points, &point); + } + return 0; +} - tagKv->type = lpTag->type; - tagKv->length = lpTag->length; - tagKv->value = malloc(tagKv->length); - memcpy(tagKv->value, lpTag->value, tagKv->length); - } +int taos_insert_lines(TAOS* taos, char* lines[], int numLines) { + int32_t code = 0; - size_t lpFieldsSize = taosArrayGetSize(lpPoint->fields); - point->fields = calloc(lpFieldsSize + 1, sizeof(TAOS_SML_KV)); - point->fieldNum = (int)(lpFieldsSize + 1); + if (numLines <= 0 || numLines > 65536) { + tscError("taos_insert_lines numLines should be between 1 and 65536. numLines: %d", numLines); + code = TSDB_CODE_TSC_APP_ERROR; + return code; + } - TAOS_SML_KV* tsField = point->fields + 0; - char tsKey[256]; - snprintf(tsKey, 256, "_%s_ts", point->stableName); - size_t tsKeyLen = strlen(tsKey); - tsField->key = calloc(1, tsKeyLen+1); - strncpy(tsField->key, tsKey, tsKeyLen); - tsField->key[tsKeyLen] = '\0'; - tsField->type = TSDB_DATA_TYPE_TIMESTAMP; - tsField->length = tDataTypes[TSDB_DATA_TYPE_TIMESTAMP].bytes; - tsField->value = malloc(tsField->length); - memcpy(tsField->value, &(lpPoint->ts), tsField->length); + for (int i = 0; i < numLines; ++i) { + if (lines[i] == NULL) { + tscError("taos_insert_lines line %d is NULL", i); + code = TSDB_CODE_TSC_APP_ERROR; + return code; + } + } - for (int j=0; jfields, j); - TAOS_SML_KV* fieldKv = point->fields + j + 1; + SArray* lpPoints = taosArrayInit(numLines, sizeof(TAOS_SML_DATA_POINT)); + if (lpPoints == NULL) { + tscError("taos_insert_lines failed to allocate memory"); + return TSDB_CODE_TSC_OUT_OF_MEMORY; + } - size_t kenLen = strlen(lpField->key); - fieldKv->key = calloc(1, kenLen+1); - strncpy(fieldKv->key, lpField->key, kenLen); - fieldKv->key[kenLen] = '\0'; + tscDebug("taos_insert_lines begin inserting %d lines, first line: %s", numLines, lines[0]); + code = tscParseLines(lines, numLines, lpPoints, NULL); + size_t numPoints = taosArrayGetSize(lpPoints); - fieldKv->type = lpField->type; - fieldKv->length = lpField->length; - fieldKv->value = malloc(fieldKv->length); - memcpy(fieldKv->value, lpField->value, fieldKv->length); - } + if (code != 0) { + goto cleanup; } - taos_sml_insert(taos, points, (int)numPoints); + TAOS_SML_DATA_POINT* points = TARRAY_GET_START(lpPoints); + code = taos_sml_insert(taos, points, (int)numPoints); + if (code != 0) { + tscError("taos_sml_insert error: %s", tstrerror((code))); + } +cleanup: + tscDebug("taos_insert_lines finish inserting %d lines. code: %d", numLines, code); for (int i=0; iprevTS = INT64_MIN; } + tsSetBlockInfo(pBlk, (*t1)->pTableMeta, pBlk->numOfRows); + taosHashPut(pCmd->insertParam.pTableBlockHashList, (void *)&pStmt->mtb.currentUid, sizeof(pStmt->mtb.currentUid), (void*)t1, POINTER_BYTES); tscDebug("0x%"PRIx64" table:%s is already prepared, uid:%" PRIu64, pSql->self, name, pStmt->mtb.currentUid); diff --git a/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBJNIConnector.java b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBJNIConnector.java index 7f400fc1eeae2efc3d0ab800083969404c50a469..051eca7e10ad18daea6a7b1ad55f148b786e0798 100755 --- a/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBJNIConnector.java +++ b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBJNIConnector.java @@ -348,4 +348,13 @@ public class TSDBJNIConnector { } private native int closeStmt(long stmt, long con); + + public void insertLines(String[] lines) throws SQLException { + int code = insertLinesImp(lines, this.taos); + if (code != TSDBConstants.JNI_SUCCESS) { + throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNKNOWN, "failed to insertLines"); + } + } + + private native int insertLinesImp(String[] lines, long conn); } diff --git a/src/connector/jdbc/src/test/java/com/taosdata/jdbc/TSDBJNIConnectorTest.java b/src/connector/jdbc/src/test/java/com/taosdata/jdbc/TSDBJNIConnectorTest.java index 2cbb43d1960317a2d96f99537825e9cfb5d5c07c..88ff5d3a811e17aaabbeb0a451fbff010307ab6d 100644 --- a/src/connector/jdbc/src/test/java/com/taosdata/jdbc/TSDBJNIConnectorTest.java +++ b/src/connector/jdbc/src/test/java/com/taosdata/jdbc/TSDBJNIConnectorTest.java @@ -114,6 +114,10 @@ public class TSDBJNIConnectorTest { throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_JNI_RESULT_SET_NULL); } // close statement + connector.executeQuery("use d"); + String[] lines = new String[] {"st,t1=3i64,t2=4f64,t3=\"t3\" c1=3i64,c3=L\"passit\",c2=false,c4=4f64 1626006833639000000ns", + "st,t1=4i64,t3=\"t4\",t2=5f64,t4=5f64 c1=3i64,c3=L\"passitagin\",c2=true,c4=5f64,c5=5f64 1626006833640000000ns"}; + connector.insertLines(lines); // close connection connector.closeConnection(); diff --git a/src/connector/python/taos/cinterface.py b/src/connector/python/taos/cinterface.py index cc7c279458c779b924564cdb17c88989014e3193..6d8ceb7a293ef71c7e8944772e6b8a6a0ed8e7a9 100644 --- a/src/connector/python/taos/cinterface.py +++ b/src/connector/python/taos/cinterface.py @@ -403,6 +403,20 @@ class CTaosInterface(object): """ return CTaosInterface.libtaos.taos_affected_rows(result) + @staticmethod + def insertLines(connection, lines): + ''' + insert through lines protocol + @lines: list of str + @rtype: tsdb error codes + ''' + numLines = len(lines) + c_lines_type = ctypes.c_char_p*numLines + c_lines = c_lines_type() + for i in range(numLines): + c_lines[i] = ctypes.c_char_p(lines[i].encode('utf-8')) + return CTaosInterface.libtaos.taos_insert_lines(connection, c_lines, ctypes.c_int(numLines)) + @staticmethod def subscribe(connection, restart, topic, sql, interval): """Create a subscription diff --git a/src/connector/python/taos/connection.py b/src/connector/python/taos/connection.py index f6c395342c9c39a24bda6022f0ed36cb7bfe045b..88d06cd7186018788aeb25c982fc205441193cb8 100644 --- a/src/connector/python/taos/connection.py +++ b/src/connector/python/taos/connection.py @@ -66,6 +66,14 @@ class TDengineConnection(object): self._conn, restart, topic, sql, interval) return TDengineSubscription(sub) + def insertLines(self, lines): + """ + insert lines through line protocol + """ + if self._conn is None: + return None + return CTaosInterface.insertLines(self._conn, lines) + def cursor(self): """Return a new Cursor object using the connection. """ diff --git a/src/inc/taoserror.h b/src/inc/taoserror.h index eefa0ed92aa610899f69bc965e03d7c0ea5ee917..f57e553e3f43053552e30a5191abbd7374032f9d 100644 --- a/src/inc/taoserror.h +++ b/src/inc/taoserror.h @@ -101,6 +101,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_TSC_INVALID_TABLE_NAME TAOS_DEF_ERROR_CODE(0, 0x0218) //"Table does not exist") #define TSDB_CODE_TSC_EXCEED_SQL_LIMIT TAOS_DEF_ERROR_CODE(0, 0x0219) //"SQL statement too long check maxSQLLength config") #define TSDB_CODE_TSC_FILE_EMPTY TAOS_DEF_ERROR_CODE(0, 0x021A) //"File is empty") +#define TSDB_CODE_TSC_LINE_SYNTAX_ERROR TAOS_DEF_ERROR_CODE(0, 0x021B) //"Syntax error in Line") // mnode #define TSDB_CODE_MND_MSG_NOT_PROCESSED TAOS_DEF_ERROR_CODE(0, 0x0300) //"Message not processed") diff --git a/tests/examples/c/CMakeLists.txt b/tests/examples/c/CMakeLists.txt index 906ca2dd41c3fb222c816f49dd23af41cc81cffe..e94de3cbca574de71c8bcefc4b52173922c05a98 100644 --- a/tests/examples/c/CMakeLists.txt +++ b/tests/examples/c/CMakeLists.txt @@ -5,6 +5,8 @@ IF (TD_LINUX) AUX_SOURCE_DIRECTORY(. SRC) ADD_EXECUTABLE(demo apitest.c) TARGET_LINK_LIBRARIES(demo taos_static trpc tutil pthread ) + ADD_EXECUTABLE(sml schemaless.c) + TARGET_LINK_LIBRARIES(sml taos_static trpc tutil pthread ) ADD_EXECUTABLE(subscribe subscribe.c) TARGET_LINK_LIBRARIES(subscribe taos_static trpc tutil pthread ) ADD_EXECUTABLE(epoll epoll.c) diff --git a/tests/examples/c/apitest.c b/tests/examples/c/apitest.c index a377bbc7b47e1a58d4b3294b88386a9c4fb74e47..ac522d6151b005e51dee0dd6d352f1eeb5cb62a9 100644 --- a/tests/examples/c/apitest.c +++ b/tests/examples/c/apitest.c @@ -964,21 +964,31 @@ int32_t verify_schema_less(TAOS* taos) { usleep(100000); char* lines[] = { - "st,t1=3i,t2=4,t3=\"t3\" c1=3i,c3=L\"passit\",c2=false,c4=4 1626006833639000000", - "st,t1=4i,t3=\"t4\",t2=5,t4=5 c1=3i,c3=L\"passitagin\",c2=true,c4=5,c5=5 1626006833640000000", - "ste,t2=5,t3=L\"ste\" c1=true,c2=4,c3=\"iam\" 1626056811823316532", - "st,t1=4i,t2=5,t3=\"t4\" c1=3i,c3=L\"passitagain\",c2=true,c4=5 1626006833642000000", - "ste,t2=5,t3=L\"ste2\" c3=\"iamszhou\",c4=false 1626056811843316532", - "ste,t2=5,t3=L\"ste2\" c3=\"iamszhou\",c4=false,c5=32b,c6=64s,c7=32w,c8=88.88f 1626056812843316532", - "st,t1=4i,t3=\"t4\",t2=5,t4=5 c1=3i,c3=L\"passitagin\",c2=true,c4=5,c5=5,c6=7u 1626006933640000000", - "stf,t1=4i,t3=\"t4\",t2=5,t4=5 c1=3i,c3=L\"passitagin\",c2=true,c4=5,c5=5,c6=7u 1626006933640000000", - "stf,t1=4i,t3=\"t4\",t2=5,t4=5 c1=3i,c3=L\"passitagin_stf\",c2=false,c5=5,c6=7u 1626006933641a" + "st,t1=3i64,t2=4f64,t3=\"t3\" c1=3i64,c3=L\"passit\",c2=false,c4=4f64 1626006833639000000ns", + "st,t1=4i64,t3=\"t4\",t2=5f64,t4=5f64 c1=3i64,c3=L\"passitagin\",c2=true,c4=5f64,c5=5f64 1626006833640000000ns", + "ste,t2=5f64,t3=L\"ste\" c1=true,c2=4i64,c3=\"iam\" 1626056811823316532ns", + "st,t1=4i64,t2=5f64,t3=\"t4\" c1=3i64,c3=L\"passitagain\",c2=true,c4=5f64 1626006833642000000ns", + "ste,t2=5f64,t3=L\"ste2\" c3=\"iamszhou\",c4=false 1626056811843316532ns", + "ste,t2=5f64,t3=L\"ste2\" c3=\"iamszhou\",c4=false,c5=32i8,c6=64i16,c7=32i32,c8=88.88f32 1626056812843316532ns", + "st,t1=4i64,t3=\"t4\",t2=5f64,t4=5f64 c1=3i64,c3=L\"passitagin\",c2=true,c4=5f64,c5=5f64,c6=7u64 1626006933640000000ns", + "stf,t1=4i64,t3=\"t4\",t2=5f64,t4=5f64 c1=3i64,c3=L\"passitagin\",c2=true,c4=5f64,c5=5f64,c6=7u64 1626006933640000000ns", + "stf,t1=4i64,t3=\"t4\",t2=5f64,t4=5f64 c1=3i64,c3=L\"passitagin_stf\",c2=false,c5=5f64,c6=7u64 1626006933641000000ns" }; -// int code = taos_insert_lines(taos, lines , sizeof(lines)/sizeof(char*)); - int code = taos_insert_lines(taos, &lines[0], 1); - code = taos_insert_lines(taos, &lines[1], 1); + int code = 0; + code = taos_insert_lines(taos, lines , sizeof(lines)/sizeof(char*)); + char* lines2[] = { + "stg,t1=3i64,t2=4f64,t3=\"t3\" c1=3i64,c3=L\"passit\",c2=false,c4=4f64 1626006833639000000ns", + "stg,t1=4i64,t3=\"t4\",t2=5f64,t4=5f64 c1=3i64,c3=L\"passitagin\",c2=true,c4=5f64,c5=5f64 1626006833640000000ns" + }; + code = taos_insert_lines(taos, &lines2[0], 1); + code = taos_insert_lines(taos, &lines2[1], 1); + char* lines3[] = { + "sth,t1=4i64,t2=5f64,t4=5f64,ID=\"childtable\" c1=3i64,c3=L\"passitagin_stf\",c2=false,c5=5f64,c6=7u64 1626006933641ms", + "sth,t1=4i64,t2=5f64,t4=5f64 c1=3i64,c3=L\"passitagin_stf\",c2=false,c5=5f64,c6=7u64 1626006933654ms" + }; + code = taos_insert_lines(taos, lines3, 2); return code; } @@ -1000,10 +1010,8 @@ int main(int argc, char *argv[]) { printf("client info: %s\n", info); printf("************ verify shemaless *************\n"); - int code = verify_schema_less(taos); - if (code == 0) { - return code; - } + verify_schema_less(taos); + printf("************ verify query *************\n"); verify_query(taos); diff --git a/tests/examples/c/schemaless.c b/tests/examples/c/schemaless.c new file mode 100644 index 0000000000000000000000000000000000000000..d6450914dfc7d406febca1792a35bd677c7b185a --- /dev/null +++ b/tests/examples/c/schemaless.c @@ -0,0 +1,161 @@ +#include "taos.h" +#include "taoserror.h" +#include "os.h" + +#include +#include +#include +#include +#include + +int numSuperTables = 8; +int numChildTables = 1024; +int numRowsPerChildTable = 128; + +void shuffle(char**lines, size_t n) +{ + if (n > 1) + { + size_t i; + for (i = 0; i < n - 1; i++) + { + size_t j = i + rand() / (RAND_MAX / (n - i) + 1); + char* t = lines[j]; + lines[j] = lines[i]; + lines[i] = t; + } + } +} + +static int64_t getTimeInUs() { + struct timeval systemTime; + gettimeofday(&systemTime, NULL); + return (int64_t)systemTime.tv_sec * 1000000L + (int64_t)systemTime.tv_usec; +} + +int main(int argc, char* argv[]) { + TAOS_RES *result; + const char* host = "127.0.0.1"; + const char* user = "root"; + const char* passwd = "taosdata"; + + taos_options(TSDB_OPTION_TIMEZONE, "GMT-8"); + TAOS* taos = taos_connect(host, user, passwd, "", 0); + if (taos == NULL) { + printf("\033[31mfailed to connect to db, reason:%s\033[0m\n", taos_errstr(taos)); + exit(1); + } + + char* info = taos_get_server_info(taos); + printf("server info: %s\n", info); + info = taos_get_client_info(taos); + printf("client info: %s\n", info); + result = taos_query(taos, "drop database if exists db;"); + taos_free_result(result); + usleep(100000); + result = taos_query(taos, "create database db precision 'ms';"); + taos_free_result(result); + usleep(100000); + + (void)taos_select_db(taos, "db"); + + time_t ct = time(0); + int64_t ts = ct * 1000; + char* lineFormat = "sta%d,t0=true,t1=127i8,t2=32767i16,t3=%di32,t4=9223372036854775807i64,t9=11.12345f32,t10=22.123456789f64,t11=\"binaryTagValue\",t12=L\"ncharTagValue\" c0=true,c1=127i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c5=255u8,c6=32770u16,c7=2147483699u32,c8=9223372036854775899u64,c9=11.12345f32,c10=22.123456789f64,c11=\"binaryValue\",c12=L\"ncharValue\" %lldms"; + + char** lines = calloc(numSuperTables * numChildTables * numRowsPerChildTable, sizeof(char*)); + int l = 0; + for (int i = 0; i < numSuperTables; ++i) { + for (int j = 0; j < numChildTables; ++j) { + for (int k = 0; k < numRowsPerChildTable; ++k) { + char* line = calloc(512, 1); + snprintf(line, 512, lineFormat, i, j, ts + 10 * l); + lines[l] = line; + ++l; + } + } + } + shuffle(lines, numSuperTables * numChildTables * numRowsPerChildTable); + + printf("%s\n", "begin taos_insert_lines"); + int64_t begin = getTimeInUs(); + int32_t code = taos_insert_lines(taos, lines, numSuperTables * numChildTables * numRowsPerChildTable); + int64_t end = getTimeInUs(); + printf("code: %d, %s. time used: %"PRId64"\n", code, tstrerror(code), end-begin); + + char* lines_000_0[] = { + "sta1,id=sta1_1,t0=true,t1=127i8,t2=32767i16,t3=2147483647i32,t4=9223372036854775807i64,t5=255u8,t6=32770u16,t7=2147483699u32,t8=9223372036854775899u64,t9=11.12345f32,t10=22.123456789f64,t11=\"binaryTagValue\",t12=L\"ncharTagValue\" c0=true,c1=127i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c5=255u8,c6=32770u16,c7=2147483699u32,c8=9223372036854775899u64,c9=11.12345f32,c10=22.123456789f64,c11=\"binaryValue\",c12=L\"ncharValue\" 1626006833639000us" + }; + + code = taos_insert_lines(taos, lines_000_0 , sizeof(lines_000_0)/sizeof(char*)); + if (0 == code) { + printf("taos_insert_lines() lines_000_0 should return error\n"); + return -1; + } + + char* lines_000_1[] = { + "sta2,id=\"sta2_1\",t0=true,t1=127i8,t2=32767i16,t3=2147483647i32,t4=9223372036854775807i64,t5=255u8,t6=32770u16,t7=2147483699u32,t8=9223372036854775899u64,t9=11.12345f32,t10=22.123456789f64,t11=\"binaryTagValue\",t12=L\"ncharTagValue\" c0=true,c1=127i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c5=255u8,c6=32770u16,c7=2147483699u32,c8=9223372036854775899u64,c9=11.12345f32,c10=22.123456789f64,c11=\"binaryValue\",c12=L\"ncharValue\" 1626006833639001" + }; + + code = taos_insert_lines(taos, lines_000_1 , sizeof(lines_000_1)/sizeof(char*)); + if (0 == code) { + printf("taos_insert_lines() lines_000_1 should return error\n"); + return -1; + } + + char* lines_000_2[] = { + "sta3,id=\"sta3_1\",t0=true,t1=127i8,t2=32767i16,t3=2147483647i32,t4=9223372036854775807i64,t9=11.12345f32,t10=22.123456789f64,t11=\"binaryTagValue\",t12=L\"ncharTagValue\" c0=true,c1=127i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c5=255u8,c6=32770u16,c7=2147483699u32,c8=9223372036854775899u64,c9=11.12345f32,c10=22.123456789f64,c11=\"binaryValue\",c12=L\"ncharValue\" 0" + }; + + code = taos_insert_lines(taos, lines_000_2 , sizeof(lines_000_2)/sizeof(char*)); + if (0 != code) { + printf("taos_insert_lines() lines_000_2 return code:%d (%s)\n", code, (char*)tstrerror(code)); + return -1; + } + + char* lines_001_0[] = { + "sta4,t0=true,t1=127i8,t2=32767i16,t3=2147483647i32,t4=9223372036854775807i64,t9=11.12345f32,t10=22.123456789f64,t11=\"binaryTagValue\",t12=L\"ncharTagValue\" c0=true,c1=127i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c9=11.12345f32,c10=22.123456789f64,c11=\"binaryValue\",c12=L\"ncharValue\" 1626006833639000us", + + }; + + code = taos_insert_lines(taos, lines_001_0 , sizeof(lines_001_0)/sizeof(char*)); + if (0 != code) { + printf("taos_insert_lines() lines_001_0 return code:%d (%s)\n", code, (char*)tstrerror(code)); + return -1; + } + + char* lines_001_1[] = { + "sta5,id=\"sta5_1\",t0=true,t1=127i8,t2=32767i16,t3=2147483647i32,t4=9223372036854775807i64,t9=11.12345f32,t10=22.123456789f64,t11=\"binaryTagValue\",t12=L\"ncharTagValue\" c0=true,c1=127i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c9=11.12345f32,c10=22.123456789f64,c11=\"binaryValue\",c12=L\"ncharValue\" 1626006833639001" + }; + + code = taos_insert_lines(taos, lines_001_1 , sizeof(lines_001_1)/sizeof(char*)); + if (0 != code) { + printf("taos_insert_lines() lines_001_1 return code:%d (%s)\n", code, (char*)tstrerror(code)); + return -1; + } + + char* lines_001_2[] = { + "sta6,id=\"sta6_1\",t0=true,t1=127i8,t2=32767i16,t3=2147483647i32,t4=9223372036854775807i64,t9=11.12345f32,t10=22.123456789f64,t11=\"binaryTagValue\",t12=L\"ncharTagValue\" c0=true,c1=127i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c9=11.12345f32,c10=22.123456789f64,c11=\"binaryValue\",c12=L\"ncharValue\" 0" + }; + + code = taos_insert_lines(taos, lines_001_2 , sizeof(lines_001_2)/sizeof(char*)); + if (0 != code) { + printf("taos_insert_lines() lines_001_2 return code:%d (%s)\n", code, (char*)tstrerror(code)); + return -1; + } + + char* lines_002[] = { + "stb,id=\"stb_1\",t20=t,t21=T,t22=true,t23=True,t24=TRUE,t25=f,t26=F,t27=false,t28=False,t29=FALSE,t10=33.12345,t11=\"binaryTagValue\",t12=L\"ncharTagValue\" c20=t,c21=T,c22=true,c23=True,c24=TRUE,c25=f,c26=F,c27=false,c28=False,c29=FALSE,c10=33.12345,c11=\"binaryValue\",c12=L\"ncharValue\" 1626006833639000000ns", + "stc,id=\"stc_1\",t20=t,t21=T,t22=true,t23=True,t24=TRUE,t25=f,t26=F,t27=false,t28=False,t29=FALSE,t10=33.12345,t11=\"binaryTagValue\",t12=L\"ncharTagValue\" c20=t,c21=T,c22=true,c23=True,c24=TRUE,c25=f,c26=F,c27=false,c28=False,c29=FALSE,c10=33.12345,c11=\"binaryValue\",c12=L\"ncharValue\" 1626006833639019us", + "stc,id=\"stc_1\",t20=t,t21=T,t22=true,t23=True,t24=TRUE,t25=f,t26=F,t27=false,t28=False,t29=FALSE,t10=33.12345,t11=\"binaryTagValue\",t12=L\"ncharTagValue\" c20=t,c21=T,c22=true,c23=True,c24=TRUE,c25=f,c26=F,c27=false,c28=False,c29=FALSE,c10=33.12345,c11=\"binaryValue\",c12=L\"ncharValue\" 1626006833640ms", + "stc,id=\"stc_1\",t20=t,t21=T,t22=true,t23=True,t24=TRUE,t25=f,t26=F,t27=false,t28=False,t29=FALSE,t10=33.12345,t11=\"binaryTagValue\",t12=L\"ncharTagValue\" c20=t,c21=T,c22=true,c23=True,c24=TRUE,c25=f,c26=F,c27=false,c28=False,c29=FALSE,c10=33.12345,c11=\"binaryValue\",c12=L\"ncharValue\" 1626006834s" + }; + + code = taos_insert_lines(taos, lines_002 , sizeof(lines_002)/sizeof(char*)); + if (0 != code) { + printf("taos_insert_lines() lines_002 return code:%d (%s)\n", code, (char*)tstrerror(code)); + return -1; + } + + return 0; +} diff --git a/tests/pytest/fulltest.sh b/tests/pytest/fulltest.sh index c9a41a929b883e69f188cd3053ce4f4a88e7f446..555abf186319bafe06f95ef77e892252a7998f18 100755 --- a/tests/pytest/fulltest.sh +++ b/tests/pytest/fulltest.sh @@ -27,6 +27,7 @@ python3 ./test.py -f insert/bug3654.py python3 ./test.py -f insert/insertDynamicColBeforeVal.py python3 ./test.py -f insert/in_function.py python3 ./test.py -f insert/modify_column.py +python3 ./test.py -f insert/line_insert.py #table python3 ./test.py -f table/alter_wal0.py diff --git a/tests/pytest/insert/line_insert.py b/tests/pytest/insert/line_insert.py new file mode 100644 index 0000000000000000000000000000000000000000..ff3a32b0f79028ce4f612c12b41171a2bd45a765 --- /dev/null +++ b/tests/pytest/insert/line_insert.py @@ -0,0 +1,91 @@ +################################################################### +# Copyright (c) 2021 by TAOS Technologies, Inc. +# All rights reserved. +# +# This file is proprietary and confidential to TAOS Technologies. +# No part of this file may be reproduced, stored, transmitted, +# disclosed or used in any form or by any means other than as +# expressly provided by the written permission from Jianhui Tao +# +################################################################### + +# -*- coding: utf-8 -*- + +import sys +from util.log import * +from util.cases import * +from util.sql import * + + +class TDTestCase: + def init(self, conn, logSql): + tdLog.debug("start to execute %s" % __file__) + tdSql.init(conn.cursor(), logSql) + self._conn = conn + + def run(self): + print("running {}".format(__file__)) + tdSql.execute("drop database if exists test") + tdSql.execute("create database if not exists test precision 'us'") + tdSql.execute('use test') + + tdSql.execute('create stable ste(ts timestamp, f int) tags(t1 bigint)') + + lines = [ "st,t1=3i64,t2=4f64,t3=\"t3\" c1=3i64,c3=L\"passit\",c2=false,c4=4f64 1626006833639000000ns", + "st,t1=4i64,t3=\"t4\",t2=5f64,t4=5f64 c1=3i64,c3=L\"passitagin\",c2=true,c4=5f64,c5=5f64 1626006833640000000ns", + "ste,t2=5f64,t3=L\"ste\" c1=true,c2=4i64,c3=\"iam\" 1626056811823316532ns", + "stf,t1=4i64,t3=\"t4\",t2=5f64,t4=5f64 c1=3i64,c3=L\"passitagin\",c2=true,c4=5f64,c5=5f64,c6=7u64 1626006933640000000ns", + "st,t1=4i64,t2=5f64,t3=\"t4\" c1=3i64,c3=L\"passitagain\",c2=true,c4=5f64 1626006833642000000ns", + "ste,t2=5f64,t3=L\"ste2\" c3=\"iamszhou\",c4=false 1626056811843316532ns", + "ste,t2=5f64,t3=L\"ste2\" c3=\"iamszhou\",c4=false,c5=32i8,c6=64i16,c7=32i32,c8=88.88f32 1626056812843316532ns", + "st,t1=4i64,t3=\"t4\",t2=5f64,t4=5f64 c1=3i64,c3=L\"passitagin\",c2=true,c4=5f64,c5=5f64,c6=7u64 1626006933640000000ns", + "stf,t1=4i64,t3=\"t4\",t2=5f64,t4=5f64 c1=3i64,c3=L\"passitagin_stf\",c2=false,c5=5f64,c6=7u64 1626006933641000000ns" + ] + + code = self._conn.insertLines(lines) + print("insertLines result {}".format(code)) + + lines2 = [ "stg,t1=3i64,t2=4f64,t3=\"t3\" c1=3i64,c3=L\"passit\",c2=false,c4=4f64 1626006833639000000ns", + "stg,t1=4i64,t3=\"t4\",t2=5f64,t4=5f64 c1=3i64,c3=L\"passitagin\",c2=true,c4=5f64,c5=5f64 1626006833640000000ns" + ] + + code = self._conn.insertLines([ lines2[0] ]) + print("insertLines result {}".format(code)) + + self._conn.insertLines([ lines2[1] ]) + print("insertLines result {}".format(code)) + + tdSql.query("select * from st") + tdSql.checkRows(4) + + tdSql.query("select * from ste") + tdSql.checkRows(3) + + tdSql.query("select * from stf") + tdSql.checkRows(2) + + tdSql.query("select * from stg") + tdSql.checkRows(2) + + tdSql.query("show tables") + tdSql.checkRows(8) + + tdSql.query("describe stf") + tdSql.checkData(2, 2, 14) + + self._conn.insertLines([ + "sth,t1=4i64,t2=5f64,t4=5f64,ID=\"childtable\" c1=3i64,c3=L\"passitagin_stf\",c2=false,c5=5f64,c6=7u64 1626006933641ms", + "sth,t1=4i64,t2=5f64,t4=5f64 c1=3i64,c3=L\"passitagin_stf\",c2=false,c5=5f64,c6=7u64 1626006933654ms" + ]) + tdSql.query('select tbname, * from sth') + tdSql.checkRows(2) + + tdSql.query('select tbname, * from childtable') + tdSql.checkRows(1) + def stop(self): + tdSql.close() + tdLog.success("%s successfully executed" % __file__) + + +tdCases.addWindows(__file__, TDTestCase()) +tdCases.addLinux(__file__, TDTestCase()) diff --git a/tests/script/fullGeneralSuite.sim b/tests/script/fullGeneralSuite.sim index c820dd3bf56fb5268092dbdec2d37d7cfa0ca0c5..5b5a911558b5eb1d27f34fb10590a3b9ff52658c 100644 --- a/tests/script/fullGeneralSuite.sim +++ b/tests/script/fullGeneralSuite.sim @@ -105,6 +105,7 @@ run general/parser/import_commit2.sim run general/parser/import_commit3.sim run general/parser/insert_tb.sim run general/parser/first_last.sim +run general/parser/line_insert.sim #unsupport run general/parser/import_file.sim run general/parser/lastrow.sim run general/parser/nchar.sim diff --git a/tests/script/general/parser/line_insert.sim b/tests/script/general/parser/line_insert.sim index f3067a3bbec8c7d566570704d6b84caaaa1f8e67..85f2714ad3100766557797d2158d9d3e181b0f0b 100644 --- a/tests/script/general/parser/line_insert.sim +++ b/tests/script/general/parser/line_insert.sim @@ -16,11 +16,10 @@ sql create database $db precision 'us' sql use $db sql create stable $mte (ts timestamp, f int) TAGS(t1 bigint) -line_insert st,t1=3i,t2=4,t3="t3" c1=3i,c3=L"passit",c2=false,c4=4 1626006833639000000 -line_insert st,t1=4i,t3="t41",t2=5 c1=3i,c3=L"passiT",c2=true,c4=5 1626006833640000000 -line_insert stf,t1=4i,t2=5,t3="t4" c1=3i,c3=L"passitagain",c2=true,c4=5 1626006833642000000 -line_insert ste,t2=5,t3=L"ste" c1=true,c2=4,c3="iam" 1626056811823316532 - +line_insert st,t1=3i64,t2=4f64,t3="t3" c1=3i64,c3=L"passit",c2=false,c4=4f64 1626006833639000000ns +line_insert st,t1=4i64,t3="t4",t2=5f64,t4=5f64 c1=3i64,c3=L"passitagin",c2=true,c4=5f64,c5=5f64 1626006833640000000ns +line_insert ste,t2=5f64,t3=L"ste" c1=true,c2=4i64,c3="iam" 1626056811823316532ns +line_insert stf,t1=4i64,t3="t4",t2=5f64,t4=5f64 c1=3i64,c3=L"passitagin",c2=true,c4=5f64,c5=5f64,c6=7u64 1626006933640000000ns sql select * from st if $rows != 2 then return -1 @@ -30,7 +29,7 @@ if $data00 != @21-07-11 20:33:53.639000@ then return -1 endi -if $data03 != @passit@ then +if $data02 != @passit@ then return -1 endi