tq.c 48.6 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
S
Shengliang Guan 已提交
14 15
 */

H
Hongze Cheng 已提交
16
#include "tq.h"
S
Shengliang Guan 已提交
17

dengyihao's avatar
dengyihao 已提交
18 19 20
// 0: not init
// 1: already inited
// 2: waiting to be inited or cleaned up
21
#define WAL_READ_TASKS_ID (-1)
22

23
static int32_t tqInitialize(STQ* pTq);
dengyihao's avatar
dengyihao 已提交
24

wmmhello's avatar
wmmhello 已提交
25 26 27
static FORCE_INLINE bool tqIsHandleExec(STqHandle* pHandle) { return TMQ_HANDLE_STATUS_EXEC == pHandle->status; }
static FORCE_INLINE void tqSetHandleExec(STqHandle* pHandle) {pHandle->status = TMQ_HANDLE_STATUS_EXEC;}
static FORCE_INLINE void tqSetHandleIdle(STqHandle* pHandle) {pHandle->status = TMQ_HANDLE_STATUS_IDLE;}
wmmhello's avatar
wmmhello 已提交
28

L
Liu Jicong 已提交
29
int32_t tqInit() {
L
Liu Jicong 已提交
30 31 32 33 34 35
  int8_t old;
  while (1) {
    old = atomic_val_compare_exchange_8(&tqMgmt.inited, 0, 2);
    if (old != 2) break;
  }

36 37 38 39 40 41
  if (old == 0) {
    tqMgmt.timer = taosTmrInit(10000, 100, 10000, "TQ");
    if (tqMgmt.timer == NULL) {
      atomic_store_8(&tqMgmt.inited, 0);
      return -1;
    }
42 43 44
    if (streamInit() < 0) {
      return -1;
    }
L
Liu Jicong 已提交
45
    atomic_store_8(&tqMgmt.inited, 1);
46
  }
47

L
Liu Jicong 已提交
48 49
  return 0;
}
L
Liu Jicong 已提交
50

51
void tqCleanUp() {
L
Liu Jicong 已提交
52 53 54 55 56 57 58 59
  int8_t old;
  while (1) {
    old = atomic_val_compare_exchange_8(&tqMgmt.inited, 1, 2);
    if (old != 2) break;
  }

  if (old == 1) {
    taosTmrCleanUp(tqMgmt.timer);
L
Liu Jicong 已提交
60
    streamCleanUp();
L
Liu Jicong 已提交
61 62
    atomic_store_8(&tqMgmt.inited, 0);
  }
63
}
L
Liu Jicong 已提交
64

65
static void destroyTqHandle(void* data) {
66 67
  STqHandle* pData = (STqHandle*)data;
  qDestroyTask(pData->execHandle.task);
wmmhello's avatar
wmmhello 已提交
68

69
  if (pData->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
L
Liu Jicong 已提交
70
    taosMemoryFreeClear(pData->execHandle.execCol.qmsg);
71
  } else if (pData->execHandle.subType == TOPIC_SUB_TYPE__DB) {
72
    tqReaderClose(pData->execHandle.pTqReader);
73 74
    walCloseReader(pData->pWalReader);
    taosHashCleanup(pData->execHandle.execDb.pFilterOutTbUid);
L
Liu Jicong 已提交
75
  } else if (pData->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
76
    walCloseReader(pData->pWalReader);
77
    tqReaderClose(pData->execHandle.pTqReader);
78
  }
79 80 81 82
  if(pData->msg != NULL) {
    rpcFreeCont(pData->msg->pCont);
    taosMemoryFree(pData->msg);
    pData->msg = NULL;
D
dapan1121 已提交
83
  }
L
Liu Jicong 已提交
84 85
}

86 87 88 89 90
static bool tqOffsetLessOrEqual(const STqOffset* pLeft, const STqOffset* pRight) {
  return pLeft->val.type == TMQ_OFFSET__LOG && pRight->val.type == TMQ_OFFSET__LOG &&
         pLeft->val.version <= pRight->val.version;
}

L
Liu Jicong 已提交
91
STQ* tqOpen(const char* path, SVnode* pVnode) {
92
  STQ* pTq = taosMemoryCalloc(1, sizeof(STQ));
L
Liu Jicong 已提交
93
  if (pTq == NULL) {
S
Shengliang Guan 已提交
94
    terrno = TSDB_CODE_OUT_OF_MEMORY;
L
Liu Jicong 已提交
95 96
    return NULL;
  }
97

98
  pTq->path = taosStrdup(path);
L
Liu Jicong 已提交
99
  pTq->pVnode = pVnode;
L
Liu Jicong 已提交
100
  pTq->walLogLastVer = pVnode->pWal->vers.lastVer;
101

102
  pTq->pHandle = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);
103
  taosHashSetFreeFp(pTq->pHandle, destroyTqHandle);
104

105
  taosInitRWLatch(&pTq->lock);
106
  pTq->pPushMgr = taosHashInit(64, MurmurHash3_32, false, HASH_NO_LOCK);
L
Liu Jicong 已提交
107

108
  pTq->pCheckInfo = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);
L
Liu Jicong 已提交
109
  taosHashSetFreeFp(pTq->pCheckInfo, (FDelete)tDeleteSTqCheckInfo);
L
Liu Jicong 已提交
110

111 112 113 114 115 116 117
  int32_t code = tqInitialize(pTq);
  if (code != TSDB_CODE_SUCCESS) {
    tqClose(pTq);
    return NULL;
  } else {
    return pTq;
  }
118 119 120
}

int32_t tqInitialize(STQ* pTq) {
L
Liu Jicong 已提交
121
  if (tqMetaOpen(pTq) < 0) {
122
    return -1;
123 124
  }

L
Liu Jicong 已提交
125 126
  pTq->pOffsetStore = tqOffsetOpen(pTq);
  if (pTq->pOffsetStore == NULL) {
127
    return -1;
128 129
  }

130
  pTq->pStreamMeta = streamMetaOpen(pTq->path, pTq, (FTaskExpand*)tqExpandTask, pTq->pVnode->config.vgId);
L
Liu Jicong 已提交
131
  if (pTq->pStreamMeta == NULL) {
132
    return -1;
L
Liu Jicong 已提交
133 134
  }

135 136
  // the version is kept in task's meta data
  // todo check if this version is required or not
137 138
  if (streamLoadTasks(pTq->pStreamMeta, walGetCommittedVer(pTq->pVnode->pWal)) < 0) {
    return -1;
L
Liu Jicong 已提交
139 140
  }

141
  return 0;
L
Liu Jicong 已提交
142
}
L
Liu Jicong 已提交
143

L
Liu Jicong 已提交
144
void tqClose(STQ* pTq) {
145 146
  if (pTq == NULL) {
    return;
H
Hongze Cheng 已提交
147
  }
148 149 150 151 152 153 154 155 156

  tqOffsetClose(pTq->pOffsetStore);
  taosHashCleanup(pTq->pHandle);
  taosHashCleanup(pTq->pPushMgr);
  taosHashCleanup(pTq->pCheckInfo);
  taosMemoryFree(pTq->path);
  tqMetaClose(pTq);
  streamMetaClose(pTq->pStreamMeta);
  taosMemoryFree(pTq);
L
Liu Jicong 已提交
157
}
L
Liu Jicong 已提交
158

H
Haojun Liao 已提交
159 160 161 162 163 164 165 166 167 168 169 170 171
void tqNotifyClose(STQ* pTq) {
  if (pTq != NULL) {
    taosWLockLatch(&pTq->pStreamMeta->lock);

    void* pIter = NULL;
    while (1) {
      pIter = taosHashIterate(pTq->pStreamMeta->pTasks, pIter);
      if (pIter == NULL) {
        break;
      }

      SStreamTask* pTask = *(SStreamTask**)pIter;
      tqDebug("vgId:%d s-task:%s set dropping flag", pTq->pStreamMeta->vgId, pTask->id.idStr);
172 173 174
      pTask->status.taskStatus = TASK_STATUS__STOP;

      int64_t st = taosGetTimestampMs();
H
Haojun Liao 已提交
175
      qKillTask(pTask->exec.pExecutor, TSDB_CODE_SUCCESS);
176
      int64_t el = taosGetTimestampMs() - st;
H
Haojun Liao 已提交
177
      tqDebug("vgId:%d s-task:%s is closed in %" PRId64 " ms", pTq->pStreamMeta->vgId, pTask->id.idStr, el);
H
Haojun Liao 已提交
178 179 180 181 182 183
    }

    taosWUnLockLatch(&pTq->pStreamMeta->lock);
  }
}

D
dapan1121 已提交
184 185
static int32_t doSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* pRsp, int32_t epoch,
                             int64_t consumerId, int32_t type) {
L
Liu Jicong 已提交
186 187
  int32_t len = 0;
  int32_t code = 0;
D
dapan1121 已提交
188 189

  if (type == TMQ_MSG_TYPE__POLL_RSP) {
H
Haojun Liao 已提交
190
    tEncodeSize(tEncodeMqDataRsp, pRsp, len, code);
D
dapan1121 已提交
191 192 193
  } else if (type == TMQ_MSG_TYPE__TAOSX_RSP) {
    tEncodeSize(tEncodeSTaosxRsp, (STaosxRsp*)pRsp, len, code);
  }
L
Liu Jicong 已提交
194 195 196 197 198 199 200 201 202 203 204

  if (code < 0) {
    return -1;
  }

  int32_t tlen = sizeof(SMqRspHead) + len;
  void*   buf = rpcMallocCont(tlen);
  if (buf == NULL) {
    return -1;
  }

D
dapan1121 已提交
205 206 207
  ((SMqRspHead*)buf)->mqMsgType = type;
  ((SMqRspHead*)buf)->epoch = epoch;
  ((SMqRspHead*)buf)->consumerId = consumerId;
L
Liu Jicong 已提交
208 209 210 211 212 213

  void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));

  SEncoder encoder = {0};
  tEncoderInit(&encoder, abuf, len);

