tq.c 30.8 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Hongze Cheng 已提交
16
#include "tq.h"
S
Shengliang Guan 已提交
17

L
Liu Jicong 已提交
18
/*
 * Initialize the process-wide TQ state kept in tqMgmt.
 *
 * tqMgmt.inited is used as a three-state flag:
 *   0 - not initialized, 1 - initialized, 2 - an init/cleanup transition is
 *   currently being performed by some thread.
 *
 * Returns 0 on success (also when the state was already initialized) and
 * -1 when the timer or the stream module could not be set up.
 */
int32_t tqInit() {
  int8_t old;
  // Try to claim the 0 -> 2 transition; retry for as long as another thread
  // holds the transitional state.
  while (1) {
    old = atomic_val_compare_exchange_8(&tqMgmt.inited, 0, 2);
    if (old != 2) break;
  }

  if (old == 0) {
    tqMgmt.timer = taosTmrInit(10000, 100, 10000, "TQ");
    if (tqMgmt.timer == NULL) {
      atomic_store_8(&tqMgmt.inited, 0);
      return -1;
    }
    if (streamInit() < 0) {
      // Roll back: release the timer and leave the transitional state.
      // Without resetting the flag it would stay at 2 forever and every
      // later tqInit()/tqCleanUp() call would spin in the loop above.
      taosTmrCleanUp(tqMgmt.timer);
      tqMgmt.timer = NULL;
      atomic_store_8(&tqMgmt.inited, 0);
      return -1;
    }
    atomic_store_8(&tqMgmt.inited, 1);
  }

  return 0;
}
L
Liu Jicong 已提交
39

40
/*
 * Tear down the process-wide TQ state kept in tqMgmt.
 *
 * Only the caller that wins the 1 -> 2 transition on tqMgmt.inited performs
 * the cleanup; the flag is set back to 0 afterwards. When the state was not
 * initialized the call does nothing.
 */
void tqCleanUp() {
  int8_t prev;
  // Retry while another thread is in the middle of an init/cleanup transition.
  do {
    prev = atomic_val_compare_exchange_8(&tqMgmt.inited, 1, 2);
  } while (prev == 2);

  if (prev != 1) {
    return;
  }

  taosTmrCleanUp(tqMgmt.timer);
  streamCleanUp();
  atomic_store_8(&tqMgmt.inited, 0);
}
L
Liu Jicong 已提交
53

L
Liu Jicong 已提交
54
/*
 * Allocate and open a TQ instance for a vnode.
 *
 * path   - directory path; a private copy is stored in the returned object.
 * pVnode - owning vnode, stored as a back pointer.
 *
 * Returns the new STQ on success. Returns NULL with terrno set to
 * TSDB_CODE_TQ_OUT_OF_MEMORY when the object, the path copy or one of the
 * hash tables cannot be allocated. Failures while opening the meta store,
 * the offset store or the stream meta are asserted, as before.
 */
STQ* tqOpen(const char* path, SVnode* pVnode) {
  STQ* pTq = taosMemoryCalloc(1, sizeof(STQ));
  if (pTq == NULL) {
    terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
    return NULL;
  }
  pTq->path = strdup(path);
  pTq->pVnode = pVnode;

  pTq->pHandle = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);

  pTq->pPushMgr = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);

  pTq->pCheckInfo = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);

  // NOTE(review): assumes taosHashInit reports failure by returning NULL.
  if (pTq->path == NULL || pTq->pHandle == NULL || pTq->pPushMgr == NULL || pTq->pCheckInfo == NULL) {
    // Release only what was actually created, then report out-of-memory.
    if (pTq->pHandle != NULL) taosHashCleanup(pTq->pHandle);
    if (pTq->pPushMgr != NULL) taosHashCleanup(pTq->pPushMgr);
    if (pTq->pCheckInfo != NULL) taosHashCleanup(pTq->pCheckInfo);
    if (pTq->path != NULL) taosMemoryFree(pTq->path);
    taosMemoryFree(pTq);
    terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
    return NULL;
  }

  if (tqMetaOpen(pTq) < 0) {
    ASSERT(0);
  }

  if (tqOffsetOpen(pTq) < 0) {
    ASSERT(0);
  }

  pTq->pStreamMeta = streamMetaOpen(path, pTq, (FTaskExpand*)tqExpandTask);
  if (pTq->pStreamMeta == NULL) {
    ASSERT(0);
  }

  if (streamLoadTasks(pTq->pStreamMeta) < 0) {
    ASSERT(0);
  }

  return pTq;
}
L
Liu Jicong 已提交
88

L
Liu Jicong 已提交
89
/*
 * Close a TQ instance and release everything it owns: the offset store, the
 * three hash tables, the path copy, the meta store and the stream meta, and
 * finally the object itself. A NULL pTq is ignored.
 */
void tqClose(STQ* pTq) {
  if (pTq == NULL) {
    return;
  }

  tqOffsetClose(pTq->pOffsetStore);

  taosHashCleanup(pTq->pHandle);
  taosHashCleanup(pTq->pPushMgr);
  taosHashCleanup(pTq->pCheckInfo);

  taosMemoryFree(pTq->path);

  tqMetaClose(pTq);
  streamMetaClose(pTq->pStreamMeta);

  taosMemoryFree(pTq);
}
L
Liu Jicong 已提交
101

L
Liu Jicong 已提交
102
/*
 * Encode a meta poll response and send it back on the RPC connection of pMsg.
 *
 * The reply consists of an SMqRspHead (message type, epoch and consumer id
 * copied from pReq) followed by the encoded SMqMetaRsp.
 *
 * Returns 0 once the response has been handed to tmsgSendRsp, -1 when the
 * encoded size cannot be computed or the reply buffer cannot be allocated.
 */
int32_t tqSendMetaPollRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqMetaRsp* pRsp) {
  int32_t bodyLen = 0;
  int32_t ret = 0;
  tEncodeSize(tEncodeSMqMetaRsp, pRsp, bodyLen, ret);
  if (ret < 0) {
    return -1;
  }

  int32_t totalLen = sizeof(SMqRspHead) + bodyLen;
  void*   pCont = rpcMallocCont(totalLen);
  if (pCont == NULL) {
    return -1;
  }

  // Fixed-size header first ...
  SMqRspHead* pRspHead = (SMqRspHead*)pCont;
  pRspHead->mqMsgType = TMQ_MSG_TYPE__POLL_META_RSP;
  pRspHead->epoch = pReq->epoch;
  pRspHead->consumerId = pReq->consumerId;

  // ... then the encoded body right behind it.
  void*    pBody = POINTER_SHIFT(pCont, sizeof(SMqRspHead));
  SEncoder enc = {0};
  tEncoderInit(&enc, pBody, bodyLen);
  tEncodeSMqMetaRsp(&enc, pRsp);
  tEncoderClear(&enc);

  SRpcMsg rspMsg = {
      .info = pMsg->info,
      .pCont = pCont,
      .contLen = totalLen,
      .code = 0,
  };
  tmsgSendRsp(&rspMsg);

  tqDebug("vgId:%d, from consumer:%" PRId64 ", (epoch %d) send rsp, res msg type %d, offset type:%d",
          TD_VID(pTq->pVnode), pReq->consumerId, pReq->epoch, pRsp->resMsgType, pRsp->rspOffset.type);

  return 0;
}

