tq.c 12.5 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
S
Shengliang Guan 已提交
14
 */
L
Liu Jicong 已提交
15
#define _DEFAULT_SOURCE
S
Shengliang Guan 已提交
16

L
Liu Jicong 已提交
17
#include "tcompare.h"
L
Liu Jicong 已提交
18
#include "tqInt.h"
L
Liu Jicong 已提交
19
#include "tqMetaStore.h"
S
Shengliang Guan 已提交
20

L
Liu Jicong 已提交
21 22
/**
 * Initialize the process-wide TQ module state.
 *
 * The caller that flips tqMgmt.inited from 0 to 1 creates the "TQ" timer;
 * a call made while the flag is already 1 returns immediately.
 *
 * @return 0 on success (including the already-initialized case), -1 when the
 *         timer handle returned by taosTmrInit() is NULL.
 */
int tqInit() {
  int8_t old = atomic_val_compare_exchange_8(&tqMgmt.inited, 0, 1);
  if (old == 1) return 0;

  tqMgmt.timer = taosTmrInit(0, 0, 0, "TQ");
  if (tqMgmt.timer == NULL) {
    // Roll the flag back so a later tqInit() retries instead of reporting
    // success while no timer exists.
    atomic_store_8(&tqMgmt.inited, 0);
    return -1;
  }
  return 0;
}

/**
 * Tear down the process-wide TQ module state.
 *
 * Only the caller that observes tqMgmt.inited as 1 (and resets it to 0)
 * stops and cleans up the timer; every other call is a no-op.
 */
void tqCleanUp() {
  if (atomic_val_compare_exchange_8(&tqMgmt.inited, 1, 0) == 0) {
    // Flag was already 0: nothing to release.
    return;
  }

  taosTmrStop(tqMgmt.timer);
  taosTmrCleanUp(tqMgmt.timer);
}

L
Liu Jicong 已提交
36
/**
 * Allocate a TQ instance and open its consumer meta store.
 *
 * @param path      Directory/path handed to tqStoreOpen(); a private copy is
 *                  kept in the returned object.
 * @param pWal      WAL handle stored in the instance (not owned).
 * @param pMeta     Meta handle stored in the instance (not owned).
 * @param tqConfig  Configuration pointer stored in the instance (not owned).
 * @param allocFac  Allocator factory; currently only referenced by disabled
 *                  (#if 0) code.
 * @return The new instance, or NULL on failure. terrno is set to
 *         TSDB_CODE_TQ_OUT_OF_MEMORY when an allocation made here fails.
 */
STQ* tqOpen(const char* path, SWal* pWal, SMeta* pMeta, STqCfg* tqConfig, SMemAllocatorFactory* allocFac) {
  // calloc so that members not assigned below start out zeroed.
  STQ* pTq = calloc(1, sizeof(STQ));
  if (pTq == NULL) {
    terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
    return NULL;
  }
  pTq->path = strdup(path);
  if (pTq->path == NULL) {
    terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
    free(pTq);
    return NULL;
  }
  pTq->tqConfig = tqConfig;
  pTq->pWal = pWal;
  pTq->pMeta = pMeta;
#if 0
  pTq->tqMemRef.pAllocatorFactory = allocFac;
  pTq->tqMemRef.pAllocator = allocFac->create(allocFac);
  if (pTq->tqMemRef.pAllocator == NULL) {
    // TODO: error code of buffer pool
  }
#endif
  pTq->tqMeta =
      tqStoreOpen(pTq, path, (FTqSerialize)tqSerializeConsumer, (FTqDeserialize)tqDeserializeConsumer, free, 0);
  if (pTq->tqMeta == NULL) {
#if 0
    // Kept ahead of free(pTq): this reads pTq->tqMemRef.
    allocFac->destroy(allocFac, pTq->tqMemRef.pAllocator);
#endif
    // Release the duplicated path as well as the instance itself.
    free(pTq->path);
    free(pTq);
    return NULL;
  }

  return pTq;
}
L
Liu Jicong 已提交
65

L
Liu Jicong 已提交
66
/**
 * Release a TQ instance created by tqOpen().
 *
 * Frees the stored path (via tfree) and the instance itself. A NULL argument
 * is accepted and ignored.
 *
 * TODO: remaining teardown is not implemented yet.
 */
void tqClose(STQ* pTq) {
  if (pTq == NULL) {
    return;
  }

  tfree(pTq->path);
  free(pTq);
}
L
Liu Jicong 已提交
73

