tq.c 10.0 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
S
Shengliang Guan 已提交
14
 */
L
Liu Jicong 已提交
15
#define _DEFAULT_SOURCE
S
Shengliang Guan 已提交
16

L
Liu Jicong 已提交
17
#include "tcompare.h"
L
Liu Jicong 已提交
18
#include "tqInt.h"
L
Liu Jicong 已提交
19
#include "tqMetaStore.h"
S
Shengliang Guan 已提交
20

L
Liu Jicong 已提交
21 22
int tqInit() {
  int8_t old = atomic_val_compare_exchange_8(&tqMgmt.inited, 0, 1);
L
Liu Jicong 已提交
23
  if (old == 1) return 0;
L
Liu Jicong 已提交
24 25 26 27 28 29 30

  tqMgmt.timer = taosTmrInit(0, 0, 0, "TQ");
  return 0;
}

void tqCleanUp() {
  int8_t old = atomic_val_compare_exchange_8(&tqMgmt.inited, 1, 0);
L
Liu Jicong 已提交
31
  if (old == 0) return;
L
Liu Jicong 已提交
32 33 34 35
  taosTmrStop(tqMgmt.timer);
  taosTmrCleanUp(tqMgmt.timer);
}

L
Liu Jicong 已提交
36
STQ* tqOpen(const char* path, SWal* pWal, SMeta* pMeta, STqCfg* tqConfig, SMemAllocatorFactory* allocFac) {
L
Liu Jicong 已提交
37
  STQ* pTq = malloc(sizeof(STQ));
L
Liu Jicong 已提交
38
  if (pTq == NULL) {
L
Liu Jicong 已提交
39
    terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
L
Liu Jicong 已提交
40 41
    return NULL;
  }
H
Hongze Cheng 已提交
42
  pTq->path = strdup(path);
L
Liu Jicong 已提交
43
  pTq->tqConfig = tqConfig;
L
Liu Jicong 已提交
44 45
  pTq->pWal = pWal;
  pTq->pMeta = pMeta;
L
Liu Jicong 已提交
46
#if 0
L
Liu Jicong 已提交
47
  pTq->tqMemRef.pAllocatorFactory = allocFac;
L
Liu Jicong 已提交
48 49
  pTq->tqMemRef.pAllocator = allocFac->create(allocFac);
  if (pTq->tqMemRef.pAllocator == NULL) {
L
Liu Jicong 已提交
50
    // TODO: error code of buffer pool
L
Liu Jicong 已提交
51
  }
L
Liu Jicong 已提交
52
#endif
L
Liu Jicong 已提交
53
  pTq->tqMeta = tqStoreOpen(path, (FTqSerialize)tqSerializeConsumer, (FTqDeserialize)tqDeserializeConsumer, free, 0);
L
Liu Jicong 已提交
54
  if (pTq->tqMeta == NULL) {
L
Liu Jicong 已提交
55
    free(pTq);
L
Liu Jicong 已提交
56 57 58
#if 0
    allocFac->destroy(allocFac, pTq->tqMemRef.pAllocator);
#endif
L
Liu Jicong 已提交
59 60
    return NULL;
  }
L
Liu Jicong 已提交
61

L
Liu Jicong 已提交
62 63
  return pTq;
}
L
Liu Jicong 已提交
64

L
Liu Jicong 已提交
65
void tqClose(STQ* pTq) {
H
Hongze Cheng 已提交
66
  if (pTq) {
H
Hongze Cheng 已提交
67
    tfree(pTq->path);
H
Hongze Cheng 已提交
68 69
    free(pTq);
  }
L
Liu Jicong 已提交
70 71
  // TODO
}
L
Liu Jicong 已提交
72

L
Liu Jicong 已提交
73 74 75
int tqPushMsg(STQ* pTq, void* p, int64_t version) {
  // add reference
  // judge and launch new query
L
Liu Jicong 已提交
76 77 78
  return 0;
}

