tq.c 30.0 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
S
Shengliang Guan 已提交
14 15
 */

H
Hongze Cheng 已提交
16
#include "tq.h"
17
#include "tdbInt.h"
S
Shengliang Guan 已提交
18

L
Liu Jicong 已提交
19
int32_t tqInit() {
L
Liu Jicong 已提交
20 21 22 23 24 25
  int8_t old;
  while (1) {
    old = atomic_val_compare_exchange_8(&tqMgmt.inited, 0, 2);
    if (old != 2) break;
  }

26 27 28 29 30 31
  if (old == 0) {
    tqMgmt.timer = taosTmrInit(10000, 100, 10000, "TQ");
    if (tqMgmt.timer == NULL) {
      atomic_store_8(&tqMgmt.inited, 0);
      return -1;
    }
L
Liu Jicong 已提交
32
    atomic_store_8(&tqMgmt.inited, 1);
33
  }
L
Liu Jicong 已提交
34 35
  return 0;
}
L
Liu Jicong 已提交
36

37
void tqCleanUp() {
L
Liu Jicong 已提交
38 39 40 41 42 43 44 45 46 47
  int8_t old;
  while (1) {
    old = atomic_val_compare_exchange_8(&tqMgmt.inited, 1, 2);
    if (old != 2) break;
  }

  if (old == 1) {
    taosTmrCleanUp(tqMgmt.timer);
    atomic_store_8(&tqMgmt.inited, 0);
  }
48
}
L
Liu Jicong 已提交
49

50 51 52 53
int tqExecKeyCompare(const void* pKey1, int32_t kLen1, const void* pKey2, int32_t kLen2) {
  return strcmp(pKey1, pKey2);
}

L
Liu Jicong 已提交
54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94
int32_t tqStoreExec(STQ* pTq, const char* key, const STqExec* pExec) {
  int32_t code;
  int32_t vlen;
  tEncodeSize(tEncodeSTqExec, pExec, vlen, code);
  ASSERT(code == 0);

  void* buf = taosMemoryCalloc(1, vlen);
  if (buf == NULL) {
    ASSERT(0);
  }

  SEncoder encoder;
  tEncoderInit(&encoder, buf, vlen);

  if (tEncodeSTqExec(&encoder, pExec) < 0) {
    ASSERT(0);
  }

  TXN txn;

  if (tdbTxnOpen(&txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) {
    ASSERT(0);
  }

  if (tdbBegin(pTq->pMetaStore, &txn) < 0) {
    ASSERT(0);
  }

  if (tdbTbUpsert(pTq->pExecStore, key, (int)strlen(key), buf, vlen, &txn) < 0) {
    ASSERT(0);
  }

  if (tdbCommit(pTq->pMetaStore, &txn) < 0) {
    ASSERT(0);
  }

  tEncoderClear(&encoder);
  taosMemoryFree(buf);
  return 0;
}

L
Liu Jicong 已提交
95
STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal) {
wafwerar's avatar
wafwerar 已提交
96
  STQ* pTq = taosMemoryMalloc(sizeof(STQ));
L
Liu Jicong 已提交
97
  if (pTq == NULL) {
L
Liu Jicong 已提交
98
    terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
L
Liu Jicong 已提交
99 100
    return NULL;
  }
H
Hongze Cheng 已提交
101
  pTq->path = strdup(path);
L
Liu Jicong 已提交
102
  pTq->pVnode = pVnode;
L
Liu Jicong 已提交
103
  pTq->pWal = pWal;
104

L
Liu Jicong 已提交
105
  pTq->execs = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);
L
Liu Jicong 已提交
106

L
Liu Jicong 已提交
107 108
  pTq->pStreamTasks = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);

L
Liu Jicong 已提交
109 110
  pTq->pushMgr = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);

111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135
  if (tdbOpen(path, 16 * 1024, 1, &pTq->pMetaStore) < 0) {
    ASSERT(0);
  }

  if (tdbTbOpen("exec", -1, -1, tqExecKeyCompare, pTq->pMetaStore, &pTq->pExecStore) < 0) {
    ASSERT(0);
  }

  TXN txn;

  if (tdbTxnOpen(&txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, 0) < 0) {
    ASSERT(0);
  }

  TBC* pCur;
  if (tdbTbcOpen(pTq->pExecStore, &pCur, &txn) < 0) {
    ASSERT(0);
  }

  void* pKey;
  int   kLen;
  void* pVal;
  int   vLen;

  tdbTbcMoveToFirst(pCur);
L
Liu Jicong 已提交
136
  SDecoder decoder;
137
  while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) {
L
Liu Jicong 已提交
138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160
    STqExec exec;
    tDecoderInit(&decoder, (uint8_t*)pVal, vLen);
    tDecodeSTqExec(&decoder, &exec);
    exec.pWalReader = walOpenReadHandle(pTq->pVnode->pWal);
    if (exec.subType == TOPIC_SUB_TYPE__TABLE) {
      for (int32_t i = 0; i < 5; i++) {
        exec.pExecReader[i] = tqInitSubmitMsgScanner(pTq->pVnode->pMeta);

        SReadHandle handle = {
            .reader = exec.pExecReader[i],
            .meta = pTq->pVnode->pMeta,
            .pMsgCb = &pTq->pVnode->msgCb,
        };
        exec.task[i] = qCreateStreamExecTaskInfo(exec.qmsg, &handle);
        ASSERT(exec.task[i]);
      }
    } else {
      for (int32_t i = 0; i < 5; i++) {
        exec.pExecReader[i] = tqInitSubmitMsgScanner(pTq->pVnode->pMeta);
      }
      exec.pDropTbUid = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
    }
    taosHashPut(pTq->execs, pKey, kLen, &exec, sizeof(STqExec));
161 162 163 164 165 166
  }

  if (tdbTxnClose(&txn) < 0) {
    ASSERT(0);
  }

L
Liu Jicong 已提交
167 168
  return pTq;
}
L
Liu Jicong 已提交
169

