tq.c 25.0 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Hongze Cheng 已提交
16
#include "tq.h"
17
#include "tdbInt.h"
S
Shengliang Guan 已提交
18

L
Liu Jicong 已提交
19
/**
 * Initialize the module-wide TQ state (tqMgmt) exactly once.
 *
 * tqMgmt.inited is used as a small state machine: 0 = not initialized,
 * 2 = an init/cleanup transition is in progress, 1 = initialized.
 *
 * @return 0 on success (or if already initialized), -1 if the timer could not be created.
 */
int32_t tqInit() {
  int8_t prev;

  // Spin while another thread holds the transitional state (2).
  do {
    prev = atomic_val_compare_exchange_8(&tqMgmt.inited, 0, 2);
  } while (prev == 2);

  if (prev != 0) {
    // Someone else already completed initialization.
    return 0;
  }

  // We won the 0 -> 2 transition, so this thread performs the actual setup.
  tqMgmt.timer = taosTmrInit(10000, 100, 10000, "TQ");
  if (tqMgmt.timer == NULL) {
    // Roll back so a later call can retry.
    atomic_store_8(&tqMgmt.inited, 0);
    return -1;
  }

  atomic_store_8(&tqMgmt.inited, 1);
  return 0;
}
L
Liu Jicong 已提交
36

37
/**
 * Tear down the module-wide TQ state created by tqInit().
 *
 * Moves tqMgmt.inited from 1 (initialized) through 2 (transition in progress)
 * back to 0. Does nothing if the module was not initialized.
 */
void tqCleanUp() {
  int8_t prev;

  // Spin while another thread holds the transitional state (2).
  do {
    prev = atomic_val_compare_exchange_8(&tqMgmt.inited, 1, 2);
  } while (prev == 2);

  if (prev != 1) {
    return;
  }

  taosTmrCleanUp(tqMgmt.timer);
  atomic_store_8(&tqMgmt.inited, 0);
}
L
Liu Jicong 已提交
49

50 51 52 53
/**
 * Key comparator for the "handles" table.
 *
 * Keys are compared byte-wise over their common prefix; if that prefix is
 * equal, the shorter key sorts first. For NUL-free keys this yields the same
 * ordering as strcmp(), but it never reads beyond kLen1 / kLen2 bytes, so it
 * is safe for keys that are stored without a terminating NUL (this file stores
 * them with a length of strlen(key)).
 *
 * @param pKey1 first key bytes
 * @param kLen1 length of the first key in bytes
 * @param pKey2 second key bytes
 * @param kLen2 length of the second key in bytes
 * @return <0, 0 or >0 if key1 sorts before, equal to, or after key2
 */
int tqExecKeyCompare(const void* pKey1, int32_t kLen1, const void* pKey2, int32_t kLen2) {
  int32_t minLen = (kLen1 < kLen2) ? kLen1 : kLen2;

  if (minLen > 0) {
    int c = memcmp(pKey1, pKey2, (size_t)minLen);
    if (c != 0) {
      return c;
    }
  }

  if (kLen1 == kLen2) {
    return 0;
  }
  return (kLen1 < kLen2) ? -1 : 1;
}

L
Liu Jicong 已提交
54
/**
 * Serialize a subscription handle and upsert it into the on-disk "handles"
 * table under the given key.
 *
 * @param pTq     TQ instance owning the meta store
 * @param key     NUL-terminated subscription key; stored with strlen(key) bytes
 * @param pHandle handle to persist
 * @return 0 (every failure is currently reported through ASSERT)
 */
