tq.c 18.2 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
S
Shengliang Guan 已提交
14 15
 */

L
Liu Jicong 已提交
16
#include "tcompare.h"
L
Liu Jicong 已提交
17
#include "tdatablock.h"
L
Liu Jicong 已提交
18
#include "tqInt.h"
L
Liu Jicong 已提交
19
#include "tqMetaStore.h"
L
Liu Jicong 已提交
20
#include "tstream.h"
S
Shengliang Guan 已提交
21

L
Liu Jicong 已提交
22
// Module-level initialisation of TQ: delegates to the push manager's init
// routine and returns its result unchanged.
int32_t tqInit() {
  int32_t code = tqPushMgrInit();
  return code;
}
L
Liu Jicong 已提交
23

L
Liu Jicong 已提交
24
// Module-level teardown of TQ: delegates to the push manager's cleanup routine.
void tqCleanUp() {
  tqPushMgrCleanUp();
}
L
Liu Jicong 已提交
25

L
Liu Jicong 已提交
26 27
// Create a TQ instance bound to the given vnode, WAL and vnode meta.
//
// path       - directory handed to tqStoreOpen(); a private copy is kept in pTq->path.
// pVnode     - owning vnode, stored as-is.
// pWal       - WAL handle, stored as-is.
// pVnodeMeta - vnode meta handle, stored as-is.
// tqConfig   - configuration, stored as-is (not copied).
// allocFac   - accepted for interface compatibility; currently unused because the
//              allocator setup is disabled.
//
// Returns the new STQ, or NULL on failure. terrno is set to
// TSDB_CODE_TQ_OUT_OF_MEMORY when an allocation made here fails.
STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal, SMeta* pVnodeMeta, STqCfg* tqConfig,
            SMemAllocatorFactory* allocFac) {
  STQ* pTq = taosMemoryMalloc(sizeof(STQ));
  if (pTq == NULL) {
    terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
    return NULL;
  }

  pTq->path = strdup(path);
  if (pTq->path == NULL) {
    // Without the path copy the handle is unusable; fail instead of carrying a NULL path.
    terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
    taosMemoryFree(pTq);
    return NULL;
  }
  pTq->tqConfig = tqConfig;
  pTq->pVnode = pVnode;
  pTq->pWal = pWal;
  pTq->pVnodeMeta = pVnodeMeta;

  pTq->tqMeta = tqStoreOpen(pTq, path, (FTqSerialize)tqSerializeConsumer, (FTqDeserialize)tqDeserializeConsumer,
                            (FTqDelete)taosMemoryFree, 0);
  if (pTq->tqMeta == NULL) {
    // Release the path copy as well; it is released the same way in tqClose().
    taosMemoryFree(pTq->path);
    taosMemoryFree(pTq);
    return NULL;
  }

  // TODO: push manager (tqPushMgrOpen) is not wired in yet.

  // NOTE(review): the result of taosHashInit is not checked here; a NULL table would
  // only surface later when stream tasks are deployed.
  pTq->pStreamTasks = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);

  return pTq;
}
L
Liu Jicong 已提交
68

L
Liu Jicong 已提交
69
// Release a TQ instance created by tqOpen(). A NULL handle is ignored.
// Only the path copy and the STQ structure itself are freed here.
void tqClose(STQ* pTq) {
  if (pTq != NULL) {
    taosMemoryFreeClear(pTq->path);
    taosMemoryFree(pTq);
  }
  // TODO
}
L
Liu Jicong 已提交
76

L
Liu Jicong 已提交
77
// Forward a submit message to the fetch queue as a stream trigger.
//
// Messages whose type is not TDMT_VND_SUBMIT are ignored. For submit messages the
// payload is copied into a fresh buffer and queued with type TDMT_VND_STREAM_TRIGGER.
//
// Returns 0 on success or when the message is ignored, -1 if the copy cannot be
// allocated. The result of tmsgPutToQueue is not inspected.
//
// TODO: iterate the registered pushers, hand the data to waiting handles and
// respond to consumers (not implemented yet).
int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t version) {
  if (msgType == TDMT_VND_SUBMIT) {
    void* copy = taosMemoryMalloc(msgLen);
    if (copy == NULL) {
      return -1;
    }
    memcpy(copy, msg, msgLen);

    SRpcMsg trigger = {
        .msgType = TDMT_VND_STREAM_TRIGGER,
        .pCont = copy,
        .contLen = msgLen,
    };
    tmsgPutToQueue(&pTq->pVnode->msgCb, FETCH_QUEUE, &trigger);
  }

  return 0;
}

