tq.c 30.0 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
S
Shengliang Guan 已提交
14 15
 */

H
Hongze Cheng 已提交
16
#include "tq.h"
17
#include "tdbInt.h"
S
Shengliang Guan 已提交
18

L
Liu Jicong 已提交
19
int32_t tqInit() {
L
Liu Jicong 已提交
20 21 22 23 24 25
  int8_t old;
  while (1) {
    old = atomic_val_compare_exchange_8(&tqMgmt.inited, 0, 2);
    if (old != 2) break;
  }

26 27 28 29 30 31
  if (old == 0) {
    tqMgmt.timer = taosTmrInit(10000, 100, 10000, "TQ");
    if (tqMgmt.timer == NULL) {
      atomic_store_8(&tqMgmt.inited, 0);
      return -1;
    }
L
Liu Jicong 已提交
32
    atomic_store_8(&tqMgmt.inited, 1);
33
  }
L
Liu Jicong 已提交
34 35
  return 0;
}
L
Liu Jicong 已提交
36

37
void tqCleanUp() {
L
Liu Jicong 已提交
38 39 40 41 42 43 44 45 46 47
  int8_t old;
  while (1) {
    old = atomic_val_compare_exchange_8(&tqMgmt.inited, 1, 2);
    if (old != 2) break;
  }

  if (old == 1) {
    taosTmrCleanUp(tqMgmt.timer);
    atomic_store_8(&tqMgmt.inited, 0);
  }
48
}
L
Liu Jicong 已提交
49

50 51 52 53
int tqExecKeyCompare(const void* pKey1, int32_t kLen1, const void* pKey2, int32_t kLen2) {
  return strcmp(pKey1, pKey2);
}

L
Liu Jicong 已提交
54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94
int32_t tqStoreExec(STQ* pTq, const char* key, const STqExec* pExec) {
  int32_t code;
  int32_t vlen;
  tEncodeSize(tEncodeSTqExec, pExec, vlen, code);
  ASSERT(code == 0);

  void* buf = taosMemoryCalloc(1, vlen);
  if (buf == NULL) {
    ASSERT(0);
  }

  SEncoder encoder;
  tEncoderInit(&encoder, buf, vlen);

  if (tEncodeSTqExec(&encoder, pExec) < 0) {
    ASSERT(0);
  }

  TXN txn;

  if (tdbTxnOpen(&txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) {
    ASSERT(0);
  }

  if (tdbBegin(pTq->pMetaStore, &txn) < 0) {
    ASSERT(0);
  }

  if (tdbTbUpsert(pTq->pExecStore, key, (int)strlen(key), buf, vlen, &txn) < 0) {
    ASSERT(0);
  }

  if (tdbCommit(pTq->pMetaStore, &txn) < 0) {
    ASSERT(0);
  }

  tEncoderClear(&encoder);
  taosMemoryFree(buf);
  return 0;
}

L
Liu Jicong 已提交
95
STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal) {
wafwerar's avatar
wafwerar 已提交
96
  STQ* pTq = taosMemoryMalloc(sizeof(STQ));
L
Liu Jicong 已提交
97
  if (pTq == NULL) {
L
Liu Jicong 已提交
98
    terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
L
Liu Jicong 已提交
99 100
    return NULL;
  }
H
Hongze Cheng 已提交
101
  pTq->path = strdup(path);
L
Liu Jicong 已提交
102
  pTq->pVnode = pVnode;
L
Liu Jicong 已提交
103
  pTq->pWal = pWal;
104

L
Liu Jicong 已提交
105
  pTq->execs = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);
L
Liu Jicong 已提交
106

L
Liu Jicong 已提交
107 108
  pTq->pStreamTasks = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);

L
Liu Jicong 已提交
109 110
  pTq->pushMgr = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);

111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139
  if (tdbOpen(path, 16 * 1024, 1, &pTq->pMetaStore) < 0) {
    ASSERT(0);
  }

  if (tdbTbOpen("exec", -1, -1, tqExecKeyCompare, pTq->pMetaStore, &pTq->pExecStore) < 0) {
    ASSERT(0);
  }

  TXN txn;

  if (tdbTxnOpen(&txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, 0) < 0) {
    ASSERT(0);
  }

  /*if (tdbBegin(pTq->pMetaStore, &txn) < 0) {*/
  /*ASSERT(0);*/
  /*}*/

  TBC* pCur;
  if (tdbTbcOpen(pTq->pExecStore, &pCur, &txn) < 0) {
    ASSERT(0);
  }

  void* pKey;
  int   kLen;
  void* pVal;
  int   vLen;

  tdbTbcMoveToFirst(pCur);
L
Liu Jicong 已提交
140
  SDecoder decoder;
141
  while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) {
L
Liu Jicong 已提交
142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164
    STqExec exec;
    tDecoderInit(&decoder, (uint8_t*)pVal, vLen);
    tDecodeSTqExec(&decoder, &exec);
    exec.pWalReader = walOpenReadHandle(pTq->pVnode->pWal);
    if (exec.subType == TOPIC_SUB_TYPE__TABLE) {
      for (int32_t i = 0; i < 5; i++) {
        exec.pExecReader[i] = tqInitSubmitMsgScanner(pTq->pVnode->pMeta);

        SReadHandle handle = {
            .reader = exec.pExecReader[i],
            .meta = pTq->pVnode->pMeta,
            .pMsgCb = &pTq->pVnode->msgCb,
        };
        exec.task[i] = qCreateStreamExecTaskInfo(exec.qmsg, &handle);
        ASSERT(exec.task[i]);
      }
    } else {
      for (int32_t i = 0; i < 5; i++) {
        exec.pExecReader[i] = tqInitSubmitMsgScanner(pTq->pVnode->pMeta);
      }
      exec.pDropTbUid = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
    }
    taosHashPut(pTq->execs, pKey, kLen, &exec, sizeof(STqExec));
165 166 167 168 169 170
  }

  if (tdbTxnClose(&txn) < 0) {
    ASSERT(0);
  }

