tq.c 49.5 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
S
Shengliang Guan 已提交
14 15
 */

H
Hongze Cheng 已提交
16
#include "tq.h"
S
Shengliang Guan 已提交
17

dengyihao's avatar
dengyihao 已提交
18 19 20
// States of tqMgmt.inited:
// 0: not initialized
// 1: already initialized
// 2: initialization or cleanup in progress
21
#define WAL_READ_TASKS_ID (-1)
22

23 24
static int32_t tqInitialize(STQ* pTq);

wmmhello's avatar
wmmhello 已提交
25 26 27
static FORCE_INLINE bool tqIsHandleExec(STqHandle* pHandle) { return TMQ_HANDLE_STATUS_EXEC == pHandle->status; }
static FORCE_INLINE void tqSetHandleExec(STqHandle* pHandle) {pHandle->status = TMQ_HANDLE_STATUS_EXEC;}
static FORCE_INLINE void tqSetHandleIdle(STqHandle* pHandle) {pHandle->status = TMQ_HANDLE_STATUS_IDLE;}
wmmhello's avatar
wmmhello 已提交
28

L
Liu Jicong 已提交
29
int32_t tqInit() {
L
Liu Jicong 已提交
30 31 32 33 34 35
  int8_t old;
  while (1) {
    old = atomic_val_compare_exchange_8(&tqMgmt.inited, 0, 2);
    if (old != 2) break;
  }

36 37 38 39 40 41
  if (old == 0) {
    tqMgmt.timer = taosTmrInit(10000, 100, 10000, "TQ");
    if (tqMgmt.timer == NULL) {
      atomic_store_8(&tqMgmt.inited, 0);
      return -1;
    }
42 43 44
    if (streamInit() < 0) {
      return -1;
    }
L
Liu Jicong 已提交
45
    atomic_store_8(&tqMgmt.inited, 1);
46
  }
47

L
Liu Jicong 已提交
48 49
  return 0;
}
L
Liu Jicong 已提交
50

51
/*
 * Tear down the process-wide TQ state created by tqInit().
 * Waits out any init/cleanup in progress (state 2). Does nothing unless the
 * state was 1 (initialized); on completion the state returns to 0.
 */
void tqCleanUp() {
  int8_t prev;
  do {
    prev = atomic_val_compare_exchange_8(&tqMgmt.inited, 1, 2);
  } while (prev == 2);

  if (prev != 1) {
    return;
  }

  taosTmrCleanUp(tqMgmt.timer);
  streamCleanUp();
  atomic_store_8(&tqMgmt.inited, 0);
}
L
Liu Jicong 已提交
64

65
static void destroyTqHandle(void* data) {
66 67
  STqHandle* pData = (STqHandle*)data;
  qDestroyTask(pData->execHandle.task);
wmmhello's avatar
wmmhello 已提交
68

69
  if (pData->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
L
Liu Jicong 已提交
70
    taosMemoryFreeClear(pData->execHandle.execCol.qmsg);
71
  } else if (pData->execHandle.subType == TOPIC_SUB_TYPE__DB) {
72
    tqReaderClose(pData->execHandle.pTqReader);
73 74
    walCloseReader(pData->pWalReader);
    taosHashCleanup(pData->execHandle.execDb.pFilterOutTbUid);
L
Liu Jicong 已提交
75
  } else if (pData->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
76
    walCloseReader(pData->pWalReader);
77
    tqReaderClose(pData->execHandle.pTqReader);
78 79
    taosMemoryFreeClear(pData->execHandle.execTb.qmsg);
    nodesDestroyNode(pData->execHandle.execTb.node);
80
  }
81 82 83 84
  if(pData->msg != NULL) {
    rpcFreeCont(pData->msg->pCont);
    taosMemoryFree(pData->msg);
    pData->msg = NULL;
H
Haojun Liao 已提交
85
  }
L
Liu Jicong 已提交
86 87
}

88 89 90 91 92
// True only when both offsets are WAL (log) offsets and pLeft's version does
// not exceed pRight's; any non-log offset compares as "not less or equal".
static bool tqOffsetLessOrEqual(const STqOffset* pLeft, const STqOffset* pRight) {
  if (pLeft->val.type != TMQ_OFFSET__LOG || pRight->val.type != TMQ_OFFSET__LOG) {
    return false;
  }
  return pLeft->val.version <= pRight->val.version;
}

L
Liu Jicong 已提交
93
/*
 * Create the TQ instance of a vnode whose files live under `path`.
 *
 * Allocates the instance, copies the path, creates the handle, push-manager
 * and check-info hash tables, then opens the persistent stores via
 * tqInitialize().
 *
 * Returns the new STQ, or NULL on failure. Allocation failures set terrno to
 * TSDB_CODE_OUT_OF_MEMORY.
 */
STQ* tqOpen(const char* path, SVnode* pVnode) {
  STQ* pTq = taosMemoryCalloc(1, sizeof(STQ));
  if (pTq == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pTq->path = taosStrdup(path);
  pTq->pVnode = pVnode;
  pTq->walLogLastVer = pVnode->pWal->vers.lastVer;

  pTq->pHandle = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);
  pTq->pPushMgr = taosHashInit(64, MurmurHash3_32, false, HASH_NO_LOCK);
  pTq->pCheckInfo = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);

  // These are dereferenced unconditionally later on (including by
  // tqInitialize); fail now rather than hand out a half-built instance.
  // taosHashCleanup/taosMemoryFree are relied on to accept NULL here, as
  // tqClose already does for partially built instances.
  if (pTq->path == NULL || pTq->pHandle == NULL || pTq->pPushMgr == NULL || pTq->pCheckInfo == NULL) {
    taosHashCleanup(pTq->pHandle);
    taosHashCleanup(pTq->pPushMgr);
    taosHashCleanup(pTq->pCheckInfo);
    taosMemoryFree(pTq->path);
    taosMemoryFree(pTq);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  taosHashSetFreeFp(pTq->pHandle, destroyTqHandle);
  taosHashSetFreeFp(pTq->pCheckInfo, (FDelete)tDeleteSTqCheckInfo);
  taosInitRWLatch(&pTq->lock);

  if (tqInitialize(pTq) != TSDB_CODE_SUCCESS) {
    tqClose(pTq);
    return NULL;
  }

  return pTq;
}

/*
 * Open the persistent parts of a TQ instance, in order:
 *   1. handle metadata;
 *   2. the offset store;
 *   3. the stream meta;
 * then load the stream tasks.
 *
 * Returns 0 on success, -1 on the first failure. The caller (tqOpen) releases
 * whatever was already opened via tqClose().
 *
 * Declared `static` to match the file-scope prototype; the linkage was already
 * internal, this only makes it explicit.
 */
static int32_t tqInitialize(STQ* pTq) {
  if (tqMetaOpen(pTq) < 0) {
    return -1;
  }

  pTq->pOffsetStore = tqOffsetOpen(pTq);
  if (pTq->pOffsetStore == NULL) {
    return -1;
  }

  pTq->pStreamMeta = streamMetaOpen(pTq->path, pTq, (FTaskExpand*)tqExpandTask, pTq->pVnode->config.vgId);
  if (pTq->pStreamMeta == NULL) {
    return -1;
  }

  // the version is kept in task's meta data
  // todo check if this version is required or not
  if (streamLoadTasks(pTq->pStreamMeta, walGetCommittedVer(pTq->pVnode->pWal)) < 0) {
    return -1;
  }

  return 0;
}
L
Liu Jicong 已提交
145

L
Liu Jicong 已提交
146
/*
 * Release a TQ instance created by tqOpen(). Accepts NULL.
 *
 * Also used by tqOpen() when tqInitialize() fails part-way, so members opened
 * late in initialization may still be NULL here.
 * NOTE(review): tqMetaClose is still called unconditionally; confirm it
 * tolerates a meta store that was never opened.
 */
void tqClose(STQ* pTq) {
  if (pTq == NULL) {
    return;
  }

  // Guard stores that tqInitialize may not have reached; their close routines
  // are not known (from this file) to accept NULL.
  if (pTq->pOffsetStore != NULL) {
    tqOffsetClose(pTq->pOffsetStore);
  }
  taosHashCleanup(pTq->pHandle);
  taosHashCleanup(pTq->pPushMgr);
  taosHashCleanup(pTq->pCheckInfo);
  taosMemoryFree(pTq->path);
  tqMetaClose(pTq);
  if (pTq->pStreamMeta != NULL) {
    streamMetaClose(pTq->pStreamMeta);
  }
  taosMemoryFree(pTq);
}
L
Liu Jicong 已提交
160

H
Haojun Liao 已提交
161 162 163 164 165 166 167 168 169 170 171 172 173
/*
 * Mark every stream task of this vnode as stopped and kill its executor.
 * The stream meta write latch is held for the whole sweep. No-op for NULL.
 */
void tqNotifyClose(STQ* pTq) {
  if (pTq == NULL) {
    return;
  }

  taosWLockLatch(&pTq->pStreamMeta->lock);

  void* pIter = NULL;
  while ((pIter = taosHashIterate(pTq->pStreamMeta->pTasks, pIter)) != NULL) {
    SStreamTask* pTask = *(SStreamTask**)pIter;
    tqDebug("vgId:%d s-task:%s set dropping flag", pTq->pStreamMeta->vgId, pTask->id.idStr);
    pTask->status.taskStatus = TASK_STATUS__STOP;

    int64_t startMs = taosGetTimestampMs();
    qKillTask(pTask->exec.pExecutor, TSDB_CODE_SUCCESS);
    int64_t elapsedMs = taosGetTimestampMs() - startMs;
    tqDebug("vgId:%d s-task:%s is closed in %" PRId64 " ms", pTq->pStreamMeta->vgId, pTask->id.idStr, elapsedMs);
  }

  taosWUnLockLatch(&pTq->pStreamMeta->lock);
}

