tq.c 48.8 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
S
Shengliang Guan 已提交
14 15
 */

H
Hongze Cheng 已提交
16
#include "tq.h"
S
Shengliang Guan 已提交
17

dengyihao's avatar
dengyihao 已提交
18 19 20
// 0: not init
// 1: already inited
// 2: waiting to be initialized or cleaned up
21
#define WAL_READ_TASKS_ID (-1)
22

23
static int32_t tqInitialize(STQ* pTq);
dengyihao's avatar
dengyihao 已提交
24

wmmhello's avatar
wmmhello 已提交
25 26 27
// Small accessors for the execution status stored in a subscription handle.
// Returns true when the handle's status equals TMQ_HANDLE_STATUS_EXEC.
static FORCE_INLINE bool tqIsHandleExec(STqHandle* pHandle) { return pHandle->status == TMQ_HANDLE_STATUS_EXEC; }
// Marks the handle as executing.
static FORCE_INLINE void tqSetHandleExec(STqHandle* pHandle) { pHandle->status = TMQ_HANDLE_STATUS_EXEC; }
// Marks the handle as idle.
static FORCE_INLINE void tqSetHandleIdle(STqHandle* pHandle) { pHandle->status = TMQ_HANDLE_STATUS_IDLE; }
wmmhello's avatar
wmmhello 已提交
28

L
Liu Jicong 已提交
29
int32_t tqInit() {
L
Liu Jicong 已提交
30 31 32 33 34 35
  int8_t old;
  while (1) {
    old = atomic_val_compare_exchange_8(&tqMgmt.inited, 0, 2);
    if (old != 2) break;
  }

36 37 38 39 40 41
  if (old == 0) {
    tqMgmt.timer = taosTmrInit(10000, 100, 10000, "TQ");
    if (tqMgmt.timer == NULL) {
      atomic_store_8(&tqMgmt.inited, 0);
      return -1;
    }
42 43 44
    if (streamInit() < 0) {
      return -1;
    }
L
Liu Jicong 已提交
45
    atomic_store_8(&tqMgmt.inited, 1);
46
  }
47

L
Liu Jicong 已提交
48 49
  return 0;
}
L
Liu Jicong 已提交
50

51
/*
 * Tear down the module-wide TQ state.
 * Only the caller that moves tqMgmt.inited from 1 to 2 performs the cleanup;
 * while another transition is in flight (flag == 2) the call keeps retrying.
 */
void tqCleanUp() {
  int8_t prev;
  do {
    prev = atomic_val_compare_exchange_8(&tqMgmt.inited, 1, 2);
  } while (prev == 2);

  if (prev != 1) {
    return;  // nothing was initialized
  }

  taosTmrCleanUp(tqMgmt.timer);
  streamCleanUp();
  atomic_store_8(&tqMgmt.inited, 0);
}
L
Liu Jicong 已提交
64

65
static void destroyTqHandle(void* data) {
66 67
  STqHandle* pData = (STqHandle*)data;
  qDestroyTask(pData->execHandle.task);
wmmhello's avatar
wmmhello 已提交
68

69
  if (pData->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
L
Liu Jicong 已提交
70
    taosMemoryFreeClear(pData->execHandle.execCol.qmsg);
71
  } else if (pData->execHandle.subType == TOPIC_SUB_TYPE__DB) {
72
    tqReaderClose(pData->execHandle.pTqReader);
73 74
    walCloseReader(pData->pWalReader);
    taosHashCleanup(pData->execHandle.execDb.pFilterOutTbUid);
L
Liu Jicong 已提交
75
  } else if (pData->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
76
    walCloseReader(pData->pWalReader);
77
    tqReaderClose(pData->execHandle.pTqReader);
78
  }
79 80 81 82
  if(pData->msg != NULL) {
    rpcFreeCont(pData->msg->pCont);
    taosMemoryFree(pData->msg);
    pData->msg = NULL;
D
dapan1121 已提交
83
  }
L
Liu Jicong 已提交
84 85
}

86 87 88 89 90
// Returns true only when both offsets are of type TMQ_OFFSET__LOG and the left
// version does not exceed the right one; any other type combination yields false.
static bool tqOffsetLessOrEqual(const STqOffset* pLeft, const STqOffset* pRight) {
  if (pLeft->val.type != TMQ_OFFSET__LOG || pRight->val.type != TMQ_OFFSET__LOG) {
    return false;
  }

  return pLeft->val.version <= pRight->val.version;
}

L
Liu Jicong 已提交
91
/*
 * Allocate and open a TQ instance for the given vnode.
 *
 * path   - directory path, duplicated into the returned object
 * pVnode - owning vnode; its WAL last version is recorded in walLogLastVer
 *
 * Returns the new STQ on success. Returns NULL on failure; for allocation
 * failures terrno is set to TSDB_CODE_OUT_OF_MEMORY.
 */
STQ* tqOpen(const char* path, SVnode* pVnode) {
  STQ* pTq = taosMemoryCalloc(1, sizeof(STQ));
  if (pTq == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pTq->path = taosStrdup(path);
  pTq->pVnode = pVnode;
  pTq->walLogLastVer = pVnode->pWal->vers.lastVer;

  pTq->pHandle = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);
  pTq->pPushMgr = taosHashInit(64, MurmurHash3_32, false, HASH_NO_LOCK);
  pTq->pCheckInfo = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);

  // Bail out before anything dereferences a failed allocation. Nothing beyond
  // these members has been opened yet, so release them directly here.
  if (pTq->path == NULL || pTq->pHandle == NULL || pTq->pPushMgr == NULL || pTq->pCheckInfo == NULL) {
    taosHashCleanup(pTq->pHandle);
    taosHashCleanup(pTq->pPushMgr);
    taosHashCleanup(pTq->pCheckInfo);
    taosMemoryFree(pTq->path);
    taosMemoryFree(pTq);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  taosHashSetFreeFp(pTq->pHandle, destroyTqHandle);
  taosHashSetFreeFp(pTq->pCheckInfo, (FDelete)tDeleteSTqCheckInfo);
  taosInitRWLatch(&pTq->lock);

  int32_t code = tqInitialize(pTq);
  if (code != TSDB_CODE_SUCCESS) {
    tqClose(pTq);
    return NULL;
  }

  return pTq;
}

/*
 * Open the persistent parts of a TQ instance in order: meta store, offset
 * store, stream meta, and finally load the stream tasks.
 * Returns 0 when every step succeeds, -1 as soon as one step fails.
 */
int32_t tqInitialize(STQ* pTq) {
  if (tqMetaOpen(pTq) < 0) return -1;

  pTq->pOffsetStore = tqOffsetOpen(pTq);
  if (NULL == pTq->pOffsetStore) return -1;

  pTq->pStreamMeta = streamMetaOpen(pTq->path, pTq, (FTaskExpand*)tqExpandTask, pTq->pVnode->config.vgId);
  if (NULL == pTq->pStreamMeta) return -1;

  // the version is kept in task's meta data
  // todo check if this version is required or not
  return (streamLoadTasks(pTq->pStreamMeta, walGetCommittedVer(pTq->pVnode->pWal)) < 0) ? -1 : 0;
}
L
Liu Jicong 已提交
143

L
Liu Jicong 已提交
144
/*
 * Release everything owned by a TQ instance and the instance itself.
 * A NULL argument is accepted and ignored.
 */
void tqClose(STQ* pTq) {
  if (!pTq) return;

  // in-memory stores
  tqOffsetClose(pTq->pOffsetStore);
  taosHashCleanup(pTq->pHandle);
  taosHashCleanup(pTq->pPushMgr);
  taosHashCleanup(pTq->pCheckInfo);
  taosMemoryFree(pTq->path);

  // persistent meta, then the object
  tqMetaClose(pTq);
  streamMetaClose(pTq->pStreamMeta);
  taosMemoryFree(pTq);
}
L
Liu Jicong 已提交
158

H
Haojun Liao 已提交
159 160 161 162 163 164 165 166 167 168 169 170 171
/*
 * Stop every stream task registered in the stream meta of this TQ instance.
 * While holding the stream-meta write latch, each task is switched to
 * TASK_STATUS__STOP and its executor is killed; the time spent killing each
 * task is logged. A NULL pTq is ignored.
 */
void tqNotifyClose(STQ* pTq) {
  if (pTq == NULL) {
    return;
  }

  taosWLockLatch(&pTq->pStreamMeta->lock);

  for (void* pIter = taosHashIterate(pTq->pStreamMeta->pTasks, NULL); pIter != NULL;
       pIter = taosHashIterate(pTq->pStreamMeta->pTasks, pIter)) {
    SStreamTask* pTask = *(SStreamTask**)pIter;
    tqDebug("vgId:%d s-task:%s set dropping flag", pTq->pStreamMeta->vgId, pTask->id.idStr);
    pTask->status.taskStatus = TASK_STATUS__STOP;

    int64_t startTs = taosGetTimestampMs();
    qKillTask(pTask->exec.pExecutor, TSDB_CODE_SUCCESS);
    int64_t elapsed = taosGetTimestampMs() - startTs;

    tqDebug("vgId:%d s-task:%s is closed in %" PRId64 " ms", pTq->pStreamMeta->vgId, pTask->id.idStr, elapsed);
  }

  taosWUnLockLatch(&pTq->pStreamMeta->lock);
}

