tq.c 50.1 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
S
Shengliang Guan 已提交
14 15
 */

H
Hongze Cheng 已提交
16
#include "tq.h"
S
Shengliang Guan 已提交
17

L
Liu Jicong 已提交
18
/**
 * Initialize the process-wide TQ state: the "TQ" timer and the stream module.
 *
 * tqMgmt.inited is used as a tri-state flag:
 *   0 - not initialized, 1 - initialized, 2 - an init/cleanup is in progress.
 * Callers that observe 2 spin until the in-flight transition finishes.
 *
 * @return 0 on success (including when the module was already initialized),
 *         -1 if the timer or the stream module could not be initialized.
 */
int32_t tqInit() {
  int8_t old;
  while (1) {
    old = atomic_val_compare_exchange_8(&tqMgmt.inited, 0, 2);
    if (old != 2) break;
  }

  if (old == 0) {
    tqMgmt.timer = taosTmrInit(10000, 100, 10000, "TQ");
    if (tqMgmt.timer == NULL) {
      atomic_store_8(&tqMgmt.inited, 0);
      return -1;
    }

    if (streamInit() < 0) {
      // Roll back: release the timer and leave the transitional state, otherwise
      // every later tqInit()/tqCleanUp() caller would spin forever on inited == 2.
      taosTmrCleanUp(tqMgmt.timer);
      tqMgmt.timer = NULL;
      atomic_store_8(&tqMgmt.inited, 0);
      return -1;
    }

    atomic_store_8(&tqMgmt.inited, 1);
  }

  return 0;
}
L
Liu Jicong 已提交
39

40
/**
 * Tear down the process-wide TQ state set up by tqInit().
 *
 * Moves tqMgmt.inited from 1 to the transitional value 2, releases the timer
 * and the stream module, then stores 0. Does nothing when the flag was not 1.
 */
void tqCleanUp() {
  int8_t prev;
  do {
    // retry while another caller is in the middle of init/cleanup (flag == 2)
    prev = atomic_val_compare_exchange_8(&tqMgmt.inited, 1, 2);
  } while (prev == 2);

  if (prev != 1) {
    return;
  }

  taosTmrCleanUp(tqMgmt.timer);
  streamCleanUp();
  atomic_store_8(&tqMgmt.inited, 0);
}
L
Liu Jicong 已提交
53

54
static void destroyTqHandle(void* data) {
55 56 57
  STqHandle* pData = (STqHandle*)data;
  qDestroyTask(pData->execHandle.task);
  if (pData->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
L
Liu Jicong 已提交
58
    taosMemoryFreeClear(pData->execHandle.execCol.qmsg);
59 60 61 62
  } else if (pData->execHandle.subType == TOPIC_SUB_TYPE__DB) {
    tqCloseReader(pData->execHandle.pExecReader);
    walCloseReader(pData->pWalReader);
    taosHashCleanup(pData->execHandle.execDb.pFilterOutTbUid);
L
Liu Jicong 已提交
63
  } else if (pData->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
64 65 66 67 68
    walCloseReader(pData->pWalReader);
    tqCloseReader(pData->execHandle.pExecReader);
  }
}

L
Liu Jicong 已提交
69
static void tqPushEntryFree(void* data) {
L
Liu Jicong 已提交
70
  STqPushEntry* p = *(void**)data;
H
Haojun Liao 已提交
71 72 73 74 75 76 77
  if (p->pDataRsp->head.mqMsgType == TMQ_MSG_TYPE__POLL_RSP) {
    tDeleteSMqDataRsp(p->pDataRsp);
  } else if (p->pDataRsp->head.mqMsgType == TMQ_MSG_TYPE__TAOSX_RSP) {
    tDeleteSTaosxRsp((STaosxRsp*)p->pDataRsp);
  }

  taosMemoryFree(p->pDataRsp);
L
Liu Jicong 已提交
78 79 80
  taosMemoryFree(p);
}

L
Liu Jicong 已提交
81
/**
 * Create and open the TQ object of a vnode.
 *
 * Allocates the STQ, creates its handle / push-manager / check-info hash
 * tables, opens the TQ meta store and the offset store, opens the stream meta
 * and loads the stream tasks.
 *
 * On any failure everything that was already set up is released (in the same
 * order tqClose() uses) and NULL is returned, so the caller never has to clean
 * up a partially constructed object.
 *
 * @param path   directory of the TQ data; duplicated into pTq->path.
 * @param pVnode owning vnode.
 * @return the opened STQ, or NULL on failure.
 */
STQ* tqOpen(const char* path, SVnode* pVnode) {
  bool metaOpened = false;

  STQ* pTq = taosMemoryCalloc(1, sizeof(STQ));
  if (pTq == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pTq->path = taosStrdup(path);
  if (pTq->path == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }
  pTq->pVnode = pVnode;
  pTq->walLogLastVer = pVnode->pWal->vers.lastVer;

  taosInitRWLatch(&pTq->lock);

  pTq->pHandle = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);
  pTq->pPushMgr = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
  pTq->pCheckInfo = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);
  if (pTq->pHandle == NULL || pTq->pPushMgr == NULL || pTq->pCheckInfo == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }

  taosHashSetFreeFp(pTq->pHandle, destroyTqHandle);
  taosHashSetFreeFp(pTq->pPushMgr, tqPushEntryFree);
  taosHashSetFreeFp(pTq->pCheckInfo, (FDelete)tDeleteSTqCheckInfo);

  if (tqMetaOpen(pTq) < 0) {
    // NOTE(review): tqMetaClose() is not called here because the state left behind
    // by a failed tqMetaOpen() is not visible from this file - confirm.
    goto _err;
  }
  metaOpened = true;

  pTq->pOffsetStore = tqOffsetOpen(pTq);
  if (pTq->pOffsetStore == NULL) {
    goto _err;
  }

  pTq->pStreamMeta = streamMetaOpen(path, pTq, (FTaskExpand*)tqExpandTask, pTq->pVnode->config.vgId);
  if (pTq->pStreamMeta == NULL) {
    goto _err;
  }

  if (streamLoadTasks(pTq->pStreamMeta, walGetCommittedVer(pVnode->pWal)) < 0) {
    goto _err;
  }

  return pTq;

_err : {
  // keep the error code of the step that failed; the close routines may touch terrno
  int32_t savedErr = terrno;

  // pTq was calloc'ed, so every member that was never set up is still NULL
  if (pTq->pOffsetStore != NULL) {
    tqOffsetClose(pTq->pOffsetStore);
  }
  if (pTq->pHandle != NULL) {
    taosHashCleanup(pTq->pHandle);
  }
  if (pTq->pPushMgr != NULL) {
    taosHashCleanup(pTq->pPushMgr);
  }
  if (pTq->pCheckInfo != NULL) {
    taosHashCleanup(pTq->pCheckInfo);
  }
  if (pTq->path != NULL) {
    taosMemoryFree(pTq->path);
  }
  if (metaOpened) {
    tqMetaClose(pTq);
  }
  if (pTq->pStreamMeta != NULL) {
    streamMetaClose(pTq->pStreamMeta);
  }
  taosMemoryFree(pTq);

  terrno = savedErr;
  return NULL;
}
}
L
Liu Jicong 已提交
121

L
Liu Jicong 已提交
122
/**
 * Close a TQ object and release everything it owns: the offset store, the
 * three hash tables, the path string, the TQ meta store, the stream meta and
 * finally the STQ itself. A NULL argument is ignored.
 */
void tqClose(STQ* pTq) {
  if (pTq != NULL) {
    tqOffsetClose(pTq->pOffsetStore);

    // hash tables (their free callbacks release the stored entries)
    taosHashCleanup(pTq->pHandle);
    taosHashCleanup(pTq->pPushMgr);
    taosHashCleanup(pTq->pCheckInfo);

    taosMemoryFree(pTq->path);

    tqMetaClose(pTq);
    streamMetaClose(pTq->pStreamMeta);

    taosMemoryFree(pTq);
  }
}
L
Liu Jicong 已提交
136

L
Liu Jicong 已提交
137
/**
 * Encode a meta poll response and send it back on the RPC handle of pMsg.
 *
 * The message consists of an SMqRspHead (type TMQ_MSG_TYPE__POLL_META_RSP,
 * epoch and consumer id taken from the request) followed by the encoded
 * SMqMetaRsp.
 *
 * @return 0 after the response was handed to tmsgSendRsp(), -1 if the encoded
 *         size could not be computed or the RPC buffer could not be allocated.
 */
int32_t tqSendMetaPollRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqMetaRsp* pRsp) {
  int32_t bodyLen = 0;
  int32_t sizeRet = 0;
  tEncodeSize(tEncodeSMqMetaRsp, pRsp, bodyLen, sizeRet);
  if (sizeRet < 0) {
    return -1;
  }

  int32_t     totalLen = sizeof(SMqRspHead) + bodyLen;
  SMqRspHead* pHead = rpcMallocCont(totalLen);
  if (pHead == NULL) {
    return -1;
  }

  // fixed-size header first ...
  pHead->mqMsgType = TMQ_MSG_TYPE__POLL_META_RSP;
  pHead->epoch = pReq->epoch;
  pHead->consumerId = pReq->consumerId;

  // ... then the encoded body right behind it
  void*    pBody = POINTER_SHIFT(pHead, sizeof(SMqRspHead));
  SEncoder enc = {0};
  tEncoderInit(&enc, pBody, bodyLen);
  tEncodeSMqMetaRsp(&enc, pRsp);
  tEncoderClear(&enc);

  SRpcMsg rpcRsp = {
      .info = pMsg->info,
      .pCont = pHead,
      .contLen = totalLen,
      .code = 0,
  };
  tmsgSendRsp(&rpcRsp);

  tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) send rsp, res msg type %d, offset type:%d",
          TD_VID(pTq->pVnode), pReq->consumerId, pReq->epoch, pRsp->resMsgType, pRsp->rspOffset.type);

  return 0;
}

