streamTask.c 11.8 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "executor.h"
#include "tstream.h"
H
Haojun Liao 已提交
18
#include "wal.h"
L
Liu Jicong 已提交
19

20 21 22 23 24 25 26 27
static int32_t mndAddToTaskset(SArray* pArray, SStreamTask* pTask) {
  int32_t childId = taosArrayGetSize(pArray);
  pTask->selfChildId = childId;
  taosArrayPush(pArray, &pTask);
  return 0;
}

SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, int8_t fillHistory, int64_t triggerParam, SArray* pTaskList) {
L
Liu Jicong 已提交
28 29
  SStreamTask* pTask = (SStreamTask*)taosMemoryCalloc(1, sizeof(SStreamTask));
  if (pTask == NULL) {
30
    terrno = TSDB_CODE_OUT_OF_MEMORY;
L
Liu Jicong 已提交
31 32
    return NULL;
  }
33 34 35

  pTask->id.taskId = tGenIdPI32();
  pTask->id.streamId = streamId;
36 37
  pTask->taskLevel = taskLevel;
  pTask->fillHistory = fillHistory;
38
  pTask->triggerParam = triggerParam;
39 40

  char buf[128] = {0};
dengyihao's avatar
dengyihao 已提交
41
  sprintf(buf, "0x%" PRIx64 "-%d", pTask->id.streamId, pTask->id.taskId);
42 43

  pTask->id.idStr = taosStrdup(buf);
44
  pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE;
L
Liu Jicong 已提交
45 46 47
  pTask->inputStatus = TASK_INPUT_STATUS__NORMAL;
  pTask->outputStatus = TASK_OUTPUT_STATUS__NORMAL;

48
  mndAddToTaskset(pTaskList, pTask);
L
Liu Jicong 已提交
49 50 51
  return pTask;
}

52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151
SStreamTask* streamTaskClone(SStreamTask* pTask) {
  SStreamTask* pDst = taosMemoryCalloc(1, sizeof(SStreamTask));
 /* pDst->

  SStreamId       id;
  int32_t         totalLevel;
  int8_t          taskLevel;
  int8_t          outputType;
  int16_t         dispatchMsgType;
  SStreamStatus   status;
  int32_t         selfChildId;
  int32_t         nodeId;      // vgroup id
  SEpSet          epSet;
  SCheckpointInfo chkInfo;
  STaskExec       exec;
  int8_t          fillHistory;  // fill history
  int64_t         ekey;         // end ts key
  int64_t         endVer;       // end version

  // children info
  SArray* childEpInfo;  // SArray<SStreamChildEpInfo*>
  int32_t nextCheckId;
  SArray* checkpointInfo;  // SArray<SStreamCheckpointInfo>

  // output
  union {
    STaskDispatcherFixedEp fixedEpDispatcher;
    STaskDispatcherShuffle shuffleDispatcher;
    STaskSinkTb            tbSink;
    STaskSinkSma           smaSink;
    STaskSinkFetch         fetchSink;
  };

  int8_t        inputStatus;
  int8_t        outputStatus;
  SStreamQueue* inputQueue;
  SStreamQueue* outputQueue;

  // trigger
  int8_t        triggerStatus;
  int64_t       triggerParam;
  void*         timer;
  SMsgCb*       pMsgCb;  // msg handle
  SStreamState* pState;  // state backend

  // the followings attributes don't be serialized
  int32_t             recoverTryingDownstream;
  int32_t             recoverWaitingUpstream;
  int64_t             checkReqId;
  SArray*             checkReqIds;  // shuffle
  int32_t             refCnt;
  int64_t             checkpointingId;
  int32_t             checkpointAlignCnt;
  struct SStreamMeta* pMeta;

  int32_t status = atomic_load_8((int8_t*)&(pTask->status.taskStatus));
  if (pTask->inputQueue) {
    streamQueueClose(pTask->inputQueue);
  }
  if (pTask->outputQueue) {
    streamQueueClose(pTask->outputQueue);
  }
  if (pTask->exec.qmsg) {
    taosMemoryFree(pTask->exec.qmsg);
  }

  if (pTask->exec.pExecutor) {
    qDestroyTask(pTask->exec.pExecutor);
    pTask->exec.pExecutor = NULL;
  }

  if (pTask->exec.pWalReader != NULL) {
    walCloseReader(pTask->exec.pWalReader);
  }

  taosArrayDestroyP(pTask->childEpInfo, taosMemoryFree);
  if (pTask->outputType == TASK_OUTPUT__TABLE) {
    tDeleteSchemaWrapper(pTask->tbSink.pSchemaWrapper);
    taosMemoryFree(pTask->tbSink.pTSchema);
    tSimpleHashCleanup(pTask->tbSink.pTblInfo);
  }

  if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
    taosArrayDestroy(pTask->shuffleDispatcher.dbInfo.pVgroupInfos);
    taosArrayDestroy(pTask->checkReqIds);
    pTask->checkReqIds = NULL;
  }

  if (pTask->pState) {
    streamStateClose(pTask->pState, status == TASK_STATUS__DROPPING);
  }

  if (pTask->id.idStr != NULL) {
    taosMemoryFree((void*)pTask->id.idStr);
  }

  taosMemoryFree(pTask);*/
  return NULL;
}