int32_t tqStoreHandle(STQ* pTq, const char* key, const STqHandle* pHandle) {
  int32_t rc;
  int32_t encLen;

  // First pass: compute the encoded size.
  tEncodeSize(tEncodeSTqHandle, pHandle, encLen, rc);
  ASSERT(rc == 0);

  void* pBuf = taosMemoryCalloc(1, encLen);
  if (pBuf == NULL) {
    ASSERT(0);
  }

  // Second pass: encode into the freshly allocated buffer.
  SEncoder enc;
  tEncoderInit(&enc, pBuf, encLen);
  if (tEncodeSTqHandle(&enc, pHandle) < 0) {
    ASSERT(0);
  }

  // Write the encoded handle inside its own write transaction.
  // NOTE(review): no tdbTxnClose is called here, unlike in tqOpen -- confirm
  // whether tdbCommit releases the transaction.
  TXN writeTxn;
  if (tdbTxnOpen(&writeTxn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) {
    ASSERT(0);
  }

  if (tdbBegin(pTq->pMetaStore, &writeTxn) < 0) {
    ASSERT(0);
  }

  if (tdbTbUpsert(pTq->pExecStore, key, (int)strlen(key), pBuf, encLen, &writeTxn) < 0) {
    ASSERT(0);
  }

  if (tdbCommit(pTq->pMetaStore, &writeTxn) < 0) {
    ASSERT(0);
  }

  tEncoderClear(&enc);
  taosMemoryFree(pBuf);
  return 0;
}

L
Liu Jicong 已提交
95
/**
 * Create a TQ instance for a vnode and restore all persisted subscription
 * handles from the "handles" table into the in-memory hash.
 *
 * For every stored handle a WAL reader and five submit-message scanners are
 * created; column subscriptions additionally get five stream exec tasks built
 * from the stored query message, other subscriptions get a filter-out table
 * uid hash.
 *
 * @param path   directory of the meta store (duplicated into the instance)
 * @param pVnode owning vnode
 * @param pWal   WAL of the vnode
 * @return the new instance, or NULL with terrno set on allocation failure
 */
STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal) {
  STQ* pTq = taosMemoryMalloc(sizeof(STQ));
  if (pTq == NULL) {
    terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
    return NULL;
  }

  pTq->path = strdup(path);
  if (pTq->path == NULL) {
    // strdup can fail; do not continue with a NULL path.
    taosMemoryFree(pTq);
    terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
    return NULL;
  }
  pTq->pVnode = pVnode;
  pTq->pWal = pWal;

  pTq->handles = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);

  pTq->pStreamTasks = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);

  pTq->pushMgr = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);

  if (tdbOpen(path, 16 * 1024, 1, &pTq->pMetaStore) < 0) {
    ASSERT(0);
  }

  if (tdbTbOpen("handles", -1, -1, tqExecKeyCompare, pTq->pMetaStore, &pTq->pExecStore) < 0) {
    ASSERT(0);
  }

  TXN txn;

  if (tdbTxnOpen(&txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, 0) < 0) {
    ASSERT(0);
  }

  // NOTE(review): the cursor is never closed in this function -- confirm
  // whether tdbTxnClose releases it or an explicit close is required.
  TBC* pCur;
  if (tdbTbcOpen(pTq->pExecStore, &pCur, &txn) < 0) {
    ASSERT(0);
  }

  // Initialized so the cursor never sees indeterminate pointers/lengths.
  // NOTE(review): presumably tdbTbcNext (re)allocates these buffers; confirm
  // who is responsible for freeing them after the loop.
  void* pKey = NULL;
  int   kLen = 0;
  void* pVal = NULL;
  int   vLen = 0;

  tdbTbcMoveToFirst(pCur);

  while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) {
    // Zero the handle first: decoding only fills a subset of its fields and
    // the whole struct is copied into the hash below.
    STqHandle handle = {0};
    SDecoder  decoder;

    tDecoderInit(&decoder, (uint8_t*)pVal, vLen);
    if (tDecodeSTqHandle(&decoder, &handle) < 0) {
      ASSERT(0);
    }
    tDecoderClear(&decoder);

    handle.pWalReader = walOpenReadHandle(pTq->pVnode->pWal);
    for (int32_t i = 0; i < 5; i++) {
      handle.execHandle.pExecReader[i] = tqInitSubmitMsgScanner(pTq->pVnode->pMeta);
    }
    if (handle.execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
      for (int32_t i = 0; i < 5; i++) {
        SReadHandle reader = {
            .reader = handle.execHandle.pExecReader[i],
            .meta = pTq->pVnode->pMeta,
            .pMsgCb = &pTq->pVnode->msgCb,
        };
        handle.execHandle.exec.execCol.task[i] =
            qCreateStreamExecTaskInfo(handle.execHandle.exec.execCol.qmsg, &reader);
        ASSERT(handle.execHandle.exec.execCol.task[i]);
      }
    } else {
      handle.execHandle.exec.execDb.pFilterOutTbUid =
          taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
    }
    // The hash stores a by-value copy of the handle.
    taosHashPut(pTq->handles, pKey, kLen, &handle, sizeof(STqHandle));
  }

  if (tdbTxnClose(&txn) < 0) {
    ASSERT(0);
  }

  return pTq;
}
L
Liu Jicong 已提交
170

L
Liu Jicong 已提交
171
/**
 * Release a TQ instance: its path string, the three in-memory hashes, the
 * meta store and finally the instance itself. A NULL instance is ignored.
 */
void tqClose(STQ* pTq) {
  if (pTq == NULL) {
    return;
  }

  taosMemoryFreeClear(pTq->path);
  taosHashCleanup(pTq->handles);
  taosHashCleanup(pTq->pStreamTasks);
  taosHashCleanup(pTq->pushMgr);
  tdbClose(pTq->pMetaStore);
  taosMemoryFree(pTq);
  // TODO
}
L
Liu Jicong 已提交
182

L
Liu Jicong 已提交
183
#if 0
// Disabled legacy serialization of STqExec; kept for reference only.
// Encoding/decoding of withTbName, withSchema and withTag is disabled.
int32_t tEncodeSTqExec(SEncoder* pEncoder, const STqExec* pExec) {
  if (tStartEncode(pEncoder) < 0 ||
      tEncodeCStr(pEncoder, pExec->subKey) < 0 ||
      tEncodeI64(pEncoder, pExec->consumerId) < 0 ||
      tEncodeI32(pEncoder, pExec->epoch) < 0 ||
      tEncodeI8(pEncoder, pExec->subType) < 0) {
    return -1;
  }
  if (pExec->subType == TOPIC_SUB_TYPE__COLUMN && tEncodeCStr(pEncoder, pExec->qmsg) < 0) {
    return -1;
  }
  tEndEncode(pEncoder);
  return pEncoder->pos;
}

int32_t tDecodeSTqExec(SDecoder* pDecoder, STqExec* pExec) {
  if (tStartDecode(pDecoder) < 0 ||
      tDecodeCStrTo(pDecoder, pExec->subKey) < 0 ||
      tDecodeI64(pDecoder, &pExec->consumerId) < 0 ||
      tDecodeI32(pDecoder, &pExec->epoch) < 0 ||
      tDecodeI8(pDecoder, &pExec->subType) < 0) {
    return -1;
  }
  if (pExec->subType == TOPIC_SUB_TYPE__COLUMN && tDecodeCStrAlloc(pDecoder, &pExec->qmsg) < 0) {
    return -1;
  }
  tEndDecode(pDecoder);
  return 0;
}
#endif