H
Haojun Liao 已提交
175 176
static int32_t doSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* pRsp, int32_t epoch,
                             int64_t consumerId, int32_t type) {
L
Liu Jicong 已提交
177 178
  int32_t len = 0;
  int32_t code = 0;
H
Haojun Liao 已提交
179 180 181 182 183 184

  if (type == TMQ_MSG_TYPE__POLL_RSP) {
    tEncodeSize(tEncodeSMqDataRsp, pRsp, len, code);
  } else if (type == TMQ_MSG_TYPE__TAOSX_RSP) {
    tEncodeSize(tEncodeSTaosxRsp, (STaosxRsp*)pRsp, len, code);
  }
L
Liu Jicong 已提交
185 186 187 188 189 190 191 192 193 194 195

  if (code < 0) {
    return -1;
  }

  int32_t tlen = sizeof(SMqRspHead) + len;
  void*   buf = rpcMallocCont(tlen);
  if (buf == NULL) {
    return -1;
  }

H
Haojun Liao 已提交
196 197 198
  ((SMqRspHead*)buf)->mqMsgType = type;
  ((SMqRspHead*)buf)->epoch = epoch;
  ((SMqRspHead*)buf)->consumerId = consumerId;
L
Liu Jicong 已提交
199 200 201 202 203

  void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));

  SEncoder encoder = {0};
  tEncoderInit(&encoder, abuf, len);
H
Haojun Liao 已提交
204 205 206 207 208 209 210

  if (type == TMQ_MSG_TYPE__POLL_RSP) {
    tEncodeSMqDataRsp(&encoder, pRsp);
  } else if (type == TMQ_MSG_TYPE__TAOSX_RSP) {
    tEncodeSTaosxRsp(&encoder, (STaosxRsp*) pRsp);
  }

L
Liu Jicong 已提交
211 212 213
  tEncoderClear(&encoder);

  SRpcMsg rsp = {
H
Haojun Liao 已提交
214
      .info = *pRpcHandleInfo,
L
Liu Jicong 已提交
215 216 217 218 219 220 221 222 223
      .pCont = buf,
      .contLen = tlen,
      .code = 0,
  };

  tmsgSendRsp(&rsp);
  return 0;
}

H
Haojun Liao 已提交
224 225 226
/**
 * Send the response buffered in a push entry to the consumer that registered
 * it, using the epoch, consumer id and message type stored in the response
 * header, and log the request/response offsets.
 *
 * @return always 0; the result of doSendDataRsp() is not inspected.
 */
int32_t tqPushDataRsp(STQ* pTq, STqPushEntry* pPushEntry) {
  SMqDataRsp* pRsp = pPushEntry->pDataRsp;

#if 0
  A(taosArrayGetSize(pRsp->blockData) == pRsp->blockNum);
  A(taosArrayGetSize(pRsp->blockDataLen) == pRsp->blockNum);

  A(!pRsp->withSchema);
  A(taosArrayGetSize(pRsp->blockSchema) == 0);

  if (pRsp->reqOffset.type == TMQ_OFFSET__LOG) {
    A(pRsp->rspOffset.version > pRsp->reqOffset.version);
  }
#endif

  SMqRspHead* pHead = &pPushEntry->pDataRsp->head;
  (void)doSendDataRsp(&pPushEntry->info, pRsp, pHead->epoch, pHead->consumerId, pHead->mqMsgType);

  // human-readable offsets, for the debug log only
  char reqStr[80] = {0};
  char rspStr[80] = {0};
  tFormatOffset(reqStr, tListLen(reqStr), &pRsp->reqOffset);
  tFormatOffset(rspStr, tListLen(rspStr), &pRsp->rspOffset);

  tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) push rsp, block num: %d, req:%s, rsp:%s",
          TD_VID(pTq->pVnode), pRsp->head.consumerId, pRsp->head.epoch, pRsp->blockNum, reqStr, rspStr);
  return 0;
}

H
Haojun Liao 已提交
251
/**
 * Send a data response for a poll request on the RPC handle of pMsg, using
 * the epoch and consumer id of the request, and log the request/response
 * offsets.
 *
 * @param type message type written into the response header; it also selects
 *             the body encoding in doSendDataRsp().
 * @return always 0; the result of doSendDataRsp() is not inspected.
 */
int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp, int32_t type) {
#if 0
  A(taosArrayGetSize(pRsp->blockData) == pRsp->blockNum);
  A(taosArrayGetSize(pRsp->blockDataLen) == pRsp->blockNum);

  A(!pRsp->withSchema);
  A(taosArrayGetSize(pRsp->blockSchema) == 0);

  if (pRsp->reqOffset.type == TMQ_OFFSET__LOG) {
    if (pRsp->blockNum > 0) {
      A(pRsp->rspOffset.version > pRsp->reqOffset.version);
    } else {
      A(pRsp->rspOffset.version >= pRsp->reqOffset.version);
    }
  }
#endif
  (void)doSendDataRsp(&pMsg->info, pRsp, pReq->epoch, pReq->consumerId, type);

  // human-readable offsets, for the debug log only
  char reqStr[80] = {0};
  char rspStr[80] = {0};
  tFormatOffset(reqStr, 80, &pRsp->reqOffset);
  tFormatOffset(rspStr, 80, &pRsp->rspOffset);

  tqDebug("vgId:%d consumer:0x%" PRIx64 " (epoch %d) send rsp, block num:%d, req:%s, rsp:%s, reqId:0x%" PRIx64,
          TD_VID(pTq->pVnode), pReq->consumerId, pReq->epoch, pRsp->blockNum, reqStr, rspStr, pReq->reqId);

  return 0;
}

280 281 282 283 284
/**
 * Compare two offsets by WAL version.
 *
 * @return true only when both offsets are of type TMQ_OFFSET__LOG and the
 *         version of pLeft is not greater than the version of pRight.
 */
static FORCE_INLINE bool tqOffsetLessOrEqual(const STqOffset* pLeft, const STqOffset* pRight) {
  if (pLeft->val.type != TMQ_OFFSET__LOG) {
    return false;
  }
  if (pRight->val.type != TMQ_OFFSET__LOG) {
    return false;
  }
  return pLeft->val.version <= pRight->val.version;
}

285
/**
 * Handle an offset-commit message: decode the committed offset, store it in
 * the offset store unless an equal or newer log offset is already saved, and
 * for log offsets move the WAL reference of the matching handle.
 *
 * @param sversion version compared against the committed log version; a log
 *                 offset whose version + 1 equals sversion is advanced by one.
 * @param msg      encoded STqOffset.
 * @param msgLen   length of msg in bytes.
 * @return 0 on success (including "nothing to update"), -1 on decode failure,
 *         unknown offset type, store write failure or WAL ref failure.
 */
int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  STqOffset offset = {0};
  int32_t   vgId = TD_VID(pTq->pVnode);

  SDecoder decoder;
  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
  if (tDecodeSTqOffset(&decoder, &offset) < 0) {
    // release the decoder on the failure path as well
    tDecoderClear(&decoder);
    return -1;
  }

  tDecoderClear(&decoder);

  if (offset.val.type == TMQ_OFFSET__SNAPSHOT_DATA || offset.val.type == TMQ_OFFSET__SNAPSHOT_META) {
    tqDebug("receive offset commit msg to %s on vgId:%d, offset(type:snapshot) uid:%" PRId64 ", ts:%" PRId64,
            offset.subKey, vgId, offset.val.uid, offset.val.ts);
  } else if (offset.val.type == TMQ_OFFSET__LOG) {
    tqDebug("receive offset commit msg to %s on vgId:%d, offset(type:log) version:%" PRId64, offset.subKey,
            vgId, offset.val.version);
    if (offset.val.version + 1 == sversion) {
      offset.val.version += 1;
    }
  } else {
    tqError("invalid commit offset type:%d", offset.val.type);
    return -1;
  }

  STqOffset* pSavedOffset = tqOffsetRead(pTq->pOffsetStore, offset.subKey);
  if (pSavedOffset != NULL && tqOffsetLessOrEqual(&offset, pSavedOffset)) {
    return 0;  // no need to update the offset value
  }

  // save the new offset value
  if (tqOffsetWrite(pTq->pOffsetStore, &offset) < 0) {
    return -1;
  }

  if (offset.val.type == TMQ_OFFSET__LOG) {
    // a handle may not exist for this subKey; only an existing one has its WAL ref moved
    STqHandle* pHandle = taosHashGet(pTq->pHandle, offset.subKey, strlen(offset.subKey));
    if (pHandle && (walRefVer(pHandle->pRef, offset.val.version) < 0)) {
      return -1;
    }
  }

  return 0;
}

L
Liu Jicong 已提交
331
/**
 * Check whether a column of a table may be modified.
 *
 * Walks every entry of pTq->pCheckInfo; an entry whose ntbUid equals tbUid and
 * whose colIdList contains colId forbids the modification.
 *
 * @return 0 if no entry forbids the column, -1 otherwise.
 */
int32_t tqCheckColModifiable(STQ* pTq, int64_t tbUid, int32_t colId) {
  for (void* pIter = taosHashIterate(pTq->pCheckInfo, NULL); pIter != NULL;
       pIter = taosHashIterate(pTq->pCheckInfo, pIter)) {
    STqCheckInfo* pInfo = (STqCheckInfo*)pIter;
    if (pInfo->ntbUid != tbUid) {
      continue;
    }

    int32_t numOfCols = taosArrayGetSize(pInfo->colIdList);
    for (int32_t idx = 0; idx < numOfCols; idx++) {
      int16_t forbidColId = *(int16_t*)taosArrayGet(pInfo->colIdList, idx);
      if (forbidColId == colId) {
        // leaving the iteration early: the iterator has to be cancelled explicitly
        taosHashCancelIterate(pTq->pCheckInfo, pIter);
        return -1;
      }
    }
  }

  return 0;
}

