streamTask.c 8.7 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "executor.h"
#include "tstream.h"
H
Haojun Liao 已提交
18
#include "wal.h"
L
Liu Jicong 已提交
19

20
SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, int8_t fillHistory) {
L
Liu Jicong 已提交
21 22
  SStreamTask* pTask = (SStreamTask*)taosMemoryCalloc(1, sizeof(SStreamTask));
  if (pTask == NULL) {
23
    terrno = TSDB_CODE_OUT_OF_MEMORY;
L
Liu Jicong 已提交
24 25
    return NULL;
  }
26 27 28

  pTask->id.taskId = tGenIdPI32();
  pTask->id.streamId = streamId;
29 30
  pTask->taskLevel = taskLevel;
  pTask->fillHistory = fillHistory;
31 32

  char buf[128] = {0};
dengyihao's avatar
dengyihao 已提交
33
  sprintf(buf, "0x%" PRIx64 "-%d", pTask->id.streamId, pTask->id.taskId);
34 35

  pTask->id.idStr = taosStrdup(buf);
36
  pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE;
L
Liu Jicong 已提交
37 38 39 40 41 42
  pTask->inputStatus = TASK_INPUT_STATUS__NORMAL;
  pTask->outputStatus = TASK_OUTPUT_STATUS__NORMAL;

  return pTask;
}

L
Liu Jicong 已提交
43 44 45 46
/*
 * Serialize one child endpoint descriptor.
 *
 * Fields are written in this order: taskId, nodeId, childId, epSet.
 * pInfo->processedVer is not written (its encode call is disabled).
 *
 * Returns 0 on success, -1 as soon as any field fails to encode.
 */
int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo) {
  // Short-circuit evaluation keeps the field order and stops at the first failure.
  if (tEncodeI32(pEncoder, pInfo->taskId) < 0 ||
      tEncodeI32(pEncoder, pInfo->nodeId) < 0 ||
      tEncodeI32(pEncoder, pInfo->childId) < 0 ||
      tEncodeSEpSet(pEncoder, &pInfo->epSet) < 0) {
    return -1;
  }

  return 0;
}

/*
 * Deserialize one child endpoint descriptor into pInfo.
 *
 * Fields are read in this order: taskId, nodeId, childId, epSet.
 * pInfo->processedVer is not read (its decode call is disabled).
 *
 * Returns 0 on success, -1 as soon as any field fails to decode.
 */
int32_t tDecodeStreamEpInfo(SDecoder* pDecoder, SStreamChildEpInfo* pInfo) {
  // Short-circuit evaluation keeps the field order and stops at the first failure.
  if (tDecodeI32(pDecoder, &pInfo->taskId) < 0 ||
      tDecodeI32(pDecoder, &pInfo->nodeId) < 0 ||
      tDecodeI32(pDecoder, &pInfo->childId) < 0 ||
      tDecodeSEpSet(pDecoder, &pInfo->epSet) < 0) {
    return -1;
  }

  return 0;
}

61
/*
 * Serialize a stream task.
 *
 * The field order written here must stay in step with tDecodeStreamTask().
 * The executor plan string is written only for tasks that are not at
 * TASK_LEVEL__SINK, and the output-specific section depends on
 * pTask->outputType.
 *
 * Returns pEncoder->pos after tEndEncode() on success, -1 as soon as any
 * field fails to encode.
 */
