tqPush.c 3.4 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
L
Liu Jicong 已提交
15 16

#include "tq.h"
17
#include "vnd.h"
L
Liu Jicong 已提交
18

19
/**
 * Post a TDMT_VND_TMQ_CONSUME_PUSH message to the vnode's QUERY_QUEUE when at
 * least one handle is registered in pTq->pPushMgr.
 *
 * The message body is a bare SMsgHead carrying the vnode id and its own length.
 *
 * @param pTq  tq instance; pTq->pPushMgr and pTq->pVnode are read.
 * @return 0 when no handle is registered; -1 when the message body cannot be
 *         allocated; otherwise the value returned by tmsgPutToQueue().
 */
int32_t tqProcessSubmitReqForSubscribe(STQ* pTq) {
  // Nothing is waiting for a push: skip the allocation and the enqueue.
  if (taosHashGetSize(pTq->pPushMgr) <= 0) {
    return 0;
  }

  SRpcMsg msg = {.msgType = TDMT_VND_TMQ_CONSUME_PUSH};
  msg.pCont = rpcMallocCont(sizeof(SMsgHead));
  if (msg.pCont == NULL) {
    // The header below would be written through a NULL pointer otherwise.
    return -1;
  }
  msg.contLen = sizeof(SMsgHead);

  SMsgHead* pHead = msg.pCont;
  pHead->vgId = TD_VID(pTq->pVnode);
  pHead->contLen = msg.contLen;

  // NOTE(review): msg.pCont is not released here when enqueueing fails, same as
  // before this change - presumably tmsgPutToQueue() owns it on every path; confirm.
  return tmsgPutToQueue(&pTq->pVnode->msgCb, QUERY_QUEUE, &msg);
}

/**
 * React to a vnode message of type msgType.
 *
 * A TDMT_VND_SUBMIT message first triggers tqProcessSubmitReqForSubscribe().
 * Afterwards, stream tasks are started for TDMT_VND_SUBMIT and
 * TDMT_VND_DELETE messages when stream processing is eligible (see below).
 *
 * @param pTq      tq instance.
 * @param msg      unused by this function.
 * @param msgLen   unused by this function.
 * @param msgType  type of the message being handled.
 * @param ver      unused by this function.
 * @return always 0.
 */
int32_t tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) {
  if (TDMT_VND_SUBMIT == msgType) {
    tqProcessSubmitReqForSubscribe(pTq);
  }

  // Read the task count while holding the stream-meta read latch.
  int32_t taskCount = 0;
  taosRLockLatch(&pTq->pStreamMeta->lock);
  taskCount = streamMetaGetNumOfTasks(pTq->pStreamMeta);
  taosRUnLockLatch(&pTq->pStreamMeta->lock);

  tqTrace("handle submit, restore:%d, size:%d", pTq->pVnode->restored, taskCount);

  // Data is pushed to stream processing only when:
  //   1. the vnode has already been restored,
  //   2. the vnode is the leader,
  //   3. the stream is not suspended.
  if (tsDisableStream || !vnodeIsRoleLeader(pTq->pVnode) || !pTq->pVnode->restored) {
    return 0;
  }

  // No stream task exists, so there is nothing to start.
  if (taskCount == 0) {
    return 0;
  }

  switch (msgType) {
    case TDMT_VND_SUBMIT:
    case TDMT_VND_DELETE:
      tqStartStreamTasks(pTq);
      break;
    default:
      break;
  }

  return 0;
}
60

61
/**
 * Store a copy of the request pMsg in the handle and register the handle in
 * pTq->pPushMgr under its subKey.
 *
 * If the handle already holds a stored request, tqPushEmptyDataRsp() is called
 * for it first and the stored request is then overwritten with pMsg. The body
 * buffer is reused when it is large enough and replaced by a larger one
 * otherwise, so the body copy can never write past the end of the buffer.
 *
 * @param pTq     tq instance; pTq->pPushMgr and pTq->pVnode are read.
 * @param handle  pointer to the STqHandle to register.
 * @param pMsg    request to remember; its header and body are copied.
 * @return 0 on completion (the taosHashPut() result is only logged, as before);
 *         -1 when a required allocation fails. On failure a previously stored
 *         request is left untouched and no response is sent for it.
 */
