streamDispatch.c 24.6 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

16
#include "streamInt.h"
17
#include "ttimer.h"
L
Liu Jicong 已提交
18

H
Haojun Liao 已提交
19 20 21
// Cap on the number of group-id entries cached in a task's pNameMap; once reached, new names are not cached.
#define MAX_BLOCK_NAME_NUM         1024
// Delay, in milliseconds, passed to the retry timer when a dispatch has to be re-attempted later.
#define DISPATCH_RETRY_INTERVAL_MS 300
// Once the count of consecutive failed dispatch attempts exceeds this value, retrying is handed to the timer.
#define MAX_CONTINUE_RETRY_COUNT   5
L
liuyao 已提交
22 23 24 25 26 27

// Entry stored in a task's pNameMap (keyed by group id): caches the table-name hash
// and the partition table name so they do not have to be recomputed per block.
typedef struct SBlockName {
  // hash computed by taosGetTbHashVal() over the "<db>.<parTbName>" string
  uint32_t hashValue;
  // partition table name copied from the data block
  char     parTbName[TSDB_TABLE_NAME_LEN];
} SBlockName;

28 29 30 31 32 33
// Populate the message type, payload pointer and payload length of an RPC message.
// No other field of pMsg is modified.
static void initRpcMsg(SRpcMsg* pMsg, int32_t msgType, void* pCont, int32_t contLen) {
  pMsg->pCont = pCont;
  pMsg->contLen = contLen;
  pMsg->msgType = msgType;
}

34
// Serialize a dispatch request: fixed header fields first, then one (length, binary) pair per block.
// Returns the encoder position on success and -1 as soon as any field fails to encode.
static int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatchReq* pReq) {
  if (tStartEncode(pEncoder) < 0) {
    return -1;
  }

  // header fields; the || chain stops at the first failing call, preserving the wire order
  if (tEncodeI64(pEncoder, pReq->streamId) < 0 || tEncodeI32(pEncoder, pReq->taskId) < 0 ||
      tEncodeI32(pEncoder, pReq->upstreamTaskId) < 0 || tEncodeI32(pEncoder, pReq->dataSrcVgId) < 0 ||
      tEncodeI32(pEncoder, pReq->upstreamChildId) < 0 || tEncodeI32(pEncoder, pReq->upstreamNodeId) < 0 ||
      tEncodeI32(pEncoder, pReq->blockNum) < 0 || tEncodeI64(pEncoder, pReq->totalLen) < 0) {
    return -1;
  }

  // both parallel arrays must hold exactly blockNum entries
  ASSERT(taosArrayGetSize(pReq->data) == pReq->blockNum);
  ASSERT(taosArrayGetSize(pReq->dataLen) == pReq->blockNum);

  int32_t idx = 0;
  while (idx < pReq->blockNum) {
    int32_t blockLen = *(int32_t*)taosArrayGet(pReq->dataLen, idx);
    void*   pBlockBuf = taosArrayGetP(pReq->data, idx);

    if (tEncodeI32(pEncoder, blockLen) < 0) {
      return -1;
    }
    if (tEncodeBinary(pEncoder, pBlockBuf, blockLen) < 0) {
      return -1;
    }
    idx++;
  }

  tEndEncode(pEncoder);
  return pEncoder->pos;
}

56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86
// Encode one data block (SRetrieveTableRsp header + encoded columns) into a freshly
// allocated buffer and append it, with its actual length, to the dispatch request.
//
// pBlock: block to encode (read only).
// pReq:   request whose data/dataLen arrays receive the new entry; totalLen grows by the
//         allocated buffer size. blockNum is NOT updated here.
// Returns 0 on success, -1 on allocation failure. On failure the request may hold a
// payload without a matching length entry, so the caller must destroy the request
// rather than send it.
static int32_t streamAddBlockIntoDispatchMsg(const SSDataBlock* pBlock, SStreamDispatchReq* pReq) {
  int32_t dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock);
  void*   buf = taosMemoryCalloc(1, dataStrLen);
  if (buf == NULL) return -1;

  SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)buf;
  pRetrieve->useconds = 0;
  pRetrieve->precision = TSDB_DEFAULT_PRECISION;
  pRetrieve->compressed = 0;
  pRetrieve->completed = 1;
  pRetrieve->streamBlockType = pBlock->info.type;
  // multi-byte header fields are stored in network (big-endian) byte order
  pRetrieve->numOfRows = htobe64((int64_t)pBlock->info.rows);
  pRetrieve->skey = htobe64(pBlock->info.window.skey);
  pRetrieve->ekey = htobe64(pBlock->info.window.ekey);
  pRetrieve->version = htobe64(pBlock->info.version);
  pRetrieve->watermark = htobe64(pBlock->info.watermark);
  memcpy(pRetrieve->parTbName, pBlock->info.parTbName, TSDB_TABLE_NAME_LEN);

  int32_t numOfCols = (int32_t)taosArrayGetSize(pBlock->pDataBlock);
  pRetrieve->numOfCols = htonl(numOfCols);

  int32_t actualLen = blockEncode(pBlock, pRetrieve->data, numOfCols);
  actualLen += sizeof(SRetrieveTableRsp);
  ASSERT(actualLen <= dataStrLen);

  // Hand the buffer to the data array first: once the push succeeds the array owns it
  // and it is released when the request is destroyed. If the push fails we still own it.
  if (taosArrayPush(pReq->data, &buf) == NULL) {
    taosMemoryFree(buf);
    return -1;
  }

  if (taosArrayPush(pReq->dataLen, &actualLen) == NULL) {
    // buf is already owned by pReq->data; do not free it here
    return -1;
  }

  pReq->totalLen += dataStrLen;
  return 0;
}

