tq.c 43.7 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
S
Shengliang Guan 已提交
14 15
 */

H
Hongze Cheng 已提交
16
#include "tq.h"
S
Shengliang Guan 已提交
17

dengyihao's avatar
dengyihao 已提交
18 19 20
// 0: not init
// 1: already inited
// 2: wait to be inited or cleaup
21
#define WAL_READ_TASKS_ID       (-1)
22

23 24
static int32_t tqInitialize(STQ* pTq);

wmmhello's avatar
wmmhello 已提交
25 26 27
static FORCE_INLINE bool tqIsHandleExec(STqHandle* pHandle) { return TMQ_HANDLE_STATUS_EXEC == pHandle->status; }
static FORCE_INLINE void tqSetHandleExec(STqHandle* pHandle) {pHandle->status = TMQ_HANDLE_STATUS_EXEC;}
static FORCE_INLINE void tqSetHandleIdle(STqHandle* pHandle) {pHandle->status = TMQ_HANDLE_STATUS_IDLE;}
wmmhello's avatar
wmmhello 已提交
28

L
Liu Jicong 已提交
29
int32_t tqInit() {
L
Liu Jicong 已提交
30 31 32 33 34 35
  int8_t old;
  while (1) {
    old = atomic_val_compare_exchange_8(&tqMgmt.inited, 0, 2);
    if (old != 2) break;
  }

36 37 38 39 40 41
  if (old == 0) {
    tqMgmt.timer = taosTmrInit(10000, 100, 10000, "TQ");
    if (tqMgmt.timer == NULL) {
      atomic_store_8(&tqMgmt.inited, 0);
      return -1;
    }
42 43 44
    if (streamInit() < 0) {
      return -1;
    }
L
Liu Jicong 已提交
45
    atomic_store_8(&tqMgmt.inited, 1);
46
  }
47

L
Liu Jicong 已提交
48 49
  return 0;
}
L
Liu Jicong 已提交
50

51
/*
 * Release the process-wide TQ state created by tqInit().
 *
 * Only the caller that moves the state from 1 (initialized) to 2 (in transition) tears
 * anything down. A concurrent caller spins while the state is 2 and otherwise returns
 * without doing anything.
 */
void tqCleanUp() {
  int8_t prevState;
  do {
    prevState = atomic_val_compare_exchange_8(&tqMgmt.inited, 1, 2);
  } while (prevState == 2);

  if (prevState != 1) {
    return;
  }

  taosTmrCleanUp(tqMgmt.timer);
  streamCleanUp();
  atomic_store_8(&tqMgmt.inited, 0);
}
L
Liu Jicong 已提交
64

65
static void destroyTqHandle(void* data) {
66 67
  STqHandle* pData = (STqHandle*)data;
  qDestroyTask(pData->execHandle.task);
wmmhello's avatar
wmmhello 已提交
68

69
  if (pData->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
L
Liu Jicong 已提交
70
    taosMemoryFreeClear(pData->execHandle.execCol.qmsg);
71
  } else if (pData->execHandle.subType == TOPIC_SUB_TYPE__DB) {
72
    tqCloseReader(pData->execHandle.pTqReader);
73 74
    walCloseReader(pData->pWalReader);
    taosHashCleanup(pData->execHandle.execDb.pFilterOutTbUid);
L
Liu Jicong 已提交
75
  } else if (pData->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
76
    walCloseReader(pData->pWalReader);
77
    tqCloseReader(pData->execHandle.pTqReader);
78
  }
79 80 81 82
  if(pData->msg != NULL) {
    rpcFreeCont(pData->msg->pCont);
    taosMemoryFree(pData->msg);
    pData->msg = NULL;
H
Haojun Liao 已提交
83
  }
L
Liu Jicong 已提交
84 85
}

86 87 88 89 90
// True only when both offsets are WAL (log) offsets and pLeft's version does not exceed pRight's.
// Offsets of any other type are never considered ordered, so they compare as "not less or equal".
static bool tqOffsetLessOrEqual(const STqOffset* pLeft, const STqOffset* pRight) {
  if (pLeft->val.type != TMQ_OFFSET__LOG || pRight->val.type != TMQ_OFFSET__LOG) {
    return false;
  }
  return pLeft->val.version <= pRight->val.version;
}

L
Liu Jicong 已提交
91
/*
 * Create the TQ instance of a vnode.
 *
 * Allocates the STQ object and its in-memory hash tables. It then opens the persistent parts
 * (meta store, offset store, stream meta) through tqInitialize().
 *
 * Returns the new instance, or NULL on failure. terrno is set to TSDB_CODE_OUT_OF_MEMORY
 * when one of the allocations made here fails.
 */
STQ* tqOpen(const char* path, SVnode* pVnode) {
  STQ* pTq = taosMemoryCalloc(1, sizeof(STQ));
  if (pTq == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pTq->path = taosStrdup(path);
  pTq->pVnode = pVnode;
  pTq->walLogLastVer = pVnode->pWal->vers.lastVer;

  pTq->pHandle = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);
  taosInitRWLatch(&pTq->lock);
  pTq->pPushMgr = taosHashInit(64, MurmurHash3_32, false, HASH_NO_LOCK);
  pTq->pCheckInfo = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);

  // Everything below (and every request handler) dereferences these objects, so fail fast.
  // Only plain allocations exist at this point, so release them directly instead of tqClose().
  if (pTq->path == NULL || pTq->pHandle == NULL || pTq->pPushMgr == NULL || pTq->pCheckInfo == NULL) {
    taosHashCleanup(pTq->pCheckInfo);
    taosHashCleanup(pTq->pPushMgr);
    taosHashCleanup(pTq->pHandle);
    taosMemoryFree(pTq->path);
    taosMemoryFree(pTq);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  taosHashSetFreeFp(pTq->pHandle, destroyTqHandle);
  taosHashSetFreeFp(pTq->pCheckInfo, (FDelete)tDeleteSTqCheckInfo);

  if (tqInitialize(pTq) != TSDB_CODE_SUCCESS) {
    tqClose(pTq);
    return NULL;
  }

  return pTq;
}

/*
 * Open the persistent state of a TQ instance.
 *
 * It opens, in order, the tq meta store, the offset store and the stream meta. It then loads
 * the stream tasks kept in the stream meta, starting from the committed WAL version.
 *
 * Returns 0 on success and -1 at the first failing step. Partially opened state is not
 * released here; tqOpen() calls tqClose() on failure.
 *
 * Declared static to match its prototype at the top of the file.
 */
static int32_t tqInitialize(STQ* pTq) {
  if (tqMetaOpen(pTq) < 0) {
    return -1;
  }

  pTq->pOffsetStore = tqOffsetOpen(pTq);
  if (pTq->pOffsetStore == NULL) {
    return -1;
  }

  pTq->pStreamMeta = streamMetaOpen(pTq->path, pTq, (FTaskExpand*)tqExpandTask, pTq->pVnode->config.vgId);
  if (pTq->pStreamMeta == NULL) {
    return -1;
  }

  // the version is kept in task's meta data
  // todo check if this version is required or not
  if (streamLoadTasks(pTq->pStreamMeta, walGetCommittedVer(pTq->pVnode->pWal)) < 0) {
    return -1;
  }

  return 0;
}
L
Liu Jicong 已提交
143

L
Liu Jicong 已提交
144
/*
 * Destroy a TQ instance created by tqOpen(). NULL is accepted and ignored.
 *
 * Closes the offset store and drops the hash tables; entries are released through the free
 * callbacks registered in tqOpen(). It then frees the path, closes the meta store and the
 * stream meta, and finally frees the object itself.
 */
void tqClose(STQ* pTq) {
  if (pTq != NULL) {
    tqOffsetClose(pTq->pOffsetStore);

    taosHashCleanup(pTq->pHandle);
    taosHashCleanup(pTq->pPushMgr);
    taosHashCleanup(pTq->pCheckInfo);

    taosMemoryFree(pTq->path);
    tqMetaClose(pTq);
    streamMetaClose(pTq->pStreamMeta);

    taosMemoryFree(pTq);
  }
}
L
Liu Jicong 已提交
158

H
Haojun Liao 已提交
159 160 161 162 163 164 165 166 167 168 169 170 171
/*
 * Ask every stream task of this vnode to stop.
 *
 * Each task is marked TASK_STATUS__STOP and its executor is killed. The stream meta write
 * latch is held for the whole scan. NULL is accepted and ignored.
 */
void tqNotifyClose(STQ* pTq) {
  if (pTq == NULL) {
    return;
  }

  taosWLockLatch(&pTq->pStreamMeta->lock);

  for (void* pIter = taosHashIterate(pTq->pStreamMeta->pTasks, NULL); pIter != NULL;
       pIter = taosHashIterate(pTq->pStreamMeta->pTasks, pIter)) {
    SStreamTask* pTask = *(SStreamTask**)pIter;
    tqDebug("vgId:%d s-task:%s set dropping flag", pTq->pStreamMeta->vgId, pTask->id.idStr);
    pTask->status.taskStatus = TASK_STATUS__STOP;

    int64_t startMs = taosGetTimestampMs();
    qKillTask(pTask->exec.pExecutor, TSDB_CODE_SUCCESS);
    int64_t elapsedMs = taosGetTimestampMs() - startMs;
    tqDebug("vgId:%d s-task:%s is closed in %" PRId64 "ms", pTq->pStreamMeta->vgId, pTask->id.idStr, elapsedMs);
  }

  taosWUnLockLatch(&pTq->pStreamMeta->lock);
}

