tq.c 54.1 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
S
Shengliang Guan 已提交
14 15
 */

H
Hongze Cheng 已提交
16
#include "tq.h"
S
Shengliang Guan 已提交
17

dengyihao's avatar
dengyihao 已提交
18 19 20
// States of tqMgmt.inited:
// 0: not initialized
// 1: already initialized
// 2: initialization or cleanup in progress (other callers wait)
21
static int32_t tqInitialize(STQ* pTq);
dengyihao's avatar
dengyihao 已提交
22

wmmhello's avatar
wmmhello 已提交
23
/* Execution-state helpers for a subscription handle. tqProcessPollReq marks a handle as
 * EXEC before extracting data for a poll and marks it IDLE again afterwards. */
static FORCE_INLINE bool tqIsHandleExec(STqHandle* pHandle) {
  return pHandle->status == TMQ_HANDLE_STATUS_EXEC;
}

static FORCE_INLINE void tqSetHandleExec(STqHandle* pHandle) {
  pHandle->status = TMQ_HANDLE_STATUS_EXEC;
}

static FORCE_INLINE void tqSetHandleIdle(STqHandle* pHandle) {
  pHandle->status = TMQ_HANDLE_STATUS_IDLE;
}
wmmhello's avatar
wmmhello 已提交
26

L
Liu Jicong 已提交
27
/*
 * Initialize the module-wide TQ state (timer and stream subsystem) exactly once.
 *
 * tqMgmt.inited is a tri-state flag: 0 = not initialized, 1 = initialized,
 * 2 = a thread is initializing or cleaning up. The CAS loop spins while the state
 * is 2; the thread that observes 0 owns the 0 -> 2 transition and does the work.
 *
 * Returns 0 on success (or when already initialized) and -1 on failure. Every failure
 * path resets the flag to 0. Otherwise later tqInit()/tqCleanUp() calls would spin on
 * state 2 forever.
 */
int32_t tqInit() {
  int8_t old;
  while (1) {
    old = atomic_val_compare_exchange_8(&tqMgmt.inited, 0, 2);
    if (old != 2) break;
  }

  if (old == 0) {
    tqMgmt.timer = taosTmrInit(10000, 100, 10000, "TQ");
    if (tqMgmt.timer == NULL) {
      atomic_store_8(&tqMgmt.inited, 0);
      return -1;
    }

    if (streamInit() < 0) {
      // undo the timer and leave the transitional state so callers don't spin forever
      taosTmrCleanUp(tqMgmt.timer);
      tqMgmt.timer = NULL;
      atomic_store_8(&tqMgmt.inited, 0);
      return -1;
    }

    atomic_store_8(&tqMgmt.inited, 1);
  }

  return 0;
}
L
Liu Jicong 已提交
48

49
/*
 * Tear down the module-wide TQ state created by tqInit().
 *
 * Waits while another thread holds the transitional state 2. It then performs
 * 1 -> 2 -> 0 only if the module was initialized; otherwise it does nothing.
 */
void tqCleanUp() {
  int8_t prev;
  do {
    prev = atomic_val_compare_exchange_8(&tqMgmt.inited, 1, 2);
  } while (prev == 2);

  if (prev != 1) {
    return;
  }

  taosTmrCleanUp(tqMgmt.timer);
  streamCleanUp();
  atomic_store_8(&tqMgmt.inited, 0);
}
L
Liu Jicong 已提交
62

63
void tqDestroyTqHandle(void* data) {
64 65
  STqHandle* pData = (STqHandle*)data;
  qDestroyTask(pData->execHandle.task);
wmmhello's avatar
wmmhello 已提交
66

67
  if (pData->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
L
Liu Jicong 已提交
68
    taosMemoryFreeClear(pData->execHandle.execCol.qmsg);
69
  } else if (pData->execHandle.subType == TOPIC_SUB_TYPE__DB) {
70
    tqReaderClose(pData->execHandle.pTqReader);
71 72
    walCloseReader(pData->pWalReader);
    taosHashCleanup(pData->execHandle.execDb.pFilterOutTbUid);
L
Liu Jicong 已提交
73
  } else if (pData->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
74
    walCloseReader(pData->pWalReader);
75
    tqReaderClose(pData->execHandle.pTqReader);
76 77
    taosMemoryFreeClear(pData->execHandle.execTb.qmsg);
    nodesDestroyNode(pData->execHandle.execTb.node);
78
  }
dengyihao's avatar
dengyihao 已提交
79
  if (pData->msg != NULL) {
80 81 82
    rpcFreeCont(pData->msg->pCont);
    taosMemoryFree(pData->msg);
    pData->msg = NULL;
D
dapan1121 已提交
83
  }
L
Liu Jicong 已提交
84 85
}

86 87 88 89 90
/*
 * Return true only when both offsets are WAL-log offsets and pLeft's version does not
 * exceed pRight's. Any non-log offset on either side yields false.
 */
static bool tqOffsetLessOrEqual(const STqOffset* pLeft, const STqOffset* pRight) {
  if (pLeft->val.type != TMQ_OFFSET__LOG || pRight->val.type != TMQ_OFFSET__LOG) {
    return false;
  }
  return pLeft->val.version <= pRight->val.version;
}

L
Liu Jicong 已提交
91
/*
 * Create a TQ instance for pVnode whose persistent files live under path.
 *
 * Allocates the in-memory tables (subscription handles, push manager, column check
 * info) and then opens the persistent stores via tqInitialize().
 *
 * Returns the new instance, or NULL on failure. terrno is set to
 * TSDB_CODE_OUT_OF_MEMORY when an allocation in this function fails. On a
 * tqInitialize() failure the partially opened instance is released with tqClose().
 */
STQ* tqOpen(const char* path, SVnode* pVnode) {
  STQ* pTq = taosMemoryCalloc(1, sizeof(STQ));
  if (pTq == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pTq->path = taosStrdup(path);
  pTq->pVnode = pVnode;
  pTq->walLogLastVer = pVnode->pWal->vers.lastVer;

  pTq->pHandle = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);
  taosInitRWLatch(&pTq->lock);
  pTq->pPushMgr = taosHashInit(64, MurmurHash3_32, false, HASH_NO_LOCK);
  pTq->pCheckInfo = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);

  // everything below (and tqInitialize) dereferences these, so fail early if any allocation failed
  if (pTq->path == NULL || pTq->pHandle == NULL || pTq->pPushMgr == NULL || pTq->pCheckInfo == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    taosHashCleanup(pTq->pHandle);
    taosHashCleanup(pTq->pPushMgr);
    taosHashCleanup(pTq->pCheckInfo);
    taosMemoryFree(pTq->path);
    taosMemoryFree(pTq);
    return NULL;
  }

  taosHashSetFreeFp(pTq->pHandle, tqDestroyTqHandle);
  taosHashSetFreeFp(pTq->pCheckInfo, (FDelete)tDeleteSTqCheckInfo);

  if (tqInitialize(pTq) != TSDB_CODE_SUCCESS) {
    tqClose(pTq);
    return NULL;
  }

  return pTq;
}

/*
 * Open the persistent parts of a TQ instance, in order: the handle meta store, the
 * offset store and the stream meta. Then load the stream tasks from the stream meta.
 *
 * Returns 0 on success and -1 at the first failing step. Whatever was opened before
 * the failure is left in pTq for the caller to release.
 */
static int32_t tqInitialize(STQ* pTq) {
  if (tqMetaOpen(pTq) < 0) {
    return -1;
  }

  pTq->pOffsetStore = tqOffsetOpen(pTq);
  if (pTq->pOffsetStore == NULL) {
    return -1;
  }

  SVnode* pVnode = pTq->pVnode;
  pTq->pStreamMeta = streamMetaOpen(pTq->path, pTq, (FTaskExpand*)tqExpandTask, pVnode->config.vgId);
  if (pTq->pStreamMeta == NULL) {
    return -1;
  }

  // the version is kept in task's meta data
  // todo check if this version is required or not
  return (streamLoadTasks(pTq->pStreamMeta, walGetCommittedVer(pVnode->pWal)) < 0) ? -1 : 0;
}
L
Liu Jicong 已提交
143

L
Liu Jicong 已提交
144
/*
 * Release a TQ instance and everything it owns. Does nothing for NULL.
 *
 * tqOpen() also calls this after a failed tqInitialize(). In that case
 * pOffsetStore and/or pStreamMeta may still be NULL, so they are closed only
 * when present.
 */
void tqClose(STQ* pTq) {
  if (pTq == NULL) {
    return;
  }

  if (pTq->pOffsetStore != NULL) {
    tqOffsetClose(pTq->pOffsetStore);
    pTq->pOffsetStore = NULL;
  }

  taosHashCleanup(pTq->pHandle);
  taosHashCleanup(pTq->pPushMgr);
  taosHashCleanup(pTq->pCheckInfo);
  tqMetaClose(pTq);

  if (pTq->pStreamMeta != NULL) {
    streamMetaClose(pTq->pStreamMeta);
    pTq->pStreamMeta = NULL;
  }

  // freed last so none of the close routines above can observe a dangling path
  taosMemoryFree(pTq->path);
  taosMemoryFree(pTq);
}
L
Liu Jicong 已提交
158

159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181
/*
 * Return true if at least one stream task in pMeta has status.timerActive == 1.
 *
 * The scan only reads task state, so a read latch on pMeta->lock is sufficient.
 * Iteration stops at the first match; taosHashCancelIterate releases the
 * iterator's current entry when leaving early.
 */
static bool hasStreamTaskInTimer(SStreamMeta* pMeta) {
  bool inTimer = false;

  taosRLockLatch(&pMeta->lock);

  void* pIter = NULL;
  while ((pIter = taosHashIterate(pMeta->pTasks, pIter)) != NULL) {
    SStreamTask* pTask = *(SStreamTask**)pIter;
    if (pTask->status.timerActive == 1) {
      inTimer = true;
      taosHashCancelIterate(pMeta->pTasks, pIter);
      break;
    }
  }

  taosRUnLockLatch(&pMeta->lock);

  return inTimer;
}

H
Haojun Liao 已提交
182 183 184 185 186 187 188 189 190 191 192 193
/*
 * Prepare the stream side of a TQ instance for shutdown.
 *
 * Under the stream-meta write latch, every stream task is marked
 * TASK_STATUS__STOP and its executor is killed. After the latch is released,
 * the function blocks and re-checks every 100 ms until no task reports an
 * active timer. Does nothing for NULL.
 */
