tq.c 55.2 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
S
Shengliang Guan 已提交
14 15
 */

H
Hongze Cheng 已提交
16
#include "tq.h"
S
Shengliang Guan 已提交
17

dengyihao's avatar
dengyihao 已提交
18 19 20
// 0: not init
// 1: already inited
// 2: wait to be inited or cleaned up
21
// Forward declaration: tqOpen() calls tqInitialize() before its definition appears further down in this file.
static int32_t tqInitialize(STQ* pTq);
dengyihao's avatar
dengyihao 已提交
22

wmmhello's avatar
wmmhello 已提交
23
// Returns true when the handle's status field equals TMQ_HANDLE_STATUS_EXEC.
static FORCE_INLINE bool tqIsHandleExec(STqHandle* pHandle) {
  return pHandle->status == TMQ_HANDLE_STATUS_EXEC;
}
dengyihao's avatar
dengyihao 已提交
24 25
// Marks the handle as executing by storing TMQ_HANDLE_STATUS_EXEC into its status field.
static FORCE_INLINE void tqSetHandleExec(STqHandle* pHandle) {
  pHandle->status = TMQ_HANDLE_STATUS_EXEC;
}
// Marks the handle as idle by storing TMQ_HANDLE_STATUS_IDLE into its status field.
static FORCE_INLINE void tqSetHandleIdle(STqHandle* pHandle) {
  pHandle->status = TMQ_HANDLE_STATUS_IDLE;
}
wmmhello's avatar
wmmhello 已提交
26

L
Liu Jicong 已提交
27
int32_t tqInit() {
L
Liu Jicong 已提交
28 29 30 31 32 33
  int8_t old;
  while (1) {
    old = atomic_val_compare_exchange_8(&tqMgmt.inited, 0, 2);
    if (old != 2) break;
  }

34 35 36 37 38 39
  if (old == 0) {
    tqMgmt.timer = taosTmrInit(10000, 100, 10000, "TQ");
    if (tqMgmt.timer == NULL) {
      atomic_store_8(&tqMgmt.inited, 0);
      return -1;
    }
40 41 42
    if (streamInit() < 0) {
      return -1;
    }
L
Liu Jicong 已提交
43
    atomic_store_8(&tqMgmt.inited, 1);
44
  }
45

L
Liu Jicong 已提交
46 47
  return 0;
}
L
Liu Jicong 已提交
48

49
void tqCleanUp() {
L
Liu Jicong 已提交
50 51 52 53 54 55 56 57
  int8_t old;
  while (1) {
    old = atomic_val_compare_exchange_8(&tqMgmt.inited, 1, 2);
    if (old != 2) break;
  }

  if (old == 1) {
    taosTmrCleanUp(tqMgmt.timer);
L
Liu Jicong 已提交
58
    streamCleanUp();
L
Liu Jicong 已提交
59 60
    atomic_store_8(&tqMgmt.inited, 0);
  }
61
}
L
Liu Jicong 已提交
62

63
void tqDestroyTqHandle(void* data) {
64 65
  STqHandle* pData = (STqHandle*)data;
  qDestroyTask(pData->execHandle.task);
wmmhello's avatar
wmmhello 已提交
66

67
  if (pData->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
L
Liu Jicong 已提交
68
    taosMemoryFreeClear(pData->execHandle.execCol.qmsg);
69
  } else if (pData->execHandle.subType == TOPIC_SUB_TYPE__DB) {
70
    tqReaderClose(pData->execHandle.pTqReader);
71 72
    walCloseReader(pData->pWalReader);
    taosHashCleanup(pData->execHandle.execDb.pFilterOutTbUid);
L
Liu Jicong 已提交
73
  } else if (pData->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
74
    walCloseReader(pData->pWalReader);
75
    tqReaderClose(pData->execHandle.pTqReader);
76 77
    taosMemoryFreeClear(pData->execHandle.execTb.qmsg);
    nodesDestroyNode(pData->execHandle.execTb.node);
78
  }
dengyihao's avatar
dengyihao 已提交
79
  if (pData->msg != NULL) {
80 81 82
    rpcFreeCont(pData->msg->pCont);
    taosMemoryFree(pData->msg);
    pData->msg = NULL;
D
dapan1121 已提交
83
  }
L
Liu Jicong 已提交
84 85
}

86 87 88 89 90
static bool tqOffsetLessOrEqual(const STqOffset* pLeft, const STqOffset* pRight) {
  return pLeft->val.type == TMQ_OFFSET__LOG && pRight->val.type == TMQ_OFFSET__LOG &&
         pLeft->val.version <= pRight->val.version;
}

L
Liu Jicong 已提交
91
/**
 * Allocate an STQ object for the given vnode, create its hash tables and run tqInitialize().
 *
 * @param path   directory path; a copy is stored in the returned object.
 * @param pVnode owning vnode; its WAL's last version is recorded in walLogLastVer.
 * @return the new STQ, or NULL on allocation failure (terrno = TSDB_CODE_OUT_OF_MEMORY)
 *         or when tqInitialize() fails (the partially built object is passed to tqClose()).
 */
STQ* tqOpen(const char* path, SVnode* pVnode) {
  STQ* pNew = taosMemoryCalloc(1, sizeof(STQ));
  if (NULL == pNew) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pNew->path = taosStrdup(path);
  pNew->pVnode = pVnode;
  pNew->walLogLastVer = pVnode->pWal->vers.lastVer;

  // subscription handles; entries are released through tqDestroyTqHandle
  pNew->pHandle = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);
  taosHashSetFreeFp(pNew->pHandle, tqDestroyTqHandle);

  taosInitRWLatch(&pNew->lock);
  pNew->pPushMgr = taosHashInit(64, MurmurHash3_32, false, HASH_NO_LOCK);

  // check-info entries; released through tDeleteSTqCheckInfo
  pNew->pCheckInfo = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);
  taosHashSetFreeFp(pNew->pCheckInfo, (FDelete)tDeleteSTqCheckInfo);

  if (tqInitialize(pNew) != TSDB_CODE_SUCCESS) {
    tqClose(pNew);
    return NULL;
  }

  return pNew;
}

/**
 * Open the persistent parts of a freshly allocated STQ: the tq meta store, the offset
 * store, the stream meta, and finally load the stream tasks.
 *
 * @return 0 on success, -1 as soon as any step fails (nothing is rolled back here;
 *         tqOpen() hands the object to tqClose() on failure).
 */
int32_t tqInitialize(STQ* pTq) {
  if (tqMetaOpen(pTq) < 0) {
    return -1;
  }

  pTq->pOffsetStore = tqOffsetOpen(pTq);
  if (NULL == pTq->pOffsetStore) {
    return -1;
  }

  pTq->pStreamMeta = streamMetaOpen(pTq->path, pTq, (FTaskExpand*)tqExpandTask, pTq->pVnode->config.vgId);
  if (NULL == pTq->pStreamMeta) {
    return -1;
  }

  // the version is kept in task's meta data
  // todo check if this version is required or not
  if (streamLoadTasks(pTq->pStreamMeta, walGetCommittedVer(pTq->pVnode->pWal)) < 0) {
    return -1;
  }

  return 0;
}
L
Liu Jicong 已提交
143

L
Liu Jicong 已提交
144
/**
 * Release everything owned by an STQ object and then the object itself.
 * A NULL argument is accepted and ignored.
 */
void tqClose(STQ* pTq) {
  if (NULL != pTq) {
    tqOffsetClose(pTq->pOffsetStore);

    taosHashCleanup(pTq->pHandle);
    taosHashCleanup(pTq->pPushMgr);
    taosHashCleanup(pTq->pCheckInfo);

    taosMemoryFree(pTq->path);

    tqMetaClose(pTq);
    streamMetaClose(pTq->pStreamMeta);

    taosMemoryFree(pTq);
  }
}
L
Liu Jicong 已提交
158

159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181
// Walks every task in pMeta->pTasks while holding the meta write latch and reports whether
// at least one of them has status.timerActive == 1. The whole table is always traversed.
static bool hasStreamTaskInTimer(SStreamMeta* pMeta) {
  bool found = false;

  taosWLockLatch(&pMeta->lock);

  for (void* pEntry = taosHashIterate(pMeta->pTasks, NULL); pEntry != NULL;
       pEntry = taosHashIterate(pMeta->pTasks, pEntry)) {
    SStreamTask* pTask = *(SStreamTask**)pEntry;
    if (pTask->status.timerActive == 1) {
      found = true;
    }
  }

  taosWUnLockLatch(&pMeta->lock);

  return found;
}

H
Haojun Liao 已提交
182 183 184 185 186 187 188 189 190 191 192 193
/**
 * Prepare the stream tasks of this tq for shutdown.
 *
 * Under the stream-meta write latch every task is set to TASK_STATUS__STOP and its executor
 * is killed. Afterwards the function polls hasStreamTaskInTimer() every 100ms until no task
 * reports an active timer. A NULL pTq is ignored.
 */
void tqNotifyClose(STQ* pTq) {
  if (pTq == NULL) {
    return;
  }

  taosWLockLatch(&pTq->pStreamMeta->lock);

  for (void* pEntry = taosHashIterate(pTq->pStreamMeta->pTasks, NULL); pEntry != NULL;
       pEntry = taosHashIterate(pTq->pStreamMeta->pTasks, pEntry)) {
    SStreamTask* pTask = *(SStreamTask**)pEntry;
    tqDebug("vgId:%d s-task:%s set closing flag", pTq->pStreamMeta->vgId, pTask->id.idStr);
    pTask->status.taskStatus = TASK_STATUS__STOP;

    // measure how long killing the executor takes, for the debug log only
    int64_t killStart = taosGetTimestampMs();
    qKillTask(pTask->exec.pExecutor, TSDB_CODE_SUCCESS);
    int64_t killElapsed = taosGetTimestampMs() - killStart;

    tqDebug("vgId:%d s-task:%s is closed in %" PRId64 " ms", pTq->pStreamMeta->vgId, pTask->id.idStr, killElapsed);
  }

  taosWUnLockLatch(&pTq->pStreamMeta->lock);

  tqDebug("vgId:%d start to check all tasks", pTq->pStreamMeta->vgId);

  int64_t waitStart = taosGetTimestampMs();

  while (hasStreamTaskInTimer(pTq->pStreamMeta)) {
    tqDebug("vgId:%d some tasks in timer, wait for 100ms and recheck", pTq->pStreamMeta->vgId);
    taosMsleep(100);
  }

  int64_t waitElapsed = taosGetTimestampMs() - waitStart;
  tqDebug("vgId:%d all stream tasks are not in timer, continue close, elapsed time:%"PRId64" ms", pTq->pStreamMeta->vgId, waitElapsed);
}

