mmWorker.c 8.0 KB
Newer Older
S
shm  
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
S
shm  
Shengliang Guan 已提交
17
#include "mmInt.h"
S
shm  
Shengliang Guan 已提交
18

S
shm  
Shengliang Guan 已提交
19
#include "dmInt.h"
S
shm  
Shengliang Guan 已提交
20 21 22
#include "dndTransport.h"
#include "dndWorker.h"

S
shm  
Shengliang Guan 已提交
23
#if 0
S
shm  
Shengliang Guan 已提交
24 25 26 27
/* Forward declarations for the per-queue dispatchers and the worker enqueue helper defined later in this file. */
static int32_t mmProcessWriteMsg(SDnode *pDnode, SMndMsg *pMsg);
static int32_t mmProcessSyncMsg(SDnode *pDnode, SMndMsg *pMsg);
static int32_t mmProcessReadMsg(SDnode *pDnode, SMndMsg *pMsg);
static int32_t mmPutMndMsgToWorker(SDnode *pDnode, SDnodeWorker *pWorker, SMndMsg *pMsg);
S
shm  
Shengliang Guan 已提交
28
/* Forward declaration: wraps an SRpcMsg in a newly allocated SMndMsg and hands it to pWorker (defined later in this file). */
static int32_t mmPutRpcMsgToWorker(SDnode *pDnode, SDnodeWorker *pWorker, SRpcMsg *pRpc);
S
shm  
Shengliang Guan 已提交
29
/* Forward declaration: the callback that mmStartWorker passes to dndInitWorker for the read, write and sync workers. */
static void    mmConsumeMsgQueue(SDnode *pDnode, SMndMsg *pMsg);
S
shm  
Shengliang Guan 已提交
30 31

/*
 * Initialize the three mnode workers (read, write, sync) in that order.
 *
 * Each worker is created with DND_WORKER_SINGLE and uses mmConsumeMsgQueue as
 * its callback. The first failure is logged and aborts the sequence.
 *
 * Returns 0 when all three workers were initialized, -1 otherwise.
 */
int32_t mmStartWorker(SDnode *pDnode) {
  SMnodeMgmt *pMnodeMgmt = &pDnode->mmgmt;

  const bool readFailed =
      dndInitWorker(pDnode, &pMnodeMgmt->readWorker, DND_WORKER_SINGLE, "mnode-read", 0, 1, mmConsumeMsgQueue) != 0;
  if (readFailed) {
    dError("failed to start mnode read worker since %s", terrstr());
    return -1;
  }

  const bool writeFailed =
      dndInitWorker(pDnode, &pMnodeMgmt->writeWorker, DND_WORKER_SINGLE, "mnode-write", 0, 1, mmConsumeMsgQueue) != 0;
  if (writeFailed) {
    dError("failed to start mnode write worker since %s", terrstr());
    return -1;
  }

  const bool syncFailed =
      dndInitWorker(pDnode, &pMnodeMgmt->syncWorker, DND_WORKER_SINGLE, "mnode-sync", 0, 1, mmConsumeMsgQueue) != 0;
  if (syncFailed) {
    dError("failed to start mnode sync worker since %s", terrstr());
    return -1;
  }

  return 0;
}

/*
 * Stop the mnode workers.
 *
 * Clears the `deployed` flag under the write latch, polls until `refCount`
 * is no longer greater than 1, then cleans up the read, write and sync
 * workers in that order.
 */
void mmStopWorker(SDnode *pDnode) {
  SMnodeMgmt *pMnodeMgmt = &pDnode->mmgmt;

  // Flip the flag while holding the write latch.
  taosWLockLatch(&pMnodeMgmt->latch);
  pMnodeMgmt->deployed = 0;
  taosWUnLockLatch(&pMnodeMgmt->latch);

  // NOTE(review): presumably waits for outstanding mmAcquire holders to release - confirm.
  for (;;) {
    if (pMnodeMgmt->refCount <= 1) {
      break;
    }
    taosMsleep(10);
  }

  dndCleanupWorker(&pMnodeMgmt->readWorker);
  dndCleanupWorker(&pMnodeMgmt->writeWorker);
  dndCleanupWorker(&pMnodeMgmt->syncWorker);
}
S
shm  
Shengliang Guan 已提交
66

