tq.c 43.0 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
S
Shengliang Guan 已提交
14 15
 */

H
Hongze Cheng 已提交
16
#include "tq.h"
S
Shengliang Guan 已提交
17

dengyihao's avatar
dengyihao 已提交
18 19 20
// tqMgmt.inited states:
// 0: not initialized
// 1: already initialized
// 2: initialization or cleanup in progress
21
#define WAL_READ_TASKS_ID       (-1)
22

23 24
static int32_t tqInitialize(STQ* pTq);

L
Liu Jicong 已提交
25
int32_t tqInit() {
L
Liu Jicong 已提交
26 27 28 29 30 31
  int8_t old;
  while (1) {
    old = atomic_val_compare_exchange_8(&tqMgmt.inited, 0, 2);
    if (old != 2) break;
  }

32 33 34 35 36 37
  if (old == 0) {
    tqMgmt.timer = taosTmrInit(10000, 100, 10000, "TQ");
    if (tqMgmt.timer == NULL) {
      atomic_store_8(&tqMgmt.inited, 0);
      return -1;
    }
38 39 40
    if (streamInit() < 0) {
      return -1;
    }
L
Liu Jicong 已提交
41
    atomic_store_8(&tqMgmt.inited, 1);
42
  }
43

L
Liu Jicong 已提交
44 45
  return 0;
}
L
Liu Jicong 已提交
46

47
void tqCleanUp() {
L
Liu Jicong 已提交
48 49 50 51 52 53 54 55
  int8_t old;
  while (1) {
    old = atomic_val_compare_exchange_8(&tqMgmt.inited, 1, 2);
    if (old != 2) break;
  }

  if (old == 1) {
    taosTmrCleanUp(tqMgmt.timer);
L
Liu Jicong 已提交
56
    streamCleanUp();
L
Liu Jicong 已提交
57 58
    atomic_store_8(&tqMgmt.inited, 0);
  }
59
}
L
Liu Jicong 已提交
60

61
static void destroyTqHandle(void* data) {
62 63 64
  STqHandle* pData = (STqHandle*)data;
  qDestroyTask(pData->execHandle.task);
  if (pData->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
L
Liu Jicong 已提交
65
    taosMemoryFreeClear(pData->execHandle.execCol.qmsg);
66
  } else if (pData->execHandle.subType == TOPIC_SUB_TYPE__DB) {
67
    tqCloseReader(pData->execHandle.pTqReader);
68 69
    walCloseReader(pData->pWalReader);
    taosHashCleanup(pData->execHandle.execDb.pFilterOutTbUid);
L
Liu Jicong 已提交
70
  } else if (pData->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
71
    walCloseReader(pData->pWalReader);
72
    tqCloseReader(pData->execHandle.pTqReader);
73
  }
74 75 76 77
  if(pData->msg != NULL) {
    rpcFreeCont(pData->msg->pCont);
    taosMemoryFree(pData->msg);
    pData->msg = NULL;
H
Haojun Liao 已提交
78
  }
L
Liu Jicong 已提交
79 80
}

81 82 83 84 85
static bool tqOffsetLessOrEqual(const STqOffset* pLeft, const STqOffset* pRight) {
  return pLeft->val.type == TMQ_OFFSET__LOG && pRight->val.type == TMQ_OFFSET__LOG &&
         pLeft->val.version <= pRight->val.version;
}

L
Liu Jicong 已提交
86
/**
 * Allocate and open a TQ instance for one vnode.
 *
 * @param path   directory path; duplicated and stored in the instance.
 * @param pVnode owning vnode.
 * @return the new instance, or NULL on failure. terrno is set to
 *         TSDB_CODE_OUT_OF_MEMORY when an allocation made here fails.
 */
STQ* tqOpen(const char* path, SVnode* pVnode) {
  STQ* pTq = taosMemoryCalloc(1, sizeof(STQ));
  if (pTq == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pTq->path = taosStrdup(path);
  if (pTq->path == NULL) {
    // Nothing else has been allocated yet, so release the instance directly.
    taosMemoryFree(pTq);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pTq->pVnode = pVnode;
  pTq->walLogLastVer = pVnode->pWal->vers.lastVer;

  pTq->pHandle = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);
  pTq->pPushMgr = taosHashInit(64, MurmurHash3_32, false, HASH_NO_LOCK);
  pTq->pCheckInfo = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);

  if (pTq->pHandle == NULL || pTq->pPushMgr == NULL || pTq->pCheckInfo == NULL) {
    // Undo only what was created above; the meta/offset/stream stores are not
    // opened yet, so tqClose() is deliberately not used here.
    if (pTq->pHandle != NULL) {
      taosHashCleanup(pTq->pHandle);
    }
    if (pTq->pPushMgr != NULL) {
      taosHashCleanup(pTq->pPushMgr);
    }
    if (pTq->pCheckInfo != NULL) {
      taosHashCleanup(pTq->pCheckInfo);
    }
    taosMemoryFree(pTq->path);
    taosMemoryFree(pTq);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  taosHashSetFreeFp(pTq->pHandle, destroyTqHandle);
  taosHashSetFreeFp(pTq->pCheckInfo, (FDelete)tDeleteSTqCheckInfo);
  taosInitRWLatch(&pTq->lock);

  int32_t code = tqInitialize(pTq);
  if (code != TSDB_CODE_SUCCESS) {
    tqClose(pTq);
    return NULL;
  }

  return pTq;
}

/**
 * Open the persistent parts of a TQ instance in order: the meta store, the
 * offset store, the stream meta, and finally the stored stream tasks.
 *
 * @return 0 on success, -1 as soon as any step fails.
 */
int32_t tqInitialize(STQ* pTq) {
  if (tqMetaOpen(pTq) < 0) {
    return -1;
  }

  pTq->pOffsetStore = tqOffsetOpen(pTq);
  if (!pTq->pOffsetStore) {
    return -1;
  }

  pTq->pStreamMeta = streamMetaOpen(pTq->path, pTq, (FTaskExpand*)tqExpandTask, pTq->pVnode->config.vgId);
  if (!pTq->pStreamMeta) {
    return -1;
  }

  // the version is kept in task's meta data
  // todo check if this version is required or not
  int32_t loaded = streamLoadTasks(pTq->pStreamMeta, walGetCommittedVer(pTq->pVnode->pWal));
  return (loaded < 0) ? -1 : 0;
}
L
Liu Jicong 已提交
138

L
Liu Jicong 已提交
139
/**
 * Close a TQ instance and free everything it owns. A NULL instance is ignored.
 */
void tqClose(STQ* pTq) {
  if (pTq != NULL) {
    // The order of the calls below is kept exactly as in the original code.
    tqOffsetClose(pTq->pOffsetStore);

    taosHashCleanup(pTq->pHandle);
    taosHashCleanup(pTq->pPushMgr);
    taosHashCleanup(pTq->pCheckInfo);

    taosMemoryFree(pTq->path);

    tqMetaClose(pTq);
    streamMetaClose(pTq->pStreamMeta);

    taosMemoryFree(pTq);
  }
}
L
Liu Jicong 已提交
153

H
Haojun Liao 已提交
154 155 156 157 158 159 160 161 162 163 164 165 166
/**
 * Mark every stream task of this TQ instance as stopped and kill its executor.
 * The whole walk runs under the stream meta write lock. A NULL instance is
 * ignored.
 */
void tqNotifyClose(STQ* pTq) {
  if (pTq == NULL) {
    return;
  }

  taosWLockLatch(&pTq->pStreamMeta->lock);

  for (void* pIter = taosHashIterate(pTq->pStreamMeta->pTasks, NULL); pIter != NULL;
       pIter = taosHashIterate(pTq->pStreamMeta->pTasks, pIter)) {
    SStreamTask* pTask = *(SStreamTask**)pIter;

    tqDebug("vgId:%d s-task:%s set dropping flag", pTq->pStreamMeta->vgId, pTask->id.idStr);
    pTask->status.taskStatus = TASK_STATUS__STOP;

    // Measure how long killing the executor takes, for the debug log only.
    int64_t startMs = taosGetTimestampMs();
    qKillTask(pTask->exec.pExecutor, TSDB_CODE_SUCCESS);
    int64_t elapsedMs = taosGetTimestampMs() - startMs;

    tqDebug("vgId:%d s-task:%s is closed in %" PRId64 "ms", pTq->pStreamMeta->vgId, pTask->id.idStr, elapsedMs);
  }

  taosWUnLockLatch(&pTq->pStreamMeta->lock);
}

H
Haojun Liao 已提交
179 180
/**
 * Serialize a poll response and send it back through the RPC layer.
 *
 * The message is an SMqRspHead followed by the encoded body. The body is
 * encoded as SMqDataRsp for TMQ_MSG_TYPE__POLL_RSP and as STaosxRsp for
 * TMQ_MSG_TYPE__TAOSX_RSP.
 *
 * @return 0 once the response has been handed to tmsgSendRsp, -1 when the
 *         size computation or the buffer allocation fails.
 */
static int32_t doSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* pRsp, int32_t epoch,
                             int64_t consumerId, int32_t type) {
  int32_t bodyLen = 0;
  int32_t code = 0;

  // First pass: compute the encoded size of the body.
  if (type == TMQ_MSG_TYPE__POLL_RSP) {
    tEncodeSize(tEncodeSMqDataRsp, pRsp, bodyLen, code);
  } else if (type == TMQ_MSG_TYPE__TAOSX_RSP) {
    tEncodeSize(tEncodeSTaosxRsp, (STaosxRsp*)pRsp, bodyLen, code);
  }

  if (code < 0) {
    return -1;
  }

  int32_t totalLen = sizeof(SMqRspHead) + bodyLen;
  void*   pCont = rpcMallocCont(totalLen);
  if (pCont == NULL) {
    return -1;
  }

  SMqRspHead* pHead = (SMqRspHead*)pCont;
  pHead->mqMsgType = type;
  pHead->epoch = epoch;
  pHead->consumerId = consumerId;

  // Second pass: encode the body right after the header.
  void*    pBody = POINTER_SHIFT(pCont, sizeof(SMqRspHead));
  SEncoder encoder = {0};
  tEncoderInit(&encoder, pBody, bodyLen);

  if (type == TMQ_MSG_TYPE__POLL_RSP) {
    tEncodeSMqDataRsp(&encoder, pRsp);
  } else if (type == TMQ_MSG_TYPE__TAOSX_RSP) {
    tEncodeSTaosxRsp(&encoder, (STaosxRsp*)pRsp);
  }

  tEncoderClear(&encoder);

  SRpcMsg rspMsg = {
      .info = *pRpcHandleInfo,
      .pCont = pCont,
      .contLen = totalLen,
      .code = 0,
  };

  tmsgSendRsp(&rspMsg);
  return 0;
}

