tq.c 11.0 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
S
Shengliang Guan 已提交
14 15
 */

L
Liu Jicong 已提交
16
#include "tqInt.h"
L
Liu Jicong 已提交
17
#include "tqMetaStore.h"
S
Shengliang Guan 已提交
18

L
Liu Jicong 已提交
19 20
// static
// read next version data
L
Liu Jicong 已提交
21
//
L
Liu Jicong 已提交
22
// send to fetch queue
L
Liu Jicong 已提交
23
//
L
Liu Jicong 已提交
24
// handle management message
L
Liu Jicong 已提交
25
//
L
Liu Jicong 已提交
26

L
Liu Jicong 已提交
27
int tqGetgHandleSSize(const STqGroupHandle* gHandle);
L
Liu Jicong 已提交
28 29 30
int tqBufHandleSSize();
int tqBufItemSSize();

L
Liu Jicong 已提交
31 32
STqGroupHandle* tqFindHandle(STQ* pTq, int64_t topicId, int64_t cgId, int64_t cId) {
  STqGroupHandle* gHandle;
L
Liu Jicong 已提交
33 34 35
  return NULL;
}

L
Liu Jicong 已提交
36 37 38
void* tqSerializeListHandle(STqListHandle* listHandle, void* ptr);
void* tqSerializeBufHandle(STqBufferHandle* bufHandle, void* ptr);
void* tqSerializeBufItem(STqBufferItem* bufItem, void* ptr);
L
Liu Jicong 已提交
39

L
Liu Jicong 已提交
40 41
const void* tqDeserializeBufHandle(const void* pBytes, STqBufferHandle* bufHandle);
const void* tqDeserializeBufItem(const void* pBytes, STqBufferItem* bufItem);
L
Liu Jicong 已提交
42

L
Liu Jicong 已提交
43
STQ* tqOpen(const char* path, STqCfg* tqConfig, STqLogReader* tqLogReader, SMemAllocatorFactory* allocFac) {
L
Liu Jicong 已提交
44
  STQ* pTq = malloc(sizeof(STQ));
L
Liu Jicong 已提交
45 46
  if (pTq == NULL) {
    // TODO: memory error
L
Liu Jicong 已提交
47 48
    return NULL;
  }
H
Hongze Cheng 已提交
49
  pTq->path = strdup(path);
L
Liu Jicong 已提交
50 51
  pTq->tqConfig = tqConfig;
  pTq->tqLogReader = tqLogReader;
L
Liu Jicong 已提交
52 53 54 55
  pTq->tqMemRef.pAlloctorFactory = allocFac;
  pTq->tqMemRef.pAllocator = allocFac->create(allocFac);
  if (pTq->tqMemRef.pAllocator == NULL) {
    // TODO
L
Liu Jicong 已提交
56
  }
L
Liu Jicong 已提交
57 58 59 60
  pTq->tqMeta =
      tqStoreOpen(path, (FTqSerialize)tqSerializeGroupHandle, (FTqDeserialize)tqDeserializeGroupHandle, free, 0);
  if (pTq->tqMeta == NULL) {
    // TODO: free STQ
L
Liu Jicong 已提交
61 62 63 64
    return NULL;
  }
  return pTq;
}
L
Liu Jicong 已提交
65

L
Liu Jicong 已提交
66
static int tqProtoCheck(TmqMsgHead* pMsg) { return pMsg->protoVer == 0; }
L
Liu Jicong 已提交
67

