/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

L
Liu Jicong 已提交
16
#include "tqInt.h"
L
Liu Jicong 已提交
17
#include "tqMetaStore.h"
S
Shengliang Guan 已提交
18

L
Liu Jicong 已提交
19 20 21 22 23 24
// static
// read next version data
//
// send to fetch queue
//
// handle management message
//
L
Liu Jicong 已提交
26 27

// Forward declaration: serialized size of a group handle (defined later in this file).
int tqGetgHandleSSize(const TqGroupHandle *gHandle);
L
Liu Jicong 已提交
28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
// Forward declarations: serialized size of one buffer handle and of one
// buffer item (both defined later in this file).
int tqBufHandleSSize();
int tqBufItemSSize();

// Look up the group handle for (topicId, cgId, cId).
// The lookup is not implemented yet: this always returns NULL.
TqGroupHandle* tqFindHandle(STQ* pTq, int64_t topicId, int64_t cgId, int64_t cId) {
  (void)pTq;
  (void)topicId;
  (void)cgId;
  (void)cId;
  return NULL;
}

// Forward declarations of the serialization helpers defined later in this
// file. Each receives the current write position and returns the position
// to continue writing from.
void* tqSerializeListHandle(TqListHandle* listHandle, void* ptr);
void* tqSerializeBufHandle(TqBufferHandle* bufHandle, void* ptr);
void* tqSerializeBufItem(TqBufferItem* bufItem, void* ptr);

// Forward declarations of the matching deserialization helpers. Each receives
// the current read position and returns the position to continue reading from.
const void* tqDeserializeBufHandle(const void* pBytes, TqBufferHandle* bufHandle);
const void* tqDeserializeBufItem(const void* pBytes, TqBufferItem* bufItem);

H
refact  
Hongze Cheng 已提交
43
// Create a TQ instance and open its meta store at `path`.
//
// path:        directory of the meta store; a private copy is kept in the STQ.
// tqConfig:    configuration, stored by pointer (not copied).
// tqLogReader: log reader, stored by pointer (not copied).
// allocFac:    memory allocator factory, stored by pointer.
//
// Returns the new STQ, or NULL if memory is exhausted or the meta store cannot
// be opened. Nothing allocated here is left behind on failure.
STQ* tqOpen(const char* path, STqCfg* tqConfig, TqLogReader* tqLogReader, SMemAllocatorFactory *allocFac) {
  STQ* pTq = malloc(sizeof(*pTq));
  if (pTq == NULL) {
    //TODO: memory error
    return NULL;
  }
  // Zero the whole struct so that fields not assigned below (notably
  // tqMemRef.pAllocator, which is read further down) have a defined value.
  memset(pTq, 0, sizeof(*pTq));

  char* pathCopy = strdup(path);
  if (pathCopy == NULL) {
    free(pTq);
    return NULL;
  }
  pTq->path = pathCopy;
  pTq->tqConfig = tqConfig;
  pTq->tqLogReader = tqLogReader;
  pTq->tqMemRef.pAlloctorFactory = allocFac;
  //  pTq->tqMemRef.pAllocator = allocFac->create(allocFac);
  if (pTq->tqMemRef.pAllocator == NULL) {
    //TODO
  }
  pTq->tqMeta = tqStoreOpen(path,
                            (TqSerializeFun)tqSerializeGroupHandle,
                            (TqDeserializeFun)tqDeserializeGroupHandle,
                            free,
                            0);
  if (pTq->tqMeta == NULL) {
    // Release everything acquired so far instead of leaking it.
    free(pathCopy);
    free(pTq);
    return NULL;
  }
  return pTq;
}
L
Liu Jicong 已提交
68

L
Liu Jicong 已提交
69
// Return non-zero when the message header carries protocol version 0,
// the only version accepted here; return 0 otherwise.
static int tqProtoCheck(TmqMsgHead *pMsg) {
  int isVersionZero = (pMsg->protoVer == 0);
  return isVersionZero;
}