L
Liu Jicong 已提交
87 88 89 90
// Decode a dispatch request previously produced by the matching encoder.
//
// On success pReq->data holds blockNum heap-allocated payloads and pReq->dataLen their
// lengths; release them with tDeleteStreamDispatchReq().
// Returns 0 on success, -1 on failure. If the failure happens after the arrays were
// created, everything decoded so far is released and both array pointers are reset to
// NULL, so a subsequent tDeleteStreamDispatchReq() by the caller remains safe.
int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq) {
  if (tStartDecode(pDecoder) < 0) return -1;
  if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1;
  if (tDecodeI32(pDecoder, &pReq->taskId) < 0) return -1;
  if (tDecodeI32(pDecoder, &pReq->upstreamTaskId) < 0) return -1;
  if (tDecodeI32(pDecoder, &pReq->dataSrcVgId) < 0) return -1;
  if (tDecodeI32(pDecoder, &pReq->upstreamChildId) < 0) return -1;
  if (tDecodeI32(pDecoder, &pReq->upstreamNodeId) < 0) return -1;
  if (tDecodeI32(pDecoder, &pReq->blockNum) < 0) return -1;
  if (tDecodeI64(pDecoder, &pReq->totalLen) < 0) return -1;

  ASSERT(pReq->blockNum > 0);
  pReq->data = taosArrayInit(pReq->blockNum, sizeof(void*));
  pReq->dataLen = taosArrayInit(pReq->blockNum, sizeof(int32_t));
  if (pReq->data == NULL || pReq->dataLen == NULL) {
    goto _err;
  }

  for (int32_t i = 0; i < pReq->blockNum; i++) {
    int32_t  len1 = 0;
    uint64_t len2 = 0;
    void*    data = NULL;

    if (tDecodeI32(pDecoder, &len1) < 0) goto _err;
    if (tDecodeBinaryAlloc(pDecoder, &data, &len2) < 0) goto _err;
    // the explicit length and the length of the binary field must agree
    ASSERT(len1 == len2);

    // store the payload first so that the array owns it from here on
    if (taosArrayPush(pReq->data, &data) == NULL) {
      taosMemoryFree(data);
      goto _err;
    }
    if (taosArrayPush(pReq->dataLen, &len1) == NULL) {
      goto _err;
    }
  }

  tEndDecode(pDecoder);
  return 0;

_err:
  // both destroy helpers are called with possibly-NULL arrays elsewhere in this file
  taosArrayDestroyP(pReq->data, taosMemoryFree);
  taosArrayDestroy(pReq->dataLen);
  pReq->data = NULL;
  pReq->dataLen = NULL;
  return -1;
}

115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135
// Initialize a dispatch request addressed to downstream task dstTaskId.
//
// Copies the stream/task identity from pTask, records the source vgroup and the block
// count, and allocates the (initially empty) data and dataLen arrays with capacity for
// numOfBlocks entries. totalLen is not touched; callers zero-initialize the request.
//
// Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when either array cannot be
// allocated. On failure both array pointers are left NULL, so a later cleanup pass that
// destroys the request again (as the shuffle-dispatch path does) cannot double free.
int32_t tInitStreamDispatchReq(SStreamDispatchReq* pReq, const SStreamTask* pTask, int32_t vgId, int32_t numOfBlocks,
                               int64_t dstTaskId) {
  pReq->streamId = pTask->id.streamId;
  pReq->dataSrcVgId = vgId;
  pReq->upstreamTaskId = pTask->id.taskId;
  pReq->upstreamChildId = pTask->info.selfChildId;
  pReq->upstreamNodeId = pTask->info.nodeId;
  pReq->blockNum = numOfBlocks;
  pReq->taskId = dstTaskId;

  pReq->data = taosArrayInit(numOfBlocks, POINTER_BYTES);
  pReq->dataLen = taosArrayInit(numOfBlocks, sizeof(int32_t));
  if (pReq->data == NULL || pReq->dataLen == NULL) {
    taosArrayDestroyP(pReq->data, taosMemoryFree);
    taosArrayDestroy(pReq->dataLen);
    // do not leave dangling pointers behind: callers may run their own cleanup
    pReq->data = NULL;
    pReq->dataLen = NULL;
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  return TSDB_CODE_SUCCESS;
}

L
Liu Jicong 已提交
136
// Release the payload buffers and the two arrays owned by a dispatch request.
// The request struct itself is not freed. The array pointers are reset to NULL so that
// calling this function twice on the same request is harmless; a NULL pReq is ignored.
void tDeleteStreamDispatchReq(SStreamDispatchReq* pReq) {
  if (pReq == NULL) {
    return;
  }

  taosArrayDestroyP(pReq->data, taosMemoryFree);
  taosArrayDestroy(pReq->dataLen);
  pReq->data = NULL;
  pReq->dataLen = NULL;
}

L
Liu Jicong 已提交
141 142 143
// Serialize a retrieve request: ids first, then the retrieve payload as one binary field.
// Returns the encoder position on success, -1 at the first field that fails to encode.
int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const SStreamRetrieveReq* pReq) {
  if (tStartEncode(pEncoder) < 0) {
    return -1;
  }

  // the || chain short-circuits, so fields are written in this exact order
  if (tEncodeI64(pEncoder, pReq->streamId) < 0 || tEncodeI64(pEncoder, pReq->reqId) < 0 ||
      tEncodeI32(pEncoder, pReq->dstNodeId) < 0 || tEncodeI32(pEncoder, pReq->dstTaskId) < 0 ||
      tEncodeI32(pEncoder, pReq->srcNodeId) < 0 || tEncodeI32(pEncoder, pReq->srcTaskId) < 0) {
    return -1;
  }

  const uint8_t* pPayload = (const uint8_t*)pReq->pRetrieve;
  if (tEncodeBinary(pEncoder, pPayload, pReq->retrieveLen) < 0) {
    return -1;
  }

  tEndEncode(pEncoder);
  return pEncoder->pos;
}