D
dapan1121 已提交
184 185
static int32_t doSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* pRsp, int32_t epoch,
                             int64_t consumerId, int32_t type) {
L
Liu Jicong 已提交
186 187
  int32_t len = 0;
  int32_t code = 0;
D
dapan1121 已提交
188 189

  if (type == TMQ_MSG_TYPE__POLL_RSP) {
H
Haojun Liao 已提交
190
    tEncodeSize(tEncodeMqDataRsp, pRsp, len, code);
D
dapan1121 已提交
191 192 193
  } else if (type == TMQ_MSG_TYPE__TAOSX_RSP) {
    tEncodeSize(tEncodeSTaosxRsp, (STaosxRsp*)pRsp, len, code);
  }
L
Liu Jicong 已提交
194 195 196 197 198 199 200 201 202 203 204

  if (code < 0) {
    return -1;
  }

  int32_t tlen = sizeof(SMqRspHead) + len;
  void*   buf = rpcMallocCont(tlen);
  if (buf == NULL) {
    return -1;
  }

D
dapan1121 已提交
205 206 207
  ((SMqRspHead*)buf)->mqMsgType = type;
  ((SMqRspHead*)buf)->epoch = epoch;
  ((SMqRspHead*)buf)->consumerId = consumerId;
L
Liu Jicong 已提交
208 209 210 211 212 213

  void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));

  SEncoder encoder = {0};
  tEncoderInit(&encoder, abuf, len);

D
dapan1121 已提交
214
  if (type == TMQ_MSG_TYPE__POLL_RSP) {
H
Haojun Liao 已提交
215
    tEncodeMqDataRsp(&encoder, pRsp);
D
dapan1121 已提交
216
  } else if (type == TMQ_MSG_TYPE__TAOSX_RSP) {
X
Xiaoyu Wang 已提交
217
    tEncodeSTaosxRsp(&encoder, (STaosxRsp*)pRsp);
dengyihao's avatar
dengyihao 已提交
218
  }
L
Liu Jicong 已提交
219

wmmhello's avatar
wmmhello 已提交
220
  tEncoderClear(&encoder);
L
Liu Jicong 已提交
221 222

  SRpcMsg rsp = {
D
dapan1121 已提交
223
      .info = *pRpcHandleInfo,
L
Liu Jicong 已提交
224 225 226 227
      .pCont = buf,
      .contLen = tlen,
      .code = 0,
  };
L
Liu Jicong 已提交
228

L
Liu Jicong 已提交
229
  tmsgSendRsp(&rsp);
L
Liu Jicong 已提交
230 231
  return 0;
}
L
Liu Jicong 已提交
232

H
Haojun Liao 已提交
233
/*
 * Send a poll response built from the consumer id and epoch stored in the
 * handle, replying on the message saved in pHandle->msg. The WAL valid version
 * range of the handle's reader is passed along. Always returns 0.
 */
int32_t tqPushDataRsp(STqHandle* pHandle, int32_t vgId) {
  SMqDataRsp dataRsp = {
      .head.consumerId = pHandle->consumerId,
      .head.epoch = pHandle->epoch,
      .head.mqMsgType = TMQ_MSG_TYPE__POLL_RSP,
  };

  int64_t sver = 0;
  int64_t ever = 0;
  walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever);
  tqDoSendDataRsp(&pHandle->msg->info, &dataRsp, pHandle->epoch, pHandle->consumerId, TMQ_MSG_TYPE__POLL_RSP, sver, ever);

  // printable forms of the request/response offsets, used for logging only
  char reqStr[80] = {0};
  char rspStr[80] = {0};
  tFormatOffset(reqStr, tListLen(reqStr), &dataRsp.reqOffset);
  tFormatOffset(rspStr, tListLen(rspStr), &dataRsp.rspOffset);
  tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) push rsp, block num: %d, req:%s, rsp:%s",
          vgId, dataRsp.head.consumerId, dataRsp.head.epoch, dataRsp.blockNum, reqStr, rspStr);
  return 0;
}

252 253 254 255 256 257
/*
 * Send pRsp as the reply to a poll request, using the epoch and consumer id
 * from pReq and the WAL valid version range of the handle's reader.
 * Logs the request/response offsets afterwards. Always returns 0.
 */
int32_t tqSendDataRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp,
                      int32_t type, int32_t vgId) {
  int64_t sver = 0;
  int64_t ever = 0;
  walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever);

  tqDoSendDataRsp(&pMsg->info, pRsp, pReq->epoch, pReq->consumerId, type, sver, ever);

  // printable forms of both offsets for the debug log
  char reqStr[80] = {0};
  char rspStr[80] = {0};
  tFormatOffset(reqStr, 80, &pRsp->reqOffset);
  tFormatOffset(rspStr, 80, &pRsp->rspOffset);

  tqDebug("vgId:%d consumer:0x%" PRIx64 " (epoch %d) send rsp, block num:%d, req:%s, rsp:%s, reqId:0x%" PRIx64,
          vgId, pReq->consumerId, pReq->epoch, pRsp->blockNum, reqStr, rspStr, pReq->reqId);

  return 0;
}

270
/*
 * Handle an offset-commit message for one subscription key.
 *
 * pTq      - TQ instance whose offset store is updated
 * sversion - version of the message carrying this commit; a log offset whose
 *            version is exactly sversion - 1 is advanced by one
 * msg/msgLen - encoded SMqVgOffset
 *
 * The committed offset is written only when no offset is saved yet or when it
 * is not "less than or equal to" the saved one (see tqOffsetLessOrEqual).
 * Returns 0 on success (including the no-update case), -1 on a decode error,
 * an unsupported offset type, or a failed write.
 */
int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  SMqVgOffset vgOffset = {0};
  int32_t     vgId = TD_VID(pTq->pVnode);

  SDecoder decoder;
  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
  if (tDecodeMqVgOffset(&decoder, &vgOffset) < 0) {
    tDecoderClear(&decoder);  // the decoder must be released on the failure path too
    return -1;
  }

  tDecoderClear(&decoder);

  STqOffset* pOffset = &vgOffset.offset;

  if (pOffset->val.type == TMQ_OFFSET__SNAPSHOT_DATA || pOffset->val.type == TMQ_OFFSET__SNAPSHOT_META) {
    tqDebug("receive offset commit msg to %s on vgId:%d, offset(type:snapshot) uid:%" PRId64 ", ts:%" PRId64,
            pOffset->subKey, vgId, pOffset->val.uid, pOffset->val.ts);
  } else if (pOffset->val.type == TMQ_OFFSET__LOG) {
    tqDebug("receive offset commit msg to %s on vgId:%d, offset(type:log) version:%" PRId64, pOffset->subKey, vgId,
            pOffset->val.version);
    if (pOffset->val.version + 1 == sversion) {
      pOffset->val.version += 1;
    }
  } else {
    tqError("invalid commit offset type:%d", pOffset->val.type);
    return -1;
  }

  STqOffset* pSavedOffset = tqOffsetRead(pTq->pOffsetStore, pOffset->subKey);
  if (pSavedOffset != NULL && tqOffsetLessOrEqual(pOffset, pSavedOffset)) {
    tqDebug("not update the offset, vgId:%d sub:%s since committed:%" PRId64 " less than/equal to existed:%" PRId64,
            vgId, pOffset->subKey, pOffset->val.version, pSavedOffset->val.version);
    return 0;  // no need to update the offset value
  }

  // save the new offset value
  if (tqOffsetWrite(pTq->pOffsetStore, pOffset) < 0) {
    return -1;
  }

  return 0;
}

313
/*
 * Handle a seek request: move the saved offset of a subscription to the
 * requested WAL version.
 *
 * Only TMQ_OFFSET__LOG offsets are accepted. The request is rejected when the
 * subscription handle does not exist or belongs to another consumer. The
 * requested version is clamped into the reader's valid WAL range [sver, ever]
 * before it is written to the offset store.
 *
 * Returns 0 on success or when nothing needs to change, -1 on failure
 * (terrno is set for the missing-handle and consumer-mismatch cases).
 */
int32_t tqProcessSeekReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  SMqVgOffset vgOffset = {0};
  int32_t     vgId = TD_VID(pTq->pVnode);

  SDecoder decoder;
  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
  if (tDecodeMqVgOffset(&decoder, &vgOffset) < 0) {
    tqError("vgId:%d failed to decode seek msg", vgId);
    tDecoderClear(&decoder);  // the decoder must be released on the failure path too
    return -1;
  }

  tDecoderClear(&decoder);

  tqDebug("topic:%s, vgId:%d process offset seek by consumer:0x%" PRIx64 ", req offset:%" PRId64,
          vgOffset.offset.subKey, vgId, vgOffset.consumerId, vgOffset.offset.val.version);

  // 1. validate the offset type and find the handle
  STqOffset* pOffset = &vgOffset.offset;
  if (pOffset->val.type != TMQ_OFFSET__LOG) {
    tqError("vgId:%d, subKey:%s invalid seek offset type:%d", vgId, pOffset->subKey, pOffset->val.type);
    return -1;
  }

  STqHandle* pHandle = taosHashGet(pTq->pHandle, pOffset->subKey, strlen(pOffset->subKey));
  if (pHandle == NULL) {
    tqError("tmq seek: consumer:0x%" PRIx64 " vgId:%d subkey %s not found", vgOffset.consumerId, vgId,
        pOffset->subKey);
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  // 2. check consumer-vg assignment status
  taosRLockLatch(&pTq->lock);
  if (pHandle->consumerId != vgOffset.consumerId) {
    tqDebug("ERROR tmq seek: consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64,
            vgOffset.consumerId, vgId, pOffset->subKey, pHandle->consumerId);
    terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
    taosRUnLockLatch(&pTq->lock);
    return -1;
  }
  taosRUnLockLatch(&pTq->lock);

  // 3. check the offset info
  STqOffset* pSavedOffset = tqOffsetRead(pTq->pOffsetStore, pOffset->subKey);
  if (pSavedOffset != NULL) {
    if (pSavedOffset->val.type != TMQ_OFFSET__LOG) {
      tqError("invalid saved offset type, vgId:%d sub:%s", vgId, pOffset->subKey);
      return 0;  // no need to update the offset value
    }

    if (pSavedOffset->val.version == pOffset->val.version) {
      tqDebug("vgId:%d subKey:%s no need to seek to %" PRId64 " prev offset:%" PRId64, vgId, pOffset->subKey,
              pOffset->val.version, pSavedOffset->val.version);
      return 0;
    }
  }

  // 4. clamp the requested version into the valid WAL range
  int64_t sver = 0, ever = 0;
  walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever);
  if (pOffset->val.version < sver) {
    pOffset->val.version = sver;
  } else if (pOffset->val.version > ever) {
    pOffset->val.version = ever;
  }

  // save the new offset value
  if (pSavedOffset != NULL) {
    tqDebug("vgId:%d sub:%s seek to:%" PRId64 " prev offset:%" PRId64, vgId, pOffset->subKey, pOffset->val.version,
            pSavedOffset->val.version);
  } else {
    tqDebug("vgId:%d sub:%s seek to:%"PRId64" not saved yet", vgId, pOffset->subKey, pOffset->val.version);
  }

  if (tqOffsetWrite(pTq->pOffsetStore, pOffset) < 0) {
    tqError("failed to save offset, vgId:%d sub:%s seek to %" PRId64, vgId, pOffset->subKey, pOffset->val.version);
    return -1;
  }

  tqDebug("topic:%s, vgId:%d consumer:0x%" PRIx64 " offset is update to:%" PRId64, vgOffset.offset.subKey, vgId,
          vgOffset.consumerId, vgOffset.offset.val.version);

  return 0;
}

