tq.c 33.5 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
S
Shengliang Guan 已提交
14 15
 */

H
Hongze Cheng 已提交
16
#include "tq.h"
S
Shengliang Guan 已提交
17

L
Liu Jicong 已提交
18
int32_t tqInit() {
L
Liu Jicong 已提交
19 20 21 22 23 24
  int8_t old;
  while (1) {
    old = atomic_val_compare_exchange_8(&tqMgmt.inited, 0, 2);
    if (old != 2) break;
  }

25 26 27 28 29 30
  if (old == 0) {
    tqMgmt.timer = taosTmrInit(10000, 100, 10000, "TQ");
    if (tqMgmt.timer == NULL) {
      atomic_store_8(&tqMgmt.inited, 0);
      return -1;
    }
31 32 33
    if (streamInit() < 0) {
      return -1;
    }
L
Liu Jicong 已提交
34
    atomic_store_8(&tqMgmt.inited, 1);
35
  }
36

L
Liu Jicong 已提交
37 38
  return 0;
}
L
Liu Jicong 已提交
39

40
void tqCleanUp() {
L
Liu Jicong 已提交
41 42 43 44 45 46 47 48
  int8_t old;
  while (1) {
    old = atomic_val_compare_exchange_8(&tqMgmt.inited, 1, 2);
    if (old != 2) break;
  }

  if (old == 1) {
    taosTmrCleanUp(tqMgmt.timer);
L
Liu Jicong 已提交
49
    streamCleanUp();
L
Liu Jicong 已提交
50 51
    atomic_store_8(&tqMgmt.inited, 0);
  }
52
}
L
Liu Jicong 已提交
53

L
Liu Jicong 已提交
54
STQ* tqOpen(const char* path, SVnode* pVnode) {
55
  STQ* pTq = taosMemoryCalloc(1, sizeof(STQ));
L
Liu Jicong 已提交
56
  if (pTq == NULL) {
L
Liu Jicong 已提交
57
    terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
L
Liu Jicong 已提交
58 59
    return NULL;
  }
H
Hongze Cheng 已提交
60
  pTq->path = strdup(path);
L
Liu Jicong 已提交
61
  pTq->pVnode = pVnode;
62

63
  pTq->pHandle = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);
L
Liu Jicong 已提交
64

65
  pTq->pPushMgr = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
L
Liu Jicong 已提交
66

67
  pTq->pCheckInfo = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);
L
Liu Jicong 已提交
68

L
Liu Jicong 已提交
69
  if (tqMetaOpen(pTq) < 0) {
70 71 72
    ASSERT(0);
  }

73 74 75 76
  if (tqOffsetOpen(pTq) < 0) {
    ASSERT(0);
  }

L
Liu Jicong 已提交
77 78 79 80 81
  pTq->pStreamMeta = streamMetaOpen(path, pTq, (FTaskExpand*)tqExpandTask);
  if (pTq->pStreamMeta == NULL) {
    ASSERT(0);
  }

L
Liu Jicong 已提交
82 83 84 85
  if (streamLoadTasks(pTq->pStreamMeta) < 0) {
    ASSERT(0);
  }

L
Liu Jicong 已提交
86 87
  return pTq;
}
L
Liu Jicong 已提交
88

L
Liu Jicong 已提交
89
void tqClose(STQ* pTq) {
H
Hongze Cheng 已提交
90
  if (pTq) {
91
    tqOffsetClose(pTq->pOffsetStore);
92 93 94
    taosHashCleanup(pTq->pHandle);
    taosHashCleanup(pTq->pPushMgr);
    taosHashCleanup(pTq->pCheckInfo);
95
    taosMemoryFree(pTq->path);
L
Liu Jicong 已提交
96
    tqMetaClose(pTq);
L
Liu Jicong 已提交
97
    streamMetaClose(pTq->pStreamMeta);
wafwerar's avatar
wafwerar 已提交
98
    taosMemoryFree(pTq);
H
Hongze Cheng 已提交
99
  }
L
Liu Jicong 已提交
100
}
L
Liu Jicong 已提交
101

L
Liu Jicong 已提交
102
int32_t tqSendMetaPollRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqMetaRsp* pRsp) {
103 104 105 106 107 108 109
  int32_t len = 0;
  int32_t code = 0;
  tEncodeSize(tEncodeSMqMetaRsp, pRsp, len, code);
  if (code < 0) {
    return -1;
  }
  int32_t tlen = sizeof(SMqRspHead) + len;
L
Liu Jicong 已提交
110 111 112 113 114 115 116 117 118 119
  void*   buf = rpcMallocCont(tlen);
  if (buf == NULL) {
    return -1;
  }

  ((SMqRspHead*)buf)->mqMsgType = TMQ_MSG_TYPE__POLL_META_RSP;
  ((SMqRspHead*)buf)->epoch = pReq->epoch;
  ((SMqRspHead*)buf)->consumerId = pReq->consumerId;

  void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
120 121 122 123 124

  SEncoder encoder = {0};
  tEncoderInit(&encoder, abuf, len);
  tEncodeSMqMetaRsp(&encoder, pRsp);
  tEncoderClear(&encoder);
L
Liu Jicong 已提交
125 126 127 128 129 130 131 132 133

  SRpcMsg resp = {
      .info = pMsg->info,
      .pCont = buf,
      .contLen = tlen,
      .code = 0,
  };
  tmsgSendRsp(&resp);

134 135
  tqDebug("vgId:%d, from consumer:%" PRId64 ", (epoch %d) send rsp, res msg type %d, offset type:%d",
          TD_VID(pTq->pVnode), pReq->consumerId, pReq->epoch, pRsp->resMsgType, pRsp->rspOffset.type);
L
Liu Jicong 已提交
136 137 138 139

  return 0;
}

L
Liu Jicong 已提交
140 141 142 143 144 145 146 147 148 149
int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp) {
  ASSERT(taosArrayGetSize(pRsp->blockData) == pRsp->blockNum);
  ASSERT(taosArrayGetSize(pRsp->blockDataLen) == pRsp->blockNum);

  if (pRsp->withSchema) {
    ASSERT(taosArrayGetSize(pRsp->blockSchema) == pRsp->blockNum);
  } else {
    ASSERT(taosArrayGetSize(pRsp->blockSchema) == 0);
  }

150 151 152 153 154 155
  if (pRsp->reqOffset.type == TMQ_OFFSET__LOG) {
    if (pRsp->blockNum > 0) {
      ASSERT(pRsp->rspOffset.version > pRsp->reqOffset.version);
    } else {
      ASSERT(pRsp->rspOffset.version >= pRsp->reqOffset.version);
    }
L
Liu Jicong 已提交
156 157
  }

wmmhello's avatar
wmmhello 已提交
158 159
  int32_t len = 0;
  int32_t code = 0;
L
Liu Jicong 已提交
160 161 162 163 164
  tEncodeSize(tEncodeSMqDataRsp, pRsp, len, code);
  if (code < 0) {
    return -1;
  }
  int32_t tlen = sizeof(SMqRspHead) + len;
L
Liu Jicong 已提交
165 166 167 168 169 170 171 172 173 174 175
  void*   buf = rpcMallocCont(tlen);
  if (buf == NULL) {
    return -1;
  }

  ((SMqRspHead*)buf)->mqMsgType = TMQ_MSG_TYPE__POLL_RSP;
  ((SMqRspHead*)buf)->epoch = pReq->epoch;
  ((SMqRspHead*)buf)->consumerId = pReq->consumerId;

  void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));

