tq.c 46.2 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
S
Shengliang Guan 已提交
14 15
 */

H
Hongze Cheng 已提交
16
#include "tq.h"
S
Shengliang Guan 已提交
17

dengyihao's avatar
dengyihao 已提交
18 19 20
// 0: not init
// 1: already inited
// 2: wait to be inited or cleaup
21
#define WAL_READ_TASKS_ID       (-1)
22

23 24
static int32_t tqInitialize(STQ* pTq);

L
Liu Jicong 已提交
25
int32_t tqInit() {
L
Liu Jicong 已提交
26 27 28 29 30 31
  int8_t old;
  while (1) {
    old = atomic_val_compare_exchange_8(&tqMgmt.inited, 0, 2);
    if (old != 2) break;
  }

32 33 34 35 36 37
  if (old == 0) {
    tqMgmt.timer = taosTmrInit(10000, 100, 10000, "TQ");
    if (tqMgmt.timer == NULL) {
      atomic_store_8(&tqMgmt.inited, 0);
      return -1;
    }
38 39 40
    if (streamInit() < 0) {
      return -1;
    }
L
Liu Jicong 已提交
41
    atomic_store_8(&tqMgmt.inited, 1);
42
  }
43

L
Liu Jicong 已提交
44 45
  return 0;
}
L
Liu Jicong 已提交
46

47
void tqCleanUp() {
L
Liu Jicong 已提交
48 49 50 51 52 53 54 55
  int8_t old;
  while (1) {
    old = atomic_val_compare_exchange_8(&tqMgmt.inited, 1, 2);
    if (old != 2) break;
  }

  if (old == 1) {
    taosTmrCleanUp(tqMgmt.timer);
L
Liu Jicong 已提交
56
    streamCleanUp();
L
Liu Jicong 已提交
57 58
    atomic_store_8(&tqMgmt.inited, 0);
  }
59
}
L
Liu Jicong 已提交
60

61
static void destroyTqHandle(void* data) {
62 63 64
  STqHandle* pData = (STqHandle*)data;
  qDestroyTask(pData->execHandle.task);
  if (pData->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
L
Liu Jicong 已提交
65
    taosMemoryFreeClear(pData->execHandle.execCol.qmsg);
66
  } else if (pData->execHandle.subType == TOPIC_SUB_TYPE__DB) {
67
    tqCloseReader(pData->execHandle.pTqReader);
68 69
    walCloseReader(pData->pWalReader);
    taosHashCleanup(pData->execHandle.execDb.pFilterOutTbUid);
L
Liu Jicong 已提交
70
  } else if (pData->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
71
    walCloseReader(pData->pWalReader);
72
    tqCloseReader(pData->execHandle.pTqReader);
73 74 75
  }
}

L
Liu Jicong 已提交
76
static void tqPushEntryFree(void* data) {
L
Liu Jicong 已提交
77
  STqPushEntry* p = *(void**)data;
H
Haojun Liao 已提交
78
  if (p->pDataRsp->head.mqMsgType == TMQ_MSG_TYPE__POLL_RSP) {
79
    tDeleteMqDataRsp(p->pDataRsp);
H
Haojun Liao 已提交
80 81 82 83 84
  } else if (p->pDataRsp->head.mqMsgType == TMQ_MSG_TYPE__TAOSX_RSP) {
    tDeleteSTaosxRsp((STaosxRsp*)p->pDataRsp);
  }

  taosMemoryFree(p->pDataRsp);
L
Liu Jicong 已提交
85 86 87
  taosMemoryFree(p);
}

88 89 90 91 92
static bool tqOffsetLessOrEqual(const STqOffset* pLeft, const STqOffset* pRight) {
  return pLeft->val.type == TMQ_OFFSET__LOG && pRight->val.type == TMQ_OFFSET__LOG &&
         pLeft->val.version <= pRight->val.version;
}

L
Liu Jicong 已提交
93
STQ* tqOpen(const char* path, SVnode* pVnode) {
94
  STQ* pTq = taosMemoryCalloc(1, sizeof(STQ));
L
Liu Jicong 已提交
95
  if (pTq == NULL) {
S
Shengliang Guan 已提交
96
    terrno = TSDB_CODE_OUT_OF_MEMORY;
L
Liu Jicong 已提交
97 98
    return NULL;
  }
99

100
  pTq->path = taosStrdup(path);
L
Liu Jicong 已提交
101
  pTq->pVnode = pVnode;
L
Liu Jicong 已提交
102
  pTq->walLogLastVer = pVnode->pWal->vers.lastVer;
103

104
  pTq->pHandle = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);
105
  taosHashSetFreeFp(pTq->pHandle, destroyTqHandle);
106

107
  taosInitRWLatch(&pTq->lock);
L
Liu Jicong 已提交
108
  pTq->pPushMgr = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
L
Liu Jicong 已提交
109
  taosHashSetFreeFp(pTq->pPushMgr, tqPushEntryFree);
L
Liu Jicong 已提交
110

111
  pTq->pCheckInfo = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);
L
Liu Jicong 已提交
112
  taosHashSetFreeFp(pTq->pCheckInfo, (FDelete)tDeleteSTqCheckInfo);
L
Liu Jicong 已提交
113

114
  tqInitialize(pTq);
115 116 117 118
  return pTq;
}

int32_t tqInitialize(STQ* pTq) {
L
Liu Jicong 已提交
119
  if (tqMetaOpen(pTq) < 0) {
120
    return -1;
121 122
  }

L
Liu Jicong 已提交
123 124
  pTq->pOffsetStore = tqOffsetOpen(pTq);
  if (pTq->pOffsetStore == NULL) {
125
    return -1;
126 127
  }

128
  pTq->pStreamMeta = streamMetaOpen(pTq->path, pTq, (FTaskExpand*)tqExpandTask, pTq->pVnode->config.vgId);
L
Liu Jicong 已提交
129
  if (pTq->pStreamMeta == NULL) {
130
    return -1;
L
Liu Jicong 已提交
131 132
  }

133 134
  // the version is kept in task's meta data
  // todo check if this version is required or not
135 136
  if (streamLoadTasks(pTq->pStreamMeta, walGetCommittedVer(pTq->pVnode->pWal)) < 0) {
    return -1;
L
Liu Jicong 已提交
137 138
  }

139
  return 0;
L
Liu Jicong 已提交
140
}
L
Liu Jicong 已提交
141

L
Liu Jicong 已提交
142
void tqClose(STQ* pTq) {
143 144
  if (pTq == NULL) {
    return;
H
Hongze Cheng 已提交
145
  }
146 147 148 149 150 151 152 153 154

  tqOffsetClose(pTq->pOffsetStore);
  taosHashCleanup(pTq->pHandle);
  taosHashCleanup(pTq->pPushMgr);
  taosHashCleanup(pTq->pCheckInfo);
  taosMemoryFree(pTq->path);
  tqMetaClose(pTq);
  streamMetaClose(pTq->pStreamMeta);
  taosMemoryFree(pTq);
L
Liu Jicong 已提交
155
}
L
Liu Jicong 已提交
156

157
int32_t tqPushDataRsp(STqPushEntry* pPushEntry, int32_t vgId) {
H
Haojun Liao 已提交
158 159
  SMqDataRsp* pRsp = pPushEntry->pDataRsp;
  SMqRspHead* pHeader = &pPushEntry->pDataRsp->head;
160 161 162 163 164

  int64_t sver = 0, ever = 0;
  walReaderValidVersionRange(pPushEntry->pHandle->execHandle.pTqReader->pWalReader, &sver, &ever);

  tqDoSendDataRsp(&pPushEntry->info, pRsp, pHeader->epoch, pHeader->consumerId, pHeader->mqMsgType, sver, ever);
L
Liu Jicong 已提交
165

wmmhello's avatar
wmmhello 已提交
166 167
  char buf1[80] = {0};
  char buf2[80] = {0};
H
Haojun Liao 已提交
168 169 170
  tFormatOffset(buf1, tListLen(buf1), &pRsp->reqOffset);
  tFormatOffset(buf2, tListLen(buf2), &pRsp->rspOffset);
  tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) push rsp, block num: %d, req:%s, rsp:%s",
171
          vgId, pRsp->head.consumerId, pRsp->head.epoch, pRsp->blockNum, buf1, buf2);
L
Liu Jicong 已提交
172 173 174
  return 0;
}

175 176 177 178 179 180
int32_t tqSendDataRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp,
                      int32_t type, int32_t vgId) {
  int64_t sver = 0, ever = 0;
  walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever);

  tqDoSendDataRsp(&pMsg->info, pRsp, pReq->epoch, pReq->consumerId, type, sver, ever);
181 182 183 184 185 186

  char buf1[80] = {0};
  char buf2[80] = {0};
  tFormatOffset(buf1, 80, &pRsp->reqOffset);
  tFormatOffset(buf2, 80, &pRsp->rspOffset);

