tprocess.c 16.4 KB
Newer Older
S
shm  
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
#include "tprocess.h"
18
#include "taos.h"
S
shm  
Shengliang Guan 已提交
19
#include "taoserror.h"
20
#include "thash.h"
S
shm  
Shengliang Guan 已提交
21
#include "tlog.h"
S
shm  
Shengliang Guan 已提交
22
#include "tqueue.h"
S
shm  
Shengliang Guan 已提交
23

S
shm  
Shengliang Guan 已提交
24
// Thread entry-point signature: one opaque argument in, one opaque pointer out.
typedef void *(*ProcThreadFp)(void *param);
S
shm  
Shengliang Guan 已提交
25 26

typedef struct SProcQueue {
S
shm  
Shengliang Guan 已提交
27 28 29 30 31 32 33 34 35
  int32_t       head;
  int32_t       tail;
  int32_t       total;
  int32_t       avail;
  int32_t       items;
  char          name[8];
  TdThreadMutex mutex;
  tsem_t        sem;
  char          pBuffer[];
S
shm  
Shengliang Guan 已提交
36 37
} SProcQueue;

S
shm  
Shengliang Guan 已提交
38
typedef struct SProcObj {
S
shm  
Shengliang Guan 已提交
39 40 41 42 43 44 45 46 47 48 49 50 51
  TdThread      thread;
  SProcQueue   *pChildQueue;
  SProcQueue   *pParentQueue;
  ProcConsumeFp childConsumeFp;
  ProcMallocFp  childMallocHeadFp;
  ProcFreeFp    childFreeHeadFp;
  ProcMallocFp  childMallocBodyFp;
  ProcFreeFp    childFreeBodyFp;
  ProcConsumeFp parentConsumeFp;
  ProcMallocFp  parentMallocHeadFp;
  ProcFreeFp    parentFreeHeadFp;
  ProcMallocFp  parentMallocBodyFp;
  ProcFreeFp    parentFreeBodyFp;
52
  void         *parent;
S
shm  
Shengliang Guan 已提交
53
  const char   *name;
54
  SHashObj     *hash;
S
shm  
Shengliang Guan 已提交
55 56 57
  int32_t       pid;
  bool          isChild;
  bool          stopFlag;
S
shm  
Shengliang Guan 已提交
58 59
} SProcObj;

S
shm  
Shengliang Guan 已提交
60 61 62 63 64
/*
 * Round v up to the next multiple of 8, never returning less than 8
 * (zero and negative inputs also yield 8).
 *
 * Pure integer arithmetic is used on purpose: going through `float` only
 * keeps 24 significant bits, so values above 2^24 could be rounded to a
 * result *smaller* than v.
 *
 * Inputs above INT32_MAX - 7 have no representable multiple of 8 and must
 * not be passed.
 */
static inline int32_t CEIL8(int32_t v) {
  if (v <= 8) {
    return 8;
  }

  // widen first so that v + 7 cannot overflow int32_t
  return (int32_t)((((int64_t)v + 7) / 8) * 8);
}

S
shm  
Shengliang Guan 已提交
65
/*
 * Initialize pQueue->mutex with the PTHREAD_PROCESS_SHARED attribute.
 *
 * Returns 0 on success. On failure sets terrno, logs the reason and
 * returns -1.
 *
 * The error is recorded *before* the attribute object is destroyed so that
 * the destroy call cannot overwrite errno first.
 * NOTE(review): this still assumes the taosThread* wrappers report failures
 * through errno rather than only through their return value - confirm.
 */
static int32_t taosProcInitMutex(SProcQueue *pQueue) {
  TdThreadMutexAttr mattr = {0};

  if (taosThreadMutexAttrInit(&mattr) != 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    uError("failed to init mutex while init attr since %s", terrstr());
    return -1;
  }

  int32_t code = 0;
  if (taosThreadMutexAttrSetPshared(&mattr, PTHREAD_PROCESS_SHARED) != 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    uError("failed to init mutex while set shared since %s", terrstr());
    code = -1;
  } else if (taosThreadMutexInit(&pQueue->mutex, &mattr) != 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    uError("failed to init mutex since %s", terrstr());
    code = -1;
  }

  // single release point for the attribute object, success or failure
  taosThreadMutexAttrDestroy(&mattr);
  return code;
}

