tq.c 19.8 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

L
Liu Jicong 已提交
16
#include "tcompare.h"
L
Liu Jicong 已提交
17
#include "tdatablock.h"
L
Liu Jicong 已提交
18
#include "tqInt.h"
L
Liu Jicong 已提交
19
#include "tqMetaStore.h"
L
Liu Jicong 已提交
20
#include "tstream.h"
S
Shengliang Guan 已提交
21

L
Liu Jicong 已提交
22
/* Module-level start-up for the TQ subsystem: returns whatever tqPushMgrInit() returns. */
int32_t tqInit() {
  int32_t code = tqPushMgrInit();
  return code;
}
L
Liu Jicong 已提交
23

L
Liu Jicong 已提交
24
/* Module-level shutdown for the TQ subsystem: delegates to tqPushMgrCleanUp(). */
void tqCleanUp() {
  tqPushMgrCleanUp();
}
L
Liu Jicong 已提交
25

L
Liu Jicong 已提交
26 27
/*
 * Allocate an STQ handle and open its consumer meta store under `path`.
 *
 * The handle keeps its own heap copy of `path`. pVnode, pWal, pVnodeMeta and tqConfig are
 * stored as given (not copied). allocFac is only referenced by code that is currently
 * compiled out with `#if 0`.
 *
 * Returns the new handle, or NULL on failure. terrno is set to TSDB_CODE_TQ_OUT_OF_MEMORY
 * when an allocation made directly by this function fails. Every failure path releases
 * both the handle and its path copy.
 */
STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal, SMeta* pVnodeMeta, STqCfg* tqConfig,
            SMemAllocatorFactory* allocFac) {
  STQ* pTq = taosMemoryMalloc(sizeof(STQ));
  if (pTq == NULL) {
    terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
    return NULL;
  }

  pTq->path = strdup(path);
  if (pTq->path == NULL) {
    // strdup can fail just like any other allocation; do not hand out a handle without a path
    terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
    taosMemoryFree(pTq);
    return NULL;
  }
  pTq->tqConfig = tqConfig;
  pTq->pVnode = pVnode;
  pTq->pWal = pWal;
  pTq->pVnodeMeta = pVnodeMeta;
#if 0
  pTq->tqMemRef.pAllocatorFactory = allocFac;
  pTq->tqMemRef.pAllocator = allocFac->create(allocFac);
  if (pTq->tqMemRef.pAllocator == NULL) {
    // TODO: error code of buffer pool
  }
#endif
  pTq->tqMeta = tqStoreOpen(pTq, path, (FTqSerialize)tqSerializeConsumer, (FTqDeserialize)tqDeserializeConsumer,
                            (FTqDelete)taosMemoryFree, 0);
  if (pTq->tqMeta == NULL) {
#if 0
    allocFac->destroy(allocFac, pTq->tqMemRef.pAllocator);
#endif
    // the path copy is owned by the handle and must go with it
    taosMemoryFree(pTq->path);
    taosMemoryFree(pTq);
    return NULL;
  }

#if 0
  pTq->tqPushMgr = tqPushMgrOpen();
  if (pTq->tqPushMgr == NULL) {
    // free store
    taosMemoryFree(pTq->path);
    taosMemoryFree(pTq);
    return NULL;
  }
#endif

  pTq->pStreamTasks = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);

  return pTq;
}
L
Liu Jicong 已提交
68

L
Liu Jicong 已提交
69
/*
 * Release an STQ handle: frees the path copy and then the handle itself.
 * Passing NULL is a no-op. No other member of the handle is released here (see TODO).
 */
void tqClose(STQ* pTq) {
  if (pTq == NULL) {
    return;
  }

  taosMemoryFreeClear(pTq->path);
  taosMemoryFree(pTq);
  // TODO
}
L
Liu Jicong 已提交
76

L
Liu Jicong 已提交
77
/*
 * Forward a submit message to the fetch queue as a stream trigger.
 *
 * Only TDMT_VND_SUBMIT messages are handled; any other type returns 0 immediately.
 * The message body is copied into a fresh buffer which is posted to FETCH_QUEUE as a
 * TDMT_VND_STREAM_TRIGGER request. The result of tmsgPutToQueue() is not inspected.
 *
 * Returns 0 on success (or when the message is ignored), -1 if the copy cannot be allocated.
 * `version` is currently unused.
 */
