streamExec.c 5.8 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

L
Liu Jicong 已提交
16
#include "streamInc.h"
L
Liu Jicong 已提交
17 18 19

/*
 * Feeds one queued input item to the task's executor and appends a copy of
 * every block the executor produces to pRes.
 *
 * pTask - task whose executor (pTask->exec.executor) is driven
 * data  - queue item; it is first read as an SStreamQueueItem and its type
 *         decides which concrete item struct it is cast to
 * pRes  - array that receives SSDataBlock values (pushed by value)
 *
 * Returns 0 on every path. A STREAM_INPUT__DROP item is currently a no-op.
 */
static int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes) {
  void*             exec = pTask->exec.executor;
  SStreamQueueItem* pItem = (SStreamQueueItem*)data;

  // set input
  if (pItem->type == STREAM_INPUT__TRIGGER) {
    SStreamTrigger* pTrigger = (SStreamTrigger*)data;
    qSetMultiStreamInput(exec, pTrigger->pBlock, 1, STREAM_DATA_TYPE_SSDATA_BLOCK, false);
  } else if (pItem->type == STREAM_INPUT__DATA_SUBMIT) {
    ASSERT(pTask->isDataScan);
    SStreamDataSubmit* pSubmit = (SStreamDataSubmit*)data;
    qSetStreamInput(exec, pSubmit->data, STREAM_DATA_TYPE_SUBMIT_BLOCK, false);
  } else if (pItem->type == STREAM_INPUT__DATA_BLOCK || pItem->type == STREAM_INPUT__DATA_RETRIEVE) {
    SStreamDataBlock* pBlock = (SStreamDataBlock*)data;
    SArray*           blocks = pBlock->blocks;
    qSetMultiStreamInput(exec, blocks->pData, blocks->size, STREAM_DATA_TYPE_SSDATA_BLOCK, false);
  } else if (pItem->type == STREAM_INPUT__DROP) {
    // TODO exec drop
    return 0;
  }

  // exec: pull blocks until the executor reports no more output
  while (1) {
    SSDataBlock* output = NULL;
    uint64_t     ts = 0;
    if (qExecTask(exec, &output, &ts) < 0) {
      ASSERT(false);
    }

    if (output == NULL) {
      // For a retrieve request, finish with a copy of the single request block,
      // retagged as STREAM_PUSH_EMPTY and stamped with this task's child id.
      if (pItem->type == STREAM_INPUT__DATA_RETRIEVE) {
        SSDataBlock       emptyBlock = {0};
        SStreamDataBlock* pRetrieveBlock = (SStreamDataBlock*)data;
        ASSERT(taosArrayGetSize(pRetrieveBlock->blocks) == 1);
        assignOneDataBlock(&emptyBlock, taosArrayGet(pRetrieveBlock->blocks, 0));
        emptyBlock.info.type = STREAM_PUSH_EMPTY;
        emptyBlock.info.childId = pTask->selfChildId;
        taosArrayPush(pRes, &emptyBlock);
      }
      break;
    }

    // Retrieve blocks are broadcast to the children and are not added to pRes.
    if (output->info.type == STREAM_RETRIEVE) {
      if (streamBroadcastToChildren(pTask, output) < 0) {
        // TODO
      }
      continue;
    }

    // TODO: do we need free memory?
    SSDataBlock block = {0};
    assignOneDataBlock(&block, output);
    block.info.childId = pTask->selfChildId;
    taosArrayPush(pRes, &block);
  }
  return 0;
}

/*
 * Processes items from pTask->inputQueue until streamQueueNextItem returns
 * NULL. Each item is run through streamTaskExecImpl; whenever pRes is
 * non-empty afterwards it is wrapped in an SStreamDataBlock queue item and
 * handed to streamTaskOutput, and a fresh result array is started.
 *
 * pTask - task whose input queue is drained
 * pRes  - result array of SSDataBlock to collect into; must not be NULL
 *
 * Returns the current (possibly replaced) result array on success. Returns
 * NULL when the task is dropping, when an allocation fails, or when
 * streamTaskOutput fails; in those cases the result array has already been
 * destroyed and the caller must not free it again.
 */
