dmProc.c 16.9 KB
Newer Older
S
shm  
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
S
Shengliang Guan 已提交
17
#include "dmMgmt.h"
S
shm  
Shengliang Guan 已提交
18

S
shm  
Shengliang Guan 已提交
19 20 21 22 23
/*
 * Round v up to the next multiple of 8, with a floor of 8.
 *
 * Any v <= 8 (including zero and negative values) yields 8.
 * Uses integer arithmetic only: going through float loses precision above
 * 2^24 and could return a value smaller than v.
 * v must not exceed INT32_MAX - 7, otherwise the result does not fit int32_t.
 */
static inline int32_t CEIL8(int32_t v) {
  if (v <= 8) {
    return 8;
  }
  // Unsigned arithmetic avoids signed-overflow UB in the "+ 7" step.
  return (int32_t)(((uint32_t)v + 7u) & ~(uint32_t)7u);
}

S
Shengliang Guan 已提交
24
/*
 * Initialize queue->mutex with the PTHREAD_PROCESS_SHARED attribute.
 *
 * Returns 0 on success. On failure sets terrno from errno, logs the failing
 * step and returns -1. The temporary attribute object is destroyed on every
 * path after it has been initialized.
 */
static int32_t dmInitProcMutex(SProcQueue *queue) {
  TdThreadMutexAttr mattr = {0};

  if (taosThreadMutexAttrInit(&mattr) != 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    dError("node:%s, failed to init mutex while init attr since %s", queue->name, terrstr());
    return -1;
  }

  if (taosThreadMutexAttrSetPshared(&mattr, PTHREAD_PROCESS_SHARED) != 0) {
    // Record the error before destroying the attr: the destroy call may overwrite errno.
    terrno = TAOS_SYSTEM_ERROR(errno);
    taosThreadMutexAttrDestroy(&mattr);
    dError("node:%s, failed to init mutex while set shared since %s", queue->name, terrstr());
    return -1;
  }

  if (taosThreadMutexInit(&queue->mutex, &mattr) != 0) {
    // Same ordering as above: capture errno first, then release the attr.
    terrno = TAOS_SYSTEM_ERROR(errno);
    taosThreadMutexAttrDestroy(&mattr);
    dError("node:%s, failed to init mutex since %s", queue->name, terrstr());
    return -1;
  }

  taosThreadMutexAttrDestroy(&mattr);
  return 0;
}

S
Shengliang Guan 已提交
51 52
/*
 * Initialize queue->sem via tsem_init(&queue->sem, 1, 0), i.e. with an
 * initial count of zero.
 *
 * Returns 0 on success; on failure sets terrno from errno, logs, and returns -1.
 */
static int32_t dmInitProcSem(SProcQueue *queue) {
  const int32_t rc = tsem_init(&queue->sem, 1, 0);
  if (rc == 0) {
    return 0;
  }

  terrno = TAOS_SYSTEM_ERROR(errno);
  dError("node:%s, failed to init sem since %s", queue->name, terrstr());
  return -1;
}

S
Shengliang Guan 已提交
61 62 63
/*
 * Interpret the memory at ptr (size bytes) as an SProcQueue and, when running
 * as the parent process, initialize it.
 *
 * The payload capacity is size minus the 8-byte-rounded size of SProcQueue and
 * must be larger than 1024 bytes; otherwise terrno is set to
 * TSDB_CODE_OUT_OF_MEMORY and NULL is returned.
 *
 * Only a proc whose ptype has DND_PROC_PARENT set initializes the mutex, the
 * semaphore and the bookkeeping fields; other callers just get the pointer.
 *
 * Returns the queue pointer on success, NULL on failure (terrno is set).
 */
static SProcQueue *dmInitProcQueue(SProc *proc, char *ptr, int32_t size) {
  SProcQueue *queue = (SProcQueue *)(ptr);

  int32_t bufSize = size - CEIL8(sizeof(SProcQueue));
  if (bufSize <= 1024) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  if (proc->ptype & DND_PROC_PARENT) {
    // Set the name first: the mutex/sem init helpers log queue->name on failure.
    tstrncpy(queue->name, proc->name, sizeof(queue->name));

    if (dmInitProcMutex(queue) != 0) {
      return NULL;
    }

    if (dmInitProcSem(queue) != 0) {
      return NULL;
    }

    queue->head = 0;
    queue->tail = 0;
    queue->total = bufSize;
    queue->avail = bufSize;
    queue->items = 0;
  }

  return queue;
}