void tqNotifyClose(STQ* pTq) {
  if (pTq == NULL) {
    return;
  }

  SStreamMeta* pMeta = pTq->pStreamMeta;
  taosWLockLatch(&pMeta->lock);

  for (void* pIter = taosHashIterate(pMeta->pTasks, NULL); pIter != NULL;
       pIter = taosHashIterate(pMeta->pTasks, pIter)) {
    SStreamTask* pTask = *(SStreamTask**)pIter;
    tqDebug("vgId:%d s-task:%s set closing flag", pMeta->vgId, pTask->id.idStr);
    pTask->status.taskStatus = TASK_STATUS__STOP;

    int64_t killStart = taosGetTimestampMs();
    qKillTask(pTask->exec.pExecutor, TSDB_CODE_SUCCESS);
    int64_t killCost = taosGetTimestampMs() - killStart;
    tqDebug("vgId:%d s-task:%s is closed in %" PRId64 " ms", pMeta->vgId, pTask->id.idStr, killCost);
  }

  taosWUnLockLatch(&pMeta->lock);

  tqDebug("vgId:%d start to check all tasks", pMeta->vgId);

  int64_t waitStart = taosGetTimestampMs();
  while (hasStreamTaskInTimer(pMeta)) {
    tqDebug("vgId:%d some tasks in timer, wait for 100ms and recheck", pMeta->vgId);
    taosMsleep(100);
  }

  int64_t waitCost = taosGetTimestampMs() - waitStart;
  tqDebug("vgId:%d all stream tasks are not in timer, continue close, elapsed time:%" PRId64 " ms", pMeta->vgId,
          waitCost);
}

220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267
//static int32_t doSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* pRsp, int32_t epoch,
//                             int64_t consumerId, int32_t type) {
//  int32_t len = 0;
//  int32_t code = 0;
//
//  if (type == TMQ_MSG_TYPE__POLL_DATA_RSP) {
//    tEncodeSize(tEncodeMqDataRsp, pRsp, len, code);
//  } else if (type == TMQ_MSG_TYPE__POLL_DATA_META_RSP) {
//    tEncodeSize(tEncodeSTaosxRsp, (STaosxRsp*)pRsp, len, code);
//  }
//
//  if (code < 0) {
//    return -1;
//  }
//
//  int32_t tlen = sizeof(SMqRspHead) + len;
//  void*   buf = rpcMallocCont(tlen);
//  if (buf == NULL) {
//    return -1;
//  }
//
//  ((SMqRspHead*)buf)->mqMsgType = type;
//  ((SMqRspHead*)buf)->epoch = epoch;
//  ((SMqRspHead*)buf)->consumerId = consumerId;
//
//  void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
//
//  SEncoder encoder = {0};
//  tEncoderInit(&encoder, abuf, len);
//
//  if (type == TMQ_MSG_TYPE__POLL_DATA_RSP) {
//    tEncodeMqDataRsp(&encoder, pRsp);
//  } else if (type == TMQ_MSG_TYPE__POLL_DATA_META_RSP) {
//    tEncodeSTaosxRsp(&encoder, (STaosxRsp*)pRsp);
//  }
//
//  tEncoderClear(&encoder);
//
//  SRpcMsg rsp = {
//      .info = *pRpcHandleInfo,
//      .pCont = buf,
//      .contLen = tlen,
//      .code = 0,
//  };
//
//  tmsgSendRsp(&rsp);
//  return 0;
//}
L
Liu Jicong 已提交
268

H
Haojun Liao 已提交
269
/*
 * Send an empty poll-data response (no blocks, zero-initialized offsets) to the
 * request stored in pHandle->msg. The response carries the version range reported
 * by walReaderValidVersionRange for the handle's WAL reader. Always returns 0.
 */
int32_t tqPushDataRsp(STqHandle* pHandle, int32_t vgId) {
  SMqDataRsp rsp = {0};
  rsp.head.mqMsgType = TMQ_MSG_TYPE__POLL_DATA_RSP;
  rsp.head.consumerId = pHandle->consumerId;
  rsp.head.epoch = pHandle->epoch;

  int64_t firstVer = 0;
  int64_t lastVer = 0;
  walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &firstVer, &lastVer);
  tqDoSendDataRsp(&pHandle->msg->info, &rsp, pHandle->epoch, pHandle->consumerId, TMQ_MSG_TYPE__POLL_DATA_RSP,
                  firstVer, lastVer);

  char reqOffsetStr[TSDB_OFFSET_LEN] = {0};
  char rspOffsetStr[TSDB_OFFSET_LEN] = {0};
  tFormatOffset(reqOffsetStr, TSDB_OFFSET_LEN, &rsp.reqOffset);
  tFormatOffset(rspOffsetStr, TSDB_OFFSET_LEN, &rsp.rspOffset);

  tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) push rsp, block num: %d, req:%s, rsp:%s", vgId,
          rsp.head.consumerId, rsp.head.epoch, rsp.blockNum, reqOffsetStr, rspOffsetStr);
  return 0;
}

289 290 291 292 293 294
/*
 * Send pRsp back to the sender of pMsg, tagged with the version range reported by
 * walReaderValidVersionRange for the handle's WAL reader. Then log the request and
 * response offsets. Always returns 0.
 */
int32_t tqSendDataRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp,
                      int32_t type, int32_t vgId) {
  int64_t firstVer = 0;
  int64_t lastVer = 0;
  walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &firstVer, &lastVer);
  tqDoSendDataRsp(&pMsg->info, pRsp, pReq->epoch, pReq->consumerId, type, firstVer, lastVer);

  char reqOffsetStr[TSDB_OFFSET_LEN] = {0};
  char rspOffsetStr[TSDB_OFFSET_LEN] = {0};
  tFormatOffset(reqOffsetStr, TSDB_OFFSET_LEN, &pRsp->reqOffset);
  tFormatOffset(rspOffsetStr, TSDB_OFFSET_LEN, &pRsp->rspOffset);

  tqDebug("tmq poll vgId:%d consumer:0x%" PRIx64 " (epoch %d) send rsp, block num:%d, req:%s, rsp:%s, reqId:0x%" PRIx64,
          vgId, pReq->consumerId, pReq->epoch, pRsp->blockNum, reqOffsetStr, rspOffsetStr, pReq->reqId);
  return 0;
}

307
/*
 * Apply a consumer offset commit (an SMqVgOffset encoded in msg/msgLen).
 *
 * Snapshot offsets are accepted as-is. For a log offset, a version equal to
 * sversion - 1 is advanced to sversion. The offset is written to the offset store
 * unless both it and the saved offset are log offsets and the new version does
 * not exceed the saved one.
 *
 * Returns 0 on success or when no update is needed. Returns -1 on decode failure
 * (terrno = TSDB_CODE_INVALID_MSG), on an invalid offset type, or when the offset
 * store write fails.
 */
int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  SMqVgOffset vgOffset = {0};
  int32_t     vgId = TD_VID(pTq->pVnode);

  SDecoder decoder;
  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
  if (tDecodeMqVgOffset(&decoder, &vgOffset) < 0) {
    // the decoder must be cleared on the error path as well, otherwise its resources leak
    tqError("vgId:%d failed to decode offset commit msg", vgId);
    tDecoderClear(&decoder);
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  tDecoderClear(&decoder);

  STqOffset* pOffset = &vgOffset.offset;

  if (pOffset->val.type == TMQ_OFFSET__SNAPSHOT_DATA || pOffset->val.type == TMQ_OFFSET__SNAPSHOT_META) {
    tqDebug("receive offset commit msg to %s on vgId:%d, offset(type:snapshot) uid:%" PRId64 ", ts:%" PRId64,
            pOffset->subKey, vgId, pOffset->val.uid, pOffset->val.ts);
  } else if (pOffset->val.type == TMQ_OFFSET__LOG) {
    tqDebug("receive offset commit msg to %s on vgId:%d, offset(type:log) version:%" PRId64, pOffset->subKey, vgId,
            pOffset->val.version);
    // a committed version exactly one before sversion is advanced to sversion
    if (pOffset->val.version + 1 == sversion) {
      pOffset->val.version += 1;
    }
  } else {
    tqError("invalid commit offset type:%d", pOffset->val.type);
    return -1;
  }

  STqOffset* pSavedOffset = tqOffsetRead(pTq->pOffsetStore, pOffset->subKey);
  if (pSavedOffset != NULL && tqOffsetLessOrEqual(pOffset, pSavedOffset)) {
    tqDebug("not update the offset, vgId:%d sub:%s since committed:%" PRId64 " less than/equal to existed:%" PRId64,
            vgId, pOffset->subKey, pOffset->val.version, pSavedOffset->val.version);
    return 0;  // no need to update the offset value
  }

  // save the new offset value
  if (tqOffsetWrite(pTq->pOffsetStore, pOffset) < 0) {
    return -1;
  }

  return 0;
}

350
/*
 * Move a subscription's saved offset to the log version requested by a consumer
 * (an SMqVgOffset encoded in msg/msgLen).
 *
 * Only log offsets are accepted. The requesting consumer must currently own the
 * subscription handle. The requested version is clamped to the range reported by
 * walReaderValidVersionRange before it is written. Nothing is written when the
 * saved offset already has the same version or is not a log offset.
 *
 * Returns 0 on success or when no update is needed. Returns -1 on decode failure,
 * invalid offset type, missing handle, consumer mismatch, or offset store failure.
 */
int32_t tqProcessSeekReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  SMqVgOffset vgOffset = {0};
  int32_t     vgId = TD_VID(pTq->pVnode);

  SDecoder decoder;
  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
  if (tDecodeMqVgOffset(&decoder, &vgOffset) < 0) {
    tqError("vgId:%d failed to decode seek msg", vgId);
    // the decoder must be cleared on the error path as well, otherwise its resources leak
    tDecoderClear(&decoder);
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  tDecoderClear(&decoder);

  tqDebug("topic:%s, vgId:%d process offset seek by consumer:0x%" PRIx64 ", req offset:%" PRId64,
          vgOffset.offset.subKey, vgId, vgOffset.consumerId, vgOffset.offset.val.version);

  STqOffset* pOffset = &vgOffset.offset;
  if (pOffset->val.type != TMQ_OFFSET__LOG) {
    tqError("vgId:%d, subKey:%s invalid seek offset type:%d", vgId, pOffset->subKey, pOffset->val.type);
    return -1;
  }

  // 1. find the subscription handle
  STqHandle* pHandle = taosHashGet(pTq->pHandle, pOffset->subKey, strlen(pOffset->subKey));
  if (pHandle == NULL) {
    tqError("tmq seek: consumer:0x%" PRIx64 " vgId:%d subkey %s not found", vgOffset.consumerId, vgId, pOffset->subKey);
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  // 2. check consumer-vg assignment status
  taosRLockLatch(&pTq->lock);
  if (pHandle->consumerId != vgOffset.consumerId) {
    tqDebug("ERROR tmq seek: consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64,
            vgOffset.consumerId, vgId, pOffset->subKey, pHandle->consumerId);
    terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
    taosRUnLockLatch(&pTq->lock);
    return -1;
  }
  taosRUnLockLatch(&pTq->lock);

  // 3. check the offset info
  STqOffset* pSavedOffset = tqOffsetRead(pTq->pOffsetStore, pOffset->subKey);
  if (pSavedOffset != NULL) {
    if (pSavedOffset->val.type != TMQ_OFFSET__LOG) {
      tqError("invalid saved offset type, vgId:%d sub:%s", vgId, pOffset->subKey);
      return 0;  // no need to update the offset value
    }

    if (pSavedOffset->val.version == pOffset->val.version) {
      tqDebug("vgId:%d subKey:%s no need to seek to %" PRId64 " prev offset:%" PRId64, vgId, pOffset->subKey,
              pOffset->val.version, pSavedOffset->val.version);
      return 0;
    }
  }

  // 4. clamp the requested version into the WAL's valid range
  int64_t sver = 0, ever = 0;
  walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever);
  if (pOffset->val.version < sver) {
    pOffset->val.version = sver;
  } else if (pOffset->val.version > ever) {
    pOffset->val.version = ever;
  }

  // 5. save the new offset value
  if (pSavedOffset != NULL) {
    tqDebug("vgId:%d sub:%s seek to:%" PRId64 " prev offset:%" PRId64, vgId, pOffset->subKey, pOffset->val.version,
            pSavedOffset->val.version);
  } else {
    tqDebug("vgId:%d sub:%s seek to:%" PRId64 " not saved yet", vgId, pOffset->subKey, pOffset->val.version);
  }

  if (tqOffsetWrite(pTq->pOffsetStore, pOffset) < 0) {
    tqError("failed to save offset, vgId:%d sub:%s seek to %" PRId64, vgId, pOffset->subKey, pOffset->val.version);
    return -1;
  }

  tqDebug("topic:%s, vgId:%d consumer:0x%" PRIx64 " offset is update to:%" PRId64, vgOffset.offset.subKey, vgId,
          vgOffset.consumerId, vgOffset.offset.val.version);

  return 0;
}

