qwDbg.c 9.1 KB
Newer Older
D
dapan1121 已提交
1 2 3 4 5 6
#include "dataSinkMgt.h"
#include "executor.h"
#include "planner.h"
#include "query.h"
#include "qwInt.h"
#include "qwMsg.h"
H
Hongze Cheng 已提交
7
#include "qworker.h"
D
dapan1121 已提交
8 9 10 11
#include "tcommon.h"
#include "tmsg.h"
#include "tname.h"

12 13
/*
 * Global switches for the query-worker debug helpers.
 * Every flag can be switched on at runtime via qWorkerDbgEnableDebug();
 * nothing in the code visible here switches a flag back off.
 */
SQWDebug gQWDebug = {
    .lockEnable = false,        // "lock" option; not read by the helpers visible in this file
    .statusEnable = true,       // gates qwDbgValidateStatus(); the only flag enabled by default
    .dumpEnable = false,        // gates qwDbgDumpMgmtInfo()
    .redirectSimulate = false,  // gates qwDbgSimulateRedirect()
    .deadSimulate = false,      // gates qwDbgSimulateDead()
    .sleepSimulate = false,     // gates qwDbgSimulateSleep()
    .forceStop = false,         // "forceStop" option; not read by the helpers visible in this file
};
D
dapan1121 已提交
19 20 21 22 23 24 25 26 27

/**
 * Check whether a task status change from oriStatus to newStatus is allowed.
 *
 * The check is skipped (success is returned) when gQWDebug.statusEnable is off.
 *
 * Allowed transitions:
 *   NULL      -> EXEC, FAIL, INIT
 *   INIT      -> DROP, EXEC, FAIL
 *   EXEC      -> PART_SUCC, SUCC, FAIL, DROP
 *   PART_SUCC -> EXEC, SUCC, FAIL, DROP
 *   SUCC      -> DROP, FAIL
 *   FAIL      -> DROP
 *   DROP      -> FAIL, PART_SUCC
 *
 * @param oriStatus  current task status
 * @param newStatus  requested task status
 * @param ignore     set to true only when the status is unchanged and is EXEC or FAIL;
 *                   it is never written on any other path, so the caller must initialise it
 * @return TSDB_CODE_SUCCESS when the change is accepted (or validation is disabled),
 *         TSDB_CODE_APP_ERROR otherwise
 */
int32_t qwDbgValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus, bool *ignore) {
  if (!gQWDebug.statusEnable) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = 0;
  bool    allowed = false;

  // A no-op update is tolerated only for EXEC and FAIL; the caller is told to skip it.
  if (newStatus == oriStatus) {
    if (JOB_TASK_STATUS_EXEC == newStatus || JOB_TASK_STATUS_FAIL == newStatus) {
      *ignore = true;
      return TSDB_CODE_SUCCESS;
    }

    QW_ERR_JRET(TSDB_CODE_APP_ERROR);
  }

  switch (oriStatus) {
    case JOB_TASK_STATUS_NULL:
      allowed = (JOB_TASK_STATUS_EXEC == newStatus || JOB_TASK_STATUS_FAIL == newStatus ||
                 JOB_TASK_STATUS_INIT == newStatus);
      break;

    case JOB_TASK_STATUS_INIT:
      allowed = (JOB_TASK_STATUS_DROP == newStatus || JOB_TASK_STATUS_EXEC == newStatus ||
                 JOB_TASK_STATUS_FAIL == newStatus);
      break;

    case JOB_TASK_STATUS_EXEC:
      allowed = (JOB_TASK_STATUS_PART_SUCC == newStatus || JOB_TASK_STATUS_SUCC == newStatus ||
                 JOB_TASK_STATUS_FAIL == newStatus || JOB_TASK_STATUS_DROP == newStatus);
      break;

    case JOB_TASK_STATUS_PART_SUCC:
      allowed = (JOB_TASK_STATUS_EXEC == newStatus || JOB_TASK_STATUS_SUCC == newStatus ||
                 JOB_TASK_STATUS_FAIL == newStatus || JOB_TASK_STATUS_DROP == newStatus);
      break;

    case JOB_TASK_STATUS_SUCC:
      allowed = (JOB_TASK_STATUS_DROP == newStatus || JOB_TASK_STATUS_FAIL == newStatus);
      break;

    case JOB_TASK_STATUS_FAIL:
      allowed = (JOB_TASK_STATUS_DROP == newStatus);
      break;

    case JOB_TASK_STATUS_DROP:
      allowed = (JOB_TASK_STATUS_FAIL == newStatus || JOB_TASK_STATUS_PART_SUCC == newStatus);
      break;

    default:
      // Unknown source status: logged here and returned directly, bypassing the _return log line.
      QW_TASK_ELOG("invalid task origStatus:%s", jobTaskStatusStr(oriStatus));
      return TSDB_CODE_APP_ERROR;
  }

  if (allowed) {
    return TSDB_CODE_SUCCESS;
  }

  QW_ERR_JRET(TSDB_CODE_APP_ERROR);

_return:

  QW_TASK_ELOG("invalid task status update from %s to %s", jobTaskStatusStr(oriStatus), jobTaskStatusStr(newStatus));
  QW_RET(code);
}