L
Liu Jicong 已提交
357 358 359 360 361 362 363 364 365 366
/**
 * Prepare an SMqDataRsp for a poll request: copy the request offset, create
 * the (empty) block data / block length arrays and clear the withTbName and
 * withSchema flags.
 *
 * @param subType not used by this function.
 * @return 0 on success, -1 if either array could not be created (the flags
 *         are left untouched in that case).
 */
static int32_t tqInitDataRsp(SMqDataRsp* pRsp, const SMqPollReq* pReq, int8_t subType) {
  pRsp->reqOffset = pReq->reqOffset;

  pRsp->blockData = taosArrayInit(0, sizeof(void*));
  pRsp->blockDataLen = taosArrayInit(0, sizeof(int32_t));

  if (pRsp->blockData != NULL && pRsp->blockDataLen != NULL) {
    pRsp->withTbName = 0;
#if 0
    pRsp->withTbName = pReq->withTbName;
    if (pRsp->withTbName) {
      pRsp->blockTbName = taosArrayInit(0, sizeof(void*));
      if (pRsp->blockTbName == NULL) {
        // TODO free
        return -1;
      }
    }
#endif
    pRsp->withSchema = false;
    return 0;
  }

  return -1;
}

383 384 385 386 387 388 389 390 391 392 393 394 395
/**
 * Prepare an STaosxRsp for a poll request: copy the request offset, enable
 * table names and schemas, and create the four (empty) per-block arrays.
 *
 * @return 0 on success, -1 if any of the arrays could not be created.
 */
static int32_t tqInitTaosxRsp(STaosxRsp* pRsp, const SMqPollReq* pReq) {
  pRsp->reqOffset = pReq->reqOffset;
  pRsp->withTbName = 1;
  pRsp->withSchema = 1;

  pRsp->blockData = taosArrayInit(0, sizeof(void*));
  pRsp->blockDataLen = taosArrayInit(0, sizeof(int32_t));
  pRsp->blockTbName = taosArrayInit(0, sizeof(void*));
  pRsp->blockSchema = taosArrayInit(0, sizeof(void*));

  bool allCreated = (pRsp->blockData != NULL) && (pRsp->blockDataLen != NULL) && (pRsp->blockTbName != NULL) &&
                    (pRsp->blockSchema != NULL);
  return allCreated ? 0 : -1;
}

400 401 402 403 404
/**
 * Resolve the starting offset for a poll request that asked for an offset
 * reset.
 *
 * - If an offset is already stored for the subKey, it is used.
 * - TMQ_OFFSET__RESET_EARLIEAST: start from the snapshot (meta or data,
 *   depending on pHandle->fetchMeta) when the request uses snapshots,
 *   otherwise from the version before the first referenced WAL version.
 * - TMQ_OFFSET__RESET_LATEST: an empty response carrying the last WAL version
 *   is sent right away and *pBlockReturned is set to true.
 * - TMQ_OFFSET__RESET_NONE: fails with TSDB_CODE_TQ_NO_COMMITTED_OFFSET.
 *
 * @param pOffsetVal     [out] resolved offset (not written for RESET_LATEST).
 * @param pBlockReturned [out] true when a response has already been sent.
 * @return 0 on success, the send result for RESET_LATEST, -1 on failure.
 */
static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest,
                                     SRpcMsg* pMsg, bool* pBlockReturned) {
  uint64_t     consumerId = pRequest->consumerId;
  STqOffsetVal reqOffset = pRequest->reqOffset;
  STqOffset*   pSaved = tqOffsetRead(pTq->pOffsetStore, pRequest->subKey);
  int32_t      vgId = TD_VID(pTq->pVnode);

  *pBlockReturned = false;

  // In this vnode, data has been polled by consumer for this topic, so let's continue from the last offset value.
  if (pSaved != NULL) {
    *pOffsetVal = pSaved->val;

    char formatBuf[80];
    tFormatOffset(formatBuf, 80, pOffsetVal);
    tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, existed offset found, offset reset to %s and continue.",
            consumerId, pHandle->subKey, vgId, formatBuf);
    return 0;
  }

  // no poll occurs in this vnode for this topic, let's seek to the right offset value.
  if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEAST) {
    if (!pRequest->useSnapshot) {
      pHandle->pRef = walRefFirstVer(pTq->pVnode->pWal, pHandle->pRef);
      if (pHandle->pRef == NULL) {
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        return -1;
      }

      tqOffsetResetToLog(pOffsetVal, pHandle->pRef->refVer - 1);
      return 0;
    }

    tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey:%s, vgId:%d, (earliest) set offset to be snapshot",
            consumerId, pHandle->subKey, vgId);

    if (pHandle->fetchMeta) {
      tqOffsetResetToMeta(pOffsetVal, 0);
    } else {
      tqOffsetResetToData(pOffsetVal, 0, 0);
    }
    return 0;
  }

  if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) {
    int32_t sendCode = 0;

    if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
      SMqDataRsp dataRsp = {0};
      tqInitDataRsp(&dataRsp, pRequest, pHandle->execHandle.subType);

      tqOffsetResetToLog(&dataRsp.rspOffset, walGetLastVer(pTq->pVnode->pWal));
      tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, (latest) offset reset to %" PRId64, consumerId,
              pHandle->subKey, vgId, dataRsp.rspOffset.version);
      sendCode = tqSendDataRsp(pTq, pMsg, pRequest, &dataRsp, TMQ_MSG_TYPE__POLL_RSP);
      tDeleteSMqDataRsp(&dataRsp);
    } else {
      STaosxRsp taosxRsp = {0};
      tqInitTaosxRsp(&taosxRsp, pRequest);
      tqOffsetResetToLog(&taosxRsp.rspOffset, walGetLastVer(pTq->pVnode->pWal));
      sendCode = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP);
      tDeleteSTaosxRsp(&taosxRsp);
    }

    // an (empty) response has been sent; the caller must not send another one
    *pBlockReturned = true;
    return sendCode;
  }

  if (reqOffset.type == TMQ_OFFSET__RESET_NONE) {
    tqError("tmq poll: subkey:%s, no offset committed for consumer:0x%" PRIx64 " in vg %d, subkey %s, reset none failed",
            pHandle->subKey, consumerId, vgId, pRequest->subKey);
    terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET;
    return -1;
  }

  return 0;
}
L
Liu Jicong 已提交
472

473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511
// True when the offset type value _t is negative. doPollDataForMq() uses this to
// decide whether the consumer asked for an offset reset instead of a concrete offset.
#define IS_OFFSET_RESET_TYPE(_t)  ((_t) < 0)

/**
 * Serve a poll request for a column (normal) subscription: scan data starting
 * at *pOffset and either answer immediately or, when nothing new is available,
 * register a push entry for the request.
 *
 * The scan and the push-entry registration both run while pTq->lock is held
 * for writing; the immediate response is sent after the lock is released.
 *
 * @param pOffset offset to scan from; passed through to tqScanData().
 * @return the result of tqRegisterPushEntry() on the push path, otherwise the
 *         result of tqSendDataRsp().
 */
static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest,
                                                   SRpcMsg* pMsg, STqOffsetVal* pOffset) {
  int32_t  code = 0;
  uint64_t consumerId = pRequest->consumerId;
  int32_t  vgId = TD_VID(pTq->pVnode);

  SMqDataRsp dataRsp = {0};
  // NOTE(review): the return value of tqInitDataRsp() is not checked here.
  tqInitDataRsp(&dataRsp, pRequest, pHandle->execHandle.subType);

  // lock
  taosWLockLatch(&pTq->lock);

  qSetTaskId(pHandle->execHandle.task, consumerId, pRequest->reqId);
  // NOTE(review): this result is overwritten below on both paths without being inspected.
  code = tqScanData(pTq, pHandle, &dataRsp, pOffset);

  // till now, all data has been transferred to consumer, new data needs to push client once arrived.
  if (dataRsp.blockNum == 0 && dataRsp.reqOffset.type == TMQ_OFFSET__LOG &&
      dataRsp.reqOffset.version == dataRsp.rspOffset.version && pHandle->consumerId == pRequest->consumerId) {
    // NOTE(review): dataRsp is not deleted on this path; presumably tqRegisterPushEntry()
    // takes over its contents - confirm against its implementation.
    code = tqRegisterPushEntry(pTq, pHandle, pRequest, pMsg, &dataRsp, TMQ_MSG_TYPE__POLL_RSP);
    taosWUnLockLatch(&pTq->lock);
    return code;
  }

  // the response is sent outside of the lock
  taosWUnLockLatch(&pTq->lock);
  code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&dataRsp, TMQ_MSG_TYPE__POLL_RSP);

  // NOTE: this pHandle->consumerId may have been changed already.
  tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, rsp block:%d, offset type:%d, uid/version:%" PRId64
          ", ts:%" PRId64 ", reqId:0x%" PRIx64,
          consumerId, pHandle->subKey, vgId, dataRsp.blockNum, dataRsp.rspOffset.type, dataRsp.rspOffset.uid,
          dataRsp.rspOffset.ts, pRequest->reqId);

  tDeleteSMqDataRsp(&dataRsp);
  return code;
}

/**
 * Serve one poll request for the given handle.
 *
 * 1. Resolve the offset: when the request carries a reset type, the offset is
 *    resolved by extractResetOffsetVal() (which may already have answered the
 *    request); otherwise the consumer supplied offset is used as is.
 * 2. Column subscriptions are delegated to extractDataAndRspForNormalSubscribe().
 * 3. For the other subscription types (taosX):
 *    - a non-log offset is served from the snapshot scan; meta or data found
 *      there is sent immediately. When the scan yields no block, polling
 *      continues from the offset the scan reported;
 *    - a log offset is served by iterating the WAL from offset.version + 1
 *      until a submit message produces blocks, a non-submit (meta) message is
 *      found, the log cannot be fetched any further, or a newer consumer epoch
 *      is observed on the handle.
 *
 * taosxRsp and the WAL read buffer are released on every exit path.
 *
 * @return 0 or the result of the send routine on success, -1 on failure.
 */
