streamExec.c 8.2 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

L
Liu Jicong 已提交
16
#include "streamInc.h"
L
Liu Jicong 已提交
17

L
Liu Jicong 已提交
18
/*
 * Hand one input item to the task's executor and drain the executor into pRes.
 *
 * pTask  task whose executor (pTask->exec.executor) is driven
 * data   queue item; its leading `type` field selects how the payload is
 *        passed to qSetMultiStreamInput()
 * pRes   array of SSDataBlock; every non-retrieve output block is run through
 *        assignOneDataBlock(), tagged with the task's child id and pushed here
 *
 * Always returns 0. Unknown item types and executor failures trip ASSERT.
 */
static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray* pRes) {
  void*                   pExecutor = pTask->exec.executor;
  const SStreamQueueItem* pInput = (const SStreamQueueItem*)data;

  // set input
  switch (pInput->type) {
    case STREAM_INPUT__GET_RES: {
      const SStreamTrigger* pTrig = (const SStreamTrigger*)data;
      qSetMultiStreamInput(pExecutor, pTrig->pBlock, 1, STREAM_INPUT__DATA_BLOCK);
      break;
    }
    case STREAM_INPUT__DATA_SUBMIT: {
      ASSERT(pTask->taskLevel == TASK_LEVEL__SOURCE);
      const SStreamDataSubmit* pSub = (const SStreamDataSubmit*)data;
      qDebug("task %d %p set submit input %p %p %d 1", pTask->taskId, pTask, pSub, pSub->data, *pSub->dataRef);
      qSetMultiStreamInput(pExecutor, pSub->data, 1, STREAM_INPUT__DATA_SUBMIT);
      break;
    }
    case STREAM_INPUT__DATA_BLOCK:
    case STREAM_INPUT__DATA_RETRIEVE: {
      // both kinds carry an SStreamDataBlock and are fed as plain data blocks
      const SStreamDataBlock* pDataBlock = (const SStreamDataBlock*)data;
      SArray*                 pList = pDataBlock->blocks;
      qDebug("task %d %p set ssdata input", pTask->taskId, pTask);
      qSetMultiStreamInput(pExecutor, pList->pData, pList->size, STREAM_INPUT__DATA_BLOCK);
      break;
    }
    case STREAM_INPUT__MERGED_SUBMIT: {
      const SStreamMergedSubmit* pMergedSubmit = (const SStreamMergedSubmit*)data;
      SArray*                    pList = pMergedSubmit->reqs;
      qDebug("task %d %p set submit input (merged), batch num: %d", pTask->taskId, pTask, (int32_t)pList->size);
      qSetMultiStreamInput(pExecutor, pList->pData, pList->size, STREAM_INPUT__MERGED_SUBMIT);
      break;
    }
    default:
      ASSERT(0);
      break;
  }

  // exec
  for (;;) {
    SSDataBlock* pOut = NULL;
    uint64_t     ts = 0;
    if (qExecTask(pExecutor, &pOut, &ts) < 0) {
      ASSERT(false);
    }

    if (pOut == NULL) {
      // Executor produced nothing more. For a retrieve request, append a
      // block typed STREAM_PULL_OVER built from the single request block.
      if (pInput->type == STREAM_INPUT__DATA_RETRIEVE) {
        const SStreamDataBlock* pRetrieve = (const SStreamDataBlock*)data;
        SSDataBlock             pullOver = {0};
        ASSERT(taosArrayGetSize(pRetrieve->blocks) == 1);
        assignOneDataBlock(&pullOver, taosArrayGet(pRetrieve->blocks, 0));
        pullOver.info.type = STREAM_PULL_OVER;
        pullOver.info.childId = pTask->selfChildId;
        taosArrayPush(pRes, &pullOver);

        qDebug("task %d(child %d) processed retrieve, reqId %" PRId64, pTask->taskId, pTask->selfChildId,
               pRetrieve->reqId);
      }
      return 0;
    }

    if (output_is_retrieve:
        pOut->info.type == STREAM_RETRIEVE) {
      // retrieve blocks are broadcast instead of being collected in pRes
      if (streamBroadcastToChildren(pTask, pOut) < 0) {
        // TODO: broadcast failure is currently ignored
      }
      continue;
    }

    qDebug("task %d(child %d) executed and get block", pTask->taskId, pTask->selfChildId);

    SSDataBlock produced = {0};
    assignOneDataBlock(&produced, pOut);
    produced.info.childId = pTask->selfChildId;
    taosArrayPush(pRes, &produced);
  }
}

