未验证 提交 4a3619ec 编写于 作者: H huili 提交者: GitHub

Merge pull request #4196 from taosdata/hotfix/TD-1973

add async write
...@@ -475,6 +475,7 @@ typedef struct { ...@@ -475,6 +475,7 @@ typedef struct {
tsem_t mutex_sem; tsem_t mutex_sem;
int notFinished; int notFinished;
tsem_t lock_sem; tsem_t lock_sem;
int counter;
} info; } info;
typedef struct { typedef struct {
...@@ -766,6 +767,7 @@ int main(int argc, char *argv[]) { ...@@ -766,6 +767,7 @@ int main(int argc, char *argv[]) {
t_info->data_of_rate = rate; t_info->data_of_rate = rate;
t_info->end_table_id = i < b ? last + a : last + a - 1; t_info->end_table_id = i < b ? last + a : last + a - 1;
last = t_info->end_table_id + 1; last = t_info->end_table_id + 1;
t_info->counter = 0;
tsem_init(&(t_info->mutex_sem), 0, 1); tsem_init(&(t_info->mutex_sem), 0, 1);
t_info->notFinished = t_info->end_table_id - t_info->start_table_id + 1; t_info->notFinished = t_info->end_table_id - t_info->start_table_id + 1;
...@@ -788,14 +790,14 @@ int main(int argc, char *argv[]) { ...@@ -788,14 +790,14 @@ int main(int argc, char *argv[]) {
printf("ASYNC Insert with %d connections:\n", threads); printf("ASYNC Insert with %d connections:\n", threads);
} }
fprintf(fp, "|%10.d | %10.2f | %10.2f | %10.4f |\n\n", fprintf(fp, "|%"PRIu64" | %10.2f | %10.2f | %10.4f |\n\n",
ntables * nrecords_per_table, ntables * nrecords_per_table / t, (int64_t)ntables * nrecords_per_table, ntables * nrecords_per_table / t,
(ntables * nrecords_per_table) / (t * nrecords_per_request), ((int64_t)ntables * nrecords_per_table) / (t * nrecords_per_request),
t * 1000); t * 1000);
printf("Spent %.4f seconds to insert %lld records with %d record(s) per request: %.2f records/second\n", printf("Spent %.4f seconds to insert %"PRIu64" records with %d record(s) per request: %.2f records/second\n",
t, (long long int)ntables * nrecords_per_table, nrecords_per_request, t, (int64_t)ntables * nrecords_per_table, nrecords_per_request,
((long long int)ntables * nrecords_per_table) / t); (int64_t)ntables * nrecords_per_table / t);
for (int i = 0; i < threads; i++) { for (int i = 0; i < threads; i++) {
info *t_info = infos + i; info *t_info = infos + i;
...@@ -1283,68 +1285,39 @@ void *syncWrite(void *sarg) { ...@@ -1283,68 +1285,39 @@ void *syncWrite(void *sarg) {
void *asyncWrite(void *sarg) { void *asyncWrite(void *sarg) {
info *winfo = (info *)sarg; info *winfo = (info *)sarg;
taos_query_a(winfo->taos, "show databases", callBack, winfo);
sTable *tb_infos = (sTable *)malloc(sizeof(sTable) * (winfo->end_table_id - winfo->start_table_id + 1));
for (int tID = winfo->start_table_id; tID <= winfo->end_table_id; tID++) {
sTable *tb_info = tb_infos + tID - winfo->start_table_id;
tb_info->data_type = winfo->datatype;
tb_info->ncols_per_record = winfo->ncols_per_record;
tb_info->taos = winfo->taos;
sprintf(tb_info->tb_name, "%s.%s%d", winfo->db_name, winfo->tb_prefix, tID);
tb_info->timestamp = winfo->start_time;
tb_info->counter = 0;
tb_info->target = winfo->nrecords_per_table;
tb_info->len_of_binary = winfo->len_of_binary;
tb_info->nrecords_per_request = winfo->nrecords_per_request;
tb_info->mutex_sem = &(winfo->mutex_sem);
tb_info->notFinished = &(winfo->notFinished);
tb_info->lock_sem = &(winfo->lock_sem);
tb_info->data_of_order = winfo->data_of_order;
tb_info->data_of_rate = winfo->data_of_rate;
/* char buff[BUFFER_SIZE] = "\0"; */
/* sprintf(buff, "insert into %s values (0, 0)", tb_info->tb_name); */
/* queryDB(tb_info->taos,buff); */
taos_query_a(winfo->taos, "show databases", callBack, tb_info);
}
tsem_wait(&(winfo->lock_sem)); tsem_wait(&(winfo->lock_sem));
free(tb_infos);
return NULL; return NULL;
} }
void callBack(void *param, TAOS_RES *res, int code) { void callBack(void *param, TAOS_RES *res, int code) {
sTable *tb_info = (sTable *)param; info* winfo = (info*)param;
char **datatype = tb_info->data_type; char **datatype = winfo->datatype;
int ncols_per_record = tb_info->ncols_per_record; int ncols_per_record = winfo->ncols_per_record;
int len_of_binary = tb_info->len_of_binary; int len_of_binary = winfo->len_of_binary;
int64_t tmp_time = tb_info->timestamp;
if (code < 0) {
fprintf(stderr, "failed to insert data %d:reason; %s\n", code, taos_errstr(res));
exit(EXIT_FAILURE);
}
// If finished; int64_t tmp_time = winfo->start_time;
if (tb_info->counter >= tb_info->target) { char *buffer = calloc(1, BUFFER_SIZE);
tsem_wait(tb_info->mutex_sem); char *data = calloc(1, MAX_DATA_SIZE);
(*(tb_info->notFinished))--; char *pstr = buffer;
if (*(tb_info->notFinished) == 0) tsem_post(tb_info->lock_sem); pstr += sprintf(pstr, "insert into %s.%s%d values", winfo->db_name, winfo->tb_prefix, winfo->start_table_id);
tsem_post(tb_info->mutex_sem); if (winfo->counter >= winfo->nrecords_per_table) {
winfo->start_table_id++;
winfo->counter = 0;
}
if (winfo->start_table_id > winfo->end_table_id) {
tsem_post(&winfo->lock_sem);
free(buffer);
free(data);
taos_free_result(res);
return; return;
} }
char buffer[BUFFER_SIZE] = "\0"; for (int i = 0; i < winfo->nrecords_per_request; i++) {
char data[MAX_DATA_SIZE];
char *pstr = buffer;
pstr += sprintf(pstr, "insert into %s values", tb_info->tb_name);
for (int i = 0; i < tb_info->nrecords_per_request; i++) {
int rand_num = rand() % 100; int rand_num = rand() % 100;
if (tb_info->data_of_order ==1 && rand_num < tb_info->data_of_rate) if (winfo->data_of_order ==1 && rand_num < winfo->data_of_rate)
{ {
int64_t d = tmp_time - rand() % 1000000 + rand_num; int64_t d = tmp_time - rand() % 1000000 + rand_num;
generateData(data, datatype, ncols_per_record, d, len_of_binary); generateData(data, datatype, ncols_per_record, d, len_of_binary);
...@@ -1353,15 +1326,15 @@ void callBack(void *param, TAOS_RES *res, int code) { ...@@ -1353,15 +1326,15 @@ void callBack(void *param, TAOS_RES *res, int code) {
generateData(data, datatype, ncols_per_record, tmp_time += 1000, len_of_binary); generateData(data, datatype, ncols_per_record, tmp_time += 1000, len_of_binary);
} }
pstr += sprintf(pstr, "%s", data); pstr += sprintf(pstr, "%s", data);
tb_info->counter++; winfo->counter++;
if (tb_info->counter >= tb_info->target) { if (winfo->counter >= winfo->nrecords_per_table) {
break; break;
} }
} }
tb_info->timestamp = tmp_time; taos_query_a(winfo->taos, buffer, callBack, winfo);
free(buffer);
taos_query_a(tb_info->taos, buffer, callBack, tb_info); free(data);
taos_free_result(res); taos_free_result(res);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册