smWorker.c 5.7 KB
Newer Older
S
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
S
shm  
Shengliang Guan 已提交
17
#include "smInt.h"
S
Shengliang Guan 已提交
18

S
Shengliang Guan 已提交
19
// Build a response for pMsg and pass it to tmsgSendRsp().
// The reply carries the given code, the rsp/rspLen payload recorded in
// pMsg->info, and a copy of pMsg->info itself; all other fields are zero.
static inline void smSendRsp(SRpcMsg *pMsg, int32_t code) {
  SRpcMsg reply = {0};

  reply.code = code;
  reply.pCont = pMsg->info.rsp;
  reply.contLen = pMsg->info.rspLen;
  reply.info = pMsg->info;

  tmsgSendRsp(&reply);
}

L
Liu Jicong 已提交
29
// Batch callback for the snode write queue.
// Pulls numOfMsgs items out of qall, passes each one to sndProcessWriteMsg(),
// answers it, and then releases both the message content and the queue item.
//   pInfo     - queue info; pInfo->ahandle is used as the SSnodeMgmt
//   qall      - batch container the items are taken from
//   numOfMsgs - number of items to take from qall
static void smProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
  SSnodeMgmt *pMgmt = pInfo->ahandle;

  for (int32_t left = numOfMsgs; left > 0; left--) {
    SRpcMsg *pMsg = NULL;
    taosGetQitem(qall, (void **)&pMsg);

    // Not referenced directly below; presumably consumed by the dGError
    // macro, so the name `trace` is kept as is -- confirm before renaming.
    const STraceId *trace = &pMsg->info.traceId;

    dTrace("msg:%p, get from snode-write queue", pMsg);
    int32_t code = sndProcessWriteMsg(pMgmt->pSnode, pMsg, NULL);
    if (code >= 0) {
      smSendRsp(pMsg, 0);
    } else {
      dGError("snd, msg:%p failed to process write since %s", pMsg, terrstr(code));
      // NOTE(review): the failure path replies with pMsg itself rather than
      // going through smSendRsp(); confirm this is intended.
      if (pMsg->info.handle != NULL) {
        tmsgSendRsp(pMsg);
      }
    }

    dTrace("msg:%p, is freed", pMsg);
    rpcFreeCont(pMsg->pCont);
    taosFreeQitem(pMsg);
  }
}

L
Liu Jicong 已提交
54 55 56
// Per-item callback for the snode stream queue.
// Passes pMsg to sndProcessStreamMsg(); when that returns a negative code the
// failure is logged and a response carrying terrno is sent. No response is
// sent from here on success. The message content and the queue item are
// released in both cases.
//   pInfo - queue info; pInfo->ahandle is used as the SSnodeMgmt
//   pMsg  - the queued message
static void smProcessStreamQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
  SSnodeMgmt *pMgmt = pInfo->ahandle;

  // Not referenced directly below; presumably consumed by the dGError
  // macro, so the name `trace` is kept as is -- confirm before renaming.
  const STraceId *trace = &pMsg->info.traceId;

  dTrace("msg:%p, get from snode-stream queue", pMsg);

  int32_t code = sndProcessStreamMsg(pMgmt->pSnode, pMsg);
  if (code < 0) {
    dGError("snd, msg:%p failed to process stream msg %s since %s", pMsg, TMSG_INFO(pMsg->msgType), terrstr(code));
    smSendRsp(pMsg, terrno);
  }

  dTrace("msg:%p, is freed", pMsg);
  rpcFreeCont(pMsg->pCont);
  taosFreeQitem(pMsg);
}
S
Shengliang Guan 已提交
69