228 229 230 231 232 233
/**
 * Send an otherwise zero-initialized poll response to the consumer recorded
 * in pHandle, replying on the request stored in pHandle->msg.
 *
 * @return always 0; the result of the send is not propagated.
 */
int32_t tqPushDataRsp(STQ* pTq, STqHandle* pHandle) {
  SMqDataRsp dataRsp = {0};
  dataRsp.head.mqMsgType = TMQ_MSG_TYPE__POLL_RSP;
  dataRsp.head.epoch = pHandle->epoch;
  dataRsp.head.consumerId = pHandle->consumerId;

  doSendDataRsp(&pHandle->msg->info, &dataRsp, pHandle->epoch, pHandle->consumerId, TMQ_MSG_TYPE__POLL_RSP);

  // Human-readable offsets for the debug log.
  char reqOffsetStr[80] = {0};
  char rspOffsetStr[80] = {0};
  tFormatOffset(reqOffsetStr, tListLen(reqOffsetStr), &dataRsp.reqOffset);
  tFormatOffset(rspOffsetStr, tListLen(rspOffsetStr), &dataRsp.rspOffset);

  tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) push rsp, block num: %d, req:%s, rsp:%s",
          TD_VID(pTq->pVnode), dataRsp.head.consumerId, dataRsp.head.epoch, dataRsp.blockNum, reqOffsetStr,
          rspOffsetStr);
  return 0;
}

H
Haojun Liao 已提交
244 245
/**
 * Send the prepared response pRsp as the reply to poll request pReq/pMsg and
 * log the request and response offsets.
 *
 * @return always 0; the result of the send is not propagated.
 */
int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp, int32_t type) {
  doSendDataRsp(&pMsg->info, pRsp, pReq->epoch, pReq->consumerId, type);

  // Human-readable offsets for the debug log.
  char reqOffsetStr[80] = {0};
  char rspOffsetStr[80] = {0};
  tFormatOffset(reqOffsetStr, sizeof(reqOffsetStr), &pRsp->reqOffset);
  tFormatOffset(rspOffsetStr, sizeof(rspOffsetStr), &pRsp->rspOffset);

  tqDebug("vgId:%d consumer:0x%" PRIx64 " (epoch %d) send rsp, block num:%d, req:%s, rsp:%s, reqId:0x%" PRIx64,
          TD_VID(pTq->pVnode), pReq->consumerId, pReq->epoch, pRsp->blockNum, reqOffsetStr, rspOffsetStr,
          pReq->reqId);

  return 0;
}

258
/**
 * Process an offset-commit message.
 *
 * Decodes an STqOffset from msg and stores it in the offset store, unless a
 * saved log offset with an equal or greater version already exists. For log
 * offsets, the WAL reference of the matching handle (if any) is moved to the
 * committed version.
 *
 * @param sversion version the request was applied with; a log offset whose
 *                 version is exactly sversion - 1 is advanced by one.
 * @return 0 on success or when no update is needed, -1 on decode failure,
 *         invalid offset type, write failure or WAL reference failure.
 */
int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  STqOffset offset = {0};
  int32_t   vgId = TD_VID(pTq->pVnode);

  SDecoder decoder;
  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
  if (tDecodeSTqOffset(&decoder, &offset) < 0) {
    // Release the decoder on the failure path as well, as the other decode
    // sites in this file do.
    tDecoderClear(&decoder);
    return -1;
  }

  tDecoderClear(&decoder);

  if (offset.val.type == TMQ_OFFSET__SNAPSHOT_DATA || offset.val.type == TMQ_OFFSET__SNAPSHOT_META) {
    tqDebug("receive offset commit msg to %s on vgId:%d, offset(type:snapshot) uid:%" PRId64 ", ts:%" PRId64,
            offset.subKey, vgId, offset.val.uid, offset.val.ts);
  } else if (offset.val.type == TMQ_OFFSET__LOG) {
    tqDebug("receive offset commit msg to %s on vgId:%d, offset(type:log) version:%" PRId64, offset.subKey, vgId,
            offset.val.version);
    if (offset.val.version + 1 == sversion) {
      offset.val.version += 1;
    }
  } else {
    tqError("invalid commit offset type:%d", offset.val.type);
    return -1;
  }

  STqOffset* pSavedOffset = tqOffsetRead(pTq->pOffsetStore, offset.subKey);
  if (pSavedOffset != NULL && tqOffsetLessOrEqual(&offset, pSavedOffset)) {
    return 0;  // no need to update the offset value
  }

  // save the new offset value
  if (tqOffsetWrite(pTq->pOffsetStore, &offset) < 0) {
    return -1;
  }

  if (offset.val.type == TMQ_OFFSET__LOG) {
    STqHandle* pHandle = taosHashGet(pTq->pHandle, offset.subKey, strlen(offset.subKey));
    if (pHandle && (walRefVer(pHandle->pRef, offset.val.version) < 0)) {
      return -1;
    }
  }

  return 0;
}

L
Liu Jicong 已提交
304
/**
 * Check whether column colId of table tbUid may be modified.
 *
 * Walks every entry of pCheckInfo; an entry whose ntbUid equals tbUid and
 * whose colIdList contains colId forbids the change.
 *
 * @return 0 when no entry forbids the column, -1 otherwise.
 */
int32_t tqCheckColModifiable(STQ* pTq, int64_t tbUid, int32_t colId) {
  for (void* pIter = taosHashIterate(pTq->pCheckInfo, NULL); pIter != NULL;
       pIter = taosHashIterate(pTq->pCheckInfo, pIter)) {
    STqCheckInfo* pInfo = (STqCheckInfo*)pIter;
    if (pInfo->ntbUid != tbUid) {
      continue;
    }

    int32_t numOfCols = taosArrayGetSize(pInfo->colIdList);
    for (int32_t idx = 0; idx < numOfCols; idx++) {
      int16_t lockedColId = *(int16_t*)taosArrayGet(pInfo->colIdList, idx);
      if (lockedColId != colId) {
        continue;
      }

      // Leaving the iteration early: cancel it before returning.
      taosHashCancelIterate(pTq->pCheckInfo, pIter);
      return -1;
    }
  }

  return 0;
}

