streamDispatch.c 29.0 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

16
#include "streamInt.h"
17
#include "ttimer.h"
L
Liu Jicong 已提交
18

H
Haojun Liao 已提交
19 20 21
#define MAX_BLOCK_NAME_NUM         1024
#define DISPATCH_RETRY_INTERVAL_MS 300
#define MAX_CONTINUE_RETRY_COUNT   5
L
liuyao 已提交
22 23 24 25 26 27

typedef struct SBlockName {
  uint32_t hashValue;
  char     parTbName[TSDB_TABLE_NAME_LEN];
} SBlockName;

H
Haojun Liao 已提交
28 29 30
static int32_t tInitStreamDispatchReq(SStreamDispatchReq* pReq, const SStreamTask* pTask, int32_t vgId,
                                      int32_t numOfBlocks, int64_t dstTaskId, int32_t type);

31 32 33 34 35 36
static void initRpcMsg(SRpcMsg* pMsg, int32_t msgType, void* pCont, int32_t contLen) {
    pMsg->msgType = msgType;
    pMsg->pCont = pCont;
    pMsg->contLen = contLen;
}

37
static int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatchReq* pReq) {
L
Liu Jicong 已提交
38 39 40
  if (tStartEncode(pEncoder) < 0) return -1;
  if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
  if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1;
41
  if (tEncodeI32(pEncoder, pReq->type) < 0) return -1;
L
Liu Jicong 已提交
42
  if (tEncodeI32(pEncoder, pReq->upstreamTaskId) < 0) return -1;
43
  if (tEncodeI32(pEncoder, pReq->srcVgId) < 0) return -1;
L
Liu Jicong 已提交
44
  if (tEncodeI32(pEncoder, pReq->upstreamChildId) < 0) return -1;
L
Liu Jicong 已提交
45
  if (tEncodeI32(pEncoder, pReq->upstreamNodeId) < 0) return -1;
L
Liu Jicong 已提交
46
  if (tEncodeI32(pEncoder, pReq->blockNum) < 0) return -1;
47
  if (tEncodeI64(pEncoder, pReq->totalLen) < 0) return -1;
L
Liu Jicong 已提交
48 49 50 51 52 53 54 55 56
  ASSERT(taosArrayGetSize(pReq->data) == pReq->blockNum);
  ASSERT(taosArrayGetSize(pReq->dataLen) == pReq->blockNum);
  for (int32_t i = 0; i < pReq->blockNum; i++) {
    int32_t len = *(int32_t*)taosArrayGet(pReq->dataLen, i);
    void*   data = taosArrayGetP(pReq->data, i);
    if (tEncodeI32(pEncoder, len) < 0) return -1;
    if (tEncodeBinary(pEncoder, data, len) < 0) return -1;
  }
  tEndEncode(pEncoder);
57
  return pEncoder->pos;
L
Liu Jicong 已提交
58 59
}

60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90
static int32_t streamAddBlockIntoDispatchMsg(const SSDataBlock* pBlock, SStreamDispatchReq* pReq) {
  int32_t dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock);
  void*   buf = taosMemoryCalloc(1, dataStrLen);
  if (buf == NULL) return -1;

  SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)buf;
  pRetrieve->useconds = 0;
  pRetrieve->precision = TSDB_DEFAULT_PRECISION;
  pRetrieve->compressed = 0;
  pRetrieve->completed = 1;
  pRetrieve->streamBlockType = pBlock->info.type;
  pRetrieve->numOfRows = htobe64((int64_t)pBlock->info.rows);
  pRetrieve->skey = htobe64(pBlock->info.window.skey);
  pRetrieve->ekey = htobe64(pBlock->info.window.ekey);
  pRetrieve->version = htobe64(pBlock->info.version);
  pRetrieve->watermark = htobe64(pBlock->info.watermark);
  memcpy(pRetrieve->parTbName, pBlock->info.parTbName, TSDB_TABLE_NAME_LEN);

  int32_t numOfCols = (int32_t)taosArrayGetSize(pBlock->pDataBlock);
  pRetrieve->numOfCols = htonl(numOfCols);

  int32_t actualLen = blockEncode(pBlock, pRetrieve->data, numOfCols);
  actualLen += sizeof(SRetrieveTableRsp);
  ASSERT(actualLen <= dataStrLen);
  taosArrayPush(pReq->dataLen, &actualLen);
  taosArrayPush(pReq->data, &buf);

  pReq->totalLen += dataStrLen;
  return 0;
}

L
Liu Jicong 已提交
91 92 93 94
int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq) {
  if (tStartDecode(pDecoder) < 0) return -1;
  if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1;
  if (tDecodeI32(pDecoder, &pReq->taskId) < 0) return -1;
95
  if (tDecodeI32(pDecoder, &pReq->type) < 0) return -1;
L
Liu Jicong 已提交
96
  if (tDecodeI32(pDecoder, &pReq->upstreamTaskId) < 0) return -1;
97
  if (tDecodeI32(pDecoder, &pReq->srcVgId) < 0) return -1;
L
Liu Jicong 已提交
98
  if (tDecodeI32(pDecoder, &pReq->upstreamChildId) < 0) return -1;
L
Liu Jicong 已提交
99
  if (tDecodeI32(pDecoder, &pReq->upstreamNodeId) < 0) return -1;
L
Liu Jicong 已提交
100
  if (tDecodeI32(pDecoder, &pReq->blockNum) < 0) return -1;
101 102
  if (tDecodeI64(pDecoder, &pReq->totalLen) < 0) return -1;

L
Liu Jicong 已提交
103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119
  ASSERT(pReq->blockNum > 0);
  pReq->data = taosArrayInit(pReq->blockNum, sizeof(void*));
  pReq->dataLen = taosArrayInit(pReq->blockNum, sizeof(int32_t));
  for (int32_t i = 0; i < pReq->blockNum; i++) {
    int32_t  len1;
    uint64_t len2;
    void*    data;
    if (tDecodeI32(pDecoder, &len1) < 0) return -1;
    if (tDecodeBinaryAlloc(pDecoder, &data, &len2) < 0) return -1;
    ASSERT(len1 == len2);
    taosArrayPush(pReq->dataLen, &len1);
    taosArrayPush(pReq->data, &data);
  }
  tEndDecode(pDecoder);
  return 0;
}

