/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Hongze Cheng 已提交
16
#include "tq.h"
S
Shengliang Guan 已提交
17

L
Liu Jicong 已提交
18
/*
 * Initialize the process-wide TQ state held in tqMgmt (timer + stream module).
 *
 * tqMgmt.inited acts as a three-state flag in this file:
 *   0 = not initialized, 1 = initialized, 2 = an init/cleanup is in progress.
 * The caller that swaps 0 -> 2 performs the initialization; callers that
 * observe 2 spin until the in-progress operation publishes 0 or 1.
 *
 * Returns 0 on success or when already initialized, -1 on failure. On every
 * failure path the flag is reset to 0 so that later calls do not spin forever
 * on the transient value 2.
 */
int32_t tqInit() {
  int8_t old;
  while (1) {
    old = atomic_val_compare_exchange_8(&tqMgmt.inited, 0, 2);
    if (old != 2) break;
  }

  if (old == 0) {
    tqMgmt.timer = taosTmrInit(10000, 100, 10000, "TQ");
    if (tqMgmt.timer == NULL) {
      atomic_store_8(&tqMgmt.inited, 0);
      return -1;
    }
    if (streamInit() < 0) {
      // undo the timer setup and release the "in progress" state
      taosTmrCleanUp(tqMgmt.timer);
      tqMgmt.timer = NULL;
      atomic_store_8(&tqMgmt.inited, 0);
      return -1;
    }
    atomic_store_8(&tqMgmt.inited, 1);
  }

  return 0;
}
L
Liu Jicong 已提交
39

40
/*
 * Tear down the process-wide TQ state set up by tqInit().
 *
 * Swaps tqMgmt.inited 1 -> 2 to claim the cleanup, spinning while another
 * caller holds the transient value 2. Only the caller that observed 1 runs
 * the teardown and then publishes 0; any other observed value is a no-op.
 */
void tqCleanUp() {
  int8_t prev;
  do {
    prev = atomic_val_compare_exchange_8(&tqMgmt.inited, 1, 2);
  } while (prev == 2);

  if (prev != 1) return;

  taosTmrCleanUp(tqMgmt.timer);
  streamCleanUp();
  atomic_store_8(&tqMgmt.inited, 0);
}
L
Liu Jicong 已提交
53

L
Liu Jicong 已提交
54
/*
 * Allocate and open a TQ instance for a vnode.
 *
 * path   - directory path; a private copy is stored in the returned object.
 * pVnode - owning vnode, stored as a borrowed pointer.
 *
 * Creates the four hash tables (consumer handles, stream tasks, push manager,
 * alter-info) and then opens the meta and offset stores.
 *
 * Returns the new STQ, or NULL with terrno = TSDB_CODE_TQ_OUT_OF_MEMORY when
 * the object, the path copy, or any of the hash tables cannot be allocated.
 * Release a non-NULL result with tqClose().
 */
STQ* tqOpen(const char* path, SVnode* pVnode) {
  STQ* pTq = taosMemoryCalloc(1, sizeof(STQ));
  if (pTq == NULL) {
    terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
    return NULL;
  }
  pTq->path = strdup(path);
  pTq->pVnode = pVnode;

  pTq->handles = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);

  pTq->pStreamTasks = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);

  pTq->pushMgr = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);

  pTq->pAlterInfo = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);

  // Any allocation above may have failed; do not hand out a half-built object.
  if (pTq->path == NULL || pTq->handles == NULL || pTq->pStreamTasks == NULL || pTq->pushMgr == NULL ||
      pTq->pAlterInfo == NULL) {
    terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
    goto _err;
  }

  if (tqMetaOpen(pTq) < 0) {
    ASSERT(0);
  }

  if (tqOffsetOpen(pTq) < 0) {
    ASSERT(0);
  }

  return pTq;

_err:
  // Only the members created in this function exist at this point.
  if (pTq->handles) taosHashCleanup(pTq->handles);
  if (pTq->pStreamTasks) taosHashCleanup(pTq->pStreamTasks);
  if (pTq->pushMgr) taosHashCleanup(pTq->pushMgr);
  if (pTq->pAlterInfo) taosHashCleanup(pTq->pAlterInfo);
  if (pTq->path) taosMemoryFree(pTq->path);
  taosMemoryFree(pTq);
  return NULL;
}
L
Liu Jicong 已提交
81

L
Liu Jicong 已提交
82
/*
 * Close a TQ instance created by tqOpen() and release everything it owns.
 *
 * A NULL pTq is accepted and ignored. Teardown order: offset store, consumer
 * handle table, every stream task stored in pStreamTasks followed by the
 * table itself, push manager, alter-info table, path copy, meta store, and
 * finally the STQ object.
 */
void tqClose(STQ* pTq) {
  if (pTq == NULL) return;

  tqOffsetClose(pTq->pOffsetStore);
  taosHashCleanup(pTq->handles);

  // The task table stores SStreamTask pointers; free each task before the table.
  for (void* pIter = taosHashIterate(pTq->pStreamTasks, NULL); pIter != NULL;
       pIter = taosHashIterate(pTq->pStreamTasks, pIter)) {
    tFreeSStreamTask(*(SStreamTask**)pIter);
  }
  taosHashCleanup(pTq->pStreamTasks);

  taosHashCleanup(pTq->pushMgr);
  taosHashCleanup(pTq->pAlterInfo);
  taosMemoryFree(pTq->path);
  tqMetaClose(pTq);
  taosMemoryFree(pTq);
}
L
Liu Jicong 已提交
101

