未验证 提交 09e6ee2e 编写于 作者: D dapan1121 提交者: GitHub

Merge pull request #9174 from taosdata/szhou/hotfix/invalid-schema-after-optimization

TD-12262: fix invalid table id error after insert data points with taos_query
......@@ -795,69 +795,69 @@ static int32_t arrangePointsByChildTableName(TAOS_SML_DATA_POINT* points, int nu
static int32_t applyChildTableDataPointsWithInsertSQL(TAOS* taos, char* cTableName, char* sTableName, SSmlSTableSchema* sTableSchema,
SArray* cTablePoints, size_t rowSize, SSmlLinesInfo* info) {
int32_t code = TSDB_CODE_SUCCESS;
size_t numTags = taosArrayGetSize(sTableSchema->tags);
size_t numCols = taosArrayGetSize(sTableSchema->fields);
size_t rows = taosArrayGetSize(cTablePoints);
size_t numTags = taosArrayGetSize(sTableSchema->tags);
size_t numCols = taosArrayGetSize(sTableSchema->fields);
size_t rows = taosArrayGetSize(cTablePoints);
SArray* tagsSchema = sTableSchema->tags;
SArray* colsSchema = sTableSchema->fields;
TAOS_SML_KV* tagKVs[TSDB_MAX_TAGS] = {0};
for (int i= 0; i < rows; ++i) {
TAOS_SML_DATA_POINT * pDataPoint = taosArrayGetP(cTablePoints, i);
for (int i = 0; i < rows; ++i) {
TAOS_SML_DATA_POINT* pDataPoint = taosArrayGetP(cTablePoints, i);
for (int j = 0; j < pDataPoint->tagNum; ++j) {
TAOS_SML_KV* kv = pDataPoint->tags + j;
tagKVs[kv->fieldSchemaIdx] = kv;
}
}
char* sql = malloc(tsMaxSQLStringLen+1);
char* sql = malloc(tsMaxSQLStringLen + 1);
if (sql == NULL) {
tscError("malloc sql memory error");
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
int32_t freeBytes = tsMaxSQLStringLen + 1 ;
int32_t freeBytes = tsMaxSQLStringLen + 1;
int32_t totalLen = 0;
totalLen += sprintf(sql, "insert into %s using %s (", cTableName, sTableName);
for (int i = 0; i < numTags; ++i) {
SSchema* tagSchema = taosArrayGet(tagsSchema, i);
totalLen += snprintf(sql+totalLen, freeBytes-totalLen, "%s,", tagSchema->name);
totalLen += snprintf(sql + totalLen, freeBytes - totalLen, "%s,", tagSchema->name);
}
--totalLen;
totalLen += snprintf(sql + totalLen, freeBytes-totalLen, ")");
totalLen += snprintf(sql + totalLen, freeBytes - totalLen, ")");
totalLen += snprintf(sql + totalLen, freeBytes-totalLen, " tags (");
totalLen += snprintf(sql + totalLen, freeBytes - totalLen, " tags (");
// for (int i = 0; i < numTags; ++i) {
// snprintf(sql+strlen(sql), freeBytes-strlen(sql), "?,");
// }
// for (int i = 0; i < numTags; ++i) {
// snprintf(sql+strlen(sql), freeBytes-strlen(sql), "?,");
// }
for (int i = 0; i < numTags; ++i) {
if (tagKVs[i] == NULL) {
totalLen += snprintf(sql + totalLen, freeBytes-totalLen, "NULL,");
totalLen += snprintf(sql + totalLen, freeBytes - totalLen, "NULL,");
} else {
TAOS_SML_KV* kv = tagKVs[i];
size_t beforeLen = totalLen;
int32_t len = 0;
converToStr(sql+beforeLen, kv->type, kv->value, kv->length, &len);
TAOS_SML_KV* kv = tagKVs[i];
size_t beforeLen = totalLen;
int32_t len = 0;
converToStr(sql + beforeLen, kv->type, kv->value, kv->length, &len);
totalLen += len;
totalLen += snprintf(sql+totalLen, freeBytes-totalLen, ",");
totalLen += snprintf(sql + totalLen, freeBytes - totalLen, ",");
}
}
--totalLen;
totalLen += snprintf(sql + totalLen, freeBytes-totalLen, ") (");
totalLen += snprintf(sql + totalLen, freeBytes - totalLen, ") (");
for (int i = 0; i < numCols; ++i) {
SSchema* colSchema = taosArrayGet(colsSchema, i);
totalLen += snprintf(sql+totalLen, freeBytes-totalLen, "%s,", colSchema->name);
totalLen += snprintf(sql + totalLen, freeBytes - totalLen, "%s,", colSchema->name);
}
--totalLen;
totalLen += snprintf(sql + totalLen, freeBytes-totalLen, ") values ");
totalLen += snprintf(sql + totalLen, freeBytes - totalLen, ") values ");
TAOS_SML_KV** colKVs = malloc(numCols*sizeof(TAOS_SML_KV*));
TAOS_SML_KV** colKVs = malloc(numCols * sizeof(TAOS_SML_KV*));
for (int r = 0; r < rows; ++r) {
totalLen += snprintf(sql + totalLen, freeBytes-totalLen, "(");
totalLen += snprintf(sql + totalLen, freeBytes - totalLen, "(");
memset(colKVs, 0, numCols*sizeof(TAOS_SML_KV*));
memset(colKVs, 0, numCols * sizeof(TAOS_SML_KV*));
TAOS_SML_DATA_POINT* point = taosArrayGetP(cTablePoints, r);
for (int i = 0; i < point->fieldNum; ++i) {
......@@ -867,28 +867,68 @@ static int32_t applyChildTableDataPointsWithInsertSQL(TAOS* taos, char* cTableNa
for (int i = 0; i < numCols; ++i) {
if (colKVs[i] == NULL) {
totalLen += snprintf(sql + totalLen, freeBytes-totalLen, "NULL,");
totalLen += snprintf(sql + totalLen, freeBytes - totalLen, "NULL,");
} else {
TAOS_SML_KV* kv = colKVs[i];
size_t beforeLen = totalLen;
int32_t len = 0;
converToStr(sql+beforeLen, kv->type, kv->value, kv->length, &len);
TAOS_SML_KV* kv = colKVs[i];
size_t beforeLen = totalLen;
int32_t len = 0;
converToStr(sql + beforeLen, kv->type, kv->value, kv->length, &len);
totalLen += len;
totalLen += snprintf(sql+totalLen, freeBytes-totalLen, ",");
totalLen += snprintf(sql + totalLen, freeBytes - totalLen, ",");
}
}
--totalLen;
totalLen += snprintf(sql+totalLen, freeBytes - totalLen, ")");
totalLen += snprintf(sql + totalLen, freeBytes - totalLen, ")");
}
free(colKVs);
sql[totalLen] = '\0';
tscDebug("SML:0x%"PRIx64" insert child table table %s of super table %s sql: %s", info->id, cTableName, sTableName, sql);
TAOS_RES* res = taos_query(taos, sql);
tscDebug("SML:0x%" PRIx64 " insert child table table %s of super table %s sql: %s", info->id, cTableName, sTableName,
sql);
bool tryAgain = false;
int32_t try = 0;
do {
TAOS_RES* res = taos_query(taos, sql);
code = taos_errno(res);
if (code != 0) {
tscError("SML:0x%"PRIx64 " taos_query return %d:%s", info->id, code, taos_errstr(res));
}
tscDebug("SML:0x%"PRIx64 " taos_query inserted %d rows", info->id, taos_affected_rows(res));
info->affectedRows += taos_affected_rows(res);
taos_free_result(res);
tryAgain = false;
if ((code == TSDB_CODE_TDB_INVALID_TABLE_ID
|| code == TSDB_CODE_VND_INVALID_VGROUP_ID
|| code == TSDB_CODE_TDB_TABLE_RECONFIGURE
|| code == TSDB_CODE_APP_NOT_READY
|| code == TSDB_CODE_RPC_NETWORK_UNAVAIL) && try++ < TSDB_MAX_REPLICA) {
tryAgain = true;
}
if (code == TSDB_CODE_TDB_INVALID_TABLE_ID || code == TSDB_CODE_VND_INVALID_VGROUP_ID) {
TAOS_RES* res2 = taos_query(taos, "RESET QUERY CACHE");
int32_t code2 = taos_errno(res2);
if (code2 != TSDB_CODE_SUCCESS) {
tscError("SML:0x%" PRIx64 " insert child table by sql. reset query cache. error: %s", info->id, taos_errstr(res2));
}
taos_free_result(res2);
if (tryAgain) {
taosMsleep(100 * (2 << try));
}
}
if (code == TSDB_CODE_APP_NOT_READY || code == TSDB_CODE_RPC_NETWORK_UNAVAIL) {
if (tryAgain) {
taosMsleep( 100 * (2 << try));
}
}
} while (tryAgain);
free(sql);
code = taos_errno(res);
info->affectedRows += taos_affected_rows(res);
taos_free_result(res);
return code;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册