L
Liu Jicong 已提交
123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146
// Persist the TQ meta store; returns the result of tqStorePersist().
int tqCommit(STQ* pTq) {
  int code = tqStorePersist(pTq->tqMeta);
  return code;
}

int32_t tqGetTopicHandleSize(const STqTopic* pTopic) {
  return strlen(pTopic->topicName) + strlen(pTopic->sql) + strlen(pTopic->logicalPlan) + strlen(pTopic->physicalPlan) +
         strlen(pTopic->qmsg) + sizeof(int64_t) * 3;
}

// Total of tqGetTopicHandleSize() over every topic attached to the consumer.
int32_t tqGetConsumerHandleSize(const STqConsumer* pConsumer) {
  int32_t total = 0;
  int     count = taosArrayGetSize(pConsumer->topics);
  int     idx = 0;
  while (idx < count) {
    STqTopic* pEntry = taosArrayGet(pConsumer->topics, idx);
    total += tqGetTopicHandleSize(pEntry);
    idx++;
  }
  return total;
}

// Encode a topic into *buf and return the number of bytes reported by the encoders.
// Only the topic name and qmsg are written; sql, the plans and the offsets are
// intentionally not serialized.
static FORCE_INLINE int32_t tEncodeSTqTopic(void** buf, const STqTopic* pTopic) {
  int32_t written = taosEncodeString(buf, pTopic->topicName);
  written += taosEncodeString(buf, pTopic->qmsg);
  return written;
}

// Decode a topic written by tEncodeSTqTopic: the name is decoded in place into
// pTopic->topicName and qmsg through taosDecodeString. No other field of pTopic
// is touched. Returns the read position after the decoded data.
static FORCE_INLINE const void* tDecodeSTqTopic(const void* buf, STqTopic* pTopic) {
  const void* cursor = buf;
  cursor = taosDecodeStringTo(cursor, pTopic->topicName);
  cursor = taosDecodeString(cursor, &pTopic->qmsg);
  return cursor;
}

// Encode a consumer into *buf: consumer id, epoch, consumer group, topic count and
// then each topic via tEncodeSTqTopic. Returns the accumulated byte count reported
// by the encoders.
static FORCE_INLINE int32_t tEncodeSTqConsumer(void** buf, const STqConsumer* pConsumer) {
  int32_t total = taosEncodeFixedI64(buf, pConsumer->consumerId);
  total += taosEncodeFixedI32(buf, pConsumer->epoch);
  total += taosEncodeString(buf, pConsumer->cgroup);

  int32_t numTopics = taosArrayGetSize(pConsumer->topics);
  total += taosEncodeFixedI32(buf, numTopics);

  int32_t idx = 0;
  while (idx < numTopics) {
    STqTopic* pEntry = taosArrayGet(pConsumer->topics, idx);
    total += tEncodeSTqTopic(buf, pEntry);
    idx++;
  }
  return total;
}

L
Liu Jicong 已提交
181 182 183 184
// Decode a consumer written by tEncodeSTqConsumer: consumer id, epoch, consumer
// group, topic count, then each topic. A fresh topics array is created and stored
// in pConsumer->topics.
//
// Returns the read position after the decoded data, or NULL if the topics array
// cannot be allocated.
static FORCE_INLINE const void* tDecodeSTqConsumer(const void* buf, STqConsumer* pConsumer) {
  int32_t sz;

  buf = taosDecodeFixedI64(buf, &pConsumer->consumerId);
  buf = taosDecodeFixedI32(buf, &pConsumer->epoch);
  buf = taosDecodeStringTo(buf, pConsumer->cgroup);
  buf = taosDecodeFixedI32(buf, &sz);
  pConsumer->topics = taosArrayInit(sz, sizeof(STqTopic));
  if (pConsumer->topics == NULL) return NULL;
  for (int32_t i = 0; i < sz; i++) {
    // tDecodeSTqTopic only fills topicName and qmsg, so start from a zeroed topic;
    // otherwise the remaining fields would be copied into the array as stack garbage.
    STqTopic topic = {0};
    buf = tDecodeSTqTopic(buf, &topic);
    taosArrayPush(pConsumer->topics, &topic);
  }
  return buf;
}