#if 0
// NOTE(review): these helpers are compiled out, as is the body of
// dmCleanupProcQueue that calls them; the reason is not recorded in this file.
// They are kept consistent with the live code, which treats queue->mutex and
// queue->sem as embedded objects (see dmInitProcMutex / dmInitProcSem).

// Destroy the queue's mutex.
static void dmDestroyProcQueue(SProcQueue *queue) {
  taosThreadMutexDestroy(&queue->mutex);
}

// Destroy the queue's semaphore.
static void dmDestroyProcSem(SProcQueue *queue) {
  tsem_destroy(&queue->sem);
}
#endif
S
shm  
Shengliang Guan 已提交
105

S
Shengliang Guan 已提交
106
/*
 * Release the resources owned by a proc queue.
 *
 * Currently a no-op: the teardown calls are compiled out, so nothing is done
 * for either a NULL or a valid queue.
 */
static void dmCleanupProcQueue(SProcQueue *queue) {
#if 0
  if (queue == NULL) {
    return;
  }
  dmDestroyProcQueue(queue);
  dmDestroyProcSem(queue);
#endif
}
S
shm  
Shengliang Guan 已提交
114

S
Shengliang Guan 已提交
115
/*
 * Append one message (head + body) to the ring buffer of queue.
 *
 * Record layout inside queue->pBuffer:
 *   offset 0: int16_t raw head length
 *   offset 2: int8_t  function type
 *   offset 4: int32_t raw body length
 *   offset 8: head bytes, padded to a multiple of 8 (minimum 8)
 *   then    : body bytes, padded to a multiple of 8 (minimum 8)
 * The payload may wrap around the end of the buffer.
 *
 * When handle is non-zero and ftype is DND_FUNC_REQ, handle -> handleRef is
 * recorded in proc->hash while the queue mutex is held.
 *
 * Returns 0 on success. Returns -1 with terrno set to TSDB_CODE_INVALID_PARA
 * for bad arguments, or TSDB_CODE_OUT_OF_SHM_MEM when the queue does not have
 * enough free space; also returns -1 when taosHashPut fails.
 */