L
Liu Jicong 已提交
152 153 154 155
int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo) {
  if (tEncodeI32(pEncoder, pInfo->taskId) < 0) return -1;
  if (tEncodeI32(pEncoder, pInfo->nodeId) < 0) return -1;
  if (tEncodeI32(pEncoder, pInfo->childId) < 0) return -1;
L
Liu Jicong 已提交
156
  /*if (tEncodeI64(pEncoder, pInfo->processedVer) < 0) return -1;*/
L
Liu Jicong 已提交
157 158 159 160 161 162 163 164
  if (tEncodeSEpSet(pEncoder, &pInfo->epSet) < 0) return -1;
  return 0;
}

int32_t tDecodeStreamEpInfo(SDecoder* pDecoder, SStreamChildEpInfo* pInfo) {
  if (tDecodeI32(pDecoder, &pInfo->taskId) < 0) return -1;
  if (tDecodeI32(pDecoder, &pInfo->nodeId) < 0) return -1;
  if (tDecodeI32(pDecoder, &pInfo->childId) < 0) return -1;
L
Liu Jicong 已提交
165
  /*if (tDecodeI64(pDecoder, &pInfo->processedVer) < 0) return -1;*/
L
Liu Jicong 已提交
166 167 168 169
  if (tDecodeSEpSet(pDecoder, &pInfo->epSet) < 0) return -1;
  return 0;
}

170
int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
L
Liu Jicong 已提交
171
  if (tStartEncode(pEncoder) < 0) return -1;
172 173
  if (tEncodeI64(pEncoder, pTask->id.streamId) < 0) return -1;
  if (tEncodeI32(pEncoder, pTask->id.taskId) < 0) return -1;
L
Liu Jicong 已提交
174
  if (tEncodeI32(pEncoder, pTask->totalLevel) < 0) return -1;
175 176
  if (tEncodeI8(pEncoder, pTask->taskLevel) < 0) return -1;
  if (tEncodeI8(pEncoder, pTask->outputType) < 0) return -1;
L
Liu Jicong 已提交
177
  if (tEncodeI16(pEncoder, pTask->dispatchMsgType) < 0) return -1;
178

179 180
  if (tEncodeI8(pEncoder, pTask->status.taskStatus) < 0) return -1;
  if (tEncodeI8(pEncoder, pTask->status.schedStatus) < 0) return -1;
L
Liu Jicong 已提交
181

L
Liu Jicong 已提交
182
  if (tEncodeI32(pEncoder, pTask->selfChildId) < 0) return -1;
L
Liu Jicong 已提交
183 184 185
  if (tEncodeI32(pEncoder, pTask->nodeId) < 0) return -1;
  if (tEncodeSEpSet(pEncoder, &pTask->epSet) < 0) return -1;

186 187
  if (tEncodeI64(pEncoder, pTask->chkInfo.id) < 0) return -1;
  if (tEncodeI64(pEncoder, pTask->chkInfo.version) < 0) return -1;
L
Liu Jicong 已提交
188 189
  if (tEncodeI8(pEncoder, pTask->fillHistory) < 0) return -1;

L
Liu Jicong 已提交
190 191 192 193 194 195 196
  int32_t epSz = taosArrayGetSize(pTask->childEpInfo);
  if (tEncodeI32(pEncoder, epSz) < 0) return -1;
  for (int32_t i = 0; i < epSz; i++) {
    SStreamChildEpInfo* pInfo = taosArrayGetP(pTask->childEpInfo, i);
    if (tEncodeStreamEpInfo(pEncoder, pInfo) < 0) return -1;
  }

197
  if (pTask->taskLevel != TASK_LEVEL__SINK) {
L
Liu Jicong 已提交
198 199 200
    if (tEncodeCStr(pEncoder, pTask->exec.qmsg) < 0) return -1;
  }