D
dapan1121 已提交
220 221
/**
 * Encode a response behind an SMqRspHead and send it through tmsgSendRsp().
 *
 * @param pRpcHandleInfo rpc handle info copied into the outgoing message.
 * @param pRsp           response to encode; treated as STaosxRsp when type is TMQ_MSG_TYPE__TAOSX_RSP.
 * @param epoch          value written to the head's epoch field.
 * @param consumerId     value written to the head's consumerId field.
 * @param type           TMQ_MSG_TYPE__POLL_RSP or TMQ_MSG_TYPE__TAOSX_RSP; for any other value
 *                       only the head is sent (payload length stays 0).
 * @return 0 after the message was handed to tmsgSendRsp(), -1 if size computation or
 *         buffer allocation failed.
 */
static int32_t doSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* pRsp, int32_t epoch,
                             int64_t consumerId, int32_t type) {
  const bool isPollRsp = (type == TMQ_MSG_TYPE__POLL_RSP);
  const bool isTaosxRsp = (!isPollRsp) && (type == TMQ_MSG_TYPE__TAOSX_RSP);

  int32_t bodyLen = 0;
  int32_t code = 0;

  // first pass: compute the encoded size of the payload
  if (isPollRsp) {
    tEncodeSize(tEncodeMqDataRsp, pRsp, bodyLen, code);
  } else if (isTaosxRsp) {
    tEncodeSize(tEncodeSTaosxRsp, (STaosxRsp*)pRsp, bodyLen, code);
  }

  if (code < 0) {
    return -1;
  }

  int32_t totalLen = sizeof(SMqRspHead) + bodyLen;
  void*   pBuf = rpcMallocCont(totalLen);
  if (pBuf == NULL) {
    return -1;
  }

  SMqRspHead* pHead = (SMqRspHead*)pBuf;
  pHead->mqMsgType = type;
  pHead->epoch = epoch;
  pHead->consumerId = consumerId;

  // second pass: encode the payload right after the head
  void*    pBody = POINTER_SHIFT(pBuf, sizeof(SMqRspHead));
  SEncoder encoder = {0};
  tEncoderInit(&encoder, pBody, bodyLen);

  if (isPollRsp) {
    tEncodeMqDataRsp(&encoder, pRsp);
  } else if (isTaosxRsp) {
    tEncodeSTaosxRsp(&encoder, (STaosxRsp*)pRsp);
  }

  tEncoderClear(&encoder);

  SRpcMsg rsp = {0};
  rsp.info = *pRpcHandleInfo;
  rsp.pCont = pBuf;
  rsp.contLen = totalLen;
  rsp.code = 0;

  tmsgSendRsp(&rsp);
  return 0;
}
L
Liu Jicong 已提交
268

H
Haojun Liao 已提交
269
/**
 * Send an (otherwise zero-initialized) poll response for the message stored in pHandle->msg,
 * using the handle's own epoch and consumer id, and log the request/response offsets.
 *
 * @return always 0.
 */
int32_t tqPushDataRsp(STqHandle* pHandle, int32_t vgId) {
  SMqDataRsp rsp = {0};
  rsp.head.mqMsgType = TMQ_MSG_TYPE__POLL_RSP;
  rsp.head.epoch = pHandle->epoch;
  rsp.head.consumerId = pHandle->consumerId;

  int64_t firstVer = 0;
  int64_t lastVer = 0;
  walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &firstVer, &lastVer);
  tqDoSendDataRsp(&pHandle->msg->info, &rsp, pHandle->epoch, pHandle->consumerId, TMQ_MSG_TYPE__POLL_RSP, firstVer,
                  lastVer);

  // printable forms of the offsets, used only by the debug log below
  char reqStr[TSDB_OFFSET_LEN] = {0};
  char rspStr[TSDB_OFFSET_LEN] = {0};
  tFormatOffset(reqStr, tListLen(reqStr), &rsp.reqOffset);
  tFormatOffset(rspStr, tListLen(rspStr), &rsp.rspOffset);
  tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) push rsp, block num: %d, req:%s, rsp:%s", vgId,
          rsp.head.consumerId, rsp.head.epoch, rsp.blockNum, reqStr, rspStr);
  return 0;
}

289 290 291 292 293 294
/**
 * Send pRsp as the reply to a poll request, tagging it with the request's epoch and consumer id
 * and the WAL reader's current valid version range, then log the request/response offsets.
 *
 * @return always 0.
 */
int32_t tqSendDataRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp,
                      int32_t type, int32_t vgId) {
  int64_t firstVer = 0;
  int64_t lastVer = 0;
  walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &firstVer, &lastVer);

  tqDoSendDataRsp(&pMsg->info, pRsp, pReq->epoch, pReq->consumerId, type, firstVer, lastVer);

  // printable forms of the offsets, used only by the debug log below
  char reqStr[TSDB_OFFSET_LEN] = {0};
  char rspStr[TSDB_OFFSET_LEN] = {0};
  tFormatOffset(reqStr, TSDB_OFFSET_LEN, &pRsp->reqOffset);
  tFormatOffset(rspStr, TSDB_OFFSET_LEN, &pRsp->rspOffset);

  tqDebug("tmq poll vgId:%d consumer:0x%" PRIx64 " (epoch %d) send rsp, block num:%d, req:%s, rsp:%s, reqId:0x%" PRIx64, vgId,
          pReq->consumerId, pReq->epoch, pRsp->blockNum, reqStr, rspStr, pReq->reqId);

  return 0;
}

307
/**
 * Handle an offset-commit message: decode the committed offset and persist it in the
 * offset store unless the stored offset is already at or beyond it.
 *
 * @param pTq      tq instance whose offset store is updated.
 * @param sversion version associated with this message; a committed log offset equal to
 *                 sversion - 1 is advanced by one before being saved.
 * @param msg      encoded SMqVgOffset.
 * @param msgLen   length of msg in bytes.
 * @return 0 on success or when no update is needed, -1 on decode failure, invalid offset
 *         type, or write failure.
 */
int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  SMqVgOffset vgOffset = {0};
  int32_t     vgId = TD_VID(pTq->pVnode);

  SDecoder decoder;
  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
  if (tDecodeMqVgOffset(&decoder, &vgOffset) < 0) {
    // release the decoder on the failure path too, as tqProcessSubscribeReq does
    tDecoderClear(&decoder);
    return -1;
  }

  tDecoderClear(&decoder);

  STqOffset* pOffset = &vgOffset.offset;

  if (pOffset->val.type == TMQ_OFFSET__SNAPSHOT_DATA || pOffset->val.type == TMQ_OFFSET__SNAPSHOT_META) {
    tqDebug("receive offset commit msg to %s on vgId:%d, offset(type:snapshot) uid:%" PRId64 ", ts:%" PRId64,
            pOffset->subKey, vgId, pOffset->val.uid, pOffset->val.ts);
  } else if (pOffset->val.type == TMQ_OFFSET__LOG) {
    tqDebug("receive offset commit msg to %s on vgId:%d, offset(type:log) version:%" PRId64, pOffset->subKey, vgId,
            pOffset->val.version);
    if (pOffset->val.version + 1 == sversion) {
      pOffset->val.version += 1;
    }
  } else {
    tqError("invalid commit offset type:%d", pOffset->val.type);
    return -1;
  }

  STqOffset* pSavedOffset = tqOffsetRead(pTq->pOffsetStore, pOffset->subKey);
  if (pSavedOffset != NULL && tqOffsetLessOrEqual(pOffset, pSavedOffset)) {
    tqDebug("not update the offset, vgId:%d sub:%s since committed:%" PRId64 " less than/equal to existed:%" PRId64,
            vgId, pOffset->subKey, pOffset->val.version, pSavedOffset->val.version);
    return 0;  // no need to update the offset value
  }

  // save the new offset value
  if (tqOffsetWrite(pTq->pOffsetStore, pOffset) < 0) {
    return -1;
  }

  return 0;
}

350
/**
 * Handle a seek message: move the stored log offset of a subscription to the requested
 * version, clamped to the WAL reader's valid version range.
 *
 * @param pTq      tq instance.
 * @param sversion not used by this function.
 * @param msg      encoded SMqVgOffset.
 * @param msgLen   length of msg in bytes.
 * @return 0 on success or when nothing needs to change; -1 on decode failure, non-log offset
 *         type, unknown subscription (terrno = TSDB_CODE_INVALID_MSG), consumer mismatch
 *         (terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH), or write failure.
 */
int32_t tqProcessSeekReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  SMqVgOffset vgOffset = {0};
  int32_t     vgId = TD_VID(pTq->pVnode);

  SDecoder decoder;
  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
  if (tDecodeMqVgOffset(&decoder, &vgOffset) < 0) {
    tqError("vgId:%d failed to decode seek msg", vgId);
    // release the decoder on the failure path too, as tqProcessSubscribeReq does
    tDecoderClear(&decoder);
    return -1;
  }

  tDecoderClear(&decoder);

  tqDebug("topic:%s, vgId:%d process offset seek by consumer:0x%" PRIx64 ", req offset:%" PRId64,
          vgOffset.offset.subKey, vgId, vgOffset.consumerId, vgOffset.offset.val.version);

  STqOffset* pOffset = &vgOffset.offset;
  if (pOffset->val.type != TMQ_OFFSET__LOG) {
    tqError("vgId:%d, subKey:%s invalid seek offset type:%d", vgId, pOffset->subKey, pOffset->val.type);
    return -1;
  }

  // 1. find the subscription handle
  STqHandle* pHandle = taosHashGet(pTq->pHandle, pOffset->subKey, strlen(pOffset->subKey));
  if (pHandle == NULL) {
    tqError("tmq seek: consumer:0x%" PRIx64 " vgId:%d subkey %s not found", vgOffset.consumerId, vgId, pOffset->subKey);
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  // 2. check consumer-vg assignment status
  taosRLockLatch(&pTq->lock);
  if (pHandle->consumerId != vgOffset.consumerId) {
    tqDebug("ERROR tmq seek: consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64,
            vgOffset.consumerId, vgId, pOffset->subKey, pHandle->consumerId);
    terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
    taosRUnLockLatch(&pTq->lock);
    return -1;
  }
  taosRUnLockLatch(&pTq->lock);

  // 3. check the offset info
  STqOffset* pSavedOffset = tqOffsetRead(pTq->pOffsetStore, pOffset->subKey);
  if (pSavedOffset != NULL) {
    if (pSavedOffset->val.type != TMQ_OFFSET__LOG) {
      tqError("invalid saved offset type, vgId:%d sub:%s", vgId, pOffset->subKey);
      return 0;  // no need to update the offset value
    }

    if (pSavedOffset->val.version == pOffset->val.version) {
      tqDebug("vgId:%d subKey:%s no need to seek to %" PRId64 " prev offset:%" PRId64, vgId, pOffset->subKey,
              pOffset->val.version, pSavedOffset->val.version);
      return 0;
    }
  }

  // clamp the requested version into [sver, ever]
  int64_t sver = 0, ever = 0;
  walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever);
  if (pOffset->val.version < sver) {
    pOffset->val.version = sver;
  } else if (pOffset->val.version > ever) {
    pOffset->val.version = ever;
  }

  // save the new offset value
  if (pSavedOffset != NULL) {
    tqDebug("vgId:%d sub:%s seek to:%" PRId64 " prev offset:%" PRId64, vgId, pOffset->subKey, pOffset->val.version,
            pSavedOffset->val.version);
  } else {
    tqDebug("vgId:%d sub:%s seek to:%" PRId64 " not saved yet", vgId, pOffset->subKey, pOffset->val.version);
  }

  if (tqOffsetWrite(pTq->pOffsetStore, pOffset) < 0) {
    tqError("failed to save offset, vgId:%d sub:%s seek to %" PRId64, vgId, pOffset->subKey, pOffset->val.version);
    return -1;
  }

  tqDebug("topic:%s, vgId:%d consumer:0x%" PRIx64 " offset is update to:%" PRId64, vgOffset.offset.subKey, vgId,
          vgOffset.consumerId, vgOffset.offset.val.version);

  return 0;
}

