tq.c 47.9 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
S
Shengliang Guan 已提交
14 15
 */

H
Hongze Cheng 已提交
16
#include "tq.h"
S
Shengliang Guan 已提交
17

dengyihao's avatar
dengyihao 已提交
18 19 20
// 0: not init
// 1: already inited
// 2: waiting to be initialized or cleaned up
21
#define WAL_READ_TASKS_ID (-1)
22

23
static int32_t tqInitialize(STQ* pTq);
dengyihao's avatar
dengyihao 已提交
24

wmmhello's avatar
wmmhello 已提交
25
// Report whether the handle's status is currently TMQ_HANDLE_STATUS_EXEC.
static FORCE_INLINE bool tqIsHandleExec(STqHandle* pHandle) {
  return pHandle->status == TMQ_HANDLE_STATUS_EXEC;
}
dengyihao's avatar
dengyihao 已提交
26 27
// Mark the handle as executing (status = TMQ_HANDLE_STATUS_EXEC).
static FORCE_INLINE void tqSetHandleExec(STqHandle* pHandle) {
  pHandle->status = TMQ_HANDLE_STATUS_EXEC;
}

// Mark the handle as idle (status = TMQ_HANDLE_STATUS_IDLE).
static FORCE_INLINE void tqSetHandleIdle(STqHandle* pHandle) {
  pHandle->status = TMQ_HANDLE_STATUS_IDLE;
}
wmmhello's avatar
wmmhello 已提交
28

L
Liu Jicong 已提交
29
int32_t tqInit() {
L
Liu Jicong 已提交
30 31 32 33 34 35
  int8_t old;
  while (1) {
    old = atomic_val_compare_exchange_8(&tqMgmt.inited, 0, 2);
    if (old != 2) break;
  }

36 37 38 39 40 41
  if (old == 0) {
    tqMgmt.timer = taosTmrInit(10000, 100, 10000, "TQ");
    if (tqMgmt.timer == NULL) {
      atomic_store_8(&tqMgmt.inited, 0);
      return -1;
    }
42 43 44
    if (streamInit() < 0) {
      return -1;
    }
L
Liu Jicong 已提交
45
    atomic_store_8(&tqMgmt.inited, 1);
46
  }
47

L
Liu Jicong 已提交
48 49
  return 0;
}
L
Liu Jicong 已提交
50

51
// Tear down the module-wide tq state if (and only if) it is currently initialized.
void tqCleanUp() {
  int8_t prev;

  // retry while another thread is in the middle of init/cleanup (state 2)
  do {
    prev = atomic_val_compare_exchange_8(&tqMgmt.inited, 1, 2);
  } while (prev == 2);

  if (prev != 1) {
    return;  // was never initialized, nothing to release
  }

  taosTmrCleanUp(tqMgmt.timer);
  streamCleanUp();
  atomic_store_8(&tqMgmt.inited, 0);
}
L
Liu Jicong 已提交
64

65
void tqDestroyTqHandle(void* data) {
66 67
  STqHandle* pData = (STqHandle*)data;
  qDestroyTask(pData->execHandle.task);
wmmhello's avatar
wmmhello 已提交
68

69
  if (pData->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
L
Liu Jicong 已提交
70
    taosMemoryFreeClear(pData->execHandle.execCol.qmsg);
71
  } else if (pData->execHandle.subType == TOPIC_SUB_TYPE__DB) {
72
    tqReaderClose(pData->execHandle.pTqReader);
73 74
    walCloseReader(pData->pWalReader);
    taosHashCleanup(pData->execHandle.execDb.pFilterOutTbUid);
L
Liu Jicong 已提交
75
  } else if (pData->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
76
    walCloseReader(pData->pWalReader);
77
    tqReaderClose(pData->execHandle.pTqReader);
78 79
    taosMemoryFreeClear(pData->execHandle.execTb.qmsg);
    nodesDestroyNode(pData->execHandle.execTb.node);
80
  }
dengyihao's avatar
dengyihao 已提交
81
  if (pData->msg != NULL) {
82 83 84
    rpcFreeCont(pData->msg->pCont);
    taosMemoryFree(pData->msg);
    pData->msg = NULL;
D
dapan1121 已提交
85
  }
L
Liu Jicong 已提交
86 87
}

88 89 90 91 92
// True only when both offsets are log offsets and the left version does not exceed the right one.
static bool tqOffsetLessOrEqual(const STqOffset* pLeft, const STqOffset* pRight) {
  if (pLeft->val.type != TMQ_OFFSET__LOG) {
    return false;
  }
  if (pRight->val.type != TMQ_OFFSET__LOG) {
    return false;
  }
  return pLeft->val.version <= pRight->val.version;
}

L
Liu Jicong 已提交
93
/**
 * Allocate and open a tq instance for a vnode.
 *
 * Creates the handle, push-manager and check-info hash tables, then runs tqInitialize().
 *
 * @param path   directory path; a copy is stored in the returned object
 * @param pVnode owning vnode; its WAL last version is recorded in walLogLastVer
 * @return the new STQ, or NULL on failure (terrno is set to TSDB_CODE_OUT_OF_MEMORY when an
 *         allocation in this function fails)
 */
STQ* tqOpen(const char* path, SVnode* pVnode) {
  STQ* pTq = taosMemoryCalloc(1, sizeof(STQ));
  if (pTq == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pTq->path = taosStrdup(path);
  pTq->pVnode = pVnode;
  pTq->walLogLastVer = pVnode->pWal->vers.lastVer;
  taosInitRWLatch(&pTq->lock);

  pTq->pHandle = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);
  pTq->pPushMgr = taosHashInit(64, MurmurHash3_32, false, HASH_NO_LOCK);
  pTq->pCheckInfo = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);

  // Any of the allocations above may fail; bail out before the object is used further.
  if (pTq->path == NULL || pTq->pHandle == NULL || pTq->pPushMgr == NULL || pTq->pCheckInfo == NULL) {
    if (pTq->pHandle != NULL) taosHashCleanup(pTq->pHandle);
    if (pTq->pPushMgr != NULL) taosHashCleanup(pTq->pPushMgr);
    if (pTq->pCheckInfo != NULL) taosHashCleanup(pTq->pCheckInfo);
    taosMemoryFree(pTq->path);
    taosMemoryFree(pTq);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  taosHashSetFreeFp(pTq->pHandle, tqDestroyTqHandle);
  taosHashSetFreeFp(pTq->pCheckInfo, (FDelete)tDeleteSTqCheckInfo);

  int32_t code = tqInitialize(pTq);
  if (code != TSDB_CODE_SUCCESS) {
    tqClose(pTq);
    return NULL;
  }

  return pTq;
}

// Open the meta store, the offset store and the stream meta of a tq instance, then load the
// stream tasks. Returns 0 on success and -1 as soon as any step fails.
int32_t tqInitialize(STQ* pTq) {
  int32_t metaRet = tqMetaOpen(pTq);
  if (metaRet < 0) {
    return -1;
  }

  pTq->pOffsetStore = tqOffsetOpen(pTq);
  if (NULL == pTq->pOffsetStore) {
    return -1;
  }

  pTq->pStreamMeta = streamMetaOpen(pTq->path, pTq, (FTaskExpand*)tqExpandTask, pTq->pVnode->config.vgId);
  if (NULL == pTq->pStreamMeta) {
    return -1;
  }

  // the version is kept in task's meta data
  // todo check if this version is required or not
  int32_t loadRet = streamLoadTasks(pTq->pStreamMeta, walGetCommittedVer(pTq->pVnode->pWal));
  return (loadRet < 0) ? -1 : 0;
}
L
Liu Jicong 已提交
145

L
Liu Jicong 已提交
146
// Close a tq instance and free everything it owns. A NULL argument is ignored.
void tqClose(STQ* pTq) {
  if (pTq != NULL) {
    tqOffsetClose(pTq->pOffsetStore);

    taosHashCleanup(pTq->pHandle);
    taosHashCleanup(pTq->pPushMgr);
    taosHashCleanup(pTq->pCheckInfo);

    taosMemoryFree(pTq->path);

    tqMetaClose(pTq);
    streamMetaClose(pTq->pStreamMeta);

    taosMemoryFree(pTq);
  }
}
L
Liu Jicong 已提交
160

H
Haojun Liao 已提交
161 162 163 164 165 166 167 168 169 170 171 172 173
// Under the stream-meta write lock, set every stream task to TASK_STATUS__STOP and kill its
// executor, logging how long each kill took. A NULL argument is ignored.
void tqNotifyClose(STQ* pTq) {
  if (pTq == NULL) {
    return;
  }

  taosWLockLatch(&pTq->pStreamMeta->lock);

  for (void* pIter = taosHashIterate(pTq->pStreamMeta->pTasks, NULL); pIter != NULL;
       pIter = taosHashIterate(pTq->pStreamMeta->pTasks, pIter)) {
    SStreamTask* pTask = *(SStreamTask**)pIter;
    tqDebug("vgId:%d s-task:%s set dropping flag", pTq->pStreamMeta->vgId, pTask->id.idStr);
    pTask->status.taskStatus = TASK_STATUS__STOP;

    int64_t startTs = taosGetTimestampMs();
    qKillTask(pTask->exec.pExecutor, TSDB_CODE_SUCCESS);
    int64_t elapsed = taosGetTimestampMs() - startTs;

    tqDebug("vgId:%d s-task:%s is closed in %" PRId64 " ms", pTq->pStreamMeta->vgId, pTask->id.idStr, elapsed);
  }

  taosWUnLockLatch(&pTq->pStreamMeta->lock);
}

