streamDispatch.c 7.3 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

16
#include "streamInc.h"
L
Liu Jicong 已提交
17 18 19 20 21 22 23 24

/*
 * Serialize a stream dispatch request into pEncoder.
 *
 * Layout inside one tStartEncode/tEndEncode frame:
 *   i64 streamId, i32 taskId, i32 sourceTaskId, i32 sourceVg, i32 sourceChildId,
 *   i32 upstreamNodeId, i32 blockNum, then for each block: i32 length + binary payload.
 *
 * pReq->data (block buffers) and pReq->dataLen (their lengths) must both hold exactly
 * blockNum entries.
 *
 * Returns the encoder position after the frame on success, -1 if any field fails to encode.
 */
int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatchReq* pReq) {
  // Fixed header; || short-circuits so encoding stops at the first failing field.
  if (tStartEncode(pEncoder) < 0 || tEncodeI64(pEncoder, pReq->streamId) < 0 ||
      tEncodeI32(pEncoder, pReq->taskId) < 0 || tEncodeI32(pEncoder, pReq->sourceTaskId) < 0 ||
      tEncodeI32(pEncoder, pReq->sourceVg) < 0 || tEncodeI32(pEncoder, pReq->sourceChildId) < 0 ||
      tEncodeI32(pEncoder, pReq->upstreamNodeId) < 0 || tEncodeI32(pEncoder, pReq->blockNum) < 0) {
    return -1;
  }

  ASSERT(taosArrayGetSize(pReq->data) == pReq->blockNum);
  ASSERT(taosArrayGetSize(pReq->dataLen) == pReq->blockNum);

  // Payload: every block is written as its explicit length followed by the raw bytes.
  for (int32_t idx = 0; idx < pReq->blockNum; ++idx) {
    int32_t blockLen = *(int32_t*)taosArrayGet(pReq->dataLen, idx);
    void*   blockBuf = taosArrayGetP(pReq->data, idx);
    if (tEncodeI32(pEncoder, blockLen) < 0 || tEncodeBinary(pEncoder, blockBuf, blockLen) < 0) {
      return -1;
    }
  }

  tEndEncode(pEncoder);
  return pEncoder->pos;
}

/*
 * Deserialize a stream dispatch request produced by tEncodeStreamDispatchReq.
 *
 * On success:
 *   - pReq->data holds blockNum buffers obtained from tDecodeBinaryAlloc.
 *   - pReq->dataLen holds the matching lengths.
 *   - The caller owns both arrays and the buffers.
 *
 * The input arrives from another node, so malformed content is rejected with -1 instead
 * of being checked only by ASSERT, which may compile away. Malformed means:
 *   - a non-positive block count, or
 *   - a length prefix that disagrees with the binary payload size.
 *
 * On failure -1 is returned. Any arrays already stored in pReq, and buffers pushed into
 * them, remain owned by the caller (same as the original partial-failure behavior).
 */
int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq) {
  if (tStartDecode(pDecoder) < 0) return -1;
  if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1;
  if (tDecodeI32(pDecoder, &pReq->taskId) < 0) return -1;
  if (tDecodeI32(pDecoder, &pReq->sourceTaskId) < 0) return -1;
  if (tDecodeI32(pDecoder, &pReq->sourceVg) < 0) return -1;
  if (tDecodeI32(pDecoder, &pReq->sourceChildId) < 0) return -1;
  if (tDecodeI32(pDecoder, &pReq->upstreamNodeId) < 0) return -1;
  if (tDecodeI32(pDecoder, &pReq->blockNum) < 0) return -1;

  // A dispatch message always carries at least one block; anything else is malformed.
  ASSERT(pReq->blockNum > 0);
  if (pReq->blockNum <= 0) return -1;

  pReq->data = taosArrayInit(pReq->blockNum, sizeof(void*));
  pReq->dataLen = taosArrayInit(pReq->blockNum, sizeof(int32_t));
  if (pReq->data == NULL || pReq->dataLen == NULL) return -1;

  for (int32_t i = 0; i < pReq->blockNum; i++) {
    int32_t  len1 = 0;
    uint64_t len2 = 0;  // initialised: the decoder may leave it untouched for empty payloads
    void*    data = NULL;
    if (tDecodeI32(pDecoder, &len1) < 0) return -1;
    if (tDecodeBinaryAlloc(pDecoder, &data, &len2) < 0) return -1;

    // The explicit length prefix must agree with the binary payload size.
    if (len1 < 0 || (uint64_t)len1 != len2) {
      taosMemoryFree(data);
      return -1;
    }

    // Hand the buffer to pReq->data first; if that fails nobody else owns it, so free it.
    if (taosArrayPush(pReq->data, &data) == NULL) {
      taosMemoryFree(data);
      return -1;
    }
    if (taosArrayPush(pReq->dataLen, &len1) == NULL) return -1;
  }
  tEndDecode(pDecoder);
  return 0;
}