int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t version) {
  if (msgType != TDMT_VND_SUBMIT) {
    return 0;
  }

  void* payload = taosMemoryMalloc(msgLen);
  if (payload == NULL) {
    return -1;
  }
  memcpy(payload, msg, msgLen);

  SRpcMsg trigger = {
      .msgType = TDMT_VND_STREAM_TRIGGER,
      .pCont = payload,
      .contLen = msgLen,
  };
  tmsgPutToQueue(&pTq->pVnode->msgCb, FETCH_QUEUE, &trigger);

#if 0
  void* pIter = taosHashIterate(pTq->tqPushMgr->pHash, NULL);
  while (pIter != NULL) {
    STqPusher* pusher = *(STqPusher**)pIter;
    if (pusher->type == TQ_PUSHER_TYPE__STREAM) {
      STqStreamPusher* streamPusher = (STqStreamPusher*)pusher;
      // repack
      STqStreamToken* token = taosMemoryMalloc(sizeof(STqStreamToken));
      if (token == NULL) {
        taosHashCancelIterate(pTq->tqPushMgr->pHash, pIter);
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        return -1;
      }
      token->type = TQ_STREAM_TOKEN__DATA;
      token->data = msg;
      // set input
      // exec
    }
    // send msg to ep
  }
  // iterate hash
  // process all msg
  // if waiting
  // memcpy and send msg to fetch thread
  // TODO: add reference
  // if handle waiting, launch query and response to consumer
  //
  // if no waiting handle, return
#endif
  return 0;
}

L
Liu Jicong 已提交
123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146
/* Persist the consumer meta store of `pTq`; returns the result of tqStorePersist(). */
int tqCommit(STQ* pTq) {
  int code = tqStorePersist(pTq->tqMeta);
  return code;
}

/*
 * Size estimate for one topic: the string lengths (without terminators) of topicName, sql,
 * logicalPlan, physicalPlan and qmsg, plus room for three int64_t values.
 * All five strings must be valid NUL-terminated strings.
 */
int32_t tqGetTopicHandleSize(const STqTopic* pTopic) {
  size_t bytes = strlen(pTopic->topicName);
  bytes += strlen(pTopic->sql);
  bytes += strlen(pTopic->logicalPlan);
  bytes += strlen(pTopic->physicalPlan);
  bytes += strlen(pTopic->qmsg);
  bytes += sizeof(int64_t) * 3;
  return bytes;
}

/* Sum of tqGetTopicHandleSize() over every topic held by the consumer. */
int32_t tqGetConsumerHandleSize(const STqConsumer* pConsumer) {
  int32_t total = 0;
  int     topicCount = taosArrayGetSize(pConsumer->topics);

  for (int idx = 0; idx < topicCount; ++idx) {
    STqTopic* pEntry = taosArrayGet(pConsumer->topics, idx);
    total += tqGetTopicHandleSize(pEntry);
  }

  return total;
}

/*
 * Encode one topic through the taosEncode* helpers and return the summed lengths they report.
 * Only topicName and qmsg are written, in that order. sql, logicalPlan, physicalPlan and the
 * persisted/committed/current offsets are deliberately not encoded at the moment.
 */
static FORCE_INLINE int32_t tEncodeSTqTopic(void** buf, const STqTopic* pTopic) {
  int32_t nameLen = taosEncodeString(buf, pTopic->topicName);
  int32_t qmsgLen = taosEncodeString(buf, pTopic->qmsg);
  return nameLen + qmsgLen;
}

/*
 * Decode one topic written by tEncodeSTqTopic(): topicName is decoded into the struct's own
 * buffer, qmsg through taosDecodeString(). No other field of *pTopic is touched (sql,
 * logicalPlan, physicalPlan and the offsets are not part of the encoding).
 * Returns the read cursor as left by the last decode call.
 */
static FORCE_INLINE const void* tDecodeSTqTopic(const void* buf, STqTopic* pTopic) {
  const void* cursor = taosDecodeStringTo(buf, pTopic->topicName);
  cursor = taosDecodeString(cursor, &pTopic->qmsg);
  return cursor;
}

/*
 * Encode a consumer and return the summed lengths reported by the encode helpers.
 * Layout: consumerId (fixed i64), epoch (fixed i32), cgroup (string), topic count (fixed i32),
 * then each topic via tEncodeSTqTopic(). tqSerializeConsumer() calls this with buf == NULL
 * to obtain the length before encoding for real.
 */
static FORCE_INLINE int32_t tEncodeSTqConsumer(void** buf, const STqConsumer* pConsumer) {
  int32_t written = taosEncodeFixedI64(buf, pConsumer->consumerId);
  written += taosEncodeFixedI32(buf, pConsumer->epoch);
  written += taosEncodeString(buf, pConsumer->cgroup);

  int32_t topicCount = taosArrayGetSize(pConsumer->topics);
  written += taosEncodeFixedI32(buf, topicCount);

  for (int32_t idx = 0; idx < topicCount; ++idx) {
    STqTopic* pEntry = taosArrayGet(pConsumer->topics, idx);
    written += tEncodeSTqTopic(buf, pEntry);
  }

  return written;
}

