tq.c 50.1 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Hongze Cheng 已提交
16
#include "tq.h"
S
Shengliang Guan 已提交
17

L
Liu Jicong 已提交
18
/*
 * Initialize the module-wide TQ state: the shared timer and the stream module.
 *
 * tqMgmt.inited is used as a small state flag: 0 = not initialized,
 * 1 = initialized, 2 = an init/cleanup transition is in progress.
 *
 * Returns 0 on success (or when the module is already initialized) and -1 when
 * the timer or the stream module cannot be initialized. On failure the flag is
 * put back to 0 so that a later tqInit()/tqCleanUp() does not spin forever on
 * the transitional value.
 */
int32_t tqInit() {
  int8_t old;

  // Wait until no other thread is in the middle of a transition.
  do {
    old = atomic_val_compare_exchange_8(&tqMgmt.inited, 0, 2);
  } while (old == 2);

  if (old != 0) {
    // Already initialized by someone else; nothing to do.
    return 0;
  }

  tqMgmt.timer = taosTmrInit(10000, 100, 10000, "TQ");
  if (tqMgmt.timer == NULL) {
    atomic_store_8(&tqMgmt.inited, 0);
    return -1;
  }

  if (streamInit() < 0) {
    // Roll back: release the timer created above and leave the transitional state.
    taosTmrCleanUp(tqMgmt.timer);
    tqMgmt.timer = NULL;
    atomic_store_8(&tqMgmt.inited, 0);
    return -1;
  }

  atomic_store_8(&tqMgmt.inited, 1);
  return 0;
}
L
Liu Jicong 已提交
39

40
/*
 * Release the module-wide TQ state created by tqInit().
 *
 * The flag tqMgmt.inited is moved 1 -> 2 while the timer and the stream module
 * are cleaned up, and then reset to 0. Nothing happens when the module is not
 * in the initialized state.
 */
void tqCleanUp() {
  int8_t prev;

  // Retry while another thread holds the transitional value 2.
  do {
    prev = atomic_val_compare_exchange_8(&tqMgmt.inited, 1, 2);
  } while (prev == 2);

  if (prev != 1) {
    return;
  }

  taosTmrCleanUp(tqMgmt.timer);
  streamCleanUp();
  atomic_store_8(&tqMgmt.inited, 0);
}
L
Liu Jicong 已提交
53

54
static void destroyTqHandle(void* data) {
55 56 57
  STqHandle* pData = (STqHandle*)data;
  qDestroyTask(pData->execHandle.task);
  if (pData->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
L
Liu Jicong 已提交
58
    taosMemoryFreeClear(pData->execHandle.execCol.qmsg);
59 60 61 62
  } else if (pData->execHandle.subType == TOPIC_SUB_TYPE__DB) {
    tqCloseReader(pData->execHandle.pExecReader);
    walCloseReader(pData->pWalReader);
    taosHashCleanup(pData->execHandle.execDb.pFilterOutTbUid);
L
Liu Jicong 已提交
63
  } else if (pData->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
64 65 66 67 68
    walCloseReader(pData->pWalReader);
    tqCloseReader(pData->execHandle.pExecReader);
  }
}

L
Liu Jicong 已提交
69
static void tqPushEntryFree(void* data) {
L
Liu Jicong 已提交
70
  STqPushEntry* p = *(void**)data;
H
Haojun Liao 已提交
71 72 73 74 75 76 77
  if (p->pDataRsp->head.mqMsgType == TMQ_MSG_TYPE__POLL_RSP) {
    tDeleteSMqDataRsp(p->pDataRsp);
  } else if (p->pDataRsp->head.mqMsgType == TMQ_MSG_TYPE__TAOSX_RSP) {
    tDeleteSTaosxRsp((STaosxRsp*)p->pDataRsp);
  }

  taosMemoryFree(p->pDataRsp);
L
Liu Jicong 已提交
78 79 80
  taosMemoryFree(p);
}

L
Liu Jicong 已提交
81
/*
 * Allocate and open the TQ object of a vnode.
 *
 * Sets up the handle / push-manager / check-info hash tables, opens the TQ
 * meta store, the offset store and the stream meta, and loads the stream tasks
 * up to the WAL committed version.
 *
 * @param path    directory used by the TQ stores (copied).
 * @param pVnode  owning vnode; its WAL is read for the last/committed version.
 * @return the new STQ on success, NULL on failure. On failure everything that
 *         was created by this call is released again and terrno is left as it
 *         was at the point of failure.
 */
STQ* tqOpen(const char* path, SVnode* pVnode) {
  bool    metaOpened = false;
  int32_t code = 0;

  STQ* pTq = taosMemoryCalloc(1, sizeof(STQ));
  if (pTq == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pTq->path = taosStrdup(path);
  if (pTq->path == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }
  pTq->pVnode = pVnode;
  pTq->walLogLastVer = pVnode->pWal->vers.lastVer;

  taosInitRWLatch(&pTq->lock);

  pTq->pHandle = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);
  pTq->pPushMgr = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
  pTq->pCheckInfo = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);
  if (pTq->pHandle == NULL || pTq->pPushMgr == NULL || pTq->pCheckInfo == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }
  taosHashSetFreeFp(pTq->pHandle, destroyTqHandle);
  taosHashSetFreeFp(pTq->pPushMgr, tqPushEntryFree);
  taosHashSetFreeFp(pTq->pCheckInfo, (FDelete)tDeleteSTqCheckInfo);

  if (tqMetaOpen(pTq) < 0) {
    goto _err;
  }
  metaOpened = true;

  pTq->pOffsetStore = tqOffsetOpen(pTq);
  if (pTq->pOffsetStore == NULL) {
    goto _err;
  }

  pTq->pStreamMeta = streamMetaOpen(path, pTq, (FTaskExpand*)tqExpandTask, pTq->pVnode->config.vgId);
  if (pTq->pStreamMeta == NULL) {
    goto _err;
  }

  if (streamLoadTasks(pTq->pStreamMeta, walGetCommittedVer(pVnode->pWal)) < 0) {
    goto _err;
  }

  return pTq;

_err:
  // Keep the error that caused the failure; the cleanup calls below may touch terrno.
  code = terrno;

  // Same release order as tqClose(), but each closer is only invoked for a
  // component that was opened successfully by this call.
  // NOTE(review): assumes tqMetaOpen() releases its own partial state when it fails - confirm.
  if (pTq->pOffsetStore != NULL) {
    tqOffsetClose(pTq->pOffsetStore);
  }
  if (pTq->pHandle != NULL) {
    taosHashCleanup(pTq->pHandle);
  }
  if (pTq->pPushMgr != NULL) {
    taosHashCleanup(pTq->pPushMgr);
  }
  if (pTq->pCheckInfo != NULL) {
    taosHashCleanup(pTq->pCheckInfo);
  }
  taosMemoryFree(pTq->path);
  if (metaOpened) {
    tqMetaClose(pTq);
  }
  if (pTq->pStreamMeta != NULL) {
    streamMetaClose(pTq->pStreamMeta);
  }
  taosMemoryFree(pTq);

  terrno = code;
  return NULL;
}
L
Liu Jicong 已提交
121

L
Liu Jicong 已提交
122
/*
 * Close a TQ object returned by tqOpen() and free it.
 * Passing NULL is allowed and does nothing.
 */
void tqClose(STQ* pTq) {
  if (pTq != NULL) {
    // offset store first, then the in-memory tables and the path copy
    tqOffsetClose(pTq->pOffsetStore);
    taosHashCleanup(pTq->pHandle);
    taosHashCleanup(pTq->pPushMgr);
    taosHashCleanup(pTq->pCheckInfo);
    taosMemoryFree(pTq->path);

    // persistent stores, and finally the object itself
    tqMetaClose(pTq);
    streamMetaClose(pTq->pStreamMeta);
    taosMemoryFree(pTq);
  }
}
L
Liu Jicong 已提交
136

L
Liu Jicong 已提交
137
/*
 * Encode a meta poll response and send it back on the RPC handle of pMsg.
 *
 * The payload is an SMqRspHead (type POLL_META_RSP, epoch and consumer id taken
 * from the request) followed by the encoded SMqMetaRsp.
 *
 * @return 0 once the response has been handed to tmsgSendRsp(), -1 when the
 *         size computation or the buffer allocation fails.
 */
int32_t tqSendMetaPollRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqMetaRsp* pRsp) {
  int32_t bodyLen = 0;
  int32_t ret = 0;
  tEncodeSize(tEncodeSMqMetaRsp, pRsp, bodyLen, ret);
  if (ret < 0) {
    return -1;
  }

  int32_t totalLen = sizeof(SMqRspHead) + bodyLen;
  void*   pBuf = rpcMallocCont(totalLen);
  if (pBuf == NULL) {
    return -1;
  }

  // fixed-size head at the front of the buffer
  SMqRspHead* pHead = (SMqRspHead*)pBuf;
  pHead->mqMsgType = TMQ_MSG_TYPE__POLL_META_RSP;
  pHead->epoch = pReq->epoch;
  pHead->consumerId = pReq->consumerId;

  // encoded body right behind the head
  SEncoder encoder = {0};
  tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SMqRspHead)), bodyLen);
  tEncodeSMqMetaRsp(&encoder, pRsp);
  tEncoderClear(&encoder);

  SRpcMsg resp = {0};
  resp.info = pMsg->info;
  resp.pCont = pBuf;
  resp.contLen = totalLen;
  resp.code = 0;
  tmsgSendRsp(&resp);

  tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) send rsp, res msg type %d, offset type:%d",
          TD_VID(pTq->pVnode), pReq->consumerId, pReq->epoch, pRsp->resMsgType, pRsp->rspOffset.type);

  return 0;
}

