streamDispatch.c 18.4 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

16
#include "streamInc.h"
L
Liu Jicong 已提交
17 18 19 20 21

/*
 * Serialize a dispatch request.
 * Layout: streamId, taskId, upstreamTaskId, dataSrcVgId, upstreamChildId,
 * upstreamNodeId, blockNum, totalLen, then for each of the blockNum blocks its
 * length (I32) followed by its bytes taken from pReq->data.
 * pReq->data and pReq->dataLen must both hold exactly blockNum entries.
 * Returns the encoded size (pEncoder->pos), or -1 on failure.
 */
int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatchReq* pReq) {
  if (tStartEncode(pEncoder) < 0) {
    return -1;
  }

  // header fields; the order must stay in sync with tDecodeStreamDispatchReq
  if (tEncodeI64(pEncoder, pReq->streamId) < 0 || tEncodeI32(pEncoder, pReq->taskId) < 0 ||
      tEncodeI32(pEncoder, pReq->upstreamTaskId) < 0 || tEncodeI32(pEncoder, pReq->dataSrcVgId) < 0 ||
      tEncodeI32(pEncoder, pReq->upstreamChildId) < 0 || tEncodeI32(pEncoder, pReq->upstreamNodeId) < 0 ||
      tEncodeI32(pEncoder, pReq->blockNum) < 0 || tEncodeI64(pEncoder, pReq->totalLen) < 0) {
    return -1;
  }

  ASSERT(taosArrayGetSize(pReq->data) == pReq->blockNum);
  ASSERT(taosArrayGetSize(pReq->dataLen) == pReq->blockNum);

  int32_t idx = 0;
  while (idx < pReq->blockNum) {
    int32_t blockLen = *(int32_t*)taosArrayGet(pReq->dataLen, idx);
    void*   pBlockBytes = taosArrayGetP(pReq->data, idx);
    if (tEncodeI32(pEncoder, blockLen) < 0 || tEncodeBinary(pEncoder, pBlockBytes, blockLen) < 0) {
      return -1;
    }
    idx++;
  }

  tEndEncode(pEncoder);
  return pEncoder->pos;
}

/*
 * Deserialize a dispatch request produced by tEncodeStreamDispatchReq.
 *
 * Allocates pReq->data (one heap buffer per block, from tDecodeBinaryAlloc)
 * and pReq->dataLen; release them with tDeleteStreamDispatchReq.
 * Returns 0 on success, -1 on failure. If the two arrays cannot be allocated,
 * both are released here and left NULL. On a later failure, the buffers decoded
 * so far stay owned by pReq->data.
 */
