tq.c 35.2 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
S
Shengliang Guan 已提交
14 15
 */

H
Hongze Cheng 已提交
16
#include "tq.h"
S
Shengliang Guan 已提交
17

L
Liu Jicong 已提交
18
int32_t tqInit() {
L
Liu Jicong 已提交
19 20 21 22 23 24
  int8_t old;
  while (1) {
    old = atomic_val_compare_exchange_8(&tqMgmt.inited, 0, 2);
    if (old != 2) break;
  }

25 26 27 28 29 30
  if (old == 0) {
    tqMgmt.timer = taosTmrInit(10000, 100, 10000, "TQ");
    if (tqMgmt.timer == NULL) {
      atomic_store_8(&tqMgmt.inited, 0);
      return -1;
    }
31 32 33
    if (streamInit() < 0) {
      return -1;
    }
L
Liu Jicong 已提交
34
    atomic_store_8(&tqMgmt.inited, 1);
35
  }
36

L
Liu Jicong 已提交
37 38
  return 0;
}
L
Liu Jicong 已提交
39

40
void tqCleanUp() {
L
Liu Jicong 已提交
41 42 43 44 45 46 47 48
  int8_t old;
  while (1) {
    old = atomic_val_compare_exchange_8(&tqMgmt.inited, 1, 2);
    if (old != 2) break;
  }

  if (old == 1) {
    taosTmrCleanUp(tqMgmt.timer);
L
Liu Jicong 已提交
49
    streamCleanUp();
L
Liu Jicong 已提交
50 51
    atomic_store_8(&tqMgmt.inited, 0);
  }
52
}
L
Liu Jicong 已提交
53

54 55 56 57 58 59 60 61
static void destroySTqHandle(void* data) {
  STqHandle* pData = (STqHandle*)data;
  qDestroyTask(pData->execHandle.task);
  if (pData->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
  } else if (pData->execHandle.subType == TOPIC_SUB_TYPE__DB) {
    tqCloseReader(pData->execHandle.pExecReader);
    walCloseReader(pData->pWalReader);
    taosHashCleanup(pData->execHandle.execDb.pFilterOutTbUid);
L
Liu Jicong 已提交
62
  } else if (pData->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
63 64 65 66 67
    walCloseReader(pData->pWalReader);
    tqCloseReader(pData->execHandle.pExecReader);
  }
}

L
Liu Jicong 已提交
68
/*
 * Create and open the TQ instance of a vnode.
 *
 * Allocates the STQ object, copies `path`, creates the handle / push-manager /
 * check-info hash tables, then opens the TQ meta store, the offset store and
 * the stream meta, and loads the stream tasks.
 *
 * Returns the new STQ on success. Returns NULL with terrno set to
 * TSDB_CODE_TQ_OUT_OF_MEMORY when the object, the path copy or any of the
 * hash tables cannot be allocated. Failures of the later open/load steps are
 * only ASSERTed, as before.
 */
STQ* tqOpen(const char* path, SVnode* pVnode) {
  STQ* pTq = taosMemoryCalloc(1, sizeof(STQ));
  if (pTq == NULL) {
    terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
    return NULL;
  }
  pTq->path = strdup(path);
  pTq->pVnode = pVnode;

  pTq->pHandle = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);
  pTq->pPushMgr = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
  pTq->pCheckInfo = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);

  // Any of the allocations above may fail; release whatever was created and
  // bail out instead of continuing with NULL members.
  if (pTq->path == NULL || pTq->pHandle == NULL || pTq->pPushMgr == NULL || pTq->pCheckInfo == NULL) {
    if (pTq->pHandle != NULL) taosHashCleanup(pTq->pHandle);
    if (pTq->pPushMgr != NULL) taosHashCleanup(pTq->pPushMgr);
    if (pTq->pCheckInfo != NULL) taosHashCleanup(pTq->pCheckInfo);
    if (pTq->path != NULL) taosMemoryFree(pTq->path);
    taosMemoryFree(pTq);
    terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
    return NULL;
  }

  // Entries removed from pHandle are released through destroySTqHandle.
  taosHashSetFreeFp(pTq->pHandle, destroySTqHandle);

  if (tqMetaOpen(pTq) < 0) {
    ASSERT(0);
  }

  if (tqOffsetOpen(pTq) < 0) {
    ASSERT(0);
  }

  pTq->pStreamMeta = streamMetaOpen(path, pTq, (FTaskExpand*)tqExpandTask);
  if (pTq->pStreamMeta == NULL) {
    ASSERT(0);
  }

  if (streamLoadTasks(pTq->pStreamMeta) < 0) {
    ASSERT(0);
  }

  return pTq;
}
L
Liu Jicong 已提交
104

L
Liu Jicong 已提交
105
/*
 * Close a TQ instance and free everything it owns. A NULL pTq is ignored.
 *
 * Release order: offset store, the three hash tables, the path copy, the TQ
 * meta store, the stream meta, and finally the STQ object itself.
 */
void tqClose(STQ* pTq) {
  if (pTq == NULL) {
    return;
  }

  tqOffsetClose(pTq->pOffsetStore);

  taosHashCleanup(pTq->pHandle);
  taosHashCleanup(pTq->pPushMgr);
  taosHashCleanup(pTq->pCheckInfo);

  taosMemoryFree(pTq->path);

  tqMetaClose(pTq);
  streamMetaClose(pTq->pStreamMeta);

  taosMemoryFree(pTq);
}
L
Liu Jicong 已提交
117

L
Liu Jicong 已提交
118
/*
 * Encode a meta poll response and send it back on the connection of pMsg.
 *
 * The RPC payload is an SMqRspHead (message type, epoch and consumer id taken
 * from pReq) followed by the encoded SMqMetaRsp.
 *
 * Returns 0 after the response has been handed to tmsgSendRsp, or -1 when the
 * encoded size cannot be computed or the RPC buffer cannot be allocated.
 */
int32_t tqSendMetaPollRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqMetaRsp* pRsp) {
  int32_t bodyLen = 0;
  int32_t ret = 0;
  tEncodeSize(tEncodeSMqMetaRsp, pRsp, bodyLen, ret);
  if (ret < 0) {
    return -1;
  }

  int32_t totalLen = sizeof(SMqRspHead) + bodyLen;
  void*   pCont = rpcMallocCont(totalLen);
  if (pCont == NULL) {
    return -1;
  }

  // Fixed header first ...
  SMqRspHead* pRspHead = (SMqRspHead*)pCont;
  pRspHead->mqMsgType = TMQ_MSG_TYPE__POLL_META_RSP;
  pRspHead->epoch = pReq->epoch;
  pRspHead->consumerId = pReq->consumerId;

  // ... then the encoded body right behind it.
  void*    pBody = POINTER_SHIFT(pCont, sizeof(SMqRspHead));
  SEncoder encoder = {0};
  tEncoderInit(&encoder, pBody, bodyLen);
  tEncodeSMqMetaRsp(&encoder, pRsp);
  tEncoderClear(&encoder);

  SRpcMsg resp = {0};
  resp.info = pMsg->info;
  resp.pCont = pCont;
  resp.contLen = totalLen;
  resp.code = 0;
  tmsgSendRsp(&resp);

  tqDebug("vgId:%d, from consumer:%" PRId64 ", (epoch %d) send rsp, res msg type %d, offset type:%d",
          TD_VID(pTq->pVnode), pReq->consumerId, pReq->epoch, pRsp->resMsgType, pRsp->rspOffset.type);

  return 0;
}

