tqPush.c 3.8 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
L
Liu Jicong 已提交
15 16

#include "tq.h"
17
#include "vnd.h"
L
Liu Jicong 已提交
18

19 20 21 22 23 24 25 26 27 28 29
int32_t tqProcessSubmitReqForSubscribe(STQ* pTq) {
  int32_t vgId = TD_VID(pTq->pVnode);

  taosWLockLatch(&pTq->lock);

  if (taosHashGetSize(pTq->pPushMgr) > 0) {
    void* pIter = taosHashIterate(pTq->pPushMgr, NULL);

    while (pIter) {
      STqHandle* pHandle = *(STqHandle**)pIter;
      tqDebug("vgId:%d start set submit for pHandle:%p, consumer:0x%" PRIx64, vgId, pHandle, pHandle->consumerId);
L
Liu Jicong 已提交
30

31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52
      if (ASSERT(pHandle->msg != NULL)) {
        tqError("pHandle->msg should not be null");
        break;
      }else{
        SRpcMsg msg = {.msgType = TDMT_VND_TMQ_CONSUME, .pCont = pHandle->msg->pCont, .contLen = pHandle->msg->contLen, .info = pHandle->msg->info};
        tmsgPutToQueue(&pTq->pVnode->msgCb, QUERY_QUEUE, &msg);
        taosMemoryFree(pHandle->msg);
        pHandle->msg = NULL;
      }

      pIter = taosHashIterate(pTq->pPushMgr, pIter);
    }

    taosHashClear(pTq->pPushMgr);
  }

  // unlock
  taosWUnLockLatch(&pTq->lock);
  return 0;
}

int32_t tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) {
L
Liu Jicong 已提交
53
  if (msgType == TDMT_VND_SUBMIT) {
54
    tqProcessSubmitReqForSubscribe(pTq);
L
Liu Jicong 已提交
55 56
  }

57 58
  int32_t numOfTasks = streamMetaGetNumOfTasks(pTq->pStreamMeta);
  tqDebug("handle submit, restore:%d, size:%d", pTq->pVnode->restored, numOfTasks);
59

60
  // push data for stream processing:
61
  // 1. the vnode has already been restored.
62 63
  // 2. the vnode should be the leader.
  // 3. the stream is not suspended yet.
64
  if (!tsDisableStream && vnodeIsRoleLeader(pTq->pVnode) && pTq->pVnode->restored) {
65
    if (numOfTasks == 0) {
66 67 68
      return 0;
    }

69
    if (msgType == TDMT_VND_SUBMIT || msgType == TDMT_VND_DELETE) {
70
      tqStartStreamTasks(pTq);
L
Liu Jicong 已提交
71
    }
L
Liu Jicong 已提交
72 73 74 75
  }

  return 0;
}
76

77
int32_t tqRegisterPushHandle(STQ* pTq, void* handle, SRpcMsg* pMsg) {
78
  int32_t    vgId = TD_VID(pTq->pVnode);
79 80 81
  STqHandle* pHandle = (STqHandle*)handle;

  if (pHandle->msg == NULL) {
82
    pHandle->msg = taosMemoryCalloc(1, sizeof(SRpcMsg));
wmmhello's avatar
wmmhello 已提交
83 84
    memcpy(pHandle->msg, pMsg, sizeof(SRpcMsg));
    pHandle->msg->pCont = rpcMallocCont(pMsg->contLen);
85
  } else {
86
    tqPushDataRsp(pHandle, vgId);
87
    void* tmp = pHandle->msg->pCont;
wmmhello's avatar
wmmhello 已提交
88 89
    memcpy(pHandle->msg, pMsg, sizeof(SRpcMsg));
    pHandle->msg->pCont = tmp;
90 91 92 93 94
  }

  memcpy(pHandle->msg->pCont, pMsg->pCont, pMsg->contLen);
  pHandle->msg->contLen = pMsg->contLen;
  int32_t ret = taosHashPut(pTq->pPushMgr, pHandle->subKey, strlen(pHandle->subKey), &pHandle, POINTER_BYTES);
95 96
  tqDebug("vgId:%d data is over, ret:%d, consumerId:0x%" PRIx64 ", register to pHandle:%p, pCont:%p, len:%d", vgId, ret,
          pHandle->consumerId, pHandle, pHandle->msg->pCont, pHandle->msg->contLen);
97 98 99 100 101 102 103
  return 0;
}

int32_t tqUnregisterPushHandle(STQ* pTq, void *handle) {
  STqHandle *pHandle = (STqHandle*)handle;
  int32_t    vgId = TD_VID(pTq->pVnode);

104 105 106
  if(taosHashGetSize(pTq->pPushMgr) <= 0) {
    return 0;
  }
107
  int32_t ret = taosHashRemove(pTq->pPushMgr, pHandle->subKey, strlen(pHandle->subKey));
108
  tqDebug("vgId:%d remove pHandle:%p,ret:%d consumer Id:0x%" PRIx64, vgId, pHandle, ret, pHandle->consumerId);
109

110
  if(pHandle->msg != NULL) {
H
Haojun Liao 已提交
111
    tqPushDataRsp(pHandle, vgId);
112 113 114 115 116

    rpcFreeCont(pHandle->msg->pCont);
    taosMemoryFree(pHandle->msg);
    pHandle->msg = NULL;
  }
117

118 119
  return 0;
}