D
dapan1121 已提交
186 187
static int32_t doSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* pRsp, int32_t epoch,
                             int64_t consumerId, int32_t type) {
L
Liu Jicong 已提交
188 189
  int32_t len = 0;
  int32_t code = 0;
D
dapan1121 已提交
190 191

  if (type == TMQ_MSG_TYPE__POLL_RSP) {
H
Haojun Liao 已提交
192
    tEncodeSize(tEncodeMqDataRsp, pRsp, len, code);
D
dapan1121 已提交
193 194 195
  } else if (type == TMQ_MSG_TYPE__TAOSX_RSP) {
    tEncodeSize(tEncodeSTaosxRsp, (STaosxRsp*)pRsp, len, code);
  }
L
Liu Jicong 已提交
196 197 198 199 200 201 202 203 204 205 206

  if (code < 0) {
    return -1;
  }

  int32_t tlen = sizeof(SMqRspHead) + len;
  void*   buf = rpcMallocCont(tlen);
  if (buf == NULL) {
    return -1;
  }

D
dapan1121 已提交
207 208 209
  ((SMqRspHead*)buf)->mqMsgType = type;
  ((SMqRspHead*)buf)->epoch = epoch;
  ((SMqRspHead*)buf)->consumerId = consumerId;
L
Liu Jicong 已提交
210 211 212 213 214 215

  void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));

  SEncoder encoder = {0};
  tEncoderInit(&encoder, abuf, len);

D
dapan1121 已提交
216
  if (type == TMQ_MSG_TYPE__POLL_RSP) {
H
Haojun Liao 已提交
217
    tEncodeMqDataRsp(&encoder, pRsp);
D
dapan1121 已提交
218
  } else if (type == TMQ_MSG_TYPE__TAOSX_RSP) {
X
Xiaoyu Wang 已提交
219
    tEncodeSTaosxRsp(&encoder, (STaosxRsp*)pRsp);
dengyihao's avatar
dengyihao 已提交
220
  }
L
Liu Jicong 已提交
221

wmmhello's avatar
wmmhello 已提交
222
  tEncoderClear(&encoder);
L
Liu Jicong 已提交
223 224

  SRpcMsg rsp = {
D
dapan1121 已提交
225
      .info = *pRpcHandleInfo,
L
Liu Jicong 已提交
226 227 228 229
      .pCont = buf,
      .contLen = tlen,
      .code = 0,
  };
L
Liu Jicong 已提交
230

L
Liu Jicong 已提交
231
  tmsgSendRsp(&rsp);
L
Liu Jicong 已提交
232 233
  return 0;
}
L
Liu Jicong 已提交
234

H
Haojun Liao 已提交
235
// Send an otherwise empty poll response for the message parked on the handle, carrying the
// handle's consumer id/epoch and the current valid WAL version range. Always returns 0.
int32_t tqPushDataRsp(STqHandle* pHandle, int32_t vgId) {
  int64_t sver = 0;
  int64_t ever = 0;

  SMqDataRsp dataRsp = {0};
  dataRsp.head.mqMsgType = TMQ_MSG_TYPE__POLL_RSP;
  dataRsp.head.epoch = pHandle->epoch;
  dataRsp.head.consumerId = pHandle->consumerId;

  walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever);
  tqDoSendDataRsp(&pHandle->msg->info, &dataRsp, pHandle->epoch, pHandle->consumerId, TMQ_MSG_TYPE__POLL_RSP, sver,
                  ever);

  // render both offsets for the debug log only
  char reqStr[TSDB_OFFSET_LEN] = {0};
  char rspStr[TSDB_OFFSET_LEN] = {0};
  tFormatOffset(reqStr, tListLen(reqStr), &dataRsp.reqOffset);
  tFormatOffset(rspStr, tListLen(rspStr), &dataRsp.rspOffset);
  tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) push rsp, block num: %d, req:%s, rsp:%s", vgId,
          dataRsp.head.consumerId, dataRsp.head.epoch, dataRsp.blockNum, reqStr, rspStr);

  return 0;
}

255 256 257 258 259 260
// Send pRsp as the reply to the poll request pReq/pMsg, together with the current valid WAL
// version range of the handle's reader, and log the request/response offsets. Always returns 0.
int32_t tqSendDataRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp,
                      int32_t type, int32_t vgId) {
  int64_t sver = 0;
  int64_t ever = 0;
  walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever);

  tqDoSendDataRsp(&pMsg->info, pRsp, pReq->epoch, pReq->consumerId, type, sver, ever);

  // render both offsets for the debug log only
  char reqStr[TSDB_OFFSET_LEN] = {0};
  char rspStr[TSDB_OFFSET_LEN] = {0};
  tFormatOffset(reqStr, TSDB_OFFSET_LEN, &pRsp->reqOffset);
  tFormatOffset(rspStr, TSDB_OFFSET_LEN, &pRsp->rspOffset);

  tqDebug("tmq poll vgId:%d consumer:0x%" PRIx64 " (epoch %d) send rsp, block num:%d, req:%s, rsp:%s, reqId:0x%" PRIx64, vgId,
          pReq->consumerId, pReq->epoch, pRsp->blockNum, reqStr, rspStr, pReq->reqId);

  return 0;
}

273
/**
 * Handle an offset-commit message for a subscription on this vnode.
 *
 * Decodes an SMqVgOffset from msg and writes it to the offset store, unless the store already
 * holds a log offset whose version is greater than or equal to the committed one.
 *
 * @param pTq      tq instance
 * @param sversion version supplied by the caller; a committed log offset equal to sversion - 1
 *                 is advanced to sversion before it is compared/saved
 * @param msg      encoded SMqVgOffset
 * @param msgLen   length of msg in bytes
 * @return 0 on success (including the "no update needed" case); -1 if the message cannot be
 *         decoded, the offset type is not snapshot/log, or the write fails
 */
int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  SMqVgOffset vgOffset = {0};
  int32_t     vgId = TD_VID(pTq->pVnode);

  SDecoder decoder;
  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
  if (tDecodeMqVgOffset(&decoder, &vgOffset) < 0) {
    // release the decoder on the failure path as well
    tDecoderClear(&decoder);
    return -1;
  }

  tDecoderClear(&decoder);

  STqOffset* pOffset = &vgOffset.offset;

  if (pOffset->val.type == TMQ_OFFSET__SNAPSHOT_DATA || pOffset->val.type == TMQ_OFFSET__SNAPSHOT_META) {
    tqDebug("receive offset commit msg to %s on vgId:%d, offset(type:snapshot) uid:%" PRId64 ", ts:%" PRId64,
            pOffset->subKey, vgId, pOffset->val.uid, pOffset->val.ts);
  } else if (pOffset->val.type == TMQ_OFFSET__LOG) {
    tqDebug("receive offset commit msg to %s on vgId:%d, offset(type:log) version:%" PRId64, pOffset->subKey, vgId,
            pOffset->val.version);
    if (pOffset->val.version + 1 == sversion) {
      pOffset->val.version += 1;
    }
  } else {
    tqError("invalid commit offset type:%d", pOffset->val.type);
    return -1;
  }

  STqOffset* pSavedOffset = tqOffsetRead(pTq->pOffsetStore, pOffset->subKey);
  if (pSavedOffset != NULL && tqOffsetLessOrEqual(pOffset, pSavedOffset)) {
    tqDebug("not update the offset, vgId:%d sub:%s since committed:%" PRId64 " less than/equal to existed:%" PRId64,
            vgId, pOffset->subKey, pOffset->val.version, pSavedOffset->val.version);
    return 0;  // no need to update the offset value
  }

  // save the new offset value
  if (tqOffsetWrite(pTq->pOffsetStore, pOffset) < 0) {
    return -1;
  }

  return 0;
}

316
/**
 * Handle a seek request: move the saved log offset of a subscription to the requested version.
 *
 * The requested version is clamped to the valid WAL version range of the handle's reader
 * before it is written to the offset store.
 *
 * @param pTq      tq instance
 * @param sversion not used by this function
 * @param msg      encoded SMqVgOffset
 * @param msgLen   length of msg in bytes
 * @return 0 when the offset was saved or no change was required; -1 if decoding fails, the
 *         offset is not a log offset, the subscription handle is missing (terrno =
 *         TSDB_CODE_INVALID_MSG), the consumer does not own the handle (terrno =
 *         TSDB_CODE_TMQ_CONSUMER_MISMATCH), or the write fails
 */
