tq.c 57.0 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
S
Shengliang Guan 已提交
14 15
 */

H
Hongze Cheng 已提交
16
#include "tq.h"
S
Shengliang Guan 已提交
17

dengyihao's avatar
dengyihao 已提交
18 19 20
// States of tqMgmt.inited:
// 0: not initialized
// 1: already initialized
// 2: initialization or cleanup in progress (other callers wait)
21
static int32_t tqInitialize(STQ* pTq);
dengyihao's avatar
dengyihao 已提交
22

wmmhello's avatar
wmmhello 已提交
23
// Returns true when the handle's status field equals TMQ_HANDLE_STATUS_EXEC.
static FORCE_INLINE bool tqIsHandleExec(STqHandle* pHandle) {
  return pHandle->status == TMQ_HANDLE_STATUS_EXEC;
}
dengyihao's avatar
dengyihao 已提交
24 25
// Sets the handle's status field to TMQ_HANDLE_STATUS_EXEC.
static FORCE_INLINE void tqSetHandleExec(STqHandle* pHandle) {
  pHandle->status = TMQ_HANDLE_STATUS_EXEC;
}

// Sets the handle's status field to TMQ_HANDLE_STATUS_IDLE.
static FORCE_INLINE void tqSetHandleIdle(STqHandle* pHandle) {
  pHandle->status = TMQ_HANDLE_STATUS_IDLE;
}
wmmhello's avatar
wmmhello 已提交
26

L
Liu Jicong 已提交
27
/**
 * Initializes the module-wide TQ state held in tqMgmt (timer and stream layer).
 *
 * tqMgmt.inited is used as a small state machine: 0 = not initialized,
 * 1 = initialized, 2 = an init/cleanup transition is in progress. The caller
 * that moves the flag from 0 to 2 performs the initialization; callers that
 * observe 2 spin until the transition finishes.
 *
 * @return 0 on success (including "already initialized"), -1 on failure.
 */
int32_t tqInit() {
  int8_t old;
  while (1) {
    old = atomic_val_compare_exchange_8(&tqMgmt.inited, 0, 2);
    if (old != 2) break;
  }

  if (old == 0) {
    tqMgmt.timer = taosTmrInit(10000, 100, 10000, "TQ");
    if (tqMgmt.timer == NULL) {
      atomic_store_8(&tqMgmt.inited, 0);
      return -1;
    }

    if (streamInit() < 0) {
      // Roll back: release the timer and leave the transitional state, otherwise
      // the flag stays at 2 and every later tqInit()/tqCleanUp() spins forever.
      taosTmrCleanUp(tqMgmt.timer);
      tqMgmt.timer = NULL;
      atomic_store_8(&tqMgmt.inited, 0);
      return -1;
    }

    atomic_store_8(&tqMgmt.inited, 1);
  }

  return 0;
}
L
Liu Jicong 已提交
48

49
/**
 * Tears down the module-wide TQ state if it is currently initialized.
 *
 * Moves tqMgmt.inited from 1 to the transitional value 2, releases the timer
 * and the stream layer, then resets the flag to 0. Spins while another caller
 * holds the transitional state; does nothing when the state was not 1.
 */
void tqCleanUp() {
  int8_t prev;
  do {
    prev = atomic_val_compare_exchange_8(&tqMgmt.inited, 1, 2);
  } while (prev == 2);

  if (prev != 1) {
    return;
  }

  taosTmrCleanUp(tqMgmt.timer);
  streamCleanUp();
  atomic_store_8(&tqMgmt.inited, 0);
}
L
Liu Jicong 已提交
62

63
void tqDestroyTqHandle(void* data) {
64 65
  STqHandle* pData = (STqHandle*)data;
  qDestroyTask(pData->execHandle.task);
wmmhello's avatar
wmmhello 已提交
66

67
  if (pData->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
L
Liu Jicong 已提交
68
    taosMemoryFreeClear(pData->execHandle.execCol.qmsg);
69
  } else if (pData->execHandle.subType == TOPIC_SUB_TYPE__DB) {
70
    tqReaderClose(pData->execHandle.pTqReader);
71 72
    walCloseReader(pData->pWalReader);
    taosHashCleanup(pData->execHandle.execDb.pFilterOutTbUid);
L
Liu Jicong 已提交
73
  } else if (pData->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
74
    walCloseReader(pData->pWalReader);
75
    tqReaderClose(pData->execHandle.pTqReader);
76 77
    taosMemoryFreeClear(pData->execHandle.execTb.qmsg);
    nodesDestroyNode(pData->execHandle.execTb.node);
78
  }
dengyihao's avatar
dengyihao 已提交
79
  if (pData->msg != NULL) {
80 81 82
    rpcFreeCont(pData->msg->pCont);
    taosMemoryFree(pData->msg);
    pData->msg = NULL;
D
dapan1121 已提交
83
  }
L
Liu Jicong 已提交
84 85
}

86 87 88 89 90
// Returns true only when both offsets are of type TMQ_OFFSET__LOG and the left
// version is not greater than the right version.
static bool tqOffsetLessOrEqual(const STqOffset* pLeft, const STqOffset* pRight) {
  if (pLeft->val.type != TMQ_OFFSET__LOG) {
    return false;
  }
  if (pRight->val.type != TMQ_OFFSET__LOG) {
    return false;
  }
  return pLeft->val.version <= pRight->val.version;
}

L
Liu Jicong 已提交
91
/**
 * Allocates and opens a TQ instance for the given vnode.
 *
 * @param path   path stored (as a private copy) in the returned object
 * @param pVnode owning vnode; its WAL's last version is recorded in walLogLastVer
 * @return the new STQ on success; NULL on failure. terrno is set to
 *         TSDB_CODE_OUT_OF_MEMORY when an allocation fails.
 */
STQ* tqOpen(const char* path, SVnode* pVnode) {
  STQ* pTq = taosMemoryCalloc(1, sizeof(STQ));
  if (pTq == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pTq->path = taosStrdup(path);
  pTq->pVnode = pVnode;
  pTq->walLogLastVer = pVnode->pWal->vers.lastVer;
  taosInitRWLatch(&pTq->lock);

  pTq->pHandle = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);
  pTq->pPushMgr = taosHashInit(64, MurmurHash3_32, false, HASH_NO_LOCK);
  pTq->pCheckInfo = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);

  // Fail early instead of continuing with a half-built object; tqClose() is the
  // same teardown used below when tqInitialize() fails part-way through.
  if (pTq->path == NULL || pTq->pHandle == NULL || pTq->pPushMgr == NULL || pTq->pCheckInfo == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    tqClose(pTq);
    return NULL;
  }

  taosHashSetFreeFp(pTq->pHandle, tqDestroyTqHandle);
  taosHashSetFreeFp(pTq->pCheckInfo, (FDelete)tDeleteSTqCheckInfo);

  int32_t code = tqInitialize(pTq);
  if (code != TSDB_CODE_SUCCESS) {
    tqClose(pTq);
    return NULL;
  }

  return pTq;
}

/**
 * Opens the persistent parts of a TQ instance, in order: the meta store, the
 * offset store, the stream meta, and finally the stream tasks.
 *
 * Nothing is rolled back here on failure; tqOpen() calls tqClose() in that case.
 *
 * @return 0 on success, -1 as soon as any step fails.
 */
int32_t tqInitialize(STQ* pTq) {
  if (tqMetaOpen(pTq) < 0) {
    return -1;
  }

  pTq->pOffsetStore = tqOffsetOpen(pTq);
  if (NULL == pTq->pOffsetStore) {
    return -1;
  }

  int32_t vgId = pTq->pVnode->config.vgId;
  pTq->pStreamMeta = streamMetaOpen(pTq->path, pTq, (FTaskExpand*)tqExpandTask, vgId);
  if (NULL == pTq->pStreamMeta) {
    return -1;
  }

  // the version is kept in task's meta data
  // todo check if this version is required or not
  int32_t loadCode = streamLoadTasks(pTq->pStreamMeta, walGetCommittedVer(pTq->pVnode->pWal));
  return (loadCode < 0) ? -1 : 0;
}
L
Liu Jicong 已提交
143

L
Liu Jicong 已提交
144
/**
 * Closes a TQ instance and frees it. A NULL argument is a no-op.
 *
 * Also used by tqOpen() to tear down an instance whose initialization failed.
 */
void tqClose(STQ* pTq) {
  if (pTq != NULL) {
    tqOffsetClose(pTq->pOffsetStore);

    // in-memory tables
    taosHashCleanup(pTq->pHandle);
    taosHashCleanup(pTq->pPushMgr);
    taosHashCleanup(pTq->pCheckInfo);

    taosMemoryFree(pTq->path);

    // persistent stores
    tqMetaClose(pTq);
    streamMetaClose(pTq->pStreamMeta);

    taosMemoryFree(pTq);
  }
}
L
Liu Jicong 已提交
158

159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181
// Scans every task in pMeta->pTasks while holding the meta write latch and
// reports whether any of them has status.timerActive == 1.
static bool hasStreamTaskInTimer(SStreamMeta* pMeta) {
  bool found = false;

  taosWLockLatch(&pMeta->lock);

  // The scan always runs to the end of the table.
  for (void* it = taosHashIterate(pMeta->pTasks, NULL); it != NULL; it = taosHashIterate(pMeta->pTasks, it)) {
    SStreamTask* pTask = *(SStreamTask**)it;
    if (pTask->status.timerActive == 1) {
      found = true;
    }
  }

  taosWUnLockLatch(&pMeta->lock);

  return found;
}

H
Haojun Liao 已提交
182 183 184 185 186 187 188 189 190 191 192 193
/**
 * Prepares the stream tasks of this TQ for shutdown.
 *
 * Under the stream-meta write latch, every task is set to TASK_STATUS__STOP and
 * its executor is killed. Afterwards the function polls every 100 ms until no
 * task reports an active timer. A NULL pTq is a no-op.
 */
void tqNotifyClose(STQ* pTq) {
  if (pTq == NULL) {
    return;
  }

  taosWLockLatch(&pTq->pStreamMeta->lock);

  for (void* it = taosHashIterate(pTq->pStreamMeta->pTasks, NULL); it != NULL;
       it = taosHashIterate(pTq->pStreamMeta->pTasks, it)) {
    SStreamTask* pTask = *(SStreamTask**)it;
    tqDebug("vgId:%d s-task:%s set closing flag", pTq->pStreamMeta->vgId, pTask->id.idStr);
    pTask->status.taskStatus = TASK_STATUS__STOP;

    // measure how long killing the executor takes
    int64_t killStart = taosGetTimestampMs();
    qKillTask(pTask->exec.pExecutor, TSDB_CODE_SUCCESS);
    int64_t killElapsed = taosGetTimestampMs() - killStart;

    tqDebug("vgId:%d s-task:%s is closed in %" PRId64 " ms", pTq->pStreamMeta->vgId, pTask->id.idStr, killElapsed);
  }

  taosWUnLockLatch(&pTq->pStreamMeta->lock);

  tqDebug("vgId:%d start to check all tasks", pTq->pStreamMeta->vgId);

  int64_t waitStart = taosGetTimestampMs();

  while (hasStreamTaskInTimer(pTq->pStreamMeta)) {
    tqDebug("vgId:%d some tasks in timer, wait for 100ms and recheck", pTq->pStreamMeta->vgId);
    taosMsleep(100);
  }

  int64_t waitElapsed = taosGetTimestampMs() - waitStart;
  tqDebug("vgId:%d all stream tasks are not in timer, continue close, elapsed time:%"PRId64" ms", pTq->pStreamMeta->vgId, waitElapsed);
}

