streamExec.c 8.9 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

L
Liu Jicong 已提交
16
#include "streamInc.h"
L
Liu Jicong 已提交
17 18 19 20 21

/*
 * Run the task's executor over one input queue item and collect the results.
 *
 * The item is first bound to the executor with qSetMultiStreamInput(); how it
 * is bound depends on the item type. The executor is then polled with
 * qExecTask() until it yields no more output:
 *   - STREAM_RETRIEVE outputs are handed to streamBroadcastToChildren() and
 *     are not appended to pRes;
 *   - every other output is copied with assignOneDataBlock(), stamped with
 *     the task's selfChildId and pushed into pRes.
 * When the input item is a STREAM_INPUT__DATA_RETRIEVE, a copy of its single
 * block, retyped as STREAM_PULL_OVER, is appended once the executor is drained.
 *
 * pTask: task whose executor is driven.
 * data:  queue item; its concrete struct is selected by SStreamQueueItem.type.
 * pRes:  array of SSDataBlock that receives the copied result blocks.
 *
 * Returns 0 (failures are only reported through ASSERT).
 */
static int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes) {
  void* exec = pTask->exec.executor;

  // set input
  SStreamQueueItem* pItem = (SStreamQueueItem*)data;
  if (pItem->type == STREAM_INPUT__GET_RES) {
    SStreamTrigger* pTrigger = (SStreamTrigger*)data;
    qSetMultiStreamInput(exec, pTrigger->pBlock, 1, STREAM_INPUT__DATA_BLOCK);
  } else if (pItem->type == STREAM_INPUT__DATA_SUBMIT) {
    ASSERT(pTask->isDataScan);
    SStreamDataSubmit* pSubmit = (SStreamDataSubmit*)data;
    qDebug("task %d %p set submit input %p %p %d 1", pTask->taskId, pTask, pSubmit, pSubmit->data, *pSubmit->dataRef);
    qSetMultiStreamInput(exec, pSubmit->data, 1, STREAM_INPUT__DATA_SUBMIT);
  } else if (pItem->type == STREAM_INPUT__DATA_BLOCK || pItem->type == STREAM_INPUT__DATA_RETRIEVE) {
    SStreamDataBlock* pBlock = (SStreamDataBlock*)data;
    SArray*           blocks = pBlock->blocks;
    qDebug("task %d %p set ssdata input", pTask->taskId, pTask);
    qSetMultiStreamInput(exec, blocks->pData, blocks->size, STREAM_INPUT__DATA_BLOCK);
  } else if (pItem->type == STREAM_INPUT__MERGED_SUBMIT) {
    SStreamMergedSubmit* pMerged = (SStreamMergedSubmit*)data;
    SArray*              blocks = pMerged->reqs;
    qDebug("task %d %p set submit input (merged), batch num: %d", pTask->taskId, pTask, (int32_t)blocks->size);
    qSetMultiStreamInput(exec, blocks->pData, blocks->size, STREAM_INPUT__MERGED_SUBMIT);
  } else {
    // unknown input type
    ASSERT(0);
  }

  // exec
  while (1) {
    SSDataBlock* output = NULL;
    uint64_t     ts = 0;
    if (qExecTask(exec, &output, &ts) < 0) {
      ASSERT(false);
    }
    if (output == NULL) {
      // executor drained: a retrieve request is answered with a PULL_OVER marker block
      if (pItem->type == STREAM_INPUT__DATA_RETRIEVE) {
        SSDataBlock       block = {0};
        SStreamDataBlock* pRetrieveBlock = (SStreamDataBlock*)data;
        ASSERT(taosArrayGetSize(pRetrieveBlock->blocks) == 1);
        assignOneDataBlock(&block, taosArrayGet(pRetrieveBlock->blocks, 0));
        block.info.type = STREAM_PULL_OVER;
        block.info.childId = pTask->selfChildId;
        taosArrayPush(pRes, &block);

        // NOTE(review): reqId is printed with %ld; confirm this matches its declared type
        qDebug("task %d(child %d) processed retrieve, reqId %ld", pTask->taskId, pTask->selfChildId,
               pRetrieveBlock->reqId);
      }
      break;
    }

    if (output->info.type == STREAM_RETRIEVE) {
      // retrieve requests are forwarded instead of being collected as results
      if (streamBroadcastToChildren(pTask, output) < 0) {
        // TODO
      }
      continue;
    }

    // the format has two %d conversions, so both ids must be supplied
    qDebug("task %d(child %d) executed and get block", pTask->taskId, pTask->selfChildId);

    SSDataBlock block = {0};
    assignOneDataBlock(&block, output);
    block.info.childId = pTask->selfChildId;
    taosArrayPush(pRes, &block);
  }
  return 0;
}

L
Liu Jicong 已提交
85 86 87 88 89 90 91 92 93
/*
 * Store the source version carried by pBlock as the processed version of the
 * child endpoint it came from. The endpoint entry is looked up in
 * pTask->childEpInfo by the block's childId.
 *
 * pBlock must be a STREAM_INPUT__DATA_BLOCK item (asserted).
 * Always returns 0.
 */