static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) {
  while (1) {
    void* data = streamQueueNextItem(pTask->inputQueue);
    if (data == NULL) break;

    streamTaskExecImpl(pTask, data, pRes);

    if (pTask->taskStatus == TASK_STATUS__DROPPING) {
      taosArrayDestroyEx(pRes, (FDelete)tDeleteSSDataBlock);
      return NULL;
    }

    if (taosArrayGetSize(pRes) != 0) {
      // NOTE(review): on the two failure paths below the input item is not
      // freed here; presumably the queue keeps it after
      // streamQueueProcessFail - confirm.
      SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM);
      if (qRes == NULL) {
        streamQueueProcessFail(pTask->inputQueue);
        // Free the blocks' contents too, as every other error path here does;
        // a plain taosArrayDestroy would release only the array itself.
        taosArrayDestroyEx(pRes, (FDelete)tDeleteSSDataBlock);
        return NULL;
      }
      qRes->type = STREAM_INPUT__DATA_BLOCK;
      qRes->blocks = pRes;
      if (streamTaskOutput(pTask, qRes) < 0) {
        streamQueueProcessFail(pTask->inputQueue);
        taosArrayDestroyEx(pRes, (FDelete)tDeleteSSDataBlock);
        taosFreeQitem(qRes);
        return NULL;
      }
      streamQueueProcessSuccess(pTask->inputQueue);
      // The emitted batch is now referenced by qRes; collect further output
      // into a new array. A NULL result is handled after the input item has
      // been released below.
      pRes = taosArrayInit(0, sizeof(SSDataBlock));
    }

    // Release the consumed input item according to its concrete type.
    int8_t type = ((SStreamQueueItem*)data)->type;
    if (type == STREAM_INPUT__TRIGGER) {
      blockDataDestroy(((SStreamTrigger*)data)->pBlock);
      taosFreeQitem(data);
    } else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE) {
      taosArrayDestroyEx(((SStreamDataBlock*)data)->blocks, (FDelete)tDeleteSSDataBlock);
      taosFreeQitem(data);
    } else if (type == STREAM_INPUT__DATA_SUBMIT) {
      ASSERT(pTask->isDataScan);
      streamDataSubmitRefDec((SStreamDataSubmit*)data);
      taosFreeQitem(data);
    }

    // Allocation of the replacement array failed: stop instead of passing a
    // NULL array into the next streamTaskExecImpl call.
    if (pRes == NULL) return NULL;
  }
  return pRes;
}

/*
 * Entry point for executing a stream task.
 *
 * Tries to move pTask->execStatus from IDLE to EXECUTING with a
 * compare-and-exchange and acts on the value that was observed:
 *   - IDLE:      this caller drains the input via streamExecForQall, stores
 *                CLOSING, drains once more, then stores IDLE.
 *   - CLOSING:   retry the exchange.
 *   - EXECUTING: release the local result array and return.
 *
 * pTask  - task to execute
 * pMsgCb - not used by this function
 *
 * Returns 0 on success or when the status was already EXECUTING; returns -1
 * when the result array cannot be allocated or streamExecForQall returns NULL.
 *
 * TODO: handle version
 */
int32_t streamExec(SStreamTask* pTask, SMsgCb* pMsgCb) {
  SArray* pResults = taosArrayInit(0, sizeof(SSDataBlock));
  if (pResults == NULL) {
    return -1;
  }

  for (;;) {
    int8_t observed =
        atomic_val_compare_exchange_8(&pTask->execStatus, TASK_EXEC_STATUS__IDLE, TASK_EXEC_STATUS__EXECUTING);

    if (observed == TASK_EXEC_STATUS__IDLE) {
      // first run
      pResults = streamExecForQall(pTask, pResults);
      if (pResults == NULL) {
        goto FAIL;
      }

      // set status closing
      atomic_store_8(&pTask->execStatus, TASK_EXEC_STATUS__CLOSING);

      // second run, make sure inputQ and qall are cleared
      pResults = streamExecForQall(pTask, pResults);
      if (pResults == NULL) {
        goto FAIL;
      }

      taosArrayDestroyEx(pResults, (FDelete)tDeleteSSDataBlock);
      atomic_store_8(&pTask->execStatus, TASK_EXEC_STATUS__IDLE);
      return 0;
    }

    if (observed == TASK_EXEC_STATUS__CLOSING) {
      continue;
    }

    if (observed == TASK_EXEC_STATUS__EXECUTING) {
      ASSERT(taosArrayGetSize(pResults) == 0);
      taosArrayDestroyEx(pResults, (FDelete)tDeleteSSDataBlock);
      return 0;
    }

    // Any other status value is unexpected.
    ASSERT(0);
  }

FAIL:
  if (pResults) {
    taosArrayDestroy(pResults);
  }
  atomic_store_8(&pTask->execStatus, TASK_EXEC_STATUS__IDLE);
  return -1;
}