tprocess.c 15.6 KB
Newer Older
S
shm  
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
#include "tprocess.h"
#include "taoserror.h"
#include "tlog.h"
#include "tqueue.h"

#include <stdint.h>

// todo
#include <sys/prctl.h>
#include <sys/shm.h>
#include <sys/wait.h>

S
shm  
Shengliang Guan 已提交
26
/* Payload size in bytes (20 MiB) used by taosProcInitQueue when the requested size is <= 0. */
#define SHM_DEFAULT_SIZE (20 * 1024 * 1024)

/* Thread entry-point signature; taosProcRun casts the queue loop to this type when creating threads. */
typedef void *(*ProcThreadFp)(void *param);
S
shm  
Shengliang Guan 已提交
28 29

typedef struct SProcQueue {
S
shm  
Shengliang Guan 已提交
30 31 32 33 34 35 36 37 38 39 40 41 42
  int32_t          head;
  int32_t          tail;
  int32_t          total;
  int32_t          avail;
  int32_t          items;
  char            *pBuffer;
  ProcMallocFp     mallocHeadFp;
  ProcFreeFp       freeHeadFp;
  ProcMallocFp     mallocBodyFp;
  ProcFreeFp       freeBodyFp;
  ProcConsumeFp    consumeFp;
  void            *pParent;
  tsem_t           sem;
S
Shengliang Guan 已提交
43
  TdThreadMutex   *mutex;
S
shm  
Shengliang Guan 已提交
44 45 46
  int32_t          mutexShmid;
  int32_t          bufferShmid;
  const char      *name;
S
shm  
Shengliang Guan 已提交
47 48
} SProcQueue;

S
shm  
Shengliang Guan 已提交
49
typedef struct SProcObj {
S
Shengliang Guan 已提交
50
  TdThread    childThread;
S
shm  
Shengliang Guan 已提交
51
  SProcQueue *pChildQueue;
S
Shengliang Guan 已提交
52
  TdThread    parentThread;
S
shm  
Shengliang Guan 已提交
53
  SProcQueue *pParentQueue;
S
shm  
Shengliang Guan 已提交
54
  const char *name;
S
shm  
Shengliang Guan 已提交
55 56 57 58 59
  int32_t     pid;
  bool        isChild;
  bool        stopFlag;
} SProcObj;

S
shm  
Shengliang Guan 已提交
60 61 62 63 64
/*
 * Round v up to the next multiple of 8. The result is never smaller than 8,
 * so zero and negative inputs yield 8.
 *
 * Pure integer arithmetic is used on purpose: going through float loses
 * precision above 2^24 and could return a value smaller than v.
 * Inputs above INT32_MAX - 7 cannot be rounded up within int32_t and are
 * clamped to INT32_MAX - 7, the largest representable multiple of 8.
 */
static inline int32_t CEIL8(int32_t v) {
  if (v <= 8) return 8;
  if (v > INT32_MAX - 7) return INT32_MAX - 7;
  return (v + 7) & ~7;
}

S
Shengliang Guan 已提交
65 66 67 68 69
/*
 * Create a PTHREAD_PROCESS_SHARED mutex inside its own private System V
 * shared-memory segment.
 *
 * On success returns 0 and stores the attached mutex in *ppMutex and the
 * segment id in *pShmid. On failure returns -1 with terrno set, releases
 * everything acquired so far and leaves both outputs untouched.
 */
static int32_t taosProcInitMutex(TdThreadMutex **ppMutex, int32_t *pShmid) {
  TdThreadMutex    *pMutex = NULL;
  TdThreadMutexAttr mattr = {0};
  bool              attrReady = false;
  void             *shmptr = NULL;
  int32_t           shmid = -1;
  int32_t           code = -1;
  int32_t           ret = 0;

  // pthread functions report failures through their return value, not errno
  ret = pthread_mutexattr_init(&mattr);
  if (ret != 0) {
    terrno = TAOS_SYSTEM_ERROR(ret);
    uError("failed to init mutex while init attr since %s", terrstr());
    goto _OVER;
  }
  attrReady = true;

  ret = pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED);
  if (ret != 0) {
    terrno = TAOS_SYSTEM_ERROR(ret);
    uError("failed to init mutex while set shared since %s", terrstr());
    goto _OVER;
  }

  // shmget returns -1 on error; 0 is a valid segment id
  shmid = shmget(IPC_PRIVATE, sizeof(TdThreadMutex), IPC_CREAT | 0600);
  if (shmid < 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    uError("failed to init mutex while shmget since %s", terrstr());
    goto _OVER;
  }

  // shmat signals failure with (void *)-1, never NULL
  shmptr = shmat(shmid, NULL, 0);
  if (shmptr == (void *)-1) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    uError("failed to init mutex while shmat since %s", terrstr());
    goto _OVER;
  }
  pMutex = (TdThreadMutex *)shmptr;

  // NOTE(review): assumes taosThreadMutexInit forwards pthread_mutex_init's error number - confirm
  ret = taosThreadMutexInit(pMutex, &mattr);
  if (ret != 0) {
    terrno = TAOS_SYSTEM_ERROR(ret);
    uError("failed to init mutex since %s", terrstr());
    goto _OVER;
  }

  code = 0;