int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq) {
  if (tStartDecode(pDecoder) < 0) return -1;
  if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1;
  if (tDecodeI32(pDecoder, &pReq->taskId) < 0) return -1;
  if (tDecodeI32(pDecoder, &pReq->upstreamTaskId) < 0) return -1;
  if (tDecodeI32(pDecoder, &pReq->dataSrcVgId) < 0) return -1;
  if (tDecodeI32(pDecoder, &pReq->upstreamChildId) < 0) return -1;
  if (tDecodeI32(pDecoder, &pReq->upstreamNodeId) < 0) return -1;
  if (tDecodeI32(pDecoder, &pReq->blockNum) < 0) return -1;
  if (tDecodeI64(pDecoder, &pReq->totalLen) < 0) return -1;

  ASSERT(pReq->blockNum > 0);
  pReq->data = taosArrayInit(pReq->blockNum, sizeof(void*));
  pReq->dataLen = taosArrayInit(pReq->blockNum, sizeof(int32_t));
  if (pReq->data == NULL || pReq->dataLen == NULL) {
    taosArrayDestroy(pReq->data);
    taosArrayDestroy(pReq->dataLen);
    pReq->data = NULL;
    pReq->dataLen = NULL;
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  for (int32_t i = 0; i < pReq->blockNum; i++) {
    int32_t  len1 = 0;
    uint64_t len2 = 0;
    void*    data = NULL;
    if (tDecodeI32(pDecoder, &len1) < 0) return -1;
    if (tDecodeBinaryAlloc(pDecoder, &data, &len2) < 0) return -1;
    // The explicit length prefix must agree with the binary's own length.
    // Compare in the unsigned type so that a negative len1 can never match.
    ASSERT(len1 >= 0 && (uint64_t)len1 == len2);

    // Hand the buffer to pReq->data first so it is released with the request;
    // if that push fails, nobody owns it yet, so free it here.
    if (taosArrayPush(pReq->data, &data) == NULL) {
      taosMemoryFree(data);
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
    }
    if (taosArrayPush(pReq->dataLen, &len1) == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
    }
  }
  tEndDecode(pDecoder);
  return 0;
}

L
Liu Jicong 已提交
68
/* Release the per-block payload buffers and the length array owned by a dispatch request. */
void tDeleteStreamDispatchReq(SStreamDispatchReq* pReq) {
  SArray* pPayloads = pReq->data;
  SArray* pLengths = pReq->dataLen;

  taosArrayDestroyP(pPayloads, taosMemoryFree);
  taosArrayDestroy(pLengths);
}

L
Liu Jicong 已提交
73 74 75
/*
 * Serialize a retrieve request: streamId, reqId, destination node/task,
 * source node/task, then the raw pRetrieve payload (retrieveLen bytes).
 * Returns the encoded size (pEncoder->pos), or -1 on failure.
 */
int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const SStreamRetrieveReq* pReq) {
  if (tStartEncode(pEncoder) < 0) {
    return -1;
  }

  // field order must stay in sync with tDecodeStreamRetrieveReq
  if (tEncodeI64(pEncoder, pReq->streamId) < 0 || tEncodeI64(pEncoder, pReq->reqId) < 0 ||
      tEncodeI32(pEncoder, pReq->dstNodeId) < 0 || tEncodeI32(pEncoder, pReq->dstTaskId) < 0 ||
      tEncodeI32(pEncoder, pReq->srcNodeId) < 0 || tEncodeI32(pEncoder, pReq->srcTaskId) < 0) {
    return -1;
  }

  const uint8_t* pPayload = (const uint8_t*)pReq->pRetrieve;
  if (tEncodeBinary(pEncoder, pPayload, pReq->retrieveLen) < 0) {
    return -1;
  }

  tEndEncode(pEncoder);
  return pEncoder->pos;
}

/*
 * Deserialize a retrieve request produced by tEncodeStreamRetrieveReq.
 * pReq->pRetrieve is allocated by tDecodeBinaryAlloc; release it with
 * tDeleteStreamRetrieveReq. Returns 0 on success, -1 on failure.
 */
int32_t tDecodeStreamRetrieveReq(SDecoder* pDecoder, SStreamRetrieveReq* pReq) {
  if (tStartDecode(pDecoder) < 0) {
    return -1;
  }

  if (tDecodeI64(pDecoder, &pReq->streamId) < 0 || tDecodeI64(pDecoder, &pReq->reqId) < 0 ||
      tDecodeI32(pDecoder, &pReq->dstNodeId) < 0 || tDecodeI32(pDecoder, &pReq->dstTaskId) < 0 ||
      tDecodeI32(pDecoder, &pReq->srcNodeId) < 0 || tDecodeI32(pDecoder, &pReq->srcTaskId) < 0) {
    return -1;
  }

  uint64_t payloadLen = 0;
  void**   ppPayload = (void**)&pReq->pRetrieve;
  if (tDecodeBinaryAlloc(pDecoder, ppPayload, &payloadLen) < 0) {
    return -1;
  }
  pReq->retrieveLen = (int32_t)payloadLen;

  tEndDecode(pDecoder);
  return 0;
}

L
Liu Jicong 已提交
101 102
/* Release the retrieve payload owned by a retrieve request. */
void tDeleteStreamRetrieveReq(SStreamRetrieveReq* pReq) {
  void* pPayload = pReq->pRetrieve;
  taosMemoryFree(pPayload);
}