L
Liu Jicong 已提交
156 157 158 159
/*
 * Encode a data poll response and send it back on the connection of pMsg.
 *
 * Asserted preconditions: blockData and blockDataLen hold exactly blockNum
 * entries, the response carries no schema, and for a log-type request offset
 * the response offset is not behind the request offset (strictly ahead when
 * blocks are returned).
 *
 * The RPC payload is an SMqRspHead followed by the encoded SMqDataRsp.
 * Returns 0 once the response is handed to tmsgSendRsp, -1 when the encoded
 * size cannot be computed or the RPC buffer cannot be allocated.
 */
int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp) {
  ASSERT(taosArrayGetSize(pRsp->blockData) == pRsp->blockNum);
  ASSERT(taosArrayGetSize(pRsp->blockDataLen) == pRsp->blockNum);

  ASSERT(!pRsp->withSchema);
  ASSERT(taosArrayGetSize(pRsp->blockSchema) == 0);

  if (pRsp->reqOffset.type == TMQ_OFFSET__LOG) {
    if (pRsp->blockNum > 0) {
      ASSERT(pRsp->rspOffset.version > pRsp->reqOffset.version);
    } else {
      ASSERT(pRsp->rspOffset.version >= pRsp->reqOffset.version);
    }
  }

  int32_t bodyLen = 0;
  int32_t ret = 0;
  tEncodeSize(tEncodeSMqDataRsp, pRsp, bodyLen, ret);
  if (ret < 0) {
    return -1;
  }

  int32_t totalLen = sizeof(SMqRspHead) + bodyLen;
  void*   pCont = rpcMallocCont(totalLen);
  if (pCont == NULL) {
    return -1;
  }

  // Fixed header first ...
  SMqRspHead* pRspHead = (SMqRspHead*)pCont;
  pRspHead->mqMsgType = TMQ_MSG_TYPE__POLL_RSP;
  pRspHead->epoch = pReq->epoch;
  pRspHead->consumerId = pReq->consumerId;

  // ... then the encoded body right behind it.
  void*    pBody = POINTER_SHIFT(pCont, sizeof(SMqRspHead));
  SEncoder encoder = {0};
  tEncoderInit(&encoder, pBody, bodyLen);
  tEncodeSMqDataRsp(&encoder, pRsp);
  tEncoderClear(&encoder);

  SRpcMsg rsp = {0};
  rsp.info = pMsg->info;
  rsp.pCont = pCont;
  rsp.contLen = totalLen;
  rsp.code = 0;
  tmsgSendRsp(&rsp);

  // Human-readable offsets for the debug log only.
  char reqOffsetStr[80] = {0};
  char rspOffsetStr[80] = {0};
  tFormatOffset(reqOffsetStr, 80, &pRsp->reqOffset);
  tFormatOffset(rspOffsetStr, 80, &pRsp->rspOffset);
  tqDebug("vgId:%d, from consumer:%" PRId64 ", (epoch %d) send rsp, block num: %d, reqOffset:%s, rspOffset:%s",
          TD_VID(pTq->pVnode), pReq->consumerId, pReq->epoch, pRsp->blockNum, reqOffsetStr, rspOffsetStr);

  return 0;
}

212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271
/*
 * Encode a taosx poll response and send it back on the connection of pMsg.
 *
 * Asserted preconditions: blockData and blockDataLen hold exactly blockNum
 * entries; blockSchema holds blockNum entries when withSchema is set and is
 * empty otherwise; for a log-type request offset the response offset is not
 * behind the request offset (strictly ahead when blocks are returned).
 *
 * The RPC payload is an SMqRspHead followed by the encoded STaosxRsp.
 * Returns 0 once the response is handed to tmsgSendRsp, -1 when the encoded
 * size cannot be computed or the RPC buffer cannot be allocated.
 */
int32_t tqSendTaosxRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const STaosxRsp* pRsp) {
  ASSERT(taosArrayGetSize(pRsp->blockData) == pRsp->blockNum);
  ASSERT(taosArrayGetSize(pRsp->blockDataLen) == pRsp->blockNum);

  if (pRsp->withSchema) {
    ASSERT(taosArrayGetSize(pRsp->blockSchema) == pRsp->blockNum);
  } else {
    ASSERT(taosArrayGetSize(pRsp->blockSchema) == 0);
  }

  if (pRsp->reqOffset.type == TMQ_OFFSET__LOG) {
    if (pRsp->blockNum > 0) {
      ASSERT(pRsp->rspOffset.version > pRsp->reqOffset.version);
    } else {
      ASSERT(pRsp->rspOffset.version >= pRsp->reqOffset.version);
    }
  }

  int32_t bodyLen = 0;
  int32_t ret = 0;
  tEncodeSize(tEncodeSTaosxRsp, pRsp, bodyLen, ret);
  if (ret < 0) {
    return -1;
  }

  int32_t totalLen = sizeof(SMqRspHead) + bodyLen;
  void*   pCont = rpcMallocCont(totalLen);
  if (pCont == NULL) {
    return -1;
  }

  // Fixed header first ...
  SMqRspHead* pRspHead = (SMqRspHead*)pCont;
  pRspHead->mqMsgType = TMQ_MSG_TYPE__TAOSX_RSP;
  pRspHead->epoch = pReq->epoch;
  pRspHead->consumerId = pReq->consumerId;

  // ... then the encoded body right behind it.
  void*    pBody = POINTER_SHIFT(pCont, sizeof(SMqRspHead));
  SEncoder encoder = {0};
  tEncoderInit(&encoder, pBody, bodyLen);
  tEncodeSTaosxRsp(&encoder, pRsp);
  tEncoderClear(&encoder);

  SRpcMsg rsp = {0};
  rsp.info = pMsg->info;
  rsp.pCont = pCont;
  rsp.contLen = totalLen;
  rsp.code = 0;
  tmsgSendRsp(&rsp);

  // Human-readable offsets for the debug log only.
  char reqOffsetStr[80] = {0};
  char rspOffsetStr[80] = {0};
  tFormatOffset(reqOffsetStr, 80, &pRsp->reqOffset);
  tFormatOffset(rspOffsetStr, 80, &pRsp->rspOffset);
  tqDebug("taosx rsp, vgId:%d, from consumer:%" PRId64
          ", (epoch %d) send rsp, block num: %d, reqOffset:%s, rspOffset:%s",
          TD_VID(pTq->pVnode), pReq->consumerId, pReq->epoch, pRsp->blockNum, reqOffsetStr, rspOffsetStr);

  return 0;
}

272 273 274 275 276 277
/*
 * True only when both offsets are log-type offsets and the version of pLeft
 * does not exceed the version of pRight. Any non-log offset yields false.
 */
static FORCE_INLINE bool tqOffsetLessOrEqual(const STqOffset* pLeft, const STqOffset* pRight) {
  if (pLeft->val.type != TMQ_OFFSET__LOG) {
    return false;
  }
  if (pRight->val.type != TMQ_OFFSET__LOG) {
    return false;
  }
  return pLeft->val.version <= pRight->val.version;
}

/*
 * Apply an offset-commit request.
 *
 * pTq      TQ instance that owns the offset store and the handle table.
 * version  version associated with this request; a committed log offset
 *          that is exactly one behind it is advanced by one.
 * msg      encoded STqOffset.
 * msgLen   length of msg in bytes.
 *
 * The decoded offset is written to the offset store unless the store already
 * holds a log offset that is not older. For log offsets the WAL reference of
 * the matching handle (if any) is moved to the committed version.
 *
 * Returns 0 on success or when the commit is skipped, -1 when decoding,
 * storing the offset, or updating the WAL reference fails.
 */
