tq.c 12.0 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
S
Shengliang Guan 已提交
14
 */
L
Liu Jicong 已提交
15
#define _DEFAULT_SOURCE
S
Shengliang Guan 已提交
16

L
Liu Jicong 已提交
17
#include "tcompare.h"
L
Liu Jicong 已提交
18
#include "tqInt.h"
L
Liu Jicong 已提交
19
#include "tqMetaStore.h"
S
Shengliang Guan 已提交
20

L
Liu Jicong 已提交
21 22
/**
 * Initialize the process-wide TQ module state.
 *
 * The `tqMgmt.inited` flag is switched 0 -> 1 with an atomic compare-exchange,
 * so only the caller that performs the switch creates the timer; callers that
 * find the flag already set return immediately.
 *
 * @return 0 on success or when the module was already initialized,
 *         -1 when the timer could not be created.
 */
int tqInit() {
  int8_t old = atomic_val_compare_exchange_8(&tqMgmt.inited, 0, 1);
  if (old == 1) return 0;

  tqMgmt.timer = taosTmrInit(0, 0, 0, "TQ");
  if (tqMgmt.timer == NULL) {
    // NOTE(review): assumes taosTmrInit() reports failure by returning NULL - confirm.
    // Roll the flag back so a later tqInit() can retry and tqCleanUp() does not
    // try to stop a timer that was never created.
    atomic_val_compare_exchange_8(&tqMgmt.inited, 1, 0);
    return -1;
  }
  return 0;
}

/**
 * Tear down the process-wide TQ module state.
 *
 * The `tqMgmt.inited` flag is switched 1 -> 0 with an atomic compare-exchange;
 * the timer is stopped and cleaned up only when the previous value was non-zero.
 */
void tqCleanUp() {
  int8_t prev = atomic_val_compare_exchange_8(&tqMgmt.inited, 1, 0);
  if (prev != 0) {
    taosTmrStop(tqMgmt.timer);
    taosTmrCleanUp(tqMgmt.timer);
  }
}

L
Liu Jicong 已提交
36
/**
 * Create a TQ instance and open its meta store.
 *
 * @param path      directory passed to tqStoreOpen(); a private copy is kept in pTq->path
 * @param pWal      WAL handle stored in the instance (not copied)
 * @param pMeta     vnode meta handle stored in the instance (not copied)
 * @param tqConfig  configuration pointer stored in the instance (not copied)
 * @param allocFac  allocator factory; only referenced by code that is currently
 *                  compiled out with `#if 0`
 * @return the new instance, or NULL on failure. terrno is set to
 *         TSDB_CODE_TQ_OUT_OF_MEMORY when an allocation made here fails.
 */
STQ* tqOpen(const char* path, SWal* pWal, SMeta* pMeta, STqCfg* tqConfig, SMemAllocatorFactory* allocFac) {
  // calloc so that fields which are not assigned below start out zeroed
  STQ* pTq = calloc(1, sizeof(STQ));
  if (pTq == NULL) {
    terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
    return NULL;
  }

  pTq->path = strdup(path);
  if (pTq->path == NULL) {
    terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
    free(pTq);
    return NULL;
  }
  pTq->tqConfig = tqConfig;
  pTq->pWal = pWal;
  pTq->pVnodeMeta = pMeta;
#if 0
  pTq->tqMemRef.pAllocatorFactory = allocFac;
  pTq->tqMemRef.pAllocator = allocFac->create(allocFac);
  if (pTq->tqMemRef.pAllocator == NULL) {
    // TODO: error code of buffer pool
  }
#endif
  pTq->tqMeta =
      tqStoreOpen(pTq, path, (FTqSerialize)tqSerializeConsumer, (FTqDeserialize)tqDeserializeConsumer, free, 0);
  if (pTq->tqMeta == NULL) {
    // Release everything owned by pTq before pTq itself; the disabled allocator
    // cleanup must also run while pTq is still valid.
#if 0
    allocFac->destroy(allocFac, pTq->tqMemRef.pAllocator);
#endif
    free(pTq->path);
    free(pTq);
    return NULL;
  }

  return pTq;
}
L
Liu Jicong 已提交
65