S
shm  
Shengliang Guan 已提交
67
/*
 * Message-handler table initialization hook.
 *
 * The body is currently empty: nothing is registered in pMgmt here.
 */
void mmInitMsgFp(SMnodeMgmt *pMgmt) {
  (void)pMgmt;  // intentionally unused for now
}

S
shm  
Shengliang Guan 已提交
71 72
/*
 * Send an RPC response.
 *
 * When the response code is TSDB_CODE_DND_MNODE_NOT_DEPLOYED or
 * TSDB_CODE_APP_NOT_READY the response goes through dmSendRedirectRsp;
 * every other code is passed to rpcSendResponse.
 */
static void mmSendRpcRsp(SDnode *pDnode, SRpcMsg *pRpc) {
  const bool needRedirect =
      (pRpc->code == TSDB_CODE_DND_MNODE_NOT_DEPLOYED) || (pRpc->code == TSDB_CODE_APP_NOT_READY);

  if (!needRedirect) {
    rpcSendResponse(pRpc);
    return;
  }

  dmSendRedirectRsp(pDnode, pRpc);
}

S
shm  
Shengliang Guan 已提交
79
/*
 * Fill an SMndMsg from an incoming RPC message.
 *
 * If the low bit of msgType is set, the connection info is looked up via
 * rpcGetConnInfo. A failed lookup sets terrno to
 * TSDB_CODE_MND_NO_USER_FROM_CONN and returns -1. Otherwise the user name
 * (zeroed when no lookup happened), a copy of the RPC message and the
 * creation timestamp are stored into pMsg.
 *
 * Returns 0 on success, -1 on failure.
 */
static int32_t mmBuildMsg(SMndMsg *pMsg, SRpcMsg *pRpc) {
  SRpcConnInfo info = {0};

  if (pRpc->msgType & 1U) {
    if (rpcGetConnInfo(pRpc->handle, &info) != 0) {
      terrno = TSDB_CODE_MND_NO_USER_FROM_CONN;
      dError("failed to create msg since %s, app:%p RPC:%p", terrstr(), pRpc->ahandle, pRpc->handle);
      return -1;
    }
  }

  memcpy(pMsg->user, info.user, TSDB_USER_LEN);
  pMsg->rpcMsg = *pRpc;
  pMsg->createdTime = taosGetTimestampSec();
  return 0;
}

/*
 * Entry point for an RPC message addressed to the mnode.
 *
 * Looks up the handler registered for the message type, allocates a queue
 * item and fills it with mmBuildMsg. With `singleProc` set the handler is
 * called directly; otherwise the item and its content are handed to
 * taosProcPutToChildQueue.
 *
 * Cleanup:
 *  - success with singleProc: nothing is freed here;
 *  - success without singleProc: the local item and the RPC content are freed;
 *  - failure: an error response carrying terrno is sent when the low bit of
 *    msgType is set, then the item and the RPC content are freed.
 */
void mmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
  SMnodeMgmt *pMnodeMgmt = &pDnode->mmgmt;
  SMndMsg    *pItem = NULL;
  int32_t     ret = -1;

  MndMsgFp handler = pMnodeMgmt->msgFp[TMSG_INDEX(pRpc->msgType)];
  if (handler == NULL) {
    terrno = TSDB_CODE_MSG_NOT_PROCESSED;
    goto finish;
  }

  pItem = taosAllocateQitem(sizeof(SMndMsg));
  if (pItem == NULL || mmBuildMsg(pItem, pRpc) != 0) {
    goto finish;
  }

  dTrace("msg:%p, is created, app:%p RPC:%p user:%s", pItem, pRpc->ahandle, pRpc->handle, pItem->user);

  if (pMnodeMgmt->singleProc) {
    ret = (*handler)(pDnode, pItem);
  } else {
    ret = taosProcPutToChildQueue(pMnodeMgmt->pProcess, pItem, sizeof(SMndMsg), pRpc->pCont, pRpc->contLen);
  }

