tq.c 49.0 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
S
Shengliang Guan 已提交
14 15
 */

H
Hongze Cheng 已提交
16
#include "tq.h"
S
Shengliang Guan 已提交
17

dengyihao's avatar
dengyihao 已提交
18 19 20
// 0: not init
// 1: already inited
// 2: wait to be inited or cleaup
21
#define WAL_READ_TASKS_ID (-1)
22

23
static int32_t tqInitialize(STQ* pTq);
dengyihao's avatar
dengyihao 已提交
24

wmmhello's avatar
wmmhello 已提交
25 26 27
static FORCE_INLINE bool tqIsHandleExec(STqHandle* pHandle) { return TMQ_HANDLE_STATUS_EXEC == pHandle->status; }
static FORCE_INLINE void tqSetHandleExec(STqHandle* pHandle) {pHandle->status = TMQ_HANDLE_STATUS_EXEC;}
static FORCE_INLINE void tqSetHandleIdle(STqHandle* pHandle) {pHandle->status = TMQ_HANDLE_STATUS_IDLE;}
wmmhello's avatar
wmmhello 已提交
28

L
Liu Jicong 已提交
29
int32_t tqInit() {
L
Liu Jicong 已提交
30 31 32 33 34 35
  int8_t old;
  while (1) {
    old = atomic_val_compare_exchange_8(&tqMgmt.inited, 0, 2);
    if (old != 2) break;
  }

36 37 38 39 40 41
  if (old == 0) {
    tqMgmt.timer = taosTmrInit(10000, 100, 10000, "TQ");
    if (tqMgmt.timer == NULL) {
      atomic_store_8(&tqMgmt.inited, 0);
      return -1;
    }
42 43 44
    if (streamInit() < 0) {
      return -1;
    }
L
Liu Jicong 已提交
45
    atomic_store_8(&tqMgmt.inited, 1);
46
  }
47

L
Liu Jicong 已提交
48 49
  return 0;
}
L
Liu Jicong 已提交
50

51
void tqCleanUp() {
L
Liu Jicong 已提交
52 53 54 55 56 57 58 59
  int8_t old;
  while (1) {
    old = atomic_val_compare_exchange_8(&tqMgmt.inited, 1, 2);
    if (old != 2) break;
  }

  if (old == 1) {
    taosTmrCleanUp(tqMgmt.timer);
L
Liu Jicong 已提交
60
    streamCleanUp();
L
Liu Jicong 已提交
61 62
    atomic_store_8(&tqMgmt.inited, 0);
  }
63
}
L
Liu Jicong 已提交
64

65
static void destroyTqHandle(void* data) {
66 67
  STqHandle* pData = (STqHandle*)data;
  qDestroyTask(pData->execHandle.task);
wmmhello's avatar
wmmhello 已提交
68

69
  if (pData->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
L
Liu Jicong 已提交
70
    taosMemoryFreeClear(pData->execHandle.execCol.qmsg);
71
  } else if (pData->execHandle.subType == TOPIC_SUB_TYPE__DB) {
72
    tqReaderClose(pData->execHandle.pTqReader);
73 74
    walCloseReader(pData->pWalReader);
    taosHashCleanup(pData->execHandle.execDb.pFilterOutTbUid);
L
Liu Jicong 已提交
75
  } else if (pData->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
76
    walCloseReader(pData->pWalReader);
77
    tqReaderClose(pData->execHandle.pTqReader);
78
  }
79 80 81 82
  if(pData->msg != NULL) {
    rpcFreeCont(pData->msg->pCont);
    taosMemoryFree(pData->msg);
    pData->msg = NULL;
D
dapan1121 已提交
83
  }
L
Liu Jicong 已提交
84 85
}

86 87 88 89 90
static bool tqOffsetLessOrEqual(const STqOffset* pLeft, const STqOffset* pRight) {
  return pLeft->val.type == TMQ_OFFSET__LOG && pRight->val.type == TMQ_OFFSET__LOG &&
         pLeft->val.version <= pRight->val.version;
}

L
Liu Jicong 已提交
91
STQ* tqOpen(const char* path, SVnode* pVnode) {
92
  STQ* pTq = taosMemoryCalloc(1, sizeof(STQ));
L
Liu Jicong 已提交
93
  if (pTq == NULL) {
S
Shengliang Guan 已提交
94
    terrno = TSDB_CODE_OUT_OF_MEMORY;
L
Liu Jicong 已提交
95 96
    return NULL;
  }
97

98
  pTq->path = taosStrdup(path);
L
Liu Jicong 已提交
99
  pTq->pVnode = pVnode;
L
Liu Jicong 已提交
100
  pTq->walLogLastVer = pVnode->pWal->vers.lastVer;
101

102
  pTq->pHandle = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);
103
  taosHashSetFreeFp(pTq->pHandle, destroyTqHandle);
104

105
  taosInitRWLatch(&pTq->lock);
106
  pTq->pPushMgr = taosHashInit(64, MurmurHash3_32, false, HASH_NO_LOCK);
L
Liu Jicong 已提交
107

108
  pTq->pCheckInfo = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);
L
Liu Jicong 已提交
109
  taosHashSetFreeFp(pTq->pCheckInfo, (FDelete)tDeleteSTqCheckInfo);
L
Liu Jicong 已提交
110

111 112 113 114 115 116 117
  int32_t code = tqInitialize(pTq);
  if (code != TSDB_CODE_SUCCESS) {
    tqClose(pTq);
    return NULL;
  } else {
    return pTq;
  }
118 119 120
}

int32_t tqInitialize(STQ* pTq) {
L
Liu Jicong 已提交
121
  if (tqMetaOpen(pTq) < 0) {
122
    return -1;
123 124
  }

L
Liu Jicong 已提交
125 126
  pTq->pOffsetStore = tqOffsetOpen(pTq);
  if (pTq->pOffsetStore == NULL) {
127
    return -1;
128 129
  }

130
  pTq->pStreamMeta = streamMetaOpen(pTq->path, pTq, (FTaskExpand*)tqExpandTask, pTq->pVnode->config.vgId);
L
Liu Jicong 已提交
131
  if (pTq->pStreamMeta == NULL) {
132
    return -1;
L
Liu Jicong 已提交
133 134
  }

135 136
  // the version is kept in task's meta data
  // todo check if this version is required or not
137 138
  if (streamLoadTasks(pTq->pStreamMeta, walGetCommittedVer(pTq->pVnode->pWal)) < 0) {
    return -1;
L
Liu Jicong 已提交
139 140
  }

141
  return 0;
L
Liu Jicong 已提交
142
}
L
Liu Jicong 已提交
143

L
Liu Jicong 已提交
144
void tqClose(STQ* pTq) {
145 146
  if (pTq == NULL) {
    return;
H
Hongze Cheng 已提交
147
  }
148 149 150 151 152 153 154 155 156

  tqOffsetClose(pTq->pOffsetStore);
  taosHashCleanup(pTq->pHandle);
  taosHashCleanup(pTq->pPushMgr);
  taosHashCleanup(pTq->pCheckInfo);
  taosMemoryFree(pTq->path);
  tqMetaClose(pTq);
  streamMetaClose(pTq->pStreamMeta);
  taosMemoryFree(pTq);
L
Liu Jicong 已提交
157
}
L
Liu Jicong 已提交
158

H
Haojun Liao 已提交
159 160 161 162 163 164 165 166 167 168 169 170 171
void tqNotifyClose(STQ* pTq) {
  if (pTq != NULL) {
    taosWLockLatch(&pTq->pStreamMeta->lock);

    void* pIter = NULL;
    while (1) {
      pIter = taosHashIterate(pTq->pStreamMeta->pTasks, pIter);
      if (pIter == NULL) {
        break;
      }

      SStreamTask* pTask = *(SStreamTask**)pIter;
      tqDebug("vgId:%d s-task:%s set dropping flag", pTq->pStreamMeta->vgId, pTask->id.idStr);
172 173 174
      pTask->status.taskStatus = TASK_STATUS__STOP;

      int64_t st = taosGetTimestampMs();
H
Haojun Liao 已提交
175
      qKillTask(pTask->exec.pExecutor, TSDB_CODE_SUCCESS);
176
      int64_t el = taosGetTimestampMs() - st;
H
Haojun Liao 已提交
177
      tqDebug("vgId:%d s-task:%s is closed in %" PRId64 " ms", pTq->pStreamMeta->vgId, pTask->id.idStr, el);
H
Haojun Liao 已提交
178 179 180 181 182 183
    }

    taosWUnLockLatch(&pTq->pStreamMeta->lock);
  }
}

D
dapan1121 已提交
184 185
static int32_t doSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* pRsp, int32_t epoch,
                             int64_t consumerId, int32_t type) {
L
Liu Jicong 已提交
186 187
  int32_t len = 0;
  int32_t code = 0;
D
dapan1121 已提交
188 189

  if (type == TMQ_MSG_TYPE__POLL_RSP) {
H
Haojun Liao 已提交
190
    tEncodeSize(tEncodeMqDataRsp, pRsp, len, code);
D
dapan1121 已提交
191 192 193
  } else if (type == TMQ_MSG_TYPE__TAOSX_RSP) {
    tEncodeSize(tEncodeSTaosxRsp, (STaosxRsp*)pRsp, len, code);
  }
L
Liu Jicong 已提交
194 195 196 197 198 199 200 201 202 203 204

  if (code < 0) {
    return -1;
  }

  int32_t tlen = sizeof(SMqRspHead) + len;
  void*   buf = rpcMallocCont(tlen);
  if (buf == NULL) {
    return -1;
  }

D
dapan1121 已提交
205 206 207
  ((SMqRspHead*)buf)->mqMsgType = type;
  ((SMqRspHead*)buf)->epoch = epoch;
  ((SMqRspHead*)buf)->consumerId = consumerId;
L
Liu Jicong 已提交
208 209 210 211 212 213

  void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));

  SEncoder encoder = {0};
  tEncoderInit(&encoder, abuf, len);

