tq.c 49.5 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Hongze Cheng 已提交
16
#include "tq.h"
S
Shengliang Guan 已提交
17

dengyihao's avatar
dengyihao 已提交
18 19 20
// 0: not init
// 1: already inited
// 2: wait to be inited or cleaup
21
#define WAL_READ_TASKS_ID (-1)
22

23
static int32_t tqInitialize(STQ* pTq);
dengyihao's avatar
dengyihao 已提交
24

L
Liu Jicong 已提交
25
/*
 * Initialize the process-wide TQ state (timer and stream module) once.
 *
 * tqMgmt.inited is moved with compare-and-swap: 0 (not initialized) -> 2 (in progress)
 * -> 1 (initialized). A caller that observes 2 spins until the owner leaves that state.
 *
 * Returns 0 on success or when already initialized, -1 on failure. On failure the state
 * goes back to 0, so a later call can retry and no concurrent caller is stuck on state 2.
 */
int32_t tqInit() {
  int8_t old;
  while (1) {
    old = atomic_val_compare_exchange_8(&tqMgmt.inited, 0, 2);
    if (old != 2) break;
  }

  if (old == 0) {
    tqMgmt.timer = taosTmrInit(10000, 100, 10000, "TQ");
    if (tqMgmt.timer == NULL) {
      atomic_store_8(&tqMgmt.inited, 0);
      return -1;
    }

    if (streamInit() < 0) {
      // Roll back: release the timer and leave state 2. Otherwise every other
      // tqInit()/tqCleanUp() caller would spin on state 2 forever.
      taosTmrCleanUp(tqMgmt.timer);
      tqMgmt.timer = NULL;
      atomic_store_8(&tqMgmt.inited, 0);
      return -1;
    }

    atomic_store_8(&tqMgmt.inited, 1);
  }

  return 0;
}
L
Liu Jicong 已提交
46

47
/*
 * Tear down the process-wide TQ state created by tqInit().
 *
 * Only the caller that moves tqMgmt.inited from 1 (initialized) to 2 (in progress)
 * performs the teardown. Callers observing 2 keep retrying until the state settles.
 * Calling this while not initialized does nothing.
 */
void tqCleanUp() {
  int8_t prev = 2;
  while (prev == 2) {
    prev = atomic_val_compare_exchange_8(&tqMgmt.inited, 1, 2);
  }

  if (prev != 1) {
    return;
  }

  taosTmrCleanUp(tqMgmt.timer);
  streamCleanUp();
  atomic_store_8(&tqMgmt.inited, 0);
}
L
Liu Jicong 已提交
60

61
static void destroyTqHandle(void* data) {
62 63 64
  STqHandle* pData = (STqHandle*)data;
  qDestroyTask(pData->execHandle.task);
  if (pData->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
L
Liu Jicong 已提交
65
    taosMemoryFreeClear(pData->execHandle.execCol.qmsg);
66
  } else if (pData->execHandle.subType == TOPIC_SUB_TYPE__DB) {
67
    tqCloseReader(pData->execHandle.pTqReader);
68 69
    walCloseReader(pData->pWalReader);
    taosHashCleanup(pData->execHandle.execDb.pFilterOutTbUid);
L
Liu Jicong 已提交
70
  } else if (pData->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
71
    walCloseReader(pData->pWalReader);
72
    tqCloseReader(pData->execHandle.pTqReader);
73
  }
74 75 76 77
  if(pData->msg != NULL) {
    rpcFreeCont(pData->msg->pCont);
    taosMemoryFree(pData->msg);
    pData->msg = NULL;
D
dapan1121 已提交
78
  }
L
Liu Jicong 已提交
79 80
}

81 82 83 84 85
/*
 * True only when both offsets are WAL (TMQ_OFFSET__LOG) offsets and pLeft's version does
 * not exceed pRight's. Any non-log offset on either side yields false.
 */
static bool tqOffsetLessOrEqual(const STqOffset* pLeft, const STqOffset* pRight) {
  if (pLeft->val.type != TMQ_OFFSET__LOG || pRight->val.type != TMQ_OFFSET__LOG) {
    return false;
  }
  return pLeft->val.version <= pRight->val.version;
}

L
Liu Jicong 已提交
86
/*
 * Create the TQ instance of a vnode.
 *
 * @param path   directory of TQ's persistent state; it is duplicated, and the caller keeps
 *               ownership of the argument
 * @param pVnode owning vnode
 * @return the new STQ, or NULL with terrno set on failure
 */
STQ* tqOpen(const char* path, SVnode* pVnode) {
  STQ* pTq = taosMemoryCalloc(1, sizeof(STQ));
  if (pTq == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pTq->path = taosStrdup(path);
  pTq->pVnode = pVnode;
  pTq->walLogLastVer = pVnode->pWal->vers.lastVer;

  pTq->pHandle = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);
  taosInitRWLatch(&pTq->lock);
  pTq->pPushMgr = taosHashInit(64, MurmurHash3_32, false, HASH_NO_LOCK);
  pTq->pCheckInfo = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);

  // Fail fast if any allocation above failed. Only what was created here is released.
  // tqClose() is not used because the meta/offset/stream stores are not opened yet.
  if (pTq->path == NULL || pTq->pHandle == NULL || pTq->pPushMgr == NULL || pTq->pCheckInfo == NULL) {
    if (pTq->pCheckInfo != NULL) {
      taosHashCleanup(pTq->pCheckInfo);
    }
    if (pTq->pPushMgr != NULL) {
      taosHashCleanup(pTq->pPushMgr);
    }
    if (pTq->pHandle != NULL) {
      taosHashCleanup(pTq->pHandle);
    }
    taosMemoryFree(pTq->path);
    taosMemoryFree(pTq);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  taosHashSetFreeFp(pTq->pHandle, destroyTqHandle);
  taosHashSetFreeFp(pTq->pCheckInfo, (FDelete)tDeleteSTqCheckInfo);

  // on failure, tqClose() releases whatever tqInitialize() managed to open
  if (tqInitialize(pTq) != TSDB_CODE_SUCCESS) {
    tqClose(pTq);
    return NULL;
  }

  return pTq;
}

/*
 * Open the persistent parts of pTq: TQ meta, offset store and stream meta. Then load the
 * stream tasks kept in the stream meta.
 *
 * Declared static ahead of tqOpen(); the definition repeats `static` so the internal
 * linkage is visible here as well.
 *
 * Returns 0 on success, -1 on the first failure. Stores opened before the failure stay
 * attached to pTq; tqOpen() releases them through tqClose().
 */
static int32_t tqInitialize(STQ* pTq) {
  if (tqMetaOpen(pTq) < 0) {
    return -1;
  }

  pTq->pOffsetStore = tqOffsetOpen(pTq);
  if (pTq->pOffsetStore == NULL) {
    return -1;
  }

  pTq->pStreamMeta = streamMetaOpen(pTq->path, pTq, (FTaskExpand*)tqExpandTask, pTq->pVnode->config.vgId);
  if (pTq->pStreamMeta == NULL) {
    return -1;
  }

  // the version is kept in task's meta data
  // todo check if this version is required or not
  if (streamLoadTasks(pTq->pStreamMeta, walGetCommittedVer(pTq->pVnode->pWal)) < 0) {
    return -1;
  }

  return 0;
}
L
Liu Jicong 已提交
138

L
Liu Jicong 已提交
139
/*
 * Release everything owned by pTq, then pTq itself. A NULL pTq is ignored.
 *
 * tqOpen() also calls this to unwind a failed tqInitialize(). At that point the offset
 * store and/or the stream meta may still be NULL, so those two are only closed when set.
 */
void tqClose(STQ* pTq) {
  if (pTq == NULL) {
    return;
  }

  if (pTq->pOffsetStore != NULL) {
    tqOffsetClose(pTq->pOffsetStore);
  }
  taosHashCleanup(pTq->pHandle);
  taosHashCleanup(pTq->pPushMgr);
  taosHashCleanup(pTq->pCheckInfo);
  taosMemoryFree(pTq->path);
  // NOTE(review): still called unconditionally; confirm tqMetaClose() tolerates a failed tqMetaOpen()
  tqMetaClose(pTq);
  if (pTq->pStreamMeta != NULL) {
    streamMetaClose(pTq->pStreamMeta);
  }
  taosMemoryFree(pTq);
}
L
Liu Jicong 已提交
153

H
Haojun Liao 已提交
154 155 156 157 158 159 160 161 162 163 164 165 166
/*
 * Mark every stream task of this vnode as TASK_STATUS__STOP and kill its executor.
 * The whole pass runs with the stream meta write latch held. A NULL pTq is ignored.
 */
void tqNotifyClose(STQ* pTq) {
  if (pTq == NULL) {
    return;
  }

  taosWLockLatch(&pTq->pStreamMeta->lock);

  for (void* pIter = taosHashIterate(pTq->pStreamMeta->pTasks, NULL); pIter != NULL;
       pIter = taosHashIterate(pTq->pStreamMeta->pTasks, pIter)) {
    SStreamTask* pTask = *(SStreamTask**)pIter;
    tqDebug("vgId:%d s-task:%s set dropping flag", pTq->pStreamMeta->vgId, pTask->id.idStr);
    pTask->status.taskStatus = TASK_STATUS__STOP;

    int64_t startMs = taosGetTimestampMs();
    qKillTask(pTask->exec.pExecutor, TSDB_CODE_SUCCESS);
    int64_t elapsedMs = taosGetTimestampMs() - startMs;
    tqDebug("vgId:%d s-task:%s is closed in %" PRId64 " ms", pTq->pStreamMeta->vgId, pTask->id.idStr, elapsedMs);
  }

  taosWUnLockLatch(&pTq->pStreamMeta->lock);
}

