qwDbg.c 6.3 KB
Newer Older
D
dapan1121 已提交
1 2 3 4 5 6 7 8 9 10 11
#include "qworker.h"
#include "dataSinkMgt.h"
#include "executor.h"
#include "planner.h"
#include "query.h"
#include "qwInt.h"
#include "qwMsg.h"
#include "tcommon.h"
#include "tmsg.h"
#include "tname.h"

D
dapan1121 已提交
12
SQWDebug     gQWDebug = {.statusEnable = true, .dumpEnable = true, .tmp = false};
D
dapan1121 已提交
13 14 15 16 17 18 19 20 21

/*
 * Debug-only sanity check for a task status transition (oriStatus -> newStatus).
 *
 * When gQWDebug.statusEnable is off this does nothing and succeeds. Otherwise:
 *   - a repeated update to EXEC or FAIL is tolerated: *ignore is set to true
 *     and TSDB_CODE_SUCCESS is returned (presumably so the caller skips the
 *     redundant update -- confirm at call sites);
 *   - any other repeated update, or a transition not listed below, is logged
 *     as "invalid task status update" and rejected with
 *     TSDB_CODE_QRY_APP_ERROR (via QW_ERR_JRET / QW_RET);
 *   - an unrecognized oriStatus is logged as "invalid task origStatus" and
 *     rejected with TSDB_CODE_QRY_APP_ERROR.
 *
 * Permitted transitions:
 *   NULL      -> INIT, EXEC, FAIL
 *   INIT      -> EXEC, FAIL, DROP
 *   EXEC      -> PART_SUCC, SUCC, FAIL, DROP
 *   PART_SUCC -> EXEC, SUCC, FAIL, DROP
 *   SUCC      -> FAIL, DROP
 *   FAIL      -> DROP
 *   DROP      -> FAIL, PART_SUCC
 *
 * *ignore is written only in the tolerated repeated-update case; it is never
 * cleared here, so callers must initialise it themselves.
 */
int32_t qwDbgValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus, bool *ignore) {
  int32_t code = TSDB_CODE_SUCCESS;
  bool    allowed = false;

  if (!gQWDebug.statusEnable) {
    return TSDB_CODE_SUCCESS;
  }

  if (newStatus == oriStatus) {
    bool repeatable = (JOB_TASK_STATUS_EXEC == newStatus) || (JOB_TASK_STATUS_FAIL == newStatus);
    if (!repeatable) {
      QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
    }

    *ignore = true;
    return TSDB_CODE_SUCCESS;
  }

  switch (oriStatus) {
    case JOB_TASK_STATUS_NULL:
      allowed = (JOB_TASK_STATUS_INIT == newStatus) || (JOB_TASK_STATUS_EXEC == newStatus) ||
                (JOB_TASK_STATUS_FAIL == newStatus);
      break;
    case JOB_TASK_STATUS_INIT:
      allowed = (JOB_TASK_STATUS_EXEC == newStatus) || (JOB_TASK_STATUS_FAIL == newStatus) ||
                (JOB_TASK_STATUS_DROP == newStatus);
      break;
    case JOB_TASK_STATUS_EXEC:
      allowed = (JOB_TASK_STATUS_PART_SUCC == newStatus) || (JOB_TASK_STATUS_SUCC == newStatus) ||
                (JOB_TASK_STATUS_FAIL == newStatus) || (JOB_TASK_STATUS_DROP == newStatus);
      break;
    case JOB_TASK_STATUS_PART_SUCC:
      allowed = (JOB_TASK_STATUS_EXEC == newStatus) || (JOB_TASK_STATUS_SUCC == newStatus) ||
                (JOB_TASK_STATUS_FAIL == newStatus) || (JOB_TASK_STATUS_DROP == newStatus);
      break;
    case JOB_TASK_STATUS_SUCC:
      allowed = (JOB_TASK_STATUS_FAIL == newStatus) || (JOB_TASK_STATUS_DROP == newStatus);
      break;
    case JOB_TASK_STATUS_FAIL:
      allowed = (JOB_TASK_STATUS_DROP == newStatus);
      break;
    case JOB_TASK_STATUS_DROP:
      allowed = (JOB_TASK_STATUS_FAIL == newStatus) || (JOB_TASK_STATUS_PART_SUCC == newStatus);
      break;
    default:
      // Unknown origin status: reported separately and not routed through _return.
      QW_TASK_ELOG("invalid task origStatus:%s", jobTaskStatusStr(oriStatus));
      return TSDB_CODE_QRY_APP_ERROR;
  }

  if (allowed) {
    return TSDB_CODE_SUCCESS;
  }

  QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);

