tq.c 54.5 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Hongze Cheng 已提交
16
#include "tq.h"
S
Shengliang Guan 已提交
17

dengyihao's avatar
dengyihao 已提交
18 19 20
// 0: not init
// 1: already inited
// 2: wait to be inited or cleaup
21
static int32_t tqInitialize(STQ* pTq);
dengyihao's avatar
dengyihao 已提交
22

wmmhello's avatar
wmmhello 已提交
23
static FORCE_INLINE bool tqIsHandleExec(STqHandle* pHandle) { return TMQ_HANDLE_STATUS_EXEC == pHandle->status; }
dengyihao's avatar
dengyihao 已提交
24 25
static FORCE_INLINE void tqSetHandleExec(STqHandle* pHandle) { pHandle->status = TMQ_HANDLE_STATUS_EXEC; }
static FORCE_INLINE void tqSetHandleIdle(STqHandle* pHandle) { pHandle->status = TMQ_HANDLE_STATUS_IDLE; }
wmmhello's avatar
wmmhello 已提交
26

L
Liu Jicong 已提交
27
int32_t tqInit() {
L
Liu Jicong 已提交
28 29 30 31 32 33
  int8_t old;
  while (1) {
    old = atomic_val_compare_exchange_8(&tqMgmt.inited, 0, 2);
    if (old != 2) break;
  }

34 35 36 37 38 39
  if (old == 0) {
    tqMgmt.timer = taosTmrInit(10000, 100, 10000, "TQ");
    if (tqMgmt.timer == NULL) {
      atomic_store_8(&tqMgmt.inited, 0);
      return -1;
    }
40 41 42
    if (streamInit() < 0) {
      return -1;
    }
L
Liu Jicong 已提交
43
    atomic_store_8(&tqMgmt.inited, 1);
44
  }
45

L
Liu Jicong 已提交
46 47
  return 0;
}
L
Liu Jicong 已提交
48

49
/**
 * Release the process-wide TQ module state created by tqInit().
 *
 * Only the caller that moves tqMgmt.inited from 1 to 2 performs the cleanup. It spins while
 * another thread holds the busy state (2). Nothing happens when the module is not initialized.
 */
void tqCleanUp() {
  int8_t prev = 2;
  do {
    prev = atomic_val_compare_exchange_8(&tqMgmt.inited, 1, 2);
  } while (prev == 2);

  if (prev != 1) {
    return;
  }

  taosTmrCleanUp(tqMgmt.timer);
  streamCleanUp();
  atomic_store_8(&tqMgmt.inited, 0);
}
L
Liu Jicong 已提交
62

63
static void destroyTqHandle(void* data) {
64 65
  STqHandle* pData = (STqHandle*)data;
  qDestroyTask(pData->execHandle.task);
wmmhello's avatar
wmmhello 已提交
66

67
  if (pData->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
L
Liu Jicong 已提交
68
    taosMemoryFreeClear(pData->execHandle.execCol.qmsg);
69
  } else if (pData->execHandle.subType == TOPIC_SUB_TYPE__DB) {
70
    tqReaderClose(pData->execHandle.pTqReader);
71 72
    walCloseReader(pData->pWalReader);
    taosHashCleanup(pData->execHandle.execDb.pFilterOutTbUid);
L
Liu Jicong 已提交
73
  } else if (pData->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
74
    walCloseReader(pData->pWalReader);
75
    tqReaderClose(pData->execHandle.pTqReader);
76 77
    taosMemoryFreeClear(pData->execHandle.execTb.qmsg);
    nodesDestroyNode(pData->execHandle.execTb.node);
78
  }
dengyihao's avatar
dengyihao 已提交
79
  if (pData->msg != NULL) {
80 81 82
    rpcFreeCont(pData->msg->pCont);
    taosMemoryFree(pData->msg);
    pData->msg = NULL;
D
dapan1121 已提交
83
  }
L
Liu Jicong 已提交
84 85
}

86 87 88 89 90
/**
 * True only when both offsets are WAL (log) offsets and pLeft's version is not greater than
 * pRight's. Any non-log offset on either side yields false.
 */
static bool tqOffsetLessOrEqual(const STqOffset* pLeft, const STqOffset* pRight) {
  if (pLeft->val.type != TMQ_OFFSET__LOG || pRight->val.type != TMQ_OFFSET__LOG) {
    return false;
  }
  return pLeft->val.version <= pRight->val.version;
}

L
Liu Jicong 已提交
91
/**
 * Create the TQ instance of a vnode.
 *
 * @param path    directory used for TQ/stream data; it is duplicated, the caller keeps ownership
 * @param pVnode  owning vnode (its WAL must be open; its last version is read here)
 * @return the new instance, or NULL on failure. terrno is TSDB_CODE_OUT_OF_MEMORY for allocation
 *         failures; failures of tqInitialize() release the instance through tqClose().
 */
STQ* tqOpen(const char* path, SVnode* pVnode) {
  STQ* pTq = taosMemoryCalloc(1, sizeof(STQ));
  if (pTq == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pTq->path = taosStrdup(path);
  pTq->pVnode = pVnode;
  pTq->walLogLastVer = pVnode->pWal->vers.lastVer;

  pTq->pHandle = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);
  pTq->pPushMgr = taosHashInit(64, MurmurHash3_32, false, HASH_NO_LOCK);
  pTq->pCheckInfo = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);

  if (pTq->path == NULL || pTq->pHandle == NULL || pTq->pPushMgr == NULL || pTq->pCheckInfo == NULL) {
    // Nothing but these allocations exists yet, so release them directly instead of via tqClose().
    // NOTE(review): relies on taosHashCleanup()/taosMemoryFree() accepting NULL - confirm.
    taosHashCleanup(pTq->pCheckInfo);
    taosHashCleanup(pTq->pPushMgr);
    taosHashCleanup(pTq->pHandle);
    taosMemoryFree(pTq->path);
    taosMemoryFree(pTq);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  taosHashSetFreeFp(pTq->pHandle, destroyTqHandle);
  taosHashSetFreeFp(pTq->pCheckInfo, (FDelete)tDeleteSTqCheckInfo);
  taosInitRWLatch(&pTq->lock);

  if (tqInitialize(pTq) != TSDB_CODE_SUCCESS) {
    tqClose(pTq);
    return NULL;
  }
  return pTq;
}

/**
 * Open the persistent parts of a TQ instance: meta store, offset store and stream meta. Then
 * reload the stream tasks at the WAL committed version.
 *
 * @return 0 on success, -1 at the first failing step. Whatever was opened so far is released
 *         by the caller (tqOpen() calls tqClose() on failure).
 */
int32_t tqInitialize(STQ* pTq) {
  if (tqMetaOpen(pTq) < 0) {
    return -1;
  }

  pTq->pOffsetStore = tqOffsetOpen(pTq);
  if (pTq->pOffsetStore == NULL) {
    return -1;
  }

  int32_t vgId = pTq->pVnode->config.vgId;
  pTq->pStreamMeta = streamMetaOpen(pTq->path, pTq, (FTaskExpand*)tqExpandTask, vgId);
  if (pTq->pStreamMeta == NULL) {
    return -1;
  }

  // the version is kept in task's meta data
  // todo check if this version is required or not
  int64_t committedVer = walGetCommittedVer(pTq->pVnode->pWal);
  return (streamLoadTasks(pTq->pStreamMeta, committedVer) < 0) ? -1 : 0;
}
L
Liu Jicong 已提交
143

L
Liu Jicong 已提交
144
/**
 * Destroy a TQ instance created by tqOpen() and free its memory. A NULL argument is ignored.
 */
void tqClose(STQ* pTq) {
  if (pTq != NULL) {
    tqOffsetClose(pTq->pOffsetStore);

    taosHashCleanup(pTq->pHandle);
    taosHashCleanup(pTq->pPushMgr);
    taosHashCleanup(pTq->pCheckInfo);

    taosMemoryFree(pTq->path);
    tqMetaClose(pTq);
    streamMetaClose(pTq->pStreamMeta);

    taosMemoryFree(pTq);
  }
}
L
Liu Jicong 已提交
158

H
Haojun Liao 已提交
159 160 161 162 163 164 165 166 167 168 169 170 171
/**
 * Stop all stream tasks of this vnode. Each task is marked TASK_STATUS__STOP and its executor is
 * killed, and the time spent on the kill is logged. The stream meta write latch is held for the
 * whole walk. A NULL pTq is ignored.
 */
void tqNotifyClose(STQ* pTq) {
  if (pTq == NULL) {
    return;
  }

  taosWLockLatch(&pTq->pStreamMeta->lock);

  for (void* pIter = taosHashIterate(pTq->pStreamMeta->pTasks, NULL); pIter != NULL;
       pIter = taosHashIterate(pTq->pStreamMeta->pTasks, pIter)) {
    SStreamTask* pTask = *(SStreamTask**)pIter;
    tqDebug("vgId:%d s-task:%s set dropping flag", pTq->pStreamMeta->vgId, pTask->id.idStr);
    pTask->status.taskStatus = TASK_STATUS__STOP;

    int64_t startMs = taosGetTimestampMs();
    qKillTask(pTask->exec.pExecutor, TSDB_CODE_SUCCESS);
    int64_t elapsedMs = taosGetTimestampMs() - startMs;
    tqDebug("vgId:%d s-task:%s is closed in %" PRId64 " ms", pTq->pStreamMeta->vgId, pTask->id.idStr, elapsedMs);
  }

  taosWUnLockLatch(&pTq->pStreamMeta->lock);
}

D
dapan1121 已提交
184 185
static int32_t doSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* pRsp, int32_t epoch,
                             int64_t consumerId, int32_t type) {
L
Liu Jicong 已提交
186 187
  int32_t len = 0;
  int32_t code = 0;
D
dapan1121 已提交
188 189

  if (type == TMQ_MSG_TYPE__POLL_RSP) {
H
Haojun Liao 已提交
190
    tEncodeSize(tEncodeMqDataRsp, pRsp, len, code);
D
dapan1121 已提交
191 192 193
  } else if (type == TMQ_MSG_TYPE__TAOSX_RSP) {
    tEncodeSize(tEncodeSTaosxRsp, (STaosxRsp*)pRsp, len, code);
  }
L
Liu Jicong 已提交
194 195 196 197 198 199 200 201 202 203 204

  if (code < 0) {
    return -1;
  }

  int32_t tlen = sizeof(SMqRspHead) + len;
  void*   buf = rpcMallocCont(tlen);
  if (buf == NULL) {
    return -1;
  }

D
dapan1121 已提交
205 206 207
  ((SMqRspHead*)buf)->mqMsgType = type;
  ((SMqRspHead*)buf)->epoch = epoch;
  ((SMqRspHead*)buf)->consumerId = consumerId;
L
Liu Jicong 已提交
208 209 210 211 212 213

  void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));

  SEncoder encoder = {0};
  tEncoderInit(&encoder, abuf, len);