int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) {
  STqOffset offset = {0};
  SDecoder  decoder;
  tDecoderInit(&decoder, msg, msgLen);
  if (tDecodeSTqOffset(&decoder, &offset) < 0) {
    // Clear the decoder on the failure path too, mirroring the success path.
    tDecoderClear(&decoder);
    ASSERT(0);
    return -1;
  }
  tDecoderClear(&decoder);

  if (offset.val.type == TMQ_OFFSET__SNAPSHOT_DATA || offset.val.type == TMQ_OFFSET__SNAPSHOT_META) {
    tqDebug("receive offset commit msg to %s on vgId:%d, offset(type:snapshot) uid:%" PRId64 ", ts:%" PRId64,
            offset.subKey, TD_VID(pTq->pVnode), offset.val.uid, offset.val.ts);
  } else if (offset.val.type == TMQ_OFFSET__LOG) {
    tqDebug("receive offset commit msg to %s on vgId:%d, offset(type:log) version:%" PRId64, offset.subKey,
            TD_VID(pTq->pVnode), offset.val.version);
    if (offset.val.version + 1 == version) {
      offset.val.version += 1;
    }
  } else {
    ASSERT(0);
  }

  // Nothing to do when the stored log offset is already at or past this one.
  STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, offset.subKey);
  if (pOffset != NULL && tqOffsetLessOrEqual(&offset, pOffset)) {
    return 0;
  }

  if (tqOffsetWrite(pTq->pOffsetStore, &offset) < 0) {
    ASSERT(0);
    return -1;
  }

  if (offset.val.type == TMQ_OFFSET__LOG) {
    STqHandle* pHandle = taosHashGet(pTq->pHandle, offset.subKey, strlen(offset.subKey));
    if (pHandle) {
      if (walRefVer(pHandle->pRef, offset.val.version) < 0) {
        ASSERT(0);
        return -1;
      }
    }
  }

  return 0;
}

L
Liu Jicong 已提交
327
/*
 * Check whether column colId of table tbUid may be modified.
 *
 * Walks every entry of pTq->pCheckInfo; an entry whose ntbUid equals tbUid
 * and whose colIdList contains colId forbids the change.
 *
 * Returns -1 when the column is listed by some entry, 0 otherwise.
 */
int32_t tqCheckColModifiable(STQ* pTq, int64_t tbUid, int32_t colId) {
  for (void* pIter = taosHashIterate(pTq->pCheckInfo, NULL); pIter != NULL;
       pIter = taosHashIterate(pTq->pCheckInfo, pIter)) {
    STqCheckInfo* pInfo = (STqCheckInfo*)pIter;
    if (pInfo->ntbUid != tbUid) {
      continue;
    }

    int32_t numOfCols = taosArrayGetSize(pInfo->colIdList);
    for (int32_t idx = 0; idx < numOfCols; idx++) {
      int16_t lockedColId = *(int16_t*)taosArrayGet(pInfo->colIdList, idx);
      if (lockedColId == colId) {
        // Leaving the iteration early: tell the hash table we are done.
        taosHashCancelIterate(pTq->pCheckInfo, pIter);
        return -1;
      }
    }
  }

  return 0;
}

L
Liu Jicong 已提交
347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378
/*
 * Prepare an SMqDataRsp for the given poll request.
 *
 * Copies the request offset, creates the blockData / blockDataLen arrays,
 * creates blockTbName when the request asks for table names, and creates
 * blockSchema for every subscription type other than COLUMN.
 *
 * Returns 0 on success, -1 when an array cannot be created. On failure the
 * arrays created so far are NOT released here.
 */
static int32_t tqInitDataRsp(SMqDataRsp* pRsp, const SMqPollReq* pReq, int8_t subType) {
  pRsp->reqOffset = pReq->reqOffset;

  pRsp->blockData = taosArrayInit(0, sizeof(void*));
  pRsp->blockDataLen = taosArrayInit(0, sizeof(int32_t));
  if (!(pRsp->blockData != NULL && pRsp->blockDataLen != NULL)) {
    return -1;
  }

  pRsp->withTbName = pReq->withTbName;
  if (pRsp->withTbName) {
    pRsp->blockTbName = taosArrayInit(0, sizeof(void*));
    if (pRsp->blockTbName == NULL) {
      // TODO free
      return -1;
    }
  }

  // Column subscriptions never ship a schema; every other type does.
  if (subType == TOPIC_SUB_TYPE__COLUMN) {
    pRsp->withSchema = false;
    return 0;
  }

  pRsp->withSchema = true;
  pRsp->blockSchema = taosArrayInit(0, sizeof(void*));
  if (pRsp->blockSchema == NULL) {
    // TODO free
    return -1;
  }
  return 0;
}

379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394
/*
 * Prepare an STaosxRsp for the given poll request.
 *
 * Copies the request offset, turns on both table names and schema, and
 * creates all four block arrays. Returns 0 on success and -1 when any of the
 * arrays could not be created (arrays that were created are left in pRsp).
 */
static int32_t tqInitTaosxRsp(STaosxRsp* pRsp, const SMqPollReq* pReq) {
  pRsp->reqOffset = pReq->reqOffset;

  pRsp->withTbName = 1;
  pRsp->withSchema = 1;

  // All four arrays are attempted before any result is inspected.
  pRsp->blockData = taosArrayInit(0, sizeof(void*));
  pRsp->blockDataLen = taosArrayInit(0, sizeof(int32_t));
  pRsp->blockTbName = taosArrayInit(0, sizeof(void*));
  pRsp->blockSchema = taosArrayInit(0, sizeof(void*));

  if (pRsp->blockData == NULL) return -1;
  if (pRsp->blockDataLen == NULL) return -1;
  if (pRsp->blockTbName == NULL) return -1;
  if (pRsp->blockSchema == NULL) return -1;

  return 0;
}