L
Liu Jicong 已提交
432
/*
 * Check whether column colId of table tbUid may be altered.
 *
 * Returns -1 if any registered check-info entry for tbUid lists colId in its
 * colIdList, otherwise 0.
 */
int32_t tqCheckColModifiable(STQ* pTq, int64_t tbUid, int32_t colId) {
  for (void* pIter = taosHashIterate(pTq->pCheckInfo, NULL); pIter != NULL;
       pIter = taosHashIterate(pTq->pCheckInfo, pIter)) {
    STqCheckInfo* pCheck = (STqCheckInfo*)pIter;
    if (pCheck->ntbUid != tbUid) {
      continue;
    }

    int32_t numOfCols = taosArrayGetSize(pCheck->colIdList);
    for (int32_t idx = 0; idx < numOfCols; ++idx) {
      int16_t forbidColId = *(int16_t*)taosArrayGet(pCheck->colIdList, idx);
      if (forbidColId == colId) {
        // leaving the iteration early: release the iterator before returning
        taosHashCancelIterate(pTq->pCheckInfo, pIter);
        return -1;
      }
    }
  }

  return 0;
}

458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482
/*
 * Re-dispatch every request held in pTq->pPushMgr to the QUERY queue as a
 * TDMT_VND_TMQ_CONSUME message, detach it from its handle, and then empty the
 * push manager. Runs under the pTq->lock write latch. pMsg is not used.
 * Always returns 0.
 */
int32_t tqProcessPollPush(STQ* pTq, SRpcMsg* pMsg) {
  int32_t vgId = TD_VID(pTq->pVnode);
  taosWLockLatch(&pTq->lock);
  if (taosHashGetSize(pTq->pPushMgr) > 0) {
    void* pIter = taosHashIterate(pTq->pPushMgr, NULL);

    while (pIter) {
      STqHandle* pHandle = *(STqHandle**)pIter;
      tqDebug("vgId:%d start set submit for pHandle:%p, consumer:0x%" PRIx64, vgId, pHandle, pHandle->consumerId);

      if (ASSERT(pHandle->msg != NULL)) {
        tqError("pHandle->msg should not be null");
        // leaving the iteration early: the iterator must be released or its entry reference leaks
        taosHashCancelIterate(pTq->pPushMgr, pIter);
        break;
      }

      SRpcMsg msg = {.msgType = TDMT_VND_TMQ_CONSUME,
                     .pCont = pHandle->msg->pCont,
                     .contLen = pHandle->msg->contLen,
                     .info = pHandle->msg->info};
      tmsgPutToQueue(&pTq->pVnode->msgCb, QUERY_QUEUE, &msg);
      // pCont is handed to the queued message; only the SRpcMsg wrapper is freed here
      taosMemoryFree(pHandle->msg);
      pHandle->msg = NULL;

      pIter = taosHashIterate(pTq->pPushMgr, pIter);
    }

    taosHashClear(pTq->pPushMgr);
  }
  taosWUnLockLatch(&pTq->lock);
  return 0;
}

D
dapan1121 已提交
487
/*
 * Serve a TMQ poll request (an SMqPollReq carried in pMsg).
 *
 * Under the pTq->lock write latch, the handle for req.subKey is looked up, the
 * requesting consumer is checked against the handle's owner, and the handle is
 * marked as executing. If another poll is already executing on it, the latch is
 * dropped and the check is retried every 10 ms. Afterwards the handle's epoch is
 * raised to the request epoch when that is newer, data is extracted via
 * tqExtractDataForMq(), and the handle is marked idle again.
 *
 * Returns the tqExtractDataForMq() result. Returns -1 with terrno set when the
 * request cannot be decoded, the handle does not exist, or the consumer does not
 * own it.
 */
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
  SMqPollReq req = {0};
  int        code = 0;
  if (tDeserializeSMqPollReq(pMsg->pCont, pMsg->contLen, &req) < 0) {
    tqError("tDeserializeSMqPollReq %d failed", pMsg->contLen);
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  int64_t      consumerId = req.consumerId;
  int32_t      reqEpoch = req.epoch;
  STqOffsetVal reqOffset = req.reqOffset;
  int32_t      vgId = TD_VID(pTq->pVnode);
  STqHandle*   pHandle = NULL;

  while (1) {
    taosWLockLatch(&pTq->lock);
    // 1. find handle
    pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
    if (pHandle == NULL) {
      tqError("tmq poll: consumer:0x%" PRIx64 " vgId:%d subkey %s not found", consumerId, vgId, req.subKey);
      terrno = TSDB_CODE_INVALID_MSG;
      taosWUnLockLatch(&pTq->lock);
      return -1;
    }

    // 2. check re-balance status
    if (pHandle->consumerId != consumerId) {
      tqError("ERROR tmq poll: consumer:0x%" PRIx64
              " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64,
              consumerId, TD_VID(pTq->pVnode), req.subKey, pHandle->consumerId);
      terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
      taosWUnLockLatch(&pTq->lock);
      return -1;
    }

    // claim the handle unless another poll is already executing on it
    if (!tqIsHandleExec(pHandle)) {
      tqSetHandleExec(pHandle);
      tqDebug("tmq poll: consumer:0x%" PRIx64 " vgId:%d, topic:%s, set handle exec, pHandle:%p", consumerId, vgId,
              req.subKey, pHandle);
      taosWUnLockLatch(&pTq->lock);
      break;
    }
    taosWUnLockLatch(&pTq->lock);

    tqDebug("tmq poll: consumer:0x%" PRIx64
            " vgId:%d, topic:%s, subscription is executing, wait for 10ms and retry, pHandle:%p",
            consumerId, vgId, req.subKey, pHandle);
    taosMsleep(10);
  }

  // 3. update the epoch value
  if (pHandle->epoch < reqEpoch) {
    tqDebug("tmq poll: consumer:0x%" PRIx64 " epoch update from %d to %d by poll req", consumerId, pHandle->epoch,
            reqEpoch);
    pHandle->epoch = reqEpoch;
  }

  char buf[TSDB_OFFSET_LEN];
  tFormatOffset(buf, TSDB_OFFSET_LEN, &reqOffset);
  tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey %s, recv poll req vgId:%d, req:%s, reqId:0x%" PRIx64,
          consumerId, req.epoch, pHandle->subKey, vgId, buf, req.reqId);

  code = tqExtractDataForMq(pTq, pHandle, &req, pMsg);
  tqSetHandleIdle(pHandle);

  tqDebug("tmq poll: consumer:0x%" PRIx64 " vgId:%d, topic:%s, set handle idle, pHandle:%p", consumerId, vgId,
          req.subKey, pHandle);
  return code;
}

560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617
/*
 * Answer a consumer's WAL-info query (an SMqPollReq carried in pMsg) with a
 * TMQ_MSG_TYPE__WALINFO_RSP. The response carries the version range reported by
 * walReaderValidVersionRange and a log offset for req.subKey.
 *
 * If an offset is saved for the subscription, it is used; a saved snapshot offset is
 * rejected. Otherwise the offset is derived from the request offset type:
 *   LOG:            the reader's current version, or the requested version when the
 *                   reader has not started (current version == -1);
 *   RESET_EARLIEST: the first valid version;
 *   RESET_LATEST:   the last valid version.
 * Requests with useSnapshot set and no saved offset are rejected.
 *
 * Returns 0 on success and -1 with terrno set on error.
 */
int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) {
  SMqPollReq req = {0};
  if (tDeserializeSMqPollReq(pMsg->pCont, pMsg->contLen, &req) < 0) {
    tqError("tDeserializeSMqPollReq %d failed", pMsg->contLen);
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  int64_t      consumerId = req.consumerId;
  STqOffsetVal reqOffset = req.reqOffset;
  int32_t      vgId = TD_VID(pTq->pVnode);

  // 1. find handle
  STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
  if (pHandle == NULL) {
    tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s not found", consumerId, vgId, req.subKey);
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  // 2. check re-balance status
  taosRLockLatch(&pTq->lock);
  if (pHandle->consumerId != consumerId) {
    tqDebug("ERROR consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64,
            consumerId, vgId, req.subKey, pHandle->consumerId);
    terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
    taosRUnLockLatch(&pTq->lock);
    return -1;
  }
  taosRUnLockLatch(&pTq->lock);

  int64_t sver = 0, ever = 0;
  walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever);

  SMqDataRsp dataRsp = {0};
  tqInitDataRsp(&dataRsp, &req);

  STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, req.subKey);
  if (pOffset != NULL) {
    if (pOffset->val.type != TMQ_OFFSET__LOG) {
      tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s use snapshot, no valid wal info", consumerId, vgId, req.subKey);
      terrno = TSDB_CODE_INVALID_PARA;
      tDeleteMqDataRsp(&dataRsp);
      return -1;
    }

    dataRsp.rspOffset.type = TMQ_OFFSET__LOG;
    dataRsp.rspOffset.version = pOffset->val.version;
  } else {
    if (req.useSnapshot == true) {
      tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s snapshot not support wal info", consumerId, vgId, req.subKey);
      terrno = TSDB_CODE_INVALID_PARA;
      tDeleteMqDataRsp(&dataRsp);
      return -1;
    }

    dataRsp.rspOffset.type = TMQ_OFFSET__LOG;

    if (reqOffset.type == TMQ_OFFSET__LOG) {
      int64_t currentVer = walReaderGetCurrentVer(pHandle->execHandle.pTqReader->pWalReader);
      if (currentVer == -1) {  // not start to read data from wal yet, return req offset directly
        dataRsp.rspOffset.version = reqOffset.version;
      } else {
        dataRsp.rspOffset.version = currentVer;  // return current consume offset value
      }
    } else if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEST) {
      dataRsp.rspOffset.version = sver;  // not consume yet, set the earliest position
    } else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) {
      dataRsp.rspOffset.version = ever;
    } else {
      tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s invalid offset type:%d", consumerId, vgId, req.subKey,
              reqOffset.type);
      terrno = TSDB_CODE_INVALID_PARA;
      tDeleteMqDataRsp(&dataRsp);
      return -1;
    }
  }

  tqDoSendDataRsp(&pMsg->info, &dataRsp, req.epoch, req.consumerId, TMQ_MSG_TYPE__WALINFO_RSP, sver, ever);
  // release dataRsp the same way every error path above does; it was previously leaked on success
  tDeleteMqDataRsp(&dataRsp);
  return 0;
}

