qwDbg.c 8.8 KB
Newer Older
D
dapan1121 已提交
1 2 3 4 5 6
#include "dataSinkMgt.h"
#include "executor.h"
#include "planner.h"
#include "query.h"
#include "qwInt.h"
#include "qwMsg.h"
H
Hongze Cheng 已提交
7
#include "qworker.h"
D
dapan1121 已提交
8 9 10 11
#include "tcommon.h"
#include "tmsg.h"
#include "tname.h"

H
Hongze Cheng 已提交
12 13 14 15 16
/*
 * Process-wide qworker debug switches.
 *
 * Status-transition validation is the only switch enabled at start-up. Members
 * not named in this initializer (for example lockEnable, which
 * qwDbgEnableDebug() can turn on) are zero-initialized.
 */
SQWDebug gQWDebug = {
    .statusEnable = true,
    .dumpEnable = false,
    .redirectSimulate = false,
    .deadSimulate = false,
    .sleepSimulate = false,
};
D
dapan1121 已提交
17 18 19 20 21 22 23 24 25

/*
 * Check whether a task may move from oriStatus to newStatus.
 *
 * The check is skipped (TSDB_CODE_SUCCESS) when gQWDebug.statusEnable is off.
 *
 * Accepted transitions:
 *   NULL      -> EXEC, FAIL, INIT
 *   INIT      -> DROP, EXEC, FAIL
 *   EXEC      -> PART_SUCC, SUCC, FAIL, DROP
 *   PART_SUCC -> EXEC, SUCC, FAIL, DROP
 *   SUCC      -> DROP, FAIL
 *   FAIL      -> DROP
 *   DROP      -> FAIL, PART_SUCC
 *
 * Re-entering EXEC or FAIL is tolerated: *ignore is set to true and success is
 * returned. *ignore is not written on any other path, so the caller must
 * initialize it.
 *
 * Every other transition is rejected through QW_ERR_JRET(TSDB_CODE_APP_ERROR)
 * and logged at the _return label. An unknown oriStatus is logged and returns
 * TSDB_CODE_APP_ERROR directly, bypassing _return.
 */
int32_t qwDbgValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus, bool *ignore) {
  int32_t code = 0;
  bool    allowed = false;

  if (!gQWDebug.statusEnable) {
    return TSDB_CODE_SUCCESS;
  }

  if (oriStatus == newStatus) {
    if (JOB_TASK_STATUS_EXEC == newStatus || JOB_TASK_STATUS_FAIL == newStatus) {
      *ignore = true;
      return TSDB_CODE_SUCCESS;
    }

    // NOTE(review): QW_ERR_JRET is presumed to store the code and jump to _return - confirm in qwInt.h
    QW_ERR_JRET(TSDB_CODE_APP_ERROR);
  }

  switch (oriStatus) {
    case JOB_TASK_STATUS_NULL:
      allowed = (JOB_TASK_STATUS_EXEC == newStatus) || (JOB_TASK_STATUS_FAIL == newStatus) ||
                (JOB_TASK_STATUS_INIT == newStatus);
      break;

    case JOB_TASK_STATUS_INIT:
      allowed = (JOB_TASK_STATUS_DROP == newStatus) || (JOB_TASK_STATUS_EXEC == newStatus) ||
                (JOB_TASK_STATUS_FAIL == newStatus);
      break;

    case JOB_TASK_STATUS_EXEC:
      allowed = (JOB_TASK_STATUS_PART_SUCC == newStatus) || (JOB_TASK_STATUS_SUCC == newStatus) ||
                (JOB_TASK_STATUS_FAIL == newStatus) || (JOB_TASK_STATUS_DROP == newStatus);
      break;

    case JOB_TASK_STATUS_PART_SUCC:
      allowed = (JOB_TASK_STATUS_EXEC == newStatus) || (JOB_TASK_STATUS_SUCC == newStatus) ||
                (JOB_TASK_STATUS_FAIL == newStatus) || (JOB_TASK_STATUS_DROP == newStatus);
      break;

    case JOB_TASK_STATUS_SUCC:
      allowed = (JOB_TASK_STATUS_DROP == newStatus) || (JOB_TASK_STATUS_FAIL == newStatus);
      break;

    case JOB_TASK_STATUS_FAIL:
      allowed = (JOB_TASK_STATUS_DROP == newStatus);
      break;

    case JOB_TASK_STATUS_DROP:
      allowed = (JOB_TASK_STATUS_FAIL == newStatus) || (JOB_TASK_STATUS_PART_SUCC == newStatus);
      break;

    default:
      QW_TASK_ELOG("invalid task origStatus:%s", jobTaskStatusStr(oriStatus));
      return TSDB_CODE_APP_ERROR;
  }

  if (!allowed) {
    QW_ERR_JRET(TSDB_CODE_APP_ERROR);
  }

  return TSDB_CODE_SUCCESS;

_return:

  QW_TASK_ELOG("invalid task status update from %s to %s", jobTaskStatusStr(oriStatus), jobTaskStatusStr(newStatus));
  QW_RET(code);
}