// Serialize a consumer into (*ppHead)->content, growing the buffer when the
// encoded size exceeds (*ppHead)->ssize.
//
// Returns 0 on success. If the buffer cannot be grown, the old buffer is freed,
// *ppHead is set to NULL, terrno is set to TSDB_CODE_TQ_OUT_OF_MEMORY and -1 is
// returned.
int tqSerializeConsumer(const STqConsumer* pConsumer, STqSerializedHead** ppHead) {
  // A NULL buffer makes the encoder report the required size only.
  int32_t sz = tEncodeSTqConsumer(NULL, pConsumer);

  if (sz > (*ppHead)->ssize) {
    void* tmpPtr = taosMemoryRealloc(*ppHead, sizeof(STqSerializedHead) + sz);
    if (tmpPtr == NULL) {
      taosMemoryFree(*ppHead);
      // Do not leave the caller holding a pointer to freed memory.
      *ppHead = NULL;
      terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
      return -1;
    }
    *ppHead = tmpPtr;
    (*ppHead)->ssize = sz;
  }

  // The encoder advances the pointer it is given, so work on a scratch copy.
  void* abuf = (*ppHead)->content;
  tEncodeSTqConsumer(&abuf, pConsumer);

  return 0;
}

L
Liu Jicong 已提交
219 220
// Rebuild a consumer from its serialized form and re-create its runtime state.
//
// The consumer is allocated here and decoded from pHead->content. For every topic
// a WAL read handle is opened and, for each of the TQ_BUFFER_SIZE output slots, a
// submit-message scanner and a stream exec task are created from the topic's qmsg.
//
// Returns 0 on success. On allocation or decode failure returns -1 with terrno set
// to TSDB_CODE_TQ_OUT_OF_MEMORY; in the decode-failure case the partially built
// consumer is released and *ppConsumer is set to NULL.
int32_t tqDeserializeConsumer(STQ* pTq, const STqSerializedHead* pHead, STqConsumer** ppConsumer) {
  const void* str = pHead->content;
  *ppConsumer = taosMemoryCalloc(1, sizeof(STqConsumer));
  if (*ppConsumer == NULL) {
    terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
    return -1;
  }
  if (tDecodeSTqConsumer(str, *ppConsumer) == NULL) {
    // Release what was allocated above instead of leaking it to the caller.
    if ((*ppConsumer)->topics != NULL) {
      taosArrayDestroy((*ppConsumer)->topics);
    }
    taosMemoryFree(*ppConsumer);
    *ppConsumer = NULL;
    terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
    return -1;
  }
  STqConsumer* pConsumer = *ppConsumer;
  int32_t      sz = taosArrayGetSize(pConsumer->topics);
  for (int32_t i = 0; i < sz; i++) {
    STqTopic* pTopic = taosArrayGet(pConsumer->topics, i);
    pTopic->pReadhandle = walOpenReadHandle(pTq->pWal);
    if (pTopic->pReadhandle == NULL) {
      ASSERT(false);
    }
    for (int j = 0; j < TQ_BUFFER_SIZE; j++) {
      pTopic->buffer.output[j].status = 0;
      STqReadHandle* pReadHandle = tqInitSubmitMsgScanner(pTq->pVnodeMeta);
      SReadHandle    handle = {
             .reader = pReadHandle,
             .meta = pTq->pVnodeMeta,
      };
      pTopic->buffer.output[j].pReadHandle = pReadHandle;
      // NOTE(review): unlike tqProcessSetConnReq, the created task is not asserted here.
      pTopic->buffer.output[j].task = qCreateStreamExecTaskInfo(pTopic->qmsg, &handle);
    }
  }

  return 0;
}

