提交 3dd72e84 编写于 作者: S shenglian zhou

schemaless insert perf

上级 e354f13b
...@@ -58,6 +58,22 @@ typedef enum { ...@@ -58,6 +58,22 @@ typedef enum {
SML_TIME_STAMP_NOW SML_TIME_STAMP_NOW
} SMLTimeStampType; } SMLTimeStampType;
// Per-batch state for one asynchronously submitted schemaless "insert into" statement.
// Instances are stored in SSmlLinesInfo.batches. insertCallback fills in the result and
// retry fields of the batch it receives as its callback parameter, then posts `sem`.
// NOTE(review): the visible taos_query_a() call sites pass `batch` (the most recently
// filled batch) as the callback parameter rather than the batch being submitted -- confirm.
typedef struct SSmlSqlInsertBatch {
// copied from the owning SSmlLinesInfo.id; printed in the "SML:0x..." log prefix
uint64_t id;
// position of this batch inside SSmlLinesInfo.batches; printed in log messages
int32_t index;
// heap buffer of tsMaxSQLStringLen + 1 bytes holding the statement text;
// NULL until allocated, freed in the cleanup of applyDataPointsWithSqlInsert
char* sql;
// taos_errno() of the most recent completion, stored by insertCallback
int32_t code;
// number of submissions so far: set to 1 on first submit, incremented on each resubmit;
// compared against TSDB_MAX_REPLICA to bound retries and used to scale the back-off sleep
int32_t tryTimes;
// initialised to 0; posted by insertCallback, waited on by the submitting thread
sem_t sem;
// taos_affected_rows() of the most recent completion; added to SSmlLinesInfo.affectedRows
int32_t affectedRows;
// set by insertCallback when the error code is one of the retriable codes and
// tryTimes is still below TSDB_MAX_REPLICA; the submitter resubmits such batches
bool tryAgain;
// set by insertCallback on TSDB_CODE_TDB_INVALID_TABLE_ID or TSDB_CODE_VND_INVALID_VGROUP_ID;
// the submitter then issues "RESET QUERY CACHE" before retrying
bool resetQueryCache;
// set by insertCallback when a retry is planned after an invalid table id / invalid vgroup id /
// app not ready / network unavailable error; the submitter then sleeps before retrying
bool sleep;
} SSmlSqlInsertBatch;
#define MAX_SML_SQL_INSERT_BATCHES 512
typedef struct { typedef struct {
uint64_t id; uint64_t id;
SMLProtocolType protocol; SMLProtocolType protocol;
...@@ -65,7 +81,13 @@ typedef struct { ...@@ -65,7 +81,13 @@ typedef struct {
SHashObj* smlDataToSchema; SHashObj* smlDataToSchema;
int32_t affectedRows; int32_t affectedRows;
pthread_mutex_t batchMutex;
pthread_cond_t batchCond;
int32_t numBatches;
SSmlSqlInsertBatch batches[MAX_SML_SQL_INSERT_BATCHES];
} SSmlLinesInfo; } SSmlLinesInfo;
char* addEscapeCharToString(char *str, int32_t len); char* addEscapeCharToString(char *str, int32_t len);
int tscSmlInsert(TAOS* taos, TAOS_SML_DATA_POINT* points, int numPoint, SSmlLinesInfo* info); int tscSmlInsert(TAOS* taos, TAOS_SML_DATA_POINT* points, int numPoint, SSmlLinesInfo* info);
bool checkDuplicateKey(char *key, SHashObj *pHash, SSmlLinesInfo* info); bool checkDuplicateKey(char *key, SHashObj *pHash, SSmlLinesInfo* info);
......
...@@ -877,52 +877,91 @@ static int32_t addChildTableDataPointsToInsertSql(char* cTableName, char* sTable ...@@ -877,52 +877,91 @@ static int32_t addChildTableDataPointsToInsertSql(char* cTableName, char* sTable
return 0; return 0;
} }
static int32_t doRunInsertSQL(TAOS* taos, char* sql, SSmlLinesInfo* info) { static void insertCallback(void *param, TAOS_RES *res, int32_t notUsedCode) {
int32_t code = 0; SSmlSqlInsertBatch *batch = (SSmlSqlInsertBatch *)param;
bool tryAgain = false; batch->code = taos_errno(res);
int32_t try = 0;
do {
TAOS_RES* res = taos_query(taos, sql);
code = taos_errno(res);
if (code != 0) {
tscError("SML:0x%"PRIx64 " taos_query return %d:%s", info->id, code, taos_errstr(res));
}
tscDebug("SML:0x%"PRIx64 " taos_query inserted %d rows", info->id, taos_affected_rows(res)); if (batch->code != 0) {
info->affectedRows += taos_affected_rows(res); tscError("SML:0x%"PRIx64 " batch %d , taos_query_a return %d:%s", batch->id, batch->index, batch->code, taos_errstr(res));
}
tscDebug("SML:0x%"PRIx64 " batch %d, taos_query inserted %d rows", batch->id, batch->index, taos_affected_rows(res));
batch->affectedRows = taos_affected_rows(res);
taos_free_result(res); taos_free_result(res);
tryAgain = false; int32_t code = batch->code;
batch->tryAgain = false;
batch->resetQueryCache = false;
batch->sleep = false;
if ((code == TSDB_CODE_TDB_INVALID_TABLE_ID if ((code == TSDB_CODE_TDB_INVALID_TABLE_ID
|| code == TSDB_CODE_VND_INVALID_VGROUP_ID || code == TSDB_CODE_VND_INVALID_VGROUP_ID
|| code == TSDB_CODE_TDB_TABLE_RECONFIGURE || code == TSDB_CODE_TDB_TABLE_RECONFIGURE
|| code == TSDB_CODE_APP_NOT_READY || code == TSDB_CODE_APP_NOT_READY
|| code == TSDB_CODE_RPC_NETWORK_UNAVAIL) && try++ < TSDB_MAX_REPLICA) { || code == TSDB_CODE_RPC_NETWORK_UNAVAIL) && batch->tryTimes < TSDB_MAX_REPLICA) {
tryAgain = true; batch->tryAgain = true;
} }
if (code == TSDB_CODE_TDB_INVALID_TABLE_ID || code == TSDB_CODE_VND_INVALID_VGROUP_ID) { if (code == TSDB_CODE_TDB_INVALID_TABLE_ID || code == TSDB_CODE_VND_INVALID_VGROUP_ID) {
TAOS_RES* res2 = taos_query(taos, "RESET QUERY CACHE"); batch->resetQueryCache = true;
int32_t code2 = taos_errno(res2); if (batch->tryAgain) {
if (code2 != TSDB_CODE_SUCCESS) { batch->sleep = true;
tscError("SML:0x%" PRIx64 " insert child table by sql. reset query cache. error: %s", info->id, taos_errstr(res2));
}
taos_free_result(res2);
if (tryAgain) {
taosMsleep(100 * (2 << try));
} }
} }
if (code == TSDB_CODE_APP_NOT_READY || code == TSDB_CODE_RPC_NETWORK_UNAVAIL) { if (code == TSDB_CODE_APP_NOT_READY || code == TSDB_CODE_RPC_NETWORK_UNAVAIL) {
if (tryAgain) { if (batch->tryAgain) {
taosMsleep( 100 * (2 << try)); batch->sleep = true;
} }
} }
} while (tryAgain);
sem_post(&batch->sem);
return code; }
} //static int32_t doRunInsertSQL(TAOS* taos, char* sql, SSmlLinesInfo* info) {
// int32_t code = 0;
// bool tryAgain = false;
// int32_t try = 0;
// do {
// TAOS_RES* res = taos_query(taos, sql);
// code = taos_errno(res);
// if (code != 0) {
// tscError("SML:0x%"PRIx64 " taos_query return %d:%s", info->id, code, taos_errstr(res));
// }
//
// tscDebug("SML:0x%"PRIx64 " taos_query inserted %d rows", info->id, taos_affected_rows(res));
// info->affectedRows += taos_affected_rows(res);
// taos_free_result(res);
//
// tryAgain = false;
// if ((code == TSDB_CODE_TDB_INVALID_TABLE_ID
// || code == TSDB_CODE_VND_INVALID_VGROUP_ID
// || code == TSDB_CODE_TDB_TABLE_RECONFIGURE
// || code == TSDB_CODE_APP_NOT_READY
// || code == TSDB_CODE_RPC_NETWORK_UNAVAIL) && try++ < TSDB_MAX_REPLICA) {
// tryAgain = true;
// }
//
// if (code == TSDB_CODE_TDB_INVALID_TABLE_ID || code == TSDB_CODE_VND_INVALID_VGROUP_ID) {
// TAOS_RES* res2 = taos_query(taos, "RESET QUERY CACHE");
// int32_t code2 = taos_errno(res2);
// if (code2 != TSDB_CODE_SUCCESS) {
// tscError("SML:0x%" PRIx64 " insert child table by sql. reset query cache. error: %s", info->id, taos_errstr(res2));
// }
// taos_free_result(res2);
// if (tryAgain) {
// taosMsleep(100 * (2 << try));
// }
// }
//
// if (code == TSDB_CODE_APP_NOT_READY || code == TSDB_CODE_RPC_NETWORK_UNAVAIL) {
// if (tryAgain) {
// taosMsleep( 100 * (2 << try));
// }
// }
// } while (tryAgain);
//
// return code;
//
//}
static int32_t applyDataPointsWithSqlInsert(TAOS* taos, TAOS_SML_DATA_POINT* points, int32_t numPoints, SArray* stableSchemas, SSmlLinesInfo* info) { static int32_t applyDataPointsWithSqlInsert(TAOS* taos, TAOS_SML_DATA_POINT* points, int32_t numPoints, SArray* stableSchemas, SSmlLinesInfo* info) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
...@@ -930,14 +969,20 @@ static int32_t applyDataPointsWithSqlInsert(TAOS* taos, TAOS_SML_DATA_POINT* poi ...@@ -930,14 +969,20 @@ static int32_t applyDataPointsWithSqlInsert(TAOS* taos, TAOS_SML_DATA_POINT* poi
SHashObj* cname2points = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false); SHashObj* cname2points = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false);
arrangePointsByChildTableName(points, numPoints, cname2points, stableSchemas, info); arrangePointsByChildTableName(points, numPoints, cname2points, stableSchemas, info);
char* sql = malloc(tsMaxSQLStringLen + 1); for (int i = 0; i < MAX_SML_SQL_INSERT_BATCHES; ++i) {
if (sql == NULL) { info->batches[i].id = info->id;
tscError("malloc sql memory error"); info->batches[i].index = i;
return TSDB_CODE_TSC_OUT_OF_MEMORY; info->batches[i].sql = NULL;
info->batches[i].tryTimes = 0;
sem_init(&info->batches[i].sem, 0, 0);
} }
info->numBatches = 0;
SSmlSqlInsertBatch *batch = info->batches;
batch->sql = malloc(tsMaxSQLStringLen + 1);
//TODO handle batch->sql allocation error
int32_t freeBytes = tsMaxSQLStringLen; int32_t freeBytes = tsMaxSQLStringLen;
int32_t usedBytes = 0; int32_t usedBytes = sprintf(batch->sql, "insert into");
usedBytes += sprintf(sql, "insert into");
freeBytes -= usedBytes; freeBytes -= usedBytes;
SArray** pCTablePoints = taosHashIterate(cname2points, NULL); SArray** pCTablePoints = taosHashIterate(cname2points, NULL);
...@@ -947,48 +992,100 @@ static int32_t applyDataPointsWithSqlInsert(TAOS* taos, TAOS_SML_DATA_POINT* poi ...@@ -947,48 +992,100 @@ static int32_t applyDataPointsWithSqlInsert(TAOS* taos, TAOS_SML_DATA_POINT* poi
TAOS_SML_DATA_POINT* point = taosArrayGetP(cTablePoints, 0); TAOS_SML_DATA_POINT* point = taosArrayGetP(cTablePoints, 0);
SSmlSTableSchema* sTableSchema = taosArrayGet(stableSchemas, point->schemaIdx); SSmlSTableSchema* sTableSchema = taosArrayGet(stableSchemas, point->schemaIdx);
size_t rowSize = 0; tscDebug("SML:0x%"PRIx64" add child table points to SQL. child table: %s of super table %s",
size_t numCols = taosArrayGetSize(sTableSchema->fields); info->id, point->childTableName, point->stableName);
for (int i = 0; i < numCols; ++i) {
SSchema* colSchema = taosArrayGet(sTableSchema->fields, i);
rowSize += colSchema->bytes;
}
tscDebug("SML:0x%"PRIx64" add child table points to SQL. child table: %s of super table %s, row size: %zu",
info->id, point->childTableName, point->stableName, rowSize);
int32_t cTableSqlLen = 0; int32_t cTableSqlLen = 0;
code = addChildTableDataPointsToInsertSql(point->childTableName, point->stableName, sTableSchema, cTablePoints, sql+usedBytes, freeBytes, &cTableSqlLen, info); code = addChildTableDataPointsToInsertSql(point->childTableName, point->stableName, sTableSchema, cTablePoints, batch->sql+usedBytes, freeBytes, &cTableSqlLen, info);
if (cTableSqlLen < freeBytes) { int32_t safeBound = 2048;
if (cTableSqlLen < freeBytes - safeBound) {
usedBytes += cTableSqlLen; usedBytes += cTableSqlLen;
freeBytes -= cTableSqlLen; freeBytes -= cTableSqlLen;
} else { } else {
sql[usedBytes] = '\0'; batch->sql[usedBytes] = '\0';
code = doRunInsertSQL(taos, sql, info); info->numBatches++;
if (code != 0) { if (info->numBatches >= MAX_SML_SQL_INSERT_BATCHES) {
tscError("SML:0x%"PRIx64" Apply points failed. sql: %s, error: %s", info->id, sql, tstrerror(code)); tscError("SML:0x%"PRIx64" Apply points failed. exceeds max sql insert batches", info->id);
code = TSDB_CODE_TSC_OUT_OF_MEMORY;
goto cleanup; goto cleanup;
} }
batch = &info->batches[info->numBatches];
batch->sql = malloc(tsMaxSQLStringLen + 1);
freeBytes = tsMaxSQLStringLen; freeBytes = tsMaxSQLStringLen;
usedBytes = 0; usedBytes = sprintf(batch->sql, "insert into");
usedBytes += sprintf(sql, "insert into");
freeBytes -= usedBytes; freeBytes -= usedBytes;
//TODO deal with one child table rows exceeds columns //TODO deal with one child table rows exceeds columns
code = addChildTableDataPointsToInsertSql(point->childTableName, point->stableName, sTableSchema, cTablePoints, sql+usedBytes, freeBytes, &cTableSqlLen, info); code = addChildTableDataPointsToInsertSql(point->childTableName, point->stableName, sTableSchema, cTablePoints, batch->sql+usedBytes, freeBytes, &cTableSqlLen, info);
} }
pCTablePoints = taosHashIterate(cname2points, pCTablePoints); pCTablePoints = taosHashIterate(cname2points, pCTablePoints);
} }
batch->sql[usedBytes] = '\0';
info->numBatches++;
sql[usedBytes] = '\0'; bool batchesExecuted[MAX_SML_SQL_INSERT_BATCHES] = {false};
code = doRunInsertSQL(taos, sql, info);
if (code != 0) { for (int i = 0; i < info->numBatches; ++i) {
tscError("SML:0x%"PRIx64" Apply points failed. sql: %s, error: %s", info->id, sql, tstrerror(code)); SSmlSqlInsertBatch* insertBatch = &info->batches[i];
goto cleanup; insertBatch->tryTimes = 1;
taos_query_a(taos, insertBatch->sql, insertCallback, batch);
batchesExecuted[i] = true;
}
int32_t triedBatches = info->numBatches;
while (triedBatches > 0) {
for (int i = 0; i < info->numBatches; ++i) {
if (batchesExecuted[i]) {
sem_wait(&info->batches[i].sem);
info->affectedRows += info->batches[i].affectedRows;
}
}
for (int i = 0; i < info->numBatches; ++i) {
SSmlSqlInsertBatch* b = info->batches + i;
if (b->resetQueryCache) {
TAOS_RES* res = taos_query(taos, "RESET QUERY CACHE");
taos_free_result(res);
break;
}
}
for (int i = 0; i < info->numBatches; ++i) {
SSmlSqlInsertBatch* b = info->batches + i;
if (b->sleep) {
taosMsleep(100 * (2 << b->tryTimes));
break;
}
}
memset(batchesExecuted, 0, sizeof(batchesExecuted));
triedBatches = 0;
for (int i = 0; i < info->numBatches; ++i) {
SSmlSqlInsertBatch* insertBatch = &info->batches[i];
if (insertBatch->tryAgain) {
insertBatch->tryTimes++;
taos_query_a(taos, insertBatch->sql, insertCallback, batch);
batchesExecuted[i] = true;
triedBatches++;
}
}
}
code = 0;
for (int i = 0; i < info->numBatches; ++i) {
SSmlSqlInsertBatch* b = info->batches + i;
if (b->code != 0) {
code = b->code;
}
} }
tscDebug("SML:0x%"PRIx64" successfully applied data points", info->id);
cleanup: cleanup:
free(sql); for (int i = 0; i < MAX_SML_SQL_INSERT_BATCHES; ++i) {
free(info->batches[i].sql);
info->batches[i].sql = NULL;
sem_destroy(&info->batches[i].sem);
}
pCTablePoints = taosHashIterate(cname2points, NULL); pCTablePoints = taosHashIterate(cname2points, NULL);
while (pCTablePoints) { while (pCTablePoints) {
SArray* pPoints = *pCTablePoints; SArray* pPoints = *pCTablePoints;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册