schUtil.c 8.1 KB
Newer Older
D
dapan1121 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "catalog.h"
#include "command.h"
#include "query.h"
D
dapan1121 已提交
19
#include "schInt.h"
D
dapan1121 已提交
20 21 22 23
#include "tmsg.h"
#include "tref.h"
#include "trpc.h"

H
Hongze Cheng 已提交
24 25 26
FORCE_INLINE SSchJob *schAcquireJob(int64_t refId) {
  qDebug("sch acquire jobId:0x%" PRIx64, refId);
  return (SSchJob *)taosAcquireRef(schMgmt.jobRef, refId);
D
dapan1121 已提交
27 28
}

H
Hongze Cheng 已提交
29
FORCE_INLINE int32_t schReleaseJob(int64_t refId) {
D
dapan1121 已提交
30 31 32
  if (0 == refId) {
    return TSDB_CODE_SUCCESS;
  }
H
Hongze Cheng 已提交
33 34 35

  qDebug("sch release jobId:0x%" PRIx64, refId);
  return taosReleaseRef(schMgmt.jobRef, refId);
D
dapan1121 已提交
36 37
}

H
Hongze Cheng 已提交
38
char *schGetOpStr(SCH_OP_TYPE type) {
D
dapan1121 已提交
39 40 41 42 43 44 45
  switch (type) {
    case SCH_OP_NULL:
      return "NULL";
    case SCH_OP_EXEC:
      return "EXEC";
    case SCH_OP_FETCH:
      return "FETCH";
D
dapan1121 已提交
46 47
    case SCH_OP_GET_STATUS:
      return "GET STATUS";
D
dapan1121 已提交
48 49 50 51
    default:
      return "UNKNOWN";
  }
}
D
dapan1121 已提交
52

D
dapan1121 已提交
53 54 55
void schFreeHbTrans(SSchHbTrans *pTrans) {
  rpcReleaseHandle(pTrans->trans.pHandle, TAOS_CONN_CLIENT);

H
Hongze Cheng 已提交
56
  schFreeRpcCtx(&pTrans->rpcCtx);
D
dapan1121 已提交
57 58
}

H
Hongze Cheng 已提交
59
void schCleanClusterHb(void *pTrans) {
D
dapan1121 已提交
60 61 62 63 64
  SCH_LOCK(SCH_WRITE, &schMgmt.hbLock);

  SSchHbTrans *hb = taosHashIterate(schMgmt.hbConnections, NULL);
  while (hb) {
    if (hb->trans.pTrans == pTrans) {
H
Hongze Cheng 已提交
65
      SQueryNodeEpId *pEpId = taosHashGetKey(hb, NULL);
D
dapan1121 已提交
66
      schFreeHbTrans(hb);
D
dapan1121 已提交
67 68
      taosHashRemove(schMgmt.hbConnections, pEpId, sizeof(SQueryNodeEpId));
    }
H
Hongze Cheng 已提交
69

D
dapan1121 已提交
70 71
    hb = taosHashIterate(schMgmt.hbConnections, hb);
  }
H
Hongze Cheng 已提交
72

D
dapan1121 已提交
73 74 75 76
  SCH_UNLOCK(SCH_WRITE, &schMgmt.hbLock);
}

int32_t schRemoveHbConnection(SSchJob *pJob, SSchTask *pTask, SQueryNodeEpId *epId) {
H
Hongze Cheng 已提交
77
  int32_t code = 0;
D
dapan1121 已提交
78 79 80 81 82 83 84 85 86 87 88

  SCH_LOCK(SCH_WRITE, &schMgmt.hbLock);
  SSchHbTrans *hb = taosHashGet(schMgmt.hbConnections, epId, sizeof(SQueryNodeEpId));
  if (NULL == hb) {
    SCH_UNLOCK(SCH_WRITE, &schMgmt.hbLock);
    SCH_TASK_ELOG("nodeId %d fqdn %s port %d not in hb connections", epId->nodeId, epId->ep.fqdn, epId->ep.port);
    return TSDB_CODE_SUCCESS;
  }

  int64_t taskNum = atomic_load_64(&hb->taskNum);
  if (taskNum <= 0) {
D
dapan1121 已提交
89
    schFreeHbTrans(hb);
D
dapan1121 已提交
90 91 92 93 94 95 96 97 98 99 100
    taosHashRemove(schMgmt.hbConnections, epId, sizeof(SQueryNodeEpId));
  }
  SCH_UNLOCK(SCH_WRITE, &schMgmt.hbLock);

  return TSDB_CODE_SUCCESS;
}

