tq.c 57.5 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
S
Shengliang Guan 已提交
14 15
 */

H
Hongze Cheng 已提交
16
#include "tq.h"
S
Shengliang Guan 已提交
17

dengyihao's avatar
dengyihao 已提交
18 19 20
// States of tqMgmt.inited (see tqInit/tqCleanUp):
// 0: not initialized
// 1: already initialized
// 2: initialization or cleanup in progress
21
static int32_t tqInitialize(STQ* pTq);
dengyihao's avatar
dengyihao 已提交
22

wmmhello's avatar
wmmhello 已提交
23
static FORCE_INLINE bool tqIsHandleExec(STqHandle* pHandle) { return TMQ_HANDLE_STATUS_EXEC == pHandle->status; }
dengyihao's avatar
dengyihao 已提交
24 25
static FORCE_INLINE void tqSetHandleExec(STqHandle* pHandle) { pHandle->status = TMQ_HANDLE_STATUS_EXEC; }
static FORCE_INLINE void tqSetHandleIdle(STqHandle* pHandle) { pHandle->status = TMQ_HANDLE_STATUS_IDLE; }
wmmhello's avatar
wmmhello 已提交
26

L
Liu Jicong 已提交
27
/*
 * Initialize the process-wide TQ module (timer and stream subsystem).
 *
 * tqMgmt.inited works as a small state machine: 0 = not initialized,
 * 1 = initialized, 2 = init/cleanup in progress. The CAS loop spins while
 * another thread holds state 2, so exactly one caller performs the init.
 *
 * Returns 0 on success (or if already initialized) and -1 on failure. Every
 * failure path rolls the state back to 0, so a later call can retry instead of
 * spinning forever on state 2.
 */
int32_t tqInit() {
  int8_t old;
  while (1) {
    old = atomic_val_compare_exchange_8(&tqMgmt.inited, 0, 2);
    if (old != 2) break;
  }

  if (old == 0) {
    tqMgmt.timer = taosTmrInit(10000, 100, 10000, "TQ");
    if (tqMgmt.timer == NULL) {
      atomic_store_8(&tqMgmt.inited, 0);
      return -1;
    }

    if (streamInit() < 0) {
      // Release the timer and leave the "in progress" state. Otherwise every
      // subsequent tqInit()/tqCleanUp() would spin forever waiting on state 2.
      taosTmrCleanUp(tqMgmt.timer);
      tqMgmt.timer = NULL;
      atomic_store_8(&tqMgmt.inited, 0);
      return -1;
    }

    atomic_store_8(&tqMgmt.inited, 1);
  }

  return 0;
}
L
Liu Jicong 已提交
48

49
/*
 * Tear down the process-wide TQ module if it is initialized.
 *
 * Moves tqMgmt.inited from 1 to 2 with a CAS, spinning while another thread
 * holds state 2. It then releases the timer and the stream subsystem and resets
 * the state to 0. Does nothing if the module was not initialized.
 */
void tqCleanUp() {
  int8_t prev = 2;
  do {
    prev = atomic_val_compare_exchange_8(&tqMgmt.inited, 1, 2);
  } while (prev == 2);

  if (prev != 1) {
    return;
  }

  taosTmrCleanUp(tqMgmt.timer);
  streamCleanUp();
  atomic_store_8(&tqMgmt.inited, 0);
}
L
Liu Jicong 已提交
62

63
void tqDestroyTqHandle(void* data) {
64 65
  STqHandle* pData = (STqHandle*)data;
  qDestroyTask(pData->execHandle.task);
wmmhello's avatar
wmmhello 已提交
66

67
  if (pData->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
L
Liu Jicong 已提交
68
    taosMemoryFreeClear(pData->execHandle.execCol.qmsg);
69
  } else if (pData->execHandle.subType == TOPIC_SUB_TYPE__DB) {
70
    tqReaderClose(pData->execHandle.pTqReader);
71 72
    walCloseReader(pData->pWalReader);
    taosHashCleanup(pData->execHandle.execDb.pFilterOutTbUid);
L
Liu Jicong 已提交
73
  } else if (pData->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
74
    walCloseReader(pData->pWalReader);
75
    tqReaderClose(pData->execHandle.pTqReader);
76 77
    taosMemoryFreeClear(pData->execHandle.execTb.qmsg);
    nodesDestroyNode(pData->execHandle.execTb.node);
78
  }
dengyihao's avatar
dengyihao 已提交
79
  if (pData->msg != NULL) {
80 81 82
    rpcFreeCont(pData->msg->pCont);
    taosMemoryFree(pData->msg);
    pData->msg = NULL;
D
dapan1121 已提交
83
  }
L
Liu Jicong 已提交
84 85
}

86 87 88 89 90
/*
 * Return true only if both offsets are WAL-log offsets and pLeft's version is
 * not greater than pRight's. Any non-log offset yields false.
 */
static bool tqOffsetLessOrEqual(const STqOffset* pLeft, const STqOffset* pRight) {
  if (pLeft->val.type != TMQ_OFFSET__LOG || pRight->val.type != TMQ_OFFSET__LOG) {
    return false;
  }
  return pLeft->val.version <= pRight->val.version;
}

L
Liu Jicong 已提交
91
/*
 * Create the TQ instance of a vnode.
 *
 * Allocates the STQ object and its hash tables: subscription handles, pending
 * push handles and column check info. It then opens the persistent parts via
 * tqInitialize().
 *
 * Returns NULL on failure. terrno is set to TSDB_CODE_OUT_OF_MEMORY for
 * allocation failures, and everything allocated so far is released.
 */
STQ* tqOpen(const char* path, SVnode* pVnode) {
  STQ* pTq = taosMemoryCalloc(1, sizeof(STQ));
  if (pTq == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pTq->path = taosStrdup(path);
  pTq->pVnode = pVnode;
  pTq->walLogLastVer = pVnode->pWal->vers.lastVer;

  pTq->pHandle = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);
  pTq->pPushMgr = taosHashInit(64, MurmurHash3_32, false, HASH_NO_LOCK);
  pTq->pCheckInfo = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);

  // These allocations were previously unchecked, and the path is needed by
  // tqInitialize(). Release the (still empty) objects directly rather than via
  // tqClose(), which would also try to close the offset store, the tq meta and
  // the stream meta, none of which are open yet.
  if (pTq->path == NULL || pTq->pHandle == NULL || pTq->pPushMgr == NULL || pTq->pCheckInfo == NULL) {
    taosHashCleanup(pTq->pHandle);
    taosHashCleanup(pTq->pPushMgr);
    taosHashCleanup(pTq->pCheckInfo);
    taosMemoryFree(pTq->path);
    taosMemoryFree(pTq);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  taosHashSetFreeFp(pTq->pHandle, tqDestroyTqHandle);
  taosHashSetFreeFp(pTq->pCheckInfo, (FDelete)tDeleteSTqCheckInfo);
  taosInitRWLatch(&pTq->lock);

  if (tqInitialize(pTq) != TSDB_CODE_SUCCESS) {
    tqClose(pTq);
    return NULL;
  }

  return pTq;
}

/*
 * Open the persistent state of a TQ instance, in order: the tq meta store, the
 * offset store, and the stream meta, whose tasks are then loaded.
 *
 * Returns 0 on success and -1 on the first failure. Resources opened before the
 * failure stay in pTq; tqOpen() releases them through tqClose().
 *
 * Declared `static` to match this file's forward declaration, so the internal
 * linkage is visible at the definition itself.
 */
static int32_t tqInitialize(STQ* pTq) {
  if (tqMetaOpen(pTq) < 0) {
    return -1;
  }

  pTq->pOffsetStore = tqOffsetOpen(pTq);
  if (pTq->pOffsetStore == NULL) {
    return -1;
  }

  pTq->pStreamMeta = streamMetaOpen(pTq->path, pTq, (FTaskExpand*)tqExpandTask, pTq->pVnode->config.vgId);
  if (pTq->pStreamMeta == NULL) {
    return -1;
  }

  // the version is kept in task's meta data
  // todo check if this version is required or not
  if (streamLoadTasks(pTq->pStreamMeta, walGetCommittedVer(pTq->pVnode->pWal)) < 0) {
    return -1;
  }

  return 0;
}
L
Liu Jicong 已提交
143

L
Liu Jicong 已提交
144
/*
 * Destroy a TQ instance created by tqOpen(). A NULL pTq is accepted and ignored.
 *
 * Closes the offset store and drops the handle, push and check-info hash tables
 * (with their registered free callbacks). It then frees the path, closes the tq
 * meta and the stream meta, and finally releases the object itself.
 */
void tqClose(STQ* pTq) {
  if (pTq != NULL) {
    tqOffsetClose(pTq->pOffsetStore);

    taosHashCleanup(pTq->pHandle);
    taosHashCleanup(pTq->pPushMgr);
    taosHashCleanup(pTq->pCheckInfo);

    taosMemoryFree(pTq->path);
    tqMetaClose(pTq);
    streamMetaClose(pTq->pStreamMeta);

    taosMemoryFree(pTq);
  }
}
L
Liu Jicong 已提交
158

159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181
/*
 * Return true if any task of pMeta still has an active timer
 * (status.timerActive == 1). The scan runs under pMeta->lock.
 *
 * The scan stops at the first match. Because the iteration ends early, the
 * iterator must be released with taosHashCancelIterate().
 */
static bool hasStreamTaskInTimer(SStreamMeta* pMeta) {
  bool inTimer = false;

  taosWLockLatch(&pMeta->lock);

  void* pIter = taosHashIterate(pMeta->pTasks, NULL);
  while (pIter != NULL) {
    SStreamTask* pTask = *(SStreamTask**)pIter;
    if (pTask->status.timerActive == 1) {
      inTimer = true;
      taosHashCancelIterate(pMeta->pTasks, pIter);
      break;
    }
    pIter = taosHashIterate(pMeta->pTasks, pIter);
  }

  taosWUnLockLatch(&pMeta->lock);

  return inTimer;
}

H
Haojun Liao 已提交
182 183 184 185 186 187 188 189 190 191 192 193
/*
 * Prepare this vnode's stream tasks for shutdown. A NULL pTq is ignored.
 *
 * Under the stream-meta lock, every task is marked TASK_STATUS__STOP and its
 * executor is killed. The call then blocks, re-checking every 100ms, until no
 * task reports an active timer.
 */
void tqNotifyClose(STQ* pTq) {
  if (pTq == NULL) {
    return;
  }

  SStreamMeta* pMeta = pTq->pStreamMeta;

  taosWLockLatch(&pMeta->lock);

  void* pIter = taosHashIterate(pMeta->pTasks, NULL);
  while (pIter != NULL) {
    SStreamTask* pTask = *(SStreamTask**)pIter;
    tqDebug("vgId:%d s-task:%s set closing flag", pMeta->vgId, pTask->id.idStr);
    pTask->status.taskStatus = TASK_STATUS__STOP;

    int64_t killStart = taosGetTimestampMs();
    qKillTask(pTask->exec.pExecutor, TSDB_CODE_SUCCESS);
    int64_t killElapsed = taosGetTimestampMs() - killStart;
    tqDebug("vgId:%d s-task:%s is closed in %" PRId64 " ms", pMeta->vgId, pTask->id.idStr, killElapsed);

    pIter = taosHashIterate(pMeta->pTasks, pIter);
  }

  taosWUnLockLatch(&pMeta->lock);

  tqDebug("vgId:%d start to check all tasks", pMeta->vgId);
  int64_t waitStart = taosGetTimestampMs();

  // do not continue closing while any task still has an active timer
  while (hasStreamTaskInTimer(pMeta)) {
    tqDebug("vgId:%d some tasks in timer, wait for 100ms and recheck", pMeta->vgId);
    taosMsleep(100);
  }

  int64_t waitElapsed = taosGetTimestampMs() - waitStart;
  tqDebug("vgId:%d all stream tasks are not in timer, continue close, elapsed time:%" PRId64 " ms", pMeta->vgId,
          waitElapsed);
}