L
Liu Jicong 已提交
102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123
/*
 * Encode a meta poll response and send it back to the consumer.
 *
 * pTq  - TQ instance (used for logging the vgroup id).
 * pMsg - the poll request message; its info is reused for the reply.
 * pReq - decoded poll request (epoch and consumer id are echoed back).
 * pRsp - meta response to encode after the SMqRspHead.
 *
 * Returns 0 after the response has been handed to tmsgSendRsp(), or -1 when
 * the RPC buffer cannot be allocated.
 */
int32_t tqSendMetaPollRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqMetaRsp* pRsp) {
  // First pass with a NULL buffer only measures the encoded size.
  int32_t totalLen = sizeof(SMqRspHead) + tEncodeSMqMetaRsp(NULL, pRsp);

  void* pCont = rpcMallocCont(totalLen);
  if (pCont == NULL) return -1;

  SMqRspHead* pRspHead = (SMqRspHead*)pCont;
  pRspHead->mqMsgType = TMQ_MSG_TYPE__POLL_META_RSP;
  pRspHead->epoch = pReq->epoch;
  pRspHead->consumerId = pReq->consumerId;

  void* pBody = POINTER_SHIFT(pCont, sizeof(SMqRspHead));
  tEncodeSMqMetaRsp(&pBody, pRsp);

  SRpcMsg rpcRsp = {
      .info = pMsg->info,
      .pCont = pCont,
      .contLen = totalLen,
      .code = 0,
  };
  tmsgSendRsp(&rpcRsp);

  tqDebug("vgId:%d from consumer:%" PRId64 ", (epoch %d) send rsp, res msg type %d, reqOffset:%" PRId64
          ", rspOffset:%" PRId64,
          TD_VID(pTq->pVnode), pReq->consumerId, pReq->epoch, pRsp->resMsgType, pRsp->reqOffset, pRsp->rspOffset);

  return 0;
}

L
Liu Jicong 已提交
131 132 133 134 135 136 137 138 139 140
/*
 * Encode a data poll response and send it back to the consumer.
 *
 * pTq  - TQ instance (used for logging the vgroup id).
 * pMsg - the poll request message; its info is reused for the reply.
 * pReq - decoded poll request (epoch and consumer id are echoed back).
 * pRsp - data response; its block arrays must agree with blockNum.
 *
 * Returns 0 after the response has been handed to tmsgSendRsp(), or -1 when
 * sizing the payload fails or the RPC buffer cannot be allocated.
 */
int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp) {
  // Sanity checks: every per-block array must have exactly blockNum entries.
  ASSERT(taosArrayGetSize(pRsp->blockData) == pRsp->blockNum);
  ASSERT(taosArrayGetSize(pRsp->blockDataLen) == pRsp->blockNum);

  if (pRsp->withSchema) {
    ASSERT(taosArrayGetSize(pRsp->blockSchema) == pRsp->blockNum);
  } else {
    ASSERT(taosArrayGetSize(pRsp->blockSchema) == 0);
  }

  // For log offsets the response offset must advance when data is returned
  // and must never go backwards otherwise.
  if (pRsp->reqOffset.type == TMQ_OFFSET__LOG) {
    if (pRsp->blockNum > 0) {
      ASSERT(pRsp->rspOffset.version > pRsp->reqOffset.version);
    } else {
      ASSERT(pRsp->rspOffset.version >= pRsp->reqOffset.version);
    }
  }

  int32_t bodyLen;
  int32_t sizeRet;
  tEncodeSize(tEncodeSMqDataRsp, pRsp, bodyLen, sizeRet);
  if (sizeRet < 0) {
    return -1;
  }

  int32_t totalLen = sizeof(SMqRspHead) + bodyLen;
  void*   pCont = rpcMallocCont(totalLen);
  if (pCont == NULL) {
    return -1;
  }

  SMqRspHead* pRspHead = (SMqRspHead*)pCont;
  pRspHead->mqMsgType = TMQ_MSG_TYPE__POLL_RSP;
  pRspHead->epoch = pReq->epoch;
  pRspHead->consumerId = pReq->consumerId;

  // NOTE(review): the encoder is initialized but never cleared in this
  // function - confirm whether the encoder API requires a matching clear call.
  SEncoder encoder;
  tEncoderInit(&encoder, POINTER_SHIFT(pCont, sizeof(SMqRspHead)), bodyLen);
  tEncodeSMqDataRsp(&encoder, pRsp);

  SRpcMsg rpcRsp = {
      .info = pMsg->info,
      .pCont = pCont,
      .contLen = totalLen,
      .code = 0,
  };
  tmsgSendRsp(&rpcRsp);

  char reqOffsetStr[80];
  char rspOffsetStr[80];
  tFormatOffset(reqOffsetStr, 80, &pRsp->reqOffset);
  tFormatOffset(rspOffsetStr, 80, &pRsp->rspOffset);
  tqDebug("vgId:%d from consumer:%" PRId64 ", (epoch %d) send rsp, block num: %d, reqOffset:%s, rspOffset:%s",
          TD_VID(pTq->pVnode), pReq->consumerId, pReq->epoch, pRsp->blockNum, reqOffsetStr, rspOffsetStr);

  return 0;
}

189 190 191 192 193 194 195 196 197 198
/*
 * Handle an offset-commit message: decode the offset, persist it in the
 * offset store and, for log offsets, move the subscription's WAL reference.
 *
 * pTq    - TQ instance.
 * msg    - encoded STqOffset.
 * msgLen - length of msg in bytes.
 *
 * Returns 0 on success, -1 when decoding, persisting, or updating the WAL
 * reference fails. A log offset committed for a subKey that has no handle in
 * pTq->handles is still persisted; only the WAL reference update is skipped.
 */