/**
 * Serialize a subscription handle: subKey, consumerId, epoch, subType and,
 * for column subscriptions only, the query message.
 *
 * @return the encoder position after encoding, or -1 if any field fails to encode
 */
int32_t tEncodeSTqHandle(SEncoder* pEncoder, const STqHandle* pHandle) {
  // Short-circuit evaluation keeps the fields in their on-disk order.
  if (tStartEncode(pEncoder) < 0 ||
      tEncodeCStr(pEncoder, pHandle->subKey) < 0 ||
      tEncodeI64(pEncoder, pHandle->consumerId) < 0 ||
      tEncodeI32(pEncoder, pHandle->epoch) < 0 ||
      tEncodeI8(pEncoder, pHandle->execHandle.subType) < 0) {
    return -1;
  }

  if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN &&
      tEncodeCStr(pEncoder, pHandle->execHandle.exec.execCol.qmsg) < 0) {
    return -1;
  }

  tEndEncode(pEncoder);
  return pEncoder->pos;
}

/**
 * Deserialize a subscription handle written by tEncodeSTqHandle: subKey,
 * consumerId, epoch, subType and, for column subscriptions only, the query
 * message. Other fields of the handle are left untouched.
 *
 * @return 0 on success, -1 if any field fails to decode
 */
int32_t tDecodeSTqHandle(SDecoder* pDecoder, STqHandle* pHandle) {
  // Short-circuit evaluation keeps the fields in their on-disk order.
  if (tStartDecode(pDecoder) < 0 ||
      tDecodeCStrTo(pDecoder, pHandle->subKey) < 0 ||
      tDecodeI64(pDecoder, &pHandle->consumerId) < 0 ||
      tDecodeI32(pDecoder, &pHandle->epoch) < 0 ||
      tDecodeI8(pDecoder, &pHandle->execHandle.subType) < 0) {
    return -1;
  }

  if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN &&
      tDecodeCStrAlloc(pDecoder, &pHandle->execHandle.exec.execCol.qmsg) < 0) {
    return -1;
  }

  tEndDecode(pDecoder);
  return 0;
}

243
/**
 * Propagate a table-uid list change to every subscription handle and every
 * stream task of this TQ.
 *
 * - Column subscriptions: all five exec tasks get the uid list added/removed.
 * - DB subscriptions: on removal, each uid is recorded in the handle's
 *   filter-out set; additions are ignored.
 * - Stream tasks fed by submit data: their executor gets the uid list update.
 *
 * @param tbUidList array of int64_t table uids
 * @param isAdd     true to add the uids, false to remove them
 * @return always 0 (failures trip ASSERT)
 */
int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
  for (void* pIter = taosHashIterate(pTq->handles, NULL); pIter != NULL;
       pIter = taosHashIterate(pTq->handles, pIter)) {
    STqHandle* pHandle = (STqHandle*)pIter;

    if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
      for (int32_t i = 0; i < 5; i++) {
        int32_t code = qUpdateQualifiedTableId(pHandle->execHandle.exec.execCol.task[i], tbUidList, isAdd);
        ASSERT(code == 0);
      }
    } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__DB && !isAdd) {
      int32_t nUids = taosArrayGetSize(tbUidList);
      for (int32_t i = 0; i < nUids; i++) {
        int64_t uid = *(int64_t*)taosArrayGet(tbUidList, i);
        // Only the key matters: the hash is used as a set of filtered-out uids.
        taosHashPut(pHandle->execHandle.exec.execDb.pFilterOutTbUid, &uid, sizeof(int64_t), NULL, 0);
      }
    }
    // other subscription types: tq update id (nothing to do yet)
  }

  for (void* pIter = taosHashIterate(pTq->pStreamTasks, NULL); pIter != NULL;
       pIter = taosHashIterate(pTq->pStreamTasks, pIter)) {
    SStreamTask* pTask = (SStreamTask*)pIter;
    if (pTask->inputType != STREAM_INPUT__DATA_SUBMIT) {
      continue;
    }
    int32_t code = qUpdateQualifiedTableId(pTask->exec.executor, tbUidList, isAdd);
    ASSERT(code == 0);
  }

  return 0;
}

dengyihao's avatar
dengyihao 已提交
278
/**
 * Push a freshly written submit message to every handle registered in the
 * push manager, answering each pending poll that produces at least one block.
 *
 * For each registered handle the write latch is taken, the submit message is
 * run through the handle's executor and, if any blocks result, a poll response
 * is encoded and sent. The latch is released and the per-handle response
 * arrays are destroyed on every path, including the "no blocks" and
 * allocation-failure paths.
 *
 * @param msg        submit request (SSubmitReq)
 * @param msgType    only TDMT_VND_SUBMIT is handled; anything else returns 0
 * @param ver        WAL version of the message, reported as the response offset
 * @param handleInfo RPC handle info used to send the response
 * @return 0 on success, -1 if the response buffer could not be allocated
 */