static FORCE_INLINE int32_t streamUpdateVer(SStreamTask* pTask, SStreamDataBlock* pBlock) {
  ASSERT(pBlock->type == STREAM_INPUT__DATA_BLOCK);

  int64_t srcVer = pBlock->sourceVer;
  int32_t srcChild = pBlock->childId;

  SStreamChildEpInfo* pEpInfo = taosArrayGetP(pTask->childEpInfo, srcChild);
  pEpInfo->processedVer = srcVer;

  return 0;
}

94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143
/*
 * Drain the task's executor in batches and push each batch downstream.
 *
 * Each round polls qExecTask() until it yields no output or batchNum blocks
 * have been collected (at least one poll is always made). A round that
 * collects nothing ends the function. Otherwise the collected blocks are
 * wrapped in a SStreamDataBlock queue item, passed to streamTaskOutput(), and
 * streamDispatch() is called when the task has a dispatch type.
 *
 * pTask:    task to run; its execType must not be TASK_EXEC__NONE (asserted).
 * batchNum: block count at which a round stops collecting.
 *
 * Returns 0 once the executor yields nothing, -1 on allocation or output
 * failure.
 */
int32_t streamPipelineExec(SStreamTask* pTask, int32_t batchNum) {
  ASSERT(pTask->execType != TASK_EXEC__NONE);

  void* pExecutor = pTask->exec.executor;

  for (;;) {
    SArray* pBatch = taosArrayInit(0, sizeof(SSDataBlock));
    if (pBatch == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
    }

    // collect blocks until the executor is empty or the batch limit is hit
    int32_t nCollected = 0;
    do {
      SSDataBlock* pOut = NULL;
      uint64_t     ts = 0;
      if (qExecTask(pExecutor, &pOut, &ts) < 0) {
        ASSERT(0);
      }
      if (pOut == NULL) {
        break;
      }

      SSDataBlock copied = {0};
      assignOneDataBlock(&copied, pOut);
      copied.info.childId = pTask->selfChildId;
      taosArrayPush(pBatch, &copied);
    } while (++nCollected < batchNum);

    // nothing produced in this round: the pipeline is finished
    if (taosArrayGetSize(pBatch) == 0) {
      taosArrayDestroy(pBatch);
      return 0;
    }

    SStreamDataBlock* pOutItem = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM);
    if (pOutItem == NULL) {
      taosArrayDestroyEx(pBatch, (FDelete)blockDataFreeRes);
      return -1;
    }

    pOutItem->type = STREAM_INPUT__DATA_BLOCK;
    pOutItem->blocks = pBatch;
    pOutItem->childId = pTask->selfChildId;

    if (streamTaskOutput(pTask, pOutItem) < 0) {
      taosArrayDestroyEx(pBatch, (FDelete)blockDataFreeRes);
      taosFreeQitem(pOutItem);
      return -1;
    }

    if (pTask->dispatchType == TASK_DISPATCH__NONE) {
      continue;
    }
    ASSERT(pTask->sinkType == TASK_SINK__NONE);
    streamDispatch(pTask);
  }
}

L
Liu Jicong 已提交
151 152
/*
 * Process everything currently available in the task's input queue.
 *
 * Each round:
 *   1. pulls items from pTask->inputQueue, merging consecutive items with
 *      streamAppendQueueItem() until the queue is empty or an item cannot be
 *      merged (that item is marked failed on the queue);
 *   2. for tasks without an executor (TASK_EXEC__NONE) passes the item
 *      straight to streamTaskOutput();
 *   3. otherwise runs streamTaskExecImpl() and, if any result blocks were
 *      produced, wraps them in a SStreamDataBlock item and outputs it, then
 *      continues with a fresh result array.
 *
 * pTask: task being executed.
 * pRes:  empty SSDataBlock array used to collect results; this function
 *        takes it over and may replace it.
 *
 * Returns the (possibly replaced) result array for the caller to reuse or
 * free, or NULL when the task is being dropped or an error occurred. In the
 * NULL case the result array and the in-flight input item have already been
 * released.
 */