L
Liu Jicong 已提交
181 182 183 184
/*
 * Decode a consumer written by tEncodeSTqConsumer(): consumerId, epoch, cgroup, topic count,
 * then that many topics, which are appended to a freshly created pConsumer->topics array.
 *
 * Returns the read cursor after the last topic, or NULL if the topics array cannot be
 * allocated.
 */
static FORCE_INLINE const void* tDecodeSTqConsumer(const void* buf, STqConsumer* pConsumer) {
  int32_t sz;

  buf = taosDecodeFixedI64(buf, &pConsumer->consumerId);
  buf = taosDecodeFixedI32(buf, &pConsumer->epoch);
  buf = taosDecodeStringTo(buf, pConsumer->cgroup);
  buf = taosDecodeFixedI32(buf, &sz);

  pConsumer->topics = taosArrayInit(sz, sizeof(STqTopic));
  if (pConsumer->topics == NULL) return NULL;

  for (int32_t i = 0; i < sz; i++) {
    // Zero the element first: the decoder only fills topicName and qmsg, and the struct is
    // copied by value into the array, so every other field must not be left indeterminate.
    STqTopic topic = {0};
    buf = tDecodeSTqTopic(buf, &topic);
    taosArrayPush(pConsumer->topics, &topic);
  }
  return buf;
}

/*
 * Serialize a consumer into (*ppHead)->content, growing the buffer when needed.
 *
 * *ppHead must point to a valid STqSerializedHead on entry. If the encoded size exceeds
 * (*ppHead)->ssize the head is reallocated to sizeof(STqSerializedHead) + size and ssize is
 * updated, so *ppHead may change.
 *
 * Returns 0 on success. On reallocation failure returns -1 with terrno set to
 * TSDB_CODE_TQ_OUT_OF_MEMORY; the old buffer is freed and *ppHead is reset to NULL so the
 * caller is not left holding a dangling pointer.
 */
int tqSerializeConsumer(const STqConsumer* pConsumer, STqSerializedHead** ppHead) {
  // first pass with a NULL buffer only measures the encoded length
  int32_t sz = tEncodeSTqConsumer(NULL, pConsumer);

  if (sz > (*ppHead)->ssize) {
    void* tmpPtr = taosMemoryRealloc(*ppHead, sizeof(STqSerializedHead) + sz);
    if (tmpPtr == NULL) {
      taosMemoryFree(*ppHead);
      *ppHead = NULL;
      terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
      return -1;
    }
    *ppHead = tmpPtr;
    (*ppHead)->ssize = sz;
  }

  void* abuf = (*ppHead)->content;
  tEncodeSTqConsumer(&abuf, pConsumer);

  return 0;
}

L
Liu Jicong 已提交
219 220
/*
 * Rebuild a consumer from its serialized form and re-create the runtime state of each topic.
 *
 * A new STqConsumer is allocated and decoded from pHead->content. For every decoded topic a
 * WAL read handle is opened and, for each of the TQ_BUFFER_SIZE output slots, a submit-message
 * scanner and a stream exec task (built from the topic's qmsg) are created.
 *
 * Returns 0 on success with *ppConsumer set to the new consumer. On failure returns -1 with
 * terrno set to TSDB_CODE_TQ_OUT_OF_MEMORY and *ppConsumer set to NULL; nothing allocated for
 * the consumer itself is left behind.
 */
int32_t tqDeserializeConsumer(STQ* pTq, const STqSerializedHead* pHead, STqConsumer** ppConsumer) {
  const void* str = pHead->content;

  STqConsumer* pConsumer = taosMemoryCalloc(1, sizeof(STqConsumer));
  *ppConsumer = pConsumer;
  if (pConsumer == NULL) {
    terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
    return -1;
  }

  if (tDecodeSTqConsumer(str, pConsumer) == NULL) {
    // do not leak the half-built consumer, and do not hand it back to the caller
    if (pConsumer->topics != NULL) {
      taosArrayDestroy(pConsumer->topics);
    }
    taosMemoryFree(pConsumer);
    *ppConsumer = NULL;
    terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
    return -1;
  }

  int32_t sz = taosArrayGetSize(pConsumer->topics);
  for (int32_t i = 0; i < sz; i++) {
    STqTopic* pTopic = taosArrayGet(pConsumer->topics, i);
    pTopic->pReadhandle = walOpenReadHandle(pTq->pWal);
    if (pTopic->pReadhandle == NULL) {
      ASSERT(false);
    }
    for (int j = 0; j < TQ_BUFFER_SIZE; j++) {
      pTopic->buffer.output[j].status = 0;
      STqReadHandle* pReadHandle = tqInitSubmitMsgScanner(pTq->pVnodeMeta);
      SReadHandle    handle = {
             .reader = pReadHandle,
             .meta = pTq->pVnodeMeta,
      };
      pTopic->buffer.output[j].pReadHandle = pReadHandle;
      pTopic->buffer.output[j].task = qCreateStreamExecTaskInfo(pTopic->qmsg, &handle);
    }
  }

  return 0;
}