L
Liu Jicong 已提交
74 75 76
/**
 * Placeholder for pushing a message into the TQ.
 *
 * None of the arguments are used yet. Planned steps, per the original notes:
 *   - add reference
 *   - judge and launch new query
 *
 * @return Always 0.
 */
int tqPushMsg(STQ* pTq, void* p, int64_t version) {
  const int code = 0;
  return code;
}

L
Liu Jicong 已提交
80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135
/**
 * Persist the TQ meta store.
 *
 * @return Whatever tqStorePersist() returns for pTq->tqMeta.
 */
int tqCommit(STQ* pTq) {
  int code = tqStorePersist(pTq->tqMeta);
  return code;
}

/**
 * Compute a size figure for one topic handle.
 *
 * Adds the string lengths (without terminators) of topicName, sql,
 * logicalPlan, physicalPlan and qmsg to the size of three int64_t values.
 * All five strings must be valid, NUL-terminated pointers.
 *
 * NOTE(review): tEncodeSTqTopic() in this file writes only topicName, qmsg and
 * the three offsets, so this figure may not match the encoded length — confirm
 * which one callers expect.
 */
int32_t tqGetTopicHandleSize(const STqTopic* pTopic) {
  size_t total = sizeof(int64_t) * 3;

  total += strlen(pTopic->topicName);
  total += strlen(pTopic->sql);
  total += strlen(pTopic->logicalPlan);
  total += strlen(pTopic->physicalPlan);
  total += strlen(pTopic->qmsg);

  return total;
}

/**
 * Sum tqGetTopicHandleSize() over every topic held by the consumer.
 *
 * @return The accumulated size; 0 when the topic array is empty.
 */
int32_t tqGetConsumerHandleSize(const STqConsumer* pConsumer) {
  int32_t total = 0;
  int     count = taosArrayGetSize(pConsumer->topics);

  int idx = 0;
  while (idx < count) {
    STqTopic* pTopic = taosArrayGet(pConsumer->topics, idx);
    total += tqGetTopicHandleSize(pTopic);
    idx++;
  }

  return total;
}

/**
 * Encode one topic through the taosEncode* helpers.
 *
 * Field order: topicName, qmsg, persistedOffset, committedOffset,
 * currentOffset. The sql, logicalPlan and physicalPlan strings are not
 * written.
 *
 * @return Sum of the lengths reported by the individual encode calls.
 */
static FORCE_INLINE int32_t tEncodeSTqTopic(void** buf, const STqTopic* pTopic) {
  int32_t stringsLen = taosEncodeString(buf, pTopic->topicName);
  stringsLen += taosEncodeString(buf, pTopic->qmsg);

  int32_t offsetsLen = taosEncodeFixedI64(buf, pTopic->persistedOffset);
  offsetsLen += taosEncodeFixedI64(buf, pTopic->committedOffset);
  offsetsLen += taosEncodeFixedI64(buf, pTopic->currentOffset);

  return stringsLen + offsetsLen;
}

/**
 * Decode one topic, mirroring the field order of tEncodeSTqTopic():
 * topicName (into the existing buffer), qmsg, persistedOffset,
 * committedOffset, currentOffset. The sql, logicalPlan and physicalPlan
 * members are not touched.
 *
 * @return The cursor returned by the last decode call.
 */
static FORCE_INLINE const void* tDecodeSTqTopic(const void* buf, STqTopic* pTopic) {
  const void* cursor = taosDecodeStringTo(buf, pTopic->topicName);

  cursor = taosDecodeString(cursor, &pTopic->qmsg);

  cursor = taosDecodeFixedI64(cursor, &pTopic->persistedOffset);
  cursor = taosDecodeFixedI64(cursor, &pTopic->committedOffset);
  cursor = taosDecodeFixedI64(cursor, &pTopic->currentOffset);

  return cursor;
}

/**
 * Encode a consumer: consumerId, epoch, cgroup, the topic count as a fixed
 * 32-bit integer, then every topic via tEncodeSTqTopic().
 *
 * tqSerializeConsumer() calls this once with a NULL buf to obtain the length
 * and a second time with a real cursor to write the bytes.
 *
 * @return Sum of the lengths reported by the individual encode calls.
 */