static int32_t doPollDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg) {
  int32_t      code = -1;
  STqOffsetVal offset = {0};
  SWalCkHead*  pCkHead = NULL;
  int32_t      vgId = TD_VID(pTq->pVnode);

  STqOffsetVal reqOffset = pRequest->reqOffset;
  uint64_t     consumerId = pRequest->consumerId;

  // 1. reset the offset if needed
  if (IS_OFFSET_RESET_TYPE(reqOffset.type)) {
    // handle the reset offset cases, according to the consumer's choice.
    bool blockReturned = false;
    code = extractResetOffsetVal(&offset, pTq, pHandle, pRequest, pMsg, &blockReturned);
    if (code != 0) {
      return code;
    }

    // empty block returned, quit
    if (blockReturned) {
      return 0;
    }
  } else {  // use the consumer specified offset
    // the offset value can not be monotonious increase??
    offset = reqOffset;
  }

  // this is a normal subscribe requirement
  if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
    return extractDataAndRspForNormalSubscribe(pTq, pHandle, pRequest, pMsg, &offset);
  }

  // for taosX
  // todo handle the case where re-balance occurs.
  SMqMetaRsp metaRsp = {0};
  STaosxRsp  taosxRsp = {0};
  tqInitTaosxRsp(&taosxRsp, pRequest);

  if (offset.type != TMQ_OFFSET__LOG) {
    if (tqScanTaosx(pTq, pHandle, &taosxRsp, &metaRsp, &offset) < 0) {
      // taosxRsp was leaked on this path before
      code = -1;
      goto _exit;
    }

    if (metaRsp.metaRspLen > 0) {
      code = tqSendMetaPollRsp(pTq, pMsg, pRequest, &metaRsp);
      tqDebug("tmq poll: consumer:0x%" PRIx64 " subkey:%s vgId:%d, send meta offset type:%d,uid:%" PRId64
              ",ts:%" PRId64,
              consumerId, pHandle->subKey, vgId, metaRsp.rspOffset.type, metaRsp.rspOffset.uid, metaRsp.rspOffset.ts);
      taosMemoryFree(metaRsp.metaRsp);
      goto _exit;
    }

    if (taosxRsp.blockNum > 0) {
      code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP);
      goto _exit;
    }

    // nothing produced by the snapshot scan: continue from the offset it reported
    offset = taosxRsp.rspOffset;

    tqDebug("taosx poll: consumer:0x%" PRIx64 " subkey:%s vgId:%d, send data blockNum:%d, offset type:%d,uid:%" PRId64
            ",version:%" PRId64,
            consumerId, pHandle->subKey, vgId, taosxRsp.blockNum, taosxRsp.rspOffset.type, taosxRsp.rspOffset.uid,
            taosxRsp.rspOffset.version);
  }

  // Deliberately a separate check and not an `else`: when the snapshot scan above ends
  // with a log offset, the request is served from the WAL within the same call. With an
  // `else` the updated offset was never used and no response was sent at all.
  if (offset.type == TMQ_OFFSET__LOG) {
    int64_t fetchVer = offset.version + 1;
    pCkHead = taosMemoryMalloc(sizeof(SWalCkHead) + 2048);
    if (pCkHead == NULL) {
      tDeleteSTaosxRsp(&taosxRsp);
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
    }

    walSetReaderCapacity(pHandle->pWalReader, 2048);

    while (1) {
      // todo refactor: this is not correct.
      int32_t savedEpoch = atomic_load_32(&pHandle->epoch);
      if (savedEpoch > pRequest->epoch) {
        tqWarn("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey:%s vgId:%d offset %" PRId64
               ", found new consumer epoch %d, discard req epoch %d",
               consumerId, pRequest->epoch, pHandle->subKey, vgId, fetchVer, savedEpoch, pRequest->epoch);
        break;
      }

      if (tqFetchLog(pTq, pHandle, &fetchVer, &pCkHead, pRequest->reqId) < 0) {
        // no further log could be fetched: answer with what has been collected so far
        tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
        code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP);
        goto _exit;
      }

      SWalCont* pHead = &pCkHead->head;
      tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d) iter log, vgId:%d offset %" PRId64 " msgType %d",
              consumerId, pRequest->epoch, vgId, fetchVer, pHead->msgType);

      if (pHead->msgType == TDMT_VND_SUBMIT) {
        SPackedData submit = {
            .msgStr = POINTER_SHIFT(pHead->body, sizeof(SSubmitReq2Msg)),
            .msgLen = pHead->bodyLen - sizeof(SSubmitReq2Msg),
            .ver = pHead->version,
        };

        if (tqTaosxScanLog(pTq, pHandle, submit, &taosxRsp) < 0) {
          tqError("tmq poll: tqTaosxScanLog error %" PRId64 ", in vgId:%d, subkey %s", consumerId, vgId,
                  pRequest->subKey);
          // taosxRsp and pCkHead were leaked on this path before
          code = -1;
          goto _exit;
        }

        if (taosxRsp.blockNum > 0) {
          tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
          code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP);
          goto _exit;
        }

        fetchVer++;
      } else {
        /*A(pHandle->fetchMeta);*/
        /*A(IS_META_MSG(pHead->msgType));*/
        tqDebug("fetch meta msg, ver:%" PRId64 ", type:%s", pHead->version, TMSG_INFO(pHead->msgType));
        tqOffsetResetToLog(&metaRsp.rspOffset, fetchVer);
        metaRsp.resMsgType = pHead->msgType;
        metaRsp.metaRspLen = pHead->bodyLen;
        // points into pCkHead, which is released at _exit; must not be freed separately
        metaRsp.metaRsp = pHead->body;
        code = (tqSendMetaPollRsp(pTq, pMsg, pRequest, &metaRsp) < 0) ? -1 : 0;
        goto _exit;
      }
    }
  }

  code = 0;

_exit:
  tDeleteSTaosxRsp(&taosxRsp);
  taosMemoryFreeClear(pCkHead);
  return code;
}

659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679
/**
 * Entry point for a consumer poll request.
 *
 * Deserializes the request, looks up the handle of the subscription key,
 * rejects the request when the handle belongs to another consumer, raises the
 * epoch stored in the handle if the request carries a newer one, and finally
 * hands the request to doPollDataForMq().
 *
 * @return the result of doPollDataForMq(), or -1 with terrno set when the
 *         message is invalid, the handle is unknown or the consumer mismatches.
 */
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
  SMqPollReq req = {0};
  if (tDeserializeSMqPollReq(pMsg->pCont, pMsg->contLen, &req) < 0) {
    tqError("tDeserializeSMqPollReq %d failed", pMsg->contLen);
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  int64_t      consumerId = req.consumerId;
  int32_t      reqEpoch = req.epoch;
  STqOffsetVal reqOffset = req.reqOffset;
  int32_t      vgId = TD_VID(pTq->pVnode);

  // 1. find handle
  STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
  if (pHandle == NULL) {
    tqError("tmq poll: consumer:0x%" PRIx64 " vgId:%d subkey %s not found", consumerId, vgId, req.subKey);
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  // 2. check re-balance status
  taosRLockLatch(&pTq->lock);
  if (pHandle->consumerId == consumerId) {
    taosRUnLockLatch(&pTq->lock);
  } else {
    tqDebug("ERROR tmq poll: consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64,
            consumerId, TD_VID(pTq->pVnode), req.subKey, pHandle->consumerId);
    terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
    taosRUnLockLatch(&pTq->lock);
    return -1;
  }

  // 3. update the epoch value
  taosWLockLatch(&pTq->lock);
  int32_t curEpoch = pHandle->epoch;
  if (curEpoch < reqEpoch) {
    tqDebug("tmq poll: consumer:0x%" PRIx64 " epoch update from %d to %d by poll req", consumerId, curEpoch, reqEpoch);
    pHandle->epoch = reqEpoch;
  }
  taosWUnLockLatch(&pTq->lock);

  // readable form of the requested offset, for the debug log only
  char offsetStr[80];
  tFormatOffset(offsetStr, 80, &reqOffset);
  tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey %s, recv poll req vgId:%d, req:%s, reqId:0x%" PRIx64,
          consumerId, req.epoch, pHandle->subKey, vgId, offsetStr, req.reqId);

  return doPollDataForMq(pTq, pHandle, &req, pMsg);
}

708
/**
 * Handle a delete-subscription message: remove the push entry, the handle
 * (closing its WAL reference first), the cached offset and the persisted
 * handle of the given subscription key.
 *
 * Failures of the individual steps are only logged.
 *
 * @param msg an SMqVDeleteReq.
 * @return always 0.
 */
int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  SMqVDeleteReq* pReq = (SMqVDeleteReq*)msg;
  const char*    subKey = pReq->subKey;

  tqDebug("vgId:%d, tq process delete sub req %s", pTq->pVnode->config.vgId, pReq->subKey);

  // the push manager is modified under the write lock
  taosWLockLatch(&pTq->lock);
  int32_t code = taosHashRemove(pTq->pPushMgr, subKey, strlen(subKey));
  if (code != 0) {
    tqDebug("vgId:%d, tq remove push handle %s", pTq->pVnode->config.vgId, pReq->subKey);
  }
  taosWUnLockLatch(&pTq->lock);

  STqHandle* pHandle = taosHashGet(pTq->pHandle, subKey, strlen(subKey));
  if (pHandle != NULL) {
    // walCloseRef(pHandle->pWalReader->pWal, pHandle->pRef->refId);
    if (pHandle->pRef) {
      walCloseRef(pTq->pVnode->pWal, pHandle->pRef->refId);
    }

    code = taosHashRemove(pTq->pHandle, subKey, strlen(subKey));
    if (code != 0) {
      tqError("cannot process tq delete req %s, since no such handle", pReq->subKey);
    }
  }

  code = tqOffsetDelete(pTq->pOffsetStore, pReq->subKey);
  if (code != 0) {
    tqError("cannot process tq delete req %s, since no such offset in cache", pReq->subKey);
  }

  if (tqMetaDeleteHandle(pTq, pReq->subKey) < 0) {
    tqError("cannot process tq delete req %s, since no such offset in tdb", pReq->subKey);
  }

  return 0;
}