int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver, SRpcHandleInfo handleInfo) {
  if (msgType != TDMT_VND_SUBMIT) return 0;

  SSubmitReq* pReq = (SSubmitReq*)msg;
  int32_t     workerId = 4;
  int64_t     fetchOffset = ver;
  void*       pIter = NULL;

  while (1) {
    pIter = taosHashIterate(pTq->pushMgr, pIter);
    if (pIter == NULL) break;
    // The push manager stores pointers to handles, not handles themselves.
    STqHandle* pHandle = *(STqHandle**)pIter;

    taosWLockLatch(&pHandle->pushHandle.lock);

    SRpcMsg* pMsg = atomic_load_ptr(&pHandle->pushHandle.handle);
    ASSERT(pMsg);

    SMqDataBlkRsp rsp = {0};
    rsp.reqOffset = pHandle->pushHandle.reqOffset;
    rsp.blockData = taosArrayInit(0, sizeof(void*));
    rsp.blockDataLen = taosArrayInit(0, sizeof(int32_t));

    // msgType is known to be TDMT_VND_SUBMIT here (checked on entry).
    tqDataExec(pTq, &pHandle->execHandle, pReq, &rsp, workerId);

    if (rsp.blockNum == 0) {
      // Nothing to send for this handle; do not leak the per-handle arrays.
      taosArrayDestroy(rsp.blockData);
      taosArrayDestroy(rsp.blockDataLen);
      taosWUnLockLatch(&pHandle->pushHandle.lock);
      continue;
    }

    ASSERT(taosArrayGetSize(rsp.blockData) == rsp.blockNum);
    ASSERT(taosArrayGetSize(rsp.blockDataLen) == rsp.blockNum);

    rsp.rspOffset = fetchOffset;

    int32_t tlen = sizeof(SMqRspHead) + tEncodeSMqDataBlkRsp(NULL, &rsp);
    void*   buf = rpcMallocCont(tlen);
    if (buf == NULL) {
      pMsg->code = -1;
      // Release everything acquired in this iteration before bailing out:
      // the arrays, the latch and the in-progress hash iteration.
      taosArrayDestroy(rsp.blockData);
      taosArrayDestroy(rsp.blockDataLen);
      taosWUnLockLatch(&pHandle->pushHandle.lock);
      taosHashCancelIterate(pTq->pushMgr, pIter);
      return -1;
    }

    ((SMqRspHead*)buf)->mqMsgType = TMQ_MSG_TYPE__POLL_RSP;
    ((SMqRspHead*)buf)->epoch = pHandle->pushHandle.epoch;
    ((SMqRspHead*)buf)->consumerId = pHandle->pushHandle.consumerId;

    void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
    tEncodeSMqDataBlkRsp(&abuf, &rsp);

    SRpcMsg resp = {.info = handleInfo, .pCont = buf, .contLen = tlen, .code = 0};
    tmsgSendRsp(&resp);

    // Log while the latch is still held so the pushHandle fields are stable.
    tqDebug("vg %d offset %ld from consumer %ld (epoch %d) send rsp, block num: %d, reqOffset: %ld, rspOffset: %ld",
            TD_VID(pTq->pVnode), fetchOffset, pHandle->pushHandle.consumerId, pHandle->pushHandle.epoch, rsp.blockNum,
            rsp.reqOffset, rsp.rspOffset);

    atomic_store_ptr(&pHandle->pushHandle.handle, NULL);
    taosWUnLockLatch(&pHandle->pushHandle.lock);

    // TODO destroy
    taosArrayDestroy(rsp.blockData);
    taosArrayDestroy(rsp.blockDataLen);
  }

  return 0;
}

L
Liu Jicong 已提交
350
/**
 * Feed a written message into the stream tasks of this TQ.
 *
 * Only submit messages are handled, and only when at least one stream task is
 * deployed. The message is copied before being handed to the stream trigger.
 *
 * @return 0 on success or when there is nothing to do, -1 if the copy could
 *         not be allocated
 */
int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) {
  if (msgType != TDMT_VND_SUBMIT) {
    return 0;
  }
  if (taosHashGetSize(pTq->pStreamTasks) == 0) {
    return 0;
  }

  if (tdUpdateExpireWindow(pTq->pVnode->pSma, msg, ver) != 0) {
    // TODO handle sma error
  }

  // The trigger receives its own copy of the message bytes.
  void* pCopy = taosMemoryMalloc(msgLen);
  if (pCopy == NULL) {
    return -1;
  }
  memcpy(pCopy, msg, msgLen);

  tqProcessStreamTrigger(pTq, pCopy);
  return 0;
}

L
Liu Jicong 已提交
369 370
/**
 * Commit hook for the TQ. Currently a no-op.
 *
 * @return always 0
 */
int tqCommit(STQ* pTq) {
  (void)pTq;  // nothing to commit yet
  return 0;
}
L
Liu Jicong 已提交
373