H
Haojun Liao 已提交
186 187
static int32_t doSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* pRsp, int32_t epoch,
                             int64_t consumerId, int32_t type) {
L
Liu Jicong 已提交
188 189
  int32_t len = 0;
  int32_t code = 0;
H
Haojun Liao 已提交
190 191

  if (type == TMQ_MSG_TYPE__POLL_RSP) {
H
Haojun Liao 已提交
192
    tEncodeSize(tEncodeMqDataRsp, pRsp, len, code);
H
Haojun Liao 已提交
193 194 195
  } else if (type == TMQ_MSG_TYPE__TAOSX_RSP) {
    tEncodeSize(tEncodeSTaosxRsp, (STaosxRsp*)pRsp, len, code);
  }
L
Liu Jicong 已提交
196 197 198 199 200 201 202 203 204 205 206

  if (code < 0) {
    return -1;
  }

  int32_t tlen = sizeof(SMqRspHead) + len;
  void*   buf = rpcMallocCont(tlen);
  if (buf == NULL) {
    return -1;
  }

H
Haojun Liao 已提交
207 208 209
  ((SMqRspHead*)buf)->mqMsgType = type;
  ((SMqRspHead*)buf)->epoch = epoch;
  ((SMqRspHead*)buf)->consumerId = consumerId;
L
Liu Jicong 已提交
210 211 212 213 214

  void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));

  SEncoder encoder = {0};
  tEncoderInit(&encoder, abuf, len);
H
Haojun Liao 已提交
215 216

  if (type == TMQ_MSG_TYPE__POLL_RSP) {
H
Haojun Liao 已提交
217
    tEncodeMqDataRsp(&encoder, pRsp);
H
Haojun Liao 已提交
218
  } else if (type == TMQ_MSG_TYPE__TAOSX_RSP) {
X
Xiaoyu Wang 已提交
219
    tEncodeSTaosxRsp(&encoder, (STaosxRsp*)pRsp);
H
Haojun Liao 已提交
220 221
  }

L
Liu Jicong 已提交
222 223 224
  tEncoderClear(&encoder);

  SRpcMsg rsp = {
H
Haojun Liao 已提交
225
      .info = *pRpcHandleInfo,
L
Liu Jicong 已提交
226 227 228 229 230 231 232 233 234
      .pCont = buf,
      .contLen = tlen,
      .code = 0,
  };

  tmsgSendRsp(&rsp);
  return 0;
}

H
Haojun Liao 已提交
235
/*
 * Answer the request stored in pHandle->msg with an empty data response.
 * The response carries no blocks and is stamped with the handle's consumer
 * id/epoch and the reader's current valid WAL version range.
 * Always returns 0.
 */
int32_t tqPushDataRsp(STqHandle* pHandle, int32_t vgId) {
  SMqDataRsp dataRsp = {
      .head.consumerId = pHandle->consumerId,
      .head.epoch = pHandle->epoch,
      .head.mqMsgType = TMQ_MSG_TYPE__POLL_RSP,
  };

  int64_t firstVer = 0;
  int64_t lastVer = 0;
  walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &firstVer, &lastVer);
  tqDoSendDataRsp(&pHandle->msg->info, &dataRsp, pHandle->epoch, pHandle->consumerId, TMQ_MSG_TYPE__POLL_RSP,
                  firstVer, lastVer);

  char reqBuf[80] = {0};
  char rspBuf[80] = {0};
  tFormatOffset(reqBuf, tListLen(reqBuf), &dataRsp.reqOffset);
  tFormatOffset(rspBuf, tListLen(rspBuf), &dataRsp.rspOffset);
  tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) push rsp, block num: %d, req:%s, rsp:%s",
          vgId, dataRsp.head.consumerId, dataRsp.head.epoch, dataRsp.blockNum, reqBuf, rspBuf);
  return 0;
}

254 255 256 257 258 259
/*
 * Send pRsp as the reply to pMsg.
 * The reply echoes the request's epoch and consumer id and carries the
 * handle reader's current valid WAL version range.
 * Always returns 0.
 */
int32_t tqSendDataRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp,
                      int32_t type, int32_t vgId) {
  int64_t firstVer = 0;
  int64_t lastVer = 0;
  walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &firstVer, &lastVer);
  tqDoSendDataRsp(&pMsg->info, pRsp, pReq->epoch, pReq->consumerId, type, firstVer, lastVer);

  char reqBuf[80] = {0};
  char rspBuf[80] = {0};
  tFormatOffset(reqBuf, tListLen(reqBuf), &pRsp->reqOffset);
  tFormatOffset(rspBuf, tListLen(rspBuf), &pRsp->rspOffset);
  tqDebug("vgId:%d consumer:0x%" PRIx64 " (epoch %d) send rsp, block num:%d, req:%s, rsp:%s, reqId:0x%" PRIx64,
          vgId, pReq->consumerId, pReq->epoch, pRsp->blockNum, reqBuf, rspBuf, pReq->reqId);
  return 0;
}

272
/*
 * Apply a consumer's offset commit (an encoded SMqVgOffset in msg/msgLen).
 *
 * Offset handling:
 *   - Snapshot offsets are accepted as-is.
 *   - A log offset whose next version equals `sversion` is advanced by one.
 *   - Other offset types are rejected.
 * The store is not written when both the new and the saved offset are log
 * offsets and the new one is not newer.
 *
 * Returns 0 on success or when no update is needed. Returns -1 on decode
 * failure (terrno = TSDB_CODE_INVALID_MSG), invalid offset type, or store
 * write failure.
 */
int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  SMqVgOffset vgOffset = {0};
  int32_t     vgId = TD_VID(pTq->pVnode);

  SDecoder decoder;
  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
  int32_t code = tDecodeMqVgOffset(&decoder, &vgOffset);
  // Clear the decoder on every path; returning before this used to leak it.
  tDecoderClear(&decoder);
  if (code < 0) {
    tqError("vgId:%d failed to decode offset commit msg", vgId);
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  STqOffset* pOffset = &vgOffset.offset;

  if (pOffset->val.type == TMQ_OFFSET__SNAPSHOT_DATA || pOffset->val.type == TMQ_OFFSET__SNAPSHOT_META) {
    tqDebug("receive offset commit msg to %s on vgId:%d, offset(type:snapshot) uid:%" PRId64 ", ts:%" PRId64,
            pOffset->subKey, vgId, pOffset->val.uid, pOffset->val.ts);
  } else if (pOffset->val.type == TMQ_OFFSET__LOG) {
    tqDebug("receive offset commit msg to %s on vgId:%d, offset(type:log) version:%" PRId64, pOffset->subKey, vgId,
            pOffset->val.version);
    if (pOffset->val.version + 1 == sversion) {
      pOffset->val.version += 1;
    }
  } else {
    tqError("invalid commit offset type:%d", pOffset->val.type);
    return -1;
  }

  STqOffset* pSavedOffset = tqOffsetRead(pTq->pOffsetStore, pOffset->subKey);
  if (pSavedOffset != NULL && tqOffsetLessOrEqual(pOffset, pSavedOffset)) {
    tqDebug("not update the offset, vgId:%d sub:%s since committed:%" PRId64 " less than/equal to existed:%" PRId64,
            vgId, pOffset->subKey, pOffset->val.version, pSavedOffset->val.version);
    return 0;  // no need to update the offset value
  }

  // save the new offset value
  if (tqOffsetWrite(pTq->pOffsetStore, pOffset) < 0) {
    return -1;
  }

  return 0;
}

315
/*
 * Move a subscription's saved offset to the WAL version a consumer asked for
 * (an encoded SMqVgOffset in msg/msgLen).
 *
 * Rules:
 *   - Only log offsets are accepted.
 *   - The requesting consumer must own the handle.
 *   - The version is clamped into the reader's valid WAL range before it is
 *     written to the offset store.
 *
 * Returns 0 on success or when nothing needs to change: the same version is
 * already saved, or the saved offset is not a log offset (logged as an
 * error). Returns -1 on failure; terrno is set for decode failure, missing
 * handle, and consumer mismatch.
 */
int32_t tqProcessSeekReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  SMqVgOffset vgOffset = {0};
  int32_t     vgId = TD_VID(pTq->pVnode);

  SDecoder decoder;
  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
  int32_t code = tDecodeMqVgOffset(&decoder, &vgOffset);
  // Clear the decoder on every path; returning before this used to leak it.
  tDecoderClear(&decoder);
  if (code < 0) {
    tqError("vgId:%d failed to decode seek msg", vgId);
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  tqDebug("topic:%s, vgId:%d process offset seek by consumer:0x%" PRIx64 ", req offset:%" PRId64,
          vgOffset.offset.subKey, vgId, vgOffset.consumerId, vgOffset.offset.val.version);

  STqOffset* pOffset = &vgOffset.offset;
  if (pOffset->val.type != TMQ_OFFSET__LOG) {
    tqError("vgId:%d, subKey:%s invalid seek offset type:%d", vgId, pOffset->subKey, pOffset->val.type);
    return -1;
  }

  // 1. find the subscription handle
  STqHandle* pHandle = taosHashGet(pTq->pHandle, pOffset->subKey, strlen(pOffset->subKey));
  if (pHandle == NULL) {
    tqError("tmq seek: consumer:0x%" PRIx64 " vgId:%d subkey %s not found", vgOffset.consumerId, vgId,
            pOffset->subKey);
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  // 2. check consumer-vg assignment status
  taosRLockLatch(&pTq->lock);
  if (pHandle->consumerId != vgOffset.consumerId) {
    tqDebug("ERROR tmq seek: consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64,
            vgOffset.consumerId, vgId, pOffset->subKey, pHandle->consumerId);
    terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
    taosRUnLockLatch(&pTq->lock);
    return -1;
  }
  taosRUnLockLatch(&pTq->lock);

  // 3. check the offset info
  STqOffset* pSavedOffset = tqOffsetRead(pTq->pOffsetStore, pOffset->subKey);
  if (pSavedOffset != NULL) {
    if (pSavedOffset->val.type != TMQ_OFFSET__LOG) {
      tqError("invalid saved offset type, vgId:%d sub:%s", vgId, pOffset->subKey);
      return 0;  // no need to update the offset value
    }

    if (pSavedOffset->val.version == pOffset->val.version) {
      tqDebug("vgId:%d subKey:%s no need to seek to %" PRId64 " prev offset:%" PRId64, vgId, pOffset->subKey,
              pOffset->val.version, pSavedOffset->val.version);
      return 0;
    }
  }

  // 4. clamp the requested version into the valid WAL range
  int64_t sver = 0, ever = 0;
  walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever);
  if (pOffset->val.version < sver) {
    pOffset->val.version = sver;
  } else if (pOffset->val.version > ever) {
    pOffset->val.version = ever;
  }

  // 5. save the new offset value
  if (pSavedOffset != NULL) {
    tqDebug("vgId:%d sub:%s seek to:%" PRId64 " prev offset:%" PRId64, vgId, pOffset->subKey, pOffset->val.version,
            pSavedOffset->val.version);
  } else {
    tqDebug("vgId:%d sub:%s seek to:%" PRId64 " not saved yet", vgId, pOffset->subKey, pOffset->val.version);
  }

  if (tqOffsetWrite(pTq->pOffsetStore, pOffset) < 0) {
    tqError("failed to save offset, vgId:%d sub:%s seek to %" PRId64, vgId, pOffset->subKey, pOffset->val.version);
    return -1;
  }

  tqDebug("topic:%s, vgId:%d consumer:0x%" PRIx64 " offset is update to:%" PRId64, vgOffset.offset.subKey, vgId,
          vgOffset.consumerId, vgOffset.offset.val.version);

  return 0;
}