H
Haojun Liao 已提交
175 176
static int32_t doSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* pRsp, int32_t epoch,
                             int64_t consumerId, int32_t type) {
L
Liu Jicong 已提交
177 178
  int32_t len = 0;
  int32_t code = 0;
H
Haojun Liao 已提交
179 180 181 182 183 184

  if (type == TMQ_MSG_TYPE__POLL_RSP) {
    tEncodeSize(tEncodeSMqDataRsp, pRsp, len, code);
  } else if (type == TMQ_MSG_TYPE__TAOSX_RSP) {
    tEncodeSize(tEncodeSTaosxRsp, (STaosxRsp*)pRsp, len, code);
  }
L
Liu Jicong 已提交
185 186 187 188 189 190 191 192 193 194 195

  if (code < 0) {
    return -1;
  }

  int32_t tlen = sizeof(SMqRspHead) + len;
  void*   buf = rpcMallocCont(tlen);
  if (buf == NULL) {
    return -1;
  }

H
Haojun Liao 已提交
196 197 198
  ((SMqRspHead*)buf)->mqMsgType = type;
  ((SMqRspHead*)buf)->epoch = epoch;
  ((SMqRspHead*)buf)->consumerId = consumerId;
L
Liu Jicong 已提交
199 200 201 202 203

  void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));

  SEncoder encoder = {0};
  tEncoderInit(&encoder, abuf, len);
H
Haojun Liao 已提交
204 205 206 207 208 209 210

  if (type == TMQ_MSG_TYPE__POLL_RSP) {
    tEncodeSMqDataRsp(&encoder, pRsp);
  } else if (type == TMQ_MSG_TYPE__TAOSX_RSP) {
    tEncodeSTaosxRsp(&encoder, (STaosxRsp*) pRsp);
  }

L
Liu Jicong 已提交
211 212 213
  tEncoderClear(&encoder);

  SRpcMsg rsp = {
H
Haojun Liao 已提交
214
      .info = *pRpcHandleInfo,
L
Liu Jicong 已提交
215 216 217 218 219 220 221 222 223
      .pCont = buf,
      .contLen = tlen,
      .code = 0,
  };

  tmsgSendRsp(&rsp);
  return 0;
}

H
Haojun Liao 已提交
224 225 226
/*
 * Send the response stored in a push entry to the consumer recorded in that
 * entry (RPC handle, epoch, consumer id and message type all come from the
 * entry), then log the request/response offsets.
 *
 * Always returns 0; the result of the send itself is not propagated.
 */
int32_t tqPushDataRsp(STQ* pTq, STqPushEntry* pPushEntry) {
  SMqDataRsp* pRsp = pPushEntry->pDataRsp;

#if 0
  A(taosArrayGetSize(pRsp->blockData) == pRsp->blockNum);
  A(taosArrayGetSize(pRsp->blockDataLen) == pRsp->blockNum);

  A(!pRsp->withSchema);
  A(taosArrayGetSize(pRsp->blockSchema) == 0);

  if (pRsp->reqOffset.type == TMQ_OFFSET__LOG) {
    A(pRsp->rspOffset.version > pRsp->reqOffset.version);
  }
#endif

  const SMqRspHead* pHead = &pRsp->head;
  doSendDataRsp(&pPushEntry->info, pRsp, pHead->epoch, pHead->consumerId, pHead->mqMsgType);

  char reqStr[80] = {0};
  char rspStr[80] = {0};
  tFormatOffset(reqStr, tListLen(reqStr), &pRsp->reqOffset);
  tFormatOffset(rspStr, tListLen(rspStr), &pRsp->rspOffset);
  tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) push rsp, block num: %d, req:%s, rsp:%s",
          TD_VID(pTq->pVnode), pRsp->head.consumerId, pRsp->head.epoch, pRsp->blockNum, reqStr, rspStr);
  return 0;
}

H
Haojun Liao 已提交
251
/*
 * Send a data response of the given message type as the reply to a poll
 * request (RPC handle from pMsg, epoch and consumer id from pReq), then log
 * the request/response offsets.
 *
 * Always returns 0; the result of the send itself is not propagated.
 */
int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp, int32_t type) {
#if 0
  A(taosArrayGetSize(pRsp->blockData) == pRsp->blockNum);
  A(taosArrayGetSize(pRsp->blockDataLen) == pRsp->blockNum);

  A(!pRsp->withSchema);
  A(taosArrayGetSize(pRsp->blockSchema) == 0);

  if (pRsp->reqOffset.type == TMQ_OFFSET__LOG) {
    if (pRsp->blockNum > 0) {
      A(pRsp->rspOffset.version > pRsp->reqOffset.version);
    } else {
      A(pRsp->rspOffset.version >= pRsp->reqOffset.version);
    }
  }
#endif
  doSendDataRsp(&pMsg->info, pRsp, pReq->epoch, pReq->consumerId, type);

  char reqStr[80] = {0};
  char rspStr[80] = {0};
  tFormatOffset(reqStr, sizeof(reqStr), &pRsp->reqOffset);
  tFormatOffset(rspStr, sizeof(rspStr), &pRsp->rspOffset);

  tqDebug("vgId:%d consumer:0x%" PRIx64 " (epoch %d) send rsp, block num:%d, req:%s, rsp:%s, reqId:0x%" PRIx64,
          TD_VID(pTq->pVnode), pReq->consumerId, pReq->epoch, pRsp->blockNum, reqStr, rspStr, pReq->reqId);

  return 0;
}

280 281 282 283 284
/*
 * True only when both offsets are WAL-log offsets and the left version is not
 * greater than the right one. Any non-log offset on either side yields false.
 */
static FORCE_INLINE bool tqOffsetLessOrEqual(const STqOffset* pLeft, const STqOffset* pRight) {
  if (pLeft->val.type != TMQ_OFFSET__LOG) {
    return false;
  }
  if (pRight->val.type != TMQ_OFFSET__LOG) {
    return false;
  }
  return pLeft->val.version <= pRight->val.version;
}

285
/*
 * Handle an offset-commit message: decode the committed offset, store it in the
 * offset store unless an equal or newer log offset is already saved, and for
 * log offsets move the WAL reference of the matching handle (if any).
 *
 * @param sversion  version of this commit message; a log offset that points at
 *                  the version right before it is advanced by one.
 * @param msg/msgLen encoded STqOffset.
 * @return 0 on success (including "nothing to update"), -1 on decode failure,
 *         unknown offset type, store write failure or WAL ref failure.
 */
int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  STqOffset offset = {0};
  int32_t   vgId = TD_VID(pTq->pVnode);

  SDecoder decoder = {0};
  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
  if (tDecodeSTqOffset(&decoder, &offset) < 0) {
    // the decoder must be cleared on the failure path as well
    tDecoderClear(&decoder);
    return -1;
  }
  tDecoderClear(&decoder);

  if (offset.val.type == TMQ_OFFSET__SNAPSHOT_DATA || offset.val.type == TMQ_OFFSET__SNAPSHOT_META) {
    tqDebug("receive offset commit msg to %s on vgId:%d, offset(type:snapshot) uid:%" PRId64 ", ts:%" PRId64,
            offset.subKey, vgId, offset.val.uid, offset.val.ts);
  } else if (offset.val.type == TMQ_OFFSET__LOG) {
    tqDebug("receive offset commit msg to %s on vgId:%d, offset(type:log) version:%" PRId64, offset.subKey,
            vgId, offset.val.version);
    // the committed version is the one right before this commit message: step over it
    if (offset.val.version + 1 == sversion) {
      offset.val.version += 1;
    }
  } else {
    tqError("invalid commit offset type:%d", offset.val.type);
    return -1;
  }

  STqOffset* pSavedOffset = tqOffsetRead(pTq->pOffsetStore, offset.subKey);
  if (pSavedOffset != NULL && tqOffsetLessOrEqual(&offset, pSavedOffset)) {
    return 0;  // no need to update the offset value
  }

  // save the new offset value
  if (tqOffsetWrite(pTq->pOffsetStore, &offset) < 0) {
    return -1;
  }

  if (offset.val.type == TMQ_OFFSET__LOG) {
    STqHandle* pHandle = taosHashGet(pTq->pHandle, offset.subKey, strlen(offset.subKey));
    if (pHandle && (walRefVer(pHandle->pRef, offset.val.version) < 0)) {
      return -1;
    }
  }

  return 0;
}

L
Liu Jicong 已提交
331
/*
 * Check whether column `colId` of table `tbUid` may be modified.
 *
 * Walks every entry of the check-info table; an entry for the same table whose
 * column list contains `colId` forbids the modification.
 *
 * @return -1 when the column is listed for that table, 0 otherwise.
 */
int32_t tqCheckColModifiable(STQ* pTq, int64_t tbUid, int32_t colId) {
  for (void* pIter = taosHashIterate(pTq->pCheckInfo, NULL); pIter != NULL;
       pIter = taosHashIterate(pTq->pCheckInfo, pIter)) {
    STqCheckInfo* pCheck = (STqCheckInfo*)pIter;
    if (pCheck->ntbUid != tbUid) {
      continue;
    }

    int32_t numOfCols = taosArrayGetSize(pCheck->colIdList);
    for (int32_t idx = 0; idx < numOfCols; ++idx) {
      int16_t forbidColId = *(int16_t*)taosArrayGet(pCheck->colIdList, idx);
      if (forbidColId != colId) {
        continue;
      }

      // leaving the iteration early: the iterator has to be cancelled explicitly
      taosHashCancelIterate(pTq->pCheckInfo, pIter);
      return -1;
    }
  }

  return 0;
}

