diff --git a/src/client/inc/tscParseLine.h b/src/client/inc/tscParseLine.h index fef55011b0faec1d15876764b3fd9808ec2b4e39..30a316210c3d58539d3f6d0a5fb55fdd24108fad 100644 --- a/src/client/inc/tscParseLine.h +++ b/src/client/inc/tscParseLine.h @@ -58,6 +58,22 @@ typedef enum { SML_TIME_STAMP_NOW } SMLTimeStampType; +typedef struct SSmlSqlInsertBatch { + uint64_t id; + int32_t index; + + char* sql; + int32_t code; + int32_t tryTimes; + tsem_t sem; + int32_t affectedRows; + bool tryAgain; + bool resetQueryCache; + bool sleep; +} SSmlSqlInsertBatch; + +#define MAX_SML_SQL_INSERT_BATCHES 512 + typedef struct { uint64_t id; SMLProtocolType protocol; @@ -65,7 +81,13 @@ typedef struct { SHashObj* smlDataToSchema; int32_t affectedRows; + + pthread_mutex_t batchMutex; + pthread_cond_t batchCond; + int32_t numBatches; + SSmlSqlInsertBatch batches[MAX_SML_SQL_INSERT_BATCHES]; } SSmlLinesInfo; + char* addEscapeCharToString(char *str, int32_t len); int tscSmlInsert(TAOS* taos, TAOS_SML_DATA_POINT* points, int numPoint, SSmlLinesInfo* info); bool checkDuplicateKey(char *key, SHashObj *pHash, SSmlLinesInfo* info); diff --git a/src/client/src/tscParseLineProtocol.c b/src/client/src/tscParseLineProtocol.c index 8ee4a2a6d98ecf1c2300983f4427abe46fe421e0..8c92c86c194034f17f3cb05b15dec0e8e1b87f1e 100644 --- a/src/client/src/tscParseLineProtocol.c +++ b/src/client/src/tscParseLineProtocol.c @@ -32,10 +32,6 @@ typedef struct { static uint64_t linesSmlHandleId = 0; -static int32_t insertChildTablePointsBatch(void* pVoid, char* name, char* name1, SArray* pArray, SArray* pArray1, - SArray* pArray2, SArray* pArray3, size_t size, SSmlLinesInfo* info); -static int32_t doInsertChildTablePoints(void* pVoid, char* sql, char* name, SArray* pArray, SArray* pArray1, - SSmlLinesInfo* info); uint64_t genLinesSmlId() { uint64_t id; @@ -91,16 +87,17 @@ static int32_t getFieldBytesFromSmlKv(TAOS_SML_KV* kv, int32_t* bytes, uint64_t *bytes = tDataTypes[kv->type].bytes; } else { if (kv->type == TSDB_DATA_TYPE_NCHAR) { - char* ucs = malloc(kv->length * TSDB_NCHAR_SIZE + 1); - int32_t bytesNeeded = 0; - bool succ = taosMbsToUcs4(kv->value, kv->length, ucs, kv->length * TSDB_NCHAR_SIZE, &bytesNeeded); - if (!succ) { - free(ucs); - tscError("SML:0x%"PRIx64" convert nchar string to UCS4_LE failed:%s", id, kv->value); - return TSDB_CODE_TSC_INVALID_VALUE; - } - free(ucs); - *bytes = bytesNeeded + VARSTR_HEADER_SIZE; +// char* ucs = malloc(kv->length * TSDB_NCHAR_SIZE + 1); +// int32_t bytesNeeded = 0; +// bool succ = taosMbsToUcs4(kv->value, kv->length, ucs, kv->length * TSDB_NCHAR_SIZE, &bytesNeeded); +// if (!succ) { +// free(ucs); +// tscError("SML:0x%"PRIx64" convert nchar string to UCS4_LE failed:%s", id, kv->value); +// return TSDB_CODE_TSC_INVALID_VALUE; +// } +// free(ucs); +// *bytes = bytesNeeded + VARSTR_HEADER_SIZE; + *bytes = kv->length * TSDB_NCHAR_SIZE + VARSTR_HEADER_SIZE; } else if (kv->type == TSDB_DATA_TYPE_BINARY) { *bytes = kv->length + VARSTR_HEADER_SIZE; } @@ -792,9 +789,23 @@ static int32_t arrangePointsByChildTableName(TAOS_SML_DATA_POINT* points, int nu return 0; } -static int32_t applyChildTableDataPointsWithInsertSQL(TAOS* taos, char* cTableName, char* sTableName, SSmlSTableSchema* sTableSchema, - SArray* cTablePoints, size_t rowSize, SSmlLinesInfo* info) { - int32_t code = TSDB_CODE_SUCCESS; +static int smlSnprintf(char* buf, int32_t *total, int32_t cap, char* fmt, ...) { + if (*total > cap) { + return -1; + } + + va_list argp; + va_start(argp, fmt); + int len = vsnprintf(buf + *total, cap - *total, fmt, argp); + if (len < 0 || len >= cap - *total) { + return -2; + } + *total += len; + return 0; +} + +static int32_t addChildTableDataPointsToInsertSql(char* cTableName, char* sTableName, SSmlSTableSchema* sTableSchema, SArray* cTablePoints, + char* sql, int32_t capacity, int32_t* cTableSqlLen, int fromIndex, int* nextIndex, SSmlLinesInfo* info) { size_t numTags = taosArrayGetSize(sTableSchema->tags); size_t numCols = taosArrayGetSize(sTableSchema->fields); size_t rows = taosArrayGetSize(cTablePoints); @@ -810,53 +821,79 @@ static int32_t applyChildTableDataPointsWithInsertSQL(TAOS* taos, char* cTableNa } } - char* sql = malloc(tsMaxSQLStringLen + 1); - if (sql == NULL) { - tscError("malloc sql memory error"); - return TSDB_CODE_TSC_OUT_OF_MEMORY; - } + TAOS_SML_KV** colKVs = malloc(numCols * sizeof(TAOS_SML_KV*)); + int r = fromIndex; - int32_t freeBytes = tsMaxSQLStringLen + 1; int32_t totalLen = 0; - totalLen += sprintf(sql, "insert into %s using %s (", cTableName, sTableName); + int ret = 0; + ret = smlSnprintf(sql, &totalLen, capacity, " %s using %s (", cTableName, sTableName); + if (ret != 0) { + goto _cleanup; + } + for (int i = 0; i < numTags; ++i) { SSchema* tagSchema = taosArrayGet(tagsSchema, i); - totalLen += snprintf(sql + totalLen, freeBytes - totalLen, "%s,", tagSchema->name); + ret = smlSnprintf(sql, &totalLen, capacity, "%s,", tagSchema->name); + if (ret != 0) { + goto _cleanup; + } } --totalLen; - totalLen += snprintf(sql + totalLen, freeBytes - totalLen, ")"); - totalLen += snprintf(sql + totalLen, freeBytes - totalLen, " tags ("); + ret = smlSnprintf(sql, &totalLen, capacity, ") tags ("); + if (ret != 0) { + goto _cleanup; + } - // for (int i = 0; i < numTags; ++i) { - // snprintf(sql+strlen(sql), freeBytes-strlen(sql), "?,"); - // } for (int i = 0; i < numTags; ++i) { + if (capacity - totalLen < TSDB_MAX_BYTES_PER_ROW) { + goto _cleanup; + } if (tagKVs[i] == NULL) { - totalLen += snprintf(sql + totalLen, freeBytes - totalLen, "NULL,"); + ret = smlSnprintf(sql, &totalLen, capacity, "NULL,"); + if (ret != 0) { + goto _cleanup; + } } else { TAOS_SML_KV* kv = tagKVs[i]; size_t beforeLen = totalLen; int32_t len = 0; converToStr(sql + beforeLen, kv->type, kv->value, kv->length, &len); totalLen += len; - totalLen += snprintf(sql + totalLen, freeBytes - totalLen, ","); + + ret = smlSnprintf(sql, &totalLen, capacity, ","); + if (ret != 0) { + goto _cleanup; + } } } --totalLen; - totalLen += snprintf(sql + totalLen, freeBytes - totalLen, ") ("); + ret = smlSnprintf(sql, &totalLen, capacity, ") ("); + if (ret != 0) { + goto _cleanup; + } for (int i = 0; i < numCols; ++i) { SSchema* colSchema = taosArrayGet(colsSchema, i); - totalLen += snprintf(sql + totalLen, freeBytes - totalLen, "%s,", colSchema->name); + ret = smlSnprintf(sql, &totalLen, capacity, "%s,", colSchema->name); + if (ret != 0) { + goto _cleanup; + } } --totalLen; - totalLen += snprintf(sql + totalLen, freeBytes - totalLen, ") values "); - - TAOS_SML_KV** colKVs = malloc(numCols * sizeof(TAOS_SML_KV*)); - for (int r = 0; r < rows; ++r) { - totalLen += snprintf(sql + totalLen, freeBytes - totalLen, "("); + ret = smlSnprintf(sql, &totalLen, capacity, ") values "); + if (ret != 0) { + goto _cleanup; + } + for (; r < rows; ++r) { + if (capacity - totalLen < TSDB_MAX_BYTES_PER_ROW) { + break; + } + ret = smlSnprintf(sql, &totalLen, capacity, "("); + if (ret != 0) { + goto _cleanup; + } memset(colKVs, 0, numCols * sizeof(TAOS_SML_KV*)); TAOS_SML_DATA_POINT* point = taosArrayGetP(cTablePoints, r); @@ -867,372 +904,215 @@ static int32_t applyChildTableDataPointsWithInsertSQL(TAOS* taos, char* cTableNa for (int i = 0; i < numCols; ++i) { if (colKVs[i] == NULL) { - totalLen += snprintf(sql + totalLen, freeBytes - totalLen, "NULL,"); + ret = smlSnprintf(sql, &totalLen, capacity, "NULL,"); + if (ret != 0) { + goto _cleanup; + } } else { TAOS_SML_KV* kv = colKVs[i]; size_t beforeLen = totalLen; int32_t len = 0; converToStr(sql + beforeLen, kv->type, kv->value, kv->length, &len); totalLen += len; - totalLen += snprintf(sql + totalLen, freeBytes - totalLen, ","); + ret = smlSnprintf(sql, &totalLen, capacity, ","); + if (ret != 0) { + goto _cleanup; + } } } --totalLen; - totalLen += snprintf(sql + totalLen, freeBytes - totalLen, ")"); + ret = smlSnprintf(sql, &totalLen, capacity, ")"); + if (ret != 0) { + goto _cleanup; + } } +_cleanup: free(colKVs); - sql[totalLen] = '\0'; - - tscDebug("SML:0x%" PRIx64 " insert child table table %s of super table %s sql: %s", info->id, cTableName, sTableName, - sql); - - bool tryAgain = false; - int32_t try = 0; - do { - TAOS_RES* res = taos_query(taos, sql); - code = taos_errno(res); - if (code != 0) { - tscError("SML:0x%"PRIx64 " taos_query return %d:%s", info->id, code, taos_errstr(res)); - } - - tscDebug("SML:0x%"PRIx64 " taos_query inserted %d rows", info->id, taos_affected_rows(res)); - info->affectedRows += taos_affected_rows(res); - taos_free_result(res); - - tryAgain = false; - if ((code == TSDB_CODE_TDB_INVALID_TABLE_ID - || code == TSDB_CODE_VND_INVALID_VGROUP_ID - || code == TSDB_CODE_TDB_TABLE_RECONFIGURE - || code == TSDB_CODE_APP_NOT_READY - || code == TSDB_CODE_RPC_NETWORK_UNAVAIL) && try++ < TSDB_MAX_REPLICA) { - tryAgain = true; - } - - if (code == TSDB_CODE_TDB_INVALID_TABLE_ID || code == TSDB_CODE_VND_INVALID_VGROUP_ID) { - TAOS_RES* res2 = taos_query(taos, "RESET QUERY CACHE"); - int32_t code2 = taos_errno(res2); - if (code2 != TSDB_CODE_SUCCESS) { - tscError("SML:0x%" PRIx64 " insert child table by sql. reset query cache. error: %s", info->id, taos_errstr(res2)); - } - taos_free_result(res2); - if (tryAgain) { - taosMsleep(100 * (2 << try)); - } - } - - if (code == TSDB_CODE_APP_NOT_READY || code == TSDB_CODE_RPC_NETWORK_UNAVAIL) { - if (tryAgain) { - taosMsleep( 100 * (2 << try)); - } - } - } while (tryAgain); - free(sql); + if (r == fromIndex) { + tscError("buffer can not fit one line"); + *cTableSqlLen = 0; + } else { + *cTableSqlLen = totalLen; + } + *nextIndex = r; - return code; + return 0; } -static int32_t applyChildTableDataPointsWithStmt(TAOS* taos, char* cTableName, char* sTableName, SSmlSTableSchema* sTableSchema, - SArray* cTablePoints, size_t rowSize, SSmlLinesInfo* info) { - size_t numTags = taosArrayGetSize(sTableSchema->tags); - size_t numCols = taosArrayGetSize(sTableSchema->fields); - size_t rows = taosArrayGetSize(cTablePoints); +static void insertCallback(void *param, TAOS_RES *res, int32_t notUsedCode) { + SSmlSqlInsertBatch *batch = (SSmlSqlInsertBatch *)param; + batch->code = taos_errno(res); - TAOS_SML_KV* tagKVs[TSDB_MAX_TAGS] = {0}; - for (int i= 0; i < rows; ++i) { - TAOS_SML_DATA_POINT * pDataPoint = taosArrayGetP(cTablePoints, i); - for (int j = 0; j < pDataPoint->tagNum; ++j) { - TAOS_SML_KV* kv = pDataPoint->tags + j; - tagKVs[kv->fieldSchemaIdx] = kv; - } + if (batch->code != 0) { + tscError("SML:0x%"PRIx64 " batch %d , taos_query_a return %d:%s", batch->id, batch->index, batch->code, taos_errstr(res)); } + tscDebug("SML:0x%"PRIx64 " batch %d, taos_query inserted %d rows", batch->id, batch->index, taos_affected_rows(res)); + batch->affectedRows = taos_affected_rows(res); + taos_free_result(res); - //tag bind - SArray* tagBinds = taosArrayInit(numTags, sizeof(TAOS_BIND)); - taosArraySetSize(tagBinds, numTags); - int isNullColBind = TSDB_TRUE; - for (int j = 0; j < numTags; ++j) { - TAOS_BIND* bind = taosArrayGet(tagBinds, j); - bind->is_null = &isNullColBind; - } - for (int j = 0; j < numTags; ++j) { - if (tagKVs[j] == NULL) continue; - TAOS_SML_KV* kv = tagKVs[j]; - TAOS_BIND* bind = taosArrayGet(tagBinds, kv->fieldSchemaIdx); - bind->buffer_type = kv->type; - bind->length = malloc(sizeof(uintptr_t*)); - *bind->length = kv->length; - bind->buffer = kv->value; - bind->is_null = NULL; + int32_t code = batch->code; + batch->tryAgain = false; + batch->resetQueryCache = false; + batch->sleep = false; + if ((code == TSDB_CODE_TDB_INVALID_TABLE_ID + || code == TSDB_CODE_VND_INVALID_VGROUP_ID + || code == TSDB_CODE_TDB_TABLE_RECONFIGURE + || code == TSDB_CODE_APP_NOT_READY + || code == TSDB_CODE_RPC_NETWORK_UNAVAIL) && batch->tryTimes < TSDB_MAX_REPLICA) { + batch->tryAgain = true; } - //rows bind - SArray* rowsBind = taosArrayInit(rows, POINTER_BYTES); - for (int i = 0; i < rows; ++i) { - TAOS_SML_DATA_POINT* point = taosArrayGetP(cTablePoints, i); - - TAOS_BIND* colBinds = calloc(numCols, sizeof(TAOS_BIND)); - if (colBinds == NULL) { - tscError("SML:0x%"PRIx64" taos_sml_insert insert points, failed to allocated memory for TAOS_BIND, " - "num of rows: %zu, num of cols: %zu", info->id, rows, numCols); - return TSDB_CODE_TSC_OUT_OF_MEMORY; - } - - for (int j = 0; j < numCols; ++j) { - TAOS_BIND* bind = colBinds + j; - bind->is_null = &isNullColBind; - } - for (int j = 0; j < point->fieldNum; ++j) { - TAOS_SML_KV* kv = point->fields + j; - TAOS_BIND* bind = colBinds + kv->fieldSchemaIdx; - bind->buffer_type = kv->type; - bind->length = malloc(sizeof(uintptr_t*)); - *bind->length = kv->length; - bind->buffer = kv->value; - bind->is_null = NULL; + if (code == TSDB_CODE_TDB_INVALID_TABLE_ID || code == TSDB_CODE_VND_INVALID_VGROUP_ID) { + batch->resetQueryCache = true; + if (batch->tryAgain) { + batch->sleep = true; } - taosArrayPush(rowsBind, &colBinds); - } - - int32_t code = 0; - code = insertChildTablePointsBatch(taos, cTableName, sTableName, sTableSchema->tags, tagBinds, sTableSchema->fields, rowsBind, rowSize, info); - if (code != 0) { - tscError("SML:0x%"PRIx64" insert into child table %s failed. error %s", info->id, cTableName, tstrerror(code)); } - //free rows bind - for (int i = 0; i < rows; ++i) { - TAOS_BIND* colBinds = taosArrayGetP(rowsBind, i); - for (int j = 0; j < numCols; ++j) { - TAOS_BIND* bind = colBinds + j; - free(bind->length); + if (code == TSDB_CODE_APP_NOT_READY || code == TSDB_CODE_RPC_NETWORK_UNAVAIL) { + if (batch->tryAgain) { + batch->sleep = true; } - free(colBinds); } - taosArrayDestroy(&rowsBind); - //free tag bind - for (int i = 0; i < taosArrayGetSize(tagBinds); ++i) { - TAOS_BIND* bind = taosArrayGet(tagBinds, i); - free(bind->length); - } - taosArrayDestroy(&tagBinds); - return code; + + tsem_post(&batch->sem); } -static int32_t insertChildTablePointsBatch(TAOS* taos, char* cTableName, char* sTableName, - SArray* tagsSchema, SArray* tagsBind, - SArray* colsSchema, SArray* rowsBind, - size_t rowSize, SSmlLinesInfo* info) { - size_t numTags = taosArrayGetSize(tagsSchema); - size_t numCols = taosArrayGetSize(colsSchema); - char* sql = malloc(tsMaxSQLStringLen+1); - if (sql == NULL) { - tscError("malloc sql memory error"); - return TSDB_CODE_TSC_OUT_OF_MEMORY; - } +static int32_t applyDataPointsWithSqlInsert(TAOS* taos, TAOS_SML_DATA_POINT* points, int32_t numPoints, SArray* stableSchemas, SSmlLinesInfo* info) { + int32_t code = TSDB_CODE_SUCCESS; - int32_t freeBytes = tsMaxSQLStringLen + 1 ; - sprintf(sql, "insert into ? using %s (", sTableName); - for (int i = 0; i < numTags; ++i) { - SSchema* tagSchema = taosArrayGet(tagsSchema, i); - snprintf(sql+strlen(sql), freeBytes-strlen(sql), "%s,", tagSchema->name); + SHashObj* cname2points = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false); + arrangePointsByChildTableName(points, numPoints, cname2points, stableSchemas, info); + + for (int i = 0; i < MAX_SML_SQL_INSERT_BATCHES; ++i) { + info->batches[i].id = info->id; + info->batches[i].index = i; + info->batches[i].sql = NULL; + info->batches[i].tryTimes = 0; + tsem_init(&info->batches[i].sem, 0, 0); } - snprintf(sql + strlen(sql) - 1, freeBytes-strlen(sql)+1, ")"); - snprintf(sql + strlen(sql), freeBytes-strlen(sql), " tags ("); + info->numBatches = 0; + SSmlSqlInsertBatch *batch = info->batches; + batch->sql = malloc(tsMaxSQLStringLen + 1); + int32_t freeBytes = tsMaxSQLStringLen; + int32_t usedBytes = sprintf(batch->sql, "insert into"); + freeBytes -= usedBytes; - for (int i = 0; i < numTags; ++i) { - snprintf(sql+strlen(sql), freeBytes-strlen(sql), "?,"); - } - snprintf(sql + strlen(sql) - 1, freeBytes-strlen(sql)+1, ") ("); + int32_t cTableSqlLen = 0; - for (int i = 0; i < numCols; ++i) { - SSchema* colSchema = taosArrayGet(colsSchema, i); - snprintf(sql+strlen(sql), freeBytes-strlen(sql), "%s,", colSchema->name); - } - snprintf(sql + strlen(sql)-1, freeBytes-strlen(sql)+1, ") values ("); + SArray** pCTablePoints = taosHashIterate(cname2points, NULL); + while (pCTablePoints) { + SArray* cTablePoints = *pCTablePoints; - for (int i = 0; i < numCols; ++i) { - snprintf(sql+strlen(sql), freeBytes-strlen(sql), "?,"); - } - snprintf(sql + strlen(sql)-1, freeBytes-strlen(sql)+1, ")"); - sql[strlen(sql)] = '\0'; + TAOS_SML_DATA_POINT* point = taosArrayGetP(cTablePoints, 0); + SSmlSTableSchema* sTableSchema = taosArrayGet(stableSchemas, point->schemaIdx); - tscDebug("SML:0x%"PRIx64" insert child table table %s of super table %s : %s", info->id, cTableName, sTableName, sql); + int32_t nextIndex = 0; + int32_t fromIndex = nextIndex; + + while (nextIndex != taosArrayGetSize(cTablePoints)) { + fromIndex = nextIndex; + code = addChildTableDataPointsToInsertSql(point->childTableName, point->stableName, sTableSchema, cTablePoints, + batch->sql + usedBytes, freeBytes, &cTableSqlLen, fromIndex, &nextIndex, + info); + tscDebug("SML:0x%"PRIx64" add child table points to SQL. child table: %s of super table %s. range[%d-%d).", + info->id, point->childTableName, point->stableName, fromIndex, nextIndex); + usedBytes += cTableSqlLen; + freeBytes -= cTableSqlLen; + if (nextIndex != taosArrayGetSize(cTablePoints)) { + batch->sql[usedBytes] = '\0'; + info->numBatches++; + tscDebug("SML:0x%"PRIx64" sql: %s" , info->id, batch->sql); + + if (info->numBatches >= MAX_SML_SQL_INSERT_BATCHES) { + tscError("SML:0x%"PRIx64" Apply points failed. exceeds max sql insert batches", info->id); + code = TSDB_CODE_TSC_OUT_OF_MEMORY; + goto cleanup; + } - size_t maxBatchSize = TSDB_MAX_WAL_SIZE/rowSize * 2 / 3; - size_t rows = taosArrayGetSize(rowsBind); - size_t batchSize = MIN(maxBatchSize, rows); - tscDebug("SML:0x%"PRIx64" insert rows into child table %s. num of rows: %zu, batch size: %zu", - info->id, cTableName, rows, batchSize); - SArray* batchBind = taosArrayInit(batchSize, POINTER_BYTES); - int32_t code = TSDB_CODE_SUCCESS; - for (int i = 0; i < rows;) { - int j = i; - for (; j < i + batchSize && j i) { - tscDebug("SML:0x%"PRIx64" insert child table batch from line %d to line %d.", info->id, i, j - 1); - code = doInsertChildTablePoints(taos, sql, cTableName, tagsBind, batchBind, info); - if (code != 0) { - taosArrayDestroy(&batchBind); - tfree(sql); - return code; + batch = &info->batches[info->numBatches]; + batch->sql = malloc(tsMaxSQLStringLen + 1); + freeBytes = tsMaxSQLStringLen; + usedBytes = sprintf(batch->sql, "insert into"); + freeBytes -= usedBytes; } - taosArrayClear(batchBind); } - i = j; - } - taosArrayDestroy(&batchBind); - tfree(sql); - return code; - -} -static int32_t doInsertChildTablePoints(TAOS* taos, char* sql, char* cTableName, SArray* tagsBind, SArray* batchBind, - SSmlLinesInfo* info) { - int32_t code = 0; - TAOS_STMT* stmt = taos_stmt_init(taos); - if (stmt == NULL) { - return TSDB_CODE_TSC_OUT_OF_MEMORY; + pCTablePoints = taosHashIterate(cname2points, pCTablePoints); } - - code = taos_stmt_prepare(stmt, sql, (unsigned long)strlen(sql)); - - if (code != 0) { - tscError("SML:0x%"PRIx64" taos_stmt_prepare return %d:%s", info->id, code, taos_stmt_errstr(stmt)); - taos_stmt_close(stmt); - return code; + batch->sql[usedBytes] = '\0'; + info->numBatches++; + tscDebug("SML:0x%"PRIx64" sql: %s" , info->id, batch->sql); + if (info->numBatches >= MAX_SML_SQL_INSERT_BATCHES) { + tscError("SML:0x%"PRIx64" Apply points failed. exceeds max sql insert batches", info->id); + code = TSDB_CODE_TSC_OUT_OF_MEMORY; + goto cleanup; } + bool batchesExecuted[MAX_SML_SQL_INSERT_BATCHES] = {false}; - bool tryAgain = false; - int32_t try = 0; - do { - code = taos_stmt_set_tbname_tags(stmt, cTableName, TARRAY_GET_START(tagsBind)); - if (code != 0) { - tscError("SML:0x%"PRIx64" taos_stmt_set_tbname return %d:%s", info->id, code, taos_stmt_errstr(stmt)); - - int affectedRows = taos_stmt_affected_rows(stmt); - info->affectedRows += affectedRows; - - taos_stmt_close(stmt); - return code; - } - - size_t rows = taosArrayGetSize(batchBind); - for (int32_t i = 0; i < rows; ++i) { - TAOS_BIND* colsBinds = taosArrayGetP(batchBind, i); - code = taos_stmt_bind_param(stmt, colsBinds); - if (code != 0) { - tscError("SML:0x%"PRIx64" taos_stmt_bind_param return %d:%s", info->id, code, taos_stmt_errstr(stmt)); - - int affectedRows = taos_stmt_affected_rows(stmt); - info->affectedRows += affectedRows; - - taos_stmt_close(stmt); - return code; - } - code = taos_stmt_add_batch(stmt); - if (code != 0) { - tscError("SML:0x%"PRIx64" taos_stmt_add_batch return %d:%s", info->id, code, taos_stmt_errstr(stmt)); - - int affectedRows = taos_stmt_affected_rows(stmt); - info->affectedRows += affectedRows; + for (int i = 0; i < info->numBatches; ++i) { + SSmlSqlInsertBatch* insertBatch = &info->batches[i]; + insertBatch->tryTimes = 1; + taos_query_a(taos, insertBatch->sql, insertCallback, insertBatch); + batchesExecuted[i] = true; + } + int32_t triedBatches = info->numBatches; - taos_stmt_close(stmt); - return code; + while (triedBatches > 0) { + for (int i = 0; i < info->numBatches; ++i) { + if (batchesExecuted[i]) { + tsem_wait(&info->batches[i].sem); + info->affectedRows += info->batches[i].affectedRows; } } - code = taos_stmt_execute(stmt); - if (code != 0) { - tscError("SML:0x%"PRIx64" taos_stmt_execute return %d:%s, try:%d", info->id, code, taos_stmt_errstr(stmt), try); - } - tscDebug("SML:0x%"PRIx64" taos_stmt_execute inserted %d rows", info->id, taos_stmt_affected_rows(stmt)); - - tryAgain = false; - if ((code == TSDB_CODE_TDB_INVALID_TABLE_ID - || code == TSDB_CODE_VND_INVALID_VGROUP_ID - || code == TSDB_CODE_TDB_TABLE_RECONFIGURE - || code == TSDB_CODE_APP_NOT_READY - || code == TSDB_CODE_RPC_NETWORK_UNAVAIL) && try++ < TSDB_MAX_REPLICA) { - tryAgain = true; + for (int i = 0; i < info->numBatches; ++i) { + SSmlSqlInsertBatch* b = info->batches + i; + if (b->resetQueryCache) { + TAOS_RES* res = taos_query(taos, "RESET QUERY CACHE"); + taos_free_result(res); + break; + } } - if (code == TSDB_CODE_TDB_INVALID_TABLE_ID || code == TSDB_CODE_VND_INVALID_VGROUP_ID) { - TAOS_RES* res2 = taos_query(taos, "RESET QUERY CACHE"); - int32_t code2 = taos_errno(res2); - if (code2 != TSDB_CODE_SUCCESS) { - tscError("SML:0x%" PRIx64 " insert child table. reset query cache. error: %s", info->id, taos_errstr(res2)); - } - taos_free_result(res2); - if (tryAgain) { - taosMsleep(100 * (2 << try)); + for (int i = 0; i < info->numBatches; ++i) { + SSmlSqlInsertBatch* b = info->batches + i; + if (b->sleep) { + taosMsleep(100 * (2 << b->tryTimes)); + break; } } - if (code == TSDB_CODE_APP_NOT_READY || code == TSDB_CODE_RPC_NETWORK_UNAVAIL) { - if (tryAgain) { - taosMsleep( 100 * (2 << try)); + + memset(batchesExecuted, 0, sizeof(batchesExecuted)); + triedBatches = 0; + for (int i = 0; i < info->numBatches; ++i) { + SSmlSqlInsertBatch* insertBatch = &info->batches[i]; + if (insertBatch->tryAgain) { + insertBatch->tryTimes++; + taos_query_a(taos, insertBatch->sql, insertCallback, insertBatch); + batchesExecuted[i] = true; + triedBatches++; } } - } while (tryAgain); - - int affectedRows = taos_stmt_affected_rows(stmt); - info->affectedRows += affectedRows; - - taos_stmt_close(stmt); - return code; - - return 0; -} - -static int32_t applyChildTableDataPoints(TAOS* taos, char* cTableName, char* sTableName, SSmlSTableSchema* sTableSchema, - SArray* cTablePoints, size_t rowSize, SSmlLinesInfo* info) { - int32_t code = TSDB_CODE_SUCCESS; - size_t childTableDataPoints = taosArrayGetSize(cTablePoints); - if (childTableDataPoints < 10) { - code = applyChildTableDataPointsWithInsertSQL(taos, cTableName, sTableName, sTableSchema, cTablePoints, rowSize, info); - } else { - code = applyChildTableDataPointsWithStmt(taos, cTableName, sTableName, sTableSchema, cTablePoints, rowSize, info); } - return code; -} - -static int32_t applyDataPoints(TAOS* taos, TAOS_SML_DATA_POINT* points, int32_t numPoints, SArray* stableSchemas, SSmlLinesInfo* info) { - int32_t code = TSDB_CODE_SUCCESS; - - SHashObj* cname2points = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false); - arrangePointsByChildTableName(points, numPoints, cname2points, stableSchemas, info); - SArray** pCTablePoints = taosHashIterate(cname2points, NULL); - while (pCTablePoints) { - SArray* cTablePoints = *pCTablePoints; - - TAOS_SML_DATA_POINT* point = taosArrayGetP(cTablePoints, 0); - SSmlSTableSchema* sTableSchema = taosArrayGet(stableSchemas, point->schemaIdx); - - size_t rowSize = 0; - size_t numCols = taosArrayGetSize(sTableSchema->fields); - for (int i = 0; i < numCols; ++i) { - SSchema* colSchema = taosArrayGet(sTableSchema->fields, i); - rowSize += colSchema->bytes; - } - - tscDebug("SML:0x%"PRIx64" apply child table points. child table: %s of super table %s, row size: %zu", - info->id, point->childTableName, point->stableName, rowSize); - code = applyChildTableDataPoints(taos, point->childTableName, point->stableName, sTableSchema, cTablePoints, rowSize, info); - if (code != 0) { - tscError("SML:0x%"PRIx64" Apply child table points failed. child table %s, error %s", info->id, point->childTableName, tstrerror(code)); - goto cleanup; + code = 0; + for (int i = 0; i < info->numBatches; ++i) { + SSmlSqlInsertBatch* b = info->batches + i; + if (b->code != 0) { + code = b->code; } - - tscDebug("SML:0x%"PRIx64" successfully applied data points of child table %s", info->id, point->childTableName); - - pCTablePoints = taosHashIterate(cname2points, pCTablePoints); } cleanup: + for (int i = 0; i < MAX_SML_SQL_INSERT_BATCHES; ++i) { + free(info->batches[i].sql); + info->batches[i].sql = NULL; + tsem_destroy(&info->batches[i].sem); + } + pCTablePoints = taosHashIterate(cname2points, NULL); while (pCTablePoints) { SArray* pPoints = *pCTablePoints; @@ -1298,6 +1178,7 @@ static int doSmlInsertOneDataPoint(TAOS* taos, TAOS_SML_DATA_POINT* point, SSmlL for (int col = 1; col < point->fieldNum; ++col) { TAOS_SML_KV* kv = point->fields + col; int32_t len = 0; + if (freeBytes - sqlLen <= kv->length) { tscError("SML:0x%" PRIx64 " no free space for converToStr", info->id); return TSDB_CODE_TSC_OUT_OF_MEMORY; @@ -1362,7 +1243,7 @@ int tscSmlInsert(TAOS* taos, TAOS_SML_DATA_POINT* points, int numPoint, SSmlLine } tscDebug("SML:0x%"PRIx64" apply data points", info->id); - code = applyDataPoints(taos, points, numPoint, stableSchemas, info); + code = applyDataPointsWithSqlInsert(taos, points, numPoint, stableSchemas, info); if (code != 0) { tscError("SML:0x%"PRIx64" error apply data points : %s", info->id, tstrerror(code)); } @@ -2163,7 +2044,7 @@ static int32_t parseSmlKey(TAOS_SML_KV *pKV, const char **index, SHashObj *pHash return TSDB_CODE_TSC_LINE_SYNTAX_ERROR; } - pKV->key = calloc(len + TS_BACKQUOTE_CHAR_SIZE + 1, 1); + pKV->key = malloc(len + TS_BACKQUOTE_CHAR_SIZE + 1); memcpy(pKV->key, key, len + 1); addEscapeCharToString(pKV->key, len); tscDebug("SML:0x%"PRIx64" Key:%s|len:%d", info->id, pKV->key, len); @@ -2206,7 +2087,7 @@ static int32_t parseSmlValue(TAOS_SML_KV *pKV, const char **index, switch (tag_state) { case tag_common: if (back_slash == true) { - if (*cur != ',' && *cur != '=' && *cur != ' ') { + if (*cur != ',' && *cur != '=' && *cur != ' ' && *cur != 'n' ) { tscError("SML:0x%"PRIx64" tag value: state(%d), incorrect character(%c) escaped", info->id, tag_state, *cur); ret = TSDB_CODE_TSC_LINE_SYNTAX_ERROR; goto error; @@ -2271,7 +2152,7 @@ static int32_t parseSmlValue(TAOS_SML_KV *pKV, const char **index, break; case tag_lqoute: if (back_slash == true) { - if (*cur != ',' && *cur != '=' && *cur != ' ') { + if (*cur != ',' && *cur != '=' && *cur != ' ' && *cur != 'n') { tscError("SML:0x%"PRIx64" tag value: state(%d), incorrect character(%c) escaped", info->id, tag_state, *cur); ret = TSDB_CODE_TSC_LINE_SYNTAX_ERROR; goto error; @@ -2342,7 +2223,7 @@ static int32_t parseSmlValue(TAOS_SML_KV *pKV, const char **index, switch (val_state) { case val_common: if (back_slash == true) { - if (*cur != '\\' && *cur != '"') { + if (*cur != '\\' && *cur != '"' && *cur != 'n') { tscError("SML:0x%"PRIx64" field value: state(%d), incorrect character(%c) escaped", info->id, val_state, *cur); ret = TSDB_CODE_TSC_LINE_SYNTAX_ERROR; goto error; @@ -2437,7 +2318,7 @@ static int32_t parseSmlValue(TAOS_SML_KV *pKV, const char **index, break; case val_lqoute: if (back_slash == true) { - if (*cur != '\\' && *cur != '"') { + if (*cur != '\\' && *cur != '"' && *cur != 'n') { tscError("SML:0x%"PRIx64" field value: state(%d), incorrect character(%c) escaped", info->id, val_state, *cur); ret = TSDB_CODE_TSC_LINE_SYNTAX_ERROR; goto error; @@ -2606,13 +2487,16 @@ static int32_t parseSmlKvPairs(TAOS_SML_KV **pKVs, int *num_kvs, int32_t capacity = 0; if (isField) { capacity = 64; - *pKVs = calloc(capacity, sizeof(TAOS_SML_KV)); + *pKVs = malloc(capacity * sizeof(TAOS_SML_KV)); + memset(*pKVs, 0, capacity * sizeof(TAOS_SML_KV)); // leave space for timestamp; pkv = *pKVs; pkv++; + *num_kvs = 1; // ts fixed column } else { capacity = 8; - *pKVs = calloc(capacity, sizeof(TAOS_SML_KV)); + *pKVs = malloc(capacity * sizeof(TAOS_SML_KV)); + memset(*pKVs, 0, capacity * sizeof(TAOS_SML_KV)); pkv = *pKVs; } @@ -2673,7 +2557,7 @@ static int32_t parseSmlKvPairs(TAOS_SML_KV **pKVs, int *num_kvs, *pKVs = more_kvs; //move pKV points to next TAOS_SML_KV block if (isField) { - pkv = *pKVs + *num_kvs + 1; + pkv = *pKVs + *num_kvs; // first ts column reserved } else { pkv = *pKVs + *num_kvs; } @@ -2695,7 +2579,7 @@ static void moveTimeStampToFirstKv(TAOS_SML_DATA_POINT** smlData, TAOS_SML_KV *t tsField->key = malloc(strlen(ts->key) + 1); memcpy(tsField->key, ts->key, strlen(ts->key) + 1); memcpy(tsField->value, ts->value, ts->length); - (*smlData)->fieldNum = (*smlData)->fieldNum + 1; + //(*smlData)->fieldNum = (*smlData)->fieldNum + 1; // already reserved for first ts column free(ts->key); free(ts->value); @@ -2707,7 +2591,7 @@ int32_t tscParseLine(const char* sql, TAOS_SML_DATA_POINT* smlData, SSmlLinesInf int32_t ret = TSDB_CODE_SUCCESS; uint8_t has_tags = 0; TAOS_SML_KV *timestamp = NULL; - SHashObj *keyHashTable = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false); + SHashObj *keyHashTable = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false); ret = parseSmlMeasurement(smlData, &index, &has_tags, info); if (ret) { @@ -2753,14 +2637,21 @@ int32_t tscParseLine(const char* sql, TAOS_SML_DATA_POINT* smlData, SSmlLinesInf //========================================================================= void destroySmlDataPoint(TAOS_SML_DATA_POINT* point) { + TAOS_SML_KV *pkv; for (int i=0; itagNum; ++i) { - free((point->tags+i)->key); - free((point->tags+i)->value); + pkv = point->tags + i; + if (pkv->key) + free(pkv->key); + if (pkv->value) + free(pkv->value); } free(point->tags); for (int i=0; ifieldNum; ++i) { - free((point->fields+i)->key); - free((point->fields+i)->value); + pkv = point->fields + i; + if (pkv->key) + free(pkv->key); + if (pkv->value) + free(pkv->value); } free(point->fields); free(point->stableName); @@ -2792,8 +2683,8 @@ int taos_insert_lines(TAOS* taos, char* lines[], int numLines, SMLProtocolType p info->tsType = tsType; info->protocol = protocol; - if (numLines <= 0 || numLines > 65536) { - tscError("SML:0x%"PRIx64" taos_insert_lines numLines should be between 1 and 65536. numLines: %d", info->id, numLines); + if (numLines <= 0 || numLines > 65536*32) { + tscError("SML:0x%"PRIx64" taos_insert_lines numLines should be between 1 and 65536*32. numLines: %d", info->id, numLines); tfree(info); code = TSDB_CODE_TSC_APP_ERROR; return code;