L
Liu Jicong 已提交
432
/**
 * Check whether column colId of table tbUid appears in any registered check-info entry.
 *
 * @return -1 if some entry with ntbUid == tbUid lists colId in its colIdList
 *         (iteration is cancelled before returning), 0 otherwise.
 */
int32_t tqCheckColModifiable(STQ* pTq, int64_t tbUid, int32_t colId) {
  for (void* pEntry = taosHashIterate(pTq->pCheckInfo, NULL); pEntry != NULL;
       pEntry = taosHashIterate(pTq->pCheckInfo, pEntry)) {
    STqCheckInfo* pInfo = (STqCheckInfo*)pEntry;
    if (pInfo->ntbUid != tbUid) {
      continue;
    }

    int32_t numOfCols = taosArrayGetSize(pInfo->colIdList);
    for (int32_t idx = 0; idx < numOfCols; ++idx) {
      int16_t lockedColId = *(int16_t*)taosArrayGet(pInfo->colIdList, idx);
      if (lockedColId != colId) {
        continue;
      }

      // leaving the traversal early, so the iterator has to be cancelled explicitly
      taosHashCancelIterate(pTq->pCheckInfo, pEntry);
      return -1;
    }
  }

  return 0;
}

458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482
/**
 * Re-queue the stored poll message of every handle registered in pTq->pPushMgr onto the
 * query queue as a TDMT_VND_TMQ_CONSUME message, then clear the push manager.
 * Runs under the tq write latch. pMsg is not used.
 *
 * @return always 0.
 */
int32_t tqProcessPollPush(STQ* pTq, SRpcMsg* pMsg) {
  int32_t vgId = TD_VID(pTq->pVnode);

  taosWLockLatch(&pTq->lock);

  if (taosHashGetSize(pTq->pPushMgr) > 0) {
    for (void* pEntry = taosHashIterate(pTq->pPushMgr, NULL); pEntry != NULL;
         pEntry = taosHashIterate(pTq->pPushMgr, pEntry)) {
      STqHandle* pHandle = *(STqHandle**)pEntry;
      tqDebug("vgId:%d start set submit for pHandle:%p, consumer:0x%" PRIx64, vgId, pHandle, pHandle->consumerId);

      if (ASSERT(pHandle->msg != NULL)) {
        tqError("pHandle->msg should not be null");
        break;
      }

      // the content buffer is passed on with the queued message; only the SRpcMsg wrapper is freed here
      SRpcMsg msg = {.msgType = TDMT_VND_TMQ_CONSUME,
                     .pCont = pHandle->msg->pCont,
                     .contLen = pHandle->msg->contLen,
                     .info = pHandle->msg->info};
      tmsgPutToQueue(&pTq->pVnode->msgCb, QUERY_QUEUE, &msg);
      taosMemoryFree(pHandle->msg);
      pHandle->msg = NULL;
    }

    taosHashClear(pTq->pPushMgr);
  }

  taosWUnLockLatch(&pTq->lock);
  return 0;
}

D
dapan1121 已提交
487
/**
 * Handle a consumer poll request.
 *
 * The handle for req.subKey is looked up under the tq write latch; if it is already marked
 * as executing the function sleeps 10ms and retries. Once the handle is claimed (status set
 * to exec), the epoch is raised if the request carries a newer one, data is extracted via
 * tqExtractDataForMq(), and the handle is set back to idle.
 *
 * @return the result of tqExtractDataForMq(), or -1 when the request cannot be deserialized,
 *         the subscription is unknown (terrno = TSDB_CODE_INVALID_MSG) or the consumer does
 *         not match the handle (terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH).
 */
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
  SMqPollReq req = {0};
  if (tDeserializeSMqPollReq(pMsg->pCont, pMsg->contLen, &req) < 0) {
    tqError("tDeserializeSMqPollReq %d failed", pMsg->contLen);
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  int64_t    consumerId = req.consumerId;
  int32_t    reqEpoch = req.epoch;
  int32_t    vgId = TD_VID(pTq->pVnode);
  STqHandle* pHandle = NULL;

  for (;;) {
    taosWLockLatch(&pTq->lock);

    // 1. find handle
    pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
    if (pHandle == NULL) {
      tqError("tmq poll: consumer:0x%" PRIx64 " vgId:%d subkey %s not found", consumerId, vgId, req.subKey);
      terrno = TSDB_CODE_INVALID_MSG;
      taosWUnLockLatch(&pTq->lock);
      return -1;
    }

    // 2. check re-balance status
    if (pHandle->consumerId != consumerId) {
      tqError("ERROR tmq poll: consumer:0x%" PRIx64
              " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64,
              consumerId, TD_VID(pTq->pVnode), req.subKey, pHandle->consumerId);
      terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
      taosWUnLockLatch(&pTq->lock);
      return -1;
    }

    // claim the handle if nobody else is executing on it
    if (!tqIsHandleExec(pHandle)) {
      tqSetHandleExec(pHandle);
      tqDebug("tmq poll: consumer:0x%" PRIx64 "vgId:%d, topic:%s, set handle exec, pHandle:%p", consumerId, vgId,
              req.subKey, pHandle);
      taosWUnLockLatch(&pTq->lock);
      break;
    }

    taosWUnLockLatch(&pTq->lock);

    tqDebug("tmq poll: consumer:0x%" PRIx64
            "vgId:%d, topic:%s, subscription is executing, wait for 10ms and retry, pHandle:%p",
            consumerId, vgId, req.subKey, pHandle);
    taosMsleep(10);
  }

  // 3. update the epoch value
  if (pHandle->epoch < reqEpoch) {
    tqDebug("tmq poll: consumer:0x%" PRIx64 " epoch update from %d to %d by poll req", consumerId, pHandle->epoch,
            reqEpoch);
    pHandle->epoch = reqEpoch;
  }

  STqOffsetVal reqOffset = req.reqOffset;
  char         offsetStr[TSDB_OFFSET_LEN];
  tFormatOffset(offsetStr, TSDB_OFFSET_LEN, &reqOffset);
  tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey %s, recv poll req vgId:%d, req:%s, reqId:0x%" PRIx64,
          consumerId, req.epoch, pHandle->subKey, vgId, offsetStr, req.reqId);

  int code = tqExtractDataForMq(pTq, pHandle, &req, pMsg);
  tqSetHandleIdle(pHandle);

  tqDebug("tmq poll: consumer:0x%" PRIx64 "vgId:%d, topic:%s, , set handle idle, pHandle:%p", consumerId, vgId,
          req.subKey, pHandle);
  return code;
}

560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617
/**
 * Answer a WAL-info request with a TMQ_MSG_TYPE__WALINFO_RSP whose rspOffset is a log
 * offset: the saved offset if one exists, otherwise a position derived from the request
 * offset type (current reader version / request version, earliest, or latest).
 *
 * @return 0 after the response was sent; -1 when the request cannot be deserialized, the
 *         subscription is unknown (terrno = TSDB_CODE_INVALID_MSG), the consumer does not
 *         match (terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH), or the offset/snapshot settings
 *         are not valid for WAL info (terrno = TSDB_CODE_INVALID_PARA).
 */
int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) {
  SMqPollReq req = {0};
  if (tDeserializeSMqPollReq(pMsg->pCont, pMsg->contLen, &req) < 0) {
    tqError("tDeserializeSMqPollReq %d failed", pMsg->contLen);
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  int64_t      consumerId = req.consumerId;
  STqOffsetVal reqOffset = req.reqOffset;
  int32_t      vgId = TD_VID(pTq->pVnode);

  // 1. find handle
  STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
  if (pHandle == NULL) {
    tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s not found", consumerId, vgId, req.subKey);
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  // 2. check re-balance status
  taosRLockLatch(&pTq->lock);
  if (pHandle->consumerId != consumerId) {
    tqDebug("ERROR consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64,
            consumerId, vgId, req.subKey, pHandle->consumerId);
    terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
    taosRUnLockLatch(&pTq->lock);
    return -1;
  }
  taosRUnLockLatch(&pTq->lock);

  int64_t sver = 0, ever = 0;
  walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever);

  SMqDataRsp dataRsp = {0};
  tqInitDataRsp(&dataRsp, &req);

  STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, req.subKey);
  if (pOffset != NULL) {
    if (pOffset->val.type != TMQ_OFFSET__LOG) {
      tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s use snapshot, no valid wal info", consumerId, vgId, req.subKey);
      terrno = TSDB_CODE_INVALID_PARA;
      tDeleteMqDataRsp(&dataRsp);
      return -1;
    }

    dataRsp.rspOffset.type = TMQ_OFFSET__LOG;
    dataRsp.rspOffset.version = pOffset->val.version;
  } else {
    if (req.useSnapshot == true) {
      tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s snapshot not support wal info", consumerId, vgId, req.subKey);
      terrno = TSDB_CODE_INVALID_PARA;
      tDeleteMqDataRsp(&dataRsp);
      return -1;
    }

    dataRsp.rspOffset.type = TMQ_OFFSET__LOG;

    if (reqOffset.type == TMQ_OFFSET__LOG) {
      int64_t currentVer = walReaderGetCurrentVer(pHandle->execHandle.pTqReader->pWalReader);
      if (currentVer == -1) {  // not start to read data from wal yet, return req offset directly
        dataRsp.rspOffset.version = reqOffset.version;
      } else {
        dataRsp.rspOffset.version = currentVer;  // return current consume offset value
      }
    } else if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEST) {
      dataRsp.rspOffset.version = sver;  // not consume yet, set the earliest position
    } else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) {
      dataRsp.rspOffset.version = ever;
    } else {
      tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s invalid offset type:%d", consumerId, vgId, req.subKey,
              reqOffset.type);
      terrno = TSDB_CODE_INVALID_PARA;
      tDeleteMqDataRsp(&dataRsp);
      return -1;
    }
  }

  tqDoSendDataRsp(&pMsg->info, &dataRsp, req.epoch, req.consumerId, TMQ_MSG_TYPE__WALINFO_RSP, sver, ever);

  // dataRsp stays owned by this function (every error path above releases it), so release it
  // on the success path as well
  tDeleteMqDataRsp(&dataRsp);
  return 0;
}

