tq.c 43.9 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
S
Shengliang Guan 已提交
14 15
 */

H
Hongze Cheng 已提交
16
#include "tq.h"
S
Shengliang Guan 已提交
17

dengyihao's avatar
dengyihao 已提交
18 19 20
// states of tqMgmt.inited:
// 0: not initialized
// 1: already initialized
// 2: initialization or cleanup in progress (other callers wait)
21
// NOTE(review): not referenced in the visible part of this file; presumably a sentinel
// task id used for WAL-read tasks -- confirm against its users.
#define WAL_READ_TASKS_ID       (-1)
22

23 24
// Forward declaration; the definition further down opens the meta store, the offset
// store and the stream meta of an STQ and is called from tqOpen().
static int32_t tqInitialize(STQ* pTq);

wmmhello's avatar
wmmhello 已提交
25 26 27
// Returns true when the handle's status is TMQ_HANDLE_STATUS_EXEC.
static FORCE_INLINE bool tqIsHandleExec(STqHandle* pHandle) {
  return pHandle->status == TMQ_HANDLE_STATUS_EXEC;
}

// Switches the handle's status to TMQ_HANDLE_STATUS_EXEC.
static FORCE_INLINE void tqSetHandleExec(STqHandle* pHandle) {
  pHandle->status = TMQ_HANDLE_STATUS_EXEC;
}

// Switches the handle's status back to TMQ_HANDLE_STATUS_IDLE.
static FORCE_INLINE void tqSetHandleIdle(STqHandle* pHandle) {
  pHandle->status = TMQ_HANDLE_STATUS_IDLE;
}
wmmhello's avatar
wmmhello 已提交
28

L
Liu Jicong 已提交
29
// One-time initialization of the global TQ state (tqMgmt).
//
// tqMgmt.inited is used as a small state machine: 0 = not initialized,
// 1 = initialized, 2 = another caller is initializing or cleaning up.
// The caller that moves the state 0 -> 2 performs the initialization; callers that
// observe 2 spin until the state settles.
//
// Returns 0 on success (or when already initialized), -1 when the timer or the
// stream module could not be initialized.
int32_t tqInit() {
  int8_t old;
  while (1) {
    old = atomic_val_compare_exchange_8(&tqMgmt.inited, 0, 2);
    if (old != 2) break;
  }

  if (old == 0) {
    tqMgmt.timer = taosTmrInit(10000, 100, 10000, "TQ");
    if (tqMgmt.timer == NULL) {
      atomic_store_8(&tqMgmt.inited, 0);
      return -1;
    }

    if (streamInit() < 0) {
      // Roll back: release the timer created above and leave the transient state 2,
      // otherwise every later tqInit()/tqCleanUp() would spin forever on the CAS loop.
      taosTmrCleanUp(tqMgmt.timer);
      tqMgmt.timer = NULL;
      atomic_store_8(&tqMgmt.inited, 0);
      return -1;
    }

    atomic_store_8(&tqMgmt.inited, 1);
  }

  return 0;
}
L
Liu Jicong 已提交
50

51
// Tears down the global TQ state set up by tqInit().
//
// Only the caller that moves tqMgmt.inited from 1 to 2 performs the cleanup; a caller
// that finds the state at 2 spins until it changes, and a caller that finds 0 does
// nothing.
void tqCleanUp() {
  int8_t prev;
  do {
    prev = atomic_val_compare_exchange_8(&tqMgmt.inited, 1, 2);
  } while (prev == 2);

  if (prev != 1) {
    return;
  }

  taosTmrCleanUp(tqMgmt.timer);
  streamCleanUp();
  atomic_store_8(&tqMgmt.inited, 0);
}
L
Liu Jicong 已提交
64

65
static void destroyTqHandle(void* data) {
66 67
  STqHandle* pData = (STqHandle*)data;
  qDestroyTask(pData->execHandle.task);
wmmhello's avatar
wmmhello 已提交
68

69
  if (pData->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
L
Liu Jicong 已提交
70
    taosMemoryFreeClear(pData->execHandle.execCol.qmsg);
71
  } else if (pData->execHandle.subType == TOPIC_SUB_TYPE__DB) {
72
    tqCloseReader(pData->execHandle.pTqReader);
73 74
    walCloseReader(pData->pWalReader);
    taosHashCleanup(pData->execHandle.execDb.pFilterOutTbUid);
L
Liu Jicong 已提交
75
  } else if (pData->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
76
    walCloseReader(pData->pWalReader);
77
    tqCloseReader(pData->execHandle.pTqReader);
78
  }
79 80 81 82
  if(pData->msg != NULL) {
    rpcFreeCont(pData->msg->pCont);
    taosMemoryFree(pData->msg);
    pData->msg = NULL;
H
Haojun Liao 已提交
83
  }
L
Liu Jicong 已提交
84 85
}

86 87 88 89 90
// Returns true only when both offsets are of type TMQ_OFFSET__LOG and the left version
// does not exceed the right one; any non-log offset makes the result false.
static bool tqOffsetLessOrEqual(const STqOffset* pLeft, const STqOffset* pRight) {
  if (pLeft->val.type != TMQ_OFFSET__LOG) {
    return false;
  }
  if (pRight->val.type != TMQ_OFFSET__LOG) {
    return false;
  }
  return pLeft->val.version <= pRight->val.version;
}

L
Liu Jicong 已提交
91
// Allocates and opens the TQ object of a vnode.
//
// path   - directory path; a private copy is stored in the returned object
// pVnode - owning vnode; its WAL is read for the last logged version
//
// Returns the new STQ, or NULL on failure. On allocation failure terrno is set to
// TSDB_CODE_OUT_OF_MEMORY; when tqInitialize() fails the partially opened object is
// released through tqClose().
STQ* tqOpen(const char* path, SVnode* pVnode) {
  STQ* pTq = taosMemoryCalloc(1, sizeof(STQ));
  if (pTq == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pTq->path = taosStrdup(path);
  pTq->pVnode = pVnode;
  pTq->walLogLastVer = pVnode->pWal->vers.lastVer;

  pTq->pHandle = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);

  taosInitRWLatch(&pTq->lock);
  pTq->pPushMgr = taosHashInit(64, MurmurHash3_32, false, HASH_NO_LOCK);

  pTq->pCheckInfo = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);

  // Any of the allocations above may have failed; bail out before the tables are used
  // and release only what was actually created.
  if (pTq->path == NULL || pTq->pHandle == NULL || pTq->pPushMgr == NULL || pTq->pCheckInfo == NULL) {
    if (pTq->pHandle != NULL) {
      taosHashCleanup(pTq->pHandle);
    }
    if (pTq->pPushMgr != NULL) {
      taosHashCleanup(pTq->pPushMgr);
    }
    if (pTq->pCheckInfo != NULL) {
      taosHashCleanup(pTq->pCheckInfo);
    }
    taosMemoryFree(pTq->path);
    taosMemoryFree(pTq);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  taosHashSetFreeFp(pTq->pHandle, destroyTqHandle);
  taosHashSetFreeFp(pTq->pCheckInfo, (FDelete)tDeleteSTqCheckInfo);

  int32_t code = tqInitialize(pTq);
  if (code != TSDB_CODE_SUCCESS) {
    tqClose(pTq);
    return NULL;
  } else {
    return pTq;
  }
}

// Opens the persistent parts of a TQ object in order: meta store, offset store, stream
// meta, then loads the stream tasks. Returns 0 on success and -1 at the first step that
// fails; nothing is rolled back here (tqOpen() calls tqClose() on failure).
int32_t tqInitialize(STQ* pTq) {
  // step 1: tq meta
  if (tqMetaOpen(pTq) < 0) {
    return -1;
  }

  // step 2: consumer offsets
  pTq->pOffsetStore = tqOffsetOpen(pTq);
  if (NULL == pTq->pOffsetStore) {
    return -1;
  }

  // step 3: stream meta
  pTq->pStreamMeta = streamMetaOpen(pTq->path, pTq, (FTaskExpand*)tqExpandTask, pTq->pVnode->config.vgId);
  if (NULL == pTq->pStreamMeta) {
    return -1;
  }

  // step 4: stream tasks
  // the version is kept in task's meta data
  // todo check if this version is required or not
  if (streamLoadTasks(pTq->pStreamMeta, walGetCommittedVer(pTq->pVnode->pWal)) < 0) {
    return -1;
  }

  return 0;
}
L
Liu Jicong 已提交
143

L
Liu Jicong 已提交
144
// Releases a TQ object and everything it owns; a NULL argument is ignored.
void tqClose(STQ* pTq) {
  if (pTq != NULL) {
    // in-memory stores
    tqOffsetClose(pTq->pOffsetStore);
    taosHashCleanup(pTq->pHandle);
    taosHashCleanup(pTq->pPushMgr);
    taosHashCleanup(pTq->pCheckInfo);
    taosMemoryFree(pTq->path);

    // persistent stores
    tqMetaClose(pTq);
    streamMetaClose(pTq->pStreamMeta);

    taosMemoryFree(pTq);
  }
}
L
Liu Jicong 已提交
158