220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267
//static int32_t doSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* pRsp, int32_t epoch,
//                             int64_t consumerId, int32_t type) {
//  int32_t len = 0;
//  int32_t code = 0;
//
//  if (type == TMQ_MSG_TYPE__POLL_DATA_RSP) {
//    tEncodeSize(tEncodeMqDataRsp, pRsp, len, code);
//  } else if (type == TMQ_MSG_TYPE__POLL_DATA_META_RSP) {
//    tEncodeSize(tEncodeSTaosxRsp, (STaosxRsp*)pRsp, len, code);
//  }
//
//  if (code < 0) {
//    return -1;
//  }
//
//  int32_t tlen = sizeof(SMqRspHead) + len;
//  void*   buf = rpcMallocCont(tlen);
//  if (buf == NULL) {
//    return -1;
//  }
//
//  ((SMqRspHead*)buf)->mqMsgType = type;
//  ((SMqRspHead*)buf)->epoch = epoch;
//  ((SMqRspHead*)buf)->consumerId = consumerId;
//
//  void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
//
//  SEncoder encoder = {0};
//  tEncoderInit(&encoder, abuf, len);
//
//  if (type == TMQ_MSG_TYPE__POLL_DATA_RSP) {
//    tEncodeMqDataRsp(&encoder, pRsp);
//  } else if (type == TMQ_MSG_TYPE__POLL_DATA_META_RSP) {
//    tEncodeSTaosxRsp(&encoder, (STaosxRsp*)pRsp);
//  }
//
//  tEncoderClear(&encoder);
//
//  SRpcMsg rsp = {
//      .info = *pRpcHandleInfo,
//      .pCont = buf,
//      .contLen = tlen,
//      .code = 0,
//  };
//
//  tmsgSendRsp(&rsp);
//  return 0;
//}
L
Liu Jicong 已提交
268

269 270 271 272 273 274 275
/**
 * Answers the poll request stored in pHandle->msg with a data response that
 * carries no blocks and echoes the request offset as the response offset.
 *
 * @return 0 after the response has been handed to tqSendDataRsp(); -1 with
 *         terrno = TSDB_CODE_INVALID_MSG when the stored request cannot be
 *         deserialized.
 */
int32_t tqPushEmptyDataRsp(STqHandle* pHandle, int32_t vgId) {
  SMqPollReq pollReq = {0};
  int32_t    decodeCode = tDeserializeSMqPollReq(pHandle->msg->pCont, pHandle->msg->contLen, &pollReq);
  if (decodeCode < 0) {
    tqError("tDeserializeSMqPollReq %d failed", pHandle->msg->contLen);
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  SMqDataRsp emptyRsp = {0};
  tqInitDataRsp(&emptyRsp, &pollReq);
  emptyRsp.blockNum = 0;
  emptyRsp.rspOffset = emptyRsp.reqOffset;  // no progress: reply with the requested offset

  tqSendDataRsp(pHandle, pHandle->msg, &pollReq, &emptyRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
  tDeleteMqDataRsp(&emptyRsp);

  return 0;
}

286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305
//int32_t tqPushDataRsp(STqHandle* pHandle, int32_t vgId) {
//  SMqDataRsp dataRsp = {0};
//  dataRsp.head.consumerId = pHandle->consumerId;
//  dataRsp.head.epoch = pHandle->epoch;
//  dataRsp.head.mqMsgType = TMQ_MSG_TYPE__POLL_RSP;
//
//  int64_t sver = 0, ever = 0;
//  walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever);
//  tqDoSendDataRsp(&pHandle->msg->info, &dataRsp, pHandle->epoch, pHandle->consumerId, TMQ_MSG_TYPE__POLL_RSP, sver,
//                  ever);
//
//  char buf1[TSDB_OFFSET_LEN] = {0};
//  char buf2[TSDB_OFFSET_LEN] = {0};
//  tFormatOffset(buf1, tListLen(buf1), &dataRsp.reqOffset);
//  tFormatOffset(buf2, tListLen(buf2), &dataRsp.rspOffset);
//  tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) push rsp, block num: %d, req:%s, rsp:%s", vgId,
//          dataRsp.head.consumerId, dataRsp.head.epoch, dataRsp.blockNum, buf1, buf2);
//  return 0;
//}

306 307 308 309 310 311
/**
 * Sends a data response for a poll request and logs the request and response
 * offsets.
 *
 * The valid WAL version range of the handle's reader is queried and passed
 * along to tqDoSendDataRsp(). The result of tqDoSendDataRsp() is not inspected.
 *
 * @return always 0
 */
int32_t tqSendDataRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp,
                      int32_t type, int32_t vgId) {
  char    reqOffsetStr[TSDB_OFFSET_LEN] = {0};
  char    rspOffsetStr[TSDB_OFFSET_LEN] = {0};
  int64_t sver = 0;
  int64_t ever = 0;

  walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever);
  tqDoSendDataRsp(&pMsg->info, pRsp, pReq->epoch, pReq->consumerId, type, sver, ever);

  // render both offsets for the debug log only
  tFormatOffset(reqOffsetStr, TSDB_OFFSET_LEN, &pRsp->reqOffset);
  tFormatOffset(rspOffsetStr, TSDB_OFFSET_LEN, &pRsp->rspOffset);

  tqDebug("tmq poll vgId:%d consumer:0x%" PRIx64 " (epoch %d) send rsp, block num:%d, req:%s, rsp:%s, reqId:0x%" PRIx64, vgId,
          pReq->consumerId, pReq->epoch, pRsp->blockNum, reqOffsetStr, rspOffsetStr, pReq->reqId);

  return 0;
}

324
/**
 * Handles an offset-commit message: decodes the committed offset and writes it
 * to the offset store unless an equal or newer log offset is already saved.
 *
 * For log offsets, a committed version of exactly (sversion - 1) is bumped by
 * one before being stored.
 *
 * @param sversion version compared against the committed log version
 * @param msg      encoded SMqVgOffset
 * @param msgLen   length of msg in bytes
 * @return 0 on success or when no update is needed; -1 on decode failure,
 *         unknown offset type, or store write failure.
 */
int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  SMqVgOffset vgOffset = {0};
  int32_t     vgId = TD_VID(pTq->pVnode);

  SDecoder decoder;
  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
  int32_t decodeCode = tDecodeMqVgOffset(&decoder, &vgOffset);
  // Always release the decoder, also when decoding failed.
  tDecoderClear(&decoder);
  if (decodeCode < 0) {
    return -1;
  }

  STqOffset* pOffset = &vgOffset.offset;

  if (pOffset->val.type == TMQ_OFFSET__SNAPSHOT_DATA || pOffset->val.type == TMQ_OFFSET__SNAPSHOT_META) {
    tqDebug("receive offset commit msg to %s on vgId:%d, offset(type:snapshot) uid:%" PRId64 ", ts:%" PRId64,
            pOffset->subKey, vgId, pOffset->val.uid, pOffset->val.ts);
  } else if (pOffset->val.type == TMQ_OFFSET__LOG) {
    tqDebug("receive offset commit msg to %s on vgId:%d, offset(type:log) version:%" PRId64, pOffset->subKey, vgId,
            pOffset->val.version);
    if (pOffset->val.version + 1 == sversion) {
      pOffset->val.version += 1;
    }
  } else {
    tqError("invalid commit offset type:%d", pOffset->val.type);
    return -1;
  }

  STqOffset* pSavedOffset = tqOffsetRead(pTq->pOffsetStore, pOffset->subKey);
  if (pSavedOffset != NULL && tqOffsetLessOrEqual(pOffset, pSavedOffset)) {
    tqDebug("not update the offset, vgId:%d sub:%s since committed:%" PRId64 " less than/equal to existed:%" PRId64,
            vgId, pOffset->subKey, pOffset->val.version, pSavedOffset->val.version);
    return 0;  // no need to update the offset value
  }

  // save the new offset value
  if (tqOffsetWrite(pTq->pOffsetStore, pOffset) < 0) {
    return -1;
  }

  return 0;
}

367
/**
 * Handles a seek message: validates the request, clamps the requested log
 * version to the valid WAL range of the subscription's reader, and stores the
 * result in the offset store.
 *
 * @param sversion not used by this function
 * @param msg      encoded SMqVgOffset
 * @param msgLen   length of msg in bytes
 * @return 0 on success, or when the saved offset makes the seek unnecessary or
 *         has a non-log type; -1 on decode failure, non-log request offset,
 *         unknown subscription key (terrno = TSDB_CODE_INVALID_MSG), consumer
 *         mismatch (terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH), or store write
 *         failure.
 */
int32_t tqProcessSeekReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  SMqVgOffset vgOffset = {0};
  int32_t     vgId = TD_VID(pTq->pVnode);

  SDecoder decoder;
  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
  int32_t decodeCode = tDecodeMqVgOffset(&decoder, &vgOffset);
  // Always release the decoder, also when decoding failed.
  tDecoderClear(&decoder);
  if (decodeCode < 0) {
    tqError("vgId:%d failed to decode seek msg", vgId);
    return -1;
  }

  tqDebug("topic:%s, vgId:%d process offset seek by consumer:0x%" PRIx64 ", req offset:%" PRId64,
          vgOffset.offset.subKey, vgId, vgOffset.consumerId, vgOffset.offset.val.version);

  // 1. only log offsets can be used as a seek target
  STqOffset* pOffset = &vgOffset.offset;
  if (pOffset->val.type != TMQ_OFFSET__LOG) {
    tqError("vgId:%d, subKey:%s invalid seek offset type:%d", vgId, pOffset->subKey, pOffset->val.type);
    return -1;
  }

  STqHandle* pHandle = taosHashGet(pTq->pHandle, pOffset->subKey, strlen(pOffset->subKey));
  if (pHandle == NULL) {
    tqError("tmq seek: consumer:0x%" PRIx64 " vgId:%d subkey %s not found", vgOffset.consumerId, vgId, pOffset->subKey);
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  // 2. check consumer-vg assignment status
  taosRLockLatch(&pTq->lock);
  if (pHandle->consumerId != vgOffset.consumerId) {
    tqDebug("ERROR tmq seek: consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64,
            vgOffset.consumerId, vgId, pOffset->subKey, pHandle->consumerId);
    terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
    taosRUnLockLatch(&pTq->lock);
    return -1;
  }
  taosRUnLockLatch(&pTq->lock);

  // 3. check the offset info
  STqOffset* pSavedOffset = tqOffsetRead(pTq->pOffsetStore, pOffset->subKey);
  if (pSavedOffset != NULL) {
    if (pSavedOffset->val.type != TMQ_OFFSET__LOG) {
      tqError("invalid saved offset type, vgId:%d sub:%s", vgId, pOffset->subKey);
      return 0;  // no need to update the offset value
    }

    if (pSavedOffset->val.version == pOffset->val.version) {
      tqDebug("vgId:%d subKey:%s no need to seek to %" PRId64 " prev offset:%" PRId64, vgId, pOffset->subKey,
              pOffset->val.version, pSavedOffset->val.version);
      return 0;
    }
  }

  // clamp the target into the [sver, ever] range reported by the WAL reader
  int64_t sver = 0, ever = 0;
  walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever);
  if (pOffset->val.version < sver) {
    pOffset->val.version = sver;
  } else if (pOffset->val.version > ever) {
    pOffset->val.version = ever;
  }

  // save the new offset value
  if (pSavedOffset != NULL) {
    tqDebug("vgId:%d sub:%s seek to:%" PRId64 " prev offset:%" PRId64, vgId, pOffset->subKey, pOffset->val.version,
            pSavedOffset->val.version);
  } else {
    tqDebug("vgId:%d sub:%s seek to:%" PRId64 " not saved yet", vgId, pOffset->subKey, pOffset->val.version);
  }

  if (tqOffsetWrite(pTq->pOffsetStore, pOffset) < 0) {
    tqError("failed to save offset, vgId:%d sub:%s seek to %" PRId64, vgId, pOffset->subKey, pOffset->val.version);
    return -1;
  }

  tqDebug("topic:%s, vgId:%d consumer:0x%" PRIx64 " offset is update to:%" PRId64, vgOffset.offset.subKey, vgId,
          vgOffset.consumerId, vgOffset.offset.val.version);

  return 0;
}