wmmhello's avatar
wmmhello 已提交
176
  SEncoder encoder = {0};
L
Liu Jicong 已提交
177
  tEncoderInit(&encoder, abuf, len);
L
Liu Jicong 已提交
178
  tEncodeSMqDataRsp(&encoder, pRsp);
wmmhello's avatar
wmmhello 已提交
179
  tEncoderClear(&encoder);
L
Liu Jicong 已提交
180 181

  SRpcMsg rsp = {
L
Liu Jicong 已提交
182 183 184 185 186
      .info = pMsg->info,
      .pCont = buf,
      .contLen = tlen,
      .code = 0,
  };
L
Liu Jicong 已提交
187
  tmsgSendRsp(&rsp);
L
Liu Jicong 已提交
188

wmmhello's avatar
wmmhello 已提交
189 190
  char buf1[80] = {0};
  char buf2[80] = {0};
L
Liu Jicong 已提交
191 192
  tFormatOffset(buf1, 80, &pRsp->reqOffset);
  tFormatOffset(buf2, 80, &pRsp->rspOffset);
S
Shengliang Guan 已提交
193
  tqDebug("vgId:%d, from consumer:%" PRId64 ", (epoch %d) send rsp, block num: %d, reqOffset:%s, rspOffset:%s",
L
Liu Jicong 已提交
194
          TD_VID(pTq->pVnode), pReq->consumerId, pReq->epoch, pRsp->blockNum, buf1, buf2);
L
Liu Jicong 已提交
195 196 197 198

  return 0;
}

199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258
int32_t tqSendTaosxRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const STaosxRsp* pRsp) {
  ASSERT(taosArrayGetSize(pRsp->blockData) == pRsp->blockNum);
  ASSERT(taosArrayGetSize(pRsp->blockDataLen) == pRsp->blockNum);

  if (pRsp->withSchema) {
    ASSERT(taosArrayGetSize(pRsp->blockSchema) == pRsp->blockNum);
  } else {
    ASSERT(taosArrayGetSize(pRsp->blockSchema) == 0);
  }

  if (pRsp->reqOffset.type == TMQ_OFFSET__LOG) {
    if (pRsp->blockNum > 0) {
      ASSERT(pRsp->rspOffset.version > pRsp->reqOffset.version);
    } else {
      ASSERT(pRsp->rspOffset.version >= pRsp->reqOffset.version);
    }
  }

  int32_t len = 0;
  int32_t code = 0;
  tEncodeSize(tEncodeSTaosxRsp, pRsp, len, code);
  if (code < 0) {
    return -1;
  }
  int32_t tlen = sizeof(SMqRspHead) + len;
  void*   buf = rpcMallocCont(tlen);
  if (buf == NULL) {
    return -1;
  }

  ((SMqRspHead*)buf)->mqMsgType = TMQ_MSG_TYPE__TAOSX_RSP;
  ((SMqRspHead*)buf)->epoch = pReq->epoch;
  ((SMqRspHead*)buf)->consumerId = pReq->consumerId;

  void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));

  SEncoder encoder = {0};
  tEncoderInit(&encoder, abuf, len);
  tEncodeSTaosxRsp(&encoder, pRsp);
  tEncoderClear(&encoder);

  SRpcMsg rsp = {
      .info = pMsg->info,
      .pCont = buf,
      .contLen = tlen,
      .code = 0,
  };
  tmsgSendRsp(&rsp);

  char buf1[80] = {0};
  char buf2[80] = {0};
  tFormatOffset(buf1, 80, &pRsp->reqOffset);
  tFormatOffset(buf2, 80, &pRsp->rspOffset);
  tqDebug("taosx rsp, vgId:%d, from consumer:%" PRId64
          ", (epoch %d) send rsp, block num: %d, reqOffset:%s, rspOffset:%s",
          TD_VID(pTq->pVnode), pReq->consumerId, pReq->epoch, pRsp->blockNum, buf1, buf2);

  return 0;
}

259 260 261 262 263 264
static FORCE_INLINE bool tqOffsetLessOrEqual(const STqOffset* pLeft, const STqOffset* pRight) {
  return pLeft->val.type == TMQ_OFFSET__LOG && pRight->val.type == TMQ_OFFSET__LOG &&
         pLeft->val.version <= pRight->val.version;
}

int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) {
265 266 267 268 269 270 271 272 273
  STqOffset offset = {0};
  SDecoder  decoder;
  tDecoderInit(&decoder, msg, msgLen);
  if (tDecodeSTqOffset(&decoder, &offset) < 0) {
    ASSERT(0);
    return -1;
  }
  tDecoderClear(&decoder);

wmmhello's avatar
wmmhello 已提交
274
  if (offset.val.type == TMQ_OFFSET__SNAPSHOT_DATA || offset.val.type == TMQ_OFFSET__SNAPSHOT_META) {
L
Liu Jicong 已提交
275 276
    tqDebug("receive offset commit msg to %s on vgId:%d, offset(type:snapshot) uid:%" PRId64 ", ts:%" PRId64,
            offset.subKey, TD_VID(pTq->pVnode), offset.val.uid, offset.val.ts);
L
Liu Jicong 已提交
277
  } else if (offset.val.type == TMQ_OFFSET__LOG) {
S
Shengliang Guan 已提交
278
    tqDebug("receive offset commit msg to %s on vgId:%d, offset(type:log) version:%" PRId64, offset.subKey,
L
Liu Jicong 已提交
279
            TD_VID(pTq->pVnode), offset.val.version);
280 281 282
    if (offset.val.version + 1 == version) {
      offset.val.version += 1;
    }
283 284 285
  } else {
    ASSERT(0);
  }
286 287 288 289 290
  STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, offset.subKey);
  if (pOffset != NULL && tqOffsetLessOrEqual(&offset, pOffset)) {
    return 0;
  }

291 292 293
  if (tqOffsetWrite(pTq->pOffsetStore, &offset) < 0) {
    ASSERT(0);
    return -1;
294
  }
295 296

  if (offset.val.type == TMQ_OFFSET__LOG) {
297
    STqHandle* pHandle = taosHashGet(pTq->pHandle, offset.subKey, strlen(offset.subKey));
L
Liu Jicong 已提交
298 299 300 301 302
    if (pHandle) {
      if (walRefVer(pHandle->pRef, offset.val.version) < 0) {
        ASSERT(0);
        return -1;
      }
303 304 305
    }
  }

306 307
  // rsp

308 309
  /*}*/
  /*}*/
310 311 312 313

  return 0;
}

