tq.c 48.5 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
S
Shengliang Guan 已提交
14 15
 */

H
Hongze Cheng 已提交
16
#include "tq.h"
S
Shengliang Guan 已提交
17

dengyihao's avatar
dengyihao 已提交
18 19 20
// 0: not init
// 1: already inited
// 2: waiting to be initialized or cleaned up
21
#define WAL_READ_TASKS_ID (-1)
22

23
static int32_t tqInitialize(STQ* pTq);
dengyihao's avatar
dengyihao 已提交
24

wmmhello's avatar
wmmhello 已提交
25 26 27
// Returns true when the handle's status field is TMQ_HANDLE_STATUS_EXEC.
static FORCE_INLINE bool tqIsHandleExec(STqHandle* pHandle) {
  return pHandle->status == TMQ_HANDLE_STATUS_EXEC;
}

// Sets the handle's status field to TMQ_HANDLE_STATUS_EXEC.
static FORCE_INLINE void tqSetHandleExec(STqHandle* pHandle) {
  pHandle->status = TMQ_HANDLE_STATUS_EXEC;
}

// Sets the handle's status field to TMQ_HANDLE_STATUS_IDLE.
static FORCE_INLINE void tqSetHandleIdle(STqHandle* pHandle) {
  pHandle->status = TMQ_HANDLE_STATUS_IDLE;
}
wmmhello's avatar
wmmhello 已提交
28

L
Liu Jicong 已提交
29
/**
 * One-time initialization of the module-wide TQ state (tqMgmt).
 *
 * tqMgmt.inited is used as a small state flag:
 *   0 - not initialized, 1 - initialized, 2 - another caller is currently
 *   initializing or cleaning up.
 * The CAS loop spins while the flag is 2, so only the caller that moved the
 * flag 0 -> 2 performs the initialization.
 *
 * Every failure path puts the flag back to 0. Leaving it at 2 would make all
 * later tqInit()/tqCleanUp() calls spin forever in their CAS loops.
 *
 * @return 0 on success (or when already initialized), -1 on failure.
 */
int32_t tqInit() {
  int8_t old;
  while (1) {
    old = atomic_val_compare_exchange_8(&tqMgmt.inited, 0, 2);
    if (old != 2) break;
  }

  if (old == 0) {
    tqMgmt.timer = taosTmrInit(10000, 100, 10000, "TQ");
    if (tqMgmt.timer == NULL) {
      atomic_store_8(&tqMgmt.inited, 0);
      return -1;
    }

    if (streamInit() < 0) {
      // undo the timer created above and release the "in progress" state
      taosTmrCleanUp(tqMgmt.timer);
      tqMgmt.timer = NULL;
      atomic_store_8(&tqMgmt.inited, 0);
      return -1;
    }

    atomic_store_8(&tqMgmt.inited, 1);
  }

  return 0;
}
L
Liu Jicong 已提交
50

51
/**
 * Tear down the module-wide TQ state (tqMgmt).
 *
 * Spins while tqMgmt.inited is 2 (another caller is initializing or cleaning
 * up). Only the caller that moves the flag 1 -> 2 releases the timer and the
 * stream subsystem, then resets the flag to 0.
 */
void tqCleanUp() {
  int8_t prev;
  do {
    prev = atomic_val_compare_exchange_8(&tqMgmt.inited, 1, 2);
  } while (prev == 2);

  if (prev != 1) {
    return;  // nothing was initialized
  }

  taosTmrCleanUp(tqMgmt.timer);
  streamCleanUp();
  atomic_store_8(&tqMgmt.inited, 0);
}
L
Liu Jicong 已提交
64

65
static void destroyTqHandle(void* data) {
66 67
  STqHandle* pData = (STqHandle*)data;
  qDestroyTask(pData->execHandle.task);
wmmhello's avatar
wmmhello 已提交
68

69
  if (pData->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
L
Liu Jicong 已提交
70
    taosMemoryFreeClear(pData->execHandle.execCol.qmsg);
71
  } else if (pData->execHandle.subType == TOPIC_SUB_TYPE__DB) {
72
    tqCloseReader(pData->execHandle.pTqReader);
73 74
    walCloseReader(pData->pWalReader);
    taosHashCleanup(pData->execHandle.execDb.pFilterOutTbUid);
L
Liu Jicong 已提交
75
  } else if (pData->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
76
    walCloseReader(pData->pWalReader);
77
    tqCloseReader(pData->execHandle.pTqReader);
78
  }
79 80 81 82
  if(pData->msg != NULL) {
    rpcFreeCont(pData->msg->pCont);
    taosMemoryFree(pData->msg);
    pData->msg = NULL;
D
dapan1121 已提交
83
  }
L
Liu Jicong 已提交
84 85
}

86 87 88 89 90
/**
 * Compare two offsets by WAL version.
 *
 * @return true only when both offsets are of type TMQ_OFFSET__LOG and the
 *         left version is <= the right version; false otherwise.
 */
static bool tqOffsetLessOrEqual(const STqOffset* pLeft, const STqOffset* pRight) {
  if (pLeft->val.type != TMQ_OFFSET__LOG || pRight->val.type != TMQ_OFFSET__LOG) {
    return false;
  }
  return pLeft->val.version <= pRight->val.version;
}

L
Liu Jicong 已提交
91
/**
 * Allocate and open a TQ instance for a vnode.
 *
 * @param path   directory path; a private copy is stored in pTq->path.
 * @param pVnode owning vnode; its WAL's last version is recorded in
 *               pTq->walLogLastVer.
 * @return the new STQ, or NULL on failure. terrno is set to
 *         TSDB_CODE_OUT_OF_MEMORY when an allocation made here fails.
 */
STQ* tqOpen(const char* path, SVnode* pVnode) {
  STQ* pTq = taosMemoryCalloc(1, sizeof(STQ));
  if (pTq == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pTq->pVnode = pVnode;
  pTq->walLogLastVer = pVnode->pWal->vers.lastVer;
  taosInitRWLatch(&pTq->lock);

  pTq->path = taosStrdup(path);
  pTq->pHandle = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);
  pTq->pPushMgr = taosHashInit(64, MurmurHash3_32, false, HASH_NO_LOCK);
  pTq->pCheckInfo = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);

  // Any of the allocations above may fail. Release whatever was created and
  // bail out before the NULL members are handed to the rest of the module.
  if (pTq->path == NULL || pTq->pHandle == NULL || pTq->pPushMgr == NULL || pTq->pCheckInfo == NULL) {
    if (pTq->pHandle != NULL) {
      taosHashCleanup(pTq->pHandle);
    }
    if (pTq->pPushMgr != NULL) {
      taosHashCleanup(pTq->pPushMgr);
    }
    if (pTq->pCheckInfo != NULL) {
      taosHashCleanup(pTq->pCheckInfo);
    }
    if (pTq->path != NULL) {
      taosMemoryFree(pTq->path);
    }
    taosMemoryFree(pTq);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  // entries of these maps own extra resources; install their destructors
  taosHashSetFreeFp(pTq->pHandle, destroyTqHandle);
  taosHashSetFreeFp(pTq->pCheckInfo, (FDelete)tDeleteSTqCheckInfo);

  int32_t code = tqInitialize(pTq);
  if (code != TSDB_CODE_SUCCESS) {
    tqClose(pTq);
    return NULL;
  }

  return pTq;
}

/**
 * Open the persistent parts of a TQ instance, in order: the tq meta store,
 * the offset store, the stream meta, and finally the stream tasks.
 *
 * Nothing is rolled back here on failure.
 *
 * @return 0 on success, -1 as soon as any step fails.
 */
int32_t tqInitialize(STQ* pTq) {
  if (tqMetaOpen(pTq) < 0) {
    return -1;
  }

  pTq->pOffsetStore = tqOffsetOpen(pTq);
  if (NULL == pTq->pOffsetStore) {
    return -1;
  }

  pTq->pStreamMeta = streamMetaOpen(pTq->path, pTq, (FTaskExpand*)tqExpandTask, pTq->pVnode->config.vgId);
  if (NULL == pTq->pStreamMeta) {
    return -1;
  }

  // the version is kept in task's meta data
  // todo check if this version is required or not
  if (streamLoadTasks(pTq->pStreamMeta, walGetCommittedVer(pTq->pVnode->pWal)) < 0) {
    return -1;
  }

  return 0;
}
L
Liu Jicong 已提交
143

L
Liu Jicong 已提交
144
/**
 * Close a TQ instance and free it. A NULL pTq is ignored.
 *
 * Also used by tqOpen() on its failure path, so members may not all have been
 * opened when this runs.
 */
void tqClose(STQ* pTq) {
  if (pTq != NULL) {
    tqOffsetClose(pTq->pOffsetStore);

    taosHashCleanup(pTq->pHandle);
    taosHashCleanup(pTq->pPushMgr);
    taosHashCleanup(pTq->pCheckInfo);

    taosMemoryFree(pTq->path);

    tqMetaClose(pTq);
    streamMetaClose(pTq->pStreamMeta);

    taosMemoryFree(pTq);
  }
}
L
Liu Jicong 已提交
158

H
Haojun Liao 已提交
159 160 161 162 163 164 165 166 167 168 169 170 171
/**
 * Mark every stream task of this TQ as TASK_STATUS__STOP and kill its
 * executor. Runs with the stream meta write latch held. A NULL pTq is ignored.
 */
