smWorker.c 5.9 KB
Newer Older
S
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
S
shm  
Shengliang Guan 已提交
17
#include "smInt.h"
S
Shengliang Guan 已提交
18

S
Shengliang Guan 已提交
19
/*
 * Send a response for pMsg with the given result code.
 *
 * The response payload and its length are taken from pMsg->info.rsp and
 * pMsg->info.rspLen, and pMsg->info is copied into the response before it is
 * handed to tmsgSendRsp().
 */
static inline void smSendRsp(SRpcMsg *pMsg, int32_t code) {
  SRpcMsg reply = {0};  // every field not assigned below stays zero

  reply.code = code;
  reply.pCont = pMsg->info.rsp;
  reply.contLen = pMsg->info.rspLen;
  reply.info = pMsg->info;

  tmsgSendRsp(&reply);
}

L
Liu Jicong 已提交
29
/*
 * Worker callback for the snode write queue.
 *
 * Pulls numOfMsgs items out of qall one at a time, passes each to
 * sndProcessWriteMsg(), sends a response, and then releases the message
 * payload and the queue item.
 *
 * pInfo->ahandle is used as the SSnodeMgmt that owns the snode.
 */
static void smProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
  SSnodeMgmt *pMgmt = pInfo->ahandle;

  for (int32_t idx = 0; idx < numOfMsgs; ++idx) {
    SRpcMsg *pMsg = NULL;
    taosGetQitem(qall, (void **)&pMsg);

    // Must keep the name `trace`: it is not used directly here, so presumably
    // the dG* logging macros pick it up by name -- TODO confirm.
    const STraceId *trace = &pMsg->info.traceId;

    dTrace("msg:%p, get from snode-write queue", pMsg);
    int32_t code = sndProcessWriteMsg(pMgmt->pSnode, pMsg, NULL);
    if (code >= 0) {
      smSendRsp(pMsg, 0);
    } else {
      dGError("snd, msg:%p failed to process write since %s", pMsg, terrstr(code));
      // On failure the request message itself is sent back, and only when a
      // handle is attached to it.
      // NOTE(review): pMsg->pCont is also passed to rpcFreeCont() below; confirm
      // tmsgSendRsp() does not take ownership of it.
      if (pMsg->info.handle != NULL) {
        tmsgSendRsp(pMsg);
      }
    }

    dTrace("msg:%p, is freed", pMsg);
    rpcFreeCont(pMsg->pCont);
    taosFreeQitem(pMsg);
  }
}

L
Liu Jicong 已提交
54 55 56
/*
 * Worker callback for the snode stream queue.
 *
 * Passes pMsg to sndProcessStreamMsg(); when that returns a negative code the
 * failure is logged and a response carrying terrno is sent. In every case the
 * message payload and the queue item are released before returning.
 *
 * pInfo->ahandle is used as the SSnodeMgmt that owns the snode.
 */
static void smProcessStreamQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
  // Must keep the name `trace`: it is not used directly here, so presumably
  // the dG* logging macros pick it up by name -- TODO confirm.
  const STraceId *trace = &pMsg->info.traceId;
  SSnodeMgmt     *pMgmt = pInfo->ahandle;

  dTrace("msg:%p, get from snode-stream queue", pMsg);
  int32_t code = sndProcessStreamMsg(pMgmt->pSnode, pMsg);
  if (code < 0) {
    // NOTE(review): pMsg is dereferenced unconditionally both above and below,
    // so the NULL branch here cannot protect anything; confirm whether a NULL
    // message can ever reach this callback.
    if (pMsg == NULL) {
      dGError("snd, msg:%p failed to process stream empty msg since %s", pMsg, terrstr(code));
    } else {
      dGError("snd, msg:%p failed to process stream msg %s since %s", pMsg, TMSG_INFO(pMsg->msgType), terrstr(code));
    }
    smSendRsp(pMsg, terrno);
  }

  dTrace("msg:%p, is freed", pMsg);
  rpcFreeCont(pMsg->pCont);
  taosFreeQitem(pMsg);
}
S
Shengliang Guan 已提交
73