D
dapan1121 已提交
214
  if (type == TMQ_MSG_TYPE__POLL_RSP) {
H
Haojun Liao 已提交
215
    tEncodeMqDataRsp(&encoder, pRsp);
D
dapan1121 已提交
216
  } else if (type == TMQ_MSG_TYPE__TAOSX_RSP) {
X
Xiaoyu Wang 已提交
217
    tEncodeSTaosxRsp(&encoder, (STaosxRsp*)pRsp);
dengyihao's avatar
dengyihao 已提交
218
  }
L
Liu Jicong 已提交
219

wmmhello's avatar
wmmhello 已提交
220
  tEncoderClear(&encoder);
L
Liu Jicong 已提交
221 222

  SRpcMsg rsp = {
D
dapan1121 已提交
223
      .info = *pRpcHandleInfo,
L
Liu Jicong 已提交
224 225 226 227
      .pCont = buf,
      .contLen = tlen,
      .code = 0,
  };
L
Liu Jicong 已提交
228

L
Liu Jicong 已提交
229
  tmsgSendRsp(&rsp);
L
Liu Jicong 已提交
230 231
  return 0;
}
L
Liu Jicong 已提交
232

H
Haojun Liao 已提交
233
int32_t tqPushDataRsp(STqHandle* pHandle, int32_t vgId) {
234 235 236 237
  SMqDataRsp dataRsp = {0};
  dataRsp.head.consumerId = pHandle->consumerId;
  dataRsp.head.epoch = pHandle->epoch;
  dataRsp.head.mqMsgType = TMQ_MSG_TYPE__POLL_RSP;
238 239

  int64_t sver = 0, ever = 0;
H
Haojun Liao 已提交
240 241
  walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever);
  tqDoSendDataRsp(&pHandle->msg->info, &dataRsp, pHandle->epoch, pHandle->consumerId, TMQ_MSG_TYPE__POLL_RSP, sver, ever);
D
dapan1121 已提交
242 243 244

  char buf1[80] = {0};
  char buf2[80] = {0};
245 246
  tFormatOffset(buf1, tListLen(buf1), &dataRsp.reqOffset);
  tFormatOffset(buf2, tListLen(buf2), &dataRsp.rspOffset);
D
dapan1121 已提交
247
  tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) push rsp, block num: %d, req:%s, rsp:%s",
H
Haojun Liao 已提交
248
          vgId, dataRsp.head.consumerId, dataRsp.head.epoch, dataRsp.blockNum, buf1, buf2);
L
Liu Jicong 已提交
249 250 251
  return 0;
}

252 253 254 255 256 257
int32_t tqSendDataRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp,
                      int32_t type, int32_t vgId) {
  int64_t sver = 0, ever = 0;
  walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever);

  tqDoSendDataRsp(&pMsg->info, pRsp, pReq->epoch, pReq->consumerId, type, sver, ever);
258

D
dapan1121 已提交
259 260 261 262
  char buf1[80] = {0};
  char buf2[80] = {0};
  tFormatOffset(buf1, 80, &pRsp->reqOffset);
  tFormatOffset(buf2, 80, &pRsp->rspOffset);
263

X
Xiaoyu Wang 已提交
264
  tqDebug("vgId:%d consumer:0x%" PRIx64 " (epoch %d) send rsp, block num:%d, req:%s, rsp:%s, reqId:0x%" PRIx64,
265
          vgId, pReq->consumerId, pReq->epoch, pRsp->blockNum, buf1, buf2, pReq->reqId);
266 267 268 269

  return 0;
}

270
int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
271 272
  SMqVgOffset vgOffset = {0};
  int32_t     vgId = TD_VID(pTq->pVnode);
273

X
Xiaoyu Wang 已提交
274 275
  SDecoder decoder;
  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
276
  if (tDecodeMqVgOffset(&decoder, &vgOffset) < 0) {
277 278
    return -1;
  }
279

280 281
  tDecoderClear(&decoder);

282 283 284
  STqOffset* pOffset = &vgOffset.offset;

  if (pOffset->val.type == TMQ_OFFSET__SNAPSHOT_DATA || pOffset->val.type == TMQ_OFFSET__SNAPSHOT_META) {
L
Liu Jicong 已提交
285
    tqDebug("receive offset commit msg to %s on vgId:%d, offset(type:snapshot) uid:%" PRId64 ", ts:%" PRId64,
286 287 288 289 290 291
            pOffset->subKey, vgId, pOffset->val.uid, pOffset->val.ts);
  } else if (pOffset->val.type == TMQ_OFFSET__LOG) {
    tqDebug("receive offset commit msg to %s on vgId:%d, offset(type:log) version:%" PRId64, pOffset->subKey, vgId,
            pOffset->val.version);
    if (pOffset->val.version + 1 == sversion) {
      pOffset->val.version += 1;
292
    }
293
  } else {
294
    tqError("invalid commit offset type:%d", pOffset->val.type);
295
    return -1;
296
  }
297

298 299
  STqOffset* pSavedOffset = tqOffsetRead(pTq->pOffsetStore, pOffset->subKey);
  if (pSavedOffset != NULL && tqOffsetLessOrEqual(pOffset, pSavedOffset)) {
300
    tqDebug("not update the offset, vgId:%d sub:%s since committed:%" PRId64 " less than/equal to existed:%" PRId64,
301
            vgId, pOffset->subKey, pOffset->val.version, pSavedOffset->val.version);
302
    return 0;  // no need to update the offset value
303 304
  }

305
  // save the new offset value
306
  if (tqOffsetWrite(pTq->pOffsetStore, pOffset) < 0) {
307
    return -1;
308
  }
309

310 311 312
  return 0;
}

313
int32_t tqProcessSeekReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
314 315
  SMqVgOffset vgOffset = {0};
  int32_t     vgId = TD_VID(pTq->pVnode);
316 317 318

  SDecoder decoder;
  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
319
  if (tDecodeMqVgOffset(&decoder, &vgOffset) < 0) {
H
Haojun Liao 已提交
320
    tqError("vgId:%d failed to decode seek msg", vgId);
321 322 323 324 325
    return -1;
  }

  tDecoderClear(&decoder);

H
Haojun Liao 已提交
326 327 328
  tqDebug("topic:%s, vgId:%d process offset seek by consumer:0x%" PRIx64 ", req offset:%" PRId64,
          vgOffset.offset.subKey, vgId, vgOffset.consumerId, vgOffset.offset.val.version);

329 330 331
  STqOffset* pOffset = &vgOffset.offset;
  if (pOffset->val.type != TMQ_OFFSET__LOG) {
    tqError("vgId:%d, subKey:%s invalid seek offset type:%d", vgId, pOffset->subKey, pOffset->val.type);
332 333 334
    return -1;
  }

335 336 337 338 339 340
  STqHandle* pHandle = taosHashGet(pTq->pHandle, pOffset->subKey, strlen(pOffset->subKey));
  if (pHandle == NULL) {
    tqError("tmq seek: consumer:0x%" PRIx64 " vgId:%d subkey %s not found", vgOffset.consumerId, vgId,
        pOffset->subKey);
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
341 342
  }

343 344 345 346 347 348 349 350
  // 2. check consumer-vg assignment status
  taosRLockLatch(&pTq->lock);
  if (pHandle->consumerId != vgOffset.consumerId) {
    tqDebug("ERROR tmq seek: consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64,
            vgOffset.consumerId, vgId, pOffset->subKey, pHandle->consumerId);
    terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
    taosRUnLockLatch(&pTq->lock);
    return -1;
351
  }
352 353 354 355 356 357 358 359 360
  taosRUnLockLatch(&pTq->lock);

  //3. check the offset info
  STqOffset* pSavedOffset = tqOffsetRead(pTq->pOffsetStore, pOffset->subKey);
  if (pSavedOffset != NULL) {
    if (pSavedOffset->val.type != TMQ_OFFSET__LOG) {
      tqError("invalid saved offset type, vgId:%d sub:%s", vgId, pOffset->subKey);
      return 0;  // no need to update the offset value
    }
361

362 363 364 365 366 367
    if (pSavedOffset->val.version == pOffset->val.version) {
      tqDebug("vgId:%d subKey:%s no need to seek to %" PRId64 " prev offset:%" PRId64, vgId, pOffset->subKey,
              pOffset->val.version, pSavedOffset->val.version);
      return 0;
    }
  }
368 369 370

  int64_t sver = 0, ever = 0;
  walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever);
371 372 373 374
  if (pOffset->val.version < sver) {
    pOffset->val.version = sver;
  } else if (pOffset->val.version > ever) {
    pOffset->val.version = ever;
375 376 377
  }

  // save the new offset value