L
fix  
Liu Jicong 已提交
374
/**
 * Handle a consumer poll request: scan the WAL from the requested offset,
 * run submit messages through the subscription's executor and send back the
 * first non-empty batch of data blocks (or an empty response if none is found).
 *
 * The handle's epoch is raised to the request epoch if it is older; if a newer
 * epoch is observed while scanning, the scan stops and whatever has been
 * collected so far is returned.
 *
 * The response arrays are destroyed on every path after their creation,
 * including the path where the response buffer cannot be allocated.
 *
 * @param pMsg     RPC message whose content is an SMqPollReq
 * @param workerId index of the executor/reader to use for this request
 * @return 0 on success, -1 on allocation failure
 */
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
  SMqPollReq* pReq = pMsg->pCont;
  int64_t     consumerId = pReq->consumerId;
  int32_t     reqEpoch = pReq->epoch;
  int64_t     fetchOffset;
  int32_t     code = 0;

  // get offset to fetch message
  if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__EARLIEAST) {
    fetchOffset = walGetFirstVer(pTq->pWal);
  } else if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__LATEST) {
    fetchOffset = walGetCommittedVer(pTq->pWal);
  } else {
    fetchOffset = pReq->currentOffset + 1;
  }

  tqDebug("tmq poll: consumer %ld (epoch %d) recv poll req in vg %d, req %ld %ld", consumerId, pReq->epoch,
          TD_VID(pTq->pVnode), pReq->currentOffset, fetchOffset);

  STqHandle* pHandle = taosHashGet(pTq->handles, pReq->subKey, strlen(pReq->subKey));
  ASSERT(pHandle);

  // Raise the handle's epoch to the request epoch; the CAS returns the value
  // it observed, so the loop ends once the stored epoch is >= reqEpoch.
  int32_t consumerEpoch = atomic_load_32(&pHandle->epoch);
  while (consumerEpoch < reqEpoch) {
    consumerEpoch = atomic_val_compare_exchange_32(&pHandle->epoch, consumerEpoch, reqEpoch);
  }

  SWalHead* pHeadWithCkSum = taosMemoryMalloc(sizeof(SWalHead) + 2048);
  if (pHeadWithCkSum == NULL) {
    return -1;
  }

  walSetReaderCapacity(pHandle->pWalReader, 2048);

  SMqDataBlkRsp rsp = {0};
  rsp.reqOffset = pReq->currentOffset;

  rsp.blockData = taosArrayInit(0, sizeof(void*));
  rsp.blockDataLen = taosArrayInit(0, sizeof(int32_t));
  rsp.blockSchema = taosArrayInit(0, sizeof(void*));
  rsp.blockTbName = taosArrayInit(0, sizeof(void*));

  rsp.withTbName = pReq->withTbName;
  // Column subscriptions carry no schema; all other types do. Tags are never sent.
  rsp.withSchema = (pHandle->execHandle.subType != TOPIC_SUB_TYPE__COLUMN);
  rsp.withTag = false;

  while (1) {
    consumerEpoch = atomic_load_32(&pHandle->epoch);
    if (consumerEpoch > reqEpoch) {
      tqDebug("tmq poll: consumer %ld (epoch %d) vg %d offset %ld, found new consumer epoch %d discard req epoch %d",
              consumerId, pReq->epoch, TD_VID(pTq->pVnode), fetchOffset, consumerEpoch, reqEpoch);
      break;
    }

    if (tqFetchLog(pTq, pHandle, &fetchOffset, &pHeadWithCkSum) < 0) {
      // TODO add push mgr: no more log, set a timer to wait for the blocking
      // time; if data is inserted while waiting, launch the query and respond
      // to the user.
      break;
    }

    SWalReadHead* pHead = &pHeadWithCkSum->head;

    tqDebug("tmq poll: consumer %ld (epoch %d) iter log, vg %d offset %ld msgType %d", consumerId, pReq->epoch,
            TD_VID(pTq->pVnode), fetchOffset, pHead->msgType);

    if (pHead->msgType == TDMT_VND_SUBMIT) {
      SSubmitReq* pCont = (SSubmitReq*)&pHead->body;

      tqDataExec(pTq, &pHandle->execHandle, pCont, &rsp, workerId);
    } else {
      // TODO
      ASSERT(0);
    }

    // TODO batch optimization:
    // TODO continue scan until meeting batch requirement
    if (rsp.blockNum > 0 /* threshold */) {
      break;
    } else {
      fetchOffset++;
    }
  }

  taosMemoryFree(pHeadWithCkSum);

  ASSERT(taosArrayGetSize(rsp.blockData) == rsp.blockNum);
  ASSERT(taosArrayGetSize(rsp.blockDataLen) == rsp.blockNum);

  rsp.rspOffset = fetchOffset;

  int32_t tlen = sizeof(SMqRspHead) + tEncodeSMqDataBlkRsp(NULL, &rsp);
  void*   buf = rpcMallocCont(tlen);
  if (buf == NULL) {
    pMsg->code = -1;
    code = -1;
  } else {
    ((SMqRspHead*)buf)->mqMsgType = TMQ_MSG_TYPE__POLL_RSP;
    ((SMqRspHead*)buf)->epoch = pReq->epoch;
    ((SMqRspHead*)buf)->consumerId = consumerId;

    void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
    tEncodeSMqDataBlkRsp(&abuf, &rsp);

    SRpcMsg resp = {
        .info = pMsg->info,
        .pCont = buf,
        .contLen = tlen,
        .code = 0,
    };
    tmsgSendRsp(&resp);

    tqDebug("vg %d offset %ld from consumer %ld (epoch %d) send rsp, block num: %d, reqOffset: %ld, rspOffset: %ld",
            TD_VID(pTq->pVnode), fetchOffset, consumerId, pReq->epoch, rsp.blockNum, rsp.reqOffset, rsp.rspOffset);
  }

  // TODO wrap in destroy func
  // Runs on both the success and the allocation-failure path.
  taosArrayDestroy(rsp.blockData);
  taosArrayDestroy(rsp.blockDataLen);
  taosArrayDestroyP(rsp.blockSchema, (FDelete)tDeleteSSchemaWrapper);
  taosArrayDestroyP(rsp.blockTbName, (FDelete)taosMemoryFree);

  return code;
}
L
Liu Jicong 已提交
541