S
shm  
Shengliang Guan 已提交
74
/*
 * Create the snode workers.
 *
 * Allocates pMgmt->writeWroker, then creates tsNumOfSnodeWriteThreads
 * "snode-write" multi-workers (each stored in that array), and finally
 * initializes the single "snode-stream" worker in pMgmt->streamWorker.
 *
 * Returns 0 on success and -1 on failure. When a write worker cannot be
 * initialized or cannot be stored in the array, the worker object allocated
 * for that iteration is released here, because it is not reachable through
 * pMgmt->writeWroker. Workers already stored in the array are left in place.
 */
int32_t smStartWorker(SSnodeMgmt *pMgmt) {
  pMgmt->writeWroker = taosArrayInit(0, sizeof(SMultiWorker *));
  if (pMgmt->writeWroker == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  for (int32_t i = 0; i < tsNumOfSnodeWriteThreads; i++) {
    SMultiWorker *pWriteWorker = taosMemoryMalloc(sizeof(SMultiWorker));
    if (pWriteWorker == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
    }

    SMultiWorkerCfg cfg = {
        .max = 1,
        .name = "snode-write",
        .fp = smProcessWriteQueue,
        .param = pMgmt,
    };
    if (tMultiWorkerInit(pWriteWorker, &cfg) != 0) {
      // Log first so terrstr() still reflects the init failure.
      dError("failed to start snode-unique worker since %s", terrstr());
      // The worker never made it into the array, so nobody else can free it.
      // Only the struct is released: it was not zero-initialized, so running
      // the cleanup routine on a partially initialized worker is avoided.
      taosMemoryFree(pWriteWorker);
      return -1;
    }
    if (taosArrayPush(pMgmt->writeWroker, &pWriteWorker) == NULL) {
      // Fully initialized but not stored: tear it down the same way
      // smStopWorker() does for stored workers.
      tMultiWorkerCleanup(pWriteWorker);
      taosMemoryFree(pWriteWorker);
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
    }
  }

  SSingleWorkerCfg cfg = {
      .min = tsNumOfSnodeStreamThreads,
      .max = tsNumOfSnodeStreamThreads,
      .name = "snode-stream",
      .fp = (FItem)smProcessStreamQueue,
      .param = pMgmt,
  };

  if (tSingleWorkerInit(&pMgmt->streamWorker, &cfg)) {
    dError("failed to start snode shared-worker since %s", terrstr());
    return -1;
  }

  dDebug("snode workers are initialized");
  return 0;
}

S
shm  
Shengliang Guan 已提交
121
/*
 * Tear down the snode workers.
 *
 * Cleans up and frees every write worker stored in pMgmt->writeWroker,
 * destroys the array itself, then cleans up the stream worker.
 */
void smStopWorker(SSnodeMgmt *pMgmt) {
  int32_t idx = 0;
  while (idx < taosArrayGetSize(pMgmt->writeWroker)) {
    SMultiWorker *pWriter = taosArrayGetP(pMgmt->writeWroker, idx);
    tMultiWorkerCleanup(pWriter);
    taosMemoryFree(pWriter);
    idx++;
  }

  taosArrayDestroy(pMgmt->writeWroker);
  tSingleWorkerCleanup(&pMgmt->streamWorker);

  dDebug("snode workers are closed");
}

L
Liu Jicong 已提交
132 133 134 135 136 137 138 139 140 141
/*
 * Copy an incoming RPC message into a queue item and enqueue it on the snode
 * queue selected by qtype (STREAM_QUEUE or WRITE_QUEUE).
 *
 * The payload pointer is moved from pRpc into the queue item: pRpc->pCont is
 * set to NULL on every path, success or failure. On failure the payload is
 * released here and -1 is returned.
 *
 * Returns 0 when the message was handed to a queue, non-zero otherwise
 * (allocation failure, no snode, enqueue failure, or unknown qtype).
 */
int32_t smPutMsgToQueue(SSnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) {
  SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM);
  if (pMsg == NULL) {
    rpcFreeCont(pRpc->pCont);
    pRpc->pCont = NULL;
    return -1;
  }

  SSnode *pSnode = pMgmt->pSnode;
  if (pSnode == NULL) {
    // pMsg has not been filled in yet at this point, so the message type must
    // be read from pRpc.
    dError("msg:%p failed to put into snode queue since %s, type:%s qtype:%d", pMsg, terrstr(),
           TMSG_INFO(pRpc->msgType), qtype);
    taosFreeQitem(pMsg);
    rpcFreeCont(pRpc->pCont);
    pRpc->pCont = NULL;
    return -1;
  }

  SMsgHead *pHead = pRpc->pCont;
  pHead->contLen = htonl(pHead->contLen);
  pHead->vgId = SNODE_HANDLE;
  memcpy(pMsg, pRpc, sizeof(SRpcMsg));
  pRpc->pCont = NULL;  // the queue item now owns the payload

  int32_t code = 0;
  switch (qtype) {
    case STREAM_QUEUE:
      code = smPutNodeMsgToStreamQueue(pMgmt, pMsg);
      break;
    case WRITE_QUEUE:
      code = smPutNodeMsgToWriteQueue(pMgmt, pMsg);
      break;
    default:
      tAssertS(0, "msg:%p failed to put into snode queue since %s, type:%s qtype:%d", pMsg, terrstr(),
               TMSG_INFO(pMsg->msgType), qtype);
      terrno = TSDB_CODE_INVALID_MSG;
      code = -1;
      break;
  }

  if (code != 0) {
    // Nothing accepted the item: the enqueue helpers only report failure
    // before writing to a queue, and the default branch never enqueues.
    // Release the item and its payload so they do not leak.
    rpcFreeCont(pMsg->pCont);
    taosFreeQitem(pMsg);
  }
  return code;
}