L
Liu Jicong 已提交
396
/*
 * Check whether column colId of table tbUid appears in any registered check
 * info. Returns -1 when an entry for tbUid lists the column, 0 otherwise.
 */
int32_t tqCheckColModifiable(STQ* pTq, int64_t tbUid, int32_t colId) {
  for (void* pIter = taosHashIterate(pTq->pCheckInfo, NULL); pIter != NULL;
       pIter = taosHashIterate(pTq->pCheckInfo, pIter)) {
    STqCheckInfo* pCheck = (STqCheckInfo*)pIter;
    if (pCheck->ntbUid != tbUid) {
      continue;
    }

    int32_t numOfCols = taosArrayGetSize(pCheck->colIdList);
    for (int32_t idx = 0; idx < numOfCols; ++idx) {
      int16_t forbidColId = *(int16_t*)taosArrayGet(pCheck->colIdList, idx);
      if (forbidColId != colId) {
        continue;
      }

      // leaving the iteration early: cancel it before returning
      taosHashCancelIterate(pTq->pCheckInfo, pIter);
      return -1;
    }
  }

  return 0;
}

D
dapan1121 已提交
422
/*
 * Handle a consumer poll request.
 *
 * Under the pTq write latch: look up the subscription handle, wait (sleeping
 * 5ms per round) while the handle is marked executing, verify that the handle
 * belongs to the polling consumer, then mark it executing. After the latch is
 * released the epoch is raised if the request carries a newer one, the data is
 * extracted via tqExtractDataForMq, and the handle is set back to idle.
 *
 * Returns the result of tqExtractDataForMq, or -1 with terrno set when the
 * request cannot be deserialized, the handle is missing, or the consumer id
 * does not match.
 */
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
  SMqPollReq req = {0};
  if (tDeserializeSMqPollReq(pMsg->pCont, pMsg->contLen, &req) < 0) {
    tqError("tDeserializeSMqPollReq %d failed", pMsg->contLen);
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  int32_t      vgId = TD_VID(pTq->pVnode);
  int64_t      consumerId = req.consumerId;
  int32_t      reqEpoch = req.epoch;
  STqOffsetVal reqOffset = req.reqOffset;

  taosWLockLatch(&pTq->lock);

  // step 1: locate the subscription handle
  STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
  if (NULL == pHandle) {
    tqError("tmq poll: consumer:0x%" PRIx64 " vgId:%d subkey %s not found", consumerId, vgId, req.subKey);
    terrno = TSDB_CODE_INVALID_MSG;
    taosWUnLockLatch(&pTq->lock);
    return -1;
  }

  // wait until nobody else is executing on this handle
  for (; tqIsHandleExec(pHandle); taosMsleep(5)) {
    tqDebug("tmq poll: consumer:0x%" PRIx64 "vgId:%d, topic:%s, subscription is executing, wait for 5ms and retry", consumerId, vgId, req.subKey);
  }

  // step 2: the handle must still be assigned to this consumer
  if (consumerId != pHandle->consumerId) {
    tqDebug("ERROR tmq poll: consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64,
            consumerId, vgId, req.subKey, pHandle->consumerId);
    terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
    taosWUnLockLatch(&pTq->lock);
    return -1;
  }

  tqSetHandleExec(pHandle);
  taosWUnLockLatch(&pTq->lock);

  // step 3: adopt a newer epoch carried by the request
  int32_t prevEpoch = pHandle->epoch;
  if (reqEpoch > prevEpoch) {
    tqDebug("tmq poll: consumer:0x%" PRIx64 " epoch update from %d to %d by poll req", consumerId, prevEpoch,
            reqEpoch);
    pHandle->epoch = reqEpoch;
  }

  char offsetStr[80];
  tFormatOffset(offsetStr, 80, &reqOffset);
  tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey %s, recv poll req vgId:%d, req:%s, reqId:0x%" PRIx64,
          consumerId, req.epoch, pHandle->subKey, vgId, offsetStr, req.reqId);

  // step 4: produce the response, then release the handle
  int code = tqExtractDataForMq(pTq, pHandle, &req, pMsg);
  tqSetHandleIdle(pHandle);
  return code;
}

479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536
/*
 * Answer a WAL-info request for one subscription with the offset the consumer
 * is currently at, together with the reader's valid WAL range.
 *
 * The request is decoded as an SMqPollReq. The reply offset is always of type
 * TMQ_OFFSET__LOG and its version is chosen as follows:
 *   - a saved offset exists: its version (a non-log saved offset is an error);
 *   - no saved offset, request offset is a log offset: the reader's current
 *     version, or the requested version when the reader reports -1;
 *   - no saved offset, reset-earliest: sver; reset-latest: ever.
 *
 * Returns 0 after the response is sent, -1 with terrno set otherwise.
 */
int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) {
  SMqPollReq req = {0};
  if (tDeserializeSMqPollReq(pMsg->pCont, pMsg->contLen, &req) < 0) {
    tqError("tDeserializeSMqPollReq %d failed", pMsg->contLen);
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  int64_t      consumerId = req.consumerId;
  STqOffsetVal reqOffset = req.reqOffset;
  int32_t      vgId = TD_VID(pTq->pVnode);

  // 1. find handle
  STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
  if (pHandle == NULL) {
    tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s not found", consumerId, vgId, req.subKey);
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  // 2. check re-balance status: the handle must belong to the requesting consumer
  taosRLockLatch(&pTq->lock);
  if (pHandle->consumerId != consumerId) {
    tqDebug("ERROR consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64,
            consumerId, vgId, req.subKey, pHandle->consumerId);
    terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
    taosRUnLockLatch(&pTq->lock);
    return -1;
  }
  taosRUnLockLatch(&pTq->lock);

  // 3. fetch the valid WAL version range; it is sent back with the response
  int64_t sver = 0, ever = 0;
  walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever);

  SMqDataRsp dataRsp = {0};
  tqInitDataRsp(&dataRsp, &req);

  // 4. decide which version to report
  STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, req.subKey);
  if (pOffset != NULL) {
    // a saved offset exists: only log offsets carry WAL information
    if (pOffset->val.type != TMQ_OFFSET__LOG) {
      tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s use snapshot, no valid wal info", consumerId, vgId, req.subKey);
      terrno = TSDB_CODE_INVALID_PARA;
      tDeleteMqDataRsp(&dataRsp);
      return -1;
    }

    dataRsp.rspOffset.type = TMQ_OFFSET__LOG;
    dataRsp.rspOffset.version = pOffset->val.version;
  } else {
    // nothing saved yet: snapshot consumers cannot be answered
    if (req.useSnapshot == true) {
      tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s snapshot not support wal info", consumerId, vgId, req.subKey);
      terrno = TSDB_CODE_INVALID_PARA;
      tDeleteMqDataRsp(&dataRsp);
      return -1;
    }

    dataRsp.rspOffset.type = TMQ_OFFSET__LOG;

    if (reqOffset.type == TMQ_OFFSET__LOG) {
      int64_t currentVer = walReaderGetCurrentVer(pHandle->execHandle.pTqReader->pWalReader);
      if (currentVer == -1) { // reading from the WAL has not started yet, return the requested offset directly
        dataRsp.rspOffset.version = reqOffset.version;
      } else {
        dataRsp.rspOffset.version = currentVer;  // return current consume offset value
      }
    } else if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEAST) {
      dataRsp.rspOffset.version = sver;  // not consumed yet, use the earliest position
    } else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) {
      dataRsp.rspOffset.version = ever;
    } else {
      tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s invalid offset type:%d", consumerId, vgId, req.subKey,
              reqOffset.type);
      terrno = TSDB_CODE_INVALID_PARA;
      tDeleteMqDataRsp(&dataRsp);
      return -1;
    }
  }

  // NOTE(review): dataRsp is released with tDeleteMqDataRsp on every error path but not
  // here; confirm whether tqDoSendDataRsp takes ownership or whether this path leaks.
  tqDoSendDataRsp(&pMsg->info, &dataRsp, req.epoch, req.consumerId, TMQ_MSG_TYPE__WALINFO_RSP, sver, ever);
  return 0;
}

561
/*
 * Remove a subscription: its handle (after waiting for any running execution
 * to finish), its cached offset, and its persisted handle record.
 * Every step is best effort - failures are only logged - and the function
 * always returns 0. The whole operation runs under the pTq write latch.
 */