L
Liu Jicong 已提交
542 543
/**
 * Remove a subscription from this vgroup: drop its in-memory handle and
 * delete its persisted record from the "handles" table.
 *
 * @param msg SMqVDeleteReq carrying the subscription key
 * @return always 0 (most failures trip ASSERT; a failed table delete is ignored)
 */
int32_t tqProcessVgDeleteReq(STQ* pTq, char* msg, int32_t msgLen) {
  SMqVDeleteReq* pReq = (SMqVDeleteReq*)msg;
  const char*    subKey = pReq->subKey;

  int32_t code = taosHashRemove(pTq->handles, subKey, strlen(subKey));
  ASSERT(code == 0);

  TXN delTxn;

  if (tdbTxnOpen(&delTxn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) {
    ASSERT(0);
  }

  if (tdbBegin(pTq->pMetaStore, &delTxn) < 0) {
    ASSERT(0);
  }

  // A failed delete is tolerated here (the assertion is disabled).
  if (tdbTbDelete(pTq->pExecStore, subKey, (int)strlen(subKey), &delTxn) < 0) {
    /*ASSERT(0);*/
  }

  if (tdbCommit(pTq->pMetaStore, &delTxn) < 0) {
    ASSERT(0);
  }

  return 0;
}

L
Liu Jicong 已提交
569 570
// TODO: persist meta into tdb
/**
 * Handle a rebalance request for one subscription in this vgroup.
 *
 * If no handle exists for the subscription key, a new one is built (WAL
 * reader, five submit-message scanners and, depending on the subscription
 * type, five stream exec tasks or a filter-out uid hash) and inserted into
 * the handle hash. Otherwise the existing handle is re-assigned to the new
 * consumer. In both cases the handle is then persisted via tqStoreHandle.
 *
 * @param msg encoded SMqRebVgReq
 * @return always 0
 */
int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
  SMqRebVgReq req = {0};
  // Function scope on purpose: pHandle may point at this object and is still
  // used by tqStoreHandle at the end of the function.
  STqHandle tqHandle = {0};

  tDecodeSMqRebVgReq(msg, &req);
  // todo lock
  STqHandle* pHandle = taosHashGet(pTq->handles, req.subKey, strlen(req.subKey));
  if (pHandle == NULL) {
    ASSERT(req.oldConsumerId == -1);
    ASSERT(req.newConsumerId != -1);
    pHandle = &tqHandle;

    memcpy(pHandle->subKey, req.subKey, TSDB_SUBSCRIBE_KEY_LEN);
    pHandle->consumerId = req.newConsumerId;
    pHandle->epoch = -1;

    pHandle->execHandle.subType = req.subType;

    // The handle takes over the decoded query message.
    pHandle->execHandle.exec.execCol.qmsg = req.qmsg;
    req.qmsg = NULL;

    pHandle->pWalReader = walOpenReadHandle(pTq->pVnode->pWal);
    for (int32_t i = 0; i < 5; i++) {
      pHandle->execHandle.pExecReader[i] = tqInitSubmitMsgScanner(pTq->pVnode->pMeta);
    }
    if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
      for (int32_t i = 0; i < 5; i++) {
        SReadHandle handle = {
            .reader = pHandle->execHandle.pExecReader[i],
            .meta = pTq->pVnode->pMeta,
            .pMsgCb = &pTq->pVnode->msgCb,
        };
        pHandle->execHandle.exec.execCol.task[i] =
            qCreateStreamExecTaskInfo(pHandle->execHandle.exec.execCol.qmsg, &handle);
        ASSERT(pHandle->execHandle.exec.execCol.task[i]);
      }
    } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__DB) {
      pHandle->execHandle.exec.execDb.pFilterOutTbUid =
          taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
    } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
      // nothing to set up for table subscriptions yet
    }
    // The hash stores a by-value copy of the handle.
    taosHashPut(pTq->handles, req.subKey, strlen(req.subKey), pHandle, sizeof(STqHandle));
  } else {
    /*ASSERT(pExec->consumerId == req.oldConsumerId);*/
    // TODO handle qmsg and exec modification
    // NOTE(review): req.qmsg is not released on this path -- confirm whether
    // tDecodeSMqRebVgReq allocates it.
    atomic_store_32(&pHandle->epoch, -1);
    atomic_store_64(&pHandle->consumerId, req.newConsumerId);
    atomic_add_fetch_32(&pHandle->epoch, 1);
  }

  if (tqStoreHandle(pTq, req.subKey, pHandle) < 0) {
    // TODO
  }
  return 0;
}
628

L
Liu Jicong 已提交
629 630 631 632 633
/**
 * Table sink for a stream task: convert the result blocks into a submit
 * request and enqueue it on the vnode's write queue.
 *
 * @param pTask stream task whose tbSink describes the target super table
 * @param vnode the SVnode to write into
 * @param ver   unused here
 * @param data  const SArray of result blocks
 */
void tqTableSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data) {
  const SArray* pRes = (const SArray*)data;
  SVnode*       pVnode = (SVnode*)vnode;

  ASSERT(pTask->tbSink.pTSchema);
  SSubmitReq* pReq = tdBlockToSubmit(pRes, pTask->tbSink.pTSchema, true, pTask->tbSink.stbUid,
                                     pTask->tbSink.stbFullName, pVnode->config.vgId);
  /*tPrintFixedSchemaSubmitReq(pReq, pTask->tbSink.pTSchema);*/

  // build write msg
  SRpcMsg msg = {
      .msgType = TDMT_VND_SUBMIT,
      .pCont = pReq,
      .contLen = ntohl(pReq->length),
  };

  // Enqueue outside the assertion so the write still happens if ASSERT is
  // compiled out.
  int32_t code = tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg);
  ASSERT(code == 0);
  (void)code;
}
L
Liu Jicong 已提交
646

L
Liu Jicong 已提交
647 648 649 650 651 652 653 654 655 656 657 658
/**
 * Deploy a stream task on this vnode: decode it, create its queues, build its
 * executor (if it has one), wire up its sink and register it in pStreamTasks.
 *
 * The task hash stores a by-value copy of the task, so the temporary heap
 * object used for decoding is released once the copy has been inserted.
 *
 * @param msg    encoded SStreamTask
 * @param msgLen length of msg in bytes
 * @return 0 on success, -1 on allocation or registration failure
 */
int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen) {
  SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
  if (pTask == NULL) {
    return -1;
  }
  SDecoder decoder;
  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
  if (tDecodeSStreamTask(&decoder, pTask) < 0) {
    ASSERT(0);
  }
  tDecoderClear(&decoder);

  pTask->status = TASK_STATUS__IDLE;
  pTask->inputStatus = TASK_INPUT_STATUS__NORMAL;
  pTask->outputStatus = TASK_OUTPUT_STATUS__NORMAL;

  pTask->inputQ = taosOpenQueue();
  pTask->outputQ = taosOpenQueue();
  pTask->inputQAll = taosAllocateQall();
  pTask->outputQAll = taosAllocateQall();

  if (pTask->inputQ == NULL || pTask->outputQ == NULL || pTask->inputQAll == NULL || pTask->outputQAll == NULL)
    goto FAIL;

  // exec
  if (pTask->execType != TASK_EXEC__NONE) {
    // expand runners
    STqReadHandle* pStreamReader = tqInitSubmitMsgScanner(pTq->pVnode->pMeta);
    SReadHandle    handle = {
           .reader = pStreamReader,
           .meta = pTq->pVnode->pMeta,
           .pMsgCb = &pTq->pVnode->msgCb,
           .vnode = pTq->pVnode,
    };
    pTask->exec.inputHandle = pStreamReader;
    pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle);
    ASSERT(pTask->exec.executor);
  }

  // sink
  pTask->ahandle = pTq->pVnode;
  if (pTask->sinkType == TASK_SINK__SMA) {
    pTask->smaSink.smaSink = smaHandleRes;
  } else if (pTask->sinkType == TASK_SINK__TABLE) {
    pTask->tbSink.vnode = pTq->pVnode;
    pTask->tbSink.tbSinkFunc = tqTableSink;

    ASSERT(pTask->tbSink.pSchemaWrapper);
    ASSERT(pTask->tbSink.pSchemaWrapper->pSchema);

    pTask->tbSink.pTSchema =
        tdGetSTSChemaFromSSChema(&pTask->tbSink.pSchemaWrapper->pSchema, pTask->tbSink.pSchemaWrapper->nCols);
    ASSERT(pTask->tbSink.pTSchema);
  }

  if (taosHashPut(pTq->pStreamTasks, &pTask->taskId, sizeof(int32_t), pTask, sizeof(SStreamTask)) < 0) {
    // NOTE(review): executor/schema created above are not released on this
    // path; only the queues and the task object are.
    goto FAIL;
  }

  // The hash now owns a shallow copy (including the queue/executor pointers);
  // only the temporary container is released here.
  taosMemoryFree(pTask);
  return 0;

FAIL:
  if (pTask->inputQ) taosCloseQueue(pTask->inputQ);
  if (pTask->outputQ) taosCloseQueue(pTask->outputQ);
  if (pTask->inputQAll) taosFreeQall(pTask->inputQAll);
  if (pTask->outputQAll) taosFreeQall(pTask->outputQAll);
  taosMemoryFree(pTask);
  return -1;
}
L
Liu Jicong 已提交
713

L
Liu Jicong 已提交
714
/**
 * Fan a submit request out to every stream task that consumes submit data.
 *
 * The request is wrapped in a ref-counted SStreamDataSubmit. Each task with a
 * normal input status gets its own queue item (a copy of the wrapper, holding
 * one reference) and, if the task is idle or closing, a TDMT_VND_TASK_RUN
 * message is queued to wake it up. If the wrapper cannot be created, or a
 * task's queue item cannot be allocated, that task's input status is set to
 * failed.
 *
 * @param pReq submit request to distribute
 * @return 0 on success, -1 if the submit wrapper could not be created
 */