L
Liu Jicong 已提交
66
/**
 * Release a TQ instance created by tqOpen().
 *
 * Frees the path copy and the instance itself. Passing NULL is a no-op.
 * TODO: nothing else held by the instance is released here yet.
 */
void tqClose(STQ* pTq) {
  if (pTq == NULL) {
    return;
  }

  tfree(pTq->path);
  free(pTq);
}
L
Liu Jicong 已提交
73

L
Liu Jicong 已提交
74
/**
 * Push-path entry point. Currently a stub: none of the arguments are used and
 * the function always returns 0.
 *
 * Planned behaviour (TODO):
 *   - if a handle is waiting, copy the message and send it to the fetch thread
 *     (a reference must be added), then launch the query and respond to the
 *     consumer;
 *   - if no handle is waiting, return.
 */
int tqPushMsg(STQ* pTq, void* msg, tmsg_t msgType, int64_t version) {
  return 0;
}

L
Liu Jicong 已提交
84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107
/**
 * Persist the meta store of a TQ instance.
 *
 * @return whatever tqStorePersist() returns for pTq->tqMeta.
 */
int tqCommit(STQ* pTq) {
  return tqStorePersist(pTq->tqMeta);
}

/**
 * Size estimate for one topic handle: the string lengths (terminators not
 * counted) of topicName, sql, logicalPlan, physicalPlan and qmsg, plus room
 * for three int64_t values.
 */
int32_t tqGetTopicHandleSize(const STqTopic* pTopic) {
  size_t textBytes = strlen(pTopic->topicName);
  textBytes += strlen(pTopic->sql);
  textBytes += strlen(pTopic->logicalPlan);
  textBytes += strlen(pTopic->physicalPlan);
  textBytes += strlen(pTopic->qmsg);

  return textBytes + sizeof(int64_t) * 3;
}

/**
 * Size estimate for a consumer handle: the sum of tqGetTopicHandleSize() over
 * every entry of pConsumer->topics (0 when the array is empty).
 */
int32_t tqGetConsumerHandleSize(const STqConsumer* pConsumer) {
  int32_t total = 0;
  int     nTopics = taosArrayGetSize(pConsumer->topics);

  for (int idx = 0; idx < nTopics; ++idx) {
    STqTopic* pEntry = taosArrayGet(pConsumer->topics, idx);
    total += tqGetTopicHandleSize(pEntry);
  }
  return total;
}

/**
 * Encode one topic into *buf.
 *
 * Only topicName and qmsg are written, in that order. The sql, logicalPlan and
 * physicalPlan strings and the persisted/committed/current offsets are not
 * part of the encoded form at the moment.
 *
 * @param buf  encode cursor handed straight to taosEncodeString(); the
 *             consumer serializer also calls this with NULL to obtain the size
 * @return sum of the values returned by the two taosEncodeString() calls.
 */
static FORCE_INLINE int32_t tEncodeSTqTopic(void** buf, const STqTopic* pTopic) {
  int32_t nameLen = taosEncodeString(buf, pTopic->topicName);
  int32_t qmsgLen = taosEncodeString(buf, pTopic->qmsg);
  return nameLen + qmsgLen;
}

/**
 * Decode one topic from buf, mirroring tEncodeSTqTopic().
 *
 * topicName is decoded in place with taosDecodeStringTo(); qmsg is produced by
 * taosDecodeString(). The sql, logicalPlan and physicalPlan strings and the
 * offsets are not part of the encoded form and are left untouched.
 *
 * @return the cursor returned by the last decode call.
 */
static FORCE_INLINE const void* tDecodeSTqTopic(const void* buf, STqTopic* pTopic) {
  const void* cursor = taosDecodeStringTo(buf, pTopic->topicName);
  cursor = taosDecodeString(cursor, &pTopic->qmsg);
  return cursor;
}

/**
 * Encode a consumer into *buf.
 *
 * Layout: consumerId (i64), epoch (i64), cgroup (string), topic count (i32),
 * then each topic as written by tEncodeSTqTopic().
 *
 * @param buf  encode cursor; tqSerializeConsumer() passes NULL to measure the
 *             encoded size before writing
 * @return sum of the values returned by the individual encode calls.
 */