L
Liu Jicong 已提交
314
int32_t tqCheckColModifiable(STQ* pTq, int64_t tbUid, int32_t colId) {
L
Liu Jicong 已提交
315 316
  void* pIter = NULL;
  while (1) {
317
    pIter = taosHashIterate(pTq->pCheckInfo, pIter);
L
Liu Jicong 已提交
318
    if (pIter == NULL) break;
319
    STqCheckInfo* pCheck = (STqCheckInfo*)pIter;
L
Liu Jicong 已提交
320 321
    if (pCheck->ntbUid == tbUid) {
      int32_t sz = taosArrayGetSize(pCheck->colIdList);
L
Liu Jicong 已提交
322
      for (int32_t i = 0; i < sz; i++) {
L
Liu Jicong 已提交
323 324
        int16_t forbidColId = *(int16_t*)taosArrayGet(pCheck->colIdList, i);
        if (forbidColId == colId) {
325
          taosHashCancelIterate(pTq->pCheckInfo, pIter);
L
Liu Jicong 已提交
326 327 328 329 330 331 332 333
          return -1;
        }
      }
    }
  }
  return 0;
}

L
Liu Jicong 已提交
334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365
static int32_t tqInitDataRsp(SMqDataRsp* pRsp, const SMqPollReq* pReq, int8_t subType) {
  pRsp->reqOffset = pReq->reqOffset;

  pRsp->blockData = taosArrayInit(0, sizeof(void*));
  pRsp->blockDataLen = taosArrayInit(0, sizeof(int32_t));

  if (pRsp->blockData == NULL || pRsp->blockDataLen == NULL) {
    return -1;
  }

  pRsp->withTbName = pReq->withTbName;
  if (pRsp->withTbName) {
    pRsp->blockTbName = taosArrayInit(0, sizeof(void*));
    if (pRsp->blockTbName == NULL) {
      // TODO free
      return -1;
    }
  }

  if (subType == TOPIC_SUB_TYPE__COLUMN) {
    pRsp->withSchema = false;
  } else {
    pRsp->withSchema = true;
    pRsp->blockSchema = taosArrayInit(0, sizeof(void*));
    if (pRsp->blockSchema == NULL) {
      // TODO free
      return -1;
    }
  }
  return 0;
}

366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381
static int32_t tqInitTaosxRsp(STaosxRsp* pRsp, const SMqPollReq* pReq) {
  pRsp->reqOffset = pReq->reqOffset;

  pRsp->withTbName = 1;
  pRsp->withSchema = 1;
  pRsp->blockData = taosArrayInit(0, sizeof(void*));
  pRsp->blockDataLen = taosArrayInit(0, sizeof(int32_t));
  pRsp->blockTbName = taosArrayInit(0, sizeof(void*));
  pRsp->blockSchema = taosArrayInit(0, sizeof(void*));

  if (pRsp->blockData == NULL || pRsp->blockDataLen == NULL || pRsp->blockTbName == NULL || pRsp->blockSchema == NULL) {
    return -1;
  }
  return 0;
}

L
Liu Jicong 已提交
382
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
L
Liu Jicong 已提交
383 384 385 386 387 388
  SMqPollReq*  pReq = pMsg->pCont;
  int64_t      consumerId = pReq->consumerId;
  int32_t      reqEpoch = pReq->epoch;
  int32_t      code = 0;
  STqOffsetVal reqOffset = pReq->reqOffset;
  STqOffsetVal fetchOffsetNew;
wmmhello's avatar
wmmhello 已提交
389
  SWalCkHead*  pCkHead = NULL;
L
Liu Jicong 已提交
390 391

  // 1.find handle
392
  STqHandle* pHandle = taosHashGet(pTq->pHandle, pReq->subKey, strlen(pReq->subKey));
L
Liu Jicong 已提交
393 394
  /*ASSERT(pHandle);*/
  if (pHandle == NULL) {
S
Shengliang Guan 已提交
395 396
    tqError("tmq poll: no consumer handle for consumer:%" PRId64 ", in vgId:%d, subkey %s", consumerId,
            TD_VID(pTq->pVnode), pReq->subKey);
L
Liu Jicong 已提交
397 398 399 400 401
    return -1;
  }

  // check rebalance
  if (pHandle->consumerId != consumerId) {
S
Shengliang Guan 已提交
402 403
    tqError("tmq poll: consumer handle mismatch for consumer:%" PRId64
            ", in vgId:%d, subkey %s, handle consumer id %" PRId64,
L
Liu Jicong 已提交
404
            consumerId, TD_VID(pTq->pVnode), pReq->subKey, pHandle->consumerId);
L
Liu Jicong 已提交
405
    terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
L
Liu Jicong 已提交
406 407 408 409 410 411 412 413 414
    return -1;
  }

  // update epoch if need
  int32_t consumerEpoch = atomic_load_32(&pHandle->epoch);
  while (consumerEpoch < reqEpoch) {
    consumerEpoch = atomic_val_compare_exchange_32(&pHandle->epoch, consumerEpoch, reqEpoch);
  }

L
Liu Jicong 已提交
415 416
  char buf[80];
  tFormatOffset(buf, 80, &reqOffset);
S
Shengliang Guan 已提交
417
  tqDebug("tmq poll: consumer %" PRId64 " (epoch %d), subkey %s, recv poll req in vg %d, req offset %s", consumerId,
L
Liu Jicong 已提交
418 419
          pReq->epoch, pHandle->subKey, TD_VID(pTq->pVnode), buf);

L
Liu Jicong 已提交
420 421 422 423 424 425 426
  // 2.reset offset if needed
  if (reqOffset.type > 0) {
    fetchOffsetNew = reqOffset;
  } else {
    STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, pReq->subKey);
    if (pOffset != NULL) {
      fetchOffsetNew = pOffset->val;
L
Liu Jicong 已提交
427 428
      char formatBuf[80];
      tFormatOffset(formatBuf, 80, &fetchOffsetNew);
L
Liu Jicong 已提交
429 430
      tqDebug("tmq poll: consumer %" PRId64 ", subkey %s, vg %d, offset reset to %s", consumerId, pHandle->subKey,
              TD_VID(pTq->pVnode), formatBuf);
L
Liu Jicong 已提交
431 432
    } else {
      if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEAST) {
L
Liu Jicong 已提交
433 434
        if (pReq->useSnapshot) {
          if (pHandle->fetchMeta) {
wmmhello's avatar
wmmhello 已提交
435
            tqOffsetResetToMeta(&fetchOffsetNew, 0);
L
Liu Jicong 已提交
436
          } else {
437
            tqOffsetResetToData(&fetchOffsetNew, 0, 0);
L
Liu Jicong 已提交
438 439 440 441 442
          }
        } else {
          tqOffsetResetToLog(&fetchOffsetNew, walGetFirstVer(pTq->pVnode->pWal));
        }
      } else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) {
443 444 445
        SMqDataRsp dataRsp = {0};
        tqInitDataRsp(&dataRsp, pReq, pHandle->execHandle.subType);

L
Liu Jicong 已提交
446
        tqOffsetResetToLog(&dataRsp.rspOffset, walGetLastVer(pTq->pVnode->pWal));
S
Shengliang Guan 已提交
447 448
        tqDebug("tmq poll: consumer %" PRId64 ", subkey %s, vg %d, offset reset to %" PRId64, consumerId,
                pHandle->subKey, TD_VID(pTq->pVnode), dataRsp.rspOffset.version);
L
Liu Jicong 已提交
449 450 451
        if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) {
          code = -1;
        }