L
Liu Jicong 已提交
73
// Acknowledge one consumed item of a topic buffer and queue a follow-up query
// for the slot that was just released.
//
// bHandle: buffer handle of the topic being acknowledged.
// pAck:    the ack; its consumeOffset selects the ring-buffer slot.
// ppQuery: in/out head of a singly linked list of queries; the new query is
//          pushed at the front. The query and its exec record are heap
//          allocated here and handed over with the list.
//
// Returns 0 on success, -1 on a negative offset or when memory is exhausted.
static int tqAckOneTopic(TqBufferHandle *bHandle, TmqOneAck *pAck, TqQueryMsg** ppQuery) {
  //clean old item and move forward
  // Keep the full 64-bit width (tqFetch indexes with the 64-bit offset too)
  // and reject negative offsets, which would index before the buffer.
  int64_t consumeOffset = pAck->consumeOffset;
  if (consumeOffset < 0) {
    return -1;
  }
  int idx = (int)(consumeOffset % TQ_BUFFER_SIZE);
  ASSERT(bHandle->buffer[idx].content && bHandle->buffer[idx].executor);
  tfree(bHandle->buffer[idx].content);
  if (1 /* TODO: need to launch new query */) {
    TqQueryMsg* pNewQuery = malloc(sizeof(*pNewQuery));
    if (pNewQuery == NULL) {
      //TODO: memory insufficient
      return -1;
    }
    // exec is a pointer member: it needs its own storage before it is filled in.
    pNewQuery->exec = malloc(sizeof(*pNewQuery->exec));
    if (pNewQuery->exec == NULL) {
      free(pNewQuery);
      return -1;
    }
    //TODO: lock executor
    pNewQuery->exec->executor = bHandle->buffer[idx].executor;
    //TODO: read from wal and assign to src
    pNewQuery->exec->src = 0;
    pNewQuery->exec->dest = &bHandle->buffer[idx];
    pNewQuery->next = *ppQuery;
    *ppQuery = pNewQuery;
  }
  return 0;
}

L
Liu Jicong 已提交
96
// Apply a batch of acks to the topics of a consumer group.
//
// The ack array and the topic list are walked together, comparing topic ids:
// an ack with a smaller id than the current topic is skipped, a topic with a
// smaller id than the current ack is stepped over, and equal ids are
// acknowledged. This only pairs everything up if both sequences are in
// ascending topicId order (NOTE(review): confirm that callers guarantee it).
// The first list node (gHandle->head) is never compared; topics are taken
// from head->next onwards.
//
// Returns the number of acks that were applied successfully.
static int tqAck(TqGroupHandle* gHandle, TmqAcks* pAcks) {
  int32_t ackNum = pAcks->ackNum;
  TmqOneAck *acks = pAcks->acks;
  //double ptr for acks and list
  int i = 0;
  TqListHandle* node = gHandle->head;
  int ackCnt = 0;
  TqQueryMsg *pQuery = NULL;
  while (i < ackNum && node != NULL && node->next) {
    if (acks[i].topicId == node->next->bufHandle.topicId) {
      if (tqAckOneTopic(&node->next->bufHandle, &acks[i], &pQuery) == 0) {
        ackCnt++;
      }
      // Both cursors must move on after a match; otherwise the same pair is
      // compared forever and the loop never terminates.
      i++;
      node = node->next;
    } else if (acks[i].topicId < node->next->bufHandle.topicId) {
      i++;
    } else {
      node = node->next;
    }
  }
  if (pQuery) {
    //post message
    // TODO: the queued queries are not posted or released yet.
  }
  return ackCnt;
}
L
Liu Jicong 已提交
119

L
Liu Jicong 已提交
120
// Persist the modifications of a group handle to disk.
// Not implemented yet: does nothing and reports success (0).
static int tqCommitTCGroup(TqGroupHandle* handle) {
  (void)handle;  // unused until persistence is implemented
  return 0;
}

