tq.c 51.8 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Hongze Cheng 已提交
16
#include "tq.h"
S
Shengliang Guan 已提交
17

dengyihao's avatar
dengyihao 已提交
18 19 20
// States of tqMgmt.inited:
// 0: not initialized
// 1: already initialized
// 2: initialization or cleanup in progress (other callers wait)
21
static int32_t tqInitialize(STQ* pTq);
dengyihao's avatar
dengyihao 已提交
22

wmmhello's avatar
wmmhello 已提交
23
// Returns true when the handle's status field equals TMQ_HANDLE_STATUS_EXEC.
static FORCE_INLINE bool tqIsHandleExec(STqHandle* pHandle) {
  return pHandle->status == TMQ_HANDLE_STATUS_EXEC;
}
dengyihao's avatar
dengyihao 已提交
24 25
// Marks the handle as executing by storing TMQ_HANDLE_STATUS_EXEC in its status field.
static FORCE_INLINE void tqSetHandleExec(STqHandle* pHandle) {
  pHandle->status = TMQ_HANDLE_STATUS_EXEC;
}
// Marks the handle as idle by storing TMQ_HANDLE_STATUS_IDLE in its status field.
static FORCE_INLINE void tqSetHandleIdle(STqHandle* pHandle) {
  pHandle->status = TMQ_HANDLE_STATUS_IDLE;
}
wmmhello's avatar
wmmhello 已提交
26

L
Liu Jicong 已提交
27
int32_t tqInit() {
L
Liu Jicong 已提交
28 29 30 31 32 33
  int8_t old;
  while (1) {
    old = atomic_val_compare_exchange_8(&tqMgmt.inited, 0, 2);
    if (old != 2) break;
  }

34 35 36 37 38 39
  if (old == 0) {
    tqMgmt.timer = taosTmrInit(10000, 100, 10000, "TQ");
    if (tqMgmt.timer == NULL) {
      atomic_store_8(&tqMgmt.inited, 0);
      return -1;
    }
40 41 42
    if (streamInit() < 0) {
      return -1;
    }
L
Liu Jicong 已提交
43
    atomic_store_8(&tqMgmt.inited, 1);
44
  }
45

L
Liu Jicong 已提交
46 47
  return 0;
}
L
Liu Jicong 已提交
48

49
void tqCleanUp() {
L
Liu Jicong 已提交
50 51 52 53 54 55 56 57
  int8_t old;
  while (1) {
    old = atomic_val_compare_exchange_8(&tqMgmt.inited, 1, 2);
    if (old != 2) break;
  }

  if (old == 1) {
    taosTmrCleanUp(tqMgmt.timer);
L
Liu Jicong 已提交
58
    streamCleanUp();
L
Liu Jicong 已提交
59 60
    atomic_store_8(&tqMgmt.inited, 0);
  }
61
}
L
Liu Jicong 已提交
62

63
void tqDestroyTqHandle(void* data) {
64 65
  STqHandle* pData = (STqHandle*)data;
  qDestroyTask(pData->execHandle.task);
wmmhello's avatar
wmmhello 已提交
66

67
  if (pData->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
L
Liu Jicong 已提交
68
    taosMemoryFreeClear(pData->execHandle.execCol.qmsg);
69
  } else if (pData->execHandle.subType == TOPIC_SUB_TYPE__DB) {
70
    tqReaderClose(pData->execHandle.pTqReader);
71 72
    walCloseReader(pData->pWalReader);
    taosHashCleanup(pData->execHandle.execDb.pFilterOutTbUid);
L
Liu Jicong 已提交
73
  } else if (pData->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
74
    walCloseReader(pData->pWalReader);
75
    tqReaderClose(pData->execHandle.pTqReader);
76 77
    taosMemoryFreeClear(pData->execHandle.execTb.qmsg);
    nodesDestroyNode(pData->execHandle.execTb.node);
78
  }
dengyihao's avatar
dengyihao 已提交
79
  if (pData->msg != NULL) {
80 81 82
    rpcFreeCont(pData->msg->pCont);
    taosMemoryFree(pData->msg);
    pData->msg = NULL;
D
dapan1121 已提交
83
  }
L
Liu Jicong 已提交
84 85
}

86 87 88 89 90
static bool tqOffsetLessOrEqual(const STqOffset* pLeft, const STqOffset* pRight) {
  return pLeft->val.type == TMQ_OFFSET__LOG && pRight->val.type == TMQ_OFFSET__LOG &&
         pLeft->val.version <= pRight->val.version;
}

L
Liu Jicong 已提交
91
// Allocates and opens a STQ instance for the given vnode.
//
// path   - directory path; a copy is stored in the returned object
// pVnode - owning vnode; its WAL's last version is recorded at open time
//
// Returns the new STQ, or NULL on failure. terrno is set to
// TSDB_CODE_OUT_OF_MEMORY when the STQ itself cannot be allocated; when
// tqInitialize() fails the partially built object is released via tqClose().
STQ* tqOpen(const char* path, SVnode* pVnode) {
  STQ* pTq = taosMemoryCalloc(1, sizeof(STQ));
  if (pTq == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pTq->path = taosStrdup(path);
  pTq->pVnode = pVnode;
  pTq->walLogLastVer = pVnode->pWal->vers.lastVer;

  // subscription handles; entries are released through tqDestroyTqHandle
  pTq->pHandle = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);
  taosHashSetFreeFp(pTq->pHandle, tqDestroyTqHandle);

  taosInitRWLatch(&pTq->lock);
  pTq->pPushMgr = taosHashInit(64, MurmurHash3_32, false, HASH_NO_LOCK);

  // check-info entries; released through tDeleteSTqCheckInfo
  pTq->pCheckInfo = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);
  taosHashSetFreeFp(pTq->pCheckInfo, (FDelete)tDeleteSTqCheckInfo);

  if (tqInitialize(pTq) != TSDB_CODE_SUCCESS) {
    tqClose(pTq);
    return NULL;
  }

  return pTq;
}

// Opens the persistent parts of a STQ in order: the tq meta store, the offset
// store, the stream meta, and finally the stream tasks.
//
// Returns 0 on success and -1 as soon as any step fails. Nothing is released
// here on failure; the visible caller (tqOpen) calls tqClose() in that case.
int32_t tqInitialize(STQ* pTq) {
  if (tqMetaOpen(pTq) < 0) {
    return -1;
  }

  pTq->pOffsetStore = tqOffsetOpen(pTq);
  if (pTq->pOffsetStore == NULL) {
    return -1;
  }

  int32_t vgId = pTq->pVnode->config.vgId;
  pTq->pStreamMeta = streamMetaOpen(pTq->path, pTq, (FTaskExpand*)tqExpandTask, vgId);
  if (pTq->pStreamMeta == NULL) {
    return -1;
  }

  // the version is kept in task's meta data
  // todo check if this version is required or not
  if (streamLoadTasks(pTq->pStreamMeta, walGetCommittedVer(pTq->pVnode->pWal)) < 0) {
    return -1;
  }

  return 0;
}
L
Liu Jicong 已提交
143

L
Liu Jicong 已提交
144
// Closes a STQ and frees it. A NULL argument is accepted and ignored.
//
// Members are released in a fixed order: offset store, the three hash tables,
// the path string, the tq meta store, the stream meta, and finally the STQ
// object itself.
void tqClose(STQ* pTq) {
  if (pTq != NULL) {
    tqOffsetClose(pTq->pOffsetStore);

    taosHashCleanup(pTq->pHandle);
    taosHashCleanup(pTq->pPushMgr);
    taosHashCleanup(pTq->pCheckInfo);

    taosMemoryFree(pTq->path);

    tqMetaClose(pTq);
    streamMetaClose(pTq->pStreamMeta);

    taosMemoryFree(pTq);
  }
}
L
Liu Jicong 已提交
158

H
Haojun Liao 已提交
159 160 161 162 163 164 165 166 167 168 169 170 171
// Signals every stream task registered in pTq->pStreamMeta->pTasks to stop.
//
// Under the stream meta write lock each task has its status set to
// TASK_STATUS__STOP and its executor is killed via qKillTask(); the time spent
// in qKillTask() is logged per task. A NULL pTq is ignored.
void tqNotifyClose(STQ* pTq) {
  if (pTq == NULL) {
    return;
  }

  taosWLockLatch(&pTq->pStreamMeta->lock);

  for (void* pIter = taosHashIterate(pTq->pStreamMeta->pTasks, NULL); pIter != NULL;
       pIter = taosHashIterate(pTq->pStreamMeta->pTasks, pIter)) {
    // the hash stores pointers to tasks, so each entry is a SStreamTask*
    SStreamTask* pTask = *(SStreamTask**)pIter;

    tqDebug("vgId:%d s-task:%s set dropping flag", pTq->pStreamMeta->vgId, pTask->id.idStr);
    pTask->status.taskStatus = TASK_STATUS__STOP;

    int64_t startMs = taosGetTimestampMs();
    qKillTask(pTask->exec.pExecutor, TSDB_CODE_SUCCESS);
    int64_t elapsedMs = taosGetTimestampMs() - startMs;

    tqDebug("vgId:%d s-task:%s is closed in %" PRId64 " ms", pTq->pStreamMeta->vgId, pTask->id.idStr, elapsedMs);
  }

  taosWUnLockLatch(&pTq->pStreamMeta->lock);
}

