tq.c 8.9 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
S
Shengliang Guan 已提交
14 15
 */

L
Liu Jicong 已提交
16
#include "tqInt.h"
S
Shengliang Guan 已提交
17

L
Liu Jicong 已提交
18 19 20 21 22 23
//static
//read next version data
//
//send to fetch queue
//
//handle management message
L
Liu Jicong 已提交
24
//
L
Liu Jicong 已提交
25
/*
 * Check the protocol version carried in a message head.
 *
 * Returns 1 when pMsg->protoVer is 0 (the only version accepted here),
 * 0 otherwise.
 */
static int tqProtoCheck(TmqMsgHead *pMsg) {
  if(pMsg->protoVer != 0) {
    return 0;
  }
  return 1;
}

L
Liu Jicong 已提交
29
/*
 * Acknowledge one consumed item of a topic buffer and queue a follow-up query.
 *
 * The buffer slot addressed by pAck->consumeOffset has its content released,
 * then a new TqQueryMsg targeting that same slot is pushed onto the front of
 * the list at *ppQuery.
 *
 * Returns 0 on success, -1 if the offset is negative or if memory for the new
 * query cannot be allocated (*ppQuery is left unchanged in both cases).
 * The query message and its exec record are heap allocated here; whoever
 * consumes the list is expected to release them.
 */
static int tqAckOneTopic(TqBufferHandle *bhandle, TmqOneAck *pAck, TqQueryMsg** ppQuery) {
  //clean old item and move forward
  int32_t consumeOffset = pAck->consumeOffset;
  if(consumeOffset < 0) {
    //a negative offset would produce a negative slot index
    return -1;
  }
  int idx = consumeOffset % TQ_BUFFER_SIZE;
  ASSERT(bhandle->buffer[idx].content && bhandle->buffer[idx].executor);
  tfree(bhandle->buffer[idx].content);
  if( 1 /* TODO: need to launch new query */) {
    TqQueryMsg* pNewQuery = malloc(sizeof(*pNewQuery));
    if(pNewQuery == NULL) {
      //TODO: memory insufficient
      return -1;
    }
    //exec is a pointer member: it must be given storage before it is
    //written through, otherwise the stores below hit an indeterminate address
    pNewQuery->exec = malloc(sizeof(*pNewQuery->exec));
    if(pNewQuery->exec == NULL) {
      //TODO: memory insufficient
      free(pNewQuery);
      return -1;
    }
    //TODO: lock executor
    pNewQuery->exec->executor = bhandle->buffer[idx].executor;
    //TODO: read from wal and assign to src
    pNewQuery->exec->src = 0;
    pNewQuery->exec->dest = &bhandle->buffer[idx];
    //push front
    pNewQuery->next = *ppQuery;
    *ppQuery = pNewQuery;
  }
  return 0;
}

L
Liu Jicong 已提交
52
/*
 * Apply a batch of acks to the topics of a consumer group handle.
 *
 * The ack array and the topic list are walked together, comparing topic ids:
 * an ack whose id is smaller than the current topic is skipped, a topic whose
 * id is smaller than the current ack is passed over.
 * NOTE(review): this walk only pairs everything up if both sequences are in
 * ascending topicId order - confirm with the producer of TmqAcks.
 *
 * ghandle->head is treated as a sentinel: the first topic examined is
 * head->next.
 *
 * Returns the number of acks that were applied successfully.
 */
static int tqAck(TqGroupHandle* ghandle, TmqAcks* pAcks) {
  int32_t ackNum = pAcks->ackNum;
  TmqOneAck *acks = pAcks->acks;
  //double ptr for acks and list
  int i = 0;
  TqListHandle* node = ghandle->head;
  int ackCnt = 0;
  TqQueryMsg *pQuery = NULL;
  while(i < ackNum && node != NULL && node->next) {
    if(acks[i].topicId == node->next->bufHandle.topicId) {
      //count the ack only when it was really applied
      if(tqAckOneTopic(&node->next->bufHandle, &acks[i], &pQuery) == 0) {
        ackCnt++;
      }
      //this ack is done: move on, otherwise the loop never terminates.
      //the list node is kept so that a following ack for the same topic
      //is still matched.
      i++;
    } else if(acks[i].topicId < node->next->bufHandle.topicId) {
      //no such topic in this group, drop the ack
      i++;
    } else {
      node = node->next;
    }
  }
  if(pQuery) {
    //post message
    //TODO: the query list built above is not handed to anyone yet
  }
  return ackCnt;
}
L
Liu Jicong 已提交
75