743
/**
 * Handle an add-check-info message: decode the STqCheckInfo, store it in
 * pTq->pCheckInfo keyed by topic, and persist the raw message.
 *
 * @param msg    encoded STqCheckInfo.
 * @param msgLen length of msg in bytes.
 * @return 0 on success, -1 with terrno set to TSDB_CODE_OUT_OF_MEMORY when
 *         decoding, the hash insert or the persist step fails.
 */
int32_t tqProcessAddCheckInfoReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  STqCheckInfo info = {0};
  SDecoder     decoder;
  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
  if (tDecodeSTqCheckInfo(&decoder, &info) < 0) {
    // release the decoder on the failure path as well
    tDecoderClear(&decoder);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  tDecoderClear(&decoder);

  if (taosHashPut(pTq->pCheckInfo, info.topic, strlen(info.topic), &info, sizeof(STqCheckInfo)) < 0) {
    // the hash did not take the entry, so what the decoded info owns must be released here
    tDeleteSTqCheckInfo(&info);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  // from here on the hash entry holds the decoded info (released by the hash free callback)
  if (tqMetaSaveCheckInfo(pTq, info.topic, msg, msgLen) < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  return 0;
}

763
/**
 * Handle a delete-check-info message: remove the entry keyed by msg (a
 * NUL-terminated key, as it is measured with strlen) from pTq->pCheckInfo and
 * delete its persisted copy.
 *
 * @return 0 on success, -1 with terrno set to TSDB_CODE_OUT_OF_MEMORY when
 *         either step fails (the second step is skipped if the first fails).
 */
int32_t tqProcessDelCheckInfoReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  bool failed = (taosHashRemove(pTq->pCheckInfo, msg, strlen(msg)) < 0) || (tqMetaDeleteCheckInfo(pTq, msg) < 0);
  if (failed) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  return 0;
}

775
// Processes a SMqRebVgReq decoded from `msg`.
//
// If a handle already exists for req.subKey:
//   - same consumer: the epoch is reset to 0 and the handle is persisted;
//   - different consumer: under pTq->lock the push entry is removed, the consumer id is
//     switched and the epoch reset, then the handle is persisted.
// If no handle exists, a new STqHandle is built according to req.subType, inserted into
// pTq->pHandle and persisted.
//
// Returns 0 on success (including the "invalid re-balance request" case, which is only
// logged), -1 when the WAL reference cannot be taken or persisting the handle fails. In the
// unchanged-consumer case the result of tqMetaSaveHandle() is returned directly.
// sversion and msgLen are not used.
int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  SMqRebVgReq req = {0};
  tDecodeSMqRebVgReq(msg, &req);

  SVnode* pVnode = pTq->pVnode;
  int32_t vgId = TD_VID(pVnode);

  tqDebug("vgId:%d, tq process sub req:%s, Id:0x%" PRIx64 " -> Id:0x%" PRIx64, pVnode->config.vgId, req.subKey,
          req.oldConsumerId, req.newConsumerId);

  STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));

  // ---- a handle already exists for this subscription key ----
  if (pHandle != NULL) {
    if (pHandle->consumerId == req.newConsumerId) {  // do nothing
      tqInfo("vgId:%d consumer:0x%" PRIx64 " remains, no switch occurs", req.vgId, req.newConsumerId);
      atomic_store_32(&pHandle->epoch, -1);
      atomic_add_fetch_32(&pHandle->epoch, 1);
      taosMemoryFree(req.qmsg);
      return tqMetaSaveHandle(pTq, req.subKey, pHandle);
    }

    tqInfo("vgId:%d switch consumer from Id:0x%" PRIx64 " to Id:0x%" PRIx64, req.vgId, pHandle->consumerId,
           req.newConsumerId);

    taosWLockLatch(&pTq->lock);
    atomic_store_32(&pHandle->epoch, -1);

    // remove if it has been register in the push manager, and return one empty block to consumer
    tqRemovePushEntry(pTq, req.subKey, (int32_t)strlen(req.subKey), pHandle->consumerId, true);

    atomic_store_64(&pHandle->consumerId, req.newConsumerId);
    atomic_add_fetch_32(&pHandle->epoch, 1);

    if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
      qStreamCloseTsdbReader(pHandle->execHandle.task);
    }

    taosWUnLockLatch(&pTq->lock);

    int32_t saveCode = tqMetaSaveHandle(pTq, req.subKey, pHandle);
    taosMemoryFree(req.qmsg);
    return (saveCode < 0) ? -1 : 0;
  }

  // ---- no handle yet: build a new one ----
  if (req.oldConsumerId != -1) {
    tqError("vgId:%d, build new consumer handle %s for consumer:0x%" PRIx64 ", but old consumerId:0x%" PRIx64,
            req.vgId, req.subKey, req.newConsumerId, req.oldConsumerId);
  }

  if (req.newConsumerId == -1) {
    tqError("vgId:%d, tq invalid re-balance request, new consumerId %" PRId64 "", req.vgId, req.newConsumerId);
    taosMemoryFree(req.qmsg);
    return 0;
  }

  // the handle is assembled on the stack and copied into the hash by taosHashPut() below
  STqHandle newHandle = {0};
  pHandle = &newHandle;
  /*taosInitRWLatch(&pExec->lock);*/

  uint64_t prevConsumerId = pHandle->consumerId;
  memcpy(pHandle->subKey, req.subKey, TSDB_SUBSCRIBE_KEY_LEN);
  pHandle->consumerId = req.newConsumerId;
  pHandle->epoch = -1;

  pHandle->execHandle.subType = req.subType;
  pHandle->fetchMeta = req.withMeta;

  // TODO version should be assigned and refed during preprocess
  SWalRef* pWalRef = walRefCommittedVer(pVnode->pWal);
  if (pWalRef == NULL) {
    taosMemoryFree(req.qmsg);
    return -1;
  }

  int64_t walVer = pWalRef->refVer;
  pHandle->pRef = pWalRef;

  SReadHandle readHandle = {
      .meta = pVnode->pMeta,
      .vnode = pVnode,
      .initTableReader = true,
      .initTqReader = true,
      .version = walVer,
  };

  pHandle->snapshotVer = walVer;

  if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
    // ownership of the query message moves into the handle
    pHandle->execHandle.execCol.qmsg = req.qmsg;
    req.qmsg = NULL;

    pHandle->execHandle.task = qCreateQueueExecTaskInfo(pHandle->execHandle.execCol.qmsg, &readHandle, vgId,
                                                        &pHandle->execHandle.numOfCols, NULL);
    void* pScanner = NULL;
    qExtractStreamScanner(pHandle->execHandle.task, &pScanner);
    pHandle->execHandle.pExecReader = qExtractReaderFromStreamScanner(pScanner);
  } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__DB) {
    pHandle->pWalReader = walOpenReader(pVnode->pWal, NULL);
    pHandle->execHandle.pExecReader = tqOpenReader(pVnode);

    pHandle->execHandle.execDb.pFilterOutTbUid =
        taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
    buildSnapContext(readHandle.meta, readHandle.version, 0, pHandle->execHandle.subType, pHandle->fetchMeta,
                     (SSnapContext**)(&readHandle.sContext));

    pHandle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &readHandle, vgId, NULL, NULL);
  } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
    pHandle->pWalReader = walOpenReader(pVnode->pWal, NULL);
    pHandle->execHandle.execTb.suid = req.suid;

    SArray* pChildUids = taosArrayInit(0, sizeof(int64_t));
    vnodeGetCtbIdList(pVnode, req.suid, pChildUids);
    tqDebug("vgId:%d, tq try to get all ctb, suid:%" PRId64, pVnode->config.vgId, req.suid);
    for (int32_t idx = 0; idx < taosArrayGetSize(pChildUids); idx++) {
      int64_t childUid = *(int64_t*)taosArrayGet(pChildUids, idx);
      tqDebug("vgId:%d, idx %d, uid:%" PRId64, vgId, idx, childUid);
    }
    pHandle->execHandle.pExecReader = tqOpenReader(pVnode);
    tqReaderSetTbUidList(pHandle->execHandle.pExecReader, pChildUids);
    taosArrayDestroy(pChildUids);

    buildSnapContext(readHandle.meta, readHandle.version, req.suid, pHandle->execHandle.subType, pHandle->fetchMeta,
                     (SSnapContext**)(&readHandle.sContext));
    pHandle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &readHandle, vgId, NULL, NULL);
  }

  taosHashPut(pTq->pHandle, req.subKey, strlen(req.subKey), pHandle, sizeof(STqHandle));
  tqDebug("try to persist handle %s consumer:0x%" PRIx64 " , old consumer:0x%" PRIx64, req.subKey,
          pHandle->consumerId, prevConsumerId);

  int32_t persistCode = tqMetaSaveHandle(pTq, req.subKey, pHandle);
  taosMemoryFree(req.qmsg);
  return (persistCode < 0) ? -1 : 0;
}
911

912
// Fills in the runtime parts of a stream task: reference count, scheduling and I/O status,
// input/output queues, message callback, start version, the state backend and executor
// (source and agg levels only) and the sink callbacks (SMA and table outputs only).
//
// Returns 0 on success, -1 if a queue, the state backend, the executor or the table-sink
// schema cannot be created.
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
#if 0
  if (pTask->taskLevel == TASK_LEVEL__AGG) {
    A(taosArrayGetSize(pTask->childEpInfo) != 0);
  }
