streamDispatch.c 18.5 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

16
#include "streamInc.h"
L
Liu Jicong 已提交
17 18 19 20 21

/**
 * Serialize a stream dispatch request.
 *
 * Field order on the wire: streamId, taskId, upstreamTaskId, dataSrcVgId,
 * upstreamChildId, upstreamNodeId, blockNum, totalLen, then blockNum pairs of
 * (int32 length, binary payload).
 *
 * @param pEncoder destination encoder
 * @param pReq     request to serialize; data and dataLen are asserted to hold
 *                 exactly blockNum entries each
 * @return pEncoder->pos after encoding on success, -1 if any step fails
 */
int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatchReq* pReq) {
  if (tStartEncode(pEncoder) < 0) {
    return -1;
  }

  // Fixed header; short-circuit evaluation keeps the fields in wire order.
  if (tEncodeI64(pEncoder, pReq->streamId) < 0 ||
      tEncodeI32(pEncoder, pReq->taskId) < 0 ||
      tEncodeI32(pEncoder, pReq->upstreamTaskId) < 0 ||
      tEncodeI32(pEncoder, pReq->dataSrcVgId) < 0 ||
      tEncodeI32(pEncoder, pReq->upstreamChildId) < 0 ||
      tEncodeI32(pEncoder, pReq->upstreamNodeId) < 0 ||
      tEncodeI32(pEncoder, pReq->blockNum) < 0 ||
      tEncodeI64(pEncoder, pReq->totalLen) < 0) {
    return -1;
  }

  ASSERT(taosArrayGetSize(pReq->data) == pReq->blockNum);
  ASSERT(taosArrayGetSize(pReq->dataLen) == pReq->blockNum);

  // Each block is written as its length followed by the raw bytes.
  for (int32_t idx = 0; idx < pReq->blockNum; ++idx) {
    int32_t blockLen = *(int32_t*)taosArrayGet(pReq->dataLen, idx);
    void*   pPayload = taosArrayGetP(pReq->data, idx);
    if (tEncodeI32(pEncoder, blockLen) < 0) {
      return -1;
    }
    if (tEncodeBinary(pEncoder, pPayload, blockLen) < 0) {
      return -1;
    }
  }

  tEndEncode(pEncoder);
  return pEncoder->pos;
}

/**
 * Deserialize a stream dispatch request (inverse of tEncodeStreamDispatchReq).
 *
 * On success pReq->data holds blockNum decoder-allocated payload pointers and
 * pReq->dataLen the matching int32 lengths; release them with
 * tDeleteStreamDispatchReq().
 *
 * On any failure after the arrays were created, both arrays (and the payloads
 * already collected) are released here and the fields are reset to NULL, so
 * the caller may either drop pReq or still call tDeleteStreamDispatchReq()
 * (the array destroy helpers are invoked with possibly-NULL arrays elsewhere
 * in this file).
 *
 * @param pDecoder source decoder
 * @param pReq     request to fill
 * @return 0 on success, -1 on decode or allocation failure
 */
int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq) {
  if (tStartDecode(pDecoder) < 0) return -1;
  if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1;
  if (tDecodeI32(pDecoder, &pReq->taskId) < 0) return -1;
  if (tDecodeI32(pDecoder, &pReq->upstreamTaskId) < 0) return -1;
  if (tDecodeI32(pDecoder, &pReq->dataSrcVgId) < 0) return -1;
  if (tDecodeI32(pDecoder, &pReq->upstreamChildId) < 0) return -1;
  if (tDecodeI32(pDecoder, &pReq->upstreamNodeId) < 0) return -1;
  if (tDecodeI32(pDecoder, &pReq->blockNum) < 0) return -1;
  if (tDecodeI64(pDecoder, &pReq->totalLen) < 0) return -1;

  ASSERT(pReq->blockNum > 0);
  pReq->data = taosArrayInit(pReq->blockNum, sizeof(void*));
  pReq->dataLen = taosArrayInit(pReq->blockNum, sizeof(int32_t));
  if (pReq->data == NULL || pReq->dataLen == NULL) {
    // Previously unchecked: a failed allocation was handed straight to taosArrayPush.
    goto FAIL;
  }

  for (int32_t i = 0; i < pReq->blockNum; i++) {
    int32_t  len1 = 0;
    uint64_t len2 = 0;
    void*    data = NULL;
    if (tDecodeI32(pDecoder, &len1) < 0) goto FAIL;
    if (tDecodeBinaryAlloc(pDecoder, &data, &len2) < 0) goto FAIL;
    // The explicitly encoded length must agree with the binary field's own length.
    ASSERT((uint64_t)len1 == len2);
    taosArrayPush(pReq->dataLen, &len1);
    taosArrayPush(pReq->data, &data);
  }
  tEndDecode(pDecoder);
  return 0;

FAIL:
  taosArrayDestroyP(pReq->data, taosMemoryFree);
  taosArrayDestroy(pReq->dataLen);
  pReq->data = NULL;
  pReq->dataLen = NULL;
  return -1;
}