int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  SMqVDeleteReq* pReq = (SMqVDeleteReq*)msg;
  int32_t        vgId = TD_VID(pTq->pVnode);
  int32_t        code = 0;

  tqDebug("vgId:%d, tq process delete sub req %s", vgId, pReq->subKey);

  taosWLockLatch(&pTq->lock);

  STqHandle* pHandle = taosHashGet(pTq->pHandle, pReq->subKey, strlen(pReq->subKey));
  if (pHandle != NULL) {
    // do not pull the handle out from under a running poll
    for (; tqIsHandleExec(pHandle); taosMsleep(5)) {
      tqDebug("vgId:%d, topic:%s, subscription is executing, wait for 5ms and retry", vgId, pHandle->subKey);
    }

    if (pHandle->pRef != NULL) {
      walCloseRef(pTq->pVnode->pWal, pHandle->pRef->refId);
    }

    code = taosHashRemove(pTq->pHandle, pReq->subKey, strlen(pReq->subKey));
    if (0 != code) {
      tqError("cannot process tq delete req %s, since no such handle", pReq->subKey);
    }
  }

  code = tqOffsetDelete(pTq->pOffsetStore, pReq->subKey);
  if (0 != code) {
    tqError("cannot process tq delete req %s, since no such offset in cache", pReq->subKey);
  }

  if (tqMetaDeleteHandle(pTq, pReq->subKey) < 0) {
    tqError("cannot process tq delete req %s, since no such offset in tdb", pReq->subKey);
  }

  taosWUnLockLatch(&pTq->lock);
  return 0;
}

599
/*
 * Register a check-info entry: decode it from msg, store it in the in-memory
 * pCheckInfo table keyed by topic, and persist the raw message.
 *
 * Returns 0 on success. On any failure returns -1 with terrno set to
 * TSDB_CODE_OUT_OF_MEMORY (the code the original implementation reports for
 * every failure here).
 */
int32_t tqProcessAddCheckInfoReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  STqCheckInfo info = {0};
  SDecoder     decoder;
  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
  if (tDecodeSTqCheckInfo(&decoder, &info) < 0) {
    tDecoderClear(&decoder);  // the decoder must be released on the failure path too
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  tDecoderClear(&decoder);

  if (taosHashPut(pTq->pCheckInfo, info.topic, strlen(info.topic), &info, sizeof(STqCheckInfo)) < 0) {
    // the table did not take the entry, so its decoded contents are still ours to free
    tDeleteSTqCheckInfo(&info);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  // from here on the entry stored in the hash table references info's contents;
  // they are released by the table's free callback, not here
  if (tqMetaSaveCheckInfo(pTq, info.topic, msg, msgLen) < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  return 0;
}

619
/*
 * Remove the check-info entry whose key is the NUL-terminated string in msg,
 * first from the in-memory table and then from the persistent meta store.
 * Returns 0 on success; -1 with terrno = TSDB_CODE_OUT_OF_MEMORY if either
 * removal reports an error.
 */
int32_t tqProcessDelCheckInfoReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  const bool removed =
      (taosHashRemove(pTq->pCheckInfo, msg, strlen(msg)) >= 0) && (tqMetaDeleteCheckInfo(pTq, msg) >= 0);

  if (!removed) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  return 0;
}

631
/*
 * Handle a subscribe / re-balance request (SMqRebVgReq) for one subscription key.
 *
 * Runs entirely under the pTq write latch. Two cases:
 *   - no handle exists for req.subKey: build a new handle on the stack, set up
 *     its WAL reference, readers and executor task according to req.subType,
 *     copy it into pTq->pHandle and persist it;
 *   - a handle exists: wait until it is not executing, bump the epoch (same
 *     consumer) or switch to the new consumer with epoch 0, kill the running
 *     task, unregister it from the push manager and persist it.
 *
 * Returns 0, -1 when the WAL reference cannot be taken, or the result of
 * tqMetaSaveHandle.
 */
int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  int ret = 0;
  SMqRebVgReq req = {0};
  // NOTE(review): the decode result is not checked here - confirm whether it can fail
  tDecodeSMqRebVgReq(msg, &req);

  SVnode* pVnode = pTq->pVnode;
  int32_t vgId = TD_VID(pVnode);

  tqDebug("vgId:%d, tq process sub req:%s, Id:0x%" PRIx64 " -> Id:0x%" PRIx64, pVnode->config.vgId, req.subKey,
          req.oldConsumerId, req.newConsumerId);

  taosWLockLatch(&pTq->lock);
  STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
  if (pHandle == NULL) {
    // no handle yet: an old consumer id is unexpected here, but only logged
    if (req.oldConsumerId != -1) {
      tqError("vgId:%d, build new consumer handle %s for consumer:0x%" PRIx64 ", but old consumerId:0x%" PRIx64,
              req.vgId, req.subKey, req.newConsumerId, req.oldConsumerId);
    }

    // without a new consumer there is nothing to build; ret stays 0 on this path
    if (req.newConsumerId == -1) {
      tqError("vgId:%d, tq invalid re-balance request, new consumerId %" PRId64 "", req.vgId, req.newConsumerId);
      goto end;
    }

    // the handle is assembled on the stack and copied into the hash table below
    STqHandle tqHandle = {0};
    pHandle = &tqHandle;

    // tqHandle was just zero-initialized, so this reads 0; it is only used in the log below
    uint64_t oldConsumerId = pHandle->consumerId;
    memcpy(pHandle->subKey, req.subKey, TSDB_SUBSCRIBE_KEY_LEN);
    pHandle->consumerId = req.newConsumerId;
    pHandle->epoch = -1;

    pHandle->execHandle.subType = req.subType;
    pHandle->fetchMeta = req.withMeta;

    // TODO version should be assigned and refed during preprocess
    SWalRef* pRef = walRefCommittedVer(pVnode->pWal);
    if (pRef == NULL) {
      ret = -1;
      goto end;
    }

    int64_t ver = pRef->refVer;
    pHandle->pRef = pRef;

    SReadHandle handle = {.vnode = pVnode, .initTableReader = true, .initTqReader = true, .version = ver};
    initStorageAPI(&handle.api);

    pHandle->snapshotVer = ver;

    if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
      // the handle takes over req.qmsg; clearing req.qmsg keeps the free at `end` from releasing it
      pHandle->execHandle.execCol.qmsg = req.qmsg;
      req.qmsg = NULL;

      pHandle->execHandle.task = qCreateQueueExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle, vgId,
                                                          &pHandle->execHandle.numOfCols, req.newConsumerId);
      // the tq reader for a column subscription comes from the task's stream scanner
      void* scanner = NULL;
      qExtractStreamScanner(pHandle->execHandle.task, &scanner);
      pHandle->execHandle.pTqReader = qExtractReaderFromStreamScanner(scanner);
    } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__DB) {
      pHandle->pWalReader = walOpenReader(pVnode->pWal, NULL);
      pHandle->execHandle.pTqReader = tqReaderOpen(pVnode);

      pHandle->execHandle.execDb.pFilterOutTbUid =
          taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
      buildSnapContext(handle.vnode, handle.version, 0, pHandle->execHandle.subType, pHandle->fetchMeta,
                       (SSnapContext**)(&handle.sContext));

      pHandle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &handle, vgId, NULL, req.newConsumerId);
    } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
      pHandle->pWalReader = walOpenReader(pVnode->pWal, NULL);
      pHandle->execHandle.execTb.suid = req.suid;

      // collect the child tables of the super table and hand the list to the tq reader
      SArray* tbUidList = taosArrayInit(0, sizeof(int64_t));
      vnodeGetCtbIdList(pVnode, req.suid, tbUidList);
      tqDebug("vgId:%d, tq try to get all ctb, suid:%" PRId64, pVnode->config.vgId, req.suid);
      for (int32_t i = 0; i < taosArrayGetSize(tbUidList); i++) {
        int64_t tbUid = *(int64_t*)taosArrayGet(tbUidList, i);
        tqDebug("vgId:%d, idx %d, uid:%" PRId64, vgId, i, tbUid);
      }
      pHandle->execHandle.pTqReader = tqReaderOpen(pVnode);
      tqReaderSetTbUidList(pHandle->execHandle.pTqReader, tbUidList);
      taosArrayDestroy(tbUidList);

      buildSnapContext(handle.vnode, handle.version, req.suid, pHandle->execHandle.subType, pHandle->fetchMeta,
                       (SSnapContext**)(&handle.sContext));
      pHandle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &handle, vgId, NULL, req.newConsumerId);
    }

    // NOTE(review): the result of taosHashPut is not checked - confirm the intended handling on failure
    taosHashPut(pTq->pHandle, req.subKey, strlen(req.subKey), pHandle, sizeof(STqHandle));
    tqDebug("try to persist handle %s consumer:0x%" PRIx64 " , old consumer:0x%" PRIx64, req.subKey,
            pHandle->consumerId, oldConsumerId);
    ret = tqMetaSaveHandle(pTq, req.subKey, pHandle);
    goto end;
  } else {
    // an existing handle must not be changed while a poll is executing on it
    while (tqIsHandleExec(pHandle)) {
      tqDebug("sub req vgId:%d, topic:%s, subscription is executing, wait for 5ms and retry", vgId, pHandle->subKey);
      taosMsleep(5);
    }

    if (pHandle->consumerId == req.newConsumerId) {  // same consumer: only the epoch advances
      tqInfo("vgId:%d consumer:0x%" PRIx64 " remains, no switch occurs", req.vgId, req.newConsumerId);
      atomic_add_fetch_32(&pHandle->epoch, 1);

    } else {
      tqInfo("vgId:%d switch consumer from Id:0x%" PRIx64 " to Id:0x%" PRIx64, req.vgId, pHandle->consumerId,
             req.newConsumerId);
      atomic_store_64(&pHandle->consumerId, req.newConsumerId);
      atomic_store_32(&pHandle->epoch, 0);
    }
    // kill executing task
    qTaskInfo_t pTaskInfo = pHandle->execHandle.task;
    if (pTaskInfo != NULL) {
      qKillTask(pTaskInfo, TSDB_CODE_SUCCESS);
    }
    if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
      qStreamCloseTsdbReader(pTaskInfo);
    }
    // remove the handle if it has been registered in the push manager, and return one empty block to consumer
    tqUnregisterPushHandle(pTq, pHandle);
    ret = tqMetaSaveHandle(pTq, req.subKey, pHandle);
    goto end;
  }