L
Liu Jicong 已提交
170
void tqClose(STQ* pTq) {
H
Hongze Cheng 已提交
171
  if (pTq) {
wafwerar's avatar
wafwerar 已提交
172
    taosMemoryFreeClear(pTq->path);
L
Liu Jicong 已提交
173 174 175
    taosHashCleanup(pTq->execs);
    taosHashCleanup(pTq->pStreamTasks);
    taosHashCleanup(pTq->pushMgr);
176
    tdbClose(pTq->pMetaStore);
wafwerar's avatar
wafwerar 已提交
177
    taosMemoryFree(pTq);
H
Hongze Cheng 已提交
178
  }
L
Liu Jicong 已提交
179 180
  // TODO
}
L
Liu Jicong 已提交
181

182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212
int32_t tEncodeSTqExec(SEncoder* pEncoder, const STqExec* pExec) {
  if (tStartEncode(pEncoder) < 0) return -1;
  if (tEncodeCStr(pEncoder, pExec->subKey) < 0) return -1;
  if (tEncodeI64(pEncoder, pExec->consumerId) < 0) return -1;
  if (tEncodeI32(pEncoder, pExec->epoch) < 0) return -1;
  if (tEncodeI8(pEncoder, pExec->subType) < 0) return -1;
  if (tEncodeI8(pEncoder, pExec->withTbName) < 0) return -1;
  if (tEncodeI8(pEncoder, pExec->withSchema) < 0) return -1;
  if (tEncodeI8(pEncoder, pExec->withTag) < 0) return -1;
  if (pExec->subType == TOPIC_SUB_TYPE__TABLE) {
    if (tEncodeCStr(pEncoder, pExec->qmsg) < 0) return -1;
  }
  tEndEncode(pEncoder);
  return pEncoder->pos;
}

int32_t tDecodeSTqExec(SDecoder* pDecoder, STqExec* pExec) {
  if (tStartDecode(pDecoder) < 0) return -1;
  if (tDecodeCStrTo(pDecoder, pExec->subKey) < 0) return -1;
  if (tDecodeI64(pDecoder, &pExec->consumerId) < 0) return -1;
  if (tDecodeI32(pDecoder, &pExec->epoch) < 0) return -1;
  if (tDecodeI8(pDecoder, &pExec->subType) < 0) return -1;
  if (tDecodeI8(pDecoder, &pExec->withTbName) < 0) return -1;
  if (tDecodeI8(pDecoder, &pExec->withSchema) < 0) return -1;
  if (tDecodeI8(pDecoder, &pExec->withTag) < 0) return -1;
  if (pExec->subType == TOPIC_SUB_TYPE__TABLE) {
    if (tDecodeCStrAlloc(pDecoder, &pExec->qmsg) < 0) return -1;
  }
  tEndDecode(pDecoder);
  return 0;
}
213
int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
L
Liu Jicong 已提交
214
  void* pIter = NULL;
215 216 217
  while (1) {
    pIter = taosHashIterate(pTq->execs, pIter);
    if (pIter == NULL) break;
L
Liu Jicong 已提交
218
    STqExec* pExec = (STqExec*)pIter;
219
    if (pExec->subType == TOPIC_SUB_TYPE__DB) {
L
Liu Jicong 已提交
220
      if (!isAdd) {
221 222 223 224 225 226
        int32_t sz = taosArrayGetSize(tbUidList);
        for (int32_t i = 0; i < sz; i++) {
          int64_t tbUid = *(int64_t*)taosArrayGet(tbUidList, i);
          taosHashPut(pExec->pDropTbUid, &tbUid, sizeof(int64_t), NULL, 0);
        }
      }
L
Liu Jicong 已提交
227 228 229 230 231
    } else {
      for (int32_t i = 0; i < 5; i++) {
        int32_t code = qUpdateQualifiedTableId(pExec->task[i], tbUidList, isAdd);
        ASSERT(code == 0);
      }
232 233
    }
  }
234 235 236 237 238 239 240 241 242
  while (1) {
    pIter = taosHashIterate(pTq->pStreamTasks, pIter);
    if (pIter == NULL) break;
    SStreamTask* pTask = (SStreamTask*)pIter;
    if (pTask->inputType == STREAM_INPUT__DATA_SUBMIT) {
      int32_t code = qUpdateQualifiedTableId(pTask->exec.executor, tbUidList, isAdd);
      ASSERT(code == 0);
    }
  }
243 244 245
  return 0;
}