L
Liu Jicong 已提交
171 172
  return pTq;
}
L
Liu Jicong 已提交
173

L
Liu Jicong 已提交
174
void tqClose(STQ* pTq) {
H
Hongze Cheng 已提交
175
  if (pTq) {
wafwerar's avatar
wafwerar 已提交
176
    taosMemoryFreeClear(pTq->path);
L
Liu Jicong 已提交
177 178 179
    taosHashCleanup(pTq->execs);
    taosHashCleanup(pTq->pStreamTasks);
    taosHashCleanup(pTq->pushMgr);
180
    tdbClose(pTq->pMetaStore);
wafwerar's avatar
wafwerar 已提交
181
    taosMemoryFree(pTq);
H
Hongze Cheng 已提交
182
  }
L
Liu Jicong 已提交
183 184
  // TODO
}
L
Liu Jicong 已提交
185

186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216
int32_t tEncodeSTqExec(SEncoder* pEncoder, const STqExec* pExec) {
  if (tStartEncode(pEncoder) < 0) return -1;
  if (tEncodeCStr(pEncoder, pExec->subKey) < 0) return -1;
  if (tEncodeI64(pEncoder, pExec->consumerId) < 0) return -1;
  if (tEncodeI32(pEncoder, pExec->epoch) < 0) return -1;
  if (tEncodeI8(pEncoder, pExec->subType) < 0) return -1;
  if (tEncodeI8(pEncoder, pExec->withTbName) < 0) return -1;
  if (tEncodeI8(pEncoder, pExec->withSchema) < 0) return -1;
  if (tEncodeI8(pEncoder, pExec->withTag) < 0) return -1;
  if (pExec->subType == TOPIC_SUB_TYPE__TABLE) {
    if (tEncodeCStr(pEncoder, pExec->qmsg) < 0) return -1;
  }
  tEndEncode(pEncoder);
  return pEncoder->pos;
}

int32_t tDecodeSTqExec(SDecoder* pDecoder, STqExec* pExec) {
  if (tStartDecode(pDecoder) < 0) return -1;
  if (tDecodeCStrTo(pDecoder, pExec->subKey) < 0) return -1;
  if (tDecodeI64(pDecoder, &pExec->consumerId) < 0) return -1;
  if (tDecodeI32(pDecoder, &pExec->epoch) < 0) return -1;
  if (tDecodeI8(pDecoder, &pExec->subType) < 0) return -1;
  if (tDecodeI8(pDecoder, &pExec->withTbName) < 0) return -1;
  if (tDecodeI8(pDecoder, &pExec->withSchema) < 0) return -1;
  if (tDecodeI8(pDecoder, &pExec->withTag) < 0) return -1;
  if (pExec->subType == TOPIC_SUB_TYPE__TABLE) {
    if (tDecodeCStrAlloc(pDecoder, &pExec->qmsg) < 0) return -1;
  }
  tEndDecode(pDecoder);
  return 0;
}
217
int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
L
Liu Jicong 已提交
218
  void* pIter = NULL;
219 220 221
  while (1) {
    pIter = taosHashIterate(pTq->execs, pIter);
    if (pIter == NULL) break;
L
Liu Jicong 已提交
222
    STqExec* pExec = (STqExec*)pIter;
223
    if (pExec->subType == TOPIC_SUB_TYPE__DB) {
L
Liu Jicong 已提交
224
      if (!isAdd) {
225 226 227 228 229 230
        int32_t sz = taosArrayGetSize(tbUidList);
        for (int32_t i = 0; i < sz; i++) {
          int64_t tbUid = *(int64_t*)taosArrayGet(tbUidList, i);
          taosHashPut(pExec->pDropTbUid, &tbUid, sizeof(int64_t), NULL, 0);
        }
      }
L
Liu Jicong 已提交
231 232 233 234 235
    } else {
      for (int32_t i = 0; i < 5; i++) {
        int32_t code = qUpdateQualifiedTableId(pExec->task[i], tbUidList, isAdd);
        ASSERT(code == 0);
      }
236 237 238 239 240
    }
  }
  return 0;
}

