shellCheck.c 5.6 KB
Newer Older
S
TD-3309  
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _GNU_SOURCE
#define _XOPEN_SOURCE
#define _DEFAULT_SOURCE

#include "os.h"
#include "shell.h"
#include "shellCommand.h"
#include "tglobal.h"
#include "tutil.h"

// Size in bytes of the stack buffers used to format SQL statements in this file.
#define SHELL_SQL_LEN 1024
// Running count of table names fetched by shellShowTables().
static int32_t tbNum = 0;
// Number of char * slots requested for the tbNames array.
static int32_t tbMallocNum = 0;
// Heap array of heap-allocated table names, indexed 0..tbNum-1.
static char ** tbNames = NULL;
// Tables processed so far; updated with atomic_add_fetch_32() by the check threads.
static int32_t checkedNum = 0;
// Tables whose probe query failed; updated with atomic_add_fetch_32() by the check threads.
static int32_t errorNum = 0;

// Per-thread argument block handed to shellCheckThreadFp().
typedef struct {
  pthread_t threadID;      // filled in by pthread_create(), used for pthread_join()
  int       threadIndex;   // 0-based worker index; selects the table slice and the output file name
  int       totalThreads;  // number of workers; used to compute the slice size
  void *    taos;          // connection handle passed to taos_query()
  char *    db;            // database name used when formatting "drop table" statements
} ShellThreadObj;

/*
 * Select the working database on a connection by executing "use <db>".
 *
 * con: connection handle handed to taos_query().
 * db:  database name; a NULL name is rejected with a message on stdout.
 *
 * Returns 0 on success, -1 when db is NULL, otherwise the value reported by
 * taos_errno() for the failed statement.
 */
static int32_t shellUseDb(TAOS *con, char *db) {
  if (db == NULL) {
    fprintf(stdout, "no dbname input\n");
    return -1;
  }

  char sql[SHELL_SQL_LEN] = {0};
  snprintf(sql, SHELL_SQL_LEN, "use %s", db);

  TAOS_RES *pSql = taos_query(con, sql);
  int32_t   code = taos_errno(pSql);
  if (code != 0) {
    // Terminate the line like the other error messages in this file do.
    fprintf(stdout, "failed to execute sql:%s since %s\n", sql, taos_errstr(pSql));
  }

  taos_free_result(pSql);
  return code;
}

/*
 * Fetch the table names of a database with "show <db>.tables" and append them
 * to the file-scope tbNames array.
 *
 * con: connection handle handed to taos_query().
 * db:  database name inserted into the statement.
 *
 * Each name is copied into its own TSDB_TABLE_NAME_LEN-byte heap buffer.
 * tbNum is advanced only after a name has been stored, so tbNames[0..tbNum-1]
 * are always valid pointers, even when this function stops early.
 *
 * Returns 0 on success, the taos_errno() value when the query fails, or
 * TSDB_CODE_TSC_OUT_OF_MEMORY when an allocation fails.
 */
static int32_t shellShowTables(TAOS *con, char *db) {
  char sql[SHELL_SQL_LEN] = {0};
  snprintf(sql, SHELL_SQL_LEN, "show %s.tables", db);

  TAOS_RES *pSql = taos_query(con, sql);
  int32_t   code = taos_errno(pSql);

  if (code != 0) {
    fprintf(stdout, "failed to execute sql:%s since %s\n", sql, taos_errstr(pSql));
  } else {
    TAOS_ROW row;
    while ((row = taos_fetch_row(pSql))) {
      if (tbNum >= tbMallocNum) {
        // Grow through a temporary so the existing array and its counters
        // stay intact when realloc() fails.
        int32_t newCap = tbMallocNum * 2 + 1;
        char  **grown = realloc(tbNames, newCap * sizeof(char *));
        if (grown == NULL) {
          fprintf(stdout, "failed to malloc tablenames, num:%d\n", newCap);
          code = TSDB_CODE_TSC_OUT_OF_MEMORY;
          break;
        }
        tbNames = grown;
        tbMallocNum = newCap;
      }

      char *name = malloc(TSDB_TABLE_NAME_LEN);
      if (name == NULL) {
        fprintf(stdout, "failed to malloc tablename, index:%d\n", tbNum);
        code = TSDB_CODE_TSC_OUT_OF_MEMORY;
        break;
      }

      // NOTE(review): assumes row[0] is a NUL-terminated string; confirm
      // against the taos_fetch_row()/taos_fetch_lengths() contract.
      // strncpy() does not terminate on truncation, so do it explicitly.
      strncpy(name, (const char *)row[0], TSDB_TABLE_NAME_LEN - 1);
      name[TSDB_TABLE_NAME_LEN - 1] = '\0';

      int32_t tbIndex = tbNum;
      tbNames[tbIndex] = name;
      tbNum = tbIndex + 1;

      if (tbIndex % 100000 == 0 && tbIndex != 0) {
        fprintf(stdout, "%d tablenames fetched\n", tbIndex);
      }
    }
  }

  taos_free_result(pSql);

  fprintf(stdout, "total %d tablenames fetched, over\n", tbNum);
  return code;
}

static void shellFreeTbnames() {
  for (int32_t i = 0; i < tbNum; ++i) {
    free(tbNames[i]);
  }
  free(tbNames);
}