#endif

  int32_t vgId = TD_VID(pTq->pVnode);

  pTask->refCnt = 1;
  pTask->schedStatus = TASK_SCHED_STATUS__INACTIVE;

  // both queues are opened before either result is inspected
  pTask->inputQueue = streamQueueOpen();
  pTask->outputQueue = streamQueueOpen();
  if (pTask->inputQueue == NULL || pTask->outputQueue == NULL) {
    return -1;
  }

  pTask->inputStatus = TASK_INPUT_STATUS__NORMAL;
  pTask->outputStatus = TASK_OUTPUT_STATUS__NORMAL;
  pTask->pMsgCb = &pTq->pVnode->msgCb;
  pTask->startVer = ver;

  // expand executor
  if (pTask->fillHistory) {
    pTask->taskStatus = TASK_STATUS__WAIT_DOWNSTREAM;
  }

  const bool isSource = (pTask->taskLevel == TASK_LEVEL__SOURCE);
  const bool isAgg = !isSource && (pTask->taskLevel == TASK_LEVEL__AGG);

  if (isSource || isAgg) {
    pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask, false, -1, -1);
    if (pTask->pState == NULL) {
      return -1;
    }

    // the two levels differ only in how the read handle is populated
    SReadHandle execHandle = {.pStateBackend = pTask->pState};
    if (isSource) {
      execHandle.meta = pTq->pVnode->pMeta;
      execHandle.vnode = pTq->pVnode;
      execHandle.initTqReader = 1;
    } else {
      execHandle.vnode = NULL;
      execHandle.numOfVgroups = (int32_t)taosArrayGetSize(pTask->childEpInfo);
    }

    pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &execHandle, vgId);
    if (pTask->exec.executor == NULL) {
      return -1;
    }
  }

  // sink
  /*pTask->ahandle = pTq->pVnode;*/
  if (pTask->outputType == TASK_OUTPUT__SMA) {
    pTask->smaSink.vnode = pTq->pVnode;
    pTask->smaSink.smaSink = smaHandleRes;
  } else if (pTask->outputType == TASK_OUTPUT__TABLE) {
    pTask->tbSink.vnode = pTq->pVnode;
    pTask->tbSink.tbSinkFunc = tqSinkToTablePipeline2;

    // schema version falls back to 1 when the target table's meta info cannot be read
    int32_t   schemaVer = 1;
    SMetaInfo metaInfo = {0};
    if (metaGetInfo(pTq->pVnode->pMeta, pTask->tbSink.stbUid, &metaInfo, NULL) == TSDB_CODE_SUCCESS) {
      schemaVer = metaInfo.skmVer;
    }

    pTask->tbSink.pTSchema =
        tBuildTSchema(pTask->tbSink.pSchemaWrapper->pSchema, pTask->tbSink.pSchemaWrapper->nCols, schemaVer);
    if (pTask->tbSink.pTSchema == NULL) {
      return -1;
    }
  }

  streamSetupTrigger(pTask);
  tqInfo("expand stream task on vg %d, task id %d, child id %d, level %d", vgId, pTask->taskId, pTask->selfChildId,
         pTask->taskLevel);
  return 0;
}
L
Liu Jicong 已提交
1002

1003 1004 1005 1006 1007 1008
// Answers a stream task check request. The request is decoded from the message body that
// follows the SMsgHead; the response echoes the request ids and carries status 1 when the
// downstream task exists and its status is TASK_STATUS__NORMAL, otherwise 0.
//
// Returns 0 after the response has been sent, -1 if the response cannot be sized or its
// buffer cannot be allocated (no response is sent in those cases).
//
// NOTE(review): the result of tDecodeSStreamTaskCheckReq() is not checked here - confirm
// whether a malformed request should be rejected before a response is built.
int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) {
  char*               msgStr = pMsg->pCont;
  char*               msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
  int32_t             msgLen = pMsg->contLen - sizeof(SMsgHead);
  SStreamTaskCheckReq req;
  SDecoder            decoder;

  tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
  tDecodeSStreamTaskCheckReq(&decoder, &req);
  tDecoderClear(&decoder);

  int32_t             taskId = req.downstreamTaskId;
  SStreamTaskCheckRsp rsp = {
      .reqId = req.reqId,
      .streamId = req.streamId,
      .childId = req.childId,
      .downstreamNodeId = req.downstreamNodeId,
      .downstreamTaskId = req.downstreamTaskId,
      .upstreamNodeId = req.upstreamNodeId,
      .upstreamTaskId = req.upstreamTaskId,
  };

  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);
  if (pTask && atomic_load_8(&pTask->taskStatus) == TASK_STATUS__NORMAL) {
    rsp.status = 1;
  } else {
    rsp.status = 0;
  }

  if (pTask) streamMetaReleaseTask(pTq->pStreamMeta, pTask);

  tqDebug("tq recv task check req(reqId:0x%" PRIx64 ") %d at node %d check req from task %d at node %d, status %d",
          rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);

  SEncoder encoder;
  int32_t  code;
  int32_t  len;
  tEncodeSize(tEncodeSStreamTaskCheckRsp, &rsp, len, code);
  if (code < 0) {
    tqError("unable to encode rsp %d", __LINE__);
    return -1;
  }

  void* buf = rpcMallocCont(sizeof(SMsgHead) + len);
  if (buf == NULL) {
    // the buffer is written through immediately below, so a failed allocation must stop here
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    tqError("unable to allocate rsp buffer %d", __LINE__);
    return -1;
  }
  ((SMsgHead*)buf)->vgId = htonl(req.upstreamNodeId);

  void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
  tEncoderInit(&encoder, (uint8_t*)abuf, len);
  tEncodeSStreamTaskCheckRsp(&encoder, &rsp);
  tEncoderClear(&encoder);

  SRpcMsg rspMsg = {
      .code = 0,
      .pCont = buf,
      .contLen = sizeof(SMsgHead) + len,
      .info = pMsg->info,
  };

  tmsgSendRsp(&rspMsg);
  return 0;
}

1062
// Decodes a SStreamTaskCheckRsp from `msg` and hands it to the upstream task named in the
// response via streamProcessTaskCheckRsp().
// Returns -1 if decoding fails or the task cannot be acquired; otherwise the result of
// streamProcessTaskCheckRsp().
int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  SStreamTaskCheckRsp rsp;
  SDecoder            decoder;

  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
  int32_t code = tDecodeSStreamTaskCheckRsp(&decoder, &rsp);
  tDecoderClear(&decoder);  // released on both the success and the failure path
  if (code < 0) {
    return -1;
  }

  tqDebug("tq recv task check rsp(reqId:0x%" PRIx64 ") %d at node %d check req from task %d at node %d, status %d",
          rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);

  SStreamTask* pUpstream = streamMetaAcquireTask(pTq->pStreamMeta, rsp.upstreamTaskId);
  if (pUpstream == NULL) {
    return -1;
  }

  code = streamProcessTaskCheckRsp(pUpstream, &rsp, sversion);
  streamMetaReleaseTask(pTq->pStreamMeta, pUpstream);
  return code;
}

1088
// Deploys a stream task: allocates a SStreamTask, decodes it from `msg`, registers it with
// the stream meta and, when the task has fillHistory set, starts the downstream check.
// Does nothing and returns 0 when tsDisableStream is set.
// Returns -1 on allocation failure, decode failure or when streamMetaAddTask() fails.
int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  int32_t code;
#if 0
  code = streamMetaAddSerializedTask(pTq->pStreamMeta, version, msg, msgLen);
  if (code < 0) return code;
#endif
  if (tsDisableStream) {
    return 0;
  }

  // 1.deserialize msg and build task
  SStreamTask* pNewTask = taosMemoryCalloc(1, sizeof(SStreamTask));
  if (pNewTask == NULL) {
    return -1;
  }

  SDecoder decoder;
  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
  code = tDecodeSStreamTask(&decoder, pNewTask);
  tDecoderClear(&decoder);
  if (code < 0) {
    taosMemoryFree(pNewTask);
    return -1;
  }

  // 2.save task
  code = streamMetaAddTask(pTq->pStreamMeta, sversion, pNewTask);
  if (code < 0) {
    return -1;
  }

  // 3.go through recover steps to fill history
  if (pNewTask->fillHistory) {
    streamTaskCheckDownstream(pNewTask, sversion);
  }

  return 0;
}

L
Liu Jicong 已提交
1128 1129 1130 1131 1132
// Runs recovery step 1 for the task named in the request and, unless the task is being
// dropped, queues a SStreamRecoverStep2Req on the vnode write queue to launch step 2.
//
// Returns 0 when step 2 was queued or the task is in TASK_STATUS__DROPPING; -1 when the
// task cannot be acquired, its startVer is not positive, the step 2 request cannot be built
// or its buffer cannot be allocated.
//
// NOTE(review): the result of tmsgPutToQueue() is not checked - confirm who owns
// `serializedReq` when enqueueing fails.
int32_t tqProcessTaskRecover1Req(STQ* pTq, SRpcMsg* pMsg) {
  int32_t code;
  char*   msg = pMsg->pCont;

  SStreamRecoverStep1Req* pReq = (SStreamRecoverStep1Req*)msg;
  SStreamTask*            pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->taskId);
  if (pTask == NULL) {
    return -1;
  }

  // check param
  int64_t fillVer1 = pTask->startVer;
  if (fillVer1 <= 0) {
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
    return -1;
  }

  // do recovery step 1
  streamSourceRecoverScanStep1(pTask);

  if (atomic_load_8(&pTask->taskStatus) == TASK_STATUS__DROPPING) {
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
    return 0;
  }

  // build msg to launch next step
  SStreamRecoverStep2Req req;
  code = streamBuildSourceRecover2Req(pTask, &req);
  if (code < 0) {
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
    return -1;
  }

  // sample the status while the task reference is still held; pTask must not be touched
  // after it has been released
  bool dropping = (atomic_load_8(&pTask->taskStatus) == TASK_STATUS__DROPPING);
  streamMetaReleaseTask(pTq->pStreamMeta, pTask);
  if (dropping) {
    return 0;
  }

  // serialize msg: the payload is the step 2 request, so size it from that object
  int32_t len = sizeof(req);

  void* serializedReq = rpcMallocCont(len);
  if (serializedReq == NULL) {
    return -1;
  }

  memcpy(serializedReq, &req, len);

  // dispatch msg
  SRpcMsg rpcMsg = {
      .code = 0,
      .contLen = len,
      .msgType = TDMT_VND_STREAM_RECOVER_BLOCKING_STAGE,
      .pCont = serializedReq,
  };

  tmsgPutToQueue(&pTq->pVnode->msgCb, WRITE_QUEUE, &rpcMsg);

  return 0;
}

