qwDbg.c 7.1 KB
Newer Older
D
dapan1121 已提交
1 2 3 4 5 6
#include "dataSinkMgt.h"
#include "executor.h"
#include "planner.h"
#include "query.h"
#include "qwInt.h"
#include "qwMsg.h"
H
Hongze Cheng 已提交
7
#include "qworker.h"
D
dapan1121 已提交
8 9 10 11
#include "tcommon.h"
#include "tmsg.h"
#include "tname.h"

H
Hongze Cheng 已提交
12 13 14 15 16
/*
 * Process-wide switches for the qworker debugging aids in this file.
 * Each flag can be turned on at runtime through qwDbgEnableDebug().
 * Only task-status transition validation is enabled by default.
 */
SQWDebug gQWDebug = {
    .lockEnable = false,
    .statusEnable = true,
    .dumpEnable = false,
    .redirectSimulate = false,
    .deadSimulate = false,
    .sleepSimulate = false,
};
D
dapan1121 已提交
17 18 19 20 21 22 23 24 25

/*
 * Validates a task status transition oriStatus -> newStatus when
 * gQWDebug.statusEnable is set (otherwise every transition is accepted).
 *
 * Repeating the current status is only tolerated for EXEC and FAIL; in that
 * case *ignore is set to true so the caller can skip the update. *ignore is
 * never cleared here — presumably the caller initialises it (verify at call
 * sites).
 *
 * Accepted transitions:
 *   NULL      -> EXEC, FAIL, INIT
 *   INIT      -> DROP, EXEC, FAIL
 *   EXEC      -> PART_SUCC, SUCC, FAIL, DROP
 *   PART_SUCC -> EXEC, SUCC, FAIL, DROP
 *   SUCC      -> DROP, FAIL
 *   FAIL      -> DROP
 *   DROP      -> FAIL, PART_SUCC
 *
 * Returns TSDB_CODE_SUCCESS if accepted, TSDB_CODE_QRY_APP_ERROR otherwise
 * (an error log is written in both rejection paths).
 */
int32_t qwDbgValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus, bool *ignore) {
  int32_t code = 0;
  bool    allowed = false;

  if (!gQWDebug.statusEnable) {
    return TSDB_CODE_SUCCESS;
  }

  if (newStatus == oriStatus) {
    bool repeatable = (JOB_TASK_STATUS_EXEC == newStatus) || (JOB_TASK_STATUS_FAIL == newStatus);
    if (!repeatable) {
      QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
    }

    *ignore = true;
    return TSDB_CODE_SUCCESS;
  }

  switch (oriStatus) {
    case JOB_TASK_STATUS_NULL:
      allowed = (JOB_TASK_STATUS_EXEC == newStatus) || (JOB_TASK_STATUS_FAIL == newStatus) ||
                (JOB_TASK_STATUS_INIT == newStatus);
      break;
    case JOB_TASK_STATUS_INIT:
      allowed = (JOB_TASK_STATUS_DROP == newStatus) || (JOB_TASK_STATUS_EXEC == newStatus) ||
                (JOB_TASK_STATUS_FAIL == newStatus);
      break;
    case JOB_TASK_STATUS_EXEC:
      allowed = (JOB_TASK_STATUS_PART_SUCC == newStatus) || (JOB_TASK_STATUS_SUCC == newStatus) ||
                (JOB_TASK_STATUS_FAIL == newStatus) || (JOB_TASK_STATUS_DROP == newStatus);
      break;
    case JOB_TASK_STATUS_PART_SUCC:
      allowed = (JOB_TASK_STATUS_EXEC == newStatus) || (JOB_TASK_STATUS_SUCC == newStatus) ||
                (JOB_TASK_STATUS_FAIL == newStatus) || (JOB_TASK_STATUS_DROP == newStatus);
      break;
    case JOB_TASK_STATUS_SUCC:
      allowed = (JOB_TASK_STATUS_DROP == newStatus) || (JOB_TASK_STATUS_FAIL == newStatus);
      break;
    case JOB_TASK_STATUS_FAIL:
      allowed = (JOB_TASK_STATUS_DROP == newStatus);
      break;
    case JOB_TASK_STATUS_DROP:
      allowed = (JOB_TASK_STATUS_FAIL == newStatus) || (JOB_TASK_STATUS_PART_SUCC == newStatus);
      break;
    default:
      // Unknown origin status: report it and bail out without the generic transition log.
      QW_TASK_ELOG("invalid task origStatus:%s", jobTaskStatusStr(oriStatus));
      return TSDB_CODE_QRY_APP_ERROR;
  }

  if (!allowed) {
    QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
  }

  return TSDB_CODE_SUCCESS;

_return:

  QW_TASK_ELOG("invalid task status update from %s to %s", jobTaskStatusStr(oriStatus), jobTaskStatusStr(newStatus));
  QW_RET(code);
}