642
/*
 * Remove a subscription (an SMqVDeleteReq overlaid on msg).
 *
 * Under the pTq->lock write latch:
 *   - the handle, if present, is waited on until no poll is executing on it; its
 *     WAL reference is closed, and it is removed from pTq->pHandle;
 *   - the subscription's offset is deleted from the offset store;
 *   - the handle record is deleted from the meta store.
 * Failures of the individual steps are only logged. Always returns 0.
 *
 * NOTE(review): the wait loop sleeps while holding pTq->lock. Confirm that nothing an
 * executing poll needs in order to finish also takes pTq->lock; otherwise this can
 * deadlock.
 */
int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  SMqVDeleteReq* pReq = (SMqVDeleteReq*)msg;
  int32_t        vgId = TD_VID(pTq->pVnode);

  tqDebug("vgId:%d, tq process delete sub req %s", vgId, pReq->subKey);

  taosWLockLatch(&pTq->lock);

  STqHandle* pHandle = taosHashGet(pTq->pHandle, pReq->subKey, strlen(pReq->subKey));
  if (pHandle != NULL) {
    // wait for an in-flight poll on this handle to finish before dropping it
    while (tqIsHandleExec(pHandle)) {
      tqDebug("vgId:%d, topic:%s, subscription is executing, wait for 10ms and retry, pHandle:%p", vgId,
              pHandle->subKey, pHandle);
      taosMsleep(10);
    }

    if (pHandle->pRef != NULL) {
      walCloseRef(pTq->pVnode->pWal, pHandle->pRef->refId);
    }

    if (taosHashRemove(pTq->pHandle, pReq->subKey, strlen(pReq->subKey)) != 0) {
      tqError("cannot process tq delete req %s, since no such handle", pReq->subKey);
    }
  }

  if (tqOffsetDelete(pTq->pOffsetStore, pReq->subKey) != 0) {
    tqError("cannot process tq delete req %s, since no such offset in cache", pReq->subKey);
  }

  if (tqMetaDeleteHandle(pTq, pReq->subKey) < 0) {
    tqError("cannot process tq delete req %s, since no such offset in tdb", pReq->subKey);
  }

  taosWUnLockLatch(&pTq->lock);

  return 0;
}

681
/*
 * Register a column check-info entry (an STqCheckInfo encoded in msg/msgLen), keyed by
 * topic, in the in-memory pTq->pCheckInfo table and in the meta store.
 *
 * Returns 0 on success. Returns -1 with terrno = TSDB_CODE_INVALID_MSG when the
 * message cannot be decoded, and -1 with terrno = TSDB_CODE_OUT_OF_MEMORY when
 * storing fails.
 */
int32_t tqProcessAddCheckInfoReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  STqCheckInfo info = {0};
  SDecoder     decoder;
  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
  if (tDecodeSTqCheckInfo(&decoder, &info) < 0) {
    // malformed message: clear the decoder and release anything partially decoded into info
    tDecoderClear(&decoder);
    tDeleteSTqCheckInfo(&info);
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }
  tDecoderClear(&decoder);

  if (taosHashPut(pTq->pCheckInfo, info.topic, strlen(info.topic), &info, sizeof(STqCheckInfo)) < 0) {
    // the table did not take a copy of info, so its members are still owned here
    tDeleteSTqCheckInfo(&info);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  if (tqMetaSaveCheckInfo(pTq, info.topic, msg, msgLen) < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  return 0;
}

701
/*
 * Remove the check-info entry keyed by the NUL-terminated string in msg. It is removed
 * first from the in-memory table and then, if that succeeded, from the meta store.
 * Returns 0 on success. Any failure returns -1 with terrno = TSDB_CODE_OUT_OF_MEMORY.
 */
int32_t tqProcessDelCheckInfoReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  // short-circuit: the meta store is only touched when the in-memory removal succeeded
  if (taosHashRemove(pTq->pCheckInfo, msg, strlen(msg)) < 0 || tqMetaDeleteCheckInfo(pTq, msg) < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  return 0;
}

713
/*
 * Apply a consumer re-balance request (an SMqRebVgReq encoded in msg/msgLen).
 *
 * Case 1: no handle exists for req.subKey, either in pTq->pHandle or restorable via
 * tqMetaGetHandle(). A new handle is created for req.newConsumerId and saved to the
 * meta store; a request with newConsumerId == -1 is logged and ignored.
 *
 * Case 2: a handle exists. Under the pTq->lock write latch, the handle is switched to
 * req.newConsumerId and unregistered from the push manager. It is then saved again.
 *
 * Returns -1 with terrno = TSDB_CODE_INVALID_MSG on decode failure. Otherwise returns
 * the result of the last create/save step, or 0 when the request was ignored.
 */
int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  int         ret = 0;
  SMqRebVgReq req = {0};
  SDecoder    dc = {0};

  tDecoderInit(&dc, (uint8_t*)msg, msgLen);

  // decode req
  if (tDecodeSMqRebVgReq(&dc, &req) < 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    tDecoderClear(&dc);
    return -1;
  }

  tqDebug("vgId:%d, tq process sub req:%s, Id:0x%" PRIx64 " -> Id:0x%" PRIx64, pTq->pVnode->config.vgId, req.subKey,
          req.oldConsumerId, req.newConsumerId);

  // Look the handle up in memory. On a miss, try to restore it from the meta store and look again.
  // NOTE(review): this terminates only if tqMetaGetHandle() inserts into pTq->pHandle whenever it
  // returns >= 0 — confirm.
  STqHandle* pHandle = NULL;
  while (1) {
    pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
    if (pHandle || tqMetaGetHandle(pTq, req.subKey) < 0) {
      break;
    }
  }

  if (pHandle == NULL) {
    if (req.oldConsumerId != -1) {
      tqError("vgId:%d, build new consumer handle %s for consumer:0x%" PRIx64 ", but old consumerId:0x%" PRIx64,
              req.vgId, req.subKey, req.newConsumerId, req.oldConsumerId);
    }
    if (req.newConsumerId == -1) {
      tqError("vgId:%d, tq invalid re-balance request, new consumerId %" PRId64 "", req.vgId, req.newConsumerId);
      goto end;
    }
    STqHandle handle = {0};
    ret = tqCreateHandle(pTq, &req, &handle);
    if (ret < 0) {
      tqDestroyTqHandle(&handle);
      goto end;
    }
    ret = tqMetaSaveHandle(pTq, req.subKey, &handle);
  } else {
    taosWLockLatch(&pTq->lock);

    if (pHandle->consumerId == req.newConsumerId) {  // do nothing
      tqInfo("vgId:%d consumer:0x%" PRIx64 " remains, no switch occurs, should not reach here", req.vgId,
             req.newConsumerId);
    } else {
      tqInfo("vgId:%d switch consumer from Id:0x%" PRIx64 " to Id:0x%" PRIx64, req.vgId, pHandle->consumerId,
             req.newConsumerId);
      atomic_store_64(&pHandle->consumerId, req.newConsumerId);
    }

    // NOTE: a task still executing on this handle is not killed here (that logic is disabled).
    // Remove the handle from the push manager if it was registered there; per the original
    // comment, this returns one empty block to the consumer.
    tqUnregisterPushHandle(pTq, pHandle);
    taosWUnLockLatch(&pTq->lock);
    ret = tqMetaSaveHandle(pTq, req.subKey, pHandle);
  }

end:
  tDecoderClear(&dc);
  return ret;
}
788

dengyihao's avatar
dengyihao 已提交
789
// Free callback for hash tables whose value slot stores a heap pointer: `ptr` addresses the slot,
// so the pointer held inside the slot is what gets released.
void freePtr(void* ptr) {
  void** pSlot = (void**)ptr;
  taosMemoryFree(*pSlot);
}
L
liuyao 已提交
790

791
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
D
dapan1121 已提交
792
  int32_t vgId = TD_VID(pTq->pVnode);
793

794
  pTask->id.idStr = createStreamTaskIdStr(pTask->id.streamId, pTask->id.taskId);
L
Liu Jicong 已提交
795
  pTask->refCnt = 1;
796
  pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE;
dengyihao's avatar
dengyihao 已提交
797 798
  pTask->inputQueue = streamQueueOpen(512 << 10);
  pTask->outputQueue = streamQueueOpen(512 << 10);
L
Liu Jicong 已提交
799 800

  if (pTask->inputQueue == NULL || pTask->outputQueue == NULL) {
L
Liu Jicong 已提交
801
    return -1;
L
Liu Jicong 已提交
802 803
  }

L
Liu Jicong 已提交
804 805
  pTask->inputStatus = TASK_INPUT_STATUS__NORMAL;
  pTask->outputStatus = TASK_OUTPUT_STATUS__NORMAL;
806
  pTask->pMsgCb = &pTq->pVnode->msgCb;
807
  pTask->pMeta = pTq->pStreamMeta;
808

809
  pTask->chkInfo.version = ver;
810
  pTask->chkInfo.currentVer = ver;
811

812 813
  pTask->dataRange.range.maxVer = ver;
  pTask->dataRange.range.minVer = ver;
814

815
  if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
L
liuyao 已提交
816
    SStreamTask* pSateTask = pTask;
L
liuyao 已提交
817
    SStreamTask task = {0};
L
liuyao 已提交
818
    if (pTask->info.fillHistory) {
L
liuyao 已提交
819 820 821
      task.id = pTask->streamTaskId;
      task.pMeta = pTask->pMeta;
      pSateTask = &task;
L
liuyao 已提交
822
    }
823

L
liuyao 已提交
824
    pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pSateTask, false, -1, -1);
825 826 827 828
    if (pTask->pState == NULL) {
      return -1;
    }

L
liuyao 已提交
829 830 831 832 833
    SReadHandle handle = {.vnode = pTq->pVnode,
                          .initTqReader = 1,
                          .pStateBackend = pTask->pState,
                          .fillHistory = pTask->info.fillHistory,
                          .winRange = pTask->dataRange.window};
