diff --git a/src/client/src/tscParseLineProtocol.c b/src/client/src/tscParseLineProtocol.c index 60e4e2334cc466fa62cae9ba84fcf30dc3fdddf4..3488bdbc78a76c18154029c387f62b6645c17652 100644 --- a/src/client/src/tscParseLineProtocol.c +++ b/src/client/src/tscParseLineProtocol.c @@ -792,6 +792,214 @@ static int32_t arrangePointsByChildTableName(TAOS_SML_DATA_POINT* points, int nu return 0; } +static int32_t addChildTableDataPointsToInsertSql(char* cTableName, char* sTableName, SSmlSTableSchema* sTableSchema, SArray* cTablePoints, + char* sql, int32_t capacity, int32_t* cTableSqlLen, SSmlLinesInfo* info) { + size_t numTags = taosArrayGetSize(sTableSchema->tags); + size_t numCols = taosArrayGetSize(sTableSchema->fields); + size_t rows = taosArrayGetSize(cTablePoints); + SArray* tagsSchema = sTableSchema->tags; + SArray* colsSchema = sTableSchema->fields; + + TAOS_SML_KV* tagKVs[TSDB_MAX_TAGS] = {0}; + for (int i = 0; i < rows; ++i) { + TAOS_SML_DATA_POINT* pDataPoint = taosArrayGetP(cTablePoints, i); + for (int j = 0; j < pDataPoint->tagNum; ++j) { + TAOS_SML_KV* kv = pDataPoint->tags + j; + tagKVs[kv->fieldSchemaIdx] = kv; + } + } + + int32_t freeBytes = capacity; + int32_t totalLen = 0; + totalLen += sprintf(sql, " %s using %s (", cTableName, sTableName); + for (int i = 0; i < numTags; ++i) { + SSchema* tagSchema = taosArrayGet(tagsSchema, i); + totalLen += snprintf(sql + totalLen, freeBytes - totalLen, "%s,", tagSchema->name); + } + --totalLen; + totalLen += snprintf(sql + totalLen, freeBytes - totalLen, ")"); + + totalLen += snprintf(sql + totalLen, freeBytes - totalLen, " tags ("); + + for (int i = 0; i < numTags; ++i) { + if (tagKVs[i] == NULL) { + totalLen += snprintf(sql + totalLen, freeBytes - totalLen, "NULL,"); + } else { + TAOS_SML_KV* kv = tagKVs[i]; + size_t beforeLen = totalLen; + int32_t len = 0; + converToStr(sql + beforeLen, kv->type, kv->value, kv->length, &len); + totalLen += len; + totalLen += snprintf(sql + totalLen, freeBytes - totalLen, ","); + } + } + --totalLen; + totalLen += snprintf(sql + totalLen, freeBytes - totalLen, ") ("); + + for (int i = 0; i < numCols; ++i) { + SSchema* colSchema = taosArrayGet(colsSchema, i); + totalLen += snprintf(sql + totalLen, freeBytes - totalLen, "%s,", colSchema->name); + } + --totalLen; + totalLen += snprintf(sql + totalLen, freeBytes - totalLen, ") values "); + + TAOS_SML_KV** colKVs = malloc(numCols * sizeof(TAOS_SML_KV*)); + for (int r = 0; r < rows; ++r) { + totalLen += snprintf(sql + totalLen, freeBytes - totalLen, "("); + + memset(colKVs, 0, numCols * sizeof(TAOS_SML_KV*)); + + TAOS_SML_DATA_POINT* point = taosArrayGetP(cTablePoints, r); + for (int i = 0; i < point->fieldNum; ++i) { + TAOS_SML_KV* kv = point->fields + i; + colKVs[kv->fieldSchemaIdx] = kv; + } + + for (int i = 0; i < numCols; ++i) { + if (colKVs[i] == NULL) { + totalLen += snprintf(sql + totalLen, freeBytes - totalLen, "NULL,"); + } else { + TAOS_SML_KV* kv = colKVs[i]; + size_t beforeLen = totalLen; + int32_t len = 0; + converToStr(sql + beforeLen, kv->type, kv->value, kv->length, &len); + totalLen += len; + totalLen += snprintf(sql + totalLen, freeBytes - totalLen, ","); + } + } + --totalLen; + totalLen += snprintf(sql + totalLen, freeBytes - totalLen, ")"); + } + free(colKVs); + + *cTableSqlLen = totalLen; + + return 0; +} + +static int32_t doRunInsertSQL(TAOS* taos, char* sql, SSmlLinesInfo* info) { + int32_t code = 0; + bool tryAgain = false; + int32_t try = 0; + do { + TAOS_RES* res = taos_query(taos, sql); + code = taos_errno(res); + if (code != 0) { + tscError("SML:0x%"PRIx64 " taos_query return %d:%s", info->id, code, taos_errstr(res)); + } + + tscDebug("SML:0x%"PRIx64 " taos_query inserted %d rows", info->id, taos_affected_rows(res)); + info->affectedRows += taos_affected_rows(res); + taos_free_result(res); + + tryAgain = false; + if ((code == TSDB_CODE_TDB_INVALID_TABLE_ID + || code == TSDB_CODE_VND_INVALID_VGROUP_ID + || code == TSDB_CODE_TDB_TABLE_RECONFIGURE + || code == TSDB_CODE_APP_NOT_READY + || code == TSDB_CODE_RPC_NETWORK_UNAVAIL) && try++ < TSDB_MAX_REPLICA) { + tryAgain = true; + } + + if (code == TSDB_CODE_TDB_INVALID_TABLE_ID || code == TSDB_CODE_VND_INVALID_VGROUP_ID) { + TAOS_RES* res2 = taos_query(taos, "RESET QUERY CACHE"); + int32_t code2 = taos_errno(res2); + if (code2 != TSDB_CODE_SUCCESS) { + tscError("SML:0x%" PRIx64 " insert child table by sql. reset query cache. error: %s", info->id, taos_errstr(res2)); + } + taos_free_result(res2); + if (tryAgain) { + taosMsleep(100 * (2 << try)); + } + } + + if (code == TSDB_CODE_APP_NOT_READY || code == TSDB_CODE_RPC_NETWORK_UNAVAIL) { + if (tryAgain) { + taosMsleep( 100 * (2 << try)); + } + } + } while (tryAgain); + + free(sql); + + return code; + +} + +static int32_t applyDataPointsWithSqlInsert(TAOS* taos, TAOS_SML_DATA_POINT* points, int32_t numPoints, SArray* stableSchemas, SSmlLinesInfo* info) { + int32_t code = TSDB_CODE_SUCCESS; + + SHashObj* cname2points = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false); + arrangePointsByChildTableName(points, numPoints, cname2points, stableSchemas, info); + + char* sql = malloc(tsMaxSQLStringLen + 1); + if (sql == NULL) { + tscError("malloc sql memory error"); + return TSDB_CODE_TSC_OUT_OF_MEMORY; + } + int32_t freeBytes = tsMaxSQLStringLen; + int32_t usedBytes = 0; + usedBytes += sprintf(sql, "insert into"); + freeBytes -= usedBytes; + + SArray** pCTablePoints = taosHashIterate(cname2points, NULL); + while (pCTablePoints) { + SArray* cTablePoints = *pCTablePoints; + + TAOS_SML_DATA_POINT* point = taosArrayGetP(cTablePoints, 0); + SSmlSTableSchema* sTableSchema = taosArrayGet(stableSchemas, point->schemaIdx); + + size_t rowSize = 0; + size_t numCols = taosArrayGetSize(sTableSchema->fields); + for (int i = 0; i < numCols; ++i) { + SSchema* colSchema = taosArrayGet(sTableSchema->fields, i); + rowSize += colSchema->bytes; + } + + tscDebug("SML:0x%"PRIx64" add child table points to SQL. child table: %s of super table %s, row size: %zu", + info->id, point->childTableName, point->stableName, rowSize); + int32_t cTableSqlLen = 0; + code = addChildTableDataPointsToInsertSql(point->childTableName, point->stableName, sTableSchema, cTablePoints, sql+usedBytes, freeBytes, &cTableSqlLen, info); + if (cTableSqlLen < freeBytes) { + usedBytes += cTableSqlLen; + freeBytes -= cTableSqlLen; + } else { + sql[usedBytes] = '\0'; + code = doRunInsertSQL(taos, sql, info); + if (code != 0) { + tscError("SML:0x%"PRIx64" Apply points failed. sql: %s, error: %s", info->id, sql, tstrerror(code)); + goto cleanup; + } + freeBytes = tsMaxSQLStringLen; + usedBytes = 0; + usedBytes += sprintf(sql, "insert into"); + freeBytes -= usedBytes; + //TODO deal with one child table rows exceeds columns + code = addChildTableDataPointsToInsertSql(point->childTableName, point->stableName, sTableSchema, cTablePoints, sql+usedBytes, freeBytes, &cTableSqlLen, info); + } + + pCTablePoints = taosHashIterate(cname2points, pCTablePoints); + } + + sql[usedBytes] = '\0'; + code = doRunInsertSQL(taos, sql, info); + if (code != 0) { + tscError("SML:0x%"PRIx64" Apply points failed. sql: %s, error: %s", info->id, sql, tstrerror(code)); + goto cleanup; + } + tscDebug("SML:0x%"PRIx64" successfully applied data points", info->id); + +cleanup: + pCTablePoints = taosHashIterate(cname2points, NULL); + while (pCTablePoints) { + SArray* pPoints = *pCTablePoints; + taosArrayDestroy(&pPoints); + pCTablePoints = taosHashIterate(cname2points, pCTablePoints); + } + taosHashCleanup(cname2points); + return code; +} + static int32_t applyChildTableDataPointsWithInsertSQL(TAOS* taos, char* cTableName, char* sTableName, SSmlSTableSchema* sTableSchema, SArray* cTablePoints, size_t rowSize, SSmlLinesInfo* info) { int32_t code = TSDB_CODE_SUCCESS; @@ -1328,7 +1536,12 @@ int tscSmlInsert(TAOS* taos, TAOS_SML_DATA_POINT* points, int numPoint, SSmlLine } tscDebug("SML:0x%"PRIx64" apply data points", info->id); - code = applyDataPoints(taos, points, numPoint, stableSchemas, info); + bool tableByTable = false; + if (tableByTable) { + code = applyDataPoints(taos, points, numPoint, stableSchemas, info); + } else { + code = applyDataPointsWithSqlInsert(taos, points, numPoint, stableSchemas, info); + } if (code != 0) { tscError("SML:0x%"PRIx64" error apply data points : %s", info->id, tstrerror(code)); }