tq.c 34.2 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
S
Shengliang Guan 已提交
14 15
 */

H
Hongze Cheng 已提交
16
#include "tq.h"
S
Shengliang Guan 已提交
17

L
Liu Jicong 已提交
18
/**
 * Initialize the process-wide TQ state (timer + stream module) once.
 *
 * tqMgmt.inited is driven through three values by this function and
 * tqCleanUp(): 0 = not initialized, 1 = initialized, 2 = a transition is in
 * progress. A caller that observes 2 spins until the transition finishes.
 *
 * @return 0 on success or when already initialized, -1 if the timer or the
 *         stream module could not be initialized.
 */
int32_t tqInit() {
  int8_t old;
  while (1) {
    old = atomic_val_compare_exchange_8(&tqMgmt.inited, 0, 2);
    if (old != 2) break;
  }

  if (old == 0) {
    tqMgmt.timer = taosTmrInit(10000, 100, 10000, "TQ");
    if (tqMgmt.timer == NULL) {
      atomic_store_8(&tqMgmt.inited, 0);
      return -1;
    }
    if (streamInit() < 0) {
      // Roll back: release the timer and reset the flag. Leaving the flag at 2
      // would make every later tqInit()/tqCleanUp() call spin forever.
      taosTmrCleanUp(tqMgmt.timer);
      tqMgmt.timer = NULL;
      atomic_store_8(&tqMgmt.inited, 0);
      return -1;
    }
    atomic_store_8(&tqMgmt.inited, 1);
  }

  return 0;
}
L
Liu Jicong 已提交
39

40
/**
 * Tear down the process-wide TQ state set up by tqInit().
 *
 * Moves tqMgmt.inited from 1 to 2 while the teardown runs and to 0 once it
 * is finished. If the flag is observed as 2 the call spins; if it is
 * observed as anything other than 1 or 2 nothing is torn down.
 */
void tqCleanUp() {
  int8_t prev;
  do {
    prev = atomic_val_compare_exchange_8(&tqMgmt.inited, 1, 2);
  } while (prev == 2);

  if (prev != 1) {
    return;
  }

  taosTmrCleanUp(tqMgmt.timer);
  streamCleanUp();
  atomic_store_8(&tqMgmt.inited, 0);
}
L
Liu Jicong 已提交
53

54 55 56 57 58 59 60 61 62 63 64 65 66 67
static void destroySTqHandle(void* data) {
  STqHandle* pData = (STqHandle*)data;
  qDestroyTask(pData->execHandle.task);
  if (pData->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
  } else if (pData->execHandle.subType == TOPIC_SUB_TYPE__DB) {
    tqCloseReader(pData->execHandle.pExecReader);
    walCloseReader(pData->pWalReader);
    taosHashCleanup(pData->execHandle.execDb.pFilterOutTbUid);
  } else if (pData->execHandle.subType == TOPIC_SUB_TYPE__TABLE){
    walCloseReader(pData->pWalReader);
    tqCloseReader(pData->execHandle.pExecReader);
  }
}

L
Liu Jicong 已提交
68
/**
 * Create and open a TQ instance for a vnode.
 *
 * Allocates the STQ object, copies the path, creates the handle / push-manager
 * / check-info hash tables, then opens the meta store, the offset store and
 * the stream meta and loads the stream tasks.
 *
 * @param path   directory path; a private copy is stored in pTq->path
 * @param pVnode owning vnode, stored as-is
 * @return the new STQ, or NULL with terrno set when an allocation fails.
 *         Failures of the meta/offset/stream open steps hit ASSERT(0).
 */
STQ* tqOpen(const char* path, SVnode* pVnode) {
  STQ* pTq = taosMemoryCalloc(1, sizeof(STQ));
  if (pTq == NULL) {
    terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
    return NULL;
  }
  pTq->pVnode = pVnode;

  pTq->path = strdup(path);
  if (pTq->path == NULL) goto _oom;

  pTq->pHandle = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);
  if (pTq->pHandle == NULL) goto _oom;

  // Entries removed from pHandle release their readers/tasks through this hook.
  taosHashSetFreeFp(pTq->pHandle, destroySTqHandle);

  pTq->pPushMgr = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
  if (pTq->pPushMgr == NULL) goto _oom;

  pTq->pCheckInfo = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);
  if (pTq->pCheckInfo == NULL) goto _oom;

  if (tqMetaOpen(pTq) < 0) {
    ASSERT(0);
  }

  if (tqOffsetOpen(pTq) < 0) {
    ASSERT(0);
  }

  pTq->pStreamMeta = streamMetaOpen(path, pTq, (FTaskExpand*)tqExpandTask);
  if (pTq->pStreamMeta == NULL) {
    ASSERT(0);
  }

  if (streamLoadTasks(pTq->pStreamMeta) < 0) {
    ASSERT(0);
  }

  return pTq;

_oom:
  // Only the members created so far are non-NULL (pTq was calloc'ed).
  if (pTq->pCheckInfo != NULL) taosHashCleanup(pTq->pCheckInfo);
  if (pTq->pPushMgr != NULL) taosHashCleanup(pTq->pPushMgr);
  if (pTq->pHandle != NULL) taosHashCleanup(pTq->pHandle);
  if (pTq->path != NULL) taosMemoryFree(pTq->path);
  taosMemoryFree(pTq);
  terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
  return NULL;
}
L
Liu Jicong 已提交
104

L
Liu Jicong 已提交
105
/**
 * Close a TQ instance created by tqOpen() and free it.
 *
 * Closes the offset store, destroys the three hash tables, frees the path
 * copy, closes the meta store and the stream meta, then frees the object.
 *
 * @param pTq instance to close; NULL is accepted and ignored
 */
void tqClose(STQ* pTq) {
  if (pTq == NULL) {
    return;
  }

  tqOffsetClose(pTq->pOffsetStore);

  taosHashCleanup(pTq->pHandle);
  taosHashCleanup(pTq->pPushMgr);
  taosHashCleanup(pTq->pCheckInfo);

  taosMemoryFree(pTq->path);

  tqMetaClose(pTq);
  streamMetaClose(pTq->pStreamMeta);

  taosMemoryFree(pTq);
}
L
Liu Jicong 已提交
117

L
Liu Jicong 已提交
118
/**
 * Encode an SMqMetaRsp behind an SMqRspHead and send it as the reply to a
 * poll request.
 *
 * @param pTq  TQ instance (used for the vgroup id in the debug log)
 * @param pMsg request message; its info is copied into the reply
 * @param pReq poll request; epoch and consumerId are echoed in the head
 * @param pRsp meta response to encode
 * @return 0 after the reply was handed to tmsgSendRsp(), -1 if sizing the
 *         payload or allocating the reply buffer failed
 */
int32_t tqSendMetaPollRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqMetaRsp* pRsp) {
  int32_t bodyLen = 0;
  int32_t ret = 0;
  tEncodeSize(tEncodeSMqMetaRsp, pRsp, bodyLen, ret);
  if (ret < 0) {
    return -1;
  }

  int32_t totalLen = sizeof(SMqRspHead) + bodyLen;
  void*   pCont = rpcMallocCont(totalLen);
  if (pCont == NULL) {
    return -1;
  }

  // Fixed-size head first, encoded body immediately after it.
  SMqRspHead* pRspHead = (SMqRspHead*)pCont;
  pRspHead->mqMsgType = TMQ_MSG_TYPE__POLL_META_RSP;
  pRspHead->epoch = pReq->epoch;
  pRspHead->consumerId = pReq->consumerId;

  void* pBody = POINTER_SHIFT(pCont, sizeof(SMqRspHead));

  SEncoder enc = {0};
  tEncoderInit(&enc, pBody, bodyLen);
  tEncodeSMqMetaRsp(&enc, pRsp);
  tEncoderClear(&enc);

  SRpcMsg rpcRsp = {
      .info = pMsg->info,
      .pCont = pCont,
      .contLen = totalLen,
      .code = 0,
  };
  tmsgSendRsp(&rpcRsp);

  tqDebug("vgId:%d, from consumer:%" PRId64 ", (epoch %d) send rsp, res msg type %d, offset type:%d",
          TD_VID(pTq->pVnode), pReq->consumerId, pReq->epoch, pRsp->resMsgType, pRsp->rspOffset.type);

  return 0;
}