L
Liu Jicong 已提交
140 141 142 143 144 145 146 147 148 149
/*
 * Encode a data poll response and send it back on the RPC connection of pMsg.
 *
 * The per-block arrays of pRsp are asserted to agree with pRsp->blockNum, and
 * for log offsets the response version is asserted not to go backwards.
 * The reply consists of an SMqRspHead (message type, epoch and consumer id
 * copied from pReq) followed by the encoded SMqDataRsp.
 *
 * Returns 0 once the response has been handed to tmsgSendRsp, -1 when the
 * encoded size cannot be computed or the reply buffer cannot be allocated.
 */
int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp) {
  // Consistency checks on the response before it is serialized.
  ASSERT(taosArrayGetSize(pRsp->blockData) == pRsp->blockNum);
  ASSERT(taosArrayGetSize(pRsp->blockDataLen) == pRsp->blockNum);

  if (pRsp->withSchema) {
    ASSERT(taosArrayGetSize(pRsp->blockSchema) == pRsp->blockNum);
  } else {
    ASSERT(taosArrayGetSize(pRsp->blockSchema) == 0);
  }

  if (pRsp->reqOffset.type == TMQ_OFFSET__LOG) {
    if (pRsp->blockNum > 0) {
      ASSERT(pRsp->rspOffset.version > pRsp->reqOffset.version);
    } else {
      ASSERT(pRsp->rspOffset.version >= pRsp->reqOffset.version);
    }
  }

  int32_t bodyLen = 0;
  int32_t ret = 0;
  tEncodeSize(tEncodeSMqDataRsp, pRsp, bodyLen, ret);
  if (ret < 0) {
    return -1;
  }

  int32_t totalLen = sizeof(SMqRspHead) + bodyLen;
  void*   pCont = rpcMallocCont(totalLen);
  if (pCont == NULL) {
    return -1;
  }

  // Fixed-size header first ...
  SMqRspHead* pRspHead = (SMqRspHead*)pCont;
  pRspHead->mqMsgType = TMQ_MSG_TYPE__POLL_RSP;
  pRspHead->epoch = pReq->epoch;
  pRspHead->consumerId = pReq->consumerId;

  // ... then the encoded body right behind it.
  void*    pBody = POINTER_SHIFT(pCont, sizeof(SMqRspHead));
  SEncoder enc = {0};
  tEncoderInit(&enc, pBody, bodyLen);
  tEncodeSMqDataRsp(&enc, pRsp);
  tEncoderClear(&enc);

  SRpcMsg rspMsg = {
      .info = pMsg->info,
      .pCont = pCont,
      .contLen = totalLen,
      .code = 0,
  };
  tmsgSendRsp(&rspMsg);

  // Human-readable offsets for the debug log only.
  char reqOffsetStr[80] = {0};
  char rspOffsetStr[80] = {0};
  tFormatOffset(reqOffsetStr, 80, &pRsp->reqOffset);
  tFormatOffset(rspOffsetStr, 80, &pRsp->rspOffset);
  tqDebug("vgId:%d, from consumer:%" PRId64 ", (epoch %d) send rsp, block num: %d, reqOffset:%s, rspOffset:%s",
          TD_VID(pTq->pVnode), pReq->consumerId, pReq->epoch, pRsp->blockNum, reqOffsetStr, rspOffsetStr);

  return 0;
}

199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258
/*
 * Encode a taosx poll response and send it back on the RPC connection of pMsg.
 *
 * The per-block arrays of pRsp are asserted to agree with pRsp->blockNum, and
 * for log offsets the response version is asserted not to go backwards.
 * The reply consists of an SMqRspHead (message type, epoch and consumer id
 * copied from pReq) followed by the encoded STaosxRsp.
 *
 * Returns 0 once the response has been handed to tmsgSendRsp, -1 when the
 * encoded size cannot be computed or the reply buffer cannot be allocated.
 */
int32_t tqSendTaosxRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const STaosxRsp* pRsp) {
  // Consistency checks on the response before it is serialized.
  ASSERT(taosArrayGetSize(pRsp->blockData) == pRsp->blockNum);
  ASSERT(taosArrayGetSize(pRsp->blockDataLen) == pRsp->blockNum);

  if (pRsp->withSchema) {
    ASSERT(taosArrayGetSize(pRsp->blockSchema) == pRsp->blockNum);
  } else {
    ASSERT(taosArrayGetSize(pRsp->blockSchema) == 0);
  }

  if (pRsp->reqOffset.type == TMQ_OFFSET__LOG) {
    if (pRsp->blockNum > 0) {
      ASSERT(pRsp->rspOffset.version > pRsp->reqOffset.version);
    } else {
      ASSERT(pRsp->rspOffset.version >= pRsp->reqOffset.version);
    }
  }

  int32_t bodyLen = 0;
  int32_t ret = 0;
  tEncodeSize(tEncodeSTaosxRsp, pRsp, bodyLen, ret);
  if (ret < 0) {
    return -1;
  }

  int32_t totalLen = sizeof(SMqRspHead) + bodyLen;
  void*   pCont = rpcMallocCont(totalLen);
  if (pCont == NULL) {
    return -1;
  }

  // Fixed-size header first ...
  SMqRspHead* pRspHead = (SMqRspHead*)pCont;
  pRspHead->mqMsgType = TMQ_MSG_TYPE__TAOSX_RSP;
  pRspHead->epoch = pReq->epoch;
  pRspHead->consumerId = pReq->consumerId;

  // ... then the encoded body right behind it.
  void*    pBody = POINTER_SHIFT(pCont, sizeof(SMqRspHead));
  SEncoder enc = {0};
  tEncoderInit(&enc, pBody, bodyLen);
  tEncodeSTaosxRsp(&enc, pRsp);
  tEncoderClear(&enc);

  SRpcMsg rspMsg = {
      .info = pMsg->info,
      .pCont = pCont,
      .contLen = totalLen,
      .code = 0,
  };
  tmsgSendRsp(&rspMsg);

  // Human-readable offsets for the debug log only.
  char reqOffsetStr[80] = {0};
  char rspOffsetStr[80] = {0};
  tFormatOffset(reqOffsetStr, 80, &pRsp->reqOffset);
  tFormatOffset(rspOffsetStr, 80, &pRsp->rspOffset);
  tqDebug("taosx rsp, vgId:%d, from consumer:%" PRId64
          ", (epoch %d) send rsp, block num: %d, reqOffset:%s, rspOffset:%s",
          TD_VID(pTq->pVnode), pReq->consumerId, pReq->epoch, pRsp->blockNum, reqOffsetStr, rspOffsetStr);

  return 0;
}

