tq.c 50.3 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Hongze Cheng 已提交
16
#include "tq.h"
S
Shengliang Guan 已提交
17

L
Liu Jicong 已提交
18
/**
 * Initialize the process-wide TQ module: create the shared "TQ" timer and
 * initialize the stream subsystem.
 *
 * tqMgmt.inited is used as a small state flag: 0 = not initialized,
 * 1 = initialized, 2 = an init/cleanup transition is in progress. The flag is
 * moved 0 -> 2 with a compare-and-exchange so that only one caller performs
 * the initialization; callers that observe 2 spin until the transition ends.
 *
 * On any failure the partially created resources are released and the flag is
 * put back to 0, so that a later tqInit()/tqCleanUp() does not spin forever on
 * a stale "in progress" state.
 *
 * @return 0 on success (also when the module was already initialized),
 *         -1 if the timer or the stream subsystem could not be initialized.
 */
int32_t tqInit() {
  int8_t old;
  // wait out any concurrent init/cleanup that currently owns the transitional state
  while (1) {
    old = atomic_val_compare_exchange_8(&tqMgmt.inited, 0, 2);
    if (old != 2) break;
  }

  if (old == 0) {
    tqMgmt.timer = taosTmrInit(10000, 100, 10000, "TQ");
    if (tqMgmt.timer == NULL) {
      atomic_store_8(&tqMgmt.inited, 0);
      return -1;
    }

    if (streamInit() < 0) {
      // roll back: release the timer and leave the transitional state,
      // otherwise every later caller would spin on inited == 2
      taosTmrCleanUp(tqMgmt.timer);
      tqMgmt.timer = NULL;
      atomic_store_8(&tqMgmt.inited, 0);
      return -1;
    }

    atomic_store_8(&tqMgmt.inited, 1);
  }

  return 0;
}
L
Liu Jicong 已提交
39

40
/**
 * Tear down the process-wide TQ module.
 *
 * The state flag tqMgmt.inited is moved 1 -> 2 with a compare-and-exchange;
 * while another thread holds the transitional value 2 this call spins. Only
 * the caller that actually observed 1 releases the timer and the stream
 * subsystem and then resets the flag to 0. If the module was not initialized
 * the call is a no-op.
 */
void tqCleanUp() {
  int8_t prev;
  do {
    prev = atomic_val_compare_exchange_8(&tqMgmt.inited, 1, 2);
  } while (prev == 2);

  if (prev != 1) {
    return;
  }

  taosTmrCleanUp(tqMgmt.timer);
  streamCleanUp();
  atomic_store_8(&tqMgmt.inited, 0);
}
L
Liu Jicong 已提交
53

54
static void destroyTqHandle(void* data) {
55 56 57
  STqHandle* pData = (STqHandle*)data;
  qDestroyTask(pData->execHandle.task);
  if (pData->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
L
Liu Jicong 已提交
58
    taosMemoryFreeClear(pData->execHandle.execCol.qmsg);
59 60 61 62
  } else if (pData->execHandle.subType == TOPIC_SUB_TYPE__DB) {
    tqCloseReader(pData->execHandle.pExecReader);
    walCloseReader(pData->pWalReader);
    taosHashCleanup(pData->execHandle.execDb.pFilterOutTbUid);
L
Liu Jicong 已提交
63
  } else if (pData->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
64 65 66 67 68
    walCloseReader(pData->pWalReader);
    tqCloseReader(pData->execHandle.pExecReader);
  }
}

L
Liu Jicong 已提交
69
static void tqPushEntryFree(void* data) {
L
Liu Jicong 已提交
70
  STqPushEntry* p = *(void**)data;
H
Haojun Liao 已提交
71 72 73 74 75 76 77
  if (p->pDataRsp->head.mqMsgType == TMQ_MSG_TYPE__POLL_RSP) {
    tDeleteSMqDataRsp(p->pDataRsp);
  } else if (p->pDataRsp->head.mqMsgType == TMQ_MSG_TYPE__TAOSX_RSP) {
    tDeleteSTaosxRsp((STaosxRsp*)p->pDataRsp);
  }

  taosMemoryFree(p->pDataRsp);
L
Liu Jicong 已提交
78 79 80
  taosMemoryFree(p);
}

L
Liu Jicong 已提交
81
/**
 * Create and open the TQ object of a vnode.
 *
 * Allocates the STQ, creates its three hash tables (subscription handles,
 * pending push entries, column check info), then opens the meta store, the
 * offset store and the stream meta, and finally loads the stream tasks up to
 * the WAL's committed version.
 *
 * On failure everything that was successfully created so far is released
 * again (using the same close routines and the same order as tqClose()) and
 * NULL is returned; previously these paths leaked the STQ and its members.
 *
 * @param path   directory path handed to the stream meta; a copy is kept in pTq->path.
 * @param pVnode owning vnode; its WAL and config are read during open.
 * @return the opened STQ, or NULL on failure (terrno is set to
 *         TSDB_CODE_OUT_OF_MEMORY for allocation failures, otherwise it is
 *         left as set by the failing callee).
 */
STQ* tqOpen(const char* path, SVnode* pVnode) {
  int32_t code = 0;
  bool    metaOpened = false;

  STQ* pTq = taosMemoryCalloc(1, sizeof(STQ));
  if (pTq == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pTq->pVnode = pVnode;
  pTq->walLogLastVer = pVnode->pWal->vers.lastVer;
  taosInitRWLatch(&pTq->lock);

  pTq->path = taosStrdup(path);
  if (pTq->path == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }

  pTq->pHandle = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);
  pTq->pPushMgr = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
  pTq->pCheckInfo = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);
  if (pTq->pHandle == NULL || pTq->pPushMgr == NULL || pTq->pCheckInfo == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }

  // each table owns its entries; install the matching destructors
  taosHashSetFreeFp(pTq->pHandle, destroyTqHandle);
  taosHashSetFreeFp(pTq->pPushMgr, tqPushEntryFree);
  taosHashSetFreeFp(pTq->pCheckInfo, (FDelete)tDeleteSTqCheckInfo);

  if (tqMetaOpen(pTq) < 0) {
    goto _err;
  }
  metaOpened = true;

  pTq->pOffsetStore = tqOffsetOpen(pTq);
  if (pTq->pOffsetStore == NULL) {
    goto _err;
  }

  pTq->pStreamMeta = streamMetaOpen(path, pTq, (FTaskExpand*)tqExpandTask, pTq->pVnode->config.vgId);
  if (pTq->pStreamMeta == NULL) {
    goto _err;
  }

  if (streamLoadTasks(pTq->pStreamMeta, walGetCommittedVer(pVnode->pWal)) < 0) {
    goto _err;
  }

  return pTq;

_err:
  // keep the error reported by the failing step; the cleanup calls must not mask it
  code = terrno;

  // only release what was actually created; order mirrors tqClose()
  if (pTq->pOffsetStore != NULL) {
    tqOffsetClose(pTq->pOffsetStore);
  }
  if (pTq->pHandle != NULL) {
    taosHashCleanup(pTq->pHandle);
  }
  if (pTq->pPushMgr != NULL) {
    taosHashCleanup(pTq->pPushMgr);
  }
  if (pTq->pCheckInfo != NULL) {
    taosHashCleanup(pTq->pCheckInfo);
  }
  if (pTq->path != NULL) {
    taosMemoryFree(pTq->path);
  }
  if (metaOpened) {
    tqMetaClose(pTq);
  }
  if (pTq->pStreamMeta != NULL) {
    streamMetaClose(pTq->pStreamMeta);
  }
  taosMemoryFree(pTq);

  terrno = code;
  return NULL;
}
L
Liu Jicong 已提交
121

L
Liu Jicong 已提交
122
/**
 * Close a TQ object previously returned by tqOpen() and free it.
 *
 * Releases, in this order: the offset store, the three hash tables (their
 * installed free callbacks release the entries), the copied path, the meta
 * store, the stream meta, and finally the STQ itself.
 *
 * @param pTq object to close; NULL is accepted and ignored.
 */
void tqClose(STQ* pTq) {
  if (!pTq) return;

  tqOffsetClose(pTq->pOffsetStore);

  taosHashCleanup(pTq->pHandle);
  taosHashCleanup(pTq->pPushMgr);
  taosHashCleanup(pTq->pCheckInfo);

  taosMemoryFree(pTq->path);

  tqMetaClose(pTq);
  streamMetaClose(pTq->pStreamMeta);

  taosMemoryFree(pTq);
}
L
Liu Jicong 已提交
136

L
Liu Jicong 已提交
137
/**
 * Encode a meta response and send it back to the polling consumer.
 *
 * The RPC payload is an SMqRspHead (message type POLL_META_RSP, epoch and
 * consumer id copied from the request) immediately followed by the encoded
 * SMqMetaRsp.
 *
 * @param pTq  TQ object (used for logging the vnode id).
 * @param pMsg original RPC request; its `info` is used to route the response.
 * @param pReq decoded poll request.
 * @param pRsp meta response to encode.
 * @return 0 once the response has been handed to tmsgSendRsp(), -1 if the
 *         size computation or the payload allocation failed.
 */
int32_t tqSendMetaPollRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqMetaRsp* pRsp) {
  int32_t len = 0;
  int32_t code = 0;

  // first pass: compute the encoded body size
  tEncodeSize(tEncodeSMqMetaRsp, pRsp, len, code);
  if (code < 0) {
    return -1;
  }

  int32_t totalLen = sizeof(SMqRspHead) + len;
  void*   pCont = rpcMallocCont(totalLen);
  if (pCont == NULL) {
    return -1;
  }

  SMqRspHead* pHead = (SMqRspHead*)pCont;
  pHead->mqMsgType = TMQ_MSG_TYPE__POLL_META_RSP;
  pHead->epoch = pReq->epoch;
  pHead->consumerId = pReq->consumerId;

  // second pass: encode the body right behind the header
  SEncoder enc = {0};
  tEncoderInit(&enc, POINTER_SHIFT(pCont, sizeof(SMqRspHead)), len);
  tEncodeSMqMetaRsp(&enc, pRsp);
  tEncoderClear(&enc);

  SRpcMsg rpcRsp = {
      .info = pMsg->info,
      .pCont = pCont,
      .contLen = totalLen,
      .code = 0,
  };
  tmsgSendRsp(&rpcRsp);

  tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) send rsp, res msg type %d, offset type:%d",
          TD_VID(pTq->pVnode), pReq->consumerId, pReq->epoch, pRsp->resMsgType, pRsp->rspOffset.type);

  return 0;
}