// Deserialize a retrieve request. The payload is allocated by the decoder and stored in
// pReq->pRetrieve; its length is narrowed to int32_t and stored in pReq->retrieveLen.
// Returns 0 on success, -1 at the first field that fails to decode.
int32_t tDecodeStreamRetrieveReq(SDecoder* pDecoder, SStreamRetrieveReq* pReq) {
  if (tStartDecode(pDecoder) < 0) {
    return -1;
  }

  // the || chain short-circuits, so fields are read in this exact order
  if (tDecodeI64(pDecoder, &pReq->streamId) < 0 || tDecodeI64(pDecoder, &pReq->reqId) < 0 ||
      tDecodeI32(pDecoder, &pReq->dstNodeId) < 0 || tDecodeI32(pDecoder, &pReq->dstTaskId) < 0 ||
      tDecodeI32(pDecoder, &pReq->srcNodeId) < 0 || tDecodeI32(pDecoder, &pReq->srcTaskId) < 0) {
    return -1;
  }

  uint64_t payloadLen = 0;
  if (tDecodeBinaryAlloc(pDecoder, (void**)&pReq->pRetrieve, &payloadLen) < 0) {
    return -1;
  }
  pReq->retrieveLen = (int32_t)payloadLen;

  tEndDecode(pDecoder);
  return 0;
}

L
Liu Jicong 已提交
169 170
// Free the retrieve payload held by the request; the request struct itself is not freed.
void tDeleteStreamRetrieveReq(SStreamRetrieveReq* pReq) {
  void* pPayload = pReq->pRetrieve;
  taosMemoryFree(pPayload);
}

L
Liu Jicong 已提交
171
// Encode pBlock once as a retrieve payload and send it, wrapped in a
// TDMT_STREAM_RETRIEVE request with a fresh reqId, to every entry of
// pTask->pUpstreamEpInfoList.
//
// Returns 0 when every request was handed to tmsgSendReq successfully, a negative value
// otherwise. The temporary payload is released on every exit path.
int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock) {
  int32_t code = -1;
  void*   buf = NULL;
  int32_t dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock);

  SRetrieveTableRsp* pRetrieve = taosMemoryCalloc(1, dataStrLen);
  if (pRetrieve == NULL) return -1;

  int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
  pRetrieve->useconds = 0;
  pRetrieve->precision = TSDB_DEFAULT_PRECISION;
  pRetrieve->compressed = 0;
  pRetrieve->completed = 1;
  pRetrieve->streamBlockType = pBlock->info.type;
  // multi-byte header fields are stored in network (big-endian) byte order
  pRetrieve->numOfRows = htobe64((int64_t)pBlock->info.rows);
  pRetrieve->numOfCols = htonl(numOfCols);
  pRetrieve->skey = htobe64(pBlock->info.window.skey);
  pRetrieve->ekey = htobe64(pBlock->info.window.ekey);
  pRetrieve->version = htobe64(pBlock->info.version);

  blockEncode(pBlock, pRetrieve->data, numOfCols);

  SStreamRetrieveReq req = {
      .streamId = pTask->id.streamId,
      .srcNodeId = pTask->info.nodeId,
      .srcTaskId = pTask->id.taskId,
      .pRetrieve = pRetrieve,
      .retrieveLen = dataStrLen,
  };

  int32_t sz = taosArrayGetSize(pTask->pUpstreamEpInfoList);
  ASSERT(sz > 0);
  for (int32_t i = 0; i < sz; i++) {
    req.reqId = tGenIdPI64();
    SStreamChildEpInfo* pEpInfo = taosArrayGetP(pTask->pUpstreamEpInfoList, i);
    req.dstNodeId = pEpInfo->nodeId;
    req.dstTaskId = pEpInfo->taskId;

    int32_t len = 0;
    tEncodeSize(tEncodeStreamRetrieveReq, &req, len, code);
    if (code < 0) {
      ASSERT(0);
      code = -1;
      goto CLEAR;  // previously returned directly and leaked pRetrieve
    }

    // tEncodeSize overwrote code with its own (successful) result; re-arm the failure
    // value so that every goto CLEAR below reports an error to the caller
    code = -1;

    buf = rpcMallocCont(sizeof(SMsgHead) + len);
    if (buf == NULL) {
      goto CLEAR;
    }

    ((SMsgHead*)buf)->vgId = htonl(pEpInfo->nodeId);
    void*    abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
    SEncoder encoder;
    tEncoderInit(&encoder, abuf, len);
    int32_t encodeRes = tEncodeStreamRetrieveReq(&encoder, &req);
    tEncoderClear(&encoder);
    if (encodeRes < 0) {
      goto CLEAR;
    }

    SRpcMsg rpcMsg = {.code = 0, .msgType = TDMT_STREAM_RETRIEVE, .pCont = buf, .contLen = sizeof(SMsgHead) + len};
    if (tmsgSendReq(&pEpInfo->epSet, &rpcMsg) < 0) {
      ASSERT(0);
      goto CLEAR;
    }

    // the message buffer was handed over to the rpc layer
    buf = NULL;
    qDebug("s-task:%s (child %d) send retrieve req to task:0x%x (vgId:%d), reqId:0x%" PRIx64, pTask->id.idStr,
           pTask->info.selfChildId, pEpInfo->taskId, pEpInfo->nodeId, req.reqId);
  }

  code = 0;