void tqNotifyClose(STQ* pTq) {
  if (pTq == NULL) {
    return;
  }

  taosWLockLatch(&pTq->pStreamMeta->lock);

  for (void* pIter = taosHashIterate(pTq->pStreamMeta->pTasks, NULL); pIter != NULL;
       pIter = taosHashIterate(pTq->pStreamMeta->pTasks, pIter)) {
    // each hash entry stores a pointer to the task
    SStreamTask* pTask = *(SStreamTask**)pIter;
    tqDebug("vgId:%d s-task:%s set dropping flag", pTq->pStreamMeta->vgId, pTask->id.idStr);
    pTask->status.taskStatus = TASK_STATUS__STOP;

    // measure how long killing the executor takes, for the debug log
    int64_t startMs = taosGetTimestampMs();
    qKillTask(pTask->exec.pExecutor, TSDB_CODE_SUCCESS);
    int64_t elapsedMs = taosGetTimestampMs() - startMs;

    tqDebug("vgId:%d s-task:%s is closed in %" PRId64 " ms", pTq->pStreamMeta->vgId, pTask->id.idStr, elapsedMs);
  }

  taosWUnLockLatch(&pTq->pStreamMeta->lock);
}

D
dapan1121 已提交
184 185
static int32_t doSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* pRsp, int32_t epoch,
                             int64_t consumerId, int32_t type) {
L
Liu Jicong 已提交
186 187
  int32_t len = 0;
  int32_t code = 0;
D
dapan1121 已提交
188 189

  if (type == TMQ_MSG_TYPE__POLL_RSP) {
H
Haojun Liao 已提交
190
    tEncodeSize(tEncodeMqDataRsp, pRsp, len, code);
D
dapan1121 已提交
191 192 193
  } else if (type == TMQ_MSG_TYPE__TAOSX_RSP) {
    tEncodeSize(tEncodeSTaosxRsp, (STaosxRsp*)pRsp, len, code);
  }
L
Liu Jicong 已提交
194 195 196 197 198 199 200 201 202 203 204

  if (code < 0) {
    return -1;
  }

  int32_t tlen = sizeof(SMqRspHead) + len;
  void*   buf = rpcMallocCont(tlen);
  if (buf == NULL) {
    return -1;
  }

D
dapan1121 已提交
205 206 207
  ((SMqRspHead*)buf)->mqMsgType = type;
  ((SMqRspHead*)buf)->epoch = epoch;
  ((SMqRspHead*)buf)->consumerId = consumerId;
L
Liu Jicong 已提交
208 209 210 211 212 213

  void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));

  SEncoder encoder = {0};
  tEncoderInit(&encoder, abuf, len);

D
dapan1121 已提交
214
  if (type == TMQ_MSG_TYPE__POLL_RSP) {
H
Haojun Liao 已提交
215
    tEncodeMqDataRsp(&encoder, pRsp);
D
dapan1121 已提交
216
  } else if (type == TMQ_MSG_TYPE__TAOSX_RSP) {
X
Xiaoyu Wang 已提交
217
    tEncodeSTaosxRsp(&encoder, (STaosxRsp*)pRsp);
dengyihao's avatar
dengyihao 已提交
218
  }
L
Liu Jicong 已提交
219

wmmhello's avatar
wmmhello 已提交
220
  tEncoderClear(&encoder);
L
Liu Jicong 已提交
221 222

  SRpcMsg rsp = {
D
dapan1121 已提交
223
      .info = *pRpcHandleInfo,
L
Liu Jicong 已提交
224 225 226 227
      .pCont = buf,
      .contLen = tlen,
      .code = 0,
  };
L
Liu Jicong 已提交
228

L
Liu Jicong 已提交
229
  tmsgSendRsp(&rsp);
L
Liu Jicong 已提交
230 231
  return 0;
}
L
Liu Jicong 已提交
232

H
Haojun Liao 已提交
233
/**
 * Send an empty poll response for the request stored in pHandle->msg.
 *
 * The response carries the handle's consumer id and epoch together with the
 * valid WAL version range reported by the handle's WAL reader.
 *
 * @return always 0.
 */
int32_t tqPushDataRsp(STqHandle* pHandle, int32_t vgId) {
  SMqDataRsp rsp = {0};
  rsp.head.consumerId = pHandle->consumerId;
  rsp.head.epoch = pHandle->epoch;
  rsp.head.mqMsgType = TMQ_MSG_TYPE__POLL_RSP;

  int64_t sver = 0;
  int64_t ever = 0;
  walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever);
  tqDoSendDataRsp(&pHandle->msg->info, &rsp, pHandle->epoch, pHandle->consumerId, TMQ_MSG_TYPE__POLL_RSP, sver, ever);

  // printable forms of the request/response offsets, for the debug log only
  char reqStr[80] = {0};
  char rspStr[80] = {0};
  tFormatOffset(reqStr, tListLen(reqStr), &rsp.reqOffset);
  tFormatOffset(rspStr, tListLen(rspStr), &rsp.rspOffset);
  tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) push rsp, block num: %d, req:%s, rsp:%s",
          vgId, rsp.head.consumerId, rsp.head.epoch, rsp.blockNum, reqStr, rspStr);
  return 0;
}

252 253 254 255 256 257
/**
 * Send a data response for a poll request.
 *
 * The valid WAL version range of the handle's WAL reader is attached to the
 * response; epoch and consumer id are taken from the request.
 *
 * @param type message type forwarded to tqDoSendDataRsp.
 * @return always 0.
 */
int32_t tqSendDataRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp,
                      int32_t type, int32_t vgId) {
  int64_t sver = 0;
  int64_t ever = 0;
  walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever);

  tqDoSendDataRsp(&pMsg->info, pRsp, pReq->epoch, pReq->consumerId, type, sver, ever);

  // printable forms of the request/response offsets, for the debug log only
  char reqStr[80] = {0};
  char rspStr[80] = {0};
  tFormatOffset(reqStr, 80, &pRsp->reqOffset);
  tFormatOffset(rspStr, 80, &pRsp->rspOffset);

  tqDebug("vgId:%d consumer:0x%" PRIx64 " (epoch %d) send rsp, block num:%d, req:%s, rsp:%s, reqId:0x%" PRIx64,
          vgId, pReq->consumerId, pReq->epoch, pRsp->blockNum, reqStr, rspStr, pReq->reqId);

  return 0;
}

270
/**
 * Handle an offset-commit message: decode it and persist the offset unless
 * the stored offset is already at or beyond it.
 *
 * @param sversion version associated with this message; a log offset whose
 *                 version is exactly sversion - 1 is bumped by one before
 *                 being stored.
 * @param msg      encoded SMqVgOffset.
 * @param msgLen   length of msg in bytes.
 * @return 0 on success (including "nothing to update"), -1 on decode failure,
 *         invalid offset type, or write failure.
 */
int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  SMqVgOffset vgOffset = {0};
  int32_t     vgId = TD_VID(pTq->pVnode);

  SDecoder decoder;
  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
  if (tDecodeMqVgOffset(&decoder, &vgOffset) < 0) {
    // the decoder must be cleared on the failure path as well
    tDecoderClear(&decoder);
    tqError("vgId:%d failed to decode offset commit msg", vgId);
    return -1;
  }

  tDecoderClear(&decoder);

  STqOffset* pOffset = &vgOffset.offset;

  if (pOffset->val.type == TMQ_OFFSET__SNAPSHOT_DATA || pOffset->val.type == TMQ_OFFSET__SNAPSHOT_META) {
    tqDebug("receive offset commit msg to %s on vgId:%d, offset(type:snapshot) uid:%" PRId64 ", ts:%" PRId64,
            pOffset->subKey, vgId, pOffset->val.uid, pOffset->val.ts);
  } else if (pOffset->val.type == TMQ_OFFSET__LOG) {
    tqDebug("receive offset commit msg to %s on vgId:%d, offset(type:log) version:%" PRId64, pOffset->subKey, vgId,
            pOffset->val.version);
    if (pOffset->val.version + 1 == sversion) {
      pOffset->val.version += 1;
    }
  } else {
    tqError("invalid commit offset type:%d", pOffset->val.type);
    return -1;
  }

  STqOffset* pSavedOffset = tqOffsetRead(pTq->pOffsetStore, pOffset->subKey);
  if (pSavedOffset != NULL && tqOffsetLessOrEqual(pOffset, pSavedOffset)) {
    tqDebug("not update the offset, vgId:%d sub:%s since committed:%" PRId64 " less than/equal to existed:%" PRId64,
            vgId, pOffset->subKey, pOffset->val.version, pSavedOffset->val.version);
    return 0;  // no need to update the offset value
  }

  // save the new offset value
  if (tqOffsetWrite(pTq->pOffsetStore, pOffset) < 0) {
    return -1;
  }

  return 0;
}

313
/**
 * Handle a seek request: move the stored offset of a subscription to the
 * requested WAL version.
 *
 * Steps: decode the request, require a log-type offset, look up the handle,
 * verify that the requesting consumer owns the handle, then clamp the
 * requested version into the WAL reader's valid range and persist it.
 *
 * @param msg    encoded SMqVgOffset.
 * @param msgLen length of msg in bytes.
 * @return 0 on success or when no update is needed, -1 on failure (terrno is
 *         set for the "handle not found" and "consumer mismatch" cases).
 */