dengyihao's avatar
dengyihao 已提交
246
int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver, SRpcHandleInfo handleInfo) {
L
Liu Jicong 已提交
247 248 249 250 251 252 253 254 255 256
  if (msgType != TDMT_VND_SUBMIT) return 0;
  void*       pIter = NULL;
  STqExec*    pExec = NULL;
  SSubmitReq* pReq = (SSubmitReq*)msg;
  int32_t     workerId = 4;
  int64_t     fetchOffset = ver;

  while (1) {
    pIter = taosHashIterate(pTq->pushMgr, pIter);
    if (pIter == NULL) break;
L
Liu Jicong 已提交
257
    pExec = *(STqExec**)pIter;
L
Liu Jicong 已提交
258 259 260 261 262 263 264 265

    taosWLockLatch(&pExec->pushHandle.lock);

    SRpcMsg* pMsg = atomic_load_ptr(&pExec->pushHandle.handle);
    ASSERT(pMsg);

    SMqDataBlkRsp rsp = {0};
    rsp.reqOffset = pExec->pushHandle.reqOffset;
266
    rsp.blockData = taosArrayInit(0, sizeof(void*));
L
Liu Jicong 已提交
267 268 269 270 271
    rsp.blockDataLen = taosArrayInit(0, sizeof(int32_t));

    if (pExec->subType == TOPIC_SUB_TYPE__TABLE) {
      qTaskInfo_t task = pExec->task[workerId];
      ASSERT(task);
272
      qSetStreamInput(task, pReq, STREAM_DATA_TYPE_SUBMIT_BLOCK, false);
L
Liu Jicong 已提交
273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306
      while (1) {
        SSDataBlock* pDataBlock = NULL;
        uint64_t     ts = 0;
        if (qExecTask(task, &pDataBlock, &ts) < 0) {
          ASSERT(0);
        }
        if (pDataBlock == NULL) break;

        ASSERT(pDataBlock->info.rows != 0);
        ASSERT(pDataBlock->info.numOfCols != 0);

        int32_t            dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pDataBlock);
        void*              buf = taosMemoryCalloc(1, dataStrLen);
        SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)buf;
        pRetrieve->useconds = ts;
        pRetrieve->precision = TSDB_DEFAULT_PRECISION;
        pRetrieve->compressed = 0;
        pRetrieve->completed = 1;
        pRetrieve->numOfRows = htonl(pDataBlock->info.rows);

        // TODO enable compress
        int32_t actualLen = 0;
        blockCompressEncode(pDataBlock, pRetrieve->data, &actualLen, pDataBlock->info.numOfCols, false);
        actualLen += sizeof(SRetrieveTableRsp);
        ASSERT(actualLen <= dataStrLen);
        taosArrayPush(rsp.blockDataLen, &actualLen);
        taosArrayPush(rsp.blockData, &buf);
        rsp.blockNum++;
      }
    } else if (pExec->subType == TOPIC_SUB_TYPE__DB) {
      STqReadHandle* pReader = pExec->pExecReader[workerId];
      tqReadHandleSetMsg(pReader, pReq, 0);
      while (tqNextDataBlock(pReader)) {
        SSDataBlock block = {0};
307
        if (tqRetrieveDataBlock(&block.pDataBlock, pReader, &block.info.groupId, &block.info.uid, &block.info.rows,
L
Liu Jicong 已提交
308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355
                                &block.info.numOfCols) < 0) {
          ASSERT(0);
        }
        int32_t            dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(&block);
        void*              buf = taosMemoryCalloc(1, dataStrLen);
        SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)buf;
        /*pRetrieve->useconds = 0;*/
        pRetrieve->precision = TSDB_DEFAULT_PRECISION;
        pRetrieve->compressed = 0;
        pRetrieve->completed = 1;
        pRetrieve->numOfRows = htonl(block.info.rows);

        // TODO enable compress
        int32_t actualLen = 0;
        blockCompressEncode(&block, pRetrieve->data, &actualLen, block.info.numOfCols, false);
        actualLen += sizeof(SRetrieveTableRsp);
        ASSERT(actualLen <= dataStrLen);
        taosArrayPush(rsp.blockDataLen, &actualLen);
        taosArrayPush(rsp.blockData, &buf);
        rsp.blockNum++;
      }
    } else {
      ASSERT(0);
    }

    if (rsp.blockNum == 0) {
      taosWUnLockLatch(&pExec->pushHandle.lock);
      continue;
    }

    ASSERT(taosArrayGetSize(rsp.blockData) == rsp.blockNum);
    ASSERT(taosArrayGetSize(rsp.blockDataLen) == rsp.blockNum);

    rsp.rspOffset = fetchOffset;

    int32_t tlen = sizeof(SMqRspHead) + tEncodeSMqDataBlkRsp(NULL, &rsp);
    void*   buf = rpcMallocCont(tlen);
    if (buf == NULL) {
      pMsg->code = -1;
      return -1;
    }

    ((SMqRspHead*)buf)->mqMsgType = TMQ_MSG_TYPE__POLL_RSP;
    ((SMqRspHead*)buf)->epoch = pExec->pushHandle.epoch;
    ((SMqRspHead*)buf)->consumerId = pExec->pushHandle.consumerId;

    void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
    tEncodeSMqDataBlkRsp(&abuf, &rsp);
dengyihao's avatar
dengyihao 已提交
356 357 358

    SRpcMsg resp = {.info = handleInfo, .pCont = buf, .contLen = tlen, .code = 0};
    tmsgSendRsp(&resp);
L
Liu Jicong 已提交
359 360 361 362

    atomic_store_ptr(&pExec->pushHandle.handle, NULL);
    taosWUnLockLatch(&pExec->pushHandle.lock);

H
Hongze Cheng 已提交
363 364 365
    tqDebug("vg %d offset %ld from consumer %ld (epoch %d) send rsp, block num: %d, reqOffset: %ld, rspOffset: %ld",
            TD_VID(pTq->pVnode), fetchOffset, pExec->pushHandle.consumerId, pExec->pushHandle.epoch, rsp.blockNum,
            rsp.reqOffset, rsp.rspOffset);
L
Liu Jicong 已提交
366 367 368 369 370 371 372 373 374

    // TODO destroy
    taosArrayDestroy(rsp.blockData);
    taosArrayDestroy(rsp.blockDataLen);
  }

  return 0;
}

L
Liu Jicong 已提交
375
int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) {
L
Liu Jicong 已提交
376 377
  if (msgType == TDMT_VND_SUBMIT) {
    if (taosHashGetSize(pTq->pStreamTasks) == 0) return 0;
L
Liu Jicong 已提交
378

L
Liu Jicong 已提交
379
    if (tdUpdateExpireWindow(pTq->pVnode->pSma, msg, ver) != 0) {
L
Liu Jicong 已提交
380
      // TODO handle sma error
L
Liu Jicong 已提交
381 382 383 384 385 386
    }
    void* data = taosMemoryMalloc(msgLen);
    if (data == NULL) {
      return -1;
    }
    memcpy(data, msg, msgLen);
L
Liu Jicong 已提交
387

L
Liu Jicong 已提交
388
    tqProcessStreamTrigger(pTq, data);
C
Cary Xu 已提交
389
  }
L
Liu Jicong 已提交
390

L
Liu Jicong 已提交
391 392 393
  return 0;
}

L
Liu Jicong 已提交
394 395
int tqCommit(STQ* pTq) {
  // do nothing
L
Liu Jicong 已提交
396
  return 0;
L
Liu Jicong 已提交
397
}
L
Liu Jicong 已提交
398