H
Haojun Liao 已提交
184 185
static int32_t doSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* pRsp, int32_t epoch,
                             int64_t consumerId, int32_t type) {
L
Liu Jicong 已提交
186 187
  int32_t len = 0;
  int32_t code = 0;
H
Haojun Liao 已提交
188 189 190 191 192 193

  if (type == TMQ_MSG_TYPE__POLL_RSP) {
    tEncodeSize(tEncodeSMqDataRsp, pRsp, len, code);
  } else if (type == TMQ_MSG_TYPE__TAOSX_RSP) {
    tEncodeSize(tEncodeSTaosxRsp, (STaosxRsp*)pRsp, len, code);
  }
L
Liu Jicong 已提交
194 195 196 197 198 199 200 201 202 203 204

  if (code < 0) {
    return -1;
  }

  int32_t tlen = sizeof(SMqRspHead) + len;
  void*   buf = rpcMallocCont(tlen);
  if (buf == NULL) {
    return -1;
  }

H
Haojun Liao 已提交
205 206 207
  ((SMqRspHead*)buf)->mqMsgType = type;
  ((SMqRspHead*)buf)->epoch = epoch;
  ((SMqRspHead*)buf)->consumerId = consumerId;
L
Liu Jicong 已提交
208 209 210 211 212

  void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));

  SEncoder encoder = {0};
  tEncoderInit(&encoder, abuf, len);
H
Haojun Liao 已提交
213 214 215 216

  if (type == TMQ_MSG_TYPE__POLL_RSP) {
    tEncodeSMqDataRsp(&encoder, pRsp);
  } else if (type == TMQ_MSG_TYPE__TAOSX_RSP) {
X
Xiaoyu Wang 已提交
217
    tEncodeSTaosxRsp(&encoder, (STaosxRsp*)pRsp);
H
Haojun Liao 已提交
218 219
  }

L
Liu Jicong 已提交
220 221 222
  tEncoderClear(&encoder);

  SRpcMsg rsp = {
H
Haojun Liao 已提交
223
      .info = *pRpcHandleInfo,
L
Liu Jicong 已提交
224 225 226 227 228 229 230 231 232
      .pCont = buf,
      .contLen = tlen,
      .code = 0,
  };

  tmsgSendRsp(&rsp);
  return 0;
}

233 234 235 236 237 238
/*
 * Reply to the request stored in pHandle->msg with an empty SMqDataRsp (poll response).
 *
 * pHandle->msg is not freed here; that is left to the caller.
 * Returns 0 when the response was sent, or the error returned by doSendDataRsp() when it
 * could not be built. The failure was previously ignored and success was logged anyway.
 */
int32_t tqPushDataRsp(STQ* pTq, STqHandle* pHandle) {
  SMqDataRsp dataRsp = {0};
  dataRsp.head.consumerId = pHandle->consumerId;
  dataRsp.head.epoch = pHandle->epoch;
  dataRsp.head.mqMsgType = TMQ_MSG_TYPE__POLL_RSP;

  int32_t code = doSendDataRsp(&pHandle->msg->info, &dataRsp, pHandle->epoch, pHandle->consumerId,
                               TMQ_MSG_TYPE__POLL_RSP);
  if (code != 0) {
    tqError("vgId:%d, consumer:0x%" PRIx64 " (epoch %d) failed to push rsp", TD_VID(pTq->pVnode),
            dataRsp.head.consumerId, dataRsp.head.epoch);
    return code;
  }

  char buf1[80] = {0};
  char buf2[80] = {0};
  tFormatOffset(buf1, tListLen(buf1), &dataRsp.reqOffset);
  tFormatOffset(buf2, tListLen(buf2), &dataRsp.rspOffset);
  tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) push rsp, block num: %d, req:%s, rsp:%s",
          TD_VID(pTq->pVnode), dataRsp.head.consumerId, dataRsp.head.epoch, dataRsp.blockNum, buf1, buf2);
  return 0;
}

H
Haojun Liao 已提交
249 250
/*
 * Send pRsp (encoded according to type, see doSendDataRsp()) as the answer to poll request
 * pReq carried by pMsg.
 *
 * Returns 0 when the response was sent. It returns the doSendDataRsp() error when the response
 * could not be built, so the caller knows no reply went out. That failure was previously
 * swallowed.
 */
int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp, int32_t type) {
  int32_t code = doSendDataRsp(&pMsg->info, pRsp, pReq->epoch, pReq->consumerId, type);
  if (code != 0) {
    tqError("vgId:%d consumer:0x%" PRIx64 " (epoch %d) failed to send rsp, reqId:0x%" PRIx64, TD_VID(pTq->pVnode),
            pReq->consumerId, pReq->epoch, pReq->reqId);
    return code;
  }

  char buf1[80] = {0};
  char buf2[80] = {0};
  tFormatOffset(buf1, tListLen(buf1), &pRsp->reqOffset);
  tFormatOffset(buf2, tListLen(buf2), &pRsp->rspOffset);

  tqDebug("vgId:%d consumer:0x%" PRIx64 " (epoch %d) send rsp, block num:%d, req:%s, rsp:%s, reqId:0x%" PRIx64,
          TD_VID(pTq->pVnode), pReq->consumerId, pReq->epoch, pRsp->blockNum, buf1, buf2, pReq->reqId);

  return 0;
}

263
/*
 * Apply a consumer offset commit (an encoded STqOffset in msg).
 *
 * A log offset whose version is exactly sversion - 1 is advanced by one. The offset is written
 * unless both it and the saved offset are log offsets and the new version is not greater than
 * the saved one. For log offsets, the WAL reference of the matching handle (if any) is moved
 * to the committed version as well.
 *
 * Returns 0 on success (including "nothing to update") and -1 on decode, validation or write
 * errors. Decode errors set terrno to TSDB_CODE_INVALID_MSG.
 */
int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  STqOffset offset = {0};
  int32_t   vgId = TD_VID(pTq->pVnode);

  SDecoder decoder;
  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
  if (tDecodeSTqOffset(&decoder, &offset) < 0) {
    tDecoderClear(&decoder);  // the decoder must be cleared on the error path too
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }
  tDecoderClear(&decoder);

  if (offset.val.type == TMQ_OFFSET__SNAPSHOT_DATA || offset.val.type == TMQ_OFFSET__SNAPSHOT_META) {
    tqDebug("receive offset commit msg to %s on vgId:%d, offset(type:snapshot) uid:%" PRId64 ", ts:%" PRId64,
            offset.subKey, vgId, offset.val.uid, offset.val.ts);
  } else if (offset.val.type == TMQ_OFFSET__LOG) {
    tqDebug("receive offset commit msg to %s on vgId:%d, offset(type:log) version:%" PRId64, offset.subKey, vgId,
            offset.val.version);
    if (offset.val.version + 1 == sversion) {
      offset.val.version += 1;
    }
  } else {
    tqError("invalid commit offset type:%d", offset.val.type);
    return -1;
  }

  STqOffset* pSavedOffset = tqOffsetRead(pTq->pOffsetStore, offset.subKey);
  if (pSavedOffset != NULL && tqOffsetLessOrEqual(&offset, pSavedOffset)) {
    return 0;  // no need to update the offset value
  }

  // save the new offset value
  if (tqOffsetWrite(pTq->pOffsetStore, &offset) < 0) {
    return -1;
  }

  if (offset.val.type == TMQ_OFFSET__LOG) {
    taosWLockLatch(&pTq->lock);
    STqHandle* pHandle = taosHashGet(pTq->pHandle, offset.subKey, strlen(offset.subKey));
    if (pHandle && (walSetRefVer(pHandle->pRef, offset.val.version) < 0)) {
      taosWUnLockLatch(&pTq->lock);
      return -1;
    }
    taosWUnLockLatch(&pTq->lock);
  }

  return 0;
}

L
Liu Jicong 已提交
312
/*
 * Check whether column colId of table tbUid may be altered.
 *
 * Returns -1 if any entry in pTq->pCheckInfo lists that column for that table, 0 otherwise.
 */
int32_t tqCheckColModifiable(STQ* pTq, int64_t tbUid, int32_t colId) {
  for (void* pIter = taosHashIterate(pTq->pCheckInfo, NULL); pIter != NULL;
       pIter = taosHashIterate(pTq->pCheckInfo, pIter)) {
    STqCheckInfo* pCheck = (STqCheckInfo*)pIter;
    if (pCheck->ntbUid != tbUid) {
      continue;
    }

    int32_t numOfCols = taosArrayGetSize(pCheck->colIdList);
    for (int32_t idx = 0; idx < numOfCols; ++idx) {
      int16_t forbidColId = *(int16_t*)taosArrayGet(pCheck->colIdList, idx);
      if (forbidColId == colId) {
        // leaving the iteration early, so release the iterator via taosHashCancelIterate
        taosHashCancelIterate(pTq->pCheckInfo, pIter);
        return -1;
      }
    }
  }

  return 0;
}

