streamTask.c 7.9 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "executor.h"
#include "tstream.h"

L
Liu Jicong 已提交
19
SStreamTask* tNewSStreamTask(int64_t streamId) {
L
Liu Jicong 已提交
20 21 22 23 24 25
  SStreamTask* pTask = (SStreamTask*)taosMemoryCalloc(1, sizeof(SStreamTask));
  if (pTask == NULL) {
    return NULL;
  }
  pTask->taskId = tGenIdPI32();
  pTask->streamId = streamId;
L
Liu Jicong 已提交
26
  pTask->schedStatus = TASK_SCHED_STATUS__INACTIVE;
L
Liu Jicong 已提交
27 28 29 30 31 32
  pTask->inputStatus = TASK_INPUT_STATUS__NORMAL;
  pTask->outputStatus = TASK_OUTPUT_STATUS__NORMAL;

  return pTask;
}

L
Liu Jicong 已提交
33 34 35 36
/*
 * Encode one child endpoint descriptor.
 *
 * Fields are written in this order: taskId, nodeId, childId, epSet.
 * processedVer is not part of the encoding (its encode call is disabled).
 *
 * Returns 0 on success, -1 as soon as any field fails to encode.
 */
int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo) {
  // Short-circuit evaluation stops at the first failing field, in wire order.
  if (tEncodeI32(pEncoder, pInfo->taskId) < 0 ||
      tEncodeI32(pEncoder, pInfo->nodeId) < 0 ||
      tEncodeI32(pEncoder, pInfo->childId) < 0 ||
      tEncodeSEpSet(pEncoder, &pInfo->epSet) < 0) {
    return -1;
  }

  return 0;
}

/*
 * Decode one child endpoint descriptor into pInfo.
 *
 * Fields are read in this order: taskId, nodeId, childId, epSet.
 * processedVer is not part of the encoding (its decode call is disabled).
 *
 * Returns 0 on success, -1 as soon as any field fails to decode.
 */
int32_t tDecodeStreamEpInfo(SDecoder* pDecoder, SStreamChildEpInfo* pInfo) {
  // Short-circuit evaluation stops at the first failing field, in wire order.
  if (tDecodeI32(pDecoder, &pInfo->taskId) < 0 ||
      tDecodeI32(pDecoder, &pInfo->nodeId) < 0 ||
      tDecodeI32(pDecoder, &pInfo->childId) < 0 ||
      tDecodeSEpSet(pDecoder, &pInfo->epSet) < 0) {
    return -1;
  }

  return 0;
}

L
Liu Jicong 已提交
51
/*
 * Serialize a stream task.
 *
 * Layout, in order: ids and level/output descriptors, status bytes, placement
 * (selfChildId, nodeId, epSet), version info, the list of child endpoints
 * (count followed by each entry), the query message (omitted for sink-level
 * tasks), the fields specific to the task's output type, and triggerParam.
 *
 * Returns -1 as soon as any step fails; otherwise returns pEncoder->pos after
 * the encoding has been closed with tEndEncode().
 */