642
/**
 * Remove a subscription: its in-memory handle (after waiting for any running poll to finish),
 * its cached offset and its persisted handle record. Individual failures are logged only.
 * The whole operation runs under the tq write latch. sversion and msgLen are not used.
 *
 * @param msg interpreted directly as an SMqVDeleteReq.
 * @return always 0.
 */
int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  SMqVDeleteReq* pReq = (SMqVDeleteReq*)msg;
  int32_t        vgId = TD_VID(pTq->pVnode);

  tqDebug("vgId:%d, tq process delete sub req %s", vgId, pReq->subKey);

  taosWLockLatch(&pTq->lock);

  STqHandle* pHandle = taosHashGet(pTq->pHandle, pReq->subKey, strlen(pReq->subKey));
  if (pHandle != NULL) {
    // wait until the poll currently running on this handle sets it back to idle
    while (tqIsHandleExec(pHandle)) {
      tqDebug("vgId:%d, topic:%s, subscription is executing, wait for 10ms and retry, pHandle:%p", vgId,
              pHandle->subKey, pHandle);
      taosMsleep(10);
    }

    if (pHandle->pRef != NULL) {
      walCloseRef(pTq->pVnode->pWal, pHandle->pRef->refId);
    }

    if (taosHashRemove(pTq->pHandle, pReq->subKey, strlen(pReq->subKey)) != 0) {
      tqError("cannot process tq delete req %s, since no such handle", pReq->subKey);
    }
  }

  if (tqOffsetDelete(pTq->pOffsetStore, pReq->subKey) != 0) {
    tqError("cannot process tq delete req %s, since no such offset in cache", pReq->subKey);
  }

  if (tqMetaDeleteHandle(pTq, pReq->subKey) < 0) {
    tqError("cannot process tq delete req %s, since no such offset in tdb", pReq->subKey);
  }

  taosWUnLockLatch(&pTq->lock);

  return 0;
}

681
/**
 * Decode an STqCheckInfo from msg, register it in pTq->pCheckInfo keyed by its topic, and
 * persist the raw message through tqMetaSaveCheckInfo(). sversion is not used.
 *
 * @return 0 on success; -1 with terrno = TSDB_CODE_OUT_OF_MEMORY when decoding, the hash
 *         insert, or the meta save fails.
 */
int32_t tqProcessAddCheckInfoReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  STqCheckInfo info = {0};
  SDecoder     decoder;
  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
  if (tDecodeSTqCheckInfo(&decoder, &info) < 0) {
    // release the decoder and whatever was decoded into info before bailing out
    tDecoderClear(&decoder);
    tDeleteSTqCheckInfo(&info);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  tDecoderClear(&decoder);

  if (taosHashPut(pTq->pCheckInfo, info.topic, strlen(info.topic), &info, sizeof(STqCheckInfo)) < 0) {
    // the table did not take the entry, so its free callback will never run for info
    tDeleteSTqCheckInfo(&info);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  // from here on the hash table entry references info's members; they are released by the
  // table's free callback (tDeleteSTqCheckInfo, installed in tqOpen)
  if (tqMetaSaveCheckInfo(pTq, info.topic, msg, msgLen) < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  return 0;
}

701
/**
 * Remove the check-info entry whose key is the NUL-terminated string in msg, both from
 * pTq->pCheckInfo and from the persisted meta. sversion and msgLen are not used.
 *
 * @return 0 on success; -1 with terrno = TSDB_CODE_OUT_OF_MEMORY if either removal fails.
 */
int32_t tqProcessDelCheckInfoReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  int32_t result = 0;

  if (taosHashRemove(pTq->pCheckInfo, msg, strlen(msg)) < 0) {
    result = -1;
  } else if (tqMetaDeleteCheckInfo(pTq, msg) < 0) {
    result = -1;
  }

  if (result != 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
  }
  return result;
}

713
/**
 * Handle a subscribe / re-balance request for one subscription key.
 *
 * If no handle exists for req.subKey (neither in pTq->pHandle nor loadable through
 * tqMetaGetHandle), a new handle is created and saved. Otherwise the existing handle is
 * switched to req.newConsumerId under the tq write latch, unregistered from the push
 * manager, and saved. sversion is not used.
 *
 * @return -1 when the request cannot be decoded (terrno = TSDB_CODE_INVALID_MSG); otherwise
 *         the result of tqCreateHandle()/tqMetaSaveHandle(), or 0 when the request asks to
 *         create a handle with newConsumerId == -1.
 */
int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  int         ret = 0;
  SMqRebVgReq req = {0};
  SDecoder    dc = {0};

  tDecoderInit(&dc, msg, msgLen);

  // decode req
  if (tDecodeSMqRebVgReq(&dc, &req) < 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    tDecoderClear(&dc);
    return -1;
  }

  tqDebug("vgId:%d, tq process sub req:%s, Id:0x%" PRIx64 " -> Id:0x%" PRIx64, pTq->pVnode->config.vgId, req.subKey,
          req.oldConsumerId, req.newConsumerId);

  // look the handle up; while it is missing, keep asking tqMetaGetHandle until it reports failure
  STqHandle* pHandle = NULL;
  do {
    pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
  } while (pHandle == NULL && tqMetaGetHandle(pTq, req.subKey) >= 0);

  if (pHandle != NULL) {
    taosWLockLatch(&pTq->lock);

    if (pHandle->consumerId != req.newConsumerId) {
      tqInfo("vgId:%d switch consumer from Id:0x%" PRIx64 " to Id:0x%" PRIx64, req.vgId, pHandle->consumerId,
             req.newConsumerId);
      atomic_store_64(&pHandle->consumerId, req.newConsumerId);
    } else {  // do nothing
      tqInfo("vgId:%d consumer:0x%" PRIx64 " remains, no switch occurs, should not reach here", req.vgId,
             req.newConsumerId);
    }

    // remove if it has been register in the push manager, and return one empty block to consumer
    tqUnregisterPushHandle(pTq, pHandle);
    taosWUnLockLatch(&pTq->lock);
    ret = tqMetaSaveHandle(pTq, req.subKey, pHandle);
  } else {
    if (req.oldConsumerId != -1) {
      tqError("vgId:%d, build new consumer handle %s for consumer:0x%" PRIx64 ", but old consumerId:0x%" PRIx64,
              req.vgId, req.subKey, req.newConsumerId, req.oldConsumerId);
    }

    if (req.newConsumerId == -1) {
      tqError("vgId:%d, tq invalid re-balance request, new consumerId %" PRId64 "", req.vgId, req.newConsumerId);
    } else {
      STqHandle handle = {0};
      ret = tqCreateHandle(pTq, &req, &handle);
      if (ret < 0) {
        tqDestroyTqHandle(&handle);
      } else {
        ret = tqMetaSaveHandle(pTq, req.subKey, &handle);
      }
    }
  }

  tDecoderClear(&dc);
  return ret;
}
788

dengyihao's avatar
dengyihao 已提交
789
// Free callback for containers whose elements are themselves pointers:
// `ptr` is the address of the stored pointer, and the buffer that stored
// pointer refers to is released. The slot itself is not freed here.
void freePtr(void* ptr) {
  void** ppBuf = (void**)ptr;
  taosMemoryFree(*ppBuf);
}
L
liuyao 已提交
790

791
// Fill in the runtime-only members of a stream task so it can run on this vnode:
// id string, input/output queues, state backend and executor (source and agg
// levels), sink callbacks, wal reader (source level) and the schedule trigger.
//
// pTq   - the tq instance that will own the task
// pTask - the task to expand; its static description must already be decoded
// ver   - version used to initialize the checkpoint info and the data range
//
// Returns 0 on success and -1 as soon as any step fails. Members created
// before the failing step are not released here.
// NOTE(review): presumably the caller destroys the task on failure - confirm.
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
  int32_t vgId = TD_VID(pTq->pVnode);

  pTask->id.idStr = createStreamTaskIdStr(pTask->id.streamId, pTask->id.taskId);
  pTask->refCnt = 1;
  pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE;
  pTask->inputQueue = streamQueueOpen(512 << 10);
  pTask->outputQueue = streamQueueOpen(512 << 10);

  if (pTask->inputQueue == NULL || pTask->outputQueue == NULL) {
    return -1;
  }

  pTask->inputStatus = TASK_INPUT_STATUS__NORMAL;
  pTask->outputStatus = TASK_OUTPUT_STATUS__NORMAL;
  pTask->pMsgCb = &pTq->pVnode->msgCb;
  pTask->pMeta = pTq->pStreamMeta;

  pTask->chkInfo.version = ver;
  pTask->chkInfo.currentVer = ver;

  pTask->dataRange.range.maxVer = ver;
  pTask->dataRange.range.minVer = ver;

  bool isSource = (pTask->info.taskLevel == TASK_LEVEL__SOURCE);
  bool isAgg = (pTask->info.taskLevel == TASK_LEVEL__AGG);

  // Source and agg tasks share the same state/executor setup; only the read
  // handle differs between the two levels.
  if (isSource || isAgg) {
    // A fill-history task opens the state under the id of its related stream
    // task, so a stand-in task carrying that id is handed to streamStateOpen.
    // The stand-in is kept in this scope, alive until the executor is created.
    // NOTE(review): streamStateOpen may keep the task pointer - confirm before
    // moving the stand-in into a helper function.
    SStreamTask* pSateTask = pTask;
    SStreamTask  task = {0};
    if (pTask->info.fillHistory) {
      task.id = pTask->streamTaskId;
      task.pMeta = pTask->pMeta;
      pSateTask = &task;
    }

    pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pSateTask, false, -1, -1);
    if (pTask->pState == NULL) {
      return -1;
    }

    // members not named here are zero-initialized
    SReadHandle handle = {.pStateBackend = pTask->pState,
                          .fillHistory = pTask->info.fillHistory,
                          .winRange = pTask->dataRange.window};
    if (isSource) {
      handle.vnode = pTq->pVnode;
      handle.initTqReader = 1;
    } else {
      handle.numOfVgroups = (int32_t)taosArrayGetSize(pTask->pUpstreamEpInfoList);
    }
    initStorageAPI(&handle.api);

    pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, vgId);
    if (pTask->exec.pExecutor == NULL) {
      return -1;
    }

    qSetTaskId(pTask->exec.pExecutor, pTask->id.taskId, pTask->id.streamId);
  }

  // sink
  if (pTask->outputType == TASK_OUTPUT__SMA) {
    pTask->smaSink.vnode = pTq->pVnode;
    pTask->smaSink.smaSink = smaHandleRes;
  } else if (pTask->outputType == TASK_OUTPUT__TABLE) {
    pTask->tbSink.vnode = pTq->pVnode;
    pTask->tbSink.tbSinkFunc = tqSinkToTablePipeline;

    // use the schema version recorded in meta when the lookup succeeds, otherwise 1
    int32_t   ver1 = 1;
    SMetaInfo info = {0};
    int32_t   code = metaGetInfo(pTq->pVnode->pMeta, pTask->tbSink.stbUid, &info, NULL);
    if (code == TSDB_CODE_SUCCESS) {
      ver1 = info.skmVer;
    }

    SSchemaWrapper* pschemaWrapper = pTask->tbSink.pSchemaWrapper;
    pTask->tbSink.pTSchema = tBuildTSchema(pschemaWrapper->pSchema, pschemaWrapper->nCols, ver1);
    if (pTask->tbSink.pTSchema == NULL) {
      return -1;
    }

    pTask->tbSink.pTblInfo = tSimpleHashInit(10240, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
    if (pTask->tbSink.pTblInfo == NULL) {
      // do not hand a NULL hash table to tSimpleHashSetFreeFp
      return -1;
    }
    tSimpleHashSetFreeFp(pTask->tbSink.pTblInfo, freePtr);
  }

  if (isSource) {
    SWalFilterCond cond = {.deleteMsg = 1};  // delete msg also extract from wal files
    pTask->exec.pWalReader = walOpenReader(pTq->pVnode->pWal, &cond);
    if (pTask->exec.pWalReader == NULL) {
      // a source task without a wal reader cannot extract any data
      return -1;
    }
  }

  streamSetupScheduleTrigger(pTask);

  tqInfo("vgId:%d expand stream task, s-task:%s, checkpoint ver:%" PRId64
         " child id:%d, level:%d, scan-history:%d, trigger:%" PRId64 " ms",
         vgId, pTask->id.idStr, pTask->chkInfo.version, pTask->info.selfChildId, pTask->info.taskLevel,
         pTask->info.fillHistory, pTask->triggerParam);

  // next valid version will add one
  pTask->chkInfo.version += 1;
  return 0;
}
Liu Jicong 已提交
911

