/*
 * Review notes for this patch. The text below is a unified diff whose line
 * breaks have been collapsed; the C code in it is patch content ('+'/'-'
 * prefixed), not compilable source, so it is left untouched.
 *
 * What the patch does, as visible here:
 *  - tscParseLine.h: adds SSmlSqlInsertBatch (per-batch SQL buffer, result
 *    code, try counter, semaphore, affected-row count and the tryAgain /
 *    resetQueryCache / sleep flags), defines MAX_SML_SQL_INSERT_BATCHES as
 *    512, and adds batchMutex, batchCond, numBatches and a fixed-size
 *    batches[] array to SSmlLinesInfo.
 *  - tscParseLineProtocol.c: removes the synchronous doRunInsertSQL() retry
 *    loop (it is re-added only as commented-out code) and adds
 *    insertCallback(), which stores taos_errno() and taos_affected_rows()
 *    of one result in the batch it receives, frees the result, derives the
 *    three flags from the error code, and calls sem_post() on that batch.
 *  - applyDataPointsWithSqlInsert() now builds one SQL string per batch,
 *    submits every batch with taos_query_a(), waits on the semaphores of the
 *    submitted batches, issues RESET QUERY CACHE once if any batch asks for
 *    it, sleeps once if any batch asks for it, and resubmits the batches
 *    whose tryAgain flag is set until none remain.
 *
 * NOTE(review): both taos_query_a() call sites pass `batch` as the callback
 * parameter instead of the loop variable `insertBatch`. At that point
 * `batch` still holds the pointer left over from SQL building, so every
 * callback writes to and posts the same batch, while the wait loop calls
 * sem_wait() on each info->batches[i].sem. This looks unintended;
 * presumably `insertBatch` was meant -- confirm.
 *
 * NOTE(review): neither malloc() of batch->sql is checked before sprintf()
 * writes into it (the first carries a TODO, the second has no check).
 *
 * NOTE(review): the value returned by addChildTableDataPointsToInsertSql()
 * is stored in `code` but never tested, and `code` is reset to 0 before the
 * per-batch result codes are collected.
 *
 * NOTE(review): batchMutex and batchCond are declared in SSmlLinesInfo but
 * are not referenced in any hunk shown here -- confirm whether they are
 * initialised and used elsewhere.
 *
 * The final hunk continues past the end of this excerpt.
 */
