tq.c 50.3 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
S
Shengliang Guan 已提交
14 15
 */

H
Hongze Cheng 已提交
16
#include "tq.h"
S
Shengliang Guan 已提交
17

L
Liu Jicong 已提交
18
int32_t tqInit() {
L
Liu Jicong 已提交
19 20 21 22 23 24
  int8_t old;
  while (1) {
    old = atomic_val_compare_exchange_8(&tqMgmt.inited, 0, 2);
    if (old != 2) break;
  }

25 26 27 28 29 30
  if (old == 0) {
    tqMgmt.timer = taosTmrInit(10000, 100, 10000, "TQ");
    if (tqMgmt.timer == NULL) {
      atomic_store_8(&tqMgmt.inited, 0);
      return -1;
    }
31 32 33
    if (streamInit() < 0) {
      return -1;
    }
L
Liu Jicong 已提交
34
    atomic_store_8(&tqMgmt.inited, 1);
35
  }
36

L
Liu Jicong 已提交
37 38
  return 0;
}
L
Liu Jicong 已提交
39

40
void tqCleanUp() {
L
Liu Jicong 已提交
41 42 43 44 45 46 47 48
  int8_t old;
  while (1) {
    old = atomic_val_compare_exchange_8(&tqMgmt.inited, 1, 2);
    if (old != 2) break;
  }

  if (old == 1) {
    taosTmrCleanUp(tqMgmt.timer);
L
Liu Jicong 已提交
49
    streamCleanUp();
L
Liu Jicong 已提交
50 51
    atomic_store_8(&tqMgmt.inited, 0);
  }
52
}
L
Liu Jicong 已提交
53

54
static void destroyTqHandle(void* data) {
55 56 57
  STqHandle* pData = (STqHandle*)data;
  qDestroyTask(pData->execHandle.task);
  if (pData->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
L
Liu Jicong 已提交
58
    taosMemoryFreeClear(pData->execHandle.execCol.qmsg);
59 60 61 62
  } else if (pData->execHandle.subType == TOPIC_SUB_TYPE__DB) {
    tqCloseReader(pData->execHandle.pExecReader);
    walCloseReader(pData->pWalReader);
    taosHashCleanup(pData->execHandle.execDb.pFilterOutTbUid);
L
Liu Jicong 已提交
63
  } else if (pData->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
64 65 66 67 68
    walCloseReader(pData->pWalReader);
    tqCloseReader(pData->execHandle.pExecReader);
  }
}

L
Liu Jicong 已提交
69
static void tqPushEntryFree(void* data) {
L
Liu Jicong 已提交
70
  STqPushEntry* p = *(void**)data;
H
Haojun Liao 已提交
71 72 73 74 75 76 77
  if (p->pDataRsp->head.mqMsgType == TMQ_MSG_TYPE__POLL_RSP) {
    tDeleteSMqDataRsp(p->pDataRsp);
  } else if (p->pDataRsp->head.mqMsgType == TMQ_MSG_TYPE__TAOSX_RSP) {
    tDeleteSTaosxRsp((STaosxRsp*)p->pDataRsp);
  }

  taosMemoryFree(p->pDataRsp);
L
Liu Jicong 已提交
78 79 80
  taosMemoryFree(p);
}

L
Liu Jicong 已提交
81
/**
 * Create and open the TQ instance of a vnode: allocates the STQ object, its hash
 * tables (handles, push entries, check info), opens the meta store, the offset
 * store and the stream meta, and finally loads the stream tasks.
 *
 * On any failure everything acquired so far is released, in the same order
 * tqClose() uses, and NULL is returned.
 *
 * @param path   directory of the TQ data; duplicated into the STQ object.
 * @param pVnode owning vnode; its WAL is read for the last/committed versions.
 * @return the opened STQ object, or NULL on failure.
 */
STQ* tqOpen(const char* path, SVnode* pVnode) {
  bool metaOpened = false;

  STQ* pTq = taosMemoryCalloc(1, sizeof(STQ));
  if (pTq == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pTq->path = taosStrdup(path);
  if (pTq->path == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }
  pTq->pVnode = pVnode;
  pTq->walLogLastVer = pVnode->pWal->vers.lastVer;

  pTq->pHandle = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);
  if (pTq->pHandle == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }
  taosHashSetFreeFp(pTq->pHandle, destroyTqHandle);

  taosInitRWLatch(&pTq->lock);
  pTq->pPushMgr = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
  if (pTq->pPushMgr == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }
  taosHashSetFreeFp(pTq->pPushMgr, tqPushEntryFree);

  pTq->pCheckInfo = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);
  if (pTq->pCheckInfo == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }
  taosHashSetFreeFp(pTq->pCheckInfo, (FDelete)tDeleteSTqCheckInfo);

  if (tqMetaOpen(pTq) < 0) {
    goto _err;
  }
  metaOpened = true;

  pTq->pOffsetStore = tqOffsetOpen(pTq);
  if (pTq->pOffsetStore == NULL) {
    goto _err;
  }

  pTq->pStreamMeta = streamMetaOpen(path, pTq, (FTaskExpand*)tqExpandTask, pTq->pVnode->config.vgId);
  if (pTq->pStreamMeta == NULL) {
    goto _err;
  }

  if (streamLoadTasks(pTq->pStreamMeta, walGetCommittedVer(pVnode->pWal)) < 0) {
    goto _err;
  }

  return pTq;

_err:
  // Release only what was actually acquired. The meta store is closed only when
  // tqMetaOpen() succeeded, since its state after a failed open is not known here.
  if (pTq->pOffsetStore != NULL) {
    tqOffsetClose(pTq->pOffsetStore);
  }
  if (pTq->pHandle != NULL) {
    taosHashCleanup(pTq->pHandle);
  }
  if (pTq->pPushMgr != NULL) {
    taosHashCleanup(pTq->pPushMgr);
  }
  if (pTq->pCheckInfo != NULL) {
    taosHashCleanup(pTq->pCheckInfo);
  }
  if (pTq->path != NULL) {
    taosMemoryFree(pTq->path);
  }
  if (metaOpened) {
    tqMetaClose(pTq);
  }
  if (pTq->pStreamMeta != NULL) {
    streamMetaClose(pTq->pStreamMeta);
  }
  taosMemoryFree(pTq);
  return NULL;
}
L
Liu Jicong 已提交
121

L
Liu Jicong 已提交
122
/**
 * Close a TQ instance opened by tqOpen() and release everything it owns.
 * Passing NULL is allowed and does nothing.
 *
 * @param pTq the instance to close; invalid after this call.
 */
void tqClose(STQ* pTq) {
  if (pTq != NULL) {
    // offset store first, then the in-memory tables, then the persistent stores
    tqOffsetClose(pTq->pOffsetStore);

    taosHashCleanup(pTq->pHandle);
    taosHashCleanup(pTq->pPushMgr);
    taosHashCleanup(pTq->pCheckInfo);

    taosMemoryFree(pTq->path);

    tqMetaClose(pTq);
    streamMetaClose(pTq->pStreamMeta);

    taosMemoryFree(pTq);
  }
}
L
Liu Jicong 已提交
136

L
Liu Jicong 已提交
137
/**
 * Encode a meta response and send it back as the reply to a poll request.
 * The RPC payload is an SMqRspHead followed by the encoded SMqMetaRsp.
 *
 * @param pTq  TQ instance (used for logging the vgId).
 * @param pMsg the incoming poll RPC message; its info is used to route the reply.
 * @param pReq the decoded poll request (epoch and consumer id are echoed back).
 * @param pRsp the meta response to encode.
 * @return 0 once the response is handed to the RPC layer, -1 if sizing the
 *         encoding or allocating the RPC buffer fails.
 */
int32_t tqSendMetaPollRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqMetaRsp* pRsp) {
  int32_t bodyLen = 0;
  int32_t code = 0;
  tEncodeSize(tEncodeSMqMetaRsp, pRsp, bodyLen, code);
  if (code < 0) {
    return -1;
  }

  int32_t totalLen = sizeof(SMqRspHead) + bodyLen;
  void*   pCont = rpcMallocCont(totalLen);
  if (pCont == NULL) {
    return -1;
  }

  // fill in the fixed header
  SMqRspHead* pHead = (SMqRspHead*)pCont;
  pHead->mqMsgType = TMQ_MSG_TYPE__POLL_META_RSP;
  pHead->epoch = pReq->epoch;
  pHead->consumerId = pReq->consumerId;

  // encode the body right behind the header
  SEncoder encoder = {0};
  tEncoderInit(&encoder, POINTER_SHIFT(pCont, sizeof(SMqRspHead)), bodyLen);
  tEncodeSMqMetaRsp(&encoder, pRsp);
  tEncoderClear(&encoder);

  SRpcMsg resp = {
      .info = pMsg->info,
      .pCont = pCont,
      .contLen = totalLen,
      .code = 0,
  };
  tmsgSendRsp(&resp);

  tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) send rsp, res msg type %d, offset type:%d",
          TD_VID(pTq->pVnode), pReq->consumerId, pReq->epoch, pRsp->resMsgType, pRsp->rspOffset.type);

  return 0;
}

H
Haojun Liao 已提交
175 176
/**
 * Encode a data (or taosX) response and send it through the RPC layer.
 * The payload is an SMqRspHead followed by the encoded response body.
 *
 * @param pRpcHandleInfo RPC handle info used to route the reply.
 * @param pRsp           the response; reinterpreted as STaosxRsp when type is
 *                       TMQ_MSG_TYPE__TAOSX_RSP.
 * @param epoch          consumer epoch written into the header.
 * @param consumerId     consumer id written into the header.
 * @param type           TMQ_MSG_TYPE__POLL_RSP or TMQ_MSG_TYPE__TAOSX_RSP; any other
 *                       value results in a header-only message.
 * @return 0 once handed to the RPC layer, -1 on sizing or allocation failure.
 */