int32_t tqProcessSeekReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  SMqVgOffset vgOffset = {0};
  int32_t     vgId = TD_VID(pTq->pVnode);

  SDecoder decoder;
  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
  if (tDecodeMqVgOffset(&decoder, &vgOffset) < 0) {
    // the decoder must be cleared on the failure path as well
    tDecoderClear(&decoder);
    tqError("vgId:%d failed to decode seek msg", vgId);
    return -1;
  }

  tDecoderClear(&decoder);

  tqDebug("topic:%s, vgId:%d process offset seek by consumer:0x%" PRIx64 ", req offset:%" PRId64,
          vgOffset.offset.subKey, vgId, vgOffset.consumerId, vgOffset.offset.val.version);

  STqOffset* pOffset = &vgOffset.offset;
  if (pOffset->val.type != TMQ_OFFSET__LOG) {
    tqError("vgId:%d, subKey:%s invalid seek offset type:%d", vgId, pOffset->subKey, pOffset->val.type);
    return -1;
  }

  // 1. find handle
  STqHandle* pHandle = taosHashGet(pTq->pHandle, pOffset->subKey, strlen(pOffset->subKey));
  if (pHandle == NULL) {
    tqError("tmq seek: consumer:0x%" PRIx64 " vgId:%d subkey %s not found", vgOffset.consumerId, vgId,
        pOffset->subKey);
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  // 2. check consumer-vg assignment status
  taosRLockLatch(&pTq->lock);
  if (pHandle->consumerId != vgOffset.consumerId) {
    tqDebug("ERROR tmq seek: consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64,
            vgOffset.consumerId, vgId, pOffset->subKey, pHandle->consumerId);
    terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
    taosRUnLockLatch(&pTq->lock);
    return -1;
  }
  taosRUnLockLatch(&pTq->lock);

  // 3. check the offset info
  STqOffset* pSavedOffset = tqOffsetRead(pTq->pOffsetStore, pOffset->subKey);
  if (pSavedOffset != NULL) {
    if (pSavedOffset->val.type != TMQ_OFFSET__LOG) {
      tqError("invalid saved offset type, vgId:%d sub:%s", vgId, pOffset->subKey);
      return 0;  // no need to update the offset value
    }

    if (pSavedOffset->val.version == pOffset->val.version) {
      tqDebug("vgId:%d subKey:%s no need to seek to %" PRId64 " prev offset:%" PRId64, vgId, pOffset->subKey,
              pOffset->val.version, pSavedOffset->val.version);
      return 0;
    }
  }

  // clamp the requested version into the valid range [sver, ever] of the WAL reader
  int64_t sver = 0, ever = 0;
  walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever);
  if (pOffset->val.version < sver) {
    pOffset->val.version = sver;
  } else if (pOffset->val.version > ever) {
    pOffset->val.version = ever;
  }

  // save the new offset value
  if (pSavedOffset != NULL) {
    tqDebug("vgId:%d sub:%s seek to:%" PRId64 " prev offset:%" PRId64, vgId, pOffset->subKey, pOffset->val.version,
            pSavedOffset->val.version);
  } else {
    tqDebug("vgId:%d sub:%s seek to:%"PRId64" not saved yet", vgId, pOffset->subKey, pOffset->val.version);
  }

  if (tqOffsetWrite(pTq->pOffsetStore, pOffset) < 0) {
    tqError("failed to save offset, vgId:%d sub:%s seek to %" PRId64, vgId, pOffset->subKey, pOffset->val.version);
    return -1;
  }

  tqDebug("topic:%s, vgId:%d consumer:0x%" PRIx64 " offset is update to:%" PRId64, vgOffset.offset.subKey, vgId,
          vgOffset.consumerId, vgOffset.offset.val.version);

  return 0;
}

L
Liu Jicong 已提交
396
/**
 * Check whether a column of a table may be modified.
 *
 * Scans every entry of pTq->pCheckInfo; an entry forbids the change when its
 * ntbUid equals tbUid and its colIdList contains colId.
 *
 * @return 0 when no entry forbids the column, -1 otherwise.
 */
int32_t tqCheckColModifiable(STQ* pTq, int64_t tbUid, int32_t colId) {
  for (void* pIter = taosHashIterate(pTq->pCheckInfo, NULL); pIter != NULL;
       pIter = taosHashIterate(pTq->pCheckInfo, pIter)) {
    STqCheckInfo* pCheck = (STqCheckInfo*)pIter;
    if (pCheck->ntbUid != tbUid) {
      continue;
    }

    int32_t numOfCols = taosArrayGetSize(pCheck->colIdList);
    for (int32_t idx = 0; idx < numOfCols; idx++) {
      int16_t forbidColId = *(int16_t*)taosArrayGet(pCheck->colIdList, idx);
      if (forbidColId != colId) {
        continue;
      }

      // leaving the iteration early: cancel it before returning
      taosHashCancelIterate(pTq->pCheckInfo, pIter);
      return -1;
    }
  }

  return 0;
}

D
dapan1121 已提交
422
/**
 * Handle a consumer poll request.
 *
 * Under the write latch the handle is looked up, ownership by the requesting
 * consumer is verified and the handle is marked as executing. When the handle
 * is already executing, the latch is RELEASED before sleeping and the lookup
 * is repeated afterwards, so a busy subscription does not block every other
 * user of pTq->lock and a handle removed in the meantime is noticed.
 *
 * @return the result of tqExtractDataForMq, or -1 with terrno set when the
 *         request cannot be decoded, the handle does not exist, or the
 *         consumer does not own the handle.
 */
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
  SMqPollReq req = {0};
  if (tDeserializeSMqPollReq(pMsg->pCont, pMsg->contLen, &req) < 0) {
    tqError("tDeserializeSMqPollReq %d failed", pMsg->contLen);
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  int64_t      consumerId = req.consumerId;
  int32_t      reqEpoch = req.epoch;
  STqOffsetVal reqOffset = req.reqOffset;
  int32_t      vgId = TD_VID(pTq->pVnode);
  STqHandle*   pHandle = NULL;

  while (1) {
    taosWLockLatch(&pTq->lock);

    // 1. find handle
    pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
    if (pHandle == NULL) {
      tqError("tmq poll: consumer:0x%" PRIx64 " vgId:%d subkey %s not found", consumerId, vgId, req.subKey);
      terrno = TSDB_CODE_INVALID_MSG;
      taosWUnLockLatch(&pTq->lock);
      return -1;
    }

    // 2. check re-balance status
    if (pHandle->consumerId != consumerId) {
      tqDebug("ERROR tmq poll: consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64,
              consumerId, vgId, req.subKey, pHandle->consumerId);
      terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
      taosWUnLockLatch(&pTq->lock);
      return -1;
    }

    // 3. claim the handle if nobody else is executing on it
    if (!tqIsHandleExec(pHandle)) {
      tqSetHandleExec(pHandle);
      taosWUnLockLatch(&pTq->lock);
      break;
    }

    // busy: drop the latch while waiting, then start over with a fresh lookup
    taosWUnLockLatch(&pTq->lock);
    tqDebug("tmq poll: consumer:0x%" PRIx64 " vgId:%d, topic:%s, subscription is executing, wait for 5ms and retry",
            consumerId, vgId, req.subKey);
    taosMsleep(5);
  }

  // 4. update the epoch value
  int32_t savedEpoch = pHandle->epoch;
  if (savedEpoch < reqEpoch) {
    tqDebug("tmq poll: consumer:0x%" PRIx64 " epoch update from %d to %d by poll req", consumerId, savedEpoch,
            reqEpoch);
    pHandle->epoch = reqEpoch;
  }

  char buf[80];
  tFormatOffset(buf, 80, &reqOffset);
  tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey %s, recv poll req vgId:%d, req:%s, reqId:0x%" PRIx64,
          consumerId, req.epoch, pHandle->subKey, vgId, buf, req.reqId);

  int code = tqExtractDataForMq(pTq, pHandle, &req, pMsg);
  tqSetHandleIdle(pHandle);
  return code;
}

479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512
/**
 * Handle a WAL-info request: report the current consume position of a
 * subscription together with the valid WAL version range.
 *
 * The reported position is the stored offset when one exists (it must be a
 * log offset). Otherwise it is derived from the request's offset type: the
 * reader's current version, the earliest valid version, or the latest valid
 * version.
 *
 * The response object built here is released on every exit path after
 * tqInitDataRsp, including the success path.
 *
 * @return 0 on success, -1 with terrno set on failure.
 */
int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) {
  SMqPollReq req = {0};
  if (tDeserializeSMqPollReq(pMsg->pCont, pMsg->contLen, &req) < 0) {
    tqError("tDeserializeSMqPollReq %d failed", pMsg->contLen);
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  int64_t      consumerId = req.consumerId;
  STqOffsetVal reqOffset = req.reqOffset;
  int32_t      vgId = TD_VID(pTq->pVnode);

  // 1. find handle
  STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
  if (pHandle == NULL) {
    tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s not found", consumerId, vgId, req.subKey);
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  // 2. check re-balance status
  taosRLockLatch(&pTq->lock);
  if (pHandle->consumerId != consumerId) {
    tqDebug("ERROR consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64,
            consumerId, vgId, req.subKey, pHandle->consumerId);
    terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
    taosRUnLockLatch(&pTq->lock);
    return -1;
  }
  taosRUnLockLatch(&pTq->lock);

  int64_t sver = 0, ever = 0;
  walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever);

  int64_t currentVer = walReaderGetCurrentVer(pHandle->execHandle.pTqReader->pWalReader);

  SMqDataRsp dataRsp = {0};
  tqInitDataRsp(&dataRsp, &req);

  STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, req.subKey);
  if (pOffset != NULL) {
    if (pOffset->val.type != TMQ_OFFSET__LOG) {
      tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s use snapshot, no valid wal info", consumerId, vgId, req.subKey);
      terrno = TSDB_CODE_INVALID_PARA;
      tDeleteMqDataRsp(&dataRsp);
      return -1;
    }

    dataRsp.rspOffset.type = TMQ_OFFSET__LOG;
    dataRsp.rspOffset.version = pOffset->val.version;
  } else {
    if (req.useSnapshot == true) {
      tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s snapshot not support wal info", consumerId, vgId, req.subKey);
      terrno = TSDB_CODE_INVALID_PARA;
      tDeleteMqDataRsp(&dataRsp);
      return -1;
    }

    dataRsp.rspOffset.type = TMQ_OFFSET__LOG;

    if (reqOffset.type == TMQ_OFFSET__LOG) {
      dataRsp.rspOffset.version = currentVer; // return current consume offset value
    } else if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEAST) {
      dataRsp.rspOffset.version = sver;  // not consume yet, set the earliest position
    } else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) {
      dataRsp.rspOffset.version = ever;
    } else {
      tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s invalid offset type:%d", consumerId, vgId, req.subKey,
              reqOffset.type);
      terrno = TSDB_CODE_INVALID_PARA;
      tDeleteMqDataRsp(&dataRsp);
      return -1;
    }
  }

  tqDoSendDataRsp(&pMsg->info, &dataRsp, req.epoch, req.consumerId, TMQ_MSG_TYPE__WALINFO_RSP, sver, ever);

  // the response has been sent; release it as the error paths above already do
  tDeleteMqDataRsp(&dataRsp);
  return 0;
}