L
Liu Jicong 已提交
125
// Create a new, zero-initialised group handle for a consumer.
//
// pTq, topicId: currently unused (creation on disk is still a TODO).
// cgId, cId:    stored in the new handle.
// handle:       out; receives the new handle, which the caller owns.
//
// Returns 0 on success, -1 if `handle` is NULL or memory is exhausted.
int tqCreateTCGroup(STQ *pTq, int64_t topicId, int64_t cgId, int64_t cId, TqGroupHandle** handle) {
  if (handle == NULL) {
    return -1;
  }
  //create in disk
  TqGroupHandle* gHandle = malloc(sizeof(*gHandle));
  if (gHandle == NULL) {
    //TODO
    return -1;
  }
  memset(gHandle, 0, sizeof(*gHandle));
  gHandle->cId = cId;
  gHandle->cgId = cgId;

  // Hand the handle to the caller; without this it is leaked and the
  // caller's pointer is left unchanged.
  *handle = gHandle;
  return 0;
}

L
Liu Jicong 已提交
137 138 139 140 141 142 143 144 145 146
// Return the group handle stored in the meta store under cId, creating one
// through tqCreateTCGroup when none is found.
// Returns NULL when creation reports an error.
TqGroupHandle* tqOpenTCGroup(STQ* pTq, int64_t topicId, int64_t cgId, int64_t cId) {
  TqGroupHandle* gHandle = tqHandleGet(pTq->tqMeta, cId);
  if (gHandle != NULL) {
    return gHandle;
  }

  //create
  //open
  if (tqCreateTCGroup(pTq, topicId, cgId, cId, &gHandle) != 0) {
    //TODO
    return NULL;
  }
  return gHandle;
}
L
Liu Jicong 已提交
151

L
Liu Jicong 已提交
152
// Close the group handle identified by (topicId, cgId, cId).
// Not implemented yet: does nothing and reports success (0).
int tqCloseTCGroup(STQ* pTq, int64_t topicId, int64_t cgId, int64_t cId) {
  (void)pTq;
  (void)topicId;
  (void)cgId;
  (void)cId;
  return 0;
}

L
Liu Jicong 已提交
156 157
// Drop the group identified by (topicId, cgId, cId).
// Not implemented yet (deleting from disk is still to be done): does nothing
// and reports success (0).
int tqDropTCGroup(STQ* pTq, int64_t topicId, int64_t cgId, int64_t cId) {
  (void)pTq;
  (void)topicId;
  (void)cgId;
  (void)cId;
  //delete from disk
  return 0;
}

L
Liu Jicong 已提交
161 162
// Collect the next unconsumed message of each topic in a group into one
// freshly allocated buffer.
//
// Every message is written as: int64 topicId, int64 size, then `size` bytes
// of content. The buffer starts at 4 KiB and grows when a message does not
// fit; iteration stops after the message that takes the total past 4 KiB, or
// as soon as the buffer cannot be grown.
//
// gHandle: group whose topic list is walked. The first node (gHandle->head)
//          is not read; topics are taken from head->next onwards.
// msg:     out; on a positive return it receives the buffer, which the caller
//          must free(). It is left untouched when the return value is <= 0.
//
// Returns the number of bytes written (headers included), 0 when no topic had
// a message ready, or -1 on a NULL `msg` or when the initial allocation fails.
static int tqFetch(TqGroupHandle* gHandle, void** msg) {
  if (msg == NULL) {
    return -1;
  }
  //TODO: make it a macro
  const int sizeLimit = 4 * 1024;
  const int headerSize = 2 * (int)sizeof(int64_t);
  int capacity = sizeLimit;
  int totSize = 0;
  char* buffer = malloc(capacity);
  if (buffer == NULL) {
    //TODO:memory insufficient
    return -1;
  }
  //iterate the list to get msgs of all topics
  //until all topic iterated or msgs over sizeLimit
  TqListHandle* node = gHandle->head;
  while (node != NULL && node->next) {
    node = node->next;
    TqBufferHandle* bufHandle = &node->bufHandle;
    int idx = bufHandle->nextConsumeOffset % TQ_BUFFER_SIZE;
    if (bufHandle->buffer[idx].content == NULL ||
        bufHandle->buffer[idx].offset != bufHandle->nextConsumeOffset) {
      continue;  // nothing ready for this topic
    }
    int64_t topicId = bufHandle->topicId;
    int64_t itemSize = bufHandle->buffer[idx].size;
    // The two int64 header fields occupy space as well as the payload.
    int newSize = totSize + headerSize + (int)itemSize;
    if (newSize > capacity) {
      void* grown = realloc(buffer, newSize);
      if (grown == NULL) {
        //TODO:memory insufficient
        //return msgs already copied
        break;
      }
      // realloc may move the block: continue with the pointer it returned.
      buffer = grown;
      capacity = newSize;
    }
    // Write through a separate cursor so the base pointer stays valid for
    // realloc, for the caller and for free(). memcpy avoids unaligned stores.
    char* cursor = buffer + totSize;
    memcpy(cursor, &topicId, sizeof(topicId));
    cursor += sizeof(topicId);
    memcpy(cursor, &itemSize, sizeof(itemSize));
    cursor += sizeof(itemSize);
    memcpy(cursor, bufHandle->buffer[idx].content, (size_t)itemSize);
    totSize = newSize;
    if (totSize > sizeLimit) {
      break;
    }
  }
  if (totSize == 0) {
    free(buffer);
    return 0;
  }
  *msg = buffer;
  return totSize;
}