L
Liu Jicong 已提交
103
/*
 * Serialize pBlock once as an SRetrieveTableRsp and send it as a
 * TDMT_STREAM_RETRIEVE request to every child in pTask->childEpInfo.
 * Each request gets a fresh reqId.
 *
 * Returns 0 when every request has been handed to tmsgSendReq, -1 on any
 * failure. Requests to earlier children may already have been sent when a
 * later child fails.
 */
int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock) {
  int32_t            code = -1;
  SRetrieveTableRsp* pRetrieve = NULL;
  void*              buf = NULL;
  int32_t            dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock);

  pRetrieve = taosMemoryCalloc(1, dataStrLen);
  if (pRetrieve == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
  pRetrieve->useconds = 0;
  pRetrieve->precision = TSDB_DEFAULT_PRECISION;
  pRetrieve->compressed = 0;
  pRetrieve->completed = 1;
  pRetrieve->streamBlockType = pBlock->info.type;
  pRetrieve->numOfRows = htobe64((int64_t)pBlock->info.rows);
  pRetrieve->numOfCols = htonl(numOfCols);
  pRetrieve->skey = htobe64(pBlock->info.window.skey);
  pRetrieve->ekey = htobe64(pBlock->info.window.ekey);
  pRetrieve->version = htobe64(pBlock->info.version);

  // blockGetEncodeSize is an upper bound. Send only the header plus the bytes
  // blockEncode actually wrote, as streamAddBlockIntoDispatchMsg does.
  int32_t actualLen = blockEncode(pBlock, pRetrieve->data, numOfCols);
  actualLen += sizeof(SRetrieveTableRsp);
  ASSERT(actualLen <= dataStrLen);

  SStreamRetrieveReq req = {
      .streamId = pTask->id.streamId,
      .srcNodeId = pTask->nodeId,
      .srcTaskId = pTask->id.taskId,
      .pRetrieve = pRetrieve,
      .retrieveLen = actualLen,
  };

  int32_t sz = taosArrayGetSize(pTask->childEpInfo);
  ASSERT(sz > 0);
  for (int32_t i = 0; i < sz; i++) {
    req.reqId = tGenIdPI64();
    SStreamChildEpInfo* pEpInfo = taosArrayGetP(pTask->childEpInfo, i);
    req.dstNodeId = pEpInfo->nodeId;
    req.dstTaskId = pEpInfo->taskId;

    int32_t len = 0;
    tEncodeSize(tEncodeStreamRetrieveReq, &req, len, code);
    if (code < 0) {
      ASSERT(0);
      code = -1;
      goto CLEAR;  // release pRetrieve instead of returning directly
    }

    buf = rpcMallocCont(sizeof(SMsgHead) + len);
    if (buf == NULL) {
      // `code` still holds tEncodeSize's non-negative result here; it must be
      // reset, or the failure would be reported as success.
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      code = -1;
      goto CLEAR;
    }

    ((SMsgHead*)buf)->vgId = htonl(pEpInfo->nodeId);
    void*    abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
    SEncoder encoder;
    tEncoderInit(&encoder, abuf, len);
    int32_t encodeRes = tEncodeStreamRetrieveReq(&encoder, &req);
    tEncoderClear(&encoder);
    if (encodeRes < 0) {
      code = -1;
      goto CLEAR;
    }

    SRpcMsg rpcMsg = {.code = 0, .msgType = TDMT_STREAM_RETRIEVE, .pCont = buf, .contLen = sizeof(SMsgHead) + len};
    if (tmsgSendReq(&pEpInfo->epSet, &rpcMsg) < 0) {
      ASSERT(0);
      // NOTE(review): CLEAR frees buf. Confirm that tmsgSendReq does not already
      // free pCont on failure, otherwise this is a double free.
      code = -1;
      goto CLEAR;
    }

    buf = NULL;  // this function no longer frees buf once it has been sent
    qDebug("s-task:%s (child %d) send retrieve req to task:0x%x (vgId:%d), reqId:0x%" PRIx64, pTask->id.idStr,
           pTask->selfChildId, pEpInfo->taskId, pEpInfo->nodeId, req.reqId);
  }
  code = 0;

CLEAR:
  taosMemoryFree(pRetrieve);
  rpcFreeCont(buf);
  return code;
}

