tq.c 48.1 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
S
Shengliang Guan 已提交
14 15
 */

H
Hongze Cheng 已提交
16
#include "tq.h"
S
Shengliang Guan 已提交
17

dengyihao's avatar
dengyihao 已提交
18 19 20
// 0: not init
// 1: already inited
// 2: wait to be inited or cleaup
21
#define WAL_READ_TASKS_ID (-1)
22

23
static int32_t tqInitialize(STQ* pTq);
dengyihao's avatar
dengyihao 已提交
24

wmmhello's avatar
wmmhello 已提交
25 26 27
static FORCE_INLINE bool tqIsHandleExec(STqHandle* pHandle) { return TMQ_HANDLE_STATUS_EXEC == pHandle->status; }
static FORCE_INLINE void tqSetHandleExec(STqHandle* pHandle) {pHandle->status = TMQ_HANDLE_STATUS_EXEC;}
static FORCE_INLINE void tqSetHandleIdle(STqHandle* pHandle) {pHandle->status = TMQ_HANDLE_STATUS_IDLE;}
wmmhello's avatar
wmmhello 已提交
28

L
Liu Jicong 已提交
29
int32_t tqInit() {
L
Liu Jicong 已提交
30 31 32 33 34 35
  int8_t old;
  while (1) {
    old = atomic_val_compare_exchange_8(&tqMgmt.inited, 0, 2);
    if (old != 2) break;
  }

36 37 38 39 40 41
  if (old == 0) {
    tqMgmt.timer = taosTmrInit(10000, 100, 10000, "TQ");
    if (tqMgmt.timer == NULL) {
      atomic_store_8(&tqMgmt.inited, 0);
      return -1;
    }
42 43 44
    if (streamInit() < 0) {
      return -1;
    }
L
Liu Jicong 已提交
45
    atomic_store_8(&tqMgmt.inited, 1);
46
  }
47

L
Liu Jicong 已提交
48 49
  return 0;
}
L
Liu Jicong 已提交
50

51
void tqCleanUp() {
L
Liu Jicong 已提交
52 53 54 55 56 57 58 59
  int8_t old;
  while (1) {
    old = atomic_val_compare_exchange_8(&tqMgmt.inited, 1, 2);
    if (old != 2) break;
  }

  if (old == 1) {
    taosTmrCleanUp(tqMgmt.timer);
L
Liu Jicong 已提交
60
    streamCleanUp();
L
Liu Jicong 已提交
61 62
    atomic_store_8(&tqMgmt.inited, 0);
  }
63
}
L
Liu Jicong 已提交
64

65
static void destroyTqHandle(void* data) {
66 67
  STqHandle* pData = (STqHandle*)data;
  qDestroyTask(pData->execHandle.task);
wmmhello's avatar
wmmhello 已提交
68

69
  if (pData->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
L
Liu Jicong 已提交
70
    taosMemoryFreeClear(pData->execHandle.execCol.qmsg);
71
  } else if (pData->execHandle.subType == TOPIC_SUB_TYPE__DB) {
72
    tqReaderClose(pData->execHandle.pTqReader);
73 74
    walCloseReader(pData->pWalReader);
    taosHashCleanup(pData->execHandle.execDb.pFilterOutTbUid);
L
Liu Jicong 已提交
75
  } else if (pData->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
76
    walCloseReader(pData->pWalReader);
77
    tqReaderClose(pData->execHandle.pTqReader);
78
  }
79 80 81 82
  if(pData->msg != NULL) {
    rpcFreeCont(pData->msg->pCont);
    taosMemoryFree(pData->msg);
    pData->msg = NULL;
D
dapan1121 已提交
83
  }
L
Liu Jicong 已提交
84 85
}

86 87 88 89 90
static bool tqOffsetLessOrEqual(const STqOffset* pLeft, const STqOffset* pRight) {
  return pLeft->val.type == TMQ_OFFSET__LOG && pRight->val.type == TMQ_OFFSET__LOG &&
         pLeft->val.version <= pRight->val.version;
}

L
Liu Jicong 已提交
91
STQ* tqOpen(const char* path, SVnode* pVnode) {
92
  STQ* pTq = taosMemoryCalloc(1, sizeof(STQ));
L
Liu Jicong 已提交
93
  if (pTq == NULL) {
S
Shengliang Guan 已提交
94
    terrno = TSDB_CODE_OUT_OF_MEMORY;
L
Liu Jicong 已提交
95 96
    return NULL;
  }
97

98
  pTq->path = taosStrdup(path);
L
Liu Jicong 已提交
99
  pTq->pVnode = pVnode;
L
Liu Jicong 已提交
100
  pTq->walLogLastVer = pVnode->pWal->vers.lastVer;
101

102
  pTq->pHandle = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);
103
  taosHashSetFreeFp(pTq->pHandle, destroyTqHandle);
104

105
  taosInitRWLatch(&pTq->lock);
106
  pTq->pPushMgr = taosHashInit(64, MurmurHash3_32, false, HASH_NO_LOCK);
L
Liu Jicong 已提交
107

108
  pTq->pCheckInfo = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);
L
Liu Jicong 已提交
109
  taosHashSetFreeFp(pTq->pCheckInfo, (FDelete)tDeleteSTqCheckInfo);
L
Liu Jicong 已提交
110

111 112 113 114 115 116 117
  int32_t code = tqInitialize(pTq);
  if (code != TSDB_CODE_SUCCESS) {
    tqClose(pTq);
    return NULL;
  } else {
    return pTq;
  }
118 119 120
}

int32_t tqInitialize(STQ* pTq) {
L
Liu Jicong 已提交
121
  if (tqMetaOpen(pTq) < 0) {
122
    return -1;
123 124
  }

L
Liu Jicong 已提交
125 126
  pTq->pOffsetStore = tqOffsetOpen(pTq);
  if (pTq->pOffsetStore == NULL) {
127
    return -1;
128 129
  }

130
  pTq->pStreamMeta = streamMetaOpen(pTq->path, pTq, (FTaskExpand*)tqExpandTask, pTq->pVnode->config.vgId);
L
Liu Jicong 已提交
131
  if (pTq->pStreamMeta == NULL) {
132
    return -1;
L
Liu Jicong 已提交
133 134
  }

135 136
  // the version is kept in task's meta data
  // todo check if this version is required or not
137 138
  if (streamLoadTasks(pTq->pStreamMeta, walGetCommittedVer(pTq->pVnode->pWal)) < 0) {
    return -1;
L
Liu Jicong 已提交
139 140
  }

141
  return 0;
L
Liu Jicong 已提交
142
}
L
Liu Jicong 已提交
143

L
Liu Jicong 已提交
144
void tqClose(STQ* pTq) {
145 146
  if (pTq == NULL) {
    return;
H
Hongze Cheng 已提交
147
  }
148 149 150 151 152 153 154 155 156

  tqOffsetClose(pTq->pOffsetStore);
  taosHashCleanup(pTq->pHandle);
  taosHashCleanup(pTq->pPushMgr);
  taosHashCleanup(pTq->pCheckInfo);
  taosMemoryFree(pTq->path);
  tqMetaClose(pTq);
  streamMetaClose(pTq->pStreamMeta);
  taosMemoryFree(pTq);
L
Liu Jicong 已提交
157
}
L
Liu Jicong 已提交
158

H
Haojun Liao 已提交
159 160 161 162 163 164 165 166 167 168 169 170 171
void tqNotifyClose(STQ* pTq) {
  if (pTq != NULL) {
    taosWLockLatch(&pTq->pStreamMeta->lock);

    void* pIter = NULL;
    while (1) {
      pIter = taosHashIterate(pTq->pStreamMeta->pTasks, pIter);
      if (pIter == NULL) {
        break;
      }

      SStreamTask* pTask = *(SStreamTask**)pIter;
      tqDebug("vgId:%d s-task:%s set dropping flag", pTq->pStreamMeta->vgId, pTask->id.idStr);
172 173 174
      pTask->status.taskStatus = TASK_STATUS__STOP;

      int64_t st = taosGetTimestampMs();
H
Haojun Liao 已提交
175
      qKillTask(pTask->exec.pExecutor, TSDB_CODE_SUCCESS);
176
      int64_t el = taosGetTimestampMs() - st;
H
Haojun Liao 已提交
177
      tqDebug("vgId:%d s-task:%s is closed in %" PRId64 " ms", pTq->pStreamMeta->vgId, pTask->id.idStr, el);
H
Haojun Liao 已提交
178 179 180 181 182 183
    }

    taosWUnLockLatch(&pTq->pStreamMeta->lock);
  }
}

