tq.c 53.2 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Hongze Cheng 已提交
16
#include "tq.h"
S
Shengliang Guan 已提交
17

dengyihao's avatar
dengyihao 已提交
18 19 20
// 0: not init
// 1: already inited
// 2: wait to be inited or cleaup
21
static int32_t tqInitialize(STQ* pTq);
dengyihao's avatar
dengyihao 已提交
22

wmmhello's avatar
wmmhello 已提交
23
// True while a poll request is being served on this handle.
static FORCE_INLINE bool tqIsHandleExec(STqHandle* pHandle) {
  return pHandle->status == TMQ_HANDLE_STATUS_EXEC;
}

// Claim the handle for the poll request currently being processed.
static FORCE_INLINE void tqSetHandleExec(STqHandle* pHandle) {
  pHandle->status = TMQ_HANDLE_STATUS_EXEC;
}

// Release the handle so a waiting poll (or subscription delete) may proceed.
static FORCE_INLINE void tqSetHandleIdle(STqHandle* pHandle) {
  pHandle->status = TMQ_HANDLE_STATUS_IDLE;
}
wmmhello's avatar
wmmhello 已提交
26

L
Liu Jicong 已提交
27
/**
 * Initialize the process-wide TQ module (timer and stream subsystem) once.
 *
 * tqMgmt.inited acts as a small state machine: 0 = not initialized,
 * 1 = initialized, 2 = an init/cleanup is in progress. Callers that observe 2
 * spin until the other transition completes.
 *
 * @return 0 on success or if already initialized, -1 on failure (state is
 *         reset to 0 so a later call may retry).
 */
int32_t tqInit() {
  int8_t old;
  while (1) {
    old = atomic_val_compare_exchange_8(&tqMgmt.inited, 0, 2);
    if (old != 2) break;
  }

  if (old == 0) {
    tqMgmt.timer = taosTmrInit(10000, 100, 10000, "TQ");
    if (tqMgmt.timer == NULL) {
      atomic_store_8(&tqMgmt.inited, 0);
      return -1;
    }

    if (streamInit() < 0) {
      // Roll back: release the timer and leave the "in progress" state, otherwise
      // every later tqInit()/tqCleanUp() would spin forever on inited == 2.
      taosTmrCleanUp(tqMgmt.timer);
      tqMgmt.timer = NULL;
      atomic_store_8(&tqMgmt.inited, 0);
      return -1;
    }

    atomic_store_8(&tqMgmt.inited, 1);
  }

  return 0;
}
L
Liu Jicong 已提交
48

49
/**
 * Tear down the process-wide TQ module if it is currently initialized.
 *
 * Waits out any in-progress transition (state 2). Only the caller that moves
 * the state from 1 to 2 releases the timer and the stream subsystem, then
 * resets the state to 0.
 */
void tqCleanUp() {
  int8_t prev = 2;
  do {
    prev = atomic_val_compare_exchange_8(&tqMgmt.inited, 1, 2);
  } while (prev == 2);

  if (prev != 1) {
    return;  // not initialized: nothing to release
  }

  taosTmrCleanUp(tqMgmt.timer);
  streamCleanUp();
  atomic_store_8(&tqMgmt.inited, 0);
}
L
Liu Jicong 已提交
62

63
void tqDestroyTqHandle(void* data) {
64 65
  STqHandle* pData = (STqHandle*)data;
  qDestroyTask(pData->execHandle.task);
wmmhello's avatar
wmmhello 已提交
66

67
  if (pData->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
L
Liu Jicong 已提交
68
    taosMemoryFreeClear(pData->execHandle.execCol.qmsg);
69
  } else if (pData->execHandle.subType == TOPIC_SUB_TYPE__DB) {
70
    tqReaderClose(pData->execHandle.pTqReader);
71 72
    walCloseReader(pData->pWalReader);
    taosHashCleanup(pData->execHandle.execDb.pFilterOutTbUid);
L
Liu Jicong 已提交
73
  } else if (pData->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
74
    walCloseReader(pData->pWalReader);
75
    tqReaderClose(pData->execHandle.pTqReader);
76 77
    taosMemoryFreeClear(pData->execHandle.execTb.qmsg);
    nodesDestroyNode(pData->execHandle.execTb.node);
78
  }
dengyihao's avatar
dengyihao 已提交
79
  if (pData->msg != NULL) {
80 81 82
    rpcFreeCont(pData->msg->pCont);
    taosMemoryFree(pData->msg);
    pData->msg = NULL;
D
dapan1121 已提交
83
  }
L
Liu Jicong 已提交
84 85
}

86 87 88 89 90
/* True only if both offsets are WAL-log offsets and pLeft's version does not exceed pRight's. */
static bool tqOffsetLessOrEqual(const STqOffset* pLeft, const STqOffset* pRight) {
  if (pLeft->val.type != TMQ_OFFSET__LOG || pRight->val.type != TMQ_OFFSET__LOG) {
    return false;
  }
  return pLeft->val.version <= pRight->val.version;
}

L
Liu Jicong 已提交
91
/**
 * Create the TQ instance of a vnode.
 *
 * Builds the in-memory tables (subscription handles, push manager, column
 * check info), then opens meta, offset store and stream meta via
 * tqInitialize().
 *
 * @param path   directory for TQ/stream state (duplicated, caller keeps ownership)
 * @param pVnode owning vnode; its WAL's last version seeds walLogLastVer
 * @return the new instance, or NULL on failure (terrno is set to
 *         TSDB_CODE_OUT_OF_MEMORY for allocation failures)
 */
STQ* tqOpen(const char* path, SVnode* pVnode) {
  STQ* pTq = taosMemoryCalloc(1, sizeof(STQ));
  if (pTq == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pTq->path = taosStrdup(path);
  pTq->pVnode = pVnode;
  pTq->walLogLastVer = pVnode->pWal->vers.lastVer;
  if (pTq->path == NULL) {
    goto _oom;
  }

  pTq->pHandle = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);
  if (pTq->pHandle == NULL) {
    goto _oom;
  }
  taosHashSetFreeFp(pTq->pHandle, tqDestroyTqHandle);

  taosInitRWLatch(&pTq->lock);

  pTq->pPushMgr = taosHashInit(64, MurmurHash3_32, false, HASH_NO_LOCK);
  if (pTq->pPushMgr == NULL) {
    goto _oom;
  }

  pTq->pCheckInfo = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);
  if (pTq->pCheckInfo == NULL) {
    goto _oom;
  }
  taosHashSetFreeFp(pTq->pCheckInfo, (FDelete)tDeleteSTqCheckInfo);

  if (tqInitialize(pTq) != TSDB_CODE_SUCCESS) {
    tqClose(pTq);  // releases whatever tqInitialize() managed to open
    return NULL;
  }
  return pTq;

_oom:
  // Only the containers above can exist here; free them directly instead of
  // running the meta/offset/stream close paths on never-opened state.
  terrno = TSDB_CODE_OUT_OF_MEMORY;
  taosHashCleanup(pTq->pCheckInfo);
  taosHashCleanup(pTq->pPushMgr);
  taosHashCleanup(pTq->pHandle);
  taosMemoryFree(pTq->path);
  taosMemoryFree(pTq);
  return NULL;
}

/*
 * Open the persistent parts of a TQ instance in order: subscription meta,
 * offset store and stream meta. Then reload stream tasks, passing the WAL
 * committed version.
 *
 * Returns 0 on success, -1 at the first failing step. Cleanup of whatever was
 * opened is left to the caller (tqOpen() calls tqClose()).
 */
static int32_t tqInitialize(STQ* pTq) {
  if (tqMetaOpen(pTq) < 0) {
    return -1;
  }

  pTq->pOffsetStore = tqOffsetOpen(pTq);
  if (pTq->pOffsetStore == NULL) {
    return -1;
  }

  SVnode* pVnode = pTq->pVnode;
  pTq->pStreamMeta = streamMetaOpen(pTq->path, pTq, (FTaskExpand*)tqExpandTask, pVnode->config.vgId);
  if (pTq->pStreamMeta == NULL) {
    return -1;
  }

  // The task version is kept in each task's own meta data.
  // TODO: check whether passing the committed WAL version here is still required.
  return (streamLoadTasks(pTq->pStreamMeta, walGetCommittedVer(pVnode->pWal)) < 0) ? -1 : 0;
}
L
Liu Jicong 已提交
143

L
Liu Jicong 已提交
144
/*
 * Release a TQ instance and everything it owns. A NULL pTq is ignored.
 * tqOpen() also uses this to discard an instance whose tqInitialize() failed.
 */
void tqClose(STQ* pTq) {
  if (pTq == NULL) return;

  tqOffsetClose(pTq->pOffsetStore);

  // in-memory tables (the handle table's free callback releases each handle)
  taosHashCleanup(pTq->pHandle);
  taosHashCleanup(pTq->pPushMgr);
  taosHashCleanup(pTq->pCheckInfo);

  taosMemoryFree(pTq->path);

  tqMetaClose(pTq);
  streamMetaClose(pTq->pStreamMeta);

  taosMemoryFree(pTq);
}
L
Liu Jicong 已提交
158

159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181
/*
 * Report whether any task registered in pMeta currently has
 * status.timerActive == 1. All tasks are scanned under the meta write latch.
 */
static bool hasStreamTaskInTimer(SStreamMeta* pMeta) {
  bool anyActive = false;

  taosWLockLatch(&pMeta->lock);
  for (void* it = taosHashIterate(pMeta->pTasks, NULL); it != NULL; it = taosHashIterate(pMeta->pTasks, it)) {
    SStreamTask* pTask = *(SStreamTask**)it;
    if (pTask->status.timerActive == 1) {
      anyActive = true;
    }
  }
  taosWUnLockLatch(&pMeta->lock);

  return anyActive;
}

H
Haojun Liao 已提交
182 183 184 185 186 187 188 189 190 191 192 193
/*
 * Prepare the stream tasks of a TQ instance for shutdown:
 *  1. under the stream-meta write latch, set every task to TASK_STATUS__STOP
 *     and kill its executor;
 *  2. poll every 100ms until no task reports an active timer.
 * A NULL pTq is ignored.
 */