static int32_t doSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* pRsp, int32_t epoch,
                             int64_t consumerId, int32_t type) {
  int32_t bodyLen = 0;
  int32_t code = 0;

  // first pass: compute the encoded size of the body
  switch (type) {
    case TMQ_MSG_TYPE__POLL_RSP:
      tEncodeSize(tEncodeSMqDataRsp, pRsp, bodyLen, code);
      break;
    case TMQ_MSG_TYPE__TAOSX_RSP:
      tEncodeSize(tEncodeSTaosxRsp, (STaosxRsp*)pRsp, bodyLen, code);
      break;
    default:
      break;
  }

  if (code < 0) {
    return -1;
  }

  int32_t totalLen = sizeof(SMqRspHead) + bodyLen;
  void*   pCont = rpcMallocCont(totalLen);
  if (pCont == NULL) {
    return -1;
  }

  SMqRspHead* pHead = (SMqRspHead*)pCont;
  pHead->mqMsgType = type;
  pHead->epoch = epoch;
  pHead->consumerId = consumerId;

  // second pass: encode the body right behind the header
  SEncoder encoder = {0};
  tEncoderInit(&encoder, POINTER_SHIFT(pCont, sizeof(SMqRspHead)), bodyLen);

  switch (type) {
    case TMQ_MSG_TYPE__POLL_RSP:
      tEncodeSMqDataRsp(&encoder, pRsp);
      break;
    case TMQ_MSG_TYPE__TAOSX_RSP:
      tEncodeSTaosxRsp(&encoder, (STaosxRsp*)pRsp);
      break;
    default:
      break;
  }

  tEncoderClear(&encoder);

  SRpcMsg rsp = {
      .info = *pRpcHandleInfo,
      .pCont = pCont,
      .contLen = totalLen,
      .code = 0,
  };

  tmsgSendRsp(&rsp);
  return 0;
}

H
Haojun Liao 已提交
224 225 226
/**
 * Send the response stored in a push entry to the consumer that registered it.
 * Epoch, consumer id and message type are taken from the header saved inside
 * the stored response.
 *
 * @param pTq        TQ instance (used for logging the vgId).
 * @param pPushEntry the push entry holding the RPC info and the response.
 * @return always 0; the result of the underlying send is not propagated.
 */
int32_t tqPushDataRsp(STQ* pTq, STqPushEntry* pPushEntry) {
  SMqDataRsp* pRsp = pPushEntry->pDataRsp;
  SMqRspHead* pHead = &pRsp->head;

  doSendDataRsp(&pPushEntry->info, pRsp, pHead->epoch, pHead->consumerId, pHead->mqMsgType);

  // render both offsets for the debug log
  char reqBuf[80] = {0};
  char rspBuf[80] = {0};
  tFormatOffset(reqBuf, tListLen(reqBuf), &pRsp->reqOffset);
  tFormatOffset(rspBuf, tListLen(rspBuf), &pRsp->rspOffset);

  tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) push rsp, block num: %d, req:%s, rsp:%s",
          TD_VID(pTq->pVnode), pRsp->head.consumerId, pRsp->head.epoch, pRsp->blockNum, reqBuf, rspBuf);
  return 0;
}

H
Haojun Liao 已提交
251
/**
 * Send a data (or taosX) response as the direct reply to a poll request.
 *
 * @param pTq  TQ instance (used for logging the vgId).
 * @param pMsg the incoming poll RPC message; its info routes the reply.
 * @param pReq the decoded poll request (epoch, consumer id, request id).
 * @param pRsp the response to send.
 * @param type TMQ_MSG_TYPE__POLL_RSP or TMQ_MSG_TYPE__TAOSX_RSP.
 * @return always 0; the result of the underlying send is not propagated.
 */
int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp, int32_t type) {
  doSendDataRsp(&pMsg->info, pRsp, pReq->epoch, pReq->consumerId, type);

  // render both offsets for the debug log
  char reqBuf[80] = {0};
  char rspBuf[80] = {0};
  tFormatOffset(reqBuf, sizeof(reqBuf), &pRsp->reqOffset);
  tFormatOffset(rspBuf, sizeof(rspBuf), &pRsp->rspOffset);

  tqDebug("vgId:%d consumer:0x%" PRIx64 " (epoch %d) send rsp, block num:%d, req:%s, rsp:%s, reqId:0x%"PRIx64,
          TD_VID(pTq->pVnode), pReq->consumerId, pReq->epoch, pRsp->blockNum, reqBuf, rspBuf, pReq->reqId);

  return 0;
}

280 281 282 283 284
/**
 * Compare two offsets by WAL version. Only log-type offsets are comparable;
 * if either side is not TMQ_OFFSET__LOG the result is false.
 *
 * @return true iff both are log offsets and pLeft's version <= pRight's version.
 */
static FORCE_INLINE bool tqOffsetLessOrEqual(const STqOffset* pLeft, const STqOffset* pRight) {
  if (pLeft->val.type != TMQ_OFFSET__LOG || pRight->val.type != TMQ_OFFSET__LOG) {
    return false;
  }
  return pLeft->val.version <= pRight->val.version;
}

285
/**
 * Handle an offset-commit message: decode the committed offset, skip it when it
 * does not advance the saved log offset, otherwise persist it in the offset store
 * and, for log offsets, move the handle's WAL reference to the committed version.
 *
 * @param pTq      TQ instance.
 * @param sversion version associated with this request; a log offset whose
 *                 version + 1 equals sversion is bumped by one before it is saved.
 * @param msg      encoded STqOffset.
 * @param msgLen   length of msg in bytes.
 * @return 0 on success (including "nothing to update"), -1 on decode failure,
 *         invalid offset type, or failure to write the offset / update the WAL ref.
 */
int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  STqOffset offset = {0};
  int32_t   vgId = TD_VID(pTq->pVnode);

  SDecoder decoder;
  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
  if (tDecodeSTqOffset(&decoder, &offset) < 0) {
    // the decoder must be cleared on the failure path as well
    tDecoderClear(&decoder);
    return -1;
  }

  tDecoderClear(&decoder);

  if (offset.val.type == TMQ_OFFSET__SNAPSHOT_DATA || offset.val.type == TMQ_OFFSET__SNAPSHOT_META) {
    tqDebug("receive offset commit msg to %s on vgId:%d, offset(type:snapshot) uid:%" PRId64 ", ts:%" PRId64,
            offset.subKey, vgId, offset.val.uid, offset.val.ts);
  } else if (offset.val.type == TMQ_OFFSET__LOG) {
    tqDebug("receive offset commit msg to %s on vgId:%d, offset(type:log) version:%" PRId64, offset.subKey,
            vgId, offset.val.version);
    if (offset.val.version + 1 == sversion) {
      offset.val.version += 1;
    }
  } else {
    tqError("invalid commit offset type:%d", offset.val.type);
    return -1;
  }

  STqOffset* pSavedOffset = tqOffsetRead(pTq->pOffsetStore, offset.subKey);
  if (pSavedOffset != NULL && tqOffsetLessOrEqual(&offset, pSavedOffset)) {
    return 0;  // no need to update the offset value
  }

  // save the new offset value
  if (tqOffsetWrite(pTq->pOffsetStore, &offset) < 0) {
    return -1;
  }

  if (offset.val.type == TMQ_OFFSET__LOG) {
    // keep the WAL reference of the matching handle in step with the committed version
    STqHandle* pHandle = taosHashGet(pTq->pHandle, offset.subKey, strlen(offset.subKey));
    if (pHandle && (walRefVer(pHandle->pRef, offset.val.version) < 0)) {
      return -1;
    }
  }

  return 0;
}

L
Liu Jicong 已提交
331
/**
 * Check whether a column of a table may be modified, i.e. that no registered
 * check-info entry for that table lists the column id.
 *
 * @param pTq   TQ instance whose pCheckInfo table is scanned.
 * @param tbUid uid of the table being altered.
 * @param colId id of the column being altered.
 * @return 0 if the column is not listed by any entry, -1 if it is.
 */
int32_t tqCheckColModifiable(STQ* pTq, int64_t tbUid, int32_t colId) {
  for (void* pIter = taosHashIterate(pTq->pCheckInfo, NULL); pIter != NULL;
       pIter = taosHashIterate(pTq->pCheckInfo, pIter)) {
    STqCheckInfo* pCheck = (STqCheckInfo*)pIter;
    if (pCheck->ntbUid != tbUid) {
      continue;
    }

    int32_t numOfCols = taosArrayGetSize(pCheck->colIdList);
    for (int32_t idx = 0; idx < numOfCols; ++idx) {
      int16_t forbidColId = *(int16_t*)taosArrayGet(pCheck->colIdList, idx);
      if (forbidColId != colId) {
        continue;
      }

      // leaving the iteration early: the iterator has to be cancelled explicitly
      taosHashCancelIterate(pTq->pCheckInfo, pIter);
      return -1;
    }
  }

  return 0;
}

L
Liu Jicong 已提交
357 358 359 360 361 362 363 364 365 366
/**
 * Prepare an SMqDataRsp for a poll request: copy the request offset and create
 * the (empty) block data / block length arrays. Table names and schemas are
 * disabled for this response type.
 *
 * @param pRsp    response to initialize.
 * @param pReq    poll request providing the request offset.
 * @param subType subscription type; not used by this function.
 * @return 0 on success, -1 if either array could not be created.
 */
static int32_t tqInitDataRsp(SMqDataRsp* pRsp, const SMqPollReq* pReq, int8_t subType) {
  pRsp->reqOffset = pReq->reqOffset;

  pRsp->blockData = taosArrayInit(0, sizeof(void*));
  pRsp->blockDataLen = taosArrayInit(0, sizeof(int32_t));

  bool allocated = (pRsp->blockData != NULL) && (pRsp->blockDataLen != NULL);
  if (!allocated) {
    return -1;
  }

  pRsp->withTbName = 0;
  pRsp->withSchema = false;
  return 0;
}

