mmWorker.c 4.2 KB
Newer Older
S
shm  
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
S
shm  
Shengliang Guan 已提交
17
#include "mmInt.h"
S
shm  
Shengliang Guan 已提交
18

S
shm  
Shengliang Guan 已提交
19
#include "dmInt.h"
S
shm  
Shengliang Guan 已提交
20 21 22
#include "dndTransport.h"
#include "dndWorker.h"

S
shm  
Shengliang Guan 已提交
23
/**
 * Worker callback: process one queued mnode message and then release it.
 *
 * The mnode is obtained through mmAcquire(); when that succeeds the message is
 * handed to mndProcessMsg() and the mnode is released again. If the message's
 * msgType has its lowest bit set (treated as a request) and it carries an RPC
 * handle, a response is sent through dndSendRsp():
 *   - on success (code == 0) with the response payload stored in the message;
 *   - on failure with terrno as the code, unless terrno is
 *     TSDB_CODE_MND_ACTION_IN_PROGRESS, in which case no response is sent here.
 *
 * The RPC content and the queue item are freed on every path, including
 * requests that have no handle to answer (previously that path returned early
 * and leaked both).
 *
 * @param pMgmt  mnode management object that owns the worker.
 * @param pMsg   queued message; always freed by this function.
 */
static void mmProcessQueue(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) {
  dTrace("msg:%p, will be processed", pMsg);
  SMnode  *pMnode = mmAcquire(pMgmt);
  SRpcMsg *pRpc = &pMsg->rpcMsg;
  bool     isReq = (pRpc->msgType & 1U);
  int32_t  code = -1;

  if (pMnode != NULL) {
    pMsg->pNode = pMnode;
    code = mndProcessMsg(pMsg);
    mmRelease(pMgmt, pMnode);
  }

  // Only a request that still has a handle can be answered; everything else
  // falls straight through to the cleanup below.
  if (isReq && pRpc->handle != NULL) {
    if (code == 0) {
      SRpcMsg rsp = {.handle = pRpc->handle, .contLen = pMsg->rspLen, .pCont = pMsg->pRsp};
      dndSendRsp(pMgmt->pWrapper, &rsp);
    } else if (terrno != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
      SRpcMsg rsp = {.handle = pRpc->handle, .contLen = pMsg->rspLen, .pCont = pMsg->pRsp, .code = terrno};
      dndSendRsp(pMgmt->pWrapper, &rsp);
    }
  }

  dTrace("msg:%p, is freed", pMsg);
  rpcFreeCont(pRpc->pCont);
  taosFreeQitem(pMsg);
}

/**
 * Initialize the three mnode workers (read, write, sync).
 *
 * Each worker is created with dndInitWorker() as DND_WORKER_SINGLE and uses
 * mmProcessQueue as its processing callback. Initialization stops at the first
 * failure, which is logged together with terrstr().
 *
 * @param pMgmt  mnode management object holding the worker slots.
 * @return 0 when all three workers were initialized, -1 on the first failure.
 */
int32_t mmStartWorker(SMnodeMgmt *pMgmt) {
  int32_t ret;

  ret = dndInitWorker(pMgmt, &pMgmt->readWorker, DND_WORKER_SINGLE, "mnode-read", 0, 1, mmProcessQueue);
  if (ret != 0) {
    dError("failed to start mnode read worker since %s", terrstr());
    return -1;
  }

  ret = dndInitWorker(pMgmt, &pMgmt->writeWorker, DND_WORKER_SINGLE, "mnode-write", 0, 1, mmProcessQueue);
  if (ret != 0) {
    dError("failed to start mnode write worker since %s", terrstr());
    return -1;
  }

  ret = dndInitWorker(pMgmt, &pMgmt->syncWorker, DND_WORKER_SINGLE, "mnode-sync", 0, 1, mmProcessQueue);
  if (ret != 0) {
    dError("failed to start mnode sync worker since %s", terrstr());
    return -1;
  }

  return 0;
}

S
shm  
Shengliang Guan 已提交
73
/**
 * Stop the mnode workers.
 *
 * Clears the `deployed` flag while holding the write latch, then waits until
 * `refCount` is no longer greater than 1, calling taosMsleep(10) between
 * checks, and finally cleans up the read, write and sync workers in that order.
 *
 * @param pMgmt  mnode management object whose workers are shut down.
 */
void mmStopWorker(SMnodeMgmt *pMgmt) {
  taosWLockLatch(&pMgmt->latch);
  pMgmt->deployed = 0;
  taosWUnLockLatch(&pMgmt->latch);

  // Wait for outstanding references to drain before tearing the workers down.
  for (;;) {
    if (!(pMgmt->refCount > 1)) {
      break;
    }
    taosMsleep(10);
  }

  dndCleanupWorker(&pMgmt->readWorker);
  dndCleanupWorker(&pMgmt->writeWorker);
  dndCleanupWorker(&pMgmt->syncWorker);
}
S
shm  
Shengliang Guan 已提交
86