S
shm  
Shengliang Guan 已提交
70
// Create the snode worker set: tsNumOfSnodeWriteThreads "snode-write"
// multi-workers (stored as pointers in pMgmt->writeWroker) and a single
// "snode-stream" worker (pMgmt->streamWorker).
//
// Returns 0 on success, -1 on failure. On allocation failures terrno is set
// to TSDB_CODE_OUT_OF_MEMORY. A write worker that could not be registered in
// the array is released here; workers already stored in pMgmt->writeWroker
// are left in place when a later step fails.
int32_t smStartWorker(SSnodeMgmt *pMgmt) {
  pMgmt->writeWroker = taosArrayInit(0, sizeof(SMultiWorker *));
  if (pMgmt->writeWroker == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  for (int32_t i = 0; i < tsNumOfSnodeWriteThreads; i++) {
    SMultiWorker *pWriteWorker = taosMemoryMalloc(sizeof(SMultiWorker));
    if (pWriteWorker == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
    }

    SMultiWorkerCfg cfg = {
        .max = 1,
        .name = "snode-write",
        .fp = smProcessWriteQueue,
        .param = pMgmt,
    };
    if (tMultiWorkerInit(pWriteWorker, &cfg) != 0) {
      dError("failed to start snode-unique worker since %s", terrstr());
      // The worker never reached the array, so nobody else can free it.
      taosMemoryFree(pWriteWorker);
      return -1;
    }
    if (taosArrayPush(pMgmt->writeWroker, &pWriteWorker) == NULL) {
      // Initialised but not tracked by the array: shut it down and free it
      // here, otherwise it would be unreachable from smStopWorker().
      tMultiWorkerCleanup(pWriteWorker);
      taosMemoryFree(pWriteWorker);
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
    }
  }

  SSingleWorkerCfg cfg = {
      .min = tsNumOfSnodeStreamThreads,
      .max = tsNumOfSnodeStreamThreads,
      .name = "snode-stream",
      .fp = (FItem)smProcessStreamQueue,
      .param = pMgmt,
  };

  if (tSingleWorkerInit(&pMgmt->streamWorker, &cfg)) {
    dError("failed to start snode shared-worker since %s", terrstr());
    return -1;
  }

  dDebug("snode workers are initialized");
  return 0;
}

S
shm  
Shengliang Guan 已提交
117
// Tear down the workers created by smStartWorker(): clean up and free every
// write worker stored in pMgmt->writeWroker, destroy the array itself, then
// clean up the stream worker.
void smStopWorker(SSnodeMgmt *pMgmt) {
  int32_t idx = 0;
  while (idx < taosArrayGetSize(pMgmt->writeWroker)) {
    SMultiWorker *pWriter = taosArrayGetP(pMgmt->writeWroker, idx);
    tMultiWorkerCleanup(pWriter);
    taosMemoryFree(pWriter);
    idx++;
  }

  taosArrayDestroy(pMgmt->writeWroker);
  tSingleWorkerCleanup(&pMgmt->streamWorker);

  dDebug("snode workers are closed");
}

L
Liu Jicong 已提交
128
// Copy pRpc into a newly allocated queue item and enqueue it on the snode
// queue selected by qtype (STREAM_QUEUE or WRITE_QUEUE).
//
// Before the copy, the SMsgHead at the start of pRpc->pCont is rewritten:
// contLen is passed through htonl() and vgId is set to SNODE_HANDLE.
//
// pRpc->pCont is always set to NULL before returning: on success the content
// belongs to the queued item, on failure it is released here with
// rpcFreeCont().
//
// Returns 0 on success, -1 on failure. An unsupported qtype sets terrno to
// TSDB_CODE_INVALID_PARA.
int32_t smPutMsgToQueue(SSnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) {
  SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM, pRpc->contLen);
  if (pMsg == NULL) {
    rpcFreeCont(pRpc->pCont);
    pRpc->pCont = NULL;
    return -1;
  }

  SSnode *pSnode = pMgmt->pSnode;
  if (pSnode == NULL) {
    // pMsg has not been filled in yet, so the message type has to come from
    // the incoming pRpc.
    dError("msg:%p failed to put into snode queue since %s, type:%s qtype:%d len:%d", pMsg, terrstr(),
           TMSG_INFO(pRpc->msgType), qtype, pRpc->contLen);
    taosFreeQitem(pMsg);
    rpcFreeCont(pRpc->pCont);
    pRpc->pCont = NULL;
    return -1;
  }

  SMsgHead *pHead = pRpc->pCont;
  pHead->contLen = htonl(pHead->contLen);
  pHead->vgId = SNODE_HANDLE;
  memcpy(pMsg, pRpc, sizeof(SRpcMsg));
  // The content now belongs to pMsg; detach it from the caller's struct.
  pRpc->pCont = NULL;

  int32_t code = 0;
  switch (qtype) {
    case STREAM_QUEUE:
      code = smPutNodeMsgToStreamQueue(pMgmt, pMsg);
      break;
    case WRITE_QUEUE:
      code = smPutNodeMsgToWriteQueue(pMgmt, pMsg);
      break;
    default:
      terrno = TSDB_CODE_INVALID_PARA;
      code = -1;
      break;
  }

  if (code != 0) {
    // The item was not enqueued, so nothing else will release it.
    rpcFreeCont(pMsg->pCont);
    taosFreeQitem(pMsg);
    return -1;
  }

  return 0;
}

S
Shengliang Guan 已提交
168
// Enqueue pMsg on the queue of the first write worker in pMgmt->writeWroker.
// Returns 0 after the item is written; returns -1 with terrno set to
// TSDB_CODE_INVALID_MSG when that worker pointer is NULL.
int32_t smPutNodeMsgToMgmtQueue(SSnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SMultiWorker *pFirst = taosArrayGetP(pMgmt->writeWroker, 0);
  if (pFirst != NULL) {
    dTrace("msg:%p, put into worker %s", pMsg, pFirst->name);
    taosWriteQitem(pFirst->queue, pMsg);
    return 0;
  }

  terrno = TSDB_CODE_INVALID_MSG;
  return -1;
}

L
Liu Jicong 已提交
180 181
// Enqueue pMsg on the queue of the first write worker in pMgmt->writeWroker.
// Returns 0 after the item is written; returns -1 with terrno set to
// TSDB_CODE_INVALID_MSG when that worker pointer is NULL.
int32_t smPutNodeMsgToWriteQueue(SSnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SMultiWorker *pWriter = taosArrayGetP(pMgmt->writeWroker, 0);
  if (pWriter != NULL) {
    dTrace("msg:%p, put into worker %s", pMsg, pWriter->name);
    taosWriteQitem(pWriter->queue, pMsg);
    return 0;
  }

  terrno = TSDB_CODE_INVALID_MSG;
  return -1;
}

L
Liu Jicong 已提交
192 193
// Route pMsg to the stream side of the snode.
// TDMT_STREAM_TASK_DISPATCH messages are passed to sndEnqueueStreamDispatch();
// every other message type is written to the stream worker's queue.
// Always returns 0.
int32_t smPutNodeMsgToStreamQueue(SSnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SSingleWorker *pStream = &pMgmt->streamWorker;
  dTrace("msg:%p, put into worker %s", pMsg, pStream->name);

  if (pMsg->msgType != TDMT_STREAM_TASK_DISPATCH) {
    taosWriteQitem(pStream->queue, pMsg);
    return 0;
  }

  sndEnqueueStreamDispatch(pMgmt->pSnode, pMsg);
  return 0;
}