383 384 385 386 387 388 389 390 391 392 393 394 395
/**
 * Prepare an STaosxRsp for a poll request: copy the request offset, enable table
 * names and schemas, and create the four (empty) per-block arrays.
 *
 * @param pRsp response to initialize.
 * @param pReq poll request providing the request offset.
 * @return 0 on success, -1 if any of the arrays could not be created.
 */
static int32_t tqInitTaosxRsp(STaosxRsp* pRsp, const SMqPollReq* pReq) {
  pRsp->reqOffset = pReq->reqOffset;

  pRsp->withTbName = 1;
  pRsp->withSchema = 1;

  pRsp->blockData = taosArrayInit(0, sizeof(void*));
  pRsp->blockDataLen = taosArrayInit(0, sizeof(int32_t));
  pRsp->blockTbName = taosArrayInit(0, sizeof(void*));
  pRsp->blockSchema = taosArrayInit(0, sizeof(void*));

  if (pRsp->blockData == NULL) return -1;
  if (pRsp->blockDataLen == NULL) return -1;
  if (pRsp->blockTbName == NULL) return -1;
  if (pRsp->blockSchema == NULL) return -1;

  return 0;
}

400 401 402 403 404
/**
 * Resolve the starting offset for a poll request that asked for an offset reset.
 *
 * If an offset is already saved for the subscription key it wins. Otherwise the
 * consumer's reset policy decides:
 *  - earliest: start from a snapshot (meta or data) when snapshots are requested,
 *    else from the first WAL version, which is referenced on the handle;
 *  - latest:   an empty response carrying the last WAL version is sent right away
 *    and *pBlockReturned is set to true;
 *  - none:     fails with TSDB_CODE_TQ_NO_COMMITTED_OFFSET.
 *
 * @param pOffsetVal     [out] resolved offset (untouched in the "latest" case).
 * @param pTq            TQ instance.
 * @param pHandle        subscription handle.
 * @param pRequest       the poll request.
 * @param pMsg           the RPC message, used when a response is sent from here.
 * @param pBlockReturned [out] true when a response has already been sent.
 * @return 0 on success, otherwise -1 or the result of sending the response.
 */
static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest,
                                     SRpcMsg* pMsg, bool* pBlockReturned) {
  uint64_t     consumerId = pRequest->consumerId;
  STqOffsetVal reqOffset = pRequest->reqOffset;
  STqOffset*   pSaved = tqOffsetRead(pTq->pOffsetStore, pRequest->subKey);
  int32_t      vgId = TD_VID(pTq->pVnode);

  *pBlockReturned = false;

  // In this vnode, data has been polled by consumer for this topic, so let's continue from the last offset value.
  if (pSaved != NULL) {
    *pOffsetVal = pSaved->val;

    char formatBuf[80];
    tFormatOffset(formatBuf, 80, pOffsetVal);
    tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, existed offset found, offset reset to %s and continue.",
            consumerId, pHandle->subKey, vgId, formatBuf);
    return 0;
  }

  // no poll occurs in this vnode for this topic, let's seek to the right offset value.
  if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEAST) {
    if (!pRequest->useSnapshot) {
      pHandle->pRef = walRefFirstVer(pTq->pVnode->pWal, pHandle->pRef);
      if (pHandle->pRef == NULL) {
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        return -1;
      }

      tqOffsetResetToLog(pOffsetVal, pHandle->pRef->refVer - 1);
      return 0;
    }

    tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey:%s, vgId:%d, (earliest) set offset to be snapshot",
            consumerId, pHandle->subKey, vgId);

    if (pHandle->fetchMeta) {
      tqOffsetResetToMeta(pOffsetVal, 0);
    } else {
      tqOffsetResetToData(pOffsetVal, 0, 0);
    }
    return 0;
  }

  if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) {
    int32_t code = 0;

    if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
      SMqDataRsp dataRsp = {0};
      tqInitDataRsp(&dataRsp, pRequest, pHandle->execHandle.subType);

      tqOffsetResetToLog(&dataRsp.rspOffset, walGetLastVer(pTq->pVnode->pWal));
      tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, (latest) offset reset to %" PRId64, consumerId,
              pHandle->subKey, vgId, dataRsp.rspOffset.version);
      code = tqSendDataRsp(pTq, pMsg, pRequest, &dataRsp, TMQ_MSG_TYPE__POLL_RSP);
      tDeleteSMqDataRsp(&dataRsp);
    } else {
      STaosxRsp taosxRsp = {0};
      tqInitTaosxRsp(&taosxRsp, pRequest);
      tqOffsetResetToLog(&taosxRsp.rspOffset, walGetLastVer(pTq->pVnode->pWal));
      code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP);
      tDeleteSTaosxRsp(&taosxRsp);
    }

    // an (empty) response has gone out; the caller must not send another one
    *pBlockReturned = true;
    return code;
  }

  if (reqOffset.type == TMQ_OFFSET__RESET_NONE) {
    tqError("tmq poll: subkey:%s, no offset committed for consumer:0x%" PRIx64 " in vg %d, subkey %s, reset none failed",
            pHandle->subKey, consumerId, vgId, pRequest->subKey);
    terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET;
    return -1;
  }

  return 0;
}
L
Liu Jicong 已提交
472

473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511
// True when the offset type value is negative, which this file treats as a request to reset the offset.
// NOTE(review): relies on every TMQ_OFFSET__RESET_* constant being negative -- confirm against the enum.
#define IS_OFFSET_RESET_TYPE(_t)  ((_t) < 0)

/**
 * Serve a poll request of a column subscription: scan data starting at pOffset
 * and either reply with what was found or, when the consumer is fully caught up,
 * park the request as a push entry to be answered once new data arrives.
 *
 * The scan and the push-entry registration run under the TQ write lock; the
 * direct reply is sent after the lock is released.
 *
 * @return result of registering the push entry or of sending the response.
 */
static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest,
                                                   SRpcMsg* pMsg, STqOffsetVal* pOffset) {
  uint64_t consumerId = pRequest->consumerId;
  int32_t  vgId = TD_VID(pTq->pVnode);
  int32_t  code = 0;

  SMqDataRsp dataRsp = {0};
  tqInitDataRsp(&dataRsp, pRequest, pHandle->execHandle.subType);

  // lock
  taosWLockLatch(&pTq->lock);

  qSetTaskId(pHandle->execHandle.task, consumerId, pRequest->reqId);
  code = tqScanData(pTq, pHandle, &dataRsp, pOffset);

  // till now, all data has been transferred to consumer, new data needs to push client once arrived.
  bool caughtUp = (dataRsp.blockNum == 0) && (dataRsp.reqOffset.type == TMQ_OFFSET__LOG) &&
                  (dataRsp.reqOffset.version == dataRsp.rspOffset.version) &&
                  (pHandle->consumerId == pRequest->consumerId);
  if (caughtUp) {
    code = tqRegisterPushEntry(pTq, pHandle, pRequest, pMsg, &dataRsp, TMQ_MSG_TYPE__POLL_RSP);
    taosWUnLockLatch(&pTq->lock);
    return code;
  }

  taosWUnLockLatch(&pTq->lock);
  code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&dataRsp, TMQ_MSG_TYPE__POLL_RSP);

  // NOTE: this pHandle->consumerId may have been changed already.
  tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, rsp block:%d, offset type:%d, uid/version:%" PRId64
          ", ts:%" PRId64 ", reqId:0x%" PRIx64,
          consumerId, pHandle->subKey, vgId, dataRsp.blockNum, dataRsp.rspOffset.type, dataRsp.rspOffset.uid,
          dataRsp.rspOffset.ts, pRequest->reqId);

  tDeleteSMqDataRsp(&dataRsp);
  return code;
}

/**
 * Core of poll handling: resolve the start offset, then produce a response.
 *
 * 1. If the request carries a reset-type offset, resolve it through
 *    extractResetOffsetVal(); that call may already have sent an empty response.
 * 2. Column subscriptions are delegated to extractDataAndRspForNormalSubscribe().
 * 3. For taosX (db/table) subscriptions: a non-log offset is served from a
 *    snapshot scan first; once the offset is a log offset, WAL entries are read
 *    one by one until a submit yields blocks, a non-submit (meta) entry is found,
 *    fetching fails, or a newer consumer epoch is observed on the handle.
 *
 * Every return path releases the local taosX response and the WAL read buffer.
 *
 * @return 0 / result of the send on success, -1 on failure.
 */
