smWorker.c 5.5 KB
Newer Older
S
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
S
shm  
Shengliang Guan 已提交
17
#include "smInt.h"
S
Shengliang Guan 已提交
18

S
Shengliang Guan 已提交
19
/*
 * Build a reply for pMsg and send it with tmsgSendRsp().
 *
 * The reply carries the given code, reuses the request's info block, and takes
 * its payload pointer and length from pMsg->info.rsp / pMsg->info.rspLen.
 * Every other field of the reply is zero.
 */
static inline void smSendRsp(SRpcMsg *pMsg, int32_t code) {
  SRpcMsg reply = {0};

  reply.info = pMsg->info;
  reply.code = code;
  reply.pCont = pMsg->info.rsp;
  reply.contLen = pMsg->info.rspLen;

  tmsgSendRsp(&reply);
}

L
Liu Jicong 已提交
29
/*
 * Batch handler for the snode-write worker.
 *
 * Pulls numOfMsgs items out of qall one by one, hands each to
 * sndProcessWriteMsg(), replies to the sender, and then releases both the
 * message content and the queue item.
 *
 * pInfo->ahandle is used as the SSnodeMgmt context.
 */
static void smProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
  SSnodeMgmt *pMgmt = pInfo->ahandle;

  for (int32_t left = numOfMsgs; left > 0; --left) {
    SRpcMsg *pMsg = NULL;
    taosGetQitem(qall, (void **)&pMsg);

    // The local must keep the name `trace`: dGError presumably picks it up
    // implicitly, since nothing else in this function reads it.
    const STraceId *trace = &pMsg->info.traceId;

    dTrace("msg:%p, get from snode-write queue", pMsg);
    int32_t code = sndProcessWriteMsg(pMgmt->pSnode, pMsg, NULL);
    if (code >= 0) {
      smSendRsp(pMsg, 0);
    } else {
      dGError("snd, msg:%p failed to process write since %s", pMsg, terrstr(code));
      // NOTE(review): the request itself is sent back as the reply here, with
      // whatever code/content it already carries, and pMsg->pCont is freed
      // again below. Confirm this is intended; the stream handler replies
      // through smSendRsp(pMsg, terrno) instead.
      if (pMsg->info.handle != NULL) {
        tmsgSendRsp(pMsg);
      }
    }

    dTrace("msg:%p, is freed", pMsg);
    rpcFreeCont(pMsg->pCont);
    taosFreeQitem(pMsg);
  }
}

L
Liu Jicong 已提交
54 55 56
/*
 * Per-message handler for the snode-stream worker.
 *
 * Passes pMsg to sndProcessStreamMsg(). A reply is sent from here only when
 * that call fails (negative return), using the current terrno as the code.
 * In both cases the message content and the queue item are released before
 * returning.
 *
 * pInfo->ahandle is used as the SSnodeMgmt context.
 */
static void smProcessStreamQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
  // The local must keep the name `trace`: dGError presumably picks it up
  // implicitly, since nothing else in this function reads it.
  const STraceId *trace = &pMsg->info.traceId;
  SSnodeMgmt     *pSnodeMgmt = pInfo->ahandle;

  dTrace("msg:%p, get from snode-stream queue", pMsg);

  int32_t rc = sndProcessStreamMsg(pSnodeMgmt->pSnode, pMsg);
  if (rc < 0) {
    dGError("snd, msg:%p failed to process stream msg %s since %s", pMsg, TMSG_INFO(pMsg->msgType), terrstr(rc));
    smSendRsp(pMsg, terrno);
  }

  dTrace("msg:%p, is freed", pMsg);
  rpcFreeCont(pMsg->pCont);
  taosFreeQitem(pMsg);
}
S
Shengliang Guan 已提交
69

