streamExec.c 8.6 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

L
Liu Jicong 已提交
16
#include "streamInc.h"
L
Liu Jicong 已提交
17 18 19 20 21

/*
 * Hand one input item to the task's executor and drain the executor,
 * appending a copy of every produced block to pRes.
 *
 * pTask : task whose executor (pTask->exec.executor) is driven.
 * data  : the input item; its concrete layout is selected by the leading
 *         SStreamQueueItem type tag.
 * pRes  : array of SSDataBlock that receives the copied output blocks.
 *
 * Returns 0. A STREAM_INPUT__DROP item returns immediately without running
 * the executor.
 */
static int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes) {
  void* exec = pTask->exec.executor;

  // set input
  SStreamQueueItem* pItem = (SStreamQueueItem*)data;
  if (pItem->type == STREAM_INPUT__GET_RES) {
    SStreamTrigger* pTrigger = (SStreamTrigger*)data;
    qSetMultiStreamInput(exec, pTrigger->pBlock, 1, STREAM_INPUT__DATA_BLOCK, false);
  } else if (pItem->type == STREAM_INPUT__DATA_SUBMIT) {
    ASSERT(pTask->isDataScan);
    SStreamDataSubmit* pSubmit = (SStreamDataSubmit*)data;
    qDebug("task %d %p set submit input %p %p %d", pTask->taskId, pTask, pSubmit, pSubmit->data, *pSubmit->dataRef);
    qSetStreamInput(exec, pSubmit->data, STREAM_INPUT__DATA_SUBMIT, false);
  } else if (pItem->type == STREAM_INPUT__DATA_BLOCK || pItem->type == STREAM_INPUT__DATA_RETRIEVE) {
    SStreamDataBlock* pBlock = (SStreamDataBlock*)data;
    SArray*           blocks = pBlock->blocks;
    qDebug("task %d %p set ssdata input", pTask->taskId, pTask);
    qSetMultiStreamInput(exec, blocks->pData, blocks->size, STREAM_INPUT__DATA_BLOCK, false);
  } else if (pItem->type == STREAM_INPUT__DROP) {
    // TODO exec drop
    return 0;
  }

  // exec
  while (1) {
    SSDataBlock* output = NULL;
    uint64_t     ts = 0;
    if (qExecTask(exec, &output, &ts) < 0) {
      ASSERT(false);
    }

    if (output == NULL) {
      // Executor is drained. For a retrieve request, emit one extra block
      // tagged STREAM_PULL_OVER, copied from the single request block.
      if (pItem->type == STREAM_INPUT__DATA_RETRIEVE) {
        SSDataBlock       block = {0};
        SStreamDataBlock* pRetrieveBlock = (SStreamDataBlock*)data;
        ASSERT(taosArrayGetSize(pRetrieveBlock->blocks) == 1);
        assignOneDataBlock(&block, taosArrayGet(pRetrieveBlock->blocks, 0));
        block.info.type = STREAM_PULL_OVER;
        block.info.childId = pTask->selfChildId;
        taosArrayPush(pRes, &block);

        // The explicit cast keeps the argument type in step with %lld
        // regardless of the platform's width of long.
        qDebug("task %d(child %d) processed retrieve, reqId %lld", pTask->taskId, pTask->selfChildId,
               (long long)pRetrieveBlock->reqId);
      }
      break;
    }

    // Retrieve blocks are forwarded to the children rather than collected.
    if (output->info.type == STREAM_RETRIEVE) {
      if (streamBroadcastToChildren(pTask, output) < 0) {
        // TODO
      }
      continue;
    }

    SSDataBlock block = {0};
    assignOneDataBlock(&block, output);
    block.info.childId = pTask->selfChildId;
    taosArrayPush(pRes, &block);
  }
  return 0;
}

L
Liu Jicong 已提交
79 80 81 82 83 84 85 86 87
/*
 * Record the source version carried by pBlock as the processed version of
 * the child it came from.
 *
 * The child entry is looked up in pTask->childEpInfo by pBlock->childId.
 * pBlock must be a STREAM_INPUT__DATA_BLOCK item (asserted). Always returns 0.
 */