static int32_t doPollDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg) {
  int32_t      code = -1;
  STqOffsetVal offset = {0};
  SWalCkHead*  pCkHead = NULL;
  int32_t      vgId = TD_VID(pTq->pVnode);

  STqOffsetVal reqOffset = pRequest->reqOffset;
  uint64_t     consumerId = pRequest->consumerId;

  // 1. reset the offset if needed
  if (IS_OFFSET_RESET_TYPE(reqOffset.type)) {
    // handle the reset offset cases, according to the consumer's choice.
    bool blockReturned = false;
    code = extractResetOffsetVal(&offset, pTq, pHandle, pRequest, pMsg, &blockReturned);
    if (code != 0) {
      return code;
    }

    // empty block returned, quit
    if (blockReturned) {
      return 0;
    }
  } else {  // use the consumer specified offset
    // NOTE(review): the requested offset is taken as-is; it is not checked to be monotonically increasing.
    offset = reqOffset;
  }

  // this is a normal subscribe requirement
  if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
    return extractDataAndRspForNormalSubscribe(pTq, pHandle, pRequest, pMsg, &offset);
  }

  // for taosX
  // todo handle the case where re-balance occurs.
  SMqMetaRsp metaRsp = {0};
  STaosxRsp  taosxRsp = {0};
  tqInitTaosxRsp(&taosxRsp, pRequest);

  if (offset.type != TMQ_OFFSET__LOG) {
    if (tqScanTaosx(pTq, pHandle, &taosxRsp, &metaRsp, &offset) < 0) {
      // release the arrays created by tqInitTaosxRsp() before bailing out
      tDeleteSTaosxRsp(&taosxRsp);
      return -1;
    }

    if (metaRsp.metaRspLen > 0) {
      code = tqSendMetaPollRsp(pTq, pMsg, pRequest, &metaRsp);
      tqDebug("tmq poll: consumer:0x%" PRIx64 " subkey:%s vgId:%d, send meta offset type:%d,uid:%" PRId64
              ",ts:%" PRId64,
              consumerId, pHandle->subKey, vgId, metaRsp.rspOffset.type, metaRsp.rspOffset.uid, metaRsp.rspOffset.ts);
      taosMemoryFree(metaRsp.metaRsp);
      tDeleteSTaosxRsp(&taosxRsp);
      return code;
    }

    if (taosxRsp.blockNum > 0) {
      code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP);
      tDeleteSTaosxRsp(&taosxRsp);
      return code;
    } else {
      // snapshot exhausted: continue from the offset the scan ended at
      offset = taosxRsp.rspOffset;
    }

    tqDebug("taosx poll: consumer:0x%" PRIx64 " subkey:%s vgId:%d, send data blockNum:%d, offset type:%d,uid:%" PRId64
            ",version:%" PRId64,
            consumerId, pHandle->subKey, vgId, taosxRsp.blockNum, taosxRsp.rspOffset.type, taosxRsp.rspOffset.uid,
            taosxRsp.rspOffset.version);
  }

  if (offset.type == TMQ_OFFSET__LOG) {
    int64_t fetchVer = offset.version + 1;
    pCkHead = taosMemoryMalloc(sizeof(SWalCkHead) + 2048);
    if (pCkHead == NULL) {
      tDeleteSTaosxRsp(&taosxRsp);
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
    }

    walSetReaderCapacity(pHandle->pWalReader, 2048);

    while (1) {
      // todo refactor: this is not correct.
      int32_t savedEpoch = atomic_load_32(&pHandle->epoch);
      if (savedEpoch > pRequest->epoch) {
        tqWarn("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey:%s vgId:%d offset %" PRId64
               ", found new consumer epoch %d, discard req epoch %d",
               consumerId, pRequest->epoch, pHandle->subKey, vgId, fetchVer, savedEpoch, pRequest->epoch);
        break;
      }

      if (tqFetchLog(pTq, pHandle, &fetchVer, &pCkHead, pRequest->reqId) < 0) {
        // nothing (more) to fetch: answer with whatever offset was reached
        tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
        code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP);
        tDeleteSTaosxRsp(&taosxRsp);
        taosMemoryFreeClear(pCkHead);
        return code;
      }

      SWalCont* pHead = &pCkHead->head;
      tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d) iter log, vgId:%d offset %" PRId64 " msgType %d",
              consumerId, pRequest->epoch, vgId, fetchVer, pHead->msgType);

      if (pHead->msgType == TDMT_VND_SUBMIT) {
        SPackedData submit = {
            .msgStr = POINTER_SHIFT(pHead->body, sizeof(SSubmitReq2Msg)),
            .msgLen = pHead->bodyLen - sizeof(SSubmitReq2Msg),
            .ver = pHead->version,
        };

        if (tqTaosxScanLog(pTq, pHandle, submit, &taosxRsp) < 0) {
          tqError("tmq poll: tqTaosxScanLog error %" PRId64 ", in vgId:%d, subkey %s", consumerId, vgId,
                  pRequest->subKey);
          // release the WAL buffer and the response on this error path too
          taosMemoryFreeClear(pCkHead);
          tDeleteSTaosxRsp(&taosxRsp);
          return -1;
        }

        if (taosxRsp.blockNum > 0) {
          tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
          code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP);
          tDeleteSTaosxRsp(&taosxRsp);
          taosMemoryFreeClear(pCkHead);
          return code;
        } else {
          fetchVer++;
        }

      } else {
        // any non-submit entry is forwarded as a meta response; the body still
        // points into pCkHead, so the buffer is freed only after sending
        tqDebug("fetch meta msg, ver:%" PRId64 ", type:%s", pHead->version, TMSG_INFO(pHead->msgType));
        tqOffsetResetToLog(&metaRsp.rspOffset, fetchVer);
        metaRsp.resMsgType = pHead->msgType;
        metaRsp.metaRspLen = pHead->bodyLen;
        metaRsp.metaRsp = pHead->body;

        code = (tqSendMetaPollRsp(pTq, pMsg, pRequest, &metaRsp) < 0) ? -1 : 0;
        taosMemoryFreeClear(pCkHead);
        tDeleteSTaosxRsp(&taosxRsp);
        return code;
      }
    }
  }

  tDeleteSTaosxRsp(&taosxRsp);
  taosMemoryFreeClear(pCkHead);
  return 0;
}

660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680
/**
 * Entry point for a consumer poll request: decode it, look up the subscription
 * handle, verify the request comes from the consumer currently bound to the
 * handle, raise the handle's epoch if the request carries a newer one, and hand
 * over to doPollDataForMq().
 *
 * @param pTq  TQ instance.
 * @param pMsg the RPC message carrying the serialized SMqPollReq.
 * @return result of doPollDataForMq(), or -1 with terrno set when the message is
 *         invalid, the handle is unknown, or the consumer does not match.
 */
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
  SMqPollReq req = {0};
  if (tDeserializeSMqPollReq(pMsg->pCont, pMsg->contLen, &req) < 0) {
    tqError("tDeserializeSMqPollReq %d failed", pMsg->contLen);
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  int64_t consumerId = req.consumerId;
  int32_t reqEpoch = req.epoch;
  int32_t vgId = TD_VID(pTq->pVnode);

  // 1. find handle
  STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
  if (pHandle == NULL) {
    tqError("tmq poll: consumer:0x%" PRIx64 " vgId:%d subkey %s not found", consumerId, vgId, req.subKey);
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  // 2. check re-balance status
  taosRLockLatch(&pTq->lock);
  if (pHandle->consumerId != consumerId) {
    tqDebug("ERROR tmq poll: consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64,
            consumerId, vgId, req.subKey, pHandle->consumerId);
    terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
    taosRUnLockLatch(&pTq->lock);
    return -1;
  }
  taosRUnLockLatch(&pTq->lock);

  // 3. update the epoch value
  taosWLockLatch(&pTq->lock);
  int32_t savedEpoch = pHandle->epoch;
  if (reqEpoch > savedEpoch) {
    tqDebug("tmq poll: consumer:0x%" PRIx64 " epoch update from %d to %d by poll req", consumerId, savedEpoch, reqEpoch);
    pHandle->epoch = reqEpoch;
  }
  taosWUnLockLatch(&pTq->lock);

  char offsetBuf[80];
  tFormatOffset(offsetBuf, 80, &req.reqOffset);
  tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey %s, recv poll req vgId:%d, req:%s, reqId:0x%" PRIx64,
          consumerId, req.epoch, pHandle->subKey, vgId, offsetBuf, req.reqId);

  return doPollDataForMq(pTq, pHandle, &req, pMsg);
}

709
/**
 * Handle a delete-subscription request: drop the pending push entry, the handle
 * (closing its WAL reference first), the cached offset and the persisted handle
 * for the given subscription key. Individual failures are only logged.
 *
 * @param pTq      TQ instance.
 * @param sversion not used by this function.
 * @param msg      an SMqVDeleteReq.
 * @param msgLen   not used by this function.
 * @return always 0.
 */
int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  SMqVDeleteReq* pReq = (SMqVDeleteReq*)msg;
  const char*    subKey = pReq->subKey;

  tqDebug("vgId:%d, tq process delete sub req %s", pTq->pVnode->config.vgId, pReq->subKey);

  // the push manager is protected by the TQ latch
  taosWLockLatch(&pTq->lock);
  int32_t code = taosHashRemove(pTq->pPushMgr, subKey, strlen(subKey));
  if (code != 0) {
    tqDebug("vgId:%d, tq remove push handle %s", pTq->pVnode->config.vgId, pReq->subKey);
  }
  taosWUnLockLatch(&pTq->lock);

  STqHandle* pHandle = taosHashGet(pTq->pHandle, subKey, strlen(subKey));
  if (pHandle != NULL) {
    // release the WAL reference before the handle itself goes away
    if (pHandle->pRef) {
      walCloseRef(pTq->pVnode->pWal, pHandle->pRef->refId);
    }

    code = taosHashRemove(pTq->pHandle, subKey, strlen(subKey));
    if (code != 0) {
      tqError("cannot process tq delete req %s, since no such handle", pReq->subKey);
    }
  }

  code = tqOffsetDelete(pTq->pOffsetStore, subKey);
  if (code != 0) {
    tqError("cannot process tq delete req %s, since no such offset in cache", pReq->subKey);
  }

  if (tqMetaDeleteHandle(pTq, subKey) < 0) {
    tqError("cannot process tq delete req %s, since no such offset in tdb", pReq->subKey);
  }

  return 0;
}

744
/**
 * Handle an add-check-info request: decode the STqCheckInfo, register it in the
 * in-memory pCheckInfo table keyed by topic, and persist the raw message.
 *
 * @param pTq      TQ instance.
 * @param sversion not used by this function.
 * @param msg      encoded STqCheckInfo.
 * @param msgLen   length of msg in bytes.
 * @return 0 on success; -1 with terrno = TSDB_CODE_OUT_OF_MEMORY if decoding,
 *         inserting into the table, or persisting fails.
 */
int32_t tqProcessAddCheckInfoReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  STqCheckInfo info = {0};
  SDecoder     decoder;
  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
  if (tDecodeSTqCheckInfo(&decoder, &info) < 0) {
    // the decoder must be cleared on the failure path as well
    tDecoderClear(&decoder);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  tDecoderClear(&decoder);

  // NOTE(review): if the put below fails, whatever `info` owns after decoding is
  // not released here -- confirm whether tDeleteSTqCheckInfo(&info) is needed.
  if (taosHashPut(pTq->pCheckInfo, info.topic, strlen(info.topic), &info, sizeof(STqCheckInfo)) < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  if (tqMetaSaveCheckInfo(pTq, info.topic, msg, msgLen) < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  return 0;
}

764
/**
 * Handle a delete-check-info request: remove the entry from the in-memory
 * pCheckInfo table and from the persistent meta store.
 *
 * @param pTq      TQ instance.
 * @param sversion not used by this function.
 * @param msg      NUL-terminated key of the entry to remove.
 * @param msgLen   not used by this function.
 * @return 0 on success; -1 with terrno = TSDB_CODE_OUT_OF_MEMORY if either removal fails.
 */
int32_t tqProcessDelCheckInfoReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  int32_t code = 0;

  if (taosHashRemove(pTq->pCheckInfo, msg, strlen(msg)) < 0) {
    code = -1;
  } else if (tqMetaDeleteCheckInfo(pTq, msg) < 0) {
    code = -1;
  }

  if (code != 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
  }
  return code;
}

776
/*
 * Process a re-balance (subscribe) request for one subscription key.
 *
 * Three cases, selected by looking req.subKey up in pTq->pHandle:
 *   - no handle yet: build a new STqHandle for req.newConsumerId (reader/task
 *     setup depends on req.subType), insert it into the hash and persist it;
 *   - handle exists and already belongs to req.newConsumerId: reset the epoch
 *     and persist the handle again;
 *   - handle exists for another consumer: kill the running task, hand the
 *     handle over to the new consumer under pTq->lock, then persist it.
 *
 * req.qmsg is released on every exit path (it is set to NULL once a column
 * handle has taken it over).
 *
 * Returns 0 on success or when the request is ignored as invalid, -1 when the
 * WAL reference cannot be taken or the handle cannot be persisted; in the
 * "same consumer" case the result of tqMetaSaveHandle is returned as is.
 */
int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  int32_t     code = 0;
  SMqRebVgReq req = {0};
  tDecodeSMqRebVgReq(msg, &req);

  SVnode* pVnode = pTq->pVnode;
  int32_t vgId = TD_VID(pVnode);

  tqDebug("vgId:%d, tq process sub req:%s, Id:0x%" PRIx64 " -> Id:0x%" PRIx64, pVnode->config.vgId, req.subKey,
          req.oldConsumerId, req.newConsumerId);

  STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));

  if (pHandle == NULL) {
    // ---- case 1: no handle registered for this key, build one ----
    if (req.oldConsumerId != -1) {
      tqError("vgId:%d, build new consumer handle %s for consumer:0x%" PRIx64 ", but old consumerId:0x%" PRIx64,
              req.vgId, req.subKey, req.newConsumerId, req.oldConsumerId);
    }

    if (req.newConsumerId == -1) {
      tqError("vgId:%d, tq invalid re-balance request, new consumerId %" PRId64 "", req.vgId, req.newConsumerId);
      goto _end;  // request is dropped, still reported as success
    }

    STqHandle freshHandle = {0};
    pHandle = &freshHandle;

    // read from the zero-initialised handle, only used for the log line below
    uint64_t prevConsumerId = pHandle->consumerId;

    memcpy(pHandle->subKey, req.subKey, TSDB_SUBSCRIBE_KEY_LEN);
    pHandle->consumerId = req.newConsumerId;
    pHandle->epoch = -1;
    pHandle->execHandle.subType = req.subType;
    pHandle->fetchMeta = req.withMeta;

    // TODO version should be assigned and refed during preprocess
    SWalRef* pWalRef = walRefCommittedVer(pVnode->pWal);
    if (pWalRef == NULL) {
      code = -1;
      goto _end;
    }

    int64_t refVer = pWalRef->refVer;
    pHandle->pRef = pWalRef;

    SReadHandle handle = {
        .meta = pVnode->pMeta,
        .vnode = pVnode,
        .initTableReader = true,
        .initTqReader = true,
        .version = refVer,
    };

    pHandle->snapshotVer = refVer;

    if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
      // the handle takes over the query message
      pHandle->execHandle.execCol.qmsg = req.qmsg;
      req.qmsg = NULL;

      pHandle->execHandle.task =
          qCreateQueueExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle, vgId, &pHandle->execHandle.numOfCols, NULL);

      void* pScanner = NULL;
      qExtractStreamScanner(pHandle->execHandle.task, &pScanner);
      pHandle->execHandle.pExecReader = qExtractReaderFromStreamScanner(pScanner);
    } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__DB) {
      pHandle->pWalReader = walOpenReader(pVnode->pWal, NULL);
      pHandle->execHandle.pExecReader = tqOpenReader(pVnode);

      pHandle->execHandle.execDb.pFilterOutTbUid =
          taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
      buildSnapContext(handle.meta, handle.version, 0, pHandle->execHandle.subType, pHandle->fetchMeta,
                       (SSnapContext**)(&handle.sContext));

      pHandle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &handle, vgId, NULL, NULL);
    } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
      pHandle->pWalReader = walOpenReader(pVnode->pWal, NULL);
      pHandle->execHandle.execTb.suid = req.suid;

      SArray* pCtbUids = taosArrayInit(0, sizeof(int64_t));
      vnodeGetCtbIdList(pVnode, req.suid, pCtbUids);
      tqDebug("vgId:%d, tq try to get all ctb, suid:%" PRId64, pVnode->config.vgId, req.suid);
      for (int32_t i = 0; i < taosArrayGetSize(pCtbUids); i++) {
        int64_t tbUid = *(int64_t*)taosArrayGet(pCtbUids, i);
        tqDebug("vgId:%d, idx %d, uid:%" PRId64, vgId, i, tbUid);
      }

      pHandle->execHandle.pExecReader = tqOpenReader(pVnode);
      tqReaderSetTbUidList(pHandle->execHandle.pExecReader, pCtbUids);
      taosArrayDestroy(pCtbUids);

      buildSnapContext(handle.meta, handle.version, req.suid, pHandle->execHandle.subType, pHandle->fetchMeta,
                       (SSnapContext**)(&handle.sContext));
      pHandle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &handle, vgId, NULL, NULL);
    }

    taosHashPut(pTq->pHandle, req.subKey, strlen(req.subKey), pHandle, sizeof(STqHandle));
    tqDebug("try to persist handle %s consumer:0x%" PRIx64 " , old consumer:0x%" PRIx64, req.subKey,
            pHandle->consumerId, prevConsumerId);
    if (tqMetaSaveHandle(pTq, req.subKey, pHandle) < 0) {
      code = -1;
    }
  } else if (pHandle->consumerId == req.newConsumerId) {
    // ---- case 2: same consumer, nothing to switch ----
    tqInfo("vgId:%d consumer:0x%" PRIx64 " remains, no switch occurs", req.vgId, req.newConsumerId);
    atomic_store_32(&pHandle->epoch, -1);
    atomic_add_fetch_32(&pHandle->epoch, 1);
    taosMemoryFree(req.qmsg);
    return tqMetaSaveHandle(pTq, req.subKey, pHandle);
  } else {
    // ---- case 3: hand the existing handle over to another consumer ----
    tqInfo("vgId:%d switch consumer from Id:0x%" PRIx64 " to Id:0x%" PRIx64, req.vgId, pHandle->consumerId,
           req.newConsumerId);

    // kill executing task
    qTaskInfo_t pRunning = pHandle->execHandle.task;
    if (pRunning != NULL) {
      qKillTask(pRunning, TSDB_CODE_SUCCESS);
    }

    taosWLockLatch(&pTq->lock);
    atomic_store_32(&pHandle->epoch, -1);

    // remove if it has been register in the push manager, and return one empty block to consumer
    tqRemovePushEntry(pTq, req.subKey, (int32_t)strlen(req.subKey), pHandle->consumerId, true);

    atomic_store_64(&pHandle->consumerId, req.newConsumerId);
    atomic_add_fetch_32(&pHandle->epoch, 1);

    if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
      qStreamCloseTsdbReader(pRunning);
    }

    taosWUnLockLatch(&pTq->lock);

    if (tqMetaSaveHandle(pTq, req.subKey, pHandle) < 0) {
      code = -1;
    }
  }

_end:
  taosMemoryFree(req.qmsg);
  return code;
}
918

919
/*
 * Fill in the runtime parts of a freshly decoded stream task: reference
 * count, scheduling state, input/output queues, executor (for source and
 * aggregate level tasks) and sink callbacks.
 *
 * pTq   tq instance the task is deployed on
 * pTask task to expand, modified in place
 * ver   stored as the task's start version
 *
 * Returns 0 on success, -1 when a queue, the state backend, the executor or
 * the sink table schema cannot be created.
 */
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
#if 0
  if (pTask->taskLevel == TASK_LEVEL__AGG) {
    A(taosArrayGetSize(pTask->childEpInfo) != 0);
  }