834
    initStorageAPI(&handle.api);
835

836 837
    pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, vgId);
    if (pTask->exec.pExecutor == NULL) {
L
Liu Jicong 已提交
838 839
      return -1;
    }
840

841
    qSetTaskId(pTask->exec.pExecutor, pTask->id.taskId, pTask->id.streamId);
842
  } else if (pTask->info.taskLevel == TASK_LEVEL__AGG) {
L
liuyao 已提交
843
    SStreamTask* pSateTask = pTask;
L
liuyao 已提交
844
    SStreamTask task = {0};
L
liuyao 已提交
845
    if (pTask->info.fillHistory) {
L
liuyao 已提交
846 847 848
      task.id = pTask->streamTaskId;
      task.pMeta = pTask->pMeta;
      pSateTask = &task;
L
liuyao 已提交
849
    }
L
liuyao 已提交
850
    pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pSateTask, false, -1, -1);
851 852 853
    if (pTask->pState == NULL) {
      return -1;
    }
854

855
    int32_t     numOfVgroups = (int32_t)taosArrayGetSize(pTask->pUpstreamEpInfoList);
L
liuyao 已提交
856 857 858 859 860
    SReadHandle handle = {.vnode = NULL,
                          .numOfVgroups = numOfVgroups,
                          .pStateBackend = pTask->pState,
                          .fillHistory = pTask->info.fillHistory,
                          .winRange = pTask->dataRange.window};
861
    initStorageAPI(&handle.api);
862

863
    pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, vgId);
864
    if (pTask->exec.pExecutor == NULL) {
L
Liu Jicong 已提交
865 866
      return -1;
    }
867 868

    qSetTaskId(pTask->exec.pExecutor, pTask->id.taskId, pTask->id.streamId);
L
Liu Jicong 已提交
869
  }
L
Liu Jicong 已提交
870 871

  // sink
872
  if (pTask->outputType == TASK_OUTPUT__SMA) {
L
Liu Jicong 已提交
873
    pTask->smaSink.vnode = pTq->pVnode;
L
Liu Jicong 已提交
874
    pTask->smaSink.smaSink = smaHandleRes;
875
  } else if (pTask->outputType == TASK_OUTPUT__TABLE) {
L
Liu Jicong 已提交
876
    pTask->tbSink.vnode = pTq->pVnode;
H
Haojun Liao 已提交
877
    pTask->tbSink.tbSinkFunc = tqSinkToTablePipeline;
L
Liu Jicong 已提交
878

X
Xiaoyu Wang 已提交
879
    int32_t   ver1 = 1;
5
54liuyao 已提交
880
    SMetaInfo info = {0};
dengyihao's avatar
dengyihao 已提交
881
    int32_t   code = metaGetInfo(pTq->pVnode->pMeta, pTask->tbSink.stbUid, &info, NULL);
5
54liuyao 已提交
882
    if (code == TSDB_CODE_SUCCESS) {
D
dapan1121 已提交
883
      ver1 = info.skmVer;
5
54liuyao 已提交
884
    }
L
Liu Jicong 已提交
885

886 887
    SSchemaWrapper* pschemaWrapper = pTask->tbSink.pSchemaWrapper;
    pTask->tbSink.pTSchema = tBuildTSchema(pschemaWrapper->pSchema, pschemaWrapper->nCols, ver1);
888
    if (pTask->tbSink.pTSchema == NULL) {
D
dapan1121 已提交
889 890
      return -1;
    }
L
liuyao 已提交
891 892
    pTask->tbSink.pTblInfo = tSimpleHashInit(10240, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
    tSimpleHashSetFreeFp(pTask->tbSink.pTblInfo, freePtr);
L
Liu Jicong 已提交
893
  }
894

895
  if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
896
    SWalFilterCond cond = {.deleteMsg = 1};  // delete msg also extract from wal files
897
    pTask->exec.pWalReader = walOpenReader(pTq->pVnode->pWal, &cond);
898 899
  }

900
  streamSetupScheduleTrigger(pTask);
901

902 903
  tqInfo("vgId:%d expand stream task, s-task:%s, checkpoint ver:%" PRId64
         " child id:%d, level:%d, scan-history:%d, trigger:%" PRId64 " ms",
904
         vgId, pTask->id.idStr, pTask->chkInfo.version, pTask->info.selfChildId, pTask->info.taskLevel,
905
         pTask->info.fillHistory, pTask->triggerParam);
906 907 908

  // next valid version will add one
  pTask->chkInfo.version += 1;
L
Liu Jicong 已提交
909
  return 0;
L
Liu Jicong 已提交
910
}
L
Liu Jicong 已提交
911

912
// Handle a task-check request sent by an upstream task to see whether the downstream task is ready.
// Replies with an SStreamTaskCheckRsp whose status comes from streamTaskCheckStatus(), or 0 if the
// downstream task has not been built on this vnode yet.
// Returns 0 when the response is sent, -1 when the request cannot be decoded or the response cannot be built.
int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) {
  char*   msgStr = pMsg->pCont;
  char*   msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
  int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);

  SStreamTaskCheckReq req = {0};
  SDecoder            decoder;

  tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
  int32_t code = tDecodeStreamTaskCheckReq(&decoder, &req);
  tDecoderClear(&decoder);
  if (code < 0) {
    tqError("vgId:%d failed to decode task check req", pTq->pStreamMeta->vgId);
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  int32_t taskId = req.downstreamTaskId;

  SStreamTaskCheckRsp rsp = {
      .reqId = req.reqId,
      .streamId = req.streamId,
      .childId = req.childId,
      .downstreamNodeId = req.downstreamNodeId,
      .downstreamTaskId = req.downstreamTaskId,
      .upstreamNodeId = req.upstreamNodeId,
      .upstreamTaskId = req.upstreamTaskId,
  };

  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);
  if (pTask != NULL) {
    rsp.status = streamTaskCheckStatus(pTask);
    // log while the reference is still held: pTask must not be touched after it is released
    tqDebug("s-task:%s recv task check req(reqId:0x%" PRIx64 ") task:0x%x (vgId:%d), status:%s, rsp status %d",
            pTask->id.idStr, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId,
            streamGetTaskStatusStr(pTask->status.taskStatus), rsp.status);
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
  } else {
    rsp.status = 0;
    tqDebug("tq recv task check(taskId:0x%x not built yet) req(reqId:0x%" PRIx64 ") from task:0x%x (vgId:%d), rsp status %d",
            taskId, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);
  }

  SEncoder encoder;
  int32_t  len;

  tEncodeSize(tEncodeStreamTaskCheckRsp, &rsp, len, code);
  if (code < 0) {
    tqError("vgId:%d failed to encode task check rsp, task:0x%x", pTq->pStreamMeta->vgId, taskId);
    return -1;
  }

  void* buf = rpcMallocCont(sizeof(SMsgHead) + len);
  if (buf == NULL) {
    tqError("vgId:%d failed to alloc task check rsp, task:0x%x", pTq->pStreamMeta->vgId, taskId);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  ((SMsgHead*)buf)->vgId = htonl(req.upstreamNodeId);

  void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
  tEncoderInit(&encoder, (uint8_t*)abuf, len);
  tEncodeStreamTaskCheckRsp(&encoder, &rsp);
  tEncoderClear(&encoder);

  SRpcMsg rspMsg = {.code = 0, .pCont = buf, .contLen = sizeof(SMsgHead) + len, .info = pMsg->info};

  tmsgSendRsp(&rspMsg);
  return 0;
}

974 975 976 977
// Handle the response to a task-check request. The response is decoded and then forwarded to
// streamProcessCheckRsp() on the upstream task that sent the request.
// Returns -1 if decoding fails or the upstream task no longer exists (terrno is set in the latter case);
// otherwise returns the result of streamProcessCheckRsp().
int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t sversion, SRpcMsg* pMsg) {
  char*   pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t len = pMsg->contLen - sizeof(SMsgHead);

  SStreamTaskCheckRsp rsp;
  SDecoder            decoder;

  tDecoderInit(&decoder, (uint8_t*)pReq, len);
  int32_t code = tDecodeStreamTaskCheckRsp(&decoder, &rsp);
  tDecoderClear(&decoder);  // the decoder is done with whether or not decoding succeeded

  if (code < 0) {
    return -1;
  }

  tqDebug("tq task:0x%x (vgId:%d) recv check rsp(reqId:0x%" PRIx64 ") from 0x%x (vgId:%d) status %d",
          rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.status);

  SStreamMeta* pMeta = pTq->pStreamMeta;
  SStreamTask* pTask = streamMetaAcquireTask(pMeta, rsp.upstreamTaskId);
  if (pTask == NULL) {
    tqError("tq failed to locate the stream task:0x%x (vgId:%d), it may have been destroyed", rsp.upstreamTaskId,
            pMeta->vgId);
    terrno = TSDB_CODE_STREAM_TASK_NOT_EXIST;
    return -1;
  }

  code = streamProcessCheckRsp(pTask, &rsp);
  streamMetaReleaseTask(pMeta, pTask);
  return code;
}

1007
// Deploy a stream task on this vnode:
//   1. decode `msg` into a newly allocated SStreamTask,
//   2. register it in the stream meta under the meta write lock, using `sversion` as the deploy version,
//   3. call streamPrepareNdoCheckDownstream() on it.
// Returns 0 on success (or when streams are disabled on this node), -1 on allocation, decode or registration failure.
int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  int32_t vgId = TD_VID(pTq->pVnode);

  // stream processing is switched off: accept the request without deploying anything
  if (tsDisableStream) {
    return 0;
  }

  // 1. deserialize the message into a freshly allocated task
  SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
  if (pTask == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    tqError("vgId:%d failed to create stream task due to out of memory, alloc size:%d", vgId,
            (int32_t)sizeof(SStreamTask));
    return -1;
  }

  SDecoder decoder;
  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
  int32_t code = tDecodeStreamTask(&decoder, pTask);
  tDecoderClear(&decoder);

  if (code < 0) {
    taosMemoryFree(pTask);
    return -1;
  }

  // 2. save the task; the newest commit version is used as the initial start version of the stream task
  SStreamMeta* pStreamMeta = pTq->pStreamMeta;

  taosWLockLatch(&pStreamMeta->lock);
  code = streamMetaAddDeployedTask(pStreamMeta, sversion, pTask);
  int32_t numOfTasks = streamMetaGetNumOfTasks(pStreamMeta);

  if (code < 0) {
    tqError("vgId:%d failed to add s-task:%s, total:%d", vgId, pTask->id.idStr, numOfTasks);
    taosWUnLockLatch(&pStreamMeta->lock);
    return -1;
  }
  taosWUnLockLatch(&pStreamMeta->lock);

  // 3. prepare the task and check its downstream tasks. The original note says a fill-history task does nothing
  //    here and waits for its main task to start it — presumably decided inside streamPrepareNdoCheckDownstream().
  streamPrepareNdoCheckDownstream(pTask);

  tqDebug("vgId:%d s-task:%s is deployed and add into meta, status:%s, numOfTasks:%d", vgId, pTask->id.idStr,
          streamGetTaskStatusStr(pTask->status.taskStatus), numOfTasks);
  return 0;
}

