From dbac6f2081636d78628191b4bf615c9289682b68 Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Sun, 11 Jul 2021 23:52:19 +0800 Subject: [PATCH] before getChildTableName and insertBatch --- src/client/src/tscParseLineProtocol.c | 752 +++++++++++++++----------- src/inc/taos.h | 2 + tests/examples/c/apitest.c | 15 + 3 files changed, 444 insertions(+), 325 deletions(-) diff --git a/src/client/src/tscParseLineProtocol.c b/src/client/src/tscParseLineProtocol.c index 2738fe6f7a..3a88d2e906 100644 --- a/src/client/src/tscParseLineProtocol.c +++ b/src/client/src/tscParseLineProtocol.c @@ -15,263 +15,6 @@ #include "tskiplist.h" #include "tscUtil.h" -typedef enum { - LP_ITEM_TAG, - LP_ITEM_FIELD -} LPItemKind; - -typedef struct { - SStrToken key; - SStrToken value; - - char name[TSDB_COL_NAME_LEN]; - int8_t type; - int16_t bytes; - - char* payload; -}SLPItem; - -typedef struct { - SStrToken measToken; - SStrToken tsToken; - - char sTableName[TSDB_TABLE_NAME_LEN]; - SArray* tags; - SArray* fields; - int64_t ts; - -} SLPPoint; - -typedef enum { - LP_MEASUREMENT, - LP_TAG_KEY, - LP_TAG_VALUE, - LP_FIELD_KEY, - LP_FIELD_VALUE -} LPPart; - -int32_t scanToCommaOrSpace(SStrToken s, int32_t start, int32_t* index, LPPart part) { - for (int32_t i = start; i < s.n; ++i) { - if (s.z[i] == ',' || s.z[i] == ' ') { - *index = i; - return 0; - } - } - return -1; -} - -int32_t scanToEqual(SStrToken s, int32_t start, int32_t* index) { - for (int32_t i = start; i < s.n; ++i) { - if (s.z[i] == '=') { - *index = i; - return 0; - } - } - return -1; -} - -int32_t setPointMeasurement(SLPPoint* point, SStrToken token) { - point->measToken = token; - if (point->measToken.n < TSDB_TABLE_NAME_LEN) { - strncpy(point->sTableName, point->measToken.z, point->measToken.n); - point->sTableName[point->measToken.n] = '\0'; - } - return 0; -} - -int32_t setItemKey(SLPItem* item, SStrToken key, LPPart part) { - item->key = key; - if (item->key.n < TSDB_COL_NAME_LEN) { - strncpy(item->name, item->key.z, item->key.n); - item->name[item->key.n] = '\0'; - } - return 0; -} - -int32_t setItemValue(SLPItem* item, SStrToken value, LPPart part) { - item->value = value; - return 0; -} - -int32_t parseItemValue(SLPItem* item, LPItemKind kind) { - char* sv = item->value.z; - char* last = item->value.z + item->value.n - 1; - - if (isdigit(sv[0]) || sv[0] == '-') { - if (*last == 'i') { - item->type = TSDB_DATA_TYPE_BIGINT; - item->bytes = (int16_t)tDataTypes[item->type].bytes; - item->payload = malloc(item->bytes); - char* endptr = NULL; - *(item->payload) = strtoll(sv, &endptr, 10); - } else { - item->type = TSDB_DATA_TYPE_DOUBLE; - item->bytes = (int16_t)tDataTypes[item->type].bytes; - item->payload = malloc(item->bytes); - char* endptr = NULL; - *(item->payload) = strtold(sv, &endptr); - } - } else if ((sv[0] == 'L' && sv[1] =='"') || sv[0] == '"' ) { - if (sv[0] == 'L') { - item->type = TSDB_DATA_TYPE_NCHAR; - uint32_t bytes = item->value.n - 3; -// uint32_t len = bytes; -// char* ucs = malloc(len); -// int32_t ncharBytes = 0; -// taosMbsToUcs4(sv+2, len, ucs, len, &ncharBytes); -// item->bytes = ncharBytes; -// item->payload = malloc(ncharBytes); -// memcpy(item->payload, ucs, ncharBytes); -// free(ucs); - item->bytes = bytes; - item->payload = malloc(bytes); - memcpy(item->payload, sv+1, bytes); - } else if (sv[0]=='"'){ - item->type = TSDB_DATA_TYPE_BINARY; - uint32_t bytes = item->value.n - 2; - item->bytes = bytes; - item->payload = malloc(bytes); - memcpy(item->payload, sv+1, bytes); - } - } else if (sv[0] == 't' || sv[0] == 'f' || sv[0]=='T' || sv[0] == 'F') { - item->type = TSDB_DATA_TYPE_BOOL; - item->bytes = tDataTypes[item->type].bytes; - item->payload = malloc(tDataTypes[item->type].bytes); - *(item->payload) = tolower(sv[0])=='t' ? true : false; - } - return 0; -} - -int32_t compareLPItemKey(const void* p1, const void* p2) { - const SLPItem* t1 = p1; - const SLPItem* t2 = p2; - uint32_t min = (t1->key.n < t2->key.n) ? t1->key.n : t2->key.n; - int res = strncmp(t1->key.z, t2->key.z, min); - if (res != 0) { - return res; - } else { - return (int)(t1->key.n) - (int)(t2->key.n); - } -} - -int32_t setPointTimeStamp(SLPPoint* point, SStrToken tsToken) { - point->tsToken = tsToken; - return 0; -} - -int32_t parsePointTime(SLPPoint* point) { - if (point->tsToken.n <= 0) { - point->ts = taosGetTimestampNs(); - } else { - char* endptr = NULL; - point->ts = strtoll(point->tsToken.z, &endptr, 10); - } - return 0; -} - -int32_t tscParseLine(SStrToken line, SLPPoint* point) { - int32_t pos = 0; - - int32_t start = 0; - int32_t err = scanToCommaOrSpace(line, start, &pos, LP_MEASUREMENT); - if (err != 0) { - tscError("a"); - return err; - } - - SStrToken measurement = {.z = line.z+start, .n = pos-start}; - setPointMeasurement(point, measurement); - point->tags = taosArrayInit(64, sizeof(SLPItem)); - start = pos + 1; - while (line.z[start] == ',') { - SLPItem item; - - err = scanToEqual(line, start, &pos); - if (err != 0) { - tscError("b"); - goto error; - } - - SStrToken tagKey = {.z = line.z + start, .n = pos-start}; - setItemKey(&item, tagKey, LP_TAG_KEY); - - start = pos + 1; - err = scanToCommaOrSpace(line, start, &pos, LP_TAG_VALUE); - if (err != 0) { - tscError("c"); - goto error; - } - - SStrToken tagValue = {.z = line.z + start, .n = pos-start}; - setItemValue(&item, tagValue, LP_TAG_VALUE); - - parseItemValue(&item, LP_ITEM_TAG); - taosArrayPush(point->tags, &item); - - start = pos + 1; - } - - taosArraySort(point->tags, compareLPItemKey); - - point->fields = taosArrayInit(64, sizeof(SLPItem)); - do { - SLPItem item; - err = scanToEqual(line, start, &pos); - if (err != 0) { - goto error; - } - SStrToken fieldKey = {.z = line.z + start, .n = pos- start}; - setItemKey(&item, fieldKey, LP_FIELD_KEY); - - start = pos + 1; - err = scanToCommaOrSpace(line, start, &pos, LP_FIELD_VALUE); - if (err != 0) { - goto error; - } - SStrToken fieldValue = {.z = line.z + start, .n = pos - start}; - setItemValue(&item, fieldValue, LP_TAG_VALUE); - - parseItemValue(&item, LP_ITEM_FIELD); - taosArrayPush(point->fields, &item); - - start = pos + 1; - } while (line.z[pos] == ','); - - taosArraySort(point->fields, compareLPItemKey); - - SStrToken tsToken = {.z = line.z+start, .n = line.n-start}; - setPointTimeStamp(point, tsToken); - parsePointTime(point); - - goto done; - -error: - // free array - return err; -done: - return 0; -} - - -int32_t tscParseLines(char* lines[], int numLines, SArray* points, SArray* failedLines) { - for (int32_t i = 0; i < numLines; ++i) { - SStrToken tkLine = {.z = lines[i], .n = strlen(lines[i])+1}; - SLPPoint point; - tscParseLine(tkLine, &point); - taosArrayPush(points, &point); - } - return 0; -} - -TAOS_RES* taos_insert_by_lines(TAOS* taos, char* lines[], int numLines) { - SArray* points = taosArrayInit(numLines, sizeof(SLPPoint)); - tscParseLines(lines, numLines, points, NULL); - - - return NULL; -} -//================================================================================================= - typedef struct { char sTableName[TSDB_TABLE_NAME_LEN]; SHashObj* tagHash; @@ -305,6 +48,8 @@ typedef struct { SSmlSTableSchema* schema; } TAOS_SML_DATA_POINT; +//================================================================================================= + int compareSmlColKv(const void* p1, const void* p2) { TAOS_SML_KV* kv1 = (TAOS_SML_KV*)p1; TAOS_SML_KV* kv2 = (TAOS_SML_KV*)p2; @@ -318,32 +63,6 @@ int compareSmlColKv(const void* p1, const void* p2) { } } -int32_t getChildTableName(TAOS_SML_DATA_POINT* point, char* tableName, int* tableNameLen) { - qsort(point->tags, point->tagNum, sizeof(TAOS_SML_KV), compareSmlColKv); - - SStringBuilder sb; memset(&sb, 0, sizeof(sb)); - taosStringBuilderAppendString(&sb, point->stableName); - for (int j = 0; j < point->tagNum; ++j) { - TAOS_SML_KV* tagKv = point->tags + j; - taosStringBuilderAppendChar(&sb, ','); - taosStringBuilderAppendString(&sb, tagKv->key); - taosStringBuilderAppendChar(&sb, '='); - taosStringBuilderAppend(&sb, tagKv->value, tagKv->length); - } - size_t len = 0; - char* keyJoined = taosStringBuilderGetResult(&sb, &len); - MD5_CTX context; - MD5Init(&context); - MD5Update(&context, (uint8_t *)keyJoined, (uint32_t)len); - MD5Final(&context); - *tableNameLen = snprintf(tableName, *tableNameLen, - "%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x", context.digest[0], - context.digest[1], context.digest[2], context.digest[3], context.digest[4], context.digest[5], context.digest[6], - context.digest[7], context.digest[8], context.digest[9], context.digest[10], context.digest[11], - context.digest[12], context.digest[13], context.digest[14], context.digest[15]); - return 0; -} - int32_t loadTableMeta(TAOS* taos, char* tableName, SSmlSTableSchema* schema) { int32_t code = 0; @@ -393,15 +112,14 @@ int32_t loadTableMeta(TAOS* taos, char* tableName, SSmlSTableSchema* schema) { tscFreeSqlObj(pSql); - uint32_t size = tscGetTableMetaMaxSize(); STableMeta* tableMeta = calloc(1, size); taosHashGetClone(tscTableMetaInfo, fullTableName, strlen(fullTableName), NULL, tableMeta, -1); - tstrncpy(schema->sTableName, tableName, strlen(tableName)); + tstrncpy(schema->sTableName, tableName, strlen(tableName)+1); for (int i=0; itableInfo.numOfColumns; ++i) { SSchema field; - tstrncpy(field.name, tableMeta->schema[i].name, strlen(tableMeta->schema[i].name)); + tstrncpy(field.name, tableMeta->schema[i].name, strlen(tableMeta->schema[i].name)+1); field.type = tableMeta->schema[i].type; field.bytes = tableMeta->schema[i].bytes; SSchema* pField = taosArrayPush(schema->fields, &field); @@ -411,13 +129,13 @@ int32_t loadTableMeta(TAOS* taos, char* tableName, SSmlSTableSchema* schema) { for (int i=0; itableInfo.numOfTags; ++i) { int j = i + tableMeta->tableInfo.numOfColumns; SSchema field; - tstrncpy(field.name, tableMeta->schema[j].name, strlen(tableMeta->schema[j].name)); + tstrncpy(field.name, tableMeta->schema[j].name, strlen(tableMeta->schema[j].name)+1); field.type = tableMeta->schema[j].type; field.bytes = tableMeta->schema[j].bytes; SSchema* pField = taosArrayPush(schema->tags, &field); taosHashPut(schema->tagHash, field.name, strlen(field.name), &pField, POINTER_BYTES); } - + free(tableMeta); tableMeta = NULL; return code; } @@ -586,6 +304,7 @@ int32_t applySchemaAction(TAOS* taos, SSchemaAction* action) { capacity-n, &outBytes); TAOS_RES* res = taos_query(taos, result); //TODO async doAsyncQuery code = taos_errno(res); + break; } case SCHEMA_ACTION_CHANGE_TAG_SIZE: { int n = sprintf(result, "alter stable %s modify tag ", action->alterSTable.sTableName); @@ -631,15 +350,46 @@ int32_t applySchemaAction(TAOS* taos, SSchemaAction* action) { return code; } +int32_t getChildTableName(TAOS_SML_DATA_POINT* point, char* tableName, int* tableNameLen) { + qsort(point->tags, point->tagNum, sizeof(TAOS_SML_KV), compareSmlColKv); + + SStringBuilder sb; memset(&sb, 0, sizeof(sb)); + taosStringBuilderAppendString(&sb, point->stableName); + for (int j = 0; j < point->tagNum; ++j) { + taosStringBuilderAppendChar(&sb, ','); + TAOS_SML_KV* tagKv = point->tags + j; + taosStringBuilderAppendString(&sb, tagKv->key); + taosStringBuilderAppendChar(&sb, '='); + taosStringBuilderAppend(&sb, tagKv->value, tagKv->length); + } + size_t len = 0; + char* keyJoined = taosStringBuilderGetResult(&sb, &len); + MD5_CTX context; + MD5Init(&context); + MD5Update(&context, (uint8_t *)keyJoined, (uint32_t)len); + MD5Final(&context); + *tableNameLen = snprintf(tableName, *tableNameLen, + "tbl%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x", context.digest[0], + context.digest[1], context.digest[2], context.digest[3], context.digest[4], context.digest[5], context.digest[6], + context.digest[7], context.digest[8], context.digest[9], context.digest[10], context.digest[11], + context.digest[12], context.digest[13], context.digest[14], context.digest[15]); + taosStringBuilderDestroy(&sb); + return 0; +} + int32_t getPreparedSQL(const char* sTableName, SArray* tagsSchema, SArray* colsSchema, char* result, int16_t freeBytes) { size_t numTags = taosArrayGetSize(tagsSchema); size_t numCols = taosArrayGetSize(colsSchema); - sprintf(result, "insert into ? using %s(", sTableName); - for (int i = 0; i < numTags; ++i) { - SSchema* tagSchema = taosArrayGet(tagsSchema, i); - snprintf(result+strlen(result), freeBytes-strlen(result), "%s,", tagSchema->name); - } - snprintf(result + strlen(result)-1, freeBytes-strlen(result)+1, ") tags ("); + sprintf(result, "insert into ? using %s", sTableName); + +// snprintf(result+strlen(result), freeBytes-strlen(result), "("); +// for (int i = 0; i < numTags; ++i) { +// SSchema* tagSchema = taosArrayGet(tagsSchema, i); +// snprintf(result+strlen(result), freeBytes-strlen(result), "%s,", tagSchema->name); +// } +// snprintf(result + strlen(result)-1, freeBytes-strlen(result)+1, ")"); + + snprintf(result + strlen(result), freeBytes-strlen(result), " tags ("); for (int i = 0; i < numTags; ++i) { snprintf(result+strlen(result), freeBytes-strlen(result), "?,"); @@ -661,17 +411,38 @@ int32_t getPreparedSQL(const char* sTableName, SArray* tagsSchema, SArray* colsS int32_t insertBatch(TAOS* taos, char* sql, char* cTableName, SArray* tagsBind, SArray* rowsBind) { TAOS_STMT* stmt = taos_stmt_init(taos); - taos_stmt_prepare(stmt, sql, strlen(sql)); + int32_t code; + code = taos_stmt_prepare(stmt, sql, strlen(sql)); + if (code != 0) { + printf("%s", taos_stmt_errstr(stmt)); + return code; + } - taos_stmt_set_tbname_tags(stmt, cTableName, TARRAY_GET_START(tagsBind)); + code = taos_stmt_set_tbname_tags(stmt, cTableName, TARRAY_GET_START(tagsBind)); + if (code != 0) { + printf("%s", taos_stmt_errstr(stmt)); + return code; + } size_t rows = taosArrayGetSize(rowsBind); for (int32_t i = 0; i < rows; ++i) { - TAOS_BIND* colBind = taosArrayGetP(rowsBind, i); - taos_stmt_bind_param(stmt, colBind); - taos_stmt_add_batch(stmt); + SArray* colBind = taosArrayGetP(rowsBind, i); + code = taos_stmt_bind_param(stmt, TARRAY_GET_START(colBind)); + if (code != 0) { + printf("%s", taos_stmt_errstr(stmt)); + return code; + } + code = taos_stmt_add_batch(stmt); + if (code != 0) { + printf("%s", taos_stmt_errstr(stmt)); + return code; + } } - taos_stmt_execute(stmt); + code = taos_stmt_execute(stmt); + if (code != 0) { + printf("%s", taos_stmt_errstr(stmt)); + return code; + } TAOS_RES* res = taos_stmt_use_result(stmt); return taos_errno(res); } @@ -682,7 +453,7 @@ int32_t insertPoints(TAOS* taos, TAOS_SML_DATA_POINT* points, int32_t numPoints) TAOS_SML_DATA_POINT * point = points + i; if (!point->childTableName) { char childTableName[TSDB_TABLE_NAME_LEN]; - int32_t tableNameLen; + int32_t tableNameLen = TSDB_TABLE_NAME_LEN; getChildTableName(point, childTableName, &tableNameLen); point->childTableName = calloc(1, tableNameLen+1); strncpy(point->childTableName, childTableName, tableNameLen); @@ -696,50 +467,62 @@ int32_t insertPoints(TAOS* taos, TAOS_SML_DATA_POINT* points, int32_t numPoints) cTablePoints = taosArrayInit(64, sizeof(point)); taosHashPut(cname2points, point->childTableName, strlen(point->childTableName), &cTablePoints, POINTER_BYTES); } - taosArrayPush(cTablePoints, point); + taosArrayPush(cTablePoints, &point); } + int isNullColBind = TSDB_TRUE; SArray** pCTablePoints = taosHashIterate(cname2points, NULL); while (pCTablePoints) { SArray* cTablePoints = *pCTablePoints; - TAOS_SML_DATA_POINT * point = taosArrayGet(cTablePoints, 0); + + TAOS_SML_DATA_POINT * point = taosArrayGetP(cTablePoints, 0); int32_t numTags = taosArrayGetSize(point->schema->tags); int32_t numCols = taosArrayGetSize(point->schema->fields); char* stableName = point->stableName; char* ctableName = point->childTableName; - char sql[TSDB_MAX_BINARY_LEN]; - getPreparedSQL(stableName, point->schema->tags, point->schema->fields, sql, TSDB_MAX_BINARY_LEN); + + SArray* tagBinds = taosArrayInit(numTags, sizeof(TAOS_BIND)); + taosArraySetSize(tagBinds, numTags); + for (int j = 0; j < numTags; ++j) { + TAOS_BIND* bind = taosArrayGet(tagBinds, j); + bind->is_null = &isNullColBind; + } + for (int j = 0; j < point->tagNum; ++j) { + TAOS_SML_KV* kv = point->tags + j; + int32_t idx = TARRAY_ELEM_IDX(point->schema->tags, kv->schema); + TAOS_BIND* bind = taosArrayGet(tagBinds, idx); + bind->buffer_type = kv->type; + bind->length = (uintptr_t*)&kv->length; + bind->buffer = kv->value; + bind->is_null = NULL; + } size_t rows = taosArrayGetSize(cTablePoints); SArray* rowsBind = taosArrayInit(rows, POINTER_BYTES); - SArray* tagBinds = taosArrayInit(numTags, sizeof(TAOS_BIND)); for (int i = 0; i < rows; ++i) { - point = taosArrayGet(cTablePoints, i); - - taosArraySetSize(tagBinds, numTags); - for (int j = 0; j < point->tagNum; ++j) { - TAOS_SML_KV* kv = point->tags + j; - int32_t idx = TARRAY_ELEM_IDX(point->schema->tags, kv->schema); - TAOS_BIND* bind = taosArrayGet(tagBinds, idx); - bind->buffer_type = kv->type; - bind->length = (uintptr_t*)&kv->length; - bind->buffer = kv->value; - } + point = taosArrayGetP(cTablePoints, i); SArray* colBinds = taosArrayInit(numCols, sizeof(TAOS_BIND)); taosArraySetSize(colBinds, numCols); - for (int j = 0; jfieldNum; ++j) { + for (int j = 0; j < numCols; ++j) { + TAOS_BIND* bind = taosArrayGet(colBinds, j); + bind->is_null = &isNullColBind; + } + for (int j = 0; j < point->fieldNum; ++j) { TAOS_SML_KV* kv = point->fields + j; int32_t idx = TARRAY_ELEM_IDX(point->schema->fields, kv->schema); TAOS_BIND* bind = taosArrayGet(colBinds, idx); bind->buffer_type = kv->type; bind->length = (uintptr_t*)&kv->length; bind->buffer = kv->value; + bind->is_null = NULL; } taosArrayPush(rowsBind, &colBinds); } + char sql[TSDB_MAX_BINARY_LEN]; + getPreparedSQL(stableName, point->schema->tags, point->schema->fields, sql, TSDB_MAX_BINARY_LEN); insertBatch(taos, sql, ctableName, tagBinds, rowsBind); pCTablePoints = taosHashIterate(cname2points, pCTablePoints); @@ -747,7 +530,6 @@ int32_t insertPoints(TAOS* taos, TAOS_SML_DATA_POINT* points, int32_t numPoints) return 0; } - int taos_sml_insert(TAOS* taos, TAOS_SML_DATA_POINT* points, int numPoint) { int32_t code = TSDB_CODE_SUCCESS; SArray* stableArray = taosArrayInit(32, sizeof(SSmlSTableSchema)); // SArray @@ -792,11 +574,13 @@ int taos_sml_insert(TAOS* taos, TAOS_SML_DATA_POINT* points, int numPoint) { for (int i = 0; i < numStable; ++i) { SSmlSTableSchema* pointSchema = taosArrayGet(stableArray, i); SSmlSTableSchema dbSchema = {0}; - dbSchema.fields = taosArrayInit(64, sizeof(SSchema)); dbSchema.tags = taosArrayInit(8, sizeof(SSchema)); + dbSchema.fields = taosArrayInit(64, sizeof(SSchema)); dbSchema.tagHash = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false); dbSchema.fieldHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false); + code = loadTableMeta(taos, pointSchema->sTableName, &dbSchema); + if (code == TSDB_CODE_MND_INVALID_TABLE_NAME) { SSchemaAction schemaAction = {0}; schemaAction.action = SCHEMA_ACTION_CREATE_STABLE; @@ -813,19 +597,23 @@ int taos_sml_insert(TAOS* taos, TAOS_SML_DATA_POINT* points, int numPoint) { SHashObj* dbFieldHash = dbSchema.fieldHash; for (int j = 0; j < pointTagSize; ++j) { - SSchema* pointTag = taosArrayGet(pointSchema->tags, j); + SSchema* pointTag = taosArrayGet(pointSchema->tags, j); SSchemaAction schemaAction = {0}; - bool actionNeeded = false; + bool actionNeeded = false; generateSchemaAction(pointTag, dbTagHash, true, pointSchema->sTableName, &schemaAction, &actionNeeded); if (actionNeeded) { taosArrayPush(schemaActions, &schemaAction); } } - for (int j = 0; j < pointFieldSize; ++j) { - SSchema* pointCol = taosArrayGet(pointSchema->tags, j); + SSchema* pointColTs = taosArrayGet(pointSchema->fields, 0); + SSchema* dbColTs = taosArrayGet(dbSchema.fields, 0); + memcpy(pointColTs->name, dbColTs->name, TSDB_COL_NAME_LEN); + + for (int j = 1; j < pointFieldSize; ++j) { + SSchema* pointCol = taosArrayGet(pointSchema->fields, j); SSchemaAction schemaAction = {0}; - bool actionNeeded = false; + bool actionNeeded = false; generateSchemaAction(pointCol, dbFieldHash, false, pointSchema->sTableName, &schemaAction, &actionNeeded); if (actionNeeded) { taosArrayPush(schemaActions, &schemaAction); @@ -849,3 +637,317 @@ int taos_sml_insert(TAOS* taos, TAOS_SML_DATA_POINT* points, int numPoint) { //todo: table/column length check //todo: type check //todo: taosmbs2ucs4 check + +//========================================================================= + +typedef enum { + LP_ITEM_TAG, + LP_ITEM_FIELD +} LPItemKind; + +typedef struct { + SStrToken keyToken; + SStrToken valueToken; + + char key[TSDB_COL_NAME_LEN]; + int8_t type; + int16_t length; + + char* value; +}SLPItem; + +typedef struct { + SStrToken measToken; + SStrToken tsToken; + + char sTableName[TSDB_TABLE_NAME_LEN]; + SArray* tags; + SArray* fields; + int64_t ts; + +} SLPPoint; + +typedef enum { + LP_MEASUREMENT, + LP_TAG_KEY, + LP_TAG_VALUE, + LP_FIELD_KEY, + LP_FIELD_VALUE +} LPPart; + +int32_t scanToCommaOrSpace(SStrToken s, int32_t start, int32_t* index, LPPart part) { + for (int32_t i = start; i < s.n; ++i) { + if (s.z[i] == ',' || s.z[i] == ' ') { + *index = i; + return 0; + } + } + return -1; +} + +int32_t scanToEqual(SStrToken s, int32_t start, int32_t* index) { + for (int32_t i = start; i < s.n; ++i) { + if (s.z[i] == '=') { + *index = i; + return 0; + } + } + return -1; +} + +int32_t setPointMeasurement(SLPPoint* point, SStrToken token) { + point->measToken = token; + if (point->measToken.n < TSDB_TABLE_NAME_LEN) { + strncpy(point->sTableName, point->measToken.z, point->measToken.n); + point->sTableName[point->measToken.n] = '\0'; + } + return 0; +} + +int32_t setItemKey(SLPItem* item, SStrToken key, LPPart part) { + item->keyToken = key; + if (item->keyToken.n < TSDB_COL_NAME_LEN) { + strncpy(item->key, item->keyToken.z, item->keyToken.n); + item->key[item->keyToken.n] = '\0'; + } + return 0; +} + +int32_t setItemValue(SLPItem* item, SStrToken value, LPPart part) { + item->valueToken = value; + return 0; +} + +int32_t parseItemValue(SLPItem* item, LPItemKind kind) { + char* sv = item->valueToken.z; + char* last = item->valueToken.z + item->valueToken.n - 1; + + if (isdigit(sv[0]) || sv[0] == '-') { + if (*last == 'i') { + item->type = TSDB_DATA_TYPE_BIGINT; + item->length = (int16_t)tDataTypes[item->type].bytes; + item->value = malloc(item->length); + char* endptr = NULL; + *(item->value) = strtoll(sv, &endptr, 10); + } else { + item->type = TSDB_DATA_TYPE_DOUBLE; + item->length = (int16_t)tDataTypes[item->type].bytes; + item->value = malloc(item->length); + char* endptr = NULL; + *(item->value) = strtold(sv, &endptr); + } + } else if ((sv[0] == 'L' && sv[1] =='"') || sv[0] == '"' ) { + if (sv[0] == 'L') { + item->type = TSDB_DATA_TYPE_NCHAR; + uint32_t bytes = item->valueToken.n - 3; + item->length = bytes; + item->value = malloc(bytes); + memcpy(item->value, sv+1, bytes); + } else if (sv[0]=='"'){ + item->type = TSDB_DATA_TYPE_BINARY; + uint32_t bytes = item->valueToken.n - 2; + item->length = bytes; + item->value = malloc(bytes); + memcpy(item->value, sv+1, bytes); + } + } else if (sv[0] == 't' || sv[0] == 'f' || sv[0]=='T' || sv[0] == 'F') { + item->type = TSDB_DATA_TYPE_BOOL; + item->length = tDataTypes[item->type].bytes; + item->value = malloc(tDataTypes[item->type].bytes); + *(item->value) = tolower(sv[0])=='t' ? TSDB_TRUE : TSDB_FALSE; + } + return 0; +} + +int32_t compareLPItemKey(const void* p1, const void* p2) { + const SLPItem* t1 = p1; + const SLPItem* t2 = p2; + uint32_t min = (t1->keyToken.n < t2->keyToken.n) ? t1->keyToken.n : t2->keyToken.n; + int res = strncmp(t1->keyToken.z, t2->keyToken.z, min); + if (res != 0) { + return res; + } else { + return (int)(t1->keyToken.n) - (int)(t2->keyToken.n); + } +} + +int32_t setPointTimeStamp(SLPPoint* point, SStrToken tsToken) { + point->tsToken = tsToken; + return 0; +} + +int32_t parsePointTime(SLPPoint* point) { + if (point->tsToken.n <= 0) { + point->ts = taosGetTimestampNs(); + } else { + char* endptr = NULL; + point->ts = strtoll(point->tsToken.z, &endptr, 10); + } + return 0; +} + +int32_t tscParseLine(SStrToken line, SLPPoint* point) { + int32_t pos = 0; + + int32_t start = 0; + int32_t err = scanToCommaOrSpace(line, start, &pos, LP_MEASUREMENT); + if (err != 0) { + tscError("a"); + return err; + } + + SStrToken measurement = {.z = line.z+start, .n = pos-start}; + setPointMeasurement(point, measurement); + point->tags = taosArrayInit(64, sizeof(SLPItem)); + start = pos; + while (line.z[start] == ',') { + SLPItem item; + + start++; + err = scanToEqual(line, start, &pos); + if (err != 0) { + tscError("b"); + goto error; + } + + SStrToken tagKey = {.z = line.z + start, .n = pos-start}; + setItemKey(&item, tagKey, LP_TAG_KEY); + + start = pos + 1; + err = scanToCommaOrSpace(line, start, &pos, LP_TAG_VALUE); + if (err != 0) { + tscError("c"); + goto error; + } + + SStrToken tagValue = {.z = line.z + start, .n = pos-start}; + setItemValue(&item, tagValue, LP_TAG_VALUE); + + parseItemValue(&item, LP_ITEM_TAG); + taosArrayPush(point->tags, &item); + + start = pos; + } + + taosArraySort(point->tags, compareLPItemKey); + + point->fields = taosArrayInit(64, sizeof(SLPItem)); + + start++; + do { + SLPItem item; + + err = scanToEqual(line, start, &pos); + if (err != 0) { + goto error; + } + SStrToken fieldKey = {.z = line.z + start, .n = pos- start}; + setItemKey(&item, fieldKey, LP_FIELD_KEY); + + start = pos + 1; + err = scanToCommaOrSpace(line, start, &pos, LP_FIELD_VALUE); + if (err != 0) { + goto error; + } + SStrToken fieldValue = {.z = line.z + start, .n = pos - start}; + setItemValue(&item, fieldValue, LP_TAG_VALUE); + + parseItemValue(&item, LP_ITEM_FIELD); + taosArrayPush(point->fields, &item); + + start = pos + 1; + } while (line.z[pos] == ','); + + taosArraySort(point->fields, compareLPItemKey); + + SStrToken tsToken = {.z = line.z+start, .n = line.n-start}; + setPointTimeStamp(point, tsToken); + parsePointTime(point); + + goto done; + + error: + // free array + return err; + done: + return 0; +} + + +int32_t tscParseLines(char* lines[], int numLines, SArray* points, SArray* failedLines) { + for (int32_t i = 0; i < numLines; ++i) { + SStrToken tkLine = {.z = lines[i], .n = strlen(lines[i])+1}; + SLPPoint point; + tscParseLine(tkLine, &point); + taosArrayPush(points, &point); + } + return 0; +} + +int taos_insert_by_lines(TAOS* taos, char* lines[], int numLines) { + SArray* lpPoints = taosArrayInit(numLines, sizeof(SLPPoint)); + tscParseLines(lines, numLines, lpPoints, NULL); + + size_t numPoints = taosArrayGetSize(lpPoints); + TAOS_SML_DATA_POINT* points = calloc(numPoints, sizeof(TAOS_SML_DATA_POINT)); + for (int i = 0; i < numPoints; ++i) { + SLPPoint* lpPoint = taosArrayGet(lpPoints, i); + TAOS_SML_DATA_POINT* point = points+i; + point->stableName = calloc(1, strlen(lpPoint->sTableName)+1); + strncpy(point->stableName, lpPoint->sTableName, strlen(lpPoint->sTableName)); + point->stableName[strlen(lpPoint->sTableName)] = '\0'; + + size_t lpTagSize = taosArrayGetSize(lpPoint->tags); + point->tags = calloc(lpTagSize, sizeof(TAOS_SML_KV)); + point->tagNum = lpTagSize; + for (int j=0; jtags, j); + TAOS_SML_KV* tagKv = point->tags + j; + + size_t kenLen = strlen(lpTag->key); + tagKv->key = calloc(1, kenLen+1); + strncpy(tagKv->key, lpTag->key, kenLen); + tagKv->key[kenLen] = '\0'; + + tagKv->type = lpTag->type; + tagKv->length = lpTag->length; + tagKv->value = malloc(tagKv->length); + memcpy(tagKv->value, lpTag->value, tagKv->length); + } + + size_t lpFieldsSize = taosArrayGetSize(lpPoint->fields); + point->fields = calloc(lpFieldsSize + 1, sizeof(TAOS_SML_KV)); + point->fieldNum = lpFieldsSize + 1; + + TAOS_SML_KV* tsField = point->fields + 0; + char tsKey[256]; + snprintf(tsKey, 256, "_%s_ts", point->stableName); + size_t tsKeyLen = strlen(tsKey); + tsField->key = calloc(1, tsKeyLen+1); + strncpy(tsField->key, tsKey, tsKeyLen); + tsField->key[tsKeyLen] = '\0'; + tsField->type = TSDB_DATA_TYPE_TIMESTAMP; + tsField->length = tDataTypes[TSDB_DATA_TYPE_TIMESTAMP].bytes; + tsField->value = malloc(tsField->length); + memcpy(tsField->value, &(lpPoint->ts), tsField->length); + + for (int j=0; jfields, j); + TAOS_SML_KV* fieldKv = point->fields + j + 1; + + size_t kenLen = strlen(lpField->key); + fieldKv->key = calloc(1, kenLen+1); + strncpy(fieldKv->key, lpField->key, kenLen); + fieldKv->key[kenLen] = '\0'; + + fieldKv->type = lpField->type; + fieldKv->length = lpField->length; + fieldKv->value = malloc(fieldKv->length); + memcpy(fieldKv->value, lpField->value, fieldKv->length); + } + } + + taos_sml_insert(taos, points, numPoints); + return 0; +} + diff --git a/src/inc/taos.h b/src/inc/taos.h index 9f72945ef0..ca18c4fb93 100644 --- a/src/inc/taos.h +++ b/src/inc/taos.h @@ -169,6 +169,8 @@ DLL_EXPORT void taos_close_stream(TAOS_STREAM *tstr); DLL_EXPORT int taos_load_table_info(TAOS *taos, const char* tableNameList); +DLL_EXPORT int taos_insert_by_lines(TAOS* taos, char* lines[], int numLines); + #ifdef __cplusplus } #endif diff --git a/tests/examples/c/apitest.c b/tests/examples/c/apitest.c index 0f24df0f47..dc77677774 100644 --- a/tests/examples/c/apitest.c +++ b/tests/examples/c/apitest.c @@ -949,6 +949,15 @@ void verify_stream(TAOS* taos) { taos_close_stream(strm); } +int32_t verify_schema_less(TAOS* taos) { + prepare_data(taos); + char* lines[] = { + "st,t1=3i,t2=4,t3=\"t3\" c1=3i,c3=L\"passit\",c2=false,c4=4 1626006833639162922" + }; + int code = taos_insert_by_lines(taos, lines , 1); + return code; +} + int main(int argc, char *argv[]) { const char* host = "127.0.0.1"; const char* user = "root"; @@ -967,6 +976,12 @@ int main(int argc, char *argv[]) { info = taos_get_client_info(taos); printf("client info: %s\n", info); + printf("************ verify query *************\n"); + int code = verify_schema_less(taos); + if (code == 0) { + return code; + } + printf("************ verify query *************\n"); verify_query(taos); -- GitLab