static FORCE_INLINE int32_t tEncodeSTqConsumer(void** buf, const STqConsumer* pConsumer) {
  int32_t total = 0;
  total += taosEncodeFixedI64(buf, pConsumer->consumerId);
  total += taosEncodeFixedI64(buf, pConsumer->epoch);
  total += taosEncodeString(buf, pConsumer->cgroup);

  int32_t nTopics = taosArrayGetSize(pConsumer->topics);
  total += taosEncodeFixedI32(buf, nTopics);

  for (int32_t idx = 0; idx < nTopics; ++idx) {
    STqTopic* pEntry = taosArrayGet(pConsumer->topics, idx);
    total += tEncodeSTqTopic(buf, pEntry);
  }
  return total;
}

L
Liu Jicong 已提交
142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161
/**
 * Decode a consumer from buf, mirroring tEncodeSTqConsumer().
 *
 * Layout: consumerId (i64), epoch (i64), cgroup (string), topic count (i32),
 * then each topic as read by tDecodeSTqTopic(). pConsumer->topics is created
 * here and filled with one STqTopic per encoded topic.
 *
 * @return the cursor after the last decoded byte, or NULL on failure (the
 *         topics array could not be created or extended, or a topic failed to
 *         decode). On failure pConsumer may be partially filled; releasing it
 *         is left to the caller.
 */
static FORCE_INLINE const void* tDecodeSTqConsumer(const void* buf, STqConsumer* pConsumer) {
  int32_t sz;

  buf = taosDecodeFixedI64(buf, &pConsumer->consumerId);
  buf = taosDecodeFixedI64(buf, &pConsumer->epoch);
  buf = taosDecodeStringTo(buf, pConsumer->cgroup);
  buf = taosDecodeFixedI32(buf, &sz);
  pConsumer->topics = taosArrayInit(sz, sizeof(STqTopic));
  if (pConsumer->topics == NULL) return NULL;
  for (int32_t i = 0; i < sz; i++) {
    // Zero the topic first: only topicName and qmsg come from the buffer, and
    // the copy stored in the array must not carry indeterminate pointers.
    STqTopic topic = {0};
    buf = tDecodeSTqTopic(buf, &topic);
    // Stop at the first failure rather than feeding a NULL cursor to the next decode.
    if (buf == NULL) return NULL;
    if (taosArrayPush(pConsumer->topics, &topic) == NULL) {
      free(topic.qmsg);  // never reached the array, so nobody else owns it
      return NULL;
    }
  }
  return buf;
}

/**
 * Serialize a consumer into the content area of *ppHead.
 *
 * The encoded size is measured first. When it exceeds (*ppHead)->ssize the
 * head is reallocated to sizeof(STqSerializedHead) + size and ssize is updated;
 * *ppHead may therefore change.
 *
 * @param pConsumer consumer to encode
 * @param ppHead    in/out pointer to a heap-allocated serialized head
 * @return 0 on success; -1 with terrno = TSDB_CODE_TQ_OUT_OF_MEMORY when the
 *         buffer cannot be grown. On failure *ppHead is left untouched and
 *         still valid, so the caller keeps ownership and never sees a
 *         dangling pointer.
 */
int tqSerializeConsumer(const STqConsumer* pConsumer, STqSerializedHead** ppHead) {
  int32_t sz = tEncodeSTqConsumer(NULL, pConsumer);

  if (sz > (*ppHead)->ssize) {
    void* tmpPtr = realloc(*ppHead, sizeof(STqSerializedHead) + sz);
    if (tmpPtr == NULL) {
      // A failed realloc() does not release the old block; keep *ppHead as is
      // instead of freeing it behind the caller's back.
      terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
      return -1;
    }
    *ppHead = tmpPtr;
    (*ppHead)->ssize = sz;
  }

  void* abuf = (*ppHead)->content;
  tEncodeSTqConsumer(&abuf, pConsumer);

  return 0;
}