H
Haojun Liao 已提交
159 160 161 162 163 164 165 166 167 168 169 170 171
// Marks every stream task in the stream meta as stopped and kills its executor, while
// holding the stream-meta write latch. A NULL pTq is ignored.
void tqNotifyClose(STQ* pTq) {
  if (pTq == NULL) {
    return;
  }

  taosWLockLatch(&pTq->pStreamMeta->lock);

  for (void* pIter = taosHashIterate(pTq->pStreamMeta->pTasks, NULL); pIter != NULL;
       pIter = taosHashIterate(pTq->pStreamMeta->pTasks, pIter)) {
    // each hash entry stores a pointer to the task
    SStreamTask* pTask = *(SStreamTask**)pIter;
    tqDebug("vgId:%d s-task:%s set dropping flag", pTq->pStreamMeta->vgId, pTask->id.idStr);
    pTask->status.taskStatus = TASK_STATUS__STOP;

    // measure how long killing the executor takes, for the debug log only
    int64_t startTs = taosGetTimestampMs();
    qKillTask(pTask->exec.pExecutor, TSDB_CODE_SUCCESS);
    int64_t elapsed = taosGetTimestampMs() - startTs;
    tqDebug("vgId:%d s-task:%s is closed in %" PRId64 "ms", pTq->pStreamMeta->vgId, pTask->id.idStr, elapsed);
  }

  taosWUnLockLatch(&pTq->pStreamMeta->lock);
}

H
Haojun Liao 已提交
184 185
static int32_t doSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* pRsp, int32_t epoch,
                             int64_t consumerId, int32_t type) {
L
Liu Jicong 已提交
186 187
  int32_t len = 0;
  int32_t code = 0;
H
Haojun Liao 已提交
188 189 190 191 192 193

  if (type == TMQ_MSG_TYPE__POLL_RSP) {
    tEncodeSize(tEncodeSMqDataRsp, pRsp, len, code);
  } else if (type == TMQ_MSG_TYPE__TAOSX_RSP) {
    tEncodeSize(tEncodeSTaosxRsp, (STaosxRsp*)pRsp, len, code);
  }
L
Liu Jicong 已提交
194 195 196 197 198 199 200 201 202 203 204

  if (code < 0) {
    return -1;
  }

  int32_t tlen = sizeof(SMqRspHead) + len;
  void*   buf = rpcMallocCont(tlen);
  if (buf == NULL) {
    return -1;
  }

H
Haojun Liao 已提交
205 206 207
  ((SMqRspHead*)buf)->mqMsgType = type;
  ((SMqRspHead*)buf)->epoch = epoch;
  ((SMqRspHead*)buf)->consumerId = consumerId;
L
Liu Jicong 已提交
208 209 210 211 212

  void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));

  SEncoder encoder = {0};
  tEncoderInit(&encoder, abuf, len);
H
Haojun Liao 已提交
213 214 215 216

  if (type == TMQ_MSG_TYPE__POLL_RSP) {
    tEncodeSMqDataRsp(&encoder, pRsp);
  } else if (type == TMQ_MSG_TYPE__TAOSX_RSP) {
X
Xiaoyu Wang 已提交
217
    tEncodeSTaosxRsp(&encoder, (STaosxRsp*)pRsp);
H
Haojun Liao 已提交
218 219
  }

L
Liu Jicong 已提交
220 221 222
  tEncoderClear(&encoder);

  SRpcMsg rsp = {
H
Haojun Liao 已提交
223
      .info = *pRpcHandleInfo,
L
Liu Jicong 已提交
224 225 226 227 228 229 230 231 232
      .pCont = buf,
      .contLen = tlen,
      .code = 0,
  };

  tmsgSendRsp(&rsp);
  return 0;
}

233 234 235 236 237 238
// Sends an empty poll response to the consumer whose request is stored on the handle
// (pHandle->msg), using the handle's consumer id and epoch. Always returns 0; the
// result of the send is not inspected.
int32_t tqPushDataRsp(STQ* pTq, STqHandle* pHandle) {
  SMqDataRsp emptyRsp = {0};
  emptyRsp.head.mqMsgType = TMQ_MSG_TYPE__POLL_RSP;
  emptyRsp.head.epoch = pHandle->epoch;
  emptyRsp.head.consumerId = pHandle->consumerId;

  doSendDataRsp(&pHandle->msg->info, &emptyRsp, pHandle->epoch, pHandle->consumerId, TMQ_MSG_TYPE__POLL_RSP);

  // printable forms of the request/response offsets, for the debug log only
  char reqOffsetStr[80] = {0};
  char rspOffsetStr[80] = {0};
  tFormatOffset(reqOffsetStr, tListLen(reqOffsetStr), &emptyRsp.reqOffset);
  tFormatOffset(rspOffsetStr, tListLen(rspOffsetStr), &emptyRsp.rspOffset);

  tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) push rsp, block num: %d, req:%s, rsp:%s",
          TD_VID(pTq->pVnode), emptyRsp.head.consumerId, emptyRsp.head.epoch, emptyRsp.blockNum, reqOffsetStr,
          rspOffsetStr);
  return 0;
}

H
Haojun Liao 已提交
249 250
// Sends pRsp as the answer to the poll request pReq that arrived in pMsg, then logs the
// request/response offsets. Always returns 0; the result of the send is not inspected.
int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp, int32_t type) {
  enum { OFFSET_STR_LEN = 80 };

  doSendDataRsp(&pMsg->info, pRsp, pReq->epoch, pReq->consumerId, type);

  // printable forms of the offsets, for the debug log only
  char reqOffsetStr[OFFSET_STR_LEN] = {0};
  char rspOffsetStr[OFFSET_STR_LEN] = {0};
  tFormatOffset(reqOffsetStr, OFFSET_STR_LEN, &pRsp->reqOffset);
  tFormatOffset(rspOffsetStr, OFFSET_STR_LEN, &pRsp->rspOffset);

  tqDebug("vgId:%d consumer:0x%" PRIx64 " (epoch %d) send rsp, block num:%d, req:%s, rsp:%s, reqId:0x%" PRIx64,
          TD_VID(pTq->pVnode), pReq->consumerId, pReq->epoch, pRsp->blockNum, reqOffsetStr, rspOffsetStr,
          pReq->reqId);

  return 0;
}

263
// Applies an offset-commit message for one subscription.
//
// msg/msgLen - encoded STqOffset
// sversion   - version the message was written with; a log offset that points at the
//              entry just before it is advanced by one
//
// The new offset is stored unless a saved log offset already covers it. For log offsets
// the WAL reference of the matching handle (if any) is moved to the committed version.
// Returns 0 on success or when nothing needs updating, -1 on decode failure, unknown
// offset type, or a failed store/ref update.
int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  STqOffset offset = {0};
  int32_t   vgId = TD_VID(pTq->pVnode);

  SDecoder decoder;
  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
  if (tDecodeSTqOffset(&decoder, &offset) < 0) {
    // release the decoder on the error path too, as the other decode sites in this file do
    tDecoderClear(&decoder);
    return -1;
  }

  tDecoderClear(&decoder);

  if (offset.val.type == TMQ_OFFSET__SNAPSHOT_DATA || offset.val.type == TMQ_OFFSET__SNAPSHOT_META) {
    tqDebug("receive offset commit msg to %s on vgId:%d, offset(type:snapshot) uid:%" PRId64 ", ts:%" PRId64,
            offset.subKey, vgId, offset.val.uid, offset.val.ts);
  } else if (offset.val.type == TMQ_OFFSET__LOG) {
    tqDebug("receive offset commit msg to %s on vgId:%d, offset(type:log) version:%" PRId64, offset.subKey, vgId,
            offset.val.version);
    if (offset.val.version + 1 == sversion) {
      offset.val.version += 1;
    }
  } else {
    tqError("invalid commit offset type:%d", offset.val.type);
    return -1;
  }

  STqOffset* pSavedOffset = tqOffsetRead(pTq->pOffsetStore, offset.subKey);
  if (pSavedOffset != NULL && tqOffsetLessOrEqual(&offset, pSavedOffset)) {
    return 0;  // no need to update the offset value
  }

  // save the new offset value
  if (tqOffsetWrite(pTq->pOffsetStore, &offset) < 0) {
    return -1;
  }

  if (offset.val.type == TMQ_OFFSET__LOG) {
    STqHandle* pHandle = taosHashGet(pTq->pHandle, offset.subKey, strlen(offset.subKey));
    if (pHandle && (walSetRefVer(pHandle->pRef, offset.val.version) < 0)) {
      return -1;
    }
  }

  return 0;
}