D
dapan1121 已提交
184 185
// Encodes a data response and sends it as an rpc reply.
//
// pRpcHandleInfo - rpc handle info copied into the outgoing message
// pRsp           - response to encode; treated as STaosxRsp when type is
//                  TMQ_MSG_TYPE__TAOSX_RSP
// epoch, consumerId, type - copied into the SMqRspHead that precedes the body
//
// The buffer layout is: SMqRspHead followed by `len` encoded bytes. For any
// type other than POLL_RSP / TAOSX_RSP nothing is encoded and only the header
// is sent. Returns 0 after the response is handed to tmsgSendRsp(), or -1 if
// size computation reports an error or the buffer cannot be allocated.
static int32_t doSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* pRsp, int32_t epoch,
                             int64_t consumerId, int32_t type) {
  int32_t code = 0;
  int32_t len = 0;

  // first pass: compute the encoded body size
  if (type == TMQ_MSG_TYPE__POLL_RSP) {
    tEncodeSize(tEncodeMqDataRsp, pRsp, len, code);
  } else if (type == TMQ_MSG_TYPE__TAOSX_RSP) {
    tEncodeSize(tEncodeSTaosxRsp, (STaosxRsp*)pRsp, len, code);
  }

  if (code < 0) {
    return -1;
  }

  int32_t tlen = sizeof(SMqRspHead) + len;
  void*   buf = rpcMallocCont(tlen);
  if (buf == NULL) {
    return -1;
  }

  SMqRspHead* pHead = (SMqRspHead*)buf;
  pHead->mqMsgType = type;
  pHead->epoch = epoch;
  pHead->consumerId = consumerId;

  // second pass: encode the body right after the header
  void*    abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
  SEncoder encoder = {0};
  tEncoderInit(&encoder, abuf, len);

  if (type == TMQ_MSG_TYPE__POLL_RSP) {
    tEncodeMqDataRsp(&encoder, pRsp);
  } else if (type == TMQ_MSG_TYPE__TAOSX_RSP) {
    tEncodeSTaosxRsp(&encoder, (STaosxRsp*)pRsp);
  }

  tEncoderClear(&encoder);

  SRpcMsg rsp = {0};
  rsp.info = *pRpcHandleInfo;
  rsp.pCont = buf;
  rsp.contLen = tlen;
  rsp.code = 0;

  tmsgSendRsp(&rsp);
  return 0;
}
L
Liu Jicong 已提交
232

H
Haojun Liao 已提交
233
// Sends an otherwise zero-initialized poll response for the request stored in
// pHandle->msg, using the handle's consumer id and epoch, together with the
// valid WAL version range of the handle's reader. Always returns 0.
//
// NOTE(review): pHandle->msg is dereferenced without a NULL check; callers
// presumably guarantee a pending message - confirm.
int32_t tqPushDataRsp(STqHandle* pHandle, int32_t vgId) {
  SMqDataRsp dataRsp = {0};
  dataRsp.head.mqMsgType = TMQ_MSG_TYPE__POLL_RSP;
  dataRsp.head.epoch = pHandle->epoch;
  dataRsp.head.consumerId = pHandle->consumerId;

  int64_t startVer = 0;
  int64_t endVer = 0;
  walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &startVer, &endVer);

  tqDoSendDataRsp(&pHandle->msg->info, &dataRsp, pHandle->epoch, pHandle->consumerId, TMQ_MSG_TYPE__POLL_RSP,
                  startVer, endVer);

  // textual forms of the request/response offsets, for the log line only
  char reqStr[TSDB_OFFSET_LEN] = {0};
  char rspStr[TSDB_OFFSET_LEN] = {0};
  tFormatOffset(reqStr, tListLen(reqStr), &dataRsp.reqOffset);
  tFormatOffset(rspStr, tListLen(rspStr), &dataRsp.rspOffset);

  tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) push rsp, block num: %d, req:%s, rsp:%s", vgId,
          dataRsp.head.consumerId, dataRsp.head.epoch, dataRsp.blockNum, reqStr, rspStr);
  return 0;
}

253 254 255 256 257 258
// Sends pRsp as the reply to the poll request pReq carried by pMsg, attaching
// the valid WAL version range of the handle's reader, and logs the request and
// response offsets. pRsp is only read. Always returns 0.
int32_t tqSendDataRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp,
                      int32_t type, int32_t vgId) {
  int64_t startVer = 0;
  int64_t endVer = 0;
  walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &startVer, &endVer);

  tqDoSendDataRsp(&pMsg->info, pRsp, pReq->epoch, pReq->consumerId, type, startVer, endVer);

  // textual forms of the offsets, for the log line only
  char reqStr[TSDB_OFFSET_LEN] = {0};
  char rspStr[TSDB_OFFSET_LEN] = {0};
  tFormatOffset(reqStr, TSDB_OFFSET_LEN, &pRsp->reqOffset);
  tFormatOffset(rspStr, TSDB_OFFSET_LEN, &pRsp->rspOffset);

  tqDebug("tmq poll vgId:%d consumer:0x%" PRIx64 " (epoch %d) send rsp, block num:%d, req:%s, rsp:%s, reqId:0x%" PRIx64,
          vgId, pReq->consumerId, pReq->epoch, pRsp->blockNum, reqStr, rspStr, pReq->reqId);

  return 0;
}

271
// Handles an offset-commit message for one subscription.
//
// sversion    - version compared against the committed log offset (see below)
// msg, msgLen - encoded SMqVgOffset
//
// Snapshot offsets are stored as they are. For a log offset whose version is
// exactly sversion - 1, the version is bumped by one before it is stored. An
// offset that is less than or equal to the already saved log offset is not
// written.
//
// Returns 0 on success (including the "nothing to update" case) and -1 when
// decoding fails, the offset type is unknown, or the write fails. The decoder
// is cleared on every path, including decode failure.
int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  SMqVgOffset vgOffset = {0};
  int32_t     vgId = TD_VID(pTq->pVnode);

  SDecoder decoder;
  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
  int32_t decodeCode = tDecodeMqVgOffset(&decoder, &vgOffset);
  tDecoderClear(&decoder);  // release the decoder whether or not decoding succeeded
  if (decodeCode < 0) {
    return -1;
  }

  STqOffset* pOffset = &vgOffset.offset;

  if (pOffset->val.type == TMQ_OFFSET__SNAPSHOT_DATA || pOffset->val.type == TMQ_OFFSET__SNAPSHOT_META) {
    tqDebug("receive offset commit msg to %s on vgId:%d, offset(type:snapshot) uid:%" PRId64 ", ts:%" PRId64,
            pOffset->subKey, vgId, pOffset->val.uid, pOffset->val.ts);
  } else if (pOffset->val.type == TMQ_OFFSET__LOG) {
    tqDebug("receive offset commit msg to %s on vgId:%d, offset(type:log) version:%" PRId64, pOffset->subKey, vgId,
            pOffset->val.version);
    if (pOffset->val.version + 1 == sversion) {
      pOffset->val.version += 1;
    }
  } else {
    tqError("invalid commit offset type:%d", pOffset->val.type);
    return -1;
  }

  STqOffset* pSavedOffset = tqOffsetRead(pTq->pOffsetStore, pOffset->subKey);
  if (pSavedOffset != NULL && tqOffsetLessOrEqual(pOffset, pSavedOffset)) {
    tqDebug("not update the offset, vgId:%d sub:%s since committed:%" PRId64 " less than/equal to existed:%" PRId64,
            vgId, pOffset->subKey, pOffset->val.version, pSavedOffset->val.version);
    return 0;  // no need to update the offset value
  }

  // save the new offset value
  if (tqOffsetWrite(pTq->pOffsetStore, pOffset) < 0) {
    return -1;
  }

  return 0;
}

314
// Handles a seek request that moves a subscription's log offset.
//
// msg, msgLen - encoded SMqVgOffset; only offsets of type TMQ_OFFSET__LOG are
//               accepted
//
// Steps: decode the request, look up the subscription handle, verify that the
// requesting consumer owns the handle, compare against the saved offset,
// clamp the requested version into the reader's valid WAL range, and store
// the result.
//
// Returns 0 on success or when no update is needed, -1 on failure. terrno is
// set to TSDB_CODE_INVALID_MSG when the handle is missing and to
// TSDB_CODE_TMQ_CONSUMER_MISMATCH when the consumer id does not match. The
// decoder is cleared on every path, including decode failure.
int32_t tqProcessSeekReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  SMqVgOffset vgOffset = {0};
  int32_t     vgId = TD_VID(pTq->pVnode);

  SDecoder decoder;
  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
  int32_t decodeCode = tDecodeMqVgOffset(&decoder, &vgOffset);
  tDecoderClear(&decoder);  // release the decoder whether or not decoding succeeded
  if (decodeCode < 0) {
    tqError("vgId:%d failed to decode seek msg", vgId);
    return -1;
  }

  tqDebug("topic:%s, vgId:%d process offset seek by consumer:0x%" PRIx64 ", req offset:%" PRId64,
          vgOffset.offset.subKey, vgId, vgOffset.consumerId, vgOffset.offset.val.version);

  STqOffset* pOffset = &vgOffset.offset;
  if (pOffset->val.type != TMQ_OFFSET__LOG) {
    tqError("vgId:%d, subKey:%s invalid seek offset type:%d", vgId, pOffset->subKey, pOffset->val.type);
    return -1;
  }

  // 1. find the subscription handle
  STqHandle* pHandle = taosHashGet(pTq->pHandle, pOffset->subKey, strlen(pOffset->subKey));
  if (pHandle == NULL) {
    tqError("tmq seek: consumer:0x%" PRIx64 " vgId:%d subkey %s not found", vgOffset.consumerId, vgId, pOffset->subKey);
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  // 2. check consumer-vg assignment status
  taosRLockLatch(&pTq->lock);
  if (pHandle->consumerId != vgOffset.consumerId) {
    tqDebug("ERROR tmq seek: consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64,
            vgOffset.consumerId, vgId, pOffset->subKey, pHandle->consumerId);
    terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
    taosRUnLockLatch(&pTq->lock);
    return -1;
  }
  taosRUnLockLatch(&pTq->lock);

  // 3. check the offset info
  STqOffset* pSavedOffset = tqOffsetRead(pTq->pOffsetStore, pOffset->subKey);
  if (pSavedOffset != NULL) {
    if (pSavedOffset->val.type != TMQ_OFFSET__LOG) {
      tqError("invalid saved offset type, vgId:%d sub:%s", vgId, pOffset->subKey);
      return 0;  // no need to update the offset value
    }

    if (pSavedOffset->val.version == pOffset->val.version) {
      tqDebug("vgId:%d subKey:%s no need to seek to %" PRId64 " prev offset:%" PRId64, vgId, pOffset->subKey,
              pOffset->val.version, pSavedOffset->val.version);
      return 0;
    }
  }

  // 4. clamp the requested version into the valid WAL range [sver, ever]
  int64_t sver = 0, ever = 0;
  walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever);
  if (pOffset->val.version < sver) {
    pOffset->val.version = sver;
  } else if (pOffset->val.version > ever) {
    pOffset->val.version = ever;
  }

  // 5. save the new offset value
  if (pSavedOffset != NULL) {
    tqDebug("vgId:%d sub:%s seek to:%" PRId64 " prev offset:%" PRId64, vgId, pOffset->subKey, pOffset->val.version,
            pSavedOffset->val.version);
  } else {
    tqDebug("vgId:%d sub:%s seek to:%" PRId64 " not saved yet", vgId, pOffset->subKey, pOffset->val.version);
  }

  if (tqOffsetWrite(pTq->pOffsetStore, pOffset) < 0) {
    tqError("failed to save offset, vgId:%d sub:%s seek to %" PRId64, vgId, pOffset->subKey, pOffset->val.version);
    return -1;
  }

  tqDebug("topic:%s, vgId:%d consumer:0x%" PRIx64 " offset is update to:%" PRId64, vgOffset.offset.subKey, vgId,
          vgOffset.consumerId, vgOffset.offset.val.version);

  return 0;
}

