streamTask.c 10.4 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "executor.h"
#include "tstream.h"
H
Haojun Liao 已提交
18
#include "wal.h"
L
Liu Jicong 已提交
19

20
/*
 * Append pTask to the task set pArray and record its position in that set.
 *
 * The task's info.selfChildId is set to the number of entries that were in
 * pArray before the append, i.e. the index the task occupies afterwards.
 *
 * Returns 0 on success, -1 if the array could not store the new element.
 */
static int32_t addToTaskset(SArray* pArray, SStreamTask* pTask) {
  int32_t childId = taosArrayGetSize(pArray);
  pTask->info.selfChildId = childId;

  // The array holds the task pointer itself, not a copy of the task.
  if (taosArrayPush(pArray, &pTask) == NULL) {
    return -1;
  }

  return 0;
}

/*
 * Allocate and initialise a new stream task and append it to pTaskList.
 *
 * streamId     - id of the stream the task belongs to
 * taskLevel    - stored in info.taskLevel
 * fillHistory  - stored in info.fillHistory
 * triggerParam - stored in triggerParam
 * pTaskList    - task set that receives the new task pointer; the task's
 *                info.selfChildId becomes its index in this list
 *
 * The task gets a freshly generated taskId and an id string of the form
 * "0x<streamId hex>-<taskId>". It starts with sched status INACTIVE, task
 * status SCAN_HISTORY and NORMAL input/output status.
 *
 * Returns the new task, or NULL with terrno set to TSDB_CODE_OUT_OF_MEMORY
 * when any allocation fails; nothing is leaked on failure.
 */
SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, int8_t fillHistory, int64_t triggerParam, SArray* pTaskList) {
  SStreamTask* pTask = (SStreamTask*)taosMemoryCalloc(1, sizeof(SStreamTask));
  if (pTask == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pTask->id.taskId = tGenIdPI32();
  pTask->id.streamId = streamId;
  pTask->info.taskLevel = taskLevel;
  pTask->info.fillHistory = fillHistory;
  pTask->triggerParam = triggerParam;

  // Bounded formatting: never write past the end of buf.
  char buf[128] = {0};
  snprintf(buf, sizeof(buf), "0x%" PRIx64 "-%d", pTask->id.streamId, pTask->id.taskId);

  pTask->id.idStr = taosStrdup(buf);
  if (pTask->id.idStr == NULL) {
    taosMemoryFree(pTask);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE;
  pTask->status.taskStatus = TASK_STATUS__SCAN_HISTORY;
  pTask->inputStatus = TASK_INPUT_STATUS__NORMAL;
  pTask->outputInfo.status = TASK_OUTPUT_STATUS__NORMAL;

  // The task is only handed to the caller once it is registered in the set.
  if (addToTaskset(pTaskList, pTask) != 0) {
    taosMemoryFree((void*)pTask->id.idStr);
    taosMemoryFree(pTask);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  return pTask;
}

L
Liu Jicong 已提交
53 54 55 56
/*
 * Serialise one upstream/child endpoint descriptor.
 *
 * Field order on the wire: taskId (i32), nodeId (i32), childId (i32), epSet.
 * processedVer is not part of the encoded form; tDecodeStreamEpInfo must read
 * the fields in exactly this order.
 *
 * Returns 0 on success, -1 as soon as any field fails to encode.
 */
int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo) {
  if (tEncodeI32(pEncoder, pInfo->taskId) < 0) return -1;
  if (tEncodeI32(pEncoder, pInfo->nodeId) < 0) return -1;
  if (tEncodeI32(pEncoder, pInfo->childId) < 0) return -1;
  if (tEncodeSEpSet(pEncoder, &pInfo->epSet) < 0) return -1;
  return 0;
}

/*
 * Deserialise one upstream/child endpoint descriptor into pInfo.
 *
 * Reads the fields in the order written by tEncodeStreamEpInfo:
 * taskId (i32), nodeId (i32), childId (i32), epSet.
 * processedVer is not part of the encoded form and is left untouched.
 *
 * Returns 0 on success, -1 as soon as any field fails to decode.
 */
int32_t tDecodeStreamEpInfo(SDecoder* pDecoder, SStreamChildEpInfo* pInfo) {
  if (tDecodeI32(pDecoder, &pInfo->taskId) < 0) return -1;
  if (tDecodeI32(pDecoder, &pInfo->nodeId) < 0) return -1;
  if (tDecodeI32(pDecoder, &pInfo->childId) < 0) return -1;
  if (tDecodeSEpSet(pDecoder, &pInfo->epSet) < 0) return -1;
  return 0;
}

71
/*
 * Serialise a stream task.
 *
 * The field order below is the wire format and must stay in lock-step with
 * tDecodeStreamTask. Two parts of the payload are conditional:
 *   - exec.qmsg is written for every task level except TASK_LEVEL__SINK;
 *   - the sink/dispatcher section depends on outputInfo.type.
 *
 * Returns pEncoder->pos after the closing tEndEncode on success, or -1 as
 * soon as any field fails to encode.
 */
int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
  if (tStartEncode(pEncoder) < 0) return -1;

  // identity and topology
  if (tEncodeI64(pEncoder, pTask->id.streamId) < 0) return -1;
  if (tEncodeI32(pEncoder, pTask->id.taskId) < 0) return -1;
  if (tEncodeI32(pEncoder, pTask->info.totalLevel) < 0) return -1;
  if (tEncodeI8(pEncoder, pTask->info.taskLevel) < 0) return -1;
  if (tEncodeI8(pEncoder, pTask->outputInfo.type) < 0) return -1;
  if (tEncodeI16(pEncoder, pTask->msgInfo.msgType) < 0) return -1;

  // status
  if (tEncodeI8(pEncoder, pTask->status.taskStatus) < 0) return -1;
  if (tEncodeI8(pEncoder, pTask->status.schedStatus) < 0) return -1;

  // placement
  if (tEncodeI32(pEncoder, pTask->info.selfChildId) < 0) return -1;
  if (tEncodeI32(pEncoder, pTask->info.nodeId) < 0) return -1;
  if (tEncodeSEpSet(pEncoder, &pTask->info.epSet) < 0) return -1;

  // checkpoint info and fill-history flag
  if (tEncodeI64(pEncoder, pTask->chkInfo.id) < 0) return -1;
  if (tEncodeI64(pEncoder, pTask->chkInfo.version) < 0) return -1;
  if (tEncodeI8(pEncoder, pTask->info.fillHistory) < 0) return -1;

  // ids of the paired history task and stream task
  if (tEncodeI64(pEncoder, pTask->historyTaskId.streamId) < 0) return -1;
  if (tEncodeI32(pEncoder, pTask->historyTaskId.taskId) < 0) return -1;
  if (tEncodeI64(pEncoder, pTask->streamTaskId.streamId) < 0) return -1;
  if (tEncodeI32(pEncoder, pTask->streamTaskId.taskId) < 0) return -1;

  // data range: version range followed by time window
  if (tEncodeU64(pEncoder, pTask->dataRange.range.minVer) < 0) return -1;
  if (tEncodeU64(pEncoder, pTask->dataRange.range.maxVer) < 0) return -1;
  if (tEncodeI64(pEncoder, pTask->dataRange.window.skey) < 0) return -1;
  if (tEncodeI64(pEncoder, pTask->dataRange.window.ekey) < 0) return -1;

  // upstream endpoint list: element count, then each element
  int32_t epSz = taosArrayGetSize(pTask->pUpstreamEpInfoList);
  if (tEncodeI32(pEncoder, epSz) < 0) return -1;
  for (int32_t i = 0; i < epSz; i++) {
    SStreamChildEpInfo* pInfo = taosArrayGetP(pTask->pUpstreamEpInfoList, i);
    if (tEncodeStreamEpInfo(pEncoder, pInfo) < 0) return -1;
  }

  // sink tasks carry no query plan
  if (pTask->info.taskLevel != TASK_LEVEL__SINK) {
    if (tEncodeCStr(pEncoder, pTask->exec.qmsg) < 0) return -1;
  }

  // output-type specific section
  if (pTask->outputInfo.type == TASK_OUTPUT__TABLE) {
    if (tEncodeI64(pEncoder, pTask->tbSink.stbUid) < 0) return -1;
    if (tEncodeCStr(pEncoder, pTask->tbSink.stbFullName) < 0) return -1;
    if (tEncodeSSchemaWrapper(pEncoder, pTask->tbSink.pSchemaWrapper) < 0) return -1;
  } else if (pTask->outputInfo.type == TASK_OUTPUT__SMA) {
    if (tEncodeI64(pEncoder, pTask->smaSink.smaId) < 0) return -1;
  } else if (pTask->outputInfo.type == TASK_OUTPUT__FETCH) {
    if (tEncodeI8(pEncoder, pTask->fetchSink.reserved) < 0) return -1;
  } else if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
    if (tEncodeI32(pEncoder, pTask->fixedEpDispatcher.taskId) < 0) return -1;
    if (tEncodeI32(pEncoder, pTask->fixedEpDispatcher.nodeId) < 0) return -1;
    if (tEncodeSEpSet(pEncoder, &pTask->fixedEpDispatcher.epSet) < 0) return -1;
  } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
    if (tSerializeSUseDbRspImp(pEncoder, &pTask->shuffleDispatcher.dbInfo) < 0) return -1;
    if (tEncodeCStr(pEncoder, pTask->shuffleDispatcher.stbFullName) < 0) return -1;
  }

  if (tEncodeI64(pEncoder, pTask->triggerParam) < 0) return -1;

  tEndEncode(pEncoder);
  return pEncoder->pos;
}

