dmProc.c 16.0 KB
Newer Older
S
shm  
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
S
Shengliang Guan 已提交
17
#include "dmMgmt.h"
S
shm  
Shengliang Guan 已提交
18

19
static inline int32_t CEIL8(int32_t v) { return ceil((float)(v) / 8) * 8; }
S
shm  
Shengliang Guan 已提交
20

S
Shengliang Guan 已提交
21
static int32_t dmInitProcMutex(SProcQueue *queue) {
S
Shengliang Guan 已提交
22
  TdThreadMutexAttr mattr = {0};
S
shm  
Shengliang Guan 已提交
23

wafwerar's avatar
wafwerar 已提交
24
  if (taosThreadMutexAttrInit(&mattr) != 0) {
S
shm  
Shengliang Guan 已提交
25
    terrno = TAOS_SYSTEM_ERROR(errno);
S
Shengliang Guan 已提交
26
    dError("node:%s, failed to init mutex while init attr since %s", queue->name, terrstr());
S
shm  
Shengliang Guan 已提交
27
    return -1;
S
shm  
Shengliang Guan 已提交
28 29
  }

wafwerar's avatar
wafwerar 已提交
30
  if (taosThreadMutexAttrSetPshared(&mattr, PTHREAD_PROCESS_SHARED) != 0) {
31
    taosThreadMutexAttrDestroy(&mattr);
S
shm  
Shengliang Guan 已提交
32
    terrno = TAOS_SYSTEM_ERROR(errno);
S
Shengliang Guan 已提交
33
    dError("node:%s, failed to init mutex while set shared since %s", queue->name, terrstr());
S
shm  
Shengliang Guan 已提交
34
    return -1;
S
shm  
Shengliang Guan 已提交
35 36
  }

S
Shengliang Guan 已提交
37
  if (taosThreadMutexInit(&queue->mutex, &mattr) != 0) {
S
Shengliang Guan 已提交
38
    taosThreadMutexAttrDestroy(&mattr);
S
shm  
Shengliang Guan 已提交
39
    terrno = TAOS_SYSTEM_ERROR(errno);
S
Shengliang Guan 已提交
40
    dError("node:%s, failed to init mutex since %s", queue->name, terrstr());
S
shm  
Shengliang Guan 已提交
41
    return -1;
S
shm  
Shengliang Guan 已提交
42 43
  }

wafwerar's avatar
wafwerar 已提交
44
  taosThreadMutexAttrDestroy(&mattr);
S
shm  
Shengliang Guan 已提交
45
  return 0;
S
shm  
Shengliang Guan 已提交
46 47
}

S
Shengliang Guan 已提交
48 49
static int32_t dmInitProcSem(SProcQueue *queue) {
  if (tsem_init(&queue->sem, 1, 0) != 0) {
S
shm  
Shengliang Guan 已提交
50
    terrno = TAOS_SYSTEM_ERROR(errno);
S
Shengliang Guan 已提交
51
    dError("node:%s, failed to init sem since %s", queue->name, terrstr());
S
shm  
Shengliang Guan 已提交
52 53 54
    return -1;
  }

S
shm  
Shengliang Guan 已提交
55
  return 0;
S
shm  
Shengliang Guan 已提交
56 57
}

S
Shengliang Guan 已提交
58 59 60
static SProcQueue *dmInitProcQueue(SProc *proc, char *ptr, int32_t size) {
  SProcQueue *queue = (SProcQueue *)(ptr);

S
shm  
Shengliang Guan 已提交
61 62 63 64
  int32_t bufSize = size - CEIL8(sizeof(SProcQueue));
  if (bufSize <= 1024) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
S
shm  
Shengliang Guan 已提交
65 66
  }