D
dapan1121 已提交
179 180
static int32_t doSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* pRsp, int32_t epoch,
                             int64_t consumerId, int32_t type) {
L
Liu Jicong 已提交
181 182
  int32_t len = 0;
  int32_t code = 0;
D
dapan1121 已提交
183 184

  if (type == TMQ_MSG_TYPE__POLL_RSP) {
H
Haojun Liao 已提交
185
    tEncodeSize(tEncodeMqDataRsp, pRsp, len, code);
D
dapan1121 已提交
186 187 188
  } else if (type == TMQ_MSG_TYPE__TAOSX_RSP) {
    tEncodeSize(tEncodeSTaosxRsp, (STaosxRsp*)pRsp, len, code);
  }
L
Liu Jicong 已提交
189 190 191 192 193 194 195 196 197 198 199

  if (code < 0) {
    return -1;
  }

  int32_t tlen = sizeof(SMqRspHead) + len;
  void*   buf = rpcMallocCont(tlen);
  if (buf == NULL) {
    return -1;
  }

D
dapan1121 已提交
200 201 202
  ((SMqRspHead*)buf)->mqMsgType = type;
  ((SMqRspHead*)buf)->epoch = epoch;
  ((SMqRspHead*)buf)->consumerId = consumerId;
L
Liu Jicong 已提交
203 204 205 206 207 208

  void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));

  SEncoder encoder = {0};
  tEncoderInit(&encoder, abuf, len);

D
dapan1121 已提交
209
  if (type == TMQ_MSG_TYPE__POLL_RSP) {
H
Haojun Liao 已提交
210
    tEncodeMqDataRsp(&encoder, pRsp);
D
dapan1121 已提交
211
  } else if (type == TMQ_MSG_TYPE__TAOSX_RSP) {
X
Xiaoyu Wang 已提交
212
    tEncodeSTaosxRsp(&encoder, (STaosxRsp*)pRsp);
dengyihao's avatar
dengyihao 已提交
213
  }
L
Liu Jicong 已提交
214

wmmhello's avatar
wmmhello 已提交
215
  tEncoderClear(&encoder);
L
Liu Jicong 已提交
216 217

  SRpcMsg rsp = {
D
dapan1121 已提交
218
      .info = *pRpcHandleInfo,
L
Liu Jicong 已提交
219 220 221 222
      .pCont = buf,
      .contLen = tlen,
      .code = 0,
  };
L
Liu Jicong 已提交
223

L
Liu Jicong 已提交
224
  tmsgSendRsp(&rsp);
L
Liu Jicong 已提交
225 226
  return 0;
}
L
Liu Jicong 已提交
227

H
Haojun Liao 已提交
228
/*
 * Reply to the request saved in pHandle->msg with a data response that carries no blocks.
 *
 * The response is stamped with the handle's current consumer id and epoch, and with the
 * version range reported by walReaderValidVersionRange(). Always returns 0.
 */
int32_t tqPushDataRsp(STqHandle* pHandle, int32_t vgId) {
  SMqDataRsp rsp = {0};
  rsp.head.mqMsgType = TMQ_MSG_TYPE__POLL_RSP;
  rsp.head.consumerId = pHandle->consumerId;
  rsp.head.epoch = pHandle->epoch;

  int64_t firstVer = 0;
  int64_t lastVer = 0;
  walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &firstVer, &lastVer);
  tqDoSendDataRsp(&pHandle->msg->info, &rsp, pHandle->epoch, pHandle->consumerId, TMQ_MSG_TYPE__POLL_RSP, firstVer,
                  lastVer);

  char reqOffsetStr[80] = {0};
  char rspOffsetStr[80] = {0};
  tFormatOffset(reqOffsetStr, tListLen(reqOffsetStr), &rsp.reqOffset);
  tFormatOffset(rspOffsetStr, tListLen(rspOffsetStr), &rsp.rspOffset);
  tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) push rsp, block num: %d, req:%s, rsp:%s",
          vgId, rsp.head.consumerId, rsp.head.epoch, rsp.blockNum, reqOffsetStr, rspOffsetStr);

  return 0;
}

247 248 249 250 251 252
/*
 * Send pRsp, encoded as message type `type`, back to the sender of pMsg.
 *
 * The reply carries the request's epoch and consumer id, plus the version range reported
 * by walReaderValidVersionRange() for the handle's WAL reader. Always returns 0.
 */
int32_t tqSendDataRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp,
                      int32_t type, int32_t vgId) {
  int64_t firstVer = 0;
  int64_t lastVer = 0;
  walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &firstVer, &lastVer);
  tqDoSendDataRsp(&pMsg->info, pRsp, pReq->epoch, pReq->consumerId, type, firstVer, lastVer);

  char reqOffsetStr[80] = {0};
  char rspOffsetStr[80] = {0};
  tFormatOffset(reqOffsetStr, tListLen(reqOffsetStr), &pRsp->reqOffset);
  tFormatOffset(rspOffsetStr, tListLen(rspOffsetStr), &pRsp->rspOffset);

  tqDebug("vgId:%d consumer:0x%" PRIx64 " (epoch %d) send rsp, block num:%d, req:%s, rsp:%s, reqId:0x%" PRIx64,
          vgId, pReq->consumerId, pReq->epoch, pRsp->blockNum, reqOffsetStr, rspOffsetStr, pReq->reqId);
  return 0;
}

265
/*
 * Apply a consumer offset commit (SMqVgOffset encoded in msg) to the offset store.
 *
 * The offset is written only when it is not less than or equal to the saved one, as
 * judged by tqOffsetLessOrEqual(). A log offset is bumped by one when it is exactly
 * sversion - 1. For log offsets, the handle's WAL reference is then moved to the
 * committed version.
 *
 * Returns 0 on success (including "nothing to update"), -1 on failure.
 */
int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  SMqVgOffset vgOffset = {0};
  int32_t     vgId = TD_VID(pTq->pVnode);

  SDecoder decoder;
  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
  int32_t code = tDecodeMqVgOffset(&decoder, &vgOffset);
  // Release decoder resources on the failure path too. The original returned early
  // without clearing the decoder.
  tDecoderClear(&decoder);
  if (code < 0) {
    tqError("vgId:%d failed to decode offset commit msg, len:%d", vgId, msgLen);
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  STqOffset* pOffset = &vgOffset.offset;

  if (pOffset->val.type == TMQ_OFFSET__SNAPSHOT_DATA || pOffset->val.type == TMQ_OFFSET__SNAPSHOT_META) {
    tqDebug("receive offset commit msg to %s on vgId:%d, offset(type:snapshot) uid:%" PRId64 ", ts:%" PRId64,
            pOffset->subKey, vgId, pOffset->val.uid, pOffset->val.ts);
  } else if (pOffset->val.type == TMQ_OFFSET__LOG) {
    tqDebug("receive offset commit msg to %s on vgId:%d, offset(type:log) version:%" PRId64, pOffset->subKey, vgId,
            pOffset->val.version);
    if (pOffset->val.version + 1 == sversion) {
      pOffset->val.version += 1;
    }
  } else {
    tqError("invalid commit offset type:%d", pOffset->val.type);
    return -1;
  }

  STqOffset* pSavedOffset = tqOffsetRead(pTq->pOffsetStore, pOffset->subKey);
  if (pSavedOffset != NULL && tqOffsetLessOrEqual(pOffset, pSavedOffset)) {
    tqDebug("not update the offset, vgId:%d sub:%s since committed:%" PRId64 " less than/equal to existed:%" PRId64,
            vgId, pOffset->subKey, pOffset->val.version, pSavedOffset->val.version);
    return 0;  // no need to update the offset value
  }

  // save the new offset value
  if (tqOffsetWrite(pTq->pOffsetStore, pOffset) < 0) {
    return -1;
  }

  if (pOffset->val.type == TMQ_OFFSET__LOG) {
    STqHandle* pHandle = taosHashGet(pTq->pHandle, pOffset->subKey, strlen(pOffset->subKey));
    if (pHandle && (walRefVer(pHandle->pRef, pOffset->val.version) < 0)) {
      return -1;
    }
  }

  return 0;
}

315
/*
 * Move a subscription's saved WAL offset to the version requested by its consumer.
 *
 * Only log (TMQ_OFFSET__LOG) offsets are accepted. The sender must be the consumer that
 * currently owns the handle. The target version is clamped into the range reported by
 * walReaderValidVersionRange() before it is written.
 *
 * Returns 0 on success or when no change is needed, -1 on failure (terrno set for a
 * malformed message, an unknown subscription or a consumer mismatch).
 */
int32_t tqProcessSeekReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  SMqVgOffset vgOffset = {0};
  int32_t     vgId = TD_VID(pTq->pVnode);

  SDecoder decoder;
  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
  int32_t code = tDecodeMqVgOffset(&decoder, &vgOffset);
  // release decoder resources on the failure path as well
  tDecoderClear(&decoder);
  if (code < 0) {
    tqError("vgId:%d failed to decode seek msg, len:%d", vgId, msgLen);
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  STqOffset* pOffset = &vgOffset.offset;
  if (pOffset->val.type != TMQ_OFFSET__LOG) {
    tqError("vgId:%d, subKey:%s invalid seek offset type:%d", vgId, pOffset->subKey, pOffset->val.type);
    return -1;
  }

  // 1. find the handle
  STqHandle* pHandle = taosHashGet(pTq->pHandle, pOffset->subKey, strlen(pOffset->subKey));
  if (pHandle == NULL) {
    tqError("tmq seek: consumer:0x%" PRIx64 " vgId:%d subkey %s not found", vgOffset.consumerId, vgId,
            pOffset->subKey);
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  // 2. check consumer-vg assignment status
  taosRLockLatch(&pTq->lock);
  if (pHandle->consumerId != vgOffset.consumerId) {
    tqDebug("ERROR tmq seek: consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64,
            vgOffset.consumerId, vgId, pOffset->subKey, pHandle->consumerId);
    terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
    taosRUnLockLatch(&pTq->lock);
    return -1;
  }
  taosRUnLockLatch(&pTq->lock);

  // 3. check the offset info
  STqOffset* pSavedOffset = tqOffsetRead(pTq->pOffsetStore, pOffset->subKey);
  if (pSavedOffset != NULL) {
    if (pSavedOffset->val.type != TMQ_OFFSET__LOG) {
      tqError("invalid saved offset type, vgId:%d sub:%s", vgId, pOffset->subKey);
      return 0;  // no need to update the offset value
    }

    if (pSavedOffset->val.version == pOffset->val.version) {
      tqDebug("vgId:%d subKey:%s no need to seek to %" PRId64 " prev offset:%" PRId64, vgId, pOffset->subKey,
              pOffset->val.version, pSavedOffset->val.version);
      return 0;
    }
  }

  // clamp the requested version into the WAL's valid range
  int64_t sver = 0, ever = 0;
  walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever);
  if (pOffset->val.version < sver) {
    pOffset->val.version = sver;
  } else if (pOffset->val.version > ever) {
    pOffset->val.version = ever;
  }

  // save the new offset value
  if (pSavedOffset != NULL) {
    tqDebug("vgId:%d sub:%s seek to:%" PRId64 " prev offset:%" PRId64, vgId, pOffset->subKey, pOffset->val.version,
            pSavedOffset->val.version);
  } else {
    tqDebug("vgId:%d sub:%s seek to:%" PRId64 " not saved yet", vgId, pOffset->subKey, pOffset->val.version);
  }

  if (tqOffsetWrite(pTq->pOffsetStore, pOffset) < 0) {
    tqError("failed to save offset, vgId:%d sub:%s seek to %" PRId64, vgId, pOffset->subKey, pOffset->val.version);
    return -1;
  }

  return 0;
}