L
Liu Jicong 已提交
452 453
        tDeleteSMqDataRsp(&dataRsp);
        return code;
L
Liu Jicong 已提交
454
      } else if (reqOffset.type == TMQ_OFFSET__RESET_NONE) {
455 456
        tqError("tmq poll: subkey %s, no offset committed for consumer %" PRId64
                " in vg %d, subkey %s, reset none failed",
L
Liu Jicong 已提交
457
                pHandle->subKey, consumerId, TD_VID(pTq->pVnode), pReq->subKey);
L
Liu Jicong 已提交
458
        terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET;
459
        return -1;
L
Liu Jicong 已提交
460 461 462 463
      }
    }
  }

464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490
  if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
    SMqDataRsp dataRsp = {0};
    tqInitDataRsp(&dataRsp, pReq, pHandle->execHandle.subType);
    tqScanData(pTq, pHandle, &dataRsp, &fetchOffsetNew);

    if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) {
      code = -1;
    }

    tqDebug("tmq poll: consumer %ld, subkey %s, vg %d, send data blockNum:%d, offset type:%d, uid:%ld, version:%ld",
            consumerId, pHandle->subKey, TD_VID(pTq->pVnode), dataRsp.blockNum, dataRsp.rspOffset.type,
            dataRsp.rspOffset.uid, dataRsp.rspOffset.version);

    tDeleteSMqDataRsp(&dataRsp);
    return code;
  }

  // for taosx
  ASSERT(pHandle->execHandle.subType != TOPIC_SUB_TYPE__COLUMN);

  SMqMetaRsp metaRsp = {0};

  STaosxRsp taosxRsp = {0};
  tqInitTaosxRsp(&taosxRsp, pReq);

  if (fetchOffsetNew.type != TMQ_OFFSET__LOG) {
    tqScan(pTq, pHandle, &taosxRsp, &metaRsp, &fetchOffsetNew);
L
Liu Jicong 已提交
491

L
Liu Jicong 已提交
492
    if (metaRsp.metaRspLen > 0) {
wmmhello's avatar
wmmhello 已提交
493 494 495
      if (tqSendMetaPollRsp(pTq, pMsg, pReq, &metaRsp) < 0) {
        code = -1;
      }
L
Liu Jicong 已提交
496 497 498
      tqDebug("tmq poll: consumer %ld, subkey %s, vg %d, send meta offset type:%d,uid:%ld,version:%ld", consumerId,
              pHandle->subKey, TD_VID(pTq->pVnode), metaRsp.rspOffset.type, metaRsp.rspOffset.uid,
              metaRsp.rspOffset.version);
wmmhello's avatar
wmmhello 已提交
499
      taosMemoryFree(metaRsp.metaRsp);
500 501
      tDeleteSTaosxRsp(&taosxRsp);
      return code;
wmmhello's avatar
wmmhello 已提交
502 503
    }

504 505
    if (taosxRsp.blockNum > 0) {
      if (tqSendTaosxRsp(pTq, pMsg, pReq, &taosxRsp) < 0) {
wmmhello's avatar
wmmhello 已提交
506 507
        code = -1;
      }
508 509
      tDeleteSTaosxRsp(&taosxRsp);
      return code;
L
Liu Jicong 已提交
510
    } else {
511
      fetchOffsetNew = taosxRsp.rspOffset;
512
    }
wmmhello's avatar
wmmhello 已提交
513

514 515 516
    tqDebug("taosx poll: consumer %ld, subkey %s, vg %d, send data blockNum:%d, offset type:%d,uid:%ld,version:%ld",
            consumerId, pHandle->subKey, TD_VID(pTq->pVnode), taosxRsp.blockNum, taosxRsp.rspOffset.type,
            taosxRsp.rspOffset.uid, taosxRsp.rspOffset.version);
517 518
  }

519
  if (fetchOffsetNew.type == TMQ_OFFSET__LOG) {
wmmhello's avatar
wmmhello 已提交
520 521 522
    int64_t fetchVer = fetchOffsetNew.version + 1;
    pCkHead = taosMemoryMalloc(sizeof(SWalCkHead) + 2048);
    if (pCkHead == NULL) {
523
      return -1;
524 525
    }

wmmhello's avatar
wmmhello 已提交
526 527 528 529 530 531
    walSetReaderCapacity(pHandle->pWalReader, 2048);

    while (1) {
      consumerEpoch = atomic_load_32(&pHandle->epoch);
      if (consumerEpoch > reqEpoch) {
        tqWarn("tmq poll: consumer %" PRId64 " (epoch %d), subkey %s, vg %d offset %" PRId64
L
Liu Jicong 已提交
532
               ", found new consumer epoch %d, discard req epoch %d",
wmmhello's avatar
wmmhello 已提交
533 534 535 536 537
               consumerId, pReq->epoch, pHandle->subKey, TD_VID(pTq->pVnode), fetchVer, consumerEpoch, reqEpoch);
        break;
      }

      if (tqFetchLog(pTq, pHandle, &fetchVer, &pCkHead) < 0) {
538 539
        tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
        if (tqSendTaosxRsp(pTq, pMsg, pReq, &taosxRsp) < 0) {
wmmhello's avatar
wmmhello 已提交
540 541
          code = -1;
        }
542 543 544
        tDeleteSTaosxRsp(&taosxRsp);
        if (pCkHead) taosMemoryFree(pCkHead);
        return code;
wmmhello's avatar
wmmhello 已提交
545 546 547
      }

      SWalCont* pHead = &pCkHead->head;
wmmhello's avatar
wmmhello 已提交
548

wmmhello's avatar
wmmhello 已提交
549 550 551 552 553 554
      tqDebug("tmq poll: consumer:%" PRId64 ", (epoch %d) iter log, vgId:%d offset %" PRId64 " msgType %d", consumerId,
              pReq->epoch, TD_VID(pTq->pVnode), fetchVer, pHead->msgType);

      if (pHead->msgType == TDMT_VND_SUBMIT) {
        SSubmitReq* pCont = (SSubmitReq*)&pHead->body;

555
        if (tqTaosxScanLog(pTq, pHandle, pCont, &taosxRsp) < 0) {
wmmhello's avatar
wmmhello 已提交
556 557 558 559
          /*ASSERT(0);*/
        }
        // TODO batch optimization:
        // TODO continue scan until meeting batch requirement
560 561 562
        if (taosxRsp.blockNum > 0 /* threshold */) {
          tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
          if (tqSendTaosxRsp(pTq, pMsg, pReq, &taosxRsp) < 0) {
wmmhello's avatar
wmmhello 已提交
563 564
            code = -1;
          }
565 566 567
          tDeleteSTaosxRsp(&taosxRsp);
          if (pCkHead) taosMemoryFree(pCkHead);
          return code;
wmmhello's avatar
wmmhello 已提交
568 569 570 571 572 573 574 575 576 577 578 579 580 581
        } else {
          fetchVer++;
        }

      } else {
        ASSERT(pHandle->fetchMeta);
        ASSERT(IS_META_MSG(pHead->msgType));
        tqDebug("fetch meta msg, ver:%" PRId64 ", type:%d", pHead->version, pHead->msgType);
        tqOffsetResetToLog(&metaRsp.rspOffset, fetchVer);
        metaRsp.resMsgType = pHead->msgType;
        metaRsp.metaRspLen = pHead->bodyLen;
        metaRsp.metaRsp = pHead->body;
        if (tqSendMetaPollRsp(pTq, pMsg, pReq, &metaRsp) < 0) {
          code = -1;
582 583
          taosMemoryFree(pCkHead);
          return code;
wmmhello's avatar
wmmhello 已提交
584 585
        }
        code = 0;
586 587
        if (pCkHead) taosMemoryFree(pCkHead);
        return code;
wmmhello's avatar
wmmhello 已提交
588 589 590
      }
    }
  }