S
shm  
Shengliang Guan 已提交
70
/**
 * Create the snode workers.
 *
 * Allocates tsNumOfSnodeWriteThreads "snode-write" multi-workers (each served
 * by smProcessWriteQueue) and stores their pointers in pMgmt->writeWroker,
 * then initialises the single "snode-stream" worker (served by
 * smProcessStreamQueue) in pMgmt->streamWorker.
 *
 * A write worker that fails to initialise, or cannot be stored in the array,
 * is released here so it is not leaked. Workers already stored in
 * pMgmt->writeWroker are left in place on failure.
 * NOTE(review): presumably the caller runs smStopWorker() to tear those down;
 * confirm against the caller.
 *
 * @param pMgmt  snode management context to populate
 * @return 0 on success, -1 on failure (terrno is set on allocation failures)
 */
int32_t smStartWorker(SSnodeMgmt *pMgmt) {
  pMgmt->writeWroker = taosArrayInit(0, sizeof(SMultiWorker *));
  if (pMgmt->writeWroker == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  for (int32_t i = 0; i < tsNumOfSnodeWriteThreads; i++) {
    SMultiWorker *pWriteWorker = taosMemoryMalloc(sizeof(SMultiWorker));
    if (pWriteWorker == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
    }

    SMultiWorkerCfg cfg = {
        .max = 1,
        .name = "snode-write",
        .fp = smProcessWriteQueue,
        .param = pMgmt,
    };
    if (tMultiWorkerInit(pWriteWorker, &cfg) != 0) {
      dError("failed to start snode-unique worker since %s", terrstr());
      // Not yet owned by the array, so nobody else would ever free it.
      taosMemoryFree(pWriteWorker);
      return -1;
    }
    if (taosArrayPush(pMgmt->writeWroker, &pWriteWorker) == NULL) {
      // The worker was initialised but could not be recorded; tear it down
      // here because smStopWorker() only sees workers stored in the array.
      tMultiWorkerCleanup(pWriteWorker);
      taosMemoryFree(pWriteWorker);
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
    }
  }

  SSingleWorkerCfg cfg = {
      .min = tsNumOfSnodeStreamThreads,
      .max = tsNumOfSnodeStreamThreads,
      .name = "snode-stream",
      .fp = (FItem)smProcessStreamQueue,
      .param = pMgmt,
  };

  if (tSingleWorkerInit(&pMgmt->streamWorker, &cfg)) {
    dError("failed to start snode shared-worker since %s", terrstr());
    return -1;
  }

  dDebug("snode workers are initialized");
  return 0;
}

S
shm  
Shengliang Guan 已提交
117
/**
 * Shut down the snode workers created by smStartWorker().
 *
 * Every write worker stored in pMgmt->writeWroker is cleaned up and its
 * heap-allocated SMultiWorker is freed. smStartWorker() allocates each one
 * with taosMemoryMalloc(), and destroying the array only releases the array of
 * pointers. The stream worker is embedded in pMgmt, so it is only cleaned up.
 *
 * @param pMgmt  snode management context whose workers are to be stopped
 */
void smStopWorker(SSnodeMgmt *pMgmt) {
  for (int32_t i = 0; i < taosArrayGetSize(pMgmt->writeWroker); i++) {
    SMultiWorker *pWorker = taosArrayGetP(pMgmt->writeWroker, i);
    tMultiWorkerCleanup(pWorker);
    // Matches the taosMemoryMalloc() in smStartWorker().
    taosMemoryFree(pWorker);
  }
  taosArrayDestroy(pMgmt->writeWroker);
  tSingleWorkerCleanup(&pMgmt->streamWorker);
  dDebug("snode workers are closed");
}

L
Liu Jicong 已提交
127 128 129 130 131 132 133 134 135 136 137 138 139 140
/**
 * Copy an RPC message into a newly allocated queue item and hand it to the
 * snode worker selected by qtype.
 *
 * Before the copy, the message header at pRpc->pCont is rewritten in place:
 * contLen is passed through htonl() and vgId is set to SNODE_HANDLE.
 *
 * Every failure path releases the queue item (if one was allocated), frees
 * pRpc->pCont and resets it to NULL, so the caller must not free the content
 * again after a non-zero return.
 *
 * @param pMgmt  snode management context
 * @param qtype  STREAM_QUEUE or WRITE_QUEUE; anything else asserts and fails
 * @param pRpc   message to enqueue
 * @return 0 when the message was handed to a worker, non-zero otherwise
 */
int32_t smPutMsgToQueue(SSnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) {
  SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM);
  if (pMsg == NULL) {
    rpcFreeCont(pRpc->pCont);
    pRpc->pCont = NULL;
    return -1;
  }

  int32_t code = -1;
  if (pMgmt->pSnode == NULL) {
    // pMsg has not been filled in yet, so the message type is read from pRpc.
    dError("snode: msg:%p failed to put into snode queue since %s, type:%s qtype:%d", pMsg, terrstr(),
           TMSG_INFO(pRpc->msgType), qtype);
  } else {
    SMsgHead *pHead = pRpc->pCont;
    pHead->contLen = htonl(pHead->contLen);
    pHead->vgId = SNODE_HANDLE;
    memcpy(pMsg, pRpc, sizeof(SRpcMsg));

    switch (qtype) {
      case STREAM_QUEUE:
        code = smPutNodeMsgToStreamQueue(pMgmt, pMsg);
        break;
      case WRITE_QUEUE:
        code = smPutNodeMsgToWriteQueue(pMgmt, pMsg);
        break;
      default:
        ASSERT(0);
        // Still fail cleanly when assertions are compiled out.
        terrno = TSDB_CODE_INVALID_MSG;
        break;
    }
  }

  if (code != 0) {
    // The item was never queued, so no worker will release it. pMsg->pCont
    // (when copied) aliases pRpc->pCont, so the content is freed exactly once.
    taosFreeQitem(pMsg);
    rpcFreeCont(pRpc->pCont);
    pRpc->pCont = NULL;
  }
  return code;
}