L
Liu Jicong 已提交
396
// Checks whether column colId of table tbUid may be modified.
//
// Walks every STqCheckInfo in pTq->pCheckInfo; if an entry with a matching
// ntbUid lists colId in its colIdList the column is considered forbidden.
//
// Returns -1 when the column is listed by some entry, 0 otherwise.
int32_t tqCheckColModifiable(STQ* pTq, int64_t tbUid, int32_t colId) {
  for (void* pIter = taosHashIterate(pTq->pCheckInfo, NULL); pIter != NULL;
       pIter = taosHashIterate(pTq->pCheckInfo, pIter)) {
    STqCheckInfo* pInfo = (STqCheckInfo*)pIter;
    if (pInfo->ntbUid != tbUid) {
      continue;
    }

    int32_t numOfCols = taosArrayGetSize(pInfo->colIdList);
    for (int32_t idx = 0; idx < numOfCols; ++idx) {
      int16_t forbidColId = *(int16_t*)taosArrayGet(pInfo->colIdList, idx);
      if (forbidColId != colId) {
        continue;
      }

      // leaving the iteration early: release the iterator first
      taosHashCancelIterate(pTq->pCheckInfo, pIter);
      return -1;
    }
  }

  return 0;
}

422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446
// Re-queues the pending poll request of every handle registered in
// pTq->pPushMgr and then empties the push manager.
//
// For each handle, a TDMT_VND_TMQ_CONSUME message that reuses the stored
// request's content and rpc info is put on the query queue; only the SRpcMsg
// wrapper held by the handle is freed, and the pointer is reset. The whole
// operation runs under the pTq->lock write lock. pMsg is not used.
// Always returns 0.
//
// NOTE(review): the ASSERT(...) result is used as the "assertion failed"
// condition here - confirm against the macro definition.
int32_t tqProcessPollPush(STQ* pTq, SRpcMsg* pMsg) {
  int32_t vgId = TD_VID(pTq->pVnode);

  taosWLockLatch(&pTq->lock);

  if (taosHashGetSize(pTq->pPushMgr) > 0) {
    for (void* pIter = taosHashIterate(pTq->pPushMgr, NULL); pIter != NULL;
         pIter = taosHashIterate(pTq->pPushMgr, pIter)) {
      STqHandle* pHandle = *(STqHandle**)pIter;
      tqDebug("vgId:%d start set submit for pHandle:%p, consumer:0x%" PRIx64, vgId, pHandle, pHandle->consumerId);

      if (ASSERT(pHandle->msg != NULL)) {
        tqError("pHandle->msg should not be null");
        break;
      }

      SRpcMsg msg = {
          .msgType = TDMT_VND_TMQ_CONSUME,
          .pCont = pHandle->msg->pCont,
          .contLen = pHandle->msg->contLen,
          .info = pHandle->msg->info,
      };
      tmsgPutToQueue(&pTq->pVnode->msgCb, QUERY_QUEUE, &msg);

      // the content now travels with the queued message; drop only the wrapper
      taosMemoryFree(pHandle->msg);
      pHandle->msg = NULL;
    }

    taosHashClear(pTq->pPushMgr);
  }

  taosWUnLockLatch(&pTq->lock);
  return 0;
}

D
dapan1121 已提交
451
// Handles a consumer poll request.
//
// The request is deserialized, the subscription handle is looked up and its
// owner is verified under the pTq->lock write lock. If the handle is already
// executing, the lock is dropped and the lookup is retried after 10 ms; once
// the handle is idle it is marked executing. The handle's epoch is raised to
// the request's epoch if that is newer, data is extracted through
// tqExtractDataForMq(), and the handle is marked idle again.
//
// Returns the code of tqExtractDataForMq(), or -1 with terrno set to
// TSDB_CODE_INVALID_MSG (bad message / unknown subkey) or
// TSDB_CODE_TMQ_CONSUMER_MISMATCH (handle owned by another consumer).
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
  SMqPollReq req = {0};
  if (tDeserializeSMqPollReq(pMsg->pCont, pMsg->contLen, &req) < 0) {
    tqError("tDeserializeSMqPollReq %d failed", pMsg->contLen);
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  int64_t    consumerId = req.consumerId;
  int32_t    reqEpoch = req.epoch;
  int32_t    vgId = TD_VID(pTq->pVnode);
  STqHandle* pHandle = NULL;

  for (;;) {
    taosWLockLatch(&pTq->lock);

    // 1. find handle
    pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
    if (pHandle == NULL) {
      tqError("tmq poll: consumer:0x%" PRIx64 " vgId:%d subkey %s not found", consumerId, vgId, req.subKey);
      terrno = TSDB_CODE_INVALID_MSG;
      taosWUnLockLatch(&pTq->lock);
      return -1;
    }

    // 2. check re-balance status
    if (pHandle->consumerId != consumerId) {
      tqError("ERROR tmq poll: consumer:0x%" PRIx64
              " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64,
              consumerId, TD_VID(pTq->pVnode), req.subKey, pHandle->consumerId);
      terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
      taosWUnLockLatch(&pTq->lock);
      return -1;
    }

    // claim the handle if nobody is executing on it
    bool busy = tqIsHandleExec(pHandle);
    if (!busy) {
      tqSetHandleExec(pHandle);
      tqDebug("tmq poll: consumer:0x%" PRIx64 "vgId:%d, topic:%s, set handle exec, pHandle:%p", consumerId, vgId,
              req.subKey, pHandle);
    }
    taosWUnLockLatch(&pTq->lock);

    if (!busy) {
      break;
    }

    tqDebug("tmq poll: consumer:0x%" PRIx64
            "vgId:%d, topic:%s, subscription is executing, wait for 10ms and retry, pHandle:%p",
            consumerId, vgId, req.subKey, pHandle);
    taosMsleep(10);
  }

  // 3. update the epoch value
  if (pHandle->epoch < reqEpoch) {
    tqDebug("tmq poll: consumer:0x%" PRIx64 " epoch update from %d to %d by poll req", consumerId, pHandle->epoch,
            reqEpoch);
    pHandle->epoch = reqEpoch;
  }

  char offsetStr[TSDB_OFFSET_LEN];
  tFormatOffset(offsetStr, TSDB_OFFSET_LEN, &req.reqOffset);
  tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey %s, recv poll req vgId:%d, req:%s, reqId:0x%" PRIx64,
          consumerId, req.epoch, pHandle->subKey, vgId, offsetStr, req.reqId);

  // 4. serve the request, then release the handle for the next poll
  int32_t code = tqExtractDataForMq(pTq, pHandle, &req, pMsg);
  tqSetHandleIdle(pHandle);

  tqDebug("tmq poll: consumer:0x%" PRIx64 "vgId:%d, topic:%s, , set handle idle, pHandle:%p", consumerId, vgId,
          req.subKey, pHandle);
  return code;
}

524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581
// Answers a "WAL info" request: reports the subscription's current log offset
// together with the valid WAL version range of the handle's reader.
//
// The response offset is chosen as follows:
//   - a saved offset exists: it must be a log offset, and its version is used
//   - no saved offset, request offset is a log offset: the reader's current
//     version, or the requested version if the reader has not started (-1)
//   - no saved offset, RESET_EARLIEST / RESET_LATEST: the first / last valid
//     WAL version
//
// Returns 0 after the response is sent, -1 on failure with terrno set to
// TSDB_CODE_INVALID_MSG, TSDB_CODE_TMQ_CONSUMER_MISMATCH or
// TSDB_CODE_INVALID_PARA. The response initialized by tqInitDataRsp() is
// released with tDeleteMqDataRsp() on every path, including success.
int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) {
  SMqPollReq req = {0};
  if (tDeserializeSMqPollReq(pMsg->pCont, pMsg->contLen, &req) < 0) {
    tqError("tDeserializeSMqPollReq %d failed", pMsg->contLen);
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  int64_t      consumerId = req.consumerId;
  STqOffsetVal reqOffset = req.reqOffset;
  int32_t      vgId = TD_VID(pTq->pVnode);

  // 1. find handle
  STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
  if (pHandle == NULL) {
    tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s not found", consumerId, vgId, req.subKey);
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  // 2. check re-balance status
  taosRLockLatch(&pTq->lock);
  if (pHandle->consumerId != consumerId) {
    tqDebug("ERROR consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64,
            consumerId, vgId, req.subKey, pHandle->consumerId);
    terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
    taosRUnLockLatch(&pTq->lock);
    return -1;
  }
  taosRUnLockLatch(&pTq->lock);

  int64_t sver = 0, ever = 0;
  walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever);

  // From here on dataRsp must be released before returning; all exits go
  // through the `end` label.
  int32_t    code = -1;
  SMqDataRsp dataRsp = {0};
  tqInitDataRsp(&dataRsp, &req);

  STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, req.subKey);
  if (pOffset != NULL) {
    if (pOffset->val.type != TMQ_OFFSET__LOG) {
      tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s use snapshot, no valid wal info", consumerId, vgId, req.subKey);
      terrno = TSDB_CODE_INVALID_PARA;
      goto end;
    }

    dataRsp.rspOffset.type = TMQ_OFFSET__LOG;
    dataRsp.rspOffset.version = pOffset->val.version;
  } else {
    if (req.useSnapshot == true) {
      tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s snapshot not support wal info", consumerId, vgId, req.subKey);
      terrno = TSDB_CODE_INVALID_PARA;
      goto end;
    }

    dataRsp.rspOffset.type = TMQ_OFFSET__LOG;

    if (reqOffset.type == TMQ_OFFSET__LOG) {
      int64_t currentVer = walReaderGetCurrentVer(pHandle->execHandle.pTqReader->pWalReader);
      if (currentVer == -1) {  // not start to read data from wal yet, return req offset directly
        dataRsp.rspOffset.version = reqOffset.version;
      } else {
        dataRsp.rspOffset.version = currentVer;  // return current consume offset value
      }
    } else if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEST) {
      dataRsp.rspOffset.version = sver;  // not consume yet, set the earliest position
    } else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) {
      dataRsp.rspOffset.version = ever;
    } else {
      tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s invalid offset type:%d", consumerId, vgId, req.subKey,
              reqOffset.type);
      terrno = TSDB_CODE_INVALID_PARA;
      goto end;
    }
  }

  tqDoSendDataRsp(&pMsg->info, &dataRsp, req.epoch, req.consumerId, TMQ_MSG_TYPE__WALINFO_RSP, sver, ever);
  code = 0;