259 260 261 262 263 264
/*
 * Returns true only when both offsets are log offsets and the version of
 * pLeft is not greater than the version of pRight; false in every other case.
 */
static FORCE_INLINE bool tqOffsetLessOrEqual(const STqOffset* pLeft, const STqOffset* pRight) {
  if (pLeft->val.type != TMQ_OFFSET__LOG) {
    return false;
  }
  if (pRight->val.type != TMQ_OFFSET__LOG) {
    return false;
  }
  return pLeft->val.version <= pRight->val.version;
}

/*
 * Handle an offset commit request.
 *
 * version     - version passed in by the caller; a decoded log offset whose
 *               version + 1 equals this value is advanced by one.
 * msg, msgLen - encoded STqOffset.
 *
 * The decoded offset is written to the offset store unless the store already
 * holds a log offset for the same subscription key that is greater than or
 * equal to it. For log offsets, the WAL reference of the matching handle
 * (if there is one) is moved to the committed version.
 *
 * Returns 0 on success or when nothing had to be written, -1 when decoding,
 * storing the offset or updating the WAL reference fails.
 */
int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) {
  STqOffset offset = {0};
  SDecoder  decoder;
  tDecoderInit(&decoder, msg, msgLen);
  if (tDecodeSTqOffset(&decoder, &offset) < 0) {
    ASSERT(0);
    // Pair every tDecoderInit with tDecoderClear, also on the failure path.
    tDecoderClear(&decoder);
    return -1;
  }
  tDecoderClear(&decoder);

  if (offset.val.type == TMQ_OFFSET__SNAPSHOT_DATA || offset.val.type == TMQ_OFFSET__SNAPSHOT_META) {
    tqDebug("receive offset commit msg to %s on vgId:%d, offset(type:snapshot) uid:%" PRId64 ", ts:%" PRId64,
            offset.subKey, TD_VID(pTq->pVnode), offset.val.uid, offset.val.ts);
  } else if (offset.val.type == TMQ_OFFSET__LOG) {
    tqDebug("receive offset commit msg to %s on vgId:%d, offset(type:log) version:%" PRId64, offset.subKey,
            TD_VID(pTq->pVnode), offset.val.version);
    if (offset.val.version + 1 == version) {
      offset.val.version += 1;
    }
  } else {
    ASSERT(0);
  }

  // Skip the write when the stored log offset is already at or past this one.
  STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, offset.subKey);
  if (pOffset != NULL && tqOffsetLessOrEqual(&offset, pOffset)) {
    return 0;
  }

  if (tqOffsetWrite(pTq->pOffsetStore, &offset) < 0) {
    ASSERT(0);
    return -1;
  }

  if (offset.val.type == TMQ_OFFSET__LOG) {
    STqHandle* pHandle = taosHashGet(pTq->pHandle, offset.subKey, strlen(offset.subKey));
    if (pHandle) {
      if (walRefVer(pHandle->pRef, offset.val.version) < 0) {
        ASSERT(0);
        return -1;
      }
    }
  }

  return 0;
}

L
Liu Jicong 已提交
314
/*
 * Check whether a column of a table may be modified.
 *
 * Walks every entry of pTq->pCheckInfo; for entries whose ntbUid equals
 * tbUid, the entry's colIdList is searched for colId.
 *
 * Returns -1 when colId is found in such a list, 0 otherwise.
 */
int32_t tqCheckColModifiable(STQ* pTq, int64_t tbUid, int32_t colId) {
  for (void* pIter = taosHashIterate(pTq->pCheckInfo, NULL); pIter != NULL;
       pIter = taosHashIterate(pTq->pCheckInfo, pIter)) {
    STqCheckInfo* pInfo = (STqCheckInfo*)pIter;
    if (pInfo->ntbUid != tbUid) {
      continue;
    }

    int32_t nCols = taosArrayGetSize(pInfo->colIdList);
    for (int32_t idx = 0; idx < nCols; idx++) {
      int16_t forbidColId = *(int16_t*)taosArrayGet(pInfo->colIdList, idx);
      if (forbidColId == colId) {
        // Leaving the iteration early: tell the hash table we are done.
        taosHashCancelIterate(pTq->pCheckInfo, pIter);
        return -1;
      }
    }
  }
  return 0;
}

L
Liu Jicong 已提交
334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365
/*
 * Prepare an SMqDataRsp for a poll request.
 *
 * Copies the request offset and the withTbName flag from pReq and creates the
 * empty per-block arrays. blockTbName is created only when table names are
 * requested; blockSchema only when subType is not TOPIC_SUB_TYPE__COLUMN, in
 * which case withSchema is set.
 *
 * Returns 0 on success, -1 when an array cannot be created. Arrays created
 * before the failure are not released here.
 */
static int32_t tqInitDataRsp(SMqDataRsp* pRsp, const SMqPollReq* pReq, int8_t subType) {
  pRsp->reqOffset = pReq->reqOffset;

  pRsp->blockData = taosArrayInit(0, sizeof(void*));
  pRsp->blockDataLen = taosArrayInit(0, sizeof(int32_t));
  if (pRsp->blockData == NULL) {
    return -1;
  }
  if (pRsp->blockDataLen == NULL) {
    return -1;
  }

  pRsp->withTbName = pReq->withTbName;
  if (pRsp->withTbName) {
    pRsp->blockTbName = taosArrayInit(0, sizeof(void*));
    if (pRsp->blockTbName == NULL) {
      // TODO free
      return -1;
    }
  }

  if (subType != TOPIC_SUB_TYPE__COLUMN) {
    pRsp->withSchema = true;
    pRsp->blockSchema = taosArrayInit(0, sizeof(void*));
    if (pRsp->blockSchema == NULL) {
      // TODO free
      return -1;
    }
  } else {
    pRsp->withSchema = false;
  }

  return 0;
}

366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381
/*
 * Prepare an STaosxRsp for a poll request.
 *
 * Copies the request offset from pReq, enables both withTbName and withSchema
 * and creates the four empty per-block arrays.
 *
 * Returns 0 on success, -1 when any of the arrays cannot be created.
 */