dengyihao's avatar
dengyihao 已提交
241
int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver, SRpcHandleInfo handleInfo) {
L
Liu Jicong 已提交
242 243 244 245 246 247 248 249 250 251
  if (msgType != TDMT_VND_SUBMIT) return 0;
  void*       pIter = NULL;
  STqExec*    pExec = NULL;
  SSubmitReq* pReq = (SSubmitReq*)msg;
  int32_t     workerId = 4;
  int64_t     fetchOffset = ver;

  while (1) {
    pIter = taosHashIterate(pTq->pushMgr, pIter);
    if (pIter == NULL) break;
L
Liu Jicong 已提交
252
    pExec = *(STqExec**)pIter;
L
Liu Jicong 已提交
253 254 255 256 257 258 259 260

    taosWLockLatch(&pExec->pushHandle.lock);

    SRpcMsg* pMsg = atomic_load_ptr(&pExec->pushHandle.handle);
    ASSERT(pMsg);

    SMqDataBlkRsp rsp = {0};
    rsp.reqOffset = pExec->pushHandle.reqOffset;
261
    rsp.blockData = taosArrayInit(0, sizeof(void*));
L
Liu Jicong 已提交
262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301
    rsp.blockDataLen = taosArrayInit(0, sizeof(int32_t));

    if (pExec->subType == TOPIC_SUB_TYPE__TABLE) {
      qTaskInfo_t task = pExec->task[workerId];
      ASSERT(task);
      qSetStreamInput(task, pReq, STREAM_DATA_TYPE_SUBMIT_BLOCK);
      while (1) {
        SSDataBlock* pDataBlock = NULL;
        uint64_t     ts = 0;
        if (qExecTask(task, &pDataBlock, &ts) < 0) {
          ASSERT(0);
        }
        if (pDataBlock == NULL) break;

        ASSERT(pDataBlock->info.rows != 0);
        ASSERT(pDataBlock->info.numOfCols != 0);

        int32_t            dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pDataBlock);
        void*              buf = taosMemoryCalloc(1, dataStrLen);
        SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)buf;
        pRetrieve->useconds = ts;
        pRetrieve->precision = TSDB_DEFAULT_PRECISION;
        pRetrieve->compressed = 0;
        pRetrieve->completed = 1;
        pRetrieve->numOfRows = htonl(pDataBlock->info.rows);

        // TODO enable compress
        int32_t actualLen = 0;
        blockCompressEncode(pDataBlock, pRetrieve->data, &actualLen, pDataBlock->info.numOfCols, false);
        actualLen += sizeof(SRetrieveTableRsp);
        ASSERT(actualLen <= dataStrLen);
        taosArrayPush(rsp.blockDataLen, &actualLen);
        taosArrayPush(rsp.blockData, &buf);
        rsp.blockNum++;
      }
    } else if (pExec->subType == TOPIC_SUB_TYPE__DB) {
      STqReadHandle* pReader = pExec->pExecReader[workerId];
      tqReadHandleSetMsg(pReader, pReq, 0);
      while (tqNextDataBlock(pReader)) {
        SSDataBlock block = {0};
302
        if (tqRetrieveDataBlock(&block.pDataBlock, pReader, &block.info.groupId, &block.info.uid, &block.info.rows,
L
Liu Jicong 已提交
303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350
                                &block.info.numOfCols) < 0) {
          ASSERT(0);
        }
        int32_t            dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(&block);
        void*              buf = taosMemoryCalloc(1, dataStrLen);
        SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)buf;
        /*pRetrieve->useconds = 0;*/
        pRetrieve->precision = TSDB_DEFAULT_PRECISION;
        pRetrieve->compressed = 0;
        pRetrieve->completed = 1;
        pRetrieve->numOfRows = htonl(block.info.rows);

        // TODO enable compress
        int32_t actualLen = 0;
        blockCompressEncode(&block, pRetrieve->data, &actualLen, block.info.numOfCols, false);
        actualLen += sizeof(SRetrieveTableRsp);
        ASSERT(actualLen <= dataStrLen);
        taosArrayPush(rsp.blockDataLen, &actualLen);
        taosArrayPush(rsp.blockData, &buf);
        rsp.blockNum++;
      }
    } else {
      ASSERT(0);
    }

    if (rsp.blockNum == 0) {
      taosWUnLockLatch(&pExec->pushHandle.lock);
      continue;
    }

    ASSERT(taosArrayGetSize(rsp.blockData) == rsp.blockNum);
    ASSERT(taosArrayGetSize(rsp.blockDataLen) == rsp.blockNum);

    rsp.rspOffset = fetchOffset;

    int32_t tlen = sizeof(SMqRspHead) + tEncodeSMqDataBlkRsp(NULL, &rsp);
    void*   buf = rpcMallocCont(tlen);
    if (buf == NULL) {
      pMsg->code = -1;
      return -1;
    }

    ((SMqRspHead*)buf)->mqMsgType = TMQ_MSG_TYPE__POLL_RSP;
    ((SMqRspHead*)buf)->epoch = pExec->pushHandle.epoch;
    ((SMqRspHead*)buf)->consumerId = pExec->pushHandle.consumerId;

    void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
    tEncodeSMqDataBlkRsp(&abuf, &rsp);
dengyihao's avatar
dengyihao 已提交
351 352 353

    SRpcMsg resp = {.info = handleInfo, .pCont = buf, .contLen = tlen, .code = 0};
    tmsgSendRsp(&resp);
L
Liu Jicong 已提交
354 355 356 357

    atomic_store_ptr(&pExec->pushHandle.handle, NULL);
    taosWUnLockLatch(&pExec->pushHandle.lock);

H
Hongze Cheng 已提交
358 359 360
    tqDebug("vg %d offset %ld from consumer %ld (epoch %d) send rsp, block num: %d, reqOffset: %ld, rspOffset: %ld",
            TD_VID(pTq->pVnode), fetchOffset, pExec->pushHandle.consumerId, pExec->pushHandle.epoch, rsp.blockNum,
            rsp.reqOffset, rsp.rspOffset);
L
Liu Jicong 已提交
361 362 363 364 365 366 367 368 369

    // TODO destroy
    taosArrayDestroy(rsp.blockData);
    taosArrayDestroy(rsp.blockDataLen);
  }

  return 0;
}

L
Liu Jicong 已提交
370
int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) {
L
Liu Jicong 已提交
371 372
  if (msgType == TDMT_VND_SUBMIT) {
    if (taosHashGetSize(pTq->pStreamTasks) == 0) return 0;
L
Liu Jicong 已提交
373

L
Liu Jicong 已提交
374
    if (tdUpdateExpireWindow(pTq->pVnode->pSma, msg, ver) != 0) {
L
Liu Jicong 已提交
375
      // TODO handle sma error
L
Liu Jicong 已提交
376 377 378 379 380 381
    }
    void* data = taosMemoryMalloc(msgLen);
    if (data == NULL) {
      return -1;
    }
    memcpy(data, msg, msgLen);
L
Liu Jicong 已提交
382

L
Liu Jicong 已提交
383
    tqProcessStreamTrigger(pTq, data);
C
Cary Xu 已提交
384
  }
L
Liu Jicong 已提交
385

L
Liu Jicong 已提交
386 387 388
  return 0;
}

L
Liu Jicong 已提交
389 390
int tqCommit(STQ* pTq) {
  // do nothing
L
Liu Jicong 已提交
391
  return 0;
L
Liu Jicong 已提交
392
}
L
Liu Jicong 已提交
393

L
fix  
Liu Jicong 已提交
394
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
L
Liu Jicong 已提交
395 396 397 398 399
  SMqPollReq* pReq = pMsg->pCont;
  int64_t     consumerId = pReq->consumerId;
  int64_t     waitTime = pReq->waitTime;
  int32_t     reqEpoch = pReq->epoch;
  int64_t     fetchOffset;
400

L
Liu Jicong 已提交
401
  // get offset to fetch message