L
Liu Jicong 已提交
357 358 359 360 361 362 363 364 365 366
/*
 * Prepare an SMqDataRsp for a poll request: copy the request offset, create the
 * (empty) block data / block length arrays and switch table names and schemas
 * off.
 *
 * @return 0 on success, -1 when one of the arrays cannot be created (the
 *         response keeps whatever was created; nothing is released here).
 */
static int32_t tqInitDataRsp(SMqDataRsp* pRsp, const SMqPollReq* pReq, int8_t subType) {
  pRsp->reqOffset = pReq->reqOffset;

  pRsp->blockData = taosArrayInit(0, sizeof(void*));
  pRsp->blockDataLen = taosArrayInit(0, sizeof(int32_t));
  if (!pRsp->blockData || !pRsp->blockDataLen) {
    return -1;
  }

  pRsp->withTbName = 0;
#if 0
  pRsp->withTbName = pReq->withTbName;
  if (pRsp->withTbName) {
    pRsp->blockTbName = taosArrayInit(0, sizeof(void*));
    if (pRsp->blockTbName == NULL) {
      // TODO free
      return -1;
    }
  }
#endif

  pRsp->withSchema = false;
  return 0;
}

383 384 385 386 387 388 389 390 391 392 393 394 395
/*
 * Prepare an STaosxRsp for a poll request: copy the request offset, enable
 * table names and schemas, and create the four (empty) per-block arrays.
 *
 * @return 0 on success, -1 when any of the arrays cannot be created (nothing
 *         is released here).
 */
static int32_t tqInitTaosxRsp(STaosxRsp* pRsp, const SMqPollReq* pReq) {
  pRsp->reqOffset = pReq->reqOffset;

  pRsp->withTbName = 1;
  pRsp->withSchema = 1;

  pRsp->blockData = taosArrayInit(0, sizeof(void*));
  pRsp->blockDataLen = taosArrayInit(0, sizeof(int32_t));
  pRsp->blockTbName = taosArrayInit(0, sizeof(void*));
  pRsp->blockSchema = taosArrayInit(0, sizeof(void*));

  const bool allCreated = pRsp->blockData != NULL && pRsp->blockDataLen != NULL && pRsp->blockTbName != NULL &&
                          pRsp->blockSchema != NULL;
  return allCreated ? 0 : -1;
}

400 401 402 403 404
/*
 * Resolve the start offset for a poll request that carries a "reset" offset
 * type.
 *
 * - A saved offset for the subscription key always wins and is copied to
 *   *pOffsetVal.
 * - RESET_EARLIEAST: start from the snapshot (meta or data, depending on the
 *   handle) when the request uses snapshots, otherwise from the first WAL
 *   version, which is referenced through pHandle->pRef.
 * - RESET_LATEST: an empty response positioned at the last WAL version is sent
 *   to the consumer right here and *pBlockReturned is set to true.
 * - RESET_NONE: fails with TSDB_CODE_TQ_NO_COMMITTED_OFFSET.
 *
 * @return 0 on success, -1 on failure (terrno set), or the result of sending
 *         the empty response in the RESET_LATEST case.
 */
static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest,
                                     SRpcMsg* pMsg, bool* pBlockReturned) {
  uint64_t     consumerId = pRequest->consumerId;
  STqOffsetVal reqOffset = pRequest->reqOffset;
  STqOffset*   pSaved = tqOffsetRead(pTq->pOffsetStore, pRequest->subKey);
  int32_t      vgId = TD_VID(pTq->pVnode);

  *pBlockReturned = false;

  // In this vnode, data has been polled by consumer for this topic, so let's continue from the last offset value.
  if (pSaved != NULL) {
    *pOffsetVal = pSaved->val;

    char formatBuf[80];
    tFormatOffset(formatBuf, 80, pOffsetVal);
    tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, existed offset found, offset reset to %s and continue.",
            consumerId, pHandle->subKey, vgId, formatBuf);
    return 0;
  }

  // no poll occurs in this vnode for this topic, let's seek to the right offset value.
  if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEAST) {
    if (!pRequest->useSnapshot) {
      pHandle->pRef = walRefFirstVer(pTq->pVnode->pWal, pHandle->pRef);
      if (pHandle->pRef == NULL) {
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        return -1;
      }

      tqOffsetResetToLog(pOffsetVal, pHandle->pRef->refVer - 1);
      return 0;
    }

    tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey:%s, vgId:%d, (earliest) set offset to be snapshot",
            consumerId, pHandle->subKey, vgId);

    if (pHandle->fetchMeta) {
      tqOffsetResetToMeta(pOffsetVal, 0);
    } else {
      tqOffsetResetToData(pOffsetVal, 0, 0);
    }
    return 0;
  }

  if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) {
    int32_t code = 0;

    if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
      SMqDataRsp dataRsp = {0};
      tqInitDataRsp(&dataRsp, pRequest, pHandle->execHandle.subType);

      tqOffsetResetToLog(&dataRsp.rspOffset, walGetLastVer(pTq->pVnode->pWal));
      tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, (latest) offset reset to %" PRId64, consumerId,
              pHandle->subKey, vgId, dataRsp.rspOffset.version);
      code = tqSendDataRsp(pTq, pMsg, pRequest, &dataRsp, TMQ_MSG_TYPE__POLL_RSP);
      tDeleteSMqDataRsp(&dataRsp);
    } else {
      STaosxRsp taosxRsp = {0};
      tqInitTaosxRsp(&taosxRsp, pRequest);
      tqOffsetResetToLog(&taosxRsp.rspOffset, walGetLastVer(pTq->pVnode->pWal));
      code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP);
      tDeleteSTaosxRsp(&taosxRsp);
    }

    // the (empty) response has already been handed to the consumer
    *pBlockReturned = true;
    return code;
  }

  if (reqOffset.type == TMQ_OFFSET__RESET_NONE) {
    tqError("tmq poll: subkey:%s, no offset committed for consumer:0x%" PRIx64 " in vg %d, subkey %s, reset none failed",
            pHandle->subKey, consumerId, vgId, pRequest->subKey);
    terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET;
    return -1;
  }

  return 0;
}
L
Liu Jicong 已提交
472

473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511
// True when the offset type value is negative; doPollDataForMq() treats such a request offset as a reset request.
#define IS_OFFSET_RESET_TYPE(_t)  ((_t) < 0)

/*
 * Serve a poll request for a column-type subscription: scan data starting at
 * *pOffset and either answer immediately or, when nothing new is available,
 * register a push entry so that the consumer is answered once data arrives.
 *
 * The scan and the push-entry registration run under the TQ write latch; the
 * immediate response is sent after the latch has been released.
 *
 * @return the result of tqRegisterPushEntry() or of tqSendDataRsp().
 */
static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest,
                                                   SRpcMsg* pMsg, STqOffsetVal* pOffset) {
  uint64_t consumerId = pRequest->consumerId;
  int32_t  vgId = TD_VID(pTq->pVnode);
  int32_t  code = 0;

  SMqDataRsp dataRsp = {0};
  tqInitDataRsp(&dataRsp, pRequest, pHandle->execHandle.subType);

  taosWLockLatch(&pTq->lock);

  qSetTaskId(pHandle->execHandle.task, consumerId, pRequest->reqId);
  code = tqScanData(pTq, pHandle, &dataRsp, pOffset);

  // till now, all data has been transferred to consumer, new data needs to push client once arrived.
  bool caughtUp = dataRsp.blockNum == 0 && dataRsp.reqOffset.type == TMQ_OFFSET__LOG &&
                  dataRsp.reqOffset.version == dataRsp.rspOffset.version &&
                  pHandle->consumerId == pRequest->consumerId;
  if (caughtUp) {
    code = tqRegisterPushEntry(pTq, pHandle, pRequest, pMsg, &dataRsp, TMQ_MSG_TYPE__POLL_RSP);
    taosWUnLockLatch(&pTq->lock);
    return code;
  }

  taosWUnLockLatch(&pTq->lock);

  code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&dataRsp, TMQ_MSG_TYPE__POLL_RSP);

  // NOTE: this pHandle->consumerId may have been changed already.
  tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, rsp block:%d, offset type:%d, uid/version:%" PRId64
          ", ts:%" PRId64 ", reqId:0x%" PRIx64,
          consumerId, pHandle->subKey, vgId, dataRsp.blockNum, dataRsp.rspOffset.type, dataRsp.rspOffset.uid,
          dataRsp.rspOffset.ts, pRequest->reqId);

  tDeleteSMqDataRsp(&dataRsp);
  return code;
}

/*
 * Produce the answer for one poll request.
 *
 * 1. A reset-type request offset is resolved through extractResetOffsetVal();
 *    otherwise the offset given by the consumer is used as is.
 * 2. Column subscriptions are served by extractDataAndRspForNormalSubscribe().
 * 3. All other subscriptions (taosX) first scan the snapshot when the offset is
 *    not a log offset, and then iterate the WAL from the version after the
 *    offset until a data block or a meta message can be returned.
 *
 * Every exit of the taosX path goes through `_end`, which releases the taosx
 * response and the WAL read buffer, so no path leaks them.
 *
 * @return 0 on success, -1 on failure, or the result of the send function used.
 */
