shellWebsocket.c 9.8 KB
Newer Older
Y
Yang Zhao 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16

/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
#ifdef WEBSOCKET
17 18
#include <taosws.h>
#include <shellInt.h>
Y
Yang Zhao 已提交
19 20

int shell_conn_ws_server(bool first) {
21 22 23 24 25 26
  char cuttedDsn[SHELL_WS_DSN_BUFF] = {0};
  int dsnLen = strlen(shell.args.dsn);
  snprintf(cuttedDsn,
           ((dsnLen-SHELL_WS_DSN_MASK) > SHELL_WS_DSN_BUFF)?
            SHELL_WS_DSN_BUFF:(dsnLen-SHELL_WS_DSN_MASK),
           "%s", shell.args.dsn);
27
  fprintf(stdout, "trying to connect %s****** ", cuttedDsn);
28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50
  fflush(stdout);
  for (int i = 0; i < shell.args.timeout; i++) {
    shell.ws_conn = ws_connect_with_dsn(shell.args.dsn);
    if (NULL == shell.ws_conn) {
      int errNo = ws_errno(NULL);
      if (0xE001 == errNo) {
        fprintf(stdout, ".");
        fflush(stdout);
        taosMsleep(1000);  // sleep 1 second then try again
        continue;
      } else {
        fprintf(stderr, "\nfailed to connect %s***, reason: %s\n",
            cuttedDsn, ws_errstr(NULL));
        return -1;
      }
    } else {
      break;
    }
  }
  if (NULL == shell.ws_conn) {
    fprintf(stdout, "\n timeout\n");
    fprintf(stderr, "\nfailed to connect %s***, reason: %s\n",
      cuttedDsn, ws_errstr(NULL));
wafwerar's avatar
wafwerar 已提交
51
    return -1;
52 53
  } else {
    fprintf(stdout, "\n");
wafwerar's avatar
wafwerar 已提交
54 55
  }
  if (first && shell.args.restful) {
56
    fprintf(stdout, "successfully connected to %s\n\n",
wafwerar's avatar
wafwerar 已提交
57 58
        shell.args.dsn);
  } else if (first && shell.args.cloud) {
59
    fprintf(stdout, "successfully connected to cloud service\n");
wafwerar's avatar
wafwerar 已提交
60
  }
61
  fflush(stdout);
wafwerar's avatar
wafwerar 已提交
62
  return 0;
Y
Yang Zhao 已提交
63 64
}

65
static int horizontalPrintWebsocket(WS_RES* wres, double* execute_time) {
Y
Yang Zhao 已提交
66 67 68
  const void* data = NULL;
  int rows;
  ws_fetch_block(wres, &data, &rows);
69 70 71
  if (wres) {
    *execute_time += (double)(ws_take_timing(wres)/1E6);
  }
Y
Yang Zhao 已提交
72
  if (!rows) {
wafwerar's avatar
wafwerar 已提交
73
    return 0;
Y
Yang Zhao 已提交
74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97
  }
  int num_fields = ws_field_count(wres);
  TAOS_FIELD* fields = (TAOS_FIELD*)ws_fetch_fields(wres);
  int precision = ws_result_precision(wres);

  int width[TSDB_MAX_COLUMNS];
  for (int col = 0; col < num_fields; col++) {
    width[col] = shellCalcColWidth(fields + col, precision);
  }

  shellPrintHeader(fields, width, num_fields);

  int numOfRows = 0;
  do {
    uint8_t ty;
    uint32_t len;
    for (int i = 0; i < rows; i++) {
      for (int j = 0; j < num_fields; j++) {
        putchar(' ');
        const void *value = ws_get_value_in_block(wres, i, j, &ty, &len);
        shellPrintField((const char*)value, fields+j, width[j], len, precision);
        putchar(' ');
        putchar('|');
      }
wafwerar's avatar
wafwerar 已提交
98
      putchar('\r');
Y
Yang Zhao 已提交
99 100 101 102 103 104 105 106
      putchar('\n');
    }
    numOfRows += rows;
    ws_fetch_block(wres, &data, &rows);
  } while (rows && !shell.stop_query);
  return numOfRows;
}