L
Liu Jicong 已提交
398
/*
 * Return -1 if column `colId` of table `tbUid` appears in the colIdList of
 * any registered check info, 0 otherwise. The code names the list entries
 * "forbidden" column ids.
 */
int32_t tqCheckColModifiable(STQ* pTq, int64_t tbUid, int32_t colId) {
  for (void* pIter = taosHashIterate(pTq->pCheckInfo, NULL); pIter != NULL;
       pIter = taosHashIterate(pTq->pCheckInfo, pIter)) {
    STqCheckInfo* pCheck = (STqCheckInfo*)pIter;
    if (pCheck->ntbUid != tbUid) {
      continue;
    }

    int32_t numOfCols = taosArrayGetSize(pCheck->colIdList);
    for (int32_t j = 0; j < numOfCols; ++j) {
      if (*(int16_t*)taosArrayGet(pCheck->colIdList, j) == colId) {
        // Leaving the iteration early requires an explicit cancel.
        taosHashCancelIterate(pTq->pCheckInfo, pIter);
        return -1;
      }
    }
  }

  return 0;
}

424
/*
 * Handle a TMQ poll request (an SMqPollReq serialized in pMsg).
 *
 * Steps:
 *   1. Look up the subscription handle.
 *   2. Wait until no other poll is executing on it.
 *   3. Check that the requesting consumer still owns it.
 *   4. Mark it executing.
 *   5. Raise the handle epoch if the request carries a newer one.
 *   6. Delegate to tqExtractDataForMq, then mark the handle idle again.
 *
 * Returns tqExtractDataForMq's result, or -1 with terrno set when the request
 * cannot be decoded, the handle is missing, or the consumer does not match.
 */
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
  SMqPollReq req = {0};
  if (tDeserializeSMqPollReq(pMsg->pCont, pMsg->contLen, &req) < 0) {
    tqError("tDeserializeSMqPollReq %d failed", pMsg->contLen);
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  int64_t      consumerId = req.consumerId;
  int32_t      reqEpoch = req.epoch;
  STqOffsetVal reqOffset = req.reqOffset;
  int32_t      vgId = TD_VID(pTq->pVnode);

  taosWLockLatch(&pTq->lock);
  // 1. find handle
  STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
  if (pHandle == NULL) {
    tqError("tmq poll: consumer:0x%" PRIx64 " vgId:%d subkey %s not found", consumerId, vgId, req.subKey);
    terrno = TSDB_CODE_INVALID_MSG;
    taosWUnLockLatch(&pTq->lock);
    return -1;
  }

  // NOTE(review): this sleeps while holding pTq->lock. Every other user of the
  // latch stalls until the executing poll finishes; that includes seek
  // (read lock), subscribe, delete, and other polls.
  while (tqIsHandleExec(pHandle)) {
    tqDebug("tmq poll: consumer:0x%" PRIx64 " vgId:%d, topic:%s, subscription is executing, wait for 5ms and retry",
            consumerId, vgId, req.subKey);
    taosMsleep(5);
  }

  // 2. check re-balance status
  if (pHandle->consumerId != consumerId) {
    tqDebug("ERROR tmq poll: consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64,
            consumerId, vgId, req.subKey, pHandle->consumerId);
    terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
    taosWUnLockLatch(&pTq->lock);
    return -1;
  }
  tqSetHandleExec(pHandle);
  taosWUnLockLatch(&pTq->lock);

  // 3. update the epoch value
  int32_t savedEpoch = pHandle->epoch;
  if (savedEpoch < reqEpoch) {
    tqDebug("tmq poll: consumer:0x%" PRIx64 " epoch update from %d to %d by poll req", consumerId, savedEpoch,
            reqEpoch);
    pHandle->epoch = reqEpoch;
  }

  char buf[80];
  tFormatOffset(buf, tListLen(buf), &reqOffset);
  tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey %s, recv poll req vgId:%d, req:%s, reqId:0x%" PRIx64,
          consumerId, req.epoch, pHandle->subKey, vgId, buf, req.reqId);

  int code = tqExtractDataForMq(pTq, pHandle, &req, pMsg);
  tqSetHandleIdle(pHandle);
  return code;
}

481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538
/*
 * Reply with WAL position info for a subscription (request in pMsg, same wire
 * format as a poll request).
 *
 * The reply holds a log offset plus the valid WAL version range [sver, ever]:
 *   - If an offset is saved, it must be a log offset and its version is used.
 *   - Otherwise snapshot requests are rejected, and the version comes from the
 *     request offset type: LOG -> the reader's current version (or the
 *     requested one if reading has not started); EARLIEST -> sver;
 *     LATEST -> ever.
 *
 * Returns 0 after sending the reply, or -1 with terrno set on failure.
 */
int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) {
  SMqPollReq req = {0};
  if (tDeserializeSMqPollReq(pMsg->pCont, pMsg->contLen, &req) < 0) {
    tqError("tDeserializeSMqPollReq %d failed", pMsg->contLen);
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  int64_t      consumerId = req.consumerId;
  STqOffsetVal reqOffset = req.reqOffset;
  int32_t      vgId = TD_VID(pTq->pVnode);

  // 1. find handle
  STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
  if (pHandle == NULL) {
    tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s not found", consumerId, vgId, req.subKey);
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  // 2. check re-balance status
  taosRLockLatch(&pTq->lock);
  if (pHandle->consumerId != consumerId) {
    tqDebug("ERROR consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64,
            consumerId, vgId, req.subKey, pHandle->consumerId);
    terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
    taosRUnLockLatch(&pTq->lock);
    return -1;
  }
  taosRUnLockLatch(&pTq->lock);

  int64_t sver = 0, ever = 0;
  walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever);

  SMqDataRsp dataRsp = {0};
  tqInitDataRsp(&dataRsp, &req);

  STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, req.subKey);
  if (pOffset != NULL) {
    if (pOffset->val.type != TMQ_OFFSET__LOG) {
      tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s use snapshot, no valid wal info", consumerId, vgId, req.subKey);
      terrno = TSDB_CODE_INVALID_PARA;
      tDeleteMqDataRsp(&dataRsp);
      return -1;
    }

    dataRsp.rspOffset.type = TMQ_OFFSET__LOG;
    dataRsp.rspOffset.version = pOffset->val.version;
  } else {
    if (req.useSnapshot) {
      tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s snapshot not support wal info", consumerId, vgId, req.subKey);
      terrno = TSDB_CODE_INVALID_PARA;
      tDeleteMqDataRsp(&dataRsp);
      return -1;
    }

    dataRsp.rspOffset.type = TMQ_OFFSET__LOG;

    if (reqOffset.type == TMQ_OFFSET__LOG) {
      int64_t currentVer = walReaderGetCurrentVer(pHandle->execHandle.pTqReader->pWalReader);
      if (currentVer == -1) {  // not start to read data from wal yet, return req offset directly
        dataRsp.rspOffset.version = reqOffset.version;
      } else {
        dataRsp.rspOffset.version = currentVer;  // return current consume offset value
      }
    } else if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEAST) {
      dataRsp.rspOffset.version = sver;  // not consume yet, set the earliest position
    } else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) {
      dataRsp.rspOffset.version = ever;
    } else {
      tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s invalid offset type:%d", consumerId, vgId, req.subKey,
              reqOffset.type);
      terrno = TSDB_CODE_INVALID_PARA;
      tDeleteMqDataRsp(&dataRsp);
      return -1;
    }
  }

  tqDoSendDataRsp(&pMsg->info, &dataRsp, req.epoch, req.consumerId, TMQ_MSG_TYPE__WALINFO_RSP, sver, ever);
  // The response is serialized into its own rpc buffer by the send call, so the
  // resources set up by tqInitDataRsp must be released here too, exactly as
  // the error paths above do.
  tDeleteMqDataRsp(&dataRsp);
  return 0;
}

563
/*
 * Drop a subscription (an SMqVDeleteReq in msg). Runs under the pTq->lock
 * write latch.
 *
 * If a handle exists:
 *   1. Wait until it is idle.
 *   2. Release its WAL ref.
 *   3. Remove it from the handle cache.
 * In all cases, then delete the saved offset and the persisted handle.
 *
 * Failures are only logged; always returns 0.
 */