end:
  // common exit: release the latch and whatever query message is still owned by req
  taosWUnLockLatch(&pTq->lock);
  taosMemoryFree(req.qmsg);
  return ret;
}
760

L
liuyao 已提交
761 762 763 764
// Free callback for containers that store heap pointers by value: `ptr` is the address of
// the stored slot, and the buffer that slot points to is released.
void freePtr(void *ptr) {
  void** ppBuf = (void**)ptr;
  taosMemoryFree(*ppBuf);
}

765
/*
 * Populate the runtime state of a stream task hosted by this vnode.
 *
 * Sets the id string, reference count, input/output queues, status fields and checkpoint
 * versions; creates the state backend and the executor for SOURCE and AGG level tasks; wires
 * the SMA or table sink; opens a WAL reader for SOURCE level tasks and sets up the trigger.
 *
 * pTq    tq instance providing the vnode, the stream meta and the WAL
 * pTask  task to expand, modified in place
 * ver    stored as both the checkpoint version and the current version; the stored
 *        checkpoint version is advanced by one just before returning
 *
 * Returns 0 on success, -1 as soon as any resource cannot be created. Nothing that was
 * created before the failure is released here.
 * NOTE(review): presumably the caller destroys the task when -1 is returned -- confirm.
 */
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
  int32_t vgId = TD_VID(pTq->pVnode);

  pTask->id.idStr = createStreamTaskIdStr(pTask->id.streamId, pTask->id.taskId);
  pTask->refCnt = 1;
  pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE;
  pTask->inputQueue = streamQueueOpen(512 << 10);
  pTask->outputQueue = streamQueueOpen(512 << 10);

  if (pTask->inputQueue == NULL || pTask->outputQueue == NULL) {
    return -1;
  }

  pTask->inputStatus = TASK_INPUT_STATUS__NORMAL;
  pTask->outputStatus = TASK_OUTPUT_STATUS__NORMAL;
  pTask->pMsgCb = &pTq->pVnode->msgCb;
  pTask->pMeta = pTq->pStreamMeta;
  pTask->chkInfo.version = ver;
  pTask->chkInfo.currentVer = ver;

  // a task that has to fill history first waits for its downstream tasks before running
  pTask->status.taskStatus = (pTask->fillHistory) ? TASK_STATUS__WAIT_DOWNSTREAM : TASK_STATUS__NORMAL;

  // expand executor
  if (pTask->taskLevel == TASK_LEVEL__SOURCE) {
    pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask, false, -1, -1);
    if (pTask->pState == NULL) {
      return -1;
    }

    SReadHandle handle = {.vnode = pTq->pVnode, .initTqReader = 1, .pStateBackend = pTask->pState};
    initStorageAPI(&handle.api);

    pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, vgId);
    if (pTask->exec.pExecutor == NULL) {
      return -1;
    }

    qSetTaskId(pTask->exec.pExecutor, pTask->id.taskId, pTask->id.streamId);
  } else if (pTask->taskLevel == TASK_LEVEL__AGG) {
    pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask, false, -1, -1);
    if (pTask->pState == NULL) {
      return -1;
    }

    // the read handle of an AGG task carries no vnode, only the number of child vgroups
    int32_t     numOfVgroups = (int32_t)taosArrayGetSize(pTask->childEpInfo);
    SReadHandle handle = {.vnode = NULL, .numOfVgroups = numOfVgroups, .pStateBackend = pTask->pState};
    initStorageAPI(&handle.api);

    pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, vgId);
    if (pTask->exec.pExecutor == NULL) {
      return -1;
    }

    qSetTaskId(pTask->exec.pExecutor, pTask->id.taskId, pTask->id.streamId);
  }

  // sink
  if (pTask->outputType == TASK_OUTPUT__SMA) {
    pTask->smaSink.vnode = pTq->pVnode;
    pTask->smaSink.smaSink = smaHandleRes;
  } else if (pTask->outputType == TASK_OUTPUT__TABLE) {
    pTask->tbSink.vnode = pTq->pVnode;
    pTask->tbSink.tbSinkFunc = tqSinkToTablePipeline;

    // use the schema version recorded in the meta when the lookup succeeds, otherwise 1
    int32_t   ver1 = 1;
    SMetaInfo info = {0};
    int32_t   code = metaGetInfo(pTq->pVnode->pMeta, pTask->tbSink.stbUid, &info, NULL);
    if (code == TSDB_CODE_SUCCESS) {
      ver1 = info.skmVer;
    }

    SSchemaWrapper* pschemaWrapper = pTask->tbSink.pSchemaWrapper;
    pTask->tbSink.pTSchema = tBuildTSchema(pschemaWrapper->pSchema, pschemaWrapper->nCols, ver1);
    if (pTask->tbSink.pTSchema == NULL) {
      return -1;
    }

    pTask->tbSink.pTblInfo = tSimpleHashInit(10240, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
    if (pTask->tbSink.pTblInfo == NULL) {
      // do not hand a NULL hash to tSimpleHashSetFreeFp
      return -1;
    }
    tSimpleHashSetFreeFp(pTask->tbSink.pTblInfo, freePtr);
  }

  if (pTask->taskLevel == TASK_LEVEL__SOURCE) {
    SWalFilterCond cond = {.deleteMsg = 1};  // delete msg also extract from wal files
    pTask->exec.pWalReader = walOpenReader(pTq->pVnode->pWal, &cond);
    if (pTask->exec.pWalReader == NULL) {
      // other handlers in this file seek on this reader without checking it
      tqError("vgId:%d s-task:%s failed to open wal reader", vgId, pTask->id.idStr);
      return -1;
    }
  }

  streamSetupTrigger(pTask);

  tqInfo("vgId:%d expand stream task, s-task:%s, checkpoint ver:%" PRId64 " child id:%d, level:%d", vgId, pTask->id.idStr,
         pTask->chkInfo.version, pTask->selfChildId, pTask->taskLevel);

  // next valid version will add one
  pTask->chkInfo.version += 1;
  return 0;
}
L
Liu Jicong 已提交
859

860
/*
 * Answer a "task check" request sent by an upstream task.
 *
 * The request is decoded from the message body (after the SMsgHead), the downstream task is
 * looked up in the stream meta, and a SStreamTaskCheckRsp echoing the ids of the request is
 * encoded and sent back. rsp.status is the result of streamTaskCheckStatus() when the task
 * exists and 0 when it has not been built yet.
 *
 * Returns 0 once the response has been handed to tmsgSendRsp, -1 when the request cannot be
 * decoded or the response cannot be encoded/allocated (no response is sent in that case).
 */
int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) {
  char*   msgStr = pMsg->pCont;
  char*   msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
  int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
  int32_t code = 0;

  SStreamTaskCheckReq req = {0};
  SDecoder            decoder;

  tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
  code = tDecodeSStreamTaskCheckReq(&decoder, &req);
  tDecoderClear(&decoder);
  if (code < 0) {
    // never build a response out of a partially decoded request
    tqError("vgId:%d failed to decode task check req", pTq->pStreamMeta->vgId);
    return -1;
  }

  int32_t             taskId = req.downstreamTaskId;
  SStreamTaskCheckRsp rsp = {
      .reqId = req.reqId,
      .streamId = req.streamId,
      .childId = req.childId,
      .downstreamNodeId = req.downstreamNodeId,
      .downstreamTaskId = req.downstreamTaskId,
      .upstreamNodeId = req.upstreamNodeId,
      .upstreamTaskId = req.upstreamTaskId,
  };

  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);

  if (pTask != NULL) {
    rsp.status = streamTaskCheckStatus(pTask);

    tqDebug("s-task:%s recv task check req(reqId:0x%" PRIx64
            ") %d at node %d task status:%d, check req from task %d at node %d, rsp status %d",
            pTask->id.idStr, rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, pTask->status.taskStatus,
            rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);

    // release only after the last access to pTask: the reference keeps the task alive
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
  } else {
    rsp.status = 0;
    tqDebug("tq recv task check(taskId:0x%x not built yet) req(reqId:0x%" PRIx64
            ") %d at node %d, check req from task:0x%x at node %d, rsp status %d",
            taskId, rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.upstreamTaskId, rsp.upstreamNodeId,
            rsp.status);
  }

  SEncoder encoder;
  int32_t  len = 0;

  tEncodeSize(tEncodeSStreamTaskCheckRsp, &rsp, len, code);
  if (code < 0) {
    tqError("vgId:%d failed to encode task check rsp, task:0x%x", pTq->pStreamMeta->vgId, taskId);
    return -1;
  }

  void* buf = rpcMallocCont(sizeof(SMsgHead) + len);
  if (buf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    tqError("vgId:%d failed to allocate task check rsp, task:0x%x", pTq->pStreamMeta->vgId, taskId);
    return -1;
  }

  // the head is addressed to the vnode that sent the request
  ((SMsgHead*)buf)->vgId = htonl(req.upstreamNodeId);

  void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
  tEncoderInit(&encoder, (uint8_t*)abuf, len);
  tEncodeSStreamTaskCheckRsp(&encoder, &rsp);
  tEncoderClear(&encoder);

  SRpcMsg rspMsg = {.code = 0, .pCont = buf, .contLen = sizeof(SMsgHead) + len, .info = pMsg->info};

  tmsgSendRsp(&rspMsg);
  return 0;
}