D
dapan1121 已提交
214
  if (type == TMQ_MSG_TYPE__POLL_RSP) {
H
Haojun Liao 已提交
215
    tEncodeMqDataRsp(&encoder, pRsp);
D
dapan1121 已提交
216
  } else if (type == TMQ_MSG_TYPE__TAOSX_RSP) {
X
Xiaoyu Wang 已提交
217
    tEncodeSTaosxRsp(&encoder, (STaosxRsp*)pRsp);
dengyihao's avatar
dengyihao 已提交
218
  }
L
Liu Jicong 已提交
219

wmmhello's avatar
wmmhello 已提交
220
  tEncoderClear(&encoder);
L
Liu Jicong 已提交
221 222

  SRpcMsg rsp = {
D
dapan1121 已提交
223
      .info = *pRpcHandleInfo,
L
Liu Jicong 已提交
224 225 226 227
      .pCont = buf,
      .contLen = tlen,
      .code = 0,
  };
L
Liu Jicong 已提交
228

L
Liu Jicong 已提交
229
  tmsgSendRsp(&rsp);
L
Liu Jicong 已提交
230 231
  return 0;
}
L
Liu Jicong 已提交
232

H
Haojun Liao 已提交
233
/**
 * Reply to the request stored in pHandle->msg with a data response that carries no blocks.
 * The response includes the valid WAL version range of the handle's reader.
 *
 * @return always 0
 */
int32_t tqPushDataRsp(STqHandle* pHandle, int32_t vgId) {
  SMqDataRsp rsp = {0};
  rsp.head.mqMsgType = TMQ_MSG_TYPE__POLL_RSP;
  rsp.head.epoch = pHandle->epoch;
  rsp.head.consumerId = pHandle->consumerId;

  int64_t firstVer = 0;
  int64_t lastVer = 0;
  walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &firstVer, &lastVer);
  tqDoSendDataRsp(&pHandle->msg->info, &rsp, pHandle->epoch, pHandle->consumerId, TMQ_MSG_TYPE__POLL_RSP, firstVer,
                  lastVer);

  char reqOffsetStr[80] = {0};
  char rspOffsetStr[80] = {0};
  tFormatOffset(reqOffsetStr, tListLen(reqOffsetStr), &rsp.reqOffset);
  tFormatOffset(rspOffsetStr, tListLen(rspOffsetStr), &rsp.rspOffset);
  tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) push rsp, block num: %d, req:%s, rsp:%s", vgId,
          rsp.head.consumerId, rsp.head.epoch, rsp.blockNum, reqOffsetStr, rspOffsetStr);
  return 0;
}

253 254 255 256 257 258
/**
 * Send pRsp as the reply to poll request pMsg/pReq. The reply includes the valid WAL version
 * range of the handle's reader. The request and response offsets are then logged.
 *
 * @param type  reply message type, forwarded to tqDoSendDataRsp()
 * @return always 0
 */
int32_t tqSendDataRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp,
                      int32_t type, int32_t vgId) {
  int64_t firstVer = 0;
  int64_t lastVer = 0;
  walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &firstVer, &lastVer);
  tqDoSendDataRsp(&pMsg->info, pRsp, pReq->epoch, pReq->consumerId, type, firstVer, lastVer);

  char reqOffsetStr[80] = {0};
  char rspOffsetStr[80] = {0};
  tFormatOffset(reqOffsetStr, sizeof(reqOffsetStr), &pRsp->reqOffset);
  tFormatOffset(rspOffsetStr, sizeof(rspOffsetStr), &pRsp->rspOffset);

  tqDebug("vgId:%d consumer:0x%" PRIx64 " (epoch %d) send rsp, block num:%d, req:%s, rsp:%s, reqId:0x%" PRIx64, vgId,
          pReq->consumerId, pReq->epoch, pRsp->blockNum, reqOffsetStr, rspOffsetStr, pReq->reqId);
  return 0;
}

271
/**
 * Apply a consumer's offset commit to this vnode's offset store.
 *
 * Snapshot offsets are stored as received. For log offsets, a version exactly one below this
 * request's sversion is advanced by one. A log offset that does not move past the already saved
 * log offset is ignored.
 *
 * @return 0 on success or when no update is needed. -1 on a decode failure or an unknown offset
 *         type (terrno = TSDB_CODE_INVALID_MSG), or when writing the offset fails.
 */
int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  SMqVgOffset vgOffset = {0};
  int32_t     vgId = TD_VID(pTq->pVnode);

  SDecoder decoder;
  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
  if (tDecodeMqVgOffset(&decoder, &vgOffset) < 0) {
    // release the decoder on the failure path too
    tDecoderClear(&decoder);
    tqError("vgId:%d failed to decode offset commit msg", vgId);
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }
  tDecoderClear(&decoder);

  STqOffset* pOffset = &vgOffset.offset;

  if (pOffset->val.type == TMQ_OFFSET__SNAPSHOT_DATA || pOffset->val.type == TMQ_OFFSET__SNAPSHOT_META) {
    tqDebug("receive offset commit msg to %s on vgId:%d, offset(type:snapshot) uid:%" PRId64 ", ts:%" PRId64,
            pOffset->subKey, vgId, pOffset->val.uid, pOffset->val.ts);
  } else if (pOffset->val.type == TMQ_OFFSET__LOG) {
    tqDebug("receive offset commit msg to %s on vgId:%d, offset(type:log) version:%" PRId64, pOffset->subKey, vgId,
            pOffset->val.version);
    if (pOffset->val.version + 1 == sversion) {
      pOffset->val.version += 1;
    }
  } else {
    tqError("invalid commit offset type:%d", pOffset->val.type);
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  STqOffset* pSavedOffset = tqOffsetRead(pTq->pOffsetStore, pOffset->subKey);
  if (pSavedOffset != NULL && tqOffsetLessOrEqual(pOffset, pSavedOffset)) {
    tqDebug("not update the offset, vgId:%d sub:%s since committed:%" PRId64 " less than/equal to existed:%" PRId64,
            vgId, pOffset->subKey, pOffset->val.version, pSavedOffset->val.version);
    return 0;  // no need to update the offset value
  }

  // save the new offset value
  if (tqOffsetWrite(pTq->pOffsetStore, pOffset) < 0) {
    return -1;
  }

  return 0;
}

314
/**
 * Move a consumer's saved WAL offset for one subscription to the version it asked for.
 *
 * Only log offsets are accepted. The request's consumer must own the handle. The target version
 * is clamped into the reader's valid WAL range before it is written.
 *
 * @return 0 on success or when nothing needs to change. -1 on a decode failure, an invalid offset
 *         type, an unknown subscription, a consumer mismatch, or a failed write (terrno set for
 *         all but the last).
 */
int32_t tqProcessSeekReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  SMqVgOffset vgOffset = {0};
  int32_t     vgId = TD_VID(pTq->pVnode);

  SDecoder decoder;
  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
  if (tDecodeMqVgOffset(&decoder, &vgOffset) < 0) {
    tqError("vgId:%d failed to decode seek msg", vgId);
    // release the decoder on the failure path too
    tDecoderClear(&decoder);
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }
  tDecoderClear(&decoder);

  tqDebug("topic:%s, vgId:%d process offset seek by consumer:0x%" PRIx64 ", req offset:%" PRId64,
          vgOffset.offset.subKey, vgId, vgOffset.consumerId, vgOffset.offset.val.version);

  STqOffset* pOffset = &vgOffset.offset;
  if (pOffset->val.type != TMQ_OFFSET__LOG) {
    tqError("vgId:%d, subKey:%s invalid seek offset type:%d", vgId, pOffset->subKey, pOffset->val.type);
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  // 1. find the handle
  STqHandle* pHandle = taosHashGet(pTq->pHandle, pOffset->subKey, strlen(pOffset->subKey));
  if (pHandle == NULL) {
    tqError("tmq seek: consumer:0x%" PRIx64 " vgId:%d subkey %s not found", vgOffset.consumerId, vgId, pOffset->subKey);
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  // 2. check consumer-vg assignment status
  taosRLockLatch(&pTq->lock);
  if (pHandle->consumerId != vgOffset.consumerId) {
    tqDebug("ERROR tmq seek: consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64,
            vgOffset.consumerId, vgId, pOffset->subKey, pHandle->consumerId);
    terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
    taosRUnLockLatch(&pTq->lock);
    return -1;
  }
  taosRUnLockLatch(&pTq->lock);

  // 3. check the offset info
  STqOffset* pSavedOffset = tqOffsetRead(pTq->pOffsetStore, pOffset->subKey);
  if (pSavedOffset != NULL) {
    if (pSavedOffset->val.type != TMQ_OFFSET__LOG) {
      tqError("invalid saved offset type, vgId:%d sub:%s", vgId, pOffset->subKey);
      return 0;  // no need to update the offset value
    }

    if (pSavedOffset->val.version == pOffset->val.version) {
      tqDebug("vgId:%d subKey:%s no need to seek to %" PRId64 " prev offset:%" PRId64, vgId, pOffset->subKey,
              pOffset->val.version, pSavedOffset->val.version);
      return 0;
    }
  }

  // clamp the requested version into the reader's valid WAL range
  int64_t sver = 0, ever = 0;
  walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever);
  if (pOffset->val.version < sver) {
    pOffset->val.version = sver;
  } else if (pOffset->val.version > ever) {
    pOffset->val.version = ever;
  }

  // save the new offset value
  if (pSavedOffset != NULL) {
    tqDebug("vgId:%d sub:%s seek to:%" PRId64 " prev offset:%" PRId64, vgId, pOffset->subKey, pOffset->val.version,
            pSavedOffset->val.version);
  } else {
    tqDebug("vgId:%d sub:%s seek to:%" PRId64 " not saved yet", vgId, pOffset->subKey, pOffset->val.version);
  }

  if (tqOffsetWrite(pTq->pOffsetStore, pOffset) < 0) {
    tqError("failed to save offset, vgId:%d sub:%s seek to %" PRId64, vgId, pOffset->subKey, pOffset->val.version);
    return -1;
  }

  tqDebug("topic:%s, vgId:%d consumer:0x%" PRIx64 " offset is update to:%" PRId64, vgOffset.offset.subKey, vgId,
          vgOffset.consumerId, vgOffset.offset.val.version);

  return 0;
}

