qwDbg.c 8.9 KB
Newer Older
D
dapan1121 已提交
1 2 3 4 5 6
#include "dataSinkMgt.h"
#include "executor.h"
#include "planner.h"
#include "query.h"
#include "qwInt.h"
#include "qwMsg.h"
H
Hongze Cheng 已提交
7
#include "qworker.h"
D
dapan1121 已提交
8 9 10 11
#include "tcommon.h"
#include "tmsg.h"
#include "tname.h"

H
Hongze Cheng 已提交
12 13 14 15 16
// Global qworker debug switches. Status-transition validation is enabled by
// default; dump output and the redirect/dead/sleep simulations start disabled.
// Members not named in this initializer (e.g. lockEnable) are zero-initialized.
SQWDebug gQWDebug = {.statusEnable = true,
                     .dumpEnable = false,
                     .redirectSimulate = false,
                     .deadSimulate = false,
                     .sleepSimulate = false};
D
dapan1121 已提交
17 18 19 20 21 22 23 24 25

/*
 * Check whether a task status change from oriStatus to newStatus is allowed.
 *
 * When status debugging is disabled the check is skipped and success is
 * returned. Re-applying the same status is tolerated only for EXEC and FAIL:
 * in that case *ignore is set to true and success is returned. *ignore is not
 * written on any other path.
 *
 * Returns TSDB_CODE_SUCCESS for an allowed transition; otherwise the rejected
 * transition is logged and TSDB_CODE_QRY_APP_ERROR is returned.
 */
int32_t qwDbgValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus, bool *ignore) {
  int32_t code = 0;
  bool    allowed = false;

  if (!gQWDebug.statusEnable) {
    return TSDB_CODE_SUCCESS;
  }

  // Same-status update: only EXEC and FAIL may be repeated.
  if (oriStatus == newStatus) {
    if (JOB_TASK_STATUS_EXEC != newStatus && JOB_TASK_STATUS_FAIL != newStatus) {
      QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
    }

    *ignore = true;
    return TSDB_CODE_SUCCESS;
  }

  // For each origin status, list the destination statuses that are accepted.
  switch (oriStatus) {
    case JOB_TASK_STATUS_NULL:
      allowed = (JOB_TASK_STATUS_EXEC == newStatus) || (JOB_TASK_STATUS_FAIL == newStatus) ||
                (JOB_TASK_STATUS_INIT == newStatus);
      break;

    case JOB_TASK_STATUS_INIT:
      allowed = (JOB_TASK_STATUS_DROP == newStatus) || (JOB_TASK_STATUS_EXEC == newStatus) ||
                (JOB_TASK_STATUS_FAIL == newStatus);
      break;

    case JOB_TASK_STATUS_EXEC:
      allowed = (JOB_TASK_STATUS_PART_SUCC == newStatus) || (JOB_TASK_STATUS_SUCC == newStatus) ||
                (JOB_TASK_STATUS_FAIL == newStatus) || (JOB_TASK_STATUS_DROP == newStatus);
      break;

    case JOB_TASK_STATUS_PART_SUCC:
      allowed = (JOB_TASK_STATUS_EXEC == newStatus) || (JOB_TASK_STATUS_SUCC == newStatus) ||
                (JOB_TASK_STATUS_FAIL == newStatus) || (JOB_TASK_STATUS_DROP == newStatus);
      break;

    case JOB_TASK_STATUS_SUCC:
      allowed = (JOB_TASK_STATUS_DROP == newStatus) || (JOB_TASK_STATUS_FAIL == newStatus);
      break;

    case JOB_TASK_STATUS_FAIL:
      allowed = (JOB_TASK_STATUS_DROP == newStatus);
      break;

    case JOB_TASK_STATUS_DROP:
      allowed = (JOB_TASK_STATUS_FAIL == newStatus) || (JOB_TASK_STATUS_PART_SUCC == newStatus);
      break;

    default:
      // Unknown origin status: reported with its own message, bypassing _return.
      QW_TASK_ELOG("invalid task origStatus:%s", jobTaskStatusStr(oriStatus));
      return TSDB_CODE_QRY_APP_ERROR;
  }

  if (!allowed) {
    QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
  }

  return TSDB_CODE_SUCCESS;

_return:

  QW_TASK_ELOG("invalid task status update from %s to %s", jobTaskStatusStr(oriStatus), jobTaskStatusStr(newStatus));
  QW_RET(code);
}