L
Liu Jicong 已提交
309
// Checks whether column colId of table tbUid may be modified.
// Scans every check-info entry; returns -1 as soon as an entry for tbUid lists colId
// among its column ids, 0 when no entry does.
int32_t tqCheckColModifiable(STQ* pTq, int64_t tbUid, int32_t colId) {
  for (void* pIter = taosHashIterate(pTq->pCheckInfo, NULL); pIter != NULL;
       pIter = taosHashIterate(pTq->pCheckInfo, pIter)) {
    STqCheckInfo* pCheck = (STqCheckInfo*)pIter;
    if (pCheck->ntbUid != tbUid) {
      continue;
    }

    int32_t numOfCols = taosArrayGetSize(pCheck->colIdList);
    for (int32_t idx = 0; idx < numOfCols; ++idx) {
      int16_t forbidColId = *(int16_t*)taosArrayGet(pCheck->colIdList, idx);
      if (forbidColId != colId) {
        continue;
      }

      // leaving the iteration early: the iterator has to be cancelled explicitly
      taosHashCancelIterate(pTq->pCheckInfo, pIter);
      return -1;
    }
  }

  return 0;
}

335
// Handles one poll request from a TMQ consumer.
//
// Steps: deserialize the request, claim the subscription handle (retrying while another
// poll is executing on it), verify that the handle still belongs to the requesting
// consumer, update the saved epoch, and hand over to tqExtractDataForMq(). The handle
// is set back to idle before returning, also on the consumer-mismatch path.
//
// Returns -1 with terrno set when the message cannot be deserialized, the subscription
// key is unknown, or the consumer does not match; otherwise the result of
// tqExtractDataForMq().
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
  // pause between two attempts to claim a handle that is busy; the same constant feeds
  // the log message so the two cannot drift apart
  enum { TQ_POLL_RETRY_INTERVAL_MS = 10 };

  SMqPollReq req = {0};
  int code = 0;
  if (tDeserializeSMqPollReq(pMsg->pCont, pMsg->contLen, &req) < 0) {
    tqError("tDeserializeSMqPollReq %d failed", pMsg->contLen);
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  int64_t      consumerId = req.consumerId;
  int32_t      reqEpoch = req.epoch;
  STqOffsetVal reqOffset = req.reqOffset;
  int32_t      vgId = TD_VID(pTq->pVnode);
  STqHandle*   pHandle = NULL;

  while (1) {
    taosWLockLatch(&pTq->lock);
    // 1. find handle
    pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
    if (pHandle == NULL) {
      tqError("tmq poll: consumer:0x%" PRIx64 " vgId:%d subkey %s not found", consumerId, vgId, req.subKey);
      terrno = TSDB_CODE_INVALID_MSG;
      taosWUnLockLatch(&pTq->lock);
      return -1;
    }

    // claim the handle under the latch if nobody else is executing on it
    bool exec = tqIsHandleExec(pHandle);
    if (!exec) {
      tqSetHandleExec(pHandle);
      taosWUnLockLatch(&pTq->lock);
      break;
    }
    taosWUnLockLatch(&pTq->lock);

    tqDebug("tmq poll: consumer:0x%" PRIx64 "vgId:%d, topic:%s, subscription is executing, wait for %dms and retry, pHandle:%p", consumerId, vgId, req.subKey, TQ_POLL_RETRY_INTERVAL_MS, pHandle);
    taosMsleep(TQ_POLL_RETRY_INTERVAL_MS);
  }

  // 2. check re-balance status
  if (pHandle->consumerId != consumerId) {
    tqDebug("ERROR tmq poll: consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64,
            consumerId, TD_VID(pTq->pVnode), req.subKey, pHandle->consumerId);
    terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
    code = -1;
    goto end;
  }

  tqDebug("tmq poll: consumer:0x%" PRIx64 "vgId:%d, topic:%s, set handle exec, pHandle:%p", consumerId, vgId, req.subKey, pHandle);

  // 3. update the epoch value
  int32_t savedEpoch = pHandle->epoch;
  if (savedEpoch < reqEpoch) {
    tqDebug("tmq poll: consumer:0x%" PRIx64 " epoch update from %d to %d by poll req", consumerId, savedEpoch,
            reqEpoch);
    pHandle->epoch = reqEpoch;
  }

  char buf[80];
  tFormatOffset(buf, 80, &reqOffset);
  tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey %s, recv poll req vgId:%d, req:%s, reqId:0x%" PRIx64,
          consumerId, req.epoch, pHandle->subKey, vgId, buf, req.reqId);

  code = tqExtractDataForMq(pTq, pHandle, &req, pMsg);

end:
  // release the claim taken in step 1
  tqSetHandleIdle(pHandle);
  tqDebug("tmq poll: consumer:0x%" PRIx64 "vgId:%d, topic:%s, , set handle idle, pHandle:%p", consumerId, vgId, req.subKey, pHandle);
  return code;
}

405
// Removes a subscription: waits until no poll is executing on its handle, closes the
// handle's WAL reference, and deletes the handle, the cached offset and the persisted
// handle. The whole operation runs under the pTq write latch.
//
// msg is interpreted as an SMqVDeleteReq. Failures of the individual delete steps are
// logged only; the function always returns 0.
int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  // pause between two checks of a busy handle; the same constant feeds the log message
  // so the two cannot drift apart
  enum { TQ_DELETE_RETRY_INTERVAL_MS = 10 };

  SMqVDeleteReq* pReq = (SMqVDeleteReq*)msg;
  int32_t        vgId = TD_VID(pTq->pVnode);

  tqDebug("vgId:%d, tq process delete sub req %s", pTq->pVnode->config.vgId, pReq->subKey);
  int32_t code = 0;

  taosWLockLatch(&pTq->lock);
  STqHandle* pHandle = taosHashGet(pTq->pHandle, pReq->subKey, strlen(pReq->subKey));
  if (pHandle) {
    while (tqIsHandleExec(pHandle)) {
      tqDebug("vgId:%d, topic:%s, subscription is executing, wait for %dms and retry, pHandle:%p", vgId, pHandle->subKey, TQ_DELETE_RETRY_INTERVAL_MS, pHandle);
      taosMsleep(TQ_DELETE_RETRY_INTERVAL_MS);
    }
    if (pHandle->pRef) {
      walCloseRef(pTq->pVnode->pWal, pHandle->pRef->refId);
    }
    code = taosHashRemove(pTq->pHandle, pReq->subKey, strlen(pReq->subKey));
    if (code != 0) {
      tqError("cannot process tq delete req %s, since no such handle", pReq->subKey);
    }
  }

  code = tqOffsetDelete(pTq->pOffsetStore, pReq->subKey);
  if (code != 0) {
    tqError("cannot process tq delete req %s, since no such offset in cache", pReq->subKey);
  }

  if (tqMetaDeleteHandle(pTq, pReq->subKey) < 0) {
    tqError("cannot process tq delete req %s, since no such offset in tdb", pReq->subKey);
  }
  taosWUnLockLatch(&pTq->lock);

  return 0;
}