diff --git a/src/client/inc/tscParseLine.h b/src/client/inc/tscParseLine.h index fef55011b0faec1d15876764b3fd9808ec2b4e39..16427c6328da041e733350d7a0a77164429a890b 100644 --- a/src/client/inc/tscParseLine.h +++ b/src/client/inc/tscParseLine.h @@ -58,6 +58,22 @@ typedef enum { SML_TIME_STAMP_NOW } SMLTimeStampType; +typedef struct SSmlSqlInsertBatch { + uint64_t id; + int32_t index; + + char* sql; + int32_t code; + int32_t tryTimes; + sem_t sem; + int32_t affectedRows; + bool tryAgain; + bool resetQueryCache; + bool sleep; +} SSmlSqlInsertBatch; + +#define MAX_SML_SQL_INSERT_BATCHES 512 + typedef struct { uint64_t id; SMLProtocolType protocol; @@ -65,7 +81,13 @@ typedef struct { SHashObj* smlDataToSchema; int32_t affectedRows; + + pthread_mutex_t batchMutex; + pthread_cond_t batchCond; + int32_t numBatches; + SSmlSqlInsertBatch batches[MAX_SML_SQL_INSERT_BATCHES]; } SSmlLinesInfo; + char* addEscapeCharToString(char *str, int32_t len); int tscSmlInsert(TAOS* taos, TAOS_SML_DATA_POINT* points, int numPoint, SSmlLinesInfo* info); bool checkDuplicateKey(char *key, SHashObj *pHash, SSmlLinesInfo* info); diff --git a/src/client/src/tscParseLineProtocol.c b/src/client/src/tscParseLineProtocol.c index ffc58e56d94cf0c6e168822a09d13605aeb15dea..4a32ec451ac55d42e21503bca76a5d8d335c483d 100644 --- a/src/client/src/tscParseLineProtocol.c +++ b/src/client/src/tscParseLineProtocol.c @@ -877,52 +877,91 @@ static int32_t addChildTableDataPointsToInsertSql(char* cTableName, char* sTable return 0; } -static int32_t doRunInsertSQL(TAOS* taos, char* sql, SSmlLinesInfo* info) { - int32_t code = 0; - bool tryAgain = false; - int32_t try = 0; - do { - TAOS_RES* res = taos_query(taos, sql); - code = taos_errno(res); - if (code != 0) { - tscError("SML:0x%"PRIx64 " taos_query return %d:%s", info->id, code, taos_errstr(res)); - } - - tscDebug("SML:0x%"PRIx64 " taos_query inserted %d rows", info->id, taos_affected_rows(res)); - info->affectedRows += taos_affected_rows(res); - 
taos_free_result(res); - - tryAgain = false; - if ((code == TSDB_CODE_TDB_INVALID_TABLE_ID - || code == TSDB_CODE_VND_INVALID_VGROUP_ID - || code == TSDB_CODE_TDB_TABLE_RECONFIGURE - || code == TSDB_CODE_APP_NOT_READY - || code == TSDB_CODE_RPC_NETWORK_UNAVAIL) && try++ < TSDB_MAX_REPLICA) { - tryAgain = true; - } +static void insertCallback(void *param, TAOS_RES *res, int32_t notUsedCode) { + SSmlSqlInsertBatch *batch = (SSmlSqlInsertBatch *)param; + batch->code = taos_errno(res); - if (code == TSDB_CODE_TDB_INVALID_TABLE_ID || code == TSDB_CODE_VND_INVALID_VGROUP_ID) { - TAOS_RES* res2 = taos_query(taos, "RESET QUERY CACHE"); - int32_t code2 = taos_errno(res2); - if (code2 != TSDB_CODE_SUCCESS) { - tscError("SML:0x%" PRIx64 " insert child table by sql. reset query cache. error: %s", info->id, taos_errstr(res2)); - } - taos_free_result(res2); - if (tryAgain) { - taosMsleep(100 * (2 << try)); - } - } - - if (code == TSDB_CODE_APP_NOT_READY || code == TSDB_CODE_RPC_NETWORK_UNAVAIL) { - if (tryAgain) { - taosMsleep( 100 * (2 << try)); - } - } - } while (tryAgain); - - return code; + if (batch->code != 0) { + tscError("SML:0x%"PRIx64 " batch %d , taos_query_a return %d:%s", batch->id, batch->index, batch->code, taos_errstr(res)); + } + tscDebug("SML:0x%"PRIx64 " batch %d, taos_query inserted %d rows", batch->id, batch->index, taos_affected_rows(res)); + batch->affectedRows = taos_affected_rows(res); + taos_free_result(res); -} + int32_t code = batch->code; + batch->tryAgain = false; + batch->resetQueryCache = false; + batch->sleep = false; + if ((code == TSDB_CODE_TDB_INVALID_TABLE_ID + || code == TSDB_CODE_VND_INVALID_VGROUP_ID + || code == TSDB_CODE_TDB_TABLE_RECONFIGURE + || code == TSDB_CODE_APP_NOT_READY + || code == TSDB_CODE_RPC_NETWORK_UNAVAIL) && batch->tryTimes < TSDB_MAX_REPLICA) { + batch->tryAgain = true; + } + + if (code == TSDB_CODE_TDB_INVALID_TABLE_ID || code == TSDB_CODE_VND_INVALID_VGROUP_ID) { + batch->resetQueryCache = true; + if (batch->tryAgain) 
{ + batch->sleep = true; + } + } + + if (code == TSDB_CODE_APP_NOT_READY || code == TSDB_CODE_RPC_NETWORK_UNAVAIL) { + if (batch->tryAgain) { + batch->sleep = true; + } + } + + sem_post(&batch->sem); +} + +//static int32_t doRunInsertSQL(TAOS* taos, char* sql, SSmlLinesInfo* info) { +// int32_t code = 0; +// bool tryAgain = false; +// int32_t try = 0; +// do { +// TAOS_RES* res = taos_query(taos, sql); +// code = taos_errno(res); +// if (code != 0) { +// tscError("SML:0x%"PRIx64 " taos_query return %d:%s", info->id, code, taos_errstr(res)); +// } +// +// tscDebug("SML:0x%"PRIx64 " taos_query inserted %d rows", info->id, taos_affected_rows(res)); +// info->affectedRows += taos_affected_rows(res); +// taos_free_result(res); +// +// tryAgain = false; +// if ((code == TSDB_CODE_TDB_INVALID_TABLE_ID +// || code == TSDB_CODE_VND_INVALID_VGROUP_ID +// || code == TSDB_CODE_TDB_TABLE_RECONFIGURE +// || code == TSDB_CODE_APP_NOT_READY +// || code == TSDB_CODE_RPC_NETWORK_UNAVAIL) && try++ < TSDB_MAX_REPLICA) { +// tryAgain = true; +// } +// +// if (code == TSDB_CODE_TDB_INVALID_TABLE_ID || code == TSDB_CODE_VND_INVALID_VGROUP_ID) { +// TAOS_RES* res2 = taos_query(taos, "RESET QUERY CACHE"); +// int32_t code2 = taos_errno(res2); +// if (code2 != TSDB_CODE_SUCCESS) { +// tscError("SML:0x%" PRIx64 " insert child table by sql. reset query cache. 
error: %s", info->id, taos_errstr(res2)); +// } +// taos_free_result(res2); +// if (tryAgain) { +// taosMsleep(100 * (2 << try)); +// } +// } +// +// if (code == TSDB_CODE_APP_NOT_READY || code == TSDB_CODE_RPC_NETWORK_UNAVAIL) { +// if (tryAgain) { +// taosMsleep( 100 * (2 << try)); +// } +// } +// } while (tryAgain); +// +// return code; +// +//} static int32_t applyDataPointsWithSqlInsert(TAOS* taos, TAOS_SML_DATA_POINT* points, int32_t numPoints, SArray* stableSchemas, SSmlLinesInfo* info) { int32_t code = TSDB_CODE_SUCCESS; @@ -930,14 +969,20 @@ static int32_t applyDataPointsWithSqlInsert(TAOS* taos, TAOS_SML_DATA_POINT* poi SHashObj* cname2points = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false); arrangePointsByChildTableName(points, numPoints, cname2points, stableSchemas, info); - char* sql = malloc(tsMaxSQLStringLen + 1); - if (sql == NULL) { - tscError("malloc sql memory error"); - return TSDB_CODE_TSC_OUT_OF_MEMORY; + for (int i = 0; i < MAX_SML_SQL_INSERT_BATCHES; ++i) { + info->batches[i].id = info->id; + info->batches[i].index = i; + info->batches[i].sql = NULL; + info->batches[i].tryTimes = 0; + sem_init(&info->batches[i].sem, 0, 0); } + + info->numBatches = 0; + SSmlSqlInsertBatch *batch = info->batches; + batch->sql = malloc(tsMaxSQLStringLen + 1); + //TODO batch->sql allocation errror int32_t freeBytes = tsMaxSQLStringLen; - int32_t usedBytes = 0; - usedBytes += sprintf(sql, "insert into"); + int32_t usedBytes = sprintf(batch->sql, "insert into"); freeBytes -= usedBytes; SArray** pCTablePoints = taosHashIterate(cname2points, NULL); @@ -947,48 +992,100 @@ static int32_t applyDataPointsWithSqlInsert(TAOS* taos, TAOS_SML_DATA_POINT* poi TAOS_SML_DATA_POINT* point = taosArrayGetP(cTablePoints, 0); SSmlSTableSchema* sTableSchema = taosArrayGet(stableSchemas, point->schemaIdx); - size_t rowSize = 0; - size_t numCols = taosArrayGetSize(sTableSchema->fields); - for (int i = 0; i < numCols; ++i) { - SSchema* colSchema = 
taosArrayGet(sTableSchema->fields, i); - rowSize += colSchema->bytes; - } - - tscDebug("SML:0x%"PRIx64" add child table points to SQL. child table: %s of super table %s, row size: %zu", - info->id, point->childTableName, point->stableName, rowSize); + tscDebug("SML:0x%"PRIx64" add child table points to SQL. child table: %s of super table %s", + info->id, point->childTableName, point->stableName); int32_t cTableSqlLen = 0; - code = addChildTableDataPointsToInsertSql(point->childTableName, point->stableName, sTableSchema, cTablePoints, sql+usedBytes, freeBytes, &cTableSqlLen, info); - if (cTableSqlLen < freeBytes) { + code = addChildTableDataPointsToInsertSql(point->childTableName, point->stableName, sTableSchema, cTablePoints, batch->sql+usedBytes, freeBytes, &cTableSqlLen, info); + int32_t safeBound = 2048; + if (cTableSqlLen < freeBytes - safeBound) { usedBytes += cTableSqlLen; freeBytes -= cTableSqlLen; } else { - sql[usedBytes] = '\0'; - code = doRunInsertSQL(taos, sql, info); - if (code != 0) { - tscError("SML:0x%"PRIx64" Apply points failed. sql: %s, error: %s", info->id, sql, tstrerror(code)); + batch->sql[usedBytes] = '\0'; + info->numBatches++; + if (info->numBatches >= MAX_SML_SQL_INSERT_BATCHES) { + tscError("SML:0x%"PRIx64" Apply points failed. 
exceeds max sql insert batches", info->id); + code = TSDB_CODE_TSC_OUT_OF_MEMORY; goto cleanup; } + + batch = &info->batches[info->numBatches]; + batch->sql = malloc(tsMaxSQLStringLen + 1); freeBytes = tsMaxSQLStringLen; - usedBytes = 0; - usedBytes += sprintf(sql, "insert into"); + usedBytes = sprintf(batch->sql, "insert into"); freeBytes -= usedBytes; //TODO deal with one child table rows exceeds columns - code = addChildTableDataPointsToInsertSql(point->childTableName, point->stableName, sTableSchema, cTablePoints, sql+usedBytes, freeBytes, &cTableSqlLen, info); + code = addChildTableDataPointsToInsertSql(point->childTableName, point->stableName, sTableSchema, cTablePoints, batch->sql+usedBytes, freeBytes, &cTableSqlLen, info); } pCTablePoints = taosHashIterate(cname2points, pCTablePoints); } + batch->sql[usedBytes] = '\0'; + info->numBatches++; - sql[usedBytes] = '\0'; - code = doRunInsertSQL(taos, sql, info); - if (code != 0) { - tscError("SML:0x%"PRIx64" Apply points failed. sql: %s, error: %s", info->id, sql, tstrerror(code)); - goto cleanup; + bool batchesExecuted[MAX_SML_SQL_INSERT_BATCHES] = {false}; + + for (int i = 0; i < info->numBatches; ++i) { + SSmlSqlInsertBatch* insertBatch = &info->batches[i]; + insertBatch->tryTimes = 1; + taos_query_a(taos, insertBatch->sql, insertCallback, batch); + batchesExecuted[i] = true; + } + int32_t triedBatches = info->numBatches; + + while (triedBatches > 0) { + for (int i = 0; i < info->numBatches; ++i) { + if (batchesExecuted[i]) { + sem_wait(&info->batches[i].sem); + info->affectedRows += info->batches[i].affectedRows; + } + } + + for (int i = 0; i < info->numBatches; ++i) { + SSmlSqlInsertBatch* b = info->batches + i; + if (b->resetQueryCache) { + TAOS_RES* res = taos_query(taos, "RESET QUERY CACHE"); + taos_free_result(res); + break; + } + } + + for (int i = 0; i < info->numBatches; ++i) { + SSmlSqlInsertBatch* b = info->batches + i; + if (b->sleep) { + taosMsleep(100 * (2 << b->tryTimes)); + break; + } + } + + 
memset(batchesExecuted, 0, sizeof(batchesExecuted)); + triedBatches = 0; + for (int i = 0; i < info->numBatches; ++i) { + SSmlSqlInsertBatch* insertBatch = &info->batches[i]; + if (insertBatch->tryAgain) { + insertBatch->tryTimes++; + taos_query_a(taos, insertBatch->sql, insertCallback, batch); + batchesExecuted[i] = true; + triedBatches++; + } + } + } + + code = 0; + for (int i = 0; i < info->numBatches; ++i) { + SSmlSqlInsertBatch* b = info->batches + i; + if (b->code != 0) { + code = b->code; + } } - tscDebug("SML:0x%"PRIx64" successfully applied data points", info->id); cleanup: - free(sql); + for (int i = 0; i < MAX_SML_SQL_INSERT_BATCHES; ++i) { + free(info->batches[i].sql); + info->batches[i].sql = NULL; + sem_destroy(&info->batches[i].sem); + } + pCTablePoints = taosHashIterate(cname2points, NULL); while (pCTablePoints) { SArray* pPoints = *pCTablePoints;