S
shm  
Shengliang Guan 已提交
92 93
/*
 * Initialize pQueue->sem via tsem_init(&sem, 1, 0).
 * NOTE(review): the arguments look like sem_init's (pshared = 1, initial
 * value = 0) - confirm against the tsem_init definition.
 *
 * Returns 0 on success; on failure sets terrno, logs and returns -1.
 */
static int32_t taosProcInitSem(SProcQueue *pQueue) {
  const int32_t ret = tsem_init(&pQueue->sem, 1, 0);
  if (ret == 0) {
    return 0;
  }

  terrno = TAOS_SYSTEM_ERROR(errno);
  uError("failed to init sem");
  return -1;
}

S
shm  
Shengliang Guan 已提交
102 103 104 105 106
static SProcQueue *taosProcInitQueue(const char *name, bool isChild, char *ptr, int32_t size) {
  int32_t bufSize = size - CEIL8(sizeof(SProcQueue));
  if (bufSize <= 1024) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
S
shm  
Shengliang Guan 已提交
107 108
  }

S
shm  
Shengliang Guan 已提交
109
  SProcQueue *pQueue = (SProcQueue *)(ptr);
S
shm  
Shengliang Guan 已提交
110

S
shm  
Shengliang Guan 已提交
111 112 113 114
  if (!isChild) {
    if (taosProcInitMutex(pQueue) != 0) {
      return NULL;
    }
S
shm  
Shengliang Guan 已提交
115

S
shm  
Shengliang Guan 已提交
116 117 118
    if (taosProcInitSem(pQueue) != 0) {
      return NULL;
    }
S
shm  
Shengliang Guan 已提交
119

S
shm  
Shengliang Guan 已提交
120 121 122 123 124 125
    tstrncpy(pQueue->name, name, sizeof(pQueue->name));
    pQueue->head = 0;
    pQueue->tail = 0;
    pQueue->total = bufSize;
    pQueue->avail = bufSize;
    pQueue->items = 0;
S
shm  
Shengliang Guan 已提交
126 127
  }

S
shm  
Shengliang Guan 已提交
128 129 130 131 132 133 134 135
  return pQueue;
}

#if 0
// Disabled teardown helpers, kept for reference only.
// SProcQueue embeds `mutex` and `sem` by value, so they are passed by address
// here; there is no pointer to compare against NULL or to reset.
// NOTE(review): before enabling, decide which side is allowed to destroy the
// primitives, since both queue users operate on the same SProcQueue memory.
static void taosProcDestroyMutex(SProcQueue *pQueue) {
  taosThreadMutexDestroy(&pQueue->mutex);
}

static void taosProcDestroySem(SProcQueue *pQueue) {
  tsem_destroy(&pQueue->sem);
}
#endif
S
shm  
Shengliang Guan 已提交
146

S
shm  
Shengliang Guan 已提交
147
// Queue teardown hook. Currently a no-op: the destroy calls are compiled out.
static void taosProcCleanupQueue(SProcQueue *pQueue) {
#if 0
  if (pQueue == NULL) {
    return;
  }
  taosProcDestroyMutex(pQueue);
  taosProcDestroySem(pQueue);
#endif
}
S
shm  
Shengliang Guan 已提交
155

156
/*
 * Append one message (head + body) to the ring buffer of pQueue.
 *
 * Record layout, all offsets relative to the record start:
 *   +0  int16_t raw head length
 *   +2  int8_t  function type
 *   +4  int32_t raw body length
 *   +8  head bytes, padded to CEIL8(rawHeadLen)
 *       body bytes, padded to CEIL8(rawBodyLen)
 * The 8-byte record header is never split; head and body may wrap around
 * the end of pBuffer.
 *
 * pHead/rawHeadLen  head bytes; pHead must be non-NULL and rawHeadLen > 0
 * pBody/rawBodyLen  body bytes; rawBodyLen >= 0, pBody may be NULL only when
 *                   rawBodyLen is 0
 * handle/handleRef  when handle != 0 and ftype == PROC_FUNC_REQ, the pair is
 *                   stored in pProc->hash under the queue mutex
 *
 * Returns 0 on success. Returns -1 with terrno set to
 *   TSDB_CODE_INVALID_PARA    for invalid arguments,
 *   TSDB_CODE_OUT_OF_SHM_MEM  when the record does not fit into the free space,
 *   TSDB_CODE_OUT_OF_MEMORY   when the hash insert fails.
 */
