提交 9e4f0679 编写于 作者: S Shuduo Sang

[TD-3147] <fix>: support insert interval. normal table working.

上级 7df0a866
...@@ -228,6 +228,7 @@ typedef struct SSuperTable_S { ...@@ -228,6 +228,7 @@ typedef struct SSuperTable_S {
int disorderRange; // ms or us by database precision int disorderRange; // ms or us by database precision
int maxSqlLen; // int maxSqlLen; //
int64_t insertRows; // 0: no limit
int timeStampStep; int timeStampStep;
char startTimestamp[MAX_TB_NAME_SIZE]; // char startTimestamp[MAX_TB_NAME_SIZE]; //
char sampleFormat[MAX_TB_NAME_SIZE]; // csv, json char sampleFormat[MAX_TB_NAME_SIZE]; // csv, json
...@@ -1085,6 +1086,7 @@ static int printfInsertMeta() { ...@@ -1085,6 +1086,7 @@ static int printfInsertMeta() {
printf(" childTblPrefix: \033[33m%s\033[0m\n", g_Dbs.db[i].superTbls[j].childTblPrefix); printf(" childTblPrefix: \033[33m%s\033[0m\n", g_Dbs.db[i].superTbls[j].childTblPrefix);
printf(" dataSource: \033[33m%s\033[0m\n", g_Dbs.db[i].superTbls[j].dataSource); printf(" dataSource: \033[33m%s\033[0m\n", g_Dbs.db[i].superTbls[j].dataSource);
printf(" insertMode: \033[33m%s\033[0m\n", g_Dbs.db[i].superTbls[j].insertMode); printf(" insertMode: \033[33m%s\033[0m\n", g_Dbs.db[i].superTbls[j].insertMode);
printf(" insertRows: \033[33m%"PRId64"\033[0m\n", g_Dbs.db[i].superTbls[j].insertRows);
if (0 == g_Dbs.db[i].superTbls[j].multiThreadWriteOneTbl) { if (0 == g_Dbs.db[i].superTbls[j].multiThreadWriteOneTbl) {
printf(" multiThreadWriteOneTbl: \033[33mno\033[0m\n"); printf(" multiThreadWriteOneTbl: \033[33mno\033[0m\n");
...@@ -1231,6 +1233,7 @@ static void printfInsertMetaToFile(FILE* fp) { ...@@ -1231,6 +1233,7 @@ static void printfInsertMetaToFile(FILE* fp) {
fprintf(fp, " childTblPrefix: %s\n", g_Dbs.db[i].superTbls[j].childTblPrefix); fprintf(fp, " childTblPrefix: %s\n", g_Dbs.db[i].superTbls[j].childTblPrefix);
fprintf(fp, " dataSource: %s\n", g_Dbs.db[i].superTbls[j].dataSource); fprintf(fp, " dataSource: %s\n", g_Dbs.db[i].superTbls[j].dataSource);
fprintf(fp, " insertMode: %s\n", g_Dbs.db[i].superTbls[j].insertMode); fprintf(fp, " insertMode: %s\n", g_Dbs.db[i].superTbls[j].insertMode);
fprintf(fp, " insertRows: %"PRId64"\n", g_Dbs.db[i].superTbls[j].insertRows);
if (0 == g_Dbs.db[i].superTbls[j].multiThreadWriteOneTbl) { if (0 == g_Dbs.db[i].superTbls[j].multiThreadWriteOneTbl) {
fprintf(fp, " multiThreadWriteOneTbl: no\n"); fprintf(fp, " multiThreadWriteOneTbl: no\n");
...@@ -1991,7 +1994,7 @@ static int createSuperTable(TAOS * taos, char* dbName, SSuperTable* superTbls, ...@@ -1991,7 +1994,7 @@ static int createSuperTable(TAOS * taos, char* dbName, SSuperTable* superTbls,
exit(-1); exit(-1);
} }
snprintf(superTbls->colsOfCreateChildTable, len+20, "(ts timestamp%s)", cols); snprintf(superTbls->colsOfCreateChildTable, len+20, "(ts timestamp%s)", cols);
debugPrint("%s() LN%d: %s\n", __func__, __LINE__, superTbls->colsOfCreateChildTable); verbosePrint("%s() LN%d: %s\n", __func__, __LINE__, superTbls->colsOfCreateChildTable);
if (use_metric) { if (use_metric) {
char tags[STRING_LEN] = "\0"; char tags[STRING_LEN] = "\0";
...@@ -2044,7 +2047,7 @@ static int createSuperTable(TAOS * taos, char* dbName, SSuperTable* superTbls, ...@@ -2044,7 +2047,7 @@ static int createSuperTable(TAOS * taos, char* dbName, SSuperTable* superTbls,
snprintf(command, BUFFER_SIZE, snprintf(command, BUFFER_SIZE,
"create table if not exists %s.%s (ts timestamp%s) tags %s", "create table if not exists %s.%s (ts timestamp%s) tags %s",
dbName, superTbls->sTblName, cols, tags); dbName, superTbls->sTblName, cols, tags);
debugPrint("%s() LN%d: %s\n", __func__, __LINE__, command); verbosePrint("%s() LN%d: %s\n", __func__, __LINE__, command);
if (0 != queryDbExec(taos, command, NO_INSERT_TYPE)) { if (0 != queryDbExec(taos, command, NO_INSERT_TYPE)) {
fprintf(stderr, "create supertable %s failed!\n\n", superTbls->sTblName); fprintf(stderr, "create supertable %s failed!\n\n", superTbls->sTblName);
...@@ -2237,7 +2240,7 @@ static void* createTable(void *sarg) ...@@ -2237,7 +2240,7 @@ static void* createTable(void *sarg)
} }
len = 0; len = 0;
debugPrint("%s() LN%d %s\n", __func__, __LINE__, buffer); verbosePrint("%s() LN%d %s\n", __func__, __LINE__, buffer);
if (0 != queryDbExec(winfo->taos, buffer, NO_INSERT_TYPE)){ if (0 != queryDbExec(winfo->taos, buffer, NO_INSERT_TYPE)){
free(buffer); free(buffer);
return NULL; return NULL;
...@@ -2252,7 +2255,7 @@ static void* createTable(void *sarg) ...@@ -2252,7 +2255,7 @@ static void* createTable(void *sarg)
} }
if (0 != len) { if (0 != len) {
debugPrint("%s() %d buffer: %s\n", __func__, __LINE__, buffer); verbosePrint("%s() %d buffer: %s\n", __func__, __LINE__, buffer);
(void)queryDbExec(winfo->taos, buffer, NO_INSERT_TYPE); (void)queryDbExec(winfo->taos, buffer, NO_INSERT_TYPE);
} }
...@@ -3228,6 +3231,20 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { ...@@ -3228,6 +3231,20 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
printf("failed to read json, disorderRange not found"); printf("failed to read json, disorderRange not found");
goto PARSE_OVER; goto PARSE_OVER;
} }
cJSON* insertRows = cJSON_GetObjectItem(stbInfo, "insert_rows");
if (insertRows && insertRows->type == cJSON_Number) {
g_Dbs.db[i].superTbls[j].insertRows = insertRows->valueint;
//if (0 == g_Dbs.db[i].superTbls[j].insertRows) {
// g_Dbs.db[i].superTbls[j].insertRows = 0x7FFFFFFFFFFFFFFF;
//}
} else if (!insertRows) {
g_Dbs.db[i].superTbls[j].insertRows = 0x7FFFFFFFFFFFFFFF;
} else {
printf("failed to read json, insert_rows not found");
goto PARSE_OVER;
}
if (NO_CREATE_SUBTBL == g_Dbs.db[i].superTbls[j].autoCreateTable if (NO_CREATE_SUBTBL == g_Dbs.db[i].superTbls[j].autoCreateTable
|| (TBL_ALREADY_EXISTS == g_Dbs.db[i].superTbls[j].childTblExists)) { || (TBL_ALREADY_EXISTS == g_Dbs.db[i].superTbls[j].childTblExists)) {
continue; continue;
...@@ -3771,7 +3788,7 @@ static void syncWriteForNumberOfTblInOneSql( ...@@ -3771,7 +3788,7 @@ static void syncWriteForNumberOfTblInOneSql(
int64_t st = 0; int64_t st = 0;
int64_t et = 0; int64_t et = 0;
for (int i = 0; i < g_args.num_of_RPR;) { for (int i = 0; i < superTblInfo->insertRows;) {
int32_t tbl_id = 0; int32_t tbl_id = 0;
for (int tID = winfo->start_table_id; tID <= winfo->end_table_id; ) { for (int tID = winfo->start_table_id; tID <= winfo->end_table_id; ) {
int inserted = i; int inserted = i;
...@@ -3894,7 +3911,7 @@ static void syncWriteForNumberOfTblInOneSql( ...@@ -3894,7 +3911,7 @@ static void syncWriteForNumberOfTblInOneSql(
k++; k++;
totalRowsInserted++; totalRowsInserted++;
if (inserted >= g_args.num_of_RPR || if (inserted >= superTblInfo->insertRows ||
(superTblInfo->maxSqlLen - len) < (superTblInfo->lenOfOneRow + 128)) { (superTblInfo->maxSqlLen - len) < (superTblInfo->lenOfOneRow + 128)) {
tID = tbl_id + 1; tID = tbl_id + 1;
printf("config rowsPerTbl and numberOfTblInOneSql not match with max_sql_lenth, please reconfig![lenOfOneRow:%d]\n", printf("config rowsPerTbl and numberOfTblInOneSql not match with max_sql_lenth, please reconfig![lenOfOneRow:%d]\n",
...@@ -4080,10 +4097,13 @@ static void* syncWrite(void *sarg) { ...@@ -4080,10 +4097,13 @@ static void* syncWrite(void *sarg) {
uint64_t st = 0; uint64_t st = 0;
uint64_t et = 0; uint64_t et = 0;
for (int i = 0; i < g_args.num_of_DPT;) { winfo->totalRowsInserted = 0;
winfo->totalAffectedRows = 0;
for (int tID = winfo->start_table_id; tID <= winfo->end_table_id; tID++) { for (int tID = winfo->start_table_id; tID <= winfo->end_table_id; tID++) {
int inserted = i; for (int i = 0; i < g_args.num_of_DPT;) {
int tblInserted = i;
int64_t tmp_time = time_counter; int64_t tmp_time = time_counter;
char *pstr = buffer; char *pstr = buffer;
...@@ -4112,13 +4132,15 @@ static void* syncWrite(void *sarg) { ...@@ -4112,13 +4132,15 @@ static void* syncWrite(void *sarg) {
} }
pstr += sprintf(pstr, " %s", data); pstr += sprintf(pstr, " %s", data);
inserted++; tblInserted++;
k++; k++;
i++;
if (inserted >= g_args.num_of_DPT) if (tblInserted >= g_args.num_of_DPT)
break; break;
} }
winfo->totalRowsInserted += k;
/* puts(buffer); */ /* puts(buffer); */
int64_t startTs; int64_t startTs;
int64_t endTs; int64_t endTs;
...@@ -4134,9 +4156,10 @@ static void* syncWrite(void *sarg) { ...@@ -4134,9 +4156,10 @@ static void* syncWrite(void *sarg) {
if (g_args.insert_interval) { if (g_args.insert_interval) {
st = taosGetTimestampMs(); st = taosGetTimestampMs();
} }
debugPrint("%s() LN%d %s\n", __func__, __LINE__, buffer); verbosePrint("%s() LN%d %s\n", __func__, __LINE__, buffer);
int affectedRows = queryDbExec(winfo->taos, buffer, 1); int affectedRows = queryDbExec(winfo->taos, buffer, 1);
verbosePrint("%s() LN%d: affectedRows:%d\n", __func__, __LINE__, affectedRows);
if (0 <= affectedRows){ if (0 <= affectedRows){
endTs = taosGetTimestampUs(); endTs = taosGetTimestampUs();
int64_t delay = endTs - startTs; int64_t delay = endTs - startTs;
...@@ -4146,20 +4169,26 @@ static void* syncWrite(void *sarg) { ...@@ -4146,20 +4169,26 @@ static void* syncWrite(void *sarg) {
winfo->minDelay = delay; winfo->minDelay = delay;
winfo->cntDelay++; winfo->cntDelay++;
winfo->totalDelay += delay; winfo->totalDelay += delay;
//winfo->avgDelay = (double)winfo->totalDelay / winfo->cntDelay; winfo->totalAffectedRows += affectedRows;
winfo->avgDelay = (double)winfo->totalDelay / winfo->cntDelay;
} }
verbosePrint("%s() LN%d: totalaffectedRows:%"PRId64"\n", __func__, __LINE__, winfo->totalAffectedRows);
if (g_args.insert_interval) { if (g_args.insert_interval) {
et = taosGetTimestampMs(); et = taosGetTimestampMs();
} }
if (tID == winfo->end_table_id) { if (tblInserted >= g_args.num_of_DPT) {
i = inserted; break;
time_counter = tmp_time;
}
} }
} // num_of_DPT
} // tId
printf("====thread[%d] completed total inserted rows: %"PRId64 ", total affected rows: %"PRId64 "====\n",
winfo->threadID,
winfo->totalRowsInserted,
winfo->totalAffectedRows);
}
return NULL; return NULL;
} }
...@@ -4225,11 +4254,14 @@ static void* syncWriteWithStb(void *sarg) { ...@@ -4225,11 +4254,14 @@ static void* syncWriteWithStb(void *sarg) {
int64_t time_counter = winfo->start_time; int64_t time_counter = winfo->start_time;
uint64_t st = 0; uint64_t st = 0;
uint64_t et = 0; uint64_t et = 0;
for (int i = 0; i < g_args.num_of_RPR;) {
debugPrint("%s() LN%d insertRows=%"PRId64"\n", __func__, __LINE__,
superTblInfo->insertRows);
for (uint32_t tID = winfo->start_table_id; tID <= winfo->end_table_id; for (uint32_t tID = winfo->start_table_id; tID <= winfo->end_table_id;
tID++) { tID++) {
int64_t inserted = 0; for (int i = 0; i < superTblInfo->insertRows;) {
int64_t inserted = i;
uint64_t tmp_time = time_counter; uint64_t tmp_time = time_counter;
if (i > 0 && g_args.insert_interval if (i > 0 && g_args.insert_interval
...@@ -4244,9 +4276,8 @@ static void* syncWriteWithStb(void *sarg) { ...@@ -4244,9 +4276,8 @@ static void* syncWriteWithStb(void *sarg) {
} }
int sampleUsePos = samplePos; int sampleUsePos = samplePos;
int k = 0;
verbosePrint("%s() LN%d num_of_RPR=%d\n", __func__, __LINE__, g_args.num_of_RPR); verbosePrint("%s() LN%d num_of_RPR=%d\n", __func__, __LINE__, g_args.num_of_RPR);
for (k = 0; k < g_args.num_of_RPR;) { for (int k = 0; k < g_args.num_of_RPR;) {
int len = 0; int len = 0;
memset(buffer, 0, superTblInfo->maxSqlLen); memset(buffer, 0, superTblInfo->maxSqlLen);
char *pstr = buffer; char *pstr = buffer;
...@@ -4329,10 +4360,14 @@ static void* syncWriteWithStb(void *sarg) { ...@@ -4329,10 +4360,14 @@ static void* syncWriteWithStb(void *sarg) {
k++; k++;
i++; i++;
totalRowsInserted++; totalRowsInserted++;
debugPrint("%s() LN%d totalInserted=%"PRId64" inserted=%"PRId64"\n", __func__, __LINE__, totalRowsInserted, inserted); verbosePrint("%s() LN%d totalInserted=%"PRId64" inserted=%"PRId64"\n", __func__, __LINE__, totalRowsInserted, inserted);
if (inserted > g_args.num_of_RPR) if (inserted > superTblInfo->insertRows)
break; break;
/* if (inserted >= superTblInfo->insertRows
|| (superTblInfo->maxSqlLen - len) < (superTblInfo->lenOfOneRow + 128))
break;
*/
if (0 == strncasecmp(superTblInfo->insertMode, "taosc", strlen("taosc"))) { if (0 == strncasecmp(superTblInfo->insertMode, "taosc", strlen("taosc"))) {
//printf("===== sql: %s \n\n", buffer); //printf("===== sql: %s \n\n", buffer);
...@@ -4440,6 +4475,7 @@ void callBack(void *param, TAOS_RES *res, int code) { ...@@ -4440,6 +4475,7 @@ void callBack(void *param, TAOS_RES *res, int code) {
char *data = calloc(1, MAX_DATA_SIZE); char *data = calloc(1, MAX_DATA_SIZE);
char *pstr = buffer; char *pstr = buffer;
pstr += sprintf(pstr, "insert into %s.%s%d values", winfo->db_name, winfo->tb_prefix, winfo->start_table_id); pstr += sprintf(pstr, "insert into %s.%s%d values", winfo->db_name, winfo->tb_prefix, winfo->start_table_id);
// if (winfo->counter >= winfo->superTblInfo->insertRows) {
if (winfo->counter >= g_args.num_of_RPR) { if (winfo->counter >= g_args.num_of_RPR) {
winfo->start_table_id++; winfo->start_table_id++;
winfo->counter = 0; winfo->counter = 0;
...@@ -4466,7 +4502,7 @@ void callBack(void *param, TAOS_RES *res, int code) { ...@@ -4466,7 +4502,7 @@ void callBack(void *param, TAOS_RES *res, int code) {
pstr += sprintf(pstr, "%s", data); pstr += sprintf(pstr, "%s", data);
winfo->counter++; winfo->counter++;
if (winfo->counter >= g_args.num_of_RPR) { if (winfo->counter >= winfo->superTblInfo->insertRows) {
break; break;
} }
} }
...@@ -4631,13 +4667,13 @@ void startMultiThreadInsertData(int threads, char* db_name, char* precision, ...@@ -4631,13 +4667,13 @@ void startMultiThreadInsertData(int threads, char* db_name, char* precision,
if (superTblInfo) { if (superTblInfo) {
superTblInfo->totalAffectedRows += t_info->totalAffectedRows; superTblInfo->totalAffectedRows += t_info->totalAffectedRows;
superTblInfo->totalRowsInserted += t_info->totalRowsInserted; superTblInfo->totalRowsInserted += t_info->totalRowsInserted;
}
totalDelay += t_info->totalDelay; totalDelay += t_info->totalDelay;
cntDelay += t_info->cntDelay; cntDelay += t_info->cntDelay;
if (t_info->maxDelay > maxDelay) maxDelay = t_info->maxDelay; if (t_info->maxDelay > maxDelay) maxDelay = t_info->maxDelay;
if (t_info->minDelay < minDelay) minDelay = t_info->minDelay; if (t_info->minDelay < minDelay) minDelay = t_info->minDelay;
} }
}
cntDelay -= 1; cntDelay -= 1;
if (cntDelay == 0) cntDelay = 1; if (cntDelay == 0) cntDelay = 1;
...@@ -4684,7 +4720,13 @@ void *readTable(void *sarg) { ...@@ -4684,7 +4720,13 @@ void *readTable(void *sarg) {
return NULL; return NULL;
} }
int num_of_DPT = g_args.num_of_DPT; int num_of_DPT;
/* if (rinfo->superTblInfo) {
num_of_DPT = rinfo->superTblInfo->insertRows; // nrecords_per_table;
} else {
*/
num_of_DPT = g_args.num_of_DPT;
// }
int num_of_tables = rinfo->end_table_id - rinfo->start_table_id + 1; int num_of_tables = rinfo->end_table_id - rinfo->start_table_id + 1;
int totalData = num_of_DPT * num_of_tables; int totalData = num_of_DPT * num_of_tables;
...@@ -4747,7 +4789,7 @@ void *readMetric(void *sarg) { ...@@ -4747,7 +4789,7 @@ void *readMetric(void *sarg) {
return NULL; return NULL;
} }
int num_of_DPT = g_args.num_of_DPT; int num_of_DPT = rinfo->superTblInfo->insertRows;
int num_of_tables = rinfo->end_table_id - rinfo->start_table_id + 1; int num_of_tables = rinfo->end_table_id - rinfo->start_table_id + 1;
int totalData = num_of_DPT * num_of_tables; int totalData = num_of_DPT * num_of_tables;
bool do_aggreFunc = g_Dbs.do_aggreFunc; bool do_aggreFunc = g_Dbs.do_aggreFunc;
...@@ -4866,7 +4908,7 @@ int insertTestProcess() { ...@@ -4866,7 +4908,7 @@ int insertTestProcess() {
if (g_Dbs.db[i].superTblCount > 0) { if (g_Dbs.db[i].superTblCount > 0) {
for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) { for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) {
SSuperTable* superTblInfo = &g_Dbs.db[i].superTbls[j]; SSuperTable* superTblInfo = &g_Dbs.db[i].superTbls[j];
if (0 == g_args.num_of_DPT) { if (0 == g_Dbs.db[i].superTbls[j].insertRows) {
continue; continue;
} }
startMultiThreadInsertData( startMultiThreadInsertData(
...@@ -5491,6 +5533,7 @@ void setParaFromArg(){ ...@@ -5491,6 +5533,7 @@ void setParaFromArg(){
"2017-07-14 10:40:00.000", MAX_TB_NAME_SIZE); "2017-07-14 10:40:00.000", MAX_TB_NAME_SIZE);
g_Dbs.db[0].superTbls[0].timeStampStep = 10; g_Dbs.db[0].superTbls[0].timeStampStep = 10;
g_Dbs.db[0].superTbls[0].insertRows = g_args.num_of_DPT;
g_Dbs.db[0].superTbls[0].maxSqlLen = TSDB_PAYLOAD_SIZE; g_Dbs.db[0].superTbls[0].maxSqlLen = TSDB_PAYLOAD_SIZE;
g_Dbs.db[0].superTbls[0].columnCount = 0; g_Dbs.db[0].superTbls[0].columnCount = 0;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册