178
/*
 * Serialize pBlock as an SRetrieveTableRsp and append it to pReq.
 * The header fields are stored in network byte order, followed by the
 * blockEncode output.
 *
 * On success:
 *  - the new buffer is owned by pReq->data;
 *  - its used length (header + encoded block) is appended to pReq->dataLen;
 *  - pReq->totalLen grows by the allocated size;
 *  - pReq->blockNum is left for the caller to maintain.
 *
 * On failure -1 is returned and the caller must discard pReq: pReq->data and
 * pReq->dataLen may then hold different numbers of entries.
 */
static int32_t streamAddBlockIntoDispatchMsg(const SSDataBlock* pBlock, SStreamDispatchReq* pReq) {
  int32_t dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock);
  void*   buf = taosMemoryCalloc(1, dataStrLen);
  if (buf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)buf;
  pRetrieve->useconds = 0;
  pRetrieve->precision = TSDB_DEFAULT_PRECISION;
  pRetrieve->compressed = 0;
  pRetrieve->completed = 1;
  pRetrieve->streamBlockType = pBlock->info.type;
  pRetrieve->numOfRows = htobe64((int64_t)pBlock->info.rows);
  pRetrieve->skey = htobe64(pBlock->info.window.skey);
  pRetrieve->ekey = htobe64(pBlock->info.window.ekey);
  pRetrieve->version = htobe64(pBlock->info.version);
  pRetrieve->watermark = htobe64(pBlock->info.watermark);
  memcpy(pRetrieve->parTbName, pBlock->info.parTbName, TSDB_TABLE_NAME_LEN);

  int32_t numOfCols = (int32_t)taosArrayGetSize(pBlock->pDataBlock);
  pRetrieve->numOfCols = htonl(numOfCols);

  int32_t actualLen = blockEncode(pBlock, pRetrieve->data, numOfCols);
  actualLen += sizeof(SRetrieveTableRsp);
  ASSERT(actualLen <= dataStrLen);

  // Push the buffer first. Once it is in pReq->data it is freed together with
  // the request (taosArrayDestroyP(..., taosMemoryFree)), so a failure of the
  // second push cannot leak it.
  if (taosArrayPush(pReq->data, &buf) == NULL) {
    taosMemoryFree(buf);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  if (taosArrayPush(pReq->dataLen, &actualLen) == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  pReq->totalLen += dataStrLen;
  return 0;
}

209
/*
 * Serialize pReq and send it as a TDMT_STREAM_TASK_CHECK message to node
 * nodeId via pEpSet.
 *
 * The message buffer is not freed here after tmsgSendReq. Presumably the
 * transport takes ownership (NOTE(review): confirm). The result of
 * tmsgSendReq is not checked.
 *
 * Returns 0 once the message has been handed to tmsgSendReq, or a negative
 * value on a serialization or allocation failure.
 */
int32_t streamDispatchCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pReq, int32_t nodeId, SEpSet* pEpSet) {
  void*   buf = NULL;
  int32_t code = -1;
  SRpcMsg msg = {0};

  int32_t tlen = 0;
  tEncodeSize(tEncodeSStreamTaskCheckReq, pReq, tlen, code);
  if (code < 0) {
    return -1;
  }

  buf = rpcMallocCont(sizeof(SMsgHead) + tlen);
  if (buf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  ((SMsgHead*)buf)->vgId = htonl(nodeId);
  void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));

  SEncoder encoder;
  tEncoderInit(&encoder, abuf, tlen);
  code = tEncodeSStreamTaskCheckReq(&encoder, pReq);
  tEncoderClear(&encoder);  // pair every tEncoderInit with a clear, on success and failure alike
  if (code < 0) {
    rpcFreeCont(buf);
    return code;
  }

  msg.contLen = tlen + sizeof(SMsgHead);
  msg.pCont = buf;
  msg.msgType = TDMT_STREAM_TASK_CHECK;

  qDebug("s-task:%s dispatch check msg to s-task:%" PRIx64 ":0x%x (vgId:%d)", pTask->id.idStr,
         pReq->streamId, pReq->downstreamTaskId, nodeId);

  tmsgSendReq(pEpSet, &msg);
  return 0;
}