int32_t tqProcessSeekReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  SMqVgOffset vgOffset = {0};
  int32_t     vgId = TD_VID(pTq->pVnode);

  SDecoder decoder;
  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
  if (tDecodeMqVgOffset(&decoder, &vgOffset) < 0) {
    tqError("vgId:%d failed to decode seek msg", vgId);
    // release the decoder on the failure path as well
    tDecoderClear(&decoder);
    return -1;
  }

  tDecoderClear(&decoder);

  tqDebug("topic:%s, vgId:%d process offset seek by consumer:0x%" PRIx64 ", req offset:%" PRId64,
          vgOffset.offset.subKey, vgId, vgOffset.consumerId, vgOffset.offset.val.version);

  // 1. only log offsets can be the target of a seek
  STqOffset* pOffset = &vgOffset.offset;
  if (pOffset->val.type != TMQ_OFFSET__LOG) {
    tqError("vgId:%d, subKey:%s invalid seek offset type:%d", vgId, pOffset->subKey, pOffset->val.type);
    return -1;
  }

  STqHandle* pHandle = taosHashGet(pTq->pHandle, pOffset->subKey, strlen(pOffset->subKey));
  if (pHandle == NULL) {
    tqError("tmq seek: consumer:0x%" PRIx64 " vgId:%d subkey %s not found", vgOffset.consumerId, vgId, pOffset->subKey);
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  // 2. check consumer-vg assignment status
  taosRLockLatch(&pTq->lock);
  if (pHandle->consumerId != vgOffset.consumerId) {
    tqDebug("ERROR tmq seek: consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64,
            vgOffset.consumerId, vgId, pOffset->subKey, pHandle->consumerId);
    terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
    taosRUnLockLatch(&pTq->lock);
    return -1;
  }
  taosRUnLockLatch(&pTq->lock);

  // 3. check the offset info
  STqOffset* pSavedOffset = tqOffsetRead(pTq->pOffsetStore, pOffset->subKey);
  if (pSavedOffset != NULL) {
    if (pSavedOffset->val.type != TMQ_OFFSET__LOG) {
      tqError("invalid saved offset type, vgId:%d sub:%s", vgId, pOffset->subKey);
      return 0;  // no need to update the offset value
    }

    if (pSavedOffset->val.version == pOffset->val.version) {
      tqDebug("vgId:%d subKey:%s no need to seek to %" PRId64 " prev offset:%" PRId64, vgId, pOffset->subKey,
              pOffset->val.version, pSavedOffset->val.version);
      return 0;
    }
  }

  // 4. clamp the requested version into the valid WAL range [sver, ever]
  int64_t sver = 0, ever = 0;
  walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever);
  if (pOffset->val.version < sver) {
    pOffset->val.version = sver;
  } else if (pOffset->val.version > ever) {
    pOffset->val.version = ever;
  }

  // save the new offset value
  if (pSavedOffset != NULL) {
    tqDebug("vgId:%d sub:%s seek to:%" PRId64 " prev offset:%" PRId64, vgId, pOffset->subKey, pOffset->val.version,
            pSavedOffset->val.version);
  } else {
    tqDebug("vgId:%d sub:%s seek to:%" PRId64 " not saved yet", vgId, pOffset->subKey, pOffset->val.version);
  }

  if (tqOffsetWrite(pTq->pOffsetStore, pOffset) < 0) {
    tqError("failed to save offset, vgId:%d sub:%s seek to %" PRId64, vgId, pOffset->subKey, pOffset->val.version);
    return -1;
  }

  tqDebug("topic:%s, vgId:%d consumer:0x%" PRIx64 " offset is update to:%" PRId64, vgOffset.offset.subKey, vgId,
          vgOffset.consumerId, vgOffset.offset.val.version);

  return 0;
}

L
Liu Jicong 已提交
398
// Scan the check-info table for an entry on table tbUid that lists colId.
// Returns -1 if such an entry exists (iteration is cancelled first), 0 otherwise.
int32_t tqCheckColModifiable(STQ* pTq, int64_t tbUid, int32_t colId) {
  for (void* pIter = taosHashIterate(pTq->pCheckInfo, NULL); pIter != NULL;
       pIter = taosHashIterate(pTq->pCheckInfo, pIter)) {
    STqCheckInfo* pCheck = (STqCheckInfo*)pIter;
    if (pCheck->ntbUid != tbUid) {
      continue;
    }

    int32_t numOfCols = taosArrayGetSize(pCheck->colIdList);
    for (int32_t idx = 0; idx < numOfCols; idx++) {
      int16_t forbidColId = *(int16_t*)taosArrayGet(pCheck->colIdList, idx);
      if (forbidColId == colId) {
        taosHashCancelIterate(pTq->pCheckInfo, pIter);
        return -1;
      }
    }
  }

  return 0;
}

424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448
// Under the tq write lock, re-queue the parked poll message of every handle registered in the
// push manager onto the query queue, then empty the push manager. Always returns 0.
int32_t tqProcessPollPush(STQ* pTq, SRpcMsg* pMsg) {
  int32_t vgId = TD_VID(pTq->pVnode);

  taosWLockLatch(&pTq->lock);

  if (taosHashGetSize(pTq->pPushMgr) > 0) {
    for (void* pIter = taosHashIterate(pTq->pPushMgr, NULL); pIter != NULL;
         pIter = taosHashIterate(pTq->pPushMgr, pIter)) {
      STqHandle* pHandle = *(STqHandle**)pIter;
      tqDebug("vgId:%d start set submit for pHandle:%p, consumer:0x%" PRIx64, vgId, pHandle, pHandle->consumerId);

      if (ASSERT(pHandle->msg != NULL)) {
        tqError("pHandle->msg should not be null");
        break;
      }

      // hand the content over to the queue; only the wrapper struct is released here
      SRpcMsg msg = {.msgType = TDMT_VND_TMQ_CONSUME,
                     .pCont = pHandle->msg->pCont,
                     .contLen = pHandle->msg->contLen,
                     .info = pHandle->msg->info};
      tmsgPutToQueue(&pTq->pVnode->msgCb, QUERY_QUEUE, &msg);
      taosMemoryFree(pHandle->msg);
      pHandle->msg = NULL;
    }

    taosHashClear(pTq->pPushMgr);
  }

  taosWUnLockLatch(&pTq->lock);
  return 0;
}

D
dapan1121 已提交
453
// Handle a consumer poll request: look up the subscription handle, wait (in 10ms steps) until
// it is idle and claim it, refresh its epoch, extract data for the consumer and release it.
// Returns the result of tqExtractDataForMq, or -1 (with terrno set) if the request cannot be
// deserialized, the handle is missing, or the handle belongs to another consumer.
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
  SMqPollReq req = {0};
  if (tDeserializeSMqPollReq(pMsg->pCont, pMsg->contLen, &req) < 0) {
    tqError("tDeserializeSMqPollReq %d failed", pMsg->contLen);
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  int64_t      consumerId = req.consumerId;
  int32_t      reqEpoch = req.epoch;
  STqOffsetVal reqOffset = req.reqOffset;
  int32_t      vgId = TD_VID(pTq->pVnode);
  STqHandle*   pHandle = NULL;

  bool claimed = false;
  while (!claimed) {
    taosWLockLatch(&pTq->lock);

    // 1. find handle
    pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
    if (pHandle == NULL) {
      tqError("tmq poll: consumer:0x%" PRIx64 " vgId:%d subkey %s not found", consumerId, vgId, req.subKey);
      terrno = TSDB_CODE_INVALID_MSG;
      taosWUnLockLatch(&pTq->lock);
      return -1;
    }

    // 2. check re-balance status
    if (pHandle->consumerId != consumerId) {
      tqError("ERROR tmq poll: consumer:0x%" PRIx64
              " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64,
              consumerId, TD_VID(pTq->pVnode), req.subKey, pHandle->consumerId);
      terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
      taosWUnLockLatch(&pTq->lock);
      return -1;
    }

    // claim the handle if nobody is executing on it
    claimed = !tqIsHandleExec(pHandle);
    if (claimed) {
      tqSetHandleExec(pHandle);
      tqDebug("tmq poll: consumer:0x%" PRIx64 "vgId:%d, topic:%s, set handle exec, pHandle:%p", consumerId, vgId,
              req.subKey, pHandle);
    }
    taosWUnLockLatch(&pTq->lock);

    if (!claimed) {
      tqDebug("tmq poll: consumer:0x%" PRIx64
              "vgId:%d, topic:%s, subscription is executing, wait for 10ms and retry, pHandle:%p",
              consumerId, vgId, req.subKey, pHandle);
      taosMsleep(10);
    }
  }

  // 3. update the epoch value
  if (pHandle->epoch < reqEpoch) {
    tqDebug("tmq poll: consumer:0x%" PRIx64 " epoch update from %d to %d by poll req", consumerId, pHandle->epoch,
            reqEpoch);
    pHandle->epoch = reqEpoch;
  }

  char offsetStr[TSDB_OFFSET_LEN];
  tFormatOffset(offsetStr, TSDB_OFFSET_LEN, &reqOffset);
  tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey %s, recv poll req vgId:%d, req:%s, reqId:0x%" PRIx64,
          consumerId, req.epoch, pHandle->subKey, vgId, offsetStr, req.reqId);

  // 4. do the actual work, then hand the handle back
  int code = tqExtractDataForMq(pTq, pHandle, &req, pMsg);
  tqSetHandleIdle(pHandle);

  tqDebug("tmq poll: consumer:0x%" PRIx64 "vgId:%d, topic:%s, , set handle idle, pHandle:%p", consumerId, vgId,
          req.subKey, pHandle);
  return code;
}