441
// Registers a check-info entry: decodes an STqCheckInfo from msg, stores it in the
// pCheckInfo table keyed by topic, and persists the raw message.
// Returns 0 on success; on any failure returns -1 with terrno set to
// TSDB_CODE_OUT_OF_MEMORY.
int32_t tqProcessAddCheckInfoReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  STqCheckInfo info = {0};
  SDecoder     decoder;
  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
  if (tDecodeSTqCheckInfo(&decoder, &info) < 0) {
    // release the decoder on the error path too, as the other decode sites in this file do
    tDecoderClear(&decoder);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  tDecoderClear(&decoder);

  // the table keeps a by-value copy of info
  if (taosHashPut(pTq->pCheckInfo, info.topic, strlen(info.topic), &info, sizeof(STqCheckInfo)) < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  if (tqMetaSaveCheckInfo(pTq, info.topic, msg, msgLen) < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  return 0;
}

461
// Removes the check-info entry whose key is the NUL-terminated string in msg, first
// from the in-memory table and then from the persisted meta.
// Returns 0 on success; if either step fails returns -1 with terrno set to
// TSDB_CODE_OUT_OF_MEMORY (the second step is skipped when the first one fails).
int32_t tqProcessDelCheckInfoReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  if (taosHashRemove(pTq->pCheckInfo, msg, strlen(msg)) < 0 || tqMetaDeleteCheckInfo(pTq, msg) < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  return 0;
}

473
// Handles a re-balance (subscribe) request for one subscription key.
//
// No handle yet for the key: a new handle is built for req.newConsumerId (WAL
// reference, readers and executor task depending on the subscription type), inserted
// into pTq->pHandle and persisted.
// Handle exists: if the consumer changes, the new consumer id is stored, the running
// task is killed, the handle is unregistered from the push manager and persisted again;
// if the consumer is unchanged only a log line is written.
//
// Returns 0, -1 when no WAL reference could be taken, or the result of
// tqMetaSaveHandle(). req.qmsg is freed on exit unless ownership moved to the handle.
int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  int ret = 0;

  SMqRebVgReq req = {0};
  tDecodeSMqRebVgReq(msg, &req);

  SVnode* pVnode = pTq->pVnode;
  int32_t vgId = TD_VID(pVnode);

  tqDebug("vgId:%d, tq process sub req:%s, Id:0x%" PRIx64 " -> Id:0x%" PRIx64, pVnode->config.vgId, req.subKey,
          req.oldConsumerId, req.newConsumerId);

  STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
  if (pHandle == NULL) {
    // ---- first subscription on this key: build a brand-new handle ----
    if (req.oldConsumerId != -1) {
      tqError("vgId:%d, build new consumer handle %s for consumer:0x%" PRIx64 ", but old consumerId:0x%" PRIx64,
              req.vgId, req.subKey, req.newConsumerId, req.oldConsumerId);
    }

    if (req.newConsumerId == -1) {
      tqError("vgId:%d, tq invalid re-balance request, new consumerId %" PRId64 "", req.vgId, req.newConsumerId);
      goto end;
    }

    STqHandle newHandle = {0};
    pHandle = &newHandle;

    uint64_t oldConsumerId = pHandle->consumerId;
    memcpy(pHandle->subKey, req.subKey, TSDB_SUBSCRIBE_KEY_LEN);
    pHandle->consumerId = req.newConsumerId;
    pHandle->epoch = -1;

    pHandle->execHandle.subType = req.subType;
    pHandle->fetchMeta = req.withMeta;

    // TODO version should be assigned and refed during preprocess
    SWalRef* pWalRef = walRefCommittedVer(pVnode->pWal);
    if (pWalRef == NULL) {
      ret = -1;
      goto end;
    }

    int64_t refVer = pWalRef->refVer;
    pHandle->pRef = pWalRef;

    SReadHandle readHandle = {
        .meta = pVnode->pMeta, .vnode = pVnode, .initTableReader = true, .initTqReader = true, .version = refVer};
    pHandle->snapshotVer = refVer;

    switch (pHandle->execHandle.subType) {
      case TOPIC_SUB_TYPE__COLUMN: {
        // the handle takes over the query message; clear it in req so it is not freed at `end`
        pHandle->execHandle.execCol.qmsg = req.qmsg;
        req.qmsg = NULL;

        pHandle->execHandle.task = qCreateQueueExecTaskInfo(pHandle->execHandle.execCol.qmsg, &readHandle, vgId,
                                                            &pHandle->execHandle.numOfCols, req.newConsumerId);
        void* scanner = NULL;
        qExtractStreamScanner(pHandle->execHandle.task, &scanner);
        pHandle->execHandle.pTqReader = qExtractReaderFromStreamScanner(scanner);
        break;
      }

      case TOPIC_SUB_TYPE__DB: {
        pHandle->pWalReader = walOpenReader(pVnode->pWal, NULL);
        pHandle->execHandle.pTqReader = tqReaderOpen(pVnode);

        pHandle->execHandle.execDb.pFilterOutTbUid =
            taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
        buildSnapContext(readHandle.meta, readHandle.version, 0, pHandle->execHandle.subType, pHandle->fetchMeta,
                         (SSnapContext**)(&readHandle.sContext));

        pHandle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &readHandle, vgId, NULL, req.newConsumerId);
        break;
      }

      case TOPIC_SUB_TYPE__TABLE: {
        pHandle->pWalReader = walOpenReader(pVnode->pWal, NULL);
        pHandle->execHandle.execTb.suid = req.suid;

        // collect the child tables of the super table and restrict the reader to them
        SArray* tbUidList = taosArrayInit(0, sizeof(int64_t));
        vnodeGetCtbIdList(pVnode, req.suid, tbUidList);
        tqDebug("vgId:%d, tq try to get all ctb, suid:%" PRId64, pVnode->config.vgId, req.suid);
        for (int32_t i = 0; i < taosArrayGetSize(tbUidList); i++) {
          int64_t tbUid = *(int64_t*)taosArrayGet(tbUidList, i);
          tqDebug("vgId:%d, idx %d, uid:%" PRId64, vgId, i, tbUid);
        }
        pHandle->execHandle.pTqReader = tqReaderOpen(pVnode);
        tqReaderSetTbUidList(pHandle->execHandle.pTqReader, tbUidList);
        taosArrayDestroy(tbUidList);

        buildSnapContext(readHandle.meta, readHandle.version, req.suid, pHandle->execHandle.subType,
                         pHandle->fetchMeta, (SSnapContext**)(&readHandle.sContext));
        pHandle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &readHandle, vgId, NULL, req.newConsumerId);
        break;
      }

      default:
        break;
    }

    // the table stores a by-value copy of the handle built on the stack
    taosHashPut(pTq->pHandle, req.subKey, strlen(req.subKey), pHandle, sizeof(STqHandle));
    tqDebug("try to persist handle %s consumer:0x%" PRIx64 " , old consumer:0x%" PRIx64, req.subKey,
            pHandle->consumerId, oldConsumerId);
    ret = tqMetaSaveHandle(pTq, req.subKey, pHandle);
    goto end;
  } else {
    // ---- handle already exists ----
    if (pHandle->consumerId != req.newConsumerId) {
      tqInfo("vgId:%d switch consumer from Id:0x%" PRIx64 " to Id:0x%" PRIx64, req.vgId, pHandle->consumerId,
             req.newConsumerId);
      atomic_store_64(&pHandle->consumerId, req.newConsumerId);

      // kill executing task
      qTaskInfo_t pTaskInfo = pHandle->execHandle.task;
      if (pTaskInfo != NULL) {
        qKillTask(pTaskInfo, TSDB_CODE_SUCCESS);
      }
      if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
        qStreamCloseTsdbReader(pTaskInfo);
      }

      // remove if it has been register in the push manager, and return one empty block to consumer
      taosWLockLatch(&pTq->lock);
      tqUnregisterPushHandle(pTq, pHandle);
      taosWUnLockLatch(&pTq->lock);

      ret = tqMetaSaveHandle(pTq, req.subKey, pHandle);
    } else {
      // do nothing
      tqInfo("vgId:%d consumer:0x%" PRIx64 " remains, no switch occurs, should not reach here", req.vgId, req.newConsumerId);
    }
  }

end:
  taosMemoryFree(req.qmsg);
  return ret;
}
594

595
// Fills in the runtime parts of a stream task: id string, input/output queues, status
// fields, state backend and executor (source and agg levels), sink callbacks, and a WAL
// reader for source tasks.
//
// ver is stored as the task's checkpoint version and current version; the checkpoint
// version is incremented by one just before returning.
// Returns 0 on success and -1 at the first failing step; resources created by earlier
// steps are not released here.
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
  int32_t vgId = TD_VID(pTq->pVnode);

  // identity, reference count and queues
  pTask->id.idStr = createStreamTaskIdStr(pTask->id.streamId, pTask->id.taskId);
  pTask->refCnt = 1;
  pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE;

  pTask->inputQueue = streamQueueOpen();
  pTask->outputQueue = streamQueueOpen();
  if (NULL == pTask->inputQueue || NULL == pTask->outputQueue) {
    return -1;
  }

  pTask->inputStatus = TASK_INPUT_STATUS__NORMAL;
  pTask->outputStatus = TASK_OUTPUT_STATUS__NORMAL;
  pTask->pMsgCb = &pTq->pVnode->msgCb;
  pTask->pMeta = pTq->pStreamMeta;
  pTask->chkInfo.version = ver;
  pTask->chkInfo.currentVer = ver;

  // expand executor
  if (pTask->fillHistory) {
    pTask->status.taskStatus = TASK_STATUS__WAIT_DOWNSTREAM;
  } else {
    pTask->status.taskStatus = TASK_STATUS__NORMAL;
  }

  if (pTask->taskLevel == TASK_LEVEL__SOURCE) {
    pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask, false, -1, -1);
    if (NULL == pTask->pState) {
      return -1;
    }

    // source tasks read from this vnode
    SReadHandle srcHandle = {
        .meta = pTq->pVnode->pMeta, .vnode = pTq->pVnode, .initTqReader = 1, .pStateBackend = pTask->pState};

    pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &srcHandle, vgId);
    if (NULL == pTask->exec.pExecutor) {
      return -1;
    }
  } else if (pTask->taskLevel == TASK_LEVEL__AGG) {
    pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask, false, -1, -1);
    if (NULL == pTask->pState) {
      return -1;
    }

    // agg tasks have no vnode; they are told how many child endpoints feed them
    int32_t     numOfVgroups = (int32_t)taosArrayGetSize(pTask->childEpInfo);
    SReadHandle aggHandle = {.vnode = NULL, .numOfVgroups = numOfVgroups, .pStateBackend = pTask->pState};

    pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &aggHandle, vgId);
    if (NULL == pTask->exec.pExecutor) {
      return -1;
    }
  }

  // sink
  switch (pTask->outputType) {
    case TASK_OUTPUT__SMA:
      pTask->smaSink.vnode = pTq->pVnode;
      pTask->smaSink.smaSink = smaHandleRes;
      break;

    case TASK_OUTPUT__TABLE: {
      pTask->tbSink.vnode = pTq->pVnode;
      pTask->tbSink.tbSinkFunc = tqSinkToTablePipeline2;

      // use the schema version recorded in meta when the target table is known, else 1
      int32_t   schemaVer = 1;
      SMetaInfo metaInfo = {0};
      int32_t   metaCode = metaGetInfo(pTq->pVnode->pMeta, pTask->tbSink.stbUid, &metaInfo, NULL);
      if (metaCode == TSDB_CODE_SUCCESS) {
        schemaVer = metaInfo.skmVer;
      }

      SSchemaWrapper* pWrapper = pTask->tbSink.pSchemaWrapper;
      pTask->tbSink.pTSchema = tBuildTSchema(pWrapper->pSchema, pWrapper->nCols, schemaVer);
      if (NULL == pTask->tbSink.pTSchema) {
        return -1;
      }
      break;
    }

    default:
      break;
  }

  if (pTask->taskLevel == TASK_LEVEL__SOURCE) {
    SWalFilterCond walCond = {.deleteMsg = 1};
    pTask->exec.pWalReader = walOpenReader(pTq->pVnode->pWal, &walCond);
  }

  streamSetupTrigger(pTask);

  tqInfo("vgId:%d expand stream task, s-task:%s, checkpoint ver:%" PRId64 " child id:%d, level:%d", vgId, pTask->id.idStr,
         pTask->chkInfo.version, pTask->selfChildId, pTask->taskLevel);

  // next valid version will add one
  pTask->chkInfo.version += 1;
  return 0;
}
L
Liu Jicong 已提交
683