248
int32_t streamDoDispatchRecoverFinishMsg(SStreamTask* pTask, const SStreamRecoverFinishReq* pReq, int32_t vgId,
249 250 251 252 253 254 255 256 257 258 259 260 261
                                          SEpSet* pEpSet) {
  void*   buf = NULL;
  int32_t code = -1;
  SRpcMsg msg = {0};

  int32_t tlen;
  tEncodeSize(tEncodeSStreamRecoverFinishReq, pReq, tlen, code);
  if (code < 0) {
    return -1;
  }

  buf = rpcMallocCont(sizeof(SMsgHead) + tlen);
  if (buf == NULL) {
H
Haojun Liao 已提交
262
    terrno = TSDB_CODE_OUT_OF_MEMORY;
263 264 265 266 267 268 269 270 271
    return -1;
  }

  ((SMsgHead*)buf)->vgId = htonl(vgId);
  void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));

  SEncoder encoder;
  tEncoderInit(&encoder, abuf, tlen);
  if ((code = tEncodeSStreamRecoverFinishReq(&encoder, pReq)) < 0) {
H
Haojun Liao 已提交
272 273 274 275
    if (buf) {
      rpcFreeCont(buf);
    }
    return code;
276
  }
H
Haojun Liao 已提交
277

278 279 280 281
  tEncoderClear(&encoder);

  msg.contLen = tlen + sizeof(SMsgHead);
  msg.pCont = buf;
L
Liu Jicong 已提交
282
  msg.msgType = TDMT_STREAM_RECOVER_FINISH;
dengyihao's avatar
dengyihao 已提交
283
  msg.info.noResp = 1;
284 285

  tmsgSendReq(pEpSet, &msg);
286
  qDebug("s-task:%s dispatch recover finish msg to taskId:0x%x (vgId:%d)", pTask->id.idStr, pReq->taskId, vgId);
287

288 289 290
  return 0;
}

291
/*
 * Serialize one dispatch request and send it to vgroup vgId via pEpSet. The
 * message type is pTask->dispatchMsgType.
 *
 * Returns 0 once the message has been handed to tmsgSendReq; the send result
 * itself is not checked. Returns a negative value if serialization or
 * allocation fails, in which case nothing was sent. terrno is set on OOM.
 */
int32_t doSendDispatchMsg(SStreamTask* pTask, const SStreamDispatchReq* pReq, int32_t vgId, SEpSet* pEpSet) {
  void*   buf = NULL;
  int32_t code = -1;
  SRpcMsg msg = {0};

  // serialize
  int32_t tlen = 0;
  tEncodeSize(tEncodeStreamDispatchReq, pReq, tlen, code);
  if (code < 0) goto FAIL;

  code = -1;  // tEncodeSize left the (non-negative) encoded size in `code`
  buf = rpcMallocCont(sizeof(SMsgHead) + tlen);
  if (buf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto FAIL;
  }

  ((SMsgHead*)buf)->vgId = htonl(vgId);
  void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));

  SEncoder encoder;
  tEncoderInit(&encoder, abuf, tlen);
  code = tEncodeStreamDispatchReq(&encoder, pReq);
  tEncoderClear(&encoder);  // pair every tEncoderInit with a clear, on success and failure alike
  if (code < 0) {
    goto FAIL;
  }

  msg.contLen = tlen + sizeof(SMsgHead);
  msg.pCont = buf;
  msg.msgType = pTask->dispatchMsgType;

  qDebug("s-task:%s dispatch msg to taskId:0x%x vgId:%d data msg", pTask->id.idStr, pReq->taskId, vgId);
  tmsgSendReq(pEpSet, &msg);
  return 0;

FAIL:
  rpcFreeCont(buf);  // buf may be NULL here; the original path also passed NULL-able buf to rpcFreeCont
  return code;
}