D
dapan1121 已提交
214
  if (type == TMQ_MSG_TYPE__POLL_RSP) {
H
Haojun Liao 已提交
215
    tEncodeMqDataRsp(&encoder, pRsp);
D
dapan1121 已提交
216
  } else if (type == TMQ_MSG_TYPE__TAOSX_RSP) {
X
Xiaoyu Wang 已提交
217
    tEncodeSTaosxRsp(&encoder, (STaosxRsp*)pRsp);
dengyihao's avatar
dengyihao 已提交
218
  }
L
Liu Jicong 已提交
219

wmmhello's avatar
wmmhello 已提交
220
  tEncoderClear(&encoder);
L
Liu Jicong 已提交
221 222

  SRpcMsg rsp = {
D
dapan1121 已提交
223
      .info = *pRpcHandleInfo,
L
Liu Jicong 已提交
224 225 226 227
      .pCont = buf,
      .contLen = tlen,
      .code = 0,
  };
L
Liu Jicong 已提交
228

L
Liu Jicong 已提交
229
  tmsgSendRsp(&rsp);
L
Liu Jicong 已提交
230 231
  return 0;
}
L
Liu Jicong 已提交
232

H
Haojun Liao 已提交
233
int32_t tqPushDataRsp(STqHandle* pHandle, int32_t vgId) {
234 235 236 237
  SMqDataRsp dataRsp = {0};
  dataRsp.head.consumerId = pHandle->consumerId;
  dataRsp.head.epoch = pHandle->epoch;
  dataRsp.head.mqMsgType = TMQ_MSG_TYPE__POLL_RSP;
238 239

  int64_t sver = 0, ever = 0;
H
Haojun Liao 已提交
240 241
  walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever);
  tqDoSendDataRsp(&pHandle->msg->info, &dataRsp, pHandle->epoch, pHandle->consumerId, TMQ_MSG_TYPE__POLL_RSP, sver, ever);
D
dapan1121 已提交
242 243 244

  char buf1[80] = {0};
  char buf2[80] = {0};
245 246
  tFormatOffset(buf1, tListLen(buf1), &dataRsp.reqOffset);
  tFormatOffset(buf2, tListLen(buf2), &dataRsp.rspOffset);
D
dapan1121 已提交
247
  tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) push rsp, block num: %d, req:%s, rsp:%s",
H
Haojun Liao 已提交
248
          vgId, dataRsp.head.consumerId, dataRsp.head.epoch, dataRsp.blockNum, buf1, buf2);
L
Liu Jicong 已提交
249 250 251
  return 0;
}

252 253 254 255 256 257
int32_t tqSendDataRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp,
                      int32_t type, int32_t vgId) {
  int64_t sver = 0, ever = 0;
  walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever);

  tqDoSendDataRsp(&pMsg->info, pRsp, pReq->epoch, pReq->consumerId, type, sver, ever);
258

D
dapan1121 已提交
259 260 261 262
  char buf1[80] = {0};
  char buf2[80] = {0};
  tFormatOffset(buf1, 80, &pRsp->reqOffset);
  tFormatOffset(buf2, 80, &pRsp->rspOffset);
263

X
Xiaoyu Wang 已提交
264
  tqDebug("vgId:%d consumer:0x%" PRIx64 " (epoch %d) send rsp, block num:%d, req:%s, rsp:%s, reqId:0x%" PRIx64,
265
          vgId, pReq->consumerId, pReq->epoch, pRsp->blockNum, buf1, buf2, pReq->reqId);
266 267 268 269

  return 0;
}

270
int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
271 272
  SMqVgOffset vgOffset = {0};
  int32_t     vgId = TD_VID(pTq->pVnode);
273

X
Xiaoyu Wang 已提交
274 275
  SDecoder decoder;
  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
276
  if (tDecodeMqVgOffset(&decoder, &vgOffset) < 0) {
277 278
    return -1;
  }
279

280 281
  tDecoderClear(&decoder);

282 283 284
  STqOffset* pOffset = &vgOffset.offset;

  if (pOffset->val.type == TMQ_OFFSET__SNAPSHOT_DATA || pOffset->val.type == TMQ_OFFSET__SNAPSHOT_META) {
L
Liu Jicong 已提交
285
    tqDebug("receive offset commit msg to %s on vgId:%d, offset(type:snapshot) uid:%" PRId64 ", ts:%" PRId64,
286 287 288 289 290 291
            pOffset->subKey, vgId, pOffset->val.uid, pOffset->val.ts);
  } else if (pOffset->val.type == TMQ_OFFSET__LOG) {
    tqDebug("receive offset commit msg to %s on vgId:%d, offset(type:log) version:%" PRId64, pOffset->subKey, vgId,
            pOffset->val.version);
    if (pOffset->val.version + 1 == sversion) {
      pOffset->val.version += 1;
292
    }
293
  } else {
294
    tqError("invalid commit offset type:%d", pOffset->val.type);
295
    return -1;
296
  }
297

298 299
  STqOffset* pSavedOffset = tqOffsetRead(pTq->pOffsetStore, pOffset->subKey);
  if (pSavedOffset != NULL && tqOffsetLessOrEqual(pOffset, pSavedOffset)) {
300
    tqDebug("not update the offset, vgId:%d sub:%s since committed:%" PRId64 " less than/equal to existed:%" PRId64,
301
            vgId, pOffset->subKey, pOffset->val.version, pSavedOffset->val.version);
302
    return 0;  // no need to update the offset value
303 304
  }

305
  // save the new offset value
306
  if (tqOffsetWrite(pTq->pOffsetStore, pOffset) < 0) {
307
    return -1;
308
  }
309

310 311 312
  return 0;
}

313
int32_t tqProcessSeekReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
314 315
  SMqVgOffset vgOffset = {0};
  int32_t     vgId = TD_VID(pTq->pVnode);
316 317 318

  SDecoder decoder;
  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
319
  if (tDecodeMqVgOffset(&decoder, &vgOffset) < 0) {
H
Haojun Liao 已提交
320
    tqError("vgId:%d failed to decode seek msg", vgId);
321 322 323 324 325
    return -1;
  }

  tDecoderClear(&decoder);

H
Haojun Liao 已提交
326 327 328
  tqDebug("topic:%s, vgId:%d process offset seek by consumer:0x%" PRIx64 ", req offset:%" PRId64,
          vgOffset.offset.subKey, vgId, vgOffset.consumerId, vgOffset.offset.val.version);

329 330 331
  STqOffset* pOffset = &vgOffset.offset;
  if (pOffset->val.type != TMQ_OFFSET__LOG) {
    tqError("vgId:%d, subKey:%s invalid seek offset type:%d", vgId, pOffset->subKey, pOffset->val.type);
332 333 334
    return -1;
  }

335 336 337 338 339 340
  STqHandle* pHandle = taosHashGet(pTq->pHandle, pOffset->subKey, strlen(pOffset->subKey));
  if (pHandle == NULL) {
    tqError("tmq seek: consumer:0x%" PRIx64 " vgId:%d subkey %s not found", vgOffset.consumerId, vgId,
        pOffset->subKey);
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
341 342
  }

343 344 345 346 347 348 349 350
  // 2. check consumer-vg assignment status
  taosRLockLatch(&pTq->lock);
  if (pHandle->consumerId != vgOffset.consumerId) {
    tqDebug("ERROR tmq seek: consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64,
            vgOffset.consumerId, vgId, pOffset->subKey, pHandle->consumerId);
    terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
    taosRUnLockLatch(&pTq->lock);
    return -1;
351
  }
352 353 354 355 356 357 358 359 360
  taosRUnLockLatch(&pTq->lock);

  //3. check the offset info
  STqOffset* pSavedOffset = tqOffsetRead(pTq->pOffsetStore, pOffset->subKey);
  if (pSavedOffset != NULL) {
    if (pSavedOffset->val.type != TMQ_OFFSET__LOG) {
      tqError("invalid saved offset type, vgId:%d sub:%s", vgId, pOffset->subKey);
      return 0;  // no need to update the offset value
    }
361

362 363 364 365 366 367
    if (pSavedOffset->val.version == pOffset->val.version) {
      tqDebug("vgId:%d subKey:%s no need to seek to %" PRId64 " prev offset:%" PRId64, vgId, pOffset->subKey,
              pOffset->val.version, pSavedOffset->val.version);
      return 0;
    }
  }
368 369 370

  int64_t sver = 0, ever = 0;
  walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever);
371 372 373 374
  if (pOffset->val.version < sver) {
    pOffset->val.version = sver;
  } else if (pOffset->val.version > ever) {
    pOffset->val.version = ever;
375 376 377
  }

  // save the new offset value
378 379 380 381 382 383
  if (pSavedOffset != NULL) {
    tqDebug("vgId:%d sub:%s seek to:%" PRId64 " prev offset:%" PRId64, vgId, pOffset->subKey, pOffset->val.version,
            pSavedOffset->val.version);
  } else {
    tqDebug("vgId:%d sub:%s seek to:%"PRId64" not saved yet", vgId, pOffset->subKey, pOffset->val.version);
  }
384

385 386
  if (tqOffsetWrite(pTq->pOffsetStore, pOffset) < 0) {
    tqError("failed to save offset, vgId:%d sub:%s seek to %" PRId64, vgId, pOffset->subKey, pOffset->val.version);
387 388 389
    return -1;
  }