220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267
//static int32_t doSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* pRsp, int32_t epoch,
//                             int64_t consumerId, int32_t type) {
//  int32_t len = 0;
//  int32_t code = 0;
//
//  if (type == TMQ_MSG_TYPE__POLL_DATA_RSP) {
//    tEncodeSize(tEncodeMqDataRsp, pRsp, len, code);
//  } else if (type == TMQ_MSG_TYPE__POLL_DATA_META_RSP) {
//    tEncodeSize(tEncodeSTaosxRsp, (STaosxRsp*)pRsp, len, code);
//  }
//
//  if (code < 0) {
//    return -1;
//  }
//
//  int32_t tlen = sizeof(SMqRspHead) + len;
//  void*   buf = rpcMallocCont(tlen);
//  if (buf == NULL) {
//    return -1;
//  }
//
//  ((SMqRspHead*)buf)->mqMsgType = type;
//  ((SMqRspHead*)buf)->epoch = epoch;
//  ((SMqRspHead*)buf)->consumerId = consumerId;
//
//  void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
//
//  SEncoder encoder = {0};
//  tEncoderInit(&encoder, abuf, len);
//
//  if (type == TMQ_MSG_TYPE__POLL_DATA_RSP) {
//    tEncodeMqDataRsp(&encoder, pRsp);
//  } else if (type == TMQ_MSG_TYPE__POLL_DATA_META_RSP) {
//    tEncodeSTaosxRsp(&encoder, (STaosxRsp*)pRsp);
//  }
//
//  tEncoderClear(&encoder);
//
//  SRpcMsg rsp = {
//      .info = *pRpcHandleInfo,
//      .pCont = buf,
//      .contLen = tlen,
//      .code = 0,
//  };
//
//  tmsgSendRsp(&rsp);
//  return 0;
//}
L
Liu Jicong 已提交
268

269 270 271 272 273 274 275
/*
 * Answer the poll request parked on pHandle->msg with an empty data response
 * (blockNum 0, rspOffset equal to reqOffset).
 *
 * Returns 0 on success. Returns -1 with terrno = TSDB_CODE_INVALID_MSG if the
 * parked request cannot be deserialized. pHandle->msg itself is not released.
 */
int32_t tqPushEmptyDataRsp(STqHandle* pHandle, int32_t vgId) {
  SRpcMsg*   pParked = pHandle->msg;
  SMqPollReq req = {0};

  if (tDeserializeSMqPollReq(pParked->pCont, pParked->contLen, &req) < 0) {
    tqError("tDeserializeSMqPollReq %d failed", pParked->contLen);
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  SMqDataRsp emptyRsp = {0};
  tqInitDataRsp(&emptyRsp, &req);
  emptyRsp.blockNum = 0;
  emptyRsp.rspOffset = emptyRsp.reqOffset;  // nothing consumed: answer at the requested position

  tqSendDataRsp(pHandle, pParked, &req, &emptyRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
  tDeleteMqDataRsp(&emptyRsp);
  return 0;
}

286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305
//int32_t tqPushDataRsp(STqHandle* pHandle, int32_t vgId) {
//  SMqDataRsp dataRsp = {0};
//  dataRsp.head.consumerId = pHandle->consumerId;
//  dataRsp.head.epoch = pHandle->epoch;
//  dataRsp.head.mqMsgType = TMQ_MSG_TYPE__POLL_RSP;
//
//  int64_t sver = 0, ever = 0;
//  walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever);
//  tqDoSendDataRsp(&pHandle->msg->info, &dataRsp, pHandle->epoch, pHandle->consumerId, TMQ_MSG_TYPE__POLL_RSP, sver,
//                  ever);
//
//  char buf1[TSDB_OFFSET_LEN] = {0};
//  char buf2[TSDB_OFFSET_LEN] = {0};
//  tFormatOffset(buf1, tListLen(buf1), &dataRsp.reqOffset);
//  tFormatOffset(buf2, tListLen(buf2), &dataRsp.rspOffset);
//  tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) push rsp, block num: %d, req:%s, rsp:%s", vgId,
//          dataRsp.head.consumerId, dataRsp.head.epoch, dataRsp.blockNum, buf1, buf2);
//  return 0;
//}

306 307 308 309 310 311
/*
 * Send pRsp to the consumer that issued pReq. The reply is routed through
 * pMsg->info and carries the version range reported by
 * walReaderValidVersionRange() for the handle's WAL reader. The request and
 * response offsets are then logged. Always returns 0.
 */
int32_t tqSendDataRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp,
                      int32_t type, int32_t vgId) {
  int64_t sver = 0;
  int64_t ever = 0;
  walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever);
  tqDoSendDataRsp(&pMsg->info, pRsp, pReq->epoch, pReq->consumerId, type, sver, ever);

  char reqOffsetStr[TSDB_OFFSET_LEN] = {0};
  char rspOffsetStr[TSDB_OFFSET_LEN] = {0};
  tFormatOffset(reqOffsetStr, TSDB_OFFSET_LEN, &pRsp->reqOffset);
  tFormatOffset(rspOffsetStr, TSDB_OFFSET_LEN, &pRsp->rspOffset);

  tqDebug("tmq poll vgId:%d consumer:0x%" PRIx64 " (epoch %d) send rsp, block num:%d, req:%s, rsp:%s, reqId:0x%" PRIx64,
          vgId, pReq->consumerId, pReq->epoch, pRsp->blockNum, reqOffsetStr, rspOffsetStr, pReq->reqId);
  return 0;
}

324
/*
 * Apply an offset-commit message (an encoded SMqVgOffset) to the offset store.
 *
 * Snapshot offsets are stored as received. A log offset whose version equals
 * sversion - 1 is advanced by one, to sversion. A log offset that is not newer
 * than the already stored log offset is ignored.
 *
 * Returns 0 on success, including the "not newer" case. Returns -1 with
 * terrno = TSDB_CODE_INVALID_MSG on a decode failure or an unknown offset type,
 * and -1 when the offset store write fails.
 */
int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  SMqVgOffset vgOffset = {0};
  int32_t     vgId = TD_VID(pTq->pVnode);

  SDecoder decoder;
  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
  if (tDecodeMqVgOffset(&decoder, &vgOffset) < 0) {
    // clear the decoder on the error path too, exactly as on the success path
    tDecoderClear(&decoder);
    tqError("vgId:%d failed to decode offset commit msg", vgId);
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }
  tDecoderClear(&decoder);

  STqOffset* pOffset = &vgOffset.offset;

  if (pOffset->val.type == TMQ_OFFSET__SNAPSHOT_DATA || pOffset->val.type == TMQ_OFFSET__SNAPSHOT_META) {
    tqDebug("receive offset commit msg to %s on vgId:%d, offset(type:snapshot) uid:%" PRId64 ", ts:%" PRId64,
            pOffset->subKey, vgId, pOffset->val.uid, pOffset->val.ts);
  } else if (pOffset->val.type == TMQ_OFFSET__LOG) {
    tqDebug("receive offset commit msg to %s on vgId:%d, offset(type:log) version:%" PRId64, pOffset->subKey, vgId,
            pOffset->val.version);
    if (pOffset->val.version + 1 == sversion) {
      pOffset->val.version += 1;
    }
  } else {
    tqError("invalid commit offset type:%d", pOffset->val.type);
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  STqOffset* pSavedOffset = tqOffsetRead(pTq->pOffsetStore, pOffset->subKey);
  if (pSavedOffset != NULL && tqOffsetLessOrEqual(pOffset, pSavedOffset)) {
    tqDebug("not update the offset, vgId:%d sub:%s since committed:%" PRId64 " less than/equal to existed:%" PRId64,
            vgId, pOffset->subKey, pOffset->val.version, pSavedOffset->val.version);
    return 0;  // no need to update the offset value
  }

  // save the new offset value
  if (tqOffsetWrite(pTq->pOffsetStore, pOffset) < 0) {
    return -1;
  }

  return 0;
}

367
/*
 * Move a subscription's stored offset to a consumer-requested WAL version.
 * msg is an encoded SMqVgOffset.
 *
 * Only log offsets are accepted, and only from the consumer that currently owns
 * the handle. The requested version is clamped into the range reported by the
 * handle's WAL reader [sver, ever] and then written to the offset store.
 * Nothing is written when the stored offset is not a log offset, or when it
 * already equals the requested version.
 *
 * Returns 0 on success or when no update is needed. Returns -1 with terrno set
 * on a decode failure, an invalid offset type, an unknown subscription or a
 * consumer mismatch. Also returns -1 when the offset store write fails.
 */