X
Xiaoyu Wang 已提交
187
  tqDebug("vgId:%d consumer:0x%" PRIx64 " (epoch %d) send rsp, block num:%d, req:%s, rsp:%s, reqId:0x%" PRIx64,
188
          vgId, pReq->consumerId, pReq->epoch, pRsp->blockNum, buf1, buf2, pReq->reqId);
H
Haojun Liao 已提交
189

190 191 192
  return 0;
}

193
int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
194 195
  SMqVgOffset vgOffset = {0};
  int32_t     vgId = TD_VID(pTq->pVnode);
196

X
Xiaoyu Wang 已提交
197 198
  SDecoder decoder;
  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
199
  if (tDecodeMqVgOffset(&decoder, &vgOffset) < 0) {
200 201
    return -1;
  }
202

203 204
  tDecoderClear(&decoder);

205 206 207
  STqOffset* pOffset = &vgOffset.offset;

  if (pOffset->val.type == TMQ_OFFSET__SNAPSHOT_DATA || pOffset->val.type == TMQ_OFFSET__SNAPSHOT_META) {
L
Liu Jicong 已提交
208
    tqDebug("receive offset commit msg to %s on vgId:%d, offset(type:snapshot) uid:%" PRId64 ", ts:%" PRId64,
209 210 211 212 213 214
            pOffset->subKey, vgId, pOffset->val.uid, pOffset->val.ts);
  } else if (pOffset->val.type == TMQ_OFFSET__LOG) {
    tqDebug("receive offset commit msg to %s on vgId:%d, offset(type:log) version:%" PRId64, pOffset->subKey, vgId,
            pOffset->val.version);
    if (pOffset->val.version + 1 == sversion) {
      pOffset->val.version += 1;
215
    }
216
  } else {
217
    tqError("invalid commit offset type:%d", pOffset->val.type);
218
    return -1;
219
  }
220

221 222
  STqOffset* pSavedOffset = tqOffsetRead(pTq->pOffsetStore, pOffset->subKey);
  if (pSavedOffset != NULL && tqOffsetLessOrEqual(pOffset, pSavedOffset)) {
223
    tqDebug("not update the offset, vgId:%d sub:%s since committed:%" PRId64 " less than/equal to existed:%" PRId64,
224
            vgId, pOffset->subKey, pOffset->val.version, pSavedOffset->val.version);
225
    return 0;  // no need to update the offset value
226 227
  }

228
  // save the new offset value
229
  if (tqOffsetWrite(pTq->pOffsetStore, pOffset) < 0) {
230
    return -1;
231
  }
232

233 234 235
  if (pOffset->val.type == TMQ_OFFSET__LOG) {
    STqHandle* pHandle = taosHashGet(pTq->pHandle, pOffset->subKey, strlen(pOffset->subKey));
    if (pHandle && (walRefVer(pHandle->pRef, pOffset->val.version) < 0)) {
236
      return -1;
237 238 239
    }
  }

240 241 242
  return 0;
}

243
int32_t tqProcessSeekReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
244 245
  SMqVgOffset vgOffset = {0};
  int32_t     vgId = TD_VID(pTq->pVnode);
246 247 248

  SDecoder decoder;
  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
249
  if (tDecodeMqVgOffset(&decoder, &vgOffset) < 0) {
250 251 252 253 254
    return -1;
  }

  tDecoderClear(&decoder);

255 256 257
  STqOffset* pOffset = &vgOffset.offset;
  if (pOffset->val.type != TMQ_OFFSET__LOG) {
    tqError("vgId:%d, subKey:%s invalid seek offset type:%d", vgId, pOffset->subKey, pOffset->val.type);
258 259 260
    return -1;
  }

261 262 263 264 265 266
  STqHandle* pHandle = taosHashGet(pTq->pHandle, pOffset->subKey, strlen(pOffset->subKey));
  if (pHandle == NULL) {
    tqError("tmq seek: consumer:0x%" PRIx64 " vgId:%d subkey %s not found", vgOffset.consumerId, vgId,
        pOffset->subKey);
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
267 268
  }

269 270 271 272 273 274 275 276
  // 2. check consumer-vg assignment status
  taosRLockLatch(&pTq->lock);
  if (pHandle->consumerId != vgOffset.consumerId) {
    tqDebug("ERROR tmq seek: consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64,
            vgOffset.consumerId, vgId, pOffset->subKey, pHandle->consumerId);
    terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
    taosRUnLockLatch(&pTq->lock);
    return -1;
277
  }
278 279 280 281 282 283 284 285 286
  taosRUnLockLatch(&pTq->lock);

  //3. check the offset info
  STqOffset* pSavedOffset = tqOffsetRead(pTq->pOffsetStore, pOffset->subKey);
  if (pSavedOffset != NULL) {
    if (pSavedOffset->val.type != TMQ_OFFSET__LOG) {
      tqError("invalid saved offset type, vgId:%d sub:%s", vgId, pOffset->subKey);
      return 0;  // no need to update the offset value
    }
287

288 289 290 291 292 293
    if (pSavedOffset->val.version == pOffset->val.version) {
      tqDebug("vgId:%d subKey:%s no need to seek to %" PRId64 " prev offset:%" PRId64, vgId, pOffset->subKey,
              pOffset->val.version, pSavedOffset->val.version);
      return 0;
    }
  }
294 295 296

  int64_t sver = 0, ever = 0;
  walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever);
297 298 299 300
  if (pOffset->val.version < sver) {
    pOffset->val.version = sver;
  } else if (pOffset->val.version > ever) {
    pOffset->val.version = ever;
301 302 303
  }

  // save the new offset value
304
  tqDebug("vgId:%d sub:%s seek to %" PRId64 " prev offset:%" PRId64, vgId, pOffset->subKey, pOffset->val.version,
305 306
          pSavedOffset->val.version);

307 308
  if (tqOffsetWrite(pTq->pOffsetStore, pOffset) < 0) {
    tqError("failed to save offset, vgId:%d sub:%s seek to %" PRId64, vgId, pOffset->subKey, pOffset->val.version);
309 310 311 312 313 314
    return -1;
  }

  return 0;
}

L
Liu Jicong 已提交
315
int32_t tqCheckColModifiable(STQ* pTq, int64_t tbUid, int32_t colId) {
L
Liu Jicong 已提交
316
  void* pIter = NULL;
317

L
Liu Jicong 已提交
318
  while (1) {
319
    pIter = taosHashIterate(pTq->pCheckInfo, pIter);
320 321 322 323
    if (pIter == NULL) {
      break;
    }

324
    STqCheckInfo* pCheck = (STqCheckInfo*)pIter;
325

L
Liu Jicong 已提交
326 327
    if (pCheck->ntbUid == tbUid) {
      int32_t sz = taosArrayGetSize(pCheck->colIdList);
L
Liu Jicong 已提交
328
      for (int32_t i = 0; i < sz; i++) {
L
Liu Jicong 已提交
329 330
        int16_t forbidColId = *(int16_t*)taosArrayGet(pCheck->colIdList, i);
        if (forbidColId == colId) {
331
          taosHashCancelIterate(pTq->pCheckInfo, pIter);
L
Liu Jicong 已提交
332 333 334 335 336
          return -1;
        }
      }
    }
  }
337

L
Liu Jicong 已提交
338 339 340
  return 0;
}

341
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
X
Xiaoyu Wang 已提交
342
  SMqPollReq req = {0};
343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361
  if (tDeserializeSMqPollReq(pMsg->pCont, pMsg->contLen, &req) < 0) {
    tqError("tDeserializeSMqPollReq %d failed", pMsg->contLen);
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  int64_t      consumerId = req.consumerId;
  int32_t      reqEpoch = req.epoch;
  STqOffsetVal reqOffset = req.reqOffset;
  int32_t      vgId = TD_VID(pTq->pVnode);

  // 1. find handle
  STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
  if (pHandle == NULL) {
    tqError("tmq poll: consumer:0x%" PRIx64 " vgId:%d subkey %s not found", consumerId, vgId, req.subKey);
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

362
  // 2. check re-balance status
363
  taosRLockLatch(&pTq->lock);
364 365 366 367
  if (pHandle->consumerId != consumerId) {
    tqDebug("ERROR tmq poll: consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64,
            consumerId, TD_VID(pTq->pVnode), req.subKey, pHandle->consumerId);
    terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
368
    taosRUnLockLatch(&pTq->lock);
369 370
    return -1;
  }
371
  taosRUnLockLatch(&pTq->lock);
372

373
  // 3. update the epoch value
374
  taosWLockLatch(&pTq->lock);
H
Haojun Liao 已提交
375 376
  int32_t savedEpoch = pHandle->epoch;
  if (savedEpoch < reqEpoch) {
X
Xiaoyu Wang 已提交
377 378
    tqDebug("tmq poll: consumer:0x%" PRIx64 " epoch update from %d to %d by poll req", consumerId, savedEpoch,
            reqEpoch);
379
    pHandle->epoch = reqEpoch;
H
Haojun Liao 已提交
380
  }
381
  taosWUnLockLatch(&pTq->lock);
382 383 384

  char buf[80];
  tFormatOffset(buf, 80, &reqOffset);
H
Haojun Liao 已提交
385 386
  tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey %s, recv poll req vgId:%d, req:%s, reqId:0x%" PRIx64,
          consumerId, req.epoch, pHandle->subKey, vgId, buf, req.reqId);
387

388
  return tqExtractDataForMq(pTq, pHandle, &req, pMsg);
389 390
}