static int32_t dmPushToProcQueue(SProc *proc, SProcQueue *queue, const char *pHead, int16_t rawHeadLen,
                                 const char *pBody, int32_t rawBodyLen, int64_t handle, int64_t handleRef,
                                 EProcFuncType ftype) {
  // Negative lengths would be converted to huge sizes by memcpy; a NULL body is only
  // acceptable when there is nothing to copy from it.
  if (rawHeadLen <= 0 || pHead == NULL || rawBodyLen < 0 || (pBody == NULL && rawBodyLen > 0)) {
    terrno = TSDB_CODE_INVALID_PARA;
    return -1;
  }

  // memcpy needs a valid source pointer even for zero bytes, so substitute one for an empty body.
  const char *pSrcBody = (pBody != NULL) ? pBody : "";

  const int32_t headLen = CEIL8(rawHeadLen);
  const int32_t bodyLen = CEIL8(rawBodyLen);
  const int32_t fullLen = headLen + bodyLen + 8;

  taosThreadMutexLock(&queue->mutex);
  if (fullLen > queue->avail) {
    taosThreadMutexUnlock(&queue->mutex);
    terrno = TSDB_CODE_OUT_OF_SHM_MEM;
    return -1;
  }

  if (handle != 0 && ftype == DND_FUNC_REQ) {
    if (taosHashPut(proc->hash, &handle, sizeof(int64_t), &handleRef, sizeof(int64_t)) != 0) {
      taosThreadMutexUnlock(&queue->mutex);
      return -1;
    }
  }

  // Write the 8-byte record header; if tail sits exactly at the end, the record starts at offset 0.
  const int32_t pos = queue->tail;
  if (queue->tail < queue->total) {
    *(int16_t *)(queue->pBuffer + queue->tail) = rawHeadLen;
    *(int8_t *)(queue->pBuffer + queue->tail + 2) = (int8_t)ftype;
    *(int32_t *)(queue->pBuffer + queue->tail + 4) = rawBodyLen;
  } else {
    *(int16_t *)(queue->pBuffer) = rawHeadLen;
    *(int8_t *)(queue->pBuffer + 2) = (int8_t)ftype;
    *(int32_t *)(queue->pBuffer + 4) = rawBodyLen;
  }

  if (queue->tail < queue->head) {
    // Free space is one contiguous region between tail and head.
    memcpy(queue->pBuffer + queue->tail + 8, pHead, rawHeadLen);
    memcpy(queue->pBuffer + queue->tail + 8 + headLen, pSrcBody, rawBodyLen);
    queue->tail = queue->tail + 8 + headLen + bodyLen;
  } else {
    int32_t remain = queue->total - queue->tail;
    if (remain == 0) {
      // Record header went to offset 0; the payload follows it.
      memcpy(queue->pBuffer + 8, pHead, rawHeadLen);
      memcpy(queue->pBuffer + 8 + headLen, pSrcBody, rawBodyLen);
      queue->tail = 8 + headLen + bodyLen;
    } else if (remain == 8) {
      // Only the record header fits at the end; the payload starts at offset 0.
      memcpy(queue->pBuffer, pHead, rawHeadLen);
      memcpy(queue->pBuffer + headLen, pSrcBody, rawBodyLen);
      queue->tail = headLen + bodyLen;
    } else if (remain < 8 + headLen) {
      // The head is split across the end of the buffer.
      memcpy(queue->pBuffer + queue->tail + 8, pHead, remain - 8);
      memcpy(queue->pBuffer, pHead + remain - 8, rawHeadLen - (remain - 8));
      memcpy(queue->pBuffer + headLen - (remain - 8), pSrcBody, rawBodyLen);
      queue->tail = headLen - (remain - 8) + bodyLen;
    } else if (remain < 8 + headLen + bodyLen) {
      // The head fits; the body is split across the end of the buffer.
      memcpy(queue->pBuffer + queue->tail + 8, pHead, rawHeadLen);
      memcpy(queue->pBuffer + queue->tail + 8 + headLen, pSrcBody, remain - 8 - headLen);
      memcpy(queue->pBuffer, pSrcBody + remain - 8 - headLen, rawBodyLen - (remain - 8 - headLen));
      queue->tail = bodyLen - (remain - 8 - headLen);
    } else {
      // The whole record fits before the end of the buffer.
      memcpy(queue->pBuffer + queue->tail + 8, pHead, rawHeadLen);
      memcpy(queue->pBuffer + queue->tail + headLen + 8, pSrcBody, rawBodyLen);
      queue->tail = queue->tail + headLen + bodyLen + 8;
    }
  }

  queue->avail -= fullLen;
  queue->items++;
  taosThreadMutexUnlock(&queue->mutex);
  tsem_post(&queue->sem);

  dTrace("node:%s, push %s msg:%p:%d cont:%p:%d, pos:%d remain:%d", queue->name, dmFuncStr(ftype), pHead, headLen,
         pBody, bodyLen, pos, queue->items);
  return 0;
}

S
Shengliang Guan 已提交
193 194 195
/*
 * Block on the queue semaphore, then remove the oldest message from the ring
 * buffer and hand it to the caller.
 *
 * The head is copied into a buffer from taosAllocateQitem and the body into a
 * buffer from rpcMallocCont; both are sized to the 8-byte-padded lengths.
 * *pHeadLen / *pBodyLen receive the raw (unpadded) lengths stored in the record.
 *
 * Returns:
 *    1  a message was popped and all output parameters are set
 *    0  the semaphore was posted but the queue holds no data (the consumer
 *       loops in this file treat this as the signal to exit)
 *   -1  allocation failed; the message is left in the queue, the semaphore is
 *       re-posted and terrno is TSDB_CODE_OUT_OF_MEMORY
 */