int32_t tqProcessSeekReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  SMqVgOffset vgOffset = {0};
  int32_t     vgId = TD_VID(pTq->pVnode);

  SDecoder decoder;
  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
  if (tDecodeMqVgOffset(&decoder, &vgOffset) < 0) {
    tqError("vgId:%d failed to decode seek msg", vgId);
    // clear the decoder on the error path too, exactly as on the success path
    tDecoderClear(&decoder);
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  tDecoderClear(&decoder);

  tqDebug("topic:%s, vgId:%d process offset seek by consumer:0x%" PRIx64 ", req offset:%" PRId64,
          vgOffset.offset.subKey, vgId, vgOffset.consumerId, vgOffset.offset.val.version);

  STqOffset* pOffset = &vgOffset.offset;
  if (pOffset->val.type != TMQ_OFFSET__LOG) {
    tqError("vgId:%d, subKey:%s invalid seek offset type:%d", vgId, pOffset->subKey, pOffset->val.type);
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  // 1. find the subscription handle
  STqHandle* pHandle = taosHashGet(pTq->pHandle, pOffset->subKey, strlen(pOffset->subKey));
  if (pHandle == NULL) {
    tqError("tmq seek: consumer:0x%" PRIx64 " vgId:%d subkey %s not found", vgOffset.consumerId, vgId, pOffset->subKey);
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  // 2. check consumer-vg assignment status
  taosRLockLatch(&pTq->lock);
  if (pHandle->consumerId != vgOffset.consumerId) {
    tqDebug("ERROR tmq seek: consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64,
            vgOffset.consumerId, vgId, pOffset->subKey, pHandle->consumerId);
    terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
    taosRUnLockLatch(&pTq->lock);
    return -1;
  }
  taosRUnLockLatch(&pTq->lock);

  // 3. check the offset info
  STqOffset* pSavedOffset = tqOffsetRead(pTq->pOffsetStore, pOffset->subKey);
  if (pSavedOffset != NULL) {
    if (pSavedOffset->val.type != TMQ_OFFSET__LOG) {
      tqError("invalid saved offset type, vgId:%d sub:%s", vgId, pOffset->subKey);
      return 0;  // no need to update the offset value
    }

    if (pSavedOffset->val.version == pOffset->val.version) {
      tqDebug("vgId:%d subKey:%s no need to seek to %" PRId64 " prev offset:%" PRId64, vgId, pOffset->subKey,
              pOffset->val.version, pSavedOffset->val.version);
      return 0;
    }
  }

  // clamp the requested version into the readable WAL range
  int64_t sver = 0, ever = 0;
  walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever);
  if (pOffset->val.version < sver) {
    pOffset->val.version = sver;
  } else if (pOffset->val.version > ever) {
    pOffset->val.version = ever;
  }

  // save the new offset value
  if (pSavedOffset != NULL) {
    tqDebug("vgId:%d sub:%s seek to:%" PRId64 " prev offset:%" PRId64, vgId, pOffset->subKey, pOffset->val.version,
            pSavedOffset->val.version);
  } else {
    tqDebug("vgId:%d sub:%s seek to:%" PRId64 " not saved yet", vgId, pOffset->subKey, pOffset->val.version);
  }

  if (tqOffsetWrite(pTq->pOffsetStore, pOffset) < 0) {
    tqError("failed to save offset, vgId:%d sub:%s seek to %" PRId64, vgId, pOffset->subKey, pOffset->val.version);
    return -1;
  }

  tqDebug("topic:%s, vgId:%d consumer:0x%" PRIx64 " offset is update to:%" PRId64, vgOffset.offset.subKey, vgId,
          vgOffset.consumerId, vgOffset.offset.val.version);

  return 0;
}

L
Liu Jicong 已提交
449
/*
 * Check whether column colId of table tbUid may be modified.
 *
 * Returns -1 if any entry in pTq->pCheckInfo for tbUid lists colId among its
 * protected columns (colIdList), and 0 otherwise.
 */
int32_t tqCheckColModifiable(STQ* pTq, int64_t tbUid, int32_t colId) {
  for (void* pIter = taosHashIterate(pTq->pCheckInfo, NULL); pIter != NULL;
       pIter = taosHashIterate(pTq->pCheckInfo, pIter)) {
    STqCheckInfo* pCheck = (STqCheckInfo*)pIter;
    if (pCheck->ntbUid != tbUid) {
      continue;
    }

    int32_t numOfCols = taosArrayGetSize(pCheck->colIdList);
    for (int32_t i = 0; i < numOfCols; ++i) {
      int16_t forbidColId = *(int16_t*)taosArrayGet(pCheck->colIdList, i);
      if (forbidColId == colId) {
        // leaving the iteration early: release the iterator explicitly
        taosHashCancelIterate(pTq->pCheckInfo, pIter);
        return -1;
      }
    }
  }

  return 0;
}

475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499
/*
 * Re-dispatch every parked poll request to the query queue.
 *
 * For each handle registered in pTq->pPushMgr, the stored request is
 * re-queued as TDMT_VND_TMQ_CONSUME. Its pCont is handed to the queued message
 * and is not freed here; only the SRpcMsg wrapper is freed and detached from
 * the handle. The push manager is then cleared. Everything runs under the pTq
 * write lock. pMsg is unused. Always returns 0.
 *
 * A handle without a parked message (which should not happen) is logged and
 * skipped. Breaking out of the loop instead would leave the iterator
 * un-cancelled, and the parked requests of all remaining handles would be
 * dropped by the taosHashClear() below.
 */
int32_t tqProcessPollPush(STQ* pTq, SRpcMsg* pMsg) {
  int32_t vgId = TD_VID(pTq->pVnode);

  taosWLockLatch(&pTq->lock);
  if (taosHashGetSize(pTq->pPushMgr) > 0) {
    void* pIter = taosHashIterate(pTq->pPushMgr, NULL);

    while (pIter) {
      STqHandle* pHandle = *(STqHandle**)pIter;
      tqDebug("vgId:%d start set submit for pHandle:%p, consumer:0x%" PRIx64, vgId, pHandle, pHandle->consumerId);

      if (ASSERT(pHandle->msg != NULL)) {
        tqError("pHandle->msg should not be null");
      } else {
        SRpcMsg msg = {.msgType = TDMT_VND_TMQ_CONSUME,
                       .pCont = pHandle->msg->pCont,
                       .contLen = pHandle->msg->contLen,
                       .info = pHandle->msg->info};
        tmsgPutToQueue(&pTq->pVnode->msgCb, QUERY_QUEUE, &msg);
        taosMemoryFree(pHandle->msg);
        pHandle->msg = NULL;
      }

      pIter = taosHashIterate(pTq->pPushMgr, pIter);
    }

    taosHashClear(pTq->pPushMgr);
  }
  taosWUnLockLatch(&pTq->lock);

  return 0;
}

D
dapan1121 已提交
504
/*
 * Handle a TMQ poll request from a consumer. The steps are:
 *
 * - Wait, retrying every 10ms, until no other poll is executing on the
 *   subscription handle, then mark the handle executing. The test and the set
 *   both happen under pTq->lock.
 * - Raise the handle epoch if the request carries a newer one.
 * - Extract data for the consumer via tqExtractDataForMq().
 * - Mark the handle idle again.
 *
 * Returns the result of tqExtractDataForMq(). Returns -1 with terrno set when:
 * - the request cannot be decoded (TSDB_CODE_INVALID_MSG);
 * - the subscription is unknown (TSDB_CODE_INVALID_MSG);
 * - the subscription is owned by another consumer
 *   (TSDB_CODE_TMQ_CONSUMER_MISMATCH).
 */
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
  SMqPollReq req = {0};
  int        code = 0;
  if (tDeserializeSMqPollReq(pMsg->pCont, pMsg->contLen, &req) < 0) {
    tqError("tDeserializeSMqPollReq %d failed", pMsg->contLen);
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  int64_t      consumerId = req.consumerId;
  int32_t      reqEpoch = req.epoch;
  STqOffsetVal reqOffset = req.reqOffset;
  int32_t      vgId = TD_VID(pTq->pVnode);
  STqHandle*   pHandle = NULL;

  while (1) {
    taosWLockLatch(&pTq->lock);
    // 1. find handle
    pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
    if (pHandle == NULL) {
      tqError("tmq poll: consumer:0x%" PRIx64 " vgId:%d subkey %s not found", consumerId, vgId, req.subKey);
      terrno = TSDB_CODE_INVALID_MSG;
      taosWUnLockLatch(&pTq->lock);
      return -1;
    }

    // 2. check re-balance status
    if (pHandle->consumerId != consumerId) {
      tqError("ERROR tmq poll: consumer:0x%" PRIx64
              " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64,
              consumerId, vgId, req.subKey, pHandle->consumerId);
      terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
      taosWUnLockLatch(&pTq->lock);
      return -1;
    }

    // claim the handle: only one poll may execute on a handle at a time
    bool exec = tqIsHandleExec(pHandle);
    if (!exec) {
      tqSetHandleExec(pHandle);
      tqDebug("tmq poll: consumer:0x%" PRIx64 " vgId:%d, topic:%s, set handle exec, pHandle:%p", consumerId, vgId,
              req.subKey, pHandle);
      taosWUnLockLatch(&pTq->lock);
      break;
    }
    taosWUnLockLatch(&pTq->lock);

    // fixed: the original format lacked the space before "vgId"
    tqDebug("tmq poll: consumer:0x%" PRIx64
            " vgId:%d, topic:%s, subscription is executing, wait for 10ms and retry, pHandle:%p",
            consumerId, vgId, req.subKey, pHandle);
    taosMsleep(10);
  }

  // 3. update the epoch value
  if (pHandle->epoch < reqEpoch) {
    tqDebug("tmq poll: consumer:0x%" PRIx64 " epoch update from %d to %d by poll req", consumerId, pHandle->epoch,
            reqEpoch);
    pHandle->epoch = reqEpoch;
  }

  char buf[TSDB_OFFSET_LEN] = {0};
  tFormatOffset(buf, TSDB_OFFSET_LEN, &reqOffset);
  tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey %s, recv poll req vgId:%d, req:%s, reqId:0x%" PRIx64,
          consumerId, req.epoch, pHandle->subKey, vgId, buf, req.reqId);

  code = tqExtractDataForMq(pTq, pHandle, &req, pMsg);
  tqSetHandleIdle(pHandle);

  tqDebug("tmq poll: consumer:0x%" PRIx64 " vgId:%d, topic:%s, set handle idle, pHandle:%p", consumerId, vgId,
          req.subKey, pHandle);
  return code;
}

577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613
/*
 * Answer a consumer's WAL-info (assignment) request with a log offset and the
 * version range of the handle's WAL reader.
 *
 * The returned offset is chosen as follows:
 * - an explicit log offset is echoed back;
 * - for a reset-type request (type < 0), the stored offset is used if present;
 * - otherwise the earliest or latest readable version is used.
 *
 * Returns 0 after sending the response. Returns -1 with terrno set when:
 * - the request cannot be decoded or the subscription is unknown
 *   (TSDB_CODE_INVALID_MSG);
 * - the subscription is owned by another consumer
 *   (TSDB_CODE_TMQ_CONSUMER_MISMATCH);
 * - the request uses snapshot mode, the stored offset is not a log offset, or
 *   the offset type is not supported (TSDB_CODE_INVALID_PARA).
 */
int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) {
  SMqPollReq req = {0};
  if (tDeserializeSMqPollReq(pMsg->pCont, pMsg->contLen, &req) < 0) {
    tqError("tDeserializeSMqPollReq %d failed", pMsg->contLen);
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  int32_t      vgId = TD_VID(pTq->pVnode);
  int64_t      consumerId = req.consumerId;
  STqOffsetVal reqOffset = req.reqOffset;

  // 1. locate the subscription handle
  STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
  if (pHandle == NULL) {
    tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s not found", consumerId, vgId, req.subKey);
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  // 2. the handle must still be assigned to the requesting consumer
  taosRLockLatch(&pTq->lock);
  if (pHandle->consumerId != consumerId) {
    tqDebug("ERROR consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64,
            consumerId, vgId, req.subKey, pHandle->consumerId);
    terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
    taosRUnLockLatch(&pTq->lock);
    return -1;
  }
  taosRUnLockLatch(&pTq->lock);

  int64_t sver = 0;
  int64_t ever = 0;
  walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever);

  SMqDataRsp dataRsp = {0};
  tqInitDataRsp(&dataRsp, &req);

  if (req.useSnapshot == true) {
    tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s snapshot not support wal info", consumerId, vgId, req.subKey);
    goto _invalid_para;
  }

  dataRsp.rspOffset.type = TMQ_OFFSET__LOG;

  if (reqOffset.type == TMQ_OFFSET__LOG) {
    dataRsp.rspOffset.version = reqOffset.version;
  } else if (reqOffset.type < 0) {
    STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, req.subKey);
    if (pOffset == NULL) {
      if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEST) {
        dataRsp.rspOffset.version = sver;  // not consume yet, set the earliest position
      } else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) {
        dataRsp.rspOffset.version = ever;
      }
      tqInfo("consumer:0x%" PRIx64 " vgId:%d subkey:%s get assignment from init:%" PRId64, consumerId, vgId,
             req.subKey, dataRsp.rspOffset.version);
    } else if (pOffset->val.type != TMQ_OFFSET__LOG) {
      tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s, no valid wal info", consumerId, vgId, req.subKey);
      goto _invalid_para;
    } else {
      dataRsp.rspOffset.version = pOffset->val.version;
      tqInfo("consumer:0x%" PRIx64 " vgId:%d subkey:%s get assignment from store:%" PRId64, consumerId, vgId,
             req.subKey, dataRsp.rspOffset.version);
    }
  } else {
    tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s invalid offset type:%d", consumerId, vgId, req.subKey,
            reqOffset.type);
    goto _invalid_para;
  }

  tqDoSendDataRsp(&pMsg->info, &dataRsp, req.epoch, req.consumerId, TMQ_MSG_TYPE__WALINFO_RSP, sver, ever);
  tDeleteMqDataRsp(&dataRsp);
  return 0;