L
Liu Jicong 已提交
85
#if 0
// Disabled placeholder: looks up the child endpoint info for the block's
// child id; the actual version bookkeeping was never filled in.
static FORCE_INLINE int32_t streamUpdateVer(SStreamTask* pTask, SStreamDataBlock* pBlock) {
  ASSERT(pBlock->type == STREAM_INPUT__DATA_BLOCK);

  int64_t             srcVer = pBlock->sourceVer;
  SStreamChildEpInfo* pChild = taosArrayGetP(pTask->childEpInfo, pBlock->childId);
  // TODO: store srcVer on pChild (target field not chosen yet)
  /*pChild-> = srcVer;*/
  return 0;
}
#endif
L
Liu Jicong 已提交
95

96
/*
 * Repeatedly pull output from the task's executor, group it into batches of
 * at most batchNum blocks, and hand each batch to streamTaskOutput().
 *
 * pTask     non-sink task (asserted)
 * batchNum  upper bound on blocks collected per batch; at least one block is
 *           taken per batch whenever the executor yields one
 *
 * Returns 0 once the executor yields no more blocks, -1 if an allocation or
 * streamTaskOutput() fails (terrno is set here only for the array allocation).
 */
int32_t streamPipelineExec(SStreamTask* pTask, int32_t batchNum) {
  ASSERT(pTask->taskLevel != TASK_LEVEL__SINK);

  void* pExecutor = pTask->exec.executor;

  for (;;) {
    SArray* pBatch = taosArrayInit(0, sizeof(SSDataBlock));
    if (pBatch == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
    }

    // collect up to batchNum blocks, stopping early when the executor is dry
    int32_t nCollected = 0;
    do {
      SSDataBlock* pOut = NULL;
      uint64_t     ts = 0;
      if (qExecTask(pExecutor, &pOut, &ts) < 0) {
        ASSERT(0);
      }
      if (pOut == NULL) {
        break;
      }

      SSDataBlock copied = {0};
      assignOneDataBlock(&copied, pOut);
      copied.info.childId = pTask->selfChildId;
      taosArrayPush(pBatch, &copied);
    } while (++nCollected < batchNum);

    // nothing produced in this round: the pipeline is finished
    if (taosArrayGetSize(pBatch) == 0) {
      taosArrayDestroy(pBatch);
      return 0;
    }

    SStreamDataBlock* pItem = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM);
    if (pItem == NULL) {
      taosArrayDestroyEx(pBatch, (FDelete)blockDataFreeRes);
      return -1;
    }

    pItem->type = STREAM_INPUT__DATA_BLOCK;
    pItem->blocks = pBatch;
    pItem->childId = pTask->selfChildId;

    if (streamTaskOutput(pTask, pItem) < 0) {
      taosArrayDestroyEx(pBatch, (FDelete)blockDataFreeRes);
      taosFreeQitem(pItem);
      return -1;
    }

    // dispatch-type outputs are pushed downstream after every batch
    switch (pTask->outputType) {
      case TASK_OUTPUT__FIXED_DISPATCH:
      case TASK_OUTPUT__SHUFFLE_DISPATCH:
        streamDispatch(pTask);
        break;
      default:
        break;
    }
  }
}
L
Liu Jicong 已提交
151 152
// TODO: handle version
/*
 * Drain the task's input queue: merge consecutive queue items into one input
 * (via streamAppendQueueItem), execute it, and forward any produced blocks
 * with streamTaskOutput().
 *
 * Sink-level tasks do no execution; each item is passed straight to
 * streamTaskOutput().
 *
 * Returns 0 when the queue is empty or the task is being dropped, -1 when an
 * allocation or streamTaskOutput() fails. On every return path the merged
 * input item taken from the queue has been released with streamFreeQitem(),
 * except for sink tasks where it is handed to streamTaskOutput().
 */