684 685 686 687 688 689
// Answers a stream-task check request coming from an upstream task.
//
// The request body follows an SMsgHead in pMsg->pCont. The response echoes the ids of
// the request and carries the status reported by streamTaskCheckStatus() for the
// downstream task, or 0 when that task is not known here.
// Returns 0 once the response is handed to tmsgSendRsp(); -1 when the request cannot be
// decoded, the response cannot be encoded, or the response buffer cannot be allocated.
int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) {
  char*   msgStr = pMsg->pCont;
  char*   msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
  int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);

  SStreamTaskCheckReq req = {0};
  SDecoder            decoder;
  tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
  if (tDecodeSStreamTaskCheckReq(&decoder, &req) < 0) {
    // do not answer from a half-decoded request
    tDecoderClear(&decoder);
    tqError("failed to decode stream task check req");
    return -1;
  }
  tDecoderClear(&decoder);

  int32_t             taskId = req.downstreamTaskId;
  SStreamTaskCheckRsp rsp = {
      .reqId = req.reqId,
      .streamId = req.streamId,
      .childId = req.childId,
      .downstreamNodeId = req.downstreamNodeId,
      .downstreamTaskId = req.downstreamTaskId,
      .upstreamNodeId = req.upstreamNodeId,
      .upstreamTaskId = req.upstreamTaskId,
  };

  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);

  if (pTask) {
    rsp.status = streamTaskCheckStatus(pTask);

    // log while the reference is still held; pTask must not be touched after release
    tqDebug("tq recv task check req(reqId:0x%" PRIx64
            ") %d at node %d task status:%d, check req from task %d at node %d, rsp status %d",
            rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, pTask->status.taskStatus, rsp.upstreamTaskId,
            rsp.upstreamNodeId, rsp.status);
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
  } else {
    rsp.status = 0;
    tqDebug("tq recv task check(taskId:%d not built yet) req(reqId:0x%" PRIx64
            ") %d at node %d, check req from task %d at node %d, rsp status %d",
            taskId, rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.upstreamTaskId, rsp.upstreamNodeId,
            rsp.status);
  }

  SEncoder encoder;
  int32_t  code = 0;
  int32_t  len = 0;
  tEncodeSize(tEncodeSStreamTaskCheckRsp, &rsp, len, code);
  if (code < 0) {
    tqError("unable to encode rsp %d", __LINE__);
    return -1;
  }

  void* buf = rpcMallocCont(sizeof(SMsgHead) + len);
  if (buf == NULL) {
    return -1;
  }
  ((SMsgHead*)buf)->vgId = htonl(req.upstreamNodeId);

  void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
  tEncoderInit(&encoder, (uint8_t*)abuf, len);
  tEncodeSStreamTaskCheckRsp(&encoder, &rsp);
  tEncoderClear(&encoder);

  SRpcMsg rspMsg = { .code = 0, .pCont = buf, .contLen = sizeof(SMsgHead) + len, .info = pMsg->info };
  tmsgSendRsp(&rspMsg);
  return 0;
}

744
// Processes the response to a task check sent earlier by an upstream task of this node.
// Decodes the response, looks up the upstream task it refers to and forwards it to
// streamProcessTaskCheckRsp().
// Returns -1 when decoding fails or the task is unknown, otherwise the result of
// streamProcessTaskCheckRsp().
int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  SStreamTaskCheckRsp rsp;
  SDecoder            decoder;

  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
  int32_t code = tDecodeSStreamTaskCheckRsp(&decoder, &rsp);
  // the decoder is released on both the success and the failure path
  tDecoderClear(&decoder);
  if (code < 0) {
    return -1;
  }

  tqDebug("tq recv task check rsp(reqId:0x%" PRIx64 ") %d at node %d check req from task %d at node %d, status %d",
          rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);

  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, rsp.upstreamTaskId);
  if (NULL == pTask) {
    return -1;
  }

  code = streamProcessTaskCheckRsp(pTask, &rsp, sversion);
  streamMetaReleaseTask(pTq->pStreamMeta, pTask);
  return code;
}

770
// Deploys a stream task described by msg on this vnode.
//
// Does nothing (returns 0) when streams are disabled through tsDisableStream. Otherwise
// decodes the task, registers it in the stream meta under the meta write latch with
// sversion as its start version, and, for fill-history tasks, starts the downstream
// check.
// Returns 0 on success, -1 on allocation, decode or registration failure.
int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  if (tsDisableStream) {
    return 0;
  }

  // 1.deserialize msg and build task
  SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
  if (NULL == pTask) {
    return -1;
  }

  SDecoder decoder;
  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
  int32_t code = tDecodeStreamTask(&decoder, pTask);
  // the decoder is released on both the success and the failure path
  tDecoderClear(&decoder);
  if (code < 0) {
    taosMemoryFree(pTask);
    return -1;
  }

  // 2.save task, use the newest commit version as the initial start version of stream task.
  taosWLockLatch(&pTq->pStreamMeta->lock);
  code = streamMetaAddDeployedTask(pTq->pStreamMeta, sversion, pTask);
  if (code < 0) {
    tqError("vgId:%d failed to add s-task:%s, total:%d", TD_VID(pTq->pVnode), pTask->id.idStr,
            streamMetaGetNumOfTasks(pTq->pStreamMeta));
    taosWUnLockLatch(&pTq->pStreamMeta->lock);
    return -1;
  }
  taosWUnLockLatch(&pTq->pStreamMeta->lock);

  // 3.go through recover steps to fill history
  if (pTask->fillHistory) {
    streamTaskCheckDownstream(pTask, sversion);
  }

  tqDebug("vgId:%d s-task:%s is deployed and add meta from mnd, status:%d, total:%d", TD_VID(pTq->pVnode),
          pTask->id.idStr, pTask->status.taskStatus, streamMetaGetNumOfTasks(pTq->pStreamMeta));
  return 0;
}

L
Liu Jicong 已提交
819 820 821 822 823
// Run recovery step 1 (history scan) for a source stream task and, when it
// completes, enqueue the request that launches the blocking recovery stage.
//
// pMsg->pCont is interpreted as a SStreamRecoverStep1Req naming the task.
//
// Returns 0 when the next-stage message was queued, or when the task was found
// to be in TASK_STATUS__DROPPING (nothing more to do). Returns -1 when the
// task cannot be acquired, its checkpoint version is not positive, the step-2
// request cannot be built, or the message buffer cannot be allocated.
//
// The task reference obtained from streamMetaAcquireTask() is held until the
// last read of pTask and released exactly once on every exit path.
int32_t tqProcessTaskRecover1Req(STQ* pTq, SRpcMsg* pMsg) {
  SStreamMeta*            pMeta = pTq->pStreamMeta;
  SStreamRecoverStep1Req* pReq = (SStreamRecoverStep1Req*)pMsg->pCont;

  SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->taskId);
  if (pTask == NULL) {
    return -1;
  }

  // check param
  int64_t fillVer1 = pTask->chkInfo.version;
  if (fillVer1 <= 0) {
    streamMetaReleaseTask(pMeta, pTask);
    return -1;
  }

  // do recovery step 1
  tqDebug("s-task:%s start recover step 1 scan", pTask->id.idStr);
  int64_t st = taosGetTimestampMs();

  streamSourceRecoverScanStep1(pTask);
  if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING) {
    streamMetaReleaseTask(pMeta, pTask);
    return 0;
  }

  double el = (taosGetTimestampMs() - st) / 1000.0;
  tqDebug("s-task:%s recover step 1 ended, elapsed time:%.2fs", pTask->id.idStr, el);

  // build msg to launch next step
  SStreamRecoverStep2Req req;
  if (streamBuildSourceRecover2Req(pTask, &req) < 0) {
    streamMetaReleaseTask(pMeta, pTask);
    return -1;
  }

  // re-check the status while the task reference is still held
  if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING) {
    streamMetaReleaseTask(pMeta, pTask);
    return 0;
  }

  // serialize msg: the payload is the step-2 request, so size it from that object
  int32_t len = (int32_t)sizeof(req);

  void* serializedReq = rpcMallocCont(len);
  if (serializedReq == NULL) {
    streamMetaReleaseTask(pMeta, pTask);
    return -1;
  }

  memcpy(serializedReq, &req, len);

  // dispatch msg
  tqDebug("s-task:%s start to recover blocking stage", pTask->id.idStr);

  // pTask is not touched past this point
  streamMetaReleaseTask(pMeta, pTask);

  SRpcMsg rpcMsg = {
      .code = 0, .contLen = len, .msgType = TDMT_VND_STREAM_RECOVER_BLOCKING_STAGE, .pCont = serializedReq};
  tmsgPutToQueue(&pTq->pVnode->msgCb, WRITE_QUEUE, &rpcMsg);
  return 0;
}