912
// Handle a task-check request sent by an upstream task: look up the addressed
// downstream task, report its check status (0 when the task is not found) and
// send the encoded SStreamTaskCheckRsp back to the requester.
//
// pTq  - the tq instance holding the stream meta
// pMsg - rpc message; the payload follows an SMsgHead
//
// Returns 0 once the response has been handed to tmsgSendRsp, -1 when the
// request cannot be decoded or the response cannot be encoded/allocated (no
// response is sent in that case).
int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) {
  char*   msgStr = pMsg->pCont;
  char*   msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
  int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);

  SStreamTaskCheckReq req = {0};
  SDecoder            decoder;

  tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
  int32_t code = tDecodeStreamTaskCheckReq(&decoder, &req);
  tDecoderClear(&decoder);
  if (code < 0) {
    // never act on a request that failed to decode
    tqError("vgId:%d failed to decode task check req", pTq->pStreamMeta->vgId);
    return -1;
  }

  int32_t taskId = req.downstreamTaskId;

  SStreamTaskCheckRsp rsp = {
      .reqId = req.reqId,
      .streamId = req.streamId,
      .childId = req.childId,
      .downstreamNodeId = req.downstreamNodeId,
      .downstreamTaskId = req.downstreamTaskId,
      .upstreamNodeId = req.upstreamNodeId,
      .upstreamTaskId = req.upstreamTaskId,
  };

  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);

  if (pTask != NULL) {
    rsp.status = streamTaskCheckStatus(pTask);

    tqDebug("s-task:%s recv task check req(reqId:0x%" PRIx64 ") task:0x%x (vgId:%d), status:%s, rsp status %d",
            pTask->id.idStr, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId,
            streamGetTaskStatusStr(pTask->status.taskStatus), rsp.status);

    // release only after the last access to pTask
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
  } else {
    rsp.status = 0;
    tqDebug("tq recv task check(taskId:0x%x not built yet) req(reqId:0x%" PRIx64 ") from task:0x%x (vgId:%d), rsp status %d",
            taskId, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);
  }

  SEncoder encoder;
  int32_t  len = 0;

  tEncodeSize(tEncodeStreamTaskCheckRsp, &rsp, len, code);
  if (code < 0) {
    tqError("vgId:%d failed to encode task check rsp, task:0x%x", pTq->pStreamMeta->vgId, taskId);
    return -1;
  }

  void* buf = rpcMallocCont(sizeof(SMsgHead) + len);
  if (buf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    tqError("vgId:%d failed to alloc task check rsp, task:0x%x", pTq->pStreamMeta->vgId, taskId);
    return -1;
  }
  ((SMsgHead*)buf)->vgId = htonl(req.upstreamNodeId);

  void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
  tEncoderInit(&encoder, (uint8_t*)abuf, len);
  tEncodeStreamTaskCheckRsp(&encoder, &rsp);
  tEncoderClear(&encoder);

  SRpcMsg rspMsg = {.code = 0, .pCont = buf, .contLen = sizeof(SMsgHead) + len, .info = pMsg->info};

  tmsgSendRsp(&rspMsg);
  return 0;
}

975 976 977 978
// Handle the response to a task-check request: decode it, find the upstream
// task it is addressed to and pass the response on to streamProcessCheckRsp.
//
// pTq      - the tq instance holding the stream meta
// sversion - not used by this function
// pMsg     - rpc message; the payload follows an SMsgHead
//
// Returns -1 when decoding fails or the task cannot be found (terrno is set to
// TSDB_CODE_STREAM_TASK_NOT_EXIST in the latter case), otherwise the result of
// streamProcessCheckRsp.
int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t sversion, SRpcMsg* pMsg) {
  SStreamTaskCheckRsp rsp;
  SDecoder            decoder;

  char*   pBody = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t bodyLen = pMsg->contLen - sizeof(SMsgHead);

  tDecoderInit(&decoder, (uint8_t*)pBody, bodyLen);
  int32_t code = tDecodeStreamTaskCheckRsp(&decoder, &rsp);
  tDecoderClear(&decoder);  // the decoder is finished with on either outcome
  if (code < 0) {
    return -1;
  }

  tqDebug("tq task:0x%x (vgId:%d) recv check rsp(reqId:0x%" PRIx64 ") from 0x%x (vgId:%d) status %d",
          rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.status);

  SStreamMeta* pMeta = pTq->pStreamMeta;
  SStreamTask* pTask = streamMetaAcquireTask(pMeta, rsp.upstreamTaskId);
  if (pTask != NULL) {
    code = streamProcessCheckRsp(pTask, &rsp);
    streamMetaReleaseTask(pMeta, pTask);
    return code;
  }

  tqError("tq failed to locate the stream task:0x%x (vgId:%d), it may have been destroyed", rsp.upstreamTaskId,
          pTq->pStreamMeta->vgId);
  terrno = TSDB_CODE_STREAM_TASK_NOT_EXIST;
  return -1;
}

1008
// Deploy a stream task on this vnode: decode the task from `msg`, register it
// in the stream meta and then hand it to streamPrepareNdoCheckDownstream.
//
// pTq      - the tq instance holding the stream meta
// sversion - version passed to streamMetaAddDeployedTask for the new task
// msg      - encoded SStreamTask
// msgLen   - length of `msg` in bytes
//
// Returns 0 on success, or immediately when streams are disabled through
// tsDisableStream; -1 on allocation, decode or registration failure.
int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  int32_t vgId = TD_VID(pTq->pVnode);

  if (tsDisableStream) {
    return 0;
  }

  // step 1: allocate a zeroed task and decode the message into it
  SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
  if (pTask == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    tqError("vgId:%d failed to create stream task due to out of memory, alloc size:%d", vgId,
            (int32_t)sizeof(SStreamTask));
    return -1;
  }

  SDecoder decoder;
  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
  int32_t code = tDecodeStreamTask(&decoder, pTask);
  tDecoderClear(&decoder);
  if (code < 0) {
    taosMemoryFree(pTask);
    return -1;
  }

  // step 2: register the task in the meta while holding the meta write lock
  SStreamMeta* pMeta = pTq->pStreamMeta;

  taosWLockLatch(&pMeta->lock);
  code = streamMetaAddDeployedTask(pMeta, sversion, pTask);
  int32_t taskCount = streamMetaGetNumOfTasks(pMeta);

  if (code >= 0) {
    taosWUnLockLatch(&pMeta->lock);
  } else {
    tqError("vgId:%d failed to add s-task:%s, total:%d", vgId, pTask->id.idStr, taskCount);
    taosWUnLockLatch(&pMeta->lock);
    return -1;
  }

  // step 3: pass the registered task to streamPrepareNdoCheckDownstream
  streamPrepareNdoCheckDownstream(pTask);

  tqDebug("vgId:%d s-task:%s is deployed and add into meta, status:%s, numOfTasks:%d", vgId, pTask->id.idStr,
          streamGetTaskStatusStr(pTask->status.taskStatus), taskCount);

  return 0;
}