378 379 380 381 382 383
  if (pSavedOffset != NULL) {
    tqDebug("vgId:%d sub:%s seek to:%" PRId64 " prev offset:%" PRId64, vgId, pOffset->subKey, pOffset->val.version,
            pSavedOffset->val.version);
  } else {
    tqDebug("vgId:%d sub:%s seek to:%"PRId64" not saved yet", vgId, pOffset->subKey, pOffset->val.version);
  }
384

385 386
  if (tqOffsetWrite(pTq->pOffsetStore, pOffset) < 0) {
    tqError("failed to save offset, vgId:%d sub:%s seek to %" PRId64, vgId, pOffset->subKey, pOffset->val.version);
387 388 389
    return -1;
  }

H
Haojun Liao 已提交
390 391 392
  tqDebug("topic:%s, vgId:%d consumer:0x%" PRIx64 " offset is update to:%" PRId64, vgOffset.offset.subKey, vgId,
          vgOffset.consumerId, vgOffset.offset.val.version);

393 394 395
  return 0;
}

L
Liu Jicong 已提交
396
int32_t tqCheckColModifiable(STQ* pTq, int64_t tbUid, int32_t colId) {
L
Liu Jicong 已提交
397
  void* pIter = NULL;
398

L
Liu Jicong 已提交
399
  while (1) {
400
    pIter = taosHashIterate(pTq->pCheckInfo, pIter);
401 402 403 404
    if (pIter == NULL) {
      break;
    }

405
    STqCheckInfo* pCheck = (STqCheckInfo*)pIter;
406

L
Liu Jicong 已提交
407 408
    if (pCheck->ntbUid == tbUid) {
      int32_t sz = taosArrayGetSize(pCheck->colIdList);
L
Liu Jicong 已提交
409
      for (int32_t i = 0; i < sz; i++) {
L
Liu Jicong 已提交
410 411
        int16_t forbidColId = *(int16_t*)taosArrayGet(pCheck->colIdList, i);
        if (forbidColId == colId) {
412
          taosHashCancelIterate(pTq->pCheckInfo, pIter);
L
Liu Jicong 已提交
413 414 415 416 417
          return -1;
        }
      }
    }
  }
418

L
Liu Jicong 已提交
419 420 421
  return 0;
}

D
dapan1121 已提交
422
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
X
Xiaoyu Wang 已提交
423
  SMqPollReq req = {0};
D
dapan1121 已提交
424 425 426 427 428 429 430 431 432 433 434
  if (tDeserializeSMqPollReq(pMsg->pCont, pMsg->contLen, &req) < 0) {
    tqError("tDeserializeSMqPollReq %d failed", pMsg->contLen);
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  int64_t      consumerId = req.consumerId;
  int32_t      reqEpoch = req.epoch;
  STqOffsetVal reqOffset = req.reqOffset;
  int32_t      vgId = TD_VID(pTq->pVnode);

wmmhello's avatar
wmmhello 已提交
435
  taosWLockLatch(&pTq->lock);
D
dapan1121 已提交
436 437 438 439 440
  // 1. find handle
  STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
  if (pHandle == NULL) {
    tqError("tmq poll: consumer:0x%" PRIx64 " vgId:%d subkey %s not found", consumerId, vgId, req.subKey);
    terrno = TSDB_CODE_INVALID_MSG;
wmmhello's avatar
wmmhello 已提交
441
    taosWUnLockLatch(&pTq->lock);
D
dapan1121 已提交
442 443 444
    return -1;
  }

wmmhello's avatar
wmmhello 已提交
445 446 447 448 449
  while (tqIsHandleExec(pHandle)) {
    tqDebug("tmq poll: consumer:0x%" PRIx64 "vgId:%d, topic:%s, subscription is executing, wait for 5ms and retry", consumerId, vgId, req.subKey);
    taosMsleep(5);
  }

D
dapan1121 已提交
450 451 452
  // 2. check re-balance status
  if (pHandle->consumerId != consumerId) {
    tqDebug("ERROR tmq poll: consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64,
453
            consumerId, vgId, req.subKey, pHandle->consumerId);
D
dapan1121 已提交
454
    terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
455
    taosWUnLockLatch(&pTq->lock);
D
dapan1121 已提交
456 457
    return -1;
  }
458 459
  tqSetHandleExec(pHandle);
  taosWUnLockLatch(&pTq->lock);
D
dapan1121 已提交
460 461 462 463

  // 3. update the epoch value
  int32_t savedEpoch = pHandle->epoch;
  if (savedEpoch < reqEpoch) {
X
Xiaoyu Wang 已提交
464 465
    tqDebug("tmq poll: consumer:0x%" PRIx64 " epoch update from %d to %d by poll req", consumerId, savedEpoch,
            reqEpoch);
D
dapan1121 已提交
466 467 468 469 470 471 472 473
    pHandle->epoch = reqEpoch;
  }

  char buf[80];
  tFormatOffset(buf, 80, &reqOffset);
  tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey %s, recv poll req vgId:%d, req:%s, reqId:0x%" PRIx64,
          consumerId, req.epoch, pHandle->subKey, vgId, buf, req.reqId);

474 475 476
  int code = tqExtractDataForMq(pTq, pHandle, &req, pMsg);
  tqSetHandleIdle(pHandle);
  return code;
D
dapan1121 已提交
477 478
}

479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536
int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) {
  SMqPollReq req = {0};
  if (tDeserializeSMqPollReq(pMsg->pCont, pMsg->contLen, &req) < 0) {
    tqError("tDeserializeSMqPollReq %d failed", pMsg->contLen);
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  int64_t      consumerId = req.consumerId;
  STqOffsetVal reqOffset = req.reqOffset;
  int32_t      vgId = TD_VID(pTq->pVnode);

  // 1. find handle
  STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
  if (pHandle == NULL) {
    tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s not found", consumerId, vgId, req.subKey);
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  // 2. check re-balance status
  taosRLockLatch(&pTq->lock);
  if (pHandle->consumerId != consumerId) {
    tqDebug("ERROR consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64,
            consumerId, vgId, req.subKey, pHandle->consumerId);
    terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
    taosRUnLockLatch(&pTq->lock);
    return -1;
  }
  taosRUnLockLatch(&pTq->lock);

  int64_t sver = 0, ever = 0;
  walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever);

  SMqDataRsp dataRsp = {0};
  tqInitDataRsp(&dataRsp, &req);

  STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, req.subKey);
  if (pOffset != NULL) {
    if (pOffset->val.type != TMQ_OFFSET__LOG) {
      tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s use snapshot, no valid wal info", consumerId, vgId, req.subKey);
      terrno = TSDB_CODE_INVALID_PARA;
      tDeleteMqDataRsp(&dataRsp);
      return -1;
    }

    dataRsp.rspOffset.type = TMQ_OFFSET__LOG;
    dataRsp.rspOffset.version = pOffset->val.version;
  } else {
    if (req.useSnapshot == true) {
      tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s snapshot not support wal info", consumerId, vgId, req.subKey);
      terrno = TSDB_CODE_INVALID_PARA;
      tDeleteMqDataRsp(&dataRsp);
      return -1;
    }

    dataRsp.rspOffset.type = TMQ_OFFSET__LOG;

537
    if (reqOffset.type == TMQ_OFFSET__LOG) {
538 539 540 541 542 543
      int64_t currentVer = walReaderGetCurrentVer(pHandle->execHandle.pTqReader->pWalReader);
      if (currentVer == -1) { // not start to read data from wal yet, return req offset directly
        dataRsp.rspOffset.version = reqOffset.version;
      } else {
        dataRsp.rspOffset.version = currentVer;  // return current consume offset value
      }
544 545
    } else if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEAST) {
      dataRsp.rspOffset.version = sver;  // not consume yet, set the earliest position
546 547 548 549 550 551 552 553 554 555 556 557 558 559 560
    } else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) {
      dataRsp.rspOffset.version = ever;
    } else {
      tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s invalid offset type:%d", consumerId, vgId, req.subKey,
              reqOffset.type);
      terrno = TSDB_CODE_INVALID_PARA;
      tDeleteMqDataRsp(&dataRsp);
      return -1;
    }
  }

  tqDoSendDataRsp(&pMsg->info, &dataRsp, req.epoch, req.consumerId, TMQ_MSG_TYPE__WALINFO_RSP, sver, ever);
  return 0;
}

561
int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
L
Liu Jicong 已提交
562
  SMqVDeleteReq* pReq = (SMqVDeleteReq*)msg;
563
  int32_t vgId = TD_VID(pTq->pVnode);
L
Liu Jicong 已提交
564

565
  tqDebug("vgId:%d, tq process delete sub req %s", vgId, pReq->subKey);
wmmhello's avatar
wmmhello 已提交
566
  int32_t code = 0;
L
Liu Jicong 已提交
567

wmmhello's avatar
wmmhello 已提交
568
  taosWLockLatch(&pTq->lock);
L
Liu Jicong 已提交
569 570
  STqHandle* pHandle = taosHashGet(pTq->pHandle, pReq->subKey, strlen(pReq->subKey));
  if (pHandle) {
wmmhello's avatar
wmmhello 已提交
571
    while (tqIsHandleExec(pHandle)) {
572 573 574
      tqDebug("vgId:%d, topic:%s, subscription is executing, wait for 5ms and retry", vgId, pHandle->subKey);
      taosMsleep(5);
    }
575

L
Liu Jicong 已提交
576 577 578
    if (pHandle->pRef) {
      walCloseRef(pTq->pVnode->pWal, pHandle->pRef->refId);
    }
579

L
Liu Jicong 已提交
580 581 582 583
    code = taosHashRemove(pTq->pHandle, pReq->subKey, strlen(pReq->subKey));
    if (code != 0) {
      tqError("cannot process tq delete req %s, since no such handle", pReq->subKey);
    }
L
Liu Jicong 已提交
584
  }