134
/*
 * Deserialise a stream task into pTask.
 *
 * Reads the fields in exactly the order written by tEncodeStreamTask. Two
 * parts of the payload are conditional:
 *   - exec.qmsg is present for every task level except TASK_LEVEL__SINK;
 *   - the sink/dispatcher section depends on outputInfo.type.
 *
 * Heap objects created here (pUpstreamEpInfoList and its elements, exec.qmsg,
 * tbSink.pSchemaWrapper) are attached to pTask. On failure, whatever is
 * already attached stays attached; only objects not yet stored in pTask are
 * released locally.
 *
 * Returns 0 on success, -1 on any decode or allocation failure.
 */
int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
  if (tStartDecode(pDecoder) < 0) return -1;

  // identity and topology
  if (tDecodeI64(pDecoder, &pTask->id.streamId) < 0) return -1;
  if (tDecodeI32(pDecoder, &pTask->id.taskId) < 0) return -1;
  if (tDecodeI32(pDecoder, &pTask->info.totalLevel) < 0) return -1;
  if (tDecodeI8(pDecoder, &pTask->info.taskLevel) < 0) return -1;
  if (tDecodeI8(pDecoder, &pTask->outputInfo.type) < 0) return -1;
  if (tDecodeI16(pDecoder, &pTask->msgInfo.msgType) < 0) return -1;

  // status
  if (tDecodeI8(pDecoder, &pTask->status.taskStatus) < 0) return -1;
  if (tDecodeI8(pDecoder, &pTask->status.schedStatus) < 0) return -1;

  // placement
  if (tDecodeI32(pDecoder, &pTask->info.selfChildId) < 0) return -1;
  if (tDecodeI32(pDecoder, &pTask->info.nodeId) < 0) return -1;
  if (tDecodeSEpSet(pDecoder, &pTask->info.epSet) < 0) return -1;

  // checkpoint info and fill-history flag
  if (tDecodeI64(pDecoder, &pTask->chkInfo.id) < 0) return -1;
  if (tDecodeI64(pDecoder, &pTask->chkInfo.version) < 0) return -1;
  if (tDecodeI8(pDecoder, &pTask->info.fillHistory) < 0) return -1;

  // ids of the paired history task and stream task
  if (tDecodeI64(pDecoder, &pTask->historyTaskId.streamId) < 0) return -1;
  if (tDecodeI32(pDecoder, &pTask->historyTaskId.taskId) < 0) return -1;
  if (tDecodeI64(pDecoder, &pTask->streamTaskId.streamId) < 0) return -1;
  if (tDecodeI32(pDecoder, &pTask->streamTaskId.taskId) < 0) return -1;

  // data range: version range followed by time window
  if (tDecodeU64(pDecoder, &pTask->dataRange.range.minVer) < 0) return -1;
  if (tDecodeU64(pDecoder, &pTask->dataRange.range.maxVer) < 0) return -1;
  if (tDecodeI64(pDecoder, &pTask->dataRange.window.skey) < 0) return -1;
  if (tDecodeI64(pDecoder, &pTask->dataRange.window.ekey) < 0) return -1;

  // upstream endpoint list: element count, then each element
  int32_t epSz = 0;
  if (tDecodeI32(pDecoder, &epSz) < 0) return -1;
  if (epSz < 0) return -1;  // a negative count can only come from a corrupt payload

  pTask->pUpstreamEpInfoList = taosArrayInit(epSz, POINTER_BYTES);
  if (pTask->pUpstreamEpInfoList == NULL) return -1;

  for (int32_t i = 0; i < epSz; i++) {
    SStreamChildEpInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamChildEpInfo));
    if (pInfo == NULL) return -1;

    if (tDecodeStreamEpInfo(pDecoder, pInfo) < 0) {
      taosMemoryFreeClear(pInfo);
      return -1;
    }

    // Until the push succeeds pInfo is owned here, so release it on failure.
    if (taosArrayPush(pTask->pUpstreamEpInfoList, &pInfo) == NULL) {
      taosMemoryFreeClear(pInfo);
      return -1;
    }
  }

  // sink tasks carry no query plan
  if (pTask->info.taskLevel != TASK_LEVEL__SINK) {
    if (tDecodeCStrAlloc(pDecoder, &pTask->exec.qmsg) < 0) return -1;
  }

  // output-type specific section
  if (pTask->outputInfo.type == TASK_OUTPUT__TABLE) {
    if (tDecodeI64(pDecoder, &pTask->tbSink.stbUid) < 0) return -1;
    if (tDecodeCStrTo(pDecoder, pTask->tbSink.stbFullName) < 0) return -1;
    pTask->tbSink.pSchemaWrapper = taosMemoryCalloc(1, sizeof(SSchemaWrapper));
    if (pTask->tbSink.pSchemaWrapper == NULL) return -1;
    if (tDecodeSSchemaWrapper(pDecoder, pTask->tbSink.pSchemaWrapper) < 0) return -1;
  } else if (pTask->outputInfo.type == TASK_OUTPUT__SMA) {
    if (tDecodeI64(pDecoder, &pTask->smaSink.smaId) < 0) return -1;
  } else if (pTask->outputInfo.type == TASK_OUTPUT__FETCH) {
    if (tDecodeI8(pDecoder, &pTask->fetchSink.reserved) < 0) return -1;
  } else if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
    if (tDecodeI32(pDecoder, &pTask->fixedEpDispatcher.taskId) < 0) return -1;
    if (tDecodeI32(pDecoder, &pTask->fixedEpDispatcher.nodeId) < 0) return -1;
    if (tDecodeSEpSet(pDecoder, &pTask->fixedEpDispatcher.epSet) < 0) return -1;
  } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
    if (tDeserializeSUseDbRspImp(pDecoder, &pTask->shuffleDispatcher.dbInfo) < 0) return -1;
    if (tDecodeCStrTo(pDecoder, pTask->shuffleDispatcher.stbFullName) < 0) return -1;
  }

  if (tDecodeI64(pDecoder, &pTask->triggerParam) < 0) return -1;

  tEndDecode(pDecoder);
  return 0;
}