void tqNotifyClose(STQ* pTq) {
  if (pTq == NULL) {
    return;
  }

  SStreamMeta* pMeta = pTq->pStreamMeta;

  taosWLockLatch(&pMeta->lock);
  for (void* pIter = taosHashIterate(pMeta->pTasks, NULL); pIter != NULL;
       pIter = taosHashIterate(pMeta->pTasks, pIter)) {
    SStreamTask* pTask = *(SStreamTask**)pIter;
    tqDebug("vgId:%d s-task:%s set closing flag", pMeta->vgId, pTask->id.idStr);
    pTask->status.taskStatus = TASK_STATUS__STOP;

    int64_t killStart = taosGetTimestampMs();
    qKillTask(pTask->exec.pExecutor, TSDB_CODE_SUCCESS);
    int64_t killCost = taosGetTimestampMs() - killStart;
    tqDebug("vgId:%d s-task:%s is closed in %" PRId64 " ms", pMeta->vgId, pTask->id.idStr, killCost);
  }
  taosWUnLockLatch(&pMeta->lock);

  tqDebug("vgId:%d start to check all tasks", pMeta->vgId);
  int64_t waitStart = taosGetTimestampMs();

  // wait for outstanding timers before the caller proceeds with the close
  while (hasStreamTaskInTimer(pMeta)) {
    tqDebug("vgId:%d some tasks in timer, wait for 100ms and recheck", pMeta->vgId);
    taosMsleep(100);
  }

  int64_t waitCost = taosGetTimestampMs() - waitStart;
  tqDebug("vgId:%d all stream tasks are not in timer, continue close, elapsed time:%" PRId64 " ms", pMeta->vgId,
          waitCost);
}

D
dapan1121 已提交
220 221
static int32_t doSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* pRsp, int32_t epoch,
                             int64_t consumerId, int32_t type) {
L
Liu Jicong 已提交
222 223
  int32_t len = 0;
  int32_t code = 0;
D
dapan1121 已提交
224 225

  if (type == TMQ_MSG_TYPE__POLL_RSP) {
H
Haojun Liao 已提交
226
    tEncodeSize(tEncodeMqDataRsp, pRsp, len, code);
D
dapan1121 已提交
227 228 229
  } else if (type == TMQ_MSG_TYPE__TAOSX_RSP) {
    tEncodeSize(tEncodeSTaosxRsp, (STaosxRsp*)pRsp, len, code);
  }
L
Liu Jicong 已提交
230 231 232 233 234 235 236 237 238 239 240

  if (code < 0) {
    return -1;
  }

  int32_t tlen = sizeof(SMqRspHead) + len;
  void*   buf = rpcMallocCont(tlen);
  if (buf == NULL) {
    return -1;
  }

D
dapan1121 已提交
241 242 243
  ((SMqRspHead*)buf)->mqMsgType = type;
  ((SMqRspHead*)buf)->epoch = epoch;
  ((SMqRspHead*)buf)->consumerId = consumerId;
L
Liu Jicong 已提交
244 245 246 247 248 249

  void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));

  SEncoder encoder = {0};
  tEncoderInit(&encoder, abuf, len);

D
dapan1121 已提交
250
  if (type == TMQ_MSG_TYPE__POLL_RSP) {
H
Haojun Liao 已提交
251
    tEncodeMqDataRsp(&encoder, pRsp);
D
dapan1121 已提交
252
  } else if (type == TMQ_MSG_TYPE__TAOSX_RSP) {
X
Xiaoyu Wang 已提交
253
    tEncodeSTaosxRsp(&encoder, (STaosxRsp*)pRsp);
dengyihao's avatar
dengyihao 已提交
254
  }
L
Liu Jicong 已提交
255

wmmhello's avatar
wmmhello 已提交
256
  tEncoderClear(&encoder);
L
Liu Jicong 已提交
257 258

  SRpcMsg rsp = {
D
dapan1121 已提交
259
      .info = *pRpcHandleInfo,
L
Liu Jicong 已提交
260 261 262 263
      .pCont = buf,
      .contLen = tlen,
      .code = 0,
  };
L
Liu Jicong 已提交
264

L
Liu Jicong 已提交
265
  tmsgSendRsp(&rsp);
L
Liu Jicong 已提交
266 267
  return 0;
}
L
Liu Jicong 已提交
268

H
Haojun Liao 已提交
269
/*
 * Answer the poll request parked on pHandle->msg with a zero-initialized
 * SMqDataRsp. The response carries the handle's consumer id and epoch plus the
 * WAL valid version range. Always returns 0.
 */
int32_t tqPushDataRsp(STqHandle* pHandle, int32_t vgId) {
  SMqDataRsp rsp = {0};
  rsp.head.consumerId = pHandle->consumerId;
  rsp.head.epoch = pHandle->epoch;
  rsp.head.mqMsgType = TMQ_MSG_TYPE__POLL_RSP;

  int64_t firstVer = 0;
  int64_t lastVer = 0;
  walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &firstVer, &lastVer);
  tqDoSendDataRsp(&pHandle->msg->info, &rsp, pHandle->epoch, pHandle->consumerId, TMQ_MSG_TYPE__POLL_RSP, firstVer,
                  lastVer);

  char reqStr[TSDB_OFFSET_LEN] = {0};
  char rspStr[TSDB_OFFSET_LEN] = {0};
  tFormatOffset(reqStr, tListLen(reqStr), &rsp.reqOffset);
  tFormatOffset(rspStr, tListLen(rspStr), &rsp.rspOffset);
  tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) push rsp, block num: %d, req:%s, rsp:%s", vgId,
          rsp.head.consumerId, rsp.head.epoch, rsp.blockNum, reqStr, rspStr);
  return 0;
}

289 290 291 292 293 294
/*
 * Send pRsp as the reply to poll request pMsg/pReq.
 *
 * The response is tagged with the request's epoch and consumer id and the
 * handle's WAL valid version range. The request/response offsets are logged.
 * Always returns 0.
 */
int32_t tqSendDataRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp,
                      int32_t type, int32_t vgId) {
  int64_t firstVer = 0;
  int64_t lastVer = 0;
  walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &firstVer, &lastVer);
  tqDoSendDataRsp(&pMsg->info, pRsp, pReq->epoch, pReq->consumerId, type, firstVer, lastVer);

  char reqStr[TSDB_OFFSET_LEN] = {0};
  char rspStr[TSDB_OFFSET_LEN] = {0};
  tFormatOffset(reqStr, tListLen(reqStr), &pRsp->reqOffset);
  tFormatOffset(rspStr, tListLen(rspStr), &pRsp->rspOffset);

  tqDebug("tmq poll vgId:%d consumer:0x%" PRIx64 " (epoch %d) send rsp, block num:%d, req:%s, rsp:%s, reqId:0x%" PRIx64,
          vgId, pReq->consumerId, pReq->epoch, pRsp->blockNum, reqStr, rspStr, pReq->reqId);
  return 0;
}

307
/**
 * Apply a consumer offset commit for one subscription (msg encodes an SMqVgOffset).
 *
 * Handling depends on the offset type:
 *  - Snapshot offsets are written as-is.
 *  - A log offset whose version equals sversion - 1 is first advanced to
 *    sversion. NOTE(review): presumably so the commit record written at
 *    sversion is not re-consumed; confirm.
 *  - A log offset that is not newer than the already-saved log offset is ignored.
 *
 * @param sversion version of this request, compared against the committed log version
 * @return 0 on success or when no update is needed; -1 on decode failure, invalid
 *         offset type (terrno = TSDB_CODE_INVALID_MSG in both cases) or store write failure
 */