585

L
Liu Jicong 已提交
586 587
  code = tqOffsetDelete(pTq->pOffsetStore, pReq->subKey);
  if (code != 0) {
588
    tqError("cannot process tq delete req %s, since no such offset in cache", pReq->subKey);
L
Liu Jicong 已提交
589
  }
L
Liu Jicong 已提交
590

L
Liu Jicong 已提交
591
  if (tqMetaDeleteHandle(pTq, pReq->subKey) < 0) {
L
Liu Jicong 已提交
592
    tqError("cannot process tq delete req %s, since no such offset in tdb", pReq->subKey);
593
  }
wmmhello's avatar
wmmhello 已提交
594 595
  taosWUnLockLatch(&pTq->lock);

L
Liu Jicong 已提交
596
  return 0;
L
Liu Jicong 已提交
597 598
}

599
int32_t tqProcessAddCheckInfoReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
600 601
  STqCheckInfo info = {0};
  SDecoder     decoder;
X
Xiaoyu Wang 已提交
602
  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
603
  if (tDecodeSTqCheckInfo(&decoder, &info) < 0) {
L
Liu Jicong 已提交
604 605 606 607
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  tDecoderClear(&decoder);
608 609 610 611 612
  if (taosHashPut(pTq->pCheckInfo, info.topic, strlen(info.topic), &info, sizeof(STqCheckInfo)) < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  if (tqMetaSaveCheckInfo(pTq, info.topic, msg, msgLen) < 0) {
L
Liu Jicong 已提交
613 614 615 616 617 618
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  return 0;
}

619
int32_t tqProcessDelCheckInfoReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
620 621 622 623 624 625 626 627 628 629 630
  if (taosHashRemove(pTq->pCheckInfo, msg, strlen(msg)) < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  if (tqMetaDeleteCheckInfo(pTq, msg) < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  return 0;
}

631
int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
632
  int ret = 0;
L
Liu Jicong 已提交
633
  SMqRebVgReq req = {0};
L
Liu Jicong 已提交
634
  tDecodeSMqRebVgReq(msg, &req);
L
Liu Jicong 已提交
635

D
dapan1121 已提交
636 637 638
  SVnode* pVnode = pTq->pVnode;
  int32_t vgId = TD_VID(pVnode);

639
  tqDebug("vgId:%d, tq process sub req:%s, Id:0x%" PRIx64 " -> Id:0x%" PRIx64, pVnode->config.vgId, req.subKey,
D
dapan1121 已提交
640
          req.oldConsumerId, req.newConsumerId);
L
Liu Jicong 已提交
641

wmmhello's avatar
wmmhello 已提交
642
  taosWLockLatch(&pTq->lock);
643
  STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
L
Liu Jicong 已提交
644
  if (pHandle == NULL) {
L
Liu Jicong 已提交
645
    if (req.oldConsumerId != -1) {
646
      tqError("vgId:%d, build new consumer handle %s for consumer:0x%" PRIx64 ", but old consumerId:0x%" PRIx64,
647
              req.vgId, req.subKey, req.newConsumerId, req.oldConsumerId);
L
Liu Jicong 已提交
648
    }
D
dapan1121 已提交
649

L
Liu Jicong 已提交
650
    if (req.newConsumerId == -1) {
651
      tqError("vgId:%d, tq invalid re-balance request, new consumerId %" PRId64 "", req.vgId, req.newConsumerId);
652
      goto end;
L
Liu Jicong 已提交
653
    }
D
dapan1121 已提交
654

L
Liu Jicong 已提交
655 656
    STqHandle tqHandle = {0};
    pHandle = &tqHandle;
L
Liu Jicong 已提交
657

H
Haojun Liao 已提交
658
    uint64_t oldConsumerId = pHandle->consumerId;
L
Liu Jicong 已提交
659 660 661
    memcpy(pHandle->subKey, req.subKey, TSDB_SUBSCRIBE_KEY_LEN);
    pHandle->consumerId = req.newConsumerId;
    pHandle->epoch = -1;
L
Liu Jicong 已提交
662

L
Liu Jicong 已提交
663
    pHandle->execHandle.subType = req.subType;
L
Liu Jicong 已提交
664
    pHandle->fetchMeta = req.withMeta;
wmmhello's avatar
wmmhello 已提交
665

666
    // TODO version should be assigned and refed during preprocess
D
dapan1121 已提交
667
    SWalRef* pRef = walRefCommittedVer(pVnode->pWal);
668
    if (pRef == NULL) {
669 670
      ret = -1;
      goto end;
671
    }
D
dapan1121 已提交
672

673 674
    int64_t ver = pRef->refVer;
    pHandle->pRef = pRef;
L
Liu Jicong 已提交
675

676
    SReadHandle handle = {.vnode = pVnode, .initTableReader = true, .initTqReader = true, .version = ver};
H
Haojun Liao 已提交
677 678
    initStorageAPI(&handle.api);

wmmhello's avatar
wmmhello 已提交
679
    pHandle->snapshotVer = ver;
680

L
Liu Jicong 已提交
681
    if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
L
Liu Jicong 已提交
682
      pHandle->execHandle.execCol.qmsg = req.qmsg;
L
Liu Jicong 已提交
683
      req.qmsg = NULL;
684

X
Xiaoyu Wang 已提交
685 686
      pHandle->execHandle.task = qCreateQueueExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle, vgId,
                                                          &pHandle->execHandle.numOfCols, req.newConsumerId);
L
Liu Jicong 已提交
687
      void* scanner = NULL;
688
      qExtractStreamScanner(pHandle->execHandle.task, &scanner);
689
      pHandle->execHandle.pTqReader = qExtractReaderFromStreamScanner(scanner);
L
Liu Jicong 已提交
690
    } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__DB) {
D
dapan1121 已提交
691
      pHandle->pWalReader = walOpenReader(pVnode->pWal, NULL);
692
      pHandle->execHandle.pTqReader = tqReaderOpen(pVnode);
D
dapan1121 已提交
693

L
Liu Jicong 已提交
694
      pHandle->execHandle.execDb.pFilterOutTbUid =
wmmhello's avatar
wmmhello 已提交
695
          taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
696
      buildSnapContext(handle.vnode, handle.version, 0, pHandle->execHandle.subType, pHandle->fetchMeta,
697
                       (SSnapContext**)(&handle.sContext));
698

699
      pHandle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &handle, vgId, NULL, req.newConsumerId);
L
Liu Jicong 已提交
700
    } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
D
dapan1121 已提交
701
      pHandle->pWalReader = walOpenReader(pVnode->pWal, NULL);
wmmhello's avatar
wmmhello 已提交
702 703
      pHandle->execHandle.execTb.suid = req.suid;

L
Liu Jicong 已提交
704
      SArray* tbUidList = taosArrayInit(0, sizeof(int64_t));
D
dapan1121 已提交
705 706
      vnodeGetCtbIdList(pVnode, req.suid, tbUidList);
      tqDebug("vgId:%d, tq try to get all ctb, suid:%" PRId64, pVnode->config.vgId, req.suid);
L
Liu Jicong 已提交
707 708
      for (int32_t i = 0; i < taosArrayGetSize(tbUidList); i++) {
        int64_t tbUid = *(int64_t*)taosArrayGet(tbUidList, i);
D
dapan1121 已提交
709
        tqDebug("vgId:%d, idx %d, uid:%" PRId64, vgId, i, tbUid);
L
Liu Jicong 已提交
710
      }
711
      pHandle->execHandle.pTqReader = tqReaderOpen(pVnode);
712
      tqReaderSetTbUidList(pHandle->execHandle.pTqReader, tbUidList);
L
Liu Jicong 已提交
713
      taosArrayDestroy(tbUidList);
wmmhello's avatar
wmmhello 已提交
714

715
      buildSnapContext(handle.vnode, handle.version, req.suid, pHandle->execHandle.subType, pHandle->fetchMeta,
L
Liu Jicong 已提交
716
                       (SSnapContext**)(&handle.sContext));
717
      pHandle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &handle, vgId, NULL, req.newConsumerId);
L
Liu Jicong 已提交
718
    }
H
Haojun Liao 已提交
719

720
    taosHashPut(pTq->pHandle, req.subKey, strlen(req.subKey), pHandle, sizeof(STqHandle));
dengyihao's avatar
dengyihao 已提交
721 722
    tqDebug("try to persist handle %s consumer:0x%" PRIx64 " , old consumer:0x%" PRIx64, req.subKey,
            pHandle->consumerId, oldConsumerId);
723 724
    ret = tqMetaSaveHandle(pTq, req.subKey, pHandle);
    goto end;