391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465
int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) {
  SMqPollReq req = {0};
  if (tDeserializeSMqPollReq(pMsg->pCont, pMsg->contLen, &req) < 0) {
    tqError("tDeserializeSMqPollReq %d failed", pMsg->contLen);
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  int64_t      consumerId = req.consumerId;
  STqOffsetVal reqOffset = req.reqOffset;
  int32_t      vgId = TD_VID(pTq->pVnode);

  // 1. find handle
  STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
  if (pHandle == NULL) {
    tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s not found", consumerId, vgId, req.subKey);
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  // 2. check re-balance status
  taosRLockLatch(&pTq->lock);
  if (pHandle->consumerId != consumerId) {
    tqDebug("ERROR consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64,
            consumerId, vgId, req.subKey, pHandle->consumerId);
    terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
    taosRUnLockLatch(&pTq->lock);
    return -1;
  }
  taosRUnLockLatch(&pTq->lock);

  int64_t sver = 0, ever = 0;
  walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever);

  SMqDataRsp dataRsp = {0};
  tqInitDataRsp(&dataRsp, &req);

  STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, req.subKey);
  if (pOffset != NULL) {
    if (pOffset->val.type != TMQ_OFFSET__LOG) {
      tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s use snapshot, no valid wal info", consumerId, vgId, req.subKey);
      terrno = TSDB_CODE_INVALID_PARA;
      tDeleteMqDataRsp(&dataRsp);
      return -1;
    }

    dataRsp.rspOffset.type = TMQ_OFFSET__LOG;
    dataRsp.rspOffset.version = pOffset->val.version;
  } else {
    if (req.useSnapshot == true) {
      tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s snapshot not support wal info", consumerId, vgId, req.subKey);
      terrno = TSDB_CODE_INVALID_PARA;
      tDeleteMqDataRsp(&dataRsp);
      return -1;
    }

    dataRsp.rspOffset.type = TMQ_OFFSET__LOG;

    if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEAST) {
      dataRsp.rspOffset.version = sver;
    } else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) {
      dataRsp.rspOffset.version = ever;
    } else {
      tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s invalid offset type:%d", consumerId, vgId, req.subKey,
              reqOffset.type);
      terrno = TSDB_CODE_INVALID_PARA;
      tDeleteMqDataRsp(&dataRsp);
      return -1;
    }
  }

  tqDoSendDataRsp(&pMsg->info, &dataRsp, req.epoch, req.consumerId, TMQ_MSG_TYPE__WALINFO_RSP, sver, ever);
  return 0;
}

466
int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
L
Liu Jicong 已提交
467
  SMqVDeleteReq* pReq = (SMqVDeleteReq*)msg;
L
Liu Jicong 已提交
468

L
Liu Jicong 已提交
469
  tqDebug("vgId:%d, tq process delete sub req %s", pTq->pVnode->config.vgId, pReq->subKey);
L
Liu Jicong 已提交
470

471
  taosWLockLatch(&pTq->lock);
L
Liu Jicong 已提交
472 473 474 475
  int32_t code = taosHashRemove(pTq->pPushMgr, pReq->subKey, strlen(pReq->subKey));
  if (code != 0) {
    tqDebug("vgId:%d, tq remove push handle %s", pTq->pVnode->config.vgId, pReq->subKey);
  }
476
  taosWUnLockLatch(&pTq->lock);
L
Liu Jicong 已提交
477

L
Liu Jicong 已提交
478 479
  STqHandle* pHandle = taosHashGet(pTq->pHandle, pReq->subKey, strlen(pReq->subKey));
  if (pHandle) {
X
Xiaoyu Wang 已提交
480
    // walCloseRef(pHandle->pWalReader->pWal, pHandle->pRef->refId);
L
Liu Jicong 已提交
481 482 483 484 485 486 487
    if (pHandle->pRef) {
      walCloseRef(pTq->pVnode->pWal, pHandle->pRef->refId);
    }
    code = taosHashRemove(pTq->pHandle, pReq->subKey, strlen(pReq->subKey));
    if (code != 0) {
      tqError("cannot process tq delete req %s, since no such handle", pReq->subKey);
    }
L
Liu Jicong 已提交
488
  }
489

L
Liu Jicong 已提交
490 491
  code = tqOffsetDelete(pTq->pOffsetStore, pReq->subKey);
  if (code != 0) {
492
    tqError("cannot process tq delete req %s, since no such offset in cache", pReq->subKey);
L
Liu Jicong 已提交
493
  }
L
Liu Jicong 已提交
494

L
Liu Jicong 已提交
495
  if (tqMetaDeleteHandle(pTq, pReq->subKey) < 0) {
L
Liu Jicong 已提交
496
    tqError("cannot process tq delete req %s, since no such offset in tdb", pReq->subKey);
497
  }
L
Liu Jicong 已提交
498
  return 0;
L
Liu Jicong 已提交
499 500
}

501
int32_t tqProcessAddCheckInfoReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
502 503
  STqCheckInfo info = {0};
  SDecoder     decoder;
X
Xiaoyu Wang 已提交
504
  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
505
  if (tDecodeSTqCheckInfo(&decoder, &info) < 0) {
L
Liu Jicong 已提交
506 507 508 509
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  tDecoderClear(&decoder);
510 511 512 513 514
  if (taosHashPut(pTq->pCheckInfo, info.topic, strlen(info.topic), &info, sizeof(STqCheckInfo)) < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  if (tqMetaSaveCheckInfo(pTq, info.topic, msg, msgLen) < 0) {
L
Liu Jicong 已提交
515 516 517 518 519 520
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  return 0;
}

521
int32_t tqProcessDelCheckInfoReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
522 523 524 525 526 527 528 529 530 531 532
  if (taosHashRemove(pTq->pCheckInfo, msg, strlen(msg)) < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  if (tqMetaDeleteCheckInfo(pTq, msg) < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  return 0;
}

533
int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
L
Liu Jicong 已提交
534
  SMqRebVgReq req = {0};
L
Liu Jicong 已提交
535
  tDecodeSMqRebVgReq(msg, &req);
L
Liu Jicong 已提交
536

537 538 539
  SVnode* pVnode = pTq->pVnode;
  int32_t vgId = TD_VID(pVnode);

540
  tqDebug("vgId:%d, tq process sub req:%s, Id:0x%" PRIx64 " -> Id:0x%" PRIx64, pVnode->config.vgId, req.subKey,
541
          req.oldConsumerId, req.newConsumerId);
L
Liu Jicong 已提交
542

543
  STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
L
Liu Jicong 已提交
544
  if (pHandle == NULL) {
L
Liu Jicong 已提交
545
    if (req.oldConsumerId != -1) {
546
      tqError("vgId:%d, build new consumer handle %s for consumer:0x%" PRIx64 ", but old consumerId:0x%" PRIx64,
547
              req.vgId, req.subKey, req.newConsumerId, req.oldConsumerId);
L
Liu Jicong 已提交
548
    }
549

L
Liu Jicong 已提交
550
    if (req.newConsumerId == -1) {
551
      tqError("vgId:%d, tq invalid re-balance request, new consumerId %" PRId64 "", req.vgId, req.newConsumerId);
L
Liu Jicong 已提交
552
      taosMemoryFree(req.qmsg);
L
Liu Jicong 已提交
553 554
      return 0;
    }
555

L
Liu Jicong 已提交
556 557
    STqHandle tqHandle = {0};
    pHandle = &tqHandle;
L
Liu Jicong 已提交
558

H
Haojun Liao 已提交
559
    uint64_t oldConsumerId = pHandle->consumerId;
L
Liu Jicong 已提交
560 561 562
    memcpy(pHandle->subKey, req.subKey, TSDB_SUBSCRIBE_KEY_LEN);
    pHandle->consumerId = req.newConsumerId;
    pHandle->epoch = -1;
L
Liu Jicong 已提交
563

L
Liu Jicong 已提交
564
    pHandle->execHandle.subType = req.subType;
L
Liu Jicong 已提交
565
    pHandle->fetchMeta = req.withMeta;
wmmhello's avatar
wmmhello 已提交
566

567
    // TODO version should be assigned and refed during preprocess
568
    SWalRef* pRef = walRefCommittedVer(pVnode->pWal);
569
    if (pRef == NULL) {
H
Haojun Liao 已提交
570
      taosMemoryFree(req.qmsg);
L
Liu Jicong 已提交
571
      return -1;
572
    }
H
Haojun Liao 已提交
573

574 575
    int64_t ver = pRef->refVer;
    pHandle->pRef = pRef;
L
Liu Jicong 已提交
576

577
    SReadHandle handle = {
578
        .meta = pVnode->pMeta, .vnode = pVnode, .initTableReader = true, .initTqReader = true, .version = ver};
wmmhello's avatar
wmmhello 已提交
579
    pHandle->snapshotVer = ver;
580

L
Liu Jicong 已提交
581
    if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
L
Liu Jicong 已提交
582
      pHandle->execHandle.execCol.qmsg = req.qmsg;
L
Liu Jicong 已提交
583
      req.qmsg = NULL;
584

X
Xiaoyu Wang 已提交
585 586
      pHandle->execHandle.task = qCreateQueueExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle, vgId,
                                                          &pHandle->execHandle.numOfCols, req.newConsumerId);
L
Liu Jicong 已提交
587
      void* scanner = NULL;
588
      qExtractStreamScanner(pHandle->execHandle.task, &scanner);
589
      pHandle->execHandle.pTqReader = qExtractReaderFromStreamScanner(scanner);
L
Liu Jicong 已提交
590
    } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__DB) {
591
      pHandle->pWalReader = walOpenReader(pVnode->pWal, NULL);
592
      pHandle->execHandle.pTqReader = tqOpenReader(pVnode);
593

L
Liu Jicong 已提交
594
      pHandle->execHandle.execDb.pFilterOutTbUid =
L
Liu Jicong 已提交
595
          taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
596 597
      buildSnapContext(handle.meta, handle.version, 0, pHandle->execHandle.subType, pHandle->fetchMeta,
                       (SSnapContext**)(&handle.sContext));
598

599
      pHandle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &handle, vgId, NULL, req.newConsumerId);
L
Liu Jicong 已提交
600
    } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