int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
  if (tStartEncode(pEncoder) < 0) return -1;

  // Ids, level information, output type and dispatch message type.
  if (tEncodeI64(pEncoder, pTask->id.streamId) < 0 ||
      tEncodeI32(pEncoder, pTask->id.taskId) < 0 ||
      tEncodeI32(pEncoder, pTask->totalLevel) < 0 ||
      tEncodeI8(pEncoder, pTask->taskLevel) < 0 ||
      tEncodeI8(pEncoder, pTask->outputType) < 0 ||
      tEncodeI16(pEncoder, pTask->dispatchMsgType) < 0) {
    return -1;
  }

  // Task status and scheduling status.
  if (tEncodeI8(pEncoder, pTask->status.taskStatus) < 0 ||
      tEncodeI8(pEncoder, pTask->status.schedStatus) < 0) {
    return -1;
  }

  // Own child id, node id and endpoint set.
  if (tEncodeI32(pEncoder, pTask->selfChildId) < 0 ||
      tEncodeI32(pEncoder, pTask->nodeId) < 0 ||
      tEncodeSEpSet(pEncoder, &pTask->epSet) < 0) {
    return -1;
  }

  // Checkpoint info and the fill-history flag.
  if (tEncodeI64(pEncoder, pTask->chkInfo.id) < 0 ||
      tEncodeI64(pEncoder, pTask->chkInfo.version) < 0 ||
      tEncodeI8(pEncoder, pTask->fillHistory) < 0) {
    return -1;
  }

  // Child endpoint list: element count first, then each element.
  int32_t numOfChildEps = taosArrayGetSize(pTask->childEpInfo);
  if (tEncodeI32(pEncoder, numOfChildEps) < 0) return -1;
  for (int32_t idx = 0; idx < numOfChildEps; ++idx) {
    SStreamChildEpInfo* pChildEp = taosArrayGetP(pTask->childEpInfo, idx);
    if (tEncodeStreamEpInfo(pEncoder, pChildEp) < 0) return -1;
  }

  // Tasks at the sink level carry no executor plan string.
  if (pTask->taskLevel != TASK_LEVEL__SINK) {
    if (tEncodeCStr(pEncoder, pTask->exec.qmsg) < 0) return -1;
  }

  // Output-specific section.
  switch (pTask->outputType) {
    case TASK_OUTPUT__TABLE:
      if (tEncodeI64(pEncoder, pTask->tbSink.stbUid) < 0 ||
          tEncodeCStr(pEncoder, pTask->tbSink.stbFullName) < 0 ||
          tEncodeSSchemaWrapper(pEncoder, pTask->tbSink.pSchemaWrapper) < 0) {
        return -1;
      }
      break;

    case TASK_OUTPUT__SMA:
      if (tEncodeI64(pEncoder, pTask->smaSink.smaId) < 0) return -1;
      break;

    case TASK_OUTPUT__FETCH:
      if (tEncodeI8(pEncoder, pTask->fetchSink.reserved) < 0) return -1;
      break;

    case TASK_OUTPUT__FIXED_DISPATCH:
      if (tEncodeI32(pEncoder, pTask->fixedEpDispatcher.taskId) < 0 ||
          tEncodeI32(pEncoder, pTask->fixedEpDispatcher.nodeId) < 0 ||
          tEncodeSEpSet(pEncoder, &pTask->fixedEpDispatcher.epSet) < 0) {
        return -1;
      }
      break;

    case TASK_OUTPUT__SHUFFLE_DISPATCH:
      if (tSerializeSUseDbRspImp(pEncoder, &pTask->shuffleDispatcher.dbInfo) < 0 ||
          tEncodeCStr(pEncoder, pTask->shuffleDispatcher.stbFullName) < 0) {
        return -1;
      }
      break;

    default:
      // Any other output type has no output-specific section.
      break;
  }

  if (tEncodeI64(pEncoder, pTask->triggerParam) < 0) return -1;

  tEndEncode(pEncoder);
  return pEncoder->pos;
}

114
/*
 * Deserialize a stream task into pTask.
 *
 * The field order read here must stay in step with tEncodeStreamTask().
 * Heap objects created while decoding (childEpInfo and its elements,
 * exec.qmsg, tbSink.pSchemaWrapper) are attached to pTask.
 *
 * Returns 0 on success, -1 on the first decode or allocation failure.
 * terrno is set to TSDB_CODE_OUT_OF_MEMORY for the child endpoint list
 * allocations. On failure, objects already attached to pTask are left in
 * place.
 * NOTE(review): presumably the caller releases them with tFreeStreamTask() - confirm.
 */