CLEAR:
  taosMemoryFree(pRetrieve);
  rpcFreeCont(buf);
  return code;
}

246
// Serialize a task-check request and send it as TDMT_STREAM_TASK_CHECK to the vnode
// identified by nodeId through pEpSet.
//
// Returns 0 once the message has been handed to tmsgSendReq (its result is not
// inspected), -1 when sizing or allocation fails, or the negative encoder result when
// serialization fails. terrno is set to TSDB_CODE_OUT_OF_MEMORY on allocation failure.
int32_t streamDispatchCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pReq, int32_t nodeId, SEpSet* pEpSet) {
  void*   buf = NULL;
  int32_t code = -1;
  SRpcMsg msg = {0};

  int32_t tlen;
  tEncodeSize(tEncodeStreamTaskCheckReq, pReq, tlen, code);
  if (code < 0) {
    return -1;
  }

  buf = rpcMallocCont(sizeof(SMsgHead) + tlen);
  if (buf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  ((SMsgHead*)buf)->vgId = htonl(nodeId);
  void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));

  SEncoder encoder;
  tEncoderInit(&encoder, abuf, tlen);
  if ((code = tEncodeStreamTaskCheckReq(&encoder, pReq)) < 0) {
    // release whatever the encoder allocated before bailing out
    tEncoderClear(&encoder);
    rpcFreeCont(buf);
    return code;
  }

  tEncoderClear(&encoder);

  msg.contLen = tlen + sizeof(SMsgHead);
  msg.pCont = buf;
  msg.msgType = TDMT_STREAM_TASK_CHECK;

  qDebug("s-task:%s (level:%d) dispatch check msg to s-task:%" PRIx64 ":0x%x (vgId:%d)", pTask->id.idStr,
         pTask->info.taskLevel, pReq->streamId, pReq->downstreamTaskId, nodeId);

  tmsgSendReq(pEpSet, &msg);
  return 0;
}

285
int32_t streamDoDispatchScanHistoryFinishMsg(SStreamTask* pTask, const SStreamScanHistoryFinishReq* pReq, int32_t vgId,
286
                                             SEpSet* pEpSet) {
287 288 289 290 291
  void*   buf = NULL;
  int32_t code = -1;
  SRpcMsg msg = {0};

  int32_t tlen;
292
  tEncodeSize(tEncodeStreamScanHistoryFinishReq, pReq, tlen, code);
293 294 295 296 297 298
  if (code < 0) {
    return -1;
  }

  buf = rpcMallocCont(sizeof(SMsgHead) + tlen);
  if (buf == NULL) {
H
Haojun Liao 已提交
299
    terrno = TSDB_CODE_OUT_OF_MEMORY;
300 301 302 303 304 305 306 307
    return -1;
  }

  ((SMsgHead*)buf)->vgId = htonl(vgId);
  void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));

  SEncoder encoder;
  tEncoderInit(&encoder, abuf, tlen);
308
  if ((code = tEncodeStreamScanHistoryFinishReq(&encoder, pReq)) < 0) {
H
Haojun Liao 已提交
309 310 311 312
    if (buf) {
      rpcFreeCont(buf);
    }
    return code;
313
  }
H
Haojun Liao 已提交
314

315 316 317 318
  tEncoderClear(&encoder);

  msg.contLen = tlen + sizeof(SMsgHead);
  msg.pCont = buf;
319
  msg.msgType = TDMT_STREAM_SCAN_HISTORY_FINISH;
320 321

  tmsgSendReq(pEpSet, &msg);
322 323

  const char* pStatus = streamGetTaskStatusStr(pTask->status.taskStatus);
324 325
  qDebug("s-task:%s status:%s dispatch scan-history finish msg to taskId:0x%x (vgId:%d)", pTask->id.idStr, pStatus,
         pReq->downstreamTaskId, vgId);
326 327 328
  return 0;
}