120 121
int32_t tInitStreamDispatchReq(SStreamDispatchReq* pReq, const SStreamTask* pTask, int32_t vgId, int32_t numOfBlocks,
                               int64_t dstTaskId, int32_t type) {
122
  pReq->streamId = pTask->id.streamId;
123
  pReq->srcVgId = vgId;
124 125 126 127 128
  pReq->upstreamTaskId = pTask->id.taskId;
  pReq->upstreamChildId = pTask->info.selfChildId;
  pReq->upstreamNodeId = pTask->info.nodeId;
  pReq->blockNum = numOfBlocks;
  pReq->taskId = dstTaskId;
H
Haojun Liao 已提交
129
  pReq->type = type;
130 131 132 133 134 135 136 137 138 139 140 141

  pReq->data = taosArrayInit(numOfBlocks, POINTER_BYTES);
  pReq->dataLen = taosArrayInit(numOfBlocks, sizeof(int32_t));
  if (pReq->data == NULL || pReq->dataLen == NULL) {
    taosArrayDestroyP(pReq->data, taosMemoryFree);
    taosArrayDestroy(pReq->dataLen);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  return TSDB_CODE_SUCCESS;
}

L
Liu Jicong 已提交
142
void tDeleteStreamDispatchReq(SStreamDispatchReq* pReq) {
L
Liu Jicong 已提交
143 144 145 146
  taosArrayDestroyP(pReq->data, taosMemoryFree);
  taosArrayDestroy(pReq->dataLen);
}

L
Liu Jicong 已提交
147 148 149
int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const SStreamRetrieveReq* pReq) {
  if (tStartEncode(pEncoder) < 0) return -1;
  if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
L
Liu Jicong 已提交
150
  if (tEncodeI64(pEncoder, pReq->reqId) < 0) return -1;
L
Liu Jicong 已提交
151 152 153 154
  if (tEncodeI32(pEncoder, pReq->dstNodeId) < 0) return -1;
  if (tEncodeI32(pEncoder, pReq->dstTaskId) < 0) return -1;
  if (tEncodeI32(pEncoder, pReq->srcNodeId) < 0) return -1;
  if (tEncodeI32(pEncoder, pReq->srcTaskId) < 0) return -1;
L
Liu Jicong 已提交
155
  if (tEncodeBinary(pEncoder, (const uint8_t*)pReq->pRetrieve, pReq->retrieveLen) < 0) return -1;
L
Liu Jicong 已提交
156 157 158 159 160 161 162
  tEndEncode(pEncoder);
  return pEncoder->pos;
}

int32_t tDecodeStreamRetrieveReq(SDecoder* pDecoder, SStreamRetrieveReq* pReq) {
  if (tStartDecode(pDecoder) < 0) return -1;
  if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1;
L
Liu Jicong 已提交
163
  if (tDecodeI64(pDecoder, &pReq->reqId) < 0) return -1;
L
Liu Jicong 已提交
164 165 166 167
  if (tDecodeI32(pDecoder, &pReq->dstNodeId) < 0) return -1;
  if (tDecodeI32(pDecoder, &pReq->dstTaskId) < 0) return -1;
  if (tDecodeI32(pDecoder, &pReq->srcNodeId) < 0) return -1;
  if (tDecodeI32(pDecoder, &pReq->srcTaskId) < 0) return -1;
L
Liu Jicong 已提交
168 169
  uint64_t len = 0;
  if (tDecodeBinaryAlloc(pDecoder, (void**)&pReq->pRetrieve, &len) < 0) return -1;
170
  pReq->retrieveLen = (int32_t)len;
L
Liu Jicong 已提交
171 172 173 174
  tEndDecode(pDecoder);
  return 0;
}

L
Liu Jicong 已提交
175 176
void tDeleteStreamRetrieveReq(SStreamRetrieveReq* pReq) { taosMemoryFree(pReq->pRetrieve); }

L
Liu Jicong 已提交
177
int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock) {
L
Liu Jicong 已提交
178
  int32_t            code = -1;
L
Liu Jicong 已提交
179 180 181 182 183 184 185
  SRetrieveTableRsp* pRetrieve = NULL;
  void*              buf = NULL;
  int32_t            dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock);

  pRetrieve = taosMemoryCalloc(1, dataStrLen);
  if (pRetrieve == NULL) return -1;

H
Haojun Liao 已提交
186
  int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
L
Liu Jicong 已提交
187 188 189 190 191
  pRetrieve->useconds = 0;
  pRetrieve->precision = TSDB_DEFAULT_PRECISION;
  pRetrieve->compressed = 0;
  pRetrieve->completed = 1;
  pRetrieve->streamBlockType = pBlock->info.type;
192
  pRetrieve->numOfRows = htobe64((int64_t)pBlock->info.rows);
H
Haojun Liao 已提交
193
  pRetrieve->numOfCols = htonl(numOfCols);
194 195
  pRetrieve->skey = htobe64(pBlock->info.window.skey);
  pRetrieve->ekey = htobe64(pBlock->info.window.ekey);
196
  pRetrieve->version = htobe64(pBlock->info.version);
L
Liu Jicong 已提交
197

H
Haojun Liao 已提交
198
  int32_t actualLen = blockEncode(pBlock, pRetrieve->data, numOfCols);
