stream.c 4.2 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "streamInc.h"

// Posts a TDMT_STREAM_TASK_RUN message for pTask onto FETCH_QUEUE when the
// task's status is TASK_STATUS__IDLE or TASK_STATUS__CLOSING; does nothing
// for any other status.
//
// pTask:  task whose status, streamId and taskId are read.
// vgId:   written as-is into the request head.
// pMsgCb: message callbacks handed to tmsgPutToQueue().
//
// Returns -1 if the request buffer cannot be allocated, 0 otherwise.
int32_t streamTriggerByWrite(SStreamTask* pTask, int32_t vgId, SMsgCb* pMsgCb) {
  int8_t taskStatus = atomic_load_8(&pTask->status);
  if (taskStatus != TASK_STATUS__IDLE && taskStatus != TASK_STATUS__CLOSING) {
    return 0;
  }

  SStreamTaskRunReq* pReq = rpcMallocCont(sizeof(SStreamTaskRunReq));
  if (pReq == NULL) {
    return -1;
  }

  // TODO: do we need htonl?
  pReq->head.vgId = vgId;
  pReq->streamId = pTask->streamId;
  pReq->taskId = pTask->taskId;

  // Every field not assigned below stays zero.
  SRpcMsg runMsg = {0};
  runMsg.msgType = TDMT_STREAM_TASK_RUN;
  runMsg.pCont = pReq;
  runMsg.contLen = sizeof(SStreamTaskRunReq);

  // NOTE(review): the result of tmsgPutToQueue() is discarded; confirm whether
  // a failed enqueue should be reported and who releases pReq in that case.
  tmsgPutToQueue(pMsgCb, FETCH_QUEUE, &runMsg);
  return 0;
}

// Wraps a dispatch request into a queue item, offers it to the task's input,
// and replies to the sender with the resulting input status.
//
// pTask: task receiving the data.
// pReq:  dispatch request; sourceVg, upstreamNodeId, streamId and
//        sourceTaskId are read from it.
// pRsp:  message used for the reply; its pCont and contLen are overwritten
//        and it is passed to tmsgSendRsp().
//
// Returns 0 when the reported status is TASK_INPUT_STATUS__NORMAL and -1
// otherwise. -1 is also returned, without sending any reply, when the reply
// buffer cannot be allocated.
int32_t streamTaskEnqueue(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pRsp) {
  // Assume failure until the item has actually been accepted.
  int8_t status = TASK_INPUT_STATUS__FAILED;

  // enqueue
  SStreamDataBlock* pData = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM);
  if (pData != NULL) {
    pData->type = STREAM_DATA_TYPE_SSDATA_BLOCK;
    pData->sourceVg = pReq->sourceVg;
    // decode
    streamDispatchReqToData(pReq, pData);
    // NOTE(review): when streamTaskInput() fails, pData is not released here;
    // confirm which side owns the item on failure.
    if (streamTaskInput(pTask, (SStreamQueueItem*)pData) == 0) {
      status = TASK_INPUT_STATUS__NORMAL;
    }
  } else {
    streamTaskInputFail(pTask);
  }

  // rsp by input status
  void* buf = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamDispatchRsp));
  if (buf == NULL) {
    // Without a buffer there is nothing to fill in or send; bail out instead
    // of dereferencing NULL below.
    return -1;
  }

  ((SMsgHead*)buf)->vgId = htonl(pReq->upstreamNodeId);

  SStreamDispatchRsp* pCont = POINTER_SHIFT(buf, sizeof(SMsgHead));
  pCont->inputStatus = status;
  pCont->streamId = pReq->streamId;
  pCont->taskId = pReq->sourceTaskId;

  pRsp->pCont = buf;
  pRsp->contLen = sizeof(SMsgHead) + sizeof(SStreamDispatchRsp);
  tmsgSendRsp(pRsp);

  return status == TASK_INPUT_STATUS__NORMAL ? 0 : -1;
}

