shellCheck.c 5.7 KB
Newer Older
S
TD-3309  
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _GNU_SOURCE
#define _XOPEN_SOURCE
#define _DEFAULT_SOURCE

#include "os.h"
#include "shell.h"
#include "shellCommand.h"
#include "tglobal.h"
#include "tutil.h"

#define SHELL_SQL_LEN 1024
static int32_t tbNum = 0;
static int32_t tbMallocNum = 0;
static char ** tbNames = NULL;
static int32_t checkedNum = 0;
static int32_t errorNum = 0;

typedef struct {
wafwerar's avatar
wafwerar 已提交
34
  TdThread threadID;
S
TD-3309  
Shengliang Guan 已提交
35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74
  int       threadIndex;
  int       totalThreads;
  void *    taos;
  char *    db;
} ShellThreadObj;

static int32_t shellUseDb(TAOS *con, char *db) {
  if (db == NULL) {
    fprintf(stdout, "no dbname input\n");
    return -1;
  }

  char sql[SHELL_SQL_LEN] = {0};
  snprintf(sql, SHELL_SQL_LEN, "use %s", db);

  TAOS_RES *pSql = taos_query(con, sql);
  int32_t   code = taos_errno(pSql);
  if (code != 0) {
    fprintf(stdout, "failed to execute sql:%s since %s", sql, taos_errstr(pSql));
  }

  taos_free_result(pSql);
  return code;
}

static int32_t shellShowTables(TAOS *con, char *db) {
  char sql[SHELL_SQL_LEN] = {0};
  snprintf(sql, SHELL_SQL_LEN, "show %s.tables", db);

  TAOS_RES *pSql = taos_query(con, sql);
  int32_t   code = taos_errno(pSql);

  if (code != 0) {
    fprintf(stdout, "failed to execute sql:%s since %s\n", sql, taos_errstr(pSql));
  } else {
    TAOS_ROW row;
    while ((row = taos_fetch_row(pSql))) {
      int32_t tbIndex = tbNum++;
      if (tbMallocNum < tbNum) {
        tbMallocNum = (tbMallocNum * 2 + 1);
wafwerar's avatar
wafwerar 已提交
75
        char** tbNames1 = taosMemoryRealloc(tbNames, tbMallocNum * sizeof(char *));
T
tickduan 已提交
76
        if (tbNames1 == NULL) {
S
TD-3309  
Shengliang Guan 已提交
77 78 79 80
          fprintf(stdout, "failed to malloc tablenames, num:%d\n", tbMallocNum);
          code = TSDB_CODE_TSC_OUT_OF_MEMORY;
          break;
        }
T
tickduan 已提交
81
        tbNames = tbNames1;
S
TD-3309  
Shengliang Guan 已提交
82 83
      }

wafwerar's avatar
wafwerar 已提交
84
      tbNames[tbIndex] = taosMemoryMalloc(TSDB_TABLE_NAME_LEN);
S
TD-3309  
Shengliang Guan 已提交
85 86 87 88 89 90 91 92 93 94 95 96 97 98 99
      strncpy(tbNames[tbIndex], (const char *)row[0], TSDB_TABLE_NAME_LEN);
      if (tbIndex % 100000 == 0 && tbIndex != 0) {
        fprintf(stdout, "%d tablenames fetched\n", tbIndex);
      }
    }
  }

  taos_free_result(pSql);

  fprintf(stdout, "total %d tablenames fetched, over\n", tbNum);
  return code;
}

static void shellFreeTbnames() {
  for (int32_t i = 0; i < tbNum; ++i) {
wafwerar's avatar
wafwerar 已提交
100
    taosMemoryFree(tbNames[i]);
S
TD-3309  
Shengliang Guan 已提交
101
  }
wafwerar's avatar
wafwerar 已提交
102
  taosMemoryFree(tbNames);
S
TD-3309  
Shengliang Guan 已提交
103 104 105 106 107
}