L
Liu Jicong 已提交
199 200

  SStreamRetrieveReq req = {
201
      .streamId = pTask->id.streamId,
202
      .srcNodeId = pTask->info.nodeId,
203
      .srcTaskId = pTask->id.taskId,
L
Liu Jicong 已提交
204
      .pRetrieve = pRetrieve,
5
54liuyao 已提交
205
      .retrieveLen = dataStrLen,
L
Liu Jicong 已提交
206 207
  };

208
  int32_t sz = taosArrayGetSize(pTask->pUpstreamEpInfoList);
L
Liu Jicong 已提交
209 210
  ASSERT(sz > 0);
  for (int32_t i = 0; i < sz; i++) {
L
Liu Jicong 已提交
211
    req.reqId = tGenIdPI64();
212
    SStreamChildEpInfo* pEpInfo = taosArrayGetP(pTask->pUpstreamEpInfoList, i);
L
Liu Jicong 已提交
213 214 215 216 217 218 219 220 221 222 223
    req.dstNodeId = pEpInfo->nodeId;
    req.dstTaskId = pEpInfo->taskId;
    int32_t len;
    tEncodeSize(tEncodeStreamRetrieveReq, &req, len, code);
    if (code < 0) {
      ASSERT(0);
      return -1;
    }

    buf = rpcMallocCont(sizeof(SMsgHead) + len);
    if (buf == NULL) {
L
Liu Jicong 已提交
224
      goto CLEAR;
L
Liu Jicong 已提交
225 226 227 228 229 230 231
    }

    ((SMsgHead*)buf)->vgId = htonl(pEpInfo->nodeId);
    void*    abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
    SEncoder encoder;
    tEncoderInit(&encoder, abuf, len);
    tEncodeStreamRetrieveReq(&encoder, &req);
L
Liu Jicong 已提交
232
    tEncoderClear(&encoder);
L
Liu Jicong 已提交
233

dengyihao's avatar
dengyihao 已提交
234
    SRpcMsg rpcMsg = {.code = 0, .msgType = TDMT_STREAM_RETRIEVE, .pCont = buf, .contLen = sizeof(SMsgHead) + len};
L
Liu Jicong 已提交
235 236
    if (tmsgSendReq(&pEpInfo->epSet, &rpcMsg) < 0) {
      ASSERT(0);
L
Liu Jicong 已提交
237
      goto CLEAR;
L
Liu Jicong 已提交
238
    }
L
Liu Jicong 已提交
239

H
Haojun Liao 已提交
240
    buf = NULL;
241
    qDebug("s-task:%s (child %d) send retrieve req to task:0x%x (vgId:%d), reqId:0x%" PRIx64, pTask->id.idStr,
242
           pTask->info.selfChildId, pEpInfo->taskId, pEpInfo->nodeId, req.reqId);
L
Liu Jicong 已提交
243
  }
L
Liu Jicong 已提交
244
  code = 0;
H
Haojun Liao 已提交
245

L
Liu Jicong 已提交
246 247 248 249
CLEAR:
  taosMemoryFree(pRetrieve);
  rpcFreeCont(buf);
  return code;
L
Liu Jicong 已提交
250 251
}

252
int32_t streamDispatchCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pReq, int32_t nodeId, SEpSet* pEpSet) {
253 254 255 256 257
  void*   buf = NULL;
  int32_t code = -1;
  SRpcMsg msg = {0};

  int32_t tlen;
258
  tEncodeSize(tEncodeStreamTaskCheckReq, pReq, tlen, code);
259 260 261 262 263 264 265 266 267 268 269 270 271 272
  if (code < 0) {
    return -1;
  }

  buf = rpcMallocCont(sizeof(SMsgHead) + tlen);
  if (buf == NULL) {
    return -1;
  }

  ((SMsgHead*)buf)->vgId = htonl(nodeId);
  void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));

  SEncoder encoder;
  tEncoderInit(&encoder, abuf, tlen);
273
  if ((code = tEncodeStreamTaskCheckReq(&encoder, pReq)) < 0) {
274 275
    rpcFreeCont(buf);
    return code;
276
  }
277

278 279 280 281 282 283
  tEncoderClear(&encoder);

  msg.contLen = tlen + sizeof(SMsgHead);
  msg.pCont = buf;
  msg.msgType = TDMT_STREAM_TASK_CHECK;

H
Haojun Liao 已提交
284 285
  qDebug("s-task:%s (level:%d) dispatch check msg to s-task:%" PRIx64 ":0x%x (vgId:%d)", pTask->id.idStr,
         pTask->info.taskLevel, pReq->streamId, pReq->downstreamTaskId, nodeId);
286 287 288 289 290

  tmsgSendReq(pEpSet, &msg);
  return 0;
}

291
int32_t streamDoDispatchScanHistoryFinishMsg(SStreamTask* pTask, const SStreamScanHistoryFinishReq* pReq, int32_t vgId,
292
                                             SEpSet* pEpSet) {
293 294 295 296 297
  void*   buf = NULL;
  int32_t code = -1;
  SRpcMsg msg = {0};

  int32_t tlen;
298
  tEncodeSize(tEncodeStreamScanHistoryFinishReq, pReq, tlen, code);
299 300 301 302 303 304
  if (code < 0) {
    return -1;
  }

  buf = rpcMallocCont(sizeof(SMsgHead) + tlen);
  if (buf == NULL) {
H
Haojun Liao 已提交
305
    terrno = TSDB_CODE_OUT_OF_MEMORY;
306 307 308 309 310 311 312 313
    return -1;
  }

  ((SMsgHead*)buf)->vgId = htonl(vgId);
  void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));

  SEncoder encoder;
  tEncoderInit(&encoder, abuf, tlen);