L
Liu Jicong 已提交
391
/*
 * Decide whether column colId of table tbUid may be altered.
 *
 * Returns -1 when any entry of pTq->pCheckInfo for tbUid lists colId in its colIdList,
 * and 0 otherwise. The hash iteration is cancelled before an early return.
 */
int32_t tqCheckColModifiable(STQ* pTq, int64_t tbUid, int32_t colId) {
  for (void* pIter = taosHashIterate(pTq->pCheckInfo, NULL); pIter != NULL;
       pIter = taosHashIterate(pTq->pCheckInfo, pIter)) {
    STqCheckInfo* pCheck = (STqCheckInfo*)pIter;
    if (pCheck->ntbUid != tbUid) {
      continue;
    }

    int32_t numOfCols = taosArrayGetSize(pCheck->colIdList);
    for (int32_t j = 0; j < numOfCols; ++j) {
      int16_t forbiddenColId = *(int16_t*)taosArrayGet(pCheck->colIdList, j);
      if (forbiddenColId == colId) {
        taosHashCancelIterate(pTq->pCheckInfo, pIter);
        return -1;
      }
    }
  }

  return 0;
}

D
dapan1121 已提交
417
/*
 * Handle a TMQ poll request.
 *
 * Checks that the requesting consumer still owns the subscription handle and adopts a
 * newer epoch carried by the request. It then delegates to tqExtractDataForMq() to build
 * the response.
 *
 * Returns -1 with terrno set when the request cannot be decoded, the subscription is
 * unknown, or the consumer does not match. Otherwise returns tqExtractDataForMq()'s result.
 */
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
  SMqPollReq pollReq = {0};
  if (tDeserializeSMqPollReq(pMsg->pCont, pMsg->contLen, &pollReq) < 0) {
    tqError("tDeserializeSMqPollReq %d failed", pMsg->contLen);
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  const int64_t consumerId = pollReq.consumerId;
  const int32_t vgId = TD_VID(pTq->pVnode);

  // 1. locate the subscription handle
  STqHandle* pHandle = taosHashGet(pTq->pHandle, pollReq.subKey, strlen(pollReq.subKey));
  if (pHandle == NULL) {
    tqError("tmq poll: consumer:0x%" PRIx64 " vgId:%d subkey %s not found", consumerId, vgId, pollReq.subKey);
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  // 2. the handle must still be assigned to this consumer (re-balance may have moved it)
  taosRLockLatch(&pTq->lock);
  const bool owned = (pHandle->consumerId == consumerId);
  if (!owned) {
    tqDebug("ERROR tmq poll: consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64,
            consumerId, vgId, pollReq.subKey, pHandle->consumerId);
    terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
  }
  taosRUnLockLatch(&pTq->lock);
  if (!owned) {
    return -1;
  }

  // 3. adopt a newer epoch carried by the request
  taosWLockLatch(&pTq->lock);
  const int32_t savedEpoch = pHandle->epoch;
  if (pollReq.epoch > savedEpoch) {
    tqDebug("tmq poll: consumer:0x%" PRIx64 " epoch update from %d to %d by poll req", consumerId, savedEpoch,
            pollReq.epoch);
    pHandle->epoch = pollReq.epoch;
  }
  taosWUnLockLatch(&pTq->lock);

  char offsetStr[80];
  tFormatOffset(offsetStr, tListLen(offsetStr), &pollReq.reqOffset);
  tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey %s, recv poll req vgId:%d, req:%s, reqId:0x%" PRIx64,
          consumerId, pollReq.epoch, pHandle->subKey, vgId, offsetStr, pollReq.reqId);

  return tqExtractDataForMq(pTq, pHandle, &pollReq, pMsg);
}

467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500
/*
 * Answer a WAL-info request (decoded as SMqPollReq) for a subscription owned by the
 * requesting consumer.
 *
 * The reply is a TMQ_MSG_TYPE__WALINFO_RSP whose rspOffset is a log offset. Its version
 * comes from the first applicable source:
 *   - the saved offset of the subscription;
 *   - otherwise the reader's current version (request offset type LOG);
 *   - otherwise the first WAL version (RESET_EARLIEAST);
 *   - otherwise the last WAL version (RESET_LATEST).
 * The WAL version range [sver, ever] is sent along.
 *
 * Returns 0 once the reply is sent, -1 with terrno set otherwise.
 */
int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) {
  SMqPollReq req = {0};
  if (tDeserializeSMqPollReq(pMsg->pCont, pMsg->contLen, &req) < 0) {
    tqError("tDeserializeSMqPollReq %d failed", pMsg->contLen);
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  int64_t      consumerId = req.consumerId;
  STqOffsetVal reqOffset = req.reqOffset;
  int32_t      vgId = TD_VID(pTq->pVnode);

  // 1. find handle
  STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
  if (pHandle == NULL) {
    tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s not found", consumerId, vgId, req.subKey);
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  // 2. check re-balance status
  taosRLockLatch(&pTq->lock);
  if (pHandle->consumerId != consumerId) {
    tqDebug("ERROR consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64,
            consumerId, vgId, req.subKey, pHandle->consumerId);
    terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
    taosRUnLockLatch(&pTq->lock);
    return -1;
  }
  taosRUnLockLatch(&pTq->lock);

  int64_t sver = 0, ever = 0;
  walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever);

  int64_t currentVer = walReaderGetCurrentVer(pHandle->execHandle.pTqReader->pWalReader);

  SMqDataRsp dataRsp = {0};
  tqInitDataRsp(&dataRsp, &req);

  STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, req.subKey);
  if (pOffset != NULL) {
    if (pOffset->val.type != TMQ_OFFSET__LOG) {
      tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s use snapshot, no valid wal info", consumerId, vgId, req.subKey);
      terrno = TSDB_CODE_INVALID_PARA;
      tDeleteMqDataRsp(&dataRsp);
      return -1;
    }

    dataRsp.rspOffset.type = TMQ_OFFSET__LOG;
    dataRsp.rspOffset.version = pOffset->val.version;
  } else {
    if (req.useSnapshot == true) {
      tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s snapshot not support wal info", consumerId, vgId, req.subKey);
      terrno = TSDB_CODE_INVALID_PARA;
      tDeleteMqDataRsp(&dataRsp);
      return -1;
    }

    dataRsp.rspOffset.type = TMQ_OFFSET__LOG;

    if (reqOffset.type == TMQ_OFFSET__LOG) {
      dataRsp.rspOffset.version = currentVer;  // return current consume offset value
    } else if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEAST) {
      dataRsp.rspOffset.version = sver;  // not consume yet, set the earliest position
    } else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) {
      dataRsp.rspOffset.version = ever;
    } else {
      tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s invalid offset type:%d", consumerId, vgId, req.subKey,
              reqOffset.type);
      terrno = TSDB_CODE_INVALID_PARA;
      tDeleteMqDataRsp(&dataRsp);
      return -1;
    }
  }

  tqDoSendDataRsp(&pMsg->info, &dataRsp, req.epoch, req.consumerId, TMQ_MSG_TYPE__WALINFO_RSP, sver, ever);
  // Release what tqInitDataRsp() set up; every error path above already does this.
  // NOTE(review): assumes tqDoSendDataRsp only serializes dataRsp and takes no ownership of it.
  tDeleteMqDataRsp(&dataRsp);
  return 0;
}

546
/*
 * Drop subscription pReq->subKey from this vnode.
 *
 * If a handle exists, wait until it is no longer executing, release its WAL reference,
 * and remove it from pTq->pHandle; the hash free callback destroys it. Then delete the
 * saved offset and the persisted handle. Failures of the individual steps are logged
 * only. Always returns 0.
 */