static int32_t tqInitTaosxRsp(STaosxRsp* pRsp, const SMqPollReq* pReq) {
  pRsp->reqOffset = pReq->reqOffset;

  pRsp->withTbName = 1;
  pRsp->withSchema = 1;

  pRsp->blockData = taosArrayInit(0, sizeof(void*));
  pRsp->blockDataLen = taosArrayInit(0, sizeof(int32_t));
  pRsp->blockTbName = taosArrayInit(0, sizeof(void*));
  pRsp->blockSchema = taosArrayInit(0, sizeof(void*));

  // All four arrays are created first and checked together afterwards.
  int allCreated = pRsp->blockData != NULL && pRsp->blockDataLen != NULL && pRsp->blockTbName != NULL &&
                   pRsp->blockSchema != NULL;
  return allCreated ? 0 : -1;
}

L
Liu Jicong 已提交
382
/*
 * Handle a consumer poll request.
 *
 * Steps:
 *   1. Look up the handle for pReq->subKey and verify that it belongs to the
 *      requesting consumer; raise the handle's epoch to the request epoch.
 *   2. Decide where to start fetching: the offset carried by the request, the
 *      committed offset from the offset store, or a reset position selected by
 *      the request's reset type.
 *   3. For column subscriptions scan data and reply with an SMqDataRsp.
 *   4. Otherwise (taosx) scan snapshot data/meta first when the offset is not
 *      a log offset, then iterate the WAL from the log offset until there is
 *      something to send or a newer consumer epoch is observed.
 *
 * Returns 0 on success, -1 when the handle is missing or owned by another
 * consumer, when no offset is committed and the reset type is "none", when
 * the WAL buffer cannot be allocated, or when sending a response fails.
 */
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
  SMqPollReq*  pReq = pMsg->pCont;
  int64_t      consumerId = pReq->consumerId;
  int32_t      reqEpoch = pReq->epoch;
  int32_t      code = 0;
  STqOffsetVal reqOffset = pReq->reqOffset;
  // Zero-initialized so that an unexpected reset type cannot leave it indeterminate.
  STqOffsetVal fetchOffsetNew = {0};
  SWalCkHead*  pCkHead = NULL;

  // 1.find handle
  STqHandle* pHandle = taosHashGet(pTq->pHandle, pReq->subKey, strlen(pReq->subKey));
  /*ASSERT(pHandle);*/
  if (pHandle == NULL) {
    tqError("tmq poll: no consumer handle for consumer:%" PRId64 ", in vgId:%d, subkey %s", consumerId,
            TD_VID(pTq->pVnode), pReq->subKey);
    return -1;
  }

  // check rebalance
  if (pHandle->consumerId != consumerId) {
    tqError("tmq poll: consumer handle mismatch for consumer:%" PRId64
            ", in vgId:%d, subkey %s, handle consumer id %" PRId64,
            consumerId, TD_VID(pTq->pVnode), pReq->subKey, pHandle->consumerId);
    terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
    return -1;
  }

  // update epoch if need
  int32_t consumerEpoch = atomic_load_32(&pHandle->epoch);
  while (consumerEpoch < reqEpoch) {
    consumerEpoch = atomic_val_compare_exchange_32(&pHandle->epoch, consumerEpoch, reqEpoch);
  }

  char buf[80];
  tFormatOffset(buf, 80, &reqOffset);
  tqDebug("tmq poll: consumer %" PRId64 " (epoch %d), subkey %s, recv poll req in vg %d, req offset %s", consumerId,
          pReq->epoch, pHandle->subKey, TD_VID(pTq->pVnode), buf);

  // 2.reset offset if needed
  if (reqOffset.type > 0) {
    fetchOffsetNew = reqOffset;
  } else {
    STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, pReq->subKey);
    if (pOffset != NULL) {
      fetchOffsetNew = pOffset->val;
      char formatBuf[80];
      tFormatOffset(formatBuf, 80, &fetchOffsetNew);
      tqDebug("tmq poll: consumer %" PRId64 ", subkey %s, vg %d, offset reset to %s", consumerId, pHandle->subKey,
              TD_VID(pTq->pVnode), formatBuf);
    } else {
      if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEAST) {
        if (pReq->useSnapshot) {
          if (pHandle->fetchMeta) {
            tqOffsetResetToMeta(&fetchOffsetNew, 0);
          } else {
            tqOffsetResetToData(&fetchOffsetNew, 0, 0);
          }
        } else {
          tqOffsetResetToLog(&fetchOffsetNew, walGetFirstVer(pTq->pVnode->pWal));
        }
      } else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) {
        // Reply immediately with an empty response positioned at the last WAL version.
        SMqDataRsp dataRsp = {0};
        tqInitDataRsp(&dataRsp, pReq, pHandle->execHandle.subType);

        tqOffsetResetToLog(&dataRsp.rspOffset, walGetLastVer(pTq->pVnode->pWal));
        tqDebug("tmq poll: consumer %" PRId64 ", subkey %s, vg %d, offset reset to %" PRId64, consumerId,
                pHandle->subKey, TD_VID(pTq->pVnode), dataRsp.rspOffset.version);
        if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) {
          code = -1;
        }
        tDeleteSMqDataRsp(&dataRsp);
        return code;
      } else if (reqOffset.type == TMQ_OFFSET__RESET_NONE) {
        tqError("tmq poll: subkey %s, no offset committed for consumer %" PRId64
                " in vg %d, subkey %s, reset none failed",
                pHandle->subKey, consumerId, TD_VID(pTq->pVnode), pReq->subKey);
        terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET;
        return -1;
      }
    }
  }

  // 3. column subscription: scan and reply with a data response
  if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
    SMqDataRsp dataRsp = {0};
    tqInitDataRsp(&dataRsp, pReq, pHandle->execHandle.subType);
    tqScanData(pTq, pHandle, &dataRsp, &fetchOffsetNew);

    if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) {
      code = -1;
    }

    // 64-bit values are printed with PRId64, like the other log calls in this file.
    tqDebug("tmq poll: consumer %" PRId64
            ", subkey %s, vg %d, send data blockNum:%d, offset type:%d, uid:%" PRId64 ", version:%" PRId64,
            consumerId, pHandle->subKey, TD_VID(pTq->pVnode), dataRsp.blockNum, dataRsp.rspOffset.type,
            dataRsp.rspOffset.uid, dataRsp.rspOffset.version);

    tDeleteSMqDataRsp(&dataRsp);
    return code;
  }

  // for taosx
  ASSERT(pHandle->execHandle.subType != TOPIC_SUB_TYPE__COLUMN);

  SMqMetaRsp metaRsp = {0};

  STaosxRsp taosxRsp = {0};
  tqInitTaosxRsp(&taosxRsp, pReq);

  if (fetchOffsetNew.type != TMQ_OFFSET__LOG) {
    tqScan(pTq, pHandle, &taosxRsp, &metaRsp, &fetchOffsetNew);

    if (metaRsp.metaRspLen > 0) {
      if (tqSendMetaPollRsp(pTq, pMsg, pReq, &metaRsp) < 0) {
        code = -1;
      }
      tqDebug("tmq poll: consumer %" PRId64 ", subkey %s, vg %d, send meta offset type:%d,uid:%" PRId64
              ",version:%" PRId64,
              consumerId, pHandle->subKey, TD_VID(pTq->pVnode), metaRsp.rspOffset.type, metaRsp.rspOffset.uid,
              metaRsp.rspOffset.version);
      taosMemoryFree(metaRsp.metaRsp);
      tDeleteSTaosxRsp(&taosxRsp);
      return code;
    }

    if (taosxRsp.blockNum > 0) {
      if (tqSendTaosxRsp(pTq, pMsg, pReq, &taosxRsp) < 0) {
        code = -1;
      }
      tDeleteSTaosxRsp(&taosxRsp);
      return code;
    } else {
      // Nothing scanned: continue from wherever the scan left the offset.
      fetchOffsetNew = taosxRsp.rspOffset;
    }

    tqDebug("taosx poll: consumer %" PRId64
            ", subkey %s, vg %d, send data blockNum:%d, offset type:%d,uid:%" PRId64 ",version:%" PRId64,
            consumerId, pHandle->subKey, TD_VID(pTq->pVnode), taosxRsp.blockNum, taosxRsp.rspOffset.type,
            taosxRsp.rspOffset.uid, taosxRsp.rspOffset.version);
  }

  if (fetchOffsetNew.type == TMQ_OFFSET__LOG) {
    int64_t fetchVer = fetchOffsetNew.version + 1;
    pCkHead = taosMemoryMalloc(sizeof(SWalCkHead) + 2048);
    if (pCkHead == NULL) {
      // Do not leak the arrays created by tqInitTaosxRsp.
      tDeleteSTaosxRsp(&taosxRsp);
      return -1;
    }

    walSetReaderCapacity(pHandle->pWalReader, 2048);

    while (1) {
      consumerEpoch = atomic_load_32(&pHandle->epoch);
      if (consumerEpoch > reqEpoch) {
        tqWarn("tmq poll: consumer %" PRId64 " (epoch %d), subkey %s, vg %d offset %" PRId64
               ", found new consumer epoch %d, discard req epoch %d",
               consumerId, pReq->epoch, pHandle->subKey, TD_VID(pTq->pVnode), fetchVer, consumerEpoch, reqEpoch);
        break;
      }

      if (tqFetchLog(pTq, pHandle, &fetchVer, &pCkHead) < 0) {
        tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
        if (tqSendTaosxRsp(pTq, pMsg, pReq, &taosxRsp) < 0) {
          code = -1;
        }
        tDeleteSTaosxRsp(&taosxRsp);
        if (pCkHead) taosMemoryFree(pCkHead);
        return code;
      }

      SWalCont* pHead = &pCkHead->head;

      tqDebug("tmq poll: consumer:%" PRId64 ", (epoch %d) iter log, vgId:%d offset %" PRId64 " msgType %d", consumerId,
              pReq->epoch, TD_VID(pTq->pVnode), fetchVer, pHead->msgType);

      if (pHead->msgType == TDMT_VND_SUBMIT) {
        SSubmitReq* pCont = (SSubmitReq*)&pHead->body;

        if (tqTaosxScanLog(pTq, pHandle, pCont, &taosxRsp) < 0) {
          /*ASSERT(0);*/
        }
        // TODO batch optimization:
        // TODO continue scan until meeting batch requirement
        if (taosxRsp.blockNum > 0 /* threshold */) {
          tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
          if (tqSendTaosxRsp(pTq, pMsg, pReq, &taosxRsp) < 0) {
            code = -1;
          }
          tDeleteSTaosxRsp(&taosxRsp);
          if (pCkHead) taosMemoryFree(pCkHead);
          return code;
        } else {
          fetchVer++;
        }

      } else {
        ASSERT(pHandle->fetchMeta);
        ASSERT(IS_META_MSG(pHead->msgType));
        tqDebug("fetch meta msg, ver:%" PRId64 ", type:%d", pHead->version, pHead->msgType);
        tqOffsetResetToLog(&metaRsp.rspOffset, fetchVer);
        metaRsp.resMsgType = pHead->msgType;
        metaRsp.metaRspLen = pHead->bodyLen;
        // Points into pCkHead; released together with it below.
        metaRsp.metaRsp = pHead->body;
        if (tqSendMetaPollRsp(pTq, pMsg, pReq, &metaRsp) < 0) {
          code = -1;
        }
        if (pCkHead) taosMemoryFree(pCkHead);
        // The taosx response is unused on this path but still owns its arrays.
        tDeleteSTaosxRsp(&taosxRsp);
        return code;
      }
    }

    // Reached only through the epoch-change break above: release the WAL buffer.
    if (pCkHead) taosMemoryFree(pCkHead);
  }

  tDeleteSTaosxRsp(&taosxRsp);
  return 0;
}

