streamDispatch.c 28.7 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

16
#include "streamInt.h"
17
#include "ttimer.h"
L
Liu Jicong 已提交
18

H
Haojun Liao 已提交
19 20 21
// Upper bound on the number of groupId -> SBlockName entries cached in pTask->pNameMap.
#define MAX_BLOCK_NAME_NUM         1024
// Delay, in milliseconds, passed to the retry timer when a dispatch has to be re-attempted later.
#define DISPATCH_RETRY_INTERVAL_MS 300
// Number of immediate, back-to-back dispatch retries allowed before handing the retry over to the timer.
#define MAX_CONTINUE_RETRY_COUNT   5
L
liuyao 已提交
22 23 24 25 26 27

// Per-group routing info cached in pTask->pNameMap (keyed by groupId) so that the
// table-name hash does not have to be recomputed for every block of the same group.
typedef struct SBlockName {
  uint32_t hashValue;                       // hash of "<db>.<table>", matched against vgroup hash ranges
  char     parTbName[TSDB_TABLE_NAME_LEN];  // table name copied from the data block's info.parTbName
} SBlockName;

H
Haojun Liao 已提交
28 29 30
// Forward declaration; the definition appears later in this file.
// Fills the header fields of pReq and allocates its (initially empty) data/dataLen arrays.
static int32_t tInitStreamDispatchReq(SStreamDispatchReq* pReq, const SStreamTask* pTask, int32_t vgId,
                                      int32_t numOfBlocks, int64_t dstTaskId, int32_t type);

31 32 33 34 35 36
// Set the message type and the payload (pointer and length) of an RPC message.
// Only these three fields are written; every other field of pMsg is left as it was.
static void initRpcMsg(SRpcMsg* pMsg, int32_t msgType, void* pCont, int32_t contLen) {
  pMsg->pCont = pCont;
  pMsg->contLen = contLen;
  pMsg->msgType = msgType;
}

37
// Serialize a dispatch request: the fixed header fields first, then every block as a
// (length, bytes) pair. Returns the encoder position after encoding, or -1 as soon as
// any encode step fails.
static int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatchReq* pReq) {
  if (tStartEncode(pEncoder) < 0) {
    return -1;
  }

  // Header fields; short-circuit evaluation keeps the original field order and
  // stops at the first failure. The order mirrors tDecodeStreamDispatchReq.
  if (tEncodeI64(pEncoder, pReq->streamId) < 0 ||
      tEncodeI32(pEncoder, pReq->taskId) < 0 ||
      tEncodeI32(pEncoder, pReq->type) < 0 ||
      tEncodeI32(pEncoder, pReq->upstreamTaskId) < 0 ||
      tEncodeI32(pEncoder, pReq->srcVgId) < 0 ||
      tEncodeI32(pEncoder, pReq->upstreamChildId) < 0 ||
      tEncodeI32(pEncoder, pReq->upstreamNodeId) < 0 ||
      tEncodeI32(pEncoder, pReq->blockNum) < 0 ||
      tEncodeI64(pEncoder, pReq->totalLen) < 0) {
    return -1;
  }

  // Both arrays must hold exactly one entry per block.
  ASSERT(taosArrayGetSize(pReq->data) == pReq->blockNum);
  ASSERT(taosArrayGetSize(pReq->dataLen) == pReq->blockNum);

  for (int32_t idx = 0; idx < pReq->blockNum; ++idx) {
    int32_t payloadLen = *(int32_t*)taosArrayGet(pReq->dataLen, idx);
    void*   pPayload = taosArrayGetP(pReq->data, idx);

    if (tEncodeI32(pEncoder, payloadLen) < 0) {
      return -1;
    }
    if (tEncodeBinary(pEncoder, pPayload, payloadLen) < 0) {
      return -1;
    }
  }

  tEndEncode(pEncoder);
  return pEncoder->pos;
}

60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90
// Encode one data block into a freshly allocated SRetrieveTableRsp buffer and append it
// (together with its encoded length) to the dispatch request.
//
// Returns 0 on success, -1 if the buffer cannot be allocated or cannot be appended.
// On success the buffer is owned by pReq->data and is released when the request is destroyed.
static int32_t streamAddBlockIntoDispatchMsg(const SSDataBlock* pBlock, SStreamDispatchReq* pReq) {
  int32_t dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock);
  void*   buf = taosMemoryCalloc(1, dataStrLen);
  if (buf == NULL) return -1;

  SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)buf;
  pRetrieve->useconds = 0;
  pRetrieve->precision = TSDB_DEFAULT_PRECISION;
  pRetrieve->compressed = 0;
  pRetrieve->completed = 1;
  pRetrieve->streamBlockType = pBlock->info.type;
  // multi-byte header fields are stored in big-endian (network) byte order
  pRetrieve->numOfRows = htobe64((int64_t)pBlock->info.rows);
  pRetrieve->skey = htobe64(pBlock->info.window.skey);
  pRetrieve->ekey = htobe64(pBlock->info.window.ekey);
  pRetrieve->version = htobe64(pBlock->info.version);
  pRetrieve->watermark = htobe64(pBlock->info.watermark);
  memcpy(pRetrieve->parTbName, pBlock->info.parTbName, TSDB_TABLE_NAME_LEN);

  int32_t numOfCols = (int32_t)taosArrayGetSize(pBlock->pDataBlock);
  pRetrieve->numOfCols = htonl(numOfCols);

  int32_t actualLen = blockEncode(pBlock, pRetrieve->data, numOfCols);
  actualLen += sizeof(SRetrieveTableRsp);
  ASSERT(actualLen <= dataStrLen);

  // Hand the buffer over first: if this push fails nobody owns buf yet, so free it here
  // instead of leaking it.
  if (taosArrayPush(pReq->data, &buf) == NULL) {
    taosMemoryFree(buf);
    return -1;
  }

  // From here on buf belongs to pReq->data. If recording the length fails, the buffer is
  // still released when the caller destroys the request on the error path.
  if (taosArrayPush(pReq->dataLen, &actualLen) == NULL) {
    return -1;
  }

  // NOTE(review): totalLen accumulates the allocated size (dataStrLen), not actualLen,
  // exactly as before — confirm which one the receiver expects.
  pReq->totalLen += dataStrLen;
  return 0;
}