H
Haojun Liao 已提交
175 176
static int32_t doSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* pRsp, int32_t epoch,
                             int64_t consumerId, int32_t type) {
L
Liu Jicong 已提交
177 178
  int32_t len = 0;
  int32_t code = 0;
H
Haojun Liao 已提交
179 180 181 182 183 184

  if (type == TMQ_MSG_TYPE__POLL_RSP) {
    tEncodeSize(tEncodeSMqDataRsp, pRsp, len, code);
  } else if (type == TMQ_MSG_TYPE__TAOSX_RSP) {
    tEncodeSize(tEncodeSTaosxRsp, (STaosxRsp*)pRsp, len, code);
  }
L
Liu Jicong 已提交
185 186 187 188 189 190 191 192 193 194 195

  if (code < 0) {
    return -1;
  }

  int32_t tlen = sizeof(SMqRspHead) + len;
  void*   buf = rpcMallocCont(tlen);
  if (buf == NULL) {
    return -1;
  }

H
Haojun Liao 已提交
196 197 198
  ((SMqRspHead*)buf)->mqMsgType = type;
  ((SMqRspHead*)buf)->epoch = epoch;
  ((SMqRspHead*)buf)->consumerId = consumerId;
L
Liu Jicong 已提交
199 200 201 202 203

  void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));

  SEncoder encoder = {0};
  tEncoderInit(&encoder, abuf, len);
H
Haojun Liao 已提交
204 205 206 207 208 209 210

  if (type == TMQ_MSG_TYPE__POLL_RSP) {
    tEncodeSMqDataRsp(&encoder, pRsp);
  } else if (type == TMQ_MSG_TYPE__TAOSX_RSP) {
    tEncodeSTaosxRsp(&encoder, (STaosxRsp*) pRsp);
  }

L
Liu Jicong 已提交
211 212 213
  tEncoderClear(&encoder);

  SRpcMsg rsp = {
H
Haojun Liao 已提交
214
      .info = *pRpcHandleInfo,
L
Liu Jicong 已提交
215 216 217 218 219 220 221 222 223
      .pCont = buf,
      .contLen = tlen,
      .code = 0,
  };

  tmsgSendRsp(&rsp);
  return 0;
}

H
Haojun Liao 已提交
224 225 226
/**
 * Send the response stored in a push entry to the consumer that registered it.
 *
 * Epoch, consumer id and message type are taken from the header already
 * stored in the entry's response. The result of the send is not propagated;
 * the function always returns 0.
 *
 * Expected shape of the response (formerly spelled out as compiled-out
 * assertions): blockData and blockDataLen each hold blockNum elements, no
 * schema is attached, and for a log-type request offset the response version
 * is greater than the request version.
 *
 * @param pTq        TQ object (used for logging the vnode id).
 * @param pPushEntry entry holding the RPC routing info and the response.
 * @return always 0.
 */
int32_t tqPushDataRsp(STQ* pTq, STqPushEntry* pPushEntry) {
  SMqDataRsp* pRsp = pPushEntry->pDataRsp;
  SMqRspHead* pHead = &pRsp->head;

  doSendDataRsp(&pPushEntry->info, pRsp, pHead->epoch, pHead->consumerId, pHead->mqMsgType);

  char reqStr[80] = {0};
  char rspStr[80] = {0};
  tFormatOffset(reqStr, tListLen(reqStr), &pRsp->reqOffset);
  tFormatOffset(rspStr, tListLen(rspStr), &pRsp->rspOffset);

  tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) push rsp, block num: %d, req:%s, rsp:%s",
          TD_VID(pTq->pVnode), pHead->consumerId, pHead->epoch, pRsp->blockNum, reqStr, rspStr);

  return 0;
}

H
Haojun Liao 已提交
251
/**
 * Send a data response for a poll request and log the request/response offsets.
 *
 * Epoch and consumer id for the header come from the request. The result of
 * the underlying send is not propagated; the function always returns 0.
 *
 * Expected shape of the response (formerly spelled out as compiled-out
 * assertions): blockData and blockDataLen each hold blockNum elements, no
 * schema is attached, and for a log-type request offset the response version
 * is >= the request version (strictly greater when blocks are returned).
 *
 * @param pTq  TQ object (used for logging the vnode id).
 * @param pMsg original RPC request; its `info` routes the response.
 * @param pReq decoded poll request.
 * @param pRsp response to send (an STaosxRsp cast to SMqDataRsp for taosx).
 * @param type message type, forwarded to doSendDataRsp().
 * @return always 0.
 */
int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp, int32_t type) {
  doSendDataRsp(&pMsg->info, pRsp, pReq->epoch, pReq->consumerId, type);

  char reqStr[80] = {0};
  char rspStr[80] = {0};
  tFormatOffset(reqStr, 80, &pRsp->reqOffset);
  tFormatOffset(rspStr, 80, &pRsp->rspOffset);

  tqDebug("vgId:%d consumer:0x%" PRIx64 " (epoch %d) send rsp, block num:%d, req:%s, rsp:%s, reqId:0x%" PRIx64,
          TD_VID(pTq->pVnode), pReq->consumerId, pReq->epoch, pRsp->blockNum, reqStr, rspStr, pReq->reqId);

  return 0;
}

280 281 282 283 284
/**
 * Compare two offsets by WAL version.
 *
 * Only log-type offsets are comparable: if either side is not of type
 * TMQ_OFFSET__LOG the result is false.
 *
 * @return true iff both offsets are log offsets and pLeft's version is <= pRight's.
 */
static FORCE_INLINE bool tqOffsetLessOrEqual(const STqOffset* pLeft, const STqOffset* pRight) {
  if (pLeft->val.type != TMQ_OFFSET__LOG || pRight->val.type != TMQ_OFFSET__LOG) {
    return false;
  }
  return pLeft->val.version <= pRight->val.version;
}

285
/**
 * Handle an offset-commit message: decode the committed offset, store it if it
 * is newer than the saved one, and for log offsets move the WAL reference of
 * the matching subscription handle.
 *
 * @param pTq      TQ object of the vnode.
 * @param sversion version associated with this message; if a committed log
 *                 offset is exactly sversion - 1 it is advanced by one.
 * @param msg      encoded STqOffset.
 * @param msgLen   length of msg in bytes.
 * @return 0 on success or when the saved offset is already >= the new one,
 *         -1 on decode failure, unknown offset type, store failure, or
 *         failure to move the WAL reference.
 */
int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  STqOffset offset = {0};
  int32_t   vgId = TD_VID(pTq->pVnode);

  SDecoder decoder;
  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
  int32_t decodeRet = tDecodeSTqOffset(&decoder, &offset);
  // the decoder is released on every path, including decode failure
  tDecoderClear(&decoder);
  if (decodeRet < 0) {
    return -1;
  }

  if (offset.val.type == TMQ_OFFSET__SNAPSHOT_DATA || offset.val.type == TMQ_OFFSET__SNAPSHOT_META) {
    tqDebug("receive offset commit msg to %s on vgId:%d, offset(type:snapshot) uid:%" PRId64 ", ts:%" PRId64,
            offset.subKey, vgId, offset.val.uid, offset.val.ts);
  } else if (offset.val.type == TMQ_OFFSET__LOG) {
    tqDebug("receive offset commit msg to %s on vgId:%d, offset(type:log) version:%" PRId64, offset.subKey,
            vgId, offset.val.version);
    // NOTE(review): presumably skips over the version occupied by this commit
    // message itself — confirm against the WAL writer
    if (offset.val.version + 1 == sversion) {
      offset.val.version += 1;
    }
  } else {
    tqError("invalid commit offset type:%d", offset.val.type);
    return -1;
  }

  STqOffset* pSavedOffset = tqOffsetRead(pTq->pOffsetStore, offset.subKey);
  if (pSavedOffset != NULL && tqOffsetLessOrEqual(&offset, pSavedOffset)) {
    return 0;  // no need to update the offset value
  }

  // save the new offset value
  if (tqOffsetWrite(pTq->pOffsetStore, &offset) < 0) {
    return -1;
  }

  if (offset.val.type == TMQ_OFFSET__LOG) {
    // move the WAL reference of the subscription (if it exists in this vnode)
    STqHandle* pHandle = taosHashGet(pTq->pHandle, offset.subKey, strlen(offset.subKey));
    if (pHandle && (walRefVer(pHandle->pRef, offset.val.version) < 0)) {
      return -1;
    }
  }

  return 0;
}

L
Liu Jicong 已提交
331
/**
 * Check whether a column of a table may be modified, i.e. that no check-info
 * entry lists the column as forbidden for that table.
 *
 * Walks every entry of pTq->pCheckInfo; entries whose ntbUid equals tbUid are
 * searched for colId in their colIdList.
 *
 * @param pTq   TQ object holding the check-info table.
 * @param tbUid uid of the table being altered.
 * @param colId id of the column being altered.
 * @return 0 if no entry forbids the column, -1 if one does.
 */
int32_t tqCheckColModifiable(STQ* pTq, int64_t tbUid, int32_t colId) {
  for (void* pIter = taosHashIterate(pTq->pCheckInfo, NULL); pIter != NULL;
       pIter = taosHashIterate(pTq->pCheckInfo, pIter)) {
    STqCheckInfo* pCheck = (STqCheckInfo*)pIter;
    if (pCheck->ntbUid != tbUid) {
      continue;
    }

    int32_t nCols = taosArrayGetSize(pCheck->colIdList);
    for (int32_t idx = 0; idx < nCols; idx++) {
      int16_t forbidColId = *(int16_t*)taosArrayGet(pCheck->colIdList, idx);
      if (forbidColId == colId) {
        // leaving the iteration early: the iterator must be cancelled explicitly
        taosHashCancelIterate(pTq->pCheckInfo, pIter);
        return -1;
      }
    }
  }

  return 0;
}