finish:
  // Success in single-process mode: leave the item and content untouched.
  if (ret == 0 && pMnodeMgmt->singleProc) {
    return;
  }

  // Failure: answer messages whose low msgType bit is set with terrno.
  if (ret != 0 && (pRpc->msgType & 1U)) {
    SRpcMsg rsp = {.handle = pRpc->handle, .ahandle = pRpc->ahandle, .code = terrno};
    mmSendRpcRsp(pDnode, &rsp);
  }

  dTrace("msg:%p, is freed", pItem);
  taosFreeQitem(pItem);
  rpcFreeCont(pRpc->pCont);
}

S
shm  
Shengliang Guan 已提交
142 143
/* Queue pMsg on the mnode write worker; returns the result of mmPutMndMsgToWorker. */
int32_t mmProcessWriteMsg(SDnode *pDnode, SMndMsg *pMsg) {
  SDnodeWorker *pWriteWorker = &pDnode->mmgmt.writeWorker;
  return mmPutMndMsgToWorker(pDnode, pWriteWorker, pMsg);
}

S
shm  
Shengliang Guan 已提交
146 147
/* Queue pMsg on the mnode sync worker; returns the result of mmPutMndMsgToWorker. */
int32_t mmProcessSyncMsg(SDnode *pDnode, SMndMsg *pMsg) {
  SDnodeWorker *pSyncWorker = &pDnode->mmgmt.syncWorker;
  return mmPutMndMsgToWorker(pDnode, pSyncWorker, pMsg);
}

S
shm  
Shengliang Guan 已提交
150 151
/* Queue pMsg on the mnode read worker; returns the result of mmPutMndMsgToWorker. */
int32_t mmProcessReadMsg(SDnode *pDnode, SMndMsg *pMsg) {
  SDnodeWorker *pReadWorker = &pDnode->mmgmt.readWorker;
  return mmPutMndMsgToWorker(pDnode, pReadWorker, pMsg);
}

S
shm  
Shengliang Guan 已提交
154 155
/* Wrap pRpc in a new queue item and put it on the mnode write worker; returns the result of mmPutRpcMsgToWorker. */
int32_t mmPutMsgToWriteQueue(SDnode *pDnode, SRpcMsg *pRpc) {
  SDnodeWorker *pWriteWorker = &pDnode->mmgmt.writeWorker;
  return mmPutRpcMsgToWorker(pDnode, pWriteWorker, pRpc);
}

S
shm  
Shengliang Guan 已提交
158 159
/* Wrap pRpc in a new queue item and put it on the mnode read worker; returns the result of mmPutRpcMsgToWorker. */
int32_t mmPutMsgToReadQueue(SDnode *pDnode, SRpcMsg *pRpc) {
  SDnodeWorker *pReadWorker = &pDnode->mmgmt.readWorker;
  return mmPutRpcMsgToWorker(pDnode, pReadWorker, pRpc);
}

S
shm  
Shengliang Guan 已提交
162
/*
 * Put an already-built SMndMsg on the given worker.
 *
 * The mnode is acquired with mmAcquire for the duration of the enqueue and
 * released afterwards. If it cannot be acquired, -1 is returned and the
 * message is not queued.
 *
 * Returns the result of dndWriteMsgToWorker, or -1 when mmAcquire returns NULL.
 */