end:
  tDeleteMqDataRsp(&dataRsp);
  return code;
}

606
// Handles a request to delete a subscription.
//
// msg is interpreted directly as a SMqVDeleteReq. Under the pTq->lock write
// lock the function waits (polling every 10 ms) until the handle, if present,
// is no longer executing, closes its WAL reference, removes it from
// pTq->pHandle, and then deletes the subscription's offset and its persisted
// handle. Failures of the individual delete steps are only logged.
// Always returns 0.
//
// NOTE(review): the wait loop sleeps while holding the write lock - confirm
// this is intended.
int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  SMqVDeleteReq* pReq = (SMqVDeleteReq*)msg;
  int32_t        vgId = TD_VID(pTq->pVnode);

  tqDebug("vgId:%d, tq process delete sub req %s", vgId, pReq->subKey);

  taosWLockLatch(&pTq->lock);

  STqHandle* pHandle = taosHashGet(pTq->pHandle, pReq->subKey, strlen(pReq->subKey));
  if (pHandle != NULL) {
    // wait for an in-flight poll on this handle to finish
    while (tqIsHandleExec(pHandle)) {
      tqDebug("vgId:%d, topic:%s, subscription is executing, wait for 10ms and retry, pHandle:%p", vgId,
              pHandle->subKey, pHandle);
      taosMsleep(10);
    }

    if (pHandle->pRef) {
      walCloseRef(pTq->pVnode->pWal, pHandle->pRef->refId);
    }

    if (taosHashRemove(pTq->pHandle, pReq->subKey, strlen(pReq->subKey)) != 0) {
      tqError("cannot process tq delete req %s, since no such handle", pReq->subKey);
    }
  }

  if (tqOffsetDelete(pTq->pOffsetStore, pReq->subKey) != 0) {
    tqError("cannot process tq delete req %s, since no such offset in cache", pReq->subKey);
  }

  if (tqMetaDeleteHandle(pTq, pReq->subKey) < 0) {
    tqError("cannot process tq delete req %s, since no such offset in tdb", pReq->subKey);
  }

  taosWUnLockLatch(&pTq->lock);

  return 0;
}

645
// Handles a request that registers check info for a topic.
//
// msg, msgLen - encoded STqCheckInfo
//
// The decoded info is stored in pTq->pCheckInfo keyed by its topic name, and
// the raw message is persisted through tqMetaSaveCheckInfo().
//
// Returns 0 on success; on any failure returns -1 with terrno set to
// TSDB_CODE_OUT_OF_MEMORY. The decoder is cleared on every path, including
// decode failure.
int32_t tqProcessAddCheckInfoReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  STqCheckInfo info = {0};
  SDecoder     decoder;

  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
  int32_t decodeCode = tDecodeSTqCheckInfo(&decoder, &info);
  tDecoderClear(&decoder);  // release the decoder whether or not decoding succeeded
  if (decodeCode < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  // the hash table stores a byte copy of `info`
  if (taosHashPut(pTq->pCheckInfo, info.topic, strlen(info.topic), &info, sizeof(STqCheckInfo)) < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  if (tqMetaSaveCheckInfo(pTq, info.topic, msg, msgLen) < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  return 0;
}

665
// Handles a request that removes the check info of a topic.
//
// msg is used directly as the NUL-terminated key: the entry is removed from
// pTq->pCheckInfo and then from the persisted meta.
//
// Returns 0 on success; if either removal reports a negative result, returns
// -1 with terrno set to TSDB_CODE_OUT_OF_MEMORY.
int32_t tqProcessDelCheckInfoReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  int32_t removed = taosHashRemove(pTq->pCheckInfo, msg, strlen(msg));
  if (removed < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  int32_t deleted = tqMetaDeleteCheckInfo(pTq, msg);
  if (deleted < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  return 0;
}

677
// Handles a re-balance (subscribe) request that assigns a subscription to a
// consumer.
//
// msg, msgLen - encoded SMqRebVgReq
//
// If a handle for req.subKey already exists, its consumer id is switched to
// req.newConsumerId (under the pTq->lock write lock), the handle is removed
// from the push manager, and the handle is persisted. Otherwise a new handle
// is created and persisted, unless req.newConsumerId is -1, which is logged
// as an invalid request.
//
// Returns -1 with terrno = TSDB_CODE_INVALID_MSG when decoding fails;
// otherwise the result of tqCreateHandle()/tqMetaSaveHandle(), or 0 for the
// invalid-request case.
int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  int         ret = 0;
  SMqRebVgReq req = {0};
  SDecoder    dc = {0};

  tDecoderInit(&dc, msg, msgLen);

  // decode req
  if (tDecodeSMqRebVgReq(&dc, &req) < 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    tDecoderClear(&dc);
    return -1;
  }

  tqDebug("vgId:%d, tq process sub req:%s, Id:0x%" PRIx64 " -> Id:0x%" PRIx64, pTq->pVnode->config.vgId, req.subKey,
          req.oldConsumerId, req.newConsumerId);

  // Look the handle up in memory; while it is missing and tqMetaGetHandle()
  // does not report an error, query the hash again.
  // NOTE(review): presumably tqMetaGetHandle() loads the handle into
  // pTq->pHandle on success - confirm, otherwise this loop cannot terminate.
  STqHandle* pHandle = NULL;
  do {
    pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
  } while (pHandle == NULL && tqMetaGetHandle(pTq, req.subKey) >= 0);

  if (pHandle != NULL) {
    taosWLockLatch(&pTq->lock);

    if (pHandle->consumerId != req.newConsumerId) {
      tqInfo("vgId:%d switch consumer from Id:0x%" PRIx64 " to Id:0x%" PRIx64, req.vgId, pHandle->consumerId,
             req.newConsumerId);
      atomic_store_64(&pHandle->consumerId, req.newConsumerId);
    } else {  // do nothing
      tqInfo("vgId:%d consumer:0x%" PRIx64 " remains, no switch occurs, should not reach here", req.vgId,
             req.newConsumerId);
    }

    // remove if it has been register in the push manager, and return one empty block to consumer
    tqUnregisterPushHandle(pTq, pHandle);
    taosWUnLockLatch(&pTq->lock);

    ret = tqMetaSaveHandle(pTq, req.subKey, pHandle);
  } else {
    if (req.oldConsumerId != -1) {
      tqError("vgId:%d, build new consumer handle %s for consumer:0x%" PRIx64 ", but old consumerId:0x%" PRIx64,
              req.vgId, req.subKey, req.newConsumerId, req.oldConsumerId);
    }

    if (req.newConsumerId == -1) {
      tqError("vgId:%d, tq invalid re-balance request, new consumerId %" PRId64 "", req.vgId, req.newConsumerId);
    } else {
      STqHandle handle = {0};
      ret = tqCreateHandle(pTq, &req, &handle);
      if (ret < 0) {
        // release whatever the failed creation left in the local handle
        tqDestroyTqHandle(&handle);
      } else {
        ret = tqMetaSaveHandle(pTq, req.subKey, &handle);
      }
    }
  }

  tDecoderClear(&dc);
  return ret;
}
753

dengyihao's avatar
dengyihao 已提交
754
// Element free callback: `ptr` is the address of a stored pointer, and the memory that stored
// pointer refers to is released. Registered on the table-sink hash in tqExpandTask().
void freePtr(void* ptr) {
  void** ppStored = (void**)ptr;
  taosMemoryFree(*ppStored);
}
L
liuyao 已提交
755

756
// Fill in the runtime parts of a stream task so that it can run on this vnode.
//
// Sets the id string, reference count, scheduling status, input/output queues, message callback,
// meta pointer and checkpoint/data-range versions (all taken from `ver`). Source and aggregate
// level tasks additionally get a state backend and a query executor; the sink fields are set
// according to pTask->outputType; source tasks also get a WAL reader.
//
// Returns 0 on success and -1 as soon as a queue, the state backend, the executor or the sink
// schema cannot be created. Nothing created earlier is released here on failure.
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
  int32_t vgId = TD_VID(pTq->pVnode);

  // identity and scheduling state
  pTask->id.idStr = createStreamTaskIdStr(pTask->id.streamId, pTask->id.taskId);
  pTask->refCnt = 1;
  pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE;

  // both queues are opened before either result is inspected
  pTask->inputQueue = streamQueueOpen(512 << 10);
  pTask->outputQueue = streamQueueOpen(512 << 10);
  if (pTask->inputQueue == NULL || pTask->outputQueue == NULL) {
    return -1;
  }

  pTask->inputStatus = TASK_INPUT_STATUS__NORMAL;
  pTask->outputStatus = TASK_OUTPUT_STATUS__NORMAL;
  pTask->pMsgCb = &pTq->pVnode->msgCb;
  pTask->pMeta = pTq->pStreamMeta;

  // checkpoint and data range all start from the supplied version
  pTask->chkInfo.version = ver;
  pTask->chkInfo.currentVer = ver;
  pTask->dataRange.range.maxVer = ver;
  pTask->dataRange.range.minVer = ver;

  // executor: only source and aggregate level tasks own one
  const bool isSource = (pTask->info.taskLevel == TASK_LEVEL__SOURCE);
  const bool isAgg = (pTask->info.taskLevel == TASK_LEVEL__AGG);
  if (isSource || isAgg) {
    pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask, false, -1, -1);
    if (pTask->pState == NULL) {
      return -1;
    }

    // fields not named here stay zero/NULL, which is what the aggregate level expects for `vnode`
    SReadHandle readHandle = {.pStateBackend = pTask->pState};
    if (isSource) {
      readHandle.vnode = pTq->pVnode;
      readHandle.initTqReader = 1;
    } else {
      readHandle.numOfVgroups = (int32_t)taosArrayGetSize(pTask->pUpstreamEpInfoList);
    }
    initStorageAPI(&readHandle.api);

    pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &readHandle, vgId);
    if (pTask->exec.pExecutor == NULL) {
      return -1;
    }

    qSetTaskId(pTask->exec.pExecutor, pTask->id.taskId, pTask->id.streamId);
  }

  // sink
  if (pTask->outputType == TASK_OUTPUT__SMA) {
    pTask->smaSink.vnode = pTq->pVnode;
    pTask->smaSink.smaSink = smaHandleRes;
  } else if (pTask->outputType == TASK_OUTPUT__TABLE) {
    pTask->tbSink.vnode = pTq->pVnode;
    pTask->tbSink.tbSinkFunc = tqSinkToTablePipeline;

    // schema version defaults to 1 when the super table's meta info cannot be fetched
    SMetaInfo metaInfo = {0};
    int32_t   schemaVer = 1;
    if (metaGetInfo(pTq->pVnode->pMeta, pTask->tbSink.stbUid, &metaInfo, NULL) == TSDB_CODE_SUCCESS) {
      schemaVer = metaInfo.skmVer;
    }

    SSchemaWrapper* pWrapper = pTask->tbSink.pSchemaWrapper;
    pTask->tbSink.pTSchema = tBuildTSchema(pWrapper->pSchema, pWrapper->nCols, schemaVer);
    if (pTask->tbSink.pTSchema == NULL) {
      return -1;
    }

    pTask->tbSink.pTblInfo = tSimpleHashInit(10240, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
    tSimpleHashSetFreeFp(pTask->tbSink.pTblInfo, freePtr);
  }

  // source tasks read from the wal; delete messages are extracted from the wal files as well
  if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
    SWalFilterCond walCond = {.deleteMsg = 1};
    pTask->exec.pWalReader = walOpenReader(pTq->pVnode->pWal, &walCond);
  }

  streamSetupTrigger(pTask);

  tqInfo("vgId:%d expand stream task, s-task:%s, checkpoint ver:%" PRId64 " child id:%d, level:%d", vgId,
         pTask->id.idStr, pTask->chkInfo.version, pTask->info.selfChildId, pTask->info.taskLevel);

  // next valid version will add one
  pTask->chkInfo.version += 1;
  return 0;
}
L
Liu Jicong 已提交
859

