smWorker.c 6.2 KB
Newer Older
S
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http:www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
S
shm  
Shengliang Guan 已提交
17
#include "smInt.h"
S
Shengliang Guan 已提交
18

19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
static void smProcessMonitorQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
  SSnodeMgmt *pMgmt = pInfo->ahandle;

  dTrace("msg:%p, get from snode monitor queue", pMsg);
  SRpcMsg *pRpc = &pMsg->rpcMsg;
  int32_t  code = -1;

  if (pMsg->rpcMsg.msgType == TDMT_MON_SM_INFO) {
    code = smProcessGetMonSmInfoReq(pMgmt->pWrapper, pMsg);
  }

  if (pRpc->msgType & 1U) {
    if (pRpc->handle != NULL && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
      if (code != 0) {
        code = terrno;
        dError("msg:%p, failed to process since %s", pMsg, terrstr());
      }
      SRpcMsg rsp = {.handle = pRpc->handle, .code = code, .contLen = pMsg->rspLen, .pCont = pMsg->pRsp};
      tmsgSendRsp(&rsp);
    }
  }

  dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code));
  rpcFreeCont(pRpc->pCont);
  taosFreeQitem(pMsg);
}

S
Shengliang Guan 已提交
46 47 48
static void smProcessUniqueQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
  SSnodeMgmt *pMgmt = pInfo->ahandle;

S
shm  
Shengliang Guan 已提交
49 50 51
  for (int32_t i = 0; i < numOfMsgs; i++) {
    SNodeMsg *pMsg = NULL;
    taosGetQitem(qall, (void **)&pMsg);
L
Liu Jicong 已提交
52

S
shm  
Shengliang Guan 已提交
53 54
    dTrace("msg:%p, will be processed in snode unique queue", pMsg);
    sndProcessUMsg(pMgmt->pSnode, &pMsg->rpcMsg);
L
Liu Jicong 已提交
55

S
shm  
Shengliang Guan 已提交
56 57 58 59 60 61
    dTrace("msg:%p, is freed", pMsg);
    rpcFreeCont(pMsg->rpcMsg.pCont);
    taosFreeQitem(pMsg);
  }
}

S
Shengliang Guan 已提交
62 63 64
static void smProcessSharedQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
  SSnodeMgmt *pMgmt = pInfo->ahandle;

S
shm  
Shengliang Guan 已提交
65
  dTrace("msg:%p, will be processed in snode shared queue", pMsg);
S
shm  
Shengliang Guan 已提交
66
  sndProcessSMsg(pMgmt->pSnode, &pMsg->rpcMsg);
S
shm  
Shengliang Guan 已提交
67 68

  dTrace("msg:%p, is freed", pMsg);
S
shm  
Shengliang Guan 已提交
69
  rpcFreeCont(pMsg->rpcMsg.pCont);
S
shm  
Shengliang Guan 已提交
70 71
  taosFreeQitem(pMsg);
}
S
Shengliang Guan 已提交
72