int32_t tqProcessOffsetCommitReq(STQ* pTq, char* msg, int32_t msgLen) {
  STqOffset offset = {0};
  SDecoder  decoder;
  tDecoderInit(&decoder, msg, msgLen);
  if (tDecodeSTqOffset(&decoder, &offset) < 0) {
    // release decoder state on the failure path as well
    tDecoderClear(&decoder);
    ASSERT(0);
    return -1;
  }
  tDecoderClear(&decoder);

  if (offset.val.type == TMQ_OFFSET__SNAPSHOT_DATA) {
    tqDebug("receive offset commit msg to %s on vgId:%d, offset(type:snapshot) uid:%" PRId64 ", ts:%" PRId64,
            offset.subKey, TD_VID(pTq->pVnode), offset.val.uid, offset.val.ts);
  } else if (offset.val.type == TMQ_OFFSET__LOG) {
    tqDebug("receive offset commit msg to %s on vgId:%d, offset(type:log) version:%" PRId64, offset.subKey,
            TD_VID(pTq->pVnode), offset.val.version);
  } else {
    ASSERT(0);
  }

  if (tqOffsetWrite(pTq->pOffsetStore, &offset) < 0) {
    ASSERT(0);
    return -1;
  }

  if (offset.val.type == TMQ_OFFSET__LOG) {
    STqHandle* pHandle = taosHashGet(pTq->handles, offset.subKey, strlen(offset.subKey));
    if (pHandle == NULL) {
      // No handle registered for this subscription: nothing to re-reference.
      tqWarn("offset commit for %s on vgId:%d: no handle found, wal ref not updated", offset.subKey,
             TD_VID(pTq->pVnode));
    } else if (walRefVer(pHandle->pRef, offset.val.version) < 0) {
      ASSERT(0);
      return -1;
    }
  }

  return 0;
}

L
Liu Jicong 已提交
230
/*
 * Check whether a column of a table may be altered.
 *
 * Scans every SCheckAlterInfo entry in pTq->pAlterInfo; an entry whose ntbUid
 * equals tbUid and whose colIdList contains colId forbids the change.
 *
 * Returns -1 if the column is listed for that table, 0 otherwise.
 */
int32_t tqCheckColModifiable(STQ* pTq, int64_t tbUid, int32_t colId) {
  for (void* pIter = taosHashIterate(pTq->pAlterInfo, NULL); pIter != NULL;
       pIter = taosHashIterate(pTq->pAlterInfo, pIter)) {
    SCheckAlterInfo* pInfo = (SCheckAlterInfo*)pIter;
    if (pInfo->ntbUid != tbUid) continue;

    int32_t nCols = taosArrayGetSize(pInfo->colIdList);
    for (int32_t idx = 0; idx < nCols; idx++) {
      int16_t lockedColId = *(int16_t*)taosArrayGet(pInfo->colIdList, idx);
      if (lockedColId != colId) continue;

      // leaving the iteration early: tell the hash table we are done
      taosHashCancelIterate(pTq->pAlterInfo, pIter);
      return -1;
    }
  }
  return 0;
}

L
Liu Jicong 已提交
250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283
/*
 * Prepare an SMqDataRsp for a poll request.
 *
 * Copies the request offset, creates the empty block-data and block-length
 * arrays, and creates the table-name / schema arrays when the request asks
 * for table names or the subscription type is not TOPIC_SUB_TYPE__COLUMN.
 *
 * Returns 0 on success, -1 if any array cannot be created. Arrays created
 * before a failure stay attached to pRsp and are not released here.
 */
static int32_t tqInitDataRsp(SMqDataRsp* pRsp, const SMqPollReq* pReq, int8_t subType) {
  pRsp->reqOffset = pReq->reqOffset;

  pRsp->blockData = taosArrayInit(0, sizeof(void*));
  pRsp->blockDataLen = taosArrayInit(0, sizeof(int32_t));
  if (pRsp->blockData == NULL || pRsp->blockDataLen == NULL) return -1;

  pRsp->withTbName = pReq->withTbName;
  if (pRsp->withTbName) {
    pRsp->blockTbName = taosArrayInit(0, sizeof(void*));
    // TODO free
    if (pRsp->blockTbName == NULL) return -1;
  }

  // Column subscriptions carry no per-block schema; all other types do.
  if (subType == TOPIC_SUB_TYPE__COLUMN) {
    pRsp->withSchema = false;
    return 0;
  }

  pRsp->withSchema = true;
  pRsp->blockSchema = taosArrayInit(0, sizeof(void*));
  // TODO free
  if (pRsp->blockSchema == NULL) return -1;

  return 0;
}

/*
 * Prepare an SMqMetaRsp for a poll request.
 * Currently a placeholder: nothing is initialized and 0 is always returned.
 */
static int32_t tqInitMetaRsp(SMqMetaRsp* pRsp, const SMqPollReq* pReq) {
  (void)pRsp;
  (void)pReq;
  return 0;
}