338
/*
 * Handle a TMQ poll request carried by pMsg.
 *
 * Steps:
 *   1. decode the request and look up the subscription handle by subKey;
 *   2. wait until no other poll is executing on that handle;
 *   3. reject the request if the handle belongs to a different consumer;
 *   4. raise the handle epoch to the request epoch if the request is newer;
 *   5. extract and send data through tqExtractDataForMq().
 * The handle is flagged as executing for the duration of step 5.
 *
 * Returns the result of tqExtractDataForMq(), or -1 with terrno set on decode, lookup or
 * consumer-mismatch errors.
 */
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
  SMqPollReq req = {0};
  if (tDeserializeSMqPollReq(pMsg->pCont, pMsg->contLen, &req) < 0) {
    tqError("tDeserializeSMqPollReq %d failed", pMsg->contLen);
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  int64_t      consumerId = req.consumerId;
  int32_t      reqEpoch = req.epoch;
  STqOffsetVal reqOffset = req.reqOffset;
  int32_t      vgId = TD_VID(pTq->pVnode);

  taosWLockLatch(&pTq->lock);

  // 1. find handle
  STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
  if (pHandle == NULL) {
    tqError("tmq poll: consumer:0x%" PRIx64 " vgId:%d subkey %s not found", consumerId, vgId, req.subKey);
    terrno = TSDB_CODE_INVALID_MSG;
    taosWUnLockLatch(&pTq->lock);
    return -1;
  }

  // The latch stays held while waiting, so subscribe/delete requests (which take the same latch)
  // cannot modify this handle meanwhile. The running poll clears the flag without the latch.
  while (tqIsHandleExec(pHandle)) {
    tqDebug("tmq poll: consumer:0x%" PRIx64 " vgId:%d, topic:%s, subscription is executing, wait for 5ms and retry",
            consumerId, vgId, req.subKey);
    taosMsleep(5);
  }

  // 2. check re-balance status
  if (pHandle->consumerId != consumerId) {
    tqDebug("ERROR tmq poll: consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64,
            consumerId, vgId, req.subKey, pHandle->consumerId);
    terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
    taosWUnLockLatch(&pTq->lock);
    return -1;
  }

  tqSetHandleExec(pHandle);
  taosWUnLockLatch(&pTq->lock);

  // 3. update the epoch value
  int32_t savedEpoch = pHandle->epoch;
  if (savedEpoch < reqEpoch) {
    tqDebug("tmq poll: consumer:0x%" PRIx64 " epoch update from %d to %d by poll req", consumerId, savedEpoch,
            reqEpoch);
    pHandle->epoch = reqEpoch;
  }

  char buf[80];
  tFormatOffset(buf, tListLen(buf), &reqOffset);
  tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey %s, recv poll req vgId:%d, req:%s, reqId:0x%" PRIx64,
          consumerId, req.epoch, pHandle->subKey, vgId, buf, req.reqId);

  int32_t code = tqExtractDataForMq(pTq, pHandle, &req, pMsg);
  tqSetHandleIdle(pHandle);
  return code;
}

395
/*
 * Drop a subscription described by the SMqVDeleteReq in msg.
 *
 * If an in-memory handle exists, it first waits for any running poll on it. It then closes the
 * handle's WAL reference and removes the handle from pTq->pHandle. Afterwards it deletes the
 * subscription's offset and its persisted handle. All of this runs under pTq->lock.
 *
 * Failures of the individual steps are only logged; the function always returns 0.
 */
int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  SMqVDeleteReq* pReq = (SMqVDeleteReq*)msg;
  int32_t        vgId = TD_VID(pTq->pVnode);

  tqDebug("vgId:%d, tq process delete sub req %s", vgId, pReq->subKey);
  int32_t code = 0;

  taosWLockLatch(&pTq->lock);
  STqHandle* pHandle = taosHashGet(pTq->pHandle, pReq->subKey, strlen(pReq->subKey));
  if (pHandle) {
    while (tqIsHandleExec(pHandle)) {
      tqDebug("vgId:%d, topic:%s, subscription is executing, wait for 5ms and retry", vgId, pHandle->subKey);
      taosMsleep(5);
    }
    if (pHandle->pRef) {
      walCloseRef(pTq->pVnode->pWal, pHandle->pRef->refId);
    }
    // pHandle must not be used after this: the table's free callback releases it
    code = taosHashRemove(pTq->pHandle, pReq->subKey, strlen(pReq->subKey));
    if (code != 0) {
      tqError("cannot process tq delete req %s, since no such handle", pReq->subKey);
    }
  }

  code = tqOffsetDelete(pTq->pOffsetStore, pReq->subKey);
  if (code != 0) {
    tqError("cannot process tq delete req %s, since no such offset in cache", pReq->subKey);
  }

  // tqMetaDeleteHandle removes the persisted handle (not an offset), so report it as such
  if (tqMetaDeleteHandle(pTq, pReq->subKey) < 0) {
    tqError("cannot process tq delete req %s, since no such handle in tdb", pReq->subKey);
  }
  taosWUnLockLatch(&pTq->lock);

  return 0;
}

431
/*
 * Register a column-alteration check (an encoded STqCheckInfo in msg).
 *
 * The check is stored in pTq->pCheckInfo under info.topic, and the raw message is persisted
 * in the meta store under the same key.
 *
 * info.colIdList is owned by pTq->pCheckInfo once taosHashPut() succeeds; the table's free
 * callback, tDeleteSTqCheckInfo, releases it. If decoding or the insert fails, it is freed
 * here instead of leaking.
 *
 * Returns 0 on success, or -1 with terrno set otherwise.
 */
int32_t tqProcessAddCheckInfoReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  STqCheckInfo info = {0};
  SDecoder     decoder;
  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
  if (tDecodeSTqCheckInfo(&decoder, &info) < 0) {
    tDecoderClear(&decoder);
    taosArrayDestroy(info.colIdList);  // may have been allocated before decoding failed
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  tDecoderClear(&decoder);

  if (taosHashPut(pTq->pCheckInfo, info.topic, strlen(info.topic), &info, sizeof(STqCheckInfo)) < 0) {
    taosArrayDestroy(info.colIdList);  // the hash table did not take ownership
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  if (tqMetaSaveCheckInfo(pTq, info.topic, msg, msgLen) < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  return 0;
}

451
/*
 * Remove the column-alteration check registered under the topic name in msg.
 *
 * msg is used as a C string (strlen). The check is removed from pTq->pCheckInfo first, then
 * from the meta store; the second step is skipped if the first fails.
 *
 * Returns 0 on success, or -1 with terrno set otherwise.
 */
int32_t tqProcessDelCheckInfoReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  size_t keyLen = strlen(msg);
  bool   removed = taosHashRemove(pTq->pCheckInfo, msg, keyLen) >= 0 && tqMetaDeleteCheckInfo(pTq, msg) >= 0;

  if (!removed) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  return 0;
}

463
/*
 * Handle a re-balance (subscribe) request for one subscription key on this vnode.
 *
 * - No handle yet: build one for req.newConsumerId. This takes a WAL reference on the committed
 *   version and creates the execution task/readers required by req.subType. The handle is then
 *   inserted into pTq->pHandle and persisted with tqMetaSaveHandle(). A request whose
 *   newConsumerId is -1 is only logged.
 * - Handle exists: wait for any running poll on it. Then either bump the epoch (same consumer)
 *   or switch it to the new consumer with epoch 0. Finally, kill the running task, unregister
 *   the handle from the push manager and persist it.
 *
 * The whole operation runs under pTq->lock. Returns 0 on success and -1 on failure.
 */
int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  int         ret = 0;
  SMqRebVgReq req = {0};
  tDecodeSMqRebVgReq(msg, &req);

  SVnode* pVnode = pTq->pVnode;
  int32_t vgId = TD_VID(pVnode);

  tqDebug("vgId:%d, tq process sub req:%s, Id:0x%" PRIx64 " -> Id:0x%" PRIx64, pVnode->config.vgId, req.subKey,
          req.oldConsumerId, req.newConsumerId);

  taosWLockLatch(&pTq->lock);
  STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
  if (pHandle == NULL) {
    if (req.oldConsumerId != -1) {
      tqError("vgId:%d, build new consumer handle %s for consumer:0x%" PRIx64 ", but old consumerId:0x%" PRIx64,
              req.vgId, req.subKey, req.newConsumerId, req.oldConsumerId);
    }

    if (req.newConsumerId == -1) {
      tqError("vgId:%d, tq invalid re-balance request, new consumerId %" PRId64 "", req.vgId, req.newConsumerId);
      goto end;
    }

    STqHandle tqHandle = {0};
    pHandle = &tqHandle;

    memcpy(pHandle->subKey, req.subKey, TSDB_SUBSCRIBE_KEY_LEN);
    pHandle->consumerId = req.newConsumerId;
    pHandle->epoch = -1;

    pHandle->execHandle.subType = req.subType;
    pHandle->fetchMeta = req.withMeta;

    // TODO version should be assigned and refed during preprocess
    SWalRef* pRef = walRefCommittedVer(pVnode->pWal);
    if (pRef == NULL) {
      ret = -1;
      goto end;
    }

    int64_t ver = pRef->refVer;
    pHandle->pRef = pRef;

    SReadHandle handle = {
        .meta = pVnode->pMeta, .vnode = pVnode, .initTableReader = true, .initTqReader = true, .version = ver};
    pHandle->snapshotVer = ver;

    if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
      // the handle takes ownership of the query message; clear req.qmsg so "end" does not free it
      pHandle->execHandle.execCol.qmsg = req.qmsg;
      req.qmsg = NULL;

      pHandle->execHandle.task = qCreateQueueExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle, vgId,
                                                          &pHandle->execHandle.numOfCols, req.newConsumerId);
      void* scanner = NULL;
      qExtractStreamScanner(pHandle->execHandle.task, &scanner);
      pHandle->execHandle.pTqReader = qExtractReaderFromStreamScanner(scanner);
    } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__DB) {
      pHandle->pWalReader = walOpenReader(pVnode->pWal, NULL);
      pHandle->execHandle.pTqReader = tqReaderOpen(pVnode);

      pHandle->execHandle.execDb.pFilterOutTbUid =
          taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
      buildSnapContext(handle.meta, handle.version, 0, pHandle->execHandle.subType, pHandle->fetchMeta,
                       (SSnapContext**)(&handle.sContext));

      pHandle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &handle, vgId, NULL, req.newConsumerId);
    } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
      pHandle->pWalReader = walOpenReader(pVnode->pWal, NULL);
      pHandle->execHandle.execTb.suid = req.suid;

      SArray* tbUidList = taosArrayInit(0, sizeof(int64_t));
      vnodeGetCtbIdList(pVnode, req.suid, tbUidList);
      tqDebug("vgId:%d, tq try to get all ctb, suid:%" PRId64, pVnode->config.vgId, req.suid);
      for (int32_t i = 0; i < taosArrayGetSize(tbUidList); i++) {
        int64_t tbUid = *(int64_t*)taosArrayGet(tbUidList, i);
        tqDebug("vgId:%d, idx %d, uid:%" PRId64, vgId, i, tbUid);
      }
      pHandle->execHandle.pTqReader = tqReaderOpen(pVnode);
      tqReaderSetTbUidList(pHandle->execHandle.pTqReader, tbUidList);
      taosArrayDestroy(tbUidList);

      buildSnapContext(handle.meta, handle.version, req.suid, pHandle->execHandle.subType, pHandle->fetchMeta,
                       (SSnapContext**)(&handle.sContext));
      pHandle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &handle, vgId, NULL, req.newConsumerId);
    }

    // The hash table stores a copy of this stack handle. If the insert fails, release what was
    // built above (task, readers, query message, WAL ref) instead of leaking it.
    if (taosHashPut(pTq->pHandle, req.subKey, strlen(req.subKey), pHandle, sizeof(STqHandle)) < 0) {
      tqError("vgId:%d, failed to add handle %s for consumer:0x%" PRIx64, vgId, req.subKey, pHandle->consumerId);
      walCloseRef(pVnode->pWal, pRef->refId);
      destroyTqHandle(pHandle);
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      ret = -1;
      goto end;
    }

    // the handle is brand new, so the previous owner can only come from the request itself
    tqDebug("try to persist handle %s consumer:0x%" PRIx64 " , old consumer:0x%" PRIx64, req.subKey,
            pHandle->consumerId, req.oldConsumerId);
    ret = tqMetaSaveHandle(pTq, req.subKey, pHandle);
    goto end;
  } else {
    while (tqIsHandleExec(pHandle)) {
      tqDebug("sub req vgId:%d, topic:%s, subscription is executing, wait for 5ms and retry", vgId, pHandle->subKey);
      taosMsleep(5);
    }

    if (pHandle->consumerId == req.newConsumerId) {  // do nothing
      tqInfo("vgId:%d consumer:0x%" PRIx64 " remains, no switch occurs", req.vgId, req.newConsumerId);
      atomic_add_fetch_32(&pHandle->epoch, 1);
    } else {
      tqInfo("vgId:%d switch consumer from Id:0x%" PRIx64 " to Id:0x%" PRIx64, req.vgId, pHandle->consumerId,
             req.newConsumerId);
      atomic_store_64(&pHandle->consumerId, req.newConsumerId);
      atomic_store_32(&pHandle->epoch, 0);
    }

    // kill executing task
    qTaskInfo_t pTaskInfo = pHandle->execHandle.task;
    if (pTaskInfo != NULL) {
      qKillTask(pTaskInfo, TSDB_CODE_SUCCESS);
    }
    if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
      qStreamCloseTsdbReader(pTaskInfo);
    }

    // remove if it has been register in the push manager, and return one empty block to consumer
    tqUnregisterPushHandle(pTq, pHandle);
    ret = tqMetaSaveHandle(pTq, req.subKey, pHandle);
    goto end;
  }