L
Liu Jicong 已提交
68 69
static int tqAckOneTopic(STqBufferHandle* bHandle, TmqOneAck* pAck, STqQueryMsg** ppQuery) {
  // clean old item and move forward
L
Liu Jicong 已提交
70
  int32_t consumeOffset = pAck->consumeOffset;
L
Liu Jicong 已提交
71
  int     idx = consumeOffset % TQ_BUFFER_SIZE;
L
Liu Jicong 已提交
72 73
  ASSERT(bHandle->buffer[idx].content && bHandle->buffer[idx].executor);
  tfree(bHandle->buffer[idx].content);
L
Liu Jicong 已提交
74 75 76 77
  if (1 /* TODO: need to launch new query */) {
    STqQueryMsg* pNewQuery = malloc(sizeof(STqQueryMsg));
    if (pNewQuery == NULL) {
      // TODO: memory insufficient
L
Liu Jicong 已提交
78 79
      return -1;
    }
L
Liu Jicong 已提交
80
    // TODO: lock executor
L
Liu Jicong 已提交
81
    pNewQuery->exec->executor = bHandle->buffer[idx].executor;
L
Liu Jicong 已提交
82
    // TODO: read from wal and assign to src
L
Liu Jicong 已提交
83
    pNewQuery->exec->src = 0;
L
Liu Jicong 已提交
84
    pNewQuery->exec->dest = &bHandle->buffer[idx];
L
Liu Jicong 已提交
85 86 87
    pNewQuery->next = *ppQuery;
    *ppQuery = pNewQuery;
  }
L
Liu Jicong 已提交
88 89 90
  return 0;
}

L
Liu Jicong 已提交
91 92 93 94 95 96 97 98 99 100
static int tqAck(STqGroupHandle* gHandle, TmqAcks* pAcks) {
  int32_t    ackNum = pAcks->ackNum;
  TmqOneAck* acks = pAcks->acks;
  // double ptr for acks and list
  int            i = 0;
  STqListHandle* node = gHandle->head;
  int            ackCnt = 0;
  STqQueryMsg*   pQuery = NULL;
  while (i < ackNum && node->next) {
    if (acks[i].topicId == node->next->bufHandle.topicId) {
L
Liu Jicong 已提交
101
      ackCnt++;
102
      tqAckOneTopic(&node->next->bufHandle, &acks[i], &pQuery);
L
Liu Jicong 已提交
103
    } else if (acks[i].topicId < node->next->bufHandle.topicId) {
L
Liu Jicong 已提交
104 105 106
      i++;
    } else {
      node = node->next;
L
Liu Jicong 已提交
107 108
    }
  }
L
Liu Jicong 已提交
109 110
  if (pQuery) {
    // post message
L
Liu Jicong 已提交
111 112 113
  }
  return ackCnt;
}
L
Liu Jicong 已提交
114

L
Liu Jicong 已提交
115 116
static int tqCommitTCGroup(STqGroupHandle* handle) {
  // persist modification into disk
L
Liu Jicong 已提交
117 118 119
  return 0;
}

L
Liu Jicong 已提交
120 121 122 123 124
int tqCreateTCGroup(STQ* pTq, int64_t topicId, int64_t cgId, int64_t cId, STqGroupHandle** handle) {
  // create in disk
  STqGroupHandle* gHandle = (STqGroupHandle*)malloc(sizeof(STqGroupHandle));
  if (gHandle == NULL) {
    // TODO
L
Liu Jicong 已提交
125 126
    return -1;
  }
L
Liu Jicong 已提交
127
  memset(gHandle, 0, sizeof(STqGroupHandle));
L
Liu Jicong 已提交
128

L
Liu Jicong 已提交
129 130 131
  return 0;
}

L
Liu Jicong 已提交
132 133 134
STqGroupHandle* tqOpenTCGroup(STQ* pTq, int64_t topicId, int64_t cgId, int64_t cId) {
  STqGroupHandle* gHandle = tqHandleGet(pTq->tqMeta, cId);
  if (gHandle == NULL) {
L
Liu Jicong 已提交
135
    int code = tqCreateTCGroup(pTq, topicId, cgId, cId, &gHandle);
L
Liu Jicong 已提交
136 137
    if (code != 0) {
      // TODO
L
Liu Jicong 已提交
138 139 140 141
      return NULL;
    }
  }

L
Liu Jicong 已提交
142 143
  // create
  // open
L
Liu Jicong 已提交
144
  return gHandle;
L
Liu Jicong 已提交
145
}
L
Liu Jicong 已提交
146

L
Liu Jicong 已提交
147
int tqCloseTCGroup(STQ* pTq, int64_t topicId, int64_t cgId, int64_t cId) { return 0; }
L
Liu Jicong 已提交
148