601
      pHandle->pWalReader = walOpenReader(pVnode->pWal, NULL);
wmmhello's avatar
wmmhello 已提交
602 603
      pHandle->execHandle.execTb.suid = req.suid;

L
Liu Jicong 已提交
604
      SArray* tbUidList = taosArrayInit(0, sizeof(int64_t));
605 606
      vnodeGetCtbIdList(pVnode, req.suid, tbUidList);
      tqDebug("vgId:%d, tq try to get all ctb, suid:%" PRId64, pVnode->config.vgId, req.suid);
L
Liu Jicong 已提交
607 608
      for (int32_t i = 0; i < taosArrayGetSize(tbUidList); i++) {
        int64_t tbUid = *(int64_t*)taosArrayGet(tbUidList, i);
609
        tqDebug("vgId:%d, idx %d, uid:%" PRId64, vgId, i, tbUid);
L
Liu Jicong 已提交
610
      }
611 612
      pHandle->execHandle.pTqReader = tqOpenReader(pVnode);
      tqReaderSetTbUidList(pHandle->execHandle.pTqReader, tbUidList);
L
Liu Jicong 已提交
613
      taosArrayDestroy(tbUidList);
wmmhello's avatar
wmmhello 已提交
614

L
Liu Jicong 已提交
615 616
      buildSnapContext(handle.meta, handle.version, req.suid, pHandle->execHandle.subType, pHandle->fetchMeta,
                       (SSnapContext**)(&handle.sContext));
617
      pHandle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &handle, vgId, NULL, req.newConsumerId);
L
Liu Jicong 已提交
618
    }
H
Haojun Liao 已提交
619

620
    taosHashPut(pTq->pHandle, req.subKey, strlen(req.subKey), pHandle, sizeof(STqHandle));
621 622
    tqDebug("try to persist handle %s consumer:0x%" PRIx64 " , old consumer:0x%" PRIx64, req.subKey,
            pHandle->consumerId, oldConsumerId);
L
Liu Jicong 已提交
623
    if (tqMetaSaveHandle(pTq, req.subKey, pHandle) < 0) {
H
Haojun Liao 已提交
624
      taosMemoryFree(req.qmsg);
L
Liu Jicong 已提交
625
      return -1;
L
Liu Jicong 已提交
626
    }
L
Liu Jicong 已提交
627
  } else {
628 629 630 631
    if (pHandle->consumerId == req.newConsumerId) {  // do nothing
      tqInfo("vgId:%d consumer:0x%" PRIx64 " remains, no switch occurs", req.vgId, req.newConsumerId);
      atomic_store_32(&pHandle->epoch, -1);
      atomic_add_fetch_32(&pHandle->epoch, 1);
H
Haojun Liao 已提交
632
      taosMemoryFree(req.qmsg);
633
      return tqMetaSaveHandle(pTq, req.subKey, pHandle);
634 635 636
    } else {
      tqInfo("vgId:%d switch consumer from Id:0x%" PRIx64 " to Id:0x%" PRIx64, req.vgId, pHandle->consumerId,
             req.newConsumerId);
637

638 639 640
      // kill executing task
      qTaskInfo_t pTaskInfo = pHandle->execHandle.task;
      if (pTaskInfo != NULL) {
641
        qKillTask(pTaskInfo, TSDB_CODE_SUCCESS);
642 643
      }

644 645
      taosWLockLatch(&pTq->lock);
      atomic_store_32(&pHandle->epoch, -1);
646

647
      // remove if it has been register in the push manager, and return one empty block to consumer
648
      tqUnregisterPushHandle(pTq, req.subKey, (int32_t)strlen(req.subKey), pHandle->consumerId, true);
649

650 651
      atomic_store_64(&pHandle->consumerId, req.newConsumerId);
      atomic_add_fetch_32(&pHandle->epoch, 1);
652

653
      if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
654 655 656
        qStreamCloseTsdbReader(pTaskInfo);
      }

657 658 659 660 661
      taosWUnLockLatch(&pTq->lock);
      if (tqMetaSaveHandle(pTq, req.subKey, pHandle) < 0) {
        taosMemoryFree(req.qmsg);
        return -1;
      }
L
Liu Jicong 已提交
662
    }
L
Liu Jicong 已提交
663
  }
L
Liu Jicong 已提交
664

H
Haojun Liao 已提交
665
  taosMemoryFree(req.qmsg);
L
Liu Jicong 已提交
666
  return 0;
L
Liu Jicong 已提交
667
}
668

669
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
670
  int32_t vgId = TD_VID(pTq->pVnode);
671
  pTask->id.idStr = createStreamTaskIdStr(pTask->id.streamId, pTask->id.taskId);
L
Liu Jicong 已提交
672
  pTask->refCnt = 1;
673
  pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE;
L
Liu Jicong 已提交
674 675
  pTask->inputQueue = streamQueueOpen();
  pTask->outputQueue = streamQueueOpen();
L
Liu Jicong 已提交
676 677

  if (pTask->inputQueue == NULL || pTask->outputQueue == NULL) {
L
Liu Jicong 已提交
678
    return -1;
L
Liu Jicong 已提交
679 680
  }

L
Liu Jicong 已提交
681 682
  pTask->inputStatus = TASK_INPUT_STATUS__NORMAL;
  pTask->outputStatus = TASK_OUTPUT_STATUS__NORMAL;
683
  pTask->pMsgCb = &pTq->pVnode->msgCb;
684
  pTask->pMeta = pTq->pStreamMeta;
685
  pTask->chkInfo.version = ver;
686

687
  // expand executor
688
  if (pTask->fillHistory) {
689
    pTask->status.taskStatus = TASK_STATUS__WAIT_DOWNSTREAM;
690
  } else {
691
    pTask->status.taskStatus = TASK_STATUS__RESTORE;
692 693
  }

694
  if (pTask->taskLevel == TASK_LEVEL__SOURCE) {
695
    pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask, false, -1, -1);
696 697 698 699
    if (pTask->pState == NULL) {
      return -1;
    }

700
    SReadHandle handle = {
701
        .meta = pTq->pVnode->pMeta, .vnode = pTq->pVnode, .initTqReader = 1, .pStateBackend = pTask->pState};
702

703 704
    pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, vgId);
    if (pTask->exec.pExecutor == NULL) {
L
Liu Jicong 已提交
705 706
      return -1;
    }
707

708
  } else if (pTask->taskLevel == TASK_LEVEL__AGG) {
709
    pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask, false, -1, -1);
710 711 712
    if (pTask->pState == NULL) {
      return -1;
    }
713