L
Liu Jicong 已提交
91 92 93 94
// Deserialize a dispatch request produced by tEncodeStreamDispatchReq.
//
// On success pReq->data holds one heap-allocated buffer per block and pReq->dataLen the
// matching lengths. Returns 0 on success and -1 on any decode or allocation failure.
// On failure whatever was already stored in pReq is left in place, so the caller can
// release it with tDeleteStreamDispatchReq.
int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq) {
  if (tStartDecode(pDecoder) < 0) return -1;
  if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1;
  if (tDecodeI32(pDecoder, &pReq->taskId) < 0) return -1;
  if (tDecodeI32(pDecoder, &pReq->type) < 0) return -1;
  if (tDecodeI32(pDecoder, &pReq->upstreamTaskId) < 0) return -1;
  if (tDecodeI32(pDecoder, &pReq->srcVgId) < 0) return -1;
  if (tDecodeI32(pDecoder, &pReq->upstreamChildId) < 0) return -1;
  if (tDecodeI32(pDecoder, &pReq->upstreamNodeId) < 0) return -1;
  if (tDecodeI32(pDecoder, &pReq->blockNum) < 0) return -1;
  if (tDecodeI64(pDecoder, &pReq->totalLen) < 0) return -1;

  ASSERT(pReq->blockNum > 0);
  pReq->data = taosArrayInit(pReq->blockNum, sizeof(void*));
  pReq->dataLen = taosArrayInit(pReq->blockNum, sizeof(int32_t));
  // Bail out instead of pushing into a NULL array when either allocation failed.
  if (pReq->data == NULL || pReq->dataLen == NULL) {
    return -1;
  }

  for (int32_t i = 0; i < pReq->blockNum; i++) {
    int32_t  len1 = 0;
    uint64_t len2 = 0;
    void*    data = NULL;
    if (tDecodeI32(pDecoder, &len1) < 0) return -1;
    if (tDecodeBinaryAlloc(pDecoder, &data, &len2) < 0) return -1;
    // the explicitly encoded length must agree with the length prefix of the binary field
    ASSERT(len1 == len2);
    taosArrayPush(pReq->dataLen, &len1);
    taosArrayPush(pReq->data, &data);
  }
  tEndDecode(pDecoder);
  return 0;
}

120 121
// Initialize a dispatch request addressed to task `dstTaskId`.
//
// Copies the identifying fields of the sending task into pReq and allocates the
// (initially empty) data / dataLen arrays with capacity for numOfBlocks entries.
//
// Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY if either array cannot be
// allocated. On failure both array pointers in pReq are NULL, so callers may run their
// normal cleanup on the request without freeing anything twice.
int32_t tInitStreamDispatchReq(SStreamDispatchReq* pReq, const SStreamTask* pTask, int32_t vgId, int32_t numOfBlocks,
                               int64_t dstTaskId, int32_t type) {
  pReq->streamId = pTask->id.streamId;
  pReq->srcVgId = vgId;
  pReq->upstreamTaskId = pTask->id.taskId;
  pReq->upstreamChildId = pTask->info.selfChildId;
  pReq->upstreamNodeId = pTask->info.nodeId;
  pReq->blockNum = numOfBlocks;
  pReq->taskId = dstTaskId;
  pReq->type = type;

  pReq->data = taosArrayInit(numOfBlocks, POINTER_BYTES);
  pReq->dataLen = taosArrayInit(numOfBlocks, sizeof(int32_t));
  if (pReq->data == NULL || pReq->dataLen == NULL) {
    taosArrayDestroyP(pReq->data, taosMemoryFree);
    taosArrayDestroy(pReq->dataLen);
    // Do not leave dangling pointers behind: the shuffle-dispatch cleanup destroys
    // every request's arrays again, which would otherwise be a double free.
    pReq->data = NULL;
    pReq->dataLen = NULL;
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  return TSDB_CODE_SUCCESS;
}

L
Liu Jicong 已提交
142
// Release the block buffers and the length array owned by a dispatch request.
// The pointers are reset afterwards so that a repeated call on the same request
// does not free the arrays a second time.
void tDeleteStreamDispatchReq(SStreamDispatchReq* pReq) {
  taosArrayDestroyP(pReq->data, taosMemoryFree);
  taosArrayDestroy(pReq->dataLen);
  pReq->data = NULL;
  pReq->dataLen = NULL;
}

L
Liu Jicong 已提交
147 148 149
// Serialize a retrieve request: ids first, then the encoded block as one binary field.
// Returns the encoder position after encoding, or -1 if any encode step fails.
int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const SStreamRetrieveReq* pReq) {
  if (tStartEncode(pEncoder) < 0) {
    return -1;
  }

  // Short-circuit evaluation keeps the field order and stops at the first failure.
  if (tEncodeI64(pEncoder, pReq->streamId) < 0 ||
      tEncodeI64(pEncoder, pReq->reqId) < 0 ||
      tEncodeI32(pEncoder, pReq->dstNodeId) < 0 ||
      tEncodeI32(pEncoder, pReq->dstTaskId) < 0 ||
      tEncodeI32(pEncoder, pReq->srcNodeId) < 0 ||
      tEncodeI32(pEncoder, pReq->srcTaskId) < 0) {
    return -1;
  }

  const uint8_t* pPayload = (const uint8_t*)pReq->pRetrieve;
  if (tEncodeBinary(pEncoder, pPayload, pReq->retrieveLen) < 0) {
    return -1;
  }

  tEndEncode(pEncoder);
  return pEncoder->pos;
}