L
Liu Jicong 已提交
149
int tqDropTCGroup(STQ* pTq, int64_t topicId, int64_t cgId, int64_t cId) {
L
Liu Jicong 已提交
150
  // delete from disk
L
Liu Jicong 已提交
151 152 153
  return 0;
}

L
Liu Jicong 已提交
154 155 156 157 158 159
static int tqFetch(STqGroupHandle* gHandle, void** msg) {
  STqListHandle* head = gHandle->head;
  STqListHandle* node = head;
  int            totSize = 0;
  // TODO: make it a macro
  int            sizeLimit = 4 * 1024;
L
Liu Jicong 已提交
160
  TmqMsgContent* buffer = malloc(sizeLimit);
L
Liu Jicong 已提交
161 162
  if (buffer == NULL) {
    // TODO:memory insufficient
L
Liu Jicong 已提交
163 164
    return -1;
  }
L
Liu Jicong 已提交
165 166 167
  // iterate the list to get msgs of all topics
  // until all topic iterated or msgs over sizeLimit
  while (node->next) {
L
Liu Jicong 已提交
168
    node = node->next;
L
Liu Jicong 已提交
169 170 171
    STqBufferHandle* bufHandle = &node->bufHandle;
    int              idx = bufHandle->nextConsumeOffset % TQ_BUFFER_SIZE;
    if (bufHandle->buffer[idx].content != NULL && bufHandle->buffer[idx].offset == bufHandle->nextConsumeOffset) {
L
Liu Jicong 已提交
172
      totSize += bufHandle->buffer[idx].size;
L
Liu Jicong 已提交
173 174 175
      if (totSize > sizeLimit) {
        void* ptr = realloc(buffer, totSize);
        if (ptr == NULL) {
L
Liu Jicong 已提交
176
          totSize -= bufHandle->buffer[idx].size;
L
Liu Jicong 已提交
177 178
          // TODO:memory insufficient
          // return msgs already copied
L
Liu Jicong 已提交
179 180 181 182 183 184 185 186 187
          break;
        }
      }
      *((int64_t*)buffer) = bufHandle->topicId;
      buffer = POINTER_SHIFT(buffer, sizeof(int64_t));
      *((int64_t*)buffer) = bufHandle->buffer[idx].size;
      buffer = POINTER_SHIFT(buffer, sizeof(int64_t));
      memcpy(buffer, bufHandle->buffer[idx].content, bufHandle->buffer[idx].size);
      buffer = POINTER_SHIFT(buffer, bufHandle->buffer[idx].size);
L
Liu Jicong 已提交
188
      if (totSize > sizeLimit) {
L
Liu Jicong 已提交
189 190 191 192 193
        break;
      }
    }
  }
  return totSize;
L
Liu Jicong 已提交
194 195
}

L
Liu Jicong 已提交
196
STqGroupHandle* tqGetGroupHandle(STQ* pTq, int64_t cId) { return NULL; }
L
Liu Jicong 已提交
197

L
Liu Jicong 已提交
198
int tqLaunchQuery(STqGroupHandle* gHandle) { return 0; }
L
Liu Jicong 已提交
199

L
Liu Jicong 已提交
200
int tqSendLaunchQuery(STqGroupHandle* gHandle) { return 0; }
L
Liu Jicong 已提交
201

L
Liu Jicong 已提交
202
/*int tqMoveOffsetToNext(TqGroupHandle* gHandle) {*/
L
Liu Jicong 已提交
203
/*return 0;*/
L
Liu Jicong 已提交
204 205
/*}*/

L
Liu Jicong 已提交
206 207 208
int tqPushMsg(STQ* pTq, void* p, int64_t version) {
  // add reference
  // judge and launch new query
L
Liu Jicong 已提交
209 210 211
  return 0;
}

L
Liu Jicong 已提交
212
int tqCommit(STQ* pTq) {
L
Liu Jicong 已提交
213
  // do nothing
L
Liu Jicong 已提交
214 215 216
  return 0;
}