L
Liu Jicong 已提交
156 157 158 159
/**
 * Encode an SMqDataRsp behind an SMqRspHead and send it as the reply to a
 * poll request.
 *
 * Asserts that the block arrays match blockNum, that the response carries no
 * schema, and that for log-type request offsets the response version does
 * not go backwards (strictly forwards when blocks are returned).
 *
 * @param pTq  TQ instance (used for the vgroup id in the debug log)
 * @param pMsg request message; its info is copied into the reply
 * @param pReq poll request; epoch and consumerId are echoed in the head
 * @param pRsp data response to encode
 * @return 0 after the reply was handed to tmsgSendRsp(), -1 if sizing the
 *         payload or allocating the reply buffer failed
 */
int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp) {
  ASSERT(taosArrayGetSize(pRsp->blockData) == pRsp->blockNum);
  ASSERT(taosArrayGetSize(pRsp->blockDataLen) == pRsp->blockNum);

  ASSERT(!pRsp->withSchema);
  ASSERT(taosArrayGetSize(pRsp->blockSchema) == 0);

  if (pRsp->reqOffset.type == TMQ_OFFSET__LOG) {
    if (pRsp->blockNum > 0) {
      ASSERT(pRsp->rspOffset.version > pRsp->reqOffset.version);
    } else {
      ASSERT(pRsp->rspOffset.version >= pRsp->reqOffset.version);
    }
  }

  int32_t bodyLen = 0;
  int32_t ret = 0;
  tEncodeSize(tEncodeSMqDataRsp, pRsp, bodyLen, ret);
  if (ret < 0) {
    return -1;
  }

  int32_t totalLen = sizeof(SMqRspHead) + bodyLen;
  void*   pCont = rpcMallocCont(totalLen);
  if (pCont == NULL) {
    return -1;
  }

  // Fixed-size head first, encoded body immediately after it.
  SMqRspHead* pRspHead = (SMqRspHead*)pCont;
  pRspHead->mqMsgType = TMQ_MSG_TYPE__POLL_RSP;
  pRspHead->epoch = pReq->epoch;
  pRspHead->consumerId = pReq->consumerId;

  void* pBody = POINTER_SHIFT(pCont, sizeof(SMqRspHead));

  SEncoder enc = {0};
  tEncoderInit(&enc, pBody, bodyLen);
  tEncodeSMqDataRsp(&enc, pRsp);
  tEncoderClear(&enc);

  SRpcMsg rpcRsp = {
      .info = pMsg->info,
      .pCont = pCont,
      .contLen = totalLen,
      .code = 0,
  };
  tmsgSendRsp(&rpcRsp);

  char reqOffsetStr[80] = {0};
  char rspOffsetStr[80] = {0};
  tFormatOffset(reqOffsetStr, 80, &pRsp->reqOffset);
  tFormatOffset(rspOffsetStr, 80, &pRsp->rspOffset);
  tqDebug("vgId:%d, from consumer:%" PRId64 ", (epoch %d) send rsp, block num: %d, reqOffset:%s, rspOffset:%s",
          TD_VID(pTq->pVnode), pReq->consumerId, pReq->epoch, pRsp->blockNum, reqOffsetStr, rspOffsetStr);

  return 0;
}

212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271
/**
 * Encode an STaosxRsp behind an SMqRspHead and send it as the reply to a
 * poll request.
 *
 * Asserts that the block arrays match blockNum, that blockSchema has one
 * entry per block when withSchema is set (and none otherwise), and that for
 * log-type request offsets the response version does not go backwards
 * (strictly forwards when blocks are returned).
 *
 * @param pTq  TQ instance (used for the vgroup id in the debug log)
 * @param pMsg request message; its info is copied into the reply
 * @param pReq poll request; epoch and consumerId are echoed in the head
 * @param pRsp taosx response to encode
 * @return 0 after the reply was handed to tmsgSendRsp(), -1 if sizing the
 *         payload or allocating the reply buffer failed
 */
int32_t tqSendTaosxRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const STaosxRsp* pRsp) {
  ASSERT(taosArrayGetSize(pRsp->blockData) == pRsp->blockNum);
  ASSERT(taosArrayGetSize(pRsp->blockDataLen) == pRsp->blockNum);

  if (pRsp->withSchema) {
    ASSERT(taosArrayGetSize(pRsp->blockSchema) == pRsp->blockNum);
  } else {
    ASSERT(taosArrayGetSize(pRsp->blockSchema) == 0);
  }

  if (pRsp->reqOffset.type == TMQ_OFFSET__LOG) {
    if (pRsp->blockNum > 0) {
      ASSERT(pRsp->rspOffset.version > pRsp->reqOffset.version);
    } else {
      ASSERT(pRsp->rspOffset.version >= pRsp->reqOffset.version);
    }
  }

  int32_t bodyLen = 0;
  int32_t ret = 0;
  tEncodeSize(tEncodeSTaosxRsp, pRsp, bodyLen, ret);
  if (ret < 0) {
    return -1;
  }

  int32_t totalLen = sizeof(SMqRspHead) + bodyLen;
  void*   pCont = rpcMallocCont(totalLen);
  if (pCont == NULL) {
    return -1;
  }

  // Fixed-size head first, encoded body immediately after it.
  SMqRspHead* pRspHead = (SMqRspHead*)pCont;
  pRspHead->mqMsgType = TMQ_MSG_TYPE__TAOSX_RSP;
  pRspHead->epoch = pReq->epoch;
  pRspHead->consumerId = pReq->consumerId;

  void* pBody = POINTER_SHIFT(pCont, sizeof(SMqRspHead));

  SEncoder enc = {0};
  tEncoderInit(&enc, pBody, bodyLen);
  tEncodeSTaosxRsp(&enc, pRsp);
  tEncoderClear(&enc);

  SRpcMsg rpcRsp = {
      .info = pMsg->info,
      .pCont = pCont,
      .contLen = totalLen,
      .code = 0,
  };
  tmsgSendRsp(&rpcRsp);

  char reqOffsetStr[80] = {0};
  char rspOffsetStr[80] = {0};
  tFormatOffset(reqOffsetStr, 80, &pRsp->reqOffset);
  tFormatOffset(rspOffsetStr, 80, &pRsp->rspOffset);
  tqDebug("taosx rsp, vgId:%d, from consumer:%" PRId64
          ", (epoch %d) send rsp, block num: %d, reqOffset:%s, rspOffset:%s",
          TD_VID(pTq->pVnode), pReq->consumerId, pReq->epoch, pRsp->blockNum, reqOffsetStr, rspOffsetStr);

  return 0;
}

272 273 274 275 276 277
/**
 * Compare two offsets by WAL version.
 *
 * @return true only when both offsets are of type TMQ_OFFSET__LOG and the
 *         left version is <= the right version; false in every other case
 *         (including when either offset is not a log offset).
 */
static FORCE_INLINE bool tqOffsetLessOrEqual(const STqOffset* pLeft, const STqOffset* pRight) {
  if (pLeft->val.type != TMQ_OFFSET__LOG) {
    return false;
  }
  if (pRight->val.type != TMQ_OFFSET__LOG) {
    return false;
  }
  return pLeft->val.version <= pRight->val.version;
}