L
Liu Jicong 已提交
449
/**
 * Checks whether column colId of table tbUid may be modified.
 *
 * Walks every entry of pTq->pCheckInfo; an entry whose ntbUid equals tbUid and
 * whose colIdList contains colId forbids the modification.
 *
 * @return 0 when no entry forbids the column, -1 otherwise.
 */
int32_t tqCheckColModifiable(STQ* pTq, int64_t tbUid, int32_t colId) {
  for (void* pIter = taosHashIterate(pTq->pCheckInfo, NULL); pIter != NULL;
       pIter = taosHashIterate(pTq->pCheckInfo, pIter)) {
    STqCheckInfo* pCheck = (STqCheckInfo*)pIter;
    if (pCheck->ntbUid != tbUid) {
      continue;
    }

    int32_t nCols = taosArrayGetSize(pCheck->colIdList);
    for (int32_t idx = 0; idx < nCols; idx++) {
      int16_t forbidColId = *(int16_t*)taosArrayGet(pCheck->colIdList, idx);
      if (forbidColId == colId) {
        // leaving the iteration early: cancel it before returning
        taosHashCancelIterate(pTq->pCheckInfo, pIter);
        return -1;
      }
    }
  }

  return 0;
}

475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499
/**
 * Re-submits the poll request of every handle registered in pTq->pPushMgr to
 * the query queue, then empties the push manager.
 *
 * Runs under the TQ write latch. For each handle the stored request content is
 * wrapped in a TDMT_VND_TMQ_CONSUME message and queued; the stored SRpcMsg
 * shell is then freed and detached from the handle. pMsg is not used.
 *
 * @return always 0
 */
int32_t tqProcessPollPush(STQ* pTq, SRpcMsg* pMsg) {
  int32_t vgId = TD_VID(pTq->pVnode);

  taosWLockLatch(&pTq->lock);

  if (taosHashGetSize(pTq->pPushMgr) > 0) {
    for (void* pIter = taosHashIterate(pTq->pPushMgr, NULL); pIter != NULL;
         pIter = taosHashIterate(pTq->pPushMgr, pIter)) {
      STqHandle* pHandle = *(STqHandle**)pIter;
      tqDebug("vgId:%d start set submit for pHandle:%p, consumer:0x%" PRIx64, vgId, pHandle, pHandle->consumerId);

      if (ASSERT(pHandle->msg != NULL)) {
        tqError("pHandle->msg should not be null");
        break;
      }

      SRpcMsg msg = {.msgType = TDMT_VND_TMQ_CONSUME, .pCont = pHandle->msg->pCont, .contLen = pHandle->msg->contLen, .info = pHandle->msg->info};
      tmsgPutToQueue(&pTq->pVnode->msgCb, QUERY_QUEUE, &msg);

      // only the SRpcMsg shell is freed here; pCont was passed on with the queued message
      taosMemoryFree(pHandle->msg);
      pHandle->msg = NULL;
    }

    taosHashClear(pTq->pPushMgr);
  }

  taosWUnLockLatch(&pTq->lock);
  return 0;
}

D
dapan1121 已提交
504
/**
 * Handles a consumer poll request.
 *
 * Looks up the subscription handle for req.subKey, verifies that the polling
 * consumer owns it, and claims the handle by switching it to the executing
 * state. While another poll is executing on the same handle, the function
 * releases the latch, sleeps 10 ms and retries. Once claimed, the handle's
 * epoch is raised to the request epoch if needed, the data is extracted via
 * tqExtractDataForMq(), and the handle is set back to idle.
 *
 * @return the result of tqExtractDataForMq(); -1 when the request cannot be
 *         deserialized or the key is unknown (terrno = TSDB_CODE_INVALID_MSG),
 *         or when the consumer does not match
 *         (terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH).
 */
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
  SMqPollReq req = {0};
  int        code = 0;
  if (tDeserializeSMqPollReq(pMsg->pCont, pMsg->contLen, &req) < 0) {
    tqError("tDeserializeSMqPollReq %d failed", pMsg->contLen);
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  int64_t      consumerId = req.consumerId;
  int32_t      reqEpoch = req.epoch;
  STqOffsetVal reqOffset = req.reqOffset;
  int32_t      vgId = TD_VID(pTq->pVnode);
  STqHandle*   pHandle = NULL;

  while (1) {
    taosWLockLatch(&pTq->lock);
    // 1. find handle
    pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
    if (pHandle == NULL) {
      tqError("tmq poll: consumer:0x%" PRIx64 " vgId:%d subkey %s not found", consumerId, vgId, req.subKey);
      terrno = TSDB_CODE_INVALID_MSG;
      taosWUnLockLatch(&pTq->lock);
      return -1;
    }

    // 2. check re-balance status
    if (pHandle->consumerId != consumerId) {
      tqError("ERROR tmq poll: consumer:0x%" PRIx64
              " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64,
              consumerId, TD_VID(pTq->pVnode), req.subKey, pHandle->consumerId);
      terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
      taosWUnLockLatch(&pTq->lock);
      return -1;
    }

    // Claim the handle while still holding the latch so that only one poll
    // executes on it at a time.
    bool exec = tqIsHandleExec(pHandle);
    if (!exec) {
      tqSetHandleExec(pHandle);
      //      qSetTaskCode(pHandle->execHandle.task, TDB_CODE_SUCCESS);
      tqDebug("tmq poll: consumer:0x%" PRIx64 " vgId:%d, topic:%s, set handle exec, pHandle:%p", consumerId, vgId,
              req.subKey, pHandle);
      taosWUnLockLatch(&pTq->lock);
      break;
    }
    taosWUnLockLatch(&pTq->lock);

    // Leading space before "vgId" keeps the consumer id and vgId apart in the log.
    tqDebug("tmq poll: consumer:0x%" PRIx64
            " vgId:%d, topic:%s, subscription is executing, wait for 10ms and retry, pHandle:%p",
            consumerId, vgId, req.subKey, pHandle);
    taosMsleep(10);
  }

  // 3. update the epoch value
  if (pHandle->epoch < reqEpoch) {
    tqDebug("tmq poll: consumer:0x%" PRIx64 " epoch update from %d to %d by poll req", consumerId, pHandle->epoch,
            reqEpoch);
    pHandle->epoch = reqEpoch;
  }

  char buf[TSDB_OFFSET_LEN] = {0};
  tFormatOffset(buf, TSDB_OFFSET_LEN, &reqOffset);
  tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey %s, recv poll req vgId:%d, req:%s, reqId:0x%" PRIx64,
          consumerId, req.epoch, pHandle->subKey, vgId, buf, req.reqId);

  code = tqExtractDataForMq(pTq, pHandle, &req, pMsg);
  tqSetHandleIdle(pHandle);

  tqDebug("tmq poll: consumer:0x%" PRIx64 " vgId:%d, topic:%s, set handle idle, pHandle:%p", consumerId, vgId,
          req.subKey, pHandle);
  return code;
}

577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613
/**
 * Answers a WAL-info request with the log offset a consumer should start from,
 * together with the valid WAL version range of the subscription's reader.
 *
 * The response version is taken from the request when it carries a log offset.
 * For negative request offset types it comes from the offset store when an
 * entry exists, otherwise from the start (reset-earliest) or end (reset-latest)
 * of the valid WAL range.
 *
 * @return 0 once the response has been sent; -1 with terrno set to
 *         TSDB_CODE_INVALID_MSG (bad request / unknown key),
 *         TSDB_CODE_TMQ_CONSUMER_MISMATCH, or TSDB_CODE_INVALID_PARA
 *         (snapshot request, non-log saved offset, unsupported offset type).
 */
int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) {
  SMqPollReq req = {0};
  if (tDeserializeSMqPollReq(pMsg->pCont, pMsg->contLen, &req) < 0) {
    tqError("tDeserializeSMqPollReq %d failed", pMsg->contLen);
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  int32_t      vgId = TD_VID(pTq->pVnode);
  int64_t      consumerId = req.consumerId;
  STqOffsetVal reqOffset = req.reqOffset;

  // 1. find handle
  STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
  if (pHandle == NULL) {
    tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s not found", consumerId, vgId, req.subKey);
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  // 2. check re-balance status
  taosRLockLatch(&pTq->lock);
  if (pHandle->consumerId != consumerId) {
    tqDebug("ERROR consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64,
            consumerId, vgId, req.subKey, pHandle->consumerId);
    terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
    taosRUnLockLatch(&pTq->lock);
    return -1;
  }
  taosRUnLockLatch(&pTq->lock);

  int64_t sver = 0;
  int64_t ever = 0;
  walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever);

  SMqDataRsp dataRsp = {0};
  tqInitDataRsp(&dataRsp, &req);

  if (req.useSnapshot == true) {
    tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s snapshot not support wal info", consumerId, vgId, req.subKey);
    goto invalidPara;
  }

  dataRsp.rspOffset.type = TMQ_OFFSET__LOG;

  if (reqOffset.type == TMQ_OFFSET__LOG) {
    dataRsp.rspOffset.version = reqOffset.version;
  } else if(reqOffset.type < 0){
    STqOffset* pSaved = tqOffsetRead(pTq->pOffsetStore, req.subKey);
    if (pSaved == NULL) {
      // nothing stored for this key: derive the position from the reset policy
      if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEST) {
        dataRsp.rspOffset.version = sver;  // not consume yet, set the earliest position
      } else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) {
        dataRsp.rspOffset.version = ever;
      }
      tqInfo("consumer:0x%" PRIx64 " vgId:%d subkey:%s get assignment from init:%"PRId64, consumerId, vgId, req.subKey, dataRsp.rspOffset.version);
    } else {
      if (pSaved->val.type != TMQ_OFFSET__LOG) {
        tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s, no valid wal info", consumerId, vgId, req.subKey);
        goto invalidPara;
      }

      dataRsp.rspOffset.version = pSaved->val.version;
      tqInfo("consumer:0x%" PRIx64 " vgId:%d subkey:%s get assignment from store:%"PRId64, consumerId, vgId, req.subKey, dataRsp.rspOffset.version);
    }
  } else {
    tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s invalid offset type:%d", consumerId, vgId, req.subKey,
            reqOffset.type);
    goto invalidPara;
  }

  tqDoSendDataRsp(&pMsg->info, &dataRsp, req.epoch, req.consumerId, TMQ_MSG_TYPE__WALINFO_RSP, sver, ever);
  tDeleteMqDataRsp(&dataRsp);
  return 0;

