streamExec.c 9.4 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

L
Liu Jicong 已提交
16
#include "streamInc.h"
L
Liu Jicong 已提交
17

L
Liu Jicong 已提交
18
// Feed one input queue item to the task's executor and collect everything the
// executor produces for it.
//
//   pTask - task whose executor (pTask->exec.executor) is driven
//   data  - queue item; its `type` field decides how it is handed to the executor
//   pRes  - array of SSDataBlock; a copy of every produced block is appended
//
// Always returns 0. An unknown item type or a failing qExecTask() trips ASSERT.
static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray* pRes) {
  void*                   executor = pTask->exec.executor;
  const SStreamQueueItem* pInput = (const SStreamQueueItem*)data;

  // Step 1: hand the input to the executor in the form that matches its type.
  switch (pInput->type) {
    case STREAM_INPUT__GET_RES: {
      const SStreamTrigger* pTrig = (const SStreamTrigger*)data;
      qSetMultiStreamInput(executor, pTrig->pBlock, 1, STREAM_INPUT__DATA_BLOCK);
      break;
    }

    case STREAM_INPUT__DATA_SUBMIT: {
      // Submit data is only expected on source-level tasks.
      ASSERT(pTask->taskLevel == TASK_LEVEL__SOURCE);
      const SStreamDataSubmit* pSub = (const SStreamDataSubmit*)data;
      qDebug("task %d %p set submit input %p %p %d 1", pTask->taskId, pTask, pSub, pSub->data, *pSub->dataRef);
      qSetMultiStreamInput(executor, pSub->data, 1, STREAM_INPUT__DATA_SUBMIT);
      break;
    }

    case STREAM_INPUT__DATA_BLOCK:
    case STREAM_INPUT__DATA_RETRIEVE: {
      // Both kinds carry an array of blocks and are passed on as data blocks.
      const SStreamDataBlock* pDataBlock = (const SStreamDataBlock*)data;
      SArray*                 pList = pDataBlock->blocks;
      qDebug("task %d %p set ssdata input", pTask->taskId, pTask);
      qSetMultiStreamInput(executor, pList->pData, pList->size, STREAM_INPUT__DATA_BLOCK);
      break;
    }

    case STREAM_INPUT__MERGED_SUBMIT: {
      const SStreamMergedSubmit* pBatch = (const SStreamMergedSubmit*)data;
      SArray*                    pReqs = pBatch->reqs;
      qDebug("task %d %p set submit input (merged), batch num: %d", pTask->taskId, pTask, (int32_t)pReqs->size);
      qSetMultiStreamInput(executor, pReqs->pData, pReqs->size, STREAM_INPUT__MERGED_SUBMIT);
      break;
    }

    case STREAM_INPUT__REF_DATA_BLOCK: {
      const SStreamRefDataBlock* pRef = (const SStreamRefDataBlock*)data;
      qSetMultiStreamInput(executor, pRef->pBlock, 1, STREAM_INPUT__DATA_BLOCK);
      break;
    }

    default:
      ASSERT(0);
      break;
  }

  // Step 2: pull blocks from the executor until it returns no output.
  for (;;) {
    SSDataBlock* pOut = NULL;
    uint64_t     ts = 0;
    if (qExecTask(executor, &pOut, &ts) < 0) {
      ASSERT(false);
    }

    if (pOut == NULL) {
      // No more output. For a retrieve request, append a copy of the request's
      // single block, retyped to STREAM_PULL_OVER, before leaving the loop.
      if (pInput->type == STREAM_INPUT__DATA_RETRIEVE) {
        SSDataBlock             pullOver = {0};
        const SStreamDataBlock* pRetrieve = (const SStreamDataBlock*)data;
        ASSERT(taosArrayGetSize(pRetrieve->blocks) == 1);
        assignOneDataBlock(&pullOver, taosArrayGet(pRetrieve->blocks, 0));
        pullOver.info.type = STREAM_PULL_OVER;
        pullOver.info.childId = pTask->selfChildId;
        taosArrayPush(pRes, &pullOver);

        qDebug("task %d(child %d) processed retrieve, reqId %" PRId64, pTask->taskId, pTask->selfChildId,
               pRetrieve->reqId);
      }
      break;
    }

    // STREAM_RETRIEVE blocks are broadcast to the children and never added to pRes.
    if (output_is_retrieve:
        pOut->info.type == STREAM_RETRIEVE) {
      if (streamBroadcastToChildren(pTask, pOut) < 0) {
        // TODO
      }
      continue;
    }

    qDebug("task %d(child %d) executed and get block", pTask->taskId, pTask->selfChildId);

    // Store a copy of the block, tagged with this task's child id.
    SSDataBlock copied = {0};
    assignOneDataBlock(&copied, pOut);
    copied.info.childId = pTask->selfChildId;
    taosArrayPush(pRes, &copied);
  }

  return 0;
}