// Deserialize a retrieve request. The payload is decoded into a newly allocated buffer
// stored in pReq->pRetrieve (released by tDeleteStreamRetrieveReq).
// Returns 0 on success, -1 if any decode step fails.
int32_t tDecodeStreamRetrieveReq(SDecoder* pDecoder, SStreamRetrieveReq* pReq) {
  if (tStartDecode(pDecoder) < 0) {
    return -1;
  }

  // Field order mirrors tEncodeStreamRetrieveReq; evaluation stops at the first failure.
  if (tDecodeI64(pDecoder, &pReq->streamId) < 0 ||
      tDecodeI64(pDecoder, &pReq->reqId) < 0 ||
      tDecodeI32(pDecoder, &pReq->dstNodeId) < 0 ||
      tDecodeI32(pDecoder, &pReq->dstTaskId) < 0 ||
      tDecodeI32(pDecoder, &pReq->srcNodeId) < 0 ||
      tDecodeI32(pDecoder, &pReq->srcTaskId) < 0) {
    return -1;
  }

  uint64_t payloadLen = 0;
  if (tDecodeBinaryAlloc(pDecoder, (void**)&pReq->pRetrieve, &payloadLen) < 0) {
    return -1;
  }
  pReq->retrieveLen = (int32_t)payloadLen;

  tEndDecode(pDecoder);
  return 0;
}

L
Liu Jicong 已提交
175 176
// Release the payload buffer held by a retrieve request.
void tDeleteStreamRetrieveReq(SStreamRetrieveReq* pReq) {
  taosMemoryFree(pReq->pRetrieve);
}

L
Liu Jicong 已提交
177
// Encode pBlock once and send it as a TDMT_STREAM_RETRIEVE request to every task in
// pTask->pUpstreamEpInfoList.
//
// Returns 0 when a request was sent to every entry, -1 on the first allocation,
// encoding or send failure. The encoded block is released on every exit path.
int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock) {
  int32_t            code = -1;
  SRetrieveTableRsp* pRetrieve = NULL;
  void*              buf = NULL;
  int32_t            dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock);

  pRetrieve = taosMemoryCalloc(1, dataStrLen);
  if (pRetrieve == NULL) return -1;

  int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
  pRetrieve->useconds = 0;
  pRetrieve->precision = TSDB_DEFAULT_PRECISION;
  pRetrieve->compressed = 0;
  pRetrieve->completed = 1;
  pRetrieve->streamBlockType = pBlock->info.type;
  // multi-byte header fields are stored in big-endian (network) byte order
  pRetrieve->numOfRows = htobe64((int64_t)pBlock->info.rows);
  pRetrieve->numOfCols = htonl(numOfCols);
  pRetrieve->skey = htobe64(pBlock->info.window.skey);
  pRetrieve->ekey = htobe64(pBlock->info.window.ekey);
  pRetrieve->version = htobe64(pBlock->info.version);

  // The encoded length is not needed: the request always carries dataStrLen bytes.
  (void)blockEncode(pBlock, pRetrieve->data, numOfCols);

  SStreamRetrieveReq req = {
      .streamId = pTask->id.streamId,
      .srcNodeId = pTask->info.nodeId,
      .srcTaskId = pTask->id.taskId,
      .pRetrieve = pRetrieve,
      .retrieveLen = dataStrLen,
  };

  int32_t sz = taosArrayGetSize(pTask->pUpstreamEpInfoList);
  ASSERT(sz > 0);
  for (int32_t i = 0; i < sz; i++) {
    req.reqId = tGenIdPI64();
    SStreamChildEpInfo* pEpInfo = taosArrayGetP(pTask->pUpstreamEpInfoList, i);
    req.dstNodeId = pEpInfo->nodeId;
    req.dstTaskId = pEpInfo->taskId;

    int32_t len = 0;
    tEncodeSize(tEncodeStreamRetrieveReq, &req, len, code);
    if (code < 0) {
      ASSERT(0);
      // go through CLEAR so that pRetrieve is released instead of leaked
      code = -1;
      goto CLEAR;
    }

    // tEncodeSize stored its own status in `code` (doSendDispatchMsg in this file re-arms
    // it for the same reason); reset it so the failure exits below really report an error.
    code = -1;

    buf = rpcMallocCont(sizeof(SMsgHead) + len);
    if (buf == NULL) {
      goto CLEAR;
    }

    ((SMsgHead*)buf)->vgId = htonl(pEpInfo->nodeId);
    void*    abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
    SEncoder encoder;
    tEncoderInit(&encoder, abuf, len);
    tEncodeStreamRetrieveReq(&encoder, &req);
    tEncoderClear(&encoder);

    SRpcMsg rpcMsg = {.code = 0, .msgType = TDMT_STREAM_RETRIEVE, .pCont = buf, .contLen = sizeof(SMsgHead) + len};
    if (tmsgSendReq(&pEpInfo->epSet, &rpcMsg) < 0) {
      ASSERT(0);
      goto CLEAR;
    }

    // the message was handed over successfully; CLEAR must not free it
    buf = NULL;
    qDebug("s-task:%s (child %d) send retrieve req to task:0x%x (vgId:%d), reqId:0x%" PRIx64, pTask->id.idStr,
           pTask->info.selfChildId, pEpInfo->taskId, pEpInfo->nodeId, req.reqId);
  }
  code = 0;

CLEAR:
  taosMemoryFree(pRetrieve);
  rpcFreeCont(buf);
  return code;
}

252
// Encode a task-check request and send it as TDMT_STREAM_TASK_CHECK to the vnode `nodeId`
// reachable through pEpSet.
//
// Returns 0 once the message has been handed to tmsgSendReq (its result is not inspected),
// -1 if sizing or allocation fails, or the negative encoder result if encoding fails.
int32_t streamDispatchCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pReq, int32_t nodeId, SEpSet* pEpSet) {
  void*   buf = NULL;
  int32_t code = -1;
  SRpcMsg msg = {0};

  int32_t tlen;
  tEncodeSize(tEncodeStreamTaskCheckReq, pReq, tlen, code);
  if (code < 0) {
    return -1;
  }

  buf = rpcMallocCont(sizeof(SMsgHead) + tlen);
  if (buf == NULL) {
    return -1;
  }

  ((SMsgHead*)buf)->vgId = htonl(nodeId);
  void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));

  SEncoder encoder;
  tEncoderInit(&encoder, abuf, tlen);
  if ((code = tEncodeStreamTaskCheckReq(&encoder, pReq)) < 0) {
    // release the encoder's internal state on the failure path as well
    tEncoderClear(&encoder);
    rpcFreeCont(buf);
    return code;
  }

  tEncoderClear(&encoder);

  msg.contLen = tlen + sizeof(SMsgHead);
  msg.pCont = buf;
  msg.msgType = TDMT_STREAM_TASK_CHECK;

  qDebug("s-task:%s (level:%d) dispatch check msg to s-task:%" PRIx64 ":0x%x (vgId:%d)", pTask->id.idStr,
         pTask->info.taskLevel, pReq->streamId, pReq->downstreamTaskId, nodeId);

  tmsgSendReq(pEpSet, &msg);
  return 0;
}