558
/**
 * Handle a delete-subscription request.
 *
 * With the write latch held: wait until the handle (if any) is no longer
 * executing, close its WAL reference, remove it from the handle map, then
 * delete the subscription's offset and its persisted handle. Failures of the
 * individual steps are only logged.
 *
 * @param msg a SMqVDeleteReq.
 * @return always 0.
 */
int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  SMqVDeleteReq* pReq = (SMqVDeleteReq*)msg;
  int32_t        vgId = TD_VID(pTq->pVnode);

  tqDebug("vgId:%d, tq process delete sub req %s", vgId, pReq->subKey);

  taosWLockLatch(&pTq->lock);

  size_t     keyLen = strlen(pReq->subKey);
  STqHandle* pHandle = taosHashGet(pTq->pHandle, pReq->subKey, keyLen);
  if (pHandle != NULL) {
    // do not tear the handle down while it is still being executed
    while (tqIsHandleExec(pHandle)) {
      tqDebug("vgId:%d, topic:%s, subscription is executing, wait for 5ms and retry", vgId, pHandle->subKey);
      taosMsleep(5);
    }

    if (pHandle->pRef) {
      walCloseRef(pTq->pVnode->pWal, pHandle->pRef->refId);
    }

    if (taosHashRemove(pTq->pHandle, pReq->subKey, keyLen) != 0) {
      tqError("cannot process tq delete req %s, since no such handle", pReq->subKey);
    }
  }

  if (tqOffsetDelete(pTq->pOffsetStore, pReq->subKey) != 0) {
    tqError("cannot process tq delete req %s, since no such offset in cache", pReq->subKey);
  }

  if (tqMetaDeleteHandle(pTq, pReq->subKey) < 0) {
    tqError("cannot process tq delete req %s, since no such offset in tdb", pReq->subKey);
  }

  taosWUnLockLatch(&pTq->lock);
  return 0;
}

596
/**
 * Handle an add-check-info request: decode a STqCheckInfo, store it in
 * pTq->pCheckInfo keyed by its topic, and persist the raw message.
 *
 * The decoder is cleared on the decode-failure path too, and the decoded
 * info is released when it could not be stored in the map.
 *
 * @param msg    encoded STqCheckInfo.
 * @param msgLen length of msg in bytes.
 * @return 0 on success, -1 with terrno = TSDB_CODE_OUT_OF_MEMORY on failure.
 */
int32_t tqProcessAddCheckInfoReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  STqCheckInfo info = {0};
  SDecoder     decoder;
  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
  if (tDecodeSTqCheckInfo(&decoder, &info) < 0) {
    tDecoderClear(&decoder);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  tDecoderClear(&decoder);

  if (taosHashPut(pTq->pCheckInfo, info.topic, strlen(info.topic), &info, sizeof(STqCheckInfo)) < 0) {
    // the map did not store the entry, so its contents must be released here
    tDeleteSTqCheckInfo(&info);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  // from here on the entry in pTq->pCheckInfo holds the decoded contents
  if (tqMetaSaveCheckInfo(pTq, info.topic, msg, msgLen) < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  return 0;
}

616
/**
 * Handle a delete-check-info request.
 *
 * msg is used directly as the NUL-terminated key: the entry is removed from
 * pTq->pCheckInfo and then from the persisted check info.
 *
 * @return 0 on success, -1 with terrno = TSDB_CODE_OUT_OF_MEMORY when either
 *         removal fails.
 */
int32_t tqProcessDelCheckInfoReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  size_t keyLen = strlen(msg);

  if (taosHashRemove(pTq->pCheckInfo, msg, keyLen) < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  if (tqMetaDeleteCheckInfo(pTq, msg) < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  return 0;
}

628
/**
 * Handle a subscribe / re-balance request for one subscription key.
 *
 * Runs with the write latch on pTq->lock held.
 *  - Handle already exists: wait until it is idle, then either bump its epoch
 *    (same consumer) or switch it to the new consumer and reset the epoch to
 *    0; kill the task, unregister the handle from the push manager and
 *    persist it.
 *  - Handle does not exist: build a new one (WAL reference, readers and
 *    executor task depending on the subscription type), put it into
 *    pTq->pHandle and persist it.
 *
 * @param msg encoded SMqRebVgReq.
 * @return 0 on success; -1 when the WAL reference cannot be obtained;
 *         otherwise the result of tqMetaSaveHandle.
 */
int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  int         ret = 0;
  SMqRebVgReq req = {0};
  tDecodeSMqRebVgReq(msg, &req);

  SVnode* pVnode = pTq->pVnode;
  int32_t vgId = TD_VID(pVnode);

  tqDebug("vgId:%d, tq process sub req:%s, Id:0x%" PRIx64 " -> Id:0x%" PRIx64, pVnode->config.vgId, req.subKey,
          req.oldConsumerId, req.newConsumerId);

  taosWLockLatch(&pTq->lock);
  STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));

  if (pHandle != NULL) {
    // ---- existing handle: keep or switch the consumer ----
    while (tqIsHandleExec(pHandle)) {
      tqDebug("sub req vgId:%d, topic:%s, subscription is executing, wait for 5ms and retry", vgId, pHandle->subKey);
      taosMsleep(5);
    }

    if (pHandle->consumerId != req.newConsumerId) {
      tqInfo("vgId:%d switch consumer from Id:0x%" PRIx64 " to Id:0x%" PRIx64, req.vgId, pHandle->consumerId,
             req.newConsumerId);
      atomic_store_64(&pHandle->consumerId, req.newConsumerId);
      atomic_store_32(&pHandle->epoch, 0);
    } else {  // do nothing
      tqInfo("vgId:%d consumer:0x%" PRIx64 " remains, no switch occurs", req.vgId, req.newConsumerId);
      atomic_add_fetch_32(&pHandle->epoch, 1);
    }

    // kill executing task
    qTaskInfo_t pTaskInfo = pHandle->execHandle.task;
    if (pTaskInfo != NULL) {
      qKillTask(pTaskInfo, TSDB_CODE_SUCCESS);
    }
    if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
      qStreamCloseTsdbReader(pTaskInfo);
    }

    // remove if it has been register in the push manager, and return one empty block to consumer
    tqUnregisterPushHandle(pTq, pHandle);
    ret = tqMetaSaveHandle(pTq, req.subKey, pHandle);
  } else {
    // ---- no handle yet: build a new one ----
    if (req.oldConsumerId != -1) {
      tqError("vgId:%d, build new consumer handle %s for consumer:0x%" PRIx64 ", but old consumerId:0x%" PRIx64,
              req.vgId, req.subKey, req.newConsumerId, req.oldConsumerId);
    }

    if (req.newConsumerId == -1) {
      tqError("vgId:%d, tq invalid re-balance request, new consumerId %" PRId64 "", req.vgId, req.newConsumerId);
      goto end;
    }

    STqHandle newHandle = {0};
    pHandle = &newHandle;

    uint64_t oldConsumerId = pHandle->consumerId;
    memcpy(pHandle->subKey, req.subKey, TSDB_SUBSCRIBE_KEY_LEN);
    pHandle->consumerId = req.newConsumerId;
    pHandle->epoch = -1;

    pHandle->execHandle.subType = req.subType;
    pHandle->fetchMeta = req.withMeta;

    // TODO version should be assigned and refed during preprocess
    SWalRef* pWalRef = walRefCommittedVer(pVnode->pWal);
    if (pWalRef == NULL) {
      ret = -1;
      goto end;
    }

    int64_t refVer = pWalRef->refVer;
    pHandle->pRef = pWalRef;

    SReadHandle readHandle = {
        .meta = pVnode->pMeta, .vnode = pVnode, .initTableReader = true, .initTqReader = true, .version = refVer};
    pHandle->snapshotVer = refVer;

    switch (pHandle->execHandle.subType) {
      case TOPIC_SUB_TYPE__COLUMN: {
        // the handle takes over the query message; clear it in the request so
        // the common exit path does not free it
        pHandle->execHandle.execCol.qmsg = req.qmsg;
        req.qmsg = NULL;

        pHandle->execHandle.task = qCreateQueueExecTaskInfo(pHandle->execHandle.execCol.qmsg, &readHandle, vgId,
                                                            &pHandle->execHandle.numOfCols, req.newConsumerId);
        void* scanner = NULL;
        qExtractStreamScanner(pHandle->execHandle.task, &scanner);
        pHandle->execHandle.pTqReader = qExtractReaderFromStreamScanner(scanner);
        break;
      }
      case TOPIC_SUB_TYPE__DB: {
        pHandle->pWalReader = walOpenReader(pVnode->pWal, NULL);
        pHandle->execHandle.pTqReader = tqReaderOpen(pVnode);

        pHandle->execHandle.execDb.pFilterOutTbUid =
            taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
        buildSnapContext(readHandle.meta, readHandle.version, 0, pHandle->execHandle.subType, pHandle->fetchMeta,
                         (SSnapContext**)(&readHandle.sContext));

        pHandle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &readHandle, vgId, NULL, req.newConsumerId);
        break;
      }
      case TOPIC_SUB_TYPE__TABLE: {
        pHandle->pWalReader = walOpenReader(pVnode->pWal, NULL);
        pHandle->execHandle.execTb.suid = req.suid;

        SArray* pCtbUids = taosArrayInit(0, sizeof(int64_t));
        vnodeGetCtbIdList(pVnode, req.suid, pCtbUids);
        tqDebug("vgId:%d, tq try to get all ctb, suid:%" PRId64, pVnode->config.vgId, req.suid);
        for (int32_t i = 0; i < taosArrayGetSize(pCtbUids); i++) {
          int64_t tbUid = *(int64_t*)taosArrayGet(pCtbUids, i);
          tqDebug("vgId:%d, idx %d, uid:%" PRId64, vgId, i, tbUid);
        }
        pHandle->execHandle.pTqReader = tqReaderOpen(pVnode);
        tqReaderSetTbUidList(pHandle->execHandle.pTqReader, pCtbUids);
        taosArrayDestroy(pCtbUids);

        buildSnapContext(readHandle.meta, readHandle.version, req.suid, pHandle->execHandle.subType,
                         pHandle->fetchMeta, (SSnapContext**)(&readHandle.sContext));
        pHandle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &readHandle, vgId, NULL, req.newConsumerId);
        break;
      }
      default:
        break;
    }

    taosHashPut(pTq->pHandle, req.subKey, strlen(req.subKey), pHandle, sizeof(STqHandle));
    tqDebug("try to persist handle %s consumer:0x%" PRIx64 " , old consumer:0x%" PRIx64, req.subKey,
            pHandle->consumerId, oldConsumerId);
    ret = tqMetaSaveHandle(pTq, req.subKey, pHandle);
  }