static FORCE_INLINE int32_t tEncodeSTqConsumer(void** buf, const STqConsumer* pConsumer) {
  int32_t total = 0;

  total += taosEncodeFixedI64(buf, pConsumer->consumerId);
  total += taosEncodeFixedI64(buf, pConsumer->epoch);
  total += taosEncodeString(buf, pConsumer->cgroup);

  int32_t numTopics = taosArrayGetSize(pConsumer->topics);
  total += taosEncodeFixedI32(buf, numTopics);

  int32_t idx = 0;
  while (idx < numTopics) {
    STqTopic* pTopic = taosArrayGet(pConsumer->topics, idx);
    total += tEncodeSTqTopic(buf, pTopic);
    idx++;
  }

  return total;
}

L
Liu Jicong 已提交
138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157
/**
 * Decode a consumer written by tEncodeSTqConsumer(): consumerId, epoch,
 * cgroup, topic count, then each topic.
 *
 * A fresh topic array is created and stored in pConsumer->topics. Each
 * decoded topic is zero-initialized first, so members the decoder does not
 * fill (e.g. sql, logicalPlan, physicalPlan, the read handle) are NULL/0
 * rather than stack garbage.
 *
 * @return The cursor after the last decoded field, or NULL when the topic
 *         array cannot be created or a topic decode yields a NULL cursor.
 *         On a NULL return pConsumer->topics may hold a partially filled
 *         array; releasing it is left to the caller.
 */
static FORCE_INLINE const void* tDecodeSTqConsumer(const void* buf, STqConsumer* pConsumer) {
  int32_t sz;

  buf = taosDecodeFixedI64(buf, &pConsumer->consumerId);
  buf = taosDecodeFixedI64(buf, &pConsumer->epoch);
  buf = taosDecodeStringTo(buf, pConsumer->cgroup);
  buf = taosDecodeFixedI32(buf, &sz);
  pConsumer->topics = taosArrayInit(sz, sizeof(STqTopic));
  if (pConsumer->topics == NULL) return NULL;
  for (int32_t i = 0; i < sz; i++) {
    STqTopic topic = {0};
    buf = tDecodeSTqTopic(buf, &topic);
    // Stop instead of feeding a NULL cursor into the next decode call.
    if (buf == NULL) return NULL;
    taosArrayPush(pConsumer->topics, &topic);
  }
  return buf;
}

/**
 * Serialize a consumer into (*ppHead)->content.
 *
 * The encoded length is computed first; when it exceeds (*ppHead)->ssize the
 * head is reallocated to sizeof(STqSerializedHead) + length and ssize is
 * updated.
 *
 * @param pConsumer Consumer to encode.
 * @param ppHead    In/out pointer to the serialized head; *ppHead must be
 *                  non-NULL on entry and may be moved by realloc.
 * @return 0 on success. On reallocation failure the old buffer is freed,
 *         *ppHead is set to NULL (so the caller cannot reuse or double-free
 *         it), terrno is set to TSDB_CODE_TQ_OUT_OF_MEMORY and -1 is
 *         returned.
 */
int tqSerializeConsumer(const STqConsumer* pConsumer, STqSerializedHead** ppHead) {
  int32_t sz = tEncodeSTqConsumer(NULL, pConsumer);

  if (sz > (*ppHead)->ssize) {
    void* tmpPtr = realloc(*ppHead, sizeof(STqSerializedHead) + sz);
    if (tmpPtr == NULL) {
      free(*ppHead);
      *ppHead = NULL;
      terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
      return -1;
    }
    *ppHead = tmpPtr;
    (*ppHead)->ssize = sz;
  }

  // The encoder advances its cursor, so hand it a scratch copy of the start.
  void* abuf = (*ppHead)->content;
  tEncodeSTqConsumer(&abuf, pConsumer);

  return 0;
}

L
Liu Jicong 已提交
176 177 178 179 180 181 182 183 184 185 186 187 188
/**
 * Rebuild a consumer from its serialized form and re-create the runtime
 * state of each topic.
 *
 * After decoding, every topic gets a WAL read handle opened on pTq->pWal, and
 * each of its TQ_BUFFER_SIZE output slots gets status 0, a submit-message
 * scanner and a stream exec task built from the topic's qmsg.
 *
 * @param pTq         TQ instance supplying the WAL and meta handles.
 * @param pHead       Serialized head whose content holds the encoded consumer.
 * @param ppConsumer  Out: the newly allocated consumer, or NULL on failure.
 * @return 0 on success; -1 with terrno = TSDB_CODE_TQ_OUT_OF_MEMORY when the
 *         allocation or the decode fails.
 */