#endif

  int32_t vgId = TD_VID(pTq->pVnode);

  pTask->refCnt = 1;
  pTask->schedStatus = TASK_SCHED_STATUS__INACTIVE;

  pTask->inputQueue = streamQueueOpen();
  pTask->outputQueue = streamQueueOpen();
  if (pTask->inputQueue == NULL || pTask->outputQueue == NULL) {
    return -1;
  }

  pTask->inputStatus = TASK_INPUT_STATUS__NORMAL;
  pTask->outputStatus = TASK_OUTPUT_STATUS__NORMAL;
  pTask->pMsgCb = &pTq->pVnode->msgCb;
  pTask->startVer = ver;

  // expand executor
  if (pTask->fillHistory) {
    pTask->taskStatus = TASK_STATUS__WAIT_DOWNSTREAM;
  }

  const bool isSource = (pTask->taskLevel == TASK_LEVEL__SOURCE);
  const bool isAgg = (pTask->taskLevel == TASK_LEVEL__AGG);

  if (isSource || isAgg) {
    // both levels need a state backend and an executor; only the read handle differs
    pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask, false, -1, -1);
    if (pTask->pState == NULL) {
      return -1;
    }

    SReadHandle readHandle = {.pStateBackend = pTask->pState};
    if (isSource) {
      readHandle.meta = pTq->pVnode->pMeta;
      readHandle.vnode = pTq->pVnode;
      readHandle.initTqReader = 1;
    } else {
      readHandle.vnode = NULL;
      readHandle.numOfVgroups = (int32_t)taosArrayGetSize(pTask->childEpInfo);
    }

    pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &readHandle, vgId);
    if (pTask->exec.executor == NULL) {
      return -1;
    }
  }

  // sink
  if (pTask->outputType == TASK_OUTPUT__SMA) {
    pTask->smaSink.vnode = pTq->pVnode;
    pTask->smaSink.smaSink = smaHandleRes;
  } else if (pTask->outputType == TASK_OUTPUT__TABLE) {
    pTask->tbSink.vnode = pTq->pVnode;
    pTask->tbSink.tbSinkFunc = tqSinkToTablePipeline2;

    // use the schema version recorded in meta when the sink table is known, 1 otherwise
    int32_t   schemaVer = 1;
    SMetaInfo metaInfo = {0};
    if (metaGetInfo(pTq->pVnode->pMeta, pTask->tbSink.stbUid, &metaInfo, NULL) == TSDB_CODE_SUCCESS) {
      schemaVer = metaInfo.skmVer;
    }

    pTask->tbSink.pTSchema =
        tBuildTSchema(pTask->tbSink.pSchemaWrapper->pSchema, pTask->tbSink.pSchemaWrapper->nCols, schemaVer);
    if (pTask->tbSink.pTSchema == NULL) {
      return -1;
    }
  }

  streamSetupTrigger(pTask);
  tqInfo("expand stream task on vg %d, task id %d, child id %d, level %d", vgId, pTask->taskId, pTask->selfChildId,
         pTask->taskLevel);
  return 0;
}
L
Liu Jicong 已提交
1009

1010 1011 1012 1013 1014 1015
/*
 * Answer a downstream-task check request.
 *
 * The request body follows an SMsgHead in pMsg->pCont. The reply echoes the
 * request identifiers and carries status 1 when the addressed task exists and
 * its status is TASK_STATUS__NORMAL, 0 otherwise. The reply is sent with
 * tmsgSendRsp using pMsg->info.
 *
 * Returns 0 once the reply has been handed to tmsgSendRsp, -1 when the
 * request cannot be decoded or the reply cannot be encoded or allocated (no
 * reply is sent in that case).
 */
int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) {
  char*               msgStr = pMsg->pCont;
  char*               msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
  int32_t             msgLen = pMsg->contLen - sizeof(SMsgHead);
  SStreamTaskCheckReq req = {0};
  SDecoder            decoder;

  tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
  if (tDecodeSStreamTaskCheckReq(&decoder, &req) < 0) {
    // do not build a reply from a request that failed to decode
    tDecoderClear(&decoder);
    tqError("unable to decode task check req %d", __LINE__);
    return -1;
  }
  tDecoderClear(&decoder);

  int32_t             taskId = req.downstreamTaskId;
  SStreamTaskCheckRsp rsp = {
      .reqId = req.reqId,
      .streamId = req.streamId,
      .childId = req.childId,
      .downstreamNodeId = req.downstreamNodeId,
      .downstreamTaskId = req.downstreamTaskId,
      .upstreamNodeId = req.upstreamNodeId,
      .upstreamTaskId = req.upstreamTaskId,
  };

  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);
  if (pTask && atomic_load_8(&pTask->taskStatus) == TASK_STATUS__NORMAL) {
    rsp.status = 1;
  } else {
    rsp.status = 0;
  }

  if (pTask) streamMetaReleaseTask(pTq->pStreamMeta, pTask);

  tqDebug("tq recv task check req(reqId:0x%" PRIx64 ") %d at node %d check req from task %d at node %d, status %d",
          rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);

  SEncoder encoder;
  int32_t  code;
  int32_t  len;
  tEncodeSize(tEncodeSStreamTaskCheckRsp, &rsp, len, code);
  if (code < 0) {
    tqError("unable to encode rsp %d", __LINE__);
    return -1;
  }

  void* buf = rpcMallocCont(sizeof(SMsgHead) + len);
  if (buf == NULL) {
    // the buffer is written to right below, so an allocation failure must stop here
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    tqError("unable to allocate rsp %d", __LINE__);
    return -1;
  }
  ((SMsgHead*)buf)->vgId = htonl(req.upstreamNodeId);

  void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
  tEncoderInit(&encoder, (uint8_t*)abuf, len);
  tEncodeSStreamTaskCheckRsp(&encoder, &rsp);
  tEncoderClear(&encoder);

  SRpcMsg rspMsg = {
      .code = 0,
      .pCont = buf,
      .contLen = sizeof(SMsgHead) + len,
      .info = pMsg->info,
  };

  tmsgSendRsp(&rspMsg);
  return 0;
}

1069
/*
 * Handle the reply to a task check request.
 *
 * Decodes an SStreamTaskCheckRsp from msg, looks up the upstream task named
 * in it and passes the reply to streamProcessTaskCheckRsp.
 *
 * Returns -1 when decoding fails or the task is not found, otherwise the
 * result of streamProcessTaskCheckRsp.
 */
int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  SStreamTaskCheckRsp rsp;
  SDecoder            decoder;

  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
  int32_t decodeRet = tDecodeSStreamTaskCheckRsp(&decoder, &rsp);
  tDecoderClear(&decoder);  // cleared whether or not decoding succeeded
  if (decodeRet < 0) {
    return -1;
  }

  tqDebug("tq recv task check rsp(reqId:0x%" PRIx64 ") %d at node %d check req from task %d at node %d, status %d",
          rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);

  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, rsp.upstreamTaskId);
  if (pTask == NULL) {
    return -1;
  }

  int32_t result = streamProcessTaskCheckRsp(pTask, &rsp, sversion);
  streamMetaReleaseTask(pTq->pStreamMeta, pTask);
  return result;
}

1095
/*
 * Deploy a stream task described by a serialized SStreamTask.
 *
 * Does nothing (and returns 0) when tsDisableStream is set. Otherwise the
 * task is allocated, decoded from msg and handed to streamMetaAddTask; a task
 * with fillHistory set then starts its downstream check.
 *
 * Returns 0 on success, -1 on allocation, decode or registration failure.
 */
int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  int32_t code;
#if 0
  code = streamMetaAddSerializedTask(pTq->pStreamMeta, version, msg, msgLen);
  if (code < 0) return code;
#endif
  if (tsDisableStream) {
    return 0;
  }

  // 1.deserialize msg and build task
  SStreamTask* pNewTask = taosMemoryCalloc(1, sizeof(SStreamTask));
  if (pNewTask == NULL) {
    return -1;
  }

  SDecoder decoder;
  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
  code = tDecodeSStreamTask(&decoder, pNewTask);
  tDecoderClear(&decoder);
  if (code < 0) {
    taosMemoryFree(pNewTask);
    return -1;
  }

  // 2.save task
  if (streamMetaAddTask(pTq->pStreamMeta, sversion, pNewTask) < 0) {
    return -1;
  }

  // 3.go through recover steps to fill history
  if (pNewTask->fillHistory) {
    streamTaskCheckDownstream(pNewTask, sversion);
  }

  return 0;
}

L
Liu Jicong 已提交
1135 1136 1137 1138 1139
/*
 * Run recovery step 1 for a source task and queue the step 2 request.
 *
 * pMsg->pCont is read as an SStreamRecoverStep1Req. After the step 1 scan a
 * SStreamRecoverStep2Req is built and put on the vnode write queue with type
 * TDMT_VND_STREAM_RECOVER_BLOCKING_STAGE, unless the task is being dropped.
 *
 * Returns 0 when step 2 was queued or the task is dropping; -1 when the task
 * is not found, its start version is not positive, the step 2 request cannot
 * be built, or the message buffer cannot be allocated.
 */
int32_t tqProcessTaskRecover1Req(STQ* pTq, SRpcMsg* pMsg) {
  int32_t code;
  char*   msg = pMsg->pCont;

  SStreamRecoverStep1Req* pReq = (SStreamRecoverStep1Req*)msg;
  SStreamTask*            pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->taskId);
  if (pTask == NULL) {
    return -1;
  }

  // check param
  int64_t fillVer1 = pTask->startVer;
  if (fillVer1 <= 0) {
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
    return -1;
  }

  // do recovery step 1
  streamSourceRecoverScanStep1(pTask);

  if (atomic_load_8(&pTask->taskStatus) == TASK_STATUS__DROPPING) {
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
    return 0;
  }

  // build msg to launch next step
  SStreamRecoverStep2Req req;
  code = streamBuildSourceRecover2Req(pTask, &req);
  if (code < 0) {
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
    return -1;
  }

  // sample the status while the reference is still held; pTask must not be
  // touched once it has been released
  int8_t status = atomic_load_8(&pTask->taskStatus);
  streamMetaReleaseTask(pTq->pStreamMeta, pTask);

  if (status == TASK_STATUS__DROPPING) {
    return 0;
  }

  // serialize msg: the payload is the step 2 request, so size it from req itself
  int32_t len = sizeof(req);

  void* serializedReq = rpcMallocCont(len);
  if (serializedReq == NULL) {
    return -1;
  }

  memcpy(serializedReq, &req, len);

  // dispatch msg
  SRpcMsg rpcMsg = {
      .code = 0,
      .contLen = len,
      .msgType = TDMT_VND_STREAM_RECOVER_BLOCKING_STAGE,
      .pCont = serializedReq,
  };

  tmsgPutToQueue(&pTq->pVnode->msgCb, WRITE_QUEUE, &rpcMsg);

  return 0;
}