/**
 * Apply an offset-commit message: decode the offset, store it, and for log
 * offsets move the WAL reference of the matching handle.
 *
 * @param pTq     TQ instance
 * @param version version passed by the caller; a log offset whose version is
 *                exactly one below it is bumped by one before being stored
 * @param msg     encoded STqOffset
 * @param msgLen  length of msg in bytes
 * @return 0 on success or when the stored offset is already at or beyond the
 *         committed one; -1 on decode failure, unknown offset type, store
 *         failure or WAL ref failure
 */
int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) {
  STqOffset offset = {0};
  SDecoder  decoder;
  tDecoderInit(&decoder, msg, msgLen);
  if (tDecodeSTqOffset(&decoder, &offset) < 0) {
    // Release the decoder on the failure path as well.
    tDecoderClear(&decoder);
    ASSERT(0);
    return -1;
  }
  tDecoderClear(&decoder);

  if (offset.val.type == TMQ_OFFSET__SNAPSHOT_DATA || offset.val.type == TMQ_OFFSET__SNAPSHOT_META) {
    tqDebug("receive offset commit msg to %s on vgId:%d, offset(type:snapshot) uid:%" PRId64 ", ts:%" PRId64,
            offset.subKey, TD_VID(pTq->pVnode), offset.val.uid, offset.val.ts);
  } else if (offset.val.type == TMQ_OFFSET__LOG) {
    tqDebug("receive offset commit msg to %s on vgId:%d, offset(type:log) version:%" PRId64, offset.subKey,
            TD_VID(pTq->pVnode), offset.val.version);
    if (offset.val.version + 1 == version) {
      offset.val.version += 1;
    }
  } else {
    // Never persist an offset of unknown type, even when ASSERT is compiled out.
    ASSERT(0);
    return -1;
  }

  // Skip the write when the stored log offset is already >= the committed one.
  STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, offset.subKey);
  if (pOffset != NULL && tqOffsetLessOrEqual(&offset, pOffset)) {
    return 0;
  }

  if (tqOffsetWrite(pTq->pOffsetStore, &offset) < 0) {
    ASSERT(0);
    return -1;
  }

  if (offset.val.type == TMQ_OFFSET__LOG) {
    STqHandle* pHandle = taosHashGet(pTq->pHandle, offset.subKey, strlen(offset.subKey));
    if (pHandle) {
      if (walRefVer(pHandle->pRef, offset.val.version) < 0) {
        ASSERT(0);
        return -1;
      }
    }
  }

  return 0;
}

L
Liu Jicong 已提交
327
/**
 * Check whether a column of a table may be modified.
 *
 * Walks every entry of pTq->pCheckInfo; an entry whose ntbUid equals tbUid
 * and whose colIdList contains colId forbids the modification.
 *
 * @param pTq   TQ instance
 * @param tbUid uid of the table
 * @param colId id of the column
 * @return 0 if no entry forbids the column, -1 otherwise
 */
int32_t tqCheckColModifiable(STQ* pTq, int64_t tbUid, int32_t colId) {
  for (void* pIter = taosHashIterate(pTq->pCheckInfo, NULL); pIter != NULL;
       pIter = taosHashIterate(pTq->pCheckInfo, pIter)) {
    STqCheckInfo* pInfo = (STqCheckInfo*)pIter;
    if (pInfo->ntbUid != tbUid) {
      continue;
    }

    int32_t nCols = taosArrayGetSize(pInfo->colIdList);
    for (int32_t idx = 0; idx < nCols; idx++) {
      int16_t lockedColId = *(int16_t*)taosArrayGet(pInfo->colIdList, idx);
      if (lockedColId == colId) {
        // Leaving the iteration early: cancel it before returning.
        taosHashCancelIterate(pTq->pCheckInfo, pIter);
        return -1;
      }
    }
  }
  return 0;
}

L
Liu Jicong 已提交
347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378
/**
 * Prepare an SMqDataRsp for a poll request.
 *
 * Copies the request offset, allocates the block arrays, and allocates the
 * table-name / schema arrays when they are needed. Schemas are included for
 * every subscription type except TOPIC_SUB_TYPE__COLUMN.
 *
 * @param pRsp    response to fill
 * @param pReq    poll request (source of reqOffset and withTbName)
 * @param subType subscription type of the handle
 * @return 0 on success, -1 if an array allocation failed. On failure the
 *         arrays allocated so far are left in pRsp for the caller to release.
 */
static int32_t tqInitDataRsp(SMqDataRsp* pRsp, const SMqPollReq* pReq, int8_t subType) {
  pRsp->reqOffset = pReq->reqOffset;

  pRsp->blockData = taosArrayInit(0, sizeof(void*));
  pRsp->blockDataLen = taosArrayInit(0, sizeof(int32_t));
  if (!pRsp->blockData || !pRsp->blockDataLen) {
    return -1;
  }

  pRsp->withTbName = pReq->withTbName;
  if (pRsp->withTbName) {
    pRsp->blockTbName = taosArrayInit(0, sizeof(void*));
    if (!pRsp->blockTbName) {
      // TODO free
      return -1;
    }
  }

  if (subType != TOPIC_SUB_TYPE__COLUMN) {
    pRsp->withSchema = true;
    pRsp->blockSchema = taosArrayInit(0, sizeof(void*));
    if (!pRsp->blockSchema) {
      // TODO free
      return -1;
    }
  } else {
    pRsp->withSchema = false;
  }

  return 0;
}

379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394
/**
 * Prepare an STaosxRsp for a poll request.
 *
 * Copies the request offset, turns on table names and schemas, and allocates
 * all four block arrays.
 *
 * @param pRsp response to fill
 * @param pReq poll request (source of reqOffset)
 * @return 0 on success, -1 if any array allocation failed. On failure the
 *         arrays that were allocated are left in pRsp.
 */
static int32_t tqInitTaosxRsp(STaosxRsp* pRsp, const SMqPollReq* pReq) {
  pRsp->reqOffset = pReq->reqOffset;

  pRsp->withTbName = 1;
  pRsp->withSchema = 1;

  pRsp->blockData = taosArrayInit(0, sizeof(void*));
  pRsp->blockDataLen = taosArrayInit(0, sizeof(int32_t));
  pRsp->blockTbName = taosArrayInit(0, sizeof(void*));
  pRsp->blockSchema = taosArrayInit(0, sizeof(void*));

  if (!pRsp->blockData) return -1;
  if (!pRsp->blockDataLen) return -1;
  if (!pRsp->blockTbName) return -1;
  if (!pRsp->blockSchema) return -1;

  return 0;
}