int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  SMqVDeleteReq* pReq = (SMqVDeleteReq*)msg;
  int32_t        vgId = TD_VID(pTq->pVnode);

  tqDebug("vgId:%d, tq process delete sub req %s", vgId, pReq->subKey);

  STqHandle* pHandle = taosHashGet(pTq->pHandle, pReq->subKey, strlen(pReq->subKey));
  if (pHandle != NULL) {
    while (tqIsHandleExecuting(pHandle)) {
      tqDebug("vgId:%d, topic:%s, subscription is executing, wait for 5ms and retry", vgId, pHandle->subKey);
      taosMsleep(5);
    }

    // Release the WAL reference only once the handle stopped executing, and do not leave
    // a dangling pointer behind.
    // NOTE(review): presumably the ref pins the WAL range a running reader may use; confirm.
    if (pHandle->pRef) {
      walCloseRef(pTq->pVnode->pWal, pHandle->pRef->refId);
      pHandle->pRef = NULL;
    }

    if (taosHashRemove(pTq->pHandle, pReq->subKey, strlen(pReq->subKey)) != 0) {
      tqError("cannot process tq delete req %s, since no such handle", pReq->subKey);
    }
  }

  if (tqOffsetDelete(pTq->pOffsetStore, pReq->subKey) != 0) {
    tqError("cannot process tq delete req %s, since no such offset in cache", pReq->subKey);
  }

  if (tqMetaDeleteHandle(pTq, pReq->subKey) < 0) {
    tqError("cannot process tq delete req %s, since no such handle in tdb", pReq->subKey);
  }

  return 0;
}

588
/*
 * Register the STqCheckInfo encoded in msg, keyed by its topic.
 *
 * The entry is added to pTq->pCheckInfo, which tqCheckColModifiable() consults, and is
 * persisted with tqMetaSaveCheckInfo().
 *
 * Returns 0 on success, -1 with terrno set on failure.
 */
int32_t tqProcessAddCheckInfoReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  STqCheckInfo info = {0};
  SDecoder     decoder;
  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
  if (tDecodeSTqCheckInfo(&decoder, &info) < 0) {
    // clear the decoder here too; a malformed message is not an allocation failure
    // NOTE(review): a partially decoded info.colIdList may still leak here
    tDecoderClear(&decoder);
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }
  tDecoderClear(&decoder);

  if (taosHashPut(pTq->pCheckInfo, info.topic, strlen(info.topic), &info, sizeof(STqCheckInfo)) < 0) {
    // the entry was not stored, so its decoded members are still owned here
    tDeleteSTqCheckInfo(&info);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  if (tqMetaSaveCheckInfo(pTq, info.topic, msg, msgLen) < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  return 0;
}

608
/*
 * Remove the check info registered under the topic name in msg, treated as a
 * NUL-terminated string.
 *
 * The entry is removed from pTq->pCheckInfo first and then from the TQ meta store. The
 * meta store is not touched if the in-memory removal fails.
 *
 * Returns 0 on success, -1 with terrno set otherwise.
 */
int32_t tqProcessDelCheckInfoReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  bool failed = taosHashRemove(pTq->pCheckInfo, msg, strlen(msg)) < 0 || tqMetaDeleteCheckInfo(pTq, msg) < 0;
  if (failed) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  return 0;
}

620
/*
 * Apply a consumer re-balance request (SMqRebVgReq) to this vnode.
 *
 * No handle for req.subKey yet:
 *   Build a new STqHandle for req.newConsumerId. It gets a WAL reference at the committed
 *   version, plus readers and a query task according to req.subType. The handle is then
 *   stored in pTq->pHandle and persisted.
 * Handle exists:
 *   Either bump its epoch (same consumer) or switch it to the new consumer with epoch 0.
 *   Then kill its running task, unregister it from the push manager, and persist the
 *   updated handle.
 *
 * Returns 0 on success, including a request with newConsumerId == -1 for a missing handle,
 * which is only logged. Returns non-zero on failure.
 */
int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  int32_t     ret = 0;
  SMqRebVgReq req = {0};
  tDecodeSMqRebVgReq(msg, &req);

  SVnode* pVnode = pTq->pVnode;
  int32_t vgId = TD_VID(pVnode);

  tqDebug("vgId:%d, tq process sub req:%s, Id:0x%" PRIx64 " -> Id:0x%" PRIx64, pVnode->config.vgId, req.subKey,
          req.oldConsumerId, req.newConsumerId);

  STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
  if (pHandle == NULL) {
    if (req.oldConsumerId != -1) {
      tqError("vgId:%d, build new consumer handle %s for consumer:0x%" PRIx64 ", but old consumerId:0x%" PRIx64,
              req.vgId, req.subKey, req.newConsumerId, req.oldConsumerId);
    }

    if (req.newConsumerId == -1) {
      tqError("vgId:%d, tq invalid re-balance request, new consumerId %" PRId64 "", req.vgId, req.newConsumerId);
      goto end;
    }

    STqHandle tqHandle = {0};
    pHandle = &tqHandle;

    memcpy(pHandle->subKey, req.subKey, TSDB_SUBSCRIBE_KEY_LEN);
    pHandle->consumerId = req.newConsumerId;
    pHandle->epoch = -1;

    pHandle->execHandle.subType = req.subType;
    pHandle->fetchMeta = req.withMeta;

    // TODO version should be assigned and refed during preprocess
    SWalRef* pRef = walRefCommittedVer(pVnode->pWal);
    if (pRef == NULL) {
      ret = -1;
      goto end;
    }

    int64_t ver = pRef->refVer;
    pHandle->pRef = pRef;

    SReadHandle handle = {
        .meta = pVnode->pMeta, .vnode = pVnode, .initTableReader = true, .initTqReader = true, .version = ver};
    pHandle->snapshotVer = ver;

    if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
      // ownership of the query message moves to the handle
      pHandle->execHandle.execCol.qmsg = req.qmsg;
      req.qmsg = NULL;

      pHandle->execHandle.task = qCreateQueueExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle, vgId,
                                                          &pHandle->execHandle.numOfCols, req.newConsumerId);
      void* scanner = NULL;
      qExtractStreamScanner(pHandle->execHandle.task, &scanner);
      pHandle->execHandle.pTqReader = qExtractReaderFromStreamScanner(scanner);
    } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__DB) {
      pHandle->pWalReader = walOpenReader(pVnode->pWal, NULL);
      pHandle->execHandle.pTqReader = tqReaderOpen(pVnode);

      pHandle->execHandle.execDb.pFilterOutTbUid =
          taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
      buildSnapContext(handle.meta, handle.version, 0, pHandle->execHandle.subType, pHandle->fetchMeta,
                       (SSnapContext**)(&handle.sContext));

      pHandle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &handle, vgId, NULL, req.newConsumerId);
    } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
      pHandle->pWalReader = walOpenReader(pVnode->pWal, NULL);
      pHandle->execHandle.execTb.suid = req.suid;

      SArray* tbUidList = taosArrayInit(0, sizeof(int64_t));
      vnodeGetCtbIdList(pVnode, req.suid, tbUidList);
      tqDebug("vgId:%d, tq try to get all ctb, suid:%" PRId64, pVnode->config.vgId, req.suid);
      for (int32_t i = 0; i < taosArrayGetSize(tbUidList); i++) {
        int64_t tbUid = *(int64_t*)taosArrayGet(tbUidList, i);
        tqDebug("vgId:%d, idx %d, uid:%" PRId64, vgId, i, tbUid);
      }
      pHandle->execHandle.pTqReader = tqReaderOpen(pVnode);
      tqReaderSetTbUidList(pHandle->execHandle.pTqReader, tbUidList);
      taosArrayDestroy(tbUidList);

      buildSnapContext(handle.meta, handle.version, req.suid, pHandle->execHandle.subType, pHandle->fetchMeta,
                       (SSnapContext**)(&handle.sContext));
      pHandle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &handle, vgId, NULL, req.newConsumerId);
    }

    // The hash keeps its own copy of the handle. If the insert fails, nobody else owns the
    // resources built above, so release them here.
    if (taosHashPut(pTq->pHandle, req.subKey, strlen(req.subKey), pHandle, sizeof(STqHandle)) < 0) {
      tqError("vgId:%d failed to add handle %s for consumer:0x%" PRIx64 " into hash table", vgId, req.subKey,
              pHandle->consumerId);
      walCloseRef(pVnode->pWal, pRef->refId);
      destroyTqHandle(pHandle);
      ret = -1;
      goto end;
    }

    // The previous consumer is the one named in the request. The freshly built handle
    // always had consumerId 0 before the assignment above.
    tqDebug("try to persist handle %s consumer:0x%" PRIx64 " , old consumer:0x%" PRIx64, req.subKey,
            pHandle->consumerId, req.oldConsumerId);
    ret = tqMetaSaveHandle(pTq, req.subKey, pHandle);
  } else {
    if (pHandle->consumerId == req.newConsumerId) {  // same consumer: only bump the epoch
      tqInfo("vgId:%d consumer:0x%" PRIx64 " remains, no switch occurs", req.vgId, req.newConsumerId);
      atomic_add_fetch_32(&pHandle->epoch, 1);
    } else {
      tqInfo("vgId:%d switch consumer from Id:0x%" PRIx64 " to Id:0x%" PRIx64, req.vgId, pHandle->consumerId,
             req.newConsumerId);
      atomic_store_64(&pHandle->consumerId, req.newConsumerId);
      atomic_store_32(&pHandle->epoch, 0);
    }

    // kill executing task
    qTaskInfo_t pTaskInfo = pHandle->execHandle.task;
    if (pTaskInfo != NULL) {
      qKillTask(pTaskInfo, TSDB_CODE_SUCCESS);
    }

    taosWLockLatch(&pTq->lock);
    // remove if it has been register in the push manager, and return one empty block to consumer
    tqUnregisterPushHandle(pTq, pHandle);

    if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
      qStreamCloseTsdbReader(pTaskInfo);
    }

    taosWUnLockLatch(&pTq->lock);
    ret = tqMetaSaveHandle(pTq, req.subKey, pHandle);
  }

end:
  taosMemoryFree(req.qmsg);
  return ret;
}
747

L
liuyao 已提交
748 749 750 751
/*
 * Destructor for hash entries whose stored value is itself a heap pointer:
 * `ptr` points at the slot, so the slot is dereferenced and its target freed.
 */
void freePtr(void *ptr) {
  void **ppSlot = (void **)ptr;
  taosMemoryFree(*ppSlot);
}