L
fix  
Liu Jicong 已提交
253
// Handle a consumer poll request.
//
// Starting from the offset derived from pReq->currentOffset, WAL entries are read
// one by one. Entries that are not submits, or submits for which the topic's task
// yields no data block, are skipped and counted in rsp.skipLogNum. The first submit
// that yields data is encoded into a poll response and sent. If the loop ends
// without data (no more log, or the consumer's epoch moved past the request's), an
// empty poll response carrying the last examined offset is sent instead.
//
// pMsg     - the request; its pCont/contLen/code are overwritten with the response.
// workerId - selects the per-topic output slot (task and read handle) to use.
//
// Returns 0 once a response has been sent (including the "consumer/topic not found"
// error responses, which carry code -1), or -1 if a buffer needed for the response
// cannot be allocated (no response is sent in that case).
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
  SMqPollReq* pReq = pMsg->pCont;
  int64_t     consumerId = pReq->consumerId;
  int64_t     fetchOffset;
  int32_t     reqEpoch = pReq->epoch;

  if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__EARLIEAST) {
    fetchOffset = 0;
  } else if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__LATEST) {
    fetchOffset = walGetLastVer(pTq->pWal);
  } else {
    fetchOffset = pReq->currentOffset + 1;
  }

  vDebug("tmq poll: consumer %ld (epoch %d) recv poll req in vg %d, req %ld %ld", consumerId, pReq->epoch, pTq->pVnode->vgId, pReq->currentOffset, fetchOffset);

  SMqPollRsp rsp = {
      /*.consumerId = consumerId,*/
      .numOfTopics = 0,
      .pBlockData = NULL,
  };

  STqConsumer* pConsumer = tqHandleGet(pTq->tqMeta, consumerId);
  if (pConsumer == NULL) {
    vWarn("tmq poll: consumer %ld (epoch %d) not found in vg %d", consumerId, pReq->epoch, pTq->pVnode->vgId);
    pMsg->pCont = NULL;
    pMsg->contLen = 0;
    pMsg->code = -1;
    tmsgSendRsp(pMsg);
    return 0;
  }

  // Raise the stored epoch to the request's epoch; never lower it.
  int32_t consumerEpoch = atomic_load_32(&pConsumer->epoch);
  while (consumerEpoch < reqEpoch) {
    consumerEpoch = atomic_val_compare_exchange_32(&pConsumer->epoch, consumerEpoch, reqEpoch);
  }

  STqTopic* pTopic = NULL;
  int       sz = taosArrayGetSize(pConsumer->topics);
  for (int32_t i = 0; i < sz; i++) {
    STqTopic* topic = taosArrayGet(pConsumer->topics, i);
    // TODO race condition
    ASSERT(pConsumer->consumerId == consumerId);
    if (strcmp(topic->topicName, pReq->topic) == 0) {
      pTopic = topic;
      break;
    }
  }
  if (pTopic == NULL) {
    vWarn("tmq poll: consumer %ld (epoch %d) topic %s not found in vg %d", consumerId, pReq->epoch, pReq->topic, pTq->pVnode->vgId);
    pMsg->pCont = NULL;
    pMsg->contLen = 0;
    pMsg->code = -1;
    tmsgSendRsp(pMsg);
    return 0;
  }

  vDebug("poll topic %s from consumer %ld (epoch %d)", pTopic->topicName, consumerId, pReq->epoch);

  rsp.reqOffset = pReq->currentOffset;
  rsp.skipLogNum = 0;

  while (1) {
    // TODO
    consumerEpoch = atomic_load_32(&pConsumer->epoch);
    if (consumerEpoch > pReq->epoch) {
      // TODO: return
      break;
    }
    SWalReadHead* pHead;
    if (walReadWithHandle_s(pTopic->pReadhandle, fetchOffset, &pHead) < 0) {
      // TODO: no more log, set timer to wait blocking time
      // if data inserted during waiting, launch query and
      // response to user
      vDebug("tmq poll: consumer %ld (epoch %d) vg %d offset %ld, no more log to return", consumerId, pReq->epoch, pTq->pVnode->vgId, fetchOffset);
      break;
    }
    vDebug("tmq poll: consumer %ld (epoch %d) iter log, vg %d offset %ld msgType %d", consumerId, pReq->epoch, pTq->pVnode->vgId, fetchOffset, pHead->msgType);

    if (pHead->msgType == TDMT_VND_SUBMIT) {
      SSubmitReq* pCont = (SSubmitReq*)&pHead->body;
      qTaskInfo_t task = pTopic->buffer.output[workerId].task;
      ASSERT(task);
      qSetStreamInput(task, pCont, STREAM_DATA_TYPE_SUBMIT_BLOCK);

      SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock));
      if (pRes == NULL) {
        pMsg->code = -1;
        taosMemoryFree(pHead);
        return -1;
      }

      // Drain the task: collect every data block it produces for this submit.
      while (1) {
        SSDataBlock* pDataBlock = NULL;
        uint64_t     ts;
        if (qExecTask(task, &pDataBlock, &ts) < 0) {
          ASSERT(false);
        }
        if (pDataBlock == NULL) {
          break;
        }

        taosArrayPush(pRes, pDataBlock);
      }

      if (taosArrayGetSize(pRes) == 0) {
        vDebug("tmq poll: consumer %ld (epoch %d) iter log, vg %d skip log %ld since not wanted", consumerId, pReq->epoch, pTq->pVnode->vgId, fetchOffset);
        fetchOffset++;
        rsp.skipLogNum++;
        taosArrayDestroy(pRes);
        // Every other exit from this iteration releases pHead; do the same here.
        taosMemoryFree(pHead);
        continue;
      }
      rsp.schema = pTopic->buffer.output[workerId].pReadHandle->pSchemaWrapper;
      rsp.rspOffset = fetchOffset;

      rsp.numOfTopics = 1;
      rsp.pBlockData = pRes;

      int32_t tlen = sizeof(SMqRspHead) + tEncodeSMqPollRsp(NULL, &rsp);
      void*   buf = rpcMallocCont(tlen);
      if (buf == NULL) {
        pMsg->code = -1;
        taosArrayDestroy(pRes);
        taosMemoryFree(pHead);
        return -1;
      }
      ((SMqRspHead*)buf)->mqMsgType = TMQ_MSG_TYPE__POLL_RSP;
      ((SMqRspHead*)buf)->epoch = pReq->epoch;
      ((SMqRspHead*)buf)->consumerId = consumerId;

      void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
      tEncodeSMqPollRsp(&abuf, &rsp);
      /*taosArrayDestroyEx(rsp.pBlockData, (void (*)(void*))tDeleteSSDataBlock);*/
      pMsg->pCont = buf;
      pMsg->contLen = tlen;
      pMsg->code = 0;
      vDebug("vg %d offset %ld msgType %d from consumer %ld (epoch %d) actual rsp", pTq->pVnode->vgId, fetchOffset, pHead->msgType, consumerId, pReq->epoch);
      tmsgSendRsp(pMsg);
      // The response is already encoded into buf, so the array container can go.
      // Only the container is released; the blocks' contents are left untouched
      // (the deep destroy above stays disabled).
      taosArrayDestroy(pRes);
      taosMemoryFree(pHead);
      return 0;
    } else {
      taosMemoryFree(pHead);
      fetchOffset++;
      rsp.skipLogNum++;
    }
  }

  // No data to return: send an empty poll response.
  // TODO: honour pReq->blockingTime by registering a client pusher instead.

  // Set the offset before sizing the response so the size and the encoded
  // content are computed from the same rsp.
  rsp.rspOffset = fetchOffset - 1;

  int32_t tlen = sizeof(SMqRspHead) + tEncodeSMqPollRsp(NULL, &rsp);
  void*   buf = rpcMallocCont(tlen);
  if (buf == NULL) {
    pMsg->code = -1;
    return -1;
  }
  ((SMqRspHead*)buf)->mqMsgType = TMQ_MSG_TYPE__POLL_RSP;
  ((SMqRspHead*)buf)->epoch = pReq->epoch;
  // Fill the header the same way as the data-carrying response above.
  ((SMqRspHead*)buf)->consumerId = consumerId;

  void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
  tEncodeSMqPollRsp(&abuf, &rsp);
  rsp.pBlockData = NULL;
  pMsg->pCont = buf;
  pMsg->contLen = tlen;
  pMsg->code = 0;
  tmsgSendRsp(pMsg);
  vDebug("vg %d offset %ld from consumer %ld (epoch %d) not rsp", pTq->pVnode->vgId, fetchOffset, consumerId, pReq->epoch);

  return 0;
}

