streamExec.c 8.2 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

L
Liu Jicong 已提交
16
#include "streamInc.h"
L
Liu Jicong 已提交
17 18 19 20 21

static int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes) {
  void* exec = pTask->exec.executor;

  // set input
22
  SStreamQueueItem* pItem = (SStreamQueueItem*)data;
L
Liu Jicong 已提交
23
  if (pItem->type == STREAM_INPUT__GET_RES) {
24
    SStreamTrigger* pTrigger = (SStreamTrigger*)data;
L
Liu Jicong 已提交
25
    qSetMultiStreamInput(exec, pTrigger->pBlock, 1, STREAM_INPUT__DATA_BLOCK, false);
26
  } else if (pItem->type == STREAM_INPUT__DATA_SUBMIT) {
27
    ASSERT(pTask->isDataScan);
L
Liu Jicong 已提交
28
    SStreamDataSubmit* pSubmit = (SStreamDataSubmit*)data;
L
Liu Jicong 已提交
29
    qDebug("task %d %p set submit input %p %p %d", pTask->taskId, pTask, pSubmit, pSubmit->data, *pSubmit->dataRef);
L
Liu Jicong 已提交
30
    qSetStreamInput(exec, pSubmit->data, STREAM_INPUT__DATA_SUBMIT, false);
31
  } else if (pItem->type == STREAM_INPUT__DATA_BLOCK || pItem->type == STREAM_INPUT__DATA_RETRIEVE) {
L
Liu Jicong 已提交
32
    SStreamDataBlock* pBlock = (SStreamDataBlock*)data;
33
    SArray*           blocks = pBlock->blocks;
L
Liu Jicong 已提交
34
    qDebug("task %d %p set ssdata input", pTask->taskId, pTask);
L
Liu Jicong 已提交
35
    qSetMultiStreamInput(exec, blocks->pData, blocks->size, STREAM_INPUT__DATA_BLOCK, false);
L
Liu Jicong 已提交
36 37 38
  } else if (pItem->type == STREAM_INPUT__DROP) {
    // TODO exec drop
    return 0;
L
Liu Jicong 已提交
39 40 41 42 43 44 45 46 47
  }

  // exec
  while (1) {
    SSDataBlock* output = NULL;
    uint64_t     ts = 0;
    if (qExecTask(exec, &output, &ts) < 0) {
      ASSERT(false);
    }
48
    if (output == NULL) {
5
54liuyao 已提交
49
      if (pItem->type == STREAM_INPUT__DATA_RETRIEVE) {
L
Liu Jicong 已提交
50
        SSDataBlock       block = {0};
5
54liuyao 已提交
51 52
        SStreamDataBlock* pRetrieveBlock = (SStreamDataBlock*)data;
        ASSERT(taosArrayGetSize(pRetrieveBlock->blocks) == 1);
L
Liu Jicong 已提交
53
        assignOneDataBlock(&block, taosArrayGet(pRetrieveBlock->blocks, 0));
L
Liu Jicong 已提交
54
        block.info.type = STREAM_PULL_OVER;
L
Liu Jicong 已提交
55 56
        block.info.childId = pTask->selfChildId;
        taosArrayPush(pRes, &block);
57 58 59
      }
      break;
    }
L
Liu Jicong 已提交
60 61 62 63 64 65 66 67

    if (output->info.type == STREAM_RETRIEVE) {
      if (streamBroadcastToChildren(pTask, output) < 0) {
        // TODO
      }
      continue;
    }

L
Liu Jicong 已提交
68 69 70 71
    SSDataBlock block = {0};
    assignOneDataBlock(&block, output);
    block.info.childId = pTask->selfChildId;
    taosArrayPush(pRes, &block);
L
Liu Jicong 已提交
72 73 74 75
  }
  return 0;
}