752
/*
 * Turn a decoded stream task into a runnable one.
 *
 * The function does the following:
 * - Allocates the task's input/output queues.
 * - Opens its state backend and executor according to its level (source/agg).
 * - Configures its sink (SMA or table).
 * - Opens a WAL reader for source-level tasks.
 *
 * @param pTq   owning TQ instance
 * @param pTask task to expand; fields are filled in place
 * @param ver   version stored as both the checkpoint version and the current version
 * @return 0 on success, -1 on any allocation/creation failure
 */
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
  int32_t vgId = TD_VID(pTq->pVnode);
  pTask->id.idStr = createStreamTaskIdStr(pTask->id.streamId, pTask->id.taskId);
  pTask->refCnt = 1;
  pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE;
  pTask->inputQueue = streamQueueOpen(512 << 10);
  pTask->outputQueue = streamQueueOpen(512 << 10);

  if (pTask->inputQueue == NULL || pTask->outputQueue == NULL) {
    return -1;
  }

  pTask->inputStatus = TASK_INPUT_STATUS__NORMAL;
  pTask->outputStatus = TASK_OUTPUT_STATUS__NORMAL;
  pTask->pMsgCb = &pTq->pVnode->msgCb;
  pTask->pMeta = pTq->pStreamMeta;
  pTask->chkInfo.version = ver;
  pTask->chkInfo.currentVer = ver;

  // a fill-history task starts in the "wait downstream" state instead of normal
  pTask->status.taskStatus = (pTask->fillHistory) ? TASK_STATUS__WAIT_DOWNSTREAM : TASK_STATUS__NORMAL;

  // expand executor
  if (pTask->taskLevel == TASK_LEVEL__SOURCE) {
    pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask, false, -1, -1);
    if (pTask->pState == NULL) {
      return -1;
    }

    SReadHandle handle = {
        .meta = pTq->pVnode->pMeta, .vnode = pTq->pVnode, .initTqReader = 1, .pStateBackend = pTask->pState};

    pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, vgId);
    if (pTask->exec.pExecutor == NULL) {
      return -1;
    }

    qSetTaskId(pTask->exec.pExecutor, pTask->id.taskId, pTask->id.streamId);
  } else if (pTask->taskLevel == TASK_LEVEL__AGG) {
    pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask, false, -1, -1);
    if (pTask->pState == NULL) {
      return -1;
    }

    int32_t     numOfVgroups = (int32_t)taosArrayGetSize(pTask->childEpInfo);
    SReadHandle mgHandle = {.vnode = NULL, .numOfVgroups = numOfVgroups, .pStateBackend = pTask->pState};

    pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &mgHandle, vgId);
    if (pTask->exec.pExecutor == NULL) {
      return -1;
    }

    qSetTaskId(pTask->exec.pExecutor, pTask->id.taskId, pTask->id.streamId);
  }

  // sink
  if (pTask->outputType == TASK_OUTPUT__SMA) {
    pTask->smaSink.vnode = pTq->pVnode;
    pTask->smaSink.smaSink = smaHandleRes;
  } else if (pTask->outputType == TASK_OUTPUT__TABLE) {
    pTask->tbSink.vnode = pTq->pVnode;
    pTask->tbSink.tbSinkFunc = tqSinkToTablePipeline;

    // use the super table's current schema version if it can be looked up, otherwise 1
    int32_t   ver1 = 1;
    SMetaInfo info = {0};
    int32_t   code = metaGetInfo(pTq->pVnode->pMeta, pTask->tbSink.stbUid, &info, NULL);
    if (code == TSDB_CODE_SUCCESS) {
      ver1 = info.skmVer;
    }

    SSchemaWrapper* pschemaWrapper = pTask->tbSink.pSchemaWrapper;
    pTask->tbSink.pTSchema = tBuildTSchema(pschemaWrapper->pSchema, pschemaWrapper->nCols, ver1);
    if (pTask->tbSink.pTSchema == NULL) {
      return -1;
    }

    // the hash values are heap pointers, released through freePtr
    pTask->tbSink.pTblInfo = tSimpleHashInit(10240, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
    if (pTask->tbSink.pTblInfo == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      tqError("vgId:%d s-task:%s failed to create table info cache for sink", vgId, pTask->id.idStr);
      return -1;
    }
    tSimpleHashSetFreeFp(pTask->tbSink.pTblInfo, freePtr);
  }

  if (pTask->taskLevel == TASK_LEVEL__SOURCE) {
    pTask->exec.pWalReader = walOpenReader(pTq->pVnode->pWal, NULL);
    if (pTask->exec.pWalReader == NULL) {
      // a source task without a wal reader cannot pull any data; fail the expansion
      tqError("vgId:%d s-task:%s failed to open wal reader", vgId, pTask->id.idStr);
      return -1;
    }
  }

  streamSetupTrigger(pTask);

  tqInfo("vgId:%d expand stream task, s-task:%s, checkpoint ver:%" PRId64 " child id:%d, level:%d", vgId, pTask->id.idStr,
         pTask->chkInfo.version, pTask->selfChildId, pTask->taskLevel);

  // next valid version will add one
  pTask->chkInfo.version += 1;
  return 0;
}
L
Liu Jicong 已提交
843

844 845 846 847 848 849
/*
 * Answer an upstream task's readiness check for one of this vnode's (downstream) tasks.
 *
 * The request is decoded and the target task is looked up. Its check status, or 0 if the task
 * has not been built yet, is encoded into a SStreamTaskCheckRsp. That response is sent back on
 * pMsg->info.
 *
 * @return 0 once the response is sent; -1 if decoding, encoding or allocation fails
 *         (no response is sent in that case)
 */
int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) {
  int32_t             vgId = TD_VID(pTq->pVnode);
  char*               msgStr = pMsg->pCont;
  char*               msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
  int32_t             msgLen = pMsg->contLen - sizeof(SMsgHead);
  SStreamTaskCheckReq req;
  SDecoder            decoder;

  tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
  int32_t code = tDecodeSStreamTaskCheckReq(&decoder, &req);
  tDecoderClear(&decoder);
  if (code < 0) {
    terrno = TSDB_CODE_MSG_DECODE_ERROR;
    tqError("vgId:%d failed to decode task check req, code:%s", vgId, terrstr());
    return -1;
  }

  int32_t             taskId = req.downstreamTaskId;
  SStreamTaskCheckRsp rsp = {
      .reqId = req.reqId,
      .streamId = req.streamId,
      .childId = req.childId,
      .downstreamNodeId = req.downstreamNodeId,
      .downstreamTaskId = req.downstreamTaskId,
      .upstreamNodeId = req.upstreamNodeId,
      .upstreamTaskId = req.upstreamTaskId,
  };

  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);
  if (pTask) {
    rsp.status = streamTaskCheckStatus(pTask);

    // log while the reference is still held: pTask must not be touched after it is released
    tqDebug("tq recv task check req(reqId:0x%" PRIx64
            ") %d at node %d task status:%d, check req from task %d at node %d, rsp status %d",
            rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, pTask->status.taskStatus, rsp.upstreamTaskId,
            rsp.upstreamNodeId, rsp.status);
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
  } else {
    rsp.status = 0;
    tqDebug("tq recv task check(taskId:%d not built yet) req(reqId:0x%" PRIx64
            ") %d at node %d, check req from task %d at node %d, rsp status %d",
            taskId, rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.upstreamTaskId, rsp.upstreamNodeId,
            rsp.status);
  }

  SEncoder encoder;
  int32_t  len;
  tEncodeSize(tEncodeSStreamTaskCheckRsp, &rsp, len, code);
  if (code < 0) {
    tqError("unable to encode rsp %d", __LINE__);
    return -1;
  }

  void* buf = rpcMallocCont(sizeof(SMsgHead) + len);
  if (buf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    tqError("vgId:%d failed to alloc task check rsp, code:%s", vgId, terrstr());
    return -1;
  }
  ((SMsgHead*)buf)->vgId = htonl(req.upstreamNodeId);

  void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
  tEncoderInit(&encoder, (uint8_t*)abuf, len);
  tEncodeSStreamTaskCheckRsp(&encoder, &rsp);
  tEncoderClear(&encoder);

  SRpcMsg rspMsg = {.code = 0, .pCont = buf, .contLen = sizeof(SMsgHead) + len, .info = pMsg->info};
  tmsgSendRsp(&rspMsg);
  return 0;
}

904
/*
 * Deliver a downstream task's readiness-check answer to the upstream task that asked.
 *
 * @param sversion forwarded unchanged to streamProcessTaskCheckRsp()
 * @return -1 when the response cannot be decoded or the upstream task is not found;
 *         otherwise whatever streamProcessTaskCheckRsp() returns
 */
int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  SStreamTaskCheckRsp rsp;
  SDecoder            decoder;

  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
  int32_t decodeRet = tDecodeSStreamTaskCheckRsp(&decoder, &rsp);
  tDecoderClear(&decoder);
  if (decodeRet < 0) {
    return -1;
  }

  tqDebug("tq recv task check rsp(reqId:0x%" PRIx64 ") %d at node %d check req from task %d at node %d, status %d",
          rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);

  SStreamMeta* pMeta = pTq->pStreamMeta;
  SStreamTask* pUpstream = streamMetaAcquireTask(pMeta, rsp.upstreamTaskId);
  if (pUpstream == NULL) {
    return -1;
  }

  int32_t result = streamProcessTaskCheckRsp(pUpstream, &rsp, sversion);
  streamMetaReleaseTask(pMeta, pUpstream);
  return result;
}

930
/*
 * Deploy a stream task sent by the mnode.
 *
 * Steps:
 * 1. Decode the task into a fresh allocation.
 * 2. Register it in the stream meta under the meta write latch, using sversion as its start version.
 * 3. If it is a fill-history task, start checking its downstream tasks.
 *
 * When tsDisableStream is set, the message is ignored and 0 is returned.
 *
 * @return 0 on success (or when streams are disabled), -1 on allocation/decode/registration failure
 */
