
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
#ifdef WEBSOCKET
#include <taosws.h>
#include <shellInt.h>
// Name of the database selected by the most recent `use` command ("" if none).
// Written by shellRunSingleCommandWebsocketImp() and read by
// shell_conn_ws_server(), which re-issues `use <name>;` after connecting.
char curDBName[128] = ""; // original note: TDB_MAX_DBNAME_LEN is 24 (not verified here), so 128 is deliberately oversized

int shell_conn_ws_server(bool first) {
24 25 26 27 28 29
  char cuttedDsn[SHELL_WS_DSN_BUFF] = {0};
  int dsnLen = strlen(shell.args.dsn);
  snprintf(cuttedDsn,
           ((dsnLen-SHELL_WS_DSN_MASK) > SHELL_WS_DSN_BUFF)?
            SHELL_WS_DSN_BUFF:(dsnLen-SHELL_WS_DSN_MASK),
           "%s", shell.args.dsn);
30
  fprintf(stdout, "trying to connect %s****** ", cuttedDsn);
31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53
  fflush(stdout);
  for (int i = 0; i < shell.args.timeout; i++) {
    shell.ws_conn = ws_connect_with_dsn(shell.args.dsn);
    if (NULL == shell.ws_conn) {
      int errNo = ws_errno(NULL);
      if (0xE001 == errNo) {
        fprintf(stdout, ".");
        fflush(stdout);
        taosMsleep(1000);  // sleep 1 second then try again
        continue;
      } else {
        fprintf(stderr, "\nfailed to connect %s***, reason: %s\n",
            cuttedDsn, ws_errstr(NULL));
        return -1;
      }
    } else {
      break;
    }
  }
  if (NULL == shell.ws_conn) {
    fprintf(stdout, "\n timeout\n");
    fprintf(stderr, "\nfailed to connect %s***, reason: %s\n",
      cuttedDsn, ws_errstr(NULL));
wafwerar's avatar
wafwerar 已提交
54
    return -1;
55 56
  } else {
    fprintf(stdout, "\n");
wafwerar's avatar
wafwerar 已提交
57 58
  }
  if (first && shell.args.restful) {
59
    fprintf(stdout, "successfully connected to %s\n\n",
wafwerar's avatar
wafwerar 已提交
60 61
        shell.args.dsn);
  } else if (first && shell.args.cloud) {
62
    fprintf(stdout, "successfully connected to cloud service\n");
wafwerar's avatar
wafwerar 已提交
63
  }
64
  fflush(stdout);
65 66 67 68 69 70 71 72

  // switch to current database if have
  if(curDBName[0] !=0) {
    char command[256];
    sprintf(command, "use %s;", curDBName);
    shellRunSingleCommandWebsocketImp(command);
  }

wafwerar's avatar
wafwerar 已提交
73
  return 0;
Y
Yang Zhao 已提交
74 75
}

/*
 * Print a query result to stdout as a table: a header, then one line per row
 * in which every field is written as " <value> |" and the line ends "\r\n".
 *
 * wres:         result handle the blocks are fetched from.
 * execute_time: accumulator; ws_take_timing(wres)/1E6, taken after the first
 *               fetch, is added to it.
 *
 * Blocks are fetched until one comes back with zero rows or shell.stop_query
 * is set. Returns the number of rows printed (0 when the first block is empty).
 */