883
// Run recovery step 2 for a source stream task and switch it back to normal
// processing.
//
// msg is interpreted as a SStreamRecoverStep2Req naming the task. In order,
// the function: runs the step-2 scan, seeks the task's WAL reader to sversion,
// restores the task parameters, sets the status to normal, dispatches the
// recover-finish request downstream, clears fillHistory and saves the task.
//
// Returns 0 on success, or when the task is found in TASK_STATUS__DROPPING
// after the WAL seek. Returns -1 when the task cannot be acquired or any of
// the checked steps fails. The acquired task is released on every path.
int32_t tqProcessTaskRecover2Req(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  SStreamMeta*            pMeta = pTq->pStreamMeta;
  SStreamRecoverStep2Req* pReq = (SStreamRecoverStep2Req*)msg;

  SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->taskId);
  if (pTask == NULL) {
    return -1;
  }

  int32_t ret = -1;
  do {
    // do recovery step 2
    if (streamSourceRecoverScanStep2(pTask, sversion) < 0) {
      break;
    }

    qDebug("s-task:%s set the start wal offset to be:%"PRId64, pTask->id.idStr, sversion);
    walReaderSeekVer(pTask->exec.pWalReader, sversion);

    if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING) {
      ret = 0;
      break;
    }

    // restore param
    if (streamRestoreParam(pTask) < 0) {
      break;
    }

    // set status normal
    if (streamSetStatusNormal(pTask) < 0) {
      break;
    }

    // dispatch recover finish req to all related downstream task
    if (streamDispatchRecoverFinishReq(pTask) < 0) {
      break;
    }

    atomic_store_8(&pTask->fillHistory, 0);
    streamMetaSaveTask(pMeta, pTask);
    ret = 0;
  } while (0);

  streamMetaReleaseTask(pMeta, pTask);
  return ret;
}

L
Liu Jicong 已提交
936 937 938
// Handle a recover-finish request sent to one of this vnode's stream tasks.
//
// The payload following the SMsgHead in pMsg->pCont is decoded as a
// SStreamRecoverFinishReq; the task it names is acquired and the request is
// passed to streamProcessRecoverFinishReq() with the request's childId.
//
// Returns 0 on success; -1 when the payload cannot be decoded, the task
// cannot be acquired, or processing fails.
int32_t tqProcessTaskRecoverFinishReq(STQ* pTq, SRpcMsg* pMsg) {
  char*   msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);

  // deserialize
  SStreamRecoverFinishReq req;
  SDecoder                decoder;

  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
  int32_t code = tDecodeSStreamRecoverFinishReq(&decoder, &req);
  tDecoderClear(&decoder);
  if (code < 0) {
    // req is not usable after a failed decode; do not look up a task with it
    tqError("vgId:%d failed to decode recover finish req", TD_VID(pTq->pVnode));
    return -1;
  }

  // find task
  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.taskId);
  if (pTask == NULL) {
    return -1;
  }

  // do process request
  code = streamProcessRecoverFinishReq(pTask, req.childId);
  streamMetaReleaseTask(pTq->pStreamMeta, pTask);

  return (code < 0) ? -1 : 0;
}
L
Liu Jicong 已提交
962

L
Liu Jicong 已提交
963 964 965 966 967
// Handler for the response to a recover-finish request.
// Currently a no-op: the response is ignored and 0 is always returned.
int32_t tqProcessTaskRecoverFinishRsp(STQ* pTq, SRpcMsg* pMsg) {
  (void)pTq;
  (void)pMsg;
  return 0;
}