L
Liu Jicong 已提交
395
/**
 * Handle a consumer poll request.
 *
 * Steps:
 *   1. Look up the handle for the subscription key and verify that it belongs
 *      to the polling consumer; raise the handle epoch to the request epoch.
 *   2. Determine the offset to fetch from: the request offset when it is a
 *      concrete one (type > 0), otherwise the committed offset, otherwise the
 *      reset policy carried in the request (earliest / latest / none).
 *   3. Column subscriptions: scan data and reply with an SMqDataRsp.
 *      Other subscriptions: scan the snapshot first (unless the offset is
 *      already a log offset), then iterate the WAL until data or a meta
 *      message can be returned, the fetch fails, or a newer consumer epoch is
 *      observed (in which case the request is dropped without a reply).
 *
 * @param pTq  TQ instance
 * @param pMsg RPC message whose pCont is an SMqPollReq
 * @return 0 on success, -1 on failure (terrno is set on some paths)
 */
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
  SMqPollReq*  pReq = pMsg->pCont;
  int64_t      consumerId = pReq->consumerId;
  int32_t      reqEpoch = pReq->epoch;
  int32_t      code = 0;
  STqOffsetVal reqOffset = pReq->reqOffset;
  STqOffsetVal fetchOffsetNew = {0};
  SWalCkHead*  pCkHead = NULL;

  // 1.find handle
  STqHandle* pHandle = taosHashGet(pTq->pHandle, pReq->subKey, strlen(pReq->subKey));
  /*ASSERT(pHandle);*/
  if (pHandle == NULL) {
    tqError("tmq poll: no consumer handle for consumer:%" PRId64 ", in vgId:%d, subkey %s", consumerId,
            TD_VID(pTq->pVnode), pReq->subKey);
    return -1;
  }

  // check rebalance
  if (pHandle->consumerId != consumerId) {
    tqError("tmq poll: consumer handle mismatch for consumer:%" PRId64
            ", in vgId:%d, subkey %s, handle consumer id %" PRId64,
            consumerId, TD_VID(pTq->pVnode), pReq->subKey, pHandle->consumerId);
    terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
    return -1;
  }

  // update epoch if need
  int32_t consumerEpoch = atomic_load_32(&pHandle->epoch);
  while (consumerEpoch < reqEpoch) {
    consumerEpoch = atomic_val_compare_exchange_32(&pHandle->epoch, consumerEpoch, reqEpoch);
  }

  char buf[80];
  tFormatOffset(buf, 80, &reqOffset);
  tqDebug("tmq poll: consumer %" PRId64 " (epoch %d), subkey %s, recv poll req in vg %d, req offset %s", consumerId,
          pReq->epoch, pHandle->subKey, TD_VID(pTq->pVnode), buf);

  // 2.reset offset if needed
  if (reqOffset.type > 0) {
    fetchOffsetNew = reqOffset;
  } else {
    STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, pReq->subKey);
    if (pOffset != NULL) {
      fetchOffsetNew = pOffset->val;
      char formatBuf[80];
      tFormatOffset(formatBuf, 80, &fetchOffsetNew);
      tqDebug("tmq poll: consumer %" PRId64 ", subkey %s, vg %d, offset reset to %s", consumerId, pHandle->subKey,
              TD_VID(pTq->pVnode), formatBuf);
    } else if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEAST) {
      if (pReq->useSnapshot) {
        if (pHandle->fetchMeta) {
          tqOffsetResetToMeta(&fetchOffsetNew, 0);
        } else {
          tqOffsetResetToData(&fetchOffsetNew, 0, 0);
        }
      } else {
        tqOffsetResetToLog(&fetchOffsetNew, walGetFirstVer(pTq->pVnode->pWal));
      }
    } else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) {
      // Reply immediately with an empty response positioned at the last WAL
      // version. The response type must match the subscription type:
      // tqSendDataRsp() asserts that no schema is attached, which only holds
      // for column subscriptions.
      if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
        SMqDataRsp dataRsp = {0};
        if (tqInitDataRsp(&dataRsp, pReq, pHandle->execHandle.subType) < 0) {
          tDeleteSMqDataRsp(&dataRsp);
          terrno = TSDB_CODE_OUT_OF_MEMORY;
          return -1;
        }

        tqOffsetResetToLog(&dataRsp.rspOffset, walGetLastVer(pTq->pVnode->pWal));
        tqDebug("tmq poll: consumer %" PRId64 ", subkey %s, vg %d, offset reset to %" PRId64, consumerId,
                pHandle->subKey, TD_VID(pTq->pVnode), dataRsp.rspOffset.version);
        if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) {
          code = -1;
        }
        tDeleteSMqDataRsp(&dataRsp);
        return code;
      } else {
        STaosxRsp latestRsp = {0};
        if (tqInitTaosxRsp(&latestRsp, pReq) < 0) {
          tDeleteSTaosxRsp(&latestRsp);
          terrno = TSDB_CODE_OUT_OF_MEMORY;
          return -1;
        }

        tqOffsetResetToLog(&latestRsp.rspOffset, walGetLastVer(pTq->pVnode->pWal));
        tqDebug("tmq poll: consumer %" PRId64 ", subkey %s, vg %d, offset reset to %" PRId64, consumerId,
                pHandle->subKey, TD_VID(pTq->pVnode), latestRsp.rspOffset.version);
        if (tqSendTaosxRsp(pTq, pMsg, pReq, &latestRsp) < 0) {
          code = -1;
        }
        tDeleteSTaosxRsp(&latestRsp);
        return code;
      }
    } else if (reqOffset.type == TMQ_OFFSET__RESET_NONE) {
      tqError("tmq poll: subkey %s, no offset committed for consumer %" PRId64
              " in vg %d, subkey %s, reset none failed",
              pHandle->subKey, consumerId, TD_VID(pTq->pVnode), pReq->subKey);
      terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET;
      return -1;
    } else {
      // No committed offset and no recognizable reset policy: there is no
      // defined position to fetch from.
      tqError("tmq poll: consumer %" PRId64 ", subkey %s, vg %d, unsupported req offset type %d", consumerId,
              pHandle->subKey, TD_VID(pTq->pVnode), reqOffset.type);
      return -1;
    }
  }

  if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
    SMqDataRsp dataRsp = {0};
    if (tqInitDataRsp(&dataRsp, pReq, pHandle->execHandle.subType) < 0) {
      tDeleteSMqDataRsp(&dataRsp);
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
    }
    tqScanData(pTq, pHandle, &dataRsp, &fetchOffsetNew);

    if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) {
      code = -1;
    }

    tqDebug("tmq poll: consumer %" PRId64 ", subkey %s, vg %d, send data blockNum:%d, offset type:%d, uid:%" PRId64
            ", version:%" PRId64,
            consumerId, pHandle->subKey, TD_VID(pTq->pVnode), dataRsp.blockNum, dataRsp.rspOffset.type,
            dataRsp.rspOffset.uid, dataRsp.rspOffset.version);

    tDeleteSMqDataRsp(&dataRsp);
    return code;
  }

  // for taosx
  ASSERT(pHandle->execHandle.subType != TOPIC_SUB_TYPE__COLUMN);

  SMqMetaRsp metaRsp = {0};
  STaosxRsp  taosxRsp = {0};
  if (tqInitTaosxRsp(&taosxRsp, pReq) < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    code = -1;
    goto _end;
  }

  if (fetchOffsetNew.type != TMQ_OFFSET__LOG) {
    tqScan(pTq, pHandle, &taosxRsp, &metaRsp, &fetchOffsetNew);

    if (metaRsp.metaRspLen > 0) {
      if (tqSendMetaPollRsp(pTq, pMsg, pReq, &metaRsp) < 0) {
        code = -1;
      }
      tqDebug("tmq poll: consumer %" PRId64 ", subkey %s, vg %d, send meta offset type:%d,uid:%" PRId64
              ",version:%" PRId64,
              consumerId, pHandle->subKey, TD_VID(pTq->pVnode), metaRsp.rspOffset.type, metaRsp.rspOffset.uid,
              metaRsp.rspOffset.version);
      taosMemoryFree(metaRsp.metaRsp);
      goto _end;
    }

    if (taosxRsp.blockNum > 0) {
      if (tqSendTaosxRsp(pTq, pMsg, pReq, &taosxRsp) < 0) {
        code = -1;
      }
      goto _end;
    }

    // Snapshot scan produced nothing: continue from the offset it reported.
    fetchOffsetNew = taosxRsp.rspOffset;

    tqDebug("taosx poll: consumer %" PRId64 ", subkey %s, vg %d, send data blockNum:%d, offset type:%d,uid:%" PRId64
            ",version:%" PRId64,
            consumerId, pHandle->subKey, TD_VID(pTq->pVnode), taosxRsp.blockNum, taosxRsp.rspOffset.type,
            taosxRsp.rspOffset.uid, taosxRsp.rspOffset.version);
  }

  if (fetchOffsetNew.type == TMQ_OFFSET__LOG) {
    int64_t fetchVer = fetchOffsetNew.version + 1;
    pCkHead = taosMemoryMalloc(sizeof(SWalCkHead) + 2048);
    if (pCkHead == NULL) {
      code = -1;
      goto _end;
    }

    walSetReaderCapacity(pHandle->pWalReader, 2048);

    while (1) {
      consumerEpoch = atomic_load_32(&pHandle->epoch);
      if (consumerEpoch > reqEpoch) {
        // A newer epoch took over this handle: drop the request without a reply.
        tqWarn("tmq poll: consumer %" PRId64 " (epoch %d), subkey %s, vg %d offset %" PRId64
               ", found new consumer epoch %d, discard req epoch %d",
               consumerId, pReq->epoch, pHandle->subKey, TD_VID(pTq->pVnode), fetchVer, consumerEpoch, reqEpoch);
        break;
      }

      if (tqFetchLog(pTq, pHandle, &fetchVer, &pCkHead) < 0) {
        tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
        if (tqSendTaosxRsp(pTq, pMsg, pReq, &taosxRsp) < 0) {
          code = -1;
        }
        goto _end;
      }

      SWalCont* pHead = &pCkHead->head;

      tqDebug("tmq poll: consumer:%" PRId64 ", (epoch %d) iter log, vgId:%d offset %" PRId64 " msgType %d", consumerId,
              pReq->epoch, TD_VID(pTq->pVnode), fetchVer, pHead->msgType);

      if (pHead->msgType == TDMT_VND_SUBMIT) {
        SSubmitReq* pCont = (SSubmitReq*)&pHead->body;

        if (tqTaosxScanLog(pTq, pHandle, pCont, &taosxRsp) < 0) {
          /*ASSERT(0);*/
        }
        // TODO batch optimization:
        // TODO continue scan until meeting batch requirement
        if (taosxRsp.blockNum > 0 /* threshold */) {
          tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
          if (tqSendTaosxRsp(pTq, pMsg, pReq, &taosxRsp) < 0) {
            code = -1;
          }
          goto _end;
        }
        fetchVer++;
      } else {
        ASSERT(pHandle->fetchMeta);
        ASSERT(IS_META_MSG(pHead->msgType));
        tqDebug("fetch meta msg, ver:%" PRId64 ", type:%d", pHead->version, pHead->msgType);
        tqOffsetResetToLog(&metaRsp.rspOffset, fetchVer);
        metaRsp.resMsgType = pHead->msgType;
        metaRsp.metaRspLen = pHead->bodyLen;
        // Points into pCkHead; released together with it below.
        metaRsp.metaRsp = pHead->body;
        if (tqSendMetaPollRsp(pTq, pMsg, pReq, &metaRsp) < 0) {
          code = -1;
        }
        goto _end;
      }
    }
  }