291
int32_t streamDoDispatchScanHistoryFinishMsg(SStreamTask* pTask, const SStreamScanHistoryFinishReq* pReq, int32_t vgId,
292
                                             SEpSet* pEpSet) {
293 294 295 296 297
  void*   buf = NULL;
  int32_t code = -1;
  SRpcMsg msg = {0};

  int32_t tlen;
298
  tEncodeSize(tEncodeStreamScanHistoryFinishReq, pReq, tlen, code);
299 300 301 302 303 304
  if (code < 0) {
    return -1;
  }

  buf = rpcMallocCont(sizeof(SMsgHead) + tlen);
  if (buf == NULL) {
H
Haojun Liao 已提交
305
    terrno = TSDB_CODE_OUT_OF_MEMORY;
306 307 308 309 310 311 312 313
    return -1;
  }

  ((SMsgHead*)buf)->vgId = htonl(vgId);
  void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));

  SEncoder encoder;
  tEncoderInit(&encoder, abuf, tlen);
314
  if ((code = tEncodeStreamScanHistoryFinishReq(&encoder, pReq)) < 0) {
H
Haojun Liao 已提交
315 316 317 318
    if (buf) {
      rpcFreeCont(buf);
    }
    return code;
319
  }
H
Haojun Liao 已提交
320

321 322 323 324
  tEncoderClear(&encoder);

  msg.contLen = tlen + sizeof(SMsgHead);
  msg.pCont = buf;
325
  msg.msgType = TDMT_STREAM_SCAN_HISTORY_FINISH;
326 327

  tmsgSendReq(pEpSet, &msg);
328 329

  const char* pStatus = streamGetTaskStatusStr(pTask->status.taskStatus);
330 331
  qDebug("s-task:%s status:%s dispatch scan-history finish msg to taskId:0x%x (vgId:%d)", pTask->id.idStr, pStatus,
         pReq->downstreamTaskId, vgId);
332 333 334
  return 0;
}

335
// Serialize pReq and send it to the vnode `vgId` reachable through pEpSet, using the
// message type stored in pTask->msgInfo.msgType.
//
// Returns the result of tmsgSendReq on the success path, or a negative value when sizing,
// allocation or encoding fails (the message buffer is released in that case).
static int32_t doSendDispatchMsg(SStreamTask* pTask, const SStreamDispatchReq* pReq, int32_t vgId, SEpSet* pEpSet) {
  void*   buf = NULL;
  int32_t code = -1;
  SRpcMsg msg = {0};

  // serialize
  int32_t tlen;
  tEncodeSize(tEncodeStreamDispatchReq, pReq, tlen, code);
  if (code < 0) {
    goto FAIL;
  }

  // tEncodeSize overwrote code; re-arm the failure value for the allocation below
  code = -1;
  buf = rpcMallocCont(sizeof(SMsgHead) + tlen);
  if (buf == NULL) {
    goto FAIL;
  }

  ((SMsgHead*)buf)->vgId = htonl(vgId);
  void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));

  SEncoder encoder;
  tEncoderInit(&encoder, abuf, tlen);
  if ((code = tEncodeStreamDispatchReq(&encoder, pReq)) < 0) {
    // release the encoder's internal state on the failure path as well
    tEncoderClear(&encoder);
    goto FAIL;
  }
  tEncoderClear(&encoder);

  msg.contLen = tlen + sizeof(SMsgHead);
  msg.pCont = buf;
  msg.msgType = pTask->msgInfo.msgType;

  qDebug("s-task:%s dispatch msg to taskId:0x%x vgId:%d data msg", pTask->id.idStr, pReq->taskId, vgId);
  return tmsgSendReq(pEpSet, &msg);

FAIL:
  if (buf) {
    rpcFreeCont(buf);
  }

  return code;
}