314
  if ((code = tEncodeStreamScanHistoryFinishReq(&encoder, pReq)) < 0) {
H
Haojun Liao 已提交
315 316 317 318
    if (buf) {
      rpcFreeCont(buf);
    }
    return code;
319
  }
H
Haojun Liao 已提交
320

321 322 323 324
  tEncoderClear(&encoder);

  msg.contLen = tlen + sizeof(SMsgHead);
  msg.pCont = buf;
325
  msg.msgType = TDMT_STREAM_SCAN_HISTORY_FINISH;
326 327

  tmsgSendReq(pEpSet, &msg);
328 329

  const char* pStatus = streamGetTaskStatusStr(pTask->status.taskStatus);
330 331
  qDebug("s-task:%s status:%s dispatch scan-history finish msg to taskId:0x%x (vgId:%d)", pTask->id.idStr, pStatus,
         pReq->downstreamTaskId, vgId);
332 333 334
  return 0;
}

335
static int32_t doSendDispatchMsg(SStreamTask* pTask, const SStreamDispatchReq* pReq, int32_t vgId, SEpSet* pEpSet) {
L
Liu Jicong 已提交
336 337 338 339 340 341 342
  void*   buf = NULL;
  int32_t code = -1;
  SRpcMsg msg = {0};

  // serialize
  int32_t tlen;
  tEncodeSize(tEncodeStreamDispatchReq, pReq, tlen, code);
H
Haojun Liao 已提交
343 344 345 346
  if (code < 0) {
    goto FAIL;
  }

L
Liu Jicong 已提交
347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364
  code = -1;
  buf = rpcMallocCont(sizeof(SMsgHead) + tlen);
  if (buf == NULL) {
    goto FAIL;
  }

  ((SMsgHead*)buf)->vgId = htonl(vgId);
  void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));

  SEncoder encoder;
  tEncoderInit(&encoder, abuf, tlen);
  if ((code = tEncodeStreamDispatchReq(&encoder, pReq)) < 0) {
    goto FAIL;
  }
  tEncoderClear(&encoder);

  msg.contLen = tlen + sizeof(SMsgHead);
  msg.pCont = buf;
365
  msg.msgType = pTask->msgInfo.msgType;
L
Liu Jicong 已提交
366

H
Haojun Liao 已提交
367 368
  qDebug("s-task:%s dispatch msg to taskId:0x%x vgId:%d data msg, len:%d", pTask->id.idStr, pReq->taskId, vgId,
         msg.contLen);
H
Haojun Liao 已提交
369
  return tmsgSendReq(pEpSet, &msg);
370

371
FAIL:
H
Haojun Liao 已提交
372 373 374 375
  if (buf) {
    rpcFreeCont(buf);
  }

376
  return code;
L
Liu Jicong 已提交
377 378
}

L
Liu Jicong 已提交
379 380
int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, SSDataBlock* pDataBlock, int32_t vgSz,
                                int64_t groupId) {
L
liuyao 已提交
381 382 383 384
  uint32_t   hashValue = 0;
  SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
  if (pTask->pNameMap == NULL) {
    pTask->pNameMap = tSimpleHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
L
Liu Jicong 已提交
385 386
  }

L
liuyao 已提交
387 388 389 390 391
  void* pVal = tSimpleHashGet(pTask->pNameMap, &groupId, sizeof(int64_t));
  if (pVal) {
    SBlockName* pBln = (SBlockName*)pVal;
    hashValue = pBln->hashValue;
    if (!pDataBlock->info.parTbName[0]) {
L
liuyao 已提交
392
      memset(pDataBlock->info.parTbName, 0, TSDB_TABLE_NAME_LEN);
L
liuyao 已提交
393 394
      memcpy(pDataBlock->info.parTbName, pBln->parTbName, strlen(pBln->parTbName));
    }
395
  } else {
L
liuyao 已提交
396 397 398 399
    char* ctbName = taosMemoryCalloc(1, TSDB_TABLE_FNAME_LEN);
    if (ctbName == NULL) {
      return -1;
    }
L
Liu Jicong 已提交
400

L
liuyao 已提交
401 402 403 404 405 406
    if (pDataBlock->info.parTbName[0]) {
      snprintf(ctbName, TSDB_TABLE_NAME_LEN, "%s.%s", pTask->shuffleDispatcher.dbInfo.db, pDataBlock->info.parTbName);
    } else {
      buildCtbNameByGroupIdImpl(pTask->shuffleDispatcher.stbFullName, groupId, pDataBlock->info.parTbName);
      snprintf(ctbName, TSDB_TABLE_NAME_LEN, "%s.%s", pTask->shuffleDispatcher.dbInfo.db, pDataBlock->info.parTbName);
    }
L
Liu Jicong 已提交
407

L
liuyao 已提交
408 409 410 411 412 413 414 415 416 417 418 419
    /*uint32_t hashValue = MurmurHash3_32(ctbName, strlen(ctbName));*/
    SUseDbRsp* pDbInfo = &pTask->shuffleDispatcher.dbInfo;
    hashValue =
        taosGetTbHashVal(ctbName, strlen(ctbName), pDbInfo->hashMethod, pDbInfo->hashPrefix, pDbInfo->hashSuffix);
    taosMemoryFree(ctbName);
    SBlockName bln = {0};
    bln.hashValue = hashValue;
    memcpy(bln.parTbName, pDataBlock->info.parTbName, strlen(pDataBlock->info.parTbName));
    if (tSimpleHashGetSize(pTask->pNameMap) < MAX_BLOCK_NAME_NUM) {
      tSimpleHashPut(pTask->pNameMap, &groupId, sizeof(int64_t), &bln, sizeof(SBlockName));
    }
  }
420

L
Liu Jicong 已提交
421 422 423 424 425 426
  bool found = false;
  // TODO: optimize search
  int32_t j;
  for (j = 0; j < vgSz; j++) {
    SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, j);
    ASSERT(pVgInfo->vgId > 0);
H
Haojun Liao 已提交
427

L
Liu Jicong 已提交
428
    if (hashValue >= pVgInfo->hashBegin && hashValue <= pVgInfo->hashEnd) {
429
      if (streamAddBlockIntoDispatchMsg(pDataBlock, &pReqs[j]) < 0) {
L
Liu Jicong 已提交
430 431
        return -1;
      }
H
Haojun Liao 已提交
432

L
Liu Jicong 已提交
433 434 435
      if (pReqs[j].blockNum == 0) {
        atomic_add_fetch_32(&pTask->shuffleDispatcher.waitingRspCnt, 1);
      }
H
Haojun Liao 已提交
436

L
Liu Jicong 已提交
437 438 439 440 441 442 443 444 445
      pReqs[j].blockNum++;
      found = true;
      break;
    }
  }
  ASSERT(found);
  return 0;
}