107
static int verticalPrintWebsocket(WS_RES* wres, double* pexecute_time) {
Y
Yang Zhao 已提交
108 109 110
  int rows = 0;
  const void* data = NULL;
  ws_fetch_block(wres, &data, &rows);
111 112 113
  if (wres) {
    *pexecute_time += (double)(ws_take_timing(wres)/1E6);
  }
Y
Yang Zhao 已提交
114
  if (!rows) {
wafwerar's avatar
wafwerar 已提交
115
    return 0;
Y
Yang Zhao 已提交
116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133
  }
  int num_fields = ws_field_count(wres);
  TAOS_FIELD* fields = (TAOS_FIELD*)ws_fetch_fields(wres);
  int precision = ws_result_precision(wres);

  int maxColNameLen = 0;
  for (int col = 0; col < num_fields; col++) {
    int len = (int)strlen(fields[col].name);
    if (len > maxColNameLen) {
      maxColNameLen = len;
    }
  }
  int numOfRows = 0;
  do {
    uint8_t ty;
    uint32_t len;
    for (int i = 0; i < rows; i++) {
      printf("*************************** %d.row ***************************\n",
wafwerar's avatar
wafwerar 已提交
134
        numOfRows + 1);
Y
Yang Zhao 已提交
135 136 137 138 139 140 141 142 143 144 145 146 147 148 149
      for (int j = 0; j < num_fields; j++) {
        TAOS_FIELD* field = fields + j;
        int padding = (int)(maxColNameLen - strlen(field->name));
        printf("%*.s%s: ", padding, " ", field->name);
        const void *value = ws_get_value_in_block(wres, i, j, &ty, &len);
        shellPrintField((const char*)value, field, 0, len, precision);
        putchar('\n');
      }
      numOfRows++;
    }
    ws_fetch_block(wres, &data, &rows);
  } while (rows && !shell.stop_query);
  return numOfRows;
}

150 151
static int dumpWebsocketToFile(const char* fname, WS_RES* wres,
                               double* pexecute_time) {
Y
Yang Zhao 已提交
152 153 154 155 156 157
  char fullname[PATH_MAX] = {0};
  if (taosExpandDir(fname, fullname, PATH_MAX) != 0) {
    tstrncpy(fullname, fname, PATH_MAX);
  }

  TdFilePtr pFile = taosOpenFile(fullname,
wafwerar's avatar
wafwerar 已提交
158
      TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_STREAM);
Y
Yang Zhao 已提交
159 160 161 162 163 164 165
  if (pFile == NULL) {
    fprintf(stderr, "failed to open file: %s\r\n", fullname);
    return -1;
  }
  int rows = 0;
  const void* data = NULL;
  ws_fetch_block(wres, &data, &rows);
166 167 168
  if (wres) {
    *pexecute_time += (double)(ws_take_timing(wres)/1E6);
  }
Y
Yang Zhao 已提交
169 170
  if (!rows) {
    taosCloseFile(&pFile);
wafwerar's avatar
wafwerar 已提交
171
    return 0;
Y
Yang Zhao 已提交
172 173 174 175 176 177 178 179 180 181 182
  }
  int numOfRows = 0;
  TAOS_FIELD* fields = (TAOS_FIELD*)ws_fetch_fields(wres);
  int num_fields = ws_field_count(wres);
  int precision = ws_result_precision(wres);
  for (int col = 0; col < num_fields; col++) {
    if (col > 0) {
      taosFprintfFile(pFile, ",");
    }
    taosFprintfFile(pFile, "%s", fields[col].name);
  }
183
  taosFprintfFile(pFile, "\r\n");
Y
Yang Zhao 已提交
184 185 186 187 188 189 190 191 192 193
  do {
    uint8_t ty;
    uint32_t len;
    numOfRows += rows;
    for (int i = 0; i < rows; i++) {
      for (int j = 0; j < num_fields; j++) {
        if (j > 0) {
          taosFprintfFile(pFile, ",");
        }
        const void *value = ws_get_value_in_block(wres, i, j, &ty, &len);
194 195
        shellDumpFieldToFile(pFile, (const char*)value,
                             fields + j, len, precision);
Y
Yang Zhao 已提交
196 197 198 199 200 201 202 203 204
      }
      taosFprintfFile(pFile, "\r\n");
    }
    ws_fetch_block(wres, &data, &rows);
  } while (rows && !shell.stop_query);
  taosCloseFile(&pFile);
  return numOfRows;
}

205 206 207
static int shellDumpWebsocket(WS_RES *wres, char *fname,
                              int *error_no, bool vertical,
                              double* pexecute_time) {
Y
Yang Zhao 已提交
208 209
  int numOfRows = 0;
  if (fname != NULL) {
210
    numOfRows = dumpWebsocketToFile(fname, wres, pexecute_time);
Y
Yang Zhao 已提交
211
  } else if (vertical) {
212
    numOfRows = verticalPrintWebsocket(wres, pexecute_time);
Y
Yang Zhao 已提交
213
  } else {
214
    numOfRows = horizontalPrintWebsocket(wres, pexecute_time);
Y
Yang Zhao 已提交
215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248
  }
  *error_no = ws_errno(wres);
  return numOfRows;
}