L
Liu Jicong 已提交
378 379
// Route one data block to the vgroup that owns its destination table and append it to
// that vgroup's dispatch request.
//
// The table-name hash is looked up in pTask->pNameMap by groupId; on a miss it is computed
// from "<db>.<table>" and cached (up to MAX_BLOCK_NAME_NUM entries). If the block carries
// no table name, one is derived from the super-table name and the group id.
//
// pReqs must have vgSz entries, parallel to shuffleDispatcher.dbInfo.pVgroupInfos.
// Returns 0 on success, -1 on allocation failure or if the block cannot be appended.
int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, SSDataBlock* pDataBlock, int32_t vgSz,
                                int64_t groupId) {
  uint32_t hashValue = 0;
  SArray*  vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
  if (pTask->pNameMap == NULL) {
    pTask->pNameMap = tSimpleHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
  }

  void* pVal = tSimpleHashGet(pTask->pNameMap, &groupId, sizeof(int64_t));
  if (pVal) {
    // cache hit: reuse the hash and, if the block has no name yet, the cached table name
    SBlockName* pBln = (SBlockName*)pVal;
    hashValue = pBln->hashValue;
    if (!pDataBlock->info.parTbName[0]) {
      memset(pDataBlock->info.parTbName, 0, TSDB_TABLE_NAME_LEN);
      memcpy(pDataBlock->info.parTbName, pBln->parTbName, strlen(pBln->parTbName));
    }
  } else {
    char* ctbName = taosMemoryCalloc(1, TSDB_TABLE_FNAME_LEN);
    if (ctbName == NULL) {
      return -1;
    }

    if (!pDataBlock->info.parTbName[0]) {
      // no table name on the block: derive it from the super-table name and the group id
      buildCtbNameByGroupIdImpl(pTask->shuffleDispatcher.stbFullName, groupId, pDataBlock->info.parTbName);
    }

    // Bound the write by the size of the buffer actually allocated. The previous bound
    // (TSDB_TABLE_NAME_LEN) truncated long "<db>.<table>" names before hashing.
    snprintf(ctbName, TSDB_TABLE_FNAME_LEN, "%s.%s", pTask->shuffleDispatcher.dbInfo.db, pDataBlock->info.parTbName);

    /*uint32_t hashValue = MurmurHash3_32(ctbName, strlen(ctbName));*/
    SUseDbRsp* pDbInfo = &pTask->shuffleDispatcher.dbInfo;
    hashValue =
        taosGetTbHashVal(ctbName, strlen(ctbName), pDbInfo->hashMethod, pDbInfo->hashPrefix, pDbInfo->hashSuffix);
    taosMemoryFree(ctbName);

    SBlockName bln = {0};
    bln.hashValue = hashValue;
    memcpy(bln.parTbName, pDataBlock->info.parTbName, strlen(pDataBlock->info.parTbName));
    // the cache is capped; beyond the cap the hash is simply recomputed on every call
    if (tSimpleHashGetSize(pTask->pNameMap) < MAX_BLOCK_NAME_NUM) {
      tSimpleHashPut(pTask->pNameMap, &groupId, sizeof(int64_t), &bln, sizeof(SBlockName));
    }
  }

  bool found = false;
  // TODO: optimize search
  int32_t j;
  for (j = 0; j < vgSz; j++) {
    SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, j);
    ASSERT(pVgInfo->vgId > 0);

    if (hashValue >= pVgInfo->hashBegin && hashValue <= pVgInfo->hashEnd) {
      if (streamAddBlockIntoDispatchMsg(pDataBlock, &pReqs[j]) < 0) {
        return -1;
      }

      // first block for this vgroup: one more dispatch response to wait for
      if (pReqs[j].blockNum == 0) {
        atomic_add_fetch_32(&pTask->shuffleDispatcher.waitingRspCnt, 1);
      }

      pReqs[j].blockNum++;
      found = true;
      break;
    }
  }
  ASSERT(found);
  return 0;
}

H
Haojun Liao 已提交
445
// Dispatch every block contained in pData to the downstream task(s) of pTask.
//
// - TASK_OUTPUT__FIXED_DISPATCH: all blocks go into a single request sent to the fixed
//   downstream endpoint.
// - TASK_OUTPUT__SHUFFLE_DISPATCH: one request per vgroup is prepared; delete-result,
//   checkpoint and trans-state blocks are copied to every vgroup, all other blocks are
//   routed by table-name hash. Only requests that received at least one block are sent.
//
// Returns TSDB_CODE_SUCCESS (0) on success and a non-zero code on failure. All request
// buffers built here are released before returning, on success and on failure.
static int32_t doDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pData) {
  int32_t code = 0;
  int32_t numOfBlocks = taosArrayGetSize(pData->blocks);
  ASSERT(numOfBlocks != 0);

  if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
    SStreamDispatchReq req = {0};

    int32_t downstreamTaskId = pTask->fixedEpDispatcher.taskId;
    code = tInitStreamDispatchReq(&req, pTask, pData->srcVgId, numOfBlocks, downstreamTaskId, pData->type);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    for (int32_t i = 0; i < numOfBlocks; i++) {
      SSDataBlock* pDataBlock = taosArrayGet(pData->blocks, i);

      code = streamAddBlockIntoDispatchMsg(pDataBlock, &req);
      if (code != TSDB_CODE_SUCCESS) {
        taosArrayDestroyP(req.data, taosMemoryFree);
        taosArrayDestroy(req.dataLen);
        return code;
      }
    }

    int32_t vgId = pTask->fixedEpDispatcher.nodeId;
    SEpSet* pEpSet = &pTask->fixedEpDispatcher.epSet;

    qDebug("s-task:%s (child taskId:%d) fix-dispatch %d block(s) to s-task:0x%x (vgId:%d)", pTask->id.idStr,
           pTask->info.selfChildId, numOfBlocks, downstreamTaskId, vgId);

    code = doSendDispatchMsg(pTask, &req, vgId, pEpSet);
    taosArrayDestroyP(req.data, taosMemoryFree);
    taosArrayDestroy(req.dataLen);
    return code;
  } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
    // a new shuffle dispatch may only start once every previous response has arrived
    int32_t rspCnt = atomic_load_32(&pTask->shuffleDispatcher.waitingRspCnt);
    ASSERT(rspCnt == 0);

    SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
    int32_t vgSz = taosArrayGetSize(vgInfo);

    // zero-initialized, so requests that were never set up have NULL arrays at cleanup time
    SStreamDispatchReq* pReqs = taosMemoryCalloc(vgSz, sizeof(SStreamDispatchReq));
    if (pReqs == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
    }

    for (int32_t i = 0; i < vgSz; i++) {
      SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
      code = tInitStreamDispatchReq(&pReqs[i], pTask, pData->srcVgId, 0, pVgInfo->taskId, pData->type);
      if (code != TSDB_CODE_SUCCESS) {
        // The failed initializer has already released its arrays; clear the pointers so
        // the cleanup loop below does not free them a second time.
        pReqs[i].data = NULL;
        pReqs[i].dataLen = NULL;
        goto FAIL_SHUFFLE_DISPATCH;
      }
    }

    for (int32_t i = 0; i < numOfBlocks; i++) {
      SSDataBlock* pDataBlock = taosArrayGet(pData->blocks, i);

      // TODO: do not use broadcast
      if (pDataBlock->info.type == STREAM_DELETE_RESULT || pDataBlock->info.type == STREAM_CHECKPOINT ||
          pDataBlock->info.type == STREAM_TRANS_STATE) {
        for (int32_t j = 0; j < vgSz; j++) {
          if (streamAddBlockIntoDispatchMsg(pDataBlock, &pReqs[j]) < 0) {
            // code still holds the success value of the init loop; report the failure
            code = -1;
            goto FAIL_SHUFFLE_DISPATCH;
          }

          // first block for this vgroup: one more dispatch response to wait for
          if (pReqs[j].blockNum == 0) {
            atomic_add_fetch_32(&pTask->shuffleDispatcher.waitingRspCnt, 1);
          }
          pReqs[j].blockNum++;
        }

        continue;
      }

      if (streamSearchAndAddBlock(pTask, pReqs, pDataBlock, vgSz, pDataBlock->info.id.groupId) < 0) {
        // same as above: do not fall into cleanup with a success code
        code = -1;
        goto FAIL_SHUFFLE_DISPATCH;
      }
    }

    qDebug("s-task:%s (child taskId:%d) shuffle-dispatch blocks:%d to %d vgroups", pTask->id.idStr,
           pTask->info.selfChildId, numOfBlocks, vgSz);

    for (int32_t i = 0; i < vgSz; i++) {
      if (pReqs[i].blockNum > 0) {
        SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
        qDebug("s-task:%s (child taskId:%d) shuffle-dispatch blocks:%d to vgId:%d", pTask->id.idStr,
               pTask->info.selfChildId, pReqs[i].blockNum, pVgInfo->vgId);

        code = doSendDispatchMsg(pTask, &pReqs[i], pVgInfo->vgId, &pVgInfo->epSet);
        if (code < 0) {
          goto FAIL_SHUFFLE_DISPATCH;
        }
      }
    }

    code = 0;

  FAIL_SHUFFLE_DISPATCH:
    // shared cleanup for both the success and the failure path
    for (int32_t i = 0; i < vgSz; i++) {
      taosArrayDestroyP(pReqs[i].data, taosMemoryFree);
      taosArrayDestroy(pReqs[i].dataLen);
    }

    taosMemoryFree(pReqs);
  }

  return code;
}