L
Liu Jicong 已提交
76 77 78 79 80 81 82 83 84
static FORCE_INLINE int32_t streamUpdateVer(SStreamTask* pTask, SStreamDataBlock* pBlock) {
  ASSERT(pBlock->type == STREAM_INPUT__DATA_BLOCK);
  int32_t             childId = pBlock->childId;
  int64_t             ver = pBlock->sourceVer;
  SStreamChildEpInfo* pChildInfo = taosArrayGetP(pTask->childEpInfo, childId);
  pChildInfo->processedVer = ver;
  return 0;
}

85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141
int32_t streamPipelineExec(SStreamTask* pTask, int32_t batchNum) {
  ASSERT(pTask->execType != TASK_EXEC__NONE);

  void* exec = pTask->exec.executor;

  while (1) {
    SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock));
    if (pRes == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
    }

    int32_t batchCnt = 0;
    while (1) {
      SSDataBlock* output = NULL;
      uint64_t     ts = 0;
      if (qExecTask(exec, &output, &ts) < 0) {
        ASSERT(0);
      }
      if (output == NULL) break;

      SSDataBlock block = {0};
      assignOneDataBlock(&block, output);
      block.info.childId = pTask->selfChildId;
      taosArrayPush(pRes, &block);

      if (++batchCnt >= batchNum) break;
    }
    if (taosArrayGetSize(pRes) == 0) {
      taosArrayDestroy(pRes);
      break;
    }
    SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM);
    if (qRes == NULL) {
      taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
      return -1;
    }

    qRes->type = STREAM_INPUT__DATA_BLOCK;
    qRes->blocks = pRes;
    qRes->childId = pTask->selfChildId;

    if (streamTaskOutput(pTask, qRes) < 0) {
      taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
      taosFreeQitem(qRes);
      return -1;
    }

    if (pTask->dispatchType != TASK_DISPATCH__NONE) {
      ASSERT(pTask->sinkType == TASK_SINK__NONE);
      streamDispatch(pTask, pTask->pMsgCb);
    }
  }

  return 0;
}

L
Liu Jicong 已提交
142
static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) {
143 144
  int32_t cnt = 0;
  void*   data = NULL;
L
Liu Jicong 已提交
145
  while (1) {
146 147 148 149 150 151 152
    SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputQueue);
    if (qItem == NULL) {
      qDebug("stream exec over, queue empty");
      break;
    }
    if (data == NULL) {
      data = qItem;
L
Liu Jicong 已提交
153 154 155
      if (qItem->type == STREAM_INPUT__DATA_BLOCK) {
        /*streamUpdateVer(pTask, (SStreamDataBlock*)qItem);*/
      }
156 157 158 159
      streamQueueProcessSuccess(pTask->inputQueue);
    } else {
      if (streamAppendQueueItem(data, qItem) < 0) {
        streamQueueProcessFail(pTask->inputQueue);
L
Liu Jicong 已提交
160 161
        break;
      } else {
162
        cnt++;
L
Liu Jicong 已提交
163
        /*streamUpdateVer(pTask, (SStreamDataBlock*)qItem);*/
164 165 166
        streamQueueProcessSuccess(pTask->inputQueue);
        taosArrayDestroy(((SStreamDataBlock*)qItem)->blocks);
        taosFreeQitem(qItem);
L
Liu Jicong 已提交
167 168
      }
    }
169 170 171
  }
  if (pTask->taskStatus == TASK_STATUS__DROPPING) {
    if (data) streamFreeQitem(data);
L
Liu Jicong 已提交
172
    taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
173 174
    return NULL;
  }
L
Liu Jicong 已提交
175

176
  if (data == NULL) return pRes;
L
Liu Jicong 已提交
177

L
Liu Jicong 已提交
178 179 180 181 182 183
  if (pTask->execType == TASK_EXEC__NONE) {
    ASSERT(((SStreamQueueItem*)data)->type == STREAM_INPUT__DATA_BLOCK);
    streamTaskOutput(pTask, data);
    return pRes;
  }