D
dapan1121 已提交
92 93
/*
 * Log the state of one scheduler entry and of every task it tracks.
 *
 * mgmt  qworker instance (not referenced directly here; presumably consumed by
 *       the logging macros - confirm against their definitions)
 * sch   scheduler status whose tasksHash is dumped
 * i     ordinal of this scheduler, used only in the log line
 *
 * sch->tasksLock is held in read mode for the whole dump.
 */
void qwDbgDumpSchInfo(SQWorker *mgmt, SQWSchStatus *sch, int32_t i) {
  QW_LOCK(QW_READ, &sch->tasksLock);

  int32_t numOfTasks = taosHashGetSize(sch->tasksHash);
  QW_DLOG("***The %dth scheduler status, hbBrokenTs:%" PRId64 ",taskNum:%d", i, sch->hbBrokenTs, numOfTasks);

  // qId/tId/eId are filled from each hash key by QW_GET_QTID before they are logged
  uint64_t qId, tId;
  int32_t  eId;

  for (void *pNode = taosHashIterate(sch->tasksHash, NULL); pNode != NULL;
       pNode = taosHashIterate(sch->tasksHash, pNode)) {
    SQWTaskStatus *pStatus = (SQWTaskStatus *)pNode;
    void          *pKey = taosHashGetKey(pNode, NULL);
    QW_GET_QTID(pKey, qId, tId, eId);

    QW_TASK_DLOG("job refId:%" PRIx64 ", code:%x, task status:%d", pStatus->refId, pStatus->code, pStatus->status);
  }

  QW_UNLOCK(QW_READ, &sch->tasksLock);
}
D
dapan1121 已提交
113

D
dapan1121 已提交
114 115 116 117 118 119 120 121 122 123 124 125 126
/*
 * Log every task context still present in mgmt->ctxHash.
 *
 * The total count is emitted through QW_DUMP; each context is then written as
 * one QW_TASK_DLOG line containing its flags, handles, table info and the
 * CANCEL/READY/FETCH/DROP/CQUERY event counters.
 */