int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  SMqVDeleteReq* pReq = (SMqVDeleteReq*)msg;
  int32_t        vgId = TD_VID(pTq->pVnode);
  size_t         keyLen = strlen(pReq->subKey);

  tqDebug("vgId:%d, tq process delete sub req %s", vgId, pReq->subKey);

  taosWLockLatch(&pTq->lock);

  STqHandle* pHandle = taosHashGet(pTq->pHandle, pReq->subKey, keyLen);
  if (pHandle != NULL) {
    while (tqIsHandleExec(pHandle)) {
      tqDebug("vgId:%d, topic:%s, subscription is executing, wait for 5ms and retry", vgId, pHandle->subKey);
      taosMsleep(5);
    }

    if (pHandle->pRef != NULL) {
      walCloseRef(pTq->pVnode->pWal, pHandle->pRef->refId);
    }

    // pHandle must not be touched after this: the hash frees the entry.
    if (taosHashRemove(pTq->pHandle, pReq->subKey, keyLen) != 0) {
      tqError("cannot process tq delete req %s, since no such handle", pReq->subKey);
    }
  }

  if (tqOffsetDelete(pTq->pOffsetStore, pReq->subKey) != 0) {
    tqError("cannot process tq delete req %s, since no such offset in cache", pReq->subKey);
  }

  if (tqMetaDeleteHandle(pTq, pReq->subKey) < 0) {
    tqError("cannot process tq delete req %s, since no such offset in tdb", pReq->subKey);
  }

  taosWUnLockLatch(&pTq->lock);
  return 0;
}

601
/*
 * Register a column check (an encoded STqCheckInfo in msg/msgLen).
 * The decoded info goes into the in-memory check table keyed by topic, and
 * the raw message is persisted to the meta store.
 *
 * Returns 0 on success, -1 with terrno = TSDB_CODE_OUT_OF_MEMORY on failure.
 */
int32_t tqProcessAddCheckInfoReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  STqCheckInfo info = {0};
  SDecoder     decoder;
  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
  if (tDecodeSTqCheckInfo(&decoder, &info) < 0) {
    // The decoder may have populated members of `info` before failing. Release
    // them along with the decoder itself (`info` starts zeroed).
    tDecoderClear(&decoder);
    tDeleteSTqCheckInfo(&info);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  tDecoderClear(&decoder);

  if (taosHashPut(pTq->pCheckInfo, info.topic, strlen(info.topic), &info, sizeof(STqCheckInfo)) < 0) {
    // The table did not take ownership, so its free callback will never run
    // for this value; release the members here.
    tDeleteSTqCheckInfo(&info);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  if (tqMetaSaveCheckInfo(pTq, info.topic, msg, msgLen) < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  return 0;
}

621
/*
 * Remove the check info keyed by the string in msg (measured with strlen).
 * It is removed from the in-memory table first, then from the meta store.
 *
 * Returns 0, or -1 with terrno = TSDB_CODE_OUT_OF_MEMORY if either step fails.
 * NOTE(review): a failed taosHashRemove most likely means the key was absent,
 * for which OUT_OF_MEMORY is a misleading code.
 */
int32_t tqProcessDelCheckInfoReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  int32_t rc = taosHashRemove(pTq->pCheckInfo, msg, strlen(msg));
  if (rc >= 0) {
    rc = tqMetaDeleteCheckInfo(pTq, msg);
  }

  if (rc < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  return 0;
}

633
int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
634
  int ret = 0;
L
Liu Jicong 已提交
635
  SMqRebVgReq req = {0};
L
Liu Jicong 已提交
636
  tDecodeSMqRebVgReq(msg, &req);
L
Liu Jicong 已提交
637

638 639 640
  SVnode* pVnode = pTq->pVnode;
  int32_t vgId = TD_VID(pVnode);

641
  tqDebug("vgId:%d, tq process sub req:%s, Id:0x%" PRIx64 " -> Id:0x%" PRIx64, pVnode->config.vgId, req.subKey,
642
          req.oldConsumerId, req.newConsumerId);
L
Liu Jicong 已提交
643

wmmhello's avatar
wmmhello 已提交
644
  taosWLockLatch(&pTq->lock);
645
  STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
L
Liu Jicong 已提交
646
  if (pHandle == NULL) {
L
Liu Jicong 已提交
647
    if (req.oldConsumerId != -1) {
648
      tqError("vgId:%d, build new consumer handle %s for consumer:0x%" PRIx64 ", but old consumerId:0x%" PRIx64,
649
              req.vgId, req.subKey, req.newConsumerId, req.oldConsumerId);
L
Liu Jicong 已提交
650
    }
651

L
Liu Jicong 已提交
652
    if (req.newConsumerId == -1) {
653
      tqError("vgId:%d, tq invalid re-balance request, new consumerId %" PRId64 "", req.vgId, req.newConsumerId);
654
      goto end;
L
Liu Jicong 已提交
655
    }
656

L
Liu Jicong 已提交
657 658
    STqHandle tqHandle = {0};
    pHandle = &tqHandle;
L
Liu Jicong 已提交
659

L
Liu Jicong 已提交
660 661 662
    memcpy(pHandle->subKey, req.subKey, TSDB_SUBSCRIBE_KEY_LEN);
    pHandle->consumerId = req.newConsumerId;
    pHandle->epoch = -1;
L
Liu Jicong 已提交
663

L
Liu Jicong 已提交
664
    pHandle->execHandle.subType = req.subType;
L
Liu Jicong 已提交
665
    pHandle->fetchMeta = req.withMeta;
wmmhello's avatar
wmmhello 已提交
666

667
    // TODO version should be assigned and refed during preprocess
668
    SWalRef* pRef = walRefCommittedVer(pVnode->pWal);
669
    if (pRef == NULL) {
670 671
      ret = -1;
      goto end;
672
    }
H
Haojun Liao 已提交
673

674 675
    int64_t ver = pRef->refVer;
    pHandle->pRef = pRef;
L
Liu Jicong 已提交
676

677
    SReadHandle handle = {.vnode = pVnode, .initTableReader = true, .initTqReader = true, .version = ver};
H
Haojun Liao 已提交
678 679
    initStorageAPI(&handle.api);

wmmhello's avatar
wmmhello 已提交
680
    pHandle->snapshotVer = ver;
681

L
Liu Jicong 已提交
682
    if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
L
Liu Jicong 已提交
683
      pHandle->execHandle.execCol.qmsg = req.qmsg;
L
Liu Jicong 已提交
684
      req.qmsg = NULL;
685

X
Xiaoyu Wang 已提交
686 687
      pHandle->execHandle.task = qCreateQueueExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle, vgId,
                                                          &pHandle->execHandle.numOfCols, req.newConsumerId);
L
Liu Jicong 已提交
688
      void* scanner = NULL;
689
      qExtractStreamScanner(pHandle->execHandle.task, &scanner);
690
      pHandle->execHandle.pTqReader = qExtractReaderFromStreamScanner(scanner);
L
Liu Jicong 已提交
691
    } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__DB) {
692
      pHandle->pWalReader = walOpenReader(pVnode->pWal, NULL);
693
      pHandle->execHandle.pTqReader = tqReaderOpen(pVnode);
694

L
Liu Jicong 已提交
695
      pHandle->execHandle.execDb.pFilterOutTbUid =
wmmhello's avatar
wmmhello 已提交
696
          taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
697
      buildSnapContext(handle.vnode, handle.version, 0, pHandle->execHandle.subType, pHandle->fetchMeta,
698
                       (SSnapContext**)(&handle.sContext));
699

700
      pHandle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &handle, vgId, NULL, req.newConsumerId);
L
Liu Jicong 已提交
701
    } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
702
      pHandle->pWalReader = walOpenReader(pVnode->pWal, NULL);
wmmhello's avatar
wmmhello 已提交
703
      pHandle->execHandle.execTb.suid = req.suid;
704 705 706
      pHandle->execHandle.execTb.qmsg = req.qmsg;
      req.qmsg = NULL;

707 708 709 710 711 712
      if(strcmp(pHandle->execHandle.execTb.qmsg, "") != 0) {
        if (nodesStringToNode(pHandle->execHandle.execTb.qmsg, &pHandle->execHandle.execTb.node) != 0) {
          tqError("nodesStringToNode error in sub stable, since %s, vgId:%d, subkey:%s consumer:0x%" PRIx64, terrstr(),
                  pVnode->config.vgId, req.subKey, pHandle->consumerId);
          return -1;
        }
713
      }
wmmhello's avatar
wmmhello 已提交
714

715
      SArray* tbUidList = NULL;
wmmhello's avatar
wmmhello 已提交
716
      ret = qGetTableList(req.suid, pVnode, pHandle->execHandle.execTb.node, &tbUidList);
717
      if(ret != TDB_CODE_SUCCESS) {
718
        tqError("qGetTableList error:%d vgId:%d, subkey:%s consumer:0x%" PRIx64, ret, pVnode->config.vgId, req.subKey, pHandle->consumerId);
719 720
        taosArrayDestroy(tbUidList);
        goto end;
L
Liu Jicong 已提交
721
      }
722
      tqDebug("tq try to get ctb for stb subscribe, vgId:%d, subkey:%s consumer:0x%" PRIx64 " suid:%" PRId64, pVnode->config.vgId, req.subKey, pHandle->consumerId, req.suid);
723
      pHandle->execHandle.pTqReader = tqReaderOpen(pVnode);
724
      tqReaderSetTbUidList(pHandle->execHandle.pTqReader, tbUidList);
L
Liu Jicong 已提交
725
      taosArrayDestroy(tbUidList);
wmmhello's avatar
wmmhello 已提交
726

727
      buildSnapContext(handle.vnode, handle.version, req.suid, pHandle->execHandle.subType, pHandle->fetchMeta,
L
Liu Jicong 已提交
728
                       (SSnapContext**)(&handle.sContext));
729
      pHandle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &handle, vgId, NULL, req.newConsumerId);
L
Liu Jicong 已提交
730
    }
H
Haojun Liao 已提交
731

732
    taosHashPut(pTq->pHandle, req.subKey, strlen(req.subKey), pHandle, sizeof(STqHandle));
733 734
    tqDebug("try to persist handle %s consumer:0x%" PRIx64, req.subKey,
            pHandle->consumerId);
735 736
    ret = tqMetaSaveHandle(pTq, req.subKey, pHandle);
    goto end;