static int32_t doPollDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg) {
  int32_t      code = -1;
  STqOffsetVal offset = {0};
  SWalCkHead*  pCkHead = NULL;
  int32_t      vgId = TD_VID(pTq->pVnode);

  STqOffsetVal reqOffset = pRequest->reqOffset;
  uint64_t     consumerId = pRequest->consumerId;

  // 1. reset the offset if needed
  if (IS_OFFSET_RESET_TYPE(reqOffset.type)) {
    // handle the reset offset cases, according to the consumer's choice.
    bool blockReturned = false;
    code = extractResetOffsetVal(&offset, pTq, pHandle, pRequest, pMsg, &blockReturned);
    if (code != 0) {
      return code;
    }

    // empty block returned, quit
    if (blockReturned) {
      return 0;
    }
  } else {  // use the consumer specified offset
    // the offset value can not be monotonious increase??
    offset = reqOffset;
  }

  // this is a normal subscribe requirement
  if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
    return extractDataAndRspForNormalSubscribe(pTq, pHandle, pRequest, pMsg, &offset);
  }

  // for taosX
  // todo handle the case where re-balance occurs.
  SMqMetaRsp metaRsp = {0};
  STaosxRsp  taosxRsp = {0};
  tqInitTaosxRsp(&taosxRsp, pRequest);

  if (offset.type != TMQ_OFFSET__LOG) {
    if (tqScanTaosx(pTq, pHandle, &taosxRsp, &metaRsp, &offset) < 0) {
      // the response arrays created above must not be leaked on this path
      code = -1;
      goto _end;
    }

    if (metaRsp.metaRspLen > 0) {
      code = tqSendMetaPollRsp(pTq, pMsg, pRequest, &metaRsp);
      tqDebug("tmq poll: consumer:0x%" PRIx64 " subkey:%s vgId:%d, send meta offset type:%d,uid:%" PRId64
              ",ts:%" PRId64,
              consumerId, pHandle->subKey, vgId, metaRsp.rspOffset.type, metaRsp.rspOffset.uid, metaRsp.rspOffset.ts);
      taosMemoryFree(metaRsp.metaRsp);
      goto _end;
    }

    if (taosxRsp.blockNum > 0) {
      code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP);
      goto _end;
    }

    // nothing produced by the snapshot scan: continue from the offset it ended at
    offset = taosxRsp.rspOffset;

    tqDebug("taosx poll: consumer:0x%" PRIx64 " subkey:%s vgId:%d, send data blockNum:%d, offset type:%d,uid:%" PRId64
            ",version:%" PRId64,
            consumerId, pHandle->subKey, vgId, taosxRsp.blockNum, taosxRsp.rspOffset.type, taosxRsp.rspOffset.uid,
            taosxRsp.rspOffset.version);
  }

  if (offset.type == TMQ_OFFSET__LOG) {
    int64_t fetchVer = offset.version + 1;
    pCkHead = taosMemoryMalloc(sizeof(SWalCkHead) + 2048);
    if (pCkHead == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      code = -1;
      goto _end;
    }

    walSetReaderCapacity(pHandle->pWalReader, 2048);

    while (1) {
      // todo refactor: this is not correct.
      int32_t savedEpoch = atomic_load_32(&pHandle->epoch);
      if (savedEpoch > pRequest->epoch) {
        // a newer epoch has been seen for this handle: stop without sending a response
        tqWarn("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey:%s vgId:%d offset %" PRId64
               ", found new consumer epoch %d, discard req epoch %d",
               consumerId, pRequest->epoch, pHandle->subKey, vgId, fetchVer, savedEpoch, pRequest->epoch);
        break;
      }

      if (tqFetchLog(pTq, pHandle, &fetchVer, &pCkHead, pRequest->reqId) < 0) {
        // no (more) log entry can be fetched: answer with what has been collected so far
        tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
        code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP);
        goto _end;
      }

      SWalCont* pHead = &pCkHead->head;
      tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d) iter log, vgId:%d offset %" PRId64 " msgType %d",
              consumerId, pRequest->epoch, vgId, fetchVer, pHead->msgType);

      if (pHead->msgType == TDMT_VND_SUBMIT) {
        SPackedData submit = {
            .msgStr = POINTER_SHIFT(pHead->body, sizeof(SSubmitReq2Msg)),
            .msgLen = pHead->bodyLen - sizeof(SSubmitReq2Msg),
            .ver = pHead->version,
        };

        if (tqTaosxScanLog(pTq, pHandle, submit, &taosxRsp) < 0) {
          tqError("tmq poll: tqTaosxScanLog error %" PRId64 ", in vgId:%d, subkey %s", consumerId, vgId,
                  pRequest->subKey);
          // release the response and the WAL buffer on this failure path as well
          code = -1;
          goto _end;
        }

        if (taosxRsp.blockNum > 0) {
          tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
          code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP);
          goto _end;
        }

        fetchVer++;
      } else {
        /*A(pHandle->fetchMeta);*/
        /*A(IS_META_MSG(pHead->msgType));*/
        tqDebug("fetch meta msg, ver:%" PRId64 ", type:%s", pHead->version, TMSG_INFO(pHead->msgType));
        tqOffsetResetToLog(&metaRsp.rspOffset, fetchVer);
        metaRsp.resMsgType = pHead->msgType;
        metaRsp.metaRspLen = pHead->bodyLen;
        // points into pCkHead, which is released only after the response has been sent
        metaRsp.metaRsp = pHead->body;
        code = (tqSendMetaPollRsp(pTq, pMsg, pRequest, &metaRsp) < 0) ? -1 : 0;
        goto _end;
      }
    }
  }

  code = 0;

_end:
  tDeleteSTaosxRsp(&taosxRsp);
  taosMemoryFreeClear(pCkHead);
  return code;
}

660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680
/*
 * Entry point for a consumer poll request.
 *
 * Deserializes the request, looks up the subscription handle, rejects the
 * request when the handle belongs to another consumer, raises the handle's
 * epoch to the request epoch if that is newer, and then delegates to
 * doPollDataForMq().
 *
 * @return -1 with terrno set for an undecodable request, an unknown
 *         subscription key or a consumer mismatch; otherwise the result of
 *         doPollDataForMq().
 */
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
  SMqPollReq req = {0};
  if (tDeserializeSMqPollReq(pMsg->pCont, pMsg->contLen, &req) < 0) {
    tqError("tDeserializeSMqPollReq %d failed", pMsg->contLen);
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  int64_t      consumerId = req.consumerId;
  int32_t      reqEpoch = req.epoch;
  STqOffsetVal reqOffset = req.reqOffset;
  int32_t      vgId = TD_VID(pTq->pVnode);

  // 1. find handle
  STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
  if (pHandle == NULL) {
    tqError("tmq poll: consumer:0x%" PRIx64 " vgId:%d subkey %s not found", consumerId, vgId, req.subKey);
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  // 2. check re-balance status
  taosRLockLatch(&pTq->lock);
  if (pHandle->consumerId != consumerId) {
    tqDebug("ERROR tmq poll: consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64,
            consumerId, TD_VID(pTq->pVnode), req.subKey, pHandle->consumerId);
    terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
    taosRUnLockLatch(&pTq->lock);
    return -1;
  }
  taosRUnLockLatch(&pTq->lock);

  // 3. update the epoch value
  taosWLockLatch(&pTq->lock);
  int32_t handleEpoch = pHandle->epoch;
  if (handleEpoch < reqEpoch) {
    tqDebug("tmq poll: consumer:0x%" PRIx64 " epoch update from %d to %d by poll req", consumerId, handleEpoch, reqEpoch);
    pHandle->epoch = reqEpoch;
  }
  taosWUnLockLatch(&pTq->lock);

  char offsetStr[80];
  tFormatOffset(offsetStr, 80, &reqOffset);
  tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey %s, recv poll req vgId:%d, req:%s, reqId:0x%" PRIx64,
          consumerId, req.epoch, pHandle->subKey, vgId, offsetStr, req.reqId);

  return doPollDataForMq(pTq, pHandle, &req, pMsg);
}

709
/*
 * Handle a "delete subscription" message for one subscription key.
 *
 * Removes the key from the push manager (under the TQ write latch), closes the
 * WAL reference of the handle and removes the handle, then deletes the offset
 * from the offset store and the handle from the TQ meta store. Failures of the
 * individual steps are only logged.
 *
 * @return always 0.
 */
int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  SMqVDeleteReq* pReq = (SMqVDeleteReq*)msg;

  tqDebug("vgId:%d, tq process delete sub req %s", pTq->pVnode->config.vgId, pReq->subKey);

  // push entry
  taosWLockLatch(&pTq->lock);
  int32_t code = taosHashRemove(pTq->pPushMgr, pReq->subKey, strlen(pReq->subKey));
  if (code != 0) {
    tqDebug("vgId:%d, tq remove push handle %s", pTq->pVnode->config.vgId, pReq->subKey);
  }
  taosWUnLockLatch(&pTq->lock);

  // subscription handle and its WAL reference
  STqHandle* pHandle = taosHashGet(pTq->pHandle, pReq->subKey, strlen(pReq->subKey));
  if (pHandle != NULL) {
    // walCloseRef(pHandle->pWalReader->pWal, pHandle->pRef->refId);
    if (pHandle->pRef) {
      walCloseRef(pTq->pVnode->pWal, pHandle->pRef->refId);
    }

    code = taosHashRemove(pTq->pHandle, pReq->subKey, strlen(pReq->subKey));
    if (code != 0) {
      tqError("cannot process tq delete req %s, since no such handle", pReq->subKey);
    }
  }

  // cached offset
  code = tqOffsetDelete(pTq->pOffsetStore, pReq->subKey);
  if (code != 0) {
    tqError("cannot process tq delete req %s, since no such offset in cache", pReq->subKey);
  }

  // persisted handle
  if (tqMetaDeleteHandle(pTq, pReq->subKey) < 0) {
    tqError("cannot process tq delete req %s, since no such offset in tdb", pReq->subKey);
  }

  return 0;
}

744
/*
 * Handle an "add check info" message: decode the STqCheckInfo, put it into the
 * in-memory check-info table keyed by topic name, and persist the raw message
 * in the TQ meta store.
 *
 * On success the table owns the decoded content (the struct is copied by value
 * into the table, whose free function is tDeleteSTqCheckInfo).
 *
 * @return 0 on success, -1 (terrno = TSDB_CODE_OUT_OF_MEMORY) when decoding,
 *         the table insertion or the persistence fails.
 */
int32_t tqProcessAddCheckInfoReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  STqCheckInfo info = {0};
  SDecoder     decoder = {0};

  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
  if (tDecodeSTqCheckInfo(&decoder, &info) < 0) {
    // the decoder must be cleared on the failure path as well
    tDecoderClear(&decoder);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  tDecoderClear(&decoder);

  if (taosHashPut(pTq->pCheckInfo, info.topic, strlen(info.topic), &info, sizeof(STqCheckInfo)) < 0) {
    // the table did not take the entry, so the decoded content is still owned here
    tDeleteSTqCheckInfo(&info);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  if (tqMetaSaveCheckInfo(pTq, info.topic, msg, msgLen) < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  return 0;
}

764
/*
 * Handle a "delete check info" message. `msg` is used directly as the
 * NUL-terminated key: the entry is removed from the in-memory check-info table
 * and then from the TQ meta store.
 *
 * @return 0 on success, -1 (terrno = TSDB_CODE_OUT_OF_MEMORY) when either
 *         removal fails.
 */
int32_t tqProcessDelCheckInfoReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  int32_t ret = 0;

  if (taosHashRemove(pTq->pCheckInfo, msg, strlen(msg)) < 0) {
    ret = -1;
  } else if (tqMetaDeleteCheckInfo(pTq, msg) < 0) {
    ret = -1;
  }

  if (ret != 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
  }
  return ret;
}

776
// Process a re-balance (subscribe) request for one subscription key.
//
// The request is decoded from `msg` into an SMqRebVgReq. If a handle for req.subKey already
// exists in pTq->pHandle, either the consumer is unchanged (epoch reset, handle saved) or the
// handle is switched to req.newConsumerId under pTq->lock and saved. If no handle exists, a new
// STqHandle is built according to req.subType, inserted into pTq->pHandle and saved.
//
// `sversion` and `msgLen` are not used. req.qmsg is freed on every exit path, except that in
// the TOPIC_SUB_TYPE__COLUMN case of a new handle it is moved into the handle first.
// Returns 0 on success (also for an invalid request with newConsumerId == -1), -1 on failure;
// for an unchanged consumer the result of tqMetaSaveHandle is returned as is.
int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  SMqRebVgReq req = {0};
  tDecodeSMqRebVgReq(msg, &req);

  SVnode* pVnode = pTq->pVnode;
  int32_t vgId = TD_VID(pVnode);

  tqDebug("vgId:%d, tq process sub req:%s, Id:0x%" PRIx64 " -> Id:0x%" PRIx64, pVnode->config.vgId, req.subKey,
          req.oldConsumerId, req.newConsumerId);

  STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));

  // ---- case 1: the subscription already has a handle on this vnode ----
  if (pHandle != NULL) {
    if (pHandle->consumerId == req.newConsumerId) {
      // same consumer: only the epoch is reset (-1 then +1) before saving
      tqInfo("vgId:%d consumer:0x%" PRIx64 " remains, no switch occurs", req.vgId, req.newConsumerId);
      atomic_store_32(&pHandle->epoch, -1);
      atomic_add_fetch_32(&pHandle->epoch, 1);
      taosMemoryFree(req.qmsg);
      return tqMetaSaveHandle(pTq, req.subKey, pHandle);
    }

    tqInfo("vgId:%d switch consumer from Id:0x%" PRIx64 " to Id:0x%" PRIx64, req.vgId, pHandle->consumerId,
           req.newConsumerId);

    taosWLockLatch(&pTq->lock);
    atomic_store_32(&pHandle->epoch, -1);

    // remove if it has been register in the push manager, and return one empty block to consumer
    tqRemovePushEntry(pTq, req.subKey, (int32_t)strlen(req.subKey), pHandle->consumerId, true);

    atomic_store_64(&pHandle->consumerId, req.newConsumerId);
    atomic_add_fetch_32(&pHandle->epoch, 1);

    if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
      qStreamCloseTsdbReader(pHandle->execHandle.task);
    }
    taosWUnLockLatch(&pTq->lock);

    // the handle is saved after the latch has been released
    int32_t switchSaved = tqMetaSaveHandle(pTq, req.subKey, pHandle);
    taosMemoryFree(req.qmsg);
    return (switchSaved < 0) ? -1 : 0;
  }

  // ---- case 2: no handle yet, build a new one ----
  if (req.oldConsumerId != -1) {
    tqError("vgId:%d, build new consumer handle %s for consumer:0x%" PRIx64 ", but old consumerId:0x%" PRIx64,
            req.vgId, req.subKey, req.newConsumerId, req.oldConsumerId);
  }

  if (req.newConsumerId == -1) {
    tqError("vgId:%d, tq invalid re-balance request, new consumerId %" PRId64 "", req.vgId, req.newConsumerId);
    taosMemoryFree(req.qmsg);
    return 0;
  }

  STqHandle newHandle = {0};
  pHandle = &newHandle;

  // taken from the freshly zeroed handle, before the new consumer id is stored
  uint64_t prevConsumerId = pHandle->consumerId;
  memcpy(pHandle->subKey, req.subKey, TSDB_SUBSCRIBE_KEY_LEN);
  pHandle->consumerId = req.newConsumerId;
  pHandle->epoch = -1;

  pHandle->execHandle.subType = req.subType;
  pHandle->fetchMeta = req.withMeta;

  // TODO version should be assigned and refed during preprocess
  SWalRef* pWalRef = walRefCommittedVer(pVnode->pWal);
  if (pWalRef == NULL) {
    taosMemoryFree(req.qmsg);
    return -1;
  }

  int64_t refVer = pWalRef->refVer;
  pHandle->pRef = pWalRef;

  SReadHandle handle = {
      .meta = pVnode->pMeta,
      .vnode = pVnode,
      .initTableReader = true,
      .initTqReader = true,
      .version = refVer,
  };

  pHandle->snapshotVer = refVer;

  if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
    // the handle takes over the query message; req.qmsg is cleared so the frees below are no-ops for it
    pHandle->execHandle.execCol.qmsg = req.qmsg;
    req.qmsg = NULL;

    pHandle->execHandle.task =
        qCreateQueueExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle, vgId, &pHandle->execHandle.numOfCols, NULL);
    void* pScanner = NULL;
    qExtractStreamScanner(pHandle->execHandle.task, &pScanner);
    pHandle->execHandle.pExecReader = qExtractReaderFromStreamScanner(pScanner);
  } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__DB) {
    pHandle->pWalReader = walOpenReader(pVnode->pWal, NULL);
    pHandle->execHandle.pExecReader = tqOpenReader(pVnode);

    pHandle->execHandle.execDb.pFilterOutTbUid =
        taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
    buildSnapContext(handle.meta, handle.version, 0, pHandle->execHandle.subType, pHandle->fetchMeta,
                     (SSnapContext**)(&handle.sContext));

    pHandle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &handle, vgId, NULL, NULL);
  } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
    pHandle->pWalReader = walOpenReader(pVnode->pWal, NULL);
    pHandle->execHandle.execTb.suid = req.suid;

    SArray* pUidList = taosArrayInit(0, sizeof(int64_t));
    vnodeGetCtbIdList(pVnode, req.suid, pUidList);
    tqDebug("vgId:%d, tq try to get all ctb, suid:%" PRId64, pVnode->config.vgId, req.suid);
    for (int32_t i = 0; i < taosArrayGetSize(pUidList); i++) {
      int64_t tbUid = *(int64_t*)taosArrayGet(pUidList, i);
      tqDebug("vgId:%d, idx %d, uid:%" PRId64, vgId, i, tbUid);
    }
    pHandle->execHandle.pExecReader = tqOpenReader(pVnode);
    tqReaderSetTbUidList(pHandle->execHandle.pExecReader, pUidList);
    taosArrayDestroy(pUidList);

    buildSnapContext(handle.meta, handle.version, req.suid, pHandle->execHandle.subType, pHandle->fetchMeta,
                     (SSnapContext**)(&handle.sContext));
    pHandle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &handle, vgId, NULL, NULL);
  }

  // the stack-built handle is handed to the hash table together with its size
  taosHashPut(pTq->pHandle, req.subKey, strlen(req.subKey), pHandle, sizeof(STqHandle));
  tqDebug("try to persist handle %s consumer:0x%" PRIx64 " , old consumer:0x%" PRIx64, req.subKey,
          pHandle->consumerId, prevConsumerId);

  int32_t newSaved = tqMetaSaveHandle(pTq, req.subKey, pHandle);
  taosMemoryFree(req.qmsg);
  return (newSaved < 0) ? -1 : 0;
}
912