L
Liu Jicong 已提交
396
/**
 * Check whether column colId of table tbUid may be altered.
 *
 * @return -1 if an entry in pTq->pCheckInfo for table tbUid lists colId in its colIdList,
 *         0 otherwise.
 */
int32_t tqCheckColModifiable(STQ* pTq, int64_t tbUid, int32_t colId) {
  for (void* pIter = taosHashIterate(pTq->pCheckInfo, NULL); pIter != NULL;
       pIter = taosHashIterate(pTq->pCheckInfo, pIter)) {
    STqCheckInfo* pCheck = (STqCheckInfo*)pIter;
    if (pCheck->ntbUid != tbUid) {
      continue;
    }

    int32_t numOfCols = taosArrayGetSize(pCheck->colIdList);
    for (int32_t i = 0; i < numOfCols; ++i) {
      int16_t forbidColId = *(int16_t*)taosArrayGet(pCheck->colIdList, i);
      if (forbidColId == colId) {
        // leaving the iteration early, so the iterator has to be cancelled explicitly
        taosHashCancelIterate(pTq->pCheckInfo, pIter);
        return -1;
      }
    }
  }

  return 0;
}

D
dapan1121 已提交
422
/**
 * Serve a consumer poll request.
 *
 * The handle for req.subKey is looked up and checked against the requesting consumer. It is then
 * claimed exclusively by setting it to EXEC under pTq->lock; if another poll already runs on it,
 * this call retries every 10ms. The handle epoch is raised to the request's epoch when newer, the
 * data is extracted and answered by tqExtractDataForMq(), and the handle is set idle again.
 *
 * @return the result of tqExtractDataForMq(), or -1 on a malformed request, an unknown
 *         subscription or a consumer mismatch (terrno set).
 */
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
  SMqPollReq req = {0};
  int        code = 0;
  if (tDeserializeSMqPollReq(pMsg->pCont, pMsg->contLen, &req) < 0) {
    tqError("tDeserializeSMqPollReq %d failed", pMsg->contLen);
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  int64_t      consumerId = req.consumerId;
  int32_t      reqEpoch = req.epoch;
  STqOffsetVal reqOffset = req.reqOffset;
  int32_t      vgId = TD_VID(pTq->pVnode);
  STqHandle*   pHandle = NULL;

  while (1) {
    taosWLockLatch(&pTq->lock);
    // 1. find handle
    pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
    if (pHandle == NULL) {
      tqError("tmq poll: consumer:0x%" PRIx64 " vgId:%d subkey %s not found", consumerId, vgId, req.subKey);
      terrno = TSDB_CODE_INVALID_MSG;
      taosWUnLockLatch(&pTq->lock);
      return -1;
    }

    // 2. check re-balance status
    if (pHandle->consumerId != consumerId) {
      tqError("ERROR tmq poll: consumer:0x%" PRIx64
              " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64,
              consumerId, vgId, req.subKey, pHandle->consumerId);
      terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
      taosWUnLockLatch(&pTq->lock);
      return -1;
    }

    // claim the handle: only one poll request runs on a handle at a time
    if (!tqIsHandleExec(pHandle)) {
      tqSetHandleExec(pHandle);
      tqDebug("tmq poll: consumer:0x%" PRIx64 " vgId:%d, topic:%s, set handle exec, pHandle:%p", consumerId, vgId,
              req.subKey, pHandle);
      taosWUnLockLatch(&pTq->lock);
      break;
    }
    taosWUnLockLatch(&pTq->lock);

    tqDebug("tmq poll: consumer:0x%" PRIx64
            " vgId:%d, topic:%s, subscription is executing, wait for 10ms and retry, pHandle:%p",
            consumerId, vgId, req.subKey, pHandle);
    taosMsleep(10);
  }

  // 3. update the epoch value
  if (pHandle->epoch < reqEpoch) {
    tqDebug("tmq poll: consumer:0x%" PRIx64 " epoch update from %d to %d by poll req", consumerId, pHandle->epoch,
            reqEpoch);
    pHandle->epoch = reqEpoch;
  }

  char buf[80];
  tFormatOffset(buf, sizeof(buf), &reqOffset);
  tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey %s, recv poll req vgId:%d, req:%s, reqId:0x%" PRIx64,
          consumerId, req.epoch, pHandle->subKey, vgId, buf, req.reqId);

  code = tqExtractDataForMq(pTq, pHandle, &req, pMsg);
  tqSetHandleIdle(pHandle);

  tqDebug("tmq poll: consumer:0x%" PRIx64 " vgId:%d, topic:%s, set handle idle, pHandle:%p", consumerId, vgId,
          req.subKey, pHandle);
  return code;
}

495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552
/**
 * Reply to a WAL-info request with the consumer's WAL position for req.subKey and the valid WAL
 * version range of the handle's reader.
 *
 * The reported offset is chosen as follows:
 *   - the saved log offset, if one exists;
 *   - otherwise, for a log request offset, the reader's current version, or the requested
 *     version if the reader has not started reading;
 *   - for reset-earliest / reset-latest, the first / last valid version.
 *
 * @return the result of tqDoSendDataRsp(), or -1 on a malformed request, an unknown subscription,
 *         a consumer mismatch, or a snapshot / unsupported offset (terrno set).
 */
int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) {
  SMqPollReq req = {0};
  if (tDeserializeSMqPollReq(pMsg->pCont, pMsg->contLen, &req) < 0) {
    tqError("tDeserializeSMqPollReq %d failed", pMsg->contLen);
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  int64_t      consumerId = req.consumerId;
  STqOffsetVal reqOffset = req.reqOffset;
  int32_t      vgId = TD_VID(pTq->pVnode);

  // 1. find handle
  STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
  if (pHandle == NULL) {
    tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s not found", consumerId, vgId, req.subKey);
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  // 2. check re-balance status
  taosRLockLatch(&pTq->lock);
  if (pHandle->consumerId != consumerId) {
    tqDebug("ERROR consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64,
            consumerId, vgId, req.subKey, pHandle->consumerId);
    terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
    taosRUnLockLatch(&pTq->lock);
    return -1;
  }
  taosRUnLockLatch(&pTq->lock);

  int64_t sver = 0, ever = 0;
  walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever);

  SMqDataRsp dataRsp = {0};
  tqInitDataRsp(&dataRsp, &req);

  STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, req.subKey);
  if (pOffset != NULL) {
    if (pOffset->val.type != TMQ_OFFSET__LOG) {
      tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s use snapshot, no valid wal info", consumerId, vgId, req.subKey);
      terrno = TSDB_CODE_INVALID_PARA;
      tDeleteMqDataRsp(&dataRsp);
      return -1;
    }

    dataRsp.rspOffset.type = TMQ_OFFSET__LOG;
    dataRsp.rspOffset.version = pOffset->val.version;
  } else {
    if (req.useSnapshot == true) {
      tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s snapshot not support wal info", consumerId, vgId, req.subKey);
      terrno = TSDB_CODE_INVALID_PARA;
      tDeleteMqDataRsp(&dataRsp);
      return -1;
    }

    dataRsp.rspOffset.type = TMQ_OFFSET__LOG;

    if (reqOffset.type == TMQ_OFFSET__LOG) {
      int64_t currentVer = walReaderGetCurrentVer(pHandle->execHandle.pTqReader->pWalReader);
      if (currentVer == -1) {  // not start to read data from wal yet, return req offset directly
        dataRsp.rspOffset.version = reqOffset.version;
      } else {
        dataRsp.rspOffset.version = currentVer;  // return current consume offset value
      }
    } else if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEAST) {
      dataRsp.rspOffset.version = sver;  // not consume yet, set the earliest position
    } else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) {
      dataRsp.rspOffset.version = ever;
    } else {
      tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s invalid offset type:%d", consumerId, vgId, req.subKey,
              reqOffset.type);
      terrno = TSDB_CODE_INVALID_PARA;
      tDeleteMqDataRsp(&dataRsp);
      return -1;
    }
  }

  int32_t code =
      tqDoSendDataRsp(&pMsg->info, &dataRsp, req.epoch, req.consumerId, TMQ_MSG_TYPE__WALINFO_RSP, sver, ever);
  // release what tqInitDataRsp() set up, as the error paths above already do;
  // previously the success path leaked it
  tDeleteMqDataRsp(&dataRsp);
  return code;
}

577
/**
 * Drop a subscription from this vnode, all under the pTq write latch:
 *   - wait for any poll still executing on the handle to finish;
 *   - release the handle's WAL reference and remove the handle;
 *   - remove the saved offset and the persisted handle entry.
 * Failures of the individual steps are only logged; the function always returns 0.
 */
int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  SMqVDeleteReq* pReq = (SMqVDeleteReq*)msg;
  char*          subKey = pReq->subKey;
  int32_t        vgId = TD_VID(pTq->pVnode);

  tqDebug("vgId:%d, tq process delete sub req %s", vgId, subKey);

  taosWLockLatch(&pTq->lock);

  STqHandle* pHandle = taosHashGet(pTq->pHandle, subKey, strlen(subKey));
  if (pHandle != NULL) {
    // tqProcessPollReq() sets the handle idle without taking pTq->lock, so this wait can finish
    while (tqIsHandleExec(pHandle)) {
      tqDebug("vgId:%d, topic:%s, subscription is executing, wait for 10ms and retry, pHandle:%p", vgId,
              pHandle->subKey, pHandle);
      taosMsleep(10);
    }

    if (pHandle->pRef != NULL) {
      walCloseRef(pTq->pVnode->pWal, pHandle->pRef->refId);
    }

    if (taosHashRemove(pTq->pHandle, subKey, strlen(subKey)) != 0) {
      tqError("cannot process tq delete req %s, since no such handle", subKey);
    }
  }

  if (tqOffsetDelete(pTq->pOffsetStore, subKey) != 0) {
    tqError("cannot process tq delete req %s, since no such offset in cache", subKey);
  }

  if (tqMetaDeleteHandle(pTq, subKey) < 0) {
    tqError("cannot process tq delete req %s, since no such offset in tdb", subKey);
  }

  taosWUnLockLatch(&pTq->lock);
  return 0;
}

