asyncdemo.c 7.5 KB
Newer Older
H
hzcheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

// TAOS asynchronous API example
// this example opens multiple tables, insert/retrieve multiple tables
// it is used by TAOS internally for one performance testing
// to compile: gcc -o asyncdemo asyncdemo.c -ltaos

#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/time.h>
#include <unistd.h>

#include <taos.h>
H
hzcheng 已提交
28 29 30

// Rows to insert into each table; main overwrites it from argv[3].
int points = 5;
// Number of tables to create; main overwrites it from argv[4].
int numOfTables = 3;
// Tables whose chain of inserts has finished (incremented in taos_insert_call_back).
int tablesInsertProcessed = 0;
// Tables whose select has finished (incremented in taos_retrieve_call_back).
int tablesSelectProcessed = 0;
// Start and end of the phase currently being timed, in microseconds
// (tv_sec * 1000000 + tv_usec from gettimeofday).
int64_t st;
int64_t et;

typedef struct {
Z
zhaoyanggh 已提交
36 37 38 39 40 41 42 43
  int    id;
  TAOS * taos;
  char   name[16];
  time_t timeStamp;
  int    value;
  int    rowsInserted;
  int    rowsTried;
  int    rowsRetrieved;
H
hzcheng 已提交
44 45 46 47 48 49
} STable;

void taos_insert_call_back(void *param, TAOS_RES *tres, int code);
void taos_select_call_back(void *param, TAOS_RES *tres, int code);
void taos_error(TAOS *taos);

50
// Runs `command` synchronously on `taos`, making up to five attempts.
// Returns normally once an attempt reports error code 0. If every attempt
// fails, prints the reason to stderr, closes the connection, cleans up the
// client library and terminates the process with EXIT_FAILURE.
static void queryDB(TAOS *taos, char *command) {
  TAOS_RES *result = NULL;
  int32_t   status = -1;
  int       attemptsLeft = 5;

  do {
    // release the result of the previous failed attempt before retrying
    if (result != NULL) {
      taos_free_result(result);
      result = NULL;
    }

    result = taos_query(taos, command);
    status = taos_errno(result);
  } while (status != 0 && --attemptsLeft > 0);

  if (status == 0) {
    taos_free_result(result);
    return;
  }

  fprintf(stderr, "Failed to run %s, reason: %s\n", command, taos_errstr(result));
  taos_free_result(result);
  taos_close(taos);
  taos_cleanup();
  exit(EXIT_FAILURE);
}