714 715 716 717 718
    int32_t numOfVgroups = (int32_t)taosArrayGetSize(pTask->childEpInfo);
    SReadHandle mgHandle = { .vnode = NULL, .numOfVgroups = numOfVgroups, .pStateBackend = pTask->pState};

    pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &mgHandle, vgId);
    if (pTask->exec.pExecutor == NULL) {
L
Liu Jicong 已提交
719 720
      return -1;
    }
L
Liu Jicong 已提交
721
  }
L
Liu Jicong 已提交
722 723

  // sink
L
Liu Jicong 已提交
724
  /*pTask->ahandle = pTq->pVnode;*/
725
  if (pTask->outputType == TASK_OUTPUT__SMA) {
L
Liu Jicong 已提交
726
    pTask->smaSink.vnode = pTq->pVnode;
L
Liu Jicong 已提交
727
    pTask->smaSink.smaSink = smaHandleRes;
728
  } else if (pTask->outputType == TASK_OUTPUT__TABLE) {
L
Liu Jicong 已提交
729
    pTask->tbSink.vnode = pTq->pVnode;
L
Liu Jicong 已提交
730
    pTask->tbSink.tbSinkFunc = tqSinkToTablePipeline2;
L
Liu Jicong 已提交
731

X
Xiaoyu Wang 已提交
732
    int32_t   ver1 = 1;
5
54liuyao 已提交
733
    SMetaInfo info = {0};
dengyihao's avatar
dengyihao 已提交
734
    int32_t   code = metaGetInfo(pTq->pVnode->pMeta, pTask->tbSink.stbUid, &info, NULL);
5
54liuyao 已提交
735
    if (code == TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
736
      ver1 = info.skmVer;
5
54liuyao 已提交
737
    }
L
Liu Jicong 已提交
738

739 740
    SSchemaWrapper* pschemaWrapper = pTask->tbSink.pSchemaWrapper;
    pTask->tbSink.pTSchema = tBuildTSchema(pschemaWrapper->pSchema, pschemaWrapper->nCols, ver1);
wmmhello's avatar
wmmhello 已提交
741
    if(pTask->tbSink.pTSchema == NULL) {
wmmhello's avatar
wmmhello 已提交
742
      return -1;
wmmhello's avatar
wmmhello 已提交
743
    }
L
Liu Jicong 已提交
744
  }
745

746
  if (pTask->taskLevel == TASK_LEVEL__SOURCE) {
747
    pTask->exec.pWalReader = walOpenReader(pTq->pVnode->pWal, NULL);
748 749
  }

750
  streamSetupTrigger(pTask);
751
  tqInfo("vgId:%d expand stream task, s-task:%s, checkpoint ver:%" PRId64 " child id:%d, level:%d", vgId, pTask->id.idStr,
752
         pTask->chkInfo.version, pTask->selfChildId, pTask->taskLevel);
753 754 755

  // next valid version will add one
  pTask->chkInfo.version += 1;
L
Liu Jicong 已提交
756
  return 0;
L
Liu Jicong 已提交
757
}
L
Liu Jicong 已提交
758

759 760 761 762 763 764
int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) {
  char*               msgStr = pMsg->pCont;
  char*               msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
  int32_t             msgLen = pMsg->contLen - sizeof(SMsgHead);
  SStreamTaskCheckReq req;
  SDecoder            decoder;
X
Xiaoyu Wang 已提交
765
  tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
766 767 768 769 770 771 772 773 774 775 776 777
  tDecodeSStreamTaskCheckReq(&decoder, &req);
  tDecoderClear(&decoder);
  int32_t             taskId = req.downstreamTaskId;
  SStreamTaskCheckRsp rsp = {
      .reqId = req.reqId,
      .streamId = req.streamId,
      .childId = req.childId,
      .downstreamNodeId = req.downstreamNodeId,
      .downstreamTaskId = req.downstreamTaskId,
      .upstreamNodeId = req.upstreamNodeId,
      .upstreamTaskId = req.upstreamTaskId,
  };
778

L
Liu Jicong 已提交
779
  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);
780 781 782 783 784 785 786 787
  if (pTask) {
    rsp.status = (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__NORMAL) ? 1 : 0;
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);

    tqDebug("tq recv task check req(reqId:0x%" PRIx64
            ") %d at node %d task status:%d, check req from task %d at node %d, rsp status %d",
            rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, pTask->status.taskStatus, rsp.upstreamTaskId,
            rsp.upstreamNodeId, rsp.status);
788 789
  } else {
    rsp.status = 0;
790 791 792 793
    tqDebug("tq recv task check(taskId:%d not built yet) req(reqId:0x%" PRIx64
            ") %d at node %d, check req from task %d at node %d, rsp status %d",
            taskId, rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.upstreamTaskId, rsp.upstreamNodeId,
            rsp.status);
794 795 796 797 798 799 800
  }

  SEncoder encoder;
  int32_t  code;
  int32_t  len;
  tEncodeSize(tEncodeSStreamTaskCheckRsp, &rsp, len, code);
  if (code < 0) {
L
Liu Jicong 已提交
801
    tqError("unable to encode rsp %d", __LINE__);
L
Liu Jicong 已提交
802
    return -1;
803
  }
L
Liu Jicong 已提交
804

805 806 807 808 809 810 811 812
  void* buf = rpcMallocCont(sizeof(SMsgHead) + len);
  ((SMsgHead*)buf)->vgId = htonl(req.upstreamNodeId);

  void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
  tEncoderInit(&encoder, (uint8_t*)abuf, len);
  tEncodeSStreamTaskCheckRsp(&encoder, &rsp);
  tEncoderClear(&encoder);

813
  SRpcMsg rspMsg = { .code = 0, .pCont = buf, .contLen = sizeof(SMsgHead) + len, .info = pMsg->info };
814 815 816 817
  tmsgSendRsp(&rspMsg);
  return 0;
}

818
int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
819 820 821 822 823 824 825 826 827 828 829
  int32_t             code;
  SStreamTaskCheckRsp rsp;

  SDecoder decoder;
  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
  code = tDecodeSStreamTaskCheckRsp(&decoder, &rsp);
  if (code < 0) {
    tDecoderClear(&decoder);
    return -1;
  }

830
  tDecoderClear(&decoder);
831
  tqDebug("tq recv task check rsp(reqId:0x%" PRIx64 ") %d at node %d check req from task %d at node %d, status %d",
832 833
          rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);

L
Liu Jicong 已提交
834
  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, rsp.upstreamTaskId);
835 836 837 838
  if (pTask == NULL) {
    return -1;
  }

839
  code = streamProcessTaskCheckRsp(pTask, &rsp, sversion);
L
Liu Jicong 已提交
840 841
  streamMetaReleaseTask(pTq->pStreamMeta, pTask);
  return code;
842 843
}

844
int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
845 846 847 848 849
  int32_t code;
#if 0
  code = streamMetaAddSerializedTask(pTq->pStreamMeta, version, msg, msgLen);
  if (code < 0) return code;
#endif
5
54liuyao 已提交
850 851 852
  if (tsDisableStream) {
    return 0;
  }
853 854 855 856 857 858

  // 1.deserialize msg and build task
  SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
  if (pTask == NULL) {
    return -1;
  }
859

860 861
  SDecoder decoder;
  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
862
  code = tDecodeStreamTask(&decoder, pTask);
863 864 865 866 867
  if (code < 0) {
    tDecoderClear(&decoder);
    taosMemoryFree(pTask);
    return -1;
  }
868

869 870
  tDecoderClear(&decoder);

871
  // 2.save task, use the newest commit version as the initial start version of stream task.
872
  code = streamMetaAddDeployedTask(pTq->pStreamMeta, sversion, pTask);
873
  if (code < 0) {
874 875
    tqError("vgId:%d failed to add s-task:%s, total:%d", TD_VID(pTq->pVnode), pTask->id.idStr,
            streamMetaGetNumOfTasks(pTq->pStreamMeta));
876 877 878 879 880
    return -1;
  }

  // 3.go through recover steps to fill history
  if (pTask->fillHistory) {
881
    streamTaskCheckDownstream(pTask, sversion);
882 883
  }

884 885
  tqDebug("vgId:%d s-task:%s is deployed and add meta from mnd, status:%d, total:%d", TD_VID(pTq->pVnode),
          pTask->id.idStr, pTask->status.taskStatus, streamMetaGetNumOfTasks(pTq->pStreamMeta));
886 887 888
  return 0;
}

L
Liu Jicong 已提交
889 890 891 892 893
int32_t tqProcessTaskRecover1Req(STQ* pTq, SRpcMsg* pMsg) {
  int32_t code;
  char*   msg = pMsg->pCont;
  int32_t msgLen = pMsg->contLen;

894
  SStreamRecoverStep1Req* pReq = (SStreamRecoverStep1Req*)msg;
L
Liu Jicong 已提交
895
  SStreamTask*            pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->taskId);