L
Liu Jicong 已提交
395
/*
 * Handle a consumer poll request.
 *
 * Steps:
 *   1. Look up the subscription handle by subKey and verify that it belongs
 *      to the polling consumer; raise the handle epoch to the request epoch.
 *   2. Decide where to start reading: the offset carried by the request, the
 *      committed offset from the offset store, or a reset position
 *      (earliest / latest / none) when nothing has been committed.
 *   3. Column subscriptions: scan once and answer with a data response.
 *   4. Other subscriptions (taosx): scan snapshot data/meta while the offset
 *      is not a log offset, then iterate the WAL from the log offset until a
 *      submit message yields blocks, a meta message is found, fetching fails,
 *      or a newer consumer epoch supersedes this request.
 *
 * Returns 0 on success (including the case where the request is discarded
 * because of a newer epoch, in which no response is sent) and -1 on failure.
 */
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
  SMqPollReq*  pReq = pMsg->pCont;
  int64_t      consumerId = pReq->consumerId;
  int32_t      reqEpoch = pReq->epoch;
  int32_t      code = 0;
  STqOffsetVal reqOffset = pReq->reqOffset;
  // Zero-initialized so that an unrecognized reset type below never leads to
  // reading an indeterminate value.
  STqOffsetVal fetchOffsetNew = {0};
  SWalCkHead*  pCkHead = NULL;

  // 1.find handle
  STqHandle* pHandle = taosHashGet(pTq->pHandle, pReq->subKey, strlen(pReq->subKey));
  if (pHandle == NULL) {
    tqError("tmq poll: no consumer handle for consumer:%" PRId64 ", in vgId:%d, subkey %s", consumerId,
            TD_VID(pTq->pVnode), pReq->subKey);
    return -1;
  }

  // check rebalance
  if (pHandle->consumerId != consumerId) {
    tqError("tmq poll: consumer handle mismatch for consumer:%" PRId64
            ", in vgId:%d, subkey %s, handle consumer id %" PRId64,
            consumerId, TD_VID(pTq->pVnode), pReq->subKey, pHandle->consumerId);
    terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
    return -1;
  }

  // update epoch if need
  int32_t consumerEpoch = atomic_load_32(&pHandle->epoch);
  while (consumerEpoch < reqEpoch) {
    consumerEpoch = atomic_val_compare_exchange_32(&pHandle->epoch, consumerEpoch, reqEpoch);
  }

  char buf[80];
  tFormatOffset(buf, 80, &reqOffset);
  tqDebug("tmq poll: consumer %" PRId64 " (epoch %d), subkey %s, recv poll req in vg %d, req offset %s", consumerId,
          pReq->epoch, pHandle->subKey, TD_VID(pTq->pVnode), buf);

  // 2.reset offset if needed
  if (reqOffset.type > 0) {
    fetchOffsetNew = reqOffset;
  } else {
    STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, pReq->subKey);
    if (pOffset != NULL) {
      fetchOffsetNew = pOffset->val;
      char formatBuf[80];
      tFormatOffset(formatBuf, 80, &fetchOffsetNew);
      tqDebug("tmq poll: consumer %" PRId64 ", subkey %s, vg %d, offset reset to %s", consumerId, pHandle->subKey,
              TD_VID(pTq->pVnode), formatBuf);
    } else {
      if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEAST) {
        if (pReq->useSnapshot) {
          if (pHandle->fetchMeta) {
            tqOffsetResetToMeta(&fetchOffsetNew, 0);
          } else {
            tqOffsetResetToData(&fetchOffsetNew, 0, 0);
          }
        } else {
          tqOffsetResetToLog(&fetchOffsetNew, walGetFirstVer(pTq->pVnode->pWal));
        }
      } else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) {
        // Answer immediately with an empty response positioned at the last
        // WAL version.
        SMqDataRsp dataRsp = {0};
        if (tqInitDataRsp(&dataRsp, pReq, pHandle->execHandle.subType) < 0) {
          tDeleteSMqDataRsp(&dataRsp);
          terrno = TSDB_CODE_OUT_OF_MEMORY;
          return -1;
        }

        tqOffsetResetToLog(&dataRsp.rspOffset, walGetLastVer(pTq->pVnode->pWal));
        tqDebug("tmq poll: consumer %" PRId64 ", subkey %s, vg %d, offset reset to %" PRId64, consumerId,
                pHandle->subKey, TD_VID(pTq->pVnode), dataRsp.rspOffset.version);
        if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) {
          code = -1;
        }
        tDeleteSMqDataRsp(&dataRsp);
        return code;
      } else if (reqOffset.type == TMQ_OFFSET__RESET_NONE) {
        tqError("tmq poll: subkey %s, no offset committed for consumer %" PRId64
                " in vg %d, subkey %s, reset none failed",
                pHandle->subKey, consumerId, TD_VID(pTq->pVnode), pReq->subKey);
        terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET;
        return -1;
      }
    }
  }

  // 3. column subscription: one scan, one data response
  if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
    SMqDataRsp dataRsp = {0};
    if (tqInitDataRsp(&dataRsp, pReq, pHandle->execHandle.subType) < 0) {
      tDeleteSMqDataRsp(&dataRsp);
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
    }
    tqScanData(pTq, pHandle, &dataRsp, &fetchOffsetNew);

    if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) {
      code = -1;
    }

    tqDebug("tmq poll: consumer %" PRId64 ", subkey %s, vg %d, send data blockNum:%d, offset type:%d, uid:%" PRId64
            ", version:%" PRId64,
            consumerId, pHandle->subKey, TD_VID(pTq->pVnode), dataRsp.blockNum, dataRsp.rspOffset.type,
            dataRsp.rspOffset.uid, dataRsp.rspOffset.version);

    tDeleteSMqDataRsp(&dataRsp);
    return code;
  }

  // 4. for taosx
  ASSERT(pHandle->execHandle.subType != TOPIC_SUB_TYPE__COLUMN);

  SMqMetaRsp metaRsp = {0};

  STaosxRsp taosxRsp = {0};
  if (tqInitTaosxRsp(&taosxRsp, pReq) < 0) {
    tDeleteSTaosxRsp(&taosxRsp);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  if (fetchOffsetNew.type != TMQ_OFFSET__LOG) {
    tqScan(pTq, pHandle, &taosxRsp, &metaRsp, &fetchOffsetNew);

    if (metaRsp.metaRspLen > 0) {
      if (tqSendMetaPollRsp(pTq, pMsg, pReq, &metaRsp) < 0) {
        code = -1;
      }
      tqDebug("tmq poll: consumer %" PRId64 ", subkey %s, vg %d, send meta offset type:%d,uid:%" PRId64
              ",version:%" PRId64,
              consumerId, pHandle->subKey, TD_VID(pTq->pVnode), metaRsp.rspOffset.type, metaRsp.rspOffset.uid,
              metaRsp.rspOffset.version);
      taosMemoryFree(metaRsp.metaRsp);
      tDeleteSTaosxRsp(&taosxRsp);
      return code;
    }

    if (taosxRsp.blockNum > 0) {
      if (tqSendTaosxRsp(pTq, pMsg, pReq, &taosxRsp) < 0) {
        code = -1;
      }
      tDeleteSTaosxRsp(&taosxRsp);
      return code;
    } else {
      // Nothing produced by the scan: continue from wherever it stopped.
      fetchOffsetNew = taosxRsp.rspOffset;
    }

    tqDebug("taosx poll: consumer %" PRId64 ", subkey %s, vg %d, send data blockNum:%d, offset type:%d,uid:%" PRId64
            ",version:%" PRId64,
            consumerId, pHandle->subKey, TD_VID(pTq->pVnode), taosxRsp.blockNum, taosxRsp.rspOffset.type,
            taosxRsp.rspOffset.uid, taosxRsp.rspOffset.version);
  }

  if (fetchOffsetNew.type == TMQ_OFFSET__LOG) {
    int64_t fetchVer = fetchOffsetNew.version + 1;
    pCkHead = taosMemoryMalloc(sizeof(SWalCkHead) + 2048);
    if (pCkHead == NULL) {
      tDeleteSTaosxRsp(&taosxRsp);
      return -1;
    }

    walSetReaderCapacity(pHandle->pWalReader, 2048);

    while (1) {
      consumerEpoch = atomic_load_32(&pHandle->epoch);
      if (consumerEpoch > reqEpoch) {
        // A newer poll superseded this one; drop it without answering.
        tqWarn("tmq poll: consumer %" PRId64 " (epoch %d), subkey %s, vg %d offset %" PRId64
               ", found new consumer epoch %d, discard req epoch %d",
               consumerId, pReq->epoch, pHandle->subKey, TD_VID(pTq->pVnode), fetchVer, consumerEpoch, reqEpoch);
        break;
      }

      if (tqFetchLog(pTq, pHandle, &fetchVer, &pCkHead) < 0) {
        tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
        if (tqSendTaosxRsp(pTq, pMsg, pReq, &taosxRsp) < 0) {
          code = -1;
        }
        tDeleteSTaosxRsp(&taosxRsp);
        if (pCkHead) taosMemoryFree(pCkHead);
        return code;
      }

      SWalCont* pHead = &pCkHead->head;

      tqDebug("tmq poll: consumer:%" PRId64 ", (epoch %d) iter log, vgId:%d offset %" PRId64 " msgType %d", consumerId,
              pReq->epoch, TD_VID(pTq->pVnode), fetchVer, pHead->msgType);

      if (pHead->msgType == TDMT_VND_SUBMIT) {
        SSubmitReq* pCont = (SSubmitReq*)&pHead->body;

        if (tqTaosxScanLog(pTq, pHandle, pCont, &taosxRsp) < 0) {
          /*ASSERT(0);*/
        }
        // TODO batch optimization:
        // TODO continue scan until meeting batch requirement
        if (taosxRsp.blockNum > 0 /* threshold */) {
          tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
          if (tqSendTaosxRsp(pTq, pMsg, pReq, &taosxRsp) < 0) {
            code = -1;
          }
          tDeleteSTaosxRsp(&taosxRsp);
          if (pCkHead) taosMemoryFree(pCkHead);
          return code;
        } else {
          fetchVer++;
        }

      } else {
        ASSERT(pHandle->fetchMeta);
        ASSERT(IS_META_MSG(pHead->msgType));
        tqDebug("fetch meta msg, ver:%" PRId64 ", type:%d", pHead->version, pHead->msgType);
        tqOffsetResetToLog(&metaRsp.rspOffset, fetchVer);
        metaRsp.resMsgType = pHead->msgType;
        metaRsp.metaRspLen = pHead->bodyLen;
        // Points into pCkHead; it is released together with pCkHead below.
        metaRsp.metaRsp = pHead->body;
        if (tqSendMetaPollRsp(pTq, pMsg, pReq, &metaRsp) < 0) {
          code = -1;
        }
        if (pCkHead) taosMemoryFree(pCkHead);
        tDeleteSTaosxRsp(&taosxRsp);
        return code;
      }
    }
  }

  // Reached when no log iteration took place or when the loop above was left
  // because of a newer epoch; in the latter case the WAL buffer is still
  // allocated and must be released here.
  if (pCkHead) taosMemoryFree(pCkHead);
  tDeleteSTaosxRsp(&taosxRsp);
  return 0;
}