static int32_t mmPutMndMsgToWorker(SDnode *pDnode, SDnodeWorker *pWorker, SMndMsg *pMsg) {
  int32_t result = -1;
  SMnode *pHeld = mmAcquire(pDnode);

  if (pHeld != NULL) {
    dTrace("msg:%p, put into worker %s", pMsg, pWorker->name);
    result = dndWriteMsgToWorker(pWorker, pMsg, 0);
    mmRelease(pDnode, pHeld);
  }

  return result;
}
S
shm  
Shengliang Guan 已提交
171

S
shm  
Shengliang Guan 已提交
172
/*
 * Allocate a queue item for pRpc and put it on the given worker.
 *
 * The item receives a copy of *pRpc and the current timestamp. If queuing
 * fails, both the item and pRpc->pCont are freed here.
 *
 * Returns 0 on success, -1 if the item cannot be allocated, otherwise the
 * non-zero code from mmPutMndMsgToWorker.
 */
static int32_t mmPutRpcMsgToWorker(SDnode *pDnode, SDnodeWorker *pWorker, SRpcMsg *pRpc) {
  SMndMsg *pItem = taosAllocateQitem(sizeof(SMndMsg));
  if (pItem == NULL) {
    return -1;
  }

  dTrace("msg:%p, is created", pItem);
  pItem->rpcMsg = *pRpc;
  pItem->createdTime = taosGetTimestampSec();

  int32_t ret = mmPutMndMsgToWorker(pDnode, pWorker, pItem);
  if (ret == 0) {
    return ret;
  }

  // Queuing failed: release the item and the RPC content.
  dTrace("msg:%p, is freed", pItem);
  taosFreeQitem(pItem);
  rpcFreeCont(pRpc->pCont);
  return ret;
}

S
shm  
Shengliang Guan 已提交
192
/*
 * Deliver an RPC response.
 *
 * With `singleProc` set the response is sent directly through mmSendRpcRsp.
 * Otherwise it is put on the parent queue with taosProcPutToParentQueue,
 * retrying after taosMsleep(10) until the call returns 0.
 */
void mmPutRpcRspToWorker(SDnode *pDnode, SRpcMsg *pRpc) {
  SMnodeMgmt *pMnodeMgmt = &pDnode->mmgmt;

  if (pMnodeMgmt->singleProc) {
    mmSendRpcRsp(pDnode, pRpc);
    return;
  }

  // Keep retrying; there is no upper bound on the number of attempts.
  while (taosProcPutToParentQueue(pMnodeMgmt->pProcess, pRpc, sizeof(SRpcMsg), pRpc->pCont, pRpc->contLen) != 0) {
    taosMsleep(10);
  }
}

S
shm  
Shengliang Guan 已提交
208
/*
 * Consumer for messages taken from the child queue.
 *
 * Points the embedded RPC message at pCont, then calls the handler
 * registered for the message type. If the handler returns non-zero, an error
 * response carrying terrno is sent through mmPutRpcRspToWorker (only when
 * the low bit of msgType is set), and pMsg and pCont are freed.
 *
 * The handler pointer is not checked for NULL here.
 */
void mmConsumeChildQueue(SDnode *pDnode, SMndMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen) {
  dTrace("msg:%p, get from child queue", pMsg);

  SRpcMsg *pReq = &pMsg->rpcMsg;
  pReq->pCont = pCont;

  MndMsgFp handler = pDnode->mmgmt.msgFp[TMSG_INDEX(pReq->msgType)];
  if ((*handler)(pDnode, pMsg) == 0) {
    return;
  }

  if (pReq->msgType & 1U) {
    SRpcMsg rsp = {.handle = pReq->handle, .ahandle = pReq->ahandle, .code = terrno};
    mmPutRpcRspToWorker(pDnode, &rsp);
  }

  dTrace("msg:%p, is freed", pMsg);
  taosFreeQitem(pMsg);
  rpcFreeCont(pCont);
}

S
shm  
Shengliang Guan 已提交
231
/*
 * Consumer for responses taken from the parent queue.
 *
 * Attaches pCont to the response, sends it with mmSendRpcRsp, then releases
 * the SRpcMsg header with free(). pCont itself is not freed here.
 */
