diff --git a/src/client/src/tscParseLineProtocol.c b/src/client/src/tscParseLineProtocol.c index acf3248518811784bfd9ce5228388075666cc428..de3b7a4b3da6971b2c84bcaaaee2478787fc15cc 100644 --- a/src/client/src/tscParseLineProtocol.c +++ b/src/client/src/tscParseLineProtocol.c @@ -32,6 +32,10 @@ typedef struct { static uint64_t linesSmlHandleId = 0; +static int32_t insertChildTablePointsBatch(void* pVoid, char* name, char* name1, SArray* pArray, SArray* pArray1, + SArray* pArray2, SArray* pArray3, size_t size, SSmlLinesInfo* info); +static int32_t doInsertChildTablePoints(void* pVoid, char* sql, char* name, SArray* pArray, SArray* pArray1, + SSmlLinesInfo* info); uint64_t genLinesSmlId() { uint64_t id; @@ -177,11 +181,10 @@ static int32_t getSmlMd5ChildTableName(TAOS_SML_DATA_POINT* point, char* tableNa MD5Init(&context); MD5Update(&context, (uint8_t *)keyJoined, (uint32_t)len); MD5Final(&context); + uint64_t digest1 = *(uint64_t*)(context.digest); + uint64_t digest2 = *(uint64_t*)(context.digest + 8); *tableNameLen = snprintf(tableName, *tableNameLen, - "t_%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x", context.digest[0], - context.digest[1], context.digest[2], context.digest[3], context.digest[4], context.digest[5], context.digest[6], - context.digest[7], context.digest[8], context.digest[9], context.digest[10], context.digest[11], - context.digest[12], context.digest[13], context.digest[14], context.digest[15]); + "t_%016"PRIx64"%016"PRIx64, digest1, digest2); taosStringBuilderDestroy(&sb); tscDebug("SML:0x%"PRIx64" child table name: %s", info->id, tableName); return 0; @@ -198,7 +201,6 @@ static int32_t buildSmlChildTableName(TAOS_SML_DATA_POINT* point, SSmlLinesInfo* return 0; } - static int32_t buildDataPointSchemas(TAOS_SML_DATA_POINT* points, int numPoint, SArray* stableSchemas, SSmlLinesInfo* info) { int32_t code = 0; SHashObj* sname2shema = taosHashInit(32, @@ -219,8 +221,8 @@ static int32_t buildDataPointSchemas(TAOS_SML_DATA_POINT* points, int numPoint, schema.sTableName[stableNameLen] = '\0'; schema.fields = taosArrayInit(64, sizeof(SSchema)); schema.tags = taosArrayInit(8, sizeof(SSchema)); - schema.tagHash = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false); - schema.fieldHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false); + schema.tagHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false); + schema.fieldHash = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false); pStableSchema = taosArrayPush(stableSchemas, &schema); stableIdx = taosArrayGetSize(stableSchemas) - 1; @@ -555,11 +557,75 @@ static int32_t fillDbSchema(STableMeta* tableMeta, char* tableName, SSmlSTableSc return TSDB_CODE_SUCCESS; } +static int32_t getSuperTableMetaFromLocalCache(TAOS* taos, char* tableName, STableMeta** outTableMeta, SSmlLinesInfo* info) { + int32_t code = 0; + STableMeta* tableMeta = NULL; + + SSqlObj* pSql = calloc(1, sizeof(SSqlObj)); + if (pSql == NULL) { + tscError("SML:0x%" PRIx64 " failed to allocate memory, reason:%s", info->id, strerror(errno)); + code = TSDB_CODE_TSC_OUT_OF_MEMORY; + return code; + } + pSql->pTscObj = taos; + pSql->signature = pSql; + pSql->fp = NULL; + + registerSqlObj(pSql); + char tableNameBuf[TSDB_TABLE_NAME_LEN + TS_ESCAPE_CHAR_SIZE] = {0}; + memcpy(tableNameBuf, tableName, strlen(tableName)); + SStrToken tableToken = {.z = tableNameBuf, .n = (uint32_t)strlen(tableName), .type = TK_ID}; + tGetToken(tableNameBuf, &tableToken.type); + bool dbIncluded = false; + // Check if the table name available or not + if (tscValidateName(&tableToken, true, &dbIncluded) != TSDB_CODE_SUCCESS) { + code = TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH; + sprintf(pSql->cmd.payload, "table name is invalid"); + taosReleaseRef(tscObjRef, pSql->self); + return code; + } + + SName sname = {0}; + if ((code = tscSetTableFullName(&sname, &tableToken, pSql, dbIncluded)) != TSDB_CODE_SUCCESS) { + taosReleaseRef(tscObjRef, pSql->self); + return code; + } + + char fullTableName[TSDB_TABLE_FNAME_LEN] = {0}; + memset(fullTableName, 0, tListLen(fullTableName)); + tNameExtractFullName(&sname, fullTableName); + + size_t size = 0; + taosHashGetCloneExt(UTIL_GET_TABLEMETA(pSql), fullTableName, strlen(fullTableName), NULL, (void**)&tableMeta, &size); + + STableMeta* stableMeta = tableMeta; + if (tableMeta != NULL && tableMeta->tableType == TSDB_CHILD_TABLE) { + taosHashGetCloneExt(UTIL_GET_TABLEMETA(pSql), tableMeta->sTableName, strlen(tableMeta->sTableName), NULL, + (void**)stableMeta, &size); + } + taosReleaseRef(tscObjRef, pSql->self); + + if (stableMeta != tableMeta) { + free(tableMeta); + } + + if (stableMeta != NULL) { + if (outTableMeta != NULL) { + *outTableMeta = stableMeta; + } else { + free(stableMeta); + } + return TSDB_CODE_SUCCESS; + } else { + return TSDB_CODE_TSC_NO_META_CACHED; + } +} + static int32_t retrieveTableMeta(TAOS* taos, char* tableName, STableMeta** pTableMeta, SSmlLinesInfo* info) { int32_t code = 0; int32_t retries = 0; STableMeta* tableMeta = NULL; - while (retries++ < TSDB_MAX_REPLICA && tableMeta == NULL) { + while (retries++ <= TSDB_MAX_REPLICA && tableMeta == NULL) { STscObj* pObj = (STscObj*)taos; if (pObj == NULL || pObj->signature != pObj) { terrno = TSDB_CODE_TSC_DISCONNECTED; @@ -567,55 +633,24 @@ static int32_t retrieveTableMeta(TAOS* taos, char* tableName, STableMeta** pTabl } tscDebug("SML:0x%" PRIx64 " retrieve table meta. super table name: %s", info->id, tableName); - - char sql[256]; - snprintf(sql, 256, "describe %s", tableName); - TAOS_RES* res = taos_query(taos, sql); - code = taos_errno(res); - if (code != 0) { - tscError("SML:0x%" PRIx64 " describe table failure. %s", info->id, taos_errstr(res)); + code = getSuperTableMetaFromLocalCache(taos, tableName, &tableMeta, info); + if (code == TSDB_CODE_SUCCESS) { + tscDebug("SML:0x%" PRIx64 " successfully retrieved table meta. super table name: %s", info->id, tableName); + break; + } else if (code == TSDB_CODE_TSC_NO_META_CACHED) { + char sql[256]; + snprintf(sql, 256, "describe %s", tableName); + TAOS_RES* res = taos_query(taos, sql); + code = taos_errno(res); + if (code != 0) { + tscError("SML:0x%" PRIx64 " describe table failure. %s", info->id, taos_errstr(res)); + taos_free_result(res); + return code; + } taos_free_result(res); + } else { return code; } - taos_free_result(res); - - SSqlObj* pSql = calloc(1, sizeof(SSqlObj)); - if (pSql == NULL) { - tscError("SML:0x%" PRIx64 " failed to allocate memory, reason:%s", info->id, strerror(errno)); - code = TSDB_CODE_TSC_OUT_OF_MEMORY; - return code; - } - pSql->pTscObj = taos; - pSql->signature = pSql; - pSql->fp = NULL; - - registerSqlObj(pSql); - char tableNameBuf[TSDB_TABLE_NAME_LEN + TS_ESCAPE_CHAR_SIZE] = {0}; - memcpy(tableNameBuf, tableName, strlen(tableName)); - SStrToken tableToken = {.z = tableNameBuf, .n = (uint32_t)strlen(tableName), .type = TK_ID}; - tGetToken(tableNameBuf, &tableToken.type); - bool dbIncluded = false; - // Check if the table name available or not - if (tscValidateName(&tableToken, true, &dbIncluded) != TSDB_CODE_SUCCESS) { - code = TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH; - sprintf(pSql->cmd.payload, "table name is invalid"); - taosReleaseRef(tscObjRef, pSql->self); - return code; - } - - SName sname = {0}; - if ((code = tscSetTableFullName(&sname, &tableToken, pSql, dbIncluded)) != TSDB_CODE_SUCCESS) { - taosReleaseRef(tscObjRef, pSql->self); - return code; - } - - char fullTableName[TSDB_TABLE_FNAME_LEN] = {0}; - memset(fullTableName, 0, tListLen(fullTableName)); - tNameExtractFullName(&sname, fullTableName); - - size_t size = 0; - taosHashGetCloneExt(UTIL_GET_TABLEMETA(pSql), fullTableName, strlen(fullTableName), NULL, (void**)&tableMeta, &size); - taosReleaseRef(tscObjRef, pSql->self); } if (tableMeta != NULL) { @@ -718,72 +753,303 @@ static int32_t modifyDBSchemas(TAOS* taos, SArray* stableSchemas, SSmlLinesInfo* return 0; } -static int32_t creatChildTableIfNotExists(TAOS* taos, const char* cTableName, const char* sTableName, - SArray* tagsSchema, SArray* tagsBind, SSmlLinesInfo* info) { - size_t numTags = taosArrayGetSize(tagsSchema); +static int32_t arrangePointsByChildTableName(TAOS_SML_DATA_POINT* points, int numPoints, + SHashObj* cname2points, SArray* stableSchemas, SSmlLinesInfo* info) { + for (int32_t i = 0; i < numPoints; ++i) { + TAOS_SML_DATA_POINT * point = points + i; + SSmlSTableSchema* stableSchema = taosArrayGet(stableSchemas, point->schemaIdx); + + for (int j = 0; j < point->tagNum; ++j) { + TAOS_SML_KV* kv = point->tags + j; + if (kv->type == TSDB_DATA_TYPE_TIMESTAMP) { + int64_t ts = *(int64_t*)(kv->value); + ts = convertTimePrecision(ts, TSDB_TIME_PRECISION_NANO, stableSchema->precision); + *(int64_t*)(kv->value) = ts; + } + } + + for (int j = 0; j < point->fieldNum; ++j) { + TAOS_SML_KV* kv = point->fields + j; + if (kv->type == TSDB_DATA_TYPE_TIMESTAMP) { + int64_t ts = *(int64_t*)(kv->value); + ts = convertTimePrecision(ts, TSDB_TIME_PRECISION_NANO, stableSchema->precision); + *(int64_t*)(kv->value) = ts; + } + } + + SArray* cTablePoints = NULL; + SArray** pCTablePoints = taosHashGet(cname2points, point->childTableName, strlen(point->childTableName)); + if (pCTablePoints) { + cTablePoints = *pCTablePoints; + } else { + cTablePoints = taosArrayInit(64, sizeof(point)); + taosHashPut(cname2points, point->childTableName, strlen(point->childTableName), &cTablePoints, POINTER_BYTES); + } + taosArrayPush(cTablePoints, &point); + } + + return 0; +} + +static int32_t applyChildTableDataPointsWithInsertSQL(TAOS* taos, char* cTableName, char* sTableName, SSmlSTableSchema* sTableSchema, + SArray* cTablePoints, size_t rowSize, SSmlLinesInfo* info) { + int32_t code = TSDB_CODE_SUCCESS; + size_t numTags = taosArrayGetSize(sTableSchema->tags); + size_t numCols = taosArrayGetSize(sTableSchema->fields); + size_t rows = taosArrayGetSize(cTablePoints); + SArray* tagsSchema = sTableSchema->tags; + SArray* colsSchema = sTableSchema->fields; + + TAOS_SML_KV* tagKVs[TSDB_MAX_TAGS] = {0}; + for (int i= 0; i < rows; ++i) { + TAOS_SML_DATA_POINT * pDataPoint = taosArrayGetP(cTablePoints, i); + for (int j = 0; j < pDataPoint->tagNum; ++j) { + TAOS_SML_KV* kv = pDataPoint->tags + j; + tagKVs[kv->fieldSchemaIdx] = kv; + } + } + char* sql = malloc(tsMaxSQLStringLen+1); if (sql == NULL) { tscError("malloc sql memory error"); return TSDB_CODE_TSC_OUT_OF_MEMORY; } - int freeBytes = tsMaxSQLStringLen + 1; - sprintf(sql, "create table if not exists %s using %s", cTableName, sTableName); - snprintf(sql+strlen(sql), freeBytes-strlen(sql), "("); + int32_t freeBytes = tsMaxSQLStringLen + 1 ; + int32_t totalLen = 0; + totalLen += sprintf(sql, "insert into %s using %s (", cTableName, sTableName); for (int i = 0; i < numTags; ++i) { SSchema* tagSchema = taosArrayGet(tagsSchema, i); - snprintf(sql+strlen(sql), freeBytes-strlen(sql), "%s,", tagSchema->name); + totalLen += snprintf(sql+totalLen, freeBytes-totalLen, "%s,", tagSchema->name); } - snprintf(sql + strlen(sql) - 1, freeBytes-strlen(sql)+1, ")"); + --totalLen; + totalLen += snprintf(sql + totalLen, freeBytes-totalLen, ")"); - snprintf(sql + strlen(sql), freeBytes-strlen(sql), " tags ("); + totalLen += snprintf(sql + totalLen, freeBytes-totalLen, " tags ("); +// for (int i = 0; i < numTags; ++i) { +// snprintf(sql+strlen(sql), freeBytes-strlen(sql), "?,"); +// } for (int i = 0; i < numTags; ++i) { - snprintf(sql+strlen(sql), freeBytes-strlen(sql), "?,"); + if (tagKVs[i] == NULL) { + totalLen += snprintf(sql + totalLen, freeBytes-totalLen, "NULL,"); + } else { + TAOS_SML_KV* kv = tagKVs[i]; + size_t beforeLen = totalLen; + int32_t len = 0; + converToStr(sql+beforeLen, kv->type, kv->value, kv->length, &len); + totalLen += len; + totalLen += snprintf(sql+totalLen, freeBytes-totalLen, ","); + } } - snprintf(sql + strlen(sql) - 1, freeBytes-strlen(sql)+1, ")"); - sql[strlen(sql)] = '\0'; + --totalLen; + totalLen += snprintf(sql + totalLen, freeBytes-totalLen, ") ("); - tscDebug("SML:0x%"PRIx64" create table : %s", info->id, sql); + for (int i = 0; i < numCols; ++i) { + SSchema* colSchema = taosArrayGet(colsSchema, i); + totalLen += snprintf(sql+totalLen, freeBytes-totalLen, "%s,", colSchema->name); + } + --totalLen; + totalLen += snprintf(sql + totalLen, freeBytes-totalLen, ") values "); - TAOS_STMT* stmt = taos_stmt_init(taos); - if (stmt == NULL) { - free(sql); - return TSDB_CODE_TSC_OUT_OF_MEMORY; + TAOS_SML_KV** colKVs = malloc(numCols*sizeof(TAOS_SML_KV*)); + for (int r = 0; r < rows; ++r) { + totalLen += snprintf(sql + totalLen, freeBytes-totalLen, "("); + + memset(colKVs, 0, numCols*sizeof(TAOS_SML_KV*)); + + TAOS_SML_DATA_POINT* point = taosArrayGetP(cTablePoints, r); + for (int i = 0; i < point->fieldNum; ++i) { + TAOS_SML_KV* kv = point->fields + i; + colKVs[kv->fieldSchemaIdx] = kv; + } + + for (int i = 0; i < numCols; ++i) { + if (colKVs[i] == NULL) { + totalLen += snprintf(sql + totalLen, freeBytes-totalLen, "NULL,"); + } else { + TAOS_SML_KV* kv = colKVs[i]; + size_t beforeLen = totalLen; + int32_t len = 0; + converToStr(sql+beforeLen, kv->type, kv->value, kv->length, &len); + totalLen += len; + totalLen += snprintf(sql+totalLen, freeBytes-totalLen, ","); + } + } + --totalLen; + totalLen += snprintf(sql+totalLen, freeBytes - totalLen, ")"); } - int32_t code; - code = taos_stmt_prepare(stmt, sql, (unsigned long)strlen(sql)); + free(colKVs); + sql[totalLen] = '\0'; + + tscDebug("SML:0x%"PRIx64" insert child table table %s of super table %s sql: %s", info->id, cTableName, sTableName, sql); + TAOS_RES* res = taos_query(taos, sql); free(sql); + code = taos_errno(res); + info->affectedRows = taos_affected_rows(res); + taos_free_result(res); + return code; +} - if (code != 0) { - tscError("SML:0x%"PRIx64" taos_stmt_prepare returns %d:%s", info->id, code, tstrerror(code)); - taos_stmt_close(stmt); - return code; +static int32_t applyChildTableDataPointsWithStmt(TAOS* taos, char* cTableName, char* sTableName, SSmlSTableSchema* sTableSchema, + SArray* cTablePoints, size_t rowSize, SSmlLinesInfo* info) { + size_t numTags = taosArrayGetSize(sTableSchema->tags); + size_t numCols = taosArrayGetSize(sTableSchema->fields); + size_t rows = taosArrayGetSize(cTablePoints); + + TAOS_SML_KV* tagKVs[TSDB_MAX_TAGS] = {0}; + for (int i= 0; i < rows; ++i) { + TAOS_SML_DATA_POINT * pDataPoint = taosArrayGetP(cTablePoints, i); + for (int j = 0; j < pDataPoint->tagNum; ++j) { + TAOS_SML_KV* kv = pDataPoint->tags + j; + tagKVs[kv->fieldSchemaIdx] = kv; + } } - code = taos_stmt_bind_param(stmt, TARRAY_GET_START(tagsBind)); - if (code != 0) { - tscError("SML:0x%"PRIx64" taos_stmt_bind_param returns %d:%s", info->id, code, tstrerror(code)); - taos_stmt_close(stmt); - return code; + //tag bind + SArray* tagBinds = taosArrayInit(numTags, sizeof(TAOS_BIND)); + taosArraySetSize(tagBinds, numTags); + int isNullColBind = TSDB_TRUE; + for (int j = 0; j < numTags; ++j) { + TAOS_BIND* bind = taosArrayGet(tagBinds, j); + bind->is_null = &isNullColBind; + } + for (int j = 0; j < numTags; ++j) { + if (tagKVs[j] == NULL) continue; + TAOS_SML_KV* kv = tagKVs[j]; + TAOS_BIND* bind = taosArrayGet(tagBinds, kv->fieldSchemaIdx); + bind->buffer_type = kv->type; + bind->length = malloc(sizeof(uintptr_t*)); + *bind->length = kv->length; + bind->buffer = kv->value; + bind->is_null = NULL; } - code = taos_stmt_execute(stmt); - if (code != 0) { - tscError("SML:0x%"PRIx64" taos_stmt_execute returns %d:%s", info->id, code, tstrerror(code)); - taos_stmt_close(stmt); - return code; + //rows bind + SArray* rowsBind = taosArrayInit(rows, POINTER_BYTES); + for (int i = 0; i < rows; ++i) { + TAOS_SML_DATA_POINT* point = taosArrayGetP(cTablePoints, i); + + TAOS_BIND* colBinds = calloc(numCols, sizeof(TAOS_BIND)); + if (colBinds == NULL) { + tscError("SML:0x%"PRIx64" taos_sml_insert insert points, failed to allocated memory for TAOS_BIND, " + "num of rows: %zu, num of cols: %zu", info->id, rows, numCols); + return TSDB_CODE_TSC_OUT_OF_MEMORY; + } + + for (int j = 0; j < numCols; ++j) { + TAOS_BIND* bind = colBinds + j; + bind->is_null = &isNullColBind; + } + for (int j = 0; j < point->fieldNum; ++j) { + TAOS_SML_KV* kv = point->fields + j; + TAOS_BIND* bind = colBinds + kv->fieldSchemaIdx; + bind->buffer_type = kv->type; + bind->length = malloc(sizeof(uintptr_t*)); + *bind->length = kv->length; + bind->buffer = kv->value; + bind->is_null = NULL; + } + taosArrayPush(rowsBind, &colBinds); } - code = taos_stmt_close(stmt); + int32_t code = 0; + code = insertChildTablePointsBatch(taos, cTableName, sTableName, sTableSchema->tags, tagBinds, sTableSchema->fields, rowsBind, rowSize, info); if (code != 0) { - tscError("SML:0x%"PRIx64" taos_stmt_close return %d:%s", info->id, code, tstrerror(code)); - return code; + tscError("SML:0x%"PRIx64" insert into child table %s failed. error %s", info->id, cTableName, tstrerror(code)); + } + + //free rows bind + for (int i = 0; i < rows; ++i) { + TAOS_BIND* colBinds = taosArrayGetP(rowsBind, i); + for (int j = 0; j < numCols; ++j) { + TAOS_BIND* bind = colBinds + j; + free(bind->length); + } + free(colBinds); } + taosArrayDestroy(rowsBind); + //free tag bind + for (int i = 0; i < taosArrayGetSize(tagBinds); ++i) { + TAOS_BIND* bind = taosArrayGet(tagBinds, i); + free(bind->length); + } + taosArrayDestroy(tagBinds); return code; } -static int32_t doInsertChildTableWithStmt(TAOS* taos, char* sql, char* cTableName, SArray* batchBind, SSmlLinesInfo* info) { +static int32_t insertChildTablePointsBatch(TAOS* taos, char* cTableName, char* sTableName, + SArray* tagsSchema, SArray* tagsBind, + SArray* colsSchema, SArray* rowsBind, + size_t rowSize, SSmlLinesInfo* info) { + size_t numTags = taosArrayGetSize(tagsSchema); + size_t numCols = taosArrayGetSize(colsSchema); + char* sql = malloc(tsMaxSQLStringLen+1); + if (sql == NULL) { + tscError("malloc sql memory error"); + return TSDB_CODE_TSC_OUT_OF_MEMORY; + } + + int32_t freeBytes = tsMaxSQLStringLen + 1 ; + sprintf(sql, "insert into ? using %s (", sTableName); + for (int i = 0; i < numTags; ++i) { + SSchema* tagSchema = taosArrayGet(tagsSchema, i); + snprintf(sql+strlen(sql), freeBytes-strlen(sql), "%s,", tagSchema->name); + } + snprintf(sql + strlen(sql) - 1, freeBytes-strlen(sql)+1, ")"); + + snprintf(sql + strlen(sql), freeBytes-strlen(sql), " tags ("); + + for (int i = 0; i < numTags; ++i) { + snprintf(sql+strlen(sql), freeBytes-strlen(sql), "?,"); + } + snprintf(sql + strlen(sql) - 1, freeBytes-strlen(sql)+1, ") ("); + + for (int i = 0; i < numCols; ++i) { + SSchema* colSchema = taosArrayGet(colsSchema, i); + snprintf(sql+strlen(sql), freeBytes-strlen(sql), "%s,", colSchema->name); + } + snprintf(sql + strlen(sql)-1, freeBytes-strlen(sql)+1, ") values ("); + + for (int i = 0; i < numCols; ++i) { + snprintf(sql+strlen(sql), freeBytes-strlen(sql), "?,"); + } + snprintf(sql + strlen(sql)-1, freeBytes-strlen(sql)+1, ")"); + sql[strlen(sql)] = '\0'; + + tscDebug("SML:0x%"PRIx64" insert child table table %s of super table %s : %s", info->id, cTableName, sTableName, sql); + + size_t rows = taosArrayGetSize(rowsBind); + size_t maxBatchSize = TSDB_MAX_WAL_SIZE/rowSize * 4 / 5; + size_t batchSize = MIN(maxBatchSize, rows); + tscDebug("SML:0x%"PRIx64" insert rows into child table %s. num of rows: %zu, batch size: %zu", + info->id, cTableName, rows, batchSize); + SArray* batchBind = taosArrayInit(batchSize, POINTER_BYTES); + int32_t code = TSDB_CODE_SUCCESS; + for (int i = 0; i < rows;) { + int j = i; + for (; j < i + batchSize && j i) { + tscDebug("SML:0x%"PRIx64" insert child table batch from line %d to line %d.", info->id, i, j - 1); + code = doInsertChildTablePoints(taos, sql, cTableName, tagsBind, batchBind, info); + if (code != 0) { + taosArrayDestroy(batchBind); + tfree(sql); + return code; + } + taosArrayClear(batchBind); + } + i = j; + } + taosArrayDestroy(batchBind); + tfree(sql); + return code; + +} +static int32_t doInsertChildTablePoints(TAOS* taos, char* sql, char* cTableName, SArray* tagsBind, SArray* batchBind, + SSmlLinesInfo* info) { int32_t code = 0; TAOS_STMT* stmt = taos_stmt_init(taos); @@ -802,7 +1068,7 @@ static int32_t doInsertChildTableWithStmt(TAOS* taos, char* sql, char* cTableNam bool tryAgain = false; int32_t try = 0; do { - code = taos_stmt_set_tbname(stmt, cTableName); + code = taos_stmt_set_tbname_tags(stmt, cTableName, TARRAY_GET_START(tagsBind)); if (code != 0) { tscError("SML:0x%"PRIx64" taos_stmt_set_tbname return %d:%s", info->id, code, taos_stmt_errstr(stmt)); @@ -843,7 +1109,7 @@ static int32_t doInsertChildTableWithStmt(TAOS* taos, char* sql, char* cTableNam tscError("SML:0x%"PRIx64" taos_stmt_execute return %d:%s, try:%d", info->id, code, taos_stmt_errstr(stmt), try); } tscDebug("SML:0x%"PRIx64" taos_stmt_execute inserted %d rows", info->id, taos_stmt_affected_rows(stmt)); - + tryAgain = false; if ((code == TSDB_CODE_TDB_INVALID_TABLE_ID || code == TSDB_CODE_VND_INVALID_VGROUP_ID @@ -876,189 +1142,19 @@ static int32_t doInsertChildTableWithStmt(TAOS* taos, char* sql, char* cTableNam taos_stmt_close(stmt); return code; -} - -static int32_t insertChildTableBatch(TAOS* taos, char* cTableName, SArray* colsSchema, SArray* rowsBind, size_t rowSize, SSmlLinesInfo* info) { - size_t numCols = taosArrayGetSize(colsSchema); - char* sql = malloc(tsMaxSQLStringLen+1); - if (sql == NULL) { - tscError("malloc sql memory error"); - return TSDB_CODE_TSC_OUT_OF_MEMORY; - } - - int32_t freeBytes = tsMaxSQLStringLen + 1 ; - sprintf(sql, "insert into ? ("); - - for (int i = 0; i < numCols; ++i) { - SSchema* colSchema = taosArrayGet(colsSchema, i); - snprintf(sql+strlen(sql), freeBytes-strlen(sql), "%s,", colSchema->name); - } - snprintf(sql + strlen(sql)-1, freeBytes-strlen(sql)+1, ") values ("); - - for (int i = 0; i < numCols; ++i) { - snprintf(sql+strlen(sql), freeBytes-strlen(sql), "?,"); - } - snprintf(sql + strlen(sql)-1, freeBytes-strlen(sql)+1, ")"); - sql[strlen(sql)] = '\0'; - - size_t rows = taosArrayGetSize(rowsBind); - size_t maxBatchSize = TSDB_MAX_WAL_SIZE/rowSize * 4 / 5; - size_t batchSize = MIN(maxBatchSize, rows); - tscDebug("SML:0x%"PRIx64" insert rows into child table %s. num of rows: %zu, batch size: %zu", - info->id, cTableName, rows, batchSize); - SArray* batchBind = taosArrayInit(batchSize, POINTER_BYTES); - int32_t code = TSDB_CODE_SUCCESS; - for (int i = 0; i < rows;) { - int j = i; - for (; j < i + batchSize && j i) { - tscDebug("SML:0x%"PRIx64" insert child table batch from line %d to line %d.", info->id, i, j - 1); - code = doInsertChildTableWithStmt(taos, sql, cTableName, batchBind, info); - if (code != 0) { - taosArrayDestroy(batchBind); - tfree(sql); - return code; - } - taosArrayClear(batchBind); - } - i = j; - } - taosArrayDestroy(batchBind); - tfree(sql); - return code; -} - -static int32_t arrangePointsByChildTableName(TAOS_SML_DATA_POINT* points, int numPoints, - SHashObj* cname2points, SArray* stableSchemas, SSmlLinesInfo* info) { - for (int32_t i = 0; i < numPoints; ++i) { - TAOS_SML_DATA_POINT * point = points + i; - SSmlSTableSchema* stableSchema = taosArrayGet(stableSchemas, point->schemaIdx); - - for (int j = 0; j < point->tagNum; ++j) { - TAOS_SML_KV* kv = point->tags + j; - if (kv->type == TSDB_DATA_TYPE_TIMESTAMP) { - int64_t ts = *(int64_t*)(kv->value); - ts = convertTimePrecision(ts, TSDB_TIME_PRECISION_NANO, stableSchema->precision); - *(int64_t*)(kv->value) = ts; - } - } - - for (int j = 0; j < point->fieldNum; ++j) { - TAOS_SML_KV* kv = point->fields + j; - if (kv->type == TSDB_DATA_TYPE_TIMESTAMP) { - int64_t ts = *(int64_t*)(kv->value); - ts = convertTimePrecision(ts, TSDB_TIME_PRECISION_NANO, stableSchema->precision); - *(int64_t*)(kv->value) = ts; - } - } - - SArray* cTablePoints = NULL; - SArray** pCTablePoints = taosHashGet(cname2points, point->childTableName, strlen(point->childTableName)); - if (pCTablePoints) { - cTablePoints = *pCTablePoints; - } else { - cTablePoints = taosArrayInit(64, sizeof(point)); - taosHashPut(cname2points, point->childTableName, strlen(point->childTableName), &cTablePoints, POINTER_BYTES); - } - taosArrayPush(cTablePoints, &point); - } return 0; } -static int32_t applyChildTableTags(TAOS* taos, char* cTableName, char* sTableName, - SSmlSTableSchema* sTableSchema, SArray* cTablePoints, SSmlLinesInfo* info) { - size_t numTags = taosArrayGetSize(sTableSchema->tags); - size_t rows = taosArrayGetSize(cTablePoints); - - TAOS_SML_KV* tagKVs[TSDB_MAX_TAGS] = {0}; - for (int i= 0; i < rows; ++i) { - TAOS_SML_DATA_POINT * pDataPoint = taosArrayGetP(cTablePoints, i); - for (int j = 0; j < pDataPoint->tagNum; ++j) { - TAOS_SML_KV* kv = pDataPoint->tags + j; - tagKVs[kv->fieldSchemaIdx] = kv; - } - } - - SArray* tagBinds = taosArrayInit(numTags, sizeof(TAOS_BIND)); - taosArraySetSize(tagBinds, numTags); - int isNullColBind = TSDB_TRUE; - for (int j = 0; j < numTags; ++j) { - TAOS_BIND* bind = taosArrayGet(tagBinds, j); - bind->is_null = &isNullColBind; - } - for (int j = 0; j < numTags; ++j) { - if (tagKVs[j] == NULL) continue; - TAOS_SML_KV* kv = tagKVs[j]; - TAOS_BIND* bind = taosArrayGet(tagBinds, kv->fieldSchemaIdx); - bind->buffer_type = kv->type; - bind->length = malloc(sizeof(uintptr_t*)); - *bind->length = kv->length; - bind->buffer = kv->value; - bind->is_null = NULL; - } - - int32_t code = creatChildTableIfNotExists(taos, cTableName, sTableName, sTableSchema->tags, tagBinds, info); - - for (int i = 0; i < taosArrayGetSize(tagBinds); ++i) { - TAOS_BIND* bind = taosArrayGet(tagBinds, i); - free(bind->length); - } - taosArrayDestroy(tagBinds); - return code; -} - -static int32_t applyChildTableFields(TAOS* taos, SSmlSTableSchema* sTableSchema, char* cTableName, - SArray* cTablePoints, size_t rowSize, SSmlLinesInfo* info) { +static int32_t applyChildTableDataPoints(TAOS* taos, char* cTableName, char* sTableName, SSmlSTableSchema* sTableSchema, + SArray* cTablePoints, size_t rowSize, SSmlLinesInfo* info) { int32_t code = TSDB_CODE_SUCCESS; - - size_t numCols = taosArrayGetSize(sTableSchema->fields); - size_t rows = taosArrayGetSize(cTablePoints); - SArray* rowsBind = taosArrayInit(rows, POINTER_BYTES); - - int isNullColBind = TSDB_TRUE; - for (int i = 0; i < rows; ++i) { - TAOS_SML_DATA_POINT* point = taosArrayGetP(cTablePoints, i); - - TAOS_BIND* colBinds = calloc(numCols, sizeof(TAOS_BIND)); - if (colBinds == NULL) { - tscError("SML:0x%"PRIx64" taos_sml_insert insert points, failed to allocated memory for TAOS_BIND, " - "num of rows: %zu, num of cols: %zu", info->id, rows, numCols); - return TSDB_CODE_TSC_OUT_OF_MEMORY; - } - - for (int j = 0; j < numCols; ++j) { - TAOS_BIND* bind = colBinds + j; - bind->is_null = &isNullColBind; - } - for (int j = 0; j < point->fieldNum; ++j) { - TAOS_SML_KV* kv = point->fields + j; - TAOS_BIND* bind = colBinds + kv->fieldSchemaIdx; - bind->buffer_type = kv->type; - bind->length = malloc(sizeof(uintptr_t*)); - *bind->length = kv->length; - bind->buffer = kv->value; - bind->is_null = NULL; - } - taosArrayPush(rowsBind, &colBinds); - } - - code = insertChildTableBatch(taos, cTableName, sTableSchema->fields, rowsBind, rowSize, info); - if (code != 0) { - tscError("SML:0x%"PRIx64" insert into child table %s failed. error %s", info->id, cTableName, tstrerror(code)); - } - - for (int i = 0; i < rows; ++i) { - TAOS_BIND* colBinds = taosArrayGetP(rowsBind, i); - for (int j = 0; j < numCols; ++j) { - TAOS_BIND* bind = colBinds + j; - free(bind->length); - } - free(colBinds); + size_t childTableDataPoints = taosArrayGetSize(cTablePoints); + if (childTableDataPoints < 10) { + code = applyChildTableDataPointsWithInsertSQL(taos, cTableName, sTableName, sTableSchema, cTablePoints, rowSize, info); + } else { + code = applyChildTableDataPointsWithStmt(taos, cTableName, sTableName, sTableSchema, cTablePoints, rowSize, info); } - taosArrayDestroy(rowsBind); return code; } @@ -1075,13 +1171,6 @@ static int32_t applyDataPoints(TAOS* taos, TAOS_SML_DATA_POINT* points, int32_t TAOS_SML_DATA_POINT* point = taosArrayGetP(cTablePoints, 0); SSmlSTableSchema* sTableSchema = taosArrayGet(stableSchemas, point->schemaIdx); - tscDebug("SML:0x%"PRIx64" apply child table tags. child table: %s", info->id, point->childTableName); - code = applyChildTableTags(taos, point->childTableName, point->stableName, sTableSchema, cTablePoints, info); - if (code != 0) { - tscError("apply child table tags failed. child table %s, error %s", point->childTableName, tstrerror(code)); - goto cleanup; - } - size_t rowSize = 0; size_t numCols = taosArrayGetSize(sTableSchema->fields); for (int i = 0; i < numCols; ++i) { @@ -1089,10 +1178,11 @@ static int32_t applyDataPoints(TAOS* taos, TAOS_SML_DATA_POINT* points, int32_t rowSize += colSchema->bytes; } - tscDebug("SML:0x%"PRIx64" apply child table points. child table: %s, row size: %zu", info->id, point->childTableName, rowSize); - code = applyChildTableFields(taos, sTableSchema, point->childTableName, cTablePoints, rowSize, info); + tscDebug("SML:0x%"PRIx64" apply child table points. child table: %s of super table %s, row size: %zu", + info->id, point->childTableName, point->stableName, rowSize); + code = applyChildTableDataPoints(taos, point->childTableName, point->stableName, sTableSchema, cTablePoints, rowSize, info); if (code != 0) { - tscError("SML:0x%"PRIx64" Apply child table fields failed. child table %s, error %s", info->id, point->childTableName, tstrerror(code)); + tscError("SML:0x%"PRIx64" Apply child table points failed. child table %s, error %s", info->id, point->childTableName, tstrerror(code)); goto cleanup; } @@ -1112,6 +1202,60 @@ cleanup: return code; } +static int doSmlInsertOneDataPoint(TAOS* taos, TAOS_SML_DATA_POINT* point, SSmlLinesInfo* info) { + int32_t code = TSDB_CODE_SUCCESS; + + if (!point->childTableName) { + int tableNameLen = TSDB_TABLE_NAME_LEN; + point->childTableName = calloc(1, tableNameLen + 1); + getSmlMd5ChildTableName(point, point->childTableName, &tableNameLen, info); + point->childTableName[tableNameLen] = '\0'; + } + + STableMeta* tableMeta = NULL; + int32_t ret = getSuperTableMetaFromLocalCache(taos, point->stableName, &tableMeta, info); + if (ret != TSDB_CODE_SUCCESS) { + return ret; + } + uint8_t precision = tableMeta->tableInfo.precision; + free(tableMeta); + + char* sql = malloc(TSDB_MAX_SQL_LEN + 1); + int freeBytes = TSDB_MAX_SQL_LEN; + int sqlLen = 0; + sqlLen += snprintf(sql + sqlLen, freeBytes - sqlLen, "insert into %s(", point->childTableName); + for (int col = 0; col < point->fieldNum; ++col) { + TAOS_SML_KV* kv = point->fields + col; + sqlLen += snprintf(sql + sqlLen, freeBytes - sqlLen, "%s,", kv->key); + } + --sqlLen; + sqlLen += snprintf(sql + sqlLen, freeBytes - sqlLen, ") values ("); + TAOS_SML_KV* tsField = point->fields + 0; + int64_t ts = *(int64_t*)(tsField->value); + ts = convertTimePrecision(ts, TSDB_TIME_PRECISION_NANO, precision); + sqlLen += snprintf(sql + sqlLen, freeBytes - sqlLen, "%" PRId64 ",", ts); + for (int col = 1; col < point->fieldNum; ++col) { + TAOS_SML_KV* kv = point->fields + col; + int32_t len = 0; + converToStr(sql + sqlLen, kv->type, kv->value, kv->length, &len); + sqlLen += len; + sqlLen += snprintf(sql + sqlLen, freeBytes - sqlLen, ","); + } + --sqlLen; + sqlLen += snprintf(sql + sqlLen, freeBytes - sqlLen, ")"); + sql[sqlLen] = 0; + + tscDebug("SML:0x%" PRIx64 " insert child table table %s of super table %s sql: %s", info->id, + point->childTableName, point->stableName, sql); + TAOS_RES* res = taos_query(taos, sql); + free(sql); + code = taos_errno(res); + info->affectedRows = taos_affected_rows(res); + taos_free_result(res); + + return code; +} + int tscSmlInsert(TAOS* taos, TAOS_SML_DATA_POINT* points, int numPoint, SSmlLinesInfo* info) { tscDebug("SML:0x%"PRIx64" taos_sml_insert. number of points: %d", info->id, numPoint); @@ -1119,6 +1263,14 @@ int tscSmlInsert(TAOS* taos, TAOS_SML_DATA_POINT* points, int numPoint, SSmlLine info->affectedRows = 0; + if (numPoint == 1) { + TAOS_SML_DATA_POINT* point = points + 0; + code = doSmlInsertOneDataPoint(taos, point, info); + if (code == TSDB_CODE_SUCCESS) { + return code; + } + } + tscDebug("SML:0x%"PRIx64" build data point schemas", info->id); SArray* stableSchemas = taosArrayInit(32, sizeof(SSmlSTableSchema)); // SArray code = buildDataPointSchemas(points, numPoint, stableSchemas, info); diff --git a/src/client/src/tscParseOpenTSDB.c b/src/client/src/tscParseOpenTSDB.c index d064ede134129d796928768910c56573712319d1..82c554ee0a0edb3d9fec32cc4b09ba96e32e9285 100644 --- a/src/client/src/tscParseOpenTSDB.c +++ b/src/client/src/tscParseOpenTSDB.c @@ -125,8 +125,9 @@ static int32_t parseTelnetTimeStamp(TAOS_SML_KV **pTS, int *num_kvs, const char } tfree(value); - (*pTS)->key = tcalloc(sizeof(key), 1); + (*pTS)->key = tcalloc(sizeof(key) + TS_ESCAPE_CHAR_SIZE, 1); memcpy((*pTS)->key, key, sizeof(key)); + addEscapeCharToString((*pTS)->key, (int32_t)strlen(key)); *num_kvs += 1; *index = cur + 1; diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 835b32eaaa198445945aff5ddd72cedc444f8318..e6c4e12e34cee28b0402fe3104d298a4080e6e1c 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -81,23 +81,42 @@ int32_t converToStr(char *str, int type, void *buf, int32_t bufSize, int32_t *le break; case TSDB_DATA_TYPE_FLOAT: - n = sprintf(str, "%e", GET_FLOAT_VAL(buf)); + n = sprintf(str, "%.*e", DECIMAL_DIG, GET_FLOAT_VAL(buf)); break; case TSDB_DATA_TYPE_DOUBLE: - n = sprintf(str, "%e", GET_DOUBLE_VAL(buf)); + n = sprintf(str, "%.*e", DECIMAL_DIG, GET_DOUBLE_VAL(buf)); break; case TSDB_DATA_TYPE_BINARY: + if (bufSize < 0) { + tscError("invalid buf size"); + return TSDB_CODE_TSC_INVALID_VALUE; + } + int32_t escapeSize = 0; + *str++ = '\''; + ++escapeSize; + char* data = buf; + for (int32_t i = 0; i < bufSize; ++i) { + if (data[i] == '\'' || data[i] == '"') { + *str++ = '\\'; + ++escapeSize; + } + *str++ = data[i]; + } + *str = '\''; + ++escapeSize; + n = bufSize + escapeSize; + break; case TSDB_DATA_TYPE_NCHAR: if (bufSize < 0) { tscError("invalid buf size"); return TSDB_CODE_TSC_INVALID_VALUE; } - *str = '"'; + *str = '\''; memcpy(str + 1, buf, bufSize); - *(str + bufSize + 1) = '"'; + *(str + bufSize + 1) = '\''; n = bufSize + 2; break; diff --git a/tests/examples/c/schemaless.c b/tests/examples/c/schemaless.c index 9a2d2f063573d26093bd5032e2f68cc54fc5f908..0b46d2ff134689992d648065511dd5c21766759e 100644 --- a/tests/examples/c/schemaless.c +++ b/tests/examples/c/schemaless.c @@ -8,7 +8,8 @@ #include #include -#define MAX_THREAD_LINE_BATCHES 1024 +bool verbose = false; + void printThreadId(pthread_t id, char* buf) { @@ -30,11 +31,10 @@ typedef struct { typedef struct { TAOS* taos; + int protocol; int numBatches; - SThreadLinesBatch batches[MAX_THREAD_LINE_BATCHES]; + SThreadLinesBatch *batches; int64_t costTime; - int tsPrecision; - int lineProtocol; } SThreadInsertArgs; static void* insertLines(void* args) { @@ -43,27 +43,33 @@ static void* insertLines(void* args) { printThreadId(pthread_self(), tidBuf); for (int i = 0; i < insertArgs->numBatches; ++i) { SThreadLinesBatch* batch = insertArgs->batches + i; - printf("%s, thread: 0x%s\n", "begin taos_insert_lines", tidBuf); + if (verbose) printf("%s, thread: 0x%s\n", "begin taos_insert_lines", tidBuf); int64_t begin = getTimeInUs(); - TAOS_RES *res = taos_schemaless_insert(insertArgs->taos, batch->lines, batch->numLines, insertArgs->lineProtocol, insertArgs->tsPrecision); + //int32_t code = taos_insert_lines(insertArgs->taos, batch->lines, batch->numLines); + TAOS_RES * res = taos_schemaless_insert(insertArgs->taos, batch->lines, batch->numLines, insertArgs->protocol, TSDB_SML_TIMESTAMP_MILLI_SECONDS); int32_t code = taos_errno(res); int64_t end = getTimeInUs(); insertArgs->costTime += end - begin; - printf("code: %d, %s. affected lines:%d time used:%"PRId64", thread: 0x%s\n", code, taos_errstr(res), taos_affected_rows(res), end - begin, tidBuf); - taos_free_result(res); + if (verbose) printf("code: %d, %s. time used:%"PRId64", thread: 0x%s\n", code, tstrerror(code), end - begin, tidBuf); } return NULL; } +int32_t getTelenetTemplate(char* lineTemplate, int templateLen) { + char* sample = "sta%d %lld 44.3 t0=False t1=127i8 t2=32 t3=%di32 t4=9223372036854775807i64 t5=11.12345f32 t6=22.123456789f64 t7=\"hpxzrdiw\" t8=\"ncharTagValue\" t9=127i8"; + snprintf(lineTemplate, templateLen, "%s", sample); + return 0; +} + int32_t getLineTemplate(char* lineTemplate, int templateLen, int numFields) { if (numFields <= 4) { - char* sample = "sta%d,t3=%di32 c3=2147483647i32,c4=9223372036854775807i64,c9=11.12345f32,c10=22.123456789f64 %lldms"; + char* sample = "sta%d,t3=%di32 c3=2147483647i32,c4=9223372036854775807i64,c9=11.12345f32,c10=22.123456789f64 %lld"; snprintf(lineTemplate, templateLen, "%s", sample); return 0; } if (numFields <= 13) { - char* sample = "sta%d,t0=true,t1=127i8,t2=32767i16,t3=%di32,t4=9223372036854775807i64,t9=11.12345f32,t10=22.123456789f64,t11=\"binaryTagValue\",t12=L\"ncharTagValue\" c0=true,c1=127i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c5=254u8,c6=32770u16,c7=2147483699u32,c8=9223372036854775899u64,c9=11.12345f32,c10=22.123456789f64,c11=\"binaryValue\",c12=L\"ncharValue\" %lldms"; + char* sample = "sta%d,t0=true,t1=127i8,t2=32767i16,t3=%di32,t4=9223372036854775807i64,t9=11.12345f32,t10=22.123456789f64,t11=\"binaryTagValue\",t12=L\"ncharTagValue\" c0=true,c1=127i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c5=254u8,c6=32770u16,c7=2147483699u32,c8=9223372036854775899u64,c9=11.12345f32,c10=22.123456789f64,c11=\"binaryValue\",c12=L\"ncharValue\" %lld"; snprintf(lineTemplate, templateLen, "%s", sample); return 0; } @@ -84,14 +90,24 @@ int32_t getLineTemplate(char* lineTemplate, int templateLen, int numFields) { for (int i = offset[1]+1; i < offset[2]; ++i) { snprintf(lineTemplate+strlen(lineTemplate), templateLen-strlen(lineTemplate), "c%d=\"%d\",", i, i); } - char* lineFormatTs = " %lldms"; + char* lineFormatTs = " %lld"; snprintf(lineTemplate+strlen(lineTemplate)-1, templateLen-strlen(lineTemplate)+1, "%s", lineFormatTs); return 0; } +int32_t generateLine(char* line, int lineLen, char* lineTemplate, int protocol, int superTable, int childTable, int64_t ts) { + if (protocol == TSDB_SML_LINE_PROTOCOL) { + snprintf(line, lineLen, lineTemplate, superTable, childTable, ts); + } else if (protocol == TSDB_SML_TELNET_PROTOCOL) { + snprintf(line, lineLen, lineTemplate, superTable, ts, childTable); + } + return TSDB_CODE_SUCCESS; +} + int main(int argc, char* argv[]) { int numThreads = 8; + int maxBatchesPerThread = 1024; int numSuperTables = 1; int numChildTables = 256; @@ -99,11 +115,11 @@ int main(int argc, char* argv[]) { int numFields = 13; int maxLinesPerBatch = 16384; - int tsPrecision = TSDB_SML_TIMESTAMP_NOT_CONFIGURED; - int lineProtocol = TSDB_SML_UNKNOWN_PROTOCOL; + + int protocol = TSDB_SML_TELNET_PROTOCOL; int opt; - while ((opt = getopt(argc, argv, "s:c:r:f:t:m:p:P:h")) != -1) { + while ((opt = getopt(argc, argv, "s:c:r:f:t:b:p:hv")) != -1) { switch (opt) { case 's': numSuperTables = atoi(optarg); @@ -120,28 +136,35 @@ int main(int argc, char* argv[]) { case 't': numThreads = atoi(optarg); break; - case 'm': + case 'b': maxLinesPerBatch = atoi(optarg); break; - case 'p': - tsPrecision = atoi(optarg); + case 'v': + verbose = true; break; - case 'P': - lineProtocol = atoi(optarg); + case 'p': + if (optarg[0] == 't') { + protocol = TSDB_SML_TELNET_PROTOCOL; + } else if (optarg[0] == 'l') { + protocol = TSDB_SML_LINE_PROTOCOL; + } else if (optarg[0] == 'j') { + protocol = TSDB_SML_JSON_PROTOCOL; + } break; case 'h': - fprintf(stderr, "Usage: %s -s supertable -c childtable -r rows -f fields -t threads -m maxlines_per_batch\n", + fprintf(stderr, "Usage: %s -s supertable -c childtable -r rows -f fields -t threads -b maxlines_per_batch -p [t|l|j] -v\n", argv[0]); exit(0); default: /* '?' */ - fprintf(stderr, "Usage: %s -s supertable -c childtable -r rows -f fields -t threads -m maxlines_per_batch\n", + fprintf(stderr, "Usage: %s -s supertable -c childtable -r rows -f fields -t threads -b maxlines_per_batch -p [t|l|j] -v\n", argv[0]); exit(-1); } } TAOS_RES* result; - const char* host = "127.0.0.1"; + //const char* host = "127.0.0.1"; + const char* host = NULL; const char* user = "root"; const char* passwd = "taosdata"; @@ -152,10 +175,7 @@ int main(int argc, char* argv[]) { exit(1); } - if (numThreads * MAX_THREAD_LINE_BATCHES* maxLinesPerBatch < numSuperTables*numChildTables*numRowsPerChildTable) { - printf("too many rows to be handle by threads with %d batches", MAX_THREAD_LINE_BATCHES); - exit(2); - } + maxBatchesPerThread = (numSuperTables*numChildTables*numRowsPerChildTable)/(numThreads * maxLinesPerBatch) + 1; char* info = taos_get_server_info(taos); printf("server info: %s\n", info); @@ -171,28 +191,33 @@ int main(int argc, char* argv[]) { (void)taos_select_db(taos, "db"); time_t ct = time(0); - int64_t ts = ct * 1000; + int64_t ts = ct * 1000 ; char* lineTemplate = calloc(65536, sizeof(char)); - getLineTemplate(lineTemplate, 65535, numFields); + if (protocol == TSDB_SML_LINE_PROTOCOL) { + getLineTemplate(lineTemplate, 65535, numFields); + } else if (protocol == TSDB_SML_TELNET_PROTOCOL ) { + getTelenetTemplate(lineTemplate, 65535); + } printf("setup supertables..."); { char** linesStb = calloc(numSuperTables, sizeof(char*)); for (int i = 0; i < numSuperTables; i++) { char* lineStb = calloc(strlen(lineTemplate)+128, 1); - snprintf(lineStb, strlen(lineTemplate)+128, lineTemplate, i, + generateLine(lineStb, strlen(lineTemplate)+128, lineTemplate, protocol, i, numSuperTables * numChildTables, ts + numSuperTables * numChildTables * numRowsPerChildTable); linesStb[i] = lineStb; } SThreadInsertArgs args = {0}; + args.protocol = protocol; + args.batches = calloc(maxBatchesPerThread, sizeof(maxBatchesPerThread)); args.taos = taos; args.batches[0].lines = linesStb; args.batches[0].numLines = numSuperTables; - args.tsPrecision = tsPrecision; - args.lineProtocol = lineProtocol; insertLines(&args); + free(args.batches); for (int i = 0; i < numSuperTables; ++i) { free(linesStb[i]); } @@ -203,8 +228,10 @@ int main(int argc, char* argv[]) { pthread_t* tids = calloc(numThreads, sizeof(pthread_t)); SThreadInsertArgs* argsThread = calloc(numThreads, sizeof(SThreadInsertArgs)); for (int i = 0; i < numThreads; ++i) { + argsThread[i].batches = calloc(maxBatchesPerThread, sizeof(SThreadLinesBatch)); argsThread[i].taos = taos; argsThread[i].numBatches = 0; + argsThread[i].protocol = protocol; } int64_t totalLines = numSuperTables * numChildTables * numRowsPerChildTable; @@ -229,7 +256,7 @@ int main(int argc, char* argv[]) { int stIdx = i; int ctIdx = numSuperTables*numChildTables + j; char* line = calloc(strlen(lineTemplate)+128, 1); - snprintf(line, strlen(lineTemplate)+128, lineTemplate, stIdx, ctIdx, ts + l); + generateLine(line, strlen(lineTemplate)+128, lineTemplate, protocol, stIdx, ctIdx, ts + l); int batchNo = l / maxLinesPerBatch; int lineNo = l % maxLinesPerBatch; allBatches[batchNo][lineNo] = line; @@ -262,6 +289,9 @@ int main(int argc, char* argv[]) { } free(allBatches); + for (int i = 0; i < numThreads; i++) { + free(argsThread[i].batches); + } free(argsThread); free(tids);