L
Liu Jicong 已提交
180 181 182 183 184 185 186 187 188 189 190 191 192
/**
 * Rebuild a consumer from its serialized form.
 *
 * After decoding, every topic gets a fresh WAL read handle and each of its
 * TQ_BUFFER_SIZE output slots gets its own submit-message scanner and a stream
 * task created from the topic's qmsg.
 *
 * @param pTq         TQ instance supplying the WAL and vnode meta handles
 * @param pHead       serialized head whose content holds the encoded consumer
 * @param ppConsumer  out: newly allocated consumer on success, NULL on failure
 * @return 0 on success; -1 with terrno = TSDB_CODE_TQ_OUT_OF_MEMORY when the
 *         consumer cannot be allocated or decoded.
 */
int32_t tqDeserializeConsumer(STQ* pTq, const STqSerializedHead* pHead, STqConsumer** ppConsumer) {
  const void* str = pHead->content;
  *ppConsumer = calloc(1, sizeof(STqConsumer));
  if (*ppConsumer == NULL) {
    terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
    return -1;
  }
  STqConsumer* pConsumer = *ppConsumer;
  if (tDecodeSTqConsumer(str, pConsumer) == NULL) {
    // Do not hand a half-decoded consumer back to the caller.
    // NOTE(review): qmsg strings of topics decoded before the failure are not
    // released here - confirm whether they need separate cleanup.
    if (pConsumer->topics != NULL) {
      taosArrayDestroy(pConsumer->topics);
    }
    free(pConsumer);
    *ppConsumer = NULL;
    terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
    return -1;
  }

  int32_t sz = taosArrayGetSize(pConsumer->topics);
  for (int32_t i = 0; i < sz; i++) {
    STqTopic* pTopic = taosArrayGet(pConsumer->topics, i);
    pTopic->pReadhandle = walOpenReadHandle(pTq->pWal);
    if (pTopic->pReadhandle == NULL) {
      ASSERT(false);
    }
    for (int j = 0; j < TQ_BUFFER_SIZE; j++) {
      pTopic->buffer.output[j].status = 0;
      // Each output slot owns its own scanner and stream task.
      STqReadHandle* pReadHandle = tqInitSubmitMsgScanner(pTq->pVnodeMeta);
      SReadHandle    handle = {.reader = pReadHandle, .meta = pTq->pVnodeMeta};
      pTopic->buffer.output[j].pReadHandle = pReadHandle;
      pTopic->buffer.output[j].task = qCreateStreamExecTaskInfo(pTopic->qmsg, &handle);
    }
  }

  return 0;
}

S
Shengliang 已提交
211
int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) {
212 213
  SMqConsumeReq* pReq = pMsg->pCont;
  int64_t        consumerId = pReq->consumerId;
L
Liu Jicong 已提交
214
  int64_t        fetchOffset;
L
fix  
Liu Jicong 已提交
215
  int64_t        blockingTime = pReq->blockingTime;
216

L
Liu Jicong 已提交
217 218 219 220 221 222 223 224
  if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__EARLIEAST) {
    fetchOffset = 0;
  } else if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__LATEST) {
    fetchOffset = walGetLastVer(pTq->pWal);
  } else {
    fetchOffset = pReq->currentOffset + 1;
  }

L
Liu Jicong 已提交
225 226 227 228 229
  SMqConsumeRsp rsp = {
      .consumerId = consumerId,
      .numOfTopics = 0,
      .pBlockData = NULL,
  };
230 231 232 233 234 235 236 237 238 239 240 241 242 243 244

  STqConsumer* pConsumer = tqHandleGet(pTq->tqMeta, consumerId);
  if (pConsumer == NULL) {
    pMsg->pCont = NULL;
    pMsg->contLen = 0;
    pMsg->code = -1;
    rpcSendResponse(pMsg);
    return 0;
  }
  int sz = taosArrayGetSize(pConsumer->topics);
  ASSERT(sz == 1);
  STqTopic* pTopic = taosArrayGet(pConsumer->topics, 0);
  ASSERT(strcmp(pTopic->topicName, pReq->topic) == 0);
  ASSERT(pConsumer->consumerId == consumerId);

L
Liu Jicong 已提交
245
  rsp.reqOffset = pReq->currentOffset;