_invalid_para:
  terrno = TSDB_CODE_INVALID_PARA;
  tDeleteMqDataRsp(&dataRsp);
  return -1;
}

658
/*
 * Drop the subscription identified by pReq->subKey from this vnode.
 *
 * The steps are:
 * - Wait until no poll is executing on the handle.
 * - Unregister the handle from the push manager, which answers any parked poll
 *   request.
 * - Close its WAL reference and remove the handle.
 * - Delete the stored offset and the persisted handle meta.
 *
 * Failures of the individual removals are only logged. Always returns 0.
 *
 * The exec flag is checked under the pTq write lock, but the lock is released
 * while sleeping. Sleeping with it held would block every other pTq->lock user
 * (poll, seek, subscribe, push) for the whole wait. It would also dead-lock if
 * the executing poll needs the lock to finish.
 *
 * pPushMgr stores raw STqHandle pointers, which tqProcessPollPush dereferences.
 * The handle must therefore leave the push manager before taosHashRemove()
 * frees it.
 */
int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  SMqVDeleteReq* pReq = (SMqVDeleteReq*)msg;
  int32_t        vgId = TD_VID(pTq->pVnode);
  int32_t        code = 0;

  tqDebug("vgId:%d, tq process delete sub req %s", vgId, pReq->subKey);

  taosWLockLatch(&pTq->lock);
  STqHandle* pHandle = taosHashGet(pTq->pHandle, pReq->subKey, strlen(pReq->subKey));
  while (pHandle != NULL && tqIsHandleExec(pHandle)) {
    tqDebug("vgId:%d, topic:%s, subscription is executing, wait for 10ms and retry, pHandle:%p", vgId,
            pHandle->subKey, pHandle);
    taosWUnLockLatch(&pTq->lock);
    taosMsleep(10);
    taosWLockLatch(&pTq->lock);
    // look the handle up again: it may have changed while the lock was released
    pHandle = taosHashGet(pTq->pHandle, pReq->subKey, strlen(pReq->subKey));
  }

  if (pHandle != NULL) {
    // answer a parked poll and drop the push-manager entry before the handle is freed
    tqUnregisterPushHandle(pTq, pHandle);

    if (pHandle->pRef) {
      walCloseRef(pTq->pVnode->pWal, pHandle->pRef->refId);
    }

    code = taosHashRemove(pTq->pHandle, pReq->subKey, strlen(pReq->subKey));
    if (code != 0) {
      tqError("cannot process tq delete req %s, since no such handle", pReq->subKey);
    }
  }

  code = tqOffsetDelete(pTq->pOffsetStore, pReq->subKey);
  if (code != 0) {
    tqError("cannot process tq delete req %s, since no such offset in cache", pReq->subKey);
  }

  if (tqMetaDeleteHandle(pTq, pReq->subKey) < 0) {
    tqError("cannot process tq delete req %s, since no such offset in tdb", pReq->subKey);
  }
  taosWUnLockLatch(&pTq->lock);

  return 0;
}

697
/*
 * Register column check info and persist it.
 *
 * msg is an encoded STqCheckInfo. The decoded info is stored in pTq->pCheckInfo
 * keyed by its topic, and the raw message is persisted via
 * tqMetaSaveCheckInfo(). tqCheckColModifiable() consults these entries.
 *
 * Returns 0 on success. Returns -1 with terrno set to:
 * - TSDB_CODE_INVALID_MSG for a malformed message;
 * - TSDB_CODE_OUT_OF_MEMORY when storing or persisting fails.
 */
int32_t tqProcessAddCheckInfoReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  STqCheckInfo info = {0};
  SDecoder     decoder;

  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
  if (tDecodeSTqCheckInfo(&decoder, &info) < 0) {
    // Malformed input is not an OOM condition. Release the decoder and anything
    // that was partially decoded into info.
    tDecoderClear(&decoder);
    tDeleteSTqCheckInfo(&info);
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }
  tDecoderClear(&decoder);

  if (taosHashPut(pTq->pCheckInfo, info.topic, strlen(info.topic), &info, sizeof(STqCheckInfo)) < 0) {
    // the table did not take ownership of info, so free its contents here
    tDeleteSTqCheckInfo(&info);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  if (tqMetaSaveCheckInfo(pTq, info.topic, msg, msgLen) < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  return 0;
}

717
/*
 * Remove the check info registered under the NUL-terminated topic name in msg.
 * It is removed first from pTq->pCheckInfo, then from the tq meta store. The
 * meta store is not touched if the in-memory removal fails.
 *
 * Returns 0 on success. Returns -1 with terrno = TSDB_CODE_OUT_OF_MEMORY if
 * either removal fails. NOTE(review): a failing taosHashRemove presumably means
 * "not found" rather than OOM; the error code is kept unchanged here.
 */
int32_t tqProcessDelCheckInfoReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  if (taosHashRemove(pTq->pCheckInfo, msg, strlen(msg)) < 0 || tqMetaDeleteCheckInfo(pTq, msg) < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  return 0;
}

729
/*
 * Apply a re-balance (subscribe) request for one subscription on this vnode.
 *
 * Existing handle (in memory or loadable from the tq meta store):
 * - switch it to req.newConsumerId;
 * - unregister it from the push manager, which returns an empty block to a
 *   waiting consumer;
 * - persist it again.
 *
 * No existing handle:
 * - build a new one with tqCreateHandle() and persist it;
 * - unless the request carries no new consumer (newConsumerId == -1), in which
 *   case it is only logged.
 *
 * Returns 0 on success, including the logged-and-ignored case. Returns -1 with
 * terrno = TSDB_CODE_INVALID_MSG on a decode failure. Otherwise returns the
 * result of tqCreateHandle()/tqMetaSaveHandle().
 */
int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  SMqRebVgReq req = {0};
  SDecoder    decoder = {0};
  int         ret = 0;

  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
  if (tDecodeSMqRebVgReq(&decoder, &req) < 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    tDecoderClear(&decoder);
    return -1;
  }

  tqDebug("vgId:%d, tq process sub req:%s, Id:0x%" PRIx64 " -> Id:0x%" PRIx64, pTq->pVnode->config.vgId, req.subKey,
          req.oldConsumerId, req.newConsumerId);

  // Look in memory first. If the handle is absent, try the meta store and look
  // again. NOTE(review): presumably tqMetaGetHandle() inserts the loaded handle
  // into pTq->pHandle on success; confirm.
  STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
  while (pHandle == NULL && tqMetaGetHandle(pTq, req.subKey) >= 0) {
    pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
  }

  if (pHandle != NULL) {
    taosWLockLatch(&pTq->lock);

    if (pHandle->consumerId != req.newConsumerId) {
      tqInfo("vgId:%d switch consumer from Id:0x%" PRIx64 " to Id:0x%" PRIx64, req.vgId, pHandle->consumerId,
             req.newConsumerId);
      atomic_store_64(&pHandle->consumerId, req.newConsumerId);
    } else {
      tqInfo("vgId:%d consumer:0x%" PRIx64 " remains, no switch occurs, should not reach here", req.vgId,
             req.newConsumerId);
    }

    // remove if it has been register in the push manager, and return one empty block to consumer
    tqUnregisterPushHandle(pTq, pHandle);
    taosWUnLockLatch(&pTq->lock);

    ret = tqMetaSaveHandle(pTq, req.subKey, pHandle);
    goto end;
  }

  if (req.oldConsumerId != -1) {
    tqError("vgId:%d, build new consumer handle %s for consumer:0x%" PRIx64 ", but old consumerId:0x%" PRIx64,
            req.vgId, req.subKey, req.newConsumerId, req.oldConsumerId);
  }

  if (req.newConsumerId == -1) {
    tqError("vgId:%d, tq invalid re-balance request, new consumerId %" PRId64 "", req.vgId, req.newConsumerId);
    goto end;
  }

  {
    STqHandle newHandle = {0};
    ret = tqCreateHandle(pTq, &req, &newHandle);
    if (ret < 0) {
      tqDestroyTqHandle(&newHandle);
      goto end;
    }
    ret = tqMetaSaveHandle(pTq, req.subKey, &newHandle);
  }

end:
  tDecoderClear(&decoder);
  return ret;
}
804

dengyihao's avatar
dengyihao 已提交
805
// Free callback for containers whose stored value is itself a heap pointer:
// `ptr` is the address of that stored pointer, and the pointee is released.
void freePtr(void* ptr) {
  void** ppStored = (void**)ptr;
  taosMemoryFree(*ppStored);
}
L
liuyao 已提交
806

807
// Opens the state backend of pTask and stores it in pTask->pState.
// For a fill-history task the backend is opened with a temporary task shell whose id is the related stream
// task's id (pTask->streamTaskId); otherwise pTask itself is passed to streamStateOpen.
// Returns 0 on success, -1 when streamStateOpen returns NULL.
static int32_t tqExpandTaskOpenState(STQ* pTq, SStreamTask* pTask) {
  SStreamTask  shell = {0};
  SStreamTask* pStateTask = pTask;

  if (pTask->info.fillHistory) {
    shell.id = pTask->streamTaskId;
    shell.pMeta = pTask->pMeta;
    pStateTask = &shell;
  }

  pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pStateTask, false, -1, -1);
  return (pTask->pState == NULL) ? -1 : 0;
}