L
Liu Jicong 已提交
357 358 359 360 361 362 363 364 365 366
/**
 * Prepare an SMqDataRsp for a poll request: copy the request offset and
 * create the (initially empty) block data and block length arrays.
 *
 * Table names and schemas are never attached by this initializer
 * (withTbName = 0, withSchema = false).
 *
 * @param pRsp    response to initialize.
 * @param pReq    poll request the response answers.
 * @param subType subscription type; currently unused.
 * @return 0 on success, -1 if either array could not be allocated (the flags
 *         are not written in that case).
 */
static int32_t tqInitDataRsp(SMqDataRsp* pRsp, const SMqPollReq* pReq, int8_t subType) {
  pRsp->reqOffset = pReq->reqOffset;

  pRsp->blockData = taosArrayInit(0, sizeof(void*));
  pRsp->blockDataLen = taosArrayInit(0, sizeof(int32_t));
  if (!pRsp->blockData || !pRsp->blockDataLen) {
    return -1;
  }

  pRsp->withTbName = 0;
  pRsp->withSchema = false;
  return 0;
}

383 384 385 386 387 388 389 390 391 392 393 394 395
/**
 * Prepare an STaosxRsp for a poll request: copy the request offset, mark the
 * response as carrying table names and schemas, and create the four
 * (initially empty) per-block arrays.
 *
 * @param pRsp response to initialize.
 * @param pReq poll request the response answers.
 * @return 0 on success, -1 if any of the arrays could not be allocated.
 */
static int32_t tqInitTaosxRsp(STaosxRsp* pRsp, const SMqPollReq* pReq) {
  pRsp->reqOffset = pReq->reqOffset;
  pRsp->withTbName = 1;
  pRsp->withSchema = 1;

  pRsp->blockData = taosArrayInit(0, sizeof(void*));
  pRsp->blockDataLen = taosArrayInit(0, sizeof(int32_t));
  pRsp->blockTbName = taosArrayInit(0, sizeof(void*));
  pRsp->blockSchema = taosArrayInit(0, sizeof(void*));

  bool allocated = pRsp->blockData != NULL && pRsp->blockDataLen != NULL && pRsp->blockTbName != NULL &&
                   pRsp->blockSchema != NULL;
  return allocated ? 0 : -1;
}

400 401 402 403 404
/**
 * Resolve the starting offset for a poll request that asked for an offset
 * reset (earliest / latest / none).
 *
 * - If an offset is already stored for the subscription key, it wins and is
 *   copied to *pOffsetVal.
 * - RESET_EARLIEAST: start from the snapshot (meta or data, depending on
 *   pHandle->fetchMeta) when the request uses snapshots, otherwise from the
 *   first WAL version, which is referenced through pHandle->pRef.
 * - RESET_LATEST: an empty response carrying the WAL's last version is sent
 *   to the consumer right away and *pBlockReturned is set to true.
 * - RESET_NONE: fails with TSDB_CODE_TQ_NO_COMMITTED_OFFSET.
 *
 * @param pOffsetVal     out: the offset to continue from (not written for the
 *                       "latest" and "none" cases).
 * @param pTq            TQ object of the vnode.
 * @param pHandle        subscription handle.
 * @param pRequest       decoded poll request.
 * @param pMsg           original RPC request, used when a response is sent here.
 * @param pBlockReturned out: true iff a response was already sent by this call.
 * @return 0 on success, -1 on failure (terrno set), or the result of
 *         tqSendDataRsp() in the "latest" case.
 */
static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest,
                                     SRpcMsg* pMsg, bool* pBlockReturned) {
  uint64_t     consumerId = pRequest->consumerId;
  STqOffsetVal reqOffset = pRequest->reqOffset;
  STqOffset*   pSaved = tqOffsetRead(pTq->pOffsetStore, pRequest->subKey);
  int32_t      vgId = TD_VID(pTq->pVnode);

  *pBlockReturned = false;

  // data has been polled for this topic in this vnode before: continue from the stored offset
  if (pSaved != NULL) {
    *pOffsetVal = pSaved->val;

    char formatBuf[80];
    tFormatOffset(formatBuf, 80, pOffsetVal);
    tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, existed offset found, offset reset to %s and continue.",
            consumerId, pHandle->subKey, vgId, formatBuf);
    return 0;
  }

  // nothing stored yet: seek according to the reset policy chosen by the consumer
  if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEAST) {
    if (!pRequest->useSnapshot) {
      pHandle->pRef = walRefFirstVer(pTq->pVnode->pWal, pHandle->pRef);
      if (pHandle->pRef == NULL) {
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        return -1;
      }

      tqOffsetResetToLog(pOffsetVal, pHandle->pRef->refVer - 1);
      return 0;
    }

    tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey:%s, vgId:%d, (earliest) set offset to be snapshot",
            consumerId, pHandle->subKey, vgId);

    if (pHandle->fetchMeta) {
      tqOffsetResetToMeta(pOffsetVal, 0);
    } else {
      tqOffsetResetToData(pOffsetVal, 0, 0);
    }
    return 0;
  }

  if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) {
    int32_t code = 0;

    if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
      SMqDataRsp dataRsp = {0};
      tqInitDataRsp(&dataRsp, pRequest, pHandle->execHandle.subType);

      tqOffsetResetToLog(&dataRsp.rspOffset, walGetLastVer(pTq->pVnode->pWal));
      tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, (latest) offset reset to %" PRId64, consumerId,
              pHandle->subKey, vgId, dataRsp.rspOffset.version);

      code = tqSendDataRsp(pTq, pMsg, pRequest, &dataRsp, TMQ_MSG_TYPE__POLL_RSP);
      tDeleteSMqDataRsp(&dataRsp);
    } else {
      STaosxRsp taosxRsp = {0};
      tqInitTaosxRsp(&taosxRsp, pRequest);

      tqOffsetResetToLog(&taosxRsp.rspOffset, walGetLastVer(pTq->pVnode->pWal));

      code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP);
      tDeleteSTaosxRsp(&taosxRsp);
    }

    // an (empty) response has already gone out; the caller must not answer again
    *pBlockReturned = true;
    return code;
  }

  if (reqOffset.type == TMQ_OFFSET__RESET_NONE) {
    tqError("tmq poll: subkey:%s, no offset committed for consumer:0x%" PRIx64 " in vg %d, subkey %s, reset none failed",
            pHandle->subKey, consumerId, vgId, pRequest->subKey);
    terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET;
    return -1;
  }

  return 0;
}
L
Liu Jicong 已提交
472

473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511
#define IS_OFFSET_RESET_TYPE(_t)  ((_t) < 0)

/**
 * Serve a poll request of a column (normal) subscription.
 *
 * Under the TQ write latch the executor task is tagged with the consumer and
 * request ids and data is scanned from *pOffset. If nothing was produced, the
 * request offset is a log offset equal to the response offset, and the handle
 * still belongs to the requesting consumer, the request is parked as a push
 * entry (still under the latch) instead of being answered. Otherwise the latch
 * is released and the response is sent immediately.
 *
 * @param pTq      TQ object of the vnode.
 * @param pHandle  subscription handle.
 * @param pRequest decoded poll request.
 * @param pMsg     original RPC request.
 * @param pOffset  offset to scan from.
 * @return result of tqRegisterPushEntry() when the request is parked,
 *         otherwise the result of tqSendDataRsp().
 */
static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest,
                                                   SRpcMsg* pMsg, STqOffsetVal* pOffset) {
  int32_t  code = 0;
  uint64_t consumerId = pRequest->consumerId;
  int32_t  vgId = TD_VID(pTq->pVnode);

  SMqDataRsp dataRsp = {0};
  tqInitDataRsp(&dataRsp, pRequest, pHandle->execHandle.subType);

  taosWLockLatch(&pTq->lock);

  qSetTaskId(pHandle->execHandle.task, consumerId, pRequest->reqId);
  code = tqScanData(pTq, pHandle, &dataRsp, pOffset);

  // everything available has been delivered already: new data has to be pushed once it arrives
  bool caughtUp = dataRsp.blockNum == 0 && dataRsp.reqOffset.type == TMQ_OFFSET__LOG &&
                  dataRsp.reqOffset.version == dataRsp.rspOffset.version &&
                  pHandle->consumerId == pRequest->consumerId;
  if (caughtUp) {
    code = tqRegisterPushEntry(pTq, pHandle, pRequest, pMsg, &dataRsp, TMQ_MSG_TYPE__POLL_RSP);
    taosWUnLockLatch(&pTq->lock);
    return code;
  }

  taosWUnLockLatch(&pTq->lock);

  code = tqSendDataRsp(pTq, pMsg, pRequest, &dataRsp, TMQ_MSG_TYPE__POLL_RSP);

  // NOTE: pHandle->consumerId may have been changed already at this point.
  tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, rsp block:%d, offset type:%d, uid/version:%" PRId64
          ", ts:%" PRId64 ", reqId:0x%" PRIx64,
          consumerId, pHandle->subKey, vgId, dataRsp.blockNum, dataRsp.rspOffset.type, dataRsp.rspOffset.uid,
          dataRsp.rspOffset.ts, pRequest->reqId);

  tDeleteSMqDataRsp(&dataRsp);
  return code;
}

/**
 * Serve one poll request for a subscription handle.
 *
 * Steps:
 *  1. Resolve the starting offset: either the consumer-supplied offset or, for
 *     reset-type requests, the one chosen by extractResetOffsetVal() (which
 *     may already have answered the request).
 *  2. Column subscriptions are delegated to
 *     extractDataAndRspForNormalSubscribe().
 *  3. Otherwise (taosx): if the offset is not a log offset, scan the snapshot;
 *     a meta or non-empty data result is sent immediately. When the snapshot
 *     scan yields nothing the response offset becomes the new offset.
 *  4. For a log offset, WAL entries are fetched one by one starting at
 *     offset.version + 1. Non-submit entries are sent as meta responses
 *     (after flushing any rows collected so far); submit entries are scanned
 *     into the taosx response, which is sent once at least 4096 rows were
 *     collected or a create-table entry was produced.
 *
 * All exits after the taosx response has been initialized release it and the
 * WAL read buffer through the common `_end` label; this includes the
 * tqScanTaosx() failure path, which previously leaked the response arrays.
 *
 * @param pTq      TQ object of the vnode.
 * @param pHandle  subscription handle.
 * @param pRequest decoded poll request.
 * @param pMsg     original RPC request.
 * @return 0 on success, -1 on failure, or the result of the send routine used.
 */