L
Liu Jicong 已提交
284
/*
 * Handle a consumer poll request.
 *
 * Steps:
 *   1. Look up the subscription handle by subKey and verify the consumer id.
 *   2. Raise the handle's epoch to the request epoch if it is behind.
 *   3. Resolve the offset to fetch from: the request offset if its type is
 *      positive, otherwise the committed offset, otherwise the reset policy
 *      carried in the request (earliest / latest / none).
 *   4. Column subscriptions: run tqScan() and send the data response.
 *      Other subscriptions: walk the WAL from the resolved version and reply
 *      with the first submit message that yields data, the first meta
 *      message, or an empty response when no further log can be fetched.
 *
 * Returns 0 when a response was produced (or the request was discarded
 * because a newer consumer epoch was observed), -1 on error.
 */
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
  SMqPollReq*  pReq = pMsg->pCont;
  int64_t      consumerId = pReq->consumerId;
  int32_t      reqEpoch = pReq->epoch;
  int32_t      code = 0;
  STqOffsetVal reqOffset = pReq->reqOffset;
  // zero-initialized so that paths which select no offset read defined data
  STqOffsetVal fetchOffsetNew = {0};
  SWalCkHead*  pCkHead = NULL;

  // 1.find handle
  STqHandle* pHandle = taosHashGet(pTq->handles, pReq->subKey, strlen(pReq->subKey));
  if (pHandle == NULL) {
    tqError("tmq poll: no consumer handle for consumer:%" PRId64 ", in vgId:%d, subkey %s", consumerId,
            TD_VID(pTq->pVnode), pReq->subKey);
    return -1;
  }

  // check rebalance
  if (pHandle->consumerId != consumerId) {
    tqError("tmq poll: consumer handle mismatch for consumer:%" PRId64
            ", in vgId:%d, subkey %s, handle consumer id %" PRId64,
            consumerId, TD_VID(pTq->pVnode), pReq->subKey, pHandle->consumerId);
    return -1;
  }

  // update epoch if need
  int32_t consumerEpoch = atomic_load_32(&pHandle->epoch);
  while (consumerEpoch < reqEpoch) {
    consumerEpoch = atomic_val_compare_exchange_32(&pHandle->epoch, consumerEpoch, reqEpoch);
  }

  char buf[80];
  tFormatOffset(buf, 80, &reqOffset);
  tqDebug("tmq poll: consumer %" PRId64 " (epoch %d), subkey %s, recv poll req in vg %d, req offset %s", consumerId,
          pReq->epoch, pHandle->subKey, TD_VID(pTq->pVnode), buf);

  SMqDataRsp dataRsp = {0};
  if (tqInitDataRsp(&dataRsp, pReq, pHandle->execHandle.subType) < 0) {
    // Do not continue with missing response arrays; whatever was created is
    // released at OVER.
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    code = -1;
    goto OVER;
  }

  // 2.reset offset if needed
  if (reqOffset.type > 0) {
    fetchOffsetNew = reqOffset;
  } else {
    STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, pReq->subKey);
    if (pOffset != NULL) {
      fetchOffsetNew = pOffset->val;
      char formatBuf[80];
      tFormatOffset(formatBuf, 80, &fetchOffsetNew);
      tqDebug("tmq poll: consumer %" PRId64 ", subkey %s, vg %d, offset reset to %s", consumerId, pHandle->subKey,
              TD_VID(pTq->pVnode), formatBuf);
    } else {
      if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEAST) {
        if (pReq->useSnapshot && pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
          if (!pHandle->fetchMeta) {
            tqOffsetResetToData(&fetchOffsetNew, 0, 0);
          } else {
            // reset to meta
            ASSERT(0);
          }
        } else {
          tqOffsetResetToLog(&fetchOffsetNew, walGetFirstVer(pTq->pVnode->pWal));
        }
      } else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) {
        // nothing to fetch yet: answer immediately with the latest version
        tqOffsetResetToLog(&dataRsp.rspOffset, walGetLastVer(pTq->pVnode->pWal));
        tqDebug("tmq poll: consumer %" PRId64 ", subkey %s, vg %d, offset reset to %" PRId64, consumerId,
                pHandle->subKey, TD_VID(pTq->pVnode), dataRsp.rspOffset.version);
        if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) {
          code = -1;
        }
        goto OVER;
      } else if (reqOffset.type == TMQ_OFFSET__RESET_NONE) {
        tqError("tmq poll: subkey %s, no offset committed for consumer %" PRId64
                " in vg %d, subkey %s, reset none failed",
                pHandle->subKey, consumerId, TD_VID(pTq->pVnode), pReq->subKey);
        terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET;
        code = -1;
        goto OVER;
      }
    }
  }

  // 3.query
  if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
    if (tqScan(pTq, pHandle, &dataRsp, &fetchOffsetNew) < 0) {
      ASSERT(0);
      code = -1;
      goto OVER;
    }
    if (dataRsp.blockNum == 0) {
      // TODO add to async task pool
    }
    if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) {
      code = -1;
    }
    goto OVER;
  }

  // Column subscriptions always leave through OVER above, so only the
  // WAL-driven subscription types reach this point.
  if (pHandle->execHandle.subType != TOPIC_SUB_TYPE__COLUMN) {
    int64_t fetchVer = fetchOffsetNew.version + 1;
    pCkHead = taosMemoryMalloc(sizeof(SWalCkHead) + 2048);
    if (pCkHead == NULL) {
      code = -1;
      goto OVER;
    }

    walSetReaderCapacity(pHandle->pWalReader, 2048);

    while (1) {
      consumerEpoch = atomic_load_32(&pHandle->epoch);
      if (consumerEpoch > reqEpoch) {
        tqWarn("tmq poll: consumer %" PRId64 " (epoch %d), subkey %s, vg %d offset %" PRId64
               ", found new consumer epoch %d, discard req epoch %d",
               consumerId, pReq->epoch, pHandle->subKey, TD_VID(pTq->pVnode), fetchVer, consumerEpoch, reqEpoch);
        break;
      }

      if (tqFetchLog(pTq, pHandle, &fetchVer, &pCkHead) < 0) {
        // TODO add push mgr

        tqOffsetResetToLog(&dataRsp.rspOffset, fetchVer);
        ASSERT(dataRsp.rspOffset.version >= dataRsp.reqOffset.version);
        if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) {
          code = -1;
        }
        goto OVER;
      }

      SWalCont* pHead = &pCkHead->head;

      tqDebug("tmq poll: consumer:%" PRId64 ", (epoch %d) iter log, vgId:%d offset %" PRId64 " msgType %d", consumerId,
              pReq->epoch, TD_VID(pTq->pVnode), fetchVer, pHead->msgType);

      if (pHead->msgType == TDMT_VND_SUBMIT) {
        SSubmitReq* pCont = (SSubmitReq*)&pHead->body;

        if (tqLogScanExec(pTq, &pHandle->execHandle, pCont, &dataRsp) < 0) {
          // scan failure is tolerated here; the entry is skipped when it
          // produced no blocks
        }
        // TODO batch optimization:
        // TODO continue scan until meeting batch requirement
        if (dataRsp.blockNum > 0 /* threshold */) {
          tqOffsetResetToLog(&dataRsp.rspOffset, fetchVer);
          ASSERT(dataRsp.rspOffset.version >= dataRsp.reqOffset.version);

          if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) {
            code = -1;
          }
          goto OVER;
        } else {
          fetchVer++;
        }

      } else {
        ASSERT(pHandle->fetchMeta);
        ASSERT(IS_META_MSG(pHead->msgType));
        tqDebug("fetch meta msg, ver:%" PRId64 ", type:%d", pHead->version, pHead->msgType);
        SMqMetaRsp metaRsp = {0};
        metaRsp.rspOffset = fetchVer;
        tqOffsetResetToLog(&metaRsp.reqOffsetNew, pReq->reqOffset.version);
        tqOffsetResetToLog(&metaRsp.rspOffsetNew, fetchVer);
        metaRsp.resMsgType = pHead->msgType;
        metaRsp.metaRspLen = pHead->bodyLen;
        metaRsp.metaRsp = pHead->body;
        if (tqSendMetaPollRsp(pTq, pMsg, pReq, &metaRsp) < 0) {
          code = -1;
          goto OVER;
        }
        code = 0;
        goto OVER;
      }
    }

    // pCkHead is intentionally NOT freed here: the single release happens at
    // OVER, so freeing it at this point as well would be a double free.
  }