// Initializes the runtime parts of a decoded stream task: id string, input/output queues, version info,
// state backend and executor (source/agg levels), sink (SMA/table outputs), WAL reader (source level),
// lock and schedule trigger.
//   pTq  - owning tq instance
//   pTask- task to expand; modified in place
//   ver  - version used as the initial checkpoint/current version and as the initial version range
// Returns 0 on success, -1 on any initialization failure (resources already attached to pTask are left for
// the caller to release).
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
  int32_t vgId = TD_VID(pTq->pVnode);

  pTask->id.idStr = createStreamTaskIdStr(pTask->id.streamId, pTask->id.taskId);
  pTask->refCnt = 1;
  pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE;
  pTask->inputQueue = streamQueueOpen(512 << 10);
  pTask->outputInfo.queue = streamQueueOpen(512 << 10);

  if (pTask->inputQueue == NULL || pTask->outputInfo.queue == NULL) {
    tqError("s-task:%s failed to prepare the input/output queue, initialize task failed", pTask->id.idStr);
    return -1;
  }

  pTask->initTs = taosGetTimestampMs();
  pTask->inputStatus = TASK_INPUT_STATUS__NORMAL;
  pTask->outputInfo.status = TASK_OUTPUT_STATUS__NORMAL;
  pTask->pMsgCb = &pTq->pVnode->msgCb;
  pTask->pMeta = pTq->pStreamMeta;

  // the checkpoint version, current version and the scan version range all start from ver
  pTask->chkInfo.version = ver;
  pTask->chkInfo.currentVer = ver;

  pTask->dataRange.range.maxVer = ver;
  pTask->dataRange.range.minVer = ver;

  if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
    if (tqExpandTaskOpenState(pTq, pTask) < 0) {
      return -1;
    }

    SReadHandle handle = {.vnode = pTq->pVnode,
                          .initTqReader = 1,
                          .pStateBackend = pTask->pState,
                          .fillHistory = pTask->info.fillHistory,
                          .winRange = pTask->dataRange.window};
    initStorageAPI(&handle.api);

    pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, vgId);
    if (pTask->exec.pExecutor == NULL) {
      return -1;
    }

    qSetTaskId(pTask->exec.pExecutor, pTask->id.taskId, pTask->id.streamId);
  } else if (pTask->info.taskLevel == TASK_LEVEL__AGG) {
    if (tqExpandTaskOpenState(pTq, pTask) < 0) {
      return -1;
    }

    int32_t     numOfVgroups = (int32_t)taosArrayGetSize(pTask->pUpstreamEpInfoList);
    SReadHandle handle = {.vnode = NULL,
                          .numOfVgroups = numOfVgroups,
                          .pStateBackend = pTask->pState,
                          .fillHistory = pTask->info.fillHistory,
                          .winRange = pTask->dataRange.window};
    initStorageAPI(&handle.api);

    pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, vgId);
    if (pTask->exec.pExecutor == NULL) {
      return -1;
    }
    qSetTaskId(pTask->exec.pExecutor, pTask->id.taskId, pTask->id.streamId);
  }

  // sink
  if (pTask->outputInfo.type == TASK_OUTPUT__SMA) {
    pTask->smaSink.vnode = pTq->pVnode;
    pTask->smaSink.smaSink = smaHandleRes;
  } else if (pTask->outputInfo.type == TASK_OUTPUT__TABLE) {
    pTask->tbSink.vnode = pTq->pVnode;
    pTask->tbSink.tbSinkFunc = tqSinkToTablePipeline;

    // use the schema version of the destination super table when it can be found, otherwise fall back to 1
    int32_t   ver1 = 1;
    SMetaInfo info = {0};
    int32_t   code = metaGetInfo(pTq->pVnode->pMeta, pTask->tbSink.stbUid, &info, NULL);
    if (code == TSDB_CODE_SUCCESS) {
      ver1 = info.skmVer;
    }

    SSchemaWrapper* pschemaWrapper = pTask->tbSink.pSchemaWrapper;
    pTask->tbSink.pTSchema = tBuildTSchema(pschemaWrapper->pSchema, pschemaWrapper->nCols, ver1);
    if (pTask->tbSink.pTSchema == NULL) {
      return -1;
    }

    pTask->tbSink.pTblInfo = tSimpleHashInit(10240, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
    if (pTask->tbSink.pTblInfo == NULL) {  // must not hand a NULL table to tSimpleHashSetFreeFp
      return -1;
    }
    tSimpleHashSetFreeFp(pTask->tbSink.pTblInfo, freePtr);
  }

  if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
    SWalFilterCond cond = {.deleteMsg = 1};  // delete msg also extract from wal files
    pTask->exec.pWalReader = walOpenReader(pTq->pVnode->pWal, &cond);
    if (pTask->exec.pWalReader == NULL) {
      tqError("s-task:%s failed to open wal reader", pTask->id.idStr);
      return -1;
    }
  }

  // reset the task status from unfinished transaction
  if (pTask->status.taskStatus == TASK_STATUS__PAUSE) {
    tqWarn("s-task:%s reset task status to be normal, kept in meta status: Paused", pTask->id.idStr);
    pTask->status.taskStatus = TASK_STATUS__NORMAL;
  }

  taosThreadMutexInit(&pTask->lock, NULL);
  streamSetupScheduleTrigger(pTask);

  tqInfo("vgId:%d expand stream task, s-task:%s, checkpoint ver:%" PRId64
         " child id:%d, level:%d, fill-history:%d, trigger:%" PRId64 " ms, disable pause",
         vgId, pTask->id.idStr, pTask->chkInfo.version, pTask->info.selfChildId, pTask->info.taskLevel,
         pTask->info.fillHistory, pTask->triggerParam);

  // next valid version will add one
  pTask->chkInfo.version += 1;
  return 0;
}
L
Liu Jicong 已提交
936

937
// Handles a downstream-readiness check request from an upstream task.
// Decodes the request, reports the status of the local downstream task (0 if the task is not built yet)
// and sends the check response back via streamSendCheckRsp.
// Returns the result of streamSendCheckRsp, or -1 if the request cannot be decoded.
int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) {
  char*   msgStr = pMsg->pCont;
  char*   msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
  int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);

  SStreamTaskCheckReq req;
  SDecoder            decoder;

  tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
  int32_t code = tDecodeStreamTaskCheckReq(&decoder, &req);
  tDecoderClear(&decoder);
  if (code < 0) {
    tqError("vgId:%d failed to decode stream task check req", pTq->pStreamMeta->vgId);
    return -1;
  }

  int32_t taskId = req.downstreamTaskId;

  SStreamTaskCheckRsp rsp = {
      .reqId = req.reqId,
      .streamId = req.streamId,
      .childId = req.childId,
      .downstreamNodeId = req.downstreamNodeId,
      .downstreamTaskId = req.downstreamTaskId,
      .upstreamNodeId = req.upstreamNodeId,
      .upstreamTaskId = req.upstreamTaskId,
  };

  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);
  if (pTask != NULL) {
    rsp.status = streamTaskCheckStatus(pTask);

    // log while the reference is still held; pTask must not be touched after it is released
    tqDebug("s-task:%s recv task check req(reqId:0x%" PRIx64 ") task:0x%x (vgId:%d), status:%s, rsp status %d",
            pTask->id.idStr, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId,
            streamGetTaskStatusStr(pTask->status.taskStatus), rsp.status);
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
  } else {
    rsp.status = 0;
    tqDebug("tq recv task check(taskId:0x%x not built yet) req(reqId:0x%" PRIx64 ") from task:0x%x (vgId:%d), rsp status %d",
            taskId, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);
  }

  return streamSendCheckRsp(pTq->pStreamMeta, &req, &rsp, &pMsg->info, taskId);
}

978 979 980 981
// Handles the response to a downstream-readiness check sent by a local upstream task.
// Returns -1 when the response cannot be decoded or the upstream task no longer exists
// (terrno is set to TSDB_CODE_STREAM_TASK_NOT_EXIST in the latter case); otherwise the
// result of streamProcessCheckRsp.
int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t sversion, SRpcMsg* pMsg) {
  char*   pBody = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t bodyLen = pMsg->contLen - sizeof(SMsgHead);

  SStreamTaskCheckRsp rsp;
  SDecoder            decoder;

  tDecoderInit(&decoder, (uint8_t*)pBody, bodyLen);
  int32_t code = tDecodeStreamTaskCheckRsp(&decoder, &rsp);
  tDecoderClear(&decoder);

  if (code < 0) {
    return -1;
  }

  tqDebug("tq task:0x%x (vgId:%d) recv check rsp(reqId:0x%" PRIx64 ") from 0x%x (vgId:%d) status %d",
          rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.status);

  SStreamMeta* pMeta = pTq->pStreamMeta;
  SStreamTask* pUpstream = streamMetaAcquireTask(pMeta, rsp.upstreamTaskId);
  if (pUpstream == NULL) {
    tqError("tq failed to locate the stream task:0x%x (vgId:%d), it may have been destroyed", rsp.upstreamTaskId,
            pMeta->vgId);
    terrno = TSDB_CODE_STREAM_TASK_NOT_EXIST;
    return -1;
  }

  code = streamProcessCheckRsp(pUpstream, &rsp);
  streamMetaReleaseTask(pMeta, pUpstream);
  return code;
}

1011
// Handles a stream-task deploy request.
// Decodes the task from msg, registers it in the stream meta at version sversion, then asks the
// (re-acquired) task to check its downstream tasks.
// Returns 0 on success or when streams are disabled (tsDisableStream), -1 on allocation, decode or
// registration failure.
int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  int32_t code = 0;
  int32_t vgId = TD_VID(pTq->pVnode);

  if (tsDisableStream) {
    return 0;
  }

  // 1.deserialize msg and build task
  SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
  if (pTask == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    tqError("vgId:%d failed to create stream task due to out of memory, alloc size:%d", vgId,
            (int32_t)sizeof(SStreamTask));
    return -1;
  }

  SDecoder decoder;
  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
  code = tDecodeStreamTask(&decoder, pTask);
  tDecoderClear(&decoder);
  if (code < 0) {
    taosMemoryFree(pTask);
    return -1;
  }

  SStreamMeta* pStreamMeta = pTq->pStreamMeta;

  // 2.save task, use the newest commit version as the initial start version of stream task.
  taosWLockLatch(&pStreamMeta->lock);
  code = streamMetaRegisterTask(pStreamMeta, sversion, pTask);

  int32_t taskId = pTask->id.taskId;
  int32_t numOfTasks = streamMetaGetNumOfTasks(pStreamMeta);
  if (code < 0) {
    tqError("vgId:%d failed to add s-task:%s, total:%d", vgId, pTask->id.idStr, numOfTasks);
    tFreeStreamTask(pTask);
    taosWUnLockLatch(&pStreamMeta->lock);
    return -1;
  }

  // no reference on pTask is held here, so read it for logging before the meta lock is released
  tqDebug("vgId:%d s-task:%s is deployed and add into meta, status:%s, numOfTasks:%d", vgId, pTask->id.idStr,
          streamGetTaskStatusStr(pTask->status.taskStatus), numOfTasks);
  taosWUnLockLatch(&pStreamMeta->lock);

  // 3. acquire a reference to the registered task and start checking its downstream tasks; if the task can no
  //    longer be acquired, there is nothing to do (and nothing to release).
  SStreamTask* p = streamMetaAcquireTask(pStreamMeta, taskId);
  if (p != NULL) {
    streamTaskCheckDownstreamTasks(p);
    streamMetaReleaseTask(pStreamMeta, p);
  }

  return 0;
}