L
Liu Jicong 已提交
79
int tqCommit(STQ* pTq) {
L
Liu Jicong 已提交
80
  // do nothing
L
Liu Jicong 已提交
81 82 83
  return 0;
}

L
Liu Jicong 已提交
84 85
int tqSerializeConsumer(const STqConsumerHandle* pConsumer, STqSerializedHead** ppHead) {
  int32_t num = taosArrayGetSize(pConsumer->topics);
L
Liu Jicong 已提交
86 87
  int32_t sz = sizeof(STqSerializedHead) + sizeof(int64_t) * 2 + TSDB_TOPIC_FNAME_LEN +
               num * (sizeof(int64_t) + TSDB_TOPIC_FNAME_LEN);
L
Liu Jicong 已提交
88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113
  if (sz > (*ppHead)->ssize) {
    void* tmpPtr = realloc(*ppHead, sz);
    if (tmpPtr == NULL) {
      free(*ppHead);
      return -1;
    }
    *ppHead = tmpPtr;
    (*ppHead)->ssize = sz;
  }

  void* ptr = (*ppHead)->content;
  *(int64_t*)ptr = pConsumer->consumerId;
  ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
  *(int64_t*)ptr = pConsumer->epoch;
  ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
  memcpy(ptr, pConsumer->topics, TSDB_TOPIC_FNAME_LEN);
  ptr = POINTER_SHIFT(ptr, TSDB_TOPIC_FNAME_LEN);
  *(int32_t*)ptr = num;
  ptr = POINTER_SHIFT(ptr, sizeof(int32_t));
  for (int32_t i = 0; i < num; i++) {
    STqTopicHandle* pTopic = taosArrayGet(pConsumer->topics, i);
    memcpy(ptr, pTopic->topicName, TSDB_TOPIC_FNAME_LEN);
    ptr = POINTER_SHIFT(ptr, TSDB_TOPIC_FNAME_LEN);
    *(int64_t*)ptr = pTopic->committedOffset;
    POINTER_SHIFT(ptr, sizeof(int64_t));
  }
L
Liu Jicong 已提交
114

L
Liu Jicong 已提交
115 116 117 118 119
  return 0;
}

const void* tqDeserializeConsumer(const STqSerializedHead* pHead, STqConsumerHandle** ppConsumer) {
  STqConsumerHandle* pConsumer = *ppConsumer;
L
Liu Jicong 已提交
120
  const void*        ptr = pHead->content;
L
Liu Jicong 已提交
121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136
  pConsumer->consumerId = *(int64_t*)ptr;
  ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
  pConsumer->epoch = *(int64_t*)ptr;
  ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
  memcpy(pConsumer->cgroup, ptr, TSDB_TOPIC_FNAME_LEN);
  ptr = POINTER_SHIFT(ptr, TSDB_TOPIC_FNAME_LEN);
  int32_t sz = *(int32_t*)ptr;
  ptr = POINTER_SHIFT(ptr, sizeof(int32_t));
  pConsumer->topics = taosArrayInit(sz, sizeof(STqTopicHandle));
  for (int32_t i = 0; i < sz; i++) {
    /*STqTopicHandle* topicHandle = */
    /*taosArrayPush(pConsumer->topics, );*/
  }
  return NULL;
}