591
  return 0;
L
Liu Jicong 已提交
592 593
}

594
int32_t tqProcessVgDeleteReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) {
L
Liu Jicong 已提交
595
  SMqVDeleteReq* pReq = (SMqVDeleteReq*)msg;
L
Liu Jicong 已提交
596

597
  int32_t code = taosHashRemove(pTq->pHandle, pReq->subKey, strlen(pReq->subKey));
L
Liu Jicong 已提交
598
  ASSERT(code == 0);
599

L
Liu Jicong 已提交
600 601
  tqOffsetDelete(pTq->pOffsetStore, pReq->subKey);

L
Liu Jicong 已提交
602
  if (tqMetaDeleteHandle(pTq, pReq->subKey) < 0) {
603 604
    ASSERT(0);
  }
L
Liu Jicong 已提交
605
  return 0;
L
Liu Jicong 已提交
606 607
}

608 609 610
int32_t tqProcessAddCheckInfoReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) {
  STqCheckInfo info = {0};
  SDecoder     decoder;
L
Liu Jicong 已提交
611
  tDecoderInit(&decoder, msg, msgLen);
612
  if (tDecodeSTqCheckInfo(&decoder, &info) < 0) {
L
Liu Jicong 已提交
613 614 615 616
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  tDecoderClear(&decoder);
617 618 619 620 621
  if (taosHashPut(pTq->pCheckInfo, info.topic, strlen(info.topic), &info, sizeof(STqCheckInfo)) < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  if (tqMetaSaveCheckInfo(pTq, info.topic, msg, msgLen) < 0) {
L
Liu Jicong 已提交
622 623 624 625 626 627
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  return 0;
}

628 629 630 631 632 633 634 635 636 637 638 639 640
int32_t tqProcessDelCheckInfoReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) {
  if (taosHashRemove(pTq->pCheckInfo, msg, strlen(msg)) < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  if (tqMetaDeleteCheckInfo(pTq, msg) < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  return 0;
}

int32_t tqProcessVgChangeReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) {
L
Liu Jicong 已提交
641
  SMqRebVgReq req = {0};
L
Liu Jicong 已提交
642 643
  tDecodeSMqRebVgReq(msg, &req);
  // todo lock
644
  STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
L
Liu Jicong 已提交
645
  if (pHandle == NULL) {
L
Liu Jicong 已提交
646 647 648 649
    if (req.oldConsumerId != -1) {
      tqError("vgId:%d, build new consumer handle %s for consumer %d, but old consumerId is %ld", req.vgId, req.subKey,
              req.newConsumerId, req.oldConsumerId);
    }
L
Liu Jicong 已提交
650
    ASSERT(req.newConsumerId != -1);
L
Liu Jicong 已提交
651 652
    STqHandle tqHandle = {0};
    pHandle = &tqHandle;
L
Liu Jicong 已提交
653 654
    /*taosInitRWLatch(&pExec->lock);*/

L
Liu Jicong 已提交
655 656 657
    memcpy(pHandle->subKey, req.subKey, TSDB_SUBSCRIBE_KEY_LEN);
    pHandle->consumerId = req.newConsumerId;
    pHandle->epoch = -1;
L
Liu Jicong 已提交
658

L
Liu Jicong 已提交
659
    pHandle->execHandle.subType = req.subType;
L
Liu Jicong 已提交
660
    pHandle->fetchMeta = req.withMeta;
wmmhello's avatar
wmmhello 已提交
661

662 663 664 665
    // TODO version should be assigned and refed during preprocess
    SWalRef* pRef = walRefCommittedVer(pTq->pVnode->pWal);
    if (pRef == NULL) {
      ASSERT(0);
L
Liu Jicong 已提交
666
      return -1;
667 668 669
    }
    int64_t ver = pRef->refVer;
    pHandle->pRef = pRef;
L
Liu Jicong 已提交
670

671 672 673 674 675 676 677
    SReadHandle handle = {
        .meta = pTq->pVnode->pMeta,
        .vnode = pTq->pVnode,
        .initTableReader = true,
        .initTqReader = true,
        .version = ver,
    };
wmmhello's avatar
wmmhello 已提交
678
    pHandle->snapshotVer = ver;
679

L
Liu Jicong 已提交
680
    if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
L
Liu Jicong 已提交
681
      pHandle->execHandle.execCol.qmsg = req.qmsg;
L
Liu Jicong 已提交
682
      req.qmsg = NULL;
683 684

      pHandle->execHandle.task =
wmmhello's avatar
wmmhello 已提交
685
          qCreateQueueExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle, &pHandle->execHandle.numOfCols,
L
Liu Jicong 已提交
686
                                   &pHandle->execHandle.pSchemaWrapper);
687
      ASSERT(pHandle->execHandle.task);
L
Liu Jicong 已提交
688
      void* scanner = NULL;
689
      qExtractStreamScanner(pHandle->execHandle.task, &scanner);
L
Liu Jicong 已提交
690 691 692
      ASSERT(scanner);
      pHandle->execHandle.pExecReader = qExtractReaderFromStreamScanner(scanner);
      ASSERT(pHandle->execHandle.pExecReader);
L
Liu Jicong 已提交
693
    } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__DB) {
wmmhello's avatar
wmmhello 已提交
694
      pHandle->pWalReader = walOpenReader(pTq->pVnode->pWal, NULL);
L
Liu Jicong 已提交
695
      pHandle->execHandle.pExecReader = tqOpenReader(pTq->pVnode);
L
Liu Jicong 已提交
696
      pHandle->execHandle.execDb.pFilterOutTbUid =
L
Liu Jicong 已提交
697
          taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
698 699
      buildSnapContext(handle.meta, handle.version, 0, pHandle->execHandle.subType, pHandle->fetchMeta,
                       (SSnapContext**)(&handle.sContext));
700

701
      pHandle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &handle, NULL, NULL);
L
Liu Jicong 已提交
702
    } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
wmmhello's avatar
wmmhello 已提交
703 704 705 706
      pHandle->pWalReader = walOpenReader(pTq->pVnode->pWal, NULL);

      pHandle->execHandle.execTb.suid = req.suid;

L
Liu Jicong 已提交
707
      SArray* tbUidList = taosArrayInit(0, sizeof(int64_t));
H
Hongze Cheng 已提交
708
      vnodeGetCtbIdList(pTq->pVnode, req.suid, tbUidList);
709
      tqDebug("vgId:%d, tq try to get all ctb, suid:%" PRId64, pTq->pVnode->config.vgId, req.suid);
L
Liu Jicong 已提交
710 711
      for (int32_t i = 0; i < taosArrayGetSize(tbUidList); i++) {
        int64_t tbUid = *(int64_t*)taosArrayGet(tbUidList, i);
S
Shengliang Guan 已提交
712
        tqDebug("vgId:%d, idx %d, uid:%" PRId64, TD_VID(pTq->pVnode), i, tbUid);
L
Liu Jicong 已提交
713
      }
L
Liu Jicong 已提交
714 715
      pHandle->execHandle.pExecReader = tqOpenReader(pTq->pVnode);
      tqReaderSetTbUidList(pHandle->execHandle.pExecReader, tbUidList);
L
Liu Jicong 已提交
716
      taosArrayDestroy(tbUidList);
wmmhello's avatar
wmmhello 已提交
717

L
Liu Jicong 已提交
718 719 720
      buildSnapContext(handle.meta, handle.version, req.suid, pHandle->execHandle.subType, pHandle->fetchMeta,
                       (SSnapContext**)(&handle.sContext));
      pHandle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &handle, NULL, NULL);