925
/*
 * Handle the response to a task check request.
 *
 * msg/msgLen hold the encoded SStreamTaskCheckRsp. The upstream task named in the response is
 * looked up and the response is passed to streamProcessTaskCheckRsp() together with sversion.
 *
 * Returns -1 when decoding fails or the task cannot be found, otherwise the result of
 * streamProcessTaskCheckRsp().
 */
int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  SStreamTaskCheckRsp rsp;
  SDecoder            decoder;

  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
  int32_t decodeRet = tDecodeSStreamTaskCheckRsp(&decoder, &rsp);
  // the decoder is cleared on both the failure and the success path
  tDecoderClear(&decoder);
  if (decodeRet < 0) {
    return -1;
  }

  tqDebug("tq recv task check rsp(reqId:0x%" PRIx64 ") %d at node %d check req from task:0x%x at node %d, status %d",
          rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);

  SStreamTask* pUpTask = streamMetaAcquireTask(pTq->pStreamMeta, rsp.upstreamTaskId);
  if (pUpTask == NULL) {
    tqError("tq failed to locate the stream task:0x%x vgId:%d, it may have been destroyed", rsp.upstreamTaskId,
            pTq->pStreamMeta->vgId);
    return -1;
  }

  int32_t ret = streamProcessTaskCheckRsp(pUpTask, &rsp, sversion);
  streamMetaReleaseTask(pTq->pStreamMeta, pUpTask);
  return ret;
}

954
/*
 * Deploy a stream task described by an encoded message.
 *
 * Does nothing and returns 0 when tsDisableStream is set. Otherwise the task is allocated,
 * decoded from msg/msgLen and registered in the stream meta under the meta write lock, with
 * sversion passed as its version. A task flagged fillHistory then starts the downstream
 * check.
 *
 * Returns 0 on success, -1 on allocation, decode or registration failure.
 */
int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  int32_t      vgId = TD_VID(pTq->pVnode);
  SStreamMeta* pMeta = pTq->pStreamMeta;

  if (tsDisableStream) {
    return 0;
  }

  // 1.deserialize msg and build task
  SStreamTask* pNewTask = taosMemoryCalloc(1, sizeof(SStreamTask));
  if (pNewTask == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    tqError("vgId:%d failed to create stream task due to out of memory, alloc size:%d", vgId, (int32_t) sizeof(SStreamTask));
    return -1;
  }

  SDecoder decoder;
  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
  int32_t decodeRet = tDecodeStreamTask(&decoder, pNewTask);
  tDecoderClear(&decoder);
  if (decodeRet < 0) {
    // NOTE(review): only the task struct itself is freed; anything the decoder attached to
    // it before failing would stay allocated -- confirm against tDecodeStreamTask
    taosMemoryFree(pNewTask);
    return -1;
  }

  // 2.save task, use the newest commit version as the initial start version of stream task.
  taosWLockLatch(&pMeta->lock);
  int32_t addRet = streamMetaAddDeployedTask(pMeta, sversion, pNewTask);
  int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
  if (addRet < 0) {
    // NOTE(review): the task is dereferenced here after the failed add -- confirm that
    // streamMetaAddDeployedTask does not free it on failure
    tqError("vgId:%d failed to add s-task:%s, total:%d", vgId, pNewTask->id.idStr, numOfTasks);
    taosWUnLockLatch(&pMeta->lock);
    return -1;
  }
  taosWUnLockLatch(&pMeta->lock);

  // 3.go through recover steps to fill history
  if (pNewTask->fillHistory) {
    streamTaskCheckDownstream(pNewTask, sversion);
  }

  tqDebug("vgId:%d s-task:%s is deployed and add meta from mnd, status:%d, total:%d", vgId, pNewTask->id.idStr,
          pNewTask->status.taskStatus, numOfTasks);
  return 0;
}

L
Liu Jicong 已提交
1003 1004 1005 1006 1007
/*
 * Run recover step 1 (the non-blocking scan) for a source task and, when it completes,
 * queue the step 2 request onto the vnode write queue.
 *
 * pMsg->pCont is read as a SStreamRecoverStep1Req. The task reference is held until the end
 * of the function because the task is read (status, id string) all the way down to the final
 * log statement.
 *
 * Returns 0 when step 2 was queued or the task turned out to be dropping, -1 when the task
 * is missing, its checkpoint version is not positive, the step 2 request cannot be built or
 * memory for it cannot be allocated.
 */
int32_t tqProcessTaskRecover1Req(STQ* pTq, SRpcMsg* pMsg) {
  int32_t code = 0;
  char*   msg = pMsg->pCont;

  SStreamRecoverStep1Req* pReq = (SStreamRecoverStep1Req*)msg;
  SStreamTask*            pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->taskId);
  if (pTask == NULL) {
    return -1;
  }

  // check param
  int64_t fillVer1 = pTask->chkInfo.version;
  if (fillVer1 <= 0) {
    code = -1;
    goto end;
  }

  // do recovery step 1
  tqDebug("s-task:%s start non-blocking recover stage(step 1) scan", pTask->id.idStr);
  int64_t st = taosGetTimestampMs();

  streamSourceRecoverScanStep1(pTask);
  if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING) {
    tqDebug("s-task:%s is dropped, abort recover in step1", pTask->id.idStr);
    goto end;
  }

  double el = (taosGetTimestampMs() - st) / 1000.0;
  tqDebug("s-task:%s non-blocking recover stage(step 1) ended, elapsed time:%.2fs", pTask->id.idStr, el);

  // build msg to launch next step
  SStreamRecoverStep2Req req;
  code = streamBuildSourceRecover2Req(pTask, &req);
  if (code < 0) {
    code = -1;
    goto end;
  }

  if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING) {
    goto end;
  }

  // serialize msg: the payload is the step 2 request, so size it from that object
  int32_t len = sizeof(req);

  void* serializedReq = rpcMallocCont(len);
  if (serializedReq == NULL) {
    tqError("s-task:%s failed to prepare the step2 stage, out of memory", pTask->id.idStr);
    code = -1;
    goto end;
  }

  memcpy(serializedReq, &req, len);

  // dispatch msg
  tqDebug("s-task:%s start recover block stage", pTask->id.idStr);

  SRpcMsg rpcMsg = {
      .code = 0, .contLen = len, .msgType = TDMT_VND_STREAM_RECOVER_BLOCKING_STAGE, .pCont = serializedReq};
  tmsgPutToQueue(&pTq->pVnode->msgCb, WRITE_QUEUE, &rpcMsg);

end:
  // single release point, after the last access to pTask
  streamMetaReleaseTask(pTq->pStreamMeta, pTask);
  return code;
}

1069
/*
 * Run recover step 2 (the blocking stage) for a source task.
 *
 * msg is read as a SStreamRecoverStep2Req. After the step 2 scan the WAL reader of the task
 * is positioned at sversion, the task parameters are restored, the status is set back to
 * normal, the recover-finish request is dispatched downstream and the task is saved with
 * fillHistory cleared.
 *
 * Returns 0 on success or when the task is found to be dropping after the scan, -1 when the
 * task is missing or any of the steps fails.
 */
int32_t tqProcessTaskRecover2Req(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  SStreamRecoverStep2Req* pStep2 = (SStreamRecoverStep2Req*)msg;

  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pStep2->taskId);
  if (pTask == NULL) {
    return -1;
  }

  // every early exit below reports failure unless it explicitly says otherwise
  int32_t ret = -1;

  // do recovery step 2
  int64_t st = taosGetTimestampMs();
  tqDebug("s-task:%s start step2 recover, ts:%"PRId64, pTask->id.idStr, st);

  if (streamSourceRecoverScanStep2(pTask, sversion) < 0) {
    goto _release;
  }

  // NOTE(review): this is the only log statement here using qDebug instead of tqDebug --
  // confirm whether that is intended
  qDebug("s-task:%s set the start wal offset to be:%"PRId64, pTask->id.idStr, sversion);
  walReaderSeekVer(pTask->exec.pWalReader, sversion);

  if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING) {
    ret = 0;
    goto _release;
  }

  // restore param
  if (streamRestoreParam(pTask) < 0) {
    goto _release;
  }

  // set status normal
  tqDebug("s-task:%s blocking stage completed, set the status to be normal", pTask->id.idStr);
  if (streamSetStatusNormal(pTask) < 0) {
    goto _release;
  }

  double el = (taosGetTimestampMs() - st)/ 1000.0;
  tqDebug("s-task:%s step2 recover finished, el:%.2fs", pTask->id.idStr, el);

  // dispatch recover finish req to all related downstream task
  if (streamDispatchRecoverFinishReq(pTask) < 0) {
    goto _release;
  }

  atomic_store_8(&pTask->fillHistory, 0);
  streamMetaSaveTask(pTq->pStreamMeta, pTask);
  ret = 0;

_release:
  streamMetaReleaseTask(pTq->pStreamMeta, pTask);
  return ret;
}

L
Liu Jicong 已提交
1129 1130 1131
/*
 * Handle a recover-finish request coming from an upstream task.
 *
 * The request is decoded from the message body (after the SMsgHead), the target task is
 * looked up by req.taskId and the request is handed to streamProcessRecoverFinishReq()
 * together with req.childId.
 *
 * Returns 0 on success, -1 when the request cannot be decoded, the task is missing or the
 * processing fails.
 */
int32_t tqProcessTaskRecoverFinishReq(STQ* pTq, SRpcMsg* pMsg) {
  char*   msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);

  // deserialize; zero-initialized so that a failed decode never exposes stack garbage
  SStreamRecoverFinishReq req = {0};

  SDecoder decoder;
  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
  int32_t code = tDecodeSStreamRecoverFinishReq(&decoder, &req);
  tDecoderClear(&decoder);
  if (code < 0) {
    return -1;
  }

  // find task
  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.taskId);
  if (pTask == NULL) {
    return -1;
  }

  // do process request
  code = streamProcessRecoverFinishReq(pTask, req.childId);
  streamMetaReleaseTask(pTq->pStreamMeta, pTask);

  return (code < 0) ? -1 : 0;
}
L
Liu Jicong 已提交
1155