594
/*
 * Handle a request to delete a subscription from this vnode.
 *
 * msg is interpreted as an SMqVDeleteReq. The handle registered under its
 * subKey is removed from pTq->pHandle (asserted to succeed), the committed
 * offset for the key is deleted and the persisted handle is removed from the
 * meta store (failure is asserted).
 *
 * Always returns 0.
 */
int32_t tqProcessVgDeleteReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) {
  SMqVDeleteReq* pDelReq = (SMqVDeleteReq*)msg;
  const char*    subKey = pDelReq->subKey;

  int32_t removed = taosHashRemove(pTq->pHandle, subKey, strlen(subKey));
  ASSERT(removed == 0);

  tqOffsetDelete(pTq->pOffsetStore, subKey);

  if (tqMetaDeleteHandle(pTq, subKey) < 0) {
    ASSERT(0);
  }

  return 0;
}

608 609 610
/*
 * Handle a request to register column check info for a topic.
 *
 * msg/msgLen hold an encoded STqCheckInfo. The decoded entry is stored in
 * pTq->pCheckInfo under its topic name and the raw message is persisted
 * through tqMetaSaveCheckInfo.
 *
 * Returns 0 on success. On a decode, hash-insert or persist failure returns
 * -1 with terrno set to TSDB_CODE_OUT_OF_MEMORY.
 */
int32_t tqProcessAddCheckInfoReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) {
  STqCheckInfo info = {0};
  SDecoder     decoder;
  tDecoderInit(&decoder, msg, msgLen);
  if (tDecodeSTqCheckInfo(&decoder, &info) < 0) {
    // Pair every tDecoderInit with tDecoderClear, also on the failure path.
    tDecoderClear(&decoder);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  tDecoderClear(&decoder);

  if (taosHashPut(pTq->pCheckInfo, info.topic, strlen(info.topic), &info, sizeof(STqCheckInfo)) < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  if (tqMetaSaveCheckInfo(pTq, info.topic, msg, msgLen) < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  return 0;
}

628 629 630 631 632 633 634 635 636 637 638 639 640
/*
 * Handle a request to remove the column check info of a topic.
 *
 * msg is used as a NUL-terminated key: the entry is removed from
 * pTq->pCheckInfo and then deleted through tqMetaDeleteCheckInfo.
 *
 * Returns 0 on success. When either step fails returns -1 with terrno set
 * to TSDB_CODE_OUT_OF_MEMORY.
 */
int32_t tqProcessDelCheckInfoReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) {
  int32_t ret = 0;

  if (taosHashRemove(pTq->pCheckInfo, msg, strlen(msg)) < 0) {
    ret = -1;
  } else if (tqMetaDeleteCheckInfo(pTq, msg) < 0) {
    ret = -1;
  }

  if (ret != 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
  }
  return ret;
}

/*
 * Handle a rebalance request that assigns this vnode's subscription to a
 * (new) consumer.
 *
 * msg is decoded into an SMqRebVgReq.
 *   - When no handle exists for req.subKey, a new STqHandle is built: a WAL
 *     reference is taken, the executor/readers matching the subscription type
 *     (column, db or table) are created, and the handle is stored in
 *     pTq->pHandle and persisted.
 *   - When a handle already exists, its consumer id is switched to
 *     req.newConsumerId and its epoch is reset, then the handle is persisted.
 *
 * Returns 0 on success, -1 when the WAL reference cannot be obtained.
 */
int32_t tqProcessVgChangeReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) {
  SMqRebVgReq req = {0};
  tDecodeSMqRebVgReq(msg, &req);
  // todo lock
  STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
  if (pHandle == NULL) {
    if (req.oldConsumerId != -1) {
      // Consumer ids are 64-bit; print them with PRId64 so the varargs match.
      tqError("vgId:%d, build new consumer handle %s for consumer %" PRId64 ", but old consumerId is %" PRId64,
              req.vgId, req.subKey, req.newConsumerId, req.oldConsumerId);
    }
    ASSERT(req.newConsumerId != -1);
    // Built on the stack; taosHashPut below stores a copy of it.
    STqHandle tqHandle = {0};
    pHandle = &tqHandle;
    /*taosInitRWLatch(&pExec->lock);*/

    memcpy(pHandle->subKey, req.subKey, TSDB_SUBSCRIBE_KEY_LEN);
    pHandle->consumerId = req.newConsumerId;
    pHandle->epoch = -1;

    pHandle->execHandle.subType = req.subType;
    pHandle->fetchMeta = req.withMeta;

    // TODO version should be assigned and refed during preprocess
    SWalRef* pRef = walRefCommittedVer(pTq->pVnode->pWal);
    if (pRef == NULL) {
      ASSERT(0);
      return -1;
    }
    int64_t ver = pRef->refVer;
    pHandle->pRef = pRef;

    SReadHandle handle = {
        .meta = pTq->pVnode->pMeta,
        .vnode = pTq->pVnode,
        .initTableReader = true,
        .initTqReader = true,
        .version = ver,
    };
    pHandle->snapshotVer = ver;

    if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
      // The handle takes over the query message from the request.
      pHandle->execHandle.execCol.qmsg = req.qmsg;
      req.qmsg = NULL;

      pHandle->execHandle.task =
          qCreateQueueExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle, &pHandle->execHandle.numOfCols,
                                   &pHandle->execHandle.pSchemaWrapper);
      ASSERT(pHandle->execHandle.task);
      void* scanner = NULL;
      qExtractStreamScanner(pHandle->execHandle.task, &scanner);
      ASSERT(scanner);
      pHandle->execHandle.pExecReader = qExtractReaderFromStreamScanner(scanner);
      ASSERT(pHandle->execHandle.pExecReader);
    } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__DB) {
      pHandle->pWalReader = walOpenReader(pTq->pVnode->pWal, NULL);
      pHandle->execHandle.pExecReader = tqOpenReader(pTq->pVnode);
      pHandle->execHandle.execDb.pFilterOutTbUid =
          taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
      buildSnapContext(handle.meta, handle.version, 0, pHandle->execHandle.subType, pHandle->fetchMeta,
                       (SSnapContext**)(&handle.sContext));

      pHandle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &handle, NULL, NULL);
    } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
      pHandle->pWalReader = walOpenReader(pTq->pVnode->pWal, NULL);

      pHandle->execHandle.execTb.suid = req.suid;

      // Restrict the reader to the child tables of the subscribed super table.
      SArray* tbUidList = taosArrayInit(0, sizeof(int64_t));
      vnodeGetCtbIdList(pTq->pVnode, req.suid, tbUidList);
      tqDebug("vgId:%d, tq try to get all ctb, suid:%" PRId64, pTq->pVnode->config.vgId, req.suid);
      for (int32_t i = 0; i < taosArrayGetSize(tbUidList); i++) {
        int64_t tbUid = *(int64_t*)taosArrayGet(tbUidList, i);
        tqDebug("vgId:%d, idx %d, uid:%" PRId64, TD_VID(pTq->pVnode), i, tbUid);
      }
      pHandle->execHandle.pExecReader = tqOpenReader(pTq->pVnode);
      tqReaderSetTbUidList(pHandle->execHandle.pExecReader, tbUidList);
      taosArrayDestroy(tbUidList);

      buildSnapContext(handle.meta, handle.version, req.suid, pHandle->execHandle.subType, pHandle->fetchMeta,
                       (SSnapContext**)(&handle.sContext));
      pHandle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &handle, NULL, NULL);
    }
    taosHashPut(pTq->pHandle, req.subKey, strlen(req.subKey), pHandle, sizeof(STqHandle));
    tqDebug("try to persist handle %s consumer %" PRId64, req.subKey, pHandle->consumerId);
    if (tqMetaSaveHandle(pTq, req.subKey, pHandle) < 0) {
      // TODO
      ASSERT(0);
    }
  } else {
    /*ASSERT(pExec->consumerId == req.oldConsumerId);*/
    // TODO handle qmsg and exec modification
    // Epoch is parked at -1 while the consumer id changes, then moved to 0.
    atomic_store_32(&pHandle->epoch, -1);
    atomic_store_64(&pHandle->consumerId, req.newConsumerId);
    atomic_add_fetch_32(&pHandle->epoch, 1);
    if (tqMetaSaveHandle(pTq, req.subKey, pHandle) < 0) {
      // TODO
    }
  }

  return 0;
}
741