L
Liu Jicong 已提交
721
    }
722
    taosHashPut(pTq->pHandle, req.subKey, strlen(req.subKey), pHandle, sizeof(STqHandle));
S
Shengliang Guan 已提交
723
    tqDebug("try to persist handle %s consumer %" PRId64, req.subKey, pHandle->consumerId);
L
Liu Jicong 已提交
724 725
    if (tqMetaSaveHandle(pTq, req.subKey, pHandle) < 0) {
      // TODO
P
plum-lihui 已提交
726
      ASSERT(0);
L
Liu Jicong 已提交
727
    }
L
Liu Jicong 已提交
728
  } else {
L
Liu Jicong 已提交
729
    /*ASSERT(pExec->consumerId == req.oldConsumerId);*/
L
Liu Jicong 已提交
730
    // TODO handle qmsg and exec modification
L
Liu Jicong 已提交
731 732 733
    atomic_store_32(&pHandle->epoch, -1);
    atomic_store_64(&pHandle->consumerId, req.newConsumerId);
    atomic_add_fetch_32(&pHandle->epoch, 1);
L
Liu Jicong 已提交
734 735 736
    if (tqMetaSaveHandle(pTq, req.subKey, pHandle) < 0) {
      // TODO
    }
L
Liu Jicong 已提交
737
  }
L
Liu Jicong 已提交
738

L
Liu Jicong 已提交
739
  return 0;
L
Liu Jicong 已提交
740
}
741

L
Liu Jicong 已提交
742
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask) {
743
  if (pTask->taskLevel == TASK_LEVEL__AGG) {
L
Liu Jicong 已提交
744 745
    ASSERT(taosArrayGetSize(pTask->childEpInfo) != 0);
  }
L
Liu Jicong 已提交
746

L
Liu Jicong 已提交
747
  pTask->schedStatus = TASK_SCHED_STATUS__INACTIVE;
L
Liu Jicong 已提交
748 749 750

  pTask->inputQueue = streamQueueOpen();
  pTask->outputQueue = streamQueueOpen();
L
Liu Jicong 已提交
751 752

  if (pTask->inputQueue == NULL || pTask->outputQueue == NULL) {
L
Liu Jicong 已提交
753
    return -1;
L
Liu Jicong 已提交
754 755
  }

L
Liu Jicong 已提交
756 757 758
  pTask->inputStatus = TASK_INPUT_STATUS__NORMAL;
  pTask->outputStatus = TASK_OUTPUT_STATUS__NORMAL;

759 760
  pTask->pMsgCb = &pTq->pVnode->msgCb;

761 762
  // expand executor
  if (pTask->taskLevel == TASK_LEVEL__SOURCE) {
763 764 765 766 767
    pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask);
    if (pTask->pState == NULL) {
      return -1;
    }

768 769 770 771
    SReadHandle handle = {
        .meta = pTq->pVnode->pMeta,
        .vnode = pTq->pVnode,
        .initTqReader = 1,
772
        .pStateBackend = pTask->pState,
773 774 775 776
    };
    pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle);
    ASSERT(pTask->exec.executor);
  } else if (pTask->taskLevel == TASK_LEVEL__AGG) {
777 778 779 780
    pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask);
    if (pTask->pState == NULL) {
      return -1;
    }
781 782 783
    SReadHandle mgHandle = {
        .vnode = NULL,
        .numOfVgroups = (int32_t)taosArrayGetSize(pTask->childEpInfo),
784
        .pStateBackend = pTask->pState,
785 786
    };
    pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &mgHandle);
L
Liu Jicong 已提交
787
    ASSERT(pTask->exec.executor);
L
Liu Jicong 已提交
788
  }
L
Liu Jicong 已提交
789 790

  // sink
L
Liu Jicong 已提交
791
  /*pTask->ahandle = pTq->pVnode;*/
792
  if (pTask->outputType == TASK_OUTPUT__SMA) {
L
Liu Jicong 已提交
793
    pTask->smaSink.vnode = pTq->pVnode;
L
Liu Jicong 已提交
794
    pTask->smaSink.smaSink = smaHandleRes;
795
  } else if (pTask->outputType == TASK_OUTPUT__TABLE) {
L
Liu Jicong 已提交
796 797 798
    pTask->tbSink.vnode = pTq->pVnode;
    pTask->tbSink.tbSinkFunc = tqTableSink;

L
Liu Jicong 已提交
799 800
    ASSERT(pTask->tbSink.pSchemaWrapper);
    ASSERT(pTask->tbSink.pSchemaWrapper->pSchema);
L
Liu Jicong 已提交
801

L
Liu Jicong 已提交
802
    pTask->tbSink.pTSchema =
C
Cary Xu 已提交
803
        tdGetSTSChemaFromSSChema(pTask->tbSink.pSchemaWrapper->pSchema, pTask->tbSink.pSchemaWrapper->nCols, 1);
L
Liu Jicong 已提交
804
    ASSERT(pTask->tbSink.pTSchema);
L
Liu Jicong 已提交
805
  }
806 807 808

  streamSetupTrigger(pTask);

L
Liu Jicong 已提交
809
  tqInfo("expand stream task on vg %d, task id %d, child id %d", TD_VID(pTq->pVnode), pTask->taskId,
810
         pTask->selfChildId);
L
Liu Jicong 已提交
811
  return 0;
L
Liu Jicong 已提交
812
}
L
Liu Jicong 已提交
813