L
Liu Jicong 已提交
737
  } else {
wmmhello's avatar
wmmhello 已提交
738 739 740 741 742
    while (tqIsHandleExec(pHandle)) {
      tqDebug("sub req vgId:%d, topic:%s, subscription is executing, wait for 5ms and retry", vgId, pHandle->subKey);
      taosMsleep(5);
    }

743 744 745
    if (pHandle->consumerId == req.newConsumerId) {  // do nothing
      tqInfo("vgId:%d consumer:0x%" PRIx64 " remains, no switch occurs", req.vgId, req.newConsumerId);
      atomic_add_fetch_32(&pHandle->epoch, 1);
746

747 748 749
    } else {
      tqInfo("vgId:%d switch consumer from Id:0x%" PRIx64 " to Id:0x%" PRIx64, req.vgId, pHandle->consumerId,
             req.newConsumerId);
750
      atomic_store_64(&pHandle->consumerId, req.newConsumerId);
wmmhello's avatar
wmmhello 已提交
751
      atomic_store_32(&pHandle->epoch, 0);
752 753 754 755 756 757 758 759
    }
    // kill executing task
    qTaskInfo_t pTaskInfo = pHandle->execHandle.task;
    if (pTaskInfo != NULL) {
      qKillTask(pTaskInfo, TSDB_CODE_SUCCESS);
    }
    if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
      qStreamCloseTsdbReader(pTaskInfo);
L
Liu Jicong 已提交
760
    }
wmmhello's avatar
wmmhello 已提交
761 762
    // remove if it has been register in the push manager, and return one empty block to consumer
    tqUnregisterPushHandle(pTq, pHandle);
763 764
    ret = tqMetaSaveHandle(pTq, req.subKey, pHandle);
    goto end;
L
Liu Jicong 已提交
765
  }
L
Liu Jicong 已提交
766

767
end:
wmmhello's avatar
wmmhello 已提交
768
  taosWUnLockLatch(&pTq->lock);
H
Haojun Liao 已提交
769
  taosMemoryFree(req.qmsg);
770
  return ret;
L
Liu Jicong 已提交
771
}
772

L
liuyao 已提交
773 774 775 776
/*
 * Free callback registered on the table-sink info hash (see tqExpandTask).
 * `ptr` is the address of a stored value that itself holds a heap pointer;
 * that inner pointer is released.
 */
void freePtr(void *ptr) {
  void **pSlot = (void **)ptr;
  taosMemoryFree(*pSlot);
}

777
/*
 * Finish building a deployed stream task so that it can run on this vnode.
 *
 * Sets up, in order:
 *   - the id string and reference count;
 *   - the input/output queues;
 *   - the checkpoint versions (both set to `ver`);
 *   - the stream state and executor (source and agg levels);
 *   - the sink (SMA or table output);
 *   - for source tasks, a wal reader that also yields delete messages.
 * The checkpoint version is advanced by one at the end.
 *
 * Returns 0 on success, -1 when a component cannot be created. Components
 * created before the failure are not released here.
 */
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
  int32_t vgId = TD_VID(pTq->pVnode);

  pTask->id.idStr = createStreamTaskIdStr(pTask->id.streamId, pTask->id.taskId);
  pTask->refCnt = 1;
  pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE;
  pTask->inputQueue = streamQueueOpen(512 << 10);
  pTask->outputQueue = streamQueueOpen(512 << 10);

  if (pTask->inputQueue == NULL || pTask->outputQueue == NULL) {
    return -1;
  }

  pTask->inputStatus = TASK_INPUT_STATUS__NORMAL;
  pTask->outputStatus = TASK_OUTPUT_STATUS__NORMAL;
  pTask->pMsgCb = &pTq->pVnode->msgCb;
  pTask->pMeta = pTq->pStreamMeta;
  pTask->chkInfo.version = ver;
  pTask->chkInfo.currentVer = ver;

  // fill-history tasks start in TASK_STATUS__WAIT_DOWNSTREAM, all other tasks start as normal
  pTask->status.taskStatus = (pTask->fillHistory) ? TASK_STATUS__WAIT_DOWNSTREAM : TASK_STATUS__NORMAL;

  // expand executor
  if (pTask->taskLevel == TASK_LEVEL__SOURCE) {
    pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask, false, -1, -1);
    if (pTask->pState == NULL) {
      return -1;
    }

    SReadHandle handle = {.vnode = pTq->pVnode, .initTqReader = 1, .pStateBackend = pTask->pState};
    initStorageAPI(&handle.api);

    pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, vgId);
    if (pTask->exec.pExecutor == NULL) {
      return -1;
    }

    qSetTaskId(pTask->exec.pExecutor, pTask->id.taskId, pTask->id.streamId);
  } else if (pTask->taskLevel == TASK_LEVEL__AGG) {
    pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask, false, -1, -1);
    if (pTask->pState == NULL) {
      return -1;
    }

    // agg tasks read no vnode data directly; they are told how many upstream children exist
    int32_t     numOfVgroups = (int32_t)taosArrayGetSize(pTask->childEpInfo);
    SReadHandle handle = {.vnode = NULL, .numOfVgroups = numOfVgroups, .pStateBackend = pTask->pState};
    initStorageAPI(&handle.api);

    pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, vgId);
    if (pTask->exec.pExecutor == NULL) {
      return -1;
    }

    qSetTaskId(pTask->exec.pExecutor, pTask->id.taskId, pTask->id.streamId);
  }

  // sink
  if (pTask->outputType == TASK_OUTPUT__SMA) {
    pTask->smaSink.vnode = pTq->pVnode;
    pTask->smaSink.smaSink = smaHandleRes;
  } else if (pTask->outputType == TASK_OUTPUT__TABLE) {
    pTask->tbSink.vnode = pTq->pVnode;
    pTask->tbSink.tbSinkFunc = tqSinkToTablePipeline;

    // use the schema version of the destination super table when its meta can be found, 1 otherwise
    int32_t   ver1 = 1;
    SMetaInfo info = {0};
    int32_t   code = metaGetInfo(pTq->pVnode->pMeta, pTask->tbSink.stbUid, &info, NULL);
    if (code == TSDB_CODE_SUCCESS) {
      ver1 = info.skmVer;
    }

    SSchemaWrapper* pschemaWrapper = pTask->tbSink.pSchemaWrapper;
    pTask->tbSink.pTSchema = tBuildTSchema(pschemaWrapper->pSchema, pschemaWrapper->nCols, ver1);
    if (pTask->tbSink.pTSchema == NULL) {
      return -1;
    }

    pTask->tbSink.pTblInfo = tSimpleHashInit(10240, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
    if (pTask->tbSink.pTblInfo == NULL) {
      tqError("vgId:%d s-task:%s failed to create table info hash for table sink", vgId, pTask->id.idStr);
      return -1;
    }
    tSimpleHashSetFreeFp(pTask->tbSink.pTblInfo, freePtr);
  }

  if (pTask->taskLevel == TASK_LEVEL__SOURCE) {
    SWalFilterCond cond = {.deleteMsg = 1};  // delete msg also extract from wal files
    pTask->exec.pWalReader = walOpenReader(pTq->pVnode->pWal, &cond);
    if (pTask->exec.pWalReader == NULL) {
      // later stages (e.g. recover step 2) seek on this reader unconditionally
      tqError("vgId:%d s-task:%s failed to open wal reader", vgId, pTask->id.idStr);
      return -1;
    }
  }

  streamSetupTrigger(pTask);

  tqInfo("vgId:%d expand stream task, s-task:%s, checkpoint ver:%" PRId64 " child id:%d, level:%d", vgId, pTask->id.idStr,
         pTask->chkInfo.version, pTask->selfChildId, pTask->taskLevel);

  // next valid version will add one
  pTask->chkInfo.version += 1;
  return 0;
}
L
Liu Jicong 已提交
871

872
/*
 * Handle a task-check request from an upstream task. The request asks whether
 * the downstream task `req.downstreamTaskId` on this vnode exists and what its
 * status is.
 *
 * The reply's status is streamTaskCheckStatus() for an existing task, or 0 when
 * the task has not been built yet.
 *
 * Returns 0 once the reply has been passed to tmsgSendRsp. Returns -1 on a
 * decode, encode or allocation failure; no reply is sent in that case.
 */
int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) {
  char*   msgStr = pMsg->pCont;
  char*   msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
  int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
  int32_t vgId = pTq->pStreamMeta->vgId;

  SStreamTaskCheckReq req;
  SDecoder            decoder;

  tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
  int32_t code = tDecodeSStreamTaskCheckReq(&decoder, &req);
  tDecoderClear(&decoder);
  if (code < 0) {
    // req is not trustworthy, so there is no reliable routing info for a reply either
    tqError("vgId:%d failed to decode task check req", vgId);
    return -1;
  }

  int32_t             taskId = req.downstreamTaskId;
  SStreamTaskCheckRsp rsp = {
      .reqId = req.reqId,
      .streamId = req.streamId,
      .childId = req.childId,
      .downstreamNodeId = req.downstreamNodeId,
      .downstreamTaskId = req.downstreamTaskId,
      .upstreamNodeId = req.upstreamNodeId,
      .upstreamTaskId = req.upstreamTaskId,
  };

  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);
  if (pTask != NULL) {
    rsp.status = streamTaskCheckStatus(pTask);

    // log while the reference is still held; pTask must not be touched after the release
    tqDebug("s-task:%s recv task check req(reqId:0x%" PRIx64
            ") %d at node %d task status:%d, check req from task %d at node %d, rsp status %d",
            pTask->id.idStr, rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, pTask->status.taskStatus,
            rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
  } else {
    rsp.status = 0;
    tqDebug("tq recv task check(taskId:0x%x not built yet) req(reqId:0x%" PRIx64
            ") %d at node %d, check req from task:0x%x at node %d, rsp status %d",
            taskId, rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.upstreamTaskId, rsp.upstreamNodeId,
            rsp.status);
  }

  int32_t len = 0;
  tEncodeSize(tEncodeSStreamTaskCheckRsp, &rsp, len, code);
  if (code < 0) {
    tqError("vgId:%d failed to encode task check rsp, task:0x%x", vgId, taskId);
    return -1;
  }

  void* buf = rpcMallocCont(sizeof(SMsgHead) + len);
  if (buf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    tqError("vgId:%d failed to alloc task check rsp, task:0x%x", vgId, taskId);
    return -1;
  }
  ((SMsgHead*)buf)->vgId = htonl(req.upstreamNodeId);

  SEncoder encoder;
  void*    abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
  tEncoderInit(&encoder, (uint8_t*)abuf, len);
  code = tEncodeSStreamTaskCheckRsp(&encoder, &rsp);
  tEncoderClear(&encoder);
  if (code < 0) {
    rpcFreeCont(buf);
    tqError("vgId:%d failed to encode task check rsp, task:0x%x", vgId, taskId);
    return -1;
  }

  SRpcMsg rspMsg = {.code = 0, .pCont = buf, .contLen = sizeof(SMsgHead) + len, .info = pMsg->info};
  tmsgSendRsp(&rspMsg);
  return 0;
}

