tq.c 10.9 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
S
Shengliang Guan 已提交
14 15
 */

L
Liu Jicong 已提交
16
#include "tqInt.h"
L
Liu Jicong 已提交
17
#include "tqMetaStore.h"
S
Shengliang Guan 已提交
18

L
Liu Jicong 已提交
19 20 21 22 23 24
//static
//read next version data
//
//send to fetch queue
//
//handle management message
L
Liu Jicong 已提交
25
//
L
Liu Jicong 已提交
26 27

int tqGetgHandleSSize(const TqGroupHandle *gHandle);
L
Liu Jicong 已提交
28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
int tqBufHandleSSize();
int tqBufItemSSize();

TqGroupHandle* tqFindHandle(STQ* pTq, int64_t topicId, int64_t cgId, int64_t cId) {
  TqGroupHandle* gHandle;
  return NULL;
}

void* tqSerializeListHandle(TqListHandle* listHandle, void* ptr);
void* tqSerializeBufHandle(TqBufferHandle* bufHandle, void* ptr);
void* tqSerializeBufItem(TqBufferItem* bufItem, void* ptr);

const void* tqDeserializeBufHandle(const void* pBytes, TqBufferHandle* bufHandle);
const void* tqDeserializeBufItem(const void* pBytes, TqBufferItem* bufItem);

H
refact  
Hongze Cheng 已提交
43
STQ* tqOpen(const char* path, STqCfg* tqConfig, TqLogReader* tqLogReader, SMemAllocatorFactory *allocFac) {
L
Liu Jicong 已提交
44 45 46 47 48 49 50 51
  STQ* pTq = malloc(sizeof(STQ));
  if(pTq == NULL) {
    //TODO: memory error
    return NULL;
  }
  strcpy(pTq->path, path);
  pTq->tqConfig = tqConfig;
  pTq->tqLogReader = tqLogReader;
H
more  
Hongze Cheng 已提交
52 53
  // pTq->tqMemRef.pAlloctorFactory = allocFac;
  // pTq->tqMemRef.pAllocator = allocFac->create(allocFac);
L
Liu Jicong 已提交
54 55 56 57 58 59 60 61 62 63 64 65 66 67
  if(pTq->tqMemRef.pAllocator == NULL) {
    //TODO
  }
  pTq->tqMeta = tqStoreOpen(path,
                            (TqSerializeFun)tqSerializeGroupHandle,
                            (TqDeserializeFun)tqDeserializeGroupHandle,
                            free,
                            0);
  if(pTq->tqMeta == NULL) {
    //TODO: free STQ
    return NULL;
  }
  return pTq;
}
L
Liu Jicong 已提交
68

L
Liu Jicong 已提交
69
static int tqProtoCheck(TmqMsgHead *pMsg) {
L
Liu Jicong 已提交
70
  return pMsg->protoVer == 0;
L
Liu Jicong 已提交
71 72
}

L
Liu Jicong 已提交
73
static int tqAckOneTopic(TqBufferHandle *bHandle, TmqOneAck *pAck, TqQueryMsg** ppQuery) {
L
Liu Jicong 已提交
74 75 76
  //clean old item and move forward
  int32_t consumeOffset = pAck->consumeOffset;
  int idx = consumeOffset % TQ_BUFFER_SIZE;
L
Liu Jicong 已提交
77 78
  ASSERT(bHandle->buffer[idx].content && bHandle->buffer[idx].executor);
  tfree(bHandle->buffer[idx].content);
L
Liu Jicong 已提交
79
  if( 1 /* TODO: need to launch new query */) {
L
Liu Jicong 已提交
80
    TqQueryMsg* pNewQuery = malloc(sizeof(TqQueryMsg));
L
Liu Jicong 已提交
81 82 83 84 85
    if(pNewQuery == NULL) {
      //TODO: memory insufficient
      return -1;
    }
    //TODO: lock executor
L
Liu Jicong 已提交
86
    pNewQuery->exec->executor = bHandle->buffer[idx].executor;
L
Liu Jicong 已提交
87 88
    //TODO: read from wal and assign to src
    pNewQuery->exec->src = 0;
L
Liu Jicong 已提交
89
    pNewQuery->exec->dest = &bHandle->buffer[idx];
L
Liu Jicong 已提交
90 91 92
    pNewQuery->next = *ppQuery;
    *ppQuery = pNewQuery;
  }
L
Liu Jicong 已提交
93 94 95
  return 0;
}