L
Liu Jicong 已提交
68
/**
 * Release the arrays owned by a dispatch request.
 *
 * Destroys the length array and the payload array; every payload pointer in
 * pReq->data is released with taosMemoryFree. pReq itself is not freed.
 *
 * @param pReq request whose arrays are released
 */
void tDeleteStreamDispatchReq(SStreamDispatchReq* pReq) {
  // The two arrays are independent of each other.
  taosArrayDestroy(pReq->dataLen);
  taosArrayDestroyP(pReq->data, taosMemoryFree);
}

L
Liu Jicong 已提交
73 74 75
/**
 * Serialize a stream retrieve request.
 *
 * Field order on the wire: streamId, reqId, dstNodeId, dstTaskId, srcNodeId,
 * srcTaskId, then the retrieve payload as a binary field of retrieveLen bytes.
 *
 * @param pEncoder destination encoder
 * @param pReq     request to serialize
 * @return pEncoder->pos after encoding on success, -1 if any step fails
 */
int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const SStreamRetrieveReq* pReq) {
  if (tStartEncode(pEncoder) < 0) {
    return -1;
  }

  // Identifiers first; short-circuit evaluation keeps them in wire order.
  if (tEncodeI64(pEncoder, pReq->streamId) < 0 ||
      tEncodeI64(pEncoder, pReq->reqId) < 0 ||
      tEncodeI32(pEncoder, pReq->dstNodeId) < 0 ||
      tEncodeI32(pEncoder, pReq->dstTaskId) < 0 ||
      tEncodeI32(pEncoder, pReq->srcNodeId) < 0 ||
      tEncodeI32(pEncoder, pReq->srcTaskId) < 0) {
    return -1;
  }

  const uint8_t* pPayload = (const uint8_t*)pReq->pRetrieve;
  if (tEncodeBinary(pEncoder, pPayload, pReq->retrieveLen) < 0) {
    return -1;
  }

  tEndEncode(pEncoder);
  return pEncoder->pos;
}

/**
 * Deserialize a stream retrieve request (inverse of tEncodeStreamRetrieveReq).
 *
 * The payload is allocated by tDecodeBinaryAlloc and stored in pReq->pRetrieve;
 * its length is narrowed to int32 and stored in pReq->retrieveLen.
 *
 * @param pDecoder source decoder
 * @param pReq     request to fill
 * @return 0 on success, -1 if any decode step fails
 */
int32_t tDecodeStreamRetrieveReq(SDecoder* pDecoder, SStreamRetrieveReq* pReq) {
  if (tStartDecode(pDecoder) < 0) {
    return -1;
  }

  // Identifiers, in the order they were written.
  if (tDecodeI64(pDecoder, &pReq->streamId) < 0 ||
      tDecodeI64(pDecoder, &pReq->reqId) < 0 ||
      tDecodeI32(pDecoder, &pReq->dstNodeId) < 0 ||
      tDecodeI32(pDecoder, &pReq->dstTaskId) < 0 ||
      tDecodeI32(pDecoder, &pReq->srcNodeId) < 0 ||
      tDecodeI32(pDecoder, &pReq->srcTaskId) < 0) {
    return -1;
  }

  uint64_t payloadLen = 0;
  if (tDecodeBinaryAlloc(pDecoder, (void**)&pReq->pRetrieve, &payloadLen) < 0) {
    return -1;
  }
  pReq->retrieveLen = (int32_t)payloadLen;

  tEndDecode(pDecoder);
  return 0;
}