611
/*
 * Remove a subscription from this vnode.
 *
 * msg is interpreted as an SMqVDeleteReq. The handle with the request's
 * subKey is removed from the handle table (asserted to succeed), then the
 * committed offset and the persisted handle record for that subKey are
 * deleted. Always returns 0.
 */
int32_t tqProcessVgDeleteReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) {
  SMqVDeleteReq* pDelReq = (SMqVDeleteReq*)msg;
  char*          subKey = pDelReq->subKey;

  int32_t code = taosHashRemove(pTq->pHandle, subKey, strlen(subKey));
  ASSERT(code == 0);

  tqOffsetDelete(pTq->pOffsetStore, subKey);

  int32_t metaCode = tqMetaDeleteHandle(pTq, subKey);
  if (metaCode < 0) {
    ASSERT(0);
  }

  return 0;
}

625 626 627
/*
 * Register a check-info record (the set of columns a topic depends on).
 *
 * msg / msgLen hold an encoded STqCheckInfo. The decoded record is stored in
 * pTq->pCheckInfo keyed by its topic name, and the raw message is persisted
 * through tqMetaSaveCheckInfo.
 *
 * Returns 0 on success. On any failure returns -1 with terrno set to
 * TSDB_CODE_OUT_OF_MEMORY (the same code is used for every failure cause).
 */
int32_t tqProcessAddCheckInfoReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) {
  STqCheckInfo info = {0};
  SDecoder     decoder;
  tDecoderInit(&decoder, msg, msgLen);
  if (tDecodeSTqCheckInfo(&decoder, &info) < 0) {
    // Clear the decoder on the failure path too, mirroring the success path.
    tDecoderClear(&decoder);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  tDecoderClear(&decoder);

  if (taosHashPut(pTq->pCheckInfo, info.topic, strlen(info.topic), &info, sizeof(STqCheckInfo)) < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  if (tqMetaSaveCheckInfo(pTq, info.topic, msg, msgLen) < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  return 0;
}

645 646 647 648 649 650 651 652 653 654 655 656 657
/*
 * Remove a check-info record. msg is used directly as the NUL-terminated key.
 *
 * The in-memory entry is removed first; the persisted record is deleted only
 * if that succeeded. Returns 0 on success, or -1 with terrno set to
 * TSDB_CODE_OUT_OF_MEMORY when either step fails.
 */
int32_t tqProcessDelCheckInfoReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) {
  // Short-circuit keeps the original ordering: the meta delete is attempted
  // only after the hash removal succeeded.
  if (taosHashRemove(pTq->pCheckInfo, msg, strlen(msg)) < 0 || tqMetaDeleteCheckInfo(pTq, msg) < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  return 0;
}

/*
 * Apply a rebalance (vgroup change) request for one subscription.
 *
 * msg is decoded into an SMqRebVgReq.
 *
 * - No handle exists for req.subKey: a new STqHandle is built for
 *   req.newConsumerId. A WAL reference at the committed version is taken, the
 *   executor task and readers are created according to the subscription type
 *   (COLUMN / DB / TABLE), and the handle is inserted into pTq->pHandle and
 *   persisted. A request whose newConsumerId is -1 is rejected with a log
 *   message and a return value of 0.
 * - A handle already exists: its consumer id is switched to
 *   req.newConsumerId, its epoch is reset, and the handle is persisted again.
 *
 * Returns 0 in all cases except when the WAL reference cannot be obtained,
 * which returns -1.
 */
int32_t tqProcessVgChangeReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) {
  SMqRebVgReq req = {0};
  tDecodeSMqRebVgReq(msg, &req);
  // todo lock
  STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
  if (pHandle == NULL) {
    // Consumer ids are 64-bit; they must be printed with PRId64.
    if (req.oldConsumerId != -1) {
      tqError("vgId:%d, build new consumer handle %s for consumer %" PRId64 ", but old consumerId is %" PRId64,
              req.vgId, req.subKey, req.newConsumerId, req.oldConsumerId);
    }
    if (req.newConsumerId == -1) {
      tqError("vgId:%d, tq invalid rebalance request, new consumerId %" PRId64, req.vgId, req.newConsumerId);
      return 0;
    }

    // Built on the stack and copied into the hash table by taosHashPut below.
    STqHandle tqHandle = {0};
    pHandle = &tqHandle;

    memcpy(pHandle->subKey, req.subKey, TSDB_SUBSCRIBE_KEY_LEN);
    pHandle->consumerId = req.newConsumerId;
    pHandle->epoch = -1;

    pHandle->execHandle.subType = req.subType;
    pHandle->fetchMeta = req.withMeta;

    // TODO version should be assigned and refed during preprocess
    SWalRef* pRef = walRefCommittedVer(pTq->pVnode->pWal);
    if (pRef == NULL) {
      ASSERT(0);
      return -1;
    }
    int64_t ver = pRef->refVer;
    pHandle->pRef = pRef;

    SReadHandle handle = {
        .meta = pTq->pVnode->pMeta,
        .vnode = pTq->pVnode,
        .initTableReader = true,
        .initTqReader = true,
        .version = ver,
    };
    pHandle->snapshotVer = ver;

    if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
      // The handle takes over the query message from the request.
      pHandle->execHandle.execCol.qmsg = req.qmsg;
      req.qmsg = NULL;

      pHandle->execHandle.task =
          qCreateQueueExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle, &pHandle->execHandle.numOfCols,
                                   &pHandle->execHandle.pSchemaWrapper);
      ASSERT(pHandle->execHandle.task);
      void* scanner = NULL;
      qExtractStreamScanner(pHandle->execHandle.task, &scanner);
      ASSERT(scanner);
      pHandle->execHandle.pExecReader = qExtractReaderFromStreamScanner(scanner);
      ASSERT(pHandle->execHandle.pExecReader);
    } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__DB) {
      pHandle->pWalReader = walOpenReader(pTq->pVnode->pWal, NULL);
      pHandle->execHandle.pExecReader = tqOpenReader(pTq->pVnode);
      pHandle->execHandle.execDb.pFilterOutTbUid =
          taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
      buildSnapContext(handle.meta, handle.version, 0, pHandle->execHandle.subType, pHandle->fetchMeta,
                       (SSnapContext**)(&handle.sContext));

      pHandle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &handle, NULL, NULL);
    } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
      pHandle->pWalReader = walOpenReader(pTq->pVnode->pWal, NULL);

      pHandle->execHandle.execTb.suid = req.suid;

      // Collect the child tables of the super table and restrict the reader
      // to them.
      SArray* tbUidList = taosArrayInit(0, sizeof(int64_t));
      vnodeGetCtbIdList(pTq->pVnode, req.suid, tbUidList);
      tqDebug("vgId:%d, tq try to get all ctb, suid:%" PRId64, pTq->pVnode->config.vgId, req.suid);
      for (int32_t i = 0; i < taosArrayGetSize(tbUidList); i++) {
        int64_t tbUid = *(int64_t*)taosArrayGet(tbUidList, i);
        tqDebug("vgId:%d, idx %d, uid:%" PRId64, TD_VID(pTq->pVnode), i, tbUid);
      }
      pHandle->execHandle.pExecReader = tqOpenReader(pTq->pVnode);
      tqReaderSetTbUidList(pHandle->execHandle.pExecReader, tbUidList);
      taosArrayDestroy(tbUidList);

      buildSnapContext(handle.meta, handle.version, req.suid, pHandle->execHandle.subType, pHandle->fetchMeta,
                       (SSnapContext**)(&handle.sContext));
      pHandle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &handle, NULL, NULL);
    }
    taosHashPut(pTq->pHandle, req.subKey, strlen(req.subKey), pHandle, sizeof(STqHandle));
    tqDebug("try to persist handle %s consumer %" PRId64, req.subKey, pHandle->consumerId);
    if (tqMetaSaveHandle(pTq, req.subKey, pHandle) < 0) {
      // TODO
      ASSERT(0);
    }
  } else {
    // TODO handle qmsg and exec modification
    // Epoch is parked at -1 while the consumer id is swapped, then bumped to 0.
    atomic_store_32(&pHandle->epoch, -1);
    atomic_store_64(&pHandle->consumerId, req.newConsumerId);
    atomic_add_fetch_32(&pHandle->epoch, 1);
    if (tqMetaSaveHandle(pTq, req.subKey, pHandle) < 0) {
      // TODO: propagate; for now at least make the failure visible.
      tqError("vgId:%d, failed to persist handle %s for consumer %" PRId64, req.vgId, req.subKey,
              req.newConsumerId);
    }
  }

  return 0;
}
761

L
Liu Jicong 已提交
762
// Prepare a freshly deployed stream task for execution on this vnode.
//
// Steps, in order: open the input/output queues, reset the scheduling and
// I/O status fields, attach the vnode message callback, build the executor
// (source and aggregate levels only), wire the sink callbacks and finally
// set up the trigger.
//
// Returns 0 on success, -1 when a queue or the task state cannot be opened.
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask) {
  SVnode*    pVnode = pTq->pVnode;
  const bool isSource = (pTask->taskLevel == TASK_LEVEL__SOURCE);
  const bool isAgg = (pTask->taskLevel == TASK_LEVEL__AGG);

  // an aggregate task must know its upstream children
  if (isAgg) {
    ASSERT(taosArrayGetSize(pTask->childEpInfo) != 0);
  }

  pTask->schedStatus = TASK_SCHED_STATUS__INACTIVE;

  pTask->inputQueue = streamQueueOpen();
  pTask->outputQueue = streamQueueOpen();
  if (pTask->inputQueue == NULL || pTask->outputQueue == NULL) {
    return -1;
  }

  pTask->inputStatus = TASK_INPUT_STATUS__NORMAL;
  pTask->outputStatus = TASK_OUTPUT_STATUS__NORMAL;

  pTask->pMsgCb = &pVnode->msgCb;

  // expand executor: only source and aggregate tasks own one
  if (isSource || isAgg) {
    pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask, false);
    if (pTask->pState == NULL) {
      return -1;
    }

    SReadHandle readHandle = {0};
    readHandle.pStateBackend = pTask->pState;
    if (isSource) {
      // source tasks read from this vnode
      readHandle.meta = pVnode->pMeta;
      readHandle.vnode = pVnode;
      readHandle.initTqReader = 1;
    } else {
      // aggregate tasks have no vnode, only the number of children
      readHandle.vnode = NULL;
      readHandle.numOfVgroups = (int32_t)taosArrayGetSize(pTask->childEpInfo);
    }

    pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &readHandle);
    ASSERT(pTask->exec.executor);
  }

  // sink
  if (pTask->outputType == TASK_OUTPUT__SMA) {
    pTask->smaSink.vnode = pVnode;
    pTask->smaSink.smaSink = smaHandleRes;
  } else if (pTask->outputType == TASK_OUTPUT__TABLE) {
    pTask->tbSink.vnode = pVnode;
    pTask->tbSink.tbSinkFunc = tqTableSink;

    ASSERT(pTask->tbSink.pSchemaWrapper);
    ASSERT(pTask->tbSink.pSchemaWrapper->pSchema);

    // derive the table schema from the schema wrapper carried by the task
    pTask->tbSink.pTSchema =
        tdGetSTSChemaFromSSChema(pTask->tbSink.pSchemaWrapper->pSchema, pTask->tbSink.pSchemaWrapper->nCols, 1);
    ASSERT(pTask->tbSink.pTSchema);
  }

  streamSetupTrigger(pTask);

  tqInfo("expand stream task on vg %d, task id %d, child id %d", TD_VID(pVnode), pTask->taskId,
         pTask->selfChildId);
  return 0;
}
L
Liu Jicong 已提交
833

834
// Handle a task deploy request by handing the serialized task to the stream
// meta store; the store's return code is passed back unchanged.
int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) {
  int32_t code = streamMetaAddSerializedTask(pTq->pStreamMeta, version, msg, msgLen);
  return code;
}
L
Liu Jicong 已提交
838