int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  if (tsDisableStream) {
    return 0;
  }

  int32_t      vgId = TD_VID(pTq->pVnode);
  SStreamMeta* pMeta = pTq->pStreamMeta;

  // step 1: decode the message into a newly allocated task
  SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
  if (pTask == NULL) {
    return -1;
  }

  SDecoder decoder;
  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
  int32_t code = tDecodeStreamTask(&decoder, pTask);
  tDecoderClear(&decoder);
  if (code < 0) {
    taosMemoryFree(pTask);
    return -1;
  }

  // step 2: register the task; the newest commit version is its initial start version
  taosWLockLatch(&pMeta->lock);
  code = streamMetaAddDeployedTask(pMeta, sversion, pTask);
  if (code < 0) {
    // NOTE(review): pTask is read here after a failed add and is not freed by this function;
    // confirm who owns pTask when streamMetaAddDeployedTask fails.
    tqError("vgId:%d failed to add s-task:%s, total:%d", vgId, pTask->id.idStr,
            streamMetaGetNumOfTasks(pMeta));
    taosWUnLockLatch(&pMeta->lock);
    return -1;
  }
  taosWUnLockLatch(&pMeta->lock);

  // step 3: a fill-history task verifies its downstream tasks before recovering history
  if (pTask->fillHistory) {
    streamTaskCheckDownstream(pTask, sversion);
  }

  tqDebug("vgId:%d s-task:%s is deployed and add meta from mnd, status:%d, total:%d", vgId, pTask->id.idStr,
          pTask->status.taskStatus, streamMetaGetNumOfTasks(pMeta));
  return 0;
}

L
Liu Jicong 已提交
977 978 979 980 981
/*
 * Run the non-blocking first stage of a source task's history recovery.
 *
 * When the stage finishes, a TDMT_VND_STREAM_RECOVER_BLOCKING_STAGE message carrying the
 * step-2 request is put on the vnode write queue.
 *
 * The task reference is held until the last access to pTask. Every exit path releases it
 * exactly once, and the task is never dereferenced after release.
 *
 * @return 0 when step 2 was queued or the task is being dropped; -1 if the task is missing,
 *         has no valid checkpoint version, or the step-2 request cannot be built/allocated
 */
int32_t tqProcessTaskRecover1Req(STQ* pTq, SRpcMsg* pMsg) {
  SStreamMeta*            pMeta = pTq->pStreamMeta;
  SStreamRecoverStep1Req* pReq = (SStreamRecoverStep1Req*)pMsg->pCont;

  SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->taskId);
  if (pTask == NULL) {
    return -1;
  }

  // check param
  int64_t fillVer1 = pTask->chkInfo.version;
  if (fillVer1 <= 0) {
    streamMetaReleaseTask(pMeta, pTask);
    return -1;
  }

  // do recovery step 1
  tqDebug("s-task:%s start non-blocking recover stage(step 1) scan", pTask->id.idStr);
  int64_t st = taosGetTimestampMs();

  streamSourceRecoverScanStep1(pTask);
  if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING) {
    tqDebug("s-task:%s is dropped, abort recover in step1", pTask->id.idStr);
    streamMetaReleaseTask(pMeta, pTask);
    return 0;
  }

  double el = (taosGetTimestampMs() - st) / 1000.0;
  tqDebug("s-task:%s non-blocking recover stage(step 1) ended, elapsed time:%.2fs", pTask->id.idStr, el);

  // build msg to launch next step
  SStreamRecoverStep2Req req;
  int32_t                code = streamBuildSourceRecover2Req(pTask, &req);
  if (code < 0) {
    streamMetaReleaseTask(pMeta, pTask);
    return -1;
  }

  // re-check for a concurrent drop while the reference is still held
  if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING) {
    streamMetaReleaseTask(pMeta, pTask);
    return 0;
  }

  // serialize msg: copy exactly the step-2 request that was built above
  int32_t len = sizeof(SStreamRecoverStep2Req);

  void* serializedReq = rpcMallocCont(len);
  if (serializedReq == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    tqError("s-task:%s failed to prepare the step2 stage, out of memory", pTask->id.idStr);
    streamMetaReleaseTask(pMeta, pTask);
    return -1;
  }

  memcpy(serializedReq, &req, len);

  // dispatch msg
  tqDebug("s-task:%s step 1 finished, send msg to start blocking recover stage(step 2)", pTask->id.idStr);

  // last access to pTask is above; the reference can be dropped now
  streamMetaReleaseTask(pMeta, pTask);

  SRpcMsg rpcMsg = {
      .code = 0, .contLen = len, .msgType = TDMT_VND_STREAM_RECOVER_BLOCKING_STAGE, .pCont = serializedReq};
  tmsgPutToQueue(&pTq->pVnode->msgCb, WRITE_QUEUE, &rpcMsg);
  return 0;
}

1043
/*
 * Run the blocking second stage of a source task's history recovery.
 *
 * Sequence: scan step 2 at sversion, restore the task parameters, mark the task normal, and
 * notify the downstream tasks that recovery finished. It then clears fillHistory and saves
 * the task to the stream meta.
 *
 * All exits after a successful acquire share one release point.
 *
 * @return 0 on success or when the task is being dropped; -1 if the task is missing or any
 *         step fails
 */
int32_t tqProcessTaskRecover2Req(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  SStreamRecoverStep2Req* pReq = (SStreamRecoverStep2Req*)msg;
  SStreamMeta*            pMeta = pTq->pStreamMeta;

  SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->taskId);
  if (pTask == NULL) {
    return -1;
  }

  int32_t ret = -1;

  // recovery step 2 (blocking)
  int64_t st = taosGetTimestampMs();
  tqDebug("s-task:%s start step2 recover, ts:%" PRId64, pTask->id.idStr, st);

  if (streamSourceRecoverScanStep2(pTask, sversion) < 0) {
    goto end;
  }

  if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING) {
    ret = 0;
    goto end;
  }

  if (streamRestoreParam(pTask) < 0) {
    goto end;
  }

  tqDebug("s-task:%s blocking stage completed, set the status to be normal", pTask->id.idStr);
  if (streamSetStatusNormal(pTask) < 0) {
    goto end;
  }

  double el = (taosGetTimestampMs() - st) / 1000.0;
  tqDebug("s-task:%s step2 recover finished, el:%.2fs", pTask->id.idStr, el);

  // tell every related downstream task that recovery is over
  if (streamDispatchRecoverFinishReq(pTask) < 0) {
    goto end;
  }

  atomic_store_8(&pTask->fillHistory, 0);
  streamMetaSaveTask(pMeta, pTask);
  ret = 0;

end:
  streamMetaReleaseTask(pMeta, pTask);
  return ret;
}

L
Liu Jicong 已提交
1100 1101 1102
/*
 * Handle an upstream child's notification that its history recovery has finished.
 *
 * The payload follows an SMsgHead. The decoded SStreamRecoverFinishReq selects the local
 * task, and its childId is passed to streamProcessRecoverFinishReq().
 *
 * @return 0 on success; -1 if the request cannot be decoded (terrno is set), the task is
 *         not found, or processing fails
 */
int32_t tqProcessTaskRecoverFinishReq(STQ* pTq, SRpcMsg* pMsg) {
  char*   msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);

  // deserialize; a failed decode leaves req undefined, so it must not be used
  SStreamRecoverFinishReq req;
  SDecoder                decoder;
  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
  int32_t code = tDecodeSStreamRecoverFinishReq(&decoder, &req);
  tDecoderClear(&decoder);
  if (code < 0) {
    terrno = TSDB_CODE_MSG_DECODE_ERROR;
    return -1;
  }

  // find task
  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.taskId);
  if (pTask == NULL) {
    return -1;
  }

  // do process request
  if (streamProcessRecoverFinishReq(pTask, req.childId) < 0) {
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
    return -1;
  }

  streamMetaReleaseTask(pTq->pStreamMeta, pTask);
  return 0;
}
L
Liu Jicong 已提交
1126

L
Liu Jicong 已提交
1127 1128 1129 1130 1131
/*
 * Response to a recover-finish request: no action is taken on receipt.
 *
 * @return always 0
 */
int32_t tqProcessTaskRecoverFinishRsp(STQ* pTq, SRpcMsg* pMsg) {
  (void)pTq;
  (void)pMsg;
  return 0;
}

L
Liu Jicong 已提交
1132 1133 1134 1135 1136 1137 1138 1139 1140 1141 1142 1143 1144 1145 1146 1147
/*
 * Forward a delete to every source-level stream task.
 *
 * How the block is built:
 * - pReq is decoded as an SDeleteRes.
 * - A STREAM_DELETE_DATA block is created with one row per deleted uid.
 * - Every row carries the request's skey/ekey.
 * - The group id and calculate-range columns are NULL.
 *
 * The block is shared, not copied. Each source task receives an SStreamRefDataBlock pointing
 * at it, and a heap-allocated counter tracks the references. This function holds one reference
 * and drops it at the end; whoever drops the last reference frees the block and the counter.
 *
 * @param ver version recorded in the delete block
 * @return 0 when there is nothing to delete or the block was fanned out; -1 on allocation
 *         failure (terrno set where known)
 */