88 89 90 91 92
// Run a source-level task's executor and emit its output in batches.
//
//   pTask   - task to run; must be at TASK_LEVEL__SOURCE (asserted)
//   batchSz - maximum number of blocks gathered into one output item; at least
//             one block is gathered per batch even if batchSz <= 0
//
// Each non-empty batch is wrapped in a STREAM_INPUT__DATA_BLOCK queue item and
// passed to streamTaskOutput(). The loop ends when a batch comes back empty.
//
// Returns 0 on success, -1 when an allocation fails or streamTaskOutput()
// reports an error (terrno is set to TSDB_CODE_OUT_OF_MEMORY for allocation
// failures).
int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) {
  ASSERT(pTask->taskLevel == TASK_LEVEL__SOURCE);

  void* exec = pTask->exec.executor;

  qSetStreamOpOpen(exec);

  while (1) {
    SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock));
    if (pRes == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
    }

    int32_t batchCnt = 0;
    while (1) {
      SSDataBlock* output = NULL;
      uint64_t     ts = 0;
      if (qExecTask(exec, &output, &ts) < 0) {
        ASSERT(0);
      }
      if (output == NULL) break;

      SSDataBlock block = {0};
      assignOneDataBlock(&block, output);
      block.info.childId = pTask->selfChildId;
      if (taosArrayPush(pRes, &block) == NULL) {
        // The copy was not stored in the array, so release it here together
        // with the blocks gathered so far.
        blockDataFreeRes(&block);
        taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        return -1;
      }

      if (++batchCnt >= batchSz) break;
    }
    if (taosArrayGetSize(pRes) == 0) {
      // Nothing was produced in this round: the scan is finished.
      taosArrayDestroy(pRes);
      break;
    }
    SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM);
    if (qRes == NULL) {
      taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
    }

    qRes->type = STREAM_INPUT__DATA_BLOCK;
    qRes->blocks = pRes;

    // Same failure handling as streamExecForAll(): release the batch and the
    // queue item, then report the error instead of silently carrying on.
    if (streamTaskOutput(pTask, qRes) < 0) {
      taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
      taosFreeQitem(qRes);
      return -1;
    }

    if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
      streamDispatch(pTask);
    }
  }
  return 0;
}

#if 0
// Disabled (compiled out). Runs the executor of a non-sink task in batches of
// up to batchNum blocks. With `dispatch` set, each non-empty batch is wrapped
// in a STREAM_INPUT__DATA_BLOCK item and passed to streamTaskOutput();
// otherwise the batch is simply released. Returns 0 on success, -1 on failure.
int32_t streamPipelineExec(SStreamTask* pTask, int32_t batchNum, bool dispatch) {
  ASSERT(pTask->taskLevel != TASK_LEVEL__SINK);

  void* executor = pTask->exec.executor;

  for (;;) {
    SArray* pBatch = taosArrayInit(0, sizeof(SSDataBlock));
    if (pBatch == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
    }

    // Gather blocks until the executor runs dry or the batch limit is reached.
    int32_t gathered = 0;
    do {
      SSDataBlock* pOut = NULL;
      uint64_t     ts = 0;
      if (qExecTask(executor, &pOut, &ts) < 0) {
        ASSERT(0);
      }
      if (pOut == NULL) break;

      SSDataBlock copied = {0};
      assignOneDataBlock(&copied, pOut);
      copied.info.childId = pTask->selfChildId;
      taosArrayPush(pBatch, &copied);
    } while (++gathered < batchNum);

    if (taosArrayGetSize(pBatch) == 0) {
      taosArrayDestroy(pBatch);
      break;
    }

    if (!dispatch) {
      taosArrayDestroyEx(pBatch, (FDelete)blockDataFreeRes);
      continue;
    }

    SStreamDataBlock* pItem = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM);
    if (pItem == NULL) {
      taosArrayDestroyEx(pBatch, (FDelete)blockDataFreeRes);
      return -1;
    }

    pItem->type = STREAM_INPUT__DATA_BLOCK;
    pItem->blocks = pBatch;
    pItem->childId = pTask->selfChildId;

    if (streamTaskOutput(pTask, pItem) < 0) {
      taosArrayDestroyEx(pBatch, (FDelete)blockDataFreeRes);
      taosFreeQitem(pItem);
      return -1;
    }

    if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
      streamDispatch(pTask);
    }
  }

  return 0;
}
#endif
L
Liu Jicong 已提交
201