937
/*
 * Handle the reply to a task-check request previously sent by the upstream task
 * `rsp.upstreamTaskId` on this vnode, and pass it to streamProcessTaskCheckRsp.
 *
 * Returns -1 when the reply cannot be decoded or the upstream task is gone;
 * otherwise returns whatever streamProcessTaskCheckRsp returns.
 */
int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  SStreamTaskCheckRsp rsp;
  SDecoder            decoder;

  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
  int32_t code = tDecodeSStreamTaskCheckRsp(&decoder, &rsp);
  tDecoderClear(&decoder);

  if (code < 0) {
    return -1;
  }

  tqDebug("tq recv task check rsp(reqId:0x%" PRIx64 ") %d at node %d check req from task:0x%x at node %d, status %d",
          rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);

  SStreamMeta* pMeta = pTq->pStreamMeta;
  SStreamTask* pUpstream = streamMetaAcquireTask(pMeta, rsp.upstreamTaskId);
  if (pUpstream == NULL) {
    tqError("tq failed to locate the stream task:0x%x vgId:%d, it may have been destroyed", rsp.upstreamTaskId,
            pMeta->vgId);
    return -1;
  }

  code = streamProcessTaskCheckRsp(pUpstream, &rsp, sversion);
  streamMetaReleaseTask(pMeta, pUpstream);
  return code;
}

966
/*
 * Deploy a stream task sent by the mnode.
 *
 * The task is decoded from msg/msgLen and registered in the stream meta with
 * `sversion` as its start version. When it has the fill-history flag, the
 * downstream check is started.
 *
 * Returns 0 on success or when streams are disabled (tsDisableStream), and -1
 * on allocation, decode or registration failure.
 */
int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  int32_t code = 0;
  int32_t vgId = TD_VID(pTq->pVnode);

  if (tsDisableStream) {
    return 0;
  }

  // 1.deserialize msg and build task
  SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
  if (pTask == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    tqError("vgId:%d failed to create stream task due to out of memory, alloc size:%d", vgId, (int32_t) sizeof(SStreamTask));
    return -1;
  }

  SDecoder decoder;
  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
  code = tDecodeStreamTask(&decoder, pTask);
  tDecoderClear(&decoder);
  if (code < 0) {
    taosMemoryFree(pTask);
    return -1;
  }

  // Keep the id for the failure log below. pTask->id.idStr is only filled in when the task is expanded,
  // and after a failed add the ownership of pTask is not established here, so pTask is not touched then.
  int32_t taskId = pTask->id.taskId;

  // 2.save task, use the newest commit version as the initial start version of stream task.
  taosWLockLatch(&pTq->pStreamMeta->lock);
  code = streamMetaAddDeployedTask(pTq->pStreamMeta, sversion, pTask);
  int32_t numOfTasks = streamMetaGetNumOfTasks(pTq->pStreamMeta);
  if (code < 0) {
    tqError("vgId:%d failed to add s-task:0x%x, total:%d", vgId, taskId, numOfTasks);
    taosWUnLockLatch(&pTq->pStreamMeta->lock);
    return -1;
  }

  taosWUnLockLatch(&pTq->pStreamMeta->lock);

  // 3.go through recover steps to fill history
  if (pTask->fillHistory) {
    streamTaskCheckDownstream(pTask, sversion);
  }

  tqDebug("vgId:%d s-task:%s is deployed and add meta from mnd, status:%d, total:%d", vgId, pTask->id.idStr,
          pTask->status.taskStatus, numOfTasks);
  return 0;
}

L
Liu Jicong 已提交
1015 1016 1017 1018 1019
/*
 * Non-blocking (step 1) stage of the fill-history recovery of a source task.
 *
 * Runs streamSourceRecoverScanStep1, builds the step-2 request, and queues it on
 * the write queue as TDMT_VND_STREAM_RECOVER_BLOCKING_STAGE. The payload is a raw
 * copy of SStreamRecoverStep2Req; tqProcessTaskRecover2Req reinterprets it as that
 * type.
 *
 * Returns 0 when step 2 was queued or the task is being dropped, and -1 when the
 * task is missing, has no checkpoint version, or the request cannot be built.
 */
int32_t tqProcessTaskRecover1Req(STQ* pTq, SRpcMsg* pMsg) {
  SStreamRecoverStep1Req* pReq = (SStreamRecoverStep1Req*)pMsg->pCont;

  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->taskId);
  if (pTask == NULL) {
    return -1;
  }

  // check param
  int64_t fillVer1 = pTask->chkInfo.version;
  if (fillVer1 <= 0) {
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
    return -1;
  }

  // do recovery step 1
  tqDebug("s-task:%s start non-blocking recover stage(step 1) scan", pTask->id.idStr);
  int64_t st = taosGetTimestampMs();

  streamSourceRecoverScanStep1(pTask);
  if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING) {
    tqDebug("s-task:%s is dropped, abort recover in step1", pTask->id.idStr);
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
    return 0;
  }

  double el = (taosGetTimestampMs() - st) / 1000.0;
  tqDebug("s-task:%s non-blocking recover stage(step 1) ended, elapsed time:%.2fs", pTask->id.idStr, el);

  // build msg to launch next step
  SStreamRecoverStep2Req req;
  int32_t                code = streamBuildSourceRecover2Req(pTask, &req);
  if (code < 0) {
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
    return -1;
  }

  // every access to pTask below happens before the reference is released
  if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING) {
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
    return 0;
  }

  // serialize msg: the payload must be exactly the step-2 request that the handler casts it to
  int32_t len = sizeof(SStreamRecoverStep2Req);

  void* serializedReq = rpcMallocCont(len);
  if (serializedReq == NULL) {
    tqError("s-task:%s failed to prepare the step2 stage, out of memory", pTask->id.idStr);
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
    return -1;
  }

  memcpy(serializedReq, &req, len);

  // dispatch msg
  tqDebug("s-task:%s start recover block stage", pTask->id.idStr);
  streamMetaReleaseTask(pTq->pStreamMeta, pTask);

  SRpcMsg rpcMsg = {
      .code = 0, .contLen = len, .msgType = TDMT_VND_STREAM_RECOVER_BLOCKING_STAGE, .pCont = serializedReq};
  tmsgPutToQueue(&pTq->pVnode->msgCb, WRITE_QUEUE, &rpcMsg);
  return 0;
}

1081
/*
 * Blocking (step 2) stage of the fill-history recovery of a source task. It is
 * queued by tqProcessTaskRecover1Req.
 *
 * Steps, in order:
 *   1. scan up to `sversion`;
 *   2. seek the task's wal reader to `sversion`;
 *   3. restore the task parameters and set the status to normal;
 *   4. send the recover-finish request downstream;
 *   5. clear the fill-history flag and persist the task.
 *
 * Returns 0 on success or when the task is being dropped, -1 on any failure.
 */
int32_t tqProcessTaskRecover2Req(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  SStreamRecoverStep2Req* pReq = (SStreamRecoverStep2Req*)msg;

  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->taskId);
  if (pTask == NULL) {
    return -1;
  }

  // every path below releases the task exactly once at `end`
  int32_t ret = -1;

  // do recovery step 2
  int64_t st = taosGetTimestampMs();
  tqDebug("s-task:%s start step2 recover, ts:%"PRId64, pTask->id.idStr, st);

  if (streamSourceRecoverScanStep2(pTask, sversion) < 0) {
    goto end;
  }

  tqDebug("s-task:%s set the start wal offset to be:%"PRId64, pTask->id.idStr, sversion);
  walReaderSeekVer(pTask->exec.pWalReader, sversion);
  pTask->chkInfo.currentVer = sversion;

  if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING) {
    ret = 0;
    goto end;
  }

  // restore param
  if (streamRestoreParam(pTask) < 0) {
    goto end;
  }

  // set status normal
  tqDebug("s-task:%s blocking stage completed, set the status to be normal", pTask->id.idStr);
  if (streamSetStatusNormal(pTask) < 0) {
    goto end;
  }

  tqDebug("s-task:%s step2 recover finished, el:%.2fs", pTask->id.idStr, (taosGetTimestampMs() - st) / 1000.0);

  // dispatch recover finish req to all related downstream task
  if (streamDispatchRecoverFinishReq(pTask) < 0) {
    goto end;
  }

  atomic_store_8(&pTask->fillHistory, 0);
  streamMetaSaveTask(pTq->pStreamMeta, pTask);
  ret = 0;

end:
  streamMetaReleaseTask(pTq->pStreamMeta, pTask);
  return ret;
}

L
Liu Jicong 已提交
1142 1143 1144
/*
 * Handle a recover-finish notification from an upstream child (`req.childId`)
 * addressed to the task `req.taskId` on this vnode.
 *
 * Returns 0 on success, -1 when the message cannot be decoded, the task is
 * missing, or streamProcessRecoverFinishReq fails.
 */
int32_t tqProcessTaskRecoverFinishReq(STQ* pTq, SRpcMsg* pMsg) {
  char*   msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);

  // deserialize
  SStreamRecoverFinishReq req;
  SDecoder                decoder;

  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
  int32_t code = tDecodeSStreamRecoverFinishReq(&decoder, &req);
  tDecoderClear(&decoder);
  if (code < 0) {
    // req is not valid, do not use its taskId / childId
    tqError("vgId:%d failed to decode recover finish req", pTq->pStreamMeta->vgId);
    return -1;
  }

  // find task
  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.taskId);
  if (pTask == NULL) {
    return -1;
  }

  // do process request
  if (streamProcessRecoverFinishReq(pTask, req.childId) < 0) {
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
    return -1;
  }

  streamMetaReleaseTask(pTq->pStreamMeta, pTask);
  return 0;
}
L
Liu Jicong 已提交
1168