896 897 898 899 900
  if (pTask == NULL) {
    return -1;
  }

  // check param
901
  int64_t fillVer1 = pTask->chkInfo.version;
902
  if (fillVer1 <= 0) {
L
Liu Jicong 已提交
903
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
904 905 906 907 908 909
    return -1;
  }

  // do recovery step 1
  streamSourceRecoverScanStep1(pTask);

910
  if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING) {
L
Liu Jicong 已提交
911 912 913 914
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
    return 0;
  }

915 916 917 918
  // build msg to launch next step
  SStreamRecoverStep2Req req;
  code = streamBuildSourceRecover2Req(pTask, &req);
  if (code < 0) {
L
Liu Jicong 已提交
919
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
920 921 922
    return -1;
  }

L
Liu Jicong 已提交
923
  streamMetaReleaseTask(pTq->pStreamMeta, pTask);
L
Liu Jicong 已提交
924

925
  if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING) {
L
Liu Jicong 已提交
926 927 928
    return 0;
  }

929
  // serialize msg
L
Liu Jicong 已提交
930 931 932 933 934 935 936 937
  int32_t len = sizeof(SStreamRecoverStep1Req);

  void* serializedReq = rpcMallocCont(len);
  if (serializedReq == NULL) {
    return -1;
  }

  memcpy(serializedReq, &req, len);
938 939 940 941 942

  // dispatch msg
  SRpcMsg rpcMsg = {
      .code = 0,
      .contLen = len,
L
Liu Jicong 已提交
943
      .msgType = TDMT_VND_STREAM_RECOVER_BLOCKING_STAGE,
L
Liu Jicong 已提交
944
      .pCont = serializedReq,
945 946 947 948 949 950 951
  };

  tmsgPutToQueue(&pTq->pVnode->msgCb, WRITE_QUEUE, &rpcMsg);

  return 0;
}

952
int32_t tqProcessTaskRecover2Req(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
953 954
  int32_t                 code;
  SStreamRecoverStep2Req* pReq = (SStreamRecoverStep2Req*)msg;
L
Liu Jicong 已提交
955
  SStreamTask*            pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->taskId);
956 957 958 959 960
  if (pTask == NULL) {
    return -1;
  }

  // do recovery step 2
961
  code = streamSourceRecoverScanStep2(pTask, sversion);
962
  if (code < 0) {
L
Liu Jicong 已提交
963
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
964 965 966
    return -1;
  }

967
  if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING) {
L
Liu Jicong 已提交
968 969 970 971
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
    return 0;
  }

972 973 974
  // restore param
  code = streamRestoreParam(pTask);
  if (code < 0) {
L
Liu Jicong 已提交
975
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
976 977 978 979 980 981
    return -1;
  }

  // set status normal
  code = streamSetStatusNormal(pTask);
  if (code < 0) {
L
Liu Jicong 已提交
982
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
983 984 985 986 987 988
    return -1;
  }

  // dispatch recover finish req to all related downstream task
  code = streamDispatchRecoverFinishReq(pTask);
  if (code < 0) {
L
Liu Jicong 已提交
989
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
990 991 992
    return -1;
  }

L
Liu Jicong 已提交
993 994 995
  atomic_store_8(&pTask->fillHistory, 0);
  streamMetaSaveTask(pTq->pStreamMeta, pTask);

L
Liu Jicong 已提交
996 997
  streamMetaReleaseTask(pTq->pStreamMeta, pTask);

998 999 1000
  return 0;
}

L
Liu Jicong 已提交
1001 1002 1003
int32_t tqProcessTaskRecoverFinishReq(STQ* pTq, SRpcMsg* pMsg) {
  char*   msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
1004 1005

  // deserialize
1006 1007 1008
  SStreamRecoverFinishReq req;

  SDecoder decoder;
X
Xiaoyu Wang 已提交
1009
  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
1010 1011 1012
  tDecodeSStreamRecoverFinishReq(&decoder, &req);
  tDecoderClear(&decoder);

1013
  // find task
L
Liu Jicong 已提交
1014
  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.taskId);
1015 1016 1017
  if (pTask == NULL) {
    return -1;
  }
1018
  // do process request
1019
  if (streamProcessRecoverFinishReq(pTask, req.childId) < 0) {
L
Liu Jicong 已提交
1020
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
1021 1022 1023
    return -1;
  }

L
Liu Jicong 已提交
1024
  streamMetaReleaseTask(pTq->pStreamMeta, pTask);
1025
  return 0;
L
Liu Jicong 已提交
1026
}
L
Liu Jicong 已提交
1027

L
Liu Jicong 已提交
1028 1029 1030 1031 1032
int32_t tqProcessTaskRecoverFinishRsp(STQ* pTq, SRpcMsg* pMsg) {
  //
  return 0;
}

L
Liu Jicong 已提交
1033 1034 1035 1036 1037 1038 1039 1040 1041 1042 1043 1044 1045 1046 1047 1048
int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) {
  bool        failed = false;
  SDecoder*   pCoder = &(SDecoder){0};
  SDeleteRes* pRes = &(SDeleteRes){0};

  pRes->uidList = taosArrayInit(0, sizeof(tb_uid_t));
  if (pRes->uidList == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    failed = true;
  }

  tDecoderInit(pCoder, pReq, len);
  tDecodeDeleteRes(pCoder, pRes);
  tDecoderClear(pCoder);

  int32_t sz = taosArrayGetSize(pRes->uidList);
L
Liu Jicong 已提交
1049
  if (sz == 0 || pRes->affectedRows == 0) {
L
Liu Jicong 已提交
1050 1051 1052 1053 1054 1055 1056 1057 1058 1059 1060
    taosArrayDestroy(pRes->uidList);
    return 0;
  }
  SSDataBlock* pDelBlock = createSpecialDataBlock(STREAM_DELETE_DATA);
  blockDataEnsureCapacity(pDelBlock, sz);
  pDelBlock->info.rows = sz;
  pDelBlock->info.version = ver;

  for (int32_t i = 0; i < sz; i++) {
    // start key column
    SColumnInfoData* pStartCol = taosArrayGet(pDelBlock->pDataBlock, START_TS_COLUMN_INDEX);
1061
    colDataSetVal(pStartCol, i, (const char*)&pRes->skey, false);  // end key column
L
Liu Jicong 已提交
1062
    SColumnInfoData* pEndCol = taosArrayGet(pDelBlock->pDataBlock, END_TS_COLUMN_INDEX);
1063
    colDataSetVal(pEndCol, i, (const char*)&pRes->ekey, false);
L
Liu Jicong 已提交
1064 1065 1066
    // uid column
    SColumnInfoData* pUidCol = taosArrayGet(pDelBlock->pDataBlock, UID_COLUMN_INDEX);
    int64_t*         pUid = taosArrayGet(pRes->uidList, i);
1067
    colDataSetVal(pUidCol, i, (const char*)pUid, false);
L
Liu Jicong 已提交
1068

1069 1070 1071
    colDataSetNULL(taosArrayGet(pDelBlock->pDataBlock, GROUPID_COLUMN_INDEX), i);
    colDataSetNULL(taosArrayGet(pDelBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX), i);
    colDataSetNULL(taosArrayGet(pDelBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX), i);
L
Liu Jicong 已提交
1072 1073
  }

L
Liu Jicong 已提交
1074 1075
  taosArrayDestroy(pRes->uidList);

L
Liu Jicong 已提交
1076 1077 1078
  int32_t* pRef = taosMemoryMalloc(sizeof(int32_t));
  *pRef = 1;

L
Liu Jicong 已提交
1079 1080 1081 1082 1083 1084 1085
  void* pIter = NULL;
  while (1) {
    pIter = taosHashIterate(pTq->pStreamMeta->pTasks, pIter);
    if (pIter == NULL) break;
    SStreamTask* pTask = *(SStreamTask**)pIter;
    if (pTask->taskLevel != TASK_LEVEL__SOURCE) continue;

1086
    qDebug("delete req enqueue stream task: %d, ver: %" PRId64, pTask->id.taskId, ver);
L
Liu Jicong 已提交
1087

L
Liu Jicong 已提交
1088
    if (!failed) {
S
Shengliang Guan 已提交
1089
      SStreamRefDataBlock* pRefBlock = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0);
L
Liu Jicong 已提交
1090 1091 1092 1093 1094
      pRefBlock->type = STREAM_INPUT__REF_DATA_BLOCK;
      pRefBlock->pBlock = pDelBlock;
      pRefBlock->dataRef = pRef;
      atomic_add_fetch_32(pRefBlock->dataRef, 1);

1095
      if (tAppendDataToInputQueue(pTask, (SStreamQueueItem*)pRefBlock) < 0) {
1096
        qError("stream task input del failed, task id %d", pTask->id.taskId);
L
Liu Jicong 已提交
1097

L
Liu Jicong 已提交
1098
        atomic_sub_fetch_32(pRef, 1);
L
Liu Jicong 已提交
1099
        taosFreeQitem(pRefBlock);
L
Liu Jicong 已提交
1100 1101
        continue;
      }
L
Liu Jicong 已提交
1102

L
Liu Jicong 已提交
1103
      if (streamSchedExec(pTask) < 0) {
1104
        qError("stream task launch failed, task id %d", pTask->id.taskId);
L
Liu Jicong 已提交
1105 1106
        continue;
      }
L
Liu Jicong 已提交
1107

L
Liu Jicong 已提交
1108 1109 1110 1111
    } else {
      streamTaskInputFail(pTask);
    }
  }