1060
// Run the history-data scan for the task named in the request.
//
// For a fill-history task the flow is: scan step 1, halt the related stream
// task, compute the version range for step 2, scan step 2, dispatch the
// transfer-state message, mark this task as dropping and persist both tasks.
// For any other task the scan is completed through
// streamTaskScanHistoryDataComplete and, for source tasks, the stream tasks are
// started afterwards.
//
// pTq  - the tq instance holding the stream meta
// pMsg - rpc message whose pCont is read as an SStreamScanHistoryReq
//
// Returns -1 when the task (or, for a fill-history task, its related stream
// task) cannot be acquired or the checkpoint version is not positive; 0 when
// the scan is aborted or the fill-history path finishes; otherwise the result
// of streamTaskScanHistoryDataComplete.
//
// Every acquired task reference is released on every return path, and no task
// member is read after that task has been released.
int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
  int32_t code = TSDB_CODE_SUCCESS;
  char*   msg = pMsg->pCont;

  SStreamMeta*           pMeta = pTq->pStreamMeta;
  SStreamScanHistoryReq* pReq = (SStreamScanHistoryReq*)msg;

  SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->taskId);
  if (pTask == NULL) {
    tqError("vgId:%d failed to acquire stream task:0x%x during stream recover, task may have been destroyed",
            pMeta->vgId, pReq->taskId);
    return -1;
  }

  // check param
  int64_t fillVer1 = pTask->chkInfo.version;
  if (fillVer1 <= 0) {
    streamMetaReleaseTask(pMeta, pTask);
    return -1;
  }

  // do recovery step 1
  const char* pId = pTask->id.idStr;
  tqDebug("s-task:%s start history data scan stage(step 1), status:%s", pId,
          streamGetTaskStatusStr(pTask->status.taskStatus));

  int64_t st = taosGetTimestampMs();
  int8_t  schedStatus = atomic_val_compare_exchange_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE,
                                                      TASK_SCHED_STATUS__WAITING);
  if (schedStatus != TASK_SCHED_STATUS__INACTIVE) {
    ASSERT(0);
    // do not leak the task reference on this path
    streamMetaReleaseTask(pMeta, pTask);
    return 0;
  }

  if (!pReq->igUntreated && !streamTaskRecoverScanStep1Finished(pTask)) {
    streamSourceScanHistoryData(pTask);
  }

  if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING || streamTaskShouldPause(&pTask->status)) {
    tqDebug("s-task:%s is dropped or paused, abort recover in step1", pId);
    atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
    streamMetaReleaseTask(pMeta, pTask);
    return 0;
  }

  double el = (taosGetTimestampMs() - st) / 1000.0;
  tqDebug("s-task:%s history data scan stage(step 1) ended, elapsed time:%.2fs", pId, el);

  if (pTask->info.fillHistory) {
    // pRange always refers to this task's own version range. It is set up front
    // so that it is valid even when the halt block below is skipped.
    SVersionRange* pRange = &pTask->dataRange.range;
    SStreamTask*   pStreamTask = NULL;  // stays NULL when the halt block below is skipped

    if (!pReq->igUntreated && !streamTaskRecoverScanStep1Finished(pTask)) {
      // 1. stop the related stream task, get the current scan wal version of stream task, ver.
      pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.taskId);
      if (pStreamTask == NULL) {
        tqError("s-task:%s failed to acquire the related stream task:0x%x, abort scan-history", pId,
                pTask->streamTaskId.taskId);
        // same cleanup as the step 1 abort path above
        atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
        streamMetaReleaseTask(pMeta, pTask);
        return -1;
      }

      ASSERT(pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE);

      // wait for the stream task get ready for scan history data
      while (((pStreamTask->status.downstreamReady == 0) && (pStreamTask->status.taskStatus != TASK_STATUS__STOP)) ||
             pStreamTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) {
        tqDebug("s-task:%s level:%d related stream task:%s not ready for halt, wait for it continue and recheck in 100ms",
                pTask->id.idStr, pTask->info.taskLevel, pStreamTask->id.idStr);
        taosMsleep(100);
      }

      // now we can stop the stream task execution
      pStreamTask->status.taskStatus = TASK_STATUS__HALT;
      tqDebug("s-task:%s level:%d status is set to halt by history scan task:%s", pStreamTask->id.idStr,
              pStreamTask->info.taskLevel, pId);

      // if it's an source task, extract the last version in wal.
      int64_t ver = pTask->dataRange.range.maxVer + 1;
      int64_t latestVer = walReaderGetCurrentVer(pStreamTask->exec.pWalReader);
      if (latestVer >= ver) {
        ver = latestVer;
      }

      // 2. do secondary scan of the history data, the time window remain, and the version range is updated to
      // [pTask->dataRange.range.maxVer, ver1]
      pRange->minVer = pRange->maxVer + 1;
      pRange->maxVer = ver;
      if (pRange->minVer == pRange->maxVer) {
        streamTaskRecoverSetAllStepFinished(pTask);
        tqDebug("s-task:%s no need to perform secondary scan-history-data(step 2), since no new data ingest", pId);
      }
    }

    if (!streamTaskRecoverScanStep1Finished(pTask)) {
      tqDebug("s-task:%s level:%d verRange:%" PRId64 " - %" PRId64
              " do secondary scan-history-data after halt the related stream task:%s",
              pId, pTask->info.taskLevel, pRange->minVer, pRange->maxVer,
              (pStreamTask != NULL) ? pStreamTask->id.idStr : "(not acquired)");

      ASSERT(pTask->status.schedStatus == TASK_SCHED_STATUS__WAITING);

      st = taosGetTimestampMs();
      streamSetParamForStreamScannerStep2(pTask, pRange, &pTask->dataRange.window);
    }

    if (!streamTaskRecoverScanStep2Finished(pTask)) {
      streamSourceScanHistoryData(pTask);
      if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING || streamTaskShouldPause(&pTask->status)) {
        tqDebug("s-task:%s is dropped or paused, abort recover in step2", pId);
        streamMetaReleaseTask(pMeta, pTask);
        if (pStreamTask != NULL) {
          streamMetaReleaseTask(pMeta, pStreamTask);
        }
        return 0;
      }

      streamTaskRecoverSetAllStepFinished(pTask);
    }

    el = (taosGetTimestampMs() - st) / 1000.0;
    tqDebug("s-task:%s history data scan stage(step 2) ended, elapsed time:%.2fs", pId, el);

    // 3. notify the downstream tasks to transfer executor state after handle all history blocks.
    if (!pTask->status.transferState) {
      code = streamDispatchTransferStateMsg(pTask);
      if (code != TSDB_CODE_SUCCESS) {
        // todo handle error
      }

      pTask->status.transferState = true;
    }

    // 4. 1) transfer the ownership of executor state, 2) update the scan data range for source task.
    // 5. resume the related stream task.
    streamTryExec(pTask);

    pTask->status.taskStatus = TASK_STATUS__DROPPING;
    tqDebug("s-task:%s scan-history-task set status to be dropping", pId);

    streamMetaSaveTask(pMeta, pTask);
    if (pStreamTask != NULL) {
      streamMetaSaveTask(pMeta, pStreamTask);
    }

    // read everything still needed from pTask before giving up the reference
    SStreamMeta* pTaskMeta = pTask->pMeta;

    streamMetaReleaseTask(pMeta, pTask);
    if (pStreamTask != NULL) {
      streamMetaReleaseTask(pMeta, pStreamTask);
    }

    taosWLockLatch(&pMeta->lock);
    if (streamMetaCommit(pTaskMeta) < 0) {
      // persist to disk
      tqError("vgId:%d failed to commit stream meta after scan-history", pMeta->vgId);
    }
    taosWUnLockLatch(&pMeta->lock);
  } else {
    // todo update the chkInfo version for current task.
    // this task has an associated history stream task, so we need to scan wal from the end version of
    // history scan. The current version of chkInfo.current is not updated during the history scan
    STimeWindow* pWindow = &pTask->dataRange.window;

    if (pTask->historyTaskId.taskId == 0) {
      *pWindow = (STimeWindow){INT64_MIN, INT64_MAX};
      tqDebug("s-task:%s no associated task, reset the time window:%" PRId64 " - %" PRId64, pId, pWindow->skey,
              pWindow->ekey);
    } else {
      tqDebug("s-task:%s history data scan completed, now start to scan data from wal, start ver:%" PRId64
              ", window:%" PRId64 " - %" PRId64, pId, pTask->chkInfo.currentVer, pWindow->skey, pWindow->ekey);
    }

    code = streamTaskScanHistoryDataComplete(pTask);

    // remember the level before the reference is released
    int32_t taskLevel = pTask->info.taskLevel;
    streamMetaReleaseTask(pMeta, pTask);

    // let's start the stream task by extracting data from wal
    if (taskLevel == TASK_LEVEL__SOURCE) {
      tqStartStreamTasks(pTq);
    }

    return code;
  }

  return 0;
}

H
Haojun Liao 已提交
1235 1236
// notify the downstream tasks to transfer executor state after handle all history blocks.
//
// Decodes the transfer request, releases the executor state of the addressed
// task, reloads it on the related stream task, marks the transfer as done and
// schedules the addressed task for execution.
//
// pTq      - the tq instance holding the stream meta
// sversion - only used by the disabled (#if 0) recovery code below
// msg      - encoded request
// msgLen   - length of `msg` in bytes
//
// Returns 0 on success, -1 when the request cannot be decoded or either task
// cannot be found. Both task references are released before returning.
int32_t tqProcessTaskTransferStateReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  SStreamTransferReq req;

  SDecoder decoder;
  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
  int32_t code = tDecodeStreamRecoverFinishReq(&decoder, &req);
  tDecoderClear(&decoder);
  if (code < 0) {
    // req is not initialized, never use it after a failed decode
    tqError("vgId:%d failed to decode transfer state req", pTq->pStreamMeta->vgId);
    return -1;
  }

  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.taskId);
  if (pTask == NULL) {
    tqError("failed to find task:0x%x, it may have been dropped already", req.taskId);
    return -1;
  }

  // transfer the ownership of executor state
  streamTaskReleaseState(pTask);
  tqDebug("s-task:%s receive state transfer req", pTask->id.idStr);

  // validate the related task id before it is used for the lookup
  ASSERT(pTask->streamTaskId.taskId != 0);

  SStreamTask* pStreamTask = streamMetaAcquireTask(pTq->pStreamMeta, pTask->streamTaskId.taskId);
  if (pStreamTask == NULL) {
    tqError("s-task:%s failed to find related stream task:0x%x, it may have been dropped already", pTask->id.idStr,
            pTask->streamTaskId.taskId);
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
    return -1;
  }

  streamTaskReloadState(pStreamTask);
  // the related task is not used below, give its reference back right away
  streamMetaReleaseTask(pTq->pStreamMeta, pStreamTask);

  pTask->status.transferState = true;  // persistent data?

#if 0
  // do check if current task handle all data in the input queue
  int64_t st = taosGetTimestampMs();
  tqDebug("s-task:%s start step2 recover, ts:%" PRId64, pTask->id.idStr, st);

  code = streamSourceRecoverScanStep2(pTask, sversion);
  if (code < 0) {
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
    return -1;
  }

  qDebug("s-task:%s set start wal scan start ver:%"PRId64, pTask->id.idStr, sversion);

  walReaderSeekVer(pTask->exec.pWalReader, sversion);
  pTask->chkInfo.currentVer = sversion;

  if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING) {
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
    return 0;
  }

  // restore param
  code = streamRestoreParam(pTask);
  if (code < 0) {
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
    return -1;
  }

  // set status normal
  tqDebug("s-task:%s blocking stage completed, set the status to be normal", pTask->id.idStr);
  code = streamSetStatusNormal(pTask);
  if (code < 0) {
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
    return -1;
  }

  double el = (taosGetTimestampMs() - st) / 1000.0;
  tqDebug("s-task:%s step2 recover finished, el:%.2fs", pTask->id.idStr, el);

  // dispatch recover finish req to all related downstream task
  code = streamDispatchScanHistoryFinishMsg(pTask);
  if (code < 0) {
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
    return -1;
  }

  atomic_store_8(&pTask->info.fillHistory, 0);
  streamMetaSaveTask(pTq->pStreamMeta, pTask);
#endif

  streamSchedExec(pTask);
  streamMetaReleaseTask(pTq->pStreamMeta, pTask);

  return 0;
}