int32_t streamExecForAll(SStreamTask* pTask) {
  while (1) {
    int32_t cnt = 1;  // number of queue items merged into `data`, for logging
    void*   data = NULL;

    // Gather as many queue items as can be merged into a single input.
    while (1) {
      SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputQueue);
      if (qItem == NULL) {
        qDebug("stream task exec over, queue empty, task: %d", pTask->taskId);
        break;
      }
      if (data == NULL) {
        data = qItem;
        streamQueueProcessSuccess(pTask->inputQueue);
        if (pTask->taskLevel == TASK_LEVEL__SINK) {
          // sink tasks handle exactly one item at a time
          break;
        }
      } else {
        void* newRet = streamAppendQueueItem(data, qItem);
        if (newRet == NULL) {
          // item cannot be merged; leave it in the queue for the next round
          streamQueueProcessFail(pTask->inputQueue);
          break;
        }
        cnt++;
        data = newRet;
        streamQueueProcessSuccess(pTask->inputQueue);
      }
    }

    if (pTask->taskStatus == TASK_STATUS__DROPPING) {
      if (data) streamFreeQitem(data);
      return 0;
    }

    if (data == NULL) {
      break;
    }

    if (pTask->taskLevel == TASK_LEVEL__SINK) {
      ASSERT(((SStreamQueueItem*)data)->type == STREAM_INPUT__DATA_BLOCK);
      streamTaskOutput(pTask, data);
      continue;
    }

    SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock));
    if (pRes == NULL) {
      // Same handling as streamPipelineExec(): never run the executor with a
      // NULL result array.
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      streamFreeQitem(data);
      return -1;
    }

    qDebug("stream task %d exec begin, msg batch: %d", pTask->taskId, cnt);
    streamTaskExecImpl(pTask, data, pRes);
    qDebug("stream task %d exec end", pTask->taskId);

    if (taosArrayGetSize(pRes) != 0) {
      SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM);
      if (qRes == NULL) {
        // TODO log failed ver
        // NOTE(review): the input items were already acknowledged with
        // streamQueueProcessSuccess() above; confirm that marking the queue
        // failed here is still intended.
        streamQueueProcessFail(pTask->inputQueue);
        // pRes is non-empty here, so release the blocks' resources as well,
        // matching the other failure paths in this file.
        taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
        streamFreeQitem(data);
        return -1;
      }
      qRes->type = STREAM_INPUT__DATA_BLOCK;
      qRes->blocks = pRes;

      if (((SStreamQueueItem*)data)->type == STREAM_INPUT__DATA_SUBMIT) {
        SStreamDataSubmit* pSubmit = (SStreamDataSubmit*)data;
        qRes->childId = pTask->selfChildId;
        qRes->sourceVer = pSubmit->ver;
      }

      if (streamTaskOutput(pTask, qRes) < 0) {
        // TODO save failed ver
        taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
        taosFreeQitem(qRes);
        streamFreeQitem(data);
        return -1;
      }
    } else {
      taosArrayDestroy(pRes);
    }
    streamFreeQitem(data);
  }
  return 0;
}

L
Liu Jicong 已提交
238 239 240 241 242 243 244 245 246 247
/*
 * Run the task if it is currently scheduled.
 *
 * Attempts to switch pTask->schedStatus from WAITING to ACTIVE; when the
 * observed status was not WAITING nothing is done and 0 is returned.
 * Otherwise streamExecForAll() is run: on failure the status is stored as
 * FAILED and -1 is returned; on success it is stored as INACTIVE and, if the
 * input queue is not empty, another execution is requested via
 * streamSchedExec().
 */
int32_t streamTryExec(SStreamTask* pTask) {
  int8_t prevStatus =
      atomic_val_compare_exchange_8(&pTask->schedStatus, TASK_SCHED_STATUS__WAITING, TASK_SCHED_STATUS__ACTIVE);
  if (prevStatus != TASK_SCHED_STATUS__WAITING) {
    return 0;
  }

  if (streamExecForAll(pTask) < 0) {
    atomic_store_8(&pTask->schedStatus, TASK_SCHED_STATUS__FAILED);
    return -1;
  }

  atomic_store_8(&pTask->schedStatus, TASK_SCHED_STATUS__INACTIVE);

  // items may have been queued while we were executing; reschedule for them
  if (!taosQueueEmpty(pTask->inputQueue->queue)) {
    streamSchedExec(pTask);
  }
  return 0;
}