static int32_t doPollDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg) {
  int32_t      code = -1;
  STqOffsetVal offset = {0};
  SWalCkHead*  pCkHead = NULL;
  int32_t      vgId = TD_VID(pTq->pVnode);

  STqOffsetVal reqOffset = pRequest->reqOffset;
  uint64_t     consumerId = pRequest->consumerId;

  // 1. reset the offset if needed
  if (IS_OFFSET_RESET_TYPE(reqOffset.type)) {
    // handle the reset offset cases, according to the consumer's choice.
    bool blockReturned = false;
    code = extractResetOffsetVal(&offset, pTq, pHandle, pRequest, pMsg, &blockReturned);
    if (code != 0) {
      return code;
    }

    // empty block returned, quit
    if (blockReturned) {
      return 0;
    }
  } else {
    // use the consumer specified offset
    offset = reqOffset;
  }

  // this is a normal subscribe requirement
  if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
    return extractDataAndRspForNormalSubscribe(pTq, pHandle, pRequest, pMsg, &offset);
  }

  // todo handle the case where re-balance occurs.
  // for taosx
  SMqMetaRsp metaRsp = {0};
  STaosxRsp  taosxRsp = {0};
  tqInitTaosxRsp(&taosxRsp, pRequest);

  if (offset.type != TMQ_OFFSET__LOG) {
    if (tqScanTaosx(pTq, pHandle, &taosxRsp, &metaRsp, &offset) < 0) {
      code = -1;
      goto _end;  // release the arrays created by tqInitTaosxRsp()
    }

    if (metaRsp.metaRspLen > 0) {
      code = tqSendMetaPollRsp(pTq, pMsg, pRequest, &metaRsp);
      tqDebug("tmq poll: consumer:0x%" PRIx64 " subkey:%s vgId:%d, send meta offset type:%d,uid:%" PRId64
              ",ts:%" PRId64,
              consumerId, pHandle->subKey, vgId, metaRsp.rspOffset.type, metaRsp.rspOffset.uid,
              metaRsp.rspOffset.ts);
      taosMemoryFree(metaRsp.metaRsp);
      goto _end;
    }

    if (taosxRsp.blockNum > 0) {
      code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP);
      goto _end;
    }

    // the snapshot scan produced nothing: continue from wherever it ended
    offset = taosxRsp.rspOffset;

    tqDebug("taosx poll: consumer:0x%" PRIx64 " subkey:%s vgId:%d, send data blockNum:%d, offset type:%d,uid:%" PRId64
            ",version:%" PRId64,
            consumerId, pHandle->subKey, vgId, taosxRsp.blockNum, taosxRsp.rspOffset.type, taosxRsp.rspOffset.uid,
            taosxRsp.rspOffset.version);
  }

  if (offset.type == TMQ_OFFSET__LOG) {
    int64_t fetchVer = offset.version + 1;

    // read buffer for one WAL entry: header plus an initial body capacity of 2048 bytes
    pCkHead = taosMemoryMalloc(sizeof(SWalCkHead) + 2048);
    if (pCkHead == NULL) {
      tDeleteSTaosxRsp(&taosxRsp);
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
    }
    walSetReaderCapacity(pHandle->pWalReader, 2048);

    int totalRows = 0;
    while (1) {
      // todo refactor: this is not correct.
      int32_t savedEpoch = atomic_load_32(&pHandle->epoch);
      if (savedEpoch > pRequest->epoch) {
        tqWarn("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey:%s vgId:%d offset %" PRId64
               ", found new consumer epoch %d, discard req epoch %d",
               consumerId, pRequest->epoch, pHandle->subKey, vgId, fetchVer, savedEpoch, pRequest->epoch);
        break;
      }

      // no more log available: answer with what has been collected so far
      if (tqFetchLog(pTq, pHandle, &fetchVer, &pCkHead, pRequest->reqId) < 0) {
        tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
        code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP);
        goto _end;
      }

      SWalCont* pHead = &pCkHead->head;
      tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d) iter log, vgId:%d offset %" PRId64 " msgType %d", consumerId,
              pRequest->epoch, vgId, fetchVer, pHead->msgType);

      // process meta
      if (pHead->msgType != TDMT_VND_SUBMIT) {
        // rows collected before this meta entry are sent first; the meta entry
        // itself is left for the next poll (offset points just before it)
        if (totalRows > 0) {
          tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer - 1);
          code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP);
          goto _end;
        }

        tqDebug("fetch meta msg, ver:%" PRId64 ", type:%s", pHead->version, TMSG_INFO(pHead->msgType));
        tqOffsetResetToLog(&metaRsp.rspOffset, fetchVer);
        metaRsp.resMsgType = pHead->msgType;
        metaRsp.metaRspLen = pHead->bodyLen;
        metaRsp.metaRsp = pHead->body;  // borrowed from pCkHead, released with it
        code = (tqSendMetaPollRsp(pTq, pMsg, pRequest, &metaRsp) < 0) ? -1 : 0;
        goto _end;
      }

      // process data
      SPackedData submit = {
          .msgStr = POINTER_SHIFT(pHead->body, sizeof(SSubmitReq2Msg)),
          .msgLen = pHead->bodyLen - sizeof(SSubmitReq2Msg),
          .ver = pHead->version,
      };

      if (tqTaosxScanLog(pTq, pHandle, submit, &taosxRsp, &totalRows) < 0) {
        tqError("tmq poll: tqTaosxScanLog error %" PRId64 ", in vgId:%d, subkey %s", consumerId, vgId,
                pRequest->subKey);
        code = -1;
        goto _end;
      }

      // batch is large enough, or a create-table entry was produced: send now
      if (totalRows >= 4096 || taosxRsp.createTableNum > 0) {
        tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
        code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP);
        goto _end;
      }

      fetchVer++;
    }
  }

  // reached when the request was discarded (newer epoch) or no log offset was produced
  code = 0;

_end:
  tDeleteSTaosxRsp(&taosxRsp);
  taosMemoryFreeClear(pCkHead);
  return code;
}

670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690
/**
 * Entry point for a consumer poll request.
 *
 * Decodes the request, looks up the subscription handle by subscription key,
 * verifies (under the read latch) that the handle is owned by the requesting
 * consumer, raises the handle's epoch to the request's epoch if that is newer
 * (under the write latch), and then delegates to doPollDataForMq().
 *
 * @param pTq  TQ object of the vnode.
 * @param pMsg RPC message carrying the serialized SMqPollReq.
 * @return result of doPollDataForMq(), or -1 with terrno set to
 *         TSDB_CODE_INVALID_MSG (undecodable request / unknown subscription)
 *         or TSDB_CODE_TMQ_CONSUMER_MISMATCH (handle owned by another consumer).
 */
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
  SMqPollReq req = {0};
  if (tDeserializeSMqPollReq(pMsg->pCont, pMsg->contLen, &req) < 0) {
    tqError("tDeserializeSMqPollReq %d failed", pMsg->contLen);
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  int64_t consumerId = req.consumerId;
  int32_t vgId = TD_VID(pTq->pVnode);

  // 1. find handle
  STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
  if (pHandle == NULL) {
    tqError("tmq poll: consumer:0x%" PRIx64 " vgId:%d subkey %s not found", consumerId, vgId, req.subKey);
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  // 2. check re-balance status: the handle must still belong to this consumer
  taosRLockLatch(&pTq->lock);
  if (pHandle->consumerId != consumerId) {
    tqDebug("ERROR tmq poll: consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64,
            consumerId, vgId, req.subKey, pHandle->consumerId);
    terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
    taosRUnLockLatch(&pTq->lock);
    return -1;
  }
  taosRUnLockLatch(&pTq->lock);

  // 3. update the epoch value (only ever moves forward)
  taosWLockLatch(&pTq->lock);
  int32_t prevEpoch = pHandle->epoch;
  if (req.epoch > prevEpoch) {
    tqDebug("tmq poll: consumer:0x%" PRIx64 " epoch update from %d to %d by poll req", consumerId, prevEpoch, req.epoch);
    pHandle->epoch = req.epoch;
  }
  taosWUnLockLatch(&pTq->lock);

  char offsetStr[80];
  tFormatOffset(offsetStr, 80, &req.reqOffset);
  tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey %s, recv poll req vgId:%d, req:%s, reqId:0x%" PRIx64,
          consumerId, req.epoch, pHandle->subKey, vgId, offsetStr, req.reqId);

  return doPollDataForMq(pTq, pHandle, &req, pMsg);
}

719
/**
 * Handle a "delete subscription" message for one subscription key.
 *
 * Removes, in order: the pending push entry (under the write latch), the
 * subscription handle (closing its WAL reference first, if any), the cached
 * offset, and the persisted handle in the meta store. Failures of the
 * individual steps are only logged.
 *
 * @param pTq      TQ object of the vnode.
 * @param sversion version associated with this message; unused here.
 * @param msg      message body, interpreted as SMqVDeleteReq.
 * @param msgLen   length of msg; unused here.
 * @return always 0.
 */
int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  SMqVDeleteReq* pReq = (SMqVDeleteReq*)msg;
  const char*    subKey = pReq->subKey;
  size_t         keyLen = strlen(subKey);

  tqDebug("vgId:%d, tq process delete sub req %s", pTq->pVnode->config.vgId, pReq->subKey);

  // drop a parked push request for this subscription, if there is one
  taosWLockLatch(&pTq->lock);
  int32_t code = taosHashRemove(pTq->pPushMgr, subKey, keyLen);
  if (code != 0) {
    tqDebug("vgId:%d, tq remove push handle %s", pTq->pVnode->config.vgId, pReq->subKey);
  }
  taosWUnLockLatch(&pTq->lock);

  STqHandle* pHandle = taosHashGet(pTq->pHandle, subKey, keyLen);
  if (pHandle != NULL) {
    // release the WAL reference held on behalf of this subscription
    if (pHandle->pRef) {
      walCloseRef(pTq->pVnode->pWal, pHandle->pRef->refId);
    }

    code = taosHashRemove(pTq->pHandle, subKey, keyLen);
    if (code != 0) {
      tqError("cannot process tq delete req %s, since no such handle", pReq->subKey);
    }
  }

  code = tqOffsetDelete(pTq->pOffsetStore, pReq->subKey);
  if (code != 0) {
    tqError("cannot process tq delete req %s, since no such offset in cache", pReq->subKey);
  }

  if (tqMetaDeleteHandle(pTq, pReq->subKey) < 0) {
    tqError("cannot process tq delete req %s, since no such offset in tdb", pReq->subKey);
  }

  return 0;
}