L
Liu Jicong 已提交
76
/*
 * Placeholder for committing a group handle.
 * Nothing is done yet; the function always returns 0.
 */
static int tqCommitTCGroup(TqGroupHandle* handle) {
  (void)handle;  //unused until the TODO below is implemented
  //TODO: persist modification into disk
  int code = 0;
  return code;
}

L
Liu Jicong 已提交
81
/*
 * Placeholder for creating a group.
 * Nothing is done yet: *handle is NOT written and 0 is always returned.
 */
int tqCreateTCGroup(STQ *pTq, int64_t topicId, int64_t cgId, int64_t cId, TqGroupHandle** handle) {
  (void)pTq;
  (void)topicId;
  (void)cgId;
  (void)cId;
  (void)handle;
  //TODO: create in disk
  int code = 0;
  return code;
}

L
Liu Jicong 已提交
86 87 88 89 90 91
/*
 * Placeholder for opening a group.
 * Nothing is done yet; the function always returns 0.
 */
int tqOpenTCGroup(STQ* pTq, int64_t topicId, int64_t cgId, int64_t cId) {
  (void)pTq;
  (void)topicId;
  (void)cgId;
  (void)cId;
  //TODO: look up in disk
  //TODO: create
  //TODO: open
  int code = 0;
  return code;
}
L
Liu Jicong 已提交
92

L
Liu Jicong 已提交
93
/*
 * Placeholder for closing a group.
 * Nothing is done yet; the function always returns 0.
 */
int tqCloseTCGroup(STQ* pTq, int64_t topicId, int64_t cgId, int64_t cId) {
  (void)pTq;
  (void)topicId;
  (void)cgId;
  (void)cId;
  int code = 0;
  return code;
}

L
Liu Jicong 已提交
97 98
/*
 * Placeholder for dropping a group.
 * Nothing is done yet; the function always returns 0.
 */
int tqDropTCGroup(STQ* pTq, int64_t topicId, int64_t cgId, int64_t cId) {
  (void)pTq;
  (void)topicId;
  (void)cgId;
  (void)cId;
  //TODO: delete from disk
  int code = 0;
  return code;
}

L
Liu Jicong 已提交
102 103 104
/*
 * Collect the next pending message of every topic in a group into one buffer.
 *
 * ghandle->head is treated as a sentinel; topics start at head->next. For each
 * topic whose slot at nextConsumeOffset holds content with a matching offset,
 * one record is appended to the output buffer:
 *
 *     int64_t topicId | int64_t size | <size> bytes of content
 *
 * Iteration stops when every topic was visited, when the accumulated payload
 * exceeds the soft limit (the record crossing the limit is still included),
 * or when the buffer cannot be grown (records copied so far are returned).
 *
 * On a positive return value *msg receives a malloc'ed buffer that the caller
 * owns and must free. When the return value is <= 0 *msg is left untouched.
 *
 * Returns the sum of the payload sizes copied (record headers not counted),
 * or -1 if the initial allocation fails.
 */
static int tqFetch(TqGroupHandle* ghandle, void** msg) {
  //TODO: make it a macro
  const int sizeLimit = 4 * 1024;
  const size_t recHeadSize = 2 * sizeof(int64_t);

  size_t capacity = (size_t)sizeLimit;
  size_t used = 0;      //bytes written so far, headers included
  int    totSize = 0;   //payload bytes written so far

  //the base pointer is kept intact so it can be realloc'ed and returned;
  //writes go through the `used` offset
  char* buffer = malloc(capacity);
  if(buffer == NULL) {
    //TODO:memory insufficient
    return -1;
  }

  //iterate the list to get msgs of all topics
  //until all topic iterated or msgs over sizeLimit
  TqListHandle* node = ghandle->head;
  while(node != NULL && node->next) {
    node = node->next;
    TqBufferHandle* bufHandle = &node->bufHandle;
    int idx = bufHandle->nextConsumeOffset % TQ_BUFFER_SIZE;
    TqBufferItem* item = &bufHandle->buffer[idx];
    if(item->content == NULL || item->offset != bufHandle->nextConsumeOffset) {
      //nothing ready for this topic
      continue;
    }

    int64_t topicId = bufHandle->topicId;
    int64_t itemSize = item->size;
    if(itemSize < 0) {
      //a negative size cannot be copied
      continue;
    }

    //make room for header + payload before writing anything
    size_t need = used + recHeadSize + (size_t)itemSize;
    if(need > capacity) {
      char* grown = realloc(buffer, need);
      if(grown == NULL) {
        //TODO:memory insufficient
        //return msgs already copied
        break;
      }
      buffer = grown;
      capacity = need;
    }

    //memcpy is used for the header fields because the write position is
    //not 8-byte aligned once a payload of arbitrary size has been appended
    memcpy(buffer + used, &topicId, sizeof(topicId));
    used += sizeof(topicId);
    memcpy(buffer + used, &itemSize, sizeof(itemSize));
    used += sizeof(itemSize);
    memcpy(buffer + used, item->content, (size_t)itemSize);
    used += (size_t)itemSize;

    totSize += (int)itemSize;
    if(totSize > sizeLimit) {
      break;
    }
  }

  if(totSize <= 0) {
    //callers treat <= 0 as "nothing fetched": do not hand out a buffer
    free(buffer);
    return totSize;
  }
  *msg = buffer;
  return totSize;
}