S
shm  
Shengliang Guan 已提交
87 88 89
/**
 * Write a message to the given worker while holding a reference on the mnode.
 *
 * @param pMgmt    mnode management object used for mmAcquire()/mmRelease().
 * @param pWorker  destination worker.
 * @param pMsg     message passed to dndWriteMsgToWorker().
 * @return -1 when the mnode cannot be acquired, otherwise the result of
 *         dndWriteMsgToWorker().
 */
static int32_t mmPutMsgToWorker(SMnodeMgmt *pMgmt, SDnodeWorker *pWorker, SNodeMsg *pMsg) {
  int32_t result = -1;
  SMnode *pNode = mmAcquire(pMgmt);

  if (pNode != NULL) {
    dTrace("msg:%p, put into worker %s", pMsg, pWorker->name);
    result = dndWriteMsgToWorker(pWorker, pMsg, 0);
    mmRelease(pMgmt, pNode);
  }

  return result;
}

S
shm  
Shengliang Guan 已提交
97
/**
 * Queue a message on the mnode write worker.
 *
 * @return the result of mmPutMsgToWorker() for the write worker.
 */
int32_t mmProcessWriteMsg(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) {
  SDnodeWorker *pTarget = &pMgmt->writeWorker;
  return mmPutMsgToWorker(pMgmt, pTarget, pMsg);
}

S
shm  
Shengliang Guan 已提交
101
/**
 * Queue a message on the mnode sync worker.
 *
 * @return the result of mmPutMsgToWorker() for the sync worker.
 */
int32_t mmProcessSyncMsg(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) {
  SDnodeWorker *pTarget = &pMgmt->syncWorker;
  return mmPutMsgToWorker(pMgmt, pTarget, pMsg);
}

S
shm  
Shengliang Guan 已提交
105
/**
 * Queue a message on the mnode read worker.
 *
 * @return the result of mmPutMsgToWorker() for the read worker.
 */
int32_t mmProcessReadMsg(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) {
  SDnodeWorker *pTarget = &pMgmt->readWorker;
  return mmPutMsgToWorker(pMgmt, pTarget, pMsg);
}
S
shm  
Shengliang Guan 已提交
108

S
shm  
Shengliang Guan 已提交
109 110
/**
 * Wrap an RPC message in a newly allocated queue item and hand it to a worker.
 *
 * The RPC message is copied by value into the new SNodeMsg. If queueing fails,
 * the queue item is freed and the RPC content is released with rpcFreeCont().
 *
 * @param pWrapper  management wrapper; its pMgmt is used for queueing.
 * @param pWorker   destination worker.
 * @param pRpc      RPC message to copy into the queue item.
 * @return 0 on success, -1 if the queue item cannot be allocated, otherwise
 *         the non-zero code returned by mmPutMsgToWorker().
 */
static int32_t mmPutRpcMsgToWorker(SMgmtWrapper *pWrapper, SDnodeWorker *pWorker, SRpcMsg *pRpc) {
  SNodeMsg *pItem = taosAllocateQitem(sizeof(SNodeMsg));
  if (pItem == NULL) return -1;

  dTrace("msg:%p, is created", pItem);
  pItem->rpcMsg = *pRpc;

  int32_t status = mmPutMsgToWorker(pWrapper->pMgmt, pWorker, pItem);
  if (status == 0) return 0;

  // Queueing failed: release the item and the RPC content.
  dTrace("msg:%p, is freed", pItem);
  taosFreeQitem(pItem);
  rpcFreeCont(pRpc->pCont);
  return status;
}

S
shm  
Shengliang Guan 已提交
128 129 130 131 132
/**
 * Placeholder for queueing an RPC message on the mnode write worker.
 *
 * Currently a stub: neither argument is used, nothing is queued, and 0 is
 * returned unconditionally.
 *
 * TODO: resolve the management wrapper and forward to mmPutRpcMsgToWorker()
 * with the write worker, as sketched by the previously commented-out code.
 */
int32_t mmPutMsgToWriteQueue(void *wrapper, SRpcMsg *pRpc) {
  (void)wrapper;
  (void)pRpc;
  return 0;
}
S
shm  
Shengliang Guan 已提交
134

S
shm  
Shengliang Guan 已提交
135 136 137 138 139
/**
 * Placeholder for queueing an RPC message on the mnode read worker.
 *
 * Currently a stub: neither argument is used, nothing is queued, and 0 is
 * returned unconditionally.
 *
 * TODO: resolve the management wrapper and forward to mmPutRpcMsgToWorker()
 * with the read worker, as sketched by the previously commented-out code.
 */
int32_t mmPutMsgToReadQueue(void *wrapper, SRpcMsg *pRpc) {
  (void)wrapper;
  (void)pRpc;
  return 0;
}