L
Liu Jicong 已提交
725
  } else {
wmmhello's avatar
wmmhello 已提交
726 727 728 729 730
    while (tqIsHandleExec(pHandle)) {
      tqDebug("sub req vgId:%d, topic:%s, subscription is executing, wait for 5ms and retry", vgId, pHandle->subKey);
      taosMsleep(5);
    }

D
dapan1121 已提交
731 732 733
    if (pHandle->consumerId == req.newConsumerId) {  // do nothing
      tqInfo("vgId:%d consumer:0x%" PRIx64 " remains, no switch occurs", req.vgId, req.newConsumerId);
      atomic_add_fetch_32(&pHandle->epoch, 1);
734

735 736 737
    } else {
      tqInfo("vgId:%d switch consumer from Id:0x%" PRIx64 " to Id:0x%" PRIx64, req.vgId, pHandle->consumerId,
             req.newConsumerId);
738
      atomic_store_64(&pHandle->consumerId, req.newConsumerId);
wmmhello's avatar
wmmhello 已提交
739
      atomic_store_32(&pHandle->epoch, 0);
740 741 742 743 744 745 746 747
    }
    // kill executing task
    qTaskInfo_t pTaskInfo = pHandle->execHandle.task;
    if (pTaskInfo != NULL) {
      qKillTask(pTaskInfo, TSDB_CODE_SUCCESS);
    }
    if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
      qStreamCloseTsdbReader(pTaskInfo);
L
Liu Jicong 已提交
748
    }
wmmhello's avatar
wmmhello 已提交
749 750
    // remove if it has been register in the push manager, and return one empty block to consumer
    tqUnregisterPushHandle(pTq, pHandle);
751 752
    ret = tqMetaSaveHandle(pTq, req.subKey, pHandle);
    goto end;
L
Liu Jicong 已提交
753
  }
L
Liu Jicong 已提交
754

755
end:
wmmhello's avatar
wmmhello 已提交
756
  taosWUnLockLatch(&pTq->lock);
D
dapan1121 已提交
757
  taosMemoryFree(req.qmsg);
758
  return ret;
L
Liu Jicong 已提交
759
}
760

L
liuyao 已提交
761 762 763 764
void freePtr(void *ptr) {
  taosMemoryFree(*(void**)ptr);
}

765
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
D
dapan1121 已提交
766
  int32_t vgId = TD_VID(pTq->pVnode);
767

768
  pTask->id.idStr = createStreamTaskIdStr(pTask->id.streamId, pTask->id.taskId);
L
Liu Jicong 已提交
769
  pTask->refCnt = 1;
770
  pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE;
dengyihao's avatar
dengyihao 已提交
771 772
  pTask->inputQueue = streamQueueOpen(512 << 10);
  pTask->outputQueue = streamQueueOpen(512 << 10);
L
Liu Jicong 已提交
773 774

  if (pTask->inputQueue == NULL || pTask->outputQueue == NULL) {
L
Liu Jicong 已提交
775
    return -1;
L
Liu Jicong 已提交
776 777
  }

L
Liu Jicong 已提交
778 779
  pTask->inputStatus = TASK_INPUT_STATUS__NORMAL;
  pTask->outputStatus = TASK_OUTPUT_STATUS__NORMAL;
780
  pTask->pMsgCb = &pTq->pVnode->msgCb;
781
  pTask->pMeta = pTq->pStreamMeta;
782
  pTask->chkInfo.version = ver;
783
  pTask->chkInfo.currentVer = ver;
784

785
  // expand executor
786
  pTask->status.taskStatus = (pTask->fillHistory)? TASK_STATUS__WAIT_DOWNSTREAM:TASK_STATUS__NORMAL;
787

788
  if (pTask->taskLevel == TASK_LEVEL__SOURCE) {
789
    pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask, false, -1, -1);
790 791 792 793
    if (pTask->pState == NULL) {
      return -1;
    }

794
    SReadHandle handle = {.vnode = pTq->pVnode, .initTqReader = 1, .pStateBackend = pTask->pState};
795
    initStorageAPI(&handle.api);
796

797 798
    pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, vgId);
    if (pTask->exec.pExecutor == NULL) {
L
Liu Jicong 已提交
799 800
      return -1;
    }
801

802
    qSetTaskId(pTask->exec.pExecutor, pTask->id.taskId, pTask->id.streamId);
803
  } else if (pTask->taskLevel == TASK_LEVEL__AGG) {
804
    pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask, false, -1, -1);
805 806 807
    if (pTask->pState == NULL) {
      return -1;
    }
808

809
    int32_t     numOfVgroups = (int32_t)taosArrayGetSize(pTask->childEpInfo);
810 811
    SReadHandle handle = {.vnode = NULL, .numOfVgroups = numOfVgroups, .pStateBackend = pTask->pState};
    initStorageAPI(&handle.api);
812

813
    pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, vgId);
814
    if (pTask->exec.pExecutor == NULL) {
L
Liu Jicong 已提交
815 816
      return -1;
    }
817 818

    qSetTaskId(pTask->exec.pExecutor, pTask->id.taskId, pTask->id.streamId);
L
Liu Jicong 已提交
819
  }
L
Liu Jicong 已提交
820 821

  // sink
822
  if (pTask->outputType == TASK_OUTPUT__SMA) {
L
Liu Jicong 已提交
823
    pTask->smaSink.vnode = pTq->pVnode;
L
Liu Jicong 已提交
824
    pTask->smaSink.smaSink = smaHandleRes;
825
  } else if (pTask->outputType == TASK_OUTPUT__TABLE) {
L
Liu Jicong 已提交
826
    pTask->tbSink.vnode = pTq->pVnode;
H
Haojun Liao 已提交
827
    pTask->tbSink.tbSinkFunc = tqSinkToTablePipeline;
L
Liu Jicong 已提交
828

X
Xiaoyu Wang 已提交
829
    int32_t   ver1 = 1;
5
54liuyao 已提交
830
    SMetaInfo info = {0};
dengyihao's avatar
dengyihao 已提交
831
    int32_t   code = metaGetInfo(pTq->pVnode->pMeta, pTask->tbSink.stbUid, &info, NULL);
5
54liuyao 已提交
832
    if (code == TSDB_CODE_SUCCESS) {
D
dapan1121 已提交
833
      ver1 = info.skmVer;
5
54liuyao 已提交
834
    }
L
Liu Jicong 已提交
835

836 837
    SSchemaWrapper* pschemaWrapper = pTask->tbSink.pSchemaWrapper;
    pTask->tbSink.pTSchema = tBuildTSchema(pschemaWrapper->pSchema, pschemaWrapper->nCols, ver1);
838
    if (pTask->tbSink.pTSchema == NULL) {
D
dapan1121 已提交
839 840
      return -1;
    }
L
liuyao 已提交
841 842
    pTask->tbSink.pTblInfo = tSimpleHashInit(10240, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
    tSimpleHashSetFreeFp(pTask->tbSink.pTblInfo, freePtr);
L
Liu Jicong 已提交
843
  }
844

845
  if (pTask->taskLevel == TASK_LEVEL__SOURCE) {
846
    SWalFilterCond cond = {.deleteMsg = 1};  // delete msg also extract from wal files
847
    pTask->exec.pWalReader = walOpenReader(pTq->pVnode->pWal, &cond);
848 849
  }

850
  streamSetupTrigger(pTask);
851

852
  tqInfo("vgId:%d expand stream task, s-task:%s, checkpoint ver:%" PRId64 " child id:%d, level:%d", vgId, pTask->id.idStr,
853
         pTask->chkInfo.version, pTask->selfChildId, pTask->taskLevel);
854 855 856

  // next valid version will add one
  pTask->chkInfo.version += 1;
L
Liu Jicong 已提交
857
  return 0;
L
Liu Jicong 已提交
858
}
L
Liu Jicong 已提交
859

860
int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) {
861 862 863 864
  char*   msgStr = pMsg->pCont;
  char*   msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
  int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);

865 866
  SStreamTaskCheckReq req;
  SDecoder            decoder;
867

X
Xiaoyu Wang 已提交
868
  tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
869 870
  tDecodeSStreamTaskCheckReq(&decoder, &req);
  tDecoderClear(&decoder);
871

872 873 874 875 876 877 878 879 880 881
  int32_t             taskId = req.downstreamTaskId;
  SStreamTaskCheckRsp rsp = {
      .reqId = req.reqId,
      .streamId = req.streamId,
      .childId = req.childId,
      .downstreamNodeId = req.downstreamNodeId,
      .downstreamTaskId = req.downstreamTaskId,
      .upstreamNodeId = req.upstreamNodeId,
      .upstreamTaskId = req.upstreamTaskId,
  };
882

L
Liu Jicong 已提交
883
  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);
884

885
  if (pTask != NULL) {
886
    rsp.status = streamTaskCheckStatus(pTask);
887 888
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);

889
    tqDebug("s-task:%s recv task check req(reqId:0x%" PRIx64
890
            ") %d at node %d task status:%d, check req from task %d at node %d, rsp status %d",
891 892
            pTask->id.idStr, rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, pTask->status.taskStatus,
            rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);
893 894
  } else {
    rsp.status = 0;
895 896
    tqDebug("tq recv task check(taskId:0x%x not built yet) req(reqId:0x%" PRIx64
            ") %d at node %d, check req from task:0x%x at node %d, rsp status %d",
897 898
            taskId, rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.upstreamTaskId, rsp.upstreamNodeId,
            rsp.status);
899 900 901 902 903
  }

  SEncoder encoder;
  int32_t  code;
  int32_t  len;
904

905 906
  tEncodeSize(tEncodeSStreamTaskCheckRsp, &rsp, len, code);
  if (code < 0) {
907
    tqError("vgId:%d failed to encode task check rsp, task:0x%x", pTq->pStreamMeta->vgId, taskId);
L
Liu Jicong 已提交
908
    return -1;
909
  }