L
Liu Jicong 已提交
1156 1157 1158 1159 1160
// Handler for the response to a recover-finish request. Nothing is done with it; the
// function always reports success.
int32_t tqProcessTaskRecoverFinishRsp(STQ* pTq, SRpcMsg* pMsg) {
  (void)pTq;
  (void)pMsg;
  return 0;
}

1161 1162 1163 1164
/*
 * Turn an encoded delete result into a STREAM_DELETE_DATA block wrapped in a queue item.
 *
 * pData/len  encoded SDeleteRes
 * ver        version stamped on the produced block
 * pRefBlock  output; set to NULL first, and to a newly allocated SStreamRefDataBlock only
 *            when the delete result names at least one table and affected at least one row
 *
 * The block gets one row per table uid: start key, end key and uid are filled in, the group
 * id and the two calculate-timestamp columns are set to NULL.
 *
 * Returns TSDB_CODE_SUCCESS (with *pRefBlock possibly left NULL, see above) or
 * TSDB_CODE_OUT_OF_MEMORY when an allocation fails; nothing is leaked on failure.
 */
int32_t extractDelDataBlock(const void* pData, int32_t len, int64_t ver, SStreamRefDataBlock** pRefBlock) {
  SDecoder*   pCoder = &(SDecoder){0};
  SDeleteRes* pRes = &(SDeleteRes){0};

  *pRefBlock = NULL;

  pRes->uidList = taosArrayInit(0, sizeof(tb_uid_t));
  if (pRes->uidList == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  tDecoderInit(pCoder, (uint8_t*)pData, len);
  tDecodeDeleteRes(pCoder, pRes);
  tDecoderClear(pCoder);

  int32_t numOfTables = taosArrayGetSize(pRes->uidList);
  if (numOfTables == 0 || pRes->affectedRows == 0) {
    taosArrayDestroy(pRes->uidList);
    return TSDB_CODE_SUCCESS;
  }

  // allocate the queue item before the data block so that a failure here has nothing else
  // to unwind
  SStreamRefDataBlock* pItem = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0);
  if (pItem == NULL) {
    taosArrayDestroy(pRes->uidList);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  SSDataBlock* pDelBlock = createSpecialDataBlock(STREAM_DELETE_DATA);
  if (pDelBlock == NULL) {
    taosFreeQitem(pItem);
    taosArrayDestroy(pRes->uidList);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  // NOTE(review): the result of blockDataEnsureCapacity is not checked -- confirm whether it
  // can fail and how the block has to be destroyed in that case
  blockDataEnsureCapacity(pDelBlock, numOfTables);
  pDelBlock->info.rows = numOfTables;
  pDelBlock->info.version = ver;

  // the column handles do not depend on the row, look them up once
  SColumnInfoData* pStartCol = taosArrayGet(pDelBlock->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pEndCol = taosArrayGet(pDelBlock->pDataBlock, END_TS_COLUMN_INDEX);
  SColumnInfoData* pUidCol = taosArrayGet(pDelBlock->pDataBlock, UID_COLUMN_INDEX);
  SColumnInfoData* pGroupCol = taosArrayGet(pDelBlock->pDataBlock, GROUPID_COLUMN_INDEX);
  SColumnInfoData* pCalStartCol = taosArrayGet(pDelBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
  SColumnInfoData* pCalEndCol = taosArrayGet(pDelBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);

  for (int32_t i = 0; i < numOfTables; i++) {
    // start key column
    colDataSetVal(pStartCol, i, (const char*)&pRes->skey, false);
    // end key column
    colDataSetVal(pEndCol, i, (const char*)&pRes->ekey, false);
    // uid column
    int64_t* pUid = taosArrayGet(pRes->uidList, i);
    colDataSetVal(pUidCol, i, (const char*)pUid, false);

    colDataSetNULL(pGroupCol, i);
    colDataSetNULL(pCalStartCol, i);
    colDataSetNULL(pCalEndCol, i);
  }

  taosArrayDestroy(pRes->uidList);

  pItem->type = STREAM_INPUT__REF_DATA_BLOCK;
  pItem->pBlock = pDelBlock;
  *pRefBlock = pItem;
  return TSDB_CODE_SUCCESS;
}

L
Liu Jicong 已提交
1214 1215
/*
 * Handle a "run task" request.
 *
 * A request whose taskId is WAL_READ_TASKS_ID triggers a WAL scan for the stream tasks
 * instead of running one task. Otherwise the named task is run when its status is NORMAL;
 * in any other status the request is ignored, and a task that should pause has its schedule
 * status moved from WAITING back to INACTIVE. In both cases tqStartStreamTasks() is called
 * afterwards.
 *
 * Returns 0 when the request was handled, -1 when the task does not exist.
 */
int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
  SStreamTaskRunReq* pRunReq = pMsg->pCont;

  int32_t taskId = pRunReq->taskId;
  int32_t vgId = TD_VID(pTq->pVnode);

  // all tasks are extracted submit data from the wal
  if (taskId == WAL_READ_TASKS_ID) {
    tqStreamTasksScanWal(pTq);
    return 0;
  }

  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);
  if (pTask == NULL) {
    tqError("vgId:%d failed to found s-task, taskId:%d", vgId, taskId);
    return -1;
  }

  if (pTask->status.taskStatus != TASK_STATUS__NORMAL) {
    if (streamTaskShouldPause(&pTask->status)) {
      atomic_val_compare_exchange_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__WAITING, TASK_SCHED_STATUS__INACTIVE);
    }
    tqDebug("vgId:%d s-task:%s ignore run req since not in ready state", vgId, pTask->id.idStr);
  } else {
    tqDebug("vgId:%d s-task:%s start to process block from wal, last chk point:%" PRId64, vgId,
            pTask->id.idStr, pTask->chkInfo.version);
    streamProcessRunReq(pTask);
  }

  streamMetaReleaseTask(pTq->pStreamMeta, pTask);
  tqStartStreamTasks(pTq);
  return 0;
}

L
Liu Jicong 已提交
1247
/*
 * Handle a dispatch request sent by an upstream task.
 *
 * The request is decoded from the message body (after the SMsgHead) and handed to
 * streamProcessDispatchMsg() of the target task; `exec` is forwarded unchanged.
 *
 * Returns 0 when the task exists and the request was passed on, -1 when the request cannot
 * be decoded or the task does not exist. In the -1 cases the decoded request is released
 * here with tDeleteStreamDispatchReq().
 */
int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec) {
  char*   msgStr = pMsg->pCont;
  char*   msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
  int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);

  SStreamDispatchReq req = {0};

  SDecoder decoder;
  tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
  int32_t code = tDecodeStreamDispatchReq(&decoder, &req);
  // the decoder is no longer needed once the request has been decoded
  tDecoderClear(&decoder);
  if (code < 0) {
    // req is zero-initialized, so only what was actually decoded is released
    tDeleteStreamDispatchReq(&req);
    return -1;
  }

  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.taskId);
  if (pTask == NULL) {
    tDeleteStreamDispatchReq(&req);
    return -1;
  }

  SRpcMsg rsp = {.info = pMsg->info, .code = 0};
  streamProcessDispatchMsg(pTask, &req, &rsp, exec);
  streamMetaReleaseTask(pTq->pStreamMeta, pTask);
  return 0;
}

L
Liu Jicong 已提交
1270 1271
/*
 * Handle the response to a dispatch request.
 *
 * The response sits right after the SMsgHead; its upstreamTaskId (converted with ntohl)
 * names the task that receives the response together with the message code.
 *
 * Returns 0 when the task exists, -1 otherwise.
 */
int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) {
  SStreamDispatchRsp* pDispRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t             upTaskId = ntohl(pDispRsp->upstreamTaskId);

  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, upTaskId);
  tqDebug("recv dispatch rsp, code:%x", pMsg->code);

  if (pTask == NULL) {
    return -1;
  }

  streamProcessDispatchRsp(pTask, pDispRsp, pMsg->code);
  streamMetaReleaseTask(pTq->pStreamMeta, pTask);
  return 0;
}
L
Liu Jicong 已提交
1283

1284
/*
 * Drop a stream task: msg is read as a SVDropStreamTaskReq and the task it names is removed
 * from the stream meta. sversion and msgLen are not used. Always returns 0.
 */
int32_t tqProcessTaskDropReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  SVDropStreamTaskReq* pDrop = (SVDropStreamTaskReq*)msg;
  int32_t              vgId = TD_VID(pTq->pVnode);

  tqDebug("vgId:%d receive msg to drop stream task:0x%x", vgId, pDrop->taskId);
  streamMetaRemoveTask(pTq->pStreamMeta, pDrop->taskId);

  return 0;
}
L
Liu Jicong 已提交
1291

5
54liuyao 已提交
1292 1293
/*
 * Pause a stream task: msg is read as a SVPauseStreamTaskReq.
 *
 * The current status is remembered in status.keepTaskStatus (resume copies it back) and the
 * status is switched to TASK_STATUS__PAUSE. A task that is already paused is left untouched,
 * otherwise the remembered status would itself become PAUSE and a later resume would restore
 * PAUSE instead of the status the task had before it was paused.
 *
 * sversion and msgLen are not used. Always returns 0, also when the task does not exist.
 */
int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  SVPauseStreamTaskReq* pReq = (SVPauseStreamTaskReq*)msg;
  SStreamTask*          pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->taskId);
  if (pTask) {
    tqDebug("vgId:%d s-task:%s set pause flag", pTq->pStreamMeta->vgId, pTask->id.idStr);

    int8_t status = atomic_load_8(&pTask->status.taskStatus);
    if (status != TASK_STATUS__PAUSE) {
      atomic_store_8(&pTask->status.keepTaskStatus, status);
      atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__PAUSE);
    }

    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
  }
  return 0;
}