int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) {
  bool        failed = false;
  SDecoder*   pCoder = &(SDecoder){0};
  SDeleteRes* pRes = &(SDeleteRes){0};

  pRes->uidList = taosArrayInit(0, sizeof(tb_uid_t));
  if (pRes->uidList == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    failed = true;
  }

  tDecoderInit(pCoder, pReq, len);
  tDecodeDeleteRes(pCoder, pRes);
  tDecoderClear(pCoder);

  int32_t sz = taosArrayGetSize(pRes->uidList);
  if (sz == 0 || pRes->affectedRows == 0) {
    taosArrayDestroy(pRes->uidList);
    return 0;
  }

  SSDataBlock* pDelBlock = createSpecialDataBlock(STREAM_DELETE_DATA);
  if (pDelBlock == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    taosArrayDestroy(pRes->uidList);
    return -1;
  }

  if (blockDataEnsureCapacity(pDelBlock, sz) != TSDB_CODE_SUCCESS) {
    blockDataDestroy(pDelBlock);
    taosArrayDestroy(pRes->uidList);
    return -1;
  }
  pDelBlock->info.rows = sz;
  pDelBlock->info.version = ver;

  // the column handles do not change per row, so look them up once
  SColumnInfoData* pStartCol = taosArrayGet(pDelBlock->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pEndCol = taosArrayGet(pDelBlock->pDataBlock, END_TS_COLUMN_INDEX);
  SColumnInfoData* pUidCol = taosArrayGet(pDelBlock->pDataBlock, UID_COLUMN_INDEX);
  SColumnInfoData* pGroupCol = taosArrayGet(pDelBlock->pDataBlock, GROUPID_COLUMN_INDEX);
  SColumnInfoData* pCalStartCol = taosArrayGet(pDelBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
  SColumnInfoData* pCalEndCol = taosArrayGet(pDelBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);

  for (int32_t i = 0; i < sz; i++) {
    colDataSetVal(pStartCol, i, (const char*)&pRes->skey, false);
    colDataSetVal(pEndCol, i, (const char*)&pRes->ekey, false);
    colDataSetVal(pUidCol, i, (const char*)taosArrayGet(pRes->uidList, i), false);

    colDataSetNULL(pGroupCol, i);
    colDataSetNULL(pCalStartCol, i);
    colDataSetNULL(pCalEndCol, i);
  }

  taosArrayDestroy(pRes->uidList);

  // shared reference counter for pDelBlock; starts with this function's own reference
  int32_t* pRef = taosMemoryMalloc(sizeof(int32_t));
  if (pRef == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    blockDataDestroy(pDelBlock);
    return -1;
  }
  *pRef = 1;

  taosWLockLatch(&pTq->pStreamMeta->lock);

  void* pIter = NULL;
  while (1) {
    pIter = taosHashIterate(pTq->pStreamMeta->pTasks, pIter);
    if (pIter == NULL) {
      break;
    }

    SStreamTask* pTask = *(SStreamTask**)pIter;
    if (pTask->taskLevel != TASK_LEVEL__SOURCE) {
      continue;
    }

    qDebug("s-task:%s delete req enqueue, ver: %" PRId64, pTask->id.idStr, ver);

    if (failed) {
      streamTaskInputFail(pTask);
      continue;
    }

    SStreamRefDataBlock* pRefBlock = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0);
    if (pRefBlock == NULL) {
      qError("s-task:%s failed to alloc delete req item, out of memory", pTask->id.idStr);
      streamTaskInputFail(pTask);
      continue;
    }

    pRefBlock->type = STREAM_INPUT__REF_DATA_BLOCK;
    pRefBlock->pBlock = pDelBlock;
    pRefBlock->dataRef = pRef;
    atomic_add_fetch_32(pRefBlock->dataRef, 1);

    if (tAppendDataToInputQueue(pTask, (SStreamQueueItem*)pRefBlock) < 0) {
      // the item never reached the queue: give back the reference it took
      atomic_sub_fetch_32(pRef, 1);
      taosFreeQitem(pRefBlock);
      continue;
    }

    if (streamSchedExec(pTask) < 0) {
      qError("s-task:%s stream task launch failed", pTask->id.idStr);
      continue;
    }
  }

  taosWUnLockLatch(&pTq->pStreamMeta->lock);

  // drop this function's own reference; free the block if no task still holds it
  int32_t ref = atomic_sub_fetch_32(pRef, 1);
  if (ref == 0) {
    blockDataDestroy(pDelBlock);
    taosMemoryFree(pRef);
  }

  return 0;
}

1253 1254 1255 1256
/*
 * Wake up every consumer parked in the push manager now that new data was submitted.
 *
 * Under pTq->lock, each handle's saved consume request is re-posted to QUERY_QUEUE as a
 * TDMT_VND_TMQ_CONSUME message. The saved SRpcMsg wrapper is then freed, and finally the
 * push table is cleared.
 *
 * @return always 0
 */
int32_t tqProcessSubmitReqForSubscribe(STQ* pTq) {
  int32_t vgId = TD_VID(pTq->pVnode);

  taosWLockLatch(&pTq->lock);

  if (taosHashGetSize(pTq->pPushMgr) > 0) {
    void* pIter = taosHashIterate(pTq->pPushMgr, NULL);

    while (pIter) {
      STqHandle* pHandle = *(STqHandle**)pIter;
      tqDebug("vgId:%d start set submit for pHandle:%p, consumer:0x%" PRIx64, vgId, pHandle, pHandle->consumerId);

      // the branch is taken when the assertion fails (i.e. msg is NULL). Keep iterating rather
      // than breaking out: a break would abandon the iterator and, since the table is cleared
      // below, silently drop the parked requests of all remaining handles.
      if (ASSERT(pHandle->msg != NULL)) {
        tqError("pHandle->msg should not be null");
      } else {
        SRpcMsg msg = {.msgType = TDMT_VND_TMQ_CONSUME,
                       .pCont = pHandle->msg->pCont,
                       .contLen = pHandle->msg->contLen,
                       .info = pHandle->msg->info};
        tmsgPutToQueue(&pTq->pVnode->msgCb, QUERY_QUEUE, &msg);
        taosMemoryFree(pHandle->msg);
        pHandle->msg = NULL;
      }

      pIter = taosHashIterate(pTq->pPushMgr, pIter);
    }

    taosHashClear(pTq->pPushMgr);
  }

  // unlock
  taosWUnLockLatch(&pTq->lock);
  return 0;
}

L
Liu Jicong 已提交
1286 1287
/*
 * Execute a run request for a stream task.
 *
 * Behaviour by task id:
 * - WAL_READ_TASKS_ID: trigger a WAL scan on behalf of all tasks.
 * - Otherwise: run the named task if its status is TASK_STATUS__NORMAL (else only log),
 *   release it, and call tqStartStreamTasks().
 *
 * @return 0 when handled; -1 when the task id is unknown
 */
int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
  SStreamTaskRunReq* pReq = pMsg->pCont;
  int32_t            taskId = pReq->taskId;
  int32_t            vgId = TD_VID(pTq->pVnode);

  // sentinel id: data for all tasks is extracted from the wal
  if (taskId == WAL_READ_TASKS_ID) {
    tqStreamTasksScanWal(pTq);
    return 0;
  }

  SStreamMeta* pMeta = pTq->pStreamMeta;
  SStreamTask* pTask = streamMetaAcquireTask(pMeta, taskId);
  if (pTask == NULL) {
    tqError("vgId:%d failed to found s-task, taskId:%d", vgId, taskId);
    return -1;
  }

  if (pTask->status.taskStatus != TASK_STATUS__NORMAL) {
    tqDebug("vgId:%d s-task:%s ignore run req since not in ready state", vgId, pTask->id.idStr);
  } else {
    tqDebug("vgId:%d s-task:%s start to process block from wal, last chk point:%" PRId64, vgId,
            pTask->id.idStr, pTask->chkInfo.version);
    streamProcessRunReq(pTask);
  }

  streamMetaReleaseTask(pMeta, pTask);
  tqStartStreamTasks(pTq);
  return 0;
}

L
Liu Jicong 已提交
1316
/*
 * Deliver data dispatched by an upstream task to the local target task.
 *
 * The payload follows an SMsgHead. If the target task exists, the request and a response
 * bound to pMsg->info are handed to streamProcessDispatchReq(); the `exec` flag is forwarded
 * unchanged.
 *
 * @return 0 when the request was handed to the task; -1 if it cannot be decoded (terrno set)
 *         or the task does not exist. In both -1 cases the decoded request is released here.
 */
int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec) {
  char*              msgStr = pMsg->pCont;
  char*              msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
  int32_t            msgLen = pMsg->contLen - sizeof(SMsgHead);
  SStreamDispatchReq req = {0};  // zeroed so a partial decode can be released safely
  SDecoder           decoder;

  tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
  int32_t code = tDecodeStreamDispatchReq(&decoder, &req);
  // release decoder resources, as vnodeEnqueueStreamMsg does right after decoding
  tDecoderClear(&decoder);
  if (code < 0) {
    tDeleteStreamDispatchReq(&req);
    terrno = TSDB_CODE_MSG_DECODE_ERROR;
    return -1;
  }

  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.taskId);
  if (pTask) {
    SRpcMsg rsp = {.info = pMsg->info, .code = 0};
    streamProcessDispatchReq(pTask, &req, &rsp, exec);
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
    return 0;
  } else {
    tDeleteStreamDispatchReq(&req);
    return -1;
  }
}

L
Liu Jicong 已提交
1337 1338
/*
 * Route a downstream task's dispatch response back to the upstream task that sent the data.
 *
 * The upstream task id is carried in network byte order after the SMsgHead. pMsg->code is
 * forwarded to streamProcessDispatchRsp().
 *
 * @return 0 when the upstream task was found; -1 otherwise
 */
int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) {
  SStreamDispatchRsp* pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  SStreamMeta*        pMeta = pTq->pStreamMeta;
  SStreamTask*        pUpstream = streamMetaAcquireTask(pMeta, ntohl(pRsp->upstreamTaskId));

  tqDebug("recv dispatch rsp, code:%x", pMsg->code);
  if (pUpstream == NULL) {
    return -1;
  }

  streamProcessDispatchRsp(pUpstream, pRsp, pMsg->code);
  streamMetaReleaseTask(pMeta, pUpstream);
  return 0;
}
L
Liu Jicong 已提交
1350

1351
/*
 * Remove the stream task named by an SVDropStreamTaskReq from the stream meta.
 *
 * @return always 0
 */
int32_t tqProcessTaskDropReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  const SVDropStreamTaskReq* pDropReq = (const SVDropStreamTaskReq*)msg;
  int32_t                    taskId = pDropReq->taskId;

  streamMetaRemoveTask(pTq->pStreamMeta, taskId);
  return 0;
}
L
Liu Jicong 已提交
1356