616
/**
 * Register a column-modification check (STqCheckInfo, keyed by its topic). It is stored in
 * pTq->pCheckInfo and persisted in the meta store as the raw message.
 *
 * @return 0 on success, -1 on failure with terrno = TSDB_CODE_OUT_OF_MEMORY (kept from the
 *         original for all failure kinds).
 */
int32_t tqProcessAddCheckInfoReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  STqCheckInfo info = {0};
  SDecoder     decoder;
  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
  if (tDecodeSTqCheckInfo(&decoder, &info) < 0) {
    tDecoderClear(&decoder);
    // release a possibly partially decoded column list (NULL when nothing was decoded)
    tDeleteSTqCheckInfo(&info);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  tDecoderClear(&decoder);

  if (taosHashPut(pTq->pCheckInfo, info.topic, strlen(info.topic), &info, sizeof(STqCheckInfo)) < 0) {
    // the hash did not take ownership of info.colIdList, so release it here
    tDeleteSTqCheckInfo(&info);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  // from here on, the entry (and its colIdList) is owned by pTq->pCheckInfo
  if (tqMetaSaveCheckInfo(pTq, info.topic, msg, msgLen) < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  return 0;
}

636
/**
 * Remove the column-modification check registered for the topic named by msg (a NUL-terminated
 * string). It is removed from pTq->pCheckInfo first, then from the meta store.
 *
 * @return 0 on success, -1 with terrno = TSDB_CODE_OUT_OF_MEMORY if either step fails.
 */
int32_t tqProcessDelCheckInfoReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  int32_t code = taosHashRemove(pTq->pCheckInfo, msg, strlen(msg));
  if (code >= 0) {
    code = tqMetaDeleteCheckInfo(pTq, msg);
  }

  if (code < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  return 0;
}

648
/**
 * Apply a re-balance (subscribe) request to this vnode.
 *
 * If no handle exists for req.subKey, a new STqHandle is built for req.newConsumerId:
 *   - the WAL committed version is referenced and used as the snapshot version;
 *   - the task and readers are created for the subscription type (column, db or table);
 *   - the handle is stored in pTq->pHandle and persisted.
 * If the handle exists, its consumer id is switched, it is removed from the push manager, and it
 * is persisted again.
 *
 * @return 0 on success (also when a request with newConsumerId == -1 is ignored), otherwise -1
 *         or the failing step's code. The decoder is released on every exit.
 */
int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  int         ret = 0;
  SMqRebVgReq req = {0};
  SDecoder    dc = {0};

  tDecoderInit(&dc, (uint8_t*)msg, msgLen);

  // decode req
  if (tDecodeSMqRebVgReq(&dc, &req) < 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    tDecoderClear(&dc);
    return -1;
  }

  SVnode* pVnode = pTq->pVnode;
  int32_t vgId = TD_VID(pVnode);

  tqDebug("vgId:%d, tq process sub req:%s, Id:0x%" PRIx64 " -> Id:0x%" PRIx64, pVnode->config.vgId, req.subKey,
          req.oldConsumerId, req.newConsumerId);

  STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
  if (pHandle == NULL) {
    if (req.oldConsumerId != -1) {
      tqError("vgId:%d, build new consumer handle %s for consumer:0x%" PRIx64 ", but old consumerId:0x%" PRIx64,
              req.vgId, req.subKey, req.newConsumerId, req.oldConsumerId);
    }

    if (req.newConsumerId == -1) {
      // logged and ignored; the request is still reported as success
      tqError("vgId:%d, tq invalid re-balance request, new consumerId %" PRId64 "", req.vgId, req.newConsumerId);
      goto end;
    }

    STqHandle tqHandle = {0};
    pHandle = &tqHandle;

    memcpy(pHandle->subKey, req.subKey, TSDB_SUBSCRIBE_KEY_LEN);
    pHandle->consumerId = req.newConsumerId;
    pHandle->epoch = -1;

    pHandle->execHandle.subType = req.subType;
    pHandle->fetchMeta = req.withMeta;

    // TODO version should be assigned and refed during preprocess
    SWalRef* pRef = walRefCommittedVer(pVnode->pWal);
    if (pRef == NULL) {
      ret = -1;
      goto end;
    }

    int64_t ver = pRef->refVer;
    pHandle->pRef = pRef;

    SReadHandle handle = {.vnode = pVnode, .initTableReader = true, .initTqReader = true, .version = ver};
    initStorageAPI(&handle.api);

    pHandle->snapshotVer = ver;

    // NOTE(review): on the failure paths below, the resources already attached to the
    // stack-local handle (pRef, task, readers) are not released - TODO confirm and clean up.
    if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
      pHandle->execHandle.execCol.qmsg = taosStrdup(req.qmsg);

      pHandle->execHandle.task = qCreateQueueExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle, vgId,
                                                          &pHandle->execHandle.numOfCols, req.newConsumerId);
      void* scanner = NULL;
      qExtractStreamScanner(pHandle->execHandle.task, &scanner);
      pHandle->execHandle.pTqReader = qExtractReaderFromStreamScanner(scanner);
    } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__DB) {
      pHandle->pWalReader = walOpenReader(pVnode->pWal, NULL);
      pHandle->execHandle.pTqReader = tqReaderOpen(pVnode);

      pHandle->execHandle.execDb.pFilterOutTbUid =
          taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
      buildSnapContext(handle.vnode, handle.version, 0, pHandle->execHandle.subType, pHandle->fetchMeta,
                       (SSnapContext**)(&handle.sContext));

      pHandle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &handle, vgId, NULL, req.newConsumerId);
    } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
      pHandle->pWalReader = walOpenReader(pVnode->pWal, NULL);
      pHandle->execHandle.execTb.suid = req.suid;
      pHandle->execHandle.execTb.qmsg = taosStrdup(req.qmsg);

      if (strcmp(pHandle->execHandle.execTb.qmsg, "") != 0) {
        if (nodesStringToNode(pHandle->execHandle.execTb.qmsg, &pHandle->execHandle.execTb.node) != 0) {
          tqError("nodesStringToNode error in sub stable, since %s, vgId:%d, subkey:%s consumer:0x%" PRIx64, terrstr(),
                  pVnode->config.vgId, req.subKey, pHandle->consumerId);
          // leave through the common exit so the decoder is released (the old early return leaked it)
          ret = -1;
          goto end;
        }
      }

      buildSnapContext(handle.vnode, handle.version, req.suid, pHandle->execHandle.subType, pHandle->fetchMeta,
                       (SSnapContext**)(&handle.sContext));
      pHandle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &handle, vgId, NULL, req.newConsumerId);

      SArray* tbUidList = NULL;
      ret = qGetTableList(req.suid, pVnode, pHandle->execHandle.execTb.node, &tbUidList, pHandle->execHandle.task);
      if (ret != TDB_CODE_SUCCESS) {
        tqError("qGetTableList error:%d vgId:%d, subkey:%s consumer:0x%" PRIx64, ret, pVnode->config.vgId, req.subKey,
                pHandle->consumerId);
        taosArrayDestroy(tbUidList);
        goto end;
      }
      tqDebug("tq try to get ctb for stb subscribe, vgId:%d, subkey:%s consumer:0x%" PRIx64 " suid:%" PRId64,
              pVnode->config.vgId, req.subKey, pHandle->consumerId, req.suid);
      pHandle->execHandle.pTqReader = tqReaderOpen(pVnode);
      tqReaderSetTbUidList(pHandle->execHandle.pTqReader, tbUidList, NULL);
      taosArrayDestroy(tbUidList);
    }

    // the hash stores a copy of the stack-local handle
    if (taosHashPut(pTq->pHandle, req.subKey, strlen(req.subKey), pHandle, sizeof(STqHandle)) != 0) {
      tqError("vgId:%d failed to add handle %s for consumer:0x%" PRIx64, vgId, req.subKey, pHandle->consumerId);
      ret = -1;
      goto end;
    }
    tqDebug("try to persist handle %s consumer:0x%" PRIx64, req.subKey, pHandle->consumerId);
    ret = tqMetaSaveHandle(pTq, req.subKey, pHandle);
    goto end;
  } else {
    taosWLockLatch(&pTq->lock);

    if (pHandle->consumerId == req.newConsumerId) {  // do nothing
      tqInfo("vgId:%d consumer:0x%" PRIx64 " remains, no switch occurs, should not reach here", req.vgId,
             req.newConsumerId);
    } else {
      tqInfo("vgId:%d switch consumer from Id:0x%" PRIx64 " to Id:0x%" PRIx64, req.vgId, pHandle->consumerId,
             req.newConsumerId);
      atomic_store_64(&pHandle->consumerId, req.newConsumerId);
    }

    // NOTE: a task currently executing on this handle is not killed here; the former kill logic is disabled.

    // remove if it has been register in the push manager, and return one empty block to consumer
    tqUnregisterPushHandle(pTq, pHandle);
    taosWUnLockLatch(&pTq->lock);
    ret = tqMetaSaveHandle(pTq, req.subKey, pHandle);
  }

end:
  tDecoderClear(&dc);
  return ret;
}
793

dengyihao's avatar
dengyihao 已提交
794
// Free callback for containers whose slots store a heap pointer: `ptr` addresses the
// slot, so dereference it once to get the allocation to release.
// (Installed as the free callback of tbSink.pTblInfo in tqExpandTask.)
void freePtr(void* ptr) {
  void** ppSlot = (void**)ptr;
  taosMemoryFree(*ppSlot);
}
L
liuyao 已提交
795