int32_t schAddHbConnection(SSchJob *pJob, SSchTask *pTask, SQueryNodeEpId *epId, bool *exist) {
  int32_t     code = 0;
  SSchHbTrans hb = {0};

D
dapan1121 已提交
101
  hb.trans.pTrans = pJob->conn.pTrans;
D
dapan1121 已提交
102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156
  hb.taskNum = 1;

  SCH_ERR_RET(schMakeHbRpcCtx(pJob, pTask, &hb.rpcCtx));

  SCH_LOCK(SCH_WRITE, &schMgmt.hbLock);
  code = taosHashPut(schMgmt.hbConnections, epId, sizeof(SQueryNodeEpId), &hb, sizeof(SSchHbTrans));
  if (code) {
    SCH_UNLOCK(SCH_WRITE, &schMgmt.hbLock);
    schFreeRpcCtx(&hb.rpcCtx);

    if (HASH_NODE_EXIST(code)) {
      *exist = true;
      return TSDB_CODE_SUCCESS;
    }

    qError("taosHashPut hb trans failed, nodeId:%d, fqdn:%s, port:%d", epId->nodeId, epId->ep.fqdn, epId->ep.port);
    SCH_ERR_RET(code);
  }

  SCH_UNLOCK(SCH_WRITE, &schMgmt.hbLock);

  return TSDB_CODE_SUCCESS;
}

int32_t schRegisterHbConnection(SSchJob *pJob, SSchTask *pTask, SQueryNodeEpId *pEpId) {
  SSchHbTrans *hb = NULL;

  while (true) {
    SCH_LOCK(SCH_READ, &schMgmt.hbLock);
    hb = taosHashGet(schMgmt.hbConnections, pEpId, sizeof(SQueryNodeEpId));
    if (NULL == hb) {
      bool exist = false;
      SCH_UNLOCK(SCH_READ, &schMgmt.hbLock);
      SCH_ERR_RET(schAddHbConnection(pJob, pTask, pEpId, &exist));
      if (!exist) {
        SCH_RET(schBuildAndSendHbMsg(pEpId, NULL));
      }

      continue;
    }

    break;
  }

  atomic_add_fetch_64(&hb->taskNum, 1);

  SCH_UNLOCK(SCH_READ, &schMgmt.hbLock);

  return TSDB_CODE_SUCCESS;
}

void schDeregisterTaskHb(SSchJob *pJob, SSchTask *pTask) {
  if (!pTask->registerdHb) {
    return;
  }
H
Hongze Cheng 已提交
157

D
dapan1121 已提交
158 159 160 161 162
  SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
  SQueryNodeEpId  epId = {0};

  epId.nodeId = addr->nodeId;

H
Hongze Cheng 已提交
163
  SEp *pEp = SCH_GET_CUR_EP(addr);
D
dapan1121 已提交
164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181
  strcpy(epId.ep.fqdn, pEp->fqdn);
  epId.ep.port = pEp->port;

  SCH_LOCK(SCH_READ, &schMgmt.hbLock);
  SSchHbTrans *hb = taosHashGet(schMgmt.hbConnections, &epId, sizeof(SQueryNodeEpId));
  if (NULL == hb) {
    SCH_UNLOCK(SCH_READ, &schMgmt.hbLock);
    SCH_TASK_WLOG("nodeId %d fqdn %s port %d not in hb connections", epId.nodeId, epId.ep.fqdn, epId.ep.port);
    return;
  }

  int64_t taskNum = atomic_sub_fetch_64(&hb->taskNum, 1);
  if (0 == taskNum) {
    SCH_UNLOCK(SCH_READ, &schMgmt.hbLock);
    schRemoveHbConnection(pJob, pTask, &epId);
  } else {
    SCH_UNLOCK(SCH_READ, &schMgmt.hbLock);
  }
H
Hongze Cheng 已提交
182

D
dapan1121 已提交
183 184 185 186 187 188 189 190 191
  pTask->registerdHb = false;
}

int32_t schEnsureHbConnection(SSchJob *pJob, SSchTask *pTask) {
  SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
  SQueryNodeEpId  epId = {0};

  epId.nodeId = addr->nodeId;

H
Hongze Cheng 已提交
192
  SEp *pEp = SCH_GET_CUR_EP(addr);
D
dapan1121 已提交
193 194 195 196
  strcpy(epId.ep.fqdn, pEp->fqdn);
  epId.ep.port = pEp->port;

  SCH_ERR_RET(schRegisterHbConnection(pJob, pTask, &epId));
H
Hongze Cheng 已提交
197

D
dapan1121 已提交
198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219
  pTask->registerdHb = true;

  return TSDB_CODE_SUCCESS;
}