invalidPara:
  // shared exit for every request that is rejected after dataRsp was initialized
  terrno = TSDB_CODE_INVALID_PARA;
  tDeleteMqDataRsp(&dataRsp);
  return -1;
}

658
/**
 * Handles a delete-subscription request: removes the handle, its cached offset
 * and its persisted meta entry for pReq->subKey.
 *
 * Runs under the TQ write latch. If the handle exists, the function first polls
 * every 10 ms until it is no longer executing, closes its WAL reference (if
 * any) and removes it from the handle table. Failures of the individual
 * removal steps are logged only.
 *
 * @param msg an SMqVDeleteReq; sversion and msgLen are not used
 * @return always 0
 */
int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  SMqVDeleteReq* pReq = (SMqVDeleteReq*)msg;
  int32_t        vgId = TD_VID(pTq->pVnode);

  tqDebug("vgId:%d, tq process delete sub req %s", vgId, pReq->subKey);

  taosWLockLatch(&pTq->lock);

  STqHandle* pHandle = taosHashGet(pTq->pHandle, pReq->subKey, strlen(pReq->subKey));
  if (pHandle != NULL) {
    // wait for a running poll on this handle to finish
    while (tqIsHandleExec(pHandle)) {
      tqDebug("vgId:%d, topic:%s, subscription is executing, wait for 10ms and retry, pHandle:%p", vgId,
              pHandle->subKey, pHandle);
      taosMsleep(10);
    }

    if (pHandle->pRef) {
      walCloseRef(pTq->pVnode->pWal, pHandle->pRef->refId);
    }

    if (taosHashRemove(pTq->pHandle, pReq->subKey, strlen(pReq->subKey)) != 0) {
      tqError("cannot process tq delete req %s, since no such handle", pReq->subKey);
    }
  }

  if (tqOffsetDelete(pTq->pOffsetStore, pReq->subKey) != 0) {
    tqError("cannot process tq delete req %s, since no such offset in cache", pReq->subKey);
  }

  if (tqMetaDeleteHandle(pTq, pReq->subKey) < 0) {
    tqError("cannot process tq delete req %s, since no such offset in tdb", pReq->subKey);
  }

  taosWUnLockLatch(&pTq->lock);

  return 0;
}

697
/**
 * Handles an add-check-info request: decodes an STqCheckInfo, registers it in
 * pTq->pCheckInfo under its topic name, and persists the raw message.
 *
 * @param sversion not used by this function
 * @param msg      encoded STqCheckInfo
 * @param msgLen   length of msg in bytes
 * @return 0 on success; -1 with terrno = TSDB_CODE_OUT_OF_MEMORY when decoding,
 *         the hash insert, or the meta save fails.
 */
int32_t tqProcessAddCheckInfoReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  STqCheckInfo info = {0};
  SDecoder     decoder;

  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
  int32_t decodeCode = tDecodeSTqCheckInfo(&decoder, &info);
  // Always release the decoder, also when decoding failed.
  tDecoderClear(&decoder);
  if (decodeCode < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  if (taosHashPut(pTq->pCheckInfo, info.topic, strlen(info.topic), &info, sizeof(STqCheckInfo)) < 0) {
    // The table did not take the entry, so the decoded content is still ours to
    // release; tDeleteSTqCheckInfo is the same routine the table uses for its entries.
    tDeleteSTqCheckInfo(&info);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  // From here on the hash table entry owns what `info` references.
  if (tqMetaSaveCheckInfo(pTq, info.topic, msg, msgLen) < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  return 0;
}

717
/**
 * Handles a delete-check-info request: removes the entry keyed by msg (used as
 * a NUL-terminated string) from pTq->pCheckInfo and from the meta store.
 *
 * The meta store is only touched when the hash removal succeeded. sversion and
 * msgLen are not used.
 *
 * @return 0 on success; -1 with terrno = TSDB_CODE_OUT_OF_MEMORY when either
 *         removal fails.
 */
int32_t tqProcessDelCheckInfoReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  int32_t rc = taosHashRemove(pTq->pCheckInfo, msg, strlen(msg));
  if (rc >= 0) {
    rc = tqMetaDeleteCheckInfo(pTq, msg);
  }

  if (rc < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  return 0;
}

729
/**
 * Handles a subscribe / re-balance request for one subscription key.
 *
 * When no handle exists for req.subKey (neither in memory nor loadable via
 * tqMetaGetHandle), a new handle is created for req.newConsumerId and saved.
 * Otherwise the existing handle is switched to req.newConsumerId under the TQ
 * write latch, unregistered from the push manager, and saved.
 *
 * @param sversion not used by this function
 * @param msg      encoded SMqRebVgReq
 * @param msgLen   length of msg in bytes
 * @return the result of the create/save step (0 on success); -1 with
 *         terrno = TSDB_CODE_INVALID_MSG when the request cannot be decoded;
 *         0 when a new handle is requested with newConsumerId == -1 (logged as
 *         an invalid request).
 */
int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  int         ret = 0;
  SMqRebVgReq req = {0};
  SDecoder    dc = {0};

  // Cast matches the other tDecoderInit callers in this file.
  tDecoderInit(&dc, (uint8_t*)msg, msgLen);

  // decode req
  if (tDecodeSMqRebVgReq(&dc, &req) < 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    tDecoderClear(&dc);
    return -1;
  }

  tqDebug("vgId:%d, tq process sub req:%s, Id:0x%" PRIx64 " -> Id:0x%" PRIx64, pTq->pVnode->config.vgId, req.subKey,
          req.oldConsumerId, req.newConsumerId);

  // Look the handle up in memory; on a miss call tqMetaGetHandle() and look
  // again. The loop ends when the handle is found or tqMetaGetHandle() fails.
  STqHandle* pHandle = NULL;
  while(1){
    pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
    if (pHandle || tqMetaGetHandle(pTq, req.subKey) < 0){
      break;
    }
  }

  if (pHandle == NULL) {
    if (req.oldConsumerId != -1) {
      tqError("vgId:%d, build new consumer handle %s for consumer:0x%" PRIx64 ", but old consumerId:0x%" PRIx64,
              req.vgId, req.subKey, req.newConsumerId, req.oldConsumerId);
    }
    if (req.newConsumerId == -1) {
      tqError("vgId:%d, tq invalid re-balance request, new consumerId %" PRId64 "", req.vgId, req.newConsumerId);
      goto end;
    }
    STqHandle handle = {0};
    ret = tqCreateHandle(pTq, &req, &handle);
    if(ret < 0){
      tqDestroyTqHandle(&handle);
      goto end;
    }
    ret = tqMetaSaveHandle(pTq, req.subKey, &handle);
  } else {
    taosWLockLatch(&pTq->lock);

    if (pHandle->consumerId == req.newConsumerId) {  // do nothing
      tqInfo("vgId:%d consumer:0x%" PRIx64 " remains, no switch occurs, should not reach here", req.vgId,
             req.newConsumerId);
    } else {
      tqInfo("vgId:%d switch consumer from Id:0x%" PRIx64 " to Id:0x%" PRIx64, req.vgId, pHandle->consumerId,
             req.newConsumerId);
      atomic_store_64(&pHandle->consumerId, req.newConsumerId);
    }
    //    atomic_add_fetch_32(&pHandle->epoch, 1);

    // kill executing task
    //    if(tqIsHandleExec(pHandle)) {
    //      qTaskInfo_t pTaskInfo = pHandle->execHandle.task;
    //      if (pTaskInfo != NULL) {
    //        qKillTask(pTaskInfo, TSDB_CODE_SUCCESS);
    //      }

    //      if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
    //        qStreamCloseTsdbReader(pTaskInfo);
    //      }
    //    }
    // remove if it has been register in the push manager, and return one empty block to consumer
    tqUnregisterPushHandle(pTq, pHandle);
    taosWUnLockLatch(&pTq->lock);
    ret = tqMetaSaveHandle(pTq, req.subKey, pHandle);
  }

end:
  tDecoderClear(&dc);
  return ret;
}
804

dengyihao's avatar
dengyihao 已提交
805
// Release callback for containers whose slots store heap pointers: `ptr` is the address of the slot,
// and the pointer held in that slot is what gets freed.
void freePtr(void* ptr) {
  void** ppSlot = (void**)ptr;
  taosMemoryFree(*ppSlot);
}
L
liuyao 已提交
806

807
/**
 * Fill in the runtime parts of a stream task that belongs to this vnode: input/output queues, state
 * backend and executor (source and agg levels), sink callbacks, WAL reader (source level), lock and
 * schedule trigger.
 *
 * @param pTq   tq instance that owns the task
 * @param pTask task to expand; modified in place
 * @param ver   version used to seed the checkpoint info and the data version range
 * @return 0 on success, -1 as soon as one of the required resources cannot be created. Resources created
 *         before the failure are not released here.
 */
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
  int32_t vgId = TD_VID(pTq->pVnode);

  pTask->id.idStr = createStreamTaskIdStr(pTask->id.streamId, pTask->id.taskId);
  pTask->refCnt = 1;
  pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE;

  pTask->inputQueue = streamQueueOpen(512 << 10);
  pTask->outputInfo.queue = streamQueueOpen(512 << 10);
  if (pTask->inputQueue == NULL || pTask->outputInfo.queue == NULL) {
    tqError("s-task:%s failed to prepare the input/output queue, initialize task failed", pTask->id.idStr);
    return -1;
  }

  pTask->initTs = taosGetTimestampMs();
  pTask->inputStatus = TASK_INPUT_STATUS__NORMAL;
  pTask->outputInfo.status = TASK_OUTPUT_STATUS__NORMAL;
  pTask->pMsgCb = &pTq->pVnode->msgCb;
  pTask->pMeta = pTq->pStreamMeta;

  // seed the checkpoint info and the data version range with the supplied version
  pTask->chkInfo.version = ver;
  pTask->chkInfo.currentVer = ver;
  pTask->dataRange.range.maxVer = ver;
  pTask->dataRange.range.minVer = ver;

  bool isSource = (pTask->info.taskLevel == TASK_LEVEL__SOURCE);
  bool isAgg = (pTask->info.taskLevel == TASK_LEVEL__AGG);

  // source and agg tasks both need a state backend and an executor
  if (isSource || isAgg) {
    // for a fill-history task the state is opened with the id stored in pTask->streamTaskId rather than
    // with the task's own id
    SStreamTask  stateOwner = {0};
    SStreamTask* pStateTask = pTask;
    if (pTask->info.fillHistory) {
      stateOwner.id = pTask->streamTaskId;
      stateOwner.pMeta = pTask->pMeta;
      pStateTask = &stateOwner;
    }

    pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pStateTask, false, -1, -1);
    if (pTask->pState == NULL) {
      return -1;
    }

    // members not named here stay zero-initialized
    SReadHandle handle = {.pStateBackend = pTask->pState,
                          .fillHistory = pTask->info.fillHistory,
                          .winRange = pTask->dataRange.window};
    if (isSource) {
      handle.vnode = pTq->pVnode;
      handle.initTqReader = 1;
    } else {
      handle.numOfVgroups = (int32_t)taosArrayGetSize(pTask->pUpstreamEpInfoList);
    }
    initStorageAPI(&handle.api);

    pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, vgId);
    if (pTask->exec.pExecutor == NULL) {
      return -1;
    }

    qSetTaskId(pTask->exec.pExecutor, pTask->id.taskId, pTask->id.streamId);
  }

  // sink
  if (pTask->outputInfo.type == TASK_OUTPUT__SMA) {
    pTask->smaSink.vnode = pTq->pVnode;
    pTask->smaSink.smaSink = smaHandleRes;
  } else if (pTask->outputInfo.type == TASK_OUTPUT__TABLE) {
    pTask->tbSink.vnode = pTq->pVnode;
    pTask->tbSink.tbSinkFunc = tqSinkToTablePipeline;

    // use the schema version recorded in meta for the target super table; fall back to 1 if the lookup fails
    int32_t   schemaVer = 1;
    SMetaInfo info = {0};
    if (metaGetInfo(pTq->pVnode->pMeta, pTask->tbSink.stbUid, &info, NULL) == TSDB_CODE_SUCCESS) {
      schemaVer = info.skmVer;
    }

    SSchemaWrapper* pWrapper = pTask->tbSink.pSchemaWrapper;
    pTask->tbSink.pTSchema = tBuildTSchema(pWrapper->pSchema, pWrapper->nCols, schemaVer);
    if (pTask->tbSink.pTSchema == NULL) {
      return -1;
    }

    pTask->tbSink.pTblInfo = tSimpleHashInit(10240, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
    tSimpleHashSetFreeFp(pTask->tbSink.pTblInfo, freePtr);
  }

  if (isSource) {
    SWalFilterCond cond = {.deleteMsg = 1};  // delete msg also extract from wal files
    pTask->exec.pWalReader = walOpenReader(pTq->pVnode->pWal, &cond);
  }

  // reset the task status from unfinished transaction
  if (pTask->status.taskStatus == TASK_STATUS__PAUSE) {
    tqWarn("s-task:%s reset task status to be normal, kept in meta status: Paused", pTask->id.idStr);
    pTask->status.taskStatus = TASK_STATUS__NORMAL;
  }

  taosThreadMutexInit(&pTask->lock, NULL);
  streamSetupScheduleTrigger(pTask);

  tqInfo("vgId:%d expand stream task, s-task:%s, checkpoint ver:%" PRId64
         " child id:%d, level:%d, fill-history:%d, trigger:%" PRId64 " ms, disable pause",
         vgId, pTask->id.idStr, pTask->chkInfo.version, pTask->info.selfChildId, pTask->info.taskLevel,
         pTask->info.fillHistory, pTask->triggerParam);

  // next valid version will add one
  pTask->chkInfo.version += 1;
  return 0;
}
L
Liu Jicong 已提交
936