329
// Serialize one dispatch request and send it to vnode vgId through pEpSet, using the
// message type recorded in pTask->msgInfo.msgType.
//
// Returns the result of tmsgSendReq on the normal path, or a negative value when
// sizing, allocation or serialization fails. On those failure paths the message buffer
// and any encoder-side allocations are released; terrno is set on allocation failure.
static int32_t doSendDispatchMsg(SStreamTask* pTask, const SStreamDispatchReq* pReq, int32_t vgId, SEpSet* pEpSet) {
  void*   buf = NULL;
  int32_t code = -1;
  SRpcMsg msg = {0};

  // serialize
  int32_t tlen;
  tEncodeSize(tEncodeStreamDispatchReq, pReq, tlen, code);
  if (code < 0) {
    goto FAIL;
  }

  // tEncodeSize overwrote code; re-arm the failure value for the allocation below
  code = -1;
  buf = rpcMallocCont(sizeof(SMsgHead) + tlen);
  if (buf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto FAIL;
  }

  ((SMsgHead*)buf)->vgId = htonl(vgId);
  void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));

  SEncoder encoder;
  tEncoderInit(&encoder, abuf, tlen);
  code = tEncodeStreamDispatchReq(&encoder, pReq);
  // clear the encoder on both outcomes so a failed encode does not leave its state behind
  tEncoderClear(&encoder);
  if (code < 0) {
    goto FAIL;
  }

  msg.contLen = tlen + sizeof(SMsgHead);
  msg.pCont = buf;
  msg.msgType = pTask->msgInfo.msgType;

  qDebug("s-task:%s dispatch msg to taskId:0x%x vgId:%d data msg", pTask->id.idStr, pReq->taskId, vgId);
  return tmsgSendReq(pEpSet, &msg);

FAIL:
  if (buf) {
    rpcFreeCont(buf);
  }

  return code;
}

L
Liu Jicong 已提交
372 373
// Route one data block to the vgroup whose hash range contains the hash of its
// destination table name, and append the block to that vgroup's dispatch request.
//
// pTask:      task owning the shuffle dispatcher and the group-id -> name cache (pNameMap).
// pReqs:      array of vgSz dispatch requests, index-aligned with the dispatcher's vgroup list.
// pDataBlock: block to route; its parTbName is filled in when empty.
// vgSz:       number of vgroups / requests.
// groupId:    cache key for the (hash, table name) pair.
//
// The first block added to a request also bumps shuffleDispatcher.waitingRspCnt.
// Returns 0 on success and -1 when the name cache cannot be created or the block cannot
// be appended.
int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, SSDataBlock* pDataBlock, int32_t vgSz,
                                int64_t groupId) {
  uint32_t hashValue = 0;
  SArray*  vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;

  if (pTask->pNameMap == NULL) {
    pTask->pNameMap = tSimpleHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
    if (pTask->pNameMap == NULL) {
      // never hand a NULL map to the hash helpers below
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
    }
  }

  void* pVal = tSimpleHashGet(pTask->pNameMap, &groupId, sizeof(int64_t));
  if (pVal) {
    // cache hit: reuse the stored hash, and the stored name if the block carries none
    SBlockName* pBln = (SBlockName*)pVal;
    hashValue = pBln->hashValue;
    if (!pDataBlock->info.parTbName[0]) {
      memset(pDataBlock->info.parTbName, 0, TSDB_TABLE_NAME_LEN);
      memcpy(pDataBlock->info.parTbName, pBln->parTbName, strlen(pBln->parTbName));
    }
  } else {
    // cache miss: derive the table name if needed, then hash "<db>.<parTbName>"
    char ctbName[TSDB_TABLE_FNAME_LEN] = {0};

    if (!pDataBlock->info.parTbName[0]) {
      buildCtbNameByGroupIdImpl(pTask->shuffleDispatcher.stbFullName, groupId, pDataBlock->info.parTbName);
    }
    // NOTE(review): the bound is TSDB_TABLE_NAME_LEN although the buffer is larger; it is kept
    // unchanged because the resulting hash decides the target vgroup.
    snprintf(ctbName, TSDB_TABLE_NAME_LEN, "%s.%s", pTask->shuffleDispatcher.dbInfo.db, pDataBlock->info.parTbName);

    SUseDbRsp* pDbInfo = &pTask->shuffleDispatcher.dbInfo;
    hashValue =
        taosGetTbHashVal(ctbName, strlen(ctbName), pDbInfo->hashMethod, pDbInfo->hashPrefix, pDbInfo->hashSuffix);

    SBlockName bln = {0};
    bln.hashValue = hashValue;
    memcpy(bln.parTbName, pDataBlock->info.parTbName, strlen(pDataBlock->info.parTbName));
    // caching is best effort: the map is capped and a failed insert only costs a recompute
    if (tSimpleHashGetSize(pTask->pNameMap) < MAX_BLOCK_NAME_NUM) {
      tSimpleHashPut(pTask->pNameMap, &groupId, sizeof(int64_t), &bln, sizeof(SBlockName));
    }
  }

  bool found = false;
  // TODO: optimize search
  for (int32_t j = 0; j < vgSz; j++) {
    SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, j);
    ASSERT(pVgInfo->vgId > 0);

    if (hashValue < pVgInfo->hashBegin || hashValue > pVgInfo->hashEnd) {
      continue;
    }

    if (streamAddBlockIntoDispatchMsg(pDataBlock, &pReqs[j]) < 0) {
      return -1;
    }

    // first block for this vgroup: one more response to wait for
    if (pReqs[j].blockNum == 0) {
      atomic_add_fetch_32(&pTask->shuffleDispatcher.waitingRspCnt, 1);
    }

    pReqs[j].blockNum++;
    found = true;
    break;
  }

  ASSERT(found);
  return 0;
}