H
Haojun Liao 已提交
390 391 392
  tqDebug("topic:%s, vgId:%d consumer:0x%" PRIx64 " offset is update to:%" PRId64, vgOffset.offset.subKey, vgId,
          vgOffset.consumerId, vgOffset.offset.val.version);

393 394 395
  return 0;
}

L
Liu Jicong 已提交
396
int32_t tqCheckColModifiable(STQ* pTq, int64_t tbUid, int32_t colId) {
L
Liu Jicong 已提交
397
  void* pIter = NULL;
398

L
Liu Jicong 已提交
399
  while (1) {
400
    pIter = taosHashIterate(pTq->pCheckInfo, pIter);
401 402 403 404
    if (pIter == NULL) {
      break;
    }

405
    STqCheckInfo* pCheck = (STqCheckInfo*)pIter;
406

L
Liu Jicong 已提交
407 408
    if (pCheck->ntbUid == tbUid) {
      int32_t sz = taosArrayGetSize(pCheck->colIdList);
L
Liu Jicong 已提交
409
      for (int32_t i = 0; i < sz; i++) {
L
Liu Jicong 已提交
410 411
        int16_t forbidColId = *(int16_t*)taosArrayGet(pCheck->colIdList, i);
        if (forbidColId == colId) {
412
          taosHashCancelIterate(pTq->pCheckInfo, pIter);
L
Liu Jicong 已提交
413 414 415 416 417
          return -1;
        }
      }
    }
  }
418

L
Liu Jicong 已提交
419 420 421
  return 0;
}

D
dapan1121 已提交
422
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
X
Xiaoyu Wang 已提交
423
  SMqPollReq req = {0};
D
dapan1121 已提交
424 425 426 427 428 429 430 431 432 433 434
  if (tDeserializeSMqPollReq(pMsg->pCont, pMsg->contLen, &req) < 0) {
    tqError("tDeserializeSMqPollReq %d failed", pMsg->contLen);
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  int64_t      consumerId = req.consumerId;
  int32_t      reqEpoch = req.epoch;
  STqOffsetVal reqOffset = req.reqOffset;
  int32_t      vgId = TD_VID(pTq->pVnode);

wmmhello's avatar
wmmhello 已提交
435
  taosWLockLatch(&pTq->lock);
D
dapan1121 已提交
436 437 438 439 440
  // 1. find handle
  STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
  if (pHandle == NULL) {
    tqError("tmq poll: consumer:0x%" PRIx64 " vgId:%d subkey %s not found", consumerId, vgId, req.subKey);
    terrno = TSDB_CODE_INVALID_MSG;
wmmhello's avatar
wmmhello 已提交
441
    taosWUnLockLatch(&pTq->lock);
D
dapan1121 已提交
442 443 444
    return -1;
  }

wmmhello's avatar
wmmhello 已提交
445 446 447 448 449
  while (tqIsHandleExec(pHandle)) {
    tqDebug("tmq poll: consumer:0x%" PRIx64 "vgId:%d, topic:%s, subscription is executing, wait for 5ms and retry", consumerId, vgId, req.subKey);
    taosMsleep(5);
  }

D
dapan1121 已提交
450 451 452
  // 2. check re-balance status
  if (pHandle->consumerId != consumerId) {
    tqDebug("ERROR tmq poll: consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64,
453
            consumerId, vgId, req.subKey, pHandle->consumerId);
D
dapan1121 已提交
454
    terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
455
    taosWUnLockLatch(&pTq->lock);
D
dapan1121 已提交
456 457
    return -1;
  }
458 459
  tqSetHandleExec(pHandle);
  taosWUnLockLatch(&pTq->lock);
D
dapan1121 已提交
460 461 462 463

  // 3. update the epoch value
  int32_t savedEpoch = pHandle->epoch;
  if (savedEpoch < reqEpoch) {
X
Xiaoyu Wang 已提交
464 465
    tqDebug("tmq poll: consumer:0x%" PRIx64 " epoch update from %d to %d by poll req", consumerId, savedEpoch,
            reqEpoch);
D
dapan1121 已提交
466 467 468 469 470 471 472 473
    pHandle->epoch = reqEpoch;
  }

  char buf[80];
  tFormatOffset(buf, 80, &reqOffset);
  tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey %s, recv poll req vgId:%d, req:%s, reqId:0x%" PRIx64,
          consumerId, req.epoch, pHandle->subKey, vgId, buf, req.reqId);

474 475 476
  int code = tqExtractDataForMq(pTq, pHandle, &req, pMsg);
  tqSetHandleIdle(pHandle);
  return code;
D
dapan1121 已提交
477 478
}

479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512
int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) {
  SMqPollReq req = {0};
  if (tDeserializeSMqPollReq(pMsg->pCont, pMsg->contLen, &req) < 0) {
    tqError("tDeserializeSMqPollReq %d failed", pMsg->contLen);
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  int64_t      consumerId = req.consumerId;
  STqOffsetVal reqOffset = req.reqOffset;
  int32_t      vgId = TD_VID(pTq->pVnode);

  // 1. find handle
  STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
  if (pHandle == NULL) {
    tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s not found", consumerId, vgId, req.subKey);
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  // 2. check re-balance status
  taosRLockLatch(&pTq->lock);
  if (pHandle->consumerId != consumerId) {
    tqDebug("ERROR consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64,
            consumerId, vgId, req.subKey, pHandle->consumerId);
    terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
    taosRUnLockLatch(&pTq->lock);
    return -1;
  }
  taosRUnLockLatch(&pTq->lock);

  int64_t sver = 0, ever = 0;
  walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever);

513 514
  int64_t currentVer = walReaderGetCurrentVer(pHandle->execHandle.pTqReader->pWalReader);

515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538
  SMqDataRsp dataRsp = {0};
  tqInitDataRsp(&dataRsp, &req);

  STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, req.subKey);
  if (pOffset != NULL) {
    if (pOffset->val.type != TMQ_OFFSET__LOG) {
      tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s use snapshot, no valid wal info", consumerId, vgId, req.subKey);
      terrno = TSDB_CODE_INVALID_PARA;
      tDeleteMqDataRsp(&dataRsp);
      return -1;
    }

    dataRsp.rspOffset.type = TMQ_OFFSET__LOG;
    dataRsp.rspOffset.version = pOffset->val.version;
  } else {
    if (req.useSnapshot == true) {
      tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s snapshot not support wal info", consumerId, vgId, req.subKey);
      terrno = TSDB_CODE_INVALID_PARA;
      tDeleteMqDataRsp(&dataRsp);
      return -1;
    }

    dataRsp.rspOffset.type = TMQ_OFFSET__LOG;

539 540 541 542
    if (reqOffset.type == TMQ_OFFSET__LOG) {
      dataRsp.rspOffset.version = currentVer; // return current consume offset value
    } else if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEAST) {
      dataRsp.rspOffset.version = sver;  // not consume yet, set the earliest position
543 544 545 546 547 548 549 550 551 552 553 554 555 556 557
    } else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) {
      dataRsp.rspOffset.version = ever;
    } else {
      tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s invalid offset type:%d", consumerId, vgId, req.subKey,
              reqOffset.type);
      terrno = TSDB_CODE_INVALID_PARA;
      tDeleteMqDataRsp(&dataRsp);
      return -1;
    }
  }

  tqDoSendDataRsp(&pMsg->info, &dataRsp, req.epoch, req.consumerId, TMQ_MSG_TYPE__WALINFO_RSP, sver, ever);
  return 0;
}

558
int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
L
Liu Jicong 已提交
559
  SMqVDeleteReq* pReq = (SMqVDeleteReq*)msg;
560
  int32_t vgId = TD_VID(pTq->pVnode);
L
Liu Jicong 已提交
561

562
  tqDebug("vgId:%d, tq process delete sub req %s", vgId, pReq->subKey);
wmmhello's avatar
wmmhello 已提交
563
  int32_t code = 0;
L
Liu Jicong 已提交
564

wmmhello's avatar
wmmhello 已提交
565
  taosWLockLatch(&pTq->lock);
L
Liu Jicong 已提交
566 567
  STqHandle* pHandle = taosHashGet(pTq->pHandle, pReq->subKey, strlen(pReq->subKey));
  if (pHandle) {
wmmhello's avatar
wmmhello 已提交
568
    while (tqIsHandleExec(pHandle)) {
569 570 571
      tqDebug("vgId:%d, topic:%s, subscription is executing, wait for 5ms and retry", vgId, pHandle->subKey);
      taosMsleep(5);
    }
572

L
Liu Jicong 已提交
573 574 575
    if (pHandle->pRef) {
      walCloseRef(pTq->pVnode->pWal, pHandle->pRef->refId);
    }
576

L
Liu Jicong 已提交
577 578 579 580
    code = taosHashRemove(pTq->pHandle, pReq->subKey, strlen(pReq->subKey));
    if (code != 0) {
      tqError("cannot process tq delete req %s, since no such handle", pReq->subKey);
    }
L
Liu Jicong 已提交
581
  }
582

L
Liu Jicong 已提交
583 584
  code = tqOffsetDelete(pTq->pOffsetStore, pReq->subKey);
  if (code != 0) {
585
    tqError("cannot process tq delete req %s, since no such offset in cache", pReq->subKey);
L
Liu Jicong 已提交
586
  }
L
Liu Jicong 已提交
587

L
Liu Jicong 已提交
588
  if (tqMetaDeleteHandle(pTq, pReq->subKey) < 0) {
L
Liu Jicong 已提交
589
    tqError("cannot process tq delete req %s, since no such offset in tdb", pReq->subKey);
590
  }
wmmhello's avatar
wmmhello 已提交
591 592
  taosWUnLockLatch(&pTq->lock);

L
Liu Jicong 已提交
593
  return 0;
L
Liu Jicong 已提交
594 595
}