Z
zhaoyanggh 已提交
79 80 81 82 83 84 85 86
int main(int argc, char *argv[]) {
  TAOS *         taos;
  struct timeval systemTime;
  int            i;
  char           sql[1024] = {0};
  char           prefix[20] = {0};
  char           db[128] = {0};
  STable *       tableList;
H
hzcheng 已提交
87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102

  if (argc != 5) {
    printf("usage: %s server-ip dbname rowsPerTable numOfTables\n", argv[0]);
    exit(0);
  }

  // a simple way to parse input parameters
  if (argc >= 3) strcpy(db, argv[2]);
  if (argc >= 4) points = atoi(argv[3]);
  if (argc >= 5) numOfTables = atoi(argv[4]);

  size_t size = sizeof(STable) * (size_t)numOfTables;
  tableList = (STable *)malloc(size);
  memset(tableList, 0, size);

  taos = taos_connect(argv[1], "root", "taosdata", NULL, 0);
Z
zhaoyanggh 已提交
103
  if (taos == NULL) taos_error(taos);
H
hzcheng 已提交
104 105 106

  printf("success to connect to server\n");

107 108
  sprintf(sql, "drop database if exists %s", db);
  queryDB(taos, sql);
H
hzcheng 已提交
109 110

  sprintf(sql, "create database %s", db);
111
  queryDB(taos, sql);
H
hzcheng 已提交
112 113

  sprintf(sql, "use %s", db);
114
  queryDB(taos, sql);
H
hzcheng 已提交
115 116 117 118 119 120 121

  strcpy(prefix, "asytbl_");
  for (i = 0; i < numOfTables; ++i) {
    tableList[i].id = i;
    tableList[i].taos = taos;
    sprintf(tableList[i].name, "%s%d", prefix, i);
    sprintf(sql, "create table %s%d (ts timestamp, volume bigint)", prefix, i);
122
    queryDB(taos, sql);
Z
zhaoyanggh 已提交
123
  }
H
hzcheng 已提交
124 125 126 127 128 129 130 131 132 133

  gettimeofday(&systemTime, NULL);
  for (i = 0; i < numOfTables; ++i)
    tableList[i].timeStamp = (time_t)(systemTime.tv_sec) * 1000 + systemTime.tv_usec / 1000;

  printf("success to create tables, press any key to insert\n");
  getchar();

  printf("start to insert...\n");
  gettimeofday(&systemTime, NULL);
H
huili 已提交
134
  st = systemTime.tv_sec * 1000000 + systemTime.tv_usec;
H
hzcheng 已提交
135

D
dapan1121 已提交
136 137 138
  tablesInsertProcessed = 0;
  tablesSelectProcessed = 0;

Z
zhaoyanggh 已提交
139
  for (i = 0; i < numOfTables; ++i) {
H
hzcheng 已提交
140
    // insert records in asynchronous API
H
huili 已提交
141
    sprintf(sql, "insert into %s values(%ld, 0)", tableList[i].name, 1546300800000 + i);
H
hzcheng 已提交
142 143 144 145 146 147
    taos_query_a(taos, sql, taos_insert_call_back, (void *)(tableList + i));
  }

  printf("once insert finished, presse any key to query\n");
  getchar();

Z
zhaoyanggh 已提交
148
  while (1) {
D
dapan1121 已提交
149
    if (tablesInsertProcessed < numOfTables) {
Z
zhaoyanggh 已提交
150 151 152 153
      printf("wait for process finished\n");
      sleep(1);
      continue;
    }
D
dapan1121 已提交
154 155 156 157

    break;
  }

H
hzcheng 已提交
158 159 160
  printf("start to query...\n");
  gettimeofday(&systemTime, NULL);
  st = systemTime.tv_sec * 1000000 + systemTime.tv_usec;
D
dapan1121 已提交
161

H
hzcheng 已提交
162
  for (i = 0; i < numOfTables; ++i) {
Z
zhaoyanggh 已提交
163
    // select records in asynchronous API
H
hzcheng 已提交
164 165 166 167 168 169 170
    sprintf(sql, "select * from %s", tableList[i].name);
    taos_query_a(taos, sql, taos_select_call_back, (void *)(tableList + i));
  }

  printf("\nonce finished, press any key to exit\n");
  getchar();

Z
zhaoyanggh 已提交
171
  while (1) {
D
dapan1121 已提交
172
    if (tablesSelectProcessed < numOfTables) {
Z
zhaoyanggh 已提交
173 174 175 176
      printf("wait for process finished\n");
      sleep(1);
      continue;
    }
P
Ping Xiao 已提交
177 178 179 180

    break;
  }

Z
zhaoyanggh 已提交
181
  for (i = 0; i < numOfTables; ++i) {
D
dapan1121 已提交
182 183 184
    printf("%s inserted:%d retrieved:%d\n", tableList[i].name, tableList[i].rowsInserted, tableList[i].rowsRetrieved);
  }

H
hzcheng 已提交
185 186 187 188 189 190 191 192
  taos_close(taos);
  free(tableList);

  printf("==== async demo end====\n");
  printf("\n");
  return 0;
}

Z
zhaoyanggh 已提交
193
// Prints the error string taos_errstr() returns for `con` to stderr, closes
// the connection, cleans up the client library and terminates the process
// with exit status 1. Never returns.
void taos_error(TAOS *con) {
  const char *reason = taos_errstr(con);

  fprintf(stderr, "TDengine error: %s\n", reason);

  taos_close(con);
  taos_cleanup();
  exit(1);
}