int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  SMqVgOffset vgOffset = {0};
  int32_t     vgId = TD_VID(pTq->pVnode);

  SDecoder decoder;
  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
  int32_t code = tDecodeMqVgOffset(&decoder, &vgOffset);
  tDecoderClear(&decoder);  // release decoder state on both success and failure
  if (code < 0) {
    tqError("vgId:%d failed to decode offset commit msg", vgId);
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  STqOffset* pOffset = &vgOffset.offset;

  if (pOffset->val.type == TMQ_OFFSET__SNAPSHOT_DATA || pOffset->val.type == TMQ_OFFSET__SNAPSHOT_META) {
    tqDebug("receive offset commit msg to %s on vgId:%d, offset(type:snapshot) uid:%" PRId64 ", ts:%" PRId64,
            pOffset->subKey, vgId, pOffset->val.uid, pOffset->val.ts);
  } else if (pOffset->val.type == TMQ_OFFSET__LOG) {
    tqDebug("receive offset commit msg to %s on vgId:%d, offset(type:log) version:%" PRId64, pOffset->subKey, vgId,
            pOffset->val.version);
    if (pOffset->val.version + 1 == sversion) {
      pOffset->val.version += 1;
    }
  } else {
    tqError("invalid commit offset type:%d", pOffset->val.type);
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  STqOffset* pSavedOffset = tqOffsetRead(pTq->pOffsetStore, pOffset->subKey);
  if (pSavedOffset != NULL && tqOffsetLessOrEqual(pOffset, pSavedOffset)) {
    tqDebug("not update the offset, vgId:%d sub:%s since committed:%" PRId64 " less than/equal to existed:%" PRId64,
            vgId, pOffset->subKey, pOffset->val.version, pSavedOffset->val.version);
    return 0;  // no need to update the offset value
  }

  // save the new offset value
  if (tqOffsetWrite(pTq->pOffsetStore, pOffset) < 0) {
    return -1;
  }

  return 0;
}

350
/**
 * Move a subscription's saved offset to the log version requested by its consumer.
 *
 * msg encodes an SMqVgOffset whose offset must be a log offset. The request is
 * rejected if the subscription is unknown or owned by another consumer. Nothing
 * is written when the saved offset is not a log offset or already equals the
 * requested version. Otherwise the target version is clamped to the WAL valid
 * range [sver, ever] and written to the offset store.
 *
 * @return 0 on success or when nothing needs to change; -1 on failure
 *         (terrno is set for decode/validation/ownership errors)
 */
int32_t tqProcessSeekReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  SMqVgOffset vgOffset = {0};
  int32_t     vgId = TD_VID(pTq->pVnode);

  SDecoder decoder;
  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
  int32_t code = tDecodeMqVgOffset(&decoder, &vgOffset);
  tDecoderClear(&decoder);  // release decoder state on both success and failure
  if (code < 0) {
    tqError("vgId:%d failed to decode seek msg", vgId);
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  tqDebug("topic:%s, vgId:%d process offset seek by consumer:0x%" PRIx64 ", req offset:%" PRId64,
          vgOffset.offset.subKey, vgId, vgOffset.consumerId, vgOffset.offset.val.version);

  STqOffset* pOffset = &vgOffset.offset;
  if (pOffset->val.type != TMQ_OFFSET__LOG) {
    tqError("vgId:%d, subKey:%s invalid seek offset type:%d", vgId, pOffset->subKey, pOffset->val.type);
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  // 1. find the subscription handle
  STqHandle* pHandle = taosHashGet(pTq->pHandle, pOffset->subKey, strlen(pOffset->subKey));
  if (pHandle == NULL) {
    tqError("tmq seek: consumer:0x%" PRIx64 " vgId:%d subkey %s not found", vgOffset.consumerId, vgId, pOffset->subKey);
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  // 2. check consumer-vg assignment status
  taosRLockLatch(&pTq->lock);
  if (pHandle->consumerId != vgOffset.consumerId) {
    tqDebug("ERROR tmq seek: consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64,
            vgOffset.consumerId, vgId, pOffset->subKey, pHandle->consumerId);
    terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
    taosRUnLockLatch(&pTq->lock);
    return -1;
  }
  taosRUnLockLatch(&pTq->lock);

  // 3. check the offset info
  STqOffset* pSavedOffset = tqOffsetRead(pTq->pOffsetStore, pOffset->subKey);
  if (pSavedOffset != NULL) {
    if (pSavedOffset->val.type != TMQ_OFFSET__LOG) {
      tqError("invalid saved offset type, vgId:%d sub:%s", vgId, pOffset->subKey);
      return 0;  // no need to update the offset value
    }

    if (pSavedOffset->val.version == pOffset->val.version) {
      tqDebug("vgId:%d subKey:%s no need to seek to %" PRId64 " prev offset:%" PRId64, vgId, pOffset->subKey,
              pOffset->val.version, pSavedOffset->val.version);
      return 0;
    }
  }

  // 4. clamp the target version into the WAL's valid range
  int64_t sver = 0, ever = 0;
  walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever);
  if (pOffset->val.version < sver) {
    pOffset->val.version = sver;
  } else if (pOffset->val.version > ever) {
    pOffset->val.version = ever;
  }

  // 5. save the new offset value
  if (pSavedOffset != NULL) {
    tqDebug("vgId:%d sub:%s seek to:%" PRId64 " prev offset:%" PRId64, vgId, pOffset->subKey, pOffset->val.version,
            pSavedOffset->val.version);
  } else {
    tqDebug("vgId:%d sub:%s seek to:%" PRId64 " not saved yet", vgId, pOffset->subKey, pOffset->val.version);
  }

  if (tqOffsetWrite(pTq->pOffsetStore, pOffset) < 0) {
    tqError("failed to save offset, vgId:%d sub:%s seek to %" PRId64, vgId, pOffset->subKey, pOffset->val.version);
    return -1;
  }

  tqDebug("topic:%s, vgId:%d consumer:0x%" PRIx64 " offset is update to:%" PRId64, vgOffset.offset.subKey, vgId,
          vgOffset.consumerId, vgOffset.offset.val.version);

  return 0;
}

L
Liu Jicong 已提交
432
/*
 * Return -1 if colId appears in the colIdList of any check-info entry whose
 * ntbUid equals tbUid (the column may not be modified). Otherwise return 0.
 */
int32_t tqCheckColModifiable(STQ* pTq, int64_t tbUid, int32_t colId) {
  for (void* pIter = taosHashIterate(pTq->pCheckInfo, NULL); pIter != NULL;
       pIter = taosHashIterate(pTq->pCheckInfo, pIter)) {
    STqCheckInfo* pCheck = (STqCheckInfo*)pIter;
    if (pCheck->ntbUid != tbUid) {
      continue;
    }

    int32_t numOfCols = taosArrayGetSize(pCheck->colIdList);
    for (int32_t idx = 0; idx < numOfCols; ++idx) {
      int16_t lockedColId = *(int16_t*)taosArrayGet(pCheck->colIdList, idx);
      if (lockedColId == colId) {
        // stopping the scan early, so the iteration must be cancelled explicitly
        taosHashCancelIterate(pTq->pCheckInfo, pIter);
        return -1;
      }
    }
  }

  return 0;
}

458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482
/**
 * Re-queue every poll request parked in pTq->pPushMgr, then empty the push manager.
 *
 * Each parked message is put on the vnode's QUERY_QUEUE as a
 * TDMT_VND_TMQ_CONSUME message. Its body (pCont) is handed to the queued
 * message; only the SRpcMsg wrapper is freed here. Runs under pTq->lock.
 *
 * @param pMsg unused
 * @return always 0
 */
int32_t tqProcessPollPush(STQ* pTq, SRpcMsg* pMsg) {
  int32_t vgId = TD_VID(pTq->pVnode);
  taosWLockLatch(&pTq->lock);
  if (taosHashGetSize(pTq->pPushMgr) > 0) {
    for (void* pIter = taosHashIterate(pTq->pPushMgr, NULL); pIter != NULL;
         pIter = taosHashIterate(pTq->pPushMgr, pIter)) {
      STqHandle* pHandle = *(STqHandle**)pIter;
      tqDebug("vgId:%d start set submit for pHandle:%p, consumer:0x%" PRIx64, vgId, pHandle, pHandle->consumerId);

      if (ASSERT(pHandle->msg != NULL)) {
        // Skip only this entry. Breaking out here would abandon the iterator
        // without taosHashCancelIterate and, because the push manager is
        // cleared below, drop every remaining parked poll without re-queuing it.
        tqError("pHandle->msg should not be null");
        continue;
      }

      SRpcMsg msg = {.msgType = TDMT_VND_TMQ_CONSUME,
                     .pCont = pHandle->msg->pCont,
                     .contLen = pHandle->msg->contLen,
                     .info = pHandle->msg->info};
      tmsgPutToQueue(&pTq->pVnode->msgCb, QUERY_QUEUE, &msg);
      taosMemoryFree(pHandle->msg);
      pHandle->msg = NULL;
    }

    taosHashClear(pTq->pPushMgr);
  }
  taosWUnLockLatch(&pTq->lock);
  return 0;
}

D
dapan1121 已提交
487
/**
 * Handle a consumer poll request carried in pMsg.
 *
 * Steps:
 *  1. Decode the SMqPollReq.
 *  2. Under pTq->lock, find the handle for req.subKey and verify that
 *     req.consumerId owns it.
 *  3. Mark the handle executing. If another poll is already executing on it,
 *     release the lock, sleep 10ms and retry.
 *  4. Raise the handle epoch to the request epoch if the request is newer.
 *  5. Serve the request via tqExtractDataForMq(), then mark the handle idle.
 *
 * @return the result of tqExtractDataForMq(), or -1 with terrno set when the
 *         request cannot be decoded, the subscription is unknown, or the
 *         consumer does not own it
 */
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
  SMqPollReq req = {0};
  int32_t    code = 0;
  if (tDeserializeSMqPollReq(pMsg->pCont, pMsg->contLen, &req) < 0) {
    tqError("tDeserializeSMqPollReq %d failed", pMsg->contLen);
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  int64_t      consumerId = req.consumerId;
  int32_t      reqEpoch = req.epoch;
  STqOffsetVal reqOffset = req.reqOffset;
  int32_t      vgId = TD_VID(pTq->pVnode);
  STqHandle*   pHandle = NULL;

  while (1) {
    taosWLockLatch(&pTq->lock);
    // 1. find handle
    pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
    if (pHandle == NULL) {
      tqError("tmq poll: consumer:0x%" PRIx64 " vgId:%d subkey %s not found", consumerId, vgId, req.subKey);
      terrno = TSDB_CODE_INVALID_MSG;
      taosWUnLockLatch(&pTq->lock);
      return -1;
    }

    // 2. check re-balance status
    if (pHandle->consumerId != consumerId) {
      tqError("ERROR tmq poll: consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64,
              consumerId, vgId, req.subKey, pHandle->consumerId);
      terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
      taosWUnLockLatch(&pTq->lock);
      return -1;
    }

    // 3. claim the handle: only one poll may execute on a handle at a time
    if (!tqIsHandleExec(pHandle)) {
      tqSetHandleExec(pHandle);
      tqDebug("tmq poll: consumer:0x%" PRIx64 " vgId:%d, topic:%s, set handle exec, pHandle:%p", consumerId, vgId,
              req.subKey, pHandle);
      taosWUnLockLatch(&pTq->lock);
      break;
    }
    taosWUnLockLatch(&pTq->lock);

    tqDebug("tmq poll: consumer:0x%" PRIx64
            " vgId:%d, topic:%s, subscription is executing, wait for 10ms and retry, pHandle:%p",
            consumerId, vgId, req.subKey, pHandle);
    taosMsleep(10);
  }

  // 4. update the epoch value
  if (pHandle->epoch < reqEpoch) {
    tqDebug("tmq poll: consumer:0x%" PRIx64 " epoch update from %d to %d by poll req", consumerId, pHandle->epoch,
            reqEpoch);
    pHandle->epoch = reqEpoch;
  }

  char buf[TSDB_OFFSET_LEN] = {0};
  tFormatOffset(buf, TSDB_OFFSET_LEN, &reqOffset);
  tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey %s, recv poll req vgId:%d, req:%s, reqId:0x%" PRIx64,
          consumerId, req.epoch, pHandle->subKey, vgId, buf, req.reqId);

  // 5. serve the request and release the handle
  code = tqExtractDataForMq(pTq, pHandle, &req, pMsg);
  tqSetHandleIdle(pHandle);

  tqDebug("tmq poll: consumer:0x%" PRIx64 " vgId:%d, topic:%s, set handle idle, pHandle:%p", consumerId, vgId,
          req.subKey, pHandle);
  return code;
}

560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617
/**
 * Answer a consumer's WAL-info request for one subscription.
 *
 * Replies with a TMQ_MSG_TYPE__WALINFO_RSP carrying the WAL valid version range
 * [sver, ever] and a log offset chosen as follows:
 *  - the offset saved in the offset store, which must be a log offset;
 *  - otherwise, derived from the request's offset type:
 *    - LOG: the reader's current version, or the requested version if reading
 *      has not started;
 *    - RESET_EARLIEST: sver;
 *    - RESET_LATEST: ever.
 * Snapshot-based subscriptions are rejected.
 *
 * @return 0 after the response is sent; -1 with terrno set otherwise
 */
int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) {
  SMqPollReq req = {0};
  if (tDeserializeSMqPollReq(pMsg->pCont, pMsg->contLen, &req) < 0) {
    tqError("tDeserializeSMqPollReq %d failed", pMsg->contLen);
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  int64_t      consumerId = req.consumerId;
  STqOffsetVal reqOffset = req.reqOffset;
  int32_t      vgId = TD_VID(pTq->pVnode);

  // 1. find handle
  STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
  if (pHandle == NULL) {
    tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s not found", consumerId, vgId, req.subKey);
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  // 2. check re-balance status
  taosRLockLatch(&pTq->lock);
  if (pHandle->consumerId != consumerId) {
    tqDebug("ERROR consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64,
            consumerId, vgId, req.subKey, pHandle->consumerId);
    terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
    taosRUnLockLatch(&pTq->lock);
    return -1;
  }
  taosRUnLockLatch(&pTq->lock);

  int64_t sver = 0, ever = 0;
  walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever);

  SMqDataRsp dataRsp = {0};
  tqInitDataRsp(&dataRsp, &req);

  int32_t    code = TSDB_CODE_SUCCESS;
  STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, req.subKey);
  if (pOffset != NULL) {
    if (pOffset->val.type != TMQ_OFFSET__LOG) {
      tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s use snapshot, no valid wal info", consumerId, vgId, req.subKey);
      code = TSDB_CODE_INVALID_PARA;
      goto _end;
    }

    dataRsp.rspOffset.type = TMQ_OFFSET__LOG;
    dataRsp.rspOffset.version = pOffset->val.version;
  } else {
    if (req.useSnapshot == true) {
      tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s snapshot not support wal info", consumerId, vgId, req.subKey);
      code = TSDB_CODE_INVALID_PARA;
      goto _end;
    }

    dataRsp.rspOffset.type = TMQ_OFFSET__LOG;

    if (reqOffset.type == TMQ_OFFSET__LOG) {
      int64_t currentVer = walReaderGetCurrentVer(pHandle->execHandle.pTqReader->pWalReader);
      if (currentVer == -1) {  // not start to read data from wal yet, return req offset directly
        dataRsp.rspOffset.version = reqOffset.version;
      } else {
        dataRsp.rspOffset.version = currentVer;  // return current consume offset value
      }
    } else if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEST) {
      dataRsp.rspOffset.version = sver;  // not consume yet, set the earliest position
    } else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) {
      dataRsp.rspOffset.version = ever;
    } else {
      tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s invalid offset type:%d", consumerId, vgId, req.subKey,
              reqOffset.type);
      code = TSDB_CODE_INVALID_PARA;
      goto _end;
    }
  }

  tqDoSendDataRsp(&pMsg->info, &dataRsp, req.epoch, req.consumerId, TMQ_MSG_TYPE__WALINFO_RSP, sver, ever);