L
Liu Jicong 已提交
101 102
/**
 * Release the retrieve payload held by the request. pReq itself is not freed.
 *
 * @param pReq request whose pRetrieve buffer is released
 */
void tDeleteStreamRetrieveReq(SStreamRetrieveReq* pReq) {
  taosMemoryFree(pReq->pRetrieve);
}

L
Liu Jicong 已提交
103
/**
 * Encode pBlock once as a retrieve response and send it, wrapped in a
 * TDMT_STREAM_RETRIEVE request, to every entry of pTask->childEpInfo.
 *
 * Each child gets its own reqId and its own RPC buffer; the encoded block is
 * shared between the requests and released before returning.
 *
 * @param pTask  sending task (source ids and child endpoints are read from it)
 * @param pBlock data block to broadcast
 * @return 0 if every request was handed to tmsgSendReq, -1 otherwise
 */
int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock) {
  int32_t            code = -1;
  SRetrieveTableRsp* pRetrieve = NULL;
  void*              buf = NULL;
  int32_t            dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock);

  pRetrieve = taosMemoryCalloc(1, dataStrLen);
  if (pRetrieve == NULL) return -1;

  int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
  pRetrieve->useconds = 0;
  pRetrieve->precision = TSDB_DEFAULT_PRECISION;
  pRetrieve->compressed = 0;
  pRetrieve->completed = 1;
  pRetrieve->streamBlockType = pBlock->info.type;
  pRetrieve->numOfRows = htobe64((int64_t)pBlock->info.rows);
  pRetrieve->numOfCols = htonl(numOfCols);
  pRetrieve->skey = htobe64(pBlock->info.window.skey);
  pRetrieve->ekey = htobe64(pBlock->info.window.ekey);
  pRetrieve->version = htobe64(pBlock->info.version);

  // The encoded length is not needed: the request always carries dataStrLen bytes.
  blockEncode(pBlock, pRetrieve->data, numOfCols);

  SStreamRetrieveReq req = {
      .streamId = pTask->id.streamId,
      .srcNodeId = pTask->nodeId,
      .srcTaskId = pTask->id.taskId,
      .pRetrieve = pRetrieve,
      .retrieveLen = dataStrLen,
  };

  int32_t sz = taosArrayGetSize(pTask->childEpInfo);
  ASSERT(sz > 0);
  for (int32_t i = 0; i < sz; i++) {
    req.reqId = tGenIdPI64();
    SStreamChildEpInfo* pEpInfo = taosArrayGetP(pTask->childEpInfo, i);
    req.dstNodeId = pEpInfo->nodeId;
    req.dstTaskId = pEpInfo->taskId;
    int32_t len;
    tEncodeSize(tEncodeStreamRetrieveReq, &req, len, code);
    if (code < 0) {
      ASSERT(0);
      // Go through CLEAR so pRetrieve is released (it used to leak here).
      code = -1;
      goto CLEAR;
    }
    // tEncodeSize stored a non-negative value in code; re-arm the failure
    // status so the error exits below do not report success.
    code = -1;

    buf = rpcMallocCont(sizeof(SMsgHead) + len);
    if (buf == NULL) {
      goto CLEAR;
    }

    ((SMsgHead*)buf)->vgId = htonl(pEpInfo->nodeId);
    void*    abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
    SEncoder encoder;
    tEncoderInit(&encoder, abuf, len);
    if (tEncodeStreamRetrieveReq(&encoder, &req) < 0) {
      tEncoderClear(&encoder);
      goto CLEAR;
    }
    tEncoderClear(&encoder);

    SRpcMsg rpcMsg = {.code = 0, .msgType = TDMT_STREAM_RETRIEVE, .pCont = buf, .contLen = sizeof(SMsgHead) + len};
    if (tmsgSendReq(&pEpInfo->epSet, &rpcMsg) < 0) {
      ASSERT(0);
      // NOTE(review): buf is released in CLEAR; confirm tmsgSendReq does not
      // already free pCont on failure.
      goto CLEAR;
    }

    // The RPC layer owns the buffer once the send was accepted.
    buf = NULL;
    qDebug("s-task:%s (child %d) send retrieve req to task %d at node %d, reqId:0x%" PRIx64, pTask->id.idStr,
           pTask->selfChildId, pEpInfo->taskId, pEpInfo->nodeId, req.reqId);
  }
  code = 0;