int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
  if (tStartEncode(pEncoder) < 0) {
    return -1;
  }

  // Ids, level and output descriptors.
  if (tEncodeI64(pEncoder, pTask->streamId) < 0 ||
      tEncodeI32(pEncoder, pTask->taskId) < 0 ||
      tEncodeI32(pEncoder, pTask->totalLevel) < 0 ||
      tEncodeI8(pEncoder, pTask->taskLevel) < 0 ||
      tEncodeI8(pEncoder, pTask->outputType) < 0 ||
      tEncodeI16(pEncoder, pTask->dispatchMsgType) < 0) {
    return -1;
  }

  // Status bytes.
  if (tEncodeI8(pEncoder, pTask->taskStatus) < 0 ||
      tEncodeI8(pEncoder, pTask->schedStatus) < 0) {
    return -1;
  }

  // Placement of this task.
  if (tEncodeI32(pEncoder, pTask->selfChildId) < 0 ||
      tEncodeI32(pEncoder, pTask->nodeId) < 0 ||
      tEncodeSEpSet(pEncoder, &pTask->epSet) < 0) {
    return -1;
  }

  // Version info.
  if (tEncodeI64(pEncoder, pTask->recoverSnapVer) < 0 ||
      tEncodeI64(pEncoder, pTask->startVer) < 0 ||
      tEncodeI8(pEncoder, pTask->fillHistory) < 0) {
    return -1;
  }

  // Child endpoints: element count first, then every entry.
  int32_t numOfChildren = taosArrayGetSize(pTask->childEpInfo);
  if (tEncodeI32(pEncoder, numOfChildren) < 0) {
    return -1;
  }
  for (int32_t idx = 0; idx < numOfChildren; ++idx) {
    SStreamChildEpInfo* pChild = taosArrayGetP(pTask->childEpInfo, idx);
    if (tEncodeStreamEpInfo(pEncoder, pChild) < 0) {
      return -1;
    }
  }

  // The query message is written for every level except the sink level.
  if (pTask->taskLevel != TASK_LEVEL__SINK) {
    if (tEncodeCStr(pEncoder, pTask->exec.qmsg) < 0) {
      return -1;
    }
  }

  // Output-type specific payload; unknown types contribute nothing.
  switch (pTask->outputType) {
    case TASK_OUTPUT__TABLE:
      if (tEncodeI64(pEncoder, pTask->tbSink.stbUid) < 0 ||
          tEncodeCStr(pEncoder, pTask->tbSink.stbFullName) < 0 ||
          tEncodeSSchemaWrapper(pEncoder, pTask->tbSink.pSchemaWrapper) < 0) {
        return -1;
      }
      break;

    case TASK_OUTPUT__SMA:
      if (tEncodeI64(pEncoder, pTask->smaSink.smaId) < 0) {
        return -1;
      }
      break;

    case TASK_OUTPUT__FETCH:
      if (tEncodeI8(pEncoder, pTask->fetchSink.reserved) < 0) {
        return -1;
      }
      break;

    case TASK_OUTPUT__FIXED_DISPATCH:
      if (tEncodeI32(pEncoder, pTask->fixedEpDispatcher.taskId) < 0 ||
          tEncodeI32(pEncoder, pTask->fixedEpDispatcher.nodeId) < 0 ||
          tEncodeSEpSet(pEncoder, &pTask->fixedEpDispatcher.epSet) < 0) {
        return -1;
      }
      break;

    case TASK_OUTPUT__SHUFFLE_DISPATCH:
      if (tSerializeSUseDbRspImp(pEncoder, &pTask->shuffleDispatcher.dbInfo) < 0 ||
          tEncodeCStr(pEncoder, pTask->shuffleDispatcher.stbFullName) < 0) {
        return -1;
      }
      break;

    default:
      break;
  }

  if (tEncodeI64(pEncoder, pTask->triggerParam) < 0) {
    return -1;
  }

  tEndEncode(pEncoder);
  return pEncoder->pos;
}

/*
 * Deserialize a stream task into pTask.
 *
 * Fields are read in the same order tEncodeSStreamTask writes them: ids and
 * level/output descriptors, status bytes, placement, version info, the child
 * endpoint list (count followed by each entry), the query message (absent for
 * sink-level tasks), the output-type specific fields, and triggerParam.
 *
 * Returns 0 on success and -1 on any decode error, on allocation failure, or
 * when the encoded child endpoint count is negative.
 *
 * On failure pTask may be partially populated: anything already allocated
 * (childEpInfo and its entries, exec.qmsg, tbSink.pSchemaWrapper) stays
 * attached to pTask.
 * NOTE(review): callers presumably release a failed task with
 * tFreeSStreamTask() - confirm against the call sites.
 */