330
/**
 * Process a consumer poll request.
 *
 * Deserializes the request, looks up the subscription handle by subKey,
 * checks that the handle belongs to the requesting consumer, raises the saved
 * epoch to the request epoch when it is newer, and then hands the request to
 * tqExtractDataForMq.
 *
 * @return the result of tqExtractDataForMq, or -1 with terrno set when the
 *         request is malformed, the handle is missing or the consumer does
 *         not match.
 */
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
  SMqPollReq req = {0};
  if (tDeserializeSMqPollReq(pMsg->pCont, pMsg->contLen, &req) < 0) {
    tqError("tDeserializeSMqPollReq %d failed", pMsg->contLen);
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  int32_t      vgId = TD_VID(pTq->pVnode);
  int64_t      consumerId = req.consumerId;
  int32_t      reqEpoch = req.epoch;
  STqOffsetVal reqOffset = req.reqOffset;

  // 1. find handle
  STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
  if (!pHandle) {
    tqError("tmq poll: consumer:0x%" PRIx64 " vgId:%d subkey %s not found", consumerId, vgId, req.subKey);
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  // 2. check re-balance status
  taosRLockLatch(&pTq->lock);
  if (consumerId != pHandle->consumerId) {
    tqDebug("ERROR tmq poll: consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64,
            consumerId, TD_VID(pTq->pVnode), req.subKey, pHandle->consumerId);
    terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
    taosRUnLockLatch(&pTq->lock);
    return -1;
  }
  taosRUnLockLatch(&pTq->lock);

  // 3. update the epoch value
  taosWLockLatch(&pTq->lock);
  int32_t prevEpoch = pHandle->epoch;
  if (reqEpoch > prevEpoch) {
    tqDebug("tmq poll: consumer:0x%" PRIx64 " epoch update from %d to %d by poll req", consumerId, prevEpoch,
            reqEpoch);
    pHandle->epoch = reqEpoch;
  }
  taosWUnLockLatch(&pTq->lock);

  char offsetStr[80];
  tFormatOffset(offsetStr, sizeof(offsetStr), &reqOffset);
  tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey %s, recv poll req vgId:%d, req:%s, reqId:0x%" PRIx64,
          consumerId, req.epoch, pHandle->subKey, vgId, offsetStr, req.reqId);

  return tqExtractDataForMq(pTq, pHandle, &req, pMsg);
}

380
/**
 * Process a delete-subscription request: drop the handle for the subKey (once
 * it is no longer executing), its cached offset, and its persisted handle.
 *
 * Each removal failure is only logged.
 *
 * @return always 0.
 */
int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  SMqVDeleteReq* pReq = (SMqVDeleteReq*)msg;
  int32_t        vgId = TD_VID(pTq->pVnode);
  int32_t        code = 0;

  tqDebug("vgId:%d, tq process delete sub req %s", pTq->pVnode->config.vgId, pReq->subKey);

  STqHandle* pHandle = taosHashGet(pTq->pHandle, pReq->subKey, strlen(pReq->subKey));
  if (pHandle != NULL) {
    if (pHandle->pRef != NULL) {
      walCloseRef(pTq->pVnode->pWal, pHandle->pRef->refId);
    }

    // Do not remove the handle while a poll is still running on it.
    while (tqIsHandleExecuting(pHandle)) {
      tqDebug("vgId:%d, topic:%s, subscription is executing, wait for 5ms and retry", vgId, pHandle->subKey);
      taosMsleep(5);
    }

    code = taosHashRemove(pTq->pHandle, pReq->subKey, strlen(pReq->subKey));
    if (code != 0) {
      tqError("cannot process tq delete req %s, since no such handle", pReq->subKey);
    }
  }

  code = tqOffsetDelete(pTq->pOffsetStore, pReq->subKey);
  if (code != 0) {
    tqError("cannot process tq delete req %s, since no such offset in cache", pReq->subKey);
  }

  if (tqMetaDeleteHandle(pTq, pReq->subKey) < 0) {
    tqError("cannot process tq delete req %s, since no such offset in tdb", pReq->subKey);
  }

  return 0;
}

416
/**
 * Process an add-check-info request: decode an STqCheckInfo from msg, store
 * it in pCheckInfo keyed by topic, and persist the raw message.
 *
 * @return 0 on success, -1 with terrno = TSDB_CODE_OUT_OF_MEMORY on any
 *         failure.
 */
int32_t tqProcessAddCheckInfoReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  STqCheckInfo info = {0};
  SDecoder     decoder;
  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
  if (tDecodeSTqCheckInfo(&decoder, &info) < 0) {
    // Release the decoder on the failure path as well.
    tDecoderClear(&decoder);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  tDecoderClear(&decoder);

  if (taosHashPut(pTq->pCheckInfo, info.topic, strlen(info.topic), &info, sizeof(STqCheckInfo)) < 0) {
    // The hash table did not take the entry, so its free callback
    // (tDeleteSTqCheckInfo) will never run for it; release it here.
    tDeleteSTqCheckInfo(&info);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  // From here on the hash table holds a copy of info and releases it through
  // its free callback.
  if (tqMetaSaveCheckInfo(pTq, info.topic, msg, msgLen) < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  return 0;
}

436
/**
 * Process a delete-check-info request. msg is used directly as the
 * NUL-terminated key, both for pCheckInfo and for the persisted record.
 *
 * @return 0 on success, -1 with terrno = TSDB_CODE_OUT_OF_MEMORY when either
 *         removal fails.
 */
int32_t tqProcessDelCheckInfoReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  int32_t removed = taosHashRemove(pTq->pCheckInfo, msg, strlen(msg));
  if (removed < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  int32_t deleted = tqMetaDeleteCheckInfo(pTq, msg);
  if (deleted < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  return 0;
}

448
/**
 * Process a subscribe / re-balance request for one subscription key.
 *
 * - No handle exists for req.subKey: build a new handle for req.newConsumerId
 *   (WAL reference, readers and executor task depending on the subscription
 *   type), put it into pHandle and persist it.
 * - A handle exists: either bump its epoch (same consumer) or switch it to
 *   the new consumer, kill the running task, unregister any pending push
 *   handle and persist the handle.
 *
 * @return 0 on success, -1 when the WAL reference cannot be obtained, or the
 *         result of tqMetaSaveHandle.
 */
int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  int         ret = 0;
  SMqRebVgReq req = {0};
  tDecodeSMqRebVgReq(msg, &req);

  SVnode* pVnode = pTq->pVnode;
  int32_t vgId = TD_VID(pVnode);

  tqDebug("vgId:%d, tq process sub req:%s, Id:0x%" PRIx64 " -> Id:0x%" PRIx64, pVnode->config.vgId, req.subKey,
          req.oldConsumerId, req.newConsumerId);

  STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
  if (pHandle == NULL) {
    if (req.oldConsumerId != -1) {
      tqError("vgId:%d, build new consumer handle %s for consumer:0x%" PRIx64 ", but old consumerId:0x%" PRIx64,
              req.vgId, req.subKey, req.newConsumerId, req.oldConsumerId);
    }

    if (req.newConsumerId == -1) {
      tqError("vgId:%d, tq invalid re-balance request, new consumerId %" PRId64 "", req.vgId, req.newConsumerId);
      goto end;
    }

    // Build the handle on the stack; taosHashPut below stores a copy of it.
    STqHandle tqHandle = {0};
    pHandle = &tqHandle;

    memcpy(pHandle->subKey, req.subKey, TSDB_SUBSCRIBE_KEY_LEN);
    pHandle->consumerId = req.newConsumerId;
    pHandle->epoch = -1;

    pHandle->execHandle.subType = req.subType;
    pHandle->fetchMeta = req.withMeta;

    // TODO version should be assigned and refed during preprocess
    SWalRef* pRef = walRefCommittedVer(pVnode->pWal);
    if (pRef == NULL) {
      ret = -1;
      goto end;
    }

    int64_t ver = pRef->refVer;
    pHandle->pRef = pRef;

    SReadHandle handle = {
        .meta = pVnode->pMeta, .vnode = pVnode, .initTableReader = true, .initTqReader = true, .version = ver};
    pHandle->snapshotVer = ver;

    if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
      // The handle takes over the query message; clear it in req so that it
      // is not freed at the end label.
      pHandle->execHandle.execCol.qmsg = req.qmsg;
      req.qmsg = NULL;

      pHandle->execHandle.task = qCreateQueueExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle, vgId,
                                                          &pHandle->execHandle.numOfCols, req.newConsumerId);
      void* scanner = NULL;
      qExtractStreamScanner(pHandle->execHandle.task, &scanner);
      pHandle->execHandle.pTqReader = qExtractReaderFromStreamScanner(scanner);
    } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__DB) {
      pHandle->pWalReader = walOpenReader(pVnode->pWal, NULL);
      pHandle->execHandle.pTqReader = tqReaderOpen(pVnode);

      pHandle->execHandle.execDb.pFilterOutTbUid =
          taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
      buildSnapContext(handle.meta, handle.version, 0, pHandle->execHandle.subType, pHandle->fetchMeta,
                       (SSnapContext**)(&handle.sContext));

      pHandle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &handle, vgId, NULL, req.newConsumerId);
    } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
      pHandle->pWalReader = walOpenReader(pVnode->pWal, NULL);
      pHandle->execHandle.execTb.suid = req.suid;

      SArray* tbUidList = taosArrayInit(0, sizeof(int64_t));
      vnodeGetCtbIdList(pVnode, req.suid, tbUidList);
      tqDebug("vgId:%d, tq try to get all ctb, suid:%" PRId64, pVnode->config.vgId, req.suid);
      for (int32_t i = 0; i < taosArrayGetSize(tbUidList); i++) {
        int64_t tbUid = *(int64_t*)taosArrayGet(tbUidList, i);
        tqDebug("vgId:%d, idx %d, uid:%" PRId64, vgId, i, tbUid);
      }
      pHandle->execHandle.pTqReader = tqReaderOpen(pVnode);
      tqReaderSetTbUidList(pHandle->execHandle.pTqReader, tbUidList);
      taosArrayDestroy(tbUidList);

      buildSnapContext(handle.meta, handle.version, req.suid, pHandle->execHandle.subType, pHandle->fetchMeta,
                       (SSnapContext**)(&handle.sContext));
      pHandle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &handle, vgId, NULL, req.newConsumerId);
    }

    taosHashPut(pTq->pHandle, req.subKey, strlen(req.subKey), pHandle, sizeof(STqHandle));
    // Log the old consumer id carried by the request. The previous code read
    // it from the freshly zeroed handle, so the log always showed 0.
    tqDebug("try to persist handle %s consumer:0x%" PRIx64 " , old consumer:0x%" PRIx64, req.subKey,
            pHandle->consumerId, req.oldConsumerId);
    ret = tqMetaSaveHandle(pTq, req.subKey, pHandle);
  } else {
    if (pHandle->consumerId == req.newConsumerId) {  // do nothing
      tqInfo("vgId:%d consumer:0x%" PRIx64 " remains, no switch occurs", req.vgId, req.newConsumerId);
      atomic_add_fetch_32(&pHandle->epoch, 1);
    } else {
      tqInfo("vgId:%d switch consumer from Id:0x%" PRIx64 " to Id:0x%" PRIx64, req.vgId, pHandle->consumerId,
             req.newConsumerId);
      atomic_store_64(&pHandle->consumerId, req.newConsumerId);
      atomic_store_32(&pHandle->epoch, 0);
    }

    // kill executing task
    qTaskInfo_t pTaskInfo = pHandle->execHandle.task;
    if (pTaskInfo != NULL) {
      qKillTask(pTaskInfo, TSDB_CODE_SUCCESS);
    }

    taosWLockLatch(&pTq->lock);
    // remove if it has been register in the push manager, and return one empty block to consumer
    tqUnregisterPushHandle(pTq, pHandle);

    if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
      qStreamCloseTsdbReader(pTaskInfo);
    }

    taosWUnLockLatch(&pTq->lock);
    ret = tqMetaSaveHandle(pTq, req.subKey, pHandle);
  }