L
Liu Jicong 已提交
1112

L
Liu Jicong 已提交
1113
  int32_t ref = atomic_sub_fetch_32(pRef, 1);
L
Liu Jicong 已提交
1114
  /*A(ref >= 0);*/
L
Liu Jicong 已提交
1115
  if (ref == 0) {
L
Liu Jicong 已提交
1116
    blockDataDestroy(pDelBlock);
L
Liu Jicong 已提交
1117 1118 1119 1120
    taosMemoryFree(pRef);
  }

#if 0
S
Shengliang Guan 已提交
1121
    SStreamDataBlock* pStreamBlock = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, 0);
L
Liu Jicong 已提交
1122 1123 1124 1125 1126 1127 1128 1129
    pStreamBlock->type = STREAM_INPUT__DATA_BLOCK;
    pStreamBlock->blocks = taosArrayInit(0, sizeof(SSDataBlock));
    SSDataBlock block = {0};
    assignOneDataBlock(&block, pDelBlock);
    block.info.type = STREAM_DELETE_DATA;
    taosArrayPush(pStreamBlock->blocks, &block);

    if (!failed) {
1130
      if (tAppendDataToInputQueue(pTask, (SStreamQueueItem*)pStreamBlock) < 0) {
1131
        qError("stream task input del failed, task id %d", pTask->id.taskId);
L
Liu Jicong 已提交
1132 1133 1134 1135
        continue;
      }

      if (streamSchedExec(pTask) < 0) {
1136
        qError("stream task launch failed, task id %d", pTask->id.taskId);
L
Liu Jicong 已提交
1137 1138 1139 1140 1141 1142
        continue;
      }
    } else {
      streamTaskInputFail(pTask);
    }
  }
L
Liu Jicong 已提交
1143
  blockDataDestroy(pDelBlock);
L
Liu Jicong 已提交
1144
#endif
L
Liu Jicong 已提交
1145 1146 1147 1148

  return 0;
}

1149 1150 1151
static int32_t addSubmitBlockNLaunchTask(STqOffsetStore* pOffsetStore, SStreamTask* pTask, SStreamDataSubmit2* pSubmit,
                                         const char* key, int64_t ver) {
  doSaveTaskOffset(pOffsetStore, key, ver);
1152
  int32_t code = tqAddInputBlockNLaunchTask(pTask, (SStreamQueueItem*)pSubmit, ver);
1153 1154 1155 1156 1157

  // remove the offset, if all functions are completed successfully.
  if (code == TSDB_CODE_SUCCESS) {
    tqOffsetDelete(pOffsetStore, key);
  }
1158 1159

  return code;
1160 1161
}

L
Liu Jicong 已提交
1162
int32_t tqProcessSubmitReq(STQ* pTq, SPackedData submit) {
1163
#if 0
1164
  void* pIter = NULL;
1165
  SStreamDataSubmit2* pSubmit = streamDataSubmitNew(submit, STREAM_INPUT__DATA_SUBMIT);
L
Liu Jicong 已提交
1166
  if (pSubmit == NULL) {
L
Liu Jicong 已提交
1167
    terrno = TSDB_CODE_OUT_OF_MEMORY;
L
Liu Jicong 已提交
1168
    tqError("failed to create data submit for stream since out of memory");
1169
    saveOffsetForAllTasks(pTq, submit.ver);
1170
    return -1;
L
Liu Jicong 已提交
1171 1172
  }

1173 1174
  SArray* pInputQueueFullTasks = taosArrayInit(4, POINTER_BYTES);

L
Liu Jicong 已提交
1175
  while (1) {
L
Liu Jicong 已提交
1176
    pIter = taosHashIterate(pTq->pStreamMeta->pTasks, pIter);
1177 1178 1179 1180
    if (pIter == NULL) {
      break;
    }

1181
    SStreamTask* pTask = *(SStreamTask**)pIter;
1182 1183 1184 1185
    if (pTask->taskLevel != TASK_LEVEL__SOURCE) {
      continue;
    }

1186
    if (pTask->status.taskStatus == TASK_STATUS__RECOVER_PREPARE || pTask->status.taskStatus == TASK_STATUS__WAIT_DOWNSTREAM) {
1187
      tqDebug("stream task:%d skip push data, not ready for processing, status %d", pTask->id.taskId,
1188
              pTask->status.taskStatus);
L
Liu Jicong 已提交
1189 1190
      continue;
    }
L
Liu Jicong 已提交
1191

1192 1193 1194
    // check if offset value exists
    char key[128] = {0};
    createStreamTaskOffsetKey(key, pTask->id.streamId, pTask->id.taskId);
1195

1196 1197 1198 1199 1200 1201 1202 1203
    if (tInputQueueIsFull(pTask)) {
      STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, key);

      int64_t ver = submit.ver;
      if (pOffset == NULL) {
        doSaveTaskOffset(pTq->pOffsetStore, key, submit.ver);
      } else {
        ver = pOffset->val.version;
L
Liu Jicong 已提交
1204 1205
      }

1206 1207
      tqDebug("s-task:%s input queue is full, discard submit block, ver:%" PRId64, pTask->id.idStr, ver);
      taosArrayPush(pInputQueueFullTasks, &pTask);
1208 1209 1210 1211 1212
      continue;
    }

    // check if offset value exists
    STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, key);
1213
    ASSERT(pOffset == NULL);
1214

1215
    addSubmitBlockNLaunchTask(pTq->pOffsetStore, pTask, pSubmit, key, submit.ver);
L
Liu Jicong 已提交
1216 1217
  }

1218 1219
  streamDataSubmitDestroy(pSubmit);
  taosFreeQitem(pSubmit);
1220
#endif
L
Liu Jicong 已提交
1221

1222
  tqStartStreamTasks(pTq);
1223
  return 0;
L
Liu Jicong 已提交
1224 1225
}

L
Liu Jicong 已提交
1226 1227
int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
  SStreamTaskRunReq* pReq = pMsg->pCont;
1228 1229 1230 1231

  int32_t taskId = pReq->taskId;
  int32_t vgId = TD_VID(pTq->pVnode);

1232 1233
  if (taskId == WAL_READ_TASKS_ID) {  // all tasks are extracted submit data from the wal
    tqStreamTasksScanWal(pTq);
L
Liu Jicong 已提交
1234
    return 0;
1235
  }
1236

1237
  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);
1238 1239 1240 1241 1242 1243 1244 1245
  if (pTask != NULL) {
    if (pTask->status.taskStatus == TASK_STATUS__NORMAL) {
      tqDebug("vgId:%d s-task:%s start to process run req", vgId, pTask->id.idStr);
      streamProcessRunReq(pTask);
    } else if (pTask->status.taskStatus == TASK_STATUS__RESTORE) {
      tqDebug("vgId:%d s-task:%s start to process block from wal, last chk point:%" PRId64, vgId,
              pTask->id.idStr, pTask->chkInfo.version);
      streamProcessRunReq(pTask);
1246
    } else {
1247
      tqDebug("vgId:%d s-task:%s ignore run req since not in ready state", vgId, pTask->id.idStr);
1248
    }
1249 1250 1251 1252 1253 1254 1255

    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
    tqStartStreamTasks(pTq);
    return 0;
  } else {
    tqError("vgId:%d failed to found s-task, taskId:%d", vgId, taskId);
    return -1;
L
Liu Jicong 已提交
1256
  }
L
Liu Jicong 已提交
1257 1258
}

L
Liu Jicong 已提交
1259
int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec) {
1260 1261 1262 1263 1264
  char*              msgStr = pMsg->pCont;
  char*              msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
  int32_t            msgLen = pMsg->contLen - sizeof(SMsgHead);
  SStreamDispatchReq req;
  SDecoder           decoder;
L
Liu Jicong 已提交
1265
  tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
1266
  tDecodeStreamDispatchReq(&decoder, &req);
L
Liu Jicong 已提交
1267

1268
  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.taskId);