L
Liu Jicong 已提交
205
// Return the group handle of client cId.
// Not implemented yet: this always returns NULL.
TqGroupHandle* tqGetGroupHandle(STQ* pTq, int64_t cId) {
  (void)pTq;
  (void)cId;
  return NULL;
}

L
Liu Jicong 已提交
209
// Launch the queries of a group.
// Not implemented yet: does nothing and reports success (0).
int tqLaunchQuery(TqGroupHandle* gHandle) {
  (void)gHandle;
  return 0;
}

L
Liu Jicong 已提交
213
// Send a launch-query request for a group.
// Not implemented yet: does nothing and reports success (0).
int tqSendLaunchQuery(TqGroupHandle* gHandle) {
  (void)gHandle;
  return 0;
}

L
Liu Jicong 已提交
217
/*int tqMoveOffsetToNext(TqGroupHandle* gHandle) {*/
  /*return 0;*/
/*}*/

L
Liu Jicong 已提交
221
// Push a message of the given version into the TQ.
// Not implemented yet: does nothing and reports success (0).
int tqPushMsg(STQ* pTq , void* p, int64_t version) {
  (void)pTq;
  (void)p;
  (void)version;
  //add reference
  //judge and launch new query
  return 0;
}

L
Liu Jicong 已提交
227
// Commit the TQ state. Currently a no-op that reports success (0).
int tqCommit(STQ* pTq) {
  (void)pTq;
  //do nothing
  return 0;
}

L
Liu Jicong 已提交
232 233
// Handle one consume request: validate it, apply the acks it carries, fetch
// the next messages for the client's group and launch follow-up queries.
//
// pTq:  the TQ instance.
// pMsg: the request. Its memory is also viewed as the response: the fetched
//       messages are delivered through the `msgs` member of that view.
//       NOTE(review): confirm the request buffer is laid out for this reuse.
//
// Returns 0 on success, -1 when the protocol version is rejected, the client
// has no group handle, not every ack could be applied, nothing could be
// fetched, or launching the query fails.
int tqConsume(STQ* pTq, TmqConsumeReq* pMsg) {
  if (!tqProtoCheck((TmqMsgHead *)pMsg)) {
    //proto version invalid
    return -1;
  }
  int64_t clientId = pMsg->head.clientId;
  TqGroupHandle *gHandle = tqGetGroupHandle(pTq, clientId);
  if (gHandle == NULL) {
    //client not connect
    return -1;
  }
  if (pMsg->acks.ackNum != 0) {
    // tqAck returns how many acks it applied, so success means that every
    // ack in the request was applied, not that the result is zero.
    if (tqAck(gHandle, &pMsg->acks) != pMsg->acks.ackNum) {
      //ack not success
      return -1;
    }
  }

  TmqConsumeRsp *pRsp = (TmqConsumeRsp*) pMsg;

  if (tqFetch(gHandle, (void**)&pRsp->msgs) <= 0) {
    //fetch error
    return -1;
  }

  //judge and launch new query
  if (tqLaunchQuery(gHandle)) {
    //launch query error
    return -1;
  }
  return 0;
}