end:
  taosWUnLockLatch(&pTq->lock);
  taosMemoryFree(req.qmsg);
  return ret;
}
756

L
liuyao 已提交
757 758 759 760
// Free callback: `ptr` is the address of a stored pointer; the memory that stored pointer refers to is released.
// Registered on pTask->tbSink.pTblInfo via tSimpleHashSetFreeFp() in tqExpandTask().
void freePtr(void *ptr) {
  void **ppStored = (void **)ptr;
  taosMemoryFree(*ppStored);
}

761
// Fill in the runtime parts of a stream task for this vnode: id string, input/output queues, state backend,
// executor (for source and agg level tasks), sink configuration, wal reader (source level only) and trigger.
//
// pTq:   the tq instance that owns the task
// pTask: the task to expand; modified in place
// ver:   stored as both chkInfo.version and chkInfo.currentVer; chkInfo.version is advanced by one before returning
//
// Returns 0 on success and -1 as soon as one of the checked resources cannot be created.
// NOTE(review): resources created before a failure are not released here -- presumably the caller destroys the
// task on error; confirm against the caller.
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
  int32_t vgId = TD_VID(pTq->pVnode);

  pTask->id.idStr = createStreamTaskIdStr(pTask->id.streamId, pTask->id.taskId);
  pTask->refCnt = 1;
  pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE;
  pTask->inputQueue = streamQueueOpen(512 << 10);
  pTask->outputQueue = streamQueueOpen(512 << 10);

  if (pTask->inputQueue == NULL || pTask->outputQueue == NULL) {
    return -1;
  }

  pTask->inputStatus = TASK_INPUT_STATUS__NORMAL;
  pTask->outputStatus = TASK_OUTPUT_STATUS__NORMAL;
  pTask->pMsgCb = &pTq->pVnode->msgCb;
  pTask->pMeta = pTq->pStreamMeta;
  pTask->chkInfo.version = ver;
  pTask->chkInfo.currentVer = ver;

  // expand executor
  pTask->status.taskStatus = (pTask->fillHistory) ? TASK_STATUS__WAIT_DOWNSTREAM : TASK_STATUS__NORMAL;

  if (pTask->taskLevel == TASK_LEVEL__SOURCE) {
    pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask, false, -1, -1);
    if (pTask->pState == NULL) {
      return -1;
    }

    SReadHandle handle = {
        .meta = pTq->pVnode->pMeta, .vnode = pTq->pVnode, .initTqReader = 1, .pStateBackend = pTask->pState};

    pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, vgId);
    if (pTask->exec.pExecutor == NULL) {
      return -1;
    }

    qSetTaskId(pTask->exec.pExecutor, pTask->id.taskId, pTask->id.streamId);
  } else if (pTask->taskLevel == TASK_LEVEL__AGG) {
    pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask, false, -1, -1);
    if (pTask->pState == NULL) {
      return -1;
    }

    int32_t     numOfVgroups = (int32_t)taosArrayGetSize(pTask->childEpInfo);
    SReadHandle mgHandle = {.vnode = NULL, .numOfVgroups = numOfVgroups, .pStateBackend = pTask->pState};

    pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &mgHandle, vgId);
    if (pTask->exec.pExecutor == NULL) {
      return -1;
    }

    qSetTaskId(pTask->exec.pExecutor, pTask->id.taskId, pTask->id.streamId);
  }

  // sink
  if (pTask->outputType == TASK_OUTPUT__SMA) {
    pTask->smaSink.vnode = pTq->pVnode;
    pTask->smaSink.smaSink = smaHandleRes;
  } else if (pTask->outputType == TASK_OUTPUT__TABLE) {
    pTask->tbSink.vnode = pTq->pVnode;
    pTask->tbSink.tbSinkFunc = tqSinkToTablePipeline;

    // schema version defaults to 1 and is replaced by info.skmVer when the lookup of stbUid succeeds
    int32_t   ver1 = 1;
    SMetaInfo info = {0};
    int32_t   code = metaGetInfo(pTq->pVnode->pMeta, pTask->tbSink.stbUid, &info, NULL);
    if (code == TSDB_CODE_SUCCESS) {
      ver1 = info.skmVer;
    }

    SSchemaWrapper* pschemaWrapper = pTask->tbSink.pSchemaWrapper;
    pTask->tbSink.pTSchema = tBuildTSchema(pschemaWrapper->pSchema, pschemaWrapper->nCols, ver1);
    if (pTask->tbSink.pTSchema == NULL) {
      return -1;
    }

    pTask->tbSink.pTblInfo = tSimpleHashInit(10240, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
    if (pTask->tbSink.pTblInfo == NULL) {
      // never hand a NULL table to tSimpleHashSetFreeFp()
      return -1;
    }
    tSimpleHashSetFreeFp(pTask->tbSink.pTblInfo, freePtr);
  }

  if (pTask->taskLevel == TASK_LEVEL__SOURCE) {
    SWalFilterCond cond = {.deleteMsg = 1};  // delete msg also extract from wal files
    pTask->exec.pWalReader = walOpenReader(pTq->pVnode->pWal, &cond);
  }

  streamSetupTrigger(pTask);

  tqInfo("vgId:%d expand stream task, s-task:%s, checkpoint ver:%" PRId64 " child id:%d, level:%d", vgId, pTask->id.idStr,
         pTask->chkInfo.version, pTask->selfChildId, pTask->taskLevel);

  // next valid version will add one
  pTask->chkInfo.version += 1;
  return 0;
}
L
Liu Jicong 已提交
854

855
// Handle a task-check request from an upstream task: decode it, look up the downstream task named in the request,
// and send back a SStreamTaskCheckRsp whose status comes from streamTaskCheckStatus() (0 when the task is not found).
//
// pTq:  the tq instance whose stream meta is searched
// pMsg: rpc message; the encoded request follows a SMsgHead in pMsg->pCont
//
// Returns 0 once the response has been handed to tmsgSendRsp(), -1 when the request cannot be decoded or the
// response cannot be sized or allocated (no response is sent in those cases).
int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) {
  char*   msgStr = pMsg->pCont;
  char*   msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
  int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);

  SStreamTaskCheckReq req = {0};
  SDecoder            decoder;

  tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
  int32_t decodeCode = tDecodeSStreamTaskCheckReq(&decoder, &req);
  tDecoderClear(&decoder);
  if (decodeCode < 0) {
    // do not build a response out of a partially decoded request
    tqError("vgId:%d failed to decode task check req", pTq->pStreamMeta->vgId);
    return -1;
  }

  int32_t             taskId = req.downstreamTaskId;
  SStreamTaskCheckRsp rsp = {
      .reqId = req.reqId,
      .streamId = req.streamId,
      .childId = req.childId,
      .downstreamNodeId = req.downstreamNodeId,
      .downstreamTaskId = req.downstreamTaskId,
      .upstreamNodeId = req.upstreamNodeId,
      .upstreamTaskId = req.upstreamTaskId,
  };

  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);

  if (pTask != NULL) {
    rsp.status = streamTaskCheckStatus(pTask);

    // log while the reference is still held; pTask must not be touched after it is released
    tqDebug("s-task:%s recv task check req(reqId:0x%" PRIx64
            ") %d at node %d task status:%d, check req from task %d at node %d, rsp status %d",
            pTask->id.idStr, rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, pTask->status.taskStatus,
            rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);

    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
  } else {
    rsp.status = 0;
    tqDebug("tq recv task check(taskId:0x%x not built yet) req(reqId:0x%" PRIx64
            ") %d at node %d, check req from task:0x%x at node %d, rsp status %d",
            taskId, rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.upstreamTaskId, rsp.upstreamNodeId,
            rsp.status);
  }

  SEncoder encoder;
  int32_t  code;
  int32_t  len;

  tEncodeSize(tEncodeSStreamTaskCheckRsp, &rsp, len, code);
  if (code < 0) {
    tqError("vgId:%d failed to encode task check rsp, task:0x%x", pTq->pStreamMeta->vgId, taskId);
    return -1;
  }

  void* buf = rpcMallocCont(sizeof(SMsgHead) + len);
  if (buf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    tqError("vgId:%d failed to allocate task check rsp, task:0x%x", pTq->pStreamMeta->vgId, taskId);
    return -1;
  }
  ((SMsgHead*)buf)->vgId = htonl(req.upstreamNodeId);

  void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
  tEncoderInit(&encoder, (uint8_t*)abuf, len);
  tEncodeSStreamTaskCheckRsp(&encoder, &rsp);
  tEncoderClear(&encoder);

  SRpcMsg rspMsg = {.code = 0, .pCont = buf, .contLen = sizeof(SMsgHead) + len, .info = pMsg->info};

  tmsgSendRsp(&rspMsg);
  return 0;
}