D
dapan1121 已提交
92 93
/*
 * Log one scheduler's status line followed by one line per task recorded in
 * sch->tasksHash. The scheduler's task lock is held in read mode for the
 * whole dump.
 *
 * mgmt  qworker instance (not referenced directly by this function's code)
 * sch   scheduler status entry to dump
 * i     ordinal of the scheduler, used only in the log text
 */
void qwDbgDumpSchInfo(SQWorker *mgmt, SQWSchStatus *sch, int32_t i) {
  // qId/tId/eId are filled by QW_GET_QTID for every task before QW_TASK_DLOG.
  uint64_t qId, tId;
  int32_t  eId;

  QW_LOCK(QW_READ, &sch->tasksLock);

  int32_t taskNum = taosHashGetSize(sch->tasksHash);
  QW_DLOG("***The %dth scheduler status, hbBrokenTs:%" PRId64 ",taskNum:%d", i, sch->hbBrokenTs, taskNum);

  for (void *pIter = taosHashIterate(sch->tasksHash, NULL); pIter != NULL;
       pIter = taosHashIterate(sch->tasksHash, pIter)) {
    SQWTaskStatus *pTask = (SQWTaskStatus *)pIter;
    void          *key = taosHashGetKey(pIter, NULL);
    QW_GET_QTID(key, qId, tId, eId);

    QW_TASK_DLOG("job refId:%" PRIx64 ", code:%x, task status:%d", pTask->refId, pTask->code, pTask->status);
  }

  QW_UNLOCK(QW_READ, &sch->tasksLock);
}
D
dapan1121 已提交
113

D
dapan1121 已提交
114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140
/*
 * Log the number of task contexts remaining in mgmt->ctxHash and then one
 * detailed line per context (phase, flags, handles, table info, event
 * counters).
 *
 * mgmt  qworker instance whose context hash is dumped
 */
void qwDbgDumpTasksInfo(SQWorker *mgmt) {
  QW_DUMP("***Total remain ctx num %d", taosHashGetSize(mgmt->ctxHash));

  // qId/tId/eId are filled by QW_GET_QTID for every context before QW_TASK_DLOG.
  uint64_t qId, tId;
  int32_t  eId;

  for (void *pIter = taosHashIterate(mgmt->ctxHash, NULL); pIter != NULL;
       pIter = taosHashIterate(mgmt->ctxHash, pIter)) {
    SQWTaskCtx *ctx = (SQWTaskCtx *)pIter;
    void       *key = taosHashGetKey(pIter, NULL);
    QW_GET_QTID(key, qId, tId, eId);

    QW_TASK_DLOG("%p lock:%x, phase:%d, type:%d, explain:%d, needFetch:%d, localExec:%d, msgType:%d, fetchType:%d, "
      "execId:%x, level:%d, queryGotData:%d, queryRsped:%d, queryEnd:%d, queryContinue:%d, queryInQueue:%d, "
      "rspCode:%x, affectedRows:%" PRId64 ", taskHandle:%p, sinkHandle:%p, tbFName:%s, sver:%d, tver:%d, events:%d,%d,%d,%d,%d",
      ctx, ctx->lock, ctx->phase, ctx->taskType, ctx->explain, ctx->needFetch, ctx->localExec, ctx->msgType,
      ctx->fetchType, ctx->execId, ctx->level, ctx->queryGotData, ctx->queryRsped, ctx->queryEnd, ctx->queryContinue,
      ctx->queryInQueue, ctx->rspCode, ctx->affectedRows, ctx->taskHandle, ctx->sinkHandle, ctx->tbInfo.tbFName,
      ctx->tbInfo.sversion, ctx->tbInfo.tversion, ctx->events[QW_EVENT_CANCEL], ctx->events[QW_EVENT_READY],
      ctx->events[QW_EVENT_FETCH], ctx->events[QW_EVENT_DROP], ctx->events[QW_EVENT_CQUERY]);
  }
}

D
dapan1121 已提交
141 142 143 144 145 146 147
/*
 * Dump the qworker's scheduler table and task contexts to the log.
 *
 * Returns immediately unless dump debugging is enabled. Scheduler entries are
 * dumped while holding mgmt->schLock in read mode; the task-context dump runs
 * after that lock has been released.
 *
 * mgmt  qworker instance to dump
 */