_return:

  QW_TASK_ELOG("invalid task status update from %s to %s", jobTaskStatusStr(oriStatus), jobTaskStatusStr(newStatus));
  QW_RET(code);
}

D
dapan1121 已提交
90 91 92 93 94
void qwDbgDumpSchInfo(SQWorker *mgmt, SQWSchStatus *sch, int32_t i) {
  QW_LOCK(QW_READ, &sch->tasksLock);
  QW_DLOG("the %dth scheduler status, hbBrokenTs:%" PRId64 ",taskNum:%d", i, sch->hbBrokenTs, taosHashGetSize(sch->tasksHash));
  QW_UNLOCK(QW_READ, &sch->tasksLock);
}
D
dapan1121 已提交
95 96 97 98 99 100 101 102

void qwDbgDumpMgmtInfo(SQWorker *mgmt) {
  if (!gQWDebug.dumpEnable) {
    return;
  }

  QW_LOCK(QW_READ, &mgmt->schLock);

D
dapan1121 已提交
103
  QW_DUMP("total remain scheduler num %d", taosHashGetSize(mgmt->schHash));
D
dapan1121 已提交
104 105 106 107 108 109 110 111 112

  void         *key = NULL;
  size_t        keyLen = 0;
  int32_t       i = 0;
  SQWSchStatus *sch = NULL;

  void *pIter = taosHashIterate(mgmt->schHash, NULL);
  while (pIter) {
    sch = (SQWSchStatus *)pIter;
D
dapan1121 已提交
113
    qwDbgDumpSchInfo(mgmt, sch, i);
D
dapan1121 已提交
114 115 116 117 118 119
    ++i;
    pIter = taosHashIterate(mgmt->schHash, pIter);
  }

  QW_UNLOCK(QW_READ, &mgmt->schLock);

D
dapan1121 已提交
120
  QW_DUMP("total remain ctx num %d", taosHashGetSize(mgmt->ctxHash));
D
dapan1121 已提交
121 122 123
}