206 207
/*
 * Release a stream task and everything it owns, then free the task itself.
 *
 * Teardown order: input queue, output queue, query message, executor, WAL
 * reader, upstream endpoint list, output-type specific resources, state
 * store, id string, name map, and finally the task structure.
 *
 * Passing NULL is a no-op. pTask must not be used after this call.
 */
void tFreeStreamTask(SStreamTask* pTask) {
  if (pTask == NULL) {
    return;
  }

  qDebug("free s-task:%s", pTask->id.idStr);

  // Snapshot the status up front; it decides how the state store is closed.
  int32_t status = atomic_load_8((int8_t*)&(pTask->status.taskStatus));

  if (pTask->inputQueue) {
    streamQueueClose(pTask->inputQueue);
  }

  if (pTask->outputInfo.queue) {
    streamQueueClose(pTask->outputInfo.queue);
  }

  // taosMemoryFree is called on possibly-NULL pointers elsewhere in this
  // function, so no guard is needed here.
  taosMemoryFree(pTask->exec.qmsg);

  if (pTask->exec.pExecutor) {
    qDestroyTask(pTask->exec.pExecutor);
    pTask->exec.pExecutor = NULL;
  }

  if (pTask->exec.pWalReader != NULL) {
    walCloseReader(pTask->exec.pWalReader);
  }

  // Frees every SStreamChildEpInfo element and then the array itself.
  taosArrayDestroyP(pTask->pUpstreamEpInfoList, taosMemoryFree);

  // Only the resources belonging to the task's own output type are released.
  if (pTask->outputInfo.type == TASK_OUTPUT__TABLE) {
    tDeleteSchemaWrapper(pTask->tbSink.pSchemaWrapper);
    taosMemoryFree(pTask->tbSink.pTSchema);
    tSimpleHashCleanup(pTask->tbSink.pTblInfo);
  } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
    taosArrayDestroy(pTask->shuffleDispatcher.dbInfo.pVgroupInfos);
    taosArrayDestroy(pTask->checkReqIds);
    pTask->checkReqIds = NULL;
  }

  if (pTask->pState) {
    // Second argument is true only when the task is in DROPPING status.
    streamStateClose(pTask->pState, status == TASK_STATUS__DROPPING);
  }

  taosMemoryFree((void*)pTask->id.idStr);

  if (pTask->pNameMap) {
    tSimpleHashCleanup(pTask->pNameMap);
  }

  taosMemoryFree(pTask);
}