754
/**
 * Handle an "add check info" message: decode the STqCheckInfo, register it in
 * the in-memory check-info table keyed by topic, and persist the raw message
 * in the meta store.
 *
 * The decoder is released on every path, and the decoded info is released
 * with tDeleteSTqCheckInfo() when it could not be handed over to the hash
 * table (the table owns the entry only after a successful put).
 *
 * @param pTq      TQ object of the vnode.
 * @param sversion version associated with this message; unused here.
 * @param msg      encoded STqCheckInfo.
 * @param msgLen   length of msg in bytes.
 * @return 0 on success, -1 on failure (terrno = TSDB_CODE_OUT_OF_MEMORY).
 */
int32_t tqProcessAddCheckInfoReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  STqCheckInfo info = {0};
  SDecoder     decoder;

  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
  if (tDecodeSTqCheckInfo(&decoder, &info) < 0) {
    tDecoderClear(&decoder);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  tDecoderClear(&decoder);

  if (taosHashPut(pTq->pCheckInfo, info.topic, strlen(info.topic), &info, sizeof(STqCheckInfo)) < 0) {
    // ownership was not transferred to the table: release the decoded content here
    tDeleteSTqCheckInfo(&info);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  // from here on the hash table owns the content of `info`
  if (tqMetaSaveCheckInfo(pTq, info.topic, msg, msgLen) < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  return 0;
}

774
/**
 * Handle a "delete check info" message: remove the entry from the in-memory
 * check-info table and from the meta store.
 *
 * @param pTq      TQ object of the vnode.
 * @param sversion version associated with this message; unused here.
 * @param msg      NUL-terminated key of the entry to delete (used directly as
 *                 the hash key and the meta-store key).
 * @param msgLen   length of msg; unused here.
 * @return 0 on success, -1 if either removal failed
 *         (terrno = TSDB_CODE_OUT_OF_MEMORY).
 */
int32_t tqProcessDelCheckInfoReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  bool failed = taosHashRemove(pTq->pCheckInfo, msg, strlen(msg)) < 0 || tqMetaDeleteCheckInfo(pTq, msg) < 0;
  if (failed) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  return 0;
}

786
// Handles a subscription re-balance request for this vnode.
//
// The request is decoded from `msg` into an SMqRebVgReq and the handle for
// req.subKey is looked up in pTq->pHandle. Three cases follow:
//   1. No handle exists: a new STqHandle is built on the stack, a WAL reference
//      is taken, the executor/reader is set up according to the subscription
//      type, and the handle is copied into the hash table and persisted.
//   2. A handle exists and already belongs to req.newConsumerId: only the
//      epoch is reset and the handle is persisted again.
//   3. A handle exists for a different consumer: the consumer is switched while
//      holding pTq->lock, then the handle is persisted.
//
// Returns 0 on success, -1 on failure; in case 2 the result of
// tqMetaSaveHandle() is returned directly. `sversion` and `msgLen` are unused.
int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  SMqRebVgReq req = {0};
  tDecodeSMqRebVgReq(msg, &req);

  SVnode* pVnode = pTq->pVnode;
  int32_t vgId = TD_VID(pVnode);

  tqDebug("vgId:%d, tq process sub req:%s, Id:0x%" PRIx64 " -> Id:0x%" PRIx64, pVnode->config.vgId, req.subKey,
          req.oldConsumerId, req.newConsumerId);

  STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
  if (pHandle == NULL) {
    // No handle yet, so the request is expected to carry oldConsumerId == -1;
    // anything else is logged but processing continues.
    if (req.oldConsumerId != -1) {
      tqError("vgId:%d, build new consumer handle %s for consumer:0x%" PRIx64 ", but old consumerId:0x%" PRIx64,
              req.vgId, req.subKey, req.newConsumerId, req.oldConsumerId);
    }

    // A new handle without a new consumer is rejected; note this still returns 0.
    if (req.newConsumerId == -1) {
      tqError("vgId:%d, tq invalid re-balance request, new consumerId %" PRId64 "", req.vgId, req.newConsumerId);
      taosMemoryFree(req.qmsg);
      return 0;
    }

    // Build the handle in a zeroed stack object; it is copied into the hash table below.
    STqHandle tqHandle = {0};
    pHandle = &tqHandle;
    /*taosInitRWLatch(&pExec->lock);*/

    // Read from the freshly zeroed handle, so this is always 0; only used for the debug log below.
    uint64_t oldConsumerId = pHandle->consumerId;
    memcpy(pHandle->subKey, req.subKey, TSDB_SUBSCRIBE_KEY_LEN);
    pHandle->consumerId = req.newConsumerId;
    pHandle->epoch = -1;

    pHandle->execHandle.subType = req.subType;
    pHandle->fetchMeta = req.withMeta;

    // TODO version should be assigned and refed during preprocess
    SWalRef* pRef = walRefCommittedVer(pVnode->pWal);
    if (pRef == NULL) {
      taosMemoryFree(req.qmsg);
      return -1;
    }

    // The referenced WAL version is used both as the read version and as the snapshot version.
    int64_t ver = pRef->refVer;
    pHandle->pRef = pRef;

    SReadHandle handle = {
        .meta = pVnode->pMeta,
        .vnode = pVnode,
        .initTableReader = true,
        .initTqReader = true,
        .version = ver,
    };

    pHandle->snapshotVer = ver;

    if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
      // The handle takes over the query message; req.qmsg is cleared so the
      // taosMemoryFree(req.qmsg) calls further down receive NULL.
      pHandle->execHandle.execCol.qmsg = req.qmsg;
      req.qmsg = NULL;

      pHandle->execHandle.task =
          qCreateQueueExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle, vgId, &pHandle->execHandle.numOfCols, NULL);
      // NOTE(review): the created task is not checked for NULL before being used here.
      void* scanner = NULL;
      qExtractStreamScanner(pHandle->execHandle.task, &scanner);
      pHandle->execHandle.pExecReader = qExtractReaderFromStreamScanner(scanner);
    } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__DB) {
      pHandle->pWalReader = walOpenReader(pVnode->pWal, NULL);
      pHandle->execHandle.pExecReader = tqOpenReader(pVnode);

      pHandle->execHandle.execDb.pFilterOutTbUid =
          taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
      buildSnapContext(handle.meta, handle.version, 0, pHandle->execHandle.subType, pHandle->fetchMeta,
                       (SSnapContext**)(&handle.sContext));

      pHandle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &handle, vgId, NULL, NULL);
    } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
      pHandle->pWalReader = walOpenReader(pVnode->pWal, NULL);
      pHandle->execHandle.execTb.suid = req.suid;

      // Collect the uids that vnodeGetCtbIdList() reports for req.suid and hand them to the reader.
      SArray* tbUidList = taosArrayInit(0, sizeof(int64_t));
      vnodeGetCtbIdList(pVnode, req.suid, tbUidList);
      tqDebug("vgId:%d, tq try to get all ctb, suid:%" PRId64, pVnode->config.vgId, req.suid);
      for (int32_t i = 0; i < taosArrayGetSize(tbUidList); i++) {
        int64_t tbUid = *(int64_t*)taosArrayGet(tbUidList, i);
        tqDebug("vgId:%d, idx %d, uid:%" PRId64, vgId, i, tbUid);
      }
      pHandle->execHandle.pExecReader = tqOpenReader(pVnode);
      tqReaderSetTbUidList(pHandle->execHandle.pExecReader, tbUidList);
      taosArrayDestroy(tbUidList);

      buildSnapContext(handle.meta, handle.version, req.suid, pHandle->execHandle.subType, pHandle->fetchMeta,
                       (SSnapContext**)(&handle.sContext));
      pHandle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &handle, vgId, NULL, NULL);
    }

    // Store a byte copy of the stack handle in the table, then persist it.
    // NOTE(review): the return value of taosHashPut is not checked.
    taosHashPut(pTq->pHandle, req.subKey, strlen(req.subKey), pHandle, sizeof(STqHandle));
    tqDebug("try to persist handle %s consumer:0x%" PRIx64 " , old consumer:0x%" PRIx64, req.subKey,
            pHandle->consumerId, oldConsumerId);
    if (tqMetaSaveHandle(pTq, req.subKey, pHandle) < 0) {
      taosMemoryFree(req.qmsg);
      return -1;
    }
  } else {
    if (pHandle->consumerId == req.newConsumerId) {  // do nothing
      tqInfo("vgId:%d consumer:0x%" PRIx64 " remains, no switch occurs", req.vgId, req.newConsumerId);
      // Store -1 then add 1: the epoch ends up at 0.
      atomic_store_32(&pHandle->epoch, -1);
      atomic_add_fetch_32(&pHandle->epoch, 1);
      taosMemoryFree(req.qmsg);
      return tqMetaSaveHandle(pTq, req.subKey, pHandle);
    } else {
      tqInfo("vgId:%d switch consumer from Id:0x%" PRIx64 " to Id:0x%" PRIx64, req.vgId, pHandle->consumerId,
             req.newConsumerId);

      // The consumer switch is done under the write lock of pTq->lock.
      taosWLockLatch(&pTq->lock);
      atomic_store_32(&pHandle->epoch, -1);

      // remove if it has been register in the push manager, and return one empty block to consumer
      tqRemovePushEntry(pTq, req.subKey, (int32_t)strlen(req.subKey), pHandle->consumerId, true);

      atomic_store_64(&pHandle->consumerId, req.newConsumerId);
      atomic_add_fetch_32(&pHandle->epoch, 1);

      if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
        qStreamCloseTsdbReader(pHandle->execHandle.task);
      }

      // The lock is released before the handle is persisted.
      taosWUnLockLatch(&pTq->lock);
      if (tqMetaSaveHandle(pTq, req.subKey, pHandle) < 0) {
        taosMemoryFree(req.qmsg);
        return -1;
      }
    }
  }

  taosMemoryFree(req.qmsg);
  return 0;
}
922