_end:
  // Single exit for the taosx path so the WAL buffer is released on every
  // route, including the epoch-changed break above.
  tDeleteSTaosxRsp(&taosxRsp);
  if (pCkHead) taosMemoryFree(pCkHead);
  return code;
}

611
/**
 * Remove a subscription from this vgroup: drop its handle from the in-memory
 * table, delete its committed offset, and delete the persisted handle.
 *
 * @param pTq     TQ instance
 * @param version unused here
 * @param msg     message body, interpreted as an SMqVDeleteReq
 * @param msgLen  unused here
 * @return always 0; a missing handle or a failed meta delete hits an ASSERT
 */
int32_t tqProcessVgDeleteReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) {
  SMqVDeleteReq* pDelReq = (SMqVDeleteReq*)msg;

  int32_t removed = taosHashRemove(pTq->pHandle, pDelReq->subKey, strlen(pDelReq->subKey));
  ASSERT(removed == 0);

  tqOffsetDelete(pTq->pOffsetStore, pDelReq->subKey);

  int32_t metaRet = tqMetaDeleteHandle(pTq, pDelReq->subKey);
  if (metaRet < 0) {
    ASSERT(0);
  }

  return 0;
}

625 626 627
/**
 * Register a check-info entry for a topic: decode it, store it in the
 * in-memory pCheckInfo table keyed by topic name, and persist the raw message.
 *
 * @param pTq     TQ instance
 * @param version unused here
 * @param msg     encoded STqCheckInfo
 * @param msgLen  length of msg in bytes
 * @return 0 on success; -1 with terrno = TSDB_CODE_OUT_OF_MEMORY when the
 *         decode, the hash insert or the persist step fails
 */
int32_t tqProcessAddCheckInfoReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) {
  STqCheckInfo info = {0};
  SDecoder     decoder;
  tDecoderInit(&decoder, msg, msgLen);
  if (tDecodeSTqCheckInfo(&decoder, &info) < 0) {
    // Release the decoder on the failure path as well.
    tDecoderClear(&decoder);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  tDecoderClear(&decoder);

  // The struct is copied by value into the table.
  // NOTE(review): info.colIdList presumably becomes owned by the table entry
  // on success and is not released here on the failure paths — confirm.
  if (taosHashPut(pTq->pCheckInfo, info.topic, strlen(info.topic), &info, sizeof(STqCheckInfo)) < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  if (tqMetaSaveCheckInfo(pTq, info.topic, msg, msgLen) < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  return 0;
}

645 646 647 648 649 650 651 652 653 654 655 656 657
/**
 * Remove the check-info entry of a topic from the in-memory table and from
 * the persisted meta.
 *
 * @param pTq     TQ instance
 * @param version unused here
 * @param msg     NUL-terminated key of the entry to remove
 * @param msgLen  unused here
 * @return 0 on success; -1 with terrno = TSDB_CODE_OUT_OF_MEMORY when either
 *         removal fails
 */
int32_t tqProcessDelCheckInfoReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) {
  int32_t ret = taosHashRemove(pTq->pCheckInfo, msg, strlen(msg));
  if (ret >= 0) {
    ret = tqMetaDeleteCheckInfo(pTq, msg);
  }

  if (ret < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  return 0;
}

/**
 * Apply a rebalance request for a subscription on this vgroup.
 *
 * If no handle exists for the subscription key, a new one is built (WAL ref,
 * readers and executor task according to the subscription type), inserted
 * into pTq->pHandle and persisted. If a handle already exists, its epoch is
 * reset, its consumer id is switched to the new consumer, and the handle is
 * persisted again.
 *
 * @param pTq     TQ instance
 * @param version unused here
 * @param msg     encoded SMqRebVgReq
 * @param msgLen  unused here
 * @return 0 on success, -1 if the committed WAL version could not be referenced
 */
int32_t tqProcessVgChangeReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) {
  SMqRebVgReq req = {0};
  tDecodeSMqRebVgReq(msg, &req);
  // todo lock
  STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
  if (pHandle == NULL) {
    if (req.oldConsumerId != -1) {
      // Both consumer ids are 64-bit; print them with PRId64.
      tqError("vgId:%d, build new consumer handle %s for consumer %" PRId64 ", but old consumerId is %" PRId64,
              req.vgId, req.subKey, req.newConsumerId, req.oldConsumerId);
    }
    ASSERT(req.newConsumerId != -1);

    // Build the handle on the stack; taosHashPut() below copies it by value.
    STqHandle tqHandle = {0};
    pHandle = &tqHandle;
    /*taosInitRWLatch(&pExec->lock);*/

    memcpy(pHandle->subKey, req.subKey, TSDB_SUBSCRIBE_KEY_LEN);
    pHandle->consumerId = req.newConsumerId;
    pHandle->epoch = -1;

    pHandle->execHandle.subType = req.subType;
    pHandle->fetchMeta = req.withMeta;

    // TODO version should be assigned and refed during preprocess
    SWalRef* pRef = walRefCommittedVer(pTq->pVnode->pWal);
    if (pRef == NULL) {
      ASSERT(0);
      return -1;
    }
    int64_t ver = pRef->refVer;
    pHandle->pRef = pRef;

    SReadHandle handle = {
        .meta = pTq->pVnode->pMeta,
        .vnode = pTq->pVnode,
        .initTableReader = true,
        .initTqReader = true,
        .version = ver,
    };
    pHandle->snapshotVer = ver;

    if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
      // The handle takes over the query message from the request.
      pHandle->execHandle.execCol.qmsg = req.qmsg;
      req.qmsg = NULL;

      pHandle->execHandle.task =
          qCreateQueueExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle, &pHandle->execHandle.numOfCols,
                                   &pHandle->execHandle.pSchemaWrapper);
      ASSERT(pHandle->execHandle.task);
      void* scanner = NULL;
      qExtractStreamScanner(pHandle->execHandle.task, &scanner);
      ASSERT(scanner);
      pHandle->execHandle.pExecReader = qExtractReaderFromStreamScanner(scanner);
      ASSERT(pHandle->execHandle.pExecReader);
    } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__DB) {
      pHandle->pWalReader = walOpenReader(pTq->pVnode->pWal, NULL);
      pHandle->execHandle.pExecReader = tqOpenReader(pTq->pVnode);
      pHandle->execHandle.execDb.pFilterOutTbUid =
          taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
      buildSnapContext(handle.meta, handle.version, 0, pHandle->execHandle.subType, pHandle->fetchMeta,
                       (SSnapContext**)(&handle.sContext));

      pHandle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &handle, NULL, NULL);
    } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
      pHandle->pWalReader = walOpenReader(pTq->pVnode->pWal, NULL);

      pHandle->execHandle.execTb.suid = req.suid;

      // Collect the child tables of the super table and hand them to the reader.
      SArray* tbUidList = taosArrayInit(0, sizeof(int64_t));
      vnodeGetCtbIdList(pTq->pVnode, req.suid, tbUidList);
      tqDebug("vgId:%d, tq try to get all ctb, suid:%" PRId64, pTq->pVnode->config.vgId, req.suid);
      for (int32_t i = 0; i < taosArrayGetSize(tbUidList); i++) {
        int64_t tbUid = *(int64_t*)taosArrayGet(tbUidList, i);
        tqDebug("vgId:%d, idx %d, uid:%" PRId64, TD_VID(pTq->pVnode), i, tbUid);
      }
      pHandle->execHandle.pExecReader = tqOpenReader(pTq->pVnode);
      tqReaderSetTbUidList(pHandle->execHandle.pExecReader, tbUidList);
      taosArrayDestroy(tbUidList);

      buildSnapContext(handle.meta, handle.version, req.suid, pHandle->execHandle.subType, pHandle->fetchMeta,
                       (SSnapContext**)(&handle.sContext));
      pHandle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &handle, NULL, NULL);
    }
    taosHashPut(pTq->pHandle, req.subKey, strlen(req.subKey), pHandle, sizeof(STqHandle));
    tqDebug("try to persist handle %s consumer %" PRId64, req.subKey, pHandle->consumerId);
    if (tqMetaSaveHandle(pTq, req.subKey, pHandle) < 0) {
      // TODO
      ASSERT(0);
    }
  } else {
    /*ASSERT(pExec->consumerId == req.oldConsumerId);*/
    // TODO handle qmsg and exec modification
    // Epoch is set to -1 while the consumer id is switched, then bumped to 0.
    atomic_store_32(&pHandle->epoch, -1);
    atomic_store_64(&pHandle->consumerId, req.newConsumerId);
    atomic_add_fetch_32(&pHandle->epoch, 1);
    if (tqMetaSaveHandle(pTq, req.subKey, pHandle) < 0) {
      // TODO
    }
  }

  return 0;
}
758

L
Liu Jicong 已提交
759
/*
 * Fill in the runtime parts of a stream task on this vnode: scheduling/queue state, the
 * executor (for source and aggregation levels) and the output sink callbacks.
 *
 * Returns 0 on success, or -1 when either queue or the stream state cannot be opened.
 */
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask) {
  // decide once which executor flavour (if any) this task level needs
  const bool isSource = (pTask->taskLevel == TASK_LEVEL__SOURCE);
  const bool isAgg = (pTask->taskLevel == TASK_LEVEL__AGG);

  if (isAgg) {
    // an aggregation task is expected to have at least one child endpoint
    ASSERT(taosArrayGetSize(pTask->childEpInfo) != 0);
  }

  pTask->schedStatus = TASK_SCHED_STATUS__INACTIVE;

  // both queues are opened before either result is inspected
  pTask->inputQueue = streamQueueOpen();
  pTask->outputQueue = streamQueueOpen();
  if (pTask->inputQueue == NULL) return -1;
  if (pTask->outputQueue == NULL) return -1;

  pTask->inputStatus = TASK_INPUT_STATUS__NORMAL;
  pTask->outputStatus = TASK_OUTPUT_STATUS__NORMAL;

  SVnode* pVnode = pTq->pVnode;
  pTask->pMsgCb = &pVnode->msgCb;

  // expand executor
  if (isSource || isAgg) {
    pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask, false);
    if (pTask->pState == NULL) return -1;

    SReadHandle readHandle = {0};
    readHandle.pStateBackend = pTask->pState;
    if (isSource) {
      // source level: the handle is bound to this vnode and its meta, with a tq reader requested
      readHandle.meta = pVnode->pMeta;
      readHandle.vnode = pVnode;
      readHandle.initTqReader = 1;
    } else {
      // aggregation level: no vnode, only the number of child endpoints
      readHandle.vnode = NULL;
      readHandle.numOfVgroups = (int32_t)taosArrayGetSize(pTask->childEpInfo);
    }

    pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &readHandle);
    ASSERT(pTask->exec.executor);
  }

  // sink
  /*pTask->ahandle = pTq->pVnode;*/
  if (pTask->outputType == TASK_OUTPUT__SMA) {
    pTask->smaSink.vnode = pVnode;
    pTask->smaSink.smaSink = smaHandleRes;
  } else if (pTask->outputType == TASK_OUTPUT__TABLE) {
    pTask->tbSink.vnode = pVnode;
    pTask->tbSink.tbSinkFunc = tqTableSink;

    // the table sink needs a schema wrapper to derive its STSchema from
    ASSERT(pTask->tbSink.pSchemaWrapper);
    ASSERT(pTask->tbSink.pSchemaWrapper->pSchema);

    pTask->tbSink.pTSchema =
        tdGetSTSChemaFromSSChema(pTask->tbSink.pSchemaWrapper->pSchema, pTask->tbSink.pSchemaWrapper->nCols, 1);
    ASSERT(pTask->tbSink.pTSchema);
  }

  streamSetupTrigger(pTask);

  tqInfo("expand stream task on vg %d, task id %d, child id %d", TD_VID(pTq->pVnode), pTask->taskId,
         pTask->selfChildId);
  return 0;
}
L
Liu Jicong 已提交
830

831
/*
 * Handle a task-deploy request by handing the serialized task to the stream meta.
 * The result of streamMetaAddSerializedTask is returned unchanged.
 */
int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) {
  int32_t code = streamMetaAddSerializedTask(pTq->pStreamMeta, version, msg, msgLen);
  return code;
}
L
Liu Jicong 已提交
835