L
Liu Jicong 已提交
910

911 912 913 914 915 916 917 918
  void* buf = rpcMallocCont(sizeof(SMsgHead) + len);
  ((SMsgHead*)buf)->vgId = htonl(req.upstreamNodeId);

  void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
  tEncoderInit(&encoder, (uint8_t*)abuf, len);
  tEncodeSStreamTaskCheckRsp(&encoder, &rsp);
  tEncoderClear(&encoder);

919
  SRpcMsg rspMsg = {.code = 0, .pCont = buf, .contLen = sizeof(SMsgHead) + len, .info = pMsg->info};
920

921 922 923 924
  tmsgSendRsp(&rspMsg);
  return 0;
}

925
int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
926 927 928 929 930 931
  int32_t             code;
  SStreamTaskCheckRsp rsp;

  SDecoder decoder;
  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
  code = tDecodeSStreamTaskCheckRsp(&decoder, &rsp);
932

933 934 935 936 937
  if (code < 0) {
    tDecoderClear(&decoder);
    return -1;
  }

938
  tDecoderClear(&decoder);
939
  tqDebug("tq recv task check rsp(reqId:0x%" PRIx64 ") %d at node %d check req from task:0x%x at node %d, status %d",
940 941
          rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);

L
Liu Jicong 已提交
942
  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, rsp.upstreamTaskId);
943
  if (pTask == NULL) {
944 945
    tqError("tq failed to locate the stream task:0x%x vgId:%d, it may have been destroyed", rsp.upstreamTaskId,
            pTq->pStreamMeta->vgId);
946 947 948
    return -1;
  }

949
  code = streamProcessTaskCheckRsp(pTask, &rsp, sversion);
L
Liu Jicong 已提交
950 951
  streamMetaReleaseTask(pTq->pStreamMeta, pTask);
  return code;
952 953
}

954
int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
955 956 957
  int32_t code = 0;
  int32_t vgId = TD_VID(pTq->pVnode);

5
54liuyao 已提交
958 959 960
  if (tsDisableStream) {
    return 0;
  }
961 962 963 964

  // 1.deserialize msg and build task
  SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
  if (pTask == NULL) {
965 966
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    tqError("vgId:%d failed to create stream task due to out of memory, alloc size:%d", vgId, (int32_t) sizeof(SStreamTask));
967 968
    return -1;
  }
969

970 971
  SDecoder decoder;
  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
972
  code = tDecodeStreamTask(&decoder, pTask);
973 974 975 976 977
  if (code < 0) {
    tDecoderClear(&decoder);
    taosMemoryFree(pTask);
    return -1;
  }
978

979 980
  tDecoderClear(&decoder);

981
  // 2.save task, use the newest commit version as the initial start version of stream task.
982
  taosWLockLatch(&pTq->pStreamMeta->lock);
983
  code = streamMetaAddDeployedTask(pTq->pStreamMeta, sversion, pTask);
984
  int32_t numOfTasks = streamMetaGetNumOfTasks(pTq->pStreamMeta);
985
  if (code < 0) {
986
    tqError("vgId:%d failed to add s-task:%s, total:%d", vgId, pTask->id.idStr, numOfTasks);
987
    taosWUnLockLatch(&pTq->pStreamMeta->lock);
988 989 990
    return -1;
  }

991 992
  taosWUnLockLatch(&pTq->pStreamMeta->lock);

993 994
  // 3.go through recover steps to fill history
  if (pTask->fillHistory) {
995
    streamTaskCheckDownstream(pTask, sversion);
996 997
  }

998
  tqDebug("vgId:%d s-task:%s is deployed and add meta from mnd, status:%d, total:%d", vgId, pTask->id.idStr,
999
          pTask->status.taskStatus, numOfTasks);
1000 1001 1002
  return 0;
}

L
Liu Jicong 已提交
1003 1004 1005 1006 1007
int32_t tqProcessTaskRecover1Req(STQ* pTq, SRpcMsg* pMsg) {
  int32_t code;
  char*   msg = pMsg->pCont;
  int32_t msgLen = pMsg->contLen;

1008
  SStreamRecoverStep1Req* pReq = (SStreamRecoverStep1Req*)msg;
L
Liu Jicong 已提交
1009
  SStreamTask*            pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->taskId);
1010 1011 1012 1013 1014
  if (pTask == NULL) {
    return -1;
  }

  // check param
1015
  int64_t fillVer1 = pTask->chkInfo.version;
1016
  if (fillVer1 <= 0) {
L
Liu Jicong 已提交
1017
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
1018 1019 1020 1021
    return -1;
  }

  // do recovery step 1
H
Haojun Liao 已提交
1022
  tqDebug("s-task:%s start non-blocking recover stage(step 1) scan", pTask->id.idStr);
H
Haojun Liao 已提交
1023
  int64_t st = taosGetTimestampMs();
1024

H
Haojun Liao 已提交
1025
  streamSourceRecoverScanStep1(pTask);
1026
  if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING) {
H
Haojun Liao 已提交
1027 1028
    tqDebug("s-task:%s is dropped, abort recover in step1", pTask->id.idStr);

L
Liu Jicong 已提交
1029 1030 1031 1032
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
    return 0;
  }

H
Haojun Liao 已提交
1033
  double el = (taosGetTimestampMs() - st) / 1000.0;
H
Haojun Liao 已提交
1034
  tqDebug("s-task:%s non-blocking recover stage(step 1) ended, elapsed time:%.2fs", pTask->id.idStr, el);
H
Haojun Liao 已提交
1035

1036 1037 1038 1039
  // build msg to launch next step
  SStreamRecoverStep2Req req;
  code = streamBuildSourceRecover2Req(pTask, &req);
  if (code < 0) {
L
Liu Jicong 已提交
1040
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
1041 1042 1043
    return -1;
  }

L
Liu Jicong 已提交
1044
  streamMetaReleaseTask(pTq->pStreamMeta, pTask);
1045
  if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING) {
L
Liu Jicong 已提交
1046 1047 1048
    return 0;
  }

1049
  // serialize msg
L
Liu Jicong 已提交
1050 1051 1052 1053
  int32_t len = sizeof(SStreamRecoverStep1Req);

  void* serializedReq = rpcMallocCont(len);
  if (serializedReq == NULL) {
H
Haojun Liao 已提交
1054
    tqError("s-task:%s failed to prepare the step2 stage, out of memory", pTask->id.idStr);
L
Liu Jicong 已提交
1055 1056 1057 1058
    return -1;
  }

  memcpy(serializedReq, &req, len);
1059 1060

  // dispatch msg
H
Haojun Liao 已提交
1061
  tqDebug("s-task:%s start recover block stage", pTask->id.idStr);
1062

H
Haojun Liao 已提交
1063 1064
  SRpcMsg rpcMsg = {
      .code = 0, .contLen = len, .msgType = TDMT_VND_STREAM_RECOVER_BLOCKING_STAGE, .pCont = serializedReq};
1065 1066 1067 1068
  tmsgPutToQueue(&pTq->pVnode->msgCb, WRITE_QUEUE, &rpcMsg);
  return 0;
}

1069
int32_t tqProcessTaskRecover2Req(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
H
Haojun Liao 已提交
1070 1071
  int32_t code = 0;

1072
  SStreamRecoverStep2Req* pReq = (SStreamRecoverStep2Req*)msg;
H
Haojun Liao 已提交
1073 1074

  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->taskId);
1075 1076 1077 1078 1079
  if (pTask == NULL) {
    return -1;
  }

  // do recovery step 2
H
Haojun Liao 已提交
1080 1081 1082
  int64_t st = taosGetTimestampMs();
  tqDebug("s-task:%s start step2 recover, ts:%"PRId64, pTask->id.idStr, st);

1083
  code = streamSourceRecoverScanStep2(pTask, sversion);
1084
  if (code < 0) {
L
Liu Jicong 已提交
1085
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
1086 1087 1088
    return -1;
  }

1089 1090
  qDebug("s-task:%s set the start wal offset to be:%"PRId64, pTask->id.idStr, sversion);
  walReaderSeekVer(pTask->exec.pWalReader, sversion);
L
liuyao 已提交
1091
  pTask->chkInfo.currentVer = sversion;
1092

1093
  if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING) {
L
Liu Jicong 已提交
1094 1095 1096 1097
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
    return 0;
  }

1098 1099 1100
  // restore param
  code = streamRestoreParam(pTask);
  if (code < 0) {
L
Liu Jicong 已提交
1101
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
1102 1103 1104 1105
    return -1;
  }

  // set status normal
H
Haojun Liao 已提交
1106
  tqDebug("s-task:%s blocking stage completed, set the status to be normal", pTask->id.idStr);
1107 1108
  code = streamSetStatusNormal(pTask);
  if (code < 0) {
L
Liu Jicong 已提交
1109
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
1110 1111 1112
    return -1;
  }

H
Haojun Liao 已提交
1113
  double el = (taosGetTimestampMs() - st)/ 1000.0;
H
Haojun Liao 已提交
1114
  tqDebug("s-task:%s step2 recover finished, el:%.2fs", pTask->id.idStr, el);
H
Haojun Liao 已提交
1115

1116 1117 1118
  // dispatch recover finish req to all related downstream task
  code = streamDispatchRecoverFinishReq(pTask);
  if (code < 0) {
L
Liu Jicong 已提交
1119
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
1120 1121 1122
    return -1;
  }