1191
// Runs recovery step 2 for the task named in the request, then restores its parameters,
// sets its status to normal, dispatches the recover-finish request, clears fillHistory and
// saves the task.
// Returns 0 on success or when the task is found in TASK_STATUS__DROPPING after the scan;
// -1 when the task cannot be acquired or any step fails. The task reference is released on
// every path after a successful acquire.
int32_t tqProcessTaskRecover2Req(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  int32_t                 ret = -1;
  SStreamRecoverStep2Req* pReq = (SStreamRecoverStep2Req*)msg;

  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->taskId);
  if (pTask == NULL) {
    return -1;
  }

  // do recovery step 2
  if (streamSourceRecoverScanStep2(pTask, sversion) < 0) {
    goto END;
  }

  if (atomic_load_8(&pTask->taskStatus) == TASK_STATUS__DROPPING) {
    ret = 0;
    goto END;
  }

  // restore param
  if (streamRestoreParam(pTask) < 0) {
    goto END;
  }

  // set status normal
  if (streamSetStatusNormal(pTask) < 0) {
    goto END;
  }

  // dispatch recover finish req to all related downstream task
  if (streamDispatchRecoverFinishReq(pTask) < 0) {
    goto END;
  }

  atomic_store_8(&pTask->fillHistory, 0);
  streamMetaSaveTask(pTq->pStreamMeta, pTask);
  ret = 0;

END:
  streamMetaReleaseTask(pTq->pStreamMeta, pTask);
  return ret;
}

L
Liu Jicong 已提交
1240 1241 1242
// Decodes a SStreamRecoverFinishReq from the message body (after the SMsgHead) and passes
// it to the target task through streamProcessRecoverFinishReq().
// Returns 0 on success, -1 when the task cannot be acquired or processing fails.
int32_t tqProcessTaskRecoverFinishReq(STQ* pTq, SRpcMsg* pMsg) {
  char*   body = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t bodyLen = pMsg->contLen - sizeof(SMsgHead);

  // deserialize
  SStreamRecoverFinishReq req;
  SDecoder                decoder;
  tDecoderInit(&decoder, (uint8_t*)body, bodyLen);
  tDecodeSStreamRecoverFinishReq(&decoder, &req);
  tDecoderClear(&decoder);

  // find task
  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.taskId);
  if (pTask == NULL) {
    return -1;
  }

  // do process request
  int32_t ret = (streamProcessRecoverFinishReq(pTask, req.childId) < 0) ? -1 : 0;

  streamMetaReleaseTask(pTq->pStreamMeta, pTask);
  return ret;
}
L
Liu Jicong 已提交
1266

L
Liu Jicong 已提交
1267 1268 1269 1270 1271
// No processing is performed for this message; both arguments are ignored and 0 is returned.
int32_t tqProcessTaskRecoverFinishRsp(STQ* pTq, SRpcMsg* pMsg) { return 0; }

L
Liu Jicong 已提交
1272 1273 1274 1275 1276 1277 1278 1279 1280 1281 1282 1283 1284 1285 1286 1287
// Turns a decoded SDeleteRes into a STREAM_DELETE_DATA block (one row per uid, carrying the
// start key, end key and uid; the group id and calculate-start/end columns are set to NULL)
// and enqueues a reference to that block on every source-level stream task.
//
// The block is shared through a reference counter: this function holds one reference, adds
// one for each task the block is offered to, takes it back when streamTaskInput() fails,
// and drops its own at the end. If the counter reaches zero here, the block and the counter
// are freed here.
//
// pReq/len - encoded SDeleteRes
// ver      - version recorded in the block and in the log line
//
// Returns 0 on success or when there is nothing to forward (no uids or no affected rows);
// -1 with terrno = TSDB_CODE_OUT_OF_MEMORY when the delete block or the reference counter
// cannot be allocated.
int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) {
  bool        failed = false;
  SDecoder*   pCoder = &(SDecoder){0};
  SDeleteRes* pRes = &(SDeleteRes){0};

  pRes->uidList = taosArrayInit(0, sizeof(tb_uid_t));
  if (pRes->uidList == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    failed = true;
  }

  tDecoderInit(pCoder, pReq, len);
  tDecodeDeleteRes(pCoder, pRes);
  tDecoderClear(pCoder);

  int32_t sz = taosArrayGetSize(pRes->uidList);
  if (sz == 0 || pRes->affectedRows == 0) {
    taosArrayDestroy(pRes->uidList);
    return 0;
  }

  SSDataBlock* pDelBlock = createSpecialDataBlock(STREAM_DELETE_DATA);
  if (pDelBlock == NULL) {
    // the block is dereferenced right below, so stop on allocation failure
    taosArrayDestroy(pRes->uidList);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  blockDataEnsureCapacity(pDelBlock, sz);
  pDelBlock->info.rows = sz;
  pDelBlock->info.version = ver;

  // the column descriptors do not change while rows are filled in, so look them up once
  SColumnInfoData* pStartCol = taosArrayGet(pDelBlock->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pEndCol = taosArrayGet(pDelBlock->pDataBlock, END_TS_COLUMN_INDEX);
  SColumnInfoData* pUidCol = taosArrayGet(pDelBlock->pDataBlock, UID_COLUMN_INDEX);
  SColumnInfoData* pGroupCol = taosArrayGet(pDelBlock->pDataBlock, GROUPID_COLUMN_INDEX);
  SColumnInfoData* pCalStartCol = taosArrayGet(pDelBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
  SColumnInfoData* pCalEndCol = taosArrayGet(pDelBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);

  for (int32_t i = 0; i < sz; i++) {
    // start key column
    colDataSetVal(pStartCol, i, (const char*)&pRes->skey, false);
    // end key column
    colDataSetVal(pEndCol, i, (const char*)&pRes->ekey, false);
    // uid column
    int64_t* pUid = taosArrayGet(pRes->uidList, i);
    colDataSetVal(pUidCol, i, (const char*)pUid, false);

    colDataSetNULL(pGroupCol, i);
    colDataSetNULL(pCalStartCol, i);
    colDataSetNULL(pCalEndCol, i);
  }

  taosArrayDestroy(pRes->uidList);

  int32_t* pRef = taosMemoryMalloc(sizeof(int32_t));
  if (pRef == NULL) {
    blockDataDestroy(pDelBlock);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  *pRef = 1;  // reference held by this function until the loop below has finished

  void* pIter = NULL;
  while (1) {
    pIter = taosHashIterate(pTq->pStreamMeta->pTasks, pIter);
    if (pIter == NULL) break;
    SStreamTask* pTask = *(SStreamTask**)pIter;
    if (pTask->taskLevel != TASK_LEVEL__SOURCE) continue;

    qDebug("delete req enqueue stream task: %d, ver: %" PRId64, pTask->taskId, ver);

    if (!failed) {
      SStreamRefDataBlock* pRefBlock = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0);
      if (pRefBlock == NULL) {
        // no queue item could be built for this task: report it the same way as the
        // `failed` branch below instead of writing through a NULL pointer
        qError("stream task input del failed, task id %d", pTask->taskId);
        streamTaskInputFail(pTask);
        continue;
      }
      pRefBlock->type = STREAM_INPUT__REF_DATA_BLOCK;
      pRefBlock->pBlock = pDelBlock;
      pRefBlock->dataRef = pRef;
      atomic_add_fetch_32(pRefBlock->dataRef, 1);

      if (streamTaskInput(pTask, (SStreamQueueItem*)pRefBlock) < 0) {
        qError("stream task input del failed, task id %d", pTask->taskId);

        // the task did not take the item: give the reference back and free the item
        atomic_sub_fetch_32(pRef, 1);
        taosFreeQitem(pRefBlock);
        continue;
      }

      if (streamSchedExec(pTask) < 0) {
        qError("stream task launch failed, task id %d", pTask->taskId);
        continue;
      }

    } else {
      streamTaskInputFail(pTask);
    }
  }

  // drop this function's own reference; free the shared data if nobody else holds one
  int32_t ref = atomic_sub_fetch_32(pRef, 1);
  /*A(ref >= 0);*/
  if (ref == 0) {
    blockDataDestroy(pDelBlock);
    taosMemoryFree(pRef);
  }

  return 0;
}

L
Liu Jicong 已提交
1388
// Wraps the submitted data in a stream submit item and offers it to every source-level
// stream task that is not in TASK_STATUS__RECOVER_PREPARE or TASK_STATUS__WAIT_DOWNSTREAM.
// If the item cannot be created, each eligible task is notified through
// streamTaskInputFail() instead.
// Returns 0 when the submit item was created, -1 (terrno = TSDB_CODE_OUT_OF_MEMORY) when
// it was not. Per-task input/launch failures are only logged.
int32_t tqProcessSubmitReq(STQ* pTq, SPackedData submit) {
  bool                failed = false;
  SStreamDataSubmit2* pSubmit = streamDataSubmitNew(submit);

  if (pSubmit == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    tqError("failed to create data submit for stream since out of memory");
    failed = true;
  }

  for (void* pIter = taosHashIterate(pTq->pStreamMeta->pTasks, NULL); pIter != NULL;
       pIter = taosHashIterate(pTq->pStreamMeta->pTasks, pIter)) {
    SStreamTask* pTask = *(SStreamTask**)pIter;

    if (pTask->taskLevel != TASK_LEVEL__SOURCE) {
      continue;
    }

    if (pTask->taskStatus == TASK_STATUS__RECOVER_PREPARE || pTask->taskStatus == TASK_STATUS__WAIT_DOWNSTREAM) {
      tqDebug("skip push task %d, task status %d", pTask->taskId, pTask->taskStatus);
      continue;
    }

    tqDebug("data submit enqueue stream task: %d, ver: %" PRId64, pTask->taskId, submit.ver);

    if (failed) {
      streamTaskInputFail(pTask);
      continue;
    }

    if (streamTaskInput(pTask, (SStreamQueueItem*)pSubmit) < 0) {
      tqError("stream task input failed, task id %d", pTask->taskId);
      continue;
    }

    if (streamSchedExec(pTask) < 0) {
      tqError("stream task launch failed, task id %d", pTask->taskId);
    }
  }

  if (pSubmit != NULL) {
    streamDataSubmitRefDec(pSubmit);
    taosFreeQitem(pSubmit);
  }

  return failed ? -1 : 0;
}

L
Liu Jicong 已提交
1438 1439 1440
// Looks up the task named in the SStreamTaskRunReq and runs it via streamProcessRunReq().
// Returns 0 if the task was found, -1 otherwise.
int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
  SStreamTaskRunReq* pRunReq = pMsg->pCont;
  int32_t            targetId = pRunReq->taskId;

  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, targetId);
  if (pTask == NULL) {
    return -1;
  }

  streamProcessRunReq(pTask);
  streamMetaReleaseTask(pTq->pStreamMeta, pTask);
  return 0;
}

L
Liu Jicong 已提交
1451
// Decodes a SStreamDispatchReq from the message body (after the SMsgHead) and forwards it
// to the target task through streamProcessDispatchReq().
//
// exec - passed through unchanged to streamProcessDispatchReq()
//
// Returns 0 if the task was found and the request forwarded; -1 if the request cannot be
// decoded or the task does not exist.
int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec) {
  char*              msgStr = pMsg->pCont;
  char*              msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
  int32_t            msgLen = pMsg->contLen - sizeof(SMsgHead);
  SStreamDispatchReq req;
  SDecoder           decoder;

  tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
  if (tDecodeStreamDispatchReq(&decoder, &req) < 0) {
    // do not act on a request that failed to decode
    tDecoderClear(&decoder);
    return -1;
  }
  // the decoder is released before the request is processed, as vnodeEnqueueStreamMsg() does
  tDecoderClear(&decoder);

  int32_t taskId = req.taskId;

  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);
  if (pTask) {
    SRpcMsg rsp = {
        .info = pMsg->info,
        .code = 0,
    };
    streamProcessDispatchReq(pTask, &req, &rsp, exec);
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
    return 0;
  } else {
    return -1;
  }
}