5
54liuyao 已提交
1357 1358
/*
 * Pause a stream task.
 *
 * The task's current status is saved into status.keepTaskStatus, then the status is switched
 * to TASK_STATUS__PAUSE. An unknown task id is silently ignored.
 *
 * @return always 0
 */
int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  SVPauseStreamTaskReq* pReq = (SVPauseStreamTaskReq*)msg;
  SStreamMeta*          pMeta = pTq->pStreamMeta;

  SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->taskId);
  if (pTask == NULL) {
    return 0;
  }

  tqDebug("vgId:%d s-task:%s set pause flag", pMeta->vgId, pTask->id.idStr);
  atomic_store_8(&pTask->status.keepTaskStatus, pTask->status.taskStatus);  // remembered for resume
  atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__PAUSE);
  streamMetaReleaseTask(pMeta, pTask);
  return 0;
}

/*
 * Resume a paused stream task.
 *
 * The status saved in status.keepTaskStatus is restored. When igUntreated is set, the data
 * written while the task was paused is skipped by moving chkInfo.currentVer to sversion;
 * otherwise the task continues from its paused version. Stream tasks are then (re)started.
 * An unknown task id is silently ignored.
 *
 * @return always 0
 */
int32_t tqProcessTaskResumeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  SVResumeStreamTaskReq* pReq = (SVResumeStreamTaskReq*)msg;
  SStreamMeta*           pMeta = pTq->pStreamMeta;

  SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->taskId);
  if (pTask == NULL) {
    return 0;
  }

  atomic_store_8(&pTask->status.taskStatus, pTask->status.keepTaskStatus);

  // the version is updated without a lock, as in the original design
  if (!pReq->igUntreated) {
    tqDebug("vgId:%d s-task:%s resume to normal from paused ver:%" PRId64 ", vnode ver:%" PRId64, pMeta->vgId,
            pTask->id.idStr, pTask->chkInfo.currentVer, sversion);
  } else {
    pTask->chkInfo.currentVer = sversion;
    tqDebug("vgId:%d s-task:%s resume to normal from the latest version:%" PRId64 ", vnode ver:%" PRId64, pMeta->vgId,
            pTask->id.idStr, pTask->chkInfo.currentVer, sversion);
  }

  streamMetaReleaseTask(pMeta, pTask);
  tqStartStreamTasks(pTq);
  return 0;
}

L
Liu Jicong 已提交
1392 1393 1394 1395 1396 1397
/*
 * Deliver a retrieve request to its destination task (req.dstTaskId).
 *
 * The payload follows an SMsgHead. The decoded request is always released here, whether
 * or not the task exists.
 *
 * @return 0 when the task processed the request; -1 if decoding fails (terrno set) or the
 *         task does not exist
 */
int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg) {
  char*              msgStr = pMsg->pCont;
  char*              msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
  int32_t            msgLen = pMsg->contLen - sizeof(SMsgHead);
  SStreamRetrieveReq req = {0};  // zeroed so tDeleteStreamRetrieveReq is safe after a failed decode
  SDecoder           decoder;

  tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
  int32_t code = tDecodeStreamRetrieveReq(&decoder, &req);
  tDecoderClear(&decoder);
  if (code < 0) {
    tDeleteStreamRetrieveReq(&req);
    terrno = TSDB_CODE_MSG_DECODE_ERROR;
    return -1;
  }

  int32_t      taskId = req.dstTaskId;
  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);
  if (pTask) {
    SRpcMsg rsp = {.info = pMsg->info, .code = 0};
    streamProcessRetrieveReq(pTask, &req, &rsp);
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
    tDeleteStreamRetrieveReq(&req);
    return 0;
  } else {
    tDeleteStreamRetrieveReq(&req);
    return -1;
  }
}

/*
 * Response to a retrieve request: no action is taken on receipt.
 *
 * @return always 0
 */
int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg) {
  (void)pTq;
  (void)pMsg;
  return 0;
}
L
Liu Jicong 已提交
1419

1420 1421 1422 1423 1424 1425
/*
 * Handle a dispatch message directly on arrival (with exec = false) instead of via the queue.
 *
 * If the target task exists, the request is handed to it. Otherwise, or if the message cannot
 * be decoded, an error response carrying an SStreamDispatchRsp is sent back when the message
 * has a reply handle.
 *
 * Every return path frees pMsg->pCont and the queue item pMsg itself, so the caller must not
 * touch pMsg afterwards.
 *
 * @return 0 when the task received the request; -1 otherwise
 */
int32_t vnodeEnqueueStreamMsg(SVnode* pVnode, SRpcMsg* pMsg) {
  STQ*      pTq = pVnode->pTq;
  SMsgHead* msgStr = pMsg->pCont;
  char*     msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
  int32_t   msgLen = pMsg->contLen - sizeof(SMsgHead);
  int32_t   code = 0;

  // zeroed: the error response below reads req's id fields even when decoding failed
  SStreamDispatchReq req = {0};
  SDecoder           decoder;
  tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
  if (tDecodeStreamDispatchReq(&decoder, &req) < 0) {
    code = TSDB_CODE_MSG_DECODE_ERROR;
    tDecoderClear(&decoder);
    tDeleteStreamDispatchReq(&req);  // release whatever was partially decoded
    goto FAIL;
  }
  tDecoderClear(&decoder);

  int32_t taskId = req.taskId;

  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);
  if (pTask) {
    SRpcMsg rsp = {.info = pMsg->info, .code = 0};
    streamProcessDispatchReq(pTask, &req, &rsp, false);
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
    rpcFreeCont(pMsg->pCont);
    taosFreeQitem(pMsg);
    return 0;
  } else {
    tDeleteStreamDispatchReq(&req);
  }

  code = TSDB_CODE_STREAM_TASK_NOT_EXIST;

FAIL:
  if (pMsg->info.handle == NULL) {
    // nobody to answer; still release the message, as every other exit path does
    rpcFreeCont(pMsg->pCont);
    taosFreeQitem(pMsg);
    return -1;
  }

  SMsgHead* pRspHead = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamDispatchRsp));
  if (pRspHead == NULL) {
    SRpcMsg rsp = {.code = TSDB_CODE_OUT_OF_MEMORY, .info = pMsg->info};
    tqDebug("send dispatch error rsp, code: %x", code);
    tmsgSendRsp(&rsp);
    rpcFreeCont(pMsg->pCont);
    taosFreeQitem(pMsg);
    return -1;
  }

  pRspHead->vgId = htonl(req.upstreamNodeId);
  SStreamDispatchRsp* pRsp = POINTER_SHIFT(pRspHead, sizeof(SMsgHead));
  pRsp->streamId = htobe64(req.streamId);
  pRsp->upstreamTaskId = htonl(req.upstreamTaskId);
  pRsp->upstreamNodeId = htonl(req.upstreamNodeId);
  pRsp->downstreamNodeId = htonl(pVnode->config.vgId);
  pRsp->downstreamTaskId = htonl(req.taskId);
  pRsp->inputStatus = TASK_OUTPUT_STATUS__NORMAL;

  SRpcMsg rsp = {
      .code = code, .info = pMsg->info, .contLen = sizeof(SMsgHead) + sizeof(SStreamDispatchRsp), .pCont = pRspHead};
  tqDebug("send dispatch error rsp, code: %x", code);
  tmsgSendRsp(&rsp);
  rpcFreeCont(pMsg->pCont);
  taosFreeQitem(pMsg);
  return -1;
}
L
Liu Jicong 已提交
1483

1484
/*
 * Check whether sversion is covered by the WAL.
 *
 * @return 1 when sversion is not newer than pTq->walLogLastVer, else 0
 */
int32_t tqCheckLogInWal(STQ* pTq, int64_t sversion) {
  if (sversion > pTq->walLogLastVer) {
    return 0;
  }
  return 1;
}
1485

1486
/*
 * Schedule a WAL scan that feeds all of this vnode's stream tasks.
 *
 * Under the stream-meta write latch, pMeta->walScanCounter is incremented. Only when it
 * becomes 1 is a TDMT_STREAM_TASK_RUN message (taskId = WAL_READ_TASKS_ID) put on
 * STREAM_QUEUE. A larger value means a scan was already launched, so nothing more is
 * scheduled. Nothing happens when there are no tasks.
 *
 * @return 0 on success or when nothing needs scheduling; -1 if the run message cannot be
 *         allocated (terrno = TSDB_CODE_OUT_OF_MEMORY)
 */
int32_t tqStartStreamTasks(STQ* pTq) {
  int32_t      vgId = TD_VID(pTq->pVnode);
  SStreamMeta* pMeta = pTq->pStreamMeta;

  taosWLockLatch(&pMeta->lock);

  int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
  if (numOfTasks == 0) {
    tqInfo("vgId:%d no stream tasks exist", vgId);
    taosWUnLockLatch(&pMeta->lock);
    return 0;
  }

  pMeta->walScanCounter += 1;
  bool alreadyLaunched = (pMeta->walScanCounter > 1);
  if (alreadyLaunched) {
    tqDebug("vgId:%d wal read task has been launched, remain scan times:%d", vgId, pMeta->walScanCounter);
    taosWUnLockLatch(&pMeta->lock);
    return 0;
  }

  SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq));
  if (pRunReq == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    tqError("vgId:%d failed to create msg to start wal scanning to launch stream tasks, code:%s", vgId, terrstr());
    taosWUnLockLatch(&pMeta->lock);
    return -1;
  }

  tqDebug("vgId:%d create msg to start wal scan to launch stream tasks, numOfTasks:%d", vgId, numOfTasks);
  pRunReq->head.vgId = vgId;
  pRunReq->streamId = 0;
  pRunReq->taskId = WAL_READ_TASKS_ID;  // sentinel: scan on behalf of all tasks

  SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)};
  tmsgPutToQueue(&pTq->pVnode->msgCb, STREAM_QUEUE, &msg);

  taosWUnLockLatch(&pMeta->lock);
  return 0;
}