diff --git a/src/client/src/tscParseLineProtocol.c b/src/client/src/tscParseLineProtocol.c index 085b949cc102e3752abd41caacdf07985351e1db..0d43e101754e9ae3eb33440005790f238f6664a5 100644 --- a/src/client/src/tscParseLineProtocol.c +++ b/src/client/src/tscParseLineProtocol.c @@ -795,69 +795,69 @@ static int32_t arrangePointsByChildTableName(TAOS_SML_DATA_POINT* points, int nu static int32_t applyChildTableDataPointsWithInsertSQL(TAOS* taos, char* cTableName, char* sTableName, SSmlSTableSchema* sTableSchema, SArray* cTablePoints, size_t rowSize, SSmlLinesInfo* info) { int32_t code = TSDB_CODE_SUCCESS; - size_t numTags = taosArrayGetSize(sTableSchema->tags); - size_t numCols = taosArrayGetSize(sTableSchema->fields); - size_t rows = taosArrayGetSize(cTablePoints); + size_t numTags = taosArrayGetSize(sTableSchema->tags); + size_t numCols = taosArrayGetSize(sTableSchema->fields); + size_t rows = taosArrayGetSize(cTablePoints); SArray* tagsSchema = sTableSchema->tags; SArray* colsSchema = sTableSchema->fields; TAOS_SML_KV* tagKVs[TSDB_MAX_TAGS] = {0}; - for (int i= 0; i < rows; ++i) { - TAOS_SML_DATA_POINT * pDataPoint = taosArrayGetP(cTablePoints, i); + for (int i = 0; i < rows; ++i) { + TAOS_SML_DATA_POINT* pDataPoint = taosArrayGetP(cTablePoints, i); for (int j = 0; j < pDataPoint->tagNum; ++j) { TAOS_SML_KV* kv = pDataPoint->tags + j; tagKVs[kv->fieldSchemaIdx] = kv; } } - char* sql = malloc(tsMaxSQLStringLen+1); + char* sql = malloc(tsMaxSQLStringLen + 1); if (sql == NULL) { tscError("malloc sql memory error"); return TSDB_CODE_TSC_OUT_OF_MEMORY; } - int32_t freeBytes = tsMaxSQLStringLen + 1 ; + int32_t freeBytes = tsMaxSQLStringLen + 1; int32_t totalLen = 0; totalLen += sprintf(sql, "insert into %s using %s (", cTableName, sTableName); for (int i = 0; i < numTags; ++i) { SSchema* tagSchema = taosArrayGet(tagsSchema, i); - totalLen += snprintf(sql+totalLen, freeBytes-totalLen, "%s,", tagSchema->name); + totalLen += snprintf(sql + totalLen, freeBytes - totalLen, "%s,", tagSchema->name); } --totalLen; - totalLen += snprintf(sql + totalLen, freeBytes-totalLen, ")"); + totalLen += snprintf(sql + totalLen, freeBytes - totalLen, ")"); - totalLen += snprintf(sql + totalLen, freeBytes-totalLen, " tags ("); + totalLen += snprintf(sql + totalLen, freeBytes - totalLen, " tags ("); -// for (int i = 0; i < numTags; ++i) { -// snprintf(sql+strlen(sql), freeBytes-strlen(sql), "?,"); -// } + // for (int i = 0; i < numTags; ++i) { + // snprintf(sql+strlen(sql), freeBytes-strlen(sql), "?,"); + // } for (int i = 0; i < numTags; ++i) { if (tagKVs[i] == NULL) { - totalLen += snprintf(sql + totalLen, freeBytes-totalLen, "NULL,"); + totalLen += snprintf(sql + totalLen, freeBytes - totalLen, "NULL,"); } else { - TAOS_SML_KV* kv = tagKVs[i]; - size_t beforeLen = totalLen; - int32_t len = 0; - converToStr(sql+beforeLen, kv->type, kv->value, kv->length, &len); + TAOS_SML_KV* kv = tagKVs[i]; + size_t beforeLen = totalLen; + int32_t len = 0; + converToStr(sql + beforeLen, kv->type, kv->value, kv->length, &len); totalLen += len; - totalLen += snprintf(sql+totalLen, freeBytes-totalLen, ","); + totalLen += snprintf(sql + totalLen, freeBytes - totalLen, ","); } } --totalLen; - totalLen += snprintf(sql + totalLen, freeBytes-totalLen, ") ("); + totalLen += snprintf(sql + totalLen, freeBytes - totalLen, ") ("); for (int i = 0; i < numCols; ++i) { SSchema* colSchema = taosArrayGet(colsSchema, i); - totalLen += snprintf(sql+totalLen, freeBytes-totalLen, "%s,", colSchema->name); + totalLen += snprintf(sql + totalLen, freeBytes - totalLen, "%s,", colSchema->name); } --totalLen; - totalLen += snprintf(sql + totalLen, freeBytes-totalLen, ") values "); + totalLen += snprintf(sql + totalLen, freeBytes - totalLen, ") values "); - TAOS_SML_KV** colKVs = malloc(numCols*sizeof(TAOS_SML_KV*)); + TAOS_SML_KV** colKVs = malloc(numCols * sizeof(TAOS_SML_KV*)); for (int r = 0; r < rows; ++r) { - totalLen += snprintf(sql + totalLen, freeBytes-totalLen, "("); + totalLen += snprintf(sql + totalLen, freeBytes - totalLen, "("); - memset(colKVs, 0, numCols*sizeof(TAOS_SML_KV*)); + memset(colKVs, 0, numCols * sizeof(TAOS_SML_KV*)); TAOS_SML_DATA_POINT* point = taosArrayGetP(cTablePoints, r); for (int i = 0; i < point->fieldNum; ++i) { @@ -867,28 +867,68 @@ static int32_t applyChildTableDataPointsWithInsertSQL(TAOS* taos, char* cTableNa for (int i = 0; i < numCols; ++i) { if (colKVs[i] == NULL) { - totalLen += snprintf(sql + totalLen, freeBytes-totalLen, "NULL,"); + totalLen += snprintf(sql + totalLen, freeBytes - totalLen, "NULL,"); } else { - TAOS_SML_KV* kv = colKVs[i]; - size_t beforeLen = totalLen; - int32_t len = 0; - converToStr(sql+beforeLen, kv->type, kv->value, kv->length, &len); + TAOS_SML_KV* kv = colKVs[i]; + size_t beforeLen = totalLen; + int32_t len = 0; + converToStr(sql + beforeLen, kv->type, kv->value, kv->length, &len); totalLen += len; - totalLen += snprintf(sql+totalLen, freeBytes-totalLen, ","); + totalLen += snprintf(sql + totalLen, freeBytes - totalLen, ","); } } --totalLen; - totalLen += snprintf(sql+totalLen, freeBytes - totalLen, ")"); + totalLen += snprintf(sql + totalLen, freeBytes - totalLen, ")"); } free(colKVs); sql[totalLen] = '\0'; - tscDebug("SML:0x%"PRIx64" insert child table table %s of super table %s sql: %s", info->id, cTableName, sTableName, sql); - TAOS_RES* res = taos_query(taos, sql); + tscDebug("SML:0x%" PRIx64 " insert child table table %s of super table %s sql: %s", info->id, cTableName, sTableName, + sql); + + bool tryAgain = false; + int32_t try = 0; + do { + TAOS_RES* res = taos_query(taos, sql); + code = taos_errno(res); + if (code != 0) { + tscError("SML:0x%"PRIx64 " taos_query return %d:%s", info->id, code, taos_errstr(res)); + } + + tscDebug("SML:0x%"PRIx64 " taos_query inserted %d rows", info->id, taos_affected_rows(res)); + info->affectedRows += taos_affected_rows(res); + taos_free_result(res); + + tryAgain = false; + if ((code == TSDB_CODE_TDB_INVALID_TABLE_ID + || code == TSDB_CODE_VND_INVALID_VGROUP_ID + || code == TSDB_CODE_TDB_TABLE_RECONFIGURE + || code == TSDB_CODE_APP_NOT_READY + || code == TSDB_CODE_RPC_NETWORK_UNAVAIL) && try++ < TSDB_MAX_REPLICA) { + tryAgain = true; + } + + if (code == TSDB_CODE_TDB_INVALID_TABLE_ID || code == TSDB_CODE_VND_INVALID_VGROUP_ID) { + TAOS_RES* res2 = taos_query(taos, "RESET QUERY CACHE"); + int32_t code2 = taos_errno(res2); + if (code2 != TSDB_CODE_SUCCESS) { + tscError("SML:0x%" PRIx64 " insert child table by sql. reset query cache. error: %s", info->id, taos_errstr(res2)); + } + taos_free_result(res2); + if (tryAgain) { + taosMsleep(100 * (2 << try)); + } + } + + if (code == TSDB_CODE_APP_NOT_READY || code == TSDB_CODE_RPC_NETWORK_UNAVAIL) { + if (tryAgain) { + taosMsleep( 100 * (2 << try)); + } + } + } while (tryAgain); + free(sql); - code = taos_errno(res); - info->affectedRows += taos_affected_rows(res); - taos_free_result(res); + return code; }