246 247 248 249 250 251
  rsp.skipLogNum = 0;

  SWalHead* pHead;
  while (1) {
    int8_t pos = fetchOffset % TQ_BUFFER_SIZE;
    if (walReadWithHandle(pTopic->pReadhandle, fetchOffset) < 0) {
L
Liu Jicong 已提交
252 253
      // TODO: no more log, set timer to wait blocking time
      // if data inserted during waiting, launch query and
L
Liu Jicong 已提交
254
      // response to user
255 256 257 258
      break;
    }
    pHead = pTopic->pReadhandle->pHead;
    if (pHead->head.msgType == TDMT_VND_SUBMIT) {
S
Shengliang Guan 已提交
259
      SSubmitReq* pCont = (SSubmitReq*)&pHead->head.body;
260 261 262 263 264 265 266 267 268 269 270
      qTaskInfo_t task = pTopic->buffer.output[pos].task;
      qSetStreamInput(task, pCont);
      SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock));
      while (1) {
        SSDataBlock* pDataBlock;
        uint64_t     ts;
        if (qExecTask(task, &pDataBlock, &ts) < 0) {
          ASSERT(false);
        }
        if (pDataBlock == NULL) {
          fetchOffset++;
L
Liu Jicong 已提交
271
          pos = fetchOffset % TQ_BUFFER_SIZE;
272 273 274 275 276 277 278 279 280 281 282
          rsp.skipLogNum++;
          break;
        }

        taosArrayPush(pRes, pDataBlock);
        rsp.schemas = pTopic->buffer.output[pos].pReadHandle->pSchemaWrapper;
        rsp.rspOffset = fetchOffset;

        rsp.numOfTopics = 1;
        rsp.pBlockData = pRes;

L
fix  
Liu Jicong 已提交
283
        int32_t tlen = sizeof(SMqRspHead) + tEncodeSMqConsumeRsp(NULL, &rsp);
284 285 286 287 288
        void*   buf = rpcMallocCont(tlen);
        if (buf == NULL) {
          pMsg->code = -1;
          return -1;
        }
L
fix  
Liu Jicong 已提交
289 290
        ((SMqRspHead*)buf)->mqMsgType = TMQ_MSG_TYPE__POLL_RSP;
        ((SMqRspHead*)buf)->epoch = pReq->epoch;
291

L
fix  
Liu Jicong 已提交
292
        void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
293 294 295 296 297 298 299 300 301 302 303 304 305 306
        tEncodeSMqConsumeRsp(&abuf, &rsp);
        taosArrayDestroyEx(rsp.pBlockData, (void (*)(void*))tDeleteSSDataBlock);
        pMsg->pCont = buf;
        pMsg->contLen = tlen;
        pMsg->code = 0;
        rpcSendResponse(pMsg);
        return 0;
      }
    } else {
      fetchOffset++;
      rsp.skipLogNum++;
    }
  }

L
Liu Jicong 已提交
307
  int32_t tlen = sizeof(SMqRspHead) + tEncodeSMqConsumeRsp(NULL, &rsp);
308 309 310 311 312
  void*   buf = rpcMallocCont(tlen);
  if (buf == NULL) {
    pMsg->code = -1;
    return -1;
  }
L
Liu Jicong 已提交
313 314
  ((SMqRspHead*)buf)->mqMsgType = TMQ_MSG_TYPE__POLL_RSP;
  ((SMqRspHead*)buf)->epoch = pReq->epoch;
315

L
Liu Jicong 已提交
316
  void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
317 318 319 320 321 322 323 324 325
  tEncodeSMqConsumeRsp(&abuf, &rsp);
  rsp.pBlockData = NULL;
  pMsg->pCont = buf;
  pMsg->contLen = tlen;
  pMsg->code = 0;
  rpcSendResponse(pMsg);
  return 0;
}

L
Liu Jicong 已提交
326 327 328 329
/**
 * Handle a rebalance request: re-key an existing consumer handle from
 * oldConsumerId to newConsumerId.
 *
 * The handle stored under the old id is looked up (asserted to exist), its
 * consumerId field is rewritten, it is put and committed under the new id, and
 * the old id is purged.
 *
 * @return always 0; terrno is set to TSDB_CODE_SUCCESS.
 */