static int32_t dmPopFromProcQueue(SProcQueue *queue, void **ppHead, int16_t *pHeadLen, void **ppBody, int32_t *pBodyLen,
                                  EProcFuncType *pFuncType) {
  tsem_wait(&queue->sem);

  taosThreadMutexLock(&queue->mutex);
  if (queue->total - queue->avail <= 0) {
    taosThreadMutexUnlock(&queue->mutex);
    terrno = TSDB_CODE_OUT_OF_SHM_MEM;
    return 0;
  }

  // Read the 8-byte record header; a head position at the very end means the record starts at offset 0.
  int16_t rawHeadLen = 0;
  int8_t  ftype = 0;
  int32_t rawBodyLen = 0;
  if (queue->head < queue->total) {
    rawHeadLen = *(int16_t *)(queue->pBuffer + queue->head);
    ftype = *(int8_t *)(queue->pBuffer + queue->head + 2);
    rawBodyLen = *(int32_t *)(queue->pBuffer + queue->head + 4);
  } else {
    rawHeadLen = *(int16_t *)(queue->pBuffer);
    ftype = *(int8_t *)(queue->pBuffer + 2);
    rawBodyLen = *(int32_t *)(queue->pBuffer + 4);
  }
  // int32_t, matching the push side: the padded head length can exceed INT16_MAX.
  int32_t headLen = CEIL8(rawHeadLen);
  int32_t bodyLen = CEIL8(rawBodyLen);

  void *pHead = taosAllocateQitem(headLen, DEF_QITEM);
  void *pBody = rpcMallocCont(bodyLen);
  if (pHead == NULL || pBody == NULL) {
    taosThreadMutexUnlock(&queue->mutex);
    // The message was not consumed, so give the semaphore count back.
    tsem_post(&queue->sem);
    taosFreeQitem(pHead);
    rpcFreeCont(pBody);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  const int32_t pos = queue->head;
  if (queue->head < queue->tail) {
    // Stored data is one contiguous region between head and tail.
    memcpy(pHead, queue->pBuffer + queue->head + 8, headLen);
    memcpy(pBody, queue->pBuffer + queue->head + 8 + headLen, bodyLen);
    queue->head = queue->head + 8 + headLen + bodyLen;
  } else {
    int32_t remain = queue->total - queue->head;
    if (remain == 0) {
      // Record header is at offset 0; the payload follows it.
      memcpy(pHead, queue->pBuffer + 8, headLen);
      memcpy(pBody, queue->pBuffer + 8 + headLen, bodyLen);
      queue->head = 8 + headLen + bodyLen;
    } else if (remain == 8) {
      // Only the record header sits at the end; the payload starts at offset 0.
      memcpy(pHead, queue->pBuffer, headLen);
      memcpy(pBody, queue->pBuffer + headLen, bodyLen);
      queue->head = headLen + bodyLen;
    } else if (remain < 8 + headLen) {
      // The head is split across the end of the buffer.
      memcpy(pHead, queue->pBuffer + queue->head + 8, remain - 8);
      memcpy((char *)pHead + remain - 8, queue->pBuffer, headLen - (remain - 8));
      memcpy(pBody, queue->pBuffer + headLen - (remain - 8), bodyLen);
      queue->head = headLen - (remain - 8) + bodyLen;
    } else if (remain < 8 + headLen + bodyLen) {
      // The head is contiguous; the body is split across the end of the buffer.
      memcpy(pHead, queue->pBuffer + queue->head + 8, headLen);
      memcpy(pBody, queue->pBuffer + queue->head + 8 + headLen, remain - 8 - headLen);
      memcpy((char *)pBody + remain - 8 - headLen, queue->pBuffer, bodyLen - (remain - 8 - headLen));
      queue->head = bodyLen - (remain - 8 - headLen);
    } else {
      // The whole record lies before the end of the buffer.
      memcpy(pHead, queue->pBuffer + queue->head + 8, headLen);
      memcpy(pBody, queue->pBuffer + queue->head + headLen + 8, bodyLen);
      queue->head = queue->head + headLen + bodyLen + 8;
    }
  }

  queue->avail = queue->avail + headLen + bodyLen + 8;
  queue->items--;
  taosThreadMutexUnlock(&queue->mutex);

  *ppHead = pHead;
  *ppBody = pBody;
  *pHeadLen = rawHeadLen;
  *pBodyLen = rawBodyLen;
  *pFuncType = (EProcFuncType)ftype;

  dTrace("node:%s, pop %s msg:%p:%d cont:%p:%d, pos:%d remain:%d", queue->name, dmFuncStr(ftype), pHead, headLen, pBody,
         bodyLen, pos, queue->items);
  return 1;
}

S
Shengliang Guan 已提交
277 278
int32_t dmInitProc(struct SMgmtWrapper *pWrapper) {
  SProc *proc = &pWrapper->proc;
279 280
  if (proc->name != NULL) return 0;

S
Shengliang Guan 已提交
281 282
  proc->wrapper = pWrapper;
  proc->name = pWrapper->name;
S
shm  
Shengliang Guan 已提交
283

S
Shengliang Guan 已提交
284
  SShm   *shm = &proc->shm;
S
shm  
Shengliang Guan 已提交
285
  int32_t cstart = 0;
S
Shengliang Guan 已提交
286
  int32_t csize = CEIL8(shm->size / 2);
S
shm  
Shengliang Guan 已提交
287
  int32_t pstart = csize;
S
Shengliang Guan 已提交
288 289
  int32_t psize = CEIL8(shm->size - pstart);
  if (pstart + psize > shm->size) {
S
shm  
Shengliang Guan 已提交
290 291 292
    psize -= 8;
  }

S
Shengliang Guan 已提交
293 294 295 296 297 298 299 300
  proc->hash = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
  proc->cqueue = dmInitProcQueue(proc, (char *)shm->ptr + cstart, csize);
  proc->pqueue = dmInitProcQueue(proc, (char *)shm->ptr + pstart, psize);
  if (proc->cqueue == NULL || proc->pqueue == NULL || proc->hash == NULL) {
    dmCleanupProcQueue(proc->cqueue);
    dmCleanupProcQueue(proc->pqueue);
    taosHashCleanup(proc->hash);
    return -1;
S
shm  
Shengliang Guan 已提交
301 302
  }

S
Shengliang Guan 已提交
303 304
  dDebug("node:%s, proc is initialized, cqueue:%p pqueue:%p", proc->name, proc->cqueue, proc->pqueue);
  return 0;
S
shm  
Shengliang Guan 已提交
305 306
}

S
Shengliang Guan 已提交
307 308 309 310 311 312 313 314 315 316
static void *dmConsumChildQueue(void *param) {
  SProc        *proc = param;
  SMgmtWrapper *pWrapper = proc->wrapper;
  SProcQueue   *queue = proc->cqueue;
  void         *pHead = NULL;
  void         *pBody = NULL;
  int16_t       headLen = 0;
  int32_t       bodyLen = 0;
  int32_t       numOfMsgs = 0;
  int32_t       code = 0;
S
Shengliang Guan 已提交
317
  EProcFuncType ftype = DND_FUNC_REQ;
318
  SRpcMsg      *pReq = NULL;
S
Shengliang Guan 已提交
319

S
Shengliang Guan 已提交
320
  dDebug("node:%s, start to consume from cqueue", proc->name);
S
Shengliang Guan 已提交
321 322 323
  do {
    numOfMsgs = dmPopFromProcQueue(queue, &pHead, &headLen, &pBody, &bodyLen, &ftype);
    if (numOfMsgs == 0) {
324
      dDebug("node:%s, get no msg from cqueue and exit thread", proc->name);
S
Shengliang Guan 已提交
325 326 327 328
      break;
    }

    if (numOfMsgs < 0) {
S
Shengliang Guan 已提交
329
      dError("node:%s, get no msg from cqueue since %s", proc->name, terrstr());
S
Shengliang Guan 已提交
330 331 332 333
      taosMsleep(1);
      continue;
    }

S
Shengliang Guan 已提交
334
    if (ftype != DND_FUNC_REQ) {
335
      dFatal("node:%s, get msg:%p from cqueue, invalid ftype:%d", proc->name, pHead, ftype);
S
Shengliang Guan 已提交
336 337 338 339
      taosFreeQitem(pHead);
      rpcFreeCont(pBody);
    } else {
      pReq = pHead;
S
Shengliang Guan 已提交
340
      pReq->pCont = pBody;
S
Shengliang Guan 已提交
341 342
      code = dmProcessNodeMsg(pWrapper, pReq);
      if (code != 0) {
S
Shengliang Guan 已提交
343
        dError("node:%s, failed to process msg:%p since %s, put into pqueue", proc->name, pReq, terrstr());
S
Shengliang Guan 已提交
344
        SRpcMsg rspMsg = {
S
Shengliang Guan 已提交
345 346 347
            .info = pReq->info,
            .pCont = pReq->info.rsp,
            .contLen = pReq->info.rspLen,
S
Shengliang Guan 已提交
348
        };
S
Shengliang Guan 已提交
349
        dmPutToProcPQueue(proc, &rspMsg, sizeof(SRpcMsg), rspMsg.pCont, rspMsg.contLen, DND_FUNC_RSP);
S
Shengliang Guan 已提交
350 351 352 353 354 355
        taosFreeQitem(pHead);
        rpcFreeCont(pBody);
        rpcFreeCont(rspMsg.pCont);
      }
    }
  } while (1);
S
shm  
Shengliang Guan 已提交
356

S
Shengliang Guan 已提交
357 358
  return NULL;
}
S
shm  
Shengliang Guan 已提交
359

S
Shengliang Guan 已提交
360 361 362 363 364 365 366 367 368 369
static void *dmConsumParentQueue(void *param) {
  SProc        *proc = param;
  SMgmtWrapper *pWrapper = proc->wrapper;
  SProcQueue   *queue = proc->pqueue;
  void         *pHead = NULL;
  void         *pBody = NULL;
  int16_t       headLen = 0;
  int32_t       bodyLen = 0;
  int32_t       numOfMsgs = 0;
  int32_t       code = 0;
S
Shengliang Guan 已提交
370
  EProcFuncType ftype = DND_FUNC_REQ;
S
Shengliang Guan 已提交
371 372
  SRpcMsg      *pRsp = NULL;

S
Shengliang Guan 已提交
373
  dDebug("node:%s, start to consume from pqueue", proc->name);
S
Shengliang Guan 已提交
374 375
  do {
    numOfMsgs = dmPopFromProcQueue(queue, &pHead, &headLen, &pBody, &bodyLen, &ftype);
S
shm  
Shengliang Guan 已提交
376
    if (numOfMsgs == 0) {
S
Shengliang Guan 已提交
377
      dDebug("node:%s, get no msg from pqueue and exit thread", proc->name);
S
shm  
Shengliang Guan 已提交
378
      break;
S
Shengliang Guan 已提交
379 380 381
    }

    if (numOfMsgs < 0) {
S
Shengliang Guan 已提交
382
      dError("node:%s, get no msg from pqueue since %s", proc->name, terrstr());
S
shm  
Shengliang Guan 已提交
383 384
      taosMsleep(1);
      continue;
S
Shengliang Guan 已提交
385 386
    }

S
Shengliang Guan 已提交
387
    if (ftype == DND_FUNC_RSP) {
S
Shengliang Guan 已提交
388 389
      pRsp = pHead;
      pRsp->pCont = pBody;
S
Shengliang Guan 已提交
390 391
      dTrace("node:%s, get rsp msg:%p from pqueue, code:0x%04x handle:%p", proc->name, pRsp, code, pRsp->info.handle);
      dmRemoveProcRpcHandle(proc, pRsp->info.handle);
S
Shengliang Guan 已提交
392
      rpcSendResponse(pRsp);
S
Shengliang Guan 已提交
393
    } else if (ftype == DND_FUNC_REGIST) {
S
Shengliang Guan 已提交
394
      pRsp = pHead;
395 396
      dTrace("node:%s, get regist msg:%p from pqueue, code:0x%04x handle:%p", proc->name, pRsp, code,
             pRsp->info.handle);
S
Shengliang Guan 已提交
397 398
      rpcRegisterBrokenLinkArg(pRsp);
      rpcFreeCont(pBody);
S
Shengliang Guan 已提交
399
    } else if (ftype == DND_FUNC_RELEASE) {
S
Shengliang Guan 已提交
400
      pRsp = pHead;
401 402
      dTrace("node:%s, get release msg:%p from pqueue, code:0x%04x handle:%p", proc->name, pRsp, code,
             pRsp->info.handle);
S
Shengliang Guan 已提交
403 404
      dmRemoveProcRpcHandle(proc, pRsp->info.handle);
      rpcReleaseHandle(pRsp->info.handle, (int8_t)pRsp->code);
S
Shengliang Guan 已提交
405
      rpcFreeCont(pBody);
S
shm  
Shengliang Guan 已提交
406
    } else {
407
      dFatal("node:%s, get msg:%p from pqueue, invalid ftype:%d", proc->name, pHead, ftype);
S
Shengliang Guan 已提交
408
      rpcFreeCont(pBody);
S
shm  
Shengliang Guan 已提交
409
    }
S
Shengliang Guan 已提交
410 411 412 413 414

    taosFreeQitem(pHead);
  } while (1);

  return NULL;
S
shm  
Shengliang Guan 已提交
415 416
}

S
Shengliang Guan 已提交
417 418
/*
 * Start the consumer threads of a proc.
 *
 * A proc with DND_PROC_PARENT set gets a joinable thread running
 * dmConsumParentQueue; a proc with DND_PROC_CHILD set gets one running
 * dmConsumChildQueue.
 *
 * Returns 0 on success. If a thread cannot be created, terrno is set from
 * errno and -1 is returned. The thread attribute object is destroyed on every
 * path.
 */
int32_t dmRunProc(SProc *proc) {
  int32_t      code = 0;
  TdThreadAttr thAttr = {0};
  taosThreadAttrInit(&thAttr);
  taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);

  if (proc->ptype & DND_PROC_PARENT) {
    if (taosThreadCreate(&proc->pthread, &thAttr, dmConsumParentQueue, proc) != 0) {
      terrno = TAOS_SYSTEM_ERROR(errno);
      dError("node:%s, failed to create pthread since %s", proc->name, terrstr());
      code = -1;
      goto cleanup;
    }
    dDebug("node:%s, thread:%" PRId64 " is created to consume pqueue", proc->name, proc->pthread);
  }

  if (proc->ptype & DND_PROC_CHILD) {
    if (taosThreadCreate(&proc->cthread, &thAttr, dmConsumChildQueue, proc) != 0) {
      terrno = TAOS_SYSTEM_ERROR(errno);
      dError("node:%s, failed to create cthread since %s", proc->name, terrstr());
      code = -1;
      goto cleanup;
    }
    dDebug("node:%s, thread:%" PRId64 " is created to consume cqueue", proc->name, proc->cthread);
  }

cleanup:
  // Release the attr on failure paths too, not only on success.
  taosThreadAttrDestroy(&thAttr);
  return code;
}