913
// Fill in the runtime parts of a decoded stream task: queues, status fields, the state
// backend and executor (for SOURCE and AGG level tasks), the sink callbacks and the trigger.
// `ver` is stored as the task's start version.
// Returns 0 on success and -1 as soon as one of the resources cannot be created; nothing that
// was already created is released here.
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
  SVnode* pVnode = pTq->pVnode;
  int32_t vgId = TD_VID(pVnode);

  pTask->refCnt = 1;
  pTask->schedStatus = TASK_SCHED_STATUS__INACTIVE;

  pTask->inputQueue = streamQueueOpen();
  pTask->outputQueue = streamQueueOpen();
  if (pTask->inputQueue == NULL || pTask->outputQueue == NULL) {
    return -1;
  }

  pTask->inputStatus = TASK_INPUT_STATUS__NORMAL;
  pTask->outputStatus = TASK_OUTPUT_STATUS__NORMAL;
  pTask->pMsgCb = &pVnode->msgCb;
  pTask->startVer = ver;

  if (pTask->fillHistory) {
    pTask->taskStatus = TASK_STATUS__WAIT_DOWNSTREAM;
  }

  // expand executor: only SOURCE and AGG level tasks get a state backend and an executor
  bool isSource = (pTask->taskLevel == TASK_LEVEL__SOURCE);
  bool isAgg = !isSource && (pTask->taskLevel == TASK_LEVEL__AGG);
  if (isSource || isAgg) {
    pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask, false, -1, -1);
    if (pTask->pState == NULL) {
      return -1;
    }

    // all fields not set below stay zero
    SReadHandle readHandle = {.pStateBackend = pTask->pState};
    if (isSource) {
      readHandle.meta = pVnode->pMeta;
      readHandle.vnode = pVnode;
      readHandle.initTqReader = 1;
    } else {
      readHandle.numOfVgroups = (int32_t)taosArrayGetSize(pTask->childEpInfo);
    }

    pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &readHandle, vgId);
    if (pTask->exec.executor == NULL) {
      return -1;
    }
  }

  // sink
  if (pTask->outputType == TASK_OUTPUT__SMA) {
    pTask->smaSink.vnode = pVnode;
    pTask->smaSink.smaSink = smaHandleRes;
  } else if (pTask->outputType == TASK_OUTPUT__TABLE) {
    pTask->tbSink.vnode = pVnode;
    pTask->tbSink.tbSinkFunc = tqSinkToTablePipeline2;

    // schema version 1 is used unless the meta lookup for the target super table succeeds
    SMetaInfo metaInfo = {0};
    int32_t   metaCode = metaGetInfo(pVnode->pMeta, pTask->tbSink.stbUid, &metaInfo, NULL);
    int32_t   schemaVer = (metaCode == TSDB_CODE_SUCCESS) ? metaInfo.skmVer : 1;

    pTask->tbSink.pTSchema =
        tBuildTSchema(pTask->tbSink.pSchemaWrapper->pSchema, pTask->tbSink.pSchemaWrapper->nCols, schemaVer);
    if (pTask->tbSink.pTSchema == NULL) {
      return -1;
    }
  }

  streamSetupTrigger(pTask);
  tqInfo("expand stream task on vg %d, task id %d, child id %d, level %d", vgId, pTask->taskId, pTask->selfChildId, pTask->taskLevel);
  return 0;
}
L
Liu Jicong 已提交
1003

1004 1005 1006 1007 1008 1009
// Answer a downstream-task check request.
//
// The request body follows an SMsgHead in pMsg->pCont. The reply echoes the ids of the request
// and carries status 1 when the downstream task exists and its taskStatus is
// TASK_STATUS__NORMAL, otherwise 0. The reply is sent with tmsgSendRsp using pMsg->info.
// Returns 0 once the reply has been handed to tmsgSendRsp, -1 if the request cannot be
// decoded or the reply cannot be encoded or allocated (no reply is sent in those cases).
int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) {
  char*   msgStr = pMsg->pCont;
  char*   msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
  int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);

  // zero-initialized so that no indeterminate field is ever read
  SStreamTaskCheckReq req = {0};
  SDecoder            decoder;
  tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
  int32_t decodeRes = tDecodeSStreamTaskCheckReq(&decoder, &req);
  tDecoderClear(&decoder);
  if (decodeRes < 0) {
    // do not build a reply from a request that failed to decode
    tqError("unable to decode task check req %d", __LINE__);
    return -1;
  }

  int32_t             taskId = req.downstreamTaskId;
  SStreamTaskCheckRsp rsp = {
      .reqId = req.reqId,
      .streamId = req.streamId,
      .childId = req.childId,
      .downstreamNodeId = req.downstreamNodeId,
      .downstreamTaskId = req.downstreamTaskId,
      .upstreamNodeId = req.upstreamNodeId,
      .upstreamTaskId = req.upstreamTaskId,
  };

  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);
  if (pTask && atomic_load_8(&pTask->taskStatus) == TASK_STATUS__NORMAL) {
    rsp.status = 1;
  } else {
    rsp.status = 0;
  }

  if (pTask) streamMetaReleaseTask(pTq->pStreamMeta, pTask);

  tqDebug("tq recv task check req(reqId:0x%" PRIx64 ") %d at node %d check req from task %d at node %d, status %d",
          rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);

  SEncoder encoder;
  int32_t  code;
  int32_t  len;
  tEncodeSize(tEncodeSStreamTaskCheckRsp, &rsp, len, code);
  if (code < 0) {
    tqError("unable to encode rsp %d", __LINE__);
    return -1;
  }

  void* buf = rpcMallocCont(sizeof(SMsgHead) + len);
  if (buf == NULL) {
    // the buffer is written to right below, so a failed allocation must stop here
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    tqError("unable to alloc rsp %d", __LINE__);
    return -1;
  }
  ((SMsgHead*)buf)->vgId = htonl(req.upstreamNodeId);

  void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
  tEncoderInit(&encoder, (uint8_t*)abuf, len);
  tEncodeSStreamTaskCheckRsp(&encoder, &rsp);
  tEncoderClear(&encoder);

  SRpcMsg rspMsg = {
      .code = 0,
      .pCont = buf,
      .contLen = sizeof(SMsgHead) + len,
      .info = pMsg->info,
  };

  tmsgSendRsp(&rspMsg);
  return 0;
}

1063
// Decode a task check response from `msg` and pass it, together with `sversion`, to the
// upstream task named in the response via streamProcessTaskCheckRsp.
// Returns -1 if decoding fails or the upstream task cannot be acquired; otherwise the result
// of streamProcessTaskCheckRsp.
int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  SStreamTaskCheckRsp rsp;
  SDecoder            decoder;

  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
  int32_t code = tDecodeSStreamTaskCheckRsp(&decoder, &rsp);
  // the decoder is cleared whether or not decoding succeeded
  tDecoderClear(&decoder);
  if (code < 0) {
    return -1;
  }

  tqDebug("tq recv task check rsp(reqId:0x%" PRIx64 ") %d at node %d check req from task %d at node %d, status %d",
          rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);

  SStreamTask* pUpstream = streamMetaAcquireTask(pTq->pStreamMeta, rsp.upstreamTaskId);
  if (pUpstream == NULL) {
    return -1;
  }

  code = streamProcessTaskCheckRsp(pUpstream, &rsp, sversion);
  streamMetaReleaseTask(pTq->pStreamMeta, pUpstream);
  return code;
}

1089
// Deploy a stream task on this vnode: decode the task from `msg`, register it with the stream
// meta at version `sversion`, and, when the task is flagged fillHistory, start the downstream
// check. Does nothing and returns 0 when tsDisableStream is set.
// Returns 0 on success, -1 on allocation, decode or registration failure.
int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  if (tsDisableStream) {
    return 0;
  }

  // 1.deserialize msg and build task
  SStreamTask* pNewTask = taosMemoryCalloc(1, sizeof(SStreamTask));
  if (pNewTask == NULL) {
    return -1;
  }

  SDecoder decoder;
  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
  int32_t decodeRes = tDecodeSStreamTask(&decoder, pNewTask);
  tDecoderClear(&decoder);
  if (decodeRes < 0) {
    taosMemoryFree(pNewTask);
    return -1;
  }

  // 2.save task
  // NOTE(review): the task is not freed here when registration fails; presumably
  // streamMetaAddTask owns it from this point on - confirm
  if (streamMetaAddTask(pTq->pStreamMeta, sversion, pNewTask) < 0) {
    return -1;
  }

  // 3.go through recover steps to fill history
  if (pNewTask->fillHistory) {
    streamTaskCheckDownstream(pNewTask, sversion);
  }

  return 0;
}

L
Liu Jicong 已提交
1129 1130 1131 1132 1133
// Run recovery step 1 for a source task and queue the step 2 request.
//
// pMsg->pCont is read as an SStreamRecoverStep1Req. After the step 1 scan, a
// SStreamRecoverStep2Req is built and put on the vnode's WRITE_QUEUE as a
// TDMT_VND_STREAM_RECOVER_BLOCKING_STAGE message.
// Returns 0 on success or when the task is found to be dropping (nothing is queued then),
// -1 if the task cannot be acquired, its start version is not positive, the step 2 request
// cannot be built, or the message buffer cannot be allocated.
int32_t tqProcessTaskRecover1Req(STQ* pTq, SRpcMsg* pMsg) {
  SStreamRecoverStep1Req* pReq = (SStreamRecoverStep1Req*)pMsg->pCont;
  SStreamTask*            pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->taskId);
  if (pTask == NULL) {
    return -1;
  }

  // check param
  int64_t fillVer1 = pTask->startVer;
  if (fillVer1 <= 0) {
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
    return -1;
  }

  // do recovery step 1
  streamSourceRecoverScanStep1(pTask);

  if (atomic_load_8(&pTask->taskStatus) == TASK_STATUS__DROPPING) {
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
    return 0;
  }

  // build msg to launch next step
  SStreamRecoverStep2Req req;
  int32_t                code = streamBuildSourceRecover2Req(pTask, &req);
  if (code < 0) {
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
    return -1;
  }

  // read the status while the task is still held; pTask must not be touched after the release
  int8_t status = atomic_load_8(&pTask->taskStatus);
  streamMetaReleaseTask(pTq->pStreamMeta, pTask);

  if (status == TASK_STATUS__DROPPING) {
    return 0;
  }

  // serialize msg: the payload is the step 2 request, so size it from that object
  int32_t len = sizeof(req);

  void* serializedReq = rpcMallocCont(len);
  if (serializedReq == NULL) {
    return -1;
  }

  memcpy(serializedReq, &req, len);

  // dispatch msg
  SRpcMsg rpcMsg = {
      .code = 0,
      .contLen = len,
      .msgType = TDMT_VND_STREAM_RECOVER_BLOCKING_STAGE,
      .pCont = serializedReq,
  };

  tmsgPutToQueue(&pTq->pVnode->msgCb, WRITE_QUEUE, &rpcMsg);

  return 0;
}