796
/**
 * Complete the runtime initialization of a decoded stream task on this vnode.
 *
 * Assigns the task id string and initial ref count, opens the input/output queues,
 * creates the stream state and executor for source/agg level tasks, configures the
 * sma/table sink, opens a WAL reader for source tasks and sets up the trigger.
 *
 * @param pTq   TQ instance that will own the task.
 * @param pTask Task to expand; its fields are initialized in place.
 * @param ver   Version used as the initial checkpoint version and data version range.
 * @return 0 on success, -1 if any required resource could not be created.
 */
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
  int32_t vgId = TD_VID(pTq->pVnode);

  pTask->id.idStr = createStreamTaskIdStr(pTask->id.streamId, pTask->id.taskId);
  pTask->refCnt = 1;
  pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE;
  pTask->inputQueue = streamQueueOpen(512 << 10);
  pTask->outputQueue = streamQueueOpen(512 << 10);

  if (pTask->inputQueue == NULL || pTask->outputQueue == NULL) {
    return -1;
  }

  pTask->inputStatus = TASK_INPUT_STATUS__NORMAL;
  pTask->outputStatus = TASK_OUTPUT_STATUS__NORMAL;
  pTask->pMsgCb = &pTq->pVnode->msgCb;
  pTask->pMeta = pTq->pStreamMeta;

  pTask->chkInfo.version = ver;
  pTask->chkInfo.currentVer = ver;

  pTask->dataRange.range.maxVer = ver;
  pTask->dataRange.range.minVer = ver;

  if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
    pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask, false, -1, -1);
    if (pTask->pState == NULL) {
      return -1;
    }

    // source tasks are given this vnode and ask for a tq reader to be initialized
    SReadHandle handle = {.vnode = pTq->pVnode, .initTqReader = 1, .pStateBackend = pTask->pState};
    initStorageAPI(&handle.api);

    pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, vgId);
    if (pTask->exec.pExecutor == NULL) {
      return -1;
    }

    qSetTaskId(pTask->exec.pExecutor, pTask->id.taskId, pTask->id.streamId);
  } else if (pTask->info.taskLevel == TASK_LEVEL__AGG) {
    pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask, false, -1, -1);
    if (pTask->pState == NULL) {
      return -1;
    }

    // agg tasks get no vnode; instead they are told how many upstream tasks feed them
    int32_t     numOfVgroups = (int32_t)taosArrayGetSize(pTask->pUpstreamEpInfoList);
    SReadHandle handle = {.vnode = NULL, .numOfVgroups = numOfVgroups, .pStateBackend = pTask->pState};
    initStorageAPI(&handle.api);

    pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, vgId);
    if (pTask->exec.pExecutor == NULL) {
      return -1;
    }

    qSetTaskId(pTask->exec.pExecutor, pTask->id.taskId, pTask->id.streamId);
  }

  // sink
  if (pTask->outputType == TASK_OUTPUT__SMA) {
    pTask->smaSink.vnode = pTq->pVnode;
    pTask->smaSink.smaSink = smaHandleRes;
  } else if (pTask->outputType == TASK_OUTPUT__TABLE) {
    pTask->tbSink.vnode = pTq->pVnode;
    pTask->tbSink.tbSinkFunc = tqSinkToTablePipeline;

    // take skmVer from the meta entry of tbSink.stbUid when it exists, otherwise fall back to 1
    int32_t   ver1 = 1;
    SMetaInfo info = {0};
    int32_t   code = metaGetInfo(pTq->pVnode->pMeta, pTask->tbSink.stbUid, &info, NULL);
    if (code == TSDB_CODE_SUCCESS) {
      ver1 = info.skmVer;
    }

    SSchemaWrapper* pschemaWrapper = pTask->tbSink.pSchemaWrapper;
    pTask->tbSink.pTSchema = tBuildTSchema(pschemaWrapper->pSchema, pschemaWrapper->nCols, ver1);
    if (pTask->tbSink.pTSchema == NULL) {
      return -1;
    }

    pTask->tbSink.pTblInfo = tSimpleHashInit(10240, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
    if (pTask->tbSink.pTblInfo == NULL) {
      // the hash is used unconditionally afterwards; do not hand a NULL table to the sink
      return -1;
    }
    tSimpleHashSetFreeFp(pTask->tbSink.pTblInfo, freePtr);
  }

  if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
    SWalFilterCond cond = {.deleteMsg = 1};  // delete msg also extract from wal files
    pTask->exec.pWalReader = walOpenReader(pTq->pVnode->pWal, &cond);
    if (pTask->exec.pWalReader == NULL) {
      // source tasks scan the WAL through this reader; fail now instead of dereferencing NULL later
      tqError("vgId:%d s-task:%s failed to open wal reader", vgId, pTask->id.idStr);
      return -1;
    }
  }

  streamSetupTrigger(pTask);

  tqInfo("vgId:%d expand stream task, s-task:%s, checkpoint ver:%" PRId64 " child id:%d, level:%d", vgId,
         pTask->id.idStr, pTask->chkInfo.version, pTask->info.selfChildId, pTask->info.taskLevel);

  // next valid version will add one
  pTask->chkInfo.version += 1;
  return 0;
}
L
Liu Jicong 已提交
891

892
/**
 * Handle a downstream-status check request from an upstream task and send back a
 * SStreamTaskCheckRsp that carries the local task's check status (0 if not built yet).
 *
 * @param pTq  TQ instance hosting the (downstream) task.
 * @param pMsg RPC message: SMsgHead followed by an encoded SStreamTaskCheckReq.
 * @return 0 when the response was sent, -1 on decode/encode/allocation failure.
 */
int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) {
  char*   msgStr = pMsg->pCont;
  char*   msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
  int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);

  SStreamTaskCheckReq req;
  SDecoder            decoder;

  tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
  int32_t code = tDecodeStreamTaskCheckReq(&decoder, &req);
  tDecoderClear(&decoder);
  if (code < 0) {
    // req is not reliably initialized; do not build a response from it
    tqError("vgId:%d failed to decode task check req", pTq->pStreamMeta->vgId);
    return -1;
  }

  int32_t taskId = req.downstreamTaskId;

  SStreamTaskCheckRsp rsp = {
      .reqId = req.reqId,
      .streamId = req.streamId,
      .childId = req.childId,
      .downstreamNodeId = req.downstreamNodeId,
      .downstreamTaskId = req.downstreamTaskId,
      .upstreamNodeId = req.upstreamNodeId,
      .upstreamTaskId = req.upstreamTaskId,
  };

  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);

  if (pTask != NULL) {
    rsp.status = streamTaskCheckStatus(pTask);

    tqDebug("s-task:%s recv task check req(reqId:0x%" PRIx64 ") task:0x%x (vgId:%d), status:%s, rsp status %d",
            pTask->id.idStr, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId,
            streamGetTaskStatusStr(pTask->status.taskStatus), rsp.status);

    // release only after the last access to pTask (the log above reads its fields)
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
  } else {
    rsp.status = 0;
    tqDebug("tq recv task check(taskId:0x%x not built yet) req(reqId:0x%" PRIx64 ") from task:0x%x (vgId:%d), rsp status %d",
            taskId, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);
  }

  SEncoder encoder;
  int32_t  len;

  tEncodeSize(tEncodeStreamTaskCheckRsp, &rsp, len, code);
  if (code < 0) {
    tqError("vgId:%d failed to encode task check rsp, task:0x%x", pTq->pStreamMeta->vgId, taskId);
    return -1;
  }

  void* buf = rpcMallocCont(sizeof(SMsgHead) + len);
  if (buf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    tqError("vgId:%d failed to alloc task check rsp, task:0x%x", pTq->pStreamMeta->vgId, taskId);
    return -1;
  }

  ((SMsgHead*)buf)->vgId = htonl(req.upstreamNodeId);

  void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
  tEncoderInit(&encoder, (uint8_t*)abuf, len);
  tEncodeStreamTaskCheckRsp(&encoder, &rsp);
  tEncoderClear(&encoder);

  SRpcMsg rspMsg = {.code = 0, .pCont = buf, .contLen = sizeof(SMsgHead) + len, .info = pMsg->info};

  tmsgSendRsp(&rspMsg);
  return 0;
}

955
/**
 * Handle the response to a downstream check issued by a local (upstream) task.
 *
 * Decodes the SStreamTaskCheckRsp, looks up the upstream task it targets and passes
 * the response to streamProcessCheckRsp.
 *
 * @return the result of streamProcessCheckRsp, or -1 if decoding fails or the task is gone.
 */
int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  SStreamTaskCheckRsp rsp;
  SDecoder            decoder;

  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
  int32_t code = tDecodeStreamTaskCheckRsp(&decoder, &rsp);
  tDecoderClear(&decoder);

  if (code < 0) {
    return -1;
  }

  tqDebug("tq task:0x%x (vgId:%d) recv check rsp(reqId:0x%" PRIx64 ") from 0x%x (vgId:%d) status %d",
          rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.status);

  SStreamMeta* pMeta = pTq->pStreamMeta;
  SStreamTask* pTask = streamMetaAcquireTask(pMeta, rsp.upstreamTaskId);
  if (pTask == NULL) {
    tqError("tq failed to locate the stream task:0x%x (vgId:%d), it may have been destroyed", rsp.upstreamTaskId,
            pMeta->vgId);
    return -1;
  }

  code = streamProcessCheckRsp(pTask, &rsp);
  streamMetaReleaseTask(pMeta, pTask);
  return code;
}

984
/**
 * Deploy a stream task received from the management node.
 *
 * Steps: (1) decode the message into a newly allocated SStreamTask, (2) register it in
 * the stream meta under the meta write latch, (3) for a non fill-history task, launch the
 * associated history task if any, adjust the data range, and start the downstream check.
 * A fill-history task is only registered here.
 *
 * @param sversion version passed to streamMetaAddDeployedTask.
 * @return 0 on success (or when streams are disabled), -1 on failure.
 */