D
dapan1121 已提交
184 185
static int32_t doSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* pRsp, int32_t epoch,
                             int64_t consumerId, int32_t type) {
L
Liu Jicong 已提交
186 187
  int32_t len = 0;
  int32_t code = 0;
D
dapan1121 已提交
188 189

  if (type == TMQ_MSG_TYPE__POLL_RSP) {
H
Haojun Liao 已提交
190
    tEncodeSize(tEncodeMqDataRsp, pRsp, len, code);
D
dapan1121 已提交
191 192 193
  } else if (type == TMQ_MSG_TYPE__TAOSX_RSP) {
    tEncodeSize(tEncodeSTaosxRsp, (STaosxRsp*)pRsp, len, code);
  }
L
Liu Jicong 已提交
194 195 196 197 198 199 200 201 202 203 204

  if (code < 0) {
    return -1;
  }

  int32_t tlen = sizeof(SMqRspHead) + len;
  void*   buf = rpcMallocCont(tlen);
  if (buf == NULL) {
    return -1;
  }

D
dapan1121 已提交
205 206 207
  ((SMqRspHead*)buf)->mqMsgType = type;
  ((SMqRspHead*)buf)->epoch = epoch;
  ((SMqRspHead*)buf)->consumerId = consumerId;
L
Liu Jicong 已提交
208 209 210 211 212 213

  void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));

  SEncoder encoder = {0};
  tEncoderInit(&encoder, abuf, len);

D
dapan1121 已提交
214
  if (type == TMQ_MSG_TYPE__POLL_RSP) {
H
Haojun Liao 已提交
215
    tEncodeMqDataRsp(&encoder, pRsp);
D
dapan1121 已提交
216
  } else if (type == TMQ_MSG_TYPE__TAOSX_RSP) {
X
Xiaoyu Wang 已提交
217
    tEncodeSTaosxRsp(&encoder, (STaosxRsp*)pRsp);
dengyihao's avatar
dengyihao 已提交
218
  }
L
Liu Jicong 已提交
219

wmmhello's avatar
wmmhello 已提交
220
  tEncoderClear(&encoder);
L
Liu Jicong 已提交
221 222

  SRpcMsg rsp = {
D
dapan1121 已提交
223
      .info = *pRpcHandleInfo,
L
Liu Jicong 已提交
224 225 226 227
      .pCont = buf,
      .contLen = tlen,
      .code = 0,
  };
L
Liu Jicong 已提交
228

L
Liu Jicong 已提交
229
  tmsgSendRsp(&rsp);
L
Liu Jicong 已提交
230 231
  return 0;
}
L
Liu Jicong 已提交
232

H
Haojun Liao 已提交
233
int32_t tqPushDataRsp(STqHandle* pHandle, int32_t vgId) {
234 235 236 237
  SMqDataRsp dataRsp = {0};
  dataRsp.head.consumerId = pHandle->consumerId;
  dataRsp.head.epoch = pHandle->epoch;
  dataRsp.head.mqMsgType = TMQ_MSG_TYPE__POLL_RSP;
238 239

  int64_t sver = 0, ever = 0;
H
Haojun Liao 已提交
240 241
  walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever);
  tqDoSendDataRsp(&pHandle->msg->info, &dataRsp, pHandle->epoch, pHandle->consumerId, TMQ_MSG_TYPE__POLL_RSP, sver, ever);
D
dapan1121 已提交
242 243 244

  char buf1[80] = {0};
  char buf2[80] = {0};
245 246
  tFormatOffset(buf1, tListLen(buf1), &dataRsp.reqOffset);
  tFormatOffset(buf2, tListLen(buf2), &dataRsp.rspOffset);
D
dapan1121 已提交
247
  tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) push rsp, block num: %d, req:%s, rsp:%s",
H
Haojun Liao 已提交
248
          vgId, dataRsp.head.consumerId, dataRsp.head.epoch, dataRsp.blockNum, buf1, buf2);
L
Liu Jicong 已提交
249 250 251
  return 0;
}

252 253 254 255 256 257
int32_t tqSendDataRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp,
                      int32_t type, int32_t vgId) {
  int64_t sver = 0, ever = 0;
  walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever);

  tqDoSendDataRsp(&pMsg->info, pRsp, pReq->epoch, pReq->consumerId, type, sver, ever);
258

D
dapan1121 已提交
259 260 261 262
  char buf1[80] = {0};
  char buf2[80] = {0};
  tFormatOffset(buf1, 80, &pRsp->reqOffset);
  tFormatOffset(buf2, 80, &pRsp->rspOffset);
263

X
Xiaoyu Wang 已提交
264
  tqDebug("vgId:%d consumer:0x%" PRIx64 " (epoch %d) send rsp, block num:%d, req:%s, rsp:%s, reqId:0x%" PRIx64,
265
          vgId, pReq->consumerId, pReq->epoch, pRsp->blockNum, buf1, buf2, pReq->reqId);
266 267 268 269

  return 0;
}

270
int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
271 272
  SMqVgOffset vgOffset = {0};
  int32_t     vgId = TD_VID(pTq->pVnode);
273

X
Xiaoyu Wang 已提交
274 275
  SDecoder decoder;
  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
276
  if (tDecodeMqVgOffset(&decoder, &vgOffset) < 0) {
277 278
    return -1;
  }
279

280 281
  tDecoderClear(&decoder);

282 283 284
  STqOffset* pOffset = &vgOffset.offset;

  if (pOffset->val.type == TMQ_OFFSET__SNAPSHOT_DATA || pOffset->val.type == TMQ_OFFSET__SNAPSHOT_META) {
L
Liu Jicong 已提交
285
    tqDebug("receive offset commit msg to %s on vgId:%d, offset(type:snapshot) uid:%" PRId64 ", ts:%" PRId64,
286 287 288 289 290 291
            pOffset->subKey, vgId, pOffset->val.uid, pOffset->val.ts);
  } else if (pOffset->val.type == TMQ_OFFSET__LOG) {
    tqDebug("receive offset commit msg to %s on vgId:%d, offset(type:log) version:%" PRId64, pOffset->subKey, vgId,
            pOffset->val.version);
    if (pOffset->val.version + 1 == sversion) {
      pOffset->val.version += 1;
292
    }
293
  } else {
294
    tqError("invalid commit offset type:%d", pOffset->val.type);
295
    return -1;
296
  }
297

298 299
  STqOffset* pSavedOffset = tqOffsetRead(pTq->pOffsetStore, pOffset->subKey);
  if (pSavedOffset != NULL && tqOffsetLessOrEqual(pOffset, pSavedOffset)) {
300
    tqDebug("not update the offset, vgId:%d sub:%s since committed:%" PRId64 " less than/equal to existed:%" PRId64,
301
            vgId, pOffset->subKey, pOffset->val.version, pSavedOffset->val.version);
302
    return 0;  // no need to update the offset value
303 304
  }

305
  // save the new offset value
306
  if (tqOffsetWrite(pTq->pOffsetStore, pOffset) < 0) {
307
    return -1;
308
  }
309

310 311 312
  return 0;
}

313
int32_t tqProcessSeekReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
314 315
  SMqVgOffset vgOffset = {0};
  int32_t     vgId = TD_VID(pTq->pVnode);
316 317 318

  SDecoder decoder;
  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
319
  if (tDecodeMqVgOffset(&decoder, &vgOffset) < 0) {
H
Haojun Liao 已提交
320
    tqError("vgId:%d failed to decode seek msg", vgId);
321 322 323 324 325
    return -1;
  }

  tDecoderClear(&decoder);

H
Haojun Liao 已提交
326 327 328
  tqDebug("topic:%s, vgId:%d process offset seek by consumer:0x%" PRIx64 ", req offset:%" PRId64,
          vgOffset.offset.subKey, vgId, vgOffset.consumerId, vgOffset.offset.val.version);

329 330 331
  STqOffset* pOffset = &vgOffset.offset;
  if (pOffset->val.type != TMQ_OFFSET__LOG) {
    tqError("vgId:%d, subKey:%s invalid seek offset type:%d", vgId, pOffset->subKey, pOffset->val.type);
332 333 334
    return -1;
  }

335 336 337 338 339 340
  STqHandle* pHandle = taosHashGet(pTq->pHandle, pOffset->subKey, strlen(pOffset->subKey));
  if (pHandle == NULL) {
    tqError("tmq seek: consumer:0x%" PRIx64 " vgId:%d subkey %s not found", vgOffset.consumerId, vgId,
        pOffset->subKey);
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
341 342
  }

343 344 345 346 347 348 349 350
  // 2. check consumer-vg assignment status
  taosRLockLatch(&pTq->lock);
  if (pHandle->consumerId != vgOffset.consumerId) {
    tqDebug("ERROR tmq seek: consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64,
            vgOffset.consumerId, vgId, pOffset->subKey, pHandle->consumerId);
    terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
    taosRUnLockLatch(&pTq->lock);
    return -1;
351
  }
352 353 354 355 356 357 358 359 360
  taosRUnLockLatch(&pTq->lock);

  //3. check the offset info
  STqOffset* pSavedOffset = tqOffsetRead(pTq->pOffsetStore, pOffset->subKey);
  if (pSavedOffset != NULL) {
    if (pSavedOffset->val.type != TMQ_OFFSET__LOG) {
      tqError("invalid saved offset type, vgId:%d sub:%s", vgId, pOffset->subKey);
      return 0;  // no need to update the offset value
    }
361

362 363 364 365 366 367
    if (pSavedOffset->val.version == pOffset->val.version) {
      tqDebug("vgId:%d subKey:%s no need to seek to %" PRId64 " prev offset:%" PRId64, vgId, pOffset->subKey,
              pOffset->val.version, pSavedOffset->val.version);
      return 0;
    }
  }
368 369 370

  int64_t sver = 0, ever = 0;
  walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever);
