提交 ba68a1bc 编写于 作者: S shenglian zhou

use taos_query instead of taos_stmt for child table batch less than 10 rows

上级 2e23e9b7
...@@ -750,7 +750,96 @@ static int32_t arrangePointsByChildTableName(TAOS_SML_DATA_POINT* points, int nu ...@@ -750,7 +750,96 @@ static int32_t arrangePointsByChildTableName(TAOS_SML_DATA_POINT* points, int nu
return 0; return 0;
} }
static int32_t applyChildTableDataPoints(TAOS* taos, char* cTableName, char* sTableName, SSmlSTableSchema* sTableSchema, static int32_t applyChildTableDataPointsWithInsertSQL(TAOS* taos, char* cTableName, char* sTableName, SSmlSTableSchema* sTableSchema,
SArray* cTablePoints, size_t rowSize, SSmlLinesInfo* info) {
int32_t code = TSDB_CODE_SUCCESS;
size_t numTags = taosArrayGetSize(sTableSchema->tags);
size_t numCols = taosArrayGetSize(sTableSchema->fields);
size_t rows = taosArrayGetSize(cTablePoints);
SArray* tagsSchema = sTableSchema->tags;
SArray* colsSchema = sTableSchema->fields;
TAOS_SML_KV* tagKVs[TSDB_MAX_TAGS] = {0};
for (int i= 0; i < rows; ++i) {
TAOS_SML_DATA_POINT * pDataPoint = taosArrayGetP(cTablePoints, i);
for (int j = 0; j < pDataPoint->tagNum; ++j) {
TAOS_SML_KV* kv = pDataPoint->tags + j;
tagKVs[kv->fieldSchemaIdx] = kv;
}
}
char* sql = malloc(tsMaxSQLStringLen+1);
if (sql == NULL) {
tscError("malloc sql memory error");
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
int32_t freeBytes = tsMaxSQLStringLen + 1 ;
sprintf(sql, "insert into ? using %s (", sTableName);
for (int i = 0; i < numTags; ++i) {
SSchema* tagSchema = taosArrayGet(tagsSchema, i);
snprintf(sql+strlen(sql), freeBytes-strlen(sql), "%s,", tagSchema->name);
}
snprintf(sql + strlen(sql) - 1, freeBytes-strlen(sql)+1, ")");
snprintf(sql + strlen(sql), freeBytes-strlen(sql), " tags (");
// for (int i = 0; i < numTags; ++i) {
// snprintf(sql+strlen(sql), freeBytes-strlen(sql), "?,");
// }
for (int i = 0; i < numTags; ++i) {
if (tagKVs[i] == NULL) {
snprintf(sql + strlen(sql), freeBytes-strlen(sql), "NULL,");
} else {
TAOS_SML_KV* kv = tagKVs[i];
int32_t len = 0;
converToStr(sql+strlen(sql), kv->type, kv->value, kv->length, &len);
*(sql+strlen(sql)+len)='\0';
}
}
snprintf(sql + strlen(sql) - 1, freeBytes-strlen(sql)+1, ") (");
for (int i = 0; i < numCols; ++i) {
SSchema* colSchema = taosArrayGet(colsSchema, i);
snprintf(sql+strlen(sql), freeBytes-strlen(sql), "%s,", colSchema->name);
}
snprintf(sql + strlen(sql)-1, freeBytes-strlen(sql)+1, ") values ");
TAOS_SML_KV** colKVs = malloc(numCols*sizeof(TAOS_SML_KV*));
for (int r = 0; r < rows; ++r) {
snprintf(sql + strlen(sql)-1, freeBytes-strlen(sql)+1, "(");
memset(colKVs, 0, numCols*sizeof(TAOS_SML_KV*));
TAOS_SML_DATA_POINT* point = taosArrayGetP(cTablePoints, r);
for (int i = 0; i < point->fieldNum; ++i) {
TAOS_SML_KV* kv = point->fields + i;
colKVs[kv->fieldSchemaIdx] = kv;
}
for (int i = 0; i < numCols; ++i) {
if (colKVs[i] == NULL) {
snprintf(sql + strlen(sql), freeBytes-strlen(sql), "NULL,");
} else {
TAOS_SML_KV* kv = colKVs[i];
int32_t len = 0;
converToStr(sql+strlen(sql), kv->type, kv->value, kv->length, &len);
*(sql+strlen(sql)+len)='\0';
}
}
snprintf(sql + strlen(sql) - 1, freeBytes - strlen(sql) + 1, ")");
}
free(colKVs);
sql[strlen(sql)] = '\0';
tscDebug("SML:0x%"PRIx64" insert child table table %s of super table %s : %s", info->id, cTableName, sTableName, sql);
TAOS_RES* res = taos_query(taos, sql);
code = taos_errno(res);
info->affectedRows = taos_affected_rows(res);
return code;
}
static int32_t applyChildTableDataPointsWithStmt(TAOS* taos, char* cTableName, char* sTableName, SSmlSTableSchema* sTableSchema,
SArray* cTablePoints, size_t rowSize, SSmlLinesInfo* info) { SArray* cTablePoints, size_t rowSize, SSmlLinesInfo* info) {
size_t numTags = taosArrayGetSize(sTableSchema->tags); size_t numTags = taosArrayGetSize(sTableSchema->tags);
size_t numCols = taosArrayGetSize(sTableSchema->fields); size_t numCols = taosArrayGetSize(sTableSchema->fields);
...@@ -836,6 +925,7 @@ static int32_t applyChildTableDataPoints(TAOS* taos, char* cTableName, char* sTa ...@@ -836,6 +925,7 @@ static int32_t applyChildTableDataPoints(TAOS* taos, char* cTableName, char* sTa
taosArrayDestroy(tagBinds); taosArrayDestroy(tagBinds);
return code; return code;
} }
static int32_t insertChildTablePointsBatch(TAOS* taos, char* cTableName, char* sTableName, static int32_t insertChildTablePointsBatch(TAOS* taos, char* cTableName, char* sTableName,
SArray* tagsSchema, SArray* tagsBind, SArray* tagsSchema, SArray* tagsBind,
SArray* colsSchema, SArray* rowsBind, SArray* colsSchema, SArray* rowsBind,
...@@ -1004,6 +1094,18 @@ static int32_t doInsertChildTablePoints(TAOS* taos, char* sql, char* cTableName, ...@@ -1004,6 +1094,18 @@ static int32_t doInsertChildTablePoints(TAOS* taos, char* sql, char* cTableName,
return 0; return 0;
} }
static int32_t applyChildTableDataPoints(TAOS* taos, char* cTableName, char* sTableName, SSmlSTableSchema* sTableSchema,
SArray* cTablePoints, size_t rowSize, SSmlLinesInfo* info) {
int32_t code = TSDB_CODE_SUCCESS;
size_t childTableDataPoints = taosArrayGetSize(cTablePoints);
if (childTableDataPoints < 10) {
applyChildTableDataPointsWithInsertSQL(taos, cTableName, sTableName, sTableSchema, cTablePoints, rowSize, info);
} else {
applyChildTableDataPointsWithStmt(taos, cTableName, sTableName, sTableSchema, cTablePoints, rowSize, info);
}
return code;
}
static int32_t applyDataPoints(TAOS* taos, TAOS_SML_DATA_POINT* points, int32_t numPoints, SArray* stableSchemas, SSmlLinesInfo* info) { static int32_t applyDataPoints(TAOS* taos, TAOS_SML_DATA_POINT* points, int32_t numPoints, SArray* stableSchemas, SSmlLinesInfo* info) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册