L
Liu Jicong 已提交
742
/*
 * Fill in the runtime parts of a stream task that belongs to this vnode.
 *
 * Opens the task's input/output queues, attaches the vnode's message
 * callbacks, creates the state backend and executor for source and aggregate
 * level tasks, wires up the SMA or table sink depending on the output type,
 * and finally sets up the task's trigger.
 *
 * Returns 0 on success, -1 when a queue or the state backend cannot be opened.
 */
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask) {
  SVnode* pVnode = pTq->pVnode;
  bool    isSource = (pTask->taskLevel == TASK_LEVEL__SOURCE);
  bool    isAgg = (pTask->taskLevel == TASK_LEVEL__AGG);

  if (isAgg) {
    ASSERT(taosArrayGetSize(pTask->childEpInfo) != 0);
  }

  pTask->schedStatus = TASK_SCHED_STATUS__INACTIVE;

  pTask->inputQueue = streamQueueOpen();
  pTask->outputQueue = streamQueueOpen();
  if (pTask->inputQueue == NULL || pTask->outputQueue == NULL) {
    return -1;
  }

  pTask->inputStatus = TASK_INPUT_STATUS__NORMAL;
  pTask->outputStatus = TASK_OUTPUT_STATUS__NORMAL;

  pTask->pMsgCb = &pVnode->msgCb;

  // expand executor: both source and aggregate tasks need a state backend
  if (isSource || isAgg) {
    pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask);
    if (pTask->pState == NULL) {
      return -1;
    }

    SReadHandle handle = {.pStateBackend = pTask->pState};
    if (isSource) {
      // Source tasks read from this vnode.
      handle.meta = pVnode->pMeta;
      handle.vnode = pVnode;
      handle.initTqReader = 1;
    } else {
      // Aggregate tasks have no vnode; they only record the number of children.
      handle.vnode = NULL;
      handle.numOfVgroups = (int32_t)taosArrayGetSize(pTask->childEpInfo);
    }

    pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle);
    ASSERT(pTask->exec.executor);
  }

  // sink
  /*pTask->ahandle = pTq->pVnode;*/
  if (pTask->outputType == TASK_OUTPUT__SMA) {
    pTask->smaSink.vnode = pVnode;
    pTask->smaSink.smaSink = smaHandleRes;
  } else if (pTask->outputType == TASK_OUTPUT__TABLE) {
    pTask->tbSink.vnode = pVnode;
    pTask->tbSink.tbSinkFunc = tqTableSink;

    ASSERT(pTask->tbSink.pSchemaWrapper);
    ASSERT(pTask->tbSink.pSchemaWrapper->pSchema);

    pTask->tbSink.pTSchema =
        tdGetSTSChemaFromSSChema(pTask->tbSink.pSchemaWrapper->pSchema, pTask->tbSink.pSchemaWrapper->nCols, 1);
    ASSERT(pTask->tbSink.pTSchema);
  }

  streamSetupTrigger(pTask);

  tqInfo("expand stream task on vg %d, task id %d, child id %d", TD_VID(pTq->pVnode), pTask->taskId,
         pTask->selfChildId);
  return 0;
}
L
Liu Jicong 已提交
813

814
// Handles a task-deploy request by handing the still-serialized task to the
// stream meta of this TQ. The result of streamMetaAddSerializedTask is
// returned unchanged.
int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) {
  int32_t code = streamMetaAddSerializedTask(pTq->pStreamMeta, version, msg, msgLen);
  return code;
}
L
Liu Jicong 已提交
818

L
Liu Jicong 已提交
819
// Wraps a submit request into a stream data item and feeds it to every
// SOURCE level task registered in the stream meta, scheduling each task that
// accepted the item.
//
// If the data item cannot be created, every SOURCE task is notified through
// streamTaskInputFail() and -1 is returned; otherwise the result is 0 even if
// individual tasks failed to take the input (those failures are only logged).
int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* pReq, int64_t ver) {
  SStreamDataSubmit* pData = streamDataSubmitNew(pReq);
  const bool         noData = (pData == NULL);

  if (noData) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    qError("failed to create data submit for stream since out of memory");
  }

  for (void* pCursor = taosHashIterate(pTq->pStreamMeta->pTasks, NULL); pCursor != NULL;
       pCursor = taosHashIterate(pTq->pStreamMeta->pTasks, pCursor)) {
    SStreamTask* pTask = *(SStreamTask**)pCursor;
    if (pTask->taskLevel != TASK_LEVEL__SOURCE) continue;

    qDebug("data submit enqueue stream task: %d, ver: %" PRId64, pTask->taskId, ver);

    if (noData) {
      streamTaskInputFail(pTask);
    } else if (streamTaskInput(pTask, (SStreamQueueItem*)pData) < 0) {
      qError("stream task input failed, task id %d", pTask->taskId);
    } else if (streamSchedExec(pTask) < 0) {
      qError("stream task launch failed, task id %d", pTask->taskId);
    }
  }

  // drop the reference held by this function, then release the queue item
  if (!noData) {
    streamDataSubmitRefDec(pData);
    taosFreeQitem(pData);
  }

  return noData ? -1 : 0;
}

L
Liu Jicong 已提交
862
// Runs the stream task named in a run request.
// Returns 0 when the task exists and was handed to streamProcessRunReq(),
// -1 when no task with that id is registered in the stream meta.
int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
  SStreamTaskRunReq* pRunReq = pMsg->pCont;
  const int32_t      id = pRunReq->taskId;

  SStreamTask* pTask = streamMetaGetTask(pTq->pStreamMeta, id);
  if (pTask == NULL) {
    return -1;
  }

  streamProcessRunReq(pTask);
  return 0;
}

L
Liu Jicong 已提交
875
// Decodes a stream dispatch request from an RPC message and forwards it to
// the destination task.
//
// pMsg->pCont is expected to start with an SMsgHead followed by the encoded
// SStreamDispatchReq. `exec` is passed through to streamProcessDispatchReq().
//
// Returns 0 when the request was handed to the task, -1 when the payload
// cannot be decoded or the task id is unknown.
//
// NOTE(review): the leading ASSERT(0) trips on every call when assertions are
// active; presumably this entry point has been superseded by
// vnodeEnqueueStreamMsg() — confirm before relying on it.
int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec) {
  ASSERT(0);
  char*              msgStr = pMsg->pCont;
  char*              msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
  int32_t            msgLen = pMsg->contLen - sizeof(SMsgHead);
  SStreamDispatchReq req;
  SDecoder           decoder;

  tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
  if (tDecodeStreamDispatchReq(&decoder, &req) < 0) {
    // never touch a request that failed to decode
    tDecoderClear(&decoder);
    return -1;
  }
  // same sequence as vnodeEnqueueStreamMsg(): release the decoder once the
  // request has been decoded
  tDecoderClear(&decoder);

  int32_t taskId = req.taskId;

  SStreamTask* pTask = streamMetaGetTask(pTq->pStreamMeta, taskId);
  if (pTask == NULL) {
    return -1;
  }

  SRpcMsg rsp = {
      .info = pMsg->info,
      .code = 0,
  };
  streamProcessDispatchReq(pTask, &req, &rsp, exec);
  return 0;
}