L
Liu Jicong 已提交
836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851
/*
 * Turn an encoded delete result into a STREAM_DELETE_DATA block and enqueue a copy of it
 * into every source-level stream task of this tq.
 *
 * pReq/len : encoded SDeleteRes
 * ver      : version stamped on the generated block
 *
 * Returns 0 when there is nothing to forward or the block was offered to the tasks, and -1
 * (with terrno set) when the request could not be decoded or the delete block could not be
 * built. In the -1 case every source task is notified through streamTaskInputFail, the same
 * way tqProcessSubmitReq reports a failed input.
 */
int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) {
  bool         failed = false;
  int32_t      sz = 0;
  SDecoder*    pCoder = &(SDecoder){0};
  SDeleteRes*  pRes = &(SDeleteRes){0};
  SSDataBlock* pDelBlock = NULL;

  pRes->uidList = taosArrayInit(0, sizeof(tb_uid_t));
  if (pRes->uidList == NULL) {
    // never decode into a missing uid list
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    failed = true;
  } else {
    tDecoderInit(pCoder, pReq, len);
    if (tDecodeDeleteRes(pCoder, pRes) < 0) {
      terrno = TSDB_CODE_MSG_DECODE_ERROR;
      failed = true;
    }
    tDecoderClear(pCoder);
  }

  if (!failed) {
    sz = taosArrayGetSize(pRes->uidList);
    if (sz == 0 || pRes->affectedRows == 0) {
      // nothing was deleted, so there is nothing to forward
      taosArrayDestroy(pRes->uidList);
      return 0;
    }

    pDelBlock = createSpecialDataBlock(STREAM_DELETE_DATA);
    // NOTE(review): assumes blockDataEnsureCapacity reports success as 0 - confirm
    if (pDelBlock == NULL || blockDataEnsureCapacity(pDelBlock, sz) != 0) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      failed = true;
    }
  }

  if (!failed) {
    pDelBlock->info.rows = sz;
    pDelBlock->info.version = ver;

    // the column handles do not change while rows are appended, so look them up once
    SColumnInfoData* pStartCol = taosArrayGet(pDelBlock->pDataBlock, START_TS_COLUMN_INDEX);
    SColumnInfoData* pEndCol = taosArrayGet(pDelBlock->pDataBlock, END_TS_COLUMN_INDEX);
    SColumnInfoData* pUidCol = taosArrayGet(pDelBlock->pDataBlock, UID_COLUMN_INDEX);
    SColumnInfoData* pGroupCol = taosArrayGet(pDelBlock->pDataBlock, GROUPID_COLUMN_INDEX);
    SColumnInfoData* pCalStartCol = taosArrayGet(pDelBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
    SColumnInfoData* pCalEndCol = taosArrayGet(pDelBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);

    // one row per deleted table uid, all rows sharing the same [skey, ekey] range
    for (int32_t i = 0; i < sz; i++) {
      // start key column
      colDataAppend(pStartCol, i, (const char*)&pRes->skey, false);
      // end key column
      colDataAppend(pEndCol, i, (const char*)&pRes->ekey, false);
      // uid column
      int64_t* pUid = taosArrayGet(pRes->uidList, i);
      colDataAppend(pUidCol, i, (const char*)pUid, false);

      colDataAppendNULL(pGroupCol, i);
      colDataAppendNULL(pCalStartCol, i);
      colDataAppendNULL(pCalEndCol, i);
    }
  }

  if (pRes->uidList != NULL) {
    taosArrayDestroy(pRes->uidList);
  }

  void* pIter = NULL;
  while (1) {
    pIter = taosHashIterate(pTq->pStreamMeta->pTasks, pIter);
    if (pIter == NULL) break;
    SStreamTask* pTask = *(SStreamTask**)pIter;
    if (pTask->taskLevel != TASK_LEVEL__SOURCE) continue;

    qDebug("delete req enqueue stream task: %d, ver: %" PRId64, pTask->taskId, ver);

    if (failed) {
      // no block could be built: report the failed input without allocating anything
      streamTaskInputFail(pTask);
      continue;
    }

    SStreamDataBlock* pStreamBlock = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM);
    if (pStreamBlock == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      qError("stream task input del failed, task id %d", pTask->taskId);
      streamTaskInputFail(pTask);
      continue;
    }

    pStreamBlock->type = STREAM_INPUT__DATA_BLOCK;
    pStreamBlock->blocks = taosArrayInit(0, sizeof(SSDataBlock));
    if (pStreamBlock->blocks == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      qError("stream task input del failed, task id %d", pTask->taskId);
      taosFreeQitem(pStreamBlock);
      streamTaskInputFail(pTask);
      continue;
    }

    // each task receives its own copy of the delete block
    SSDataBlock block = {0};
    assignOneDataBlock(&block, pDelBlock);
    block.info.type = STREAM_DELETE_DATA;
    taosArrayPush(pStreamBlock->blocks, &block);

    // NOTE(review): pStreamBlock is not released here after streamTaskInput, matching the
    // original code; presumably streamTaskInput takes over the item - confirm.
    if (streamTaskInput(pTask, (SStreamQueueItem*)pStreamBlock) < 0) {
      qError("stream task input del failed, task id %d", pTask->taskId);
      continue;
    }

    if (streamSchedExec(pTask) < 0) {
      qError("stream task launch failed, task id %d", pTask->taskId);
      continue;
    }
  }

  if (pDelBlock != NULL) {
    blockDataDestroy(pDelBlock);
  }

  return failed ? -1 : 0;
}

/*
 * Wrap a submit request into a stream data-submit item and offer it to every source-level
 * stream task of this tq.
 *
 * When the item cannot be created, terrno is set to TSDB_CODE_OUT_OF_MEMORY, every source
 * task is notified through streamTaskInputFail, and -1 is returned. Otherwise 0 is returned;
 * per-task enqueue or launch failures are only logged.
 */
int32_t tqProcessSubmitReq(STQ* pTq, SSubmitReq* pReq, int64_t ver) {
  SStreamDataSubmit* pDataSubmit = streamDataSubmitNew(pReq);
  const bool         noSubmit = (pDataSubmit == NULL);

  if (noSubmit) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    qError("failed to create data submit for stream since out of memory");
  }

  for (void* pIter = taosHashIterate(pTq->pStreamMeta->pTasks, NULL); pIter != NULL;
       pIter = taosHashIterate(pTq->pStreamMeta->pTasks, pIter)) {
    SStreamTask* pTask = *(SStreamTask**)pIter;
    if (pTask->taskLevel != TASK_LEVEL__SOURCE) {
      continue;  // only source tasks consume submit data
    }

    qDebug("data submit enqueue stream task: %d, ver: %" PRId64, pTask->taskId, ver);

    if (noSubmit) {
      streamTaskInputFail(pTask);
    } else if (streamTaskInput(pTask, (SStreamQueueItem*)pDataSubmit) < 0) {
      qError("stream task input failed, task id %d", pTask->taskId);
    } else if (streamSchedExec(pTask) < 0) {
      qError("stream task launch failed, task id %d", pTask->taskId);
    }
  }

  // release this function's handle on the item once every task has been offered it
  if (!noSubmit) {
    streamDataSubmitRefDec(pDataSubmit);
    taosFreeQitem(pDataSubmit);
  }

  return noSubmit ? -1 : 0;
}

L
Liu Jicong 已提交
958
/*
 * Handle a task-run request: look up the task named in the message and run it.
 * Returns 0 when the task exists, -1 when it is not registered in the stream meta.
 */
int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
  SStreamTaskRunReq* pRunReq = pMsg->pCont;
  int32_t            targetId = pRunReq->taskId;

  SStreamTask* pTask = streamMetaGetTask(pTq->pStreamMeta, targetId);
  if (pTask == NULL) {
    return -1;
  }

  streamProcessRunReq(pTask);
  return 0;
}