L
Liu Jicong 已提交
217
int tqConsume(STQ* pTq, TmqConsumeReq* pMsg) {
L
Liu Jicong 已提交
218 219
  if (!tqProtoCheck((TmqMsgHead*)pMsg)) {
    // proto version invalid
L
Liu Jicong 已提交
220 221
    return -1;
  }
L
Liu Jicong 已提交
222 223 224 225
  int64_t         clientId = pMsg->head.clientId;
  STqGroupHandle* gHandle = tqGetGroupHandle(pTq, clientId);
  if (gHandle == NULL) {
    // client not connect
L
Liu Jicong 已提交
226 227
    return -1;
  }
L
Liu Jicong 已提交
228 229 230
  if (pMsg->acks.ackNum != 0) {
    if (tqAck(gHandle, &pMsg->acks) != 0) {
      // ack not success
L
Liu Jicong 已提交
231 232 233 234
      return -1;
    }
  }

L
Liu Jicong 已提交
235
  TmqConsumeRsp* pRsp = (TmqConsumeRsp*)pMsg;
L
Liu Jicong 已提交
236

L
Liu Jicong 已提交
237 238
  if (tqFetch(gHandle, (void**)&pRsp->msgs) <= 0) {
    // fetch error
L
Liu Jicong 已提交
239 240 241
    return -1;
  }

L
Liu Jicong 已提交
242 243 244
  // judge and launch new query
  if (tqLaunchQuery(gHandle)) {
    // launch query error
L
Liu Jicong 已提交
245 246 247 248 249
    return -1;
  }
  return 0;
}

L
Liu Jicong 已提交
250 251 252 253
int tqSerializeGroupHandle(const STqGroupHandle* gHandle, STqSerializedHead** ppHead) {
  // calculate size
  int sz = tqGetgHandleSSize(gHandle) + sizeof(STqSerializedHead);
  if (sz > (*ppHead)->ssize) {
L
Liu Jicong 已提交
254
    void* tmpPtr = realloc(*ppHead, sz);
L
Liu Jicong 已提交
255
    if (tmpPtr == NULL) {
L
Liu Jicong 已提交
256
      free(*ppHead);
L
Liu Jicong 已提交
257
      // TODO: memory err
L
Liu Jicong 已提交
258 259 260 261
      return -1;
    }
    *ppHead = tmpPtr;
    (*ppHead)->ssize = sz;
L
Liu Jicong 已提交
262
  }
L
Liu Jicong 已提交
263
  void* ptr = (*ppHead)->content;
L
Liu Jicong 已提交
264
  // do serialization
L
Liu Jicong 已提交
265 266 267 268 269
  *(int64_t*)ptr = gHandle->cId;
  ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
  *(int64_t*)ptr = gHandle->cgId;
  ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
  *(int32_t*)ptr = gHandle->topicNum;
270
  ptr = POINTER_SHIFT(ptr, sizeof(int32_t));
L
Liu Jicong 已提交
271
  if (gHandle->topicNum > 0) {
272
    tqSerializeListHandle(gHandle->head, ptr);
L
Liu Jicong 已提交
273 274 275 276
  }
  return 0;
}

L
Liu Jicong 已提交
277 278
void* tqSerializeListHandle(STqListHandle* listHandle, void* ptr) {
  STqListHandle* node = listHandle;
279
  ASSERT(node != NULL);
L
Liu Jicong 已提交
280
  while (node) {
281
    ptr = tqSerializeBufHandle(&node->bufHandle, ptr);
L
Liu Jicong 已提交
282 283
    node = node->next;
  }
284
  return ptr;
L
Liu Jicong 已提交
285
}
286

L
Liu Jicong 已提交
287
void* tqSerializeBufHandle(STqBufferHandle* bufHandle, void* ptr) {
L
Liu Jicong 已提交
288 289 290 291 292 293 294 295
  *(int64_t*)ptr = bufHandle->nextConsumeOffset;
  ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
  *(int64_t*)ptr = bufHandle->topicId;
  ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
  *(int32_t*)ptr = bufHandle->head;
  ptr = POINTER_SHIFT(ptr, sizeof(int32_t));
  *(int32_t*)ptr = bufHandle->tail;
  ptr = POINTER_SHIFT(ptr, sizeof(int32_t));
L
Liu Jicong 已提交
296
  for (int i = 0; i < TQ_BUFFER_SIZE; i++) {
297
    ptr = tqSerializeBufItem(&bufHandle->buffer[i], ptr);
L
Liu Jicong 已提交
298
  }
299
  return ptr;
L
Liu Jicong 已提交
300 301
}