end:
  taosWUnLockLatch(&pTq->lock);
  taosMemoryFree(req.qmsg);
  return ret;
}
591

592
/*
 * Expand a stream task loaded or deployed on this vnode. It is registered as the FTaskExpand
 * callback of the stream meta in tqInitialize().
 *
 * The function:
 *   - initializes the runtime fields (id string, ref count, queues, status, checkpoint version);
 *   - for source and agg tasks, opens the state backend and creates the executor;
 *   - wires up the SMA or table sink;
 *   - for source tasks, opens a WAL reader that also delivers delete messages;
 *   - sets up the trigger.
 * On success the checkpoint version is advanced by one, the next version to process.
 *
 * Returns 0 on success and -1 if a queue, state backend, executor or sink schema cannot be
 * created.
 */
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
  int32_t    vgId = TD_VID(pTq->pVnode);
  const bool isSource = (pTask->taskLevel == TASK_LEVEL__SOURCE);
  const bool isAgg = (pTask->taskLevel == TASK_LEVEL__AGG);

  pTask->id.idStr = createStreamTaskIdStr(pTask->id.streamId, pTask->id.taskId);
  pTask->refCnt = 1;
  pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE;

  pTask->inputQueue = streamQueueOpen();
  pTask->outputQueue = streamQueueOpen();
  if (pTask->inputQueue == NULL || pTask->outputQueue == NULL) {
    return -1;
  }

  pTask->inputStatus = TASK_INPUT_STATUS__NORMAL;
  pTask->outputStatus = TASK_OUTPUT_STATUS__NORMAL;
  pTask->pMsgCb = &pTq->pVnode->msgCb;
  pTask->pMeta = pTq->pStreamMeta;
  pTask->chkInfo.version = ver;
  pTask->chkInfo.currentVer = ver;

  // fill-history tasks have to wait until their downstream tasks are ready
  if (pTask->fillHistory) {
    pTask->status.taskStatus = TASK_STATUS__WAIT_DOWNSTREAM;
  } else {
    pTask->status.taskStatus = TASK_STATUS__NORMAL;
  }

  // expand executor: source tasks read from this vnode, agg tasks merge their child vgroups
  if (isSource || isAgg) {
    pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask, false, -1, -1);
    if (pTask->pState == NULL) {
      return -1;
    }

    SReadHandle readHandle = {0};
    readHandle.pStateBackend = pTask->pState;
    if (isSource) {
      readHandle.meta = pTq->pVnode->pMeta;
      readHandle.vnode = pTq->pVnode;
      readHandle.initTqReader = 1;
    } else {
      readHandle.vnode = NULL;
      readHandle.numOfVgroups = (int32_t)taosArrayGetSize(pTask->childEpInfo);
    }

    pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &readHandle, vgId);
    if (pTask->exec.pExecutor == NULL) {
      return -1;
    }
  }

  // sink
  if (pTask->outputType == TASK_OUTPUT__SMA) {
    pTask->smaSink.vnode = pTq->pVnode;
    pTask->smaSink.smaSink = smaHandleRes;
  } else if (pTask->outputType == TASK_OUTPUT__TABLE) {
    pTask->tbSink.vnode = pTq->pVnode;
    pTask->tbSink.tbSinkFunc = tqSinkToTablePipeline2;

    // use the schema version recorded for tbSink.stbUid when it is found, otherwise 1
    int32_t   schemaVer = 1;
    SMetaInfo metaInfo = {0};
    if (metaGetInfo(pTq->pVnode->pMeta, pTask->tbSink.stbUid, &metaInfo, NULL) == TSDB_CODE_SUCCESS) {
      schemaVer = metaInfo.skmVer;
    }

    SSchemaWrapper* pWrapper = pTask->tbSink.pSchemaWrapper;
    pTask->tbSink.pTSchema = tBuildTSchema(pWrapper->pSchema, pWrapper->nCols, schemaVer);
    if (pTask->tbSink.pTSchema == NULL) {
      return -1;
    }
  }

  if (isSource) {
    SWalFilterCond cond = {.deleteMsg = 1};
    pTask->exec.pWalReader = walOpenReader(pTq->pVnode->pWal, &cond);
  }

  streamSetupTrigger(pTask);

  tqInfo("vgId:%d expand stream task, s-task:%s, checkpoint ver:%" PRId64 " child id:%d, level:%d", vgId, pTask->id.idStr,
         pTask->chkInfo.version, pTask->selfChildId, pTask->taskLevel);

  // next valid version will add one
  pTask->chkInfo.version += 1;
  return 0;
}
L
Liu Jicong 已提交
680

681 682 683 684 685 686
/*
 * Answer a readiness check sent by an upstream stream task.
 *
 * Decodes the SStreamTaskCheckReq that follows the SMsgHead in pMsg. It reports the status of
 * the local task req.downstreamTaskId (0 if that task is not built yet) and replies with an
 * SStreamTaskCheckRsp whose header vgId is req.upstreamNodeId.
 *
 * Returns 0 when the response was sent, or -1 on decode, encode or allocation failure.
 */