CLEAR:
  taosMemoryFree(pRetrieve);
  rpcFreeCont(buf);
  return code;
}

178
/**
 * Encode one data block as a retrieve response and append it to a dispatch
 * request.
 *
 * The encoded buffer is pushed onto pReq->data and its actual length onto
 * pReq->dataLen; pReq->totalLen grows by the reserved (not the actual) size.
 * pReq->blockNum is not touched here.
 *
 * @param pBlock block to encode
 * @param pReq   dispatch request receiving the encoded block
 * @return 0 on success, -1 if the buffer cannot be allocated
 */
static int32_t streamAddBlockIntoDispatchMsg(const SSDataBlock* pBlock, SStreamDispatchReq* pReq) {
  int32_t reservedLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock);

  SRetrieveTableRsp* pRsp = (SRetrieveTableRsp*)taosMemoryCalloc(1, reservedLen);
  if (pRsp == NULL) {
    return -1;
  }

  // Response header; multi-byte integers are stored in network byte order.
  pRsp->useconds = 0;
  pRsp->precision = TSDB_DEFAULT_PRECISION;
  pRsp->compressed = 0;
  pRsp->completed = 1;
  pRsp->streamBlockType = pBlock->info.type;
  pRsp->numOfRows = htobe64((int64_t)pBlock->info.rows);
  pRsp->skey = htobe64(pBlock->info.window.skey);
  pRsp->ekey = htobe64(pBlock->info.window.ekey);
  pRsp->version = htobe64(pBlock->info.version);
  pRsp->watermark = htobe64(pBlock->info.watermark);
  memcpy(pRsp->parTbName, pBlock->info.parTbName, TSDB_TABLE_NAME_LEN);

  int32_t colCount = (int32_t)taosArrayGetSize(pBlock->pDataBlock);
  pRsp->numOfCols = htonl(colCount);

  // Column data follows the header; the actual size may be below the reservation.
  int32_t encodedLen = blockEncode(pBlock, pRsp->data, colCount);
  encodedLen += sizeof(SRetrieveTableRsp);
  ASSERT(encodedLen <= reservedLen);

  void* pPayload = pRsp;
  taosArrayPush(pReq->dataLen, &encodedLen);
  taosArrayPush(pReq->data, &pPayload);

  pReq->totalLen += reservedLen;
  return 0;
}

209
/**
 * Serialize a task-check request and send it as TDMT_STREAM_TASK_CHECK.
 *
 * The RPC buffer is an SMsgHead (vgId = nodeId in network byte order) followed
 * by the encoded request. The result of tmsgSendReq is not inspected.
 *
 * @param pTask  sending task (used for logging only)
 * @param pReq   check request to send
 * @param nodeId target node id, written into the message head
 * @param pEpSet endpoint set of the target
 * @return 0 once the message was passed to tmsgSendReq, -1 on size/allocation
 *         failure, or the encoder's negative result if encoding fails
 */
int32_t streamDispatchCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pReq, int32_t nodeId, SEpSet* pEpSet) {
  int32_t code = -1;
  int32_t bodyLen;

  tEncodeSize(tEncodeSStreamTaskCheckReq, pReq, bodyLen, code);
  if (code < 0) {
    return -1;
  }

  void* pCont = rpcMallocCont(sizeof(SMsgHead) + bodyLen);
  if (pCont == NULL) {
    return -1;
  }

  SMsgHead* pHead = (SMsgHead*)pCont;
  pHead->vgId = htonl(nodeId);

  // The request body is encoded right behind the message head.
  void*    pBody = POINTER_SHIFT(pCont, sizeof(SMsgHead));
  SEncoder encoder;
  tEncoderInit(&encoder, pBody, bodyLen);
  code = tEncodeSStreamTaskCheckReq(&encoder, pReq);
  if (code < 0) {
    rpcFreeCont(pCont);
    return code;
  }
  tEncoderClear(&encoder);

  SRpcMsg msg = {0};
  msg.contLen = bodyLen + sizeof(SMsgHead);
  msg.pCont = pCont;
  msg.msgType = TDMT_STREAM_TASK_CHECK;

  qDebug("s-task:%s dispatch check msg to downstream s-task:%" PRIx64 ":%d node %d: check msg", pTask->id.idStr,
         pReq->streamId, pReq->downstreamTaskId, nodeId);

  tmsgSendReq(pEpSet, &msg);
  return 0;
}