end:
  // req.qmsg is NULL here when ownership moved to the handle above.
  taosMemoryFree(req.qmsg);
  return ret;
}
575

576
/**
 * Fill in the runtime parts of a stream task: id string, input/output queues,
 * checkpoint versions, state backend and executor (source and agg levels),
 * sink callbacks, and the WAL reader for source tasks.
 *
 * @param ver version stored as both the checkpoint version and the current
 *            version; the checkpoint version is incremented by one before
 *            returning.
 * @return 0 on success, -1 when a queue, state backend, executor or sink
 *         schema cannot be created.
 */
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
  SVnode* pVnode = pTq->pVnode;
  int32_t vgId = TD_VID(pVnode);

  pTask->id.idStr = createStreamTaskIdStr(pTask->id.streamId, pTask->id.taskId);
  pTask->refCnt = 1;
  pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE;

  pTask->inputQueue = streamQueueOpen();
  pTask->outputQueue = streamQueueOpen();
  if (!pTask->inputQueue || !pTask->outputQueue) {
    return -1;
  }

  pTask->inputStatus = TASK_INPUT_STATUS__NORMAL;
  pTask->outputStatus = TASK_OUTPUT_STATUS__NORMAL;
  pTask->pMsgCb = &pVnode->msgCb;
  pTask->pMeta = pTq->pStreamMeta;
  pTask->chkInfo.version = ver;
  pTask->chkInfo.currentVer = ver;

  // expand executor
  if (pTask->fillHistory) {
    pTask->status.taskStatus = TASK_STATUS__WAIT_DOWNSTREAM;
  } else {
    pTask->status.taskStatus = TASK_STATUS__NORMAL;
  }

  if (pTask->taskLevel == TASK_LEVEL__SOURCE) {
    pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask, false, -1, -1);
    if (!pTask->pState) {
      return -1;
    }

    // Source tasks read from this vnode, so the executor gets its meta and a tq reader.
    SReadHandle srcHandle = {
        .meta = pVnode->pMeta, .vnode = pVnode, .initTqReader = 1, .pStateBackend = pTask->pState};

    pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &srcHandle, vgId);
    if (!pTask->exec.pExecutor) {
      return -1;
    }
  } else if (pTask->taskLevel == TASK_LEVEL__AGG) {
    pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask, false, -1, -1);
    if (!pTask->pState) {
      return -1;
    }

    // Agg tasks have no vnode of their own; they only need the number of child endpoints.
    int32_t     numOfChildren = (int32_t)taosArrayGetSize(pTask->childEpInfo);
    SReadHandle aggHandle = {.vnode = NULL, .numOfVgroups = numOfChildren, .pStateBackend = pTask->pState};

    pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &aggHandle, vgId);
    if (!pTask->exec.pExecutor) {
      return -1;
    }
  }

  // sink
  if (pTask->outputType == TASK_OUTPUT__SMA) {
    pTask->smaSink.vnode = pVnode;
    pTask->smaSink.smaSink = smaHandleRes;
  } else if (pTask->outputType == TASK_OUTPUT__TABLE) {
    pTask->tbSink.vnode = pVnode;
    pTask->tbSink.tbSinkFunc = tqSinkToTablePipeline2;

    // Use the schema version recorded in meta when the sink table is found; default to 1.
    int32_t   schemaVer = 1;
    SMetaInfo metaInfo = {0};
    if (metaGetInfo(pVnode->pMeta, pTask->tbSink.stbUid, &metaInfo, NULL) == TSDB_CODE_SUCCESS) {
      schemaVer = metaInfo.skmVer;
    }

    SSchemaWrapper* pWrapper = pTask->tbSink.pSchemaWrapper;
    pTask->tbSink.pTSchema = tBuildTSchema(pWrapper->pSchema, pWrapper->nCols, schemaVer);
    if (!pTask->tbSink.pTSchema) {
      return -1;
    }
  }

  if (pTask->taskLevel == TASK_LEVEL__SOURCE) {
    SWalFilterCond filter = {.deleteMsg = 1};
    pTask->exec.pWalReader = walOpenReader(pVnode->pWal, &filter);
  }

  streamSetupTrigger(pTask);

  tqInfo("vgId:%d expand stream task, s-task:%s, checkpoint ver:%" PRId64 " child id:%d, level:%d", vgId, pTask->id.idStr,
         pTask->chkInfo.version, pTask->selfChildId, pTask->taskLevel);

  // next valid version will add one
  pTask->chkInfo.version += 1;
  return 0;
}
L
Liu Jicong 已提交
664

665 666 667 668 669 670
/**
 * Process a downstream task-check request and send the check response.
 *
 * The response echoes the ids from the request. Its status is the result of
 * streamTaskCheckStatus when the downstream task exists on this node, and 0
 * otherwise.
 *
 * @return 0 once the response has been sent, -1 when the response cannot be
 *         sized or its buffer cannot be allocated.
 */