920
// Handle a task-check response: decode it, find the upstream task it is addressed to and pass the response to
// streamProcessTaskCheckRsp().
//
// Returns -1 when decoding fails or the upstream task cannot be found; otherwise the result of
// streamProcessTaskCheckRsp().
int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  SStreamTaskCheckRsp rsp;
  SDecoder            decoder;

  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
  int32_t decodeRet = tDecodeSStreamTaskCheckRsp(&decoder, &rsp);
  tDecoderClear(&decoder);  // released on both the success and the failure path
  if (decodeRet < 0) {
    return -1;
  }

  tqDebug("tq recv task check rsp(reqId:0x%" PRIx64 ") %d at node %d check req from task:0x%x at node %d, status %d",
          rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);

  SStreamMeta* pMeta = pTq->pStreamMeta;
  SStreamTask* pUpstream = streamMetaAcquireTask(pMeta, rsp.upstreamTaskId);
  if (pUpstream == NULL) {
    tqError("tq failed to locate the stream task:0x%x vgId:%d, it may have been destroyed", rsp.upstreamTaskId,
            pTq->pStreamMeta->vgId);
    return -1;
  }

  int32_t result = streamProcessTaskCheckRsp(pUpstream, &rsp, sversion);
  streamMetaReleaseTask(pMeta, pUpstream);
  return result;
}

949
// Deploy a stream task described by an encoded message: decode it into a newly allocated SStreamTask, register it in
// the stream meta under the meta write lock, and start the downstream check when the task has fillHistory set.
//
// pTq:      the tq instance that receives the task
// sversion: version passed to streamMetaAddDeployedTask() and streamTaskCheckDownstream()
// msg/msgLen: the encoded task
//
// Returns 0 on success (and immediately when tsDisableStream is set), -1 on allocation, decode or registration failure.
int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  int32_t code = 0;
  int32_t vgId = TD_VID(pTq->pVnode);

  if (tsDisableStream) {
    return 0;
  }

  // 1.deserialize msg and build task
  SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
  if (pTask == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    tqError("vgId:%d failed to create stream task due to out of memory, alloc size:%d", vgId, (int32_t) sizeof(SStreamTask));
    return -1;
  }

  SDecoder decoder;
  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
  code = tDecodeStreamTask(&decoder, pTask);
  if (code < 0) {
    tDecoderClear(&decoder);
    taosMemoryFree(pTask);
    return -1;
  }

  tDecoderClear(&decoder);

  // Remember the id now: after a failed add, pTask->id.idStr may never have been set (tqExpandTask() is what
  // assigns it) and -- NOTE(review): presumably -- the task itself may already have been released by the meta.
  int32_t taskId = pTask->id.taskId;

  // 2.save task, use the newest commit version as the initial start version of stream task.
  taosWLockLatch(&pTq->pStreamMeta->lock);
  code = streamMetaAddDeployedTask(pTq->pStreamMeta, sversion, pTask);
  int32_t numOfTasks = streamMetaGetNumOfTasks(pTq->pStreamMeta);
  if (code < 0) {
    tqError("vgId:%d failed to add s-task:0x%x, total:%d", vgId, taskId, numOfTasks);
    taosWUnLockLatch(&pTq->pStreamMeta->lock);
    return -1;
  }

  taosWUnLockLatch(&pTq->pStreamMeta->lock);

  // 3.go through recover steps to fill history
  if (pTask->fillHistory) {
    streamTaskCheckDownstream(pTask, sversion);
  }

  tqDebug("vgId:%d s-task:%s is deployed and add meta from mnd, status:%d, total:%d", vgId, pTask->id.idStr,
          pTask->status.taskStatus, numOfTasks);
  return 0;
}

L
Liu Jicong 已提交
998 999 1000 1001 1002
// Run recover step 1 (the non-blocking scan) for the task named in the request and, unless the task is being
// dropped, put a TDMT_VND_STREAM_RECOVER_BLOCKING_STAGE message on the write queue to start step 2.
//
// pTq:  the tq instance that owns the task
// pMsg: rpc message whose pCont is a SStreamRecoverStep1Req
//
// Returns 0 when step 2 has been queued or the task turned out to be dropping; -1 when the task is not found, its
// checkpoint version is not positive, the step-2 request cannot be built, or the message buffer cannot be allocated.
// The task reference is held until the last access to the task and released exactly once on every path.
int32_t tqProcessTaskRecover1Req(STQ* pTq, SRpcMsg* pMsg) {
  SStreamRecoverStep1Req* pReq = (SStreamRecoverStep1Req*)pMsg->pCont;

  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->taskId);
  if (pTask == NULL) {
    return -1;
  }

  int32_t code = -1;
  do {
    // check param
    int64_t fillVer1 = pTask->chkInfo.version;
    if (fillVer1 <= 0) {
      break;
    }

    // do recovery step 1
    tqDebug("s-task:%s start non-blocking recover stage(step 1) scan", pTask->id.idStr);
    int64_t st = taosGetTimestampMs();

    streamSourceRecoverScanStep1(pTask);
    if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING) {
      tqDebug("s-task:%s is dropped, abort recover in step1", pTask->id.idStr);
      code = 0;
      break;
    }

    double el = (taosGetTimestampMs() - st) / 1000.0;
    tqDebug("s-task:%s non-blocking recover stage(step 1) ended, elapsed time:%.2fs", pTask->id.idStr, el);

    // build msg to launch next step
    SStreamRecoverStep2Req req;
    if (streamBuildSourceRecover2Req(pTask, &req) < 0) {
      break;
    }

    if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING) {
      code = 0;
      break;
    }

    // serialize msg: the payload is the step-2 request, so size the buffer from that object
    int32_t len = sizeof(req);

    void* serializedReq = rpcMallocCont(len);
    if (serializedReq == NULL) {
      tqError("s-task:%s failed to prepare the step2 stage, out of memory", pTask->id.idStr);
      break;
    }

    memcpy(serializedReq, &req, len);

    // dispatch msg
    tqDebug("s-task:%s start recover block stage", pTask->id.idStr);

    SRpcMsg rpcMsg = {
        .code = 0, .contLen = len, .msgType = TDMT_VND_STREAM_RECOVER_BLOCKING_STAGE, .pCont = serializedReq};
    tmsgPutToQueue(&pTq->pVnode->msgCb, WRITE_QUEUE, &rpcMsg);
    code = 0;
  } while (0);

  streamMetaReleaseTask(pTq->pStreamMeta, pTask);
  return code;
}

1064
// Run recover step 2 for the task named in the request: scan, seek the wal reader to `sversion`, restore the task
// parameters, set the status back to normal, notify downstream tasks, then clear fillHistory and save the task.
//
// Returns 0 on completion or when the task is found to be dropping after the scan; -1 when the task is not found or
// any of the checked steps fails. The task reference is released once, at the single exit below.
int32_t tqProcessTaskRecover2Req(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  SStreamRecoverStep2Req* pStep2 = (SStreamRecoverStep2Req*)msg;

  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pStep2->taskId);
  if (pTask == NULL) {
    return -1;
  }

  int32_t ret = -1;
  do {
    // do recovery step 2
    int64_t startTs = taosGetTimestampMs();
    tqDebug("s-task:%s start step2 recover, ts:%"PRId64, pTask->id.idStr, startTs);

    if (streamSourceRecoverScanStep2(pTask, sversion) < 0) {
      break;
    }

    qDebug("s-task:%s set the start wal offset to be:%"PRId64, pTask->id.idStr, sversion);
    walReaderSeekVer(pTask->exec.pWalReader, sversion);

    if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING) {
      ret = 0;
      break;
    }

    // restore param
    if (streamRestoreParam(pTask) < 0) {
      break;
    }

    // set status normal
    tqDebug("s-task:%s blocking stage completed, set the status to be normal", pTask->id.idStr);
    if (streamSetStatusNormal(pTask) < 0) {
      break;
    }

    double elapsedSec = (taosGetTimestampMs() - startTs)/ 1000.0;
    tqDebug("s-task:%s step2 recover finished, el:%.2fs", pTask->id.idStr, elapsedSec);

    // dispatch recover finish req to all related downstream task
    if (streamDispatchRecoverFinishReq(pTask) < 0) {
      break;
    }

    atomic_store_8(&pTask->fillHistory, 0);
    streamMetaSaveTask(pTq->pStreamMeta, pTask);
    ret = 0;
  } while (0);

  streamMetaReleaseTask(pTq->pStreamMeta, pTask);
  return ret;
}

