未验证 提交 f17d7c8b 编写于 作者: H Haojun Liao 提交者: GitHub

Merge pull request #7011 from taosdata/hotfix/td-5478

[TD-5478]<fix>:modify tag value of existing child table in line protocol processing
...@@ -474,7 +474,7 @@ int32_t loadTableMeta(TAOS* taos, char* tableName, SSmlSTableSchema* schema) { ...@@ -474,7 +474,7 @@ int32_t loadTableMeta(TAOS* taos, char* tableName, SSmlSTableSchema* schema) {
return code; return code;
} }
static int32_t reconcileDBSchemas(TAOS* taos, SArray* stableSchemas) { static int32_t modifyDBSchemas(TAOS* taos, SArray* stableSchemas) {
int32_t code = 0; int32_t code = 0;
size_t numStable = taosArrayGetSize(stableSchemas); size_t numStable = taosArrayGetSize(stableSchemas);
for (int i = 0; i < numStable; ++i) { for (int i = 0; i < numStable; ++i) {
...@@ -570,6 +570,40 @@ static int32_t getSmlMd5ChildTableName(TAOS_SML_DATA_POINT* point, char* tableNa ...@@ -570,6 +570,40 @@ static int32_t getSmlMd5ChildTableName(TAOS_SML_DATA_POINT* point, char* tableNa
return 0; return 0;
} }
static int32_t changeChildTableTagValue(TAOS* taos, const char* cTableName, const char* tagName, TAOS_BIND* bind) {
char sql[512];
sprintf(sql, "alter table %s set tag %s=?", cTableName, tagName);
int32_t code;
TAOS_STMT* stmt = taos_stmt_init(taos);
code = taos_stmt_prepare(stmt, sql, (unsigned long)strlen(sql));
if (code != 0) {
tscError("%s", taos_stmt_errstr(stmt));
return code;
}
code = taos_stmt_bind_param(stmt, bind);
if (code != 0) {
tscError("%s", taos_stmt_errstr(stmt));
return code;
}
code = taos_stmt_execute(stmt);
if (code != 0) {
tscError("%s", taos_stmt_errstr(stmt));
return code;
}
code = taos_stmt_close(stmt);
if (code != 0) {
tscError("%s", taos_stmt_errstr(stmt));
return code;
}
return code;
}
static int32_t creatChildTableIfNotExists(TAOS* taos, const char* cTableName, const char* sTableName, SArray* tagsSchema, SArray* tagsBind) { static int32_t creatChildTableIfNotExists(TAOS* taos, const char* cTableName, const char* sTableName, SArray* tagsSchema, SArray* tagsBind) {
size_t numTags = taosArrayGetSize(tagsSchema); size_t numTags = taosArrayGetSize(tagsSchema);
char* sql = malloc(tsMaxSQLStringLen+1); char* sql = malloc(tsMaxSQLStringLen+1);
...@@ -657,7 +691,6 @@ static int32_t insertChildTableBatch(TAOS* taos, char* cTableName, SArray* cols ...@@ -657,7 +691,6 @@ static int32_t insertChildTableBatch(TAOS* taos, char* cTableName, SArray* cols
} }
do { do {
code = taos_stmt_set_tbname(stmt, cTableName); code = taos_stmt_set_tbname(stmt, cTableName);
if (code != 0) { if (code != 0) {
tscError("%s", taos_stmt_errstr(stmt)); tscError("%s", taos_stmt_errstr(stmt));
...@@ -742,97 +775,200 @@ static int32_t arrangePointsByChildTableName(TAOS_SML_DATA_POINT* points, int nu ...@@ -742,97 +775,200 @@ static int32_t arrangePointsByChildTableName(TAOS_SML_DATA_POINT* points, int nu
return 0; return 0;
} }
static int32_t insertPoints(TAOS* taos, TAOS_SML_DATA_POINT* points, int32_t numPoints, SArray* stableSchemas) { static int32_t applyChildTableTags(TAOS* taos, char* cTableName, char* sTableName,
int32_t code = TSDB_CODE_SUCCESS; SSmlSTableSchema* sTableSchema, SArray* cTablePoints) {
size_t numTags = taosArrayGetSize(sTableSchema->tags);
size_t rows = taosArrayGetSize(cTablePoints);
SHashObj* cname2points = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), TAOS_SML_KV* tagKVs[TSDB_MAX_TAGS] = {0};
true, false); for (int i= 0; i < rows; ++i) {
arrangePointsByChildTableName(points, numPoints, cname2points, stableSchemas); TAOS_SML_DATA_POINT * pDataPoint = taosArrayGetP(cTablePoints, i);
for (int j = 0; j < pDataPoint->tagNum; ++j) {
TAOS_SML_KV* kv = pDataPoint->tags + j;
tagKVs[kv->fieldSchemaIdx] = kv;
}
}
int32_t notNullTagsIndices[TSDB_MAX_TAGS] = {0};
int32_t numNotNullTags = 0;
for (int32_t i = 0; i < numTags; ++i) {
if (tagKVs[i] != NULL) {
notNullTagsIndices[numNotNullTags] = i;
++numNotNullTags;
}
}
SArray* tagBinds = taosArrayInit(numTags, sizeof(TAOS_BIND));
taosArraySetSize(tagBinds, numTags);
int isNullColBind = TSDB_TRUE; int isNullColBind = TSDB_TRUE;
SArray** pCTablePoints = taosHashIterate(cname2points, NULL); for (int j = 0; j < numTags; ++j) {
while (pCTablePoints) { TAOS_BIND* bind = taosArrayGet(tagBinds, j);
SArray* cTablePoints = *pCTablePoints; bind->is_null = &isNullColBind;
}
for (int j = 0; j < numTags; ++j) {
if (tagKVs[j] == NULL) continue;
TAOS_SML_KV* kv = tagKVs[j];
TAOS_BIND* bind = taosArrayGet(tagBinds, kv->fieldSchemaIdx);
bind->buffer_type = kv->type;
bind->length = malloc(sizeof(uintptr_t*));
*bind->length = kv->length;
bind->buffer = kv->value;
bind->is_null = NULL;
}
// select tag1,tag2,... from stable where tbname in (ctable)
char* sql = malloc(tsMaxSQLStringLen+1);
int freeBytes = tsMaxSQLStringLen + 1;
snprintf(sql, freeBytes, "select tbname, ");
for (int i = 0; i < numNotNullTags ; ++i) {
snprintf(sql + strlen(sql), freeBytes-strlen(sql), "%s,", tagKVs[notNullTagsIndices[i]]->key);
}
snprintf(sql + strlen(sql) - 1, freeBytes - strlen(sql) + 1,
" from %s where tbname in (\'%s\')", sTableName, cTableName);
sql[strlen(sql)] = '\0';
TAOS_RES* result = taos_query(taos, sql);
free(sql);
int32_t code = taos_errno(result);
if (code != 0) {
tscError("get child table %s tags failed. error string %s", cTableName, taos_errstr(result));
goto cleanup;
}
// check tag value and set tag values if different
TAOS_ROW row = taos_fetch_row(result);
if (row != NULL) {
int numFields = taos_field_count(result);
TAOS_FIELD* fields = taos_fetch_fields(result);
int* lengths = taos_fetch_lengths(result);
for (int i = 1; i < numFields; ++i) {
uint8_t dbType = fields[i].type;
int32_t length = lengths[i];
char* val = row[i];
TAOS_SML_KV* tagKV = tagKVs[notNullTagsIndices[i-1]];
if (tagKV->type != dbType) {
tscError("child table %s tag %s type mismatch. point type : %d, db type : %d",
cTableName, tagKV->key, tagKV->type, dbType);
return TSDB_CODE_TSC_INVALID_VALUE;
}
assert(tagKV->value);
TAOS_SML_DATA_POINT * point = taosArrayGetP(cTablePoints, 0); if (val == NULL || length != tagKV->length || memcmp(tagKV->value, val, length) != 0) {
SSmlSTableSchema* sTableSchema = taosArrayGet(stableSchemas, point->schemaIdx); TAOS_BIND* bind = taosArrayGet(tagBinds, tagKV->fieldSchemaIdx);
size_t numTags = taosArrayGetSize(sTableSchema->tags); code = changeChildTableTagValue(taos, cTableName, tagKV->key, bind);
size_t numCols = taosArrayGetSize(sTableSchema->fields); if (code != 0) {
tscError("change child table tag failed. table name %s, tag %s", cTableName, tagKV->key);
goto cleanup;
}
}
}
tscDebug("successfully applied point tags. child table: %s", cTableName);
} else {
code = creatChildTableIfNotExists(taos, cTableName, sTableName, sTableSchema->tags, tagBinds);
if (code != 0) {
goto cleanup;
}
}
cleanup:
taos_free_result(result);
for (int i = 0; i < taosArrayGetSize(tagBinds); ++i) {
TAOS_BIND* bind = taosArrayGet(tagBinds, i);
free(bind->length);
}
taosArrayDestroy(tagBinds);
return code;
}
static int32_t applyChildTableFields(TAOS* taos, SSmlSTableSchema* sTableSchema, char* cTableName, SArray* cTablePoints) {
int32_t code = TSDB_CODE_SUCCESS;
SArray* tagBinds = taosArrayInit(numTags, sizeof(TAOS_BIND)); size_t numCols = taosArrayGetSize(sTableSchema->fields);
taosArraySetSize(tagBinds, numTags); size_t rows = taosArrayGetSize(cTablePoints);
for (int j = 0; j < numTags; ++j) { SArray* rowsBind = taosArrayInit(rows, POINTER_BYTES);
TAOS_BIND* bind = taosArrayGet(tagBinds, j);
for (int i = 0; i < rows; ++i) {
TAOS_SML_DATA_POINT* point = taosArrayGetP(cTablePoints, i);
TAOS_BIND* colBinds = calloc(numCols, sizeof(TAOS_BIND));
if (colBinds == NULL) {
tscError("taos_sml_insert insert points, failed to allocated memory for TAOS_BIND, "
"num of rows: %zu, num of cols: %zu", rows, numCols);
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
int isNullColBind = TSDB_TRUE;
for (int j = 0; j < numCols; ++j) {
TAOS_BIND* bind = colBinds + j;
bind->is_null = &isNullColBind; bind->is_null = &isNullColBind;
} }
for (int j = 0; j < point->tagNum; ++j) { for (int j = 0; j < point->fieldNum; ++j) {
TAOS_SML_KV* kv = point->tags + j; TAOS_SML_KV* kv = point->fields + j;
TAOS_BIND* bind = taosArrayGet(tagBinds, kv->fieldSchemaIdx); TAOS_BIND* bind = colBinds + kv->fieldSchemaIdx;
bind->buffer_type = kv->type; bind->buffer_type = kv->type;
bind->length = malloc(sizeof(uintptr_t*)); bind->length = malloc(sizeof(uintptr_t*));
*bind->length = kv->length; *bind->length = kv->length;
bind->buffer = kv->value; bind->buffer = kv->value;
bind->is_null = NULL; bind->is_null = NULL;
} }
taosArrayPush(rowsBind, &colBinds);
}
size_t rows = taosArrayGetSize(cTablePoints); code = insertChildTableBatch(taos, cTableName, sTableSchema->fields, rowsBind);
SArray* rowsBind = taosArrayInit(rows, POINTER_BYTES); if (code != 0) {
tscError("insert into child table %s failed. error %s", cTableName, tstrerror(code));
for (int i = 0; i < rows; ++i) { }
point = taosArrayGetP(cTablePoints, i);
TAOS_BIND* colBinds = calloc(numCols, sizeof(TAOS_BIND)); for (int i = 0; i < rows; ++i) {
if (colBinds == NULL) { TAOS_BIND* colBinds = taosArrayGetP(rowsBind, i);
tscError("taos_sml_insert insert points, failed to allocated memory for TAOS_BIND, " for (int j = 0; j < numCols; ++j) {
"num of rows: %zu, num of cols: %zu", rows, numCols); TAOS_BIND* bind = colBinds + j;
} free(bind->length);
for (int j = 0; j < numCols; ++j) {
TAOS_BIND* bind = colBinds + j;
bind->is_null = &isNullColBind;
}
for (int j = 0; j < point->fieldNum; ++j) {
TAOS_SML_KV* kv = point->fields + j;
TAOS_BIND* bind = colBinds + kv->fieldSchemaIdx;
bind->buffer_type = kv->type;
bind->length = malloc(sizeof(uintptr_t*));
*bind->length = kv->length;
bind->buffer = kv->value;
bind->is_null = NULL;
}
taosArrayPush(rowsBind, &colBinds);
} }
free(colBinds);
}
taosArrayDestroy(rowsBind);
return code;
}
code = creatChildTableIfNotExists(taos, point->childTableName, point->stableName, sTableSchema->tags, tagBinds); static int32_t applyDataPoints(TAOS* taos, TAOS_SML_DATA_POINT* points, int32_t numPoints, SArray* stableSchemas) {
if (code == 0) { int32_t code = TSDB_CODE_SUCCESS;
code = insertChildTableBatch(taos, point->childTableName, sTableSchema->fields, rowsBind);
if (code != 0) {
tscError("insert into child table %s failed. error %s", point->childTableName, tstrerror(code));
}
} else {
tscError("Create Child Table %s failed, error %s", point->childTableName, tstrerror(code));
}
for (int i = 0; i < taosArrayGetSize(tagBinds); ++i) { SHashObj* cname2points = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false);
TAOS_BIND* bind = taosArrayGet(tagBinds, i); arrangePointsByChildTableName(points, numPoints, cname2points, stableSchemas);
free(bind->length);
} SArray** pCTablePoints = taosHashIterate(cname2points, NULL);
taosArrayDestroy(tagBinds); while (pCTablePoints) {
for (int i = 0; i < rows; ++i) { SArray* cTablePoints = *pCTablePoints;
TAOS_BIND* colBinds = taosArrayGetP(rowsBind, i);
for (int j = 0; j < numCols; ++j) { TAOS_SML_DATA_POINT* point = taosArrayGetP(cTablePoints, 0);
TAOS_BIND* bind = colBinds + j; SSmlSTableSchema* sTableSchema = taosArrayGet(stableSchemas, point->schemaIdx);
free(bind->length); code = applyChildTableTags(taos, point->childTableName, point->stableName, sTableSchema, cTablePoints);
} if (code != 0) {
free(colBinds); tscError("apply child table tags failed. child table %s, error %s", point->childTableName, tstrerror(code));
goto cleanup;
} }
taosArrayDestroy(rowsBind); code = applyChildTableFields(taos, sTableSchema, point->childTableName, cTablePoints);
taosArrayDestroy(cTablePoints);
if (code != 0) { if (code != 0) {
break; tscError("Apply child table fields failed. child table %s, error %s", point->childTableName, tstrerror(code));
goto cleanup;
} }
tscDebug("successfully applied data points of child table %s", point->childTableName);
pCTablePoints = taosHashIterate(cname2points, pCTablePoints); pCTablePoints = taosHashIterate(cname2points, pCTablePoints);
} }
cleanup:
pCTablePoints = taosHashIterate(cname2points, NULL);
while (pCTablePoints) {
SArray* pPoints = *pCTablePoints;
taosArrayDestroy(pPoints);
pCTablePoints = taosHashIterate(cname2points, pCTablePoints);
}
taosHashCleanup(cname2points); taosHashCleanup(cname2points);
return code; return code;
} }
...@@ -849,15 +985,15 @@ int taos_sml_insert(TAOS* taos, TAOS_SML_DATA_POINT* points, int numPoint) { ...@@ -849,15 +985,15 @@ int taos_sml_insert(TAOS* taos, TAOS_SML_DATA_POINT* points, int numPoint) {
goto clean_up; goto clean_up;
} }
code = reconcileDBSchemas(taos, stableSchemas); code = modifyDBSchemas(taos, stableSchemas);
if (code != 0) { if (code != 0) {
tscError("error change db schema : %s", tstrerror(code)); tscError("error change db schema : %s", tstrerror(code));
goto clean_up; goto clean_up;
} }
code = insertPoints(taos, points, numPoint, stableSchemas); code = applyDataPoints(taos, points, numPoint, stableSchemas);
if (code != 0) { if (code != 0) {
tscError("error insert points : %s", tstrerror(code)); tscError("error apply data points : %s", tstrerror(code));
} }
clean_up: clean_up:
......
...@@ -954,7 +954,7 @@ int32_t verify_schema_less(TAOS* taos) { ...@@ -954,7 +954,7 @@ int32_t verify_schema_less(TAOS* taos) {
result = taos_query(taos, "drop database if exists test;"); result = taos_query(taos, "drop database if exists test;");
taos_free_result(result); taos_free_result(result);
usleep(100000); usleep(100000);
result = taos_query(taos, "create database test precision 'us';"); result = taos_query(taos, "create database test precision 'us' update 1;");
taos_free_result(result); taos_free_result(result);
usleep(100000); usleep(100000);
...@@ -963,6 +963,8 @@ int32_t verify_schema_less(TAOS* taos) { ...@@ -963,6 +963,8 @@ int32_t verify_schema_less(TAOS* taos) {
taos_free_result(result); taos_free_result(result);
usleep(100000); usleep(100000);
int code = 0;
char* lines[] = { char* lines[] = {
"st,t1=3i64,t2=4f64,t3=\"t3\" c1=3i64,c3=L\"passit\",c2=false,c4=4f64 1626006833639000000ns", "st,t1=3i64,t2=4f64,t3=\"t3\" c1=3i64,c3=L\"passit\",c2=false,c4=4f64 1626006833639000000ns",
"st,t1=4i64,t3=\"t4\",t2=5f64,t4=5f64 c1=3i64,c3=L\"passitagin\",c2=true,c4=5f64,c5=5f64 1626006833640000000ns", "st,t1=4i64,t3=\"t4\",t2=5f64,t4=5f64 c1=3i64,c3=L\"passitagin\",c2=true,c4=5f64,c5=5f64 1626006833640000000ns",
...@@ -975,8 +977,8 @@ int32_t verify_schema_less(TAOS* taos) { ...@@ -975,8 +977,8 @@ int32_t verify_schema_less(TAOS* taos) {
"stf,t1=4i64,t3=\"t4\",t2=5f64,t4=5f64 c1=3i64,c3=L\"passitagin_stf\",c2=false,c5=5f64,c6=7u64 1626006933641000000ns" "stf,t1=4i64,t3=\"t4\",t2=5f64,t4=5f64 c1=3i64,c3=L\"passitagin_stf\",c2=false,c5=5f64,c6=7u64 1626006933641000000ns"
}; };
int code = 0;
code = taos_insert_lines(taos, lines , sizeof(lines)/sizeof(char*)); code = taos_insert_lines(taos, lines , sizeof(lines)/sizeof(char*));
char* lines2[] = { char* lines2[] = {
"stg,t1=3i64,t2=4f64,t3=\"t3\" c1=3i64,c3=L\"passit\",c2=false,c4=4f64 1626006833639000000ns", "stg,t1=3i64,t2=4f64,t3=\"t3\" c1=3i64,c3=L\"passit\",c2=false,c4=4f64 1626006833639000000ns",
"stg,t1=4i64,t3=\"t4\",t2=5f64,t4=5f64 c1=3i64,c3=L\"passitagin\",c2=true,c4=5f64,c5=5f64 1626006833640000000ns" "stg,t1=4i64,t3=\"t4\",t2=5f64,t4=5f64 c1=3i64,c3=L\"passitagin\",c2=true,c4=5f64,c5=5f64 1626006833640000000ns"
...@@ -995,7 +997,21 @@ int32_t verify_schema_less(TAOS* taos) { ...@@ -995,7 +997,21 @@ int32_t verify_schema_less(TAOS* taos) {
"dgtyqodr,t2=5f64,t3=L\"ste\" c1=tRue,c2=4i64,c3=\"iam\" 1626056811823316532ns" "dgtyqodr,t2=5f64,t3=L\"ste\" c1=tRue,c2=4i64,c3=\"iam\" 1626056811823316532ns"
}; };
code = taos_insert_lines(taos, lines4, 2); code = taos_insert_lines(taos, lines4, 2);
return code;
char* lines5[] = {
"zqlbgs,id=\"zqlbgs_39302_21680\",t0=f,t1=127i8,t2=32767i16,t3=2147483647i32,t4=9223372036854775807i64,t5=11.12345f32,t6=22.123456789f64,t7=\"binaryTagValue\",t8=L\"ncharTagValue\" c0=f,c1=127i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c5=11.12345f32,c6=22.123456789f64,c7=\"binaryColValue\",c8=L\"ncharColValue\",c9=7u64 1626006833639000000ns",
"zqlbgs,t9=f,id=\"zqlbgs_39302_21680\",t0=f,t1=127i8,t11=127i8,t2=32767i16,t3=2147483647i32,t4=9223372036854775807i64,t5=11.12345f32,t6=22.123456789f64,t7=\"binaryTagValue\",t8=L\"ncharTagValue\",t10=L\"ncharTagValue\" c10=f,c0=f,c1=127i8,c12=127i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c5=11.12345f32,c6=22.123456789f64,c7=\"binaryColValue\",c8=L\"ncharColValue\",c9=7u64,c11=L\"ncharColValue\" 1626006833639000000ns"
};
code = taos_insert_lines(taos, &lines5[0], 1);
code = taos_insert_lines(taos, &lines5[1], 1);
char* lines6[] = {
"st123456,t1=3i64,t2=4f64,t3=\"t3\" c1=3i64,c3=L\"passit\",c2=false,c4=4f64 1626006833639000000ns",
"dgtyqodr,t2=5f64,t3=L\"ste\" c1=tRue,c2=4i64,c3=\"iam\" 1626056811823316532ns"
};
code = taos_insert_lines(taos, lines6, 2);
return (code);
} }
int main(int argc, char *argv[]) { int main(int argc, char *argv[]) {
......
...@@ -9,8 +9,8 @@ ...@@ -9,8 +9,8 @@
#include <unistd.h> #include <unistd.h>
int numSuperTables = 8; int numSuperTables = 8;
int numChildTables = 1024; int numChildTables = 4;
int numRowsPerChildTable = 128; int numRowsPerChildTable = 2048;
void shuffle(char**lines, size_t n) void shuffle(char**lines, size_t n)
{ {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册