D
dapan1121 已提交
92 93
/*
 * Writes one debug line for scheduler entry number i: its hbBrokenTs and the
 * size of its tasksHash. Both values are read while sch->tasksLock is held in
 * read mode. mgmt is not dereferenced here; it is passed for the QW_DLOG macro.
 */
void qwDbgDumpSchInfo(SQWorker *mgmt, SQWSchStatus *sch, int32_t i) {
  QW_LOCK(QW_READ, &sch->tasksLock);

  int64_t brokenTs = sch->hbBrokenTs;
  int32_t taskNum = taosHashGetSize(sch->tasksHash);
  QW_DLOG("the %dth scheduler status, hbBrokenTs:%" PRId64 ",taskNum:%d", i, brokenTs, taskNum);

  QW_UNLOCK(QW_READ, &sch->tasksLock);
}
D
dapan1121 已提交
98 99 100 101 102 103 104 105

/*
 * Dumps worker-level bookkeeping when gQWDebug.dumpEnable is set:
 * the number of schedulers in mgmt->schHash, one line per scheduler
 * (via qwDbgDumpSchInfo), and the number of contexts in mgmt->ctxHash.
 *
 * The scheduler hash is walked under mgmt->schLock in read mode. The ctx
 * count is read after that lock has been released.
 */
void qwDbgDumpMgmtInfo(SQWorker *mgmt) {
  if (!gQWDebug.dumpEnable) {
    return;
  }

  QW_LOCK(QW_READ, &mgmt->schLock);

  QW_DUMP("total remain scheduler num %d", taosHashGetSize(mgmt->schHash));

  int32_t i = 0;
  // Full iteration to the end, so no taosHashCancelIterate is needed.
  for (void *pIter = taosHashIterate(mgmt->schHash, NULL); pIter != NULL;
       pIter = taosHashIterate(mgmt->schHash, pIter)) {
    qwDbgDumpSchInfo(mgmt, (SQWSchStatus *)pIter, i);
    ++i;
  }

  QW_UNLOCK(QW_READ, &mgmt->schLock);

  QW_DUMP("total remain ctx num %d", taosHashGetSize(mgmt->ctxHash));
}

D
dapan1121 已提交
126 127
/*
 * Sends a response of type rspType with result code `code` on connection pConn.
 *
 * If pEpSet is non-NULL, it is serialized into an RPC-allocated payload. If
 * serialization or allocation fails, an error is logged and the response is
 * still sent, without a payload (the same shape as when pEpSet is NULL). This
 * way the peer always receives a reply, and a NULL pCont is never paired with a
 * non-zero contLen.
 *
 * Always returns TSDB_CODE_SUCCESS because a response is always sent.
 */