248 249 250 251 252 253 254 255 256 257 258 259 260 261
/**
 * Serialize a recover-finish request and send it as TDMT_STREAM_RECOVER_FINISH
 * with msg.info.noResp set to 1.
 *
 * The RPC buffer is an SMsgHead (vgId in network byte order) followed by the
 * encoded request. The result of tmsgSendReq is not inspected.
 *
 * @param pTask  sending task (used for logging only)
 * @param pReq   recover-finish request to send
 * @param vgId   target vgroup id, written into the message head
 * @param pEpSet endpoint set of the target
 * @return 0 once the message was passed to tmsgSendReq, -1 on size/allocation
 *         failure (terrno is set to TSDB_CODE_OUT_OF_MEMORY for the latter),
 *         or the encoder's negative result if encoding fails
 */
int32_t streamDispatchOneRecoverFinishReq(SStreamTask* pTask, const SStreamRecoverFinishReq* pReq, int32_t vgId,
                                          SEpSet* pEpSet) {
  int32_t code = -1;
  int32_t bodyLen;

  tEncodeSize(tEncodeSStreamRecoverFinishReq, pReq, bodyLen, code);
  if (code < 0) {
    return -1;
  }

  void* pCont = rpcMallocCont(sizeof(SMsgHead) + bodyLen);
  if (pCont == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  SMsgHead* pHead = (SMsgHead*)pCont;
  pHead->vgId = htonl(vgId);

  // The request body is encoded right behind the message head.
  void*    pBody = POINTER_SHIFT(pCont, sizeof(SMsgHead));
  SEncoder encoder;
  tEncoderInit(&encoder, pBody, bodyLen);
  code = tEncodeSStreamRecoverFinishReq(&encoder, pReq);
  if (code < 0) {
    // pCont is known to be non-NULL at this point.
    rpcFreeCont(pCont);
    return code;
  }
  tEncoderClear(&encoder);

  SRpcMsg msg = {0};
  msg.contLen = bodyLen + sizeof(SMsgHead);
  msg.pCont = pCont;
  msg.msgType = TDMT_STREAM_RECOVER_FINISH;
  msg.info.noResp = 1;

  tmsgSendReq(pEpSet, &msg);
  qDebug("s-task:%s dispatch recover finish msg to downstream taskId:0x%x node %d: recover finish msg", pTask->id.idStr,
         pReq->taskId, vgId);

  return 0;
}

292
/**
 * Serialize a dispatch request and send it to one downstream vgroup using the
 * task's dispatchMsgType.
 *
 * The RPC buffer is an SMsgHead (vgId in network byte order) followed by the
 * encoded request. The result of tmsgSendReq is not inspected.
 *
 * @param pTask  sending task (message type and log id are read from it)
 * @param pReq   dispatch request to send
 * @param vgId   target vgroup id, written into the message head
 * @param pEpSet endpoint set of the target
 * @return 0 once the message was passed to tmsgSendReq, a negative value if
 *         sizing, allocation or encoding fails
 */
int32_t doSendDispatchMsg(SStreamTask* pTask, const SStreamDispatchReq* pReq, int32_t vgId, SEpSet* pEpSet) {
  int32_t code = -1;

  // Dry run to learn the encoded size.
  int32_t bodyLen;
  tEncodeSize(tEncodeStreamDispatchReq, pReq, bodyLen, code);
  if (code < 0) {
    return code;
  }

  void* pCont = rpcMallocCont(sizeof(SMsgHead) + bodyLen);
  if (pCont == NULL) {
    return -1;
  }

  SMsgHead* pHead = (SMsgHead*)pCont;
  pHead->vgId = htonl(vgId);

  // The request body is encoded right behind the message head.
  void*    pBody = POINTER_SHIFT(pCont, sizeof(SMsgHead));
  SEncoder encoder;
  tEncoderInit(&encoder, pBody, bodyLen);
  code = tEncodeStreamDispatchReq(&encoder, pReq);
  if (code < 0) {
    rpcFreeCont(pCont);
    return code;
  }
  tEncoderClear(&encoder);

  SRpcMsg msg = {0};
  msg.contLen = bodyLen + sizeof(SMsgHead);
  msg.pCont = pCont;
  msg.msgType = pTask->dispatchMsgType;

  qDebug("s-task:%s dispatch msg to taskId:0x%x vgId:%d data msg", pTask->id.idStr, pReq->taskId, vgId);
  tmsgSendReq(pEpSet, &msg);

  return 0;
}

L
Liu Jicong 已提交
332 333
/**
 * Route one data block to the vgroup whose hash range covers its child table
 * name and append it to that vgroup's pending dispatch request.
 *
 * The table name is "<db>.<parTbName>" when the block carries a partition
 * table name, otherwise "<db>.<name built from stbFullName and groupId>".
 * The first time a request receives a block, the task's waitingRspCnt is
 * incremented.
 *
 * @param pTask      dispatching task (shuffleDispatcher info is read from it)
 * @param pReqs      array of vgSz dispatch requests, index-aligned with the
 *                   task's vgroup info array
 * @param pDataBlock block to route
 * @param vgSz       number of vgroups / requests
 * @param groupId    group id used to build the table name when the block has none
 * @return 0 on success, -1 on allocation failure or if the block cannot be encoded
 */
int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, SSDataBlock* pDataBlock, int32_t vgSz,
                                int64_t groupId) {
  char* ctbName = taosMemoryCalloc(1, TSDB_TABLE_FNAME_LEN);
  if (ctbName == NULL) {
    return -1;
  }

  // NOTE(review): the buffer holds TSDB_TABLE_FNAME_LEN bytes but formatting is
  // capped at TSDB_TABLE_NAME_LEN. Left unchanged because the truncated string
  // feeds the hash that selects the vgroup; confirm before widening the bound.
  if (pDataBlock->info.parTbName[0]) {
    snprintf(ctbName, TSDB_TABLE_NAME_LEN, "%s.%s", pTask->shuffleDispatcher.dbInfo.db, pDataBlock->info.parTbName);
  } else {
    char* ctbShortName = buildCtbNameByGroupId(pTask->shuffleDispatcher.stbFullName, groupId);
    if (ctbShortName == NULL) {
      // Passing NULL to "%s" is undefined behavior; treat it as a failure.
      taosMemoryFree(ctbName);
      return -1;
    }
    snprintf(ctbName, TSDB_TABLE_NAME_LEN, "%s.%s", pTask->shuffleDispatcher.dbInfo.db, ctbShortName);
    taosMemoryFree(ctbShortName);
  }

  SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;

  /*uint32_t hashValue = MurmurHash3_32(ctbName, strlen(ctbName));*/
  SUseDbRsp* pDbInfo = &pTask->shuffleDispatcher.dbInfo;
  uint32_t   hashValue =
      taosGetTbHashVal(ctbName, strlen(ctbName), pDbInfo->hashMethod, pDbInfo->hashPrefix, pDbInfo->hashSuffix);
  taosMemoryFree(ctbName);

  bool found = false;
  // TODO: optimize search
  int32_t j;
  for (j = 0; j < vgSz; j++) {
    SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, j);
    ASSERT(pVgInfo->vgId > 0);
    if (hashValue >= pVgInfo->hashBegin && hashValue <= pVgInfo->hashEnd) {
      if (streamAddBlockIntoDispatchMsg(pDataBlock, &pReqs[j]) < 0) {
        return -1;
      }
      // First block for this vgroup: one more dispatch response to wait for.
      if (pReqs[j].blockNum == 0) {
        atomic_add_fetch_32(&pTask->shuffleDispatcher.waitingRspCnt, 1);
      }
      pReqs[j].blockNum++;
      found = true;
      break;
    }
  }
  ASSERT(found);
  return 0;
}