L
Liu Jicong 已提交
1169 1170 1171 1172 1173
/*
 * Reply to a recover-finish request. It is accepted and not processed further;
 * always returns 0.
 */
int32_t tqProcessTaskRecoverFinishRsp(STQ* pTq, SRpcMsg* pMsg) {
  (void)pTq;
  (void)pMsg;
  return 0;
}

1174
/*
 * Decode a delete result (pData/len) and turn it into a STREAM_DELETE_DATA block.
 *
 * The block has one row per table uid in the result:
 *   - the start/end timestamp columns hold the result's skey/ekey;
 *   - the uid column holds the table uid;
 *   - the group id and calculate start/end columns are NULL.
 * The block's version is set to `ver`.
 *
 * On TSDB_CODE_SUCCESS, *pRefBlock is one of:
 *   - NULL, when the uid list is empty or no rows were affected;
 *   - a new SStreamRefDataBlock referencing the block.
 * Returns TSDB_CODE_MSG_DECODE_ERROR when the payload cannot be decoded, or an
 * allocation error code. *pRefBlock is NULL on every error path and nothing
 * allocated here is leaked.
 */
int32_t extractDelDataBlock(const void* pData, int32_t len, int64_t ver, SStreamRefDataBlock** pRefBlock) {
  SDecoder*   pCoder = &(SDecoder){0};
  SDeleteRes* pRes = &(SDeleteRes){0};

  *pRefBlock = NULL;

  pRes->uidList = taosArrayInit(0, sizeof(tb_uid_t));
  if (pRes->uidList == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  tDecoderInit(pCoder, (uint8_t*)pData, len);
  int32_t code = tDecodeDeleteRes(pCoder, pRes);
  tDecoderClear(pCoder);
  if (code < 0) {
    taosArrayDestroy(pRes->uidList);
    return TSDB_CODE_MSG_DECODE_ERROR;
  }

  int32_t numOfTables = taosArrayGetSize(pRes->uidList);
  if (numOfTables == 0 || pRes->affectedRows == 0) {
    taosArrayDestroy(pRes->uidList);
    return TSDB_CODE_SUCCESS;
  }

  SSDataBlock* pDelBlock = createSpecialDataBlock(STREAM_DELETE_DATA);
  if (pDelBlock == NULL) {
    taosArrayDestroy(pRes->uidList);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  code = blockDataEnsureCapacity(pDelBlock, numOfTables);
  if (code != TSDB_CODE_SUCCESS) {
    blockDataDestroy(pDelBlock);
    taosArrayDestroy(pRes->uidList);
    return code;
  }

  pDelBlock->info.rows = numOfTables;
  pDelBlock->info.version = ver;

  // the column handles are the same for every row, so look them up once
  SColumnInfoData* pStartCol = taosArrayGet(pDelBlock->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pEndCol = taosArrayGet(pDelBlock->pDataBlock, END_TS_COLUMN_INDEX);
  SColumnInfoData* pUidCol = taosArrayGet(pDelBlock->pDataBlock, UID_COLUMN_INDEX);
  SColumnInfoData* pGroupIdCol = taosArrayGet(pDelBlock->pDataBlock, GROUPID_COLUMN_INDEX);
  SColumnInfoData* pCalStartCol = taosArrayGet(pDelBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
  SColumnInfoData* pCalEndCol = taosArrayGet(pDelBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);

  for (int32_t i = 0; i < numOfTables; i++) {
    colDataSetVal(pStartCol, i, (const char*)&pRes->skey, false);
    colDataSetVal(pEndCol, i, (const char*)&pRes->ekey, false);

    int64_t* pUid = taosArrayGet(pRes->uidList, i);
    colDataSetVal(pUidCol, i, (const char*)pUid, false);

    colDataSetNULL(pGroupIdCol, i);
    colDataSetNULL(pCalStartCol, i);
    colDataSetNULL(pCalEndCol, i);
  }

  taosArrayDestroy(pRes->uidList);

  // the allocation result lands in *pRefBlock; pRefBlock itself is never NULL here
  *pRefBlock = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0);
  if (*pRefBlock == NULL) {
    blockDataDestroy(pDelBlock);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  (*pRefBlock)->type = STREAM_INPUT__REF_DATA_BLOCK;
  (*pRefBlock)->pBlock = pDelBlock;
  return TSDB_CODE_SUCCESS;
}

L
Liu Jicong 已提交
1227 1228
/*
 * Handle a run request for a stream task.
 *
 * When the task id is WAL_READ_TASKS_ID, a wal scan is done for all tasks
 * (tqStreamTasksScanWal). Otherwise:
 *   - a task in TASK_STATUS__NORMAL processes its pending input;
 *   - any other task is skipped, and a task that should pause has its sched
 *     status moved from WAITING to INACTIVE.
 * In both cases tqStartStreamTasks is called afterwards.
 *
 * Returns 0, or -1 when the task cannot be found.
 */
int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
  SStreamTaskRunReq* pReq = pMsg->pCont;
  int32_t            vgId = TD_VID(pTq->pVnode);
  int32_t            taskId = pReq->taskId;

  if (taskId == WAL_READ_TASKS_ID) {  // all tasks are extracted submit data from the wal
    tqStreamTasksScanWal(pTq);
    return 0;
  }

  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);
  if (pTask == NULL) {
    tqError("vgId:%d failed to found s-task, taskId:%d", vgId, taskId);
    return -1;
  }

  if (pTask->status.taskStatus != TASK_STATUS__NORMAL) {
    if (streamTaskShouldPause(&pTask->status)) {
      atomic_val_compare_exchange_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__WAITING, TASK_SCHED_STATUS__INACTIVE);
    }
    tqDebug("vgId:%d s-task:%s ignore run req since not in ready state", vgId, pTask->id.idStr);
  } else {
    tqDebug("vgId:%d s-task:%s start to process block from wal, last chk point:%" PRId64, vgId,
            pTask->id.idStr, pTask->chkInfo.version);
    streamProcessRunReq(pTask);
  }

  streamMetaReleaseTask(pTq->pStreamMeta, pTask);
  tqStartStreamTasks(pTq);
  return 0;
}

L
Liu Jicong 已提交
1260
/*
 * Handle data dispatched from an upstream task to the task `req.taskId` on this
 * vnode. The decoded request goes to streamProcessDispatchMsg; `exec` is passed
 * through unchanged.
 *
 * Returns 0 when the task exists, and -1 when the message cannot be decoded or
 * the task is missing. In the missing-task case the request is freed here.
 */
int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec) {
  char*   msgStr = pMsg->pCont;
  char*   msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
  int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);

  SStreamDispatchReq req = {0};
  SDecoder           decoder;

  // clear the decoder after decoding, as vnodeEnqueueStreamMsg does for the same request type
  tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
  int32_t code = tDecodeStreamDispatchReq(&decoder, &req);
  tDecoderClear(&decoder);
  if (code < 0) {
    tqError("vgId:%d failed to decode stream dispatch req", pTq->pStreamMeta->vgId);
    return -1;
  }

  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.taskId);
  if (pTask == NULL) {
    tDeleteStreamDispatchReq(&req);
    return -1;
  }

  SRpcMsg rsp = {.info = pMsg->info, .code = 0};
  streamProcessDispatchMsg(pTask, &req, &rsp, exec);
  streamMetaReleaseTask(pTq->pStreamMeta, pTask);
  return 0;
}

L
Liu Jicong 已提交
1283 1284
/*
 * Handle the reply to a dispatch sent by the upstream task named in the reply.
 * The upstream task id travels in network byte order.
 *
 * Returns 0 when the upstream task was found and the reply was processed, and
 * -1 otherwise.
 */
int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) {
  SStreamDispatchRsp* pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t             upstreamTaskId = ntohl(pRsp->upstreamTaskId);
  SStreamTask*        pUpstream = streamMetaAcquireTask(pTq->pStreamMeta, upstreamTaskId);

  tqDebug("recv dispatch rsp, code:%x", pMsg->code);
  if (pUpstream == NULL) {
    return -1;
  }

  streamProcessDispatchRsp(pUpstream, pRsp, pMsg->code);
  streamMetaReleaseTask(pTq->pStreamMeta, pUpstream);
  return 0;
}
L
Liu Jicong 已提交
1296

1297
/*
 * Remove the stream task named in the drop request from the stream meta.
 * Always returns 0.
 */
int32_t tqProcessTaskDropReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  const SVDropStreamTaskReq* pDropReq = (const SVDropStreamTaskReq*)msg;

  tqDebug("vgId:%d receive msg to drop stream task:0x%x", TD_VID(pTq->pVnode), pDropReq->taskId);
  streamMetaRemoveTask(pTq->pStreamMeta, pDropReq->taskId);
  return 0;
}
L
Liu Jicong 已提交
1304

5
54liuyao 已提交
1305 1306
/*
 * Pause a stream task. Its current status is saved in keepTaskStatus, so that
 * tqProcessTaskResumeReq can restore it, and the status is set to
 * TASK_STATUS__PAUSE.
 *
 * A task that is already paused is left untouched. Saving PAUSE as the status
 * to restore would make a later resume keep the task paused.
 *
 * Always returns 0, including when the task does not exist.
 */
int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  SVPauseStreamTaskReq* pReq = (SVPauseStreamTaskReq*)msg;

  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->taskId);
  if (pTask == NULL) {
    return 0;
  }

  if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__PAUSE) {
    tqDebug("vgId:%d s-task:%s already paused, ignore pause req", pTq->pStreamMeta->vgId, pTask->id.idStr);
  } else {
    tqDebug("vgId:%d s-task:%s set pause flag", pTq->pStreamMeta->vgId, pTask->id.idStr);
    atomic_store_8(&pTask->status.keepTaskStatus, pTask->status.taskStatus);
    atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__PAUSE);
  }

  streamMetaReleaseTask(pTq->pStreamMeta, pTask);
  return 0;
}