555 556
static void doRetryDispatchData(void* param, void* tmrId) {
  SStreamTask* pTask = param;
557
  ASSERT(pTask->outputInfo.status == TASK_OUTPUT_STATUS__WAIT);
558

559
  int32_t code = doDispatchAllBlocks(pTask, pTask->msgInfo.pData);
560
  if (code != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
561 562 563
    qDebug("s-task:%s reset the waitRspCnt to be 0 before launch retry dispatch", pTask->id.idStr);
    atomic_store_32(&pTask->shuffleDispatcher.waitingRspCnt, 0);
    streamRetryDispatchStreamBlock(pTask, DISPATCH_RETRY_INTERVAL_MS);
564 565 566 567 568
  }
}

// Arm (or re-arm) the task's launch timer so that doRetryDispatchData runs for pTask
// after waitDuration milliseconds.
void streamRetryDispatchStreamBlock(SStreamTask* pTask, int64_t waitDuration) {
  const char* idStr = pTask->id.idStr;
  qError("s-task:%s dispatch data in %"PRId64"ms", idStr, waitDuration);

  taosTmrReset(doRetryDispatchData, waitDuration, pTask, streamEnv.timer, &pTask->launchTaskTimer);
}

572
// Take the next item from the task's output queue and dispatch it downstream.
//
// Only one dispatch may be in flight per task: the output status is switched from NORMAL
// to WAIT with a compare-and-swap, and the call is a no-op when the swap fails. The block
// being dispatched is kept in pTask->msgInfo.pData until the downstream response arrives.
// A failed dispatch is retried immediately up to MAX_CONTINUE_RETRY_COUNT times and is
// then handed to the retry timer.
//
// Returns TSDB_CODE_SUCCESS (0), or the dispatch error code when the application is stopping.
int32_t streamDispatchStreamBlock(SStreamTask* pTask) {
  STaskOutputInfo* pInfo = &pTask->outputInfo;
  const char*      idStr = pTask->id.idStr;
  ASSERT((pInfo->type == TASK_OUTPUT__FIXED_DISPATCH || pInfo->type == TASK_OUTPUT__SHUFFLE_DISPATCH));

  int32_t numOfElems = taosQueueItemSize(pInfo->queue->queue);
  if (numOfElems > 0) {
    qDebug("s-task:%s try to dispatch intermediate result block to downstream, elem in outputQ:%d", idStr,
           numOfElems);
  }

  // to make sure only one dispatch is running
  int8_t prevStatus =
      atomic_val_compare_exchange_8(&pInfo->status, TASK_OUTPUT_STATUS__NORMAL, TASK_OUTPUT_STATUS__WAIT);
  if (prevStatus != TASK_OUTPUT_STATUS__NORMAL) {
    qDebug("s-task:%s wait for dispatch rsp, not dispatch now, output status:%d", idStr, prevStatus);
    return 0;
  }

  ASSERT(pTask->msgInfo.pData == NULL);
  qDebug("s-task:%s start to dispatch msg, set output status:%d", idStr, pInfo->status);

  SStreamDataBlock* pBlock = streamQueueNextItem(pInfo->queue);
  if (pBlock == NULL) {
    // nothing to send: give the dispatch slot back
    atomic_store_8(&pInfo->status, TASK_OUTPUT_STATUS__NORMAL);
    qDebug("s-task:%s not dispatch since no elems in outputQ, output status:%d", idStr, pInfo->status);
    return 0;
  }

  pTask->msgInfo.pData = pBlock;
  ASSERT(pBlock->type == STREAM_INPUT__DATA_BLOCK || pBlock->type == STREAM_INPUT__CHECKPOINT_TRIGGER ||
         pBlock->type == STREAM_INPUT__TRANS_STATE);

  for (int32_t retryCount = 0;;) {
    int32_t code = doDispatchAllBlocks(pTask, pBlock);
    if (code == TSDB_CODE_SUCCESS) {
      break;
    }

    qDebug("s-task:%s failed to dispatch msg to downstream, code:%s, output status:%d, retry cnt:%d", idStr,
           tstrerror(terrno), pInfo->status, retryCount);

    // todo deal with only partially success dispatch case
    atomic_store_32(&pTask->shuffleDispatcher.waitingRspCnt, 0);

    // in case of this error, do not retry anymore
    if (terrno == TSDB_CODE_APP_IS_STOPPING) {
      destroyStreamDataBlock(pTask->msgInfo.pData);
      pTask->msgInfo.pData = NULL;
      return code;
    }

    retryCount += 1;
    if (retryCount > MAX_CONTINUE_RETRY_COUNT) {
      // too many immediate failures: let the timer retry later
      qDebug("s-task:%s failed to dispatch msg to downstream for %d times, code:%s, add timer to retry in %dms",
             idStr, retryCount, tstrerror(terrno), DISPATCH_RETRY_INTERVAL_MS);
      streamRetryDispatchStreamBlock(pTask, DISPATCH_RETRY_INTERVAL_MS);
      break;
    }
  }

  // this block can not be deleted until it has been sent to downstream task successfully.
  return TSDB_CODE_SUCCESS;
}
633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692