static int horizontalPrintWebsocket(WS_RES* wres, double* execute_time) {
  const void* data = NULL;
  // Start from 0, as the vertical and file variants do, so that a fetch which
  // leaves `rows` untouched reads as "no rows" instead of an indeterminate
  // value.
  int rows = 0;
  ws_fetch_block(wres, &data, &rows);
  if (wres) {
    *execute_time += (double)(ws_take_timing(wres)/1E6);
  }
  if (!rows) {
    return 0;
  }
  int num_fields = ws_field_count(wres);
  TAOS_FIELD* fields = (TAOS_FIELD*)ws_fetch_fields(wres);
  int precision = ws_result_precision(wres);

  // NOTE(review): width[] holds TSDB_MAX_COLUMNS entries; this relies on
  // num_fields never exceeding that limit -- confirm against the server side.
  int width[TSDB_MAX_COLUMNS];
  for (int col = 0; col < num_fields; col++) {
    width[col] = shellCalcColWidth(fields + col, precision);
  }

  shellPrintHeader(fields, width, num_fields);

  int numOfRows = 0;
  do {
    uint8_t ty;
    uint32_t len;
    for (int i = 0; i < rows; i++) {
      for (int j = 0; j < num_fields; j++) {
        putchar(' ');
        const void *value = ws_get_value_in_block(wres, i, j, &ty, &len);
        shellPrintField((const char*)value, fields+j, width[j], len, precision);
        putchar(' ');
        putchar('|');
      }
      putchar('\r');
      putchar('\n');
    }
    numOfRows += rows;
    ws_fetch_block(wres, &data, &rows);
  } while (rows && !shell.stop_query);
  return numOfRows;
}

/*
 * Print a query result to stdout in vertical form: each row starts with a
 * "N.row" banner and is followed by one "name: value" line per field, with
 * the names right-aligned to the longest field name.
 *
 * wres:          result handle the blocks are fetched from.
 * pexecute_time: accumulator; ws_take_timing(wres)/1E6, taken after the first
 *                fetch, is added to it.
 *
 * Returns the number of rows printed (0 when the first block is empty).
 */
static int verticalPrintWebsocket(WS_RES* wres, double* pexecute_time) {
  const void* block = NULL;
  int         blockRows = 0;

  ws_fetch_block(wres, &block, &blockRows);
  if (wres != NULL) {
    *pexecute_time += (double)(ws_take_timing(wres)/1E6);
  }
  if (blockRows == 0) {
    return 0;
  }

  int         fieldCount = ws_field_count(wres);
  TAOS_FIELD* fieldList = (TAOS_FIELD*)ws_fetch_fields(wres);
  int         precision = ws_result_precision(wres);

  // find the longest field name so the labels can be right-aligned
  int widestName = 0;
  for (int c = 0; c < fieldCount; c++) {
    int nameLen = (int)strlen(fieldList[c].name);
    widestName = (nameLen > widestName) ? nameLen : widestName;
  }

  int printed = 0;
  for (;;) {
    uint8_t  ty;
    uint32_t len;
    for (int r = 0; r < blockRows; r++) {
      int rowNo = printed + 1;
      printf("*************************** %d.row ***************************\n",
             rowNo);
      for (int c = 0; c < fieldCount; c++) {
        TAOS_FIELD* fld = &fieldList[c];
        int         pad = (int)(widestName - strlen(fld->name));
        // width `pad` with precision 0 emits exactly `pad` spaces
        printf("%*.s%s: ", pad, " ", fld->name);
        const void* cell = ws_get_value_in_block(wres, r, c, &ty, &len);
        shellPrintField((const char*)cell, fld, 0, len, precision);
        putchar('\n');
      }
      printed = rowNo;
    }

    ws_fetch_block(wres, &block, &blockRows);
    if (blockRows == 0 || shell.stop_query) {
      break;
    }
  }
  return printed;
}

/*
 * Write a query result to a file as comma-separated text with "\r\n" line
 * endings: one header line of field names, then one line per row.
 *
 * fname:         target path; passed through taosExpandDir(), and used as
 *                given when that call reports failure.
 * wres:          result handle the blocks are fetched from.
 * pexecute_time: accumulator; ws_take_timing(wres)/1E6, taken after the first
 *                fetch, is added to it.
 *
 * Returns the number of rows written, 0 when the first block is empty, or -1
 * when the file cannot be opened.
 */