65 66 67 68 69 70 71 72 73 74
/*
 * Serialize one SSDataBlock into a freshly allocated SRetrieveTableRsp buffer and
 * append it to pReq.
 *
 * On success:
 *   - the buffer is appended to pReq->data and becomes owned by that array;
 *   - its encoded length (header + payload) is appended to pReq->dataLen.
 *
 * Returns 0 on success, -1 on allocation or append failure.
 * If the buffer could not be appended it is freed here, so it is never leaked. If only
 * the length append fails, the buffer already belongs to pReq->data and is released by
 * the caller's cleanup of that array.
 */
static int32_t streamAddBlockToDispatchMsg(const SSDataBlock* pBlock, SStreamDispatchReq* pReq) {
  int32_t dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock);
  void*   buf = taosMemoryCalloc(1, dataStrLen);
  if (buf == NULL) return -1;

  SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)buf;
  pRetrieve->useconds = 0;
  pRetrieve->precision = TSDB_DEFAULT_PRECISION;
  pRetrieve->compressed = 0;
  pRetrieve->completed = 1;
  pRetrieve->streamBlockType = pBlock->info.type;
  // Row/column counts are stored in network byte order.
  pRetrieve->numOfRows = htonl(pBlock->info.rows);
  pRetrieve->numOfCols = htonl(pBlock->info.numOfCols);

  int32_t actualLen = 0;
  blockCompressEncode(pBlock, pRetrieve->data, &actualLen, pBlock->info.numOfCols, false);
  actualLen += sizeof(SRetrieveTableRsp);
  ASSERT(actualLen <= dataStrLen);

  if (taosArrayPush(pReq->data, &buf) == NULL) {
    taosMemoryFree(buf);
    return -1;
  }
  if (taosArrayPush(pReq->dataLen, &actualLen) == NULL) {
    return -1;
  }

  return 0;
}

/*
 * Build the RPC message that carries one SStreamDataBlock batch to the downstream task.
 *
 * Steps:
 *   1. Serialize every SSDataBlock in data->blocks into a SRetrieveTableRsp buffer.
 *   2. Resolve the downstream vnode/task from pTask's dispatch type.
 *   3. Encode the SStreamDispatchReq into an rpcMallocCont buffer prefixed by SMsgHead.
 *
 * pTask   - source task; supplies ids and the dispatcher configuration.
 * data    - batch to send; must contain at least one block.
 * pMsg    - on success receives pCont/contLen/msgType; pCont's buffer is handed to pMsg.
 * ppEpSet - on success points at the endpoint set of the chosen downstream vnode
 *           (the storage lives inside pTask, not in a caller-owned allocation).
 *
 * Returns 0 on success and a negative value on failure. The failure cases are:
 *   - allocation failure;
 *   - unknown dispatch type;
 *   - no vgroup covering the hash;
 *   - encode error.
 * On failure the message buffer is released.
 */