OVER:
  if (pCkHead) taosMemoryFree(pCkHead);
  // TODO wrap in destroy func
  // NOTE(review): after a failed tqInitDataRsp() some of these arrays may be
  // NULL - presumably the destroy helpers accept NULL; confirm.
  taosArrayDestroy(dataRsp.blockDataLen);
  taosArrayDestroyP(dataRsp.blockData, (FDelete)taosMemoryFree);

  if (dataRsp.withSchema) {
    taosArrayDestroyP(dataRsp.blockSchema, (FDelete)tDeleteSSchemaWrapper);
  }

  if (dataRsp.withTbName) {
    taosArrayDestroyP(dataRsp.blockTbName, (FDelete)taosMemoryFree);
  }

  return code;
}

L
Liu Jicong 已提交
498 499
/*
 * Handle a vgroup "delete subscription" request.
 *
 * msg is interpreted in place as an SMqVDeleteReq. The subscription
 * identified by its subKey is removed from the handle table, from the offset
 * store and from the persisted handle meta, in that order.
 *
 * Always returns 0; a missing handle or a meta delete failure trips ASSERT.
 */
int32_t tqProcessVgDeleteReq(STQ* pTq, char* msg, int32_t msgLen) {
  SMqVDeleteReq* pReq = (SMqVDeleteReq*)msg;
  (void)msgLen;

  // in-memory handle first
  int32_t code = taosHashRemove(pTq->handles, pReq->subKey, strlen(pReq->subKey));
  ASSERT(code == 0);

  // then the committed offset
  tqOffsetDelete(pTq->pOffsetStore, pReq->subKey);

  // and finally the persisted handle
  if (tqMetaDeleteHandle(pTq, pReq->subKey) < 0) {
    ASSERT(0);
  }

  return 0;
}

L
Liu Jicong 已提交
512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527
/*
 * Handle a "check alter info" request: decode an SCheckAlterInfo from msg and
 * store a copy of it in pTq->pAlterInfo, keyed by its topic name.
 *
 * Returns 0 on success. On decode failure or when the hash insert fails,
 * sets terrno = TSDB_CODE_OUT_OF_MEMORY and returns -1. The decoder is
 * cleared on both the success and the decode-failure path.
 */