int32_t tqRegisterPushHandle(STQ* pTq, void* handle, SRpcMsg* pMsg) {
  int32_t    vgId = TD_VID(pTq->pVnode);
  STqHandle* pHandle = (STqHandle*)handle;

  if (pHandle->msg == NULL) {
    pHandle->msg = taosMemoryCalloc(1, sizeof(SRpcMsg));
    if (pHandle->msg == NULL) {
      return -1;
    }

    memcpy(pHandle->msg, pMsg, sizeof(SRpcMsg));
    pHandle->msg->pCont = rpcMallocCont(pMsg->contLen);
    if (pHandle->msg->pCont == NULL) {
      // Do not leave a half-initialised request attached to the handle.
      taosMemoryFree(pHandle->msg);
      pHandle->msg = NULL;
      return -1;
    }
  } else {
    // Allocate a bigger body first (if needed) so that an allocation failure
    // leaves the stored request exactly as it was.
    void* pLarger = NULL;
    if (pMsg->contLen > pHandle->msg->contLen) {
      pLarger = rpcMallocCont(pMsg->contLen);
      if (pLarger == NULL) {
        return -1;
      }
    }

    // The stored request must stay intact until this call has returned.
    tqPushEmptyDataRsp(pHandle, vgId);

    void* pBody = pHandle->msg->pCont;
    if (pLarger != NULL) {
      rpcFreeCont(pBody);
      pBody = pLarger;
    }

    // Take over the new header but keep the body buffer owned by the handle.
    memcpy(pHandle->msg, pMsg, sizeof(SRpcMsg));
    pHandle->msg->pCont = pBody;
  }

  memcpy(pHandle->msg->pCont, pMsg->pCont, pMsg->contLen);
  pHandle->msg->contLen = pMsg->contLen;

  int32_t ret = taosHashPut(pTq->pPushMgr, pHandle->subKey, strlen(pHandle->subKey), &pHandle, POINTER_BYTES);
  tqInfo("vgId:%d data is over, ret:%d, consumerId:0x%" PRIx64 ", register to pHandle:%p, pCont:%p, len:%d", vgId, ret,
         pHandle->consumerId, pHandle, pHandle->msg->pCont, pHandle->msg->contLen);
  return 0;
}

wmmhello's avatar
wmmhello 已提交
86
/**
 * Remove a handle from pTq->pPushMgr and drop the request stored in it.
 *
 * Nothing is done when the push manager holds no entries. Otherwise the
 * handle's subKey is removed from the push manager, and if the handle still
 * carries a stored request, tqPushEmptyDataRsp() is called for it before the
 * request body and the request itself are freed.
 *
 * @param pTq     tq instance; pTq->pPushMgr and pTq->pVnode are read.
 * @param handle  pointer to the STqHandle to unregister.
 * @return always 0.
 */
int tqUnregisterPushHandle(STQ* pTq, void *handle) {
  if (taosHashGetSize(pTq->pPushMgr) <= 0) {
    return 0;
  }

  STqHandle* pEntry = (STqHandle*)handle;
  int32_t    vnodeId = TD_VID(pTq->pVnode);

  int32_t rmCode = taosHashRemove(pTq->pPushMgr, pEntry->subKey, strlen(pEntry->subKey));
  tqInfo("vgId:%d remove pHandle:%p,ret:%d consumer Id:0x%" PRIx64, vnodeId, pEntry, rmCode, pEntry->consumerId);

  // No stored request: nothing to answer or release.
  if (pEntry->msg == NULL) {
    return 0;
  }

  tqPushEmptyDataRsp(pEntry, vnodeId);

  rpcFreeCont(pEntry->msg->pCont);
  taosMemoryFree(pEntry->msg);
  pEntry->msg = NULL;  // the handle no longer owns a request

  return 0;
}