static FORCE_INLINE int32_t streamUpdateVer(SStreamTask* pTask, SStreamDataBlock* pBlock) {
  ASSERT(pBlock->type == STREAM_INPUT__DATA_BLOCK);

  int64_t srcVer = pBlock->sourceVer;
  int32_t childIdx = pBlock->childId;

  SStreamChildEpInfo* pEpInfo = taosArrayGetP(pTask->childEpInfo, childIdx);
  pEpInfo->processedVer = srcVer;

  return 0;
}

88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144
/*
 * Repeatedly drain the task's executor in batches and push each batch to the
 * task output.
 *
 * Each round collects up to batchNum output blocks (at least one block is
 * collected before the limit is checked) into a fresh array, wraps the array
 * in a STREAM_INPUT__DATA_BLOCK queue item and passes it to streamTaskOutput.
 * The function stops when a round yields no blocks.
 *
 * Returns 0 on completion, -1 if an allocation or the output step fails.
 */
int32_t streamPipelineExec(SStreamTask* pTask, int32_t batchNum) {
  ASSERT(pTask->execType != TASK_EXEC__NONE);

  void* pExecutor = pTask->exec.executor;

  for (;;) {
    SArray* pBatch = taosArrayInit(0, sizeof(SSDataBlock));
    if (pBatch == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
    }

    // Collect blocks until the executor is empty or the batch is full.
    int32_t nCollected = 0;
    do {
      SSDataBlock* pOut = NULL;
      uint64_t     ts = 0;
      if (qExecTask(pExecutor, &pOut, &ts) < 0) {
        ASSERT(0);
      }
      if (pOut == NULL) {
        break;
      }

      SSDataBlock copy = {0};
      assignOneDataBlock(&copy, pOut);
      copy.info.childId = pTask->selfChildId;
      taosArrayPush(pBatch, &copy);

      nCollected++;
    } while (nCollected < batchNum);

    // Nothing produced in this round: the pipeline is done.
    if (taosArrayGetSize(pBatch) == 0) {
      taosArrayDestroy(pBatch);
      return 0;
    }

    SStreamDataBlock* pOutItem = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM);
    if (pOutItem == NULL) {
      taosArrayDestroyEx(pBatch, (FDelete)blockDataFreeRes);
      return -1;
    }

    pOutItem->type = STREAM_INPUT__DATA_BLOCK;
    pOutItem->blocks = pBatch;
    pOutItem->childId = pTask->selfChildId;

    if (streamTaskOutput(pTask, pOutItem) < 0) {
      taosArrayDestroyEx(pBatch, (FDelete)blockDataFreeRes);
      taosFreeQitem(pOutItem);
      return -1;
    }

    if (pTask->dispatchType != TASK_DISPATCH__NONE) {
      ASSERT(pTask->sinkType == TASK_SINK__NONE);
      streamDispatch(pTask, pTask->pMsgCb);
    }
  }
}

L
Liu Jicong 已提交
145 146
/*
 * Consume the task's input queue until it is empty, executing each batch of
 * input and forwarding the results to the task output.
 *
 * pTask : task whose inputQueue is drained.
 * pRes  : empty result array to collect into. Ownership passes to this
 *         function; whenever results are handed to the output a fresh array
 *         takes its place.
 *
 * Returns the (possibly replaced) result array, which the caller owns, or
 * NULL when the task is being dropped or a step failed. On a NULL return the
 * result array has already been released.
 */