int32_t tqProcessCheckAlterInfoReq(STQ* pTq, char* msg, int32_t msgLen) {
  SCheckAlterInfo info = {0};
  SDecoder        decoder;
  tDecoderInit(&decoder, msg, msgLen);
  if (tDecodeSCheckAlterInfo(&decoder, &info) < 0) {
    tDecoderClear(&decoder);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  tDecoderClear(&decoder);
  if (taosHashPut(pTq->pAlterInfo, info.topic, strlen(info.topic), &info, sizeof(SCheckAlterInfo)) < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  return 0;
}

L
Liu Jicong 已提交
528
/*
 * Handle a vgroup rebalance request (SMqRebVgReq).
 *
 * No handle exists for req.subKey:
 *   A new STqHandle is built on the stack, given a WAL reference at the
 *   committed version, and set up according to the subscription type
 *   (column: query executor + reader; db: WAL reader + tq reader + filter
 *   table; table: WAL reader + tq reader restricted to the child tables of
 *   req.suid). A copy is inserted into pTq->handles and persisted.
 *
 * A handle already exists:
 *   The consumer id is switched to req.newConsumerId and the epoch reset,
 *   then the handle is persisted again.
 *
 * Returns 0 on success, -1 if no WAL reference can be obtained for a new
 * handle.
 */
int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
  SMqRebVgReq req = {0};
  tDecodeSMqRebVgReq(msg, &req);
  // todo lock
  STqHandle* pHandle = taosHashGet(pTq->handles, req.subKey, strlen(req.subKey));
  if (pHandle == NULL) {
    ASSERT(req.oldConsumerId == -1);
    ASSERT(req.newConsumerId != -1);
    STqHandle tqHandle = {0};
    pHandle = &tqHandle;

    memcpy(pHandle->subKey, req.subKey, TSDB_SUBSCRIBE_KEY_LEN);
    pHandle->consumerId = req.newConsumerId;
    pHandle->epoch = -1;

    pHandle->execHandle.subType = req.subType;
    pHandle->fetchMeta = req.withMeta;
    // TODO version should be assigned and refed during preprocess
    SWalRef* pRef = walRefCommittedVer(pTq->pVnode->pWal);
    if (pRef == NULL) {
      ASSERT(0);
      // without a WAL reference the handle cannot be built
      return -1;
    }
    int64_t ver = pRef->refVer;
    pHandle->pRef = pRef;

    if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
      // the handle takes over the query message from the request
      pHandle->execHandle.execCol.qmsg = req.qmsg;
      pHandle->snapshotVer = ver;
      req.qmsg = NULL;
      SReadHandle handle = {
          .meta = pTq->pVnode->pMeta,
          .vnode = pTq->pVnode,
          .initTableReader = true,
          .initTqReader = true,
          .version = ver,
      };
      pHandle->execHandle.execCol.task =
          qCreateQueueExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle, &pHandle->execHandle.numOfCols,
                                   &pHandle->execHandle.pSchemaWrapper);
      ASSERT(pHandle->execHandle.execCol.task);
      void* scanner = NULL;
      qExtractStreamScanner(pHandle->execHandle.execCol.task, &scanner);
      ASSERT(scanner);
      pHandle->execHandle.pExecReader = qExtractReaderFromStreamScanner(scanner);
      ASSERT(pHandle->execHandle.pExecReader);
    } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__DB) {
      pHandle->pWalReader = walOpenReader(pTq->pVnode->pWal, NULL);

      pHandle->execHandle.pExecReader = tqOpenReader(pTq->pVnode);
      pHandle->execHandle.execDb.pFilterOutTbUid =
          taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
    } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
      pHandle->pWalReader = walOpenReader(pTq->pVnode->pWal, NULL);

      pHandle->execHandle.execTb.suid = req.suid;
      SArray* tbUidList = taosArrayInit(0, sizeof(int64_t));
      vnodeGetCtbIdList(pTq->pVnode, req.suid, tbUidList);
      tqDebug("vgId:%d, tq try get suid:%" PRId64, pTq->pVnode->config.vgId, req.suid);
      for (int32_t i = 0; i < taosArrayGetSize(tbUidList); i++) {
        int64_t tbUid = *(int64_t*)taosArrayGet(tbUidList, i);
        tqDebug("vgId:%d, idx %d, uid:%" PRId64, TD_VID(pTq->pVnode), i, tbUid);
      }
      pHandle->execHandle.pExecReader = tqOpenReader(pTq->pVnode);
      tqReaderSetTbUidList(pHandle->execHandle.pExecReader, tbUidList);
      taosArrayDestroy(tbUidList);
    }
    // the hash table stores its own copy of the stack-built handle
    taosHashPut(pTq->handles, req.subKey, strlen(req.subKey), pHandle, sizeof(STqHandle));
    tqDebug("try to persist handle %s consumer %" PRId64, req.subKey, pHandle->consumerId);
    if (tqMetaSaveHandle(pTq, req.subKey, pHandle) < 0) {
      // TODO
      ASSERT(0);
    }
  } else {
    // TODO handle qmsg and exec modification
    atomic_store_32(&pHandle->epoch, -1);
    atomic_store_64(&pHandle->consumerId, req.newConsumerId);
    atomic_add_fetch_32(&pHandle->epoch, 1);
    if (tqMetaSaveHandle(pTq, req.subKey, pHandle) < 0) {
      // TODO: decide how to recover; for now make the failure visible
      tqError("failed to persist handle %s after consumer change to %" PRId64, req.subKey, req.newConsumerId);
    }
  }

  return 0;
}
614

L
Liu Jicong 已提交
615
/*
 * Deploy a stream task on this vnode.
 *
 * Decodes an SStreamTask from msg, opens its input/output queues, creates the
 * executor (when the task has one), wires up the sink callbacks, arms the
 * trigger and registers the task in pTq->pStreamTasks keyed by task id.
 *
 * Returns 0 on success, -1 when the task cannot be allocated or decoded, or
 * when a queue cannot be opened. On those failures the task object is freed
 * here and is not registered.
 */