1069
// Runs the history-data scan of a stream task.
// Step 1 scans history data. For a fill-history task, step 2 then halts the related stream task (unless
// igUntreated is set), rescans up to the related task's WAL version, sends the transfer-state message and
// calls streamTryExec. For a normal stream task, the scan-history completion is reported via
// streamTaskScanHistoryDataComplete.
// Every reference acquired here is released on every return path.
// Returns 0 on success or abort (dropped/paused, already scheduled), -1 when a required task cannot be
// acquired, otherwise the result of streamTaskScanHistoryDataComplete.
int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
  int32_t code = TSDB_CODE_SUCCESS;
  char*   msg = pMsg->pCont;

  SStreamMeta*           pMeta = pTq->pStreamMeta;
  SStreamScanHistoryReq* pReq = (SStreamScanHistoryReq*)msg;

  SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->taskId);
  if (pTask == NULL) {
    tqError("vgId:%d failed to acquire stream task:0x%x during stream recover, task may have been destroyed",
            pMeta->vgId, pReq->taskId);
    return -1;
  }

  // do recovery step 1
  const char* id = pTask->id.idStr;
  const char* pStatus = streamGetTaskStatusStr(pTask->status.taskStatus);
  tqDebug("s-task:%s start history data scan stage(step 1), status:%s", id, pStatus);

  int64_t st = taosGetTimestampMs();

  // we have to continue retrying to successfully execute the scan history task.
  int8_t schedStatus = atomic_val_compare_exchange_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE,
                                                     TASK_SCHED_STATUS__WAITING);
  if (schedStatus != TASK_SCHED_STATUS__INACTIVE) {
    tqError(
        "s-task:%s failed to start scan-history in first stream time window since already started, unexpected "
        "sched-status:%d",
        id, schedStatus);
    streamMetaReleaseTask(pMeta, pTask);  // the reference acquired above must not leak
    return 0;
  }

  ASSERT(pTask->status.pauseAllowed == false);

  if (pTask->info.fillHistory == 1) {
    streamTaskEnablePause(pTask);
  }

  if (!streamTaskRecoverScanStep1Finished(pTask)) {
    streamSourceScanHistoryData(pTask);
  }

  // disable the pause when handling the step2 scan of tsdb data.
  // the whole next procedure can't be stopped.
  // todo fix it: the following procedure should be executed completed and then shutdown when trying to close vnode.
  if (pTask->info.fillHistory == 1) {
    streamTaskDisablePause(pTask);
  }

  if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING || streamTaskShouldPause(&pTask->status)) {
    tqDebug("s-task:%s is dropped or paused, abort recover in step1", id);
    atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
    streamMetaReleaseTask(pMeta, pTask);
    return 0;
  }

  double el = (taosGetTimestampMs() - st) / 1000.0;
  tqDebug("s-task:%s history data scan stage(step 1) ended, elapsed time:%.2fs", id, el);

  if (pTask->info.fillHistory) {
    // always point at the task's own range, so the step-2 log/setup below never dereferences NULL when the
    // related stream task is not halted (e.g. igUntreated is set)
    SVersionRange* pRange = &pTask->dataRange.range;
    SStreamTask*   pStreamTask = NULL;

    if (!pReq->igUntreated && !streamTaskRecoverScanStep1Finished(pTask)) {
      // 1. stop the related stream task, get the current scan wal version of stream task, ver.
      pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.taskId);
      if (pStreamTask == NULL) {
        tqError("failed to find s-task:0x%x, it may have been destroyed, drop fill-history task:%s",
                pTask->streamTaskId.taskId, pTask->id.idStr);

        pTask->status.taskStatus = TASK_STATUS__DROPPING;
        tqDebug("s-task:%s fill-history task set status to be dropping", id);

        streamMetaSaveTask(pMeta, pTask);
        streamMetaReleaseTask(pMeta, pTask);
        return -1;
      }

      ASSERT(pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE);

      // stream task in TASK_STATUS__SCAN_HISTORY can not be paused.
      // wait for the stream task get ready for scan history data
      while (pStreamTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) {
        tqDebug(
            "s-task:%s level:%d related stream task:%s(status:%s) not ready for halt, wait for it and recheck in 100ms",
            id, pTask->info.taskLevel, pStreamTask->id.idStr, streamGetTaskStatusStr(pStreamTask->status.taskStatus));
        taosMsleep(100);
      }

      // now we can stop the stream task execution
      streamTaskHalt(pStreamTask);
      tqDebug("s-task:%s level:%d is halt by fill-history task:%s", pStreamTask->id.idStr, pStreamTask->info.taskLevel,
              id);

      // if it's an source task, extract the last version in wal.
      int64_t latestVer = walReaderGetCurrentVer(pStreamTask->exec.pWalReader);
      streamHistoryTaskSetVerRangeStep2(pTask, latestVer);
    }

    if (!streamTaskRecoverScanStep1Finished(pTask)) {
      STimeWindow* pWindow = &pTask->dataRange.window;
      tqDebug("s-task:%s level:%d verRange:%" PRId64 " - %" PRId64 " window:%" PRId64 "-%" PRId64
              ", do secondary scan-history data after halt the related stream task:%s",
              id, pTask->info.taskLevel, pRange->minVer, pRange->maxVer, pWindow->skey, pWindow->ekey, id);
      ASSERT(pTask->status.schedStatus == TASK_SCHED_STATUS__WAITING);

      st = taosGetTimestampMs();
      streamSetParamForStreamScannerStep2(pTask, pRange, pWindow);
    }

    if (!streamTaskRecoverScanStep2Finished(pTask)) {
      streamSourceScanHistoryData(pTask);

      if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING || streamTaskShouldPause(&pTask->status)) {
        tqDebug("s-task:%s is dropped or paused, abort recover in step2", id);
        streamMetaReleaseTask(pMeta, pTask);
        if (pStreamTask != NULL) {
          streamMetaReleaseTask(pMeta, pStreamTask);
        }
        return 0;
      }

      streamTaskRecoverSetAllStepFinished(pTask);
    }

    el = (taosGetTimestampMs() - st) / 1000.0;
    tqDebug("s-task:%s history data scan stage(step 2) ended, elapsed time:%.2fs", id, el);

    // 3. notify downstream tasks to transfer executor state after handle all history blocks.
    if (!pTask->status.transferState) {
      code = streamDispatchTransferStateMsg(pTask);
      if (code != TSDB_CODE_SUCCESS) {
        // todo handle error
      }

      pTask->status.transferState = true;
    }

    // 4. 1) transfer the ownership of executor state, 2) update the scan data range for source task.
    // 5. resume the related stream task.
    streamTryExec(pTask);

    streamMetaReleaseTask(pMeta, pTask);
    if (pStreamTask != NULL) {  // only acquired when the related stream task was halted above
      streamMetaReleaseTask(pMeta, pStreamTask);
    }
  } else {
    // todo update the chkInfo version for current task.
    // this task has an associated history stream task, so we need to scan wal from the end version of
    // history scan. The current version of chkInfo.current is not updated during the history scan
    STimeWindow* pWindow = &pTask->dataRange.window;

    if (pTask->historyTaskId.taskId == 0) {
      *pWindow = (STimeWindow){INT64_MIN, INT64_MAX};
      tqDebug(
          "s-task:%s scan history in stream time window completed, no related fill history task, reset the time "
          "window:%" PRId64 " - %" PRId64,
          id, pWindow->skey, pWindow->ekey);
    } else {
      tqDebug(
          "s-task:%s scan history in stream time window completed, now start to handle data from WAL, start "
          "ver:%" PRId64 ", window:%" PRId64 " - %" PRId64,
          id, pTask->chkInfo.currentVer, pWindow->skey, pWindow->ekey);
    }

    // notify the downstream agg tasks that upstream tasks are ready to processing the WAL data, update the
    code = streamTaskScanHistoryDataComplete(pTask);
    streamMetaReleaseTask(pMeta, pTask);

    // when all source task complete to scan history data in stream time window, they are allowed to handle stream data
    // at the same time.
    return code;
  }

  return 0;
}

H
Haojun Liao 已提交
1242
// notify the downstream tasks to transfer executor state after handle all history blocks.
H
Haojun Liao 已提交
1243 1244 1245 1246 1247
int32_t tqProcessTaskTransferStateReq(STQ* pTq, SRpcMsg* pMsg) {
  char*   pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t len = pMsg->contLen - sizeof(SMsgHead);

  SStreamTransferReq req = {0};
H
Haojun Liao 已提交
1248

1249
  SDecoder decoder;
H
Haojun Liao 已提交
1250
  tDecoderInit(&decoder, (uint8_t*)pReq, len);
1251
  int32_t code = tDecodeStreamScanHistoryFinishReq(&decoder, &req);
H
Haojun Liao 已提交
1252
  tDecoderClear(&decoder);
H
Haojun Liao 已提交
1253

1254
  tqDebug("vgId:%d start to process transfer state msg, from s-task:0x%x", pTq->pStreamMeta->vgId, req.downstreamTaskId);
1255

1256
  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.downstreamTaskId);
1257
  if (pTask == NULL) {
1258
    tqError("failed to find task:0x%x, it may have been dropped already. process transfer state failed", req.downstreamTaskId);
1259 1260 1261
    return -1;
  }

H
Haojun Liao 已提交
1262 1263
  int32_t remain = streamAlignTransferState(pTask);
  if (remain > 0) {
1264
    tqDebug("s-task:%s receive upstream transfer state msg, remain:%d", pTask->id.idStr, remain);
H
Haojun Liao 已提交
1265
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
H
Haojun Liao 已提交
1266 1267 1268
    return 0;
  }

L
liuyao 已提交
1269
  // transfer the ownership of executor state
1270 1271
  tqDebug("s-task:%s all upstream tasks send transfer msg, open transfer state flag", pTask->id.idStr);
  ASSERT(pTask->streamTaskId.taskId != 0 && pTask->info.fillHistory == 1);
1272

1273
  pTask->status.transferState = true;
L
Liu Jicong 已提交
1274

H
Haojun Liao 已提交
1275
  streamSchedExec(pTask);
L
Liu Jicong 已提交
1276
  streamMetaReleaseTask(pTq->pStreamMeta, pTask);
1277 1278 1279
  return 0;
}

1280
// Handles a scan-history finish message sent by an upstream task to a local downstream task.
// Returns -1 if the message cannot be decoded or the target task is gone, otherwise the result of
// streamProcessScanHistoryFinishReq.
int32_t tqProcessTaskScanHistoryFinishReq(STQ* pTq, SRpcMsg* pMsg) {
  char*   msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);

  // deserialize
  SStreamScanHistoryFinishReq req = {0};

  SDecoder decoder;
  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
  int32_t code = tDecodeStreamScanHistoryFinishReq(&decoder, &req);
  tDecoderClear(&decoder);
  if (code < 0) {  // never act on a partially decoded request
    tqError("vgId:%d failed to decode scan-history finish msg", pTq->pStreamMeta->vgId);
    return -1;
  }

  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.downstreamTaskId);
  if (pTask == NULL) {
    tqError("vgId:%d process scan history finish msg, failed to find task:0x%x, it may be destroyed",
            pTq->pStreamMeta->vgId, req.downstreamTaskId);
    return -1;
  }

  tqDebug("s-task:%s receive scan-history finish msg from task:0x%x", pTask->id.idStr, req.upstreamTaskId);

  code = streamProcessScanHistoryFinishReq(pTask, &req, &pMsg->info);
  streamMetaReleaseTask(pTq->pStreamMeta, pTask);
  return code;
}
L
Liu Jicong 已提交
1305