_end:
  // dataRsp was set up by tqInitDataRsp(); release it on every path, including success
  tDeleteMqDataRsp(&dataRsp);
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    return -1;
  }
  return 0;
}

642
/**
 * Drop a subscription (pReq->subKey) from this vnode.
 *
 * Steps:
 *  1. Wait until no poll is executing on its handle. The wait happens outside
 *     pTq->lock.
 *  2. Release the handle's WAL ref and unregister it from the push manager.
 *  3. Remove it from pTq->pHandle.
 *  4. Delete its saved offset and its meta record.
 *
 * Missing pieces are logged, not treated as errors.
 *
 * @return always 0
 */
int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  SMqVDeleteReq* pReq = (SMqVDeleteReq*)msg;
  int32_t        vgId = TD_VID(pTq->pVnode);

  tqDebug("vgId:%d, tq process delete sub req %s", vgId, pReq->subKey);
  int32_t code = 0;

  // Every iteration starts holding pTq->lock; each exit via `break` keeps it held
  // for the offset/meta cleanup below.
  while (1) {
    taosWLockLatch(&pTq->lock);
    STqHandle* pHandle = taosHashGet(pTq->pHandle, pReq->subKey, strlen(pReq->subKey));
    if (pHandle == NULL) {
      break;
    }

    if (tqIsHandleExec(pHandle)) {
      // Never sleep while holding pTq->lock: every poll/push/seek on this vnode needs it.
      tqDebug("vgId:%d, topic:%s, subscription is executing, wait for 10ms and retry, pHandle:%p", vgId,
              pHandle->subKey, pHandle);
      taosWUnLockLatch(&pTq->lock);
      taosMsleep(10);
      continue;
    }

    if (pHandle->pRef) {
      walCloseRef(pTq->pVnode->pWal, pHandle->pRef->refId);
    }

    // The push manager keeps raw STqHandle pointers; drop ours before the hash frees the handle.
    tqUnregisterPushHandle(pTq, pHandle);

    code = taosHashRemove(pTq->pHandle, pReq->subKey, strlen(pReq->subKey));
    if (code != 0) {
      tqError("cannot process tq delete req %s, since no such handle", pReq->subKey);
    }
    break;
  }

  code = tqOffsetDelete(pTq->pOffsetStore, pReq->subKey);
  if (code != 0) {
    tqError("cannot process tq delete req %s, since no such offset in cache", pReq->subKey);
  }

  if (tqMetaDeleteHandle(pTq, pReq->subKey) < 0) {
    tqError("cannot process tq delete req %s, since no such offset in tdb", pReq->subKey);
  }
  taosWUnLockLatch(&pTq->lock);

  return 0;
}

681
/**
 * Register a column-modification check (msg encodes an STqCheckInfo).
 *
 * The decoded entry is stored in pTq->pCheckInfo keyed by its topic, and the
 * raw message is persisted via tqMetaSaveCheckInfo().
 *
 * @return 0 on success; -1 with terrno = TSDB_CODE_OUT_OF_MEMORY on any failure
 */
int32_t tqProcessAddCheckInfoReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  STqCheckInfo info = {0};
  SDecoder     decoder;
  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
  int32_t code = tDecodeSTqCheckInfo(&decoder, &info);
  tDecoderClear(&decoder);  // release decoder state on both success and failure
  if (code < 0) {
    tDeleteSTqCheckInfo(&info);  // drop anything allocated before decoding failed
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  if (taosHashPut(pTq->pCheckInfo, info.topic, strlen(info.topic), &info, sizeof(STqCheckInfo)) < 0) {
    tDeleteSTqCheckInfo(&info);  // the table did not take ownership of colIdList
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  if (tqMetaSaveCheckInfo(pTq, info.topic, msg, msgLen) < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  return 0;
}

701
/*
 * Remove the check info whose topic is the NUL-terminated string msg.
 *
 * The entry is removed from the in-memory pCheckInfo table first and then
 * from meta; meta is not touched if the in-memory removal fails. sversion and
 * msgLen are unused. Any failure returns -1 with
 * terrno = TSDB_CODE_OUT_OF_MEMORY.
 */
int32_t tqProcessDelCheckInfoReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  bool failed = taosHashRemove(pTq->pCheckInfo, msg, strlen(msg)) < 0 || tqMetaDeleteCheckInfo(pTq, msg) < 0;
  if (failed) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  return 0;
}

713
/*
 * Apply a re-balance (subscribe) request for one subscription on this vnode.
 *
 * The handle for req.subKey is looked up in pTq->pHandle. While it is absent
 * and tqMetaGetHandle() succeeds, the lookup is retried.
 *  - Existing handle: under pTq->lock, switch its consumerId to
 *    req.newConsumerId and unregister it from the push manager, then persist
 *    it with tqMetaSaveHandle().
 *  - No handle: build one with tqCreateHandle() for req.newConsumerId (-1 is
 *    rejected) and persist it.
 *
 * Returns the handle creation/persistence result (0 when nothing was done), or
 * -1 with terrno = TSDB_CODE_INVALID_MSG if the request cannot be decoded.
 */
int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  int32_t     ret = 0;
  SMqRebVgReq req = {0};
  SDecoder    dc = {0};

  tDecoderInit(&dc, (uint8_t*)msg, msgLen);
  if (tDecodeSMqRebVgReq(&dc, &req) < 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    tDecoderClear(&dc);
    return -1;
  }

  tqDebug("vgId:%d, tq process sub req:%s, Id:0x%" PRIx64 " -> Id:0x%" PRIx64, pTq->pVnode->config.vgId, req.subKey,
          req.oldConsumerId, req.newConsumerId);

  STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
  while (pHandle == NULL && tqMetaGetHandle(pTq, req.subKey) >= 0) {
    pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
  }

  if (pHandle != NULL) {
    taosWLockLatch(&pTq->lock);

    if (pHandle->consumerId == req.newConsumerId) {  // do nothing
      tqInfo("vgId:%d consumer:0x%" PRIx64 " remains, no switch occurs, should not reach here", req.vgId,
             req.newConsumerId);
    } else {
      tqInfo("vgId:%d switch consumer from Id:0x%" PRIx64 " to Id:0x%" PRIx64, req.vgId, pHandle->consumerId,
             req.newConsumerId);
      atomic_store_64(&pHandle->consumerId, req.newConsumerId);
    }

    // remove if it has been registered in the push manager, and return one empty block to consumer
    tqUnregisterPushHandle(pTq, pHandle);
    taosWUnLockLatch(&pTq->lock);

    ret = tqMetaSaveHandle(pTq, req.subKey, pHandle);
  } else {
    if (req.oldConsumerId != -1) {
      tqError("vgId:%d, build new consumer handle %s for consumer:0x%" PRIx64 ", but old consumerId:0x%" PRIx64,
              req.vgId, req.subKey, req.newConsumerId, req.oldConsumerId);
    }
    if (req.newConsumerId == -1) {
      tqError("vgId:%d, tq invalid re-balance request, new consumerId %" PRId64, req.vgId, req.newConsumerId);
      goto end;
    }

    STqHandle handle = {0};
    ret = tqCreateHandle(pTq, &req, &handle);
    if (ret < 0) {
      tqDestroyTqHandle(&handle);
    } else {
      ret = tqMetaSaveHandle(pTq, req.subKey, &handle);
    }
  }

end:
  tDecoderClear(&dc);
  return ret;
}
788

dengyihao's avatar
dengyihao 已提交
789
/* Free callback for hash entries whose value is a heap pointer (installed via tSimpleHashSetFreeFp in
 * tqExpandTask): `ptr` addresses the stored pointer, so release what that slot points to. */
void freePtr(void* ptr) {
  void** ppSlot = (void**)ptr;
  taosMemoryFree(*ppSlot);
}
L
liuyao 已提交
790

791
/*
 * Open the state backend of a stream task and store it in pTask->pState.
 * A fill-history task opens the state with a temporary descriptor that carries the id of its related stream task
 * (pTask->streamTaskId) instead of its own id.
 * Requires pTask->pMeta to be set already. Returns 0 on success, -1 if streamStateOpen() fails.
 */
static int32_t tqExpandTaskOpenState(STQ* pTq, SStreamTask* pTask) {
  SStreamTask  proxy = {0};
  SStreamTask* pStateOwner = pTask;

  if (pTask->info.fillHistory) {
    proxy.id = pTask->streamTaskId;
    proxy.pMeta = pTask->pMeta;
    pStateOwner = &proxy;
  }

  pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pStateOwner, false, -1, -1);
  return (pTask->pState == NULL) ? -1 : 0;
}