// Serialize a complete-history message (stream id followed by the downstream and upstream
// task/node ids). Returns the encoder position after encoding, or -1 if any step fails.
int32_t tEncodeCompleteHistoryDataMsg(SEncoder* pEncoder, const SStreamCompleteHistoryMsg* pReq) {
  if (tStartEncode(pEncoder) < 0) {
    return -1;
  }

  // Short-circuit evaluation keeps the field order and stops at the first failure.
  if (tEncodeI64(pEncoder, pReq->streamId) < 0 ||
      tEncodeI32(pEncoder, pReq->downstreamId) < 0 ||
      tEncodeI32(pEncoder, pReq->downstreamNode) < 0 ||
      tEncodeI32(pEncoder, pReq->upstreamTaskId) < 0 ||
      tEncodeI32(pEncoder, pReq->upstreamNodeId) < 0) {
    return -1;
  }

  tEndEncode(pEncoder);
  return pEncoder->pos;
}

// Deserialize a complete-history message; the field order mirrors
// tEncodeCompleteHistoryDataMsg. Returns 0 on success, -1 if any decode step fails.
int32_t tDecodeCompleteHistoryDataMsg(SDecoder* pDecoder, SStreamCompleteHistoryMsg* pRsp) {
  if (tStartDecode(pDecoder) < 0) {
    return -1;
  }

  // Short-circuit evaluation keeps the field order and stops at the first failure.
  if (tDecodeI64(pDecoder, &pRsp->streamId) < 0 ||
      tDecodeI32(pDecoder, &pRsp->downstreamId) < 0 ||
      tDecodeI32(pDecoder, &pRsp->downstreamNode) < 0 ||
      tDecodeI32(pDecoder, &pRsp->upstreamTaskId) < 0 ||
      tDecodeI32(pDecoder, &pRsp->upstreamNodeId) < 0) {
    return -1;
  }

  tEndDecode(pDecoder);
  return 0;
}

// Build the response for a scan-history-finish request and queue it in pTask->pRspMsgList;
// the queued responses are sent later by streamNotifyUpstreamContinue.
//
// Returns TSDB_CODE_SUCCESS when the response was queued, -1 when the upstream task named
// in pReq is not known to pTask, a negative code when sizing the message fails, or
// TSDB_CODE_OUT_OF_MEMORY when the message buffer cannot be allocated.
int32_t streamAddEndScanHistoryMsg(SStreamTask* pTask, SRpcHandleInfo* pRpcInfo, SStreamScanHistoryFinishReq* pReq) {
  int32_t  len = 0;
  int32_t  code = 0;
  SEncoder encoder;

  // Resolve the upstream endpoint before allocating anything, so an unknown upstream id
  // is rejected without a NULL dereference and without leaking the message buffer.
  // NOTE(review): assumes the lookup returns NULL when the task id is not found — confirm.
  SStreamChildEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, pReq->upstreamTaskId);
  if (pInfo == NULL) {
    qError("s-task:%s upstream task:0x%x not found, scan history finish rsp msg not added", pTask->id.idStr,
           pReq->upstreamTaskId);
    return -1;
  }

  SStreamCompleteHistoryMsg msg = {
      .streamId = pReq->streamId,
      .upstreamTaskId = pReq->upstreamTaskId,
      .upstreamNodeId = pReq->upstreamNodeId,
      .downstreamId = pReq->downstreamTaskId,
      .downstreamNode = pTask->pMeta->vgId,
  };

  tEncodeSize(tEncodeCompleteHistoryDataMsg, &msg, len, code);
  if (code < 0) {
    return code;
  }

  void* pBuf = rpcMallocCont(sizeof(SMsgHead) + len);
  if (pBuf == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  ((SMsgHead*)pBuf)->vgId = htonl(pReq->upstreamNodeId);

  void* abuf = POINTER_SHIFT(pBuf, sizeof(SMsgHead));

  tEncoderInit(&encoder, (uint8_t*)abuf, len);
  tEncodeCompleteHistoryDataMsg(&encoder, &msg);
  tEncoderClear(&encoder);

  SStreamContinueExecInfo info = {.taskId = pReq->upstreamTaskId, .epset = pInfo->epSet};
  initRpcMsg(&info.msg, 0, pBuf, sizeof(SMsgHead) + len);
  info.msg.info = *pRpcInfo;

  taosThreadMutexLock(&pTask->lock);
  if (pTask->pRspMsgList == NULL) {
    pTask->pRspMsgList = taosArrayInit(4, sizeof(SStreamContinueExecInfo));
  }
  taosArrayPush(pTask->pRspMsgList, &info);
  // read the size while still holding the lock that guards the list
  int32_t num = taosArrayGetSize(pTask->pRspMsgList);
  taosThreadMutexUnlock(&pTask->lock);

  qDebug("s-task:%s add scan history finish rsp msg for task:0x%x, total:%d", pTask->id.idStr, pReq->upstreamTaskId,
         num);
  return TSDB_CODE_SUCCESS;
}