int32_t streamBuildDispatchMsg(SStreamTask* pTask, SStreamDataBlock* data, SRpcMsg* pMsg, SEpSet** ppEpSet) {
  void*   buf = NULL;
  int32_t code = -1;
  int32_t vgId = 0;
  int32_t downstreamTaskId = 0;
  int32_t blockNum = taosArrayGetSize(data->blocks);
  ASSERT(blockNum != 0);

  SStreamDispatchReq req = {
      .streamId = pTask->streamId,
      .sourceTaskId = pTask->taskId,
      .sourceVg = data->sourceVg,
      .sourceChildId = pTask->childId,
      .upstreamNodeId = pTask->nodeId,
      .blockNum = blockNum,
  };

  req.data = taosArrayInit(blockNum, sizeof(void*));
  req.dataLen = taosArrayInit(blockNum, sizeof(int32_t));
  if (req.data == NULL || req.dataLen == NULL) {
    goto FAIL;
  }
  for (int32_t i = 0; i < blockNum; i++) {
    SSDataBlock* pDataBlock = taosArrayGet(data->blocks, i);
    if (streamAddBlockToDispatchMsg(pDataBlock, &req) < 0) {
      goto FAIL;
    }
  }

  // find ep
  if (pTask->dispatchType == TASK_DISPATCH__FIXED) {
    vgId = pTask->fixedEpDispatcher.nodeId;
    *ppEpSet = &pTask->fixedEpDispatcher.epSet;
    downstreamTaskId = pTask->fixedEpDispatcher.taskId;
  } else if (pTask->dispatchType == TASK_DISPATCH__SHUFFLE) {
    // TODO get ctbName for each block; for now the first block's groupId routes the whole batch
    SSDataBlock* pBlock = taosArrayGet(data->blocks, 0);
    char*        ctbName = buildCtbNameByGroupId(pTask->shuffleDispatcher.stbFullName, pBlock->info.groupId);
    if (ctbName == NULL) {
      goto FAIL;
    }

    // TODO: get hash function by hashMethod
    uint32_t hashValue = MurmurHash3_32(ctbName, strlen(ctbName));
    // NOTE(review): assumes buildCtbNameByGroupId returns a caller-owned taosMemory allocation
    // (it was previously never released here) — confirm against its definition.
    taosMemoryFree(ctbName);

    // get node
    // TODO: optimize search process
    SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
    int32_t sz = taosArrayGetSize(vgInfo);
    for (int32_t i = 0; i < sz; i++) {
      SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
      if (hashValue >= pVgInfo->hashBegin && hashValue <= pVgInfo->hashEnd) {
        vgId = pVgInfo->vgId;
        downstreamTaskId = pVgInfo->taskId;
        *ppEpSet = &pVgInfo->epSet;
        break;
      }
    }
  }

  // No target was resolved: either the dispatch type is unknown or no vgroup's hash range
  // covers the table. Fail explicitly so builds where ASSERT is a no-op do not report
  // success with an unset *ppEpSet.
  ASSERT(vgId != 0);
  if (vgId == 0) {
    goto FAIL;
  }

  req.taskId = downstreamTaskId;

  qInfo("dispatch from task %d (child id %d) to down stream task %d in vnode %d", pTask->taskId, pTask->childId,
        downstreamTaskId, vgId);

  // serialize
  int32_t tlen;
  tEncodeSize(tEncodeStreamDispatchReq, &req, tlen, code);
  if (code < 0) goto FAIL;

  code = -1;
  buf = rpcMallocCont(sizeof(SMsgHead) + tlen);
  if (buf == NULL) {
    goto FAIL;
  }

  // The message head carries the destination vgId in network byte order.
  ((SMsgHead*)buf)->vgId = htonl(vgId);
  void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));

  SEncoder encoder;
  tEncoderInit(&encoder, abuf, tlen);
  code = tEncodeStreamDispatchReq(&encoder, &req);
  // Clear the encoder on both outcomes; the failure path used to skip this.
  tEncoderClear(&encoder);
  if (code < 0) {
    goto FAIL;
  }

  pMsg->contLen = tlen + sizeof(SMsgHead);
  pMsg->pCont = buf;
  pMsg->msgType = pTask->dispatchMsgType;

  code = 0;
FAIL:
  if (code < 0 && buf) rpcFreeCont(buf);
  // The per-block buffers were copied into the encoded message (or are being discarded).
  if (req.data) taosArrayDestroyP(req.data, (FDelete)taosMemoryFree);
  if (req.dataLen) taosArrayDestroy(req.dataLen);
  return code;
}

L
Liu Jicong 已提交
184 185 186 187 188 189
/*
 * Send the next queued output item of pTask to its downstream task.
 *
 * The task's outputStatus is atomically moved from NORMAL to WAIT before anything is
 * dequeued. A concurrent or repeated call that finds a status other than NORMAL returns
 * 0 without doing anything. The status is put back to NORMAL when:
 *   - the output queue is empty, or
 *   - building the message fails.
 * After a successful send it is left at WAIT.
 *
 * pMsgCb is currently unused.
 *
 * Returns 0 when there was nothing to do or the request was handed to tmsgSendReq,
 * and -1 when the dispatch message could not be built.
 */
int32_t streamDispatch(SStreamTask* pTask, SMsgCb* pMsgCb) {
  int8_t prevStatus =
      atomic_val_compare_exchange_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL, TASK_OUTPUT_STATUS__WAIT);
  if (prevStatus != TASK_OUTPUT_STATUS__NORMAL) {
    // Output channel is not idle; leave it to whoever moved it out of NORMAL.
    return 0;
  }

  SStreamDataBlock* pItem = streamQueueNextItem(pTask->outputQueue);
  if (pItem == NULL) {
    // Nothing to send: release the output channel again.
    atomic_store_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL);
    return 0;
  }
  ASSERT(pItem->type == STREAM_DATA_TYPE_SSDATA_BLOCK);

  SEpSet* pTargetEpSet = NULL;
  SRpcMsg msg = {0};
  if (streamBuildDispatchMsg(pTask, pItem, &msg, &pTargetEpSet) >= 0) {
    // NOTE(review): tmsgSendReq's result is ignored and outputStatus stays WAIT; presumably
    // it is reset to NORMAL when the dispatch response is handled — confirm.
    tmsgSendReq(pTargetEpSet, &msg);
    return 0;
  }

  ASSERT(0);
  atomic_store_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL);
  return -1;
}