H
Haojun Liao 已提交
446
static int32_t doDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pData) {
447 448 449
  int32_t code = 0;
  int32_t numOfBlocks = taosArrayGetSize(pData->blocks);
  ASSERT(numOfBlocks != 0);
L
Liu Jicong 已提交
450

451
  if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
452 453 454
    SStreamDispatchReq req = {0};

    int32_t downstreamTaskId = pTask->fixedEpDispatcher.taskId;
H
Haojun Liao 已提交
455
    code = tInitStreamDispatchReq(&req, pTask, pData->srcVgId, numOfBlocks, downstreamTaskId, pData->type);
456 457
    if (code != TSDB_CODE_SUCCESS) {
      return code;
L
Liu Jicong 已提交
458 459
    }

460
    for (int32_t i = 0; i < numOfBlocks; i++) {
L
Liu Jicong 已提交
461
      SSDataBlock* pDataBlock = taosArrayGet(pData->blocks, i);
H
Haojun Liao 已提交
462

463
      code = streamAddBlockIntoDispatchMsg(pDataBlock, &req);
H
Haojun Liao 已提交
464
      if (code != TSDB_CODE_SUCCESS) {
465 466 467
        taosArrayDestroyP(req.data, taosMemoryFree);
        taosArrayDestroy(req.dataLen);
        return code;
L
Liu Jicong 已提交
468 469
      }
    }
470

L
Liu Jicong 已提交
471 472 473
    int32_t vgId = pTask->fixedEpDispatcher.nodeId;
    SEpSet* pEpSet = &pTask->fixedEpDispatcher.epSet;

474 475
    qDebug("s-task:%s (child taskId:%d) fix-dispatch %d block(s) to s-task:0x%x (vgId:%d)", pTask->id.idStr,
           pTask->info.selfChildId, numOfBlocks, downstreamTaskId, vgId);
L
Liu Jicong 已提交
476

H
Haojun Liao 已提交
477 478 479 480
    code = doSendDispatchMsg(pTask, &req, vgId, pEpSet);
    taosArrayDestroyP(req.data, taosMemoryFree);
    taosArrayDestroy(req.dataLen);
    return code;
481
  } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
L
Liu Jicong 已提交
482 483 484
    int32_t rspCnt = atomic_load_32(&pTask->shuffleDispatcher.waitingRspCnt);
    ASSERT(rspCnt == 0);

485 486 487
    SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
    int32_t vgSz = taosArrayGetSize(vgInfo);

L
Liu Jicong 已提交
488 489
    SStreamDispatchReq* pReqs = taosMemoryCalloc(vgSz, sizeof(SStreamDispatchReq));
    if (pReqs == NULL) {
H
Haojun Liao 已提交
490
      terrno = TSDB_CODE_OUT_OF_MEMORY;
L
Liu Jicong 已提交
491 492 493 494
      return -1;
    }

    for (int32_t i = 0; i < vgSz; i++) {
495
      SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
H
Haojun Liao 已提交
496
      code = tInitStreamDispatchReq(&pReqs[i], pTask, pData->srcVgId, 0, pVgInfo->taskId, pData->type);
497
      if (code != TSDB_CODE_SUCCESS) {
L
Liu Jicong 已提交
498 499 500
        goto FAIL_SHUFFLE_DISPATCH;
      }
    }
L
Liu Jicong 已提交
501

502
    for (int32_t i = 0; i < numOfBlocks; i++) {
L
Liu Jicong 已提交
503
      SSDataBlock* pDataBlock = taosArrayGet(pData->blocks, i);
L
Liu Jicong 已提交
504

L
Liu Jicong 已提交
505
      // TODO: do not use broadcast
H
Haojun Liao 已提交
506
      if (pDataBlock->info.type == STREAM_DELETE_RESULT || pDataBlock->info.type == STREAM_CHECKPOINT || pDataBlock->info.type == STREAM_TRANS_STATE) {
L
Liu Jicong 已提交
507
        for (int32_t j = 0; j < vgSz; j++) {
508
          if (streamAddBlockIntoDispatchMsg(pDataBlock, &pReqs[j]) < 0) {
L
Liu Jicong 已提交
509 510
            goto FAIL_SHUFFLE_DISPATCH;
          }
511

L
Liu Jicong 已提交
512 513 514
          if (pReqs[j].blockNum == 0) {
            atomic_add_fetch_32(&pTask->shuffleDispatcher.waitingRspCnt, 1);
          }
L
Liu Jicong 已提交
515
          pReqs[j].blockNum++;
L
Liu Jicong 已提交
516
        }
517

L
Liu Jicong 已提交
518 519 520
        continue;
      }

H
Haojun Liao 已提交
521
      if (streamSearchAndAddBlock(pTask, pReqs, pDataBlock, vgSz, pDataBlock->info.id.groupId) < 0) {
L
Liu Jicong 已提交
522
        goto FAIL_SHUFFLE_DISPATCH;
L
Liu Jicong 已提交
523 524
      }
    }
