diff --git a/src/client/src/tscParseLineProtocol.c b/src/client/src/tscParseLineProtocol.c index 611038d98e46e7cc36f7b3889901ee57f28de20a..f7eafb094ee6b9900f29baa74b22b34fbb8030ea 100644 --- a/src/client/src/tscParseLineProtocol.c +++ b/src/client/src/tscParseLineProtocol.c @@ -794,7 +794,7 @@ static int32_t arrangePointsByChildTableName(TAOS_SML_DATA_POINT* points, int nu } static int32_t addChildTableDataPointsToInsertSql(char* cTableName, char* sTableName, SSmlSTableSchema* sTableSchema, SArray* cTablePoints, - char* sql, int32_t capacity, int32_t* cTableSqlLen, SSmlLinesInfo* info) { + char* sql, int32_t capacity, int32_t* cTableSqlLen, int fromIndex, int* nextIndex, SSmlLinesInfo* info) { size_t numTags = taosArrayGetSize(sTableSchema->tags); size_t numCols = taosArrayGetSize(sTableSchema->fields); size_t rows = taosArrayGetSize(cTablePoints); @@ -845,7 +845,11 @@ static int32_t addChildTableDataPointsToInsertSql(char* cTableName, char* sTable totalLen += snprintf(sql + totalLen, freeBytes - totalLen, ") values "); TAOS_SML_KV** colKVs = malloc(numCols * sizeof(TAOS_SML_KV*)); - for (int r = 0; r < rows; ++r) { + int r = fromIndex; + for (; r < rows; ++r) { + if (freeBytes - totalLen < 1024 * 16) { + break; + } totalLen += snprintf(sql + totalLen, freeBytes - totalLen, "("); memset(colKVs, 0, numCols * sizeof(TAOS_SML_KV*)); @@ -873,6 +877,10 @@ static int32_t addChildTableDataPointsToInsertSql(char* cTableName, char* sTable } free(colKVs); + if (r == fromIndex) { + tscError("buf can not fit one line"); + } + *nextIndex = r; *cTableSqlLen = totalLen; return 0; @@ -981,11 +989,12 @@ static int32_t applyDataPointsWithSqlInsert(TAOS* taos, TAOS_SML_DATA_POINT* poi info->numBatches = 0; SSmlSqlInsertBatch *batch = info->batches; batch->sql = malloc(tsMaxSQLStringLen + 1); - //TODO batch->sql allocation errror int32_t freeBytes = tsMaxSQLStringLen; int32_t usedBytes = sprintf(batch->sql, "insert into"); freeBytes -= usedBytes; + int32_t cTableSqlLen = 0; + SArray** pCTablePoints = taosHashIterate(cname2points, NULL); while (pCTablePoints) { SArray* cTablePoints = *pCTablePoints; @@ -993,37 +1002,49 @@ static int32_t applyDataPointsWithSqlInsert(TAOS* taos, TAOS_SML_DATA_POINT* poi TAOS_SML_DATA_POINT* point = taosArrayGetP(cTablePoints, 0); SSmlSTableSchema* sTableSchema = taosArrayGet(stableSchemas, point->schemaIdx); - tscDebug("SML:0x%"PRIx64" add child table points to SQL. child table: %s of super table %s", - info->id, point->childTableName, point->stableName); - int32_t cTableSqlLen = 0; - code = addChildTableDataPointsToInsertSql(point->childTableName, point->stableName, sTableSchema, cTablePoints, batch->sql+usedBytes, freeBytes, &cTableSqlLen, info); - int32_t safeBound = 1024 * 24; - if (cTableSqlLen < freeBytes - safeBound) { + int32_t nextIndex = 0; + int32_t fromIndex = nextIndex; + + while (nextIndex != taosArrayGetSize(cTablePoints)) { + fromIndex = nextIndex; + code = addChildTableDataPointsToInsertSql(point->childTableName, point->stableName, sTableSchema, cTablePoints, + batch->sql + usedBytes, freeBytes, &cTableSqlLen, fromIndex, &nextIndex, + info); + tscDebug("SML:0x%"PRIx64" add child table points to SQL. child table: %s of super table %s. range[%d-%d).", + info->id, point->childTableName, point->stableName, fromIndex, nextIndex); usedBytes += cTableSqlLen; freeBytes -= cTableSqlLen; - } else { - batch->sql[usedBytes] = '\0'; - info->numBatches++; - if (info->numBatches >= MAX_SML_SQL_INSERT_BATCHES) { - tscError("SML:0x%"PRIx64" Apply points failed. exceeds max sql insert batches", info->id); - code = TSDB_CODE_TSC_OUT_OF_MEMORY; - goto cleanup; - } + if (nextIndex != taosArrayGetSize(cTablePoints)) { + batch->sql[usedBytes] = '\0'; + info->numBatches++; + tscDebug("SML:0x%"PRIx64" sql: %s" , info->id, batch->sql); + + if (info->numBatches >= MAX_SML_SQL_INSERT_BATCHES) { + tscError("SML:0x%"PRIx64" Apply points failed. exceeds max sql insert batches", info->id); + code = TSDB_CODE_TSC_OUT_OF_MEMORY; + goto cleanup; + } - batch = &info->batches[info->numBatches]; - batch->sql = malloc(tsMaxSQLStringLen + 1); - freeBytes = tsMaxSQLStringLen; - usedBytes = sprintf(batch->sql, "insert into"); - freeBytes -= usedBytes; - //TODO deal with one child table rows exceeds columns - code = addChildTableDataPointsToInsertSql(point->childTableName, point->stableName, sTableSchema, cTablePoints, batch->sql+usedBytes, freeBytes, &cTableSqlLen, info); + batch = &info->batches[info->numBatches]; + batch->sql = malloc(tsMaxSQLStringLen + 1); + freeBytes = tsMaxSQLStringLen; + usedBytes = sprintf(batch->sql, "insert into"); + freeBytes -= usedBytes; + } } pCTablePoints = taosHashIterate(cname2points, pCTablePoints); } + usedBytes += cTableSqlLen; + freeBytes -= cTableSqlLen; batch->sql[usedBytes] = '\0'; info->numBatches++; - + tscDebug("SML:0x%"PRIx64" sql: %s" , info->id, batch->sql); + if (info->numBatches >= MAX_SML_SQL_INSERT_BATCHES) { + tscError("SML:0x%"PRIx64" Apply points failed. exceeds max sql insert batches", info->id); + code = TSDB_CODE_TSC_OUT_OF_MEMORY; + goto cleanup; + } bool batchesExecuted[MAX_SML_SQL_INSERT_BATCHES] = {false}; for (int i = 0; i < info->numBatches; ++i) {