L
Liu Jicong 已提交
439
// Dispatch every block of pData to the downstream task(s) of pTask.
//
// Fixed dispatch:   all blocks go into one request sent to the single downstream task.
// Shuffle dispatch: one request per vgroup; STREAM_DELETE_RESULT blocks are broadcast to
//                   every vgroup, all other blocks are routed by table-name hash. Only
//                   requests that received at least one block are sent.
//
// Returns TSDB_CODE_SUCCESS (0) when everything was sent, a non-zero code otherwise.
// All request buffers built here are released before returning. For any other output
// type nothing is done and 0 is returned.
int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pData) {
  int32_t code = 0;

  int32_t numOfBlocks = taosArrayGetSize(pData->blocks);
  ASSERT(numOfBlocks != 0);

  if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
    SStreamDispatchReq req = {0};

    int32_t downstreamTaskId = pTask->fixedEpDispatcher.taskId;
    code = tInitStreamDispatchReq(&req, pTask, pData->srcVgId, numOfBlocks, downstreamTaskId);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    for (int32_t i = 0; i < numOfBlocks; i++) {
      SSDataBlock* pDataBlock = taosArrayGet(pData->blocks, i);
      code = streamAddBlockIntoDispatchMsg(pDataBlock, &req);
      if (code != TSDB_CODE_SUCCESS) {
        break;
      }
    }

    if (code == TSDB_CODE_SUCCESS) {
      int32_t vgId = pTask->fixedEpDispatcher.nodeId;
      SEpSet* pEpSet = &pTask->fixedEpDispatcher.epSet;

      qDebug("s-task:%s (child taskId:%d) fix-dispatch %d block(s) to s-task:0x%x (vgId:%d)", pTask->id.idStr,
             pTask->info.selfChildId, numOfBlocks, downstreamTaskId, vgId);

      code = doSendDispatchMsg(pTask, &req, vgId, pEpSet);
    }

    taosArrayDestroyP(req.data, taosMemoryFree);
    taosArrayDestroy(req.dataLen);
    return code;
  } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
    int32_t rspCnt = atomic_load_32(&pTask->shuffleDispatcher.waitingRspCnt);
    ASSERT(rspCnt == 0);

    SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
    int32_t vgSz = taosArrayGetSize(vgInfo);

    // zero-filled, so untouched entries are safe to destroy in the cleanup loop
    SStreamDispatchReq* pReqs = taosMemoryCalloc(vgSz, sizeof(SStreamDispatchReq));
    if (pReqs == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
    }

    for (int32_t i = 0; i < vgSz; i++) {
      SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
      code = tInitStreamDispatchReq(&pReqs[i], pTask, pData->srcVgId, 0, pVgInfo->taskId);
      if (code != TSDB_CODE_SUCCESS) {
        // the init routine already released its arrays; make sure cleanup does not free them again
        pReqs[i].data = NULL;
        pReqs[i].dataLen = NULL;
        goto FAIL_SHUFFLE_DISPATCH;
      }
    }

    for (int32_t i = 0; i < numOfBlocks; i++) {
      SSDataBlock* pDataBlock = taosArrayGet(pData->blocks, i);

      // TODO: do not use broadcast
      if (pDataBlock->info.type == STREAM_DELETE_RESULT) {
        for (int32_t j = 0; j < vgSz; j++) {
          if (streamAddBlockIntoDispatchMsg(pDataBlock, &pReqs[j]) < 0) {
            code = -1;  // was left at 0, reporting the failure as success
            goto FAIL_SHUFFLE_DISPATCH;
          }

          if (pReqs[j].blockNum == 0) {
            atomic_add_fetch_32(&pTask->shuffleDispatcher.waitingRspCnt, 1);
          }
          pReqs[j].blockNum++;
        }

        continue;
      }

      if (streamSearchAndAddBlock(pTask, pReqs, pDataBlock, vgSz, pDataBlock->info.id.groupId) < 0) {
        code = -1;  // was left at 0, reporting the failure as success
        goto FAIL_SHUFFLE_DISPATCH;
      }
    }

    qDebug("s-task:%s (child taskId:%d) shuffle-dispatch blocks:%d to %d vgroups", pTask->id.idStr, pTask->info.selfChildId,
           numOfBlocks, vgSz);

    for (int32_t i = 0; i < vgSz; i++) {
      if (pReqs[i].blockNum > 0) {
        SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
        qDebug("s-task:%s (child taskId:%d) shuffle-dispatch blocks:%d to vgId:%d", pTask->id.idStr, pTask->info.selfChildId,
               pReqs[i].blockNum, pVgInfo->vgId);

        code = doSendDispatchMsg(pTask, &pReqs[i], pVgInfo->vgId, &pVgInfo->epSet);
        if (code < 0) {
          goto FAIL_SHUFFLE_DISPATCH;
        }
      }
    }

    code = 0;

  FAIL_SHUFFLE_DISPATCH:
    // shared cleanup for both the success and the failure path
    for (int32_t i = 0; i < vgSz; i++) {
      taosArrayDestroyP(pReqs[i].data, taosMemoryFree);
      taosArrayDestroy(pReqs[i].dataLen);
    }

    taosMemoryFree(pReqs);
  }

  return code;
}

551 552
static void doRetryDispatchData(void* param, void* tmrId) {
  SStreamTask* pTask = param;
553
  ASSERT(pTask->outputInfo.status == TASK_OUTPUT_STATUS__WAIT);
554 555 556

  int32_t code = streamDispatchAllBlocks(pTask, pTask->msgInfo.pData);
  if (code != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
557 558 559
    qDebug("s-task:%s reset the waitRspCnt to be 0 before launch retry dispatch", pTask->id.idStr);
    atomic_store_32(&pTask->shuffleDispatcher.waitingRspCnt, 0);
    streamRetryDispatchStreamBlock(pTask, DISPATCH_RETRY_INTERVAL_MS);
560 561 562 563 564
  }
}

