tqPush.c 2.9 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
L
Liu Jicong 已提交
15 16

#include "tq.h"
17
#include "vnd.h"
L
Liu Jicong 已提交
18

19
/*
 * Hand a message that was written to the vnode over to the tq module.
 *
 * pTq     - tq instance; its pVnode and pStreamMeta members are read here.
 * msg     - raw message buffer; for delete messages the payload is taken to
 *           start sizeof(SMsgHead) bytes into it.
 * msgLen  - length of msg in bytes, including the SMsgHead prefix.
 * msgType - message type; only TDMT_VND_SUBMIT and TDMT_VND_DELETE trigger work.
 * ver     - version forwarded to tqProcessDelReq for delete messages.
 *
 * Always returns 0; results of the called helpers are not inspected.
 */
int32_t tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) {
  // Subscription handling for submits happens regardless of the stream state.
  if (msgType == TDMT_VND_SUBMIT) {
    tqProcessSubmitReqForSubscribe(pTq);
  }

  int32_t taskCount = streamMetaGetNumOfTasks(pTq->pStreamMeta);
  tqDebug("handle submit, restore:%d, size:%d", pTq->pVnode->restored, taskCount);

  // Stream processing only receives data when all of the following hold:
  //   - streams are not disabled (tsDisableStream is unset),
  //   - this vnode is the leader,
  //   - this vnode has been restored.
  if (tsDisableStream || !vnodeIsRoleLeader(pTq->pVnode) || !pTq->pVnode->restored) {
    return 0;
  }

  // Nothing to feed when no stream task is registered.
  if (taskCount == 0) {
    return 0;
  }

  if (msgType == TDMT_VND_SUBMIT) {
    tqStartStreamTasks(pTq);
  } else if (msgType == TDMT_VND_DELETE) {
    // Skip the message head so that only the request body is handed over.
    tqProcessDelReq(pTq, POINTER_SHIFT(msg, sizeof(SMsgHead)), msgLen - sizeof(SMsgHead), ver);
  }

  return 0;
}
48

49
/*
 * Store a copy of a request in a subscription handle and register the handle
 * in pTq->pPushMgr under the handle's subKey.
 *
 * pTq    - tq instance owning the push manager hash (pPushMgr).
 * handle - the subscription handle (cast to STqHandle*); its msg member keeps
 *          the copy of the request.
 * pMsg   - the request to copy; both the SRpcMsg struct and its pCont payload
 *          (contLen bytes) are duplicated, so the caller keeps ownership of pMsg.
 *
 * The stored copy (pHandle->msg and pHandle->msg->pCont) is owned by the handle
 * and released in tqUnregisterPushHandle.
 *
 * Returns 0 on success, -1 if memory for the copy could not be allocated. On
 * failure the handle is left as it was before the call and nothing is
 * registered. The result of taosHashPut is only logged, not returned.
 */
int32_t tqRegisterPushHandle(STQ* pTq, void* handle, SRpcMsg* pMsg) {
  int32_t    vgId = TD_VID(pTq->pVnode);
  STqHandle* pHandle = (STqHandle*)handle;

  if (pHandle->msg == NULL) {
    // First registration: allocate both the message struct and its payload buffer.
    pHandle->msg = taosMemoryCalloc(1, sizeof(SRpcMsg));
    if (pHandle->msg == NULL) {
      tqError("vgId:%d failed to allocate msg for pHandle:%p", vgId, pHandle);
      return -1;
    }

    void* pBuf = rpcMallocCont(pMsg->contLen);
    if (pBuf == NULL) {
      tqError("vgId:%d failed to allocate cont for pHandle:%p, len:%d", vgId, pHandle, pMsg->contLen);
      taosMemoryFree(pHandle->msg);
      pHandle->msg = NULL;
      return -1;
    }

    memcpy(pHandle->msg, pMsg, sizeof(SRpcMsg));
    pHandle->msg->pCont = pBuf;
  } else {
    // Re-registration: keep the existing payload buffer when it is known to be
    // large enough. The stored contLen is the length of the last payload copied
    // into the buffer, so it is a safe lower bound of the buffer's capacity.
    void* pBuf = pHandle->msg->pCont;
    if (pBuf == NULL || pMsg->contLen > pHandle->msg->contLen) {
      void* pNew = rpcMallocCont(pMsg->contLen);
      if (pNew == NULL) {
        tqError("vgId:%d failed to allocate cont for pHandle:%p, len:%d", vgId, pHandle, pMsg->contLen);
        return -1;
      }
      if (pBuf != NULL) {
        rpcFreeCont(pBuf);
      }
      pBuf = pNew;
    }

    // The struct copy overwrites pCont with the caller's pointer; restore ours.
    memcpy(pHandle->msg, pMsg, sizeof(SRpcMsg));
    pHandle->msg->pCont = pBuf;
  }

  if (pMsg->pCont != NULL && pMsg->contLen > 0) {
    memcpy(pHandle->msg->pCont, pMsg->pCont, pMsg->contLen);
  }
  pHandle->msg->contLen = pMsg->contLen;

  int32_t ret = taosHashPut(pTq->pPushMgr, pHandle->subKey, strlen(pHandle->subKey), &pHandle, POINTER_BYTES);
  tqDebug("vgId:%d data is over, ret:%d, consumerId:0x%" PRIx64 ", register to pHandle:%p, pCont:%p, len:%d", vgId, ret,
          pHandle->consumerId, pHandle, pHandle->msg->pCont, pHandle->msg->contLen);
  return 0;
}

/*
 * Remove a subscription handle from pTq->pPushMgr and release the request copy
 * it holds, if any.
 *
 * pTq    - tq instance owning the push manager hash (pPushMgr).
 * handle - the subscription handle (cast to STqHandle*).
 *
 * When the handle still holds a stored message, tqPushDataRsp is called for it
 * before the payload and the message struct are freed and msg is reset to NULL.
 *
 * Always returns 0; the result of taosHashRemove is only logged.
 */
int32_t tqUnregisterPushHandle(STQ* pTq, void* handle) {
  int32_t    vgId = TD_VID(pTq->pVnode);
  STqHandle* pSub = (STqHandle*)handle;

  int32_t code = taosHashRemove(pTq->pPushMgr, pSub->subKey, strlen(pSub->subKey));
  tqError("vgId:%d remove pHandle:%p,ret:%d consumer Id:0x%" PRIx64, vgId, pSub, code, pSub->consumerId);

  // No stored request: nothing to answer or free.
  if (pSub->msg == NULL) {
    return 0;
  }

  // Respond first, while the stored request is still alive, then release it.
  tqPushDataRsp(pTq, pSub);

  rpcFreeCont(pSub->msg->pCont);
  taosMemoryFree(pSub->msg);
  pSub->msg = NULL;

  return 0;
}