提交 3bfd1566 编写于 作者: wmmhello's avatar wmmhello

fix:[TD-22898]:modify schema if schema change

上级 e54e12ef
...@@ -817,6 +817,7 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) { ...@@ -817,6 +817,7 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) {
if (i < pTableMeta->tableInfo.numOfColumns) { if (i < pTableMeta->tableInfo.numOfColumns) {
taosArrayPush(pColumns, &field); taosArrayPush(pColumns, &field);
} else { } else {
uError("SML:0x%" PRIx64 "field name:%s, bytes:%d", info->id, field.name, field.bytes);
taosArrayPush(pTags, &field); taosArrayPush(pTags, &field);
} }
} }
...@@ -1073,7 +1074,6 @@ void smlDestroyInfo(SSmlHandle *info) { ...@@ -1073,7 +1074,6 @@ void smlDestroyInfo(SSmlHandle *info) {
taosArrayDestroy(info->valueJsonArray); taosArrayDestroy(info->valueJsonArray);
taosArrayDestroy(info->preLineTagKV); taosArrayDestroy(info->preLineTagKV);
taosArrayDestroy(info->maxTagKVs);
taosArrayDestroy(info->preLineColKV); taosArrayDestroy(info->preLineColKV);
if (!info->dataFormat) { if (!info->dataFormat) {
...@@ -1117,7 +1117,6 @@ SSmlHandle *smlBuildSmlInfo(TAOS *taos) { ...@@ -1117,7 +1117,6 @@ SSmlHandle *smlBuildSmlInfo(TAOS *taos) {
info->tagJsonArray = taosArrayInit(8, POINTER_BYTES); info->tagJsonArray = taosArrayInit(8, POINTER_BYTES);
info->valueJsonArray = taosArrayInit(8, POINTER_BYTES); info->valueJsonArray = taosArrayInit(8, POINTER_BYTES);
info->preLineTagKV = taosArrayInit(8, sizeof(SSmlKv)); info->preLineTagKV = taosArrayInit(8, sizeof(SSmlKv));
info->maxTagKVs = taosArrayInit(8, sizeof(SSmlKv));
info->preLineColKV = taosArrayInit(8, sizeof(SSmlKv)); info->preLineColKV = taosArrayInit(8, sizeof(SSmlKv));
if (NULL == info->pVgHash || NULL == info->childTables || NULL == info->superTables) { if (NULL == info->pVgHash || NULL == info->childTables || NULL == info->superTables) {
......
...@@ -683,9 +683,6 @@ static int32_t smlParseTagsFromJSON(SSmlHandle *info, cJSON *tags, SSmlLineInfo ...@@ -683,9 +683,6 @@ static int32_t smlParseTagsFromJSON(SSmlHandle *info, cJSON *tags, SSmlLineInfo
int cnt = 0; int cnt = 0;
SArray *preLineKV = info->preLineTagKV; SArray *preLineKV = info->preLineTagKV;
SArray *maxKVs = info->maxTagKVs;
bool isSuperKVInit = true;
SArray *superKV = NULL;
if (info->dataFormat) { if (info->dataFormat) {
if (unlikely(!isSameMeasure)) { if (unlikely(!isSameMeasure)) {
SSmlSTableMeta **tmp = (SSmlSTableMeta **)taosHashGet(info->superTables, elements->measure, elements->measureLen); SSmlSTableMeta **tmp = (SSmlSTableMeta **)taosHashGet(info->superTables, elements->measure, elements->measureLen);
...@@ -701,17 +698,15 @@ static int32_t smlParseTagsFromJSON(SSmlHandle *info, cJSON *tags, SSmlLineInfo ...@@ -701,17 +698,15 @@ static int32_t smlParseTagsFromJSON(SSmlHandle *info, cJSON *tags, SSmlLineInfo
sMeta->tableMeta = pTableMeta; sMeta->tableMeta = pTableMeta;
taosHashPut(info->superTables, elements->measure, elements->measureLen, &sMeta, POINTER_BYTES); taosHashPut(info->superTables, elements->measure, elements->measureLen, &sMeta, POINTER_BYTES);
tmp = &sMeta; tmp = &sMeta;
for(int i = pTableMeta->tableInfo.numOfColumns; i < pTableMeta->tableInfo.numOfTags + pTableMeta->tableInfo.numOfColumns; i++){
SSchema *tag = pTableMeta->schema + i;
SSmlKv kv = {.key = tag->name, .keyLen = strlen(tag->name), .type = tag->type, .length = (tag->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE };
taosArrayPush(sMeta->tags, &kv);
} }
info->currSTableMeta = (*tmp)->tableMeta;
superKV = (*tmp)->tags;
if (unlikely(taosArrayGetSize(superKV) == 0)) {
isSuperKVInit = false;
} }
taosArrayClear(maxKVs); info->currSTableMeta = (*tmp)->tableMeta;
info->maxTagKVs = (*tmp)->tags;
} }
} else {
taosArrayClear(maxKVs);
} }
taosArrayClear(preLineKV); taosArrayClear(preLineKV);
...@@ -747,58 +742,21 @@ static int32_t smlParseTagsFromJSON(SSmlHandle *info, cJSON *tags, SSmlLineInfo ...@@ -747,58 +742,21 @@ static int32_t smlParseTagsFromJSON(SSmlHandle *info, cJSON *tags, SSmlLineInfo
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
if (isSameMeasure) { if (unlikely(cnt >= taosArrayGetSize(info->maxTagKVs))) {
if (unlikely(cnt >= taosArrayGetSize(maxKVs))) {
info->dataFormat = false; info->dataFormat = false;
info->reRun = true; info->reRun = true;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
SSmlKv *maxKV = (SSmlKv *)taosArrayGet(maxKVs, cnt); SSmlKv *maxKV = (SSmlKv *)taosArrayGet(info->maxTagKVs, cnt);
if (unlikely(kv.length > maxKV->length)) {
maxKV->length = kv.length;
SSmlSTableMeta **tableMeta =
(SSmlSTableMeta **)taosHashGet(info->superTables, elements->measure, elements->measureLen);
if (unlikely(NULL == tableMeta)) {
uError("SML:0x%" PRIx64 " NULL == tableMeta", info->id);
return TSDB_CODE_SML_INTERNAL_ERROR;
}
SSmlKv *oldKV = (SSmlKv *)taosArrayGet((*tableMeta)->tags, cnt);
oldKV->length = kv.length;
info->needModifySchema = true;
}
if (unlikely(!IS_SAME_KEY)) { if (unlikely(!IS_SAME_KEY)) {
info->dataFormat = false; info->dataFormat = false;
info->reRun = true; info->reRun = true;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
} else {
if (isSuperKVInit) {
if (unlikely(cnt >= taosArrayGetSize(superKV))) {
info->dataFormat = false;
info->reRun = true;
return TSDB_CODE_SUCCESS;
}
SSmlKv *maxKV = (SSmlKv *)taosArrayGet(superKV, cnt);
if (unlikely(kv.length > maxKV->length)) { if (unlikely(kv.length > maxKV->length)) {
maxKV->length = kv.length; maxKV->length = kv.length;
} else {
kv.length = maxKV->length;
}
info->needModifySchema = true; info->needModifySchema = true;
if (unlikely(!IS_SAME_KEY)) {
info->dataFormat = false;
info->reRun = true;
return TSDB_CODE_SUCCESS;
}
} else {
taosArrayPush(superKV, &kv);
}
taosArrayPush(maxKVs, &kv);
} }
} else {
taosArrayPush(maxKVs, &kv);
} }
taosArrayPush(preLineKV, &kv); taosArrayPush(preLineKV, &kv);
cnt++; cnt++;
......
...@@ -144,14 +144,11 @@ static int32_t smlParseTagKv(SSmlHandle *info, char **sql, char *sqlEnd, SSmlLin ...@@ -144,14 +144,11 @@ static int32_t smlParseTagKv(SSmlHandle *info, char **sql, char *sqlEnd, SSmlLin
int cnt = 0; int cnt = 0;
SArray *preLineKV = info->preLineTagKV; SArray *preLineKV = info->preLineTagKV;
SArray *maxKVs = info->maxTagKVs;
bool isSuperKVInit = true;
SArray *superKV = NULL;
if (info->dataFormat) { if (info->dataFormat) {
if (unlikely(!isSameMeasure)) { if (unlikely(!isSameMeasure)) {
SSmlSTableMeta **tmp = SSmlSTableMeta **tmp =
(SSmlSTableMeta **)taosHashGet(info->superTables, currElement->measure, currElement->measureLen); (SSmlSTableMeta **)taosHashGet(info->superTables, currElement->measure, currElement->measureLen);
SSmlSTableMeta *sMeta = NULL;
if (unlikely(tmp == NULL)) { if (unlikely(tmp == NULL)) {
STableMeta *pTableMeta = smlGetMeta(info, currElement->measure, currElement->measureLen); STableMeta *pTableMeta = smlGetMeta(info, currElement->measure, currElement->measureLen);
if (pTableMeta == NULL) { if (pTableMeta == NULL) {
...@@ -159,21 +156,20 @@ static int32_t smlParseTagKv(SSmlHandle *info, char **sql, char *sqlEnd, SSmlLin ...@@ -159,21 +156,20 @@ static int32_t smlParseTagKv(SSmlHandle *info, char **sql, char *sqlEnd, SSmlLin
info->reRun = true; info->reRun = true;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
sMeta = smlBuildSTableMeta(info->dataFormat); SSmlSTableMeta *sMeta = smlBuildSTableMeta(info->dataFormat);
sMeta->tableMeta = pTableMeta; sMeta->tableMeta = pTableMeta;
taosHashPut(info->superTables, currElement->measure, currElement->measureLen, &sMeta, POINTER_BYTES); taosHashPut(info->superTables, currElement->measure, currElement->measureLen, &sMeta, POINTER_BYTES);
tmp = &sMeta; tmp = &sMeta;
}
info->currSTableMeta = (*tmp)->tableMeta;
superKV = (*tmp)->tags;
if (unlikely(taosArrayGetSize(superKV) == 0)) { for(int i = pTableMeta->tableInfo.numOfColumns; i < pTableMeta->tableInfo.numOfTags + pTableMeta->tableInfo.numOfColumns; i++){
isSuperKVInit = false; SSchema *tag = pTableMeta->schema + i;
SSmlKv kv = {.key = tag->name, .keyLen = strlen(tag->name), .type = tag->type, .length = (tag->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE };
taosArrayPush(sMeta->tags, &kv);
} }
taosArrayClear(maxKVs);
} }
} else { info->currSTableMeta = (*tmp)->tableMeta;
taosArrayClear(maxKVs); info->maxTagKVs = (*tmp)->tags;
}
} }
taosArrayClear(preLineKV); taosArrayClear(preLineKV);
...@@ -252,58 +248,23 @@ static int32_t smlParseTagKv(SSmlHandle *info, char **sql, char *sqlEnd, SSmlLin ...@@ -252,58 +248,23 @@ static int32_t smlParseTagKv(SSmlHandle *info, char **sql, char *sqlEnd, SSmlLin
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
if (isSameMeasure) { if (unlikely(cnt >= taosArrayGetSize(info->maxTagKVs))) {
if (unlikely(cnt >= taosArrayGetSize(maxKVs))) {
info->dataFormat = false; info->dataFormat = false;
info->reRun = true; info->reRun = true;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
SSmlKv *maxKV = (SSmlKv *)taosArrayGet(maxKVs, cnt); SSmlKv *maxKV = (SSmlKv *)taosArrayGet(info->maxTagKVs, cnt);
if (unlikely(kv.length > maxKV->length)) {
maxKV->length = kv.length;
SSmlSTableMeta **tableMeta =
(SSmlSTableMeta **)taosHashGet(info->superTables, currElement->measure, currElement->measureLen);
if (unlikely(NULL == tableMeta)) {
uError("SML:0x%" PRIx64 " NULL == tableMeta", info->id);
return TSDB_CODE_SML_INTERNAL_ERROR;
}
SSmlKv *oldKV = (SSmlKv *)taosArrayGet((*tableMeta)->tags, cnt);
oldKV->length = kv.length;
info->needModifySchema = true;
}
if (unlikely(!IS_SAME_KEY)) { if (unlikely(!IS_SAME_KEY)) {
info->dataFormat = false; info->dataFormat = false;
info->reRun = true; info->reRun = true;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
} else {
if (isSuperKVInit) {
if (unlikely(cnt >= taosArrayGetSize(superKV))) {
info->dataFormat = false;
info->reRun = true;
return TSDB_CODE_SUCCESS;
}
SSmlKv *maxKV = (SSmlKv *)taosArrayGet(superKV, cnt);
if (unlikely(kv.length > maxKV->length)) { if (unlikely(kv.length > maxKV->length)) {
maxKV->length = kv.length; maxKV->length = kv.length;
} else {
kv.length = maxKV->length;
}
info->needModifySchema = true; info->needModifySchema = true;
if (unlikely(!IS_SAME_KEY)) {
info->dataFormat = false;
info->reRun = true;
return TSDB_CODE_SUCCESS;
} }
} else {
taosArrayPush(superKV, &kv);
}
taosArrayPush(maxKVs, &kv);
}
} else {
taosArrayPush(maxKVs, &kv);
} }
taosArrayPush(preLineKV, &kv); taosArrayPush(preLineKV, &kv);
......
...@@ -79,13 +79,10 @@ static int32_t smlParseTelnetTags(SSmlHandle *info, char *data, char *sqlEnd, SS ...@@ -79,13 +79,10 @@ static int32_t smlParseTelnetTags(SSmlHandle *info, char *data, char *sqlEnd, SS
int cnt = 0; int cnt = 0;
SArray *preLineKV = info->preLineTagKV; SArray *preLineKV = info->preLineTagKV;
SArray *maxKVs = info->maxTagKVs;
bool isSuperKVInit = true;
SArray *superKV = NULL;
if (info->dataFormat) { if (info->dataFormat) {
if (!isSameMeasure) { if (!isSameMeasure) {
SSmlSTableMeta **tmp = (SSmlSTableMeta **)taosHashGet(info->superTables, elements->measure, elements->measureLen); SSmlSTableMeta **tmp = (SSmlSTableMeta **)taosHashGet(info->superTables, elements->measure, elements->measureLen);
SSmlSTableMeta *sMeta = NULL; SSmlSTableMeta * sMeta = NULL;
if (unlikely(tmp == NULL)) { if (unlikely(tmp == NULL)) {
STableMeta *pTableMeta = smlGetMeta(info, elements->measure, elements->measureLen); STableMeta *pTableMeta = smlGetMeta(info, elements->measure, elements->measureLen);
if (pTableMeta == NULL) { if (pTableMeta == NULL) {
...@@ -97,17 +94,15 @@ static int32_t smlParseTelnetTags(SSmlHandle *info, char *data, char *sqlEnd, SS ...@@ -97,17 +94,15 @@ static int32_t smlParseTelnetTags(SSmlHandle *info, char *data, char *sqlEnd, SS
sMeta->tableMeta = pTableMeta; sMeta->tableMeta = pTableMeta;
taosHashPut(info->superTables, elements->measure, elements->measureLen, &sMeta, POINTER_BYTES); taosHashPut(info->superTables, elements->measure, elements->measureLen, &sMeta, POINTER_BYTES);
tmp = &sMeta; tmp = &sMeta;
for(int i = pTableMeta->tableInfo.numOfColumns; i < pTableMeta->tableInfo.numOfTags + pTableMeta->tableInfo.numOfColumns; i++){
SSchema *tag = pTableMeta->schema + i;
SSmlKv kv = {.key = tag->name, .keyLen = strlen(tag->name), .type = tag->type, .length = (tag->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE };
taosArrayPush(sMeta->tags, &kv);
} }
info->currSTableMeta = (*tmp)->tableMeta;
superKV = (*tmp)->tags;
if (unlikely(taosArrayGetSize(superKV) == 0)) {
isSuperKVInit = false;
} }
taosArrayClear(maxKVs); info->currSTableMeta = (*tmp)->tableMeta;
info->maxTagKVs = (*tmp)->tags;
} }
} else {
taosArrayClear(maxKVs);
} }
taosArrayClear(preLineKV); taosArrayClear(preLineKV);
...@@ -175,59 +170,21 @@ static int32_t smlParseTelnetTags(SSmlHandle *info, char *data, char *sqlEnd, SS ...@@ -175,59 +170,21 @@ static int32_t smlParseTelnetTags(SSmlHandle *info, char *data, char *sqlEnd, SS
info->reRun = true; info->reRun = true;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
if (unlikely(cnt >= taosArrayGetSize(info->maxTagKVs))) {
if (isSameMeasure) {
if (unlikely(cnt >= taosArrayGetSize(maxKVs))) {
info->dataFormat = false; info->dataFormat = false;
info->reRun = true; info->reRun = true;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
SSmlKv *maxKV = (SSmlKv *)taosArrayGet(maxKVs, cnt); SSmlKv *maxKV = (SSmlKv *)taosArrayGet(info->maxTagKVs, cnt);
if (unlikely(kv.length > maxKV->length)) {
maxKV->length = kv.length;
SSmlSTableMeta **tableMeta =
(SSmlSTableMeta **)taosHashGet(info->superTables, elements->measure, elements->measureLen);
if (unlikely(NULL == tableMeta)) {
uError("SML:0x%" PRIx64 " NULL == tableMeta", info->id);
return TSDB_CODE_SML_INTERNAL_ERROR;
}
SSmlKv *oldKV = (SSmlKv *)taosArrayGet((*tableMeta)->tags, cnt);
oldKV->length = kv.length;
info->needModifySchema = true;
}
if (unlikely(!IS_SAME_KEY)) { if (unlikely(!IS_SAME_KEY)) {
info->dataFormat = false; info->dataFormat = false;
info->reRun = true; info->reRun = true;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
} else {
if (isSuperKVInit) {
if (unlikely(cnt >= taosArrayGetSize(superKV))) {
info->dataFormat = false;
info->reRun = true;
return TSDB_CODE_SUCCESS;
}
SSmlKv *maxKV = (SSmlKv *)taosArrayGet(superKV, cnt);
if (unlikely(kv.length > maxKV->length)) { if (unlikely(kv.length > maxKV->length)) {
maxKV->length = kv.length; maxKV->length = kv.length;
} else {
kv.length = maxKV->length;
}
info->needModifySchema = true; info->needModifySchema = true;
if (unlikely(!IS_SAME_KEY)) {
info->dataFormat = false;
info->reRun = true;
return TSDB_CODE_SUCCESS;
}
} else {
taosArrayPush(superKV, &kv);
}
taosArrayPush(maxKVs, &kv);
} }
} else {
taosArrayPush(maxKVs, &kv);
} }
taosArrayPush(preLineKV, &kv); taosArrayPush(preLineKV, &kv);
cnt++; cnt++;
......
...@@ -957,6 +957,46 @@ int sml_ts2164_Test() { ...@@ -957,6 +957,46 @@ int sml_ts2164_Test() {
return code; return code;
} }
int sml_td22898_Test() {
TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 0);
TAOS_RES *pRes =
taos_query(taos, "CREATE DATABASE IF NOT EXISTS line_test BUFFER 384 MINROWS 1000 PAGES 256 PRECISION 'ns'");
taos_free_result(pRes);
const char *sql[] = {
"svlzxdfutx,id=nyavpjyfas,t0=t,t1=127i8,t2=32767i16,t3=2147483647i32,t4=9223372036854775807i64,t5=11.12345f32,t6=22.123456789f64,t7=\"binaryTagValue\",t8=L\"ncharTagValue\" c0=t,c1=127i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c5=11.12345f32,c6=22.123456789f64,c7=\"binaryColValue\",c8=L\"ncharColValue\",c9=7u64 1626006833639"
};
pRes = taos_query(taos, "use line_test");
taos_free_result(pRes);
pRes = taos_schemaless_insert(taos, (char **)sql, sizeof(sql) / sizeof(sql[0]), TSDB_SML_LINE_PROTOCOL,
TSDB_SML_TIMESTAMP_MILLI_SECONDS);
printf("%s result:%s\n", __FUNCTION__, taos_errstr(pRes));
int code = taos_errno(pRes);
taos_free_result(pRes);
const char *sql1[] = {
"svlzxdfutx,id=nyavpjyfas,t0=True,t1=127i8,t2=32767i16,t3=2147483647i32,t4=9223372036854775807i64,t5=11.12345f32,t6=22.123456789f64,t7=\"tgqkvsws\",t8=L\"ncharTagValue\" c0=f,c1=127i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c5=11.12345f32,c6=22.123456789f64,c7=\"htvnnldm\",c8=L\"ncharColValue\",c9=7u64 1626006833639"
};
pRes = taos_schemaless_insert(taos, (char **)sql1, sizeof(sql1) / sizeof(sql1[0]), TSDB_SML_LINE_PROTOCOL,
TSDB_SML_TIMESTAMP_MILLI_SECONDS);
printf("%s result:%s\n", __FUNCTION__, taos_errstr(pRes));
code = taos_errno(pRes);
taos_free_result(pRes);
pRes = taos_query(taos, "select * from svlzxdfutx");
taos_free_result(pRes);
taos_close(taos);
return code;
}
int sml_td22900_Test() { int sml_td22900_Test() {
TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 0); TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 0);
...@@ -1157,10 +1197,12 @@ int main(int argc, char *argv[]) { ...@@ -1157,10 +1197,12 @@ int main(int argc, char *argv[]) {
// printf("str:%s \t %d\n", str[i], smlCalTypeSum(str[i], strlen(str[i]))); // printf("str:%s \t %d\n", str[i], smlCalTypeSum(str[i], strlen(str[i])));
// } // }
// int ret = 0; // int ret = 0;
// ret = sml_ttl_Test(); ret = sml_ttl_Test();
// ASSERT(!ret); ASSERT(!ret);
ret = sml_ts2164_Test(); ret = sml_ts2164_Test();
ASSERT(!ret); ASSERT(!ret);
ret = sml_td22898_Test();
ASSERT(!ret);
ret = sml_td22900_Test(); ret = sml_td22900_Test();
ASSERT(ret); ASSERT(ret);
ret = smlProcess_influx_Test(); ret = smlProcess_influx_Test();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册