1059
// Run the history-data scan of a stream task (triggered by an SStreamScanHistoryReq).
//
// Step 1 scans the history data. If the task is a fill-history task, then:
//   - its related stream task is halted (unless igUntreated is set),
//   - step 2 re-scans the version range recorded in pTask->dataRange.range,
//   - the executor state is transferred,
//   - the fill-history task is marked as dropping and the stream meta is committed.
// For a normal stream task, the time window is reset when no history task exists, and
// streamTaskScanHistoryDataComplete() is called. Source tasks then restart WAL scanning.
//
// Returns 0 on success or abort (task dropped/paused), -1 if a task cannot be found, or the result of
// streamTaskScanHistoryDataComplete() for non-fill-history tasks. Every task reference acquired here is released
// before returning.
int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
  int32_t code = TSDB_CODE_SUCCESS;
  char*   msg = pMsg->pCont;

  SStreamMeta*           pMeta = pTq->pStreamMeta;
  SStreamScanHistoryReq* pReq = (SStreamScanHistoryReq*)msg;

  SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->taskId);
  if (pTask == NULL) {
    tqError("vgId:%d failed to acquire stream task:0x%x during stream recover, task may have been destroyed",
            pMeta->vgId, pReq->taskId);
    return -1;
  }

  // do recovery step 1
  const char* pId = pTask->id.idStr;
  tqDebug("s-task:%s start history data scan stage(step 1), status:%s", pId,
          streamGetTaskStatusStr(pTask->status.taskStatus));

  int64_t st = taosGetTimestampMs();
  int8_t  schedStatus = atomic_val_compare_exchange_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE,
                                                      TASK_SCHED_STATUS__WAITING);
  if (schedStatus != TASK_SCHED_STATUS__INACTIVE) {
    ASSERT(0);
    streamMetaReleaseTask(pMeta, pTask);  // do not leak the reference acquired above
    return 0;
  }

  if (!streamTaskRecoverScanStep1Finished(pTask)) {
    streamSourceScanHistoryData(pTask);
  }

  if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING || streamTaskShouldPause(&pTask->status)) {
    tqDebug("s-task:%s is dropped or paused, abort recover in step1", pId);
    atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
    streamMetaReleaseTask(pMeta, pTask);
    return 0;
  }

  double el = (taosGetTimestampMs() - st) / 1000.0;
  tqDebug("s-task:%s history data scan stage(step 1) ended, elapsed time:%.2fs", pId, el);

  if (pTask->info.fillHistory) {
    // version range scanned in step 2; streamHistoryTaskSetVerRangeStep2() below updates it in place.
    // It must not be NULL: it is dereferenced by the log and by streamSetParamForStreamScannerStep2().
    SVersionRange* pRange = &pTask->dataRange.range;
    SStreamTask*   pStreamTask = NULL;

    if (!pReq->igUntreated && !streamTaskRecoverScanStep1Finished(pTask)) {
      // 1. stop the related stream task, get the current scan wal version of stream task, ver.
      pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.taskId);
      if (pStreamTask == NULL) {
        tqError("failed to find s-task:0x%x, it may have been destroyed, drop fill history task:%s",
                pTask->streamTaskId.taskId, pTask->id.idStr);

        pTask->status.taskStatus = TASK_STATUS__DROPPING;
        tqDebug("s-task:%s scan-history-task set status to be dropping", pId);

        streamMetaSaveTask(pMeta, pTask);
        streamMetaReleaseTask(pMeta, pTask);
        return -1;
      }

      ASSERT(pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE);

      // wait for the stream task get ready for scan history data
      while (((pStreamTask->status.downstreamReady == 0) && (pStreamTask->status.taskStatus != TASK_STATUS__STOP)) ||
             pStreamTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) {
        tqDebug("s-task:%s level:%d related stream task:%s not ready for halt, wait for it and recheck in 100ms", pId,
                pTask->info.taskLevel, pStreamTask->id.idStr);
        taosMsleep(100);
      }

      // now we can stop the stream task execution
      pStreamTask->status.taskStatus = TASK_STATUS__HALT;
      tqDebug("s-task:%s level:%d status is set to halt by history scan task:%s", pStreamTask->id.idStr,
              pStreamTask->info.taskLevel, pId);

      // if it's an source task, extract the last version in wal.
      streamHistoryTaskSetVerRangeStep2(pTask);
    }

    if (!streamTaskRecoverScanStep1Finished(pTask)) {
      tqDebug("s-task:%s level:%d verRange:%" PRId64 " - %" PRId64
              " do secondary scan-history-data after halt the related stream task:0x%x",
              pId, pTask->info.taskLevel, pRange->minVer, pRange->maxVer, pTask->streamTaskId.taskId);
      ASSERT(pTask->status.schedStatus == TASK_SCHED_STATUS__WAITING);

      st = taosGetTimestampMs();
      streamSetParamForStreamScannerStep2(pTask, pRange, &pTask->dataRange.window);
    }

    if (!streamTaskRecoverScanStep2Finished(pTask)) {
      streamSourceScanHistoryData(pTask);

      if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING || streamTaskShouldPause(&pTask->status)) {
        tqDebug("s-task:%s is dropped or paused, abort recover in step2", pId);
        // same as the step-1 abort: let a later scan-history request pass the INACTIVE -> WAITING check again
        atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
        if (pStreamTask != NULL) {
          streamMetaReleaseTask(pMeta, pStreamTask);
        }
        streamMetaReleaseTask(pMeta, pTask);
        return 0;
      }

      streamTaskRecoverSetAllStepFinished(pTask);
    }

    el = (taosGetTimestampMs() - st) / 1000.0;
    tqDebug("s-task:%s history data scan stage(step 2) ended, elapsed time:%.2fs", pId, el);

    // 3. notify downstream tasks to transfer executor state after handle all history blocks.
    if (!pTask->status.transferState) {
      code = streamDispatchTransferStateMsg(pTask);
      if (code != TSDB_CODE_SUCCESS) {
        // todo handle error
      }

      pTask->status.transferState = true;
    }

    // 4. 1) transfer the ownership of executor state, 2) update the scan data range for source task.
    // 5. resume the related stream task.
    streamTryExec(pTask);

    pTask->status.taskStatus = TASK_STATUS__DROPPING;
    tqDebug("s-task:%s scan-history-task set status to be dropping", pId);

    // pStreamTask is only acquired when the related stream task was halted above
    streamMetaSaveTask(pMeta, pTask);
    if (pStreamTask != NULL) {
      streamMetaSaveTask(pMeta, pStreamTask);
    }

    streamMetaReleaseTask(pMeta, pTask);
    if (pStreamTask != NULL) {
      streamMetaReleaseTask(pMeta, pStreamTask);
    }

    // pTask must not be dereferenced after release; pMeta is the same meta pTask->pMeta points to
    taosWLockLatch(&pMeta->lock);
    if (streamMetaCommit(pMeta) < 0) {
      // persist to disk
    }
    taosWUnLockLatch(&pMeta->lock);
  } else {
    // todo update the chkInfo version for current task.
    // this task has an associated history stream task, so we need to scan wal from the end version of
    // history scan. The current version of chkInfo.current is not updated during the history scan
    STimeWindow* pWindow = &pTask->dataRange.window;

    if (pTask->historyTaskId.taskId == 0) {
      *pWindow = (STimeWindow){INT64_MIN, INT64_MAX};
      tqDebug("s-task:%s no related scan-history-data task, reset the time window:%" PRId64 " - %" PRId64, pId,
              pWindow->skey, pWindow->ekey);
    } else {
      tqDebug(
          "s-task:%s history data in current time window scan completed, now start to handle data from WAL, start "
          "ver:%" PRId64 ", window:%" PRId64 " - %" PRId64,
          pId, pTask->chkInfo.currentVer, pWindow->skey, pWindow->ekey);
    }

    // notify the downstream agg tasks that upstream tasks are ready to process the WAL data
    code = streamTaskScanHistoryDataComplete(pTask);

    // read the level while the reference is still held
    bool isSourceTask = (pTask->info.taskLevel == TASK_LEVEL__SOURCE);
    streamMetaReleaseTask(pMeta, pTask);

    // let's start the stream task by extracting data from wal
    if (isSourceTask) {
      tqStartStreamTasks(pTq);
    }

    return code;
  }

  return 0;
}

H
Haojun Liao 已提交
1222
// notify the downstream tasks to transfer executor state after handle all history blocks.
H
Haojun Liao 已提交
1223 1224 1225 1226 1227
int32_t tqProcessTaskTransferStateReq(STQ* pTq, SRpcMsg* pMsg) {
  char*   pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t len = pMsg->contLen - sizeof(SMsgHead);

  SStreamTransferReq req = {0};
H
Haojun Liao 已提交
1228

1229
  SDecoder decoder;
H
Haojun Liao 已提交
1230
  tDecoderInit(&decoder, (uint8_t*)pReq, len);
1231
  int32_t code = tDecodeStreamScanHistoryFinishReq(&decoder, &req);
H
Haojun Liao 已提交
1232
  tDecoderClear(&decoder);
H
Haojun Liao 已提交
1233

1234
  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.taskId);
1235
  if (pTask == NULL) {
1236
    tqError("failed to find task:0x%x, it may have been dropped already", req.taskId);
1237 1238 1239
    return -1;
  }

H
Haojun Liao 已提交
1240 1241 1242 1243 1244 1245
  int32_t remain = streamAlignTransferState(pTask);
  if (remain > 0) {
    tqDebug("s-task:%s receive transfer state msg, remain:%d", pTask->id.idStr, remain);
    return 0;
  }

L
liuyao 已提交
1246
  // transfer the ownership of executor state
H
Haojun Liao 已提交
1247
  tqDebug("s-task:%s all upstream tasks end transfer msg", pTask->id.idStr);
1248

1249
  // related stream task load the state from the state storage backend
L
liuyao 已提交
1250
  SStreamTask* pStreamTask = streamMetaAcquireTask(pTq->pStreamMeta, pTask->streamTaskId.taskId);
1251
  if (pStreamTask == NULL) {
H
Haojun Liao 已提交
1252
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
1253
    tqError("failed to find related stream task:0x%x, it may have been dropped already", req.taskId);
1254 1255 1256
    return -1;
  }

H
Haojun Liao 已提交
1257 1258
  // when all upstream tasks have notified the this task to start transfer state, then we start the transfer procedure.
  streamTaskReleaseState(pTask);
1259
  streamTaskReloadState(pStreamTask);
H
Haojun Liao 已提交
1260
  streamMetaReleaseTask(pTq->pStreamMeta, pStreamTask);
1261