L
Liu Jicong 已提交
422 423 424 425
// Handle a rebalance request: re-key the handle registered under the old consumer
// id to the new consumer id, commit it and purge the old entry.
// The old consumer must exist (asserted). Always returns 0 and sets terrno to
// TSDB_CODE_SUCCESS.
int32_t tqProcessRebReq(STQ* pTq, char* msg) {
  SMqMVRebReq rebReq = {0};
  tDecodeSMqMVRebReq(msg, &rebReq);

  vDebug("vg %d set from consumer %ld to consumer %ld", rebReq.vgId, rebReq.oldConsumerId ,rebReq.newConsumerId);

  STqConsumer* pHandle = tqHandleGet(pTq->tqMeta, rebReq.oldConsumerId);
  ASSERT(pHandle);

  // The handle itself is reused; only its id and its key in the store change.
  pHandle->consumerId = rebReq.newConsumerId;
  tqHandleMovePut(pTq->tqMeta, rebReq.newConsumerId, pHandle);
  tqHandleCommit(pTq->tqMeta, rebReq.newConsumerId);
  tqHandlePurge(pTq->tqMeta, rebReq.oldConsumerId);

  terrno = TSDB_CODE_SUCCESS;
  return 0;
}

L
Liu Jicong 已提交
437
// Handle a "set consumer/vgroup" request: attach a topic to a consumer.
//
// The consumer is looked up by req.consumerId and created if it does not exist.
// A new topic is built from the request (name, sql, plans, qmsg), given a WAL read
// handle and one scanner + stream exec task per output slot, appended to the
// consumer's topic array, and the consumer is then put into the store and committed.
//
// Returns 0 on success with terrno set to TSDB_CODE_SUCCESS, or -1 with terrno set
// to TSDB_CODE_TQ_OUT_OF_MEMORY if an allocation made here fails.
int32_t tqProcessSetConnReq(STQ* pTq, char* msg) {
  SMqSetCVgReq req = {0};
  tDecodeSMqSetCVgReq(msg, &req);

  vDebug("vg %d set to consumer %ld", req.vgId, req.consumerId);
  STqConsumer* pConsumer = tqHandleGet(pTq->tqMeta, req.consumerId);
  // Remember whether the consumer was created here: only then may the error
  // paths below free it. An existing consumer came from tqHandleGet.
  bool isNewConsumer = (pConsumer == NULL);
  if (isNewConsumer) {
    pConsumer = taosMemoryCalloc(1, sizeof(STqConsumer));
    if (pConsumer == NULL) {
      terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
      return -1;
    }
    // NOTE(review): unbounded copy; assumes req.cgroup fits pConsumer->cgroup — confirm.
    strcpy(pConsumer->cgroup, req.cgroup);
    pConsumer->topics = taosArrayInit(0, sizeof(STqTopic));
    if (pConsumer->topics == NULL) {
      taosMemoryFree(pConsumer);
      terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
      return -1;
    }
    pConsumer->consumerId = req.consumerId;
    pConsumer->epoch = 0;
  }

  STqTopic* pTopic = taosMemoryCalloc(1, sizeof(STqTopic));
  if (pTopic == NULL) {
    if (isNewConsumer) {
      taosArrayDestroy(pConsumer->topics);
      taosMemoryFree(pConsumer);
    }
    terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
    return -1;
  }
  // NOTE(review): unbounded copy; assumes req.topicName fits pTopic->topicName — confirm.
  strcpy(pTopic->topicName, req.topicName);
  pTopic->sql = req.sql;
  pTopic->logicalPlan = req.logicalPlan;
  pTopic->physicalPlan = req.physicalPlan;
  pTopic->qmsg = req.qmsg;
  /*pTopic->committedOffset = -1;*/
  /*pTopic->currentOffset = -1;*/

  pTopic->buffer.firstOffset = -1;
  pTopic->buffer.lastOffset = -1;
  pTopic->pReadhandle = walOpenReadHandle(pTq->pWal);
  if (pTopic->pReadhandle == NULL) {
    ASSERT(false);
  }
  for (int i = 0; i < TQ_BUFFER_SIZE; i++) {
    pTopic->buffer.output[i].status = 0;
    STqReadHandle* pReadHandle = tqInitSubmitMsgScanner(pTq->pVnodeMeta);
    SReadHandle    handle = {
           .reader = pReadHandle,
           .meta = pTq->pVnodeMeta,
    };
    pTopic->buffer.output[i].pReadHandle = pReadHandle;
    pTopic->buffer.output[i].task = qCreateStreamExecTaskInfo(req.qmsg, &handle);
    ASSERT(pTopic->buffer.output[i].task);
  }

  // The array stores its own copy of the topic (element size is sizeof(STqTopic)),
  // so the temporary heap shell is no longer needed once it has been pushed.
  taosArrayPush(pConsumer->topics, pTopic);
  taosMemoryFree(pTopic);

  tqHandleMovePut(pTq->tqMeta, req.consumerId, pConsumer);
  tqHandleCommit(pTq->tqMeta, req.consumerId);
  terrno = TSDB_CODE_SUCCESS;
  return 0;
}
L
Liu Jicong 已提交
493