void shellRunSingleCommandWebsocketImp(char *command) {
  int64_t st, et;
  char   *sptr = NULL;
  char   *cptr = NULL;
  char   *fname = NULL;
  bool    printMode = false;

  if ((sptr = strstr(command, ">>")) != NULL) {
    cptr = strstr(command, ";");
    if (cptr != NULL) {
      *cptr = '\0';
    }

    fname = sptr + 2;
    while (*fname == ' ') fname++;
    *sptr = '\0';
  }

  if ((sptr = strstr(command, "\\G")) != NULL) {
    cptr = strstr(command, ";");
    if (cptr != NULL) {
      *cptr = '\0';
    }

    *sptr = '\0';
    printMode = true;  // When output to a file, the switch does not work.
  }

  shell.stop_query = false;
249
  WS_RES* res;
Y
Yang Zhao 已提交
250

251 252 253
  for (int reconnectNum = 0; reconnectNum < 2; reconnectNum++) {
    if (!shell.ws_conn && shell_conn_ws_server(0)) {
      return;
wafwerar's avatar
wafwerar 已提交
254
    }
255 256 257 258 259
    st = taosGetTimestampUs();

    res = ws_query_timeout(shell.ws_conn, command, shell.args.timeout);
    int code = ws_errno(res);
    if (code != 0 && !shell.stop_query) {
260 261
      // if it's not a ws connection error
      if (TSDB_CODE_WS_DSN_ERROR != (code&TSDB_CODE_WS_DSN_ERROR)) {
262
        et = taosGetTimestampUs();
263 264
        fprintf(stderr, "\nDB: error: %s (%.6fs)\n",
                ws_errstr(res), (et - st)/1E6);
265 266 267
        ws_free_result(res);
        return;
      }
268 269
      if (code == TSDB_CODE_WS_SEND_TIMEOUT
                || code == TSDB_CODE_WS_RECV_TIMEOUT) {
270
        fprintf(stderr, "Hint: use -t to increase the timeout in seconds\n");
271 272
      } else if (code == TSDB_CODE_WS_INTERNAL_ERRO
                    || code == TSDB_CODE_WS_CLOSED) {
273 274 275
        shell.ws_conn = NULL;
      }
      ws_free_result(res);
276 277 278
      if (reconnectNum == 0) {
        continue;
      } else {
sangshuduo's avatar
sangshuduo 已提交
279
        fprintf(stderr, "The server is disconnected, will try to reconnect\n");
280
      }
281 282 283
      return;
    }
    break;
Y
Yang Zhao 已提交
284 285
  }

286 287 288 289
  double execute_time = 0;
  if (res) {
    execute_time = ws_take_timing(res)/1E6;
  }
290

291 292
  if (shellRegexMatch(command, "^\\s*use\\s+[a-zA-Z0-9_]+\\s*;\\s*$",
                      REG_EXTENDED | REG_ICASE)) {
Y
Yang Zhao 已提交
293 294
    fprintf(stdout, "Database changed.\r\n\r\n");
    fflush(stdout);
wafwerar's avatar
wafwerar 已提交
295
    ws_free_result(res);
Y
Yang Zhao 已提交
296 297 298 299 300
    return;
  }

  int numOfRows = 0;
  if (ws_is_update_query(res)) {
wafwerar's avatar
wafwerar 已提交
301 302
    numOfRows = ws_affected_rows(res);
    et = taosGetTimestampUs();
303 304
    double total_time = (et - st)/1E3;
    double net_time = total_time - (double)execute_time;
wafwerar's avatar
wafwerar 已提交
305
    printf("Query Ok, %d of %d row(s) in database\n", numOfRows, numOfRows);
306 307
    printf("Execute: %.2f ms Network: %.2f ms Total: %.2f ms\n",
           execute_time, net_time, total_time);
Y
Yang Zhao 已提交
308
  } else {
wafwerar's avatar
wafwerar 已提交
309
    int error_no = 0;
310 311
    numOfRows  = shellDumpWebsocket(res, fname, &error_no,
                                    printMode, &execute_time);
wafwerar's avatar
wafwerar 已提交
312 313 314 315 316 317 318 319 320
    if (numOfRows < 0) {
      ws_free_result(res);
      return;
    }
    et = taosGetTimestampUs();
    double total_time = (et - st) / 1E3;
    double net_time = total_time - execute_time;
    if (error_no == 0 && !shell.stop_query) {
      printf("Query OK, %d row(s) in set\n", numOfRows);
321 322
      printf("Execute: %.2f ms Network: %.2f ms Total: %.2f ms\n",
             execute_time, net_time, total_time);
wafwerar's avatar
wafwerar 已提交
323 324 325
    } else {
      printf("Query interrupted, %d row(s) in set (%.6fs)\n", numOfRows,
          (et - st)/1E6);
326 327
      printf("Execute: %.2f ms Network: %.2f ms Total: %.2f ms\n",
             execute_time, net_time, total_time);
wafwerar's avatar
wafwerar 已提交
328
    }
Y
Yang Zhao 已提交
329 330 331 332 333
  }
  printf("\n");
  ws_free_result(res);
}
#endif