L
Liu Jicong 已提交
525

H
Haojun Liao 已提交
526 527
    qDebug("s-task:%s (child taskId:%d) shuffle-dispatch blocks:%d to %d vgroups", pTask->id.idStr,
           pTask->info.selfChildId, numOfBlocks, vgSz);
H
Haojun Liao 已提交
528

L
Liu Jicong 已提交
529 530 531
    for (int32_t i = 0; i < vgSz; i++) {
      if (pReqs[i].blockNum > 0) {
        SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
H
Haojun Liao 已提交
532 533
        qDebug("s-task:%s (child taskId:%d) shuffle-dispatch blocks:%d to vgId:%d", pTask->id.idStr,
               pTask->info.selfChildId, pReqs[i].blockNum, pVgInfo->vgId);
H
Haojun Liao 已提交
534

H
Haojun Liao 已提交
535 536
        code = doSendDispatchMsg(pTask, &pReqs[i], pVgInfo->vgId, &pVgInfo->epSet);
        if (code < 0) {
L
Liu Jicong 已提交
537 538 539 540
          goto FAIL_SHUFFLE_DISPATCH;
        }
      }
    }
H
Haojun Liao 已提交
541

L
Liu Jicong 已提交
542
    code = 0;
H
Haojun Liao 已提交
543

H
Haojun Liao 已提交
544
    FAIL_SHUFFLE_DISPATCH:
H
Haojun Liao 已提交
545 546 547
    for (int32_t i = 0; i < vgSz; i++) {
      taosArrayDestroyP(pReqs[i].data, taosMemoryFree);
      taosArrayDestroy(pReqs[i].dataLen);
L
Liu Jicong 已提交
548
    }
549

H
Haojun Liao 已提交
550
    taosMemoryFree(pReqs);
L
Liu Jicong 已提交
551
  }
552

H
Haojun Liao 已提交
553
  return code;
L
Liu Jicong 已提交
554 555
}

556 557
static void doRetryDispatchData(void* param, void* tmrId) {
  SStreamTask* pTask = param;
558
  ASSERT(pTask->outputInfo.status == TASK_OUTPUT_STATUS__WAIT);
559

560
  int32_t code = doDispatchAllBlocks(pTask, pTask->msgInfo.pData);
561
  if (code != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
562 563 564
    qDebug("s-task:%s reset the waitRspCnt to be 0 before launch retry dispatch", pTask->id.idStr);
    atomic_store_32(&pTask->shuffleDispatcher.waitingRspCnt, 0);
    streamRetryDispatchStreamBlock(pTask, DISPATCH_RETRY_INTERVAL_MS);
565 566 567 568 569
  }
}

void streamRetryDispatchStreamBlock(SStreamTask* pTask, int64_t waitDuration) {
  qError("s-task:%s dispatch data in %"PRId64"ms", pTask->id.idStr, waitDuration);
570
  taosTmrReset(doRetryDispatchData, waitDuration, pTask, streamEnv.timer, &pTask->launchTaskTimer);
571 572
}

573
int32_t streamDispatchStreamBlock(SStreamTask* pTask) {
574 575
  STaskOutputInfo* pInfo = &pTask->outputInfo;
  ASSERT((pInfo->type == TASK_OUTPUT__FIXED_DISPATCH || pInfo->type == TASK_OUTPUT__SHUFFLE_DISPATCH));
576

577
  int32_t numOfElems = taosQueueItemSize(pInfo->queue->queue);
H
Haojun Liao 已提交
578 579 580 581
  if (numOfElems > 0) {
    qDebug("s-task:%s try to dispatch intermediate result block to downstream, elem in outputQ:%d", pTask->id.idStr,
           numOfElems);
  }
L
Liu Jicong 已提交
582

583
  // to make sure only one dispatch is running
584
  int8_t old = atomic_val_compare_exchange_8(&pInfo->status, TASK_OUTPUT_STATUS__NORMAL, TASK_OUTPUT_STATUS__WAIT);
L
Liu Jicong 已提交
585
  if (old != TASK_OUTPUT_STATUS__NORMAL) {
586
    qDebug("s-task:%s wait for dispatch rsp, not dispatch now, output status:%d", pTask->id.idStr, old);
L
Liu Jicong 已提交
587
    return 0;
L
Liu Jicong 已提交
588 589
  }

590
  ASSERT(pTask->msgInfo.pData == NULL);
591
  qDebug("s-task:%s start to dispatch msg, set output status:%d", pTask->id.idStr, pInfo->status);
592

593
  SStreamDataBlock* pBlock = streamQueueNextItem(pInfo->queue);
594
  if (pBlock == NULL) {
595 596
    atomic_store_8(&pInfo->status, TASK_OUTPUT_STATUS__NORMAL);
    qDebug("s-task:%s not dispatch since no elems in outputQ, output status:%d", pTask->id.idStr, pInfo->status);
L
Liu Jicong 已提交
597 598
    return 0;
  }
L
Liu Jicong 已提交
599

600
  pTask->msgInfo.pData = pBlock;
601 602
  ASSERT(pBlock->type == STREAM_INPUT__DATA_BLOCK || pBlock->type == STREAM_INPUT__CHECKPOINT_TRIGGER ||
         pBlock->type == STREAM_INPUT__TRANS_STATE);
L
Liu Jicong 已提交
603

604 605 606
  int32_t retryCount = 0;

  while (1) {
607
    int32_t code = doDispatchAllBlocks(pTask, pBlock);
608 609 610 611 612
    if (code == TSDB_CODE_SUCCESS) {
      break;
    }

    qDebug("s-task:%s failed to dispatch msg to downstream, code:%s, output status:%d, retry cnt:%d", pTask->id.idStr,
613
           tstrerror(terrno), pInfo->status, retryCount);
H
Haojun Liao 已提交
614 615 616

    // todo deal with only partially success dispatch case
    atomic_store_32(&pTask->shuffleDispatcher.waitingRspCnt, 0);
617
    if (terrno == TSDB_CODE_APP_IS_STOPPING) {  // in case of this error, do not retry anymore
H
Haojun Liao 已提交
618 619 620 621
      destroyStreamDataBlock(pTask->msgInfo.pData);
      pTask->msgInfo.pData = NULL;
      return code;
    }
622

623 624 625
    if (++retryCount > MAX_CONTINUE_RETRY_COUNT) {  // add to timer to retry
      qDebug("s-task:%s failed to dispatch msg to downstream for %d times, code:%s, add timer to retry in %dms",
             pTask->id.idStr, retryCount, tstrerror(terrno), DISPATCH_RETRY_INTERVAL_MS);
H
Haojun Liao 已提交
626
      streamRetryDispatchStreamBlock(pTask, DISPATCH_RETRY_INTERVAL_MS);
627 628
      break;
    }
L
Liu Jicong 已提交
629
  }
630

631 632
  // this block can not be deleted until it has been sent to downstream task successfully.
  return TSDB_CODE_SUCCESS;
L
Liu Jicong 已提交
633
}
634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693