int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) {
  char*   msgStr = pMsg->pCont;
  char*   msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
  int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);

  // Zero-initialize so the response never echoes indeterminate values when
  // decoding fails (the decode result is not checked here).
  SStreamTaskCheckReq req = {0};
  SDecoder            decoder;
  tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
  tDecodeSStreamTaskCheckReq(&decoder, &req);
  tDecoderClear(&decoder);

  int32_t             taskId = req.downstreamTaskId;
  SStreamTaskCheckRsp rsp = {
      .reqId = req.reqId,
      .streamId = req.streamId,
      .childId = req.childId,
      .downstreamNodeId = req.downstreamNodeId,
      .downstreamTaskId = req.downstreamTaskId,
      .upstreamNodeId = req.upstreamNodeId,
      .upstreamTaskId = req.upstreamTaskId,
  };

  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);

  if (pTask) {
    rsp.status = streamTaskCheckStatus(pTask);

    // Log while the task is still acquired; the previous code read
    // pTask->status after releasing the task.
    tqDebug("tq recv task check req(reqId:0x%" PRIx64
            ") %d at node %d task status:%d, check req from task %d at node %d, rsp status %d",
            rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, pTask->status.taskStatus, rsp.upstreamTaskId,
            rsp.upstreamNodeId, rsp.status);

    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
  } else {
    rsp.status = 0;
    tqDebug("tq recv task check(taskId:%d not built yet) req(reqId:0x%" PRIx64
            ") %d at node %d, check req from task %d at node %d, rsp status %d",
            taskId, rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.upstreamTaskId, rsp.upstreamNodeId,
            rsp.status);
  }

  SEncoder encoder;
  int32_t  code;
  int32_t  len;
  tEncodeSize(tEncodeSStreamTaskCheckRsp, &rsp, len, code);
  if (code < 0) {
    tqError("unable to encode rsp %d", __LINE__);
    return -1;
  }

  void* buf = rpcMallocCont(sizeof(SMsgHead) + len);
  if (buf == NULL) {
    // Same handling as doSendDataRsp: never write through a failed allocation.
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  ((SMsgHead*)buf)->vgId = htonl(req.upstreamNodeId);

  void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
  tEncoderInit(&encoder, (uint8_t*)abuf, len);
  tEncodeSStreamTaskCheckRsp(&encoder, &rsp);
  tEncoderClear(&encoder);

  SRpcMsg rspMsg = {.code = 0, .pCont = buf, .contLen = sizeof(SMsgHead) + len, .info = pMsg->info};
  tmsgSendRsp(&rspMsg);
  return 0;
}

725
/**
 * Process a task-check response: decode it and pass it to the upstream task
 * named in the response.
 *
 * @return the result of streamProcessTaskCheckRsp, or -1 when decoding fails
 *         or the upstream task cannot be acquired.
 */
int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  SStreamTaskCheckRsp rsp;
  SDecoder            decoder;

  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
  int32_t code = tDecodeSStreamTaskCheckRsp(&decoder, &rsp);
  // The decoder is released on both the success and the failure path.
  tDecoderClear(&decoder);
  if (code < 0) {
    return -1;
  }

  tqDebug("tq recv task check rsp(reqId:0x%" PRIx64 ") %d at node %d check req from task %d at node %d, status %d",
          rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);

  SStreamTask* pUpstream = streamMetaAcquireTask(pTq->pStreamMeta, rsp.upstreamTaskId);
  if (!pUpstream) {
    return -1;
  }

  code = streamProcessTaskCheckRsp(pUpstream, &rsp, sversion);
  streamMetaReleaseTask(pTq->pStreamMeta, pUpstream);
  return code;
}

751
/**
 * Process a task-deploy request: decode a stream task from msg, add it to the
 * stream meta under the meta write lock, and start the downstream check when
 * the task has fillHistory set.
 *
 * @return 0 on success or when streams are disabled (tsDisableStream), -1 on
 *         allocation, decode or add failure.
 */
int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  if (tsDisableStream) {
    return 0;
  }

  // 1.deserialize msg and build task
  SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
  if (!pTask) {
    return -1;
  }

  SDecoder decoder;
  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
  int32_t code = tDecodeStreamTask(&decoder, pTask);
  tDecoderClear(&decoder);
  if (code < 0) {
    taosMemoryFree(pTask);
    return -1;
  }

  // 2.save task, use the newest commit version as the initial start version of stream task.
  taosWLockLatch(&pTq->pStreamMeta->lock);
  code = streamMetaAddDeployedTask(pTq->pStreamMeta, sversion, pTask);
  if (code < 0) {
    tqError("vgId:%d failed to add s-task:%s, total:%d", TD_VID(pTq->pVnode), pTask->id.idStr,
            streamMetaGetNumOfTasks(pTq->pStreamMeta));
  }
  taosWUnLockLatch(&pTq->pStreamMeta->lock);

  if (code < 0) {
    return -1;
  }

  // 3.go through recover steps to fill history
  if (pTask->fillHistory) {
    streamTaskCheckDownstream(pTask, sversion);
  }

  tqDebug("vgId:%d s-task:%s is deployed and add meta from mnd, status:%d, total:%d", TD_VID(pTq->pVnode),
          pTask->id.idStr, pTask->status.taskStatus, streamMetaGetNumOfTasks(pTq->pStreamMeta));
  return 0;
}

L
Liu Jicong 已提交
800 801 802 803 804
// Runs recover step 1 for the task named in the request carried by pMsg->pCont and, when it completes,
// enqueues a step-2 request (TDMT_VND_STREAM_RECOVER_BLOCKING_STAGE) on the write queue.
//
// Returns:
//   0  - step 1 finished and the step-2 message was handed to tmsgPutToQueue(), or the task was found in
//        TASK_STATUS__DROPPING and the remaining work was skipped;
//   -1 - the task could not be acquired, its checkpoint version is not positive, the step-2 request could
//        not be built, or the message buffer could not be allocated.
//
// The task reference is held until the function is done with pTask: the task status and id string are
// still read after the step-2 request has been built, so releasing earlier would touch a task that is no
// longer pinned.
int32_t tqProcessTaskRecover1Req(STQ* pTq, SRpcMsg* pMsg) {
  SStreamRecoverStep1Req* pReq = (SStreamRecoverStep1Req*)pMsg->pCont;
  SStreamRecoverStep2Req  req;
  int32_t                 code = -1;
  int32_t                 len = 0;
  int64_t                 st = 0;
  double                  el = 0;
  void*                   serializedReq = NULL;

  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->taskId);
  if (pTask == NULL) {
    return -1;
  }

  // check param
  if (pTask->chkInfo.version <= 0) {
    goto _end;
  }

  // do recovery step 1
  tqDebug("s-task:%s start recover step 1 scan", pTask->id.idStr);
  st = taosGetTimestampMs();

  streamSourceRecoverScanStep1(pTask);
  if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING) {
    code = 0;
    goto _end;
  }

  el = (taosGetTimestampMs() - st) / 1000.0;
  tqDebug("s-task:%s recover step 1 ended, elapsed time:%.2fs", pTask->id.idStr, el);

  // build msg to launch next step
  if (streamBuildSourceRecover2Req(pTask, &req) < 0) {
    goto _end;
  }

  if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING) {
    code = 0;
    goto _end;
  }

  // serialize msg: the payload is the step-2 request, so size the buffer from `req` itself
  len = (int32_t)sizeof(req);
  serializedReq = rpcMallocCont(len);
  if (serializedReq == NULL) {
    goto _end;
  }
  memcpy(serializedReq, &req, len);

  // dispatch msg
  tqDebug("s-task:%s start recover block stage", pTask->id.idStr);

  SRpcMsg rpcMsg = {
      .code = 0, .contLen = len, .msgType = TDMT_VND_STREAM_RECOVER_BLOCKING_STAGE, .pCont = serializedReq};
  tmsgPutToQueue(&pTq->pVnode->msgCb, WRITE_QUEUE, &rpcMsg);
  code = 0;

_end:
  streamMetaReleaseTask(pTq->pStreamMeta, pTask);
  return code;
}