L
Liu Jicong 已提交
331 332
/*
 * Append one data block to the shuffle request of the vgroup that owns the
 * block's destination child table.
 *
 * The full table name is "<db>.<parTbName>" when the block carries a partition
 * table name. Otherwise it is "<db>.<name from buildCtbNameByGroupId>".
 * That name is hashed with taosGetTbHashVal using the db's hash method, prefix
 * and suffix. The block goes to the first of the vgSz vgroups whose
 * [hashBegin, hashEnd] range contains the hash. waitingRspCnt is incremented
 * the first time a request receives a block. A matching vgroup is ASSERTed.
 *
 * Returns 0 on success, -1 on allocation, name-building or serialization
 * failure.
 */
int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, SSDataBlock* pDataBlock, int32_t vgSz,
                                int64_t groupId) {
  SUseDbRsp* pDbInfo = &pTask->shuffleDispatcher.dbInfo;

  char* ctbName = taosMemoryCalloc(1, TSDB_TABLE_FNAME_LEN);
  if (ctbName == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  // The buffer holds a full "db.table" name, so bound the write by the real
  // buffer size. Bounding by TSDB_TABLE_NAME_LEN would silently truncate long
  // names; the truncated name hashes differently and routes to the wrong vgroup.
  if (pDataBlock->info.parTbName[0]) {
    snprintf(ctbName, TSDB_TABLE_FNAME_LEN, "%s.%s", pDbInfo->db, pDataBlock->info.parTbName);
  } else {
    char* ctbShortName = buildCtbNameByGroupId(pTask->shuffleDispatcher.stbFullName, groupId);
    if (ctbShortName == NULL) {  // never pass NULL to a %s conversion
      taosMemoryFree(ctbName);
      return -1;
    }
    snprintf(ctbName, TSDB_TABLE_FNAME_LEN, "%s.%s", pDbInfo->db, ctbShortName);
    taosMemoryFree(ctbShortName);
  }

  uint32_t hashValue =
      taosGetTbHashVal(ctbName, strlen(ctbName), pDbInfo->hashMethod, pDbInfo->hashPrefix, pDbInfo->hashSuffix);
  taosMemoryFree(ctbName);

  SArray* vgInfo = pDbInfo->pVgroupInfos;
  bool    found = false;
  // TODO: optimize search
  for (int32_t j = 0; j < vgSz; j++) {
    SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, j);
    ASSERT(pVgInfo->vgId > 0);
    if (hashValue >= pVgInfo->hashBegin && hashValue <= pVgInfo->hashEnd) {
      if (streamAddBlockIntoDispatchMsg(pDataBlock, &pReqs[j]) < 0) {
        return -1;
      }
      // count each destination vgroup once, on its first block
      if (pReqs[j].blockNum == 0) {
        atomic_add_fetch_32(&pTask->shuffleDispatcher.waitingRspCnt, 1);
      }
      pReqs[j].blockNum++;
      found = true;
      break;
    }
  }
  ASSERT(found);
  return 0;
}

L
Liu Jicong 已提交
376
/*
 * Serialize every block in pData and send it downstream according to
 * pTask->outputType.
 *
 * TASK_OUTPUT__FIXED_DISPATCH: all blocks go into one request, sent to the
 * single task in pTask->fixedEpDispatcher.
 *
 * TASK_OUTPUT__SHUFFLE_DISPATCH: one request is built per vgroup.
 *  - STREAM_DELETE_RESULT blocks are added to every vgroup's request.
 *  - Other blocks are routed by table-name hash (streamSearchAndAddBlock).
 *  - waitingRspCnt counts the vgroups that receive at least one block.
 *  - Only non-empty requests are sent.
 *
 * Returns TSDB_CODE_SUCCESS (0) on success and non-zero on failure.
 */