int32_t tEncodeCompleteHistoryDataMsg(SEncoder* pEncoder, const SStreamCompleteHistoryMsg* pReq) {
  if (tStartEncode(pEncoder) < 0) return -1;
  if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
  if (tEncodeI32(pEncoder, pReq->downstreamId) < 0) return -1;
  if (tEncodeI32(pEncoder, pReq->downstreamNode) < 0) return -1;
  if (tEncodeI32(pEncoder, pReq->upstreamTaskId) < 0) return -1;
  if (tEncodeI32(pEncoder, pReq->upstreamNodeId) < 0) return -1;
  tEndEncode(pEncoder);
  return pEncoder->pos;
}

int32_t tDecodeCompleteHistoryDataMsg(SDecoder* pDecoder, SStreamCompleteHistoryMsg* pRsp) {
  if (tStartDecode(pDecoder) < 0) return -1;
  if (tDecodeI64(pDecoder, &pRsp->streamId) < 0) return -1;
  if (tDecodeI32(pDecoder, &pRsp->downstreamId) < 0) return -1;
  if (tDecodeI32(pDecoder, &pRsp->downstreamNode) < 0) return -1;
  if (tDecodeI32(pDecoder, &pRsp->upstreamTaskId) < 0) return -1;
  if (tDecodeI32(pDecoder, &pRsp->upstreamNodeId) < 0) return -1;
  tEndDecode(pDecoder);
  return 0;
}

int32_t streamAddEndScanHistoryMsg(SStreamTask* pTask, SRpcHandleInfo* pRpcInfo, SStreamScanHistoryFinishReq* pReq) {
  int32_t  len = 0;
  int32_t  code = 0;
  SEncoder encoder;

  SStreamCompleteHistoryMsg msg = {
      .streamId = pReq->streamId,
      .upstreamTaskId = pReq->upstreamTaskId,
      .upstreamNodeId = pReq->upstreamNodeId,
      .downstreamId = pReq->downstreamTaskId,
      .downstreamNode = pTask->pMeta->vgId,
  };

  tEncodeSize(tEncodeCompleteHistoryDataMsg, &msg, len, code);
  if (code < 0) {
    return code;
  }

  void* pBuf = rpcMallocCont(sizeof(SMsgHead) + len);
  if (pBuf == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  ((SMsgHead*)pBuf)->vgId = htonl(pReq->upstreamNodeId);

  void* abuf = POINTER_SHIFT(pBuf, sizeof(SMsgHead));

  tEncoderInit(&encoder, (uint8_t*)abuf, len);
  tEncodeCompleteHistoryDataMsg(&encoder, &msg);
  tEncoderClear(&encoder);

  SStreamChildEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, pReq->upstreamTaskId);

  SStreamContinueExecInfo info = {.taskId = pReq->upstreamTaskId, .epset = pInfo->epSet};
  initRpcMsg(&info.msg, 0, pBuf, sizeof(SMsgHead) + len);
  info.msg.info = *pRpcInfo;

H
Haojun Liao 已提交
694
  taosThreadMutexLock(&pTask->lock);
695 696 697 698
  if (pTask->pRspMsgList == NULL) {
    pTask->pRspMsgList = taosArrayInit(4, sizeof(SStreamContinueExecInfo));
  }
  taosArrayPush(pTask->pRspMsgList, &info);
H
Haojun Liao 已提交
699
  taosThreadMutexUnlock(&pTask->lock);
700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723

  int32_t num = taosArrayGetSize(pTask->pRspMsgList);
  qDebug("s-task:%s add scan history finish rsp msg for task:0x%x, total:%d", pTask->id.idStr, pReq->upstreamTaskId,
         num);
  return TSDB_CODE_SUCCESS;
}

int32_t streamNotifyUpstreamContinue(SStreamTask* pTask) {
  ASSERT(pTask->info.taskLevel == TASK_LEVEL__AGG || pTask->info.taskLevel == TASK_LEVEL__SINK);

  int32_t num = taosArrayGetSize(pTask->pRspMsgList);
  for (int32_t i = 0; i < num; ++i) {
    SStreamContinueExecInfo* pInfo = taosArrayGet(pTask->pRspMsgList, i);
    tmsgSendRsp(&pInfo->msg);

    qDebug("s-task:%s level:%d notify upstream:0x%x to continue process data from WAL", pTask->id.idStr, pTask->info.taskLevel,
           pInfo->taskId);
  }

  taosArrayClear(pTask->pRspMsgList);
  qDebug("s-task:%s level:%d checkpoint ready msg sent to all %d upstreams", pTask->id.idStr, pTask->info.taskLevel,
         num);
  return 0;
}
H
Haojun Liao 已提交
724 725

