You need to sign in or sign up before continuing.
streamExec.c 6.8 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

L
Liu Jicong 已提交
16
#include "streamInc.h"
L
Liu Jicong 已提交
17 18 19 20 21

static int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes) {
  void* exec = pTask->exec.executor;

  // set input
22
  SStreamQueueItem* pItem = (SStreamQueueItem*)data;
L
Liu Jicong 已提交
23
  if (pItem->type == STREAM_INPUT__GET_RES) {
24
    SStreamTrigger* pTrigger = (SStreamTrigger*)data;
L
Liu Jicong 已提交
25
    qSetMultiStreamInput(exec, pTrigger->pBlock, 1, STREAM_INPUT__DATA_BLOCK, false);
26
  } else if (pItem->type == STREAM_INPUT__DATA_SUBMIT) {
27
    ASSERT(pTask->isDataScan);
L
Liu Jicong 已提交
28
    SStreamDataSubmit* pSubmit = (SStreamDataSubmit*)data;
L
Liu Jicong 已提交
29
    qDebug("task %d %p set submit input %p %p %d", pTask->taskId, pTask, pSubmit, pSubmit->data, *pSubmit->dataRef);
L
Liu Jicong 已提交
30
    qSetStreamInput(exec, pSubmit->data, STREAM_INPUT__DATA_SUBMIT, false);
31
  } else if (pItem->type == STREAM_INPUT__DATA_BLOCK || pItem->type == STREAM_INPUT__DATA_RETRIEVE) {
L
Liu Jicong 已提交
32
    SStreamDataBlock* pBlock = (SStreamDataBlock*)data;
33
    SArray*           blocks = pBlock->blocks;
L
Liu Jicong 已提交
34
    qDebug("task %d %p set ssdata input", pTask->taskId, pTask);
L
Liu Jicong 已提交
35
    qSetMultiStreamInput(exec, blocks->pData, blocks->size, STREAM_INPUT__DATA_BLOCK, false);
L
Liu Jicong 已提交
36 37 38
  } else if (pItem->type == STREAM_INPUT__DROP) {
    // TODO exec drop
    return 0;
L
Liu Jicong 已提交
39 40 41 42 43 44 45 46 47
  }

  // exec
  while (1) {
    SSDataBlock* output = NULL;
    uint64_t     ts = 0;
    if (qExecTask(exec, &output, &ts) < 0) {
      ASSERT(false);
    }
48
    if (output == NULL) {
5
54liuyao 已提交
49
      if (pItem->type == STREAM_INPUT__DATA_RETRIEVE) {
L
Liu Jicong 已提交
50
        SSDataBlock       block = {0};
5
54liuyao 已提交
51 52
        SStreamDataBlock* pRetrieveBlock = (SStreamDataBlock*)data;
        ASSERT(taosArrayGetSize(pRetrieveBlock->blocks) == 1);
L
Liu Jicong 已提交
53
        assignOneDataBlock(&block, taosArrayGet(pRetrieveBlock->blocks, 0));
L
Liu Jicong 已提交
54
        block.info.type = STREAM_PULL_OVER;
L
Liu Jicong 已提交
55 56
        block.info.childId = pTask->selfChildId;
        taosArrayPush(pRes, &block);
57 58 59
      }
      break;
    }
L
Liu Jicong 已提交
60 61 62 63 64 65 66 67

    if (output->info.type == STREAM_RETRIEVE) {
      if (streamBroadcastToChildren(pTask, output) < 0) {
        // TODO
      }
      continue;
    }

L
Liu Jicong 已提交
68 69 70 71
    SSDataBlock block = {0};
    assignOneDataBlock(&block, output);
    block.info.childId = pTask->selfChildId;
    taosArrayPush(pRes, &block);
L
Liu Jicong 已提交
72 73 74 75
  }
  return 0;
}

L
Liu Jicong 已提交
76 77 78 79 80 81 82 83 84
static FORCE_INLINE int32_t streamUpdateVer(SStreamTask* pTask, SStreamDataBlock* pBlock) {
  ASSERT(pBlock->type == STREAM_INPUT__DATA_BLOCK);
  int32_t             childId = pBlock->childId;
  int64_t             ver = pBlock->sourceVer;
  SStreamChildEpInfo* pChildInfo = taosArrayGetP(pTask->childEpInfo, childId);
  pChildInfo->processedVer = ver;
  return 0;
}

L
Liu Jicong 已提交
85
static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) {
86 87
  int32_t cnt = 0;
  void*   data = NULL;
L
Liu Jicong 已提交
88
  while (1) {
89 90 91 92 93 94 95
    SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputQueue);
    if (qItem == NULL) {
      qDebug("stream exec over, queue empty");
      break;
    }
    if (data == NULL) {
      data = qItem;
L
Liu Jicong 已提交
96 97 98
      if (qItem->type == STREAM_INPUT__DATA_BLOCK) {
        /*streamUpdateVer(pTask, (SStreamDataBlock*)qItem);*/
      }
99 100 101 102
      streamQueueProcessSuccess(pTask->inputQueue);
    } else {
      if (streamAppendQueueItem(data, qItem) < 0) {
        streamQueueProcessFail(pTask->inputQueue);
L
Liu Jicong 已提交
103 104
        break;
      } else {
105
        cnt++;
L
Liu Jicong 已提交
106
        /*streamUpdateVer(pTask, (SStreamDataBlock*)qItem);*/
107 108 109
        streamQueueProcessSuccess(pTask->inputQueue);
        taosArrayDestroy(((SStreamDataBlock*)qItem)->blocks);
        taosFreeQitem(qItem);
L
Liu Jicong 已提交
110 111
      }
    }
112 113 114
  }
  if (pTask->taskStatus == TASK_STATUS__DROPPING) {
    if (data) streamFreeQitem(data);
L
Liu Jicong 已提交
115
    taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
116 117
    return NULL;
  }