937
/**
 * Handle a task-check request sent by an upstream task: look up the addressed downstream task, report
 * its status (0 when the task is not registered in the stream meta) and send the response.
 *
 * @param pTq  tq instance
 * @param pMsg rpc message; the encoded request follows the SMsgHead
 * @return result of streamSendCheckRsp()
 */
int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) {
  char*   msgStr = pMsg->pCont;
  char*   msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
  int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);

  // zero-initialized so that a failed/partial decode never leaves indeterminate values in the fields
  // that are copied into the response below (the decode result itself is not checked, as before)
  SStreamTaskCheckReq req = {0};
  SDecoder            decoder;

  tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
  tDecodeStreamTaskCheckReq(&decoder, &req);
  tDecoderClear(&decoder);

  int32_t taskId = req.downstreamTaskId;

  SStreamTaskCheckRsp rsp = {
      .reqId = req.reqId,
      .streamId = req.streamId,
      .childId = req.childId,
      .downstreamNodeId = req.downstreamNodeId,
      .downstreamTaskId = req.downstreamTaskId,
      .upstreamNodeId = req.upstreamNodeId,
      .upstreamTaskId = req.upstreamTaskId,
  };

  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);
  if (pTask != NULL) {
    rsp.status = streamTaskCheckStatus(pTask);

    tqDebug("s-task:%s recv task check req(reqId:0x%" PRIx64 ") task:0x%x (vgId:%d), status:%s, rsp status %d",
            pTask->id.idStr, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId,
            streamGetTaskStatusStr(pTask->status.taskStatus), rsp.status);

    // release only after the last access to pTask: the reference taken above is what keeps it alive
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
  } else {
    rsp.status = 0;
    tqDebug("tq recv task check(taskId:0x%x not built yet) req(reqId:0x%" PRIx64 ") from task:0x%x (vgId:%d), rsp status %d",
            taskId, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);
  }

  return streamSendCheckRsp(pTq->pStreamMeta, &req, &rsp, &pMsg->info, taskId);
}

978 979 980 981
/**
 * Handle the response to a task-check request: decode it and hand it to the upstream task that issued
 * the check.
 *
 * @param pTq      tq instance
 * @param sversion not used in this function
 * @param pMsg     rpc message; the encoded response follows the SMsgHead
 * @return -1 if the message cannot be decoded or the upstream task is not found (terrno is set to
 *         TSDB_CODE_STREAM_TASK_NOT_EXIST in the latter case), otherwise the result of streamProcessCheckRsp()
 */
int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t sversion, SRpcMsg* pMsg) {
  char*   pBody = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t bodyLen = pMsg->contLen - sizeof(SMsgHead);

  SStreamTaskCheckRsp rsp;
  SDecoder            decoder;

  tDecoderInit(&decoder, (uint8_t*)pBody, bodyLen);
  int32_t code = tDecodeStreamTaskCheckRsp(&decoder, &rsp);
  tDecoderClear(&decoder);  // the decoder is no longer needed, whatever the decode result is

  if (code < 0) {
    return -1;
  }

  tqDebug("tq task:0x%x (vgId:%d) recv check rsp(reqId:0x%" PRIx64 ") from 0x%x (vgId:%d) status %d",
          rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.status);

  SStreamMeta* pMeta = pTq->pStreamMeta;
  SStreamTask* pTask = streamMetaAcquireTask(pMeta, rsp.upstreamTaskId);
  if (pTask == NULL) {
    tqError("tq failed to locate the stream task:0x%x (vgId:%d), it may have been destroyed", rsp.upstreamTaskId,
            pTq->pStreamMeta->vgId);
    terrno = TSDB_CODE_STREAM_TASK_NOT_EXIST;
    return -1;
  }

  code = streamProcessCheckRsp(pTask, &rsp);
  streamMetaReleaseTask(pMeta, pTask);
  return code;
}

1011
/**
 * Deploy a stream task on this vnode: decode the task from the message, register it in the stream meta
 * and start checking its downstream tasks.
 *
 * @param pTq      tq instance
 * @param sversion version passed to streamMetaAddDeployedTask() together with the new task
 * @param msg      encoded SStreamTask
 * @param msgLen   length of msg in bytes
 * @return 0 on success or when streams are disabled (tsDisableStream); -1 on allocation failure
 *         (terrno = TSDB_CODE_OUT_OF_MEMORY), decode failure, or when the task cannot be added to the meta
 */
int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  int32_t code = 0;
  int32_t vgId = TD_VID(pTq->pVnode);

  if (tsDisableStream) {
    return 0;
  }

  // 1.deserialize msg and build task
  SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
  if (pTask == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    tqError("vgId:%d failed to create stream task due to out of memory, alloc size:%d", vgId,
            (int32_t)sizeof(SStreamTask));
    return -1;
  }

  SDecoder decoder;
  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
  code = tDecodeStreamTask(&decoder, pTask);
  if (code < 0) {
    tDecoderClear(&decoder);
    taosMemoryFree(pTask);
    return -1;
  }

  tDecoderClear(&decoder);

  SStreamMeta* pStreamMeta = pTq->pStreamMeta;

  // 2.save task, use the newest commit version as the initial start version of stream task.
  int32_t taskId = 0;
  taosWLockLatch(&pStreamMeta->lock);
  code = streamMetaAddDeployedTask(pStreamMeta, sversion, pTask);

  taskId = pTask->id.taskId;
  int32_t numOfTasks = streamMetaGetNumOfTasks(pStreamMeta);
  if (code < 0) {
    tqError("vgId:%d failed to add s-task:%s, total:%d", vgId, pTask->id.idStr, numOfTasks);
    taosWUnLockLatch(&pStreamMeta->lock);
    return -1;
  }

  taosWUnLockLatch(&pStreamMeta->lock);
  tqDebug("vgId:%d s-task:%s is deployed and add into meta, status:%s, numOfTasks:%d", vgId, pTask->id.idStr,
          streamGetTaskStatusStr(pTask->status.taskStatus), numOfTasks);

  // 3. check the downstream tasks of the task that was just registered. The task is re-acquired from the
  // meta so that it is accessed through a counted reference; if it cannot be acquired there is nothing
  // to check and, importantly, nothing to release.
  SStreamTask* p = streamMetaAcquireTask(pStreamMeta, taskId);
  if (p != NULL) {
    streamTaskCheckDownstreamTasks(p);
    streamMetaReleaseTask(pStreamMeta, p);
  }

  return 0;
}

1068
/**
 * Run the scan-history procedure of a stream task.
 *
 * For a fill-history task this performs the step-1 scan, halts the related stream task, performs the
 * step-2 scan over the version range derived from the related task's WAL reader, dispatches the
 * transfer-state message downstream and finally tries to execute the task. For any other task it
 * performs the step-1 scan and reports completion through streamTaskScanHistoryDataComplete().
 *
 * Every exit path after a successful acquire releases the references taken on the task and, when one
 * was acquired, on the related stream task.
 *
 * @param pTq  tq instance
 * @param pMsg rpc message whose content is a SStreamScanHistoryReq
 * @return 0 / TSDB_CODE_SUCCESS on success or when the scan is skipped or aborted; -1 when the task or
 *         its related stream task cannot be found; otherwise the code of streamTaskScanHistoryDataComplete()
 */