L
fix  
Liu Jicong 已提交
399
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
L
Liu Jicong 已提交
400 401 402 403 404
  SMqPollReq* pReq = pMsg->pCont;
  int64_t     consumerId = pReq->consumerId;
  int64_t     waitTime = pReq->waitTime;
  int32_t     reqEpoch = pReq->epoch;
  int64_t     fetchOffset;
405

L
Liu Jicong 已提交
406
  // get offset to fetch message
L
Liu Jicong 已提交
407
  if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__EARLIEAST) {
L
Liu Jicong 已提交
408
    fetchOffset = walGetFirstVer(pTq->pWal);
L
Liu Jicong 已提交
409
  } else if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__LATEST) {
L
Liu Jicong 已提交
410
    fetchOffset = walGetCommittedVer(pTq->pWal);
L
Liu Jicong 已提交
411 412 413 414
  } else {
    fetchOffset = pReq->currentOffset + 1;
  }

H
Hongze Cheng 已提交
415 416
  tqDebug("tmq poll: consumer %ld (epoch %d) recv poll req in vg %d, req %ld %ld", consumerId, pReq->epoch,
          TD_VID(pTq->pVnode), pReq->currentOffset, fetchOffset);
L
fix  
Liu Jicong 已提交
417

L
Liu Jicong 已提交
418
  STqExec* pExec = taosHashGet(pTq->execs, pReq->subKey, strlen(pReq->subKey));
L
Liu Jicong 已提交
419
  ASSERT(pExec);
L
Liu Jicong 已提交
420

L
Liu Jicong 已提交
421
  int32_t consumerEpoch = atomic_load_32(&pExec->epoch);
L
Liu Jicong 已提交
422
  while (consumerEpoch < reqEpoch) {
L
Liu Jicong 已提交
423
    consumerEpoch = atomic_val_compare_exchange_32(&pExec->epoch, consumerEpoch, reqEpoch);
L
Liu Jicong 已提交
424 425
  }

426 427 428 429 430 431 432
  SWalHead* pHeadWithCkSum = taosMemoryMalloc(sizeof(SWalHead) + 2048);
  if (pHeadWithCkSum == NULL) {
    return -1;
  }

  walSetReaderCapacity(pExec->pWalReader, 2048);

L
Liu Jicong 已提交
433
  SMqDataBlkRsp rsp = {0};
L
Liu Jicong 已提交
434
  rsp.reqOffset = pReq->currentOffset;
L
Liu Jicong 已提交
435
  rsp.withSchema = pExec->withSchema;
L
Liu Jicong 已提交
436

L
Liu Jicong 已提交
437
  rsp.blockData = taosArrayInit(0, sizeof(void*));
L
Liu Jicong 已提交
438
  rsp.blockDataLen = taosArrayInit(0, sizeof(int32_t));
L
Liu Jicong 已提交
439
  rsp.blockSchema = taosArrayInit(0, sizeof(void*));
L
Liu Jicong 已提交
440
  rsp.blockTbName = taosArrayInit(0, sizeof(void*));
441

442 443 444 445 446 447
  int8_t withTbName = pExec->withTbName;
  if (pReq->withTbName != -1) {
    withTbName = pReq->withTbName;
  }
  rsp.withTbName = withTbName;