S
Shengliang Guan 已提交
170
/*
 * Enqueue a management message on the first write worker's queue.
 *
 * Returns 0 after the message has been written to the queue. Returns -1 with
 * terrno set to TSDB_CODE_INVALID_MSG when no worker is found at index 0 of
 * pMgmt->writeWroker.
 */
int32_t smPutNodeMsgToMgmtQueue(SSnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SMultiWorker *pFirst = taosArrayGetP(pMgmt->writeWroker, 0);

  if (pFirst != NULL) {
    dTrace("msg:%p, put into worker %s", pMsg, pFirst->name);
    taosWriteQitem(pFirst->queue, pMsg);
    return 0;
  }

  terrno = TSDB_CODE_INVALID_MSG;
  return -1;
}

L
Liu Jicong 已提交
182 183
/*
 * Enqueue a write message on the first write worker's queue.
 *
 * Returns 0 after the message has been written to the queue. Returns -1 with
 * terrno set to TSDB_CODE_INVALID_MSG when no worker is found at index 0 of
 * pMgmt->writeWroker; in that case the message is not enqueued.
 */
int32_t smPutNodeMsgToWriteQueue(SSnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  int32_t       ret = -1;
  SMultiWorker *pTarget = taosArrayGetP(pMgmt->writeWroker, 0);

  if (pTarget == NULL) {
    terrno = TSDB_CODE_INVALID_MSG;
  } else {
    dTrace("msg:%p, put into worker %s", pMsg, pTarget->name);
    taosWriteQitem(pTarget->queue, pMsg);
    ret = 0;
  }

  return ret;
}

L
Liu Jicong 已提交
194 195
/*
 * Route a stream message to the snode.
 *
 * Messages of type TDMT_STREAM_TASK_DISPATCH are passed to
 * sndEnqueueStreamDispatch(); every other type is written to the stream
 * worker's queue. Always returns 0.
 */
int32_t smPutNodeMsgToStreamQueue(SSnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SSingleWorker *pStream = &pMgmt->streamWorker;

  dTrace("msg:%p, put into worker %s", pMsg, pStream->name);

  if (pMsg->msgType != TDMT_STREAM_TASK_DISPATCH) {
    taosWriteQitem(pStream->queue, pMsg);
    return 0;
  }

  sndEnqueueStreamDispatch(pMgmt->pSnode, pMsg);
  return 0;
}