int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
  if (tStartDecode(pDecoder) < 0) return -1;
  if (tDecodeI64(pDecoder, &pTask->id.streamId) < 0) return -1;
  if (tDecodeI32(pDecoder, &pTask->id.taskId) < 0) return -1;
  if (tDecodeI32(pDecoder, &pTask->totalLevel) < 0) return -1;
  if (tDecodeI8(pDecoder, &pTask->taskLevel) < 0) return -1;
  if (tDecodeI8(pDecoder, &pTask->outputType) < 0) return -1;
  if (tDecodeI16(pDecoder, &pTask->dispatchMsgType) < 0) return -1;

  if (tDecodeI8(pDecoder, &pTask->status.taskStatus) < 0) return -1;
  if (tDecodeI8(pDecoder, &pTask->status.schedStatus) < 0) return -1;

  if (tDecodeI32(pDecoder, &pTask->selfChildId) < 0) return -1;
  if (tDecodeI32(pDecoder, &pTask->nodeId) < 0) return -1;
  if (tDecodeSEpSet(pDecoder, &pTask->epSet) < 0) return -1;

  if (tDecodeI64(pDecoder, &pTask->chkInfo.id) < 0) return -1;
  if (tDecodeI64(pDecoder, &pTask->chkInfo.version) < 0) return -1;
  if (tDecodeI8(pDecoder, &pTask->fillHistory) < 0) return -1;

  int32_t epSz = 0;
  if (tDecodeI32(pDecoder, &epSz) < 0) return -1;
  // The count comes from the encoded buffer; a negative value means corrupt
  // input and must not reach the array allocator.
  if (epSz < 0) return -1;

  pTask->childEpInfo = taosArrayInit(epSz, sizeof(void*));
  if (pTask->childEpInfo == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  for (int32_t i = 0; i < epSz; i++) {
    SStreamChildEpInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamChildEpInfo));
    if (pInfo == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
    }
    if (tDecodeStreamEpInfo(pDecoder, pInfo) < 0) {
      taosMemoryFreeClear(pInfo);
      return -1;
    }
    // The array stores the pointer; until the push succeeds pInfo is still
    // owned here and has to be released on failure.
    if (taosArrayPush(pTask->childEpInfo, &pInfo) == NULL) {
      taosMemoryFreeClear(pInfo);
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
    }
  }

  // Tasks at the sink level carry no executor plan string.
  if (pTask->taskLevel != TASK_LEVEL__SINK) {
    if (tDecodeCStrAlloc(pDecoder, &pTask->exec.qmsg) < 0) return -1;
  }

  // Output-specific section, selected by the output type decoded above.
  if (pTask->outputType == TASK_OUTPUT__TABLE) {
    if (tDecodeI64(pDecoder, &pTask->tbSink.stbUid) < 0) return -1;
    if (tDecodeCStrTo(pDecoder, pTask->tbSink.stbFullName) < 0) return -1;
    pTask->tbSink.pSchemaWrapper = taosMemoryCalloc(1, sizeof(SSchemaWrapper));
    if (pTask->tbSink.pSchemaWrapper == NULL) return -1;
    if (tDecodeSSchemaWrapper(pDecoder, pTask->tbSink.pSchemaWrapper) < 0) return -1;
  } else if (pTask->outputType == TASK_OUTPUT__SMA) {
    if (tDecodeI64(pDecoder, &pTask->smaSink.smaId) < 0) return -1;
  } else if (pTask->outputType == TASK_OUTPUT__FETCH) {
    if (tDecodeI8(pDecoder, &pTask->fetchSink.reserved) < 0) return -1;
  } else if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) {
    if (tDecodeI32(pDecoder, &pTask->fixedEpDispatcher.taskId) < 0) return -1;
    if (tDecodeI32(pDecoder, &pTask->fixedEpDispatcher.nodeId) < 0) return -1;
    if (tDecodeSEpSet(pDecoder, &pTask->fixedEpDispatcher.epSet) < 0) return -1;
  } else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
    if (tDeserializeSUseDbRspImp(pDecoder, &pTask->shuffleDispatcher.dbInfo) < 0) return -1;
    if (tDecodeCStrTo(pDecoder, pTask->shuffleDispatcher.stbFullName) < 0) return -1;
  }
  if (tDecodeI64(pDecoder, &pTask->triggerParam) < 0) return -1;

  tEndDecode(pDecoder);
  return 0;
}

175 176
/*
 * Release a stream task together with the resources attached to it:
 * input/output queues, executor plan string, executor, WAL reader, child
 * endpoint list, output-type specific sink/dispatcher data, state handle,
 * id string, and finally the task structure itself.
 *
 * pTask may be NULL, in which case nothing is done.
 */
void tFreeStreamTask(SStreamTask* pTask) {
  if (pTask == NULL) {
    return;
  }

  // idStr can be NULL (it is checked before being freed below); passing NULL
  // to %s is undefined behaviour, so substitute a placeholder for logging.
  qDebug("free s-task:%s", pTask->id.idStr != NULL ? pTask->id.idStr : "(null)");

  // Read the status once up front; it decides how the state handle is closed below.
  int32_t status = atomic_load_8((int8_t*)&(pTask->status.taskStatus));

  if (pTask->inputQueue) {
    streamQueueClose(pTask->inputQueue);
  }
  if (pTask->outputQueue) {
    streamQueueClose(pTask->outputQueue);
  }
  if (pTask->exec.qmsg) {
    taosMemoryFree(pTask->exec.qmsg);
  }

  if (pTask->exec.pExecutor) {
    qDestroyTask(pTask->exec.pExecutor);
    pTask->exec.pExecutor = NULL;
  }

  if (pTask->exec.pWalReader != NULL) {
    walCloseReader(pTask->exec.pWalReader);
  }

  // Destroys the array and frees every stored element pointer.
  taosArrayDestroyP(pTask->childEpInfo, taosMemoryFree);

  if (pTask->outputType == TASK_OUTPUT__TABLE) {
    tDeleteSchemaWrapper(pTask->tbSink.pSchemaWrapper);
    taosMemoryFree(pTask->tbSink.pTSchema);
    tSimpleHashCleanup(pTask->tbSink.pTblInfo);
  }

  if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
    taosArrayDestroy(pTask->shuffleDispatcher.dbInfo.pVgroupInfos);
    taosArrayDestroy(pTask->checkReqIds);
    pTask->checkReqIds = NULL;
  }

  if (pTask->pState) {
    // The second argument is true only when the task was being dropped.
    streamStateClose(pTask->pState, status == TASK_STATUS__DROPPING);
  }

  if (pTask->id.idStr != NULL) {
    taosMemoryFree((void*)pTask->id.idStr);
  }

  taosMemoryFree(pTask);
}