// Handles an incoming dispatch request for pTask in three steps:
//   1. enqueue the request data and reply to the sender (streamTaskEnqueue);
//   2. run the task (streamExec), or, for a task with no exec stage, drain
//      the input queue directly into streamTaskOutput();
//   3. dispatch downstream when the task has a dispatch type configured.
//
// Always returns 0. The result of streamTaskEnqueue() is not inspected.
int32_t streamProcessDispatchReq(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDispatchReq* pReq, SRpcMsg* pRsp) {
  // 1. handle input
  streamTaskEnqueue(pTask, pReq, pRsp);

  // 2. no exec stage: forward every queued item to the output and finish.
  //    This path returns before step 3, so no dispatch happens for it.
  if (pTask->execType == TASK_EXEC__NONE) {
    ASSERT(pTask->sinkType != TASK_SINK__NONE);

    void* pItem;
    while ((pItem = streamQueueNextItem(pTask->inputQueue)) != NULL) {
      if (streamTaskOutput(pTask, pItem) < 0) {
        ASSERT(0);
      }
    }
    return 0;
  }

  // 2. try exec
  //    (the idle / executing / closing cases are presumably handled inside
  //    streamExec; that is not visible from here)
  streamExec(pTask, pMsgCb);

  // 3. handle output: dispatch / sink
  if (pTask->dispatchType != TASK_DISPATCH__NONE) {
    streamDispatch(pTask, pMsgCb);
  }

  return 0;
}

// Handles the downstream reply to a dispatch: stores the reported status into
// pTask->outputStatus and, unless the reply says "blocked", continues
// dispatching when the task has a dispatch type configured.
//
// Asserts that the previous output status was TASK_OUTPUT_STATUS__WAIT.
// Always returns 0.
//
// NOTE(review): pRsp->inputStatus is checked against TASK_OUTPUT_STATUS__*
// constants in the first assert but against TASK_INPUT_STATUS__BLOCKED in the
// branch below; confirm the two enum families share values.
int32_t streamProcessDispatchRsp(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDispatchRsp* pRsp) {
  ASSERT(pRsp->inputStatus == TASK_OUTPUT_STATUS__NORMAL || pRsp->inputStatus == TASK_OUTPUT_STATUS__BLOCKED);

  int8_t prevStatus = atomic_exchange_8(&pTask->outputStatus, pRsp->inputStatus);
  ASSERT(prevStatus == TASK_OUTPUT_STATUS__WAIT);

  // Blocked downstream: nothing more to do for now.
  // TODO: init recover timer
  // Otherwise continue dispatch.
  if (pRsp->inputStatus != TASK_INPUT_STATUS__BLOCKED && pTask->dispatchType != TASK_DISPATCH__NONE) {
    streamDispatch(pTask, pMsgCb);
  }

  return 0;
}

// Runs the task via streamExec() and then dispatches its output when the task
// has a dispatch type configured. Always returns 0.
int32_t streamTaskProcessRunReq(SStreamTask* pTask, SMsgCb* pMsgCb) {
  streamExec(pTask, pMsgCb);

  if (pTask->dispatchType == TASK_DISPATCH__NONE) {
    return 0;
  }

  streamDispatch(pTask, pMsgCb);
  return 0;
}

// Placeholder for processing a recover request: no argument is used and the
// function always returns 0.
int32_t streamProcessRecoverReq(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamTaskRecoverReq* pReq, SRpcMsg* pMsg) {
  (void)pTask;
  (void)pMsgCb;
  (void)pReq;
  (void)pMsg;
  return 0;
}

// Placeholder for processing a recover response: no argument is used and the
// function always returns 0.
int32_t streamProcessRecoverRsp(SStreamTask* pTask, SStreamTaskRecoverRsp* pRsp) {
  (void)pTask;
  (void)pRsp;
  return 0;
}