dmProc.c 16.3 KB
Newer Older
S
shm  
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
S
Shengliang Guan 已提交
17
#include "dmMgmt.h"
S
shm  
Shengliang Guan 已提交
18

19
static inline int32_t CEIL8(int32_t v) { return ceil((float)(v) / 8) * 8; }
S
shm  
Shengliang Guan 已提交
20

S
Shengliang Guan 已提交
21
static int32_t dmInitProcMutex(SProcQueue *queue) {
S
Shengliang Guan 已提交
22
  TdThreadMutexAttr mattr = {0};
S
shm  
Shengliang Guan 已提交
23

wafwerar's avatar
wafwerar 已提交
24
  if (taosThreadMutexAttrInit(&mattr) != 0) {
S
shm  
Shengliang Guan 已提交
25
    terrno = TAOS_SYSTEM_ERROR(errno);
S
Shengliang Guan 已提交
26
    dError("node:%s, failed to init mutex while init attr since %s", queue->name, terrstr());
S
shm  
Shengliang Guan 已提交
27
    return -1;
S
shm  
Shengliang Guan 已提交
28 29
  }

wafwerar's avatar
wafwerar 已提交
30
  if (taosThreadMutexAttrSetPshared(&mattr, PTHREAD_PROCESS_SHARED) != 0) {
31
    taosThreadMutexAttrDestroy(&mattr);
S
shm  
Shengliang Guan 已提交
32
    terrno = TAOS_SYSTEM_ERROR(errno);
S
Shengliang Guan 已提交
33
    dError("node:%s, failed to init mutex while set shared since %s", queue->name, terrstr());
S
shm  
Shengliang Guan 已提交
34
    return -1;
S
shm  
Shengliang Guan 已提交
35 36
  }

S
Shengliang Guan 已提交
37
  if (taosThreadMutexInit(&queue->mutex, &mattr) != 0) {
S
Shengliang Guan 已提交
38
    taosThreadMutexAttrDestroy(&mattr);
S
shm  
Shengliang Guan 已提交
39
    terrno = TAOS_SYSTEM_ERROR(errno);
S
Shengliang Guan 已提交
40
    dError("node:%s, failed to init mutex since %s", queue->name, terrstr());
S
shm  
Shengliang Guan 已提交
41
    return -1;
S
shm  
Shengliang Guan 已提交
42 43
  }

wafwerar's avatar
wafwerar 已提交
44
  taosThreadMutexAttrDestroy(&mattr);
S
shm  
Shengliang Guan 已提交
45
  return 0;
S
shm  
Shengliang Guan 已提交
46 47
}

S
Shengliang Guan 已提交
48 49
static int32_t dmInitProcSem(SProcQueue *queue) {
  if (tsem_init(&queue->sem, 1, 0) != 0) {
S
shm  
Shengliang Guan 已提交
50
    terrno = TAOS_SYSTEM_ERROR(errno);
S
Shengliang Guan 已提交
51
    dError("node:%s, failed to init sem since %s", queue->name, terrstr());
S
shm  
Shengliang Guan 已提交
52 53 54
    return -1;
  }

S
shm  
Shengliang Guan 已提交
55
  return 0;
S
shm  
Shengliang Guan 已提交
56 57
}

S
Shengliang Guan 已提交
58 59 60
static SProcQueue *dmInitProcQueue(SProc *proc, char *ptr, int32_t size) {
  SProcQueue *queue = (SProcQueue *)(ptr);

S
shm  
Shengliang Guan 已提交
61 62 63 64
  int32_t bufSize = size - CEIL8(sizeof(SProcQueue));
  if (bufSize <= 1024) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
S
shm  
Shengliang Guan 已提交
65 66
  }