L
fix  
Liu Jicong 已提交
253
/*
 * Handle a consumer poll request.
 *
 * Looks up the consumer and the requested topic, then walks the WAL starting at the requested
 * offset. Non-submit entries and submit entries that yield no rows for the topic are skipped
 * (counted in rsp.skipLogNum). The first submit entry that produces data blocks is encoded
 * into a poll response and sent. If the log is exhausted, or the consumer's epoch has moved
 * past the request's epoch, an empty poll response is sent instead.
 *
 * workerId selects which of the topic's output slots (exec task + read handle) is used.
 *
 * Returns 0 once a response has been sent, including the error responses (code -1) for an
 * unknown consumer or topic. Returns -1 with pMsg->code = -1 when a response buffer or the
 * result array cannot be allocated; in that case no response is sent from here.
 */
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
  SMqPollReq* pReq = pMsg->pCont;
  int64_t     consumerId = pReq->consumerId;
  int64_t     fetchOffset;
  int32_t     reqEpoch = pReq->epoch;

  if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__EARLIEAST) {
    fetchOffset = 0;
  } else if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__LATEST) {
    fetchOffset = walGetLastVer(pTq->pWal);
  } else {
    fetchOffset = pReq->currentOffset + 1;
  }

  vDebug("tmq poll: consumer %ld (epoch %d) recv poll req in vg %d, req %ld %ld", consumerId, pReq->epoch, pTq->pVnode->vgId, pReq->currentOffset, fetchOffset);

  SMqPollRsp rsp = {
      /*.consumerId = consumerId,*/
      .numOfTopics = 0,
      .pBlockData = NULL,
  };

  STqConsumer* pConsumer = tqHandleGet(pTq->tqMeta, consumerId);
  if (pConsumer == NULL) {
    vWarn("tmq poll: consumer %ld (epoch %d) not found in vg %d", consumerId, pReq->epoch, pTq->pVnode->vgId);
    pMsg->pCont = NULL;
    pMsg->contLen = 0;
    pMsg->code = -1;
    tmsgSendRsp(pMsg);
    return 0;
  }

  // raise the stored epoch to the request's epoch if the request is newer
  int32_t consumerEpoch = atomic_load_32(&pConsumer->epoch);
  while (consumerEpoch < reqEpoch) {
    consumerEpoch = atomic_val_compare_exchange_32(&pConsumer->epoch, consumerEpoch, reqEpoch);
  }

  STqTopic* pTopic = NULL;
  int       sz = taosArrayGetSize(pConsumer->topics);
  for (int32_t i = 0; i < sz; i++) {
    STqTopic* topic = taosArrayGet(pConsumer->topics, i);
    // TODO race condition
    ASSERT(pConsumer->consumerId == consumerId);
    if (strcmp(topic->topicName, pReq->topic) == 0) {
      pTopic = topic;
      break;
    }
  }
  if (pTopic == NULL) {
    vWarn("tmq poll: consumer %ld (epoch %d) topic %s not found in vg %d", consumerId, pReq->epoch, pReq->topic, pTq->pVnode->vgId);
    pMsg->pCont = NULL;
    pMsg->contLen = 0;
    pMsg->code = -1;
    tmsgSendRsp(pMsg);
    return 0;
  }

  vDebug("poll topic %s from consumer %ld (epoch %d)", pTopic->topicName, consumerId, pReq->epoch);

  rsp.reqOffset = pReq->currentOffset;
  rsp.skipLogNum = 0;

  while (1) {
    /*if (fetchOffset > walGetLastVer(pTq->pWal) || walReadWithHandle(pTopic->pReadhandle, fetchOffset) < 0) {*/
    // TODO
    consumerEpoch = atomic_load_32(&pConsumer->epoch);
    if (consumerEpoch > pReq->epoch) {
      // TODO: return
      break;
    }

    SWalReadHead* pHead;
    if (walReadWithHandle_s(pTopic->pReadhandle, fetchOffset, &pHead) < 0) {
      // TODO: no more log, set timer to wait blocking time
      // if data inserted during waiting, launch query and
      // response to user
      vDebug("tmq poll: consumer %ld (epoch %d) vg %d offset %ld, no more log to return", consumerId, pReq->epoch, pTq->pVnode->vgId, fetchOffset);
      break;
    }
    vDebug("tmq poll: consumer %ld (epoch %d) iter log, vg %d offset %ld msgType %d", consumerId, pReq->epoch, pTq->pVnode->vgId, fetchOffset, pHead->msgType);

    /*int8_t pos = fetchOffset % TQ_BUFFER_SIZE;*/
    /*pHead = pTopic->pReadhandle->pHead;*/
    if (pHead->msgType == TDMT_VND_SUBMIT) {
      SSubmitReq* pCont = (SSubmitReq*)&pHead->body;
      qTaskInfo_t task = pTopic->buffer.output[workerId].task;
      ASSERT(task);
      qSetStreamInput(task, pCont, STREAM_DATA_TYPE_SUBMIT_BLOCK);

      SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock));
      if (pRes == NULL) {
        pMsg->code = -1;
        taosMemoryFree(pHead);
        return -1;
      }

      // drain the task: every produced block is copied (by value) into pRes
      while (1) {
        SSDataBlock* pDataBlock = NULL;
        uint64_t     ts;
        if (qExecTask(task, &pDataBlock, &ts) < 0) {
          ASSERT(false);
        }
        if (pDataBlock == NULL) {
          /*pos = fetchOffset % TQ_BUFFER_SIZE;*/
          break;
        }

        taosArrayPush(pRes, pDataBlock);
      }

      if (taosArrayGetSize(pRes) == 0) {
        vDebug("tmq poll: consumer %ld (epoch %d) iter log, vg %d skip log %ld since not wanted", consumerId, pReq->epoch, pTq->pVnode->vgId, fetchOffset);
        fetchOffset++;
        rsp.skipLogNum++;
        taosArrayDestroy(pRes);
        // this entry is done with; release it before reading the next one
        taosMemoryFree(pHead);
        continue;
      }

      rsp.schema = pTopic->buffer.output[workerId].pReadHandle->pSchemaWrapper;
      rsp.rspOffset = fetchOffset;

      rsp.numOfTopics = 1;
      rsp.pBlockData = pRes;

      int32_t tlen = sizeof(SMqRspHead) + tEncodeSMqPollRsp(NULL, &rsp);
      void*   buf = rpcMallocCont(tlen);
      if (buf == NULL) {
        pMsg->code = -1;
        taosArrayDestroy(pRes);
        taosMemoryFree(pHead);
        return -1;
      }
      ((SMqRspHead*)buf)->mqMsgType = TMQ_MSG_TYPE__POLL_RSP;
      ((SMqRspHead*)buf)->epoch = pReq->epoch;
      ((SMqRspHead*)buf)->consumerId = consumerId;

      void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
      tEncodeSMqPollRsp(&abuf, &rsp);
      /*taosArrayDestroyEx(rsp.pBlockData, (void (*)(void*))tDeleteSSDataBlock);*/
      // The response is fully encoded into buf; release only the array container. The block
      // contents are left alone (the deep destroy above is intentionally disabled).
      taosArrayDestroy(pRes);
      rsp.pBlockData = NULL;

      pMsg->pCont = buf;
      pMsg->contLen = tlen;
      pMsg->code = 0;
      vDebug("vg %d offset %ld msgType %d from consumer %ld (epoch %d) actual rsp", pTq->pVnode->vgId, fetchOffset, pHead->msgType, consumerId, pReq->epoch);
      tmsgSendRsp(pMsg);
      taosMemoryFree(pHead);
      return 0;
    } else {
      taosMemoryFree(pHead);
      fetchOffset++;
      rsp.skipLogNum++;
    }
  }

  /*if (blockingTime != 0) {*/
  /*tqAddClientPusher(pTq->tqPushMgr, pMsg, consumerId, blockingTime);*/
  /*} else {*/

  // finalize the response before measuring it, so the size pass and the encode pass agree
  rsp.rspOffset = fetchOffset - 1;

  int32_t tlen = sizeof(SMqRspHead) + tEncodeSMqPollRsp(NULL, &rsp);
  void*   buf = rpcMallocCont(tlen);
  if (buf == NULL) {
    pMsg->code = -1;
    return -1;
  }
  ((SMqRspHead*)buf)->mqMsgType = TMQ_MSG_TYPE__POLL_RSP;
  ((SMqRspHead*)buf)->epoch = pReq->epoch;
  // fill the header the same way as the data response does
  ((SMqRspHead*)buf)->consumerId = consumerId;

  void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
  tEncodeSMqPollRsp(&abuf, &rsp);
  rsp.pBlockData = NULL;
  pMsg->pCont = buf;
  pMsg->contLen = tlen;
  pMsg->code = 0;
  tmsgSendRsp(pMsg);
  vDebug("vg %d offset %ld from consumer %ld (epoch %d) not rsp", pTq->pVnode->vgId, fetchOffset, consumerId, pReq->epoch);
  /*}*/

  return 0;
}