L
Liu Jicong 已提交
971
/*
 * Decode a stream dispatch request carried after the SMsgHead of pMsg and pass it to the
 * addressed task.
 *
 * exec is forwarded unchanged to streamProcessDispatchReq.
 *
 * Returns 0 when the task exists, -1 when the request cannot be decoded (terrno is set to
 * TSDB_CODE_MSG_DECODE_ERROR) or the task is not registered in the stream meta.
 *
 * NOTE(review): the ASSERT(0) at entry is kept from the original; it looks like this entry
 * point is not expected to be reached (vnodeEnqueueStreamMsg handles dispatch requests) -
 * confirm before relying on this function.
 */
int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec) {
  ASSERT(0);
  char*   msgStr = pMsg->pCont;
  char*   msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
  int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);

  SStreamDispatchReq req;
  SDecoder           decoder;
  tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
  int32_t decodeRet = tDecodeStreamDispatchReq(&decoder, &req);
  // the decoder is cleared right after decoding, as vnodeEnqueueStreamMsg does
  tDecoderClear(&decoder);
  if (decodeRet < 0) {
    // req is not fully populated; do not read req.taskId
    terrno = TSDB_CODE_MSG_DECODE_ERROR;
    return -1;
  }

  SStreamTask* pTask = streamMetaGetTask(pTq->pStreamMeta, req.taskId);
  if (pTask == NULL) {
    return -1;
  }

  SRpcMsg rsp = {
      .info = pMsg->info,
      .code = 0,
  };
  streamProcessDispatchReq(pTask, &req, &rsp, exec);
  return 0;
}

L
Liu Jicong 已提交
995
#if 0
/*
 * Disabled: forward a task-recover request to the task named in the message.
 * Returns 0 when the task exists, -1 otherwise.
 */
int32_t tqProcessTaskRecoverReq(STQ* pTq, SRpcMsg* pMsg) {
  SStreamTaskRecoverReq* pRecoverReq = pMsg->pCont;
  int32_t                targetId = pRecoverReq->taskId;

  SStreamTask* pTask = streamMetaGetTask(pTq->pStreamMeta, targetId);
  if (pTask == NULL) {
    return -1;
  }

  streamProcessRecoverReq(pTask, pRecoverReq, pMsg);
  return 0;
}

/*
 * Disabled: forward a task-recover response to the task named by rspTaskId.
 * Returns 0 when the task exists, -1 otherwise.
 */
int32_t tqProcessTaskRecoverRsp(STQ* pTq, SRpcMsg* pMsg) {
  SStreamTaskRecoverRsp* pRecoverRsp = pMsg->pCont;
  int32_t                targetId = pRecoverRsp->rspTaskId;

  SStreamTask* pTask = streamMetaGetTask(pTq->pStreamMeta, targetId);
  if (pTask == NULL) {
    return -1;
  }

  streamProcessRecoverRsp(pTask, pRecoverRsp);
  return 0;
}
#endif
L
Liu Jicong 已提交
1021

L
Liu Jicong 已提交
1022 1023 1024 1025
/*
 * Handle a dispatch response: the SStreamDispatchRsp sits right after the SMsgHead in the
 * message content and is forwarded to the task it names.
 * Returns 0 when the task exists, -1 when it is not registered in the stream meta.
 */
int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) {
  SStreamDispatchRsp* pDispatchRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t             targetId = pDispatchRsp->taskId;

  SStreamTask* pTask = streamMetaGetTask(pTq->pStreamMeta, targetId);
  if (pTask == NULL) {
    return -1;
  }

  streamProcessDispatchRsp(pTask, pDispatchRsp);
  return 0;
}
L
Liu Jicong 已提交
1033

1034
/*
 * Handle a drop-task request: msg is read as an SVDropStreamTaskReq and the task it names is
 * removed from the stream meta. The result of streamMetaRemoveTask is returned unchanged.
 * version and msgLen are not used here.
 */
int32_t tqProcessTaskDropReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) {
  SVDropStreamTaskReq* pDropReq = (SVDropStreamTaskReq*)msg;
  int32_t              code = streamMetaRemoveTask(pTq->pStreamMeta, pDropReq->taskId);
  return code;
}
L
Liu Jicong 已提交
1039 1040 1041 1042 1043 1044 1045 1046 1047

/*
 * Decode a stream retrieve request carried after the SMsgHead of pMsg and pass it to the
 * destination task.
 *
 * Returns 0 when the destination task exists, -1 when the request cannot be decoded (terrno
 * is set to TSDB_CODE_MSG_DECODE_ERROR) or the task is not registered in the stream meta.
 */
int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg) {
  char*   msgStr = pMsg->pCont;
  char*   msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
  int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);

  SStreamRetrieveReq req;
  SDecoder           decoder;
  tDecoderInit(&decoder, msgBody, msgLen);
  int32_t decodeRet = tDecodeStreamRetrieveReq(&decoder, &req);
  // release the decoder on every path, as vnodeEnqueueStreamMsg does for dispatch requests
  // NOTE(review): assumes the decoded request does not reference decoder-owned memory - confirm
  tDecoderClear(&decoder);
  if (decodeRet < 0) {
    // req is not fully populated; do not read req.dstTaskId
    terrno = TSDB_CODE_MSG_DECODE_ERROR;
    return -1;
  }

  SStreamTask* pTask = streamMetaGetTask(pTq->pStreamMeta, req.dstTaskId);
  if (pTask == NULL) {
    return -1;
  }

  SRpcMsg rsp = {
      .info = pMsg->info,
      .code = 0,
  };
  streamProcessRetrieveReq(pTask, &req, &rsp);
  return 0;
}

/*
 * Handle a retrieve response. Nothing is done with the message yet; the function always
 * reports success.
 */
int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg) {
  (void)pTq;
  (void)pMsg;
  return 0;
}
L
Liu Jicong 已提交
1066 1067 1068 1069 1070 1071 1072 1073 1074 1075 1076 1077 1078

/*
 * Decode a stream dispatch request carried after the SMsgHead of pMsg and hand it to the
 * addressed stream task of this vnode.
 *
 * The message content and the queue item pMsg are released on every path of this function.
 * When the request cannot be decoded or the task is not found, a response is sent back if
 * the message carries an rpc handle.
 */
void vnodeEnqueueStreamMsg(SVnode* pVnode, SRpcMsg* pMsg) {
  STQ*    pTq = pVnode->pTq;
  char*   msgStr = pMsg->pCont;
  char*   msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
  int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
  int32_t code = 0;

  SStreamDispatchReq req;
  SDecoder           decoder;
  tDecoderInit(&decoder, msgBody, msgLen);
  if (tDecodeStreamDispatchReq(&decoder, &req) < 0) {
    code = TSDB_CODE_MSG_DECODE_ERROR;
    tDecoderClear(&decoder);
    goto FAIL;
  }
  tDecoderClear(&decoder);

  int32_t taskId = req.taskId;

  SStreamTask* pTask = streamMetaGetTask(pTq->pStreamMeta, taskId);
  if (pTask) {
    SRpcMsg rsp = {
        .info = pMsg->info,
        .code = 0,
    };
    streamProcessDispatchReq(pTask, &req, &rsp, false);
    rpcFreeCont(pMsg->pCont);
    taosFreeQitem(pMsg);
    return;
  }

  // NOTE(review): when the task is not found, code is still 0 here, so the sender receives a
  // success response although nothing was processed - confirm whether an error code is wanted.

FAIL:
  // reply only when there is an rpc handle to answer on
  if (pMsg->info.handle != NULL) {
    SRpcMsg rsp = {
        .code = code,
        .info = pMsg->info,
    };
    tmsgSendRsp(&rsp);
  }
  // release the message even when no response could be sent
  rpcFreeCont(pMsg->pCont);
  taosFreeQitem(pMsg);
}