shellCheck.c 5.5 KB
Newer Older
S
TD-3309  
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _GNU_SOURCE
#define _XOPEN_SOURCE
#define _DEFAULT_SOURCE

#include "os.h"
#include "shell.h"
#include "shellCommand.h"
#include "tglobal.h"
#include "tutil.h"

#define SHELL_SQL_LEN 1024
static int32_t tbNum = 0;
static int32_t tbMallocNum = 0;
static char ** tbNames = NULL;
static int32_t checkedNum = 0;
static int32_t errorNum = 0;

typedef struct {
  pthread_t threadID;
  int       threadIndex;
  int       totalThreads;
  void *    taos;
  char *    db;
} ShellThreadObj;

static int32_t shellUseDb(TAOS *con, char *db) {
  if (db == NULL) {
    fprintf(stdout, "no dbname input\n");
    return -1;
  }

  char sql[SHELL_SQL_LEN] = {0};
  snprintf(sql, SHELL_SQL_LEN, "use %s", db);

  TAOS_RES *pSql = taos_query(con, sql);
  int32_t   code = taos_errno(pSql);
  if (code != 0) {
    fprintf(stdout, "failed to execute sql:%s since %s", sql, taos_errstr(pSql));
  }

  taos_free_result(pSql);
  return code;
}

static int32_t shellShowTables(TAOS *con, char *db) {
  char sql[SHELL_SQL_LEN] = {0};
  snprintf(sql, SHELL_SQL_LEN, "show %s.tables", db);

  TAOS_RES *pSql = taos_query(con, sql);
  int32_t   code = taos_errno(pSql);

  if (code != 0) {
    fprintf(stdout, "failed to execute sql:%s since %s\n", sql, taos_errstr(pSql));
  } else {
    TAOS_ROW row;
    while ((row = taos_fetch_row(pSql))) {
      int32_t tbIndex = tbNum++;
      if (tbMallocNum < tbNum) {
        tbMallocNum = (tbMallocNum * 2 + 1);
T
tickduan 已提交
75 76
        char** tbNames1 = realloc(tbNames, tbMallocNum * sizeof(char *));
        if (tbNames1 == NULL) {
S
TD-3309  
Shengliang Guan 已提交
77 78 79 80
          fprintf(stdout, "failed to malloc tablenames, num:%d\n", tbMallocNum);
          code = TSDB_CODE_TSC_OUT_OF_MEMORY;
          break;
        }
T
tickduan 已提交
81
        tbNames = tbNames1;
S
TD-3309  
Shengliang Guan 已提交
82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107
      }

      tbNames[tbIndex] = malloc(TSDB_TABLE_NAME_LEN);
      strncpy(tbNames[tbIndex], (const char *)row[0], TSDB_TABLE_NAME_LEN);
      if (tbIndex % 100000 == 0 && tbIndex != 0) {
        fprintf(stdout, "%d tablenames fetched\n", tbIndex);
      }
    }
  }

  taos_free_result(pSql);

  fprintf(stdout, "total %d tablenames fetched, over\n", tbNum);
  return code;
}

static void shellFreeTbnames() {
  for (int32_t i = 0; i < tbNum; ++i) {
    free(tbNames[i]);
  }
  free(tbNames);
}

static void *shellCheckThreadFp(void *arg) {
  ShellThreadObj *pThread = (ShellThreadObj *)arg;

108 109
  setThreadName("shellCheckThrd");

S
TD-3309  
Shengliang Guan 已提交
110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147
  int32_t interval = tbNum / pThread->totalThreads + 1;
  int32_t start = pThread->threadIndex * interval;
  int32_t end = (pThread->threadIndex + 1) * interval;

  if (end > tbNum) end = tbNum + 1;

  char file[32] = {0};
  snprintf(file, 32, "tb%d.txt", pThread->threadIndex);

  FILE *fp = fopen(file, "w");
  if (!fp) {
    fprintf(stdout, "failed to open %s, reason:%s", file, strerror(errno));
    return NULL;
  }

  char sql[SHELL_SQL_LEN];
  for (int32_t t = start; t < end; ++t) {
    char *tbname = tbNames[t];
    if (tbname == NULL) break;

    snprintf(sql, SHELL_SQL_LEN, "select * from %s limit 1", tbname);

    TAOS_RES *pSql = taos_query(pThread->taos, sql);
    int32_t   code = taos_errno(pSql);
    if (code != 0) {
      int32_t len = snprintf(sql, SHELL_SQL_LEN, "drop table %s.%s;\n", pThread->db, tbname);
      fwrite(sql, 1, len, fp);
      atomic_add_fetch_32(&errorNum, 1);
    }

    int32_t cnum = atomic_add_fetch_32(&checkedNum, 1);
    if (cnum % 5000 == 0 && cnum != 0) {
      fprintf(stdout, "%d tables checked\n", cnum);
    }

    taos_free_result(pSql);
  }

S
TD-4088  
Shengliang Guan 已提交
148
  taosFsync(fileno(fp));
S
TD-3309  
Shengliang Guan 已提交
149 150 151 152 153
  fclose(fp);

  return NULL;
}

154
static void shellRunCheckThreads(TAOS *con, SShellArguments *_args) {
S
TD-3309  
Shengliang Guan 已提交
155
  pthread_attr_t  thattr;
156 157
  ShellThreadObj *threadObj = (ShellThreadObj *)calloc(_args->threadNum, sizeof(ShellThreadObj));
  for (int t = 0; t < _args->threadNum; ++t) {
S
TD-3309  
Shengliang Guan 已提交
158 159
    ShellThreadObj *pThread = threadObj + t;
    pThread->threadIndex = t;
160
    pThread->totalThreads = _args->threadNum;
S
TD-3309  
Shengliang Guan 已提交
161
    pThread->taos = con;
162
    pThread->db = _args->database;
S
TD-3309  
Shengliang Guan 已提交
163 164 165 166 167 168 169 170 171 172

    pthread_attr_init(&thattr);
    pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);

    if (pthread_create(&(pThread->threadID), &thattr, shellCheckThreadFp, (void *)pThread) != 0) {
      fprintf(stderr, "ERROR: thread:%d failed to start\n", pThread->threadIndex);
      exit(0);
    }
  }

173
  for (int t = 0; t < _args->threadNum; ++t) {
S
TD-3309  
Shengliang Guan 已提交
174 175 176
    pthread_join(threadObj[t].threadID, NULL);
  }

177
  for (int t = 0; t < _args->threadNum; ++t) {
S
TD-3309  
Shengliang Guan 已提交
178 179 180 181 182
    taos_close(threadObj[t].taos);
  }
  free(threadObj);
}

183
void shellCheck(TAOS *con, SShellArguments *_args) {
S
TD-3309  
Shengliang Guan 已提交
184 185
  int64_t start = taosGetTimestampMs();

186
  if (shellUseDb(con, _args->database) != 0) {
S
TD-3309  
Shengliang Guan 已提交
187 188 189 190
    shellFreeTbnames();
    return;
  }

191
  if (shellShowTables(con, _args->database) != 0) {
S
TD-3309  
Shengliang Guan 已提交
192 193 194 195
    shellFreeTbnames();
    return;
  }

196 197
  fprintf(stdout, "total %d tables will be checked by %d threads\n", tbNum, _args->threadNum);
  shellRunCheckThreads(con, _args);
S
TD-3309  
Shengliang Guan 已提交
198 199 200 201 202

  int64_t end = taosGetTimestampMs();
  fprintf(stdout, "total %d tables checked, failed:%d, time spent %.2f seconds\n", checkedNum, errorNum,
          (end - start) / 1000.0);
}