From b4eaa71dbe5c1724c5d0c735baaee42b0aced395 Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Fri, 27 Aug 2021 20:25:08 +0800 Subject: [PATCH] schemaless:batch insize to avoid exceeding tsdb wal size --- src/client/src/tscParseLineProtocol.c | 106 +++++++++++++++++--------- 1 file changed, 69 insertions(+), 37 deletions(-) diff --git a/src/client/src/tscParseLineProtocol.c b/src/client/src/tscParseLineProtocol.c index fc665c77b6..78eea92c57 100644 --- a/src/client/src/tscParseLineProtocol.c +++ b/src/client/src/tscParseLineProtocol.c @@ -781,41 +781,15 @@ static int32_t creatChildTableIfNotExists(TAOS* taos, const char* cTableName, co return code; } -static int32_t insertChildTableBatch(TAOS* taos, char* cTableName, SArray* colsSchema, SArray* rowsBind, SSmlLinesInfo* info) { - size_t numCols = taosArrayGetSize(colsSchema); - char* sql = malloc(tsMaxSQLStringLen+1); - if (sql == NULL) { - tscError("malloc sql memory error"); - return TSDB_CODE_TSC_OUT_OF_MEMORY; - } - - int32_t freeBytes = tsMaxSQLStringLen + 1 ; - sprintf(sql, "insert into ? ("); - - for (int i = 0; i < numCols; ++i) { - SSchema* colSchema = taosArrayGet(colsSchema, i); - snprintf(sql+strlen(sql), freeBytes-strlen(sql), "%s,", colSchema->name); - } - snprintf(sql + strlen(sql)-1, freeBytes-strlen(sql)+1, ") values ("); - - for (int i = 0; i < numCols; ++i) { - snprintf(sql+strlen(sql), freeBytes-strlen(sql), "?,"); - } - snprintf(sql + strlen(sql)-1, freeBytes-strlen(sql)+1, ")"); - sql[strlen(sql)] = '\0'; - - tscDebug("SML:0x%"PRIx64" insert rows into child table %s. num of rows: %zu", info->id, cTableName, taosArrayGetSize(rowsBind)); - +static int32_t doInsertChildTableWithStmt(TAOS* taos, char* sql, char* cTableName, SArray* batchBind, SSmlLinesInfo* info) { int32_t code = 0; TAOS_STMT* stmt = taos_stmt_init(taos); if (stmt == NULL) { - tfree(sql); return TSDB_CODE_TSC_OUT_OF_MEMORY; } code = taos_stmt_prepare(stmt, sql, (unsigned long)strlen(sql)); - tfree(sql); if (code != 0) { tscError("SML:0x%"PRIx64" taos_stmt_prepare return %d:%s", info->id, code, tstrerror(code)); @@ -833,9 +807,9 @@ static int32_t insertChildTableBatch(TAOS* taos, char* cTableName, SArray* cols return code; } - size_t rows = taosArrayGetSize(rowsBind); + size_t rows = taosArrayGetSize(batchBind); for (int32_t i = 0; i < rows; ++i) { - TAOS_BIND* colsBinds = taosArrayGetP(rowsBind, i); + TAOS_BIND* colsBinds = taosArrayGetP(batchBind, i); code = taos_stmt_bind_param(stmt, colsBinds); if (code != 0) { tscError("SML:0x%"PRIx64" taos_stmt_bind_param return %d:%s", info->id, code, tstrerror(code)); @@ -857,10 +831,10 @@ static int32_t insertChildTableBatch(TAOS* taos, char* cTableName, SArray* cols tryAgain = false; if ((code == TSDB_CODE_TDB_INVALID_TABLE_ID - || code == TSDB_CODE_VND_INVALID_VGROUP_ID - || code == TSDB_CODE_TDB_TABLE_RECONFIGURE - || code == TSDB_CODE_APP_NOT_READY - || code == TSDB_CODE_RPC_NETWORK_UNAVAIL) && try++ < TSDB_MAX_REPLICA) { + || code == TSDB_CODE_VND_INVALID_VGROUP_ID + || code == TSDB_CODE_TDB_TABLE_RECONFIGURE + || code == TSDB_CODE_APP_NOT_READY + || code == TSDB_CODE_RPC_NETWORK_UNAVAIL) && try++ < TSDB_MAX_REPLICA) { tryAgain = true; } @@ -887,6 +861,57 @@ static int32_t insertChildTableBatch(TAOS* taos, char* cTableName, SArray* cols return code; } +static int32_t insertChildTableBatch(TAOS* taos, char* cTableName, SArray* colsSchema, SArray* rowsBind, size_t rowSize, SSmlLinesInfo* info) { + size_t numCols = taosArrayGetSize(colsSchema); + char* sql = malloc(tsMaxSQLStringLen+1); + if (sql == NULL) { + tscError("malloc sql memory error"); + return TSDB_CODE_TSC_OUT_OF_MEMORY; + } + + int32_t freeBytes = tsMaxSQLStringLen + 1 ; + sprintf(sql, "insert into ? ("); + + for (int i = 0; i < numCols; ++i) { + SSchema* colSchema = taosArrayGet(colsSchema, i); + snprintf(sql+strlen(sql), freeBytes-strlen(sql), "%s,", colSchema->name); + } + snprintf(sql + strlen(sql)-1, freeBytes-strlen(sql)+1, ") values ("); + + for (int i = 0; i < numCols; ++i) { + snprintf(sql+strlen(sql), freeBytes-strlen(sql), "?,"); + } + snprintf(sql + strlen(sql)-1, freeBytes-strlen(sql)+1, ")"); + sql[strlen(sql)] = '\0'; + + tscDebug("SML:0x%"PRIx64" insert rows into child table %s. num of rows: %zu", info->id, cTableName, taosArrayGetSize(rowsBind)); + + size_t rows = taosArrayGetSize(rowsBind); + int32_t maxBatchSize = TSDB_MAX_WAL_SIZE/rowSize * 4 / 5; + int32_t batchSize = MIN(maxBatchSize, rows); + SArray* batchBind = taosArrayInit(batchSize, POINTER_BYTES); + int32_t code = TSDB_CODE_SUCCESS; + for (int i=0; i=i) { + code = doInsertChildTableWithStmt(taos, sql, cTableName, batchBind, info); + if (code != 0) { + taosArrayDestroy(batchBind); + tfree(sql); + return code; + } + taosArrayClear(batchBind); + } + i = j; + } + taosArrayDestroy(batchBind); + tfree(sql); + return code; +} + static int32_t arrangePointsByChildTableName(TAOS_SML_DATA_POINT* points, int numPoints, SHashObj* cname2points, SArray* stableSchemas, SSmlLinesInfo* info) { for (int32_t i = 0; i < numPoints; ++i) { @@ -977,7 +1002,7 @@ static int32_t applyChildTableTags(TAOS* taos, char* cTableName, char* sTableNam } static int32_t applyChildTableFields(TAOS* taos, SSmlSTableSchema* sTableSchema, char* cTableName, - SArray* cTablePoints, SSmlLinesInfo* info) { + SArray* cTablePoints, size_t rowSize, SSmlLinesInfo* info) { int32_t code = TSDB_CODE_SUCCESS; size_t numCols = taosArrayGetSize(sTableSchema->fields); @@ -1014,7 +1039,7 @@ static int32_t applyChildTableFields(TAOS* taos, SSmlSTableSchema* sTableSchema, taosArrayPush(rowsBind, &colBinds); } - code = insertChildTableBatch(taos, cTableName, sTableSchema->fields, rowsBind, info); + code = insertChildTableBatch(taos, cTableName, sTableSchema->fields, rowsBind, rowSize, info); if (code != 0) { tscError("SML:0x%"PRIx64" insert into child table %s failed. error %s", info->id, cTableName, tstrerror(code)); } @@ -1054,8 +1079,15 @@ static int32_t applyDataPoints(TAOS* taos, TAOS_SML_DATA_POINT* points, int32_t goto cleanup; } - tscDebug("SML:0x%"PRIx64" apply child table points. child table: %s", info->id, point->childTableName); - code = applyChildTableFields(taos, sTableSchema, point->childTableName, cTablePoints, info); + size_t rowSize = 0; + size_t numCols = taosArrayGetSize(sTableSchema->fields); + for (int i = 0; i < numCols; ++i) { + SSchema* colSchema = taosArrayGet(sTableSchema->fields, i); + rowSize += colSchema->bytes; + } + + tscDebug("SML:0x%"PRIx64" apply child table points. child table: %s, row size: %zu", info->id, point->childTableName, rowSize); + code = applyChildTableFields(taos, sTableSchema, point->childTableName, cTablePoints, rowSize, info); if (code != 0) { tscError("SML:0x%"PRIx64" Apply child table fields failed. child table %s, error %s", info->id, point->childTableName, tstrerror(code)); goto cleanup; -- GitLab