L
Liu Jicong 已提交
1475 1476
// Hands a dispatch response (located after the SMsgHead) and the message code to the
// upstream task named in it. The task id in the response is converted with ntohl().
// Returns 0 if the task was found, -1 otherwise.
int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) {
  SStreamDispatchRsp* pDispatchRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t             upstreamId = ntohl(pDispatchRsp->upstreamTaskId);

  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, upstreamId);
  tqDebug("recv dispatch rsp, code: %x", pMsg->code);

  if (pTask == NULL) {
    return -1;
  }

  streamProcessDispatchRsp(pTask, pDispatchRsp, pMsg->code);
  streamMetaReleaseTask(pTq->pStreamMeta, pTask);
  return 0;
}
L
Liu Jicong 已提交
1488

1489
// Removes the task named in the SVDropStreamTaskReq from the stream meta.
// Always returns 0; sversion and msgLen are not used.
int32_t tqProcessTaskDropReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  SVDropStreamTaskReq* pDropReq = (SVDropStreamTaskReq*)msg;

  streamMetaRemoveTask(pTq->pStreamMeta, pDropReq->taskId);

  return 0;
}
L
Liu Jicong 已提交
1494 1495 1496 1497 1498 1499 1500

// Decodes a SStreamRetrieveReq from the message body (after the SMsgHead) and forwards it
// to the destination task through streamProcessRetrieveReq().
// The decoded request is released with tDeleteStreamRetrieveReq() on every path.
// Returns 0 if the task was found, -1 otherwise.
//
// NOTE(review): the result of tDecodeStreamRetrieveReq() is not checked - confirm whether a
// malformed request should be rejected here.
int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg) {
  char*              msgStr = pMsg->pCont;
  char*              msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
  int32_t            msgLen = pMsg->contLen - sizeof(SMsgHead);
  SStreamRetrieveReq req;
  SDecoder           decoder;

  tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
  tDecodeStreamRetrieveReq(&decoder, &req);
  tDecoderClear(&decoder);

  int32_t      taskId = req.dstTaskId;
  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);
  if (pTask) {
    SRpcMsg rsp = {
        .info = pMsg->info,
        .code = 0,
    };
    streamProcessRetrieveReq(pTask, &req, &rsp);
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
    tDeleteStreamRetrieveReq(&req);
    return 0;
  } else {
    // the decoded request must be released when the task is missing as well
    tDeleteStreamRetrieveReq(&req);
    return -1;
  }
}

// No processing is performed for this message; both arguments are ignored and 0 is returned.
int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg) { return 0; }
L
Liu Jicong 已提交
1524

1525 1526 1527 1528 1529 1530
// Decodes a SStreamDispatchReq from `pMsg` and forwards it to the target stream task with
// exec = false. On the success path, and on the failure paths that send a response, the
// message content is released with rpcFreeCont() and the message with taosFreeQitem().
//
// When decoding fails (TSDB_CODE_MSG_DECODE_ERROR) or the task does not exist
// (TSDB_CODE_STREAM_TASK_NOT_EXIST), an error response is sent, provided the message
// carries an rpc handle.
//
// Returns 0 when the request was forwarded, -1 otherwise.
//
// NOTE(review): the early return taken when pMsg->info.handle is NULL releases neither
// pMsg->pCont nor pMsg, unlike every other exit - confirm who owns the message in that case.
int32_t vnodeEnqueueStreamMsg(SVnode* pVnode, SRpcMsg* pMsg) {
  STQ*      pTq = pVnode->pTq;
  SMsgHead* msgStr = pMsg->pCont;
  char*     msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
  int32_t   msgLen = pMsg->contLen - sizeof(SMsgHead);
  int32_t   code = 0;

  // zero-initialised because the error response below reads fields of `req` even when
  // decoding failed part-way
  SStreamDispatchReq req = {0};
  SDecoder           decoder;
  tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
  if (tDecodeStreamDispatchReq(&decoder, &req) < 0) {
    code = TSDB_CODE_MSG_DECODE_ERROR;
    tDecoderClear(&decoder);
    goto FAIL;
  }
  tDecoderClear(&decoder);

  int32_t taskId = req.taskId;

  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);
  if (pTask) {
    SRpcMsg rsp = {
        .info = pMsg->info,
        .code = 0,
    };
    streamProcessDispatchReq(pTask, &req, &rsp, false);
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
    rpcFreeCont(pMsg->pCont);
    taosFreeQitem(pMsg);
    return 0;
  }

  code = TSDB_CODE_STREAM_TASK_NOT_EXIST;

FAIL:
  if (pMsg->info.handle == NULL) return -1;

  SMsgHead* pRspHead = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamDispatchRsp));
  if (pRspHead == NULL) {
    // no buffer for a full response: send a bare out-of-memory response instead
    SRpcMsg rsp = {
        .code = TSDB_CODE_OUT_OF_MEMORY,
        .info = pMsg->info,
    };
    // log the code that is actually sent
    tqDebug("send dispatch error rsp, code: %x", TSDB_CODE_OUT_OF_MEMORY);
    tmsgSendRsp(&rsp);
    rpcFreeCont(pMsg->pCont);
    taosFreeQitem(pMsg);
    return -1;
  }

  pRspHead->vgId = htonl(req.upstreamNodeId);
  SStreamDispatchRsp* pRsp = POINTER_SHIFT(pRspHead, sizeof(SMsgHead));
  pRsp->streamId = htobe64(req.streamId);
  pRsp->upstreamTaskId = htonl(req.upstreamTaskId);
  pRsp->upstreamNodeId = htonl(req.upstreamNodeId);
  pRsp->downstreamNodeId = htonl(pVnode->config.vgId);
  pRsp->downstreamTaskId = htonl(req.taskId);
  // NOTE(review): an OUTPUT status constant is stored into inputStatus - confirm it is
  // interchangeable with TASK_INPUT_STATUS__NORMAL
  pRsp->inputStatus = TASK_OUTPUT_STATUS__NORMAL;

  SRpcMsg rsp = {
      .code = code,
      .info = pMsg->info,
      .contLen = sizeof(SMsgHead) + sizeof(SStreamDispatchRsp),
      .pCont = pRspHead,
  };
  tqDebug("send dispatch error rsp, code: %x", code);
  tmsgSendRsp(&rsp);
  rpcFreeCont(pMsg->pCont);
  taosFreeQitem(pMsg);
  return -1;
}
L
Liu Jicong 已提交
1596

1597
// Returns 1 when sversion is less than or equal to pTq->walLogLastVer, 0 otherwise.
int32_t tqCheckLogInWal(STQ* pTq, int64_t sversion) {
  if (sversion <= pTq->walLogLastVer) {
    return 1;
  }
  return 0;
}