L
Liu Jicong 已提交
96
static int tqAck(TqGroupHandle* gHandle, TmqAcks* pAcks) {
L
Liu Jicong 已提交
97
  int32_t ackNum = pAcks->ackNum;
L
Liu Jicong 已提交
98
  TmqOneAck *acks = pAcks->acks;
L
Liu Jicong 已提交
99 100
  //double ptr for acks and list
  int i = 0;
L
Liu Jicong 已提交
101
  TqListHandle* node = gHandle->head;
L
Liu Jicong 已提交
102
  int ackCnt = 0;
L
Liu Jicong 已提交
103
  TqQueryMsg *pQuery = NULL;
L
Liu Jicong 已提交
104
  while(i < ackNum && node->next) {
105
    if(acks[i].topicId == node->next->bufHandle.topicId) {
L
Liu Jicong 已提交
106
      ackCnt++;
107 108
      tqAckOneTopic(&node->next->bufHandle, &acks[i], &pQuery);
    } else if(acks[i].topicId < node->next->bufHandle.topicId) {
L
Liu Jicong 已提交
109 110 111
      i++;
    } else {
      node = node->next;
L
Liu Jicong 已提交
112 113
    }
  }
L
Liu Jicong 已提交
114 115 116 117 118
  if(pQuery) {
    //post message
  }
  return ackCnt;
}
L
Liu Jicong 已提交
119

L
Liu Jicong 已提交
120
static int tqCommitTCGroup(TqGroupHandle* handle) {
L
Liu Jicong 已提交
121
  //persist modification into disk
L
Liu Jicong 已提交
122 123 124
  return 0;
}

L
Liu Jicong 已提交
125
int tqCreateTCGroup(STQ *pTq, int64_t topicId, int64_t cgId, int64_t cId, TqGroupHandle** handle) {
L
Liu Jicong 已提交
126
  //create in disk
L
Liu Jicong 已提交
127 128 129 130 131 132 133
  TqGroupHandle* gHandle = (TqGroupHandle*)malloc(sizeof(TqGroupHandle));
  if(gHandle == NULL) {
    //TODO
    return -1;
  }
  memset(gHandle, 0, sizeof(TqGroupHandle));

L
Liu Jicong 已提交
134 135 136
  return 0;
}

L
Liu Jicong 已提交
137 138 139 140 141 142 143 144 145 146
TqGroupHandle* tqOpenTCGroup(STQ* pTq, int64_t topicId, int64_t cgId, int64_t cId) {
  TqGroupHandle* gHandle = tqHandleGet(pTq->tqMeta, cId);
  if(gHandle == NULL) {
    int code = tqCreateTCGroup(pTq, topicId, cgId, cId, &gHandle);
    if(code != 0) {
      //TODO
      return NULL;
    }
  }

L
Liu Jicong 已提交
147 148
  //create
  //open
L
Liu Jicong 已提交
149
  return gHandle;
L
Liu Jicong 已提交
150
}
L
Liu Jicong 已提交
151

L
Liu Jicong 已提交
152
int tqCloseTCGroup(STQ* pTq, int64_t topicId, int64_t cgId, int64_t cId) {
L
Liu Jicong 已提交
153 154 155
  return 0;
}

L
Liu Jicong 已提交
156 157
int tqDropTCGroup(STQ* pTq, int64_t topicId, int64_t cgId, int64_t cId) {
  //delete from disk
L
Liu Jicong 已提交
158 159 160
  return 0;
}

