diff --git a/src/client/src/tscParseLineProtocol.c b/src/client/src/tscParseLineProtocol.c index e0aa2b432744f43fb1fd951c646975bab37eef75..dd24d2c458767291e4c5e92f761a24be3d50b807 100644 --- a/src/client/src/tscParseLineProtocol.c +++ b/src/client/src/tscParseLineProtocol.c @@ -570,6 +570,40 @@ static int32_t getSmlMd5ChildTableName(TAOS_SML_DATA_POINT* point, char* tableNa return 0; } + +static int32_t changeChildTableTagValue(TAOS* taos, const char* cTableName, const char* tagName, TAOS_BIND* bind) { + char sql[512]; + sprintf(sql, "alter table %s set tag %s=?", cTableName, tagName); + + int32_t code; + TAOS_STMT* stmt = taos_stmt_init(taos); + code = taos_stmt_prepare(stmt, sql, (unsigned long)strlen(sql)); + + if (code != 0) { + tscError("%s", taos_stmt_errstr(stmt)); + return code; + } + + code = taos_stmt_bind_param(stmt, bind); + if (code != 0) { + tscError("%s", taos_stmt_errstr(stmt)); + return code; + } + + code = taos_stmt_execute(stmt); + if (code != 0) { + tscError("%s", taos_stmt_errstr(stmt)); + return code; + } + + code = taos_stmt_close(stmt); + if (code != 0) { + tscError("%s", taos_stmt_errstr(stmt)); + return code; + } + return code; +} + static int32_t creatChildTableIfNotExists(TAOS* taos, const char* cTableName, const char* sTableName, SArray* tagsSchema, SArray* tagsBind) { size_t numTags = taosArrayGetSize(tagsSchema); char* sql = malloc(tsMaxSQLStringLen+1); @@ -742,97 +776,192 @@ static int32_t arrangePointsByChildTableName(TAOS_SML_DATA_POINT* points, int nu return 0; } -static int32_t insertPoints(TAOS* taos, TAOS_SML_DATA_POINT* points, int32_t numPoints, SArray* stableSchemas) { - int32_t code = TSDB_CODE_SUCCESS; +static int32_t applyChildTableTags(TAOS* taos, char* cTableName, char* sTableName, + SSmlSTableSchema* sTableSchema, SArray* cTablePoints) { + size_t numTags = taosArrayGetSize(sTableSchema->tags); + size_t rows = taosArrayGetSize(cTablePoints); - SHashObj* cname2points = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), - true, false); - arrangePointsByChildTableName(points, numPoints, cname2points, stableSchemas); + TAOS_SML_KV* tagKVs[TSDB_MAX_TAGS] = {0}; + for (int i= 0; i < rows; ++i) { + TAOS_SML_DATA_POINT * pDataPoint = taosArrayGetP(cTablePoints, i); + for (int j = 0; j < pDataPoint->tagNum; ++j) { + TAOS_SML_KV* kv = pDataPoint->tags + j; + tagKVs[kv->fieldSchemaIdx] = kv; + } + } + int32_t notNullTagsIndices[TSDB_MAX_TAGS] = {0}; + int32_t numNotNullTags = 0; + for (int32_t i = 0; i < numTags; ++i) { + if (tagKVs[i] != NULL) { + notNullTagsIndices[numNotNullTags] = i; + ++numNotNullTags; + } + } + + SArray* tagBinds = taosArrayInit(numTags, sizeof(TAOS_BIND)); + taosArraySetSize(tagBinds, numTags); int isNullColBind = TSDB_TRUE; - SArray** pCTablePoints = taosHashIterate(cname2points, NULL); - while (pCTablePoints) { - SArray* cTablePoints = *pCTablePoints; + for (int j = 0; j < numTags; ++j) { + TAOS_BIND* bind = taosArrayGet(tagBinds, j); + bind->is_null = &isNullColBind; + } + for (int j = 0; j < numTags; ++j) { + if (tagKVs[j] == NULL) continue; + TAOS_SML_KV* kv = tagKVs[j]; + TAOS_BIND* bind = taosArrayGet(tagBinds, kv->fieldSchemaIdx); + bind->buffer_type = kv->type; + bind->length = malloc(sizeof(uintptr_t*)); + *bind->length = kv->length; + bind->buffer = kv->value; + bind->is_null = NULL; + } + + // select tag1,tag2,... from stable where tbname in (ctable) + char sql[TSDB_MAX_BINARY_LEN]; + int capacity = TSDB_MAX_BINARY_LEN; + snprintf(sql, capacity, "select tbname, "); + for (int i = 0; i < numNotNullTags ; ++i) { + snprintf(sql + strlen(sql), capacity-strlen(sql), "%s,", tagKVs[notNullTagsIndices[i]]->key); + } + + snprintf(sql + strlen(sql) - 1, capacity - strlen(sql) + 1, + " from %s where tbname in (\'%s\')", sTableName, cTableName); + TAOS_RES* result = taos_query(taos, sql); + int32_t code = taos_errno(result); + if (code != 0) { + tscError("%s", taos_errstr(result)); + taos_free_result(result); + goto cleanup; + } + + // check tag value and set tag values if different + TAOS_ROW row = taos_fetch_row(result); + if (row != NULL) { + int numFields = taos_field_count(result); + TAOS_FIELD* fields = taos_fetch_fields(result); + int* lengths = taos_fetch_lengths(result); + for (int i = 1; i < numFields; ++i) { + uint8_t type = fields[i].type; + int32_t length = lengths[i]; + char* val = row[i]; + + TAOS_SML_KV* tagKV = tagKVs[notNullTagsIndices[i-1]]; + if (tagKV->type != type) { + tscError("child table %s tag %s type mismatch. point type : %d, db type : %d", + cTableName, tagKV->key, tagKV->type, type); + return TSDB_CODE_TSC_INVALID_VALUE; + } + + if (memcmp(tagKV->value, val, length) != 0) { + TAOS_BIND* bind = taosArrayGet(tagBinds, tagKV->fieldSchemaIdx); + code = changeChildTableTagValue(taos, cTableName, tagKV->key, bind); + if (code != 0) { + tscError("change child table tag failed. table name %s, tag %s", cTableName, tagKV->key); + goto cleanup; + } + } + + } + } else { + code = creatChildTableIfNotExists(taos, cTableName, sTableName, sTableSchema->tags, tagBinds); + if (code != 0) { + goto cleanup; + } + } + +cleanup: + for (int i = 0; i < taosArrayGetSize(tagBinds); ++i) { + TAOS_BIND* bind = taosArrayGet(tagBinds, i); + free(bind->length); + } + taosArrayDestroy(tagBinds); + return code; +} + +static int32_t applyChildTableFields(TAOS* taos, SSmlSTableSchema* sTableSchema, char* cTableName, SArray* cTablePoints) { + int32_t code = TSDB_CODE_SUCCESS; + + size_t numCols = taosArrayGetSize(sTableSchema->fields); + size_t rows = taosArrayGetSize(cTablePoints); + SArray* rowsBind = taosArrayInit(rows, POINTER_BYTES); - TAOS_SML_DATA_POINT * point = taosArrayGetP(cTablePoints, 0); - SSmlSTableSchema* sTableSchema = taosArrayGet(stableSchemas, point->schemaIdx); - size_t numTags = taosArrayGetSize(sTableSchema->tags); - size_t numCols = taosArrayGetSize(sTableSchema->fields); + for (int i = 0; i < rows; ++i) { + TAOS_SML_DATA_POINT* point = taosArrayGetP(cTablePoints, i); - SArray* tagBinds = taosArrayInit(numTags, sizeof(TAOS_BIND)); - taosArraySetSize(tagBinds, numTags); - for (int j = 0; j < numTags; ++j) { - TAOS_BIND* bind = taosArrayGet(tagBinds, j); + TAOS_BIND* colBinds = calloc(numCols, sizeof(TAOS_BIND)); + if (colBinds == NULL) { + tscError("taos_sml_insert insert points, failed to allocated memory for TAOS_BIND, " + "num of rows: %zu, num of cols: %zu", rows, numCols); + return TSDB_CODE_TSC_OUT_OF_MEMORY; + } + + int isNullColBind = TSDB_TRUE; + for (int j = 0; j < numCols; ++j) { + TAOS_BIND* bind = colBinds + j; bind->is_null = &isNullColBind; } - for (int j = 0; j < point->tagNum; ++j) { - TAOS_SML_KV* kv = point->tags + j; - TAOS_BIND* bind = taosArrayGet(tagBinds, kv->fieldSchemaIdx); + for (int j = 0; j < point->fieldNum; ++j) { + TAOS_SML_KV* kv = point->fields + j; + TAOS_BIND* bind = colBinds + kv->fieldSchemaIdx; bind->buffer_type = kv->type; bind->length = malloc(sizeof(uintptr_t*)); *bind->length = kv->length; bind->buffer = kv->value; bind->is_null = NULL; } + taosArrayPush(rowsBind, &colBinds); + } - size_t rows = taosArrayGetSize(cTablePoints); - SArray* rowsBind = taosArrayInit(rows, POINTER_BYTES); - - for (int i = 0; i < rows; ++i) { - point = taosArrayGetP(cTablePoints, i); + code = insertChildTableBatch(taos, cTableName, sTableSchema->fields, rowsBind); + if (code != 0) { + tscError("insert into child table %s failed. error %s", cTableName, tstrerror(code)); + } - TAOS_BIND* colBinds = calloc(numCols, sizeof(TAOS_BIND)); - if (colBinds == NULL) { - tscError("taos_sml_insert insert points, failed to allocated memory for TAOS_BIND, " - "num of rows: %zu, num of cols: %zu", rows, numCols); - } - for (int j = 0; j < numCols; ++j) { - TAOS_BIND* bind = colBinds + j; - bind->is_null = &isNullColBind; - } - for (int j = 0; j < point->fieldNum; ++j) { - TAOS_SML_KV* kv = point->fields + j; - TAOS_BIND* bind = colBinds + kv->fieldSchemaIdx; - bind->buffer_type = kv->type; - bind->length = malloc(sizeof(uintptr_t*)); - *bind->length = kv->length; - bind->buffer = kv->value; - bind->is_null = NULL; - } - taosArrayPush(rowsBind, &colBinds); + for (int i = 0; i < rows; ++i) { + TAOS_BIND* colBinds = taosArrayGetP(rowsBind, i); + for (int j = 0; j < numCols; ++j) { + TAOS_BIND* bind = colBinds + j; + free(bind->length); } + free(colBinds); + } + taosArrayDestroy(rowsBind); + return code; +} - code = creatChildTableIfNotExists(taos, point->childTableName, point->stableName, sTableSchema->tags, tagBinds); - if (code == 0) { - code = insertChildTableBatch(taos, point->childTableName, sTableSchema->fields, rowsBind); - if (code != 0) { - tscError("insert into child table %s failed. error %s", point->childTableName, tstrerror(code)); - } - } else { - tscError("Create Child Table %s failed, error %s", point->childTableName, tstrerror(code)); - } +static int32_t applyDataPoints(TAOS* taos, TAOS_SML_DATA_POINT* points, int32_t numPoints, SArray* stableSchemas) { + int32_t code = TSDB_CODE_SUCCESS; - for (int i = 0; i < taosArrayGetSize(tagBinds); ++i) { - TAOS_BIND* bind = taosArrayGet(tagBinds, i); - free(bind->length); - } - taosArrayDestroy(tagBinds); - for (int i = 0; i < rows; ++i) { - TAOS_BIND* colBinds = taosArrayGetP(rowsBind, i); - for (int j = 0; j < numCols; ++j) { - TAOS_BIND* bind = colBinds + j; - free(bind->length); - } - free(colBinds); + SHashObj* cname2points = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false); + arrangePointsByChildTableName(points, numPoints, cname2points, stableSchemas); + + SArray** pCTablePoints = taosHashIterate(cname2points, NULL); + while (pCTablePoints) { + SArray* cTablePoints = *pCTablePoints; + + TAOS_SML_DATA_POINT* point = taosArrayGetP(cTablePoints, 0); + SSmlSTableSchema* sTableSchema = taosArrayGet(stableSchemas, point->schemaIdx); + code = applyChildTableTags(taos, point->childTableName, point->stableName, sTableSchema, cTablePoints); + if (code != 0) { + tscError("apply child table tags failed. child table %s, error %s", point->childTableName, tstrerror(code)); + goto cleanup; } - taosArrayDestroy(rowsBind); - taosArrayDestroy(cTablePoints); + code = applyChildTableFields(taos, sTableSchema, point->childTableName, cTablePoints); if (code != 0) { - break; + tscError("Apply child table fields failed. child table %s, error %s", point->childTableName, tstrerror(code)); + goto cleanup; } + pCTablePoints = taosHashIterate(cname2points, pCTablePoints); } +cleanup: + pCTablePoints = taosHashIterate(cname2points, NULL); + while (pCTablePoints) { + SArray* pPoints = *pCTablePoints; + taosArrayDestroy(pPoints); + } taosHashCleanup(cname2points); return code; } @@ -855,7 +984,7 @@ int taos_sml_insert(TAOS* taos, TAOS_SML_DATA_POINT* points, int numPoint) { goto clean_up; } - code = insertPoints(taos, points, numPoint, stableSchemas); + code = applyDataPoints(taos, points, numPoint, stableSchemas); if (code != 0) { tscError("error insert points : %s", tstrerror(code)); } diff --git a/tests/examples/c/apitest.c b/tests/examples/c/apitest.c index cbac1d3de3f6465121ba30b028bd7454f1d89073..37e93cdacbcf8bee7a47617f5e41c9de91ed288e 100644 --- a/tests/examples/c/apitest.c +++ b/tests/examples/c/apitest.c @@ -954,7 +954,7 @@ int32_t verify_schema_less(TAOS* taos) { result = taos_query(taos, "drop database if exists test;"); taos_free_result(result); usleep(100000); - result = taos_query(taos, "create database test precision 'us';"); + result = taos_query(taos, "create database test precision 'us' update 1;"); taos_free_result(result); usleep(100000); @@ -963,23 +963,11 @@ int32_t verify_schema_less(TAOS* taos) { taos_free_result(result); usleep(100000); - char* lines[] = { - "st,t1=3i64,t2=4f64,t3=\"t3\" c1=3i64,c3=L\"passit\",c2=false,c4=4f64 1626006833639000000ns", - "st,t1=4i64,t3=\"t4\",t2=5f64,t4=5f64 c1=3i64,c3=L\"passitagin\",c2=true,c4=5f64,c5=5f64 1626006833640000000ns", - "ste,t2=5f64,t3=L\"ste\" c1=true,c2=4i64,c3=\"iam\" 1626056811823316532ns", - "st,t1=4i64,t2=5f64,t3=\"t4\" c1=3i64,c3=L\"passitagain\",c2=true,c4=5f64 1626006833642000000ns", - "ste,t2=5f64,t3=L\"ste2\" c3=\"iamszhou\",c4=false 1626056811843316532ns", - "ste,t2=5f64,t3=L\"ste2\" c3=\"iamszhou\",c4=false,c5=32i8,c6=64i16,c7=32i32,c8=88.88f32 1626056812843316532ns", - "st,t1=4i64,t3=\"t4\",t2=5f64,t4=5f64 c1=3i64,c3=L\"passitagin\",c2=true,c4=5f64,c5=5f64,c6=7u64 1626006933640000000ns", - "stf,t1=4i64,t3=\"t4\",t2=5f64,t4=5f64 c1=3i64,c3=L\"passitagin\",c2=true,c4=5f64,c5=5f64,c6=7u64 1626006933640000000ns", - "stf,t1=4i64,t3=\"t4\",t2=5f64,t4=5f64 c1=3i64,c3=L\"passitagin_stf\",c2=false,c5=5f64,c6=7u64 1626006933641000000ns" - }; int code = 0; - code = taos_insert_lines(taos, lines , sizeof(lines)/sizeof(char*)); char* lines2[] = { - "stg,t1=3i64,t2=4f64,t3=\"t3\" c1=3i64,c3=L\"passit\",c2=false,c4=4f64 1626006833639000000ns", - "stg,t1=4i64,t3=\"t4\",t2=5f64,t4=5f64 c1=3i64,c3=L\"passitagin\",c2=true,c4=5f64,c5=5f64 1626006833640000000ns" + "zqlbgs,id=\"zqlbgs_39302_21680\",t0=f,t1=127i8,t2=32767i16,t3=2147483647i32,t4=9223372036854775807i64,t5=11.12345f32,t6=22.123456789f64,t7=\"binaryTagValue\",t8=L\"ncharTagValue\" c0=f,c1=127i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c5=11.12345f32,c6=22.123456789f64,c7=\"binaryColValue\",c8=L\"ncharColValue\",c9=7u64 1626006833639000000ns", + "zqlbgs,t9=f,id=\"zqlbgs_39302_21680\",t0=f,t1=127i8,t11=127i8,t2=32767i16,t3=2147483647i32,t4=9223372036854775807i64,t5=11.12345f32,t6=22.123456789f64,t7=\"binaryTagValue\",t8=L\"ncharTagValue\",t10=L\"ncharTagValue\" c10=f,c0=f,c1=127i8,c12=127i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c5=11.12345f32,c6=22.123456789f64,c7=\"binaryColValue\",c8=L\"ncharColValue\",c9=7u64,c11=L\"ncharColValue\" 1626006833639000000ns" }; code = taos_insert_lines(taos, &lines2[0], 1); code = taos_insert_lines(taos, &lines2[1], 1);