static void *shellCheckThreadFp(void *arg) {
  ShellThreadObj *pThread = (ShellThreadObj *)arg;

108 109
  setThreadName("shellCheckThrd");

S
TD-3309  
Shengliang Guan 已提交
110 111 112 113 114 115 116 117 118
  int32_t interval = tbNum / pThread->totalThreads + 1;
  int32_t start = pThread->threadIndex * interval;
  int32_t end = (pThread->threadIndex + 1) * interval;

  if (end > tbNum) end = tbNum + 1;

  char file[32] = {0};
  snprintf(file, 32, "tb%d.txt", pThread->threadIndex);

119
  TdFilePtr pFile = taosOpenFile(file, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
S
TD-3309  
Shengliang Guan 已提交
120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135
  if (!fp) {
    fprintf(stdout, "failed to open %s, reason:%s", file, strerror(errno));
    return NULL;
  }

  char sql[SHELL_SQL_LEN];
  for (int32_t t = start; t < end; ++t) {
    char *tbname = tbNames[t];
    if (tbname == NULL) break;

    snprintf(sql, SHELL_SQL_LEN, "select * from %s limit 1", tbname);

    TAOS_RES *pSql = taos_query(pThread->taos, sql);
    int32_t   code = taos_errno(pSql);
    if (code != 0) {
      int32_t len = snprintf(sql, SHELL_SQL_LEN, "drop table %s.%s;\n", pThread->db, tbname);
136
      taosWriteFile(pFile, sql, len);
S
TD-3309  
Shengliang Guan 已提交
137 138 139 140 141 142 143 144 145 146 147
      atomic_add_fetch_32(&errorNum, 1);
    }

    int32_t cnum = atomic_add_fetch_32(&checkedNum, 1);
    if (cnum % 5000 == 0 && cnum != 0) {
      fprintf(stdout, "%d tables checked\n", cnum);
    }

    taos_free_result(pSql);
  }

148 149
  taosFsync(pFile);
  taosCloseFile(&pFile);
S
TD-3309  
Shengliang Guan 已提交
150 151 152 153

  return NULL;
}

154
static void shellRunCheckThreads(TAOS *con, SShellArguments *_args) {
wafwerar's avatar
wafwerar 已提交
155
  TdThreadAttr  thattr;
wafwerar's avatar
wafwerar 已提交
156
  ShellThreadObj *threadObj = (ShellThreadObj *)taosMemoryCalloc(_args->threadNum, sizeof(ShellThreadObj));
157
  for (int t = 0; t < _args->threadNum; ++t) {
S
TD-3309  
Shengliang Guan 已提交
158 159
    ShellThreadObj *pThread = threadObj + t;
    pThread->threadIndex = t;
160
    pThread->totalThreads = _args->threadNum;
S
TD-3309  
Shengliang Guan 已提交
161
    pThread->taos = con;
162
    pThread->db = _args->database;
S
TD-3309  
Shengliang Guan 已提交
163

wafwerar's avatar
wafwerar 已提交
164 165
    taosThreadAttrInit(&thattr);
    taosThreadAttrSetDetachState(&thattr, PTHREAD_CREATE_JOINABLE);
S
TD-3309  
Shengliang Guan 已提交
166

wafwerar's avatar
wafwerar 已提交
167
    if (taosThreadCreate(&(pThread->threadID), &thattr, shellCheckThreadFp, (void *)pThread) != 0) {
S
TD-3309  
Shengliang Guan 已提交
168 169 170 171 172
      fprintf(stderr, "ERROR: thread:%d failed to start\n", pThread->threadIndex);
      exit(0);
    }
  }

173
  for (int t = 0; t < _args->threadNum; ++t) {
wafwerar's avatar
wafwerar 已提交
174
    taosThreadJoin(threadObj[t].threadID, NULL);
S
TD-3309  
Shengliang Guan 已提交
175 176
  }

177
  for (int t = 0; t < _args->threadNum; ++t) {
S
TD-3309  
Shengliang Guan 已提交
178 179
    taos_close(threadObj[t].taos);
  }
wafwerar's avatar
wafwerar 已提交
180
  taosMemoryFree(threadObj);
S
TD-3309  
Shengliang Guan 已提交
181 182
}

183
void shellCheck(TAOS *con, SShellArguments *_args) {
S
TD-3309  
Shengliang Guan 已提交
184 185
  int64_t start = taosGetTimestampMs();

186
  if (shellUseDb(con, _args->database) != 0) {
S
TD-3309  
Shengliang Guan 已提交
187 188 189 190
    shellFreeTbnames();
    return;
  }

191
  if (shellShowTables(con, _args->database) != 0) {
S
TD-3309  
Shengliang Guan 已提交
192 193 194 195
    shellFreeTbnames();
    return;
  }

196 197
  fprintf(stdout, "total %d tables will be checked by %d threads\n", tbNum, _args->threadNum);
  shellRunCheckThreads(con, _args);
S
TD-3309  
Shengliang Guan 已提交
198 199 200 201 202

  int64_t end = taosGetTimestampMs();
  fprintf(stdout, "total %d tables checked, failed:%d, time spent %.2f seconds\n", checkedNum, errorNum,
          (end - start) / 1000.0);
}