static int dumpWebsocketToFile(const char* fname, WS_RES* wres,
                               double* pexecute_time) {
  char path[PATH_MAX] = {0};
  if (0 != taosExpandDir(fname, path, PATH_MAX)) {
    tstrncpy(path, fname, PATH_MAX);
  }

  TdFilePtr out = taosOpenFile(path,
      TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_STREAM);
  if (NULL == out) {
    fprintf(stderr, "failed to open file: %s\r\n", path);
    return -1;
  }

  const void* block = NULL;
  int         blockRows = 0;
  ws_fetch_block(wres, &block, &blockRows);
  if (wres != NULL) {
    *pexecute_time += (double)(ws_take_timing(wres)/1E6);
  }
  if (blockRows == 0) {
    taosCloseFile(&out);
    return 0;
  }

  int         written = 0;
  TAOS_FIELD* cols = (TAOS_FIELD*)ws_fetch_fields(wres);
  int         colCount = ws_field_count(wres);
  int         precision = ws_result_precision(wres);

  // header line: field names separated by commas
  for (int c = 0; c < colCount; c++) {
    if (c != 0) {
      taosFprintfFile(out, ",");
    }
    taosFprintfFile(out, "%s", cols[c].name);
  }
  taosFprintfFile(out, "\r\n");

  for (;;) {
    uint8_t  ty;
    uint32_t len;
    for (int r = 0; r < blockRows; r++) {
      for (int c = 0; c < colCount; c++) {
        if (c != 0) {
          taosFprintfFile(out, ",");
        }
        const void* cell = ws_get_value_in_block(wres, r, c, &ty, &len);
        shellDumpFieldToFile(out, (const char*)cell,
                             cols + c, len, precision);
      }
      taosFprintfFile(out, "\r\n");
    }
    written += blockRows;

    ws_fetch_block(wres, &block, &blockRows);
    if (blockRows == 0 || shell.stop_query) {
      break;
    }
  }

  taosCloseFile(&out);
  return written;
}

/*
 * Send a query result to the requested sink and report the result's error
 * code.
 *
 * wres:          result handle to output.
 * fname:         when non-NULL the rows are written to this file; otherwise
 *                they are printed to stdout.
 * error_no:      receives ws_errno(wres), read after the output is finished.
 * vertical:      selects vertical instead of tabular printing; ignored when
 *                fname is given.
 * pexecute_time: timing accumulator forwarded to the chosen output routine.
 *
 * Returns whatever the chosen output routine returned.
 */
static int shellDumpWebsocket(WS_RES *wres, char *fname,
                              int *error_no, bool vertical,
                              double* pexecute_time) {
  int rowCount;

  if (NULL == fname) {
    rowCount = vertical ? verticalPrintWebsocket(wres, pexecute_time)
                        : horizontalPrintWebsocket(wres, pexecute_time);
  } else {
    rowCount = dumpWebsocketToFile(fname, wres, pexecute_time);
  }

  *error_no = ws_errno(wres);
  return rowCount;
}