L
Liu Jicong 已提交
1124 1125 1126
// Handle a recover-finish request: decode it, find the task it names and pass the sender's child id to
// streamProcessRecoverFinishReq().
//
// pTq:  the tq instance whose stream meta is searched
// pMsg: rpc message; the encoded request follows a SMsgHead in pMsg->pCont
//
// Returns 0 on success; -1 when the request cannot be decoded, the task is not found, or processing fails.
int32_t tqProcessTaskRecoverFinishReq(STQ* pTq, SRpcMsg* pMsg) {
  char*   msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);

  // deserialize
  SStreamRecoverFinishReq req = {0};

  SDecoder decoder;
  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
  int32_t decodeCode = tDecodeSStreamRecoverFinishReq(&decoder, &req);
  tDecoderClear(&decoder);
  if (decodeCode < 0) {
    // a request that failed to decode must not be used to look up a task
    return -1;
  }

  // find task
  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.taskId);
  if (pTask == NULL) {
    return -1;
  }

  // do process request
  int32_t code = (streamProcessRecoverFinishReq(pTask, req.childId) < 0) ? -1 : 0;

  streamMetaReleaseTask(pTq->pStreamMeta, pTask);
  return code;
}
L
Liu Jicong 已提交
1150

L
Liu Jicong 已提交
1151 1152 1153 1154 1155
// Handler for the recover-finish response. No processing is performed; the call always reports success.
int32_t tqProcessTaskRecoverFinishRsp(STQ* pTq, SRpcMsg* pMsg) {
  (void)pTq;
  (void)pMsg;
  return 0;
}

1156 1157 1158 1159
// Decode a delete result from `pData` and turn it into a STREAM_DELETE_DATA block wrapped in a queue item.
//
// pData/len: the encoded SDeleteRes
// ver:       stored as the version of the generated data block
// pRefBlock: out parameter. Set to NULL first; on success with at least one table and one affected row it receives
//            a newly allocated SStreamRefDataBlock that references the delete block. It stays NULL when there is
//            nothing to report or when an allocation fails.
//
// Returns TSDB_CODE_SUCCESS (also when there is nothing to report) or TSDB_CODE_OUT_OF_MEMORY.
int32_t extractDelDataBlock(const void* pData, int32_t len, int64_t ver, SStreamRefDataBlock** pRefBlock) {
  SDecoder*   pCoder = &(SDecoder){0};
  SDeleteRes* pRes = &(SDeleteRes){0};

  *pRefBlock = NULL;

  pRes->uidList = taosArrayInit(0, sizeof(tb_uid_t));
  if (pRes->uidList == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  tDecoderInit(pCoder, (uint8_t*)pData, len);
  tDecodeDeleteRes(pCoder, pRes);
  tDecoderClear(pCoder);

  int32_t numOfTables = taosArrayGetSize(pRes->uidList);
  if (numOfTables == 0 || pRes->affectedRows == 0) {
    taosArrayDestroy(pRes->uidList);
    return TSDB_CODE_SUCCESS;
  }

  // Allocate the queue item before the data block so that an allocation failure here leaves no block behind.
  SStreamRefDataBlock* pItem = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0);
  if (pItem == NULL) {
    taosArrayDestroy(pRes->uidList);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  // NOTE(review): the results of createSpecialDataBlock() and blockDataEnsureCapacity() are not checked,
  // exactly as before -- confirm whether they can fail.
  SSDataBlock* pDelBlock = createSpecialDataBlock(STREAM_DELETE_DATA);
  blockDataEnsureCapacity(pDelBlock, numOfTables);
  pDelBlock->info.rows = numOfTables;
  pDelBlock->info.version = ver;

  // the column handles do not depend on the row, so look them up once
  SColumnInfoData* pStartCol = taosArrayGet(pDelBlock->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pEndCol = taosArrayGet(pDelBlock->pDataBlock, END_TS_COLUMN_INDEX);
  SColumnInfoData* pUidCol = taosArrayGet(pDelBlock->pDataBlock, UID_COLUMN_INDEX);
  SColumnInfoData* pGroupCol = taosArrayGet(pDelBlock->pDataBlock, GROUPID_COLUMN_INDEX);
  SColumnInfoData* pCalcStartCol = taosArrayGet(pDelBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
  SColumnInfoData* pCalcEndCol = taosArrayGet(pDelBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);

  // one row per table: [skey, ekey, uid], the remaining columns are NULL
  for (int32_t i = 0; i < numOfTables; i++) {
    int64_t* pUid = taosArrayGet(pRes->uidList, i);

    colDataSetVal(pStartCol, i, (const char*)&pRes->skey, false);
    colDataSetVal(pEndCol, i, (const char*)&pRes->ekey, false);
    colDataSetVal(pUidCol, i, (const char*)pUid, false);

    colDataSetNULL(pGroupCol, i);
    colDataSetNULL(pCalcStartCol, i);
    colDataSetNULL(pCalcEndCol, i);
  }

  taosArrayDestroy(pRes->uidList);

  pItem->type = STREAM_INPUT__REF_DATA_BLOCK;
  pItem->pBlock = pDelBlock;
  *pRefBlock = pItem;
  return TSDB_CODE_SUCCESS;
}

L
Liu Jicong 已提交
1209 1210
// Handle a run request. A request for WAL_READ_TASKS_ID triggers tqStreamTasksScanWal(); any other id runs the named
// task when its status is normal, and afterwards tqStartStreamTasks() is invoked.
//
// Returns 0 when the request was handled, -1 when the task cannot be found.
int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
  SStreamTaskRunReq* pRunReq = pMsg->pCont;

  int32_t taskId = pRunReq->taskId;
  int32_t vgId = TD_VID(pTq->pVnode);

  // all tasks are extracted submit data from the wal
  if (taskId == WAL_READ_TASKS_ID) {
    tqStreamTasksScanWal(pTq);
    return 0;
  }

  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);
  if (pTask == NULL) {
    tqError("vgId:%d failed to found s-task, taskId:%d", vgId, taskId);
    return -1;
  }

  if (pTask->status.taskStatus != TASK_STATUS__NORMAL) {
    // a task that should pause has its sched status moved from waiting back to inactive
    if (streamTaskShouldPause(&pTask->status)) {
      atomic_val_compare_exchange_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__WAITING, TASK_SCHED_STATUS__INACTIVE);
    }
    tqDebug("vgId:%d s-task:%s ignore run req since not in ready state", vgId, pTask->id.idStr);
  } else {
    tqDebug("vgId:%d s-task:%s start to process block from wal, last chk point:%" PRId64, vgId,
            pTask->id.idStr, pTask->chkInfo.version);
    streamProcessRunReq(pTask);
  }

  streamMetaReleaseTask(pTq->pStreamMeta, pTask);
  tqStartStreamTasks(pTq);
  return 0;
}

L
Liu Jicong 已提交
1242
// Handle a dispatch request: decode it, find the destination task and pass the request to
// streamProcessDispatchMsg().
//
// pTq:  the tq instance whose stream meta is searched
// pMsg: rpc message; the encoded request follows a SMsgHead in pMsg->pCont
// exec: forwarded unchanged to streamProcessDispatchMsg()
//
// Returns 0 when the request was handed to the task; -1 when decoding fails or the task is not found. In both
// failure cases the decoded request is released with tDeleteStreamDispatchReq().
int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec) {
  char*   msgStr = pMsg->pCont;
  char*   msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
  int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);

  SStreamDispatchReq req = {0};

  SDecoder decoder;
  tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
  int32_t decodeCode = tDecodeStreamDispatchReq(&decoder, &req);
  // release the decoder as soon as decoding is done, as vnodeEnqueueStreamMsg() does
  tDecoderClear(&decoder);
  if (decodeCode < 0) {
    tDeleteStreamDispatchReq(&req);
    return -1;
  }

  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.taskId);
  if (pTask) {
    SRpcMsg rsp = {.info = pMsg->info, .code = 0};
    streamProcessDispatchMsg(pTask, &req, &rsp, exec);
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
    return 0;
  } else {
    tDeleteStreamDispatchReq(&req);
    return -1;
  }
}

L
Liu Jicong 已提交
1265 1266
// Handle a dispatch response: the response body follows a SMsgHead in pMsg->pCont and names the upstream task
// (in network byte order) that streamProcessDispatchRsp() is invoked on.
//
// Returns 0 when the task was found and the response processed, -1 otherwise.
int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) {
  SStreamDispatchRsp* pDispatchRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t             upstreamTaskId = ntohl(pDispatchRsp->upstreamTaskId);

  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, upstreamTaskId);
  tqDebug("recv dispatch rsp, code:%x", pMsg->code);
  if (pTask == NULL) {
    return -1;
  }

  streamProcessDispatchRsp(pTask, pDispatchRsp, pMsg->code);
  streamMetaReleaseTask(pTq->pStreamMeta, pTask);
  return 0;
}
L
Liu Jicong 已提交
1278

1279
// Handle a drop request: `msg` is read as a SVDropStreamTaskReq and the task it names is removed from the stream
// meta. Always returns 0.
int32_t tqProcessTaskDropReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  SVDropStreamTaskReq* pDropReq = (SVDropStreamTaskReq*)msg;

  streamMetaRemoveTask(pTq->pStreamMeta, pDropReq->taskId);
  return 0;
}
L
Liu Jicong 已提交
1284