static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) {
  while (1) {
    int32_t cnt = 0;
    void*   data = NULL;

    // Take the next item; consecutive items are merged into it only while
    // the first one is a STREAM_INPUT__DATA_BLOCK.
    while (1) {
      SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputQueue);
      if (qItem == NULL) {
        qDebug("stream exec over, queue empty");
        break;
      }

      if (data == NULL) {
        data = qItem;
        streamQueueProcessSuccess(pTask->inputQueue);
        // TODO: streamUpdateVer for data-block items
        if (qItem->type != STREAM_INPUT__DATA_BLOCK) {
          break;
        }
      } else {
        if (streamAppendQueueItem(data, qItem) < 0) {
          streamQueueProcessFail(pTask->inputQueue);
          break;
        }
        cnt++;
        // TODO: streamUpdateVer for the merged item
        streamQueueProcessSuccess(pTask->inputQueue);
        // The merged item's container and shell are released here.
        taosArrayDestroy(((SStreamDataBlock*)qItem)->blocks);
        taosFreeQitem(qItem);
      }
    }

    if (pTask->taskStatus == TASK_STATUS__DROPPING) {
      if (data) streamFreeQitem(data);
      taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
      return NULL;
    }

    if (data == NULL) break;

    if (pTask->execType == TASK_EXEC__NONE) {
      ASSERT(((SStreamQueueItem*)data)->type == STREAM_INPUT__DATA_BLOCK);
      streamTaskOutput(pTask, data);
      return pRes;
    }

    qDebug("stream task %d exec begin, msg batch: %d", pTask->taskId, cnt);
    streamTaskExecImpl(pTask, data, pRes);
    qDebug("stream task %d exec end", pTask->taskId);

    if (taosArrayGetSize(pRes) != 0) {
      SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM);
      if (qRes == NULL) {
        streamQueueProcessFail(pTask->inputQueue);
        // pRes holds populated blocks here, so their payload must be
        // released too, and the input item is no longer owned by the queue.
        taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
        streamFreeQitem(data);
        return NULL;
      }
      qRes->type = STREAM_INPUT__DATA_BLOCK;
      qRes->blocks = pRes;

      // Fill in every field before handing qRes over: once streamTaskOutput
      // has accepted the item it must not be touched again.
      if (((SStreamQueueItem*)data)->type == STREAM_INPUT__DATA_SUBMIT) {
        SStreamDataSubmit* pSubmit = (SStreamDataSubmit*)data;
        qRes->childId = pTask->selfChildId;
        qRes->sourceVer = pSubmit->ver;
      }

      if (streamTaskOutput(pTask, qRes) < 0) {
        taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
        taosFreeQitem(qRes);
        streamFreeQitem(data);
        return NULL;
      }

      // The previous array now belongs to the output; start a new one.
      pRes = taosArrayInit(0, sizeof(SSDataBlock));
      if (pRes == NULL) {
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        streamFreeQitem(data);
        return NULL;
      }
    }

    streamFreeQitem(data);
  }
  return pRes;
}

// TODO: handle version
/*
 * Entry point for running a stream task over its pending input.
 *
 * The task's execStatus is moved IDLE -> EXECUTING with a compare-and-swap.
 * The caller that wins drains the input twice (once as EXECUTING, once after
 * switching to CLOSING) and then returns the status to IDLE. A caller that
 * observes EXECUTING returns immediately; one that observes CLOSING retries.
 *
 * Returns 0 on success or when the task was dropped (the task is freed in
 * that case), -1 when draining failed.
 */
int32_t streamExec(SStreamTask* pTask, SMsgCb* pMsgCb) {
  SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock));
  if (pRes == NULL) return -1;

  for (;;) {
    int8_t prevStatus =
        atomic_val_compare_exchange_8(&pTask->execStatus, TASK_EXEC_STATUS__IDLE, TASK_EXEC_STATUS__EXECUTING);

    if (prevStatus == TASK_EXEC_STATUS__CLOSING) {
      // Another caller is finishing up; try again.
      continue;
    }

    if (prevStatus == TASK_EXEC_STATUS__EXECUTING) {
      // Another caller is already draining the queue.
      ASSERT(taosArrayGetSize(pRes) == 0);
      taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
      return 0;
    }

    if (prevStatus != TASK_EXEC_STATUS__IDLE) {
      ASSERT(0);
      continue;
    }

    // first run
    qDebug("stream exec, enter exec status");
    pRes = streamExecForQall(pTask, pRes);
    if (pRes == NULL) goto FAIL;

    // set status closing
    atomic_store_8(&pTask->execStatus, TASK_EXEC_STATUS__CLOSING);

    // second run, make sure inputQ and qall are cleared
    qDebug("stream exec, enter closing status");
    pRes = streamExecForQall(pTask, pRes);
    if (pRes == NULL) goto FAIL;

    taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
    atomic_store_8(&pTask->execStatus, TASK_EXEC_STATUS__IDLE);
    qDebug("stream exec, return result");
    return 0;
  }

FAIL:
  if (pRes) taosArrayDestroy(pRes);

  if (pTask->taskStatus == TASK_STATUS__DROPPING) {
    tFreeSStreamTask(pTask);
    return 0;
  }

  atomic_store_8(&pTask->execStatus, TASK_EXEC_STATUS__IDLE);
  return -1;
}