int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
  int32_t code = TSDB_CODE_SUCCESS;
  char*   msg = pMsg->pCont;

  SStreamMeta*           pMeta = pTq->pStreamMeta;
  SStreamScanHistoryReq* pReq = (SStreamScanHistoryReq*)msg;

  SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->taskId);
  if (pTask == NULL) {
    tqError("vgId:%d failed to acquire stream task:0x%x during stream recover, task may have been destroyed",
            pMeta->vgId, pReq->taskId);
    return -1;
  }

  // do recovery step 1
  const char* id = pTask->id.idStr;
  const char* pStatus = streamGetTaskStatusStr(pTask->status.taskStatus);
  tqDebug("s-task:%s start history data scan stage(step 1), status:%s", id, pStatus);

  int64_t st = taosGetTimestampMs();

  // we have to continue retrying to successfully execute the scan history task.
  int8_t schedStatus = atomic_val_compare_exchange_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE,
                                                     TASK_SCHED_STATUS__WAITING);
  if (schedStatus != TASK_SCHED_STATUS__INACTIVE) {
    tqError(
        "s-task:%s failed to start scan-history in first stream time window since already started, unexpected "
        "sched-status:%d",
        id, schedStatus);
    // drop the reference taken above, otherwise it is leaked on this early return
    streamMetaReleaseTask(pMeta, pTask);
    return 0;
  }

  ASSERT(pTask->status.pauseAllowed == false);

  if (pTask->info.fillHistory == 1) {
    streamTaskEnablePause(pTask);
  }

  if (!streamTaskRecoverScanStep1Finished(pTask)) {
    streamSourceScanHistoryData(pTask);
  }

  // disable the pause when handling the step2 scan of tsdb data.
  // the whole next procedure cann't be stopped.
  // todo fix it: the following procedure should be executed completed and then shutdown when trying to close vnode.
  if (pTask->info.fillHistory == 1) {
    streamTaskDisablePause(pTask);
  }

  if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING || streamTaskShouldPause(&pTask->status)) {
    tqDebug("s-task:%s is dropped or paused, abort recover in step1", id);
    atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
    streamMetaReleaseTask(pMeta, pTask);
    return 0;
  }

  double el = (taosGetTimestampMs() - st) / 1000.0;
  tqDebug("s-task:%s history data scan stage(step 1) ended, elapsed time:%.2fs", id, el);

  if (pTask->info.fillHistory) {
    // always points at the task's own version range, so it is valid even when the block below is skipped
    // (e.g. igUntreated is set) and the range is still logged / passed to the scanner afterwards
    SVersionRange* pRange = &pTask->dataRange.range;
    SStreamTask*   pStreamTask = NULL;

    if (!pReq->igUntreated && !streamTaskRecoverScanStep1Finished(pTask)) {
      // 1. stop the related stream task, get the current scan wal version of stream task, ver.
      pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.taskId);
      if (pStreamTask == NULL) {
        qError("failed to find s-task:0x%x, it may have been destroyed, drop fill-history task:%s",
               pTask->streamTaskId.taskId, pTask->id.idStr);

        pTask->status.taskStatus = TASK_STATUS__DROPPING;
        tqDebug("s-task:%s scan-history-task set status to be dropping", id);

        streamMetaSaveTask(pMeta, pTask);
        streamMetaReleaseTask(pMeta, pTask);
        return -1;
      }

      ASSERT(pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE);

      // stream task in TASK_STATUS__SCAN_HISTORY can not be paused.
      // wait for the stream task get ready for scan history data
      while (pStreamTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) {
        tqDebug(
            "s-task:%s level:%d related stream task:%s(status:%s) not ready for halt, wait for it and recheck in 100ms",
            id, pTask->info.taskLevel, pStreamTask->id.idStr, streamGetTaskStatusStr(pStreamTask->status.taskStatus));
        taosMsleep(100);
      }

      // now we can stop the stream task execution
      streamTaskHalt(pStreamTask);
      tqDebug("s-task:%s level:%d is halt by fill-history task:%s", pStreamTask->id.idStr, pStreamTask->info.taskLevel,
              id);

      // if it's an source task, extract the last version in wal.
      int64_t latestVer = walReaderGetCurrentVer(pStreamTask->exec.pWalReader);
      streamHistoryTaskSetVerRangeStep2(pTask, latestVer);
    }

    if (!streamTaskRecoverScanStep1Finished(pTask)) {
      tqDebug("s-task:%s level:%d verRange:%" PRId64 " - %" PRId64 " do secondary scan-history-data after halt the related stream task:%s",
              id, pTask->info.taskLevel, pRange->minVer, pRange->maxVer, id);
      ASSERT(pTask->status.schedStatus == TASK_SCHED_STATUS__WAITING);

      st = taosGetTimestampMs();
      streamSetParamForStreamScannerStep2(pTask, pRange, &pTask->dataRange.window);
    }

    if (!streamTaskRecoverScanStep2Finished(pTask)) {
      streamSourceScanHistoryData(pTask);

      if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING || streamTaskShouldPause(&pTask->status)) {
        // NOTE(review): the message mentions step1 although this is the step-2 scan; text kept unchanged
        tqDebug("s-task:%s is dropped or paused, abort recover in step1", id);
        streamMetaReleaseTask(pMeta, pTask);
        // the related stream task reference, if taken, must be dropped on this exit path as well
        if (pStreamTask != NULL) {
          streamMetaReleaseTask(pMeta, pStreamTask);
        }
        return 0;
      }

      streamTaskRecoverSetAllStepFinished(pTask);
    }

    el = (taosGetTimestampMs() - st) / 1000.0;
    tqDebug("s-task:%s history data scan stage(step 2) ended, elapsed time:%.2fs", id, el);

    // 3. notify downstream tasks to transfer executor state after handle all history blocks.
    if (!pTask->status.transferState) {
      code = streamDispatchTransferStateMsg(pTask);
      if (code != TSDB_CODE_SUCCESS) {
        // todo handle error
      }

      pTask->status.transferState = true;
    }

    // 4. 1) transfer the ownership of executor state, 2) update the scan data range for source task.
    // 5. resume the related stream task.
    streamTryExec(pTask);

    streamMetaReleaseTask(pMeta, pTask);
    // pStreamTask stays NULL when the halt step above was skipped
    if (pStreamTask != NULL) {
      streamMetaReleaseTask(pMeta, pStreamTask);
    }
  } else {
    // todo update the chkInfo version for current task.
    // this task has an associated history stream task, so we need to scan wal from the end version of
    // history scan. The current version of chkInfo.current is not updated during the history scan
    STimeWindow* pWindow = &pTask->dataRange.window;

    if (pTask->historyTaskId.taskId == 0) {
      *pWindow = (STimeWindow){INT64_MIN, INT64_MAX};
      tqDebug(
          "s-task:%s scan history in stream time window completed, no related fill history task, reset the time "
          "window:%" PRId64 " - %" PRId64,
          id, pWindow->skey, pWindow->ekey);
    } else {
      tqDebug(
          "s-task:%s scan history in stream time window completed, now start to handle data from WAL, start "
          "ver:%" PRId64 ", window:%" PRId64 " - %" PRId64,
          id, pTask->chkInfo.currentVer, pWindow->skey, pWindow->ekey);
    }

    // notify the downstream agg tasks that upstream tasks are ready to processing the WAL data, update the
    code = streamTaskScanHistoryDataComplete(pTask);
    streamMetaReleaseTask(pMeta, pTask);

    // when all source task complete to scan history data in stream time window, they are allowed to handle stream data
    // at the same time.
    return code;
  }

  return 0;
}

H
Haojun Liao 已提交
1239
// notify the downstream tasks to transfer executor state after handle all history blocks.
H
Haojun Liao 已提交
1240 1241 1242 1243 1244
int32_t tqProcessTaskTransferStateReq(STQ* pTq, SRpcMsg* pMsg) {
  char*   pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t len = pMsg->contLen - sizeof(SMsgHead);

  SStreamTransferReq req = {0};
H
Haojun Liao 已提交
1245

1246
  SDecoder decoder;
H
Haojun Liao 已提交
1247
  tDecoderInit(&decoder, (uint8_t*)pReq, len);
1248
  int32_t code = tDecodeStreamScanHistoryFinishReq(&decoder, &req);
H
Haojun Liao 已提交
1249
  tDecoderClear(&decoder);
H
Haojun Liao 已提交
1250

1251
  tqDebug("vgId:%d start to process transfer state msg, from s-task:0x%x", pTq->pStreamMeta->vgId, req.downstreamTaskId);
1252

1253
  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.downstreamTaskId);
1254
  if (pTask == NULL) {
1255
    tqError("failed to find task:0x%x, it may have been dropped already. process transfer state failed", req.downstreamTaskId);
1256 1257 1258
    return -1;
  }

H
Haojun Liao 已提交
1259 1260
  int32_t remain = streamAlignTransferState(pTask);
  if (remain > 0) {
1261
    tqDebug("s-task:%s receive upstream transfer state msg, remain:%d", pTask->id.idStr, remain);
H
Haojun Liao 已提交
1262 1263 1264
    return 0;
  }

L
liuyao 已提交
1265
  // transfer the ownership of executor state
1266 1267
  tqDebug("s-task:%s all upstream tasks send transfer msg, open transfer state flag", pTask->id.idStr);
  ASSERT(pTask->streamTaskId.taskId != 0 && pTask->info.fillHistory == 1);
1268

1269
  pTask->status.transferState = true;
L
Liu Jicong 已提交
1270

H
Haojun Liao 已提交
1271
  streamSchedExec(pTask);
L
Liu Jicong 已提交
1272
  streamMetaReleaseTask(pTq->pStreamMeta, pTask);
1273 1274 1275
  return 0;
}

1276
/**
 * Handle a scan-history-finished request sent by an upstream task: decode it and forward it to the
 * addressed downstream task.
 *
 * @param pTq  tq instance
 * @param pMsg rpc message; the encoded request follows the SMsgHead
 * @return -1 if the downstream task cannot be found, otherwise the result of
 *         streamProcessScanHistoryFinishReq()
 */
int32_t tqProcessTaskScanHistoryFinishReq(STQ* pTq, SRpcMsg* pMsg) {
  char*        pBody = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t      bodyLen = pMsg->contLen - sizeof(SMsgHead);
  SStreamMeta* pMeta = pTq->pStreamMeta;

  // deserialize; the decode result is not checked
  SStreamScanHistoryFinishReq req = {0};
  SDecoder                    decoder;
  tDecoderInit(&decoder, (uint8_t*)pBody, bodyLen);
  tDecodeStreamScanHistoryFinishReq(&decoder, &req);
  tDecoderClear(&decoder);

  SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.downstreamTaskId);
  if (pTask == NULL) {
    tqError("vgId:%d process scan history finish msg, failed to find task:0x%x, it may be destroyed",
            pTq->pStreamMeta->vgId, req.downstreamTaskId);
    return -1;
  }

  tqDebug("s-task:%s receive scan-history finish msg from task:0x%x", pTask->id.idStr, req.upstreamTaskId);

  int32_t code = streamProcessScanHistoryFinishReq(pTask, &req, &pMsg->info);
  streamMetaReleaseTask(pMeta, pTask);
  return code;
}
L
Liu Jicong 已提交
1301

1302 1303 1304 1305 1306 1307 1308 1309 1310 1311 1312 1313 1314 1315 1316 1317 1318 1319 1320
/**
 * Handle a downstream task's response to a scan-history-finished message. Each response decrements the
 * upstream task's notReadyTasks counter; when the counter is no longer positive,
 * streamProcessScanHistoryFinishRsp() is invoked for the task.
 *
 * @param pTq  tq instance
 * @param pMsg rpc message; the encoded response follows the SMsgHead
 * @return 0 on success, -1 if the upstream task cannot be found
 */