_OVER:
  if (code != 0) {
    // The mutex is the last thing initialised, so on failure it never needs destroying
    if (pMutex != NULL) {
      shmdt(pMutex);
    }
    if (shmid >= 0) {
      shmctl(shmid, IPC_RMID, NULL);
    }
  } else {
    *ppMutex = pMutex;
    *pShmid = shmid;
  }

  if (attrReady) {
    pthread_mutexattr_destroy(&mattr);
  }
  return code;
}

S
shm  
Shengliang Guan 已提交
123
/*
 * Tear down a mutex created by taosProcInitMutex: destroy it, detach its
 * shared-memory mapping from this process and mark the segment for removal.
 * A NULL pMutex or a negative shmid skips the corresponding step.
 */
static void taosProcDestroyMutex(TdThreadMutex *pMutex, int32_t shmid) {
  if (pMutex != NULL) {
    taosThreadMutexDestroy(pMutex);
    shmdt(pMutex);  // IPC_RMID alone does not unmap the segment from this process
  }
  if (shmid >= 0) {
    shmctl(shmid, IPC_RMID, NULL);
  }
}

static int32_t taosProcInitBuffer(void **ppBuffer, int32_t size) {
  int32_t shmid = shmget(IPC_PRIVATE, size, IPC_CREAT | 0600);
  if (shmid <= 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    uError("failed to init buffer while shmget since %s", terrstr());
    return -1;
  }

S
Shengliang Guan 已提交
140
  void *shmptr = shmat(shmid, NULL, 0);
S
shm  
Shengliang Guan 已提交
141 142 143 144 145 146 147 148 149 150 151
  if (shmptr == NULL) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    uError("failed to init buffer while shmat since %s", terrstr());
    shmctl(shmid, IPC_RMID, NULL);
    return -1;
  }

  *ppBuffer = shmptr;
  return shmid;
}

S
shm  
Shengliang Guan 已提交
152 153 154 155
/*
 * Detach pBuffer from this process and mark its shared-memory segment for
 * removal. Does nothing when shmid is negative; 0 is a valid segment id.
 */
static void taosProcDestroyBuffer(void *pBuffer, int32_t shmid) {
  if (shmid >= 0) {
    shmdt(pBuffer);
    shmctl(shmid, IPC_RMID, NULL);
  }
}