int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) {
  char*               msgStr = pMsg->pCont;
  char*               msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
  int32_t             msgLen = pMsg->contLen - sizeof(SMsgHead);
  SStreamTaskCheckReq req;
  SDecoder            decoder;

  tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
  int32_t code = tDecodeSStreamTaskCheckReq(&decoder, &req);
  tDecoderClear(&decoder);
  if (code < 0) {
    // req would be uninitialized; never build a response from it
    tqError("vgId:%d failed to decode task check req", TD_VID(pTq->pVnode));
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  int32_t             taskId = req.downstreamTaskId;
  SStreamTaskCheckRsp rsp = {
      .reqId = req.reqId,
      .streamId = req.streamId,
      .childId = req.childId,
      .downstreamNodeId = req.downstreamNodeId,
      .downstreamTaskId = req.downstreamTaskId,
      .upstreamNodeId = req.upstreamNodeId,
      .upstreamTaskId = req.upstreamTaskId,
  };

  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);
  if (pTask) {
    rsp.status = streamTaskCheckStatus(pTask);
    // log while the reference is still held; pTask must not be touched after it is released
    tqDebug("tq recv task check req(reqId:0x%" PRIx64
            ") %d at node %d task status:%d, check req from task %d at node %d, rsp status %d",
            rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, pTask->status.taskStatus, rsp.upstreamTaskId,
            rsp.upstreamNodeId, rsp.status);
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
  } else {
    rsp.status = 0;
    tqDebug("tq recv task check(taskId:%d not built yet) req(reqId:0x%" PRIx64
            ") %d at node %d, check req from task %d at node %d, rsp status %d",
            taskId, rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.upstreamTaskId, rsp.upstreamNodeId,
            rsp.status);
  }

  int32_t len = 0;
  tEncodeSize(tEncodeSStreamTaskCheckRsp, &rsp, len, code);
  if (code < 0) {
    tqError("unable to encode rsp %d", __LINE__);
    return -1;
  }

  void* buf = rpcMallocCont(sizeof(SMsgHead) + len);
  if (buf == NULL) {
    tqError("vgId:%d failed to allocate task check rsp, reqId:0x%" PRIx64, TD_VID(pTq->pVnode), rsp.reqId);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  ((SMsgHead*)buf)->vgId = htonl(req.upstreamNodeId);

  SEncoder encoder;
  void*    abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
  tEncoderInit(&encoder, (uint8_t*)abuf, len);
  tEncodeSStreamTaskCheckRsp(&encoder, &rsp);
  tEncoderClear(&encoder);

  SRpcMsg rspMsg = {.code = 0, .pCont = buf, .contLen = sizeof(SMsgHead) + len, .info = pMsg->info};
  tmsgSendRsp(&rspMsg);
  return 0;
}

741
/*
 * Handle the answer to a readiness check issued by one of this vnode's tasks.
 *
 * Decodes the SStreamTaskCheckRsp in msg and hands it to the upstream task
 * rsp.upstreamTaskId through streamProcessTaskCheckRsp().
 *
 * Returns that function's result, or -1 if decoding fails or the task is not found.
 */
int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  SStreamTaskCheckRsp rsp;
  SDecoder            decoder;

  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
  int32_t code = tDecodeSStreamTaskCheckRsp(&decoder, &rsp);
  tDecoderClear(&decoder);
  if (code < 0) {
    return -1;
  }

  tqDebug("tq recv task check rsp(reqId:0x%" PRIx64 ") %d at node %d check req from task %d at node %d, status %d",
          rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);

  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, rsp.upstreamTaskId);
  if (pTask == NULL) {
    return -1;
  }

  code = streamProcessTaskCheckRsp(pTask, &rsp, sversion);
  streamMetaReleaseTask(pTq->pStreamMeta, pTask);
  return code;
}

767
/*
 * Deploy a stream task from an encoded SStreamTask in msg (sent from the mnode).
 *
 * Does nothing when tsDisableStream is set. Otherwise it decodes the task and adds it to the
 * stream meta with version sversion, under the stream meta write latch. For fill-history
 * tasks it then starts the downstream check.
 *
 * Returns 0 on success, or -1 on allocation, decode or registration failure.
 */
int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  if (tsDisableStream) {
    return 0;
  }

  // 1. deserialize msg and build task
  SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
  if (pTask == NULL) {
    return -1;
  }

  SDecoder decoder;
  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
  int32_t code = tDecodeStreamTask(&decoder, pTask);
  tDecoderClear(&decoder);
  if (code < 0) {
    taosMemoryFree(pTask);
    return -1;
  }

  // 2. save task, use the newest commit version as the initial start version of stream task.
  taosWLockLatch(&pTq->pStreamMeta->lock);
  code = streamMetaAddDeployedTask(pTq->pStreamMeta, sversion, pTask);
  if (code < 0) {
    tqError("vgId:%d failed to add s-task:%s, total:%d", TD_VID(pTq->pVnode), pTask->id.idStr,
            streamMetaGetNumOfTasks(pTq->pStreamMeta));
  }
  taosWUnLockLatch(&pTq->pStreamMeta->lock);

  if (code < 0) {
    return -1;
  }

  // 3. go through recover steps to fill history
  if (pTask->fillHistory) {
    streamTaskCheckDownstream(pTask, sversion);
  }

  tqDebug("vgId:%d s-task:%s is deployed and add meta from mnd, status:%d, total:%d", TD_VID(pTq->pVnode),
          pTask->id.idStr, pTask->status.taskStatus, streamMetaGetNumOfTasks(pTq->pStreamMeta));
  return 0;
}

L
Liu Jicong 已提交
816 817 818 819 820
/*
 * Handle recovery step 1 for a stream task: run the step-1 scan, then queue a step-2 request
 * (TDMT_VND_STREAM_RECOVER_BLOCKING_STAGE) on this vnode's write queue.
 *
 * pMsg->pCont is read in place as an SStreamRecoverStep1Req.
 * Returns 0 when the step-2 request was queued or the task is being dropped; -1 when the task is
 * missing, has no checkpoint version (chkInfo.version <= 0), the step-2 request cannot be built,
 * or the message buffer cannot be allocated.
 */
int32_t tqProcessTaskRecover1Req(STQ* pTq, SRpcMsg* pMsg) {
  SStreamRecoverStep1Req* pReq = (SStreamRecoverStep1Req*)pMsg->pCont;
  SStreamTask*            pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->taskId);
  if (pTask == NULL) {
    return -1;
  }

  // check param
  int64_t fillVer1 = pTask->chkInfo.version;
  if (fillVer1 <= 0) {
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
    return -1;
  }

  // do recovery step 1
  tqDebug("s-task:%s start recover step 1 scan", pTask->id.idStr);
  int64_t st = taosGetTimestampMs();

  streamSourceRecoverScanStep1(pTask);
  if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING) {
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
    return 0;
  }

  double el = (taosGetTimestampMs() - st) / 1000.0;
  tqDebug("s-task:%s recover step 1 ended, elapsed time:%.2fs", pTask->id.idStr, el);

  // build msg to launch next step
  SStreamRecoverStep2Req req;
  int32_t                code = streamBuildSourceRecover2Req(pTask, &req);
  if (code < 0) {
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
    return -1;
  }

  // Every access to pTask below happens before the reference is released; once released the task
  // may be freed by a concurrent drop.
  if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING) {
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
    return 0;
  }

  // serialize msg: the payload is the step-2 request (the handler of
  // TDMT_VND_STREAM_RECOVER_BLOCKING_STAGE reads it as SStreamRecoverStep2Req), so size it by `req`.
  int32_t len = (int32_t)sizeof(req);
  void*   serializedReq = rpcMallocCont(len);
  if (serializedReq == NULL) {
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
    return -1;
  }
  memcpy(serializedReq, &req, len);

  tqDebug("s-task:%s start to recover blocking stage", pTask->id.idStr);
  streamMetaReleaseTask(pTq->pStreamMeta, pTask);
  pTask = NULL;

  // dispatch msg
  SRpcMsg rpcMsg = {
      .code = 0, .contLen = len, .msgType = TDMT_VND_STREAM_RECOVER_BLOCKING_STAGE, .pCont = serializedReq};
  tmsgPutToQueue(&pTq->pVnode->msgCb, WRITE_QUEUE, &rpcMsg);
  return 0;
}

880
/*
 * Handle recovery step 2 for a stream task.
 *
 * msg is read in place as an SStreamRecoverStep2Req. Steps:
 * 1. Run the step-2 scan with sversion and seek the task's WAL reader to sversion.
 * 2. Unless the task is being dropped, restore its parameters, set it to normal status, and
 *    dispatch recover-finish requests downstream.
 * 3. Clear fillHistory and persist the task.
 *
 * Returns 0 on success or when the task is dropping. Returns -1 if the task is missing or any step fails.
 */
int32_t tqProcessTaskRecover2Req(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  SStreamRecoverStep2Req* pStep2 = (SStreamRecoverStep2Req*)msg;
  SStreamMeta*            pMeta = pTq->pStreamMeta;

  SStreamTask* pTask = streamMetaAcquireTask(pMeta, pStep2->taskId);
  if (pTask == NULL) {
    return -1;
  }

  int32_t ret = -1;

  if (streamSourceRecoverScanStep2(pTask, sversion) < 0) {
    goto _release;
  }

  qDebug("s-task:%s set the start wal offset to be:%"PRId64, pTask->id.idStr, sversion);
  walReaderSeekVer(pTask->exec.pWalReader, sversion);

  if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING) {
    ret = 0;
    goto _release;
  }

  // Short-circuit keeps the original order and stops at the first failing step.
  if (streamRestoreParam(pTask) < 0 || streamSetStatusNormal(pTask) < 0 ||
      streamDispatchRecoverFinishReq(pTask) < 0) {
    goto _release;
  }

  atomic_store_8(&pTask->fillHistory, 0);
  streamMetaSaveTask(pMeta, pTask);
  ret = 0;