864
// Runs recover step 2 for the task named in `msg` (interpreted as an SStreamRecoverStep2Req).
// After the step-2 scan the task parameters are restored, the status is set back to normal, a
// recover-finish request is dispatched downstream, fillHistory is cleared and the task is saved.
// Returns -1 if the task cannot be acquired or any of those steps reports a negative code, and 0 otherwise
// (including the case where the task is found in TASK_STATUS__DROPPING right after the scan).
int32_t tqProcessTaskRecover2Req(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  SStreamRecoverStep2Req* pReq = (SStreamRecoverStep2Req*)msg;

  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->taskId);
  if (pTask == NULL) {
    return -1;
  }

  int32_t ret = -1;

  // do recovery step 2
  if (streamSourceRecoverScanStep2(pTask, sversion) < 0) {
    goto _release;
  }

  // a task that is being dropped needs no further recovery work
  if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING) {
    ret = 0;
    goto _release;
  }

  // restore param
  if (streamRestoreParam(pTask) < 0) {
    goto _release;
  }

  // set status normal
  if (streamSetStatusNormal(pTask) < 0) {
    goto _release;
  }

  // dispatch recover finish req to all related downstream task
  if (streamDispatchRecoverFinishReq(pTask) < 0) {
    goto _release;
  }

  atomic_store_8(&pTask->fillHistory, 0);
  streamMetaSaveTask(pTq->pStreamMeta, pTask);
  ret = 0;

_release:
  streamMetaReleaseTask(pTq->pStreamMeta, pTask);
  return ret;
}

L
Liu Jicong 已提交
914 915 916
// Handles a recover-finish request. The payload follows an SMsgHead inside pMsg->pCont; it is decoded into
// an SStreamRecoverFinishReq and forwarded to streamProcessRecoverFinishReq() for the task it names.
// Returns 0 on success and -1 when the payload cannot be decoded, the task cannot be acquired, or
// streamProcessRecoverFinishReq() reports a negative code.
int32_t tqProcessTaskRecoverFinishReq(STQ* pTq, SRpcMsg* pMsg) {
  char*   msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);

  // deserialize; the request is zero-initialized and the decode result is checked so that a malformed
  // message never leads to a task lookup with an indeterminate task id
  SStreamRecoverFinishReq req = {0};

  SDecoder decoder;
  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
  int32_t code = tDecodeSStreamRecoverFinishReq(&decoder, &req);
  tDecoderClear(&decoder);
  if (code < 0) {
    return -1;
  }

  // find task
  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.taskId);
  if (pTask == NULL) {
    return -1;
  }

  // do process request
  code = streamProcessRecoverFinishReq(pTask, req.childId);
  streamMetaReleaseTask(pTq->pStreamMeta, pTask);

  return (code < 0) ? -1 : 0;
}
L
Liu Jicong 已提交
940

L
Liu Jicong 已提交
941 942 943 944 945
// Placeholder handler for a recover-finish response: neither argument is inspected and 0 is always returned.
int32_t tqProcessTaskRecoverFinishRsp(STQ* pTq, SRpcMsg* pMsg) {
  (void)pTq;
  (void)pMsg;
  return 0;
}