448
  while (1) {
L
Liu Jicong 已提交
449
    consumerEpoch = atomic_load_32(&pExec->epoch);
L
Liu Jicong 已提交
450
    if (consumerEpoch > reqEpoch) {
H
Hongze Cheng 已提交
451 452
      tqDebug("tmq poll: consumer %ld (epoch %d) vg %d offset %ld, found new consumer epoch %d discard req epoch %d",
              consumerId, pReq->epoch, TD_VID(pTq->pVnode), fetchOffset, consumerEpoch, reqEpoch);
L
Liu Jicong 已提交
453 454
      break;
    }
L
Liu Jicong 已提交
455

456 457 458
    taosThreadMutexLock(&pExec->pWalReader->mutex);

    if (walFetchHead(pExec->pWalReader, fetchOffset, pHeadWithCkSum) < 0) {
H
Hongze Cheng 已提交
459 460
      tqDebug("tmq poll: consumer %ld (epoch %d) vg %d offset %ld, no more log to return", consumerId, pReq->epoch,
              TD_VID(pTq->pVnode), fetchOffset);
461 462 463 464 465
      taosThreadMutexUnlock(&pExec->pWalReader->mutex);
      break;
    }

    if (pHeadWithCkSum->head.msgType != TDMT_VND_SUBMIT) {
L
Liu Jicong 已提交
466
      ASSERT(walSkipFetchBody(pExec->pWalReader, pHeadWithCkSum) == 0);
467
    } else {
L
Liu Jicong 已提交
468
      ASSERT(walFetchBody(pExec->pWalReader, &pHeadWithCkSum) == 0);
469 470 471 472 473 474 475
    }

    SWalReadHead* pHead = &pHeadWithCkSum->head;

    taosThreadMutexUnlock(&pExec->pWalReader->mutex);

#if 0
L
fix  
Liu Jicong 已提交
476
    SWalReadHead* pHead;
L
Liu Jicong 已提交
477
    if (walReadWithHandle_s(pExec->pWalReader, fetchOffset, &pHead) < 0) {
L
Liu Jicong 已提交
478 479
      // TODO: no more log, set timer to wait blocking time
      // if data inserted during waiting, launch query and
L
Liu Jicong 已提交
480
      // response to user
H
Hongze Cheng 已提交
481
      tqDebug("tmq poll: consumer %ld (epoch %d) vg %d offset %ld, no more log to return", consumerId, pReq->epoch,
H
refact  
Hongze Cheng 已提交
482
             TD_VID(pTq->pVnode), fetchOffset);
L
Liu Jicong 已提交
483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504

#if 0
      // add to pushMgr
      taosWLockLatch(&pExec->pushHandle.lock);

      pExec->pushHandle.consumerId = consumerId;
      pExec->pushHandle.epoch = reqEpoch;
      pExec->pushHandle.reqOffset = rsp.reqOffset;
      pExec->pushHandle.skipLogNum = rsp.skipLogNum;
      pExec->pushHandle.handle = pMsg;

      taosWUnLockLatch(&pExec->pushHandle.lock);

      // TODO add timer

      // TODO: the pointer will always be valid?
      taosHashPut(pTq->pushMgr, &consumerId, sizeof(int64_t), &pExec, sizeof(void*));
      taosArrayDestroy(rsp.blockData);
      taosArrayDestroy(rsp.blockDataLen);
      return 0;
#endif

505 506 507
    break;
  }
#endif
L
Liu Jicong 已提交
508

H
Hongze Cheng 已提交
509 510
    tqDebug("tmq poll: consumer %ld (epoch %d) iter log, vg %d offset %ld msgType %d", consumerId, pReq->epoch,
            TD_VID(pTq->pVnode), fetchOffset, pHead->msgType);
L
Liu Jicong 已提交
511

L
fix  
Liu Jicong 已提交
512 513
    if (pHead->msgType == TDMT_VND_SUBMIT) {
      SSubmitReq* pCont = (SSubmitReq*)&pHead->body;
514
      // table subscribe
L
Liu Jicong 已提交
515 516 517
      if (pExec->subType == TOPIC_SUB_TYPE__TABLE) {
        qTaskInfo_t task = pExec->task[workerId];
        ASSERT(task);
518
        qSetStreamInput(task, pCont, STREAM_DATA_TYPE_SUBMIT_BLOCK, false);
L
Liu Jicong 已提交
519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545
        while (1) {
          SSDataBlock* pDataBlock = NULL;
          uint64_t     ts = 0;
          if (qExecTask(task, &pDataBlock, &ts) < 0) {
            ASSERT(0);
          }
          if (pDataBlock == NULL) break;

          ASSERT(pDataBlock->info.rows != 0);
          ASSERT(pDataBlock->info.numOfCols != 0);

          int32_t            dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pDataBlock);
          void*              buf = taosMemoryCalloc(1, dataStrLen);
          SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)buf;
          pRetrieve->useconds = ts;
          pRetrieve->precision = TSDB_DEFAULT_PRECISION;
          pRetrieve->compressed = 0;
          pRetrieve->completed = 1;
          pRetrieve->numOfRows = htonl(pDataBlock->info.rows);

          // TODO enable compress
          int32_t actualLen = 0;
          blockCompressEncode(pDataBlock, pRetrieve->data, &actualLen, pDataBlock->info.numOfCols, false);
          actualLen += sizeof(SRetrieveTableRsp);
          ASSERT(actualLen <= dataStrLen);
          taosArrayPush(rsp.blockDataLen, &actualLen);
          taosArrayPush(rsp.blockData, &buf);
L
Liu Jicong 已提交
546 547 548 549 550 551

          if (pExec->withSchema) {
            SSchemaWrapper* pSW = tCloneSSchemaWrapper(pExec->pExecReader[workerId]->pSchemaWrapper);
            taosArrayPush(rsp.blockSchema, &pSW);
          }

552
          if (withTbName) {
L
Liu Jicong 已提交
553 554 555 556 557 558 559 560 561 562 563
            SMetaReader mr = {0};
            metaReaderInit(&mr, pTq->pVnode->pMeta, 0);
            int64_t uid = pExec->pExecReader[workerId]->msgIter.uid;
            if (metaGetTableEntryByUid(&mr, uid) < 0) {
              ASSERT(0);
            }
            char* tbName = strdup(mr.me.name);
            taosArrayPush(rsp.blockTbName, &tbName);
            metaReaderClear(&mr);
          }

L
Liu Jicong 已提交
564
          rsp.blockNum++;
565
        }
566
        // db subscribe
L
Liu Jicong 已提交
567
      } else if (pExec->subType == TOPIC_SUB_TYPE__DB) {
L
Liu Jicong 已提交
568
        rsp.withSchema = 1;
L
Liu Jicong 已提交
569
        STqReadHandle* pReader = pExec->pExecReader[workerId];
L
Liu Jicong 已提交
570
        tqReadHandleSetMsg(pReader, pCont, 0);
571
        while (tqNextDataBlockFilterOut(pReader, pExec->pDropTbUid)) {
L
Liu Jicong 已提交
572
          SSDataBlock block = {0};
573
          if (tqRetrieveDataBlock(&block.pDataBlock, pReader, &block.info.groupId, &block.info.uid, &block.info.rows,
L
Liu Jicong 已提交
574
                                  &block.info.numOfCols) < 0) {
575
            if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue;
L
Liu Jicong 已提交
576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593
            ASSERT(0);
          }
          int32_t            dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(&block);
          void*              buf = taosMemoryCalloc(1, dataStrLen);
          SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)buf;
          /*pRetrieve->useconds = 0;*/
          pRetrieve->precision = TSDB_DEFAULT_PRECISION;
          pRetrieve->compressed = 0;
          pRetrieve->completed = 1;
          pRetrieve->numOfRows = htonl(block.info.rows);

          // TODO enable compress
          int32_t actualLen = 0;
          blockCompressEncode(&block, pRetrieve->data, &actualLen, block.info.numOfCols, false);
          actualLen += sizeof(SRetrieveTableRsp);
          ASSERT(actualLen <= dataStrLen);
          taosArrayPush(rsp.blockDataLen, &actualLen);
          taosArrayPush(rsp.blockData, &buf);
594
          if (withTbName) {
L
Liu Jicong 已提交
595 596 597 598 599 600 601 602 603
            SMetaReader mr = {0};
            metaReaderInit(&mr, pTq->pVnode->pMeta, 0);
            if (metaGetTableEntryByUid(&mr, block.info.uid) < 0) {
              ASSERT(0);
            }
            char* tbName = strdup(mr.me.name);
            taosArrayPush(rsp.blockTbName, &tbName);
            metaReaderClear(&mr);
          }
L
Liu Jicong 已提交
604 605 606 607

          SSchemaWrapper* pSW = tCloneSSchemaWrapper(pExec->pExecReader[workerId]->pSchemaWrapper);
          taosArrayPush(rsp.blockSchema, &pSW);

L
Liu Jicong 已提交
608 609 610 611
          rsp.blockNum++;
        }
      } else {
        ASSERT(0);
L
fix  
Liu Jicong 已提交
612
      }