L
Liu Jicong 已提交
402
  if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__EARLIEAST) {
L
Liu Jicong 已提交
403
    fetchOffset = walGetFirstVer(pTq->pWal);
L
Liu Jicong 已提交
404
  } else if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__LATEST) {
L
Liu Jicong 已提交
405
    fetchOffset = walGetCommittedVer(pTq->pWal);
L
Liu Jicong 已提交
406 407 408 409
  } else {
    fetchOffset = pReq->currentOffset + 1;
  }

H
Hongze Cheng 已提交
410 411
  tqDebug("tmq poll: consumer %ld (epoch %d) recv poll req in vg %d, req %ld %ld", consumerId, pReq->epoch,
          TD_VID(pTq->pVnode), pReq->currentOffset, fetchOffset);
L
fix  
Liu Jicong 已提交
412

L
Liu Jicong 已提交
413
  STqExec* pExec = taosHashGet(pTq->execs, pReq->subKey, strlen(pReq->subKey));
L
Liu Jicong 已提交
414
  ASSERT(pExec);
L
Liu Jicong 已提交
415

L
Liu Jicong 已提交
416
  int32_t consumerEpoch = atomic_load_32(&pExec->epoch);
L
Liu Jicong 已提交
417
  while (consumerEpoch < reqEpoch) {
L
Liu Jicong 已提交
418
    consumerEpoch = atomic_val_compare_exchange_32(&pExec->epoch, consumerEpoch, reqEpoch);
L
Liu Jicong 已提交
419 420
  }

421 422 423 424 425 426 427
  SWalHead* pHeadWithCkSum = taosMemoryMalloc(sizeof(SWalHead) + 2048);
  if (pHeadWithCkSum == NULL) {
    return -1;
  }

  walSetReaderCapacity(pExec->pWalReader, 2048);

L
Liu Jicong 已提交
428
  SMqDataBlkRsp rsp = {0};
L
Liu Jicong 已提交
429
  rsp.reqOffset = pReq->currentOffset;
L
Liu Jicong 已提交
430
  rsp.withSchema = pExec->withSchema;
L
Liu Jicong 已提交
431

L
Liu Jicong 已提交
432
  rsp.blockData = taosArrayInit(0, sizeof(void*));
L
Liu Jicong 已提交
433
  rsp.blockDataLen = taosArrayInit(0, sizeof(int32_t));
L
Liu Jicong 已提交
434
  rsp.blockSchema = taosArrayInit(0, sizeof(void*));
L
Liu Jicong 已提交
435
  rsp.blockTbName = taosArrayInit(0, sizeof(void*));
436

437 438 439 440 441 442
  int8_t withTbName = pExec->withTbName;
  if (pReq->withTbName != -1) {
    withTbName = pReq->withTbName;
  }
  rsp.withTbName = withTbName;