596
int32_t tqProcessAddCheckInfoReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
597 598
  STqCheckInfo info = {0};
  SDecoder     decoder;
X
Xiaoyu Wang 已提交
599
  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
600
  if (tDecodeSTqCheckInfo(&decoder, &info) < 0) {
L
Liu Jicong 已提交
601 602 603 604
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  tDecoderClear(&decoder);
605 606 607 608 609
  if (taosHashPut(pTq->pCheckInfo, info.topic, strlen(info.topic), &info, sizeof(STqCheckInfo)) < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  if (tqMetaSaveCheckInfo(pTq, info.topic, msg, msgLen) < 0) {
L
Liu Jicong 已提交
610 611 612 613 614 615
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  return 0;
}

616
int32_t tqProcessDelCheckInfoReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
617 618 619 620 621 622 623 624 625 626 627
  if (taosHashRemove(pTq->pCheckInfo, msg, strlen(msg)) < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  if (tqMetaDeleteCheckInfo(pTq, msg) < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  return 0;
}

628
int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
629
  int ret = 0;
L
Liu Jicong 已提交
630
  SMqRebVgReq req = {0};
L
Liu Jicong 已提交
631
  tDecodeSMqRebVgReq(msg, &req);
L
Liu Jicong 已提交
632

D
dapan1121 已提交
633 634 635
  SVnode* pVnode = pTq->pVnode;
  int32_t vgId = TD_VID(pVnode);

636
  tqDebug("vgId:%d, tq process sub req:%s, Id:0x%" PRIx64 " -> Id:0x%" PRIx64, pVnode->config.vgId, req.subKey,
D
dapan1121 已提交
637
          req.oldConsumerId, req.newConsumerId);
L
Liu Jicong 已提交
638

wmmhello's avatar
wmmhello 已提交
639
  taosWLockLatch(&pTq->lock);
640
  STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
L
Liu Jicong 已提交
641
  if (pHandle == NULL) {
L
Liu Jicong 已提交
642
    if (req.oldConsumerId != -1) {
643
      tqError("vgId:%d, build new consumer handle %s for consumer:0x%" PRIx64 ", but old consumerId:0x%" PRIx64,
644
              req.vgId, req.subKey, req.newConsumerId, req.oldConsumerId);
L
Liu Jicong 已提交
645
    }
D
dapan1121 已提交
646

L
Liu Jicong 已提交
647
    if (req.newConsumerId == -1) {
648
      tqError("vgId:%d, tq invalid re-balance request, new consumerId %" PRId64 "", req.vgId, req.newConsumerId);
649
      goto end;
L
Liu Jicong 已提交
650
    }
D
dapan1121 已提交
651

L
Liu Jicong 已提交
652 653
    STqHandle tqHandle = {0};
    pHandle = &tqHandle;
L
Liu Jicong 已提交
654

H
Haojun Liao 已提交
655
    uint64_t oldConsumerId = pHandle->consumerId;
L
Liu Jicong 已提交
656 657 658
    memcpy(pHandle->subKey, req.subKey, TSDB_SUBSCRIBE_KEY_LEN);
    pHandle->consumerId = req.newConsumerId;
    pHandle->epoch = -1;
L
Liu Jicong 已提交
659

L
Liu Jicong 已提交
660
    pHandle->execHandle.subType = req.subType;
L
Liu Jicong 已提交
661
    pHandle->fetchMeta = req.withMeta;
wmmhello's avatar
wmmhello 已提交
662

663
    // TODO version should be assigned and refed during preprocess
D
dapan1121 已提交
664
    SWalRef* pRef = walRefCommittedVer(pVnode->pWal);
665
    if (pRef == NULL) {
666 667
      ret = -1;
      goto end;
668
    }
D
dapan1121 已提交
669

670 671
    int64_t ver = pRef->refVer;
    pHandle->pRef = pRef;
L
Liu Jicong 已提交
672

673
    SReadHandle handle = {.vnode = pVnode, .initTableReader = true, .initTqReader = true, .version = ver};
H
Haojun Liao 已提交
674 675
    initStorageAPI(&handle.api);

wmmhello's avatar
wmmhello 已提交
676
    pHandle->snapshotVer = ver;
677

L
Liu Jicong 已提交
678
    if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
L
Liu Jicong 已提交
679
      pHandle->execHandle.execCol.qmsg = req.qmsg;
L
Liu Jicong 已提交
680
      req.qmsg = NULL;
681

X
Xiaoyu Wang 已提交
682 683
      pHandle->execHandle.task = qCreateQueueExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle, vgId,
                                                          &pHandle->execHandle.numOfCols, req.newConsumerId);
L
Liu Jicong 已提交
684
      void* scanner = NULL;
685
      qExtractStreamScanner(pHandle->execHandle.task, &scanner);
686
      pHandle->execHandle.pTqReader = qExtractReaderFromStreamScanner(scanner);
L
Liu Jicong 已提交
687
    } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__DB) {
D
dapan1121 已提交
688
      pHandle->pWalReader = walOpenReader(pVnode->pWal, NULL);
689
      pHandle->execHandle.pTqReader = tqReaderOpen(pVnode);
D
dapan1121 已提交
690

L
Liu Jicong 已提交
691
      pHandle->execHandle.execDb.pFilterOutTbUid =
wmmhello's avatar
wmmhello 已提交
692
          taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
693
      buildSnapContext(handle.vnode, handle.version, 0, pHandle->execHandle.subType, pHandle->fetchMeta,
694
                       (SSnapContext**)(&handle.sContext));
695

696
      pHandle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &handle, vgId, NULL, req.newConsumerId);
L
Liu Jicong 已提交
697
    } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
D
dapan1121 已提交
698
      pHandle->pWalReader = walOpenReader(pVnode->pWal, NULL);
wmmhello's avatar
wmmhello 已提交
699 700
      pHandle->execHandle.execTb.suid = req.suid;

L
Liu Jicong 已提交
701
      SArray* tbUidList = taosArrayInit(0, sizeof(int64_t));
D
dapan1121 已提交
702 703
      vnodeGetCtbIdList(pVnode, req.suid, tbUidList);
      tqDebug("vgId:%d, tq try to get all ctb, suid:%" PRId64, pVnode->config.vgId, req.suid);
L
Liu Jicong 已提交
704 705
      for (int32_t i = 0; i < taosArrayGetSize(tbUidList); i++) {
        int64_t tbUid = *(int64_t*)taosArrayGet(tbUidList, i);
D
dapan1121 已提交
706
        tqDebug("vgId:%d, idx %d, uid:%" PRId64, vgId, i, tbUid);
L
Liu Jicong 已提交
707
      }
708
      pHandle->execHandle.pTqReader = tqReaderOpen(pVnode);
709
      tqReaderSetTbUidList(pHandle->execHandle.pTqReader, tbUidList);
L
Liu Jicong 已提交
710
      taosArrayDestroy(tbUidList);
wmmhello's avatar
wmmhello 已提交
711

712
      buildSnapContext(handle.vnode, handle.version, req.suid, pHandle->execHandle.subType, pHandle->fetchMeta,
L
Liu Jicong 已提交
713
                       (SSnapContext**)(&handle.sContext));
714
      pHandle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &handle, vgId, NULL, req.newConsumerId);
L
Liu Jicong 已提交
715
    }
H
Haojun Liao 已提交
716

717
    taosHashPut(pTq->pHandle, req.subKey, strlen(req.subKey), pHandle, sizeof(STqHandle));
dengyihao's avatar
dengyihao 已提交
718 719
    tqDebug("try to persist handle %s consumer:0x%" PRIx64 " , old consumer:0x%" PRIx64, req.subKey,
            pHandle->consumerId, oldConsumerId);
720 721
    ret = tqMetaSaveHandle(pTq, req.subKey, pHandle);
    goto end;
L
Liu Jicong 已提交
722
  } else {
wmmhello's avatar
wmmhello 已提交
723 724 725 726 727
    while (tqIsHandleExec(pHandle)) {
      tqDebug("sub req vgId:%d, topic:%s, subscription is executing, wait for 5ms and retry", vgId, pHandle->subKey);
      taosMsleep(5);
    }

D
dapan1121 已提交
728 729 730
    if (pHandle->consumerId == req.newConsumerId) {  // do nothing
      tqInfo("vgId:%d consumer:0x%" PRIx64 " remains, no switch occurs", req.vgId, req.newConsumerId);
      atomic_add_fetch_32(&pHandle->epoch, 1);
731

732 733 734
    } else {
      tqInfo("vgId:%d switch consumer from Id:0x%" PRIx64 " to Id:0x%" PRIx64, req.vgId, pHandle->consumerId,
             req.newConsumerId);
735
      atomic_store_64(&pHandle->consumerId, req.newConsumerId);
wmmhello's avatar
wmmhello 已提交
736
      atomic_store_32(&pHandle->epoch, 0);
737 738 739 740 741 742 743 744
    }
    // kill executing task
    qTaskInfo_t pTaskInfo = pHandle->execHandle.task;
    if (pTaskInfo != NULL) {
      qKillTask(pTaskInfo, TSDB_CODE_SUCCESS);
    }
    if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
      qStreamCloseTsdbReader(pTaskInfo);
L
Liu Jicong 已提交
745
    }
wmmhello's avatar
wmmhello 已提交
746 747
    // remove if it has been register in the push manager, and return one empty block to consumer
    tqUnregisterPushHandle(pTq, pHandle);
748 749
    ret = tqMetaSaveHandle(pTq, req.subKey, pHandle);
    goto end;
L
Liu Jicong 已提交
750
  }
L
Liu Jicong 已提交
751

752
end:
wmmhello's avatar
wmmhello 已提交
753
  taosWUnLockLatch(&pTq->lock);
D
dapan1121 已提交
754
  taosMemoryFree(req.qmsg);