static void *shellCheckThreadFp(void *arg) {
  ShellThreadObj *pThread = (ShellThreadObj *)arg;

108 109
  setThreadName("shellCheckThrd");

S
TD-3309  
Shengliang Guan 已提交
110 111 112 113 114 115 116 117 118
  int32_t interval = tbNum / pThread->totalThreads + 1;
  int32_t start = pThread->threadIndex * interval;
  int32_t end = (pThread->threadIndex + 1) * interval;

  if (end > tbNum) end = tbNum + 1;

  char file[32] = {0};
  snprintf(file, 32, "tb%d.txt", pThread->threadIndex);

119
  TdFilePtr pFile = taosOpenFile(file, TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_TRUNC);
S
TD-3309  
Shengliang Guan 已提交
120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135
  if (!fp) {
    fprintf(stdout, "failed to open %s, reason:%s", file, strerror(errno));
    return NULL;
  }

  char sql[SHELL_SQL_LEN];
  for (int32_t t = start; t < end; ++t) {
    char *tbname = tbNames[t];
    if (tbname == NULL) break;

    snprintf(sql, SHELL_SQL_LEN, "select * from %s limit 1", tbname);

    TAOS_RES *pSql = taos_query(pThread->taos, sql);
    int32_t   code = taos_errno(pSql);
    if (code != 0) {
      int32_t len = snprintf(sql, SHELL_SQL_LEN, "drop table %s.%s;\n", pThread->db, tbname);
136
      taosWriteFile(pFile, sql, len);
S
TD-3309  
Shengliang Guan 已提交
137 138 139 140 141 142 143 144 145 146 147
      atomic_add_fetch_32(&errorNum, 1);
    }

    int32_t cnum = atomic_add_fetch_32(&checkedNum, 1);
    if (cnum % 5000 == 0 && cnum != 0) {
      fprintf(stdout, "%d tables checked\n", cnum);
    }

    taos_free_result(pSql);
  }

148 149
  taosFsync(pFile);
  taosCloseFile(&pFile);
S
TD-3309  
Shengliang Guan 已提交
150 151 152 153

  return NULL;
}

154
/*
 * Start _args->threadNum joinable worker threads running shellCheckThreadFp(),
 * wait for all of them, then close the connection.
 *
 * con:   connection handle shared by every worker.
 * _args: shell arguments; threadNum and database are read.
 *
 * Does nothing when threadNum is not positive. If a thread cannot be created
 * the process exits.
 */
static void shellRunCheckThreads(TAOS *con, SShellArguments *_args) {
  if (_args->threadNum <= 0) {
    return;
  }

  ShellThreadObj *threadObj = (ShellThreadObj *)calloc(_args->threadNum, sizeof(ShellThreadObj));
  if (threadObj == NULL) {
    fprintf(stderr, "ERROR: failed to allocate %d thread objects\n", _args->threadNum);
    return;
  }

  // One attribute object is enough for all workers; it is destroyed once the
  // threads have been created.
  pthread_attr_t thattr;
  pthread_attr_init(&thattr);
  pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);

  for (int t = 0; t < _args->threadNum; ++t) {
    ShellThreadObj *pThread = threadObj + t;
    pThread->threadIndex = t;
    pThread->totalThreads = _args->threadNum;
    pThread->taos = con;
    pThread->db = _args->database;

    if (pthread_create(&(pThread->threadID), &thattr, shellCheckThreadFp, (void *)pThread) != 0) {
      fprintf(stderr, "ERROR: thread:%d failed to start\n", pThread->threadIndex);
      // NOTE(review): exits with status 0 on failure, kept as in the original.
      exit(0);
    }
  }

  pthread_attr_destroy(&thattr);

  for (int t = 0; t < _args->threadNum; ++t) {
    pthread_join(threadObj[t].threadID, NULL);
  }

  // Every worker holds the same handle, so it must be closed exactly once
  // rather than once per thread object.
  // NOTE(review): confirm the caller does not close con again afterwards.
  taos_close(con);

  free(threadObj);
}

183
/*
 * Check every table of the database named in _args->database.
 *
 * con:   connection handle used to select the database, list its tables and
 *        run the per-table probe queries.
 * _args: shell arguments; database and threadNum are read.
 *
 * Selects the database, collects its table names, runs the worker threads and
 * prints a summary with the number of checked and failed tables and the
 * elapsed time. If selecting the database or listing the tables fails, the
 * function returns without running the check. The collected table names are
 * released on every path.
 */
void shellCheck(TAOS *con, SShellArguments *_args) {
  int64_t start = taosGetTimestampMs();

  if (shellUseDb(con, _args->database) == 0 && shellShowTables(con, _args->database) == 0) {
    fprintf(stdout, "total %d tables will be checked by %d threads\n", tbNum, _args->threadNum);
    shellRunCheckThreads(con, _args);

    int64_t end = taosGetTimestampMs();
    fprintf(stdout, "total %d tables checked, failed:%d, time spent %.2f seconds\n", checkedNum, errorNum,
            (end - start) / 1000.0);
  }

  // Release the name list on success as well as on failure, and leave the
  // bookkeeping empty so no stale pointers remain after this call.
  shellFreeTbnames();
  tbNames = NULL;
  tbNum = 0;
  tbMallocNum = 0;
}