67
  if (proc->ptype & DND_PROC_PARENT) {
S
Shengliang Guan 已提交
68
    if (dmInitProcMutex(queue) != 0) {
S
shm  
Shengliang Guan 已提交
69 70
      return NULL;
    }
S
shm  
Shengliang Guan 已提交
71

S
Shengliang Guan 已提交
72
    if (dmInitProcSem(queue) != 0) {
S
shm  
Shengliang Guan 已提交
73 74
      return NULL;
    }
S
shm  
Shengliang Guan 已提交
75

S
Shengliang Guan 已提交
76 77 78 79 80 81
    tstrncpy(queue->name, proc->name, sizeof(queue->name));
    queue->head = 0;
    queue->tail = 0;
    queue->total = bufSize;
    queue->avail = bufSize;
    queue->items = 0;
S
shm  
Shengliang Guan 已提交
82 83
  }

S
Shengliang Guan 已提交
84
  return queue;
S
shm  
Shengliang Guan 已提交
85 86
}

87
static void dmCleanupProcQueue(SProcQueue *queue) {}
S
shm  
Shengliang Guan 已提交
88

89 90 91 92 93 94
static inline int32_t dmPushToProcQueue(SProc *proc, SProcQueue *queue, SRpcMsg *pMsg, EProcFuncType ftype) {
  const void   *pHead = pMsg;
  const void   *pBody = pMsg->pCont;
  const int16_t rawHeadLen = sizeof(SRpcMsg);
  const int32_t rawBodyLen = pMsg->contLen;
  const int16_t headLen = CEIL8(rawHeadLen);
S
shm  
Shengliang Guan 已提交
95 96
  const int32_t bodyLen = CEIL8(rawBodyLen);
  const int32_t fullLen = headLen + bodyLen + 8;
97
  const int64_t handle = (int64_t)pMsg->info.handle;
S
shm  
Shengliang Guan 已提交
98

S
Shengliang Guan 已提交
99 100 101
  taosThreadMutexLock(&queue->mutex);
  if (fullLen > queue->avail) {
    taosThreadMutexUnlock(&queue->mutex);
S
shm  
Shengliang Guan 已提交
102 103 104 105
    terrno = TSDB_CODE_OUT_OF_SHM_MEM;
    return -1;
  }

106
  if (ftype == DND_FUNC_REQ && IsReq(pMsg) && pMsg->code == 0 && handle != 0 && pMsg->info.noResp == 0) {
107
    if (taosHashPut(proc->hash, &handle, sizeof(int64_t), &pMsg->info, sizeof(SRpcConnInfo)) != 0) {
S
Shengliang Guan 已提交
108
      taosThreadMutexUnlock(&queue->mutex);
109 110 111 112
      return -1;
    }
  }

S
Shengliang Guan 已提交
113 114 115 116 117
  const int32_t pos = queue->tail;
  if (queue->tail < queue->total) {
    *(int16_t *)(queue->pBuffer + queue->tail) = rawHeadLen;
    *(int8_t *)(queue->pBuffer + queue->tail + 2) = (int8_t)ftype;
    *(int32_t *)(queue->pBuffer + queue->tail + 4) = rawBodyLen;
S
shm  
Shengliang Guan 已提交
118
  } else {
S
Shengliang Guan 已提交
119 120 121
    *(int16_t *)(queue->pBuffer) = rawHeadLen;
    *(int8_t *)(queue->pBuffer + 2) = (int8_t)ftype;
    *(int32_t *)(queue->pBuffer + 4) = rawBodyLen;
S
shm  
Shengliang Guan 已提交
122 123
  }

S
Shengliang Guan 已提交
124 125
  if (queue->tail < queue->head) {
    memcpy(queue->pBuffer + queue->tail + 8, pHead, rawHeadLen);
126
    if (rawBodyLen > 0) memcpy(queue->pBuffer + queue->tail + 8 + headLen, pBody, rawBodyLen);
S
Shengliang Guan 已提交
127
    queue->tail = queue->tail + 8 + headLen + bodyLen;
S
shm  
Shengliang Guan 已提交
128
  } else {
S
Shengliang Guan 已提交
129
    int32_t remain = queue->total - queue->tail;
S
shm  
Shengliang Guan 已提交
130
    if (remain == 0) {
S
Shengliang Guan 已提交
131
      memcpy(queue->pBuffer + 8, pHead, rawHeadLen);
132
      if (rawBodyLen > 0) memcpy(queue->pBuffer + 8 + headLen, pBody, rawBodyLen);
S
Shengliang Guan 已提交
133
      queue->tail = 8 + headLen + bodyLen;
S
shm  
Shengliang Guan 已提交
134
    } else if (remain == 8) {
S
Shengliang Guan 已提交
135
      memcpy(queue->pBuffer, pHead, rawHeadLen);
136
      if (rawBodyLen > 0) memcpy(queue->pBuffer + headLen, pBody, rawBodyLen);
S
Shengliang Guan 已提交
137
      queue->tail = headLen + bodyLen;
S
shm  
Shengliang Guan 已提交
138
    } else if (remain < 8 + headLen) {
S
Shengliang Guan 已提交
139
      memcpy(queue->pBuffer + queue->tail + 8, pHead, remain - 8);
wafwerar's avatar
wafwerar 已提交
140
      memcpy(queue->pBuffer, (char*)pHead + remain - 8, rawHeadLen - (remain - 8));
141
      if (rawBodyLen > 0) memcpy(queue->pBuffer + headLen - (remain - 8), pBody, rawBodyLen);
S
Shengliang Guan 已提交
142
      queue->tail = headLen - (remain - 8) + bodyLen;
S
Shengliang Guan 已提交
143
    } else if (remain < 8 + headLen + bodyLen) {
S
Shengliang Guan 已提交
144
      memcpy(queue->pBuffer + queue->tail + 8, pHead, rawHeadLen);
145
      if (rawBodyLen > 0) memcpy(queue->pBuffer + queue->tail + 8 + headLen, pBody, remain - 8 - headLen);
wafwerar's avatar
wafwerar 已提交
146
      if (rawBodyLen > 0) memcpy(queue->pBuffer, (char*)pBody + remain - 8 - headLen, rawBodyLen - (remain - 8 - headLen));
S
Shengliang Guan 已提交
147
      queue->tail = bodyLen - (remain - 8 - headLen);
S
shm  
Shengliang Guan 已提交
148
    } else {
S
Shengliang Guan 已提交
149
      memcpy(queue->pBuffer + queue->tail + 8, pHead, rawHeadLen);
150
      if (rawBodyLen > 0) memcpy(queue->pBuffer + queue->tail + headLen + 8, pBody, rawBodyLen);
S
Shengliang Guan 已提交
151
      queue->tail = queue->tail + headLen + bodyLen + 8;
S
shm  
Shengliang Guan 已提交
152 153 154
    }
  }

S
Shengliang Guan 已提交
155 156 157 158
  queue->avail -= fullLen;
  queue->items++;
  taosThreadMutexUnlock(&queue->mutex);
  tsem_post(&queue->sem);
S
shm  
Shengliang Guan 已提交
159

160 161
  dTrace("node:%s, push %s msg:%p type:%d handle:%p len:%d code:0x%x, pos:%d remain:%d", queue->name, dmFuncStr(ftype),
         pMsg, pMsg->msgType, pMsg->info.handle, pMsg->contLen, pMsg->code, pos, queue->items);
S
shm  
Shengliang Guan 已提交
162 163 164
  return 0;
}