void qwDbgDumpTasksInfo(SQWorker *mgmt) {
  QW_DUMP("***Total remain ctx num %d", taosHashGetSize(mgmt->ctxHash));

  // qId/tId/eId are filled from each hash key by QW_GET_QTID before they are logged
  uint64_t qId, tId;
  int32_t  eId;

  for (void *pIter = taosHashIterate(mgmt->ctxHash, NULL); pIter != NULL;
       pIter = taosHashIterate(mgmt->ctxHash, pIter)) {
    SQWTaskCtx *ctx = (SQWTaskCtx *)pIter;
    void       *key = taosHashGetKey(pIter, NULL);
    QW_GET_QTID(key, qId, tId, eId);

    QW_TASK_DLOG("%p lock:%x, phase:%d, type:%d, explain:%d, needFetch:%d, localExec:%d, msgType:%d, "
      "sId:%x, level:%d, queryGotData:%d, queryRsped:%d, queryEnd:%d, queryContinue:%d, queryInQueue:%d, "
      "rspCode:%x, affectedRows:%" PRId64 ", taskHandle:%p, sinkHandle:%p, tbFName:%s, sver:%d, tver:%d, events:%d,%d,%d,%d,%d",
      ctx, ctx->lock, ctx->phase, ctx->taskType, ctx->explain, ctx->needFetch, ctx->localExec, ctx->msgType,
      ctx->sId, ctx->level, ctx->queryGotData, ctx->queryRsped, ctx->queryEnd, ctx->queryContinue,
      ctx->queryInQueue, ctx->rspCode, ctx->affectedRows, ctx->taskHandle, ctx->sinkHandle, ctx->tbInfo.tbFName,
      ctx->tbInfo.sversion, ctx->tbInfo.tversion, ctx->events[QW_EVENT_CANCEL], ctx->events[QW_EVENT_READY],
      ctx->events[QW_EVENT_FETCH], ctx->events[QW_EVENT_DROP], ctx->events[QW_EVENT_CQUERY]);
  }
}

D
dapan1121 已提交
141 142 143 144 145 146 147
/*
 * Dump the whole qworker state to the log.
 *
 * Does nothing unless gQWDebug.dumpEnable is set. Scheduler entries are dumped
 * while mgmt->schLock is held in read mode; the task contexts are dumped after
 * that lock has been released.
 */
void qwDbgDumpMgmtInfo(SQWorker *mgmt) {
  if (!gQWDebug.dumpEnable) {
    return;
  }

  QW_LOCK(QW_READ, &mgmt->schLock);

  QW_DUMP("total remain scheduler num %d", taosHashGetSize(mgmt->schHash));

  int32_t i = 0;  // ordinal passed to qwDbgDumpSchInfo for its log line
  for (void *pIter = taosHashIterate(mgmt->schHash, NULL); pIter != NULL;
       pIter = taosHashIterate(mgmt->schHash, pIter)) {
    qwDbgDumpSchInfo(mgmt, (SQWSchStatus *)pIter, i);
    ++i;
  }

  QW_UNLOCK(QW_READ, &mgmt->schLock);

  qwDbgDumpTasksInfo(mgmt);
}

D
dapan1121 已提交
168 169
/*
 * Build a (simulated) redirect response and send it on pConn.
 *
 * rspType  message type of the response
 * pConn    connection info copied into the response; must not be NULL
 * code     error code carried by the response
 * pEpSet   optional endpoint set; when non-NULL it is serialized into the
 *          response body, otherwise the body is empty
 *
 * info.hasEpSet is set to 1 whether or not pEpSet was supplied.
 *
 * Returns TSDB_CODE_SUCCESS once the response has been passed to tmsgSendRsp().
 * If the endpoint set cannot be serialized or the body cannot be allocated,
 * nothing is sent and an error code is returned.
 *
 * NOTE(review): the body buffer is presumed to be owned by the transport after
 * tmsgSendRsp() - confirm against the rpc layer.
 */