int32_t tqProcessTaskScanHistoryFinishRsp(STQ* pTq, SRpcMsg* pMsg) {
  char*        pBody = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t      bodyLen = pMsg->contLen - sizeof(SMsgHead);
  SStreamMeta* pMeta = pTq->pStreamMeta;

  // deserialize; the decode result is not checked
  SStreamCompleteHistoryMsg req = {0};
  SDecoder                  decoder;
  tDecoderInit(&decoder, (uint8_t*)pBody, bodyLen);
  tDecodeCompleteHistoryDataMsg(&decoder, &req);
  tDecoderClear(&decoder);

  SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.upstreamTaskId);
  if (pTask == NULL) {
    tqError("vgId:%d process scan history finish rsp, failed to find task:0x%x, it may be destroyed",
            pTq->pStreamMeta->vgId, req.upstreamTaskId);
    return -1;
  }

  tqDebug("s-task:%s scan-history finish rsp received from downstream task:0x%x", pTask->id.idStr, req.downstreamId);

  int32_t remain = atomic_sub_fetch_32(&pTask->notReadyTasks, 1);
  if (remain <= 0) {
    tqDebug("s-task:%s all downstream tasks rsp scan-history completed msg", pTask->id.idStr);
    streamProcessScanHistoryFinishRsp(pTask);
  } else {
    tqDebug("s-task:%s remain:%d not send finish rsp", pTask->id.idStr, remain);
  }

  streamMetaReleaseTask(pMeta, pTask);
  return 0;
}

1335 1336 1337 1338
/**
 * Decode a delete result and turn it into a STREAM_DELETE_DATA block wrapped in a queue item.
 *
 * The block gets one row per table uid of the delete result: start/end key columns carry the deleted
 * time range, the uid column carries the table uid, and the group-id and calculate-start/end columns
 * are set to NULL.
 *
 * @param pData     encoded SDeleteRes
 * @param len       length of pData in bytes
 * @param ver       version stored in the block info
 * @param pRefBlock out: the allocated queue item, or NULL when there is nothing to emit or on failure
 * @return TSDB_CODE_SUCCESS (with *pRefBlock == NULL when the delete touched no table or no row),
 *         TSDB_CODE_OUT_OF_MEMORY when the uid list or the queue item cannot be allocated
 */
int32_t extractDelDataBlock(const void* pData, int32_t len, int64_t ver, SStreamRefDataBlock** pRefBlock) {
  SDecoder   coder = {0};
  SDeleteRes res = {0};

  (*pRefBlock) = NULL;

  res.uidList = taosArrayInit(0, sizeof(tb_uid_t));
  if (res.uidList == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  // the decode result is not checked (unchanged behavior)
  tDecoderInit(&coder, (uint8_t*)pData, len);
  tDecodeDeleteRes(&coder, &res);
  tDecoderClear(&coder);

  int32_t numOfTables = taosArrayGetSize(res.uidList);
  if (numOfTables == 0 || res.affectedRows == 0) {
    taosArrayDestroy(res.uidList);
    return TSDB_CODE_SUCCESS;
  }

  // allocate the queue item before building the data block: if this allocation fails there is no
  // half-built block left behind that nobody owns
  SStreamRefDataBlock* pItem = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0);
  if (pItem == NULL) {
    taosArrayDestroy(res.uidList);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  SSDataBlock* pDelBlock = createSpecialDataBlock(STREAM_DELETE_DATA);
  blockDataEnsureCapacity(pDelBlock, numOfTables);
  pDelBlock->info.rows = numOfTables;
  pDelBlock->info.version = ver;

  // the column handles do not depend on the row, look them up once
  SColumnInfoData* pStartCol = taosArrayGet(pDelBlock->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pEndCol = taosArrayGet(pDelBlock->pDataBlock, END_TS_COLUMN_INDEX);
  SColumnInfoData* pUidCol = taosArrayGet(pDelBlock->pDataBlock, UID_COLUMN_INDEX);
  SColumnInfoData* pGroupCol = taosArrayGet(pDelBlock->pDataBlock, GROUPID_COLUMN_INDEX);
  SColumnInfoData* pCalStartCol = taosArrayGet(pDelBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
  SColumnInfoData* pCalEndCol = taosArrayGet(pDelBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);

  for (int32_t i = 0; i < numOfTables; i++) {
    // start key column
    colDataSetVal(pStartCol, i, (const char*)&res.skey, false);
    // end key column
    colDataSetVal(pEndCol, i, (const char*)&res.ekey, false);
    // uid column
    int64_t* pUid = taosArrayGet(res.uidList, i);
    colDataSetVal(pUidCol, i, (const char*)pUid, false);

    colDataSetNULL(pGroupCol, i);
    colDataSetNULL(pCalStartCol, i);
    colDataSetNULL(pCalEndCol, i);
  }

  taosArrayDestroy(res.uidList);

  pItem->type = STREAM_INPUT__REF_DATA_BLOCK;
  pItem->pBlock = pDelBlock;
  *pRefBlock = pItem;
  return TSDB_CODE_SUCCESS;
}

L
Liu Jicong 已提交
1388 1389
/**
 * Handle a run request. Two reserved task ids trigger vnode-wide actions (status check of all stream
 * tasks, scanning the WAL for all tasks); any other id makes the addressed task process its input queue,
 * provided its status is NORMAL or SCAN_HISTORY.
 *
 * @param pTq  tq instance
 * @param pMsg rpc message whose content is a SStreamTaskRunReq
 * @return 0 on success, -1 if the addressed task cannot be found
 */
int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
  SStreamTaskRunReq* pRunReq = pMsg->pCont;

  int32_t taskId = pRunReq->taskId;
  int32_t vgId = TD_VID(pTq->pVnode);

  if (taskId == STREAM_TASK_STATUS_CHECK_ID) {
    tqStreamTasksStatusCheck(pTq);
    return 0;
  }

  if (taskId == EXTRACT_DATA_FROM_WAL_ID) {  // all tasks are extracted submit data from the wal
    tqStreamTasksScanWal(pTq);
    return 0;
  }

  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);
  if (pTask == NULL) {
    tqError("vgId:%d failed to found s-task, taskId:%d", vgId, taskId);
    return -1;
  }

  // even in halt status, the data in inputQ must be processed
  int8_t status = pTask->status.taskStatus;
  bool   runnable = (status == TASK_STATUS__NORMAL) || (status == TASK_STATUS__SCAN_HISTORY);
  if (!runnable) {
    // hand the schedule slot back, the request is ignored
    atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
    tqDebug("vgId:%d s-task:%s ignore run req since not in ready state, status:%s, sched-status:%d", vgId,
            pTask->id.idStr, streamGetTaskStatusStr(pTask->status.taskStatus), pTask->status.schedStatus);
  } else {
    tqDebug("vgId:%d s-task:%s start to process block from inputQ, last chk point:%" PRId64, vgId, pTask->id.idStr,
            pTask->chkInfo.version);
    streamProcessRunReq(pTask);
  }

  streamMetaReleaseTask(pTq->pStreamMeta, pTask);
  tqStartStreamTasks(pTq);
  return 0;
}

L
Liu Jicong 已提交
1427
/**
 * Handle a dispatch request carrying data from an upstream task: decode it and pass it to the addressed
 * task. When the task cannot be found, the decoded request is released with tDeleteStreamDispatchReq().
 *
 * @param pTq  tq instance
 * @param pMsg rpc message; the encoded request follows the SMsgHead
 * @param exec forwarded unchanged to streamProcessDispatchMsg()
 * @return 0 if the task was found and the request was handed over, -1 otherwise
 */
int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec) {
  char*   msgStr = pMsg->pCont;
  char*   msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
  int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);

  SStreamDispatchReq req = {0};

  SDecoder decoder;
  tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
  tDecodeStreamDispatchReq(&decoder, &req);
  // release the decoder once decoding is done, as every other handler in this file does; without this
  // whatever the decoder still holds is never released
  tDecoderClear(&decoder);

  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.taskId);
  if (pTask) {
    SRpcMsg rsp = {.info = pMsg->info, .code = 0};
    streamProcessDispatchMsg(pTask, &req, &rsp, exec);
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
    return 0;
  } else {
    tDeleteStreamDispatchReq(&req);
    return -1;
  }
}

L
Liu Jicong 已提交
1450 1451
/**
 * Handle the response to a dispatch request: forward it, together with the rpc result code, to the
 * upstream task named in the response.
 *
 * @param pTq  tq instance
 * @param pMsg rpc message; a SStreamDispatchRsp follows the SMsgHead, its upstreamTaskId is converted
 *             with ntohl() before use
 * @return 0 on success, TSDB_CODE_INVALID_MSG if the upstream task cannot be found
 */
int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) {
  SStreamMeta*        pMeta = pTq->pStreamMeta;
  SStreamDispatchRsp* pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t             taskId = ntohl(pRsp->upstreamTaskId);

  SStreamTask* pTask = streamMetaAcquireTask(pMeta, taskId);
  int32_t      vgId = pTq->pStreamMeta->vgId;

  if (pTask == NULL) {
    tqDebug("vgId:%d failed to handle the dispatch rsp, since find task:0x%x failed", vgId, taskId);
    return TSDB_CODE_INVALID_MSG;
  }

  streamProcessDispatchRsp(pTask, pRsp, pMsg->code);
  streamMetaReleaseTask(pMeta, pTask);
  return 0;
}
L
Liu Jicong 已提交
1465

1466
/**
 * Handle a drop-task message: remove the task named in the request from the stream meta.
 *
 * @param pTq      tq instance
 * @param sversion not used in this function
 * @param msg      message body, interpreted as a SVDropStreamTaskReq
 * @param msgLen   not used in this function
 * @return always 0
 */
int32_t tqProcessTaskDropReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  SVDropStreamTaskReq* pDropReq = (SVDropStreamTaskReq*)msg;

  tqDebug("vgId:%d receive msg to drop stream task:0x%x", TD_VID(pTq->pVnode), pDropReq->taskId);
  streamMetaRemoveTask(pTq->pStreamMeta, pDropReq->taskId);

  return 0;
}
L
Liu Jicong 已提交
1473

5
54liuyao 已提交
1474 1475
/**
 * Handle a pause message: pause the named task and, if it has a related fill-history task
 * (historyTaskId.taskId != 0), pause that one as well.
 *
 * @param pTq      tq instance
 * @param sversion not used in this function
 * @param msg      message body, interpreted as a SVPauseStreamTaskReq
 * @param msgLen   not used in this function
 * @return always TSDB_CODE_SUCCESS; a task that cannot be acquired is treated as already paused
 */
int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  SStreamMeta*          pMeta = pTq->pStreamMeta;
  SVPauseStreamTaskReq* pPauseReq = (SVPauseStreamTaskReq*)msg;

  SStreamTask* pTask = streamMetaAcquireTask(pMeta, pPauseReq->taskId);
  if (pTask == NULL) {
    tqError("vgId:%d failed to acquire task:0x%x, it may have been dropped already", pMeta->vgId,
            pPauseReq->taskId);

    // since task is in [STOP|DROPPING] state, it is safe to assume the pause is active
    return TSDB_CODE_SUCCESS;
  }

  tqDebug("s-task:%s receive pause msg from mnode", pTask->id.idStr);
  streamTaskPause(pTask);

  // no related fill-history task: nothing more to pause
  if (pTask->historyTaskId.taskId == 0) {
    streamMetaReleaseTask(pMeta, pTask);
    return TSDB_CODE_SUCCESS;
  }

  SStreamTask* pHistoryTask = streamMetaAcquireTask(pMeta, pTask->historyTaskId.taskId);
  if (pHistoryTask == NULL) {
    tqError("vgId:%d failed to acquire fill-history task:0x%x, it may have been dropped already. Pause success",
            pMeta->vgId, pTask->historyTaskId.taskId);

    streamMetaReleaseTask(pMeta, pTask);

    // since task is in [STOP|DROPPING] state, it is safe to assume the pause is active
    return TSDB_CODE_SUCCESS;
  }

  tqDebug("s-task:%s fill-history task handle paused along with related stream task", pHistoryTask->id.idStr);
  streamTaskPause(pHistoryTask);

  streamMetaReleaseTask(pMeta, pTask);
  streamMetaReleaseTask(pMeta, pHistoryTask);
  return TSDB_CODE_SUCCESS;
}