/*
 * Expand a decoded stream task into a runnable one. This:
 *  - builds its id string and input/output queues,
 *  - initializes the checkpoint info and data version range to `ver`,
 *  - for source/agg tasks, opens the state backend and creates the stream executor,
 *  - sets up the SMA or table sink,
 *  - for source tasks, opens a WAL reader.
 *
 * @param pTq   owning TQ
 * @param pTask task to expand (modified in place)
 * @param ver   version used to initialize chkInfo and dataRange.range; chkInfo.version ends as ver + 1
 * @return 0 on success, -1 on failure (resources created before the failure remain attached to pTask)
 */
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
  int32_t vgId = TD_VID(pTq->pVnode);

  pTask->id.idStr = createStreamTaskIdStr(pTask->id.streamId, pTask->id.taskId);
  pTask->refCnt = 1;
  pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE;
  pTask->inputQueue = streamQueueOpen(512 << 10);
  pTask->outputQueue = streamQueueOpen(512 << 10);

  if (pTask->inputQueue == NULL || pTask->outputQueue == NULL) {
    return -1;
  }

  pTask->inputStatus = TASK_INPUT_STATUS__NORMAL;
  pTask->outputStatus = TASK_OUTPUT_STATUS__NORMAL;
  pTask->pMsgCb = &pTq->pVnode->msgCb;
  pTask->pMeta = pTq->pStreamMeta;  // must be set before tqExpandTaskOpenState()

  pTask->chkInfo.version = ver;
  pTask->chkInfo.currentVer = ver;

  pTask->dataRange.range.maxVer = ver;
  pTask->dataRange.range.minVer = ver;

  if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
    if (tqExpandTaskOpenState(pTq, pTask) < 0) {
      return -1;
    }

    SReadHandle handle = {.vnode = pTq->pVnode,
                          .initTqReader = 1,
                          .pStateBackend = pTask->pState,
                          .fillHistory = pTask->info.fillHistory,
                          .winRange = pTask->dataRange.window};
    initStorageAPI(&handle.api);

    pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, vgId);
    if (pTask->exec.pExecutor == NULL) {
      return -1;
    }

    qSetTaskId(pTask->exec.pExecutor, pTask->id.taskId, pTask->id.streamId);
  } else if (pTask->info.taskLevel == TASK_LEVEL__AGG) {
    if (tqExpandTaskOpenState(pTq, pTask) < 0) {
      return -1;
    }

    int32_t     numOfVgroups = (int32_t)taosArrayGetSize(pTask->pUpstreamEpInfoList);
    SReadHandle handle = {.vnode = NULL,
                          .numOfVgroups = numOfVgroups,
                          .pStateBackend = pTask->pState,
                          .fillHistory = pTask->info.fillHistory,
                          .winRange = pTask->dataRange.window};
    initStorageAPI(&handle.api);

    pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, vgId);
    if (pTask->exec.pExecutor == NULL) {
      return -1;
    }

    qSetTaskId(pTask->exec.pExecutor, pTask->id.taskId, pTask->id.streamId);
  }

  // sink
  if (pTask->outputType == TASK_OUTPUT__SMA) {
    pTask->smaSink.vnode = pTq->pVnode;
    pTask->smaSink.smaSink = smaHandleRes;
  } else if (pTask->outputType == TASK_OUTPUT__TABLE) {
    pTask->tbSink.vnode = pTq->pVnode;
    pTask->tbSink.tbSinkFunc = tqSinkToTablePipeline;

    // schema version of the target super table; stays 1 when metaGetInfo() fails
    int32_t   ver1 = 1;
    SMetaInfo info = {0};
    int32_t   code = metaGetInfo(pTq->pVnode->pMeta, pTask->tbSink.stbUid, &info, NULL);
    if (code == TSDB_CODE_SUCCESS) {
      ver1 = info.skmVer;
    }

    SSchemaWrapper* pschemaWrapper = pTask->tbSink.pSchemaWrapper;
    pTask->tbSink.pTSchema = tBuildTSchema(pschemaWrapper->pSchema, pschemaWrapper->nCols, ver1);
    if (pTask->tbSink.pTSchema == NULL) {
      return -1;
    }

    pTask->tbSink.pTblInfo = tSimpleHashInit(10240, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
    if (pTask->tbSink.pTblInfo == NULL) {
      return -1;
    }
    tSimpleHashSetFreeFp(pTask->tbSink.pTblInfo, freePtr);
  }

  if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
    SWalFilterCond cond = {.deleteMsg = 1};  // delete msg also extract from wal files
    pTask->exec.pWalReader = walOpenReader(pTq->pVnode->pWal, &cond);
    if (pTask->exec.pWalReader == NULL) {
      // the reader is used later unconditionally (e.g. walReaderSetSkipToVersion on resume), so fail early
      tqError("vgId:%d s-task:%s failed to open wal reader", vgId, pTask->id.idStr);
      return -1;
    }
  }

  streamSetupScheduleTrigger(pTask);

  tqInfo("vgId:%d expand stream task, s-task:%s, checkpoint ver:%" PRId64
         " child id:%d, level:%d, scan-history:%d, trigger:%" PRId64 " ms",
         vgId, pTask->id.idStr, pTask->chkInfo.version, pTask->info.selfChildId, pTask->info.taskLevel,
         pTask->info.fillHistory, pTask->triggerParam);

  // next valid version will add one
  pTask->chkInfo.version += 1;
  return 0;
}
L
Liu Jicong 已提交
911

912
/*
 * Handle a downstream-readiness check sent by an upstream task and reply with an SStreamTaskCheckRsp.
 * The reply status comes from streamTaskCheckStatus() when the addressed (downstream) task exists locally, and is 0
 * when it has not been built yet.
 *
 * @return 0 when the response has been sent; -1 on decode, encode or allocation failure.
 */
int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) {
  char*   msgStr = pMsg->pCont;
  char*   msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
  int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);

  // zero-initialized so no field is indeterminate if decoding stops early
  SStreamTaskCheckReq req = {0};
  SDecoder            decoder;

  tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
  int32_t code = tDecodeStreamTaskCheckReq(&decoder, &req);
  tDecoderClear(&decoder);
  if (code < 0) {
    tqError("vgId:%d failed to decode task check req", pTq->pStreamMeta->vgId);
    return -1;
  }

  int32_t taskId = req.downstreamTaskId;

  SStreamTaskCheckRsp rsp = {
      .reqId = req.reqId,
      .streamId = req.streamId,
      .childId = req.childId,
      .downstreamNodeId = req.downstreamNodeId,
      .downstreamTaskId = req.downstreamTaskId,
      .upstreamNodeId = req.upstreamNodeId,
      .upstreamTaskId = req.upstreamTaskId,
  };

  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);
  if (pTask != NULL) {
    rsp.status = streamTaskCheckStatus(pTask);
    // log before releasing: pTask must not be dereferenced once its reference has been dropped
    tqDebug("s-task:%s recv task check req(reqId:0x%" PRIx64 ") task:0x%x (vgId:%d), status:%s, rsp status %d",
            pTask->id.idStr, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId,
            streamGetTaskStatusStr(pTask->status.taskStatus), rsp.status);
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
  } else {
    rsp.status = 0;
    tqDebug("tq recv task check(taskId:0x%x not built yet) req(reqId:0x%" PRIx64 ") from task:0x%x (vgId:%d), rsp status %d",
            taskId, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);
  }

  SEncoder encoder;
  int32_t  len = 0;

  tEncodeSize(tEncodeStreamTaskCheckRsp, &rsp, len, code);
  if (code < 0) {
    tqError("vgId:%d failed to encode task check rsp, task:0x%x", pTq->pStreamMeta->vgId, taskId);
    return -1;
  }

  void* buf = rpcMallocCont(sizeof(SMsgHead) + len);
  if (buf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    tqError("vgId:%d failed to alloc task check rsp, task:0x%x", pTq->pStreamMeta->vgId, taskId);
    return -1;
  }
  ((SMsgHead*)buf)->vgId = htonl(req.upstreamNodeId);

  void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
  tEncoderInit(&encoder, (uint8_t*)abuf, len);
  code = tEncodeStreamTaskCheckRsp(&encoder, &rsp);
  tEncoderClear(&encoder);
  if (code < 0) {
    rpcFreeCont(buf);
    tqError("vgId:%d failed to encode task check rsp, task:0x%x", pTq->pStreamMeta->vgId, taskId);
    return -1;
  }

  SRpcMsg rspMsg = {.code = 0, .pCont = buf, .contLen = sizeof(SMsgHead) + len, .info = pMsg->info};
  tmsgSendRsp(&rspMsg);
  return 0;
}

975 976 977 978
/*
 * Handle the reply to a downstream check: decode it, then pass it to streamProcessCheckRsp() for the local upstream
 * task it is addressed to.
 *
 * @return the result of streamProcessCheckRsp(); -1 when the reply cannot be decoded or the upstream task cannot be
 *         acquired (terrno is set to TSDB_CODE_STREAM_TASK_NOT_EXIST in that case).
 */
int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t sversion, SRpcMsg* pMsg) {
  char*               pBody = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t             bodyLen = pMsg->contLen - sizeof(SMsgHead);
  SStreamTaskCheckRsp rsp;
  SDecoder            decoder;

  tDecoderInit(&decoder, (uint8_t*)pBody, bodyLen);
  int32_t code = tDecodeStreamTaskCheckRsp(&decoder, &rsp);
  tDecoderClear(&decoder);

  if (code < 0) {
    return -1;
  }

  tqDebug("tq task:0x%x (vgId:%d) recv check rsp(reqId:0x%" PRIx64 ") from 0x%x (vgId:%d) status %d",
          rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.status);

  SStreamMeta* pMeta = pTq->pStreamMeta;
  SStreamTask* pTask = streamMetaAcquireTask(pMeta, rsp.upstreamTaskId);
  if (pTask == NULL) {
    tqError("tq failed to locate the stream task:0x%x (vgId:%d), it may have been destroyed", rsp.upstreamTaskId,
            pMeta->vgId);
    terrno = TSDB_CODE_STREAM_TASK_NOT_EXIST;
    return -1;
  }

  code = streamProcessCheckRsp(pTask, &rsp);
  streamMetaReleaseTask(pMeta, pTask);
  return code;
}

1008
/*
 * Deploy a stream task from an encoded message.
 * The task is decoded into a new allocation and registered in the stream meta with `sversion` as its initial
 * start version. It is then handed to streamPrepareNdoCheckDownstream().
 * Does nothing when stream processing is disabled (tsDisableStream).
 *
 * @return 0 on success or when streams are disabled; -1 on allocation, decode or registration failure.
 */
int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  int32_t vgId = TD_VID(pTq->pVnode);

  if (tsDisableStream) {
    return 0;
  }

  // 1. decode the message into a freshly allocated task
  SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
  if (pTask == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    tqError("vgId:%d failed to create stream task due to out of memory, alloc size:%d", vgId,
            (int32_t)sizeof(SStreamTask));
    return -1;
  }

  SDecoder decoder;
  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
  int32_t code = tDecodeStreamTask(&decoder, pTask);
  tDecoderClear(&decoder);

  if (code < 0) {
    taosMemoryFree(pTask);
    return -1;
  }

  // 2. register the task under the meta lock; sversion becomes its initial start version
  SStreamMeta* pStreamMeta = pTq->pStreamMeta;

  taosWLockLatch(&pStreamMeta->lock);
  code = streamMetaAddDeployedTask(pStreamMeta, sversion, pTask);
  int32_t numOfTasks = streamMetaGetNumOfTasks(pStreamMeta);

  if (code < 0) {
    tqError("vgId:%d failed to add s-task:%s, total:%d", vgId, pTask->id.idStr, numOfTasks);
    taosWUnLockLatch(&pStreamMeta->lock);
    return -1;
  }
  taosWUnLockLatch(&pStreamMeta->lock);

  // 3. prepare the task and check its downstream; per the original note, a fill-history task is not started here
  //    but waits for its main task to start it
  streamPrepareNdoCheckDownstream(pTask);

  tqDebug("vgId:%d s-task:%s is deployed and add into meta, status:%s, numOfTasks:%d", vgId, pTask->id.idStr,
          streamGetTaskStatusStr(pTask->status.taskStatus), numOfTasks);
  return 0;
}