S
Shengliang Guan 已提交
165
static inline int32_t dmPopFromProcQueue(SProcQueue *queue, SRpcMsg **ppMsg, EProcFuncType *pFuncType) {
S
Shengliang Guan 已提交
166
  tsem_wait(&queue->sem);
S
shm  
Shengliang Guan 已提交
167

S
Shengliang Guan 已提交
168 169 170
  taosThreadMutexLock(&queue->mutex);
  if (queue->total - queue->avail <= 0) {
    taosThreadMutexUnlock(&queue->mutex);
S
shm  
Shengliang Guan 已提交
171
    terrno = TSDB_CODE_OUT_OF_SHM_MEM;
S
shm  
Shengliang Guan 已提交
172
    return 0;
S
shm  
Shengliang Guan 已提交
173 174
  }

S
Shengliang Guan 已提交
175
  int16_t rawHeadLen = 0;
S
Shengliang Guan 已提交
176
  int8_t  ftype = 0;
S
Shengliang Guan 已提交
177
  int32_t rawBodyLen = 0;
S
Shengliang Guan 已提交
178 179 180 181
  if (queue->head < queue->total) {
    rawHeadLen = *(int16_t *)(queue->pBuffer + queue->head);
    ftype = *(int8_t *)(queue->pBuffer + queue->head + 2);
    rawBodyLen = *(int32_t *)(queue->pBuffer + queue->head + 4);
S
shm  
Shengliang Guan 已提交
182
  } else {
S
Shengliang Guan 已提交
183 184 185
    rawHeadLen = *(int16_t *)(queue->pBuffer);
    ftype = *(int8_t *)(queue->pBuffer + 2);
    rawBodyLen = *(int32_t *)(queue->pBuffer + 4);
S
shm  
Shengliang Guan 已提交
186
  }
S
Shengliang Guan 已提交
187
  int16_t headLen = CEIL8(rawHeadLen);
S
Shengliang Guan 已提交
188
  int32_t bodyLen = CEIL8(rawBodyLen);
S
shm  
Shengliang Guan 已提交
189

S
Shengliang Guan 已提交
190
  void *pHead = taosAllocateQitem(headLen, DEF_QITEM);
191 192 193
  void *pBody = NULL;
  if (bodyLen > 0) pBody = rpcMallocCont(bodyLen);
  if (pHead == NULL || (bodyLen > 0 && pBody == NULL)) {
S
Shengliang Guan 已提交
194 195 196 197
    taosThreadMutexUnlock(&queue->mutex);
    tsem_post(&queue->sem);
    taosFreeQitem(pHead);
    rpcFreeCont(pBody);
S
shm  
Shengliang Guan 已提交
198 199 200 201
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

S
Shengliang Guan 已提交
202 203 204
  const int32_t pos = queue->head;
  if (queue->head < queue->tail) {
    memcpy(pHead, queue->pBuffer + queue->head + 8, headLen);
205
    if (bodyLen > 0) memcpy(pBody, queue->pBuffer + queue->head + 8 + headLen, bodyLen);
S
Shengliang Guan 已提交
206
    queue->head = queue->head + 8 + headLen + bodyLen;
S
shm  
Shengliang Guan 已提交
207
  } else {
S
Shengliang Guan 已提交
208
    int32_t remain = queue->total - queue->head;
S
shm  
Shengliang Guan 已提交
209
    if (remain == 0) {
S
Shengliang Guan 已提交
210
      memcpy(pHead, queue->pBuffer + 8, headLen);
211
      if (bodyLen > 0) memcpy(pBody, queue->pBuffer + 8 + headLen, bodyLen);
S
Shengliang Guan 已提交
212
      queue->head = 8 + headLen + bodyLen;
S
shm  
Shengliang Guan 已提交
213
    } else if (remain == 8) {
S
Shengliang Guan 已提交
214
      memcpy(pHead, queue->pBuffer, headLen);
215
      if (bodyLen > 0) memcpy(pBody, queue->pBuffer + headLen, bodyLen);
S
Shengliang Guan 已提交
216
      queue->head = headLen + bodyLen;
S
shm  
Shengliang Guan 已提交
217
    } else if (remain < 8 + headLen) {
S
Shengliang Guan 已提交
218 219
      memcpy(pHead, queue->pBuffer + queue->head + 8, remain - 8);
      memcpy((char *)pHead + remain - 8, queue->pBuffer, headLen - (remain - 8));
220
      if (bodyLen > 0) memcpy(pBody, queue->pBuffer + headLen - (remain - 8), bodyLen);
S
Shengliang Guan 已提交
221
      queue->head = headLen - (remain - 8) + bodyLen;
S
Shengliang Guan 已提交
222
    } else if (remain < 8 + headLen + bodyLen) {
S
Shengliang Guan 已提交
223
      memcpy(pHead, queue->pBuffer + queue->head + 8, headLen);
224 225
      if (bodyLen > 0) memcpy(pBody, queue->pBuffer + queue->head + 8 + headLen, remain - 8 - headLen);
      if (bodyLen > 0) memcpy((char *)pBody + remain - 8 - headLen, queue->pBuffer, bodyLen - (remain - 8 - headLen));
S
Shengliang Guan 已提交
226
      queue->head = bodyLen - (remain - 8 - headLen);
S
shm  
Shengliang Guan 已提交
227
    } else {
S
Shengliang Guan 已提交
228
      memcpy(pHead, queue->pBuffer + queue->head + 8, headLen);
229
      if (bodyLen > 0) memcpy(pBody, queue->pBuffer + queue->head + headLen + 8, bodyLen);
S
Shengliang Guan 已提交
230
      queue->head = queue->head + headLen + bodyLen + 8;
S
shm  
Shengliang Guan 已提交
231 232 233
    }
  }

S
Shengliang Guan 已提交
234 235 236
  queue->avail = queue->avail + headLen + bodyLen + 8;
  queue->items--;
  taosThreadMutexUnlock(&queue->mutex);
S
shm  
Shengliang Guan 已提交
237

238 239
  *ppMsg = pHead;
  (*ppMsg)->pCont = pBody;
S
Shengliang Guan 已提交
240
  *pFuncType = (EProcFuncType)ftype;
S
shm  
Shengliang Guan 已提交
241

242 243
  dTrace("node:%s, pop %s msg:%p type:%d handle:%p len:%d code:0x%x, pos:%d remain:%d", queue->name, dmFuncStr(ftype),
         (*ppMsg), (*ppMsg)->msgType, (*ppMsg)->info.handle, (*ppMsg)->contLen, (*ppMsg)->code, pos, queue->items);
S
shm  
Shengliang Guan 已提交
244
  return 1;
S
shm  
Shengliang Guan 已提交
245 246
}

S
Shengliang Guan 已提交
247 248
int32_t dmInitProc(struct SMgmtWrapper *pWrapper) {
  SProc *proc = &pWrapper->proc;
249 250
  if (proc->name != NULL) return 0;

S
Shengliang Guan 已提交
251 252
  proc->wrapper = pWrapper;
  proc->name = pWrapper->name;
S
shm  
Shengliang Guan 已提交
253

S
Shengliang Guan 已提交
254
  SShm   *shm = &proc->shm;
S
shm  
Shengliang Guan 已提交
255
  int32_t cstart = 0;
S
Shengliang Guan 已提交
256
  int32_t csize = CEIL8(shm->size / 2);
S
shm  
Shengliang Guan 已提交
257
  int32_t pstart = csize;
S
Shengliang Guan 已提交
258 259
  int32_t psize = CEIL8(shm->size - pstart);
  if (pstart + psize > shm->size) {
S
shm  
Shengliang Guan 已提交
260 261 262
    psize -= 8;
  }

S
Shengliang Guan 已提交
263 264 265 266 267 268 269 270
  proc->hash = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
  proc->cqueue = dmInitProcQueue(proc, (char *)shm->ptr + cstart, csize);
  proc->pqueue = dmInitProcQueue(proc, (char *)shm->ptr + pstart, psize);
  if (proc->cqueue == NULL || proc->pqueue == NULL || proc->hash == NULL) {
    dmCleanupProcQueue(proc->cqueue);
    dmCleanupProcQueue(proc->pqueue);
    taosHashCleanup(proc->hash);
    return -1;
S
shm  
Shengliang Guan 已提交
271 272
  }

S
Shengliang Guan 已提交
273 274
  dDebug("node:%s, proc is initialized, cqueue:%p pqueue:%p", proc->name, proc->cqueue, proc->pqueue);
  return 0;
S
shm  
Shengliang Guan 已提交
275 276
}

S
Shengliang Guan 已提交
277 278 279 280 281 282
static void *dmConsumChildQueue(void *param) {
  SProc        *proc = param;
  SMgmtWrapper *pWrapper = proc->wrapper;
  SProcQueue   *queue = proc->cqueue;
  int32_t       numOfMsgs = 0;
  int32_t       code = 0;
S
Shengliang Guan 已提交
283
  EProcFuncType ftype = DND_FUNC_REQ;
284
  SRpcMsg      *pMsg = NULL;
S
Shengliang Guan 已提交
285

S
Shengliang Guan 已提交
286
  dDebug("node:%s, start to consume from cqueue", proc->name);
S
Shengliang Guan 已提交
287
  do {
288
    numOfMsgs = dmPopFromProcQueue(queue, &pMsg, &ftype);
S
Shengliang Guan 已提交
289
    if (numOfMsgs == 0) {
290
      dDebug("node:%s, get no msg from cqueue and exit thread", proc->name);
S
Shengliang Guan 已提交
291 292 293 294
      break;
    }

    if (numOfMsgs < 0) {
S
Shengliang Guan 已提交
295
      dError("node:%s, get no msg from cqueue since %s", proc->name, terrstr());
S
Shengliang Guan 已提交
296 297 298 299
      taosMsleep(1);
      continue;
    }

S
Shengliang Guan 已提交
300
    if (ftype != DND_FUNC_REQ) {
301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318
      dError("node:%s, invalid ftype:%d from cqueue", proc->name, ftype);
      rpcFreeCont(pMsg->pCont);
      taosFreeQitem(pMsg);
      continue;
    }

    code = dmProcessNodeMsg(pWrapper, pMsg);
    if (code != 0) {
      dError("node:%s, failed to process msg:%p since %s, put into pqueue", proc->name, pMsg, terrstr());
      SRpcMsg rsp = {
          .code = (terrno != 0 ? terrno : code),
          .pCont = pMsg->info.rsp,
          .contLen = pMsg->info.rspLen,
          .info = pMsg->info,
      };
      dmPutToProcPQueue(proc, &rsp, DND_FUNC_RSP);
      rpcFreeCont(pMsg->pCont);
      taosFreeQitem(pMsg);
S
Shengliang Guan 已提交
319 320
    }
  } while (1);
S
shm  
Shengliang Guan 已提交
321

S
Shengliang Guan 已提交
322 323
  return NULL;
}
S
shm  
Shengliang Guan 已提交
324

S
Shengliang Guan 已提交
325 326 327 328 329 330
static void *dmConsumParentQueue(void *param) {
  SProc        *proc = param;
  SMgmtWrapper *pWrapper = proc->wrapper;
  SProcQueue   *queue = proc->pqueue;
  int32_t       numOfMsgs = 0;
  int32_t       code = 0;
S
Shengliang Guan 已提交
331
  EProcFuncType ftype = DND_FUNC_REQ;
332
  SRpcMsg      *pMsg = NULL;
S
Shengliang Guan 已提交
333

S
Shengliang Guan 已提交
334
  dDebug("node:%s, start to consume from pqueue", proc->name);
S
Shengliang Guan 已提交
335
  do {
336
    numOfMsgs = dmPopFromProcQueue(queue, &pMsg, &ftype);
S
shm  
Shengliang Guan 已提交
337
    if (numOfMsgs == 0) {
S
Shengliang Guan 已提交
338
      dDebug("node:%s, get no msg from pqueue and exit thread", proc->name);
S
shm  
Shengliang Guan 已提交
339
      break;
S
Shengliang Guan 已提交
340 341 342
    }

    if (numOfMsgs < 0) {
S
Shengliang Guan 已提交
343
      dError("node:%s, get no msg from pqueue since %s", proc->name, terrstr());
S
shm  
Shengliang Guan 已提交
344 345
      taosMsleep(1);
      continue;
S
Shengliang Guan 已提交
346 347
    }

S
Shengliang Guan 已提交
348
    if (ftype == DND_FUNC_RSP) {
349 350
      dmRemoveProcRpcHandle(proc, pMsg->info.handle);
      rpcSendResponse(pMsg);
S
Shengliang Guan 已提交
351
    } else if (ftype == DND_FUNC_REGIST) {
352
      rpcRegisterBrokenLinkArg(pMsg);
S
Shengliang Guan 已提交
353
    } else if (ftype == DND_FUNC_RELEASE) {
354 355
      dmRemoveProcRpcHandle(proc, pMsg->info.handle);
      rpcReleaseHandle(pMsg->info.handle, (int8_t)pMsg->code);
S
shm  
Shengliang Guan 已提交
356
    } else {
357 358
      dError("node:%s, invalid ftype:%d from pqueue", proc->name, ftype);
      rpcFreeCont(pMsg->pCont);
S
shm  
Shengliang Guan 已提交
359
    }
S
Shengliang Guan 已提交
360

361
    taosFreeQitem(pMsg);
S
Shengliang Guan 已提交
362 363 364
  } while (1);

  return NULL;
S
shm  
Shengliang Guan 已提交
365 366
}

S
Shengliang Guan 已提交
367 368
int32_t dmRunProc(SProc *proc) {
  TdThreadAttr thAttr = {0};
S
Shengliang Guan 已提交
369 370
  taosThreadAttrInit(&thAttr);
  taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
S
shm  
Shengliang Guan 已提交
371

372
  if (proc->ptype & DND_PROC_PARENT) {
S
Shengliang Guan 已提交
373 374 375 376 377 378 379 380
    if (taosThreadCreate(&proc->pthread, &thAttr, dmConsumParentQueue, proc) != 0) {
      terrno = TAOS_SYSTEM_ERROR(errno);
      dError("node:%s, failed to create pthread since %s", proc->name, terrstr());
      return -1;
    }
    dDebug("node:%s, thread:%" PRId64 " is created to consume pqueue", proc->name, proc->pthread);
  }

381
  if (proc->ptype & DND_PROC_CHILD) {
S
Shengliang Guan 已提交
382 383 384 385 386 387
    if (taosThreadCreate(&proc->cthread, &thAttr, dmConsumChildQueue, proc) != 0) {
      terrno = TAOS_SYSTEM_ERROR(errno);
      dError("node:%s, failed to create cthread since %s", proc->name, terrstr());
      return -1;
    }
    dDebug("node:%s, thread:%" PRId64 " is created to consume cqueue", proc->name, proc->cthread);
S
shm  
Shengliang Guan 已提交
388 389
  }

S
Shengliang Guan 已提交
390
  taosThreadAttrDestroy(&thAttr);
S
shm  
Shengliang Guan 已提交
391 392 393
  return 0;
}

S
Shengliang Guan 已提交
394
void dmStopProc(SProc *proc) {
395
  proc->stop = true;
S
Shengliang Guan 已提交
396 397
  if (taosCheckPthreadValid(proc->pthread)) {
    dDebug("node:%s, start to join pthread:%" PRId64, proc->name, proc->pthread);
398
    tsem_post(&proc->pqueue->sem);
S
Shengliang Guan 已提交
399 400
    taosThreadJoin(proc->pthread, NULL);
    taosThreadClear(&proc->pthread);
S
shm  
Shengliang Guan 已提交
401
  }
S
shm  
Shengliang Guan 已提交
402

S
Shengliang Guan 已提交
403 404
  if (taosCheckPthreadValid(proc->cthread)) {
    dDebug("node:%s, start to join cthread:%" PRId64, proc->name, proc->cthread);
405
    tsem_post(&proc->cqueue->sem);
S
Shengliang Guan 已提交
406 407
    taosThreadJoin(proc->cthread, NULL);
    taosThreadClear(&proc->cthread);
S
shm  
Shengliang Guan 已提交
408 409 410
  }
}

S
Shengliang Guan 已提交
411 412
void dmCleanupProc(struct SMgmtWrapper *pWrapper) {
  SProc *proc = &pWrapper->proc;
413
  if (proc->name == NULL) return;
S
Shengliang Guan 已提交
414

S
Shengliang Guan 已提交
415
  dDebug("node:%s, start to cleanup proc", pWrapper->name);
S
Shengliang Guan 已提交
416 417 418 419
  dmStopProc(proc);
  dmCleanupProcQueue(proc->cqueue);
  dmCleanupProcQueue(proc->pqueue);
  taosHashCleanup(proc->hash);
420
  proc->hash = NULL;
S
Shengliang Guan 已提交
421
  dDebug("node:%s, proc is cleaned up", pWrapper->name);
422 423
}

424
void dmRemoveProcRpcHandle(SProc *proc, void *handle) {
425
  int64_t h = (int64_t)handle;
S
Shengliang Guan 已提交
426 427 428
  taosThreadMutexLock(&proc->cqueue->mutex);
  taosHashRemove(proc->hash, &h, sizeof(int64_t));
  taosThreadMutexUnlock(&proc->cqueue->mutex);
429 430
}

S
Shengliang Guan 已提交
431 432
void dmCloseProcRpcHandles(SProc *proc) {
  taosThreadMutexLock(&proc->cqueue->mutex);
433 434 435
  SRpcHandleInfo *pInfo = taosHashIterate(proc->hash, NULL);
  while (pInfo != NULL) {
    dError("node:%s, the child process dies and send an offline rsp to handle:%p", proc->name, pInfo->handle);
436
    SRpcMsg rpcMsg = {.code = TSDB_CODE_NODE_OFFLINE, .info = *pInfo};
S
Shengliang Guan 已提交
437
    rpcSendResponse(&rpcMsg);
438
    pInfo = taosHashIterate(proc->hash, pInfo);
439
  }
S
Shengliang Guan 已提交
440 441
  taosHashClear(proc->hash);
  taosThreadMutexUnlock(&proc->cqueue->mutex);
S
shm  
Shengliang Guan 已提交
442
}
S
shm  
Shengliang Guan 已提交
443

444
void dmPutToProcPQueue(SProc *proc, SRpcMsg *pMsg, EProcFuncType ftype) {
S
Shengliang Guan 已提交
445
  int32_t retry = 0;
446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465
  while (1) {
    if (dmPushToProcQueue(proc, proc->pqueue, pMsg, ftype) == 0) {
      break;
    }

    if (retry == 10) {
      pMsg->code = terrno;
      if (pMsg->contLen > 0) {
        rpcFreeCont(pMsg->pCont);
        pMsg->pCont = NULL;
        pMsg->contLen = 0;
      }
      dError("node:%s, failed to push %s msg:%p type:%d handle:%p then discard data and return error", proc->name,
             dmFuncStr(ftype), pMsg, pMsg->msgType, pMsg->info.handle);
    } else {
      dError("node:%s, failed to push %s msg:%p type:%d handle:%p len:%d since %s, retry:%d", proc->name,
             dmFuncStr(ftype), pMsg, pMsg->msgType, pMsg->info.handle, pMsg->contLen, terrstr(), retry);
      retry++;
      taosMsleep(retry);
    }
S
shm  
Shengliang Guan 已提交
466
  }
S
shm  
Shengliang Guan 已提交
467
}
S
Shengliang Guan 已提交
468

469 470
int32_t dmPutToProcCQueue(SProc *proc, SRpcMsg *pMsg, EProcFuncType ftype) {
  return dmPushToProcQueue(proc, proc->cqueue, pMsg, ftype);
S
Shengliang Guan 已提交
471
}