946 947 948 949 950 951 952 953 954 955 956 957 958 959 960 961 962 963 964 965 966 967 968 969 970 971 972 973 974 975 976 977 978 979 980 981 982 983 984 985 986 987 988 989 990 991 992 993 994 995 996 997 998 999 1000 1001 1002 1003 1004 1005
// Builds a STREAM_DELETE_DATA block from a serialized SDeleteRes and wraps it in a queue item.
//
// pData/len : serialized delete result
// ver       : stored in the block's info.version
// pRefBlock : out; receives the allocated SStreamRefDataBlock. It is left untouched when the delete result
//             names no table or affected no rows, and is set to NULL when the queue item cannot be allocated.
//
// Returns TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY on allocation failure, or TSDB_CODE_MSG_DECODE_ERROR
// when the delete result cannot be decoded.
int32_t extractDelDataBlock(const void* pData, int32_t len, int64_t ver, SStreamRefDataBlock** pRefBlock) {
  SDecoder   coder = {0};
  SDeleteRes res = {0};

  res.uidList = taosArrayInit(0, sizeof(tb_uid_t));
  if (res.uidList == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  tDecoderInit(&coder, (uint8_t*)pData, len);
  int32_t decodeCode = tDecodeDeleteRes(&coder, &res);
  tDecoderClear(&coder);
  if (decodeCode < 0) {
    taosArrayDestroy(res.uidList);
    return TSDB_CODE_MSG_DECODE_ERROR;
  }

  int32_t numOfTables = taosArrayGetSize(res.uidList);
  if (numOfTables == 0 || res.affectedRows == 0) {
    taosArrayDestroy(res.uidList);
    return TSDB_CODE_SUCCESS;
  }

  SSDataBlock* pDelBlock = createSpecialDataBlock(STREAM_DELETE_DATA);
  blockDataEnsureCapacity(pDelBlock, numOfTables);
  pDelBlock->info.rows = numOfTables;
  pDelBlock->info.version = ver;

  // the column handles do not depend on the row, so look them up once
  SColumnInfoData* pStartCol = taosArrayGet(pDelBlock->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pEndCol = taosArrayGet(pDelBlock->pDataBlock, END_TS_COLUMN_INDEX);
  SColumnInfoData* pUidCol = taosArrayGet(pDelBlock->pDataBlock, UID_COLUMN_INDEX);
  SColumnInfoData* pGroupIdCol = taosArrayGet(pDelBlock->pDataBlock, GROUPID_COLUMN_INDEX);
  SColumnInfoData* pCalStartCol = taosArrayGet(pDelBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
  SColumnInfoData* pCalEndCol = taosArrayGet(pDelBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);

  // one row per table: the same [skey, ekey] range with that table's uid; the other columns are NULL
  for (int32_t i = 0; i < numOfTables; i++) {
    int64_t* pUid = taosArrayGet(res.uidList, i);

    colDataSetVal(pStartCol, i, (const char*)&res.skey, false);
    colDataSetVal(pEndCol, i, (const char*)&res.ekey, false);
    colDataSetVal(pUidCol, i, (const char*)pUid, false);

    colDataSetNULL(pGroupIdCol, i);
    colDataSetNULL(pCalStartCol, i);
    colDataSetNULL(pCalEndCol, i);
  }

  taosArrayDestroy(res.uidList);

  int32_t* pRef = taosMemoryMalloc(sizeof(int32_t));
  if (pRef == NULL) {
    blockDataDestroy(pDelBlock);
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  *pRef = 1;

  *pRefBlock = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0);
  if (*pRefBlock == NULL) {
    // nothing took ownership of the block or the counter, so both are released here
    taosMemoryFree(pRef);
    blockDataDestroy(pDelBlock);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  (*pRefBlock)->type = STREAM_INPUT__REF_DATA_BLOCK;
  (*pRefBlock)->pBlock = pDelBlock;
  (*pRefBlock)->dataRef = pRef;
  // NOTE(review): the counter starts at 1 and is incremented once more here, so it is 2 on return with a
  // single queue item; confirm who drops the initial reference.
  atomic_add_fetch_32((*pRefBlock)->dataRef, 1);

  return TSDB_CODE_SUCCESS;
}

// Turns a serialized SDeleteRes into a STREAM_DELETE_DATA block and enqueues a reference to that block on
// every TASK_LEVEL__SOURCE stream task, scheduling each task afterwards.
//
// pReq/len : serialized delete result
// ver      : stored in the block's info.version and printed in the debug log
//
// The block is shared between all queue items through a reference counter: this function holds one
// reference while it enqueues, every enqueued item adds one, and the block is destroyed here if the count
// drops to zero once this function gives up its own reference.
//
// Returns 0, or -1 with terrno = TSDB_CODE_OUT_OF_MEMORY when the reference counter cannot be allocated.
int32_t tqProcessDeleteDataReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) {
  bool       failed = false;
  SDecoder   coder = {0};
  SDeleteRes res = {0};

  res.uidList = taosArrayInit(0, sizeof(tb_uid_t));
  if (res.uidList == NULL) {
    // NOTE(review): decoding still proceeds with a NULL uid list, as it did before; confirm that
    // tDecodeDeleteRes()/taosArrayGetSize() tolerate that, or bail out here instead.
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    failed = true;
  }

  tDecoderInit(&coder, pReq, len);
  tDecodeDeleteRes(&coder, &res);
  tDecoderClear(&coder);

  int32_t sz = taosArrayGetSize(res.uidList);
  if (sz == 0 || res.affectedRows == 0) {
    taosArrayDestroy(res.uidList);
    return 0;
  }

  SSDataBlock* pDelBlock = createSpecialDataBlock(STREAM_DELETE_DATA);
  blockDataEnsureCapacity(pDelBlock, sz);
  pDelBlock->info.rows = sz;
  pDelBlock->info.version = ver;

  // one row per table: the same [skey, ekey] range with that table's uid; the other columns are NULL
  for (int32_t i = 0; i < sz; i++) {
    // start key column
    SColumnInfoData* pStartCol = taosArrayGet(pDelBlock->pDataBlock, START_TS_COLUMN_INDEX);
    colDataSetVal(pStartCol, i, (const char*)&res.skey, false);
    // end key column
    SColumnInfoData* pEndCol = taosArrayGet(pDelBlock->pDataBlock, END_TS_COLUMN_INDEX);
    colDataSetVal(pEndCol, i, (const char*)&res.ekey, false);
    // uid column
    SColumnInfoData* pUidCol = taosArrayGet(pDelBlock->pDataBlock, UID_COLUMN_INDEX);
    int64_t*         pUid = taosArrayGet(res.uidList, i);
    colDataSetVal(pUidCol, i, (const char*)pUid, false);

    colDataSetNULL(taosArrayGet(pDelBlock->pDataBlock, GROUPID_COLUMN_INDEX), i);
    colDataSetNULL(taosArrayGet(pDelBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX), i);
    colDataSetNULL(taosArrayGet(pDelBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX), i);
  }

  taosArrayDestroy(res.uidList);

  int32_t* pRef = taosMemoryMalloc(sizeof(int32_t));
  if (pRef == NULL) {
    blockDataDestroy(pDelBlock);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  *pRef = 1;  // the reference held by this function while it enqueues

  taosWLockLatch(&pTq->pStreamMeta->lock);

  void* pIter = NULL;
  while ((pIter = taosHashIterate(pTq->pStreamMeta->pTasks, pIter)) != NULL) {
    SStreamTask* pTask = *(SStreamTask**)pIter;
    if (pTask->taskLevel != TASK_LEVEL__SOURCE) {
      continue;
    }

    qDebug("s-task:%s delete req enqueue, ver: %" PRId64, pTask->id.idStr, ver);

    if (failed) {
      streamTaskInputFail(pTask);
      continue;
    }

    SStreamRefDataBlock* pRefBlock = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0);
    if (pRefBlock == NULL) {
      // no queue item, nothing can be enqueued for this task: report it like any other input failure
      qError("s-task:%s failed to allocate queue item for delete req", pTask->id.idStr);
      streamTaskInputFail(pTask);
      continue;
    }

    pRefBlock->type = STREAM_INPUT__REF_DATA_BLOCK;
    pRefBlock->pBlock = pDelBlock;
    pRefBlock->dataRef = pRef;
    atomic_add_fetch_32(pRefBlock->dataRef, 1);

    if (tAppendDataToInputQueue(pTask, (SStreamQueueItem*)pRefBlock) < 0) {
      // the item never reached the queue: give back its reference and free it
      atomic_sub_fetch_32(pRef, 1);
      taosFreeQitem(pRefBlock);
      continue;
    }

    if (streamSchedExec(pTask) < 0) {
      qError("s-task:%s stream task launch failed", pTask->id.idStr);
      continue;
    }
  }

  taosWUnLockLatch(&pTq->pStreamMeta->lock);

  // drop this function's own reference; free the shared block if no queue item holds it
  int32_t ref = atomic_sub_fetch_32(pRef, 1);
  if (ref == 0) {
    blockDataDestroy(pDelBlock);
    taosMemoryFree(pRef);
  }

  return 0;
}

1127 1128 1129 1130
// Re-submits the pending consume message of every handle registered in pTq->pPushMgr.
// Under the pTq->lock write latch, each handle's stored message is wrapped in a TDMT_VND_TMQ_CONSUME
// SRpcMsg and put on the query queue, the stored message object is freed, and finally the whole push
// manager table is cleared. Always returns 0.
int32_t tqProcessSubmitReqForSubscribe(STQ* pTq) {
  int32_t vgId = TD_VID(pTq->pVnode);

  taosWLockLatch(&pTq->lock);

  if (taosHashGetSize(pTq->pPushMgr) > 0) {
    for (void* pIter = taosHashIterate(pTq->pPushMgr, NULL); pIter != NULL;
         pIter = taosHashIterate(pTq->pPushMgr, pIter)) {
      STqHandle* pHandle = *(STqHandle**)pIter;
      tqDebug("vgId:%d start set submit for pHandle:%p, consumer:0x%" PRIx64, vgId, pHandle, pHandle->consumerId);

      // a true result from ASSERT() is handled as the failure case: log and stop iterating
      if (ASSERT(pHandle->msg != NULL)) {
        tqError("pHandle->msg should not be null");
        break;
      }

      SRpcMsg msg = {.msgType = TDMT_VND_TMQ_CONSUME, .pCont = pHandle->msg->pCont, .contLen = pHandle->msg->contLen, .info = pHandle->msg->info};
      tmsgPutToQueue(&pTq->pVnode->msgCb, QUERY_QUEUE, &msg);

      // only the stored SRpcMsg wrapper is freed here; its pCont went into the queued message
      taosMemoryFree(pHandle->msg);
      pHandle->msg = NULL;
    }

    taosHashClear(pTq->pPushMgr);
  }

  // unlock
  taosWUnLockLatch(&pTq->lock);
  return 0;
}

L
Liu Jicong 已提交
1160 1161
// Handles a stream task run request (pMsg->pCont is an SStreamTaskRunReq).
// A request whose taskId is WAL_READ_TASKS_ID triggers tqStreamTasksScanWal() for all tasks. Any other id
// is looked up; a task in TASK_STATUS__NORMAL is run through streamProcessRunReq(), any other status is
// logged and ignored, and tqStartStreamTasks() is called afterwards in both cases.
// Returns 0, or -1 when the task id is unknown.
int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
  SStreamTaskRunReq* pReq = pMsg->pCont;

  int32_t taskId = pReq->taskId;
  int32_t vgId = TD_VID(pTq->pVnode);

  if (taskId == WAL_READ_TASKS_ID) {  // all tasks are extracted submit data from the wal
    tqStreamTasksScanWal(pTq);
    return 0;
  }

  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);
  if (pTask == NULL) {
    tqError("vgId:%d failed to found s-task, taskId:%d", vgId, taskId);
    return -1;
  }

  // the status is read atomically, like the other status checks in this file
  if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__NORMAL) {
    tqDebug("vgId:%d s-task:%s start to process block from wal, last chk point:%" PRId64, vgId,
            pTask->id.idStr, pTask->chkInfo.version);
    streamProcessRunReq(pTask);
  } else {
    tqDebug("vgId:%d s-task:%s ignore run req since not in ready state", vgId, pTask->id.idStr);
  }

  streamMetaReleaseTask(pTq->pStreamMeta, pTask);
  tqStartStreamTasks(pTq);
  return 0;
}

L
Liu Jicong 已提交
1190
// Handles a stream dispatch request. The payload follows an SMsgHead inside pMsg->pCont; it is decoded
// into an SStreamDispatchReq and passed to streamProcessDispatchReq() for the destination task together
// with a response skeleton carrying pMsg->info and the `exec` flag.
// Returns 0 when the task was found and the request handed over, -1 when the payload cannot be decoded or
// the task does not exist (in the latter case the decoded request is destroyed here).
int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec) {
  char*              msgStr = pMsg->pCont;
  char*              msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
  int32_t            msgLen = pMsg->contLen - sizeof(SMsgHead);
  SStreamDispatchReq req = {0};
  SDecoder           decoder;

  tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
  int32_t decodeCode = tDecodeStreamDispatchReq(&decoder, &req);
  // every decoder that is initialized is also cleared, as in the other handlers of this file
  tDecoderClear(&decoder);
  if (decodeCode < 0) {
    return -1;
  }

  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.taskId);
  if (pTask == NULL) {
    tDeleteStreamDispatchReq(&req);
    return -1;
  }

  // NOTE(review): the request is not destroyed on this path; presumably streamProcessDispatchReq()
  // consumes it - confirm.
  SRpcMsg rsp = { .info = pMsg->info, .code = 0 };
  streamProcessDispatchReq(pTask, &req, &rsp, exec);
  streamMetaReleaseTask(pTq->pStreamMeta, pTask);
  return 0;
}