S
shm  
Shengliang Guan 已提交
73
int32_t smStartWorker(SSnodeMgmt *pMgmt) {
S
Shengliang Guan 已提交
74
  pMgmt->uniqueWorkers = taosArrayInit(0, sizeof(SMultiWorker *));
S
shm  
Shengliang Guan 已提交
75 76 77 78 79
  if (pMgmt->uniqueWorkers == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

S
Shengliang Guan 已提交
80
  for (int32_t i = 0; i < tsNumOfSnodeUniqueThreads; i++) {
wafwerar's avatar
wafwerar 已提交
81
    SMultiWorker *pUniqueWorker = taosMemoryMalloc(sizeof(SMultiWorker));
L
Liu Jicong 已提交
82
    if (pUniqueWorker == NULL) {
S
shm  
Shengliang Guan 已提交
83
      terrno = TSDB_CODE_OUT_OF_MEMORY;
L
Liu Jicong 已提交
84 85
      return -1;
    }
S
Shengliang Guan 已提交
86

S
shm  
Shengliang Guan 已提交
87
    SMultiWorkerCfg cfg = {.max = 1, .name = "snode-unique", .fp = smProcessUniqueQueue, .param = pMgmt};
S
Shengliang Guan 已提交
88

S
Shengliang Guan 已提交
89
    if (tMultiWorkerInit(pUniqueWorker, &cfg) != 0) {
S
Shengliang Guan 已提交
90
      dError("failed to start snode-unique worker since %s", terrstr());
L
Liu Jicong 已提交
91 92
      return -1;
    }
S
shm  
Shengliang Guan 已提交
93 94 95 96
    if (taosArrayPush(pMgmt->uniqueWorkers, &pUniqueWorker) == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
    }
L
Liu Jicong 已提交
97
  }
S
shm  
Shengliang Guan 已提交
98

S
Shengliang Guan 已提交
99 100
  SSingleWorkerCfg cfg = {.min = tsNumOfSnodeSharedThreads,
                          .max = tsNumOfSnodeSharedThreads,
S
Shengliang Guan 已提交
101 102 103
                          .name = "snode-shared",
                          .fp = (FItem)smProcessSharedQueue,
                          .param = pMgmt};
S
Shengliang Guan 已提交
104

S
Shengliang Guan 已提交
105
  if (tSingleWorkerInit(&pMgmt->sharedWorker, &cfg)) {
S
Shengliang Guan 已提交
106
    dError("failed to start snode shared-worker since %s", terrstr());
S
Shengliang Guan 已提交
107 108 109
    return -1;
  }

110 111 112 113 114 115 116 117 118
  if (tsMultiProcess) {
    SSingleWorkerCfg sCfg = {
        .min = 1, .max = 1, .name = "snode-monitor", .fp = (FItem)smProcessMonitorQueue, .param = pMgmt};
    if (tSingleWorkerInit(&pMgmt->monitorWorker, &sCfg) != 0) {
      dError("failed to start snode-monitor worker since %s", terrstr());
      return -1;
    }
  }

S
Shengliang Guan 已提交
119
  dDebug("snode workers are initialized");
S
Shengliang Guan 已提交
120 121 122
  return 0;
}

S
shm  
Shengliang Guan 已提交
123
void smStopWorker(SSnodeMgmt *pMgmt) {
124
  tSingleWorkerCleanup(&pMgmt->monitorWorker);
L
Liu Jicong 已提交
125
  for (int32_t i = 0; i < taosArrayGetSize(pMgmt->uniqueWorkers); i++) {
S
Shengliang Guan 已提交
126 127
    SMultiWorker *pWorker = taosArrayGetP(pMgmt->uniqueWorkers, i);
    tMultiWorkerCleanup(pWorker);
L
Liu Jicong 已提交
128 129
  }
  taosArrayDestroy(pMgmt->uniqueWorkers);
S
Shengliang Guan 已提交
130
  tSingleWorkerCleanup(&pMgmt->sharedWorker);
S
Shengliang Guan 已提交
131
  dDebug("snode workers are closed");
S
Shengliang Guan 已提交
132 133
}

S
shm  
Shengliang Guan 已提交
134
static FORCE_INLINE int32_t smGetSWIdFromMsg(SRpcMsg *pMsg) {
L
Liu Jicong 已提交
135
  SMsgHead *pHead = pMsg->pCont;
L
Liu Jicong 已提交
136
  pHead->vgId = htonl(pHead->vgId);
S
Shengliang Guan 已提交
137
  return pHead->vgId % tsNumOfSnodeUniqueThreads;
L
Liu Jicong 已提交
138 139
}

S
shm  
Shengliang Guan 已提交
140
static FORCE_INLINE int32_t smGetSWTypeFromMsg(SRpcMsg *pMsg) {
L
Liu Jicong 已提交
141 142 143 144
  /*SMsgHead *pHead = pMsg->pCont;*/
  /*pHead->workerType = htonl(pHead->workerType);*/
  /*return pHead->workerType;*/
  return 0;
S
shm  
Shengliang Guan 已提交
145 146
}

S
Shengliang Guan 已提交
147 148
int32_t smProcessMgmtMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
  SSnodeMgmt   *pMgmt = pWrapper->pMgmt;
S
Shengliang Guan 已提交
149
  SMultiWorker *pWorker = taosArrayGetP(pMgmt->uniqueWorkers, 0);
S
shm  
Shengliang Guan 已提交
150 151 152
  if (pWorker == NULL) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
L
Liu Jicong 已提交
153 154
  }

S
shm  
Shengliang Guan 已提交
155
  dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name);
S
Shengliang Guan 已提交
156
  taosWriteQitem(pWorker->queue, pMsg);
157 158 159 160 161 162 163 164 165
  return 0;
}

int32_t smProcessMonitorMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
  SSnodeMgmt    *pMgmt = pWrapper->pMgmt;
  SSingleWorker *pWorker = &pMgmt->monitorWorker;

  dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name);
  taosWriteQitem(pWorker->queue, pMsg);
S
Shengliang Guan 已提交
166
  return 0;
L
Liu Jicong 已提交
167 168
}

S
Shengliang Guan 已提交
169 170
int32_t smProcessUniqueMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
  SSnodeMgmt   *pMgmt = pWrapper->pMgmt;
S
Shengliang Guan 已提交
171 172
  int32_t       index = smGetSWIdFromMsg(&pMsg->rpcMsg);
  SMultiWorker *pWorker = taosArrayGetP(pMgmt->uniqueWorkers, index);
S
shm  
Shengliang Guan 已提交
173 174 175
  if (pWorker == NULL) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
S
Shengliang Guan 已提交
176 177
  }

S
shm  
Shengliang Guan 已提交
178
  dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name);
S
Shengliang Guan 已提交
179 180
  taosWriteQitem(pWorker->queue, pMsg);
  return 0;
L
Liu Jicong 已提交
181 182
}

S
Shengliang Guan 已提交
183 184
int32_t smProcessSharedMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
  SSnodeMgmt    *pMgmt = pWrapper->pMgmt;
S
Shengliang Guan 已提交
185
  SSingleWorker *pWorker = &pMgmt->sharedWorker;
L
Liu Jicong 已提交
186

S
shm  
Shengliang Guan 已提交
187
  dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name);
S
Shengliang Guan 已提交
188 189
  taosWriteQitem(pWorker->queue, pMsg);
  return 0;
S
Shengliang Guan 已提交
190
}
S
shm  
Shengliang Guan 已提交
191

S
Shengliang Guan 已提交
192 193
int32_t smProcessExecMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
  int32_t     workerType = smGetSWTypeFromMsg(&pMsg->rpcMsg);
S
shm  
Shengliang Guan 已提交
194
  if (workerType == SND_WORKER_TYPE__SHARED) {
S
Shengliang Guan 已提交
195
    return smProcessSharedMsg(pWrapper, pMsg);
S
shm  
Shengliang Guan 已提交
196
  } else {
S
Shengliang Guan 已提交
197
    return smProcessUniqueMsg(pWrapper, pMsg);
S
shm  
Shengliang Guan 已提交
198 199
  }
}