526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583
/**
 * Answer a WAL-info request: report the subscription's current log offset together with the
 * valid WAL version range of the handle's reader.
 *
 * The reported offset is the saved offset if one exists; otherwise it is derived from the
 * request offset (explicit log version / reader position, earliest, or latest).
 *
 * @param pTq  tq instance
 * @param pMsg rpc message carrying a serialized SMqPollReq; the reply is sent on pMsg->info
 * @return 0 after the response has been sent; -1 with terrno set if the request cannot be
 *         deserialized, the handle is missing, the consumer does not own the handle, or the
 *         offset is snapshot based / of an unsupported type
 */
int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) {
  SMqPollReq req = {0};
  if (tDeserializeSMqPollReq(pMsg->pCont, pMsg->contLen, &req) < 0) {
    tqError("tDeserializeSMqPollReq %d failed", pMsg->contLen);
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  int64_t      consumerId = req.consumerId;
  STqOffsetVal reqOffset = req.reqOffset;
  int32_t      vgId = TD_VID(pTq->pVnode);

  // 1. find handle
  STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
  if (pHandle == NULL) {
    tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s not found", consumerId, vgId, req.subKey);
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  // 2. check re-balance status
  taosRLockLatch(&pTq->lock);
  if (pHandle->consumerId != consumerId) {
    tqDebug("ERROR consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64,
            consumerId, vgId, req.subKey, pHandle->consumerId);
    terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
    taosRUnLockLatch(&pTq->lock);
    return -1;
  }
  taosRUnLockLatch(&pTq->lock);

  int64_t sver = 0, ever = 0;
  walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever);

  SMqDataRsp dataRsp = {0};
  tqInitDataRsp(&dataRsp, &req);

  // 3. decide which log version to report
  STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, req.subKey);
  if (pOffset != NULL) {
    if (pOffset->val.type != TMQ_OFFSET__LOG) {
      tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s use snapshot, no valid wal info", consumerId, vgId, req.subKey);
      terrno = TSDB_CODE_INVALID_PARA;
      tDeleteMqDataRsp(&dataRsp);
      return -1;
    }

    dataRsp.rspOffset.type = TMQ_OFFSET__LOG;
    dataRsp.rspOffset.version = pOffset->val.version;
  } else {
    if (req.useSnapshot == true) {
      tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s snapshot not support wal info", consumerId, vgId, req.subKey);
      terrno = TSDB_CODE_INVALID_PARA;
      tDeleteMqDataRsp(&dataRsp);
      return -1;
    }

    dataRsp.rspOffset.type = TMQ_OFFSET__LOG;

    if (reqOffset.type == TMQ_OFFSET__LOG) {
      int64_t currentVer = walReaderGetCurrentVer(pHandle->execHandle.pTqReader->pWalReader);
      if (currentVer == -1) {  // not start to read data from wal yet, return req offset directly
        dataRsp.rspOffset.version = reqOffset.version;
      } else {
        dataRsp.rspOffset.version = currentVer;  // return current consume offset value
      }
    } else if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEST) {
      dataRsp.rspOffset.version = sver;  // not consume yet, set the earliest position
    } else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) {
      dataRsp.rspOffset.version = ever;
    } else {
      tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s invalid offset type:%d", consumerId, vgId, req.subKey,
              reqOffset.type);
      terrno = TSDB_CODE_INVALID_PARA;
      tDeleteMqDataRsp(&dataRsp);
      return -1;
    }
  }

  tqDoSendDataRsp(&pMsg->info, &dataRsp, req.epoch, req.consumerId, TMQ_MSG_TYPE__WALINFO_RSP, sver, ever);

  // The response object is still owned here after sending; release it on the success path
  // just like on the error paths above.
  tDeleteMqDataRsp(&dataRsp);
  return 0;
}

608
// Remove a subscription: wait until its handle is idle, close its WAL ref, drop the handle
// from the hash table, then delete its offset and its persisted handle. Failures of the
// individual steps are only logged; the function always returns 0.
int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  SMqVDeleteReq* pReq = (SMqVDeleteReq*)msg;
  int32_t        vgId = TD_VID(pTq->pVnode);

  tqDebug("vgId:%d, tq process delete sub req %s", vgId, pReq->subKey);

  taosWLockLatch(&pTq->lock);

  STqHandle* pHandle = taosHashGet(pTq->pHandle, pReq->subKey, strlen(pReq->subKey));
  if (pHandle != NULL) {
    // the write lock stays held while polling for the handle to become idle
    for (; tqIsHandleExec(pHandle); taosMsleep(10)) {
      tqDebug("vgId:%d, topic:%s, subscription is executing, wait for 10ms and retry, pHandle:%p", vgId,
              pHandle->subKey, pHandle);
    }

    if (pHandle->pRef) {
      walCloseRef(pTq->pVnode->pWal, pHandle->pRef->refId);
    }

    if (taosHashRemove(pTq->pHandle, pReq->subKey, strlen(pReq->subKey)) != 0) {
      tqError("cannot process tq delete req %s, since no such handle", pReq->subKey);
    }
  }

  if (tqOffsetDelete(pTq->pOffsetStore, pReq->subKey) != 0) {
    tqError("cannot process tq delete req %s, since no such offset in cache", pReq->subKey);
  }

  if (tqMetaDeleteHandle(pTq, pReq->subKey) < 0) {
    tqError("cannot process tq delete req %s, since no such offset in tdb", pReq->subKey);
  }

  taosWUnLockLatch(&pTq->lock);

  return 0;
}

647
/**
 * Register a check-info entry: decode an STqCheckInfo from msg, store it in the in-memory
 * check-info table keyed by topic, and persist the raw message through tqMetaSaveCheckInfo.
 *
 * @param pTq      tq instance
 * @param sversion not used by this function
 * @param msg      encoded STqCheckInfo
 * @param msgLen   length of msg in bytes
 * @return 0 on success; -1 with terrno = TSDB_CODE_OUT_OF_MEMORY if decoding, the hash insert
 *         or the persist step fails
 */
int32_t tqProcessAddCheckInfoReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  STqCheckInfo info = {0};
  SDecoder     decoder;
  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
  if (tDecodeSTqCheckInfo(&decoder, &info) < 0) {
    // release the decoder on the failure path as well
    tDecoderClear(&decoder);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  tDecoderClear(&decoder);

  if (taosHashPut(pTq->pCheckInfo, info.topic, strlen(info.topic), &info, sizeof(STqCheckInfo)) < 0) {
    // the table did not take the entry, so the decoded content is still owned here
    tDeleteSTqCheckInfo(&info);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  // from here on the stored copy in pCheckInfo owns the decoded content
  if (tqMetaSaveCheckInfo(pTq, info.topic, msg, msgLen) < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  return 0;
}

667
// Remove the check-info entry keyed by the string in msg, first from the in-memory table and
// then from the persisted meta. Returns 0 on success, -1 (terrno = TSDB_CODE_OUT_OF_MEMORY)
// as soon as either step fails.
int32_t tqProcessDelCheckInfoReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  // short-circuit: the persisted entry is only touched when the in-memory removal succeeded
  bool failed = (taosHashRemove(pTq->pCheckInfo, msg, strlen(msg)) < 0) || (tqMetaDeleteCheckInfo(pTq, msg) < 0);
  if (failed) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  return 0;
}

679
// Handle a re-balance (subscribe) request. If a handle for the sub key exists (in memory or
// restorable through tqMetaGetHandle) its consumer is switched to the new consumer id;
// otherwise a new handle is created. In both cases the handle is persisted afterwards.
// Returns -1 if the request cannot be decoded, otherwise the result of the create/save step.
int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  int         ret = 0;
  SMqRebVgReq req = {0};
  SDecoder    dc = {0};

  tDecoderInit(&dc, msg, msgLen);

  // decode req
  if (tDecodeSMqRebVgReq(&dc, &req) < 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    tDecoderClear(&dc);
    return -1;
  }

  tqDebug("vgId:%d, tq process sub req:%s, Id:0x%" PRIx64 " -> Id:0x%" PRIx64, pTq->pVnode->config.vgId, req.subKey,
          req.oldConsumerId, req.newConsumerId);

  // look the handle up; while it is missing, retry for as long as tqMetaGetHandle succeeds
  STqHandle* pHandle = NULL;
  do {
    pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
  } while (pHandle == NULL && tqMetaGetHandle(pTq, req.subKey) >= 0);

  if (pHandle != NULL) {
    taosWLockLatch(&pTq->lock);

    if (pHandle->consumerId != req.newConsumerId) {
      tqInfo("vgId:%d switch consumer from Id:0x%" PRIx64 " to Id:0x%" PRIx64, req.vgId, pHandle->consumerId,
             req.newConsumerId);
      atomic_store_64(&pHandle->consumerId, req.newConsumerId);
    } else {  // do nothing
      tqInfo("vgId:%d consumer:0x%" PRIx64 " remains, no switch occurs, should not reach here", req.vgId,
             req.newConsumerId);
    }

    // remove if it has been register in the push manager, and return one empty block to consumer
    tqUnregisterPushHandle(pTq, pHandle);
    taosWUnLockLatch(&pTq->lock);

    ret = tqMetaSaveHandle(pTq, req.subKey, pHandle);
  } else {
    if (req.oldConsumerId != -1) {
      tqError("vgId:%d, build new consumer handle %s for consumer:0x%" PRIx64 ", but old consumerId:0x%" PRIx64,
              req.vgId, req.subKey, req.newConsumerId, req.oldConsumerId);
    }

    if (req.newConsumerId == -1) {
      tqError("vgId:%d, tq invalid re-balance request, new consumerId %" PRId64 "", req.vgId, req.newConsumerId);
      goto end;
    }

    STqHandle handle = {0};
    ret = tqCreateHandle(pTq, &req, &handle);
    if (ret < 0) {
      tqDestroyTqHandle(&handle);
      goto end;
    }

    ret = tqMetaSaveHandle(pTq, req.subKey, &handle);
  }

end:
  tDecoderClear(&dc);
  return ret;
}
754