L
Liu Jicong 已提交
118

119
  if (data == NULL) return pRes;
L
Liu Jicong 已提交
120

L
Liu Jicong 已提交
121 122 123 124 125 126
  if (pTask->execType == TASK_EXEC__NONE) {
    ASSERT(((SStreamQueueItem*)data)->type == STREAM_INPUT__DATA_BLOCK);
    streamTaskOutput(pTask, data);
    return pRes;
  }

127 128 129 130 131 132 133 134 135
  qDebug("stream task %d exec begin, msg batch: %d", pTask->taskId, cnt);
  streamTaskExecImpl(pTask, data, pRes);
  qDebug("stream task %d exec end", pTask->taskId);

  if (taosArrayGetSize(pRes) != 0) {
    SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM);
    if (qRes == NULL) {
      streamQueueProcessFail(pTask->inputQueue);
      taosArrayDestroy(pRes);
L
Liu Jicong 已提交
136 137
      return NULL;
    }
138 139 140 141
    qRes->type = STREAM_INPUT__DATA_BLOCK;
    qRes->blocks = pRes;
    if (streamTaskOutput(pTask, qRes) < 0) {
      /*streamQueueProcessFail(pTask->inputQueue);*/
L
Liu Jicong 已提交
142
      taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
143 144
      taosFreeQitem(qRes);
      return NULL;
L
Liu Jicong 已提交
145
    }
L
Liu Jicong 已提交
146 147 148 149 150
    if (((SStreamQueueItem*)data)->type == STREAM_INPUT__DATA_SUBMIT) {
      SStreamDataSubmit* pSubmit = (SStreamDataSubmit*)data;
      qRes->childId = pTask->selfChildId;
      qRes->sourceVer = pSubmit->ver;
    }
151 152
    /*streamQueueProcessSuccess(pTask->inputQueue);*/
    pRes = taosArrayInit(0, sizeof(SSDataBlock));
L
Liu Jicong 已提交
153
  }
154 155

  streamFreeQitem(data);
L
Liu Jicong 已提交
156 157 158 159 160 161 162 163
  return pRes;
}

// TODO: handle version
int32_t streamExec(SStreamTask* pTask, SMsgCb* pMsgCb) {
  SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock));
  if (pRes == NULL) return -1;
  while (1) {
L
Liu Jicong 已提交
164 165 166
    int8_t execStatus =
        atomic_val_compare_exchange_8(&pTask->execStatus, TASK_EXEC_STATUS__IDLE, TASK_EXEC_STATUS__EXECUTING);
    if (execStatus == TASK_EXEC_STATUS__IDLE) {
L
Liu Jicong 已提交
167
      // first run
L
Liu Jicong 已提交
168
      qDebug("stream exec, enter exec status");
L
Liu Jicong 已提交
169 170 171 172
      pRes = streamExecForQall(pTask, pRes);
      if (pRes == NULL) goto FAIL;

      // set status closing
L
Liu Jicong 已提交
173
      atomic_store_8(&pTask->execStatus, TASK_EXEC_STATUS__CLOSING);
L
Liu Jicong 已提交
174 175

      // second run, make sure inputQ and qall are cleared
L
Liu Jicong 已提交
176
      qDebug("stream exec, enter closing status");
L
Liu Jicong 已提交
177 178 179
      pRes = streamExecForQall(pTask, pRes);
      if (pRes == NULL) goto FAIL;

180
      taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
L
Liu Jicong 已提交
181
      atomic_store_8(&pTask->execStatus, TASK_EXEC_STATUS__IDLE);
L
Liu Jicong 已提交
182
      qDebug("stream exec, return result");
L
Liu Jicong 已提交
183
      return 0;
L
Liu Jicong 已提交
184
    } else if (execStatus == TASK_EXEC_STATUS__CLOSING) {
L
Liu Jicong 已提交
185
      continue;
L
Liu Jicong 已提交
186
    } else if (execStatus == TASK_EXEC_STATUS__EXECUTING) {
L
Liu Jicong 已提交
187
      ASSERT(taosArrayGetSize(pRes) == 0);
188
      taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
L
Liu Jicong 已提交
189
      return 0;
L
Liu Jicong 已提交
190 191 192 193 194 195
    } else {
      ASSERT(0);
    }
  }
FAIL:
  if (pRes) taosArrayDestroy(pRes);
196 197 198 199 200 201 202
  if (pTask->taskStatus == TASK_STATUS__DROPPING) {
    tFreeSStreamTask(pTask);
    return 0;
  } else {
    atomic_store_8(&pTask->execStatus, TASK_EXEC_STATUS__IDLE);
    return -1;
  }
L
Liu Jicong 已提交
203
}