755
  return ret;
L
Liu Jicong 已提交
756
}
757

L
liuyao 已提交
758 759 760 761
// Free callback: `ptr` addresses a slot that holds a heap pointer; the pointer stored in that slot is released.
// Registered with tSimpleHashSetFreeFp() for the table-sink hash in tqExpandTask().
void freePtr(void *ptr) {
  void **ppSlot = (void **)ptr;
  taosMemoryFree(*ppSlot);
}

762
// Prepare a decoded stream task for execution on this vnode: create its input/output queues, open the state backend
// and executor (source and agg levels only), wire up the sink, and open a WAL reader for source-level tasks.
//   pTq   - tq instance; supplies the vnode, its message callbacks and the stream meta
//   pTask - task to expand; filled in place
//   ver   - initial value for chkInfo.version and chkInfo.currentVer (chkInfo.version is bumped by one before return)
// Returns 0 on success and -1 as soon as one resource cannot be created. Nothing attached to pTask before the
// failure is released by this function.
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
  int32_t vgId = TD_VID(pTq->pVnode);

  pTask->id.idStr = createStreamTaskIdStr(pTask->id.streamId, pTask->id.taskId);
  pTask->refCnt = 1;
  pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE;
  pTask->inputQueue = streamQueueOpen(512 << 10);
  pTask->outputQueue = streamQueueOpen(512 << 10);

  if (pTask->inputQueue == NULL || pTask->outputQueue == NULL) {
    return -1;
  }

  pTask->inputStatus = TASK_INPUT_STATUS__NORMAL;
  pTask->outputStatus = TASK_OUTPUT_STATUS__NORMAL;
  pTask->pMsgCb = &pTq->pVnode->msgCb;
  pTask->pMeta = pTq->pStreamMeta;
  pTask->chkInfo.version = ver;
  pTask->chkInfo.currentVer = ver;

  // a task that must fill history first waits for its downstream tasks before running normally
  pTask->status.taskStatus = (pTask->fillHistory) ? TASK_STATUS__WAIT_DOWNSTREAM : TASK_STATUS__NORMAL;

  // expand executor
  if (pTask->taskLevel == TASK_LEVEL__SOURCE) {
    pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask, false, -1, -1);
    if (pTask->pState == NULL) {
      return -1;
    }

    SReadHandle handle = {.vnode = pTq->pVnode, .initTqReader = 1, .pStateBackend = pTask->pState};
    initStorageAPI(&handle.api);

    pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, vgId);
    if (pTask->exec.pExecutor == NULL) {
      return -1;
    }

    qSetTaskId(pTask->exec.pExecutor, pTask->id.taskId, pTask->id.streamId);
  } else if (pTask->taskLevel == TASK_LEVEL__AGG) {
    pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask, false, -1, -1);
    if (pTask->pState == NULL) {
      return -1;
    }

    int32_t     numOfVgroups = (int32_t)taosArrayGetSize(pTask->childEpInfo);
    SReadHandle handle = {.vnode = NULL, .numOfVgroups = numOfVgroups, .pStateBackend = pTask->pState};
    initStorageAPI(&handle.api);

    pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, vgId);
    if (pTask->exec.pExecutor == NULL) {
      return -1;
    }

    qSetTaskId(pTask->exec.pExecutor, pTask->id.taskId, pTask->id.streamId);
  }

  // sink
  if (pTask->outputType == TASK_OUTPUT__SMA) {
    pTask->smaSink.vnode = pTq->pVnode;
    pTask->smaSink.smaSink = smaHandleRes;
  } else if (pTask->outputType == TASK_OUTPUT__TABLE) {
    pTask->tbSink.vnode = pTq->pVnode;
    pTask->tbSink.tbSinkFunc = tqSinkToTablePipeline;

    // use the schema version recorded in meta when the lookup succeeds, otherwise fall back to version 1
    int32_t   ver1 = 1;
    SMetaInfo info = {0};
    int32_t   code = metaGetInfo(pTq->pVnode->pMeta, pTask->tbSink.stbUid, &info, NULL);
    if (code == TSDB_CODE_SUCCESS) {
      ver1 = info.skmVer;
    }

    SSchemaWrapper* pschemaWrapper = pTask->tbSink.pSchemaWrapper;
    pTask->tbSink.pTSchema = tBuildTSchema(pschemaWrapper->pSchema, pschemaWrapper->nCols, ver1);
    if (pTask->tbSink.pTSchema == NULL) {
      return -1;
    }

    pTask->tbSink.pTblInfo = tSimpleHashInit(10240, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
    if (pTask->tbSink.pTblInfo == NULL) {
      // never hand a NULL hash table to tSimpleHashSetFreeFp()
      tqError("vgId:%d s-task:%s failed to create the table info hash for table sink", vgId, pTask->id.idStr);
      return -1;
    }
    tSimpleHashSetFreeFp(pTask->tbSink.pTblInfo, freePtr);
  }

  if (pTask->taskLevel == TASK_LEVEL__SOURCE) {
    SWalFilterCond cond = {.deleteMsg = 1};  // delete msg also extract from wal files
    pTask->exec.pWalReader = walOpenReader(pTq->pVnode->pWal, &cond);
    if (pTask->exec.pWalReader == NULL) {
      // the reader is later used without a NULL check (walReaderSeekVer), so fail the expansion here
      tqError("vgId:%d s-task:%s failed to open wal reader", vgId, pTask->id.idStr);
      return -1;
    }
  }

  streamSetupTrigger(pTask);

  tqInfo("vgId:%d expand stream task, s-task:%s, checkpoint ver:%" PRId64 " child id:%d, level:%d", vgId, pTask->id.idStr,
         pTask->chkInfo.version, pTask->selfChildId, pTask->taskLevel);

  // next valid version will add one
  pTask->chkInfo.version += 1;
  return 0;
}
L
Liu Jicong 已提交
856

857
// Handle a "task check" request sent by an upstream task: look up the addressed downstream task, report its status
// (0 when the task does not exist on this vnode yet) and send the encoded SStreamTaskCheckRsp back to the sender.
//   pTq  - tq instance whose stream meta is searched for the task
//   pMsg - rpc message; the payload follows an SMsgHead
// Returns 0 once the response has been handed to tmsgSendRsp(), -1 if the request cannot be decoded, the response
// cannot be encoded, or the response buffer cannot be allocated.
int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) {
  char*   msgStr = pMsg->pCont;
  char*   msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
  int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);

  // zero-initialised so that no indeterminate field is ever read
  SStreamTaskCheckReq req = {0};
  SDecoder            decoder;

  tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
  int32_t code = tDecodeSStreamTaskCheckReq(&decoder, &req);
  tDecoderClear(&decoder);
  if (code < 0) {
    tqError("vgId:%d failed to decode task check req", pTq->pStreamMeta->vgId);
    return -1;
  }

  int32_t             taskId = req.downstreamTaskId;
  SStreamTaskCheckRsp rsp = {
      .reqId = req.reqId,
      .streamId = req.streamId,
      .childId = req.childId,
      .downstreamNodeId = req.downstreamNodeId,
      .downstreamTaskId = req.downstreamTaskId,
      .upstreamNodeId = req.upstreamNodeId,
      .upstreamTaskId = req.upstreamTaskId,
  };

  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);

  if (pTask != NULL) {
    rsp.status = streamTaskCheckStatus(pTask);

    // log before dropping the reference: pTask must not be touched once it has been released
    tqDebug("s-task:%s recv task check req(reqId:0x%" PRIx64
            ") %d at node %d task status:%d, check req from task %d at node %d, rsp status %d",
            pTask->id.idStr, rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, pTask->status.taskStatus,
            rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
  } else {
    rsp.status = 0;
    tqDebug("tq recv task check(taskId:0x%x not built yet) req(reqId:0x%" PRIx64
            ") %d at node %d, check req from task:0x%x at node %d, rsp status %d",
            taskId, rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.upstreamTaskId, rsp.upstreamNodeId,
            rsp.status);
  }

  SEncoder encoder;
  int32_t  len;

  // first pass only computes the encoded size
  tEncodeSize(tEncodeSStreamTaskCheckRsp, &rsp, len, code);
  if (code < 0) {
    tqError("vgId:%d failed to encode task check rsp, task:0x%x", pTq->pStreamMeta->vgId, taskId);
    return -1;
  }

  void* buf = rpcMallocCont(sizeof(SMsgHead) + len);
  if (buf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    tqError("vgId:%d failed to alloc task check rsp, task:0x%x", pTq->pStreamMeta->vgId, taskId);
    return -1;
  }
  ((SMsgHead*)buf)->vgId = htonl(req.upstreamNodeId);

  void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
  tEncoderInit(&encoder, (uint8_t*)abuf, len);
  tEncodeSStreamTaskCheckRsp(&encoder, &rsp);
  tEncoderClear(&encoder);

  SRpcMsg rspMsg = {.code = 0, .pCont = buf, .contLen = sizeof(SMsgHead) + len, .info = pMsg->info};

  tmsgSendRsp(&rspMsg);
  return 0;
}