1192
// Run recovery step 2 for the task named in `msg` (read as an SStreamRecoverStep2Req), then
// restore its parameters, set its status to normal, dispatch the recover-finish request,
// clear fillHistory and save the task. `msgLen` is not used.
// Returns 0 on success or when the task turns out to be dropping after the scan, -1 if the
// task cannot be acquired or any step reports a negative result. The task is released on every
// path after it has been acquired.
int32_t tqProcessTaskRecover2Req(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  int32_t                 ret = -1;
  SStreamRecoverStep2Req* pStep2 = (SStreamRecoverStep2Req*)msg;
  SStreamTask*            pTask = streamMetaAcquireTask(pTq->pStreamMeta, pStep2->taskId);
  if (pTask == NULL) {
    return -1;
  }

  // do recovery step 2
  if (streamSourceRecoverScanStep2(pTask, sversion) < 0) {
    goto END;
  }

  if (atomic_load_8(&pTask->taskStatus) == TASK_STATUS__DROPPING) {
    ret = 0;
    goto END;
  }

  // restore param
  if (streamRestoreParam(pTask) < 0) {
    goto END;
  }

  // set status normal
  if (streamSetStatusNormal(pTask) < 0) {
    goto END;
  }

  // dispatch recover finish req to all related downstream task
  if (streamDispatchRecoverFinishReq(pTask) < 0) {
    goto END;
  }

  atomic_store_8(&pTask->fillHistory, 0);
  streamMetaSaveTask(pTq->pStreamMeta, pTask);
  ret = 0;

END:
  streamMetaReleaseTask(pTq->pStreamMeta, pTask);
  return ret;
}

L
Liu Jicong 已提交
1241 1242 1243
// Handle a recover-finish request: decode it from the body that follows the SMsgHead in
// pMsg->pCont, look up the target task and pass the request's child id to
// streamProcessRecoverFinishReq.
// Returns 0 on success, -1 if the request cannot be decoded, the task cannot be acquired, or
// processing reports a negative result.
int32_t tqProcessTaskRecoverFinishReq(STQ* pTq, SRpcMsg* pMsg) {
  char*   msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);

  // deserialize
  SStreamRecoverFinishReq req = {0};

  SDecoder decoder;
  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
  int32_t decodeRes = tDecodeSStreamRecoverFinishReq(&decoder, &req);
  tDecoderClear(&decoder);
  if (decodeRes < 0) {
    // a request that failed to decode must not be used to pick a task
    return -1;
  }

  // find task
  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.taskId);
  if (pTask == NULL) {
    return -1;
  }

  // do process request
  int32_t ret = (streamProcessRecoverFinishReq(pTask, req.childId) < 0) ? -1 : 0;

  streamMetaReleaseTask(pTq->pStreamMeta, pTask);
  return ret;
}
L
Liu Jicong 已提交
1267

L
Liu Jicong 已提交
1268 1269 1270 1271 1272
// Placeholder handler for recover-finish responses: neither argument is read and the
// function always reports success.
int32_t tqProcessTaskRecoverFinishRsp(STQ* pTq, SRpcMsg* pMsg) {
  (void)pTq;
  (void)pMsg;
  return 0;
}

L
Liu Jicong 已提交
1273 1274 1275 1276 1277 1278 1279 1280 1281 1282 1283 1284 1285 1286 1287 1288
// Turn a delete result into a STREAM_DELETE_DATA block and feed it to every source-level
// stream task.
//
// `pReq`/`len` hold a serialized SDeleteRes; `ver` is stored as the block's version. One data
// block is built (one row per deleted table uid) and shared by all tasks through
// SStreamRefDataBlock items that point at a common reference counter. This function holds one
// reference itself and drops it at the end; the block and the counter are destroyed here if
// that was the last reference.
// Returns 0 when there is nothing to do or after the tasks have been fed, -1 (terrno set to
// TSDB_CODE_OUT_OF_MEMORY) if the data block or the reference counter cannot be allocated.
int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) {
  bool        failed = false;
  SDecoder*   pCoder = &(SDecoder){0};
  SDeleteRes* pRes = &(SDeleteRes){0};

  pRes->uidList = taosArrayInit(0, sizeof(tb_uid_t));
  if (pRes->uidList == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    failed = true;
  }

  tDecoderInit(pCoder, pReq, len);
  tDecodeDeleteRes(pCoder, pRes);
  tDecoderClear(pCoder);

  int32_t sz = taosArrayGetSize(pRes->uidList);
  if (sz == 0 || pRes->affectedRows == 0) {
    taosArrayDestroy(pRes->uidList);
    return 0;
  }

  SSDataBlock* pDelBlock = createSpecialDataBlock(STREAM_DELETE_DATA);
  if (pDelBlock == NULL) {
    // the block is dereferenced right below, so stop before touching it
    taosArrayDestroy(pRes->uidList);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  blockDataEnsureCapacity(pDelBlock, sz);
  pDelBlock->info.rows = sz;
  pDelBlock->info.version = ver;

  for (int32_t i = 0; i < sz; i++) {
    // start key column
    SColumnInfoData* pStartCol = taosArrayGet(pDelBlock->pDataBlock, START_TS_COLUMN_INDEX);
    colDataSetVal(pStartCol, i, (const char*)&pRes->skey, false);
    // end key column
    SColumnInfoData* pEndCol = taosArrayGet(pDelBlock->pDataBlock, END_TS_COLUMN_INDEX);
    colDataSetVal(pEndCol, i, (const char*)&pRes->ekey, false);
    // uid column
    SColumnInfoData* pUidCol = taosArrayGet(pDelBlock->pDataBlock, UID_COLUMN_INDEX);
    int64_t*         pUid = taosArrayGet(pRes->uidList, i);
    colDataSetVal(pUidCol, i, (const char*)pUid, false);

    colDataSetNULL(taosArrayGet(pDelBlock->pDataBlock, GROUPID_COLUMN_INDEX), i);
    colDataSetNULL(taosArrayGet(pDelBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX), i);
    colDataSetNULL(taosArrayGet(pDelBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX), i);
  }

  taosArrayDestroy(pRes->uidList);

  // shared reference counter; starts at 1 for the reference held by this function
  int32_t* pRef = taosMemoryMalloc(sizeof(int32_t));
  if (pRef == NULL) {
    blockDataDestroy(pDelBlock);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  *pRef = 1;

  void* pIter = NULL;
  while (1) {
    pIter = taosHashIterate(pTq->pStreamMeta->pTasks, pIter);
    if (pIter == NULL) break;
    SStreamTask* pTask = *(SStreamTask**)pIter;
    if (pTask->taskLevel != TASK_LEVEL__SOURCE) continue;

    qDebug("delete req enqueue stream task: %d, ver: %" PRId64, pTask->taskId, ver);

    if (!failed) {
      SStreamRefDataBlock* pRefBlock = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0);
      if (pRefBlock == NULL) {
        // no queue item for this task: report the failed input the same way the `failed` branch does
        qError("stream task failed to allocate del ref block, task id %d", pTask->taskId);
        streamTaskInputFail(pTask);
        continue;
      }
      pRefBlock->type = STREAM_INPUT__REF_DATA_BLOCK;
      pRefBlock->pBlock = pDelBlock;
      pRefBlock->dataRef = pRef;
      atomic_add_fetch_32(pRefBlock->dataRef, 1);

      if (streamTaskInput(pTask, (SStreamQueueItem*)pRefBlock) < 0) {
        qError("stream task input del failed, task id %d", pTask->taskId);

        // the item was not accepted: give back its reference and free it
        atomic_sub_fetch_32(pRef, 1);
        taosFreeQitem(pRefBlock);
        continue;
      }

      if (streamSchedExec(pTask) < 0) {
        qError("stream task launch failed, task id %d", pTask->taskId);
        continue;
      }

    } else {
      streamTaskInputFail(pTask);
    }
  }

  // drop this function's own reference; whoever drops the last one destroys the block
  int32_t ref = atomic_sub_fetch_32(pRef, 1);
  if (ref == 0) {
    blockDataDestroy(pDelBlock);
    taosMemoryFree(pRef);
  }

  return 0;
}

L
Liu Jicong 已提交
1389
// Wrap a submitted data packet in a stream submit item and feed it to every source-level
// stream task that is not in TASK_STATUS__RECOVER_PREPARE or TASK_STATUS__WAIT_DOWNSTREAM.
//
// If the submit item cannot be created, terrno is set to TSDB_CODE_OUT_OF_MEMORY, every
// eligible task is told about the failed input through streamTaskInputFail, and -1 is
// returned. Otherwise the item is handed to each task, the reference held by this function is
// dropped at the end, and 0 is returned (per-task input or launch failures are only logged).
int32_t tqProcessSubmitReq(STQ* pTq, SPackedData submit) {
  void*               pIter = NULL;
  bool                failed = false;
  SStreamDataSubmit2* pSubmit = NULL;

  pSubmit = streamDataSubmitNew(submit);
  if (pSubmit == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    tqError("failed to create data submit for stream since out of memory");
    failed = true;
  }

  while (1) {
    pIter = taosHashIterate(pTq->pStreamMeta->pTasks, pIter);
    if (pIter == NULL) {
      break;
    }

    SStreamTask* pTask = *(SStreamTask**)pIter;
    if (pTask->taskLevel != TASK_LEVEL__SOURCE) continue;

    // load the status once, atomically like the other readers in this file, so the check
    // and the log line below see the same value
    int8_t status = atomic_load_8(&pTask->taskStatus);
    if (status == TASK_STATUS__RECOVER_PREPARE || status == TASK_STATUS__WAIT_DOWNSTREAM) {
      tqDebug("skip push task %d, task status %d", pTask->taskId, status);
      continue;
    }

    tqDebug("data submit enqueue stream task: %d, ver: %" PRId64, pTask->taskId, submit.ver);

    if (!failed) {
      if (streamTaskInput(pTask, (SStreamQueueItem*)pSubmit) < 0) {
        tqError("stream task input failed, task id %d", pTask->taskId);
        continue;
      }

      if (streamSchedExec(pTask) < 0) {
        tqError("stream task launch failed, task id %d", pTask->taskId);
        continue;
      }
    } else {
      streamTaskInputFail(pTask);
    }
  }

  if (pSubmit) {
    // release the reference taken when the item was created, then the item itself
    streamDataSubmitRefDec(pSubmit);
    taosFreeQitem(pSubmit);
  }

  return failed ? -1 : 0;
}

L
Liu Jicong 已提交
1439 1440 1441
// Handle a run request: pMsg->pCont is read as an SStreamTaskRunReq and the named task is
// passed to streamProcessRunReq.
// Returns 0 if the task was found (the result of streamProcessRunReq is not inspected),
// -1 if it could not be acquired.
int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
  SStreamTaskRunReq* pRunReq = pMsg->pCont;
  SStreamTask*       pTask = streamMetaAcquireTask(pTq->pStreamMeta, pRunReq->taskId);
  if (pTask == NULL) {
    return -1;
  }

  streamProcessRunReq(pTask);
  streamMetaReleaseTask(pTq->pStreamMeta, pTask);
  return 0;
}