_release:
  streamMetaReleaseTask(pMeta, pTask);
  return ret;
}

L
Liu Jicong 已提交
933 934 935
/*
 * Handle a recover-finish notification from an upstream task.
 *
 * The body after the SMsgHead is an encoded SStreamRecoverFinishReq naming the target task and the
 * child (upstream) id. Returns -1 when decoding fails, the task is unknown, or
 * streamProcessRecoverFinishReq() fails; otherwise 0.
 */
int32_t tqProcessTaskRecoverFinishReq(STQ* pTq, SRpcMsg* pMsg) {
  char*   msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);

  // deserialize; stop on failure so indeterminate taskId/childId values are never used
  SStreamRecoverFinishReq req = {0};
  SDecoder                decoder;
  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
  int32_t code = tDecodeSStreamRecoverFinishReq(&decoder, &req);
  tDecoderClear(&decoder);
  if (code < 0) {
    tqError("failed to decode stream recover finish req, len:%d", msgLen);
    return -1;
  }

  // find task
  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.taskId);
  if (pTask == NULL) {
    return -1;
  }

  // do process request
  code = streamProcessRecoverFinishReq(pTask, req.childId);
  streamMetaReleaseTask(pTq->pStreamMeta, pTask);
  return (code < 0) ? -1 : 0;
}
L
Liu Jicong 已提交
959

L
Liu Jicong 已提交
960 961 962 963 964
/*
 * Placeholder handler for the recover-finish response: the message is currently ignored and
 * success is always reported.
 */
int32_t tqProcessTaskRecoverFinishRsp(STQ* pTq, SRpcMsg* pMsg) {
  (void)pTq;
  (void)pMsg;
  return 0;
}

965 966 967 968
/*
 * Decode an encoded SDeleteRes into a STREAM_DELETE_DATA block wrapped in a queue item of type
 * STREAM_INPUT__REF_DATA_BLOCK.
 *
 * pData/len : encoded delete result.
 * ver       : version stamped on the produced block (info.version).
 * pRefBlock : out-param; receives the new queue item, or NULL when nothing was deleted
 *             (no tables / zero affected rows) or on failure.
 *
 * The block has one row per uid in the delete result. Every row carries the same [skey, ekey]
 * range, and the group id and calculate-range columns are set to NULL.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_MSG_DECODE_ERROR when decoding fails, or
 * TSDB_CODE_OUT_OF_MEMORY / the block-allocation error code when memory cannot be obtained.
 */
int32_t extractDelDataBlock(const void* pData, int32_t len, int64_t ver, SStreamRefDataBlock** pRefBlock) {
  SDecoder*   pCoder = &(SDecoder){0};
  SDeleteRes* pRes = &(SDeleteRes){0};

  *pRefBlock = NULL;

  pRes->uidList = taosArrayInit(0, sizeof(tb_uid_t));
  if (pRes->uidList == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  tDecoderInit(pCoder, (uint8_t*)pData, len);
  int32_t code = tDecodeDeleteRes(pCoder, pRes);
  tDecoderClear(pCoder);
  if (code < 0) {
    // never turn a partially decoded request into a delete range
    taosArrayDestroy(pRes->uidList);
    return TSDB_CODE_MSG_DECODE_ERROR;
  }

  int32_t numOfTables = taosArrayGetSize(pRes->uidList);
  if (numOfTables == 0 || pRes->affectedRows == 0) {
    taosArrayDestroy(pRes->uidList);
    return TSDB_CODE_SUCCESS;
  }

  SSDataBlock* pDelBlock = createSpecialDataBlock(STREAM_DELETE_DATA);
  if (pDelBlock == NULL) {
    taosArrayDestroy(pRes->uidList);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  code = blockDataEnsureCapacity(pDelBlock, numOfTables);
  if (code != TSDB_CODE_SUCCESS) {
    blockDataDestroy(pDelBlock);
    taosArrayDestroy(pRes->uidList);
    return code;
  }
  pDelBlock->info.rows = numOfTables;
  pDelBlock->info.version = ver;

  // column handles do not change inside the loop; look them up once
  SColumnInfoData* pStartCol = taosArrayGet(pDelBlock->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pEndCol = taosArrayGet(pDelBlock->pDataBlock, END_TS_COLUMN_INDEX);
  SColumnInfoData* pUidCol = taosArrayGet(pDelBlock->pDataBlock, UID_COLUMN_INDEX);
  SColumnInfoData* pGroupCol = taosArrayGet(pDelBlock->pDataBlock, GROUPID_COLUMN_INDEX);
  SColumnInfoData* pCalStartCol = taosArrayGet(pDelBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
  SColumnInfoData* pCalEndCol = taosArrayGet(pDelBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);

  for (int32_t i = 0; i < numOfTables; i++) {
    colDataSetVal(pStartCol, i, (const char*)&pRes->skey, false);  // start key column
    colDataSetVal(pEndCol, i, (const char*)&pRes->ekey, false);    // end key column

    int64_t* pUid = taosArrayGet(pRes->uidList, i);  // uid column
    colDataSetVal(pUidCol, i, (const char*)pUid, false);

    colDataSetNULL(pGroupCol, i);
    colDataSetNULL(pCalStartCol, i);
    colDataSetNULL(pCalEndCol, i);
  }

  taosArrayDestroy(pRes->uidList);

  // Check the allocated item itself (*pRefBlock), not the out-param pointer, and release the
  // block on failure so it does not leak.
  *pRefBlock = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0);
  if (*pRefBlock == NULL) {
    blockDataDestroy(pDelBlock);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  (*pRefBlock)->type = STREAM_INPUT__REF_DATA_BLOCK;
  (*pRefBlock)->pBlock = pDelBlock;
  return TSDB_CODE_SUCCESS;
}

/*
 * Fan a delete request out to every source-level stream task.
 *
 * pReq/len hold an encoded SDeleteRes. It is decoded into a single STREAM_DELETE_DATA block with one
 * row per affected table uid; all rows share the same [skey, ekey] range, and the block is stamped
 * with `ver`. While holding the stream-meta write lock, each task with
 * taskLevel == TASK_LEVEL__SOURCE gets its own STREAM_INPUT__REF_DATA_BLOCK queue item pointing at
 * that shared block, and is then scheduled with streamSchedExec(). Always returns 0.
 *
 * NOTE(review): pRef appears intended to reference-count the shared pDelBlock, but it is never
 * attached to the queued items and never incremented when an item is enqueued:
 *   - a failed enqueue decrements it from 1 to 0, so the final decrement reaches -1 and pDelBlock
 *     is never freed;
 *   - when every enqueue succeeds, the final decrement reaches 0 and destroys pDelBlock while the
 *     queued items still point at it.
 * Confirm how the consumer releases REF_DATA_BLOCK items before fixing.
 * NOTE(review): the results of createSpecialDataBlock, taosMemoryMalloc (pRef) and
 * taosAllocateQitem are used without NULL checks.
 */
int32_t tqProcessDeleteDataReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) {
L
Liu Jicong 已提交
1019 1020 1021 1022 1023 1024 1025 1026 1027 1028 1029 1030 1031 1032 1033
  bool        failed = false;
  SDecoder*   pCoder = &(SDecoder){0};
  SDeleteRes* pRes = &(SDeleteRes){0};

  // NOTE(review): if this allocation fails, `failed` is set but decoding still runs with a NULL
  // uidList; if taosArrayGetSize(NULL) yields 0, the early return below fires and the
  // streamTaskInputFail() branch is never reached. Confirm.
  pRes->uidList = taosArrayInit(0, sizeof(tb_uid_t));
  if (pRes->uidList == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    failed = true;
  }

  tDecoderInit(pCoder, pReq, len);
  tDecodeDeleteRes(pCoder, pRes);
  tDecoderClear(pCoder);

  // Nothing to propagate when no table or no row was affected.
  int32_t sz = taosArrayGetSize(pRes->uidList);
L
Liu Jicong 已提交
1034
  if (sz == 0 || pRes->affectedRows == 0) {
L
Liu Jicong 已提交
1035 1036 1037
    taosArrayDestroy(pRes->uidList);
    return 0;
  }
1038

L
Liu Jicong 已提交
1039 1040 1041 1042 1043 1044 1045 1046
  // Build one delete row per affected table; every row shares the same [skey, ekey] range and has
  // NULL group id / calculate-range columns.
  SSDataBlock* pDelBlock = createSpecialDataBlock(STREAM_DELETE_DATA);
  blockDataEnsureCapacity(pDelBlock, sz);
  pDelBlock->info.rows = sz;
  pDelBlock->info.version = ver;

  for (int32_t i = 0; i < sz; i++) {
    // start key column
    SColumnInfoData* pStartCol = taosArrayGet(pDelBlock->pDataBlock, START_TS_COLUMN_INDEX);
1047
    colDataSetVal(pStartCol, i, (const char*)&pRes->skey, false);  // end key column
L
Liu Jicong 已提交
1048
    SColumnInfoData* pEndCol = taosArrayGet(pDelBlock->pDataBlock, END_TS_COLUMN_INDEX);
1049
    colDataSetVal(pEndCol, i, (const char*)&pRes->ekey, false);
L
Liu Jicong 已提交
1050 1051 1052
    // uid column
    SColumnInfoData* pUidCol = taosArrayGet(pDelBlock->pDataBlock, UID_COLUMN_INDEX);
    int64_t*         pUid = taosArrayGet(pRes->uidList, i);
1053
    colDataSetVal(pUidCol, i, (const char*)pUid, false);
L
Liu Jicong 已提交
1054

1055 1056 1057
    colDataSetNULL(taosArrayGet(pDelBlock->pDataBlock, GROUPID_COLUMN_INDEX), i);
    colDataSetNULL(taosArrayGet(pDelBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX), i);
    colDataSetNULL(taosArrayGet(pDelBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX), i);
L
Liu Jicong 已提交
1058 1059
  }

L
Liu Jicong 已提交
1060 1061
  taosArrayDestroy(pRes->uidList);

L
Liu Jicong 已提交
1062 1063 1064
  // This function's own reference on pDelBlock (see the NOTE(review) in the header comment).
  int32_t* pRef = taosMemoryMalloc(sizeof(int32_t));
  *pRef = 1;

1065 1066
  taosWLockLatch(&pTq->pStreamMeta->lock);

L
Liu Jicong 已提交
1067 1068 1069
  // Walk every registered task; only source-level tasks receive the delete block.
  void* pIter = NULL;
  while (1) {
    pIter = taosHashIterate(pTq->pStreamMeta->pTasks, pIter);
1070 1071 1072 1073
    if (pIter == NULL) {
      break;
    }

L
Liu Jicong 已提交
1074
    SStreamTask* pTask = *(SStreamTask**)pIter;
1075 1076 1077
    if (pTask->taskLevel != TASK_LEVEL__SOURCE) {
      continue;
    }
L
Liu Jicong 已提交
1078

1079
    qDebug("s-task:%s delete req enqueue, ver: %" PRId64, pTask->id.idStr, ver);
L
Liu Jicong 已提交
1080

L
Liu Jicong 已提交
1081
    if (!failed) {
S
Shengliang Guan 已提交
1082
      SStreamRefDataBlock* pRefBlock = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0);
L
Liu Jicong 已提交
1083 1084 1085
      pRefBlock->type = STREAM_INPUT__REF_DATA_BLOCK;
      pRefBlock->pBlock = pDelBlock;

1086
      if (tAppendDataToInputQueue(pTask, (SStreamQueueItem*)pRefBlock) < 0) {
L
Liu Jicong 已提交
1087
        // NOTE(review): pRef was never incremented for this item, so this drops it below 1.
        atomic_sub_fetch_32(pRef, 1);
L
Liu Jicong 已提交
1088
        taosFreeQitem(pRefBlock);
L
Liu Jicong 已提交
1089 1090
        continue;
      }
L
Liu Jicong 已提交
1091

L
Liu Jicong 已提交
1092
      if (streamSchedExec(pTask) < 0) {
1093
        qError("s-task:%s stream task launch failed", pTask->id.idStr);
L
Liu Jicong 已提交
1094 1095
        continue;
      }
L
Liu Jicong 已提交
1096

L
Liu Jicong 已提交
1097 1098 1099 1100
    } else {
      streamTaskInputFail(pTask);
    }
  }
L
Liu Jicong 已提交
1101

1102 1103
  taosWUnLockLatch(&pTq->pStreamMeta->lock);

L
Liu Jicong 已提交
1104 1105
  // Drop this function's reference; the block and counter are freed only when it reaches exactly 0.
  int32_t ref = atomic_sub_fetch_32(pRef, 1);
  if (ref == 0) {
L
Liu Jicong 已提交
1106
    blockDataDestroy(pDelBlock);
L
Liu Jicong 已提交
1107 1108 1109 1110
    taosMemoryFree(pRef);
  }

// Disabled earlier implementation (per-task SStreamDataBlock copies); not compiled.
#if 0
S
Shengliang Guan 已提交
1111
    SStreamDataBlock* pStreamBlock = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, 0);
L
Liu Jicong 已提交
1112 1113 1114 1115 1116 1117 1118 1119
    pStreamBlock->type = STREAM_INPUT__DATA_BLOCK;
    pStreamBlock->blocks = taosArrayInit(0, sizeof(SSDataBlock));
    SSDataBlock block = {0};
    assignOneDataBlock(&block, pDelBlock);
    block.info.type = STREAM_DELETE_DATA;
    taosArrayPush(pStreamBlock->blocks, &block);

    if (!failed) {
1120
      if (tAppendDataToInputQueue(pTask, (SStreamQueueItem*)pStreamBlock) < 0) {
1121
        qError("stream task input del failed, task id %d", pTask->id.taskId);
L
Liu Jicong 已提交
1122 1123 1124 1125
        continue;
      }

      if (streamSchedExec(pTask) < 0) {
1126
        qError("stream task launch failed, task id %d", pTask->id.taskId);
L
Liu Jicong 已提交
1127 1128 1129 1130 1131 1132
        continue;
      }
    } else {
      streamTaskInputFail(pTask);
    }
  }
L
Liu Jicong 已提交
1133
  blockDataDestroy(pDelBlock);
L
Liu Jicong 已提交
1134
#endif
L
Liu Jicong 已提交
1135 1136 1137
  return 0;
}