dengyihao's avatar
dengyihao 已提交
755
// Free callback taking the address of a pointer slot: the slot is dereferenced once and the pointer stored in it is
// passed to taosMemoryFree.
void freePtr(void* ptr) {
  void** ppSlot = (void**)ptr;
  taosMemoryFree(*ppSlot);
}
L
liuyao 已提交
756

757
// Prepare a stream task for execution inside this vnode.
//
// Creates the task id string and the input/output queues, binds the task to the vnode message callbacks and the
// stream meta, records `ver` as the checkpoint/current version, then builds the executor (source and agg levels),
// the sink (SMA or table output) and, for source-level tasks, a WAL reader.
//
// Returns 0 on success, or -1 as soon as a queue, state backend, executor or table schema cannot be created.
// NOTE(review): objects created before a failing step are not released here - presumably the caller tears the task
// down; confirm.
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
  SStreamMeta*  pStreamMeta = pTq->pStreamMeta;
  const int32_t vgId = TD_VID(pTq->pVnode);

  pTask->id.idStr = createStreamTaskIdStr(pTask->id.streamId, pTask->id.taskId);
  pTask->refCnt = 1;
  pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE;

  // both queues are opened before either result is inspected
  pTask->inputQueue = streamQueueOpen(512 << 10);
  pTask->outputQueue = streamQueueOpen(512 << 10);
  if (pTask->inputQueue == NULL || pTask->outputQueue == NULL) {
    return -1;
  }

  pTask->inputStatus = TASK_INPUT_STATUS__NORMAL;
  pTask->outputStatus = TASK_OUTPUT_STATUS__NORMAL;
  pTask->pMsgCb = &pTq->pVnode->msgCb;
  pTask->pMeta = pStreamMeta;
  pTask->chkInfo.version = ver;
  pTask->chkInfo.currentVer = ver;

  // a task that still has history to fill starts out waiting for its downstream tasks
  if (pTask->fillHistory) {
    pTask->status.taskStatus = TASK_STATUS__WAIT_DOWNSTREAM;
  } else {
    pTask->status.taskStatus = TASK_STATUS__NORMAL;
  }

  // expand executor: only source and agg level tasks own a state backend and an executor
  const bool isSourceLevel = (pTask->taskLevel == TASK_LEVEL__SOURCE);
  const bool isAggLevel = (!isSourceLevel) && (pTask->taskLevel == TASK_LEVEL__AGG);
  if (isSourceLevel || isAggLevel) {
    pTask->pState = streamStateOpen(pStreamMeta->path, pTask, false, -1, -1);
    if (pTask->pState == NULL) {
      return -1;
    }

    SReadHandle readHandle = {.pStateBackend = pTask->pState};
    if (isSourceLevel) {
      // source tasks read from this vnode through a tq reader
      readHandle.vnode = pTq->pVnode;
      readHandle.initTqReader = 1;
    } else {
      // agg tasks have no vnode attached; they only record the size of childEpInfo
      readHandle.numOfVgroups = (int32_t)taosArrayGetSize(pTask->childEpInfo);
    }
    initStorageAPI(&readHandle.api);

    pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &readHandle, vgId);
    if (pTask->exec.pExecutor == NULL) {
      return -1;
    }

    qSetTaskId(pTask->exec.pExecutor, pTask->id.taskId, pTask->id.streamId);
  }

  // sink
  if (pTask->outputType == TASK_OUTPUT__SMA) {
    pTask->smaSink.vnode = pTq->pVnode;
    pTask->smaSink.smaSink = smaHandleRes;
  } else if (pTask->outputType == TASK_OUTPUT__TABLE) {
    pTask->tbSink.vnode = pTq->pVnode;
    pTask->tbSink.tbSinkFunc = tqSinkToTablePipeline;

    // schema version defaults to 1 unless the meta lookup for the target super table succeeds
    int32_t   schemaVer = 1;
    SMetaInfo metaInfo = {0};
    if (metaGetInfo(pTq->pVnode->pMeta, pTask->tbSink.stbUid, &metaInfo, NULL) == TSDB_CODE_SUCCESS) {
      schemaVer = metaInfo.skmVer;
    }

    SSchemaWrapper* pWrapper = pTask->tbSink.pSchemaWrapper;
    pTask->tbSink.pTSchema = tBuildTSchema(pWrapper->pSchema, pWrapper->nCols, schemaVer);
    if (pTask->tbSink.pTSchema == NULL) {
      return -1;
    }

    pTask->tbSink.pTblInfo = tSimpleHashInit(10240, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
    tSimpleHashSetFreeFp(pTask->tbSink.pTblInfo, freePtr);
  }

  if (pTask->taskLevel == TASK_LEVEL__SOURCE) {
    SWalFilterCond walCond = {.deleteMsg = 1};  // delete msg also extract from wal files
    pTask->exec.pWalReader = walOpenReader(pTq->pVnode->pWal, &walCond);
  }

  streamSetupTrigger(pTask);

  tqInfo("vgId:%d expand stream task, s-task:%s, checkpoint ver:%" PRId64 " child id:%d, level:%d", vgId,
         pTask->id.idStr, pTask->chkInfo.version, pTask->selfChildId, pTask->taskLevel);

  // next valid version will add one
  pTask->chkInfo.version += 1;
  return 0;
}
L
Liu Jicong 已提交
851

852
// Handle a "task check" request sent by an upstream task and answer it with the status of the addressed
// downstream task.
//
// The request body follows an SMsgHead inside pMsg->pCont. The response echoes the ids from the request; its status
// is streamTaskCheckStatus() of the downstream task, or 0 when that task is not present in the stream meta.
//
// Returns 0 once the response has been handed to tmsgSendRsp, or -1 when the response cannot be sized or allocated
// (terrno is set to TSDB_CODE_OUT_OF_MEMORY on allocation failure).
int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) {
  char*   msgStr = pMsg->pCont;
  char*   msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
  int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);

  // zero-initialised so that the fields copied below are defined even if decoding stops early
  SStreamTaskCheckReq req = {0};
  SDecoder            decoder;

  tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
  tDecodeSStreamTaskCheckReq(&decoder, &req);
  tDecoderClear(&decoder);

  int32_t             taskId = req.downstreamTaskId;
  SStreamTaskCheckRsp rsp = {
      .reqId = req.reqId,
      .streamId = req.streamId,
      .childId = req.childId,
      .downstreamNodeId = req.downstreamNodeId,
      .downstreamTaskId = req.downstreamTaskId,
      .upstreamNodeId = req.upstreamNodeId,
      .upstreamTaskId = req.upstreamTaskId,
  };

  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);

  if (pTask != NULL) {
    rsp.status = streamTaskCheckStatus(pTask);

    tqDebug("s-task:%s recv task check req(reqId:0x%" PRIx64
            ") %d at node %d task status:%d, check req from task %d at node %d, rsp status %d",
            pTask->id.idStr, rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, pTask->status.taskStatus,
            rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);

    // give the reference back only after the last read of pTask
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
  } else {
    rsp.status = 0;
    tqDebug("tq recv task check(taskId:0x%x not built yet) req(reqId:0x%" PRIx64
            ") %d at node %d, check req from task:0x%x at node %d, rsp status %d",
            taskId, rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.upstreamTaskId, rsp.upstreamNodeId,
            rsp.status);
  }

  SEncoder encoder;
  int32_t  code;
  int32_t  len;

  tEncodeSize(tEncodeSStreamTaskCheckRsp, &rsp, len, code);
  if (code < 0) {
    tqError("vgId:%d failed to encode task check rsp, task:0x%x", pTq->pStreamMeta->vgId, taskId);
    return -1;
  }

  void* buf = rpcMallocCont(sizeof(SMsgHead) + len);
  if (buf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    tqError("vgId:%d failed to allocate task check rsp, task:0x%x", pTq->pStreamMeta->vgId, taskId);
    return -1;
  }

  ((SMsgHead*)buf)->vgId = htonl(req.upstreamNodeId);

  void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
  tEncoderInit(&encoder, (uint8_t*)abuf, len);
  tEncodeSStreamTaskCheckRsp(&encoder, &rsp);
  tEncoderClear(&encoder);

  SRpcMsg rspMsg = {.code = 0, .pCont = buf, .contLen = sizeof(SMsgHead) + len, .info = pMsg->info};

  tmsgSendRsp(&rspMsg);
  return 0;
}