L
Liu Jicong 已提交
377
/**
 * Dispatch every block of pData to the downstream task(s).
 *
 * - TASK_OUTPUT__FIXED_DISPATCH: all blocks are packed into one request and
 *   sent to the single configured downstream task.
 * - TASK_OUTPUT__SHUFFLE_DISPATCH: one request per vgroup is prepared;
 *   STREAM_DELETE_RESULT blocks are added to every request, all other blocks
 *   are routed by streamSearchAndAddBlock(). Only requests that received at
 *   least one block are sent.
 *
 * For any other output type nothing is sent and 0 is returned.
 *
 * @param pTask dispatching task
 * @param pData container of the blocks to dispatch (must not be empty)
 * @return 0 on success, a non-zero value on failure
 */
int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pData) {
  int32_t code = 0;
  int32_t numOfBlocks = taosArrayGetSize(pData->blocks);
  ASSERT(numOfBlocks != 0);

  if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) {
    SStreamDispatchReq req = {
        .streamId = pTask->id.streamId,
        .dataSrcVgId = pData->srcVgId,
        .upstreamTaskId = pTask->id.taskId,
        .upstreamChildId = pTask->selfChildId,
        .upstreamNodeId = pTask->nodeId,
        .blockNum = numOfBlocks,
    };

    req.data = taosArrayInit(numOfBlocks, sizeof(void*));
    req.dataLen = taosArrayInit(numOfBlocks, sizeof(int32_t));
    if (req.data == NULL || req.dataLen == NULL) {
      taosArrayDestroyP(req.data, taosMemoryFree);
      taosArrayDestroy(req.dataLen);
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    for (int32_t i = 0; i < numOfBlocks; i++) {
      SSDataBlock* pDataBlock = taosArrayGet(pData->blocks, i);
      code = streamAddBlockIntoDispatchMsg(pDataBlock, &req);

      if (code != TSDB_CODE_SUCCESS) {
        taosArrayDestroyP(req.data, taosMemoryFree);
        taosArrayDestroy(req.dataLen);
        return code;
      }
    }

    int32_t vgId = pTask->fixedEpDispatcher.nodeId;
    SEpSet* pEpSet = &pTask->fixedEpDispatcher.epSet;
    int32_t downstreamTaskId = pTask->fixedEpDispatcher.taskId;

    req.taskId = downstreamTaskId;

    qDebug("s-task:%s (child taskId:%d) fix-dispatch %d block(s) to down stream s-task:0x%x in vgId:%d", pTask->id.idStr,
           pTask->selfChildId, numOfBlocks, downstreamTaskId, vgId);

    code = doSendDispatchMsg(pTask, &req, vgId, pEpSet);
    taosArrayDestroyP(req.data, taosMemoryFree);
    taosArrayDestroy(req.dataLen);
    return code;
  } else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
    int32_t rspCnt = atomic_load_32(&pTask->shuffleDispatcher.waitingRspCnt);
    ASSERT(rspCnt == 0);

    SArray*             vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
    int32_t             vgSz = taosArrayGetSize(vgInfo);
    SStreamDispatchReq* pReqs = taosMemoryCalloc(vgSz, sizeof(SStreamDispatchReq));
    if (pReqs == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
    }

    // Failure by default: every early exit to FAIL_SHUFFLE_DISPATCH must report
    // an error. Previously code was still 0 there, so failures looked like success.
    // NOTE(review): waitingRspCnt may already have been raised, and some requests
    // may already have been sent, when a later step fails; confirm how callers recover.
    code = -1;

    for (int32_t i = 0; i < vgSz; i++) {
      pReqs[i].streamId = pTask->id.streamId;
      pReqs[i].dataSrcVgId = pData->srcVgId;
      pReqs[i].upstreamTaskId = pTask->id.taskId;
      pReqs[i].upstreamChildId = pTask->selfChildId;
      pReqs[i].upstreamNodeId = pTask->nodeId;
      pReqs[i].blockNum = 0;
      pReqs[i].data = taosArrayInit(0, sizeof(void*));
      pReqs[i].dataLen = taosArrayInit(0, sizeof(int32_t));
      if (pReqs[i].data == NULL || pReqs[i].dataLen == NULL) {
        goto FAIL_SHUFFLE_DISPATCH;
      }

      SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
      pReqs[i].taskId = pVgInfo->taskId;
    }

    for (int32_t i = 0; i < numOfBlocks; i++) {
      SSDataBlock* pDataBlock = taosArrayGet(pData->blocks, i);

      // TODO: do not use broadcast
      if (pDataBlock->info.type == STREAM_DELETE_RESULT) {
        for (int32_t j = 0; j < vgSz; j++) {
          if (streamAddBlockIntoDispatchMsg(pDataBlock, &pReqs[j]) < 0) {
            goto FAIL_SHUFFLE_DISPATCH;
          }
          // First block for this vgroup: one more dispatch response to wait for.
          if (pReqs[j].blockNum == 0) {
            atomic_add_fetch_32(&pTask->shuffleDispatcher.waitingRspCnt, 1);
          }
          pReqs[j].blockNum++;
        }
        continue;
      }

      if (streamSearchAndAddBlock(pTask, pReqs, pDataBlock, vgSz, pDataBlock->info.id.groupId) < 0) {
        goto FAIL_SHUFFLE_DISPATCH;
      }
    }

    qDebug("s-task:%s (child taskId:%d) shuffle-dispatch blocks:%d to %d vgroups", pTask->id.idStr, pTask->selfChildId,
           numOfBlocks, vgSz);

    for (int32_t i = 0; i < vgSz; i++) {
      if (pReqs[i].blockNum > 0) {
        SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
        qDebug("s-task:%s (child taskId:%d) shuffle-dispatch blocks:%d to vgId:%d", pTask->id.idStr, pTask->selfChildId,
               pReqs[i].blockNum, pVgInfo->vgId);

        if (doSendDispatchMsg(pTask, &pReqs[i], pVgInfo->vgId, &pVgInfo->epSet) < 0) {
          goto FAIL_SHUFFLE_DISPATCH;
        }
      }
    }

    code = 0;

  FAIL_SHUFFLE_DISPATCH:
    // Shared cleanup for success and failure; entries never initialized are
    // still zeroed by the calloc above.
    for (int32_t i = 0; i < vgSz; i++) {
      taosArrayDestroyP(pReqs[i].data, taosMemoryFree);
      taosArrayDestroy(pReqs[i].dataLen);
    }
    taosMemoryFree(pReqs);
  }
  return code;
}