S
Shengliang 已提交
137
int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) {
L
Liu Jicong 已提交
138 139 140
  SMqConsumeReq* pReq = pMsg->pCont;
  int64_t        reqId = pReq->reqId;
  int64_t        consumerId = pReq->consumerId;
L
Liu Jicong 已提交
141
  int64_t        fetchOffset = pReq->offset;
L
Liu Jicong 已提交
142
  int64_t        blockingTime = pReq->blockingTime;
L
Liu Jicong 已提交
143

L
Liu Jicong 已提交
144
  SMqConsumeRsp rsp = {.consumerId = consumerId, .numOfTopics = 0, .pBlockData = NULL};
L
Liu Jicong 已提交
145

146 147
  /*printf("vg %d get consume req\n", pReq->head.vgId);*/

L
Liu Jicong 已提交
148
  STqConsumerHandle* pConsumer = tqHandleGet(pTq->tqMeta, consumerId);
L
Liu Jicong 已提交
149 150 151 152 153 154 155
  if (pConsumer == NULL) {
    pMsg->pCont = NULL;
    pMsg->contLen = 0;
    pMsg->code = -1;
    rpcSendResponse(pMsg);
    return 0;
  }
L
Liu Jicong 已提交
156
  int sz = taosArrayGetSize(pConsumer->topics);
L
Liu Jicong 已提交
157

L
Liu Jicong 已提交
158 159
  for (int i = 0; i < sz; i++) {
    STqTopicHandle* pTopic = taosArrayGet(pConsumer->topics, i);
L
Liu Jicong 已提交
160
    // TODO: support multiple topic in one req
L
Liu Jicong 已提交
161
    if (strcmp(pTopic->topicName, pReq->topic) != 0) {
L
Liu Jicong 已提交
162
      /*ASSERT(false);*/
L
Liu Jicong 已提交
163 164
      continue;
    }
L
Liu Jicong 已提交
165 166 167 168 169 170 171 172 173 174 175

    if (pReq->reqType == TMQ_REQ_TYPE_COMMIT_ONLY) {
      pTopic->committedOffset = pReq->offset;
      pMsg->pCont = NULL;
      pMsg->contLen = 0;
      pMsg->code = 0;
      rpcSendResponse(pMsg);
      return 0;
    }

    if (pReq->reqType == TMQ_REQ_TYPE_CONSUME_AND_COMMIT) {
L
Liu Jicong 已提交
176
      pTopic->committedOffset = pReq->offset - 1;
L
Liu Jicong 已提交
177 178
    }

L
Liu Jicong 已提交
179 180 181
    rsp.committedOffset = pTopic->committedOffset;
    rsp.reqOffset = pReq->offset;
    rsp.skipLogNum = 0;
L
Liu Jicong 已提交
182

L
Liu Jicong 已提交
183
    if (fetchOffset <= pTopic->committedOffset) {
L
Liu Jicong 已提交
184 185
      fetchOffset = pTopic->committedOffset + 1;
    }
186
    /*printf("vg %d fetch Offset %ld\n", pReq->head.vgId, fetchOffset);*/
L
Liu Jicong 已提交
187 188
    int8_t    pos;
    int8_t    skip = 0;
L
Liu Jicong 已提交
189
    SWalHead* pHead;
L
Liu Jicong 已提交
190
    while (1) {
L
Liu Jicong 已提交
191 192 193 194 195 196 197 198 199 200 201 202
      pos = fetchOffset % TQ_BUFFER_SIZE;
      skip = atomic_val_compare_exchange_8(&pTopic->buffer.output[pos].status, 0, 1);
      if (skip == 1) {
        // do nothing
        break;
      }
      if (walReadWithHandle(pTopic->pReadhandle, fetchOffset) < 0) {
        // check err
        atomic_store_8(&pTopic->buffer.output[pos].status, 0);
        skip = 1;
        break;
      }
L
Liu Jicong 已提交
203
      // read until find TDMT_VND_SUBMIT
L
Liu Jicong 已提交
204 205 206 207
      pHead = pTopic->pReadhandle->pHead;
      if (pHead->head.msgType == TDMT_VND_SUBMIT) {
        break;
      }
L
Liu Jicong 已提交
208
      rsp.skipLogNum++;
L
Liu Jicong 已提交
209
      if (walReadWithHandle(pTopic->pReadhandle, fetchOffset) < 0) {
L
Liu Jicong 已提交
210 211 212
        atomic_store_8(&pTopic->buffer.output[pos].status, 0);
        skip = 1;
        break;
L
Liu Jicong 已提交
213
      }
L
Liu Jicong 已提交
214 215
      atomic_store_8(&pTopic->buffer.output[pos].status, 0);
      fetchOffset++;
L
Liu Jicong 已提交
216
    }
L
Liu Jicong 已提交
217
    if (skip == 1) continue;
L
Liu Jicong 已提交
218
    SSubmitMsg* pCont = (SSubmitMsg*)&pHead->head.body;
L
Liu Jicong 已提交
219
    qTaskInfo_t task = pTopic->buffer.output[pos].task;
L
Liu Jicong 已提交
220

L
Liu Jicong 已提交
221
    qSetStreamInput(task, pCont);
L
Liu Jicong 已提交
222

L
Liu Jicong 已提交
223
    // SArray<SSDataBlock>
L
Liu Jicong 已提交
224 225 226 227 228 229 230 231 232 233 234 235 236
    SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock));
    while (1) {
      SSDataBlock* pDataBlock;
      uint64_t     ts;
      if (qExecTask(task, &pDataBlock, &ts) < 0) {
        break;
      }
      if (pDataBlock != NULL) {
        taosArrayPush(pRes, pDataBlock);
      } else {
        break;
      }
    }
