提交 32b1566d 编写于 作者: S shenglian zhou

fix invalid table id during insert data points with taos_query through sql

上级 664b2a46
...@@ -387,7 +387,7 @@ static int32_t applySchemaAction(TAOS* taos, SSchemaAction* action, SSmlLinesInf ...@@ -387,7 +387,7 @@ static int32_t applySchemaAction(TAOS* taos, SSchemaAction* action, SSmlLinesInf
tscError("SML:0x%" PRIx64 " apply schema action. reset query cache. error: %s", info->id, taos_errstr(res2)); tscError("SML:0x%" PRIx64 " apply schema action. reset query cache. error: %s", info->id, taos_errstr(res2));
} }
taos_free_result(res2); taos_free_result(res2);
taosMsleep(500); taosMsleep(5000);
} }
break; break;
} }
...@@ -412,7 +412,7 @@ static int32_t applySchemaAction(TAOS* taos, SSchemaAction* action, SSmlLinesInf ...@@ -412,7 +412,7 @@ static int32_t applySchemaAction(TAOS* taos, SSchemaAction* action, SSmlLinesInf
tscError("SML:0x%" PRIx64 " apply schema action. reset query cache. error: %s", info->id, taos_errstr(res2)); tscError("SML:0x%" PRIx64 " apply schema action. reset query cache. error: %s", info->id, taos_errstr(res2));
} }
taos_free_result(res2); taos_free_result(res2);
taosMsleep(500); taosMsleep(5000);
} }
break; break;
} }
...@@ -434,7 +434,7 @@ static int32_t applySchemaAction(TAOS* taos, SSchemaAction* action, SSmlLinesInf ...@@ -434,7 +434,7 @@ static int32_t applySchemaAction(TAOS* taos, SSchemaAction* action, SSmlLinesInf
tscError("SML:0x%" PRIx64 " apply schema action. reset query cache. error: %s", info->id, taos_errstr(res2)); tscError("SML:0x%" PRIx64 " apply schema action. reset query cache. error: %s", info->id, taos_errstr(res2));
} }
taos_free_result(res2); taos_free_result(res2);
taosMsleep(500); taosMsleep(5000);
} }
break; break;
} }
...@@ -456,7 +456,7 @@ static int32_t applySchemaAction(TAOS* taos, SSchemaAction* action, SSmlLinesInf ...@@ -456,7 +456,7 @@ static int32_t applySchemaAction(TAOS* taos, SSchemaAction* action, SSmlLinesInf
tscError("SML:0x%" PRIx64 " apply schema action. reset query cache. error: %s", info->id, taos_errstr(res2)); tscError("SML:0x%" PRIx64 " apply schema action. reset query cache. error: %s", info->id, taos_errstr(res2));
} }
taos_free_result(res2); taos_free_result(res2);
taosMsleep(500); taosMsleep(5000);
} }
break; break;
} }
...@@ -498,7 +498,7 @@ static int32_t applySchemaAction(TAOS* taos, SSchemaAction* action, SSmlLinesInf ...@@ -498,7 +498,7 @@ static int32_t applySchemaAction(TAOS* taos, SSchemaAction* action, SSmlLinesInf
tscError("SML:0x%" PRIx64 " apply schema action. reset query cache. error: %s", info->id, taos_errstr(res2)); tscError("SML:0x%" PRIx64 " apply schema action. reset query cache. error: %s", info->id, taos_errstr(res2));
} }
taos_free_result(res2); taos_free_result(res2);
taosMsleep(500); taosMsleep(5000);
} }
break; break;
} }
...@@ -795,69 +795,69 @@ static int32_t arrangePointsByChildTableName(TAOS_SML_DATA_POINT* points, int nu ...@@ -795,69 +795,69 @@ static int32_t arrangePointsByChildTableName(TAOS_SML_DATA_POINT* points, int nu
static int32_t applyChildTableDataPointsWithInsertSQL(TAOS* taos, char* cTableName, char* sTableName, SSmlSTableSchema* sTableSchema, static int32_t applyChildTableDataPointsWithInsertSQL(TAOS* taos, char* cTableName, char* sTableName, SSmlSTableSchema* sTableSchema,
SArray* cTablePoints, size_t rowSize, SSmlLinesInfo* info) { SArray* cTablePoints, size_t rowSize, SSmlLinesInfo* info) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
size_t numTags = taosArrayGetSize(sTableSchema->tags); size_t numTags = taosArrayGetSize(sTableSchema->tags);
size_t numCols = taosArrayGetSize(sTableSchema->fields); size_t numCols = taosArrayGetSize(sTableSchema->fields);
size_t rows = taosArrayGetSize(cTablePoints); size_t rows = taosArrayGetSize(cTablePoints);
SArray* tagsSchema = sTableSchema->tags; SArray* tagsSchema = sTableSchema->tags;
SArray* colsSchema = sTableSchema->fields; SArray* colsSchema = sTableSchema->fields;
TAOS_SML_KV* tagKVs[TSDB_MAX_TAGS] = {0}; TAOS_SML_KV* tagKVs[TSDB_MAX_TAGS] = {0};
for (int i= 0; i < rows; ++i) { for (int i = 0; i < rows; ++i) {
TAOS_SML_DATA_POINT * pDataPoint = taosArrayGetP(cTablePoints, i); TAOS_SML_DATA_POINT* pDataPoint = taosArrayGetP(cTablePoints, i);
for (int j = 0; j < pDataPoint->tagNum; ++j) { for (int j = 0; j < pDataPoint->tagNum; ++j) {
TAOS_SML_KV* kv = pDataPoint->tags + j; TAOS_SML_KV* kv = pDataPoint->tags + j;
tagKVs[kv->fieldSchemaIdx] = kv; tagKVs[kv->fieldSchemaIdx] = kv;
} }
} }
char* sql = malloc(tsMaxSQLStringLen+1); char* sql = malloc(tsMaxSQLStringLen + 1);
if (sql == NULL) { if (sql == NULL) {
tscError("malloc sql memory error"); tscError("malloc sql memory error");
return TSDB_CODE_TSC_OUT_OF_MEMORY; return TSDB_CODE_TSC_OUT_OF_MEMORY;
} }
int32_t freeBytes = tsMaxSQLStringLen + 1 ; int32_t freeBytes = tsMaxSQLStringLen + 1;
int32_t totalLen = 0; int32_t totalLen = 0;
totalLen += sprintf(sql, "insert into %s using %s (", cTableName, sTableName); totalLen += sprintf(sql, "insert into %s using %s (", cTableName, sTableName);
for (int i = 0; i < numTags; ++i) { for (int i = 0; i < numTags; ++i) {
SSchema* tagSchema = taosArrayGet(tagsSchema, i); SSchema* tagSchema = taosArrayGet(tagsSchema, i);
totalLen += snprintf(sql+totalLen, freeBytes-totalLen, "%s,", tagSchema->name); totalLen += snprintf(sql + totalLen, freeBytes - totalLen, "%s,", tagSchema->name);
} }
--totalLen; --totalLen;
totalLen += snprintf(sql + totalLen, freeBytes-totalLen, ")"); totalLen += snprintf(sql + totalLen, freeBytes - totalLen, ")");
totalLen += snprintf(sql + totalLen, freeBytes-totalLen, " tags ("); totalLen += snprintf(sql + totalLen, freeBytes - totalLen, " tags (");
// for (int i = 0; i < numTags; ++i) { // for (int i = 0; i < numTags; ++i) {
// snprintf(sql+strlen(sql), freeBytes-strlen(sql), "?,"); // snprintf(sql+strlen(sql), freeBytes-strlen(sql), "?,");
// } // }
for (int i = 0; i < numTags; ++i) { for (int i = 0; i < numTags; ++i) {
if (tagKVs[i] == NULL) { if (tagKVs[i] == NULL) {
totalLen += snprintf(sql + totalLen, freeBytes-totalLen, "NULL,"); totalLen += snprintf(sql + totalLen, freeBytes - totalLen, "NULL,");
} else { } else {
TAOS_SML_KV* kv = tagKVs[i]; TAOS_SML_KV* kv = tagKVs[i];
size_t beforeLen = totalLen; size_t beforeLen = totalLen;
int32_t len = 0; int32_t len = 0;
converToStr(sql+beforeLen, kv->type, kv->value, kv->length, &len); converToStr(sql + beforeLen, kv->type, kv->value, kv->length, &len);
totalLen += len; totalLen += len;
totalLen += snprintf(sql+totalLen, freeBytes-totalLen, ","); totalLen += snprintf(sql + totalLen, freeBytes - totalLen, ",");
} }
} }
--totalLen; --totalLen;
totalLen += snprintf(sql + totalLen, freeBytes-totalLen, ") ("); totalLen += snprintf(sql + totalLen, freeBytes - totalLen, ") (");
for (int i = 0; i < numCols; ++i) { for (int i = 0; i < numCols; ++i) {
SSchema* colSchema = taosArrayGet(colsSchema, i); SSchema* colSchema = taosArrayGet(colsSchema, i);
totalLen += snprintf(sql+totalLen, freeBytes-totalLen, "%s,", colSchema->name); totalLen += snprintf(sql + totalLen, freeBytes - totalLen, "%s,", colSchema->name);
} }
--totalLen; --totalLen;
totalLen += snprintf(sql + totalLen, freeBytes-totalLen, ") values "); totalLen += snprintf(sql + totalLen, freeBytes - totalLen, ") values ");
TAOS_SML_KV** colKVs = malloc(numCols*sizeof(TAOS_SML_KV*)); TAOS_SML_KV** colKVs = malloc(numCols * sizeof(TAOS_SML_KV*));
for (int r = 0; r < rows; ++r) { for (int r = 0; r < rows; ++r) {
totalLen += snprintf(sql + totalLen, freeBytes-totalLen, "("); totalLen += snprintf(sql + totalLen, freeBytes - totalLen, "(");
memset(colKVs, 0, numCols*sizeof(TAOS_SML_KV*)); memset(colKVs, 0, numCols * sizeof(TAOS_SML_KV*));
TAOS_SML_DATA_POINT* point = taosArrayGetP(cTablePoints, r); TAOS_SML_DATA_POINT* point = taosArrayGetP(cTablePoints, r);
for (int i = 0; i < point->fieldNum; ++i) { for (int i = 0; i < point->fieldNum; ++i) {
...@@ -867,28 +867,68 @@ static int32_t applyChildTableDataPointsWithInsertSQL(TAOS* taos, char* cTableNa ...@@ -867,28 +867,68 @@ static int32_t applyChildTableDataPointsWithInsertSQL(TAOS* taos, char* cTableNa
for (int i = 0; i < numCols; ++i) { for (int i = 0; i < numCols; ++i) {
if (colKVs[i] == NULL) { if (colKVs[i] == NULL) {
totalLen += snprintf(sql + totalLen, freeBytes-totalLen, "NULL,"); totalLen += snprintf(sql + totalLen, freeBytes - totalLen, "NULL,");
} else { } else {
TAOS_SML_KV* kv = colKVs[i]; TAOS_SML_KV* kv = colKVs[i];
size_t beforeLen = totalLen; size_t beforeLen = totalLen;
int32_t len = 0; int32_t len = 0;
converToStr(sql+beforeLen, kv->type, kv->value, kv->length, &len); converToStr(sql + beforeLen, kv->type, kv->value, kv->length, &len);
totalLen += len; totalLen += len;
totalLen += snprintf(sql+totalLen, freeBytes-totalLen, ","); totalLen += snprintf(sql + totalLen, freeBytes - totalLen, ",");
} }
} }
--totalLen; --totalLen;
totalLen += snprintf(sql+totalLen, freeBytes - totalLen, ")"); totalLen += snprintf(sql + totalLen, freeBytes - totalLen, ")");
} }
free(colKVs); free(colKVs);
sql[totalLen] = '\0'; sql[totalLen] = '\0';
tscDebug("SML:0x%"PRIx64" insert child table table %s of super table %s sql: %s", info->id, cTableName, sTableName, sql); tscDebug("SML:0x%" PRIx64 " insert child table table %s of super table %s sql: %s", info->id, cTableName, sTableName,
TAOS_RES* res = taos_query(taos, sql); sql);
bool tryAgain = false;
int32_t try = 0;
do {
TAOS_RES* res = taos_query(taos, sql);
code = taos_errno(res);
if (code != 0) {
tscError("SML:0x%"PRIx64 " taos_query return %d:%s", info->id, code, taos_errstr(res));
}
tscDebug("SML:0x%"PRIx64 " taos_query inserted %d rows", info->id, taos_affected_rows(res));
info->affectedRows += taos_affected_rows(res);
taos_free_result(res);
tryAgain = false;
if ((code == TSDB_CODE_TDB_INVALID_TABLE_ID
|| code == TSDB_CODE_VND_INVALID_VGROUP_ID
|| code == TSDB_CODE_TDB_TABLE_RECONFIGURE
|| code == TSDB_CODE_APP_NOT_READY
|| code == TSDB_CODE_RPC_NETWORK_UNAVAIL) && try++ < TSDB_MAX_REPLICA) {
tryAgain = true;
}
if (code == TSDB_CODE_TDB_INVALID_TABLE_ID || code == TSDB_CODE_VND_INVALID_VGROUP_ID) {
TAOS_RES* res2 = taos_query(taos, "RESET QUERY CACHE");
int32_t code2 = taos_errno(res2);
if (code2 != TSDB_CODE_SUCCESS) {
tscError("SML:0x%" PRIx64 " insert child table by sql. reset query cache. error: %s", info->id, taos_errstr(res2));
}
taos_free_result(res2);
if (tryAgain) {
taosMsleep(100 * (2 << try));
}
}
if (code == TSDB_CODE_APP_NOT_READY || code == TSDB_CODE_RPC_NETWORK_UNAVAIL) {
if (tryAgain) {
taosMsleep( 100 * (2 << try));
}
}
} while (tryAgain);
free(sql); free(sql);
code = taos_errno(res);
info->affectedRows += taos_affected_rows(res);
taos_free_result(res);
return code; return code;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册