int32_t tqProcessTaskDeployReq(STQ* pTq, char* msg, int32_t msgLen) {
  SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
  if (pTask == NULL) {
    return -1;
  }
  SDecoder decoder;
  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
  if (tDecodeSStreamTask(&decoder, pTask) < 0) {
    // never continue with a partially decoded task
    tDecoderClear(&decoder);
    ASSERT(0);
    taosMemoryFree(pTask);
    return -1;
  }
  tDecoderClear(&decoder);
  ASSERT(pTask->isDataScan == 0 || pTask->isDataScan == 1);
  if (pTask->isDataScan == 0 && pTask->sinkType == TASK_SINK__NONE) {
    ASSERT(taosArrayGetSize(pTask->childEpInfo) != 0);
  }

  pTask->execStatus = TASK_EXEC_STATUS__IDLE;

  pTask->inputQueue = streamQueueOpen();
  pTask->outputQueue = streamQueueOpen();
  pTask->inputStatus = TASK_INPUT_STATUS__NORMAL;
  pTask->outputStatus = TASK_OUTPUT_STATUS__NORMAL;

  if (pTask->inputQueue == NULL || pTask->outputQueue == NULL) goto FAIL;

  pTask->pMsgCb = &pTq->pVnode->msgCb;

  // exec
  if (pTask->execType != TASK_EXEC__NONE) {
    // expand runners
    if (pTask->isDataScan) {
      // data-scan tasks read from this vnode
      SReadHandle handle = {
          .meta = pTq->pVnode->pMeta,
          .vnode = pTq->pVnode,
          .initTqReader = 1,
      };
      pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle);
    } else {
      // other tasks get a handle with no vnode attached
      SReadHandle mgHandle = {
          .vnode = NULL,
          .numOfVgroups = pTask->numOfVgroups,
      };
      pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &mgHandle);
    }
    ASSERT(pTask->exec.executor);
  }

  // sink
  if (pTask->sinkType == TASK_SINK__SMA) {
    pTask->smaSink.vnode = pTq->pVnode;
    pTask->smaSink.smaSink = smaHandleRes;
  } else if (pTask->sinkType == TASK_SINK__TABLE) {
    pTask->tbSink.vnode = pTq->pVnode;
    pTask->tbSink.tbSinkFunc = tqTableSink;

    ASSERT(pTask->tbSink.pSchemaWrapper);
    ASSERT(pTask->tbSink.pSchemaWrapper->pSchema);

    pTask->tbSink.pTSchema =
        tdGetSTSChemaFromSSChema(pTask->tbSink.pSchemaWrapper->pSchema, pTask->tbSink.pSchemaWrapper->nCols, 1);
    ASSERT(pTask->tbSink.pTSchema);
  }

  streamSetupTrigger(pTask);

  tqInfo("deploy stream task on vg %d, task id %d, child id %d", TD_VID(pTq->pVnode), pTask->taskId,
         pTask->selfChildId);

  // the table stores the task pointer itself, not a copy of the task
  taosHashPut(pTq->pStreamTasks, &pTask->taskId, sizeof(int32_t), &pTask, sizeof(void*));

  return 0;
FAIL:
  // pTask is always non-NULL here; only the queues may be missing
  if (pTask->inputQueue) streamQueueClose(pTask->inputQueue);
  if (pTask->outputQueue) streamQueueClose(pTask->outputQueue);
  taosMemoryFree(pTask);
  return -1;
}
L
Liu Jicong 已提交
693

L
Liu Jicong 已提交
694
/*
 * Feed a submit request to every data-scan stream task of this vnode.
 *
 * The request is wrapped in an SStreamDataSubmit. When the wrapper can be
 * created, it is queued on each data-scan task and the task is launched;
 * a task whose input or launch fails is logged and skipped. When the wrapper
 * cannot be created, every data-scan task is notified through
 * streamTaskInputFail() instead.
 *
 * Returns 0 if the wrapper was created, -1 (terrno = out of memory) if not.
 */
int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* pReq) {
  SStreamDataSubmit* pSubmit = streamDataSubmitNew(pReq);
  bool               failed = (pSubmit == NULL);

  if (failed) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    qError("failed to create data submit for stream since out of memory");
  }

  for (void* pIter = taosHashIterate(pTq->pStreamTasks, NULL); pIter != NULL;
       pIter = taosHashIterate(pTq->pStreamTasks, pIter)) {
    SStreamTask* pTask = *(SStreamTask**)pIter;
    if (!pTask->isDataScan) continue;

    qDebug("data submit enqueue stream task: %d", pTask->taskId);

    if (failed) {
      streamTaskInputFail(pTask);
      continue;
    }

    if (streamTaskInput(pTask, (SStreamQueueItem*)pSubmit) < 0) {
      qError("stream task input failed, task id %d", pTask->taskId);
      continue;
    }

    if (streamLaunchByWrite(pTask, TD_VID(pTq->pVnode)) < 0) {
      qError("stream task launch failed, task id %d", pTask->taskId);
      continue;
    }
  }

  // drop this function's own reference on the wrapper
  if (pSubmit) {
    streamDataSubmitRefDec(pSubmit);
    taosFreeQitem(pSubmit);
  }

  return failed ? -1 : 0;
}

L
Liu Jicong 已提交
737
/*
 * Handle a "run task" request: look up the stream task named in the request
 * and hand it to streamProcessRunReq().
 *
 * Returns 0 if the task exists, -1 if no task with that id is registered.
 */
int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
  SStreamTaskRunReq* pRunReq = pMsg->pCont;
  int32_t            taskId = pRunReq->taskId;

  SStreamTask** ppTask = (SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
  if (ppTask == NULL) return -1;

  streamProcessRunReq(*ppTask);
  return 0;
}

/*
 * Handle a stream dispatch request.
 *
 * The payload following the SMsgHead is decoded into an SStreamDispatchReq
 * and passed, together with a prepared response message, to the target task.
 *
 * Returns 0 if the request was delivered to a task, -1 if decoding fails or
 * no task with the requested id is registered.
 */