860
// Answer a "task check" request sent by an upstream stream task.
//
// pMsg->pCont holds an SMsgHead followed by an encoded SStreamTaskCheckReq. The reply echoes the
// request ids and carries the status reported by streamTaskCheckStatus() for the downstream task,
// or status 0 when that task is not (yet) present in the stream meta.
//
// Returns 0 once the reply has been handed to tmsgSendRsp(), -1 if the reply cannot be sized or
// its buffer cannot be allocated (terrno is set to TSDB_CODE_OUT_OF_MEMORY in the latter case).
int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) {
  char*   msgStr = pMsg->pCont;
  char*   msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
  int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);

  // zero-initialised so the echoed ids are well defined even if decoding stops early
  SStreamTaskCheckReq req = {0};
  SDecoder            decoder;

  tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
  tDecodeStreamTaskCheckReq(&decoder, &req);
  tDecoderClear(&decoder);

  int32_t taskId = req.downstreamTaskId;

  SStreamTaskCheckRsp rsp = {
      .reqId = req.reqId,
      .streamId = req.streamId,
      .childId = req.childId,
      .downstreamNodeId = req.downstreamNodeId,
      .downstreamTaskId = req.downstreamTaskId,
      .upstreamNodeId = req.upstreamNodeId,
      .upstreamTaskId = req.upstreamTaskId,
  };

  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);

  if (pTask != NULL) {
    rsp.status = streamTaskCheckStatus(pTask);

    // log while the reference is still held: the task must not be touched after it is released
    tqDebug("s-task:%s recv task check req(reqId:0x%" PRIx64 ") task:0x%x (vgId:%d), status:%s, rsp status %d",
            pTask->id.idStr, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId,
            streamGetTaskStatusStr(pTask->status.taskStatus), rsp.status);
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
  } else {
    rsp.status = 0;
    tqDebug("tq recv task check(taskId:0x%x not built yet) req(reqId:0x%" PRIx64 ") from task:0x%x (vgId:%d), rsp status %d",
            taskId, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);
  }

  SEncoder encoder;
  int32_t  code;
  int32_t  len;

  tEncodeSize(tEncodeStreamTaskCheckRsp, &rsp, len, code);
  if (code < 0) {
    tqError("vgId:%d failed to encode task check rsp, task:0x%x", pTq->pStreamMeta->vgId, taskId);
    return -1;
  }

  void* buf = rpcMallocCont(sizeof(SMsgHead) + len);
  if (buf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    tqError("vgId:%d failed to alloc task check rsp, task:0x%x", pTq->pStreamMeta->vgId, taskId);
    return -1;
  }
  ((SMsgHead*)buf)->vgId = htonl(req.upstreamNodeId);

  void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
  tEncoderInit(&encoder, (uint8_t*)abuf, len);
  tEncodeStreamTaskCheckRsp(&encoder, &rsp);
  tEncoderClear(&encoder);

  SRpcMsg rspMsg = {.code = 0, .pCont = buf, .contLen = sizeof(SMsgHead) + len, .info = pMsg->info};

  tmsgSendRsp(&rspMsg);
  return 0;
}

923 924 925 926
// Handle the reply to a task check request and forward it to the upstream task that asked.
//
// pMsg->pCont holds an SMsgHead followed by an encoded SStreamTaskCheckRsp.
// Returns -1 when the reply cannot be decoded or the upstream task is not found in the stream
// meta; otherwise the result of streamProcessCheckRsp().
int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t sversion, SRpcMsg* pMsg) {
  char*   pBody = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t bodyLen = pMsg->contLen - sizeof(SMsgHead);

  SStreamTaskCheckRsp rsp;
  SDecoder            decoder;

  tDecoderInit(&decoder, (uint8_t*)pBody, bodyLen);
  int32_t code = tDecodeStreamTaskCheckRsp(&decoder, &rsp);
  tDecoderClear(&decoder);  // the decoder is released on the failure path as well
  if (code < 0) {
    return -1;
  }

  tqDebug("tq task:0x%x (vgId:%d) recv check rsp(reqId:0x%" PRIx64 ") from 0x%x (vgId:%d) status %d",
          rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.status);

  SStreamMeta* pMeta = pTq->pStreamMeta;
  SStreamTask* pUpTask = streamMetaAcquireTask(pMeta, rsp.upstreamTaskId);
  if (pUpTask == NULL) {
    tqError("tq failed to locate the stream task:0x%x (vgId:%d), it may have been destroyed", rsp.upstreamTaskId,
            pMeta->vgId);
    return -1;
  }

  code = streamProcessCheckRsp(pUpTask, &rsp);
  streamMetaReleaseTask(pMeta, pUpTask);
  return code;
}

955
// Deploy a stream task on this vnode from its serialized form.
//
// msg/msgLen hold an encoded SStreamTask; sversion is passed to streamMetaAddDeployedTask().
// Returns 0 on success, and also when streams are disabled (tsDisableStream), in which case
// nothing is done. Returns -1 when the task cannot be allocated (terrno is set to
// TSDB_CODE_OUT_OF_MEMORY), decoded, or added to the stream meta.
int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  if (tsDisableStream) {
    return 0;
  }

  int32_t vgId = TD_VID(pTq->pVnode);

  // 1. allocate the task and deserialize the message into it
  SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
  if (pTask == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    tqError("vgId:%d failed to create stream task due to out of memory, alloc size:%d", vgId,
            (int32_t)sizeof(SStreamTask));
    return -1;
  }

  SDecoder decoder;
  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
  int32_t code = tDecodeStreamTask(&decoder, pTask);
  tDecoderClear(&decoder);
  if (code < 0) {
    taosMemoryFree(pTask);
    return -1;
  }

  // 2. register the task in the stream meta while holding the meta write lock
  SStreamMeta* pStreamMeta = pTq->pStreamMeta;

  taosWLockLatch(&pStreamMeta->lock);
  code = streamMetaAddDeployedTask(pStreamMeta, sversion, pTask);
  int32_t numOfTasks = streamMetaGetNumOfTasks(pStreamMeta);
  if (code < 0) {
    tqError("vgId:%d failed to add s-task:%s, total:%d", vgId, pTask->id.idStr, numOfTasks);
  }
  taosWUnLockLatch(&pStreamMeta->lock);

  if (code < 0) {
    return -1;
  }

  // 3. start the downstream check; this is called for every deployed task
  //    NOTE(review): an earlier comment said fill-history tasks do nothing here - confirm inside the callee
  streamPrepareNdoCheckDownstream(pTask);

  tqDebug("vgId:%d s-task:%s is deployed and add into meta, status:%s, numOfTasks:%d", vgId, pTask->id.idStr,
          streamGetTaskStatusStr(pTask->status.taskStatus), numOfTasks);

  return 0;
}