L
Liu Jicong 已提交
1123 1124 1125
  atomic_store_8(&pTask->fillHistory, 0);
  streamMetaSaveTask(pTq->pStreamMeta, pTask);

L
Liu Jicong 已提交
1126
  streamMetaReleaseTask(pTq->pStreamMeta, pTask);
1127 1128 1129
  return 0;
}

L
Liu Jicong 已提交
1130 1131 1132
int32_t tqProcessTaskRecoverFinishReq(STQ* pTq, SRpcMsg* pMsg) {
  char*   msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
1133 1134

  // deserialize
1135 1136 1137
  SStreamRecoverFinishReq req;

  SDecoder decoder;
X
Xiaoyu Wang 已提交
1138
  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
1139 1140 1141
  tDecodeSStreamRecoverFinishReq(&decoder, &req);
  tDecoderClear(&decoder);

1142
  // find task
L
Liu Jicong 已提交
1143
  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.taskId);
1144 1145 1146
  if (pTask == NULL) {
    return -1;
  }
1147
  // do process request
1148
  if (streamProcessRecoverFinishReq(pTask, req.childId) < 0) {
L
Liu Jicong 已提交
1149
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
1150 1151 1152
    return -1;
  }

L
Liu Jicong 已提交
1153
  streamMetaReleaseTask(pTq->pStreamMeta, pTask);
1154
  return 0;
L
Liu Jicong 已提交
1155
}
L
Liu Jicong 已提交
1156

L
Liu Jicong 已提交
1157 1158 1159 1160 1161
int32_t tqProcessTaskRecoverFinishRsp(STQ* pTq, SRpcMsg* pMsg) {
  //
  return 0;
}

1162 1163 1164 1165
int32_t extractDelDataBlock(const void* pData, int32_t len, int64_t ver, SStreamRefDataBlock** pRefBlock) {
  SDecoder*   pCoder = &(SDecoder){0};
  SDeleteRes* pRes = &(SDeleteRes){0};

H
Haojun Liao 已提交
1166 1167
  *pRefBlock = NULL;

1168 1169 1170 1171 1172 1173 1174 1175 1176 1177 1178 1179 1180 1181 1182 1183 1184 1185 1186 1187 1188 1189 1190 1191 1192 1193 1194 1195 1196 1197 1198 1199 1200 1201 1202 1203 1204 1205 1206 1207 1208 1209 1210 1211 1212 1213 1214
  pRes->uidList = taosArrayInit(0, sizeof(tb_uid_t));
  if (pRes->uidList == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  tDecoderInit(pCoder, (uint8_t*)pData, len);
  tDecodeDeleteRes(pCoder, pRes);
  tDecoderClear(pCoder);

  int32_t numOfTables = taosArrayGetSize(pRes->uidList);
  if (numOfTables == 0 || pRes->affectedRows == 0) {
    taosArrayDestroy(pRes->uidList);
    return TSDB_CODE_SUCCESS;
  }

  SSDataBlock* pDelBlock = createSpecialDataBlock(STREAM_DELETE_DATA);
  blockDataEnsureCapacity(pDelBlock, numOfTables);
  pDelBlock->info.rows = numOfTables;
  pDelBlock->info.version = ver;

  for (int32_t i = 0; i < numOfTables; i++) {
    // start key column
    SColumnInfoData* pStartCol = taosArrayGet(pDelBlock->pDataBlock, START_TS_COLUMN_INDEX);
    colDataSetVal(pStartCol, i, (const char*)&pRes->skey, false);  // end key column
    SColumnInfoData* pEndCol = taosArrayGet(pDelBlock->pDataBlock, END_TS_COLUMN_INDEX);
    colDataSetVal(pEndCol, i, (const char*)&pRes->ekey, false);
    // uid column
    SColumnInfoData* pUidCol = taosArrayGet(pDelBlock->pDataBlock, UID_COLUMN_INDEX);
    int64_t*         pUid = taosArrayGet(pRes->uidList, i);
    colDataSetVal(pUidCol, i, (const char*)pUid, false);

    colDataSetNULL(taosArrayGet(pDelBlock->pDataBlock, GROUPID_COLUMN_INDEX), i);
    colDataSetNULL(taosArrayGet(pDelBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX), i);
    colDataSetNULL(taosArrayGet(pDelBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX), i);
  }

  taosArrayDestroy(pRes->uidList);
  *pRefBlock = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0);
  if (pRefBlock == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  (*pRefBlock)->type = STREAM_INPUT__REF_DATA_BLOCK;
  (*pRefBlock)->pBlock = pDelBlock;
  return TSDB_CODE_SUCCESS;
}

L
Liu Jicong 已提交
1215 1216
int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
  SStreamTaskRunReq* pReq = pMsg->pCont;
1217 1218 1219 1220

  int32_t taskId = pReq->taskId;
  int32_t vgId = TD_VID(pTq->pVnode);

1221 1222
  if (taskId == WAL_READ_TASKS_ID) {  // all tasks are extracted submit data from the wal
    tqStreamTasksScanWal(pTq);
L
Liu Jicong 已提交
1223
    return 0;
1224
  }
1225

1226
  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);
1227 1228 1229 1230 1231
  if (pTask != NULL) {
    if (pTask->status.taskStatus == TASK_STATUS__NORMAL) {
      tqDebug("vgId:%d s-task:%s start to process block from wal, last chk point:%" PRId64, vgId,
              pTask->id.idStr, pTask->chkInfo.version);
      streamProcessRunReq(pTask);
1232
    } else {
L
liuyao 已提交
1233
      if (streamTaskShouldPause(&pTask->status)) {
L
liuyao 已提交
1234
        atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
L
liuyao 已提交
1235
      }
1236
      tqDebug("vgId:%d s-task:%s ignore run req since not in ready state", vgId, pTask->id.idStr);
1237
    }
1238

L
Liu Jicong 已提交
1239
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
1240
    tqStartStreamTasks(pTq);
L
Liu Jicong 已提交
1241
    return 0;
1242
  } else {
1243
    tqError("vgId:%d failed to found s-task, taskId:%d", vgId, taskId);
1244
    return -1;
L
Liu Jicong 已提交
1245
  }
L
Liu Jicong 已提交
1246 1247
}

L
Liu Jicong 已提交
1248
int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec) {
1249 1250 1251
  char*              msgStr = pMsg->pCont;
  char*              msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
  int32_t            msgLen = pMsg->contLen - sizeof(SMsgHead);
1252 1253 1254 1255

  SStreamDispatchReq req = {0};

  SDecoder decoder;
L
Liu Jicong 已提交
1256
  tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
1257
  tDecodeStreamDispatchReq(&decoder, &req);
L
Liu Jicong 已提交
1258

1259
  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.taskId);
L
Liu Jicong 已提交
1260
  if (pTask) {
1261
    SRpcMsg rsp = {.info = pMsg->info, .code = 0};
1262
    streamProcessDispatchMsg(pTask, &req, &rsp, exec);
L
Liu Jicong 已提交
1263
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
L
Liu Jicong 已提交
1264
    return 0;
1265
  } else {
L
liuyao 已提交
1266
    tDeleteStreamDispatchReq(&req);
1267
    return -1;
L
Liu Jicong 已提交
1268
  }
L
Liu Jicong 已提交
1269 1270
}

L
Liu Jicong 已提交
1271 1272
int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) {
  SStreamDispatchRsp* pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
1273
  int32_t             taskId = ntohl(pRsp->upstreamTaskId);
L
Liu Jicong 已提交
1274
  SStreamTask*        pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);
1275 1276

  int32_t vgId = pTq->pStreamMeta->vgId;
L
Liu Jicong 已提交
1277
  if (pTask) {
1278
    streamProcessDispatchRsp(pTask, pRsp, pMsg->code);
L
Liu Jicong 已提交
1279
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
L
Liu Jicong 已提交
1280
    return 0;
1281
  } else {
1282
    tqDebug("vgId:%d failed to handle the dispatch rsp, since find task:0x%x failed", vgId, taskId);
1283
    return TSDB_CODE_INVALID_MSG;
L
Liu Jicong 已提交
1284
  }
L
Liu Jicong 已提交
1285
}
L
Liu Jicong 已提交
1286

1287
int32_t tqProcessTaskDropReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
L
Liu Jicong 已提交
1288
  SVDropStreamTaskReq* pReq = (SVDropStreamTaskReq*)msg;
1289 1290
  tqDebug("vgId:%d receive msg to drop stream task:0x%x", TD_VID(pTq->pVnode), pReq->taskId);

1291
  streamMetaRemoveTask(pTq->pStreamMeta, pReq->taskId);
L
Liu Jicong 已提交
1292
  return 0;
L
Liu Jicong 已提交
1293
}
L
Liu Jicong 已提交
1294

5
54liuyao 已提交
1295 1296
int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  SVPauseStreamTaskReq* pReq = (SVPauseStreamTaskReq*)msg;
L
liuyao 已提交
1297 1298 1299
  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->taskId);
  if (pTask) {
    tqDebug("vgId:%d s-task:%s set pause flag", pTq->pStreamMeta->vgId, pTask->id.idStr);
L
liuyao 已提交
1300
    atomic_store_8(&pTask->status.keepTaskStatus, pTask->status.taskStatus);
L
liuyao 已提交
1301 1302 1303
    atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__PAUSE);
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
  }