201
  if (pTask->outputType == TASK_OUTPUT__TABLE) {
L
Liu Jicong 已提交
202 203 204
    if (tEncodeI64(pEncoder, pTask->tbSink.stbUid) < 0) return -1;
    if (tEncodeCStr(pEncoder, pTask->tbSink.stbFullName) < 0) return -1;
    if (tEncodeSSchemaWrapper(pEncoder, pTask->tbSink.pSchemaWrapper) < 0) return -1;
205
  } else if (pTask->outputType == TASK_OUTPUT__SMA) {
L
Liu Jicong 已提交
206
    if (tEncodeI64(pEncoder, pTask->smaSink.smaId) < 0) return -1;
207
  } else if (pTask->outputType == TASK_OUTPUT__FETCH) {
L
Liu Jicong 已提交
208
    if (tEncodeI8(pEncoder, pTask->fetchSink.reserved) < 0) return -1;
209
  } else if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) {
L
Liu Jicong 已提交
210 211 212
    if (tEncodeI32(pEncoder, pTask->fixedEpDispatcher.taskId) < 0) return -1;
    if (tEncodeI32(pEncoder, pTask->fixedEpDispatcher.nodeId) < 0) return -1;
    if (tEncodeSEpSet(pEncoder, &pTask->fixedEpDispatcher.epSet) < 0) return -1;
213
  } else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
L
Liu Jicong 已提交
214
    if (tSerializeSUseDbRspImp(pEncoder, &pTask->shuffleDispatcher.dbInfo) < 0) return -1;
L
Liu Jicong 已提交
215
    if (tEncodeCStr(pEncoder, pTask->shuffleDispatcher.stbFullName) < 0) return -1;
L
Liu Jicong 已提交
216
  }
217
  if (tEncodeI64(pEncoder, pTask->triggerParam) < 0) return -1;
L
Liu Jicong 已提交
218

L
Liu Jicong 已提交
219
  tEndEncode(pEncoder);
L
Liu Jicong 已提交
220 221 222
  return pEncoder->pos;
}

223
int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
L
Liu Jicong 已提交
224
  if (tStartDecode(pDecoder) < 0) return -1;
225 226
  if (tDecodeI64(pDecoder, &pTask->id.streamId) < 0) return -1;
  if (tDecodeI32(pDecoder, &pTask->id.taskId) < 0) return -1;
L
Liu Jicong 已提交
227
  if (tDecodeI32(pDecoder, &pTask->totalLevel) < 0) return -1;
228 229
  if (tDecodeI8(pDecoder, &pTask->taskLevel) < 0) return -1;
  if (tDecodeI8(pDecoder, &pTask->outputType) < 0) return -1;
L
Liu Jicong 已提交
230
  if (tDecodeI16(pDecoder, &pTask->dispatchMsgType) < 0) return -1;
231

232 233
  if (tDecodeI8(pDecoder, &pTask->status.taskStatus) < 0) return -1;
  if (tDecodeI8(pDecoder, &pTask->status.schedStatus) < 0) return -1;
L
Liu Jicong 已提交
234

L
Liu Jicong 已提交
235
  if (tDecodeI32(pDecoder, &pTask->selfChildId) < 0) return -1;
L
Liu Jicong 已提交
236 237 238
  if (tDecodeI32(pDecoder, &pTask->nodeId) < 0) return -1;
  if (tDecodeSEpSet(pDecoder, &pTask->epSet) < 0) return -1;

239 240
  if (tDecodeI64(pDecoder, &pTask->chkInfo.id) < 0) return -1;
  if (tDecodeI64(pDecoder, &pTask->chkInfo.version) < 0) return -1;
L
Liu Jicong 已提交
241 242
  if (tDecodeI8(pDecoder, &pTask->fillHistory) < 0) return -1;

L
Liu Jicong 已提交
243 244 245 246 247 248
  int32_t epSz;
  if (tDecodeI32(pDecoder, &epSz) < 0) return -1;
  pTask->childEpInfo = taosArrayInit(epSz, sizeof(void*));
  for (int32_t i = 0; i < epSz; i++) {
    SStreamChildEpInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamChildEpInfo));
    if (pInfo == NULL) return -1;
5
54liuyao 已提交
249 250 251 252
    if (tDecodeStreamEpInfo(pDecoder, pInfo) < 0) {
      taosMemoryFreeClear(pInfo);
      return -1;
    }
L
Liu Jicong 已提交
253 254 255
    taosArrayPush(pTask->childEpInfo, &pInfo);
  }

256
  if (pTask->taskLevel != TASK_LEVEL__SINK) {
L
Liu Jicong 已提交
257 258 259
    if (tDecodeCStrAlloc(pDecoder, &pTask->exec.qmsg) < 0) return -1;
  }