922
// Handle the response to a task check request: decode it, find the upstream task it is addressed to and pass the
// response to streamProcessTaskCheckRsp().
//   pTq      - tq instance whose stream meta is searched for the task
//   sversion - version forwarded unchanged to streamProcessTaskCheckRsp()
//   msg      - encoded SStreamTaskCheckRsp
//   msgLen   - length of msg in bytes
// Returns -1 when decoding fails or the upstream task is not found; otherwise the result of
// streamProcessTaskCheckRsp().
int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  SStreamTaskCheckRsp rsp;
  SDecoder            dec;

  tDecoderInit(&dec, (uint8_t*)msg, msgLen);
  int32_t ret = tDecodeSStreamTaskCheckRsp(&dec, &rsp);
  tDecoderClear(&dec);  // the decoder is done with, whether or not decoding succeeded
  if (ret < 0) {
    return -1;
  }

  tqDebug("tq recv task check rsp(reqId:0x%" PRIx64 ") %d at node %d check req from task:0x%x at node %d, status %d",
          rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);

  SStreamTask* pUpTask = streamMetaAcquireTask(pTq->pStreamMeta, rsp.upstreamTaskId);
  if (pUpTask == NULL) {
    tqError("tq failed to locate the stream task:0x%x vgId:%d, it may have been destroyed", rsp.upstreamTaskId,
            pTq->pStreamMeta->vgId);
    return -1;
  }

  ret = streamProcessTaskCheckRsp(pUpTask, &rsp, sversion);
  streamMetaReleaseTask(pTq->pStreamMeta, pUpTask);
  return ret;
}

951
// Deploy a stream task described by an encoded message: decode it into a newly allocated SStreamTask, register it
// in the stream meta under the meta write lock, and start the downstream check when the task has to fill history.
//   pTq      - tq instance that receives the task
//   sversion - version passed to streamMetaAddDeployedTask() and streamTaskCheckDownstream()
//   msg      - encoded stream task
//   msgLen   - length of msg in bytes
// Returns 0 on success or when streams are disabled (tsDisableStream), -1 on allocation, decode or registration
// failure.
int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  int32_t      vgId = TD_VID(pTq->pVnode);
  SStreamMeta* pMeta = pTq->pStreamMeta;

  if (tsDisableStream) {
    return 0;
  }

  // 1.deserialize msg and build task
  SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
  if (pTask == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    tqError("vgId:%d failed to create stream task due to out of memory, alloc size:%d", vgId, (int32_t) sizeof(SStreamTask));
    return -1;
  }

  SDecoder dec;
  tDecoderInit(&dec, (uint8_t*)msg, msgLen);
  int32_t ret = tDecodeStreamTask(&dec, pTask);
  tDecoderClear(&dec);
  if (ret < 0) {
    taosMemoryFree(pTask);
    return -1;
  }

  // 2.save task, use the newest commit version as the initial start version of stream task.
  taosWLockLatch(&pMeta->lock);
  ret = streamMetaAddDeployedTask(pMeta, sversion, pTask);
  int32_t total = streamMetaGetNumOfTasks(pMeta);
  if (ret < 0) {
    // NOTE(review): pTask is dereferenced after the add failed - confirm streamMetaAddDeployedTask() does not free
    // the task on its failure paths.
    tqError("vgId:%d failed to add s-task:%s, total:%d", vgId, pTask->id.idStr, total);
    taosWUnLockLatch(&pMeta->lock);
    return -1;
  }
  taosWUnLockLatch(&pMeta->lock);

  // 3.go through recover steps to fill history
  if (pTask->fillHistory) {
    streamTaskCheckDownstream(pTask, sversion);
  }

  tqDebug("vgId:%d s-task:%s is deployed and add meta from mnd, status:%d, total:%d", vgId, pTask->id.idStr,
          pTask->status.taskStatus, total);
  return 0;
}

L
Liu Jicong 已提交
1000 1001 1002 1003 1004
// Run recover step 1 (the non-blocking scan) for a source task and, unless the task is being dropped, post a
// step 2 request (TDMT_VND_STREAM_RECOVER_BLOCKING_STAGE) to the vnode write queue.
//   pTq  - tq instance that owns the task
//   pMsg - rpc message whose payload is an SStreamRecoverStep1Req
// Returns 0 when step 2 has been queued or the task turned out to be dropping; -1 when the task is not found, its
// checkpoint version is not positive, the step 2 request cannot be built, or the message buffer cannot be allocated.
// The task reference is held until the function is completely done with pTask.
int32_t tqProcessTaskRecover1Req(STQ* pTq, SRpcMsg* pMsg) {
  SStreamRecoverStep1Req* pReq = (SStreamRecoverStep1Req*)pMsg->pCont;
  SStreamRecoverStep2Req  req;
  int32_t                 len = sizeof(req);  // size of what is actually serialized below
  int32_t                 code = -1;

  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->taskId);
  if (pTask == NULL) {
    return -1;
  }

  // check param
  int64_t fillVer1 = pTask->chkInfo.version;
  if (fillVer1 <= 0) {
    goto end;
  }

  // do recovery step 1
  tqDebug("s-task:%s start non-blocking recover stage(step 1) scan", pTask->id.idStr);
  int64_t st = taosGetTimestampMs();

  streamSourceRecoverScanStep1(pTask);
  if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING) {
    tqDebug("s-task:%s is dropped, abort recover in step1", pTask->id.idStr);
    code = 0;
    goto end;
  }

  tqDebug("s-task:%s non-blocking recover stage(step 1) ended, elapsed time:%.2fs", pTask->id.idStr,
          (taosGetTimestampMs() - st) / 1000.0);

  // build msg to launch next step
  if (streamBuildSourceRecover2Req(pTask, &req) < 0) {
    goto end;
  }

  if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING) {
    code = 0;
    goto end;
  }

  // serialize msg
  void* serializedReq = rpcMallocCont(len);
  if (serializedReq == NULL) {
    tqError("s-task:%s failed to prepare the step2 stage, out of memory", pTask->id.idStr);
    goto end;
  }

  memcpy(serializedReq, &req, len);

  // dispatch msg
  tqDebug("s-task:%s start recover block stage", pTask->id.idStr);

  SRpcMsg rpcMsg = {
      .code = 0, .contLen = len, .msgType = TDMT_VND_STREAM_RECOVER_BLOCKING_STAGE, .pCont = serializedReq};
  tmsgPutToQueue(&pTq->pVnode->msgCb, WRITE_QUEUE, &rpcMsg);
  code = 0;

end:
  // single release point: pTask is not touched after this line
  streamMetaReleaseTask(pTq->pStreamMeta, pTask);
  return code;
}

1066
// Run recover step 2 (the blocking stage) for a source task: scan up to sversion, position the WAL reader at
// sversion, restore the task parameters, set the task status back to normal, notify the downstream tasks and
// persist the task with fillHistory cleared.
//   pTq      - tq instance that owns the task
//   sversion - version passed to streamSourceRecoverScanStep2() and used as the WAL seek position
//   msg      - payload, an SStreamRecoverStep2Req
//   msgLen   - length of msg in bytes (not used)
// Returns 0 on success or when the task is found to be dropping after the scan, -1 when the task is not found or
// any of the steps fails.
int32_t tqProcessTaskRecover2Req(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  SStreamRecoverStep2Req* pReq = (SStreamRecoverStep2Req*)msg;
  int32_t                 code = -1;

  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->taskId);
  if (pTask == NULL) {
    return -1;
  }

  // do recovery step 2
  int64_t st = taosGetTimestampMs();
  tqDebug("s-task:%s start step2 recover, ts:%"PRId64, pTask->id.idStr, st);

  if (streamSourceRecoverScanStep2(pTask, sversion) < 0) {
    goto end;
  }

  tqDebug("s-task:%s set the start wal offset to be:%"PRId64, pTask->id.idStr, sversion);
  walReaderSeekVer(pTask->exec.pWalReader, sversion);

  if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING) {
    code = 0;
    goto end;
  }

  // restore param
  if (streamRestoreParam(pTask) < 0) {
    goto end;
  }

  // set status normal
  tqDebug("s-task:%s blocking stage completed, set the status to be normal", pTask->id.idStr);
  if (streamSetStatusNormal(pTask) < 0) {
    goto end;
  }

  tqDebug("s-task:%s step2 recover finished, el:%.2fs", pTask->id.idStr, (taosGetTimestampMs() - st) / 1000.0);

  // dispatch recover finish req to all related downstream task
  if (streamDispatchRecoverFinishReq(pTask) < 0) {
    goto end;
  }

  // history is filled: clear the flag and persist the task
  atomic_store_8(&pTask->fillHistory, 0);
  streamMetaSaveTask(pTq->pStreamMeta, pTask);
  code = 0;

end:
  // every path that acquired the task leaves through this single release
  streamMetaReleaseTask(pTq->pStreamMeta, pTask);
  return code;
}

L
Liu Jicong 已提交
1126 1127 1128
// Handle a "recover finished" notification: decode the request, find the addressed task and hand the child id to
// streamProcessRecoverFinishReq().
//   pTq  - tq instance whose stream meta is searched for the task
//   pMsg - rpc message; the payload follows an SMsgHead
// Returns 0 on success, -1 when the request cannot be decoded, the task is not found, or processing fails.
int32_t tqProcessTaskRecoverFinishReq(STQ* pTq, SRpcMsg* pMsg) {
  char*   msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);

  // deserialize; zero-initialised so that no indeterminate field is ever read
  SStreamRecoverFinishReq req = {0};

  SDecoder decoder;
  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
  int32_t code = tDecodeSStreamRecoverFinishReq(&decoder, &req);
  tDecoderClear(&decoder);
  if (code < 0) {
    tqError("vgId:%d failed to decode recover finish req", pTq->pStreamMeta->vgId);
    return -1;
  }

  // find task
  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.taskId);
  if (pTask == NULL) {
    return -1;
  }

  // do process request
  code = (streamProcessRecoverFinishReq(pTask, req.childId) < 0) ? -1 : 0;

  streamMetaReleaseTask(pTq->pStreamMeta, pTask);
  return code;
}
L
Liu Jicong 已提交
1152

L
Liu Jicong 已提交
1153 1154 1155 1156 1157
// Handler for the recover-finish response. No work is done: both arguments are ignored and 0 is returned.
int32_t tqProcessTaskRecoverFinishRsp(STQ* pTq, SRpcMsg* pMsg) {
  (void)pTq;
  (void)pMsg;
  return 0;
}

