From ba68a1bc0efb88e2a949d20c8338ccdb34707711 Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Thu, 2 Dec 2021 11:28:49 +0800 Subject: [PATCH] use taos_query instead of taos_stmt for child table batch less than 10 rows --- src/client/src/tscParseLineProtocol.c | 104 +++++++++++++++++++++++++- 1 file changed, 103 insertions(+), 1 deletion(-) diff --git a/src/client/src/tscParseLineProtocol.c b/src/client/src/tscParseLineProtocol.c index a216fb631e..4fdccfac6f 100644 --- a/src/client/src/tscParseLineProtocol.c +++ b/src/client/src/tscParseLineProtocol.c @@ -750,7 +750,96 @@ static int32_t arrangePointsByChildTableName(TAOS_SML_DATA_POINT* points, int nu return 0; } -static int32_t applyChildTableDataPoints(TAOS* taos, char* cTableName, char* sTableName, SSmlSTableSchema* sTableSchema, +static int32_t applyChildTableDataPointsWithInsertSQL(TAOS* taos, char* cTableName, char* sTableName, SSmlSTableSchema* sTableSchema, + SArray* cTablePoints, size_t rowSize, SSmlLinesInfo* info) { + int32_t code = TSDB_CODE_SUCCESS; + size_t numTags = taosArrayGetSize(sTableSchema->tags); + size_t numCols = taosArrayGetSize(sTableSchema->fields); + size_t rows = taosArrayGetSize(cTablePoints); + SArray* tagsSchema = sTableSchema->tags; + SArray* colsSchema = sTableSchema->fields; + + TAOS_SML_KV* tagKVs[TSDB_MAX_TAGS] = {0}; + for (int i= 0; i < rows; ++i) { + TAOS_SML_DATA_POINT * pDataPoint = taosArrayGetP(cTablePoints, i); + for (int j = 0; j < pDataPoint->tagNum; ++j) { + TAOS_SML_KV* kv = pDataPoint->tags + j; + tagKVs[kv->fieldSchemaIdx] = kv; + } + } + + char* sql = malloc(tsMaxSQLStringLen+1); + if (sql == NULL) { + tscError("malloc sql memory error"); + return TSDB_CODE_TSC_OUT_OF_MEMORY; + } + + int32_t freeBytes = tsMaxSQLStringLen + 1 ; + sprintf(sql, "insert into ? using %s (", sTableName); + for (int i = 0; i < numTags; ++i) { + SSchema* tagSchema = taosArrayGet(tagsSchema, i); + snprintf(sql+strlen(sql), freeBytes-strlen(sql), "%s,", tagSchema->name); + } + snprintf(sql + strlen(sql) - 1, freeBytes-strlen(sql)+1, ")"); + + snprintf(sql + strlen(sql), freeBytes-strlen(sql), " tags ("); + +// for (int i = 0; i < numTags; ++i) { +// snprintf(sql+strlen(sql), freeBytes-strlen(sql), "?,"); +// } + for (int i = 0; i < numTags; ++i) { + if (tagKVs[i] == NULL) { + snprintf(sql + strlen(sql), freeBytes-strlen(sql), "NULL,"); + } else { + TAOS_SML_KV* kv = tagKVs[i]; + int32_t len = 0; + converToStr(sql+strlen(sql), kv->type, kv->value, kv->length, &len); + *(sql+strlen(sql)+len)='\0'; + } + } + snprintf(sql + strlen(sql) - 1, freeBytes-strlen(sql)+1, ") ("); + + for (int i = 0; i < numCols; ++i) { + SSchema* colSchema = taosArrayGet(colsSchema, i); + snprintf(sql+strlen(sql), freeBytes-strlen(sql), "%s,", colSchema->name); + } + snprintf(sql + strlen(sql)-1, freeBytes-strlen(sql)+1, ") values "); + + TAOS_SML_KV** colKVs = malloc(numCols*sizeof(TAOS_SML_KV*)); + for (int r = 0; r < rows; ++r) { + snprintf(sql + strlen(sql)-1, freeBytes-strlen(sql)+1, "("); + + memset(colKVs, 0, numCols*sizeof(TAOS_SML_KV*)); + + TAOS_SML_DATA_POINT* point = taosArrayGetP(cTablePoints, r); + for (int i = 0; i < point->fieldNum; ++i) { + TAOS_SML_KV* kv = point->fields + i; + colKVs[kv->fieldSchemaIdx] = kv; + } + + for (int i = 0; i < numCols; ++i) { + if (colKVs[i] == NULL) { + snprintf(sql + strlen(sql), freeBytes-strlen(sql), "NULL,"); + } else { + TAOS_SML_KV* kv = colKVs[i]; + int32_t len = 0; + converToStr(sql+strlen(sql), kv->type, kv->value, kv->length, &len); + *(sql+strlen(sql)+len)='\0'; + } + } + snprintf(sql + strlen(sql) - 1, freeBytes - strlen(sql) + 1, ")"); + } + free(colKVs); + sql[strlen(sql)] = '\0'; + + tscDebug("SML:0x%"PRIx64" insert child table table %s of super table %s : %s", info->id, cTableName, sTableName, sql); + TAOS_RES* res = taos_query(taos, sql); + code = taos_errno(res); + info->affectedRows = taos_affected_rows(res); + return code; +} + +static int32_t applyChildTableDataPointsWithStmt(TAOS* taos, char* cTableName, char* sTableName, SSmlSTableSchema* sTableSchema, SArray* cTablePoints, size_t rowSize, SSmlLinesInfo* info) { size_t numTags = taosArrayGetSize(sTableSchema->tags); size_t numCols = taosArrayGetSize(sTableSchema->fields); @@ -836,6 +925,7 @@ static int32_t applyChildTableDataPoints(TAOS* taos, char* cTableName, char* sTa taosArrayDestroy(tagBinds); return code; } + static int32_t insertChildTablePointsBatch(TAOS* taos, char* cTableName, char* sTableName, SArray* tagsSchema, SArray* tagsBind, SArray* colsSchema, SArray* rowsBind, @@ -1004,6 +1094,18 @@ static int32_t doInsertChildTablePoints(TAOS* taos, char* sql, char* cTableName, return 0; } +static int32_t applyChildTableDataPoints(TAOS* taos, char* cTableName, char* sTableName, SSmlSTableSchema* sTableSchema, + SArray* cTablePoints, size_t rowSize, SSmlLinesInfo* info) { + int32_t code = TSDB_CODE_SUCCESS; + size_t childTableDataPoints = taosArrayGetSize(cTablePoints); + if (childTableDataPoints < 10) { + applyChildTableDataPointsWithInsertSQL(taos, cTableName, sTableName, sTableSchema, cTablePoints, rowSize, info); + } else { + applyChildTableDataPointsWithStmt(taos, cTableName, sTableName, sTableSchema, cTablePoints, rowSize, info); + } + return code; +} + static int32_t applyDataPoints(TAOS* taos, TAOS_SML_DATA_POINT* points, int32_t numPoints, SArray* stableSchemas, SSmlLinesInfo* info) { int32_t code = TSDB_CODE_SUCCESS; -- GitLab