1060
/*
 * Run the history-data scan of a stream task.
 *
 * Step 1 scans history data with streamSourceScanHistoryData(). Afterwards:
 *  - Fill-history task:
 *    - unless pReq->igUntreated is set, halts the related stream task and sets the step-2 version range;
 *    - performs the step-2 scan;
 *    - asks downstream tasks to transfer executor state and runs the task once more;
 *    - marks the task as dropping and persists both tasks.
 *  - Any other task:
 *    - resets or logs its time window;
 *    - reports scan-history completion;
 *    - for a source task, starts extracting data from the WAL.
 *
 * @return 0 on success or when the task is dropped/paused during a scan; -1 when the task (or its related stream
 *         task) cannot be acquired; for non fill-history tasks, the result of streamTaskScanHistoryDataComplete().
 */
int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
  int32_t                code = TSDB_CODE_SUCCESS;
  char*                  msg = pMsg->pCont;
  SStreamMeta*           pMeta = pTq->pStreamMeta;
  SStreamScanHistoryReq* pReq = (SStreamScanHistoryReq*)msg;

  SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->taskId);
  if (pTask == NULL) {
    tqError("vgId:%d failed to acquire stream task:0x%x during stream recover, task may have been destroyed",
            pMeta->vgId, pReq->taskId);
    return -1;
  }

  // do recovery step 1
  // NOTE: pId refers to memory owned by pTask; do not use it after pTask has been released.
  const char* pId = pTask->id.idStr;
  tqDebug("s-task:%s start history data scan stage(step 1), status:%s", pId,
          streamGetTaskStatusStr(pTask->status.taskStatus));

  int64_t st = taosGetTimestampMs();
  int8_t  schedStatus = atomic_val_compare_exchange_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE,
                                                      TASK_SCHED_STATUS__WAITING);
  if (schedStatus != TASK_SCHED_STATUS__INACTIVE) {
    ASSERT(0);
    streamMetaReleaseTask(pMeta, pTask);  // drop the reference acquired above
    return 0;
  }

  if (!streamTaskRecoverScanStep1Finished(pTask)) {
    streamSourceScanHistoryData(pTask);
  }

  if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING || streamTaskShouldPause(&pTask->status)) {
    tqDebug("s-task:%s is dropped or paused, abort recover in step1", pId);
    atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
    streamMetaReleaseTask(pMeta, pTask);
    return 0;
  }

  double el = (taosGetTimestampMs() - st) / 1000.0;
  tqDebug("s-task:%s history data scan stage(step 1) ended, elapsed time:%.2fs", pId, el);

  if (pTask->info.fillHistory) {
    // Version range used by the step-2 scan. It previously stayed NULL and was dereferenced below. The task's own
    // range is used instead; presumably streamHistoryTaskSetVerRangeStep2() updates it (TODO confirm).
    SVersionRange* pRange = &pTask->dataRange.range;
    SStreamTask*   pStreamTask = NULL;

    if (!pReq->igUntreated && !streamTaskRecoverScanStep1Finished(pTask)) {
      // 1. stop the related stream task, get the current scan wal version of stream task, ver.
      pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.taskId);
      if (pStreamTask == NULL) {
        tqError("s-task:%s failed to acquire related stream task:0x%x, it may have been destroyed", pId,
                pTask->streamTaskId.taskId);
        atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
        streamMetaReleaseTask(pMeta, pTask);
        return -1;
      }

      ASSERT(pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE);

      // wait for the stream task get ready for scan history data
      while (((pStreamTask->status.downstreamReady == 0) && (pStreamTask->status.taskStatus != TASK_STATUS__STOP)) ||
             pStreamTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) {
        tqDebug("s-task:%s level:%d related stream task:%s not ready for halt, wait for it and recheck in 100ms", pId,
                pTask->info.taskLevel, pStreamTask->id.idStr);
        taosMsleep(100);
      }

      // now we can stop the stream task execution
      pStreamTask->status.taskStatus = TASK_STATUS__HALT;
      tqDebug("s-task:%s level:%d status is set to halt by history scan task:%s", pStreamTask->id.idStr,
              pStreamTask->info.taskLevel, pId);

      // if it's an source task, extract the last version in wal.
      streamHistoryTaskSetVerRangeStep2(pTask);
    }

    if (!streamTaskRecoverScanStep1Finished(pTask)) {
      tqDebug("s-task:%s level:%d verRange:%" PRId64 " - %" PRId64
              " do secondary scan-history-data after halt the related stream task:%s",
              pId, pTask->info.taskLevel, pRange->minVer, pRange->maxVer,
              (pStreamTask != NULL) ? pStreamTask->id.idStr : "NULL");
      ASSERT(pTask->status.schedStatus == TASK_SCHED_STATUS__WAITING);

      st = taosGetTimestampMs();
      streamSetParamForStreamScannerStep2(pTask, pRange, &pTask->dataRange.window);
    }

    if (!streamTaskRecoverScanStep2Finished(pTask)) {
      streamSourceScanHistoryData(pTask);

      if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING || streamTaskShouldPause(&pTask->status)) {
        tqDebug("s-task:%s is dropped or paused, abort recover in step2", pId);
        // same cleanup as the step-1 abort, plus the related task's reference
        atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
        if (pStreamTask != NULL) {
          streamMetaReleaseTask(pMeta, pStreamTask);
        }
        streamMetaReleaseTask(pMeta, pTask);
        return 0;
      }

      streamTaskRecoverSetAllStepFinished(pTask);
    }

    el = (taosGetTimestampMs() - st) / 1000.0;
    tqDebug("s-task:%s history data scan stage(step 2) ended, elapsed time:%.2fs", pId, el);

    // 3. notify downstream tasks to transfer executor state after handle all history blocks.
    if (!pTask->status.transferState) {
      code = streamDispatchTransferStateMsg(pTask);
      if (code != TSDB_CODE_SUCCESS) {
        tqError("s-task:%s failed to dispatch transfer state msg, code:%d", pId, code);
      }

      pTask->status.transferState = true;
    }

    // 4. 1) transfer the ownership of executor state, 2) update the scan data range for source task.
    // 5. resume the related stream task.
    streamTryExec(pTask);

    pTask->status.taskStatus = TASK_STATUS__DROPPING;
    tqDebug("s-task:%s scan-history-task set status to be dropping", pId);

    streamMetaSaveTask(pMeta, pTask);
    if (pStreamTask != NULL) {
      streamMetaSaveTask(pMeta, pStreamTask);
    }

    streamMetaReleaseTask(pMeta, pTask);
    if (pStreamTask != NULL) {
      streamMetaReleaseTask(pMeta, pStreamTask);
    }

    // pTask has been released: commit through pMeta (the task's meta is pTq->pStreamMeta, set in tqExpandTask)
    taosWLockLatch(&pMeta->lock);
    if (streamMetaCommit(pMeta) < 0) {
      tqError("vgId:%d failed to commit stream meta", pMeta->vgId);
    }
    taosWUnLockLatch(&pMeta->lock);
  } else {
    // todo update the chkInfo version for current task.
    // this task has an associated history stream task, so we need to scan wal from the end version of
    // history scan. The current version of chkInfo.current is not updated during the history scan
    STimeWindow* pWindow = &pTask->dataRange.window;

    if (pTask->historyTaskId.taskId == 0) {
      *pWindow = (STimeWindow){INT64_MIN, INT64_MAX};
      tqDebug("s-task:%s no related scan-history-data task, reset the time window:%" PRId64 " - %" PRId64, pId,
              pWindow->skey, pWindow->ekey);
    } else {
      tqDebug(
          "s-task:%s history data in current time window scan completed, now start to handle data from WAL, start "
          "ver:%" PRId64 ", window:%" PRId64 " - %" PRId64,
          pId, pTask->chkInfo.currentVer, pWindow->skey, pWindow->ekey);
    }

    // notify the downstream agg tasks that upstream tasks are ready to processing the WAL data
    code = streamTaskScanHistoryDataComplete(pTask);

    // read the level before releasing: pTask must not be touched once its reference is dropped
    bool isSourceTask = (pTask->info.taskLevel == TASK_LEVEL__SOURCE);
    streamMetaReleaseTask(pMeta, pTask);

    // let's start the stream task by extracting data from wal
    if (isSourceTask) {
      tqStartStreamTasks(pTq);
    }

    return code;
  }

  return 0;
}

H
Haojun Liao 已提交
1215 1216
/*
 * Handle a state-transfer request, sent once all history blocks have been handled.
 * Releases the executor state of the addressed task, reloads the state in its related stream task, marks the
 * transfer as done and schedules the task for execution.
 *
 * @return 0 on success; -1 when the request cannot be decoded or either task cannot be acquired.
 */
int32_t tqProcessTaskTransferStateReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  SStreamTransferReq req = {0};

  SDecoder decoder;
  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
  int32_t code = tDecodeStreamScanHistoryFinishReq(&decoder, &req);
  tDecoderClear(&decoder);
  if (code < 0) {
    tqError("vgId:%d failed to decode transfer state req", pTq->pStreamMeta->vgId);
    return -1;
  }

  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.taskId);
  if (pTask == NULL) {
    tqError("failed to find task:0x%x, it may have been dropped already", req.taskId);
    return -1;
  }

  // transfer the ownership of executor state
  streamTaskReleaseState(pTask);
  tqDebug("s-task:%s receive state transfer req", pTask->id.idStr);

  // related stream task load the state from the state storage backend
  SStreamTask* pStreamTask = streamMetaAcquireTask(pTq->pStreamMeta, pTask->streamTaskId.taskId);
  if (pStreamTask == NULL) {
    tqError("failed to find related stream task:0x%x, it may have been dropped already", pTask->streamTaskId.taskId);
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);  // do not leak the reference taken above
    return -1;
  }

  streamTaskReloadState(pStreamTask);
  streamMetaReleaseTask(pTq->pStreamMeta, pStreamTask);  // pairs with the acquire above

  ASSERT(pTask->streamTaskId.taskId != 0);
  pTask->status.transferState = true;

  streamSchedExec(pTask);
  streamMetaReleaseTask(pTq->pStreamMeta, pTask);

  return 0;
}