int32_t tqProcessRebReq(STQ* pTq, char* msg) {
  SMqMVRebReq rebReq = {0};
  tDecodeSMqMVRebReq(msg, &rebReq);

  STqConsumer* pConsumer = tqHandleGet(pTq->tqMeta, rebReq.oldConsumerId);
  ASSERT(pConsumer);

  pConsumer->consumerId = rebReq.newConsumerId;
  tqHandleMovePut(pTq->tqMeta, rebReq.newConsumerId, pConsumer);
  tqHandleCommit(pTq->tqMeta, rebReq.newConsumerId);
  tqHandlePurge(pTq->tqMeta, rebReq.oldConsumerId);

  terrno = TSDB_CODE_SUCCESS;
  return 0;
}

L
Liu Jicong 已提交
340
/**
 * Handle a "set connection" request: create a consumer with a single topic
 * and register it in the meta store under req.consumerId.
 *
 * The topic takes over the sql, logicalPlan, physicalPlan and qmsg pointers
 * decoded from the request, gets a WAL read handle, and each of its
 * TQ_BUFFER_SIZE output slots gets its own submit-message scanner and a stream
 * task created from req.qmsg.
 *
 * @return 0 on success with terrno = TSDB_CODE_SUCCESS; -1 with
 *         terrno = TSDB_CODE_TQ_OUT_OF_MEMORY when an allocation fails.
 */
int32_t tqProcessSetConnReq(STQ* pTq, char* msg) {
  SMqSetCVgReq req = {0};
  tDecodeSMqSetCVgReq(msg, &req);

  /*printf("vg %d set to consumer from %ld to %ld\n", req.vgId, req.oldConsumerId, req.newConsumerId);*/
  STqConsumer* pConsumer = calloc(1, sizeof(STqConsumer));
  if (pConsumer == NULL) {
    terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
    return -1;
  }

  // NOTE(review): unbounded copy - assumes req.cgroup always fits in
  // pConsumer->cgroup; confirm against the struct definitions.
  strcpy(pConsumer->cgroup, req.cgroup);
  pConsumer->topics = taosArrayInit(0, sizeof(STqTopic));
  if (pConsumer->topics == NULL) {
    terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
    free(pConsumer);
    return -1;
  }
  pConsumer->consumerId = req.consumerId;
  pConsumer->epoch = 0;

  // The array stores topics by value, so a zeroed stack object is enough as
  // the staging copy; nothing has to be freed after the push.
  STqTopic  topic = {0};
  STqTopic* pTopic = &topic;

  // NOTE(review): unbounded copy - same assumption as for cgroup above.
  strcpy(pTopic->topicName, req.topicName);
  pTopic->sql = req.sql;
  pTopic->logicalPlan = req.logicalPlan;
  pTopic->physicalPlan = req.physicalPlan;
  pTopic->qmsg = req.qmsg;
  /*pTopic->committedOffset = -1;*/
  /*pTopic->currentOffset = -1;*/

  pTopic->buffer.firstOffset = -1;
  pTopic->buffer.lastOffset = -1;
  pTopic->pReadhandle = walOpenReadHandle(pTq->pWal);
  if (pTopic->pReadhandle == NULL) {
    ASSERT(false);
  }
  for (int i = 0; i < TQ_BUFFER_SIZE; i++) {
    pTopic->buffer.output[i].status = 0;
    // Each output slot owns its own scanner and stream task.
    STqReadHandle* pReadHandle = tqInitSubmitMsgScanner(pTq->pVnodeMeta);
    SReadHandle    handle = {
           .reader = pReadHandle,
           .meta = pTq->pVnodeMeta,
    };
    pTopic->buffer.output[i].pReadHandle = pReadHandle;
    pTopic->buffer.output[i].task = qCreateStreamExecTaskInfo(req.qmsg, &handle);
  }

  if (taosArrayPush(pConsumer->topics, pTopic) == NULL) {
    // NOTE(review): the WAL read handle, scanners and tasks created above are
    // not released on this path; their destructors are not visible from here.
    terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
    taosArrayDestroy(pConsumer->topics);
    free(pConsumer);
    return -1;
  }

  tqHandleMovePut(pTq->tqMeta, req.consumerId, pConsumer);
  tqHandleCommit(pTq->tqMeta, req.consumerId);
  terrno = TSDB_CODE_SUCCESS;
  return 0;
}