D
dapan1121 已提交
94 95
/**
 * Log one scheduler's status and every task registered under it.
 *
 * The scheduler's tasksLock is held in read mode for the whole dump.
 *
 * @param mgmt  owning query worker
 * @param sch   scheduler status entry to dump
 * @param i     ordinal of this scheduler, used only in the log line
 */
void qwDbgDumpSchInfo(SQWorker *mgmt, SQWSchStatus *sch, int32_t i) {
  // Filled by QW_GET_QTID for every task key; the task log macro is expected to pick them up.
  uint64_t qId, tId;
  int32_t  eId;

  QW_LOCK(QW_READ, &sch->tasksLock);

  int32_t numOfTasks = taosHashGetSize(sch->tasksHash);
  QW_DLOG("***The %dth scheduler status, hbBrokenTs:%" PRId64 ",taskNum:%d", i, sch->hbBrokenTs, numOfTasks);

  for (void *pEntry = taosHashIterate(sch->tasksHash, NULL); pEntry != NULL;
       pEntry = taosHashIterate(sch->tasksHash, pEntry)) {
    SQWTaskStatus *pStatus = (SQWTaskStatus *)pEntry;
    void          *pKey = taosHashGetKey(pEntry, NULL);

    QW_GET_QTID(pKey, qId, tId, eId);

    QW_TASK_DLOG("job refId:%" PRIx64 ", code:%x, task status:%d", pStatus->refId, pStatus->code, pStatus->status);
  }

  QW_UNLOCK(QW_READ, &sch->tasksLock);
}
D
dapan1121 已提交
115

D
dapan1121 已提交
116 117 118 119 120 121 122 123 124 125 126 127 128
/**
 * Log the number of task contexts left in mgmt->ctxHash, then one line per context
 * with its flags, handles, table info and event counters.
 *
 * No lock is taken here while iterating ctxHash.
 * NOTE(review): presumably the hash table provides its own iteration safety - confirm.
 *
 * @param mgmt  query worker whose task contexts are dumped
 */