int32_t qwDbgBuildAndSendRedirectRsp(int32_t rspType, SRpcHandleInfo *pConn, int32_t code, SEpSet *pEpSet) {
  int32_t contLen = 0;
  char   *rsp = NULL;

  if (pEpSet) {
    int32_t len = tSerializeSEpSet(NULL, 0, pEpSet);  // size-only pass; -1 on encode failure
    char   *buf = (len > 0) ? rpcMallocCont(len) : NULL;

    if (NULL != buf && tSerializeSEpSet(buf, len, pEpSet) >= 0) {
      rsp = buf;
      contLen = len;
    } else {
      if (NULL != buf) {
        rpcFreeCont(buf);
      }
      qError("failed to build epset for redirect rsp %s, len:%d, send without epset", TMSG_INFO(rspType), len);
    }
  }

  SRpcMsg rpcRsp = {
      .msgType = rspType,
      .pCont = rsp,
      .contLen = contLen,
      .code = code,
      .info = *pConn,
  };
  // NOTE(review): hasEpSet is set even when no epset payload is attached
  // (pEpSet == NULL or build failure); kept as in the original — confirm the
  // receiver tolerates an empty payload here.
  rpcRsp.info.hasEpSet = 1;

  tmsgSendRsp(&rpcRsp);

  qDebug("response %s msg, code: %s", TMSG_INFO(rspType), tstrerror(code));

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
152 153 154 155 156
/*
 * Debug hook: when gQWDebug.redirectSimulate is on, randomly answers a
 * scheduler message with a TSDB_CODE_RPC_REDIRECT response instead of letting
 * it be processed normally. It does nothing if *rsped is already true.
 *
 * The first QW_DBG_WARMUP_CALLS calls with the switch on are always let
 * through. After that:
 *   - TDMT_SCH_QUERY:       when taosRand() % 3 == 0, moves ctx to
 *                           QW_PHASE_POST_QUERY and responds with a
 *                           3-endpoint localhost epset (ports 7100/7200/7300,
 *                           inUse = 1).
 *   - TDMT_SCH_MERGE_QUERY: when taosRand() % 3 == 0, moves ctx to
 *                           QW_PHASE_POST_QUERY and responds without an epset.
 *   - TDMT_SCH_FETCH:       when taosRand() % 9 == 0, responds without an epset.
 * The response type is qwMsg->msgType + 1. *rsped is set to true whenever a
 * response is sent.
 */
void qwDbgSimulateRedirect(SQWMsg *qwMsg, SQWTaskCtx *ctx, bool *rsped) {
  enum { QW_DBG_WARMUP_CALLS = 10 };
  static int32_t ignoreTime = 0;

  if (*rsped || !gQWDebug.redirectSimulate) {
    return;
  }

  // Saturate rather than increment forever: an unbounded ++ on a signed
  // counter eventually overflows, which is undefined behavior.
  if (ignoreTime < QW_DBG_WARMUP_CALLS) {
    ++ignoreTime;
    return;
  }

  if (TDMT_SCH_QUERY == qwMsg->msgType && (0 == taosRand() % 3)) {
    SEpSet epSet = {0};
    epSet.inUse = 1;
    epSet.numOfEps = 3;
    for (int32_t i = 0; i < epSet.numOfEps; ++i) {
      strcpy(epSet.eps[i].fqdn, "localhost");  // literal is far shorter than the fqdn buffer
      epSet.eps[i].port = (uint16_t)(7100 + 100 * i);
    }

    // Same phase-setting path as the merge branch below, for consistency.
    QW_SET_PHASE(ctx, QW_PHASE_POST_QUERY);
    qwDbgBuildAndSendRedirectRsp(qwMsg->msgType + 1, &qwMsg->connInfo, TSDB_CODE_RPC_REDIRECT, &epSet);
    *rsped = true;
    return;
  }

  if (TDMT_SCH_MERGE_QUERY == qwMsg->msgType && (0 == taosRand() % 3)) {
    QW_SET_PHASE(ctx, QW_PHASE_POST_QUERY);
    qwDbgBuildAndSendRedirectRsp(qwMsg->msgType + 1, &qwMsg->connInfo, TSDB_CODE_RPC_REDIRECT, NULL);
    *rsped = true;
    return;
  }

  if ((TDMT_SCH_FETCH == qwMsg->msgType) && (0 == taosRand() % 9)) {
    qwDbgBuildAndSendRedirectRsp(qwMsg->msgType + 1, &qwMsg->connInfo, TSDB_CODE_RPC_REDIRECT, NULL);
    *rsped = true;
    return;
  }
}

D
dapan1121 已提交
195
void qwDbgSimulateSleep(void) {
D
dapan1121 已提交
196 197 198 199
  if (!gQWDebug.sleepSimulate) {
    return;
  }

D
dapan1121 已提交
200 201 202 203
  static int32_t ignoreTime = 0;
  if (++ignoreTime > 10) {
    taosSsleep(taosRand() % 20);
  }
D
dapan1121 已提交
204 205
}

D
dapan1121 已提交
206
/*
 * Debug hook: when gQWDebug.deadSimulate is on, occasionally makes the task
 * look as if its connection broke. It does nothing if *rsped is already true.
 *
 * After the first QW_DBG_WARMUP_CALLS calls, and only when
 * taosRand() % 9 == 0, it:
 *   1. sends a TSDB_CODE_RPC_BROKEN_LINK error response of type
 *      ctx->msgType + 1 (on ctx->dataConnInfo for FETCH/MERGE_FETCH,
 *      otherwise on ctx->ctrlConnInfo);
 *   2. sends a drop message on the same connection;
 *   3. sets *rsped to true.
 */
void qwDbgSimulateDead(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *rsped) {
  enum { QW_DBG_WARMUP_CALLS = 10 };
  static int32_t ignoreTime = 0;

  if (!gQWDebug.deadSimulate || *rsped) {
    return;
  }

  // Saturate rather than increment forever (signed overflow is UB).
  if (ignoreTime < QW_DBG_WARMUP_CALLS) {
    ++ignoreTime;
    return;
  }

  if (0 != taosRand() % 9) {
    return;
  }

  bool            isFetch = (ctx->msgType == TDMT_SCH_FETCH || ctx->msgType == TDMT_SCH_MERGE_FETCH);
  SRpcHandleInfo *pConn = isFetch ? &ctx->dataConnInfo : &ctx->ctrlConnInfo;

  qwBuildAndSendErrorRsp(ctx->msgType + 1, pConn, TSDB_CODE_RPC_BROKEN_LINK);

  qwBuildAndSendDropMsg(QW_FPARAMS(), pConn);
  *rsped = true;
}

D
dapan1121 已提交
230 231 232
int32_t qwDbgEnableDebug(char *option) {
  if (0 == strcasecmp(option, "lock")) {
    gQWDebug.lockEnable = true;
D
dapan1121 已提交
233
    qError("qw lock debug enabled");
D
dapan1121 已提交
234 235 236 237 238
    return TSDB_CODE_SUCCESS;
  }

  if (0 == strcasecmp(option, "status")) {
    gQWDebug.statusEnable = true;
D
dapan1121 已提交
239
    qError("qw status debug enabled");
D
dapan1121 已提交
240 241 242 243 244
    return TSDB_CODE_SUCCESS;
  }

  if (0 == strcasecmp(option, "dump")) {
    gQWDebug.dumpEnable = true;
D
dapan1121 已提交
245 246 247 248 249 250 251 252 253 254 255
    qError("qw dump debug enabled");
    return TSDB_CODE_SUCCESS;
  }

  if (0 == strcasecmp(option, "sleep")) {
    gQWDebug.sleepSimulate = true;
    qError("qw sleep debug enabled");
    return TSDB_CODE_SUCCESS;
  }

  if (0 == strcasecmp(option, "dead")) {
D
dapan1121 已提交
256
    gQWDebug.deadSimulate = true;
D
dapan1121 已提交
257
    qError("qw dead debug enabled");
D
dapan1121 已提交
258 259 260
    return TSDB_CODE_SUCCESS;
  }

D
dapan1121 已提交
261 262 263
  if (0 == strcasecmp(option, "redirect")) {
    gQWDebug.redirectSimulate = true;
    qError("qw redirect debug enabled");
D
dapan1121 已提交
264 265 266 267
    return TSDB_CODE_SUCCESS;
  }

  qError("invalid qw debug option:%s", option);
H
Hongze Cheng 已提交
268

D
dapan1121 已提交
269 270
  return TSDB_CODE_APP_ERROR;
}