502
/**
 * Take the next item from the task's output queue and dispatch it downstream.
 *
 * The task's outputStatus is switched from NORMAL to WAIT with a
 * compare-and-exchange; if it was not NORMAL the call returns immediately.
 * The status is put back to NORMAL when the queue yields no item or when the
 * dispatch fails; on success it is left at WAIT.
 *
 * @param pTask task whose output queue is drained by one item
 * @return 0 if nothing was dispatched or the dispatch succeeded, otherwise the
 *         result of streamDispatchAllBlocks()
 */
int32_t streamDispatchStreamBlock(SStreamTask* pTask) {
  ASSERT(pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH);

  int32_t pendingItems = taosQueueItemSize(pTask->outputQueue->queue);
  if (pendingItems > 0) {
    qDebug("s-task:%s try to dispatch intermediate result block to downstream, elem in outputQ:%d", pTask->id.idStr,
           pendingItems);
  }

  // Claim the dispatch slot; only one dispatch may be in flight per task.
  int8_t prevStatus =
      atomic_val_compare_exchange_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL, TASK_OUTPUT_STATUS__WAIT);
  if (prevStatus != TASK_OUTPUT_STATUS__NORMAL) {
    qDebug("s-task:%s task wait for dispatch rsp, not dispatch now, output status:%d", pTask->id.idStr, prevStatus);
    return 0;
  }

  qDebug("s-task:%s start to dispatch msg, set output status:%d", pTask->id.idStr, pTask->outputStatus);

  SStreamDataBlock* pOutBlock = streamQueueNextItem(pTask->outputQueue);
  if (pOutBlock == NULL) {
    // Nothing to send: release the slot again.
    atomic_store_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL);
    qDebug("s-task:%s stop dispatching since no output in output queue, output status:%d", pTask->id.idStr,
           pTask->outputStatus);
    return 0;
  }

  ASSERT(pOutBlock->type == STREAM_INPUT__DATA_BLOCK);

  int32_t code = streamDispatchAllBlocks(pTask, pOutBlock);
  if (code != TSDB_CODE_SUCCESS) {
    streamQueueProcessFail(pTask->outputQueue);
    atomic_store_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL);
    qDebug("s-task:%s failed to dispatch msg to downstream, output status:%d", pTask->id.idStr, pTask->outputStatus);
  }

  // this block can be freed only when it has been pushed to down stream.
  // NOTE(review): the block is destroyed on the failure path as well; confirm
  // that streamQueueProcessFail() does not keep a reference to it for a retry.
  destroyStreamDataBlock(pOutBlock);
  return code;
}