5
54liuyao 已提交
1285 1286
// Handle a pause request: when the named task exists, its current status is saved in status.keepTaskStatus and the
// status is switched to TASK_STATUS__PAUSE. Always returns 0, including when the task is not found.
int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  SVPauseStreamTaskReq* pPauseReq = (SVPauseStreamTaskReq*)msg;

  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pPauseReq->taskId);
  if (pTask == NULL) {
    return 0;
  }

  tqDebug("vgId:%d s-task:%s set pause flag", pTq->pStreamMeta->vgId, pTask->id.idStr);
  // remember the status to return to before overwriting it
  atomic_store_8(&pTask->status.keepTaskStatus, pTask->status.taskStatus);
  atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__PAUSE);
  streamMetaReleaseTask(pTq->pStreamMeta, pTask);
  return 0;
}

// Handle a resume request: when the named task exists, its status is restored from status.keepTaskStatus. For a
// source task with igUntreated set, the current version and the wal reader are moved to `sversion`. Afterwards
// either tqStartStreamTasks() (source task with an empty input queue) or streamSchedExec() is called.
// Always returns 0, including when the task is not found.
int32_t tqProcessTaskResumeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  SVResumeStreamTaskReq* pResumeReq = (SVResumeStreamTaskReq*)msg;

  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pResumeReq->taskId);
  if (pTask == NULL) {
    return 0;
  }

  atomic_store_8(&pTask->status.taskStatus, pTask->status.keepTaskStatus);

  // no lock needs to secure the access of the version
  if (!(pResumeReq->igUntreated && pTask->taskLevel == TASK_LEVEL__SOURCE)) {
    // from the previous paused version and go on
    tqDebug("vgId:%d s-task:%s resume to normal from paused ver:%" PRId64 ", vnode ver:%" PRId64 ", schedStatus:%d", pTq->pStreamMeta->vgId,
            pTask->id.idStr, pTask->chkInfo.currentVer, sversion, pTask->status.schedStatus);
  } else {
    // discard all the data  when the stream task is suspended.
    pTask->chkInfo.currentVer = sversion;
    walReaderSeekVer(pTask->exec.pWalReader, sversion);
    tqDebug("vgId:%d s-task:%s resume to normal from the latest version:%" PRId64 ", vnode ver:%" PRId64 ", schedStatus:%d", pTq->pStreamMeta->vgId,
            pTask->id.idStr, pTask->chkInfo.currentVer, sversion, pTask->status.schedStatus);
  }

  if (pTask->taskLevel == TASK_LEVEL__SOURCE && taosQueueItemSize(pTask->inputQueue->queue) == 0) {
    tqStartStreamTasks(pTq);
  } else {
    streamSchedExec(pTask);
  }

  streamMetaReleaseTask(pTq->pStreamMeta, pTask);
  return 0;
}

L
Liu Jicong 已提交
1325 1326 1327 1328 1329 1330
// Handle a retrieve request: decode it, and when the destination task exists pass the request to
// streamProcessRetrieveReq(). The decoded request is released with tDeleteStreamRetrieveReq() in every case.
//
// Returns 0 when the task was found, -1 otherwise.
int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg) {
  char*   payload = POINTER_SHIFT((char*)pMsg->pCont, sizeof(SMsgHead));
  int32_t payloadLen = pMsg->contLen - sizeof(SMsgHead);

  SStreamRetrieveReq req;
  SDecoder           decoder;
  tDecoderInit(&decoder, (uint8_t*)payload, payloadLen);
  tDecodeStreamRetrieveReq(&decoder, &req);
  tDecoderClear(&decoder);

  int32_t      ret = -1;
  int32_t      taskId = req.dstTaskId;
  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);
  if (pTask != NULL) {
    SRpcMsg rsp = {.info = pMsg->info, .code = 0};
    streamProcessRetrieveReq(pTask, &req, &rsp);
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
    ret = 0;
  }

  tDeleteStreamRetrieveReq(&req);
  return ret;
}

// Handler for the retrieve response. No processing is performed; the call always reports success.
int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg) {
  (void)pTq;
  (void)pMsg;
  return 0;
}
L
Liu Jicong 已提交
1352

1353 1354 1355 1356 1357 1358
// Decode a dispatch request and hand it to its destination task without executing it immediately
// (streamProcessDispatchMsg() is called with exec == false).
//
// On success the message content and the queue item are freed and 0 is returned.
// On failure (decode error or unknown task) an error response carrying a SStreamDispatchRsp is sent when
// pMsg->info.handle is set, the message is freed, and -1 is returned. When pMsg->info.handle is NULL, -1 is
// returned right away and nothing is sent or freed here.
int32_t vnodeEnqueueStreamMsg(SVnode* pVnode, SRpcMsg* pMsg) {
  STQ*      pTq = pVnode->pTq;
  SMsgHead* msgStr = pMsg->pCont;
  char*     msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
  int32_t   msgLen = pMsg->contLen - sizeof(SMsgHead);
  int32_t   code = 0;

  // zero-initialised: the FAIL path below reads fields of req even when decoding did not fill them in
  SStreamDispatchReq req = {0};
  SDecoder           decoder;
  tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
  if (tDecodeStreamDispatchReq(&decoder, &req) < 0) {
    code = TSDB_CODE_MSG_DECODE_ERROR;
    tDecoderClear(&decoder);
    goto FAIL;
  }
  tDecoderClear(&decoder);

  int32_t taskId = req.taskId;

  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);
  if (pTask) {
    SRpcMsg rsp = {.info = pMsg->info, .code = 0};
    streamProcessDispatchMsg(pTask, &req, &rsp, false);
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
    rpcFreeCont(pMsg->pCont);
    taosFreeQitem(pMsg);
    return 0;
  } else {
    tDeleteStreamDispatchReq(&req);
  }

  code = TSDB_CODE_STREAM_TASK_NOT_EXIST;

FAIL:
  if (pMsg->info.handle == NULL) return -1;

  SMsgHead* pRspHead = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamDispatchRsp));
  if (pRspHead == NULL) {
    // no room for a response body: send a bare out-of-memory response instead
    SRpcMsg rsp = {.code = TSDB_CODE_OUT_OF_MEMORY, .info = pMsg->info};
    tqDebug("send dispatch error rsp, code: %x", code);
    tmsgSendRsp(&rsp);
    rpcFreeCont(pMsg->pCont);
    taosFreeQitem(pMsg);
    return -1;
  }

  // response fields are sent in network byte order
  pRspHead->vgId = htonl(req.upstreamNodeId);
  SStreamDispatchRsp* pRsp = POINTER_SHIFT(pRspHead, sizeof(SMsgHead));
  pRsp->streamId = htobe64(req.streamId);
  pRsp->upstreamTaskId = htonl(req.upstreamTaskId);
  pRsp->upstreamNodeId = htonl(req.upstreamNodeId);
  pRsp->downstreamNodeId = htonl(pVnode->config.vgId);
  pRsp->downstreamTaskId = htonl(req.taskId);
  pRsp->inputStatus = TASK_OUTPUT_STATUS__NORMAL;

  SRpcMsg rsp = {
      .code = code, .info = pMsg->info, .contLen = sizeof(SMsgHead) + sizeof(SStreamDispatchRsp), .pCont = pRspHead};
  tqDebug("send dispatch error rsp, code: %x", code);
  tmsgSendRsp(&rsp);
  rpcFreeCont(pMsg->pCont);
  taosFreeQitem(pMsg);
  return -1;
}
L
Liu Jicong 已提交
1416

1417
// Returns 1 when `sversion` does not exceed pTq->walLogLastVer, otherwise 0.
int32_t tqCheckLogInWal(STQ* pTq, int64_t sversion) {
  if (sversion <= pTq->walLogLastVer) {
    return 1;
  }
  return 0;
}
1418

1419
// Request a wal scan for the stream tasks of this vnode by putting a TDMT_STREAM_TASK_RUN message addressed to
// WAL_READ_TASKS_ID on the stream queue. Runs under the stream meta write lock.
//
// pMeta->walScanCounter is incremented on every call that finds at least one task; only the call that raises it
// to 1 creates a message, later calls just leave the incremented counter behind.
//
// Returns 0 when there is nothing to do or the message was queued, -1 when the message cannot be allocated
// (terrno is set to TSDB_CODE_OUT_OF_MEMORY).
int32_t tqStartStreamTasks(STQ* pTq) {
  int32_t      vgId = TD_VID(pTq->pVnode);
  SStreamMeta* pMeta = pTq->pStreamMeta;

  taosWLockLatch(&pMeta->lock);

  int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
  if (numOfTasks == 0) {
    tqInfo("vgId:%d no stream tasks exist", vgId);
    taosWUnLockLatch(&pMeta->lock);
    return 0;
  }

  pMeta->walScanCounter += 1;

  if (pMeta->walScanCounter > 1) {
    tqDebug("vgId:%d wal read task has been launched, remain scan times:%d", vgId, pMeta->walScanCounter);
    taosWUnLockLatch(&pMeta->lock);
    return 0;
  }

  SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq));
  if (pRunReq == NULL) {
    // nothing was queued, so take back the increment; otherwise every later call would see a counter
    // greater than 1 and never create the message
    pMeta->walScanCounter -= 1;
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    tqError("vgId:%d failed to create msg to start wal scanning to launch stream tasks, code:%s", vgId, terrstr());
    taosWUnLockLatch(&pMeta->lock);
    return -1;
  }

  tqDebug("vgId:%d create msg to start wal scan to launch stream tasks, numOfTasks:%d", vgId, numOfTasks);
  pRunReq->head.vgId = vgId;
  pRunReq->streamId = 0;
  pRunReq->taskId = WAL_READ_TASKS_ID;

  SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)};
  tmsgPutToQueue(&pTq->pVnode->msgCb, STREAM_QUEUE, &msg);
  taosWUnLockLatch(&pMeta->lock);

  return 0;
}