void qwDbgDumpMgmtInfo(SQWorker *mgmt) {
  if (!gQWDebug.dumpEnable) {
    return;
  }

  QW_LOCK(QW_READ, &mgmt->schLock);

  QW_DUMP("total remain scheduler num %d", taosHashGetSize(mgmt->schHash));

  int32_t i = 0;
  for (void *pIter = taosHashIterate(mgmt->schHash, NULL); pIter != NULL;
       pIter = taosHashIterate(mgmt->schHash, pIter)) {
    SQWSchStatus *sch = (SQWSchStatus *)pIter;
    qwDbgDumpSchInfo(mgmt, sch, i);
    ++i;
  }

  QW_UNLOCK(QW_READ, &mgmt->schLock);

  qwDbgDumpTasksInfo(mgmt);
}

D
dapan1121 已提交
168 169
/*
 * Build a response message carrying `code` and, when pEpSet is non-NULL, the
 * serialized endpoint set as its body, then send it on the connection
 * described by pConn.
 *
 * rspType  message type of the response
 * pConn    connection info copied into the response
 * code     result code placed in the response
 * pEpSet   optional endpoint set; NULL sends a response with no body
 *
 * Returns TSDB_CODE_SUCCESS after the response has been handed to
 * tmsgSendRsp. If the endpoint set cannot be serialized or the body cannot be
 * allocated, nothing is sent and an error code is returned.
 */