// Log the upcoming retry, then (re)arm the task's launchTaskTimer so that
// doRetryDispatchData runs with pTask as its argument after waitDuration.
void streamRetryDispatchStreamBlock(SStreamTask* pTask, int64_t waitDuration) {
  const char* id = pTask->id.idStr;
  qError("s-task:%s dispatch data in %"PRId64"ms", id, waitDuration);

  taosTmrReset(doRetryDispatchData, waitDuration, pTask, streamEnv.timer, &pTask->launchTaskTimer);
}

568
// Take the next item from the task's output queue and dispatch it downstream.
//
// The output status is switched NORMAL -> WAIT with a compare-and-exchange so that only
// one dispatch runs at a time; if the switch does not happen the call returns 0 at once.
// A failed dispatch is retried in place; once the failure count exceeds
// MAX_CONTINUE_RETRY_COUNT the retry is handed to the timer. If terrno reports
// TSDB_CODE_APP_IS_STOPPING the block is destroyed and the dispatch error is returned.
// In every other case the function returns TSDB_CODE_SUCCESS / 0.
int32_t streamDispatchStreamBlock(SStreamTask* pTask) {
  STaskOutputInfo* pOutput = &pTask->outputInfo;
  ASSERT((pOutput->type == TASK_OUTPUT__FIXED_DISPATCH || pOutput->type == TASK_OUTPUT__SHUFFLE_DISPATCH));

  int32_t queued = taosQueueItemSize(pOutput->queue->queue);
  if (queued > 0) {
    qDebug("s-task:%s try to dispatch intermediate result block to downstream, elem in outputQ:%d", pTask->id.idStr,
           queued);
  }

  // to make sure only one dispatch is running
  int8_t prevStatus =
      atomic_val_compare_exchange_8(&pOutput->status, TASK_OUTPUT_STATUS__NORMAL, TASK_OUTPUT_STATUS__WAIT);
  if (prevStatus != TASK_OUTPUT_STATUS__NORMAL) {
    qDebug("s-task:%s wait for dispatch rsp, not dispatch now, output status:%d", pTask->id.idStr, prevStatus);
    return 0;
  }

  ASSERT(pTask->msgInfo.pData == NULL);
  qDebug("s-task:%s start to dispatch msg, set output status:%d", pTask->id.idStr, pOutput->status);

  SStreamDataBlock* pItem = streamQueueNextItem(pOutput->queue);
  if (pItem == NULL) {
    // nothing to send: give the dispatch slot back
    atomic_store_8(&pOutput->status, TASK_OUTPUT_STATUS__NORMAL);
    qDebug("s-task:%s not dispatch since no elems in outputQ, output status:%d", pTask->id.idStr, pOutput->status);
    return 0;
  }

  pTask->msgInfo.pData = pItem;
  ASSERT(pItem->type == STREAM_INPUT__DATA_BLOCK);

  int32_t failures = 0;
  for (;;) {
    int32_t code = streamDispatchAllBlocks(pTask, pItem);
    if (code == TSDB_CODE_SUCCESS) {
      // this block can not be deleted until it has been sent to downstream task successfully.
      return TSDB_CODE_SUCCESS;
    }

    qDebug("s-task:%s failed to dispatch msg to downstream, code:%s, output status:%d, retry cnt:%d", pTask->id.idStr,
           tstrerror(terrno), pOutput->status, failures);

    // todo deal with only partially success dispatch case
    atomic_store_32(&pTask->shuffleDispatcher.waitingRspCnt, 0);

    // in case of this error, do not retry anymore
    if (terrno == TSDB_CODE_APP_IS_STOPPING) {
      destroyStreamDataBlock(pTask->msgInfo.pData);
      pTask->msgInfo.pData = NULL;
      return code;
    }

    failures += 1;
    if (failures > MAX_CONTINUE_RETRY_COUNT) {
      // too many immediate failures: let the timer drive the next attempt
      qDebug("s-task:%s failed to dispatch msg to downstream for %d times, code:%s, add timer to retry in %dms",
             pTask->id.idStr, failures, tstrerror(terrno), DISPATCH_RETRY_INTERVAL_MS);
      streamRetryDispatchStreamBlock(pTask, DISPATCH_RETRY_INTERVAL_MS);
      return TSDB_CODE_SUCCESS;
    }
  }
}
628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687

// Serialize a complete-history message (stream id, downstream id/node, upstream task/node).
// Returns the encoder position on success, -1 at the first field that fails to encode.
int32_t tEncodeCompleteHistoryDataMsg(SEncoder* pEncoder, const SStreamCompleteHistoryMsg* pReq) {
  if (tStartEncode(pEncoder) < 0) {
    return -1;
  }

  // the || chain short-circuits, so fields are written in this exact order
  if (tEncodeI64(pEncoder, pReq->streamId) < 0 || tEncodeI32(pEncoder, pReq->downstreamId) < 0 ||
      tEncodeI32(pEncoder, pReq->downstreamNode) < 0 || tEncodeI32(pEncoder, pReq->upstreamTaskId) < 0 ||
      tEncodeI32(pEncoder, pReq->upstreamNodeId) < 0) {
    return -1;
  }

  tEndEncode(pEncoder);
  return pEncoder->pos;
}