S
Shengliang Guan 已提交
160
/*
 * Queue pMsg on the first write worker (index 0 of pMgmt->writeWroker).
 *
 * Returns 0 once the item has been written to that worker's queue. If no
 * worker is found at index 0, sets terrno to TSDB_CODE_INVALID_MSG and
 * returns -1 without touching pMsg.
 */
int32_t smPutNodeMsgToMgmtQueue(SSnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SMultiWorker *pFirst = taosArrayGetP(pMgmt->writeWroker, 0);

  if (pFirst != NULL) {
    dTrace("msg:%p, put into worker %s", pMsg, pFirst->name);
    taosWriteQitem(pFirst->queue, pMsg);
    return 0;
  }

  terrno = TSDB_CODE_INVALID_MSG;
  return -1;
}

L
Liu Jicong 已提交
172 173
/*
 * Queue pMsg on the snode write worker stored at index 0 of
 * pMgmt->writeWroker.
 *
 * Returns 0 once the item has been written to the worker's queue. If there is
 * no worker at index 0, sets terrno to TSDB_CODE_INVALID_MSG and returns -1;
 * pMsg is left untouched in that case.
 */
int32_t smPutNodeMsgToWriteQueue(SSnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SMultiWorker *pTarget = taosArrayGetP(pMgmt->writeWroker, 0);

  if (pTarget != NULL) {
    dTrace("msg:%p, put into worker %s", pMsg, pTarget->name);
    taosWriteQitem(pTarget->queue, pMsg);
    return 0;
  }

  terrno = TSDB_CODE_INVALID_MSG;
  return -1;
}

L
Liu Jicong 已提交
184 185
/*
 * Route pMsg to the snode stream side.
 *
 * Messages of type TDMT_STREAM_TASK_DISPATCH are passed to
 * sndEnqueueStreamDispatch(); every other type is written to the stream
 * worker's queue. Always returns 0.
 */
int32_t smPutNodeMsgToStreamQueue(SSnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SSingleWorker *pStream = &pMgmt->streamWorker;

  dTrace("msg:%p, put into worker %s", pMsg, pStream->name);

  if (pMsg->msgType != TDMT_STREAM_TASK_DISPATCH) {
    taosWriteQitem(pStream->queue, pMsg);
    return 0;
  }

  sndEnqueueStreamDispatch(pMgmt->pSnode, pMsg);
  return 0;
}