S
Shengliang Guan 已提交
159
static SProcQueue *taosProcInitQueue(int32_t size) {
S
shm  
Shengliang Guan 已提交
160 161
  if (size <= 0) size = SHM_DEFAULT_SIZE;

S
shm  
Shengliang Guan 已提交
162 163
  int32_t bufSize = CEIL8(size);
  int32_t headSize = CEIL8(sizeof(SProcQueue));
S
shm  
Shengliang Guan 已提交
164

S
shm  
Shengliang Guan 已提交
165 166
  SProcQueue *pQueue = NULL;
  int32_t     shmId = taosProcInitBuffer((void **)&pQueue, bufSize + headSize);
S
shm  
Shengliang Guan 已提交
167
  if (shmId < 0) {
S
shm  
Shengliang Guan 已提交
168 169 170
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
S
shm  
Shengliang Guan 已提交
171 172 173
  pQueue->bufferShmid = shmId;

  if (taosProcInitMutex(&pQueue->mutex, &pQueue->mutexShmid) != 0) {
S
shm  
Shengliang Guan 已提交
174
    taosProcDestroyBuffer(pQueue, pQueue->bufferShmid);
S
shm  
Shengliang Guan 已提交
175 176 177
    return NULL;
  }

S
shm  
Shengliang Guan 已提交
178
  if (tsem_init(&pQueue->sem, 1, 0) != 0) {
S
shm  
Shengliang Guan 已提交
179 180
    taosProcDestroyMutex(pQueue->mutex, pQueue->mutexShmid);
    taosProcDestroyBuffer(pQueue, pQueue->bufferShmid);
S
shm  
Shengliang Guan 已提交
181 182 183 184
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

S
shm  
Shengliang Guan 已提交
185 186
  if (taosProcInitMutex(&pQueue->mutex, &pQueue->mutexShmid) != 0) {
    tsem_destroy(&pQueue->sem);
S
shm  
Shengliang Guan 已提交
187 188
    taosProcDestroyMutex(pQueue->mutex, pQueue->mutexShmid);
    taosProcDestroyBuffer(pQueue, pQueue->bufferShmid);
S
shm  
Shengliang Guan 已提交
189 190 191
    return NULL;
  }

S
shm  
Shengliang Guan 已提交
192 193 194 195 196 197
  pQueue->head = 0;
  pQueue->tail = 0;
  pQueue->total = bufSize;
  pQueue->avail = bufSize;
  pQueue->items = 0;
  pQueue->pBuffer = (char *)pQueue + headSize;
S
shm  
Shengliang Guan 已提交
198 199 200
  return pQueue;
}

S
shm  
Shengliang Guan 已提交
201
/*
 * Release everything owned by a queue: its semaphore, its mutex segment and
 * finally the segment that holds the queue itself. NULL is ignored.
 * pQueue must not be used after this call.
 */
static void taosProcCleanupQueue(SProcQueue *pQueue) {
  if (pQueue != NULL) {
    uDebug("proc:%s, queue:%p clean up", pQueue->name, pQueue);
    tsem_destroy(&pQueue->sem);
    taosProcDestroyMutex(pQueue->mutex, pQueue->mutexShmid);
    // Last step: this detaches the memory pQueue points into
    taosProcDestroyBuffer(pQueue, pQueue->bufferShmid);
  }
}

S
shm  
Shengliang Guan 已提交
210
/*
 * Append one message to the ring buffer and wake one consumer.
 *
 * Record layout, starting at `tail`:
 *   +0  int16_t  head length, rounded up to a multiple of 8
 *   +2  int8_t   ftype
 *   +4  int32_t  body length, rounded up to a multiple of 8
 *   +8  head bytes (padded), immediately followed by body bytes (padded)
 * All sizes are multiples of 8, so the 8-byte record header is never split;
 * the head or body payload may wrap around to the start of the buffer.
 *
 * pHead/rawHeadLen and pBody/rawBodyLen give the unpadded source data.
 * Returns 0 on success, or -1 with terrno = TSDB_CODE_OUT_OF_SHM_MEM when the
 * padded record does not fit into the free space.
 */
static int32_t taosProcQueuePush(SProcQueue *pQueue, const char *pHead, int16_t rawHeadLen, const char *pBody,
                                 int32_t rawBodyLen, ProcFuncType ftype) {
  const int32_t headLen = CEIL8(rawHeadLen);
  const int32_t bodyLen = CEIL8(rawBodyLen);
  const int32_t fullLen = headLen + bodyLen + 8;

  taosThreadMutexLock(pQueue->mutex);
  if (fullLen > pQueue->avail) {
    taosThreadMutexUnlock(pQueue->mutex);
    terrno = TSDB_CODE_OUT_OF_SHM_MEM;
    return -1;
  }

  // Record header: written at tail, or at offset 0 when tail sits exactly at the end of the buffer
  const int32_t pos = pQueue->tail;
  if (pQueue->tail < pQueue->total) {
    *(int16_t *)(pQueue->pBuffer + pQueue->tail) = headLen;
    *(int8_t *)(pQueue->pBuffer + pQueue->tail + 2) = (int8_t)ftype;
    *(int32_t *)(pQueue->pBuffer + pQueue->tail + 4) = bodyLen;
  } else {
    *(int16_t *)(pQueue->pBuffer) = headLen;
    *(int8_t *)(pQueue->pBuffer + 2) = (int8_t)ftype;
    *(int32_t *)(pQueue->pBuffer + 4) = bodyLen;
  }

  if (pQueue->tail < pQueue->head) {
    // Free space is one contiguous run between tail and head: no wrap possible
    memcpy(pQueue->pBuffer + pQueue->tail + 8, pHead, rawHeadLen);
    memcpy(pQueue->pBuffer + pQueue->tail + 8 + headLen, pBody, rawBodyLen);
    pQueue->tail = pQueue->tail + 8 + headLen + bodyLen;
  } else {
    int32_t remain = pQueue->total - pQueue->tail;
    if (remain == 0) {
      // Whole record (header included) starts at offset 0
      memcpy(pQueue->pBuffer + 8, pHead, rawHeadLen);
      memcpy(pQueue->pBuffer + 8 + headLen, pBody, rawBodyLen);
      pQueue->tail = 8 + headLen + bodyLen;
    } else if (remain == 8) {
      // Only the record header fits at the end; payload starts at offset 0
      memcpy(pQueue->pBuffer, pHead, rawHeadLen);
      memcpy(pQueue->pBuffer + headLen, pBody, rawBodyLen);
      pQueue->tail = headLen + bodyLen;
    } else if (remain < 8 + headLen) {
      // Head payload is split across the end of the buffer
      memcpy(pQueue->pBuffer + pQueue->tail + 8, pHead, remain - 8);
      memcpy(pQueue->pBuffer, pHead + remain - 8, rawHeadLen - (remain - 8));
      memcpy(pQueue->pBuffer + headLen - (remain - 8), pBody, rawBodyLen);
      pQueue->tail = headLen - (remain - 8) + bodyLen;
    } else if (remain < 8 + headLen + bodyLen) {
      // Body payload is split across the end of the buffer
      memcpy(pQueue->pBuffer + pQueue->tail + 8, pHead, rawHeadLen);
      memcpy(pQueue->pBuffer + pQueue->tail + 8 + headLen, pBody, remain - 8 - headLen);
      memcpy(pQueue->pBuffer, pBody + remain - 8 - headLen, rawBodyLen - (remain - 8 - headLen));
      pQueue->tail = bodyLen - (remain - 8 - headLen);
    } else {
      // Everything fits before the end of the buffer
      memcpy(pQueue->pBuffer + pQueue->tail + 8, pHead, rawHeadLen);
      memcpy(pQueue->pBuffer + pQueue->tail + headLen + 8, pBody, rawBodyLen);
      pQueue->tail = pQueue->tail + headLen + bodyLen + 8;
    }
  }

  pQueue->avail -= fullLen;
  pQueue->items++;
  const int32_t items = pQueue->items;  // snapshot under the lock for the trace below
  taosThreadMutexUnlock(pQueue->mutex);
  tsem_post(&pQueue->sem);

  uTrace("proc:%s, push msg at pos:%d ftype:%d remain:%d, head:%d %p body:%d %p", pQueue->name, pos, ftype,
         items, headLen, pHead, bodyLen, pBody);
  return 0;
}

S
Shengliang Guan 已提交
275 276
/*
 * Block until the queue's semaphore is posted, then remove the oldest record.
 *
 * The head and body are copied into buffers obtained from the queue's
 * mallocHeadFp / mallocBodyFp callbacks and handed back through ppHead /
 * ppBody; they are not freed here. *pHeadLen and *pBodyLen receive the padded
 * (multiple-of-8) lengths stored by taosProcQueuePush, *pFuncType the stored
 * ftype.
 *
 * Returns
 *    1  a record was popped and all outputs are set;
 *    0  the semaphore was posted but the queue is empty; the semaphore is
 *       posted again and terrno is set to TSDB_CODE_OUT_OF_SHM_MEM;
 *   -1  a buffer could not be allocated (terrno = TSDB_CODE_OUT_OF_MEMORY);
 *       the record stays queued and the semaphore is posted again.
 */
static int32_t taosProcQueuePop(SProcQueue *pQueue, void **ppHead, int16_t *pHeadLen, void **ppBody,
                                int32_t *pBodyLen, ProcFuncType *pFuncType) {
  tsem_wait(&pQueue->sem);

  taosThreadMutexLock(pQueue->mutex);
  if (pQueue->total - pQueue->avail <= 0) {
    taosThreadMutexUnlock(pQueue->mutex);
    tsem_post(&pQueue->sem);
    terrno = TSDB_CODE_OUT_OF_SHM_MEM;
    return 0;
  }

  // Record header: at head, or at offset 0 when head sits exactly at the end of the buffer
  int16_t headLen = 0;
  int8_t  ftype = 0;
  int32_t bodyLen = 0;
  if (pQueue->head < pQueue->total) {
    headLen = *(int16_t *)(pQueue->pBuffer + pQueue->head);
    ftype = *(int8_t *)(pQueue->pBuffer + pQueue->head + 2);
    bodyLen = *(int32_t *)(pQueue->pBuffer + pQueue->head + 4);
  } else {
    headLen = *(int16_t *)(pQueue->pBuffer);
    ftype = *(int8_t *)(pQueue->pBuffer + 2);
    bodyLen = *(int32_t *)(pQueue->pBuffer + 4);
  }

  void *pHead = (*pQueue->mallocHeadFp)(headLen);
  void *pBody = (*pQueue->mallocBodyFp)(bodyLen);
  if (pHead == NULL || pBody == NULL) {
    taosThreadMutexUnlock(pQueue->mutex);
    tsem_post(&pQueue->sem);
    (*pQueue->freeHeadFp)(pHead);
    (*pQueue->freeBodyFp)(pBody);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  const int32_t pos = pQueue->head;
  if (pQueue->head < pQueue->tail) {
    // Queued data is one contiguous run between head and tail: no wrap possible
    memcpy(pHead, pQueue->pBuffer + pQueue->head + 8, headLen);
    memcpy(pBody, pQueue->pBuffer + pQueue->head + 8 + headLen, bodyLen);
    pQueue->head = pQueue->head + 8 + headLen + bodyLen;
  } else {
    int32_t remain = pQueue->total - pQueue->head;
    if (remain == 0) {
      // Whole record (header included) starts at offset 0
      memcpy(pHead, pQueue->pBuffer + 8, headLen);
      memcpy(pBody, pQueue->pBuffer + 8 + headLen, bodyLen);
      pQueue->head = 8 + headLen + bodyLen;
    } else if (remain == 8) {
      // Only the record header sits at the end; payload starts at offset 0
      memcpy(pHead, pQueue->pBuffer, headLen);
      memcpy(pBody, pQueue->pBuffer + headLen, bodyLen);
      pQueue->head = headLen + bodyLen;
    } else if (remain < 8 + headLen) {
      // Head payload is split across the end of the buffer
      memcpy(pHead, pQueue->pBuffer + pQueue->head + 8, remain - 8);
      memcpy((char *)pHead + remain - 8, pQueue->pBuffer, headLen - (remain - 8));
      memcpy(pBody, pQueue->pBuffer + headLen - (remain - 8), bodyLen);
      pQueue->head = headLen - (remain - 8) + bodyLen;
    } else if (remain < 8 + headLen + bodyLen) {
      // Body payload is split across the end of the buffer
      memcpy(pHead, pQueue->pBuffer + pQueue->head + 8, headLen);
      memcpy(pBody, pQueue->pBuffer + pQueue->head + 8 + headLen, remain - 8 - headLen);
      memcpy((char *)pBody + remain - 8 - headLen, pQueue->pBuffer, bodyLen - (remain - 8 - headLen));
      pQueue->head = bodyLen - (remain - 8 - headLen);
    } else {
      // Everything lies before the end of the buffer
      memcpy(pHead, pQueue->pBuffer + pQueue->head + 8, headLen);
      memcpy(pBody, pQueue->pBuffer + pQueue->head + headLen + 8, bodyLen);
      pQueue->head = pQueue->head + headLen + bodyLen + 8;
    }
  }

  pQueue->avail = pQueue->avail + headLen + bodyLen + 8;
  pQueue->items--;
  const int32_t items = pQueue->items;  // snapshot under the lock for the trace below
  taosThreadMutexUnlock(pQueue->mutex);

  *ppHead = pHead;
  *ppBody = pBody;
  *pHeadLen = headLen;
  *pBodyLen = bodyLen;
  *pFuncType = (ProcFuncType)ftype;

  uTrace("proc:%s, pop msg at pos:%d ftype:%d remain:%d, head:%d %p body:%d %p", pQueue->name, pos, ftype,
         items, headLen, pHead, bodyLen, pBody);
  return 1;
}

SProcObj *taosProcInit(const SProcCfg *pCfg) {
wafwerar's avatar
wafwerar 已提交
359
  SProcObj *pProc = taosMemoryCalloc(1, sizeof(SProcObj));
S
shm  
Shengliang Guan 已提交
360 361 362 363 364
  if (pProc == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

S
shm  
Shengliang Guan 已提交
365
  pProc->name = pCfg->name;
S
Shengliang Guan 已提交
366 367
  pProc->pChildQueue = taosProcInitQueue(pCfg->childQueueSize);
  pProc->pParentQueue = taosProcInitQueue(pCfg->parentQueueSize);
S
shm  
Shengliang Guan 已提交
368
  if (pProc->pChildQueue == NULL || pProc->pParentQueue == NULL) {
S
Shengliang Guan 已提交
369
    taosProcCleanupQueue(pProc->pChildQueue);
wafwerar's avatar
wafwerar 已提交
370
    taosMemoryFree(pProc);
S
shm  
Shengliang Guan 已提交
371 372 373
    return NULL;
  }

S
shm  
Shengliang Guan 已提交
374
  pProc->pChildQueue->name = pCfg->name;
S
shm  
Shengliang Guan 已提交
375 376 377 378 379 380
  pProc->pChildQueue->pParent = pCfg->pParent;
  pProc->pChildQueue->mallocHeadFp = pCfg->childMallocHeadFp;
  pProc->pChildQueue->freeHeadFp = pCfg->childFreeHeadFp;
  pProc->pChildQueue->mallocBodyFp = pCfg->childMallocBodyFp;
  pProc->pChildQueue->freeBodyFp = pCfg->childFreeBodyFp;
  pProc->pChildQueue->consumeFp = pCfg->childConsumeFp;
S
shm  
Shengliang Guan 已提交
381
  pProc->pParentQueue->name = pCfg->name;
S
shm  
Shengliang Guan 已提交
382 383 384 385 386 387 388
  pProc->pParentQueue->pParent = pCfg->pParent;
  pProc->pParentQueue->mallocHeadFp = pCfg->parentdMallocHeadFp;
  pProc->pParentQueue->freeHeadFp = pCfg->parentFreeHeadFp;
  pProc->pParentQueue->mallocBodyFp = pCfg->parentMallocBodyFp;
  pProc->pParentQueue->freeBodyFp = pCfg->parentFreeBodyFp;
  pProc->pParentQueue->consumeFp = pCfg->parentConsumeFp;

S
shm  
Shengliang Guan 已提交
389
  uDebug("proc:%s, is initialized, child queue:%p parent queue:%p", pProc->name, pProc->pChildQueue, pProc->pParentQueue);
S
shm  
Shengliang Guan 已提交
390

S
shm  
Shengliang Guan 已提交
391 392 393
  pProc->pid = fork();
  if (pProc->pid == 0) {
    pProc->isChild = 1;
S
Shengliang Guan 已提交
394
    prctl(PR_SET_NAME, pProc->name, NULL, NULL, NULL); 
S
shm  
Shengliang Guan 已提交
395 396 397
  } else {
    pProc->isChild = 0;
    uInfo("this is parent process, child pid:%d", pProc->pid);
S
shm  
Shengliang Guan 已提交
398
  }
S
shm  
Shengliang Guan 已提交
399

S
shm  
Shengliang Guan 已提交
400 401 402
  return pProc;
}

S
shm  
Shengliang Guan 已提交
403 404 405 406
static void taosProcThreadLoop(SProcQueue *pQueue) {
  ProcConsumeFp consumeFp = pQueue->consumeFp;
  void         *pParent = pQueue->pParent;
  void         *pHead, *pBody;
S
Shengliang Guan 已提交
407 408 409
  int16_t       headLen;
  ProcFuncType  ftype;
  int32_t       bodyLen;
S
shm  
Shengliang Guan 已提交
410

S
shm  
Shengliang Guan 已提交
411
  uDebug("proc:%s, start to get msg from queue:%p", pQueue->name, pQueue);
S
shm  
Shengliang Guan 已提交
412

S
shm  
Shengliang Guan 已提交
413
  while (1) {
S
Shengliang Guan 已提交
414
    int32_t numOfMsgs = taosProcQueuePop(pQueue, &pHead, &headLen, &pBody, &bodyLen, &ftype);
S
shm  
Shengliang Guan 已提交
415 416
    if (numOfMsgs == 0) {
      uDebug("proc:%s, get no msg from queue:%p and exit the proc thread", pQueue->name, pQueue);
S
shm  
Shengliang Guan 已提交
417
      break;
S
shm  
Shengliang Guan 已提交
418 419
    } else if (numOfMsgs < 0) {
      uTrace("proc:%s, get no msg from queue:%p since %s", pQueue->name, pQueue, terrstr());
S
shm  
Shengliang Guan 已提交
420 421 422
      taosMsleep(1);
      continue;
    } else {
S
Shengliang Guan 已提交
423
      (*consumeFp)(pParent, pHead, headLen, pBody, bodyLen, ftype);
S
shm  
Shengliang Guan 已提交
424 425 426 427
    }
  }
}

S
shm  
Shengliang Guan 已提交
428
/*
 * Start the consumer thread for this process: the child drains pChildQueue,
 * the parent drains pParentQueue. The thread is created joinable.
 *
 * Returns 0 on success, -1 with terrno set when the thread cannot be created.
 */
int32_t taosProcRun(SProcObj *pProc) {
  int32_t      code = 0;
  TdThreadAttr thAttr;
  taosThreadAttrInit(&thAttr);
  taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);

  if (pProc->isChild) {
    if (taosThreadCreate(&pProc->childThread, &thAttr, (ProcThreadFp)taosProcThreadLoop, pProc->pChildQueue) != 0) {
      terrno = TAOS_SYSTEM_ERROR(errno);
      uError("failed to create thread since %s", terrstr());
      code = -1;
    } else {
      uDebug("proc:%s, child start to consume queue:%p", pProc->name, pProc->pChildQueue);
    }
  } else {
    if (taosThreadCreate(&pProc->parentThread, &thAttr, (ProcThreadFp)taosProcThreadLoop, pProc->pParentQueue) != 0) {
      terrno = TAOS_SYSTEM_ERROR(errno);
      uError("failed to create thread since %s", terrstr());
      code = -1;
    } else {
      uDebug("proc:%s, parent start to consume queue:%p", pProc->name, pProc->pParentQueue);
    }
  }

  // Release the attribute on every path.
  // NOTE(review): assumes taosThreadAttrDestroy is provided next to taosThreadAttrInit - confirm
  taosThreadAttrDestroy(&thAttr);
  return code;
}

/*
 * Mark the process object as stopping by setting stopFlag.
 * The consumer thread is not joined yet (see todo).
 */
void taosProcStop(SProcObj *pProc) {
  pProc->stopFlag = true;
  // todo join
}

S
shm  
Shengliang Guan 已提交
457 458
/* Tell whether this handle lives in the forked child (true) or in the parent (false). */
bool taosProcIsChild(SProcObj *pProc) {
  return pProc->isChild;
}

S
shm  
Shengliang Guan 已提交
459 460
/* Return the pid stored by taosProcInit from fork(): the child's pid in the parent, 0 in the child. */
int32_t taosProcChildId(SProcObj *pProc) {
  return pProc->pid;
}

S
shm  
Shengliang Guan 已提交
461 462
/*
 * Stop the process object, destroy both of its queues and free it.
 * NULL is ignored. pProc must not be used afterwards.
 */
void taosProcCleanup(SProcObj *pProc) {
  if (pProc != NULL) {
    uDebug("proc:%s, clean up", pProc->name);
    taosProcStop(pProc);
    taosProcCleanupQueue(pProc->pChildQueue);
    taosProcCleanupQueue(pProc->pParentQueue);
    taosMemoryFree(pProc);
  }
}

S
shm  
Shengliang Guan 已提交
471
int32_t taosProcPutToChildQ(SProcObj *pProc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen,
S
Shengliang Guan 已提交
472 473
                            ProcFuncType ftype) {
  return taosProcQueuePush(pProc->pChildQueue, pHead, headLen, pBody, bodyLen, ftype);
S
shm  
Shengliang Guan 已提交
474
}
S
shm  
Shengliang Guan 已提交
475

S
shm  
Shengliang Guan 已提交
476
int32_t taosProcPutToParentQ(SProcObj *pProc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen,
S
Shengliang Guan 已提交
477 478
                             ProcFuncType ftype) {
  return taosProcQueuePush(pProc->pParentQueue, pHead, headLen, pBody, bodyLen, ftype);
S
shm  
Shengliang Guan 已提交
479
}