vnodeMgmt.c 3.6 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
#include "os.h"
#include "vnodeMain.h"
#include "vnodeMgmt.h"
#include "vnodeMgmtMsg.h"

typedef struct {
  SRpcMsg rpcMsg;
  char    pCont[];
} SVnMgmtMsg;

static struct {
  SWorkerPool pool;
  taos_queue  pQueue;
  int32_t (*msgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *);
} tsVmgmt = {0};

static int32_t vnodeProcessMgmtStart(void *unused, SVnMgmtMsg *pMgmt, int32_t qtype) {
  SRpcMsg *pMsg = &pMgmt->rpcMsg;
  int32_t  msgType = pMsg->msgType;

  if (tsVmgmt.msgFp[msgType]) {
    vTrace("msg:%p, ahandle:%p type:%s will be processed", pMgmt, pMsg->ahandle, taosMsg[msgType]);
    return (*tsVmgmt.msgFp[msgType])(pMsg);
  } else {
    vError("msg:%p, ahandle:%p type:%s not processed since no handle", pMgmt, pMsg->ahandle, taosMsg[msgType]);
    return TSDB_CODE_DND_MSG_NOT_PROCESSED;
  }
}

static void vnodeSendMgmtEnd(void *unused, SVnMgmtMsg *pMgmt, int32_t qtype, int32_t code) {
  SRpcMsg *pMsg = &pMgmt->rpcMsg;
  SRpcMsg  rsp = {0};

  rsp.code = code;
  vTrace("msg:%p, is processed, code:0x%x", pMgmt, rsp.code);
  if (rsp.code != TSDB_CODE_DND_ACTION_IN_PROGRESS) {
    rsp.handle = pMsg->handle;
    rsp.pCont = NULL;
    rpcSendResponse(&rsp);
  }

  taosFreeQitem(pMsg);
}

static void vnodeInitMgmtReqFp() {
  tsVmgmt.msgFp[TSDB_MSG_TYPE_MD_CREATE_VNODE] = vnodeProcessCreateVnodeMsg;
  tsVmgmt.msgFp[TSDB_MSG_TYPE_MD_ALTER_VNODE]  = vnodeProcessAlterVnodeMsg;
  tsVmgmt.msgFp[TSDB_MSG_TYPE_MD_SYNC_VNODE]   = vnodeProcessSyncVnodeMsg;
  tsVmgmt.msgFp[TSDB_MSG_TYPE_MD_COMPACT_VNODE]= vnodeProcessCompactVnodeMsg;
  tsVmgmt.msgFp[TSDB_MSG_TYPE_MD_DROP_VNODE]   = vnodeProcessDropVnodeMsg;
  tsVmgmt.msgFp[TSDB_MSG_TYPE_MD_ALTER_STREAM] = vnodeProcessAlterStreamReq;
}

static int32_t vnodeWriteToMgmtQueue(SRpcMsg *pMsg) {
  int32_t     size = sizeof(SVnMgmtMsg) + pMsg->contLen;
  SVnMgmtMsg *pMgmt = taosAllocateQitem(size);
  if (pMgmt == NULL) return TSDB_CODE_DND_OUT_OF_MEMORY;

  pMgmt->rpcMsg = *pMsg;
  pMgmt->rpcMsg.pCont = pMgmt->pCont;
  memcpy(pMgmt->pCont, pMsg->pCont, pMsg->contLen);
  taosWriteQitem(tsVmgmt.pQueue, TAOS_QTYPE_RPC, pMgmt);

  return TSDB_CODE_SUCCESS;
}

void vnodeProcessMgmtMsg(SRpcMsg *pMsg) {
  int32_t code = vnodeWriteToMgmtQueue(pMsg);
  if (code != TSDB_CODE_SUCCESS) {
    SRpcMsg rsp = {.handle = pMsg->handle, .code = code};
    rpcSendResponse(&rsp);
  }

  rpcFreeCont(pMsg->pCont);
}

int32_t vnodeInitMgmt() {
  vnodeInitMgmtReqFp();

  SWorkerPool *pPool = &tsVmgmt.pool;
  pPool->name = "vmgmt";
  pPool->startFp = (ProcessStartFp)vnodeProcessMgmtStart;
  pPool->endFp = (ProcessEndFp)vnodeSendMgmtEnd;
  pPool->min = 1;
  pPool->max = 1;
  if (tWorkerInit(pPool) != 0) {
    return TSDB_CODE_VND_OUT_OF_MEMORY;
  }

  tsVmgmt.pQueue = tWorkerAllocQueue(pPool, NULL);

  vInfo("vmgmt is initialized, max worker %d", pPool->max);
  return TSDB_CODE_SUCCESS;
}

void vnodeCleanupMgmt() {
  tWorkerFreeQueue(&tsVmgmt.pool, tsVmgmt.pQueue);
  tWorkerCleanup(&tsVmgmt.pool);
  tsVmgmt.pQueue = NULL;
  vInfo("vmgmt is closed");
}