L
Liu Jicong 已提交
422 423
/*
 * Handle a rebalance request that reassigns one topic from an old consumer to a new one.
 *
 * - Old consumer has exactly one topic and the new consumer is unknown: the existing handle is
 *   re-keyed under the new consumer id, committed, and the old id is purged.
 * - Otherwise the matching topic is appended (by value) to the new consumer's topic list; the
 *   new consumer is created, stored and committed first if it does not exist yet. The topic is
 *   not removed from the old consumer here.
 *
 * The old consumer must exist (asserted). Returns 0 on success, -1 with terrno set to
 * TSDB_CODE_TQ_OUT_OF_MEMORY if a new consumer cannot be allocated.
 */
int32_t tqProcessRebReq(STQ* pTq, char* msg) {
  SMqMVRebReq req = {0};
  terrno = TSDB_CODE_SUCCESS;
  tDecodeSMqMVRebReq(msg, &req);

  vDebug("vg %d set from consumer %ld to consumer %ld", req.vgId, req.oldConsumerId ,req.newConsumerId);
  STqConsumer* pConsumer = tqHandleGet(pTq->tqMeta, req.oldConsumerId);
  ASSERT(pConsumer);
  ASSERT(pConsumer->consumerId == req.oldConsumerId);
  int32_t numOfTopics = taosArrayGetSize(pConsumer->topics);
  if (numOfTopics == 1) {
    STqTopic* pTopic = taosArrayGet(pConsumer->topics, 0);
    ASSERT(strcmp(pTopic->topicName, req.topic) == 0);
    STqConsumer* pNewConsumer = tqHandleGet(pTq->tqMeta, req.newConsumerId);
    if (pNewConsumer == NULL) {
      // the whole handle moves: same object, new key
      pConsumer->consumerId = req.newConsumerId;
      tqHandleMovePut(pTq->tqMeta, req.newConsumerId, pConsumer);
      tqHandleCommit(pTq->tqMeta, req.newConsumerId);
      tqHandlePurge(pTq->tqMeta, req.oldConsumerId);
      return 0;
    } else {
      taosArrayPush(pNewConsumer->topics, pTopic);
    }
  } else {
    for (int32_t i = 0; i < numOfTopics; i++) {
      STqTopic* pTopic = taosArrayGet(pConsumer->topics, i);
      if (strcmp(pTopic->topicName, req.topic) == 0) {
        STqConsumer* pNewConsumer = tqHandleGet(pTq->tqMeta, req.newConsumerId);
        if (pNewConsumer == NULL) {
          pNewConsumer = taosMemoryCalloc(1, sizeof(STqConsumer));
          if (pNewConsumer == NULL) {
            terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
            return -1;
          }
          strcpy(pNewConsumer->cgroup, pConsumer->cgroup);
          pNewConsumer->topics = taosArrayInit(0, sizeof(STqTopic));
          if (pNewConsumer->topics == NULL) {
            taosMemoryFree(pNewConsumer);
            terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
            return -1;
          }
          pNewConsumer->consumerId = req.newConsumerId;
          pNewConsumer->epoch = 0;

          taosArrayPush(pNewConsumer->topics, pTopic);
          // Store the consumer that was just built. Storing the old consumer under the new id
          // would leak pNewConsumer and register the wrong handle.
          tqHandleMovePut(pTq->tqMeta, req.newConsumerId, pNewConsumer);
          tqHandleCommit(pTq->tqMeta, req.newConsumerId);
          return 0;
        }
        ASSERT(pNewConsumer->consumerId == req.newConsumerId);
        taosArrayPush(pNewConsumer->topics, pTopic);
        break;
      }
    }
    //
  }
  return 0;
}