/*
 * Resume a paused stream task by restoring the status saved in keepTaskStatus.
 *
 * For a source task with igUntreated set, the wal reader is told to skip ahead to
 * `sversion`, discarding the data written while the task was paused.
 *
 * Scheduling afterwards:
 *   - a source task with an empty input queue goes through tqStartStreamTasks;
 *   - every other task is scheduled directly with streamSchedExec.
 *
 * Always returns 0; a missing task is only logged.
 */
int32_t tqProcessTaskResumeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  SVResumeStreamTaskReq* pReq = (SVResumeStreamTaskReq*)msg;
  int32_t                vgId = pTq->pStreamMeta->vgId;

  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->taskId);
  if (pTask == NULL) {
    tqError("vgId:%d failed to find the s-task:0x%x for resume stream task", vgId, pReq->taskId);
    return 0;
  }

  atomic_store_8(&pTask->status.taskStatus, pTask->status.keepTaskStatus);

  bool isSourceTask = (pTask->taskLevel == TASK_LEVEL__SOURCE);

  // no lock needs to secure the access of the version
  if (isSourceTask && pReq->igUntreated) {
    // discard all the data  when the stream task is suspended.
    walReaderSetSkipToVersion(pTask->exec.pWalReader, sversion);
    tqDebug("vgId:%d s-task:%s resume to exec, prev paused version:%" PRId64 ", start from vnode ver:%" PRId64
            ", schedStatus:%d",
            vgId, pTask->id.idStr, pTask->chkInfo.currentVer, sversion, pTask->status.schedStatus);
  } else {  // from the previous paused version and go on
    tqDebug("vgId:%d s-task:%s resume to exec, from paused ver:%" PRId64 ", vnode ver:%" PRId64 ", schedStatus:%d",
            vgId, pTask->id.idStr, pTask->chkInfo.currentVer, sversion, pTask->status.schedStatus);
  }

  bool inputDrained = isSourceTask && (taosQueueItemSize(pTask->inputQueue->queue) == 0);
  if (inputDrained) {
    tqStartStreamTasks(pTq);
  } else {
    streamSchedExec(pTask);
  }

  streamMetaReleaseTask(pTq->pStreamMeta, pTask);
  return 0;
}

L
Liu Jicong 已提交
1350 1351 1352 1353 1354 1355
/*
 * Handle a retrieve request for the task `req.dstTaskId` on this vnode by
 * passing it to streamProcessRetrieveReq.
 *
 * The decoded request is freed here on every path after a successful decode.
 * Returns 0 when the task exists, and -1 when the message cannot be decoded or
 * the task is missing.
 */
int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg) {
  char*              msgStr = pMsg->pCont;
  char*              msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
  int32_t            msgLen = pMsg->contLen - sizeof(SMsgHead);
  SStreamRetrieveReq req;
  SDecoder           decoder;

  tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
  int32_t code = tDecodeStreamRetrieveReq(&decoder, &req);
  tDecoderClear(&decoder);
  if (code < 0) {
    // req is not valid: neither its task id nor its payload may be used
    tqError("vgId:%d failed to decode stream retrieve req", pTq->pStreamMeta->vgId);
    return -1;
  }

  int32_t      taskId = req.dstTaskId;
  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);
  if (pTask == NULL) {
    tDeleteStreamRetrieveReq(&req);
    return -1;
  }

  SRpcMsg rsp = {.info = pMsg->info, .code = 0};
  streamProcessRetrieveReq(pTask, &req, &rsp);
  streamMetaReleaseTask(pTq->pStreamMeta, pTask);
  tDeleteStreamRetrieveReq(&req);
  return 0;
}

/*
 * Reply to a retrieve request. It is accepted and not processed further;
 * always returns 0.
 */
int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg) {
  (void)pTq;
  (void)pMsg;
  return 0;
}
L
Liu Jicong 已提交
1377

1378 1379 1380 1381 1382 1383
/*
 * Process a stream dispatch message directly against the target task on this
 * vnode.
 *
 * On success, pMsg->pCont and the queue item pMsg are freed and 0 is returned.
 *
 * On failure (decode error or unknown task), an error reply carrying an
 * SStreamDispatchRsp is sent when pMsg has a reply handle; pMsg->pCont and pMsg
 * are freed and -1 is returned. When pMsg has no reply handle, -1 is returned
 * without freeing pMsg.
 */
int32_t vnodeEnqueueStreamMsg(SVnode* pVnode, SRpcMsg* pMsg) {
  STQ*      pTq = pVnode->pTq;
  SMsgHead* msgStr = pMsg->pCont;
  char*     msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
  int32_t   msgLen = pMsg->contLen - sizeof(SMsgHead);
  int32_t   code = 0;

  // Zero-initialized: after a decode failure the FAIL path still reads the routing
  // fields (upstreamNodeId, streamId, upstreamTaskId, taskId) to build the error reply.
  SStreamDispatchReq req = {0};
  SDecoder           decoder;
  tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
  if (tDecodeStreamDispatchReq(&decoder, &req) < 0) {
    code = TSDB_CODE_MSG_DECODE_ERROR;
    tDecoderClear(&decoder);
    goto FAIL;
  }
  tDecoderClear(&decoder);

  int32_t taskId = req.taskId;

  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);
  if (pTask) {
    SRpcMsg rsp = {.info = pMsg->info, .code = 0};
    streamProcessDispatchMsg(pTask, &req, &rsp, false);
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
    rpcFreeCont(pMsg->pCont);
    taosFreeQitem(pMsg);
    return 0;
  } else {
    tDeleteStreamDispatchReq(&req);
  }

  code = TSDB_CODE_STREAM_TASK_NOT_EXIST;

FAIL:
  if (pMsg->info.handle == NULL) return -1;

  SMsgHead* pRspHead = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamDispatchRsp));
  if (pRspHead == NULL) {
    // the reply actually sent carries the out-of-memory code, so log that one
    SRpcMsg rsp = {.code = TSDB_CODE_OUT_OF_MEMORY, .info = pMsg->info};
    tqDebug("send dispatch error rsp, code: %x", rsp.code);
    tmsgSendRsp(&rsp);
    rpcFreeCont(pMsg->pCont);
    taosFreeQitem(pMsg);
    return -1;
  }

  pRspHead->vgId = htonl(req.upstreamNodeId);
  SStreamDispatchRsp* pRsp = POINTER_SHIFT(pRspHead, sizeof(SMsgHead));
  pRsp->streamId = htobe64(req.streamId);
  pRsp->upstreamTaskId = htonl(req.upstreamTaskId);
  pRsp->upstreamNodeId = htonl(req.upstreamNodeId);
  pRsp->downstreamNodeId = htonl(pVnode->config.vgId);
  pRsp->downstreamTaskId = htonl(req.taskId);
  pRsp->inputStatus = TASK_OUTPUT_STATUS__NORMAL;

  SRpcMsg rsp = {
      .code = code, .info = pMsg->info, .contLen = sizeof(SMsgHead) + sizeof(SStreamDispatchRsp), .pCont = pRspHead};
  tqDebug("send dispatch error rsp, code: %x", code);
  tmsgSendRsp(&rsp);
  rpcFreeCont(pMsg->pCont);
  taosFreeQitem(pMsg);
  return -1;
}
L
Liu Jicong 已提交
1441

1442
/* Returns 1 when `sversion` is not beyond pTq->walLogLastVer, and 0 otherwise. */
int32_t tqCheckLogInWal(STQ* pTq, int64_t sversion) {
  return (pTq->walLogLastVer >= sversion) ? 1 : 0;
}
1443

1444
/*
 * Request a wal scan for all stream tasks of this vnode. A TDMT_STREAM_TASK_RUN
 * message with taskId WAL_READ_TASKS_ID is queued on the stream queue.
 *
 * pMeta->walScanCounter counts the outstanding requests; only the call that
 * takes it to 1 queues a message, and later calls only increment it. When no
 * message could be queued, the increment is undone so that a later call can
 * try again.
 *
 * Returns 0 when there are no tasks, a scan is already pending, or the message
 * was queued. Returns -1 when the message cannot be allocated or queued.
 */
int32_t tqStartStreamTasks(STQ* pTq) {
  int32_t      vgId = TD_VID(pTq->pVnode);
  SStreamMeta* pMeta = pTq->pStreamMeta;

  taosWLockLatch(&pMeta->lock);

  int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
  if (numOfTasks == 0) {
    tqInfo("vgId:%d no stream tasks exist", vgId);
    taosWUnLockLatch(&pMeta->lock);
    return 0;
  }

  pMeta->walScanCounter += 1;

  if (pMeta->walScanCounter > 1) {
    tqDebug("vgId:%d wal read task has been launched, remain scan times:%d", vgId, pMeta->walScanCounter);
    taosWUnLockLatch(&pMeta->lock);
    return 0;
  }

  SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq));
  if (pRunReq == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    tqError("vgId:%d failed to create msg to start wal scanning to launch stream tasks, code:%s", vgId, terrstr());
    pMeta->walScanCounter -= 1;  // nothing was scheduled
    taosWUnLockLatch(&pMeta->lock);
    return -1;
  }

  tqDebug("vgId:%d create msg to start wal scan to launch stream tasks, numOfTasks:%d", vgId, numOfTasks);
  pRunReq->head.vgId = vgId;
  pRunReq->streamId = 0;
  pRunReq->taskId = WAL_READ_TASKS_ID;

  SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)};
  int32_t code = tmsgPutToQueue(&pTq->pVnode->msgCb, STREAM_QUEUE, &msg);
  if (code != 0) {
    // NOTE(review): pRunReq is not freed here, on the assumption that the queueing layer
    // releases the content when it rejects the message -- confirm against tmsgPutToQueue.
    tqError("vgId:%d failed to put msg to start wal scan into stream queue, code:0x%x", vgId, code);
    pMeta->walScanCounter -= 1;  // nothing was scheduled
    taosWUnLockLatch(&pMeta->lock);
    return -1;
  }

  taosWUnLockLatch(&pMeta->lock);
  return 0;
}