1007
// Run the history-data scan of a stream task.
//
// pMsg->pCont is read as an SStreamScanHistoryReq naming the task. Step 1 scans the history data.
// For a fill-history task the related stream task is then halted, a second scan covers the
// versions that arrived in the meantime (step 2), the transfer-state message is dispatched to the
// downstream tasks and the fill-history task is marked as dropping. For any other task the scan
// is completed through streamTaskScanHistoryDataComplete().
//
// Every task reference acquired here is released on every return path.
// Returns -1 when the task (or, for a fill-history task, its related stream task) cannot be
// acquired or its checkpoint version is not positive; 0 when the task is dropped during a scan or
// is not in the inactive scheduling state; otherwise 0 for a fill-history task and the completion
// code for any other task.
int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
  int32_t code = TSDB_CODE_SUCCESS;
  char*   msg = pMsg->pCont;

  SStreamMeta*           pMeta = pTq->pStreamMeta;
  SStreamScanHistoryReq* pReq = (SStreamScanHistoryReq*)msg;

  SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->taskId);
  if (pTask == NULL) {
    tqError("vgId:%d failed to acquire stream task:0x%x during stream recover, task may have been destroyed",
            pMeta->vgId, pReq->taskId);
    return -1;
  }

  // check param
  int64_t fillVer1 = pTask->chkInfo.version;
  if (fillVer1 <= 0) {
    streamMetaReleaseTask(pMeta, pTask);
    return -1;
  }

  // do recovery step 1
  tqDebug("s-task:%s start history data scan stage(step 1), status:%s", pTask->id.idStr, streamGetTaskStatusStr(pTask->status.taskStatus));

  int64_t st = taosGetTimestampMs();
  int8_t  schedStatus = atomic_val_compare_exchange_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE,
                                                      TASK_SCHED_STATUS__WAITING);
  if (schedStatus != TASK_SCHED_STATUS__INACTIVE) {
    ASSERT(0);
    // drop the reference taken above even on this unexpected path
    streamMetaReleaseTask(pMeta, pTask);
    return 0;
  }

  streamSourceScanHistoryData(pTask);
  if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING) {
    tqDebug("s-task:%s is dropped, abort recover in step1", pTask->id.idStr);
    streamMetaReleaseTask(pMeta, pTask);
    return 0;
  }

  double el = (taosGetTimestampMs() - st) / 1000.0;
  tqDebug("s-task:%s history data scan stage(step 1) ended, elapsed time:%.2fs", pTask->id.idStr, el);

  if (pTask->info.fillHistory) {
    // 1. stop the related stream task, get the current scan wal version of stream task, ver.
    SStreamTask* pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.taskId);
    if (pStreamTask == NULL) {
      // without the related stream task there is nothing to halt or hand the state over to
      tqError("vgId:%d failed to acquire the related stream task:0x%x of s-task:%s, it may have been destroyed",
              pMeta->vgId, pTask->streamTaskId.taskId, pTask->id.idStr);
      streamMetaReleaseTask(pMeta, pTask);
      return -1;
    }

    ASSERT(pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE);

    // wait for the stream task get ready for scan history data
    while (((pStreamTask->status.checkDownstream == 0) && (pStreamTask->status.taskStatus != TASK_STATUS__STOP)) ||
           pStreamTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) {
      tqDebug("s-task:%s level:%d not ready for halt, wait for 100ms and recheck", pStreamTask->id.idStr,
              pStreamTask->info.taskLevel);
      taosMsleep(100);
    }

    // now we can stop the stream task execution
    pStreamTask->status.taskStatus = TASK_STATUS__HALT;
    tqDebug("s-task:%s level:%d status is set to halt by history scan task:%s", pStreamTask->id.idStr,
            pStreamTask->info.taskLevel, pTask->id.idStr);

    // if it's an source task, extract the last version in wal.
    int64_t ver = pTask->dataRange.range.maxVer + 1;
    int64_t latestVer = walReaderGetCurrentVer(pStreamTask->exec.pWalReader);
    if (latestVer >= ver) {
      ver = latestVer;
    }

    // 2. do secondary scan of the history data, the time window remain, and the version range is updated to
    //    [previous maxVer + 1, ver]
    SVersionRange* pRange = &pTask->dataRange.range;

    pRange->minVer = pRange->maxVer + 1;
    pRange->maxVer = ver;
    if (pRange->minVer == pRange->maxVer) {
      tqDebug("s-task:%s no need to perform secondary scan-history-data(step 2), since no new data ingest", pTask->id.idStr);
    } else {
      tqDebug("s-task:%s level:%d verRange:%" PRId64 " - %" PRId64
              " do secondary scan-history-data after halt the related stream task:%s",
              pTask->id.idStr, pTask->info.taskLevel, pRange->minVer, pRange->maxVer, pStreamTask->id.idStr);

      ASSERT(pTask->status.schedStatus == TASK_SCHED_STATUS__WAITING);
      st = taosGetTimestampMs();

      streamSetParamForStreamScanner(pTask, pRange, &pTask->dataRange.window);

      streamSourceScanHistoryData(pTask);
      if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING) {
        tqDebug("s-task:%s is dropped, abort recover in step2", pTask->id.idStr);
        // both references are held at this point
        streamMetaReleaseTask(pMeta, pTask);
        streamMetaReleaseTask(pMeta, pStreamTask);
        return 0;
      }

      el = (taosGetTimestampMs() - st) / 1000.0;
      tqDebug("s-task:%s history data scan stage(step 2) ended, elapsed time:%.2fs", pTask->id.idStr, el);
    }

    // 3. notify the downstream tasks to transfer executor state after handle all history blocks.
    pTask->status.transferState = true;

    code = streamDispatchTransferStateMsg(pTask);
    if (code != TSDB_CODE_SUCCESS) {
      // todo handle error; for now the failure is only reported and the sequence continues
      tqError("s-task:%s failed to dispatch transfer state msg, code:0x%x", pTask->id.idStr, code);
    }

    // 4. 1) transfer the ownership of executor state, 2) update the scan data range for source task.
    // 5. resume the related stream task.
    streamTryExec(pTask);

    pTask->status.taskStatus = TASK_STATUS__DROPPING;
    tqDebug("s-task:%s set status to be dropping", pTask->id.idStr);

    streamMetaSaveTask(pMeta, pTask);
    streamMetaSaveTask(pMeta, pStreamTask);

    streamMetaReleaseTask(pMeta, pTask);
    streamMetaReleaseTask(pMeta, pStreamTask);

    // persist to disk; neither task may be touched any more, so use the meta held by the tq
    if (streamMetaCommit(pMeta) < 0) {
      tqError("vgId:%d failed to commit stream meta after history scan", pMeta->vgId);
    }
  } else {
    // todo update the chkInfo version for current task.
    // this task has an associated history stream task, so we need to scan wal from the end version of
    // history scan. The current version of chkInfo.current is not updated during the history scan
    if (pTask->historyTaskId.taskId == 0) {
      pTask->dataRange.window.ekey = INT64_MAX;
      pTask->dataRange.window.skey = INT64_MIN;
      tqDebug("s-task:%s without associated stream task, reset the time window:%"PRId64" - %"PRId64, pTask->id.idStr,
              pTask->dataRange.window.skey, pTask->dataRange.window.ekey);
    } else {
      tqDebug("s-task:%s history data scan completed, now start to scan data from wal, start ver:%" PRId64
              ", window:%" PRId64 " - %" PRId64,
              pTask->id.idStr, pTask->chkInfo.currentVer, pTask->dataRange.window.skey, pTask->dataRange.window.ekey);
    }

    code = streamTaskScanHistoryDataComplete(pTask);
    streamMetaReleaseTask(pMeta, pTask);
    return code;
  }

  return 0;
}

H
Haojun Liao 已提交
1155 1156
// Handle the transfer-state request that a fill-history task sends to its downstream tasks once
// all history blocks have been handled.
//
// msg/msgLen hold the encoded request. The target task is flagged with status.transferState and
// scheduled for execution.
// Returns 0 on success, -1 when the request cannot be decoded or the task is not found.
int32_t tqProcessTaskTransferStateReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  SStreamTransferReq req;

  SDecoder decoder;
  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
  int32_t code = tDecodeStreamRecoverFinishReq(&decoder, &req);
  tDecoderClear(&decoder);  // every decoder that is initialised must be cleared
  if (code < 0) {
    // req is not trustworthy, do not look up a task with it
    tqError("failed to decode the transfer state req, msgLen:%d", msgLen);
    return -1;
  }

  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.taskId);
  if (pTask == NULL) {
    tqError("failed to find task:0x%x", req.taskId);
    return -1;
  }

  ASSERT(pTask->streamTaskId.taskId != 0);
  pTask->status.transferState = true;  // persistent data?

#if 0
  // do check if current task handle all data in the input queue
  int64_t st = taosGetTimestampMs();
  tqDebug("s-task:%s start step2 recover, ts:%" PRId64, pTask->id.idStr, st);

  code = streamSourceRecoverScanStep2(pTask, sversion);
  if (code < 0) {
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
    return -1;
  }

  qDebug("s-task:%s set start wal scan start ver:%"PRId64, pTask->id.idStr, sversion);

  walReaderSeekVer(pTask->exec.pWalReader, sversion);
  pTask->chkInfo.currentVer = sversion;

  if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING) {
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
    return 0;
  }

  // restore param
  code = streamRestoreParam(pTask);
  if (code < 0) {
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
    return -1;
  }

  // set status normal
  tqDebug("s-task:%s blocking stage completed, set the status to be normal", pTask->id.idStr);
  code = streamSetStatusNormal(pTask);
  if (code < 0) {
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
    return -1;
  }

  double el = (taosGetTimestampMs() - st) / 1000.0;
  tqDebug("s-task:%s step2 recover finished, el:%.2fs", pTask->id.idStr, el);

  // dispatch recover finish req to all related downstream task
  code = streamDispatchScanHistoryFinishMsg(pTask);
  if (code < 0) {
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
    return -1;
  }

  atomic_store_8(&pTask->info.fillHistory, 0);
  streamMetaSaveTask(pTq->pStreamMeta, pTask);