int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* pReq) {
  void*              pIter = NULL;
  bool               failed = false;
  SStreamDataSubmit* pSubmit = NULL;

  pSubmit = streamDataSubmitNew(pReq);
  if (pSubmit == NULL) {
    failed = true;
  }

  while (1) {
    pIter = taosHashIterate(pTq->pStreamTasks, pIter);
    if (pIter == NULL) break;
    SStreamTask* pTask = (SStreamTask*)pIter;
    if (pTask->inputType != STREAM_INPUT__DATA_SUBMIT) continue;

    int8_t inputStatus = atomic_load_8(&pTask->inputStatus);
    if (inputStatus == TASK_INPUT_STATUS__NORMAL) {
      if (failed) {
        atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__FAILED);
        continue;
      }

      // Allocate the queue item first so a failure neither dereferences NULL
      // nor leaves a dangling reference on pSubmit.
      SStreamDataSubmit* pSubmitClone = taosAllocateQitem(sizeof(SStreamDataSubmit), DEF_QITEM);
      if (pSubmitClone == NULL) {
        atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__FAILED);
        continue;
      }

      streamDataSubmitRefInc(pSubmit);
      memcpy(pSubmitClone, pSubmit, sizeof(SStreamDataSubmit));
      taosWriteQitem(pTask->inputQ, pSubmitClone);

      int8_t execStatus = atomic_load_8(&pTask->status);
      if (execStatus == TASK_STATUS__IDLE || execStatus == TASK_STATUS__CLOSING) {
        SStreamTaskRunReq* pRunReq = taosMemoryMalloc(sizeof(SStreamTaskRunReq));
        if (pRunReq == NULL) continue;
        // TODO: do we need htonl?
        pRunReq->head.vgId = pTq->pVnode->config.vgId;
        pRunReq->streamId = pTask->streamId;
        pRunReq->taskId = pTask->taskId;
        SRpcMsg msg = {
            .msgType = TDMT_VND_TASK_RUN,
            .pCont = pRunReq,
            .contLen = sizeof(SStreamTaskRunReq),
        };
        tmsgPutToQueue(&pTq->pVnode->msgCb, FETCH_QUEUE, &msg);
      }

    } else {
      // blocked or stopped, do nothing
    }
  }

  if (pSubmit) {
    // Drop the creator's reference and release the original wrapper item.
    streamDataSubmitRefDec(pSubmit);
    taosFreeQitem(pSubmit);
  }

  return failed ? -1 : 0;
}

L
Liu Jicong 已提交
771
/**
 * Handle a task-run request: look up the stream task by id and run it.
 *
 * @param pMsg RPC message whose content is an SStreamTaskRunReq
 * @return 0 on success, -1 if no task with the requested id is deployed
 */
int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
  SStreamTaskRunReq* pReq = pMsg->pCont;
  int32_t            taskId = pReq->taskId;
  SStreamTask*       pTask = taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
  if (pTask == NULL) {
    // Unknown task id: do not hand a NULL task to the stream layer.
    return -1;
  }
  streamTaskProcessRunReq(pTask, &pTq->pVnode->msgCb);
  return 0;
}

/**
 * Handle a stream dispatch request: look up the target stream task by id and
 * forward the request to it.
 *
 * @param pMsg RPC message whose content is an SStreamDispatchReq
 * @return 0 on success, -1 if no task with the requested id is deployed
 */
int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg) {
  SStreamDispatchReq* pReq = pMsg->pCont;
  int32_t             taskId = pReq->taskId;
  SStreamTask*        pTask = taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
  if (pTask == NULL) {
    // Unknown task id: do not hand a NULL task to the stream layer.
    return -1;
  }
  streamProcessDispatchReq(pTask, &pTq->pVnode->msgCb, pReq, pMsg);
  return 0;
}

/**
 * Handle a stream recover request: look up the target stream task by id and
 * forward the request to it.
 *
 * @param pMsg RPC message whose content is an SStreamTaskRecoverReq
 * @return 0 on success, -1 if no task with the requested id is deployed
 */
int32_t tqProcessTaskRecoverReq(STQ* pTq, SRpcMsg* pMsg) {
  SStreamTaskRecoverReq* pReq = pMsg->pCont;
  int32_t                taskId = pReq->taskId;
  SStreamTask*           pTask = taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
  if (pTask == NULL) {
    // Unknown task id: do not hand a NULL task to the stream layer.
    return -1;
  }
  streamProcessRecoverReq(pTask, &pTq->pVnode->msgCb, pReq, pMsg);
  return 0;
}

/**
 * Handle a stream dispatch response: look up the stream task by id and
 * forward the response to it.
 *
 * @param pMsg RPC message whose content is an SStreamDispatchRsp
 * @return 0 on success, -1 if no task with the given id is deployed
 */
int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) {
  SStreamDispatchRsp* pRsp = pMsg->pCont;
  int32_t             taskId = pRsp->taskId;
  SStreamTask*        pTask = taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
  if (pTask == NULL) {
    // Unknown task id: do not hand a NULL task to the stream layer.
    return -1;
  }
  streamProcessDispatchRsp(pTask, &pTq->pVnode->msgCb, pRsp);
  return 0;
}

/**
 * Handle a stream recover response: look up the stream task by id and
 * forward the response to it.
 *
 * @param pMsg RPC message whose content is an SStreamTaskRecoverRsp
 * @return 0 on success, -1 if no task with the given id is deployed
 */
int32_t tqProcessTaskRecoverRsp(STQ* pTq, SRpcMsg* pMsg) {
  SStreamTaskRecoverRsp* pRsp = pMsg->pCont;
  int32_t                taskId = pRsp->taskId;
  SStreamTask*           pTask = taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
  if (pTask == NULL) {
    // Unknown task id: do not hand a NULL task to the stream layer.
    return -1;
  }
  streamProcessRecoverRsp(pTask, pRsp);
  return 0;
}