int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pData) {
  int32_t code = 0;
  int32_t numOfBlocks = taosArrayGetSize(pData->blocks);
  ASSERT(numOfBlocks != 0);

  if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) {
    SStreamDispatchReq req = {
        .streamId = pTask->id.streamId,
        .dataSrcVgId = pData->srcVgId,
        .upstreamTaskId = pTask->id.taskId,
        .upstreamChildId = pTask->selfChildId,
        .upstreamNodeId = pTask->nodeId,
        .blockNum = numOfBlocks,
    };

    req.data = taosArrayInit(numOfBlocks, sizeof(void*));
    req.dataLen = taosArrayInit(numOfBlocks, sizeof(int32_t));
    if (req.data == NULL || req.dataLen == NULL) {
      taosArrayDestroyP(req.data, taosMemoryFree);
      taosArrayDestroy(req.dataLen);
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    for (int32_t i = 0; i < numOfBlocks; i++) {
      SSDataBlock* pDataBlock = taosArrayGet(pData->blocks, i);
      code = streamAddBlockIntoDispatchMsg(pDataBlock, &req);

      if (code != TSDB_CODE_SUCCESS) {
        taosArrayDestroyP(req.data, taosMemoryFree);
        taosArrayDestroy(req.dataLen);
        return code;
      }
    }

    int32_t vgId = pTask->fixedEpDispatcher.nodeId;
    SEpSet* pEpSet = &pTask->fixedEpDispatcher.epSet;
    int32_t downstreamTaskId = pTask->fixedEpDispatcher.taskId;

    req.taskId = downstreamTaskId;

    qDebug("s-task:%s (child taskId:%d) fix-dispatch %d block(s) to down stream s-task:0x%x in vgId:%d", pTask->id.idStr,
           pTask->selfChildId, numOfBlocks, downstreamTaskId, vgId);

    code = doSendDispatchMsg(pTask, &req, vgId, pEpSet);
    taosArrayDestroyP(req.data, taosMemoryFree);
    taosArrayDestroy(req.dataLen);
    return code;
  } else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
    int32_t rspCnt = atomic_load_32(&pTask->shuffleDispatcher.waitingRspCnt);
    ASSERT(rspCnt == 0);

    SArray*             vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
    int32_t             vgSz = taosArrayGetSize(vgInfo);
    SStreamDispatchReq* pReqs = taosMemoryCalloc(vgSz, sizeof(SStreamDispatchReq));
    if (pReqs == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
    }

    // Every failure below jumps to FAIL_SHUFFLE_DISPATCH carrying this value.
    // It becomes 0 only after all requests have been sent. Previously `code`
    // was still 0 on those paths, so failures were reported as success.
    code = -1;
    int32_t numOfSent = 0;

    for (int32_t i = 0; i < vgSz; i++) {
      pReqs[i].streamId = pTask->id.streamId;
      pReqs[i].dataSrcVgId = pData->srcVgId;
      pReqs[i].upstreamTaskId = pTask->id.taskId;
      pReqs[i].upstreamChildId = pTask->selfChildId;
      pReqs[i].upstreamNodeId = pTask->nodeId;
      pReqs[i].blockNum = 0;
      pReqs[i].data = taosArrayInit(0, sizeof(void*));
      pReqs[i].dataLen = taosArrayInit(0, sizeof(int32_t));
      if (pReqs[i].data == NULL || pReqs[i].dataLen == NULL) {
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        goto FAIL_SHUFFLE_DISPATCH;
      }

      SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
      pReqs[i].taskId = pVgInfo->taskId;
    }

    for (int32_t i = 0; i < numOfBlocks; i++) {
      SSDataBlock* pDataBlock = taosArrayGet(pData->blocks, i);

      // TODO: do not use broadcast
      if (pDataBlock->info.type == STREAM_DELETE_RESULT) {
        for (int32_t j = 0; j < vgSz; j++) {
          if (streamAddBlockIntoDispatchMsg(pDataBlock, &pReqs[j]) < 0) {
            goto FAIL_SHUFFLE_DISPATCH;
          }
          if (pReqs[j].blockNum == 0) {
            atomic_add_fetch_32(&pTask->shuffleDispatcher.waitingRspCnt, 1);
          }
          pReqs[j].blockNum++;
        }
        continue;
      }

      if (streamSearchAndAddBlock(pTask, pReqs, pDataBlock, vgSz, pDataBlock->info.id.groupId) < 0) {
        goto FAIL_SHUFFLE_DISPATCH;
      }
    }

    qDebug("s-task:%s (child taskId:%d) shuffle-dispatch blocks:%d to %d vgroups", pTask->id.idStr, pTask->selfChildId,
           numOfBlocks, vgSz);

    for (int32_t i = 0; i < vgSz; i++) {
      if (pReqs[i].blockNum > 0) {
        SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
        qDebug("s-task:%s (child taskId:%d) shuffle-dispatch blocks:%d to vgId:%d", pTask->id.idStr, pTask->selfChildId,
               pReqs[i].blockNum, pVgInfo->vgId);

        if (doSendDispatchMsg(pTask, &pReqs[i], pVgInfo->vgId, &pVgInfo->epSet) < 0) {
          goto FAIL_SHUFFLE_DISPATCH;
        }
        numOfSent++;
      }
    }

    code = 0;

  FAIL_SHUFFLE_DISPATCH:
    if (code != 0 && numOfSent == 0) {
      // Nothing was sent, so no dispatch rsp will ever decrement the counter
      // bumped while building the requests. Clear it so that a retry starts
      // from the rspCnt == 0 state asserted above.
      // NOTE(review): a failure part-way through the send loop leaves the
      // counter including vgroups that were never sent to.
      atomic_store_32(&pTask->shuffleDispatcher.waitingRspCnt, 0);
    }

    for (int32_t i = 0; i < vgSz; i++) {
      taosArrayDestroyP(pReqs[i].data, taosMemoryFree);
      taosArrayDestroy(pReqs[i].dataLen);
    }
    taosMemoryFree(pReqs);
  }
  return code;
}