814
int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) {
L
Liu Jicong 已提交
815
  //
816
  return streamMetaAddSerializedTask(pTq->pStreamMeta, version, msg, msgLen);
L
Liu Jicong 已提交
817
}
L
Liu Jicong 已提交
818

L
Liu Jicong 已提交
819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859
int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) {
  bool        failed = false;
  SDecoder*   pCoder = &(SDecoder){0};
  SDeleteRes* pRes = &(SDeleteRes){0};

  pRes->uidList = taosArrayInit(0, sizeof(tb_uid_t));
  if (pRes->uidList == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    failed = true;
  }

  tDecoderInit(pCoder, pReq, len);
  tDecodeDeleteRes(pCoder, pRes);
  tDecoderClear(pCoder);

  int32_t sz = taosArrayGetSize(pRes->uidList);
  if (sz == 0) {
    taosArrayDestroy(pRes->uidList);
    return 0;
  }
  SSDataBlock* pDelBlock = createSpecialDataBlock(STREAM_DELETE_DATA);
  blockDataEnsureCapacity(pDelBlock, sz);
  pDelBlock->info.rows = sz;
  pDelBlock->info.version = ver;

  for (int32_t i = 0; i < sz; i++) {
    // start key column
    SColumnInfoData* pStartCol = taosArrayGet(pDelBlock->pDataBlock, START_TS_COLUMN_INDEX);
    colDataAppend(pStartCol, i, (const char*)&pRes->skey, false);  // end key column
    SColumnInfoData* pEndCol = taosArrayGet(pDelBlock->pDataBlock, END_TS_COLUMN_INDEX);
    colDataAppend(pEndCol, i, (const char*)&pRes->ekey, false);
    // uid column
    SColumnInfoData* pUidCol = taosArrayGet(pDelBlock->pDataBlock, UID_COLUMN_INDEX);
    int64_t*         pUid = taosArrayGet(pRes->uidList, i);
    colDataAppend(pUidCol, i, (const char*)pUid, false);

    colDataAppendNULL(taosArrayGet(pDelBlock->pDataBlock, GROUPID_COLUMN_INDEX), i);
    colDataAppendNULL(taosArrayGet(pDelBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX), i);
    colDataAppendNULL(taosArrayGet(pDelBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX), i);
  }

L
Liu Jicong 已提交
860 861
  taosArrayDestroy(pRes->uidList);

L
Liu Jicong 已提交
862 863 864 865 866 867 868 869 870 871 872 873 874 875 876 877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892
  void* pIter = NULL;
  while (1) {
    pIter = taosHashIterate(pTq->pStreamMeta->pTasks, pIter);
    if (pIter == NULL) break;
    SStreamTask* pTask = *(SStreamTask**)pIter;
    if (pTask->taskLevel != TASK_LEVEL__SOURCE) continue;

    qDebug("delete req enqueue stream task: %d, ver: %" PRId64, pTask->taskId, ver);

    SStreamDataBlock* pStreamBlock = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM);
    pStreamBlock->type = STREAM_INPUT__DATA_BLOCK;
    pStreamBlock->blocks = taosArrayInit(0, sizeof(SSDataBlock));
    SSDataBlock block = {0};
    assignOneDataBlock(&block, pDelBlock);
    block.info.type = STREAM_DELETE_DATA;
    taosArrayPush(pStreamBlock->blocks, &block);

    if (!failed) {
      if (streamTaskInput(pTask, (SStreamQueueItem*)pStreamBlock) < 0) {
        qError("stream task input del failed, task id %d", pTask->taskId);
        continue;
      }

      if (streamSchedExec(pTask) < 0) {
        qError("stream task launch failed, task id %d", pTask->taskId);
        continue;
      }
    } else {
      streamTaskInputFail(pTask);
    }
  }
L
Liu Jicong 已提交
893
  blockDataDestroy(pDelBlock);
L
Liu Jicong 已提交
894 895 896 897 898

  return 0;
}

int32_t tqProcessSubmitReq(STQ* pTq, SSubmitReq* pReq, int64_t ver) {
L
Liu Jicong 已提交
899 900 901
  void*              pIter = NULL;
  bool               failed = false;
  SStreamDataSubmit* pSubmit = NULL;
L
Liu Jicong 已提交
902

L
Liu Jicong 已提交
903
  pSubmit = streamDataSubmitNew(pReq);
L
Liu Jicong 已提交
904
  if (pSubmit == NULL) {
L
Liu Jicong 已提交
905 906
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    qError("failed to create data submit for stream since out of memory");
L
Liu Jicong 已提交
907 908 909 910
    failed = true;
  }

  while (1) {
L
Liu Jicong 已提交
911
    pIter = taosHashIterate(pTq->pStreamMeta->pTasks, pIter);
L
Liu Jicong 已提交
912
    if (pIter == NULL) break;
913
    SStreamTask* pTask = *(SStreamTask**)pIter;
914
    if (pTask->taskLevel != TASK_LEVEL__SOURCE) continue;
L
Liu Jicong 已提交
915

S
Shengliang Guan 已提交
916
    qDebug("data submit enqueue stream task: %d, ver: %" PRId64, pTask->taskId, ver);
L
Liu Jicong 已提交
917

L
Liu Jicong 已提交
918 919
    if (!failed) {
      if (streamTaskInput(pTask, (SStreamQueueItem*)pSubmit) < 0) {
L
Liu Jicong 已提交
920
        qError("stream task input failed, task id %d", pTask->taskId);
L
Liu Jicong 已提交
921 922 923
        continue;
      }

L
Liu Jicong 已提交
924
      if (streamSchedExec(pTask) < 0) {
L
Liu Jicong 已提交
925
        qError("stream task launch failed, task id %d", pTask->taskId);
L
Liu Jicong 已提交
926 927
        continue;
      }
L
Liu Jicong 已提交
928
    } else {
L
Liu Jicong 已提交
929
      streamTaskInputFail(pTask);
L
Liu Jicong 已提交
930 931 932
    }
  }

L
Liu Jicong 已提交
933
  if (pSubmit) {
L
Liu Jicong 已提交
934
    streamDataSubmitRefDec(pSubmit);
L
Liu Jicong 已提交
935
    taosFreeQitem(pSubmit);
L
Liu Jicong 已提交
936
  }
L
Liu Jicong 已提交
937 938

  return failed ? -1 : 0;
L
Liu Jicong 已提交
939 940
}

L
Liu Jicong 已提交
941
int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
L
Liu Jicong 已提交
942
  //
L
Liu Jicong 已提交
943 944
  SStreamTaskRunReq* pReq = pMsg->pCont;
  int32_t            taskId = pReq->taskId;
L
Liu Jicong 已提交
945 946 947
  SStreamTask*       pTask = streamMetaGetTask(pTq->pStreamMeta, taskId);
  if (pTask) {
    streamProcessRunReq(pTask);
L
Liu Jicong 已提交
948
    return 0;
949 950
  } else {
    return -1;
L
Liu Jicong 已提交
951
  }
L
Liu Jicong 已提交
952 953
}