923
// Fills in the runtime parts of a decoded stream task so it can run on this vnode.
//
// Sets the initial ref count and scheduling state, opens the input/output
// queues, creates the state backend and executor for SOURCE and AGG level
// tasks, wires up the sink for SMA/TABLE outputs, and finally sets up the
// trigger. `ver` is recorded as the task's start version.
//
// Returns 0 on success, -1 as soon as any step fails. Nothing that was
// already created is released here on failure.
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
  SVnode* pVnode = pTq->pVnode;
  int32_t vgId = TD_VID(pVnode);

  pTask->refCnt = 1;
  pTask->schedStatus = TASK_SCHED_STATUS__INACTIVE;

  // Both queues are opened before either result is inspected.
  pTask->inputQueue = streamQueueOpen();
  pTask->outputQueue = streamQueueOpen();
  if (pTask->inputQueue == NULL || pTask->outputQueue == NULL) {
    return -1;
  }

  pTask->inputStatus = TASK_INPUT_STATUS__NORMAL;
  pTask->outputStatus = TASK_OUTPUT_STATUS__NORMAL;
  pTask->pMsgCb = &pVnode->msgCb;
  pTask->startVer = ver;

  // A task that must fill history starts out waiting for its downstream.
  if (pTask->fillHistory) {
    pTask->taskStatus = TASK_STATUS__WAIT_DOWNSTREAM;
  }

  // expand executor: only SOURCE and AGG level tasks get a state backend and an executor
  const bool isSource = (pTask->taskLevel == TASK_LEVEL__SOURCE);
  const bool isAgg = (pTask->taskLevel == TASK_LEVEL__AGG);
  if (isSource || isAgg) {
    pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask, false, -1, -1);
    if (pTask->pState == NULL) {
      return -1;
    }

    SReadHandle readHandle = {0};
    readHandle.pStateBackend = pTask->pState;
    if (isSource) {
      // Source tasks read from this vnode through a tq reader.
      readHandle.meta = pVnode->pMeta;
      readHandle.vnode = pVnode;
      readHandle.initTqReader = 1;
    } else {
      // Agg tasks have no vnode; they only record how many child endpoints exist.
      readHandle.vnode = NULL;
      readHandle.numOfVgroups = (int32_t)taosArrayGetSize(pTask->childEpInfo);
    }

    pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &readHandle, vgId);
    if (pTask->exec.executor == NULL) {
      return -1;
    }
  }

  // sink
  if (pTask->outputType == TASK_OUTPUT__SMA) {
    pTask->smaSink.vnode = pVnode;
    pTask->smaSink.smaSink = smaHandleRes;
  } else if (pTask->outputType == TASK_OUTPUT__TABLE) {
    pTask->tbSink.vnode = pVnode;
    pTask->tbSink.tbSinkFunc = tqSinkToTablePipeline2;

    // Use the schema version reported by the meta store for the sink super table;
    // fall back to 1 when the lookup does not succeed.
    SMetaInfo metaInfo = {0};
    int32_t   schemaVer = 1;
    if (metaGetInfo(pVnode->pMeta, pTask->tbSink.stbUid, &metaInfo, NULL) == TSDB_CODE_SUCCESS) {
      schemaVer = metaInfo.skmVer;
    }

    pTask->tbSink.pTSchema =
        tBuildTSchema(pTask->tbSink.pSchemaWrapper->pSchema, pTask->tbSink.pSchemaWrapper->nCols, schemaVer);
    if (pTask->tbSink.pTSchema == NULL) {
      return -1;
    }
  }

  streamSetupTrigger(pTask);
  tqInfo("expand stream task on vg %d, task id %d, child id %d, level %d", vgId, pTask->taskId, pTask->selfChildId, pTask->taskLevel);
  return 0;
}
L
Liu Jicong 已提交
1013

1014 1015 1016 1017 1018 1019
// Answers a "task check" request sent by an upstream task.
//
// The request body (after the SMsgHead) is decoded, the addressed downstream
// task is looked up, and a response is sent back whose status is 1 when the
// task exists and is in TASK_STATUS__NORMAL, 0 otherwise.
//
// Returns 0 once the response has been handed to tmsgSendRsp(); returns -1 if
// the request cannot be decoded, or the response cannot be sized or allocated.
int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) {
  char*               msgStr = pMsg->pCont;
  char*               msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
  int32_t             msgLen = pMsg->contLen - sizeof(SMsgHead);
  SStreamTaskCheckReq req;
  SDecoder            decoder;
  tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
  if (tDecodeSStreamTaskCheckReq(&decoder, &req) < 0) {
    // Do not build a response out of an undecoded (uninitialized) request.
    tDecoderClear(&decoder);
    tqError("unable to decode task check req %d", __LINE__);
    return -1;
  }
  tDecoderClear(&decoder);

  int32_t             taskId = req.downstreamTaskId;
  SStreamTaskCheckRsp rsp = {
      .reqId = req.reqId,
      .streamId = req.streamId,
      .childId = req.childId,
      .downstreamNodeId = req.downstreamNodeId,
      .downstreamTaskId = req.downstreamTaskId,
      .upstreamNodeId = req.upstreamNodeId,
      .upstreamTaskId = req.upstreamTaskId,
  };

  // The task is only held long enough to sample its status.
  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);
  if (pTask && atomic_load_8(&pTask->taskStatus) == TASK_STATUS__NORMAL) {
    rsp.status = 1;
  } else {
    rsp.status = 0;
  }

  if (pTask) streamMetaReleaseTask(pTq->pStreamMeta, pTask);

  tqDebug("tq recv task check req(reqId:0x%" PRIx64 ") %d at node %d check req from task %d at node %d, status %d",
          rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);

  // First pass computes the encoded size, second pass writes into the buffer.
  SEncoder encoder;
  int32_t  code;
  int32_t  len;
  tEncodeSize(tEncodeSStreamTaskCheckRsp, &rsp, len, code);
  if (code < 0) {
    tqError("unable to encode rsp %d", __LINE__);
    return -1;
  }

  void* buf = rpcMallocCont(sizeof(SMsgHead) + len);
  if (buf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    tqError("unable to allocate rsp %d", __LINE__);
    return -1;
  }
  ((SMsgHead*)buf)->vgId = htonl(req.upstreamNodeId);

  void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
  tEncoderInit(&encoder, (uint8_t*)abuf, len);
  tEncodeSStreamTaskCheckRsp(&encoder, &rsp);
  tEncoderClear(&encoder);

  SRpcMsg rspMsg = {
      .code = 0,
      .pCont = buf,
      .contLen = sizeof(SMsgHead) + len,
      .info = pMsg->info,
  };

  tmsgSendRsp(&rspMsg);
  return 0;
}

1073
// Processes the response to a previously sent "task check" request.
//
// Decodes the response from `msg`, looks up the upstream task it refers to and
// forwards the response to streamProcessTaskCheckRsp().
//
// Returns -1 if decoding fails or the task is not found; otherwise the result
// of streamProcessTaskCheckRsp().
int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  SStreamTaskCheckRsp rsp;
  SDecoder            decoder;

  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
  const int32_t decodeCode = tDecodeSStreamTaskCheckRsp(&decoder, &rsp);
  // The decoder is cleared whether or not decoding succeeded.
  tDecoderClear(&decoder);
  if (decodeCode < 0) {
    return -1;
  }

  tqDebug("tq recv task check rsp(reqId:0x%" PRIx64 ") %d at node %d check req from task %d at node %d, status %d",
          rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);

  SStreamTask* pUpstream = streamMetaAcquireTask(pTq->pStreamMeta, rsp.upstreamTaskId);
  if (pUpstream == NULL) {
    return -1;
  }

  const int32_t result = streamProcessTaskCheckRsp(pUpstream, &rsp, sversion);
  streamMetaReleaseTask(pTq->pStreamMeta, pUpstream);
  return result;
}

1099
// Deploys a stream task on this vnode.
//
// Does nothing (and returns 0) when tsDisableStream is set. Otherwise:
//   1. allocates a task and decodes it from `msg`,
//   2. registers it through streamMetaAddTask(),
//   3. for fill-history tasks, starts the downstream check.
//
// Returns 0 on success and -1 on allocation, decode or registration failure.
int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  if (tsDisableStream) {
    return 0;
  }

  // 1.deserialize msg and build task
  SStreamTask* pNewTask = taosMemoryCalloc(1, sizeof(SStreamTask));
  if (pNewTask == NULL) {
    return -1;
  }

  SDecoder decoder;
  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
  const bool decoded = (tDecodeSStreamTask(&decoder, pNewTask) >= 0);
  tDecoderClear(&decoder);
  if (!decoded) {
    taosMemoryFree(pNewTask);
    return -1;
  }

  // 2.save task
  // NOTE(review): the task is not freed here when this fails — presumably
  // streamMetaAddTask disposes of it; confirm against its implementation.
  if (streamMetaAddTask(pTq->pStreamMeta, sversion, pNewTask) < 0) {
    return -1;
  }

  // 3.go through recover steps to fill history
  if (pNewTask->fillHistory) {
    streamTaskCheckDownstream(pNewTask, sversion);
  }

  return 0;
}