917
// Handle the response to a task check request: decode it, look up the upstream task it names and pass the response
// to streamProcessTaskCheckRsp.
//
// Returns -1 when the message cannot be decoded or the upstream task is not found; otherwise the result of
// streamProcessTaskCheckRsp.
int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  SStreamMeta*        pMeta = pTq->pStreamMeta;
  SStreamTaskCheckRsp rsp;
  SDecoder            decoder;

  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
  int32_t code = tDecodeSStreamTaskCheckRsp(&decoder, &rsp);
  tDecoderClear(&decoder);  // the decoder is done with on both the failure and the success path
  if (code < 0) {
    return -1;
  }

  tqDebug("tq recv task check rsp(reqId:0x%" PRIx64 ") %d at node %d check req from task:0x%x at node %d, status %d",
          rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);

  SStreamTask* pUpstream = streamMetaAcquireTask(pMeta, rsp.upstreamTaskId);
  if (pUpstream == NULL) {
    tqError("tq failed to locate the stream task:0x%x vgId:%d, it may have been destroyed", rsp.upstreamTaskId,
            pMeta->vgId);
    return -1;
  }

  code = streamProcessTaskCheckRsp(pUpstream, &rsp, sversion);
  streamMetaReleaseTask(pMeta, pUpstream);
  return code;
}

946
// Deploy a stream task described by `msg` into this vnode.
//
// Does nothing (returns 0) when tsDisableStream is set. Otherwise the task is decoded into a freshly allocated
// SStreamTask, added to the stream meta under the meta write latch and, if it has history to fill, its downstream
// check is started.
//
// Returns 0 on success and -1 on allocation, decode or registration failure.
int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  if (tsDisableStream) {
    return 0;
  }

  SStreamMeta*  pMeta = pTq->pStreamMeta;
  const int32_t vgId = TD_VID(pTq->pVnode);

  // 1.deserialize msg and build task
  SStreamTask* pNewTask = taosMemoryCalloc(1, sizeof(SStreamTask));
  if (pNewTask == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    tqError("vgId:%d failed to create stream task due to out of memory, alloc size:%d", vgId,
            (int32_t)sizeof(SStreamTask));
    return -1;
  }

  SDecoder decoder;
  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
  int32_t decodeCode = tDecodeStreamTask(&decoder, pNewTask);
  tDecoderClear(&decoder);
  if (decodeCode < 0) {
    taosMemoryFree(pNewTask);
    return -1;
  }

  // 2.save task, use the newest commit version as the initial start version of stream task.
  taosWLockLatch(&pMeta->lock);
  const bool added = (streamMetaAddDeployedTask(pMeta, sversion, pNewTask) >= 0);
  int32_t    numOfTasks = streamMetaGetNumOfTasks(pMeta);
  if (!added) {
    tqError("vgId:%d failed to add s-task:%s, total:%d", vgId, pNewTask->id.idStr, numOfTasks);
    taosWUnLockLatch(&pMeta->lock);
    return -1;
  }
  taosWUnLockLatch(&pMeta->lock);

  // 3.go through recover steps to fill history
  if (pNewTask->fillHistory) {
    streamTaskCheckDownstream(pNewTask, sversion);
  }

  tqDebug("vgId:%d s-task:%s is deployed and add meta from mnd, status:%d, total:%d", vgId, pNewTask->id.idStr,
          pNewTask->status.taskStatus, numOfTasks);
  return 0;
}

L
Liu Jicong 已提交
996 997 998 999 1000
// Run recovery step 1 (the non-blocking scan) for the task named in the request, then queue a step-2 request on
// the vnode write queue.
//
// Returns 0 when step 2 was queued or the task turned out to be dropping, and -1 when the task is unknown, its
// checkpoint version is not positive, the step-2 request cannot be built, or the message buffer cannot be
// allocated.
//
// The task reference is held until the last read of pTask, so the log calls and status checks never touch a task
// that has already been released.
int32_t tqProcessTaskRecover1Req(STQ* pTq, SRpcMsg* pMsg) {
  SStreamRecoverStep1Req* pReq = (SStreamRecoverStep1Req*)pMsg->pCont;
  SStreamTask*            pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->taskId);
  if (pTask == NULL) {
    return -1;
  }

  // check param
  if (pTask->chkInfo.version <= 0) {
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
    return -1;
  }

  // do recovery step 1
  tqDebug("s-task:%s start non-blocking recover stage(step 1) scan", pTask->id.idStr);
  int64_t st = taosGetTimestampMs();

  streamSourceRecoverScanStep1(pTask);
  if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING) {
    tqDebug("s-task:%s is dropped, abort recover in step1", pTask->id.idStr);

    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
    return 0;
  }

  double el = (taosGetTimestampMs() - st) / 1000.0;
  tqDebug("s-task:%s non-blocking recover stage(step 1) ended, elapsed time:%.2fs", pTask->id.idStr, el);

  // build msg to launch next step
  SStreamRecoverStep2Req req;
  int32_t                code = streamBuildSourceRecover2Req(pTask, &req);
  if (code < 0) {
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
    return -1;
  }

  if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING) {
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
    return 0;
  }

  // serialize msg: the payload is the step-2 request, so its own size is what gets copied
  int32_t len = sizeof(req);

  void* serializedReq = rpcMallocCont(len);
  if (serializedReq == NULL) {
    tqError("s-task:%s failed to prepare the step2 stage, out of memory", pTask->id.idStr);
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
    return -1;
  }

  memcpy(serializedReq, &req, len);

  // dispatch msg
  tqDebug("s-task:%s start recover block stage", pTask->id.idStr);
  streamMetaReleaseTask(pTq->pStreamMeta, pTask);

  SRpcMsg rpcMsg = {
      .code = 0, .contLen = len, .msgType = TDMT_VND_STREAM_RECOVER_BLOCKING_STAGE, .pCont = serializedReq};
  tmsgPutToQueue(&pTq->pVnode->msgCb, WRITE_QUEUE, &rpcMsg);
  return 0;
}

1062
// Run recovery step 2 (the blocking stage) for the task named in `msg`.
//
// Sequence: step-2 scan up to `sversion`, seek the task's WAL reader to `sversion`, restore the task parameters,
// switch the status back to normal, dispatch the recover-finish request downstream, clear fillHistory and save the
// task. The sequence stops early with 0 when the task is found to be dropping after the WAL seek, and with -1 when
// any of the stream* steps reports a negative code. The task reference is released on every path.
int32_t tqProcessTaskRecover2Req(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  SStreamMeta*            pMeta = pTq->pStreamMeta;
  SStreamRecoverStep2Req* pReq = (SStreamRecoverStep2Req*)msg;

  SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->taskId);
  if (pTask == NULL) {
    return -1;
  }

  int32_t ret = -1;

  // do recovery step 2
  int64_t startTs = taosGetTimestampMs();
  tqDebug("s-task:%s start step2 recover, ts:%" PRId64, pTask->id.idStr, startTs);

  do {
    if (streamSourceRecoverScanStep2(pTask, sversion) < 0) {
      break;
    }

    qDebug("s-task:%s set start wal scan start ver:%"PRId64, pTask->id.idStr, sversion);

    walReaderSeekVer(pTask->exec.pWalReader, sversion);
    pTask->chkInfo.currentVer = sversion;

    if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING) {
      ret = 0;
      break;
    }

    // restore param
    if (streamRestoreParam(pTask) < 0) {
      break;
    }

    // set status normal
    tqDebug("s-task:%s blocking stage completed, set the status to be normal", pTask->id.idStr);
    if (streamSetStatusNormal(pTask) < 0) {
      break;
    }

    double elapsed = (taosGetTimestampMs() - startTs) / 1000.0;
    tqDebug("s-task:%s step2 recover finished, el:%.2fs", pTask->id.idStr, elapsed);

    // dispatch recover finish req to all related downstream task
    if (streamDispatchRecoverFinishReq(pTask) < 0) {
      break;
    }

    atomic_store_8(&pTask->fillHistory, 0);
    streamMetaSaveTask(pMeta, pTask);
    ret = 0;
  } while (0);

  streamMetaReleaseTask(pMeta, pTask);
  return ret;
}

L
Liu Jicong 已提交
1124 1125 1126
// Handle a recover-finish request: decode the body that follows the SMsgHead, look up the addressed task and pass
// the sender's child id to streamProcessRecoverFinishReq.
//
// Returns 0 on success and -1 when the task is not found or the stream layer reports a negative code.
int32_t tqProcessTaskRecoverFinishReq(STQ* pTq, SRpcMsg* pMsg) {
  SStreamMeta* pMeta = pTq->pStreamMeta;
  char*        body = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t      bodyLen = pMsg->contLen - sizeof(SMsgHead);

  // deserialize
  SStreamRecoverFinishReq req;
  SDecoder                decoder;
  tDecoderInit(&decoder, (uint8_t*)body, bodyLen);
  tDecodeSStreamRecoverFinishReq(&decoder, &req);
  tDecoderClear(&decoder);

  // find task
  SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.taskId);
  if (pTask == NULL) {
    return -1;
  }

  // do process request
  int32_t code = streamProcessRecoverFinishReq(pTask, req.childId);
  streamMetaReleaseTask(pMeta, pTask);

  return (code < 0) ? -1 : 0;
}
L
Liu Jicong 已提交
1150

L
Liu Jicong 已提交
1151 1152 1153 1154 1155
// Placeholder handler for recover-finish responses: neither argument is inspected and 0 is always returned.
int32_t tqProcessTaskRecoverFinishRsp(STQ* pTq, SRpcMsg* pMsg) {
  return 0;
}