static int32_t taosProcQueuePush(SProcObj *pProc, SProcQueue *pQueue, const char *pHead, int16_t rawHeadLen,
                                 const char *pBody, int32_t rawBodyLen, int64_t handle, int64_t handleRef,
                                 EProcFuncType ftype) {
  // Negative lengths would turn into huge size_t values inside memcpy.
  if (pHead == NULL || rawHeadLen <= 0 || rawBodyLen < 0 || (pBody == NULL && rawBodyLen > 0)) {
    terrno = TSDB_CODE_INVALID_PARA;
    return -1;
  }

  // memcpy requires a valid source pointer even for a zero-length copy.
  const char *pBodySrc = (pBody != NULL) ? pBody : "";

  const int32_t headLen = CEIL8(rawHeadLen);
  const int32_t bodyLen = CEIL8(rawBodyLen);
  const int32_t fullLen = headLen + bodyLen + 8;

  taosThreadMutexLock(&pQueue->mutex);
  if (fullLen > pQueue->avail) {
    taosThreadMutexUnlock(&pQueue->mutex);
    terrno = TSDB_CODE_OUT_OF_SHM_MEM;
    return -1;
  }

  if (handle != 0 && ftype == PROC_FUNC_REQ) {
    if (taosHashPut(pProc->hash, &handle, sizeof(int64_t), &handleRef, sizeof(int64_t)) != 0) {
      taosThreadMutexUnlock(&pQueue->mutex);
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
    }
  }

  const int32_t pos = pQueue->tail;

  // Record header: written at tail, or at offset 0 when tail sits exactly at the end.
  if (pQueue->tail < pQueue->total) {
    *(int16_t *)(pQueue->pBuffer + pQueue->tail) = rawHeadLen;
    *(int8_t *)(pQueue->pBuffer + pQueue->tail + 2) = (int8_t)ftype;
    *(int32_t *)(pQueue->pBuffer + pQueue->tail + 4) = rawBodyLen;
  } else {
    *(int16_t *)(pQueue->pBuffer) = rawHeadLen;
    *(int8_t *)(pQueue->pBuffer + 2) = (int8_t)ftype;
    *(int32_t *)(pQueue->pBuffer + 4) = rawBodyLen;
  }

  if (pQueue->tail < pQueue->head) {
    // Free space is one contiguous run between tail and head.
    memcpy(pQueue->pBuffer + pQueue->tail + 8, pHead, rawHeadLen);
    memcpy(pQueue->pBuffer + pQueue->tail + 8 + headLen, pBodySrc, rawBodyLen);
    pQueue->tail = pQueue->tail + 8 + headLen + bodyLen;
  } else {
    int32_t remain = pQueue->total - pQueue->tail;
    if (remain == 0) {
      // tail is at the very end: the whole record starts at offset 0
      memcpy(pQueue->pBuffer + 8, pHead, rawHeadLen);
      memcpy(pQueue->pBuffer + 8 + headLen, pBodySrc, rawBodyLen);
      pQueue->tail = 8 + headLen + bodyLen;
    } else if (remain == 8) {
      // only the record header fits before the end
      memcpy(pQueue->pBuffer, pHead, rawHeadLen);
      memcpy(pQueue->pBuffer + headLen, pBodySrc, rawBodyLen);
      pQueue->tail = headLen + bodyLen;
    } else if (remain < 8 + headLen) {
      // the head is split across the end of the buffer
      memcpy(pQueue->pBuffer + pQueue->tail + 8, pHead, remain - 8);
      memcpy(pQueue->pBuffer, pHead + remain - 8, rawHeadLen - (remain - 8));
      memcpy(pQueue->pBuffer + headLen - (remain - 8), pBodySrc, rawBodyLen);
      pQueue->tail = headLen - (remain - 8) + bodyLen;
    } else if (remain < 8 + headLen + bodyLen) {
      // the body is split across the end of the buffer
      memcpy(pQueue->pBuffer + pQueue->tail + 8, pHead, rawHeadLen);
      memcpy(pQueue->pBuffer + pQueue->tail + 8 + headLen, pBodySrc, remain - 8 - headLen);
      memcpy(pQueue->pBuffer, pBodySrc + remain - 8 - headLen, rawBodyLen - (remain - 8 - headLen));
      pQueue->tail = bodyLen - (remain - 8 - headLen);
    } else {
      // the whole record fits before the end
      memcpy(pQueue->pBuffer + pQueue->tail + 8, pHead, rawHeadLen);
      memcpy(pQueue->pBuffer + pQueue->tail + headLen + 8, pBodySrc, rawBodyLen);
      pQueue->tail = pQueue->tail + headLen + bodyLen + 8;
    }
  }

  pQueue->avail -= fullLen;
  pQueue->items++;
  taosThreadMutexUnlock(&pQueue->mutex);
  tsem_post(&pQueue->sem);

  uTrace("proc:%s, push msg at pos:%d ftype:%d remain:%d, head:%d %p body:%d %p", pQueue->name, pos, ftype,
         pQueue->items, headLen, pHead, bodyLen, pBody);
  return 0;
}