L
Liu Jicong 已提交
476
/*
 * Handle a "set consumer on vgroup" request: attach the topic described by the request to the
 * consumer, creating the consumer first if it is not known yet.
 *
 * For the new topic a WAL read handle is opened and each of the TQ_BUFFER_SIZE output slots
 * gets its own submit-message scanner and stream exec task built from req.qmsg. The topic's
 * sql / logicalPlan / physicalPlan / qmsg pointers are taken over from the decoded request.
 * A newly created consumer is stored and committed in the meta store; for an existing consumer
 * the topic is only appended to its in-memory list.
 *
 * Returns 0 on success (terrno = TSDB_CODE_SUCCESS), -1 with terrno set to
 * TSDB_CODE_TQ_OUT_OF_MEMORY when an allocation fails.
 */
int32_t tqProcessSetConnReq(STQ* pTq, char* msg) {
  SMqSetCVgReq req = {0};
  tDecodeSMqSetCVgReq(msg, &req);
  bool create = false;

  vDebug("vg %d set to consumer %ld", req.vgId, req.consumerId);
  STqConsumer* pConsumer = tqHandleGet(pTq->tqMeta, req.consumerId);
  if (pConsumer == NULL) {
    pConsumer = taosMemoryCalloc(1, sizeof(STqConsumer));
    if (pConsumer == NULL) {
      terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
      return -1;
    }
    strcpy(pConsumer->cgroup, req.cgroup);
    pConsumer->topics = taosArrayInit(0, sizeof(STqTopic));
    if (pConsumer->topics == NULL) {
      taosMemoryFree(pConsumer);
      terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
      return -1;
    }
    pConsumer->consumerId = req.consumerId;
    pConsumer->epoch = 0;
    create = true;
  }

  STqTopic* pTopic = taosMemoryCalloc(1, sizeof(STqTopic));
  if (pTopic == NULL) {
    // Only tear down a consumer created in this call; an existing one still belongs to the
    // meta store and must stay intact.
    if (create) {
      taosArrayDestroy(pConsumer->topics);
      taosMemoryFree(pConsumer);
    }
    terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
    return -1;
  }
  strcpy(pTopic->topicName, req.topicName);
  pTopic->sql = req.sql;
  pTopic->logicalPlan = req.logicalPlan;
  pTopic->physicalPlan = req.physicalPlan;
  pTopic->qmsg = req.qmsg;
  /*pTopic->committedOffset = -1;*/
  /*pTopic->currentOffset = -1;*/

  pTopic->buffer.firstOffset = -1;
  pTopic->buffer.lastOffset = -1;
  pTopic->pReadhandle = walOpenReadHandle(pTq->pWal);
  if (pTopic->pReadhandle == NULL) {
    ASSERT(false);
  }
  for (int i = 0; i < TQ_BUFFER_SIZE; i++) {
    pTopic->buffer.output[i].status = 0;
    STqReadHandle* pReadHandle = tqInitSubmitMsgScanner(pTq->pVnodeMeta);
    SReadHandle    handle = {
           .reader = pReadHandle,
           .meta = pTq->pVnodeMeta,
    };
    pTopic->buffer.output[i].pReadHandle = pReadHandle;
    pTopic->buffer.output[i].task = qCreateStreamExecTaskInfo(req.qmsg, &handle);
    ASSERT(pTopic->buffer.output[i].task);
  }
  vDebug("set topic %s to consumer %ld on vg %d", pTopic->topicName, req.consumerId, pTq->pVnode->vgId);
  taosArrayPush(pConsumer->topics, pTopic);
  // the array stores its own by-value copy of the element, so the staging struct can go
  taosMemoryFree(pTopic);

  if (create) {
    tqHandleMovePut(pTq->tqMeta, req.consumerId, pConsumer);
    tqHandleCommit(pTq->tqMeta, req.consumerId);
  }
  terrno = TSDB_CODE_SUCCESS;
  return 0;
}
L
Liu Jicong 已提交
536