L
Liu Jicong 已提交
161 162
static int tqFetch(TqGroupHandle* gHandle, void** msg) {
  TqListHandle* head = gHandle->head;
L
Liu Jicong 已提交
163
  TqListHandle* node = head;
L
Liu Jicong 已提交
164 165 166
  int totSize = 0;
  //TODO: make it a macro
  int sizeLimit = 4 * 1024;
L
Liu Jicong 已提交
167
  TmqMsgContent* buffer = malloc(sizeLimit);
L
Liu Jicong 已提交
168 169 170 171 172 173 174 175
  if(buffer == NULL) {
    //TODO:memory insufficient
    return -1;
  }
  //iterate the list to get msgs of all topics
  //until all topic iterated or msgs over sizeLimit
  while(node->next) {
    node = node->next;
L
Liu Jicong 已提交
176
    TqBufferHandle* bufHandle = &node->bufHandle;
L
Liu Jicong 已提交
177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202
    int idx = bufHandle->nextConsumeOffset % TQ_BUFFER_SIZE;
    if(bufHandle->buffer[idx].content != NULL &&
        bufHandle->buffer[idx].offset == bufHandle->nextConsumeOffset
        ) {
      totSize += bufHandle->buffer[idx].size;
      if(totSize > sizeLimit) {
        void *ptr = realloc(buffer, totSize);
        if(ptr == NULL) {
          totSize -= bufHandle->buffer[idx].size;
          //TODO:memory insufficient
          //return msgs already copied
          break;
        }
      }
      *((int64_t*)buffer) = bufHandle->topicId;
      buffer = POINTER_SHIFT(buffer, sizeof(int64_t));
      *((int64_t*)buffer) = bufHandle->buffer[idx].size;
      buffer = POINTER_SHIFT(buffer, sizeof(int64_t));
      memcpy(buffer, bufHandle->buffer[idx].content, bufHandle->buffer[idx].size);
      buffer = POINTER_SHIFT(buffer, bufHandle->buffer[idx].size);
      if(totSize > sizeLimit) {
        break;
      }
    }
  }
  return totSize;
L
Liu Jicong 已提交
203 204
}

L
Liu Jicong 已提交
205

L
Liu Jicong 已提交
206
TqGroupHandle* tqGetGroupHandle(STQ* pTq, int64_t cId) {
L
Liu Jicong 已提交
207 208 209
  return NULL;
}

L
Liu Jicong 已提交
210
int tqLaunchQuery(TqGroupHandle* gHandle) {
L
Liu Jicong 已提交
211 212 213
  return 0;
}

L
Liu Jicong 已提交
214
int tqSendLaunchQuery(TqGroupHandle* gHandle) {
L
Liu Jicong 已提交
215 216 217
  return 0;
}

L
Liu Jicong 已提交
218
/*int tqMoveOffsetToNext(TqGroupHandle* gHandle) {*/
L
Liu Jicong 已提交
219 220 221
  /*return 0;*/
/*}*/

L
Liu Jicong 已提交
222
int tqPushMsg(STQ* pTq , void* p, int64_t version) {
L
Liu Jicong 已提交
223
  //add reference
L
Liu Jicong 已提交
224
  //judge and launch new query
L
Liu Jicong 已提交
225 226 227
  return 0;
}

L
Liu Jicong 已提交
228
int tqCommit(STQ* pTq) {
L
Liu Jicong 已提交
229 230 231 232
  //do nothing
  return 0;
}