1138 1139 1140 1141
/*
 * Re-dispatch every parked consume request after new data has been submitted.
 *
 * Under pTq->lock, each handle in pTq->pPushMgr has its saved request (pHandle->msg) re-queued as a
 * TDMT_VND_TMQ_CONSUME message on the query queue. The saved SRpcMsg wrapper is freed; its payload
 * is handed to the queue. The push map is cleared afterwards. Always returns 0.
 */
int32_t tqProcessSubmitReqForSubscribe(STQ* pTq) {
  int32_t vgId = TD_VID(pTq->pVnode);

  taosWLockLatch(&pTq->lock);

  if (taosHashGetSize(pTq->pPushMgr) > 0) {
    void* pIter = taosHashIterate(pTq->pPushMgr, NULL);

    while (pIter) {
      STqHandle* pHandle = *(STqHandle**)pIter;
      tqDebug("vgId:%d start set submit for pHandle:%p, consumer:0x%" PRIx64, vgId, pHandle, pHandle->consumerId);

      // The branch is taken when the assertion `pHandle->msg != NULL` does not hold.
      if (ASSERT(pHandle->msg != NULL)) {
        tqError("pHandle->msg should not be null");
        // Leaving the iteration early: release the iterator before the map is cleared below.
        taosHashCancelIterate(pTq->pPushMgr, pIter);
        break;
      }

      SRpcMsg msg = {.msgType = TDMT_VND_TMQ_CONSUME,
                     .pCont = pHandle->msg->pCont,
                     .contLen = pHandle->msg->contLen,
                     .info = pHandle->msg->info};
      tmsgPutToQueue(&pTq->pVnode->msgCb, QUERY_QUEUE, &msg);
      taosMemoryFree(pHandle->msg);
      pHandle->msg = NULL;

      pIter = taosHashIterate(pTq->pPushMgr, pIter);
    }

    taosHashClear(pTq->pPushMgr);
  }

  taosWUnLockLatch(&pTq->lock);
  return 0;
}

L
Liu Jicong 已提交
1171 1172
/*
 * Handle a TDMT_STREAM_TASK_RUN request.
 *
 * If the task id is WAL_READ_TASKS_ID, the request triggers tqStreamTasksScanWal() for all tasks.
 * Otherwise the named task is run when its status is TASK_STATUS__NORMAL, or skipped with a debug
 * log. tqStartStreamTasks() is then invoked. Returns -1 only when the task cannot be found.
 */
int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
  SStreamTaskRunReq* pRunReq = pMsg->pCont;
  int32_t            id = pRunReq->taskId;
  int32_t            vgId = TD_VID(pTq->pVnode);

  if (id == WAL_READ_TASKS_ID) {  // extract submit data from the wal on behalf of all tasks
    tqStreamTasksScanWal(pTq);
    return 0;
  }

  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, id);
  if (pTask == NULL) {
    tqError("vgId:%d failed to found s-task, taskId:%d", vgId, id);
    return -1;
  }

  if (pTask->status.taskStatus != TASK_STATUS__NORMAL) {
    tqDebug("vgId:%d s-task:%s ignore run req since not in ready state", vgId, pTask->id.idStr);
  } else {
    tqDebug("vgId:%d s-task:%s start to process block from wal, last chk point:%" PRId64, vgId, pTask->id.idStr,
            pTask->chkInfo.version);
    streamProcessRunReq(pTask);
  }

  streamMetaReleaseTask(pTq->pStreamMeta, pTask);
  tqStartStreamTasks(pTq);
  return 0;
}

L
Liu Jicong 已提交
1201
/*
 * Handle a data-dispatch request from an upstream task.
 *
 * The body after the SMsgHead is an encoded SStreamDispatchReq. It is passed to
 * streamProcessDispatchReq() for the target task (req.taskId), together with an SRpcMsg response
 * template and the `exec` flag.
 *
 * Returns -1 on decode failure or when the task is unknown; in the unknown-task case the decoded
 * request is released with tDeleteStreamDispatchReq(). Otherwise returns 0.
 */
