提交 4208fa0b 编写于 作者: S shenglian zhou

improve performance

上级 ec007459
...@@ -792,6 +792,214 @@ static int32_t arrangePointsByChildTableName(TAOS_SML_DATA_POINT* points, int nu ...@@ -792,6 +792,214 @@ static int32_t arrangePointsByChildTableName(TAOS_SML_DATA_POINT* points, int nu
return 0; return 0;
} }
static int32_t addChildTableDataPointsToInsertSql(char* cTableName, char* sTableName, SSmlSTableSchema* sTableSchema, SArray* cTablePoints,
char* sql, int32_t capacity, int32_t* cTableSqlLen, SSmlLinesInfo* info) {
size_t numTags = taosArrayGetSize(sTableSchema->tags);
size_t numCols = taosArrayGetSize(sTableSchema->fields);
size_t rows = taosArrayGetSize(cTablePoints);
SArray* tagsSchema = sTableSchema->tags;
SArray* colsSchema = sTableSchema->fields;
TAOS_SML_KV* tagKVs[TSDB_MAX_TAGS] = {0};
for (int i = 0; i < rows; ++i) {
TAOS_SML_DATA_POINT* pDataPoint = taosArrayGetP(cTablePoints, i);
for (int j = 0; j < pDataPoint->tagNum; ++j) {
TAOS_SML_KV* kv = pDataPoint->tags + j;
tagKVs[kv->fieldSchemaIdx] = kv;
}
}
int32_t freeBytes = capacity;
int32_t totalLen = 0;
totalLen += sprintf(sql, " %s using %s (", cTableName, sTableName);
for (int i = 0; i < numTags; ++i) {
SSchema* tagSchema = taosArrayGet(tagsSchema, i);
totalLen += snprintf(sql + totalLen, freeBytes - totalLen, "%s,", tagSchema->name);
}
--totalLen;
totalLen += snprintf(sql + totalLen, freeBytes - totalLen, ")");
totalLen += snprintf(sql + totalLen, freeBytes - totalLen, " tags (");
for (int i = 0; i < numTags; ++i) {
if (tagKVs[i] == NULL) {
totalLen += snprintf(sql + totalLen, freeBytes - totalLen, "NULL,");
} else {
TAOS_SML_KV* kv = tagKVs[i];
size_t beforeLen = totalLen;
int32_t len = 0;
converToStr(sql + beforeLen, kv->type, kv->value, kv->length, &len);
totalLen += len;
totalLen += snprintf(sql + totalLen, freeBytes - totalLen, ",");
}
}
--totalLen;
totalLen += snprintf(sql + totalLen, freeBytes - totalLen, ") (");
for (int i = 0; i < numCols; ++i) {
SSchema* colSchema = taosArrayGet(colsSchema, i);
totalLen += snprintf(sql + totalLen, freeBytes - totalLen, "%s,", colSchema->name);
}
--totalLen;
totalLen += snprintf(sql + totalLen, freeBytes - totalLen, ") values ");
TAOS_SML_KV** colKVs = malloc(numCols * sizeof(TAOS_SML_KV*));
for (int r = 0; r < rows; ++r) {
totalLen += snprintf(sql + totalLen, freeBytes - totalLen, "(");
memset(colKVs, 0, numCols * sizeof(TAOS_SML_KV*));
TAOS_SML_DATA_POINT* point = taosArrayGetP(cTablePoints, r);
for (int i = 0; i < point->fieldNum; ++i) {
TAOS_SML_KV* kv = point->fields + i;
colKVs[kv->fieldSchemaIdx] = kv;
}
for (int i = 0; i < numCols; ++i) {
if (colKVs[i] == NULL) {
totalLen += snprintf(sql + totalLen, freeBytes - totalLen, "NULL,");
} else {
TAOS_SML_KV* kv = colKVs[i];
size_t beforeLen = totalLen;
int32_t len = 0;
converToStr(sql + beforeLen, kv->type, kv->value, kv->length, &len);
totalLen += len;
totalLen += snprintf(sql + totalLen, freeBytes - totalLen, ",");
}
}
--totalLen;
totalLen += snprintf(sql + totalLen, freeBytes - totalLen, ")");
}
free(colKVs);
*cTableSqlLen = totalLen;
return 0;
}
static int32_t doRunInsertSQL(TAOS* taos, char* sql, SSmlLinesInfo* info) {
int32_t code = 0;
bool tryAgain = false;
int32_t try = 0;
do {
TAOS_RES* res = taos_query(taos, sql);
code = taos_errno(res);
if (code != 0) {
tscError("SML:0x%"PRIx64 " taos_query return %d:%s", info->id, code, taos_errstr(res));
}
tscDebug("SML:0x%"PRIx64 " taos_query inserted %d rows", info->id, taos_affected_rows(res));
info->affectedRows += taos_affected_rows(res);
taos_free_result(res);
tryAgain = false;
if ((code == TSDB_CODE_TDB_INVALID_TABLE_ID
|| code == TSDB_CODE_VND_INVALID_VGROUP_ID
|| code == TSDB_CODE_TDB_TABLE_RECONFIGURE
|| code == TSDB_CODE_APP_NOT_READY
|| code == TSDB_CODE_RPC_NETWORK_UNAVAIL) && try++ < TSDB_MAX_REPLICA) {
tryAgain = true;
}
if (code == TSDB_CODE_TDB_INVALID_TABLE_ID || code == TSDB_CODE_VND_INVALID_VGROUP_ID) {
TAOS_RES* res2 = taos_query(taos, "RESET QUERY CACHE");
int32_t code2 = taos_errno(res2);
if (code2 != TSDB_CODE_SUCCESS) {
tscError("SML:0x%" PRIx64 " insert child table by sql. reset query cache. error: %s", info->id, taos_errstr(res2));
}
taos_free_result(res2);
if (tryAgain) {
taosMsleep(100 * (2 << try));
}
}
if (code == TSDB_CODE_APP_NOT_READY || code == TSDB_CODE_RPC_NETWORK_UNAVAIL) {
if (tryAgain) {
taosMsleep( 100 * (2 << try));
}
}
} while (tryAgain);
free(sql);
return code;
}
static int32_t applyDataPointsWithSqlInsert(TAOS* taos, TAOS_SML_DATA_POINT* points, int32_t numPoints, SArray* stableSchemas, SSmlLinesInfo* info) {
int32_t code = TSDB_CODE_SUCCESS;
SHashObj* cname2points = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false);
arrangePointsByChildTableName(points, numPoints, cname2points, stableSchemas, info);
char* sql = malloc(tsMaxSQLStringLen + 1);
if (sql == NULL) {
tscError("malloc sql memory error");
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
int32_t freeBytes = tsMaxSQLStringLen;
int32_t usedBytes = 0;
usedBytes += sprintf(sql, "insert into");
freeBytes -= usedBytes;
SArray** pCTablePoints = taosHashIterate(cname2points, NULL);
while (pCTablePoints) {
SArray* cTablePoints = *pCTablePoints;
TAOS_SML_DATA_POINT* point = taosArrayGetP(cTablePoints, 0);
SSmlSTableSchema* sTableSchema = taosArrayGet(stableSchemas, point->schemaIdx);
size_t rowSize = 0;
size_t numCols = taosArrayGetSize(sTableSchema->fields);
for (int i = 0; i < numCols; ++i) {
SSchema* colSchema = taosArrayGet(sTableSchema->fields, i);
rowSize += colSchema->bytes;
}
tscDebug("SML:0x%"PRIx64" add child table points to SQL. child table: %s of super table %s, row size: %zu",
info->id, point->childTableName, point->stableName, rowSize);
int32_t cTableSqlLen = 0;
code = addChildTableDataPointsToInsertSql(point->childTableName, point->stableName, sTableSchema, cTablePoints, sql+usedBytes, freeBytes, &cTableSqlLen, info);
if (cTableSqlLen < freeBytes) {
usedBytes += cTableSqlLen;
freeBytes -= cTableSqlLen;
} else {
sql[usedBytes] = '\0';
code = doRunInsertSQL(taos, sql, info);
if (code != 0) {
tscError("SML:0x%"PRIx64" Apply points failed. sql: %s, error: %s", info->id, sql, tstrerror(code));
goto cleanup;
}
freeBytes = tsMaxSQLStringLen;
usedBytes = 0;
usedBytes += sprintf(sql, "insert into");
freeBytes -= usedBytes;
//TODO deal with one child table rows exceeds columns
code = addChildTableDataPointsToInsertSql(point->childTableName, point->stableName, sTableSchema, cTablePoints, sql+usedBytes, freeBytes, &cTableSqlLen, info);
}
pCTablePoints = taosHashIterate(cname2points, pCTablePoints);
}
sql[usedBytes] = '\0';
code = doRunInsertSQL(taos, sql, info);
if (code != 0) {
tscError("SML:0x%"PRIx64" Apply points failed. sql: %s, error: %s", info->id, sql, tstrerror(code));
goto cleanup;
}
tscDebug("SML:0x%"PRIx64" successfully applied data points", info->id);
cleanup:
pCTablePoints = taosHashIterate(cname2points, NULL);
while (pCTablePoints) {
SArray* pPoints = *pCTablePoints;
taosArrayDestroy(&pPoints);
pCTablePoints = taosHashIterate(cname2points, pCTablePoints);
}
taosHashCleanup(cname2points);
return code;
}
static int32_t applyChildTableDataPointsWithInsertSQL(TAOS* taos, char* cTableName, char* sTableName, SSmlSTableSchema* sTableSchema, static int32_t applyChildTableDataPointsWithInsertSQL(TAOS* taos, char* cTableName, char* sTableName, SSmlSTableSchema* sTableSchema,
SArray* cTablePoints, size_t rowSize, SSmlLinesInfo* info) { SArray* cTablePoints, size_t rowSize, SSmlLinesInfo* info) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
...@@ -1328,7 +1536,12 @@ int tscSmlInsert(TAOS* taos, TAOS_SML_DATA_POINT* points, int numPoint, SSmlLine ...@@ -1328,7 +1536,12 @@ int tscSmlInsert(TAOS* taos, TAOS_SML_DATA_POINT* points, int numPoint, SSmlLine
} }
tscDebug("SML:0x%"PRIx64" apply data points", info->id); tscDebug("SML:0x%"PRIx64" apply data points", info->id);
bool tableByTable = false;
if (tableByTable) {
code = applyDataPoints(taos, points, numPoint, stableSchemas, info); code = applyDataPoints(taos, points, numPoint, stableSchemas, info);
} else {
code = applyDataPointsWithSqlInsert(taos, points, numPoint, stableSchemas, info);
}
if (code != 0) { if (code != 0) {
tscError("SML:0x%"PRIx64" error apply data points : %s", info->id, tstrerror(code)); tscError("SML:0x%"PRIx64" error apply data points : %s", info->id, tstrerror(code));
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册