S
Shengliang Guan 已提交
444
/*
 * Ask the consumer threads of proc to finish and wait for them.
 *
 * Sets proc->stop, then for each valid thread (parent first, then child)
 * posts the semaphore of the queue that thread consumes, joins the thread and
 * clears its handle.
 */
void dmStopProc(SProc *proc) {
  proc->stop = true;

  if (taosCheckPthreadValid(proc->pthread)) {
    SProcQueue *pqueue = proc->pqueue;
    dDebug("node:%s, start to join pthread:%" PRId64, proc->name, proc->pthread);
    // Wake the consumer, which blocks on this semaphore inside the pop.
    tsem_post(&pqueue->sem);
    taosThreadJoin(proc->pthread, NULL);
    taosThreadClear(&proc->pthread);
  }

  if (taosCheckPthreadValid(proc->cthread)) {
    SProcQueue *cqueue = proc->cqueue;
    dDebug("node:%s, start to join cthread:%" PRId64, proc->name, proc->cthread);
    tsem_post(&cqueue->sem);
    taosThreadJoin(proc->cthread, NULL);
    taosThreadClear(&proc->cthread);
  }
}

S
Shengliang Guan 已提交
461 462
void dmCleanupProc(struct SMgmtWrapper *pWrapper) {
  SProc *proc = &pWrapper->proc;
463
  if (proc->name == NULL) return;
S
Shengliang Guan 已提交
464 465 466 467 468 469 470

  dDebug("node:%s, start to clean up proc", pWrapper->name);
  dmStopProc(proc);
  dmCleanupProcQueue(proc->cqueue);
  dmCleanupProcQueue(proc->pqueue);
  taosHashCleanup(proc->hash);
  dDebug("node:%s, proc is cleaned up", pWrapper->name);
471 472
}