371 372 373 374
  if (pOffset->val.version < sver) {
    pOffset->val.version = sver;
  } else if (pOffset->val.version > ever) {
    pOffset->val.version = ever;
375 376 377
  }

  // save the new offset value
378 379 380 381 382 383
  if (pSavedOffset != NULL) {
    tqDebug("vgId:%d sub:%s seek to:%" PRId64 " prev offset:%" PRId64, vgId, pOffset->subKey, pOffset->val.version,
            pSavedOffset->val.version);
  } else {
    tqDebug("vgId:%d sub:%s seek to:%"PRId64" not saved yet", vgId, pOffset->subKey, pOffset->val.version);
  }
384

385 386
  if (tqOffsetWrite(pTq->pOffsetStore, pOffset) < 0) {
    tqError("failed to save offset, vgId:%d sub:%s seek to %" PRId64, vgId, pOffset->subKey, pOffset->val.version);
387 388 389
    return -1;
  }

H
Haojun Liao 已提交
390 391 392
  tqDebug("topic:%s, vgId:%d consumer:0x%" PRIx64 " offset is update to:%" PRId64, vgOffset.offset.subKey, vgId,
          vgOffset.consumerId, vgOffset.offset.val.version);

393 394 395
  return 0;
}

L
Liu Jicong 已提交
396
int32_t tqCheckColModifiable(STQ* pTq, int64_t tbUid, int32_t colId) {
L
Liu Jicong 已提交
397
  void* pIter = NULL;
398

L
Liu Jicong 已提交
399
  while (1) {
400
    pIter = taosHashIterate(pTq->pCheckInfo, pIter);
401 402 403 404
    if (pIter == NULL) {
      break;
    }

405
    STqCheckInfo* pCheck = (STqCheckInfo*)pIter;
406

L
Liu Jicong 已提交
407 408
    if (pCheck->ntbUid == tbUid) {
      int32_t sz = taosArrayGetSize(pCheck->colIdList);
L
Liu Jicong 已提交
409
      for (int32_t i = 0; i < sz; i++) {
L
Liu Jicong 已提交
410 411
        int16_t forbidColId = *(int16_t*)taosArrayGet(pCheck->colIdList, i);
        if (forbidColId == colId) {
412
          taosHashCancelIterate(pTq->pCheckInfo, pIter);
L
Liu Jicong 已提交
413 414 415 416 417
          return -1;
        }
      }
    }
  }
418

L
Liu Jicong 已提交
419 420 421
  return 0;
}

D
dapan1121 已提交
422
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
X
Xiaoyu Wang 已提交
423
  SMqPollReq req = {0};
D
dapan1121 已提交
424 425 426 427 428 429 430 431 432 433 434
  if (tDeserializeSMqPollReq(pMsg->pCont, pMsg->contLen, &req) < 0) {
    tqError("tDeserializeSMqPollReq %d failed", pMsg->contLen);
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  int64_t      consumerId = req.consumerId;
  int32_t      reqEpoch = req.epoch;
  STqOffsetVal reqOffset = req.reqOffset;
  int32_t      vgId = TD_VID(pTq->pVnode);

wmmhello's avatar
wmmhello 已提交
435
  taosWLockLatch(&pTq->lock);
D
dapan1121 已提交
436 437 438 439 440
  // 1. find handle
  STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
  if (pHandle == NULL) {
    tqError("tmq poll: consumer:0x%" PRIx64 " vgId:%d subkey %s not found", consumerId, vgId, req.subKey);
    terrno = TSDB_CODE_INVALID_MSG;
wmmhello's avatar
wmmhello 已提交
441
    taosWUnLockLatch(&pTq->lock);
D
dapan1121 已提交
442 443 444
    return -1;
  }

wmmhello's avatar
wmmhello 已提交
445 446 447 448 449
  while (tqIsHandleExec(pHandle)) {
    tqDebug("tmq poll: consumer:0x%" PRIx64 "vgId:%d, topic:%s, subscription is executing, wait for 5ms and retry", consumerId, vgId, req.subKey);
    taosMsleep(5);
  }

D
dapan1121 已提交
450 451 452
  // 2. check re-balance status
  if (pHandle->consumerId != consumerId) {
    tqDebug("ERROR tmq poll: consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64,
453
            consumerId, vgId, req.subKey, pHandle->consumerId);
D
dapan1121 已提交
454
    terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
455
    taosWUnLockLatch(&pTq->lock);
D
dapan1121 已提交
456 457
    return -1;
  }
458 459
  tqSetHandleExec(pHandle);
  taosWUnLockLatch(&pTq->lock);
D
dapan1121 已提交
460 461 462 463

  // 3. update the epoch value
  int32_t savedEpoch = pHandle->epoch;
  if (savedEpoch < reqEpoch) {
X
Xiaoyu Wang 已提交
464 465
    tqDebug("tmq poll: consumer:0x%" PRIx64 " epoch update from %d to %d by poll req", consumerId, savedEpoch,
            reqEpoch);
D
dapan1121 已提交
466 467 468 469 470 471 472 473
    pHandle->epoch = reqEpoch;
  }

  char buf[80];
  tFormatOffset(buf, 80, &reqOffset);
  tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey %s, recv poll req vgId:%d, req:%s, reqId:0x%" PRIx64,
          consumerId, req.epoch, pHandle->subKey, vgId, buf, req.reqId);

474 475 476
  int code = tqExtractDataForMq(pTq, pHandle, &req, pMsg);
  tqSetHandleIdle(pHandle);
  return code;
D
dapan1121 已提交
477 478
}

479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512
int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) {
  SMqPollReq req = {0};
  if (tDeserializeSMqPollReq(pMsg->pCont, pMsg->contLen, &req) < 0) {
    tqError("tDeserializeSMqPollReq %d failed", pMsg->contLen);
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  int64_t      consumerId = req.consumerId;
  STqOffsetVal reqOffset = req.reqOffset;
  int32_t      vgId = TD_VID(pTq->pVnode);

  // 1. find handle
  STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
  if (pHandle == NULL) {
    tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s not found", consumerId, vgId, req.subKey);
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  // 2. check re-balance status
  taosRLockLatch(&pTq->lock);
  if (pHandle->consumerId != consumerId) {
    tqDebug("ERROR consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64,
            consumerId, vgId, req.subKey, pHandle->consumerId);
    terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
    taosRUnLockLatch(&pTq->lock);
    return -1;
  }
  taosRUnLockLatch(&pTq->lock);

  int64_t sver = 0, ever = 0;
  walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever);

513 514
  int64_t currentVer = walReaderGetCurrentVer(pHandle->execHandle.pTqReader->pWalReader);

515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538
  SMqDataRsp dataRsp = {0};
  tqInitDataRsp(&dataRsp, &req);

  STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, req.subKey);
  if (pOffset != NULL) {
    if (pOffset->val.type != TMQ_OFFSET__LOG) {
      tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s use snapshot, no valid wal info", consumerId, vgId, req.subKey);
      terrno = TSDB_CODE_INVALID_PARA;
      tDeleteMqDataRsp(&dataRsp);
      return -1;
    }

    dataRsp.rspOffset.type = TMQ_OFFSET__LOG;
    dataRsp.rspOffset.version = pOffset->val.version;
  } else {
    if (req.useSnapshot == true) {
      tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s snapshot not support wal info", consumerId, vgId, req.subKey);
      terrno = TSDB_CODE_INVALID_PARA;
      tDeleteMqDataRsp(&dataRsp);
      return -1;
    }

    dataRsp.rspOffset.type = TMQ_OFFSET__LOG;

539 540 541 542
    if (reqOffset.type == TMQ_OFFSET__LOG) {
      dataRsp.rspOffset.version = currentVer; // return current consume offset value
    } else if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEAST) {
      dataRsp.rspOffset.version = sver;  // not consume yet, set the earliest position
543 544 545 546 547 548 549 550 551 552 553 554 555 556 557
    } else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) {
      dataRsp.rspOffset.version = ever;
    } else {
      tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s invalid offset type:%d", consumerId, vgId, req.subKey,
              reqOffset.type);
      terrno = TSDB_CODE_INVALID_PARA;
      tDeleteMqDataRsp(&dataRsp);
      return -1;
    }
  }

  tqDoSendDataRsp(&pMsg->info, &dataRsp, req.epoch, req.consumerId, TMQ_MSG_TYPE__WALINFO_RSP, sver, ever);
  return 0;
}

558
int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
L
Liu Jicong 已提交
559
  SMqVDeleteReq* pReq = (SMqVDeleteReq*)msg;
560
  int32_t vgId = TD_VID(pTq->pVnode);
L
Liu Jicong 已提交
561

562
  tqDebug("vgId:%d, tq process delete sub req %s", vgId, pReq->subKey);
wmmhello's avatar
wmmhello 已提交
563
  int32_t code = 0;
L
Liu Jicong 已提交
564

wmmhello's avatar
wmmhello 已提交
565
  taosWLockLatch(&pTq->lock);