int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg) {
  char*              msgStr = pMsg->pCont;
  char*              msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
  int32_t            msgLen = pMsg->contLen - sizeof(SMsgHead);
  SStreamDispatchReq req = {0};
  SDecoder           decoder;
  tDecoderInit(&decoder, msgBody, msgLen);
  if (tDecodeStreamDispatchReq(&decoder, &req) < 0) {
    // a request that failed to decode must not be used for the task lookup
    tDecoderClear(&decoder);
    return -1;
  }
  int32_t       taskId = req.taskId;
  SStreamTask** ppTask = (SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
  if (ppTask) {
    SRpcMsg rsp = {
        .info = pMsg->info,
        .code = 0,
    };
    streamProcessDispatchReq(*ppTask, &req, &rsp);
    return 0;
  } else {
    return -1;
  }
}

/*
 * Handle a task recover request: look up the stream task named in the request
 * and forward the request and the original message to
 * streamProcessRecoverReq().
 *
 * Returns 0 if the task exists, -1 if no task with that id is registered.
 */
int32_t tqProcessTaskRecoverReq(STQ* pTq, SRpcMsg* pMsg) {
  SStreamTaskRecoverReq* pRecoverReq = pMsg->pCont;
  int32_t                taskId = pRecoverReq->taskId;

  SStreamTask** ppTask = (SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
  if (ppTask == NULL) return -1;

  streamProcessRecoverReq(*ppTask, pRecoverReq, pMsg);
  return 0;
}

/*
 * Handle a dispatch response: the SStreamDispatchRsp located right after the
 * SMsgHead is forwarded to the task it names via streamProcessDispatchRsp().
 *
 * Returns 0 if the task exists, -1 if no task with that id is registered.
 */
int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) {
  SStreamDispatchRsp* pDispatchRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t             taskId = pDispatchRsp->taskId;

  SStreamTask** ppTask = (SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
  if (ppTask == NULL) return -1;

  streamProcessDispatchRsp(*ppTask, pDispatchRsp);
  return 0;
}

/*
 * Handle a task recover response: look up the stream task named in the
 * response and forward it to streamProcessRecoverRsp().
 *
 * Returns 0 if the task exists, -1 if no task with that id is registered.
 */
int32_t tqProcessTaskRecoverRsp(STQ* pTq, SRpcMsg* pMsg) {
  SStreamTaskRecoverRsp* pRecoverRsp = pMsg->pCont;
  int32_t                taskId = pRecoverRsp->taskId;

  SStreamTask** ppTask = (SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
  if (ppTask == NULL) return -1;

  streamProcessRecoverRsp(*ppTask, pRecoverRsp);
  return 0;
}
L
Liu Jicong 已提交
807 808 809

/*
 * Handle a "drop stream task" request.
 *
 * msg is interpreted in place as an SVDropStreamTaskReq. If the task is
 * registered, it is removed from pTq->pStreamTasks and its status is set to
 * TASK_STATUS__DROPPING; the task object itself is not freed here.
 *
 * Always returns 0, whether or not the task was found.
 */
int32_t tqProcessTaskDropReq(STQ* pTq, char* msg, int32_t msgLen) {
  SVDropStreamTaskReq* pReq = (SVDropStreamTaskReq*)msg;

  SStreamTask** ppTask = (SStreamTask**)taosHashGet(pTq->pStreamTasks, &pReq->taskId, sizeof(int32_t));
  if (ppTask != NULL) {
    // take the task pointer out of the entry before the entry is removed
    SStreamTask* pDropTask = *ppTask;
    taosHashRemove(pTq->pStreamTasks, &pReq->taskId, sizeof(int32_t));
    atomic_store_8(&pDropTask->taskStatus, TASK_STATUS__DROPPING);
  }

  // todo
  // clear queue
  // push drop req into queue
  // launch exec to free memory
  // remove from hash
  return 0;

#if 0
  int32_t              code = taosHashRemove(pTq->pStreamTasks, &pReq->taskId, sizeof(int32_t));
  // set status dropping
  ASSERT(code == 0);
  if (code == 0) {
    // sendrsp
  }
  return code;
#endif
}
L
Liu Jicong 已提交
834 835 836 837 838 839 840 841 842

/*
 * Handle a stream retrieve request.
 *
 * The payload following the SMsgHead is decoded into an SStreamRetrieveReq
 * and passed, together with a prepared response message, to the destination
 * task (req.dstTaskId).
 *
 * Returns 0 if the request was delivered to a task, -1 if decoding fails or
 * no task with the destination id is registered.
 */
int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg) {
  char*              msgStr = pMsg->pCont;
  char*              msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
  int32_t            msgLen = pMsg->contLen - sizeof(SMsgHead);
  SStreamRetrieveReq req = {0};
  SDecoder           decoder;
  tDecoderInit(&decoder, msgBody, msgLen);
  if (tDecodeStreamRetrieveReq(&decoder, &req) < 0) {
    // a request that failed to decode must not be used for the task lookup
    tDecoderClear(&decoder);
    return -1;
  }
  int32_t       taskId = req.dstTaskId;
  SStreamTask** ppTask = (SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
  if (ppTask) {
    SRpcMsg rsp = {
        .info = pMsg->info,
        .code = 0,
    };
    streamProcessRetrieveReq(*ppTask, &req, &rsp);
  } else {
    return -1;
  }
  return 0;
}

/*
 * Handle a stream retrieve response.
 * Currently a placeholder: the message is ignored and 0 is always returned.
 */
int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg) {
  (void)pTq;
  (void)pMsg;
  return 0;
}