1198
/*
 * Run recovery step 2 for a source task and finish its history fill.
 *
 * msg is read as an SStreamRecoverStep2Req. The steps run in order: step 2
 * scan, parameter restore, status back to normal, recover-finish dispatch to
 * downstream, then fillHistory is cleared and the task is saved. A task found
 * in TASK_STATUS__DROPPING after the scan stops early with success.
 *
 * Returns 0 on success or early stop, -1 when the task is not found or any
 * step reports an error.
 */
int32_t tqProcessTaskRecover2Req(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  SStreamRecoverStep2Req* pReq = (SStreamRecoverStep2Req*)msg;
  SStreamTask*            pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->taskId);
  if (pTask == NULL) {
    return -1;
  }

  int32_t ret = -1;

  // do recovery step 2
  if (streamSourceRecoverScanStep2(pTask, sversion) < 0) goto _release;

  if (atomic_load_8(&pTask->taskStatus) == TASK_STATUS__DROPPING) {
    ret = 0;
    goto _release;
  }

  // restore param
  if (streamRestoreParam(pTask) < 0) goto _release;

  // set status normal
  if (streamSetStatusNormal(pTask) < 0) goto _release;

  // dispatch recover finish req to all related downstream task
  if (streamDispatchRecoverFinishReq(pTask) < 0) goto _release;

  atomic_store_8(&pTask->fillHistory, 0);
  streamMetaSaveTask(pTq->pStreamMeta, pTask);
  ret = 0;

_release:
  // single release point for every path taken after the acquire above
  streamMetaReleaseTask(pTq->pStreamMeta, pTask);
  return ret;
}

L
Liu Jicong 已提交
1247 1248 1249
/*
 * Handle a recover-finish request sent by an upstream task.
 *
 * The request body follows an SMsgHead in pMsg->pCont. It is decoded, the
 * target task is looked up by req.taskId and the request is passed to
 * streamProcessRecoverFinishReq together with req.childId.
 *
 * Returns 0 on success, -1 when decoding fails, the task is not found or the
 * request cannot be processed.
 */
int32_t tqProcessTaskRecoverFinishReq(STQ* pTq, SRpcMsg* pMsg) {
  char*   msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);

  // deserialize
  SStreamRecoverFinishReq req = {0};

  SDecoder decoder;
  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
  int32_t decodeRet = tDecodeSStreamRecoverFinishReq(&decoder, &req);
  tDecoderClear(&decoder);
  if (decodeRet < 0) {
    // do not look a task up with ids from a request that failed to decode
    return -1;
  }

  // find task
  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.taskId);
  if (pTask == NULL) {
    return -1;
  }

  // do process request
  int32_t ret = (streamProcessRecoverFinishReq(pTask, req.childId) < 0) ? -1 : 0;

  streamMetaReleaseTask(pTq->pStreamMeta, pTask);
  return ret;
}
L
Liu Jicong 已提交
1273

L
Liu Jicong 已提交
1274 1275 1276 1277 1278
/*
 * Handler for the reply to a recover-finish request.
 * Currently a stub: neither argument is read and 0 is always returned.
 */
int32_t tqProcessTaskRecoverFinishRsp(STQ* pTq, SRpcMsg* pMsg) {
  // nothing to do for this reply
  return 0;
}

L
Liu Jicong 已提交
1279 1280 1281 1282 1283 1284 1285 1286 1287 1288 1289 1290 1291 1292 1293 1294
/*
 * Forward a delete result to every source-level stream task.
 *
 * pReq/len hold a serialized SDeleteRes. One STREAM_DELETE_DATA block with a
 * row per affected table uid is built and shared by all tasks through
 * SStreamRefDataBlock items; a heap-allocated counter (starting at 1 for this
 * function, +1 per queued item) decides who destroys the block.
 *
 * pTq  tq instance whose stream tasks are notified
 * pReq serialized SDeleteRes
 * len  length of pReq in bytes
 * ver  version stamped on the delete block
 *
 * Returns 0 when there is nothing to forward or the block was offered to the
 * tasks; -1 with terrno = TSDB_CODE_OUT_OF_MEMORY when the delete block or
 * its reference counter cannot be allocated.
 */
int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) {
  bool        failed = false;
  SDecoder*   pCoder = &(SDecoder){0};
  SDeleteRes* pRes = &(SDeleteRes){0};

  pRes->uidList = taosArrayInit(0, sizeof(tb_uid_t));
  if (pRes->uidList == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    failed = true;
  }

  tDecoderInit(pCoder, pReq, len);
  tDecodeDeleteRes(pCoder, pRes);
  tDecoderClear(pCoder);

  int32_t sz = taosArrayGetSize(pRes->uidList);
  if (sz == 0 || pRes->affectedRows == 0) {
    taosArrayDestroy(pRes->uidList);
    return 0;
  }

  SSDataBlock* pDelBlock = createSpecialDataBlock(STREAM_DELETE_DATA);
  if (pDelBlock == NULL) {
    taosArrayDestroy(pRes->uidList);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  if (blockDataEnsureCapacity(pDelBlock, sz) != TSDB_CODE_SUCCESS) {
    // the rows below are written into this capacity, so it must exist
    blockDataDestroy(pDelBlock);
    taosArrayDestroy(pRes->uidList);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  pDelBlock->info.rows = sz;
  pDelBlock->info.version = ver;

  // the column descriptors do not change while rows are filled in, so look them up once
  SColumnInfoData* pStartCol = taosArrayGet(pDelBlock->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pEndCol = taosArrayGet(pDelBlock->pDataBlock, END_TS_COLUMN_INDEX);
  SColumnInfoData* pUidCol = taosArrayGet(pDelBlock->pDataBlock, UID_COLUMN_INDEX);
  SColumnInfoData* pGroupCol = taosArrayGet(pDelBlock->pDataBlock, GROUPID_COLUMN_INDEX);
  SColumnInfoData* pCalStartCol = taosArrayGet(pDelBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
  SColumnInfoData* pCalEndCol = taosArrayGet(pDelBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);

  for (int32_t i = 0; i < sz; i++) {
    // start key column
    colDataSetVal(pStartCol, i, (const char*)&pRes->skey, false);
    // end key column
    colDataSetVal(pEndCol, i, (const char*)&pRes->ekey, false);
    // uid column
    int64_t* pUid = taosArrayGet(pRes->uidList, i);
    colDataSetVal(pUidCol, i, (const char*)pUid, false);

    colDataSetNULL(pGroupCol, i);
    colDataSetNULL(pCalStartCol, i);
    colDataSetNULL(pCalEndCol, i);
  }

  taosArrayDestroy(pRes->uidList);

  // shared reference counter of the delete block; this function holds the first reference
  int32_t* pRef = taosMemoryMalloc(sizeof(int32_t));
  if (pRef == NULL) {
    blockDataDestroy(pDelBlock);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  *pRef = 1;

  void* pIter = NULL;
  while (1) {
    pIter = taosHashIterate(pTq->pStreamMeta->pTasks, pIter);
    if (pIter == NULL) break;
    SStreamTask* pTask = *(SStreamTask**)pIter;
    if (pTask->taskLevel != TASK_LEVEL__SOURCE) continue;

    qDebug("delete req enqueue stream task: %d, ver: %" PRId64, pTask->taskId, ver);

    if (failed) {
      streamTaskInputFail(pTask);
      continue;
    }

    SStreamRefDataBlock* pRefBlock = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0);
    if (pRefBlock == NULL) {
      // treat a failed item allocation like the other out-of-memory case above
      qError("stream task input del failed since out of memory, task id %d", pTask->taskId);
      streamTaskInputFail(pTask);
      continue;
    }

    pRefBlock->type = STREAM_INPUT__REF_DATA_BLOCK;
    pRefBlock->pBlock = pDelBlock;
    pRefBlock->dataRef = pRef;
    atomic_add_fetch_32(pRefBlock->dataRef, 1);

    if (streamTaskInput(pTask, (SStreamQueueItem*)pRefBlock) < 0) {
      qError("stream task input del failed, task id %d", pTask->taskId);

      // the item was not queued: give its reference back and free it
      atomic_sub_fetch_32(pRef, 1);
      taosFreeQitem(pRefBlock);
      continue;
    }

    if (streamSchedExec(pTask) < 0) {
      qError("stream task launch failed, task id %d", pTask->taskId);
      continue;
    }
  }

  // drop this function's own reference; destroy the block if no item holds one
  int32_t ref = atomic_sub_fetch_32(pRef, 1);
  if (ref == 0) {
    blockDataDestroy(pDelBlock);
    taosMemoryFree(pRef);
  }

  return 0;
}

L
Liu Jicong 已提交
1395
/*
 * Offer a submitted data packet to every source-level stream task.
 *
 * The packet is wrapped once with streamDataSubmitNew and the same item is
 * passed to streamTaskInput for each eligible task, followed by
 * streamSchedExec. Tasks in TASK_STATUS__RECOVER_PREPARE or
 * TASK_STATUS__WAIT_DOWNSTREAM are skipped. When the wrapper cannot be
 * created every eligible task is marked with streamTaskInputFail instead.
 *
 * Returns 0 when the wrapper was created, -1 (terrno set to
 * TSDB_CODE_OUT_OF_MEMORY) otherwise.
 */
int32_t tqProcessSubmitReq(STQ* pTq, SPackedData submit) {
  SStreamDataSubmit2* pSubmit = streamDataSubmitNew(submit);
  const bool          failed = (pSubmit == NULL);

  if (failed) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    tqError("failed to create data submit for stream since out of memory");
  }

  for (void* pIter = taosHashIterate(pTq->pStreamMeta->pTasks, NULL); pIter != NULL;
       pIter = taosHashIterate(pTq->pStreamMeta->pTasks, pIter)) {
    SStreamTask* pTask = *(SStreamTask**)pIter;

    if (pTask->taskLevel != TASK_LEVEL__SOURCE) continue;

    if (pTask->taskStatus == TASK_STATUS__RECOVER_PREPARE || pTask->taskStatus == TASK_STATUS__WAIT_DOWNSTREAM) {
      tqDebug("skip push task %d, task status %d", pTask->taskId, pTask->taskStatus);
      continue;
    }

    tqDebug("data submit enqueue stream task: %d, ver: %" PRId64, pTask->taskId, submit.ver);

    if (failed) {
      streamTaskInputFail(pTask);
      continue;
    }

    if (streamTaskInput(pTask, (SStreamQueueItem*)pSubmit) < 0) {
      tqError("stream task input failed, task id %d", pTask->taskId);
      continue;
    }

    if (streamSchedExec(pTask) < 0) {
      tqError("stream task launch failed, task id %d", pTask->taskId);
    }
  }

  if (!failed) {
    // drop the reference held by this function and free the queue item
    streamDataSubmitRefDec(pSubmit);
    taosFreeQitem(pSubmit);
  }

  return failed ? -1 : 0;
}

L
Liu Jicong 已提交
1445 1446 1447
/*
 * Run the stream task named in an SStreamTaskRunReq (read from pMsg->pCont).
 *
 * Returns 0 after streamProcessRunReq has been called, -1 when the task is
 * not found.
 */
int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
  SStreamTaskRunReq* pRunReq = pMsg->pCont;

  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pRunReq->taskId);
  if (pTask == NULL) {
    return -1;
  }

  streamProcessRunReq(pTask);
  streamMetaReleaseTask(pTq->pStreamMeta, pTask);
  return 0;
}

L
Liu Jicong 已提交
1458
/*
 * Handle a dispatch request addressed to one of this vnode's stream tasks.
 *
 * The request body follows an SMsgHead in pMsg->pCont. It is decoded and
 * passed to streamProcessDispatchReq together with a reply skeleton carrying
 * pMsg->info.
 *
 * exec is forwarded unchanged to streamProcessDispatchReq.
 *
 * Returns 0 once the request was passed on, -1 when it cannot be decoded or
 * the target task is not found.
 */
int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec) {
  char*              msgStr = pMsg->pCont;
  char*              msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
  int32_t            msgLen = pMsg->contLen - sizeof(SMsgHead);
  SStreamDispatchReq req = {0};
  SDecoder           decoder;

  tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
  int32_t decodeRet = tDecodeStreamDispatchReq(&decoder, &req);
  // the decoder is cleared right after decoding, as vnodeEnqueueStreamMsg does for the same request type
  tDecoderClear(&decoder);
  if (decodeRet < 0) {
    return -1;
  }

  int32_t taskId = req.taskId;

  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);
  if (pTask == NULL) {
    return -1;
  }

  SRpcMsg rsp = {
      .info = pMsg->info,
      .code = 0,
  };
  streamProcessDispatchReq(pTask, &req, &rsp, exec);
  streamMetaReleaseTask(pTq->pStreamMeta, pTask);
  return 0;
}