L
Liu Jicong 已提交
146

L
Liu Jicong 已提交
147
/*
 * Placeholder for looking up the group handle of a client.
 * No lookup is done yet; the function always returns NULL.
 */
TqGroupHandle* tqGetGroupHandle(STQ* pTq, int64_t cId) {
  (void)pTq;
  (void)cId;
  TqGroupHandle* found = NULL;
  return found;
}

L
Liu Jicong 已提交
151
/*
 * Placeholder for launching a query on a group handle.
 * Nothing is done yet; the function always returns 0.
 */
int tqLaunchQuery(TqGroupHandle* ghandle) {
  (void)ghandle;
  int code = 0;
  return code;
}

L
Liu Jicong 已提交
155
/*
 * Placeholder for sending a launch-query request for a group handle.
 * Nothing is done yet; the function always returns 0.
 */
int tqSendLaunchQuery(TqGroupHandle* gHandle) {
  (void)gHandle;
  int code = 0;
  return code;
}

L
Liu Jicong 已提交
159
/*int tqMoveOffsetToNext(TqGroupHandle* ghandle) {*/
L
Liu Jicong 已提交
160 161 162
  /*return 0;*/
/*}*/

L
Liu Jicong 已提交
163
/*
 * Placeholder for pushing a message of a given version into the queue.
 * Nothing is done yet; the function always returns 0.
 */
int tqPushMsg(STQ* pTq , void* p, int64_t version) {
  (void)pTq;
  (void)p;
  (void)version;
  //TODO: add reference
  //TODO: judge and launch new query
  int code = 0;
  return code;
}

L
Liu Jicong 已提交
169
/*
 * Commit hook of the queue. Intentionally does nothing and returns 0.
 */
int tqCommit(STQ* pTq) {
  (void)pTq;
  //do nothing
  int code = 0;
  return code;
}

L
Liu Jicong 已提交
174 175
/*
 * Handle one consume request.
 *
 * Steps, each of which aborts with -1 on failure:
 *   1. check the protocol version of the message head;
 *   2. look up the group handle of the requesting client;
 *   3. apply the acks carried by the request, if any;
 *   4. fetch the next messages for the client;
 *   5. launch a new query for the group.
 *
 * Returns 0 on success, -1 on any failure.
 *
 * NOTE(review): the response is built in place over the request buffer
 * (pRsp aliases pMsg), so tqFetch's output location overlaps request
 * fields - confirm the layouts of TmqConsumeReq and TmqConsumeRsp allow it.
 */
int tqConsume(STQ* pTq, TmqConsumeReq* pMsg) {
  if(!tqProtoCheck((TmqMsgHead *)pMsg)) {
    //proto version invalid
    return -1;
  }
  int64_t clientId = pMsg->head.clientId;
  TqGroupHandle *ghandle = tqGetGroupHandle(pTq, clientId);
  if(ghandle == NULL) {
    //client not connect
    return -1;
  }
  if(pMsg->acks.ackNum != 0) {
    //tqAck reports how many acks it handled, so success means
    //"all of them", not "zero"
    if(tqAck(ghandle, &pMsg->acks) != pMsg->acks.ackNum) {
      //ack not success
      return -1;
    }
  }

  TmqConsumeRsp *pRsp = (TmqConsumeRsp*) pMsg;

  if(tqFetch(ghandle, (void**)&pRsp->msgs) <= 0) {
    //fetch error
    return -1;
  }

  //judge and launch new query
  if(tqLaunchQuery(ghandle)) {
    //launch query error
    return -1;
  }
  return 0;
}