L
Liu Jicong 已提交
302 303 304
void* tqSerializeBufItem(STqBufferItem* bufItem, void* ptr) {
  // TODO: do we need serialize this?
  // mainly for executor
305
  return ptr;
L
Liu Jicong 已提交
306 307
}

L
Liu Jicong 已提交
308 309 310
const void* tqDeserializeGroupHandle(const STqSerializedHead* pHead, STqGroupHandle** ppGHandle) {
  STqGroupHandle* gHandle = *ppGHandle;
  const void*     ptr = pHead->content;
311 312 313 314 315 316 317 318
  gHandle->cId = *(int64_t*)ptr;
  ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
  gHandle->cgId = *(int64_t*)ptr;
  ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
  gHandle->ahandle = NULL;
  gHandle->topicNum = *(int32_t*)ptr;
  ptr = POINTER_SHIFT(ptr, sizeof(int32_t));
  gHandle->head = NULL;
L
Liu Jicong 已提交
319 320 321 322 323
  STqListHandle* node = gHandle->head;
  for (int i = 0; i < gHandle->topicNum; i++) {
    if (gHandle->head == NULL) {
      if ((node = malloc(sizeof(STqListHandle))) == NULL) {
        // TODO: error
324 325
        return NULL;
      }
L
Liu Jicong 已提交
326 327
      node->next = NULL;
      ptr = tqDeserializeBufHandle(ptr, &node->bufHandle);
328 329
      gHandle->head = node;
    } else {
L
Liu Jicong 已提交
330 331 332
      node->next = malloc(sizeof(STqListHandle));
      if (node->next == NULL) {
        // TODO: error
333 334 335 336 337 338 339 340
        return NULL;
      }
      node->next->next = NULL;
      ptr = tqDeserializeBufHandle(ptr, &node->next->bufHandle);
      node = node->next;
    }
  }
  return ptr;
L
Liu Jicong 已提交
341
}
342

L
Liu Jicong 已提交
343
const void* tqDeserializeBufHandle(const void* pBytes, STqBufferHandle* bufHandle) {
344 345 346 347 348 349 350 351 352
  const void* ptr = pBytes;
  bufHandle->nextConsumeOffset = *(int64_t*)ptr;
  ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
  bufHandle->topicId = *(int64_t*)ptr;
  ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
  bufHandle->head = *(int32_t*)ptr;
  ptr = POINTER_SHIFT(ptr, sizeof(int32_t));
  bufHandle->tail = *(int32_t*)ptr;
  ptr = POINTER_SHIFT(ptr, sizeof(int32_t));
L
Liu Jicong 已提交
353
  for (int i = 0; i < TQ_BUFFER_SIZE; i++) {
354 355 356
    ptr = tqDeserializeBufItem(ptr, &bufHandle->buffer[i]);
  }
  return ptr;
L
Liu Jicong 已提交
357 358
}

L
Liu Jicong 已提交
359
const void* tqDeserializeBufItem(const void* pBytes, STqBufferItem* bufItem) { return pBytes; }
L
Liu Jicong 已提交
360

L
Liu Jicong 已提交
361 362 363 364 365
// TODO: make this a macro
int tqGetgHandleSSize(const STqGroupHandle* gHandle) {
  return sizeof(int64_t) * 2  // cId + cgId
         + sizeof(int32_t)    // topicNum
         + gHandle->topicNum * tqBufHandleSSize();
L
Liu Jicong 已提交
366
}
367

L
Liu Jicong 已提交
368
// TODO: make this a macro
369
int tqBufHandleSSize() {
L
Liu Jicong 已提交
370 371 372
  return sizeof(int64_t) * 2    // nextConsumeOffset + topicId
         + sizeof(int32_t) * 2  // head + tail
         + TQ_BUFFER_SIZE * tqBufItemSSize();
L
Liu Jicong 已提交
373
}
374 375

int tqBufItemSSize() {
L
Liu Jicong 已提交
376 377
  // TODO: do this need serialization?
  // mainly for executor
L
Liu Jicong 已提交
378 379
  return 0;
}