void qwDbgDumpTasksInfo(SQWorker *mgmt) {
  QW_DUMP("***Total remain ctx num %d", taosHashGetSize(mgmt->ctxHash));

  // Filled by QW_GET_QTID for every context key; the task log macro is expected to pick them up.
  uint64_t qId, tId;
  int32_t  eId;

  for (void *pIter = taosHashIterate(mgmt->ctxHash, NULL); pIter != NULL;
       pIter = taosHashIterate(mgmt->ctxHash, pIter)) {
    SQWTaskCtx *ctx = (SQWTaskCtx *)pIter;
    void       *key = taosHashGetKey(pIter, NULL);
    QW_GET_QTID(key, qId, tId, eId);

    QW_TASK_DLOG("%p lock:%x, phase:%d, type:%d, explain:%d, needFetch:%d, localExec:%d, msgType:%d, "
      "sId:%" PRId64 ", level:%d, queryGotData:%d, queryRsped:%d, queryEnd:%d, queryContinue:%d, queryInQueue:%d, "
      "rspCode:%x, affectedRows:%" PRId64 ", taskHandle:%p, sinkHandle:%p, tbFName:%s, sver:%d, tver:%d, events:%d,%d,%d,%d,%d",
      ctx, ctx->lock, ctx->phase, ctx->taskType, ctx->explain, ctx->needFetch, ctx->localExec, ctx->msgType,
      ctx->sId, ctx->level, ctx->queryGotData, ctx->queryRsped, ctx->queryEnd, ctx->queryContinue,
      ctx->queryInQueue, ctx->rspCode, ctx->affectedRows, ctx->taskHandle, ctx->sinkHandle, ctx->tbInfo.tbFName,
      ctx->tbInfo.sversion, ctx->tbInfo.tversion, ctx->events[QW_EVENT_CANCEL], ctx->events[QW_EVENT_READY],
      ctx->events[QW_EVENT_FETCH], ctx->events[QW_EVENT_DROP], ctx->events[QW_EVENT_CQUERY]);
  }
}

D
dapan1121 已提交
143 144 145 146 147 148 149
/**
 * Dump the whole query-worker state: every scheduler (with its tasks) and then every
 * task context. Does nothing unless gQWDebug.dumpEnable is set.
 *
 * mgmt->schLock is held in read mode only while the schedulers are walked; it is
 * released before the task contexts are dumped.
 *
 * @param mgmt  query worker to dump
 */
void qwDbgDumpMgmtInfo(SQWorker *mgmt) {
  if (!gQWDebug.dumpEnable) {
    return;
  }

  QW_LOCK(QW_READ, &mgmt->schLock);

  QW_DUMP("total remain scheduler num %d", taosHashGetSize(mgmt->schHash));

  int32_t schIdx = 0;
  for (void *pIter = taosHashIterate(mgmt->schHash, NULL); pIter != NULL;
       pIter = taosHashIterate(mgmt->schHash, pIter)) {
    qwDbgDumpSchInfo(mgmt, (SQWSchStatus *)pIter, schIdx);
    ++schIdx;
  }

  QW_UNLOCK(QW_READ, &mgmt->schLock);

  qwDbgDumpTasksInfo(mgmt);
}

D
dapan1121 已提交
170 171
/**
 * Build a response carrying `code` (and, optionally, a serialized endpoint set) and
 * send it on the given connection. Used by the redirect simulation.
 *
 * @param rspType  message type of the response
 * @param pConn    connection handle info; copied into the response
 * @param code     error code placed in the response
 * @param pEpSet   endpoint set to serialize into the payload, or NULL for an empty payload
 * @return TSDB_CODE_SUCCESS once the response has been handed to tmsgSendRsp();
 *         TSDB_CODE_APP_ERROR if the endpoint set cannot be serialized;
 *         TSDB_CODE_OUT_OF_MEMORY if the payload buffer cannot be allocated.
 *         On failure nothing is sent.
 */