int32_t schUpdateHbConnection(SQueryNodeEpId *epId, SSchTrans *trans) {
  int32_t      code = 0;
  SSchHbTrans *hb = NULL;

  SCH_LOCK(SCH_READ, &schMgmt.hbLock);
  hb = taosHashGet(schMgmt.hbConnections, epId, sizeof(SQueryNodeEpId));
  if (NULL == hb) {
    SCH_UNLOCK(SCH_READ, &schMgmt.hbLock);
    qError("taosHashGet hb connection failed, nodeId:%d, fqdn:%s, port:%d", epId->nodeId, epId->ep.fqdn, epId->ep.port);
    SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
  }

  SCH_LOCK(SCH_WRITE, &hb->lock);
  memcpy(&hb->trans, trans, sizeof(*trans));
  SCH_UNLOCK(SCH_WRITE, &hb->lock);
  SCH_UNLOCK(SCH_READ, &schMgmt.hbLock);

D
dapan1121 已提交
220
  qDebug("hb connection updated, sId:0x%" PRIx64 ", nodeId:%d, fqdn:%s, port:%d, pTrans:%p, pHandle:%p", schMgmt.sId,
D
dapan1121 已提交
221 222 223 224 225
         epId->nodeId, epId->ep.fqdn, epId->ep.port, trans->pTrans, trans->pHandle);

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
226 227 228 229 230
void schCloseJobRef(void) {
  if (!atomic_load_8((int8_t *)&schMgmt.exit)) {
    return;
  }

D
dapan1121 已提交
231
  if (schMgmt.jobRef >= 0) {
D
dapan1121 已提交
232 233 234 235 236 237 238 239 240
    taosCloseRef(schMgmt.jobRef);
    schMgmt.jobRef = -1;
  }
}

uint64_t schGenTaskId(void) { return atomic_add_fetch_64(&schMgmt.taskId, 1); }

uint64_t schGenUUID(void) {
  static uint64_t hashId = 0;
H
Hongze Cheng 已提交
241
  static int32_t  requestSerialId = 0;
D
dapan1121 已提交
242 243

  if (hashId == 0) {
244
    char    uid[64] = {0};
D
dapan1121 已提交
245 246 247 248 249 250 251 252
    int32_t code = taosGetSystemUUID(uid, tListLen(uid));
    if (code != TSDB_CODE_SUCCESS) {
      qError("Failed to get the system uid, reason:%s", tstrerror(TAOS_SYSTEM_ERROR(errno)));
    } else {
      hashId = MurmurHash3_32(uid, strlen(uid));
    }
  }

H
Hongze Cheng 已提交
253 254 255
  int64_t  ts = taosGetTimestampMs();
  uint64_t pid = taosGetPId();
  int32_t  val = atomic_add_fetch_32(&requestSerialId, 1);
D
dapan1121 已提交
256 257 258 259 260 261 262 263 264 265 266

  uint64_t id = ((hashId & 0x0FFF) << 52) | ((pid & 0x0FFF) << 40) | ((ts & 0xFFFFFF) << 16) | (val & 0xFFFF);
  return id;
}

void schFreeRpcCtxVal(const void *arg) {
  if (NULL == arg) {
    return;
  }

  SMsgSendInfo *pMsgSendInfo = (SMsgSendInfo *)arg;
D
dapan1121 已提交
267
  destroySendMsgInfo(pMsgSendInfo);
D
dapan1121 已提交
268 269 270 271 272 273 274 275 276 277
}

void schFreeRpcCtx(SRpcCtx *pCtx) {
  if (NULL == pCtx) {
    return;
  }
  void *pIter = taosHashIterate(pCtx->args, NULL);
  while (pIter) {
    SRpcCtxVal *ctxVal = (SRpcCtxVal *)pIter;

D
dapan1121 已提交
278
    (*pCtx->freeFunc)(ctxVal->val);
D
dapan1121 已提交
279 280 281 282 283 284

    pIter = taosHashIterate(pCtx->args, pIter);
  }

  taosHashCleanup(pCtx->args);

D
dapan1121 已提交
285 286 287
  if (pCtx->freeFunc) {
    (*pCtx->freeFunc)(pCtx->brokenVal.val);
  }
D
dapan1121 已提交
288 289
}

D
dapan1121 已提交
290 291 292 293 294 295 296 297 298 299 300 301 302 303 304
int32_t schGetTaskFromList(SHashObj *pTaskList, uint64_t taskId, SSchTask **pTask) {
  int32_t s = taosHashGetSize(pTaskList);
  if (s <= 0) {
    return TSDB_CODE_SUCCESS;
  }

  SSchTask **task = taosHashGet(pTaskList, &taskId, sizeof(taskId));
  if (NULL == task || NULL == (*task)) {
    return TSDB_CODE_SUCCESS;
  }

  *pTask = *task;

  return TSDB_CODE_SUCCESS;
}