int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
  if (tStartDecode(pDecoder) < 0) return -1;

  if (tDecodeI64(pDecoder, &pTask->streamId) < 0) return -1;
  if (tDecodeI32(pDecoder, &pTask->taskId) < 0) return -1;
  if (tDecodeI32(pDecoder, &pTask->totalLevel) < 0) return -1;
  if (tDecodeI8(pDecoder, &pTask->taskLevel) < 0) return -1;
  if (tDecodeI8(pDecoder, &pTask->outputType) < 0) return -1;
  if (tDecodeI16(pDecoder, &pTask->dispatchMsgType) < 0) return -1;

  if (tDecodeI8(pDecoder, &pTask->taskStatus) < 0) return -1;
  if (tDecodeI8(pDecoder, &pTask->schedStatus) < 0) return -1;

  if (tDecodeI32(pDecoder, &pTask->selfChildId) < 0) return -1;
  if (tDecodeI32(pDecoder, &pTask->nodeId) < 0) return -1;
  if (tDecodeSEpSet(pDecoder, &pTask->epSet) < 0) return -1;

  if (tDecodeI64(pDecoder, &pTask->recoverSnapVer) < 0) return -1;
  if (tDecodeI64(pDecoder, &pTask->startVer) < 0) return -1;
  if (tDecodeI8(pDecoder, &pTask->fillHistory) < 0) return -1;

  int32_t epSz = 0;
  if (tDecodeI32(pDecoder, &epSz) < 0) return -1;
  // The encoder writes an array size here, so a negative count means the
  // buffer is corrupt; reject it instead of passing it on as an array capacity.
  if (epSz < 0) return -1;

  // The array stores pointers to individually allocated entries.
  pTask->childEpInfo = taosArrayInit(epSz, sizeof(void*));
  if (pTask->childEpInfo == NULL) return -1;

  for (int32_t i = 0; i < epSz; i++) {
    SStreamChildEpInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamChildEpInfo));
    if (pInfo == NULL) return -1;
    if (tDecodeStreamEpInfo(pDecoder, pInfo) < 0) {
      taosMemoryFreeClear(pInfo);
      return -1;
    }
    // Until the push succeeds the entry is owned only by this loop iteration,
    // so it has to be released here when the array cannot take it.
    if (taosArrayPush(pTask->childEpInfo, &pInfo) == NULL) {
      taosMemoryFreeClear(pInfo);
      return -1;
    }
  }

  // The query message is present for every level except the sink level.
  if (pTask->taskLevel != TASK_LEVEL__SINK) {
    if (tDecodeCStrAlloc(pDecoder, &pTask->exec.qmsg) < 0) return -1;
  }

  if (pTask->outputType == TASK_OUTPUT__TABLE) {
    if (tDecodeI64(pDecoder, &pTask->tbSink.stbUid) < 0) return -1;
    if (tDecodeCStrTo(pDecoder, pTask->tbSink.stbFullName) < 0) return -1;
    pTask->tbSink.pSchemaWrapper = taosMemoryCalloc(1, sizeof(SSchemaWrapper));
    if (pTask->tbSink.pSchemaWrapper == NULL) return -1;
    if (tDecodeSSchemaWrapper(pDecoder, pTask->tbSink.pSchemaWrapper) < 0) return -1;
  } else if (pTask->outputType == TASK_OUTPUT__SMA) {
    if (tDecodeI64(pDecoder, &pTask->smaSink.smaId) < 0) return -1;
  } else if (pTask->outputType == TASK_OUTPUT__FETCH) {
    if (tDecodeI8(pDecoder, &pTask->fetchSink.reserved) < 0) return -1;
  } else if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) {
    if (tDecodeI32(pDecoder, &pTask->fixedEpDispatcher.taskId) < 0) return -1;
    if (tDecodeI32(pDecoder, &pTask->fixedEpDispatcher.nodeId) < 0) return -1;
    if (tDecodeSEpSet(pDecoder, &pTask->fixedEpDispatcher.epSet) < 0) return -1;
  } else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
    if (tDeserializeSUseDbRspImp(pDecoder, &pTask->shuffleDispatcher.dbInfo) < 0) return -1;
    if (tDecodeCStrTo(pDecoder, pTask->shuffleDispatcher.stbFullName) < 0) return -1;
  }
  if (tDecodeI64(pDecoder, &pTask->triggerParam) < 0) return -1;

  tEndDecode(pDecoder);
  return 0;
}

/*
 * Release a stream task and the resources attached to it.
 *
 * Closes the input and output queues, frees the query message, destroys the
 * executor, frees every child endpoint entry together with the array holding
 * them, releases the output-type specific resources, closes the task state,
 * and finally frees the task structure itself.
 *
 * pTask: task to release. NULL is accepted and is a no-op, so cleanup paths
 *        can call this unconditionally.
 */
void tFreeSStreamTask(SStreamTask* pTask) {
  // Guard first: the log line below already dereferences pTask.
  if (pTask == NULL) return;

  qDebug("free stream task %d", pTask->taskId);

  if (pTask->inputQueue) streamQueueClose(pTask->inputQueue);
  if (pTask->outputQueue) streamQueueClose(pTask->outputQueue);

  if (pTask->exec.qmsg) taosMemoryFree(pTask->exec.qmsg);
  if (pTask->exec.executor) qDestroyTask(pTask->exec.executor);

  // Entries are individually allocated pointers, so each one is freed too.
  taosArrayDestroyP(pTask->childEpInfo, taosMemoryFree);

  if (pTask->outputType == TASK_OUTPUT__TABLE) {
    tDeleteSSchemaWrapper(pTask->tbSink.pSchemaWrapper);
    taosMemoryFree(pTask->tbSink.pTSchema);
  }
  if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
    taosArrayDestroy(pTask->shuffleDispatcher.dbInfo.pVgroupInfos);
  }

  if (pTask->pState) streamStateClose(pTask->pState);

  taosMemoryFree(pTask);
}