int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  int32_t vgId = TD_VID(pTq->pVnode);

  if (tsDisableStream) {
    return 0;
  }

  // 1. allocate and decode the task
  SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
  if (pTask == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    tqError("vgId:%d failed to create stream task due to out of memory, alloc size:%d", vgId,
            (int32_t)sizeof(SStreamTask));
    return -1;
  }

  SDecoder decoder;
  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
  int32_t code = tDecodeStreamTask(&decoder, pTask);
  tDecoderClear(&decoder);

  if (code < 0) {
    taosMemoryFree(pTask);
    return -1;
  }

  // 2. register the task; the newest commit version becomes its initial start version
  SStreamMeta* pMeta = pTq->pStreamMeta;

  taosWLockLatch(&pMeta->lock);
  code = streamMetaAddDeployedTask(pMeta, sversion, pTask);
  int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);

  if (code < 0) {
    tqError("vgId:%d failed to add s-task:%s, total:%d", vgId, pTask->id.idStr, numOfTasks);
    taosWUnLockLatch(&pMeta->lock);
    return -1;
  }
  taosWUnLockLatch(&pMeta->lock);

  // 3. a fill-history task waits until its main task launches it
  if (pTask->info.fillHistory) {
    tqDebug("s-task:%s fill history task, wait for being launched", pTask->id.idStr);
  } else {
    SHistDataRange* pRange = &pTask->dataRange;

    if (pTask->historyTaskId.taskId == 0) {
      tqDebug("s-task:%s no associated task, stream time window:%" PRId64 " - %" PRId64 ", ver range:%" PRId64
              " - %" PRId64,
              pTask->id.idStr, pRange->window.skey, pRange->window.ekey, pRange->range.minVer, pRange->range.maxVer);
    } else {
      // launch the history fill stream task first
      streamTaskStartHistoryTask(pTask);

      // then let the current task continue from where the history range ends
      int64_t histEndKey = pRange->window.ekey;
      int64_t histStartVer = pRange->range.minVer;

      pRange->window.skey = histEndKey;
      pRange->window.ekey = INT64_MAX;
      pRange->range.minVer = 0;
      pRange->range.maxVer = histStartVer;

      tqDebug("s-task:%s fill-history task exists, update stream time window:%" PRId64 " - %" PRId64
              ", ver range:%" PRId64 " - %" PRId64,
              pTask->id.idStr, pRange->window.skey, pRange->window.ekey, pRange->range.minVer, pRange->range.maxVer);
    }

    ASSERT(pTask->status.checkDownstream == 0);
    streamTaskCheckDownstreamTasks(pTask);
  }

  tqDebug("vgId:%d s-task:%s is deployed and add into meta, status:%s, numOfTasks:%d", vgId, pTask->id.idStr,
          streamGetTaskStatusStr(pTask->status.taskStatus), numOfTasks);

  return 0;
}

1066
/**
 * Run the history-data scan for a stream task.
 *
 * Step 1 scans the history data. For a fill-history task, it then:
 *  - halts the related stream task;
 *  - performs a secondary scan over versions ingested since step 1;
 *  - dispatches the transfer-state message to downstream tasks and marks itself as dropping;
 *  - persists both tasks.
 * For a normal task, it completes the history scan and continues from the WAL.
 *
 * Every acquired task reference is released on every return path.
 *
 * @param pMsg RPC message whose payload is an SStreamScanHistoryReq.
 * @return 0 on success or when the task is dropped, -1 on failure, or the result of
 *         streamTaskScanHistoryDataComplete for a non fill-history task.
 */
int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
  int32_t code = TSDB_CODE_SUCCESS;
  char*   msg = pMsg->pCont;

  SStreamMeta*           pMeta = pTq->pStreamMeta;
  SStreamScanHistoryReq* pReq = (SStreamScanHistoryReq*)msg;

  SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->taskId);
  if (pTask == NULL) {
    tqError("vgId:%d failed to acquire stream task:0x%x during stream recover, task may have been destroyed",
            pMeta->vgId, pReq->taskId);
    return -1;
  }

  // check param
  int64_t fillVer1 = pTask->chkInfo.version;
  if (fillVer1 <= 0) {
    streamMetaReleaseTask(pMeta, pTask);
    return -1;
  }

  // do recovery step 1
  tqDebug("s-task:%s start history data scan stage(step 1), status:%s", pTask->id.idStr, streamGetTaskStatusStr(pTask->status.taskStatus));

  int64_t st = taosGetTimestampMs();
  int8_t  schedStatus = atomic_val_compare_exchange_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE,
                                                      TASK_SCHED_STATUS__WAITING);
  if (schedStatus != TASK_SCHED_STATUS__INACTIVE) {
    ASSERT(0);
    // ASSERT may be compiled out; never leak the acquired reference
    streamMetaReleaseTask(pMeta, pTask);
    return 0;
  }

  streamSourceScanHistoryData(pTask);
  if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING) {
    tqDebug("s-task:%s is dropped, abort recover in step1", pTask->id.idStr);
    streamMetaReleaseTask(pMeta, pTask);
    return 0;
  }

  double el = (taosGetTimestampMs() - st) / 1000.0;
  tqDebug("s-task:%s history data scan stage(step 1) ended, elapsed time:%.2fs", pTask->id.idStr, el);

  if (pTask->info.fillHistory) {
    // 1. stop the related stream task, get the current scan wal version of stream task, ver.
    SStreamTask* pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.taskId);
    if (pStreamTask == NULL) {
      // the related stream task is required for every following step; bail out instead of dereferencing NULL
      tqError("s-task:%s failed to acquire the related stream task:0x%x, it may have been destroyed",
              pTask->id.idStr, pTask->streamTaskId.taskId);
      streamMetaReleaseTask(pMeta, pTask);
      return -1;
    }

    ASSERT(pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE);

    // wait for the stream task get ready for scan history data
    while (pStreamTask->status.checkDownstream == 0 || pStreamTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) {
      tqDebug("s-task:%s level:%d not ready for halt, wait for 100ms and recheck", pStreamTask->id.idStr,
              pStreamTask->info.taskLevel);
      taosMsleep(100);
    }

    // now we can stop the stream task execution
    pStreamTask->status.taskStatus = TASK_STATUS__HALT;
    tqDebug("s-task:%s level:%d status is set to halt by history scan task:%s", pStreamTask->id.idStr,
            pStreamTask->info.taskLevel, pTask->id.idStr);

    // if it's an source task, extract the last version in wal.
    int64_t ver = pTask->dataRange.range.maxVer + 1;
    int64_t latestVer = walReaderGetCurrentVer(pStreamTask->exec.pWalReader);
    if (latestVer >= ver) {
      ver = latestVer;
    }

    // 2. do secondary scan of the history data, the time window remain, and the version range is updated to [pTask->dataRange.range.maxVer, ver1]
    SVersionRange* pRange = &pTask->dataRange.range;

    pRange->minVer = pRange->maxVer + 1;
    pRange->maxVer = ver;
    if (pRange->minVer == pRange->maxVer) {
      tqDebug("s-task:%s no need to perform secondary scan-history-data(step 2), since no new data ingest", pTask->id.idStr);
    } else {
      tqDebug("s-task:%s level:%d verRange:%" PRId64 " - %" PRId64
              " do secondary scan-history-data after halt the related stream task:%s",
              pTask->id.idStr, pTask->info.taskLevel, pRange->minVer, pRange->maxVer, pStreamTask->id.idStr);

      ASSERT(pTask->status.schedStatus == TASK_SCHED_STATUS__WAITING);
      st = taosGetTimestampMs();

      streamSetParamForStreamScanner(pTask, pRange, &pTask->dataRange.window);

      streamSourceScanHistoryData(pTask);
      if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING) {
        tqDebug("s-task:%s is dropped, abort recover in step2", pTask->id.idStr);
        // both references were acquired above; release both
        streamMetaReleaseTask(pMeta, pStreamTask);
        streamMetaReleaseTask(pMeta, pTask);
        return 0;
      }

      el = (taosGetTimestampMs() - st) / 1000.0;
      tqDebug("s-task:%s history data scan stage(step 2) ended, elapsed time:%.2fs", pTask->id.idStr, el);
    }

    // 3. notify the downstream tasks to transfer executor state after handle all history blocks.
    pTask->status.transferState = true;

    code = streamDispatchTransferStateMsg(pTask);
    if (code != TSDB_CODE_SUCCESS) {
      // todo handle error; at least make the failure visible
      tqError("s-task:%s failed to dispatch transfer state msg, code:%d", pTask->id.idStr, code);
    }

    // 4. 1) transfer the ownership of executor state, 2) update the scan data range for source task.
    // 5. resume the related stream task.
    streamTryExec(pTask);

    pTask->status.taskStatus = TASK_STATUS__DROPPING;
    tqDebug("s-task:%s set status to be dropping", pTask->id.idStr);

    streamMetaSaveTask(pMeta, pTask);
    streamMetaSaveTask(pMeta, pStreamTask);

    streamMetaReleaseTask(pMeta, pTask);
    streamMetaReleaseTask(pMeta, pStreamTask);

    // persist to disk. Use pMeta rather than pTask->pMeta: pTask must not be touched after it
    // has been released, and tqExpandTask sets pTask->pMeta to this same pTq->pStreamMeta.
    if (streamMetaCommit(pMeta) < 0) {
      tqError("vgId:%d failed to commit stream meta after history data scan", pMeta->vgId);
    }
  } else {
    // todo update the chkInfo version for current task.
    // this task has an associated history stream task, so we need to scan wal from the end version of
    // history scan. The current version of chkInfo.current is not updated during the history scan
    tqDebug("s-task:%s history data scan completed, now start to scan data from wal, start ver:%" PRId64
            ", window:%" PRId64 " - %" PRId64,
            pTask->id.idStr, pTask->chkInfo.currentVer, pTask->dataRange.window.skey, pTask->dataRange.window.ekey);

    code = streamTaskScanHistoryDataComplete(pTask);
    streamMetaReleaseTask(pMeta, pTask);
    return code;
  }

  return 0;
}

H
Haojun Liao 已提交
1204 1205
/**
 * Handle a transfer-state request: marks the target task's transferState flag and
 * schedules it for execution.
 *
 * The message is decoded with tDecodeStreamRecoverFinishReq. The decoder is cleared right
 * after decoding, as tqProcessTaskRecoverFinishReq does with the same decode function.
 *
 * @return 0 on success, -1 if decoding fails or the task cannot be found.
 */
// notify the downstream tasks to transfer executor state after handle all history blocks.
int32_t tqProcessTaskTransferStateReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  SStreamTransferReq req;

  SDecoder decoder;
  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
  int32_t code = tDecodeStreamRecoverFinishReq(&decoder, &req);
  tDecoderClear(&decoder);  // previously never cleared: leaked decoder resources
  if (code < 0) {
    // req.taskId would be unreliable; reject the message
    tqError("vgId:%d failed to decode transfer state req", pTq->pStreamMeta->vgId);
    return -1;
  }

  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.taskId);
  if (pTask == NULL) {
    tqError("failed to find task:0x%x", req.taskId);
    return -1;
  }

  ASSERT(pTask->streamTaskId.taskId != 0);
  pTask->status.transferState = true;  // persistent data?