/*
 * Resume a stream task: msg is read as a SVResumeStreamTaskReq.
 *
 * The status is set back to status.keepTaskStatus. When the request asks to ignore untreated
 * data and the task is a SOURCE task, the current version and the WAL reader are moved to
 * sversion; otherwise the task continues from where it was paused. A SOURCE task with an
 * empty input queue triggers tqStartStreamTasks(), any other task is scheduled directly.
 *
 * NOTE(review): keepTaskStatus is restored unconditionally, also for a task that was never
 * paused -- confirm that callers only resume paused tasks.
 *
 * Always returns 0, also when the task does not exist.
 */
int32_t tqProcessTaskResumeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  SVResumeStreamTaskReq* pResume = (SVResumeStreamTaskReq*)msg;

  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pResume->taskId);
  if (pTask == NULL) {
    return 0;
  }

  atomic_store_8(&pTask->status.taskStatus, pTask->status.keepTaskStatus);

  bool isSource = (pTask->taskLevel == TASK_LEVEL__SOURCE);

  // no lock needs to secure the access of the version
  if (pResume->igUntreated && isSource) {
    // discard all the data when the stream task is suspended.
    pTask->chkInfo.currentVer = sversion;
    walReaderSeekVer(pTask->exec.pWalReader, sversion);
    tqDebug("vgId:%d s-task:%s resume to normal from the latest version:%" PRId64 ", vnode ver:%" PRId64 ", schedStatus:%d", pTq->pStreamMeta->vgId,
            pTask->id.idStr, pTask->chkInfo.currentVer, sversion, pTask->status.schedStatus);
  } else {
    // from the previous paused version and go on
    tqDebug("vgId:%d s-task:%s resume to normal from paused ver:%" PRId64 ", vnode ver:%" PRId64 ", schedStatus:%d", pTq->pStreamMeta->vgId,
            pTask->id.idStr, pTask->chkInfo.currentVer, sversion, pTask->status.schedStatus);
  }

  if (isSource && taosQueueItemSize(pTask->inputQueue->queue) == 0) {
    tqStartStreamTasks(pTq);
  } else {
    streamSchedExec(pTask);
  }

  streamMetaReleaseTask(pTq->pStreamMeta, pTask);
  return 0;
}

L
Liu Jicong 已提交
1332 1333 1334 1335 1336 1337
/*
 * Handle a retrieve request sent to one of the stream tasks of this vnode.
 *
 * The request is decoded from the message body (after the SMsgHead) and handed to
 * streamProcessRetrieveReq() of the task named by req.dstTaskId. The decoded request is
 * released with tDeleteStreamRetrieveReq() on every path.
 *
 * Returns 0 when the task exists and the request was passed on, -1 when the request cannot
 * be decoded or the task does not exist.
 */
int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg) {
  char*   msgStr = pMsg->pCont;
  char*   msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
  int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);

  // zero-initialized: tDeleteStreamRetrieveReq below must never see stack garbage
  SStreamRetrieveReq req = {0};
  SDecoder           decoder;

  tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
  int32_t code = tDecodeStreamRetrieveReq(&decoder, &req);
  tDecoderClear(&decoder);
  if (code < 0) {
    tDeleteStreamRetrieveReq(&req);
    return -1;
  }

  int32_t      taskId = req.dstTaskId;
  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);
  if (pTask == NULL) {
    tDeleteStreamRetrieveReq(&req);
    return -1;
  }

  SRpcMsg rsp = {.info = pMsg->info, .code = 0};
  streamProcessRetrieveReq(pTask, &req, &rsp);
  streamMetaReleaseTask(pTq->pStreamMeta, pTask);
  tDeleteStreamRetrieveReq(&req);
  return 0;
}

// Handler for the response to a retrieve request. Nothing is done with it; the function
// always reports success.
int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg) {
  (void)pTq;
  (void)pMsg;
  return 0;
}
L
Liu Jicong 已提交
1359

1360 1361 1362 1363 1364 1365
/*
 * Hand a dispatch message directly to the target stream task.
 *
 * The dispatch request is decoded from the message body (after the SMsgHead). When the
 * target task exists the request is passed to streamProcessDispatchMsg() and 0 is returned.
 * Otherwise an error response carrying the failure code is sent back to the upstream task,
 * provided the message has an rpc handle, and -1 is returned.
 *
 * On every path pMsg->pCont and the queue item pMsg itself are released before returning.
 */
int32_t vnodeEnqueueStreamMsg(SVnode* pVnode, SRpcMsg* pMsg) {
  STQ*      pTq = pVnode->pTq;
  SMsgHead* msgStr = pMsg->pCont;
  char*     msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
  int32_t   msgLen = pMsg->contLen - sizeof(SMsgHead);
  int32_t   code = 0;

  // zero-initialized: the FAIL path reads the ids of the request even when decoding failed
  SStreamDispatchReq req = {0};
  SDecoder           decoder;
  tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
  if (tDecodeStreamDispatchReq(&decoder, &req) < 0) {
    code = TSDB_CODE_MSG_DECODE_ERROR;
    tDecoderClear(&decoder);
    goto FAIL;
  }
  tDecoderClear(&decoder);

  int32_t taskId = req.taskId;

  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);
  if (pTask) {
    SRpcMsg rsp = {.info = pMsg->info, .code = 0};
    streamProcessDispatchMsg(pTask, &req, &rsp, false);
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
    rpcFreeCont(pMsg->pCont);
    taosFreeQitem(pMsg);
    return 0;
  } else {
    tDeleteStreamDispatchReq(&req);
  }

  code = TSDB_CODE_STREAM_TASK_NOT_EXIST;

FAIL:
  if (pMsg->info.handle == NULL) {
    // nobody to answer, but the message is still owned by this function like on every
    // other exit
    rpcFreeCont(pMsg->pCont);
    taosFreeQitem(pMsg);
    return -1;
  }

  SMsgHead* pRspHead = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamDispatchRsp));
  if (pRspHead == NULL) {
    // no room for a full response body, answer with a bare out-of-memory code
    SRpcMsg rsp = {.code = TSDB_CODE_OUT_OF_MEMORY, .info = pMsg->info};
    tqDebug("send dispatch error rsp, code: %x", code);
    tmsgSendRsp(&rsp);
    rpcFreeCont(pMsg->pCont);
    taosFreeQitem(pMsg);
    return -1;
  }

  pRspHead->vgId = htonl(req.upstreamNodeId);
  SStreamDispatchRsp* pRsp = POINTER_SHIFT(pRspHead, sizeof(SMsgHead));
  pRsp->streamId = htobe64(req.streamId);
  pRsp->upstreamTaskId = htonl(req.upstreamTaskId);
  pRsp->upstreamNodeId = htonl(req.upstreamNodeId);
  pRsp->downstreamNodeId = htonl(pVnode->config.vgId);
  pRsp->downstreamTaskId = htonl(req.taskId);
  // the field reports an input status, so use the input-status constant
  pRsp->inputStatus = TASK_INPUT_STATUS__NORMAL;

  SRpcMsg rsp = {
      .code = code, .info = pMsg->info, .contLen = sizeof(SMsgHead) + sizeof(SStreamDispatchRsp), .pCont = pRspHead};
  tqDebug("send dispatch error rsp, code: %x", code);
  tmsgSendRsp(&rsp);
  rpcFreeCont(pMsg->pCont);
  taosFreeQitem(pMsg);
  return -1;
}
L
Liu Jicong 已提交
1423

1424
// Returns 1 when sversion is not newer than pTq->walLogLastVer, 0 otherwise.
int32_t tqCheckLogInWal(STQ* pTq, int64_t sversion) {
  if (sversion <= pTq->walLogLastVer) {
    return 1;
  }
  return 0;
}
1425

1426
/*
 * Ask the stream queue to scan the WAL for the stream tasks of this vnode.
 *
 * Runs under the write lock of the stream meta. Nothing is done when no task exists. Each
 * call with at least one task bumps pMeta->walScanCounter; only the call that raises it to 1
 * posts a TDMT_STREAM_TASK_RUN message with taskId WAL_READ_TASKS_ID, later calls just leave
 * the counter raised.
 *
 * Returns 0 on success (including the "nothing to do" cases), -1 when the run request cannot
 * be allocated. In that case the counter is rolled back, since no scan was launched that
 * could account for it.
 */
int32_t tqStartStreamTasks(STQ* pTq) {
  int32_t      vgId = TD_VID(pTq->pVnode);
  SStreamMeta* pMeta = pTq->pStreamMeta;
  int32_t      code = 0;

  taosWLockLatch(&pMeta->lock);

  int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
  if (numOfTasks == 0) {
    tqInfo("vgId:%d no stream tasks exist", vgId);
    goto _unlock;
  }

  pMeta->walScanCounter += 1;

  if (pMeta->walScanCounter > 1) {
    tqDebug("vgId:%d wal read task has been launched, remain scan times:%d", vgId, pMeta->walScanCounter);
    goto _unlock;
  }

  SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq));
  if (pRunReq == NULL) {
    // undo the increment above, otherwise every later call would see a pending scan
    pMeta->walScanCounter -= 1;
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    tqError("vgId:%d failed to create msg to start wal scanning to launch stream tasks, code:%s", vgId, terrstr());
    code = -1;
    goto _unlock;
  }

  tqDebug("vgId:%d create msg to start wal scan to launch stream tasks, numOfTasks:%d", vgId, numOfTasks);
  pRunReq->head.vgId = vgId;
  pRunReq->streamId = 0;
  pRunReq->taskId = WAL_READ_TASKS_ID;

  SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)};
  tmsgPutToQueue(&pTq->pVnode->msgCb, STREAM_QUEUE, &msg);

_unlock:
  taosWUnLockLatch(&pMeta->lock);
  return code;
}