L
Liu Jicong 已提交
613
    }
614

L
Liu Jicong 已提交
615 616
    // TODO batch optimization:
    // TODO continue scan until meeting batch requirement
L
Liu Jicong 已提交
617 618 619 620
    if (rsp.blockNum != 0) break;
    rsp.skipLogNum++;
    fetchOffset++;
  }
621

L
Liu Jicong 已提交
622
  taosMemoryFree(pHeadWithCkSum);
L
Liu Jicong 已提交
623 624
  ASSERT(taosArrayGetSize(rsp.blockData) == rsp.blockNum);
  ASSERT(taosArrayGetSize(rsp.blockDataLen) == rsp.blockNum);
L
fix  
Liu Jicong 已提交
625

L
Liu Jicong 已提交
626 627 628 629
  if (rsp.blockNum != 0)
    rsp.rspOffset = fetchOffset;
  else
    rsp.rspOffset = fetchOffset - 1;
630

L
Liu Jicong 已提交
631
  int32_t tlen = sizeof(SMqRspHead) + tEncodeSMqDataBlkRsp(NULL, &rsp);
632 633 634 635 636
  void*   buf = rpcMallocCont(tlen);
  if (buf == NULL) {
    pMsg->code = -1;
    return -1;
  }
L
Liu Jicong 已提交
637

L
Liu Jicong 已提交
638 639
  ((SMqRspHead*)buf)->mqMsgType = TMQ_MSG_TYPE__POLL_RSP;
  ((SMqRspHead*)buf)->epoch = pReq->epoch;
L
Liu Jicong 已提交
640
  ((SMqRspHead*)buf)->consumerId = consumerId;
641

L
Liu Jicong 已提交
642
  void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
L
Liu Jicong 已提交
643
  tEncodeSMqDataBlkRsp(&abuf, &rsp);
dengyihao's avatar
dengyihao 已提交
644 645 646

  SRpcMsg resp = {.info = pMsg->info, .pCont = buf, .contLen = tlen, .code = 0};
  tmsgSendRsp(&resp);
L
Liu Jicong 已提交
647

H
Hongze Cheng 已提交
648 649
  tqDebug("vg %d offset %ld from consumer %ld (epoch %d) send rsp, block num: %d, reqOffset: %ld, rspOffset: %ld",
          TD_VID(pTq->pVnode), fetchOffset, consumerId, pReq->epoch, rsp.blockNum, rsp.reqOffset, rsp.rspOffset);
L
Liu Jicong 已提交
650 651

  // TODO destroy
L
Liu Jicong 已提交
652 653
  taosArrayDestroy(rsp.blockData);
  taosArrayDestroy(rsp.blockDataLen);
L
Liu Jicong 已提交
654
  taosArrayDestroyP(rsp.blockSchema, (FDelete)tDeleteSSchemaWrapper);
L
Liu Jicong 已提交
655
  taosArrayDestroyP(rsp.blockTbName, (FDelete)taosMemoryFree);
L
Liu Jicong 已提交
656

657 658
  return 0;
}
L
Liu Jicong 已提交
659

L
Liu Jicong 已提交
660 661
int32_t tqProcessVgDeleteReq(STQ* pTq, char* msg, int32_t msgLen) {
  SMqVDeleteReq* pReq = (SMqVDeleteReq*)msg;
L
Liu Jicong 已提交
662 663 664

  int32_t code = taosHashRemove(pTq->execs, pReq->subKey, strlen(pReq->subKey));
  ASSERT(code == 0);
665 666 667 668 669 670 671 672 673 674 675

  TXN txn;

  if (tdbTxnOpen(&txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) {
    ASSERT(0);
  }

  if (tdbBegin(pTq->pMetaStore, &txn) < 0) {
    ASSERT(0);
  }

L
Liu Jicong 已提交
676 677 678
  if (tdbTbDelete(pTq->pExecStore, pReq->subKey, (int)strlen(pReq->subKey), &txn) < 0) {
    /*ASSERT(0);*/
  }
679 680 681 682 683

  if (tdbCommit(pTq->pMetaStore, &txn) < 0) {
    ASSERT(0);
  }

L
Liu Jicong 已提交
684
  return 0;
L
Liu Jicong 已提交
685 686
}

L
Liu Jicong 已提交
687 688
// TODO: persist meta into tdb
int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
L
Liu Jicong 已提交
689
  SMqRebVgReq req = {0};
L
Liu Jicong 已提交
690 691
  tDecodeSMqRebVgReq(msg, &req);
  // todo lock
L
Liu Jicong 已提交
692
  STqExec* pExec = taosHashGet(pTq->execs, req.subKey, strlen(req.subKey));
L
Liu Jicong 已提交
693 694 695 696 697 698 699 700 701 702
  if (pExec == NULL) {
    ASSERT(req.oldConsumerId == -1);
    ASSERT(req.newConsumerId != -1);
    STqExec exec = {0};
    pExec = &exec;
    /*taosInitRWLatch(&pExec->lock);*/

    memcpy(pExec->subKey, req.subKey, TSDB_SUBSCRIBE_KEY_LEN);
    pExec->consumerId = req.newConsumerId;
    pExec->epoch = -1;
L
Liu Jicong 已提交
703 704 705 706 707 708

    pExec->subType = req.subType;
    pExec->withTbName = req.withTbName;
    pExec->withSchema = req.withSchema;
    pExec->withTag = req.withTag;

L
Liu Jicong 已提交
709 710
    pExec->qmsg = req.qmsg;
    req.qmsg = NULL;
L
Liu Jicong 已提交
711 712

    pExec->pWalReader = walOpenReadHandle(pTq->pVnode->pWal);
713 714 715 716
    if (pExec->subType == TOPIC_SUB_TYPE__TABLE) {
      for (int32_t i = 0; i < 5; i++) {
        pExec->pExecReader[i] = tqInitSubmitMsgScanner(pTq->pVnode->pMeta);

L
Liu Jicong 已提交
717 718 719
        SReadHandle handle = {
            .reader = pExec->pExecReader[i],
            .meta = pTq->pVnode->pMeta,
720
            .pMsgCb = &pTq->pVnode->msgCb,
L
Liu Jicong 已提交
721 722 723 724
        };
        pExec->task[i] = qCreateStreamExecTaskInfo(pExec->qmsg, &handle);
        ASSERT(pExec->task[i]);
      }
725 726 727 728 729
    } else {
      for (int32_t i = 0; i < 5; i++) {
        pExec->pExecReader[i] = tqInitSubmitMsgScanner(pTq->pVnode->pMeta);
      }
      pExec->pDropTbUid = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
L
Liu Jicong 已提交
730
    }
L
Liu Jicong 已提交
731
    taosHashPut(pTq->execs, req.subKey, strlen(req.subKey), pExec, sizeof(STqExec));
732

L
Liu Jicong 已提交
733 734
    if (tqStoreExec(pTq, req.subKey, pExec) < 0) {
      // TODO
735
    }
L
Liu Jicong 已提交
736 737
    return 0;
  } else {
L
Liu Jicong 已提交
738
    /*ASSERT(pExec->consumerId == req.oldConsumerId);*/
L
Liu Jicong 已提交
739 740 741 742
    // TODO handle qmsg and exec modification
    atomic_store_32(&pExec->epoch, -1);
    atomic_store_64(&pExec->consumerId, req.newConsumerId);
    atomic_add_fetch_32(&pExec->epoch, 1);
L
Liu Jicong 已提交
743 744 745 746

    if (tqStoreExec(pTq, req.subKey, pExec) < 0) {
      // TODO
    }
L
Liu Jicong 已提交
747
    return 0;
L
Liu Jicong 已提交
748 749
  }
}
750