1262 1263
  ASSERT(pTask->streamTaskId.taskId != 0);
  pTask->status.transferState = true;
L
Liu Jicong 已提交
1264

H
Haojun Liao 已提交
1265
  streamSchedExec(pTask);
L
Liu Jicong 已提交
1266
  streamMetaReleaseTask(pTq->pStreamMeta, pTask);
1267 1268 1269
  return 0;
}

1270
// Handle the "scan history finished" notification addressed to req.taskId by delegating to
// streamProcessScanHistoryFinishReq(). Returns -1 if the task cannot be found, otherwise that function's result.
int32_t tqProcessStreamTaskScanHistoryFinishReq(STQ* pTq, SRpcMsg* pMsg) {
  SStreamMeta* pMeta = pTq->pStreamMeta;
  char*        msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t      msgLen = pMsg->contLen - sizeof(SMsgHead);

  // req starts zeroed; the decode result is not checked here
  SStreamScanHistoryFinishReq req = {0};
  SDecoder                    decoder;

  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
  tDecodeStreamScanHistoryFinishReq(&decoder, &req);
  tDecoderClear(&decoder);

  SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.taskId);
  if (pTask == NULL) {
    tqError("failed to find task:0x%x, it may be destroyed, vgId:%d", req.taskId, pMeta->vgId);
    return -1;
  }

  int32_t code = streamProcessScanHistoryFinishReq(pTask, req.taskId, req.childId);
  streamMetaReleaseTask(pMeta, pTask);
  return code;
}
L
Liu Jicong 已提交
1293

L
Liu Jicong 已提交
1294 1295 1296 1297 1298
// Handler for the recover-finish response: currently a placeholder that ignores its arguments and reports success.
int32_t tqProcessTaskRecoverFinishRsp(STQ* pTq, SRpcMsg* pMsg) {
  (void)pTq;
  (void)pMsg;
  return 0;
}

1299 1300 1301 1302
// Decode a delete message (SDeleteRes, `len` bytes at pData) and convert it into a STREAM_DELETE_DATA block wrapped
// in a queue item. The block has one row per deleted table uid, and every row shares the same [skey, ekey] range.
// The group-id and calculate-start/end columns are set to NULL, and the block is stamped with version `ver`.
//
// On success, *pRefBlock receives the new item, or stays NULL when nothing was deleted (no tables or no affected
// rows). Returns TSDB_CODE_SUCCESS, TSDB_CODE_INVALID_MSG if decoding fails, or TSDB_CODE_OUT_OF_MEMORY.
// No memory is leaked on any error path.
int32_t extractDelDataBlock(const void* pData, int32_t len, int64_t ver, SStreamRefDataBlock** pRefBlock) {
  SDecoder*   pCoder = &(SDecoder){0};
  SDeleteRes* pRes = &(SDeleteRes){0};

  *pRefBlock = NULL;

  pRes->uidList = taosArrayInit(0, sizeof(tb_uid_t));
  if (pRes->uidList == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  tDecoderInit(pCoder, (uint8_t*)pData, len);
  int32_t code = tDecodeDeleteRes(pCoder, pRes);
  tDecoderClear(pCoder);
  if (code < 0) {
    taosArrayDestroy(pRes->uidList);
    return TSDB_CODE_INVALID_MSG;
  }

  int32_t numOfTables = taosArrayGetSize(pRes->uidList);
  if (numOfTables == 0 || pRes->affectedRows == 0) {
    taosArrayDestroy(pRes->uidList);
    return TSDB_CODE_SUCCESS;
  }

  SSDataBlock* pDelBlock = createSpecialDataBlock(STREAM_DELETE_DATA);
  if (pDelBlock == NULL) {
    taosArrayDestroy(pRes->uidList);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  if (blockDataEnsureCapacity(pDelBlock, numOfTables) != TSDB_CODE_SUCCESS) {
    blockDataDestroy(pDelBlock);
    taosArrayDestroy(pRes->uidList);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pDelBlock->info.rows = numOfTables;
  pDelBlock->info.version = ver;

  // the column array is not modified while filling rows, so look the columns up once
  SColumnInfoData* pStartCol = taosArrayGet(pDelBlock->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pEndCol = taosArrayGet(pDelBlock->pDataBlock, END_TS_COLUMN_INDEX);
  SColumnInfoData* pUidCol = taosArrayGet(pDelBlock->pDataBlock, UID_COLUMN_INDEX);
  SColumnInfoData* pGroupIdCol = taosArrayGet(pDelBlock->pDataBlock, GROUPID_COLUMN_INDEX);
  SColumnInfoData* pCalStartCol = taosArrayGet(pDelBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
  SColumnInfoData* pCalEndCol = taosArrayGet(pDelBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);

  for (int32_t i = 0; i < numOfTables; i++) {
    // start/end key columns: the same deleted time range for every table
    colDataSetVal(pStartCol, i, (const char*)&pRes->skey, false);
    colDataSetVal(pEndCol, i, (const char*)&pRes->ekey, false);

    // uid column
    int64_t* pUid = taosArrayGet(pRes->uidList, i);
    colDataSetVal(pUidCol, i, (const char*)pUid, false);

    colDataSetNULL(pGroupIdCol, i);
    colDataSetNULL(pCalStartCol, i);
    colDataSetNULL(pCalEndCol, i);
  }

  taosArrayDestroy(pRes->uidList);

  *pRefBlock = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0);
  if (*pRefBlock == NULL) {  // check the allocated item, not the out-parameter itself
    blockDataDestroy(pDelBlock);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  (*pRefBlock)->type = STREAM_INPUT__REF_DATA_BLOCK;
  (*pRefBlock)->pBlock = pDelBlock;
  return TSDB_CODE_SUCCESS;
}

L
Liu Jicong 已提交
1352 1353
// Handle a run request. Two reserved task ids address the whole vnode rather than a single task:
//   - STREAM_TASK_STATUS_CHECK_ID runs the periodic task status check,
//   - EXTRACT_DATA_FROM_WAL_ID makes all tasks scan the WAL.
// Any other id runs the named task if it is in NORMAL or HALT status. Otherwise the task's sched status is reset
// to INACTIVE. Returns 0, or -1 if the named task is not found.
int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
  SStreamTaskRunReq* pReq = pMsg->pCont;
  int32_t            taskId = pReq->taskId;
  int32_t            vgId = TD_VID(pTq->pVnode);

  if (taskId == STREAM_TASK_STATUS_CHECK_ID) {
    tqStreamTasksStatusCheck(pTq);
    return 0;
  }

  // all tasks are extracted submit data from the wal
  if (taskId == EXTRACT_DATA_FROM_WAL_ID) {
    tqStreamTasksScanWal(pTq);
    return 0;
  }

  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);
  if (pTask == NULL) {
    tqError("vgId:%d failed to found s-task, taskId:%d", vgId, taskId);
    return -1;
  }

  // even in halt status, the data in inputQ must be processed
  int8_t status = pTask->status.taskStatus;
  bool   canRun = (status == TASK_STATUS__NORMAL) || (status == TASK_STATUS__HALT);

  if (canRun) {
    tqDebug("vgId:%d s-task:%s start to process block from inputQ, last chk point:%" PRId64, vgId, pTask->id.idStr,
            pTask->chkInfo.version);
    streamProcessRunReq(pTask);
  } else {
    atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
    tqDebug("vgId:%d s-task:%s ignore run req since not in ready state, status:%s, sched-status:%d", vgId,
            pTask->id.idStr, streamGetTaskStatusStr(pTask->status.taskStatus), pTask->status.schedStatus);
  }

  streamMetaReleaseTask(pTq->pStreamMeta, pTask);
  tqStartStreamTasks(pTq);
  return 0;
}

L
Liu Jicong 已提交
1391
// Handle a dispatch request carrying data blocks from an upstream task. The request is decoded and passed to
// streamProcessDispatchMsg() on the target task (req.taskId), with `exec` forwarded unchanged.
// Returns 0 when the request was handed to the task. Returns -1 on decode failure or when the task is missing;
// in those cases the request is freed here.
int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec) {
  char*   msgStr = pMsg->pCont;
  char*   msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
  int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);

  SStreamDispatchReq req = {0};

  SDecoder decoder;
  tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
  int32_t code = tDecodeStreamDispatchReq(&decoder, &req);
  // release decoder resources, as every other handler in this file does after decoding
  tDecoderClear(&decoder);

  if (code < 0) {
    tqError("vgId:%d failed to decode stream dispatch req", pTq->pStreamMeta->vgId);
    tDeleteStreamDispatchReq(&req);
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.taskId);
  if (pTask == NULL) {
    tqError("vgId:%d failed to find task:0x%x to handle the dispatch req, it may have been destroyed",
            pTq->pStreamMeta->vgId, req.taskId);
    tDeleteStreamDispatchReq(&req);
    return -1;
  }

  SRpcMsg rsp = {.info = pMsg->info, .code = 0};
  streamProcessDispatchMsg(pTask, &req, &rsp, exec);
  streamMetaReleaseTask(pTq->pStreamMeta, pTask);
  return 0;
}

L
Liu Jicong 已提交
1414 1415
// Handle the response to a dispatch request. The upstream task id is in network byte order inside the response.
// Returns 0 after forwarding to streamProcessDispatchRsp(), or TSDB_CODE_INVALID_MSG if the task is gone.
int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) {
  SStreamMeta*        pMeta = pTq->pStreamMeta;
  SStreamDispatchRsp* pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t             taskId = ntohl(pRsp->upstreamTaskId);

  SStreamTask* pTask = streamMetaAcquireTask(pMeta, taskId);
  if (pTask == NULL) {
    tqDebug("vgId:%d failed to handle the dispatch rsp, since find task:0x%x failed", pMeta->vgId, taskId);
    return TSDB_CODE_INVALID_MSG;
  }

  streamProcessDispatchRsp(pTask, pRsp, pMsg->code);
  streamMetaReleaseTask(pMeta, pTask);
  return 0;
}
L
Liu Jicong 已提交
1429

1430
// Remove the stream task named in an SVDropStreamTaskReq from the stream meta. Always returns 0.
int32_t tqProcessTaskDropReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  SVDropStreamTaskReq* pReq = (SVDropStreamTaskReq*)msg;
  SStreamMeta*         pMeta = pTq->pStreamMeta;

  tqDebug("vgId:%d receive msg to drop stream task:0x%x", TD_VID(pTq->pVnode), pReq->taskId);
  streamMetaRemoveTask(pMeta, pReq->taskId);

  return 0;
}
L
Liu Jicong 已提交
1437