443
  while (1) {
L
Liu Jicong 已提交
444
    consumerEpoch = atomic_load_32(&pExec->epoch);
L
Liu Jicong 已提交
445
    if (consumerEpoch > reqEpoch) {
H
Hongze Cheng 已提交
446 447
      tqDebug("tmq poll: consumer %ld (epoch %d) vg %d offset %ld, found new consumer epoch %d discard req epoch %d",
              consumerId, pReq->epoch, TD_VID(pTq->pVnode), fetchOffset, consumerEpoch, reqEpoch);
L
Liu Jicong 已提交
448 449
      break;
    }
L
Liu Jicong 已提交
450

451 452 453
    taosThreadMutexLock(&pExec->pWalReader->mutex);

    if (walFetchHead(pExec->pWalReader, fetchOffset, pHeadWithCkSum) < 0) {
H
Hongze Cheng 已提交
454 455
      tqDebug("tmq poll: consumer %ld (epoch %d) vg %d offset %ld, no more log to return", consumerId, pReq->epoch,
              TD_VID(pTq->pVnode), fetchOffset);
456 457 458 459 460
      taosThreadMutexUnlock(&pExec->pWalReader->mutex);
      break;
    }

    if (pHeadWithCkSum->head.msgType != TDMT_VND_SUBMIT) {
L
Liu Jicong 已提交
461
      ASSERT(walSkipFetchBody(pExec->pWalReader, pHeadWithCkSum) == 0);
462
    } else {
L
Liu Jicong 已提交
463
      ASSERT(walFetchBody(pExec->pWalReader, &pHeadWithCkSum) == 0);
464 465 466 467 468 469 470
    }

    SWalReadHead* pHead = &pHeadWithCkSum->head;

    taosThreadMutexUnlock(&pExec->pWalReader->mutex);

#if 0
L
fix  
Liu Jicong 已提交
471
    SWalReadHead* pHead;
L
Liu Jicong 已提交
472
    if (walReadWithHandle_s(pExec->pWalReader, fetchOffset, &pHead) < 0) {
L
Liu Jicong 已提交
473 474
      // TODO: no more log, set timer to wait blocking time
      // if data inserted during waiting, launch query and
L
Liu Jicong 已提交
475
      // response to user
H
Hongze Cheng 已提交
476
      tqDebug("tmq poll: consumer %ld (epoch %d) vg %d offset %ld, no more log to return", consumerId, pReq->epoch,
H
refact  
Hongze Cheng 已提交
477
             TD_VID(pTq->pVnode), fetchOffset);
L
Liu Jicong 已提交
478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499

#if 0
      // add to pushMgr
      taosWLockLatch(&pExec->pushHandle.lock);

      pExec->pushHandle.consumerId = consumerId;
      pExec->pushHandle.epoch = reqEpoch;
      pExec->pushHandle.reqOffset = rsp.reqOffset;
      pExec->pushHandle.skipLogNum = rsp.skipLogNum;
      pExec->pushHandle.handle = pMsg;

      taosWUnLockLatch(&pExec->pushHandle.lock);

      // TODO add timer

      // TODO: the pointer will always be valid?
      taosHashPut(pTq->pushMgr, &consumerId, sizeof(int64_t), &pExec, sizeof(void*));
      taosArrayDestroy(rsp.blockData);
      taosArrayDestroy(rsp.blockDataLen);
      return 0;
#endif

500 501 502
    break;
  }
#endif
L
Liu Jicong 已提交
503

H
Hongze Cheng 已提交
504 505
    tqDebug("tmq poll: consumer %ld (epoch %d) iter log, vg %d offset %ld msgType %d", consumerId, pReq->epoch,
            TD_VID(pTq->pVnode), fetchOffset, pHead->msgType);
L
Liu Jicong 已提交
506

L
fix  
Liu Jicong 已提交
507 508
    if (pHead->msgType == TDMT_VND_SUBMIT) {
      SSubmitReq* pCont = (SSubmitReq*)&pHead->body;
509
      // table subscribe
L
Liu Jicong 已提交
510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540
      if (pExec->subType == TOPIC_SUB_TYPE__TABLE) {
        qTaskInfo_t task = pExec->task[workerId];
        ASSERT(task);
        qSetStreamInput(task, pCont, STREAM_DATA_TYPE_SUBMIT_BLOCK);
        while (1) {
          SSDataBlock* pDataBlock = NULL;
          uint64_t     ts = 0;
          if (qExecTask(task, &pDataBlock, &ts) < 0) {
            ASSERT(0);
          }
          if (pDataBlock == NULL) break;

          ASSERT(pDataBlock->info.rows != 0);
          ASSERT(pDataBlock->info.numOfCols != 0);

          int32_t            dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pDataBlock);
          void*              buf = taosMemoryCalloc(1, dataStrLen);
          SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)buf;
          pRetrieve->useconds = ts;
          pRetrieve->precision = TSDB_DEFAULT_PRECISION;
          pRetrieve->compressed = 0;
          pRetrieve->completed = 1;
          pRetrieve->numOfRows = htonl(pDataBlock->info.rows);

          // TODO enable compress
          int32_t actualLen = 0;
          blockCompressEncode(pDataBlock, pRetrieve->data, &actualLen, pDataBlock->info.numOfCols, false);
          actualLen += sizeof(SRetrieveTableRsp);
          ASSERT(actualLen <= dataStrLen);
          taosArrayPush(rsp.blockDataLen, &actualLen);
          taosArrayPush(rsp.blockData, &buf);
L
Liu Jicong 已提交
541 542 543 544 545 546

          if (pExec->withSchema) {
            SSchemaWrapper* pSW = tCloneSSchemaWrapper(pExec->pExecReader[workerId]->pSchemaWrapper);
            taosArrayPush(rsp.blockSchema, &pSW);
          }

547
          if (withTbName) {
L
Liu Jicong 已提交
548 549 550 551 552 553 554 555 556 557 558
            SMetaReader mr = {0};
            metaReaderInit(&mr, pTq->pVnode->pMeta, 0);
            int64_t uid = pExec->pExecReader[workerId]->msgIter.uid;
            if (metaGetTableEntryByUid(&mr, uid) < 0) {
              ASSERT(0);
            }
            char* tbName = strdup(mr.me.name);
            taosArrayPush(rsp.blockTbName, &tbName);
            metaReaderClear(&mr);
          }

L
Liu Jicong 已提交
559
          rsp.blockNum++;
560
        }
561
        // db subscribe
L
Liu Jicong 已提交
562
      } else if (pExec->subType == TOPIC_SUB_TYPE__DB) {
L
Liu Jicong 已提交
563
        rsp.withSchema = 1;
L
Liu Jicong 已提交
564
        STqReadHandle* pReader = pExec->pExecReader[workerId];
L
Liu Jicong 已提交
565
        tqReadHandleSetMsg(pReader, pCont, 0);
566
        while (tqNextDataBlockFilterOut(pReader, pExec->pDropTbUid)) {
L
Liu Jicong 已提交
567
          SSDataBlock block = {0};
568
          if (tqRetrieveDataBlock(&block.pDataBlock, pReader, &block.info.groupId, &block.info.uid, &block.info.rows,
L
Liu Jicong 已提交
569
                                  &block.info.numOfCols) < 0) {
570
            if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue;
L
Liu Jicong 已提交
571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588
            ASSERT(0);
          }
          int32_t            dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(&block);
          void*              buf = taosMemoryCalloc(1, dataStrLen);
          SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)buf;
          /*pRetrieve->useconds = 0;*/
          pRetrieve->precision = TSDB_DEFAULT_PRECISION;
          pRetrieve->compressed = 0;
          pRetrieve->completed = 1;
          pRetrieve->numOfRows = htonl(block.info.rows);

          // TODO enable compress
          int32_t actualLen = 0;
          blockCompressEncode(&block, pRetrieve->data, &actualLen, block.info.numOfCols, false);
          actualLen += sizeof(SRetrieveTableRsp);
          ASSERT(actualLen <= dataStrLen);
          taosArrayPush(rsp.blockDataLen, &actualLen);
          taosArrayPush(rsp.blockData, &buf);
589
          if (withTbName) {
L
Liu Jicong 已提交
590 591 592 593 594 595 596 597 598
            SMetaReader mr = {0};
            metaReaderInit(&mr, pTq->pVnode->pMeta, 0);
            if (metaGetTableEntryByUid(&mr, block.info.uid) < 0) {
              ASSERT(0);
            }
            char* tbName = strdup(mr.me.name);
            taosArrayPush(rsp.blockTbName, &tbName);
            metaReaderClear(&mr);
          }
L
Liu Jicong 已提交
599 600 601 602

          SSchemaWrapper* pSW = tCloneSSchemaWrapper(pExec->pExecReader[workerId]->pSchemaWrapper);
          taosArrayPush(rsp.blockSchema, &pSW);

L
Liu Jicong 已提交
603 604 605 606
          rsp.blockNum++;
        }
      } else {
        ASSERT(0);
L
fix  
Liu Jicong 已提交
607
      }
L
Liu Jicong 已提交
608
    }
609

L
Liu Jicong 已提交
610 611
    // TODO batch optimization:
    // TODO continue scan until meeting batch requirement
L
Liu Jicong 已提交
612 613 614 615
    if (rsp.blockNum != 0) break;
    rsp.skipLogNum++;
    fetchOffset++;
  }