L
Liu Jicong 已提交
202
// Drain the task's input queue: repeatedly take items from it, merge what can
// be merged, run the merged input through the executor and forward the result.
//
// Sink-level tasks do not execute anything; each item (asserted to be a
// STREAM_INPUT__DATA_BLOCK) is passed straight to streamTaskOutput().
//
// Returns 0 when the queue is empty or the task is in TASK_STATUS__DROPPING,
// -1 when an allocation fails or streamTaskOutput() reports an error.
int32_t streamExecForAll(SStreamTask* pTask) {
  while (1) {
    int32_t batchCnt = 1;
    void*   data = NULL;

    // Collect one input: the first queue item, plus every following item that
    // streamMergeQueueItem() manages to merge into it.
    while (1) {
      SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputQueue);
      if (qItem == NULL) {
        qDebug("stream task exec over, queue empty, task: %d", pTask->taskId);
        break;
      }
      if (data == NULL) {
        data = qItem;
        streamQueueProcessSuccess(pTask->inputQueue);
        // Sink tasks take items one at a time, without merging.
        if (pTask->taskLevel == TASK_LEVEL__SINK) {
          break;
        }
      } else {
        void* newRet;
        if ((newRet = streamMergeQueueItem(data, qItem)) == NULL) {
          // The item could not be merged; mark it as not processed and run
          // what has been collected so far.
          streamQueueProcessFail(pTask->inputQueue);
          break;
        } else {
          batchCnt++;
          data = newRet;
          streamQueueProcessSuccess(pTask->inputQueue);
        }
      }
    }

    if (pTask->taskStatus == TASK_STATUS__DROPPING) {
      if (data) streamFreeQitem(data);
      return 0;
    }

    if (data == NULL) {
      break;
    }

    if (pTask->taskLevel == TASK_LEVEL__SINK) {
      ASSERT(((SStreamQueueItem*)data)->type == STREAM_INPUT__DATA_BLOCK);
      streamTaskOutput(pTask, data);
      continue;
    }

    SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock));
    if (pRes == NULL) {
      // streamTaskExecImpl() pushes into pRes unconditionally, so bail out
      // here the same way the other allocation failure below does.
      streamFreeQitem(data);
      return -1;
    }

    qDebug("stream task %d exec begin, msg batch: %d", pTask->taskId, batchCnt);
    streamTaskExecImpl(pTask, data, pRes);
    qDebug("stream task %d exec end", pTask->taskId);

    if (taosArrayGetSize(pRes) != 0) {
      SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM);
      if (qRes == NULL) {
        taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
        streamFreeQitem(data);
        return -1;
      }
      qRes->type = STREAM_INPUT__DATA_BLOCK;
      qRes->blocks = pRes;

      // For submit-type inputs, record this task's child id and the version
      // carried by the input on the output item.
      if (((SStreamQueueItem*)data)->type == STREAM_INPUT__DATA_SUBMIT) {
        SStreamDataSubmit* pSubmit = (SStreamDataSubmit*)data;
        qRes->childId = pTask->selfChildId;
        qRes->sourceVer = pSubmit->ver;
      } else if (((SStreamQueueItem*)data)->type == STREAM_INPUT__MERGED_SUBMIT) {
        SStreamMergedSubmit* pMerged = (SStreamMergedSubmit*)data;
        qRes->childId = pTask->selfChildId;
        qRes->sourceVer = pMerged->ver;
      }

      if (streamTaskOutput(pTask, qRes) < 0) {
        taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
        streamFreeQitem(data);
        taosFreeQitem(qRes);
        return -1;
      }
    } else {
      // The executor produced nothing; only the empty array has to go.
      taosArrayDestroy(pRes);
    }
    streamFreeQitem(data);
  }
  return 0;
}

L
Liu Jicong 已提交
286 287 288 289 290 291 292 293 294 295
// Try to run the task once. The work is done only when the compare-exchange on
// pTask->schedStatus reports TASK_SCHED_STATUS__WAITING as the value it found
// (requesting a switch to TASK_SCHED_STATUS__ACTIVE); otherwise this is a no-op.
//
// Returns -1 (status stored as FAILED) when streamExecForAll() fails, else 0.
int32_t streamTryExec(SStreamTask* pTask) {
  const int8_t prevStatus =
      atomic_val_compare_exchange_8(&pTask->schedStatus, TASK_SCHED_STATUS__WAITING, TASK_SCHED_STATUS__ACTIVE);
  if (prevStatus != TASK_SCHED_STATUS__WAITING) {
    return 0;
  }

  if (streamExecForAll(pTask) < 0) {
    atomic_store_8(&pTask->schedStatus, TASK_SCHED_STATUS__FAILED);
    return -1;
  }

  atomic_store_8(&pTask->schedStatus, TASK_SCHED_STATUS__INACTIVE);

  // Items may have been queued after the run drained the queue; schedule
  // another execution in that case.
  const bool hasPending = !taosQueueEmpty(pTask->inputQueue->queue);
  if (hasPending) {
    streamSchedExec(pTask);
  }

  return 0;
}