L
Liu Jicong 已提交
237
    // TODO copy
L
Liu Jicong 已提交
238
    rsp.schemas = pTopic->buffer.output[pos].pReadHandle->pSchemaWrapper;
L
Liu Jicong 已提交
239
    rsp.rspOffset = fetchOffset;
L
Liu Jicong 已提交
240 241 242 243 244 245 246

    atomic_store_8(&pTopic->buffer.output[pos].status, 0);

    if (taosArrayGetSize(pRes) == 0) {
      taosArrayDestroy(pRes);
      fetchOffset++;
      continue;
L
Liu Jicong 已提交
247 248
    } else {
      rsp.numOfTopics++;
249
    }
L
Liu Jicong 已提交
250

L
Liu Jicong 已提交
251 252 253
    rsp.pBlockData = pRes;

#if 0
L
Liu Jicong 已提交
254 255
    pTopic->buffer.output[pos].dst = pRes;
    if (pTopic->buffer.firstOffset == -1 || pReq->offset < pTopic->buffer.firstOffset) {
L
Liu Jicong 已提交
256
      pTopic->buffer.firstOffset = pReq->offset;
L
Liu Jicong 已提交
257
    }
L
Liu Jicong 已提交
258
    if (pTopic->buffer.lastOffset == -1 || pReq->offset > pTopic->buffer.lastOffset) {
L
Liu Jicong 已提交
259
      pTopic->buffer.lastOffset = pReq->offset;
L
Liu Jicong 已提交
260
    }
L
Liu Jicong 已提交
261
#endif
L
Liu Jicong 已提交
262
  }
L
Liu Jicong 已提交
263 264 265 266 267 268 269 270
  int32_t tlen = tEncodeSMqConsumeRsp(NULL, &rsp);
  void*   buf = rpcMallocCont(tlen);
  if (buf == NULL) {
    pMsg->code = -1;
    return -1;
  }
  void* abuf = buf;
  tEncodeSMqConsumeRsp(&abuf, &rsp);
L
Liu Jicong 已提交
271

L
Liu Jicong 已提交
272
  if (rsp.pBlockData) {
L
Liu Jicong 已提交
273
    taosArrayDestroyEx(rsp.pBlockData, (void (*)(void*))tDeleteSSDataBlock);
L
Liu Jicong 已提交
274
    rsp.pBlockData = NULL;
L
Liu Jicong 已提交
275
  }
L
Liu Jicong 已提交
276

L
Liu Jicong 已提交
277 278 279 280
  pMsg->pCont = buf;
  pMsg->contLen = tlen;
  pMsg->code = 0;
  rpcSendResponse(pMsg);
L
Liu Jicong 已提交
281 282 283
  return 0;
}