L
Liu Jicong 已提交
233 234
int tqConsume(STQ* pTq, TmqConsumeReq* pMsg) {
  if(!tqProtoCheck((TmqMsgHead *)pMsg)) {
L
Liu Jicong 已提交
235 236 237 238
    //proto version invalid
    return -1;
  }
  int64_t clientId = pMsg->head.clientId;
L
Liu Jicong 已提交
239 240
  TqGroupHandle *gHandle = tqGetGroupHandle(pTq, clientId);
  if(gHandle == NULL) {
L
Liu Jicong 已提交
241 242 243 244
    //client not connect
    return -1;
  }
  if(pMsg->acks.ackNum != 0) {
L
Liu Jicong 已提交
245
    if(tqAck(gHandle, &pMsg->acks) != 0) {
L
Liu Jicong 已提交
246 247 248 249 250
      //ack not success
      return -1;
    }
  }

L
Liu Jicong 已提交
251
  TmqConsumeRsp *pRsp = (TmqConsumeRsp*) pMsg;
L
Liu Jicong 已提交
252

L
Liu Jicong 已提交
253
  if(tqFetch(gHandle, (void**)&pRsp->msgs) <= 0) {
L
Liu Jicong 已提交
254 255 256 257
    //fetch error
    return -1;
  }

L
Liu Jicong 已提交
258
  //judge and launch new query
L
Liu Jicong 已提交
259
  if(tqLaunchQuery(gHandle)) {
L
Liu Jicong 已提交
260 261 262 263 264 265
    //launch query error
    return -1;
  }
  return 0;
}

L
Liu Jicong 已提交
266
int tqSerializeGroupHandle(const TqGroupHandle *gHandle, TqSerializedHead** ppHead) {
L
Liu Jicong 已提交
267
  //calculate size
L
Liu Jicong 已提交
268 269 270 271 272 273 274 275 276 277
  int sz = tqGetgHandleSSize(gHandle) + sizeof(TqSerializedHead);
  if(sz > (*ppHead)->ssize) {
    void* tmpPtr = realloc(*ppHead, sz);
    if(tmpPtr == NULL) {
      free(*ppHead);
      //TODO: memory err
      return -1;
    }
    *ppHead = tmpPtr;
    (*ppHead)->ssize = sz;
L
Liu Jicong 已提交
278
  }
L
Liu Jicong 已提交
279
  void* ptr = (*ppHead)->content;
280
  //do serialization
L
Liu Jicong 已提交
281 282 283 284 285
  *(int64_t*)ptr = gHandle->cId;
  ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
  *(int64_t*)ptr = gHandle->cgId;
  ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
  *(int32_t*)ptr = gHandle->topicNum;
286
  ptr = POINTER_SHIFT(ptr, sizeof(int32_t));
L
Liu Jicong 已提交
287
  if(gHandle->topicNum > 0) {
288
    tqSerializeListHandle(gHandle->head, ptr);
L
Liu Jicong 已提交
289 290 291 292
  }
  return 0;
}

L
Liu Jicong 已提交
293 294
void* tqSerializeListHandle(TqListHandle *listHandle, void* ptr) {
  TqListHandle *node = listHandle;
295 296 297
  ASSERT(node != NULL);
  while(node) {
    ptr = tqSerializeBufHandle(&node->bufHandle, ptr);
L
Liu Jicong 已提交
298 299
    node = node->next;
  }
300
  return ptr;
L
Liu Jicong 已提交
301
}
302

L
Liu Jicong 已提交
303
void* tqSerializeBufHandle(TqBufferHandle *bufHandle, void* ptr) {
L
Liu Jicong 已提交
304 305 306 307 308 309 310 311 312
  *(int64_t*)ptr = bufHandle->nextConsumeOffset;
  ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
  *(int64_t*)ptr = bufHandle->topicId;
  ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
  *(int32_t*)ptr = bufHandle->head;
  ptr = POINTER_SHIFT(ptr, sizeof(int32_t));
  *(int32_t*)ptr = bufHandle->tail;
  ptr = POINTER_SHIFT(ptr, sizeof(int32_t));
  for(int i = 0; i < TQ_BUFFER_SIZE; i++) {
313
    ptr = tqSerializeBufItem(&bufHandle->buffer[i], ptr);
L
Liu Jicong 已提交
314
  }
315
  return ptr;
L
Liu Jicong 已提交
316 317
}