#if 0
  // do check if current task handle all data in the input queue
  int64_t st = taosGetTimestampMs();
  tqDebug("s-task:%s start step2 recover, ts:%" PRId64, pTask->id.idStr, st);

  code = streamSourceRecoverScanStep2(pTask, sversion);
  if (code < 0) {
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
    return -1;
  }

  qDebug("s-task:%s set start wal scan start ver:%"PRId64, pTask->id.idStr, sversion);

  walReaderSeekVer(pTask->exec.pWalReader, sversion);
  pTask->chkInfo.currentVer = sversion;

  if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING) {
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
    return 0;
  }

  // restore param
  code = streamRestoreParam(pTask);
  if (code < 0) {
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
    return -1;
  }

  // set status normal
  tqDebug("s-task:%s blocking stage completed, set the status to be normal", pTask->id.idStr);
  code = streamSetStatusNormal(pTask);
  if (code < 0) {
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
    return -1;
  }

  double el = (taosGetTimestampMs() - st) / 1000.0;
  tqDebug("s-task:%s step2 recover finished, el:%.2fs", pTask->id.idStr, el);

  // dispatch recover finish req to all related downstream task
  code = streamDispatchScanHistoryFinishMsg(pTask);
  if (code < 0) {
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
    return -1;
  }

  atomic_store_8(&pTask->info.fillHistory, 0);
  streamMetaSaveTask(pTq->pStreamMeta, pTask);
#endif

  streamSchedExec(pTask);
  streamMetaReleaseTask(pTq->pStreamMeta, pTask);

  return 0;
}

L
Liu Jicong 已提交
1277 1278 1279
/**
 * Handle a recover-finish notification from an upstream child task and forward it to
 * streamProcessRecoverFinishReq for the target task.
 *
 * @param pMsg RPC message: SMsgHead followed by an encoded SStreamRecoverFinishReq.
 * @return 0 on success, -1 on decode failure, a missing task, or a processing error.
 */
int32_t tqProcessTaskRecoverFinishReq(STQ* pTq, SRpcMsg* pMsg) {
  char*   msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);

  // deserialize
  SStreamRecoverFinishReq req;

  SDecoder decoder;
  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
  int32_t code = tDecodeStreamRecoverFinishReq(&decoder, &req);
  tDecoderClear(&decoder);
  if (code < 0) {
    // req.taskId / req.childId are unreliable after a failed decode
    tqError("vgId:%d failed to decode recover finish req", pTq->pStreamMeta->vgId);
    return -1;
  }

  // find task
  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.taskId);
  if (pTask == NULL) {
    tqError("vgId:%d failed to find task:0x%x for recover finish req", pTq->pStreamMeta->vgId, req.taskId);
    return -1;
  }

  // do process request
  if (streamProcessRecoverFinishReq(pTask, req.childId) < 0) {
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
    return -1;
  }

  streamMetaReleaseTask(pTq->pStreamMeta, pTask);
  return 0;
}
L
Liu Jicong 已提交
1303

L
Liu Jicong 已提交
1304 1305 1306 1307 1308
// Response to a recover-finish request: nothing to do on receipt; always reports success.
int32_t tqProcessTaskRecoverFinishRsp(STQ* pTq, SRpcMsg* pMsg) {
  (void)pTq;
  (void)pMsg;
  return 0;
}

1309 1310 1311 1312
/**
 * Convert an encoded delete result into a STREAM_DELETE_DATA block wrapped in a
 * STREAM_INPUT__REF_DATA_BLOCK queue item.
 *
 * The block has one row per deleted table uid. Each row holds the delete's skey/ekey and
 * the uid; the group id and calculate start/end columns are NULL.
 *
 * @param pData     encoded SDeleteRes payload.
 * @param len       payload length in bytes.
 * @param ver       version stamped on the produced block.
 * @param pRefBlock out: the created item, or NULL when there is nothing to delete or on error.
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY if an allocation fails.
 */
int32_t extractDelDataBlock(const void* pData, int32_t len, int64_t ver, SStreamRefDataBlock** pRefBlock) {
  SDecoder*   pCoder = &(SDecoder){0};
  SDeleteRes* pRes = &(SDeleteRes){0};

  *pRefBlock = NULL;

  pRes->uidList = taosArrayInit(0, sizeof(tb_uid_t));
  if (pRes->uidList == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  tDecoderInit(pCoder, (uint8_t*)pData, len);
  tDecodeDeleteRes(pCoder, pRes);
  tDecoderClear(pCoder);

  int32_t numOfTables = taosArrayGetSize(pRes->uidList);
  if (numOfTables == 0 || pRes->affectedRows == 0) {
    taosArrayDestroy(pRes->uidList);
    return TSDB_CODE_SUCCESS;
  }

  SSDataBlock* pDelBlock = createSpecialDataBlock(STREAM_DELETE_DATA);
  if (pDelBlock == NULL) {
    taosArrayDestroy(pRes->uidList);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  blockDataEnsureCapacity(pDelBlock, numOfTables);
  pDelBlock->info.rows = numOfTables;
  pDelBlock->info.version = ver;

  // the column handles do not change per row; look them up once
  SColumnInfoData* pStartCol = taosArrayGet(pDelBlock->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pEndCol = taosArrayGet(pDelBlock->pDataBlock, END_TS_COLUMN_INDEX);
  SColumnInfoData* pUidCol = taosArrayGet(pDelBlock->pDataBlock, UID_COLUMN_INDEX);
  SColumnInfoData* pGroupIdCol = taosArrayGet(pDelBlock->pDataBlock, GROUPID_COLUMN_INDEX);
  SColumnInfoData* pCalStartCol = taosArrayGet(pDelBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
  SColumnInfoData* pCalEndCol = taosArrayGet(pDelBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);

  for (int32_t i = 0; i < numOfTables; i++) {
    // start / end key columns
    colDataSetVal(pStartCol, i, (const char*)&pRes->skey, false);
    colDataSetVal(pEndCol, i, (const char*)&pRes->ekey, false);

    // uid column
    int64_t* pUid = taosArrayGet(pRes->uidList, i);
    colDataSetVal(pUidCol, i, (const char*)pUid, false);

    colDataSetNULL(pGroupIdCol, i);
    colDataSetNULL(pCalStartCol, i);
    colDataSetNULL(pCalEndCol, i);
  }

  taosArrayDestroy(pRes->uidList);

  *pRefBlock = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0);
  if (*pRefBlock == NULL) {
    // previously tested the out-parameter itself (never NULL) and then wrote through a NULL item
    blockDataDestroy(pDelBlock);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  (*pRefBlock)->type = STREAM_INPUT__REF_DATA_BLOCK;
  (*pRefBlock)->pBlock = pDelBlock;
  return TSDB_CODE_SUCCESS;
}

L
Liu Jicong 已提交
1362 1363
/**
 * Handle a run request for a stream task.
 *
 * Two reserved ids trigger vnode-wide work instead: STREAM_TASK_STATUS_CHECK_ID runs the
 * task status check, and EXTRACT_DATA_FROM_WAL_ID scans the WAL for all tasks. Otherwise the
 * task's input queue is processed when the task is NORMAL or HALT. After that,
 * tqStartStreamTasks is invoked.
 *
 * @return 0 on success, -1 if the task cannot be found.
 */
int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
  SStreamTaskRunReq* pReq = pMsg->pCont;
  int32_t            taskId = pReq->taskId;
  int32_t            vgId = TD_VID(pTq->pVnode);

  if (taskId == STREAM_TASK_STATUS_CHECK_ID) {
    tqStreamTasksStatusCheck(pTq);
    return 0;
  } else if (taskId == EXTRACT_DATA_FROM_WAL_ID) {  // all tasks are extracted submit data from the wal
    tqStreamTasksScanWal(pTq);
    return 0;
  }

  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);
  if (pTask == NULL) {
    tqError("vgId:%d failed to found s-task, taskId:%d", vgId, taskId);
    return -1;
  }

  int8_t taskStatus = pTask->status.taskStatus;
  bool   runnable = (taskStatus == TASK_STATUS__NORMAL) || (taskStatus == TASK_STATUS__HALT);

  if (runnable) {
    tqDebug("vgId:%d s-task:%s start to process block from inputQ, last chk point:%" PRId64, vgId, pTask->id.idStr,
            pTask->chkInfo.version);
    streamProcessRunReq(pTask);
  } else {
    // a paused task gives up its scheduled slot so it can be rescheduled on resume
    if (streamTaskShouldPause(&pTask->status)) {
      atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
    }

    tqDebug("vgId:%d s-task:%s ignore run req since not in ready state, status:%s, sched-status:%d", vgId,
            pTask->id.idStr, streamGetTaskStatusStr(pTask->status.taskStatus), pTask->status.schedStatus);
  }

  streamMetaReleaseTask(pTq->pStreamMeta, pTask);
  tqStartStreamTasks(pTq);
  return 0;
}

L
Liu Jicong 已提交
1403
/**
 * Handle data dispatched from an upstream task to a local task.
 *
 * The decoder is cleared right after decoding, as vnodeEnqueueStreamMsg does for the same
 * request type. When the target task is missing, the decoded request is released here.
 *
 * @param pMsg RPC message: SMsgHead followed by an encoded SStreamDispatchReq.
 * @param exec passed through to streamProcessDispatchMsg.
 * @return 0 if the request was handed to the task, -1 on decode failure or missing task.
 */
int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec) {
  char*   msgStr = pMsg->pCont;
  char*   msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
  int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);

  SStreamDispatchReq req = {0};

  SDecoder decoder;
  tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
  int32_t code = tDecodeStreamDispatchReq(&decoder, &req);
  tDecoderClear(&decoder);  // previously never cleared: leaked decoder resources
  if (code < 0) {
    tqError("vgId:%d failed to decode stream dispatch req", pTq->pStreamMeta->vgId);
    return -1;
  }

  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.taskId);
  if (pTask) {
    SRpcMsg rsp = {.info = pMsg->info, .code = 0};
    streamProcessDispatchMsg(pTask, &req, &rsp, exec);
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
    return 0;
  } else {
    tDeleteStreamDispatchReq(&req);
    return -1;
  }
}

L
Liu Jicong 已提交
1426 1427
/**
 * Handle the response to data a local (upstream) task dispatched downstream.
 *
 * The upstream task id is carried in network byte order after the SMsgHead.
 *
 * @return 0 when handled, TSDB_CODE_INVALID_MSG if the upstream task no longer exists.
 */