1156 1157 1158 1159
// Build a STREAM_DELETE_DATA block from an encoded SDeleteRes and wrap it in a queue item.
//
// pData/len  encoded delete result
// ver        version stamped into the block info
// pRefBlock  out: set to NULL first; on success with at least one affected table it receives a newly allocated
//            SStreamRefDataBlock whose pBlock is the delete block
//
// Returns TSDB_CODE_SUCCESS (with *pRefBlock == NULL when no table or no row was affected) or
// TSDB_CODE_OUT_OF_MEMORY when the uid list or the queue item cannot be allocated.
//
// The block gets one row per uid: start/end key and uid are filled in, while the group id and the two calculate-ts
// columns are set to NULL.
int32_t extractDelDataBlock(const void* pData, int32_t len, int64_t ver, SStreamRefDataBlock** pRefBlock) {
  SDecoder*   pCoder = &(SDecoder){0};
  SDeleteRes* pRes = &(SDeleteRes){0};

  *pRefBlock = NULL;

  pRes->uidList = taosArrayInit(0, sizeof(tb_uid_t));
  if (pRes->uidList == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  tDecoderInit(pCoder, (uint8_t*)pData, len);
  tDecodeDeleteRes(pCoder, pRes);
  tDecoderClear(pCoder);

  int32_t numOfTables = taosArrayGetSize(pRes->uidList);
  if (numOfTables == 0 || pRes->affectedRows == 0) {
    taosArrayDestroy(pRes->uidList);
    return TSDB_CODE_SUCCESS;
  }

  // Allocate the queue item before the data block: if this fails there is no block to clean up yet.
  SStreamRefDataBlock* pItem = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0);
  if (pItem == NULL) {
    taosArrayDestroy(pRes->uidList);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  SSDataBlock* pDelBlock = createSpecialDataBlock(STREAM_DELETE_DATA);
  blockDataEnsureCapacity(pDelBlock, numOfTables);
  pDelBlock->info.rows = numOfTables;
  pDelBlock->info.version = ver;

  // the column handles do not depend on the row, so look them up once
  SColumnInfoData* pStartCol = taosArrayGet(pDelBlock->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pEndCol = taosArrayGet(pDelBlock->pDataBlock, END_TS_COLUMN_INDEX);
  SColumnInfoData* pUidCol = taosArrayGet(pDelBlock->pDataBlock, UID_COLUMN_INDEX);
  SColumnInfoData* pGroupCol = taosArrayGet(pDelBlock->pDataBlock, GROUPID_COLUMN_INDEX);
  SColumnInfoData* pCalStartCol = taosArrayGet(pDelBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
  SColumnInfoData* pCalEndCol = taosArrayGet(pDelBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);

  for (int32_t i = 0; i < numOfTables; i++) {
    // start key column
    colDataSetVal(pStartCol, i, (const char*)&pRes->skey, false);
    // end key column
    colDataSetVal(pEndCol, i, (const char*)&pRes->ekey, false);
    // uid column
    int64_t* pUid = taosArrayGet(pRes->uidList, i);
    colDataSetVal(pUidCol, i, (const char*)pUid, false);

    colDataSetNULL(pGroupCol, i);
    colDataSetNULL(pCalStartCol, i);
    colDataSetNULL(pCalEndCol, i);
  }

  taosArrayDestroy(pRes->uidList);

  pItem->type = STREAM_INPUT__REF_DATA_BLOCK;
  pItem->pBlock = pDelBlock;
  *pRefBlock = pItem;
  return TSDB_CODE_SUCCESS;
}

L
Liu Jicong 已提交
1209 1210
// Handle a "run" request for a stream task.
//
// A request carrying WAL_READ_TASKS_ID triggers a WAL scan via tqStreamTasksScanWal and returns 0. For any other
// id the task is looked up: a task in normal status is run through streamProcessRunReq, any other status is
// ignored (and a task that should pause is marked inactive). tqStartStreamTasks is called afterwards in both
// cases.
//
// Returns 0 when the request was handled and -1 when the task id is unknown.
int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
  SStreamTaskRunReq* pReq = pMsg->pCont;
  SStreamMeta*       pMeta = pTq->pStreamMeta;

  const int32_t taskId = pReq->taskId;
  const int32_t vgId = TD_VID(pTq->pVnode);

  if (taskId == WAL_READ_TASKS_ID) {  // all tasks are extracted submit data from the wal
    tqStreamTasksScanWal(pTq);
    return 0;
  }

  SStreamTask* pTask = streamMetaAcquireTask(pMeta, taskId);
  if (pTask == NULL) {
    tqError("vgId:%d failed to found s-task, taskId:%d", vgId, taskId);
    return -1;
  }

  if (pTask->status.taskStatus != TASK_STATUS__NORMAL) {
    if (streamTaskShouldPause(&pTask->status)) {
      atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
    }
    tqDebug("vgId:%d s-task:%s ignore run req since not in ready state", vgId, pTask->id.idStr);
  } else {
    tqDebug("vgId:%d s-task:%s start to process block from wal, last chk point:%" PRId64, vgId, pTask->id.idStr,
            pTask->chkInfo.version);
    streamProcessRunReq(pTask);
  }

  streamMetaReleaseTask(pMeta, pTask);
  tqStartStreamTasks(pTq);
  return 0;
}

L
Liu Jicong 已提交
1242
// Handle a dispatch request: decode the body that follows the SMsgHead and pass it to the addressed task through
// streamProcessDispatchMsg.
//
// Returns 0 when the task exists. When it does not, the decoded request is destroyed with
// tDeleteStreamDispatchReq and -1 is returned.
int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec) {
  char*   msgStr = pMsg->pCont;
  char*   msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
  int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);

  SStreamDispatchReq req = {0};

  SDecoder decoder;
  tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
  tDecodeStreamDispatchReq(&decoder, &req);
  // Every tDecoderInit needs its tDecoderClear; vnodeEnqueueStreamMsg likewise clears the decoder before it uses
  // the decoded request.
  tDecoderClear(&decoder);

  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.taskId);
  if (pTask) {
    SRpcMsg rsp = {.info = pMsg->info, .code = 0};
    streamProcessDispatchMsg(pTask, &req, &rsp, exec);
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
    return 0;
  } else {
    tDeleteStreamDispatchReq(&req);
    return -1;
  }
}

L
Liu Jicong 已提交
1265 1266
// Handle a dispatch response: the upstream task id is read (network byte order) from the SStreamDispatchRsp that
// follows the SMsgHead, and the response is forwarded to that task together with the rpc code.
//
// Returns 0 when the task exists, TSDB_CODE_INVALID_MSG otherwise.
int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) {
  SStreamMeta*        pMeta = pTq->pStreamMeta;
  SStreamDispatchRsp* pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  const int32_t       upstreamTaskId = ntohl(pRsp->upstreamTaskId);

  SStreamTask* pTask = streamMetaAcquireTask(pMeta, upstreamTaskId);
  if (pTask == NULL) {
    tqDebug("vgId:%d failed to handle the dispatch rsp, since find task:0x%x failed", pMeta->vgId, upstreamTaskId);
    return TSDB_CODE_INVALID_MSG;
  }

  streamProcessDispatchRsp(pTask, pRsp, pMsg->code);
  streamMetaReleaseTask(pMeta, pTask);
  return 0;
}
L
Liu Jicong 已提交
1280

1281
// Handle a drop request: `msg` is read as an SVDropStreamTaskReq and the task it names is removed from the stream
// meta. Always returns 0.
int32_t tqProcessTaskDropReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  SVDropStreamTaskReq* pDropReq = (SVDropStreamTaskReq*)msg;

  tqDebug("vgId:%d receive msg to drop stream task:0x%x", TD_VID(pTq->pVnode), pDropReq->taskId);
  streamMetaRemoveTask(pTq->pStreamMeta, pDropReq->taskId);

  return 0;
}
L
Liu Jicong 已提交
1288

5
54liuyao 已提交
1289 1290
// Handle a pause request: remember the task's current status in keepTaskStatus and switch it to
// TASK_STATUS__PAUSE.
//
// A task that is already paused is left untouched. Saving its status again would store TASK_STATUS__PAUSE in
// keepTaskStatus, and tqProcessTaskResumeReq would then "resume" the task straight back into the paused state.
//
// Always returns 0, including when the task id is unknown.
int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  SVPauseStreamTaskReq* pReq = (SVPauseStreamTaskReq*)msg;
  SStreamTask*          pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->taskId);
  if (pTask) {
    int8_t curStatus = atomic_load_8(&pTask->status.taskStatus);
    if (curStatus != TASK_STATUS__PAUSE) {
      tqDebug("vgId:%d s-task:%s set pause flag", pTq->pStreamMeta->vgId, pTask->id.idStr);
      atomic_store_8(&pTask->status.keepTaskStatus, curStatus);
      atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__PAUSE);
    }
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
  }
  return 0;
}