void shellRunSingleCommandWebsocketImp(char *command) {
  int64_t st, et;
  char   *sptr = NULL;
  char   *cptr = NULL;
  char   *fname = NULL;
  bool    printMode = false;

  if ((sptr = strstr(command, ">>")) != NULL) {
    cptr = strstr(command, ";");
    if (cptr != NULL) {
      *cptr = '\0';
    }

    fname = sptr + 2;
    while (*fname == ' ') fname++;
    *sptr = '\0';
  }

  if ((sptr = strstr(command, "\\G")) != NULL) {
    cptr = strstr(command, ";");
    if (cptr != NULL) {
      *cptr = '\0';
    }

    *sptr = '\0';
    printMode = true;  // When output to a file, the switch does not work.
  }

  shell.stop_query = false;
260
  WS_RES* res;
Y
Yang Zhao 已提交
261

262 263 264
  for (int reconnectNum = 0; reconnectNum < 2; reconnectNum++) {
    if (!shell.ws_conn && shell_conn_ws_server(0)) {
      return;
wafwerar's avatar
wafwerar 已提交
265
    }
266 267 268 269 270
    st = taosGetTimestampUs();

    res = ws_query_timeout(shell.ws_conn, command, shell.args.timeout);
    int code = ws_errno(res);
    if (code != 0 && !shell.stop_query) {
271 272
      // if it's not a ws connection error
      if (TSDB_CODE_WS_DSN_ERROR != (code&TSDB_CODE_WS_DSN_ERROR)) {
273
        et = taosGetTimestampUs();
274 275
        fprintf(stderr, "\nDB: error: %s (%.6fs)\n",
                ws_errstr(res), (et - st)/1E6);
276 277 278
        ws_free_result(res);
        return;
      }
279 280
      if (code == TSDB_CODE_WS_SEND_TIMEOUT
                || code == TSDB_CODE_WS_RECV_TIMEOUT) {
A
Alex Duan 已提交
281
        fprintf(stderr, "Hint: use -T to increase the timeout in seconds\n");
282 283
      } else if (code == TSDB_CODE_WS_INTERNAL_ERRO
                    || code == TSDB_CODE_WS_CLOSED) {
284 285 286
        shell.ws_conn = NULL;
      }
      ws_free_result(res);
287 288 289
      if (reconnectNum == 0) {
        continue;
      } else {
sangshuduo's avatar
sangshuduo 已提交
290
        fprintf(stderr, "The server is disconnected, will try to reconnect\n");
291
      }
292 293 294
      return;
    }
    break;
Y
Yang Zhao 已提交
295 296
  }

297 298 299 300
  double execute_time = 0;
  if (res) {
    execute_time = ws_take_timing(res)/1E6;
  }
301

302 303
  if (shellRegexMatch(command, "^\\s*use\\s+[a-zA-Z0-9_]+\\s*;\\s*$",
                      REG_EXTENDED | REG_ICASE)) {
304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343

    // copy dbname to curDBName
    char *p         = command;
    bool firstStart = false;
    bool firstEnd   = false;
    int  i          = 0;
    while (*p != 0) {
      if (*p != ' ') {
        // not blank
        if (!firstStart) {
          firstStart = true;
        } else if (firstEnd) {
          if(*p == ';' && *p != '\\') {
            break;
          }
          // database name
          curDBName[i++] = *p;
          if(i + 4 > sizeof(curDBName)) {
            // DBName is too long, reset zero and break
            i = 0;
            break;
          }
        }
      } else {
        // blank
        if(firstStart == true && firstEnd == false){
          firstEnd = true;
        }
        if(firstStart && firstEnd && i > 0){
          // blank after database name
          break;
        }
      }
      // move next
      p++;
    }
    // append end
    curDBName[i] = 0;

    fprintf(stdout, "Database changed to %s.\r\n\r\n", curDBName);
Y
Yang Zhao 已提交
344
    fflush(stdout);
wafwerar's avatar
wafwerar 已提交
345
    ws_free_result(res);
Y
Yang Zhao 已提交
346 347 348 349 350
    return;
  }

  int numOfRows = 0;
  if (ws_is_update_query(res)) {
wafwerar's avatar
wafwerar 已提交
351 352
    numOfRows = ws_affected_rows(res);
    et = taosGetTimestampUs();
353 354
    double total_time = (et - st)/1E3;
    double net_time = total_time - (double)execute_time;
wafwerar's avatar
wafwerar 已提交
355
    printf("Query Ok, %d of %d row(s) in database\n", numOfRows, numOfRows);
356 357
    printf("Execute: %.2f ms Network: %.2f ms Total: %.2f ms\n",
           execute_time, net_time, total_time);
Y
Yang Zhao 已提交
358
  } else {
wafwerar's avatar
wafwerar 已提交
359
    int error_no = 0;
360 361
    numOfRows  = shellDumpWebsocket(res, fname, &error_no,
                                    printMode, &execute_time);
wafwerar's avatar
wafwerar 已提交
362 363 364 365 366 367 368 369 370
    if (numOfRows < 0) {
      ws_free_result(res);
      return;
    }
    et = taosGetTimestampUs();
    double total_time = (et - st) / 1E3;
    double net_time = total_time - execute_time;
    if (error_no == 0 && !shell.stop_query) {
      printf("Query OK, %d row(s) in set\n", numOfRows);
371 372
      printf("Execute: %.2f ms Network: %.2f ms Total: %.2f ms\n",
             execute_time, net_time, total_time);
wafwerar's avatar
wafwerar 已提交
373 374 375 376
    } else {
      printf("Query interrupted, %d row(s) in set (%.6fs)\n", numOfRows,
          (et - st)/1E6);
    }
Y
Yang Zhao 已提交
377 378 379 380 381
  }
  printf("\n");
  ws_free_result(res);
}
#endif