未验证 提交 e7b7b86a 编写于 作者: D dapan1121 提交者: GitHub

Merge pull request #8928 from taosdata/szhou/hotfix/sml-tiny-batch

TD-11623: improve schemaless insert performance
......@@ -32,6 +32,10 @@ typedef struct {
static uint64_t linesSmlHandleId = 0;
static int32_t insertChildTablePointsBatch(void* pVoid, char* name, char* name1, SArray* pArray, SArray* pArray1,
SArray* pArray2, SArray* pArray3, size_t size, SSmlLinesInfo* info);
static int32_t doInsertChildTablePoints(void* pVoid, char* sql, char* name, SArray* pArray, SArray* pArray1,
SSmlLinesInfo* info);
uint64_t genLinesSmlId() {
uint64_t id;
......@@ -177,11 +181,10 @@ static int32_t getSmlMd5ChildTableName(TAOS_SML_DATA_POINT* point, char* tableNa
MD5Init(&context);
MD5Update(&context, (uint8_t *)keyJoined, (uint32_t)len);
MD5Final(&context);
uint64_t digest1 = *(uint64_t*)(context.digest);
uint64_t digest2 = *(uint64_t*)(context.digest + 8);
*tableNameLen = snprintf(tableName, *tableNameLen,
"t_%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x", context.digest[0],
context.digest[1], context.digest[2], context.digest[3], context.digest[4], context.digest[5], context.digest[6],
context.digest[7], context.digest[8], context.digest[9], context.digest[10], context.digest[11],
context.digest[12], context.digest[13], context.digest[14], context.digest[15]);
"t_%016"PRIx64"%016"PRIx64, digest1, digest2);
taosStringBuilderDestroy(&sb);
tscDebug("SML:0x%"PRIx64" child table name: %s", info->id, tableName);
return 0;
......@@ -198,7 +201,6 @@ static int32_t buildSmlChildTableName(TAOS_SML_DATA_POINT* point, SSmlLinesInfo*
return 0;
}
static int32_t buildDataPointSchemas(TAOS_SML_DATA_POINT* points, int numPoint, SArray* stableSchemas, SSmlLinesInfo* info) {
int32_t code = 0;
SHashObj* sname2shema = taosHashInit(32,
......@@ -219,8 +221,8 @@ static int32_t buildDataPointSchemas(TAOS_SML_DATA_POINT* points, int numPoint,
schema.sTableName[stableNameLen] = '\0';
schema.fields = taosArrayInit(64, sizeof(SSchema));
schema.tags = taosArrayInit(8, sizeof(SSchema));
schema.tagHash = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false);
schema.fieldHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false);
schema.tagHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false);
schema.fieldHash = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false);
pStableSchema = taosArrayPush(stableSchemas, &schema);
stableIdx = taosArrayGetSize(stableSchemas) - 1;
......@@ -555,11 +557,75 @@ static int32_t fillDbSchema(STableMeta* tableMeta, char* tableName, SSmlSTableSc
return TSDB_CODE_SUCCESS;
}
static int32_t getSuperTableMetaFromLocalCache(TAOS* taos, char* tableName, STableMeta** outTableMeta, SSmlLinesInfo* info) {
int32_t code = 0;
STableMeta* tableMeta = NULL;
SSqlObj* pSql = calloc(1, sizeof(SSqlObj));
if (pSql == NULL) {
tscError("SML:0x%" PRIx64 " failed to allocate memory, reason:%s", info->id, strerror(errno));
code = TSDB_CODE_TSC_OUT_OF_MEMORY;
return code;
}
pSql->pTscObj = taos;
pSql->signature = pSql;
pSql->fp = NULL;
registerSqlObj(pSql);
char tableNameBuf[TSDB_TABLE_NAME_LEN + TS_ESCAPE_CHAR_SIZE] = {0};
memcpy(tableNameBuf, tableName, strlen(tableName));
SStrToken tableToken = {.z = tableNameBuf, .n = (uint32_t)strlen(tableName), .type = TK_ID};
tGetToken(tableNameBuf, &tableToken.type);
bool dbIncluded = false;
// Check if the table name available or not
if (tscValidateName(&tableToken, true, &dbIncluded) != TSDB_CODE_SUCCESS) {
code = TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH;
sprintf(pSql->cmd.payload, "table name is invalid");
taosReleaseRef(tscObjRef, pSql->self);
return code;
}
SName sname = {0};
if ((code = tscSetTableFullName(&sname, &tableToken, pSql, dbIncluded)) != TSDB_CODE_SUCCESS) {
taosReleaseRef(tscObjRef, pSql->self);
return code;
}
char fullTableName[TSDB_TABLE_FNAME_LEN] = {0};
memset(fullTableName, 0, tListLen(fullTableName));
tNameExtractFullName(&sname, fullTableName);
size_t size = 0;
taosHashGetCloneExt(UTIL_GET_TABLEMETA(pSql), fullTableName, strlen(fullTableName), NULL, (void**)&tableMeta, &size);
STableMeta* stableMeta = tableMeta;
if (tableMeta != NULL && tableMeta->tableType == TSDB_CHILD_TABLE) {
taosHashGetCloneExt(UTIL_GET_TABLEMETA(pSql), tableMeta->sTableName, strlen(tableMeta->sTableName), NULL,
(void**)stableMeta, &size);
}
taosReleaseRef(tscObjRef, pSql->self);
if (stableMeta != tableMeta) {
free(tableMeta);
}
if (stableMeta != NULL) {
if (outTableMeta != NULL) {
*outTableMeta = stableMeta;
} else {
free(stableMeta);
}
return TSDB_CODE_SUCCESS;
} else {
return TSDB_CODE_TSC_NO_META_CACHED;
}
}
static int32_t retrieveTableMeta(TAOS* taos, char* tableName, STableMeta** pTableMeta, SSmlLinesInfo* info) {
int32_t code = 0;
int32_t retries = 0;
STableMeta* tableMeta = NULL;
while (retries++ < TSDB_MAX_REPLICA && tableMeta == NULL) {
while (retries++ <= TSDB_MAX_REPLICA && tableMeta == NULL) {
STscObj* pObj = (STscObj*)taos;
if (pObj == NULL || pObj->signature != pObj) {
terrno = TSDB_CODE_TSC_DISCONNECTED;
......@@ -567,55 +633,24 @@ static int32_t retrieveTableMeta(TAOS* taos, char* tableName, STableMeta** pTabl
}
tscDebug("SML:0x%" PRIx64 " retrieve table meta. super table name: %s", info->id, tableName);
char sql[256];
snprintf(sql, 256, "describe %s", tableName);
TAOS_RES* res = taos_query(taos, sql);
code = taos_errno(res);
if (code != 0) {
tscError("SML:0x%" PRIx64 " describe table failure. %s", info->id, taos_errstr(res));
code = getSuperTableMetaFromLocalCache(taos, tableName, &tableMeta, info);
if (code == TSDB_CODE_SUCCESS) {
tscDebug("SML:0x%" PRIx64 " successfully retrieved table meta. super table name: %s", info->id, tableName);
break;
} else if (code == TSDB_CODE_TSC_NO_META_CACHED) {
char sql[256];
snprintf(sql, 256, "describe %s", tableName);
TAOS_RES* res = taos_query(taos, sql);
code = taos_errno(res);
if (code != 0) {
tscError("SML:0x%" PRIx64 " describe table failure. %s", info->id, taos_errstr(res));
taos_free_result(res);
return code;
}
taos_free_result(res);
} else {
return code;
}
taos_free_result(res);
SSqlObj* pSql = calloc(1, sizeof(SSqlObj));
if (pSql == NULL) {
tscError("SML:0x%" PRIx64 " failed to allocate memory, reason:%s", info->id, strerror(errno));
code = TSDB_CODE_TSC_OUT_OF_MEMORY;
return code;
}
pSql->pTscObj = taos;
pSql->signature = pSql;
pSql->fp = NULL;
registerSqlObj(pSql);
char tableNameBuf[TSDB_TABLE_NAME_LEN + TS_ESCAPE_CHAR_SIZE] = {0};
memcpy(tableNameBuf, tableName, strlen(tableName));
SStrToken tableToken = {.z = tableNameBuf, .n = (uint32_t)strlen(tableName), .type = TK_ID};
tGetToken(tableNameBuf, &tableToken.type);
bool dbIncluded = false;
// Check if the table name available or not
if (tscValidateName(&tableToken, true, &dbIncluded) != TSDB_CODE_SUCCESS) {
code = TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH;
sprintf(pSql->cmd.payload, "table name is invalid");
taosReleaseRef(tscObjRef, pSql->self);
return code;
}
SName sname = {0};
if ((code = tscSetTableFullName(&sname, &tableToken, pSql, dbIncluded)) != TSDB_CODE_SUCCESS) {
taosReleaseRef(tscObjRef, pSql->self);
return code;
}
char fullTableName[TSDB_TABLE_FNAME_LEN] = {0};
memset(fullTableName, 0, tListLen(fullTableName));
tNameExtractFullName(&sname, fullTableName);
size_t size = 0;
taosHashGetCloneExt(UTIL_GET_TABLEMETA(pSql), fullTableName, strlen(fullTableName), NULL, (void**)&tableMeta, &size);
taosReleaseRef(tscObjRef, pSql->self);
}
if (tableMeta != NULL) {
......@@ -718,72 +753,303 @@ static int32_t modifyDBSchemas(TAOS* taos, SArray* stableSchemas, SSmlLinesInfo*
return 0;
}
static int32_t creatChildTableIfNotExists(TAOS* taos, const char* cTableName, const char* sTableName,
SArray* tagsSchema, SArray* tagsBind, SSmlLinesInfo* info) {
size_t numTags = taosArrayGetSize(tagsSchema);
static int32_t arrangePointsByChildTableName(TAOS_SML_DATA_POINT* points, int numPoints,
SHashObj* cname2points, SArray* stableSchemas, SSmlLinesInfo* info) {
for (int32_t i = 0; i < numPoints; ++i) {
TAOS_SML_DATA_POINT * point = points + i;
SSmlSTableSchema* stableSchema = taosArrayGet(stableSchemas, point->schemaIdx);
for (int j = 0; j < point->tagNum; ++j) {
TAOS_SML_KV* kv = point->tags + j;
if (kv->type == TSDB_DATA_TYPE_TIMESTAMP) {
int64_t ts = *(int64_t*)(kv->value);
ts = convertTimePrecision(ts, TSDB_TIME_PRECISION_NANO, stableSchema->precision);
*(int64_t*)(kv->value) = ts;
}
}
for (int j = 0; j < point->fieldNum; ++j) {
TAOS_SML_KV* kv = point->fields + j;
if (kv->type == TSDB_DATA_TYPE_TIMESTAMP) {
int64_t ts = *(int64_t*)(kv->value);
ts = convertTimePrecision(ts, TSDB_TIME_PRECISION_NANO, stableSchema->precision);
*(int64_t*)(kv->value) = ts;
}
}
SArray* cTablePoints = NULL;
SArray** pCTablePoints = taosHashGet(cname2points, point->childTableName, strlen(point->childTableName));
if (pCTablePoints) {
cTablePoints = *pCTablePoints;
} else {
cTablePoints = taosArrayInit(64, sizeof(point));
taosHashPut(cname2points, point->childTableName, strlen(point->childTableName), &cTablePoints, POINTER_BYTES);
}
taosArrayPush(cTablePoints, &point);
}
return 0;
}
static int32_t applyChildTableDataPointsWithInsertSQL(TAOS* taos, char* cTableName, char* sTableName, SSmlSTableSchema* sTableSchema,
SArray* cTablePoints, size_t rowSize, SSmlLinesInfo* info) {
int32_t code = TSDB_CODE_SUCCESS;
size_t numTags = taosArrayGetSize(sTableSchema->tags);
size_t numCols = taosArrayGetSize(sTableSchema->fields);
size_t rows = taosArrayGetSize(cTablePoints);
SArray* tagsSchema = sTableSchema->tags;
SArray* colsSchema = sTableSchema->fields;
TAOS_SML_KV* tagKVs[TSDB_MAX_TAGS] = {0};
for (int i= 0; i < rows; ++i) {
TAOS_SML_DATA_POINT * pDataPoint = taosArrayGetP(cTablePoints, i);
for (int j = 0; j < pDataPoint->tagNum; ++j) {
TAOS_SML_KV* kv = pDataPoint->tags + j;
tagKVs[kv->fieldSchemaIdx] = kv;
}
}
char* sql = malloc(tsMaxSQLStringLen+1);
if (sql == NULL) {
tscError("malloc sql memory error");
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
int freeBytes = tsMaxSQLStringLen + 1;
sprintf(sql, "create table if not exists %s using %s", cTableName, sTableName);
snprintf(sql+strlen(sql), freeBytes-strlen(sql), "(");
int32_t freeBytes = tsMaxSQLStringLen + 1 ;
int32_t totalLen = 0;
totalLen += sprintf(sql, "insert into %s using %s (", cTableName, sTableName);
for (int i = 0; i < numTags; ++i) {
SSchema* tagSchema = taosArrayGet(tagsSchema, i);
snprintf(sql+strlen(sql), freeBytes-strlen(sql), "%s,", tagSchema->name);
totalLen += snprintf(sql+totalLen, freeBytes-totalLen, "%s,", tagSchema->name);
}
snprintf(sql + strlen(sql) - 1, freeBytes-strlen(sql)+1, ")");
--totalLen;
totalLen += snprintf(sql + totalLen, freeBytes-totalLen, ")");
snprintf(sql + strlen(sql), freeBytes-strlen(sql), " tags (");
totalLen += snprintf(sql + totalLen, freeBytes-totalLen, " tags (");
// for (int i = 0; i < numTags; ++i) {
// snprintf(sql+strlen(sql), freeBytes-strlen(sql), "?,");
// }
for (int i = 0; i < numTags; ++i) {
snprintf(sql+strlen(sql), freeBytes-strlen(sql), "?,");
if (tagKVs[i] == NULL) {
totalLen += snprintf(sql + totalLen, freeBytes-totalLen, "NULL,");
} else {
TAOS_SML_KV* kv = tagKVs[i];
size_t beforeLen = totalLen;
int32_t len = 0;
converToStr(sql+beforeLen, kv->type, kv->value, kv->length, &len);
totalLen += len;
totalLen += snprintf(sql+totalLen, freeBytes-totalLen, ",");
}
}
snprintf(sql + strlen(sql) - 1, freeBytes-strlen(sql)+1, ")");
sql[strlen(sql)] = '\0';
--totalLen;
totalLen += snprintf(sql + totalLen, freeBytes-totalLen, ") (");
tscDebug("SML:0x%"PRIx64" create table : %s", info->id, sql);
for (int i = 0; i < numCols; ++i) {
SSchema* colSchema = taosArrayGet(colsSchema, i);
totalLen += snprintf(sql+totalLen, freeBytes-totalLen, "%s,", colSchema->name);
}
--totalLen;
totalLen += snprintf(sql + totalLen, freeBytes-totalLen, ") values ");
TAOS_STMT* stmt = taos_stmt_init(taos);
if (stmt == NULL) {
free(sql);
return TSDB_CODE_TSC_OUT_OF_MEMORY;
TAOS_SML_KV** colKVs = malloc(numCols*sizeof(TAOS_SML_KV*));
for (int r = 0; r < rows; ++r) {
totalLen += snprintf(sql + totalLen, freeBytes-totalLen, "(");
memset(colKVs, 0, numCols*sizeof(TAOS_SML_KV*));
TAOS_SML_DATA_POINT* point = taosArrayGetP(cTablePoints, r);
for (int i = 0; i < point->fieldNum; ++i) {
TAOS_SML_KV* kv = point->fields + i;
colKVs[kv->fieldSchemaIdx] = kv;
}
for (int i = 0; i < numCols; ++i) {
if (colKVs[i] == NULL) {
totalLen += snprintf(sql + totalLen, freeBytes-totalLen, "NULL,");
} else {
TAOS_SML_KV* kv = colKVs[i];
size_t beforeLen = totalLen;
int32_t len = 0;
converToStr(sql+beforeLen, kv->type, kv->value, kv->length, &len);
totalLen += len;
totalLen += snprintf(sql+totalLen, freeBytes-totalLen, ",");
}
}
--totalLen;
totalLen += snprintf(sql+totalLen, freeBytes - totalLen, ")");
}
int32_t code;
code = taos_stmt_prepare(stmt, sql, (unsigned long)strlen(sql));
free(colKVs);
sql[totalLen] = '\0';
tscDebug("SML:0x%"PRIx64" insert child table table %s of super table %s sql: %s", info->id, cTableName, sTableName, sql);
TAOS_RES* res = taos_query(taos, sql);
free(sql);
code = taos_errno(res);
info->affectedRows = taos_affected_rows(res);
taos_free_result(res);
return code;
}
if (code != 0) {
tscError("SML:0x%"PRIx64" taos_stmt_prepare returns %d:%s", info->id, code, tstrerror(code));
taos_stmt_close(stmt);
return code;
static int32_t applyChildTableDataPointsWithStmt(TAOS* taos, char* cTableName, char* sTableName, SSmlSTableSchema* sTableSchema,
SArray* cTablePoints, size_t rowSize, SSmlLinesInfo* info) {
size_t numTags = taosArrayGetSize(sTableSchema->tags);
size_t numCols = taosArrayGetSize(sTableSchema->fields);
size_t rows = taosArrayGetSize(cTablePoints);
TAOS_SML_KV* tagKVs[TSDB_MAX_TAGS] = {0};
for (int i= 0; i < rows; ++i) {
TAOS_SML_DATA_POINT * pDataPoint = taosArrayGetP(cTablePoints, i);
for (int j = 0; j < pDataPoint->tagNum; ++j) {
TAOS_SML_KV* kv = pDataPoint->tags + j;
tagKVs[kv->fieldSchemaIdx] = kv;
}
}
code = taos_stmt_bind_param(stmt, TARRAY_GET_START(tagsBind));
if (code != 0) {
tscError("SML:0x%"PRIx64" taos_stmt_bind_param returns %d:%s", info->id, code, tstrerror(code));
taos_stmt_close(stmt);
return code;
//tag bind
SArray* tagBinds = taosArrayInit(numTags, sizeof(TAOS_BIND));
taosArraySetSize(tagBinds, numTags);
int isNullColBind = TSDB_TRUE;
for (int j = 0; j < numTags; ++j) {
TAOS_BIND* bind = taosArrayGet(tagBinds, j);
bind->is_null = &isNullColBind;
}
for (int j = 0; j < numTags; ++j) {
if (tagKVs[j] == NULL) continue;
TAOS_SML_KV* kv = tagKVs[j];
TAOS_BIND* bind = taosArrayGet(tagBinds, kv->fieldSchemaIdx);
bind->buffer_type = kv->type;
bind->length = malloc(sizeof(uintptr_t*));
*bind->length = kv->length;
bind->buffer = kv->value;
bind->is_null = NULL;
}
code = taos_stmt_execute(stmt);
if (code != 0) {
tscError("SML:0x%"PRIx64" taos_stmt_execute returns %d:%s", info->id, code, tstrerror(code));
taos_stmt_close(stmt);
return code;
//rows bind
SArray* rowsBind = taosArrayInit(rows, POINTER_BYTES);
for (int i = 0; i < rows; ++i) {
TAOS_SML_DATA_POINT* point = taosArrayGetP(cTablePoints, i);
TAOS_BIND* colBinds = calloc(numCols, sizeof(TAOS_BIND));
if (colBinds == NULL) {
tscError("SML:0x%"PRIx64" taos_sml_insert insert points, failed to allocated memory for TAOS_BIND, "
"num of rows: %zu, num of cols: %zu", info->id, rows, numCols);
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
for (int j = 0; j < numCols; ++j) {
TAOS_BIND* bind = colBinds + j;
bind->is_null = &isNullColBind;
}
for (int j = 0; j < point->fieldNum; ++j) {
TAOS_SML_KV* kv = point->fields + j;
TAOS_BIND* bind = colBinds + kv->fieldSchemaIdx;
bind->buffer_type = kv->type;
bind->length = malloc(sizeof(uintptr_t*));
*bind->length = kv->length;
bind->buffer = kv->value;
bind->is_null = NULL;
}
taosArrayPush(rowsBind, &colBinds);
}
code = taos_stmt_close(stmt);
int32_t code = 0;
code = insertChildTablePointsBatch(taos, cTableName, sTableName, sTableSchema->tags, tagBinds, sTableSchema->fields, rowsBind, rowSize, info);
if (code != 0) {
tscError("SML:0x%"PRIx64" taos_stmt_close return %d:%s", info->id, code, tstrerror(code));
return code;
tscError("SML:0x%"PRIx64" insert into child table %s failed. error %s", info->id, cTableName, tstrerror(code));
}
//free rows bind
for (int i = 0; i < rows; ++i) {
TAOS_BIND* colBinds = taosArrayGetP(rowsBind, i);
for (int j = 0; j < numCols; ++j) {
TAOS_BIND* bind = colBinds + j;
free(bind->length);
}
free(colBinds);
}
taosArrayDestroy(rowsBind);
//free tag bind
for (int i = 0; i < taosArrayGetSize(tagBinds); ++i) {
TAOS_BIND* bind = taosArrayGet(tagBinds, i);
free(bind->length);
}
taosArrayDestroy(tagBinds);
return code;
}
static int32_t doInsertChildTableWithStmt(TAOS* taos, char* sql, char* cTableName, SArray* batchBind, SSmlLinesInfo* info) {
static int32_t insertChildTablePointsBatch(TAOS* taos, char* cTableName, char* sTableName,
SArray* tagsSchema, SArray* tagsBind,
SArray* colsSchema, SArray* rowsBind,
size_t rowSize, SSmlLinesInfo* info) {
size_t numTags = taosArrayGetSize(tagsSchema);
size_t numCols = taosArrayGetSize(colsSchema);
char* sql = malloc(tsMaxSQLStringLen+1);
if (sql == NULL) {
tscError("malloc sql memory error");
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
int32_t freeBytes = tsMaxSQLStringLen + 1 ;
sprintf(sql, "insert into ? using %s (", sTableName);
for (int i = 0; i < numTags; ++i) {
SSchema* tagSchema = taosArrayGet(tagsSchema, i);
snprintf(sql+strlen(sql), freeBytes-strlen(sql), "%s,", tagSchema->name);
}
snprintf(sql + strlen(sql) - 1, freeBytes-strlen(sql)+1, ")");
snprintf(sql + strlen(sql), freeBytes-strlen(sql), " tags (");
for (int i = 0; i < numTags; ++i) {
snprintf(sql+strlen(sql), freeBytes-strlen(sql), "?,");
}
snprintf(sql + strlen(sql) - 1, freeBytes-strlen(sql)+1, ") (");
for (int i = 0; i < numCols; ++i) {
SSchema* colSchema = taosArrayGet(colsSchema, i);
snprintf(sql+strlen(sql), freeBytes-strlen(sql), "%s,", colSchema->name);
}
snprintf(sql + strlen(sql)-1, freeBytes-strlen(sql)+1, ") values (");
for (int i = 0; i < numCols; ++i) {
snprintf(sql+strlen(sql), freeBytes-strlen(sql), "?,");
}
snprintf(sql + strlen(sql)-1, freeBytes-strlen(sql)+1, ")");
sql[strlen(sql)] = '\0';
tscDebug("SML:0x%"PRIx64" insert child table table %s of super table %s : %s", info->id, cTableName, sTableName, sql);
size_t rows = taosArrayGetSize(rowsBind);
size_t maxBatchSize = TSDB_MAX_WAL_SIZE/rowSize * 4 / 5;
size_t batchSize = MIN(maxBatchSize, rows);
tscDebug("SML:0x%"PRIx64" insert rows into child table %s. num of rows: %zu, batch size: %zu",
info->id, cTableName, rows, batchSize);
SArray* batchBind = taosArrayInit(batchSize, POINTER_BYTES);
int32_t code = TSDB_CODE_SUCCESS;
for (int i = 0; i < rows;) {
int j = i;
for (; j < i + batchSize && j<rows; ++j) {
taosArrayPush(batchBind, taosArrayGet(rowsBind, j));
}
if (j > i) {
tscDebug("SML:0x%"PRIx64" insert child table batch from line %d to line %d.", info->id, i, j - 1);
code = doInsertChildTablePoints(taos, sql, cTableName, tagsBind, batchBind, info);
if (code != 0) {
taosArrayDestroy(batchBind);
tfree(sql);
return code;
}
taosArrayClear(batchBind);
}
i = j;
}
taosArrayDestroy(batchBind);
tfree(sql);
return code;
}
static int32_t doInsertChildTablePoints(TAOS* taos, char* sql, char* cTableName, SArray* tagsBind, SArray* batchBind,
SSmlLinesInfo* info) {
int32_t code = 0;
TAOS_STMT* stmt = taos_stmt_init(taos);
......@@ -802,7 +1068,7 @@ static int32_t doInsertChildTableWithStmt(TAOS* taos, char* sql, char* cTableNam
bool tryAgain = false;
int32_t try = 0;
do {
code = taos_stmt_set_tbname(stmt, cTableName);
code = taos_stmt_set_tbname_tags(stmt, cTableName, TARRAY_GET_START(tagsBind));
if (code != 0) {
tscError("SML:0x%"PRIx64" taos_stmt_set_tbname return %d:%s", info->id, code, taos_stmt_errstr(stmt));
......@@ -843,7 +1109,7 @@ static int32_t doInsertChildTableWithStmt(TAOS* taos, char* sql, char* cTableNam
tscError("SML:0x%"PRIx64" taos_stmt_execute return %d:%s, try:%d", info->id, code, taos_stmt_errstr(stmt), try);
}
tscDebug("SML:0x%"PRIx64" taos_stmt_execute inserted %d rows", info->id, taos_stmt_affected_rows(stmt));
tryAgain = false;
if ((code == TSDB_CODE_TDB_INVALID_TABLE_ID
|| code == TSDB_CODE_VND_INVALID_VGROUP_ID
......@@ -876,189 +1142,19 @@ static int32_t doInsertChildTableWithStmt(TAOS* taos, char* sql, char* cTableNam
taos_stmt_close(stmt);
return code;
}
static int32_t insertChildTableBatch(TAOS* taos, char* cTableName, SArray* colsSchema, SArray* rowsBind, size_t rowSize, SSmlLinesInfo* info) {
size_t numCols = taosArrayGetSize(colsSchema);
char* sql = malloc(tsMaxSQLStringLen+1);
if (sql == NULL) {
tscError("malloc sql memory error");
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
int32_t freeBytes = tsMaxSQLStringLen + 1 ;
sprintf(sql, "insert into ? (");
for (int i = 0; i < numCols; ++i) {
SSchema* colSchema = taosArrayGet(colsSchema, i);
snprintf(sql+strlen(sql), freeBytes-strlen(sql), "%s,", colSchema->name);
}
snprintf(sql + strlen(sql)-1, freeBytes-strlen(sql)+1, ") values (");
for (int i = 0; i < numCols; ++i) {
snprintf(sql+strlen(sql), freeBytes-strlen(sql), "?,");
}
snprintf(sql + strlen(sql)-1, freeBytes-strlen(sql)+1, ")");
sql[strlen(sql)] = '\0';
size_t rows = taosArrayGetSize(rowsBind);
size_t maxBatchSize = TSDB_MAX_WAL_SIZE/rowSize * 4 / 5;
size_t batchSize = MIN(maxBatchSize, rows);
tscDebug("SML:0x%"PRIx64" insert rows into child table %s. num of rows: %zu, batch size: %zu",
info->id, cTableName, rows, batchSize);
SArray* batchBind = taosArrayInit(batchSize, POINTER_BYTES);
int32_t code = TSDB_CODE_SUCCESS;
for (int i = 0; i < rows;) {
int j = i;
for (; j < i + batchSize && j<rows; ++j) {
taosArrayPush(batchBind, taosArrayGet(rowsBind, j));
}
if (j > i) {
tscDebug("SML:0x%"PRIx64" insert child table batch from line %d to line %d.", info->id, i, j - 1);
code = doInsertChildTableWithStmt(taos, sql, cTableName, batchBind, info);
if (code != 0) {
taosArrayDestroy(batchBind);
tfree(sql);
return code;
}
taosArrayClear(batchBind);
}
i = j;
}
taosArrayDestroy(batchBind);
tfree(sql);
return code;
}
static int32_t arrangePointsByChildTableName(TAOS_SML_DATA_POINT* points, int numPoints,
SHashObj* cname2points, SArray* stableSchemas, SSmlLinesInfo* info) {
for (int32_t i = 0; i < numPoints; ++i) {
TAOS_SML_DATA_POINT * point = points + i;
SSmlSTableSchema* stableSchema = taosArrayGet(stableSchemas, point->schemaIdx);
for (int j = 0; j < point->tagNum; ++j) {
TAOS_SML_KV* kv = point->tags + j;
if (kv->type == TSDB_DATA_TYPE_TIMESTAMP) {
int64_t ts = *(int64_t*)(kv->value);
ts = convertTimePrecision(ts, TSDB_TIME_PRECISION_NANO, stableSchema->precision);
*(int64_t*)(kv->value) = ts;
}
}
for (int j = 0; j < point->fieldNum; ++j) {
TAOS_SML_KV* kv = point->fields + j;
if (kv->type == TSDB_DATA_TYPE_TIMESTAMP) {
int64_t ts = *(int64_t*)(kv->value);
ts = convertTimePrecision(ts, TSDB_TIME_PRECISION_NANO, stableSchema->precision);
*(int64_t*)(kv->value) = ts;
}
}
SArray* cTablePoints = NULL;
SArray** pCTablePoints = taosHashGet(cname2points, point->childTableName, strlen(point->childTableName));
if (pCTablePoints) {
cTablePoints = *pCTablePoints;
} else {
cTablePoints = taosArrayInit(64, sizeof(point));
taosHashPut(cname2points, point->childTableName, strlen(point->childTableName), &cTablePoints, POINTER_BYTES);
}
taosArrayPush(cTablePoints, &point);
}
return 0;
}
static int32_t applyChildTableTags(TAOS* taos, char* cTableName, char* sTableName,
SSmlSTableSchema* sTableSchema, SArray* cTablePoints, SSmlLinesInfo* info) {
size_t numTags = taosArrayGetSize(sTableSchema->tags);
size_t rows = taosArrayGetSize(cTablePoints);
TAOS_SML_KV* tagKVs[TSDB_MAX_TAGS] = {0};
for (int i= 0; i < rows; ++i) {
TAOS_SML_DATA_POINT * pDataPoint = taosArrayGetP(cTablePoints, i);
for (int j = 0; j < pDataPoint->tagNum; ++j) {
TAOS_SML_KV* kv = pDataPoint->tags + j;
tagKVs[kv->fieldSchemaIdx] = kv;
}
}
SArray* tagBinds = taosArrayInit(numTags, sizeof(TAOS_BIND));
taosArraySetSize(tagBinds, numTags);
int isNullColBind = TSDB_TRUE;
for (int j = 0; j < numTags; ++j) {
TAOS_BIND* bind = taosArrayGet(tagBinds, j);
bind->is_null = &isNullColBind;
}
for (int j = 0; j < numTags; ++j) {
if (tagKVs[j] == NULL) continue;
TAOS_SML_KV* kv = tagKVs[j];
TAOS_BIND* bind = taosArrayGet(tagBinds, kv->fieldSchemaIdx);
bind->buffer_type = kv->type;
bind->length = malloc(sizeof(uintptr_t*));
*bind->length = kv->length;
bind->buffer = kv->value;
bind->is_null = NULL;
}
int32_t code = creatChildTableIfNotExists(taos, cTableName, sTableName, sTableSchema->tags, tagBinds, info);
for (int i = 0; i < taosArrayGetSize(tagBinds); ++i) {
TAOS_BIND* bind = taosArrayGet(tagBinds, i);
free(bind->length);
}
taosArrayDestroy(tagBinds);
return code;
}
static int32_t applyChildTableFields(TAOS* taos, SSmlSTableSchema* sTableSchema, char* cTableName,
SArray* cTablePoints, size_t rowSize, SSmlLinesInfo* info) {
static int32_t applyChildTableDataPoints(TAOS* taos, char* cTableName, char* sTableName, SSmlSTableSchema* sTableSchema,
SArray* cTablePoints, size_t rowSize, SSmlLinesInfo* info) {
int32_t code = TSDB_CODE_SUCCESS;
size_t numCols = taosArrayGetSize(sTableSchema->fields);
size_t rows = taosArrayGetSize(cTablePoints);
SArray* rowsBind = taosArrayInit(rows, POINTER_BYTES);
int isNullColBind = TSDB_TRUE;
for (int i = 0; i < rows; ++i) {
TAOS_SML_DATA_POINT* point = taosArrayGetP(cTablePoints, i);
TAOS_BIND* colBinds = calloc(numCols, sizeof(TAOS_BIND));
if (colBinds == NULL) {
tscError("SML:0x%"PRIx64" taos_sml_insert insert points, failed to allocated memory for TAOS_BIND, "
"num of rows: %zu, num of cols: %zu", info->id, rows, numCols);
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
for (int j = 0; j < numCols; ++j) {
TAOS_BIND* bind = colBinds + j;
bind->is_null = &isNullColBind;
}
for (int j = 0; j < point->fieldNum; ++j) {
TAOS_SML_KV* kv = point->fields + j;
TAOS_BIND* bind = colBinds + kv->fieldSchemaIdx;
bind->buffer_type = kv->type;
bind->length = malloc(sizeof(uintptr_t*));
*bind->length = kv->length;
bind->buffer = kv->value;
bind->is_null = NULL;
}
taosArrayPush(rowsBind, &colBinds);
}
code = insertChildTableBatch(taos, cTableName, sTableSchema->fields, rowsBind, rowSize, info);
if (code != 0) {
tscError("SML:0x%"PRIx64" insert into child table %s failed. error %s", info->id, cTableName, tstrerror(code));
}
for (int i = 0; i < rows; ++i) {
TAOS_BIND* colBinds = taosArrayGetP(rowsBind, i);
for (int j = 0; j < numCols; ++j) {
TAOS_BIND* bind = colBinds + j;
free(bind->length);
}
free(colBinds);
size_t childTableDataPoints = taosArrayGetSize(cTablePoints);
if (childTableDataPoints < 10) {
code = applyChildTableDataPointsWithInsertSQL(taos, cTableName, sTableName, sTableSchema, cTablePoints, rowSize, info);
} else {
code = applyChildTableDataPointsWithStmt(taos, cTableName, sTableName, sTableSchema, cTablePoints, rowSize, info);
}
taosArrayDestroy(rowsBind);
return code;
}
......@@ -1075,13 +1171,6 @@ static int32_t applyDataPoints(TAOS* taos, TAOS_SML_DATA_POINT* points, int32_t
TAOS_SML_DATA_POINT* point = taosArrayGetP(cTablePoints, 0);
SSmlSTableSchema* sTableSchema = taosArrayGet(stableSchemas, point->schemaIdx);
tscDebug("SML:0x%"PRIx64" apply child table tags. child table: %s", info->id, point->childTableName);
code = applyChildTableTags(taos, point->childTableName, point->stableName, sTableSchema, cTablePoints, info);
if (code != 0) {
tscError("apply child table tags failed. child table %s, error %s", point->childTableName, tstrerror(code));
goto cleanup;
}
size_t rowSize = 0;
size_t numCols = taosArrayGetSize(sTableSchema->fields);
for (int i = 0; i < numCols; ++i) {
......@@ -1089,10 +1178,11 @@ static int32_t applyDataPoints(TAOS* taos, TAOS_SML_DATA_POINT* points, int32_t
rowSize += colSchema->bytes;
}
tscDebug("SML:0x%"PRIx64" apply child table points. child table: %s, row size: %zu", info->id, point->childTableName, rowSize);
code = applyChildTableFields(taos, sTableSchema, point->childTableName, cTablePoints, rowSize, info);
tscDebug("SML:0x%"PRIx64" apply child table points. child table: %s of super table %s, row size: %zu",
info->id, point->childTableName, point->stableName, rowSize);
code = applyChildTableDataPoints(taos, point->childTableName, point->stableName, sTableSchema, cTablePoints, rowSize, info);
if (code != 0) {
tscError("SML:0x%"PRIx64" Apply child table fields failed. child table %s, error %s", info->id, point->childTableName, tstrerror(code));
tscError("SML:0x%"PRIx64" Apply child table points failed. child table %s, error %s", info->id, point->childTableName, tstrerror(code));
goto cleanup;
}
......@@ -1112,6 +1202,60 @@ cleanup:
return code;
}
static int doSmlInsertOneDataPoint(TAOS* taos, TAOS_SML_DATA_POINT* point, SSmlLinesInfo* info) {
int32_t code = TSDB_CODE_SUCCESS;
if (!point->childTableName) {
int tableNameLen = TSDB_TABLE_NAME_LEN;
point->childTableName = calloc(1, tableNameLen + 1);
getSmlMd5ChildTableName(point, point->childTableName, &tableNameLen, info);
point->childTableName[tableNameLen] = '\0';
}
STableMeta* tableMeta = NULL;
int32_t ret = getSuperTableMetaFromLocalCache(taos, point->stableName, &tableMeta, info);
if (ret != TSDB_CODE_SUCCESS) {
return ret;
}
uint8_t precision = tableMeta->tableInfo.precision;
free(tableMeta);
char* sql = malloc(TSDB_MAX_SQL_LEN + 1);
int freeBytes = TSDB_MAX_SQL_LEN;
int sqlLen = 0;
sqlLen += snprintf(sql + sqlLen, freeBytes - sqlLen, "insert into %s(", point->childTableName);
for (int col = 0; col < point->fieldNum; ++col) {
TAOS_SML_KV* kv = point->fields + col;
sqlLen += snprintf(sql + sqlLen, freeBytes - sqlLen, "%s,", kv->key);
}
--sqlLen;
sqlLen += snprintf(sql + sqlLen, freeBytes - sqlLen, ") values (");
TAOS_SML_KV* tsField = point->fields + 0;
int64_t ts = *(int64_t*)(tsField->value);
ts = convertTimePrecision(ts, TSDB_TIME_PRECISION_NANO, precision);
sqlLen += snprintf(sql + sqlLen, freeBytes - sqlLen, "%" PRId64 ",", ts);
for (int col = 1; col < point->fieldNum; ++col) {
TAOS_SML_KV* kv = point->fields + col;
int32_t len = 0;
converToStr(sql + sqlLen, kv->type, kv->value, kv->length, &len);
sqlLen += len;
sqlLen += snprintf(sql + sqlLen, freeBytes - sqlLen, ",");
}
--sqlLen;
sqlLen += snprintf(sql + sqlLen, freeBytes - sqlLen, ")");
sql[sqlLen] = 0;
tscDebug("SML:0x%" PRIx64 " insert child table table %s of super table %s sql: %s", info->id,
point->childTableName, point->stableName, sql);
TAOS_RES* res = taos_query(taos, sql);
free(sql);
code = taos_errno(res);
info->affectedRows = taos_affected_rows(res);
taos_free_result(res);
return code;
}
int tscSmlInsert(TAOS* taos, TAOS_SML_DATA_POINT* points, int numPoint, SSmlLinesInfo* info) {
tscDebug("SML:0x%"PRIx64" taos_sml_insert. number of points: %d", info->id, numPoint);
......@@ -1119,6 +1263,14 @@ int tscSmlInsert(TAOS* taos, TAOS_SML_DATA_POINT* points, int numPoint, SSmlLine
info->affectedRows = 0;
if (numPoint == 1) {
TAOS_SML_DATA_POINT* point = points + 0;
code = doSmlInsertOneDataPoint(taos, point, info);
if (code == TSDB_CODE_SUCCESS) {
return code;
}
}
tscDebug("SML:0x%"PRIx64" build data point schemas", info->id);
SArray* stableSchemas = taosArrayInit(32, sizeof(SSmlSTableSchema)); // SArray<STableColumnsSchema>
code = buildDataPointSchemas(points, numPoint, stableSchemas, info);
......
......@@ -125,8 +125,9 @@ static int32_t parseTelnetTimeStamp(TAOS_SML_KV **pTS, int *num_kvs, const char
}
tfree(value);
(*pTS)->key = tcalloc(sizeof(key), 1);
(*pTS)->key = tcalloc(sizeof(key) + TS_ESCAPE_CHAR_SIZE, 1);
memcpy((*pTS)->key, key, sizeof(key));
addEscapeCharToString((*pTS)->key, (int32_t)strlen(key));
*num_kvs += 1;
*index = cur + 1;
......
......@@ -81,23 +81,42 @@ int32_t converToStr(char *str, int type, void *buf, int32_t bufSize, int32_t *le
break;
case TSDB_DATA_TYPE_FLOAT:
n = sprintf(str, "%e", GET_FLOAT_VAL(buf));
n = sprintf(str, "%.*e", DECIMAL_DIG, GET_FLOAT_VAL(buf));
break;
case TSDB_DATA_TYPE_DOUBLE:
n = sprintf(str, "%e", GET_DOUBLE_VAL(buf));
n = sprintf(str, "%.*e", DECIMAL_DIG, GET_DOUBLE_VAL(buf));
break;
case TSDB_DATA_TYPE_BINARY:
if (bufSize < 0) {
tscError("invalid buf size");
return TSDB_CODE_TSC_INVALID_VALUE;
}
int32_t escapeSize = 0;
*str++ = '\'';
++escapeSize;
char* data = buf;
for (int32_t i = 0; i < bufSize; ++i) {
if (data[i] == '\'' || data[i] == '"') {
*str++ = '\\';
++escapeSize;
}
*str++ = data[i];
}
*str = '\'';
++escapeSize;
n = bufSize + escapeSize;
break;
case TSDB_DATA_TYPE_NCHAR:
if (bufSize < 0) {
tscError("invalid buf size");
return TSDB_CODE_TSC_INVALID_VALUE;
}
*str = '"';
*str = '\'';
memcpy(str + 1, buf, bufSize);
*(str + bufSize + 1) = '"';
*(str + bufSize + 1) = '\'';
n = bufSize + 2;
break;
......
......@@ -8,7 +8,8 @@
#include <time.h>
#include <unistd.h>
#define MAX_THREAD_LINE_BATCHES 1024
bool verbose = false;
void printThreadId(pthread_t id, char* buf)
{
......@@ -30,11 +31,10 @@ typedef struct {
typedef struct {
TAOS* taos;
int protocol;
int numBatches;
SThreadLinesBatch batches[MAX_THREAD_LINE_BATCHES];
SThreadLinesBatch *batches;
int64_t costTime;
int tsPrecision;
int lineProtocol;
} SThreadInsertArgs;
static void* insertLines(void* args) {
......@@ -43,27 +43,33 @@ static void* insertLines(void* args) {
printThreadId(pthread_self(), tidBuf);
for (int i = 0; i < insertArgs->numBatches; ++i) {
SThreadLinesBatch* batch = insertArgs->batches + i;
printf("%s, thread: 0x%s\n", "begin taos_insert_lines", tidBuf);
if (verbose) printf("%s, thread: 0x%s\n", "begin taos_insert_lines", tidBuf);
int64_t begin = getTimeInUs();
TAOS_RES *res = taos_schemaless_insert(insertArgs->taos, batch->lines, batch->numLines, insertArgs->lineProtocol, insertArgs->tsPrecision);
//int32_t code = taos_insert_lines(insertArgs->taos, batch->lines, batch->numLines);
TAOS_RES * res = taos_schemaless_insert(insertArgs->taos, batch->lines, batch->numLines, insertArgs->protocol, TSDB_SML_TIMESTAMP_MILLI_SECONDS);
int32_t code = taos_errno(res);
int64_t end = getTimeInUs();
insertArgs->costTime += end - begin;
printf("code: %d, %s. affected lines:%d time used:%"PRId64", thread: 0x%s\n", code, taos_errstr(res), taos_affected_rows(res), end - begin, tidBuf);
taos_free_result(res);
if (verbose) printf("code: %d, %s. time used:%"PRId64", thread: 0x%s\n", code, tstrerror(code), end - begin, tidBuf);
}
return NULL;
}
int32_t getTelenetTemplate(char* lineTemplate, int templateLen) {
char* sample = "sta%d %lld 44.3 t0=False t1=127i8 t2=32 t3=%di32 t4=9223372036854775807i64 t5=11.12345f32 t6=22.123456789f64 t7=\"hpxzrdiw\" t8=\"ncharTagValue\" t9=127i8";
snprintf(lineTemplate, templateLen, "%s", sample);
return 0;
}
int32_t getLineTemplate(char* lineTemplate, int templateLen, int numFields) {
if (numFields <= 4) {
char* sample = "sta%d,t3=%di32 c3=2147483647i32,c4=9223372036854775807i64,c9=11.12345f32,c10=22.123456789f64 %lldms";
char* sample = "sta%d,t3=%di32 c3=2147483647i32,c4=9223372036854775807i64,c9=11.12345f32,c10=22.123456789f64 %lld";
snprintf(lineTemplate, templateLen, "%s", sample);
return 0;
}
if (numFields <= 13) {
char* sample = "sta%d,t0=true,t1=127i8,t2=32767i16,t3=%di32,t4=9223372036854775807i64,t9=11.12345f32,t10=22.123456789f64,t11=\"binaryTagValue\",t12=L\"ncharTagValue\" c0=true,c1=127i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c5=254u8,c6=32770u16,c7=2147483699u32,c8=9223372036854775899u64,c9=11.12345f32,c10=22.123456789f64,c11=\"binaryValue\",c12=L\"ncharValue\" %lldms";
char* sample = "sta%d,t0=true,t1=127i8,t2=32767i16,t3=%di32,t4=9223372036854775807i64,t9=11.12345f32,t10=22.123456789f64,t11=\"binaryTagValue\",t12=L\"ncharTagValue\" c0=true,c1=127i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c5=254u8,c6=32770u16,c7=2147483699u32,c8=9223372036854775899u64,c9=11.12345f32,c10=22.123456789f64,c11=\"binaryValue\",c12=L\"ncharValue\" %lld";
snprintf(lineTemplate, templateLen, "%s", sample);
return 0;
}
......@@ -84,14 +90,24 @@ int32_t getLineTemplate(char* lineTemplate, int templateLen, int numFields) {
for (int i = offset[1]+1; i < offset[2]; ++i) {
snprintf(lineTemplate+strlen(lineTemplate), templateLen-strlen(lineTemplate), "c%d=\"%d\",", i, i);
}
char* lineFormatTs = " %lldms";
char* lineFormatTs = " %lld";
snprintf(lineTemplate+strlen(lineTemplate)-1, templateLen-strlen(lineTemplate)+1, "%s", lineFormatTs);
return 0;
}
int32_t generateLine(char* line, int lineLen, char* lineTemplate, int protocol, int superTable, int childTable, int64_t ts) {
if (protocol == TSDB_SML_LINE_PROTOCOL) {
snprintf(line, lineLen, lineTemplate, superTable, childTable, ts);
} else if (protocol == TSDB_SML_TELNET_PROTOCOL) {
snprintf(line, lineLen, lineTemplate, superTable, ts, childTable);
}
return TSDB_CODE_SUCCESS;
}
int main(int argc, char* argv[]) {
int numThreads = 8;
int maxBatchesPerThread = 1024;
int numSuperTables = 1;
int numChildTables = 256;
......@@ -99,11 +115,11 @@ int main(int argc, char* argv[]) {
int numFields = 13;
int maxLinesPerBatch = 16384;
int tsPrecision = TSDB_SML_TIMESTAMP_NOT_CONFIGURED;
int lineProtocol = TSDB_SML_UNKNOWN_PROTOCOL;
int protocol = TSDB_SML_TELNET_PROTOCOL;
int opt;
while ((opt = getopt(argc, argv, "s:c:r:f:t:m:p:P:h")) != -1) {
while ((opt = getopt(argc, argv, "s:c:r:f:t:b:p:hv")) != -1) {
switch (opt) {
case 's':
numSuperTables = atoi(optarg);
......@@ -120,28 +136,35 @@ int main(int argc, char* argv[]) {
case 't':
numThreads = atoi(optarg);
break;
case 'm':
case 'b':
maxLinesPerBatch = atoi(optarg);
break;
case 'p':
tsPrecision = atoi(optarg);
case 'v':
verbose = true;
break;
case 'P':
lineProtocol = atoi(optarg);
case 'p':
if (optarg[0] == 't') {
protocol = TSDB_SML_TELNET_PROTOCOL;
} else if (optarg[0] == 'l') {
protocol = TSDB_SML_LINE_PROTOCOL;
} else if (optarg[0] == 'j') {
protocol = TSDB_SML_JSON_PROTOCOL;
}
break;
case 'h':
fprintf(stderr, "Usage: %s -s supertable -c childtable -r rows -f fields -t threads -m maxlines_per_batch\n",
fprintf(stderr, "Usage: %s -s supertable -c childtable -r rows -f fields -t threads -b maxlines_per_batch -p [t|l|j] -v\n",
argv[0]);
exit(0);
default: /* '?' */
fprintf(stderr, "Usage: %s -s supertable -c childtable -r rows -f fields -t threads -m maxlines_per_batch\n",
fprintf(stderr, "Usage: %s -s supertable -c childtable -r rows -f fields -t threads -b maxlines_per_batch -p [t|l|j] -v\n",
argv[0]);
exit(-1);
}
}
TAOS_RES* result;
const char* host = "127.0.0.1";
//const char* host = "127.0.0.1";
const char* host = NULL;
const char* user = "root";
const char* passwd = "taosdata";
......@@ -152,10 +175,7 @@ int main(int argc, char* argv[]) {
exit(1);
}
if (numThreads * MAX_THREAD_LINE_BATCHES* maxLinesPerBatch < numSuperTables*numChildTables*numRowsPerChildTable) {
printf("too many rows to be handle by threads with %d batches", MAX_THREAD_LINE_BATCHES);
exit(2);
}
maxBatchesPerThread = (numSuperTables*numChildTables*numRowsPerChildTable)/(numThreads * maxLinesPerBatch) + 1;
char* info = taos_get_server_info(taos);
printf("server info: %s\n", info);
......@@ -171,28 +191,33 @@ int main(int argc, char* argv[]) {
(void)taos_select_db(taos, "db");
time_t ct = time(0);
int64_t ts = ct * 1000;
int64_t ts = ct * 1000 ;
char* lineTemplate = calloc(65536, sizeof(char));
getLineTemplate(lineTemplate, 65535, numFields);
if (protocol == TSDB_SML_LINE_PROTOCOL) {
getLineTemplate(lineTemplate, 65535, numFields);
} else if (protocol == TSDB_SML_TELNET_PROTOCOL ) {
getTelenetTemplate(lineTemplate, 65535);
}
printf("setup supertables...");
{
char** linesStb = calloc(numSuperTables, sizeof(char*));
for (int i = 0; i < numSuperTables; i++) {
char* lineStb = calloc(strlen(lineTemplate)+128, 1);
snprintf(lineStb, strlen(lineTemplate)+128, lineTemplate, i,
generateLine(lineStb, strlen(lineTemplate)+128, lineTemplate, protocol, i,
numSuperTables * numChildTables,
ts + numSuperTables * numChildTables * numRowsPerChildTable);
linesStb[i] = lineStb;
}
SThreadInsertArgs args = {0};
args.protocol = protocol;
args.batches = calloc(maxBatchesPerThread, sizeof(maxBatchesPerThread));
args.taos = taos;
args.batches[0].lines = linesStb;
args.batches[0].numLines = numSuperTables;
args.tsPrecision = tsPrecision;
args.lineProtocol = lineProtocol;
insertLines(&args);
free(args.batches);
for (int i = 0; i < numSuperTables; ++i) {
free(linesStb[i]);
}
......@@ -203,8 +228,10 @@ int main(int argc, char* argv[]) {
pthread_t* tids = calloc(numThreads, sizeof(pthread_t));
SThreadInsertArgs* argsThread = calloc(numThreads, sizeof(SThreadInsertArgs));
for (int i = 0; i < numThreads; ++i) {
argsThread[i].batches = calloc(maxBatchesPerThread, sizeof(SThreadLinesBatch));
argsThread[i].taos = taos;
argsThread[i].numBatches = 0;
argsThread[i].protocol = protocol;
}
int64_t totalLines = numSuperTables * numChildTables * numRowsPerChildTable;
......@@ -229,7 +256,7 @@ int main(int argc, char* argv[]) {
int stIdx = i;
int ctIdx = numSuperTables*numChildTables + j;
char* line = calloc(strlen(lineTemplate)+128, 1);
snprintf(line, strlen(lineTemplate)+128, lineTemplate, stIdx, ctIdx, ts + l);
generateLine(line, strlen(lineTemplate)+128, lineTemplate, protocol, stIdx, ctIdx, ts + l);
int batchNo = l / maxLinesPerBatch;
int lineNo = l % maxLinesPerBatch;
allBatches[batchNo][lineNo] = line;
......@@ -262,6 +289,9 @@ int main(int argc, char* argv[]) {
}
free(allBatches);
for (int i = 0; i < numThreads; i++) {
free(argsThread[i].batches);
}
free(argsThread);
free(tids);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册