#endif

  streamSchedExec(pTask);
  streamMetaReleaseTask(pTq->pStreamMeta, pTask);

  return 0;
}

L
Liu Jicong 已提交
1228 1229 1230
// Handle a recover-finish request: decode it, look up the addressed task and pass the sender's
// child id to streamProcessRecoverFinishReq().
//
// pMsg->pCont holds an SMsgHead followed by an encoded SStreamRecoverFinishReq.
// Returns 0 on success, -1 when the task is not found or processing fails.
int32_t tqProcessTaskRecoverFinishReq(STQ* pTq, SRpcMsg* pMsg) {
  char*   pBody = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t bodyLen = pMsg->contLen - sizeof(SMsgHead);

  // deserialize
  SStreamRecoverFinishReq req;
  SDecoder                decoder;

  tDecoderInit(&decoder, (uint8_t*)pBody, bodyLen);
  tDecodeStreamRecoverFinishReq(&decoder, &req);
  tDecoderClear(&decoder);

  // find task
  SStreamMeta* pMeta = pTq->pStreamMeta;
  SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.taskId);
  if (pTask == NULL) {
    return -1;
  }

  // do process request; the reference is dropped whatever the outcome
  int32_t ret = (streamProcessRecoverFinishReq(pTask, req.childId) < 0) ? -1 : 0;
  streamMetaReleaseTask(pMeta, pTask);
  return ret;
}
L
Liu Jicong 已提交
1254

L
Liu Jicong 已提交
1255 1256 1257 1258 1259
// Placeholder handler for the recover-finish response: neither argument is used and it always
// returns 0.
int32_t tqProcessTaskRecoverFinishRsp(STQ* pTq, SRpcMsg* pMsg) {
  (void)pTq;
  (void)pMsg;
  return 0;
}

1260 1261 1262 1263
// Convert an encoded delete result into a STREAM_DELETE_DATA block wrapped in a queue item.
//
// pData/len : encoded SDeleteRes
// ver       : version stored in the produced block
// pRefBlock : out; set to NULL first, then to a newly allocated SStreamRefDataBlock that refers
//             to the delete block. It stays NULL when the delete touched no table or no row.
//
// The block gets one row per deleted table uid: start key, end key and uid are filled in, the
// group id and the two calculate-ts columns are set to NULL.
//
// Returns TSDB_CODE_SUCCESS (also when *pRefBlock is left NULL because nothing was deleted) or
// TSDB_CODE_OUT_OF_MEMORY.
int32_t extractDelDataBlock(const void* pData, int32_t len, int64_t ver, SStreamRefDataBlock** pRefBlock) {
  SDecoder*   pCoder = &(SDecoder){0};
  SDeleteRes* pRes = &(SDeleteRes){0};

  *pRefBlock = NULL;

  pRes->uidList = taosArrayInit(0, sizeof(tb_uid_t));
  if (pRes->uidList == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  tDecoderInit(pCoder, (uint8_t*)pData, len);
  tDecodeDeleteRes(pCoder, pRes);
  tDecoderClear(pCoder);

  int32_t numOfTables = taosArrayGetSize(pRes->uidList);
  if (numOfTables == 0 || pRes->affectedRows == 0) {
    taosArrayDestroy(pRes->uidList);
    return TSDB_CODE_SUCCESS;
  }

  // allocate the queue item before the block is built, so a failure here leaves no block to clean up
  SStreamRefDataBlock* pItem = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0);
  if (pItem == NULL) {
    taosArrayDestroy(pRes->uidList);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  SSDataBlock* pDelBlock = createSpecialDataBlock(STREAM_DELETE_DATA);
  if (pDelBlock == NULL) {
    taosFreeQitem(pItem);
    taosArrayDestroy(pRes->uidList);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  blockDataEnsureCapacity(pDelBlock, numOfTables);
  pDelBlock->info.rows = numOfTables;
  pDelBlock->info.version = ver;

  // the column descriptors do not change from row to row, look them up once
  SColumnInfoData* pStartCol = taosArrayGet(pDelBlock->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pEndCol = taosArrayGet(pDelBlock->pDataBlock, END_TS_COLUMN_INDEX);
  SColumnInfoData* pUidCol = taosArrayGet(pDelBlock->pDataBlock, UID_COLUMN_INDEX);
  SColumnInfoData* pGroupCol = taosArrayGet(pDelBlock->pDataBlock, GROUPID_COLUMN_INDEX);
  SColumnInfoData* pCalStartCol = taosArrayGet(pDelBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
  SColumnInfoData* pCalEndCol = taosArrayGet(pDelBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);

  for (int32_t i = 0; i < numOfTables; i++) {
    // start key column
    colDataSetVal(pStartCol, i, (const char*)&pRes->skey, false);
    // end key column
    colDataSetVal(pEndCol, i, (const char*)&pRes->ekey, false);
    // uid column
    int64_t* pUid = taosArrayGet(pRes->uidList, i);
    colDataSetVal(pUidCol, i, (const char*)pUid, false);

    colDataSetNULL(pGroupCol, i);
    colDataSetNULL(pCalStartCol, i);
    colDataSetNULL(pCalEndCol, i);
  }

  taosArrayDestroy(pRes->uidList);

  pItem->type = STREAM_INPUT__REF_DATA_BLOCK;
  pItem->pBlock = pDelBlock;
  *pRefBlock = pItem;
  return TSDB_CODE_SUCCESS;
}

L
Liu Jicong 已提交
1313 1314
// Handle a run request for a stream task.
//
// pMsg->pCont is read as an SStreamTaskRunReq. Two reserved task ids are handled first:
// STREAM_TASK_STATUS_CHECK_ID runs the status check of all tasks and EXTRACT_DATA_FROM_WAL_ID
// runs the wal scan of all tasks. Any other id addresses a single task, which is only run when
// its status is normal or halt.
//
// Returns 0 when the request was handled (including the "ignored" case), -1 when the task does
// not exist.
int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
  SStreamTaskRunReq* pRunReq = pMsg->pCont;

  int32_t taskId = pRunReq->taskId;
  int32_t vgId = TD_VID(pTq->pVnode);

  if (taskId == STREAM_TASK_STATUS_CHECK_ID) {
    tqStreamTasksStatusCheck(pTq);
    return 0;
  }

  if (taskId == EXTRACT_DATA_FROM_WAL_ID) {  // all tasks are extracted submit data from the wal
    tqStreamTasksScanWal(pTq);
    return 0;
  }

  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);
  if (pTask == NULL) {
    tqError("vgId:%d failed to found s-task, taskId:%d", vgId, taskId);
    return -1;
  }

  int8_t status = pTask->status.taskStatus;
  if (status != TASK_STATUS__NORMAL && status != TASK_STATUS__HALT) {
    // not runnable right now; a task that should pause goes back to the inactive scheduling state
    if (streamTaskShouldPause(&pTask->status)) {
      atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
    }

    tqDebug("vgId:%d s-task:%s ignore run req since not in ready state, status:%s, sched-status:%d", vgId,
            pTask->id.idStr, streamGetTaskStatusStr(pTask->status.taskStatus), pTask->status.schedStatus);
  } else {
    tqDebug("vgId:%d s-task:%s start to process block from inputQ, last chk point:%" PRId64, vgId, pTask->id.idStr,
            pTask->chkInfo.version);
    streamProcessRunReq(pTask);
  }

  streamMetaReleaseTask(pTq->pStreamMeta, pTask);
  tqStartStreamTasks(pTq);
  return 0;
}

L
Liu Jicong 已提交
1354
// Handle a dispatch request carrying data blocks for a task on this vnode.
//
// pMsg->pCont holds an SMsgHead followed by an encoded SStreamDispatchReq; `exec` is forwarded to
// streamProcessDispatchMsg().
// Returns 0 when the request was passed to the task. When the task does not exist the decoded
// request is freed here and -1 is returned.
int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec) {
  char*   msgStr = pMsg->pCont;
  char*   msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
  int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);

  SStreamDispatchReq req = {0};

  SDecoder decoder;
  tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
  tDecodeStreamDispatchReq(&decoder, &req);
  tDecoderClear(&decoder);  // the decoder is only needed while decoding; clear it like every other handler does

  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.taskId);
  if (pTask) {
    SRpcMsg rsp = {.info = pMsg->info, .code = 0};
    streamProcessDispatchMsg(pTask, &req, &rsp, exec);
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
    return 0;
  } else {
    // nobody took the request over, release what decoding allocated
    tDeleteStreamDispatchReq(&req);
    return -1;
  }
}

L
Liu Jicong 已提交
1377 1378
// Handle the response to a dispatch request and pass it to the upstream task that sent the data.
//
// pMsg->pCont holds an SMsgHead followed by an SStreamDispatchRsp whose upstreamTaskId is
// converted with ntohl() before the lookup.
// Returns 0 on success, TSDB_CODE_INVALID_MSG when the upstream task is not found.
int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) {
  SStreamMeta*        pMeta = pTq->pStreamMeta;
  SStreamDispatchRsp* pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t             taskId = ntohl(pRsp->upstreamTaskId);

  SStreamTask* pTask = streamMetaAcquireTask(pMeta, taskId);
  int32_t      vgId = pMeta->vgId;

  if (pTask == NULL) {
    tqDebug("vgId:%d failed to handle the dispatch rsp, since find task:0x%x failed", vgId, taskId);
    return TSDB_CODE_INVALID_MSG;
  }

  streamProcessDispatchRsp(pTask, pRsp, pMsg->code);
  streamMetaReleaseTask(pMeta, pTask);
  return 0;
}
L
Liu Jicong 已提交
1392