5
54liuyao 已提交
1304 1305 1306 1307 1308
  return 0;
}

int32_t tqProcessTaskResumeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  SVResumeStreamTaskReq* pReq = (SVResumeStreamTaskReq*)msg;
1309 1310

  int32_t      vgId = pTq->pStreamMeta->vgId;
L
liuyao 已提交
1311 1312
  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->taskId);
  if (pTask) {
L
liuyao 已提交
1313
    atomic_store_8(&pTask->status.taskStatus, pTask->status.keepTaskStatus);
1314 1315

    // no lock needs to secure the access of the version
1316 1317 1318 1319 1320 1321
    if (pReq->igUntreated && pTask->taskLevel == TASK_LEVEL__SOURCE) {
      // discard all the data  when the stream task is suspended.
      walReaderSetSkipToVersion(pTask->exec.pWalReader, sversion);
      tqDebug("vgId:%d s-task:%s resume to exec, prev paused version:%" PRId64 ", start from vnode ver:%" PRId64
              ", schedStatus:%d",
              vgId, pTask->id.idStr, pTask->chkInfo.currentVer, sversion, pTask->status.schedStatus);
1322
    } else {  // from the previous paused version and go on
1323 1324
      tqDebug("vgId:%d s-task:%s resume to exec, from paused ver:%" PRId64 ", vnode ver:%" PRId64 ", schedStatus:%d",
              vgId, pTask->id.idStr, pTask->chkInfo.currentVer, sversion, pTask->status.schedStatus);
1325 1326
    }

L
liuyao 已提交
1327
    if (pTask->taskLevel == TASK_LEVEL__SOURCE && taosQueueItemSize(pTask->inputQueue->queue) == 0) {
L
liuyao 已提交
1328 1329 1330 1331
      tqStartStreamTasks(pTq);
    } else {
      streamSchedExec(pTask);
    }
L
liuyao 已提交
1332
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
1333 1334
  } else {
    tqError("vgId:%d failed to find the s-task:0x%x for resume stream task", vgId, pReq->taskId);
L
liuyao 已提交
1335
  }
1336

5
54liuyao 已提交
1337 1338 1339
  return 0;
}

L
Liu Jicong 已提交
1340 1341 1342 1343 1344 1345
int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg) {
  char*              msgStr = pMsg->pCont;
  char*              msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
  int32_t            msgLen = pMsg->contLen - sizeof(SMsgHead);
  SStreamRetrieveReq req;
  SDecoder           decoder;
1346
  tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
L
Liu Jicong 已提交
1347
  tDecodeStreamRetrieveReq(&decoder, &req);
L
Liu Jicong 已提交
1348
  tDecoderClear(&decoder);
L
Liu Jicong 已提交
1349
  int32_t      taskId = req.dstTaskId;
L
Liu Jicong 已提交
1350
  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);
L
Liu Jicong 已提交
1351
  if (pTask) {
1352
    SRpcMsg rsp = {.info = pMsg->info, .code = 0};
L
Liu Jicong 已提交
1353
    streamProcessRetrieveReq(pTask, &req, &rsp);
L
Liu Jicong 已提交
1354
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
L
Liu Jicong 已提交
1355
    tDeleteStreamRetrieveReq(&req);
L
Liu Jicong 已提交
1356
    return 0;
L
Liu Jicong 已提交
1357
  } else {
L
liuyao 已提交
1358
    tDeleteStreamRetrieveReq(&req);
L
Liu Jicong 已提交
1359
    return -1;
L
Liu Jicong 已提交
1360 1361 1362 1363 1364 1365 1366
  }
}

int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg) {
  //
  return 0;
}
L
Liu Jicong 已提交
1367

1368 1369 1370 1371 1372 1373
int32_t vnodeEnqueueStreamMsg(SVnode* pVnode, SRpcMsg* pMsg) {
  STQ*      pTq = pVnode->pTq;
  SMsgHead* msgStr = pMsg->pCont;
  char*     msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
  int32_t   msgLen = pMsg->contLen - sizeof(SMsgHead);
  int32_t   code = 0;
L
Liu Jicong 已提交
1374 1375 1376

  SStreamDispatchReq req;
  SDecoder           decoder;
1377
  tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
L
Liu Jicong 已提交
1378 1379
  if (tDecodeStreamDispatchReq(&decoder, &req) < 0) {
    code = TSDB_CODE_MSG_DECODE_ERROR;
L
Liu Jicong 已提交
1380
    tDecoderClear(&decoder);
L
Liu Jicong 已提交
1381 1382
    goto FAIL;
  }
L
Liu Jicong 已提交
1383
  tDecoderClear(&decoder);
L
Liu Jicong 已提交
1384

L
Liu Jicong 已提交
1385
  int32_t taskId = req.taskId;
L
Liu Jicong 已提交
1386

L
Liu Jicong 已提交
1387
  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);
L
Liu Jicong 已提交
1388
  if (pTask) {
1389
    SRpcMsg rsp = {.info = pMsg->info, .code = 0};
1390
    streamProcessDispatchMsg(pTask, &req, &rsp, false);
L
Liu Jicong 已提交
1391
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
L
Liu Jicong 已提交
1392 1393
    rpcFreeCont(pMsg->pCont);
    taosFreeQitem(pMsg);
1394
    return 0;
5
54liuyao 已提交
1395 1396
  } else {
    tDeleteStreamDispatchReq(&req);
L
Liu Jicong 已提交
1397
  }
L
Liu Jicong 已提交
1398

1399 1400
  code = TSDB_CODE_STREAM_TASK_NOT_EXIST;

L
Liu Jicong 已提交
1401
FAIL:
1402 1403 1404 1405
  if (pMsg->info.handle == NULL) return -1;

  SMsgHead* pRspHead = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamDispatchRsp));
  if (pRspHead == NULL) {
1406
    SRpcMsg rsp = {.code = TSDB_CODE_OUT_OF_MEMORY, .info = pMsg->info};
1407 1408 1409 1410 1411 1412 1413 1414 1415 1416 1417 1418 1419 1420 1421 1422
    tqDebug("send dispatch error rsp, code: %x", code);
    tmsgSendRsp(&rsp);
    rpcFreeCont(pMsg->pCont);
    taosFreeQitem(pMsg);
    return -1;
  }

  pRspHead->vgId = htonl(req.upstreamNodeId);
  SStreamDispatchRsp* pRsp = POINTER_SHIFT(pRspHead, sizeof(SMsgHead));
  pRsp->streamId = htobe64(req.streamId);
  pRsp->upstreamTaskId = htonl(req.upstreamTaskId);
  pRsp->upstreamNodeId = htonl(req.upstreamNodeId);
  pRsp->downstreamNodeId = htonl(pVnode->config.vgId);
  pRsp->downstreamTaskId = htonl(req.taskId);
  pRsp->inputStatus = TASK_OUTPUT_STATUS__NORMAL;

L
Liu Jicong 已提交
1423
  SRpcMsg rsp = {
1424
      .code = code, .info = pMsg->info, .contLen = sizeof(SMsgHead) + sizeof(SStreamDispatchRsp), .pCont = pRspHead};
1425
  tqDebug("send dispatch error rsp, code: %x", code);
L
Liu Jicong 已提交
1426
  tmsgSendRsp(&rsp);
L
Liu Jicong 已提交
1427 1428
  rpcFreeCont(pMsg->pCont);
  taosFreeQitem(pMsg);
1429
  return -1;
L
Liu Jicong 已提交
1430
}
L
Liu Jicong 已提交
1431

1432
int32_t tqCheckLogInWal(STQ* pTq, int64_t sversion) { return sversion <= pTq->walLogLastVer; }
1433

1434
int32_t tqStartStreamTasks(STQ* pTq) {
1435
  int32_t      vgId = TD_VID(pTq->pVnode);
1436
  SStreamMeta* pMeta = pTq->pStreamMeta;
1437

1438
  taosWLockLatch(&pMeta->lock);
1439

1440
  int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
1441
  if (numOfTasks == 0) {
1442
    tqInfo("vgId:%d no stream tasks exist", vgId);
1443
    taosWUnLockLatch(&pMeta->lock);
1444 1445 1446
    return 0;
  }

1447
  pMeta->walScanCounter += 1;
1448

1449 1450
  if (pMeta->walScanCounter > 1) {
    tqDebug("vgId:%d wal read task has been launched, remain scan times:%d", vgId, pMeta->walScanCounter);
1451
    taosWUnLockLatch(&pMeta->lock);
1452 1453 1454
    return 0;
  }

1455 1456 1457
  SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq));
  if (pRunReq == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
H
Haojun Liao 已提交
1458
    tqError("vgId:%d failed to create msg to start wal scanning to launch stream tasks, code:%s", vgId, terrstr());
1459
    taosWUnLockLatch(&pMeta->lock);
1460 1461 1462
    return -1;
  }

H
Haojun Liao 已提交
1463
  tqDebug("vgId:%d create msg to start wal scan to launch stream tasks, numOfTasks:%d", vgId, numOfTasks);
1464 1465
  pRunReq->head.vgId = vgId;
  pRunReq->streamId = 0;
1466
  pRunReq->taskId = WAL_READ_TASKS_ID;
1467 1468 1469

  SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)};
  tmsgPutToQueue(&pTq->pVnode->msgCb, STREAM_QUEUE, &msg);
1470
  taosWUnLockLatch(&pMeta->lock);
1471 1472 1473

  return 0;
}