// Handle a resume request: restore the task status saved in keepTaskStatus and get the task running again.
//
// When the request sets igUntreated on a source-level task, its WAL reader is told to skip to `sversion`.
// A source-level task with an empty input queue is restarted through tqStartStreamTasks; any other task is
// scheduled directly with streamSchedExec.
//
// Always returns 0; an unknown task id is only logged.
int32_t tqProcessTaskResumeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  SVResumeStreamTaskReq* pReq = (SVResumeStreamTaskReq*)msg;
  SStreamMeta*           pMeta = pTq->pStreamMeta;
  const int32_t          vgId = pMeta->vgId;

  SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->taskId);
  if (pTask == NULL) {
    tqError("vgId:%d failed to find the s-task:0x%x for resume stream task", vgId, pReq->taskId);
    return 0;
  }

  atomic_store_8(&pTask->status.taskStatus, pTask->status.keepTaskStatus);

  // no lock needs to secure the access of the version
  if (pReq->igUntreated && pTask->taskLevel == TASK_LEVEL__SOURCE) {
    // discard all the data  when the stream task is suspended.
    walReaderSetSkipToVersion(pTask->exec.pWalReader, sversion);
    tqDebug("vgId:%d s-task:%s resume to exec, prev paused version:%" PRId64 ", start from vnode ver:%" PRId64
            ", schedStatus:%d",
            vgId, pTask->id.idStr, pTask->chkInfo.currentVer, sversion, pTask->status.schedStatus);
  } else {  // from the previous paused version and go on
    tqDebug("vgId:%d s-task:%s resume to exec, from paused ver:%" PRId64 ", vnode ver:%" PRId64 ", schedStatus:%d",
            vgId, pTask->id.idStr, pTask->chkInfo.currentVer, sversion, pTask->status.schedStatus);
  }

  const bool idleSource =
      (pTask->taskLevel == TASK_LEVEL__SOURCE) && (taosQueueItemSize(pTask->inputQueue->queue) == 0);
  if (idleSource) {
    tqStartStreamTasks(pTq);
  } else {
    streamSchedExec(pTask);
  }

  streamMetaReleaseTask(pMeta, pTask);
  return 0;
}

L
Liu Jicong 已提交
1334 1335 1336 1337 1338 1339
// Handle a retrieve request: decode the body that follows the SMsgHead and pass it to the destination task via
// streamProcessRetrieveReq. The decoded request is destroyed before returning in every case.
//
// Returns 0 when the destination task exists and -1 otherwise.
int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg) {
  SStreamMeta* pMeta = pTq->pStreamMeta;
  char*        body = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t      bodyLen = pMsg->contLen - sizeof(SMsgHead);

  SStreamRetrieveReq req;
  SDecoder           decoder;
  tDecoderInit(&decoder, (uint8_t*)body, bodyLen);
  tDecodeStreamRetrieveReq(&decoder, &req);
  tDecoderClear(&decoder);

  int32_t      ret = -1;
  SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.dstTaskId);
  if (pTask != NULL) {
    SRpcMsg rsp = {.info = pMsg->info, .code = 0};
    streamProcessRetrieveReq(pTask, &req, &rsp);
    streamMetaReleaseTask(pMeta, pTask);
    ret = 0;
  }

  tDeleteStreamRetrieveReq(&req);
  return ret;
}

// Placeholder handler for retrieve responses: neither argument is inspected and 0 is always returned.
int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg) {
  return 0;
}
L
Liu Jicong 已提交
1361

1362 1363 1364 1365 1366 1367
// Enqueue a dispatch request into the addressed stream task without executing it.
//
// On success the request is handed to streamProcessDispatchMsg (exec == false), the message content and the queue
// item are freed and 0 is returned.
//
// On failure (decode error or unknown task) -1 is returned. If the message carries an rpc handle, an error
// response is sent first (TSDB_CODE_MSG_DECODE_ERROR or TSDB_CODE_STREAM_TASK_NOT_EXIST, or
// TSDB_CODE_OUT_OF_MEMORY when the response body cannot be allocated) and the message content and queue item are
// freed. Without an rpc handle the function returns immediately and frees nothing.
int32_t vnodeEnqueueStreamMsg(SVnode* pVnode, SRpcMsg* pMsg) {
  STQ*      pTq = pVnode->pTq;
  SMsgHead* msgStr = pMsg->pCont;
  char*     msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
  int32_t   msgLen = pMsg->contLen - sizeof(SMsgHead);
  int32_t   code = 0;

  // Zero-initialised: the FAIL path below reads ids out of req even when decoding stopped early.
  SStreamDispatchReq req = {0};
  SDecoder           decoder;
  tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
  if (tDecodeStreamDispatchReq(&decoder, &req) < 0) {
    code = TSDB_CODE_MSG_DECODE_ERROR;
    tDecoderClear(&decoder);
    goto FAIL;
  }
  tDecoderClear(&decoder);

  int32_t taskId = req.taskId;

  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);
  if (pTask) {
    SRpcMsg rsp = {.info = pMsg->info, .code = 0};
    streamProcessDispatchMsg(pTask, &req, &rsp, false);
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
    rpcFreeCont(pMsg->pCont);
    taosFreeQitem(pMsg);
    return 0;
  } else {
    tDeleteStreamDispatchReq(&req);
  }

  code = TSDB_CODE_STREAM_TASK_NOT_EXIST;

FAIL:
  // nobody to answer to
  if (pMsg->info.handle == NULL) return -1;

  SMsgHead* pRspHead = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamDispatchRsp));
  if (pRspHead == NULL) {
    // no room for a full response body: answer with a bare out-of-memory code
    SRpcMsg rsp = {.code = TSDB_CODE_OUT_OF_MEMORY, .info = pMsg->info};
    tqDebug("send dispatch error rsp, code: %x", code);
    tmsgSendRsp(&rsp);
    rpcFreeCont(pMsg->pCont);
    taosFreeQitem(pMsg);
    return -1;
  }

  // the response fields are written in network byte order
  pRspHead->vgId = htonl(req.upstreamNodeId);
  SStreamDispatchRsp* pRsp = POINTER_SHIFT(pRspHead, sizeof(SMsgHead));
  pRsp->streamId = htobe64(req.streamId);
  pRsp->upstreamTaskId = htonl(req.upstreamTaskId);
  pRsp->upstreamNodeId = htonl(req.upstreamNodeId);
  pRsp->downstreamNodeId = htonl(pVnode->config.vgId);
  pRsp->downstreamTaskId = htonl(req.taskId);
  pRsp->inputStatus = TASK_OUTPUT_STATUS__NORMAL;

  SRpcMsg rsp = {
      .code = code, .info = pMsg->info, .contLen = sizeof(SMsgHead) + sizeof(SStreamDispatchRsp), .pCont = pRspHead};
  tqDebug("send dispatch error rsp, code: %x", code);
  tmsgSendRsp(&rsp);
  rpcFreeCont(pMsg->pCont);
  taosFreeQitem(pMsg);
  return -1;
}
L
Liu Jicong 已提交
1425

1426
// Returns 1 when `sversion` is not greater than pTq->walLogLastVer, and 0 otherwise.
int32_t tqCheckLogInWal(STQ* pTq, int64_t sversion) {
  if (sversion <= pTq->walLogLastVer) {
    return 1;
  }
  return 0;
}
1427

1428
// Ask the stream queue to start a WAL scan for the stream tasks of this vnode.
//
// Under the stream-meta write latch walScanCounter is incremented. Only the call that raises it to 1 posts a
// TDMT_STREAM_TASK_RUN message carrying WAL_READ_TASKS_ID; later calls just leave the counter raised and return.
//
// Returns 0 when there is nothing to do, a scan is already pending, or the message was queued. Returns -1 (terrno
// = TSDB_CODE_OUT_OF_MEMORY) when the message cannot be allocated.
int32_t tqStartStreamTasks(STQ* pTq) {
  int32_t      vgId = TD_VID(pTq->pVnode);
  SStreamMeta* pMeta = pTq->pStreamMeta;

  taosWLockLatch(&pMeta->lock);

  int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
  if (numOfTasks == 0) {
    tqInfo("vgId:%d no stream tasks exist", vgId);
    taosWUnLockLatch(&pMeta->lock);
    return 0;
  }

  pMeta->walScanCounter += 1;

  if (pMeta->walScanCounter > 1) {
    tqDebug("vgId:%d wal read task has been launched, remain scan times:%d", vgId, pMeta->walScanCounter);
    taosWUnLockLatch(&pMeta->lock);
    return 0;
  }

  SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq));
  if (pRunReq == NULL) {
    // No scan message was posted, so undo the increment above. Otherwise every later call would see a counter
    // above 1 and assume a scan is already on its way.
    pMeta->walScanCounter -= 1;

    terrno = TSDB_CODE_OUT_OF_MEMORY;
    tqError("vgId:%d failed to create msg to start wal scanning to launch stream tasks, code:%s", vgId, terrstr());
    taosWUnLockLatch(&pMeta->lock);
    return -1;
  }

  tqDebug("vgId:%d create msg to start wal scan to launch stream tasks, numOfTasks:%d", vgId, numOfTasks);
  pRunReq->head.vgId = vgId;
  pRunReq->streamId = 0;
  pRunReq->taskId = WAL_READ_TASKS_ID;

  SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)};
  tmsgPutToQueue(&pTq->pVnode->msgCb, STREAM_QUEUE, &msg);
  taosWUnLockLatch(&pMeta->lock);

  return 0;
}