/**
 * Resume one stream task and, when its status allows it, get it executing again.
 *
 * The task reference passed in is released through streamMetaReleaseTask()
 * before a successful return, so the caller must not use pTask afterwards.
 *
 * @param pTq         tq instance owning the stream meta
 * @param pTask       task to resume (already acquired by the caller); may be NULL
 * @param sversion    vnode version passed to the WAL reader when untreated data is skipped
 * @param igUntreated non-zero to discard the data that arrived while a source task was paused
 * @return -1 when pTask is NULL, otherwise 0
 */
int32_t tqProcessTaskResumeImpl(STQ* pTq, SStreamTask* pTask, int64_t sversion, int8_t igUntreated) {
  int32_t vgId = pTq->pStreamMeta->vgId;
  if (NULL == pTask) {
    return -1;
  }

  // todo: handle the case: resume from halt to pause/ from halt to normal/ from pause to normal
  streamTaskResume(pTask);

  int32_t taskLevel = pTask->info.taskLevel;
  int8_t  taskStatus = pTask->status.taskStatus;

  // nothing to restart unless the task ended up in normal or scan-history status
  if (taskStatus != TASK_STATUS__NORMAL && taskStatus != TASK_STATUS__SCAN_HISTORY) {
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
    return 0;
  }

  // no lock needs to secure the access of the version
  if (!igUntreated || taskLevel != TASK_LEVEL__SOURCE || pTask->info.fillHistory) {
    // from the previous paused version and go on
    tqDebug("vgId:%d s-task:%s resume to exec, from paused ver:%" PRId64 ", vnode ver:%" PRId64 ", schedStatus:%d",
            vgId, pTask->id.idStr, pTask->chkInfo.currentVer, sversion, pTask->status.schedStatus);
  } else {
    // discard all the data  when the stream task is suspended.
    walReaderSetSkipToVersion(pTask->exec.pWalReader, sversion);
    tqDebug("vgId:%d s-task:%s resume to exec, prev paused version:%" PRId64 ", start from vnode ver:%" PRId64
            ", schedStatus:%d",
            vgId, pTask->id.idStr, pTask->chkInfo.currentVer, sversion, pTask->status.schedStatus);
  }

  // choose how to kick the task: non-source tasks are always scheduled directly
  if (taskLevel != TASK_LEVEL__SOURCE) {
    streamSchedExec(pTask);
  } else if (pTask->info.fillHistory) {
    streamStartRecoverTask(pTask, igUntreated);
  } else if (taosQueueItemSize(pTask->inputQueue->queue) == 0) {
    tqStartStreamTasks(pTq);
  } else {
    streamSchedExec(pTask);
  }

  streamMetaReleaseTask(pTq->pStreamMeta, pTask);
  return 0;
}

/**
 * Handle a resume request for a stream task and its related fill-history task.
 *
 * @param pTq      tq instance whose stream meta holds the task
 * @param sversion vnode version forwarded to tqProcessTaskResumeImpl()
 * @param msg      request payload, interpreted as SVResumeStreamTaskReq
 * @param msgLen   not used by this handler
 * @return the code of the last tqProcessTaskResumeImpl() call; non-zero when the
 *         stream task could not be acquired
 */
int32_t tqProcessTaskResumeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  SVResumeStreamTaskReq* pReq = (SVResumeStreamTaskReq*)msg;

  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->taskId);

  // tqProcessTaskResumeImpl() releases pTask before it returns, so the fill-history task id
  // has to be copied out now; reading it from pTask afterwards would touch a released task.
  int32_t historyTaskId = (pTask != NULL) ? pTask->historyTaskId.taskId : 0;

  int32_t code = tqProcessTaskResumeImpl(pTq, pTask, sversion, pReq->igUntreated);
  if (code != 0) {
    return code;
  }

  // an id of 0 means there is no related fill-history task (same convention as the pause handler)
  if (historyTaskId != 0) {
    SStreamTask* pHistoryTask = streamMetaAcquireTask(pTq->pStreamMeta, historyTaskId);
    if (pHistoryTask) {
      code = tqProcessTaskResumeImpl(pTq, pHistoryTask, sversion, pReq->igUntreated);
    }
  }

  return code;
}

L
Liu Jicong 已提交
1568
/**
 * Decode a stream retrieve request and hand it to its destination task.
 *
 * @param pTq  tq instance whose stream meta holds the destination task
 * @param pMsg rpc message; the encoded SStreamRetrieveReq follows the SMsgHead in pCont
 * @return 0 when the request was handed to the task, -1 when the payload could not
 *         be decoded or the destination task could not be acquired
 */
int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg) {
  char*    msgStr = pMsg->pCont;
  char*    msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
  int32_t  msgLen = pMsg->contLen - sizeof(SMsgHead);
  SDecoder decoder;

  // zero-initialized so that a failed decode never leaves indeterminate fields behind
  SStreamRetrieveReq req = {0};
  tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
  int32_t decodeCode = tDecodeStreamRetrieveReq(&decoder, &req);
  tDecoderClear(&decoder);

  if (decodeCode < 0) {
    // do not look up a task from a request that was not fully decoded
    // NOTE(review): assumes tDeleteStreamRetrieveReq() tolerates a zeroed/partially filled request - confirm
    tDeleteStreamRetrieveReq(&req);
    return -1;
  }

  int32_t      taskId = req.dstTaskId;
  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);

  if (pTask) {
    SRpcMsg rsp = {.info = pMsg->info, .code = 0};
    streamProcessRetrieveReq(pTask, &req, &rsp);

    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
    tDeleteStreamRetrieveReq(&req);
    return 0;
  } else {
    tDeleteStreamRetrieveReq(&req);
    return -1;
  }
}

/**
 * Handle a stream retrieve response. Nothing is done with the response here.
 *
 * @param pTq  unused
 * @param pMsg unused
 * @return always 0
 */
int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg) {
  (void)pTq;
  (void)pMsg;
  return 0;
}
L
Liu Jicong 已提交
1599

1600 1601 1602 1603 1604 1605
/**
 * Decode a stream dispatch message and hand it to the destination stream task.
 *
 * When the message cannot be decoded or the task does not exist, an error
 * dispatch response is sent back to the upstream node instead.
 *
 * @param pVnode vnode receiving the message; its tq owns the stream meta
 * @param pMsg   rpc message; the encoded SStreamDispatchReq follows the SMsgHead in pCont.
 *               pMsg->pCont and pMsg are freed here on every path except the
 *               "msg handle is null" one.
 * @return 0 when the request was handed to the task, -1 otherwise
 */
int32_t vnodeEnqueueStreamMsg(SVnode* pVnode, SRpcMsg* pMsg) {
  STQ*      pTq = pVnode->pTq;
  SMsgHead* msgStr = pMsg->pCont;
  char*     msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
  int32_t   msgLen = pMsg->contLen - sizeof(SMsgHead);
  int32_t   code = 0;

  // Declared and initialized ahead of the first `goto FAIL`: the FAIL path logs taskId and copies
  // fields of req into the response, so neither may be indeterminate after a decode failure.
  int32_t            taskId = 0;
  SStreamTask*       pTask = NULL;
  SStreamDispatchReq req = {0};
  SDecoder           decoder;

  tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
  if (tDecodeStreamDispatchReq(&decoder, &req) < 0) {
    code = TSDB_CODE_MSG_DECODE_ERROR;
    tDecoderClear(&decoder);
    goto FAIL;
  }
  tDecoderClear(&decoder);

  taskId = req.taskId;
  pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);
  if (pTask != NULL) {
    SRpcMsg rsp = {.info = pMsg->info, .code = 0};
    // NOTE(review): req is not deleted on this path; presumably streamProcessDispatchMsg() takes
    // care of it - confirm against its implementation
    streamProcessDispatchMsg(pTask, &req, &rsp, false);
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
    rpcFreeCont(pMsg->pCont);
    taosFreeQitem(pMsg);
    return 0;
  } else {
    tDeleteStreamDispatchReq(&req);
  }

  code = TSDB_CODE_STREAM_TASK_NOT_EXIST;

FAIL:
  if (pMsg->info.handle == NULL) {
    // argument order matches the format string: task id first, then vgId
    tqError("s-task:0x%x vgId:%d msg handle is null, abort enqueue dispatch msg", taskId, pTq->pStreamMeta->vgId);
    // NOTE(review): unlike the other failure paths, pMsg->pCont and pMsg are not freed here - confirm ownership
    return -1;
  }

  SMsgHead* pRspHead = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamDispatchRsp));
  if (pRspHead == NULL) {
    SRpcMsg rsp = {.code = TSDB_CODE_OUT_OF_MEMORY, .info = pMsg->info};
    // log the code that is actually sent back, not the one that led here
    tqError("s-task:0x%x send dispatch error rsp, code:%s", taskId, tstrerror(rsp.code));
    tmsgSendRsp(&rsp);
    rpcFreeCont(pMsg->pCont);
    taosFreeQitem(pMsg);
    return -1;
  }

  // fill the response in network byte order
  pRspHead->vgId = htonl(req.upstreamNodeId);
  SStreamDispatchRsp* pRsp = POINTER_SHIFT(pRspHead, sizeof(SMsgHead));
  pRsp->streamId = htobe64(req.streamId);
  pRsp->upstreamTaskId = htonl(req.upstreamTaskId);
  pRsp->upstreamNodeId = htonl(req.upstreamNodeId);
  pRsp->downstreamNodeId = htonl(pVnode->config.vgId);
  pRsp->downstreamTaskId = htonl(req.taskId);
  pRsp->inputStatus = TASK_OUTPUT_STATUS__NORMAL;

  int32_t len = sizeof(SMsgHead) + sizeof(SStreamDispatchRsp);
  SRpcMsg rsp = { .code = code, .info = pMsg->info, .contLen = len, .pCont = pRspHead};
  tqError("s-task:0x%x send dispatch error rsp, code:%s", taskId, tstrerror(code));

  tmsgSendRsp(&rsp);
  rpcFreeCont(pMsg->pCont);
  taosFreeQitem(pMsg);
  return -1;
}
L
Liu Jicong 已提交
1667

1668
/**
 * Check a version against the last WAL log version recorded in pTq.
 *
 * @param pTq      tq instance providing walLogLastVer
 * @param sversion version to check
 * @return 1 when sversion is not greater than pTq->walLogLastVer, otherwise 0
 */
int32_t tqCheckLogInWal(STQ* pTq, int64_t sversion) {
  if (pTq->walLogLastVer >= sversion) {
    return 1;
  }
  return 0;
}
1669