L
Liu Jicong 已提交
1482 1483
/*
 * Handle the reply to a dispatch request.
 *
 * The SStreamDispatchRsp follows an SMsgHead in pMsg->pCont; its
 * upstreamTaskId (network byte order) selects the task that receives the
 * reply through streamProcessDispatchRsp along with pMsg->code.
 *
 * Returns 0 when the task was found, -1 otherwise.
 */
int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) {
  SStreamDispatchRsp* pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t             upstreamTaskId = ntohl(pRsp->upstreamTaskId);
  SStreamTask*        pTask = streamMetaAcquireTask(pTq->pStreamMeta, upstreamTaskId);

  tqDebug("recv dispatch rsp, code: %x", pMsg->code);

  if (pTask == NULL) {
    return -1;
  }

  streamProcessDispatchRsp(pTask, pRsp, pMsg->code);
  streamMetaReleaseTask(pTq->pStreamMeta, pTask);
  return 0;
}
L
Liu Jicong 已提交
1495

1496
/*
 * Remove the stream task named in an SVDropStreamTaskReq (msg is read as that
 * structure) from the stream meta. Always returns 0.
 */
int32_t tqProcessTaskDropReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  const SVDropStreamTaskReq* pDropReq = (const SVDropStreamTaskReq*)msg;

  streamMetaRemoveTask(pTq->pStreamMeta, pDropReq->taskId);
  return 0;
}
L
Liu Jicong 已提交
1501 1502 1503 1504 1505 1506 1507

/*
 * Handle a retrieve request addressed to one of this vnode's stream tasks.
 *
 * The request body follows an SMsgHead in pMsg->pCont. It is decoded and
 * passed to streamProcessRetrieveReq for the task named by req.dstTaskId.
 * The decoded request is released with tDeleteStreamRetrieveReq on every
 * path after decoding, not only on success.
 *
 * Returns 0 once the request was passed on, -1 when it cannot be decoded or
 * the target task is not found.
 */
int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg) {
  char*              msgStr = pMsg->pCont;
  char*              msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
  int32_t            msgLen = pMsg->contLen - sizeof(SMsgHead);
  // zero-initialised so the release below is well defined after a partial decode
  SStreamRetrieveReq req = {0};
  SDecoder           decoder;
  int32_t            ret = -1;

  tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
  int32_t decodeRet = tDecodeStreamRetrieveReq(&decoder, &req);
  tDecoderClear(&decoder);
  if (decodeRet < 0) {
    goto _cleanup;
  }

  int32_t      taskId = req.dstTaskId;
  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);
  if (pTask != NULL) {
    SRpcMsg rsp = {
        .info = pMsg->info,
        .code = 0,
    };
    streamProcessRetrieveReq(pTask, &req, &rsp);
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
    ret = 0;
  }

_cleanup:
  tDeleteStreamRetrieveReq(&req);
  return ret;
}

/*
 * Handler for the reply to a retrieve request.
 * Currently a stub: neither argument is read and 0 is always returned.
 */
int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg) {
  // nothing to do for this reply
  return 0;
}
L
Liu Jicong 已提交
1531

1532 1533 1534 1535 1536 1537
/*
 * Enqueue a dispatch request straight into a stream task of this vnode.
 *
 * The request body follows an SMsgHead in pMsg->pCont. On success the request
 * is passed to streamProcessDispatchReq (exec = false) and both pMsg->pCont
 * and pMsg are freed. On failure (undecodable request or unknown task) an
 * error reply carrying the failure code is sent when pMsg->info.handle is
 * set, and the message is freed as well.
 *
 * Returns 0 on success, -1 on failure. When pMsg->info.handle is NULL on the
 * failure path the function returns -1 without replying and without freeing
 * the message.
 */
int32_t vnodeEnqueueStreamMsg(SVnode* pVnode, SRpcMsg* pMsg) {
  STQ*      pTq = pVnode->pTq;
  SMsgHead* msgStr = pMsg->pCont;
  char*     msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
  int32_t   msgLen = pMsg->contLen - sizeof(SMsgHead);
  int32_t   code = 0;

  // zero-initialised: the FAIL path below reads req fields even when decoding failed
  SStreamDispatchReq req = {0};
  SDecoder           decoder;
  tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
  if (tDecodeStreamDispatchReq(&decoder, &req) < 0) {
    code = TSDB_CODE_MSG_DECODE_ERROR;
    tDecoderClear(&decoder);
    goto FAIL;
  }
  tDecoderClear(&decoder);

  int32_t taskId = req.taskId;

  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);
  if (pTask) {
    SRpcMsg rsp = {
        .info = pMsg->info,
        .code = 0,
    };
    streamProcessDispatchReq(pTask, &req, &rsp, false);
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
    rpcFreeCont(pMsg->pCont);
    taosFreeQitem(pMsg);
    return 0;
  }

  code = TSDB_CODE_STREAM_TASK_NOT_EXIST;

FAIL:
  if (pMsg->info.handle == NULL) return -1;

  SMsgHead* pRspHead = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamDispatchRsp));
  if (pRspHead == NULL) {
    // no room for a full reply: send a bare out-of-memory reply instead
    SRpcMsg rsp = {
        .code = TSDB_CODE_OUT_OF_MEMORY,
        .info = pMsg->info,
    };
    // log the code that is actually sent
    tqDebug("send dispatch error rsp, code: %x", rsp.code);
    tmsgSendRsp(&rsp);
    rpcFreeCont(pMsg->pCont);
    taosFreeQitem(pMsg);
    return -1;
  }

  pRspHead->vgId = htonl(req.upstreamNodeId);
  SStreamDispatchRsp* pRsp = POINTER_SHIFT(pRspHead, sizeof(SMsgHead));
  pRsp->streamId = htobe64(req.streamId);
  pRsp->upstreamTaskId = htonl(req.upstreamTaskId);
  pRsp->upstreamNodeId = htonl(req.upstreamNodeId);
  pRsp->downstreamNodeId = htonl(pVnode->config.vgId);
  pRsp->downstreamTaskId = htonl(req.taskId);
  pRsp->inputStatus = TASK_OUTPUT_STATUS__NORMAL;

  SRpcMsg rsp = {
      .code = code,
      .info = pMsg->info,
      .contLen = sizeof(SMsgHead) + sizeof(SStreamDispatchRsp),
      .pCont = pRspHead,
  };
  tqDebug("send dispatch error rsp, code: %x", code);
  tmsgSendRsp(&rsp);
  rpcFreeCont(pMsg->pCont);
  taosFreeQitem(pMsg);
  return -1;
}
L
Liu Jicong 已提交
1603

1604
// Returns 1 when sversion is less than or equal to pTq->walLogLastVer, 0 otherwise.
int32_t tqCheckLogInWal(STQ* pTq, int64_t sversion) { return sversion <= pTq->walLogLastVer; }