260
  if (pTask->outputType == TASK_OUTPUT__TABLE) {
L
Liu Jicong 已提交
261 262 263 264 265
    if (tDecodeI64(pDecoder, &pTask->tbSink.stbUid) < 0) return -1;
    if (tDecodeCStrTo(pDecoder, pTask->tbSink.stbFullName) < 0) return -1;
    pTask->tbSink.pSchemaWrapper = taosMemoryCalloc(1, sizeof(SSchemaWrapper));
    if (pTask->tbSink.pSchemaWrapper == NULL) return -1;
    if (tDecodeSSchemaWrapper(pDecoder, pTask->tbSink.pSchemaWrapper) < 0) return -1;
266
  } else if (pTask->outputType == TASK_OUTPUT__SMA) {
L
Liu Jicong 已提交
267
    if (tDecodeI64(pDecoder, &pTask->smaSink.smaId) < 0) return -1;
268
  } else if (pTask->outputType == TASK_OUTPUT__FETCH) {
L
Liu Jicong 已提交
269
    if (tDecodeI8(pDecoder, &pTask->fetchSink.reserved) < 0) return -1;
270
  } else if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) {
L
Liu Jicong 已提交
271 272 273
    if (tDecodeI32(pDecoder, &pTask->fixedEpDispatcher.taskId) < 0) return -1;
    if (tDecodeI32(pDecoder, &pTask->fixedEpDispatcher.nodeId) < 0) return -1;
    if (tDecodeSEpSet(pDecoder, &pTask->fixedEpDispatcher.epSet) < 0) return -1;
274
  } else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
L
Liu Jicong 已提交
275
    if (tDeserializeSUseDbRspImp(pDecoder, &pTask->shuffleDispatcher.dbInfo) < 0) return -1;
L
Liu Jicong 已提交
276
    if (tDecodeCStrTo(pDecoder, pTask->shuffleDispatcher.stbFullName) < 0) return -1;
L
Liu Jicong 已提交
277
  }
278
  if (tDecodeI64(pDecoder, &pTask->triggerParam) < 0) return -1;
L
Liu Jicong 已提交
279

L
Liu Jicong 已提交
280
  tEndDecode(pDecoder);
L
Liu Jicong 已提交
281 282 283
  return 0;
}

284 285
void tFreeStreamTask(SStreamTask* pTask) {
  qDebug("free s-task:%s", pTask->id.idStr);
dengyihao's avatar
dengyihao 已提交
286
  int32_t status = atomic_load_8((int8_t*)&(pTask->status.taskStatus));
287 288 289 290 291 292 293 294 295 296 297 298 299 300 301
  if (pTask->inputQueue) {
    streamQueueClose(pTask->inputQueue);
  }
  if (pTask->outputQueue) {
    streamQueueClose(pTask->outputQueue);
  }
  if (pTask->exec.qmsg) {
    taosMemoryFree(pTask->exec.qmsg);
  }

  if (pTask->exec.pExecutor) {
    qDestroyTask(pTask->exec.pExecutor);
    pTask->exec.pExecutor = NULL;
  }

H
Haojun Liao 已提交
302 303 304 305
  if (pTask->exec.pWalReader != NULL) {
    walCloseReader(pTask->exec.pWalReader);
  }

L
Liu Jicong 已提交
306
  taosArrayDestroyP(pTask->childEpInfo, taosMemoryFree);
L
Liu Jicong 已提交
307
  if (pTask->outputType == TASK_OUTPUT__TABLE) {
308
    tDeleteSchemaWrapper(pTask->tbSink.pSchemaWrapper);
L
Liu Jicong 已提交
309
    taosMemoryFree(pTask->tbSink.pTSchema);
L
liuyao 已提交
310
    tSimpleHashCleanup(pTask->tbSink.pTblInfo);
L
Liu Jicong 已提交
311
  }
312

L
Liu Jicong 已提交
313 314
  if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
    taosArrayDestroy(pTask->shuffleDispatcher.dbInfo.pVgroupInfos);
L
Liu Jicong 已提交
315 316
    taosArrayDestroy(pTask->checkReqIds);
    pTask->checkReqIds = NULL;
L
Liu Jicong 已提交
317
  }
318

319
  if (pTask->pState) {
dengyihao's avatar
dengyihao 已提交
320
    streamStateClose(pTask->pState, status == TASK_STATUS__DROPPING);
321
  }
322

dengyihao's avatar
dengyihao 已提交
323
  if (pTask->id.idStr != NULL) {
324 325 326
    taosMemoryFree((void*)pTask->id.idStr);
  }

L
Liu Jicong 已提交
327 328
  taosMemoryFree(pTask);
}