L
Liu Jicong 已提交
1139 1140 1141 1142 1143
// Runs recovery step 1 for a source task and queues the step-2 request.
//
// pMsg->pCont is interpreted directly as an SStreamRecoverStep1Req. After the
// step-1 scan, a step-2 request is built and put on this vnode's write queue
// with message type TDMT_VND_STREAM_RECOVER_BLOCKING_STAGE.
//
// Returns 0 on success, and also when the task is found to be dropping (no
// step-2 request is queued then). Returns -1 if the task is not found, its
// start version is not positive, the step-2 request cannot be built, or the
// message buffer cannot be allocated.
int32_t tqProcessTaskRecover1Req(STQ* pTq, SRpcMsg* pMsg) {
  SStreamRecoverStep1Req* pReq = (SStreamRecoverStep1Req*)pMsg->pCont;
  SStreamTask*            pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->taskId);
  if (pTask == NULL) {
    return -1;
  }

  // check param
  int64_t fillVer1 = pTask->startVer;
  if (fillVer1 <= 0) {
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
    return -1;
  }

  // do recovery step 1
  streamSourceRecoverScanStep1(pTask);

  if (atomic_load_8(&pTask->taskStatus) == TASK_STATUS__DROPPING) {
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
    return 0;
  }

  // build msg to launch next step
  SStreamRecoverStep2Req req;
  int32_t                code = streamBuildSourceRecover2Req(pTask, &req);
  if (code < 0) {
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
    return -1;
  }

  // Sample the status while the reference is still held; the task must not be
  // touched once it has been released.
  bool dropping = (atomic_load_8(&pTask->taskStatus) == TASK_STATUS__DROPPING);
  streamMetaReleaseTask(pTq->pStreamMeta, pTask);
  if (dropping) {
    return 0;
  }

  // serialize msg: the payload is the step-2 request, so size it from that object
  int32_t len = sizeof(req);

  void* serializedReq = rpcMallocCont(len);
  if (serializedReq == NULL) {
    return -1;
  }

  memcpy(serializedReq, &req, len);

  // dispatch msg
  SRpcMsg rpcMsg = {
      .code = 0,
      .contLen = len,
      .msgType = TDMT_VND_STREAM_RECOVER_BLOCKING_STAGE,
      .pCont = serializedReq,
  };

  tmsgPutToQueue(&pTq->pVnode->msgCb, WRITE_QUEUE, &rpcMsg);

  return 0;
}

1202
// Runs recovery step 2 for a source task and finishes its history fill.
//
// `msg` is interpreted directly as an SStreamRecoverStep2Req. In order: the
// step-2 scan is run, task parameters are restored, the status is set back to
// normal, a recover-finish request is dispatched downstream, and the task is
// saved with fillHistory cleared.
//
// Returns 0 on success, or when the task turns out to be dropping after the
// scan (the remaining steps are skipped). Returns -1 if the task is not found
// or any step fails. The task reference is released on every path after a
// successful acquire.
int32_t tqProcessTaskRecover2Req(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  SStreamRecoverStep2Req* pStep2 = (SStreamRecoverStep2Req*)msg;
  SStreamTask*            pTask = streamMetaAcquireTask(pTq->pStreamMeta, pStep2->taskId);
  if (pTask == NULL) {
    return -1;
  }

  int32_t result = -1;

  // do recovery step 2
  if (streamSourceRecoverScanStep2(pTask, sversion) < 0) {
    goto RELEASE;
  }

  if (atomic_load_8(&pTask->taskStatus) == TASK_STATUS__DROPPING) {
    result = 0;
    goto RELEASE;
  }

  // restore param
  if (streamRestoreParam(pTask) < 0) {
    goto RELEASE;
  }

  // set status normal
  if (streamSetStatusNormal(pTask) < 0) {
    goto RELEASE;
  }

  // dispatch recover finish req to all related downstream task
  if (streamDispatchRecoverFinishReq(pTask) < 0) {
    goto RELEASE;
  }

  atomic_store_8(&pTask->fillHistory, 0);
  streamMetaSaveTask(pTq->pStreamMeta, pTask);
  result = 0;

RELEASE:
  streamMetaReleaseTask(pTq->pStreamMeta, pTask);
  return result;
}

L
Liu Jicong 已提交
1251 1252 1253
// Handles a "recover finish" notification sent to a task on this vnode.
//
// The request body (after the SMsgHead) is decoded, the addressed task is
// looked up, and the notification is passed to streamProcessRecoverFinishReq().
//
// Returns 0 on success; -1 if the request cannot be decoded, the task is not
// found, or processing fails.
int32_t tqProcessTaskRecoverFinishReq(STQ* pTq, SRpcMsg* pMsg) {
  char*   msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);

  // deserialize
  SStreamRecoverFinishReq req;

  SDecoder decoder;
  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
  if (tDecodeSStreamRecoverFinishReq(&decoder, &req) < 0) {
    // Without a decoded request there is no valid task id to look up.
    tDecoderClear(&decoder);
    return -1;
  }
  tDecoderClear(&decoder);

  // find task
  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.taskId);
  if (pTask == NULL) {
    return -1;
  }

  // do process request
  int32_t code = streamProcessRecoverFinishReq(pTask, req.childId);
  streamMetaReleaseTask(pTq->pStreamMeta, pTask);
  return (code < 0) ? -1 : 0;
}
L
Liu Jicong 已提交
1277

L
Liu Jicong 已提交
1278 1279 1280 1281 1282
// Placeholder for the "recover finish" response: nothing is processed and 0
// is always returned.
int32_t tqProcessTaskRecoverFinishRsp(STQ* pTq, SRpcMsg* pMsg) {
  (void)pTq;
  (void)pMsg;
  return 0;
}

L
Liu Jicong 已提交
1283 1284 1285 1286 1287 1288 1289 1290 1291 1292 1293 1294 1295 1296 1297 1298
// Forwards a delete result to every SOURCE level stream task as a delete block.
//
// The delete result is decoded from `pReq`. If it lists no table uids or
// affected no rows, nothing is sent. Otherwise one STREAM_DELETE_DATA block is
// built with a row per uid (start key, end key, uid; the remaining columns are
// set to NULL) and shared between the tasks through a reference counter: this
// function holds one reference and adds one per queued item.
//
// Returns 0 in all cases except when the reference counter cannot be
// allocated, where terrno is set and -1 is returned. Per-task failures are
// logged and do not change the return value.
int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) {
  bool        failed = false;
  SDecoder*   pCoder = &(SDecoder){0};
  SDeleteRes* pRes = &(SDeleteRes){0};

  pRes->uidList = taosArrayInit(0, sizeof(tb_uid_t));
  if (pRes->uidList == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    failed = true;
  }

  tDecoderInit(pCoder, pReq, len);
  tDecodeDeleteRes(pCoder, pRes);
  tDecoderClear(pCoder);

  int32_t sz = taosArrayGetSize(pRes->uidList);
  if (sz == 0 || pRes->affectedRows == 0) {
    taosArrayDestroy(pRes->uidList);
    return 0;
  }

  SSDataBlock* pDelBlock = createSpecialDataBlock(STREAM_DELETE_DATA);
  blockDataEnsureCapacity(pDelBlock, sz);
  pDelBlock->info.rows = sz;
  pDelBlock->info.version = ver;

  for (int32_t i = 0; i < sz; i++) {
    // start key column
    SColumnInfoData* pStartCol = taosArrayGet(pDelBlock->pDataBlock, START_TS_COLUMN_INDEX);
    colDataSetVal(pStartCol, i, (const char*)&pRes->skey, false);
    // end key column
    SColumnInfoData* pEndCol = taosArrayGet(pDelBlock->pDataBlock, END_TS_COLUMN_INDEX);
    colDataSetVal(pEndCol, i, (const char*)&pRes->ekey, false);
    // uid column
    SColumnInfoData* pUidCol = taosArrayGet(pDelBlock->pDataBlock, UID_COLUMN_INDEX);
    int64_t*         pUid = taosArrayGet(pRes->uidList, i);
    colDataSetVal(pUidCol, i, (const char*)pUid, false);

    colDataSetNULL(taosArrayGet(pDelBlock->pDataBlock, GROUPID_COLUMN_INDEX), i);
    colDataSetNULL(taosArrayGet(pDelBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX), i);
    colDataSetNULL(taosArrayGet(pDelBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX), i);
  }

  taosArrayDestroy(pRes->uidList);

  // Shared reference counter for pDelBlock; the initial 1 is this function's own reference.
  int32_t* pRef = taosMemoryMalloc(sizeof(int32_t));
  if (pRef == NULL) {
    blockDataDestroy(pDelBlock);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  *pRef = 1;

  void* pIter = NULL;
  while (1) {
    pIter = taosHashIterate(pTq->pStreamMeta->pTasks, pIter);
    if (pIter == NULL) break;
    SStreamTask* pTask = *(SStreamTask**)pIter;
    if (pTask->taskLevel != TASK_LEVEL__SOURCE) continue;

    qDebug("delete req enqueue stream task: %d, ver: %" PRId64, pTask->taskId, ver);

    if (!failed) {
      SStreamRefDataBlock* pRefBlock = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0);
      if (pRefBlock == NULL) {
        // Could not allocate the queue item: report an input failure to this
        // task, the same signal used for the global failure case below.
        qError("stream task input del failed since out of memory, task id %d", pTask->taskId);
        streamTaskInputFail(pTask);
        continue;
      }
      pRefBlock->type = STREAM_INPUT__REF_DATA_BLOCK;
      pRefBlock->pBlock = pDelBlock;
      pRefBlock->dataRef = pRef;
      atomic_add_fetch_32(pRefBlock->dataRef, 1);

      if (streamTaskInput(pTask, (SStreamQueueItem*)pRefBlock) < 0) {
        qError("stream task input del failed, task id %d", pTask->taskId);

        // The item never reached the task: give back its reference and free it.
        atomic_sub_fetch_32(pRef, 1);
        taosFreeQitem(pRefBlock);
        continue;
      }

      if (streamSchedExec(pTask) < 0) {
        qError("stream task launch failed, task id %d", pTask->taskId);
        continue;
      }

    } else {
      streamTaskInputFail(pTask);
    }
  }

  // Drop this function's own reference; the block is destroyed here only if no
  // queued item still refers to it.
  int32_t ref = atomic_sub_fetch_32(pRef, 1);
  if (ref == 0) {
    blockDataDestroy(pDelBlock);
    taosMemoryFree(pRef);
  }

  return 0;
}