int32_t tqDeserializeConsumer(STQ* pTq, const STqSerializedHead* pHead, STqConsumer** ppConsumer) {
  const void*  str = pHead->content;
  STqConsumer* pConsumer = calloc(1, sizeof(STqConsumer));
  *ppConsumer = pConsumer;
  if (pConsumer == NULL) {
    terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
    return -1;
  }
  if (tDecodeSTqConsumer(str, pConsumer) == NULL) {
    // Do not hand a half-built consumer back to the caller.
    // NOTE(review): strings already decoded into topics are not released here.
    if (pConsumer->topics != NULL) {
      taosArrayDestroy(pConsumer->topics);
    }
    free(pConsumer);
    *ppConsumer = NULL;
    terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
    return -1;
  }

  int32_t sz = taosArrayGetSize(pConsumer->topics);
  for (int32_t i = 0; i < sz; i++) {
    STqTopic* pTopic = taosArrayGet(pConsumer->topics, i);
    pTopic->pReadhandle = walOpenReadHandle(pTq->pWal);
    if (pTopic->pReadhandle == NULL) {
      ASSERT(false);
    }
    // Distinct index name: the original reused `i` and shadowed the outer loop.
    for (int32_t j = 0; j < TQ_BUFFER_SIZE; j++) {
      pTopic->buffer.output[j].status = 0;
      STqReadHandle* pReadHandle = tqInitSubmitMsgScanner(pTq->pMeta);
      SReadHandle    handle = {.reader = pReadHandle, .meta = pTq->pMeta};
      pTopic->buffer.output[j].pReadHandle = pReadHandle;
      pTopic->buffer.output[j].task = qCreateStreamExecTaskInfo(pTopic->qmsg, &handle);
    }
  }

  return 0;
}

S
Shengliang 已提交
207
/**
 * Handle a consume request: look up the consumer, apply the commit part of
 * the request, read the WAL from the requested offset until a
 * TDMT_VND_SUBMIT entry is found, run the topic's stream task on it and send
 * the encoded result back through rpcSendResponse().
 *
 * @param pTq   TQ instance owning the consumer meta store.
 * @param pMsg  RPC message; pCont holds an SMqConsumeReq on entry and is
 *              replaced by the response content (or NULL) before replying.
 * @return 0 when a response was sent (including the "unknown consumer" reply,
 *         which carries code -1); -1 when a buffer could not be allocated, in
 *         which case pMsg->code is set to -1 and no response is sent.
 */