L
Liu Jicong 已提交
1211 1212
// Handles a stream dispatch response. The SStreamDispatchRsp follows an SMsgHead inside pMsg->pCont; its
// upstreamTaskId is converted from network byte order and used to look up the task, which then receives
// the response together with pMsg->code through streamProcessDispatchRsp().
// Returns 0 when the task exists and -1 otherwise.
int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) {
  SStreamDispatchRsp* pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t             upstreamTaskId = ntohl(pRsp->upstreamTaskId);

  SStreamTask* pUpstreamTask = streamMetaAcquireTask(pTq->pStreamMeta, upstreamTaskId);
  tqDebug("recv dispatch rsp, code:%x", pMsg->code);

  if (pUpstreamTask == NULL) {
    return -1;
  }

  streamProcessDispatchRsp(pUpstreamTask, pRsp, pMsg->code);
  streamMetaReleaseTask(pTq->pStreamMeta, pUpstreamTask);
  return 0;
}
L
Liu Jicong 已提交
1224

1225
// Handles a drop-task request: `msg` is interpreted as an SVDropStreamTaskReq and the task it names is
// removed from the stream meta. `sversion` and `msgLen` are not used. Always returns 0.
int32_t tqProcessTaskDropReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  SVDropStreamTaskReq* pDropReq = (SVDropStreamTaskReq*)msg;

  (void)sversion;
  (void)msgLen;

  streamMetaRemoveTask(pTq->pStreamMeta, pDropReq->taskId);
  return 0;
}
L
Liu Jicong 已提交
1230 1231 1232 1233 1234 1235 1236

// Handles a stream retrieve request. The payload follows an SMsgHead inside pMsg->pCont; it is decoded
// into an SStreamRetrieveReq and passed to streamProcessRetrieveReq() for the task named by dstTaskId.
// The decoded request is destroyed before returning whether or not the task exists.
// Returns 0 when the task was found, -1 when the payload cannot be decoded or the task does not exist.
int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg) {
  char*              msgStr = pMsg->pCont;
  char*              msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
  int32_t            msgLen = pMsg->contLen - sizeof(SMsgHead);
  SStreamRetrieveReq req = {0};
  SDecoder           decoder;

  tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
  int32_t decodeCode = tDecodeStreamRetrieveReq(&decoder, &req);
  tDecoderClear(&decoder);
  if (decodeCode < 0) {
    // a request that failed to decode must not be used for the task lookup
    return -1;
  }

  int32_t      code = -1;
  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.dstTaskId);
  if (pTask != NULL) {
    SRpcMsg rsp = { .info = pMsg->info, .code = 0 };
    streamProcessRetrieveReq(pTask, &req, &rsp);
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
    code = 0;
  }

  tDeleteStreamRetrieveReq(&req);
  return code;
}

// Placeholder handler for a retrieve response: neither argument is inspected and 0 is always returned.
int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg) {
  (void)pTq;
  (void)pMsg;
  return 0;
}
L
Liu Jicong 已提交
1258

1259 1260 1261 1262 1263 1264
// Decodes a stream dispatch request from pMsg and hands it to the destination task with exec = false.
//
// On success the request is processed, pMsg->pCont and pMsg are freed and 0 is returned.
// On failure (payload cannot be decoded, or the destination task does not exist) an error response built
// from the request fields is sent back when pMsg->info.handle is set, pMsg->pCont and pMsg are freed, and
// -1 is returned.
int32_t vnodeEnqueueStreamMsg(SVnode* pVnode, SRpcMsg* pMsg) {
  STQ*      pTq = pVnode->pTq;
  SMsgHead* msgStr = pMsg->pCont;
  char*     msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
  int32_t   msgLen = pMsg->contLen - sizeof(SMsgHead);
  int32_t   code = 0;

  // zero-initialized: the FAIL path reads request fields even when decoding did not complete
  SStreamDispatchReq req = {0};
  SDecoder           decoder;
  tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
  if (tDecodeStreamDispatchReq(&decoder, &req) < 0) {
    code = TSDB_CODE_MSG_DECODE_ERROR;
    tDecoderClear(&decoder);
    goto FAIL;
  }
  tDecoderClear(&decoder);

  int32_t taskId = req.taskId;

  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);
  if (pTask) {
    SRpcMsg rsp = { .info = pMsg->info, .code = 0 };
    streamProcessDispatchReq(pTask, &req, &rsp, false);
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
    rpcFreeCont(pMsg->pCont);
    taosFreeQitem(pMsg);
    return 0;
  } else {
    tDeleteStreamDispatchReq(&req);
  }

  code = TSDB_CODE_STREAM_TASK_NOT_EXIST;

FAIL:
  // NOTE(review): unlike every other exit, this one frees neither pMsg->pCont nor pMsg; confirm who owns
  // the message when there is no handle to respond to.
  if (pMsg->info.handle == NULL) return -1;

  SMsgHead* pRspHead = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamDispatchRsp));
  if (pRspHead == NULL) {
    // no buffer for a full response: send a bare out-of-memory response instead
    SRpcMsg rsp = { .code = TSDB_CODE_OUT_OF_MEMORY, .info = pMsg->info };
    tqDebug("send dispatch error rsp, code: %x", rsp.code);
    tmsgSendRsp(&rsp);
    rpcFreeCont(pMsg->pCont);
    taosFreeQitem(pMsg);
    return -1;
  }

  // response fields are written in network byte order
  pRspHead->vgId = htonl(req.upstreamNodeId);
  SStreamDispatchRsp* pRsp = POINTER_SHIFT(pRspHead, sizeof(SMsgHead));
  pRsp->streamId = htobe64(req.streamId);
  pRsp->upstreamTaskId = htonl(req.upstreamTaskId);
  pRsp->upstreamNodeId = htonl(req.upstreamNodeId);
  pRsp->downstreamNodeId = htonl(pVnode->config.vgId);
  pRsp->downstreamTaskId = htonl(req.taskId);
  pRsp->inputStatus = TASK_OUTPUT_STATUS__NORMAL;

  SRpcMsg rsp = {
      .code = code, .info = pMsg->info, .contLen = sizeof(SMsgHead) + sizeof(SStreamDispatchRsp), .pCont = pRspHead};
  tqDebug("send dispatch error rsp, code: %x", code);
  tmsgSendRsp(&rsp);
  rpcFreeCont(pMsg->pCont);
  taosFreeQitem(pMsg);
  return -1;
}
L
Liu Jicong 已提交
1322

1323
// Returns 1 when `sversion` is not greater than pTq->walLogLastVer, and 0 otherwise.
int32_t tqCheckLogInWal(STQ* pTq, int64_t sversion) {
  if (sversion <= pTq->walLogLastVer) {
    return 1;
  }
  return 0;
}
1324

1325
// Requests a WAL scan for the stream tasks of this vnode.
// Under the stream-meta write latch, walScanCounter is incremented; only the call that raises it to 1
// puts a TDMT_STREAM_TASK_RUN message with taskId = WAL_READ_TASKS_ID on the stream queue. Calls that find
// the counter already above 1 only record the additional request and return.
// Returns 0, or -1 with terrno = TSDB_CODE_OUT_OF_MEMORY when the run request cannot be allocated.
int32_t tqStartStreamTasks(STQ* pTq) {
  int32_t      vgId = TD_VID(pTq->pVnode);
  SStreamMeta* pMeta = pTq->pStreamMeta;

  taosWLockLatch(&pMeta->lock);

  int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
  if (numOfTasks == 0) {
    tqInfo("vgId:%d no stream tasks exists", vgId);
    taosWUnLockLatch(&pMeta->lock);
    return 0;
  }

  pMeta->walScanCounter += 1;

  if (pMeta->walScanCounter > 1) {
    tqDebug("vgId:%d wal read task has been launched, remain scan times:%d", vgId, pMeta->walScanCounter);
    taosWUnLockLatch(&pMeta->lock);
    return 0;
  }

  SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq));
  if (pRunReq == NULL) {
    // nothing was launched, so take the request back; otherwise every later call would see a counter
    // above 1 and assume a scan is already running
    pMeta->walScanCounter -= 1;
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    tqError("vgId:%d failed restore stream tasks, code:%s", vgId, terrstr(terrno));
    taosWUnLockLatch(&pMeta->lock);
    return -1;
  }

  tqDebug("vgId:%d start wal scan stream tasks, tasks:%d", vgId, numOfTasks);
  pRunReq->head.vgId = vgId;
  pRunReq->streamId = 0;
  pRunReq->taskId = WAL_READ_TASKS_ID;

  SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)};
  tmsgPutToQueue(&pTq->pVnode->msgCb, STREAM_QUEUE, &msg);
  taosWUnLockLatch(&pMeta->lock);

  return 0;
}