616

L
Liu Jicong 已提交
617
  taosMemoryFree(pHeadWithCkSum);
L
Liu Jicong 已提交
618 619
  ASSERT(taosArrayGetSize(rsp.blockData) == rsp.blockNum);
  ASSERT(taosArrayGetSize(rsp.blockDataLen) == rsp.blockNum);
L
fix  
Liu Jicong 已提交
620

L
Liu Jicong 已提交
621 622 623 624
  if (rsp.blockNum != 0)
    rsp.rspOffset = fetchOffset;
  else
    rsp.rspOffset = fetchOffset - 1;
625

L
Liu Jicong 已提交
626
  int32_t tlen = sizeof(SMqRspHead) + tEncodeSMqDataBlkRsp(NULL, &rsp);
627 628 629 630 631
  void*   buf = rpcMallocCont(tlen);
  if (buf == NULL) {
    pMsg->code = -1;
    return -1;
  }
L
Liu Jicong 已提交
632

L
Liu Jicong 已提交
633 634
  ((SMqRspHead*)buf)->mqMsgType = TMQ_MSG_TYPE__POLL_RSP;
  ((SMqRspHead*)buf)->epoch = pReq->epoch;
L
Liu Jicong 已提交
635
  ((SMqRspHead*)buf)->consumerId = consumerId;
636

L
Liu Jicong 已提交
637
  void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
L
Liu Jicong 已提交
638
  tEncodeSMqDataBlkRsp(&abuf, &rsp);
dengyihao's avatar
dengyihao 已提交
639 640 641

  SRpcMsg resp = {.info = pMsg->info, .pCont = buf, .contLen = tlen, .code = 0};
  tmsgSendRsp(&resp);
L
Liu Jicong 已提交
642

H
Hongze Cheng 已提交
643 644
  tqDebug("vg %d offset %ld from consumer %ld (epoch %d) send rsp, block num: %d, reqOffset: %ld, rspOffset: %ld",
          TD_VID(pTq->pVnode), fetchOffset, consumerId, pReq->epoch, rsp.blockNum, rsp.reqOffset, rsp.rspOffset);
L
Liu Jicong 已提交
645 646

  // TODO destroy
L
Liu Jicong 已提交
647 648
  taosArrayDestroy(rsp.blockData);
  taosArrayDestroy(rsp.blockDataLen);
L
Liu Jicong 已提交
649
  taosArrayDestroyP(rsp.blockSchema, (FDelete)tDeleteSSchemaWrapper);
L
Liu Jicong 已提交
650
  taosArrayDestroyP(rsp.blockTbName, (FDelete)taosMemoryFree);
L
Liu Jicong 已提交
651

652 653
  return 0;
}
L
Liu Jicong 已提交
654

L
Liu Jicong 已提交
655 656
int32_t tqProcessVgDeleteReq(STQ* pTq, char* msg, int32_t msgLen) {
  SMqVDeleteReq* pReq = (SMqVDeleteReq*)msg;
L
Liu Jicong 已提交
657 658 659

  int32_t code = taosHashRemove(pTq->execs, pReq->subKey, strlen(pReq->subKey));
  ASSERT(code == 0);
660 661 662 663 664 665 666 667 668 669 670

  TXN txn;

  if (tdbTxnOpen(&txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) {
    ASSERT(0);
  }

  if (tdbBegin(pTq->pMetaStore, &txn) < 0) {
    ASSERT(0);
  }

L
Liu Jicong 已提交
671 672 673
  if (tdbTbDelete(pTq->pExecStore, pReq->subKey, (int)strlen(pReq->subKey), &txn) < 0) {
    /*ASSERT(0);*/
  }
674 675 676 677 678

  if (tdbCommit(pTq->pMetaStore, &txn) < 0) {
    ASSERT(0);
  }

L
Liu Jicong 已提交
679
  return 0;
L
Liu Jicong 已提交
680 681
}

L
Liu Jicong 已提交
682 683
// TODO: persist meta into tdb
int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
L
Liu Jicong 已提交
684
  SMqRebVgReq req = {0};
L
Liu Jicong 已提交
685 686
  tDecodeSMqRebVgReq(msg, &req);
  // todo lock
L
Liu Jicong 已提交
687
  STqExec* pExec = taosHashGet(pTq->execs, req.subKey, strlen(req.subKey));
L
Liu Jicong 已提交
688 689 690 691 692 693 694 695 696 697
  if (pExec == NULL) {
    ASSERT(req.oldConsumerId == -1);
    ASSERT(req.newConsumerId != -1);
    STqExec exec = {0};
    pExec = &exec;
    /*taosInitRWLatch(&pExec->lock);*/

    memcpy(pExec->subKey, req.subKey, TSDB_SUBSCRIBE_KEY_LEN);
    pExec->consumerId = req.newConsumerId;
    pExec->epoch = -1;
L
Liu Jicong 已提交
698 699 700 701 702 703

    pExec->subType = req.subType;
    pExec->withTbName = req.withTbName;
    pExec->withSchema = req.withSchema;
    pExec->withTag = req.withTag;

L
Liu Jicong 已提交
704 705
    pExec->qmsg = req.qmsg;
    req.qmsg = NULL;
L
Liu Jicong 已提交
706 707

    pExec->pWalReader = walOpenReadHandle(pTq->pVnode->pWal);
708 709 710 711
    if (pExec->subType == TOPIC_SUB_TYPE__TABLE) {
      for (int32_t i = 0; i < 5; i++) {
        pExec->pExecReader[i] = tqInitSubmitMsgScanner(pTq->pVnode->pMeta);

L
Liu Jicong 已提交
712 713 714
        SReadHandle handle = {
            .reader = pExec->pExecReader[i],
            .meta = pTq->pVnode->pMeta,
715
            .pMsgCb = &pTq->pVnode->msgCb,
L
Liu Jicong 已提交
716 717 718 719
        };
        pExec->task[i] = qCreateStreamExecTaskInfo(pExec->qmsg, &handle);
        ASSERT(pExec->task[i]);
      }
720 721 722 723 724
    } else {
      for (int32_t i = 0; i < 5; i++) {
        pExec->pExecReader[i] = tqInitSubmitMsgScanner(pTq->pVnode->pMeta);
      }
      pExec->pDropTbUid = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
L
Liu Jicong 已提交
725
    }
L
Liu Jicong 已提交
726
    taosHashPut(pTq->execs, req.subKey, strlen(req.subKey), pExec, sizeof(STqExec));
727

L
Liu Jicong 已提交
728 729
    if (tqStoreExec(pTq, req.subKey, pExec) < 0) {
      // TODO
730
    }
L
Liu Jicong 已提交
731 732
    return 0;
  } else {
L
Liu Jicong 已提交
733
    /*ASSERT(pExec->consumerId == req.oldConsumerId);*/
L
Liu Jicong 已提交
734 735 736 737
    // TODO handle qmsg and exec modification
    atomic_store_32(&pExec->epoch, -1);
    atomic_store_64(&pExec->consumerId, req.newConsumerId);
    atomic_add_fetch_32(&pExec->epoch, 1);
L
Liu Jicong 已提交
738 739 740 741

    if (tqStoreExec(pTq, req.subKey, pExec) < 0) {
      // TODO
    }
L
Liu Jicong 已提交
742
    return 0;
L
Liu Jicong 已提交
743 744
  }
}
745