int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) {
  SStreamDispatchRsp* pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  SStreamMeta*        pMeta = pTq->pStreamMeta;
  int32_t             taskId = ntohl(pRsp->upstreamTaskId);

  SStreamTask* pTask = streamMetaAcquireTask(pMeta, taskId);
  if (pTask == NULL) {
    tqDebug("vgId:%d failed to handle the dispatch rsp, since find task:0x%x failed", pMeta->vgId, taskId);
    return TSDB_CODE_INVALID_MSG;
  }

  streamProcessDispatchRsp(pTask, pRsp, pMsg->code);
  streamMetaReleaseTask(pMeta, pTask);
  return 0;
}
L
Liu Jicong 已提交
1441

1442
// Remove the stream task named in an SVDropStreamTaskReq from this vnode's stream meta.
// Always reports success.
int32_t tqProcessTaskDropReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  SVDropStreamTaskReq* pDropReq = (SVDropStreamTaskReq*)msg;
  SStreamMeta*         pMeta = pTq->pStreamMeta;

  tqDebug("vgId:%d receive msg to drop stream task:0x%x", TD_VID(pTq->pVnode), pDropReq->taskId);
  streamMetaRemoveTask(pMeta, pDropReq->taskId);

  return 0;
}
L
Liu Jicong 已提交
1449

5
54liuyao 已提交
1450 1451
/**
 * Pause a stream task: its current status is saved in keepTaskStatus (which
 * tqProcessTaskResumeReq restores) and the status becomes TASK_STATUS__PAUSE.
 * A missing task is silently ignored; always returns 0.
 */
int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  SVPauseStreamTaskReq* pReq = (SVPauseStreamTaskReq*)msg;
  SStreamMeta*          pMeta = pTq->pStreamMeta;

  SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->taskId);
  if (pTask == NULL) {
    return 0;
  }

  tqDebug("vgId:%d s-task:%s set pause flag", pMeta->vgId, pTask->id.idStr);
  atomic_store_8(&pTask->status.keepTaskStatus, pTask->status.taskStatus);
  atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__PAUSE);
  streamMetaReleaseTask(pMeta, pTask);

  return 0;
}

/**
 * Resume a paused stream task by restoring the status saved in keepTaskStatus.
 *
 * For a non fill-history source task with igUntreated set, the WAL reader skips ahead to
 * `sversion`. Restarting then depends on the task:
 *  - a fill-history source task restarts its recovery;
 *  - a source task with an empty input queue triggers tqStartStreamTasks;
 *  - any other task is scheduled for execution.
 *
 * Always returns 0; a missing task is only logged.
 */
int32_t tqProcessTaskResumeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  SVResumeStreamTaskReq* pReq = (SVResumeStreamTaskReq*)msg;
  SStreamMeta*           pMeta = pTq->pStreamMeta;
  int32_t                vgId = pMeta->vgId;

  SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->taskId);
  if (pTask == NULL) {
    tqError("vgId:%d failed to find the s-task:0x%x for resume stream task", vgId, pReq->taskId);
    return 0;
  }

  atomic_store_8(&pTask->status.taskStatus, pTask->status.keepTaskStatus);

  bool isSource = (pTask->info.taskLevel == TASK_LEVEL__SOURCE);

  // no lock needs to secure the access of the version
  if (pReq->igUntreated && isSource && !pTask->info.fillHistory) {
    // discard all the data  when the stream task is suspended.
    walReaderSetSkipToVersion(pTask->exec.pWalReader, sversion);
    tqDebug("vgId:%d s-task:%s resume to exec, prev paused version:%" PRId64 ", start from vnode ver:%" PRId64
            ", schedStatus:%d",
            vgId, pTask->id.idStr, pTask->chkInfo.currentVer, sversion, pTask->status.schedStatus);
  } else {  // from the previous paused version and go on
    tqDebug("vgId:%d s-task:%s resume to exec, from paused ver:%" PRId64 ", vnode ver:%" PRId64 ", schedStatus:%d",
            vgId, pTask->id.idStr, pTask->chkInfo.currentVer, sversion, pTask->status.schedStatus);
  }

  if (isSource && pTask->info.fillHistory) {
    streamStartRecoverTask(pTask, pReq->igUntreated);
  } else if (isSource && taosQueueItemSize(pTask->inputQueue->queue) == 0) {
    tqStartStreamTasks(pTq);
  } else {
    streamSchedExec(pTask);
  }

  streamMetaReleaseTask(pMeta, pTask);
  return 0;
}

L
Liu Jicong 已提交
1497 1498 1499 1500 1501 1502
/**
 * Handle a retrieve request addressed to a local task (req.dstTaskId).
 *
 * The decoded request is released with tDeleteStreamRetrieveReq whether or not the
 * task exists.
 *
 * @return 0 if the task handled the request, -1 if the task does not exist.
 */
int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg) {
  char*   msgStr = pMsg->pCont;
  char*   msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
  int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);

  SStreamRetrieveReq req;
  SDecoder           decoder;
  tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
  tDecodeStreamRetrieveReq(&decoder, &req);
  tDecoderClear(&decoder);

  SStreamMeta* pMeta = pTq->pStreamMeta;
  SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.dstTaskId);
  int32_t      ret = -1;

  if (pTask != NULL) {
    SRpcMsg rsp = {.info = pMsg->info, .code = 0};
    streamProcessRetrieveReq(pTask, &req, &rsp);
    streamMetaReleaseTask(pMeta, pTask);
    ret = 0;
  }

  tDeleteStreamRetrieveReq(&req);
  return ret;
}

/*
 * Handle the response to a stream retrieve request.
 *
 * Placeholder: neither the TQ handle nor the message is inspected, and the
 * function always reports success (0).
 */
int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg) {
  (void)pTq;   // unused for now
  (void)pMsg;  // unused for now
  return 0;
}
L
Liu Jicong 已提交
1524

1525 1526 1527 1528 1529 1530
/*
 * Deliver a stream dispatch message directly to its destination task.
 *
 * The body after SMsgHead is decoded as an SStreamDispatchReq.
 *
 * - If the target task (req.taskId) can be acquired, the request is passed to
 *   streamProcessDispatchMsg(). Then pMsg->pCont and the queue item pMsg are
 *   freed, and 0 is returned.
 * - Otherwise (decode failure -> TSDB_CODE_MSG_DECODE_ERROR, missing task ->
 *   TSDB_CODE_STREAM_TASK_NOT_EXIST), an SStreamDispatchRsp carrying that code
 *   is sent through pMsg->info. Then pMsg is freed and -1 is returned.
 * - If pMsg->info.handle is NULL, no response is sent and -1 is returned
 *   without freeing pMsg.
 */
int32_t vnodeEnqueueStreamMsg(SVnode* pVnode, SRpcMsg* pMsg) {
  STQ*      pTq = pVnode->pTq;
  SMsgHead* msgStr = pMsg->pCont;
  char*     msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
  int32_t   msgLen = pMsg->contLen - sizeof(SMsgHead);
  int32_t   code = 0;

  // Zero-initialised: the FAIL path reads req's header fields to build the
  // error response, which must not be indeterminate when decoding fails early.
  SStreamDispatchReq req = {0};
  SDecoder           decoder;

  tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
  if (tDecodeStreamDispatchReq(&decoder, &req) < 0) {
    code = TSDB_CODE_MSG_DECODE_ERROR;
    tDecoderClear(&decoder);
    // Release any payload arrays the decoder allocated before it failed;
    // the scalar header fields used below are left intact.
    tDeleteStreamDispatchReq(&req);
    goto FAIL;
  }
  tDecoderClear(&decoder);

  int32_t taskId = req.taskId;

  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);
  if (pTask) {
    SRpcMsg rsp = {.info = pMsg->info, .code = 0};
    streamProcessDispatchMsg(pTask, &req, &rsp, false);
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
    rpcFreeCont(pMsg->pCont);
    taosFreeQitem(pMsg);
    return 0;
  } else {
    tDeleteStreamDispatchReq(&req);
  }

  code = TSDB_CODE_STREAM_TASK_NOT_EXIST;

FAIL:
  // NOTE(review): unlike every other failure path, pMsg is not freed here;
  // confirm the caller owns and frees pMsg when there is no rpc handle.
  if (pMsg->info.handle == NULL) return -1;

  SMsgHead* pRspHead = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamDispatchRsp));
  if (pRspHead == NULL) {
    SRpcMsg rsp = {.code = TSDB_CODE_OUT_OF_MEMORY, .info = pMsg->info};
    tqDebug("send dispatch error rsp, code: %x", code);
    tmsgSendRsp(&rsp);
    rpcFreeCont(pMsg->pCont);
    taosFreeQitem(pMsg);
    return -1;
  }

  // Fill the error response in network byte order.
  pRspHead->vgId = htonl(req.upstreamNodeId);
  SStreamDispatchRsp* pRsp = POINTER_SHIFT(pRspHead, sizeof(SMsgHead));
  pRsp->streamId = htobe64(req.streamId);
  pRsp->upstreamTaskId = htonl(req.upstreamTaskId);
  pRsp->upstreamNodeId = htonl(req.upstreamNodeId);
  pRsp->downstreamNodeId = htonl(pVnode->config.vgId);
  pRsp->downstreamTaskId = htonl(req.taskId);
  pRsp->inputStatus = TASK_OUTPUT_STATUS__NORMAL;

  SRpcMsg rsp = {
      .code = code, .info = pMsg->info, .contLen = sizeof(SMsgHead) + sizeof(SStreamDispatchRsp), .pCont = pRspHead};
  tqDebug("send dispatch error rsp, code: %x", code);
  tmsgSendRsp(&rsp);
  rpcFreeCont(pMsg->pCont);
  taosFreeQitem(pMsg);
  return -1;
}
L
Liu Jicong 已提交
1588

1589
/*
 * Report whether version `sversion` is not newer than pTq->walLogLastVer.
 * Returns 1 if sversion <= walLogLastVer, otherwise 0.
 */
int32_t tqCheckLogInWal(STQ* pTq, int64_t sversion) {
  const int64_t lastLoggedVer = pTq->walLogLastVer;
  return (sversion > lastLoggedVer) ? 0 : 1;
}
1590