L
Liu Jicong 已提交
1316 1317 1318
// Handle a recover-finish request: decode it, look up the addressed task and
// pass the request on to streamProcessRecoverFinishReq.
//
// pTq  - the tq instance holding the stream meta
// pMsg - rpc message; the payload follows an SMsgHead
//
// Returns 0 on success, -1 when the task is not found or the request is
// rejected by streamProcessRecoverFinishReq.
int32_t tqProcessTaskRecoverFinishReq(STQ* pTq, SRpcMsg* pMsg) {
  SStreamRecoverFinishReq req;
  SDecoder                decoder;

  // the request body starts right after the message head
  char*   pBody = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t bodyLen = pMsg->contLen - sizeof(SMsgHead);

  tDecoderInit(&decoder, (uint8_t*)pBody, bodyLen);
  tDecodeStreamRecoverFinishReq(&decoder, &req);
  tDecoderClear(&decoder);

  SStreamMeta* pMeta = pTq->pStreamMeta;
  SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.taskId);
  if (pTask == NULL) {
    return -1;
  }

  // the task reference is given back whatever the outcome
  bool failed = (streamProcessRecoverFinishReq(pTask, req.taskId, req.childId) < 0);
  streamMetaReleaseTask(pMeta, pTask);

  return failed ? -1 : 0;
}
L
Liu Jicong 已提交
1342

L
Liu Jicong 已提交
1343 1344 1345 1346 1347
// Handler for the recover-finish response. Nothing is done with the response:
// both arguments are ignored and 0 is always returned.
int32_t tqProcessTaskRecoverFinishRsp(STQ* pTq, SRpcMsg* pMsg) {
  (void)pTq;
  (void)pMsg;
  return 0;
}

1348 1349 1350 1351
// Decode a delete result and turn it into a STREAM_DELETE_DATA block wrapped
// in a queue item.
//
// pData     - encoded SDeleteRes
// len       - length of pData in bytes
// ver       - version stored in the info of the produced block
// pRefBlock - output; set to NULL first, and to the new queue item only when a
//             block is produced
//
// Returns TSDB_CODE_SUCCESS when a block is produced, and also when the delete
// touched no table or no row (then *pRefBlock stays NULL);
// TSDB_CODE_OUT_OF_MEMORY when the uid list or the queue item cannot be
// allocated.
//
// The block holds one row per table uid: the start/end key and the uid are
// filled in, the group id and the calculate start/end columns are set to NULL.
int32_t extractDelDataBlock(const void* pData, int32_t len, int64_t ver, SStreamRefDataBlock** pRefBlock) {
  SDecoder*   pCoder = &(SDecoder){0};
  SDeleteRes* pRes = &(SDeleteRes){0};

  *pRefBlock = NULL;

  pRes->uidList = taosArrayInit(0, sizeof(tb_uid_t));
  if (pRes->uidList == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  tDecoderInit(pCoder, (uint8_t*)pData, len);
  tDecodeDeleteRes(pCoder, pRes);
  tDecoderClear(pCoder);

  int32_t numOfTables = (int32_t)taosArrayGetSize(pRes->uidList);
  if (numOfTables == 0 || pRes->affectedRows == 0) {
    taosArrayDestroy(pRes->uidList);
    return TSDB_CODE_SUCCESS;
  }

  SSDataBlock* pDelBlock = createSpecialDataBlock(STREAM_DELETE_DATA);
  blockDataEnsureCapacity(pDelBlock, numOfTables);
  pDelBlock->info.rows = numOfTables;
  pDelBlock->info.version = ver;

  // the column set does not change while rows are written, look it up once
  SColumnInfoData* pStartCol = taosArrayGet(pDelBlock->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pEndCol = taosArrayGet(pDelBlock->pDataBlock, END_TS_COLUMN_INDEX);
  SColumnInfoData* pUidCol = taosArrayGet(pDelBlock->pDataBlock, UID_COLUMN_INDEX);
  SColumnInfoData* pGroupIdCol = taosArrayGet(pDelBlock->pDataBlock, GROUPID_COLUMN_INDEX);
  SColumnInfoData* pCalcStartCol = taosArrayGet(pDelBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
  SColumnInfoData* pCalcEndCol = taosArrayGet(pDelBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);

  for (int32_t i = 0; i < numOfTables; i++) {
    // start key column
    colDataSetVal(pStartCol, i, (const char*)&pRes->skey, false);
    // end key column
    colDataSetVal(pEndCol, i, (const char*)&pRes->ekey, false);
    // uid column
    int64_t* pUid = taosArrayGet(pRes->uidList, i);
    colDataSetVal(pUidCol, i, (const char*)pUid, false);

    colDataSetNULL(pGroupIdCol, i);
    colDataSetNULL(pCalcStartCol, i);
    colDataSetNULL(pCalcEndCol, i);
  }

  taosArrayDestroy(pRes->uidList);

  *pRefBlock = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0);
  if (*pRefBlock == NULL) {
    // test the allocation result itself, not the address of the output slot
    // NOTE(review): pDelBlock is not released on this path; destroy it with the
    // block destructor that matches createSpecialDataBlock.
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  (*pRefBlock)->type = STREAM_INPUT__REF_DATA_BLOCK;
  (*pRefBlock)->pBlock = pDelBlock;
  return TSDB_CODE_SUCCESS;
}

L
Liu Jicong 已提交
1401 1402
// Handle a run request. Two reserved task ids are dispatched to global
// actions: STREAM_TASK_STATUS_CHECK_ID runs tqStreamTasksStatusCheck and
// EXTRACT_DATA_FROM_WAL_ID runs tqStreamTasksScanWal. Any other id is looked
// up as a task whose input queue is processed when the task is in normal or
// halt status.
//
// pTq  - the tq instance holding the stream meta
// pMsg - rpc message whose pCont is read as an SStreamTaskRunReq
//
// Returns 0 when the request was handled, -1 when the task is not found.
int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
  SStreamTaskRunReq* pReq = pMsg->pCont;

  int32_t taskId = pReq->taskId;
  int32_t vgId = TD_VID(pTq->pVnode);

  if (taskId == STREAM_TASK_STATUS_CHECK_ID) {
    tqStreamTasksStatusCheck(pTq);
    return 0;
  }

  if (taskId == EXTRACT_DATA_FROM_WAL_ID) {  // all tasks are extracted submit data from the wal
    tqStreamTasksScanWal(pTq);
    return 0;
  }

  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);
  if (pTask == NULL) {
    tqError("vgId:%d failed to found s-task, taskId:%d", vgId, taskId);
    return -1;
  }

  // even in halt status, the data in inputQ must be processed
  int8_t status = pTask->status.taskStatus;
  bool   canRun = (status == TASK_STATUS__NORMAL) || (status == TASK_STATUS__HALT);

  if (!canRun) {
    // the request is dropped, so the task goes back to inactive unconditionally
    atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);

    tqDebug("vgId:%d s-task:%s ignore run req since not in ready state, status:%s, sched-status:%d", vgId,
            pTask->id.idStr, streamGetTaskStatusStr(pTask->status.taskStatus), pTask->status.schedStatus);
  } else {
    tqDebug("vgId:%d s-task:%s start to process block from inputQ, last chk point:%" PRId64, vgId, pTask->id.idStr,
            pTask->chkInfo.version);
    streamProcessRunReq(pTask);
  }

  streamMetaReleaseTask(pTq->pStreamMeta, pTask);
  tqStartStreamTasks(pTq);
  return 0;
}

L
Liu Jicong 已提交
1443
// Decode a stream dispatch request carried by pMsg and hand it to the destination task.
//
// pTq   - tq instance whose stream meta is searched for the destination task
// pMsg  - rpc message; the payload starts with an SMsgHead followed by the encoded request
// exec  - forwarded unchanged to streamProcessDispatchMsg()
//
// Returns 0 when the request was passed to the task, -1 when the payload cannot be decoded
// or the destination task cannot be acquired. On both failure paths the decoded request is
// released with tDeleteStreamDispatchReq().
int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec) {
  char*   msgStr = pMsg->pCont;
  char*   msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
  int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);

  // zero-initialized so that tDeleteStreamDispatchReq() is safe after a partial decode
  SStreamDispatchReq req = {0};

  SDecoder decoder;
  tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
  int32_t decodeRet = tDecodeStreamDispatchReq(&decoder, &req);
  // the decoder is not needed once the request has been materialized; release it on every path
  tDecoderClear(&decoder);

  if (decodeRet < 0) {
    tqError("vgId:%d failed to decode stream dispatch req", TD_VID(pTq->pVnode));
    tDeleteStreamDispatchReq(&req);
    return -1;
  }

  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.taskId);
  if (pTask == NULL) {
    tDeleteStreamDispatchReq(&req);
    return -1;
  }

  SRpcMsg rsp = {.info = pMsg->info, .code = 0};
  streamProcessDispatchMsg(pTask, &req, &rsp, exec);
  streamMetaReleaseTask(pTq->pStreamMeta, pTask);
  return 0;
}

L
Liu Jicong 已提交
1466 1467
// Deliver a dispatch response to the upstream task that issued the dispatch.
//
// The response body follows the SMsgHead in pMsg->pCont; the upstream task id is stored in
// network byte order. Returns 0 when the task was found and the response forwarded to
// streamProcessDispatchRsp(), TSDB_CODE_INVALID_MSG when the task cannot be acquired.
int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) {
  SStreamDispatchRsp* pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t             upstreamTaskId = ntohl(pRsp->upstreamTaskId);

  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, upstreamTaskId);
  if (pTask == NULL) {
    tqDebug("vgId:%d failed to handle the dispatch rsp, since find task:0x%x failed", pTq->pStreamMeta->vgId,
            upstreamTaskId);
    return TSDB_CODE_INVALID_MSG;
  }

  streamProcessDispatchRsp(pTask, pRsp, pMsg->code);
  streamMetaReleaseTask(pTq->pStreamMeta, pTask);
  return 0;
}
L
Liu Jicong 已提交
1481

1482
// Remove the stream task named in a drop request from this vnode's stream meta.
//
// msg is interpreted as an SVDropStreamTaskReq; sversion and msgLen are not used here.
// Always returns 0.
int32_t tqProcessTaskDropReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  const SVDropStreamTaskReq* pDropReq = (const SVDropStreamTaskReq*)msg;

  tqDebug("vgId:%d receive msg to drop stream task:0x%x", TD_VID(pTq->pVnode), pDropReq->taskId);
  streamMetaRemoveTask(pTq->pStreamMeta, pDropReq->taskId);

  return 0;
}
L
Liu Jicong 已提交
1489

L
liuyao 已提交
1490 1491 1492 1493 1494 1495 1496 1497 1498 1499 1500 1501 1502 1503
// Put an already-acquired stream task into the PAUSE status and release the reference.
//
// If the task is not already flagged for pause, its current status is saved into
// keepTaskStatus before the status is switched to TASK_STATUS__PAUSE.
// The reference held on pTask is released through streamMetaReleaseTask() before returning,
// so the caller must not touch pTask afterwards.
//
// Returns 0 on success, -1 when pTask is NULL.
int32_t tqProcessTaskPauseImpl(SStreamMeta* pStreamMeta, SStreamTask* pTask) {
  if (pTask == NULL) {
    return -1;
  }

  if (!streamTaskShouldPause(&pTask->status)) {
    tqDebug("vgId:%d s-task:%s set pause flag", pStreamMeta->vgId, pTask->id.idStr);
    // remember the status to go back to, then flip to paused
    atomic_store_8(&pTask->status.keepTaskStatus, pTask->status.taskStatus);
    atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__PAUSE);
  }

  streamMetaReleaseTask(pStreamMeta, pTask);
  return 0;
}