1306 1307 1308 1309 1310 1311 1312 1313 1314 1315 1316 1317 1318 1319 1320 1321 1322 1323 1324 1325 1326
// Handles a downstream task's response to a scan-history finish message.
// Decrements the upstream task's notReadyTasks counter. When it reaches zero, all downstream tasks have
// replied and streamProcessScanHistoryFinishRsp is invoked.
// Returns 0 on success, -1 if the message cannot be decoded or the upstream task is gone.
int32_t tqProcessTaskScanHistoryFinishRsp(STQ* pTq, SRpcMsg* pMsg) {
  char*   msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);

  // deserialize
  SStreamCompleteHistoryMsg req = {0};

  SDecoder decoder;
  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
  int32_t code = tDecodeCompleteHistoryDataMsg(&decoder, &req);
  tDecoderClear(&decoder);
  if (code < 0) {  // a garbage response must not decrement the counter
    tqError("vgId:%d failed to decode scan-history finish rsp", pTq->pStreamMeta->vgId);
    return -1;
  }

  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.upstreamTaskId);
  if (pTask == NULL) {
    tqError("vgId:%d process scan history finish rsp, failed to find task:0x%x, it may be destroyed",
            pTq->pStreamMeta->vgId, req.upstreamTaskId);
    return -1;
  }

  int32_t remain = atomic_sub_fetch_32(&pTask->notReadyTasks, 1);
  if (remain > 0) {
    tqDebug("s-task:%s scan-history finish rsp received from downstream task:0x%x, remain:%d not send finish rsp",
            pTask->id.idStr, req.downstreamId, remain);
  } else {
    tqDebug(
        "s-task:%s scan-history finish rsp received from downstream task:0x%x, all downstream tasks rsp scan-history "
        "completed msg", pTask->id.idStr, req.downstreamId);
    streamProcessScanHistoryFinishRsp(pTask);
  }

  streamMetaReleaseTask(pTq->pStreamMeta, pTask);
  return 0;
}

1340 1341 1342 1343
// Decodes a delete-result payload and wraps it into a STREAM_DELETE_DATA block.
// The block has one row per deleted table uid; start/end ts come from the delete range, and the group id and
// calculate-range columns are set to NULL.
//   pData/len - encoded SDeleteRes
//   ver       - version stamped on the produced block
//   pRefBlock - out: the queue item holding the block, or NULL when nothing was deleted
// Returns TSDB_CODE_SUCCESS (with *pRefBlock possibly NULL), or TSDB_CODE_OUT_OF_MEMORY; on failure nothing
// allocated here is leaked.
int32_t extractDelDataBlock(const void* pData, int32_t len, int64_t ver, SStreamRefDataBlock** pRefBlock) {
  SDecoder*   pCoder = &(SDecoder){0};
  SDeleteRes* pRes = &(SDeleteRes){0};

  (*pRefBlock) = NULL;

  pRes->uidList = taosArrayInit(0, sizeof(tb_uid_t));
  if (pRes->uidList == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  tDecoderInit(pCoder, (uint8_t*)pData, len);
  tDecodeDeleteRes(pCoder, pRes);
  tDecoderClear(pCoder);

  int32_t numOfTables = taosArrayGetSize(pRes->uidList);
  if (numOfTables == 0 || pRes->affectedRows == 0) {
    taosArrayDestroy(pRes->uidList);
    return TSDB_CODE_SUCCESS;
  }

  SSDataBlock* pDelBlock = createSpecialDataBlock(STREAM_DELETE_DATA);
  if (pDelBlock == NULL) {
    taosArrayDestroy(pRes->uidList);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  if (blockDataEnsureCapacity(pDelBlock, numOfTables) != TSDB_CODE_SUCCESS) {
    blockDataDestroy(pDelBlock);
    taosArrayDestroy(pRes->uidList);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pDelBlock->info.rows = numOfTables;
  pDelBlock->info.version = ver;

  // the column layout is fixed for the whole block, so resolve the columns once
  SColumnInfoData* pStartCol = taosArrayGet(pDelBlock->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pEndCol = taosArrayGet(pDelBlock->pDataBlock, END_TS_COLUMN_INDEX);
  SColumnInfoData* pUidCol = taosArrayGet(pDelBlock->pDataBlock, UID_COLUMN_INDEX);
  SColumnInfoData* pGidCol = taosArrayGet(pDelBlock->pDataBlock, GROUPID_COLUMN_INDEX);
  SColumnInfoData* pCalStartCol = taosArrayGet(pDelBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
  SColumnInfoData* pCalEndCol = taosArrayGet(pDelBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);

  for (int32_t i = 0; i < numOfTables; i++) {
    colDataSetVal(pStartCol, i, (const char*)&pRes->skey, false);
    colDataSetVal(pEndCol, i, (const char*)&pRes->ekey, false);

    int64_t* pUid = taosArrayGet(pRes->uidList, i);
    colDataSetVal(pUidCol, i, (const char*)pUid, false);

    colDataSetNULL(pGidCol, i);
    colDataSetNULL(pCalStartCol, i);
    colDataSetNULL(pCalEndCol, i);
  }

  taosArrayDestroy(pRes->uidList);

  *pRefBlock = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0);
  if ((*pRefBlock) == NULL) {
    blockDataDestroy(pDelBlock);  // the block would otherwise be unreachable
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  (*pRefBlock)->type = STREAM_INPUT__REF_DATA_BLOCK;
  (*pRefBlock)->pBlock = pDelBlock;
  return TSDB_CODE_SUCCESS;
}

L
Liu Jicong 已提交
1393 1394
// Dispatches a run request.
// Two reserved task ids trigger vnode-wide actions: a status check of all stream tasks, or a WAL scan for all
// tasks. Any other id runs that task's input queue when the task is in NORMAL or SCAN_HISTORY status;
// otherwise its sched status is reset to INACTIVE.
// Returns 0 when handled, -1 when the task cannot be found.
int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
  SStreamTaskRunReq* pRunReq = pMsg->pCont;
  int32_t            taskId = pRunReq->taskId;
  int32_t            vgId = TD_VID(pTq->pVnode);

  if (taskId == STREAM_TASK_STATUS_CHECK_ID) {
    tqStreamTasksStatusCheck(pTq);
    return 0;
  }

  if (taskId == EXTRACT_DATA_FROM_WAL_ID) {  // all tasks are extracted submit data from the wal
    tqStreamTasksScanWal(pTq);
    return 0;
  }

  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);
  if (pTask == NULL) {
    tqError("vgId:%d failed to found s-task, taskId:%d", vgId, taskId);
    return -1;
  }

  // even in halt status, the data in inputQ must be processed
  int8_t taskStatus = pTask->status.taskStatus;
  if (taskStatus != TASK_STATUS__NORMAL && taskStatus != TASK_STATUS__SCAN_HISTORY) {
    atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
    tqDebug("vgId:%d s-task:%s ignore run req since not in ready state, status:%s, sched-status:%d", vgId,
            pTask->id.idStr, streamGetTaskStatusStr(pTask->status.taskStatus), pTask->status.schedStatus);
  } else {
    tqDebug("vgId:%d s-task:%s start to process block from inputQ, last chk point:%" PRId64, vgId, pTask->id.idStr,
            pTask->chkInfo.version);
    streamProcessRunReq(pTask);
  }

  streamMetaReleaseTask(pTq->pStreamMeta, pTask);
  tqStartStreamTasks(pTq);
  return 0;
}

L
Liu Jicong 已提交
1432
// Handles a data dispatch request from an upstream task.
// Decodes it and hands it to streamProcessDispatchMsg (exec is passed through). When the target task is gone,
// the decoded request is freed with tDeleteStreamDispatchReq.
// Returns 0 when the task exists, -1 otherwise.
int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec) {
  char*   msgStr = pMsg->pCont;
  char*   msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
  int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);

  SStreamDispatchReq req = {0};

  SDecoder decoder;
  tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
  tDecodeStreamDispatchReq(&decoder, &req);
  // release the decoder's internal state, as every other decode site in this file does
  tDecoderClear(&decoder);

  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.taskId);
  if (pTask) {
    SRpcMsg rsp = {.info = pMsg->info, .code = 0};
    streamProcessDispatchMsg(pTask, &req, &rsp, exec);
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
    return 0;
  } else {
    tDeleteStreamDispatchReq(&req);
    return -1;
  }
}

L
Liu Jicong 已提交
1455 1456
// Handles a downstream task's reply to a dispatch request.
// The upstream task id in the reply is in network byte order.
// Returns 0 when the upstream task exists, TSDB_CODE_INVALID_MSG otherwise.
int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) {
  SStreamDispatchRsp* pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  SStreamMeta*        pMeta = pTq->pStreamMeta;
  int32_t             taskId = ntohl(pRsp->upstreamTaskId);

  SStreamTask* pUpstream = streamMetaAcquireTask(pMeta, taskId);
  if (pUpstream == NULL) {
    tqDebug("vgId:%d failed to handle the dispatch rsp, since find task:0x%x failed", pMeta->vgId, taskId);
    return TSDB_CODE_INVALID_MSG;
  }

  streamProcessDispatchRsp(pUpstream, pRsp, pMsg->code);
  streamMetaReleaseTask(pMeta, pUpstream);
  return 0;
}
L
Liu Jicong 已提交
1470

1471
// Handles a request to drop a stream task: unregisters it from the stream meta while holding a reference.
// A task that cannot be acquired is only logged. Always returns 0.
int32_t tqProcessTaskDropReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  SVDropStreamTaskReq* pDropReq = (SVDropStreamTaskReq*)msg;
  SStreamMeta*         pMeta = pTq->pStreamMeta;

  tqDebug("vgId:%d receive msg to drop stream task:0x%x", TD_VID(pTq->pVnode), pDropReq->taskId);

  SStreamTask* pTask = streamMetaAcquireTask(pMeta, pDropReq->taskId);
  if (pTask != NULL) {
    streamMetaUnregisterTask(pMeta, pDropReq->taskId);
    streamMetaReleaseTask(pMeta, pTask);
  } else {
    tqError("vgId:%d failed to acquire s-task:0x%x when dropping it", pMeta->vgId, pDropReq->taskId);
  }

  return 0;
}
L
Liu Jicong 已提交
1484