Z
zhaoyanggh 已提交
200 201 202 203
// Completion callback for one asynchronous insert.
//
// param: the STable entry the insert belongs to.
// tres:  result handle of the finished insert; released before returning.
// code:  completion status; a negative value is reported as a failure.
//
// Counts the attempt, then either issues the next insert for the same table
// or, once `points` attempts have been made, marks the table as finished and
// prints the elapsed time when it was the last table.
void taos_insert_call_back(void *param, TAOS_RES *tres, int code) {
  STable *pTable = (STable *)param;

  pTable->rowsTried++;

  if (code < 0) {
    printf("%s insert failed, code:%d, rows:%d\n", pTable->name, code, pTable->rowsTried);
  } else {
    // A non-negative code means the statement completed; ask the result how
    // many rows it actually wrote rather than inferring that from `code`,
    // which is 0 on success in the same way taos_select_call_back expects.
    int affected = taos_affected_rows(tres);
    if (affected > 0) {
      pTable->rowsInserted += affected;
    } else {
      printf("%s not inserted\n", pTable->name);
    }
  }

  if (pTable->rowsTried < points) {
    // for this demo, insert another record, spaced 1000 ms after the previous one
    char sql[128];
    snprintf(sql, sizeof(sql), "insert into %s values(%lld, %d)", pTable->name,
             1546300800000LL + (long long)pTable->rowsTried * 1000, pTable->rowsTried);
    taos_query_a(pTable->taos, sql, taos_insert_call_back, (void *)pTable);
  } else {
    printf("%d rows data are inserted into %s\n", points, pTable->name);
    tablesInsertProcessed++;
    if (tablesInsertProcessed >= numOfTables) {
      struct timeval systemTime;
      gettimeofday(&systemTime, NULL);
      // widen before multiplying so the microsecond value cannot overflow time_t
      et = (int64_t)systemTime.tv_sec * 1000000 + systemTime.tv_usec;
      printf("%lld mseconds to insert %lld data points\n", (long long)((et - st) / 1000),
             (long long)points * numOfTables);
    }
  }

  taos_free_result(tres);
}

Z
zhaoyanggh 已提交
233 234
// Completion callback for one asynchronous batch fetch.
//
// param:     the STable entry the query belongs to.
// tres:      result handle of the running query.
// numOfRows: rows available in this batch; 0 ends the result set and a
//            negative value is reported as a failure.
//
// While batches keep arriving, consumes the rows and requests the next batch.
// Otherwise marks the table as finished, prints the elapsed time when it was
// the last table, and releases the result handle.
void taos_retrieve_call_back(void *param, TAOS_RES *tres, int numOfRows) {
  STable *pTable = (STable *)param;

  if (numOfRows > 0) {
    for (int i = 0; i < numOfRows; ++i) {
      // synchronous API to retrieve a row from the batch; the demo discards it
      (void)taos_fetch_row(tres);
    }

    pTable->rowsRetrieved += numOfRows;

    // retrieve next batch of rows
    taos_fetch_rows_a(tres, taos_retrieve_call_back, pTable);
    return;
  }

  if (numOfRows < 0) {
    printf("%s retrieve failed, code:%d\n", pTable->name, numOfRows);
  }

  printf("%d rows data retrieved from %s\n", pTable->rowsRetrieved, pTable->name);

  tablesSelectProcessed++;
  if (tablesSelectProcessed >= numOfTables) {
    struct timeval systemTime;
    gettimeofday(&systemTime, NULL);
    // widen before multiplying so the microsecond value cannot overflow time_t
    et = (int64_t)systemTime.tv_sec * 1000000 + systemTime.tv_usec;
    printf("%lld mseconds to query %lld data rows\n", (long long)((et - st) / 1000),
           (long long)points * numOfTables);
  }

  taos_free_result(tres);
}

Z
zhaoyanggh 已提交
266
// Completion callback for an asynchronous select.
//
// param: the STable entry the query belongs to.
// tres:  result handle of the query.
// code:  completion status; anything other than 0 is treated as a failure.
//
// On failure (non-zero code or a NULL handle) prints a message, releases the
// handle, cleans up the client library and exits with status 1. Otherwise
// starts fetching the first batch of rows asynchronously.
void taos_select_call_back(void *param, TAOS_RES *tres, int code) {
  STable *table = (STable *)param;

  if (code != 0 || !tres) {
    printf("%s select failed, code:%d\n", table->name, code);
    taos_free_result(tres);
    taos_cleanup();
    exit(1);
  }

  // asynchronous API to fetch a batch of records
  taos_fetch_rows_a(tres, taos_retrieve_call_back, table);
}