L
Liu Jicong 已提交
537 538 539 540 541
/*
 * Placeholder handler for a cancel-connection request: ignores both arguments, resets terrno
 * to TSDB_CODE_SUCCESS and reports success.
 */
int32_t tqProcessCancelConnReq(STQ* pTq, char* msg) {
  (void)pTq;
  (void)msg;

  terrno = TSDB_CODE_SUCCESS;

  return 0;
}

L
Liu Jicong 已提交
542
/*
 * Give a stream task `parallel` runners, each with its own submit-message scanner and an
 * executor built from the task's qmsg.
 *
 * Tasks whose execType is TASK_EXEC__NONE are left untouched. Returns 0 on success (or when
 * nothing had to be done) and -1 if the runner array cannot be allocated. Executor creation is
 * asserted to succeed.
 */
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int32_t parallel) {
  if (pTask->execType == TASK_EXEC__NONE) {
    return 0;
  }

  pTask->exec.numOfRunners = parallel;
  pTask->exec.runners = taosMemoryCalloc(parallel, sizeof(SStreamRunner));
  if (pTask->exec.runners == NULL) {
    return -1;
  }

  int32_t slot = 0;
  while (slot < parallel) {
    STqReadHandle* pScanner = tqInitSubmitMsgScanner(pTq->pVnodeMeta);
    SReadHandle    execHandle = {
           .reader = pScanner,
           .meta = pTq->pVnodeMeta,
    };

    pTask->exec.runners[slot].inputHandle = pScanner;
    pTask->exec.runners[slot].executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &execHandle);
    ASSERT(pTask->exec.runners[slot].executor);
    slot++;
  }

  return 0;
}