L
Liu Jicong 已提交
207
/*
 * Serialize a group handle into *ppBytes.
 *
 * *ppBytes is resized with realloc to the size reported by
 * tqGetGHandleSSize(), then filled with:
 *
 *     int64_t cId | int64_t cgId | int32_t topicNum | <topic list>
 *
 * The topic list is written only when topicNum > 0.
 *
 * Returns 0 on success. If the buffer cannot be resized, the old buffer is
 * released, *ppBytes is set to NULL and -1 is returned.
 *
 * NOTE(review): the buffer is sized from topicNum while
 * tqSerializeListHandle() writes every node reachable from head - confirm
 * the list length always equals topicNum.
 */
int tqSerializeGroupHandle(TqGroupHandle *gHandle, void** ppBytes) {
  //calculate size
  int sz = tqGetGHandleSSize(gHandle);
  void* ptr = realloc(*ppBytes, sz);
  if(ptr == NULL) {
    //realloc leaves the old block allocated on failure: release the
    //buffer itself (not the caller's pointer variable) and clear it so
    //that it cannot be freed twice
    free(*ppBytes);
    *ppBytes = NULL;
    //TODO: memory err
    return -1;
  }
  *ppBytes = ptr;
  //do serialization
  *(int64_t*)ptr = gHandle->cId;
  ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
  *(int64_t*)ptr = gHandle->cgId;
  ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
  *(int32_t*)ptr = gHandle->topicNum;
  ptr = POINTER_SHIFT(ptr, sizeof(int32_t));
  if(gHandle->topicNum > 0) {
    tqSerializeListHandle(gHandle->head, ptr);
  }
  return 0;
}

L
Liu Jicong 已提交
230 231
/*
 * Serialize every node of a topic list, starting with listHandle itself.
 *
 * Each node's buffer handle is written at ptr by tqSerializeBufHandle().
 * listHandle must not be NULL (asserted).
 *
 * Returns the position right after the last byte written.
 */
void* tqSerializeListHandle(TqListHandle *listHandle, void* ptr) {
  ASSERT(listHandle != NULL);
  for(TqListHandle *cur = listHandle; cur != NULL; cur = cur->next) {
    ptr = tqSerializeBufHandle(&cur->bufHandle, ptr);
  }
  return ptr;
}
239

L
Liu Jicong 已提交
240
/*
 * Serialize one buffer handle at ptr.
 *
 * Layout written:
 *
 *     int64_t nextConsumeOffset | int64_t topicId |
 *     int32_t head | int32_t tail | TQ_BUFFER_SIZE buffer items
 *
 * The fields are stored with memcpy because ptr is a position inside a
 * packed byte stream and is not guaranteed to be aligned for int64_t.
 *
 * Returns the position right after the last byte written.
 */
void* tqSerializeBufHandle(TqBufferHandle *bufHandle, void* ptr) {
  int64_t nextConsumeOffset = bufHandle->nextConsumeOffset;
  int64_t topicId = bufHandle->topicId;
  int32_t head = bufHandle->head;
  int32_t tail = bufHandle->tail;

  memcpy(ptr, &nextConsumeOffset, sizeof(nextConsumeOffset));
  ptr = POINTER_SHIFT(ptr, sizeof(nextConsumeOffset));
  memcpy(ptr, &topicId, sizeof(topicId));
  ptr = POINTER_SHIFT(ptr, sizeof(topicId));
  memcpy(ptr, &head, sizeof(head));
  ptr = POINTER_SHIFT(ptr, sizeof(head));
  memcpy(ptr, &tail, sizeof(tail));
  ptr = POINTER_SHIFT(ptr, sizeof(tail));
  for(int i = 0; i < TQ_BUFFER_SIZE; i++) {
    ptr = tqSerializeBufItem(&bufHandle->buffer[i], ptr);
  }
  return ptr;
}

L
Liu Jicong 已提交
255
/*
 * Serialize one buffer item at ptr.
 * Currently writes nothing and returns ptr unchanged.
 */
void* tqSerializeBufItem(TqBufferItem *bufItem, void* ptr) {
  (void)bufItem;
  //TODO: do we need serialize this?
  //mainly for executor
  void* next = ptr;
  return next;
}

L
Liu Jicong 已提交
261
/*
 * Rebuild a group handle from its serialized form.
 *
 * Layout read from pBytes:
 *
 *     int64_t cId | int64_t cgId | int32_t topicNum | topicNum buffer handles
 *
 * gHandle->ahandle is reset to NULL and a fresh list of topicNum nodes is
 * allocated, the first of which becomes gHandle->head.
 *
 * Returns the position right after the last byte consumed, or NULL if a list
 * node cannot be allocated. On failure the nodes created so far are freed and
 * the handle is left with head == NULL and topicNum == 0.
 */