L
Liu Jicong 已提交
751 752 753 754 755
void tqTableSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data) {
  const SArray* pRes = (const SArray*)data;
  SVnode*       pVnode = (SVnode*)vnode;

  ASSERT(pTask->tbSink.pTSchema);
L
Liu Jicong 已提交
756 757
  SSubmitReq* pReq = tdBlockToSubmit(pRes, pTask->tbSink.pTSchema, true, pTask->tbSink.stbUid,
                                     pTask->tbSink.stbFullName, pVnode->config.vgId);
L
Liu Jicong 已提交
758
  /*tPrintFixedSchemaSubmitReq(pReq, pTask->tbSink.pTSchema);*/
L
Liu Jicong 已提交
759
  // build write msg
L
Liu Jicong 已提交
760 761 762 763 764
  SRpcMsg msg = {
      .msgType = TDMT_VND_SUBMIT,
      .pCont = pReq,
      .contLen = ntohl(pReq->length),
  };
L
Liu Jicong 已提交
765

L
Liu Jicong 已提交
766
  ASSERT(tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg) == 0);
L
Liu Jicong 已提交
767
}
L
Liu Jicong 已提交
768

L
Liu Jicong 已提交
769 770 771 772 773 774 775 776 777 778 779 780
int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen) {
  SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
  if (pTask == NULL) {
    return -1;
  }
  SDecoder decoder;
  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
  if (tDecodeSStreamTask(&decoder, pTask) < 0) {
    ASSERT(0);
  }
  tDecoderClear(&decoder);

L
Liu Jicong 已提交
781 782 783 784 785 786 787 788 789 790 791 792
  pTask->status = TASK_STATUS__IDLE;
  pTask->inputStatus = TASK_INPUT_STATUS__NORMAL;
  pTask->outputStatus = TASK_OUTPUT_STATUS__NORMAL;

  pTask->inputQ = taosOpenQueue();
  pTask->outputQ = taosOpenQueue();
  pTask->inputQAll = taosAllocateQall();
  pTask->outputQAll = taosAllocateQall();

  if (pTask->inputQ == NULL || pTask->outputQ == NULL || pTask->inputQAll == NULL || pTask->outputQAll == NULL)
    goto FAIL;

L
Liu Jicong 已提交
793
  // exec
L
Liu Jicong 已提交
794 795
  if (pTask->execType != TASK_EXEC__NONE) {
    // expand runners
L
Liu Jicong 已提交
796 797 798 799 800 801 802 803 804 805
    STqReadHandle* pStreamReader = tqInitSubmitMsgScanner(pTq->pVnode->pMeta);
    SReadHandle    handle = {
           .reader = pStreamReader,
           .meta = pTq->pVnode->pMeta,
           .pMsgCb = &pTq->pVnode->msgCb,
           .vnode = pTq->pVnode,
    };
    pTask->exec.inputHandle = pStreamReader;
    pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle);
    ASSERT(pTask->exec.executor);
L
Liu Jicong 已提交
806
  }
L
Liu Jicong 已提交
807 808

  // sink
L
Liu Jicong 已提交
809
  pTask->ahandle = pTq->pVnode;
L
Liu Jicong 已提交
810
  if (pTask->sinkType == TASK_SINK__SMA) {
L
Liu Jicong 已提交
811
    pTask->smaSink.smaSink = smaHandleRes;
L
Liu Jicong 已提交
812
  } else if (pTask->sinkType == TASK_SINK__TABLE) {
L
Liu Jicong 已提交
813 814 815
    pTask->tbSink.vnode = pTq->pVnode;
    pTask->tbSink.tbSinkFunc = tqTableSink;

L
Liu Jicong 已提交
816 817
    ASSERT(pTask->tbSink.pSchemaWrapper);
    ASSERT(pTask->tbSink.pSchemaWrapper->pSchema);
L
Liu Jicong 已提交
818

L
Liu Jicong 已提交
819 820 821
    pTask->tbSink.pTSchema =
        tdGetSTSChemaFromSSChema(&pTask->tbSink.pSchemaWrapper->pSchema, pTask->tbSink.pSchemaWrapper->nCols);
    ASSERT(pTask->tbSink.pTSchema);
L
Liu Jicong 已提交
822
  }