// Deserialize a complete-history message into pRsp, reading the fields in the order the
// matching encoder writes them. Returns 0 on success, -1 at the first failing field.
int32_t tDecodeCompleteHistoryDataMsg(SDecoder* pDecoder, SStreamCompleteHistoryMsg* pRsp) {
  if (tStartDecode(pDecoder) < 0) {
    return -1;
  }

  // the || chain short-circuits, so fields are read in this exact order
  if (tDecodeI64(pDecoder, &pRsp->streamId) < 0 || tDecodeI32(pDecoder, &pRsp->downstreamId) < 0 ||
      tDecodeI32(pDecoder, &pRsp->downstreamNode) < 0 || tDecodeI32(pDecoder, &pRsp->upstreamTaskId) < 0 ||
      tDecodeI32(pDecoder, &pRsp->upstreamNodeId) < 0) {
    return -1;
  }

  tEndDecode(pDecoder);
  return 0;
}

// Build the response for an upstream task's scan-history-finish request and queue it in
// pTask->pRspMsgList; the queued responses are sent later by
// streamNotifyUpstreamContinue().
//
// pTask:    downstream task that received the request.
// pRpcInfo: rpc handle of the incoming request, copied into the queued response.
// pReq:     the scan-history-finish request being answered.
//
// Returns TSDB_CODE_SUCCESS on success, the (negative) sizing result when the message
// cannot be sized, -1 when the upstream task is unknown, or TSDB_CODE_OUT_OF_MEMORY when
// an allocation fails. The message buffer is released on every failure path.
int32_t streamAddEndScanHistoryMsg(SStreamTask* pTask, SRpcHandleInfo* pRpcInfo, SStreamScanHistoryFinishReq* pReq) {
  int32_t  len = 0;
  int32_t  code = 0;
  SEncoder encoder;

  SStreamCompleteHistoryMsg msg = {
      .streamId = pReq->streamId,
      .upstreamTaskId = pReq->upstreamTaskId,
      .upstreamNodeId = pReq->upstreamNodeId,
      .downstreamId = pReq->downstreamTaskId,
      .downstreamNode = pTask->pMeta->vgId,
  };

  tEncodeSize(tEncodeCompleteHistoryDataMsg, &msg, len, code);
  if (code < 0) {
    return code;
  }

  void* pBuf = rpcMallocCont(sizeof(SMsgHead) + len);
  if (pBuf == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  ((SMsgHead*)pBuf)->vgId = htonl(pReq->upstreamNodeId);

  void* abuf = POINTER_SHIFT(pBuf, sizeof(SMsgHead));

  tEncoderInit(&encoder, (uint8_t*)abuf, len);
  tEncodeCompleteHistoryDataMsg(&encoder, &msg);
  tEncoderClear(&encoder);

  // NOTE(review): assumes the lookup yields NULL when the upstream task id is not registered
  SStreamChildEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, pReq->upstreamTaskId);
  if (pInfo == NULL) {
    qError("s-task:%s failed to find upstream task:0x%x, scan history finish rsp msg not added", pTask->id.idStr,
           pReq->upstreamTaskId);
    rpcFreeCont(pBuf);
    return -1;
  }

  SStreamContinueExecInfo info = {.taskId = pReq->upstreamTaskId, .epset = pInfo->epSet};
  initRpcMsg(&info.msg, 0, pBuf, sizeof(SMsgHead) + len);
  info.msg.info = *pRpcInfo;

  bool    added = false;
  int32_t num = 0;

  taosThreadMutexLock(&pTask->lock);
  if (pTask->pRspMsgList == NULL) {
    pTask->pRspMsgList = taosArrayInit(4, sizeof(SStreamContinueExecInfo));
  }
  if (pTask->pRspMsgList != NULL && taosArrayPush(pTask->pRspMsgList, &info) != NULL) {
    added = true;
    // read the size while still holding the lock that guards the list
    num = taosArrayGetSize(pTask->pRspMsgList);
  }
  taosThreadMutexUnlock(&pTask->lock);

  if (!added) {
    rpcFreeCont(pBuf);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  qDebug("s-task:%s add scan history finish rsp msg for task:0x%x, total:%d", pTask->id.idStr, pReq->upstreamTaskId,
         num);
  return TSDB_CODE_SUCCESS;
}

// Send every response queued in pTask->pRspMsgList through tmsgSendRsp, then empty the
// list. Only valid for AGG and SINK level tasks (asserted). Always returns 0.
int32_t streamNotifyUpstreamContinue(SStreamTask* pTask) {
  ASSERT(pTask->info.taskLevel == TASK_LEVEL__AGG || pTask->info.taskLevel == TASK_LEVEL__SINK);

  int32_t total = taosArrayGetSize(pTask->pRspMsgList);

  int32_t idx = 0;
  while (idx < total) {
    SStreamContinueExecInfo* pPending = taosArrayGet(pTask->pRspMsgList, idx);
    tmsgSendRsp(&pPending->msg);

    qDebug("s-task:%s level:%d notify upstream:0x%x to continue process data from WAL", pTask->id.idStr, pTask->info.taskLevel,
           pPending->taskId);
    ++idx;
  }

  taosArrayClear(pTask->pRspMsgList);
  qDebug("s-task:%s level:%d checkpoint ready msg sent to all %d upstreams", pTask->id.idStr, pTask->info.taskLevel,
         total);
  return 0;
}