int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, int32_t code) {
726 727
  const char* id = pTask->id.idStr;

H
Haojun Liao 已提交
728 729 730 731 732 733
  if (code != TSDB_CODE_SUCCESS) {
    // dispatch message failed: network error, or node not available.
    // in case of the input queue is full, the code will be TSDB_CODE_SUCCESS, the and pRsp>inputStatus will be set
    // flag. here we need to retry dispatch this message to downstream task immediately. handle the case the failure
    // happened too fast.
    // todo handle the shuffle dispatch failure
H
Haojun Liao 已提交
734
    if (code == TSDB_CODE_STREAM_TASK_NOT_EXIST) { // destination task does not exist, not retry anymore
H
Haojun Liao 已提交
735
      qWarn("s-task:%s failed to dispatch msg to task:0x%x, no retry, since it is destroyed already", id, pRsp->downstreamTaskId);
H
Haojun Liao 已提交
736 737 738 739 740 741
    } else {
      qError("s-task:%s failed to dispatch msg to task:0x%x, code:%s, retry cnt:%d", id, pRsp->downstreamTaskId,
             tstrerror(code), ++pTask->msgInfo.retryCount);
      int32_t ret = doDispatchAllBlocks(pTask, pTask->msgInfo.pData);
      if (ret != TSDB_CODE_SUCCESS) {
      }
H
Haojun Liao 已提交
742 743 744 745 746
    }

    return TSDB_CODE_SUCCESS;
  }

747 748
  qDebug("s-task:%s recv dispatch rsp from 0x%x, downstream task input status:%d code:%d", id, pRsp->downstreamTaskId,
         pRsp->inputStatus, code);
H
Haojun Liao 已提交
749 750 751 752

  // there are other dispatch message not response yet
  if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
    int32_t leftRsp = atomic_sub_fetch_32(&pTask->shuffleDispatcher.waitingRspCnt, 1);
753
    qDebug("s-task:%s is shuffle, left waiting rsp %d", id, leftRsp);
H
Haojun Liao 已提交
754 755 756 757 758
    if (leftRsp > 0) {
      return 0;
    }
  }

759 760 761 762 763 764
  // transtate msg has been sent to downstream successfully. let's transfer the fill-history task state
  SStreamDataBlock* p = pTask->msgInfo.pData;
  if (p->type == STREAM_INPUT__TRANS_STATE) {
    qDebug("s-task:%s dispatch transtate msg to downstream successfully, start to transfer state", id);
    ASSERT(pTask->info.fillHistory == 1);
    code = streamTransferStateToStreamTask(pTask);
765
    if (code != TSDB_CODE_SUCCESS) {  // todo: do nothing if error happens
766
    }
H
Haojun Liao 已提交
767 768

    streamFreeQitem(pTask->msgInfo.pData);
H
Haojun Liao 已提交
769
    pTask->msgInfo.pData = NULL;
770
    return TSDB_CODE_SUCCESS;
771 772
  }

H
Haojun Liao 已提交
773 774 775
  pTask->msgInfo.retryCount = 0;
  ASSERT(pTask->outputInfo.status == TASK_OUTPUT_STATUS__WAIT);

776
  qDebug("s-task:%s output status is set to:%d", id, pTask->outputInfo.status);
H
Haojun Liao 已提交
777 778 779 780 781 782

  // the input queue of the (down stream) task that receive the output data is full,
  // so the TASK_INPUT_STATUS_BLOCKED is rsp
  if (pRsp->inputStatus == TASK_INPUT_STATUS__BLOCKED) {
    pTask->inputStatus = TASK_INPUT_STATUS__BLOCKED;   // block the input of current task, to push pressure to upstream
    pTask->msgInfo.blockingTs = taosGetTimestampMs();  // record the blocking start time
H
Haojun Liao 已提交
783
    qError("s-task:%s inputQ of downstream task:0x%x is full, time:%" PRId64 " wait for %dms and retry dispatch data",
784
           id, pRsp->downstreamTaskId, pTask->msgInfo.blockingTs, DISPATCH_RETRY_INTERVAL_MS);
H
Haojun Liao 已提交
785 786 787 788 789 790 791 792
    streamRetryDispatchStreamBlock(pTask, DISPATCH_RETRY_INTERVAL_MS);
  } else {  // pipeline send data in output queue
    // this message has been sent successfully, let's try next one.
    destroyStreamDataBlock(pTask->msgInfo.pData);
    pTask->msgInfo.pData = NULL;

    if (pTask->msgInfo.blockingTs != 0) {
      int64_t el = taosGetTimestampMs() - pTask->msgInfo.blockingTs;
793 794
      qDebug("s-task:%s downstream task:0x%x resume to normal from inputQ blocking, blocking time:%" PRId64 "ms", id,
             pRsp->downstreamTaskId, el);
H
Haojun Liao 已提交
795 796 797 798 799 800 801 802 803 804 805 806 807 808 809
      pTask->msgInfo.blockingTs = 0;

      // put data into inputQ of current task is also allowed
      pTask->inputStatus = TASK_INPUT_STATUS__NORMAL;
    }

    // now ready for next data output
    atomic_store_8(&pTask->outputInfo.status, TASK_OUTPUT_STATUS__NORMAL);

    // otherwise, continue dispatch the first block to down stream task in pipeline
    streamDispatchStreamBlock(pTask);
  }

  return 0;
}