67
  if (proc->ptype & DND_PROC_PARENT) {
S
Shengliang Guan 已提交
68
    if (dmInitProcMutex(queue) != 0) {
S
shm  
Shengliang Guan 已提交
69 70
      return NULL;
    }
S
shm  
Shengliang Guan 已提交
71

S
Shengliang Guan 已提交
72
    if (dmInitProcSem(queue) != 0) {
S
shm  
Shengliang Guan 已提交
73 74
      return NULL;
    }
S
shm  
Shengliang Guan 已提交
75

S
Shengliang Guan 已提交
76 77 78 79 80 81
    tstrncpy(queue->name, proc->name, sizeof(queue->name));
    queue->head = 0;
    queue->tail = 0;
    queue->total = bufSize;
    queue->avail = bufSize;
    queue->items = 0;
S
shm  
Shengliang Guan 已提交
82 83
  }

S
Shengliang Guan 已提交
84
  return queue;
S
shm  
Shengliang Guan 已提交
85 86
}

87
static void dmCleanupProcQueue(SProcQueue *queue) {}
S
shm  
Shengliang Guan 已提交
88

89 90 91 92 93 94
static inline int32_t dmPushToProcQueue(SProc *proc, SProcQueue *queue, SRpcMsg *pMsg, EProcFuncType ftype) {
  const void   *pHead = pMsg;
  const void   *pBody = pMsg->pCont;
  const int16_t rawHeadLen = sizeof(SRpcMsg);
  const int32_t rawBodyLen = pMsg->contLen;
  const int16_t headLen = CEIL8(rawHeadLen);
S
shm  
Shengliang Guan 已提交
95 96
  const int32_t bodyLen = CEIL8(rawBodyLen);
  const int32_t fullLen = headLen + bodyLen + 8;
97
  const int64_t handle = (int64_t)pMsg->info.handle;
S
shm  
Shengliang Guan 已提交
98

S
Shengliang Guan 已提交
99 100 101 102 103
  if (fullLen > queue->total) {
    terrno = TSDB_CODE_OUT_OF_RANGE;
    return -1;
  }

S
Shengliang Guan 已提交
104 105 106
  taosThreadMutexLock(&queue->mutex);
  if (fullLen > queue->avail) {
    taosThreadMutexUnlock(&queue->mutex);
S
shm  
Shengliang Guan 已提交
107 108 109 110
    terrno = TSDB_CODE_OUT_OF_SHM_MEM;
    return -1;
  }

111
  if (ftype == DND_FUNC_REQ && IsReq(pMsg) && pMsg->code == 0 && handle != 0 && pMsg->info.noResp == 0) {
112
    if (taosHashPut(proc->hash, &handle, sizeof(int64_t), &pMsg->info, sizeof(SRpcConnInfo)) != 0) {
S
Shengliang Guan 已提交
113
      taosThreadMutexUnlock(&queue->mutex);
114 115 116 117
      return -1;
    }
  }

S
Shengliang Guan 已提交
118 119 120 121 122
  const int32_t pos = queue->tail;
  if (queue->tail < queue->total) {
    *(int16_t *)(queue->pBuffer + queue->tail) = rawHeadLen;
    *(int8_t *)(queue->pBuffer + queue->tail + 2) = (int8_t)ftype;
    *(int32_t *)(queue->pBuffer + queue->tail + 4) = rawBodyLen;
S
shm  
Shengliang Guan 已提交
123
  } else {
S
Shengliang Guan 已提交
124 125 126
    *(int16_t *)(queue->pBuffer) = rawHeadLen;
    *(int8_t *)(queue->pBuffer + 2) = (int8_t)ftype;
    *(int32_t *)(queue->pBuffer + 4) = rawBodyLen;
S
shm  
Shengliang Guan 已提交
127 128
  }

S
Shengliang Guan 已提交
129 130
  if (queue->tail < queue->head) {
    memcpy(queue->pBuffer + queue->tail + 8, pHead, rawHeadLen);
131
    if (rawBodyLen > 0) memcpy(queue->pBuffer + queue->tail + 8 + headLen, pBody, rawBodyLen);
S
Shengliang Guan 已提交
132
    queue->tail = queue->tail + 8 + headLen + bodyLen;
S
shm  
Shengliang Guan 已提交
133
  } else {
S
Shengliang Guan 已提交
134
    int32_t remain = queue->total - queue->tail;
S
shm  
Shengliang Guan 已提交
135
    if (remain == 0) {
S
Shengliang Guan 已提交
136
      memcpy(queue->pBuffer + 8, pHead, rawHeadLen);
137
      if (rawBodyLen > 0) memcpy(queue->pBuffer + 8 + headLen, pBody, rawBodyLen);
S
Shengliang Guan 已提交
138
      queue->tail = 8 + headLen + bodyLen;
S
shm  
Shengliang Guan 已提交
139
    } else if (remain == 8) {
S
Shengliang Guan 已提交
140
      memcpy(queue->pBuffer, pHead, rawHeadLen);
141
      if (rawBodyLen > 0) memcpy(queue->pBuffer + headLen, pBody, rawBodyLen);
S
Shengliang Guan 已提交
142
      queue->tail = headLen + bodyLen;
S
shm  
Shengliang Guan 已提交
143
    } else if (remain < 8 + headLen) {
S
Shengliang Guan 已提交
144
      memcpy(queue->pBuffer + queue->tail + 8, pHead, remain - 8);
145
      memcpy(queue->pBuffer, (char *)pHead + remain - 8, rawHeadLen - (remain - 8));
146
      if (rawBodyLen > 0) memcpy(queue->pBuffer + headLen - (remain - 8), pBody, rawBodyLen);
S
Shengliang Guan 已提交
147
      queue->tail = headLen - (remain - 8) + bodyLen;
S
Shengliang Guan 已提交
148
    } else if (remain < 8 + headLen + bodyLen) {
S
Shengliang Guan 已提交
149
      memcpy(queue->pBuffer + queue->tail + 8, pHead, rawHeadLen);
150
      if (rawBodyLen > 0) memcpy(queue->pBuffer + queue->tail + 8 + headLen, pBody, remain - 8 - headLen);
151 152
      if (rawBodyLen > 0)
        memcpy(queue->pBuffer, (char *)pBody + remain - 8 - headLen, rawBodyLen - (remain - 8 - headLen));
S
Shengliang Guan 已提交
153
      queue->tail = bodyLen - (remain - 8 - headLen);
S
shm  
Shengliang Guan 已提交
154
    } else {
S
Shengliang Guan 已提交
155
      memcpy(queue->pBuffer + queue->tail + 8, pHead, rawHeadLen);
156
      if (rawBodyLen > 0) memcpy(queue->pBuffer + queue->tail + headLen + 8, pBody, rawBodyLen);
S
Shengliang Guan 已提交
157
      queue->tail = queue->tail + headLen + bodyLen + 8;
S
shm  
Shengliang Guan 已提交
158 159 160
    }
  }

S
Shengliang Guan 已提交
161 162 163 164
  queue->avail -= fullLen;
  queue->items++;
  taosThreadMutexUnlock(&queue->mutex);
  tsem_post(&queue->sem);
S
shm  
Shengliang Guan 已提交
165

166 167
  dTrace("node:%s, push %s msg:%p type:%d handle:%p len:%d code:0x%x, pos:%d remain:%d", queue->name, dmFuncStr(ftype),
         pMsg, pMsg->msgType, pMsg->info.handle, pMsg->contLen, pMsg->code, pos, queue->items);
S
shm  
Shengliang Guan 已提交
168 169 170
  return 0;
}