int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) {
  SMqConsumeReq* pReq = pMsg->pCont;
  int64_t        consumerId = pReq->consumerId;
  int64_t        fetchOffset = pReq->offset;

  SMqConsumeRsp rsp = {.consumerId = consumerId, .numOfTopics = 0, .pBlockData = NULL};

  /*printf("vg %d get consume req\n", pReq->head.vgId);*/

  STqConsumer* pConsumer = tqHandleGet(pTq->tqMeta, consumerId);
  if (pConsumer == NULL) {
    pMsg->pCont = NULL;
    pMsg->contLen = 0;
    pMsg->code = -1;
    rpcSendResponse(pMsg);
    return 0;
  }
  int sz = taosArrayGetSize(pConsumer->topics);

  for (int i = 0; i < sz; i++) {
    STqTopic* pTopic = taosArrayGet(pConsumer->topics, i);
    // TODO: support multiple topic in one req
    if (strcmp(pTopic->topicName, pReq->topic) != 0) {
      ASSERT(false);
      continue;
    }

    if (pReq->reqType == TMQ_REQ_TYPE_COMMIT_ONLY) {
      pTopic->committedOffset = pReq->offset;
      pMsg->pCont = NULL;
      pMsg->contLen = 0;
      pMsg->code = 0;
      rpcSendResponse(pMsg);
      return 0;
    }

    if (pReq->reqType == TMQ_REQ_TYPE_CONSUME_AND_COMMIT) {
      pTopic->committedOffset = pReq->offset - 1;
    }

    rsp.committedOffset = pTopic->committedOffset;
    rsp.reqOffset = pReq->offset;
    rsp.skipLogNum = 0;

    // Never re-deliver what has already been committed.
    if (fetchOffset <= pTopic->committedOffset) {
      fetchOffset = pTopic->committedOffset + 1;
    }
    /*printf("vg %d fetch Offset %ld\n", pReq->head.vgId, fetchOffset);*/
    // NOTE(review): a negative fetchOffset would give a negative slot index
    // below — confirm that request offsets are validated upstream.
    int8_t    pos;
    int8_t    skip = 0;
    SWalHead* pHead = NULL;
    while (1) {
      pos = fetchOffset % TQ_BUFFER_SIZE;
      // Claim the output slot for this offset (status 0 -> 1).
      skip = atomic_val_compare_exchange_8(&pTopic->buffer.output[pos].status, 0, 1);
      if (skip == 1) {
        // slot already claimed: do nothing
        break;
      }
      if (walReadWithHandle(pTopic->pReadhandle, fetchOffset) < 0) {
        // check err
        atomic_store_8(&pTopic->buffer.output[pos].status, 0);
        skip = 1;
        break;
      }
      // read until find TDMT_VND_SUBMIT
      pHead = pTopic->pReadhandle->pHead;
      if (pHead->head.msgType == TDMT_VND_SUBMIT) {
        break;
      }
      rsp.skipLogNum++;
      // NOTE(review): this repeats the read at the same offset as above;
      // kept as in the original — confirm whether it is intentional.
      if (walReadWithHandle(pTopic->pReadhandle, fetchOffset) < 0) {
        atomic_store_8(&pTopic->buffer.output[pos].status, 0);
        skip = 1;
        break;
      }
      // Not a submit entry: release the slot and try the next offset.
      atomic_store_8(&pTopic->buffer.output[pos].status, 0);
      fetchOffset++;
    }
    if (skip == 1) continue;
    SSubmitMsg* pCont = (SSubmitMsg*)&pHead->head.body;
    qTaskInfo_t task = pTopic->buffer.output[pos].task;

    qSetStreamInput(task, pCont);

    // SArray<SSDataBlock>
    SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock));
    if (pRes == NULL) {
      // Release the claimed slot and anything gathered so far, then fail the
      // same way as the response-buffer allocation failure below.
      atomic_store_8(&pTopic->buffer.output[pos].status, 0);
      if (rsp.pBlockData) {
        taosArrayDestroyEx(rsp.pBlockData, (void (*)(void*))tDeleteSSDataBlock);
        rsp.pBlockData = NULL;
      }
      terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
      pMsg->code = -1;
      return -1;
    }
    while (1) {
      SSDataBlock* pDataBlock;
      uint64_t     ts;
      if (qExecTask(task, &pDataBlock, &ts) < 0) {
        break;
      }
      if (pDataBlock != NULL) {
        taosArrayPush(pRes, pDataBlock);
      } else {
        break;
      }
    }
    // TODO copy
    rsp.schemas = pTopic->buffer.output[pos].pReadHandle->pSchemaWrapper;
    rsp.rspOffset = fetchOffset;

    atomic_store_8(&pTopic->buffer.output[pos].status, 0);

    if (taosArrayGetSize(pRes) == 0) {
      taosArrayDestroy(pRes);
      fetchOffset++;
      continue;
    } else {
      rsp.numOfTopics++;
    }

    rsp.pBlockData = pRes;

#if 0
    pTopic->buffer.output[pos].dst = pRes;
    if (pTopic->buffer.firstOffset == -1 || pReq->offset < pTopic->buffer.firstOffset) {
      pTopic->buffer.firstOffset = pReq->offset;
    }
    if (pTopic->buffer.lastOffset == -1 || pReq->offset > pTopic->buffer.lastOffset) {
      pTopic->buffer.lastOffset = pReq->offset;
    }
#endif
  }
  int32_t tlen = tEncodeSMqConsumeRsp(NULL, &rsp);
  void*   buf = rpcMallocCont(tlen);
  if (buf == NULL) {
    // Do not leak the collected data blocks on this failure path.
    if (rsp.pBlockData) {
      taosArrayDestroyEx(rsp.pBlockData, (void (*)(void*))tDeleteSSDataBlock);
      rsp.pBlockData = NULL;
    }
    pMsg->code = -1;
    return -1;
  }
  void* abuf = buf;
  tEncodeSMqConsumeRsp(&abuf, &rsp);

  if (rsp.pBlockData) {
    taosArrayDestroyEx(rsp.pBlockData, (void (*)(void*))tDeleteSSDataBlock);
    rsp.pBlockData = NULL;
  }

  pMsg->pCont = buf;
  pMsg->contLen = tlen;
  pMsg->code = 0;
  rpcSendResponse(pMsg);
  return 0;
}

L
Liu Jicong 已提交
354 355 356 357
/**
 * Handle a rebalance request: re-key an existing consumer handle from
 * oldConsumerId to newConsumerId.
 *
 * The handle found under the old id gets its consumerId rewritten, is put
 * under the new id, the new entry is committed and the old entry is purged.
 * The old id is asserted to exist.
 *
 * @return Always 0; terrno is set to TSDB_CODE_SUCCESS.
 */