L
Liu Jicong 已提交
823

L
Liu Jicong 已提交
824 825
  taosHashPut(pTq->pStreamTasks, &pTask->taskId, sizeof(int32_t), pTask, sizeof(SStreamTask));

L
Liu Jicong 已提交
826 827
  return 0;
FAIL:
L
Liu Jicong 已提交
828 829 830 831 832
  if (pTask->inputQ) taosCloseQueue(pTask->inputQ);
  if (pTask->outputQ) taosCloseQueue(pTask->outputQ);
  if (pTask->inputQAll) taosFreeQall(pTask->inputQAll);
  if (pTask->outputQAll) taosFreeQall(pTask->outputQAll);
  if (pTask) taosMemoryFree(pTask);
L
Liu Jicong 已提交
833 834
  return -1;
}
L
Liu Jicong 已提交
835

L
Liu Jicong 已提交
836
int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* pReq) {
L
Liu Jicong 已提交
837 838 839
  void*              pIter = NULL;
  bool               failed = false;
  SStreamDataSubmit* pSubmit = NULL;
L
Liu Jicong 已提交
840

L
Liu Jicong 已提交
841
  pSubmit = streamDataSubmitNew(pReq);
L
Liu Jicong 已提交
842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859
  if (pSubmit == NULL) {
    failed = true;
  }

  while (1) {
    pIter = taosHashIterate(pTq->pStreamTasks, pIter);
    if (pIter == NULL) break;
    SStreamTask* pTask = (SStreamTask*)pIter;
    if (pTask->inputType != STREAM_INPUT__DATA_SUBMIT) continue;

    int8_t inputStatus = atomic_load_8(&pTask->inputStatus);
    if (inputStatus == TASK_INPUT_STATUS__NORMAL) {
      if (failed) {
        atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__FAILED);
        continue;
      }

      streamDataSubmitRefInc(pSubmit);
L
Liu Jicong 已提交
860 861 862
      SStreamDataSubmit* pSubmitClone = taosAllocateQitem(sizeof(SStreamDataSubmit), DEF_QITEM);
      memcpy(pSubmitClone, pSubmit, sizeof(SStreamDataSubmit));
      taosWriteQitem(pTask->inputQ, pSubmitClone);
L
Liu Jicong 已提交
863 864 865

      int8_t execStatus = atomic_load_8(&pTask->status);
      if (execStatus == TASK_STATUS__IDLE || execStatus == TASK_STATUS__CLOSING) {
L
Liu Jicong 已提交
866 867 868 869 870 871 872 873 874 875 876 877
        SStreamTaskRunReq* pRunReq = taosMemoryMalloc(sizeof(SStreamTaskRunReq));
        if (pRunReq == NULL) continue;
        // TODO: do we need htonl?
        pRunReq->head.vgId = pTq->pVnode->config.vgId;
        pRunReq->streamId = pTask->streamId;
        pRunReq->taskId = pTask->taskId;
        SRpcMsg msg = {
            .msgType = TDMT_VND_TASK_RUN,
            .pCont = pRunReq,
            .contLen = sizeof(SStreamTaskRunReq),
        };
        tmsgPutToQueue(&pTq->pVnode->msgCb, FETCH_QUEUE, &msg);
L
Liu Jicong 已提交
878 879 880 881 882 883 884
      }

    } else {
      // blocked or stopped, do nothing
    }
  }

L
Liu Jicong 已提交
885
  if (pSubmit) {
L
Liu Jicong 已提交
886
    streamDataSubmitRefDec(pSubmit);
L
Liu Jicong 已提交
887
    taosFreeQitem(pSubmit);
L
Liu Jicong 已提交
888
  }
L
Liu Jicong 已提交
889 890

  return failed ? -1 : 0;
L
Liu Jicong 已提交
891 892
}

L
Liu Jicong 已提交
893
int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
L
Liu Jicong 已提交
894
  //
L
Liu Jicong 已提交
895 896 897 898 899 900 901 902 903 904 905
  SStreamTaskRunReq* pReq = pMsg->pCont;
  int32_t            taskId = pReq->taskId;
  SStreamTask*       pTask = taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
  streamTaskProcessRunReq(pTask, &pTq->pVnode->msgCb);
  return 0;
}

int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg) {
  SStreamDispatchReq* pReq = pMsg->pCont;
  int32_t             taskId = pReq->taskId;
  SStreamTask*        pTask = taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
L
Liu Jicong 已提交
906
  streamProcessDispatchReq(pTask, &pTq->pVnode->msgCb, pReq, pMsg);
L
Liu Jicong 已提交
907 908 909 910 911 912 913
  return 0;
}

int32_t tqProcessTaskRecoverReq(STQ* pTq, SRpcMsg* pMsg) {
  SStreamTaskRecoverReq* pReq = pMsg->pCont;
  int32_t                taskId = pReq->taskId;
  SStreamTask*           pTask = taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
L
Liu Jicong 已提交
914
  streamProcessRecoverReq(pTask, &pTq->pVnode->msgCb, pReq, pMsg);
L
Liu Jicong 已提交
915 916 917 918 919 920 921
  return 0;
}

int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) {
  SStreamDispatchRsp* pRsp = pMsg->pCont;
  int32_t             taskId = pRsp->taskId;
  SStreamTask*        pTask = taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
L
Liu Jicong 已提交
922
  streamProcessDispatchRsp(pTask, &pTq->pVnode->msgCb, pRsp);
L
Liu Jicong 已提交
923 924 925 926 927 928 929
  return 0;
}

int32_t tqProcessTaskRecoverRsp(STQ* pTq, SRpcMsg* pMsg) {
  SStreamTaskRecoverRsp* pRsp = pMsg->pCont;
  int32_t                taskId = pRsp->taskId;
  SStreamTask*           pTask = taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
L
Liu Jicong 已提交
930
  streamProcessRecoverRsp(pTask, pRsp);
L
Liu Jicong 已提交
931 932
  return 0;
}