L
Liu Jicong 已提交
494
// Give a stream task `parallel` runners, each with its own submit-message scanner
// and an executor created from the task's qmsg.
// Tasks whose execType is TASK_EXEC__NONE are left untouched.
// Returns 0 on success, -1 if the runner array cannot be allocated.
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int32_t parallel) {
  if (pTask->execType != TASK_EXEC__NONE) {
    pTask->exec.numOfRunners = parallel;
    pTask->exec.runners = taosMemoryCalloc(parallel, sizeof(SStreamRunner));
    if (pTask->exec.runners == NULL) {
      return -1;
    }

    int32_t slot = 0;
    while (slot < parallel) {
      STqReadHandle* pScanner = tqInitSubmitMsgScanner(pTq->pVnodeMeta);
      SReadHandle    readHandle = {
             .reader = pScanner,
             .meta = pTq->pVnodeMeta,
      };
      pTask->exec.runners[slot].inputHandle = pScanner;
      pTask->exec.runners[slot].executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &readHandle);
      ASSERT(pTask->exec.runners[slot].executor);
      slot++;
    }
  }
  return 0;
}

L
Liu Jicong 已提交
515
// Deploy a stream task described by an encoded SStreamTask message.
//
// The task is decoded, expanded to 4 runners, bound to this vnode, and stored in
// pTq->pStreamTasks keyed by its taskId. Decode and expand failures are asserted.
//
// Returns 0 on success, -1 if the temporary task cannot be allocated or the task
// cannot be stored in the hash table.
int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen) {
  SStreamTask* pTask = taosMemoryMalloc(sizeof(SStreamTask));
  if (pTask == NULL) {
    return -1;
  }
  SCoder decoder;
  tCoderInit(&decoder, TD_LITTLE_ENDIAN, (uint8_t*)msg, msgLen, TD_DECODER);
  if (tDecodeSStreamTask(&decoder, pTask) < 0) {
    ASSERT(0);
  }
  tCoderClear(&decoder);

  // exec
  if (tqExpandTask(pTq, pTask, 4) < 0) {
    ASSERT(0);
  }

  // sink
  pTask->ahandle = pTq->pVnode;
  if (pTask->sinkType == TASK_SINK__SMA) {
    pTask->smaSink.smaHandle = smaHandleRes;
  }

  // The hash table stores the task by value (sizeof(SStreamTask) bytes are passed,
  // and the other functions here treat the table's entries as SStreamTask), so
  // the temporary heap shell is released once the put has been attempted.
  int32_t code = taosHashPut(pTq->pStreamTasks, &pTask->taskId, sizeof(int32_t), pTask, sizeof(SStreamTask));
  taosMemoryFree(pTask);
  if (code < 0) {
    return -1;
  }

  return 0;
}
L
Liu Jicong 已提交
542