L
Liu Jicong 已提交
566 567
  STqHandle* pHandle = taosHashGet(pTq->pHandle, pReq->subKey, strlen(pReq->subKey));
  if (pHandle) {
wmmhello's avatar
wmmhello 已提交
568
    while (tqIsHandleExec(pHandle)) {
569 570 571
      tqDebug("vgId:%d, topic:%s, subscription is executing, wait for 5ms and retry", vgId, pHandle->subKey);
      taosMsleep(5);
    }
572

L
Liu Jicong 已提交
573 574 575
    if (pHandle->pRef) {
      walCloseRef(pTq->pVnode->pWal, pHandle->pRef->refId);
    }
576

L
Liu Jicong 已提交
577 578 579 580
    code = taosHashRemove(pTq->pHandle, pReq->subKey, strlen(pReq->subKey));
    if (code != 0) {
      tqError("cannot process tq delete req %s, since no such handle", pReq->subKey);
    }
L
Liu Jicong 已提交
581
  }
582

L
Liu Jicong 已提交
583 584
  code = tqOffsetDelete(pTq->pOffsetStore, pReq->subKey);
  if (code != 0) {
585
    tqError("cannot process tq delete req %s, since no such offset in cache", pReq->subKey);
L
Liu Jicong 已提交
586
  }
L
Liu Jicong 已提交
587

L
Liu Jicong 已提交
588
  if (tqMetaDeleteHandle(pTq, pReq->subKey) < 0) {
L
Liu Jicong 已提交
589
    tqError("cannot process tq delete req %s, since no such offset in tdb", pReq->subKey);
590
  }
wmmhello's avatar
wmmhello 已提交
591 592
  taosWUnLockLatch(&pTq->lock);

L
Liu Jicong 已提交
593
  return 0;
L
Liu Jicong 已提交
594 595
}

596
int32_t tqProcessAddCheckInfoReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
597 598
  STqCheckInfo info = {0};
  SDecoder     decoder;
X
Xiaoyu Wang 已提交
599
  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
600
  if (tDecodeSTqCheckInfo(&decoder, &info) < 0) {
L
Liu Jicong 已提交
601 602 603 604
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  tDecoderClear(&decoder);
605 606 607 608 609
  if (taosHashPut(pTq->pCheckInfo, info.topic, strlen(info.topic), &info, sizeof(STqCheckInfo)) < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  if (tqMetaSaveCheckInfo(pTq, info.topic, msg, msgLen) < 0) {
L
Liu Jicong 已提交
610 611 612 613 614 615
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  return 0;
}

616
int32_t tqProcessDelCheckInfoReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
617 618 619 620 621 622 623 624 625 626 627
  if (taosHashRemove(pTq->pCheckInfo, msg, strlen(msg)) < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  if (tqMetaDeleteCheckInfo(pTq, msg) < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  return 0;
}

628
int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
629
  int ret = 0;
L
Liu Jicong 已提交
630
  SMqRebVgReq req = {0};
L
Liu Jicong 已提交
631
  tDecodeSMqRebVgReq(msg, &req);
L
Liu Jicong 已提交
632

D
dapan1121 已提交
633 634 635
  SVnode* pVnode = pTq->pVnode;
  int32_t vgId = TD_VID(pVnode);

636
  tqDebug("vgId:%d, tq process sub req:%s, Id:0x%" PRIx64 " -> Id:0x%" PRIx64, pVnode->config.vgId, req.subKey,
D
dapan1121 已提交
637
          req.oldConsumerId, req.newConsumerId);
L
Liu Jicong 已提交
638

wmmhello's avatar
wmmhello 已提交
639
  taosWLockLatch(&pTq->lock);
640
  STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
L
Liu Jicong 已提交
641
  if (pHandle == NULL) {
L
Liu Jicong 已提交
642
    if (req.oldConsumerId != -1) {
643
      tqError("vgId:%d, build new consumer handle %s for consumer:0x%" PRIx64 ", but old consumerId:0x%" PRIx64,
644
              req.vgId, req.subKey, req.newConsumerId, req.oldConsumerId);
L
Liu Jicong 已提交
645
    }
D
dapan1121 已提交
646

L
Liu Jicong 已提交
647
    if (req.newConsumerId == -1) {
648
      tqError("vgId:%d, tq invalid re-balance request, new consumerId %" PRId64 "", req.vgId, req.newConsumerId);
649
      goto end;
L
Liu Jicong 已提交
650
    }
D
dapan1121 已提交
651

L
Liu Jicong 已提交
652 653
    STqHandle tqHandle = {0};
    pHandle = &tqHandle;
L
Liu Jicong 已提交
654

H
Haojun Liao 已提交
655
    uint64_t oldConsumerId = pHandle->consumerId;
L
Liu Jicong 已提交
656 657 658
    memcpy(pHandle->subKey, req.subKey, TSDB_SUBSCRIBE_KEY_LEN);
    pHandle->consumerId = req.newConsumerId;
    pHandle->epoch = -1;
L
Liu Jicong 已提交
659

L
Liu Jicong 已提交
660
    pHandle->execHandle.subType = req.subType;
L
Liu Jicong 已提交
661
    pHandle->fetchMeta = req.withMeta;
wmmhello's avatar
wmmhello 已提交
662

663
    // TODO version should be assigned and refed during preprocess
D
dapan1121 已提交
664
    SWalRef* pRef = walRefCommittedVer(pVnode->pWal);
665
    if (pRef == NULL) {
666 667
      ret = -1;
      goto end;
668
    }
D
dapan1121 已提交
669

670 671
    int64_t ver = pRef->refVer;
    pHandle->pRef = pRef;
L
Liu Jicong 已提交
672

673
    SReadHandle handle = {.vnode = pVnode, .initTableReader = true, .initTqReader = true, .version = ver};
H
Haojun Liao 已提交
674 675
    initStorageAPI(&handle.api);

wmmhello's avatar
wmmhello 已提交
676
    pHandle->snapshotVer = ver;
677

L
Liu Jicong 已提交
678
    if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
L
Liu Jicong 已提交
679
      pHandle->execHandle.execCol.qmsg = req.qmsg;
L
Liu Jicong 已提交
680
      req.qmsg = NULL;
681

X
Xiaoyu Wang 已提交
682 683
      pHandle->execHandle.task = qCreateQueueExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle, vgId,
                                                          &pHandle->execHandle.numOfCols, req.newConsumerId);
L
Liu Jicong 已提交
684
      void* scanner = NULL;
685
      qExtractStreamScanner(pHandle->execHandle.task, &scanner);
686
      pHandle->execHandle.pTqReader = qExtractReaderFromStreamScanner(scanner);
L
Liu Jicong 已提交
687
    } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__DB) {
D
dapan1121 已提交
688
      pHandle->pWalReader = walOpenReader(pVnode->pWal, NULL);
689
      pHandle->execHandle.pTqReader = tqReaderOpen(pVnode);
D
dapan1121 已提交
690

L
Liu Jicong 已提交
691
      pHandle->execHandle.execDb.pFilterOutTbUid =
wmmhello's avatar
wmmhello 已提交
692
          taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
693
      buildSnapContext(handle.vnode, handle.version, 0, pHandle->execHandle.subType, pHandle->fetchMeta,
694
                       (SSnapContext**)(&handle.sContext));
695

696
      pHandle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &handle, vgId, NULL, req.newConsumerId);
L
Liu Jicong 已提交
697
    } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
D
dapan1121 已提交
698
      pHandle->pWalReader = walOpenReader(pVnode->pWal, NULL);
wmmhello's avatar
wmmhello 已提交
699 700
      pHandle->execHandle.execTb.suid = req.suid;

L
Liu Jicong 已提交
701
      SArray* tbUidList = taosArrayInit(0, sizeof(int64_t));
D
dapan1121 已提交
702 703
      vnodeGetCtbIdList(pVnode, req.suid, tbUidList);
      tqDebug("vgId:%d, tq try to get all ctb, suid:%" PRId64, pVnode->config.vgId, req.suid);
L
Liu Jicong 已提交
704 705
      for (int32_t i = 0; i < taosArrayGetSize(tbUidList); i++) {
        int64_t tbUid = *(int64_t*)taosArrayGet(tbUidList, i);
D
dapan1121 已提交
706
        tqDebug("vgId:%d, idx %d, uid:%" PRId64, vgId, i, tbUid);
L
Liu Jicong 已提交
707
      }
708
      pHandle->execHandle.pTqReader = tqReaderOpen(pVnode);
709
      tqReaderSetTbUidList(pHandle->execHandle.pTqReader, tbUidList);
L
Liu Jicong 已提交
710
      taosArrayDestroy(tbUidList);
wmmhello's avatar
wmmhello 已提交
711

712
      buildSnapContext(handle.vnode, handle.version, req.suid, pHandle->execHandle.subType, pHandle->fetchMeta,
L
Liu Jicong 已提交
713
                       (SSnapContext**)(&handle.sContext));
714
      pHandle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &handle, vgId, NULL, req.newConsumerId);
L
Liu Jicong 已提交
715
    }
H
Haojun Liao 已提交
716

717
    taosHashPut(pTq->pHandle, req.subKey, strlen(req.subKey), pHandle, sizeof(STqHandle));
dengyihao's avatar
dengyihao 已提交
718 719
    tqDebug("try to persist handle %s consumer:0x%" PRIx64 " , old consumer:0x%" PRIx64, req.subKey,
            pHandle->consumerId, oldConsumerId);
720 721
    ret = tqMetaSaveHandle(pTq, req.subKey, pHandle);
    goto end;
