未验证 提交 6b88d5db 编写于 作者: sangshuduo's avatar sangshuduo 提交者: GitHub

Hotfix/sangshuduo/td 5702 taosdemo remove memop (#7679)

* [TD-5702]<fix>: taosdemo remove memory operation.

* add remainderBufLen to check row data generation.

* row data generation with remainder buffer length checking.

* optimized memop for rand too.

* fix stmt rand file generation bug.
Co-authored-by: NShuduo Sang <sdsang@taosdata.com>
上级 bdfd8605
......@@ -5977,12 +5977,85 @@ static int64_t generateData(char *recBuf, char **data_type,
return (int32_t)strlen(recBuf);
}
static int generateSampleMemoryFromRand(SSuperTable *stbInfo)
{
char data[MAX_DATA_SIZE];
memset(data, 0, MAX_DATA_SIZE);
char *buff = malloc(stbInfo->lenOfOneRow);
if (NULL == buff) {
errorPrint2("%s() LN%d, memory allocation %"PRId64" bytes failed\n",
__func__, __LINE__, stbInfo->lenOfOneRow);
exit(EXIT_FAILURE);
}
for (int i=0; i < MAX_SAMPLES_ONCE_FROM_FILE; i++) {
uint64_t pos = 0;
memset(buff, 0, stbInfo->lenOfOneRow);
for (int c = 0; c < stbInfo->columnCount; c++) {
char *tmp;
if (0 == strncasecmp(stbInfo->columns[c].dataType,
"BINARY", strlen("BINARY"))) {
rand_string(data, stbInfo->columns[c].dataLen);
pos += sprintf(buff + pos, "%s,", data);
} else if (0 == strncasecmp(stbInfo->columns[c].dataType,
"NCHAR", strlen("NCHAR"))) {
rand_string(data, stbInfo->columns[c].dataLen);
pos += sprintf(buff + pos, "%s,", data);
} else if (0 == strncasecmp(stbInfo->columns[c].dataType,
"INT", strlen("INT"))) {
if ((g_args.demo_mode) && (c == 1)) {
tmp = demo_voltage_int_str();
} else {
tmp = rand_int_str();
}
pos += sprintf(buff + pos, "%s,", tmp);
} else if (0 == strncasecmp(stbInfo->columns[c].dataType,
"BIGINT", strlen("BIGINT"))) {
pos += sprintf(buff + pos, "%s,", rand_bigint_str());
} else if (0 == strncasecmp(stbInfo->columns[c].dataType,
"FLOAT", strlen("FLOAT"))) {
if (g_args.demo_mode) {
if (c == 0) {
tmp = demo_current_float_str();
} else {
tmp = demo_phase_float_str();
}
} else {
tmp = rand_float_str();
}
pos += sprintf(buff + pos, "%s,", tmp);
} else if (0 == strncasecmp(stbInfo->columns[c].dataType,
"DOUBLE", strlen("DOUBLE"))) {
pos += sprintf(buff + pos, "%s,", rand_double_str());
} else if (0 == strncasecmp(stbInfo->columns[c].dataType,
"SMALLINT", strlen("SMALLINT"))) {
pos += sprintf(buff + pos, "%s,", rand_smallint_str());
} else if (0 == strncasecmp(stbInfo->columns[c].dataType,
"TINYINT", strlen("TINYINT"))) {
pos += sprintf(buff + pos, "%s,", rand_tinyint_str());
} else if (0 == strncasecmp(stbInfo->columns[c].dataType,
"BOOL", strlen("BOOL"))) {
pos += sprintf(buff + pos, "%s,", rand_bool_str());
} else if (0 == strncasecmp(stbInfo->columns[c].dataType,
"TIMESTAMP", strlen("TIMESTAMP"))) {
pos += sprintf(buff + pos, "%s,", rand_bigint_str());
}
}
*(buff + pos - 1) = 0;
memcpy(stbInfo->sampleDataBuf + i * stbInfo->lenOfOneRow, buff, pos);
}
free(buff);
return 0;
}
static int prepareSampleDataForSTable(SSuperTable *stbInfo) {
char* sampleDataBuf = NULL;
sampleDataBuf = calloc(
stbInfo->sampleDataBuf = calloc(
stbInfo->lenOfOneRow * MAX_SAMPLES_ONCE_FROM_FILE, 1);
if (sampleDataBuf == NULL) {
if (NULL == stbInfo->sampleDataBuf) {
errorPrint2("%s() LN%d, Failed to calloc %"PRIu64" Bytes, reason:%s\n",
__func__, __LINE__,
stbInfo->lenOfOneRow * MAX_SAMPLES_ONCE_FROM_FILE,
......@@ -5990,13 +6063,16 @@ static int prepareSampleDataForSTable(SSuperTable *stbInfo) {
return -1;
}
stbInfo->sampleDataBuf = sampleDataBuf;
int ret = readSampleFromCsvFileToMem(stbInfo);
int ret;
if (0 == strncasecmp(stbInfo->dataSource, "sample", strlen("sample")))
ret = readSampleFromCsvFileToMem(stbInfo);
else
ret = generateSampleMemoryFromRand(stbInfo);
if (0 != ret) {
errorPrint2("%s() LN%d, read sample from csv file failed.\n",
__func__, __LINE__);
tmfree(sampleDataBuf);
tmfree(stbInfo->sampleDataBuf);
stbInfo->sampleDataBuf = NULL;
return -1;
}
......@@ -7714,11 +7790,14 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) {
pstr += len;
remainderBufLen -= len;
// measure prepare + insert
startTs = taosGetTimestampUs();
int32_t generated;
if (stbInfo) {
if (stbInfo->iface == STMT_IFACE) {
if (sourceRand) {
generated = prepareStbStmtRand(
/* generated = prepareStbStmtRand(
pThreadInfo,
tableName,
tableSeq,
......@@ -7726,6 +7805,14 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) {
insertRows,
i, start_time
);
*/
generated = prepareStbStmtWithSample(
pThreadInfo,
tableName,
tableSeq,
g_args.num_of_RPR,
insertRows, i, start_time,
&(pThreadInfo->samplePos));
} else {
generated = prepareStbStmtWithSample(
pThreadInfo,
......@@ -7770,7 +7857,8 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) {
start_time += generated * timeStampStep;
pThreadInfo->totalInsertRows += generated;
startTs = taosGetTimestampUs();
// only measure insert
// startTs = taosGetTimestampUs();
int32_t affectedRows = execInsert(pThreadInfo, generated);
......@@ -7980,7 +8068,6 @@ static int parseSampleFileToStmt(SSuperTable *stbInfo, uint32_t timePrec)
return -1;
}
for (int i=0; i < MAX_SAMPLES_ONCE_FROM_FILE; i++) {
char *bindArray = calloc(1, sizeof(TAOS_BIND) * (stbInfo->columnCount + 1));
if (bindArray == NULL) {
......@@ -7989,7 +8076,6 @@ static int parseSampleFileToStmt(SSuperTable *stbInfo, uint32_t timePrec)
return -1;
}
TAOS_BIND *bind;
int cursor = 0;
......@@ -8078,11 +8164,8 @@ static void startMultiThreadInsertData(int threads, char* db_name,
debugPrint("%s() LN%d, start_time= %"PRId64"\n",
__func__, __LINE__, start_time);
int64_t start = taosGetTimestampMs();
// read sample data from file first
if ((stbInfo) && (0 == strncasecmp(stbInfo->dataSource,
"sample", strlen("sample")))) {
if (stbInfo) {
if (0 != prepareSampleDataForSTable(stbInfo)) {
errorPrint2("%s() LN%d, prepare sample data for stable failed!\n",
__func__, __LINE__);
......@@ -8230,8 +8313,7 @@ static void startMultiThreadInsertData(int threads, char* db_name,
debugPrint("%s() LN%d, stmtBuffer: %s", __func__, __LINE__, stmtBuffer);
if ((stbInfo) && (0 == strncasecmp(stbInfo->dataSource,
"sample", strlen("sample")))) {
if (stbInfo) {
parseSampleFileToStmt(stbInfo, timePrec);
}
}
......@@ -8316,6 +8398,8 @@ static void startMultiThreadInsertData(int threads, char* db_name,
free(stmtBuffer);
int64_t start = taosGetTimestampMs();
for (int i = 0; i < threads; i++) {
pthread_join(pids[i], NULL);
}
......@@ -8358,22 +8442,22 @@ static void startMultiThreadInsertData(int threads, char* db_name,
if (cntDelay == 0) cntDelay = 1;
avgDelay = (double)totalDelay / cntDelay;
int64_t end = taosGetTimestampMs();
int64_t end = taosGetTimestampMs();
int64_t t = end - start;
double tInMs = t/1000.0;
double tInMs = (double) t / 1000.0;
if (stbInfo) {
fprintf(stderr, "Spent %.2f seconds to insert rows: %"PRIu64", affected rows: %"PRIu64" with %d thread(s) into %s.%s. %.2f records/second\n\n",
fprintf(stderr, "Spent %.4f seconds to insert rows: %"PRIu64", affected rows: %"PRIu64" with %d thread(s) into %s.%s. %.2f records/second\n\n",
tInMs, stbInfo->totalInsertRows,
stbInfo->totalAffectedRows,
threads, db_name, stbInfo->sTblName,
(tInMs)?
(double) tInMs?
(double)(stbInfo->totalInsertRows/tInMs):FLT_MAX);
if (g_fpOfInsertResult) {
fprintf(g_fpOfInsertResult,
"Spent %.2f seconds to insert rows: %"PRIu64", affected rows: %"PRIu64" with %d thread(s) into %s.%s. %.2f records/second\n\n",
"Spent %.4f seconds to insert rows: %"PRIu64", affected rows: %"PRIu64" with %d thread(s) into %s.%s. %.2f records/second\n\n",
tInMs, stbInfo->totalInsertRows,
stbInfo->totalAffectedRows,
threads, db_name, stbInfo->sTblName,
......@@ -8381,7 +8465,7 @@ static void startMultiThreadInsertData(int threads, char* db_name,
(double)(stbInfo->totalInsertRows/tInMs):FLT_MAX);
}
} else {
fprintf(stderr, "Spent %.2f seconds to insert rows: %"PRIu64", affected rows: %"PRIu64" with %d thread(s) into %s %.2f records/second\n\n",
fprintf(stderr, "Spent %.4f seconds to insert rows: %"PRIu64", affected rows: %"PRIu64" with %d thread(s) into %s %.2f records/second\n\n",
tInMs, g_args.totalInsertRows,
g_args.totalAffectedRows,
threads, db_name,
......@@ -8389,7 +8473,7 @@ static void startMultiThreadInsertData(int threads, char* db_name,
(double)(g_args.totalInsertRows/tInMs):FLT_MAX);
if (g_fpOfInsertResult) {
fprintf(g_fpOfInsertResult,
"Spent %.2f seconds to insert rows: %"PRIu64", affected rows: %"PRIu64" with %d thread(s) into %s %.2f records/second\n\n",
"Spent %.4f seconds to insert rows: %"PRIu64", affected rows: %"PRIu64" with %d thread(s) into %s %.2f records/second\n\n",
tInMs, g_args.totalInsertRows,
g_args.totalAffectedRows,
threads, db_name,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册