1252
/*
 * Handle a scan-history-finish notification: decode it and pass it to streamProcessScanHistoryFinishReq() for the
 * addressed task.
 *
 * @return the result of streamProcessScanHistoryFinishReq(); -1 when decoding fails or the task cannot be acquired.
 */
int32_t tqProcessStreamTaskScanHistoryFinishReq(STQ* pTq, SRpcMsg* pMsg) {
  char*   msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);

  // deserialize
  SStreamScanHistoryFinishReq req = {0};
  SDecoder                    decoder;

  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
  int32_t code = tDecodeStreamScanHistoryFinishReq(&decoder, &req);
  tDecoderClear(&decoder);
  if (code < 0) {
    tqError("vgId:%d failed to decode scan-history finish req", pTq->pStreamMeta->vgId);
    return -1;
  }

  // find task
  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.taskId);
  if (pTask == NULL) {
    tqError("failed to find task:0x%x, it may be destroyed, vgId:%d", req.taskId, pTq->pStreamMeta->vgId);
    return -1;
  }

  code = streamProcessScanHistoryFinishReq(pTask, req.taskId, req.childId);
  streamMetaReleaseTask(pTq->pStreamMeta, pTask);
  return code;
}
L
Liu Jicong 已提交
1275

L
Liu Jicong 已提交
1276 1277 1278 1279 1280
/* Response handler for recover-finish messages. Nothing is processed yet; the message is accepted and 0 returned. */
int32_t tqProcessTaskRecoverFinishRsp(STQ* pTq, SRpcMsg* pMsg) {
  (void)pTq;
  (void)pMsg;
  return 0;
}

1281 1282 1283 1284
/*
 * Decode an encoded SDeleteRes and wrap it into a STREAM_DELETE_DATA block inside a queue item.
 * The block has one row per deleted table uid, with the delete's start/end keys; the group id and calculate
 * start/end columns are set to NULL.
 *
 * @param pData     encoded SDeleteRes payload
 * @param len       length of pData in bytes
 * @param ver       version stamped on the produced block
 * @param pRefBlock out: the queue item holding the block; NULL when there is nothing to delete or on error
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY, or TSDB_CODE_MSG_DECODE_ERROR when the payload cannot be decoded
 */
int32_t extractDelDataBlock(const void* pData, int32_t len, int64_t ver, SStreamRefDataBlock** pRefBlock) {
  SDecoder*   pCoder = &(SDecoder){0};
  SDeleteRes* pRes = &(SDeleteRes){0};

  *pRefBlock = NULL;

  pRes->uidList = taosArrayInit(0, sizeof(tb_uid_t));
  if (pRes->uidList == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  tDecoderInit(pCoder, (uint8_t*)pData, len);
  int32_t code = tDecodeDeleteRes(pCoder, pRes);
  tDecoderClear(pCoder);
  if (code < 0) {
    taosArrayDestroy(pRes->uidList);
    return TSDB_CODE_MSG_DECODE_ERROR;
  }

  int32_t numOfTables = taosArrayGetSize(pRes->uidList);
  if (numOfTables == 0 || pRes->affectedRows == 0) {
    taosArrayDestroy(pRes->uidList);
    return TSDB_CODE_SUCCESS;
  }

  // Allocate the queue item before building the block, so an allocation failure leaves only the uid list to free.
  // The old code tested `pRefBlock` instead of `*pRefBlock`, so a failed allocation was dereferenced.
  SStreamRefDataBlock* pItem = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0);
  if (pItem == NULL) {
    taosArrayDestroy(pRes->uidList);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  SSDataBlock* pDelBlock = createSpecialDataBlock(STREAM_DELETE_DATA);
  blockDataEnsureCapacity(pDelBlock, numOfTables);
  pDelBlock->info.rows = numOfTables;
  pDelBlock->info.version = ver;

  for (int32_t i = 0; i < numOfTables; i++) {
    // start key column
    SColumnInfoData* pStartCol = taosArrayGet(pDelBlock->pDataBlock, START_TS_COLUMN_INDEX);
    colDataSetVal(pStartCol, i, (const char*)&pRes->skey, false);
    // end key column
    SColumnInfoData* pEndCol = taosArrayGet(pDelBlock->pDataBlock, END_TS_COLUMN_INDEX);
    colDataSetVal(pEndCol, i, (const char*)&pRes->ekey, false);
    // uid column
    SColumnInfoData* pUidCol = taosArrayGet(pDelBlock->pDataBlock, UID_COLUMN_INDEX);
    int64_t*         pUid = taosArrayGet(pRes->uidList, i);
    colDataSetVal(pUidCol, i, (const char*)pUid, false);

    colDataSetNULL(taosArrayGet(pDelBlock->pDataBlock, GROUPID_COLUMN_INDEX), i);
    colDataSetNULL(taosArrayGet(pDelBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX), i);
    colDataSetNULL(taosArrayGet(pDelBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX), i);
  }

  taosArrayDestroy(pRes->uidList);

  pItem->type = STREAM_INPUT__REF_DATA_BLOCK;
  pItem->pBlock = pDelBlock;
  *pRefBlock = pItem;
  return TSDB_CODE_SUCCESS;
}

L
Liu Jicong 已提交
1334 1335
/*
 * Handle a run request.
 * Two reserved task ids select special actions: the stream-task status check and the WAL scan for all tasks.
 * Any other id runs the addressed task when it is in NORMAL or HALT status. Otherwise its sched status is reset to
 * INACTIVE. Stream tasks are then started via tqStartStreamTasks().
 *
 * @return 0 when handled; -1 when the addressed task cannot be found.
 */
int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
  SStreamTaskRunReq* pReq = pMsg->pCont;
  int32_t            taskId = pReq->taskId;
  int32_t            vgId = TD_VID(pTq->pVnode);

  if (taskId == STREAM_TASK_STATUS_CHECK_ID) {
    tqStreamTasksStatusCheck(pTq);
    return 0;
  }

  if (taskId == EXTRACT_DATA_FROM_WAL_ID) {  // all tasks are extracted submit data from the wal
    tqStreamTasksScanWal(pTq);
    return 0;
  }

  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);
  if (pTask == NULL) {
    tqError("vgId:%d failed to found s-task, taskId:%d", vgId, taskId);
    return -1;
  }

  // even in halt status, the data in inputQ must be processed
  int8_t taskStatus = pTask->status.taskStatus;
  bool   runnable = (taskStatus == TASK_STATUS__NORMAL) || (taskStatus == TASK_STATUS__HALT);

  if (runnable) {
    tqDebug("vgId:%d s-task:%s start to process block from inputQ, last chk point:%" PRId64, vgId, pTask->id.idStr,
            pTask->chkInfo.version);
    streamProcessRunReq(pTask);
  } else {
    atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
    tqDebug("vgId:%d s-task:%s ignore run req since not in ready state, status:%s, sched-status:%d", vgId,
            pTask->id.idStr, streamGetTaskStatusStr(pTask->status.taskStatus), pTask->status.schedStatus);
  }

  streamMetaReleaseTask(pTq->pStreamMeta, pTask);
  tqStartStreamTasks(pTq);
  return 0;
}

L
Liu Jicong 已提交
1373
/*
 * Handle a dispatch request: decode it and pass it to streamProcessDispatchMsg() for the addressed task.
 * When the task is found, the decoded request is handed to streamProcessDispatchMsg() and not freed here.
 * Otherwise it is freed with tDeleteStreamDispatchReq().
 *
 * @param exec forwarded to streamProcessDispatchMsg()
 * @return 0 when dispatched; -1 when decoding fails or the task cannot be acquired.
 */
int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec) {
  char*   msgStr = pMsg->pCont;
  char*   msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
  int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);

  SStreamDispatchReq req = {0};
  SDecoder           decoder;

  tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
  int32_t code = tDecodeStreamDispatchReq(&decoder, &req);
  // The decoder was never cleared before. vnodeEnqueueStreamMsg clears it right after decoding and only then uses
  // req, so doing the same here is safe.
  tDecoderClear(&decoder);
  if (code < 0) {
    tqError("vgId:%d failed to decode stream dispatch req", pTq->pStreamMeta->vgId);
    tDeleteStreamDispatchReq(&req);
    return -1;
  }

  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.taskId);
  if (pTask == NULL) {
    tqError("vgId:%d failed to find task:0x%x to handle dispatch req", pTq->pStreamMeta->vgId, req.taskId);
    tDeleteStreamDispatchReq(&req);
    return -1;
  }

  SRpcMsg rsp = {.info = pMsg->info, .code = 0};
  streamProcessDispatchMsg(pTask, &req, &rsp, exec);
  streamMetaReleaseTask(pTq->pStreamMeta, pTask);
  return 0;
}

L
Liu Jicong 已提交
1396 1397
/*
 * Handle a dispatch response, addressed to the upstream task id carried in network byte order.
 *
 * @return 0 when the response is processed; TSDB_CODE_INVALID_MSG when the upstream task cannot be acquired.
 */
int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) {
  SStreamDispatchRsp* pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t             upstreamId = ntohl(pRsp->upstreamTaskId);
  SStreamMeta*        pMeta = pTq->pStreamMeta;

  SStreamTask* pTask = streamMetaAcquireTask(pMeta, upstreamId);
  int32_t      vgId = pMeta->vgId;

  if (pTask == NULL) {
    tqDebug("vgId:%d failed to handle the dispatch rsp, since find task:0x%x failed", vgId, upstreamId);
    return TSDB_CODE_INVALID_MSG;
  }

  streamProcessDispatchRsp(pTask, pRsp, pMsg->code);
  streamMetaReleaseTask(pMeta, pTask);
  return 0;
}
L
Liu Jicong 已提交
1411

1412
/* Remove the stream task named in an SVDropStreamTaskReq from the stream meta. Always returns 0. */
int32_t tqProcessTaskDropReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  const SVDropStreamTaskReq* pDropReq = (const SVDropStreamTaskReq*)msg;
  int32_t                    dropTaskId = pDropReq->taskId;

  tqDebug("vgId:%d receive msg to drop stream task:0x%x", TD_VID(pTq->pVnode), dropTaskId);
  streamMetaRemoveTask(pTq->pStreamMeta, dropTaskId);

  return 0;
}
L
Liu Jicong 已提交
1419

L
liuyao 已提交
1420 1421 1422 1423 1424 1425 1426 1427 1428 1429 1430 1431 1432 1433
/*
 * Pause an acquired task.
 * Unless the task should already pause, its current status is saved in keepTaskStatus and its status is set to
 * PAUSE. Always releases pTask, so the caller must not use it afterwards.
 *
 * @return 0 on success; -1 when pTask is NULL.
 */