void mmConsumeParentQueue(SDnode *pDnode, SRpcMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen) {
  (void)msgLen;   // not used
  (void)contLen;  // not used

  dTrace("msg:%p, get from parent queue", pMsg);

  pMsg->pCont = pCont;
  mmSendRpcRsp(pDnode, pMsg);

  free(pMsg);
}
S
shm  
Shengliang Guan 已提交
237

S
shm  
Shengliang Guan 已提交
238
/*
 * Worker callback: process one queued mnode message and release it.
 *
 * The mnode is acquired for the duration of mndProcessMsg. If it cannot be
 * acquired, the message is treated as failed (code stays -1).
 *
 * A response is sent through mmPutRpcRspToWorker only when the low bit of
 * msgType is set and the message carries an RPC handle:
 *  - on success, with the content produced in pMsg;
 *  - on failure, with terrno as the code, unless terrno is
 *    TSDB_CODE_MND_ACTION_IN_PROGRESS, in which case no response is sent.
 *
 * The request content and the queue item are always freed before returning.
 * The previous version returned early when the handle was NULL and skipped
 * both frees, leaking the item and its content.
 */
static void mmConsumeMsgQueue(SDnode *pDnode, SMndMsg *pMsg) {
  dTrace("msg:%p, get from msg queue", pMsg);
  SMnode  *pMnode = mmAcquire(pDnode);
  SRpcMsg *pRpc = &pMsg->rpcMsg;
  bool     isReq = (pRpc->msgType & 1U);
  int32_t  code = -1;

  if (pMnode != NULL) {
    pMsg->pMnode = pMnode;
    code = mndProcessMsg(pMsg);
    mmRelease(pDnode, pMnode);
  }

  // No handle means there is nobody to answer; fall through to the cleanup.
  if (isReq && pRpc->handle != NULL) {
    if (code == 0) {
      SRpcMsg rsp = {.handle = pRpc->handle, .contLen = pMsg->contLen, .pCont = pMsg->pCont};
      mmPutRpcRspToWorker(pDnode, &rsp);
    } else if (terrno != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
      SRpcMsg rsp = {.handle = pRpc->handle, .contLen = pMsg->contLen, .pCont = pMsg->pCont, .code = terrno};
      mmPutRpcRspToWorker(pDnode, &rsp);
    }
  }

  dTrace("msg:%p, is freed", pMsg);
  rpcFreeCont(pRpc->pCont);
  taosFreeQitem(pMsg);
}
S
shm  
Shengliang Guan 已提交
268

S
shm  
Shengliang Guan 已提交
269 270
#endif

S
shm  
Shengliang Guan 已提交
271 272
/*
 * Stub: ignores both arguments and reports success.
 * NOTE(review): returns 0 without handling the message, whereas the sibling
 * mmProcessReadMsg stub reports TSDB_CODE_MSG_NOT_PROCESSED - confirm intended.
 */
int32_t mmProcessWriteMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
  (void)pWrapper;
  (void)pMsg;
  return 0;
}
/*
 * Stub: ignores both arguments and reports success.
 * NOTE(review): returns 0 without handling the message, whereas the sibling
 * mmProcessReadMsg stub reports TSDB_CODE_MSG_NOT_PROCESSED - confirm intended.
 */
int32_t mmProcessSyncMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
  (void)pWrapper;
  (void)pMsg;
  return 0;
}
S
shm  
Shengliang Guan 已提交
273 274 275 276
/*
 * Stub: ignores both arguments, sets terrno to TSDB_CODE_MSG_NOT_PROCESSED
 * and returns -1.
 */
int32_t mmProcessReadMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
  (void)pWrapper;
  (void)pMsg;

  const int32_t failure = -1;
  terrno = TSDB_CODE_MSG_NOT_PROCESSED;
  return failure;
}