int32_t qwDbgBuildAndSendRedirectRsp(int32_t rspType, SRpcHandleInfo *pConn, int32_t code, SEpSet *pEpSet) {
  int32_t contLen = 0;
  char   *rsp = NULL;

  if (pEpSet) {
    // First pass with a NULL buffer only computes the encoded size.
    contLen = tSerializeSEpSet(NULL, 0, pEpSet);
    if (contLen < 0) {
      qError("failed to get serialized epSet size, ret:%d", contLen);
      return TSDB_CODE_QRY_APP_ERROR;
    }

    rsp = rpcMallocCont(contLen);
    if (NULL == rsp) {
      qError("failed to allocate %d bytes for redirect rsp", contLen);
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    if (tSerializeSEpSet(rsp, contLen, pEpSet) < 0) {
      qError("failed to serialize epSet into redirect rsp");
      rpcFreeCont(rsp);
      return TSDB_CODE_QRY_APP_ERROR;
    }
  }

  SRpcMsg rpcRsp = {
      .msgType = rspType,
      .pCont = rsp,
      .contLen = contLen,
      .code = code,
      .info = *pConn,
  };
  // NOTE(review): hasEpSet is set even when pEpSet is NULL, as in the original — confirm this is intended.
  rpcRsp.info.hasEpSet = 1;

  tmsgSendRsp(&rpcRsp);

  qDebug("response %s msg, code: %s", TMSG_INFO(rspType), tstrerror(code));

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
194 195 196 197 198
/*
 * Debug aid: randomly answer a request with TSDB_CODE_RPC_REDIRECT.
 *
 * Nothing happens if a response was already sent (*rsped) or if redirect
 * simulation is disabled. The first 10 eligible calls are always let through.
 * After that:
 *   - TDMT_SCH_QUERY        redirected with probability 1/3, with a fixed
 *                           three-endpoint localhost epSet
 *   - TDMT_SCH_MERGE_QUERY  redirected with probability 1/3, no epSet
 *   - TDMT_SCH_FETCH        redirected with probability 1/9, no epSet
 * When a redirect is sent, *rsped is set to true.
 */
void qwDbgSimulateRedirect(SQWMsg *qwMsg, SQWTaskCtx *ctx, bool *rsped) {
  static int32_t ignoreTime = 0;

  if (*rsped || !gQWDebug.redirectSimulate) {
    return;
  }

  if (++ignoreTime <= 10) {
    return;
  }

  switch (qwMsg->msgType) {
    case TDMT_SCH_QUERY: {
      if (0 != taosRand() % 3) {
        break;
      }

      const uint16_t ports[] = {7100, 7200, 7300};
      SEpSet         epSet = {.inUse = 1, .numOfEps = 3};
      for (int32_t n = 0; n < 3; ++n) {
        strcpy(epSet.eps[n].fqdn, "localhost");
        epSet.eps[n].port = ports[n];
      }

      ctx->phase = QW_PHASE_POST_QUERY;
      qwDbgBuildAndSendRedirectRsp(qwMsg->msgType + 1, &qwMsg->connInfo, TSDB_CODE_RPC_REDIRECT, &epSet);
      *rsped = true;
      break;
    }

    case TDMT_SCH_MERGE_QUERY:
      if (0 != taosRand() % 3) {
        break;
      }

      QW_SET_PHASE(ctx, QW_PHASE_POST_QUERY);
      qwDbgBuildAndSendRedirectRsp(qwMsg->msgType + 1, &qwMsg->connInfo, TSDB_CODE_RPC_REDIRECT, NULL);
      *rsped = true;
      break;

    case TDMT_SCH_FETCH:
      if (0 != taosRand() % 9) {
        break;
      }

      qwDbgBuildAndSendRedirectRsp(qwMsg->msgType + 1, &qwMsg->connInfo, TSDB_CODE_RPC_REDIRECT, NULL);
      *rsped = true;
      break;

    default:
      break;
  }
}

D
dapan1121 已提交
237
void qwDbgSimulateSleep(void) {
D
dapan1121 已提交
238 239 240 241
  if (!gQWDebug.sleepSimulate) {
    return;
  }

D
dapan1121 已提交
242 243 244 245
  static int32_t ignoreTime = 0;
  if (++ignoreTime > 10) {
    taosSsleep(taosRand() % 20);
  }
D
dapan1121 已提交
246 247
}

D
dapan1121 已提交
248
/*
 * Debug aid: randomly pretend the task's link is broken.
 *
 * Nothing happens if dead simulation is disabled or a response was already
 * sent (*rsped). The first 10 eligible calls are let through; afterwards,
 * with probability 1/9, an error response with TSDB_CODE_RPC_BROKEN_LINK is
 * sent, a drop message is issued for the task, and *rsped is set to true.
 * Fetch-type messages use the context's data connection, all others its
 * control connection.
 */
void qwDbgSimulateDead(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *rsped) {
  static int32_t ignoreTime = 0;

  if (!gQWDebug.deadSimulate || *rsped) {
    return;
  }

  if (++ignoreTime <= 10) {
    return;
  }

  // Random draw happens only once the warm-up calls are over.
  if (0 != taosRand() % 9) {
    return;
  }

  SRpcHandleInfo *pConn = &ctx->ctrlConnInfo;
  if (TDMT_SCH_FETCH == ctx->msgType || TDMT_SCH_MERGE_FETCH == ctx->msgType) {
    pConn = &ctx->dataConnInfo;
  }

  qwBuildAndSendErrorRsp(ctx->msgType + 1, pConn, TSDB_CODE_RPC_BROKEN_LINK);

  qwBuildAndSendDropMsg(QW_FPARAMS(), pConn);
  *rsped = true;
}

D
dapan1121 已提交
272 273 274
int32_t qwDbgEnableDebug(char *option) {
  if (0 == strcasecmp(option, "lock")) {
    gQWDebug.lockEnable = true;
D
dapan1121 已提交
275
    qError("qw lock debug enabled");
D
dapan1121 已提交
276 277 278 279 280
    return TSDB_CODE_SUCCESS;
  }

  if (0 == strcasecmp(option, "status")) {
    gQWDebug.statusEnable = true;
D
dapan1121 已提交
281
    qError("qw status debug enabled");
D
dapan1121 已提交
282 283 284 285 286
    return TSDB_CODE_SUCCESS;
  }

  if (0 == strcasecmp(option, "dump")) {
    gQWDebug.dumpEnable = true;
D
dapan1121 已提交
287 288 289 290 291 292 293 294 295 296 297
    qError("qw dump debug enabled");
    return TSDB_CODE_SUCCESS;
  }

  if (0 == strcasecmp(option, "sleep")) {
    gQWDebug.sleepSimulate = true;
    qError("qw sleep debug enabled");
    return TSDB_CODE_SUCCESS;
  }

  if (0 == strcasecmp(option, "dead")) {
D
dapan1121 已提交
298
    gQWDebug.deadSimulate = true;
D
dapan1121 已提交
299
    qError("qw dead debug enabled");
D
dapan1121 已提交
300 301 302
    return TSDB_CODE_SUCCESS;
  }

D
dapan1121 已提交
303 304 305
  if (0 == strcasecmp(option, "redirect")) {
    gQWDebug.redirectSimulate = true;
    qError("qw redirect debug enabled");
D
dapan1121 已提交
306 307 308 309
    return TSDB_CODE_SUCCESS;
  }

  qError("invalid qw debug option:%s", option);
H
Hongze Cheng 已提交
310

D
dapan1121 已提交
311 312
  return TSDB_CODE_APP_ERROR;
}