L
Liu Jicong 已提交
265
// Serialize a group handle into a TqSerializedHead buffer.
//
// Layout written to the head's content: int64 cId, int64 cgId, int32 topicNum,
// followed by the topic list when topicNum > 0.
//
// gHandle: handle to serialize.
// ppHead:  in/out buffer. It is allocated when *ppHead is NULL and grown with
//          realloc when too small (ssize is updated to the new size). If that
//          allocation fails, the old buffer is freed and *ppHead is set to
//          NULL so the caller is not left with a dangling pointer.
//
// Returns 0 on success, -1 when memory is exhausted.
int tqSerializeGroupHandle(const TqGroupHandle *gHandle, TqSerializedHead** ppHead) {
  //calculate size
  int sz = tqGetgHandleSSize(gHandle) + sizeof(TqSerializedHead);
  if (*ppHead == NULL || sz > (*ppHead)->ssize) {
    void* tmpPtr = realloc(*ppHead, sz);
    if (tmpPtr == NULL) {
      free(*ppHead);
      *ppHead = NULL;
      //TODO: memory err
      return -1;
    }
    *ppHead = tmpPtr;
    (*ppHead)->ssize = sz;
  }
  void* ptr = (*ppHead)->content;
  //do serialization
  *(int64_t*)ptr = gHandle->cId;
  ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
  *(int64_t*)ptr = gHandle->cgId;
  ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
  *(int32_t*)ptr = gHandle->topicNum;
  ptr = POINTER_SHIFT(ptr, sizeof(int32_t));
  if (gHandle->topicNum > 0) {
    tqSerializeListHandle(gHandle->head, ptr);
  }
  return 0;
}

L
Liu Jicong 已提交
292 293
// Serialize the buffer handle of every node of a topic list, starting with
// `listHandle` itself, one after another at `ptr`.
// `listHandle` must not be NULL (asserted).
// Returns the write position after the last node.
void* tqSerializeListHandle(TqListHandle *listHandle, void* ptr) {
  ASSERT(listHandle != NULL);
  for (TqListHandle *cur = listHandle; cur != NULL; cur = cur->next) {
    ptr = tqSerializeBufHandle(&cur->bufHandle, ptr);
  }
  return ptr;
}
301

L
Liu Jicong 已提交
302
// Serialize one buffer handle at `ptr`.
// Layout: int64 nextConsumeOffset, int64 topicId, int32 head, int32 tail,
// then each of the TQ_BUFFER_SIZE buffer items via tqSerializeBufItem.
// Returns the write position after the last item.
void* tqSerializeBufHandle(TqBufferHandle *bufHandle, void* ptr) {
  // Two 64-bit fields first ...
  int64_t *out64 = (int64_t*)ptr;
  *out64++ = bufHandle->nextConsumeOffset;
  *out64++ = bufHandle->topicId;

  // ... then two 32-bit fields directly behind them.
  int32_t *out32 = (int32_t*)out64;
  *out32++ = bufHandle->head;
  *out32++ = bufHandle->tail;

  void *cursor = out32;
  for (int slot = 0; slot < TQ_BUFFER_SIZE; slot++) {
    cursor = tqSerializeBufItem(&bufHandle->buffer[slot], cursor);
  }
  return cursor;
}

L
Liu Jicong 已提交
317
// Serialize one buffer item at `ptr`.
// Nothing is written for an item at present, so `ptr` is returned unchanged.
void* tqSerializeBufItem(TqBufferItem *bufItem, void* ptr) {
  (void)bufItem;
  //TODO: do we need serialize this?
  //mainly for executor
  return ptr;
}

