提交 0920bb05 编写于 作者: L lihui

[modify]

上级 e3c85764
...@@ -40,6 +40,8 @@ typedef struct { ...@@ -40,6 +40,8 @@ typedef struct {
float createTableSpeed; float createTableSpeed;
float insertDataSpeed; float insertDataSpeed;
int64_t startMs; int64_t startMs;
int64_t maxDelay;
int64_t minDelay;
pthread_t thread; pthread_t thread;
} SThreadInfo; } SThreadInfo;
...@@ -58,12 +60,30 @@ int32_t main(int32_t argc, char *argv[]) { ...@@ -58,12 +60,30 @@ int32_t main(int32_t argc, char *argv[]) {
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE); pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
SThreadInfo *pInfo = (SThreadInfo *)calloc(numOfThreads, sizeof(SThreadInfo)); SThreadInfo *pInfo = (SThreadInfo *)calloc(numOfThreads, sizeof(SThreadInfo));
int64_t numOfTablesPerThread = numOfTables / numOfThreads; //int64_t numOfTablesPerThread = numOfTables / numOfThreads;
numOfTables = numOfTablesPerThread * numOfThreads; //numOfTables = numOfTablesPerThread * numOfThreads;
if (numOfThreads < 1) {
numOfThreads = 1;
}
int64_t a = numOfTables / numOfThreads;
if (a < 1) {
numOfThreads = numOfTables;
a = 1;
}
int64_t b = 0;
b = numOfTables % numOfThreads;
int64_t tableFrom = 0;
for (int32_t i = 0; i < numOfThreads; ++i) { for (int32_t i = 0; i < numOfThreads; ++i) {
pInfo[i].tableBeginIndex = i * numOfTablesPerThread; pInfo[i].tableBeginIndex = tableFrom;
pInfo[i].tableEndIndex = (i + 1) * numOfTablesPerThread; pInfo[i].tableEndIndex = i < b ? tableFrom + a : tableFrom + a - 1;
tableFrom = pInfo[i].tableEndIndex + 1;
pInfo[i].threadIndex = i; pInfo[i].threadIndex = i;
pInfo[i].minDelay = INT64_MAX;
strcpy(pInfo[i].dbName, dbName); strcpy(pInfo[i].dbName, dbName);
strcpy(pInfo[i].stbName, stbName); strcpy(pInfo[i].stbName, stbName);
pthread_create(&(pInfo[i].thread), &thattr, threadFunc, (void *)(pInfo + i)); pthread_create(&(pInfo[i].thread), &thattr, threadFunc, (void *)(pInfo + i));
...@@ -74,9 +94,15 @@ int32_t main(int32_t argc, char *argv[]) { ...@@ -74,9 +94,15 @@ int32_t main(int32_t argc, char *argv[]) {
pthread_join(pInfo[i].thread, NULL); pthread_join(pInfo[i].thread, NULL);
} }
int64_t maxDelay = 0;
int64_t minDelay = INT64_MAX;
float createTableSpeed = 0; float createTableSpeed = 0;
for (int32_t i = 0; i < numOfThreads; ++i) { for (int32_t i = 0; i < numOfThreads; ++i) {
createTableSpeed += pInfo[i].createTableSpeed; createTableSpeed += pInfo[i].createTableSpeed;
if (pInfo[i].maxDelay > maxDelay) maxDelay = pInfo[i].maxDelay;
if (pInfo[i].minDelay < minDelay) minDelay = pInfo[i].minDelay;
} }
float insertDataSpeed = 0; float insertDataSpeed = 0;
...@@ -84,10 +110,19 @@ int32_t main(int32_t argc, char *argv[]) { ...@@ -84,10 +110,19 @@ int32_t main(int32_t argc, char *argv[]) {
insertDataSpeed += pInfo[i].insertDataSpeed; insertDataSpeed += pInfo[i].insertDataSpeed;
} }
pPrint("%s total %" PRId64 " tables, %.1f tables/second, threads:%d %s", GREEN, numOfTables, createTableSpeed, pPrint("%s total %" PRId64 " tables, %.1f tables/second, threads:%d, maxDelay: %" PRId64 "us, minDelay: %" PRId64 "us %s",
numOfThreads, NC); GREEN,
pPrint("%s total %" PRId64 " tables, %.1f rows/second, threads:%d %s", GREEN, numOfTables, insertDataSpeed, numOfTables,
createTableSpeed,
numOfThreads,
maxDelay,
minDelay,
NC);
if (insertData) {
pPrint("%s total %" PRId64 " tables, %.1f rows/second, threads:%d %s", GREEN, numOfTables, insertDataSpeed,
numOfThreads, NC); numOfThreads, NC);
}
pthread_attr_destroy(&thattr); pthread_attr_destroy(&thattr);
free(pInfo); free(pInfo);
...@@ -99,36 +134,36 @@ void createDbAndStb() { ...@@ -99,36 +134,36 @@ void createDbAndStb() {
TAOS *con = taos_connect(NULL, "root", "taosdata", NULL, 0); TAOS *con = taos_connect(NULL, "root", "taosdata", NULL, 0);
if (con == NULL) { if (con == NULL) {
pError("failed to connect to DB, reason:%s", taos_errstr(con)); pError("failed to connect to DB, reason:%s", taos_errstr(NULL));
exit(1); exit(1);
} }
sprintf(qstr, "create database if not exists %s vgroups %d", dbName, numOfVgroups); sprintf(qstr, "create database if not exists %s vgroups %d", dbName, numOfVgroups);
TAOS_RES *pSql = taos_query(con, qstr); TAOS_RES *pRes = taos_query(con, qstr);
int32_t code = taos_errno(pSql); int32_t code = taos_errno(pRes);
if (code != 0) { if (code != 0) {
pError("failed to create database:%s, sql:%s, code:%d reason:%s", dbName, qstr, taos_errno(con), taos_errstr(con)); pError("failed to create database:%s, sql:%s, code:%d reason:%s", dbName, qstr, taos_errno(pRes), taos_errstr(pRes));
exit(0); exit(0);
} }
taos_free_result(pSql); taos_free_result(pRes);
sprintf(qstr, "use %s", dbName); sprintf(qstr, "use %s", dbName);
pSql = taos_query(con, qstr); pRes = taos_query(con, qstr);
code = taos_errno(pSql); code = taos_errno(pRes);
if (code != 0) { if (code != 0) {
pError("failed to use db, code:%d reason:%s", taos_errno(con), taos_errstr(con)); pError("failed to use db, code:%d reason:%s", taos_errno(pRes), taos_errstr(pRes));
exit(0); exit(0);
} }
taos_free_result(pSql); taos_free_result(pRes);
sprintf(qstr, "create table %s (ts timestamp, i int) tags (j int)", stbName); sprintf(qstr, "create table %s (ts timestamp, i int) tags (j int)", stbName);
pSql = taos_query(con, qstr); pRes = taos_query(con, qstr);
code = taos_errno(pSql); code = taos_errno(pRes);
if (code != 0) { if (code != 0) {
pError("failed to use db, code:%d reason:%s", taos_errno(con), taos_errstr(con)); pError("failed to use db, code:%d reason:%s", taos_errno(pRes), taos_errstr(pRes));
exit(0); exit(0);
} }
taos_free_result(pSql); taos_free_result(pRes);
taos_close(con); taos_close(con);
} }
...@@ -160,16 +195,20 @@ void *threadFunc(void *param) { ...@@ -160,16 +195,20 @@ void *threadFunc(void *param) {
TAOS *con = taos_connect(NULL, "root", "taosdata", NULL, 0); TAOS *con = taos_connect(NULL, "root", "taosdata", NULL, 0);
if (con == NULL) { if (con == NULL) {
pError("index:%d, failed to connect to DB, reason:%s", pInfo->threadIndex, taos_errstr(con)); pError("index:%d, failed to connect to DB, reason:%s", pInfo->threadIndex, taos_errstr(NULL));
exit(1); exit(1);
} }
//printf("thread:%d, table range: %"PRId64 " - %"PRId64 "\n", pInfo->threadIndex, pInfo->tableBeginIndex, pInfo->tableEndIndex);
sprintf(qstr, "use %s", pInfo->dbName); sprintf(qstr, "use %s", pInfo->dbName);
TAOS_RES *pSql = taos_query(con, qstr); TAOS_RES *pRes = taos_query(con, qstr);
taos_free_result(pSql); taos_free_result(pRes);
if (createTable) { if (createTable) {
pInfo->startMs = taosGetTimestampMs(); int64_t curMs = 0;
int64_t beginMs = taosGetTimestampMs();
pInfo->startMs = beginMs;
for (int64_t t = pInfo->tableBeginIndex; t < pInfo->tableEndIndex; ++t) { for (int64_t t = pInfo->tableBeginIndex; t < pInfo->tableEndIndex; ++t) {
int64_t batch = (pInfo->tableEndIndex - t); int64_t batch = (pInfo->tableEndIndex - t);
batch = MIN(batch, batchNum); batch = MIN(batch, batchNum);
...@@ -179,14 +218,22 @@ void *threadFunc(void *param) { ...@@ -179,14 +218,22 @@ void *threadFunc(void *param) {
len += sprintf(qstr + len, " t%" PRId64 " using %s tags(%" PRId64 ")", t + i, stbName, t + i); len += sprintf(qstr + len, " t%" PRId64 " using %s tags(%" PRId64 ")", t + i, stbName, t + i);
} }
TAOS_RES *pSql = taos_query(con, qstr); int64_t startTs = taosGetTimestampUs();
code = taos_errno(pSql); TAOS_RES *pRes = taos_query(con, qstr);
code = taos_errno(pRes);
if (code != 0) { if (code != 0) {
pError("failed to create table t%" PRId64 ", reason:%s", t, tstrerror(code)); pError("failed to create table t%" PRId64 ", reason:%s", t, tstrerror(code));
} }
taos_free_result(pSql); taos_free_result(pRes);
int64_t endTs = taosGetTimestampUs();
if (t % 100000 == 0) { int64_t delay = endTs - startTs;
//printf("==== %"PRId64" - %"PRId64", %"PRId64"\n", startTs, endTs, delay);
if (delay > pInfo->maxDelay) pInfo->maxDelay = delay;
if (delay < pInfo->minDelay) pInfo->minDelay = delay;
curMs = taosGetTimestampMs();
if (curMs - beginMs > 10000) {
beginMs = curMs;
printCreateProgress(pInfo, t); printCreateProgress(pInfo, t);
} }
t += (batch - 1); t += (batch - 1);
...@@ -195,6 +242,9 @@ void *threadFunc(void *param) { ...@@ -195,6 +242,9 @@ void *threadFunc(void *param) {
} }
if (insertData) { if (insertData) {
int64_t curMs = 0;
int64_t beginMs = taosGetTimestampMs();;
pInfo->startMs = taosGetTimestampMs(); pInfo->startMs = taosGetTimestampMs();
for (int64_t t = pInfo->tableBeginIndex; t < pInfo->tableEndIndex; ++t) { for (int64_t t = pInfo->tableBeginIndex; t < pInfo->tableEndIndex; ++t) {
int64_t batch = (pInfo->tableEndIndex - t); int64_t batch = (pInfo->tableEndIndex - t);
...@@ -205,14 +255,15 @@ void *threadFunc(void *param) { ...@@ -205,14 +255,15 @@ void *threadFunc(void *param) {
len += sprintf(qstr + len, " t%" PRId64 " values(now, %" PRId64 ")", t + i, t + i); len += sprintf(qstr + len, " t%" PRId64 " values(now, %" PRId64 ")", t + i, t + i);
} }
TAOS_RES *pSql = taos_query(con, qstr); TAOS_RES *pRes = taos_query(con, qstr);
code = taos_errno(pSql); code = taos_errno(pRes);
if (code != 0) { if (code != 0) {
pError("failed to insert table t%" PRId64 ", reason:%s", t, tstrerror(code)); pError("failed to insert table t%" PRId64 ", reason:%s", t, tstrerror(code));
} }
taos_free_result(pSql); taos_free_result(pRes);
if (t % 100000 == 0) { curMs = taosGetTimestampMs();
if (curMs - beginMs > 10000) {
printInsertProgress(pInfo, t); printInsertProgress(pInfo, t);
} }
t += (batch - 1); t += (batch - 1);
...@@ -266,7 +317,7 @@ void parseArgument(int32_t argc, char *argv[]) { ...@@ -266,7 +317,7 @@ void parseArgument(int32_t argc, char *argv[]) {
numOfThreads = atoi(argv[++i]); numOfThreads = atoi(argv[++i]);
} else if (strcmp(argv[i], "-n") == 0) { } else if (strcmp(argv[i], "-n") == 0) {
numOfTables = atoll(argv[++i]); numOfTables = atoll(argv[++i]);
} else if (strcmp(argv[i], "-n") == 0) { } else if (strcmp(argv[i], "-v") == 0) {
numOfVgroups = atoi(argv[++i]); numOfVgroups = atoi(argv[++i]);
} else if (strcmp(argv[i], "-a") == 0) { } else if (strcmp(argv[i], "-a") == 0) {
createTable = atoi(argv[++i]); createTable = atoi(argv[++i]);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册