1158 1159 1160 1161
// Decode an SDeleteRes from pData and turn it into a STREAM_DELETE_DATA block with one row per affected table uid,
// wrapped in a newly allocated SStreamRefDataBlock queue item.
//   pData     - encoded delete result
//   len       - length of pData in bytes
//   ver       - version stored in the block info
//   pRefBlock - out: the new queue item, or NULL when nothing was deleted or on failure
// Returns TSDB_CODE_SUCCESS (with *pRefBlock == NULL when the uid list is empty or affectedRows is 0) or
// TSDB_CODE_OUT_OF_MEMORY when the uid list or the queue item cannot be allocated.
int32_t extractDelDataBlock(const void* pData, int32_t len, int64_t ver, SStreamRefDataBlock** pRefBlock) {
  SDecoder   coder = {0};
  SDeleteRes res = {0};

  *pRefBlock = NULL;

  res.uidList = taosArrayInit(0, sizeof(tb_uid_t));
  if (res.uidList == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  tDecoderInit(&coder, (uint8_t*)pData, len);
  tDecodeDeleteRes(&coder, &res);
  tDecoderClear(&coder);

  int32_t numOfTables = taosArrayGetSize(res.uidList);
  if (numOfTables == 0 || res.affectedRows == 0) {
    taosArrayDestroy(res.uidList);
    return TSDB_CODE_SUCCESS;
  }

  // Allocate the queue item before the data block is built, so that an allocation failure leaves nothing but the
  // uid list to clean up.
  SStreamRefDataBlock* pItem = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0);
  if (pItem == NULL) {
    taosArrayDestroy(res.uidList);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  // NOTE(review): as before, the results of createSpecialDataBlock() and blockDataEnsureCapacity() are not checked.
  SSDataBlock* pDelBlock = createSpecialDataBlock(STREAM_DELETE_DATA);
  blockDataEnsureCapacity(pDelBlock, numOfTables);
  pDelBlock->info.rows = numOfTables;
  pDelBlock->info.version = ver;

  // the column handles do not change from row to row, look them up once
  SColumnInfoData* pStartCol = taosArrayGet(pDelBlock->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pEndCol = taosArrayGet(pDelBlock->pDataBlock, END_TS_COLUMN_INDEX);
  SColumnInfoData* pUidCol = taosArrayGet(pDelBlock->pDataBlock, UID_COLUMN_INDEX);
  SColumnInfoData* pGroupIdCol = taosArrayGet(pDelBlock->pDataBlock, GROUPID_COLUMN_INDEX);
  SColumnInfoData* pCalStartCol = taosArrayGet(pDelBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
  SColumnInfoData* pCalEndCol = taosArrayGet(pDelBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);

  for (int32_t i = 0; i < numOfTables; i++) {
    // start key column
    colDataSetVal(pStartCol, i, (const char*)&res.skey, false);
    // end key column
    colDataSetVal(pEndCol, i, (const char*)&res.ekey, false);
    // uid column
    int64_t* pUid = taosArrayGet(res.uidList, i);
    colDataSetVal(pUidCol, i, (const char*)pUid, false);

    // the remaining columns carry no value for a delete row
    colDataSetNULL(pGroupIdCol, i);
    colDataSetNULL(pCalStartCol, i);
    colDataSetNULL(pCalEndCol, i);
  }

  taosArrayDestroy(res.uidList);

  pItem->type = STREAM_INPUT__REF_DATA_BLOCK;
  pItem->pBlock = pDelBlock;
  *pRefBlock = pItem;
  return TSDB_CODE_SUCCESS;
}

L
Liu Jicong 已提交
1211 1212
// Handle a "run task" request. The reserved id WAL_READ_TASKS_ID triggers a WAL scan for the stream tasks; any
// other id runs the addressed task if it is in normal status and then calls tqStartStreamTasks().
//   pTq  - tq instance that owns the tasks
//   pMsg - rpc message whose payload is an SStreamTaskRunReq
// Returns 0 when the request was handled, -1 when the addressed task does not exist.
int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
  SStreamTaskRunReq* pRunReq = pMsg->pCont;

  int32_t id = pRunReq->taskId;
  int32_t vgId = TD_VID(pTq->pVnode);

  // all tasks are extracted submit data from the wal
  if (id == WAL_READ_TASKS_ID) {
    tqStreamTasksScanWal(pTq);
    return 0;
  }

  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, id);
  if (pTask == NULL) {
    tqError("vgId:%d failed to found s-task, taskId:%d", vgId, id);
    return -1;
  }

  if (pTask->status.taskStatus != TASK_STATUS__NORMAL) {
    // a task that should pause and is still waiting in the scheduler is flipped back to inactive
    if (streamTaskShouldPause(&pTask->status)) {
      atomic_val_compare_exchange_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__WAITING, TASK_SCHED_STATUS__INACTIVE);
    }
    tqDebug("vgId:%d s-task:%s ignore run req since not in ready state", vgId, pTask->id.idStr);
  } else {
    tqDebug("vgId:%d s-task:%s start to process block from wal, last chk point:%" PRId64, vgId,
            pTask->id.idStr, pTask->chkInfo.version);
    streamProcessRunReq(pTask);
  }

  streamMetaReleaseTask(pTq->pStreamMeta, pTask);
  tqStartStreamTasks(pTq);
  return 0;
}

L
Liu Jicong 已提交
1244
// Handle a dispatch request from an upstream task: decode it, find the destination task and pass the request to
// streamProcessDispatchMsg().
//   pTq  - tq instance whose stream meta is searched for the task
//   pMsg - rpc message; the payload follows an SMsgHead
//   exec - forwarded unchanged to streamProcessDispatchMsg()
// Returns 0 when the request was handed to the task, -1 when the task does not exist (the decoded request is then
// released with tDeleteStreamDispatchReq()).
int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec) {
  char*   msgStr = pMsg->pCont;
  char*   msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
  int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);

  SStreamDispatchReq req = {0};

  SDecoder decoder;
  tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
  tDecodeStreamDispatchReq(&decoder, &req);
  // release the decoder right after decoding, as vnodeEnqueueStreamMsg() does for the same request type
  tDecoderClear(&decoder);

  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.taskId);
  if (pTask == NULL) {
    tDeleteStreamDispatchReq(&req);
    return -1;
  }

  SRpcMsg rsp = {.info = pMsg->info, .code = 0};
  streamProcessDispatchMsg(pTask, &req, &rsp, exec);
  streamMetaReleaseTask(pTq->pStreamMeta, pTask);
  return 0;
}

L
Liu Jicong 已提交
1267 1268
// Handle the response to a dispatch request: find the upstream task named in the response and pass the response
// and the rpc result code to streamProcessDispatchRsp().
//   pTq  - tq instance whose stream meta is searched for the task
//   pMsg - rpc message; an SStreamDispatchRsp follows the SMsgHead, with upstreamTaskId in network byte order
// Returns 0 when the task was found, -1 otherwise.
int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) {
  SStreamDispatchRsp* pDispRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t             upTaskId = ntohl(pDispRsp->upstreamTaskId);
  SStreamTask*        pUpTask = streamMetaAcquireTask(pTq->pStreamMeta, upTaskId);

  tqDebug("recv dispatch rsp, code:%x", pMsg->code);

  if (pUpTask == NULL) {
    return -1;
  }

  streamProcessDispatchRsp(pUpTask, pDispRsp, pMsg->code);
  streamMetaReleaseTask(pTq->pStreamMeta, pUpTask);
  return 0;
}
L
Liu Jicong 已提交
1280

1281
// Handle a request to drop a stream task: the task named in the request is removed from the stream meta.
//   pTq      - tq instance whose stream meta holds the task
//   sversion - not used
//   msg      - payload, an SVDropStreamTaskReq
//   msgLen   - not used
// Always returns 0.
int32_t tqProcessTaskDropReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  (void)sversion;
  (void)msgLen;

  SVDropStreamTaskReq* pDropReq = (SVDropStreamTaskReq*)msg;

  tqDebug("vgId:%d receive msg to drop stream task:0x%x", TD_VID(pTq->pVnode), pDropReq->taskId);
  streamMetaRemoveTask(pTq->pStreamMeta, pDropReq->taskId);

  return 0;
}
L
Liu Jicong 已提交
1288

5
54liuyao 已提交
1289 1290
// Handle a request to pause a stream task: remember the status the task had before the pause in keepTaskStatus
// (tqProcessTaskResumeReq() restores it from there) and switch the task to TASK_STATUS__PAUSE.
//   pTq      - tq instance whose stream meta is searched for the task
//   sversion - not used
//   msg      - payload, an SVPauseStreamTaskReq
//   msgLen   - not used
// Always returns 0; a request for an unknown task is ignored.
int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  SVPauseStreamTaskReq* pReq = (SVPauseStreamTaskReq*)msg;

  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->taskId);
  if (pTask == NULL) {
    return 0;
  }

  tqDebug("vgId:%d s-task:%s set pause flag", pTq->pStreamMeta->vgId, pTask->id.idStr);

  // Save the pre-pause status only when the task is not paused already; otherwise a repeated pause would overwrite
  // it with PAUSE and a later resume would put the task straight back into PAUSE.
  if (pTask->status.taskStatus != TASK_STATUS__PAUSE) {
    atomic_store_8(&pTask->status.keepTaskStatus, pTask->status.taskStatus);
  }
  atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__PAUSE);

  streamMetaReleaseTask(pTq->pStreamMeta, pTask);
  return 0;
}