L
Liu Jicong 已提交
563
/*
 * Deploy a stream task on this vnode.
 *
 * The task is decoded from `msg`, expanded to 4 runners, wired to its sink (the SMA sink gets
 * smaHandleRes as its handler) and stored in pTq->pStreamTasks keyed by taskId. The hash keeps
 * its own by-value copy of the SStreamTask, so the temporary decode target is released before
 * returning.
 *
 * Returns 0 on success, -1 if the temporary task cannot be allocated. Decode and expansion
 * failures are asserted.
 */
int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen) {
  SStreamTask* pTask = taosMemoryMalloc(sizeof(SStreamTask));
  if (pTask == NULL) {
    return -1;
  }
  SCoder decoder;
  tCoderInit(&decoder, TD_LITTLE_ENDIAN, (uint8_t*)msg, msgLen, TD_DECODER);
  if (tDecodeSStreamTask(&decoder, pTask) < 0) {
    ASSERT(0);
  }
  tCoderClear(&decoder);

  // exec
  if (tqExpandTask(pTq, pTask, 4) < 0) {
    ASSERT(0);
  }

  // sink
  pTask->ahandle = pTq->pVnode;
  if (pTask->sinkType == TASK_SINK__SMA) {
    pTask->smaSink.smaHandle = smaHandleRes;
  }

  taosHashPut(pTq->pStreamTasks, &pTask->taskId, sizeof(int32_t), pTask, sizeof(SStreamTask));

  // Shallow free only: everything the task points to is now referenced by the copy stored in
  // the hash, which readers access as SStreamTask values.
  taosMemoryFree(pTask);

  return 0;
}
L
Liu Jicong 已提交
590

L
Liu Jicong 已提交
591
/*
 * Feed one submit block to every stream task registered in pTq->pStreamTasks by calling
 * streamExecTask() on each with STREAM_DATA_TYPE_SUBMIT_BLOCK. Failures of individual tasks
 * are currently ignored (TODO). Always returns 0. dataLen is unused.
 */
int32_t tqProcessStreamTrigger(STQ* pTq, void* data, int32_t dataLen, int32_t workerId) {
  for (void* pIter = taosHashIterate(pTq->pStreamTasks, NULL); pIter != NULL;
       pIter = taosHashIterate(pTq->pStreamTasks, pIter)) {
    SStreamTask* pTask = (SStreamTask*)pIter;

    int32_t code = streamExecTask(pTask, &pTq->pVnode->msgCb, data, STREAM_DATA_TYPE_SUBMIT_BLOCK, workerId);
    if (code < 0) {
      // TODO
    }
  }

  return 0;
}

L
Liu Jicong 已提交
606
/*
 * Run one stream task on the data carried by an exec request.
 *
 * The request is decoded from `msg`, the task is looked up by its (non-zero, asserted) taskId
 * and must exist (asserted); it is then executed with STREAM_DATA_TYPE_SSDATA_BLOCK input.
 * An execution failure is currently ignored (TODO). Always returns 0. msgLen is unused.
 */
int32_t tqProcessTaskExec(STQ* pTq, char* msg, int32_t msgLen, int32_t workerId) {
  SStreamTaskExecReq req;
  tDecodeSStreamTaskExecReq(msg, &req);

  int32_t taskId = req.taskId;
  ASSERT(taskId);

  SStreamTask* pTask = taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
  ASSERT(pTask);

  int32_t code = streamExecTask(pTask, &pTq->pVnode->msgCb, req.data, STREAM_DATA_TYPE_SSDATA_BLOCK, workerId);
  if (code < 0) {
    // TODO
  }

  return 0;
}