L
Liu Jicong 已提交
284 285 286 287 288 289 290 291 292 293 294 295 296
int32_t tqProcessRebReq(STQ* pTq, char* msg) {
  SMqMVRebReq req = {0};
  tDecodeSMqMVRebReq(msg, &req);

  STqConsumerHandle* pConsumer = tqHandleGet(pTq->tqMeta, req.oldConsumerId);
  ASSERT(pConsumer);
  tqHandleMovePut(pTq->tqMeta, req.newConsumerId, pConsumer);
  tqHandleCommit(pTq->tqMeta, req.newConsumerId);
  tqHandlePurge(pTq->tqMeta, req.oldConsumerId);
  terrno = TSDB_CODE_SUCCESS;
  return 0;
}

L
Liu Jicong 已提交
297
int32_t tqProcessSetConnReq(STQ* pTq, char* msg) {
L
Liu Jicong 已提交
298
  SMqSetCVgReq req = {0};
L
Liu Jicong 已提交
299
  tDecodeSMqSetCVgReq(msg, &req);
L
Liu Jicong 已提交
300

301
  /*printf("vg %d set to consumer from %ld to %ld\n", req.vgId, req.oldConsumerId, req.newConsumerId);*/
L
Liu Jicong 已提交
302
  STqConsumerHandle* pConsumer = calloc(1, sizeof(STqConsumerHandle));
L
Liu Jicong 已提交
303
  if (pConsumer == NULL) {
L
Liu Jicong 已提交
304 305
    terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
    return -1;
L
Liu Jicong 已提交
306
  }
L
Liu Jicong 已提交
307

L
Liu Jicong 已提交
308 309
  strcpy(pConsumer->cgroup, req.cgroup);
  pConsumer->topics = taosArrayInit(0, sizeof(STqTopicHandle));
L
Liu Jicong 已提交
310
  pConsumer->consumerId = req.consumerId;
L
Liu Jicong 已提交
311
  pConsumer->epoch = 0;
L
Liu Jicong 已提交
312

L
Liu Jicong 已提交
313
  STqTopicHandle* pTopic = calloc(1, sizeof(STqTopicHandle));
L
Liu Jicong 已提交
314 315 316 317
  if (pTopic == NULL) {
    free(pConsumer);
    return -1;
  }
L
Liu Jicong 已提交
318
  strcpy(pTopic->topicName, req.topicName);
L
Liu Jicong 已提交
319 320 321
  pTopic->sql = req.sql;
  pTopic->logicalPlan = req.logicalPlan;
  pTopic->physicalPlan = req.physicalPlan;
L
Liu Jicong 已提交
322
  pTopic->qmsg = req.qmsg;
L
Liu Jicong 已提交
323 324
  pTopic->committedOffset = -1;
  pTopic->currentOffset = -1;
325

L
Liu Jicong 已提交
326 327
  pTopic->buffer.firstOffset = -1;
  pTopic->buffer.lastOffset = -1;
L
Liu Jicong 已提交
328 329 330
  pTopic->pReadhandle = walOpenReadHandle(pTq->pWal);
  if (pTopic->pReadhandle == NULL) {
  }
L
Liu Jicong 已提交
331 332
  for (int i = 0; i < TQ_BUFFER_SIZE; i++) {
    pTopic->buffer.output[i].status = 0;
L
Liu Jicong 已提交
333
    STqReadHandle* pReadHandle = tqInitSubmitMsgScanner(pTq->pMeta);
L
Liu Jicong 已提交
334
    SReadHandle    handle = {.reader = pReadHandle, .meta = pTq->pMeta};
L
Liu Jicong 已提交
335
    pTopic->buffer.output[i].pReadHandle = pReadHandle;
L
Liu Jicong 已提交
336
    pTopic->buffer.output[i].task = qCreateStreamExecTaskInfo(req.qmsg, &handle);
L
Liu Jicong 已提交
337
  }
L
Liu Jicong 已提交
338
  taosArrayPush(pConsumer->topics, pTopic);
L
Liu Jicong 已提交
339 340
  tqHandleMovePut(pTq->tqMeta, req.consumerId, pConsumer);
  tqHandleCommit(pTq->tqMeta, req.consumerId);
L
Liu Jicong 已提交
341
  terrno = TSDB_CODE_SUCCESS;
L
Liu Jicong 已提交
342 343
  return 0;
}