L
Liu Jicong 已提交
323 324 325
// Rebuild a group handle from its serialized form.
//
// Reads int64 cId, int64 cgId and int32 topicNum from pHead->content into
// *ppGHandle, resets ahandle to NULL, then allocates one list node per topic
// and fills each node's buffer handle with tqDeserializeBufHandle. The nodes
// are linked in the order read, the first one becoming gHandle->head.
//
// Returns the read position after the last topic, or NULL when a node cannot
// be allocated (nodes created before the failure stay linked to the handle).
const void* tqDeserializeGroupHandle(const TqSerializedHead* pHead, TqGroupHandle **ppGHandle) {
  TqGroupHandle *gHandle = *ppGHandle;
  const void *cursor = pHead->content;

  gHandle->cId = *(const int64_t*)cursor;
  cursor = POINTER_SHIFT(cursor, sizeof(int64_t));
  gHandle->cgId = *(const int64_t*)cursor;
  cursor = POINTER_SHIFT(cursor, sizeof(int64_t));
  gHandle->ahandle = NULL;
  gHandle->topicNum = *(const int32_t*)cursor;
  cursor = POINTER_SHIFT(cursor, sizeof(int32_t));
  gHandle->head = NULL;

  // ppLink always points at the pointer the next node has to be stored in:
  // first the list head, afterwards the `next` field of the last node.
  TqListHandle **ppLink = &gHandle->head;
  for (int i = 0; i < gHandle->topicNum; i++) {
    TqListHandle *fresh = malloc(sizeof(TqListHandle));
    if (fresh == NULL) {
      //TODO: error
      return NULL;
    }
    fresh->next = NULL;
    cursor = tqDeserializeBufHandle(cursor, &fresh->bufHandle);
    *ppLink = fresh;
    ppLink = &fresh->next;
  }
  return cursor;
}
357

L
Liu Jicong 已提交
358
// Read one buffer handle from `pBytes`.
// Layout: int64 nextConsumeOffset, int64 topicId, int32 head, int32 tail,
// then each of the TQ_BUFFER_SIZE buffer items via tqDeserializeBufItem.
// Returns the read position after the last item.
const void* tqDeserializeBufHandle(const void* pBytes, TqBufferHandle *bufHandle) {
  // Two 64-bit fields first ...
  const int64_t *in64 = (const int64_t*)pBytes;
  bufHandle->nextConsumeOffset = *in64++;
  bufHandle->topicId = *in64++;

  // ... then two 32-bit fields directly behind them.
  const int32_t *in32 = (const int32_t*)in64;
  bufHandle->head = *in32++;
  bufHandle->tail = *in32++;

  const void *cursor = in32;
  for (int slot = 0; slot < TQ_BUFFER_SIZE; slot++) {
    cursor = tqDeserializeBufItem(cursor, &bufHandle->buffer[slot]);
  }
  return cursor;
}

L
Liu Jicong 已提交
374
// Read one buffer item from `pBytes`.
// Nothing is read for an item at present, so `pBytes` is returned unchanged.
const void* tqDeserializeBufItem(const void* pBytes, TqBufferItem *bufItem) {
  (void)bufItem;
  return pBytes;
}
L
Liu Jicong 已提交
377

378
//TODO: make this a macro
L
Liu Jicong 已提交
379
int tqGetgHandleSSize(const TqGroupHandle *gHandle) {
L
Liu Jicong 已提交
380 381
  return sizeof(int64_t) * 2 //cId + cgId
    + sizeof(int32_t)        //topicNum
382
    + gHandle->topicNum * tqBufHandleSSize();
L
Liu Jicong 已提交
383
}
384 385 386

//TODO: make this a macro
int tqBufHandleSSize() {
L
Liu Jicong 已提交
387 388
  return sizeof(int64_t) * 2 // nextConsumeOffset + topicId
    + sizeof(int32_t) * 2    // head + tail
389
    + TQ_BUFFER_SIZE * tqBufItemSSize();
L
Liu Jicong 已提交
390
}
391 392 393 394

// Size in bytes of the serialized form of one buffer item.
// Items are not serialized at present, so the size is 0.
int tqBufItemSSize() {
  //TODO: do this need serialization?
  //mainly for executor
  const int itemBytes = 0;
  return itemBytes;
}