// Send every response queued in pTask->pRspMsgList to its upstream task and then empty
// the list. Only valid for AGG- and SINK-level tasks. Always returns 0.
int32_t streamNotifyUpstreamContinue(SStreamTask* pTask) {
  ASSERT(pTask->info.taskLevel == TASK_LEVEL__AGG || pTask->info.taskLevel == TASK_LEVEL__SINK);

  int32_t total = taosArrayGetSize(pTask->pRspMsgList);
  int32_t idx = 0;
  while (idx < total) {
    SStreamContinueExecInfo* pPending = taosArrayGet(pTask->pRspMsgList, idx);
    tmsgSendRsp(&pPending->msg);

    qDebug("s-task:%s level:%d notify upstream:0x%x to continue process data from WAL", pTask->id.idStr, pTask->info.taskLevel,
           pPending->taskId);
    ++idx;
  }

  taosArrayClear(pTask->pRspMsgList);
  qDebug("s-task:%s level:%d checkpoint ready msg sent to all %d upstreams", pTask->id.idStr, pTask->info.taskLevel,
         total);
  return 0;
}
H
Haojun Liao 已提交
723 724

// Handle the response to a dispatch message.
//
// - code != TSDB_CODE_SUCCESS: the message did not reach the downstream task; the pending
//   block is dispatched again immediately and TSDB_CODE_SUCCESS is returned.
// - shuffle dispatch: nothing more is done until the last outstanding response arrives.
// - trans-state block delivered: the fill-history state is transferred to the stream task.
// - downstream input queue blocked: this task's input is blocked too and the same block
//   is re-dispatched by the retry timer.
// - otherwise: the delivered block is destroyed, the output status returns to NORMAL and
//   the next item in the output queue is dispatched.
//
// Returns 0 on success, or the error code of the state transfer.
int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, int32_t code) {
  const char* id = pTask->id.idStr;

  if (code != TSDB_CODE_SUCCESS) {
    // dispatch message failed: network error, or node not available.
    // in case of the input queue is full, the code will be TSDB_CODE_SUCCESS, the and pRsp>inputStatus will be set
    // flag. here we need to retry dispatch this message to downstream task immediately. handle the case the failure
    // happened too fast.
    // todo handle the shuffle dispatch failure

    // Count the retry outside the logging call: a side effect placed in a log argument
    // may be skipped when the logging macro does not evaluate its arguments.
    int32_t retryCount = ++pTask->msgInfo.retryCount;
    qError("s-task:%s failed to dispatch msg to task:0x%x, code:%s, retry cnt:%d", id, pRsp->downstreamTaskId,
           tstrerror(code), retryCount);

    // the result of the immediate re-dispatch is deliberately not acted upon here
    (void)doDispatchAllBlocks(pTask, pTask->msgInfo.pData);

    return TSDB_CODE_SUCCESS;
  }

  qDebug("s-task:%s recv dispatch rsp from 0x%x, downstream task input status:%d code:%d", id, pRsp->downstreamTaskId,
         pRsp->inputStatus, code);

  // there are other dispatch message not response yet
  if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
    int32_t leftRsp = atomic_sub_fetch_32(&pTask->shuffleDispatcher.waitingRspCnt, 1);
    qDebug("s-task:%s is shuffle, left waiting rsp %d", id, leftRsp);
    if (leftRsp > 0) {
      return 0;
    }
  }

  // transtate msg has been sent to downstream successfully. let's transfer the fill-history task state
  SStreamDataBlock* p = pTask->msgInfo.pData;
  if (p->type == STREAM_INPUT__TRANS_STATE) {
    qDebug("s-task:%s dispatch transtate msg to downstream successfully, start to transfer state", id);
    ASSERT(pTask->info.fillHistory == 1);
    code = streamTransferStateToStreamTask(pTask);

    if (code != TSDB_CODE_SUCCESS) {
      atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
      return code;
    }
  }

  pTask->msgInfo.retryCount = 0;
  ASSERT(pTask->outputInfo.status == TASK_OUTPUT_STATUS__WAIT);

  qDebug("s-task:%s output status is set to:%d", id, pTask->outputInfo.status);

  // the input queue of the (down stream) task that receive the output data is full,
  // so the TASK_INPUT_STATUS_BLOCKED is rsp
  if (pRsp->inputStatus == TASK_INPUT_STATUS__BLOCKED) {
    pTask->inputStatus = TASK_INPUT_STATUS__BLOCKED;   // block the input of current task, to push pressure to upstream
    pTask->msgInfo.blockingTs = taosGetTimestampMs();  // record the blocking start time
    qError("s-task:%s inputQ of downstream task:0x%x is full, time:%" PRId64 "wait for %dms and retry dispatch data",
           id, pRsp->downstreamTaskId, pTask->msgInfo.blockingTs, DISPATCH_RETRY_INTERVAL_MS);
    streamRetryDispatchStreamBlock(pTask, DISPATCH_RETRY_INTERVAL_MS);
  } else {  // pipeline send data in output queue
    // this message has been sent successfully, let's try next one.
    destroyStreamDataBlock(pTask->msgInfo.pData);
    pTask->msgInfo.pData = NULL;

    if (pTask->msgInfo.blockingTs != 0) {
      int64_t el = taosGetTimestampMs() - pTask->msgInfo.blockingTs;
      qDebug("s-task:%s downstream task:0x%x resume to normal from inputQ blocking, blocking time:%" PRId64 "ms", id,
             pRsp->downstreamTaskId, el);
      pTask->msgInfo.blockingTs = 0;

      // put data into inputQ of current task is also allowed
      pTask->inputStatus = TASK_INPUT_STATUS__NORMAL;
    }

    // now ready for next data output
    atomic_store_8(&pTask->outputInfo.status, TASK_OUTPUT_STATUS__NORMAL);

    // otherwise, continue dispatch the first block to down stream task in pipeline
    streamDispatchStreamBlock(pTask);
  }

  return 0;
}