提交 3e64ef25 编写于 作者: S Shuduo Sang

[TD-3192] <feature>: support stb limit and offset. generate data refactor.

上级 7f2116fb
...@@ -98,6 +98,8 @@ extern char configDir[]; ...@@ -98,6 +98,8 @@ extern char configDir[];
#define MAX_DATABASE_COUNT 256 #define MAX_DATABASE_COUNT 256
#define INPUT_BUF_LEN 256 #define INPUT_BUF_LEN 256
#define DEFAULT_TIMESTAMP_STEP 10
typedef enum CREATE_SUB_TALBE_MOD_EN { typedef enum CREATE_SUB_TALBE_MOD_EN {
PRE_CREATE_SUBTBL, PRE_CREATE_SUBTBL,
AUTO_CREATE_SUBTBL, AUTO_CREATE_SUBTBL,
...@@ -3307,7 +3309,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { ...@@ -3307,7 +3309,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
if (timestampStep && timestampStep->type == cJSON_Number) { if (timestampStep && timestampStep->type == cJSON_Number) {
g_Dbs.db[i].superTbls[j].timeStampStep = timestampStep->valueint; g_Dbs.db[i].superTbls[j].timeStampStep = timestampStep->valueint;
} else if (!timestampStep) { } else if (!timestampStep) {
g_Dbs.db[i].superTbls[j].timeStampStep = 1000; g_Dbs.db[i].superTbls[j].timeStampStep = DEFAULT_TIMESTAMP_STEP;
} else { } else {
printf("ERROR: failed to read json, timestamp_step not found\n"); printf("ERROR: failed to read json, timestamp_step not found\n");
goto PARSE_OVER; goto PARSE_OVER;
...@@ -4344,217 +4346,233 @@ static int execInsert(threadInfo *winfo, char *buffer, int k) ...@@ -4344,217 +4346,233 @@ static int execInsert(threadInfo *winfo, char *buffer, int k)
return affectedRows; return affectedRows;
} }
// sync insertion static int generateDataBuffer(int32_t threadID, threadInfo *pThreadInfo, char *buffer,
/* int64_t insertRows,
1 thread: 100 tables * 2000 rows/s int64_t startFrom, int64_t startTime, int *pSampleUsePos)
1 thread: 10 tables * 20000 rows/s {
6 thread: 300 tables * 2000 rows/s SSuperTable* superTblInfo = pThreadInfo->superTblInfo;
2 taosinsertdata , 1 thread: 10 tables * 20000 rows/s
*/
static void* syncWrite(void *sarg) {
threadInfo *winfo = (threadInfo *)sarg;
SSuperTable* superTblInfo = winfo->superTblInfo;
int ncols_per_record = 1; // count first col ts int ncols_per_record = 1; // count first col ts
int samplePos = 0; if (superTblInfo == NULL) {
if (superTblInfo) {
if (0 != prepareSampleData(superTblInfo))
return NULL;
if (superTblInfo->numberOfTblInOneSql > 0) {
syncWriteForNumberOfTblInOneSql(winfo, superTblInfo->sampleDataBuf);
tmfree(superTblInfo->sampleDataBuf);
return NULL;
}
} else {
int datatypeSeq = 0; int datatypeSeq = 0;
while(g_args.datatype[datatypeSeq]) { while(g_args.datatype[datatypeSeq]) {
datatypeSeq ++; datatypeSeq ++;
ncols_per_record ++; ncols_per_record ++;
} }
}
char* buffer = calloc(superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len, 1);
if (NULL == buffer) {
fprintf(stderr, "Failed to calloc %d Bytes, reason:%s\n",
superTblInfo->maxSqlLen,
strerror(errno));
tmfree(superTblInfo->sampleDataBuf);
return NULL;
}
int64_t lastPrintTime = taosGetTimestampMs();
int64_t startTs = taosGetTimestampUs();
int64_t endTs;
int insert_interval = superTblInfo?superTblInfo->insertInterval:g_args.insert_interval;
uint64_t st = 0;
uint64_t et = 0xffffffff;
winfo->totalInsertRows = 0;
winfo->totalAffectedRows = 0;
int sampleUsePos;
if (superTblInfo && superTblInfo->childTblLimit ) {
// TODO
} }
for (uint32_t tID = winfo->start_table_id; tID <= winfo->end_table_id; memset(buffer, 0, superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len);
tID++) {
int64_t start_time = winfo->start_time;
int64_t insertRows = (superTblInfo)?superTblInfo->insertRows:g_args.num_of_DPT;
verbosePrint("%s() LN%d insertRows=%"PRId64"\n", __func__, __LINE__, insertRows);
for (int64_t i = 0; i < insertRows;) {
int64_t prepared = i;
if (insert_interval) {
st = taosGetTimestampUs();
}
sampleUsePos = samplePos;
memset(buffer, 0, superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len);
char *pstr = buffer; char *pstr = buffer;
if (superTblInfo) {
if (AUTO_CREATE_SUBTBL == superTblInfo->autoCreateTable) { if (superTblInfo) {
char* tagsValBuf = NULL; if (AUTO_CREATE_SUBTBL == superTblInfo->autoCreateTable) {
if (0 == superTblInfo->tagSource) { char* tagsValBuf = NULL;
if (0 == superTblInfo->tagSource) {
tagsValBuf = generateTagVaulesForStb(superTblInfo); tagsValBuf = generateTagVaulesForStb(superTblInfo);
} else { } else {
tagsValBuf = getTagValueFromTagSample( tagsValBuf = getTagValueFromTagSample(
superTblInfo, superTblInfo,
tID % superTblInfo->tagSampleCount); threadID % superTblInfo->tagSampleCount);
} }
if (NULL == tagsValBuf) { if (NULL == tagsValBuf) {
goto free_and_statistics_2; fprintf(stderr, "tag buf failed to allocate memory\n");
} return -1;
}
pstr += snprintf(pstr, pstr += snprintf(pstr,
superTblInfo->maxSqlLen, superTblInfo->maxSqlLen,
"insert into %s.%s%d using %s.%s tags %s values", "insert into %s.%s%d using %s.%s tags %s values",
winfo->db_name, pThreadInfo->db_name,
superTblInfo->childTblPrefix, superTblInfo->childTblPrefix,
tID, threadID,
winfo->db_name, pThreadInfo->db_name,
superTblInfo->sTblName, superTblInfo->sTblName,
tagsValBuf); tagsValBuf);
tmfree(tagsValBuf); tmfree(tagsValBuf);
} else if (TBL_ALREADY_EXISTS == superTblInfo->childTblExists) { } else if (TBL_ALREADY_EXISTS == superTblInfo->childTblExists) {
pstr += snprintf(pstr, pstr += snprintf(pstr,
superTblInfo->maxSqlLen, superTblInfo->maxSqlLen,
"insert into %s.%s values", "insert into %s.%s values",
winfo->db_name, pThreadInfo->db_name,
superTblInfo->childTblName + tID * TSDB_TABLE_NAME_LEN); superTblInfo->childTblName + threadID * TSDB_TABLE_NAME_LEN);
} else { } else {
pstr += snprintf(pstr, pstr += snprintf(pstr,
(superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len), (superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len),
"insert into %s.%s%d values", "insert into %s.%s%d values",
winfo->db_name, pThreadInfo->db_name,
superTblInfo?superTblInfo->childTblPrefix:g_args.tb_prefix, superTblInfo?superTblInfo->childTblPrefix:g_args.tb_prefix,
tID); threadID);
} }
} else { } else {
pstr += snprintf(pstr,
pstr += snprintf(pstr,
(superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len), (superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len),
"insert into %s.%s%d values", "insert into %s.%s%d values",
winfo->db_name, pThreadInfo->db_name,
superTblInfo?superTblInfo->childTblPrefix:g_args.tb_prefix, superTblInfo?superTblInfo->childTblPrefix:g_args.tb_prefix,
tID); threadID);
} }
int k;
int len = 0;
verbosePrint("%s() LN%d num_of_RPR=%d\n", __func__, __LINE__, g_args.num_of_RPR); int k;
for (k = 0; k < g_args.num_of_RPR;) { int len = 0;
if (superTblInfo) { verbosePrint("%s() LN%d num_of_RPR=%d\n", __func__, __LINE__, g_args.num_of_RPR);
for (k = 0; k < g_args.num_of_RPR;) {
if (superTblInfo) {
int retLen = 0; int retLen = 0;
if (0 == strncasecmp(superTblInfo->dataSource, "sample", strlen("sample"))) { if (0 == strncasecmp(superTblInfo->dataSource, "sample", strlen("sample"))) {
retLen = getRowDataFromSample( retLen = getRowDataFromSample(
pstr + len, pstr + len,
superTblInfo->maxSqlLen - len, superTblInfo->maxSqlLen - len,
start_time + superTblInfo->timeStampStep * i, startTime + superTblInfo->timeStampStep * startFrom,
superTblInfo, superTblInfo,
&sampleUsePos); pSampleUsePos);
} else if (0 == strncasecmp(superTblInfo->dataSource, "rand", strlen("rand"))) { } else if (0 == strncasecmp(superTblInfo->dataSource, "rand", strlen("rand"))) {
int rand_num = rand_tinyint() % 100; int rand_num = rand_tinyint() % 100;
if (0 != superTblInfo->disorderRatio if (0 != superTblInfo->disorderRatio
&& rand_num < superTblInfo->disorderRatio) { && rand_num < superTblInfo->disorderRatio) {
int64_t d = start_time - rand() % superTblInfo->disorderRange; int64_t d = startTime - rand() % superTblInfo->disorderRange;
retLen = generateRowData( retLen = generateRowData(
pstr + len, pstr + len,
superTblInfo->maxSqlLen - len, superTblInfo->maxSqlLen - len,
d, d,
superTblInfo); superTblInfo);
//printf("disorder rows, rand_num:%d, last ts:%"PRId64" current ts:%"PRId64"\n", rand_num, start_time, d); //printf("disorder rows, rand_num:%d, last ts:%"PRId64" current ts:%"PRId64"\n", rand_num, start_time, d);
} else { } else {
retLen = generateRowData( retLen = generateRowData(
pstr + len, pstr + len,
superTblInfo->maxSqlLen - len, superTblInfo->maxSqlLen - len,
start_time + superTblInfo->timeStampStep * i, startTime + superTblInfo->timeStampStep * startFrom,
superTblInfo); superTblInfo);
} }
if (retLen < 0) { if (retLen < 0) {
goto free_and_statistics_2; return -1;
} }
len += retLen; len += retLen;
} }
} else { } else {
int rand_num = rand() % 100; int rand_num = rand() % 100;
char data[MAX_DATA_SIZE]; char data[MAX_DATA_SIZE];
char **data_type = g_args.datatype; char **data_type = g_args.datatype;
int lenOfBinary = g_args.len_of_binary; int lenOfBinary = g_args.len_of_binary;
if ((g_args.disorderRatio != 0) if ((g_args.disorderRatio != 0)
&& (rand_num < g_args.disorderRange)) { && (rand_num < g_args.disorderRange)) {
int64_t d = start_time - rand() % 1000000 + rand_num; int64_t d = startTime - rand() % 1000000 + rand_num;
len = generateData(data, data_type, len = generateData(data, data_type,
ncols_per_record, d, lenOfBinary); ncols_per_record, d, lenOfBinary);
} else { } else {
len = generateData(data, data_type, len = generateData(data, data_type,
ncols_per_record, start_time += 1000, lenOfBinary); ncols_per_record,
} startTime + DEFAULT_TIMESTAMP_STEP * startFrom,
lenOfBinary);
}
//assert(len + pstr - buffer < BUFFER_SIZE); //assert(len + pstr - buffer < BUFFER_SIZE);
if (len + pstr - buffer >= g_args.max_sql_len) { // too long if (len + pstr - buffer >= g_args.max_sql_len) { // too long
break; break;
} }
pstr += sprintf(pstr, " %s", data); pstr += sprintf(pstr, " %s", data);
} }
verbosePrint("%s() LN%d len=%d k=%d \nbuffer=%s\n", __func__, __LINE__, len, k, buffer);
k++;
startFrom ++;
verbosePrint("%s() LN%d len=%d k=%d \nbuffer=%s\n", __func__, __LINE__, len, k, buffer); debugPrint("%s() LN%d k=%d startFrom=%ld insertRows=%ld\n", __func__, __LINE__, k, startFrom, insertRows);
if (startFrom >= insertRows)
break;
}
prepared ++; return k;
k++; }
i++;
if (prepared >= insertRows) // sync insertion
break; /*
1 thread: 100 tables * 2000 rows/s
1 thread: 10 tables * 20000 rows/s
6 thread: 300 tables * 2000 rows/s
2 taosinsertdata , 1 thread: 10 tables * 20000 rows/s
*/
static void* syncWrite(void *sarg) {
threadInfo *winfo = (threadInfo *)sarg;
SSuperTable* superTblInfo = winfo->superTblInfo;
if (superTblInfo) {
if (0 != prepareSampleData(superTblInfo))
return NULL;
if (superTblInfo->numberOfTblInOneSql > 0) {
syncWriteForNumberOfTblInOneSql(winfo, superTblInfo->sampleDataBuf);
tmfree(superTblInfo->sampleDataBuf);
return NULL;
}
}
int samplePos = 0;
char* buffer = calloc(superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len, 1);
if (NULL == buffer) {
fprintf(stderr, "Failed to calloc %d Bytes, reason:%s\n",
superTblInfo->maxSqlLen,
strerror(errno));
tmfree(superTblInfo->sampleDataBuf);
return NULL;
}
int64_t lastPrintTime = taosGetTimestampMs();
int64_t startTs = taosGetTimestampUs();
int64_t endTs;
int insert_interval = superTblInfo?superTblInfo->insertInterval:g_args.insert_interval;
uint64_t st = 0;
uint64_t et = 0xffffffff;
winfo->totalInsertRows = 0;
winfo->totalAffectedRows = 0;
int sampleUsePos;
if (superTblInfo && superTblInfo->childTblLimit ) {
// TODO
}
for (uint32_t tID = winfo->start_table_id; tID <= winfo->end_table_id;
tID++) {
int64_t start_time = winfo->start_time;
int64_t insertRows = (superTblInfo)?superTblInfo->insertRows:g_args.num_of_DPT;
verbosePrint("%s() LN%d insertRows=%"PRId64"\n", __func__, __LINE__, insertRows);
for (int64_t i = 0; i < insertRows;) {
if (insert_interval) {
st = taosGetTimestampUs();
} }
int affectedRows = execInsert(winfo, buffer, k); sampleUsePos = samplePos;
int generated = generateDataBuffer(tID, winfo, buffer, insertRows,
i, start_time, &sampleUsePos);
if (generated > 0)
i += generated;
else
goto free_and_statistics_2;
int affectedRows = execInsert(winfo, buffer, generated);
if (affectedRows < 0) if (affectedRows < 0)
goto free_and_statistics_2; goto free_and_statistics_2;
winfo->totalInsertRows += k; winfo->totalInsertRows += generated;
winfo->totalAffectedRows += affectedRows; winfo->totalAffectedRows += affectedRows;
endTs = taosGetTimestampUs(); endTs = taosGetTimestampUs();
...@@ -4573,7 +4591,7 @@ static void* syncWrite(void *sarg) { ...@@ -4573,7 +4591,7 @@ static void* syncWrite(void *sarg) {
lastPrintTime = currentPrintTime; lastPrintTime = currentPrintTime;
} }
if (prepared >= insertRows) if (i >= insertRows)
break; break;
if (insert_interval) { if (insert_interval) {
...@@ -4581,7 +4599,7 @@ static void* syncWrite(void *sarg) { ...@@ -4581,7 +4599,7 @@ static void* syncWrite(void *sarg) {
if (insert_interval > ((et - st)/1000) ) { if (insert_interval > ((et - st)/1000) ) {
int sleep_time = insert_interval - (et -st)/1000; int sleep_time = insert_interval - (et -st)/1000;
printf("sleep: %d ms for insert interval\n", sleep_time); verbosePrint("%s() LN%d sleep: %d ms for insert interval\n", __func__, __LINE__, sleep_time);
taosMsleep(sleep_time); // ms taosMsleep(sleep_time); // ms
} }
} }
...@@ -5675,7 +5693,7 @@ void setParaFromArg(){ ...@@ -5675,7 +5693,7 @@ void setParaFromArg(){
tstrncpy(g_Dbs.db[0].superTbls[0].insertMode, "taosc", MAX_TB_NAME_SIZE); tstrncpy(g_Dbs.db[0].superTbls[0].insertMode, "taosc", MAX_TB_NAME_SIZE);
tstrncpy(g_Dbs.db[0].superTbls[0].startTimestamp, tstrncpy(g_Dbs.db[0].superTbls[0].startTimestamp,
"2017-07-14 10:40:00.000", MAX_TB_NAME_SIZE); "2017-07-14 10:40:00.000", MAX_TB_NAME_SIZE);
g_Dbs.db[0].superTbls[0].timeStampStep = 10; g_Dbs.db[0].superTbls[0].timeStampStep = DEFAULT_TIMESTAMP_STEP;
g_Dbs.db[0].superTbls[0].insertRows = g_args.num_of_DPT; g_Dbs.db[0].superTbls[0].insertRows = g_args.num_of_DPT;
g_Dbs.db[0].superTbls[0].maxSqlLen = TSDB_PAYLOAD_SIZE; g_Dbs.db[0].superTbls[0].maxSqlLen = TSDB_PAYLOAD_SIZE;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册