5
54liuyao 已提交
1485 1486
// Handles a pause request from mnode: pauses the stream task and, if it has one, its fill-history task.
// A task that can no longer be acquired is treated as already paused. Always returns TSDB_CODE_SUCCESS.
int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  SVPauseStreamTaskReq* pPauseReq = (SVPauseStreamTaskReq*)msg;
  SStreamMeta*          pMeta = pTq->pStreamMeta;

  SStreamTask* pTask = streamMetaAcquireTask(pMeta, pPauseReq->taskId);
  if (pTask == NULL) {
    tqError("vgId:%d failed to acquire task:0x%x, it may have been dropped already", pMeta->vgId,
            pPauseReq->taskId);
    // the task is already in [STOP|DROPPING] state, so the pause is considered to be in effect
    return TSDB_CODE_SUCCESS;
  }

  tqDebug("s-task:%s receive pause msg from mnode", pTask->id.idStr);
  streamTaskPause(pTask);

  int32_t historyTaskId = pTask->historyTaskId.taskId;
  if (historyTaskId == 0) {  // no related fill-history task
    streamMetaReleaseTask(pMeta, pTask);
    return TSDB_CODE_SUCCESS;
  }

  SStreamTask* pHistoryTask = streamMetaAcquireTask(pMeta, historyTaskId);
  if (pHistoryTask == NULL) {
    tqError("vgId:%d failed to acquire fill-history task:0x%x, it may have been dropped already. Pause success",
            pMeta->vgId, historyTaskId);
    streamMetaReleaseTask(pMeta, pTask);
    // the fill-history task is already in [STOP|DROPPING] state, so the pause is considered to be in effect
    return TSDB_CODE_SUCCESS;
  }

  tqDebug("s-task:%s fill-history task handle paused along with related stream task", pHistoryTask->id.idStr);
  streamTaskPause(pHistoryTask);

  streamMetaReleaseTask(pMeta, pTask);
  streamMetaReleaseTask(pMeta, pHistoryTask);
  return TSDB_CODE_SUCCESS;
}

// Resume one stream task that the caller has already acquired.
// If the task is runnable (NORMAL or SCAN_HISTORY status), it is also rescheduled.
// Ownership: a non-NULL pTask is released via streamMetaReleaseTask before returning.
// The caller must not touch pTask after this call.
// Returns -1 when pTask is NULL, 0 otherwise.
int32_t tqProcessTaskResumeImpl(STQ* pTq, SStreamTask* pTask, int64_t sversion, int8_t igUntreated) {
  if (pTask == NULL) {
    return -1;
  }

  int32_t vgId = pTq->pStreamMeta->vgId;

  // todo: handle the case: resume from halt to pause/ from halt to normal/ from pause to normal
  streamTaskResume(pTask);

  int8_t taskStatus = pTask->status.taskStatus;
  bool   isRunnable = (taskStatus == TASK_STATUS__NORMAL) || (taskStatus == TASK_STATUS__SCAN_HISTORY);

  if (isRunnable) {
    bool isSource = (pTask->info.taskLevel == TASK_LEVEL__SOURCE);
    bool isFillHistory = (pTask->info.fillHistory != 0);

    // no lock needs to secure the access of the version
    if (igUntreated && isSource && !isFillHistory) {
      // discard all the data  when the stream task is suspended.
      walReaderSetSkipToVersion(pTask->exec.pWalReader, sversion);
      tqDebug("vgId:%d s-task:%s resume to exec, prev paused version:%" PRId64 ", start from vnode ver:%" PRId64
              ", schedStatus:%d",
              vgId, pTask->id.idStr, pTask->chkInfo.currentVer, sversion, pTask->status.schedStatus);
    } else {  // from the previous paused version and go on
      tqDebug("vgId:%d s-task:%s resume to exec, from paused ver:%" PRId64 ", vnode ver:%" PRId64 ", schedStatus:%d",
              vgId, pTask->id.idStr, pTask->chkInfo.currentVer, sversion, pTask->status.schedStatus);
    }

    // Choose how to kick the task off again:
    // - source fill-history tasks go through recovery;
    // - idle source tasks go through tqStartStreamTasks;
    // - everything else is scheduled directly.
    if (isSource && isFillHistory) {
      streamStartRecoverTask(pTask, igUntreated);
    } else if (isSource && taosQueueItemSize(pTask->inputQueue->queue) == 0) {
      tqStartStreamTasks(pTq);
    } else {
      streamSchedExec(pTask);
    }
  }

  streamMetaReleaseTask(pTq->pStreamMeta, pTask);
  return 0;
}

// Resume a stream task and, if it has one, its fill-history task.
// msg is interpreted as an SVResumeStreamTaskReq.
// Returns -1 if the stream task cannot be acquired.
// Otherwise returns the result of the last tqProcessTaskResumeImpl call.
int32_t tqProcessTaskResumeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  SVResumeStreamTaskReq* pReq = (SVResumeStreamTaskReq*)msg;
  SStreamMeta*           pMeta = pTq->pStreamMeta;

  SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->taskId);
  if (pTask == NULL) {
    tqError("vgId:%d failed to acquire task:0x%x when resuming it", pMeta->vgId, pReq->taskId);
    return -1;
  }

  // tqProcessTaskResumeImpl releases pTask before returning.
  // Read everything needed from it first; dereferencing it afterwards would be a use-after-release.
  int32_t historyTaskId = pTask->historyTaskId.taskId;

  int32_t code = tqProcessTaskResumeImpl(pTq, pTask, sversion, pReq->igUntreated);
  if (code != 0) {
    return code;
  }

  // A zero id means "no fill-history task" (same convention as tqProcessTaskPauseReq).
  if (historyTaskId != 0) {
    SStreamTask* pHistoryTask = streamMetaAcquireTask(pMeta, historyTaskId);
    if (pHistoryTask != NULL) {
      code = tqProcessTaskResumeImpl(pTq, pHistoryTask, sversion, pReq->igUntreated);
    }
  }

  return code;
}

L
Liu Jicong 已提交
1579
// Decode a stream retrieve request from pMsg and hand it to the destination task.
// The request is freed on every path.
// Returns 0 on success.
// Returns -1 if decoding fails or the destination task cannot be acquired.
int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg) {
  char*    msgStr = pMsg->pCont;
  char*    msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
  int32_t  msgLen = pMsg->contLen - sizeof(SMsgHead);
  SDecoder decoder;

  // Zero-initialised so that tDeleteStreamRetrieveReq never frees indeterminate pointers,
  // even if decoding stops part-way.
  SStreamRetrieveReq req = {0};
  tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
  int32_t decodeCode = tDecodeStreamRetrieveReq(&decoder, &req);
  tDecoderClear(&decoder);

  if (decodeCode < 0) {
    tqError("vgId:%d failed to decode stream retrieve req", pTq->pStreamMeta->vgId);
    tDeleteStreamRetrieveReq(&req);
    return -1;
  }

  int32_t      taskId = req.dstTaskId;
  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);
  if (pTask == NULL) {
    tqError("vgId:%d failed to acquire task:0x%x to process retrieve req", pTq->pStreamMeta->vgId, taskId);
    tDeleteStreamRetrieveReq(&req);
    return -1;
  }

  SRpcMsg rsp = {.info = pMsg->info, .code = 0};
  streamProcessRetrieveReq(pTask, &req, &rsp);

  streamMetaReleaseTask(pTq->pStreamMeta, pTask);
  tDeleteStreamRetrieveReq(&req);
  return 0;
}

// Handler for stream retrieve responses.
// Currently a no-op: neither argument is inspected, and it always returns 0.
int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg) {
  (void)pTq;
  (void)pMsg;
  return 0;
}
L
Liu Jicong 已提交
1610

1611 1612 1613 1614 1615 1616
// Decode a stream dispatch request and process it on the target task.
// On success, pMsg->pCont and pMsg itself are freed and 0 is returned.
// On failure (decode error or unknown task), an error SStreamDispatchRsp is sent back.
// That response carries TSDB_CODE_MSG_DECODE_ERROR or TSDB_CODE_STREAM_TASK_NOT_EXIST.
// In that case pMsg is freed and -1 is returned.
// If pMsg->info.handle is NULL, no response can be sent and -1 is returned without freeing pMsg.
int32_t vnodeEnqueueStreamMsg(SVnode* pVnode, SRpcMsg* pMsg) {
  STQ*         pTq = pVnode->pTq;
  SMsgHead*    msgStr = pMsg->pCont;
  char*        msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
  int32_t      msgLen = pMsg->contLen - sizeof(SMsgHead);
  int32_t      code = 0;
  int32_t      taskId = 0;
  SStreamTask* pTask = NULL;

  // Zero-initialised: the error path below reads req's scalar fields even after a failed decode.
  // tDeleteStreamDispatchReq must also be safe on a partially decoded request.
  SStreamDispatchReq req = {0};
  SDecoder           decoder;

  tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
  int32_t decodeCode = tDecodeStreamDispatchReq(&decoder, &req);
  tDecoderClear(&decoder);

  // Assigned before any goto so the FAIL path never reads an indeterminate value.
  // Stays 0 unless the decoder got far enough to fill the field in.
  taskId = req.taskId;

  if (decodeCode < 0) {
    code = TSDB_CODE_MSG_DECODE_ERROR;
    tDeleteStreamDispatchReq(&req);  // release whatever the partial decode allocated
    goto FAIL;
  }

  pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);
  if (pTask != NULL) {
    SRpcMsg rsp = {.info = pMsg->info, .code = 0};
    streamProcessDispatchMsg(pTask, &req, &rsp, false);
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
    rpcFreeCont(pMsg->pCont);
    taosFreeQitem(pMsg);
    return 0;
  }

  tDeleteStreamDispatchReq(&req);
  code = TSDB_CODE_STREAM_TASK_NOT_EXIST;

FAIL:
  if (pMsg->info.handle == NULL) {
    // NOTE(review): unlike every other failure path, pMsg is not freed here.
    // Confirm whether the caller owns it in this case.
    tqError("s-task:0x%x vgId:%d msg handle is null, abort enqueue dispatch msg", taskId, pTq->pStreamMeta->vgId);
    return -1;
  }

  SMsgHead* pRspHead = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamDispatchRsp));
  if (pRspHead == NULL) {
    SRpcMsg rsp = {.code = TSDB_CODE_OUT_OF_MEMORY, .info = pMsg->info};
    // Log the code actually sent back, not the original failure code.
    tqError("s-task:0x%x send dispatch error rsp, code:%s", taskId, tstrerror(rsp.code));
    tmsgSendRsp(&rsp);
    rpcFreeCont(pMsg->pCont);
    taosFreeQitem(pMsg);
    return -1;
  }

  // Fill in the error response; multi-byte fields are converted to network byte order.
  pRspHead->vgId = htonl(req.upstreamNodeId);
  SStreamDispatchRsp* pRsp = POINTER_SHIFT(pRspHead, sizeof(SMsgHead));
  pRsp->streamId = htobe64(req.streamId);
  pRsp->upstreamTaskId = htonl(req.upstreamTaskId);
  pRsp->upstreamNodeId = htonl(req.upstreamNodeId);
  pRsp->downstreamNodeId = htonl(pVnode->config.vgId);
  pRsp->downstreamTaskId = htonl(req.taskId);
  pRsp->inputStatus = TASK_OUTPUT_STATUS__NORMAL;

  int32_t len = sizeof(SMsgHead) + sizeof(SStreamDispatchRsp);
  SRpcMsg rsp = {.code = code, .info = pMsg->info, .contLen = len, .pCont = pRspHead};
  tqError("s-task:0x%x send dispatch error rsp, code:%s", taskId, tstrerror(code));

  tmsgSendRsp(&rsp);
  rpcFreeCont(pMsg->pCont);
  taosFreeQitem(pMsg);
  return -1;
}
L
Liu Jicong 已提交
1678

1679
// Returns 1 when sversion is not newer than pTq->walLogLastVer, 0 otherwise.
int32_t tqCheckLogInWal(STQ* pTq, int64_t sversion) {
  return (pTq->walLogLastVer >= sversion) ? 1 : 0;
}
1680