S
Shengliang Guan 已提交
171
static inline int32_t dmPopFromProcQueue(SProcQueue *queue, SRpcMsg **ppMsg, EProcFuncType *pFuncType) {
S
Shengliang Guan 已提交
172
  tsem_wait(&queue->sem);
S
shm  
Shengliang Guan 已提交
173

S
Shengliang Guan 已提交
174 175 176
  taosThreadMutexLock(&queue->mutex);
  if (queue->total - queue->avail <= 0) {
    taosThreadMutexUnlock(&queue->mutex);
S
shm  
Shengliang Guan 已提交
177
    terrno = TSDB_CODE_OUT_OF_SHM_MEM;
S
shm  
Shengliang Guan 已提交
178
    return 0;
S
shm  
Shengliang Guan 已提交
179 180
  }

S
Shengliang Guan 已提交
181
  int16_t rawHeadLen = 0;
S
Shengliang Guan 已提交
182
  int8_t  ftype = 0;
S
Shengliang Guan 已提交
183
  int32_t rawBodyLen = 0;
S
Shengliang Guan 已提交
184 185 186 187
  if (queue->head < queue->total) {
    rawHeadLen = *(int16_t *)(queue->pBuffer + queue->head);
    ftype = *(int8_t *)(queue->pBuffer + queue->head + 2);
    rawBodyLen = *(int32_t *)(queue->pBuffer + queue->head + 4);
S
shm  
Shengliang Guan 已提交
188
  } else {
S
Shengliang Guan 已提交
189 190 191
    rawHeadLen = *(int16_t *)(queue->pBuffer);
    ftype = *(int8_t *)(queue->pBuffer + 2);
    rawBodyLen = *(int32_t *)(queue->pBuffer + 4);
S
shm  
Shengliang Guan 已提交
192
  }
S
Shengliang Guan 已提交
193
  int16_t headLen = CEIL8(rawHeadLen);
S
Shengliang Guan 已提交
194
  int32_t bodyLen = CEIL8(rawBodyLen);
S
shm  
Shengliang Guan 已提交
195

S
Shengliang Guan 已提交
196
  void *pHead = taosAllocateQitem(headLen, DEF_QITEM);
197 198 199
  void *pBody = NULL;
  if (bodyLen > 0) pBody = rpcMallocCont(bodyLen);
  if (pHead == NULL || (bodyLen > 0 && pBody == NULL)) {
S
Shengliang Guan 已提交
200 201 202 203
    taosThreadMutexUnlock(&queue->mutex);
    tsem_post(&queue->sem);
    taosFreeQitem(pHead);
    rpcFreeCont(pBody);
S
shm  
Shengliang Guan 已提交
204 205 206 207
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

S
Shengliang Guan 已提交
208 209 210
  const int32_t pos = queue->head;
  if (queue->head < queue->tail) {
    memcpy(pHead, queue->pBuffer + queue->head + 8, headLen);
211
    if (bodyLen > 0) memcpy(pBody, queue->pBuffer + queue->head + 8 + headLen, bodyLen);
S
Shengliang Guan 已提交
212
    queue->head = queue->head + 8 + headLen + bodyLen;
S
shm  
Shengliang Guan 已提交
213
  } else {
S
Shengliang Guan 已提交
214
    int32_t remain = queue->total - queue->head;
S
shm  
Shengliang Guan 已提交
215
    if (remain == 0) {
S
Shengliang Guan 已提交
216
      memcpy(pHead, queue->pBuffer + 8, headLen);
217
      if (bodyLen > 0) memcpy(pBody, queue->pBuffer + 8 + headLen, bodyLen);
S
Shengliang Guan 已提交
218
      queue->head = 8 + headLen + bodyLen;
S
shm  
Shengliang Guan 已提交
219
    } else if (remain == 8) {
S
Shengliang Guan 已提交
220
      memcpy(pHead, queue->pBuffer, headLen);
221
      if (bodyLen > 0) memcpy(pBody, queue->pBuffer + headLen, bodyLen);
S
Shengliang Guan 已提交
222
      queue->head = headLen + bodyLen;
S
shm  
Shengliang Guan 已提交
223
    } else if (remain < 8 + headLen) {
S
Shengliang Guan 已提交
224 225
      memcpy(pHead, queue->pBuffer + queue->head + 8, remain - 8);
      memcpy((char *)pHead + remain - 8, queue->pBuffer, headLen - (remain - 8));
226
      if (bodyLen > 0) memcpy(pBody, queue->pBuffer + headLen - (remain - 8), bodyLen);
S
Shengliang Guan 已提交
227
      queue->head = headLen - (remain - 8) + bodyLen;
S
Shengliang Guan 已提交
228
    } else if (remain < 8 + headLen + bodyLen) {
S
Shengliang Guan 已提交
229
      memcpy(pHead, queue->pBuffer + queue->head + 8, headLen);
230 231
      if (bodyLen > 0) memcpy(pBody, queue->pBuffer + queue->head + 8 + headLen, remain - 8 - headLen);
      if (bodyLen > 0) memcpy((char *)pBody + remain - 8 - headLen, queue->pBuffer, bodyLen - (remain - 8 - headLen));
S
Shengliang Guan 已提交
232
      queue->head = bodyLen - (remain - 8 - headLen);
S
shm  
Shengliang Guan 已提交
233
    } else {
S
Shengliang Guan 已提交
234
      memcpy(pHead, queue->pBuffer + queue->head + 8, headLen);
235
      if (bodyLen > 0) memcpy(pBody, queue->pBuffer + queue->head + headLen + 8, bodyLen);
S
Shengliang Guan 已提交
236
      queue->head = queue->head + headLen + bodyLen + 8;
S
shm  
Shengliang Guan 已提交
237 238 239
    }
  }

S
Shengliang Guan 已提交
240 241 242
  queue->avail = queue->avail + headLen + bodyLen + 8;
  queue->items--;
  taosThreadMutexUnlock(&queue->mutex);
S
shm  
Shengliang Guan 已提交
243

244 245
  *ppMsg = pHead;
  (*ppMsg)->pCont = pBody;
S
Shengliang Guan 已提交
246
  *pFuncType = (EProcFuncType)ftype;
S
shm  
Shengliang Guan 已提交
247

248 249
  dTrace("node:%s, pop %s msg:%p type:%d handle:%p len:%d code:0x%x, pos:%d remain:%d", queue->name, dmFuncStr(ftype),
         (*ppMsg), (*ppMsg)->msgType, (*ppMsg)->info.handle, (*ppMsg)->contLen, (*ppMsg)->code, pos, queue->items);
S
shm  
Shengliang Guan 已提交
250
  return 1;
S
shm  
Shengliang Guan 已提交
251 252
}

S
Shengliang Guan 已提交
253 254
int32_t dmInitProc(struct SMgmtWrapper *pWrapper) {
  SProc *proc = &pWrapper->proc;
255 256
  if (proc->name != NULL) return 0;

S
Shengliang Guan 已提交
257 258
  proc->wrapper = pWrapper;
  proc->name = pWrapper->name;
S
shm  
Shengliang Guan 已提交
259

S
Shengliang Guan 已提交
260
  SShm   *shm = &proc->shm;
S
shm  
Shengliang Guan 已提交
261
  int32_t cstart = 0;
S
Shengliang Guan 已提交
262
  int32_t csize = CEIL8(shm->size / 2);
S
shm  
Shengliang Guan 已提交
263
  int32_t pstart = csize;
S
Shengliang Guan 已提交
264 265
  int32_t psize = CEIL8(shm->size - pstart);
  if (pstart + psize > shm->size) {
S
shm  
Shengliang Guan 已提交
266 267 268
    psize -= 8;
  }

S
Shengliang Guan 已提交
269 270 271 272 273 274 275 276
  proc->hash = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
  proc->cqueue = dmInitProcQueue(proc, (char *)shm->ptr + cstart, csize);
  proc->pqueue = dmInitProcQueue(proc, (char *)shm->ptr + pstart, psize);
  if (proc->cqueue == NULL || proc->pqueue == NULL || proc->hash == NULL) {
    dmCleanupProcQueue(proc->cqueue);
    dmCleanupProcQueue(proc->pqueue);
    taosHashCleanup(proc->hash);
    return -1;
S
shm  
Shengliang Guan 已提交
277 278
  }

S
Shengliang Guan 已提交
279 280
  dDebug("node:%s, proc is initialized, cqueue:%p pqueue:%p", proc->name, proc->cqueue, proc->pqueue);
  return 0;
S
shm  
Shengliang Guan 已提交
281 282
}

S
Shengliang Guan 已提交
283 284 285 286 287 288
static void *dmConsumChildQueue(void *param) {
  SProc        *proc = param;
  SMgmtWrapper *pWrapper = proc->wrapper;
  SProcQueue   *queue = proc->cqueue;
  int32_t       numOfMsgs = 0;
  int32_t       code = 0;
S
Shengliang Guan 已提交
289
  EProcFuncType ftype = DND_FUNC_REQ;
290
  SRpcMsg      *pMsg = NULL;
S
Shengliang Guan 已提交
291

S
Shengliang Guan 已提交
292
  dDebug("node:%s, start to consume from cqueue", proc->name);
S
Shengliang Guan 已提交
293
  do {
294
    numOfMsgs = dmPopFromProcQueue(queue, &pMsg, &ftype);
S
Shengliang Guan 已提交
295
    if (numOfMsgs == 0) {
296
      dDebug("node:%s, get no msg from cqueue and exit thread", proc->name);
S
Shengliang Guan 已提交
297 298 299 300
      break;
    }

    if (numOfMsgs < 0) {
S
Shengliang Guan 已提交
301
      dError("node:%s, get no msg from cqueue since %s", proc->name, terrstr());
S
Shengliang Guan 已提交
302 303 304 305
      taosMsleep(1);
      continue;
    }

S
Shengliang Guan 已提交
306
    if (ftype != DND_FUNC_REQ) {
307 308 309 310 311 312 313 314 315
      dError("node:%s, invalid ftype:%d from cqueue", proc->name, ftype);
      rpcFreeCont(pMsg->pCont);
      taosFreeQitem(pMsg);
      continue;
    }

    code = dmProcessNodeMsg(pWrapper, pMsg);
    if (code != 0) {
      dError("node:%s, failed to process msg:%p since %s, put into pqueue", proc->name, pMsg, terrstr());
316
      SRpcMsg rsp = {.code = (terrno != 0 ? terrno : code), .info = pMsg->info};
317 318 319
      dmPutToProcPQueue(proc, &rsp, DND_FUNC_RSP);
      rpcFreeCont(pMsg->pCont);
      taosFreeQitem(pMsg);
S
Shengliang Guan 已提交
320 321
    }
  } while (1);
S
shm  
Shengliang Guan 已提交
322

S
Shengliang Guan 已提交
323 324
  return NULL;
}
S
shm  
Shengliang Guan 已提交
325

S
Shengliang Guan 已提交
326 327 328 329 330 331
static void *dmConsumParentQueue(void *param) {
  SProc        *proc = param;
  SMgmtWrapper *pWrapper = proc->wrapper;
  SProcQueue   *queue = proc->pqueue;
  int32_t       numOfMsgs = 0;
  int32_t       code = 0;
S
Shengliang Guan 已提交
332
  EProcFuncType ftype = DND_FUNC_REQ;
333
  SRpcMsg      *pMsg = NULL;
S
Shengliang Guan 已提交
334

S
Shengliang Guan 已提交
335
  dDebug("node:%s, start to consume from pqueue", proc->name);
S
Shengliang Guan 已提交
336
  do {
337
    numOfMsgs = dmPopFromProcQueue(queue, &pMsg, &ftype);
S
shm  
Shengliang Guan 已提交
338
    if (numOfMsgs == 0) {
S
Shengliang Guan 已提交
339
      dDebug("node:%s, get no msg from pqueue and exit thread", proc->name);
S
shm  
Shengliang Guan 已提交
340
      break;
S
Shengliang Guan 已提交
341 342 343
    }

    if (numOfMsgs < 0) {
S
Shengliang Guan 已提交
344
      dError("node:%s, get no msg from pqueue since %s", proc->name, terrstr());
S
shm  
Shengliang Guan 已提交
345 346
      taosMsleep(1);
      continue;
S
Shengliang Guan 已提交
347 348
    }

S
Shengliang Guan 已提交
349
    if (ftype == DND_FUNC_RSP) {
350 351
      dmRemoveProcRpcHandle(proc, pMsg->info.handle);
      rpcSendResponse(pMsg);
S
Shengliang Guan 已提交
352
    } else if (ftype == DND_FUNC_REGIST) {
353
      rpcRegisterBrokenLinkArg(pMsg);
S
Shengliang Guan 已提交
354
    } else if (ftype == DND_FUNC_RELEASE) {
355 356
      dmRemoveProcRpcHandle(proc, pMsg->info.handle);
      rpcReleaseHandle(pMsg->info.handle, (int8_t)pMsg->code);
S
shm  
Shengliang Guan 已提交
357
    } else {
358 359
      dError("node:%s, invalid ftype:%d from pqueue", proc->name, ftype);
      rpcFreeCont(pMsg->pCont);
S
shm  
Shengliang Guan 已提交
360
    }
S
Shengliang Guan 已提交
361

362
    taosFreeQitem(pMsg);
S
Shengliang Guan 已提交
363 364 365
  } while (1);

  return NULL;
S
shm  
Shengliang Guan 已提交
366 367
}

S
Shengliang Guan 已提交
368 369
int32_t dmRunProc(SProc *proc) {
  TdThreadAttr thAttr = {0};
S
Shengliang Guan 已提交
370 371
  taosThreadAttrInit(&thAttr);
  taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
S
shm  
Shengliang Guan 已提交
372

373
  if (proc->ptype & DND_PROC_PARENT) {
S
Shengliang Guan 已提交
374 375 376 377 378 379 380 381
    if (taosThreadCreate(&proc->pthread, &thAttr, dmConsumParentQueue, proc) != 0) {
      terrno = TAOS_SYSTEM_ERROR(errno);
      dError("node:%s, failed to create pthread since %s", proc->name, terrstr());
      return -1;
    }
    dDebug("node:%s, thread:%" PRId64 " is created to consume pqueue", proc->name, proc->pthread);
  }

382
  if (proc->ptype & DND_PROC_CHILD) {
S
Shengliang Guan 已提交
383 384 385 386 387 388
    if (taosThreadCreate(&proc->cthread, &thAttr, dmConsumChildQueue, proc) != 0) {
      terrno = TAOS_SYSTEM_ERROR(errno);
      dError("node:%s, failed to create cthread since %s", proc->name, terrstr());
      return -1;
    }
    dDebug("node:%s, thread:%" PRId64 " is created to consume cqueue", proc->name, proc->cthread);
S
shm  
Shengliang Guan 已提交
389 390
  }

S
Shengliang Guan 已提交
391
  taosThreadAttrDestroy(&thAttr);
S
shm  
Shengliang Guan 已提交
392 393 394
  return 0;
}

S
Shengliang Guan 已提交
395
void dmStopProc(SProc *proc) {
396
  proc->stop = true;
S
Shengliang Guan 已提交
397 398
  if (taosCheckPthreadValid(proc->pthread)) {
    dDebug("node:%s, start to join pthread:%" PRId64, proc->name, proc->pthread);
399
    tsem_post(&proc->pqueue->sem);
S
Shengliang Guan 已提交
400 401
    taosThreadJoin(proc->pthread, NULL);
    taosThreadClear(&proc->pthread);
S
shm  
Shengliang Guan 已提交
402
  }
S
shm  
Shengliang Guan 已提交
403

S
Shengliang Guan 已提交
404 405
  if (taosCheckPthreadValid(proc->cthread)) {
    dDebug("node:%s, start to join cthread:%" PRId64, proc->name, proc->cthread);
406
    tsem_post(&proc->cqueue->sem);
S
Shengliang Guan 已提交
407 408
    taosThreadJoin(proc->cthread, NULL);
    taosThreadClear(&proc->cthread);
S
shm  
Shengliang Guan 已提交
409 410 411
  }
}

S
Shengliang Guan 已提交
412 413
void dmCleanupProc(struct SMgmtWrapper *pWrapper) {
  SProc *proc = &pWrapper->proc;
414
  if (proc->name == NULL) return;
S
Shengliang Guan 已提交
415

S
Shengliang Guan 已提交
416
  dDebug("node:%s, start to cleanup proc", pWrapper->name);
S
Shengliang Guan 已提交
417 418 419 420
  dmStopProc(proc);
  dmCleanupProcQueue(proc->cqueue);
  dmCleanupProcQueue(proc->pqueue);
  taosHashCleanup(proc->hash);
421
  proc->hash = NULL;
S
Shengliang Guan 已提交
422
  dDebug("node:%s, proc is cleaned up", pWrapper->name);
423 424
}

425
void dmRemoveProcRpcHandle(SProc *proc, void *handle) {
426
  int64_t h = (int64_t)handle;
S
Shengliang Guan 已提交
427 428 429
  taosThreadMutexLock(&proc->cqueue->mutex);
  taosHashRemove(proc->hash, &h, sizeof(int64_t));
  taosThreadMutexUnlock(&proc->cqueue->mutex);
430 431
}

S
Shengliang Guan 已提交
432 433
void dmCloseProcRpcHandles(SProc *proc) {
  taosThreadMutexLock(&proc->cqueue->mutex);
434 435 436
  SRpcHandleInfo *pInfo = taosHashIterate(proc->hash, NULL);
  while (pInfo != NULL) {
    dError("node:%s, the child process dies and send an offline rsp to handle:%p", proc->name, pInfo->handle);
437
    SRpcMsg rpcMsg = {.code = TSDB_CODE_NODE_OFFLINE, .info = *pInfo};
S
Shengliang Guan 已提交
438
    rpcSendResponse(&rpcMsg);
439
    pInfo = taosHashIterate(proc->hash, pInfo);
440
  }
S
Shengliang Guan 已提交
441 442
  taosHashClear(proc->hash);
  taosThreadMutexUnlock(&proc->cqueue->mutex);
S
shm  
Shengliang Guan 已提交
443
}
S
shm  
Shengliang Guan 已提交
444

445
void dmPutToProcPQueue(SProc *proc, SRpcMsg *pMsg, EProcFuncType ftype) {
S
Shengliang Guan 已提交
446
  int32_t retry = 0;
447 448 449 450 451
  while (1) {
    if (dmPushToProcQueue(proc, proc->pqueue, pMsg, ftype) == 0) {
      break;
    }

S
Shengliang Guan 已提交
452
    if (terrno != TSDB_CODE_OUT_OF_SHM_MEM) {
453 454 455 456 457 458 459 460 461 462 463 464 465 466
      pMsg->code = terrno;
      if (pMsg->contLen > 0) {
        rpcFreeCont(pMsg->pCont);
        pMsg->pCont = NULL;
        pMsg->contLen = 0;
      }
      dError("node:%s, failed to push %s msg:%p type:%d handle:%p then discard data and return error", proc->name,
             dmFuncStr(ftype), pMsg, pMsg->msgType, pMsg->info.handle);
    } else {
      dError("node:%s, failed to push %s msg:%p type:%d handle:%p len:%d since %s, retry:%d", proc->name,
             dmFuncStr(ftype), pMsg, pMsg->msgType, pMsg->info.handle, pMsg->contLen, terrstr(), retry);
      retry++;
      taosMsleep(retry);
    }
S
shm  
Shengliang Guan 已提交
467
  }
468 469 470 471

  rpcFreeCont(pMsg->pCont);
  pMsg->pCont = NULL;
  pMsg->contLen = 0;
S
shm  
Shengliang Guan 已提交
472
}
S
Shengliang Guan 已提交
473

474
int32_t dmPutToProcCQueue(SProc *proc, SRpcMsg *pMsg, EProcFuncType ftype) {
475 476 477 478 479 480 481
  int32_t code = dmPushToProcQueue(proc, proc->cqueue, pMsg, ftype);
  if (code == 0) {
    dTrace("msg:%p, is freed after push to cqueue", pMsg);
    rpcFreeCont(pMsg->pCont);
    taosFreeQitem(pMsg);
  }
  return code;
S
Shengliang Guan 已提交
482
}