int32_t qwDbgBuildAndSendRedirectRsp(int32_t rspType, SRpcHandleInfo *pConn, int32_t code, SEpSet *pEpSet) {
  int32_t contLen = 0;
  char   *rsp = NULL;

  if (pEpSet) {
    // First pass with a NULL buffer only computes the encoded length
    contLen = tSerializeSEpSet(NULL, 0, pEpSet);
    if (contLen < 0) {
      qError("failed to get epSet length for %s msg, ret:%d", TMSG_INFO(rspType), contLen);
      return TSDB_CODE_APP_ERROR;
    }

    rsp = rpcMallocCont(contLen);
    if (NULL == rsp) {
      qError("failed to alloc %d bytes for %s msg", contLen, TMSG_INFO(rspType));
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    if (tSerializeSEpSet(rsp, contLen, pEpSet) < 0) {
      qError("failed to serialize epSet for %s msg", TMSG_INFO(rspType));
      rpcFreeCont(rsp);  // not handed to the transport yet, so release it here
      return TSDB_CODE_APP_ERROR;
    }
  }

  SRpcMsg rpcRsp = {
      .msgType = rspType,
      .pCont = rsp,
      .contLen = contLen,
      .code = code,
      .info = *pConn,
  };
  rpcRsp.info.hasEpSet = 1;

  tmsgSendRsp(&rpcRsp);

  qDebug("response %s msg, code: %s", TMSG_INFO(rspType), tstrerror(code));

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
194 195 196 197 198
/*
 * Debug hook: randomly answer a scheduler request with a fake
 * TSDB_CODE_SYN_NOT_LEADER redirect instead of processing it.
 *
 * qwMsg  incoming message; its msgType selects the branch and its connInfo is
 *        used to send the response
 * ctx    task context; its phase is moved to QW_PHASE_POST_QUERY for the
 *        QUERY and MERGE_QUERY cases
 * rsped  in/out: nothing happens if already true; set to true when a
 *        simulated redirect has been issued
 *
 * Active only when gQWDebug.redirectSimulate is set, and only after the first
 * 10 eligible calls have been let through untouched. After that:
 *   TDMT_SCH_QUERY        redirect with a 3-endpoint localhost epSet when taosRand() % 3 == 0
 *   TDMT_SCH_MERGE_QUERY  redirect without epSet when taosRand() % 3 == 0
 *   TDMT_SCH_FETCH        redirect without epSet when taosRand() % 9 == 0
 *
 * NOTE(review): the static warm-up counter is not synchronized; confirm
 * whether this hook can run on several worker threads at once.
 */
void qwDbgSimulateRedirect(SQWMsg *qwMsg, SQWTaskCtx *ctx, bool *rsped) {
  static int32_t ignoreTime = 0;

  if (*rsped || !gQWDebug.redirectSimulate) {
    return;
  }

  // Let the first 10 calls pass. The counter stops at 10 instead of growing
  // forever, so it can never overflow on a long-running process.
  if (ignoreTime < 10) {
    ++ignoreTime;
    return;
  }

  if (TDMT_SCH_QUERY == qwMsg->msgType && (0 == taosRand() % 3)) {
    SEpSet epSet = {0};
    epSet.inUse = 1;
    epSet.numOfEps = 3;
    strcpy(epSet.eps[0].fqdn, "localhost");
    epSet.eps[0].port = 7100;
    strcpy(epSet.eps[1].fqdn, "localhost");
    epSet.eps[1].port = 7200;
    strcpy(epSet.eps[2].fqdn, "localhost");
    epSet.eps[2].port = 7300;

    // NOTE(review): this branch writes ctx->phase directly while the
    // MERGE_QUERY branch goes through QW_SET_PHASE - confirm this is intended
    ctx->phase = QW_PHASE_POST_QUERY;
    qwDbgBuildAndSendRedirectRsp(qwMsg->msgType + 1, &qwMsg->connInfo, TSDB_CODE_SYN_NOT_LEADER, &epSet);
    *rsped = true;
    return;
  }

  if (TDMT_SCH_MERGE_QUERY == qwMsg->msgType && (0 == taosRand() % 3)) {
    QW_SET_PHASE(ctx, QW_PHASE_POST_QUERY);
    qwDbgBuildAndSendRedirectRsp(qwMsg->msgType + 1, &qwMsg->connInfo, TSDB_CODE_SYN_NOT_LEADER, NULL);
    *rsped = true;
    return;
  }

  if ((TDMT_SCH_FETCH == qwMsg->msgType) && (0 == taosRand() % 9)) {
    qwDbgBuildAndSendRedirectRsp(qwMsg->msgType + 1, &qwMsg->connInfo, TSDB_CODE_SYN_NOT_LEADER, NULL);
    *rsped = true;
    return;
  }
}

D
dapan1121 已提交
237
void qwDbgSimulateSleep(void) {
D
dapan1121 已提交
238 239 240 241
  if (!gQWDebug.sleepSimulate) {
    return;
  }

D
dapan1121 已提交
242 243 244 245
  static int32_t ignoreTime = 0;
  if (++ignoreTime > 10) {
    taosSsleep(taosRand() % 20);
  }
D
dapan1121 已提交
246 247
}

D
dapan1121 已提交
248
/*
 * Debug hook: randomly pretend the task's connection broke.
 *
 * ctx    task context; its msgType selects the connection (dataConnInfo for
 *        FETCH / MERGE_FETCH, ctrlConnInfo otherwise) and the response type
 * rsped  in/out: nothing happens if already true; set to true once the
 *        simulated failure has been issued
 *
 * Active only when gQWDebug.deadSimulate is set. The first 10 eligible calls
 * are let through; afterwards, when taosRand() % 9 == 0, a
 * TSDB_CODE_RPC_BROKEN_LINK error response is sent followed by a drop message.
 * The results of both sends are not checked.
 *
 * NOTE(review): the static warm-up counter is not synchronized; confirm
 * whether this hook can run on several worker threads at once.
 */
void qwDbgSimulateDead(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *rsped) {
  static int32_t ignoreTime = 0;

  if (!gQWDebug.deadSimulate) {
    return;
  }

  if (*rsped) {
    return;
  }

  // The counter stops at 10 instead of growing forever, so it can never
  // overflow on a long-running process.
  if (ignoreTime < 10) {
    ++ignoreTime;
    return;
  }

  if (0 != taosRand() % 9) {
    return;
  }

  SRpcHandleInfo *pConn = &ctx->ctrlConnInfo;
  if (TDMT_SCH_FETCH == ctx->msgType || TDMT_SCH_MERGE_FETCH == ctx->msgType) {
    pConn = &ctx->dataConnInfo;
  }

  qwBuildAndSendErrorRsp(ctx->msgType + 1, pConn, TSDB_CODE_RPC_BROKEN_LINK);

  qwBuildAndSendDropMsg(QW_FPARAMS(), pConn);
  *rsped = true;
}

D
dapan1121 已提交
272 273 274
int32_t qwDbgEnableDebug(char *option) {
  if (0 == strcasecmp(option, "lock")) {
    gQWDebug.lockEnable = true;
D
dapan1121 已提交
275
    qError("qw lock debug enabled");
D
dapan1121 已提交
276 277 278 279 280
    return TSDB_CODE_SUCCESS;
  }

  if (0 == strcasecmp(option, "status")) {
    gQWDebug.statusEnable = true;
D
dapan1121 已提交
281
    qError("qw status debug enabled");
D
dapan1121 已提交
282 283 284 285 286
    return TSDB_CODE_SUCCESS;
  }

  if (0 == strcasecmp(option, "dump")) {
    gQWDebug.dumpEnable = true;
D
dapan1121 已提交
287 288 289 290 291 292 293 294 295 296 297
    qError("qw dump debug enabled");
    return TSDB_CODE_SUCCESS;
  }

  if (0 == strcasecmp(option, "sleep")) {
    gQWDebug.sleepSimulate = true;
    qError("qw sleep debug enabled");
    return TSDB_CODE_SUCCESS;
  }

  if (0 == strcasecmp(option, "dead")) {
D
dapan1121 已提交
298
    gQWDebug.deadSimulate = true;
D
dapan1121 已提交
299
    qError("qw dead debug enabled");
D
dapan1121 已提交
300 301 302
    return TSDB_CODE_SUCCESS;
  }

D
dapan1121 已提交
303 304 305
  if (0 == strcasecmp(option, "redirect")) {
    gQWDebug.redirectSimulate = true;
    qError("qw redirect debug enabled");
D
dapan1121 已提交
306 307 308 309
    return TSDB_CODE_SUCCESS;
  }

  qError("invalid qw debug option:%s", option);
H
Hongze Cheng 已提交
310

D
dapan1121 已提交
311 312
  return TSDB_CODE_APP_ERROR;
}