L
Liu Jicong 已提交
722
  } else {
wmmhello's avatar
wmmhello 已提交
723 724 725 726 727
    while (tqIsHandleExec(pHandle)) {
      tqDebug("sub req vgId:%d, topic:%s, subscription is executing, wait for 5ms and retry", vgId, pHandle->subKey);
      taosMsleep(5);
    }

D
dapan1121 已提交
728 729 730
    if (pHandle->consumerId == req.newConsumerId) {  // do nothing
      tqInfo("vgId:%d consumer:0x%" PRIx64 " remains, no switch occurs", req.vgId, req.newConsumerId);
      atomic_add_fetch_32(&pHandle->epoch, 1);
731

732 733 734
    } else {
      tqInfo("vgId:%d switch consumer from Id:0x%" PRIx64 " to Id:0x%" PRIx64, req.vgId, pHandle->consumerId,
             req.newConsumerId);
735
      atomic_store_64(&pHandle->consumerId, req.newConsumerId);
wmmhello's avatar
wmmhello 已提交
736
      atomic_store_32(&pHandle->epoch, 0);
737 738 739 740 741 742 743 744
    }
    // kill executing task
    qTaskInfo_t pTaskInfo = pHandle->execHandle.task;
    if (pTaskInfo != NULL) {
      qKillTask(pTaskInfo, TSDB_CODE_SUCCESS);
    }
    if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
      qStreamCloseTsdbReader(pTaskInfo);
L
Liu Jicong 已提交
745
    }
wmmhello's avatar
wmmhello 已提交
746 747
    // remove if it has been register in the push manager, and return one empty block to consumer
    tqUnregisterPushHandle(pTq, pHandle);
748 749
    ret = tqMetaSaveHandle(pTq, req.subKey, pHandle);
    goto end;
L
Liu Jicong 已提交
750
  }
L
Liu Jicong 已提交
751

752
end:
wmmhello's avatar
wmmhello 已提交
753
  taosWUnLockLatch(&pTq->lock);
D
dapan1121 已提交
754
  taosMemoryFree(req.qmsg);
755
  return ret;
L
Liu Jicong 已提交
756
}
757

L
liuyao 已提交
758 759 760 761
void freePtr(void *ptr) {
  taosMemoryFree(*(void**)ptr);
}

762
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
D
dapan1121 已提交
763
  int32_t vgId = TD_VID(pTq->pVnode);
764

765
  pTask->id.idStr = createStreamTaskIdStr(pTask->id.streamId, pTask->id.taskId);
L
Liu Jicong 已提交
766
  pTask->refCnt = 1;
767
  pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE;
dengyihao's avatar
dengyihao 已提交
768 769
  pTask->inputQueue = streamQueueOpen(512 << 10);
  pTask->outputQueue = streamQueueOpen(512 << 10);
L
Liu Jicong 已提交
770 771

  if (pTask->inputQueue == NULL || pTask->outputQueue == NULL) {
L
Liu Jicong 已提交
772
    return -1;
L
Liu Jicong 已提交
773 774
  }

L
Liu Jicong 已提交
775 776
  pTask->inputStatus = TASK_INPUT_STATUS__NORMAL;
  pTask->outputStatus = TASK_OUTPUT_STATUS__NORMAL;
777
  pTask->pMsgCb = &pTq->pVnode->msgCb;
778
  pTask->pMeta = pTq->pStreamMeta;
779
  pTask->chkInfo.version = ver;
780
  pTask->chkInfo.currentVer = ver;
781

782
  // expand executor
783
  pTask->status.taskStatus = (pTask->fillHistory)? TASK_STATUS__WAIT_DOWNSTREAM:TASK_STATUS__NORMAL;
784

785
  if (pTask->taskLevel == TASK_LEVEL__SOURCE) {
786
    pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask, false, -1, -1);
787 788 789 790
    if (pTask->pState == NULL) {
      return -1;
    }

791
    SReadHandle handle = {.vnode = pTq->pVnode, .initTqReader = 1, .pStateBackend = pTask->pState};
792
    initStorageAPI(&handle.api);
793

794 795
    pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, vgId);
    if (pTask->exec.pExecutor == NULL) {
L
Liu Jicong 已提交
796 797
      return -1;
    }
798

799
    qSetTaskId(pTask->exec.pExecutor, pTask->id.taskId, pTask->id.streamId);
800
  } else if (pTask->taskLevel == TASK_LEVEL__AGG) {
801
    pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask, false, -1, -1);
802 803 804
    if (pTask->pState == NULL) {
      return -1;
    }
805

806 807
    int32_t     numOfVgroups = (int32_t)taosArrayGetSize(pTask->childEpInfo);
    SReadHandle mgHandle = {.vnode = NULL, .numOfVgroups = numOfVgroups, .pStateBackend = pTask->pState};
808 809 810

    pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &mgHandle, vgId);
    if (pTask->exec.pExecutor == NULL) {
L
Liu Jicong 已提交
811 812
      return -1;
    }
813 814

    qSetTaskId(pTask->exec.pExecutor, pTask->id.taskId, pTask->id.streamId);
L
Liu Jicong 已提交
815
  }
L
Liu Jicong 已提交
816 817

  // sink
818
  if (pTask->outputType == TASK_OUTPUT__SMA) {
L
Liu Jicong 已提交
819
    pTask->smaSink.vnode = pTq->pVnode;
L
Liu Jicong 已提交
820
    pTask->smaSink.smaSink = smaHandleRes;
821
  } else if (pTask->outputType == TASK_OUTPUT__TABLE) {
L
Liu Jicong 已提交
822
    pTask->tbSink.vnode = pTq->pVnode;
H
Haojun Liao 已提交
823
    pTask->tbSink.tbSinkFunc = tqSinkToTablePipeline;
L
Liu Jicong 已提交
824

X
Xiaoyu Wang 已提交
825
    int32_t   ver1 = 1;
5
54liuyao 已提交
826
    SMetaInfo info = {0};
dengyihao's avatar
dengyihao 已提交
827
    int32_t   code = metaGetInfo(pTq->pVnode->pMeta, pTask->tbSink.stbUid, &info, NULL);
5
54liuyao 已提交
828
    if (code == TSDB_CODE_SUCCESS) {
D
dapan1121 已提交
829
      ver1 = info.skmVer;
5
54liuyao 已提交
830
    }
L
Liu Jicong 已提交
831

832 833
    SSchemaWrapper* pschemaWrapper = pTask->tbSink.pSchemaWrapper;
    pTask->tbSink.pTSchema = tBuildTSchema(pschemaWrapper->pSchema, pschemaWrapper->nCols, ver1);
834
    if (pTask->tbSink.pTSchema == NULL) {
D
dapan1121 已提交
835 836
      return -1;
    }
L
liuyao 已提交
837 838
    pTask->tbSink.pTblInfo = tSimpleHashInit(10240, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
    tSimpleHashSetFreeFp(pTask->tbSink.pTblInfo, freePtr);
L
Liu Jicong 已提交
839
  }
840

841
  if (pTask->taskLevel == TASK_LEVEL__SOURCE) {
842
    SWalFilterCond cond = {.deleteMsg = 1};  // delete msg also extract from wal files
843
    pTask->exec.pWalReader = walOpenReader(pTq->pVnode->pWal, &cond);
844 845
  }

846
  streamSetupTrigger(pTask);
847

848
  tqInfo("vgId:%d expand stream task, s-task:%s, checkpoint ver:%" PRId64 " child id:%d, level:%d", vgId, pTask->id.idStr,
849
         pTask->chkInfo.version, pTask->selfChildId, pTask->taskLevel);
850 851 852

  // next valid version will add one
  pTask->chkInfo.version += 1;
L
Liu Jicong 已提交
853
  return 0;
L
Liu Jicong 已提交
854
}
L
Liu Jicong 已提交
855

856
int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) {
857 858 859 860
  char*   msgStr = pMsg->pCont;
  char*   msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
  int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);

861 862
  SStreamTaskCheckReq req;
  SDecoder            decoder;
863

X
Xiaoyu Wang 已提交
864
  tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
865 866
  tDecodeSStreamTaskCheckReq(&decoder, &req);
  tDecoderClear(&decoder);
867

868 869 870 871 872 873 874 875 876 877
  int32_t             taskId = req.downstreamTaskId;
  SStreamTaskCheckRsp rsp = {
      .reqId = req.reqId,
      .streamId = req.streamId,
      .childId = req.childId,
      .downstreamNodeId = req.downstreamNodeId,
      .downstreamTaskId = req.downstreamTaskId,
      .upstreamNodeId = req.upstreamNodeId,
      .upstreamTaskId = req.upstreamTaskId,
  };
878

L
Liu Jicong 已提交
879
  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);
880

881
  if (pTask != NULL) {
882
    rsp.status = streamTaskCheckStatus(pTask);
883 884
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);

885
    tqDebug("s-task:%s recv task check req(reqId:0x%" PRIx64
886
            ") %d at node %d task status:%d, check req from task %d at node %d, rsp status %d",
887 888
            pTask->id.idStr, rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, pTask->status.taskStatus,
            rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);
889 890
  } else {
    rsp.status = 0;
891 892
    tqDebug("tq recv task check(taskId:0x%x not built yet) req(reqId:0x%" PRIx64
            ") %d at node %d, check req from task:0x%x at node %d, rsp status %d",
893 894
            taskId, rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.upstreamTaskId, rsp.upstreamNodeId,
            rsp.status);
895 896 897 898 899
  }

  SEncoder encoder;
  int32_t  code;
  int32_t  len;