int32_t tqProcessRebReq(STQ* pTq, char* msg) {
  SMqMVRebReq rebReq = {0};
  tDecodeSMqMVRebReq(msg, &rebReq);

  const int64_t fromId = rebReq.oldConsumerId;
  const int64_t toId = rebReq.newConsumerId;

  STqConsumer* pConsumer = tqHandleGet(pTq->tqMeta, fromId);
  ASSERT(pConsumer);

  pConsumer->consumerId = toId;

  // Publish under the new key first, then drop the old one.
  tqHandleMovePut(pTq->tqMeta, toId, pConsumer);
  tqHandleCommit(pTq->tqMeta, toId);
  tqHandlePurge(pTq->tqMeta, fromId);

  terrno = TSDB_CODE_SUCCESS;
  return 0;
}

L
Liu Jicong 已提交
368
/**
 * Handle a "set consumer/vgroup connection" request: build a consumer with a
 * single topic from the decoded request and register it in the meta store.
 *
 * The topic takes over the sql, logicalPlan, physicalPlan and qmsg pointers
 * from the decoded request, starts with committed/current/buffer offsets of
 * -1, gets a WAL read handle, and has each of its TQ_BUFFER_SIZE output slots
 * set up with status 0, a submit-message scanner and a stream exec task.
 *
 * @param pTq  TQ instance supplying the WAL, meta and meta store.
 * @param msg  Encoded SMqSetCVgReq.
 * @return 0 on success (terrno = TSDB_CODE_SUCCESS); -1 with
 *         terrno = TSDB_CODE_TQ_OUT_OF_MEMORY when an allocation fails.
 */
int32_t tqProcessSetConnReq(STQ* pTq, char* msg) {
  SMqSetCVgReq req = {0};
  tDecodeSMqSetCVgReq(msg, &req);

  /*printf("vg %d set to consumer from %ld to %ld\n", req.vgId, req.oldConsumerId, req.newConsumerId);*/
  STqConsumer* pConsumer = calloc(1, sizeof(STqConsumer));
  if (pConsumer == NULL) {
    terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
    return -1;
  }

  strcpy(pConsumer->cgroup, req.cgroup);
  pConsumer->topics = taosArrayInit(0, sizeof(STqTopic));
  if (pConsumer->topics == NULL) {
    free(pConsumer);
    terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
    return -1;
  }
  pConsumer->consumerId = req.consumerId;
  pConsumer->epoch = 0;

  // Scratch topic: taosArrayPush() below copies it into the array by value.
  STqTopic* pTopic = calloc(1, sizeof(STqTopic));
  if (pTopic == NULL) {
    taosArrayDestroy(pConsumer->topics);
    free(pConsumer);
    terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
    return -1;
  }
  strcpy(pTopic->topicName, req.topicName);
  pTopic->sql = req.sql;
  pTopic->logicalPlan = req.logicalPlan;
  pTopic->physicalPlan = req.physicalPlan;
  pTopic->qmsg = req.qmsg;
  pTopic->committedOffset = -1;
  pTopic->currentOffset = -1;

  pTopic->buffer.firstOffset = -1;
  pTopic->buffer.lastOffset = -1;
  pTopic->pReadhandle = walOpenReadHandle(pTq->pWal);
  if (pTopic->pReadhandle == NULL) {
    ASSERT(false);
  }
  for (int i = 0; i < TQ_BUFFER_SIZE; i++) {
    pTopic->buffer.output[i].status = 0;
    STqReadHandle* pReadHandle = tqInitSubmitMsgScanner(pTq->pMeta);
    SReadHandle    handle = {.reader = pReadHandle, .meta = pTq->pMeta};
    pTopic->buffer.output[i].pReadHandle = pReadHandle;
    pTopic->buffer.output[i].task = qCreateStreamExecTaskInfo(req.qmsg, &handle);
  }
  taosArrayPush(pConsumer->topics, pTopic);
  // The array now holds its own copy of the struct (the pointers inside are
  // shared), so the scratch allocation must be released here.
  free(pTopic);

  tqHandleMovePut(pTq->tqMeta, req.consumerId, pConsumer);
  tqHandleCommit(pTq->tqMeta, req.consumerId);
  terrno = TSDB_CODE_SUCCESS;
  return 0;
}