L
Liu Jicong 已提交
1269
  if (pTask) {
1270
    SRpcMsg rsp = { .info = pMsg->info, .code = 0 };
L
Liu Jicong 已提交
1271
    streamProcessDispatchReq(pTask, &req, &rsp, exec);
L
Liu Jicong 已提交
1272
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
L
Liu Jicong 已提交
1273
    return 0;
1274 1275
  } else {
    return -1;
L
Liu Jicong 已提交
1276
  }
L
Liu Jicong 已提交
1277 1278
}

L
Liu Jicong 已提交
1279 1280
int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) {
  SStreamDispatchRsp* pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
1281
  int32_t             taskId = ntohl(pRsp->upstreamTaskId);
L
Liu Jicong 已提交
1282
  SStreamTask*        pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);
1283
  tqDebug("recv dispatch rsp, code:%x", pMsg->code);
L
Liu Jicong 已提交
1284
  if (pTask) {
1285
    streamProcessDispatchRsp(pTask, pRsp, pMsg->code);
L
Liu Jicong 已提交
1286
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
L
Liu Jicong 已提交
1287
    return 0;
1288 1289
  } else {
    return -1;
L
Liu Jicong 已提交
1290
  }
L
Liu Jicong 已提交
1291
}
L
Liu Jicong 已提交
1292

1293
int32_t tqProcessTaskDropReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
L
Liu Jicong 已提交
1294
  SVDropStreamTaskReq* pReq = (SVDropStreamTaskReq*)msg;
1295
  streamMetaRemoveTask(pTq->pStreamMeta, pReq->taskId);
L
Liu Jicong 已提交
1296
  return 0;
L
Liu Jicong 已提交
1297
}
L
Liu Jicong 已提交
1298 1299 1300 1301 1302 1303 1304

int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg) {
  char*              msgStr = pMsg->pCont;
  char*              msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
  int32_t            msgLen = pMsg->contLen - sizeof(SMsgHead);
  SStreamRetrieveReq req;
  SDecoder           decoder;
1305
  tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
L
Liu Jicong 已提交
1306
  tDecodeStreamRetrieveReq(&decoder, &req);
L
Liu Jicong 已提交
1307
  tDecoderClear(&decoder);
L
Liu Jicong 已提交
1308
  int32_t      taskId = req.dstTaskId;
L
Liu Jicong 已提交
1309
  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);
L
Liu Jicong 已提交
1310
  if (pTask) {
1311
    SRpcMsg rsp = { .info = pMsg->info, .code = 0 };
L
Liu Jicong 已提交
1312
    streamProcessRetrieveReq(pTask, &req, &rsp);
L
Liu Jicong 已提交
1313
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
L
Liu Jicong 已提交
1314
    tDeleteStreamRetrieveReq(&req);
L
Liu Jicong 已提交
1315
    return 0;
L
Liu Jicong 已提交
1316 1317
  } else {
    return -1;
L
Liu Jicong 已提交
1318 1319 1320 1321 1322 1323 1324
  }
}

int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg) {
  //
  return 0;
}
L
Liu Jicong 已提交
1325

1326 1327 1328 1329 1330 1331
int32_t vnodeEnqueueStreamMsg(SVnode* pVnode, SRpcMsg* pMsg) {
  STQ*      pTq = pVnode->pTq;
  SMsgHead* msgStr = pMsg->pCont;
  char*     msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
  int32_t   msgLen = pMsg->contLen - sizeof(SMsgHead);
  int32_t   code = 0;
L
Liu Jicong 已提交
1332 1333 1334

  SStreamDispatchReq req;
  SDecoder           decoder;
1335
  tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
L
Liu Jicong 已提交
1336 1337
  if (tDecodeStreamDispatchReq(&decoder, &req) < 0) {
    code = TSDB_CODE_MSG_DECODE_ERROR;
L
Liu Jicong 已提交
1338
    tDecoderClear(&decoder);
L
Liu Jicong 已提交
1339 1340
    goto FAIL;
  }
L
Liu Jicong 已提交
1341
  tDecoderClear(&decoder);
L
Liu Jicong 已提交
1342

L
Liu Jicong 已提交
1343
  int32_t taskId = req.taskId;
L
Liu Jicong 已提交
1344

L
Liu Jicong 已提交
1345
  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);
L
Liu Jicong 已提交
1346
  if (pTask) {
1347
    SRpcMsg rsp = { .info = pMsg->info, .code = 0 };
L
Liu Jicong 已提交
1348
    streamProcessDispatchReq(pTask, &req, &rsp, false);
L
Liu Jicong 已提交
1349
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
L
Liu Jicong 已提交
1350 1351
    rpcFreeCont(pMsg->pCont);
    taosFreeQitem(pMsg);
1352
    return 0;
5
54liuyao 已提交
1353 1354
  } else {
    tDeleteStreamDispatchReq(&req);
L
Liu Jicong 已提交
1355
  }
L
Liu Jicong 已提交
1356

1357 1358
  code = TSDB_CODE_STREAM_TASK_NOT_EXIST;

L
Liu Jicong 已提交
1359
FAIL:
1360 1361 1362 1363
  if (pMsg->info.handle == NULL) return -1;

  SMsgHead* pRspHead = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamDispatchRsp));
  if (pRspHead == NULL) {
1364
    SRpcMsg rsp = { .code = TSDB_CODE_OUT_OF_MEMORY, .info = pMsg->info };
1365 1366 1367 1368 1369 1370 1371 1372 1373 1374 1375 1376 1377 1378 1379 1380
    tqDebug("send dispatch error rsp, code: %x", code);
    tmsgSendRsp(&rsp);
    rpcFreeCont(pMsg->pCont);
    taosFreeQitem(pMsg);
    return -1;
  }

  pRspHead->vgId = htonl(req.upstreamNodeId);
  SStreamDispatchRsp* pRsp = POINTER_SHIFT(pRspHead, sizeof(SMsgHead));
  pRsp->streamId = htobe64(req.streamId);
  pRsp->upstreamTaskId = htonl(req.upstreamTaskId);
  pRsp->upstreamNodeId = htonl(req.upstreamNodeId);
  pRsp->downstreamNodeId = htonl(pVnode->config.vgId);
  pRsp->downstreamTaskId = htonl(req.taskId);
  pRsp->inputStatus = TASK_OUTPUT_STATUS__NORMAL;

L
Liu Jicong 已提交
1381
  SRpcMsg rsp = {
1382
      .code = code, .info = pMsg->info, .contLen = sizeof(SMsgHead) + sizeof(SStreamDispatchRsp), .pCont = pRspHead};
1383
  tqDebug("send dispatch error rsp, code: %x", code);
L
Liu Jicong 已提交
1384
  tmsgSendRsp(&rsp);
L
Liu Jicong 已提交
1385 1386
  rpcFreeCont(pMsg->pCont);
  taosFreeQitem(pMsg);
1387
  return -1;
L
Liu Jicong 已提交
1388
}
L
Liu Jicong 已提交
1389

1390
int32_t tqCheckLogInWal(STQ* pTq, int64_t sversion) { return sversion <= pTq->walLogLastVer; }
1391

1392
int32_t tqStartStreamTasks(STQ* pTq) {
1393 1394
  int32_t vgId = TD_VID(pTq->pVnode);

1395 1396
  SStreamMeta* pMeta = pTq->pStreamMeta;
  taosWLockLatch(&pMeta->lock);
1397 1398 1399 1400 1401 1402 1403
  int32_t numOfTasks = taosHashGetSize(pTq->pStreamMeta->pTasks);
  if (numOfTasks == 0) {
    tqInfo("vgId:%d no stream tasks exists", vgId);
    taosWUnLockLatch(&pTq->pStreamMeta->lock);
    return 0;
  }

1404 1405 1406 1407 1408 1409 1410 1411
  pMeta->walScan += 1;

  if (pMeta->walScan > 1) {
    tqDebug("vgId:%d wal read task has been launched, remain scan times:%d", vgId, pMeta->walScan);
    taosWUnLockLatch(&pTq->pStreamMeta->lock);
    return 0;
  }

1412 1413 1414 1415
  SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq));
  if (pRunReq == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    tqError("vgId:%d failed restore stream tasks, code:%s", vgId, terrstr(terrno));
1416
    taosWUnLockLatch(&pTq->pStreamMeta->lock);
1417 1418 1419
    return -1;
  }

1420
  tqInfo("vgId:%d start wal scan stream tasks, tasks:%d", vgId, numOfTasks);
1421 1422
  initOffsetForAllRestoreTasks(pTq);

1423 1424
  pRunReq->head.vgId = vgId;
  pRunReq->streamId = 0;
1425
  pRunReq->taskId = WAL_READ_TASKS_ID;
1426 1427 1428

  SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)};
  tmsgPutToQueue(&pTq->pVnode->msgCb, STREAM_QUEUE, &msg);
1429
  taosWUnLockLatch(&pTq->pStreamMeta->lock);
1430 1431 1432

  return 0;
}