5
54liuyao 已提交
1504 1505
// Pause the stream task named in the request and, if one exists, its related history task.
//
// msg is interpreted as an SVPauseStreamTaskReq; sversion and msgLen are not used here.
// Returns -1 when the requested task cannot be acquired, otherwise the result of the last
// tqProcessTaskPauseImpl() call.
int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  SVPauseStreamTaskReq* pReq = (SVPauseStreamTaskReq*)msg;

  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->taskId);
  if (pTask == NULL) {
    return -1;
  }

  // tqProcessTaskPauseImpl() releases the reference on pTask, so everything that is needed
  // from the task has to be copied out while the reference is still held.
  int32_t historyTaskId = pTask->historyTaskId.taskId;

  int32_t code = tqProcessTaskPauseImpl(pTq->pStreamMeta, pTask);
  if (code != 0) {
    return code;
  }

  SStreamTask* pHistoryTask = streamMetaAcquireTask(pTq->pStreamMeta, historyTaskId);
  if (pHistoryTask) {
    code = tqProcessTaskPauseImpl(pTq->pStreamMeta, pHistoryTask);
  }
  return code;
}

// Resume an already-acquired stream task that is currently flagged for pause, then release
// the reference held on it.
//
// pTq         - tq instance owning the stream meta
// pTask       - acquired task, may be NULL (then -1 is returned)
// sversion    - vnode version; used as the wal skip-to version when untreated data is ignored
// igUntreated - non-zero to discard the data that arrived while the task was paused
//
// The reference on pTask is released before returning, so the caller must not touch pTask
// afterwards. Returns 0 on success, -1 when pTask is NULL.
int32_t tqProcessTaskResumeImpl(STQ* pTq, SStreamTask* pTask, int64_t sversion, int8_t igUntreated) {
  int32_t vgId = pTq->pStreamMeta->vgId;

  if (pTask == NULL) {
    return -1;
  }

  if (streamTaskShouldPause(&pTask->status)) {
    // restore the status that was saved when the task got paused
    atomic_store_8(&pTask->status.taskStatus, pTask->status.keepTaskStatus);

    // no lock needs to secure the access of the version
    if (igUntreated && pTask->info.taskLevel == TASK_LEVEL__SOURCE && !pTask->info.fillHistory) {
      // discard all the data  when the stream task is suspended.
      walReaderSetSkipToVersion(pTask->exec.pWalReader, sversion);
      tqDebug("vgId:%d s-task:%s resume to exec, prev paused version:%" PRId64 ", start from vnode ver:%" PRId64
              ", schedStatus:%d",
              vgId, pTask->id.idStr, pTask->chkInfo.currentVer, sversion, pTask->status.schedStatus);
    } else {
      // from the previous paused version and go on
      tqDebug("vgId:%d s-task:%s resume to exec, from paused ver:%" PRId64 ", vnode ver:%" PRId64 ", schedStatus:%d",
              vgId, pTask->id.idStr, pTask->chkInfo.currentVer, sversion, pTask->status.schedStatus);
    }

    // pick the way the task is kicked off again
    if (pTask->info.taskLevel != TASK_LEVEL__SOURCE) {
      streamSchedExec(pTask);
    } else if (pTask->info.fillHistory) {
      streamStartRecoverTask(pTask, igUntreated);
    } else if (taosQueueItemSize(pTask->inputQueue->queue) == 0) {
      tqStartStreamTasks(pTq);
    } else {
      streamSchedExec(pTask);
    }
  }

  streamMetaReleaseTask(pTq->pStreamMeta, pTask);
  return 0;
}

// Resume the stream task named in the request and, if one exists, its related history task.
//
// msg is interpreted as an SVResumeStreamTaskReq; msgLen is not used here. sversion and the
// request's igUntreated flag are forwarded to tqProcessTaskResumeImpl().
// Returns -1 when the requested task cannot be acquired, otherwise the result of the last
// tqProcessTaskResumeImpl() call.
int32_t tqProcessTaskResumeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  SVResumeStreamTaskReq* pReq = (SVResumeStreamTaskReq*)msg;

  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->taskId);
  if (pTask == NULL) {
    return -1;
  }

  // tqProcessTaskResumeImpl() releases the reference on pTask, so everything that is needed
  // from the task has to be copied out while the reference is still held.
  int32_t historyTaskId = pTask->historyTaskId.taskId;

  int32_t code = tqProcessTaskResumeImpl(pTq, pTask, sversion, pReq->igUntreated);
  if (code != 0) {
    return code;
  }

  SStreamTask* pHistoryTask = streamMetaAcquireTask(pTq->pStreamMeta, historyTaskId);
  if (pHistoryTask) {
    code = tqProcessTaskResumeImpl(pTq, pHistoryTask, sversion, pReq->igUntreated);
  }
  return code;
}

L
Liu Jicong 已提交
1566
// Decode a stream retrieve request carried by pMsg and hand it to the destination task.
//
// The payload starts with an SMsgHead followed by the encoded SStreamRetrieveReq. The decoded
// request is always released with tDeleteStreamRetrieveReq() before returning.
//
// Returns 0 when the request was passed to streamProcessRetrieveReq(), -1 when the payload
// cannot be decoded or the destination task cannot be acquired.
int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg) {
  char*   msgStr = pMsg->pCont;
  char*   msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
  int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);

  // zero-initialized so that tDeleteStreamRetrieveReq() never sees indeterminate pointers
  SStreamRetrieveReq req = {0};

  SDecoder decoder;
  tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
  int32_t decodeRet = tDecodeStreamRetrieveReq(&decoder, &req);
  tDecoderClear(&decoder);

  if (decodeRet < 0) {
    tqError("vgId:%d failed to decode stream retrieve req", TD_VID(pTq->pVnode));
    tDeleteStreamRetrieveReq(&req);
    return -1;
  }

  int32_t      taskId = req.dstTaskId;
  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);
  if (pTask == NULL) {
    tDeleteStreamRetrieveReq(&req);
    return -1;
  }

  SRpcMsg rsp = {.info = pMsg->info, .code = 0};
  streamProcessRetrieveReq(pTask, &req, &rsp);

  streamMetaReleaseTask(pTq->pStreamMeta, pTask);
  tDeleteStreamRetrieveReq(&req);
  return 0;
}

// Handler for a stream retrieve response. Nothing is done with the response here; the
// function only reports success.
int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg) {
  (void)pTq;
  (void)pMsg;
  return 0;
}
L
Liu Jicong 已提交
1597

1598 1599 1600 1601 1602 1603
// Decode a stream dispatch request and enqueue it into the destination task without
// executing it (streamProcessDispatchMsg() is called with exec == false).
//
// On success the rpc content and the queue item pMsg are freed and 0 is returned.
// When decoding fails or the destination task does not exist, an error SStreamDispatchRsp is
// built from the fields of the request and sent back, then pMsg is freed and -1 is returned.
//
// NOTE(review): when pMsg->info.handle is NULL the function returns -1 without freeing
// pMsg->pCont / pMsg, unlike every other path -- confirm that the caller owns the message
// in that case.
int32_t vnodeEnqueueStreamMsg(SVnode* pVnode, SRpcMsg* pMsg) {
  STQ*      pTq = pVnode->pTq;
  SMsgHead* msgStr = pMsg->pCont;
  char*     msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
  int32_t   msgLen = pMsg->contLen - sizeof(SMsgHead);
  int32_t   code = 0;

  // zero-initialized: the FAIL path below reads several fields of req to build the error
  // response, and it is reachable when decoding stops before those fields are filled in
  SStreamDispatchReq req = {0};
  SDecoder           decoder;
  tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
  if (tDecodeStreamDispatchReq(&decoder, &req) < 0) {
    code = TSDB_CODE_MSG_DECODE_ERROR;
    tDecoderClear(&decoder);
    goto FAIL;
  }
  tDecoderClear(&decoder);

  int32_t taskId = req.taskId;

  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);
  if (pTask) {
    SRpcMsg rsp = {.info = pMsg->info, .code = 0};
    streamProcessDispatchMsg(pTask, &req, &rsp, false);
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
    rpcFreeCont(pMsg->pCont);
    taosFreeQitem(pMsg);
    return 0;
  } else {
    tDeleteStreamDispatchReq(&req);
  }

  code = TSDB_CODE_STREAM_TASK_NOT_EXIST;

FAIL:
  // without a handle there is nobody to send the error response to
  if (pMsg->info.handle == NULL) return -1;

  SMsgHead* pRspHead = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamDispatchRsp));
  if (pRspHead == NULL) {
    // no memory for a full response body: send a bare out-of-memory response instead
    SRpcMsg rsp = {.code = TSDB_CODE_OUT_OF_MEMORY, .info = pMsg->info};
    tqDebug("send dispatch error rsp, code: %x", code);
    tmsgSendRsp(&rsp);
    rpcFreeCont(pMsg->pCont);
    taosFreeQitem(pMsg);
    return -1;
  }

  // all multi-byte fields of the response are converted to network byte order
  pRspHead->vgId = htonl(req.upstreamNodeId);
  SStreamDispatchRsp* pRsp = POINTER_SHIFT(pRspHead, sizeof(SMsgHead));
  pRsp->streamId = htobe64(req.streamId);
  pRsp->upstreamTaskId = htonl(req.upstreamTaskId);
  pRsp->upstreamNodeId = htonl(req.upstreamNodeId);
  pRsp->downstreamNodeId = htonl(pVnode->config.vgId);
  pRsp->downstreamTaskId = htonl(req.taskId);
  pRsp->inputStatus = TASK_OUTPUT_STATUS__NORMAL;

  SRpcMsg rsp = {
      .code = code, .info = pMsg->info, .contLen = sizeof(SMsgHead) + sizeof(SStreamDispatchRsp), .pCont = pRspHead};
  tqDebug("send dispatch error rsp, code: %x", code);
  tmsgSendRsp(&rsp);
  rpcFreeCont(pMsg->pCont);
  taosFreeQitem(pMsg);
  return -1;
}
L
Liu Jicong 已提交
1661

1662
// Report whether sversion is not beyond pTq->walLogLastVer.
// Returns 1 when sversion <= pTq->walLogLastVer, 0 otherwise.
int32_t tqCheckLogInWal(STQ* pTq, int64_t sversion) {
  if (sversion <= pTq->walLogLastVer) {
    return 1;
  }
  return 0;
}
1663