L
Liu Jicong 已提交
954
int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec) {
L
Liu Jicong 已提交
955
  ASSERT(0);
956 957 958 959 960
  char*              msgStr = pMsg->pCont;
  char*              msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
  int32_t            msgLen = pMsg->contLen - sizeof(SMsgHead);
  SStreamDispatchReq req;
  SDecoder           decoder;
L
Liu Jicong 已提交
961
  tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
962
  tDecodeStreamDispatchReq(&decoder, &req);
L
Liu Jicong 已提交
963 964 965 966
  int32_t taskId = req.taskId;

  SStreamTask* pTask = streamMetaGetTask(pTq->pStreamMeta, taskId);
  if (pTask) {
967 968 969 970
    SRpcMsg rsp = {
        .info = pMsg->info,
        .code = 0,
    };
L
Liu Jicong 已提交
971
    streamProcessDispatchReq(pTask, &req, &rsp, exec);
L
Liu Jicong 已提交
972
    return 0;
973 974
  } else {
    return -1;
L
Liu Jicong 已提交
975
  }
L
Liu Jicong 已提交
976 977
}

L
Liu Jicong 已提交
978
#if 0
L
Liu Jicong 已提交
979 980 981
int32_t tqProcessTaskRecoverReq(STQ* pTq, SRpcMsg* pMsg) {
  SStreamTaskRecoverReq* pReq = pMsg->pCont;
  int32_t                taskId = pReq->taskId;
L
Liu Jicong 已提交
982 983 984
  SStreamTask*           pTask = streamMetaGetTask(pTq->pStreamMeta, taskId);
  if (pTask) {
    streamProcessRecoverReq(pTask, pReq, pMsg);
L
Liu Jicong 已提交
985
    return 0;
986 987
  } else {
    return -1;
L
Liu Jicong 已提交
988
  }
L
Liu Jicong 已提交
989 990
}

L
Liu Jicong 已提交
991 992 993 994 995
int32_t tqProcessTaskRecoverRsp(STQ* pTq, SRpcMsg* pMsg) {
  SStreamTaskRecoverRsp* pRsp = pMsg->pCont;
  int32_t                taskId = pRsp->rspTaskId;

  SStreamTask* pTask = streamMetaGetTask(pTq->pStreamMeta, taskId);
L
Liu Jicong 已提交
996
  if (pTask) {
L
Liu Jicong 已提交
997
    streamProcessRecoverRsp(pTask, pRsp);
L
Liu Jicong 已提交
998
    return 0;
999 1000
  } else {
    return -1;
L
Liu Jicong 已提交
1001
  }
L
Liu Jicong 已提交
1002
}
L
Liu Jicong 已提交
1003
#endif
L
Liu Jicong 已提交
1004

L
Liu Jicong 已提交
1005 1006 1007 1008
int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) {
  SStreamDispatchRsp* pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t             taskId = pRsp->taskId;
  SStreamTask*        pTask = streamMetaGetTask(pTq->pStreamMeta, taskId);
L
Liu Jicong 已提交
1009
  if (pTask) {
L
Liu Jicong 已提交
1010
    streamProcessDispatchRsp(pTask, pRsp);
L
Liu Jicong 已提交
1011
    return 0;
1012 1013
  } else {
    return -1;
L
Liu Jicong 已提交
1014
  }
L
Liu Jicong 已提交
1015
}
L
Liu Jicong 已提交
1016

1017
int32_t tqProcessTaskDropReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) {
L
Liu Jicong 已提交
1018
  SVDropStreamTaskReq* pReq = (SVDropStreamTaskReq*)msg;
L
Liu Jicong 已提交
1019

L
Liu Jicong 已提交
1020
  return streamMetaRemoveTask(pTq->pStreamMeta, pReq->taskId);
L
Liu Jicong 已提交
1021
}
L
Liu Jicong 已提交
1022 1023 1024 1025 1026 1027 1028 1029 1030

int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg) {
  char*              msgStr = pMsg->pCont;
  char*              msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
  int32_t            msgLen = pMsg->contLen - sizeof(SMsgHead);
  SStreamRetrieveReq req;
  SDecoder           decoder;
  tDecoderInit(&decoder, msgBody, msgLen);
  tDecodeStreamRetrieveReq(&decoder, &req);
L
Liu Jicong 已提交
1031 1032 1033
  int32_t      taskId = req.dstTaskId;
  SStreamTask* pTask = streamMetaGetTask(pTq->pStreamMeta, taskId);
  if (pTask) {
L
Liu Jicong 已提交
1034 1035 1036 1037
    SRpcMsg rsp = {
        .info = pMsg->info,
        .code = 0,
    };
L
Liu Jicong 已提交
1038 1039
    streamProcessRetrieveReq(pTask, &req, &rsp);
    return 0;
L
Liu Jicong 已提交
1040 1041
  } else {
    return -1;
L
Liu Jicong 已提交
1042 1043 1044 1045 1046 1047 1048
  }
}

int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg) {
  //
  return 0;
}
L
Liu Jicong 已提交
1049 1050 1051 1052 1053 1054 1055 1056 1057 1058 1059 1060 1061

void vnodeEnqueueStreamMsg(SVnode* pVnode, SRpcMsg* pMsg) {
  STQ*    pTq = pVnode->pTq;
  char*   msgStr = pMsg->pCont;
  char*   msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
  int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
  int32_t code = 0;

  SStreamDispatchReq req;
  SDecoder           decoder;
  tDecoderInit(&decoder, msgBody, msgLen);
  if (tDecodeStreamDispatchReq(&decoder, &req) < 0) {
    code = TSDB_CODE_MSG_DECODE_ERROR;
L
Liu Jicong 已提交
1062
    tDecoderClear(&decoder);
L
Liu Jicong 已提交
1063 1064
    goto FAIL;
  }
L
Liu Jicong 已提交
1065
  tDecoderClear(&decoder);
L
Liu Jicong 已提交
1066

L
Liu Jicong 已提交
1067
  int32_t taskId = req.taskId;
L
Liu Jicong 已提交
1068

L
Liu Jicong 已提交
1069 1070
  SStreamTask* pTask = streamMetaGetTask(pTq->pStreamMeta, taskId);
  if (pTask) {
L
Liu Jicong 已提交
1071 1072 1073 1074
    SRpcMsg rsp = {
        .info = pMsg->info,
        .code = 0,
    };
L
Liu Jicong 已提交
1075
    streamProcessDispatchReq(pTask, &req, &rsp, false);
L
Liu Jicong 已提交
1076 1077
    rpcFreeCont(pMsg->pCont);
    taosFreeQitem(pMsg);
L
Liu Jicong 已提交
1078 1079
    return;
  }
L
Liu Jicong 已提交
1080

L
Liu Jicong 已提交
1081 1082 1083 1084 1085 1086 1087
FAIL:
  if (pMsg->info.handle == NULL) return;
  SRpcMsg rsp = {
      .code = code,
      .info = pMsg->info,
  };
  tmsgSendRsp(&rsp);
L
Liu Jicong 已提交
1088 1089
  rpcFreeCont(pMsg->pCont);
  taosFreeQitem(pMsg);
L
Liu Jicong 已提交
1090
}