D
dapan1121 已提交
124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140
int32_t qwDbgBuildAndSendRedirectRsp(int32_t rspType, SRpcHandleInfo *pConn, int32_t code, SEpSet *pEpSet) {
  int32_t contLen = 0;
  char* rsp = NULL;
  
  if (pEpSet) {
    contLen = tSerializeSEpSet(NULL, 0, pEpSet);
    rsp = rpcMallocCont(contLen);
    tSerializeSEpSet(rsp, contLen, pEpSet);
  }

  SRpcMsg rpcRsp = {
      .msgType = rspType,
      .pCont = rsp,
      .contLen = contLen,
      .code = code,
      .info = *pConn,
  };
D
dapan1121 已提交
141
  rpcRsp.info.hasEpSet = 1;
D
dapan1121 已提交
142 143 144 145 146 147 148 149

  tmsgSendRsp(&rpcRsp);

  qDebug("response %s msg, code: %s", TMSG_INFO(rspType), tstrerror(code));

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
150
int32_t qwDbgResponseRedirect(SQWMsg *qwMsg, SQWTaskCtx *ctx) {
D
dapan1121 已提交
151
  if (gQWDebug.tmp) {
D
dapan1121 已提交
152
    if (TDMT_SCH_QUERY == qwMsg->msgType && (0 == taosRand() % 3)) {
D
dapan1121 已提交
153 154 155 156 157 158 159 160 161
      SEpSet epSet = {0};
      epSet.inUse = 1;
      epSet.numOfEps = 3;
      strcpy(epSet.eps[0].fqdn, "localhost");
      epSet.eps[0].port = 7100;
      strcpy(epSet.eps[1].fqdn, "localhost");
      epSet.eps[1].port = 7200;
      strcpy(epSet.eps[2].fqdn, "localhost");
      epSet.eps[2].port = 7300;
D
dapan1121 已提交
162 163

      ctx->phase = QW_PHASE_POST_QUERY;      
D
dapan1121 已提交
164 165 166 167
      qwDbgBuildAndSendRedirectRsp(qwMsg->msgType + 1, &qwMsg->connInfo, TSDB_CODE_RPC_REDIRECT, &epSet);
      return TSDB_CODE_SUCCESS;
    }
    
D
dapan1121 已提交
168
    if (TDMT_SCH_MERGE_QUERY == qwMsg->msgType && (0 == taosRand() % 3)) {
D
dapan1121 已提交
169
      QW_SET_PHASE(ctx, QW_PHASE_POST_QUERY);
D
dapan1121 已提交
170 171 172 173 174 175 176 177
      qwDbgBuildAndSendRedirectRsp(qwMsg->msgType + 1, &qwMsg->connInfo, TSDB_CODE_RPC_REDIRECT, NULL);
      return TSDB_CODE_SUCCESS;
    }
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
178 179 180 181 182 183 184 185 186 187 188 189 190
void qwDbgSimulateSleep() {
  if (!gQWDebug.sleepSimulate) {
    return;
  }

  taosSsleep(taosRand() % 10);
}

/*
 * Debug-only fault injection, active only when gQWDebug.deadSimulate is set.
 *
 * Answers the message with a TSDB_CODE_RPC_BROKEN_LINK error response of type
 * msgType + 1, then drops the task with qwDropTask. Fetch messages
 * (TDMT_SCH_FETCH / TDMT_SCH_MERGE_FETCH) are answered on the task's data
 * connection; all other messages use its control connection. The return
 * values of qwBuildAndSendErrorRsp and qwDropTask are not checked.
 */
void qwDbgSimulateDead(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t msgType) {
  if (gQWDebug.deadSimulate) {
    bool            isFetch = (TDMT_SCH_FETCH == msgType) || (TDMT_SCH_MERGE_FETCH == msgType);
    SRpcHandleInfo *conn = isFetch ? &ctx->dataConnInfo : &ctx->ctrlConnInfo;

    qwBuildAndSendErrorRsp(msgType + 1, conn, TSDB_CODE_RPC_BROKEN_LINK);

    qwDropTask(QW_FPARAMS());
  }
}


D
dapan1121 已提交
198 199 200 201

int32_t qwDbgEnableDebug(char *option) {
  if (0 == strcasecmp(option, "lock")) {
    gQWDebug.lockEnable = true;
D
dapan1121 已提交
202
    qError("qw lock debug enabled");
D
dapan1121 已提交
203 204 205 206 207
    return TSDB_CODE_SUCCESS;
  }

  if (0 == strcasecmp(option, "status")) {
    gQWDebug.statusEnable = true;
D
dapan1121 已提交
208
    qError("qw status debug enabled");
D
dapan1121 已提交
209 210 211 212 213
    return TSDB_CODE_SUCCESS;
  }

  if (0 == strcasecmp(option, "dump")) {
    gQWDebug.dumpEnable = true;
D
dapan1121 已提交
214 215 216 217 218 219 220 221 222 223 224 225 226
    qError("qw dump debug enabled");
    return TSDB_CODE_SUCCESS;
  }

  if (0 == strcasecmp(option, "sleep")) {
    gQWDebug.sleepSimulate = true;
    qError("qw sleep debug enabled");
    return TSDB_CODE_SUCCESS;
  }

  if (0 == strcasecmp(option, "dead")) {
    gQWDebug.sleepSimulate = true;
    qError("qw dead debug enabled");
D
dapan1121 已提交
227 228 229 230 231
    return TSDB_CODE_SUCCESS;
  }

  if (0 == strcasecmp(option, "tmp")) {
    gQWDebug.tmp = true;
D
dapan1121 已提交
232
    qError("qw tmp debug enabled");
D
dapan1121 已提交
233 234 235 236 237 238 239 240 241
    return TSDB_CODE_SUCCESS;
  }

  qError("invalid qw debug option:%s", option);
  
  return TSDB_CODE_APP_ERROR;
}