S
shm  
Shengliang Guan 已提交
235
static int32_t taosProcQueuePop(SProcQueue *pQueue, void **ppHead, int16_t *pHeadLen, void **ppBody, int32_t *pBodyLen,
S
Shengliang Guan 已提交
236
                                EProcFuncType *pFuncType, ProcMallocFp mallocHeadFp, ProcFreeFp freeHeadFp,
S
shm  
Shengliang Guan 已提交
237
                                ProcMallocFp mallocBodyFp, ProcFreeFp freeBodyFp) {
S
shm  
Shengliang Guan 已提交
238 239
  tsem_wait(&pQueue->sem);

S
shm  
Shengliang Guan 已提交
240
  taosThreadMutexLock(&pQueue->mutex);
S
shm  
Shengliang Guan 已提交
241
  if (pQueue->total - pQueue->avail <= 0) {
S
shm  
Shengliang Guan 已提交
242
    taosThreadMutexUnlock(&pQueue->mutex);
S
shm  
Shengliang Guan 已提交
243
    terrno = TSDB_CODE_OUT_OF_SHM_MEM;
S
shm  
Shengliang Guan 已提交
244
    return 0;
S
shm  
Shengliang Guan 已提交
245 246
  }

S
Shengliang Guan 已提交
247
  int16_t rawHeadLen = 0;
S
Shengliang Guan 已提交
248
  int8_t  ftype = 0;
S
Shengliang Guan 已提交
249
  int32_t rawBodyLen = 0;
S
shm  
Shengliang Guan 已提交
250
  if (pQueue->head < pQueue->total) {
S
Shengliang Guan 已提交
251
    rawHeadLen = *(int16_t *)(pQueue->pBuffer + pQueue->head);
S
Shengliang Guan 已提交
252
    ftype = *(int8_t *)(pQueue->pBuffer + pQueue->head + 2);
S
Shengliang Guan 已提交
253
    rawBodyLen = *(int32_t *)(pQueue->pBuffer + pQueue->head + 4);
S
shm  
Shengliang Guan 已提交
254
  } else {
S
Shengliang Guan 已提交
255
    rawHeadLen = *(int16_t *)(pQueue->pBuffer);
256
    ftype = *(int8_t *)(pQueue->pBuffer + 2);
S
Shengliang Guan 已提交
257
    rawBodyLen = *(int32_t *)(pQueue->pBuffer + 4);
S
shm  
Shengliang Guan 已提交
258
  }
S
Shengliang Guan 已提交
259
  int16_t headLen = CEIL8(rawHeadLen);
S
Shengliang Guan 已提交
260
  int32_t bodyLen = CEIL8(rawBodyLen);
S
shm  
Shengliang Guan 已提交
261

S
shm  
Shengliang Guan 已提交
262 263
  void *pHead = (*mallocHeadFp)(headLen);
  void *pBody = (*mallocBodyFp)(bodyLen);
S
shm  
Shengliang Guan 已提交
264
  if (pHead == NULL || pBody == NULL) {
S
shm  
Shengliang Guan 已提交
265
    taosThreadMutexUnlock(&pQueue->mutex);
S
shm  
Shengliang Guan 已提交
266
    tsem_post(&pQueue->sem);
S
shm  
Shengliang Guan 已提交
267 268
    (*freeHeadFp)(pHead);
    (*freeBodyFp)(pBody);
S
shm  
Shengliang Guan 已提交
269 270 271 272
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

S
Shengliang Guan 已提交
273
  const int32_t pos = pQueue->head;
S
shm  
Shengliang Guan 已提交
274
  if (pQueue->head < pQueue->tail) {
S
shm  
Shengliang Guan 已提交
275 276 277
    memcpy(pHead, pQueue->pBuffer + pQueue->head + 8, headLen);
    memcpy(pBody, pQueue->pBuffer + pQueue->head + 8 + headLen, bodyLen);
    pQueue->head = pQueue->head + 8 + headLen + bodyLen;
S
shm  
Shengliang Guan 已提交
278 279
  } else {
    int32_t remain = pQueue->total - pQueue->head;
S
shm  
Shengliang Guan 已提交
280 281 282 283 284 285 286 287 288 289
    if (remain == 0) {
      memcpy(pHead, pQueue->pBuffer + 8, headLen);
      memcpy(pBody, pQueue->pBuffer + 8 + headLen, bodyLen);
      pQueue->head = 8 + headLen + bodyLen;
    } else if (remain == 8) {
      memcpy(pHead, pQueue->pBuffer, headLen);
      memcpy(pBody, pQueue->pBuffer + headLen, bodyLen);
      pQueue->head = headLen + bodyLen;
    } else if (remain < 8 + headLen) {
      memcpy(pHead, pQueue->pBuffer + pQueue->head + 8, remain - 8);
290
      memcpy((char *)pHead + remain - 8, pQueue->pBuffer, headLen - (remain - 8));
S
shm  
Shengliang Guan 已提交
291 292
      memcpy(pBody, pQueue->pBuffer + headLen - (remain - 8), bodyLen);
      pQueue->head = headLen - (remain - 8) + bodyLen;
S
Shengliang Guan 已提交
293
    } else if (remain < 8 + headLen + bodyLen) {
S
shm  
Shengliang Guan 已提交
294 295
      memcpy(pHead, pQueue->pBuffer + pQueue->head + 8, headLen);
      memcpy(pBody, pQueue->pBuffer + pQueue->head + 8 + headLen, remain - 8 - headLen);
296
      memcpy((char *)pBody + remain - 8 - headLen, pQueue->pBuffer, bodyLen - (remain - 8 - headLen));
S
shm  
Shengliang Guan 已提交
297
      pQueue->head = bodyLen - (remain - 8 - headLen);
S
shm  
Shengliang Guan 已提交
298
    } else {
S
shm  
Shengliang Guan 已提交
299 300 301
      memcpy(pHead, pQueue->pBuffer + pQueue->head + 8, headLen);
      memcpy(pBody, pQueue->pBuffer + pQueue->head + headLen + 8, bodyLen);
      pQueue->head = pQueue->head + headLen + bodyLen + 8;
S
shm  
Shengliang Guan 已提交
302 303 304
    }
  }

S
shm  
Shengliang Guan 已提交
305
  pQueue->avail = pQueue->avail + headLen + bodyLen + 8;
S
shm  
Shengliang Guan 已提交
306
  pQueue->items--;
S
shm  
Shengliang Guan 已提交
307
  taosThreadMutexUnlock(&pQueue->mutex);
S
shm  
Shengliang Guan 已提交
308

S
shm  
Shengliang Guan 已提交
309 310
  *ppHead = pHead;
  *ppBody = pBody;
S
Shengliang Guan 已提交
311 312
  *pHeadLen = rawHeadLen;
  *pBodyLen = rawBodyLen;
S
Shengliang Guan 已提交
313
  *pFuncType = (EProcFuncType)ftype;
S
shm  
Shengliang Guan 已提交
314

S
Shengliang Guan 已提交
315
  uTrace("proc:%s, pop msg at pos:%d ftype:%d remain:%d, head:%d %p body:%d %p", pQueue->name, pos, ftype,
S
Shengliang Guan 已提交
316
         pQueue->items, rawHeadLen, pHead, rawBodyLen, pBody);
S
shm  
Shengliang Guan 已提交
317
  return 1;
S
shm  
Shengliang Guan 已提交
318 319 320
}

SProcObj *taosProcInit(const SProcCfg *pCfg) {
wafwerar's avatar
wafwerar 已提交
321
  SProcObj *pProc = taosMemoryCalloc(1, sizeof(SProcObj));
S
shm  
Shengliang Guan 已提交
322 323 324 325 326
  if (pProc == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

S
shm  
Shengliang Guan 已提交
327 328 329 330 331 332 333 334
  int32_t cstart = 0;
  int32_t csize = CEIL8(pCfg->shm.size / 2);
  int32_t pstart = csize;
  int32_t psize = CEIL8(pCfg->shm.size - pstart);
  if (pstart + psize > pCfg->shm.size) {
    psize -= 8;
  }

S
shm  
Shengliang Guan 已提交
335
  pProc->name = pCfg->name;
S
shm  
Shengliang Guan 已提交
336 337
  pProc->pChildQueue = taosProcInitQueue(pCfg->name, pCfg->isChild, (char *)pCfg->shm.ptr + cstart, csize);
  pProc->pParentQueue = taosProcInitQueue(pCfg->name, pCfg->isChild, (char *)pCfg->shm.ptr + pstart, psize);
338
  pProc->hash = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
S
shm  
Shengliang Guan 已提交
339
  if (pProc->pChildQueue == NULL || pProc->pParentQueue == NULL) {
S
shm  
Shengliang Guan 已提交
340
    taosProcCleanupQueue(pProc->pChildQueue);
wafwerar's avatar
wafwerar 已提交
341
    taosMemoryFree(pProc);
S
shm  
Shengliang Guan 已提交
342 343 344
    return NULL;
  }

S
shm  
Shengliang Guan 已提交
345
  pProc->name = pCfg->name;
346
  pProc->parent = pCfg->parent;
S
shm  
Shengliang Guan 已提交
347 348 349 350 351 352 353 354 355 356 357 358
  pProc->childMallocHeadFp = pCfg->childMallocHeadFp;
  pProc->childFreeHeadFp = pCfg->childFreeHeadFp;
  pProc->childMallocBodyFp = pCfg->childMallocBodyFp;
  pProc->childFreeBodyFp = pCfg->childFreeBodyFp;
  pProc->childConsumeFp = pCfg->childConsumeFp;
  pProc->parentMallocHeadFp = pCfg->parentMallocHeadFp;
  pProc->parentFreeHeadFp = pCfg->parentFreeHeadFp;
  pProc->parentMallocBodyFp = pCfg->parentMallocBodyFp;
  pProc->parentFreeBodyFp = pCfg->parentFreeBodyFp;
  pProc->parentConsumeFp = pCfg->parentConsumeFp;
  pProc->isChild = pCfg->isChild;

S
shm  
Shengliang Guan 已提交
359
  uDebug("proc:%s, is initialized, isChild:%d child queue:%p parent queue:%p", pProc->name, pProc->isChild,
S
shm  
Shengliang Guan 已提交
360
         pProc->pChildQueue, pProc->pParentQueue);
S
shm  
Shengliang Guan 已提交
361

S
shm  
Shengliang Guan 已提交
362 363 364
  return pProc;
}

S
shm  
Shengliang Guan 已提交
365
/*
 * Consumer loop run on the thread created by taosProcRun.
 *
 * Picks the child or parent queue and callback set according to
 * pProc->isChild, then pops messages until taosProcQueuePop reports 0.
 * A negative result is logged and retried after a 1 ms sleep; every popped
 * message is handed to the consume callback.
 *
 * NOTE(review): taosProcRun starts this function through a ProcThreadFp cast
 * although its signature differs from that type.
 */
static void taosProcThreadLoop(SProcObj *pProc) {
  const bool isChild = pProc->isChild;

  SProcQueue   *pQueue = isChild ? pProc->pChildQueue : pProc->pParentQueue;
  ProcConsumeFp consumeFp = isChild ? pProc->childConsumeFp : pProc->parentConsumeFp;
  ProcMallocFp  mallocHeadFp = isChild ? pProc->childMallocHeadFp : pProc->parentMallocHeadFp;
  ProcFreeFp    freeHeadFp = isChild ? pProc->childFreeHeadFp : pProc->parentFreeHeadFp;
  ProcMallocFp  mallocBodyFp = isChild ? pProc->childMallocBodyFp : pProc->parentMallocBodyFp;
  ProcFreeFp    freeBodyFp = isChild ? pProc->childFreeBodyFp : pProc->parentFreeBodyFp;

  uDebug("proc:%s, start to get msg from queue:%p, thread:%" PRId64, pProc->name, pQueue, pProc->thread);

  void         *pHead = NULL;
  void         *pBody = NULL;
  int16_t       headLen = 0;
  int32_t       bodyLen = 0;
  EProcFuncType ftype = 0;

  for (;;) {
    const int32_t ret = taosProcQueuePop(pQueue, &pHead, &headLen, &pBody, &bodyLen, &ftype, mallocHeadFp, freeHeadFp,
                                         mallocBodyFp, freeBodyFp);
    if (ret == 0) {
      uDebug("proc:%s, get no msg from queue:%p and exit the proc thread", pProc->name, pQueue);
      break;
    }

    if (ret < 0) {
      uError("proc:%s, get no msg from queue:%p since %s", pProc->name, pQueue, terrstr());
      taosMsleep(1);
      continue;
    }

    (*consumeFp)(pProc->parent, pHead, headLen, pBody, bodyLen, ftype);
  }
}

S
shm  
Shengliang Guan 已提交
411
/*
 * Start the consumer thread (joinable) that runs taosProcThreadLoop on pProc.
 *
 * Returns 0 on success; on failure sets terrno, logs and returns -1.
 *
 * NOTE(review): the thread attribute object initialized here is never
 * destroyed.
 */
int32_t taosProcRun(SProcObj *pProc) {
  TdThreadAttr attr;
  taosThreadAttrInit(&attr);
  taosThreadAttrSetDetachState(&attr, PTHREAD_CREATE_JOINABLE);

  const int32_t ret = taosThreadCreate(&pProc->thread, &attr, (ProcThreadFp)taosProcThreadLoop, pProc);
  if (ret == 0) {
    uDebug("proc:%s, start to consume, thread:%" PRId64, pProc->name, pProc->thread);
    return 0;
  }

  terrno = TAOS_SYSTEM_ERROR(errno);
  uError("failed to create thread since %s", terrstr());
  return -1;
}

S
Shengliang Guan 已提交
426
/*
 * Stop the consumer thread, if one is running.
 *
 * Posts the semaphore of the queue this side consumes without pushing a
 * message, so the pop in the consumer loop finds the queue empty and the
 * loop exits; then joins and clears the thread handle.
 */
void taosProcStop(SProcObj *pProc) {
  if (taosCheckPthreadValid(pProc->thread)) {
    uDebug("proc:%s, start to join thread:%" PRId64, pProc->name, pProc->thread);

    SProcQueue *pQueue = pProc->isChild ? pProc->pChildQueue : pProc->pParentQueue;
    tsem_post(&pQueue->sem);

    taosThreadJoin(pProc->thread, NULL);
    taosThreadClear(&pProc->thread);
  }
}
S
shm  
Shengliang Guan 已提交
440

S
shm  
Shengliang Guan 已提交
441 442
/*
 * Tear down a process object: stop the consumer thread, run the queue
 * cleanup hooks, release the handle hash and free the object itself.
 * A NULL pProc is ignored.
 */
void taosProcCleanup(SProcObj *pProc) {
  if (pProc == NULL) {
    return;
  }

  uDebug("proc:%s, start to clean up", pProc->name);
  taosProcStop(pProc);
  taosProcCleanupQueue(pProc->pChildQueue);
  taosProcCleanupQueue(pProc->pParentQueue);

  if (pProc->hash != NULL) {
    taosHashCleanup(pProc->hash);
    pProc->hash = NULL;
  }

  uDebug("proc:%s, is cleaned up", pProc->name);
  taosMemoryFree(pProc);
}

S
shm  
Shengliang Guan 已提交
457
/*
 * Push a request onto the child queue.
 *
 * Only PROC_FUNC_REQ is accepted; any other ftype fails with
 * TSDB_CODE_INVALID_PARA. `handle` is passed on as an integer key together
 * with handleRef.
 *
 * Returns the result of taosProcQueuePush (0 on success, -1 on failure).
 */
int32_t taosProcPutToChildQ(SProcObj *pProc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen,
                            void *handle, int64_t handleRef, EProcFuncType ftype) {
  if (ftype == PROC_FUNC_REQ) {
    const int64_t handleKey = (int64_t)handle;
    return taosProcQueuePush(pProc, pProc->pChildQueue, pHead, headLen, pBody, bodyLen, handleKey, handleRef, ftype);
  }

  terrno = TSDB_CODE_INVALID_PARA;
  return -1;
}

467
/*
 * Remove the entry recorded for `handle` from pProc->hash and return the
 * handleRef that was stored with it, or 0 when no entry exists.
 *
 * The stored value is copied out while the child-queue mutex is held and
 * before the entry is removed: the pointer returned by taosHashGet must not
 * be dereferenced once the entry is gone or the lock has been released.
 */
int64_t taosProcRemoveHandle(SProcObj *pProc, void *handle) {
  int64_t h = (int64_t)handle;
  int64_t ref = 0;

  taosThreadMutexLock(&pProc->pChildQueue->mutex);

  int64_t *pRef = taosHashGet(pProc->hash, &h, sizeof(int64_t));
  if (pRef != NULL) {
    ref = *pRef;
    taosHashRemove(pProc->hash, &h, sizeof(int64_t));
  }

  taosThreadMutexUnlock(&pProc->pChildQueue->mutex);

  return ref;
}

/*
 * Invoke HandleFp once per entry in pProc->hash, then empty the hash.
 * The child-queue mutex is held for the whole walk.
 *
 * NOTE(review): the callback argument is read from each entry's stored data
 * reinterpreted as `void *`; the push side stores an int64_t handleRef there
 * - confirm this is the value callers expect.
 */
void taosProcCloseHandles(SProcObj *pProc, void (*HandleFp)(void *handle)) {
  taosThreadMutexLock(&pProc->pChildQueue->mutex);

  for (void *pIter = taosHashIterate(pProc->hash, NULL); pIter != NULL;
       pIter = taosHashIterate(pProc->hash, pIter)) {
    void *handle = *((void **)pIter);
    (*HandleFp)(handle);
  }

  taosHashClear(pProc->hash);
  taosThreadMutexUnlock(&pProc->pChildQueue->mutex);
}
S
shm  
Shengliang Guan 已提交
490

S
shm  
Shengliang Guan 已提交
491
void taosProcPutToParentQ(SProcObj *pProc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen,
S
Shengliang Guan 已提交
492
                          EProcFuncType ftype) {
S
Shengliang Guan 已提交
493
  int32_t retry = 0;
494
  while (taosProcQueuePush(pProc, pProc->pParentQueue, pHead, headLen, pBody, bodyLen, 0, 0, ftype) != 0) {
S
Shengliang Guan 已提交
495
    uWarn("proc:%s, failed to put to queue:%p since %s, retry:%d", pProc->name, pProc->pParentQueue, terrstr(), retry);
S
Shengliang Guan 已提交
496 497
    retry++;
    taosMsleep(retry);
S
shm  
Shengliang Guan 已提交
498
  }
S
shm  
Shengliang Guan 已提交
499
}