L
Liu Jicong 已提交
899
#if 0
// Disabled: forwards a recover request to the addressed stream task.
// Returns 0 when the task exists, -1 otherwise.
int32_t tqProcessTaskRecoverReq(STQ* pTq, SRpcMsg* pMsg) {
  SStreamTaskRecoverReq* pRecoverReq = pMsg->pCont;
  const int32_t          id = pRecoverReq->taskId;

  SStreamTask* pTask = streamMetaGetTask(pTq->pStreamMeta, id);
  if (pTask == NULL) {
    return -1;
  }

  streamProcessRecoverReq(pTask, pRecoverReq, pMsg);
  return 0;
}

// Disabled: forwards a recover response to the task it answers.
// Returns 0 when the task exists, -1 otherwise.
int32_t tqProcessTaskRecoverRsp(STQ* pTq, SRpcMsg* pMsg) {
  SStreamTaskRecoverRsp* pRecoverRsp = pMsg->pCont;
  const int32_t          id = pRecoverRsp->rspTaskId;

  SStreamTask* pTask = streamMetaGetTask(pTq->pStreamMeta, id);
  if (pTask == NULL) {
    return -1;
  }

  streamProcessRecoverRsp(pTask, pRecoverRsp);
  return 0;
}
#endif
L
Liu Jicong 已提交
925

L
Liu Jicong 已提交
926 927 928 929
// Delivers a dispatch response to the stream task it is addressed to.
// The response body sits directly behind the SMsgHead in pMsg->pCont.
// Returns 0 when the task exists, -1 when the task id is unknown.
int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) {
  SStreamDispatchRsp* pDispatchRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  const int32_t       id = pDispatchRsp->taskId;

  SStreamTask* pTask = streamMetaGetTask(pTq->pStreamMeta, id);
  if (pTask == NULL) {
    return -1;
  }

  streamProcessDispatchRsp(pTask, pDispatchRsp);
  return 0;
}
L
Liu Jicong 已提交
937

938
// Removes the stream task named in a drop request from the stream meta.
// `msg` is read as an SVDropStreamTaskReq; `version` and `msgLen` are not
// used. The result of streamMetaRemoveTask() is returned unchanged.
int32_t tqProcessTaskDropReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) {
  SVDropStreamTaskReq* pDropReq = (SVDropStreamTaskReq*)msg;
  int32_t              code = streamMetaRemoveTask(pTq->pStreamMeta, pDropReq->taskId);
  return code;
}
L
Liu Jicong 已提交
943 944 945 946 947 948 949 950 951

// Decodes a stream retrieve request from an RPC message and forwards it to
// the destination task (req.dstTaskId).
//
// pMsg->pCont is expected to start with an SMsgHead followed by the encoded
// SStreamRetrieveReq.
//
// Returns 0 when the request was handed to the task, -1 when the payload
// cannot be decoded or the destination task is unknown.
int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg) {
  char*              msgStr = pMsg->pCont;
  char*              msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
  int32_t            msgLen = pMsg->contLen - sizeof(SMsgHead);
  SStreamRetrieveReq req;
  SDecoder           decoder;
  int32_t            code = -1;

  tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
  if (tDecodeStreamRetrieveReq(&decoder, &req) < 0) {
    // never touch a request that failed to decode
    tDecoderClear(&decoder);
    return -1;
  }

  SStreamTask* pTask = streamMetaGetTask(pTq->pStreamMeta, req.dstTaskId);
  if (pTask) {
    SRpcMsg rsp = {
        .info = pMsg->info,
        .code = 0,
    };
    streamProcessRetrieveReq(pTask, &req, &rsp);
    code = 0;
  }

  // The decoder is released only after the request has been handled: this
  // file does not show whether `req` borrows memory owned by the decoder.
  tDecoderClear(&decoder);
  return code;
}

// Placeholder handler for retrieve responses: nothing is processed yet and
// both arguments are ignored. Always returns 0.
int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg) {
  (void)pTq;
  (void)pMsg;
  return 0;
}
L
Liu Jicong 已提交
970 971 972 973 974 975 976 977 978 979 980 981 982

// Takes a stream dispatch message for this vnode, decodes it and hands the
// request to the destination task.
//
// The function releases the message on every exit: pMsg->pCont through
// rpcFreeCont() and pMsg itself through taosFreeQitem(). When the request
// cannot be delivered and the message carries an RPC handle, a response with
// the failure code is sent back first.
//
// NOTE(review): an unknown task id reaches the FAIL path with code still 0,
// so the sender receives a success reply — confirm this is intended.
void vnodeEnqueueStreamMsg(SVnode* pVnode, SRpcMsg* pMsg) {
  STQ*    pTq = pVnode->pTq;
  char*   msgStr = pMsg->pCont;
  char*   msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
  int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
  int32_t code = 0;

  SStreamDispatchReq req;
  SDecoder           decoder;
  tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
  if (tDecodeStreamDispatchReq(&decoder, &req) < 0) {
    code = TSDB_CODE_MSG_DECODE_ERROR;
    tDecoderClear(&decoder);
    goto FAIL;
  }
  tDecoderClear(&decoder);

  int32_t taskId = req.taskId;

  SStreamTask* pTask = streamMetaGetTask(pTq->pStreamMeta, taskId);
  if (pTask) {
    SRpcMsg rsp = {
        .info = pMsg->info,
        .code = 0,
    };
    streamProcessDispatchReq(pTask, &req, &rsp, false);
    rpcFreeCont(pMsg->pCont);
    taosFreeQitem(pMsg);
    return;
  }

FAIL:
  // Only messages that arrived with an RPC handle can be answered.
  if (pMsg->info.handle != NULL) {
    SRpcMsg rsp = {
        .code = code,
        .info = pMsg->info,
    };
    tmsgSendRsp(&rsp);
  }

  // Release the message even when no response is sent; returning early here
  // would leak both the payload and the queue item.
  rpcFreeCont(pMsg->pCont);
  taosFreeQitem(pMsg);
}