L
Liu Jicong 已提交
746 747 748 749 750
void tqTableSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data) {
  const SArray* pRes = (const SArray*)data;
  SVnode*       pVnode = (SVnode*)vnode;

  ASSERT(pTask->tbSink.pTSchema);
L
Liu Jicong 已提交
751 752
  SSubmitReq* pReq = tdBlockToSubmit(pRes, pTask->tbSink.pTSchema, true, pTask->tbSink.stbUid,
                                     pTask->tbSink.stbFullName, pVnode->config.vgId);
L
Liu Jicong 已提交
753
  /*tPrintFixedSchemaSubmitReq(pReq, pTask->tbSink.pTSchema);*/
L
Liu Jicong 已提交
754
  // build write msg
L
Liu Jicong 已提交
755 756 757 758 759
  SRpcMsg msg = {
      .msgType = TDMT_VND_SUBMIT,
      .pCont = pReq,
      .contLen = ntohl(pReq->length),
  };
L
Liu Jicong 已提交
760

L
Liu Jicong 已提交
761
  ASSERT(tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg) == 0);
L
Liu Jicong 已提交
762
}
L
Liu Jicong 已提交
763

L
Liu Jicong 已提交
764 765 766 767 768 769 770 771 772 773 774 775
int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen) {
  SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
  if (pTask == NULL) {
    return -1;
  }
  SDecoder decoder;
  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
  if (tDecodeSStreamTask(&decoder, pTask) < 0) {
    ASSERT(0);
  }
  tDecoderClear(&decoder);

L
Liu Jicong 已提交
776 777 778 779 780 781 782 783 784 785 786 787
  pTask->status = TASK_STATUS__IDLE;
  pTask->inputStatus = TASK_INPUT_STATUS__NORMAL;
  pTask->outputStatus = TASK_OUTPUT_STATUS__NORMAL;

  pTask->inputQ = taosOpenQueue();
  pTask->outputQ = taosOpenQueue();
  pTask->inputQAll = taosAllocateQall();
  pTask->outputQAll = taosAllocateQall();

  if (pTask->inputQ == NULL || pTask->outputQ == NULL || pTask->inputQAll == NULL || pTask->outputQAll == NULL)
    goto FAIL;

L
Liu Jicong 已提交
788
  // exec
L
Liu Jicong 已提交
789 790
  if (pTask->execType != TASK_EXEC__NONE) {
    // expand runners
L
Liu Jicong 已提交
791 792 793 794 795 796 797 798 799 800
    STqReadHandle* pStreamReader = tqInitSubmitMsgScanner(pTq->pVnode->pMeta);
    SReadHandle    handle = {
           .reader = pStreamReader,
           .meta = pTq->pVnode->pMeta,
           .pMsgCb = &pTq->pVnode->msgCb,
           .vnode = pTq->pVnode,
    };
    pTask->exec.inputHandle = pStreamReader;
    pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle);
    ASSERT(pTask->exec.executor);
L
Liu Jicong 已提交
801
  }
L
Liu Jicong 已提交
802 803

  // sink
L
Liu Jicong 已提交
804
  pTask->ahandle = pTq->pVnode;
L
Liu Jicong 已提交
805
  if (pTask->sinkType == TASK_SINK__SMA) {
L
Liu Jicong 已提交
806
    pTask->smaSink.smaSink = smaHandleRes;
L
Liu Jicong 已提交
807
  } else if (pTask->sinkType == TASK_SINK__TABLE) {
L
Liu Jicong 已提交
808 809 810
    pTask->tbSink.vnode = pTq->pVnode;
    pTask->tbSink.tbSinkFunc = tqTableSink;

L
Liu Jicong 已提交
811 812
    ASSERT(pTask->tbSink.pSchemaWrapper);
    ASSERT(pTask->tbSink.pSchemaWrapper->pSchema);
L
Liu Jicong 已提交
813

L
Liu Jicong 已提交
814 815 816
    pTask->tbSink.pTSchema =
        tdGetSTSChemaFromSSChema(&pTask->tbSink.pSchemaWrapper->pSchema, pTask->tbSink.pSchemaWrapper->nCols);
    ASSERT(pTask->tbSink.pTSchema);
L
Liu Jicong 已提交
817
  }
L
Liu Jicong 已提交
818

L
Liu Jicong 已提交
819 820
  taosHashPut(pTq->pStreamTasks, &pTask->taskId, sizeof(int32_t), pTask, sizeof(SStreamTask));

L
Liu Jicong 已提交
821 822
  return 0;
FAIL:
L
Liu Jicong 已提交
823 824 825 826 827
  if (pTask->inputQ) taosCloseQueue(pTask->inputQ);
  if (pTask->outputQ) taosCloseQueue(pTask->outputQ);
  if (pTask->inputQAll) taosFreeQall(pTask->inputQAll);
  if (pTask->outputQAll) taosFreeQall(pTask->outputQAll);
  if (pTask) taosMemoryFree(pTask);