900

901 902
  tEncodeSize(tEncodeSStreamTaskCheckRsp, &rsp, len, code);
  if (code < 0) {
903
    tqError("vgId:%d failed to encode task check rsp, task:0x%x", pTq->pStreamMeta->vgId, taskId);
L
Liu Jicong 已提交
904
    return -1;
905
  }
L
Liu Jicong 已提交
906

907 908 909 910 911 912 913 914
  void* buf = rpcMallocCont(sizeof(SMsgHead) + len);
  ((SMsgHead*)buf)->vgId = htonl(req.upstreamNodeId);

  void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
  tEncoderInit(&encoder, (uint8_t*)abuf, len);
  tEncodeSStreamTaskCheckRsp(&encoder, &rsp);
  tEncoderClear(&encoder);

915
  SRpcMsg rspMsg = {.code = 0, .pCont = buf, .contLen = sizeof(SMsgHead) + len, .info = pMsg->info};
916

917 918 919 920
  tmsgSendRsp(&rspMsg);
  return 0;
}

921
int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
922 923 924 925 926 927
  int32_t             code;
  SStreamTaskCheckRsp rsp;

  SDecoder decoder;
  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
  code = tDecodeSStreamTaskCheckRsp(&decoder, &rsp);
928

929 930 931 932 933
  if (code < 0) {
    tDecoderClear(&decoder);
    return -1;
  }

934
  tDecoderClear(&decoder);
935
  tqDebug("tq recv task check rsp(reqId:0x%" PRIx64 ") %d at node %d check req from task:0x%x at node %d, status %d",
936 937
          rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);

L
Liu Jicong 已提交
938
  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, rsp.upstreamTaskId);
939
  if (pTask == NULL) {
940 941
    tqError("tq failed to locate the stream task:0x%x vgId:%d, it may have been destroyed", rsp.upstreamTaskId,
            pTq->pStreamMeta->vgId);
942 943 944
    return -1;
  }

945
  code = streamProcessTaskCheckRsp(pTask, &rsp, sversion);
L
Liu Jicong 已提交
946 947
  streamMetaReleaseTask(pTq->pStreamMeta, pTask);
  return code;
948 949
}

950
int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
951 952 953
  int32_t code = 0;
  int32_t vgId = TD_VID(pTq->pVnode);

5
54liuyao 已提交
954 955 956
  if (tsDisableStream) {
    return 0;
  }
957 958 959 960

  // 1.deserialize msg and build task
  SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
  if (pTask == NULL) {
961 962
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    tqError("vgId:%d failed to create stream task due to out of memory, alloc size:%d", vgId, (int32_t) sizeof(SStreamTask));
963 964
    return -1;
  }
965

966 967
  SDecoder decoder;
  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
968
  code = tDecodeStreamTask(&decoder, pTask);
969 970 971 972 973
  if (code < 0) {
    tDecoderClear(&decoder);
    taosMemoryFree(pTask);
    return -1;
  }
974

975 976
  tDecoderClear(&decoder);

977
  // 2.save task, use the newest commit version as the initial start version of stream task.
978
  taosWLockLatch(&pTq->pStreamMeta->lock);
979
  code = streamMetaAddDeployedTask(pTq->pStreamMeta, sversion, pTask);
980
  int32_t numOfTasks = streamMetaGetNumOfTasks(pTq->pStreamMeta);
981
  if (code < 0) {
982
    tqError("vgId:%d failed to add s-task:%s, total:%d", vgId, pTask->id.idStr, numOfTasks);
983
    taosWUnLockLatch(&pTq->pStreamMeta->lock);
984 985 986
    return -1;
  }

987 988
  taosWUnLockLatch(&pTq->pStreamMeta->lock);

989 990
  // 3.go through recover steps to fill history
  if (pTask->fillHistory) {
991
    streamTaskCheckDownstream(pTask, sversion);
992 993
  }

994
  tqDebug("vgId:%d s-task:%s is deployed and add meta from mnd, status:%d, total:%d", vgId, pTask->id.idStr,
995
          pTask->status.taskStatus, numOfTasks);
996 997 998
  return 0;
}

L
Liu Jicong 已提交
999 1000 1001 1002 1003
int32_t tqProcessTaskRecover1Req(STQ* pTq, SRpcMsg* pMsg) {
  int32_t code;
  char*   msg = pMsg->pCont;
  int32_t msgLen = pMsg->contLen;

1004
  SStreamRecoverStep1Req* pReq = (SStreamRecoverStep1Req*)msg;
L
Liu Jicong 已提交
1005
  SStreamTask*            pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->taskId);
1006 1007 1008 1009 1010
  if (pTask == NULL) {
    return -1;
  }

  // check param
1011
  int64_t fillVer1 = pTask->chkInfo.version;
1012
  if (fillVer1 <= 0) {
L
Liu Jicong 已提交
1013
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
1014 1015 1016 1017
    return -1;
  }

  // do recovery step 1
H
Haojun Liao 已提交
1018
  tqDebug("s-task:%s start non-blocking recover stage(step 1) scan", pTask->id.idStr);
H
Haojun Liao 已提交
1019
  int64_t st = taosGetTimestampMs();
1020

H
Haojun Liao 已提交
1021
  streamSourceRecoverScanStep1(pTask);
1022
  if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING) {
H
Haojun Liao 已提交
1023 1024
    tqDebug("s-task:%s is dropped, abort recover in step1", pTask->id.idStr);

L
Liu Jicong 已提交
1025 1026 1027 1028
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
    return 0;
  }

H
Haojun Liao 已提交
1029
  double el = (taosGetTimestampMs() - st) / 1000.0;
H
Haojun Liao 已提交
1030
  tqDebug("s-task:%s non-blocking recover stage(step 1) ended, elapsed time:%.2fs", pTask->id.idStr, el);
H
Haojun Liao 已提交
1031

1032 1033 1034 1035
  // build msg to launch next step
  SStreamRecoverStep2Req req;
  code = streamBuildSourceRecover2Req(pTask, &req);
  if (code < 0) {
L
Liu Jicong 已提交
1036
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
1037 1038 1039
    return -1;
  }

L
Liu Jicong 已提交
1040
  streamMetaReleaseTask(pTq->pStreamMeta, pTask);
1041
  if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING) {
L
Liu Jicong 已提交
1042 1043 1044
    return 0;
  }

1045
  // serialize msg
L
Liu Jicong 已提交
1046 1047 1048 1049
  int32_t len = sizeof(SStreamRecoverStep1Req);

  void* serializedReq = rpcMallocCont(len);
  if (serializedReq == NULL) {
H
Haojun Liao 已提交
1050
    tqError("s-task:%s failed to prepare the step2 stage, out of memory", pTask->id.idStr);
L
Liu Jicong 已提交
1051 1052 1053 1054
    return -1;
  }

  memcpy(serializedReq, &req, len);
1055 1056

  // dispatch msg
H
Haojun Liao 已提交
1057
  tqDebug("s-task:%s start recover block stage", pTask->id.idStr);
1058

H
Haojun Liao 已提交
1059 1060
  SRpcMsg rpcMsg = {
      .code = 0, .contLen = len, .msgType = TDMT_VND_STREAM_RECOVER_BLOCKING_STAGE, .pCont = serializedReq};
1061 1062 1063 1064
  tmsgPutToQueue(&pTq->pVnode->msgCb, WRITE_QUEUE, &rpcMsg);
  return 0;
}

1065
int32_t tqProcessTaskRecover2Req(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
H
Haojun Liao 已提交
1066 1067
  int32_t code = 0;

1068
  SStreamRecoverStep2Req* pReq = (SStreamRecoverStep2Req*)msg;
H
Haojun Liao 已提交
1069 1070

  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->taskId);
1071 1072 1073 1074 1075
  if (pTask == NULL) {
    return -1;
  }

  // do recovery step 2
H
Haojun Liao 已提交
1076 1077 1078
  int64_t st = taosGetTimestampMs();
  tqDebug("s-task:%s start step2 recover, ts:%"PRId64, pTask->id.idStr, st);

1079
  code = streamSourceRecoverScanStep2(pTask, sversion);
1080
  if (code < 0) {
L
Liu Jicong 已提交
1081
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
1082 1083 1084
    return -1;
  }

1085 1086 1087
  qDebug("s-task:%s set the start wal offset to be:%"PRId64, pTask->id.idStr, sversion);
  walReaderSeekVer(pTask->exec.pWalReader, sversion);

1088
  if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING) {
L
Liu Jicong 已提交
1089 1090 1091 1092
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
    return 0;
  }

1093 1094 1095
  // restore param
  code = streamRestoreParam(pTask);
  if (code < 0) {
L
Liu Jicong 已提交
1096
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
1097 1098 1099 1100
    return -1;
  }

  // set status normal
H
Haojun Liao 已提交
1101
  tqDebug("s-task:%s blocking stage completed, set the status to be normal", pTask->id.idStr);
1102 1103
  code = streamSetStatusNormal(pTask);
  if (code < 0) {
L
Liu Jicong 已提交
1104
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
1105 1106 1107
    return -1;
  }