L
Liu Jicong 已提交
839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854
// Forward a delete result to every source-level stream task.
//
// The encoded SDeleteRes in pReq is decoded and turned into one
// STREAM_DELETE_DATA block holding a row per affected table uid. That block
// is shared between tasks: each task receives its own SStreamRefDataBlock
// queue item pointing at the block and at a common reference counter.
//
// pTq   stream/queue context owning the task table
// pReq  encoded SDeleteRes
// len   length of pReq in bytes
// ver   version stamped on the delete block
//
// Returns 0 on success (including "nothing to forward"), -1 with terrno set
// when the request cannot be decoded or the shared block cannot be built.
int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) {
  SDecoder   coder = {0};
  SDeleteRes res = {0};

  res.uidList = taosArrayInit(0, sizeof(tb_uid_t));
  if (res.uidList == NULL) {
    // without the list the uids cannot be collected, so nothing can be forwarded
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  tDecoderInit(&coder, pReq, len);
  int32_t decodeCode = tDecodeDeleteRes(&coder, &res);
  tDecoderClear(&coder);
  if (decodeCode < 0) {
    taosArrayDestroy(res.uidList);
    terrno = TSDB_CODE_MSG_DECODE_ERROR;
    return -1;
  }

  int32_t sz = taosArrayGetSize(res.uidList);
  if (sz == 0 || res.affectedRows == 0) {
    // the delete touched nothing, stream tasks need no notification
    taosArrayDestroy(res.uidList);
    return 0;
  }

  bool         failed = false;
  SSDataBlock* pDelBlock = createSpecialDataBlock(STREAM_DELETE_DATA);
  int32_t*     pRef = taosMemoryMalloc(sizeof(int32_t));

  if (pDelBlock == NULL || pRef == NULL || blockDataEnsureCapacity(pDelBlock, sz) != 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    failed = true;
  } else {
    pDelBlock->info.rows = sz;
    pDelBlock->info.version = ver;

    // column handles do not change per row, look them up once
    SColumnInfoData* pStartCol = taosArrayGet(pDelBlock->pDataBlock, START_TS_COLUMN_INDEX);
    SColumnInfoData* pEndCol = taosArrayGet(pDelBlock->pDataBlock, END_TS_COLUMN_INDEX);
    SColumnInfoData* pUidCol = taosArrayGet(pDelBlock->pDataBlock, UID_COLUMN_INDEX);
    SColumnInfoData* pGroupCol = taosArrayGet(pDelBlock->pDataBlock, GROUPID_COLUMN_INDEX);
    SColumnInfoData* pCalStartCol = taosArrayGet(pDelBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
    SColumnInfoData* pCalEndCol = taosArrayGet(pDelBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);

    for (int32_t i = 0; i < sz; i++) {
      int64_t* pUid = taosArrayGet(res.uidList, i);

      colDataAppend(pStartCol, i, (const char*)&res.skey, false);  // start key column
      colDataAppend(pEndCol, i, (const char*)&res.ekey, false);    // end key column
      colDataAppend(pUidCol, i, (const char*)pUid, false);         // uid column

      colDataAppendNULL(pGroupCol, i);
      colDataAppendNULL(pCalStartCol, i);
      colDataAppendNULL(pCalEndCol, i);
    }

    // this function holds the first reference until all tasks are served
    *pRef = 1;
  }

  taosArrayDestroy(res.uidList);

  void* pIter = NULL;
  while (1) {
    pIter = taosHashIterate(pTq->pStreamMeta->pTasks, pIter);
    if (pIter == NULL) break;
    SStreamTask* pTask = *(SStreamTask**)pIter;
    if (pTask->taskLevel != TASK_LEVEL__SOURCE) continue;

    qDebug("delete req enqueue stream task: %d, ver: %" PRId64, pTask->taskId, ver);

    if (failed) {
      // the delete block could not be built: flag the task instead of
      // letting it silently miss the delete
      streamTaskInputFail(pTask);
      continue;
    }

    SStreamRefDataBlock* pRefBlock = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM);
    if (pRefBlock == NULL) {
      qError("stream task input del failed since out of memory, task id %d", pTask->taskId);
      streamTaskInputFail(pTask);
      continue;
    }
    pRefBlock->type = STREAM_INPUT__REF_DATA_BLOCK;
    pRefBlock->pBlock = pDelBlock;
    pRefBlock->dataRef = pRef;
    atomic_add_fetch_32(pRefBlock->dataRef, 1);

    // NOTE(review): ownership of pRefBlock (and of the reference taken above)
    // when streamTaskInput fails is not visible here - confirm it is released.
    if (streamTaskInput(pTask, (SStreamQueueItem*)pRefBlock) < 0) {
      qError("stream task input del failed, task id %d", pTask->taskId);
      continue;
    }
    if (streamSchedExec(pTask) < 0) {
      qError("stream task launch failed, task id %d", pTask->taskId);
      continue;
    }
  }

  if (failed) {
    // nothing was enqueued, so no other holder of the block or counter exists
    if (pDelBlock != NULL) {
      blockDataDestroy(pDelBlock);
    }
    taosMemoryFree(pRef);
    return -1;
  }

  // drop the reference held by this function; whoever reaches zero frees
  int32_t ref = atomic_sub_fetch_32(pRef, 1);
  ASSERT(ref >= 0);
  if (ref == 0) {
    // the block owns column buffers, so it needs the block destructor
    // rather than a plain free
    blockDataDestroy(pDelBlock);
    taosMemoryFree(pRef);
  }

  return 0;
}

// Offer a submit request to every source-level stream task.
//
// The request is wrapped once in an SStreamDataSubmit and that same item is
// handed to each source task. If the wrapper cannot be created, every source
// task is marked as having failed input instead.
//
// Returns 0 when the wrapper was created, -1 (terrno set) otherwise.
int32_t tqProcessSubmitReq(STQ* pTq, SSubmitReq* pReq, int64_t ver) {
  SStreamDataSubmit* pDataSubmit = streamDataSubmitNew(pReq);
  const bool         allocFailed = (pDataSubmit == NULL);

  if (allocFailed) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    qError("failed to create data submit for stream since out of memory");
  }

  for (void* pIter = taosHashIterate(pTq->pStreamMeta->pTasks, NULL); pIter != NULL;
       pIter = taosHashIterate(pTq->pStreamMeta->pTasks, pIter)) {
    SStreamTask* pTask = *(SStreamTask**)pIter;
    if (pTask->taskLevel != TASK_LEVEL__SOURCE) {
      continue;
    }

    qDebug("data submit enqueue stream task: %d, ver: %" PRId64, pTask->taskId, ver);

    if (allocFailed) {
      streamTaskInputFail(pTask);
      continue;
    }

    if (streamTaskInput(pTask, (SStreamQueueItem*)pDataSubmit) < 0) {
      qError("stream task input failed, task id %d", pTask->taskId);
      continue;
    }

    if (streamSchedExec(pTask) < 0) {
      qError("stream task launch failed, task id %d", pTask->taskId);
    }
  }

  // release the reference and queue item held by this function
  if (!allocFailed) {
    streamDataSubmitRefDec(pDataSubmit);
    taosFreeQitem(pDataSubmit);
  }

  return allocFailed ? -1 : 0;
}

L
Liu Jicong 已提交
992
// Handle a run request: look up the addressed task and let it process the
// request. Returns 0 when the task exists, -1 when it is unknown.
int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
  SStreamTaskRunReq* pRunReq = pMsg->pCont;
  int32_t            targetId = pRunReq->taskId;

  SStreamTask* pTask = streamMetaGetTask(pTq->pStreamMeta, targetId);
  if (pTask == NULL) {
    return -1;
  }

  streamProcessRunReq(pTask);
  return 0;
}