L
Liu Jicong 已提交
828 829
  return -1;
}
L
Liu Jicong 已提交
830

L
Liu Jicong 已提交
831
int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* pReq) {
L
Liu Jicong 已提交
832 833 834 835 836 837
  void* pIter = NULL;
  bool  failed = false;

  SStreamDataSubmit* pSubmit = taosAllocateQitem(sizeof(SStreamDataSubmit), DEF_QITEM);
  if (pSubmit == NULL) {
    failed = true;
L
Liu Jicong 已提交
838
    goto SET_TASK_FAIL;
L
Liu Jicong 已提交
839 840 841 842
  }
  pSubmit->dataRef = taosMemoryMalloc(sizeof(int32_t));
  if (pSubmit->dataRef == NULL) {
    failed = true;
L
Liu Jicong 已提交
843
    goto SET_TASK_FAIL;
L
Liu Jicong 已提交
844 845
  }

L
Liu Jicong 已提交
846 847 848
  pSubmit->type = STREAM_INPUT__DATA_SUBMIT;
  /*pSubmit->sourceVer = ver;*/
  /*pSubmit->sourceVg = pTq->pVnode->config.vgId;*/
L
Liu Jicong 已提交
849 850 851
  pSubmit->data = pReq;
  *pSubmit->dataRef = 1;

L
Liu Jicong 已提交
852
SET_TASK_FAIL:
L
Liu Jicong 已提交
853 854 855 856 857 858 859 860 861 862 863 864 865 866 867 868 869 870
  while (1) {
    pIter = taosHashIterate(pTq->pStreamTasks, pIter);
    if (pIter == NULL) break;
    SStreamTask* pTask = (SStreamTask*)pIter;
    if (pTask->inputType != STREAM_INPUT__DATA_SUBMIT) continue;

    int8_t inputStatus = atomic_load_8(&pTask->inputStatus);
    if (inputStatus == TASK_INPUT_STATUS__NORMAL) {
      if (failed) {
        atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__FAILED);
        continue;
      }

      streamDataSubmitRefInc(pSubmit);
      taosWriteQitem(pTask->inputQ, pSubmit);

      int8_t execStatus = atomic_load_8(&pTask->status);
      if (execStatus == TASK_STATUS__IDLE || execStatus == TASK_STATUS__CLOSING) {
L
Liu Jicong 已提交
871 872 873 874 875 876 877 878 879 880 881 882
        SStreamTaskRunReq* pRunReq = taosMemoryMalloc(sizeof(SStreamTaskRunReq));
        if (pRunReq == NULL) continue;
        // TODO: do we need htonl?
        pRunReq->head.vgId = pTq->pVnode->config.vgId;
        pRunReq->streamId = pTask->streamId;
        pRunReq->taskId = pTask->taskId;
        SRpcMsg msg = {
            .msgType = TDMT_VND_TASK_RUN,
            .pCont = pRunReq,
            .contLen = sizeof(SStreamTaskRunReq),
        };
        tmsgPutToQueue(&pTq->pVnode->msgCb, FETCH_QUEUE, &msg);
L
Liu Jicong 已提交
883 884 885 886 887 888 889 890 891 892 893
      }

    } else {
      // blocked or stopped, do nothing
    }
  }

  if (!failed) {
    streamDataSubmitRefDec(pSubmit);
    return 0;
  } else {
L
Liu Jicong 已提交
894 895 896 897 898 899
    if (pSubmit) {
      if (pSubmit->dataRef) {
        taosMemoryFree(pSubmit->dataRef);
      }
      taosFreeQitem(pSubmit);
    }
L
Liu Jicong 已提交
900 901 902 903
    return -1;
  }
}

L
Liu Jicong 已提交
904
int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
L
Liu Jicong 已提交
905
  //
L
Liu Jicong 已提交
906 907 908 909 910 911 912 913 914 915 916
  SStreamTaskRunReq* pReq = pMsg->pCont;
  int32_t            taskId = pReq->taskId;
  SStreamTask*       pTask = taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
  streamTaskProcessRunReq(pTask, &pTq->pVnode->msgCb);
  return 0;
}

int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg) {
  SStreamDispatchReq* pReq = pMsg->pCont;
  int32_t             taskId = pReq->taskId;
  SStreamTask*        pTask = taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
L
Liu Jicong 已提交
917
  streamProcessDispatchReq(pTask, &pTq->pVnode->msgCb, pReq, pMsg);
L
Liu Jicong 已提交
918 919 920 921 922 923 924
  return 0;
}

int32_t tqProcessTaskRecoverReq(STQ* pTq, SRpcMsg* pMsg) {
  SStreamTaskRecoverReq* pReq = pMsg->pCont;
  int32_t                taskId = pReq->taskId;
  SStreamTask*           pTask = taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
L
Liu Jicong 已提交
925
  streamProcessRecoverReq(pTask, &pTq->pVnode->msgCb, pReq, pMsg);
L
Liu Jicong 已提交
926 927 928 929 930 931 932
  return 0;
}

int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) {
  SStreamDispatchRsp* pRsp = pMsg->pCont;
  int32_t             taskId = pRsp->taskId;
  SStreamTask*        pTask = taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
L
Liu Jicong 已提交
933
  streamProcessDispatchRsp(pTask, &pTq->pVnode->msgCb, pRsp);
L
Liu Jicong 已提交
934 935 936 937 938 939 940
  return 0;
}

int32_t tqProcessTaskRecoverRsp(STQ* pTq, SRpcMsg* pMsg) {
  SStreamTaskRecoverRsp* pRsp = pMsg->pCont;
  int32_t                taskId = pRsp->taskId;
  SStreamTask*           pTask = taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
L
Liu Jicong 已提交
941
  streamProcessRecoverRsp(pTask, pRsp);
L
Liu Jicong 已提交
942 943
  return 0;
}