L
liuyao 已提交
1438 1439 1440 1441 1442 1443 1444 1445 1446 1447 1448 1449 1450 1451
// Pause an acquired task. Its current status is saved in keepTaskStatus and the status is set to PAUSE;
// a task that should already pause is left unchanged. The caller's reference to pTask is always released here.
// Returns -1 if pTask is NULL, otherwise 0.
int32_t tqProcessTaskPauseImpl(SStreamMeta* pStreamMeta, SStreamTask* pTask) {
  if (pTask == NULL) {
    return -1;
  }

  if (!streamTaskShouldPause(&pTask->status)) {
    tqDebug("vgId:%d s-task:%s set pause flag", pStreamMeta->vgId, pTask->id.idStr);
    atomic_store_8(&pTask->status.keepTaskStatus, pTask->status.taskStatus);
    atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__PAUSE);
  }

  streamMetaReleaseTask(pStreamMeta, pTask);
  return 0;
}

5
54liuyao 已提交
1452 1453
// Pause the requested stream task and, if it exists, its related history task.
// Returns -1 if the requested task is missing; otherwise returns the pause result of the last task processed.
int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  SVPauseStreamTaskReq* pReq = (SVPauseStreamTaskReq*)msg;

  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->taskId);
  if (pTask == NULL) {
    return -1;
  }

  // tqProcessTaskPauseImpl() releases pTask, so read the related history task id while the reference is held
  int32_t historyTaskId = pTask->historyTaskId.taskId;

  int32_t code = tqProcessTaskPauseImpl(pTq->pStreamMeta, pTask);
  if (code != 0) {
    return code;
  }

  SStreamTask* pHistoryTask = streamMetaAcquireTask(pTq->pStreamMeta, historyTaskId);
  if (pHistoryTask != NULL) {
    code = tqProcessTaskPauseImpl(pTq->pStreamMeta, pHistoryTask);
  }

  return code;
}

// Resume an acquired task that is paused: its saved keepTaskStatus is restored. For a non-fill-history source
// task with `igUntreated` set, the WAL reader skips to `sversion`, discarding data written while the task was
// paused. The task is then restarted in one of three ways:
//   - a fill-history source task restarts its recover procedure,
//   - a source task with an empty input queue triggers tqStartStreamTasks(),
//   - any other task is scheduled.
// The caller's reference is always released. Returns -1 if pTask is NULL, otherwise 0.
int32_t tqProcessTaskResumeImpl(STQ* pTq, SStreamTask* pTask, int64_t sversion, int8_t igUntreated) {
  int32_t vgId = pTq->pStreamMeta->vgId;
  if (pTask == NULL) {
    return -1;
  }

  // nothing to resume: just drop the reference
  if (!streamTaskShouldPause(&pTask->status)) {
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
    return 0;
  }

  atomic_store_8(&pTask->status.taskStatus, pTask->status.keepTaskStatus);

  bool isSourceTask = (pTask->info.taskLevel == TASK_LEVEL__SOURCE);
  bool isFillHistoryTask = pTask->info.fillHistory;

  // no lock needs to secure the access of the version
  if (igUntreated && isSourceTask && !isFillHistoryTask) {
    // discard all the data  when the stream task is suspended.
    walReaderSetSkipToVersion(pTask->exec.pWalReader, sversion);
    tqDebug("vgId:%d s-task:%s resume to exec, prev paused version:%" PRId64 ", start from vnode ver:%" PRId64
            ", schedStatus:%d",
            vgId, pTask->id.idStr, pTask->chkInfo.currentVer, sversion, pTask->status.schedStatus);
  } else {  // from the previous paused version and go on
    tqDebug("vgId:%d s-task:%s resume to exec, from paused ver:%" PRId64 ", vnode ver:%" PRId64 ", schedStatus:%d",
            vgId, pTask->id.idStr, pTask->chkInfo.currentVer, sversion, pTask->status.schedStatus);
  }

  if (isFillHistoryTask && isSourceTask) {
    streamStartRecoverTask(pTask, igUntreated);
  } else if (isSourceTask && taosQueueItemSize(pTask->inputQueue->queue) == 0) {
    tqStartStreamTasks(pTq);
  } else {
    streamSchedExec(pTask);
  }

  streamMetaReleaseTask(pTq->pStreamMeta, pTask);
  return 0;
}

// Resume the requested stream task and, if it exists, its related history task, passing `sversion` and
// pReq->igUntreated to tqProcessTaskResumeImpl().
// Returns -1 if the requested task is missing; otherwise returns the resume result of the last task processed.
int32_t tqProcessTaskResumeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  SVResumeStreamTaskReq* pReq = (SVResumeStreamTaskReq*)msg;

  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->taskId);
  if (pTask == NULL) {
    return -1;
  }

  // tqProcessTaskResumeImpl() releases pTask, so read the related history task id while the reference is held
  int32_t historyTaskId = pTask->historyTaskId.taskId;

  int32_t code = tqProcessTaskResumeImpl(pTq, pTask, sversion, pReq->igUntreated);
  if (code != 0) {
    return code;
  }

  SStreamTask* pHistoryTask = streamMetaAcquireTask(pTq->pStreamMeta, historyTaskId);
  if (pHistoryTask != NULL) {
    code = tqProcessTaskResumeImpl(pTq, pHistoryTask, sversion, pReq->igUntreated);
  }

  return code;
}

L
Liu Jicong 已提交
1516
/*
 * Handle a stream retrieve request addressed to a local stream task.
 *
 * The payload following the SMsgHead is decoded as an SStreamRetrieveReq and passed,
 * together with a response carrying pMsg->info, to the task named by req.dstTaskId.
 * The decoded request is always freed before returning.
 *
 * Returns 0 when the request was handed to the task, and -1 when the payload cannot be
 * decoded or the destination task cannot be acquired.
 */
int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg) {
  char*    msgStr = pMsg->pCont;
  char*    msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
  int32_t  msgLen = pMsg->contLen - sizeof(SMsgHead);
  SDecoder decoder;

  // Zero-initialised so that a failed/partial decode leaves nothing uninitialised to
  // read (dstTaskId) or free (tDeleteStreamRetrieveReq).
  SStreamRetrieveReq req = {0};

  tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
  int32_t decodeCode = tDecodeStreamRetrieveReq(&decoder, &req);
  tDecoderClear(&decoder);

  if (decodeCode < 0) {
    // The request is malformed; req.dstTaskId cannot be trusted.
    tDeleteStreamRetrieveReq(&req);
    return -1;
  }

  int32_t      taskId = req.dstTaskId;
  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);
  if (pTask == NULL) {
    tDeleteStreamRetrieveReq(&req);
    return -1;
  }

  SRpcMsg rsp = {.info = pMsg->info, .code = 0};
  streamProcessRetrieveReq(pTask, &req, &rsp);

  streamMetaReleaseTask(pTq->pStreamMeta, pTask);
  tDeleteStreamRetrieveReq(&req);
  return 0;
}

/*
 * Handle the response to a stream retrieve request.
 *
 * Currently a no-op: both arguments are ignored and success (0) is always reported.
 */
int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg) {
  (void)pTq;
  (void)pMsg;
  return 0;
}
L
Liu Jicong 已提交
1547

1548 1549 1550 1551 1552 1553
/*
 * Decode a stream dispatch request and deliver it to the destination stream task.
 *
 * On success the request is processed by streamProcessDispatchMsg(), the message
 * content and queue item are freed, and 0 is returned.
 *
 * On failure (undecodable payload -> TSDB_CODE_MSG_DECODE_ERROR, unknown task ->
 * TSDB_CODE_STREAM_TASK_NOT_EXIST), an SStreamDispatchRsp carrying that code is sent
 * back when pMsg->info.handle is set, and -1 is returned. If building the response
 * fails, a bare TSDB_CODE_OUT_OF_MEMORY response is sent instead.
 */
int32_t vnodeEnqueueStreamMsg(SVnode* pVnode, SRpcMsg* pMsg) {
  STQ*      pTq = pVnode->pTq;
  SMsgHead* msgStr = pMsg->pCont;
  char*     msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
  int32_t   msgLen = pMsg->contLen - sizeof(SMsgHead);
  int32_t   code = 0;

  // Zero-initialised: the FAIL path reads req's routing fields even when decoding
  // failed, and a partially decoded request must be safe to delete.
  SStreamDispatchReq req = {0};
  SDecoder           decoder;
  tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
  if (tDecodeStreamDispatchReq(&decoder, &req) < 0) {
    code = TSDB_CODE_MSG_DECODE_ERROR;
    tDecoderClear(&decoder);
    // Release whatever the decoder managed to allocate before failing.
    tDeleteStreamDispatchReq(&req);
    goto FAIL;
  }
  tDecoderClear(&decoder);

  int32_t taskId = req.taskId;

  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);
  if (pTask) {
    SRpcMsg rsp = {.info = pMsg->info, .code = 0};
    streamProcessDispatchMsg(pTask, &req, &rsp, false);
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
    rpcFreeCont(pMsg->pCont);
    taosFreeQitem(pMsg);
    return 0;
  } else {
    tDeleteStreamDispatchReq(&req);
  }

  code = TSDB_CODE_STREAM_TASK_NOT_EXIST;

FAIL:
  // NOTE(review): unlike the other -1 paths, this one does not free pMsg->pCont / pMsg;
  // confirm the caller's ownership contract for a NULL handle.
  if (pMsg->info.handle == NULL) return -1;

  SMsgHead* pRspHead = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamDispatchRsp));
  if (pRspHead == NULL) {
    SRpcMsg rsp = {.code = TSDB_CODE_OUT_OF_MEMORY, .info = pMsg->info};
    tqDebug("send dispatch error rsp, code: %x", code);
    tmsgSendRsp(&rsp);
    rpcFreeCont(pMsg->pCont);
    taosFreeQitem(pMsg);
    return -1;
  }

  // Fields are encoded in network byte order.
  pRspHead->vgId = htonl(req.upstreamNodeId);
  SStreamDispatchRsp* pRsp = POINTER_SHIFT(pRspHead, sizeof(SMsgHead));
  pRsp->streamId = htobe64(req.streamId);
  pRsp->upstreamTaskId = htonl(req.upstreamTaskId);
  pRsp->upstreamNodeId = htonl(req.upstreamNodeId);
  pRsp->downstreamNodeId = htonl(pVnode->config.vgId);
  pRsp->downstreamTaskId = htonl(req.taskId);
  pRsp->inputStatus = TASK_OUTPUT_STATUS__NORMAL;

  SRpcMsg rsp = {
      .code = code, .info = pMsg->info, .contLen = sizeof(SMsgHead) + sizeof(SStreamDispatchRsp), .pCont = pRspHead};
  tqDebug("send dispatch error rsp, code: %x", code);
  tmsgSendRsp(&rsp);
  rpcFreeCont(pMsg->pCont);
  taosFreeQitem(pMsg);
  return -1;
}
L
Liu Jicong 已提交
1611

1612
/*
 * Check whether sversion does not exceed pTq->walLogLastVer (presumably the last
 * WAL version known to this TQ — confirm).
 *
 * Returns 1 when sversion <= walLogLastVer, 0 otherwise.
 */
int32_t tqCheckLogInWal(STQ* pTq, int64_t sversion) {
  return (sversion > pTq->walLogLastVer) ? 0 : 1;
}
1613