const void* tqDeserializeGroupHandle(const void* pBytes, TqGroupHandle *gHandle) {
  const void* ptr = pBytes;
  int64_t i64 = 0;
  int32_t i32 = 0;

  //memcpy avoids assuming any alignment of the input byte stream
  memcpy(&i64, ptr, sizeof(i64));
  gHandle->cId = i64;
  ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
  memcpy(&i64, ptr, sizeof(i64));
  gHandle->cgId = i64;
  ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
  gHandle->ahandle = NULL;
  memcpy(&i32, ptr, sizeof(i32));
  gHandle->topicNum = i32;
  ptr = POINTER_SHIFT(ptr, sizeof(int32_t));
  gHandle->head = NULL;

  //ppTail always points at the link that must receive the next node
  TqListHandle **ppTail = &gHandle->head;
  for(int i = 0; i < gHandle->topicNum; i++) {
    //zero the node: buffer items are not restored by deserialization,
    //so they must not be left holding indeterminate pointers
    TqListHandle *node = calloc(1, sizeof(*node));
    if(node == NULL) {
      //TODO: error
      //drop the partial list so the handle stays consistent.
      //nodes own no further allocations at this point.
      TqListHandle *cur = gHandle->head;
      while(cur != NULL) {
        TqListHandle *next = cur->next;
        free(cur);
        cur = next;
      }
      gHandle->head = NULL;
      gHandle->topicNum = 0;
      return NULL;
    }
    ptr = tqDeserializeBufHandle(ptr, &node->bufHandle);
    *ppTail = node;
    ppTail = &node->next;
  }
  return ptr;
}
294

L
Liu Jicong 已提交
295
/*
 * Read one buffer handle from its serialized form.
 *
 * Layout read from pBytes:
 *
 *     int64_t nextConsumeOffset | int64_t topicId |
 *     int32_t head | int32_t tail | TQ_BUFFER_SIZE buffer items
 *
 * The fields are loaded with memcpy because pBytes is a position inside a
 * packed byte stream and is not guaranteed to be aligned for int64_t.
 *
 * Returns the position right after the last byte consumed.
 */
const void* tqDeserializeBufHandle(const void* pBytes, TqBufferHandle *bufHandle) {
  const void* ptr = pBytes;
  int64_t i64 = 0;
  int32_t i32 = 0;

  memcpy(&i64, ptr, sizeof(i64));
  bufHandle->nextConsumeOffset = i64;
  ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
  memcpy(&i64, ptr, sizeof(i64));
  bufHandle->topicId = i64;
  ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
  memcpy(&i32, ptr, sizeof(i32));
  bufHandle->head = i32;
  ptr = POINTER_SHIFT(ptr, sizeof(int32_t));
  memcpy(&i32, ptr, sizeof(i32));
  bufHandle->tail = i32;
  ptr = POINTER_SHIFT(ptr, sizeof(int32_t));
  for(int i = 0; i < TQ_BUFFER_SIZE; i++) {
    ptr = tqDeserializeBufItem(ptr, &bufHandle->buffer[i]);
  }
  return ptr;
}

L
Liu Jicong 已提交
311
/*
 * Read one buffer item from its serialized form.
 * Currently consumes nothing, leaves bufItem untouched and returns pBytes.
 */
const void* tqDeserializeBufItem(const void* pBytes, TqBufferItem *bufItem) {
  (void)bufItem;
  const void* next = pBytes;
  return next;
}
L
Liu Jicong 已提交
314

315
//TODO: make this a macro
L
Liu Jicong 已提交
316
int tqGetGHandleSSize(const TqGroupHandle *gHandle) {
317 318 319
  return sizeof(int64_t) * 2
    + sizeof(int32_t)
    + gHandle->topicNum * tqBufHandleSSize();
L
Liu Jicong 已提交
320
}
321 322 323 324 325 326

//TODO: make this a macro
/*
 * Serialized size, in bytes, of one buffer handle:
 * two int64_t fields, two int32_t fields, then TQ_BUFFER_SIZE buffer items.
 *
 * Declared with (void) so the definition is a real prototype and calls with
 * arguments are rejected by the compiler.
 */
int tqBufHandleSSize(void) {
  return sizeof(int64_t) * 2
    + sizeof(int32_t) * 2
    + TQ_BUFFER_SIZE * tqBufItemSSize();
}
328 329 330 331

/*
 * Serialized size, in bytes, of one buffer item.
 * Buffer items are not serialized yet, so the size is 0.
 *
 * Declared with (void) so the definition is a real prototype and calls with
 * arguments are rejected by the compiler.
 */
int tqBufItemSSize(void) {
  //TODO: do this need serialization?
  //mainly for executor
  return 0;
}