H
Haojun Liao 已提交
1108
  double el = (taosGetTimestampMs() - st)/ 1000.0;
H
Haojun Liao 已提交
1109
  tqDebug("s-task:%s step2 recover finished, el:%.2fs", pTask->id.idStr, el);
H
Haojun Liao 已提交
1110

1111 1112 1113
  // dispatch recover finish req to all related downstream task
  code = streamDispatchRecoverFinishReq(pTask);
  if (code < 0) {
L
Liu Jicong 已提交
1114
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
1115 1116 1117
    return -1;
  }

L
Liu Jicong 已提交
1118 1119 1120
  atomic_store_8(&pTask->fillHistory, 0);
  streamMetaSaveTask(pTq->pStreamMeta, pTask);

L
Liu Jicong 已提交
1121
  streamMetaReleaseTask(pTq->pStreamMeta, pTask);
1122 1123 1124
  return 0;
}

L
Liu Jicong 已提交
1125 1126 1127
int32_t tqProcessTaskRecoverFinishReq(STQ* pTq, SRpcMsg* pMsg) {
  char*   msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
1128 1129

  // deserialize
1130 1131 1132
  SStreamRecoverFinishReq req;

  SDecoder decoder;
X
Xiaoyu Wang 已提交
1133
  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
1134 1135 1136
  tDecodeSStreamRecoverFinishReq(&decoder, &req);
  tDecoderClear(&decoder);

1137
  // find task
L
Liu Jicong 已提交
1138
  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.taskId);
1139 1140 1141
  if (pTask == NULL) {
    return -1;
  }
1142
  // do process request
1143
  if (streamProcessRecoverFinishReq(pTask, req.childId) < 0) {
L
Liu Jicong 已提交
1144
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
1145 1146 1147
    return -1;
  }

L
Liu Jicong 已提交
1148
  streamMetaReleaseTask(pTq->pStreamMeta, pTask);
1149
  return 0;
L
Liu Jicong 已提交
1150
}
L
Liu Jicong 已提交
1151

L
Liu Jicong 已提交
1152 1153 1154 1155 1156
int32_t tqProcessTaskRecoverFinishRsp(STQ* pTq, SRpcMsg* pMsg) {
  //
  return 0;
}

1157 1158 1159 1160
int32_t extractDelDataBlock(const void* pData, int32_t len, int64_t ver, SStreamRefDataBlock** pRefBlock) {
  SDecoder*   pCoder = &(SDecoder){0};
  SDeleteRes* pRes = &(SDeleteRes){0};

H
Haojun Liao 已提交
1161 1162
  *pRefBlock = NULL;

1163 1164 1165 1166 1167 1168 1169 1170 1171 1172 1173 1174 1175 1176 1177 1178 1179 1180 1181 1182 1183 1184 1185 1186 1187 1188 1189 1190 1191 1192 1193 1194 1195 1196 1197 1198 1199 1200 1201 1202 1203 1204 1205 1206 1207 1208 1209
  pRes->uidList = taosArrayInit(0, sizeof(tb_uid_t));
  if (pRes->uidList == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  tDecoderInit(pCoder, (uint8_t*)pData, len);
  tDecodeDeleteRes(pCoder, pRes);
  tDecoderClear(pCoder);

  int32_t numOfTables = taosArrayGetSize(pRes->uidList);
  if (numOfTables == 0 || pRes->affectedRows == 0) {
    taosArrayDestroy(pRes->uidList);
    return TSDB_CODE_SUCCESS;
  }

  SSDataBlock* pDelBlock = createSpecialDataBlock(STREAM_DELETE_DATA);
  blockDataEnsureCapacity(pDelBlock, numOfTables);
  pDelBlock->info.rows = numOfTables;
  pDelBlock->info.version = ver;

  for (int32_t i = 0; i < numOfTables; i++) {
    // start key column
    SColumnInfoData* pStartCol = taosArrayGet(pDelBlock->pDataBlock, START_TS_COLUMN_INDEX);
    colDataSetVal(pStartCol, i, (const char*)&pRes->skey, false);  // end key column
    SColumnInfoData* pEndCol = taosArrayGet(pDelBlock->pDataBlock, END_TS_COLUMN_INDEX);
    colDataSetVal(pEndCol, i, (const char*)&pRes->ekey, false);
    // uid column
    SColumnInfoData* pUidCol = taosArrayGet(pDelBlock->pDataBlock, UID_COLUMN_INDEX);
    int64_t*         pUid = taosArrayGet(pRes->uidList, i);
    colDataSetVal(pUidCol, i, (const char*)pUid, false);

    colDataSetNULL(taosArrayGet(pDelBlock->pDataBlock, GROUPID_COLUMN_INDEX), i);
    colDataSetNULL(taosArrayGet(pDelBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX), i);
    colDataSetNULL(taosArrayGet(pDelBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX), i);
  }

  taosArrayDestroy(pRes->uidList);
  *pRefBlock = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0);
  if (pRefBlock == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  (*pRefBlock)->type = STREAM_INPUT__REF_DATA_BLOCK;
  (*pRefBlock)->pBlock = pDelBlock;
  return TSDB_CODE_SUCCESS;
}

L
Liu Jicong 已提交
1210 1211
int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
  SStreamTaskRunReq* pReq = pMsg->pCont;
1212 1213 1214 1215

  int32_t taskId = pReq->taskId;
  int32_t vgId = TD_VID(pTq->pVnode);

1216 1217
  if (taskId == WAL_READ_TASKS_ID) {  // all tasks are extracted submit data from the wal
    tqStreamTasksScanWal(pTq);
L
Liu Jicong 已提交
1218
    return 0;
1219
  }
1220

1221
  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);
1222 1223 1224 1225 1226
  if (pTask != NULL) {
    if (pTask->status.taskStatus == TASK_STATUS__NORMAL) {
      tqDebug("vgId:%d s-task:%s start to process block from wal, last chk point:%" PRId64, vgId,
              pTask->id.idStr, pTask->chkInfo.version);
      streamProcessRunReq(pTask);
1227
    } else {
1228
      tqDebug("vgId:%d s-task:%s ignore run req since not in ready state", vgId, pTask->id.idStr);
1229
    }
1230

L
Liu Jicong 已提交
1231
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
1232
    tqStartStreamTasks(pTq);
L
Liu Jicong 已提交
1233
    return 0;
1234
  } else {
1235
    tqError("vgId:%d failed to found s-task, taskId:%d", vgId, taskId);
1236
    return -1;
L
Liu Jicong 已提交
1237
  }
L
Liu Jicong 已提交
1238 1239
}

L
Liu Jicong 已提交
1240
int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec) {
1241 1242 1243
  char*              msgStr = pMsg->pCont;
  char*              msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
  int32_t            msgLen = pMsg->contLen - sizeof(SMsgHead);
1244 1245 1246 1247

  SStreamDispatchReq req = {0};

  SDecoder decoder;
L
Liu Jicong 已提交
1248
  tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
1249
  tDecodeStreamDispatchReq(&decoder, &req);
L
Liu Jicong 已提交
1250

1251
  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.taskId);
L
Liu Jicong 已提交
1252
  if (pTask) {
1253
    SRpcMsg rsp = {.info = pMsg->info, .code = 0};
1254
    streamProcessDispatchMsg(pTask, &req, &rsp, exec);
L
Liu Jicong 已提交
1255
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
L
Liu Jicong 已提交
1256
    return 0;
1257
  } else {
L
liuyao 已提交
1258
    tDeleteStreamDispatchReq(&req);
1259
    return -1;
L
Liu Jicong 已提交
1260
  }
L
Liu Jicong 已提交
1261 1262
}

L
Liu Jicong 已提交
1263 1264
int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) {
  SStreamDispatchRsp* pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
1265
  int32_t             taskId = ntohl(pRsp->upstreamTaskId);
L
Liu Jicong 已提交
1266
  SStreamTask*        pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);
1267
  tqDebug("recv dispatch rsp, code:%x", pMsg->code);
L
Liu Jicong 已提交
1268
  if (pTask) {
1269
    streamProcessDispatchRsp(pTask, pRsp, pMsg->code);
L
Liu Jicong 已提交
1270
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
L
Liu Jicong 已提交
1271
    return 0;
1272 1273
  } else {
    return -1;
L
Liu Jicong 已提交
1274
  }
L
Liu Jicong 已提交
1275
}
L
Liu Jicong 已提交
1276

1277
int32_t tqProcessTaskDropReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
L
Liu Jicong 已提交
1278
  SVDropStreamTaskReq* pReq = (SVDropStreamTaskReq*)msg;
1279
  streamMetaRemoveTask(pTq->pStreamMeta, pReq->taskId);
L
Liu Jicong 已提交
1280
  return 0;
L
Liu Jicong 已提交
1281
}
L
Liu Jicong 已提交
1282

5
54liuyao 已提交
1283 1284
int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  SVPauseStreamTaskReq* pReq = (SVPauseStreamTaskReq*)msg;
L
liuyao 已提交
1285 1286 1287
  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->taskId);
  if (pTask) {
    tqDebug("vgId:%d s-task:%s set pause flag", pTq->pStreamMeta->vgId, pTask->id.idStr);
L
liuyao 已提交
1288
    atomic_store_8(&pTask->status.keepTaskStatus, pTask->status.taskStatus);
L
liuyao 已提交
1289 1290 1291
    atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__PAUSE);
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
  }