L
Liu Jicong 已提交
1005
// Decode a stream dispatch request and pass it to the addressed task.
//
// pTq    stream/queue context owning the task table
// pMsg   rpc message; the payload follows an SMsgHead
// exec   forwarded unchanged to streamProcessDispatchReq
//
// Returns 0 when the task was found, -1 when decoding fails (terrno set) or
// the task is unknown.
int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec) {
  // The function asserts unconditionally on entry.
  // NOTE(review): presumably this entry point is not expected to be reached
  // any more (vnodeEnqueueStreamMsg handles dispatch requests) - confirm.
  ASSERT(0);

  char*   msgStr = pMsg->pCont;
  char*   msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
  int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);

  SStreamDispatchReq req = {0};
  SDecoder           decoder;
  tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
  int32_t decodeCode = tDecodeStreamDispatchReq(&decoder, &req);
  // always release the decoder, as the other stream handlers in this file do
  tDecoderClear(&decoder);
  if (decodeCode < 0) {
    terrno = TSDB_CODE_MSG_DECODE_ERROR;
    return -1;
  }

  SStreamTask* pTask = streamMetaGetTask(pTq->pStreamMeta, req.taskId);
  if (pTask == NULL) {
    return -1;
  }

  SRpcMsg rsp = {
      .info = pMsg->info,
      .code = 0,
  };
  streamProcessDispatchReq(pTask, &req, &rsp, exec);
  return 0;
}

L
Liu Jicong 已提交
1029
#if 0
// Disabled by the preprocessor: handlers for task recover request/response.

// Pass a recover request to the addressed task; -1 when the task is unknown.
int32_t tqProcessTaskRecoverReq(STQ* pTq, SRpcMsg* pMsg) {
  SStreamTaskRecoverReq* pReq = pMsg->pCont;
  SStreamTask*           pTask = streamMetaGetTask(pTq->pStreamMeta, pReq->taskId);
  if (pTask == NULL) {
    return -1;
  }
  streamProcessRecoverReq(pTask, pReq, pMsg);
  return 0;
}

// Pass a recover response to the addressed task; -1 when the task is unknown.
int32_t tqProcessTaskRecoverRsp(STQ* pTq, SRpcMsg* pMsg) {
  SStreamTaskRecoverRsp* pRsp = pMsg->pCont;
  SStreamTask*           pTask = streamMetaGetTask(pTq->pStreamMeta, pRsp->rspTaskId);
  if (pTask == NULL) {
    return -1;
  }
  streamProcessRecoverRsp(pTask, pRsp);
  return 0;
}
#endif
L
Liu Jicong 已提交
1055

L
Liu Jicong 已提交
1056 1057 1058 1059
// Handle a dispatch response: the payload follows an SMsgHead and names the
// task it belongs to. Returns 0 when that task exists, -1 otherwise.
int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) {
  SStreamDispatchRsp* pDispatchRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t             targetId = pDispatchRsp->taskId;

  SStreamTask* pTask = streamMetaGetTask(pTq->pStreamMeta, targetId);
  if (pTask == NULL) {
    return -1;
  }

  streamProcessDispatchRsp(pTask, pDispatchRsp);
  return 0;
}
L
Liu Jicong 已提交
1067

1068
// Handle a drop-task request: msg is read as an SVDropStreamTaskReq and the
// named task is removed from the stream meta store. The store's return code
// is passed back unchanged; version and msgLen are not used.
int32_t tqProcessTaskDropReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) {
  const SVDropStreamTaskReq* pDropReq = (SVDropStreamTaskReq*)msg;
  int32_t                    code = streamMetaRemoveTask(pTq->pStreamMeta, pDropReq->taskId);
  return code;
}
L
Liu Jicong 已提交
1073 1074 1075 1076 1077 1078 1079 1080 1081

// Decode a stream retrieve request and pass it to its destination task.
//
// pTq   stream/queue context owning the task table
// pMsg  rpc message; the payload follows an SMsgHead
//
// The decoded request is released on every path after it has been decoded.
// Returns 0 when the destination task was found, -1 when decoding fails
// (terrno set) or the task is unknown.
int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg) {
  char*   msgStr = pMsg->pCont;
  char*   msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
  int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);

  // zero-initialised so that releasing it is safe even after a partial decode
  SStreamRetrieveReq req = {0};
  SDecoder           decoder;
  tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
  int32_t decodeCode = tDecodeStreamRetrieveReq(&decoder, &req);
  tDecoderClear(&decoder);
  if (decodeCode < 0) {
    tDeleteStreamRetrieveReq(&req);
    terrno = TSDB_CODE_MSG_DECODE_ERROR;
    return -1;
  }

  int32_t      code = -1;
  SStreamTask* pTask = streamMetaGetTask(pTq->pStreamMeta, req.dstTaskId);
  if (pTask != NULL) {
    SRpcMsg rsp = {
        .info = pMsg->info,
        .code = 0,
    };
    streamProcessRetrieveReq(pTask, &req, &rsp);
    code = 0;
  }

  // release the decoded payload whether or not a task consumed it
  tDeleteStreamRetrieveReq(&req);
  return code;
}

// Retrieve responses need no handling here: both arguments are ignored and
// success is reported unconditionally.
int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg) {
  (void)pTq;
  (void)pMsg;
  return 0;
}
L
Liu Jicong 已提交
1102 1103 1104 1105 1106 1107 1108 1109 1110 1111 1112 1113 1114

// Decode a stream dispatch request arriving at the vnode and hand it to the
// addressed stream task.
//
// pVnode  vnode whose pTq owns the task table
// pMsg    queue item carrying the rpc message; the payload follows an SMsgHead
//
// This function consumes pMsg: pMsg->pCont and the queue item itself are
// freed on every path. When decoding fails or the task is unknown, a response
// carrying the current code is sent back if the message has an rpc handle.
void vnodeEnqueueStreamMsg(SVnode* pVnode, SRpcMsg* pMsg) {
  STQ*    pTq = pVnode->pTq;
  char*   msgStr = pMsg->pCont;
  char*   msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
  int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
  int32_t code = 0;

  SStreamDispatchReq req = {0};
  SDecoder           decoder;
  tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
  int32_t decodeCode = tDecodeStreamDispatchReq(&decoder, &req);
  tDecoderClear(&decoder);

  if (decodeCode < 0) {
    code = TSDB_CODE_MSG_DECODE_ERROR;
  } else {
    SStreamTask* pTask = streamMetaGetTask(pTq->pStreamMeta, req.taskId);
    if (pTask) {
      SRpcMsg rsp = {
          .info = pMsg->info,
          .code = 0,
      };
      streamProcessDispatchReq(pTask, &req, &rsp, false);
      rpcFreeCont(pMsg->pCont);
      taosFreeQitem(pMsg);
      return;
    }
    // NOTE(review): an unknown task is answered with code 0, exactly as
    // before; a dedicated error code may be more appropriate - confirm.
  }

  // failure path: answer only when there is someone to answer to ...
  if (pMsg->info.handle != NULL) {
    SRpcMsg rsp = {
        .code = code,
        .info = pMsg->info,
    };
    tmsgSendRsp(&rsp);
  }

  // ... but always release the message, it is owned by this function
  rpcFreeCont(pMsg->pCont);
  taosFreeQitem(pMsg);
}