S
Shengliang Guan 已提交
473
/*
 * Remove the entry keyed by handle from proc->hash and return the int64_t
 * value that was stored for it, or 0 when no entry exists.
 *
 * The lookup and removal run under the child queue's mutex.
 */
int64_t dmRemoveProcRpcHandle(SProc *proc, void *handle) {
  int64_t key = (int64_t)handle;
  int64_t ref = 0;

  taosThreadMutexLock(&proc->cqueue->mutex);

  int64_t *pStored = taosHashGet(proc->hash, &key, sizeof(int64_t));
  if (pStored != NULL) {
    ref = *pStored;
  }
  taosHashRemove(proc->hash, &key, sizeof(int64_t));

  taosThreadMutexUnlock(&proc->cqueue->mutex);
  return ref;
}

S
Shengliang Guan 已提交
489 490 491
/*
 * Send a TSDB_CODE_NODE_OFFLINE response for every entry in proc->hash, then
 * empty the hash. Runs entirely under the child queue's mutex.
 *
 * Each hash entry is read as a pointer-sized value and used as the RPC handle
 * of the response.
 */
void dmCloseProcRpcHandles(SProc *proc) {
  taosThreadMutexLock(&proc->cqueue->mutex);

  void *pIter = taosHashIterate(proc->hash, NULL);
  while (pIter != NULL) {
    void *handle = *((void **)pIter);
    // Advance before sending, in the same order as the entry is consumed.
    pIter = taosHashIterate(proc->hash, pIter);

    dError("node:%s, the child process dies and send an offline rsp to handle:%p", proc->name, handle);
    SRpcMsg offlineRsp = {.info.handle = handle, .code = TSDB_CODE_NODE_OFFLINE};
    rpcSendResponse(&offlineRsp);
  }

  taosHashClear(proc->hash);
  taosThreadMutexUnlock(&proc->cqueue->mutex);
}
S
shm  
Shengliang Guan 已提交
503