int32_t qwDbgBuildAndSendRedirectRsp(int32_t rspType, SRpcHandleInfo *pConn, int32_t code, SEpSet *pEpSet) {
  int32_t contLen = 0;
  char   *rsp = NULL;

  if (pEpSet) {
    // First pass with a NULL buffer only computes the encoded size.
    // NOTE(review): a negative result is assumed to signal failure - confirm against tSerializeSEpSet.
    contLen = tSerializeSEpSet(NULL, 0, pEpSet);
    if (contLen < 0) {
      qError("failed to get epSet size for %s msg", TMSG_INFO(rspType));
      return TSDB_CODE_APP_ERROR;
    }

    rsp = rpcMallocCont(contLen);
    if (NULL == rsp) {
      qError("failed to alloc %d bytes for %s msg", contLen, TMSG_INFO(rspType));
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    if (tSerializeSEpSet(rsp, contLen, pEpSet) < 0) {
      qError("failed to serialize epSet for %s msg", TMSG_INFO(rspType));
      rpcFreeCont(rsp);
      return TSDB_CODE_APP_ERROR;
    }
  }

  SRpcMsg rpcRsp = {
      .msgType = rspType,
      .pCont = rsp,
      .contLen = contLen,
      .code = code,
      .info = *pConn,
  };
  // Set unconditionally, even when no endpoint set was supplied (unchanged from the original).
  rpcRsp.info.hasEpSet = 1;

  tmsgSendRsp(&rpcRsp);

  qDebug("response %s msg, code: %s", TMSG_INFO(rspType), tstrerror(code));

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
196 197 198 199 200
/**
 * Debug hook: randomly answer a scheduler message with TSDB_CODE_SYN_NOT_LEADER so that
 * the client-side redirect path gets exercised.
 *
 * Active only when gQWDebug.redirectSimulate is set and no response has been sent yet.
 * The first 10 eligible calls are always let through untouched. After that:
 *   - TDMT_SCH_QUERY       : 1-in-3 chance, response carries a fake 3-endpoint set
 *   - TDMT_SCH_MERGE_QUERY : 1-in-3 chance, no endpoint set
 *   - TDMT_SCH_FETCH       : 1-in-9 chance, no endpoint set
 *
 * @param qwMsg  incoming message (type and connection info are used)
 * @param ctx    task context; its phase is moved to QW_PHASE_POST_QUERY for the query types
 * @param rsped  in/out: nothing happens if already true; set to true when a response is sent
 */
void qwDbgSimulateRedirect(SQWMsg *qwMsg, SQWTaskCtx *ctx, bool *rsped) {
  static int32_t ignoreTime = 0;

  if (*rsped) {
    return;
  }

  if (!gQWDebug.redirectSimulate) {
    return;
  }

  // Warm-up: never interfere with the first 10 messages.
  if (++ignoreTime <= 10) {
    return;
  }

  // The response type is derived as request type + 1 throughout this function.
  if (TDMT_SCH_QUERY == qwMsg->msgType) {
    if (0 != taosRand() % 3) {
      return;
    }

    static const uint16_t fakePorts[] = {7100, 7200, 7300};

    SEpSet epSet = {0};
    epSet.inUse = 1;
    epSet.numOfEps = 3;
    for (int32_t idx = 0; idx < 3; ++idx) {
      strcpy(epSet.eps[idx].fqdn, "localhost");
      epSet.eps[idx].port = fakePorts[idx];
    }

    // NOTE(review): the phase is stored directly here, while the merge-query branch below
    // goes through QW_SET_PHASE - confirm whether the difference is intentional.
    ctx->phase = QW_PHASE_POST_QUERY;
    qwDbgBuildAndSendRedirectRsp(qwMsg->msgType + 1, &qwMsg->connInfo, TSDB_CODE_SYN_NOT_LEADER, &epSet);
    *rsped = true;
  } else if (TDMT_SCH_MERGE_QUERY == qwMsg->msgType) {
    if (0 != taosRand() % 3) {
      return;
    }

    QW_SET_PHASE(ctx, QW_PHASE_POST_QUERY);
    qwDbgBuildAndSendRedirectRsp(qwMsg->msgType + 1, &qwMsg->connInfo, TSDB_CODE_SYN_NOT_LEADER, NULL);
    *rsped = true;
  } else if (TDMT_SCH_FETCH == qwMsg->msgType) {
    if (0 != taosRand() % 9) {
      return;
    }

    qwDbgBuildAndSendRedirectRsp(qwMsg->msgType + 1, &qwMsg->connInfo, TSDB_CODE_SYN_NOT_LEADER, NULL);
    *rsped = true;
  }
}

D
dapan1121 已提交
239
void qwDbgSimulateSleep(void) {
D
dapan1121 已提交
240 241 242 243
  if (!gQWDebug.sleepSimulate) {
    return;
  }

D
dapan1121 已提交
244 245 246 247
  static int32_t ignoreTime = 0;
  if (++ignoreTime > 10) {
    taosSsleep(taosRand() % 20);
  }
D
dapan1121 已提交
248 249
}

D
dapan1121 已提交
250
/**
 * Debug hook: randomly pretend the link to the client broke.
 *
 * Active only when gQWDebug.deadSimulate is set and no response has been sent yet.
 * The first 10 eligible calls are let through; afterwards each call has a 1-in-9 chance
 * of replying with TSDB_CODE_RPC_BROKEN_LINK and then issuing a drop message for the task.
 *
 * @param ctx    task context; its msgType selects the data or the control connection
 * @param rsped  in/out: nothing happens if already true; set to true when the error is sent
 */
void qwDbgSimulateDead(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *rsped) {
  static int32_t ignoreTime = 0;

  if (!gQWDebug.deadSimulate) {
    return;
  }

  if (*rsped) {
    return;
  }

  // Warm-up first; the random draw is only made once the warm-up is over.
  if (++ignoreTime <= 10) {
    return;
  }

  if (0 != taosRand() % 9) {
    return;
  }

  // Fetch-type messages are answered on the data connection, everything else on the control one.
  SRpcHandleInfo *pConn = NULL;
  if (TDMT_SCH_FETCH == ctx->msgType || TDMT_SCH_MERGE_FETCH == ctx->msgType) {
    pConn = &ctx->dataConnInfo;
  } else {
    pConn = &ctx->ctrlConnInfo;
  }

  qwBuildAndSendErrorRsp(ctx->msgType + 1, pConn, TSDB_CODE_RPC_BROKEN_LINK);

  qwBuildAndSendDropMsg(QW_FPARAMS(), pConn);
  *rsped = true;
}

274
int32_t qWorkerDbgEnableDebug(char *option) {
D
dapan1121 已提交
275 276
  if (0 == strcasecmp(option, "lock")) {
    gQWDebug.lockEnable = true;
D
dapan1121 已提交
277
    qError("qw lock debug enabled");
D
dapan1121 已提交
278 279 280 281 282
    return TSDB_CODE_SUCCESS;
  }

  if (0 == strcasecmp(option, "status")) {
    gQWDebug.statusEnable = true;
D
dapan1121 已提交
283
    qError("qw status debug enabled");
D
dapan1121 已提交
284 285 286 287 288
    return TSDB_CODE_SUCCESS;
  }

  if (0 == strcasecmp(option, "dump")) {
    gQWDebug.dumpEnable = true;
D
dapan1121 已提交
289 290 291 292 293 294 295 296 297 298 299
    qError("qw dump debug enabled");
    return TSDB_CODE_SUCCESS;
  }

  if (0 == strcasecmp(option, "sleep")) {
    gQWDebug.sleepSimulate = true;
    qError("qw sleep debug enabled");
    return TSDB_CODE_SUCCESS;
  }

  if (0 == strcasecmp(option, "dead")) {
D
dapan1121 已提交
300
    gQWDebug.deadSimulate = true;
D
dapan1121 已提交
301
    qError("qw dead debug enabled");
D
dapan1121 已提交
302 303 304
    return TSDB_CODE_SUCCESS;
  }

D
dapan1121 已提交
305 306 307
  if (0 == strcasecmp(option, "redirect")) {
    gQWDebug.redirectSimulate = true;
    qError("qw redirect debug enabled");
D
dapan1121 已提交
308 309 310
    return TSDB_CODE_SUCCESS;
  }

311 312 313 314 315 316
  if (0 == strcasecmp(option, "forceStop")) {
    gQWDebug.forceStop = true;
    qError("qw forceStop debug enabled");
    return TSDB_CODE_SUCCESS;
  }

D
dapan1121 已提交
317
  qError("invalid qw debug option:%s", option);
H
Hongze Cheng 已提交
318

D
dapan1121 已提交
319 320
  return TSDB_CODE_APP_ERROR;
}