int32_t tqProcessTaskPauseImpl(SStreamMeta* pStreamMeta, SStreamTask* pTask) {
  if (pTask == NULL) {
    return -1;
  }

  bool alreadyPausing = streamTaskShouldPause(&pTask->status);
  if (!alreadyPausing) {
    tqDebug("vgId:%d s-task:%s set pause flag", pStreamMeta->vgId, pTask->id.idStr);
    atomic_store_8(&pTask->status.keepTaskStatus, pTask->status.taskStatus);
    atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__PAUSE);
  }

  streamMetaReleaseTask(pStreamMeta, pTask);
  return 0;
}

5
54liuyao 已提交
1434 1435
/*
 * Pause the task named in an SVPauseStreamTaskReq and, if it has one, its related history task.
 *
 * @return 0 on success; -1 when the task cannot be acquired; otherwise the history task's pause result.
 */
int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  SVPauseStreamTaskReq* pReq = (SVPauseStreamTaskReq*)msg;

  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->taskId);
  if (pTask == NULL) {
    return -1;
  }

  // tqProcessTaskPauseImpl() releases pTask; read the history task id first. The old code read it afterwards
  // (use after release).
  int32_t historyTaskId = pTask->historyTaskId.taskId;

  int32_t code = tqProcessTaskPauseImpl(pTq->pStreamMeta, pTask);
  if (code != 0) {
    return code;
  }

  // a history task id of 0 means there is no related history task
  if (historyTaskId != 0) {
    SStreamTask* pHistoryTask = streamMetaAcquireTask(pTq->pStreamMeta, historyTaskId);
    if (pHistoryTask) {
      code = tqProcessTaskPauseImpl(pTq->pStreamMeta, pHistoryTask);
    }
  }

  return code;
}

/*
 * Resume an acquired task that is paused, restoring the status saved in keepTaskStatus.
 * For a non fill-history source task with igUntreated set, the WAL reader skips to `sversion`, discarding data
 * written while paused. The task is then restarted in one of three ways:
 *  - a fill-history source task restarts its recovery;
 *  - any other source task with an empty input queue restarts stream tasks via tqStartStreamTasks();
 *  - otherwise the task is scheduled for execution.
 * Always releases pTask.
 *
 * @return 0 on success; -1 when pTask is NULL.
 */
int32_t tqProcessTaskResumeImpl(STQ* pTq, SStreamTask* pTask, int64_t sversion, int8_t igUntreated) {
  int32_t vgId = pTq->pStreamMeta->vgId;
  if (pTask == NULL) {
    return -1;
  }

  if (streamTaskShouldPause(&pTask->status)) {
    atomic_store_8(&pTask->status.taskStatus, pTask->status.keepTaskStatus);

    bool isSource = (pTask->info.taskLevel == TASK_LEVEL__SOURCE);
    bool isFillHistory = pTask->info.fillHistory;

    // no lock needs to secure the access of the version
    if (igUntreated && isSource && !isFillHistory) {
      // discard all the data  when the stream task is suspended.
      walReaderSetSkipToVersion(pTask->exec.pWalReader, sversion);
      tqDebug("vgId:%d s-task:%s resume to exec, prev paused version:%" PRId64 ", start from vnode ver:%" PRId64
              ", schedStatus:%d",
              vgId, pTask->id.idStr, pTask->chkInfo.currentVer, sversion, pTask->status.schedStatus);
    } else {  // from the previous paused version and go on
      tqDebug("vgId:%d s-task:%s resume to exec, from paused ver:%" PRId64 ", vnode ver:%" PRId64 ", schedStatus:%d",
              vgId, pTask->id.idStr, pTask->chkInfo.currentVer, sversion, pTask->status.schedStatus);
    }

    if (isFillHistory && isSource) {
      streamStartRecoverTask(pTask, igUntreated);
    } else if (isSource && taosQueueItemSize(pTask->inputQueue->queue) == 0) {
      tqStartStreamTasks(pTq);
    } else {
      streamSchedExec(pTask);
    }
  }

  streamMetaReleaseTask(pTq->pStreamMeta, pTask);
  return 0;
}

/*
 * Resume the task named in an SVResumeStreamTaskReq and, if it has one, its related history task.
 *
 * @return 0 on success; -1 when the task cannot be acquired; otherwise the history task's resume result.
 */
int32_t tqProcessTaskResumeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  SVResumeStreamTaskReq* pReq = (SVResumeStreamTaskReq*)msg;

  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->taskId);
  if (pTask == NULL) {
    return -1;
  }

  // tqProcessTaskResumeImpl() releases pTask; read the history task id first. The old code read it afterwards
  // (use after release).
  int32_t historyTaskId = pTask->historyTaskId.taskId;

  int32_t code = tqProcessTaskResumeImpl(pTq, pTask, sversion, pReq->igUntreated);
  if (code != 0) {
    return code;
  }

  // a history task id of 0 means there is no related history task
  if (historyTaskId != 0) {
    SStreamTask* pHistoryTask = streamMetaAcquireTask(pTq->pStreamMeta, historyTaskId);
    if (pHistoryTask) {
      code = tqProcessTaskResumeImpl(pTq, pHistoryTask, sversion, pReq->igUntreated);
    }
  }

  return code;
}

L
Liu Jicong 已提交
1498
/*
 * Handle a retrieve request: decode it and pass it to streamProcessRetrieveReq() for the destination task.
 * The decoded request is always freed here with tDeleteStreamRetrieveReq().
 *
 * @return 0 when processed; -1 when decoding fails or the destination task cannot be acquired.
 */
int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg) {
  char*    msgStr = pMsg->pCont;
  char*    msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
  int32_t  msgLen = pMsg->contLen - sizeof(SMsgHead);
  SDecoder decoder;

  // Zero-initialized so no field is indeterminate when the request is deleted below. Previously it was
  // uninitialized and the decode result was ignored.
  SStreamRetrieveReq req = {0};

  tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
  int32_t code = tDecodeStreamRetrieveReq(&decoder, &req);
  tDecoderClear(&decoder);
  if (code < 0) {
    tqError("vgId:%d failed to decode stream retrieve req", pTq->pStreamMeta->vgId);
    tDeleteStreamRetrieveReq(&req);
    return -1;
  }

  int32_t      taskId = req.dstTaskId;
  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);
  if (pTask == NULL) {
    tDeleteStreamRetrieveReq(&req);
    return -1;
  }

  SRpcMsg rsp = {.info = pMsg->info, .code = 0};
  streamProcessRetrieveReq(pTask, &req, &rsp);

  streamMetaReleaseTask(pTq->pStreamMeta, pTask);
  tDeleteStreamRetrieveReq(&req);
  return 0;
}

/*
 * Handle the response to a previously sent retrieve request.
 *
 * Placeholder: neither argument is inspected and success (0) is always reported.
 */
int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg) {
  (void)pTq;
  (void)pMsg;
  return 0;
}
L
Liu Jicong 已提交
1529

1530 1531 1532 1533 1534 1535
/*
 * Decode a stream dispatch request and hand it directly to the target stream task.
 *
 * Ownership: every return path releases pMsg->pCont (rpcFreeCont) and the queue item
 * itself (taosFreeQitem), so callers must not touch pMsg after this call.
 *
 * Returns 0 when the request was delivered to its task. Otherwise returns -1:
 *   - decode failure      -> error code TSDB_CODE_MSG_DECODE_ERROR
 *   - task not found      -> error code TSDB_CODE_STREAM_TASK_NOT_EXIST
 * When the message carries an rpc handle, an SStreamDispatchRsp with that error code
 * is sent back (or a bare TSDB_CODE_OUT_OF_MEMORY response if it cannot be allocated).
 */
int32_t vnodeEnqueueStreamMsg(SVnode* pVnode, SRpcMsg* pMsg) {
  STQ*      pTq = pVnode->pTq;
  SMsgHead* msgStr = pMsg->pCont;
  char*     msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
  int32_t   msgLen = pMsg->contLen - sizeof(SMsgHead);
  int32_t   code = 0;

  // Zero-initialized: the FAIL path reads req's id fields to build the error response,
  // including when decoding fails part-way, and must not read indeterminate values.
  SStreamDispatchReq req = {0};
  SDecoder           decoder;
  tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
  if (tDecodeStreamDispatchReq(&decoder, &req) < 0) {
    code = TSDB_CODE_MSG_DECODE_ERROR;
    tDecoderClear(&decoder);
    goto FAIL;
  }
  tDecoderClear(&decoder);

  int32_t taskId = req.taskId;

  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);
  if (pTask) {
    SRpcMsg rsp = {.info = pMsg->info, .code = 0};
    streamProcessDispatchMsg(pTask, &req, &rsp, false);
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
    rpcFreeCont(pMsg->pCont);
    taosFreeQitem(pMsg);
    return 0;
  }

  tDeleteStreamDispatchReq(&req);
  code = TSDB_CODE_STREAM_TASK_NOT_EXIST;

FAIL:
  if (pMsg->info.handle == NULL) {
    // Nobody to answer, but the message is still owned by this function: release it
    // exactly like every other return path does.
    rpcFreeCont(pMsg->pCont);
    taosFreeQitem(pMsg);
    return -1;
  }

  SMsgHead* pRspHead = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamDispatchRsp));
  if (pRspHead == NULL) {
    SRpcMsg rsp = {.code = TSDB_CODE_OUT_OF_MEMORY, .info = pMsg->info};
    // Log the code that is actually sent back, not the original failure code.
    tqDebug("send dispatch error rsp, code: %x", rsp.code);
    tmsgSendRsp(&rsp);
    rpcFreeCont(pMsg->pCont);
    taosFreeQitem(pMsg);
    return -1;
  }

  // Fields are converted to network byte order before being sent.
  pRspHead->vgId = htonl(req.upstreamNodeId);
  SStreamDispatchRsp* pRsp = POINTER_SHIFT(pRspHead, sizeof(SMsgHead));
  pRsp->streamId = htobe64(req.streamId);
  pRsp->upstreamTaskId = htonl(req.upstreamTaskId);
  pRsp->upstreamNodeId = htonl(req.upstreamNodeId);
  pRsp->downstreamNodeId = htonl(pVnode->config.vgId);
  pRsp->downstreamTaskId = htonl(req.taskId);
  pRsp->inputStatus = TASK_OUTPUT_STATUS__NORMAL;

  SRpcMsg rsp = {
      .code = code, .info = pMsg->info, .contLen = sizeof(SMsgHead) + sizeof(SStreamDispatchRsp), .pCont = pRspHead};
  tqDebug("send dispatch error rsp, code: %x", code);
  tmsgSendRsp(&rsp);

  rpcFreeCont(pMsg->pCont);
  taosFreeQitem(pMsg);
  return -1;
}
L
Liu Jicong 已提交
1593

1594
/*
 * Check whether version sversion is at or below pTq->walLogLastVer.
 *
 * Returns 1 when sversion <= pTq->walLogLastVer, 0 otherwise.
 */
int32_t tqCheckLogInWal(STQ* pTq, int64_t sversion) {
  if (sversion > pTq->walLogLastVer) {
    return 0;
  }
  return 1;
}
1595