S
Shengliang Guan 已提交
504 505
/*
 * Push a message onto the parent queue, waiting for space if necessary.
 *
 * While the queue reports TSDB_CODE_OUT_OF_SHM_MEM the push is retried, with
 * a sleep that grows by one millisecond per attempt. Any other failure cannot
 * be cured by retrying with the same arguments, so it is logged and the
 * message is dropped instead of looping forever.
 */
void dmPutToProcPQueue(SProc *proc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen,
                       EProcFuncType ftype) {
  int32_t retry = 0;
  while (dmPushToProcQueue(proc, proc->pqueue, pHead, headLen, pBody, bodyLen, 0, 0, ftype) != 0) {
    if (terrno != TSDB_CODE_OUT_OF_SHM_MEM) {
      dError("node:%s, failed to put msg:%p to pqueue since %s, give up", proc->name, pHead, terrstr());
      return;
    }

    dWarn("node:%s, failed to put msg:%p to pqueue since %s, retry:%d", proc->name, pHead, terrstr(), retry);
    retry++;
    taosMsleep(retry);
  }
}
S
Shengliang Guan 已提交
513 514 515 516 517

/*
 * Push a message onto the child queue in a single attempt.
 *
 * handle (as an integer key) and ref are forwarded to the push so it can
 * record them. Returns the push result: 0 on success, -1 on failure.
 */
int32_t dmPutToProcCQueue(SProc *proc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen,
                          void *handle, int64_t ref, EProcFuncType ftype) {
  const int64_t handleKey = (int64_t)handle;
  SProcQueue   *cqueue = proc->cqueue;
  return dmPushToProcQueue(proc, cqueue, pHead, headLen, pBody, bodyLen, handleKey, ref, ftype);
}