5
54liuyao 已提交
1292 1293 1294 1295 1296
  return 0;
}

int32_t tqProcessTaskResumeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  SVResumeStreamTaskReq* pReq = (SVResumeStreamTaskReq*)msg;
L
liuyao 已提交
1297 1298
  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->taskId);
  if (pTask) {
L
liuyao 已提交
1299
    atomic_store_8(&pTask->status.taskStatus, pTask->status.keepTaskStatus);
1300 1301 1302 1303

    // no lock needs to secure the access of the version
    if (pReq->igUntreated) {  // discard all the data  when the stream task is suspended.
      pTask->chkInfo.currentVer = sversion;
1304
      walReaderSeekVer(pTask->exec.pWalReader, sversion);
1305 1306 1307 1308 1309 1310 1311
      tqDebug("vgId:%d s-task:%s resume to normal from the latest version:%" PRId64 ", vnode ver:%" PRId64, pTq->pStreamMeta->vgId,
              pTask->id.idStr, pTask->chkInfo.currentVer, sversion);
    } else {  // from the previous paused version and go on
      tqDebug("vgId:%d s-task:%s resume to normal from paused ver:%" PRId64 ", vnode ver:%" PRId64, pTq->pStreamMeta->vgId,
              pTask->id.idStr, pTask->chkInfo.currentVer, sversion);
    }

L
liuyao 已提交
1312
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
L
liuyao 已提交
1313 1314 1315 1316 1317
    if (pTask->taskLevel == TASK_LEVEL__SOURCE) {
      tqStartStreamTasks(pTq);
    } else {
      streamSchedExec(pTask);
    }
L
liuyao 已提交
1318
  }
1319

5
54liuyao 已提交
1320 1321 1322
  return 0;
}

L
Liu Jicong 已提交
1323 1324 1325 1326 1327 1328
int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg) {
  char*              msgStr = pMsg->pCont;
  char*              msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
  int32_t            msgLen = pMsg->contLen - sizeof(SMsgHead);
  SStreamRetrieveReq req;
  SDecoder           decoder;
1329
  tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
L
Liu Jicong 已提交
1330
  tDecodeStreamRetrieveReq(&decoder, &req);
L
Liu Jicong 已提交
1331
  tDecoderClear(&decoder);
L
Liu Jicong 已提交
1332
  int32_t      taskId = req.dstTaskId;
L
Liu Jicong 已提交
1333
  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);
L
Liu Jicong 已提交
1334
  if (pTask) {
1335
    SRpcMsg rsp = {.info = pMsg->info, .code = 0};
L
Liu Jicong 已提交
1336
    streamProcessRetrieveReq(pTask, &req, &rsp);
L
Liu Jicong 已提交
1337
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
L
Liu Jicong 已提交
1338
    tDeleteStreamRetrieveReq(&req);
L
Liu Jicong 已提交
1339
    return 0;
L
Liu Jicong 已提交
1340
  } else {
L
liuyao 已提交
1341
    tDeleteStreamRetrieveReq(&req);
L
Liu Jicong 已提交
1342
    return -1;
L
Liu Jicong 已提交
1343 1344 1345 1346 1347 1348 1349
  }
}

int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg) {
  //
  return 0;
}
L
Liu Jicong 已提交
1350

1351 1352 1353 1354 1355 1356
int32_t vnodeEnqueueStreamMsg(SVnode* pVnode, SRpcMsg* pMsg) {
  STQ*      pTq = pVnode->pTq;
  SMsgHead* msgStr = pMsg->pCont;
  char*     msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
  int32_t   msgLen = pMsg->contLen - sizeof(SMsgHead);
  int32_t   code = 0;
L
Liu Jicong 已提交
1357 1358 1359

  SStreamDispatchReq req;
  SDecoder           decoder;
1360
  tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
L
Liu Jicong 已提交
1361 1362
  if (tDecodeStreamDispatchReq(&decoder, &req) < 0) {
    code = TSDB_CODE_MSG_DECODE_ERROR;
L
Liu Jicong 已提交
1363
    tDecoderClear(&decoder);
L
Liu Jicong 已提交
1364 1365
    goto FAIL;
  }
L
Liu Jicong 已提交
1366
  tDecoderClear(&decoder);
L
Liu Jicong 已提交
1367

L
Liu Jicong 已提交
1368
  int32_t taskId = req.taskId;
L
Liu Jicong 已提交
1369

L
Liu Jicong 已提交
1370
  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);
L
Liu Jicong 已提交
1371
  if (pTask) {
1372
    SRpcMsg rsp = {.info = pMsg->info, .code = 0};
1373
    streamProcessDispatchMsg(pTask, &req, &rsp, false);
L
Liu Jicong 已提交
1374
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
L
Liu Jicong 已提交
1375 1376
    rpcFreeCont(pMsg->pCont);
    taosFreeQitem(pMsg);
1377
    return 0;
5
54liuyao 已提交
1378 1379
  } else {
    tDeleteStreamDispatchReq(&req);
L
Liu Jicong 已提交
1380
  }
L
Liu Jicong 已提交
1381

1382 1383
  code = TSDB_CODE_STREAM_TASK_NOT_EXIST;

L
Liu Jicong 已提交
1384
FAIL:
1385 1386 1387 1388
  if (pMsg->info.handle == NULL) return -1;

  SMsgHead* pRspHead = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamDispatchRsp));
  if (pRspHead == NULL) {
1389
    SRpcMsg rsp = {.code = TSDB_CODE_OUT_OF_MEMORY, .info = pMsg->info};
1390 1391 1392 1393 1394 1395 1396 1397 1398 1399 1400 1401 1402 1403 1404 1405
    tqDebug("send dispatch error rsp, code: %x", code);
    tmsgSendRsp(&rsp);
    rpcFreeCont(pMsg->pCont);
    taosFreeQitem(pMsg);
    return -1;
  }

  pRspHead->vgId = htonl(req.upstreamNodeId);
  SStreamDispatchRsp* pRsp = POINTER_SHIFT(pRspHead, sizeof(SMsgHead));
  pRsp->streamId = htobe64(req.streamId);
  pRsp->upstreamTaskId = htonl(req.upstreamTaskId);
  pRsp->upstreamNodeId = htonl(req.upstreamNodeId);
  pRsp->downstreamNodeId = htonl(pVnode->config.vgId);
  pRsp->downstreamTaskId = htonl(req.taskId);
  pRsp->inputStatus = TASK_OUTPUT_STATUS__NORMAL;

L
Liu Jicong 已提交
1406
  SRpcMsg rsp = {
1407
      .code = code, .info = pMsg->info, .contLen = sizeof(SMsgHead) + sizeof(SStreamDispatchRsp), .pCont = pRspHead};
1408
  tqDebug("send dispatch error rsp, code: %x", code);
L
Liu Jicong 已提交
1409
  tmsgSendRsp(&rsp);
L
Liu Jicong 已提交
1410 1411
  rpcFreeCont(pMsg->pCont);
  taosFreeQitem(pMsg);
1412
  return -1;
L
Liu Jicong 已提交
1413
}
L
Liu Jicong 已提交
1414

1415
int32_t tqCheckLogInWal(STQ* pTq, int64_t sversion) { return sversion <= pTq->walLogLastVer; }
1416

1417
int32_t tqStartStreamTasks(STQ* pTq) {
1418
  int32_t      vgId = TD_VID(pTq->pVnode);
1419
  SStreamMeta* pMeta = pTq->pStreamMeta;
1420

1421
  taosWLockLatch(&pMeta->lock);
1422

1423
  int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
1424
  if (numOfTasks == 0) {
1425
    tqInfo("vgId:%d no stream tasks exist", vgId);
1426 1427 1428 1429
    taosWUnLockLatch(&pTq->pStreamMeta->lock);
    return 0;
  }

1430
  pMeta->walScanCounter += 1;
1431

1432 1433
  if (pMeta->walScanCounter > 1) {
    tqDebug("vgId:%d wal read task has been launched, remain scan times:%d", vgId, pMeta->walScanCounter);
1434 1435 1436 1437
    taosWUnLockLatch(&pTq->pStreamMeta->lock);
    return 0;
  }

1438 1439 1440
  SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq));
  if (pRunReq == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
H
Haojun Liao 已提交
1441
    tqError("vgId:%d failed to create msg to start wal scanning to launch stream tasks, code:%s", vgId, terrstr());
1442
    taosWUnLockLatch(&pTq->pStreamMeta->lock);
1443 1444 1445
    return -1;
  }

H
Haojun Liao 已提交
1446
  tqDebug("vgId:%d create msg to start wal scan to launch stream tasks, numOfTasks:%d", vgId, numOfTasks);
1447 1448
  pRunReq->head.vgId = vgId;
  pRunReq->streamId = 0;
1449
  pRunReq->taskId = WAL_READ_TASKS_ID;
1450 1451 1452

  SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)};
  tmsgPutToQueue(&pTq->pVnode->msgCb, STREAM_QUEUE, &msg);
1453
  taosWUnLockLatch(&pTq->pStreamMeta->lock);
1454 1455 1456

  return 0;
}