L
Liu Jicong 已提交
543
// Run every deployed stream task on a submit block.
// Each entry of pTq->pStreamTasks is executed via streamExecTask with
// STREAM_DATA_TYPE_SUBMIT_BLOCK input; failures are currently ignored.
// Always returns 0.
int32_t tqProcessStreamTrigger(STQ* pTq, void* data, int32_t dataLen, int32_t workerId) {
  for (void* pEntry = taosHashIterate(pTq->pStreamTasks, NULL); pEntry != NULL;
       pEntry = taosHashIterate(pTq->pStreamTasks, pEntry)) {
    SStreamTask* pStreamTask = (SStreamTask*)pEntry;

    int32_t code = streamExecTask(pStreamTask, &pTq->pVnode->msgCb, data, STREAM_DATA_TYPE_SUBMIT_BLOCK, workerId);
    if (code < 0) {
      // TODO
    }
  }
  return 0;
}

L
Liu Jicong 已提交
558
// Execute one deployed stream task on the data carried by a task-exec request.
// The request must name a non-zero taskId that is present in pTq->pStreamTasks
// (both asserted). Execution failures are currently ignored. Always returns 0.
int32_t tqProcessTaskExec(STQ* pTq, char* msg, int32_t msgLen, int32_t workerId) {
  SStreamTaskExecReq execReq;
  tDecodeSStreamTaskExecReq(msg, &execReq);

  int32_t taskId = execReq.taskId;
  ASSERT(taskId);

  SStreamTask* pStreamTask = taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
  ASSERT(pStreamTask);

  int32_t code = streamExecTask(pStreamTask, &pTq->pVnode->msgCb, execReq.data, STREAM_DATA_TYPE_SSDATA_BLOCK, workerId);
  if (code < 0) {
    // TODO
  }
  return 0;
}