// Handle a request to resume a stream task: restore the status saved in keepTaskStatus, optionally skip ahead to
// sversion, then get the task scheduled again.
//   pTq      - tq instance whose stream meta is searched for the task
//   sversion - current vnode version; becomes the task's read position when igUntreated is set on a source task
//   msg      - payload, an SVResumeStreamTaskReq
//   msgLen   - not used
// Always returns 0; a request for an unknown task is ignored.
int32_t tqProcessTaskResumeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  SVResumeStreamTaskReq* pResume = (SVResumeStreamTaskReq*)msg;

  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pResume->taskId);
  if (pTask == NULL) {
    return 0;
  }

  atomic_store_8(&pTask->status.taskStatus, pTask->status.keepTaskStatus);

  // no lock needs to secure the access of the version
  if (!(pResume->igUntreated && pTask->taskLevel == TASK_LEVEL__SOURCE)) {
    // from the previous paused version and go on
    tqDebug("vgId:%d s-task:%s resume to normal from paused ver:%" PRId64 ", vnode ver:%" PRId64 ", schedStatus:%d", pTq->pStreamMeta->vgId,
            pTask->id.idStr, pTask->chkInfo.currentVer, sversion, pTask->status.schedStatus);
  } else {
    // discard all the data when the stream task is suspended.
    pTask->chkInfo.currentVer = sversion;
    walReaderSeekVer(pTask->exec.pWalReader, sversion);
    tqDebug("vgId:%d s-task:%s resume to normal from the latest version:%" PRId64 ", vnode ver:%" PRId64 ", schedStatus:%d", pTq->pStreamMeta->vgId,
            pTask->id.idStr, pTask->chkInfo.currentVer, sversion, pTask->status.schedStatus);
  }

  // a source task with an empty input queue goes through tqStartStreamTasks(); everything else is scheduled directly
  if (pTask->taskLevel == TASK_LEVEL__SOURCE && taosQueueItemSize(pTask->inputQueue->queue) == 0) {
    tqStartStreamTasks(pTq);
  } else {
    streamSchedExec(pTask);
  }

  streamMetaReleaseTask(pTq->pStreamMeta, pTask);
  return 0;
}

L
Liu Jicong 已提交
1329 1330 1331 1332 1333 1334
// Handle a retrieve request: decode it, find the destination task and pass the request to
// streamProcessRetrieveReq(). The decoded request is released with tDeleteStreamRetrieveReq() on every path.
//   pTq  - tq instance whose stream meta is searched for the task
//   pMsg - rpc message; the payload follows an SMsgHead
// Returns 0 when the destination task was found, -1 otherwise.
int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg) {
  char*   pHead = pMsg->pCont;
  char*   pBody = POINTER_SHIFT(pHead, sizeof(SMsgHead));
  int32_t bodyLen = pMsg->contLen - sizeof(SMsgHead);

  SStreamRetrieveReq req;
  SDecoder           dec;
  tDecoderInit(&dec, (uint8_t*)pBody, bodyLen);
  tDecodeStreamRetrieveReq(&dec, &req);
  tDecoderClear(&dec);

  int32_t      dstTaskId = req.dstTaskId;
  int32_t      ret = -1;
  SStreamTask* pDstTask = streamMetaAcquireTask(pTq->pStreamMeta, dstTaskId);

  if (pDstTask != NULL) {
    SRpcMsg rsp = {.info = pMsg->info, .code = 0};
    streamProcessRetrieveReq(pDstTask, &req, &rsp);
    streamMetaReleaseTask(pTq->pStreamMeta, pDstTask);
    ret = 0;
  }

  tDeleteStreamRetrieveReq(&req);
  return ret;
}

// Handler for the retrieve response. No work is done: both arguments are ignored and 0 is returned.
int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg) {
  (void)pTq;
  (void)pMsg;
  return 0;
}
L
Liu Jicong 已提交
1356

1357 1358 1359 1360 1361 1362
// Deliver a dispatch request to its destination task without executing it (exec = false). When the request cannot
// be decoded or the task does not exist, an error response is sent back to the caller instead.
//   pVnode - vnode whose tq holds the stream tasks
//   pMsg   - queued rpc message; the payload follows an SMsgHead. pMsg->pCont and pMsg are freed here on every
//            path except the early return taken when the message carries no rpc handle.
// Returns 0 when the request was handed to the task, -1 otherwise.
int32_t vnodeEnqueueStreamMsg(SVnode* pVnode, SRpcMsg* pMsg) {
  STQ*      pTq = pVnode->pTq;
  SMsgHead* msgStr = pMsg->pCont;
  char*     msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
  int32_t   msgLen = pMsg->contLen - sizeof(SMsgHead);
  int32_t   code = 0;

  // zero-initialised: the FAIL path reads req fields even when decoding failed part way through
  SStreamDispatchReq req = {0};
  SDecoder           decoder;
  tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
  if (tDecodeStreamDispatchReq(&decoder, &req) < 0) {
    code = TSDB_CODE_MSG_DECODE_ERROR;
    tDecoderClear(&decoder);
    goto FAIL;
  }
  tDecoderClear(&decoder);

  int32_t taskId = req.taskId;

  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);
  if (pTask) {
    SRpcMsg rsp = {.info = pMsg->info, .code = 0};
    streamProcessDispatchMsg(pTask, &req, &rsp, false);
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
    rpcFreeCont(pMsg->pCont);
    taosFreeQitem(pMsg);
    return 0;
  } else {
    tDeleteStreamDispatchReq(&req);
  }

  code = TSDB_CODE_STREAM_TASK_NOT_EXIST;

FAIL:
  // NOTE(review): unlike the other exits, this early return frees neither pMsg->pCont nor pMsg - confirm who owns
  // the message when there is no rpc handle.
  if (pMsg->info.handle == NULL) return -1;

  SMsgHead* pRspHead = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamDispatchRsp));
  if (pRspHead == NULL) {
    // no body can be built; answer with a bare out-of-memory code and log the code that is actually sent
    code = TSDB_CODE_OUT_OF_MEMORY;
    SRpcMsg rsp = {.code = code, .info = pMsg->info};
    tqDebug("send dispatch error rsp, code: %x", code);
    tmsgSendRsp(&rsp);
    rpcFreeCont(pMsg->pCont);
    taosFreeQitem(pMsg);
    return -1;
  }

  pRspHead->vgId = htonl(req.upstreamNodeId);
  SStreamDispatchRsp* pRsp = POINTER_SHIFT(pRspHead, sizeof(SMsgHead));
  pRsp->streamId = htobe64(req.streamId);
  pRsp->upstreamTaskId = htonl(req.upstreamTaskId);
  pRsp->upstreamNodeId = htonl(req.upstreamNodeId);
  pRsp->downstreamNodeId = htonl(pVnode->config.vgId);
  pRsp->downstreamTaskId = htonl(req.taskId);
  // NOTE(review): an OUTPUT status constant is stored in inputStatus - confirm TASK_INPUT_STATUS__NORMAL was not
  // intended.
  pRsp->inputStatus = TASK_OUTPUT_STATUS__NORMAL;

  SRpcMsg rsp = {
      .code = code, .info = pMsg->info, .contLen = sizeof(SMsgHead) + sizeof(SStreamDispatchRsp), .pCont = pRspHead};
  tqDebug("send dispatch error rsp, code: %x", code);
  tmsgSendRsp(&rsp);
  rpcFreeCont(pMsg->pCont);
  taosFreeQitem(pMsg);
  return -1;
}
L
Liu Jicong 已提交
1420

1421
// Compare sversion with pTq->walLogLastVer: returns 1 when sversion <= walLogLastVer, otherwise 0.
int32_t tqCheckLogInWal(STQ* pTq, int64_t sversion) {
  if (sversion > pTq->walLogLastVer) {
    return 0;
  }
  return 1;
}
1422

1423
// Ask the stream queue to scan the WAL for the stream tasks of this vnode. Every call made while tasks exist
// raises pMeta->walScanCounter; only the call that raises it to 1 posts a TDMT_STREAM_TASK_RUN message addressed to
// WAL_READ_TASKS_ID, later calls just leave their count behind. The whole function runs under the stream meta
// write lock.
//   pTq - tq instance whose stream meta and vnode message callbacks are used
// Returns 0 when nothing had to be done or the message was queued, -1 when the message cannot be allocated.
int32_t tqStartStreamTasks(STQ* pTq) {
  int32_t      vgId = TD_VID(pTq->pVnode);
  SStreamMeta* pMeta = pTq->pStreamMeta;

  taosWLockLatch(&pMeta->lock);

  int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
  if (numOfTasks == 0) {
    tqInfo("vgId:%d no stream tasks exist", vgId);
    taosWUnLockLatch(&pMeta->lock);
    return 0;
  }

  pMeta->walScanCounter += 1;

  if (pMeta->walScanCounter > 1) {
    tqDebug("vgId:%d wal read task has been launched, remain scan times:%d", vgId, pMeta->walScanCounter);
    taosWUnLockLatch(&pMeta->lock);
    return 0;
  }

  SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq));
  if (pRunReq == NULL) {
    // Nothing was queued, so take the count back; otherwise every later call would see a counter above 1 and
    // assume that a scan is already on its way.
    pMeta->walScanCounter -= 1;
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    tqError("vgId:%d failed to create msg to start wal scanning to launch stream tasks, code:%s", vgId, terrstr());
    taosWUnLockLatch(&pMeta->lock);
    return -1;
  }

  tqDebug("vgId:%d create msg to start wal scan to launch stream tasks, numOfTasks:%d", vgId, numOfTasks);
  pRunReq->head.vgId = vgId;
  pRunReq->streamId = 0;
  pRunReq->taskId = WAL_READ_TASKS_ID;

  SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)};
  tmsgPutToQueue(&pTq->pVnode->msgCb, STREAM_QUEUE, &msg);
  taosWUnLockLatch(&pMeta->lock);

  return 0;
}