501
/*
 * Take the next item from the task's output queue and dispatch it downstream.
 *
 * The output status is switched NORMAL -> WAIT with a CAS. If another dispatch
 * is already waiting for responses, the function returns 0 without doing
 * anything. If the queue is empty, the status is reset to NORMAL.
 *
 * On failure:
 *  - the queue item is marked failed (streamQueueProcessFail);
 *  - the status is reset to NORMAL;
 *  - the item is kept alive, so that a later call can retry it.
 * On success the item is destroyed; its contents have been serialized into
 * the outgoing dispatch messages.
 *
 * Returns 0 when there is nothing to do or the dispatch succeeded; otherwise
 * the error from streamDispatchAllBlocks.
 */
int32_t streamDispatchStreamBlock(SStreamTask* pTask) {
  ASSERT(pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH);
  int32_t numOfElems = taosQueueItemSize(pTask->outputQueue->queue);
  if (numOfElems > 0) {
    qDebug("s-task:%s try to dispatch intermediate result block to downstream, elem in outputQ:%d", pTask->id.idStr,
           numOfElems);
  }

  int8_t old =
      atomic_val_compare_exchange_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL, TASK_OUTPUT_STATUS__WAIT);
  if (old != TASK_OUTPUT_STATUS__NORMAL) {
    qDebug("s-task:%s task wait for dispatch rsp, not dispatch now, output status:%d", pTask->id.idStr, old);
    return 0;
  }

  qDebug("s-task:%s start to dispatch msg, set output status:%d", pTask->id.idStr, pTask->outputStatus);

  SStreamDataBlock* pDispatchedBlock = streamQueueNextItem(pTask->outputQueue);
  if (pDispatchedBlock == NULL) {
    atomic_store_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL);
    qDebug("s-task:%s stop dispatching since no output in output queue, output status:%d", pTask->id.idStr,
           pTask->outputStatus);
    return 0;
  }

  ASSERT(pDispatchedBlock->type == STREAM_INPUT__DATA_BLOCK);

  int32_t code = streamDispatchAllBlocks(pTask, pDispatchedBlock);
  if (code != TSDB_CODE_SUCCESS) {
    // The item is marked as failed in the queue, presumably so that the next
    // streamQueueNextItem returns it again (NOTE(review): confirm). Destroying
    // it here would leave the queue holding a dangling pointer, so keep it.
    streamQueueProcessFail(pTask->outputQueue);
    atomic_store_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL);
    qDebug("s-task:%s failed to dispatch msg to downstream, output status:%d", pTask->id.idStr, pTask->outputStatus);
    return code;
  }

  // this block can be freed only when it has been pushed to down stream.
  destroyStreamDataBlock(pDispatchedBlock);
  return code;
}