static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) {
  while (1) {
    int32_t cnt = 1;
    void*   data = NULL;

    // gather one batch: the first item plus every following item that can be merged into it
    while (1) {
      SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputQueue);
      if (qItem == NULL) {
        qDebug("stream exec over, queue empty");
        break;
      }
      if (data == NULL) {
        data = qItem;
        streamQueueProcessSuccess(pTask->inputQueue);
        // pass-through tasks forward items one at a time, without merging
        if (pTask->execType == TASK_EXEC__NONE) break;
        // TODO: per-child version tracking (streamUpdateVer) is not enabled here
      } else {
        void* newRet;
        if ((newRet = streamAppendQueueItem(data, qItem)) == NULL) {
          // not mergeable: leave it for the next round
          streamQueueProcessFail(pTask->inputQueue);
          break;
        } else {
          cnt++;
          data = newRet;
          streamQueueProcessSuccess(pTask->inputQueue);
        }
      }
    }

    if (pTask->taskStatus == TASK_STATUS__DROPPING) {
      if (data) streamFreeQitem(data);
      taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
      return NULL;
    }

    if (data == NULL) break;

    if (pTask->execType == TASK_EXEC__NONE) {
      ASSERT(((SStreamQueueItem*)data)->type == STREAM_INPUT__DATA_BLOCK);
      // the item itself is handed downstream, so it is not freed here
      streamTaskOutput(pTask, data);
      continue;
    }

    qDebug("stream task %d exec begin, msg batch: %d", pTask->taskId, cnt);
    streamTaskExecImpl(pTask, data, pRes);
    qDebug("stream task %d exec end", pTask->taskId);

    if (taosArrayGetSize(pRes) != 0) {
      SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM);
      if (qRes == NULL) {
        // TODO log failed ver
        // NOTE(review): the input item was already acknowledged above; confirm that
        // marking the queue as failed at this point is intended.
        streamQueueProcessFail(pTask->inputQueue);
        // pRes holds copied blocks, so their contents must be released too
        taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
        streamFreeQitem(data);
        return NULL;
      }
      qRes->type = STREAM_INPUT__DATA_BLOCK;
      qRes->blocks = pRes;

      // Fill in every field before the item is handed to streamTaskOutput();
      // it must not be written to after the hand-off.
      if (((SStreamQueueItem*)data)->type == STREAM_INPUT__DATA_SUBMIT) {
        SStreamDataSubmit* pSubmit = (SStreamDataSubmit*)data;
        qRes->childId = pTask->selfChildId;
        qRes->sourceVer = pSubmit->ver;
      }

      if (streamTaskOutput(pTask, qRes) < 0) {
        // TODO log failed ver
        taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
        taosFreeQitem(qRes);
        streamFreeQitem(data);
        return NULL;
      }

      // the previous array now belongs to qRes; start a new one for the next round
      pRes = taosArrayInit(0, sizeof(SSDataBlock));
      if (pRes == NULL) {
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        streamFreeQitem(data);
        return NULL;
      }
    }

    streamFreeQitem(data);
  }
  return pRes;
}

// TODO: handle version
L
Liu Jicong 已提交
232
/*
 * Entry point for executing a stream task.
 *
 * The caller tries to move pTask->execStatus from IDLE to EXECUTING with a
 * compare-and-exchange:
 *   - previous state IDLE: this caller runs streamExecForQall() once, stores
 *     CLOSING, runs it a second time, then stores IDLE and returns 0;
 *   - previous state CLOSING: the exchange is retried;
 *   - previous state EXECUTING: nothing is run here; the scratch array is
 *     released and 0 is returned.
 *
 * If streamExecForQall() returns NULL, the task is freed with
 * tFreeSStreamTask() and 0 is returned when it is in TASK_STATUS__DROPPING;
 * otherwise execStatus is reset to IDLE and -1 is returned. -1 is also
 * returned when the initial result array cannot be allocated.
 */
int32_t streamExec(SStreamTask* pTask) {
  SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock));
  if (pRes == NULL) {
    return -1;
  }

  for (;;) {
    int8_t prevStatus =
        atomic_val_compare_exchange_8(&pTask->execStatus, TASK_EXEC_STATUS__IDLE, TASK_EXEC_STATUS__EXECUTING);

    if (prevStatus == TASK_EXEC_STATUS__CLOSING) {
      // whoever is executing is in its final pass; try again
      continue;
    }

    if (prevStatus == TASK_EXEC_STATUS__EXECUTING) {
      // already being executed elsewhere: nothing to do for this caller
      ASSERT(taosArrayGetSize(pRes) == 0);
      taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
      return 0;
    }

    if (prevStatus != TASK_EXEC_STATUS__IDLE) {
      // no other state is expected
      ASSERT(0);
      continue;
    }

    // first pass
    qDebug("stream exec, enter exec status");
    pRes = streamExecForQall(pTask, pRes);
    if (pRes == NULL) {
      goto EXEC_FAILED;
    }

    atomic_store_8(&pTask->execStatus, TASK_EXEC_STATUS__CLOSING);

    // second pass, make sure inputQ and qall are cleared
    qDebug("stream exec, enter closing status");
    pRes = streamExecForQall(pTask, pRes);
    if (pRes == NULL) {
      goto EXEC_FAILED;
    }

    taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
    atomic_store_8(&pTask->execStatus, TASK_EXEC_STATUS__IDLE);
    qDebug("stream exec, return result");
    return 0;
  }

EXEC_FAILED:
  if (pRes) taosArrayDestroy(pRes);

  if (pTask->taskStatus != TASK_STATUS__DROPPING) {
    atomic_store_8(&pTask->execStatus, TASK_EXEC_STATUS__IDLE);
    return -1;
  }

  tFreeSStreamTask(pTask);
  return 0;
}