1393
// Handle a drop request: remove the named task from the stream meta.
//
// msg is read as an SVDropStreamTaskReq; sversion and msgLen are not used. Always returns 0.
int32_t tqProcessTaskDropReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  SVDropStreamTaskReq* pDropReq = (SVDropStreamTaskReq*)msg;

  tqDebug("vgId:%d receive msg to drop stream task:0x%x", TD_VID(pTq->pVnode), pDropReq->taskId);
  streamMetaRemoveTask(pTq->pStreamMeta, pDropReq->taskId);

  return 0;
}
L
Liu Jicong 已提交
1400

5
54liuyao 已提交
1401 1402
// Handle a pause request: remember the task's current status in keepTaskStatus and switch the
// task to TASK_STATUS__PAUSE.
//
// msg is read as an SVPauseStreamTaskReq. A task that does not exist is silently ignored.
// Always returns 0.
int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  SVPauseStreamTaskReq* pPauseReq = (SVPauseStreamTaskReq*)msg;
  SStreamMeta*          pMeta = pTq->pStreamMeta;

  SStreamTask* pTask = streamMetaAcquireTask(pMeta, pPauseReq->taskId);
  if (pTask == NULL) {
    return 0;
  }

  tqDebug("vgId:%d s-task:%s set pause flag", pMeta->vgId, pTask->id.idStr);

  // save the status first; the resume handler restores the task from keepTaskStatus
  atomic_store_8(&pTask->status.keepTaskStatus, pTask->status.taskStatus);
  atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__PAUSE);

  streamMetaReleaseTask(pMeta, pTask);
  return 0;
}

// Handle a resume request: restore the status saved by the pause handler and get the task
// running again.
//
// msg is read as an SVResumeStreamTaskReq. When igUntreated is set on a source task that is not a
// fill-history task, its wal reader is told to skip to `sversion`; otherwise the task continues
// from its paused version. A missing task is logged as an error.
// Always returns 0.
int32_t tqProcessTaskResumeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  SVResumeStreamTaskReq* pResumeReq = (SVResumeStreamTaskReq*)msg;
  SStreamMeta*           pMeta = pTq->pStreamMeta;

  int32_t      vgId = pMeta->vgId;
  SStreamTask* pTask = streamMetaAcquireTask(pMeta, pResumeReq->taskId);
  if (pTask == NULL) {
    tqError("vgId:%d failed to find the s-task:0x%x for resume stream task", vgId, pResumeReq->taskId);
    return 0;
  }

  atomic_store_8(&pTask->status.taskStatus, pTask->status.keepTaskStatus);

  const bool isSource = (pTask->info.taskLevel == TASK_LEVEL__SOURCE);

  // no lock needs to secure the access of the version
  if (pResumeReq->igUntreated && isSource && !pTask->info.fillHistory) {
    // discard all the data  when the stream task is suspended.
    walReaderSetSkipToVersion(pTask->exec.pWalReader, sversion);
    tqDebug("vgId:%d s-task:%s resume to exec, prev paused version:%" PRId64 ", start from vnode ver:%" PRId64
            ", schedStatus:%d",
            vgId, pTask->id.idStr, pTask->chkInfo.currentVer, sversion, pTask->status.schedStatus);
  } else {  // from the previous paused version and go on
    tqDebug("vgId:%d s-task:%s resume to exec, from paused ver:%" PRId64 ", vnode ver:%" PRId64 ", schedStatus:%d",
            vgId, pTask->id.idStr, pTask->chkInfo.currentVer, sversion, pTask->status.schedStatus);
  }

  if (!isSource) {
    streamSchedExec(pTask);
  } else if (pTask->info.fillHistory) {
    streamStartRecoverTask(pTask, pResumeReq->igUntreated);
  } else if (taosQueueItemSize(pTask->inputQueue->queue) == 0) {
    // a source task with an empty input queue: start the stream tasks instead of scheduling this one
    tqStartStreamTasks(pTq);
  } else {
    streamSchedExec(pTask);
  }

  streamMetaReleaseTask(pMeta, pTask);
  return 0;
}

L
Liu Jicong 已提交
1448 1449 1450 1451 1452 1453
// Handle a retrieve request addressed to a task on this vnode.
//
// pMsg->pCont holds an SMsgHead followed by an encoded SStreamRetrieveReq. The decoded request is
// freed here in every case.
// Returns 0 when the request was passed to the destination task, -1 when that task is not found.
int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg) {
  char*   pHead = pMsg->pCont;
  char*   pBody = POINTER_SHIFT(pHead, sizeof(SMsgHead));
  int32_t bodyLen = pMsg->contLen - sizeof(SMsgHead);

  SStreamRetrieveReq req;
  SDecoder           decoder;
  tDecoderInit(&decoder, (uint8_t*)pBody, bodyLen);
  tDecodeStreamRetrieveReq(&decoder, &req);
  tDecoderClear(&decoder);

  int32_t      ret = -1;
  int32_t      taskId = req.dstTaskId;
  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);
  if (pTask) {
    SRpcMsg rsp = {.info = pMsg->info, .code = 0};
    streamProcessRetrieveReq(pTask, &req, &rsp);
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
    ret = 0;
  }

  tDeleteStreamRetrieveReq(&req);
  return ret;
}

// Placeholder handler for the retrieve response: neither argument is used and it always
// returns 0.
int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg) {
  (void)pTq;
  (void)pMsg;
  return 0;
}
L
Liu Jicong 已提交
1475

1476 1477 1478 1479 1480 1481
/**
 * Decode a stream dispatch request and hand it to the destination stream task.
 *
 * The payload after the SMsgHead in pMsg->pCont is decoded into an
 * SStreamDispatchReq. If the task identified by req.taskId exists, the request
 * is passed to streamProcessDispatchMsg(); otherwise an error response carrying
 * an SStreamDispatchRsp is sent back through tmsgSendRsp().
 *
 * @param pVnode  vnode that received the message; pVnode->pTq->pStreamMeta is
 *                searched for the task and pVnode->config.vgId is reported as
 *                the downstream node id in the error response.
 * @param pMsg    queue item holding the rpc message. On every path except the
 *                "info.handle == NULL" early return, pMsg->pCont is released
 *                with rpcFreeCont() and pMsg with taosFreeQitem().
 * @return 0 when the request was handed to the task, -1 otherwise.
 */
int32_t vnodeEnqueueStreamMsg(SVnode* pVnode, SRpcMsg* pMsg) {
  STQ*      pTq = pVnode->pTq;
  SMsgHead* msgStr = pMsg->pCont;
  char*     msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
  int32_t   msgLen = pMsg->contLen - sizeof(SMsgHead);
  int32_t   code = 0;

  // Zero-initialize: the FAIL path below reads req.upstreamNodeId, req.streamId,
  // req.upstreamTaskId and req.taskId even when decoding failed part-way, and
  // those reads must not touch indeterminate memory.
  SStreamDispatchReq req = {0};
  SDecoder           decoder;
  tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
  if (tDecodeStreamDispatchReq(&decoder, &req) < 0) {
    code = TSDB_CODE_MSG_DECODE_ERROR;
    tDecoderClear(&decoder);
    goto FAIL;
  }
  tDecoderClear(&decoder);

  int32_t taskId = req.taskId;

  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);
  if (pTask) {
    SRpcMsg rsp = {.info = pMsg->info, .code = 0};
    streamProcessDispatchMsg(pTask, &req, &rsp, false);
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
    rpcFreeCont(pMsg->pCont);
    taosFreeQitem(pMsg);
    return 0;
  } else {
    tDeleteStreamDispatchReq(&req);
  }

  code = TSDB_CODE_STREAM_TASK_NOT_EXIST;

FAIL:
  // NOTE(review): this early return releases neither pMsg->pCont nor pMsg,
  // unlike every other exit below -- confirm who owns the message here.
  if (pMsg->info.handle == NULL) return -1;

  SMsgHead* pRspHead = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamDispatchRsp));
  if (pRspHead == NULL) {
    // No room for a response body: report out-of-memory with an empty payload,
    // and log the code that is actually sent.
    SRpcMsg rsp = {.code = TSDB_CODE_OUT_OF_MEMORY, .info = pMsg->info};
    tqDebug("send dispatch error rsp, code: %x", rsp.code);
    tmsgSendRsp(&rsp);
    rpcFreeCont(pMsg->pCont);
    taosFreeQitem(pMsg);
    return -1;
  }

  // Response fields are written in network byte order.
  pRspHead->vgId = htonl(req.upstreamNodeId);
  SStreamDispatchRsp* pRsp = POINTER_SHIFT(pRspHead, sizeof(SMsgHead));
  pRsp->streamId = htobe64(req.streamId);
  pRsp->upstreamTaskId = htonl(req.upstreamTaskId);
  pRsp->upstreamNodeId = htonl(req.upstreamNodeId);
  pRsp->downstreamNodeId = htonl(pVnode->config.vgId);
  pRsp->downstreamTaskId = htonl(req.taskId);
  pRsp->inputStatus = TASK_OUTPUT_STATUS__NORMAL;

  SRpcMsg rsp = {
      .code = code, .info = pMsg->info, .contLen = sizeof(SMsgHead) + sizeof(SStreamDispatchRsp), .pCont = pRspHead};
  tqDebug("send dispatch error rsp, code: %x", code);
  tmsgSendRsp(&rsp);
  rpcFreeCont(pMsg->pCont);
  taosFreeQitem(pMsg);
  return -1;
}
L
Liu Jicong 已提交
1539

1540
/**
 * Compare a version against the tq's recorded last WAL log version.
 *
 * @param pTq       tq instance whose walLogLastVer is read.
 * @param sversion  version to test.
 * @return 1 when sversion <= pTq->walLogLastVer, 0 otherwise.
 */
int32_t tqCheckLogInWal(STQ* pTq, int64_t sversion) {
  if (sversion > pTq->walLogLastVer) {
    return 0;
  }
  return 1;
}
1541