L
Liu Jicong 已提交
1399
// Pushes a submitted data packet to every eligible SOURCE level stream task.
//
// A stream submit item is created from `submit` and offered to each source
// task that is not in RECOVER_PREPARE or WAIT_DOWNSTREAM status; each task that
// accepts it is then scheduled. If the item cannot be created, every eligible
// task is told the input failed instead.
//
// Returns 0 when the submit item was created, -1 (with terrno set) otherwise.
// Per-task input or scheduling failures are only logged.
int32_t tqProcessSubmitReq(STQ* pTq, SPackedData submit) {
  SStreamDataSubmit2* pSubmit = streamDataSubmitNew(submit);
  const bool          failed = (pSubmit == NULL);
  if (failed) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    tqError("failed to create data submit for stream since out of memory");
  }

  for (void* pIter = taosHashIterate(pTq->pStreamMeta->pTasks, NULL); pIter != NULL;
       pIter = taosHashIterate(pTq->pStreamMeta->pTasks, pIter)) {
    SStreamTask* pTask = *(SStreamTask**)pIter;

    if (pTask->taskLevel != TASK_LEVEL__SOURCE) {
      continue;
    }
    if (pTask->taskStatus == TASK_STATUS__RECOVER_PREPARE || pTask->taskStatus == TASK_STATUS__WAIT_DOWNSTREAM) {
      tqDebug("skip push task %d, task status %d", pTask->taskId, pTask->taskStatus);
      continue;
    }

    tqDebug("data submit enqueue stream task: %d, ver: %" PRId64, pTask->taskId, submit.ver);

    if (failed) {
      streamTaskInputFail(pTask);
      continue;
    }

    if (streamTaskInput(pTask, (SStreamQueueItem*)pSubmit) < 0) {
      tqError("stream task input failed, task id %d", pTask->taskId);
      continue;
    }

    if (streamSchedExec(pTask) < 0) {
      tqError("stream task launch failed, task id %d", pTask->taskId);
    }
  }

  // Give up the local reference and item once every task has been visited.
  if (!failed) {
    streamDataSubmitRefDec(pSubmit);
    taosFreeQitem(pSubmit);
  }

  return failed ? -1 : 0;
}

L
Liu Jicong 已提交
1449 1450 1451
// Runs the stream task named in a run request.
//
// pMsg->pCont is interpreted directly as an SStreamTaskRunReq. Returns 0 after
// the task has been processed and released, or -1 if the task is not found.
int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
  SStreamTaskRunReq* pRunReq = pMsg->pCont;
  int32_t            taskId = pRunReq->taskId;

  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);
  if (pTask == NULL) {
    return -1;
  }

  streamProcessRunReq(pTask);
  streamMetaReleaseTask(pTq->pStreamMeta, pTask);
  return 0;
}

L
Liu Jicong 已提交
1462
// Delivers a dispatch request from an upstream task to the addressed task.
//
// The request body (after the SMsgHead) is decoded and handed, together with a
// response skeleton carrying pMsg->info, to streamProcessDispatchReq(). `exec`
// is passed through unchanged.
//
// Returns 0 once the request has been handed over; -1 if it cannot be decoded
// or the task is not found.
int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec) {
  char*              msgStr = pMsg->pCont;
  char*              msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
  int32_t            msgLen = pMsg->contLen - sizeof(SMsgHead);
  SStreamDispatchReq req;
  SDecoder           decoder;
  tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
  if (tDecodeStreamDispatchReq(&decoder, &req) < 0) {
    tDecoderClear(&decoder);
    return -1;
  }
  // The decoder is released before the request is processed, matching vnodeEnqueueStreamMsg().
  tDecoderClear(&decoder);

  int32_t taskId = req.taskId;

  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);
  if (pTask == NULL) {
    return -1;
  }

  SRpcMsg rsp = {
      .info = pMsg->info,
      .code = 0,
  };
  streamProcessDispatchReq(pTask, &req, &rsp, exec);
  streamMetaReleaseTask(pTq->pStreamMeta, pTask);
  return 0;
}

L
Liu Jicong 已提交
1486 1487
// Handles the response to a dispatch request sent by a task on this vnode.
//
// The response sits right after the SMsgHead in pMsg->pCont; its
// upstreamTaskId is converted with ntohl() to find the task that sent the
// original request. Returns 0 when that task was found and notified, -1
// otherwise.
int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) {
  SStreamDispatchRsp* pDispatchRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t             upstreamTaskId = ntohl(pDispatchRsp->upstreamTaskId);

  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, upstreamTaskId);
  tqDebug("recv dispatch rsp, code: %x", pMsg->code);
  if (pTask == NULL) {
    return -1;
  }

  streamProcessDispatchRsp(pTask, pDispatchRsp, pMsg->code);
  streamMetaReleaseTask(pTq->pStreamMeta, pTask);
  return 0;
}
L
Liu Jicong 已提交
1499

1500
// Removes the stream task named in a drop request.
//
// `msg` is interpreted directly as an SVDropStreamTaskReq. Always returns 0;
// `sversion` and `msgLen` are not used.
int32_t tqProcessTaskDropReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  const SVDropStreamTaskReq* pDropReq = (const SVDropStreamTaskReq*)msg;

  streamMetaRemoveTask(pTq->pStreamMeta, pDropReq->taskId);
  return 0;
}
L
Liu Jicong 已提交
1505 1506 1507 1508 1509 1510 1511

// Delivers a retrieve request to the destination task on this vnode.
//
// The request body (after the SMsgHead) is decoded and handed, together with a
// response skeleton carrying pMsg->info, to streamProcessRetrieveReq(). The
// decoded request is released with tDeleteStreamRetrieveReq() whether or not
// the task was found.
//
// Returns 0 once the request has been handed over; -1 if it cannot be decoded
// or the task is not found.
int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg) {
  char*              msgStr = pMsg->pCont;
  char*              msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
  int32_t            msgLen = pMsg->contLen - sizeof(SMsgHead);
  SStreamRetrieveReq req;
  SDecoder           decoder;
  tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
  if (tDecodeStreamRetrieveReq(&decoder, &req) < 0) {
    // `req` is in an unknown state after a failed decode, so it is not released here.
    tDecoderClear(&decoder);
    return -1;
  }
  tDecoderClear(&decoder);

  int32_t      code = -1;
  int32_t      taskId = req.dstTaskId;
  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);
  if (pTask) {
    SRpcMsg rsp = {
        .info = pMsg->info,
        .code = 0,
    };
    streamProcessRetrieveReq(pTask, &req, &rsp);
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
    code = 0;
  }

  // Release the decoded request on the "task not found" path as well.
  tDeleteStreamRetrieveReq(&req);
  return code;
}

// Placeholder for the retrieve response: nothing is processed and 0 is always
// returned.
int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg) {
  (void)pTq;
  (void)pMsg;
  return 0;
}
L
Liu Jicong 已提交
1535

1536 1537 1538 1539 1540 1541
// Delivers a stream dispatch message to its target task without executing it.
//
// The dispatch request is decoded from the body after the SMsgHead. If the
// target task exists, the request is handed to streamProcessDispatchReq() with
// exec == false, the message content and queue item are freed, and 0 is
// returned.
//
// Otherwise (decode error or unknown task) an error response is sent back to
// the upstream task, provided pMsg->info.handle is set, and -1 is returned.
// When the handle is NULL, -1 is returned without sending or freeing anything.
int32_t vnodeEnqueueStreamMsg(SVnode* pVnode, SRpcMsg* pMsg) {
  STQ*      pTq = pVnode->pTq;
  SMsgHead* msgStr = pMsg->pCont;
  char*     msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
  int32_t   msgLen = pMsg->contLen - sizeof(SMsgHead);
  int32_t   code = 0;

  // Zero-initialized because the FAIL path reads fields of `req` even when
  // decoding did not succeed.
  SStreamDispatchReq req = {0};
  SDecoder           decoder;
  tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
  if (tDecodeStreamDispatchReq(&decoder, &req) < 0) {
    code = TSDB_CODE_MSG_DECODE_ERROR;
    tDecoderClear(&decoder);
    goto FAIL;
  }
  tDecoderClear(&decoder);

  int32_t taskId = req.taskId;

  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);
  if (pTask) {
    SRpcMsg rsp = {
        .info = pMsg->info,
        .code = 0,
    };
    streamProcessDispatchReq(pTask, &req, &rsp, false);
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
    rpcFreeCont(pMsg->pCont);
    taosFreeQitem(pMsg);
    return 0;
  }

  code = TSDB_CODE_STREAM_TASK_NOT_EXIST;

FAIL:
  if (pMsg->info.handle == NULL) return -1;

  SMsgHead* pRspHead = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamDispatchRsp));
  if (pRspHead == NULL) {
    // No room for a full response body: send a bare out-of-memory response.
    SRpcMsg rsp = {
        .code = TSDB_CODE_OUT_OF_MEMORY,
        .info = pMsg->info,
    };
    tqDebug("send dispatch error rsp, code: %x", rsp.code);
    tmsgSendRsp(&rsp);
    rpcFreeCont(pMsg->pCont);
    taosFreeQitem(pMsg);
    return -1;
  }

  // Response fields are written in network byte order.
  pRspHead->vgId = htonl(req.upstreamNodeId);
  SStreamDispatchRsp* pRsp = POINTER_SHIFT(pRspHead, sizeof(SMsgHead));
  pRsp->streamId = htobe64(req.streamId);
  pRsp->upstreamTaskId = htonl(req.upstreamTaskId);
  pRsp->upstreamNodeId = htonl(req.upstreamNodeId);
  pRsp->downstreamNodeId = htonl(pVnode->config.vgId);
  pRsp->downstreamTaskId = htonl(req.taskId);
  pRsp->inputStatus = TASK_INPUT_STATUS__NORMAL;

  SRpcMsg rsp = {
      .code = code,
      .info = pMsg->info,
      .contLen = sizeof(SMsgHead) + sizeof(SStreamDispatchRsp),
      .pCont = pRspHead,
  };
  tqDebug("send dispatch error rsp, code: %x", code);
  tmsgSendRsp(&rsp);
  rpcFreeCont(pMsg->pCont);
  taosFreeQitem(pMsg);
  return -1;
}
L
Liu Jicong 已提交
1607

1608
/**
 * Compare a version against the last WAL version recorded in the TQ.
 *
 * @param pTq       TQ instance providing walLogLastVer.
 * @param sversion  Version to test.
 * @return 1 if sversion <= pTq->walLogLastVer, otherwise 0.
 */
int32_t tqCheckLogInWal(STQ* pTq, int64_t sversion) {
  if (sversion <= pTq->walLogLastVer) {
    return 1;
  }
  return 0;
}