L
Liu Jicong 已提交
1452
// Handle a dispatch request: decode it from the body that follows the SMsgHead in
// pMsg->pCont and pass it to the target task through streamProcessDispatchReq, together with
// a response template built from pMsg->info and the `exec` flag.
// Returns 0 if the task was found (the result of streamProcessDispatchReq is not inspected),
// -1 if the request cannot be decoded or the task cannot be acquired.
int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec) {
  char*   msgStr = pMsg->pCont;
  char*   msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
  int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);

  SStreamDispatchReq req = {0};
  SDecoder           decoder;
  tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
  int32_t decodeRes = tDecodeStreamDispatchReq(&decoder, &req);
  // the decoder is cleared right after decoding, as vnodeEnqueueStreamMsg does
  tDecoderClear(&decoder);
  if (decodeRes < 0) {
    return -1;
  }

  int32_t taskId = req.taskId;

  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);
  if (pTask == NULL) {
    return -1;
  }

  SRpcMsg rsp = {
      .info = pMsg->info,
      .code = 0,
  };
  streamProcessDispatchReq(pTask, &req, &rsp, exec);
  streamMetaReleaseTask(pTq->pStreamMeta, pTask);
  return 0;
}

L
Liu Jicong 已提交
1476 1477
// Handle a dispatch response: the SStreamDispatchRsp follows the SMsgHead in pMsg->pCont and
// its upstreamTaskId (converted with ntohl) names the task that receives the response and
// pMsg->code through streamProcessDispatchRsp.
// Returns 0 if the task was found, -1 otherwise.
int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) {
  SStreamDispatchRsp* pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t             upstreamId = ntohl(pRsp->upstreamTaskId);
  SStreamTask*        pTask = streamMetaAcquireTask(pTq->pStreamMeta, upstreamId);

  // logged whether or not the task exists
  tqDebug("recv dispatch rsp, code: %x", pMsg->code);

  if (pTask == NULL) {
    return -1;
  }

  streamProcessDispatchRsp(pTask, pRsp, pMsg->code);
  streamMetaReleaseTask(pTq->pStreamMeta, pTask);
  return 0;
}
L
Liu Jicong 已提交
1489

1490
// Drop a stream task: `msg` is read as an SVDropStreamTaskReq and its taskId is passed to
// streamMetaRemoveTask. `sversion` and `msgLen` are not used.
// Always returns 0; the result of streamMetaRemoveTask is not inspected.
int32_t tqProcessTaskDropReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  SVDropStreamTaskReq* pDropReq = (SVDropStreamTaskReq*)msg;

  streamMetaRemoveTask(pTq->pStreamMeta, pDropReq->taskId);

  return 0;
}
L
Liu Jicong 已提交
1495 1496 1497 1498 1499 1500 1501

// Handle a retrieve request: decode it from the body that follows the SMsgHead in
// pMsg->pCont and pass it to the destination task through streamProcessRetrieveReq, together
// with a response template built from pMsg->info.
// A successfully decoded request is released with tDeleteStreamRetrieveReq on every path.
// Returns 0 if the task was found (the result of streamProcessRetrieveReq is not inspected),
// -1 if the request cannot be decoded or the task cannot be acquired.
int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg) {
  char*   msgStr = pMsg->pCont;
  char*   msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
  int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);

  SStreamRetrieveReq req = {0};
  SDecoder           decoder;
  tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
  int32_t decodeRes = tDecodeStreamRetrieveReq(&decoder, &req);
  tDecoderClear(&decoder);
  if (decodeRes < 0) {
    // NOTE(review): a partially decoded request is not released here because it is not
    // visible whether tDeleteStreamRetrieveReq is safe on one - confirm
    return -1;
  }

  int32_t      ret = -1;
  int32_t      taskId = req.dstTaskId;
  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);
  if (pTask) {
    SRpcMsg rsp = {
        .info = pMsg->info,
        .code = 0,
    };
    streamProcessRetrieveReq(pTask, &req, &rsp);
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
    ret = 0;
  }

  // release the decoded request whether or not the task was found
  tDeleteStreamRetrieveReq(&req);
  return ret;
}

// Placeholder handler for retrieve responses: neither argument is read and the function
// always reports success.
int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg) {
  (void)pTq;
  (void)pMsg;
  return 0;
}
L
Liu Jicong 已提交
1525

1526 1527 1528 1529 1530 1531
// Decode a stream dispatch request from `pMsg` and hand it to the target task without
// executing it (exec == false).
//
// On success the message content and the queue item `pMsg` are freed and 0 is returned.
// If decoding fails or the task does not exist, an error reply carrying the failure code is
// sent with tmsgSendRsp (only when pMsg->info.handle is set), the message is freed, and -1 is
// returned.
int32_t vnodeEnqueueStreamMsg(SVnode* pVnode, SRpcMsg* pMsg) {
  STQ*      pTq = pVnode->pTq;
  SMsgHead* msgStr = pMsg->pCont;
  char*     msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
  int32_t   msgLen = pMsg->contLen - sizeof(SMsgHead);
  int32_t   code = 0;

  // zero-initialized: the FAIL path copies fields of `req` into the reply even when decoding
  // failed, and must not read indeterminate values
  SStreamDispatchReq req = {0};
  SDecoder           decoder;
  tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
  if (tDecodeStreamDispatchReq(&decoder, &req) < 0) {
    code = TSDB_CODE_MSG_DECODE_ERROR;
    tDecoderClear(&decoder);
    goto FAIL;
  }
  tDecoderClear(&decoder);

  int32_t taskId = req.taskId;

  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);
  if (pTask) {
    SRpcMsg rsp = {
        .info = pMsg->info,
        .code = 0,
    };
    streamProcessDispatchReq(pTask, &req, &rsp, false);
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
    rpcFreeCont(pMsg->pCont);
    taosFreeQitem(pMsg);
    return 0;
  }

  code = TSDB_CODE_STREAM_TASK_NOT_EXIST;

FAIL:
  // NOTE(review): this early return does not free pMsg->pCont or pMsg, unlike every other
  // exit of this function - confirm the caller cleans up in this case
  if (pMsg->info.handle == NULL) return -1;

  SMsgHead* pRspHead = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamDispatchRsp));
  if (pRspHead == NULL) {
    SRpcMsg rsp = {
        .code = TSDB_CODE_OUT_OF_MEMORY,
        .info = pMsg->info,
    };
    // log the code that is actually sent in this reply
    tqDebug("send dispatch error rsp, code: %x", rsp.code);
    tmsgSendRsp(&rsp);
    rpcFreeCont(pMsg->pCont);
    taosFreeQitem(pMsg);
    return -1;
  }

  pRspHead->vgId = htonl(req.upstreamNodeId);
  SStreamDispatchRsp* pRsp = POINTER_SHIFT(pRspHead, sizeof(SMsgHead));
  pRsp->streamId = htobe64(req.streamId);
  pRsp->upstreamTaskId = htonl(req.upstreamTaskId);
  pRsp->upstreamNodeId = htonl(req.upstreamNodeId);
  pRsp->downstreamNodeId = htonl(pVnode->config.vgId);
  pRsp->downstreamTaskId = htonl(req.taskId);
  // NOTE(review): an OUTPUT status constant is stored in inputStatus - confirm this is intended
  pRsp->inputStatus = TASK_OUTPUT_STATUS__NORMAL;

  SRpcMsg rsp = {
      .code = code,
      .info = pMsg->info,
      .contLen = sizeof(SMsgHead) + sizeof(SStreamDispatchRsp),
      .pCont = pRspHead,
  };
  tqDebug("send dispatch error rsp, code: %x", code);
  tmsgSendRsp(&rsp);
  rpcFreeCont(pMsg->pCont);
  taosFreeQitem(pMsg);
  return -1;
}
L
Liu Jicong 已提交
1597

1598
// Report whether `sversion` is not greater than pTq->walLogLastVer.
// Returns 1 if sversion <= pTq->walLogLastVer, 0 otherwise.
int32_t tqCheckLogInWal(STQ* pTq, int64_t sversion) {
  if (sversion <= pTq->walLogLastVer) {
    return 1;
  }
  return 0;
}