184 185 186 187 188 189 190 191 192
  qDebug("stream task %d exec begin, msg batch: %d", pTask->taskId, cnt);
  streamTaskExecImpl(pTask, data, pRes);
  qDebug("stream task %d exec end", pTask->taskId);

  if (taosArrayGetSize(pRes) != 0) {
    SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM);
    if (qRes == NULL) {
      streamQueueProcessFail(pTask->inputQueue);
      taosArrayDestroy(pRes);
L
Liu Jicong 已提交
193 194
      return NULL;
    }
195 196 197 198
    qRes->type = STREAM_INPUT__DATA_BLOCK;
    qRes->blocks = pRes;
    if (streamTaskOutput(pTask, qRes) < 0) {
      /*streamQueueProcessFail(pTask->inputQueue);*/
L
Liu Jicong 已提交
199
      taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
200 201
      taosFreeQitem(qRes);
      return NULL;
L
Liu Jicong 已提交
202
    }
L
Liu Jicong 已提交
203 204 205 206 207
    if (((SStreamQueueItem*)data)->type == STREAM_INPUT__DATA_SUBMIT) {
      SStreamDataSubmit* pSubmit = (SStreamDataSubmit*)data;
      qRes->childId = pTask->selfChildId;
      qRes->sourceVer = pSubmit->ver;
    }
208 209
    /*streamQueueProcessSuccess(pTask->inputQueue);*/
    pRes = taosArrayInit(0, sizeof(SSDataBlock));
L
Liu Jicong 已提交
210
  }
211 212

  streamFreeQitem(data);
L
Liu Jicong 已提交
213 214 215 216 217 218 219 220
  return pRes;
}

// TODO: handle version
int32_t streamExec(SStreamTask* pTask, SMsgCb* pMsgCb) {
  SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock));
  if (pRes == NULL) return -1;
  while (1) {
L
Liu Jicong 已提交
221 222 223
    int8_t execStatus =
        atomic_val_compare_exchange_8(&pTask->execStatus, TASK_EXEC_STATUS__IDLE, TASK_EXEC_STATUS__EXECUTING);
    if (execStatus == TASK_EXEC_STATUS__IDLE) {
L
Liu Jicong 已提交
224
      // first run
L
Liu Jicong 已提交
225
      qDebug("stream exec, enter exec status");
L
Liu Jicong 已提交
226 227 228 229
      pRes = streamExecForQall(pTask, pRes);
      if (pRes == NULL) goto FAIL;

      // set status closing
L
Liu Jicong 已提交
230
      atomic_store_8(&pTask->execStatus, TASK_EXEC_STATUS__CLOSING);
L
Liu Jicong 已提交
231 232

      // second run, make sure inputQ and qall are cleared
L
Liu Jicong 已提交
233
      qDebug("stream exec, enter closing status");
L
Liu Jicong 已提交
234 235 236
      pRes = streamExecForQall(pTask, pRes);
      if (pRes == NULL) goto FAIL;

237
      taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
L
Liu Jicong 已提交
238
      atomic_store_8(&pTask->execStatus, TASK_EXEC_STATUS__IDLE);
L
Liu Jicong 已提交
239
      qDebug("stream exec, return result");
L
Liu Jicong 已提交
240
      return 0;
L
Liu Jicong 已提交
241
    } else if (execStatus == TASK_EXEC_STATUS__CLOSING) {
L
Liu Jicong 已提交
242
      continue;
L
Liu Jicong 已提交
243
    } else if (execStatus == TASK_EXEC_STATUS__EXECUTING) {
L
Liu Jicong 已提交
244
      ASSERT(taosArrayGetSize(pRes) == 0);
245
      taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
L
Liu Jicong 已提交
246
      return 0;
L
Liu Jicong 已提交
247 248 249 250 251 252
    } else {
      ASSERT(0);
    }
  }
FAIL:
  if (pRes) taosArrayDestroy(pRes);
253 254 255 256 257 258 259
  if (pTask->taskStatus == TASK_STATUS__DROPPING) {
    tFreeSStreamTask(pTask);
    return 0;
  } else {
    atomic_store_8(&pTask->execStatus, TASK_EXEC_STATUS__IDLE);
    return -1;
  }
L
Liu Jicong 已提交
260
}