968 969 970 971
// Decode a serialized delete result and turn it into a STREAM_DELETE_DATA
// block wrapped in a newly allocated SStreamRefDataBlock queue item.
//
//   pData/len  serialized SDeleteRes
//   ver        version stored in the block's info.version
//   pRefBlock  out: set to the new queue item, or to NULL when nothing was
//              produced (no affected tables/rows, or an error)
//
// Each affected table uid yields one row carrying the delete range
// (skey/ekey) and the uid; the group-id and calculate-ts columns are NULL.
//
// Returns TSDB_CODE_SUCCESS (with *pRefBlock possibly NULL when there is
// nothing to delete) or TSDB_CODE_OUT_OF_MEMORY on allocation failure.
int32_t extractDelDataBlock(const void* pData, int32_t len, int64_t ver, SStreamRefDataBlock** pRefBlock) {
  SDecoder*   pCoder = &(SDecoder){0};
  SDeleteRes* pRes = &(SDeleteRes){0};

  *pRefBlock = NULL;

  pRes->uidList = taosArrayInit(0, sizeof(tb_uid_t));
  if (pRes->uidList == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  tDecoderInit(pCoder, (uint8_t*)pData, len);
  tDecodeDeleteRes(pCoder, pRes);
  tDecoderClear(pCoder);

  int32_t numOfTables = taosArrayGetSize(pRes->uidList);
  if (numOfTables == 0 || pRes->affectedRows == 0) {
    taosArrayDestroy(pRes->uidList);
    return TSDB_CODE_SUCCESS;
  }

  SSDataBlock* pDelBlock = createSpecialDataBlock(STREAM_DELETE_DATA);
  if (pDelBlock == NULL) {
    taosArrayDestroy(pRes->uidList);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  blockDataEnsureCapacity(pDelBlock, numOfTables);
  pDelBlock->info.rows = numOfTables;
  pDelBlock->info.version = ver;

  // the column handles do not change while rows are filled, so look them up once
  SColumnInfoData* pStartCol = taosArrayGet(pDelBlock->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pEndCol = taosArrayGet(pDelBlock->pDataBlock, END_TS_COLUMN_INDEX);
  SColumnInfoData* pUidCol = taosArrayGet(pDelBlock->pDataBlock, UID_COLUMN_INDEX);
  SColumnInfoData* pGroupCol = taosArrayGet(pDelBlock->pDataBlock, GROUPID_COLUMN_INDEX);
  SColumnInfoData* pCalStartCol = taosArrayGet(pDelBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
  SColumnInfoData* pCalEndCol = taosArrayGet(pDelBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);

  for (int32_t i = 0; i < numOfTables; i++) {
    // start key column / end key column
    colDataSetVal(pStartCol, i, (const char*)&pRes->skey, false);
    colDataSetVal(pEndCol, i, (const char*)&pRes->ekey, false);

    // uid column
    int64_t* pUid = taosArrayGet(pRes->uidList, i);
    colDataSetVal(pUidCol, i, (const char*)pUid, false);

    colDataSetNULL(pGroupCol, i);
    colDataSetNULL(pCalStartCol, i);
    colDataSetNULL(pCalEndCol, i);
  }

  taosArrayDestroy(pRes->uidList);

  *pRefBlock = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0);
  if (*pRefBlock == NULL) {
    // the block has no owner yet, so it must be released here
    blockDataDestroy(pDelBlock);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  (*pRefBlock)->type = STREAM_INPUT__REF_DATA_BLOCK;
  (*pRefBlock)->pBlock = pDelBlock;
  return TSDB_CODE_SUCCESS;
}

// Turn a serialized delete result into a STREAM_DELETE_DATA block and enqueue
// a reference to it on every source-level stream task of this vnode.
//
//   pReq/len  serialized SDeleteRes
//   ver       version stored in the block's info.version
//
// Returns 0 when there is nothing to delete or after all source tasks have
// been visited; returns -1 (terrno = TSDB_CODE_OUT_OF_MEMORY) when the
// reference counter cannot be allocated.
//
// NOTE(review): the counter in pRef starts at 1 and is only ever decremented
// (once per failed enqueue and once at the end); it is never incremented per
// queued item. Whether pDelBlock is freed at the end therefore depends on how
// many enqueues failed. Confirm the intended ownership of pDelBlock against
// the consumers of STREAM_INPUT__REF_DATA_BLOCK items.
// NOTE(review): when taosArrayInit() fails, decoding still proceeds with a
// NULL uidList; confirm tDecodeDeleteRes/taosArrayGetSize tolerate that.
int32_t tqProcessDeleteDataReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) {
  bool        failed = false;
  SDecoder*   pCoder = &(SDecoder){0};
  SDeleteRes* pRes = &(SDeleteRes){0};

  pRes->uidList = taosArrayInit(0, sizeof(tb_uid_t));
  if (pRes->uidList == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    failed = true;
  }

  tDecoderInit(pCoder, pReq, len);
  tDecodeDeleteRes(pCoder, pRes);
  tDecoderClear(pCoder);

  int32_t sz = taosArrayGetSize(pRes->uidList);
  if (sz == 0 || pRes->affectedRows == 0) {
    taosArrayDestroy(pRes->uidList);
    return 0;
  }

  SSDataBlock* pDelBlock = createSpecialDataBlock(STREAM_DELETE_DATA);
  blockDataEnsureCapacity(pDelBlock, sz);
  pDelBlock->info.rows = sz;
  pDelBlock->info.version = ver;

  for (int32_t i = 0; i < sz; i++) {
    // start key column
    SColumnInfoData* pStartCol = taosArrayGet(pDelBlock->pDataBlock, START_TS_COLUMN_INDEX);
    colDataSetVal(pStartCol, i, (const char*)&pRes->skey, false);
    // end key column
    SColumnInfoData* pEndCol = taosArrayGet(pDelBlock->pDataBlock, END_TS_COLUMN_INDEX);
    colDataSetVal(pEndCol, i, (const char*)&pRes->ekey, false);
    // uid column
    SColumnInfoData* pUidCol = taosArrayGet(pDelBlock->pDataBlock, UID_COLUMN_INDEX);
    int64_t*         pUid = taosArrayGet(pRes->uidList, i);
    colDataSetVal(pUidCol, i, (const char*)pUid, false);

    colDataSetNULL(taosArrayGet(pDelBlock->pDataBlock, GROUPID_COLUMN_INDEX), i);
    colDataSetNULL(taosArrayGet(pDelBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX), i);
    colDataSetNULL(taosArrayGet(pDelBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX), i);
  }

  taosArrayDestroy(pRes->uidList);

  int32_t* pRef = taosMemoryMalloc(sizeof(int32_t));
  if (pRef == NULL) {
    // nothing has been queued yet, so the block is still exclusively ours
    blockDataDestroy(pDelBlock);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  *pRef = 1;

  taosWLockLatch(&pTq->pStreamMeta->lock);

  void* pIter = NULL;
  while (1) {
    pIter = taosHashIterate(pTq->pStreamMeta->pTasks, pIter);
    if (pIter == NULL) {
      break;
    }

    SStreamTask* pTask = *(SStreamTask**)pIter;
    if (pTask->taskLevel != TASK_LEVEL__SOURCE) {
      continue;
    }

    qDebug("s-task:%s delete req enqueue, ver: %" PRId64, pTask->id.idStr, ver);

    if (!failed) {
      SStreamRefDataBlock* pRefBlock = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0);
      if (pRefBlock == NULL) {
        // skip this task rather than dereferencing a NULL queue item
        qError("s-task:%s failed to allocate delete block queue item", pTask->id.idStr);
        continue;
      }

      pRefBlock->type = STREAM_INPUT__REF_DATA_BLOCK;
      pRefBlock->pBlock = pDelBlock;

      if (tAppendDataToInputQueue(pTask, (SStreamQueueItem*)pRefBlock) < 0) {
        atomic_sub_fetch_32(pRef, 1);
        taosFreeQitem(pRefBlock);
        continue;
      }

      if (streamSchedExec(pTask) < 0) {
        qError("s-task:%s stream task launch failed", pTask->id.idStr);
        continue;
      }

    } else {
      streamTaskInputFail(pTask);
    }
  }

  taosWUnLockLatch(&pTq->pStreamMeta->lock);

  int32_t ref = atomic_sub_fetch_32(pRef, 1);
  if (ref == 0) {
    blockDataDestroy(pDelBlock);
    taosMemoryFree(pRef);
  }

  return 0;
}

1141 1142 1143 1144
// Re-submit the pending consume request of every handle registered in the
// push manager, then clear the push manager.
//
// For each handle, a TDMT_VND_TMQ_CONSUME message is rebuilt from the stored
// pHandle->msg, put on the query queue, and the stored message is freed and
// reset to NULL. Iteration stops early if a handle fails the ASSERT check on
// its stored message. All of this runs under the write lock pTq->lock.
//
// Always returns 0.
int32_t tqProcessSubmitReqForSubscribe(STQ* pTq) {
  int32_t vgId = TD_VID(pTq->pVnode);

  taosWLockLatch(&pTq->lock);

  if (taosHashGetSize(pTq->pPushMgr) > 0) {
    for (void* pIter = taosHashIterate(pTq->pPushMgr, NULL); pIter != NULL;
         pIter = taosHashIterate(pTq->pPushMgr, pIter)) {
      STqHandle* pHandle = *(STqHandle**)pIter;
      tqDebug("vgId:%d start set submit for pHandle:%p, consumer:0x%" PRIx64, vgId, pHandle, pHandle->consumerId);

      if (ASSERT(pHandle->msg != NULL)) {
        tqError("pHandle->msg should not be null");
        break;
      }

      SRpcMsg* pStored = pHandle->msg;
      SRpcMsg  msg = {
           .msgType = TDMT_VND_TMQ_CONSUME, .pCont = pStored->pCont, .contLen = pStored->contLen, .info = pStored->info};
      tmsgPutToQueue(&pTq->pVnode->msgCb, QUERY_QUEUE, &msg);

      // only the stored message wrapper is freed; its pCont travels with the queued msg
      taosMemoryFree(pHandle->msg);
      pHandle->msg = NULL;
    }

    taosHashClear(pTq->pPushMgr);
  }

  // unlock
  taosWUnLockLatch(&pTq->lock);
  return 0;
}

L
Liu Jicong 已提交
1174 1175
// Handle a "run" request for a stream task.
//
// pMsg->pCont is read as a SStreamTaskRunReq. When its taskId is
// WAL_READ_TASKS_ID, the WAL scan for all stream tasks is run instead of a
// single task. Otherwise the named task is acquired and, if its status is
// TASK_STATUS__NORMAL, processed via streamProcessRunReq(); afterwards
// tqStartStreamTasks() is invoked.
//
// Returns 0 on both of those paths, -1 when the task cannot be found.
int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
  SStreamTaskRunReq* pReq = pMsg->pCont;

  int32_t taskId = pReq->taskId;
  int32_t vgId = TD_VID(pTq->pVnode);

  // all tasks are extracted submit data from the wal
  if (taskId == WAL_READ_TASKS_ID) {
    tqStreamTasksScanWal(pTq);
    return 0;
  }

  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);
  if (pTask == NULL) {
    tqError("vgId:%d failed to found s-task, taskId:%d", vgId, taskId);
    return -1;
  }

  if (pTask->status.taskStatus != TASK_STATUS__NORMAL) {
    tqDebug("vgId:%d s-task:%s ignore run req since not in ready state", vgId, pTask->id.idStr);
  } else {
    tqDebug("vgId:%d s-task:%s start to process block from wal, last chk point:%" PRId64, vgId,
            pTask->id.idStr, pTask->chkInfo.version);
    streamProcessRunReq(pTask);
  }

  streamMetaReleaseTask(pTq->pStreamMeta, pTask);
  tqStartStreamTasks(pTq);
  return 0;
}

L
Liu Jicong 已提交
1204
// Handle a dispatch request addressed to one of this vnode's stream tasks.
//
// The payload following the SMsgHead in pMsg->pCont is decoded as a
// SStreamDispatchReq. If the destination task exists, the request is handed to
// streamProcessDispatchReq() together with a response template built from
// pMsg->info; exec is forwarded unchanged.
//
// Returns 0 when the task was found and the request handed over; -1 when the
// payload cannot be decoded or the task does not exist (in the latter case
// the decoded request is released with tDeleteStreamDispatchReq()).
int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec) {
  char*              msgStr = pMsg->pCont;
  char*              msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
  int32_t            msgLen = pMsg->contLen - sizeof(SMsgHead);
  SStreamDispatchReq req;
  SDecoder           decoder;

  tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
  int32_t code = tDecodeStreamDispatchReq(&decoder, &req);
  // always pair tDecoderInit with tDecoderClear, as vnodeEnqueueStreamMsg does
  tDecoderClear(&decoder);
  if (code < 0) {
    // req is not usable after a failed decode
    return -1;
  }

  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.taskId);
  if (pTask == NULL) {
    tDeleteStreamDispatchReq(&req);
    return -1;
  }

  SRpcMsg rsp = { .info = pMsg->info, .code = 0 };
  streamProcessDispatchReq(pTask, &req, &rsp, exec);
  streamMetaReleaseTask(pTq->pStreamMeta, pTask);
  return 0;
}