L
Liu Jicong 已提交
318
void* tqSerializeBufItem(TqBufferItem *bufItem, void* ptr) {
L
Liu Jicong 已提交
319
  //TODO: do we need serialize this? 
320 321
  //mainly for executor
  return ptr;
L
Liu Jicong 已提交
322 323
}

L
Liu Jicong 已提交
324 325 326
const void* tqDeserializeGroupHandle(const TqSerializedHead* pHead, TqGroupHandle **ppGHandle) {
  TqGroupHandle *gHandle = *ppGHandle;
  const void* ptr = pHead->content;
327 328 329 330 331 332 333 334
  gHandle->cId = *(int64_t*)ptr;
  ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
  gHandle->cgId = *(int64_t*)ptr;
  ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
  gHandle->ahandle = NULL;
  gHandle->topicNum = *(int32_t*)ptr;
  ptr = POINTER_SHIFT(ptr, sizeof(int32_t));
  gHandle->head = NULL;
L
Liu Jicong 已提交
335
  TqListHandle *node = gHandle->head;
336 337
  for(int i = 0; i < gHandle->topicNum; i++) {
    if(gHandle->head == NULL) {
L
Liu Jicong 已提交
338
      if((node = malloc(sizeof(TqListHandle))) == NULL) {
339 340 341 342 343 344 345
        //TODO: error
        return NULL;
      }
      node->next= NULL;
      ptr = tqDeserializeBufHandle(ptr, &node->bufHandle); 
      gHandle->head = node;
    } else {
L
Liu Jicong 已提交
346
      node->next = malloc(sizeof(TqListHandle));
347 348 349 350 351 352 353 354 355 356
      if(node->next == NULL) {
        //TODO: error
        return NULL;
      }
      node->next->next = NULL;
      ptr = tqDeserializeBufHandle(ptr, &node->next->bufHandle);
      node = node->next;
    }
  }
  return ptr;
L
Liu Jicong 已提交
357
}
358

L
Liu Jicong 已提交
359
const void* tqDeserializeBufHandle(const void* pBytes, TqBufferHandle *bufHandle) {
360 361 362 363 364 365 366 367 368 369 370 371 372
  const void* ptr = pBytes;
  bufHandle->nextConsumeOffset = *(int64_t*)ptr;
  ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
  bufHandle->topicId = *(int64_t*)ptr;
  ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
  bufHandle->head = *(int32_t*)ptr;
  ptr = POINTER_SHIFT(ptr, sizeof(int32_t));
  bufHandle->tail = *(int32_t*)ptr;
  ptr = POINTER_SHIFT(ptr, sizeof(int32_t));
  for(int i = 0; i < TQ_BUFFER_SIZE; i++) {
    ptr = tqDeserializeBufItem(ptr, &bufHandle->buffer[i]);
  }
  return ptr;
L
Liu Jicong 已提交
373 374
}

L
Liu Jicong 已提交
375
const void* tqDeserializeBufItem(const void* pBytes, TqBufferItem *bufItem) {
376 377
  return pBytes;
}
L
Liu Jicong 已提交
378

379
//TODO: make this a macro
L
Liu Jicong 已提交
380
int tqGetgHandleSSize(const TqGroupHandle *gHandle) {
L
Liu Jicong 已提交
381 382
  return sizeof(int64_t) * 2 //cId + cgId
    + sizeof(int32_t)        //topicNum
383
    + gHandle->topicNum * tqBufHandleSSize();
L
Liu Jicong 已提交
384
}
385 386 387

//TODO: make this a macro
int tqBufHandleSSize() {
L
Liu Jicong 已提交
388 389
  return sizeof(int64_t) * 2 // nextConsumeOffset + topicId
    + sizeof(int32_t) * 2    // head + tail
390
    + TQ_BUFFER_SIZE * tqBufItemSSize();
L
Liu Jicong 已提交
391
}
392 393 394 395

int tqBufItemSSize() {
  //TODO: do this need serialization?
  //mainly for executor
L
Liu Jicong 已提交
396 397
  return 0;
}