int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec) {
  char*              msgStr = pMsg->pCont;
  char*              msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
  int32_t            msgLen = pMsg->contLen - sizeof(SMsgHead);
  SStreamDispatchReq req;
  SDecoder           decoder;

  // Same decode/clear pattern as vnodeEnqueueStreamMsg: check the result and always release the
  // decoder, which the previous version never did.
  tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
  int32_t code = tDecodeStreamDispatchReq(&decoder, &req);
  tDecoderClear(&decoder);
  if (code < 0) {
    tqError("failed to decode stream dispatch req, len:%d", msgLen);
    return -1;
  }

  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.taskId);
  if (pTask == NULL) {
    tDeleteStreamDispatchReq(&req);
    return -1;
  }

  SRpcMsg rsp = {.info = pMsg->info, .code = 0};
  streamProcessDispatchReq(pTask, &req, &rsp, exec);
  streamMetaReleaseTask(pTq->pStreamMeta, pTask);
  return 0;
}

L
Liu Jicong 已提交
1222 1223
/*
 * Handle a dispatch response for one of this node's upstream tasks.
 *
 * The SStreamDispatchRsp follows the SMsgHead; its upstreamTaskId is in network byte order.
 * Returns -1 when the upstream task is unknown, otherwise 0 after streamProcessDispatchRsp().
 */
int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) {
  SStreamDispatchRsp* pDispatchRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t             upstreamId = ntohl(pDispatchRsp->upstreamTaskId);
  SStreamTask*        pTask = streamMetaAcquireTask(pTq->pStreamMeta, upstreamId);

  tqDebug("recv dispatch rsp, code:%x", pMsg->code);

  if (pTask == NULL) {
    return -1;
  }

  streamProcessDispatchRsp(pTask, pDispatchRsp, pMsg->code);
  streamMetaReleaseTask(pTq->pStreamMeta, pTask);
  return 0;
}
L
Liu Jicong 已提交
1235

1236
/*
 * Drop a stream task: msg is read in place as an SVDropStreamTaskReq and the named task is removed
 * from the stream meta. The removal result is not inspected; always returns 0.
 */
int32_t tqProcessTaskDropReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  SVDropStreamTaskReq* pDropReq = (SVDropStreamTaskReq*)msg;
  int32_t              targetTaskId = pDropReq->taskId;

  streamMetaRemoveTask(pTq->pStreamMeta, targetTaskId);
  return 0;
}
L
Liu Jicong 已提交
1241 1242 1243 1244 1245 1246 1247

/*
 * Handle a retrieve request addressed to a local stream task.
 *
 * The body after the SMsgHead is an encoded SStreamRetrieveReq whose dstTaskId names the target
 * task. The decoded request is always released with tDeleteStreamRetrieveReq() before returning.
 * Returns -1 on decode failure or when the task is unknown, otherwise 0.
 */
int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg) {
  char*   msgStr = pMsg->pCont;
  char*   msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
  int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);

  // Zero-initialized so that, if decoding fails part-way, the request can still be released safely
  // instead of freeing indeterminate pointers.
  SStreamRetrieveReq req = {0};
  SDecoder           decoder;
  tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
  int32_t code = tDecodeStreamRetrieveReq(&decoder, &req);
  tDecoderClear(&decoder);
  if (code < 0) {
    tqError("failed to decode stream retrieve req, len:%d", msgLen);
    tDeleteStreamRetrieveReq(&req);
    return -1;
  }

  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.dstTaskId);
  if (pTask == NULL) {
    tDeleteStreamRetrieveReq(&req);
    return -1;
  }

  SRpcMsg rsp = {.info = pMsg->info, .code = 0};
  streamProcessRetrieveReq(pTask, &req, &rsp);
  streamMetaReleaseTask(pTq->pStreamMeta, pTask);
  tDeleteStreamRetrieveReq(&req);
  return 0;
}

/*
 * Placeholder handler for the retrieve response: the message is currently ignored and success is
 * always reported.
 */
int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg) {
  (void)pTq;
  (void)pMsg;
  return 0;
}
L
Liu Jicong 已提交
1269

1270 1271 1272 1273 1274 1275
/*
 * Process a stream dispatch message directly (without queueing) and, on failure, answer the sender
 * with an error SStreamDispatchRsp.
 *
 * Ownership: on every return path this function frees pMsg->pCont and the pMsg queue item itself.
 * Returns 0 when the target task processed the request; otherwise -1. On decode failure the reply
 * code is TSDB_CODE_MSG_DECODE_ERROR; for an unknown task it is TSDB_CODE_STREAM_TASK_NOT_EXIST.
 */
int32_t vnodeEnqueueStreamMsg(SVnode* pVnode, SRpcMsg* pMsg) {
  STQ*      pTq = pVnode->pTq;
  SMsgHead* msgStr = pMsg->pCont;
  char*     msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
  int32_t   msgLen = pMsg->contLen - sizeof(SMsgHead);
  int32_t   code = 0;

  // Zero-initialized: the FAIL path below reads req's ids even when decoding failed, which would
  // otherwise read indeterminate values.
  SStreamDispatchReq req = {0};
  SDecoder           decoder;
  tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
  if (tDecodeStreamDispatchReq(&decoder, &req) < 0) {
    code = TSDB_CODE_MSG_DECODE_ERROR;
    tDecoderClear(&decoder);
    goto FAIL;
  }
  tDecoderClear(&decoder);

  int32_t taskId = req.taskId;

  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);
  if (pTask) {
    SRpcMsg rsp = {.info = pMsg->info, .code = 0};
    streamProcessDispatchReq(pTask, &req, &rsp, false);
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
    rpcFreeCont(pMsg->pCont);
    taosFreeQitem(pMsg);
    return 0;
  } else {
    tDeleteStreamDispatchReq(&req);
  }

  code = TSDB_CODE_STREAM_TASK_NOT_EXIST;

FAIL:
  if (pMsg->info.handle == NULL) {
    // Nobody to reply to, but the message must still be released like on every other exit path.
    rpcFreeCont(pMsg->pCont);
    taosFreeQitem(pMsg);
    return -1;
  }

  SMsgHead* pRspHead = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamDispatchRsp));
  if (pRspHead == NULL) {
    SRpcMsg rsp = {.code = TSDB_CODE_OUT_OF_MEMORY, .info = pMsg->info};
    tqDebug("send dispatch error rsp, code: %x", code);
    tmsgSendRsp(&rsp);
    rpcFreeCont(pMsg->pCont);
    taosFreeQitem(pMsg);
    return -1;
  }

  // All ids travel in network byte order.
  pRspHead->vgId = htonl(req.upstreamNodeId);
  SStreamDispatchRsp* pRsp = POINTER_SHIFT(pRspHead, sizeof(SMsgHead));
  pRsp->streamId = htobe64(req.streamId);
  pRsp->upstreamTaskId = htonl(req.upstreamTaskId);
  pRsp->upstreamNodeId = htonl(req.upstreamNodeId);
  pRsp->downstreamNodeId = htonl(pVnode->config.vgId);
  pRsp->downstreamTaskId = htonl(req.taskId);
  pRsp->inputStatus = TASK_OUTPUT_STATUS__NORMAL;

  SRpcMsg rsp = {
      .code = code, .info = pMsg->info, .contLen = sizeof(SMsgHead) + sizeof(SStreamDispatchRsp), .pCont = pRspHead};
  tqDebug("send dispatch error rsp, code: %x", code);
  tmsgSendRsp(&rsp);
  rpcFreeCont(pMsg->pCont);
  taosFreeQitem(pMsg);
  return -1;
}
L
Liu Jicong 已提交
1333

1334
/* Returns 1 when sversion does not exceed pTq->walLogLastVer, otherwise 0. */
int32_t tqCheckLogInWal(STQ* pTq, int64_t sversion) {
  int64_t lastLoggedVer = pTq->walLogLastVer;
  return (sversion > lastLoggedVer) ? 0 : 1;
}
1335

1336
/*
 * Request a WAL scan for all stream tasks of this vnode.
 *
 * Under the stream-meta write lock:
 * - With no tasks, nothing is done.
 * - Otherwise walScanCounter is incremented. Only when it becomes 1 is a TDMT_STREAM_TASK_RUN
 *   message carrying the pseudo task id WAL_READ_TASKS_ID put on the stream queue. A larger value
 *   means a scan has already been launched.
 *
 * Returns 0, or -1 (with terrno set) when the message cannot be allocated.
 */
int32_t tqStartStreamTasks(STQ* pTq) {
  int32_t      vgId = TD_VID(pTq->pVnode);
  SStreamMeta* pMeta = pTq->pStreamMeta;
  int32_t      ret = 0;

  taosWLockLatch(&pMeta->lock);

  do {
    int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
    if (numOfTasks == 0) {
      tqInfo("vgId:%d no stream tasks exists", vgId);
      break;
    }

    if (++pMeta->walScanCounter > 1) {
      tqDebug("vgId:%d wal read task has been launched, remain scan times:%d", vgId, pMeta->walScanCounter);
      break;
    }

    SStreamTaskRunReq* pScanReq = rpcMallocCont(sizeof(SStreamTaskRunReq));
    if (pScanReq == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      tqError("vgId:%d failed restore stream tasks, code:%s", vgId, terrstr(terrno));
      ret = -1;
      break;
    }

    tqDebug("vgId:%d start wal scan stream tasks, tasks:%d", vgId, numOfTasks);
    pScanReq->head.vgId = vgId;
    pScanReq->streamId = 0;
    pScanReq->taskId = WAL_READ_TASKS_ID;

    SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pScanReq, .contLen = sizeof(SStreamTaskRunReq)};
    tmsgPutToQueue(&pTq->pVnode->msgCb, STREAM_QUEUE, &msg);
  } while (0);

  taosWUnLockLatch(&pMeta->lock);
  return ret;
}