L
Liu Jicong 已提交
1225 1226
// Handle the response to a dispatch request.
//
// The SStreamDispatchRsp located after the SMsgHead in pMsg->pCont names the
// upstream task (upstreamTaskId, converted with ntohl). If that task exists,
// the response and pMsg->code are passed to streamProcessDispatchRsp().
//
// Returns 0 when the task was found, -1 otherwise.
int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) {
  SStreamDispatchRsp* pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t             upstreamTaskId = ntohl(pRsp->upstreamTaskId);

  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, upstreamTaskId);
  tqDebug("recv dispatch rsp, code:%x", pMsg->code);

  if (pTask == NULL) {
    return -1;
  }

  streamProcessDispatchRsp(pTask, pRsp, pMsg->code);
  streamMetaReleaseTask(pTq->pStreamMeta, pTask);
  return 0;
}
L
Liu Jicong 已提交
1238

1239
// Drop a stream task: msg is read as a SVDropStreamTaskReq and the task it
// names is removed from the stream meta. sversion and msgLen are not used.
// Always returns 0.
int32_t tqProcessTaskDropReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  SVDropStreamTaskReq* pDropReq = (SVDropStreamTaskReq*)msg;

  (void)sversion;
  (void)msgLen;

  streamMetaRemoveTask(pTq->pStreamMeta, pDropReq->taskId);
  return 0;
}
L
Liu Jicong 已提交
1244 1245 1246 1247 1248 1249 1250

// Handle a retrieve request addressed to one of this vnode's stream tasks.
//
// The payload following the SMsgHead in pMsg->pCont is decoded as a
// SStreamRetrieveReq. If the destination task (dstTaskId) exists, the request
// is passed to streamProcessRetrieveReq() with a response template built from
// pMsg->info. The decoded request is released with tDeleteStreamRetrieveReq()
// whether or not the task was found.
//
// Returns 0 when the task was found; -1 when the payload cannot be decoded or
// the task does not exist.
int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg) {
  char*              msgStr = pMsg->pCont;
  char*              msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
  int32_t            msgLen = pMsg->contLen - sizeof(SMsgHead);
  SStreamRetrieveReq req;
  SDecoder           decoder;

  tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
  int32_t code = tDecodeStreamRetrieveReq(&decoder, &req);
  tDecoderClear(&decoder);
  if (code < 0) {
    // req is not usable after a failed decode; neither use nor delete it
    return -1;
  }

  int32_t      ret = -1;
  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.dstTaskId);
  if (pTask != NULL) {
    SRpcMsg rsp = { .info = pMsg->info, .code = 0 };
    streamProcessRetrieveReq(pTask, &req, &rsp);
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
    ret = 0;
  }

  tDeleteStreamRetrieveReq(&req);
  return ret;
}

// Handler for the response to a retrieve request.
// Currently a no-op: the response is ignored and 0 is always returned.
int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg) {
  (void)pTq;
  (void)pMsg;
  return 0;
}
L
Liu Jicong 已提交
1272

1273 1274 1275 1276 1277 1278
// Decode a stream dispatch request and hand it directly to its destination
// task; on failure, answer the sender with an error response.
//
// Success path: the request is passed to streamProcessDispatchReq() (exec =
// false), the task is released, pMsg->pCont and pMsg are freed, and 0 is
// returned.
//
// Failure path (decode error -> TSDB_CODE_MSG_DECODE_ERROR, task not found ->
// TSDB_CODE_STREAM_TASK_NOT_EXIST): if pMsg->info.handle is set, a
// SStreamDispatchRsp carrying the error code is sent back, pMsg->pCont and
// pMsg are freed, and -1 is returned.
//
// NOTE(review): when pMsg->info.handle is NULL the function returns -1
// without freeing pMsg->pCont / pMsg, unlike every other exit; confirm the
// caller owns the message in that case.
int32_t vnodeEnqueueStreamMsg(SVnode* pVnode, SRpcMsg* pMsg) {
  STQ*      pTq = pVnode->pTq;
  SMsgHead* msgStr = pMsg->pCont;
  char*     msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
  int32_t   msgLen = pMsg->contLen - sizeof(SMsgHead);
  int32_t   code = 0;

  // zero-initialised: the FAIL path reads fields of req even when decoding failed
  SStreamDispatchReq req = {0};
  SDecoder           decoder;
  tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
  if (tDecodeStreamDispatchReq(&decoder, &req) < 0) {
    code = TSDB_CODE_MSG_DECODE_ERROR;
    tDecoderClear(&decoder);
    goto FAIL;
  }
  tDecoderClear(&decoder);

  int32_t taskId = req.taskId;

  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);
  if (pTask) {
    SRpcMsg rsp = { .info = pMsg->info, .code = 0 };
    streamProcessDispatchReq(pTask, &req, &rsp, false);
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
    rpcFreeCont(pMsg->pCont);
    taosFreeQitem(pMsg);
    return 0;
  } else {
    tDeleteStreamDispatchReq(&req);
  }

  code = TSDB_CODE_STREAM_TASK_NOT_EXIST;

FAIL:
  if (pMsg->info.handle == NULL) return -1;

  SMsgHead* pRspHead = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamDispatchRsp));
  if (pRspHead == NULL) {
    // no body can be built; report the allocation failure with an empty response
    SRpcMsg rsp = { .code = TSDB_CODE_OUT_OF_MEMORY, .info = pMsg->info };
    tqDebug("send dispatch error rsp, code: %x", rsp.code);
    tmsgSendRsp(&rsp);
    rpcFreeCont(pMsg->pCont);
    taosFreeQitem(pMsg);
    return -1;
  }

  pRspHead->vgId = htonl(req.upstreamNodeId);
  SStreamDispatchRsp* pRsp = POINTER_SHIFT(pRspHead, sizeof(SMsgHead));
  pRsp->streamId = htobe64(req.streamId);
  pRsp->upstreamTaskId = htonl(req.upstreamTaskId);
  pRsp->upstreamNodeId = htonl(req.upstreamNodeId);
  pRsp->downstreamNodeId = htonl(pVnode->config.vgId);
  pRsp->downstreamTaskId = htonl(req.taskId);
  pRsp->inputStatus = TASK_OUTPUT_STATUS__NORMAL;

  SRpcMsg rsp = {
      .code = code, .info = pMsg->info, .contLen = sizeof(SMsgHead) + sizeof(SStreamDispatchRsp), .pCont = pRspHead};
  tqDebug("send dispatch error rsp, code: %x", code);
  tmsgSendRsp(&rsp);
  rpcFreeCont(pMsg->pCont);
  taosFreeQitem(pMsg);
  return -1;
}
L
Liu Jicong 已提交
1336

1337
// Returns non-zero when sversion is not greater than pTq->walLogLastVer,
// and 0 otherwise.
int32_t tqCheckLogInWal(STQ* pTq, int64_t sversion) {
  return pTq->walLogLastVer >= sversion;
}
1338

1339
// Request a WAL scan for the stream tasks of this vnode.
//
// Under the stream-meta write lock:
//   - if there are no tasks, nothing is done;
//   - otherwise walScanCounter is incremented; if it is then greater than 1
//     no new message is queued;
//   - otherwise a SStreamTaskRunReq with taskId = WAL_READ_TASKS_ID is put on
//     the stream queue.
//
// Returns 0 in all of those cases, or -1 (terrno = TSDB_CODE_OUT_OF_MEMORY)
// when the run request cannot be allocated.
int32_t tqStartStreamTasks(STQ* pTq) {
  int32_t      vgId = TD_VID(pTq->pVnode);
  SStreamMeta* pMeta = pTq->pStreamMeta;
  int32_t      ret = 0;

  taosWLockLatch(&pMeta->lock);

  int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
  if (numOfTasks == 0) {
    tqInfo("vgId:%d no stream tasks exists", vgId);
  } else if (++pMeta->walScanCounter > 1) {
    tqDebug("vgId:%d wal read task has been launched, remain scan times:%d", vgId, pMeta->walScanCounter);
  } else {
    SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq));
    if (pRunReq == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      tqError("vgId:%d failed restore stream tasks, code:%s", vgId, terrstr(terrno));
      ret = -1;
    } else {
      tqDebug("vgId:%d start wal scan stream tasks, tasks:%d", vgId, numOfTasks);
      pRunReq->head.vgId = vgId;
      pRunReq->streamId = 0;
      pRunReq->taskId = WAL_READ_TASKS_ID;

      SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)};
      tmsgPutToQueue(&pTq->pVnode->msgCb, STREAM_QUEUE, &msg);
    }
  }

  taosWUnLockLatch(&pMeta->lock);
  return ret;
}